Repair possible message loss due to log set restarts.

Moves the restart logic down inside aos, with a slight refactor
therein to encapsulate the common Start/Restart logic.

Change-Id: I4f4e6c88fe6af7bc2d37710c35a3e96d7867e876
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index 0e1dece..b8d0c7b 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -272,12 +272,9 @@
   return true;
 }
 
-void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
-                          std::optional<UUID> log_start_uuid) {
-  CHECK(!log_namer_) << ": Already logging";
-  log_namer_ = std::move(log_namer);
-
+std::string Logger::WriteConfiguration(LogNamer* log_namer) {
   std::string config_sha256;
+
   if (separate_config_) {
     flatbuffers::FlatBufferBuilder fbb;
     flatbuffers::Offset<aos::Configuration> configuration_offset =
@@ -289,12 +286,24 @@
         fbb.Release());
     config_sha256 = Sha256(config_header.span());
     LOG(INFO) << "Config sha256 of " << config_sha256;
-    log_namer_->WriteConfiguration(&config_header, config_sha256);
+    log_namer->WriteConfiguration(&config_header, config_sha256);
   }
 
+  return config_sha256;
+}
+
+void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
+                          std::optional<UUID> log_start_uuid) {
+  CHECK(!log_namer_) << ": Already logging";
+
+  VLOG(1) << "Starting logger for " << FlatbufferToJson(node_);
+
+  auto config_sha256 = WriteConfiguration(log_namer.get());
+
+  log_namer_ = std::move(log_namer);
+
   log_event_uuid_ = UUID::Random();
   log_start_uuid_ = log_start_uuid;
-  VLOG(1) << "Starting logger for " << FlatbufferToJson(node_);
 
   // We want to do as much work as possible before the initial Fetch. Time
   // between that and actually starting to log opens up the possibility of
@@ -342,12 +351,12 @@
   const aos::monotonic_clock::time_point header_time =
       event_loop_->monotonic_now();
 
-  LOG(INFO) << "Logging node as " << FlatbufferToJson(node_)
-            << " start_time " << last_synchronized_time_ << ", took "
-            << chrono::duration<double>(fetch_time - beginning_time).count()
-            << " to fetch, "
-            << chrono::duration<double>(header_time - fetch_time).count()
-            << " to write headers, boot uuid " << event_loop_->boot_uuid();
+  VLOG(1) << "Logging node as " << FlatbufferToJson(node_)
+          << " start_time " << last_synchronized_time_ << ", took "
+          << chrono::duration<double>(fetch_time - beginning_time).count()
+          << " to fetch, "
+          << chrono::duration<double>(header_time - fetch_time).count()
+          << " to write headers, boot uuid " << event_loop_->boot_uuid();
 
   // Force logging up until the start of the log file now, so the messages at
   // the start are always ordered before the rest of the messages.
@@ -364,6 +373,113 @@
                         polling_period_);
 }
 
+std::unique_ptr<LogNamer> Logger::RestartLogging(std::unique_ptr<LogNamer> log_namer,
+                          std::optional<UUID> log_start_uuid) {
+  CHECK(log_namer_) << ": Unexpected restart while not logging";
+
+  VLOG(1) << "Restarting logger for " << FlatbufferToJson(node_);
+
+  // Force out every currently pending message, pointing all fetchers at the
+  // last (currently available) records.  Note that LogUntil() updates
+  // last_synchronized_time_ to the time value that it receives.
+  while(LogUntil(last_synchronized_time_ + polling_period_));
+
+  std::unique_ptr<LogNamer> old_log_namer = std::move(log_namer_);
+  log_namer_ = std::move(log_namer);
+
+  const aos::monotonic_clock::time_point beginning_time =
+      event_loop_->monotonic_now();
+
+  auto config_sha256 = WriteConfiguration(log_namer_.get());
+
+  log_event_uuid_ = UUID::Random();
+  log_start_uuid_ = log_start_uuid;
+
+  log_namer_->SetHeaderTemplate(MakeHeader(config_sha256));
+
+  // Note that WriteHeader updates last_synchronized_time_ to be the
+  // current time when it is called, which is then the "start time"
+  // of the new (restarted) log. This timestamp will be after
+  // the timestamp of the last message fetched on each channel.
+  WriteHeader();
+
+  const aos::monotonic_clock::time_point header_time =
+      event_loop_->monotonic_now();
+
+  // Write the transition record(s) for each channel ...
+  for (FetcherStruct &f : fetchers_) {
+
+    // Create writers from the new namer
+    NewDataWriter *next_writer = nullptr;
+    NewDataWriter *next_timestamp_writer = nullptr;
+    NewDataWriter *next_contents_writer = nullptr;
+
+    if (f.wants_writer) {
+      next_writer = log_namer_->MakeWriter(f.channel);
+    }
+    if (f.wants_timestamp_writer) {
+      next_timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
+    }
+    if (f.wants_contents_writer) {
+      next_contents_writer = log_namer_->MakeForwardedTimestampWriter(
+          f.channel, CHECK_NOTNULL(f.timestamp_node));
+    }
+
+    if (f.fetcher->context().data != nullptr) {
+
+      // Write the last message fetched as the first of the new log of this type.
+      // The timestamps on these will all be before the new start time.
+      WriteData(next_writer, f);
+      WriteTimestamps(next_timestamp_writer, f);
+      WriteContent(next_contents_writer, f);
+
+      // It is possible that a few more snuck in. Write them all out also, including
+      // any that should also be in the old log.
+      while (true) {
+          // Get the next message ...
+          const auto start = event_loop_->monotonic_now();
+          const bool got_new = f.fetcher->FetchNext();
+          const auto end = event_loop_->monotonic_now();
+          RecordFetchResult(start, end, got_new, &f);
+
+          if (got_new) {
+            if (f.fetcher->context().monotonic_event_time < last_synchronized_time_) {
+              WriteFetchedRecord(f);
+            }
+
+            WriteData(next_writer, f);
+            WriteTimestamps(next_timestamp_writer, f);
+            WriteContent(next_contents_writer, f);
+
+            if (f.fetcher->context().monotonic_event_time > last_synchronized_time_) {
+              break;
+            }
+          } else {
+            break;
+          }
+      }
+    }
+
+    // Switch fully over to the new writers.
+    f.writer = next_writer;
+    f.timestamp_writer = next_timestamp_writer;
+    f.contents_writer = next_contents_writer;
+    f.written = true;
+  }
+
+  const aos::monotonic_clock::time_point channel_time =
+      event_loop_->monotonic_now();
+
+  VLOG(1) << "Logging node as " << FlatbufferToJson(node_)
+          << " restart_time " << last_synchronized_time_ << ", took "
+          << chrono::duration<double>(header_time - beginning_time).count()
+          << " to prepare and write header, "
+          << chrono::duration<double>(channel_time - header_time).count()
+          << " to write initial channel messages, boot uuid " << event_loop_->boot_uuid();
+
+  return old_log_namer;
+}
+
 std::unique_ptr<LogNamer> Logger::StopLogging(
     aos::monotonic_clock::time_point end_time) {
   CHECK(log_namer_) << ": Not logging right now";
@@ -445,6 +561,7 @@
         monotonic_clock::min_time) {
       return false;
     }
+
     // There are no offsets to compute for ourself, so always succeed.
     log_namer_->SetStartTimes(node_index, event_loop_->boot_uuid(),
                               monotonic_start_time, realtime_start_time,
@@ -603,7 +720,148 @@
   }
 }
 
-void Logger::LogUntil(monotonic_clock::time_point t) {
+void Logger::WriteData(NewDataWriter *writer, const FetcherStruct &f) {
+  if (writer != nullptr) {
+    const UUID source_node_boot_uuid =
+        static_cast<int>(node_index_) != f.data_node_index
+            ? f.fetcher->context().source_boot_uuid
+            : event_loop_->boot_uuid();
+    // Write!
+    const auto start = event_loop_->monotonic_now();
+    flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
+                                       max_header_size_);
+    fbb.ForceDefaults(true);
+
+    fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+                                       f.channel_index, f.log_type));
+    const auto end = event_loop_->monotonic_now();
+    RecordCreateMessageTime(start, end, f);
+
+    max_header_size_ = std::max(max_header_size_,
+                                fbb.GetSize() - f.fetcher->context().size);
+    writer->QueueMessage(&fbb, source_node_boot_uuid, end);
+
+    VLOG(2) << "Wrote data as node "
+            << FlatbufferToJson(node_) << " for channel "
+            << configuration::CleanedChannelToString(f.fetcher->channel())
+            << " to " << writer->filename() << " data "
+            << FlatbufferToJson(
+                   flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+                       fbb.GetBufferPointer()));
+  }
+}
+
+void Logger::WriteTimestamps(NewDataWriter *timestamp_writer, const FetcherStruct &f) {
+  if (timestamp_writer != nullptr) {
+    // And now handle timestamps.
+    const auto start = event_loop_->monotonic_now();
+    flatbuffers::FlatBufferBuilder fbb;
+    fbb.ForceDefaults(true);
+
+    fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+                                       f.channel_index,
+                                       LogType::kLogDeliveryTimeOnly));
+    const auto end = event_loop_->monotonic_now();
+    RecordCreateMessageTime(start, end, f);
+
+    // Tell our writer that we know something about the remote boot.
+    timestamp_writer->UpdateRemote(
+        f.data_node_index, f.fetcher->context().source_boot_uuid,
+        f.fetcher->context().monotonic_remote_time,
+        f.fetcher->context().monotonic_event_time, f.reliable_forwarding);
+    timestamp_writer->QueueMessage(&fbb, event_loop_->boot_uuid(), end);
+
+    VLOG(2) << "Wrote timestamps as node "
+            << FlatbufferToJson(node_) << " for channel "
+            << configuration::CleanedChannelToString(f.fetcher->channel())
+            << " to " << timestamp_writer->filename() << " timestamp "
+            << FlatbufferToJson(
+                   flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+                       fbb.GetBufferPointer()));
+  }
+}
+
+void Logger::WriteContent(NewDataWriter *contents_writer, const FetcherStruct &f) {
+  if (contents_writer != nullptr) {
+    const auto start = event_loop_->monotonic_now();
+    // And now handle the special message contents channel.  Copy the
+    // message into a FlatBufferBuilder and save it to disk.
+    // TODO(austin): We can be more efficient here when we start to
+    // care...
+    flatbuffers::FlatBufferBuilder fbb;
+    fbb.ForceDefaults(true);
+
+    const RemoteMessage *msg =
+        flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
+
+    CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
+
+    logger::MessageHeader::Builder message_header_builder(fbb);
+
+    // TODO(austin): This needs to check the channel_index and confirm
+    // that it should be logged before squirreling away the timestamp to
+    // disk.  We don't want to log irrelevant timestamps.
+
+    // Note: this must match the same order as MessageBridgeServer and
+    // PackMessage.  We want identical headers to have identical
+    // on-the-wire formats to make comparing them easier.
+
+    // Translate from the channel index that the event loop uses to the
+    // channel index in the log file.
+    message_header_builder.add_channel_index(
+        event_loop_to_logged_channel_index_[msg->channel_index()]);
+
+    message_header_builder.add_queue_index(msg->queue_index());
+    message_header_builder.add_monotonic_sent_time(msg->monotonic_sent_time());
+    message_header_builder.add_realtime_sent_time(msg->realtime_sent_time());
+
+    message_header_builder.add_monotonic_remote_time(
+        msg->monotonic_remote_time());
+    message_header_builder.add_realtime_remote_time(
+        msg->realtime_remote_time());
+    message_header_builder.add_remote_queue_index(msg->remote_queue_index());
+
+    const aos::monotonic_clock::time_point monotonic_timestamp_time =
+        f.fetcher->context().monotonic_event_time;
+    message_header_builder.add_monotonic_timestamp_time(
+        monotonic_timestamp_time.time_since_epoch().count());
+
+    fbb.FinishSizePrefixed(message_header_builder.Finish());
+    const auto end = event_loop_->monotonic_now();
+    RecordCreateMessageTime(start, end, f);
+
+    // Timestamps tell us information about what happened too!
+    // Capture any reboots so UpdateRemote is properly recorded.
+    contents_writer->UpdateBoot(UUID::FromVector(msg->boot_uuid()));
+
+    // Start with recording info about the data flowing from our node to the
+    // remote.
+    const bool reliable =
+        f.channel_reliable_contents.size() != 0u
+            ? f.channel_reliable_contents[msg->channel_index()]
+            : f.reliable_contents;
+
+    contents_writer->UpdateRemote(node_index_, event_loop_->boot_uuid(),
+        monotonic_clock::time_point(
+            chrono::nanoseconds(msg->monotonic_remote_time())),
+        monotonic_clock::time_point(
+            chrono::nanoseconds(msg->monotonic_sent_time())),
+        reliable, monotonic_timestamp_time);
+
+    contents_writer->QueueMessage(
+        &fbb, UUID::FromVector(msg->boot_uuid()), end);
+  }
+}
+
+void Logger::WriteFetchedRecord(FetcherStruct &f) {
+  WriteData(f.writer, f);
+  WriteTimestamps(f.timestamp_writer, f);
+  WriteContent(f.contents_writer, f);
+}
+
+bool Logger::LogUntil(monotonic_clock::time_point t) {
+  bool has_pending_messages = false;
+
   // Grab the latest ServerStatistics message.  This will always have the
   // oppertunity to be >= to the current time, so it will always represent any
   // reboots which may have happened.
@@ -628,143 +886,18 @@
 
       // TODO(james): Write tests to exercise this logic.
       if (f.fetcher->context().monotonic_event_time >= t) {
+        has_pending_messages = true;
         break;
       }
-      if (f.writer != nullptr) {
-        const UUID source_node_boot_uuid =
-            static_cast<int>(node_index_) != f.data_node_index
-                ? f.fetcher->context().source_boot_uuid
-                : event_loop_->boot_uuid();
-        // Write!
-        const auto start = event_loop_->monotonic_now();
-        flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
-                                           max_header_size_);
-        fbb.ForceDefaults(true);
 
-        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
-                                           f.channel_index, f.log_type));
-        const auto end = event_loop_->monotonic_now();
-        RecordCreateMessageTime(start, end, &f);
-
-        VLOG(2) << "Writing data as node "
-                << FlatbufferToJson(node_) << " for channel "
-                << configuration::CleanedChannelToString(f.fetcher->channel())
-                << " to " << f.writer->filename() << " data "
-                << FlatbufferToJson(
-                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
-                           fbb.GetBufferPointer()));
-
-        max_header_size_ = std::max(max_header_size_,
-                                    fbb.GetSize() - f.fetcher->context().size);
-        f.writer->QueueMessage(&fbb, source_node_boot_uuid, end);
-      }
-
-      if (f.timestamp_writer != nullptr) {
-        // And now handle timestamps.
-        const auto start = event_loop_->monotonic_now();
-        flatbuffers::FlatBufferBuilder fbb;
-        fbb.ForceDefaults(true);
-
-        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
-                                           f.channel_index,
-                                           LogType::kLogDeliveryTimeOnly));
-        const auto end = event_loop_->monotonic_now();
-        RecordCreateMessageTime(start, end, &f);
-
-        VLOG(2) << "Writing timestamps as node "
-                << FlatbufferToJson(node_) << " for channel "
-                << configuration::CleanedChannelToString(f.fetcher->channel())
-                << " to " << f.timestamp_writer->filename() << " timestamp "
-                << FlatbufferToJson(
-                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
-                           fbb.GetBufferPointer()));
-
-        // Tell our writer that we know something about the remote boot.
-        f.timestamp_writer->UpdateRemote(
-            f.data_node_index, f.fetcher->context().source_boot_uuid,
-            f.fetcher->context().monotonic_remote_time,
-            f.fetcher->context().monotonic_event_time, f.reliable_forwarding);
-        f.timestamp_writer->QueueMessage(&fbb, event_loop_->boot_uuid(), end);
-      }
-
-      if (f.contents_writer != nullptr) {
-        const auto start = event_loop_->monotonic_now();
-        // And now handle the special message contents channel.  Copy the
-        // message into a FlatBufferBuilder and save it to disk.
-        // TODO(austin): We can be more efficient here when we start to
-        // care...
-        flatbuffers::FlatBufferBuilder fbb;
-        fbb.ForceDefaults(true);
-
-        const RemoteMessage *msg =
-            flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
-
-        CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
-
-        logger::MessageHeader::Builder message_header_builder(fbb);
-
-        // TODO(austin): This needs to check the channel_index and confirm
-        // that it should be logged before squirreling away the timestamp to
-        // disk.  We don't want to log irrelevant timestamps.
-
-        // Note: this must match the same order as MessageBridgeServer and
-        // PackMessage.  We want identical headers to have identical
-        // on-the-wire formats to make comparing them easier.
-
-        // Translate from the channel index that the event loop uses to the
-        // channel index in the log file.
-        message_header_builder.add_channel_index(
-            event_loop_to_logged_channel_index_[msg->channel_index()]);
-
-        message_header_builder.add_queue_index(msg->queue_index());
-        message_header_builder.add_monotonic_sent_time(
-            msg->monotonic_sent_time());
-        message_header_builder.add_realtime_sent_time(
-            msg->realtime_sent_time());
-
-        message_header_builder.add_monotonic_remote_time(
-            msg->monotonic_remote_time());
-        message_header_builder.add_realtime_remote_time(
-            msg->realtime_remote_time());
-        message_header_builder.add_remote_queue_index(
-            msg->remote_queue_index());
-
-        const aos::monotonic_clock::time_point monotonic_timestamp_time =
-            f.fetcher->context().monotonic_event_time;
-        message_header_builder.add_monotonic_timestamp_time(
-            monotonic_timestamp_time.time_since_epoch().count());
-
-        fbb.FinishSizePrefixed(message_header_builder.Finish());
-        const auto end = event_loop_->monotonic_now();
-        RecordCreateMessageTime(start, end, &f);
-
-        // Timestamps tell us information about what happened too!
-        // Capture any reboots so UpdateRemote is properly recorded.
-        f.contents_writer->UpdateBoot(UUID::FromVector(msg->boot_uuid()));
-
-        // Start with recording info about the data flowing from our node to the
-        // remote.
-        const bool reliable =
-            f.channel_reliable_contents.size() != 0u
-                ? f.channel_reliable_contents[msg->channel_index()]
-                : f.reliable_contents;
-
-        f.contents_writer->UpdateRemote(
-            node_index_, event_loop_->boot_uuid(),
-            monotonic_clock::time_point(
-                chrono::nanoseconds(msg->monotonic_remote_time())),
-            monotonic_clock::time_point(
-                chrono::nanoseconds(msg->monotonic_sent_time())),
-            reliable, monotonic_timestamp_time);
-
-        f.contents_writer->QueueMessage(
-            &fbb, UUID::FromVector(msg->boot_uuid()), end);
-      }
+      WriteFetchedRecord(f);
 
       f.written = true;
     }
   }
   last_synchronized_time_ = t;
+
+  return has_pending_messages;
 }
 
 void Logger::DoLogData(const monotonic_clock::time_point end_time,
@@ -808,15 +941,15 @@
 
 void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
                                      aos::monotonic_clock::time_point end,
-                                     FetcherStruct *fetcher) {
+                                     const FetcherStruct &fetcher) {
   const auto duration = end - start;
   total_copy_time_ += duration;
   ++total_copy_count_;
-  total_copy_bytes_ += fetcher->fetcher->context().size;
+  total_copy_bytes_ += fetcher.fetcher->context().size;
   if (duration > max_copy_time_) {
     max_copy_time_ = duration;
-    max_copy_time_channel_ = fetcher->channel_index;
-    max_copy_time_size_ = fetcher->fetcher->context().size;
+    max_copy_time_channel_ = fetcher.channel_index;
+    max_copy_time_size_ = fetcher.fetcher->context().size;
   }
 }
 
diff --git a/aos/events/logging/log_writer.h b/aos/events/logging/log_writer.h
index 6b04f5d..3c75f29 100644
--- a/aos/events/logging/log_writer.h
+++ b/aos/events/logging/log_writer.h
@@ -128,6 +128,11 @@
   void StartLogging(std::unique_ptr<LogNamer> log_namer,
                     std::optional<UUID> log_start_uuid = std::nullopt);
 
+  // Restart logging using a new naming scheme. Intended for log rotation.
+  // Returns a unique_ptr to the prior log_namer instance.
+  std::unique_ptr<LogNamer> RestartLogging(std::unique_ptr<LogNamer> log_namer,
+                    std::optional<UUID> log_start_uuid = std::nullopt);
+
   // Moves the current log location to the new name. Returns true if a change
   // was made, false otherwise.
   // Only renaming the folder is supported, not the file base name.
@@ -216,6 +221,9 @@
   // channel index.
   std::vector<int> event_loop_to_logged_channel_index_;
 
+  // Writes the configuration out via log_namer; shared by Start/Restart logging.
+  std::string WriteConfiguration(LogNamer* log_namer);
+
   void WriteHeader();
 
   // Makes a template header for all the follower nodes.
@@ -232,10 +240,18 @@
 
   void WriteMissingTimestamps();
 
+  void WriteData(NewDataWriter *writer, const FetcherStruct &f);
+  void WriteTimestamps(NewDataWriter *timestamps_writer, const FetcherStruct &f);
+  void WriteContent(NewDataWriter *contents_writer, const FetcherStruct &f);
+
+  void WriteFetchedRecord(FetcherStruct &f);
+
   // Fetches from each channel until all the data is logged.  This is dangerous
   // because it lets you log for more than 1 period.  All calls need to verify
   // that t isn't greater than 1 period in the future.
-  void LogUntil(monotonic_clock::time_point t);
+  // Returns true if there is at least one message that has been fetched but
+  // not yet written.
+  bool LogUntil(monotonic_clock::time_point t);
 
   void RecordFetchResult(aos::monotonic_clock::time_point start,
                          aos::monotonic_clock::time_point end, bool got_new,
@@ -243,7 +259,7 @@
 
   void RecordCreateMessageTime(aos::monotonic_clock::time_point start,
                                aos::monotonic_clock::time_point end,
-                               FetcherStruct *fetcher);
+                               const FetcherStruct &fetcher);
 
   EventLoop *const event_loop_;
   // The configuration to place at the top of the log file.