Handle messages from before the start time

These messages violate the "max_out_of_order" time by definition.  We
log one message per channel, round robin.  So, queue them all up before
continuing.

Change-Id: Ida38292630a6dd1035ff59e931c3053d127aafd9
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 2886a8e..ec79f17 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -446,9 +446,11 @@
       // time.
       // TODO(austin): Does this work with startup when we don't know the remote
       // start time too?  Look at one of those logs to compare.
-      CHECK_GE(monotonic_sent_time,
-               newest_timestamp_ - max_out_of_order_duration())
-          << ": Max out of order exceeded.";
+      if (monotonic_sent_time > parts_.monotonic_start_time) {
+        CHECK_GE(monotonic_sent_time,
+                 newest_timestamp_ - max_out_of_order_duration())
+            << ": Max out of order exceeded. " << parts_;
+      }
       return message;
     }
     NextLog();
@@ -532,7 +534,8 @@
   // sure the nothing path is checked quickly.
   if (sorted_until() != monotonic_clock::max_time) {
     while (true) {
-      if (!messages_.empty() && messages_.begin()->timestamp < sorted_until()) {
+      if (!messages_.empty() && messages_.begin()->timestamp < sorted_until() &&
+          sorted_until() >= monotonic_start_time()) {
         break;
       }
 
@@ -566,9 +569,12 @@
   // Now that we have enough data queued, return a pointer to the oldest piece
   // of data if it exists.
   if (messages_.empty()) {
+    last_message_time_ = monotonic_clock::max_time;
     return nullptr;
   }
 
+  CHECK_GE(messages_.begin()->timestamp, last_message_time_);
+  last_message_time_ = messages_.begin()->timestamp;
   return &(*messages_.begin());
 }
 
@@ -610,7 +616,9 @@
 Message *NodeMerger::Front() {
   // Return the current Front if we have one, otherwise go compute one.
   if (current_ != nullptr) {
-    return current_->Front();
+    Message *result = current_->Front();
+    CHECK_GE(result->timestamp, last_message_time_);
+    return result;
   }
 
   // Otherwise, do a simple search for the oldest message, deduplicating any
@@ -636,6 +644,13 @@
     sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
   }
 
+  if (oldest) {
+    CHECK_GE(oldest->timestamp, last_message_time_);
+    last_message_time_ = oldest->timestamp;
+  } else {
+    last_message_time_ = monotonic_clock::max_time;
+  }
+
   // Return the oldest message found.  This will be nullptr if nothing was
   // found, indicating there is nothing left.
   return oldest;
@@ -934,7 +949,7 @@
       if (channel_data.messages.empty()) {
         continue;
       }
- 
+
       ss << "  channel " << channel_index << " [\n";
       for (const Message &m : channel_data.messages) {
         ss << "    " << m << "\n";
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 4a19209..d24e0af 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -394,6 +394,10 @@
   // Cache of the time we are sorted until.
   aos::monotonic_clock::time_point sorted_until_ = monotonic_clock::min_time;
 
+  // Timestamp of the last message returned.  Used to make sure nothing goes
+  // backwards.
+  monotonic_clock::time_point last_message_time_ = monotonic_clock::min_time;
+
   // Set used for efficient sorting of messages.  We can benchmark and evaluate
   // other data structures if this proves to be the bottleneck.
   absl::btree_set<Message> messages_;
@@ -443,6 +447,10 @@
   // Cached node.
   int node_;
 
+  // Timestamp of the last message returned.  Used to make sure nothing goes
+  // backwards.
+  monotonic_clock::time_point last_message_time_ = monotonic_clock::min_time;
+
   realtime_clock::time_point realtime_start_time_ = realtime_clock::max_time;
   monotonic_clock::time_point monotonic_start_time_ = monotonic_clock::max_time;
 };
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 3bed06d..1d83466 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -510,6 +510,53 @@
   EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(2000));
 }
 
+// Tests that we can pull messages out of a log sorted in order.
+TEST_F(LogPartsSorterTest, WayBeforeStart) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
+    writer.QueueSpan(config0_.span());
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_});
+
+  LogPartsSorter parts_sorter(parts[0].parts[0]);
+
+  // Confirm we aren't sorted until any time until the message is popped.
+  // Peeking shouldn't change the sorted until time.
+  EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
+
+  std::deque<Message> output;
+
+  for (monotonic_clock::time_point t :
+       {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
+        e + chrono::milliseconds(1900), monotonic_clock::max_time,
+        monotonic_clock::max_time}) {
+    ASSERT_TRUE(parts_sorter.Front() != nullptr);
+    output.emplace_back(std::move(*parts_sorter.Front()));
+    parts_sorter.PopFront();
+    EXPECT_EQ(parts_sorter.sorted_until(), t);
+  }
+
+  ASSERT_TRUE(parts_sorter.Front() == nullptr);
+
+  EXPECT_EQ(output[0].timestamp, e - chrono::milliseconds(1000));
+  EXPECT_EQ(output[1].timestamp, e - chrono::milliseconds(500));
+  EXPECT_EQ(output[2].timestamp, e - chrono::milliseconds(10));
+  EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(1901));
+  EXPECT_EQ(output[4].timestamp, e + chrono::milliseconds(2000));
+}
+
 // Tests that messages too far out of order trigger death.
 TEST_F(LogPartsSorterDeathTest, Pull) {
   const aos::monotonic_clock::time_point e = monotonic_clock::epoch();