Add NodeMerger for merging sorted logs

Continuing with the plan in
https://docs.google.com/document/d/1RZ6ZlADRUHmwiFOOmXA87FHPLFuN-7mS7tbFCwguZDE/edit#
the next step is to merge all the sorted log files for a single node
into one sorted stream, deduplicating messages which appear in more than
one file.  This implements and tests that.

Change-Id: I506ae65b350d83e41ba011521c0860627ac662df
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 83d798d..4474c7e 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -483,6 +483,12 @@
 }
 
 bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
+// Two messages are equal when they share a timestamp, channel index and queue
+// index.  NodeMerger uses this to recognize a message logged more than once.
+bool Message::operator==(const Message &m2) const {
+  return channel_index == m2.channel_index &&
+         queue_index == m2.queue_index && timestamp == m2.timestamp;
+}
 
 std::ostream &operator<<(std::ostream &os, const Message &m) {
   os << "{.channel_index=" << m.channel_index
@@ -555,6 +559,49 @@
   return ss.str();
 }
 
+// Takes ownership of every parts sorter which will be merged.
+NodeMerger::NodeMerger(std::vector<std::unique_ptr<LogPartsSorter>> parts)
+    : parts_sorters_(std::move(parts)) {}
+
+Message *NodeMerger::Front() {
+  // A non-null current_ means the oldest message was already located and has
+  // not been popped yet, so hand back the same sorter's front.
+  if (current_ != nullptr) return current_->Front();
+
+  // Scan every parts sorter.  The oldest front message wins, and any front
+  // equal to the best candidate so far is treated as a duplicate, so it is
+  // dropped from its sorter immediately.
+  Message *best = nullptr;
+  monotonic_clock::time_point min_sorted_until = monotonic_clock::max_time;
+  for (const std::unique_ptr<LogPartsSorter> &sorter : parts_sorters_) {
+    Message *const head = sorter->Front();
+    if (head != nullptr) {
+      if (best == nullptr || *head < *best) {
+        best = head;
+        current_ = sorter.get();
+      } else if (*head == *best) {
+        // Either copy would do; discarding this one is simplest.
+        sorter->PopFront();
+      }
+    }
+    // Query sorted_until() after any PopFront() above; popping can change it.
+    min_sorted_until = std::min(min_sorted_until, sorter->sorted_until());
+  }
+  sorted_until_ = min_sorted_until;
+
+  // nullptr means no parts sorter currently has a message to offer.
+  return best;
+}
+
+void NodeMerger::PopFront() {
+  CHECK(current_ != nullptr) << "Popping before calling Front()";
+  // Forget the cached sorter before popping from it so the next Front()
+  // rescans every parts sorter for the new oldest message.
+  LogPartsSorter *const owner = current_;
+  current_ = nullptr;
+  owner->PopFront();
+}
+
 SplitMessageReader::SplitMessageReader(
     const std::vector<std::string> &filenames)
     : filenames_(filenames),
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 13c83cf..d0afdc9 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -327,6 +327,7 @@
 
   bool operator<(const Message &m2) const;
   bool operator>=(const Message &m2) const;
+  bool operator==(const Message &m2) const;
 };
 
 std::ostream &operator<<(std::ostream &os, const Message &m);
@@ -369,6 +370,43 @@
   absl::btree_set<Message> messages_;
 };
 
+// Class to run merge sort on the messages from multiple LogPartsSorter
+// instances.  A message found in more than one parts sorter (as judged by
+// Message::operator==) is only returned once.
+class NodeMerger {
+ public:
+  explicit NodeMerger(std::vector<std::unique_ptr<LogPartsSorter>> parts);
+
+  // The log file header for one of the log files.  CHECK-fails if there are
+  // no parts sorters.
+  const LogFileHeader *log_file_header() const {
+    CHECK(!parts_sorters_.empty());
+    return parts_sorters_[0]->log_file_header();
+  }
+
+  // The time this data is sorted until.  This is min_time until the first
+  // call to Front(), and is recomputed whenever Front() searches for a new
+  // message.
+  monotonic_clock::time_point sorted_until() const { return sorted_until_; }
+
+  // Returns the next sorted message from the set of log files, or nullptr if
+  // no parts sorter has a message left.  It is safe to call std::move() on
+  // the result to move the data flatbuffer from it.
+  Message *Front();
+  // Pops the front message.  This should only be called after a call to
+  // Front() which returned a non-null message.
+  void PopFront();
+
+ private:
+  // Unsorted list of all parts sorters.
+  std::vector<std::unique_ptr<LogPartsSorter>> parts_sorters_;
+  // Pointer to the parts sorter holding the current Front message if one
+  // exists, or nullptr if a new one needs to be found.
+  LogPartsSorter *current_ = nullptr;
+  // Cached sorted_until value.
+  monotonic_clock::time_point sorted_until_ = monotonic_clock::min_time;
+};
+
 class TimestampMerger;
 
 // A design requirement is that the relevant data for a channel is not more than
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index d1025fa..94ae25b 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -417,6 +417,7 @@
 
 using LogPartsSorterTest = SortingElementTest;
 using LogPartsSorterDeathTest = LogPartsSorterTest;
+using NodeMergerTest = SortingElementTest;
 
 // Tests that we can pull messages out of a log sorted in order.
 TEST_F(LogPartsSorterTest, Pull) {
@@ -507,6 +508,95 @@
   EXPECT_DEATH({ parts_sorter.Front(); }, "Max out of order exceeded.");
 }
 
+// Tests that we can merge data from 2 separate files, including duplicate data.
+TEST_F(NodeMergerTest, TwoFileMerger) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+    writer0.QueueSpan(config0_.span());
+    DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+    writer1.QueueSpan(config1_.span());
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+    writer1.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+    writer1.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
+
+    // Make a duplicate!  The exact same message is written to both files, and
+    // the merger should only return it once.
+    SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
+        MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+    writer0.QueueSpan(msg.span());
+    writer1.QueueSpan(msg.span());
+
+    writer1.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+  ASSERT_EQ(parts.size(), 2u);
+
+  std::vector<std::unique_ptr<LogPartsSorter>> parts_sorters;
+  parts_sorters.emplace_back(
+      std::make_unique<LogPartsSorter>(parts[0].parts[0]));
+  parts_sorters.emplace_back(
+      std::make_unique<LogPartsSorter>(parts[1].parts[0]));
+
+  NodeMerger merger(std::move(parts_sorters));
+
+  std::deque<Message> output;
+
+  // Nothing has been read yet, so sorted_until() starts at min_time.
+  EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
+  ASSERT_TRUE(merger.Front() != nullptr);
+  EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
+
+  output.emplace_back(std::move(*merger.Front()));
+  merger.PopFront();
+  EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
+
+  ASSERT_TRUE(merger.Front() != nullptr);
+  output.emplace_back(std::move(*merger.Front()));
+  merger.PopFront();
+  EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
+
+  ASSERT_TRUE(merger.Front() != nullptr);
+  output.emplace_back(std::move(*merger.Front()));
+  merger.PopFront();
+  EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
+
+  ASSERT_TRUE(merger.Front() != nullptr);
+  output.emplace_back(std::move(*merger.Front()));
+  merger.PopFront();
+  EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
+
+  ASSERT_TRUE(merger.Front() != nullptr);
+  output.emplace_back(std::move(*merger.Front()));
+  merger.PopFront();
+  EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
+
+  ASSERT_TRUE(merger.Front() != nullptr);
+  output.emplace_back(std::move(*merger.Front()));
+  merger.PopFront();
+  EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
+
+  // Seven messages were written in total, but the second copy of the
+  // duplicate is dropped, so the merger runs dry after six.
+  ASSERT_TRUE(merger.Front() == nullptr);
+
+  EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(1000));
+  EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(1001));
+  EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(1002));
+  EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(2000));
+  EXPECT_EQ(output[4].timestamp, e + chrono::milliseconds(3000));
+  EXPECT_EQ(output[5].timestamp, e + chrono::milliseconds(3002));
+}
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos