Add node_boots parsing from log file headers

Jim suggested that we could store the boot UUID in the header for all
the nodes rather than per message.  That seems reasonable and lets us do
other things with it in the future.  Use it when available to determine
which boot happens when.

When we only have 1 boot for a node, fall back to setting the boot count
to 0 for sorting.  This lets us get started with the new boot counter.

A follow-up commit will start logging with this.

Change-Id: Ia9e0bd2860de82f70f0daa02782d06f1fd4f4218
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 4973d02..e99db94 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -228,11 +228,15 @@
              .queue_index = 0,
              .timestamp =
                  BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
+             .monotonic_remote_boot = 0xffffff,
+             .monotonic_timestamp_boot = 0xffffff,
              .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
   Message m2{.channel_index = 0,
              .queue_index = 0,
              .timestamp =
                  BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
+             .monotonic_remote_boot = 0xffffff,
+             .monotonic_timestamp_boot = 0xffffff,
              .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
 
   EXPECT_LT(m1, m2);
@@ -306,6 +310,16 @@
       "name": "/c",
       "type": "aos.logger.testing.TestMessage",
       "source_node": "pi1"
+    },
+    {
+      "name": "/d",
+      "type": "aos.logger.testing.TestMessage",
+      "source_node": "pi2",
+      "destination_nodes": [
+        {
+          "name": "pi1"
+        }
+      ]
     }
   ],
   "nodes": [
@@ -333,6 +347,13 @@
   "monotonic_start_time": 1000000,
   "realtime_start_time": 1000000000000,
   "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+  "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+  "boot_uuids": [
+    "1d782c63-b3c7-466e-bea9-a01308b43333",
+    "",
+    ""
+  ],
   "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
   "parts_index": 0
 })")),
@@ -349,6 +370,13 @@
   "monotonic_start_time": 1000000,
   "realtime_start_time": 1000000000000,
   "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+  "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+  "boot_uuids": [
+    "1d782c63-b3c7-466e-bea9-a01308b43333",
+    "",
+    ""
+  ],
   "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
   "parts_index": 0
 })")),
@@ -364,6 +392,13 @@
   },
   "monotonic_start_time": 0,
   "realtime_start_time": 1000000000000,
+  "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+  "logger_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+  "boot_uuids": [
+    "",
+    "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+    ""
+  ],
   "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
   "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
   "parts_index": 0
@@ -381,6 +416,13 @@
   "monotonic_start_time": 2000000,
   "realtime_start_time": 1000000000,
   "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
+  "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+  "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+  "boot_uuids": [
+    "1d782c63-b3c7-466e-bea9-a01308b43333",
+    "",
+    ""
+  ],
   "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
   "parts_index": 0
 })")),
@@ -398,11 +440,19 @@
   "realtime_start_time": 1000000000,
   "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
   "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
+  "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+  "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+  "boot_uuids": [
+    "1d782c63-b3c7-466e-bea9-a01308b43333",
+    "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+    ""
+  ],
   "parts_index": 0
 })")) {
     unlink(logfile0_.c_str());
     unlink(logfile1_.c_str());
     unlink(logfile2_.c_str());
+    unlink(logfile3_.c_str());
     queue_index_.resize(kChannels);
   }
 
@@ -487,6 +537,7 @@
   const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
   const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
   const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
+  const std::string logfile3_ = aos::testing::TestTmpDir() + "/log3.bfbs";
 
   const aos::FlatbufferDetachedBuffer<Configuration> config_;
   const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
@@ -1817,7 +1868,12 @@
   "parts_index": 0,
   "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
   "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
-  "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464"
+  "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+  "boot_uuids": [
+    "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+    "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+    ""
+  ]
 })")),
         boot1_(MakeHeader(config_, R"({
   /* 100ms */
@@ -1837,7 +1893,12 @@
   "parts_index": 1,
   "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
   "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
-  "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2"
+  "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+  "boot_uuids": [
+    "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+    "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+    ""
+  ]
 })")) {}
 
  protected:
@@ -1921,6 +1982,491 @@
   EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
 }
 
+class RebootTimestampMapperTest : public SortingElementTest {
+ public:
+  RebootTimestampMapperTest()
+      : SortingElementTest(),
+        boot0a_(MakeHeader(config_, R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi1"
+  },
+  "logger_node": {
+    "name": "pi1"
+  },
+  "monotonic_start_time": 1000000,
+  "realtime_start_time": 1000000000000,
+  "logger_monotonic_start_time": 1000000,
+  "logger_realtime_start_time": 1000000000000,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
+  "parts_index": 0,
+  "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
+  "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+  "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+  "boot_uuids": [
+    "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+    "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+    ""
+  ]
+})")),
+        boot0b_(MakeHeader(config_, R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi1"
+  },
+  "logger_node": {
+    "name": "pi1"
+  },
+  "monotonic_start_time": 1000000,
+  "realtime_start_time": 1000000000000,
+  "logger_monotonic_start_time": 1000000,
+  "logger_realtime_start_time": 1000000000000,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
+  "parts_index": 1,
+  "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
+  "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+  "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+  "boot_uuids": [
+    "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+    "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+    ""
+  ]
+})")),
+        boot1a_(MakeHeader(config_, R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi2"
+  },
+  "logger_node": {
+    "name": "pi1"
+  },
+  "monotonic_start_time": 1000000,
+  "realtime_start_time": 1000000000000,
+  "logger_monotonic_start_time": 1000000,
+  "logger_realtime_start_time": 1000000000000,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "f6ab0cdc-a654-456d-bfd9-2bbc09098edf",
+  "parts_index": 0,
+  "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
+  "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+  "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+  "boot_uuids": [
+    "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+    "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+    ""
+  ]
+})")),
+        boot1b_(MakeHeader(config_, R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi2"
+  },
+  "logger_node": {
+    "name": "pi1"
+  },
+  "monotonic_start_time": 1000000,
+  "realtime_start_time": 1000000000000,
+  "logger_monotonic_start_time": 1000000,
+  "logger_realtime_start_time": 1000000000000,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "3a9d0445-f520-43ca-93f5-e2cc7f54d40a",
+  "parts_index": 1,
+  "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
+  "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+  "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+  "boot_uuids": [
+    "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+    "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+    ""
+  ]
+})")) {}
+
+ protected:
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0a_;
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0b_;
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1a_;
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
+};
+
+
+// Tests that we can match timestamps on delivered messages in the presence of
+// reboots on the node receiving timestamps.
+TEST_F(RebootTimestampMapperTest, ReadNode0First) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer0a(logfile0_, std::make_unique<DummyEncoder>());
+    writer0a.QueueSpan(boot0a_.span());
+    DetachedBufferWriter writer0b(logfile1_, std::make_unique<DummyEncoder>());
+    writer0b.QueueSpan(boot0b_.span());
+    DetachedBufferWriter writer1a(logfile2_, std::make_unique<DummyEncoder>());
+    writer1a.QueueSpan(boot1a_.span());
+    DetachedBufferWriter writer1b(logfile3_, std::make_unique<DummyEncoder>());
+    writer1b.QueueSpan(boot1b_.span());
+
+    writer0a.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+    writer1a.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1000), 0, chrono::seconds(100),
+        e + chrono::milliseconds(1001)));
+
+    writer0b.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+    writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(2000), 0, chrono::seconds(20),
+        e + chrono::milliseconds(2001)));
+
+    writer0b.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+    writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(3000), 0, chrono::seconds(20),
+        e + chrono::milliseconds(3001)));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
+
+  for (const auto &x : parts) {
+    LOG(INFO) << x;
+  }
+  ASSERT_EQ(parts.size(), 1u);
+  ASSERT_EQ(parts[0].logger_node, "pi1");
+
+  size_t mapper0_count = 0;
+  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  mapper0.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper0_count; });
+  size_t mapper1_count = 0;
+  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  mapper1.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper1_count; });
+
+  mapper0.AddPeer(&mapper1);
+  mapper1.AddPeer(&mapper0);
+
+  {
+    std::deque<TimestampedMessage> output0;
+
+    EXPECT_EQ(mapper0_count, 0u);
+    EXPECT_EQ(mapper1_count, 0u);
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 1u);
+    EXPECT_EQ(mapper1_count, 0u);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_TRUE(mapper0.started());
+    EXPECT_EQ(mapper0_count, 1u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 2u);
+    EXPECT_EQ(mapper1_count, 0u);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_TRUE(mapper0.started());
+
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_TRUE(mapper0.started());
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    ASSERT_TRUE(mapper0.Front() == nullptr);
+
+    LOG(INFO) << output0[0];
+    LOG(INFO) << output0[1];
+    LOG(INFO) << output0[2];
+
+    EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output0[0].monotonic_event_time.time,
+              e + chrono::milliseconds(1000));
+    EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
+    EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
+    EXPECT_TRUE(output0[0].data.Verify());
+
+    EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output0[1].monotonic_event_time.time,
+              e + chrono::milliseconds(2000));
+    EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
+    EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
+    EXPECT_TRUE(output0[1].data.Verify());
+
+    EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output0[2].monotonic_event_time.time,
+              e + chrono::milliseconds(3000));
+    EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
+    EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
+    EXPECT_TRUE(output0[2].data.Verify());
+  }
+
+  {
+    SCOPED_TRACE("Trying node1 now");
+    std::deque<TimestampedMessage> output1;
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 1u);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_TRUE(mapper1.started());
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 1u);
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 2u);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_TRUE(mapper1.started());
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_TRUE(mapper1.started());
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 3u);
+
+    ASSERT_TRUE(mapper1.Front() == nullptr);
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 3u);
+
+    EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output1[0].monotonic_event_time.time,
+              e + chrono::seconds(100) + chrono::milliseconds(1000));
+    EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
+    EXPECT_EQ(output1[0].monotonic_remote_time.time,
+              e + chrono::milliseconds(1000));
+    EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
+    EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
+              e + chrono::milliseconds(1001));
+    EXPECT_TRUE(output1[0].data.Verify());
+
+    EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
+    EXPECT_EQ(output1[1].monotonic_event_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(2000));
+    EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
+    EXPECT_EQ(output1[1].monotonic_remote_time.time,
+              e + chrono::milliseconds(2000));
+    EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
+    EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
+              e + chrono::milliseconds(2001));
+    EXPECT_TRUE(output1[1].data.Verify());
+
+    EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
+    EXPECT_EQ(output1[2].monotonic_event_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(3000));
+    EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
+    EXPECT_EQ(output1[2].monotonic_remote_time.time,
+              e + chrono::milliseconds(3000));
+    EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
+    EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
+              e + chrono::milliseconds(3001));
+    EXPECT_TRUE(output1[2].data.Verify());
+
+    LOG(INFO) << output1[0];
+    LOG(INFO) << output1[1];
+    LOG(INFO) << output1[2];
+  }
+}
+
+TEST_F(RebootTimestampMapperTest, Node2Reboot) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer0a(logfile0_, std::make_unique<DummyEncoder>());
+    writer0a.QueueSpan(boot0a_.span());
+    DetachedBufferWriter writer0b(logfile1_, std::make_unique<DummyEncoder>());
+    writer0b.QueueSpan(boot0b_.span());
+    DetachedBufferWriter writer1a(logfile2_, std::make_unique<DummyEncoder>());
+    writer1a.QueueSpan(boot1a_.span());
+    DetachedBufferWriter writer1b(logfile3_, std::make_unique<DummyEncoder>());
+    writer1b.QueueSpan(boot1b_.span());
+
+    writer1a.QueueSizedFlatbuffer(MakeLogMessage(
+        e + chrono::seconds(100) + chrono::milliseconds(1000), 3, 0x005));
+    writer0a.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::seconds(100) + chrono::milliseconds(1000), 3,
+        chrono::seconds(-100),
+        e + chrono::seconds(100) + chrono::milliseconds(1001)));
+
+    writer1b.QueueSizedFlatbuffer(MakeLogMessage(
+        e + chrono::seconds(20) + chrono::milliseconds(2000), 3, 0x006));
+    writer0b.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::seconds(20) + chrono::milliseconds(2000), 3,
+        chrono::seconds(-20),
+        e + chrono::seconds(20) + chrono::milliseconds(2001)));
+
+    writer1b.QueueSizedFlatbuffer(MakeLogMessage(
+        e + chrono::seconds(20) + chrono::milliseconds(3000), 3, 0x007));
+    writer0b.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::seconds(20) + chrono::milliseconds(3000), 3,
+        chrono::seconds(-20),
+        e + chrono::seconds(20) + chrono::milliseconds(3001)));
+  }
+
+  const std::vector<LogFile> parts =
+      SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
+
+  for (const auto &x : parts) {
+    LOG(INFO) << x;
+  }
+  ASSERT_EQ(parts.size(), 1u);
+  ASSERT_EQ(parts[0].logger_node, "pi1");
+
+  size_t mapper0_count = 0;
+  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  mapper0.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper0_count; });
+  size_t mapper1_count = 0;
+  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  mapper1.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper1_count; });
+
+  mapper0.AddPeer(&mapper1);
+  mapper1.AddPeer(&mapper0);
+
+  {
+    std::deque<TimestampedMessage> output0;
+
+    EXPECT_EQ(mapper0_count, 0u);
+    EXPECT_EQ(mapper1_count, 0u);
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 1u);
+    EXPECT_EQ(mapper1_count, 0u);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_TRUE(mapper0.started());
+    EXPECT_EQ(mapper0_count, 1u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 2u);
+    EXPECT_EQ(mapper1_count, 0u);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_TRUE(mapper0.started());
+
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_TRUE(mapper0.started());
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    ASSERT_TRUE(mapper0.Front() == nullptr);
+
+    LOG(INFO) << output0[0];
+    LOG(INFO) << output0[1];
+    LOG(INFO) << output0[2];
+
+    EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output0[0].monotonic_event_time.time,
+              e + chrono::milliseconds(1000));
+    EXPECT_EQ(output0[0].monotonic_remote_time.boot, 0u);
+    EXPECT_EQ(output0[0].monotonic_remote_time.time,
+              e + chrono::seconds(100) + chrono::milliseconds(1000));
+    EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
+    EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
+              e + chrono::seconds(100) + chrono::milliseconds(1001));
+    EXPECT_TRUE(output0[0].data.Verify());
+
+    EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output0[1].monotonic_event_time.time,
+              e + chrono::milliseconds(2000));
+    EXPECT_EQ(output0[1].monotonic_remote_time.boot, 1u);
+    EXPECT_EQ(output0[1].monotonic_remote_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(2000));
+    EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
+    EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(2001));
+    EXPECT_TRUE(output0[1].data.Verify());
+
+    EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output0[2].monotonic_event_time.time,
+              e + chrono::milliseconds(3000));
+    EXPECT_EQ(output0[2].monotonic_remote_time.boot, 1u);
+    EXPECT_EQ(output0[2].monotonic_remote_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(3000));
+    EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
+    EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(3001));
+    EXPECT_TRUE(output0[2].data.Verify());
+  }
+
+  {
+    SCOPED_TRACE("Trying node1 now");
+    std::deque<TimestampedMessage> output1;
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 1u);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_TRUE(mapper1.started());
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 1u);
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 2u);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_TRUE(mapper1.started());
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_TRUE(mapper1.started());
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 3u);
+
+    ASSERT_TRUE(mapper1.Front() == nullptr);
+
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 3u);
+
+    EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
+    EXPECT_EQ(output1[0].monotonic_event_time.time,
+              e + chrono::seconds(100) + chrono::milliseconds(1000));
+    EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
+    EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
+    EXPECT_TRUE(output1[0].data.Verify());
+
+    EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
+    EXPECT_EQ(output1[1].monotonic_event_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(2000));
+    EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
+    EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
+    EXPECT_TRUE(output1[1].data.Verify());
+
+    EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
+    EXPECT_EQ(output1[2].monotonic_event_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(3000));
+    EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
+    EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
+    EXPECT_TRUE(output1[2].data.Verify());
+
+    LOG(INFO) << output1[0];
+    LOG(INFO) << output1[1];
+    LOG(INFO) << output1[2];
+  }
+}
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos