Add LogPartsSorter to sort messages from a log file

Based on https://docs.google.com/document/d/1RZ6ZlADRUHmwiFOOmXA87FHPLFuN-7mS7tbFCwguZDE/edit#
we need to start by sorting all the messages from a set of parts.  Add
a class which does this, and test it.  (The testing infrastructure looks
like a bit of overkill, but it will be reused a lot by the following tests.)

Change-Id: Ifa1ba880ddf7cf923f24826e504b902a4787ad03
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index dafb452..d1025fa 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -5,6 +5,7 @@
 
 #include "aos/events/logging/logfile_sorting.h"
 #include "aos/events/logging/test_message_generated.h"
+#include "aos/flatbuffer_merge.h"
 #include "aos/flatbuffers.h"
 #include "aos/json_to_flatbuffer.h"
 #include "aos/testing/tmpdir.h"
@@ -31,11 +32,9 @@
   unlink(logfile.c_str());
 
   const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
-      JsonToSizedFlatbuffer<TestMessage>(
-          R"({ "value": 1 })");
+      JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
   const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
-      JsonToSizedFlatbuffer<TestMessage>(
-          R"({ "value": 2 })");
+      JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
 
   {
     DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
@@ -249,6 +248,265 @@
   EXPECT_GE(m2, m1);
 }
 
+aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
+    const aos::FlatbufferDetachedBuffer<Configuration> &config,
+    const std::string_view json) {  // |json|: LogFileHeader fields as JSON.
+  flatbuffers::FlatBufferBuilder fbb;  // Header containing only |config|.
+  flatbuffers::Offset<Configuration> config_offset =
+      aos::CopyFlatBuffer(config, &fbb);
+  LogFileHeader::Builder header_builder(fbb);
+  header_builder.add_configuration(config_offset);
+  fbb.Finish(header_builder.Finish());
+  aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
+  // Parse |json|, then merge it with the config-only header built above.
+  aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
+      JsonToFlatbuffer<LogFileHeader>(json));
+  CHECK(header_updates.Verify());  // Die if verification fails.
+  flatbuffers::FlatBufferBuilder fbb2;
+  fbb2.FinishSizePrefixed(  // Size-prefixed to match the return type.
+      aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
+  return fbb2.Release();
+}
+
+class SortingElementTest : public ::testing::Test {  // Sorting test fixture.
+ public:
+  SortingElementTest()
+      : config_(JsonToFlatbuffer<Configuration>(
+            R"({
+  "channels": [
+    {
+      "name": "/a",
+      "type": "aos.logger.testing.TestMessage",
+      "source_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi2"
+        },
+        {
+          "name": "pi3"
+        }
+      ]
+    },
+    {
+      "name": "/b",
+      "type": "aos.logger.testing.TestMessage",
+      "source_node": "pi1"
+    },
+    {
+      "name": "/c",
+      "type": "aos.logger.testing.TestMessage",
+      "source_node": "pi1"
+    }
+  ],
+  "nodes": [
+    {
+      "name": "pi1"
+    },
+    {
+      "name": "pi2"
+    },
+    {
+      "name": "pi3"
+    }
+  ]
+}
+)")),
+        config0_(MakeHeader(config_, R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi1"
+  },
+  "logger_node": {
+    "name": "pi1"
+  },
+  "monotonic_start_time": 0,
+  "realtime_start_time": 1000000000000,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
+  "parts_index": 0
+})")),
+        config1_(MakeHeader(config_,
+                            R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi1"
+  },
+  "logger_node": {
+    "name": "pi1"
+  },
+  "monotonic_start_time": 0,
+  "realtime_start_time": 1000000000000,
+  "log_event_uuid": "d4724d35-a6c6-4a30-8a94-d192f4c18260",
+  "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
+  "parts_index": 0
+})")) {
+    unlink(logfile0_.c_str());  // Delete any pre-existing log files.
+    unlink(logfile1_.c_str());
+    queue_index_.resize(kChannels);  // One zeroed counter per channel.
+  }
+
+ protected:
+  static constexpr size_t kChannels = 3u;  // Channels listed in config_.
+  // Packs a TestMessage holding |value| as a kLogMessage on |channel_index|.
+  flatbuffers::DetachedBuffer MakeLogMessage(
+      const aos::monotonic_clock::time_point monotonic_now, int channel_index,
+      int value) {
+    flatbuffers::FlatBufferBuilder message_fbb;
+    message_fbb.ForceDefaults(true);
+    TestMessage::Builder test_message_builder(message_fbb);
+    test_message_builder.add_value(value);
+    message_fbb.Finish(test_message_builder.Finish());
+    // The realtime stamp is the monotonic time since epoch plus 1000 seconds.
+    aos::Context context;
+    context.monotonic_event_time = monotonic_now;
+    context.realtime_event_time = aos::realtime_clock::epoch() +
+                                  chrono::seconds(1000) +
+                                  monotonic_now.time_since_epoch();
+    context.queue_index = queue_index_[channel_index];
+    context.size = message_fbb.GetSize();
+    context.data = message_fbb.GetBufferPointer();
+
+    ++queue_index_[channel_index];  // The next message gets the next index.
+
+    flatbuffers::FlatBufferBuilder fbb;
+    fbb.FinishSizePrefixed(
+        PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
+
+    return fbb.Release();
+  }
+  // Packs a data-less kLogDeliveryTimeOnly entry for the last message made.
+  flatbuffers::DetachedBuffer MakeTimestampMessage(
+      const aos::monotonic_clock::time_point sender_monotonic_now,
+      int channel_index, chrono::nanoseconds receiver_monotonic_offset) {
+    aos::Context context;
+    context.monotonic_remote_time = sender_monotonic_now;
+    context.realtime_remote_time = aos::realtime_clock::epoch() +
+                                   chrono::seconds(1000) +
+                                   sender_monotonic_now.time_since_epoch();
+    context.remote_queue_index = queue_index_[channel_index] - 1;
+    context.monotonic_event_time =
+        sender_monotonic_now + receiver_monotonic_offset;
+    context.realtime_event_time =
+        aos::realtime_clock::epoch() + chrono::seconds(1000) +
+        context.monotonic_event_time.time_since_epoch();
+    context.queue_index = queue_index_[channel_index] - 1 + 100;
+    context.size = 0;
+    context.data = nullptr;
+    // NOTE(review): remote_queue_index wraps if no log message was made yet.
+    flatbuffers::FlatBufferBuilder fbb;
+    fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index,
+                                       LogType::kLogDeliveryTimeOnly));
+    LOG(INFO) << aos::FlatbufferToJson(
+        aos::SizePrefixedFlatbufferSpan<MessageHeader>(
+            absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
+
+    return fbb.Release();
+  }
+  // Log file paths under the test tmpdir; unlinked in the constructor.
+  const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
+  const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
+  // Shared configuration, and two log file headers built on top of it.
+  const aos::FlatbufferDetachedBuffer<Configuration> config_;
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
+  // Next queue index to assign, indexed by channel.
+  std::vector<uint32_t> queue_index_;
+};
+
+using LogPartsSorterTest = SortingElementTest;
+using LogPartsSorterDeathTest = LogPartsSorterTest;  // For EXPECT_DEATH tests.
+
+// Tests that we can pull messages out of a log sorted in order.
+TEST_F(LogPartsSorterTest, Pull) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
+    writer.QueueSpan(config0_.span());
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_});
+
+  LogPartsSorter parts_sorter(parts[0].parts[0]);
+
+  // Before any message is pulled out, nothing is known to be sorted yet, so
+  // sorted_until() is expected to report min_time.
+  EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
+
+  std::deque<Message> output;
+  // NOTE(review): 1900ms is presumably 2000ms - max_out_of_order_duration.
+  ASSERT_TRUE(parts_sorter.Front() != nullptr);
+  output.emplace_back(std::move(*parts_sorter.Front()));
+  parts_sorter.PopFront();
+  EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
+
+  ASSERT_TRUE(parts_sorter.Front() != nullptr);
+  output.emplace_back(std::move(*parts_sorter.Front()));
+  parts_sorter.PopFront();
+  EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
+
+  ASSERT_TRUE(parts_sorter.Front() != nullptr);
+  output.emplace_back(std::move(*parts_sorter.Front()));
+  parts_sorter.PopFront();
+  EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
+
+  ASSERT_TRUE(parts_sorter.Front() != nullptr);
+  output.emplace_back(std::move(*parts_sorter.Front()));
+  parts_sorter.PopFront();
+  EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
+
+  ASSERT_TRUE(parts_sorter.Front() == nullptr);
+  // The messages must come out ordered by timestamp, not in file order.
+  EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(1000));
+  EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(1000));
+  EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(1901));
+  EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(2000));
+}
+
+// Tests that messages too far out of order trigger death.
+TEST_F(LogPartsSorterDeathTest, Pull) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
+    writer.QueueSpan(config0_.span());
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
+    // The following message is too far out of order and will trigger the CHECK.
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_});
+
+  LogPartsSorter parts_sorter(parts[0].parts[0]);
+
+  // Before any message is pulled out, nothing is known to be sorted yet, so
+  // sorted_until() is expected to report min_time.
+  EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
+
+  // Pop the first two messages; these reads are expected to succeed.
+  ASSERT_TRUE(parts_sorter.Front() != nullptr);
+  parts_sorter.PopFront();
+  ASSERT_TRUE(parts_sorter.Front() != nullptr);
+  ASSERT_TRUE(parts_sorter.Front() != nullptr);
+  parts_sorter.PopFront();
+
+  EXPECT_DEATH({ parts_sorter.Front(); }, "Max out of order exceeded.");
+}
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos