Save remote timestamps in timestamp log file headers

We have the earliest time a message made it across the network in each
log file.  Previously, this was only done for the main data files, so
we'd only get timestamps when our message_bridge_client is connected to
the remote.  We see some logs where we never connect, so we never get
those timestamps.  There is still enough information in the log files to
sort if we fix that.

There are 3 parts to this.
  1) Clarify that we really mean source node for the timestamps, not
     logger node.
  2) Put the timestamps in the log file header
  3) Update the sorting code.

This exposes a bug in the log reader where it can't handle a node
without a start time.  A follow-up commit will fix that, but that can
happen while this is being reviewed.

Change-Id: Iff898dc30ec0fb54d670ea21412d8ad35ab80f4c
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index 9a5d038..b41212c 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -46,11 +46,33 @@
   }
 }
 
-void NewDataWriter::Reboot() {
+void NewDataWriter::Reboot(const UUID &source_node_boot_uuid) {
   parts_uuid_ = UUID::Random();
   ++parts_index_;
   reopen_(this);
   header_written_ = false;
+  for (State &state : state_) {
+    state.boot_uuid = UUID::Zero();
+    state.oldest_remote_monotonic_timestamp = monotonic_clock::max_time;
+    state.oldest_local_monotonic_timestamp = monotonic_clock::max_time;
+    state.oldest_remote_unreliable_monotonic_timestamp =
+        monotonic_clock::max_time;
+    state.oldest_local_unreliable_monotonic_timestamp =
+        monotonic_clock::max_time;
+  }
+
+  state_[node_index_].boot_uuid = source_node_boot_uuid;
+
+  VLOG(1) << "Rebooted " << filename();
+}
+
+void NewDataWriter::UpdateBoot(const UUID &source_node_boot_uuid) {
+  if (state_[node_index_].boot_uuid != source_node_boot_uuid) {
+    state_[node_index_].boot_uuid = source_node_boot_uuid;
+    if (header_written_) {
+      Reboot(source_node_boot_uuid);
+    }
+  }
 }
 
 void NewDataWriter::UpdateRemote(
@@ -77,7 +99,6 @@
     rotate = true;
   }
 
-
   // Did the unreliable timestamps change?
   if (!reliable) {
     if (state.oldest_remote_unreliable_monotonic_timestamp >
@@ -113,18 +134,15 @@
                                  const UUID &source_node_boot_uuid,
                                  aos::monotonic_clock::time_point now) {
   // Trigger a reboot if we detect the boot UUID change.
-  if (state_[node_index_].boot_uuid != source_node_boot_uuid) {
-    state_[node_index_].boot_uuid = source_node_boot_uuid;
-    if (header_written_) {
-      Reboot();
-    }
+  UpdateBoot(source_node_boot_uuid);
 
+  if (!header_written_) {
     QueueHeader(MakeHeader());
   }
 
   // If the start time has changed for this node, trigger a rotation.
   if (log_namer_->monotonic_start_time(node_index_, source_node_boot_uuid) !=
-           monotonic_start_time_) {
+      monotonic_start_time_) {
     CHECK(header_written_);
     Rotate();
   }
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index 00e1856..c3fc5d4 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -59,6 +59,11 @@
                     const UUID &node_boot_uuid,
                     aos::monotonic_clock::time_point now);
 
+  // Updates the current boot for the source node.  This is useful when you want
+  // to queue a message that may trigger a reboot rotation, but then need to
+  // update the remote timestamps.
+  void UpdateBoot(const UUID &source_node_boot_uuid);
+
   // Returns the filename of the writer.
   std::string_view filename() const {
     return writer ? writer->filename() : "(closed)";
@@ -97,7 +102,7 @@
 
  private:
   // Signals that a node has rebooted.
-  void Reboot();
+  void Reboot(const UUID &source_node_boot_uuid);
 
   void QueueHeader(
       aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &&header);
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index f379ca4..8eaeb73 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -695,6 +695,20 @@
         const auto end = event_loop_->monotonic_now();
         RecordCreateMessageTime(start, end, &f);
 
+        // Timestamps tell us information about what happened too!
+        // Capture any reboots so UpdateRemote is properly recorded.
+        f.contents_writer->UpdateBoot(UUID::FromVector(msg->boot_uuid()));
+
+        // Start with recording info about the data flowing from our node to the
+        // remote.
+        f.contents_writer->UpdateRemote(
+            node_index_, event_loop_->boot_uuid(),
+            monotonic_clock::time_point(
+                chrono::nanoseconds(msg->monotonic_remote_time())),
+            monotonic_clock::time_point(
+                chrono::nanoseconds(msg->monotonic_sent_time())),
+            f.reliable_forwarding);
+
         f.contents_writer->QueueMessage(
             &fbb, UUID::FromVector(msg->boot_uuid()), end);
       }
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 7610295..5000ff9 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -297,7 +297,7 @@
   std::set<std::string> config_sha256_list;
 
   // Map from a observed pair of boots to the associated timestamps.
-  // logger_node -> logger_node_boot_uuid -> destination_node index ->
+  // source_node -> source_node_boot_uuid -> destination_node index ->
   // destination_boot_uuid -> list of all times from all parts.
   absl::btree_map<
       std::string,
@@ -544,7 +544,7 @@
     }
 
     VLOG(1) << "Parts: " << parts_uuid << ", source boot uuid "
-            << source_boot_uuid << " " << parts_index;
+            << source_boot_uuid << " index " << parts_index;
     auto it = log_it->second.unsorted_parts.find(
         std::pair(parts_uuid, std::string(source_boot_uuid)));
     if (it == log_it->second.unsorted_parts.end()) {
@@ -569,12 +569,12 @@
     // We've got a newer log with boot_uuids, and oldest timestamps.  Fill in
     // this->boot_times with the info we have found.
     if (HasNewTimestamps(&log_header->message())) {
-      auto node_boot_times_it = boot_times.find(logger_node);
+      auto node_boot_times_it = boot_times.find(node);
       if (node_boot_times_it == boot_times.end()) {
         node_boot_times_it =
             boot_times
                 .emplace(
-                    logger_node,
+                    node,
                     absl::btree_map<
                         std::string,
                         absl::btree_map<
@@ -583,14 +583,15 @@
                                             std::vector<BootPairTimes>>>>())
                 .first;
       }
+
       auto source_boot_times_it =
-          node_boot_times_it->second.find(std::string(logger_boot_uuid));
+          node_boot_times_it->second.find(std::string(source_boot_uuid));
 
       if (source_boot_times_it == node_boot_times_it->second.end()) {
         source_boot_times_it =
             node_boot_times_it->second
                 .emplace(
-                    logger_boot_uuid,
+                    source_boot_uuid,
                     absl::btree_map<
                         size_t, absl::btree_map<std::string,
                                                 std::vector<BootPairTimes>>>())
@@ -616,6 +617,7 @@
                    .oldest_remote_unreliable_monotonic_timestamps()
                    ->size());
       CHECK(!logger_boot_uuid.empty());
+      CHECK(!source_boot_uuid.empty());
       for (size_t node_index = 0; node_index < boot_uuids_size; ++node_index) {
         const std::string_view boot_uuid =
             log_header->message().boot_uuids()->Get(node_index)->string_view();
@@ -638,7 +640,7 @@
                 log_header->message()
                     .oldest_remote_unreliable_monotonic_timestamps()
                     ->Get(node_index)));
-        if (boot_uuid.empty() || boot_uuid == logger_boot_uuid) {
+        if (boot_uuid.empty() || boot_uuid == source_boot_uuid) {
           CHECK_EQ(oldest_local_monotonic_timestamp, monotonic_clock::max_time);
           CHECK_EQ(oldest_remote_monotonic_timestamp,
                    monotonic_clock::max_time);
@@ -775,31 +777,46 @@
       << *config_sha256_list.begin();
   const Configuration *config = config_it->second.get();
 
-  for (const auto &node_boot_times : boot_times) {
-    const std::string &logger_node_name = node_boot_times.first;
+  // Map from a observed pair of boots to the associated timestamps.
+  // destination_node -> destination_node_boot_uuid -> source_node ->
+  // source_boot_uuid -> list of all times from all parts.
+  //
+  // This is boot_times but flipped inside out so we can order on destinations
+  // instead of sources for when we have the same destination for 2 different
+  // boots.
+  absl::btree_map<
+      std::string,
+      absl::btree_map<
+          std::string,
+          absl::btree_map<std::string,
+                          absl::btree_map<std::string, BootPairTimes>>>>
+      reverse_boot_times;
 
-    // We know nothing about the order of the logger node's boot, but we
+  for (const auto &node_boot_times : boot_times) {
+    const std::string &source_node_name = node_boot_times.first;
+
+    // We know nothing about the order of the source node's boot, but we
     // know it happened.  If there is only 1 single boot, the constraint
     // code will happily mark it as boot 0.  Otherwise we'll get the
     // appropriate boot count if it can be computed or an error.
     //
     // Add it to the boots list to kick this off.
     auto logger_node_boot_constraints_it =
-        boot_constraints.find(logger_node_name);
+        boot_constraints.find(source_node_name);
     if (logger_node_boot_constraints_it == boot_constraints.end()) {
       logger_node_boot_constraints_it =
           boot_constraints
-              .insert(std::make_pair(logger_node_name, NodeBootState()))
+              .insert(std::make_pair(source_node_name, NodeBootState()))
               .first;
     }
 
-    for (const auto &source_boot_time : node_boot_times.second) {
-      const std::string &logger_boot_uuid = source_boot_time.first;
-      logger_node_boot_constraints_it->second.boots.insert(logger_boot_uuid);
+    for (const auto &destination_boot_time : node_boot_times.second) {
+      const std::string &source_boot_uuid = destination_boot_time.first;
+      logger_node_boot_constraints_it->second.boots.insert(source_boot_uuid);
 
-      for (const auto &source_nodes : source_boot_time.second) {
-        const std::string source_node_name =
-            config->nodes()->Get(source_nodes.first)->name()->str();
+      for (const auto &destination_nodes : destination_boot_time.second) {
+        const std::string destination_node_name =
+            config->nodes()->Get(destination_nodes.first)->name()->str();
 
         // Now, we have a bunch of remote boots for the same local boot and
         // remote node.  We want to sort them by observed local time.  This will
@@ -807,7 +824,7 @@
         // node too so we can check for overlapping boots.
         std::vector<std::tuple<std::string, BootPairTimes, BootPairTimes>>
             source_boot_times;
-        for (const auto &boot_time_list : source_nodes.second) {
+        for (const auto &boot_time_list : destination_nodes.second) {
           // Track the first boot time we have evidence of.
           BootPairTimes boot_time = boot_time_list.second[0];
           // And the last one so we can look for overlapping boots.
@@ -817,13 +834,26 @@
             if (next_boot_time.oldest_local_unreliable_monotonic_timestamp !=
                 aos::monotonic_clock::max_time) {
               VLOG(1)
-                  << "Remote time "
+                  << "Unreliable remote time "
                   << next_boot_time.oldest_remote_unreliable_monotonic_timestamp
-                  << " " << boot_time_list.first;
+                  << " remote " << boot_time_list.first << " local "
+                  << source_boot_uuid;
               VLOG(1)
-                  << "Local time "
+                  << "Unreliable local time "
                   << next_boot_time.oldest_local_unreliable_monotonic_timestamp
-                  << " " << boot_time_list.first;
+                  << " remote " << boot_time_list.first << " local "
+                  << source_boot_uuid;
+            }
+            if (next_boot_time.oldest_local_monotonic_timestamp !=
+                aos::monotonic_clock::max_time) {
+              VLOG(1) << "Reliable remote time "
+                      << next_boot_time.oldest_remote_monotonic_timestamp
+                      << " remote " << boot_time_list.first << " local "
+                      << source_boot_uuid;
+              VLOG(1) << "Reliable local time "
+                      << next_boot_time.oldest_local_monotonic_timestamp
+                      << " remote " << boot_time_list.first << " local "
+                      << source_boot_uuid;
             }
             // If we found an existing entry, update the min to be the min of
             // all records.  This lets us combine info from multiple part files.
@@ -867,6 +897,57 @@
           }
           source_boot_times.emplace_back(
               std::make_tuple(boot_time_list.first, boot_time, max_boot_time));
+
+          // While we are building up the forwards set of constraints, build up
+          // a list of reverse constraints.  This gives us a list of boots for
+          // the case when we have 2 logs from different boots, where the remote
+          // is both the same boot.
+
+          auto reverse_destination_node_it =
+              reverse_boot_times.find(destination_node_name);
+          if (reverse_destination_node_it == reverse_boot_times.end()) {
+            reverse_destination_node_it =
+                reverse_boot_times
+                    .emplace(
+                        destination_node_name,
+                        absl::btree_map<
+                            std::string,
+                            absl::btree_map<
+                                std::string,
+                                absl::btree_map<std::string, BootPairTimes>>>())
+                    .first;
+          }
+
+          auto reverse_destination_node_boot_uuid_it =
+              reverse_destination_node_it->second.find(boot_time_list.first);
+          if (reverse_destination_node_boot_uuid_it ==
+              reverse_destination_node_it->second.end()) {
+            reverse_destination_node_boot_uuid_it =
+                reverse_destination_node_it->second
+                    .emplace(boot_time_list.first,
+                             absl::btree_map<
+                                 std::string,
+                                 absl::btree_map<std::string, BootPairTimes>>())
+                    .first;
+          }
+
+          auto reverse_source_node_it =
+              reverse_destination_node_boot_uuid_it->second.find(
+                  source_node_name);
+          if (reverse_source_node_it ==
+              reverse_destination_node_boot_uuid_it->second.end()) {
+            reverse_source_node_it =
+                reverse_destination_node_boot_uuid_it->second
+                    .emplace(source_node_name,
+                             absl::btree_map<std::string, BootPairTimes>())
+                    .first;
+          }
+
+          auto reverse_source_node_boot_uuid_it =
+              reverse_source_node_it->second.find(source_boot_uuid);
+          CHECK(reverse_source_node_boot_uuid_it ==
+                reverse_source_node_it->second.end());
+          reverse_source_node_it->second.emplace(source_boot_uuid, boot_time);
         }
         std::sort(
             source_boot_times.begin(), source_boot_times.end(),
@@ -918,17 +999,17 @@
               const std::string &fatal_source_boot_uuid =
                   std::get<0>(fatal_boot_time);
               LOG(ERROR) << "Boot " << fatal_boot_id << ", "
-                         << fatal_source_boot_uuid << " on " << source_node_name
+                         << fatal_source_boot_uuid << " on " << destination_node_name
                          << " spans ["
                          << std::get<1>(fatal_boot_time)
                                 .oldest_local_unreliable_monotonic_timestamp
                          << ", "
                          << std::get<2>(fatal_boot_time)
                                 .oldest_local_unreliable_monotonic_timestamp
-                         << "] on logger " << logger_node_name;
+                         << "] on source " << destination_node_name;
             }
-            LOG(FATAL) << "Found overlapping boots on " << source_node_name
-                       << " logged on " << logger_node_name << ", "
+            LOG(FATAL) << "Found overlapping boots on " << destination_node_name
+                       << " source node " << destination_node_name << ", "
                        << std::get<1>(boot_time)
                               .oldest_local_unreliable_monotonic_timestamp
                        << " < " << last_boot_time;
@@ -938,11 +1019,11 @@
                                .oldest_local_unreliable_monotonic_timestamp;
 
           auto source_node_boot_constraints_it =
-              boot_constraints.find(source_node_name);
+              boot_constraints.find(destination_node_name);
           if (source_node_boot_constraints_it == boot_constraints.end()) {
             source_node_boot_constraints_it =
                 boot_constraints
-                    .insert(std::make_pair(source_node_name, NodeBootState()))
+                    .insert(std::make_pair(destination_node_name, NodeBootState()))
                     .first;
           }
 
@@ -995,6 +1076,101 @@
     }
   }
 
+  // Now, sort the reverse direction so we can handle when we only get
+  // timestamps and need to make sense of the boots.
+  for (const auto &destination_node : reverse_boot_times) {
+    for (const auto &destination_node_boot_uuid : destination_node.second) {
+      for (const auto &source_node : destination_node_boot_uuid.second) {
+        // Now, we need to take all the boots + times and put them in a list to
+        // sort and derive constraints from.
+
+        std::vector<std::pair<std::string, BootPairTimes>>
+            destination_boot_times;
+        for (const auto &source_boot_uuid : source_node.second) {
+          destination_boot_times.emplace_back(source_boot_uuid);
+        }
+
+        std::sort(
+            destination_boot_times.begin(), destination_boot_times.end(),
+            [](const std::pair<std::string, BootPairTimes> &a,
+               const std::pair<std::string, BootPairTimes> &b) {
+              // There are cases where we will only have a reliable timestamp.
+              // In that case, we need to use oldest_remote_monotonic_timestamp.
+              // But, that may result in collisions if the same message gets
+              // forwarded to both boots, so it will have the same timestamp.
+              // Solve that by breaking the tie with the unreliable messages.
+              if (a.second.oldest_remote_monotonic_timestamp ==
+                  b.second.oldest_remote_monotonic_timestamp) {
+                CHECK_NE(a.second.oldest_remote_unreliable_monotonic_timestamp,
+                         b.second.oldest_remote_unreliable_monotonic_timestamp);
+                return a.second.oldest_remote_unreliable_monotonic_timestamp <
+                       b.second.oldest_remote_unreliable_monotonic_timestamp;
+              } else {
+                return a.second.oldest_remote_monotonic_timestamp <
+                       b.second.oldest_remote_monotonic_timestamp;
+              }
+            });
+
+        for (size_t boot_id = 0; boot_id < destination_boot_times.size();
+             ++boot_id) {
+          const std::pair<std::string, BootPairTimes> &boot_time =
+              destination_boot_times[boot_id];
+          const std::string &source_boot_uuid = boot_time.first;
+
+          auto destination_node_boot_constraints_it =
+              boot_constraints.find(source_node.first);
+          if (destination_node_boot_constraints_it == boot_constraints.end()) {
+            destination_node_boot_constraints_it =
+                boot_constraints
+                    .insert(std::make_pair(source_node.first, NodeBootState()))
+                    .first;
+          }
+
+          // Track that this boot happened.
+          destination_node_boot_constraints_it->second.boots.insert(
+              source_boot_uuid);
+          if (boot_id > 0) {
+            const std::pair<std::string, BootPairTimes> &prior_boot_time =
+                destination_boot_times[boot_id - 1];
+            const std::string &prior_boot_uuid = prior_boot_time.first;
+
+            std::map<std::string, std::vector<std::pair<std::string, bool>>>
+                &per_node_boot_constraints =
+                    destination_node_boot_constraints_it->second.constraints;
+
+            auto first_per_boot_constraints =
+                per_node_boot_constraints.find(prior_boot_uuid);
+            if (first_per_boot_constraints == per_node_boot_constraints.end()) {
+              first_per_boot_constraints =
+                  per_node_boot_constraints
+                      .insert(std::make_pair(
+                          prior_boot_uuid,
+                          std::vector<std::pair<std::string, bool>>()))
+                      .first;
+            }
+
+            auto second_per_boot_constraints =
+                per_node_boot_constraints.find(source_boot_uuid);
+            if (second_per_boot_constraints ==
+                per_node_boot_constraints.end()) {
+              second_per_boot_constraints =
+                  per_node_boot_constraints
+                      .insert(std::make_pair(
+                          source_boot_uuid,
+                          std::vector<std::pair<std::string, bool>>()))
+                      .first;
+            }
+
+            first_per_boot_constraints->second.emplace_back(
+                std::make_pair(source_boot_uuid, true));
+            second_per_boot_constraints->second.emplace_back(
+                std::make_pair(prior_boot_uuid, false));
+          }
+        }
+      }
+    }
+  }
+
   return boot_constraints;
 }
 
diff --git a/aos/events/logging/logger.fbs b/aos/events/logging/logger.fbs
index 6d9f7cb..eb70184 100644
--- a/aos/events/logging/logger.fbs
+++ b/aos/events/logging/logger.fbs
@@ -136,6 +136,9 @@
   // For all channels sent from a specific node, these vectors hold the
   // timestamp of the oldest known message from that node, and the
   // corresponding monotonic timestamp for when that was received on our node.
+  //
+  // The local node is the node that this log file is from the perspective of
+  // (field 6)
   corrupted_oldest_remote_monotonic_timestamps:[int64] (id: 20, deprecated);
   corrupted_oldest_local_monotonic_timestamps:[int64] (id: 21, deprecated);
   oldest_remote_monotonic_timestamps:[int64] (id: 24);
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 1f784d3..6d45413 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -2518,6 +2518,231 @@
   }
 }
 
+// Tests that we can sort a log which only has timestamps from the remote
+// because the local message_bridge_client failed to connect.
+TEST_P(MultinodeLoggerTest, RemoteRebootOnlyTimestamps) {
+  const UUID pi1_boot0 = UUID::Random();
+  const UUID pi2_boot0 = UUID::Random();
+  const UUID pi2_boot1 = UUID::Random();
+  {
+    CHECK_EQ(pi1_index_, 0u);
+    CHECK_EQ(pi2_index_, 1u);
+
+    time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
+    time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
+    time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
+
+    time_converter_.AddNextTimestamp(
+        distributed_clock::epoch(),
+        {BootTimestamp::epoch(), BootTimestamp::epoch()});
+    const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
+    time_converter_.AddNextTimestamp(
+        distributed_clock::epoch() + reboot_time,
+        {BootTimestamp::epoch() + reboot_time,
+         BootTimestamp{
+             .boot = 1,
+             .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
+  }
+  pi2_->Disconnect(pi1_->node());
+
+  std::vector<std::string> filenames;
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+    EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
+              pi1_boot0);
+    EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
+              pi2_boot0);
+
+    StartLogger(&pi1_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(10000));
+
+    VLOG(1) << "Reboot now!";
+
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+    EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
+              pi1_boot0);
+    EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
+              pi2_boot1);
+    pi1_logger.AppendAllFilenames(&filenames);
+  }
+
+  // Confirm that our new oldest timestamps properly update as we reboot and
+  // rotate.
+  size_t timestamp_file_count = 0;
+  for (const std::string &file : filenames) {
+    std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
+        ReadHeader(file);
+    CHECK(log_header);
+
+    if (log_header->message().has_configuration()) {
+      continue;
+    }
+
+    const monotonic_clock::time_point monotonic_start_time =
+        monotonic_clock::time_point(
+            chrono::nanoseconds(log_header->message().monotonic_start_time()));
+    const UUID source_node_boot_uuid = UUID::FromString(
+        log_header->message().source_node_boot_uuid()->string_view());
+
+    ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
+    ASSERT_EQ(
+        log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
+    ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
+    ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
+              2u);
+    ASSERT_TRUE(log_header->message()
+                    .has_oldest_remote_unreliable_monotonic_timestamps());
+    ASSERT_EQ(log_header->message()
+                  .oldest_remote_unreliable_monotonic_timestamps()
+                  ->size(),
+              2u);
+    ASSERT_TRUE(log_header->message()
+                    .has_oldest_local_unreliable_monotonic_timestamps());
+    ASSERT_EQ(log_header->message()
+                  .oldest_local_unreliable_monotonic_timestamps()
+                  ->size(),
+              2u);
+
+    if (log_header->message().node()->name()->string_view() != "pi1") {
+      ASSERT_TRUE(file.find("aos.message_bridge.RemoteMessage") !=
+                  std::string::npos);
+
+      const std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
+          ReadNthMessage(file, 0);
+      CHECK(msg);
+
+      EXPECT_TRUE(msg->message().has_monotonic_sent_time());
+      EXPECT_TRUE(msg->message().has_monotonic_remote_time());
+
+      const monotonic_clock::time_point
+          expected_oldest_local_monotonic_timestamps(
+              chrono::nanoseconds(msg->message().monotonic_sent_time()));
+      const monotonic_clock::time_point
+          expected_oldest_remote_monotonic_timestamps(
+              chrono::nanoseconds(msg->message().monotonic_remote_time()));
+
+      EXPECT_NE(expected_oldest_local_monotonic_timestamps,
+                monotonic_clock::min_time);
+      EXPECT_NE(expected_oldest_remote_monotonic_timestamps,
+                monotonic_clock::min_time);
+
+      ++timestamp_file_count;
+      // Since the log file is from the perspective of the other node,
+      const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
+          monotonic_clock::time_point(chrono::nanoseconds(
+              log_header->message().oldest_remote_monotonic_timestamps()->Get(
+                  0)));
+      const monotonic_clock::time_point oldest_local_monotonic_timestamps =
+          monotonic_clock::time_point(chrono::nanoseconds(
+              log_header->message().oldest_local_monotonic_timestamps()->Get(
+                  0)));
+      const monotonic_clock::time_point
+          oldest_remote_unreliable_monotonic_timestamps =
+              monotonic_clock::time_point(chrono::nanoseconds(
+                  log_header->message()
+                      .oldest_remote_unreliable_monotonic_timestamps()
+                      ->Get(0)));
+      const monotonic_clock::time_point
+          oldest_local_unreliable_monotonic_timestamps =
+              monotonic_clock::time_point(chrono::nanoseconds(
+                  log_header->message()
+                      .oldest_local_unreliable_monotonic_timestamps()
+                      ->Get(0)));
+      // Confirm that the oldest timestamps match what we expect.  Based on what
+      // we are doing, we know that the oldest time is the first message's time.
+      //
+      // This makes the test robust to both the split and combined config tests.
+      switch (log_header->message().parts_index()) {
+        case 0:
+        case 1:
+          EXPECT_EQ(oldest_remote_monotonic_timestamps,
+                    expected_oldest_remote_monotonic_timestamps);
+          EXPECT_EQ(oldest_local_monotonic_timestamps,
+                    expected_oldest_local_monotonic_timestamps);
+          EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+                    expected_oldest_remote_monotonic_timestamps);
+          EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
+                    expected_oldest_local_monotonic_timestamps);
+          break;
+        default:
+          FAIL();
+          break;
+      }
+
+      switch (log_header->message().parts_index()) {
+        case 0:
+          EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
+          EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
+          break;
+        case 1:
+          EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
+          EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
+          break;
+        default:
+          FAIL();
+          break;
+      }
+
+      continue;
+    }
+    EXPECT_EQ(
+        log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
+        monotonic_clock::max_time.time_since_epoch().count());
+    EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
+              monotonic_clock::max_time.time_since_epoch().count());
+    EXPECT_EQ(log_header->message()
+                  .oldest_remote_unreliable_monotonic_timestamps()
+                  ->Get(0),
+              monotonic_clock::max_time.time_since_epoch().count());
+    EXPECT_EQ(log_header->message()
+                  .oldest_local_unreliable_monotonic_timestamps()
+                  ->Get(0),
+              monotonic_clock::max_time.time_since_epoch().count());
+
+    const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
+        monotonic_clock::time_point(chrono::nanoseconds(
+            log_header->message().oldest_remote_monotonic_timestamps()->Get(
+                1)));
+    const monotonic_clock::time_point oldest_local_monotonic_timestamps =
+        monotonic_clock::time_point(chrono::nanoseconds(
+            log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
+    const monotonic_clock::time_point
+        oldest_remote_unreliable_monotonic_timestamps =
+            monotonic_clock::time_point(chrono::nanoseconds(
+                log_header->message()
+                    .oldest_remote_unreliable_monotonic_timestamps()
+                    ->Get(1)));
+    const monotonic_clock::time_point
+        oldest_local_unreliable_monotonic_timestamps =
+            monotonic_clock::time_point(chrono::nanoseconds(
+                log_header->message()
+                    .oldest_local_unreliable_monotonic_timestamps()
+                    ->Get(1)));
+    switch (log_header->message().parts_index()) {
+      case 0:
+        EXPECT_EQ(oldest_remote_monotonic_timestamps,
+                  monotonic_clock::max_time);
+        EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
+        EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+                  monotonic_clock::max_time);
+        EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
+                  monotonic_clock::max_time);
+        break;
+      default:
+        FAIL();
+        break;
+    }
+  }
+
+  EXPECT_TRUE(timestamp_file_count == 2u || timestamp_file_count == 4u);
+
+  // TODO(austin): Finish reading it.  We don't have a valid start time so
+  // log_reader gets rather grumpy.
+}
+
 // Tests that we properly handle one direction of message_bridge being
 // unavailable.
 TEST_P(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {