Support queueing data in TimestampMapper and viewing timestamps

When solving for the starting time of a log, we don't always have enough
data queued on all the nodes.  We can either do gymnastics to queue
enough data, or we can teach MultiNodeNoncausalOffsetEstimator how to
enqueue more data on each node when asked to evaluate times after the
pre-loaded range.

Teaching MultiNodeNoncausalOffsetEstimator how to automatically enqueue
more data will be more robust in the long term, so let's go that route.

This commit gives us enough hooks to remove the std::deque from inside
LogReader::State and directly use TimestampMapper.  That in turn should
let us automatically enqueue more data when needed, and add it to the
filters automatically.

Change-Id: Ife1248d0b515d9ea9088ace84fa020a351289d0c
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 2fe5473..3850b44 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -708,15 +708,7 @@
 
 TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
     : node_merger_(std::move(parts)),
-      message_{.channel_index = 0xffffffff,
-               .queue_index = 0xffffffff,
-               .monotonic_event_time = monotonic_clock::min_time,
-               .realtime_event_time = realtime_clock::min_time,
-               .remote_queue_index = 0xffffffff,
-               .monotonic_remote_time = monotonic_clock::min_time,
-               .realtime_remote_time = realtime_clock::min_time,
-               .monotonic_timestamp_time = monotonic_clock::min_time,
-               .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()} {
+      timestamp_callback_([](TimestampedMessage *) {}) {
   for (const LogParts *part : node_merger_.Parts()) {
     if (!configuration_) {
       configuration_ = part->config;
@@ -774,8 +766,8 @@
   }
 }
 
-void TimestampMapper::FillMessage(Message *m) {
-  message_ = {
+void TimestampMapper::QueueMessage(Message *m) {
+  matched_messages_.emplace_back(TimestampedMessage{
       .channel_index = m->channel_index,
       .queue_index = m->queue_index,
       .monotonic_event_time = m->timestamp,
@@ -785,7 +777,7 @@
       .monotonic_remote_time = monotonic_clock::min_time,
       .realtime_remote_time = realtime_clock::min_time,
       .monotonic_timestamp_time = monotonic_clock::min_time,
-      .data = std::move(m->data)};
+      .data = std::move(m->data)});
 }
 
 TimestampedMessage *TimestampMapper::Front() {
@@ -794,27 +786,40 @@
     case FirstMessage::kNeedsUpdate:
       break;
     case FirstMessage::kInMessage:
-      return &message_;
+      return &matched_messages_.front();
     case FirstMessage::kNullptr:
       return nullptr;
   }
 
+  if (matched_messages_.empty()) {
+    if (!QueueMatched()) {
+      first_message_ = FirstMessage::kNullptr;
+      return nullptr;
+    }
+  }
+  first_message_ = FirstMessage::kInMessage;
+  return &matched_messages_.front();
+}
+
+bool TimestampMapper::QueueMatched() {
   if (nodes_data_.empty()) {
     // Simple path.  We are single node, so there are no timestamps to match!
     CHECK_EQ(messages_.size(), 0u);
     Message *m = node_merger_.Front();
     if (!m) {
-      first_message_ = FirstMessage::kNullptr;
-      return nullptr;
+      return false;
     }
-    // Fill in message_ so we have a place to associate remote timestamps, and
-    // return it.
-    FillMessage(m);
+    // Enqueue this message into matched_messages_ so we have a place to
+    // associate remote timestamps, and return it.
+    QueueMessage(m);
 
-    CHECK_GE(message_.monotonic_event_time, last_message_time_);
-    last_message_time_ = message_.monotonic_event_time;
-    first_message_ = FirstMessage::kInMessage;
-    return &message_;
+    CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
+    last_message_time_ = matched_messages_.back().monotonic_event_time;
+
+    // We are thin wrapper around node_merger.  Call it directly.
+    node_merger_.PopFront();
+    timestamp_callback_(&matched_messages_.back());
+    return true;
   }
 
   // We need to only add messages to the list so they get processed for messages
@@ -823,8 +828,7 @@
   if (messages_.empty()) {
     if (!Queue()) {
       // Found nothing to add, we are out of data!
-      first_message_ = FirstMessage::kNullptr;
-      return nullptr;
+      return false;
     }
 
     // Now that it has been added (and cannibalized), forget about it upstream.
@@ -835,18 +839,19 @@
 
   if (source_node_[m->channel_index] == node()) {
     // From us, just forward it on, filling the remote data in as invalid.
-    FillMessage(m);
-    CHECK_GE(message_.monotonic_event_time, last_message_time_);
-    last_message_time_ = message_.monotonic_event_time;
-    first_message_ = FirstMessage::kInMessage;
-    return &message_;
+    QueueMessage(m);
+    CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
+    last_message_time_ = matched_messages_.back().monotonic_event_time;
+    messages_.pop_front();
+    timestamp_callback_(&matched_messages_.back());
+    return true;
   } else {
     // Got a timestamp, find the matching remote data, match it, and return it.
     Message data = MatchingMessageFor(*m);
 
     // Return the data from the remote.  The local message only has timestamp
     // info which isn't relevant anymore once extracted.
-    message_ = {
+    matched_messages_.emplace_back(TimestampedMessage{
         .channel_index = m->channel_index,
         .queue_index = m->queue_index,
         .monotonic_event_time = m->timestamp,
@@ -861,11 +866,21 @@
         .monotonic_timestamp_time =
             monotonic_clock::time_point(std::chrono::nanoseconds(
                 m->data.message().monotonic_timestamp_time())),
-        .data = std::move(data.data)};
-    CHECK_GE(message_.monotonic_event_time, last_message_time_);
-    last_message_time_ = message_.monotonic_event_time;
-    first_message_ = FirstMessage::kInMessage;
-    return &message_;
+        .data = std::move(data.data)});
+    CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
+    last_message_time_ = matched_messages_.back().monotonic_event_time;
+    // Since messages_ holds the data, drop it.
+    messages_.pop_front();
+    timestamp_callback_(&matched_messages_.back());
+    return true;
+  }
+}
+
+void TimestampMapper::QueueUntil(monotonic_clock::time_point queue_time) {
+  while (last_message_time_ <= queue_time) {
+    if (!QueueMatched()) {
+      return;
+    }
   }
 }
 
@@ -873,13 +888,7 @@
   CHECK(first_message_ != FirstMessage::kNeedsUpdate);
   first_message_ = FirstMessage::kNeedsUpdate;
 
-  if (nodes_data_.empty()) {
-    // We are thin wrapper around node_merger.  Call it directly.
-    node_merger_.PopFront();
-  } else {
-    // Since messages_ holds the data, drop it.
-    messages_.pop_front();
-  }
+  matched_messages_.pop_front();
 }
 
 Message TimestampMapper::MatchingMessageFor(const Message &message) {
@@ -913,7 +922,7 @@
   std::deque<Message> *data_queue =
       &peer->nodes_data_[node()].channels[message.channel_index].messages;
 
-  peer->QueueUntil(monotonic_remote_time);
+  peer->QueueUnmatchedUntil(monotonic_remote_time);
 
   if (data_queue->empty()) {
     return Message{
@@ -981,7 +990,7 @@
   }
 }
 
-void TimestampMapper::QueueUntil(monotonic_clock::time_point t) {
+void TimestampMapper::QueueUnmatchedUntil(monotonic_clock::time_point t) {
   if (queued_until_ > t) {
     return;
   }
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 94525cf..070a2de 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -463,6 +463,9 @@
 };
 
 // Class to match timestamps with the corresponding data from other nodes.
+//
+// This class also buffers data for the node it represents, and supports
+// notifying when new data is queued as well as queueing until a point in time.
 class TimestampMapper {
  public:
   TimestampMapper(std::vector<LogParts> file);
@@ -512,6 +515,14 @@
   // Returns debug information about this node.
   std::string DebugString() const;
 
+  // Queues data the provided time.
+  void QueueUntil(monotonic_clock::time_point queue_time);
+
+  // Sets a callback to be called whenever a full message is queued.
+  void set_timestamp_callback(std::function<void(TimestampedMessage *)> fn) {
+    timestamp_callback_ = fn;
+  }
+
  private:
   // The state for a remote node.  This holds the data that needs to be matched
   // with the remote node's timestamps.
@@ -546,13 +557,17 @@
   // otherwise.
   bool Queue();
 
+  // Queues up a single matched message into our matched message queue.  Returns
+  // true if one was queued, and false otherwise.
+  bool QueueMatched();
+
   // Queues up data until we have at least one message >= to time t.
   // Useful for triggering a remote node to read enough data to have the
   // timestamp you care about available.
-  void QueueUntil(monotonic_clock::time_point t);
+  void QueueUnmatchedUntil(monotonic_clock::time_point t);
 
-  // Fills message_ with the contents of m.
-  void FillMessage(Message *m);
+  // Queues m into matched_messages_.
+  void QueueMessage(Message *m);
 
   // The node merger to source messages from.
   NodeMerger node_merger_;
@@ -569,10 +584,10 @@
   std::vector<NodeData> nodes_data_;
 
   // Latest message to return.
-  TimestampedMessage message_;
+  std::deque<TimestampedMessage> matched_messages_;
 
-  // Tracks if the first message points to message_, nullptr (all done), or is
-  // invalid.
+  // Tracks the state of the first message in matched_messages_.  Do we need to
+  // update it, is it valid, or should we return nullptr?
   enum class FirstMessage {
     kNeedsUpdate,
     kInMessage,
@@ -585,6 +600,8 @@
   monotonic_clock::time_point last_message_time_ = monotonic_clock::min_time;
   // Time this node is queued up until.  Used for caching.
   monotonic_clock::time_point queued_until_ = monotonic_clock::min_time;
+
+  std::function<void(TimestampedMessage *)> timestamp_callback_;
 };
 
 // Returns the node name with a trailing space, or an empty string if we are on
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 979821e..fc7635a 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -815,8 +815,14 @@
   ASSERT_EQ(parts[0].logger_node, "pi1");
   ASSERT_EQ(parts[1].logger_node, "pi2");
 
+  size_t mapper0_count = 0;
   TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  mapper0.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper0_count; });
+  size_t mapper1_count = 0;
   TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  mapper1.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper1_count; });
 
   mapper0.AddPeer(&mapper1);
   mapper1.AddPeer(&mapper0);
@@ -824,12 +830,20 @@
   {
     std::deque<TimestampedMessage> output0;
 
+    EXPECT_EQ(mapper0_count, 0u);
+    EXPECT_EQ(mapper1_count, 0u);
     ASSERT_TRUE(mapper0.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 1u);
+    EXPECT_EQ(mapper1_count, 0u);
     output0.emplace_back(std::move(*mapper0.Front()));
     mapper0.PopFront();
     EXPECT_EQ(mapper0.sorted_until(), e + chrono::milliseconds(1900));
+    EXPECT_EQ(mapper0_count, 1u);
+    EXPECT_EQ(mapper1_count, 0u);
 
     ASSERT_TRUE(mapper0.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 2u);
+    EXPECT_EQ(mapper1_count, 0u);
     output0.emplace_back(std::move(*mapper0.Front()));
     mapper0.PopFront();
     EXPECT_EQ(mapper0.sorted_until(), e + chrono::milliseconds(2900));
@@ -839,6 +853,9 @@
     mapper0.PopFront();
     EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
 
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 0u);
+
     ASSERT_TRUE(mapper0.Front() == nullptr);
 
     EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
@@ -853,13 +870,22 @@
     SCOPED_TRACE("Trying node1 now");
     std::deque<TimestampedMessage> output1;
 
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 0u);
+
     ASSERT_TRUE(mapper1.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 1u);
     output1.emplace_back(std::move(*mapper1.Front()));
     mapper1.PopFront();
     EXPECT_EQ(mapper1.sorted_until(),
               e + chrono::seconds(100) + chrono::milliseconds(1900));
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 1u);
 
     ASSERT_TRUE(mapper1.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 2u);
     output1.emplace_back(std::move(*mapper1.Front()));
     mapper1.PopFront();
     EXPECT_EQ(mapper1.sorted_until(),
@@ -870,8 +896,14 @@
     mapper1.PopFront();
     EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
 
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 3u);
+
     ASSERT_TRUE(mapper1.Front() == nullptr);
 
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 3u);
+
     EXPECT_EQ(output1[0].monotonic_event_time,
               e + chrono::seconds(100) + chrono::milliseconds(1000));
     EXPECT_TRUE(output1[0].data.Verify());
@@ -920,8 +952,14 @@
 
   ASSERT_EQ(parts.size(), 1u);
 
+  size_t mapper0_count = 0;
   TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  mapper0.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper0_count; });
+  size_t mapper1_count = 0;
   TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  mapper1.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper1_count; });
 
   mapper0.AddPeer(&mapper1);
   mapper1.AddPeer(&mapper0);
@@ -975,6 +1013,9 @@
     EXPECT_EQ(output1[2].monotonic_timestamp_time, monotonic_clock::min_time);
     EXPECT_TRUE(output1[2].data.Verify());
   }
+
+  EXPECT_EQ(mapper0_count, 3u);
+  EXPECT_EQ(mapper1_count, 3u);
 }
 
 // Tests that we can match timestamps on delivered messages.  By doing this in
@@ -1009,8 +1050,14 @@
   ASSERT_EQ(parts[0].logger_node, "pi1");
   ASSERT_EQ(parts[1].logger_node, "pi2");
 
+  size_t mapper0_count = 0;
   TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  mapper0.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper0_count; });
+  size_t mapper1_count = 0;
   TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  mapper1.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper1_count; });
 
   mapper0.AddPeer(&mapper1);
   mapper1.AddPeer(&mapper0);
@@ -1076,6 +1123,9 @@
     EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
     EXPECT_TRUE(output0[2].data.Verify());
   }
+
+  EXPECT_EQ(mapper0_count, 3u);
+  EXPECT_EQ(mapper1_count, 3u);
 }
 
 // Tests that we return just the timestamps if we couldn't find the data and the
@@ -1108,8 +1158,14 @@
   ASSERT_EQ(parts[0].logger_node, "pi1");
   ASSERT_EQ(parts[1].logger_node, "pi2");
 
+  size_t mapper0_count = 0;
   TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  mapper0.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper0_count; });
+  size_t mapper1_count = 0;
   TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  mapper1.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper1_count; });
 
   mapper0.AddPeer(&mapper1);
   mapper1.AddPeer(&mapper0);
@@ -1147,6 +1203,9 @@
               e + chrono::seconds(100) + chrono::milliseconds(3000));
     EXPECT_TRUE(output1[2].data.Verify());
   }
+
+  EXPECT_EQ(mapper0_count, 0u);
+  EXPECT_EQ(mapper1_count, 3u);
 }
 
 // Tests that we return just the timestamps if we couldn't find the data and the
@@ -1179,8 +1238,14 @@
   ASSERT_EQ(parts[0].logger_node, "pi1");
   ASSERT_EQ(parts[1].logger_node, "pi2");
 
+  size_t mapper0_count = 0;
   TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  mapper0.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper0_count; });
+  size_t mapper1_count = 0;
   TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  mapper1.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper1_count; });
 
   mapper0.AddPeer(&mapper1);
   mapper1.AddPeer(&mapper0);
@@ -1218,6 +1283,9 @@
               e + chrono::seconds(100) + chrono::milliseconds(3000));
     EXPECT_FALSE(output1[2].data.Verify());
   }
+
+  EXPECT_EQ(mapper0_count, 0u);
+  EXPECT_EQ(mapper1_count, 3u);
 }
 
 // Tests that we handle a message which failed to forward or be logged.
@@ -1251,8 +1319,14 @@
   ASSERT_EQ(parts[0].logger_node, "pi1");
   ASSERT_EQ(parts[1].logger_node, "pi2");
 
+  size_t mapper0_count = 0;
   TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  mapper0.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper0_count; });
+  size_t mapper1_count = 0;
   TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  mapper1.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper1_count; });
 
   mapper0.AddPeer(&mapper1);
   mapper1.AddPeer(&mapper0);
@@ -1276,6 +1350,9 @@
               e + chrono::seconds(100) + chrono::milliseconds(3000));
     EXPECT_TRUE(output1[1].data.Verify());
   }
+
+  EXPECT_EQ(mapper0_count, 0u);
+  EXPECT_EQ(mapper1_count, 2u);
 }
 
 // Tests that we properly sort log files with duplicate timestamps.
@@ -1313,8 +1390,14 @@
   ASSERT_EQ(parts[0].logger_node, "pi1");
   ASSERT_EQ(parts[1].logger_node, "pi2");
 
+  size_t mapper0_count = 0;
   TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  mapper0.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper0_count; });
+  size_t mapper1_count = 0;
   TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  mapper1.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper1_count; });
 
   mapper0.AddPeer(&mapper1);
   mapper1.AddPeer(&mapper0);
@@ -1343,6 +1426,9 @@
               e + chrono::seconds(100) + chrono::milliseconds(3000));
     EXPECT_TRUE(output1[3].data.Verify());
   }
+
+  EXPECT_EQ(mapper0_count, 0u);
+  EXPECT_EQ(mapper1_count, 4u);
 }
 
 // Tests that we properly sort log files with duplicate timestamps.
@@ -1360,11 +1446,15 @@
   const std::vector<LogFile> parts =
       SortParts({logfile0_, logfile1_, logfile2_});
 
+  size_t mapper0_count = 0;
   TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  mapper0.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper0_count; });
 
   EXPECT_EQ(mapper0.monotonic_start_time(), e + chrono::milliseconds(1));
   EXPECT_EQ(mapper0.realtime_start_time(),
             realtime_clock::time_point(chrono::seconds(1000)));
+  EXPECT_EQ(mapper0_count, 0u);
 }
 
 // Tests that when a peer isn't registered, we treat that as if there was no
@@ -1397,7 +1487,10 @@
   ASSERT_EQ(parts[0].logger_node, "pi1");
   ASSERT_EQ(parts[1].logger_node, "pi2");
 
+  size_t mapper1_count = 0;
   TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  mapper1.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper1_count; });
 
   {
     std::deque<TimestampedMessage> output1;
@@ -1423,6 +1516,161 @@
               e + chrono::seconds(100) + chrono::milliseconds(3000));
     EXPECT_FALSE(output1[2].data.Verify());
   }
+  EXPECT_EQ(mapper1_count, 3u);
+}
+
+// Tests that we can queue messages and call the timestamp callback for both
+// nodes.
+TEST_F(TimestampMapperTest, QueueUntilNode0) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+    writer0.QueueSpan(config0_.span());
+    DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+    writer1.QueueSpan(config2_.span());
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+  ASSERT_EQ(parts[0].logger_node, "pi1");
+  ASSERT_EQ(parts[1].logger_node, "pi2");
+
+  size_t mapper0_count = 0;
+  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  mapper0.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper0_count; });
+  size_t mapper1_count = 0;
+  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  mapper1.set_timestamp_callback(
+      [&](TimestampedMessage *) { ++mapper1_count; });
+
+  mapper0.AddPeer(&mapper1);
+  mapper1.AddPeer(&mapper0);
+
+  {
+    std::deque<TimestampedMessage> output0;
+
+    EXPECT_EQ(mapper0_count, 0u);
+    EXPECT_EQ(mapper1_count, 0u);
+    mapper0.QueueUntil(e + chrono::milliseconds(1000));
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    mapper0.QueueUntil(e + chrono::milliseconds(1500));
+    EXPECT_EQ(mapper0_count, 3u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    mapper0.QueueUntil(e + chrono::milliseconds(2500));
+    EXPECT_EQ(mapper0_count, 4u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+
+    EXPECT_EQ(mapper0_count, 4u);
+    EXPECT_EQ(mapper1_count, 0u);
+
+    ASSERT_TRUE(mapper0.Front() == nullptr);
+
+    EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
+    EXPECT_TRUE(output0[0].data.Verify());
+    EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(1000));
+    EXPECT_TRUE(output0[1].data.Verify());
+    EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(2000));
+    EXPECT_TRUE(output0[2].data.Verify());
+    EXPECT_EQ(output0[3].monotonic_event_time, e + chrono::milliseconds(3000));
+    EXPECT_TRUE(output0[3].data.Verify());
+  }
+
+  {
+    SCOPED_TRACE("Trying node1 now");
+    std::deque<TimestampedMessage> output1;
+
+    EXPECT_EQ(mapper0_count, 4u);
+    EXPECT_EQ(mapper1_count, 0u);
+    mapper1.QueueUntil(e + chrono::seconds(100) + chrono::milliseconds(1000));
+    EXPECT_EQ(mapper0_count, 4u);
+    EXPECT_EQ(mapper1_count, 3u);
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 4u);
+    EXPECT_EQ(mapper1_count, 3u);
+
+    mapper1.QueueUntil(e + chrono::seconds(100) + chrono::milliseconds(1500));
+    EXPECT_EQ(mapper0_count, 4u);
+    EXPECT_EQ(mapper1_count, 3u);
+
+    mapper1.QueueUntil(e + chrono::seconds(100) + chrono::milliseconds(2500));
+    EXPECT_EQ(mapper0_count, 4u);
+    EXPECT_EQ(mapper1_count, 4u);
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    EXPECT_EQ(mapper0_count, 4u);
+    EXPECT_EQ(mapper1_count, 4u);
+
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+
+    EXPECT_EQ(mapper0_count, 4u);
+    EXPECT_EQ(mapper1_count, 4u);
+
+    ASSERT_TRUE(mapper1.Front() == nullptr);
+
+    EXPECT_EQ(mapper0_count, 4u);
+    EXPECT_EQ(mapper1_count, 4u);
+
+    EXPECT_EQ(output1[0].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(1000));
+    EXPECT_TRUE(output1[0].data.Verify());
+    EXPECT_EQ(output1[1].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(1000));
+    EXPECT_TRUE(output1[1].data.Verify());
+    EXPECT_EQ(output1[2].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(2000));
+    EXPECT_TRUE(output1[2].data.Verify());
+    EXPECT_EQ(output1[3].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(3000));
+    EXPECT_TRUE(output1[3].data.Verify());
+  }
 }
 
 }  // namespace testing