Add BootMerger to concatenate messages across boots
This wraps multiple NodeMergers in order to take a sorted list of
messages per boot and concatenate them across multiple ordered reboots.
This gives us a nice interface to move TimestampMapper over to in order to
handle reboots.
Change-Id: I9e6e565f39bf7a4a23e340d28eea686f1d53631f
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index dc491c1..d5cb41b 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -224,10 +224,12 @@
Message m1{.channel_index = 0,
.queue_index = 0,
.timestamp = e + chrono::milliseconds(1),
+ .boot_count = 0,
.data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
Message m2{.channel_index = 0,
.queue_index = 0,
.timestamp = e + chrono::milliseconds(2),
+ .boot_count = 0,
.data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
EXPECT_LT(m1, m2);
@@ -1667,13 +1669,11 @@
}
}
-// This tests that we can properly sort a multi-node log file which has the old
-// (and buggy) timestamps in the header, and the non-resetting parts_index.
-// These make it so we can just bairly figure out what happened first and what
-// happened second, but not in a way that is robust to multiple nodes rebooting.
-TEST_F(SortingElementTest, OldReboot) {
- const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0 =
- MakeHeader(config_, R"({
+class BootMergerTest : public SortingElementTest {
+ public:
+ BootMergerTest()
+ : SortingElementTest(),
+ boot0_(MakeHeader(config_, R"({
/* 100ms */
"max_out_of_order_duration": 100000000,
"node": {
@@ -1692,9 +1692,8 @@
"logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
"logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
"source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464"
-})");
- const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1 =
- MakeHeader(config_, R"({
+})")),
+ boot1_(MakeHeader(config_, R"({
/* 100ms */
"max_out_of_order_duration": 100000000,
"node": {
@@ -1713,15 +1712,25 @@
"logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
"logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
"source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2"
-})");
+})")) {}
+ protected:
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0_;
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1_;
+};
+
+// This tests that we can properly sort a multi-node log file which has the old
+// (and buggy) timestamps in the header, and the non-resetting parts_index.
+// These make it so we can just barely figure out what happened first and what
+// happened second, but not in a way that is robust to multiple nodes rebooting.
+TEST_F(BootMergerTest, OldReboot) {
{
DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
- writer.QueueSpan(boot0.span());
+ writer.QueueSpan(boot0_.span());
}
{
DetachedBufferWriter writer(logfile1_, std::make_unique<DummyEncoder>());
- writer.QueueSpan(boot1.span());
+ writer.QueueSpan(boot1_.span());
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
@@ -1731,11 +1740,58 @@
EXPECT_EQ(parts[0].parts[0].boot_count, 0);
EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
- boot0.message().source_node_boot_uuid()->string_view());
+ boot0_.message().source_node_boot_uuid()->string_view());
EXPECT_EQ(parts[0].parts[1].boot_count, 1);
EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
- boot1.message().source_node_boot_uuid()->string_view());
+ boot1_.message().source_node_boot_uuid()->string_view());
+}
+
+// This tests that we can produce messages ordered across a reboot: boot 0's before boot 1's.
+TEST_F(BootMergerTest, SortAcrossReboot) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {  // logfile0_: boot0_ header, then messages at 1000ms and 2000ms.
+ DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
+ writer.QueueSpan(boot0_.span());
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 1, 0x105));
+ }
+ {  // logfile1_: boot1_ header, then messages at 100ms and 200ms (smaller than logfile0_'s).
+ DetachedBufferWriter writer(logfile1_, std::make_unique<DummyEncoder>());
+ writer.QueueSpan(boot1_.span());
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(100), 0, 0x006));
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(200), 1, 0x106));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ ASSERT_EQ(parts.size(), 1u);
+ ASSERT_EQ(parts[0].parts.size(), 2u);
+
+ BootMerger merger(FilterPartsForNode(parts, "pi2"));
+
+ EXPECT_EQ(merger.node(), 1u);
+
+ std::vector<Message> output;
+ for (int i = 0; i < 4; ++i) {  // 4 == total messages written to the two log files above.
+ ASSERT_TRUE(merger.Front() != nullptr);
+ output.emplace_back(std::move(*merger.Front()));
+ merger.PopFront();
+ }
+
+ ASSERT_TRUE(merger.Front() == nullptr);  // All four messages have been consumed.
+
+ EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(1000));
+ EXPECT_EQ(output[0].boot_count, 0u);
+ EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(2000));
+ EXPECT_EQ(output[1].boot_count, 0u);
+ EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(100));
+ EXPECT_EQ(output[2].boot_count, 1u);
+ EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(200));
+ EXPECT_EQ(output[3].boot_count, 1u);
}
} // namespace testing