Actually populate boot_uuids in the header

Now that we have all the infrastructure, actually populate the header
and start tracking it.  This disambiguates boots.

We still need some sort of timestamp to help make it more clear which
boot comes first and not rely on parts_index.

Change-Id: I00d0cdf4cf78905a11e9966f44b22e115851e8e0
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index ca7fc61..5ab4636 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -25,6 +25,9 @@
       log_namer_(log_namer),
       reopen_(std::move(reopen)),
       close_(std::move(close)) {
+  boot_uuids_.resize(configuration::NodesCount(log_namer->configuration_),
+                     UUID::Zero());
+  CHECK_LT(node_index_, boot_uuids_.size());
   reopen_(this);
 }
 
@@ -35,11 +38,13 @@
 }
 
 void NewDataWriter::Rotate() {
-  ++parts_index_;
-  reopen_(this);
-  header_written_ = false;
-  QueueHeader(log_namer_->MakeHeader(node_index_, source_node_boot_uuid_,
-                                     parts_uuid(), parts_index_));
+  // No need to rotate if nothing has been written.
+  if (header_written_) {
+    ++parts_index_;
+    reopen_(this);
+    header_written_ = false;
+    QueueHeader(MakeHeader());
+  }
 }
 
 void NewDataWriter::Reboot() {
@@ -49,31 +54,58 @@
   header_written_ = false;
 }
 
-void NewDataWriter::QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb,
-                                         const UUID &source_node_boot_uuid,
-                                         aos::monotonic_clock::time_point now) {
+void NewDataWriter::UpdateRemote(size_t remote_node_index,
+                                 const UUID &remote_node_boot_uuid) {
+  CHECK_LT(remote_node_index, boot_uuids_.size());
+  if (boot_uuids_[remote_node_index] != remote_node_boot_uuid) {
+    VLOG(1) << filename() << " Remote " << remote_node_index << " updated to "
+              << remote_node_boot_uuid << " from "
+              << boot_uuids_[remote_node_index];
+    boot_uuids_[remote_node_index] = remote_node_boot_uuid;
+    Rotate();
+  }
+}
+
+void NewDataWriter::QueueMessage(flatbuffers::FlatBufferBuilder *fbb,
+                                 const UUID &source_node_boot_uuid,
+                                 aos::monotonic_clock::time_point now) {
   // TODO(austin): Handle remote nodes changing too, not just the source node.
-  if (source_node_boot_uuid_ != source_node_boot_uuid) {
+  if (boot_uuids_[node_index_] != source_node_boot_uuid) {
+    boot_uuids_[node_index_] = source_node_boot_uuid;
     if (header_written_) {
       Reboot();
     }
 
-    QueueHeader(log_namer_->MakeHeader(node_index_, source_node_boot_uuid,
-                                       parts_uuid(), parts_index_));
+    QueueHeader(MakeHeader());
   }
-  CHECK_EQ(source_node_boot_uuid_, source_node_boot_uuid);
+  CHECK_EQ(boot_uuids_[node_index_], source_node_boot_uuid);
   CHECK(header_written_) << ": Attempting to write message before header to "
                          << writer->filename();
   writer->QueueSizedFlatbuffer(fbb, now);
 }
 
+aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
+NewDataWriter::MakeHeader() {
+  const size_t logger_node_index = log_namer_->logger_node_index();
+  const UUID &logger_node_boot_uuid = log_namer_->logger_node_boot_uuid();
+  if (boot_uuids_[logger_node_index] == UUID::Zero()) {
+    VLOG(1) << filename() << " Logger node is " << logger_node_index
+            << " and uuid is " << logger_node_boot_uuid;
+    boot_uuids_[logger_node_index] = logger_node_boot_uuid;
+  } else {
+    CHECK_EQ(boot_uuids_[logger_node_index], logger_node_boot_uuid);
+  }
+  return log_namer_->MakeHeader(node_index_, boot_uuids_, parts_uuid(),
+                                parts_index_);
+}
+
 void NewDataWriter::QueueHeader(
     aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &&header) {
   CHECK(!header_written_) << ": Attempting to write duplicate header to "
                           << writer->filename();
   CHECK(header.message().has_source_node_boot_uuid());
-  source_node_boot_uuid_ =
-      UUID::FromString(header.message().source_node_boot_uuid());
+  CHECK_EQ(boot_uuids_[node_index_],
+           UUID::FromString(header.message().source_node_boot_uuid()));
   // TODO(austin): This triggers a dummy allocation that we don't need as part
   // of releasing.  Can we skip it?
   writer->QueueSizedFlatbuffer(header.Release());
@@ -88,8 +120,9 @@
 }
 
 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> LogNamer::MakeHeader(
-    size_t node_index, const UUID &source_node_boot_uuid,
+    size_t node_index, const std::vector<UUID> &boot_uuids,
     const UUID &parts_uuid, int parts_index) const {
+  const UUID &source_node_boot_uuid = boot_uuids[node_index];
   const Node *const source_node =
       configuration::GetNode(configuration_, node_index);
   CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 18u);
@@ -147,6 +180,20 @@
     logger_node_offset = RecursiveCopyFlatBuffer(node_, &fbb);
   }
 
+  std::vector<flatbuffers::Offset<flatbuffers::String>> boot_uuid_offsets;
+  boot_uuid_offsets.reserve(boot_uuids.size());
+  for (const UUID &uuid : boot_uuids) {
+    if (uuid != UUID::Zero()) {
+      boot_uuid_offsets.emplace_back(uuid.PackString(&fbb));
+    } else {
+      boot_uuid_offsets.emplace_back(fbb.CreateString(""));
+    }
+  }
+
+  flatbuffers::Offset<
+      flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
+      boot_uuids_offset = fbb.CreateVector(boot_uuid_offsets);
+
   aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
 
   log_file_header_builder.add_name(name_offset);
@@ -207,6 +254,7 @@
     log_file_header_builder.add_configuration_sha256(config_sha256_offset);
   }
 
+  log_file_header_builder.add_boot_uuids(boot_uuids_offset);
   fbb.FinishSizePrefixed(log_file_header_builder.Finish());
   aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
       fbb.Release());
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index 2727e63..b6efb80 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -50,17 +50,16 @@
   // TODO(austin): Add known timestamps for each node every time we cycle a log
   // for sorting.
 
+  void UpdateRemote(size_t remote_node_index,
+                    const UUID &remote_node_boot_uuid);
   // Queues up a message with the provided boot UUID.
-  void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb,
-                            const UUID &source_node_boot_uuid,
-                            aos::monotonic_clock::time_point now);
+  void QueueMessage(flatbuffers::FlatBufferBuilder *fbb,
+                    const UUID &node_boot_uuid,
+                    aos::monotonic_clock::time_point now);
 
   // Returns the filename of the writer.
   std::string_view filename() const { return writer->filename(); }
 
-  // Signals that a node has rebooted.
-  void Reboot();
-
   void Close();
 
   std::unique_ptr<DetachedBufferWriter> writer = nullptr;
@@ -71,11 +70,16 @@
   const Node *node() const { return node_; }
 
  private:
+  // Signals that a node has rebooted.
+  void Reboot();
+
   void QueueHeader(
       aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &&header);
 
+  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader();
+
   const Node *const node_ = nullptr;
-  size_t node_index_ = 0;
+  const size_t node_index_ = 0;
   LogNamer *log_namer_;
   UUID parts_uuid_ = UUID::Random();
   size_t parts_index_ = 0;
@@ -83,7 +87,8 @@
   std::function<void(NewDataWriter *)> reopen_;
   std::function<void(NewDataWriter *)> close_;
   bool header_written_ = false;
-  UUID source_node_boot_uuid_ = UUID::Zero();
+
+  std::vector<UUID> boot_uuids_;
 };
 
 // Interface describing how to name, track, and add headers to log file parts.
@@ -92,7 +97,9 @@
   // Constructs a LogNamer with the primary node (ie the one the logger runs on)
   // being node.
   LogNamer(const Configuration *configuration, const Node *node)
-      : configuration_(configuration), node_(node) {
+      : configuration_(configuration),
+        node_(node),
+        logger_node_index_(configuration::GetNodeIndex(configuration_, node)) {
     nodes_.emplace_back(node_);
     node_states_.resize(configuration::NodesCount(configuration_));
   }
@@ -138,6 +145,8 @@
 
   // Returns the node the logger is running on.
   const Node *node() const { return node_; }
+  const UUID &logger_node_boot_uuid() const { return logger_node_boot_uuid_; }
+  size_t logger_node_index() const { return logger_node_index_; }
 
   // Writes out the nested Configuration object to the config file location.
   virtual void WriteConfiguration(
@@ -147,6 +156,8 @@
   void SetHeaderTemplate(
       aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> header) {
     header_ = std::move(header);
+    logger_node_boot_uuid_ =
+        UUID::FromString(header_.message().logger_node_boot_uuid());
   }
 
   void SetStartTimes(size_t node_index,
@@ -173,11 +184,13 @@
   // Creates a new header by copying fields out of the template and combining
   // them with the arguments provided.
   aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
-      size_t node_index, const UUID &source_node_boot_uuid,
+      size_t node_index, const std::vector<UUID> &boot_uuids,
       const UUID &parts_uuid, int parts_index) const;
 
   const Configuration *const configuration_;
   const Node *const node_;
+  const size_t logger_node_index_;
+  UUID logger_node_boot_uuid_;
   std::vector<const Node *> nodes_;
 
   friend NewDataWriter;
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index 580a126..4bc4448 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -111,14 +111,20 @@
         fs.wants_timestamp_writer = true;
         fs.timestamp_node_index = our_node_index;
       }
-      if (log_message) {
-        VLOG(1) << "  Data";
-        fs.wants_writer = true;
+      // Both the timestamp and data writers want data_node_index so it knows
+      // what the source node is.
+      if (log_message || log_delivery_times) {
         if (!is_local) {
           const Node *source_node = configuration::GetNode(
               configuration_, channel->source_node()->string_view());
           fs.data_node_index =
               configuration::GetNodeIndex(configuration_, source_node);
+        }
+      }
+      if (log_message) {
+        VLOG(1) << "  Data";
+        fs.wants_writer = true;
+        if (!is_local) {
           fs.log_type = LogType::kLogRemoteMessage;
         } else {
           fs.data_node_index = our_node_index;
@@ -590,7 +596,7 @@
 
         max_header_size_ = std::max(max_header_size_,
                                     fbb.GetSize() - f.fetcher->context().size);
-        f.writer->QueueSizedFlatbuffer(&fbb, source_node_boot_uuid, end);
+        f.writer->QueueMessage(&fbb, source_node_boot_uuid, end);
       }
 
       if (f.timestamp_writer != nullptr) {
@@ -613,11 +619,10 @@
                        flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                            fbb.GetBufferPointer()));
 
-        // TODO(austin): How do I track remote timestamp boot UUIDs?  I need to
-        // update the uuid list in the header when one changes and track
-        // timestamps.
-        f.timestamp_writer->QueueSizedFlatbuffer(&fbb, event_loop_->boot_uuid(),
-                                                 end);
+        // Tell our writer that we know something about the remote boot.
+        f.timestamp_writer->UpdateRemote(f.data_node_index,
+                                         f.fetcher->context().source_boot_uuid);
+        f.timestamp_writer->QueueMessage(&fbb, event_loop_->boot_uuid(), end);
       }
 
       if (f.contents_writer != nullptr) {
@@ -671,7 +676,7 @@
         const auto end = event_loop_->monotonic_now();
         RecordCreateMessageTime(start, end, &f);
 
-        f.contents_writer->QueueSizedFlatbuffer(
+        f.contents_writer->QueueMessage(
             &fbb, UUID::FromVector(msg->boot_uuid()), end);
       }
 
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index a7d627d..a02f4ea 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -483,18 +483,26 @@
   bool shared() const { return GetParam().shared; }
 
   std::vector<std::string> MakeLogFiles(std::string logfile_base1,
-                                        std::string logfile_base2) {
+                                        std::string logfile_base2,
+                                        size_t pi1_data_count = 2,
+                                        size_t pi2_data_count = 2) {
     std::vector<std::string> result;
     result.emplace_back(
         absl::StrCat(logfile_base1, "_", GetParam().sha256, ".bfbs"));
     result.emplace_back(
         absl::StrCat(logfile_base2, "_", GetParam().sha256, ".bfbs"));
-    result.emplace_back(logfile_base1 + "_pi1_data.part0.bfbs");
+    for (size_t i = 0; i < pi1_data_count; ++i) {
+      result.emplace_back(
+          absl::StrCat(logfile_base1, "_pi1_data.part", i, ".bfbs"));
+    }
     result.emplace_back(logfile_base1 +
                         "_pi2_data/test/aos.examples.Pong.part0.bfbs");
     result.emplace_back(logfile_base1 +
                         "_pi2_data/test/aos.examples.Pong.part1.bfbs");
-    result.emplace_back(logfile_base2 + "_pi2_data.part0.bfbs");
+    for (size_t i = 0; i < pi2_data_count; ++i) {
+      result.emplace_back(
+          absl::StrCat(logfile_base2, "_pi2_data.part", i, ".bfbs"));
+    }
     result.emplace_back(
         logfile_base2 +
         "_pi1_data/pi1/aos/aos.message_bridge.Timestamp.part0.bfbs");
@@ -553,6 +561,8 @@
   std::vector<std::string> MakePi1RebootLogfiles() {
     std::vector<std::string> result;
     result.emplace_back(logfile_base1_ + "_pi1_data.part0.bfbs");
+    result.emplace_back(logfile_base1_ + "_pi1_data.part1.bfbs");
+    result.emplace_back(logfile_base1_ + "_pi1_data.part2.bfbs");
     result.emplace_back(logfile_base1_ +
                         "_pi2_data/test/aos.examples.Pong.part0.bfbs");
     result.emplace_back(logfile_base1_ +
@@ -640,17 +650,17 @@
 
   std::vector<std::vector<std::string>> StructureLogFiles() {
     std::vector<std::vector<std::string>> result{
-        std::vector<std::string>{logfiles_[2]},
-        std::vector<std::string>{logfiles_[3], logfiles_[4]},
-        std::vector<std::string>{logfiles_[5]},
+        std::vector<std::string>{logfiles_[2], logfiles_[3]},
+        std::vector<std::string>{logfiles_[4], logfiles_[5]},
         std::vector<std::string>{logfiles_[6], logfiles_[7]},
         std::vector<std::string>{logfiles_[8], logfiles_[9]},
         std::vector<std::string>{logfiles_[10], logfiles_[11]},
-        std::vector<std::string>{logfiles_[12], logfiles_[13]}};
+        std::vector<std::string>{logfiles_[12], logfiles_[13]},
+        std::vector<std::string>{logfiles_[14], logfiles_[15]}};
 
     if (!shared()) {
       result.emplace_back(
-          std::vector<std::string>{logfiles_[14], logfiles_[15]});
+          std::vector<std::string>{logfiles_[16], logfiles_[17]});
     }
 
     return result;
@@ -929,27 +939,29 @@
 
     // And confirm everything is on the correct node.
     EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi1");
-    EXPECT_EQ(log_header[3].message().node()->name()->string_view(), "pi2");
+    EXPECT_EQ(log_header[3].message().node()->name()->string_view(), "pi1");
     EXPECT_EQ(log_header[4].message().node()->name()->string_view(), "pi2");
     EXPECT_EQ(log_header[5].message().node()->name()->string_view(), "pi2");
-    EXPECT_EQ(log_header[6].message().node()->name()->string_view(), "pi1");
-    EXPECT_EQ(log_header[7].message().node()->name()->string_view(), "pi1");
-    EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi2");
-    EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi2");
+    EXPECT_EQ(log_header[6].message().node()->name()->string_view(), "pi2");
+    EXPECT_EQ(log_header[7].message().node()->name()->string_view(), "pi2");
+    EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi1");
+    EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi1");
     EXPECT_EQ(log_header[10].message().node()->name()->string_view(), "pi2");
     EXPECT_EQ(log_header[11].message().node()->name()->string_view(), "pi2");
-    EXPECT_EQ(log_header[12].message().node()->name()->string_view(), "pi1");
-    EXPECT_EQ(log_header[13].message().node()->name()->string_view(), "pi1");
+    EXPECT_EQ(log_header[12].message().node()->name()->string_view(), "pi2");
+    EXPECT_EQ(log_header[13].message().node()->name()->string_view(), "pi2");
+    EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi1");
+    EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi1");
     if (!shared()) {
-      EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
-      EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi2");
+      EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi2");
+      EXPECT_EQ(log_header[17].message().node()->name()->string_view(), "pi2");
     }
 
     // And the parts index matches.
     EXPECT_EQ(log_header[2].message().parts_index(), 0);
-    EXPECT_EQ(log_header[3].message().parts_index(), 0);
-    EXPECT_EQ(log_header[4].message().parts_index(), 1);
-    EXPECT_EQ(log_header[5].message().parts_index(), 0);
+    EXPECT_EQ(log_header[3].message().parts_index(), 1);
+    EXPECT_EQ(log_header[4].message().parts_index(), 0);
+    EXPECT_EQ(log_header[5].message().parts_index(), 1);
     EXPECT_EQ(log_header[6].message().parts_index(), 0);
     EXPECT_EQ(log_header[7].message().parts_index(), 1);
     EXPECT_EQ(log_header[8].message().parts_index(), 0);
@@ -958,9 +970,11 @@
     EXPECT_EQ(log_header[11].message().parts_index(), 1);
     EXPECT_EQ(log_header[12].message().parts_index(), 0);
     EXPECT_EQ(log_header[13].message().parts_index(), 1);
+    EXPECT_EQ(log_header[14].message().parts_index(), 0);
+    EXPECT_EQ(log_header[15].message().parts_index(), 1);
     if (!shared()) {
-      EXPECT_EQ(log_header[14].message().parts_index(), 0);
-      EXPECT_EQ(log_header[15].message().parts_index(), 1);
+      EXPECT_EQ(log_header[16].message().parts_index(), 0);
+      EXPECT_EQ(log_header[17].message().parts_index(), 1);
     }
   }
 
@@ -974,161 +988,178 @@
     EXPECT_THAT(
         CountChannelsData(config, logfiles_[2]),
         UnorderedElementsAre(
+            std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
+                            1),
+            std::make_tuple("/test", "aos.examples.Ping", 1)))
+        << " : " << logfiles_[2];
+    EXPECT_THAT(
+        CountChannelsData(config, logfiles_[3]),
+        UnorderedElementsAre(
             std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
             std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
-                            21),
+                            20),
             std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
                             200),
             std::make_tuple("/pi1/aos", "aos.timing.Report", 40),
-            std::make_tuple("/test", "aos.examples.Ping", 2001)))
-        << " : " << logfiles_[2];
+            std::make_tuple("/test", "aos.examples.Ping", 2000)))
+        << " : " << logfiles_[3];
     // Timestamps for pong
+    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
+                UnorderedElementsAre())
+        << " : " << logfiles_[2];
     EXPECT_THAT(
-        CountChannelsTimestamp(config, logfiles_[2]),
+        CountChannelsTimestamp(config, logfiles_[3]),
         UnorderedElementsAre(
             std::make_tuple("/test", "aos.examples.Pong", 2001),
             std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)))
-        << " : " << logfiles_[2];
+        << " : " << logfiles_[3];
 
     // Pong data.
     EXPECT_THAT(
-        CountChannelsData(config, logfiles_[3]),
+        CountChannelsData(config, logfiles_[4]),
         UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 91)))
-        << " : " << logfiles_[3];
-    EXPECT_THAT(CountChannelsData(config, logfiles_[4]),
+        << " : " << logfiles_[4];
+    EXPECT_THAT(CountChannelsData(config, logfiles_[5]),
                 UnorderedElementsAre(
                     std::make_tuple("/test", "aos.examples.Pong", 1910)))
-        << " : " << logfiles_[3];
+        << " : " << logfiles_[5];
 
     // No timestamps
-    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[3]),
-                UnorderedElementsAre())
-        << " : " << logfiles_[3];
     EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[4]),
                 UnorderedElementsAre())
         << " : " << logfiles_[4];
+    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
+                UnorderedElementsAre())
+        << " : " << logfiles_[5];
 
     // Timing reports and pongs.
+    EXPECT_THAT(CountChannelsData(config, logfiles_[6]),
+                UnorderedElementsAre(std::make_tuple(
+                    "/pi2/aos", "aos.message_bridge.ServerStatistics", 1)))
+        << " : " << logfiles_[6];
     EXPECT_THAT(
-        CountChannelsData(config, logfiles_[5]),
+        CountChannelsData(config, logfiles_[7]),
         UnorderedElementsAre(
             std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
             std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
-                            21),
+                            20),
             std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
                             200),
             std::make_tuple("/pi2/aos", "aos.timing.Report", 40),
             std::make_tuple("/test", "aos.examples.Pong", 2001)))
-        << " : " << logfiles_[5];
+        << " : " << logfiles_[7];
     // And ping timestamps.
-    EXPECT_THAT(
-        CountChannelsTimestamp(config, logfiles_[5]),
-        UnorderedElementsAre(
-            std::make_tuple("/test", "aos.examples.Ping", 2001),
-            std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)))
-        << " : " << logfiles_[5];
-
-    // And then test that the remotely logged timestamp data files only have
-    // timestamps in them.
     EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[6]),
                 UnorderedElementsAre())
         << " : " << logfiles_[6];
-    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[7]),
-                UnorderedElementsAre())
+    EXPECT_THAT(
+        CountChannelsTimestamp(config, logfiles_[7]),
+        UnorderedElementsAre(
+            std::make_tuple("/test", "aos.examples.Ping", 2001),
+            std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)))
         << " : " << logfiles_[7];
+
+    // And then test that the remotely logged timestamp data files only have
+    // timestamps in them.
     EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[8]),
                 UnorderedElementsAre())
         << " : " << logfiles_[8];
     EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[9]),
                 UnorderedElementsAre())
         << " : " << logfiles_[9];
-
-    EXPECT_THAT(CountChannelsData(config, logfiles_[6]),
-                UnorderedElementsAre(std::make_tuple(
-                    "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
-        << " : " << logfiles_[6];
-    EXPECT_THAT(CountChannelsData(config, logfiles_[7]),
-                UnorderedElementsAre(std::make_tuple(
-                    "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
-        << " : " << logfiles_[7];
+    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[10]),
+                UnorderedElementsAre())
+        << " : " << logfiles_[10];
+    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[11]),
+                UnorderedElementsAre())
+        << " : " << logfiles_[11];
 
     EXPECT_THAT(CountChannelsData(config, logfiles_[8]),
                 UnorderedElementsAre(std::make_tuple(
-                    "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
+                    "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
         << " : " << logfiles_[8];
     EXPECT_THAT(CountChannelsData(config, logfiles_[9]),
                 UnorderedElementsAre(std::make_tuple(
-                    "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
+                    "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
         << " : " << logfiles_[9];
 
-    // Timestamps from pi2 on pi1, and the other way.
     EXPECT_THAT(CountChannelsData(config, logfiles_[10]),
-                UnorderedElementsAre())
+                UnorderedElementsAre(std::make_tuple(
+                    "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
         << " : " << logfiles_[10];
     EXPECT_THAT(CountChannelsData(config, logfiles_[11]),
-                UnorderedElementsAre())
+                UnorderedElementsAre(std::make_tuple(
+                    "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
         << " : " << logfiles_[11];
+
+    // Timestamps from pi2 on pi1, and the other way.
     EXPECT_THAT(CountChannelsData(config, logfiles_[12]),
                 UnorderedElementsAre())
         << " : " << logfiles_[12];
     EXPECT_THAT(CountChannelsData(config, logfiles_[13]),
                 UnorderedElementsAre())
         << " : " << logfiles_[13];
+    EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
+                UnorderedElementsAre())
+        << " : " << logfiles_[14];
+    EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
+                UnorderedElementsAre())
+        << " : " << logfiles_[15];
     if (!shared()) {
-      EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
+      EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
                   UnorderedElementsAre())
-          << " : " << logfiles_[14];
-      EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
+          << " : " << logfiles_[16];
+      EXPECT_THAT(CountChannelsData(config, logfiles_[17]),
                   UnorderedElementsAre())
-          << " : " << logfiles_[15];
+          << " : " << logfiles_[17];
     }
 
     if (shared()) {
       EXPECT_THAT(
-          CountChannelsTimestamp(config, logfiles_[10]),
+          CountChannelsTimestamp(config, logfiles_[12]),
           UnorderedElementsAre(
               std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 9),
               std::make_tuple("/test", "aos.examples.Ping", 91)))
-          << " : " << logfiles_[10];
+          << " : " << logfiles_[12];
       EXPECT_THAT(
-          CountChannelsTimestamp(config, logfiles_[11]),
+          CountChannelsTimestamp(config, logfiles_[13]),
           UnorderedElementsAre(
               std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 191),
               std::make_tuple("/test", "aos.examples.Ping", 1910)))
-          << " : " << logfiles_[11];
-      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[12]),
-                  UnorderedElementsAre(std::make_tuple(
-                      "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
-          << " : " << logfiles_[12];
-      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[13]),
-                  UnorderedElementsAre(std::make_tuple(
-                      "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
-          << " : " << logfiles_[13];
-    } else {
-      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[10]),
-                  UnorderedElementsAre(std::make_tuple(
-                      "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
-          << " : " << logfiles_[10];
-      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[11]),
-                  UnorderedElementsAre(std::make_tuple(
-                      "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
-          << " : " << logfiles_[11];
-      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[12]),
-                  UnorderedElementsAre(std::make_tuple(
-                      "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
-          << " : " << logfiles_[12];
-      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[13]),
-                  UnorderedElementsAre(std::make_tuple(
-                      "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
           << " : " << logfiles_[13];
       EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[14]),
-                  UnorderedElementsAre(
-                      std::make_tuple("/test", "aos.examples.Ping", 91)))
+                  UnorderedElementsAre(std::make_tuple(
+                      "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
           << " : " << logfiles_[14];
       EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[15]),
+                  UnorderedElementsAre(std::make_tuple(
+                      "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
+          << " : " << logfiles_[15];
+    } else {
+      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[12]),
+                  UnorderedElementsAre(std::make_tuple(
+                      "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
+          << " : " << logfiles_[12];
+      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[13]),
+                  UnorderedElementsAre(std::make_tuple(
+                      "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
+          << " : " << logfiles_[13];
+      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[14]),
+                  UnorderedElementsAre(std::make_tuple(
+                      "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
+          << " : " << logfiles_[14];
+      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[15]),
+                  UnorderedElementsAre(std::make_tuple(
+                      "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
+          << " : " << logfiles_[15];
+      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[16]),
+                  UnorderedElementsAre(
+                      std::make_tuple("/test", "aos.examples.Ping", 91)))
+          << " : " << logfiles_[16];
+      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[17]),
                   UnorderedElementsAre(
                       std::make_tuple("/test", "aos.examples.Ping", 1910)))
-          << " : " << logfiles_[15];
+          << " : " << logfiles_[17];
     }
   }
 
@@ -1336,7 +1367,10 @@
     event_loop_factory_.RunFor(chrono::milliseconds(20000));
   }
 
-  LogReader reader(SortParts(logfiles_));
+  // Since we delay starting pi2, it already knows about all the timestamps so
+  // we don't end up with extra parts.
+  LogReader reader(
+      SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 2, 1)));
 
   SimulatedEventLoopFactory log_reader_factory(reader.configuration());
   log_reader_factory.set_send_delay(chrono::microseconds(0));