During log rotation, defer writing records newer than the last synchronized time.
Change-Id: Ic6231a18354953ae2863d3797789ad5c4095fdc0
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index b8d0c7b..8018273 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -95,7 +95,8 @@
if (it != timestamp_logger_channels.end()) {
CHECK(!is_split);
CHECK_LT(channel_index, std::get<2>(it->second).size());
- std::get<2>(it->second)[channel_index] = (connection->time_to_live() == 0);
+ std::get<2>(it->second)[channel_index] =
+ (connection->time_to_live() == 0);
} else {
if (is_split) {
timestamp_logger_channels.insert(std::make_pair(
@@ -272,7 +273,7 @@
return true;
}
-std::string Logger::WriteConfiguration(LogNamer* log_namer) {
+std::string Logger::WriteConfiguration(LogNamer *log_namer) {
std::string config_sha256;
if (separate_config_) {
@@ -351,8 +352,8 @@
const aos::monotonic_clock::time_point header_time =
event_loop_->monotonic_now();
- VLOG(1) << "Logging node as " << FlatbufferToJson(node_)
- << " start_time " << last_synchronized_time_ << ", took "
+ VLOG(1) << "Logging node as " << FlatbufferToJson(node_) << " start_time "
+ << last_synchronized_time_ << ", took "
<< chrono::duration<double>(fetch_time - beginning_time).count()
<< " to fetch, "
<< chrono::duration<double>(header_time - fetch_time).count()
@@ -373,8 +374,8 @@
polling_period_);
}
-std::unique_ptr<LogNamer> Logger::RestartLogging(std::unique_ptr<LogNamer> log_namer,
- std::optional<UUID> log_start_uuid) {
+std::unique_ptr<LogNamer> Logger::RestartLogging(
+ std::unique_ptr<LogNamer> log_namer, std::optional<UUID> log_start_uuid) {
CHECK(log_namer_) << ": Unexpected restart while not logging";
VLOG(1) << "Restarting logger for " << FlatbufferToJson(node_);
@@ -382,7 +383,8 @@
// Force out every currently pending message, pointing all fetchers at the
// last (currently available) records. Note that LogUntil() updates
// last_synchronized_time_ to the time value that it receives.
- while(LogUntil(last_synchronized_time_ + polling_period_));
+ while (LogUntil(last_synchronized_time_ + polling_period_))
+ ;
std::unique_ptr<LogNamer> old_log_namer = std::move(log_namer_);
log_namer_ = std::move(log_namer);
@@ -408,7 +410,6 @@
// Write the transition record(s) for each channel ...
for (FetcherStruct &f : fetchers_) {
-
// Create writers from the new namer
NewDataWriter *next_writer = nullptr;
NewDataWriter *next_timestamp_writer = nullptr;
@@ -426,37 +427,38 @@
}
if (f.fetcher->context().data != nullptr) {
-
- // Write the last message fetched as the first of the new log of this type.
- // The timestamps on these will all be before the new start time.
+ // Write the last message fetched as the first of the new log of this
+ // type. The timestamps on these will all be before the new start time.
WriteData(next_writer, f);
WriteTimestamps(next_timestamp_writer, f);
WriteContent(next_contents_writer, f);
- // It is possible that a few more snuck in. Write them all out also, including
- // any that should also be in the old log.
+ // It is possible that a few more snuck in. Write them all out also,
+ // including any that should also be in the old log.
while (true) {
- // Get the next message ...
- const auto start = event_loop_->monotonic_now();
- const bool got_new = f.fetcher->FetchNext();
- const auto end = event_loop_->monotonic_now();
- RecordFetchResult(start, end, got_new, &f);
+ // Get the next message ...
+ const auto start = event_loop_->monotonic_now();
+ const bool got_new = f.fetcher->FetchNext();
+ const auto end = event_loop_->monotonic_now();
+ RecordFetchResult(start, end, got_new, &f);
- if (got_new) {
- if (f.fetcher->context().monotonic_event_time < last_synchronized_time_) {
- WriteFetchedRecord(f);
- }
-
+ if (got_new) {
+ if (f.fetcher->context().monotonic_event_time <=
+ last_synchronized_time_) {
+ WriteFetchedRecord(f);
WriteData(next_writer, f);
WriteTimestamps(next_timestamp_writer, f);
WriteContent(next_contents_writer, f);
- if (f.fetcher->context().monotonic_event_time > last_synchronized_time_) {
- break;
- }
} else {
+ f.written = false;
break;
}
+
+ } else {
+ f.written = true;
+ break;
+ }
}
}
@@ -464,18 +466,18 @@
f.writer = next_writer;
f.timestamp_writer = next_timestamp_writer;
f.contents_writer = next_contents_writer;
- f.written = true;
}
const aos::monotonic_clock::time_point channel_time =
event_loop_->monotonic_now();
- VLOG(1) << "Logging node as " << FlatbufferToJson(node_)
- << " restart_time " << last_synchronized_time_ << ", took "
+ VLOG(1) << "Logging node as " << FlatbufferToJson(node_) << " restart_time "
+ << last_synchronized_time_ << ", took "
<< chrono::duration<double>(header_time - beginning_time).count()
<< " to prepare and write header, "
<< chrono::duration<double>(channel_time - header_time).count()
- << " to write initial channel messages, boot uuid " << event_loop_->boot_uuid();
+ << " to write initial channel messages, boot uuid "
+ << event_loop_->boot_uuid();
return old_log_namer;
}
@@ -732,26 +734,26 @@
max_header_size_);
fbb.ForceDefaults(true);
- fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
- f.channel_index, f.log_type));
+ fbb.FinishSizePrefixed(
+ PackMessage(&fbb, f.fetcher->context(), f.channel_index, f.log_type));
const auto end = event_loop_->monotonic_now();
RecordCreateMessageTime(start, end, f);
- max_header_size_ = std::max(max_header_size_,
- fbb.GetSize() - f.fetcher->context().size);
+ max_header_size_ =
+ std::max(max_header_size_, fbb.GetSize() - f.fetcher->context().size);
writer->QueueMessage(&fbb, source_node_boot_uuid, end);
- VLOG(2) << "Wrote data as node "
- << FlatbufferToJson(node_) << " for channel "
+ VLOG(2) << "Wrote data as node " << FlatbufferToJson(node_)
+ << " for channel "
<< configuration::CleanedChannelToString(f.fetcher->channel())
<< " to " << writer->filename() << " data "
- << FlatbufferToJson(
- flatbuffers::GetSizePrefixedRoot<MessageHeader>(
- fbb.GetBufferPointer()));
+ << FlatbufferToJson(flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+ fbb.GetBufferPointer()));
}
}
-void Logger::WriteTimestamps(NewDataWriter *timestamp_writer, const FetcherStruct &f) {
+void Logger::WriteTimestamps(NewDataWriter *timestamp_writer,
+ const FetcherStruct &f) {
if (timestamp_writer != nullptr) {
// And now handle timestamps.
const auto start = event_loop_->monotonic_now();
@@ -771,17 +773,17 @@
f.fetcher->context().monotonic_event_time, f.reliable_forwarding);
timestamp_writer->QueueMessage(&fbb, event_loop_->boot_uuid(), end);
- VLOG(2) << "Wrote timestamps as node "
- << FlatbufferToJson(node_) << " for channel "
+ VLOG(2) << "Wrote timestamps as node " << FlatbufferToJson(node_)
+ << " for channel "
<< configuration::CleanedChannelToString(f.fetcher->channel())
<< " to " << timestamp_writer->filename() << " timestamp "
- << FlatbufferToJson(
- flatbuffers::GetSizePrefixedRoot<MessageHeader>(
- fbb.GetBufferPointer()));
+ << FlatbufferToJson(flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+ fbb.GetBufferPointer()));
}
}
-void Logger::WriteContent(NewDataWriter *contents_writer, const FetcherStruct &f) {
+void Logger::WriteContent(NewDataWriter *contents_writer,
+ const FetcherStruct &f) {
if (contents_writer != nullptr) {
const auto start = event_loop_->monotonic_now();
// And now handle the special message contents channel. Copy the
@@ -841,15 +843,16 @@
? f.channel_reliable_contents[msg->channel_index()]
: f.reliable_contents;
- contents_writer->UpdateRemote(node_index_, event_loop_->boot_uuid(),
+ contents_writer->UpdateRemote(
+ node_index_, event_loop_->boot_uuid(),
monotonic_clock::time_point(
chrono::nanoseconds(msg->monotonic_remote_time())),
monotonic_clock::time_point(
chrono::nanoseconds(msg->monotonic_sent_time())),
reliable, monotonic_timestamp_time);
- contents_writer->QueueMessage(
- &fbb, UUID::FromVector(msg->boot_uuid()), end);
+ contents_writer->QueueMessage(&fbb, UUID::FromVector(msg->boot_uuid()),
+ end);
}
}