During log rotation, delay writing records newer than the rotation point until normal logging resumes.

Change-Id: Ic6231a18354953ae2863d3797789ad5c4095fdc0
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index b8d0c7b..8018273 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -95,7 +95,8 @@
         if (it != timestamp_logger_channels.end()) {
           CHECK(!is_split);
           CHECK_LT(channel_index, std::get<2>(it->second).size());
-          std::get<2>(it->second)[channel_index] = (connection->time_to_live() == 0);
+          std::get<2>(it->second)[channel_index] =
+              (connection->time_to_live() == 0);
         } else {
           if (is_split) {
             timestamp_logger_channels.insert(std::make_pair(
@@ -272,7 +273,7 @@
   return true;
 }
 
-std::string Logger::WriteConfiguration(LogNamer* log_namer) {
+std::string Logger::WriteConfiguration(LogNamer *log_namer) {
   std::string config_sha256;
 
   if (separate_config_) {
@@ -351,8 +352,8 @@
   const aos::monotonic_clock::time_point header_time =
       event_loop_->monotonic_now();
 
-  VLOG(1) << "Logging node as " << FlatbufferToJson(node_)
-          << " start_time " << last_synchronized_time_ << ", took "
+  VLOG(1) << "Logging node as " << FlatbufferToJson(node_) << " start_time "
+          << last_synchronized_time_ << ", took "
           << chrono::duration<double>(fetch_time - beginning_time).count()
           << " to fetch, "
           << chrono::duration<double>(header_time - fetch_time).count()
@@ -373,8 +374,8 @@
                         polling_period_);
 }
 
-std::unique_ptr<LogNamer> Logger::RestartLogging(std::unique_ptr<LogNamer> log_namer,
-                          std::optional<UUID> log_start_uuid) {
+std::unique_ptr<LogNamer> Logger::RestartLogging(
+    std::unique_ptr<LogNamer> log_namer, std::optional<UUID> log_start_uuid) {
   CHECK(log_namer_) << ": Unexpected restart while not logging";
 
   VLOG(1) << "Restarting logger for " << FlatbufferToJson(node_);
@@ -382,7 +383,8 @@
   // Force out every currently pending message, pointing all fetchers at the
   // last (currently available) records.  Note that LogUntil() updates
   // last_synchronized_time_ to the time value that it receives.
-  while(LogUntil(last_synchronized_time_ + polling_period_));
+  while (LogUntil(last_synchronized_time_ + polling_period_))
+    ;
 
   std::unique_ptr<LogNamer> old_log_namer = std::move(log_namer_);
   log_namer_ = std::move(log_namer);
@@ -408,7 +410,6 @@
 
   // Write the transition record(s) for each channel ...
   for (FetcherStruct &f : fetchers_) {
-
     // Create writers from the new namer
     NewDataWriter *next_writer = nullptr;
     NewDataWriter *next_timestamp_writer = nullptr;
@@ -426,37 +427,38 @@
     }
 
     if (f.fetcher->context().data != nullptr) {
-
-      // Write the last message fetched as the first of the new log of this type.
-      // The timestamps on these will all be before the new start time.
+      // Write the last message fetched as the first of the new log of this
+      // type. The timestamps on these will all be before the new start time.
       WriteData(next_writer, f);
       WriteTimestamps(next_timestamp_writer, f);
       WriteContent(next_contents_writer, f);
 
-      // It is possible that a few more snuck in. Write them all out also, including
-      // any that should also be in the old log.
+      // It is possible that a few more snuck in. Write them all out also,
+      // including any that should also be in the old log.
       while (true) {
-          // Get the next message ...
-          const auto start = event_loop_->monotonic_now();
-          const bool got_new = f.fetcher->FetchNext();
-          const auto end = event_loop_->monotonic_now();
-          RecordFetchResult(start, end, got_new, &f);
+        // Get the next message ...
+        const auto start = event_loop_->monotonic_now();
+        const bool got_new = f.fetcher->FetchNext();
+        const auto end = event_loop_->monotonic_now();
+        RecordFetchResult(start, end, got_new, &f);
 
-          if (got_new) {
-            if (f.fetcher->context().monotonic_event_time < last_synchronized_time_) {
-              WriteFetchedRecord(f);
-            }
-
+        if (got_new) {
+          if (f.fetcher->context().monotonic_event_time <=
+              last_synchronized_time_) {
+            WriteFetchedRecord(f);
             WriteData(next_writer, f);
             WriteTimestamps(next_timestamp_writer, f);
             WriteContent(next_contents_writer, f);
 
-            if (f.fetcher->context().monotonic_event_time > last_synchronized_time_) {
-              break;
-            }
           } else {
+            f.written = false;
             break;
           }
+
+        } else {
+          f.written = true;
+          break;
+        }
       }
     }
 
@@ -464,18 +466,18 @@
     f.writer = next_writer;
     f.timestamp_writer = next_timestamp_writer;
     f.contents_writer = next_contents_writer;
-    f.written = true;
   }
 
   const aos::monotonic_clock::time_point channel_time =
       event_loop_->monotonic_now();
 
-  VLOG(1) << "Logging node as " << FlatbufferToJson(node_)
-          << " restart_time " << last_synchronized_time_ << ", took "
+  VLOG(1) << "Logging node as " << FlatbufferToJson(node_) << " restart_time "
+          << last_synchronized_time_ << ", took "
           << chrono::duration<double>(header_time - beginning_time).count()
           << " to prepare and write header, "
           << chrono::duration<double>(channel_time - header_time).count()
-          << " to write initial channel messages, boot uuid " << event_loop_->boot_uuid();
+          << " to write initial channel messages, boot uuid "
+          << event_loop_->boot_uuid();
 
   return old_log_namer;
 }
@@ -732,26 +734,26 @@
                                        max_header_size_);
     fbb.ForceDefaults(true);
 
-    fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
-                                       f.channel_index, f.log_type));
+    fbb.FinishSizePrefixed(
+        PackMessage(&fbb, f.fetcher->context(), f.channel_index, f.log_type));
     const auto end = event_loop_->monotonic_now();
     RecordCreateMessageTime(start, end, f);
 
-    max_header_size_ = std::max(max_header_size_,
-                                fbb.GetSize() - f.fetcher->context().size);
+    max_header_size_ =
+        std::max(max_header_size_, fbb.GetSize() - f.fetcher->context().size);
     writer->QueueMessage(&fbb, source_node_boot_uuid, end);
 
-    VLOG(2) << "Wrote data as node "
-            << FlatbufferToJson(node_) << " for channel "
+    VLOG(2) << "Wrote data as node " << FlatbufferToJson(node_)
+            << " for channel "
             << configuration::CleanedChannelToString(f.fetcher->channel())
             << " to " << writer->filename() << " data "
-            << FlatbufferToJson(
-                   flatbuffers::GetSizePrefixedRoot<MessageHeader>(
-                       fbb.GetBufferPointer()));
+            << FlatbufferToJson(flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+                   fbb.GetBufferPointer()));
   }
 }
 
-void Logger::WriteTimestamps(NewDataWriter *timestamp_writer, const FetcherStruct &f) {
+void Logger::WriteTimestamps(NewDataWriter *timestamp_writer,
+                             const FetcherStruct &f) {
   if (timestamp_writer != nullptr) {
     // And now handle timestamps.
     const auto start = event_loop_->monotonic_now();
@@ -771,17 +773,17 @@
         f.fetcher->context().monotonic_event_time, f.reliable_forwarding);
     timestamp_writer->QueueMessage(&fbb, event_loop_->boot_uuid(), end);
 
-    VLOG(2) << "Wrote timestamps as node "
-            << FlatbufferToJson(node_) << " for channel "
+    VLOG(2) << "Wrote timestamps as node " << FlatbufferToJson(node_)
+            << " for channel "
             << configuration::CleanedChannelToString(f.fetcher->channel())
             << " to " << timestamp_writer->filename() << " timestamp "
-            << FlatbufferToJson(
-                   flatbuffers::GetSizePrefixedRoot<MessageHeader>(
-                       fbb.GetBufferPointer()));
+            << FlatbufferToJson(flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+                   fbb.GetBufferPointer()));
   }
 }
 
-void Logger::WriteContent(NewDataWriter *contents_writer, const FetcherStruct &f) {
+void Logger::WriteContent(NewDataWriter *contents_writer,
+                          const FetcherStruct &f) {
   if (contents_writer != nullptr) {
     const auto start = event_loop_->monotonic_now();
     // And now handle the special message contents channel.  Copy the
@@ -841,15 +843,16 @@
             ? f.channel_reliable_contents[msg->channel_index()]
             : f.reliable_contents;
 
-    contents_writer->UpdateRemote(node_index_, event_loop_->boot_uuid(),
+    contents_writer->UpdateRemote(
+        node_index_, event_loop_->boot_uuid(),
         monotonic_clock::time_point(
             chrono::nanoseconds(msg->monotonic_remote_time())),
         monotonic_clock::time_point(
             chrono::nanoseconds(msg->monotonic_sent_time())),
         reliable, monotonic_timestamp_time);
 
-    contents_writer->QueueMessage(
-        &fbb, UUID::FromVector(msg->boot_uuid()), end);
+    contents_writer->QueueMessage(&fbb, UUID::FromVector(msg->boot_uuid()),
+                                  end);
   }
 }