Add LogPartsSorter to sort messages from a log file

Based on https://docs.google.com/document/d/1RZ6ZlADRUHmwiFOOmXA87FHPLFuN-7mS7tbFCwguZDE/edit#
we need to start by sorting all the messages from a set of parts.  Make
a class and test this.  (The testing infrastructure looks a bit
overkill, but will be reused a lot for the following tests).

Change-Id: Ifa1ba880ddf7cf923f24826e504b902a4787ad03
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 3a39803..83d798d 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -442,8 +442,13 @@
       newest_timestamp_ = message_reader_.newest_timestamp();
       const monotonic_clock::time_point monotonic_sent_time(
           chrono::nanoseconds(message->message().monotonic_sent_time()));
+      // TODO(austin): Does this work with startup?  Might need to use the start
+      // time.
+      // TODO(austin): Does this work with startup when we don't know the remote
+      // start time too?  Look at one of those logs to compare.
       CHECK_GE(monotonic_sent_time,
-               newest_timestamp_ - max_out_of_order_duration());
+               newest_timestamp_ - max_out_of_order_duration())
+          << ": Max out of order exceeded.";
       return message;
     }
     NextLog();
@@ -489,6 +494,67 @@
   return os;
 }
 
+LogPartsSorter::LogPartsSorter(LogParts log_parts)
+    : parts_message_reader_(log_parts) {}
+
+Message *LogPartsSorter::Front() {
+  // Queue up data until enough data has been queued that the front message is
+  // sorted enough to be safe to pop.  This may do nothing, so we should make
+  // sure the nothing path is checked quickly.
+  if (sorted_until() != monotonic_clock::max_time) {
+    while (true) {
+      if (!messages_.empty() && messages_.begin()->timestamp < sorted_until()) {
+        break;
+      }
+
+      std::optional<SizePrefixedFlatbufferVector<MessageHeader>> m =
+          parts_message_reader_.ReadMessage();
+      // No data left, sorted forever, work through what is left.
+      if (!m) {
+        sorted_until_ = monotonic_clock::max_time;
+        break;
+      }
+
+      messages_.insert(
+          {.channel_index = m.value().message().channel_index(),
+           .queue_index = m.value().message().queue_index(),
+           .timestamp = monotonic_clock::time_point(std::chrono::nanoseconds(
+               m.value().message().monotonic_sent_time())),
+           .data = std::move(m.value())});
+
+      // Now, update sorted_until_ to match the new message.
+      if (parts_message_reader_.newest_timestamp() >
+          monotonic_clock::min_time +
+              parts_message_reader_.max_out_of_order_duration()) {
+        sorted_until_ = parts_message_reader_.newest_timestamp() -
+                        parts_message_reader_.max_out_of_order_duration();
+      } else {
+        sorted_until_ = monotonic_clock::min_time;
+      }
+    }
+  }
+
+  // Now that we have enough data queued, return a pointer to the oldest piece
+  // of data if it exists.
+  if (messages_.empty()) {
+    return nullptr;
+  }
+
+  return &(*messages_.begin());
+}
+
+void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }
+
+std::string LogPartsSorter::DebugString() const {
+  std::stringstream ss;
+  ss << "messages: [\n";
+  for (const Message &m : messages_) {
+    ss << m << "\n";
+  }
+  ss << "] <- " << parts_message_reader_.filename();
+  return ss.str();
+}
+
 SplitMessageReader::SplitMessageReader(
     const std::vector<std::string> &filenames)
     : filenames_(filenames),