Add BootMerger to concatenate messages across boots

This wraps multiple NodeMergers, taking the sorted list of messages
from each boot and concatenating them across the ordered reboots.

This gives us a nice interface to move TimestampMapper over to so it
can handle reboots.
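
A sketch of the intended consumption pattern (SortParts and
FilterPartsForNode are the existing helpers; filenames and
HandleMessage are placeholders):

  BootMerger merger(FilterPartsForNode(SortParts(filenames), "pi2"));
  while (Message *m = merger.Front()) {
    // Messages come out sorted within each boot, and boots are
    // concatenated in order.
    HandleMessage(std::move(*m));
    merger.PopFront();
  }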

Change-Id: I9e6e565f39bf7a4a23e340d28eea686f1d53631f
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index e01b4c6..f4d4501 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -527,6 +527,8 @@
 }
 
 bool Message::operator<(const Message &m2) const {
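+  // Messages are only ever compared within a single boot (each NodeMerger
+  // sorts one boot at a time), so mixing boot_counts here is a bug upstream.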
+  CHECK_EQ(this->boot_count, m2.boot_count);
+
   if (this->timestamp < m2.timestamp) {
     return true;
   } else if (this->timestamp > m2.timestamp) {
@@ -544,13 +546,16 @@
 
 bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
 bool Message::operator==(const Message &m2) const {
+  CHECK_EQ(this->boot_count, m2.boot_count);
+
   return timestamp == m2.timestamp && channel_index == m2.channel_index &&
          queue_index == m2.queue_index;
 }
 
 std::ostream &operator<<(std::ostream &os, const Message &m) {
   os << "{.channel_index=" << m.channel_index
-     << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
+     << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp
+     << ", .boot_count=" << m.boot_count;
   if (m.data.Verify()) {
     os << ", .data="
        << aos::FlatbufferToJson(m.data,
@@ -608,12 +613,13 @@
         break;
       }
 
-      messages_.insert(
-          {.channel_index = m.value().message().channel_index(),
-           .queue_index = m.value().message().queue_index(),
-           .timestamp = monotonic_clock::time_point(std::chrono::nanoseconds(
-               m.value().message().monotonic_sent_time())),
-           .data = std::move(m.value())});
+      messages_.insert(Message{
+          .channel_index = m.value().message().channel_index(),
+          .queue_index = m.value().message().queue_index(),
+          .timestamp = monotonic_clock::time_point(std::chrono::nanoseconds(
+              m.value().message().monotonic_sent_time())),
+          .boot_count = parts().boot_count,
+          .data = std::move(m.value())});
 
       // Now, update sorted_until_ to match the new message.
       if (parts_message_reader_.newest_timestamp() >
@@ -756,6 +762,49 @@
   current_ = nullptr;
 }
 
+BootMerger::BootMerger(std::vector<LogParts> files) {
+  std::vector<std::vector<LogParts>> boots;
+
+  // Now, we need to split things out by boot.
+  for (size_t i = 0; i < files.size(); ++i) {
+    LOG(INFO) << "Trying file " << i;
+    const size_t boot_count = files[i].boot_count;
+    LOG(INFO) << "Boot count " << boot_count;
+    if (boot_count + 1 > boots.size()) {
+      boots.resize(boot_count + 1);
+    }
+    boots[boot_count].emplace_back(std::move(files[i]));
+  }
+
+  node_mergers_.reserve(boots.size());
+  for (size_t i = 0; i < boots.size(); ++i) {
+    LOG(INFO) << "Boot " << i;
+    for (auto &p : boots[i]) {
+      LOG(INFO) << "Part " << p;
+    }
+    node_mergers_.emplace_back(
+        std::make_unique<NodeMerger>(std::move(boots[i])));
+  }
+}
+
+Message *BootMerger::Front() {
+  Message *result = node_mergers_[index_]->Front();
+
+  if (result != nullptr) {
+    return result;
+  }
+
+  if (index_ + 1u == node_mergers_.size()) {
+    // At the end of the last node merger, just return.
+    return nullptr;
+  } else {
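+    // This boot's messages are exhausted; advance to the next boot's merger
+    // and retry.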
+    ++index_;
+    return Front();
+  }
+}
+
+void BootMerger::PopFront() { node_mergers_[index_]->PopFront(); }
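+
+// A minimal definition for the Parts() declaration in the header (a sketch,
+// assuming NodeMerger::Parts() keeps its existing signature): concatenate
+// each boot's parts in boot order.
+std::vector<const LogParts *> BootMerger::Parts() const {
+  std::vector<const LogParts *> results;
+  for (const std::unique_ptr<NodeMerger> &node_merger : node_mergers_) {
+    const std::vector<const LogParts *> node_parts = node_merger->Parts();
+    results.insert(results.end(), node_parts.begin(), node_parts.end());
+  }
+  return results;
+}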
+
 TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
     : node_merger_(std::move(parts)),
       timestamp_callback_([](TimestampedMessage *) {}) {
@@ -989,6 +1038,7 @@
         .channel_index = message.channel_index,
         .queue_index = remote_queue_index,
         .timestamp = monotonic_remote_time,
+        .boot_count = 0,
         .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
   }
 
@@ -1003,6 +1053,7 @@
         .channel_index = message.channel_index,
         .queue_index = remote_queue_index,
         .timestamp = monotonic_remote_time,
+        .boot_count = 0,
         .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
   }
 
@@ -1012,6 +1063,7 @@
         .channel_index = message.channel_index,
         .queue_index = remote_queue_index,
         .timestamp = monotonic_remote_time,
+        .boot_count = 0,
         .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
   }
 
@@ -1046,6 +1098,7 @@
           .channel_index = message.channel_index,
           .queue_index = remote_queue_index,
           .timestamp = monotonic_remote_time,
+          .boot_count = 0,
           .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
     }
 
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index b2a7f5d..2632302 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -350,6 +350,9 @@
   uint32_t queue_index = 0xffffffff;
   // The local timestamp on the monotonic clock.
   monotonic_clock::time_point timestamp = monotonic_clock::min_time;
+  // The boot count this message belongs to, assigned by SortParts.
+  size_t boot_count = 0;
+
   // The data (either a timestamp header, or a data header).
   SizePrefixedFlatbufferVector<MessageHeader> data;
 
@@ -431,6 +434,13 @@
  public:
   NodeMerger(std::vector<LogParts> parts);
 
+  // Copying and moving will mess up the internal raw pointers.  Just don't do
+  // it.
+  NodeMerger(NodeMerger const &) = delete;
+  NodeMerger(NodeMerger &&) = delete;
+  void operator=(NodeMerger const &) = delete;
+  void operator=(NodeMerger &&) = delete;
+
   // Node index in the configuration of this node.
   int node() const { return node_; }
 
@@ -478,6 +488,56 @@
   monotonic_clock::time_point monotonic_start_time_ = monotonic_clock::max_time;
 };
 
+// Class to concatenate multiple boots' worth of logs into a single per-node
+// stream.
+class BootMerger {
+ public:
+  BootMerger(std::vector<LogParts> files);
+
+  // Copying and moving will mess up the internal raw pointers.  Just don't do
+  // it.
+  BootMerger(BootMerger const &) = delete;
+  BootMerger(BootMerger &&) = delete;
+  void operator=(BootMerger const &) = delete;
+  void operator=(BootMerger &&) = delete;
+
+  // Node index in the configuration of this node.
+  int node() const { return node_mergers_[0]->node(); }
+
+  // List of parts being sorted together.
+  std::vector<const LogParts *> Parts() const;
+
+  const Configuration *configuration() const {
+    return node_mergers_[0]->configuration();
+  }
+
+  monotonic_clock::time_point monotonic_start_time() const {
+    return node_mergers_[index_]->monotonic_start_time();
+  }
+  realtime_clock::time_point realtime_start_time() const {
+    return node_mergers_[index_]->realtime_start_time();
+  }
+
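+  // Whether we have made any progress: either the current boot's merger has
+  // sorted something, or we have already advanced past at least one boot.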
+  bool started() const {
+    return node_mergers_[index_]->sorted_until() != monotonic_clock::min_time ||
+           index_ != 0;
+  }
+
+  // Returns the next sorted message from the set of log files.  It is safe to
+  // call std::move() on the result to move the data flatbuffer from it.
+  Message *Front();
+  // Pops the front message.  This should only be called after a call to
+  // Front().
+  void PopFront();
+
+ private:
+  size_t index_ = 0;
+
+  // TODO(austin): Sanjay points out this is pretty inefficient.  Don't keep so
+  // many things open.
+  std::vector<std::unique_ptr<NodeMerger>> node_mergers_;
+};
+
 // Class to match timestamps with the corresponding data from other nodes.
 //
 // This class also buffers data for the node it represents, and supports
@@ -498,8 +558,6 @@
   // timestamps out of this queue.  This lets us bootstrap time estimation
   // without exploding memory usage worst case.
 
-  std::vector<const LogParts *> Parts() const { return node_merger_.Parts(); }
-
   const Configuration *configuration() const { return configuration_.get(); }
 
   // Returns which node this is sorting for.
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index dc491c1..d5cb41b 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -224,10 +224,12 @@
   Message m1{.channel_index = 0,
              .queue_index = 0,
              .timestamp = e + chrono::milliseconds(1),
+             .boot_count = 0,
              .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
   Message m2{.channel_index = 0,
              .queue_index = 0,
              .timestamp = e + chrono::milliseconds(2),
+             .boot_count = 0,
              .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
 
   EXPECT_LT(m1, m2);
@@ -1667,13 +1669,11 @@
   }
 }
 
-// This tests that we can properly sort a multi-node log file which has the old
-// (and buggy) timestamps in the header, and the non-resetting parts_index.
-// These make it so we can just bairly figure out what happened first and what
-// happened second, but not in a way that is robust to multiple nodes rebooting.
-TEST_F(SortingElementTest, OldReboot) {
-  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0 =
-      MakeHeader(config_, R"({
+class BootMergerTest : public SortingElementTest {
+ public:
+  BootMergerTest()
+      : SortingElementTest(),
+        boot0_(MakeHeader(config_, R"({
   /* 100ms */
   "max_out_of_order_duration": 100000000,
   "node": {
@@ -1692,9 +1692,8 @@
   "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
   "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
   "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464"
-})");
-  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1 =
-      MakeHeader(config_, R"({
+})")),
+        boot1_(MakeHeader(config_, R"({
   /* 100ms */
   "max_out_of_order_duration": 100000000,
   "node": {
@@ -1713,15 +1712,25 @@
   "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
   "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
   "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2"
-})");
+})")) {}
 
+ protected:
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0_;
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1_;
+};
+
+// This tests that we can properly sort a multi-node log file which has the old
+// (and buggy) timestamps in the header, and the non-resetting parts_index.
+// These make it so we can just barely figure out what happened first and what
+// happened second, but not in a way that is robust to multiple nodes rebooting.
+TEST_F(BootMergerTest, OldReboot) {
   {
     DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
-    writer.QueueSpan(boot0.span());
+    writer.QueueSpan(boot0_.span());
   }
   {
     DetachedBufferWriter writer(logfile1_, std::make_unique<DummyEncoder>());
-    writer.QueueSpan(boot1.span());
+    writer.QueueSpan(boot1_.span());
   }
 
   const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
@@ -1731,11 +1740,58 @@
 
   EXPECT_EQ(parts[0].parts[0].boot_count, 0);
   EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
-            boot0.message().source_node_boot_uuid()->string_view());
+            boot0_.message().source_node_boot_uuid()->string_view());
 
   EXPECT_EQ(parts[0].parts[1].boot_count, 1);
   EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
-            boot1.message().source_node_boot_uuid()->string_view());
+            boot1_.message().source_node_boot_uuid()->string_view());
+}
+
+// This tests that we can produce messages ordered across a reboot.
+TEST_F(BootMergerTest, SortAcrossReboot) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
+    writer.QueueSpan(boot0_.span());
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2000), 1, 0x105));
+  }
+  {
+    DetachedBufferWriter writer(logfile1_, std::make_unique<DummyEncoder>());
+    writer.QueueSpan(boot1_.span());
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(100), 0, 0x006));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(200), 1, 0x106));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+  ASSERT_EQ(parts.size(), 1u);
+  ASSERT_EQ(parts[0].parts.size(), 2u);
+
+  BootMerger merger(FilterPartsForNode(parts, "pi2"));
+
+  EXPECT_EQ(merger.node(), 1u);
+
+  std::vector<Message> output;
+  for (int i = 0; i < 4; ++i) {
+    ASSERT_TRUE(merger.Front() != nullptr);
+    output.emplace_back(std::move(*merger.Front()));
+    merger.PopFront();
+  }
+
+  ASSERT_TRUE(merger.Front() == nullptr);
+
+  EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(1000));
+  EXPECT_EQ(output[0].boot_count, 0u);
+  EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(2000));
+  EXPECT_EQ(output[1].boot_count, 0u);
+  EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(100));
+  EXPECT_EQ(output[2].boot_count, 1u);
+  EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(200));
+  EXPECT_EQ(output[3].boot_count, 1u);
 }
 
 }  // namespace testing