Add support for rotated log files

We want to be able to rotate (chunk) log files.  This adds read support
for logs split across multiple chunk files.  Write support is only added
to enable testing, and needs more configuration to be useful.

Change-Id: I0c7bc9980438a934ac0f2692ef06caf2605bbc88
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 714796e..e55b80d 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -10,6 +10,7 @@
 
 #include "aos/configuration.h"
 #include "aos/events/logging/logger_generated.h"
+#include "aos/flatbuffer_merge.h"
 #include "flatbuffers/flatbuffers.h"
 #include "gflags/gflags.h"
 #include "glog/logging.h"
@@ -235,13 +236,45 @@
   return result;
 }
 
-SortedMessageReader::SortedMessageReader(std::string_view filename)
-    : message_reader_(filename) {
+SortedMessageReader::SortedMessageReader(
+    const std::vector<std::string> &filenames)
+    : filenames_(filenames),
+      log_file_header_(FlatbufferDetachedBuffer<LogFileHeader>::Empty()) {
+  CHECK(NextLogFile()) << ": filenames is empty.  Need files to read.";
+
+  log_file_header_ = CopyFlatBuffer(message_reader_->log_file_header());
+
   channels_.resize(configuration()->channels()->size());
 
   QueueMessages();
 }
 
+bool SortedMessageReader::NextLogFile() {
+  if (next_filename_index_ == filenames_.size()) {
+    return false;
+  }
+  message_reader_ =
+      std::make_unique<MessageReader>(filenames_[next_filename_index_]);
+
+  // We can't support the config diverging between two log file headers.  See if
+  // they are the same.
+  if (next_filename_index_ != 0) {
+    // Since we copied before, we need to copy again to guarantee that things
+    // didn't get re-ordered.
+    const FlatbufferDetachedBuffer<LogFileHeader> new_log_file_header =
+        CopyFlatBuffer(message_reader_->log_file_header());
+    CHECK_EQ(new_log_file_header.size(), log_file_header_.size());
+    CHECK(memcmp(new_log_file_header.data(), log_file_header_.data(),
+                 log_file_header_.size()) == 0)
+        << ": Header is different between log file chunks "
+        << filenames_[next_filename_index_] << " and "
+        << filenames_[next_filename_index_ - 1] << ", this is not supported.";
+  }
+
+  ++next_filename_index_;
+  return true;
+}
+
 void SortedMessageReader::EmplaceDataBack(
     FlatbufferVector<MessageHeader> &&new_data) {
   const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
@@ -289,17 +322,19 @@
     // Those messages might be very old. Make sure to read a chunk past the
     // starting time.
     if (channel_heap_.size() > 0 &&
-        message_reader_.newest_timestamp() >
+        message_reader_->newest_timestamp() >
             std::max(oldest_message().first, monotonic_start_time()) +
-                message_reader_.max_out_of_order_duration()) {
+                message_reader_->max_out_of_order_duration()) {
       break;
     }
 
     if (std::optional<FlatbufferVector<MessageHeader>> msg =
-            message_reader_.ReadMessage()) {
+            message_reader_->ReadMessage()) {
       EmplaceDataBack(std::move(msg.value()));
     } else {
-      break;
+      if (!NextLogFile()) {
+        break;
+      }
     }
   }
 }
@@ -328,7 +363,7 @@
     channel.oldest_timestamp = monotonic_clock::min_time;
   }
 
-  if (oldest_channel_data.first > message_reader_.queue_data_time()) {
+  if (oldest_channel_data.first > message_reader_->queue_data_time()) {
     QueueMessages();
   }
 
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 285fe05..6b8e9aa 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -5,6 +5,7 @@
 
 #include <deque>
 #include <optional>
+#include <string>
 #include <string_view>
 #include <vector>
 
@@ -198,11 +199,11 @@
 // sorted list of pointers to those.
 class SortedMessageReader {
  public:
-  SortedMessageReader(std::string_view filename);
+  SortedMessageReader(const std::vector<std::string> &filenames);
 
   // Returns the header from the log file.
   const LogFileHeader *log_file_header() const {
-    return message_reader_.log_file_header();
+    return &log_file_header_.message();
   }
 
   // Returns a pointer to the channel with the oldest message in it, and the
@@ -250,6 +251,9 @@
   PopOldestChannel();
 
  private:
+  // Moves to the next log file in the list.
+  bool NextLogFile();
+
   // Adds more messages to the sorted list.
   void QueueMessages();
 
@@ -279,7 +283,11 @@
     }
   };
 
-  MessageReader message_reader_;
+  std::vector<std::string> filenames_;
+  size_t next_filename_index_ = 0;
+
+  FlatbufferDetachedBuffer<LogFileHeader> log_file_header_;
+  std::unique_ptr<MessageReader> message_reader_;
 
   // TODO(austin): Multithreaded read at some point.  Gotta go faster!
   // Especially if we start compressing.
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index d5626a9..dd71e50 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -91,71 +91,82 @@
     // starts here.  It needs to be after everything is fetched so that the
     // fetchers are all pointed at the most recent message before the start
     // time.
-    const monotonic_clock::time_point monotonic_now =
-        event_loop_->monotonic_now();
-    const realtime_clock::time_point realtime_now = event_loop_->realtime_now();
-    last_synchronized_time_ = monotonic_now;
+    monotonic_start_time_ = event_loop_->monotonic_now();
+    realtime_start_time_ = event_loop_->realtime_now();
+    last_synchronized_time_ = monotonic_start_time_;
 
-    {
-      // Now write the header with this timestamp in it.
-      flatbuffers::FlatBufferBuilder fbb;
-      fbb.ForceDefaults(1);
+    LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node());
 
-      flatbuffers::Offset<aos::Configuration> configuration_offset =
-          CopyFlatBuffer(event_loop_->configuration(), &fbb);
-
-      flatbuffers::Offset<flatbuffers::String> string_offset =
-          fbb.CreateString(network::GetHostname());
-
-      flatbuffers::Offset<Node> node_offset;
-      if (event_loop_->node() != nullptr) {
-        node_offset = CopyFlatBuffer(event_loop_->node(), &fbb);
-      }
-      LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node());
-
-      aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
-
-      log_file_header_builder.add_name(string_offset);
-
-      // Only add the node if we are running in a multinode configuration.
-      if (event_loop_->node() != nullptr) {
-        log_file_header_builder.add_node(node_offset);
-      }
-
-      log_file_header_builder.add_configuration(configuration_offset);
-      // The worst case theoretical out of order is the polling period times 2.
-      // One message could get logged right after the boundary, but be for right
-      // before the next boundary.  And the reverse could happen for another
-      // message.  Report back 3x to be extra safe, and because the cost isn't
-      // huge on the read side.
-      log_file_header_builder.add_max_out_of_order_duration(
-          std::chrono::duration_cast<std::chrono::nanoseconds>(3 *
-                                                               polling_period)
-              .count());
-
-      log_file_header_builder.add_monotonic_start_time(
-          std::chrono::duration_cast<std::chrono::nanoseconds>(
-              monotonic_now.time_since_epoch())
-              .count());
-      log_file_header_builder.add_realtime_start_time(
-          std::chrono::duration_cast<std::chrono::nanoseconds>(
-              realtime_now.time_since_epoch())
-              .count());
-
-      fbb.FinishSizePrefixed(log_file_header_builder.Finish());
-      writer_->QueueSizedFlatbuffer(&fbb);
-    }
+    WriteHeader();
 
     timer_handler_->Setup(event_loop_->monotonic_now() + polling_period,
                           polling_period);
   });
 }
 
+void Logger::WriteHeader() {
+  // Now write the header with this timestamp in it.
+  flatbuffers::FlatBufferBuilder fbb;
+  fbb.ForceDefaults(1);
+
+  flatbuffers::Offset<aos::Configuration> configuration_offset =
+      CopyFlatBuffer(event_loop_->configuration(), &fbb);
+
+  flatbuffers::Offset<flatbuffers::String> string_offset =
+      fbb.CreateString(network::GetHostname());
+
+  flatbuffers::Offset<Node> node_offset;
+  if (event_loop_->node() != nullptr) {
+    node_offset = CopyFlatBuffer(event_loop_->node(), &fbb);
+  }
+
+  aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
+
+  log_file_header_builder.add_name(string_offset);
+
+  // Only add the node if we are running in a multinode configuration.
+  if (event_loop_->node() != nullptr) {
+    log_file_header_builder.add_node(node_offset);
+  }
+
+  log_file_header_builder.add_configuration(configuration_offset);
+  // The worst case theoretical out of order is the polling period times 2.
+  // One message could get logged right after the boundary, but be for right
+  // before the next boundary.  And the reverse could happen for another
+  // message.  Report back 3x to be extra safe, and because the cost isn't
+  // huge on the read side.
+  log_file_header_builder.add_max_out_of_order_duration(
+      std::chrono::duration_cast<std::chrono::nanoseconds>(3 * polling_period_)
+          .count());
+
+  log_file_header_builder.add_monotonic_start_time(
+      std::chrono::duration_cast<std::chrono::nanoseconds>(
+          monotonic_start_time_.time_since_epoch())
+          .count());
+  log_file_header_builder.add_realtime_start_time(
+      std::chrono::duration_cast<std::chrono::nanoseconds>(
+          realtime_start_time_.time_since_epoch())
+          .count());
+
+  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
+  writer_->QueueSizedFlatbuffer(&fbb);
+}
+
+void Logger::Rotate(DetachedBufferWriter *writer) {
+  // Force data up until now to be written.
+  DoLogData();
+
+  // Swap the writer out, and re-write the header.
+  writer_ = writer;
+  WriteHeader();
+}
+
 void Logger::DoLogData() {
   // We want to guarentee that messages aren't out of order by more than
   // max_out_of_order_duration.  To do this, we need sync points.  Every write
   // cycle should be a sync point.
-  const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
+  const monotonic_clock::time_point monotonic_now =
+      event_loop_->monotonic_now();
 
   do {
     // Move the sync point up by at most polling_period.  This forces one sync
@@ -221,15 +232,18 @@
 
 LogReader::LogReader(std::string_view filename,
                      const Configuration *replay_configuration)
-    : sorted_message_reader_(filename),
+    : LogReader(std::vector<std::string>{std::string(filename)},
+                replay_configuration) {}
+
+LogReader::LogReader(const std::vector<std::string> &filenames,
+                     const Configuration *replay_configuration)
+    : sorted_message_reader_(filenames),
       replay_configuration_(replay_configuration) {
   channels_.resize(logged_configuration()->channels()->size());
   MakeRemappedConfig();
 }
 
-LogReader::~LogReader() {
-  Deregister();
-}
+LogReader::~LogReader() { Deregister(); }
 
 const Configuration *LogReader::logged_configuration() const {
   return sorted_message_reader_.configuration();
@@ -477,8 +491,8 @@
         &new_config_fbb));
   }
   // Create the Configuration containing the new channels that we want to add.
-  const auto
-      new_name_vector_offsets = new_config_fbb.CreateVector(channel_offsets);
+  const auto new_name_vector_offsets =
+      new_config_fbb.CreateVector(channel_offsets);
   ConfigurationBuilder new_config_builder(new_config_fbb);
   new_config_builder.add_channels(new_name_vector_offsets);
   new_config_fbb.Finish(new_config_builder.Finish());
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index be9ea36..337109b 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -25,7 +25,13 @@
          std::chrono::milliseconds polling_period =
              std::chrono::milliseconds(100));
 
+  // Rotates the log file with the new writer.  This writes out the header
+  // again, but keeps going as if nothing else happened.
+  void Rotate(DetachedBufferWriter *writer);
+
  private:
+  void WriteHeader();
+
   void DoLogData();
 
   EventLoop *event_loop_;
@@ -51,6 +57,9 @@
   // Last time that data was written for all channels to disk.
   monotonic_clock::time_point last_synchronized_time_;
 
+  monotonic_clock::time_point monotonic_start_time_;
+  realtime_clock::time_point realtime_start_time_;
+
   // Max size that the header has consumed.  This much extra data will be
   // reserved in the builder to avoid reallocating.
   size_t max_header_size_ = 0;
@@ -65,6 +74,8 @@
   // config did.
   LogReader(std::string_view filename,
             const Configuration *replay_configuration = nullptr);
+  LogReader(const std::vector<std::string> &filename,
+            const Configuration *replay_configuration = nullptr);
   ~LogReader();
 
   // Registers all the callbacks to send the log file data out on an event loop
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 4fff962..5323493 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -97,6 +97,71 @@
   EXPECT_EQ(ping_count, 2010);
 }
 
+// Tests that we can read and write rotated log files.
+TEST_F(LoggerTest, RotatedLogFile) {
+  const ::std::string tmpdir(getenv("TEST_TMPDIR"));
+  const ::std::string logfile0 = tmpdir + "/logfile0.bfbs";
+  const ::std::string logfile1 = tmpdir + "/logfile1.bfbs";
+  // Remove it.
+  unlink(logfile0.c_str());
+  unlink(logfile1.c_str());
+
+  LOG(INFO) << "Logging data to " << logfile0 << " and " << logfile1;
+
+  {
+    DetachedBufferWriter writer0(logfile0);
+    DetachedBufferWriter writer1(logfile1);
+    std::unique_ptr<EventLoop> logger_event_loop =
+        event_loop_factory_.MakeEventLoop("logger");
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    Logger logger(&writer0, logger_event_loop.get(),
+                  std::chrono::milliseconds(100));
+    event_loop_factory_.RunFor(chrono::milliseconds(10000));
+    logger.Rotate(&writer1);
+    event_loop_factory_.RunFor(chrono::milliseconds(10000));
+  }
+
+  // Even though it doesn't make any difference here, exercise the logic for
+  // passing in a separate config.
+  LogReader reader(std::vector<std::string>{logfile0, logfile1},
+                   &config_.message());
+
+  // Confirm that we can remap logged channels to point to new buses.
+  reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
+
+  // This sends out the fetched messages and advances time to the start of the
+  // log file.
+  reader.Register();
+
+  EXPECT_EQ(reader.node(), nullptr);
+
+  std::unique_ptr<EventLoop> test_event_loop =
+      reader.event_loop_factory()->MakeEventLoop("log_reader");
+
+  int ping_count = 10;
+  int pong_count = 10;
+
+  // Confirm that the ping value matches in the remapped channel location.
+  test_event_loop->MakeWatcher("/original/test",
+                               [&ping_count](const examples::Ping &ping) {
+                                 EXPECT_EQ(ping.value(), ping_count + 1);
+                                 ++ping_count;
+                               });
+  // Confirm that the ping and pong counts both match, and the value also
+  // matches.
+  test_event_loop->MakeWatcher(
+      "/test", [&pong_count, &ping_count](const examples::Pong &pong) {
+        EXPECT_EQ(pong.value(), pong_count + 1);
+        ++pong_count;
+        EXPECT_EQ(ping_count, pong_count);
+      });
+
+  reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
+  EXPECT_EQ(ping_count, 2010);
+}
+
 // Tests that a large number of messages per second doesn't overwhelm writev.
 TEST_F(LoggerTest, ManyMessages) {
   const ::std::string tmpdir(getenv("TEST_TMPDIR"));