AOS was not writing timestamp data header

AOS wasn't writing out the header to the timestamp data log part. Which
means the log part is corrupted and can't be read. Let's fix the bug
that's causing that, and put some checks in so if a log doesn't have a
header if throws a more helpful error than a segfault.

This was only happening when trying to relog a logfile with a
pre-existing config.

Change-Id: I373821f4cfbead90daddd820d3c61693e8886eda
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index a26eb49..9741f70 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -171,6 +171,8 @@
       if (message.span() == absl::Span<const uint8_t>()) {
         break;
       }
+      CHECK(message.Verify());
+
       const auto *const channels = full_header->configuration()->channels();
       const size_t channel_index = message.message().channel_index();
       CHECK_LT(channel_index, channels->size());
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index a97ba5e..a880f35 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -388,10 +388,17 @@
   LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
   return nullptr;
 }
-
 MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
                                      EventLoop *event_loop)
-    : LogNamer(event_loop), base_name_(base_name), old_base_name_() {}
+    : MultiNodeLogNamer(base_name, event_loop->configuration(), event_loop,
+                        event_loop->node()) {}
+
+MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
+                                     const Configuration *configuration,
+                                     EventLoop *event_loop, const Node *node)
+    : LogNamer(configuration, event_loop, node),
+      base_name_(base_name),
+      old_base_name_() {}
 
 MultiNodeLogNamer::~MultiNodeLogNamer() {
   if (!ran_out_of_space_) {
@@ -432,7 +439,8 @@
   writer->QueueSizedFlatbuffer(header->Release());
 
   if (!writer->ran_out_of_space()) {
-    all_filenames_.emplace_back(filename);
+    all_filenames_.emplace_back(
+        absl::StrCat(config_sha256, ".bfbs", extension_));
   }
   CloseWriter(&writer);
 }
@@ -499,7 +507,7 @@
     nodes_.emplace_back(node);
   }
 
-  NewDataWriter data_writer(this, node,
+  NewDataWriter data_writer(this, configuration::GetNode(configuration_, node),
                             [this, channel](NewDataWriter *data_writer) {
                               OpenForwardedTimestampWriter(channel,
                                                            data_writer);
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index 8a44c97..c2c8dd1 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -114,10 +114,11 @@
  public:
   // Constructs a LogNamer with the primary node (ie the one the logger runs on)
   // being node.
-  LogNamer(EventLoop *event_loop)
+  LogNamer(const aos::Configuration *configuration, EventLoop *event_loop,
+           const aos::Node *node)
       : event_loop_(event_loop),
-        configuration_(event_loop_->configuration()),
-        node_(event_loop_->node()),
+        configuration_(configuration),
+        node_(node),
         logger_node_index_(configuration::GetNodeIndex(configuration_, node_)) {
     nodes_.emplace_back(node_);
     node_states_.resize(configuration::NodesCount(configuration_));
@@ -241,10 +242,11 @@
 // any other log type.
 class LocalLogNamer : public LogNamer {
  public:
-  LocalLogNamer(std::string_view base_name, aos::EventLoop *event_loop)
-      : LogNamer(event_loop),
+  LocalLogNamer(std::string_view base_name, aos::EventLoop *event_loop,
+                const aos::Node *node)
+      : LogNamer(event_loop->configuration(), event_loop, node),
         base_name_(base_name),
-        data_writer_(this, node(),
+        data_writer_(this, node,
                      [this](NewDataWriter *writer) {
                        writer->writer = std::make_unique<DetachedBufferWriter>(
                            absl::StrCat(base_name_, ".part",
@@ -289,6 +291,9 @@
 class MultiNodeLogNamer : public LogNamer {
  public:
   MultiNodeLogNamer(std::string_view base_name, EventLoop *event_loop);
+  MultiNodeLogNamer(std::string_view base_name,
+                    const Configuration *configuration, EventLoop *event_loop,
+                    const Node *node);
   ~MultiNodeLogNamer() override;
 
   std::string_view base_name() const final { return base_name_; }
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index bc04f83..fffc886 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -23,6 +23,8 @@
                std::function<bool(const Channel *)> should_log)
     : event_loop_(event_loop),
       configuration_(configuration),
+      node_(configuration::GetNode(configuration_, event_loop->node())),
+      node_index_(configuration::GetNodeIndex(configuration_, node_)),
       name_(network::GetHostname()),
       timer_handler_(event_loop_->AddTimer(
           [this]() { DoLogData(event_loop_->monotonic_now(), true); })),
@@ -31,7 +33,7 @@
               ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
                     "/aos")
               : aos::Fetcher<message_bridge::ServerStatistics>()) {
-  VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());
+  VLOG(1) << "Creating logger for " << FlatbufferToJson(node_);
 
   std::map<const Channel *, const Node *> timestamp_logger_channels;
 
@@ -47,7 +49,7 @@
       if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
               connection, event_loop_->node())) {
         const Node *other_node = configuration::GetNode(
-            event_loop_->configuration(), connection->name()->string_view());
+            configuration_, connection->name()->string_view());
 
         VLOG(1) << "Timestamps are logged from "
                 << FlatbufferToJson(other_node);
@@ -57,9 +59,6 @@
     }
   }
 
-  const size_t our_node_index =
-      configuration::GetNodeIndex(configuration_, event_loop_->node());
-
   for (size_t channel_index = 0;
        channel_index < configuration_->channels()->size(); ++channel_index) {
     const Channel *const config_channel =
@@ -73,7 +72,7 @@
     CHECK(channel != nullptr)
         << ": Failed to look up channel "
         << aos::configuration::CleanedChannelToString(config_channel);
-    if (!should_log(channel)) {
+    if (!should_log(config_channel)) {
       continue;
     }
 
@@ -82,25 +81,25 @@
     fs.channel = channel;
 
     const bool is_local =
-        configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
+        configuration::ChannelIsSendableOnNode(config_channel, node_);
 
     const bool is_readable =
-        configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
+        configuration::ChannelIsReadableOnNode(config_channel, node_);
     const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
-        channel, event_loop_->node());
+        config_channel, node_);
     const bool log_message = is_logged && is_readable;
 
     bool log_delivery_times = false;
-    if (event_loop_->node() != nullptr) {
+    if (configuration::MultiNode(configuration_)) {
       const aos::Connection *connection =
-          configuration::ConnectionToNode(channel, event_loop_->node());
+          configuration::ConnectionToNode(config_channel, node_);
 
       log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
           connection, event_loop_->node());
 
       CHECK_EQ(log_delivery_times,
                configuration::ConnectionDeliveryTimeIsLoggedOnNode(
-                   channel, event_loop_->node(), event_loop_->node()));
+                   config_channel, node_, node_));
 
       if (connection) {
         fs.reliable_forwarding = (connection->time_to_live() == 0);
@@ -120,7 +119,7 @@
       if (log_delivery_times) {
         VLOG(1) << "  Delivery times";
         fs.wants_timestamp_writer = true;
-        fs.timestamp_node_index = our_node_index;
+        fs.timestamp_node_index = static_cast<int>(node_index_);
       }
       // Both the timestamp and data writers want data_node_index so it knows
       // what the source node is.
@@ -138,7 +137,7 @@
         if (!is_local) {
           fs.log_type = LogType::kLogRemoteMessage;
         } else {
-          fs.data_node_index = our_node_index;
+          fs.data_node_index = static_cast<int>(node_index_);
         }
       }
       if (log_contents) {
@@ -167,8 +166,7 @@
 
     const Channel *logged_channel = aos::configuration::GetChannel(
         configuration_, event_loop_channel->name()->string_view(),
-        event_loop_channel->type()->string_view(), "",
-        configuration::GetNode(configuration_, event_loop_->node()));
+        event_loop_channel->type()->string_view(), "", node_);
 
     if (logged_channel != nullptr) {
       event_loop_to_logged_channel_index_[event_loop_channel_index] =
@@ -252,7 +250,7 @@
 
   log_event_uuid_ = UUID::Random();
   log_start_uuid_ = log_start_uuid;
-  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
+  VLOG(1) << "Starting logger for " << FlatbufferToJson(node_);
 
   // We want to do as much work as possible before the initial Fetch. Time
   // between that and actually starting to log opens up the possibility of
@@ -302,7 +300,7 @@
   const aos::monotonic_clock::time_point header_time =
       event_loop_->monotonic_now();
 
-  LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
+  LOG(INFO) << "Logging node as " << FlatbufferToJson(node_)
             << " start_time " << last_synchronized_time_ << ", took "
             << chrono::duration<double>(fetch_time - beginning_time).count()
             << " to fetch, "
@@ -404,7 +402,7 @@
       monotonic_clock::min_time) {
     return false;
   }
-  if (event_loop_->node() == node ||
+  if (node_ == node ||
       !configuration::MultiNode(configuration_)) {
     // There are no offsets to compute for ourself, so always succeed.
     log_namer_->SetStartTimes(node_index, monotonic_start_time,
@@ -484,7 +482,7 @@
   flatbuffers::Offset<Node> logger_node_offset;
 
   if (configuration::MultiNode(configuration_)) {
-    logger_node_offset = RecursiveCopyFlatBuffer(event_loop_->node(), &fbb);
+    logger_node_offset = RecursiveCopyFlatBuffer(node_, &fbb);
   }
 
   aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
@@ -557,9 +555,6 @@
   // reboots which may have happened.
   WriteMissingTimestamps();
 
-  int our_node_index = aos::configuration::GetNodeIndex(
-      event_loop_->configuration(), event_loop_->node());
-
   // Write each channel to disk, one at a time.
   for (FetcherStruct &f : fetchers_) {
     while (true) {
@@ -583,7 +578,7 @@
       }
       if (f.writer != nullptr) {
         const UUID source_node_boot_uuid =
-            our_node_index != f.data_node_index
+            static_cast<int>(node_index_) != f.data_node_index
                 ? f.fetcher->context().source_boot_uuid
                 : event_loop_->boot_uuid();
         // Write!
@@ -598,7 +593,7 @@
         RecordCreateMessageTime(start, end, &f);
 
         VLOG(2) << "Writing data as node "
-                << FlatbufferToJson(event_loop_->node()) << " for channel "
+                << FlatbufferToJson(node_) << " for channel "
                 << configuration::CleanedChannelToString(f.fetcher->channel())
                 << " to " << f.writer->filename() << " data "
                 << FlatbufferToJson(
@@ -623,7 +618,7 @@
         RecordCreateMessageTime(start, end, &f);
 
         VLOG(2) << "Writing timestamps as node "
-                << FlatbufferToJson(event_loop_->node()) << " for channel "
+                << FlatbufferToJson(node_) << " for channel "
                 << configuration::CleanedChannelToString(f.fetcher->channel())
                 << " to " << f.timestamp_writer->filename() << " timestamp "
                 << FlatbufferToJson(
diff --git a/aos/events/logging/log_writer.h b/aos/events/logging/log_writer.h
index 9f55edd..5c8b0c7 100644
--- a/aos/events/logging/log_writer.h
+++ b/aos/events/logging/log_writer.h
@@ -149,7 +149,8 @@
   // starts.
   void StartLoggingLocalNamerOnRun(std::string base_name) {
     event_loop_->OnRun([this, base_name]() {
-      StartLogging(std::make_unique<LocalLogNamer>(base_name, event_loop_));
+      StartLogging(
+          std::make_unique<LocalLogNamer>(base_name, event_loop_, node_));
     });
   }
 
@@ -157,7 +158,8 @@
   // processing starts.
   void StartLoggingOnRun(std::string base_name) {
     event_loop_->OnRun([this, base_name]() {
-      StartLogging(std::make_unique<MultiNodeLogNamer>(base_name, event_loop_));
+      StartLogging(std::make_unique<MultiNodeLogNamer>(
+          base_name, configuration_, event_loop_, node_));
     });
   }
 
@@ -236,6 +238,19 @@
   // The configuration to place at the top of the log file.
   const Configuration *const configuration_;
 
+  // The node that is writing the log.
+  // For most cases, this is the same node as the node that is reading the
+  // messages. However, in some cases, these two nodes may be different. i.e. if
+  // one node reading and modifying the messages, and another node is listening
+  // and saving those messages to another log.
+  //
+  // node_ is a pointer to the writing node, and that node is guaranteed to be
+  // in configuration_ which is the configuration being written to the top of
+  // the log file.
+  const Node *const node_;
+  // The node_index_ is the index of the node in configuration_.
+  const size_t node_index_;
+
   UUID log_event_uuid_ = UUID::Zero();
   const UUID logger_instance_uuid_ = UUID::Random();
   std::unique_ptr<LogNamer> log_namer_;
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 4c19867..20e38d2 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -486,6 +486,8 @@
 
   raw_log_file_header_ = std::move(*raw_log_file_header);
 
+  CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";
+
   max_out_of_order_duration_ =
       FLAGS_max_out_of_order > 0
           ? chrono::duration_cast<chrono::nanoseconds>(
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index c974511..d6ad8fe 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -170,10 +170,11 @@
     logger.set_polling_period(std::chrono::milliseconds(100));
     logger_event_loop->OnRun(
         [base_name1, base_name2, &logger_event_loop, &logger]() {
-          logger.StartLogging(
-              std::make_unique<LocalLogNamer>(base_name1, logger_event_loop.get()));
+          logger.StartLogging(std::make_unique<LocalLogNamer>(
+              base_name1, logger_event_loop.get(), logger_event_loop->node()));
           EXPECT_DEATH(logger.StartLogging(std::make_unique<LocalLogNamer>(
-                           base_name2, logger_event_loop.get())),
+                           base_name2, logger_event_loop.get(),
+                           logger_event_loop->node())),
                        "Already logging");
         });
     event_loop_factory_.RunFor(chrono::milliseconds(20000));
@@ -203,8 +204,8 @@
     logger.set_separate_config(false);
     logger.set_polling_period(std::chrono::milliseconds(100));
     logger_event_loop->OnRun([base_name, &logger_event_loop, &logger]() {
-      logger.StartLogging(
-          std::make_unique<LocalLogNamer>(base_name, logger_event_loop.get()));
+      logger.StartLogging(std::make_unique<LocalLogNamer>(
+          base_name, logger_event_loop.get(), logger_event_loop->node()));
       logger.StopLogging(aos::monotonic_clock::min_time);
       EXPECT_DEATH(logger.StopLogging(aos::monotonic_clock::min_time),
                    "Not logging right now");
@@ -240,13 +241,13 @@
     Logger logger(logger_event_loop.get());
     logger.set_separate_config(false);
     logger.set_polling_period(std::chrono::milliseconds(100));
-    logger.StartLogging(
-        std::make_unique<LocalLogNamer>(base_name1, logger_event_loop.get()));
+    logger.StartLogging(std::make_unique<LocalLogNamer>(
+        base_name1, logger_event_loop.get(), logger_event_loop->node()));
     event_loop_factory_.RunFor(chrono::milliseconds(10000));
     logger.StopLogging(logger_event_loop->monotonic_now());
     event_loop_factory_.RunFor(chrono::milliseconds(10000));
-    logger.StartLogging(
-        std::make_unique<LocalLogNamer>(base_name2, logger_event_loop.get()));
+    logger.StartLogging(std::make_unique<LocalLogNamer>(
+        base_name2, logger_event_loop.get(), logger_event_loop->node()));
     event_loop_factory_.RunFor(chrono::milliseconds(10000));
   }
 
@@ -657,14 +658,27 @@
   struct LoggerState {
     std::unique_ptr<EventLoop> event_loop;
     std::unique_ptr<Logger> logger;
+    const Configuration *configuration;
+    const Node *node;
+    MultiNodeLogNamer *log_namer;
   };
 
   LoggerState MakeLogger(const Node *node,
-                         SimulatedEventLoopFactory *factory = nullptr) {
+                         SimulatedEventLoopFactory *factory = nullptr,
+                         const Configuration *configuration = nullptr) {
     if (factory == nullptr) {
       factory = &event_loop_factory_;
     }
-    return {factory->MakeEventLoop("logger", node), {}};
+    if (configuration == nullptr) {
+      configuration = factory->configuration();
+    }
+    return {
+        factory->MakeEventLoop(
+            "logger", configuration::GetNode(factory->configuration(), node)),
+        {},
+        configuration,
+        node,
+        nullptr};
   }
 
   void StartLogger(LoggerState *logger, std::string logfile_base = "",
@@ -677,14 +691,16 @@
       }
     }
 
-    logger->logger = std::make_unique<Logger>(logger->event_loop.get());
+    logger->logger = std::make_unique<Logger>(logger->event_loop.get(),
+                                              logger->configuration);
     logger->logger->set_polling_period(std::chrono::milliseconds(100));
     logger->logger->set_name(absl::StrCat(
         "name_prefix_", logger->event_loop->node()->name()->str()));
     logger->event_loop->OnRun([logger, logfile_base, compress]() {
       std::unique_ptr<MultiNodeLogNamer> namer =
-          std::make_unique<MultiNodeLogNamer>(logfile_base,
-                                              logger->event_loop.get());
+          std::make_unique<MultiNodeLogNamer>(
+              logfile_base, logger->configuration, logger->event_loop.get(),
+              logger->node);
       if (compress) {
 #ifdef LZMA
         namer->set_extension(".xz");
@@ -694,6 +710,7 @@
         LOG(FATAL) << "Compression unsupported";
 #endif
       }
+      logger->log_namer = namer.get();
 
       logger->logger->StartLogging(std::move(namer));
     });
@@ -2485,7 +2502,86 @@
                       Param{"multinode_pingpong_split_config.json", false,
                             kSplitConfigSha1}));
 
-// TODO(austin): Make a log file where the remote node has no start time.
+// Tests that we can relog with a different config.  This makes most sense when
+// you are trying to edit a log and want to use channel renaming + the original
+// config in the new log.
+TEST_P(MultinodeLoggerTest, LogDifferentConfig) {
+  time_converter_.StartEqual();
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger);
+    StartLogger(&pi2_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+  }
+
+  LogReader reader(SortParts(logfiles_));
+  reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
+
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+  log_reader_factory.set_send_delay(chrono::microseconds(0));
+
+  // This sends out the fetched messages and advances time to the start of the
+  // log file.
+  reader.Register(&log_reader_factory);
+
+  const Node *pi1 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi1");
+  const Node *pi2 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+  LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
+  LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
+  LOG(INFO) << "now pi1 "
+            << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
+  LOG(INFO) << "now pi2 "
+            << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
+
+  EXPECT_THAT(reader.LoggedNodes(),
+              ::testing::ElementsAre(
+                  configuration::GetNode(reader.logged_configuration(), pi1),
+                  configuration::GetNode(reader.logged_configuration(), pi2)));
+
+  reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
+
+  // And confirm we can re-create a log again, while checking the contents.
+  std::vector<std::string> log_files;
+  {
+    LoggerState pi1_logger =
+        MakeLogger(configuration::GetNode(reader.logged_configuration(), pi1_),
+                   &log_reader_factory, reader.logged_configuration());
+    LoggerState pi2_logger =
+        MakeLogger(configuration::GetNode(reader.logged_configuration(), pi2_),
+                   &log_reader_factory, reader.logged_configuration());
+
+    StartLogger(&pi1_logger, tmp_dir_ + "/relogged1");
+    StartLogger(&pi2_logger, tmp_dir_ + "/relogged2");
+
+    log_reader_factory.Run();
+
+    for (auto &x : pi1_logger.log_namer->all_filenames()) {
+      log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged1_", x));
+    }
+    for (auto &x : pi2_logger.log_namer->all_filenames()) {
+      log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged2_", x));
+    }
+  }
+
+  reader.Deregister();
+
+  // And verify that we can run the LogReader over the relogged files without
+  // hitting any fatal errors.
+  {
+    LogReader relogged_reader(SortParts(log_files));
+    relogged_reader.Register();
+
+    relogged_reader.event_loop_factory()->Run();
+  }
+}
 
 }  // namespace testing
 }  // namespace logger