Track matching forwarded times for log file sorting

Previously, the only thing we knew was a loose ordering based on part
index.  That isn't very conclusive.  This now provides us with
corresponding timestamps for when things cross the network boundary in a
way that we think should tell us definitively what came first.

There are 2 main problem cases.  Let's say we have 2 channels.  /a which
is reliable, and /b which isn't, both sent from the same remote node.

case 1:  /a -> boot 0 received on boot 0.
         /b -> boot 1 received on boot 0.
 We start logging after both messages arrive.

case 2:  /a -> boot 0 received on boot 0.
         /b -> boot 0 received on boot 0.
 We log for a bit, then reboot.  More messages show up when we reconnect.
         /a -> boot 0 received on boot 1.
         /b -> boot 0 received on boot 1.

In case 1: we only have a reliable timestamp from boot 0, but that
reliable timestamp makes it clear that /a was before /b, so boot 0 was
before boot 1.

In case 2: we have the same reliable timestamp, so that tells us nothing.
The unreliable timestamps though tell a different story.  /b will be after
/a, since any messages on /b generated before the reboot won't get
delivered.  So, we get an ordering constraint saying that any sent /b's
on the second boot were after /b on the first boot.

We believe that any other cases are covered by the same mechanism.
Without fully implementing and debugging the sorting code, we won't know
for certain.  Sanjay and I have been unable to break the logic so far.

Change-Id: I990bf249b18bf43072997cdb099ac66c2fa8fc57
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/configuration.cc b/aos/configuration.cc
index 7fad440..24e15d9 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -1137,15 +1137,15 @@
 bool ConnectionDeliveryTimeIsLoggedOnNode(const Channel *channel,
                                           const Node *node,
                                           const Node *logger_node) {
-  const Connection *connection = ConnectionToNode(channel, node);
-  if (connection == nullptr) {
-    return false;
-  }
-  return ConnectionDeliveryTimeIsLoggedOnNode(connection, logger_node);
+  return ConnectionDeliveryTimeIsLoggedOnNode(ConnectionToNode(channel, node),
+                                              logger_node);
 }
 
 bool ConnectionDeliveryTimeIsLoggedOnNode(const Connection *connection,
                                           const Node *node) {
+  if (connection == nullptr) {
+    return false;
+  }
   switch (connection->timestamp_logger()) {
     case LoggerConfig::LOCAL_AND_REMOTE_LOGGER:
       CHECK(connection->has_timestamp_logger_nodes());
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index a4b15e8..93dd02d 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -25,9 +25,8 @@
       log_namer_(log_namer),
       reopen_(std::move(reopen)),
       close_(std::move(close)) {
-  boot_uuids_.resize(configuration::NodesCount(log_namer->configuration_),
-                     UUID::Zero());
-  CHECK_LT(node_index_, boot_uuids_.size());
+  state_.resize(configuration::NodesCount(log_namer->configuration_));
+  CHECK_LT(node_index_, state_.size());
   reopen_(this);
 }
 
@@ -54,14 +53,44 @@
   header_written_ = false;
 }
 
-void NewDataWriter::UpdateRemote(size_t remote_node_index,
-                                 const UUID &remote_node_boot_uuid) {
-  CHECK_LT(remote_node_index, boot_uuids_.size());
-  if (boot_uuids_[remote_node_index] != remote_node_boot_uuid) {
+void NewDataWriter::UpdateRemote(
+    const size_t remote_node_index, const UUID &remote_node_boot_uuid,
+    const monotonic_clock::time_point monotonic_remote_time,
+    const monotonic_clock::time_point monotonic_event_time,
+    const bool reliable) {
+  bool rotate = false;
+  CHECK_LT(remote_node_index, state_.size());
+  State &state = state_[remote_node_index];
+  if (state.boot_uuid != remote_node_boot_uuid) {
     VLOG(1) << filename() << " Remote " << remote_node_index << " updated to "
-              << remote_node_boot_uuid << " from "
-              << boot_uuids_[remote_node_index];
-    boot_uuids_[remote_node_index] = remote_node_boot_uuid;
+            << remote_node_boot_uuid << " from " << state.boot_uuid;
+    state.boot_uuid = remote_node_boot_uuid;
+    state.oldest_remote_monotonic_timestamp = monotonic_clock::max_time;
+    state.oldest_local_monotonic_timestamp = monotonic_clock::max_time;
+    state.oldest_remote_unreliable_monotonic_timestamp =
+        monotonic_clock::max_time;
+    state.oldest_local_unreliable_monotonic_timestamp =
+        monotonic_clock::max_time;
+    rotate = true;
+  }
+
+  if (!reliable) {
+    if (state.oldest_remote_unreliable_monotonic_timestamp >
+        monotonic_remote_time) {
+      state.oldest_remote_unreliable_monotonic_timestamp =
+          monotonic_remote_time;
+      state.oldest_local_unreliable_monotonic_timestamp = monotonic_event_time;
+      rotate = true;
+    }
+  }
+
+  if (state.oldest_remote_monotonic_timestamp > monotonic_remote_time) {
+    state.oldest_remote_monotonic_timestamp = monotonic_remote_time;
+    state.oldest_local_monotonic_timestamp = monotonic_event_time;
+    rotate = true;
+  }
+
+  if (rotate) {
     Rotate();
   }
 }
@@ -70,15 +99,15 @@
                                  const UUID &source_node_boot_uuid,
                                  aos::monotonic_clock::time_point now) {
   // TODO(austin): Handle remote nodes changing too, not just the source node.
-  if (boot_uuids_[node_index_] != source_node_boot_uuid) {
-    boot_uuids_[node_index_] = source_node_boot_uuid;
+  if (state_[node_index_].boot_uuid != source_node_boot_uuid) {
+    state_[node_index_].boot_uuid = source_node_boot_uuid;
     if (header_written_) {
       Reboot();
     }
 
     QueueHeader(MakeHeader());
   }
-  CHECK_EQ(boot_uuids_[node_index_], source_node_boot_uuid);
+  CHECK_EQ(state_[node_index_].boot_uuid, source_node_boot_uuid);
   CHECK(header_written_) << ": Attempting to write message before header to "
                          << writer->filename();
   writer->QueueSizedFlatbuffer(fbb, now);
@@ -88,14 +117,14 @@
 NewDataWriter::MakeHeader() {
   const size_t logger_node_index = log_namer_->logger_node_index();
   const UUID &logger_node_boot_uuid = log_namer_->logger_node_boot_uuid();
-  if (boot_uuids_[logger_node_index] == UUID::Zero()) {
+  if (state_[logger_node_index].boot_uuid == UUID::Zero()) {
     VLOG(1) << filename() << " Logger node is " << logger_node_index
             << " and uuid is " << logger_node_boot_uuid;
-    boot_uuids_[logger_node_index] = logger_node_boot_uuid;
+    state_[logger_node_index].boot_uuid = logger_node_boot_uuid;
   } else {
-    CHECK_EQ(boot_uuids_[logger_node_index], logger_node_boot_uuid);
+    CHECK_EQ(state_[logger_node_index].boot_uuid, logger_node_boot_uuid);
   }
-  return log_namer_->MakeHeader(node_index_, boot_uuids_, parts_uuid(),
+  return log_namer_->MakeHeader(node_index_, state_, parts_uuid(),
                                 parts_index_);
 }
 
@@ -104,7 +133,7 @@
   CHECK(!header_written_) << ": Attempting to write duplicate header to "
                           << writer->filename();
   CHECK(header.message().has_source_node_boot_uuid());
-  CHECK_EQ(boot_uuids_[node_index_],
+  CHECK_EQ(state_[node_index_].boot_uuid,
            UUID::FromString(header.message().source_node_boot_uuid()));
   // TODO(austin): This triggers a dummy allocation that we don't need as part
   // of releasing.  Can we skip it?
@@ -120,12 +149,12 @@
 }
 
 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> LogNamer::MakeHeader(
-    size_t node_index, const std::vector<UUID> &boot_uuids,
+    size_t node_index, const std::vector<NewDataWriter::State> &state,
     const UUID &parts_uuid, int parts_index) const {
-  const UUID &source_node_boot_uuid = boot_uuids[node_index];
+  const UUID &source_node_boot_uuid = state[node_index].boot_uuid;
   const Node *const source_node =
       configuration::GetNode(configuration_, node_index);
-  CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 20u);
+  CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 24u);
   flatbuffers::FlatBufferBuilder fbb;
   fbb.ForceDefaults(true);
 
@@ -181,10 +210,10 @@
   }
 
   std::vector<flatbuffers::Offset<flatbuffers::String>> boot_uuid_offsets;
-  boot_uuid_offsets.reserve(boot_uuids.size());
-  for (const UUID &uuid : boot_uuids) {
-    if (uuid != UUID::Zero()) {
-      boot_uuid_offsets.emplace_back(uuid.PackString(&fbb));
+  boot_uuid_offsets.reserve(state.size());
+  for (const NewDataWriter::State &state : state) {
+    if (state.boot_uuid != UUID::Zero()) {
+      boot_uuid_offsets.emplace_back(state.boot_uuid.PackString(&fbb));
     } else {
       boot_uuid_offsets.emplace_back(fbb.CreateString(""));
     }
@@ -194,6 +223,43 @@
       flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
       boot_uuids_offset = fbb.CreateVector(boot_uuid_offsets);
 
+  int64_t *oldest_remote_monotonic_timestamps;
+  flatbuffers::Offset<flatbuffers::Vector<int64_t>>
+      oldest_remote_monotonic_timestamps_offset = fbb.CreateUninitializedVector(
+          state.size(), &oldest_remote_monotonic_timestamps);
+
+  int64_t *oldest_local_monotonic_timestamps;
+  flatbuffers::Offset<flatbuffers::Vector<int64_t>>
+      oldest_local_monotonic_timestamps_offset = fbb.CreateUninitializedVector(
+          state.size(), &oldest_local_monotonic_timestamps);
+
+  int64_t *oldest_remote_unreliable_monotonic_timestamps;
+  flatbuffers::Offset<flatbuffers::Vector<int64_t>>
+      oldest_remote_unreliable_monotonic_timestamps_offset =
+          fbb.CreateUninitializedVector(
+              state.size(), &oldest_remote_unreliable_monotonic_timestamps);
+
+  int64_t *oldest_local_unreliable_monotonic_timestamps;
+  flatbuffers::Offset<flatbuffers::Vector<int64_t>>
+      oldest_local_unreliable_monotonic_timestamps_offset =
+          fbb.CreateUninitializedVector(
+              state.size(), &oldest_local_unreliable_monotonic_timestamps);
+
+  for (size_t i = 0; i < state.size(); ++i) {
+    oldest_remote_monotonic_timestamps[i] =
+        state[i].oldest_remote_monotonic_timestamp.time_since_epoch().count();
+    oldest_local_monotonic_timestamps[i] =
+        state[i].oldest_local_monotonic_timestamp.time_since_epoch().count();
+    oldest_remote_unreliable_monotonic_timestamps[i] =
+        state[i]
+            .oldest_remote_unreliable_monotonic_timestamp.time_since_epoch()
+            .count();
+    oldest_local_unreliable_monotonic_timestamps[i] =
+        state[i]
+            .oldest_local_unreliable_monotonic_timestamp.time_since_epoch()
+            .count();
+  }
+
   aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
 
   log_file_header_builder.add_name(name_offset);
@@ -263,6 +329,14 @@
       std::chrono::duration_cast<std::chrono::nanoseconds>(
           event_loop_->realtime_now().time_since_epoch())
           .count());
+  log_file_header_builder.add_oldest_remote_monotonic_timestamps(
+      oldest_remote_monotonic_timestamps_offset);
+  log_file_header_builder.add_oldest_local_monotonic_timestamps(
+      oldest_local_monotonic_timestamps_offset);
+  log_file_header_builder.add_oldest_remote_unreliable_monotonic_timestamps(
+      oldest_remote_unreliable_monotonic_timestamps_offset);
+  log_file_header_builder.add_oldest_local_unreliable_monotonic_timestamps(
+      oldest_local_unreliable_monotonic_timestamps_offset);
   fbb.FinishSizePrefixed(log_file_header_builder.Finish());
   aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
       fbb.Release());
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index d9f42db..8a44c97 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -44,14 +44,10 @@
   // Rotates the log file, delaying writing the new header until data arrives.
   void Rotate();
 
-  // TODO(austin): Copy header and add all UUIDs and such when available
-  // whenever data is written.
-  //
-  // TODO(austin): Add known timestamps for each node every time we cycle a log
-  // for sorting.
-
-  void UpdateRemote(size_t remote_node_index,
-                    const UUID &remote_node_boot_uuid);
+  void UpdateRemote(size_t remote_node_index, const UUID &remote_node_boot_uuid,
+                    monotonic_clock::time_point monotonic_remote_time,
+                    monotonic_clock::time_point monotonic_event_time,
+                    bool reliable);
   // Queues up a message with the provided boot UUID.
   void QueueMessage(flatbuffers::FlatBufferBuilder *fbb,
                     const UUID &node_boot_uuid,
@@ -69,6 +65,28 @@
   size_t parts_index() const { return parts_index_; }
   const Node *node() const { return node_; }
 
+  // Datastructure used to capture all the information about a remote node.
+  struct State {
+    // Boot UUID of the node.
+    UUID boot_uuid = UUID::Zero();
+    // Timestamp on the remote monotonic clock of the oldest message sent to
+    // node_index_.
+    monotonic_clock::time_point oldest_remote_monotonic_timestamp =
+        monotonic_clock::max_time;
+    // Timestamp on the local monotonic clock of the message in
+    // oldest_remote_monotonic_timestamp.
+    monotonic_clock::time_point oldest_local_monotonic_timestamp =
+        monotonic_clock::max_time;
+    // Timestamp on the remote monotonic clock of the oldest message sent to
+    // node_index_, excluding messages forwarded with time_to_live() == 0.
+    monotonic_clock::time_point oldest_remote_unreliable_monotonic_timestamp =
+        monotonic_clock::max_time;
+    // Timestamp on the local monotonic clock of the message in
+    // oldest_local_unreliable_monotonic_timestamp.
+    monotonic_clock::time_point oldest_local_unreliable_monotonic_timestamp =
+        monotonic_clock::max_time;
+  };
+
  private:
   // Signals that a node has rebooted.
   void Reboot();
@@ -88,7 +106,7 @@
   std::function<void(NewDataWriter *)> close_;
   bool header_written_ = false;
 
-  std::vector<UUID> boot_uuids_;
+  std::vector<State> state_;
 };
 
 // Interface describing how to name, track, and add headers to log file parts.
@@ -185,7 +203,7 @@
   // Creates a new header by copying fields out of the template and combining
   // them with the arguments provided.
   aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
-      size_t node_index, const std::vector<UUID> &boot_uuids,
+      size_t node_index, const std::vector<NewDataWriter::State> &state,
       const UUID &parts_uuid, int parts_index) const;
 
   EventLoop *event_loop_;
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index 4bc4448..bc04f83 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -92,8 +92,19 @@
 
     bool log_delivery_times = false;
     if (event_loop_->node() != nullptr) {
+      const aos::Connection *connection =
+          configuration::ConnectionToNode(channel, event_loop_->node());
+
       log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
-          channel, event_loop_->node(), event_loop_->node());
+          connection, event_loop_->node());
+
+      CHECK_EQ(log_delivery_times,
+               configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+                   channel, event_loop_->node(), event_loop_->node()));
+
+      if (connection) {
+        fs.reliable_forwarding = (connection->time_to_live() == 0);
+      }
     }
 
     // Now, detect a RemoteMessage timestamp logger where we should just log the
@@ -620,8 +631,10 @@
                            fbb.GetBufferPointer()));
 
         // Tell our writer that we know something about the remote boot.
-        f.timestamp_writer->UpdateRemote(f.data_node_index,
-                                         f.fetcher->context().source_boot_uuid);
+        f.timestamp_writer->UpdateRemote(
+            f.data_node_index, f.fetcher->context().source_boot_uuid,
+            f.fetcher->context().monotonic_remote_time,
+            f.fetcher->context().monotonic_event_time, f.reliable_forwarding);
         f.timestamp_writer->QueueMessage(&fbb, event_loop_->boot_uuid(), end);
       }
 
diff --git a/aos/events/logging/log_writer.h b/aos/events/logging/log_writer.h
index f624028..9f55edd 100644
--- a/aos/events/logging/log_writer.h
+++ b/aos/events/logging/log_writer.h
@@ -194,6 +194,9 @@
     int timestamp_node_index = -1;
     // Node that the contents this contents_writer will log are from.
     int contents_node_index = -1;
+
+    // If true, this message is being sent over a reliable channel.
+    bool reliable_forwarding = false;
   };
 
   // Vector mapping from the channel index from the event loop to the logged
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 1821039..e27cecb 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -33,7 +33,7 @@
 }
 
 bool ConfigOnly(const LogFileHeader *header) {
-  CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 20u);
+  CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 24u);
   if (header->has_monotonic_start_time()) return false;
   if (header->has_realtime_start_time()) return false;
   if (header->has_max_out_of_order_duration()) return false;
@@ -53,6 +53,10 @@
   if (header->has_boot_uuids()) return false;
   if (header->has_logger_part_monotonic_start_time()) return false;
   if (header->has_logger_part_realtime_start_time()) return false;
+  if (header->has_oldest_remote_monotonic_timestamps()) return false;
+  if (header->has_oldest_local_monotonic_timestamps()) return false;
+  if (header->has_oldest_remote_unreliable_monotonic_timestamps()) return false;
+  if (header->has_oldest_local_unreliable_monotonic_timestamps()) return false;
 
   return header->has_configuration();
 }
diff --git a/aos/events/logging/logger.fbs b/aos/events/logging/logger.fbs
index 23da360..c71baad 100644
--- a/aos/events/logging/logger.fbs
+++ b/aos/events/logging/logger.fbs
@@ -67,7 +67,8 @@
   source_node_boot_uuid: string (id: 13);
 
   // Timestamps that this logfile started at on the logger's clocks.  This is
-  // mostly useful when trying to deduce the order of node reboots.
+  // mostly useful when trying to deduce the order of node reboots.  These
+  // timestamps don't change on reboot, so they can't be used reliably.
   logger_monotonic_start_time:int64 = -9223372036854775808 (id: 14);
   logger_realtime_start_time:int64 = -9223372036854775808 (id: 15);
 
@@ -95,12 +96,53 @@
   logger_node:Node (id: 9);
 
   // The boot UUIDs for all nodes we know them for, or "" for the ones we don't.
-  // TODO(austin): Actually add this in the log writer.
   boot_uuids:[string] (id: 17);
 
   // Timestamps that the header on this part file was written on the logger node.
   logger_part_monotonic_start_time:int64 = -9223372036854775808 (id: 18);
   logger_part_realtime_start_time:int64 = -9223372036854775808 (id: 19);
+
+  // These timestamps provide summary information about the oldest messages we
+  // know which crossed the network.  The goal is to enable log file sorting
+  // to determine the order of all boots by observing corresponding times
+  // across the network and using those to determine constraints so we can sort
+  // a DAG.
+  //
+  // There are 2 main problem cases.  Let's say we have 2 channels.  /a which
+  // is reliable, and /b which isn't, both sent from the same remote node.
+  //
+  // case 1:  /a -> boot 0 received on boot 0.
+  //          /b -> boot 1 received on boot 0.
+  //  We start logging after both messages arrive.
+  //
+  // case 2:  /a -> boot 0 received on boot 0.
+  //          /b -> boot 0 received on boot 0.
+  //  We log for a bit, then reboot.  More messages show up when we reconnect.
+  //          /a -> boot 0 received on boot 1.
+  //          /b -> boot 0 received on boot 1.
+  //
+  // In case 1: we only have a reliable timestamp from boot 0, but that
+  // reliable timestamp makes it clear that /a was before /b, so boot 0 was
+  // before boot 1.
+  //
+  // In case 2: we have the same reliable timestamp, so that tells us nothing.
+  // The unreliable timestamps though tell a different story.  /b will be after
+  // /a, since any messages on /b generated before the reboot won't get
+  // delivered.  So, we get an ordering constraint saying that any sent /b's
+  // on the second boot were after /b on the first boot.
+  //
+  // We believe that any other cases are covered by the same mechanism.
+  //
+  // For all channels sent from a specific node, these vectors hold the
+  // timestamp of the oldest known message from that node, and the
+  // corresponding monotonic timestamp for when that was received on our node.
+  oldest_remote_monotonic_timestamps:[int64] (id: 20);
+  oldest_local_monotonic_timestamps:[int64] (id: 21);
+
+  // For all channels *excluding* the reliable channels (ttl == 0), record the
+  // same quantity.
+  oldest_remote_unreliable_monotonic_timestamps:[int64] (id: 22);
+  oldest_local_unreliable_monotonic_timestamps:[int64] (id: 23);
 }
 
 // Table holding a message.
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index a439f84..8cad486 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -558,6 +558,7 @@
     result.emplace_back(logfile_base1_ + "_pi1_data.part0.bfbs");
     result.emplace_back(logfile_base1_ + "_pi1_data.part1.bfbs");
     result.emplace_back(logfile_base1_ + "_pi1_data.part2.bfbs");
+    result.emplace_back(logfile_base1_ + "_pi1_data.part3.bfbs");
     result.emplace_back(logfile_base1_ +
                         "_pi2_data/test/aos.examples.Pong.part0.bfbs");
     result.emplace_back(logfile_base1_ +
@@ -1527,7 +1528,8 @@
     event_loop_factory_.RunFor(logger_run3);
   }
 
-  LogReader reader(SortParts(logfiles_));
+  LogReader reader(
+      SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 3, 2)));
 
   SimulatedEventLoopFactory log_reader_factory(reader.configuration());
   log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -2268,6 +2270,119 @@
     event_loop_factory_.RunFor(chrono::milliseconds(20000));
   }
 
+  // Confirm that our new oldest timestamps properly update as we reboot and
+  // rotate.
+  for (const std::string &file : pi1_reboot_logfiles_) {
+    std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
+        ReadHeader(file);
+    CHECK(log_header);
+    if (log_header->message().has_configuration()) {
+      continue;
+    }
+
+    if (log_header->message().node()->name()->string_view() != "pi1") {
+      continue;
+    }
+    SCOPED_TRACE(file);
+    SCOPED_TRACE(aos::FlatbufferToJson(
+        *log_header, {.multi_line = true, .max_vector_size = 100}));
+    ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
+    ASSERT_EQ(
+        log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
+    EXPECT_EQ(
+        log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
+        monotonic_clock::max_time.time_since_epoch().count());
+    ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
+    ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
+              2u);
+    EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
+              monotonic_clock::max_time.time_since_epoch().count());
+    ASSERT_TRUE(log_header->message()
+                    .has_oldest_remote_unreliable_monotonic_timestamps());
+    ASSERT_EQ(log_header->message()
+                  .oldest_remote_unreliable_monotonic_timestamps()
+                  ->size(),
+              2u);
+    EXPECT_EQ(log_header->message()
+                  .oldest_remote_unreliable_monotonic_timestamps()
+                  ->Get(0),
+              monotonic_clock::max_time.time_since_epoch().count());
+    ASSERT_TRUE(log_header->message()
+                    .has_oldest_local_unreliable_monotonic_timestamps());
+    ASSERT_EQ(log_header->message()
+                  .oldest_local_unreliable_monotonic_timestamps()
+                  ->size(),
+              2u);
+    EXPECT_EQ(log_header->message()
+                  .oldest_local_unreliable_monotonic_timestamps()
+                  ->Get(0),
+              monotonic_clock::max_time.time_since_epoch().count());
+
+    const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
+        monotonic_clock::time_point(chrono::nanoseconds(
+            log_header->message().oldest_remote_monotonic_timestamps()->Get(
+                1)));
+    const monotonic_clock::time_point oldest_local_monotonic_timestamps =
+        monotonic_clock::time_point(chrono::nanoseconds(
+            log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
+    const monotonic_clock::time_point
+        oldest_remote_unreliable_monotonic_timestamps =
+            monotonic_clock::time_point(chrono::nanoseconds(
+                log_header->message()
+                    .oldest_remote_unreliable_monotonic_timestamps()
+                    ->Get(1)));
+    const monotonic_clock::time_point
+        oldest_local_unreliable_monotonic_timestamps =
+            monotonic_clock::time_point(chrono::nanoseconds(
+                log_header->message()
+                    .oldest_local_unreliable_monotonic_timestamps()
+                    ->Get(1)));
+    switch (log_header->message().parts_index()) {
+      case 0:
+        EXPECT_EQ(oldest_remote_monotonic_timestamps,
+                  monotonic_clock::max_time);
+        EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
+        EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+                  monotonic_clock::max_time);
+        EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
+                  monotonic_clock::max_time);
+        break;
+      case 1:
+        EXPECT_EQ(oldest_remote_monotonic_timestamps,
+                  monotonic_clock::time_point(chrono::microseconds(90200)));
+        EXPECT_EQ(oldest_local_monotonic_timestamps,
+                  monotonic_clock::time_point(chrono::microseconds(90350)));
+        EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+                  monotonic_clock::time_point(chrono::microseconds(90200)));
+        EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
+                  monotonic_clock::time_point(chrono::microseconds(90350)));
+        break;
+      case 2:
+        EXPECT_EQ(oldest_remote_monotonic_timestamps,
+                  monotonic_clock::time_point(chrono::microseconds(10100000)));
+        EXPECT_EQ(oldest_local_monotonic_timestamps,
+                  monotonic_clock::time_point(chrono::microseconds(10100150)));
+        EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+                  monotonic_clock::max_time);
+        EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
+                  monotonic_clock::max_time);
+        break;
+      case 3:
+        EXPECT_EQ(oldest_remote_monotonic_timestamps,
+                  monotonic_clock::time_point(chrono::microseconds(10100000)));
+        EXPECT_EQ(oldest_local_monotonic_timestamps,
+                  monotonic_clock::time_point(chrono::microseconds(10100150)));
+        EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+                  monotonic_clock::time_point(chrono::microseconds(10100200)));
+        EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
+                  monotonic_clock::time_point(chrono::microseconds(10100350)));
+        break;
+      default:
+        FAIL();
+        break;
+    }
+  }
+
   // Confirm that we refuse to replay logs with missing boot uuids.
   EXPECT_DEATH(
       {