Handle replaying logs with dropped forwarded messages

If messages forwarded from node a to node b are sent unreliably, we end
up with data log files whose queue indices are non-contiguous.  Add a
test for handling this behavior, and fall back to a simpler search
algorithm when it occurs.
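
The fallback can be pictured with a rough sketch (illustrative only; the
Message struct and FindByQueueIndex helper below are hypothetical
stand-ins, not the actual TimestampMapper internals): when queue indices
are contiguous, the match is found by jumping directly to the offset
from the front of the queue; when a dropped forwarded message leaves a
gap, a linear scan finds the message or reports that it is missing.

    #include <cstdint>
    #include <deque>
    #include <optional>

    struct Message {
      uint32_t queue_index;
      // Timestamps and payload omitted for brevity.
    };

    std::optional<Message> FindByQueueIndex(
        const std::deque<Message> &messages, uint32_t queue_index) {
      if (messages.empty()) return std::nullopt;
      // Fast path: contiguous indices let us index directly off the front.
      const size_t offset =
          static_cast<size_t>(queue_index - messages.front().queue_index);
      if (offset < messages.size() &&
          messages[offset].queue_index == queue_index) {
        return messages[offset];
      }
      // Slow path: a dropped forwarded message left a gap, so scan linearly.
      for (const Message &m : messages) {
        if (m.queue_index == queue_index) return m;
      }
      return std::nullopt;
    }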

Change-Id: I4b3e9724e2b527ccde6fd2cdd315eb6ab52fa47c
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 1d83466..a380451 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -1015,6 +1015,67 @@
   }
 }
 
+// Tests that we handle a message which failed to forward or be logged.
+TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+    writer0.QueueSpan(config0_.span());
+    DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+    writer1.QueueSpan(config2_.span());
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+    // Create both the timestamp and message, but don't log them, simulating a
+    // forwarding drop.
+    MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
+    MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
+                         chrono::seconds(100));
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+  ASSERT_EQ(parts[0].logger_node, "pi1");
+  ASSERT_EQ(parts[1].logger_node, "pi2");
+
+  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
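+  // Peers are needed to match timestamps on one node with the other's data.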
+  mapper0.AddPeer(&mapper1);
+  mapper1.AddPeer(&mapper0);
+
+  {
+    std::deque<TimestampedMessage> output1;
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+
+    // The dropped message at 2000ms must not show up as a third entry.
+    ASSERT_TRUE(mapper1.Front() == nullptr);
+
+    EXPECT_EQ(output1[0].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(1000));
+    EXPECT_TRUE(output1[0].data.Verify());
+    EXPECT_EQ(output1[1].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(3000));
+    EXPECT_TRUE(output1[1].data.Verify());
+  }
+}
+
 // Tests that we properly sort log files with duplicate timestamps.
 TEST_F(TimestampMapperTest, ReadSameTimestamp) {
   const aos::monotonic_clock::time_point e = monotonic_clock::epoch();