Add NodeMerger for merging sorted logs
Following
https://docs.google.com/document/d/1RZ6ZlADRUHmwiFOOmXA87FHPLFuN-7mS7tbFCwguZDE/edit#
further, the next step is to sort all the log files together for a
single node, deduplicating here. This implements and tests that.
Change-Id: I506ae65b350d83e41ba011521c0860627ac662df
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index d1025fa..94ae25b 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -417,6 +417,7 @@
using LogPartsSorterTest = SortingElementTest;
using LogPartsSorterDeathTest = LogPartsSorterTest;
+using NodeMergerTest = SortingElementTest;
// Tests that we can pull messages out of a log sorted in order.
TEST_F(LogPartsSorterTest, Pull) {
@@ -507,6 +508,93 @@
EXPECT_DEATH({ parts_sorter.Front(); }, "Max out of order exceeded.");
}
+// Tests that we can merge data from 2 separate files, including duplicate data.
+TEST_F(NodeMergerTest, TwoFileMerger) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config1_.span());
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer1.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer1.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
+
+ // Make a duplicate!
+ SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
+ MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+ writer0.QueueSpan(msg.span());
+ writer1.QueueSpan(msg.span());
+
+ writer1.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ ASSERT_EQ(parts.size(), 2u);
+
+ std::vector<std::unique_ptr<LogPartsSorter>> parts_sorters;
+ parts_sorters.emplace_back(
+ std::make_unique<LogPartsSorter>(parts[0].parts[0]));
+ parts_sorters.emplace_back(
+ std::make_unique<LogPartsSorter>(parts[1].parts[0]));
+
+ NodeMerger merger(std::move(parts_sorters));
+
+ EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
+
+ std::deque<Message> output;
+
+ EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
+ ASSERT_TRUE(merger.Front() != nullptr);
+ EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
+
+ output.emplace_back(std::move(*merger.Front()));
+ merger.PopFront();
+ EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
+
+ ASSERT_TRUE(merger.Front() != nullptr);
+ output.emplace_back(std::move(*merger.Front()));
+ merger.PopFront();
+ EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
+
+ ASSERT_TRUE(merger.Front() != nullptr);
+ output.emplace_back(std::move(*merger.Front()));
+ merger.PopFront();
+ EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
+
+ ASSERT_TRUE(merger.Front() != nullptr);
+ output.emplace_back(std::move(*merger.Front()));
+ merger.PopFront();
+ EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
+
+ ASSERT_TRUE(merger.Front() != nullptr);
+ output.emplace_back(std::move(*merger.Front()));
+ merger.PopFront();
+ EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(merger.Front() != nullptr);
+ output.emplace_back(std::move(*merger.Front()));
+ merger.PopFront();
+ EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(merger.Front() == nullptr);
+
+ EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(1000));
+ EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(1001));
+ EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(1002));
+ EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(2000));
+ EXPECT_EQ(output[4].timestamp, e + chrono::milliseconds(3000));
+ EXPECT_EQ(output[5].timestamp, e + chrono::milliseconds(3002));
+}
+
} // namespace testing
} // namespace logger
} // namespace aos