Add LogPartsSorter to sort messages from a log file

Based on https://docs.google.com/document/d/1RZ6ZlADRUHmwiFOOmXA87FHPLFuN-7mS7tbFCwguZDE/edit#
we need to start by sorting all the messages from a set of parts.  Make
a class and test this.  (The testing infrastructure looks a bit
overkill, but will be re-used a lot for the following tests).

Change-Id: Ifa1ba880ddf7cf923f24826e504b902a4787ad03
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 3a39803..83d798d 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -442,8 +442,13 @@
       newest_timestamp_ = message_reader_.newest_timestamp();
       const monotonic_clock::time_point monotonic_sent_time(
           chrono::nanoseconds(message->message().monotonic_sent_time()));
+      // TODO(austin): Does this work with startup?  Might need to use the start
+      // time.
+      // TODO(austin): Does this work with startup when we don't know the remote
+      // start time too?  Look at one of those logs to compare.
       CHECK_GE(monotonic_sent_time,
-               newest_timestamp_ - max_out_of_order_duration());
+               newest_timestamp_ - max_out_of_order_duration())
+          << ": Max out of order exceeded.";
       return message;
     }
     NextLog();
@@ -489,6 +494,67 @@
   return os;
 }
 
+LogPartsSorter::LogPartsSorter(LogParts log_parts)
+    : parts_message_reader_(log_parts) {}
+
+Message *LogPartsSorter::Front() {
+  // Queue up data until enough data has been queued that the front message is
+  // sorted enough to be safe to pop.  This may do nothing, so we should make
+  // sure the nothing path is checked quickly.
+  if (sorted_until() != monotonic_clock::max_time) {
+    while (true) {
+      if (!messages_.empty() && messages_.begin()->timestamp < sorted_until()) {
+        break;
+      }
+
+      std::optional<SizePrefixedFlatbufferVector<MessageHeader>> m =
+          parts_message_reader_.ReadMessage();
+      // No data left, sorted forever, work through what is left.
+      if (!m) {
+        sorted_until_ = monotonic_clock::max_time;
+        break;
+      }
+
+      messages_.insert(
+          {.channel_index = m.value().message().channel_index(),
+           .queue_index = m.value().message().queue_index(),
+           .timestamp = monotonic_clock::time_point(std::chrono::nanoseconds(
+               m.value().message().monotonic_sent_time())),
+           .data = std::move(m.value())});
+
+      // Now, update sorted_until_ to match the new message.
+      if (parts_message_reader_.newest_timestamp() >
+          monotonic_clock::min_time +
+              parts_message_reader_.max_out_of_order_duration()) {
+        sorted_until_ = parts_message_reader_.newest_timestamp() -
+                        parts_message_reader_.max_out_of_order_duration();
+      } else {
+        sorted_until_ = monotonic_clock::min_time;
+      }
+    }
+  }
+
+  // Now that we have enough data queued, return a pointer to the oldest piece
+  // of data if it exists.
+  if (messages_.empty()) {
+    return nullptr;
+  }
+
+  return &(*messages_.begin());
+}
+
+void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }
+
+std::string LogPartsSorter::DebugString() const {
+  std::stringstream ss;
+  ss << "messages: [\n";
+  for (const Message &m : messages_) {
+    ss << m << "\n";
+  }
+  ss << "] <- " << parts_message_reader_.filename();
+  return ss.str();
+}
+
 SplitMessageReader::SplitMessageReader(
     const std::vector<std::string> &filenames)
     : filenames_(filenames),
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 25f02f9..13c83cf 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -14,6 +14,7 @@
 #include <utility>
 #include <vector>
 
+#include "absl/container/btree_set.h"
 #include "absl/types/span.h"
 #include "aos/containers/resizeable_buffer.h"
 #include "aos/events/event_loop.h"
@@ -280,6 +281,10 @@
 
   std::string_view filename() const { return message_reader_.filename(); }
 
+  const LogFileHeader *log_file_header() const {
+    return message_reader_.log_file_header();
+  }
+
   // Returns the minimum amount of data needed to queue up for sorting before
   // we are guarenteed to not see data out of order.
   std::chrono::nanoseconds max_out_of_order_duration() const {
@@ -326,6 +331,44 @@
 
 std::ostream &operator<<(std::ostream &os, const Message &m);
 
+// Class to sort the resulting messages from a PartsMessageReader.
+class LogPartsSorter {
+ public:
+  LogPartsSorter(LogParts log_parts);
+
+  // Returns the current log file header.
+  // TODO(austin): Is this the header we want to report?  Do we want a better
+  // start time?
+  // TODO(austin): Report a start time from the LogParts.  Figure out how that
+  // all works.
+  const LogFileHeader *log_file_header() const {
+    return parts_message_reader_.log_file_header();
+  }
+
+  // The time this data is sorted until.
+  monotonic_clock::time_point sorted_until() const { return sorted_until_; }
+
+  // Returns the next sorted message from the log file.  It is safe to call
+  // std::move() on the result to move the data flatbuffer from it.
+  Message *Front();
+  // Pops the front message.  This should only be called after a call to
+  // Front().
+  void PopFront();
+
+  // Returns a debug string representing the contents of this sorter.
+  std::string DebugString() const;
+
+ private:
+  // Log parts reader we are wrapping.
+  PartsMessageReader parts_message_reader_;
+  // Cache of the time we are sorted until.
+  aos::monotonic_clock::time_point sorted_until_ = monotonic_clock::min_time;
+
+  // Set used for efficient sorting of messages.  We can benchmark and evaluate
+  // other data structures if this proves to be the bottleneck.
+  absl::btree_set<Message> messages_;
+};
+
 class TimestampMerger;
 
 // A design requirement is that the relevant data for a channel is not more than
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index dafb452..d1025fa 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -5,6 +5,7 @@
 
 #include "aos/events/logging/logfile_sorting.h"
 #include "aos/events/logging/test_message_generated.h"
+#include "aos/flatbuffer_merge.h"
 #include "aos/flatbuffers.h"
 #include "aos/json_to_flatbuffer.h"
 #include "aos/testing/tmpdir.h"
@@ -31,11 +32,9 @@
   unlink(logfile.c_str());
 
   const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
-      JsonToSizedFlatbuffer<TestMessage>(
-          R"({ "value": 1 })");
+      JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
   const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
-      JsonToSizedFlatbuffer<TestMessage>(
-          R"({ "value": 2 })");
+      JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
 
   {
     DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
@@ -249,6 +248,265 @@
   EXPECT_GE(m2, m1);
 }
 
+aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
+    const aos::FlatbufferDetachedBuffer<Configuration> &config,
+    const std::string_view json) {
+  flatbuffers::FlatBufferBuilder fbb;
+  flatbuffers::Offset<Configuration> config_offset =
+      aos::CopyFlatBuffer(config, &fbb);
+  LogFileHeader::Builder header_builder(fbb);
+  header_builder.add_configuration(config_offset);
+  fbb.Finish(header_builder.Finish());
+  aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
+
+  aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
+      JsonToFlatbuffer<LogFileHeader>(json));
+  CHECK(header_updates.Verify());
+  flatbuffers::FlatBufferBuilder fbb2;
+  fbb2.FinishSizePrefixed(
+      aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
+  return fbb2.Release();
+}
+
+class SortingElementTest : public ::testing::Test {
+ public:
+  SortingElementTest()
+      : config_(JsonToFlatbuffer<Configuration>(
+            R"({
+  "channels": [
+    {
+      "name": "/a",
+      "type": "aos.logger.testing.TestMessage",
+      "source_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi2"
+        },
+        {
+          "name": "pi3"
+        }
+      ]
+    },
+    {
+      "name": "/b",
+      "type": "aos.logger.testing.TestMessage",
+      "source_node": "pi1"
+    },
+    {
+      "name": "/c",
+      "type": "aos.logger.testing.TestMessage",
+      "source_node": "pi1"
+    }
+  ],
+  "nodes": [
+    {
+      "name": "pi1"
+    },
+    {
+      "name": "pi2"
+    },
+    {
+      "name": "pi3"
+    }
+  ]
+}
+)")),
+        config0_(MakeHeader(config_, R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi1"
+  },
+  "logger_node": {
+    "name": "pi1"
+  },
+  "monotonic_start_time": 0,
+  "realtime_start_time": 1000000000000,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
+  "parts_index": 0
+})")),
+        config1_(MakeHeader(config_,
+                            R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi1"
+  },
+  "logger_node": {
+    "name": "pi1"
+  },
+  "monotonic_start_time": 0,
+  "realtime_start_time": 1000000000000,
+  "log_event_uuid": "d4724d35-a6c6-4a30-8a94-d192f4c18260",
+  "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
+  "parts_index": 0
+})")) {
+    unlink(logfile0_.c_str());
+    unlink(logfile1_.c_str());
+    queue_index_.resize(kChannels);
+  }
+
+ protected:
+  static constexpr size_t kChannels = 3u;
+
+  flatbuffers::DetachedBuffer MakeLogMessage(
+      const aos::monotonic_clock::time_point monotonic_now, int channel_index,
+      int value) {
+    flatbuffers::FlatBufferBuilder message_fbb;
+    message_fbb.ForceDefaults(true);
+    TestMessage::Builder test_message_builder(message_fbb);
+    test_message_builder.add_value(value);
+    message_fbb.Finish(test_message_builder.Finish());
+
+    aos::Context context;
+    context.monotonic_event_time = monotonic_now;
+    context.realtime_event_time = aos::realtime_clock::epoch() +
+                                  chrono::seconds(1000) +
+                                  monotonic_now.time_since_epoch();
+    context.queue_index = queue_index_[channel_index];
+    context.size = message_fbb.GetSize();
+    context.data = message_fbb.GetBufferPointer();
+
+    ++queue_index_[channel_index];
+
+    flatbuffers::FlatBufferBuilder fbb;
+    fbb.FinishSizePrefixed(
+        PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
+
+    return fbb.Release();
+  }
+
+  flatbuffers::DetachedBuffer MakeTimestampMessage(
+      const aos::monotonic_clock::time_point sender_monotonic_now,
+      int channel_index, chrono::nanoseconds receiver_monotonic_offset) {
+    aos::Context context;
+    context.monotonic_remote_time = sender_monotonic_now;
+    context.realtime_remote_time = aos::realtime_clock::epoch() +
+                                   chrono::seconds(1000) +
+                                   sender_monotonic_now.time_since_epoch();
+    context.remote_queue_index = queue_index_[channel_index] - 1;
+    context.monotonic_event_time =
+        sender_monotonic_now + receiver_monotonic_offset;
+    context.realtime_event_time =
+        aos::realtime_clock::epoch() + chrono::seconds(1000) +
+        context.monotonic_event_time.time_since_epoch();
+    context.queue_index = queue_index_[channel_index] - 1 + 100;
+    context.size = 0;
+    context.data = nullptr;
+
+    flatbuffers::FlatBufferBuilder fbb;
+    fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index,
+                                       LogType::kLogDeliveryTimeOnly));
+    LOG(INFO) << aos::FlatbufferToJson(
+        aos::SizePrefixedFlatbufferSpan<MessageHeader>(
+            absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
+
+    return fbb.Release();
+  }
+
+  const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
+  const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
+
+  const aos::FlatbufferDetachedBuffer<Configuration> config_;
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
+
+  std::vector<uint32_t> queue_index_;
+};
+
+using LogPartsSorterTest = SortingElementTest;
+using LogPartsSorterDeathTest = LogPartsSorterTest;
+
+// Tests that we can pull messages out of a log sorted in order.
+TEST_F(LogPartsSorterTest, Pull) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
+    writer.QueueSpan(config0_.span());
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_});
+
+  LogPartsSorter parts_sorter(parts[0].parts[0]);
+
+  // Confirm that, before any message is popped, we don't claim to be sorted
+  // Peeking shouldn't change the sorted until time.
+  EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
+
+  std::deque<Message> output;
+
+  ASSERT_TRUE(parts_sorter.Front() != nullptr);
+  output.emplace_back(std::move(*parts_sorter.Front()));
+  parts_sorter.PopFront();
+  EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
+
+  ASSERT_TRUE(parts_sorter.Front() != nullptr);
+  output.emplace_back(std::move(*parts_sorter.Front()));
+  parts_sorter.PopFront();
+  EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
+
+  ASSERT_TRUE(parts_sorter.Front() != nullptr);
+  output.emplace_back(std::move(*parts_sorter.Front()));
+  parts_sorter.PopFront();
+  EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
+
+  ASSERT_TRUE(parts_sorter.Front() != nullptr);
+  output.emplace_back(std::move(*parts_sorter.Front()));
+  parts_sorter.PopFront();
+  EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
+
+  ASSERT_TRUE(parts_sorter.Front() == nullptr);
+
+  EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(1000));
+  EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(1000));
+  EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(1901));
+  EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(2000));
+}
+
+// Tests that messages too far out of order trigger death.
+TEST_F(LogPartsSorterDeathTest, Pull) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
+    writer.QueueSpan(config0_.span());
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
+    // The following message is too far out of order and will trigger the CHECK.
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_});
+
+  LogPartsSorter parts_sorter(parts[0].parts[0]);
+
+  // Confirm that, before any message is popped, we don't claim to be sorted
+  // Peeking shouldn't change the sorted until time.
+  EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
+  std::deque<Message> output;
+
+  ASSERT_TRUE(parts_sorter.Front() != nullptr);
+  parts_sorter.PopFront();
+  ASSERT_TRUE(parts_sorter.Front() != nullptr);
+  ASSERT_TRUE(parts_sorter.Front() != nullptr);
+  parts_sorter.PopFront();
+
+  EXPECT_DEATH({ parts_sorter.Front(); }, "Max out of order exceeded.");
+}
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/flatbuffer_merge.h b/aos/flatbuffer_merge.h
index d98a9eb..0c09b22 100644
--- a/aos/flatbuffer_merge.h
+++ b/aos/flatbuffer_merge.h
@@ -50,6 +50,15 @@
       MergeFlatBuffers<T>(&fb1.message(), &fb2.message()));
 }
 
+template <class T>
+inline flatbuffers::Offset<T> MergeFlatBuffers(
+    const aos::Flatbuffer<T> &fb1, const aos::Flatbuffer<T> &fb2,
+    flatbuffers::FlatBufferBuilder *fbb) {
+  return MergeFlatBuffers<T>(
+      reinterpret_cast<const flatbuffers::Table *>(&fb1.message()),
+      reinterpret_cast<const flatbuffers::Table *>(&fb2.message()), fbb);
+}
+
 // Copies a flatbuffer by walking the tree and copying all the pieces.  This
 // converts DAGs to trees.
 template <class T>
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index 81a8cee..adb9769 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -477,6 +477,37 @@
   ResizeableBuffer data_;
 };
 
+// Non-owning Span backed flatbuffer.
+template <typename T>
+class SizePrefixedFlatbufferSpan : public SizePrefixedFlatbuffer<T> {
+ public:
+  // Builds a flatbuffer pointing to the contents of a span.
+  SizePrefixedFlatbufferSpan(const absl::Span<const uint8_t> data)
+      : data_(data) {}
+  // Builds a Flatbuffer pointing to the contents of another flatbuffer.
+  SizePrefixedFlatbufferSpan(const SizePrefixedFlatbuffer<T> &other) {
+    data_ = other.span();
+  }
+
+  // Points to the data in the other flatbuffer.
+  SizePrefixedFlatbufferSpan &operator=(
+      const SizePrefixedFlatbuffer<T> &other) {
+    data_ = other.span();
+    return *this;
+  }
+
+  ~SizePrefixedFlatbufferSpan() override {}
+
+  absl::Span<uint8_t> span() override {
+    LOG(FATAL) << "Unimplemented";
+    return absl::Span<uint8_t>(nullptr, 0);
+  }
+  absl::Span<const uint8_t> span() const override { return data_; }
+
+ private:
+  absl::Span<const uint8_t> data_;
+};
+
 inline flatbuffers::DetachedBuffer CopySpanAsDetachedBuffer(
     absl::Span<const uint8_t> span) {
   // Copy the data from the span.