Add TimestampMapper to match timestamps with data
This finishes the main sorting code for
https://docs.google.com/document/d/1RZ6ZlADRUHmwiFOOmXA87FHPLFuN-7mS7tbFCwguZDE/edit#
This creates an object which buffers sorted data, and which buffers data
to match against every time we find a timestamp. Currently, buffered
data is never timed out, but only the data that *could* be forwarded is
buffered.
We also impose a number of restrictions here to simplify the logic. The
plan is to relax them as we run into them rather than solve everything
up front. That also makes it much easier to add tests as we go.
Change-Id: Idbc515e5594bc031139c7b994aaa71826ff68c0a
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 94ae25b..3bed06d 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -320,7 +320,7 @@
"logger_node": {
"name": "pi1"
},
- "monotonic_start_time": 0,
+ "monotonic_start_time": 1000000,
"realtime_start_time": 1000000000000,
"log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
"parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
@@ -336,14 +336,47 @@
"logger_node": {
"name": "pi1"
},
+ "monotonic_start_time": 1000000,
+ "realtime_start_time": 1000000000000,
+ "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+ "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
+ "parts_index": 0
+})")),
+ config2_(MakeHeader(config_,
+ R"({
+ /* 100ms */
+ "max_out_of_order_duration": 100000000,
+ "node": {
+ "name": "pi2"
+ },
+ "logger_node": {
+ "name": "pi2"
+ },
"monotonic_start_time": 0,
"realtime_start_time": 1000000000000,
- "log_event_uuid": "d4724d35-a6c6-4a30-8a94-d192f4c18260",
- "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
+ "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
+ "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
+ "parts_index": 0
+})")),
+ config3_(MakeHeader(config_,
+ R"({
+ /* 100ms */
+ "max_out_of_order_duration": 100000000,
+ "node": {
+ "name": "pi1"
+ },
+ "logger_node": {
+ "name": "pi1"
+ },
+ "monotonic_start_time": 2000000,
+ "realtime_start_time": 1000000000,
+ "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
+ "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
"parts_index": 0
})")) {
unlink(logfile0_.c_str());
unlink(logfile1_.c_str());
+ unlink(logfile2_.c_str());
queue_index_.resize(kChannels);
}
@@ -407,10 +440,13 @@
const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
+ const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
const aos::FlatbufferDetachedBuffer<Configuration> config_;
const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
std::vector<uint32_t> queue_index_;
};
@@ -418,6 +454,7 @@
using LogPartsSorterTest = SortingElementTest;
using LogPartsSorterDeathTest = LogPartsSorterTest;
using NodeMergerTest = SortingElementTest;
+using TimestampMapperTest = SortingElementTest;
// Tests that we can pull messages out of a log sorted in order.
TEST_F(LogPartsSorterTest, Pull) {
@@ -538,15 +575,9 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
- ASSERT_EQ(parts.size(), 2u);
+ ASSERT_EQ(parts.size(), 1u);
- std::vector<std::unique_ptr<LogPartsSorter>> parts_sorters;
- parts_sorters.emplace_back(
- std::make_unique<LogPartsSorter>(parts[0].parts[0]));
- parts_sorters.emplace_back(
- std::make_unique<LogPartsSorter>(parts[1].parts[0]));
-
- NodeMerger merger(std::move(parts_sorters));
+ NodeMerger merger(FilterPartsForNode(parts, "pi1"));
EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
@@ -595,6 +626,437 @@
EXPECT_EQ(output[5].timestamp, e + chrono::milliseconds(3002));
}
+// Tests that we can match timestamps with data when pi1 (data) is read first.
+TEST_F(TimestampMapperTest, ReadNode0First) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config2_.span());
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+ ASSERT_EQ(parts[0].logger_node, "pi1");
+ ASSERT_EQ(parts[1].logger_node, "pi2");
+
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
+ mapper0.AddPeer(&mapper1);
+ mapper1.AddPeer(&mapper0);
+
+ {  // Drain mapper0 (pi1) before touching mapper1.
+ std::deque<TimestampedMessage> output0;
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_EQ(mapper0.sorted_until(), e + chrono::milliseconds(1900));
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_EQ(mapper0.sorted_until(), e + chrono::milliseconds(2900));
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper0.Front() == nullptr);
+
+ EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
+ EXPECT_TRUE(output0[0].data.Verify());
+ EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
+ EXPECT_TRUE(output0[1].data.Verify());
+ EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
+ EXPECT_TRUE(output0[2].data.Verify());
+ }
+
+ {
+ SCOPED_TRACE("Trying node1 now");
+ std::deque<TimestampedMessage> output1;
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(1900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(2900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper1.Front() == nullptr);
+
+ EXPECT_EQ(output1[0].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_TRUE(output1[0].data.Verify());
+ EXPECT_EQ(output1[1].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(2000));
+ EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_EQ(output1[2].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(3000));
+ EXPECT_TRUE(output1[2].data.Verify());
+ }
+}
+
+// Tests that we can match timestamps with data when pi2 (timestamps) is read
+// first. Reading in this order means the second node needs to queue data up
+// from the first node to find the message matching each timestamp.
+TEST_F(TimestampMapperTest, ReadNode1First) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config2_.span());
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+ ASSERT_EQ(parts[0].logger_node, "pi1");
+ ASSERT_EQ(parts[1].logger_node, "pi2");
+
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
+ mapper0.AddPeer(&mapper1);
+ mapper1.AddPeer(&mapper0);
+
+ {
+ SCOPED_TRACE("Trying node1 now");
+ std::deque<TimestampedMessage> output1;
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(1900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(2900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper1.Front() == nullptr);
+
+ EXPECT_EQ(output1[0].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_TRUE(output1[0].data.Verify());
+ EXPECT_EQ(output1[1].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(2000));
+ EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_EQ(output1[2].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(3000));
+ EXPECT_TRUE(output1[2].data.Verify());
+ }
+
+ {  // Now read mapper0 (pi1); sorted_until() is max_time from the first pop.
+ std::deque<TimestampedMessage> output0;
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper0.Front() == nullptr);
+
+ EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
+ EXPECT_TRUE(output0[0].data.Verify());
+ EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
+ EXPECT_TRUE(output0[1].data.Verify());
+ EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
+ EXPECT_TRUE(output0[2].data.Verify());
+ }
+}
+
+// Tests that we return just the timestamp, with data which fails Verify(),
+// when the matching data is missing from the beginning of the data log.
+TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config2_.span());
+
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);  // Not written.
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+ ASSERT_EQ(parts[0].logger_node, "pi1");
+ ASSERT_EQ(parts[1].logger_node, "pi2");
+
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
+ mapper0.AddPeer(&mapper1);
+ mapper1.AddPeer(&mapper0);
+
+ {
+ SCOPED_TRACE("Trying node1 now");
+ std::deque<TimestampedMessage> output1;
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(1900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(2900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper1.Front() == nullptr);
+
+ EXPECT_EQ(output1[0].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_FALSE(output1[0].data.Verify());  // No data was logged for this one.
+ EXPECT_EQ(output1[1].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(2000));
+ EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_EQ(output1[2].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(3000));
+ EXPECT_TRUE(output1[2].data.Verify());
+ }
+}
+
+// Tests that we return just the timestamp, with data which fails Verify(),
+// when the matching data is missing from the end of the data log.
+TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config2_.span());
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+ MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);  // Not written.
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+ ASSERT_EQ(parts[0].logger_node, "pi1");
+ ASSERT_EQ(parts[1].logger_node, "pi2");
+
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
+ mapper0.AddPeer(&mapper1);
+ mapper1.AddPeer(&mapper0);
+
+ {
+ SCOPED_TRACE("Trying node1 now");
+ std::deque<TimestampedMessage> output1;
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(1900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(2900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper1.Front() == nullptr);
+
+ EXPECT_EQ(output1[0].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_TRUE(output1[0].data.Verify());
+ EXPECT_EQ(output1[1].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(2000));
+ EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_EQ(output1[2].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(3000));
+ EXPECT_FALSE(output1[2].data.Verify());  // No data was logged for this one.
+ }
+}
+
+// Tests that we properly sort and match messages sharing the same timestamp.
+TEST_F(TimestampMapperTest, ReadSameTimestamp) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config2_.span());
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(  // Same time as the previous message.
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+ ASSERT_EQ(parts[0].logger_node, "pi1");
+ ASSERT_EQ(parts[1].logger_node, "pi2");
+
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
+ mapper0.AddPeer(&mapper1);
+ mapper1.AddPeer(&mapper0);
+
+ {
+ SCOPED_TRACE("Trying node1 now");
+ std::deque<TimestampedMessage> output1;
+
+ for (int i = 0; i < 4; ++i) {
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ }
+ ASSERT_TRUE(mapper1.Front() == nullptr);
+
+ EXPECT_EQ(output1[0].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_TRUE(output1[0].data.Verify());
+ EXPECT_EQ(output1[1].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(2000));
+ EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_EQ(output1[2].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(2000));
+ EXPECT_TRUE(output1[2].data.Verify());
+ EXPECT_EQ(output1[3].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(3000));
+ EXPECT_TRUE(output1[3].data.Verify());
+ }
+}
+
+// Tests that TimestampMapper reports the expected start times for a node.
+TEST_F(TimestampMapperTest, StartTime) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config1_.span());
+ DetachedBufferWriter writer2(logfile2_, std::make_unique<DummyEncoder>());
+ writer2.QueueSpan(config3_.span());  // Header only; no messages logged.
+ }
+
+ const std::vector<LogFile> parts =
+ SortParts({logfile0_, logfile1_, logfile2_});
+
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+
+ EXPECT_EQ(mapper0.monotonic_start_time(), e + chrono::milliseconds(1));
+ EXPECT_EQ(mapper0.realtime_start_time(),
+ realtime_clock::time_point(chrono::seconds(1000)));
+}
+
} // namespace testing
} // namespace logger
} // namespace aos