blob: 9e4f8b947d479eb8c20efb4307e55ac289cc6b23 [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
4#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07005
Austin Schuhc41603c2020-10-11 16:17:37 -07006#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -07007#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -08008#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -07009#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070010#include "aos/json_to_flatbuffer.h"
11#include "aos/testing/tmpdir.h"
Brian Smarttea913d42021-12-10 15:02:38 -080012#include "gflags/gflags.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070013#include "gtest/gtest.h"
Austin Schuhc243b422020-10-11 15:35:08 -070014
15namespace aos {
16namespace logger {
17namespace testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070018namespace chrono = std::chrono;
Austin Schuhc243b422020-10-11 15:35:08 -070019
Austin Schuhd863e6e2022-10-16 15:44:50 -070020// Adapter class to make it easy to test DetachedBufferWriter without adding
21// test only boilerplate to DetachedBufferWriter.
22class TestDetachedBufferWriter : public DetachedBufferWriter {
23 public:
24 TestDetachedBufferWriter(std::string_view filename)
25 : DetachedBufferWriter(filename, std::make_unique<DummyEncoder>()) {}
26 void WriteSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
27 QueueSpan(absl::Span<const uint8_t>(buffer.data(), buffer.size()));
28 }
29};
30
Austin Schuhe243aaf2020-10-11 15:46:02 -070031// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070032template <typename T>
33SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
34 const std::string_view data) {
35 flatbuffers::FlatBufferBuilder fbb;
36 fbb.ForceDefaults(true);
37 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
38 return fbb.Release();
39}
40
Austin Schuhe243aaf2020-10-11 15:46:02 -070041// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070042TEST(SpanReaderTest, ReadWrite) {
43 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
44 unlink(logfile.c_str());
45
46 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080047 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070048 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080049 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070050
51 {
Austin Schuhd863e6e2022-10-16 15:44:50 -070052 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -080053 writer.QueueSpan(m1.span());
54 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070055 }
56
57 SpanReader reader(logfile);
58
59 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhcf5f6442021-07-06 10:43:28 -070060 EXPECT_EQ(reader.PeekMessage(), m1.span());
61 EXPECT_EQ(reader.PeekMessage(), m1.span());
Austin Schuhadd6eb32020-11-09 21:24:26 -080062 EXPECT_EQ(reader.ReadMessage(), m1.span());
63 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhcf5f6442021-07-06 10:43:28 -070064 EXPECT_EQ(reader.PeekMessage(), absl::Span<const uint8_t>());
Austin Schuhc243b422020-10-11 15:35:08 -070065 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
66}
67
Austin Schuhe243aaf2020-10-11 15:46:02 -070068// Tests that we can actually parse the resulting messages at a basic level
69// through MessageReader.
70TEST(MessageReaderTest, ReadWrite) {
71 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
72 unlink(logfile.c_str());
73
74 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
75 JsonToSizedFlatbuffer<LogFileHeader>(
76 R"({ "max_out_of_order_duration": 100000000 })");
77 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
78 JsonToSizedFlatbuffer<MessageHeader>(
79 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
80 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
81 JsonToSizedFlatbuffer<MessageHeader>(
82 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
83
84 {
Austin Schuhd863e6e2022-10-16 15:44:50 -070085 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -080086 writer.QueueSpan(config.span());
87 writer.QueueSpan(m1.span());
88 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -070089 }
90
91 MessageReader reader(logfile);
92
93 EXPECT_EQ(reader.filename(), logfile);
94
95 EXPECT_EQ(
96 reader.max_out_of_order_duration(),
97 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
98 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
99 EXPECT_TRUE(reader.ReadMessage());
100 EXPECT_EQ(reader.newest_timestamp(),
101 monotonic_clock::time_point(chrono::nanoseconds(1)));
102 EXPECT_TRUE(reader.ReadMessage());
103 EXPECT_EQ(reader.newest_timestamp(),
104 monotonic_clock::time_point(chrono::nanoseconds(2)));
105 EXPECT_FALSE(reader.ReadMessage());
106}
107
Austin Schuh32f68492020-11-08 21:45:51 -0800108// Tests that we explode when messages are too far out of order.
109TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
110 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
111 unlink(logfile0.c_str());
112
113 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
114 JsonToSizedFlatbuffer<LogFileHeader>(
115 R"({
116 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800117 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800118 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
119 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
120 "parts_index": 0
121})");
122
123 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
124 JsonToSizedFlatbuffer<MessageHeader>(
125 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
126 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
127 JsonToSizedFlatbuffer<MessageHeader>(
128 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
129 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
130 JsonToSizedFlatbuffer<MessageHeader>(
131 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
132
133 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700134 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800135 writer.QueueSpan(config0.span());
136 writer.QueueSpan(m1.span());
137 writer.QueueSpan(m2.span());
138 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800139 }
140
141 const std::vector<LogFile> parts = SortParts({logfile0});
142
143 PartsMessageReader reader(parts[0].parts[0]);
144
145 EXPECT_TRUE(reader.ReadMessage());
146 EXPECT_TRUE(reader.ReadMessage());
147 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
148}
149
Austin Schuhc41603c2020-10-11 16:17:37 -0700150// Tests that we can transparently re-assemble part files with a
151// PartsMessageReader.
152TEST(PartsMessageReaderTest, ReadWrite) {
153 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
154 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
155 unlink(logfile0.c_str());
156 unlink(logfile1.c_str());
157
158 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
159 JsonToSizedFlatbuffer<LogFileHeader>(
160 R"({
161 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800162 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700163 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
164 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
165 "parts_index": 0
166})");
167 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
168 JsonToSizedFlatbuffer<LogFileHeader>(
169 R"({
170 "max_out_of_order_duration": 200000000,
171 "monotonic_start_time": 0,
172 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800173 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700174 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
175 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
176 "parts_index": 1
177})");
178
179 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
180 JsonToSizedFlatbuffer<MessageHeader>(
181 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
182 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
183 JsonToSizedFlatbuffer<MessageHeader>(
184 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
185
186 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700187 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800188 writer.QueueSpan(config0.span());
189 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700190 }
191 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700192 TestDetachedBufferWriter writer(logfile1);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800193 writer.QueueSpan(config1.span());
194 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700195 }
196
197 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
198
199 PartsMessageReader reader(parts[0].parts[0]);
200
201 EXPECT_EQ(reader.filename(), logfile0);
202
203 // Confirm that the timestamps track, and the filename also updates.
204 // Read the first message.
205 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
206 EXPECT_EQ(
207 reader.max_out_of_order_duration(),
208 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
209 EXPECT_TRUE(reader.ReadMessage());
210 EXPECT_EQ(reader.filename(), logfile0);
211 EXPECT_EQ(reader.newest_timestamp(),
212 monotonic_clock::time_point(chrono::nanoseconds(1)));
213 EXPECT_EQ(
214 reader.max_out_of_order_duration(),
215 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
216
217 // Read the second message.
218 EXPECT_TRUE(reader.ReadMessage());
219 EXPECT_EQ(reader.filename(), logfile1);
220 EXPECT_EQ(reader.newest_timestamp(),
221 monotonic_clock::time_point(chrono::nanoseconds(2)));
222 EXPECT_EQ(
223 reader.max_out_of_order_duration(),
224 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
225
226 // And then confirm that reading again returns no message.
227 EXPECT_FALSE(reader.ReadMessage());
228 EXPECT_EQ(reader.filename(), logfile1);
229 EXPECT_EQ(
230 reader.max_out_of_order_duration(),
231 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800232 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Austin Schuhc41603c2020-10-11 16:17:37 -0700233}
Austin Schuh32f68492020-11-08 21:45:51 -0800234
Austin Schuh1be0ce42020-11-29 22:43:26 -0800235// Tests that Message's operator < works as expected.
236TEST(MessageTest, Sorting) {
237 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
238
239 Message m1{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700240 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700241 .timestamp =
242 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
Austin Schuh48507722021-07-17 17:29:24 -0700243 .monotonic_remote_boot = 0xffffff,
244 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700245 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800246 Message m2{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700247 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700248 .timestamp =
249 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
Austin Schuh48507722021-07-17 17:29:24 -0700250 .monotonic_remote_boot = 0xffffff,
251 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700252 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800253
254 EXPECT_LT(m1, m2);
255 EXPECT_GE(m2, m1);
256
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700257 m1.timestamp.time = e;
258 m2.timestamp.time = e;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800259
260 m1.channel_index = 1;
261 m2.channel_index = 2;
262
263 EXPECT_LT(m1, m2);
264 EXPECT_GE(m2, m1);
265
266 m1.channel_index = 0;
267 m2.channel_index = 0;
Austin Schuh58646e22021-08-23 23:51:46 -0700268 m1.queue_index.index = 0u;
269 m2.queue_index.index = 1u;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800270
271 EXPECT_LT(m1, m2);
272 EXPECT_GE(m2, m1);
273}
274
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800275aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
276 const aos::FlatbufferDetachedBuffer<Configuration> &config,
277 const std::string_view json) {
278 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh1034a832021-03-31 21:53:26 -0700279 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800280 flatbuffers::Offset<Configuration> config_offset =
281 aos::CopyFlatBuffer(config, &fbb);
282 LogFileHeader::Builder header_builder(fbb);
283 header_builder.add_configuration(config_offset);
284 fbb.Finish(header_builder.Finish());
285 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
286
287 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
288 JsonToFlatbuffer<LogFileHeader>(json));
289 CHECK(header_updates.Verify());
290 flatbuffers::FlatBufferBuilder fbb2;
Austin Schuh1034a832021-03-31 21:53:26 -0700291 fbb2.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800292 fbb2.FinishSizePrefixed(
293 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
294 return fbb2.Release();
295}
296
297class SortingElementTest : public ::testing::Test {
298 public:
299 SortingElementTest()
300 : config_(JsonToFlatbuffer<Configuration>(
301 R"({
302 "channels": [
303 {
304 "name": "/a",
305 "type": "aos.logger.testing.TestMessage",
306 "source_node": "pi1",
307 "destination_nodes": [
308 {
309 "name": "pi2"
310 },
311 {
312 "name": "pi3"
313 }
314 ]
315 },
316 {
317 "name": "/b",
318 "type": "aos.logger.testing.TestMessage",
319 "source_node": "pi1"
320 },
321 {
322 "name": "/c",
323 "type": "aos.logger.testing.TestMessage",
324 "source_node": "pi1"
Austin Schuh48507722021-07-17 17:29:24 -0700325 },
326 {
327 "name": "/d",
328 "type": "aos.logger.testing.TestMessage",
329 "source_node": "pi2",
330 "destination_nodes": [
331 {
332 "name": "pi1"
333 }
334 ]
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800335 }
336 ],
337 "nodes": [
338 {
339 "name": "pi1"
340 },
341 {
342 "name": "pi2"
343 },
344 {
345 "name": "pi3"
346 }
347 ]
348}
349)")),
350 config0_(MakeHeader(config_, R"({
351 /* 100ms */
352 "max_out_of_order_duration": 100000000,
353 "node": {
354 "name": "pi1"
355 },
356 "logger_node": {
357 "name": "pi1"
358 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800359 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800360 "realtime_start_time": 1000000000000,
361 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700362 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
363 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
364 "boot_uuids": [
365 "1d782c63-b3c7-466e-bea9-a01308b43333",
366 "",
367 ""
368 ],
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800369 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
370 "parts_index": 0
371})")),
372 config1_(MakeHeader(config_,
373 R"({
374 /* 100ms */
375 "max_out_of_order_duration": 100000000,
376 "node": {
377 "name": "pi1"
378 },
379 "logger_node": {
380 "name": "pi1"
381 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800382 "monotonic_start_time": 1000000,
383 "realtime_start_time": 1000000000000,
384 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700385 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
386 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
387 "boot_uuids": [
388 "1d782c63-b3c7-466e-bea9-a01308b43333",
389 "",
390 ""
391 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800392 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
393 "parts_index": 0
394})")),
395 config2_(MakeHeader(config_,
396 R"({
397 /* 100ms */
398 "max_out_of_order_duration": 100000000,
399 "node": {
400 "name": "pi2"
401 },
402 "logger_node": {
403 "name": "pi2"
404 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800405 "monotonic_start_time": 0,
406 "realtime_start_time": 1000000000000,
Austin Schuh48507722021-07-17 17:29:24 -0700407 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
408 "logger_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
409 "boot_uuids": [
410 "",
411 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
412 ""
413 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800414 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
415 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
416 "parts_index": 0
417})")),
418 config3_(MakeHeader(config_,
419 R"({
420 /* 100ms */
421 "max_out_of_order_duration": 100000000,
422 "node": {
423 "name": "pi1"
424 },
425 "logger_node": {
426 "name": "pi1"
427 },
428 "monotonic_start_time": 2000000,
429 "realtime_start_time": 1000000000,
430 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
Austin Schuh48507722021-07-17 17:29:24 -0700431 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
432 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
433 "boot_uuids": [
434 "1d782c63-b3c7-466e-bea9-a01308b43333",
435 "",
436 ""
437 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800438 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800439 "parts_index": 0
Austin Schuh8bf1e632021-01-02 22:41:04 -0800440})")),
441 config4_(MakeHeader(config_,
442 R"({
443 /* 100ms */
444 "max_out_of_order_duration": 100000000,
445 "node": {
446 "name": "pi2"
447 },
448 "logger_node": {
449 "name": "pi1"
450 },
451 "monotonic_start_time": 2000000,
452 "realtime_start_time": 1000000000,
453 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
454 "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
Austin Schuh48507722021-07-17 17:29:24 -0700455 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
456 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
457 "boot_uuids": [
458 "1d782c63-b3c7-466e-bea9-a01308b43333",
459 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
460 ""
461 ],
Austin Schuh8bf1e632021-01-02 22:41:04 -0800462 "parts_index": 0
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800463})")) {
464 unlink(logfile0_.c_str());
465 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800466 unlink(logfile2_.c_str());
Austin Schuh48507722021-07-17 17:29:24 -0700467 unlink(logfile3_.c_str());
Austin Schuh921b2562021-07-31 19:37:03 -0700468 queue_index_.resize(config_.message().channels()->size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800469 }
470
471 protected:
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800472 flatbuffers::DetachedBuffer MakeLogMessage(
473 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
474 int value) {
475 flatbuffers::FlatBufferBuilder message_fbb;
476 message_fbb.ForceDefaults(true);
477 TestMessage::Builder test_message_builder(message_fbb);
478 test_message_builder.add_value(value);
479 message_fbb.Finish(test_message_builder.Finish());
480
481 aos::Context context;
482 context.monotonic_event_time = monotonic_now;
483 context.realtime_event_time = aos::realtime_clock::epoch() +
484 chrono::seconds(1000) +
485 monotonic_now.time_since_epoch();
Austin Schuh921b2562021-07-31 19:37:03 -0700486 CHECK_LT(static_cast<size_t>(channel_index), queue_index_.size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800487 context.queue_index = queue_index_[channel_index];
488 context.size = message_fbb.GetSize();
489 context.data = message_fbb.GetBufferPointer();
490
491 ++queue_index_[channel_index];
492
493 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh53e3a812021-11-03 16:57:23 -0700494 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800495 fbb.FinishSizePrefixed(
496 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
497
498 return fbb.Release();
499 }
500
501 flatbuffers::DetachedBuffer MakeTimestampMessage(
502 const aos::monotonic_clock::time_point sender_monotonic_now,
Austin Schuh8bf1e632021-01-02 22:41:04 -0800503 int channel_index, chrono::nanoseconds receiver_monotonic_offset,
504 monotonic_clock::time_point monotonic_timestamp_time =
505 monotonic_clock::min_time) {
506 const monotonic_clock::time_point monotonic_sent_time =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800507 sender_monotonic_now + receiver_monotonic_offset;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800508
509 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800510 fbb.ForceDefaults(true);
511
512 logger::MessageHeader::Builder message_header_builder(fbb);
513
514 message_header_builder.add_channel_index(channel_index);
515
516 message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
517 100);
518 message_header_builder.add_monotonic_sent_time(
519 monotonic_sent_time.time_since_epoch().count());
520 message_header_builder.add_realtime_sent_time(
521 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
522 monotonic_sent_time.time_since_epoch())
523 .time_since_epoch()
524 .count());
525
526 message_header_builder.add_monotonic_remote_time(
527 sender_monotonic_now.time_since_epoch().count());
528 message_header_builder.add_realtime_remote_time(
529 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
530 sender_monotonic_now.time_since_epoch())
531 .time_since_epoch()
532 .count());
533 message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
534 1);
535
536 if (monotonic_timestamp_time != monotonic_clock::min_time) {
537 message_header_builder.add_monotonic_timestamp_time(
538 monotonic_timestamp_time.time_since_epoch().count());
539 }
540
541 fbb.FinishSizePrefixed(message_header_builder.Finish());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800542 LOG(INFO) << aos::FlatbufferToJson(
543 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
544 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
545
546 return fbb.Release();
547 }
548
549 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
550 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800551 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh48507722021-07-17 17:29:24 -0700552 const std::string logfile3_ = aos::testing::TestTmpDir() + "/log3.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800553
554 const aos::FlatbufferDetachedBuffer<Configuration> config_;
555 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
556 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800557 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
558 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800559 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800560
561 std::vector<uint32_t> queue_index_;
562};
563
564using LogPartsSorterTest = SortingElementTest;
565using LogPartsSorterDeathTest = LogPartsSorterTest;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800566using NodeMergerTest = SortingElementTest;
Austin Schuhd2f96102020-12-01 20:27:29 -0800567using TimestampMapperTest = SortingElementTest;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800568
569// Tests that we can pull messages out of a log sorted in order.
570TEST_F(LogPartsSorterTest, Pull) {
571 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
572 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700573 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800574 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700575 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800576 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700577 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800578 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700579 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800580 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700581 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800582 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
583 }
584
585 const std::vector<LogFile> parts = SortParts({logfile0_});
586
587 LogPartsSorter parts_sorter(parts[0].parts[0]);
588
589 // Confirm we aren't sorted until any time until the message is popped.
590 // Peeking shouldn't change the sorted until time.
591 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
592
593 std::deque<Message> output;
594
595 ASSERT_TRUE(parts_sorter.Front() != nullptr);
596 output.emplace_back(std::move(*parts_sorter.Front()));
597 parts_sorter.PopFront();
598 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
599
600 ASSERT_TRUE(parts_sorter.Front() != nullptr);
601 output.emplace_back(std::move(*parts_sorter.Front()));
602 parts_sorter.PopFront();
603 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
604
605 ASSERT_TRUE(parts_sorter.Front() != nullptr);
606 output.emplace_back(std::move(*parts_sorter.Front()));
607 parts_sorter.PopFront();
608 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
609
610 ASSERT_TRUE(parts_sorter.Front() != nullptr);
611 output.emplace_back(std::move(*parts_sorter.Front()));
612 parts_sorter.PopFront();
613 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
614
615 ASSERT_TRUE(parts_sorter.Front() == nullptr);
616
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700617 EXPECT_EQ(output[0].timestamp.boot, 0);
618 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
619 EXPECT_EQ(output[1].timestamp.boot, 0);
620 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1000));
621 EXPECT_EQ(output[2].timestamp.boot, 0);
622 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1901));
623 EXPECT_EQ(output[3].timestamp.boot, 0);
624 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800625}
626
Austin Schuhb000de62020-12-03 22:00:40 -0800627// Tests that we can pull messages out of a log sorted in order.
628TEST_F(LogPartsSorterTest, WayBeforeStart) {
629 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
630 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700631 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhb000de62020-12-03 22:00:40 -0800632 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700633 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800634 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700635 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800636 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700637 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800638 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700639 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800640 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700641 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800642 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
643 }
644
645 const std::vector<LogFile> parts = SortParts({logfile0_});
646
647 LogPartsSorter parts_sorter(parts[0].parts[0]);
648
649 // Confirm we aren't sorted until any time until the message is popped.
650 // Peeking shouldn't change the sorted until time.
651 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
652
653 std::deque<Message> output;
654
655 for (monotonic_clock::time_point t :
656 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
657 e + chrono::milliseconds(1900), monotonic_clock::max_time,
658 monotonic_clock::max_time}) {
659 ASSERT_TRUE(parts_sorter.Front() != nullptr);
660 output.emplace_back(std::move(*parts_sorter.Front()));
661 parts_sorter.PopFront();
662 EXPECT_EQ(parts_sorter.sorted_until(), t);
663 }
664
665 ASSERT_TRUE(parts_sorter.Front() == nullptr);
666
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700667 EXPECT_EQ(output[0].timestamp.boot, 0u);
668 EXPECT_EQ(output[0].timestamp.time, e - chrono::milliseconds(1000));
669 EXPECT_EQ(output[1].timestamp.boot, 0u);
670 EXPECT_EQ(output[1].timestamp.time, e - chrono::milliseconds(500));
671 EXPECT_EQ(output[2].timestamp.boot, 0u);
672 EXPECT_EQ(output[2].timestamp.time, e - chrono::milliseconds(10));
673 EXPECT_EQ(output[3].timestamp.boot, 0u);
674 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(1901));
675 EXPECT_EQ(output[4].timestamp.boot, 0u);
676 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuhb000de62020-12-03 22:00:40 -0800677}
678
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800679// Tests that messages too far out of order trigger death.
680TEST_F(LogPartsSorterDeathTest, Pull) {
681 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
682 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700683 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800684 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700685 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800686 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700687 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800688 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700689 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800690 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
691 // The following message is too far out of order and will trigger the CHECK.
Austin Schuhd863e6e2022-10-16 15:44:50 -0700692 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800693 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
694 }
695
696 const std::vector<LogFile> parts = SortParts({logfile0_});
697
698 LogPartsSorter parts_sorter(parts[0].parts[0]);
699
700 // Confirm we aren't sorted until any time until the message is popped.
701 // Peeking shouldn't change the sorted until time.
702 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
703 std::deque<Message> output;
704
705 ASSERT_TRUE(parts_sorter.Front() != nullptr);
706 parts_sorter.PopFront();
707 ASSERT_TRUE(parts_sorter.Front() != nullptr);
708 ASSERT_TRUE(parts_sorter.Front() != nullptr);
709 parts_sorter.PopFront();
710
Austin Schuh58646e22021-08-23 23:51:46 -0700711 EXPECT_DEATH({ parts_sorter.Front(); },
712 "Max out of order of 100000000ns exceeded.");
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800713}
714
Austin Schuh8f52ed52020-11-30 23:12:39 -0800715// Tests that we can merge data from 2 separate files, including duplicate data.
716TEST_F(NodeMergerTest, TwoFileMerger) {
717 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
718 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700719 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800720 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700721 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800722 writer1.QueueSpan(config1_.span());
723
Austin Schuhd863e6e2022-10-16 15:44:50 -0700724 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800725 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700726 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800727 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
728
Austin Schuhd863e6e2022-10-16 15:44:50 -0700729 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800730 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700731 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800732 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
733
734 // Make a duplicate!
735 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
736 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
737 writer0.QueueSpan(msg.span());
738 writer1.QueueSpan(msg.span());
739
Austin Schuhd863e6e2022-10-16 15:44:50 -0700740 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800741 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
742 }
743
744 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Austin Schuhd2f96102020-12-01 20:27:29 -0800745 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800746
Austin Schuhd2f96102020-12-01 20:27:29 -0800747 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800748
749 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
750
751 std::deque<Message> output;
752
753 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
754 ASSERT_TRUE(merger.Front() != nullptr);
755 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
756
757 output.emplace_back(std::move(*merger.Front()));
758 merger.PopFront();
759 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
760
761 ASSERT_TRUE(merger.Front() != nullptr);
762 output.emplace_back(std::move(*merger.Front()));
763 merger.PopFront();
764 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
765
766 ASSERT_TRUE(merger.Front() != nullptr);
767 output.emplace_back(std::move(*merger.Front()));
768 merger.PopFront();
769 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
770
771 ASSERT_TRUE(merger.Front() != nullptr);
772 output.emplace_back(std::move(*merger.Front()));
773 merger.PopFront();
774 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
775
776 ASSERT_TRUE(merger.Front() != nullptr);
777 output.emplace_back(std::move(*merger.Front()));
778 merger.PopFront();
779 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
780
781 ASSERT_TRUE(merger.Front() != nullptr);
782 output.emplace_back(std::move(*merger.Front()));
783 merger.PopFront();
784 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
785
786 ASSERT_TRUE(merger.Front() == nullptr);
787
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700788 EXPECT_EQ(output[0].timestamp.boot, 0u);
789 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
790 EXPECT_EQ(output[1].timestamp.boot, 0u);
791 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1001));
792 EXPECT_EQ(output[2].timestamp.boot, 0u);
793 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1002));
794 EXPECT_EQ(output[3].timestamp.boot, 0u);
795 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
796 EXPECT_EQ(output[4].timestamp.boot, 0u);
797 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(3000));
798 EXPECT_EQ(output[5].timestamp.boot, 0u);
799 EXPECT_EQ(output[5].timestamp.time, e + chrono::milliseconds(3002));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800800}
801
Austin Schuh8bf1e632021-01-02 22:41:04 -0800802// Tests that we can merge timestamps with various combinations of
803// monotonic_timestamp_time.
804TEST_F(NodeMergerTest, TwoFileTimestampMerger) {
805 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
806 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700807 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800808 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700809 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800810 writer1.QueueSpan(config1_.span());
811
812 // Neither has it.
813 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700814 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800815 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700816 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800817 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
818
819 // First only has it.
820 MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700821 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800822 e + chrono::milliseconds(1001), 0, chrono::seconds(100),
823 e + chrono::nanoseconds(971)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700824 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800825 e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
826
827 // Second only has it.
828 MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700829 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800830 e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700831 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800832 e + chrono::milliseconds(1002), 0, chrono::seconds(100),
833 e + chrono::nanoseconds(972)));
834
835 // Both have it.
836 MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700837 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800838 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
839 e + chrono::nanoseconds(973)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700840 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800841 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
842 e + chrono::nanoseconds(973)));
843 }
844
845 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
846 ASSERT_EQ(parts.size(), 1u);
847
848 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
849
850 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
851
852 std::deque<Message> output;
853
854 for (int i = 0; i < 4; ++i) {
855 ASSERT_TRUE(merger.Front() != nullptr);
856 output.emplace_back(std::move(*merger.Front()));
857 merger.PopFront();
858 }
859 ASSERT_TRUE(merger.Front() == nullptr);
860
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700861 EXPECT_EQ(output[0].timestamp.boot, 0u);
862 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700863 EXPECT_FALSE(output[0].data->has_monotonic_timestamp_time);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700864
865 EXPECT_EQ(output[1].timestamp.boot, 0u);
866 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700867 EXPECT_TRUE(output[1].data->has_monotonic_timestamp_time);
868 EXPECT_EQ(output[1].data->monotonic_timestamp_time,
869 monotonic_clock::time_point(std::chrono::nanoseconds(971)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700870
871 EXPECT_EQ(output[2].timestamp.boot, 0u);
872 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700873 EXPECT_TRUE(output[2].data->has_monotonic_timestamp_time);
874 EXPECT_EQ(output[2].data->monotonic_timestamp_time,
875 monotonic_clock::time_point(std::chrono::nanoseconds(972)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700876
877 EXPECT_EQ(output[3].timestamp.boot, 0u);
878 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700879 EXPECT_TRUE(output[3].data->has_monotonic_timestamp_time);
880 EXPECT_EQ(output[3].data->monotonic_timestamp_time,
881 monotonic_clock::time_point(std::chrono::nanoseconds(973)));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800882}
883
Austin Schuhd2f96102020-12-01 20:27:29 -0800884// Tests that we can match timestamps on delivered messages.
885TEST_F(TimestampMapperTest, ReadNode0First) {
886 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
887 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700888 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800889 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700890 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800891 writer1.QueueSpan(config2_.span());
892
Austin Schuhd863e6e2022-10-16 15:44:50 -0700893 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800894 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700895 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800896 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
897
Austin Schuhd863e6e2022-10-16 15:44:50 -0700898 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800899 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700900 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800901 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
902
Austin Schuhd863e6e2022-10-16 15:44:50 -0700903 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800904 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700905 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800906 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
907 }
908
909 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
910
911 ASSERT_EQ(parts[0].logger_node, "pi1");
912 ASSERT_EQ(parts[1].logger_node, "pi2");
913
Austin Schuh79b30942021-01-24 22:32:21 -0800914 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800915 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -0800916 mapper0.set_timestamp_callback(
917 [&](TimestampedMessage *) { ++mapper0_count; });
918 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800919 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -0800920 mapper1.set_timestamp_callback(
921 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -0800922
923 mapper0.AddPeer(&mapper1);
924 mapper1.AddPeer(&mapper0);
925
926 {
927 std::deque<TimestampedMessage> output0;
928
Austin Schuh79b30942021-01-24 22:32:21 -0800929 EXPECT_EQ(mapper0_count, 0u);
930 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800931 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800932 EXPECT_EQ(mapper0_count, 1u);
933 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800934 output0.emplace_back(std::move(*mapper0.Front()));
935 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700936 EXPECT_TRUE(mapper0.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800937 EXPECT_EQ(mapper0_count, 1u);
938 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800939
940 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800941 EXPECT_EQ(mapper0_count, 2u);
942 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800943 output0.emplace_back(std::move(*mapper0.Front()));
944 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700945 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800946
947 ASSERT_TRUE(mapper0.Front() != nullptr);
948 output0.emplace_back(std::move(*mapper0.Front()));
949 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700950 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800951
Austin Schuh79b30942021-01-24 22:32:21 -0800952 EXPECT_EQ(mapper0_count, 3u);
953 EXPECT_EQ(mapper1_count, 0u);
954
Austin Schuhd2f96102020-12-01 20:27:29 -0800955 ASSERT_TRUE(mapper0.Front() == nullptr);
956
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700957 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
958 EXPECT_EQ(output0[0].monotonic_event_time.time,
959 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700960 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700961
962 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
963 EXPECT_EQ(output0[1].monotonic_event_time.time,
964 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700965 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700966
967 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
968 EXPECT_EQ(output0[2].monotonic_event_time.time,
969 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700970 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -0800971 }
972
973 {
974 SCOPED_TRACE("Trying node1 now");
975 std::deque<TimestampedMessage> output1;
976
Austin Schuh79b30942021-01-24 22:32:21 -0800977 EXPECT_EQ(mapper0_count, 3u);
978 EXPECT_EQ(mapper1_count, 0u);
979
Austin Schuhd2f96102020-12-01 20:27:29 -0800980 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800981 EXPECT_EQ(mapper0_count, 3u);
982 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800983 output1.emplace_back(std::move(*mapper1.Front()));
984 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700985 EXPECT_TRUE(mapper1.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800986 EXPECT_EQ(mapper0_count, 3u);
987 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800988
989 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800990 EXPECT_EQ(mapper0_count, 3u);
991 EXPECT_EQ(mapper1_count, 2u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800992 output1.emplace_back(std::move(*mapper1.Front()));
993 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700994 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800995
996 ASSERT_TRUE(mapper1.Front() != nullptr);
997 output1.emplace_back(std::move(*mapper1.Front()));
998 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700999 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001000
Austin Schuh79b30942021-01-24 22:32:21 -08001001 EXPECT_EQ(mapper0_count, 3u);
1002 EXPECT_EQ(mapper1_count, 3u);
1003
Austin Schuhd2f96102020-12-01 20:27:29 -08001004 ASSERT_TRUE(mapper1.Front() == nullptr);
1005
Austin Schuh79b30942021-01-24 22:32:21 -08001006 EXPECT_EQ(mapper0_count, 3u);
1007 EXPECT_EQ(mapper1_count, 3u);
1008
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001009 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1010 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001011 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001012 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001013
1014 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1015 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001016 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001017 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001018
1019 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1020 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001021 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001022 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001023 }
1024}
1025
Austin Schuh8bf1e632021-01-02 22:41:04 -08001026// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
1027// returned.
1028TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
1029 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1030 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001031 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001032 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001033 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001034 writer1.QueueSpan(config4_.span());
1035
Austin Schuhd863e6e2022-10-16 15:44:50 -07001036 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001037 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001038 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001039 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
1040 e + chrono::nanoseconds(971)));
1041
Austin Schuhd863e6e2022-10-16 15:44:50 -07001042 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001043 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001044 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001045 e + chrono::milliseconds(2000), 0, chrono::seconds(100),
1046 e + chrono::nanoseconds(5458)));
1047
Austin Schuhd863e6e2022-10-16 15:44:50 -07001048 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001049 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001050 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001051 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1052 }
1053
1054 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1055
1056 for (const auto &p : parts) {
1057 LOG(INFO) << p;
1058 }
1059
1060 ASSERT_EQ(parts.size(), 1u);
1061
Austin Schuh79b30942021-01-24 22:32:21 -08001062 size_t mapper0_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001063 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001064 mapper0.set_timestamp_callback(
1065 [&](TimestampedMessage *) { ++mapper0_count; });
1066 size_t mapper1_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001067 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001068 mapper1.set_timestamp_callback(
1069 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh8bf1e632021-01-02 22:41:04 -08001070
1071 mapper0.AddPeer(&mapper1);
1072 mapper1.AddPeer(&mapper0);
1073
1074 {
1075 std::deque<TimestampedMessage> output0;
1076
1077 for (int i = 0; i < 3; ++i) {
1078 ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
1079 output0.emplace_back(std::move(*mapper0.Front()));
1080 mapper0.PopFront();
1081 }
1082
1083 ASSERT_TRUE(mapper0.Front() == nullptr);
1084
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001085 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1086 EXPECT_EQ(output0[0].monotonic_event_time.time,
1087 e + chrono::milliseconds(1000));
1088 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
1089 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
1090 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001091 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001092
1093 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1094 EXPECT_EQ(output0[1].monotonic_event_time.time,
1095 e + chrono::milliseconds(2000));
1096 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
1097 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
1098 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001099 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001100
1101 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1102 EXPECT_EQ(output0[2].monotonic_event_time.time,
1103 e + chrono::milliseconds(3000));
1104 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
1105 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
1106 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001107 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001108 }
1109
1110 {
1111 SCOPED_TRACE("Trying node1 now");
1112 std::deque<TimestampedMessage> output1;
1113
1114 for (int i = 0; i < 3; ++i) {
1115 ASSERT_TRUE(mapper1.Front() != nullptr);
1116 output1.emplace_back(std::move(*mapper1.Front()));
1117 mapper1.PopFront();
1118 }
1119
1120 ASSERT_TRUE(mapper1.Front() == nullptr);
1121
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001122 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1123 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001124 e + chrono::seconds(100) + chrono::milliseconds(1000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001125 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
1126 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001127 e + chrono::nanoseconds(971));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001128 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001129
1130 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1131 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001132 e + chrono::seconds(100) + chrono::milliseconds(2000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001133 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
1134 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001135 e + chrono::nanoseconds(5458));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001136 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001137
1138 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1139 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001140 e + chrono::seconds(100) + chrono::milliseconds(3000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001141 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
1142 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
1143 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001144 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001145 }
Austin Schuh79b30942021-01-24 22:32:21 -08001146
1147 EXPECT_EQ(mapper0_count, 3u);
1148 EXPECT_EQ(mapper1_count, 3u);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001149}
1150
Austin Schuhd2f96102020-12-01 20:27:29 -08001151// Tests that we can match timestamps on delivered messages. By doing this in
1152// the reverse order, the second node needs to queue data up from the first node
1153// to find the matching timestamp.
1154TEST_F(TimestampMapperTest, ReadNode1First) {
1155 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1156 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001157 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001158 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001159 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001160 writer1.QueueSpan(config2_.span());
1161
Austin Schuhd863e6e2022-10-16 15:44:50 -07001162 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001163 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001164 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001165 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1166
Austin Schuhd863e6e2022-10-16 15:44:50 -07001167 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001168 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001169 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001170 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1171
Austin Schuhd863e6e2022-10-16 15:44:50 -07001172 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001173 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001174 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001175 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1176 }
1177
1178 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1179
1180 ASSERT_EQ(parts[0].logger_node, "pi1");
1181 ASSERT_EQ(parts[1].logger_node, "pi2");
1182
Austin Schuh79b30942021-01-24 22:32:21 -08001183 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001184 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001185 mapper0.set_timestamp_callback(
1186 [&](TimestampedMessage *) { ++mapper0_count; });
1187 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001188 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001189 mapper1.set_timestamp_callback(
1190 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001191
1192 mapper0.AddPeer(&mapper1);
1193 mapper1.AddPeer(&mapper0);
1194
1195 {
1196 SCOPED_TRACE("Trying node1 now");
1197 std::deque<TimestampedMessage> output1;
1198
1199 ASSERT_TRUE(mapper1.Front() != nullptr);
1200 output1.emplace_back(std::move(*mapper1.Front()));
1201 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001202 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001203
1204 ASSERT_TRUE(mapper1.Front() != nullptr);
1205 output1.emplace_back(std::move(*mapper1.Front()));
1206 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001207 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001208
1209 ASSERT_TRUE(mapper1.Front() != nullptr);
1210 output1.emplace_back(std::move(*mapper1.Front()));
1211 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001212 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001213
1214 ASSERT_TRUE(mapper1.Front() == nullptr);
1215
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001216 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1217 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001218 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001219 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001220
1221 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1222 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001223 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001224 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001225
1226 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1227 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001228 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001229 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001230 }
1231
1232 {
1233 std::deque<TimestampedMessage> output0;
1234
1235 ASSERT_TRUE(mapper0.Front() != nullptr);
1236 output0.emplace_back(std::move(*mapper0.Front()));
1237 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001238 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001239
1240 ASSERT_TRUE(mapper0.Front() != nullptr);
1241 output0.emplace_back(std::move(*mapper0.Front()));
1242 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001243 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001244
1245 ASSERT_TRUE(mapper0.Front() != nullptr);
1246 output0.emplace_back(std::move(*mapper0.Front()));
1247 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001248 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001249
1250 ASSERT_TRUE(mapper0.Front() == nullptr);
1251
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001252 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1253 EXPECT_EQ(output0[0].monotonic_event_time.time,
1254 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001255 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001256
1257 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1258 EXPECT_EQ(output0[1].monotonic_event_time.time,
1259 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001260 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001261
1262 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1263 EXPECT_EQ(output0[2].monotonic_event_time.time,
1264 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001265 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001266 }
Austin Schuh79b30942021-01-24 22:32:21 -08001267
1268 EXPECT_EQ(mapper0_count, 3u);
1269 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001270}
1271
1272// Tests that we return just the timestamps if we couldn't find the data and the
1273// missing data was at the beginning of the file.
1274TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
1275 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1276 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001277 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001278 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001279 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001280 writer1.QueueSpan(config2_.span());
1281
1282 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001283 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001284 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1285
Austin Schuhd863e6e2022-10-16 15:44:50 -07001286 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001287 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001288 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001289 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1290
Austin Schuhd863e6e2022-10-16 15:44:50 -07001291 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001292 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001293 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001294 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1295 }
1296
1297 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1298
1299 ASSERT_EQ(parts[0].logger_node, "pi1");
1300 ASSERT_EQ(parts[1].logger_node, "pi2");
1301
Austin Schuh79b30942021-01-24 22:32:21 -08001302 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001303 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001304 mapper0.set_timestamp_callback(
1305 [&](TimestampedMessage *) { ++mapper0_count; });
1306 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001307 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001308 mapper1.set_timestamp_callback(
1309 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001310
1311 mapper0.AddPeer(&mapper1);
1312 mapper1.AddPeer(&mapper0);
1313
1314 {
1315 SCOPED_TRACE("Trying node1 now");
1316 std::deque<TimestampedMessage> output1;
1317
1318 ASSERT_TRUE(mapper1.Front() != nullptr);
1319 output1.emplace_back(std::move(*mapper1.Front()));
1320 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001321 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001322
1323 ASSERT_TRUE(mapper1.Front() != nullptr);
1324 output1.emplace_back(std::move(*mapper1.Front()));
1325 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001326 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001327
1328 ASSERT_TRUE(mapper1.Front() != nullptr);
1329 output1.emplace_back(std::move(*mapper1.Front()));
1330 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001331 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001332
1333 ASSERT_TRUE(mapper1.Front() == nullptr);
1334
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001335 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1336 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001337 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001338 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001339
1340 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1341 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001342 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001343 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001344
1345 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1346 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001347 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001348 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001349 }
Austin Schuh79b30942021-01-24 22:32:21 -08001350
1351 EXPECT_EQ(mapper0_count, 0u);
1352 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001353}
1354
1355// Tests that we return just the timestamps if we couldn't find the data and the
1356// missing data was at the end of the file.
1357TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
1358 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1359 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001360 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001361 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001362 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001363 writer1.QueueSpan(config2_.span());
1364
Austin Schuhd863e6e2022-10-16 15:44:50 -07001365 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001366 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001367 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001368 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1369
Austin Schuhd863e6e2022-10-16 15:44:50 -07001370 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001371 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001372 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001373 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1374
1375 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001376 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001377 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1378 }
1379
1380 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1381
1382 ASSERT_EQ(parts[0].logger_node, "pi1");
1383 ASSERT_EQ(parts[1].logger_node, "pi2");
1384
Austin Schuh79b30942021-01-24 22:32:21 -08001385 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001386 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001387 mapper0.set_timestamp_callback(
1388 [&](TimestampedMessage *) { ++mapper0_count; });
1389 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001390 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001391 mapper1.set_timestamp_callback(
1392 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001393
1394 mapper0.AddPeer(&mapper1);
1395 mapper1.AddPeer(&mapper0);
1396
1397 {
1398 SCOPED_TRACE("Trying node1 now");
1399 std::deque<TimestampedMessage> output1;
1400
1401 ASSERT_TRUE(mapper1.Front() != nullptr);
1402 output1.emplace_back(std::move(*mapper1.Front()));
1403 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001404 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001405
1406 ASSERT_TRUE(mapper1.Front() != nullptr);
1407 output1.emplace_back(std::move(*mapper1.Front()));
1408 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001409 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001410
1411 ASSERT_TRUE(mapper1.Front() != nullptr);
1412 output1.emplace_back(std::move(*mapper1.Front()));
1413 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001414 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001415
1416 ASSERT_TRUE(mapper1.Front() == nullptr);
1417
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001418 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1419 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001420 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001421 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001422
1423 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1424 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001425 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001426 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001427
1428 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1429 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001430 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001431 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001432 }
Austin Schuh79b30942021-01-24 22:32:21 -08001433
1434 EXPECT_EQ(mapper0_count, 0u);
1435 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001436}
1437
Austin Schuh993ccb52020-12-12 15:59:32 -08001438// Tests that we handle a message which failed to forward or be logged.
1439TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1440 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1441 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001442 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001443 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001444 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001445 writer1.QueueSpan(config2_.span());
1446
Austin Schuhd863e6e2022-10-16 15:44:50 -07001447 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001448 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001449 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001450 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1451
1452 // Create both the timestamp and message, but don't log them, simulating a
1453 // forwarding drop.
1454 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1455 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1456 chrono::seconds(100));
1457
Austin Schuhd863e6e2022-10-16 15:44:50 -07001458 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001459 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001460 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001461 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1462 }
1463
1464 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1465
1466 ASSERT_EQ(parts[0].logger_node, "pi1");
1467 ASSERT_EQ(parts[1].logger_node, "pi2");
1468
Austin Schuh79b30942021-01-24 22:32:21 -08001469 size_t mapper0_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001470 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001471 mapper0.set_timestamp_callback(
1472 [&](TimestampedMessage *) { ++mapper0_count; });
1473 size_t mapper1_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001474 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001475 mapper1.set_timestamp_callback(
1476 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh993ccb52020-12-12 15:59:32 -08001477
1478 mapper0.AddPeer(&mapper1);
1479 mapper1.AddPeer(&mapper0);
1480
1481 {
1482 std::deque<TimestampedMessage> output1;
1483
1484 ASSERT_TRUE(mapper1.Front() != nullptr);
1485 output1.emplace_back(std::move(*mapper1.Front()));
1486 mapper1.PopFront();
1487
1488 ASSERT_TRUE(mapper1.Front() != nullptr);
1489 output1.emplace_back(std::move(*mapper1.Front()));
1490
1491 ASSERT_FALSE(mapper1.Front() == nullptr);
1492
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001493 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1494 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001495 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001496 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001497
1498 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1499 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001500 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001501 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh993ccb52020-12-12 15:59:32 -08001502 }
Austin Schuh79b30942021-01-24 22:32:21 -08001503
1504 EXPECT_EQ(mapper0_count, 0u);
1505 EXPECT_EQ(mapper1_count, 2u);
Austin Schuh993ccb52020-12-12 15:59:32 -08001506}
1507
Austin Schuhd2f96102020-12-01 20:27:29 -08001508// Tests that we properly sort log files with duplicate timestamps.
1509TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1510 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1511 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001512 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001513 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001514 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001515 writer1.QueueSpan(config2_.span());
1516
Austin Schuhd863e6e2022-10-16 15:44:50 -07001517 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001518 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001519 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001520 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1521
Austin Schuhd863e6e2022-10-16 15:44:50 -07001522 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001523 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001524 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001525 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1526
Austin Schuhd863e6e2022-10-16 15:44:50 -07001527 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001528 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001529 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001530 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1531
Austin Schuhd863e6e2022-10-16 15:44:50 -07001532 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001533 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001534 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001535 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1536 }
1537
1538 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1539
1540 ASSERT_EQ(parts[0].logger_node, "pi1");
1541 ASSERT_EQ(parts[1].logger_node, "pi2");
1542
Austin Schuh79b30942021-01-24 22:32:21 -08001543 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001544 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001545 mapper0.set_timestamp_callback(
1546 [&](TimestampedMessage *) { ++mapper0_count; });
1547 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001548 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001549 mapper1.set_timestamp_callback(
1550 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001551
1552 mapper0.AddPeer(&mapper1);
1553 mapper1.AddPeer(&mapper0);
1554
1555 {
1556 SCOPED_TRACE("Trying node1 now");
1557 std::deque<TimestampedMessage> output1;
1558
1559 for (int i = 0; i < 4; ++i) {
1560 ASSERT_TRUE(mapper1.Front() != nullptr);
1561 output1.emplace_back(std::move(*mapper1.Front()));
1562 mapper1.PopFront();
1563 }
1564 ASSERT_TRUE(mapper1.Front() == nullptr);
1565
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001566 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1567 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001568 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001569 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001570
1571 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1572 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001573 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001574 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001575
1576 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1577 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001578 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001579 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001580
1581 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1582 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001583 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001584 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001585 }
Austin Schuh79b30942021-01-24 22:32:21 -08001586
1587 EXPECT_EQ(mapper0_count, 0u);
1588 EXPECT_EQ(mapper1_count, 4u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001589}
1590
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001591// Tests that we properly produce a valid start time.
Austin Schuhd2f96102020-12-01 20:27:29 -08001592TEST_F(TimestampMapperTest, StartTime) {
1593 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1594 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001595 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001596 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001597 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001598 writer1.QueueSpan(config1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001599 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001600 writer2.QueueSpan(config3_.span());
1601 }
1602
1603 const std::vector<LogFile> parts =
1604 SortParts({logfile0_, logfile1_, logfile2_});
1605
Austin Schuh79b30942021-01-24 22:32:21 -08001606 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001607 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001608 mapper0.set_timestamp_callback(
1609 [&](TimestampedMessage *) { ++mapper0_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001610
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001611 EXPECT_EQ(mapper0.monotonic_start_time(0), e + chrono::milliseconds(1));
1612 EXPECT_EQ(mapper0.realtime_start_time(0),
Austin Schuhd2f96102020-12-01 20:27:29 -08001613 realtime_clock::time_point(chrono::seconds(1000)));
Austin Schuh79b30942021-01-24 22:32:21 -08001614 EXPECT_EQ(mapper0_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001615}
1616
Austin Schuhfecf1d82020-12-19 16:57:28 -08001617// Tests that when a peer isn't registered, we treat that as if there was no
1618// data available.
1619TEST_F(TimestampMapperTest, NoPeer) {
1620 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1621 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001622 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001623 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001624 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001625 writer1.QueueSpan(config2_.span());
1626
1627 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001628 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001629 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1630
Austin Schuhd863e6e2022-10-16 15:44:50 -07001631 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001632 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001633 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001634 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1635
Austin Schuhd863e6e2022-10-16 15:44:50 -07001636 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001637 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001638 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001639 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1640 }
1641
1642 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1643
1644 ASSERT_EQ(parts[0].logger_node, "pi1");
1645 ASSERT_EQ(parts[1].logger_node, "pi2");
1646
Austin Schuh79b30942021-01-24 22:32:21 -08001647 size_t mapper1_count = 0;
Austin Schuhfecf1d82020-12-19 16:57:28 -08001648 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001649 mapper1.set_timestamp_callback(
1650 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhfecf1d82020-12-19 16:57:28 -08001651
1652 {
1653 std::deque<TimestampedMessage> output1;
1654
1655 ASSERT_TRUE(mapper1.Front() != nullptr);
1656 output1.emplace_back(std::move(*mapper1.Front()));
1657 mapper1.PopFront();
1658 ASSERT_TRUE(mapper1.Front() != nullptr);
1659 output1.emplace_back(std::move(*mapper1.Front()));
1660 mapper1.PopFront();
1661 ASSERT_TRUE(mapper1.Front() != nullptr);
1662 output1.emplace_back(std::move(*mapper1.Front()));
1663 mapper1.PopFront();
1664 ASSERT_TRUE(mapper1.Front() == nullptr);
1665
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001666 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1667 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001668 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001669 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001670
1671 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1672 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001673 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001674 EXPECT_FALSE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001675
1676 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1677 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001678 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001679 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001680 }
Austin Schuh79b30942021-01-24 22:32:21 -08001681 EXPECT_EQ(mapper1_count, 3u);
1682}
1683
1684// Tests that we can queue messages and call the timestamp callback for both
1685// nodes.
1686TEST_F(TimestampMapperTest, QueueUntilNode0) {
1687 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1688 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001689 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh79b30942021-01-24 22:32:21 -08001690 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001691 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh79b30942021-01-24 22:32:21 -08001692 writer1.QueueSpan(config2_.span());
1693
Austin Schuhd863e6e2022-10-16 15:44:50 -07001694 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001695 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001696 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001697 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1698
Austin Schuhd863e6e2022-10-16 15:44:50 -07001699 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001700 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001701 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001702 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1703
Austin Schuhd863e6e2022-10-16 15:44:50 -07001704 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001705 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001706 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001707 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1708
Austin Schuhd863e6e2022-10-16 15:44:50 -07001709 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001710 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001711 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001712 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1713 }
1714
1715 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1716
1717 ASSERT_EQ(parts[0].logger_node, "pi1");
1718 ASSERT_EQ(parts[1].logger_node, "pi2");
1719
1720 size_t mapper0_count = 0;
1721 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1722 mapper0.set_timestamp_callback(
1723 [&](TimestampedMessage *) { ++mapper0_count; });
1724 size_t mapper1_count = 0;
1725 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1726 mapper1.set_timestamp_callback(
1727 [&](TimestampedMessage *) { ++mapper1_count; });
1728
1729 mapper0.AddPeer(&mapper1);
1730 mapper1.AddPeer(&mapper0);
1731
1732 {
1733 std::deque<TimestampedMessage> output0;
1734
1735 EXPECT_EQ(mapper0_count, 0u);
1736 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001737 mapper0.QueueUntil(
1738 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001739 EXPECT_EQ(mapper0_count, 3u);
1740 EXPECT_EQ(mapper1_count, 0u);
1741
1742 ASSERT_TRUE(mapper0.Front() != nullptr);
1743 EXPECT_EQ(mapper0_count, 3u);
1744 EXPECT_EQ(mapper1_count, 0u);
1745
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001746 mapper0.QueueUntil(
1747 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001748 EXPECT_EQ(mapper0_count, 3u);
1749 EXPECT_EQ(mapper1_count, 0u);
1750
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001751 mapper0.QueueUntil(
1752 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001753 EXPECT_EQ(mapper0_count, 4u);
1754 EXPECT_EQ(mapper1_count, 0u);
1755
1756 output0.emplace_back(std::move(*mapper0.Front()));
1757 mapper0.PopFront();
1758 output0.emplace_back(std::move(*mapper0.Front()));
1759 mapper0.PopFront();
1760 output0.emplace_back(std::move(*mapper0.Front()));
1761 mapper0.PopFront();
1762 output0.emplace_back(std::move(*mapper0.Front()));
1763 mapper0.PopFront();
1764
1765 EXPECT_EQ(mapper0_count, 4u);
1766 EXPECT_EQ(mapper1_count, 0u);
1767
1768 ASSERT_TRUE(mapper0.Front() == nullptr);
1769
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001770 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1771 EXPECT_EQ(output0[0].monotonic_event_time.time,
1772 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001773 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001774
1775 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1776 EXPECT_EQ(output0[1].monotonic_event_time.time,
1777 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001778 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001779
1780 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1781 EXPECT_EQ(output0[2].monotonic_event_time.time,
1782 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001783 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001784
1785 EXPECT_EQ(output0[3].monotonic_event_time.boot, 0u);
1786 EXPECT_EQ(output0[3].monotonic_event_time.time,
1787 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001788 EXPECT_TRUE(output0[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001789 }
1790
1791 {
1792 SCOPED_TRACE("Trying node1 now");
1793 std::deque<TimestampedMessage> output1;
1794
1795 EXPECT_EQ(mapper0_count, 4u);
1796 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001797 mapper1.QueueUntil(BootTimestamp{
1798 .boot = 0,
1799 .time = e + chrono::seconds(100) + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001800 EXPECT_EQ(mapper0_count, 4u);
1801 EXPECT_EQ(mapper1_count, 3u);
1802
1803 ASSERT_TRUE(mapper1.Front() != nullptr);
1804 EXPECT_EQ(mapper0_count, 4u);
1805 EXPECT_EQ(mapper1_count, 3u);
1806
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001807 mapper1.QueueUntil(BootTimestamp{
1808 .boot = 0,
1809 .time = e + chrono::seconds(100) + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001810 EXPECT_EQ(mapper0_count, 4u);
1811 EXPECT_EQ(mapper1_count, 3u);
1812
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001813 mapper1.QueueUntil(BootTimestamp{
1814 .boot = 0,
1815 .time = e + chrono::seconds(100) + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001816 EXPECT_EQ(mapper0_count, 4u);
1817 EXPECT_EQ(mapper1_count, 4u);
1818
1819 ASSERT_TRUE(mapper1.Front() != nullptr);
1820 EXPECT_EQ(mapper0_count, 4u);
1821 EXPECT_EQ(mapper1_count, 4u);
1822
1823 output1.emplace_back(std::move(*mapper1.Front()));
1824 mapper1.PopFront();
1825 ASSERT_TRUE(mapper1.Front() != nullptr);
1826 output1.emplace_back(std::move(*mapper1.Front()));
1827 mapper1.PopFront();
1828 ASSERT_TRUE(mapper1.Front() != nullptr);
1829 output1.emplace_back(std::move(*mapper1.Front()));
1830 mapper1.PopFront();
1831 ASSERT_TRUE(mapper1.Front() != nullptr);
1832 output1.emplace_back(std::move(*mapper1.Front()));
1833 mapper1.PopFront();
1834
1835 EXPECT_EQ(mapper0_count, 4u);
1836 EXPECT_EQ(mapper1_count, 4u);
1837
1838 ASSERT_TRUE(mapper1.Front() == nullptr);
1839
1840 EXPECT_EQ(mapper0_count, 4u);
1841 EXPECT_EQ(mapper1_count, 4u);
1842
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001843 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1844 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001845 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001846 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001847
1848 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1849 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001850 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001851 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001852
1853 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1854 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001855 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001856 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001857
1858 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1859 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001860 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001861 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001862 }
Austin Schuhfecf1d82020-12-19 16:57:28 -08001863}
1864
// Fixture for the BootMerger tests.  Builds two LogFileHeader buffers, both
// for node pi2 with logger_node pi1.  The two headers share log_event_uuid,
// logger_instance_uuid and logger_node_boot_uuid; they differ in parts_uuid,
// parts_index (0 vs. 1), source_node_boot_uuid and the second boot_uuids
// entry.
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001865class BootMergerTest : public SortingElementTest {
1866 public:
1867 BootMergerTest()
1868 : SortingElementTest(),
1869 boot0_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07001870 /* 100ms */
1871 "max_out_of_order_duration": 100000000,
1872 "node": {
1873 "name": "pi2"
1874 },
1875 "logger_node": {
1876 "name": "pi1"
1877 },
1878 "monotonic_start_time": 1000000,
1879 "realtime_start_time": 1000000000000,
1880 "logger_monotonic_start_time": 1000000,
1881 "logger_realtime_start_time": 1000000000000,
1882 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
1883 "parts_uuid": "1a9e5ca2-31b2-475b-8282-88f6d1ce5109",
1884 "parts_index": 0,
1885 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
1886 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07001887 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
1888 "boot_uuids": [
1889 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
1890 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
1891 ""
1892 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001893})")),
1894 boot1_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07001895 /* 100ms */
1896 "max_out_of_order_duration": 100000000,
1897 "node": {
1898 "name": "pi2"
1899 },
1900 "logger_node": {
1901 "name": "pi1"
1902 },
1903 "monotonic_start_time": 1000000,
1904 "realtime_start_time": 1000000000000,
1905 "logger_monotonic_start_time": 1000000,
1906 "logger_realtime_start_time": 1000000000000,
1907 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
1908 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
1909 "parts_index": 1,
1910 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
1911 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07001912 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
1913 "boot_uuids": [
1914 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
1915 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
1916 ""
1917 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001918})")) {}
Austin Schuh8bee6882021-06-28 21:03:28 -07001919
  // Size-prefixed LogFileHeader buffers built by the constructor above.
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001920 protected:
1921 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0_;
1922 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1_;
1923};
1924
1925// This tests that we can properly sort a multi-node log file which has the old
1926// (and buggy) timestamps in the header, and the non-resetting parts_index.
1927// These make it so we can just barely figure out what happened first and what
1928// happened second, but not in a way that is robust to multiple nodes rebooting.
1929TEST_F(BootMergerTest, OldReboot) {
  // Each log file receives only its header; no messages are written.  Every
  // writer lives in its own scope, so it is destroyed before SortParts() runs
  // (presumably so the file is complete on disk -- TODO confirm against
  // DetachedBufferWriter).
Austin Schuh8bee6882021-06-28 21:03:28 -07001930 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001931 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001932 writer.QueueSpan(boot0_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07001933 }
1934 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001935 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001936 writer.QueueSpan(boot1_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07001937 }
1938
  // Sort both files together.  The asserts below require exactly one LogFile
  // holding two parts before any element is indexed.
1939 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1940
1941 ASSERT_EQ(parts.size(), 1u);
1942 ASSERT_EQ(parts[0].parts.size(), 2u);
1943
  // The part carrying the boot0_ source_node_boot_uuid must be first with
  // boot_count 0.
1944 EXPECT_EQ(parts[0].parts[0].boot_count, 0);
1945 EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001946 boot0_.message().source_node_boot_uuid()->string_view());
Austin Schuh8bee6882021-06-28 21:03:28 -07001947
  // The part carrying the boot1_ source_node_boot_uuid must be second with
  // boot_count 1.
1948 EXPECT_EQ(parts[0].parts[1].boot_count, 1);
1949 EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001950 boot1_.message().source_node_boot_uuid()->string_view());
1951}
1952
1953// This tests that we can produce messages ordered across a reboot.
1954TEST_F(BootMergerTest, SortAcrossReboot) {
  // e is the monotonic clock epoch; all message times below are offsets from
  // it.
1955 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
  // First file: boot0_ header followed by two messages stamped e + 1000ms and
  // e + 2000ms.
1956 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001957 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001958 writer.QueueSpan(boot0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001959 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001960 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001961 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001962 MakeLogMessage(e + chrono::milliseconds(2000), 1, 0x105));
1963 }
  // Second file: boot1_ header followed by two messages stamped e + 100ms and
  // e + 200ms, i.e. smaller monotonic times than the messages in the first
  // file.
1964 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001965 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001966 writer.QueueSpan(boot1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001967 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001968 MakeLogMessage(e + chrono::milliseconds(100), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001969 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001970 MakeLogMessage(e + chrono::milliseconds(200), 1, 0x106));
1971 }
1972
1973 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1974 ASSERT_EQ(parts.size(), 1u);
1975 ASSERT_EQ(parts[0].parts.size(), 2u);
1976
  // Merge only the parts belonging to pi2, the node named in both headers.
1977 BootMerger merger(FilterPartsForNode(parts, "pi2"));
1978
1979 EXPECT_EQ(merger.node(), 1u);
1980
  // Drain exactly four messages, asserting each one exists before it is moved
  // out, then check that the merger is empty.
1981 std::vector<Message> output;
1982 for (int i = 0; i < 4; ++i) {
1983 ASSERT_TRUE(merger.Front() != nullptr);
1984 output.emplace_back(std::move(*merger.Front()));
1985 merger.PopFront();
1986 }
1987
1988 ASSERT_TRUE(merger.Front() == nullptr);
1989
  // Both boot 0 messages are expected before both boot 1 messages, even
  // though the boot 1 monotonic times are smaller.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001990 EXPECT_EQ(output[0].timestamp.boot, 0u);
1991 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
1992 EXPECT_EQ(output[1].timestamp.boot, 0u);
1993 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(2000));
1994
1995 EXPECT_EQ(output[2].timestamp.boot, 1u);
1996 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(100));
1997 EXPECT_EQ(output[3].timestamp.boot, 1u);
1998 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
Austin Schuh8bee6882021-06-28 21:03:28 -07001999}
2000
// Fixture for the TimestampMapper tests that involve a reboot.  Builds four
// LogFileHeader buffers, all with logger_node pi1:
//   boot0a_, boot0b_: node pi1, same parts_uuid and source_node_boot_uuid,
//                     parts_index 0 and 1; they differ in the second
//                     boot_uuids entry.
//   boot1a_, boot1b_: node pi2, different parts_uuid values, parts_index 0
//                     and 1, and different source_node_boot_uuid values.  The
//                     source_node_boot_uuid of boot1a_ equals the second
//                     boot_uuids entry of boot0a_, and likewise for boot1b_
//                     and boot0b_.
Austin Schuh48507722021-07-17 17:29:24 -07002001class RebootTimestampMapperTest : public SortingElementTest {
2002 public:
2003 RebootTimestampMapperTest()
2004 : SortingElementTest(),
2005 boot0a_(MakeHeader(config_, R"({
2006 /* 100ms */
2007 "max_out_of_order_duration": 100000000,
2008 "node": {
2009 "name": "pi1"
2010 },
2011 "logger_node": {
2012 "name": "pi1"
2013 },
2014 "monotonic_start_time": 1000000,
2015 "realtime_start_time": 1000000000000,
2016 "logger_monotonic_start_time": 1000000,
2017 "logger_realtime_start_time": 1000000000000,
2018 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2019 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2020 "parts_index": 0,
2021 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2022 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2023 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2024 "boot_uuids": [
2025 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2026 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2027 ""
2028 ]
2029})")),
2030 boot0b_(MakeHeader(config_, R"({
2031 /* 100ms */
2032 "max_out_of_order_duration": 100000000,
2033 "node": {
2034 "name": "pi1"
2035 },
2036 "logger_node": {
2037 "name": "pi1"
2038 },
2039 "monotonic_start_time": 1000000,
2040 "realtime_start_time": 1000000000000,
2041 "logger_monotonic_start_time": 1000000,
2042 "logger_realtime_start_time": 1000000000000,
2043 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2044 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2045 "parts_index": 1,
2046 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2047 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2048 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2049 "boot_uuids": [
2050 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2051 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2052 ""
2053 ]
2054})")),
2055 boot1a_(MakeHeader(config_, R"({
2056 /* 100ms */
2057 "max_out_of_order_duration": 100000000,
2058 "node": {
2059 "name": "pi2"
2060 },
2061 "logger_node": {
2062 "name": "pi1"
2063 },
2064 "monotonic_start_time": 1000000,
2065 "realtime_start_time": 1000000000000,
2066 "logger_monotonic_start_time": 1000000,
2067 "logger_realtime_start_time": 1000000000000,
2068 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2069 "parts_uuid": "f6ab0cdc-a654-456d-bfd9-2bbc09098edf",
2070 "parts_index": 0,
2071 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2072 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2073 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2074 "boot_uuids": [
2075 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2076 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2077 ""
2078 ]
2079})")),
2080 boot1b_(MakeHeader(config_, R"({
2081 /* 100ms */
2082 "max_out_of_order_duration": 100000000,
2083 "node": {
2084 "name": "pi2"
2085 },
2086 "logger_node": {
2087 "name": "pi1"
2088 },
2089 "monotonic_start_time": 1000000,
2090 "realtime_start_time": 1000000000000,
2091 "logger_monotonic_start_time": 1000000,
2092 "logger_realtime_start_time": 1000000000000,
2093 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2094 "parts_uuid": "3a9d0445-f520-43ca-93f5-e2cc7f54d40a",
2095 "parts_index": 1,
2096 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2097 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2098 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2099 "boot_uuids": [
2100 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2101 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2102 ""
2103 ]
2104})")) {}
2105
  // Size-prefixed LogFileHeader buffers built by the constructor above.
2106 protected:
2107 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0a_;
2108 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0b_;
2109 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1a_;
2110 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
2111};
2112
Austin Schuh48507722021-07-17 17:29:24 -07002113// Tests that we can match timestamps on delivered messages in the presence of
2114// reboots on the node receiving timestamps.
2115TEST_F(RebootTimestampMapperTest, ReadNode0First) {
  // e is the monotonic clock epoch; all times below are offsets from it.
2116 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2117 {
  // Open one writer per header.  All four writers stay alive for the whole
  // scope, so the messages below are interleaved across the four files.
Austin Schuhd863e6e2022-10-16 15:44:50 -07002118 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002119 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002120 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002121 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002122 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002123 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002124 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002125 writer1b.QueueSpan(boot1b_.span());
2126
  // NOTE(review): MakeLogMessage and MakeTimestampMessage are defined outside
  // this chunk.  Judging by the expectations at the end of this test, the
  // MakeTimestampMessage arguments appear to be (remote time, channel, offset
  // added to the remote time to get the event time, timestamp time) --
  // confirm against the helper.
  //
  // Data message at e + 1000ms in the boot0a_ file, with its timestamp
  // (offset 100s) in the boot1a_ file.
Austin Schuhd863e6e2022-10-16 15:44:50 -07002127 writer0a.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002128 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002129 writer1a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002130 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
2131 e + chrono::milliseconds(1001)));
2132
  // A second timestamp for the same e + 1000ms message, this time in the
  // boot1b_ file with offset 21s.
Austin Schuhd863e6e2022-10-16 15:44:50 -07002133 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh58646e22021-08-23 23:51:46 -07002134 e + chrono::milliseconds(1000), 0, chrono::seconds(21),
2135 e + chrono::milliseconds(2001)));
2136
  // Data messages at e + 2000ms and e + 3000ms in the boot0b_ file, each with
  // a timestamp (offset 20s) in the boot1b_ file.
Austin Schuhd863e6e2022-10-16 15:44:50 -07002137 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002138 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002139 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002140 e + chrono::milliseconds(2000), 0, chrono::seconds(20),
2141 e + chrono::milliseconds(2001)));
2142
Austin Schuhd863e6e2022-10-16 15:44:50 -07002143 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002144 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002145 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002146 e + chrono::milliseconds(3000), 0, chrono::seconds(20),
2147 e + chrono::milliseconds(3001)));
2148 }
2149
Austin Schuh58646e22021-08-23 23:51:46 -07002150 const std::vector<LogFile> parts =
2151 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Austin Schuh48507722021-07-17 17:29:24 -07002152
  // Log the sorted parts to help debugging when an assertion below fails.
2153 for (const auto &x : parts) {
2154 LOG(INFO) << x;
2155 }
2156 ASSERT_EQ(parts.size(), 1u);
2157 ASSERT_EQ(parts[0].logger_node, "pi1");
2158
  // One TimestampMapper per node.  Each callback only counts its invocations
  // so the test can check when the mappers invoke it.
2159 size_t mapper0_count = 0;
2160 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
2161 mapper0.set_timestamp_callback(
2162 [&](TimestampedMessage *) { ++mapper0_count; });
2163 size_t mapper1_count = 0;
2164 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
2165 mapper1.set_timestamp_callback(
2166 [&](TimestampedMessage *) { ++mapper1_count; });
2167
  // Register the mappers with each other as peers.
2168 mapper0.AddPeer(&mapper1);
2169 mapper1.AddPeer(&mapper0);
2170
2171 {
  // Drain the pi1 mapper first.  The counter checks are interleaved with the
  // Front()/PopFront() calls on purpose: the mapper0 count is expected to go
  // up after Front() and to stay unchanged after PopFront(), and the mapper1
  // count is expected to stay at zero throughout.
2172 std::deque<TimestampedMessage> output0;
2173
2174 EXPECT_EQ(mapper0_count, 0u);
2175 EXPECT_EQ(mapper1_count, 0u);
2176 ASSERT_TRUE(mapper0.Front() != nullptr);
2177 EXPECT_EQ(mapper0_count, 1u);
2178 EXPECT_EQ(mapper1_count, 0u);
2179 output0.emplace_back(std::move(*mapper0.Front()));
2180 mapper0.PopFront();
2181 EXPECT_TRUE(mapper0.started());
2182 EXPECT_EQ(mapper0_count, 1u);
2183 EXPECT_EQ(mapper1_count, 0u);
2184
2185 ASSERT_TRUE(mapper0.Front() != nullptr);
2186 EXPECT_EQ(mapper0_count, 2u);
2187 EXPECT_EQ(mapper1_count, 0u);
2188 output0.emplace_back(std::move(*mapper0.Front()));
2189 mapper0.PopFront();
2190 EXPECT_TRUE(mapper0.started());
2191
2192 ASSERT_TRUE(mapper0.Front() != nullptr);
2193 output0.emplace_back(std::move(*mapper0.Front()));
2194 mapper0.PopFront();
2195 EXPECT_TRUE(mapper0.started());
2196
2197 EXPECT_EQ(mapper0_count, 3u);
2198 EXPECT_EQ(mapper1_count, 0u);
2199
2200 ASSERT_TRUE(mapper0.Front() == nullptr);
2201
2202 LOG(INFO) << output0[0];
2203 LOG(INFO) << output0[1];
2204 LOG(INFO) << output0[2];
2205
  // The three pi1 messages are all expected on boot 0 with queue indices 0,
  // 1 and 2, and with the remote and timestamp times left at min_time.
2206 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2207 EXPECT_EQ(output0[0].monotonic_event_time.time,
2208 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002209 EXPECT_EQ(output0[0].queue_index,
2210 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002211 EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
2212 EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002213 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002214
2215 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2216 EXPECT_EQ(output0[1].monotonic_event_time.time,
2217 e + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002218 EXPECT_EQ(output0[1].queue_index,
2219 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002220 EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
2221 EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002222 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002223
2224 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2225 EXPECT_EQ(output0[2].monotonic_event_time.time,
2226 e + chrono::milliseconds(3000));
Austin Schuh58646e22021-08-23 23:51:46 -07002227 EXPECT_EQ(output0[2].queue_index,
2228 (BootQueueIndex{.boot = 0u, .index = 2u}));
Austin Schuh48507722021-07-17 17:29:24 -07002229 EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
2230 EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002231 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002232 }
2233
2234 {
  // Now drain the pi2 mapper.  The mapper0 count is expected to stay at 3
  // throughout, while the mapper1 count reaches 4, one per timestamp message
  // written above.
2235 SCOPED_TRACE("Trying node1 now");
2236 std::deque<TimestampedMessage> output1;
2237
2238 EXPECT_EQ(mapper0_count, 3u);
2239 EXPECT_EQ(mapper1_count, 0u);
2240
2241 ASSERT_TRUE(mapper1.Front() != nullptr);
2242 EXPECT_EQ(mapper0_count, 3u);
2243 EXPECT_EQ(mapper1_count, 1u);
2244 output1.emplace_back(std::move(*mapper1.Front()));
2245 mapper1.PopFront();
2246 EXPECT_TRUE(mapper1.started());
2247 EXPECT_EQ(mapper0_count, 3u);
2248 EXPECT_EQ(mapper1_count, 1u);
2249
2250 ASSERT_TRUE(mapper1.Front() != nullptr);
2251 EXPECT_EQ(mapper0_count, 3u);
2252 EXPECT_EQ(mapper1_count, 2u);
2253 output1.emplace_back(std::move(*mapper1.Front()));
2254 mapper1.PopFront();
2255 EXPECT_TRUE(mapper1.started());
2256
2257 ASSERT_TRUE(mapper1.Front() != nullptr);
2258 output1.emplace_back(std::move(*mapper1.Front()));
2259 mapper1.PopFront();
2260 EXPECT_TRUE(mapper1.started());
2261
Austin Schuh58646e22021-08-23 23:51:46 -07002262 ASSERT_TRUE(mapper1.Front() != nullptr);
2263 output1.emplace_back(std::move(*mapper1.Front()));
2264 mapper1.PopFront();
2265 EXPECT_TRUE(mapper1.started());
2266
Austin Schuh48507722021-07-17 17:29:24 -07002267 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002268 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002269
2270 ASSERT_TRUE(mapper1.Front() == nullptr);
2271
2272 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002273 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002274
  // First delivery: event on boot 0 of pi2, matched to remote queue index 0.
2275 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2276 EXPECT_EQ(output1[0].monotonic_event_time.time,
2277 e + chrono::seconds(100) + chrono::milliseconds(1000));
2278 EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
2279 EXPECT_EQ(output1[0].monotonic_remote_time.time,
2280 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002281 EXPECT_EQ(output1[0].remote_queue_index,
2282 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002283 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
2284 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
2285 e + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002286 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002287
  // Second delivery: the same remote message (queue index 0, remote time
  // e + 1000ms) again, now with the event on boot 1 of pi2.
2288 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2289 EXPECT_EQ(output1[1].monotonic_event_time.time,
2290 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002291 EXPECT_EQ(output1[1].remote_queue_index,
2292 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002293 EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
2294 EXPECT_EQ(output1[1].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002295 e + chrono::milliseconds(1000));
Austin Schuh48507722021-07-17 17:29:24 -07002296 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
2297 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
2298 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002299 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002300
  // Third and fourth deliveries: remote queue indices 1 and 2, both with the
  // event on boot 1 of pi2.
2301 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2302 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002303 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh48507722021-07-17 17:29:24 -07002304 EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
2305 EXPECT_EQ(output1[2].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002306 e + chrono::milliseconds(2000));
2307 EXPECT_EQ(output1[2].remote_queue_index,
2308 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002309 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
2310 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002311 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002312 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002313
Austin Schuh58646e22021-08-23 23:51:46 -07002314 EXPECT_EQ(output1[3].monotonic_event_time.boot, 1u);
2315 EXPECT_EQ(output1[3].monotonic_event_time.time,
2316 e + chrono::seconds(20) + chrono::milliseconds(3000));
2317 EXPECT_EQ(output1[3].monotonic_remote_time.boot, 0u);
2318 EXPECT_EQ(output1[3].monotonic_remote_time.time,
2319 e + chrono::milliseconds(3000));
2320 EXPECT_EQ(output1[3].remote_queue_index,
2321 (BootQueueIndex{.boot = 0u, .index = 2u}));
2322 EXPECT_EQ(output1[3].monotonic_timestamp_time.boot, 0u);
2323 EXPECT_EQ(output1[3].monotonic_timestamp_time.time,
2324 e + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002325 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh58646e22021-08-23 23:51:46 -07002326
Austin Schuh48507722021-07-17 17:29:24 -07002327 LOG(INFO) << output1[0];
2328 LOG(INFO) << output1[1];
2329 LOG(INFO) << output1[2];
Austin Schuh58646e22021-08-23 23:51:46 -07002330 LOG(INFO) << output1[3];
Austin Schuh48507722021-07-17 17:29:24 -07002331 }
2332}
2333
// Tests timestamp matching when the data messages are written to the pi2
// files (boot1a_ and boot1b_ headers, which carry two different
// source_node_boot_uuid values) and the corresponding timestamp messages are
// written to the pi1 files (boot0a_ and boot0b_ headers).  The pi1 mapper is
// drained first, then the pi2 mapper.
2334TEST_F(RebootTimestampMapperTest, Node2Reboot) {
  // e is the monotonic clock epoch; all times below are offsets from it.
2335 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2336 {
  // Open one writer per header.  All four writers stay alive for the whole
  // scope, so the messages below are interleaved across the four files.
Austin Schuhd863e6e2022-10-16 15:44:50 -07002337 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002338 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002339 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002340 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002341 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002342 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002343 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002344 writer1b.QueueSpan(boot1b_.span());
2345
  // NOTE(review): MakeLogMessage and MakeTimestampMessage are defined outside
  // this chunk.  Judging by the expectations at the end of this test, the
  // negative offset is added to the remote time to get the pi1 event time --
  // confirm against the helper.
  //
  // Data message at e + 100s + 1000ms in the boot1a_ file, with its timestamp
  // (offset -100s) in the boot0a_ file.
Austin Schuhd863e6e2022-10-16 15:44:50 -07002346 writer1a.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002347 e + chrono::seconds(100) + chrono::milliseconds(1000), 3, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002348 writer0a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002349 e + chrono::seconds(100) + chrono::milliseconds(1000), 3,
2350 chrono::seconds(-100),
2351 e + chrono::seconds(100) + chrono::milliseconds(1001)));
2352
  // Data messages at e + 20s + 2000ms and e + 20s + 3000ms in the boot1b_
  // file, each with a timestamp (offset -20s) in the boot0b_ file.
Austin Schuhd863e6e2022-10-16 15:44:50 -07002353 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002354 e + chrono::seconds(20) + chrono::milliseconds(2000), 3, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002355 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002356 e + chrono::seconds(20) + chrono::milliseconds(2000), 3,
2357 chrono::seconds(-20),
2358 e + chrono::seconds(20) + chrono::milliseconds(2001)));
2359
Austin Schuhd863e6e2022-10-16 15:44:50 -07002360 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002361 e + chrono::seconds(20) + chrono::milliseconds(3000), 3, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002362 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002363 e + chrono::seconds(20) + chrono::milliseconds(3000), 3,
2364 chrono::seconds(-20),
2365 e + chrono::seconds(20) + chrono::milliseconds(3001)));
2366 }
2367
2368 const std::vector<LogFile> parts =
2369 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2370
  // Log the sorted parts to help debugging when an assertion below fails.
2371 for (const auto &x : parts) {
2372 LOG(INFO) << x;
2373 }
2374 ASSERT_EQ(parts.size(), 1u);
2375 ASSERT_EQ(parts[0].logger_node, "pi1");
2376
  // One TimestampMapper per node.  Each callback only counts its invocations
  // so the test can check when the mappers invoke it.
2377 size_t mapper0_count = 0;
2378 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
2379 mapper0.set_timestamp_callback(
2380 [&](TimestampedMessage *) { ++mapper0_count; });
2381 size_t mapper1_count = 0;
2382 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
2383 mapper1.set_timestamp_callback(
2384 [&](TimestampedMessage *) { ++mapper1_count; });
2385
  // Register the mappers with each other as peers.
2386 mapper0.AddPeer(&mapper1);
2387 mapper1.AddPeer(&mapper0);
2388
2389 {
  // Drain the pi1 mapper first.  The counter checks are interleaved with the
  // Front()/PopFront() calls on purpose: the mapper0 count is expected to go
  // up after Front() and to stay unchanged after PopFront(), and the mapper1
  // count is expected to stay at zero throughout.
2390 std::deque<TimestampedMessage> output0;
2391
2392 EXPECT_EQ(mapper0_count, 0u);
2393 EXPECT_EQ(mapper1_count, 0u);
2394 ASSERT_TRUE(mapper0.Front() != nullptr);
2395 EXPECT_EQ(mapper0_count, 1u);
2396 EXPECT_EQ(mapper1_count, 0u);
2397 output0.emplace_back(std::move(*mapper0.Front()));
2398 mapper0.PopFront();
2399 EXPECT_TRUE(mapper0.started());
2400 EXPECT_EQ(mapper0_count, 1u);
2401 EXPECT_EQ(mapper1_count, 0u);
2402
2403 ASSERT_TRUE(mapper0.Front() != nullptr);
2404 EXPECT_EQ(mapper0_count, 2u);
2405 EXPECT_EQ(mapper1_count, 0u);
2406 output0.emplace_back(std::move(*mapper0.Front()));
2407 mapper0.PopFront();
2408 EXPECT_TRUE(mapper0.started());
2409
2410 ASSERT_TRUE(mapper0.Front() != nullptr);
2411 output0.emplace_back(std::move(*mapper0.Front()));
2412 mapper0.PopFront();
2413 EXPECT_TRUE(mapper0.started());
2414
2415 EXPECT_EQ(mapper0_count, 3u);
2416 EXPECT_EQ(mapper1_count, 0u);
2417
2418 ASSERT_TRUE(mapper0.Front() == nullptr);
2419
2420 LOG(INFO) << output0[0];
2421 LOG(INFO) << output0[1];
2422 LOG(INFO) << output0[2];
2423
  // All three pi1 events are expected on boot 0.  The remote time is expected
  // on boot 0 for the first message and on boot 1 for the other two.
2424 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2425 EXPECT_EQ(output0[0].monotonic_event_time.time,
2426 e + chrono::milliseconds(1000));
2427 EXPECT_EQ(output0[0].monotonic_remote_time.boot, 0u);
2428 EXPECT_EQ(output0[0].monotonic_remote_time.time,
2429 e + chrono::seconds(100) + chrono::milliseconds(1000));
2430 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
2431 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
2432 e + chrono::seconds(100) + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002433 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002434
2435 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2436 EXPECT_EQ(output0[1].monotonic_event_time.time,
2437 e + chrono::milliseconds(2000));
2438 EXPECT_EQ(output0[1].monotonic_remote_time.boot, 1u);
2439 EXPECT_EQ(output0[1].monotonic_remote_time.time,
2440 e + chrono::seconds(20) + chrono::milliseconds(2000));
2441 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
2442 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
2443 e + chrono::seconds(20) + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002444 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002445
2446 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2447 EXPECT_EQ(output0[2].monotonic_event_time.time,
2448 e + chrono::milliseconds(3000));
2449 EXPECT_EQ(output0[2].monotonic_remote_time.boot, 1u);
2450 EXPECT_EQ(output0[2].monotonic_remote_time.time,
2451 e + chrono::seconds(20) + chrono::milliseconds(3000));
2452 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
2453 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
2454 e + chrono::seconds(20) + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002455 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002456 }
2457
2458 {
  // Now drain the pi2 mapper.  The mapper0 count is expected to stay at 3
  // throughout, while the mapper1 count reaches 3, one per data message
  // written above.
2459 SCOPED_TRACE("Trying node1 now");
2460 std::deque<TimestampedMessage> output1;
2461
2462 EXPECT_EQ(mapper0_count, 3u);
2463 EXPECT_EQ(mapper1_count, 0u);
2464
2465 ASSERT_TRUE(mapper1.Front() != nullptr);
2466 EXPECT_EQ(mapper0_count, 3u);
2467 EXPECT_EQ(mapper1_count, 1u);
2468 output1.emplace_back(std::move(*mapper1.Front()));
2469 mapper1.PopFront();
2470 EXPECT_TRUE(mapper1.started());
2471 EXPECT_EQ(mapper0_count, 3u);
2472 EXPECT_EQ(mapper1_count, 1u);
2473
2474 ASSERT_TRUE(mapper1.Front() != nullptr);
2475 EXPECT_EQ(mapper0_count, 3u);
2476 EXPECT_EQ(mapper1_count, 2u);
2477 output1.emplace_back(std::move(*mapper1.Front()));
2478 mapper1.PopFront();
2479 EXPECT_TRUE(mapper1.started());
2480
2481 ASSERT_TRUE(mapper1.Front() != nullptr);
2482 output1.emplace_back(std::move(*mapper1.Front()));
2483 mapper1.PopFront();
2484 EXPECT_TRUE(mapper1.started());
2485
2486 EXPECT_EQ(mapper0_count, 3u);
2487 EXPECT_EQ(mapper1_count, 3u);
2488
2489 ASSERT_TRUE(mapper1.Front() == nullptr);
2490
2491 EXPECT_EQ(mapper0_count, 3u);
2492 EXPECT_EQ(mapper1_count, 3u);
2493
  // The pi2 messages are expected on boot 0, then boot 1 twice, with the
  // remote and timestamp times left at min_time.
2494 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2495 EXPECT_EQ(output1[0].monotonic_event_time.time,
2496 e + chrono::seconds(100) + chrono::milliseconds(1000));
2497 EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
2498 EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002499 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002500
2501 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2502 EXPECT_EQ(output1[1].monotonic_event_time.time,
2503 e + chrono::seconds(20) + chrono::milliseconds(2000));
2504 EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
2505 EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002506 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002507
2508 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2509 EXPECT_EQ(output1[2].monotonic_event_time.time,
2510 e + chrono::seconds(20) + chrono::milliseconds(3000));
2511 EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
2512 EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002513 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002514
2515 LOG(INFO) << output1[0];
2516 LOG(INFO) << output1[1];
2517 LOG(INFO) << output1[2];
2518 }
2519}
2520
Austin Schuh44c61472021-11-22 21:04:10 -08002521class SortingDeathTest : public SortingElementTest {
2522 public:
2523 SortingDeathTest()
2524 : SortingElementTest(),
2525 part0_(MakeHeader(config_, R"({
2526 /* 100ms */
2527 "max_out_of_order_duration": 100000000,
2528 "node": {
2529 "name": "pi1"
2530 },
2531 "logger_node": {
2532 "name": "pi1"
2533 },
2534 "monotonic_start_time": 1000000,
2535 "realtime_start_time": 1000000000000,
2536 "logger_monotonic_start_time": 1000000,
2537 "logger_realtime_start_time": 1000000000000,
2538 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2539 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2540 "parts_index": 0,
2541 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2542 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2543 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2544 "boot_uuids": [
2545 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2546 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2547 ""
2548 ],
2549 "oldest_remote_monotonic_timestamps": [
2550 9223372036854775807,
2551 9223372036854775807,
2552 9223372036854775807
2553 ],
2554 "oldest_local_monotonic_timestamps": [
2555 9223372036854775807,
2556 9223372036854775807,
2557 9223372036854775807
2558 ],
2559 "oldest_remote_unreliable_monotonic_timestamps": [
2560 9223372036854775807,
2561 0,
2562 9223372036854775807
2563 ],
2564 "oldest_local_unreliable_monotonic_timestamps": [
2565 9223372036854775807,
2566 0,
2567 9223372036854775807
2568 ]
2569})")),
2570 part1_(MakeHeader(config_, R"({
2571 /* 100ms */
2572 "max_out_of_order_duration": 100000000,
2573 "node": {
2574 "name": "pi1"
2575 },
2576 "logger_node": {
2577 "name": "pi1"
2578 },
2579 "monotonic_start_time": 1000000,
2580 "realtime_start_time": 1000000000000,
2581 "logger_monotonic_start_time": 1000000,
2582 "logger_realtime_start_time": 1000000000000,
2583 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2584 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2585 "parts_index": 1,
2586 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2587 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2588 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2589 "boot_uuids": [
2590 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2591 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2592 ""
2593 ],
2594 "oldest_remote_monotonic_timestamps": [
2595 9223372036854775807,
2596 9223372036854775807,
2597 9223372036854775807
2598 ],
2599 "oldest_local_monotonic_timestamps": [
2600 9223372036854775807,
2601 9223372036854775807,
2602 9223372036854775807
2603 ],
2604 "oldest_remote_unreliable_monotonic_timestamps": [
2605 9223372036854775807,
2606 100000,
2607 9223372036854775807
2608 ],
2609 "oldest_local_unreliable_monotonic_timestamps": [
2610 9223372036854775807,
2611 100000,
2612 9223372036854775807
2613 ]
2614})")),
2615 part2_(MakeHeader(config_, R"({
2616 /* 100ms */
2617 "max_out_of_order_duration": 100000000,
2618 "node": {
2619 "name": "pi1"
2620 },
2621 "logger_node": {
2622 "name": "pi1"
2623 },
2624 "monotonic_start_time": 1000000,
2625 "realtime_start_time": 1000000000000,
2626 "logger_monotonic_start_time": 1000000,
2627 "logger_realtime_start_time": 1000000000000,
2628 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2629 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2630 "parts_index": 2,
2631 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2632 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2633 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2634 "boot_uuids": [
2635 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2636 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2637 ""
2638 ],
2639 "oldest_remote_monotonic_timestamps": [
2640 9223372036854775807,
2641 9223372036854775807,
2642 9223372036854775807
2643 ],
2644 "oldest_local_monotonic_timestamps": [
2645 9223372036854775807,
2646 9223372036854775807,
2647 9223372036854775807
2648 ],
2649 "oldest_remote_unreliable_monotonic_timestamps": [
2650 9223372036854775807,
2651 200000,
2652 9223372036854775807
2653 ],
2654 "oldest_local_unreliable_monotonic_timestamps": [
2655 9223372036854775807,
2656 200000,
2657 9223372036854775807
2658 ]
2659})")),
2660 part3_(MakeHeader(config_, R"({
2661 /* 100ms */
2662 "max_out_of_order_duration": 100000000,
2663 "node": {
2664 "name": "pi1"
2665 },
2666 "logger_node": {
2667 "name": "pi1"
2668 },
2669 "monotonic_start_time": 1000000,
2670 "realtime_start_time": 1000000000000,
2671 "logger_monotonic_start_time": 1000000,
2672 "logger_realtime_start_time": 1000000000000,
2673 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2674 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2675 "parts_index": 3,
2676 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2677 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2678 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2679 "boot_uuids": [
2680 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2681 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2682 ""
2683 ],
2684 "oldest_remote_monotonic_timestamps": [
2685 9223372036854775807,
2686 9223372036854775807,
2687 9223372036854775807
2688 ],
2689 "oldest_local_monotonic_timestamps": [
2690 9223372036854775807,
2691 9223372036854775807,
2692 9223372036854775807
2693 ],
2694 "oldest_remote_unreliable_monotonic_timestamps": [
2695 9223372036854775807,
2696 300000,
2697 9223372036854775807
2698 ],
2699 "oldest_local_unreliable_monotonic_timestamps": [
2700 9223372036854775807,
2701 300000,
2702 9223372036854775807
2703 ]
2704})")) {}
2705
2706 protected:
2707 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part0_;
2708 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part1_;
2709 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part2_;
2710 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part3_;
2711};
2712
2713// Tests that if 2 computers go back and forth trying to be the same node, we
2714// die in sorting instead of failing to estimate time.
2715TEST_F(SortingDeathTest, FightingNodes) {
2716 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002717 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh44c61472021-11-22 21:04:10 -08002718 writer0.QueueSpan(part0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002719 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh44c61472021-11-22 21:04:10 -08002720 writer1.QueueSpan(part1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002721 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuh44c61472021-11-22 21:04:10 -08002722 writer2.QueueSpan(part2_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002723 TestDetachedBufferWriter writer3(logfile3_);
Austin Schuh44c61472021-11-22 21:04:10 -08002724 writer3.QueueSpan(part3_.span());
2725 }
2726
2727 EXPECT_DEATH(
2728 {
2729 const std::vector<LogFile> parts =
2730 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2731 },
Austin Schuh22cf7862022-09-19 19:09:42 -07002732 "found overlapping boots on");
Austin Schuh44c61472021-11-22 21:04:10 -08002733}
2734
Brian Smarttea913d42021-12-10 15:02:38 -08002735// Tests that we MessageReader blows up on a bad message.
2736TEST(MessageReaderConfirmCrash, ReadWrite) {
2737 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
2738 unlink(logfile.c_str());
2739
2740 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
2741 JsonToSizedFlatbuffer<LogFileHeader>(
2742 R"({ "max_out_of_order_duration": 100000000 })");
2743 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
2744 JsonToSizedFlatbuffer<MessageHeader>(
2745 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
2746 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
2747 JsonToSizedFlatbuffer<MessageHeader>(
2748 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
2749 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m4 =
2750 JsonToSizedFlatbuffer<MessageHeader>(
2751 R"({ "channel_index": 0, "monotonic_sent_time": 4 })");
2752
2753 // Starts out like a proper flat buffer header, but it breaks down ...
2754 std::vector<uint8_t> garbage{8, 0, 0, 0, 16, 0, 0, 0, 4, 0, 0, 0};
2755 absl::Span<uint8_t> m3_span(garbage);
2756
2757 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002758 TestDetachedBufferWriter writer(logfile);
Brian Smarttea913d42021-12-10 15:02:38 -08002759 writer.QueueSpan(config.span());
2760 writer.QueueSpan(m1.span());
2761 writer.QueueSpan(m2.span());
2762 writer.QueueSpan(m3_span);
2763 writer.QueueSpan(m4.span()); // This message is "hidden"
2764 }
2765
2766 {
2767 MessageReader reader(logfile);
2768
2769 EXPECT_EQ(reader.filename(), logfile);
2770
2771 EXPECT_EQ(
2772 reader.max_out_of_order_duration(),
2773 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2774 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2775 EXPECT_TRUE(reader.ReadMessage());
2776 EXPECT_EQ(reader.newest_timestamp(),
2777 monotonic_clock::time_point(chrono::nanoseconds(1)));
2778 EXPECT_TRUE(reader.ReadMessage());
2779 EXPECT_EQ(reader.newest_timestamp(),
2780 monotonic_clock::time_point(chrono::nanoseconds(2)));
2781 // Confirm default crashing behavior
2782 EXPECT_DEATH(reader.ReadMessage(), "Corrupted message at offset");
2783 }
2784
2785 {
2786 gflags::FlagSaver fs;
2787
2788 MessageReader reader(logfile);
2789 reader.set_crash_on_corrupt_message_flag(false);
2790
2791 EXPECT_EQ(reader.filename(), logfile);
2792
2793 EXPECT_EQ(
2794 reader.max_out_of_order_duration(),
2795 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2796 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2797 EXPECT_TRUE(reader.ReadMessage());
2798 EXPECT_EQ(reader.newest_timestamp(),
2799 monotonic_clock::time_point(chrono::nanoseconds(1)));
2800 EXPECT_TRUE(reader.ReadMessage());
2801 EXPECT_EQ(reader.newest_timestamp(),
2802 monotonic_clock::time_point(chrono::nanoseconds(2)));
2803 // Confirm avoiding the corrupted message crash, stopping instead.
2804 EXPECT_FALSE(reader.ReadMessage());
2805 }
2806
2807 {
2808 gflags::FlagSaver fs;
2809
2810 MessageReader reader(logfile);
2811 reader.set_crash_on_corrupt_message_flag(false);
2812 reader.set_ignore_corrupt_messages_flag(true);
2813
2814 EXPECT_EQ(reader.filename(), logfile);
2815
2816 EXPECT_EQ(
2817 reader.max_out_of_order_duration(),
2818 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2819 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2820 EXPECT_TRUE(reader.ReadMessage());
2821 EXPECT_EQ(reader.newest_timestamp(),
2822 monotonic_clock::time_point(chrono::nanoseconds(1)));
2823 EXPECT_TRUE(reader.ReadMessage());
2824 EXPECT_EQ(reader.newest_timestamp(),
2825 monotonic_clock::time_point(chrono::nanoseconds(2)));
2826 // Confirm skipping of the corrupted message to read the hidden one.
2827 EXPECT_TRUE(reader.ReadMessage());
2828 EXPECT_EQ(reader.newest_timestamp(),
2829 monotonic_clock::time_point(chrono::nanoseconds(4)));
2830 EXPECT_FALSE(reader.ReadMessage());
2831 }
2832}
2833
Austin Schuhc243b422020-10-11 15:35:08 -07002834} // namespace testing
2835} // namespace logger
2836} // namespace aos