Split local data and timestamps into separate files

This sets us up to read all the timestamps before the data to enable
sorting.  This doesn't do that yet.  It only sets the files and formats
up to be ready.

Change-Id: I1213a70c90867a082ca37b21eafd99f9b95b600d
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index 40a331a..679d364 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -704,16 +704,10 @@
 }
 
 void MultiNodeLogNamer::Rotate(const Node *node) {
-  if (node == this->node()) {
-    if (data_writer_) {
-      data_writer_->Rotate();
-    }
-  } else {
-    for (std::pair<const Channel *const, NewDataWriter> &data_writer :
-         data_writers_) {
-      if (node == data_writer.second.node()) {
-        data_writer.second.Rotate();
-      }
+  for (auto &data_map : {&node_data_writers_, &node_timestamp_writers_}) {
+    auto it = data_map->find(node);
+    if (it != data_map->end()) {
+      it->second.Rotate();
     }
   }
 }
@@ -757,25 +751,16 @@
     return nullptr;
   }
 
-  // Now, sort out if this is data generated on this node, or not.  It is
-  // generated if it is sendable on this node.
-  if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
-    if (!data_writer_) {
-      MakeDataWriter();
-    }
-    data_writer_->UpdateMaxMessageSize(
-        PackMessageSize(LogType::kLogMessage, channel->max_size()));
-    return data_writer_.get();
-  }
-
   // Ok, we have data that is being forwarded to us that we are supposed to
   // log.  It needs to be logged with send timestamps, but be sorted enough
   // to be able to be processed.
-  CHECK(data_writers_.find(channel) == data_writers_.end());
 
   // Track that this node is being logged.
-  const Node *source_node = configuration::GetNode(
-      configuration_, channel->source_node()->string_view());
+  const Node *source_node =
+      configuration::MultiNode(configuration_)
+          ? configuration::GetNode(configuration_,
+                                   channel->source_node()->string_view())
+          : nullptr;
 
   if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
     nodes_.emplace_back(source_node);
@@ -783,26 +768,26 @@
 
   // If we already have a data writer for the node, then use the same writer for
   // all channels of that node.
-  if (node_data_writers_.find(source_node) != node_data_writers_.end()) {
-    node_data_writers_.find(source_node)
-        ->second->UpdateMaxMessageSize(
-            PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()));
-    return node_data_writers_.find(source_node)->second;
+  auto it = node_data_writers_.find(source_node);
+  if (it != node_data_writers_.end()) {
+    it->second.UpdateMaxMessageSize(
+        PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()));
+    return &(it->second);
   }
 
   // If we don't have a data writer for the node, create one.
   NewDataWriter data_writer(
       this, source_node, node_,
-      [this, channel](NewDataWriter *data_writer) {
-        OpenWriter(channel, data_writer);
+      [this, source_node](NewDataWriter *data_writer) {
+        OpenDataWriter(source_node, data_writer);
       },
       [this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
       PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()),
       {StoredDataType::DATA});
 
-  data_writers_.emplace(channel, std::move(data_writer));
-  node_data_writers_.emplace(source_node, &data_writers_.find(channel)->second);
-  return &(data_writers_.find(channel)->second);
+  auto result = node_data_writers_.emplace(source_node, std::move(data_writer));
+  CHECK(result.second);
+  return &(result.first->second);
 }
 
 NewDataWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
@@ -812,30 +797,31 @@
       configuration::ChannelIsReadableOnNode(channel, this->node());
   CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
 
-  CHECK(data_writers_.find(channel) == data_writers_.end());
-
   if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
     nodes_.emplace_back(node);
   }
 
+  CHECK_NE(node, this->node());
+
   // If we have a remote timestamp writer for a particular node, use the same
   // writer for all remote timestamp channels of that node.
-  if (node_timestamp_writers_.find(node) != node_timestamp_writers_.end()) {
-    return node_timestamp_writers_.find(node)->second;
+  auto it = node_timestamp_writers_.find(node);
+  if (it != node_timestamp_writers_.end()) {
+    return &(it->second);
   }
 
   // If there are no remote timestamp writers for the node, create one.
   NewDataWriter data_writer(
       this, configuration::GetNode(configuration_, node), node_,
-      [this, channel](NewDataWriter *data_writer) {
-        OpenForwardedTimestampWriter(channel, data_writer);
+      [this](NewDataWriter *data_writer) {
+        OpenForwardedTimestampWriter(node_, data_writer);
       },
       [this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
       PackRemoteMessageSize(), {StoredDataType::REMOTE_TIMESTAMPS});
 
-  data_writers_.emplace(channel, std::move(data_writer));
-  node_timestamp_writers_.emplace(node, &data_writers_.find(channel)->second);
-  return &(data_writers_.find(channel)->second);
+  auto result = node_timestamp_writers_.emplace(node, std::move(data_writer));
+  CHECK(result.second);
+  return &(result.first->second);
 }
 
 NewDataWriter *MultiNodeLogNamer::MakeTimestampWriter(const Channel *channel) {
@@ -848,19 +834,29 @@
     return nullptr;
   }
 
-  if (!data_writer_) {
-    MakeDataWriter();
+  // There is only one of these.
+  auto it = node_timestamp_writers_.find(this->node());
+  if (it != node_timestamp_writers_.end()) {
+    it->second.UpdateMaxMessageSize(
+        PackMessageSize(LogType::kLogDeliveryTimeOnly, 0));
+    return &(it->second);
   }
-  data_writer_->UpdateMaxMessageSize(
-      PackMessageSize(LogType::kLogDeliveryTimeOnly, 0));
-  return data_writer_.get();
+
+  NewDataWriter data_writer(
+      this, node_, node_,
+      [this](NewDataWriter *data_writer) { OpenTimestampWriter(data_writer); },
+      [this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
+      PackMessageSize(LogType::kLogDeliveryTimeOnly, 0),
+      {StoredDataType::TIMESTAMPS});
+
+  auto result = node_timestamp_writers_.emplace(node_, std::move(data_writer));
+  CHECK(result.second);
+  return &(result.first->second);
 }
 
 WriteCode MultiNodeLogNamer::Close() {
-  data_writers_.clear();
   node_data_writers_.clear();
   node_timestamp_writers_.clear();
-  data_writer_.reset();
   if (ran_out_of_space_) {
     return WriteCode::kOutOfSpace;
   }
@@ -868,13 +864,15 @@
 }
 
 void MultiNodeLogNamer::ResetStatistics() {
-  for (std::pair<const Channel *const, NewDataWriter> &data_writer :
-       data_writers_) {
+  for (std::pair<const Node *const, NewDataWriter> &data_writer :
+       node_data_writers_) {
     if (!data_writer.second.writer) continue;
     data_writer.second.writer->WriteStatistics()->ResetStats();
   }
-  if (data_writer_ != nullptr && data_writer_->writer != nullptr) {
-    data_writer_->writer->WriteStatistics()->ResetStats();
+  for (std::pair<const Node *const, NewDataWriter> &data_writer :
+       node_timestamp_writers_) {
+    if (!data_writer.second.writer) continue;
+    data_writer.second.writer->WriteStatistics()->ResetStats();
   }
   max_write_time_ = std::chrono::nanoseconds::zero();
   max_write_time_bytes_ = -1;
@@ -886,48 +884,38 @@
 }
 
 void MultiNodeLogNamer::OpenForwardedTimestampWriter(
-    const Channel *channel, NewDataWriter *data_writer) {
-  std::string filename =
-      absl::StrCat("timestamps/", data_writer->node()->name()->string_view(),
-                   "/source_", channel->source_node()->string_view(),
-                   "_timestamp_", data_writer->node()->name()->string_view(),
-                   ".part", data_writer->parts_index(), ".bfbs", extension_);
-  CreateBufferWriter(filename, data_writer->max_message_size(),
-                     &data_writer->writer);
-}
-
-void MultiNodeLogNamer::OpenWriter(const Channel *channel,
-                                   NewDataWriter *data_writer) {
+    const Node * /*source_node*/, NewDataWriter *data_writer) {
   const std::string filename = absl::StrCat(
-      CHECK_NOTNULL(channel->source_node())->string_view(), "_data/",
-      channel->source_node()->string_view(), "_data.part",
+      "timestamps/remote_", data_writer->node()->name()->string_view(), ".part",
       data_writer->parts_index(), ".bfbs", extension_);
   CreateBufferWriter(filename, data_writer->max_message_size(),
                      &data_writer->writer);
 }
 
-void MultiNodeLogNamer::MakeDataWriter() {
-  if (!data_writer_) {
-    data_writer_ = std::make_unique<NewDataWriter>(
-        this, node_, node_,
-        [this](NewDataWriter *writer) {
-          std::string name;
-          if (node() != nullptr) {
-            name = absl::StrCat(name, node()->name()->string_view(), "_");
-          }
-          absl::StrAppend(&name, "data.part", writer->parts_index(), ".bfbs",
-                          extension_);
-          CreateBufferWriter(name, writer->max_message_size(), &writer->writer);
-        },
-        [this](NewDataWriter *data_writer) {
-          CloseWriter(&data_writer->writer);
-        },
-        // Default size is 0 so it will be obvious if something doesn't fix it
-        // afterwards.
-        0,
-        std::initializer_list<StoredDataType>{StoredDataType::DATA,
-                                              StoredDataType::TIMESTAMPS});
+void MultiNodeLogNamer::OpenDataWriter(const Node *source_node,
+                                       NewDataWriter *data_writer) {
+  std::string filename;
+
+  if (source_node != nullptr) {
+    if (source_node == node_) {
+      filename = absl::StrCat(source_node->name()->string_view(), "_");
+    } else {
+      filename = absl::StrCat("data/", source_node->name()->string_view(), "_");
+    }
   }
+
+  absl::StrAppend(&filename, "data.part", data_writer->parts_index(), ".bfbs",
+                  extension_);
+  CreateBufferWriter(filename, data_writer->max_message_size(),
+                     &data_writer->writer);
+}
+
+void MultiNodeLogNamer::OpenTimestampWriter(NewDataWriter *data_writer) {
+  std::string filename =
+      absl::StrCat(node()->name()->string_view(), "_timestamps.part",
+                   data_writer->parts_index(), ".bfbs", extension_);
+  CreateBufferWriter(filename, data_writer->max_message_size(),
+                     &data_writer->writer);
 }
 
 void MultiNodeLogNamer::CreateBufferWriter(
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index 6a18997..ccdfe5a 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -506,14 +506,12 @@
 
  private:
   // Opens up a writer for timestamps forwarded back.
-  void OpenForwardedTimestampWriter(const Channel *channel,
+  void OpenForwardedTimestampWriter(const Node *source_node,
                                     NewDataWriter *data_writer);
 
   // Opens up a writer for remote data.
-  void OpenWriter(const Channel *channel, NewDataWriter *data_writer);
-
-  // Opens the main data writer file for this node responsible for data_writer_.
-  void MakeDataWriter();
+  void OpenDataWriter(const Node *source_node, NewDataWriter *data_writer);
+  void OpenTimestampWriter(NewDataWriter *data_writer);
 
   void CreateBufferWriter(std::string_view path, size_t max_message_size,
                           std::unique_ptr<DetachedBufferWriter> *destination);
@@ -523,14 +521,17 @@
   // A version of std::accumulate which operates over all of our DataWriters.
   template <typename T, typename BinaryOperation>
   T accumulate_data_writers(T t, BinaryOperation op) const {
-    for (const std::pair<const Channel *const, NewDataWriter> &data_writer :
-         data_writers_) {
+    for (const std::pair<const Node *const, NewDataWriter> &data_writer :
+         node_data_writers_) {
       if (data_writer.second.writer != nullptr) {
         t = op(std::move(t), data_writer.second);
       }
     }
-    if (data_writer_ != nullptr && data_writer_->writer != nullptr) {
-      t = op(std::move(t), *data_writer_);
+    for (const std::pair<const Node *const, NewDataWriter> &data_writer :
+         node_timestamp_writers_) {
+      if (data_writer.second.writer != nullptr) {
+        t = op(std::move(t), data_writer.second);
+      }
     }
     return t;
   }
@@ -552,15 +553,10 @@
   int total_write_messages_ = 0;
   int total_write_bytes_ = 0;
 
-  // File to write both delivery timestamps and local data to.
-  std::unique_ptr<NewDataWriter> data_writer_;
-
-  std::map<const Channel *, NewDataWriter> data_writers_;
-
   // Data writer per remote node.
-  std::map<const Node *, NewDataWriter *> node_data_writers_;
+  std::map<const Node *, NewDataWriter> node_data_writers_;
   // Remote timestamp writers per node.
-  std::map<const Node *, NewDataWriter *> node_timestamp_writers_;
+  std::map<const Node *, NewDataWriter> node_timestamp_writers_;
 };
 
 // This is specialized log namer that deals with directory centric log events.
diff --git a/aos/events/logging/multinode_logger_test.cc b/aos/events/logging/multinode_logger_test.cc
index f6e5447..648f430 100644
--- a/aos/events/logging/multinode_logger_test.cc
+++ b/aos/events/logging/multinode_logger_test.cc
@@ -79,7 +79,7 @@
     }
 
     EXPECT_EQ(logfile_uuids.size(), 2u);
-    EXPECT_EQ(parts_uuids.size(), 6u);
+    EXPECT_EQ(parts_uuids.size(), 8u);
 
     // And confirm everything is on the correct node.
     EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi1");
@@ -105,12 +105,14 @@
 
     // And the parts index matches.
     EXPECT_EQ(log_header[2].message().parts_index(), 0);
-    EXPECT_EQ(log_header[3].message().parts_index(), 1);
-    EXPECT_EQ(log_header[4].message().parts_index(), 2);
+
+    EXPECT_EQ(log_header[3].message().parts_index(), 0);
+    EXPECT_EQ(log_header[4].message().parts_index(), 1);
 
     EXPECT_EQ(log_header[5].message().parts_index(), 0);
-    EXPECT_EQ(log_header[6].message().parts_index(), 1);
-    EXPECT_EQ(log_header[7].message().parts_index(), 2);
+
+    EXPECT_EQ(log_header[6].message().parts_index(), 0);
+    EXPECT_EQ(log_header[7].message().parts_index(), 1);
 
     EXPECT_EQ(log_header[8].message().parts_index(), 0);
     EXPECT_EQ(log_header[9].message().parts_index(), 1);
@@ -127,24 +129,18 @@
 
     // And that the data_stored field is right.
     EXPECT_THAT(*log_header[2].message().data_stored(),
-                ::testing::ElementsAre(StoredDataType::DATA,
-                                       StoredDataType::TIMESTAMPS));
+                ::testing::ElementsAre(StoredDataType::DATA));
     EXPECT_THAT(*log_header[3].message().data_stored(),
-                ::testing::ElementsAre(StoredDataType::DATA,
-                                       StoredDataType::TIMESTAMPS));
+                ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
     EXPECT_THAT(*log_header[4].message().data_stored(),
-                ::testing::ElementsAre(StoredDataType::DATA,
-                                       StoredDataType::TIMESTAMPS));
+                ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
 
     EXPECT_THAT(*log_header[5].message().data_stored(),
-                ::testing::ElementsAre(StoredDataType::DATA,
-                                       StoredDataType::TIMESTAMPS));
+                ::testing::ElementsAre(StoredDataType::DATA));
     EXPECT_THAT(*log_header[6].message().data_stored(),
-                ::testing::ElementsAre(StoredDataType::DATA,
-                                       StoredDataType::TIMESTAMPS));
+                ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
     EXPECT_THAT(*log_header[7].message().data_stored(),
-                ::testing::ElementsAre(StoredDataType::DATA,
-                                       StoredDataType::TIMESTAMPS));
+                ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
 
     EXPECT_THAT(*log_header[8].message().data_stored(),
                 ::testing::ElementsAre(StoredDataType::DATA));
@@ -176,49 +172,44 @@
     std::shared_ptr<const aos::Configuration> config = sorted_parts[0].config;
 
     // Timing reports, pings
-    EXPECT_THAT(CountChannelsData(config, logfiles_[2]),
-                UnorderedElementsAre(
-                    std::make_tuple("/pi1/aos",
-                                    "aos.message_bridge.ServerStatistics", 1),
-                    std::make_tuple("/test", "aos.examples.Ping", 1),
-                    std::make_tuple("/pi1/aos", "aos.examples.Ping", 1)))
-        << " : " << logfiles_[2];
-    {
-      std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
-          std::make_tuple("/pi1/aos", "aos.examples.Ping", 10),
-          std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 1),
-          std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
-                          1)};
-      if (!std::get<0>(GetParam()).shared) {
-        channel_counts.push_back(
-            std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
-                            "aos-message_bridge-Timestamp",
-                            "aos.message_bridge.RemoteMessage", 1));
-      }
-      EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
-                  ::testing::UnorderedElementsAreArray(channel_counts))
-          << " : " << logfiles_[3];
+    if (shared()) {
+      EXPECT_THAT(
+          CountChannelsData(config, logfiles_[2]),
+          UnorderedElementsAre(
+              std::make_tuple("/pi1/aos", "aos.examples.Ping", 2001),
+              std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
+                              200),
+              std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
+                              21),
+              std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
+              std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
+              std::make_tuple("/test", "aos.examples.Ping", 2001)))
+          << " : " << logfiles_[2];
+    } else {
+      EXPECT_THAT(
+          CountChannelsData(config, logfiles_[2]),
+          UnorderedElementsAre(
+              std::make_tuple("/pi1/aos", "aos.examples.Ping", 2001),
+              std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
+                              200),
+              std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
+                              21),
+              std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
+              std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
+              std::make_tuple("/test", "aos.examples.Ping", 2001),
+              std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
+                              "aos-message_bridge-Timestamp",
+                              "aos.message_bridge.RemoteMessage", 200)))
+          << " : " << logfiles_[2];
     }
-    {
-      std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
-          std::make_tuple("/pi1/aos", "aos.examples.Ping", 1990),
-          std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 199),
-          std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
-                          20),
-          std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
-                          199),
-          std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
-          std::make_tuple("/test", "aos.examples.Ping", 2000)};
-      if (!std::get<0>(GetParam()).shared) {
-        channel_counts.push_back(
-            std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
-                            "aos-message_bridge-Timestamp",
-                            "aos.message_bridge.RemoteMessage", 199));
-      }
-      EXPECT_THAT(CountChannelsData(config, logfiles_[4]),
-                  ::testing::UnorderedElementsAreArray(channel_counts))
-          << " : " << logfiles_[4];
-    }
+
+    EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
+                ::testing::UnorderedElementsAre())
+        << " : " << logfiles_[3];
+    EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
+                ::testing::UnorderedElementsAre())
+        << " : " << logfiles_[4];
+
     // Timestamps for pong
     EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
                 UnorderedElementsAre())
@@ -235,27 +226,21 @@
         << " : " << logfiles_[4];
 
     // Timing reports and pongs.
-    EXPECT_THAT(CountChannelsData(config, logfiles_[5]),
-                UnorderedElementsAre(
-                    std::make_tuple("/pi2/aos", "aos.examples.Ping", 1),
-                    std::make_tuple("/pi2/aos",
-                                    "aos.message_bridge.ServerStatistics", 1)))
-        << " : " << logfiles_[5];
     EXPECT_THAT(
-        CountChannelsData(config, logfiles_[6]),
-        UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
-        << " : " << logfiles_[6];
-    EXPECT_THAT(
-        CountChannelsData(config, logfiles_[7]),
+        CountChannelsData(config, logfiles_[5]),
         UnorderedElementsAre(
-            std::make_tuple("/pi2/aos", "aos.examples.Ping", 2000),
-            std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
-            std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
-                            20),
+            std::make_tuple("/pi2/aos", "aos.examples.Ping", 2001),
             std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
                             200),
+            std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
+                            21),
+            std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
             std::make_tuple("/pi2/aos", "aos.timing.Report", 60),
-            std::make_tuple("/test", "aos.examples.Pong", 2000)))
+            std::make_tuple("/test", "aos.examples.Pong", 2001)))
+        << " : " << logfiles_[5];
+    EXPECT_THAT(CountChannelsData(config, logfiles_[6]), UnorderedElementsAre())
+        << " : " << logfiles_[6];
+    EXPECT_THAT(CountChannelsData(config, logfiles_[7]), UnorderedElementsAre())
         << " : " << logfiles_[7];
     // And ping timestamps.
     EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
@@ -977,7 +962,7 @@
   }
 
   const std::vector<LogFile> sorted_parts =
-      SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 3, 3));
+      SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 1, 1, 2, 2));
   EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
   LogReader reader(sorted_parts);
 
@@ -1155,10 +1140,10 @@
   // For snappy, needs to have enough data to be >1 chunk of compressed data so
   // that we don't corrupt the entire log part.
   ::std::string compressed_contents =
-      aos::util::ReadFileToStringOrDie(logfiles_[4]);
+      aos::util::ReadFileToStringOrDie(logfiles_[2]);
 
   aos::util::WriteStringToFileOrDie(
-      logfiles_[4],
+      logfiles_[2],
       compressed_contents.substr(0, compressed_contents.size() - 100));
 
   const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
@@ -1954,7 +1939,7 @@
   // hitting any fatal errors.
   {
     LogReader relogged_reader(SortParts(MakeLogFiles(
-        tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 3, 3, true)));
+        tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 1, 1, 2, 2, true)));
     relogged_reader.Register();
 
     relogged_reader.event_loop_factory()->Run();
@@ -1964,7 +1949,7 @@
   {
     LogReader relogged_reader(
         SortParts(MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2",
-                               3, 3, true)),
+                               1, 1, 2, 2, true)),
         reader.configuration());
     relogged_reader.Register();
 
@@ -2150,7 +2135,7 @@
       //
       // TODO(austin): I'm not the most thrilled with this test pattern...  It
       // feels brittle in a different way.
-      if (file.find("source_pi1_timestamp_pi2") == std::string::npos) {
+      if (file.find("timestamps/remote_pi2") == std::string::npos) {
         switch (log_header->message().parts_index()) {
           case 0:
             EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
@@ -2287,93 +2272,107 @@
               monotonic_clock::max_time);
     EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
               monotonic_clock::max_time);
-    switch (log_header->message().parts_index()) {
-      case 0:
-        EXPECT_EQ(oldest_remote_monotonic_timestamps,
-                  monotonic_clock::max_time);
-        EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
-        EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
-                  monotonic_clock::max_time);
-        EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
-                  monotonic_clock::max_time);
-        EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
-                  monotonic_clock::max_time);
-        EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
-                  monotonic_clock::max_time);
-        break;
-      case 1:
-        EXPECT_EQ(oldest_remote_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(90200)));
-        EXPECT_EQ(oldest_local_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(90350)));
-        EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(90200)));
-        EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(90350)));
-        EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
-                  monotonic_clock::max_time);
-        EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
-                  monotonic_clock::max_time);
-        break;
-      case 2:
-        EXPECT_EQ(oldest_remote_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(90200)))
-            << file;
-        EXPECT_EQ(oldest_local_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(90350)))
-            << file;
-        EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(90200)))
-            << file;
-        EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(90350)))
-            << file;
-        EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(100000)))
-            << file;
-        EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(100150)))
-            << file;
-        break;
-      case 3:
-        EXPECT_EQ(oldest_remote_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::milliseconds(1323) +
-                                              chrono::microseconds(200)));
-        EXPECT_EQ(oldest_local_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(10100350)));
-        EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::milliseconds(1323) +
-                                              chrono::microseconds(200)));
-        EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(10100350)));
-        EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
-                  monotonic_clock::max_time)
-            << file;
-        EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
-                  monotonic_clock::max_time)
-            << file;
-        break;
-      case 4:
-        EXPECT_EQ(oldest_remote_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::milliseconds(1323) +
-                                              chrono::microseconds(200)));
-        EXPECT_EQ(oldest_local_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(10100350)));
-        EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::milliseconds(1323) +
-                                              chrono::microseconds(200)));
-        EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(10100350)));
-        EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(1423000)))
-            << file;
-        EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(10200150)))
-            << file;
-        break;
-      default:
-        FAIL();
-        break;
+    if (log_header->message().data_stored()->Get(0) == StoredDataType::DATA) {
+      switch (log_header->message().parts_index()) {
+        case 0:
+          ASSERT_EQ(oldest_remote_monotonic_timestamps,
+                    monotonic_clock::max_time);
+          EXPECT_EQ(oldest_local_monotonic_timestamps,
+                    monotonic_clock::max_time);
+          EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+                    monotonic_clock::max_time);
+          EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
+                    monotonic_clock::max_time);
+          EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
+                    monotonic_clock::max_time);
+          EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
+                    monotonic_clock::max_time);
+          break;
+        default:
+          FAIL();
+          break;
+      }
+    } else if (log_header->message().data_stored()->Get(0) ==
+               StoredDataType::TIMESTAMPS) {
+      switch (log_header->message().parts_index()) {
+        case 0:
+          ASSERT_EQ(oldest_remote_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(90200)));
+          EXPECT_EQ(oldest_local_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(90350)));
+          EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(90200)));
+          EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(90350)));
+          EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
+                    monotonic_clock::max_time);
+          EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
+                    monotonic_clock::max_time);
+          break;
+        case 1:
+          ASSERT_EQ(oldest_remote_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(90200)))
+              << file;
+          EXPECT_EQ(oldest_local_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(90350)))
+              << file;
+          EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(90200)))
+              << file;
+          EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(90350)))
+              << file;
+          EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(100000)))
+              << file;
+          EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(100150)))
+              << file;
+          break;
+        case 2:
+          ASSERT_EQ(oldest_remote_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::milliseconds(1323) +
+                                                chrono::microseconds(200)));
+          EXPECT_EQ(
+              oldest_local_monotonic_timestamps,
+              monotonic_clock::time_point(chrono::microseconds(10100350)));
+          EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::milliseconds(1323) +
+                                                chrono::microseconds(200)));
+          EXPECT_EQ(
+              oldest_local_unreliable_monotonic_timestamps,
+              monotonic_clock::time_point(chrono::microseconds(10100350)));
+          EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
+                    monotonic_clock::max_time)
+              << file;
+          EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
+                    monotonic_clock::max_time)
+              << file;
+          break;
+        case 3:
+          ASSERT_EQ(oldest_remote_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::milliseconds(1323) +
+                                                chrono::microseconds(200)));
+          EXPECT_EQ(
+              oldest_local_monotonic_timestamps,
+              monotonic_clock::time_point(chrono::microseconds(10100350)));
+          EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::milliseconds(1323) +
+                                                chrono::microseconds(200)));
+          EXPECT_EQ(
+              oldest_local_unreliable_monotonic_timestamps,
+              monotonic_clock::time_point(chrono::microseconds(10100350)));
+          EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(1423000)))
+              << file;
+          EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(10200150)))
+              << file;
+          break;
+        default:
+          FAIL();
+          break;
+      }
     }
   }
 
@@ -2513,7 +2512,7 @@
               2u);
 
     if (log_header->message().node()->name()->string_view() != "pi1") {
-      ASSERT_TRUE(file.find("source_pi1_timestamp_pi2") != std::string::npos);
+      ASSERT_TRUE(file.find("timestamps/remote_pi2") != std::string::npos);
 
       const std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
           ReadNthMessage(file, 0);
@@ -2915,8 +2914,10 @@
 
   std::vector<std::string> missing_parts;
 
-  missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part0" + Extension());
-  missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part2" + Extension());
+  missing_parts.emplace_back(logfile_base1_ + "_pi1_timestamps.part0" +
+                             Extension());
+  missing_parts.emplace_back(logfile_base1_ + "_pi1_timestamps.part2" +
+                             Extension());
   missing_parts.emplace_back(absl::StrCat(
       logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
 
diff --git a/aos/events/logging/multinode_logger_test_lib.cc b/aos/events/logging/multinode_logger_test_lib.cc
index d15a277..f383778 100644
--- a/aos/events/logging/multinode_logger_test_lib.cc
+++ b/aos/events/logging/multinode_logger_test_lib.cc
@@ -109,8 +109,9 @@
     unlink((file + ".xz").c_str());
   }
 
-  for (const auto &file : MakeLogFiles(tmp_dir_ + "/relogged1",
-                                       tmp_dir_ + "/relogged2", 3, 3, true)) {
+  for (const auto &file :
+       MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 3, 3, 3,
+                    3, true)) {
     unlink(file.c_str());
   }
 
@@ -137,7 +138,8 @@
 
 std::vector<std::string> MultinodeLoggerTest::MakeLogFiles(
     std::string logfile_base1, std::string logfile_base2, size_t pi1_data_count,
-    size_t pi2_data_count, bool relogged_config) {
+    size_t pi2_data_count, size_t pi1_timestamps_count,
+    size_t pi2_timestamps_count, bool relogged_config) {
   std::string_view sha256 = relogged_config
                                 ? std::get<0>(GetParam()).relogged_sha256
                                 : std::get<0>(GetParam()).sha256;
@@ -148,30 +150,33 @@
     result.emplace_back(
         absl::StrCat(logfile_base1, "_pi1_data.part", i, Extension()));
   }
+  for (size_t i = 0; i < pi1_timestamps_count; ++i) {
+    result.emplace_back(
+        absl::StrCat(logfile_base1, "_pi1_timestamps.part", i, Extension()));
+  }
   for (size_t i = 0; i < pi2_data_count; ++i) {
     result.emplace_back(
         absl::StrCat(logfile_base2, "_pi2_data.part", i, Extension()));
   }
-  result.emplace_back(logfile_base2 + "_pi1_data/pi1_data.part0" + Extension());
-  result.emplace_back(logfile_base2 + "_pi1_data/pi1_data.part1" + Extension());
-  result.emplace_back(logfile_base1 + "_pi2_data/pi2_data.part0" + Extension());
-  result.emplace_back(logfile_base1 + "_pi2_data/pi2_data.part1" + Extension());
+  for (size_t i = 0; i < pi2_timestamps_count; ++i) {
+    result.emplace_back(
+        absl::StrCat(logfile_base2, "_pi2_timestamps.part", i, Extension()));
+  }
+  result.emplace_back(logfile_base2 + "_data/pi1_data.part0" + Extension());
+  result.emplace_back(logfile_base2 + "_data/pi1_data.part1" + Extension());
+  result.emplace_back(logfile_base1 + "_data/pi2_data.part0" + Extension());
+  result.emplace_back(logfile_base1 + "_data/pi2_data.part1" + Extension());
   // shared and not shared config types will have the same output since the data
   // writers are consolidated to per node instead of per channel.
-  result.emplace_back(logfile_base1 +
-                      "_timestamps/pi2/source_pi1_timestamp_pi2.part0" +
+  result.emplace_back(logfile_base1 + "_timestamps/remote_pi2.part0" +
                       Extension());
-  result.emplace_back(logfile_base1 +
-                      "_timestamps/pi2/source_pi1_timestamp_pi2.part1" +
+  result.emplace_back(logfile_base1 + "_timestamps/remote_pi2.part1" +
                       Extension());
-  result.emplace_back(logfile_base1 +
-                      "_timestamps/pi2/source_pi1_timestamp_pi2.part2" +
+  result.emplace_back(logfile_base1 + "_timestamps/remote_pi2.part2" +
                       Extension());
-  result.emplace_back(logfile_base2 +
-                      "_timestamps/pi1/source_pi2_timestamp_pi1.part0" +
+  result.emplace_back(logfile_base2 + "_timestamps/remote_pi1.part0" +
                       Extension());
-  result.emplace_back(logfile_base2 +
-                      "_timestamps/pi1/source_pi2_timestamp_pi1.part1" +
+  result.emplace_back(logfile_base2 + "_timestamps/remote_pi1.part1" +
                       Extension());
 
   return result;
@@ -180,24 +185,20 @@
 std::vector<std::string> MultinodeLoggerTest::MakePi1RebootLogfiles() {
   std::vector<std::string> result;
   result.emplace_back(logfile_base1_ + "_pi1_data.part0" + Extension());
-  result.emplace_back(logfile_base1_ + "_pi1_data.part1" + Extension());
-  result.emplace_back(logfile_base1_ + "_pi1_data.part2" + Extension());
-  result.emplace_back(logfile_base1_ + "_pi1_data.part3" + Extension());
-  result.emplace_back(logfile_base1_ + "_pi1_data.part4" + Extension());
-  result.emplace_back(logfile_base1_ + "_pi2_data/pi2_data.part0" +
-                      Extension());
-  result.emplace_back(logfile_base1_ + "_pi2_data/pi2_data.part1" +
-                      Extension());
-  result.emplace_back(logfile_base1_ + "_pi2_data/pi2_data.part2" +
-                      Extension());
-  result.emplace_back(logfile_base1_ + "_pi2_data/pi2_data.part3" +
-                      Extension());
+  result.emplace_back(logfile_base1_ + "_pi1_timestamps.part0" + Extension());
+  result.emplace_back(logfile_base1_ + "_pi1_timestamps.part1" + Extension());
+  result.emplace_back(logfile_base1_ + "_pi1_timestamps.part2" + Extension());
+  result.emplace_back(logfile_base1_ + "_pi1_timestamps.part3" + Extension());
+
+  result.emplace_back(logfile_base1_ + "_data/pi2_data.part0" + Extension());
+  result.emplace_back(logfile_base1_ + "_data/pi2_data.part1" + Extension());
+  result.emplace_back(logfile_base1_ + "_data/pi2_data.part2" + Extension());
+  result.emplace_back(logfile_base1_ + "_data/pi2_data.part3" + Extension());
   result.emplace_back(absl::StrCat(
       logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
   for (size_t i = 0; i < 6; ++i) {
     result.emplace_back(absl::StrCat(
-        logfile_base1_, "_timestamps/pi2/source_pi1_timestamp_pi2.part", i,
-        Extension()));
+        logfile_base1_, "_timestamps/remote_pi2.part", i, Extension()));
   }
   return result;
 }
@@ -205,7 +206,7 @@
 std::vector<std::string> MultinodeLoggerTest::MakePi1SingleDirectionLogfiles() {
   std::vector<std::string> result;
   result.emplace_back(logfile_base1_ + "_pi1_data.part0" + Extension());
-  result.emplace_back(logfile_base1_ + "_pi1_data.part1" + Extension());
+  result.emplace_back(logfile_base1_ + "_pi1_timestamps.part0" + Extension());
   result.emplace_back(absl::StrCat(
       logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
   return result;
@@ -221,9 +222,10 @@
 
 std::vector<std::vector<std::string>> MultinodeLoggerTest::StructureLogFiles() {
   std::vector<std::vector<std::string>> result{
-      std::vector<std::string>{logfiles_[2], logfiles_[3], logfiles_[4]},
-      // std::vector<std::string>{logfiles_[5], logfiles_[6]},
-      std::vector<std::string>{logfiles_[5], logfiles_[6], logfiles_[7]},
+      std::vector<std::string>{logfiles_[2]},
+      std::vector<std::string>{logfiles_[3], logfiles_[4]},
+      std::vector<std::string>{logfiles_[5]},
+      std::vector<std::string>{logfiles_[6], logfiles_[7]},
       std::vector<std::string>{logfiles_[8], logfiles_[9]},
       std::vector<std::string>{logfiles_[10], logfiles_[11]}};
 
diff --git a/aos/events/logging/multinode_logger_test_lib.h b/aos/events/logging/multinode_logger_test_lib.h
index cf05727..1bea704 100644
--- a/aos/events/logging/multinode_logger_test_lib.h
+++ b/aos/events/logging/multinode_logger_test_lib.h
@@ -115,8 +115,10 @@
 
   std::vector<std::string> MakeLogFiles(std::string logfile_base1,
                                         std::string logfile_base2,
-                                        size_t pi1_data_count = 3,
-                                        size_t pi2_data_count = 3,
+                                        size_t pi1_data_count = 1,
+                                        size_t pi2_data_count = 1,
+                                        size_t pi1_timestamp_count = 2,
+                                        size_t pi2_timestamp_count = 2,
                                         bool relogged_config = false);
 
   std::vector<std::string> MakePi1RebootLogfiles();