Merge "Add node_boots parsing from log file headers"
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 86d3930..118ddf7 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -33,7 +33,7 @@
 }
 
 bool ConfigOnly(const LogFileHeader *header) {
-  CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 17u);
+  CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 18u);
   if (header->has_monotonic_start_time()) return false;
   if (header->has_realtime_start_time()) return false;
   if (header->has_max_out_of_order_duration()) return false;
@@ -50,6 +50,7 @@
   if (header->has_parts_uuid()) return false;
   if (header->has_parts_index()) return false;
   if (header->has_logger_node()) return false;
+  if (header->has_boot_uuids()) return false;
 
   return header->has_configuration();
 }
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 7600c7e..cd2ceb8 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -516,7 +516,52 @@
 }
 
 PartsMessageReader::PartsMessageReader(LogParts log_parts)
-    : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {}
+    : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
+  ComputeBootCounts();
+}
+
+void PartsMessageReader::ComputeBootCounts() {
+  boot_counts_.assign(configuration::NodesCount(parts_.config.get()),
+                      std::nullopt);  // Unknown until resolved below.
+
+  // We have 3 vintages of log files with different amounts of information.
+  if (log_file_header()->has_boot_uuids()) {
+    // The new hotness with the boots explicitly listed out.  We can use the log
+    // file header to compute the boot count of all relevant nodes.
+    CHECK_EQ(log_file_header()->boot_uuids()->size(), boot_counts_.size());
+    size_t node_index = 0;
+    for (const flatbuffers::String *boot_uuid :
+         *log_file_header()->boot_uuids()) {
+      CHECK(parts_.boots);
+      if (boot_uuid->size() != 0) {
+        auto it = parts_.boots->boot_count_map.find(boot_uuid->str());
+        if (it != parts_.boots->boot_count_map.end()) {
+          boot_counts_[node_index] = it->second;
+        }  // A UUID missing from the map leaves this node as std::nullopt.
+      } else if (parts().boots->boots[node_index].size() == 1u) {
+        boot_counts_[node_index] = 0;  // Only one entry, so use boot 0.
+      }
+      ++node_index;
+    }
+  } else {
+    // Older multi-node logs which are guaranteed to have UUIDs logged, or
+    // single node log files with boot UUIDs in the header.  We only know how to
+    // order certain boots in certain circumstances.
+    if (configuration::MultiNode(parts_.config.get()) || parts_.boots) {
+      for (size_t node_index = 0; node_index < boot_counts_.size();
+           ++node_index) {
+        CHECK(parts_.boots);
+        if (parts().boots->boots[node_index].size() == 1u) {
+          boot_counts_[node_index] = 0;  // Only one entry, so use boot 0.
+        }
+      }
+    } else {
+      // Really old single node logs without any UUIDs.  They can't reboot.
+      CHECK_EQ(boot_counts_.size(), 1u);
+      boot_counts_[0] = 0u;
+    }
+  }
+}
 
 std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
 PartsMessageReader::ReadMessage() {
@@ -557,6 +602,7 @@
     return;
   }
   message_reader_ = MessageReader(parts_.parts[next_part_index_]);
+  ComputeBootCounts();
   ++next_part_index_;
 }
 
@@ -625,7 +671,9 @@
 }
 
 LogPartsSorter::LogPartsSorter(LogParts log_parts)
-    : parts_message_reader_(log_parts) {}
+    : parts_message_reader_(log_parts),
+      source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
+}
 
 Message *LogPartsSorter::Front() {
   // Queue up data until enough data has been queued that the front message is
@@ -633,7 +681,8 @@
   // sure the nothing path is checked quickly.
   if (sorted_until() != monotonic_clock::max_time) {
     while (true) {
-      if (!messages_.empty() && messages_.begin()->timestamp.time < sorted_until() &&
+      if (!messages_.empty() &&
+          messages_.begin()->timestamp.time < sorted_until() &&
           sorted_until() >= monotonic_start_time()) {
         break;
       }
@@ -646,6 +695,20 @@
         break;
       }
 
+      size_t monotonic_timestamp_boot = 0;
+      if (m.value().message().has_monotonic_timestamp_time()) {
+        monotonic_timestamp_boot = parts().logger_boot_count;
+      }
+      size_t monotonic_remote_boot = 0xffffff;
+
+      if (m.value().message().has_monotonic_remote_time()) {
+        std::optional<size_t> boot = parts_message_reader_.boot_count(
+            source_node_index_[m->message().channel_index()]);
+        CHECK(boot) << ": Failed to find boot for node "
+                    << source_node_index_[m->message().channel_index()];
+        monotonic_remote_boot = *boot;
+      }
+
       messages_.insert(Message{
           .channel_index = m.value().message().channel_index(),
           .queue_index = m.value().message().queue_index(),
@@ -654,6 +717,8 @@
                   .boot = parts().boot_count,
                   .time = monotonic_clock::time_point(std::chrono::nanoseconds(
                       m.value().message().monotonic_sent_time()))},
+          .monotonic_remote_boot = monotonic_remote_boot,
+          .monotonic_timestamp_boot = monotonic_timestamp_boot,
           .data = std::move(m.value())});
 
       // Now, update sorted_until_ to match the new message.
@@ -802,9 +867,7 @@
 
   // Now, we need to split things out by boot.
   for (size_t i = 0; i < files.size(); ++i) {
-    LOG(INFO) << "Trying file " << i;
     const size_t boot_count = files[i].boot_count;
-    LOG(INFO) << "Boot count " << boot_count;
     if (boot_count + 1 > boots.size()) {
       boots.resize(boot_count + 1);
     }
@@ -813,9 +876,9 @@
 
   node_mergers_.reserve(boots.size());
   for (size_t i = 0; i < boots.size(); ++i) {
-    LOG(INFO) << "Boot " << i;
+    VLOG(2) << "Boot " << i;
     for (auto &p : boots[i]) {
-      LOG(INFO) << "Part " << p;
+      VLOG(2) << "Part " << p;
     }
     node_mergers_.emplace_back(
         std::make_unique<NodeMerger>(std::move(boots[i])));
@@ -1007,14 +1070,15 @@
             std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
         .remote_queue_index = m->data.message().remote_queue_index(),
         .monotonic_remote_time =
-            // TODO(austin): 0 is wrong...
-        {0, monotonic_clock::time_point(std::chrono::nanoseconds(
-                m->data.message().monotonic_remote_time()))},
+            {m->monotonic_remote_boot,
+             monotonic_clock::time_point(std::chrono::nanoseconds(
+                 m->data.message().monotonic_remote_time()))},
         .realtime_remote_time = realtime_clock::time_point(
             std::chrono::nanoseconds(m->data.message().realtime_remote_time())),
         .monotonic_timestamp_time =
-            {0, monotonic_clock::time_point(std::chrono::nanoseconds(
-                    m->data.message().monotonic_timestamp_time()))},
+            {m->monotonic_timestamp_boot,
+             monotonic_clock::time_point(std::chrono::nanoseconds(
+                 m->data.message().monotonic_timestamp_time()))},
         .data = std::move(data.data)});
     CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
     last_message_time_ = matched_messages_.back().monotonic_event_time;
@@ -1080,7 +1144,7 @@
   CHECK(message.data.message().has_realtime_remote_time());
 
   const BootTimestamp monotonic_remote_time{
-      .boot = 0,
+      .boot = message.monotonic_remote_boot,
       .time = monotonic_clock::time_point(std::chrono::nanoseconds(
           message.data.message().monotonic_remote_time()))};
   const realtime_clock::time_point realtime_remote_time(
@@ -1098,6 +1162,8 @@
         .channel_index = message.channel_index,
         .queue_index = remote_queue_index,
         .timestamp = monotonic_remote_time,
+        .monotonic_remote_boot = 0xffffff,
+        .monotonic_timestamp_boot = 0xffffff,
         .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
   }
 
@@ -1112,6 +1178,8 @@
         .channel_index = message.channel_index,
         .queue_index = remote_queue_index,
         .timestamp = monotonic_remote_time,
+        .monotonic_remote_boot = 0xffffff,
+        .monotonic_timestamp_boot = 0xffffff,
         .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
   }
 
@@ -1121,6 +1189,8 @@
         .channel_index = message.channel_index,
         .queue_index = remote_queue_index,
         .timestamp = monotonic_remote_time,
+        .monotonic_remote_boot = 0xffffff,
+        .monotonic_timestamp_boot = 0xffffff,
         .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
   }
 
@@ -1155,6 +1225,8 @@
           .channel_index = message.channel_index,
           .queue_index = remote_queue_index,
           .timestamp = monotonic_remote_time,
+          .monotonic_remote_boot = 0xffffff,
+          .monotonic_timestamp_boot = 0xffffff,
           .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
     }
 
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 93d9e8b..56b582f 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -334,10 +334,19 @@
   // Note: reading the next message may change the max_out_of_order_duration().
   std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadMessage();
 
+  // Returns the boot count for the requested node, or std::nullopt if we don't
+  // know.  CHECK-fails if node_index is not a valid index into boot_counts_.
+  std::optional<size_t> boot_count(size_t node_index) const {
+    CHECK_GE(node_index, 0u);  // Trivially true: node_index is unsigned.
+    CHECK_LT(node_index, boot_counts_.size());
+    return boot_counts_[node_index];
+  }
+
  private:
   // Opens the next log and updates message_reader_.  Sets done_ if there is
   // nothing more to do.
   void NextLog();
+  void ComputeBootCounts();
 
   const LogParts parts_;
   size_t next_part_index_ = 1u;
@@ -354,6 +363,9 @@
   bool after_start_ = false;
 
   monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
+
+  // Per node boot counts.
+  std::vector<std::optional<size_t>> boot_counts_;
 };
 
 // Struct to hold a message as it gets sorted on a single node.
@@ -365,6 +377,11 @@
   // The local timestamp.
   BootTimestamp timestamp;
 
+  // Boot of the remote node for monotonic_remote_time (0xffffff when unset).
+  size_t monotonic_remote_boot = 0xffffff;
+
+  size_t monotonic_timestamp_boot = 0xffffff;
+
   // The data (either a timestamp header, or a data header).
   SizePrefixedFlatbufferVector<MessageHeader> data;
 
@@ -437,6 +454,10 @@
   // Set used for efficient sorting of messages.  We can benchmark and evaluate
   // other data structures if this proves to be the bottleneck.
   absl::btree_set<Message> messages_;
+
+  // Mapping from channel to source node.
+  // TODO(austin): Should we put this in Boots so it can be cached for everyone?
+  std::vector<size_t> source_node_index_;
 };
 
 // Class to run merge sort on the messages from multiple LogPartsSorter
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 4973d02..e99db94 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -228,11 +228,15 @@
              .queue_index = 0,
              .timestamp =
                  BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
+             .monotonic_remote_boot = 0xffffff,
+             .monotonic_timestamp_boot = 0xffffff,
              .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
   Message m2{.channel_index = 0,
              .queue_index = 0,
              .timestamp =
                  BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
+             .monotonic_remote_boot = 0xffffff,
+             .monotonic_timestamp_boot = 0xffffff,
              .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
 
   EXPECT_LT(m1, m2);
@@ -306,6 +310,16 @@
       "name": "/c",
       "type": "aos.logger.testing.TestMessage",
       "source_node": "pi1"
+    },
+    {
+      "name": "/d",
+      "type": "aos.logger.testing.TestMessage",
+      "source_node": "pi2",
+      "destination_nodes": [
+        {
+          "name": "pi1"
+        }
+      ]
     }
   ],
   "nodes": [
@@ -333,6 +347,13 @@
   "monotonic_start_time": 1000000,
   "realtime_start_time": 1000000000000,
   "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+  "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+  "boot_uuids": [
+    "1d782c63-b3c7-466e-bea9-a01308b43333",
+    "",
+    ""
+  ],
   "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
   "parts_index": 0
 })")),
@@ -349,6 +370,13 @@
   "monotonic_start_time": 1000000,
   "realtime_start_time": 1000000000000,
   "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+  "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+  "boot_uuids": [
+    "1d782c63-b3c7-466e-bea9-a01308b43333",
+    "",
+    ""
+  ],
   "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
   "parts_index": 0
 })")),
@@ -364,6 +392,13 @@
   },
   "monotonic_start_time": 0,
   "realtime_start_time": 1000000000000,
+  "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+  "logger_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+  "boot_uuids": [
+    "",
+    "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+    ""
+  ],
   "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
   "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
   "parts_index": 0
@@ -381,6 +416,13 @@
   "monotonic_start_time": 2000000,
   "realtime_start_time": 1000000000,
   "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
+  "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+  "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+  "boot_uuids": [
+    "1d782c63-b3c7-466e-bea9-a01308b43333",
+    "",
+    ""
+  ],
   "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
   "parts_index": 0
 })")),
@@ -398,11 +440,19 @@
   "realtime_start_time": 1000000000,
   "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
   "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
+  "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+  "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+  "boot_uuids": [
+    "1d782c63-b3c7-466e-bea9-a01308b43333",
+    "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+    ""
+  ],
   "parts_index": 0
 })")) {
     unlink(logfile0_.c_str());
     unlink(logfile1_.c_str());
     unlink(logfile2_.c_str());
+    unlink(logfile3_.c_str());
     queue_index_.resize(kChannels);
   }
 
@@ -487,6 +537,7 @@
   const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
   const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
   const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
+  const std::string logfile3_ = aos::testing::TestTmpDir() + "/log3.bfbs";
 
   const aos::FlatbufferDetachedBuffer<Configuration> config_;
   const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
@@ -1817,7 +1868,12 @@
   "parts_index": 0,
   "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
   "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
-  "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464"
+  "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+  "boot_uuids": [
+    "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+    "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+    ""
+  ]
 })")),
         boot1_(MakeHeader(config_, R"({
   /* 100ms */
@@ -1837,7 +1893,12 @@
   "parts_index": 1,
   "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
   "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
-  "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2"
+  "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+  "boot_uuids": [
+    "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+    "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+    ""
+  ]
 })")) {}
 
  protected:
@@ -1921,6 +1982,491 @@
   EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
 }
 
+class RebootTimestampMapperTest : public SortingElementTest {
+ public:
+  RebootTimestampMapperTest()
+      : SortingElementTest(),
+        boot0a_(MakeHeader(config_, R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi1"
+  },
+  "logger_node": {
+    "name": "pi1"
+  },
+  "monotonic_start_time": 1000000,
+  "realtime_start_time": 1000000000000,
+  "logger_monotonic_start_time": 1000000,
+  "logger_realtime_start_time": 1000000000000,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
+  "parts_index": 0,
+  "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
+  "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+  "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+  "boot_uuids": [
+    "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+    "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+    ""
+  ]
+})")),
+        boot0b_(MakeHeader(config_, R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi1"
+  },
+  "logger_node": {
+    "name": "pi1"
+  },
+  "monotonic_start_time": 1000000,
+  "realtime_start_time": 1000000000000,
+  "logger_monotonic_start_time": 1000000,
+  "logger_realtime_start_time": 1000000000000,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
+  "parts_index": 1,
+  "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
+  "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+  "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+  "boot_uuids": [
+    "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+    "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+    ""
+  ]
+})")),
+        boot1a_(MakeHeader(config_, R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi2"
+  },
+  "logger_node": {
+    "name": "pi1"
+  },
+  "monotonic_start_time": 1000000,
+  "realtime_start_time": 1000000000000,
+  "logger_monotonic_start_time": 1000000,
+  "logger_realtime_start_time": 1000000000000,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "f6ab0cdc-a654-456d-bfd9-2bbc09098edf",
+  "parts_index": 0,
+  "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
+  "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+  "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+  "boot_uuids": [
+    "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+    "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+    ""
+  ]
+})")),
+        boot1b_(MakeHeader(config_, R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi2"
+  },
+  "logger_node": {
+    "name": "pi1"
+  },
+  "monotonic_start_time": 1000000,
+  "realtime_start_time": 1000000000000,
+  "logger_monotonic_start_time": 1000000,
+  "logger_realtime_start_time": 1000000000000,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "3a9d0445-f520-43ca-93f5-e2cc7f54d40a",
+  "parts_index": 1,
+  "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
+  "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+  "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+  "boot_uuids": [
+    "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+    "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+    ""
+  ]
+})")) {}
+
+ protected:
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0a_;
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0b_;
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1a_;
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
+};
+
+
+// Tests that we can match timestamps on delivered messages in the presence of
+// reboots on the node receiving timestamps.
+TEST_F(RebootTimestampMapperTest, ReadNode0First) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer0a(logfile0_, std::make_unique<DummyEncoder>());
+    writer0a.QueueSpan(boot0a_.span());
+    DetachedBufferWriter writer0b(logfile1_, std::make_unique<DummyEncoder>());
+    writer0b.QueueSpan(boot0b_.span());
+    DetachedBufferWriter writer1a(logfile2_, std::make_unique<DummyEncoder>());
+    writer1a.QueueSpan(boot1a_.span());
+    DetachedBufferWriter writer1b(logfile3_, std::make_unique<DummyEncoder>());
+    writer1b.QueueSpan(boot1b_.span());
+
+    writer0a.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+    writer1a.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1000), 0, chrono::seconds(100),
+        e + chrono::milliseconds(1001)));
+
+    writer0b.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+    writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(2000), 0, chrono::seconds(20),
+        e + chrono::milliseconds(2001)));
+
+    writer0b.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+    writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(3000), 0, chrono::seconds(20),
+        e + chrono::milliseconds(3001)));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
+
+  for (const auto &x : parts) {
+    LOG(INFO) << x;
+  }
+  ASSERT_EQ(parts.size(), 1u);
+  ASSERT_EQ(parts[0].logger_node, "pi1");
+
+  size_t mapper0_count = 0;
+  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  mapper0.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper0_count; });
+  size_t mapper1_count = 0;
+  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  mapper1.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper1_count; });
+
+  mapper0.AddPeer(&mapper1);
+  mapper1.AddPeer(&mapper0);
+
+  {
+    std::deque<TimestampedMessage> output0;
+
+    EXPECT_EQ(mapper0_count, 0u);
+    EXPECT_EQ(mapper1_count, 0u);
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 1u);
+    EXPECT_EQ(mapper1_count, 0u);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_TRUE(mapper0.started());
+    EXPECT_EQ(mapper0_count, 1u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 2u);
+    EXPECT_EQ(mapper1_count, 0u);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_TRUE(mapper0.started());
+
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_TRUE(mapper0.started());
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    ASSERT_TRUE(mapper0.Front() == nullptr);
+
+    LOG(INFO) << output0[0];
+    LOG(INFO) << output0[1];
+    LOG(INFO) << output0[2];
+
+    EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output0[0].monotonic_event_time.time,
+              e + chrono::milliseconds(1000));
+    EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
+    EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
+    EXPECT_TRUE(output0[0].data.Verify());
+
+    EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output0[1].monotonic_event_time.time,
+              e + chrono::milliseconds(2000));
+    EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
+    EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
+    EXPECT_TRUE(output0[1].data.Verify());
+
+    EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output0[2].monotonic_event_time.time,
+              e + chrono::milliseconds(3000));
+    EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
+    EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
+    EXPECT_TRUE(output0[2].data.Verify());
+  }
+
+  {
+    SCOPED_TRACE("Trying node1 now");
+    std::deque<TimestampedMessage> output1;
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 1u);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_TRUE(mapper1.started());
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 1u);
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 2u);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_TRUE(mapper1.started());
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_TRUE(mapper1.started());
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 3u);
+
+    ASSERT_TRUE(mapper1.Front() == nullptr);
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 3u);
+
+    EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output1[0].monotonic_event_time.time,
+              e + chrono::seconds(100) + chrono::milliseconds(1000));
+    EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
+    EXPECT_EQ(output1[0].monotonic_remote_time.time,
+              e + chrono::milliseconds(1000));
+    EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
+    EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
+              e + chrono::milliseconds(1001));
+    EXPECT_TRUE(output1[0].data.Verify());
+
+    EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
+    EXPECT_EQ(output1[1].monotonic_event_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(2000));
+    EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
+    EXPECT_EQ(output1[1].monotonic_remote_time.time,
+              e + chrono::milliseconds(2000));
+    EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
+    EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
+              e + chrono::milliseconds(2001));
+    EXPECT_TRUE(output1[1].data.Verify());
+
+    EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
+    EXPECT_EQ(output1[2].monotonic_event_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(3000));
+    EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
+    EXPECT_EQ(output1[2].monotonic_remote_time.time,
+              e + chrono::milliseconds(3000));
+    EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
+    EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
+              e + chrono::milliseconds(3001));
+    EXPECT_TRUE(output1[2].data.Verify());
+
+    LOG(INFO) << output1[0];
+    LOG(INFO) << output1[1];
+    LOG(INFO) << output1[2];
+  }
+}
+
+TEST_F(RebootTimestampMapperTest, Node2Reboot) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer0a(logfile0_, std::make_unique<DummyEncoder>());
+    writer0a.QueueSpan(boot0a_.span());
+    DetachedBufferWriter writer0b(logfile1_, std::make_unique<DummyEncoder>());
+    writer0b.QueueSpan(boot0b_.span());
+    DetachedBufferWriter writer1a(logfile2_, std::make_unique<DummyEncoder>());
+    writer1a.QueueSpan(boot1a_.span());
+    DetachedBufferWriter writer1b(logfile3_, std::make_unique<DummyEncoder>());
+    writer1b.QueueSpan(boot1b_.span());
+
+    writer1a.QueueSizedFlatbuffer(MakeLogMessage(
+        e + chrono::seconds(100) + chrono::milliseconds(1000), 3, 0x005));
+    writer0a.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::seconds(100) + chrono::milliseconds(1000), 3,
+        chrono::seconds(-100),
+        e + chrono::seconds(100) + chrono::milliseconds(1001)));
+
+    writer1b.QueueSizedFlatbuffer(MakeLogMessage(
+        e + chrono::seconds(20) + chrono::milliseconds(2000), 3, 0x006));
+    writer0b.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::seconds(20) + chrono::milliseconds(2000), 3,
+        chrono::seconds(-20),
+        e + chrono::seconds(20) + chrono::milliseconds(2001)));
+
+    writer1b.QueueSizedFlatbuffer(MakeLogMessage(
+        e + chrono::seconds(20) + chrono::milliseconds(3000), 3, 0x007));
+    writer0b.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::seconds(20) + chrono::milliseconds(3000), 3,
+        chrono::seconds(-20),
+        e + chrono::seconds(20) + chrono::milliseconds(3001)));
+  }
+
+  const std::vector<LogFile> parts =
+      SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
+
+  for (const auto &x : parts) {
+    LOG(INFO) << x;
+  }
+  ASSERT_EQ(parts.size(), 1u);
+  ASSERT_EQ(parts[0].logger_node, "pi1");
+
+  size_t mapper0_count = 0;
+  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  mapper0.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper0_count; });
+  size_t mapper1_count = 0;
+  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  mapper1.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper1_count; });
+
+  mapper0.AddPeer(&mapper1);
+  mapper1.AddPeer(&mapper0);
+
+  {
+    std::deque<TimestampedMessage> output0;
+
+    EXPECT_EQ(mapper0_count, 0u);
+    EXPECT_EQ(mapper1_count, 0u);
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 1u);
+    EXPECT_EQ(mapper1_count, 0u);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_TRUE(mapper0.started());
+    EXPECT_EQ(mapper0_count, 1u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 2u);
+    EXPECT_EQ(mapper1_count, 0u);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_TRUE(mapper0.started());
+
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_TRUE(mapper0.started());
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    ASSERT_TRUE(mapper0.Front() == nullptr);
+
+    LOG(INFO) << output0[0];
+    LOG(INFO) << output0[1];
+    LOG(INFO) << output0[2];
+
+    EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output0[0].monotonic_event_time.time,
+              e + chrono::milliseconds(1000));
+    EXPECT_EQ(output0[0].monotonic_remote_time.boot, 0u);
+    EXPECT_EQ(output0[0].monotonic_remote_time.time,
+              e + chrono::seconds(100) + chrono::milliseconds(1000));
+    EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
+    EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
+              e + chrono::seconds(100) + chrono::milliseconds(1001));
+    EXPECT_TRUE(output0[0].data.Verify());
+
+    EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output0[1].monotonic_event_time.time,
+              e + chrono::milliseconds(2000));
+    EXPECT_EQ(output0[1].monotonic_remote_time.boot, 1u);
+    EXPECT_EQ(output0[1].monotonic_remote_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(2000));
+    EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
+    EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(2001));
+    EXPECT_TRUE(output0[1].data.Verify());
+
+    EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output0[2].monotonic_event_time.time,
+              e + chrono::milliseconds(3000));
+    EXPECT_EQ(output0[2].monotonic_remote_time.boot, 1u);
+    EXPECT_EQ(output0[2].monotonic_remote_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(3000));
+    EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
+    EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(3001));
+    EXPECT_TRUE(output0[2].data.Verify());
+  }
+
+  {
+    SCOPED_TRACE("Trying node1 now");
+    std::deque<TimestampedMessage> output1;
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 1u);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_TRUE(mapper1.started());
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 1u);
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 2u);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_TRUE(mapper1.started());
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_TRUE(mapper1.started());
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 3u);
+
+    ASSERT_TRUE(mapper1.Front() == nullptr);
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 3u);
+
+    EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output1[0].monotonic_event_time.time,
+              e + chrono::seconds(100) + chrono::milliseconds(1000));
+    EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
+    EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
+    EXPECT_TRUE(output1[0].data.Verify());
+
+    EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
+    EXPECT_EQ(output1[1].monotonic_event_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(2000));
+    EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
+    EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
+    EXPECT_TRUE(output1[1].data.Verify());
+
+    EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
+    EXPECT_EQ(output1[2].monotonic_event_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(3000));
+    EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
+    EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
+    EXPECT_TRUE(output1[2].data.Verify());
+
+    LOG(INFO) << output1[0];
+    LOG(INFO) << output1[1];
+    LOG(INFO) << output1[2];
+  }
+}
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/logging/logger.fbs b/aos/events/logging/logger.fbs
index 2896963..d5f1248 100644
--- a/aos/events/logging/logger.fbs
+++ b/aos/events/logging/logger.fbs
@@ -93,6 +93,10 @@
 
   // The node the data was logged on, if known and running in a multi-node configuration.
   logger_node:Node (id: 9);
+
+  // Boot UUID of each node, indexed by node index, or "" where it is unknown.
+  // TODO(austin): Actually add this in the log writer.
+  boot_uuids:[string] (id: 17);
 }
 
 // Table holding a message.