Add node_boots parsing from log file headers
Jim suggested that we could store the boot UUID in the header for all
the nodes rather than per message. That seems reasonable and lets us do
other things with it in the future. Use it when available to determine
which boot happens when.
When we only have 1 boot for a node, fall back to setting the boot count
to 0 for sorting. This lets us get started with the new boot counter.
A follow-up commit will start logging with this.
Change-Id: Ia9e0bd2860de82f70f0daa02782d06f1fd4f4218
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 4973d02..e99db94 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -228,11 +228,15 @@
.queue_index = 0,
.timestamp =
BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
+ .monotonic_remote_boot = 0xffffff,
+ .monotonic_timestamp_boot = 0xffffff,
.data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
Message m2{.channel_index = 0,
.queue_index = 0,
.timestamp =
BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
+ .monotonic_remote_boot = 0xffffff,
+ .monotonic_timestamp_boot = 0xffffff,
.data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
EXPECT_LT(m1, m2);
@@ -306,6 +310,16 @@
"name": "/c",
"type": "aos.logger.testing.TestMessage",
"source_node": "pi1"
+ },
+ {
+ "name": "/d",
+ "type": "aos.logger.testing.TestMessage",
+ "source_node": "pi2",
+ "destination_nodes": [
+ {
+ "name": "pi1"
+ }
+ ]
}
],
"nodes": [
@@ -333,6 +347,13 @@
"monotonic_start_time": 1000000,
"realtime_start_time": 1000000000000,
"log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+ "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+ "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+ "boot_uuids": [
+ "1d782c63-b3c7-466e-bea9-a01308b43333",
+ "",
+ ""
+ ],
"parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
"parts_index": 0
})")),
@@ -349,6 +370,13 @@
"monotonic_start_time": 1000000,
"realtime_start_time": 1000000000000,
"log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+ "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+ "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+ "boot_uuids": [
+ "1d782c63-b3c7-466e-bea9-a01308b43333",
+ "",
+ ""
+ ],
"parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
"parts_index": 0
})")),
@@ -364,6 +392,13 @@
},
"monotonic_start_time": 0,
"realtime_start_time": 1000000000000,
+ "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+ "logger_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+ "boot_uuids": [
+ "",
+ "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+ ""
+ ],
"log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
"parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
"parts_index": 0
@@ -381,6 +416,13 @@
"monotonic_start_time": 2000000,
"realtime_start_time": 1000000000,
"log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
+ "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+ "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+ "boot_uuids": [
+ "1d782c63-b3c7-466e-bea9-a01308b43333",
+ "",
+ ""
+ ],
"parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
"parts_index": 0
})")),
@@ -398,11 +440,19 @@
"realtime_start_time": 1000000000,
"log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
"parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
+ "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+ "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+ "boot_uuids": [
+ "1d782c63-b3c7-466e-bea9-a01308b43333",
+ "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+ ""
+ ],
"parts_index": 0
})")) {
unlink(logfile0_.c_str());
unlink(logfile1_.c_str());
unlink(logfile2_.c_str());
+ unlink(logfile3_.c_str());
queue_index_.resize(kChannels);
}
@@ -487,6 +537,7 @@
const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
+ const std::string logfile3_ = aos::testing::TestTmpDir() + "/log3.bfbs";
const aos::FlatbufferDetachedBuffer<Configuration> config_;
const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
@@ -1817,7 +1868,12 @@
"parts_index": 0,
"logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
"logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
- "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464"
+ "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+ "boot_uuids": [
+ "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+ ""
+ ]
})")),
boot1_(MakeHeader(config_, R"({
/* 100ms */
@@ -1837,7 +1893,12 @@
"parts_index": 1,
"logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
"logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
- "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2"
+ "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+ "boot_uuids": [
+ "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+ ""
+ ]
})")) {}
protected:
@@ -1921,6 +1982,491 @@
EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
}
+class RebootTimestampMapperTest : public SortingElementTest {
+ public:
+ RebootTimestampMapperTest()
+ : SortingElementTest(),
+ boot0a_(MakeHeader(config_, R"({
+ /* 100ms */
+ "max_out_of_order_duration": 100000000,
+ "node": {
+ "name": "pi1"
+ },
+ "logger_node": {
+ "name": "pi1"
+ },
+ "monotonic_start_time": 1000000,
+ "realtime_start_time": 1000000000000,
+ "logger_monotonic_start_time": 1000000,
+ "logger_realtime_start_time": 1000000000000,
+ "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+ "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
+ "parts_index": 0,
+ "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
+ "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "boot_uuids": [
+ "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+ ""
+ ]
+})")),
+ boot0b_(MakeHeader(config_, R"({
+ /* 100ms */
+ "max_out_of_order_duration": 100000000,
+ "node": {
+ "name": "pi1"
+ },
+ "logger_node": {
+ "name": "pi1"
+ },
+ "monotonic_start_time": 1000000,
+ "realtime_start_time": 1000000000000,
+ "logger_monotonic_start_time": 1000000,
+ "logger_realtime_start_time": 1000000000000,
+ "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+ "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
+ "parts_index": 1,
+ "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
+ "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "boot_uuids": [
+ "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+ ""
+ ]
+})")),
+ boot1a_(MakeHeader(config_, R"({
+ /* 100ms */
+ "max_out_of_order_duration": 100000000,
+ "node": {
+ "name": "pi2"
+ },
+ "logger_node": {
+ "name": "pi1"
+ },
+ "monotonic_start_time": 1000000,
+ "realtime_start_time": 1000000000000,
+ "logger_monotonic_start_time": 1000000,
+ "logger_realtime_start_time": 1000000000000,
+ "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+ "parts_uuid": "f6ab0cdc-a654-456d-bfd9-2bbc09098edf",
+ "parts_index": 0,
+ "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
+ "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+ "boot_uuids": [
+ "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+ ""
+ ]
+})")),
+ boot1b_(MakeHeader(config_, R"({
+ /* 100ms */
+ "max_out_of_order_duration": 100000000,
+ "node": {
+ "name": "pi2"
+ },
+ "logger_node": {
+ "name": "pi1"
+ },
+ "monotonic_start_time": 1000000,
+ "realtime_start_time": 1000000000000,
+ "logger_monotonic_start_time": 1000000,
+ "logger_realtime_start_time": 1000000000000,
+ "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+ "parts_uuid": "3a9d0445-f520-43ca-93f5-e2cc7f54d40a",
+ "parts_index": 1,
+ "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
+ "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+ "boot_uuids": [
+ "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+ ""
+ ]
+})")) {}
+
+ protected:
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0a_;
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0b_;
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1a_;
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
+};
+
+
+// Tests that we can match timestamps on delivered messages in the presence of
+// reboots on the node receiving timestamps.
+TEST_F(RebootTimestampMapperTest, ReadNode0First) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0a(logfile0_, std::make_unique<DummyEncoder>());
+ writer0a.QueueSpan(boot0a_.span());
+ DetachedBufferWriter writer0b(logfile1_, std::make_unique<DummyEncoder>());
+ writer0b.QueueSpan(boot0b_.span());
+ DetachedBufferWriter writer1a(logfile2_, std::make_unique<DummyEncoder>());
+ writer1a.QueueSpan(boot1a_.span());
+ DetachedBufferWriter writer1b(logfile3_, std::make_unique<DummyEncoder>());
+ writer1b.QueueSpan(boot1b_.span());
+
+ writer0a.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer1a.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100),
+ e + chrono::milliseconds(1001)));
+
+ writer0b.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(20),
+ e + chrono::milliseconds(2001)));
+
+ writer0b.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+ writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(3000), 0, chrono::seconds(20),
+ e + chrono::milliseconds(3001)));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
+
+ for (const auto &x : parts) {
+ LOG(INFO) << x;
+ }
+ ASSERT_EQ(parts.size(), 1u);
+ ASSERT_EQ(parts[0].logger_node, "pi1");
+
+ size_t mapper0_count = 0;
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ mapper0.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper0_count; });
+ size_t mapper1_count = 0;
+ TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ mapper1.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper1_count; });
+
+ mapper0.AddPeer(&mapper1);
+ mapper1.AddPeer(&mapper0);
+
+ {
+ std::deque<TimestampedMessage> output0;
+
+ EXPECT_EQ(mapper0_count, 0u);
+ EXPECT_EQ(mapper1_count, 0u);
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 1u);
+ EXPECT_EQ(mapper1_count, 0u);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_TRUE(mapper0.started());
+ EXPECT_EQ(mapper0_count, 1u);
+ EXPECT_EQ(mapper1_count, 0u);
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 2u);
+ EXPECT_EQ(mapper1_count, 0u);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_TRUE(mapper0.started());
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_TRUE(mapper0.started());
+
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 0u);
+
+ ASSERT_TRUE(mapper0.Front() == nullptr);
+
+ LOG(INFO) << output0[0];
+ LOG(INFO) << output0[1];
+ LOG(INFO) << output0[2];
+
+ EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
+ EXPECT_EQ(output0[0].monotonic_event_time.time,
+ e + chrono::milliseconds(1000));
+ EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
+ EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
+ EXPECT_TRUE(output0[0].data.Verify());
+
+ EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
+ EXPECT_EQ(output0[1].monotonic_event_time.time,
+ e + chrono::milliseconds(2000));
+ EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
+ EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
+ EXPECT_TRUE(output0[1].data.Verify());
+
+ EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
+ EXPECT_EQ(output0[2].monotonic_event_time.time,
+ e + chrono::milliseconds(3000));
+ EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
+ EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
+ EXPECT_TRUE(output0[2].data.Verify());
+ }
+
+ {
+ SCOPED_TRACE("Trying node1 now");
+ std::deque<TimestampedMessage> output1;
+
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 0u);
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 1u);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_TRUE(mapper1.started());
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 1u);
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 2u);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_TRUE(mapper1.started());
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_TRUE(mapper1.started());
+
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 3u);
+
+ ASSERT_TRUE(mapper1.Front() == nullptr);
+
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 3u);
+
+ EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
+ EXPECT_EQ(output1[0].monotonic_event_time.time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
+ EXPECT_EQ(output1[0].monotonic_remote_time.time,
+ e + chrono::milliseconds(1000));
+ EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
+ EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
+ e + chrono::milliseconds(1001));
+ EXPECT_TRUE(output1[0].data.Verify());
+
+ EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
+ EXPECT_EQ(output1[1].monotonic_event_time.time,
+ e + chrono::seconds(20) + chrono::milliseconds(2000));
+ EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
+ EXPECT_EQ(output1[1].monotonic_remote_time.time,
+ e + chrono::milliseconds(2000));
+ EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
+ EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
+ e + chrono::milliseconds(2001));
+ EXPECT_TRUE(output1[1].data.Verify());
+
+ EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
+ EXPECT_EQ(output1[2].monotonic_event_time.time,
+ e + chrono::seconds(20) + chrono::milliseconds(3000));
+ EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
+ EXPECT_EQ(output1[2].monotonic_remote_time.time,
+ e + chrono::milliseconds(3000));
+ EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
+ EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
+ e + chrono::milliseconds(3001));
+ EXPECT_TRUE(output1[2].data.Verify());
+
+ LOG(INFO) << output1[0];
+ LOG(INFO) << output1[1];
+ LOG(INFO) << output1[2];
+ }
+}
+
+TEST_F(RebootTimestampMapperTest, Node2Reboot) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0a(logfile0_, std::make_unique<DummyEncoder>());
+ writer0a.QueueSpan(boot0a_.span());
+ DetachedBufferWriter writer0b(logfile1_, std::make_unique<DummyEncoder>());
+ writer0b.QueueSpan(boot0b_.span());
+ DetachedBufferWriter writer1a(logfile2_, std::make_unique<DummyEncoder>());
+ writer1a.QueueSpan(boot1a_.span());
+ DetachedBufferWriter writer1b(logfile3_, std::make_unique<DummyEncoder>());
+ writer1b.QueueSpan(boot1b_.span());
+
+ writer1a.QueueSizedFlatbuffer(MakeLogMessage(
+ e + chrono::seconds(100) + chrono::milliseconds(1000), 3, 0x005));
+ writer0a.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::seconds(100) + chrono::milliseconds(1000), 3,
+ chrono::seconds(-100),
+ e + chrono::seconds(100) + chrono::milliseconds(1001)));
+
+ writer1b.QueueSizedFlatbuffer(MakeLogMessage(
+ e + chrono::seconds(20) + chrono::milliseconds(2000), 3, 0x006));
+ writer0b.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::seconds(20) + chrono::milliseconds(2000), 3,
+ chrono::seconds(-20),
+ e + chrono::seconds(20) + chrono::milliseconds(2001)));
+
+ writer1b.QueueSizedFlatbuffer(MakeLogMessage(
+ e + chrono::seconds(20) + chrono::milliseconds(3000), 3, 0x007));
+ writer0b.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::seconds(20) + chrono::milliseconds(3000), 3,
+ chrono::seconds(-20),
+ e + chrono::seconds(20) + chrono::milliseconds(3001)));
+ }
+
+ const std::vector<LogFile> parts =
+ SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
+
+ for (const auto &x : parts) {
+ LOG(INFO) << x;
+ }
+ ASSERT_EQ(parts.size(), 1u);
+ ASSERT_EQ(parts[0].logger_node, "pi1");
+
+ size_t mapper0_count = 0;
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ mapper0.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper0_count; });
+ size_t mapper1_count = 0;
+ TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ mapper1.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper1_count; });
+
+ mapper0.AddPeer(&mapper1);
+ mapper1.AddPeer(&mapper0);
+
+ {
+ std::deque<TimestampedMessage> output0;
+
+ EXPECT_EQ(mapper0_count, 0u);
+ EXPECT_EQ(mapper1_count, 0u);
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 1u);
+ EXPECT_EQ(mapper1_count, 0u);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_TRUE(mapper0.started());
+ EXPECT_EQ(mapper0_count, 1u);
+ EXPECT_EQ(mapper1_count, 0u);
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 2u);
+ EXPECT_EQ(mapper1_count, 0u);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_TRUE(mapper0.started());
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_TRUE(mapper0.started());
+
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 0u);
+
+ ASSERT_TRUE(mapper0.Front() == nullptr);
+
+ LOG(INFO) << output0[0];
+ LOG(INFO) << output0[1];
+ LOG(INFO) << output0[2];
+
+ EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
+ EXPECT_EQ(output0[0].monotonic_event_time.time,
+ e + chrono::milliseconds(1000));
+ EXPECT_EQ(output0[0].monotonic_remote_time.boot, 0u);
+ EXPECT_EQ(output0[0].monotonic_remote_time.time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
+ EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
+ e + chrono::seconds(100) + chrono::milliseconds(1001));
+ EXPECT_TRUE(output0[0].data.Verify());
+
+ EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
+ EXPECT_EQ(output0[1].monotonic_event_time.time,
+ e + chrono::milliseconds(2000));
+ EXPECT_EQ(output0[1].monotonic_remote_time.boot, 1u);
+ EXPECT_EQ(output0[1].monotonic_remote_time.time,
+ e + chrono::seconds(20) + chrono::milliseconds(2000));
+ EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
+ EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
+ e + chrono::seconds(20) + chrono::milliseconds(2001));
+ EXPECT_TRUE(output0[1].data.Verify());
+
+ EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
+ EXPECT_EQ(output0[2].monotonic_event_time.time,
+ e + chrono::milliseconds(3000));
+ EXPECT_EQ(output0[2].monotonic_remote_time.boot, 1u);
+ EXPECT_EQ(output0[2].monotonic_remote_time.time,
+ e + chrono::seconds(20) + chrono::milliseconds(3000));
+ EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
+ EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
+ e + chrono::seconds(20) + chrono::milliseconds(3001));
+ EXPECT_TRUE(output0[2].data.Verify());
+ }
+
+ {
+ SCOPED_TRACE("Trying node1 now");
+ std::deque<TimestampedMessage> output1;
+
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 0u);
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 1u);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_TRUE(mapper1.started());
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 1u);
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 2u);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_TRUE(mapper1.started());
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_TRUE(mapper1.started());
+
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 3u);
+
+ ASSERT_TRUE(mapper1.Front() == nullptr);
+
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 3u);
+
+ EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
+ EXPECT_EQ(output1[0].monotonic_event_time.time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
+ EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
+ EXPECT_TRUE(output1[0].data.Verify());
+
+ EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
+ EXPECT_EQ(output1[1].monotonic_event_time.time,
+ e + chrono::seconds(20) + chrono::milliseconds(2000));
+ EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
+ EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
+ EXPECT_TRUE(output1[1].data.Verify());
+
+ EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
+ EXPECT_EQ(output1[2].monotonic_event_time.time,
+ e + chrono::seconds(20) + chrono::milliseconds(3000));
+ EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
+ EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
+ EXPECT_TRUE(output1[2].data.Verify());
+
+ LOG(INFO) << output1[0];
+ LOG(INFO) << output1[1];
+ LOG(INFO) << output1[2];
+ }
+}
+
} // namespace testing
} // namespace logger
} // namespace aos