Add node_boots parsing from log file headers

Jim suggested that we could store the boot UUID in the header for all
the nodes rather than per message.  That seems reasonable and lets us do
other things with it in the future.  Use it when available to determine
which boot happens when.

When we only have 1 boot for a node, fall back to setting the boot count
to 0 for sorting.  This lets us get started with the new boot counter.

A follow-up commit will start logging with this.

Change-Id: Ia9e0bd2860de82f70f0daa02782d06f1fd4f4218
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 86d3930..118ddf7 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -33,7 +33,7 @@
 }
 
 bool ConfigOnly(const LogFileHeader *header) {
-  CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 17u);
+  CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 18u);
   if (header->has_monotonic_start_time()) return false;
   if (header->has_realtime_start_time()) return false;
   if (header->has_max_out_of_order_duration()) return false;
@@ -50,6 +50,7 @@
   if (header->has_parts_uuid()) return false;
   if (header->has_parts_index()) return false;
   if (header->has_logger_node()) return false;
+  if (header->has_boot_uuids()) return false;
 
   return header->has_configuration();
 }
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index cb22130..89be828 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -516,7 +516,52 @@
 }
 
 PartsMessageReader::PartsMessageReader(LogParts log_parts)
-    : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {}
+    : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
+  ComputeBootCounts();
+}
+
+void PartsMessageReader::ComputeBootCounts() {
+  boot_counts_.assign(configuration::NodesCount(parts_.config.get()),
+                      std::nullopt);
+
+  // We have 3 vintages of log files with different amounts of information.
+  if (log_file_header()->has_boot_uuids()) {
+    // The new hotness with the boots explicitly listed out.  We can use the log
+    // file header to compute the boot count of all relevant nodes.
+    CHECK_EQ(log_file_header()->boot_uuids()->size(), boot_counts_.size());
+    size_t node_index = 0;
+    for (const flatbuffers::String *boot_uuid :
+         *log_file_header()->boot_uuids()) {
+      CHECK(parts_.boots);
+      if (boot_uuid->size() != 0) {
+        auto it = parts_.boots->boot_count_map.find(boot_uuid->str());
+        if (it != parts_.boots->boot_count_map.end()) {
+          boot_counts_[node_index] = it->second;
+        }
+      } else if (parts().boots->boots[node_index].size() == 1u) {
+        boot_counts_[node_index] = 0;
+      }
+      ++node_index;
+    }
+  } else {
+    // Older multi-node logs which are guarenteed to have UUIDs logged, or
+    // single node log files with boot UUIDs in the header.  We only know how to
+    // order certain boots in certain circumstances.
+    if (configuration::MultiNode(parts_.config.get()) || parts_.boots) {
+      for (size_t node_index = 0; node_index < boot_counts_.size();
+           ++node_index) {
+        CHECK(parts_.boots);
+        if (parts().boots->boots[node_index].size() == 1u) {
+          boot_counts_[node_index] = 0;
+        }
+      }
+    } else {
+      // Really old single node logs without any UUIDs.  They can't reboot.
+      CHECK_EQ(boot_counts_.size(), 1u);
+      boot_counts_[0] = 0u;
+    }
+  }
+}
 
 std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
 PartsMessageReader::ReadMessage() {
@@ -557,6 +602,7 @@
     return;
   }
   message_reader_ = MessageReader(parts_.parts[next_part_index_]);
+  ComputeBootCounts();
   ++next_part_index_;
 }
 
@@ -625,7 +671,9 @@
 }
 
 LogPartsSorter::LogPartsSorter(LogParts log_parts)
-    : parts_message_reader_(log_parts) {}
+    : parts_message_reader_(log_parts),
+      source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
+}
 
 Message *LogPartsSorter::Front() {
   // Queue up data until enough data has been queued that the front message is
@@ -633,7 +681,8 @@
   // sure the nothing path is checked quickly.
   if (sorted_until() != monotonic_clock::max_time) {
     while (true) {
-      if (!messages_.empty() && messages_.begin()->timestamp.time < sorted_until() &&
+      if (!messages_.empty() &&
+          messages_.begin()->timestamp.time < sorted_until() &&
           sorted_until() >= monotonic_start_time()) {
         break;
       }
@@ -646,6 +695,20 @@
         break;
       }
 
+      size_t monotonic_timestamp_boot = 0;
+      if (m.value().message().has_monotonic_timestamp_time()) {
+        monotonic_timestamp_boot = parts().logger_boot_count;
+      }
+      size_t monotonic_remote_boot = 0xffffff;
+
+      if (m.value().message().has_monotonic_remote_time()) {
+        std::optional<size_t> boot = parts_message_reader_.boot_count(
+            source_node_index_[m->message().channel_index()]);
+        CHECK(boot) << ": Failed to find boot for node "
+                    << source_node_index_[m->message().channel_index()];
+        monotonic_remote_boot = *boot;
+      }
+
       messages_.insert(Message{
           .channel_index = m.value().message().channel_index(),
           .queue_index = m.value().message().queue_index(),
@@ -654,6 +717,8 @@
                   .boot = parts().boot_count,
                   .time = monotonic_clock::time_point(std::chrono::nanoseconds(
                       m.value().message().monotonic_sent_time()))},
+          .monotonic_remote_boot = monotonic_remote_boot,
+          .monotonic_timestamp_boot = monotonic_timestamp_boot,
           .data = std::move(m.value())});
 
       // Now, update sorted_until_ to match the new message.
@@ -802,9 +867,7 @@
 
   // Now, we need to split things out by boot.
   for (size_t i = 0; i < files.size(); ++i) {
-    LOG(INFO) << "Trying file " << i;
     const size_t boot_count = files[i].boot_count;
-    LOG(INFO) << "Boot count " << boot_count;
     if (boot_count + 1 > boots.size()) {
       boots.resize(boot_count + 1);
     }
@@ -813,9 +876,9 @@
 
   node_mergers_.reserve(boots.size());
   for (size_t i = 0; i < boots.size(); ++i) {
-    LOG(INFO) << "Boot " << i;
+    VLOG(2) << "Boot " << i;
     for (auto &p : boots[i]) {
-      LOG(INFO) << "Part " << p;
+      VLOG(2) << "Part " << p;
     }
     node_mergers_.emplace_back(
         std::make_unique<NodeMerger>(std::move(boots[i])));
@@ -1005,14 +1068,15 @@
             std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
         .remote_queue_index = m->data.message().remote_queue_index(),
         .monotonic_remote_time =
-            // TODO(austin): 0 is wrong...
-        {0, monotonic_clock::time_point(std::chrono::nanoseconds(
-                m->data.message().monotonic_remote_time()))},
+            {m->monotonic_remote_boot,
+             monotonic_clock::time_point(std::chrono::nanoseconds(
+                 m->data.message().monotonic_remote_time()))},
         .realtime_remote_time = realtime_clock::time_point(
             std::chrono::nanoseconds(m->data.message().realtime_remote_time())),
         .monotonic_timestamp_time =
-            {0, monotonic_clock::time_point(std::chrono::nanoseconds(
-                    m->data.message().monotonic_timestamp_time()))},
+            {m->monotonic_timestamp_boot,
+             monotonic_clock::time_point(std::chrono::nanoseconds(
+                 m->data.message().monotonic_timestamp_time()))},
         .data = std::move(data.data)});
     CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
     last_message_time_ = matched_messages_.back().monotonic_event_time;
@@ -1078,7 +1142,7 @@
   CHECK(message.data.message().has_realtime_remote_time());
 
   const BootTimestamp monotonic_remote_time{
-      .boot = 0,
+      .boot = message.monotonic_remote_boot,
       .time = monotonic_clock::time_point(std::chrono::nanoseconds(
           message.data.message().monotonic_remote_time()))};
   const realtime_clock::time_point realtime_remote_time(
@@ -1096,6 +1160,8 @@
         .channel_index = message.channel_index,
         .queue_index = remote_queue_index,
         .timestamp = monotonic_remote_time,
+        .monotonic_remote_boot = 0xffffff,
+        .monotonic_timestamp_boot = 0xffffff,
         .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
   }
 
@@ -1110,6 +1176,8 @@
         .channel_index = message.channel_index,
         .queue_index = remote_queue_index,
         .timestamp = monotonic_remote_time,
+        .monotonic_remote_boot = 0xffffff,
+        .monotonic_timestamp_boot = 0xffffff,
         .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
   }
 
@@ -1119,6 +1187,8 @@
         .channel_index = message.channel_index,
         .queue_index = remote_queue_index,
         .timestamp = monotonic_remote_time,
+        .monotonic_remote_boot = 0xffffff,
+        .monotonic_timestamp_boot = 0xffffff,
         .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
   }
 
@@ -1153,6 +1223,8 @@
           .channel_index = message.channel_index,
           .queue_index = remote_queue_index,
           .timestamp = monotonic_remote_time,
+          .monotonic_remote_boot = 0xffffff,
+          .monotonic_timestamp_boot = 0xffffff,
           .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
     }
 
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index f876d25..f31f4df 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -334,10 +334,19 @@
   // Note: reading the next message may change the max_out_of_order_duration().
   std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadMessage();
 
+  // Returns the boot count for the requested node, or std::nullopt if we don't
+  // know.
+  std::optional<size_t> boot_count(size_t node_index) const {
+    CHECK_GE(node_index, 0u);
+    CHECK_LT(node_index, boot_counts_.size());
+    return boot_counts_[node_index];
+  }
+
  private:
   // Opens the next log and updates message_reader_.  Sets done_ if there is
   // nothing more to do.
   void NextLog();
+  void ComputeBootCounts();
 
   const LogParts parts_;
   size_t next_part_index_ = 1u;
@@ -354,6 +363,9 @@
   bool after_start_ = false;
 
   monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
+
+  // Per node boot counts.
+  std::vector<std::optional<size_t>> boot_counts_;
 };
 
 // Struct to hold a message as it gets sorted on a single node.
@@ -365,6 +377,11 @@
   // The local timestamp.
   BootTimestamp timestamp;
 
+  // Remote boot when this is a timestamp.
+  size_t monotonic_remote_boot = 0xffffff;
+
+  size_t monotonic_timestamp_boot = 0xffffff;
+
   // The data (either a timestamp header, or a data header).
   SizePrefixedFlatbufferVector<MessageHeader> data;
 
@@ -437,6 +454,10 @@
   // Set used for efficient sorting of messages.  We can benchmark and evaluate
   // other data structures if this proves to be the bottleneck.
   absl::btree_set<Message> messages_;
+
+  // Mapping from channel to source node.
+  // TODO(austin): Should we put this in Boots so it can be cached for everyone?
+  std::vector<size_t> source_node_index_;
 };
 
 // Class to run merge sort on the messages from multiple LogPartsSorter
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 4973d02..e99db94 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -228,11 +228,15 @@
              .queue_index = 0,
              .timestamp =
                  BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
+             .monotonic_remote_boot = 0xffffff,
+             .monotonic_timestamp_boot = 0xffffff,
              .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
   Message m2{.channel_index = 0,
              .queue_index = 0,
              .timestamp =
                  BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
+             .monotonic_remote_boot = 0xffffff,
+             .monotonic_timestamp_boot = 0xffffff,
              .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
 
   EXPECT_LT(m1, m2);
@@ -306,6 +310,16 @@
       "name": "/c",
       "type": "aos.logger.testing.TestMessage",
       "source_node": "pi1"
+    },
+    {
+      "name": "/d",
+      "type": "aos.logger.testing.TestMessage",
+      "source_node": "pi2",
+      "destination_nodes": [
+        {
+          "name": "pi1"
+        }
+      ]
     }
   ],
   "nodes": [
@@ -333,6 +347,13 @@
   "monotonic_start_time": 1000000,
   "realtime_start_time": 1000000000000,
   "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+  "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+  "boot_uuids": [
+    "1d782c63-b3c7-466e-bea9-a01308b43333",
+    "",
+    ""
+  ],
   "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
   "parts_index": 0
 })")),
@@ -349,6 +370,13 @@
   "monotonic_start_time": 1000000,
   "realtime_start_time": 1000000000000,
   "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+  "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+  "boot_uuids": [
+    "1d782c63-b3c7-466e-bea9-a01308b43333",
+    "",
+    ""
+  ],
   "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
   "parts_index": 0
 })")),
@@ -364,6 +392,13 @@
   },
   "monotonic_start_time": 0,
   "realtime_start_time": 1000000000000,
+  "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+  "logger_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+  "boot_uuids": [
+    "",
+    "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+    ""
+  ],
   "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
   "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
   "parts_index": 0
@@ -381,6 +416,13 @@
   "monotonic_start_time": 2000000,
   "realtime_start_time": 1000000000,
   "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
+  "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+  "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+  "boot_uuids": [
+    "1d782c63-b3c7-466e-bea9-a01308b43333",
+    "",
+    ""
+  ],
   "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
   "parts_index": 0
 })")),
@@ -398,11 +440,19 @@
   "realtime_start_time": 1000000000,
   "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
   "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
+  "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+  "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+  "boot_uuids": [
+    "1d782c63-b3c7-466e-bea9-a01308b43333",
+    "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+    ""
+  ],
   "parts_index": 0
 })")) {
     unlink(logfile0_.c_str());
     unlink(logfile1_.c_str());
     unlink(logfile2_.c_str());
+    unlink(logfile3_.c_str());
     queue_index_.resize(kChannels);
   }
 
@@ -487,6 +537,7 @@
   const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
   const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
   const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
+  const std::string logfile3_ = aos::testing::TestTmpDir() + "/log3.bfbs";
 
   const aos::FlatbufferDetachedBuffer<Configuration> config_;
   const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
@@ -1817,7 +1868,12 @@
   "parts_index": 0,
   "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
   "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
-  "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464"
+  "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+  "boot_uuids": [
+    "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+    "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+    ""
+  ]
 })")),
         boot1_(MakeHeader(config_, R"({
   /* 100ms */
@@ -1837,7 +1893,12 @@
   "parts_index": 1,
   "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
   "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
-  "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2"
+  "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+  "boot_uuids": [
+    "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+    "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+    ""
+  ]
 })")) {}
 
  protected:
@@ -1921,6 +1982,491 @@
   EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
 }
 
+class RebootTimestampMapperTest : public SortingElementTest {
+ public:
+  RebootTimestampMapperTest()
+      : SortingElementTest(),
+        boot0a_(MakeHeader(config_, R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi1"
+  },
+  "logger_node": {
+    "name": "pi1"
+  },
+  "monotonic_start_time": 1000000,
+  "realtime_start_time": 1000000000000,
+  "logger_monotonic_start_time": 1000000,
+  "logger_realtime_start_time": 1000000000000,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
+  "parts_index": 0,
+  "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
+  "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+  "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+  "boot_uuids": [
+    "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+    "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+    ""
+  ]
+})")),
+        boot0b_(MakeHeader(config_, R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi1"
+  },
+  "logger_node": {
+    "name": "pi1"
+  },
+  "monotonic_start_time": 1000000,
+  "realtime_start_time": 1000000000000,
+  "logger_monotonic_start_time": 1000000,
+  "logger_realtime_start_time": 1000000000000,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
+  "parts_index": 1,
+  "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
+  "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+  "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+  "boot_uuids": [
+    "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+    "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+    ""
+  ]
+})")),
+        boot1a_(MakeHeader(config_, R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi2"
+  },
+  "logger_node": {
+    "name": "pi1"
+  },
+  "monotonic_start_time": 1000000,
+  "realtime_start_time": 1000000000000,
+  "logger_monotonic_start_time": 1000000,
+  "logger_realtime_start_time": 1000000000000,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "f6ab0cdc-a654-456d-bfd9-2bbc09098edf",
+  "parts_index": 0,
+  "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
+  "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+  "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+  "boot_uuids": [
+    "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+    "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+    ""
+  ]
+})")),
+        boot1b_(MakeHeader(config_, R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi2"
+  },
+  "logger_node": {
+    "name": "pi1"
+  },
+  "monotonic_start_time": 1000000,
+  "realtime_start_time": 1000000000000,
+  "logger_monotonic_start_time": 1000000,
+  "logger_realtime_start_time": 1000000000000,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "3a9d0445-f520-43ca-93f5-e2cc7f54d40a",
+  "parts_index": 1,
+  "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
+  "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+  "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+  "boot_uuids": [
+    "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+    "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+    ""
+  ]
+})")) {}
+
+ protected:
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0a_;
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0b_;
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1a_;
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
+};
+
+
+// Tests that we can match timestamps on delivered messages in the presence of
+// reboots on the node receiving timestamps.
+TEST_F(RebootTimestampMapperTest, ReadNode0First) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer0a(logfile0_, std::make_unique<DummyEncoder>());
+    writer0a.QueueSpan(boot0a_.span());
+    DetachedBufferWriter writer0b(logfile1_, std::make_unique<DummyEncoder>());
+    writer0b.QueueSpan(boot0b_.span());
+    DetachedBufferWriter writer1a(logfile2_, std::make_unique<DummyEncoder>());
+    writer1a.QueueSpan(boot1a_.span());
+    DetachedBufferWriter writer1b(logfile3_, std::make_unique<DummyEncoder>());
+    writer1b.QueueSpan(boot1b_.span());
+
+    writer0a.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+    writer1a.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1000), 0, chrono::seconds(100),
+        e + chrono::milliseconds(1001)));
+
+    writer0b.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+    writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(2000), 0, chrono::seconds(20),
+        e + chrono::milliseconds(2001)));
+
+    writer0b.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+    writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(3000), 0, chrono::seconds(20),
+        e + chrono::milliseconds(3001)));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
+
+  for (const auto &x : parts) {
+    LOG(INFO) << x;
+  }
+  ASSERT_EQ(parts.size(), 1u);
+  ASSERT_EQ(parts[0].logger_node, "pi1");
+
+  size_t mapper0_count = 0;
+  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  mapper0.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper0_count; });
+  size_t mapper1_count = 0;
+  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  mapper1.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper1_count; });
+
+  mapper0.AddPeer(&mapper1);
+  mapper1.AddPeer(&mapper0);
+
+  {
+    std::deque<TimestampedMessage> output0;
+
+    EXPECT_EQ(mapper0_count, 0u);
+    EXPECT_EQ(mapper1_count, 0u);
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 1u);
+    EXPECT_EQ(mapper1_count, 0u);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_TRUE(mapper0.started());
+    EXPECT_EQ(mapper0_count, 1u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 2u);
+    EXPECT_EQ(mapper1_count, 0u);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_TRUE(mapper0.started());
+
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_TRUE(mapper0.started());
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    ASSERT_TRUE(mapper0.Front() == nullptr);
+
+    LOG(INFO) << output0[0];
+    LOG(INFO) << output0[1];
+    LOG(INFO) << output0[2];
+
+    EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output0[0].monotonic_event_time.time,
+              e + chrono::milliseconds(1000));
+    EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
+    EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
+    EXPECT_TRUE(output0[0].data.Verify());
+
+    EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output0[1].monotonic_event_time.time,
+              e + chrono::milliseconds(2000));
+    EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
+    EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
+    EXPECT_TRUE(output0[1].data.Verify());
+
+    EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output0[2].monotonic_event_time.time,
+              e + chrono::milliseconds(3000));
+    EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
+    EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
+    EXPECT_TRUE(output0[2].data.Verify());
+  }
+
+  {
+    SCOPED_TRACE("Trying node1 now");
+    std::deque<TimestampedMessage> output1;
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 1u);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_TRUE(mapper1.started());
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 1u);
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 2u);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_TRUE(mapper1.started());
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_TRUE(mapper1.started());
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 3u);
+
+    ASSERT_TRUE(mapper1.Front() == nullptr);
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 3u);
+
+    EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output1[0].monotonic_event_time.time,
+              e + chrono::seconds(100) + chrono::milliseconds(1000));
+    EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
+    EXPECT_EQ(output1[0].monotonic_remote_time.time,
+              e + chrono::milliseconds(1000));
+    EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
+    EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
+              e + chrono::milliseconds(1001));
+    EXPECT_TRUE(output1[0].data.Verify());
+
+    EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
+    EXPECT_EQ(output1[1].monotonic_event_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(2000));
+    EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
+    EXPECT_EQ(output1[1].monotonic_remote_time.time,
+              e + chrono::milliseconds(2000));
+    EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
+    EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
+              e + chrono::milliseconds(2001));
+    EXPECT_TRUE(output1[1].data.Verify());
+
+    EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
+    EXPECT_EQ(output1[2].monotonic_event_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(3000));
+    EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
+    EXPECT_EQ(output1[2].monotonic_remote_time.time,
+              e + chrono::milliseconds(3000));
+    EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
+    EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
+              e + chrono::milliseconds(3001));
+    EXPECT_TRUE(output1[2].data.Verify());
+
+    LOG(INFO) << output1[0];
+    LOG(INFO) << output1[1];
+    LOG(INFO) << output1[2];
+  }
+}
+
+TEST_F(RebootTimestampMapperTest, Node2Reboot) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer0a(logfile0_, std::make_unique<DummyEncoder>());
+    writer0a.QueueSpan(boot0a_.span());
+    DetachedBufferWriter writer0b(logfile1_, std::make_unique<DummyEncoder>());
+    writer0b.QueueSpan(boot0b_.span());
+    DetachedBufferWriter writer1a(logfile2_, std::make_unique<DummyEncoder>());
+    writer1a.QueueSpan(boot1a_.span());
+    DetachedBufferWriter writer1b(logfile3_, std::make_unique<DummyEncoder>());
+    writer1b.QueueSpan(boot1b_.span());
+
+    writer1a.QueueSizedFlatbuffer(MakeLogMessage(
+        e + chrono::seconds(100) + chrono::milliseconds(1000), 3, 0x005));
+    writer0a.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::seconds(100) + chrono::milliseconds(1000), 3,
+        chrono::seconds(-100),
+        e + chrono::seconds(100) + chrono::milliseconds(1001)));
+
+    writer1b.QueueSizedFlatbuffer(MakeLogMessage(
+        e + chrono::seconds(20) + chrono::milliseconds(2000), 3, 0x006));
+    writer0b.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::seconds(20) + chrono::milliseconds(2000), 3,
+        chrono::seconds(-20),
+        e + chrono::seconds(20) + chrono::milliseconds(2001)));
+
+    writer1b.QueueSizedFlatbuffer(MakeLogMessage(
+        e + chrono::seconds(20) + chrono::milliseconds(3000), 3, 0x007));
+    writer0b.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::seconds(20) + chrono::milliseconds(3000), 3,
+        chrono::seconds(-20),
+        e + chrono::seconds(20) + chrono::milliseconds(3001)));
+  }
+
+  const std::vector<LogFile> parts =
+      SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
+
+  for (const auto &x : parts) {
+    LOG(INFO) << x;
+  }
+  ASSERT_EQ(parts.size(), 1u);
+  ASSERT_EQ(parts[0].logger_node, "pi1");
+
+  size_t mapper0_count = 0;
+  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  mapper0.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper0_count; });
+  size_t mapper1_count = 0;
+  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  mapper1.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper1_count; });
+
+  mapper0.AddPeer(&mapper1);
+  mapper1.AddPeer(&mapper0);
+
+  {
+    std::deque<TimestampedMessage> output0;
+
+    EXPECT_EQ(mapper0_count, 0u);
+    EXPECT_EQ(mapper1_count, 0u);
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 1u);
+    EXPECT_EQ(mapper1_count, 0u);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_TRUE(mapper0.started());
+    EXPECT_EQ(mapper0_count, 1u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 2u);
+    EXPECT_EQ(mapper1_count, 0u);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_TRUE(mapper0.started());
+
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_TRUE(mapper0.started());
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    ASSERT_TRUE(mapper0.Front() == nullptr);
+
+    LOG(INFO) << output0[0];
+    LOG(INFO) << output0[1];
+    LOG(INFO) << output0[2];
+
+    EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output0[0].monotonic_event_time.time,
+              e + chrono::milliseconds(1000));
+    EXPECT_EQ(output0[0].monotonic_remote_time.boot, 0u);
+    EXPECT_EQ(output0[0].monotonic_remote_time.time,
+              e + chrono::seconds(100) + chrono::milliseconds(1000));
+    EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
+    EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
+              e + chrono::seconds(100) + chrono::milliseconds(1001));
+    EXPECT_TRUE(output0[0].data.Verify());
+
+    EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output0[1].monotonic_event_time.time,
+              e + chrono::milliseconds(2000));
+    EXPECT_EQ(output0[1].monotonic_remote_time.boot, 1u);
+    EXPECT_EQ(output0[1].monotonic_remote_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(2000));
+    EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
+    EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(2001));
+    EXPECT_TRUE(output0[1].data.Verify());
+
+    EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output0[2].monotonic_event_time.time,
+              e + chrono::milliseconds(3000));
+    EXPECT_EQ(output0[2].monotonic_remote_time.boot, 1u);
+    EXPECT_EQ(output0[2].monotonic_remote_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(3000));
+    EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
+    EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(3001));
+    EXPECT_TRUE(output0[2].data.Verify());
+  }
+
+  {
+    SCOPED_TRACE("Trying node1 now");
+    std::deque<TimestampedMessage> output1;
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 1u);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_TRUE(mapper1.started());
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 1u);
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 2u);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_TRUE(mapper1.started());
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_TRUE(mapper1.started());
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 3u);
+
+    ASSERT_TRUE(mapper1.Front() == nullptr);
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 3u);
+
+    EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output1[0].monotonic_event_time.time,
+              e + chrono::seconds(100) + chrono::milliseconds(1000));
+    EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
+    EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
+    EXPECT_TRUE(output1[0].data.Verify());
+
+    EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
+    EXPECT_EQ(output1[1].monotonic_event_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(2000));
+    EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
+    EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
+    EXPECT_TRUE(output1[1].data.Verify());
+
+    EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
+    EXPECT_EQ(output1[2].monotonic_event_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(3000));
+    EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
+    EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
+    EXPECT_TRUE(output1[2].data.Verify());
+
+    LOG(INFO) << output1[0];
+    LOG(INFO) << output1[1];
+    LOG(INFO) << output1[2];
+  }
+}
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/logging/logger.fbs b/aos/events/logging/logger.fbs
index 2896963..d5f1248 100644
--- a/aos/events/logging/logger.fbs
+++ b/aos/events/logging/logger.fbs
@@ -93,6 +93,10 @@
 
   // The node the data was logged on, if known and running in a multi-node configuration.
   logger_node:Node (id: 9);
+
+  // The boot UUIDs for all nodes we know them for, or "" for the ones we don't.
+  // TODO(austin): Actually add this in the log writer.
+  boot_uuids:[string] (id: 17);
 }
 
 // Table holding a message.