blob: 4973d028b49f45157668acb9208e6bf457e93e65 [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
4#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07005
Austin Schuhc41603c2020-10-11 16:17:37 -07006#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -07007#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -08008#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -07009#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070010#include "aos/json_to_flatbuffer.h"
11#include "aos/testing/tmpdir.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070012#include "gtest/gtest.h"
Austin Schuhc243b422020-10-11 15:35:08 -070013
14namespace aos {
15namespace logger {
16namespace testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070017namespace chrono = std::chrono;
Austin Schuhc243b422020-10-11 15:35:08 -070018
Austin Schuhe243aaf2020-10-11 15:46:02 -070019// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070020template <typename T>
21SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
22 const std::string_view data) {
23 flatbuffers::FlatBufferBuilder fbb;
24 fbb.ForceDefaults(true);
25 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
26 return fbb.Release();
27}
28
Austin Schuhe243aaf2020-10-11 15:46:02 -070029// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070030TEST(SpanReaderTest, ReadWrite) {
31 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
32 unlink(logfile.c_str());
33
34 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080035 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070036 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080037 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070038
39 {
40 DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -080041 writer.QueueSpan(m1.span());
42 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070043 }
44
45 SpanReader reader(logfile);
46
47 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhcf5f6442021-07-06 10:43:28 -070048 EXPECT_EQ(reader.PeekMessage(), m1.span());
49 EXPECT_EQ(reader.PeekMessage(), m1.span());
Austin Schuhadd6eb32020-11-09 21:24:26 -080050 EXPECT_EQ(reader.ReadMessage(), m1.span());
51 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhcf5f6442021-07-06 10:43:28 -070052 EXPECT_EQ(reader.PeekMessage(), absl::Span<const uint8_t>());
Austin Schuhc243b422020-10-11 15:35:08 -070053 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
54}
55
Austin Schuhe243aaf2020-10-11 15:46:02 -070056// Tests that we can actually parse the resulting messages at a basic level
57// through MessageReader.
58TEST(MessageReaderTest, ReadWrite) {
59 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
60 unlink(logfile.c_str());
61
62 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
63 JsonToSizedFlatbuffer<LogFileHeader>(
64 R"({ "max_out_of_order_duration": 100000000 })");
65 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
66 JsonToSizedFlatbuffer<MessageHeader>(
67 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
68 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
69 JsonToSizedFlatbuffer<MessageHeader>(
70 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
71
72 {
73 DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -080074 writer.QueueSpan(config.span());
75 writer.QueueSpan(m1.span());
76 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -070077 }
78
79 MessageReader reader(logfile);
80
81 EXPECT_EQ(reader.filename(), logfile);
82
83 EXPECT_EQ(
84 reader.max_out_of_order_duration(),
85 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
86 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
87 EXPECT_TRUE(reader.ReadMessage());
88 EXPECT_EQ(reader.newest_timestamp(),
89 monotonic_clock::time_point(chrono::nanoseconds(1)));
90 EXPECT_TRUE(reader.ReadMessage());
91 EXPECT_EQ(reader.newest_timestamp(),
92 monotonic_clock::time_point(chrono::nanoseconds(2)));
93 EXPECT_FALSE(reader.ReadMessage());
94}
95
Austin Schuh32f68492020-11-08 21:45:51 -080096// Tests that we explode when messages are too far out of order.
97TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
98 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
99 unlink(logfile0.c_str());
100
101 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
102 JsonToSizedFlatbuffer<LogFileHeader>(
103 R"({
104 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800105 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800106 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
107 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
108 "parts_index": 0
109})");
110
111 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
112 JsonToSizedFlatbuffer<MessageHeader>(
113 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
114 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
115 JsonToSizedFlatbuffer<MessageHeader>(
116 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
117 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
118 JsonToSizedFlatbuffer<MessageHeader>(
119 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
120
121 {
122 DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800123 writer.QueueSpan(config0.span());
124 writer.QueueSpan(m1.span());
125 writer.QueueSpan(m2.span());
126 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800127 }
128
129 const std::vector<LogFile> parts = SortParts({logfile0});
130
131 PartsMessageReader reader(parts[0].parts[0]);
132
133 EXPECT_TRUE(reader.ReadMessage());
134 EXPECT_TRUE(reader.ReadMessage());
135 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
136}
137
Austin Schuhc41603c2020-10-11 16:17:37 -0700138// Tests that we can transparently re-assemble part files with a
139// PartsMessageReader.
140TEST(PartsMessageReaderTest, ReadWrite) {
141 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
142 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
143 unlink(logfile0.c_str());
144 unlink(logfile1.c_str());
145
146 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
147 JsonToSizedFlatbuffer<LogFileHeader>(
148 R"({
149 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800150 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700151 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
152 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
153 "parts_index": 0
154})");
155 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
156 JsonToSizedFlatbuffer<LogFileHeader>(
157 R"({
158 "max_out_of_order_duration": 200000000,
159 "monotonic_start_time": 0,
160 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800161 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700162 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
163 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
164 "parts_index": 1
165})");
166
167 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
168 JsonToSizedFlatbuffer<MessageHeader>(
169 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
170 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
171 JsonToSizedFlatbuffer<MessageHeader>(
172 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
173
174 {
175 DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800176 writer.QueueSpan(config0.span());
177 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700178 }
179 {
180 DetachedBufferWriter writer(logfile1, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800181 writer.QueueSpan(config1.span());
182 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700183 }
184
185 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
186
187 PartsMessageReader reader(parts[0].parts[0]);
188
189 EXPECT_EQ(reader.filename(), logfile0);
190
191 // Confirm that the timestamps track, and the filename also updates.
192 // Read the first message.
193 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
194 EXPECT_EQ(
195 reader.max_out_of_order_duration(),
196 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
197 EXPECT_TRUE(reader.ReadMessage());
198 EXPECT_EQ(reader.filename(), logfile0);
199 EXPECT_EQ(reader.newest_timestamp(),
200 monotonic_clock::time_point(chrono::nanoseconds(1)));
201 EXPECT_EQ(
202 reader.max_out_of_order_duration(),
203 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
204
205 // Read the second message.
206 EXPECT_TRUE(reader.ReadMessage());
207 EXPECT_EQ(reader.filename(), logfile1);
208 EXPECT_EQ(reader.newest_timestamp(),
209 monotonic_clock::time_point(chrono::nanoseconds(2)));
210 EXPECT_EQ(
211 reader.max_out_of_order_duration(),
212 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
213
214 // And then confirm that reading again returns no message.
215 EXPECT_FALSE(reader.ReadMessage());
216 EXPECT_EQ(reader.filename(), logfile1);
217 EXPECT_EQ(
218 reader.max_out_of_order_duration(),
219 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800220 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Austin Schuhc41603c2020-10-11 16:17:37 -0700221}
Austin Schuh32f68492020-11-08 21:45:51 -0800222
Austin Schuh1be0ce42020-11-29 22:43:26 -0800223// Tests that Message's operator < works as expected.
224TEST(MessageTest, Sorting) {
225 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
226
227 Message m1{.channel_index = 0,
228 .queue_index = 0,
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700229 .timestamp =
230 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
Austin Schuh1be0ce42020-11-29 22:43:26 -0800231 .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
232 Message m2{.channel_index = 0,
233 .queue_index = 0,
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700234 .timestamp =
235 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
Austin Schuh1be0ce42020-11-29 22:43:26 -0800236 .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
237
238 EXPECT_LT(m1, m2);
239 EXPECT_GE(m2, m1);
240
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700241 m1.timestamp.time = e;
242 m2.timestamp.time = e;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800243
244 m1.channel_index = 1;
245 m2.channel_index = 2;
246
247 EXPECT_LT(m1, m2);
248 EXPECT_GE(m2, m1);
249
250 m1.channel_index = 0;
251 m2.channel_index = 0;
252 m1.queue_index = 0;
253 m2.queue_index = 1;
254
255 EXPECT_LT(m1, m2);
256 EXPECT_GE(m2, m1);
257}
258
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800259aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
260 const aos::FlatbufferDetachedBuffer<Configuration> &config,
261 const std::string_view json) {
262 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh1034a832021-03-31 21:53:26 -0700263 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800264 flatbuffers::Offset<Configuration> config_offset =
265 aos::CopyFlatBuffer(config, &fbb);
266 LogFileHeader::Builder header_builder(fbb);
267 header_builder.add_configuration(config_offset);
268 fbb.Finish(header_builder.Finish());
269 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
270
271 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
272 JsonToFlatbuffer<LogFileHeader>(json));
273 CHECK(header_updates.Verify());
274 flatbuffers::FlatBufferBuilder fbb2;
Austin Schuh1034a832021-03-31 21:53:26 -0700275 fbb2.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800276 fbb2.FinishSizePrefixed(
277 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
278 return fbb2.Release();
279}
280
281class SortingElementTest : public ::testing::Test {
282 public:
283 SortingElementTest()
284 : config_(JsonToFlatbuffer<Configuration>(
285 R"({
286 "channels": [
287 {
288 "name": "/a",
289 "type": "aos.logger.testing.TestMessage",
290 "source_node": "pi1",
291 "destination_nodes": [
292 {
293 "name": "pi2"
294 },
295 {
296 "name": "pi3"
297 }
298 ]
299 },
300 {
301 "name": "/b",
302 "type": "aos.logger.testing.TestMessage",
303 "source_node": "pi1"
304 },
305 {
306 "name": "/c",
307 "type": "aos.logger.testing.TestMessage",
308 "source_node": "pi1"
309 }
310 ],
311 "nodes": [
312 {
313 "name": "pi1"
314 },
315 {
316 "name": "pi2"
317 },
318 {
319 "name": "pi3"
320 }
321 ]
322}
323)")),
324 config0_(MakeHeader(config_, R"({
325 /* 100ms */
326 "max_out_of_order_duration": 100000000,
327 "node": {
328 "name": "pi1"
329 },
330 "logger_node": {
331 "name": "pi1"
332 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800333 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800334 "realtime_start_time": 1000000000000,
335 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
336 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
337 "parts_index": 0
338})")),
339 config1_(MakeHeader(config_,
340 R"({
341 /* 100ms */
342 "max_out_of_order_duration": 100000000,
343 "node": {
344 "name": "pi1"
345 },
346 "logger_node": {
347 "name": "pi1"
348 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800349 "monotonic_start_time": 1000000,
350 "realtime_start_time": 1000000000000,
351 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
352 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
353 "parts_index": 0
354})")),
355 config2_(MakeHeader(config_,
356 R"({
357 /* 100ms */
358 "max_out_of_order_duration": 100000000,
359 "node": {
360 "name": "pi2"
361 },
362 "logger_node": {
363 "name": "pi2"
364 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800365 "monotonic_start_time": 0,
366 "realtime_start_time": 1000000000000,
Austin Schuhd2f96102020-12-01 20:27:29 -0800367 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
368 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
369 "parts_index": 0
370})")),
371 config3_(MakeHeader(config_,
372 R"({
373 /* 100ms */
374 "max_out_of_order_duration": 100000000,
375 "node": {
376 "name": "pi1"
377 },
378 "logger_node": {
379 "name": "pi1"
380 },
381 "monotonic_start_time": 2000000,
382 "realtime_start_time": 1000000000,
383 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
384 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800385 "parts_index": 0
Austin Schuh8bf1e632021-01-02 22:41:04 -0800386})")),
387 config4_(MakeHeader(config_,
388 R"({
389 /* 100ms */
390 "max_out_of_order_duration": 100000000,
391 "node": {
392 "name": "pi2"
393 },
394 "logger_node": {
395 "name": "pi1"
396 },
397 "monotonic_start_time": 2000000,
398 "realtime_start_time": 1000000000,
399 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
400 "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
401 "parts_index": 0
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800402})")) {
403 unlink(logfile0_.c_str());
404 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800405 unlink(logfile2_.c_str());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800406 queue_index_.resize(kChannels);
407 }
408
409 protected:
410 static constexpr size_t kChannels = 3u;
411
412 flatbuffers::DetachedBuffer MakeLogMessage(
413 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
414 int value) {
415 flatbuffers::FlatBufferBuilder message_fbb;
416 message_fbb.ForceDefaults(true);
417 TestMessage::Builder test_message_builder(message_fbb);
418 test_message_builder.add_value(value);
419 message_fbb.Finish(test_message_builder.Finish());
420
421 aos::Context context;
422 context.monotonic_event_time = monotonic_now;
423 context.realtime_event_time = aos::realtime_clock::epoch() +
424 chrono::seconds(1000) +
425 monotonic_now.time_since_epoch();
426 context.queue_index = queue_index_[channel_index];
427 context.size = message_fbb.GetSize();
428 context.data = message_fbb.GetBufferPointer();
429
430 ++queue_index_[channel_index];
431
432 flatbuffers::FlatBufferBuilder fbb;
433 fbb.FinishSizePrefixed(
434 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
435
436 return fbb.Release();
437 }
438
439 flatbuffers::DetachedBuffer MakeTimestampMessage(
440 const aos::monotonic_clock::time_point sender_monotonic_now,
Austin Schuh8bf1e632021-01-02 22:41:04 -0800441 int channel_index, chrono::nanoseconds receiver_monotonic_offset,
442 monotonic_clock::time_point monotonic_timestamp_time =
443 monotonic_clock::min_time) {
444 const monotonic_clock::time_point monotonic_sent_time =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800445 sender_monotonic_now + receiver_monotonic_offset;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800446
447 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800448 fbb.ForceDefaults(true);
449
450 logger::MessageHeader::Builder message_header_builder(fbb);
451
452 message_header_builder.add_channel_index(channel_index);
453
454 message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
455 100);
456 message_header_builder.add_monotonic_sent_time(
457 monotonic_sent_time.time_since_epoch().count());
458 message_header_builder.add_realtime_sent_time(
459 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
460 monotonic_sent_time.time_since_epoch())
461 .time_since_epoch()
462 .count());
463
464 message_header_builder.add_monotonic_remote_time(
465 sender_monotonic_now.time_since_epoch().count());
466 message_header_builder.add_realtime_remote_time(
467 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
468 sender_monotonic_now.time_since_epoch())
469 .time_since_epoch()
470 .count());
471 message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
472 1);
473
474 if (monotonic_timestamp_time != monotonic_clock::min_time) {
475 message_header_builder.add_monotonic_timestamp_time(
476 monotonic_timestamp_time.time_since_epoch().count());
477 }
478
479 fbb.FinishSizePrefixed(message_header_builder.Finish());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800480 LOG(INFO) << aos::FlatbufferToJson(
481 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
482 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
483
484 return fbb.Release();
485 }
486
487 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
488 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800489 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800490
491 const aos::FlatbufferDetachedBuffer<Configuration> config_;
492 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
493 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800494 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
495 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800496 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800497
498 std::vector<uint32_t> queue_index_;
499};
500
501using LogPartsSorterTest = SortingElementTest;
502using LogPartsSorterDeathTest = LogPartsSorterTest;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800503using NodeMergerTest = SortingElementTest;
Austin Schuhd2f96102020-12-01 20:27:29 -0800504using TimestampMapperTest = SortingElementTest;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800505
506// Tests that we can pull messages out of a log sorted in order.
507TEST_F(LogPartsSorterTest, Pull) {
508 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
509 {
510 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
511 writer.QueueSpan(config0_.span());
512 writer.QueueSizedFlatbuffer(
513 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
514 writer.QueueSizedFlatbuffer(
515 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
516 writer.QueueSizedFlatbuffer(
517 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
518 writer.QueueSizedFlatbuffer(
519 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
520 }
521
522 const std::vector<LogFile> parts = SortParts({logfile0_});
523
524 LogPartsSorter parts_sorter(parts[0].parts[0]);
525
526 // Confirm we aren't sorted until any time until the message is popped.
527 // Peeking shouldn't change the sorted until time.
528 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
529
530 std::deque<Message> output;
531
532 ASSERT_TRUE(parts_sorter.Front() != nullptr);
533 output.emplace_back(std::move(*parts_sorter.Front()));
534 parts_sorter.PopFront();
535 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
536
537 ASSERT_TRUE(parts_sorter.Front() != nullptr);
538 output.emplace_back(std::move(*parts_sorter.Front()));
539 parts_sorter.PopFront();
540 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
541
542 ASSERT_TRUE(parts_sorter.Front() != nullptr);
543 output.emplace_back(std::move(*parts_sorter.Front()));
544 parts_sorter.PopFront();
545 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
546
547 ASSERT_TRUE(parts_sorter.Front() != nullptr);
548 output.emplace_back(std::move(*parts_sorter.Front()));
549 parts_sorter.PopFront();
550 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
551
552 ASSERT_TRUE(parts_sorter.Front() == nullptr);
553
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700554 EXPECT_EQ(output[0].timestamp.boot, 0);
555 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
556 EXPECT_EQ(output[1].timestamp.boot, 0);
557 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1000));
558 EXPECT_EQ(output[2].timestamp.boot, 0);
559 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1901));
560 EXPECT_EQ(output[3].timestamp.boot, 0);
561 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800562}
563
Austin Schuhb000de62020-12-03 22:00:40 -0800564// Tests that we can pull messages out of a log sorted in order.
565TEST_F(LogPartsSorterTest, WayBeforeStart) {
566 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
567 {
568 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
569 writer.QueueSpan(config0_.span());
570 writer.QueueSizedFlatbuffer(
571 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
572 writer.QueueSizedFlatbuffer(
573 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
574 writer.QueueSizedFlatbuffer(
575 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
576 writer.QueueSizedFlatbuffer(
577 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
578 writer.QueueSizedFlatbuffer(
579 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
580 }
581
582 const std::vector<LogFile> parts = SortParts({logfile0_});
583
584 LogPartsSorter parts_sorter(parts[0].parts[0]);
585
586 // Confirm we aren't sorted until any time until the message is popped.
587 // Peeking shouldn't change the sorted until time.
588 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
589
590 std::deque<Message> output;
591
592 for (monotonic_clock::time_point t :
593 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
594 e + chrono::milliseconds(1900), monotonic_clock::max_time,
595 monotonic_clock::max_time}) {
596 ASSERT_TRUE(parts_sorter.Front() != nullptr);
597 output.emplace_back(std::move(*parts_sorter.Front()));
598 parts_sorter.PopFront();
599 EXPECT_EQ(parts_sorter.sorted_until(), t);
600 }
601
602 ASSERT_TRUE(parts_sorter.Front() == nullptr);
603
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700604 EXPECT_EQ(output[0].timestamp.boot, 0u);
605 EXPECT_EQ(output[0].timestamp.time, e - chrono::milliseconds(1000));
606 EXPECT_EQ(output[1].timestamp.boot, 0u);
607 EXPECT_EQ(output[1].timestamp.time, e - chrono::milliseconds(500));
608 EXPECT_EQ(output[2].timestamp.boot, 0u);
609 EXPECT_EQ(output[2].timestamp.time, e - chrono::milliseconds(10));
610 EXPECT_EQ(output[3].timestamp.boot, 0u);
611 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(1901));
612 EXPECT_EQ(output[4].timestamp.boot, 0u);
613 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuhb000de62020-12-03 22:00:40 -0800614}
615
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800616// Tests that messages too far out of order trigger death.
617TEST_F(LogPartsSorterDeathTest, Pull) {
618 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
619 {
620 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
621 writer.QueueSpan(config0_.span());
622 writer.QueueSizedFlatbuffer(
623 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
624 writer.QueueSizedFlatbuffer(
625 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
626 writer.QueueSizedFlatbuffer(
627 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
628 // The following message is too far out of order and will trigger the CHECK.
629 writer.QueueSizedFlatbuffer(
630 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
631 }
632
633 const std::vector<LogFile> parts = SortParts({logfile0_});
634
635 LogPartsSorter parts_sorter(parts[0].parts[0]);
636
637 // Confirm we aren't sorted until any time until the message is popped.
638 // Peeking shouldn't change the sorted until time.
639 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
640 std::deque<Message> output;
641
642 ASSERT_TRUE(parts_sorter.Front() != nullptr);
643 parts_sorter.PopFront();
644 ASSERT_TRUE(parts_sorter.Front() != nullptr);
645 ASSERT_TRUE(parts_sorter.Front() != nullptr);
646 parts_sorter.PopFront();
647
Austin Schuha040c3f2021-02-13 16:09:07 -0800648 EXPECT_DEATH({ parts_sorter.Front(); }, "Max out of order of 100000000ns exceeded.");
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800649}
650
Austin Schuh8f52ed52020-11-30 23:12:39 -0800651// Tests that we can merge data from 2 separate files, including duplicate data.
652TEST_F(NodeMergerTest, TwoFileMerger) {
653 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
654 {
655 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
656 writer0.QueueSpan(config0_.span());
657 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
658 writer1.QueueSpan(config1_.span());
659
660 writer0.QueueSizedFlatbuffer(
661 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
662 writer1.QueueSizedFlatbuffer(
663 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
664
665 writer0.QueueSizedFlatbuffer(
666 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
667 writer1.QueueSizedFlatbuffer(
668 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
669
670 // Make a duplicate!
671 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
672 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
673 writer0.QueueSpan(msg.span());
674 writer1.QueueSpan(msg.span());
675
676 writer1.QueueSizedFlatbuffer(
677 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
678 }
679
680 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Austin Schuhd2f96102020-12-01 20:27:29 -0800681 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800682
Austin Schuhd2f96102020-12-01 20:27:29 -0800683 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800684
685 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
686
687 std::deque<Message> output;
688
689 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
690 ASSERT_TRUE(merger.Front() != nullptr);
691 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
692
693 output.emplace_back(std::move(*merger.Front()));
694 merger.PopFront();
695 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
696
697 ASSERT_TRUE(merger.Front() != nullptr);
698 output.emplace_back(std::move(*merger.Front()));
699 merger.PopFront();
700 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
701
702 ASSERT_TRUE(merger.Front() != nullptr);
703 output.emplace_back(std::move(*merger.Front()));
704 merger.PopFront();
705 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
706
707 ASSERT_TRUE(merger.Front() != nullptr);
708 output.emplace_back(std::move(*merger.Front()));
709 merger.PopFront();
710 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
711
712 ASSERT_TRUE(merger.Front() != nullptr);
713 output.emplace_back(std::move(*merger.Front()));
714 merger.PopFront();
715 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
716
717 ASSERT_TRUE(merger.Front() != nullptr);
718 output.emplace_back(std::move(*merger.Front()));
719 merger.PopFront();
720 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
721
722 ASSERT_TRUE(merger.Front() == nullptr);
723
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700724 EXPECT_EQ(output[0].timestamp.boot, 0u);
725 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
726 EXPECT_EQ(output[1].timestamp.boot, 0u);
727 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1001));
728 EXPECT_EQ(output[2].timestamp.boot, 0u);
729 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1002));
730 EXPECT_EQ(output[3].timestamp.boot, 0u);
731 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
732 EXPECT_EQ(output[4].timestamp.boot, 0u);
733 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(3000));
734 EXPECT_EQ(output[5].timestamp.boot, 0u);
735 EXPECT_EQ(output[5].timestamp.time, e + chrono::milliseconds(3002));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800736}
737
Austin Schuh8bf1e632021-01-02 22:41:04 -0800738// Tests that we can merge timestamps with various combinations of
739// monotonic_timestamp_time.
740TEST_F(NodeMergerTest, TwoFileTimestampMerger) {
741 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
742 {
743 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
744 writer0.QueueSpan(config0_.span());
745 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
746 writer1.QueueSpan(config1_.span());
747
748 // Neither has it.
749 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
750 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
751 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
752 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
753 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
754
755 // First only has it.
756 MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
757 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
758 e + chrono::milliseconds(1001), 0, chrono::seconds(100),
759 e + chrono::nanoseconds(971)));
760 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
761 e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
762
763 // Second only has it.
764 MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
765 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
766 e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
767 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
768 e + chrono::milliseconds(1002), 0, chrono::seconds(100),
769 e + chrono::nanoseconds(972)));
770
771 // Both have it.
772 MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
773 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
774 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
775 e + chrono::nanoseconds(973)));
776 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
777 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
778 e + chrono::nanoseconds(973)));
779 }
780
781 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
782 ASSERT_EQ(parts.size(), 1u);
783
784 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
785
786 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
787
788 std::deque<Message> output;
789
790 for (int i = 0; i < 4; ++i) {
791 ASSERT_TRUE(merger.Front() != nullptr);
792 output.emplace_back(std::move(*merger.Front()));
793 merger.PopFront();
794 }
795 ASSERT_TRUE(merger.Front() == nullptr);
796
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700797 EXPECT_EQ(output[0].timestamp.boot, 0u);
798 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800799 EXPECT_FALSE(output[0].data.message().has_monotonic_timestamp_time());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700800
801 EXPECT_EQ(output[1].timestamp.boot, 0u);
802 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800803 EXPECT_TRUE(output[1].data.message().has_monotonic_timestamp_time());
804 EXPECT_EQ(output[1].data.message().monotonic_timestamp_time(), 971);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700805
806 EXPECT_EQ(output[2].timestamp.boot, 0u);
807 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800808 EXPECT_TRUE(output[2].data.message().has_monotonic_timestamp_time());
809 EXPECT_EQ(output[2].data.message().monotonic_timestamp_time(), 972);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700810
811 EXPECT_EQ(output[3].timestamp.boot, 0u);
812 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800813 EXPECT_TRUE(output[3].data.message().has_monotonic_timestamp_time());
814 EXPECT_EQ(output[3].data.message().monotonic_timestamp_time(), 973);
815}
816
Austin Schuhd2f96102020-12-01 20:27:29 -0800817// Tests that we can match timestamps on delivered messages.
818TEST_F(TimestampMapperTest, ReadNode0First) {
819 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
820 {
821 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
822 writer0.QueueSpan(config0_.span());
823 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
824 writer1.QueueSpan(config2_.span());
825
826 writer0.QueueSizedFlatbuffer(
827 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
828 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
829 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
830
831 writer0.QueueSizedFlatbuffer(
832 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
833 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
834 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
835
836 writer0.QueueSizedFlatbuffer(
837 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
838 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
839 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
840 }
841
842 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
843
844 ASSERT_EQ(parts[0].logger_node, "pi1");
845 ASSERT_EQ(parts[1].logger_node, "pi2");
846
Austin Schuh79b30942021-01-24 22:32:21 -0800847 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800848 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -0800849 mapper0.set_timestamp_callback(
850 [&](TimestampedMessage *) { ++mapper0_count; });
851 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800852 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -0800853 mapper1.set_timestamp_callback(
854 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -0800855
856 mapper0.AddPeer(&mapper1);
857 mapper1.AddPeer(&mapper0);
858
859 {
860 std::deque<TimestampedMessage> output0;
861
Austin Schuh79b30942021-01-24 22:32:21 -0800862 EXPECT_EQ(mapper0_count, 0u);
863 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800864 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800865 EXPECT_EQ(mapper0_count, 1u);
866 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800867 output0.emplace_back(std::move(*mapper0.Front()));
868 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700869 EXPECT_TRUE(mapper0.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800870 EXPECT_EQ(mapper0_count, 1u);
871 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800872
873 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800874 EXPECT_EQ(mapper0_count, 2u);
875 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800876 output0.emplace_back(std::move(*mapper0.Front()));
877 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700878 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800879
880 ASSERT_TRUE(mapper0.Front() != nullptr);
881 output0.emplace_back(std::move(*mapper0.Front()));
882 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700883 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800884
Austin Schuh79b30942021-01-24 22:32:21 -0800885 EXPECT_EQ(mapper0_count, 3u);
886 EXPECT_EQ(mapper1_count, 0u);
887
Austin Schuhd2f96102020-12-01 20:27:29 -0800888 ASSERT_TRUE(mapper0.Front() == nullptr);
889
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700890 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
891 EXPECT_EQ(output0[0].monotonic_event_time.time,
892 e + chrono::milliseconds(1000));
Austin Schuhd2f96102020-12-01 20:27:29 -0800893 EXPECT_TRUE(output0[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700894
895 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
896 EXPECT_EQ(output0[1].monotonic_event_time.time,
897 e + chrono::milliseconds(2000));
Austin Schuhd2f96102020-12-01 20:27:29 -0800898 EXPECT_TRUE(output0[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700899
900 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
901 EXPECT_EQ(output0[2].monotonic_event_time.time,
902 e + chrono::milliseconds(3000));
Austin Schuhd2f96102020-12-01 20:27:29 -0800903 EXPECT_TRUE(output0[2].data.Verify());
904 }
905
906 {
907 SCOPED_TRACE("Trying node1 now");
908 std::deque<TimestampedMessage> output1;
909
Austin Schuh79b30942021-01-24 22:32:21 -0800910 EXPECT_EQ(mapper0_count, 3u);
911 EXPECT_EQ(mapper1_count, 0u);
912
Austin Schuhd2f96102020-12-01 20:27:29 -0800913 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800914 EXPECT_EQ(mapper0_count, 3u);
915 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800916 output1.emplace_back(std::move(*mapper1.Front()));
917 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700918 EXPECT_TRUE(mapper1.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800919 EXPECT_EQ(mapper0_count, 3u);
920 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800921
922 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800923 EXPECT_EQ(mapper0_count, 3u);
924 EXPECT_EQ(mapper1_count, 2u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800925 output1.emplace_back(std::move(*mapper1.Front()));
926 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700927 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800928
929 ASSERT_TRUE(mapper1.Front() != nullptr);
930 output1.emplace_back(std::move(*mapper1.Front()));
931 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700932 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800933
Austin Schuh79b30942021-01-24 22:32:21 -0800934 EXPECT_EQ(mapper0_count, 3u);
935 EXPECT_EQ(mapper1_count, 3u);
936
Austin Schuhd2f96102020-12-01 20:27:29 -0800937 ASSERT_TRUE(mapper1.Front() == nullptr);
938
Austin Schuh79b30942021-01-24 22:32:21 -0800939 EXPECT_EQ(mapper0_count, 3u);
940 EXPECT_EQ(mapper1_count, 3u);
941
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700942 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
943 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -0800944 e + chrono::seconds(100) + chrono::milliseconds(1000));
945 EXPECT_TRUE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700946
947 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
948 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -0800949 e + chrono::seconds(100) + chrono::milliseconds(2000));
950 EXPECT_TRUE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700951
952 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
953 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -0800954 e + chrono::seconds(100) + chrono::milliseconds(3000));
955 EXPECT_TRUE(output1[2].data.Verify());
956 }
957}
958
Austin Schuh8bf1e632021-01-02 22:41:04 -0800959// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
960// returned.
961TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
962 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
963 {
964 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
965 writer0.QueueSpan(config0_.span());
966 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
967 writer1.QueueSpan(config4_.span());
968
969 writer0.QueueSizedFlatbuffer(
970 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
971 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
972 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
973 e + chrono::nanoseconds(971)));
974
975 writer0.QueueSizedFlatbuffer(
976 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
977 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
978 e + chrono::milliseconds(2000), 0, chrono::seconds(100),
979 e + chrono::nanoseconds(5458)));
980
981 writer0.QueueSizedFlatbuffer(
982 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
983 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
984 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
985 }
986
987 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
988
989 for (const auto &p : parts) {
990 LOG(INFO) << p;
991 }
992
993 ASSERT_EQ(parts.size(), 1u);
994
Austin Schuh79b30942021-01-24 22:32:21 -0800995 size_t mapper0_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800996 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -0800997 mapper0.set_timestamp_callback(
998 [&](TimestampedMessage *) { ++mapper0_count; });
999 size_t mapper1_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001000 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001001 mapper1.set_timestamp_callback(
1002 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh8bf1e632021-01-02 22:41:04 -08001003
1004 mapper0.AddPeer(&mapper1);
1005 mapper1.AddPeer(&mapper0);
1006
1007 {
1008 std::deque<TimestampedMessage> output0;
1009
1010 for (int i = 0; i < 3; ++i) {
1011 ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
1012 output0.emplace_back(std::move(*mapper0.Front()));
1013 mapper0.PopFront();
1014 }
1015
1016 ASSERT_TRUE(mapper0.Front() == nullptr);
1017
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001018 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1019 EXPECT_EQ(output0[0].monotonic_event_time.time,
1020 e + chrono::milliseconds(1000));
1021 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
1022 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
1023 monotonic_clock::min_time);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001024 EXPECT_TRUE(output0[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001025
1026 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1027 EXPECT_EQ(output0[1].monotonic_event_time.time,
1028 e + chrono::milliseconds(2000));
1029 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
1030 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
1031 monotonic_clock::min_time);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001032 EXPECT_TRUE(output0[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001033
1034 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1035 EXPECT_EQ(output0[2].monotonic_event_time.time,
1036 e + chrono::milliseconds(3000));
1037 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
1038 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
1039 monotonic_clock::min_time);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001040 EXPECT_TRUE(output0[2].data.Verify());
1041 }
1042
1043 {
1044 SCOPED_TRACE("Trying node1 now");
1045 std::deque<TimestampedMessage> output1;
1046
1047 for (int i = 0; i < 3; ++i) {
1048 ASSERT_TRUE(mapper1.Front() != nullptr);
1049 output1.emplace_back(std::move(*mapper1.Front()));
1050 mapper1.PopFront();
1051 }
1052
1053 ASSERT_TRUE(mapper1.Front() == nullptr);
1054
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001055 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1056 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001057 e + chrono::seconds(100) + chrono::milliseconds(1000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001058 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
1059 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001060 e + chrono::nanoseconds(971));
1061 EXPECT_TRUE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001062
1063 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1064 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001065 e + chrono::seconds(100) + chrono::milliseconds(2000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001066 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
1067 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001068 e + chrono::nanoseconds(5458));
1069 EXPECT_TRUE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001070
1071 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1072 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001073 e + chrono::seconds(100) + chrono::milliseconds(3000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001074 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
1075 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
1076 monotonic_clock::min_time);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001077 EXPECT_TRUE(output1[2].data.Verify());
1078 }
Austin Schuh79b30942021-01-24 22:32:21 -08001079
1080 EXPECT_EQ(mapper0_count, 3u);
1081 EXPECT_EQ(mapper1_count, 3u);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001082}
1083
Austin Schuhd2f96102020-12-01 20:27:29 -08001084// Tests that we can match timestamps on delivered messages. By doing this in
1085// the reverse order, the second node needs to queue data up from the first node
1086// to find the matching timestamp.
1087TEST_F(TimestampMapperTest, ReadNode1First) {
1088 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1089 {
1090 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1091 writer0.QueueSpan(config0_.span());
1092 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1093 writer1.QueueSpan(config2_.span());
1094
1095 writer0.QueueSizedFlatbuffer(
1096 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1097 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1098 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1099
1100 writer0.QueueSizedFlatbuffer(
1101 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1102 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1103 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1104
1105 writer0.QueueSizedFlatbuffer(
1106 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1107 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1108 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1109 }
1110
1111 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1112
1113 ASSERT_EQ(parts[0].logger_node, "pi1");
1114 ASSERT_EQ(parts[1].logger_node, "pi2");
1115
Austin Schuh79b30942021-01-24 22:32:21 -08001116 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001117 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001118 mapper0.set_timestamp_callback(
1119 [&](TimestampedMessage *) { ++mapper0_count; });
1120 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001121 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001122 mapper1.set_timestamp_callback(
1123 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001124
1125 mapper0.AddPeer(&mapper1);
1126 mapper1.AddPeer(&mapper0);
1127
1128 {
1129 SCOPED_TRACE("Trying node1 now");
1130 std::deque<TimestampedMessage> output1;
1131
1132 ASSERT_TRUE(mapper1.Front() != nullptr);
1133 output1.emplace_back(std::move(*mapper1.Front()));
1134 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001135 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001136
1137 ASSERT_TRUE(mapper1.Front() != nullptr);
1138 output1.emplace_back(std::move(*mapper1.Front()));
1139 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001140 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001141
1142 ASSERT_TRUE(mapper1.Front() != nullptr);
1143 output1.emplace_back(std::move(*mapper1.Front()));
1144 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001145 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001146
1147 ASSERT_TRUE(mapper1.Front() == nullptr);
1148
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001149 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1150 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001151 e + chrono::seconds(100) + chrono::milliseconds(1000));
1152 EXPECT_TRUE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001153
1154 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1155 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001156 e + chrono::seconds(100) + chrono::milliseconds(2000));
1157 EXPECT_TRUE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001158
1159 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1160 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001161 e + chrono::seconds(100) + chrono::milliseconds(3000));
1162 EXPECT_TRUE(output1[2].data.Verify());
1163 }
1164
1165 {
1166 std::deque<TimestampedMessage> output0;
1167
1168 ASSERT_TRUE(mapper0.Front() != nullptr);
1169 output0.emplace_back(std::move(*mapper0.Front()));
1170 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001171 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001172
1173 ASSERT_TRUE(mapper0.Front() != nullptr);
1174 output0.emplace_back(std::move(*mapper0.Front()));
1175 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001176 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001177
1178 ASSERT_TRUE(mapper0.Front() != nullptr);
1179 output0.emplace_back(std::move(*mapper0.Front()));
1180 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001181 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001182
1183 ASSERT_TRUE(mapper0.Front() == nullptr);
1184
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001185 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1186 EXPECT_EQ(output0[0].monotonic_event_time.time,
1187 e + chrono::milliseconds(1000));
Austin Schuhd2f96102020-12-01 20:27:29 -08001188 EXPECT_TRUE(output0[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001189
1190 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1191 EXPECT_EQ(output0[1].monotonic_event_time.time,
1192 e + chrono::milliseconds(2000));
Austin Schuhd2f96102020-12-01 20:27:29 -08001193 EXPECT_TRUE(output0[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001194
1195 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1196 EXPECT_EQ(output0[2].monotonic_event_time.time,
1197 e + chrono::milliseconds(3000));
Austin Schuhd2f96102020-12-01 20:27:29 -08001198 EXPECT_TRUE(output0[2].data.Verify());
1199 }
Austin Schuh79b30942021-01-24 22:32:21 -08001200
1201 EXPECT_EQ(mapper0_count, 3u);
1202 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001203}
1204
1205// Tests that we return just the timestamps if we couldn't find the data and the
1206// missing data was at the beginning of the file.
1207TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
1208 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1209 {
1210 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1211 writer0.QueueSpan(config0_.span());
1212 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1213 writer1.QueueSpan(config2_.span());
1214
1215 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
1216 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1217 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1218
1219 writer0.QueueSizedFlatbuffer(
1220 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1221 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1222 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1223
1224 writer0.QueueSizedFlatbuffer(
1225 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1226 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1227 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1228 }
1229
1230 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1231
1232 ASSERT_EQ(parts[0].logger_node, "pi1");
1233 ASSERT_EQ(parts[1].logger_node, "pi2");
1234
Austin Schuh79b30942021-01-24 22:32:21 -08001235 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001236 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001237 mapper0.set_timestamp_callback(
1238 [&](TimestampedMessage *) { ++mapper0_count; });
1239 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001240 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001241 mapper1.set_timestamp_callback(
1242 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001243
1244 mapper0.AddPeer(&mapper1);
1245 mapper1.AddPeer(&mapper0);
1246
1247 {
1248 SCOPED_TRACE("Trying node1 now");
1249 std::deque<TimestampedMessage> output1;
1250
1251 ASSERT_TRUE(mapper1.Front() != nullptr);
1252 output1.emplace_back(std::move(*mapper1.Front()));
1253 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001254 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001255
1256 ASSERT_TRUE(mapper1.Front() != nullptr);
1257 output1.emplace_back(std::move(*mapper1.Front()));
1258 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001259 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001260
1261 ASSERT_TRUE(mapper1.Front() != nullptr);
1262 output1.emplace_back(std::move(*mapper1.Front()));
1263 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001264 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001265
1266 ASSERT_TRUE(mapper1.Front() == nullptr);
1267
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001268 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1269 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001270 e + chrono::seconds(100) + chrono::milliseconds(1000));
1271 EXPECT_FALSE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001272
1273 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1274 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001275 e + chrono::seconds(100) + chrono::milliseconds(2000));
1276 EXPECT_TRUE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001277
1278 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1279 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001280 e + chrono::seconds(100) + chrono::milliseconds(3000));
1281 EXPECT_TRUE(output1[2].data.Verify());
1282 }
Austin Schuh79b30942021-01-24 22:32:21 -08001283
1284 EXPECT_EQ(mapper0_count, 0u);
1285 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001286}
1287
1288// Tests that we return just the timestamps if we couldn't find the data and the
1289// missing data was at the end of the file.
1290TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
1291 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1292 {
1293 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1294 writer0.QueueSpan(config0_.span());
1295 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1296 writer1.QueueSpan(config2_.span());
1297
1298 writer0.QueueSizedFlatbuffer(
1299 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1300 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1301 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1302
1303 writer0.QueueSizedFlatbuffer(
1304 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1305 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1306 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1307
1308 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
1309 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1310 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1311 }
1312
1313 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1314
1315 ASSERT_EQ(parts[0].logger_node, "pi1");
1316 ASSERT_EQ(parts[1].logger_node, "pi2");
1317
Austin Schuh79b30942021-01-24 22:32:21 -08001318 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001319 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001320 mapper0.set_timestamp_callback(
1321 [&](TimestampedMessage *) { ++mapper0_count; });
1322 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001323 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001324 mapper1.set_timestamp_callback(
1325 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001326
1327 mapper0.AddPeer(&mapper1);
1328 mapper1.AddPeer(&mapper0);
1329
1330 {
1331 SCOPED_TRACE("Trying node1 now");
1332 std::deque<TimestampedMessage> output1;
1333
1334 ASSERT_TRUE(mapper1.Front() != nullptr);
1335 output1.emplace_back(std::move(*mapper1.Front()));
1336 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001337 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001338
1339 ASSERT_TRUE(mapper1.Front() != nullptr);
1340 output1.emplace_back(std::move(*mapper1.Front()));
1341 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001342 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001343
1344 ASSERT_TRUE(mapper1.Front() != nullptr);
1345 output1.emplace_back(std::move(*mapper1.Front()));
1346 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001347 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001348
1349 ASSERT_TRUE(mapper1.Front() == nullptr);
1350
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001351 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1352 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001353 e + chrono::seconds(100) + chrono::milliseconds(1000));
1354 EXPECT_TRUE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001355
1356 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1357 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001358 e + chrono::seconds(100) + chrono::milliseconds(2000));
1359 EXPECT_TRUE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001360
1361 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1362 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001363 e + chrono::seconds(100) + chrono::milliseconds(3000));
1364 EXPECT_FALSE(output1[2].data.Verify());
1365 }
Austin Schuh79b30942021-01-24 22:32:21 -08001366
1367 EXPECT_EQ(mapper0_count, 0u);
1368 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001369}
1370
Austin Schuh993ccb52020-12-12 15:59:32 -08001371// Tests that we handle a message which failed to forward or be logged.
1372TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1373 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1374 {
1375 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1376 writer0.QueueSpan(config0_.span());
1377 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1378 writer1.QueueSpan(config2_.span());
1379
1380 writer0.QueueSizedFlatbuffer(
1381 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1382 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1383 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1384
1385 // Create both the timestamp and message, but don't log them, simulating a
1386 // forwarding drop.
1387 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1388 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1389 chrono::seconds(100));
1390
1391 writer0.QueueSizedFlatbuffer(
1392 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1393 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1394 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1395 }
1396
1397 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1398
1399 ASSERT_EQ(parts[0].logger_node, "pi1");
1400 ASSERT_EQ(parts[1].logger_node, "pi2");
1401
Austin Schuh79b30942021-01-24 22:32:21 -08001402 size_t mapper0_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001403 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001404 mapper0.set_timestamp_callback(
1405 [&](TimestampedMessage *) { ++mapper0_count; });
1406 size_t mapper1_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001407 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001408 mapper1.set_timestamp_callback(
1409 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh993ccb52020-12-12 15:59:32 -08001410
1411 mapper0.AddPeer(&mapper1);
1412 mapper1.AddPeer(&mapper0);
1413
1414 {
1415 std::deque<TimestampedMessage> output1;
1416
1417 ASSERT_TRUE(mapper1.Front() != nullptr);
1418 output1.emplace_back(std::move(*mapper1.Front()));
1419 mapper1.PopFront();
1420
1421 ASSERT_TRUE(mapper1.Front() != nullptr);
1422 output1.emplace_back(std::move(*mapper1.Front()));
1423
1424 ASSERT_FALSE(mapper1.Front() == nullptr);
1425
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001426 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1427 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001428 e + chrono::seconds(100) + chrono::milliseconds(1000));
1429 EXPECT_TRUE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001430
1431 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1432 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001433 e + chrono::seconds(100) + chrono::milliseconds(3000));
1434 EXPECT_TRUE(output1[1].data.Verify());
1435 }
Austin Schuh79b30942021-01-24 22:32:21 -08001436
1437 EXPECT_EQ(mapper0_count, 0u);
1438 EXPECT_EQ(mapper1_count, 2u);
Austin Schuh993ccb52020-12-12 15:59:32 -08001439}
1440
Austin Schuhd2f96102020-12-01 20:27:29 -08001441// Tests that we properly sort log files with duplicate timestamps.
1442TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1443 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1444 {
1445 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1446 writer0.QueueSpan(config0_.span());
1447 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1448 writer1.QueueSpan(config2_.span());
1449
1450 writer0.QueueSizedFlatbuffer(
1451 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1452 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1453 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1454
1455 writer0.QueueSizedFlatbuffer(
1456 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1457 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1458 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1459
1460 writer0.QueueSizedFlatbuffer(
1461 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
1462 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1463 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1464
1465 writer0.QueueSizedFlatbuffer(
1466 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
1467 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1468 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1469 }
1470
1471 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1472
1473 ASSERT_EQ(parts[0].logger_node, "pi1");
1474 ASSERT_EQ(parts[1].logger_node, "pi2");
1475
Austin Schuh79b30942021-01-24 22:32:21 -08001476 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001477 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001478 mapper0.set_timestamp_callback(
1479 [&](TimestampedMessage *) { ++mapper0_count; });
1480 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001481 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001482 mapper1.set_timestamp_callback(
1483 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001484
1485 mapper0.AddPeer(&mapper1);
1486 mapper1.AddPeer(&mapper0);
1487
1488 {
1489 SCOPED_TRACE("Trying node1 now");
1490 std::deque<TimestampedMessage> output1;
1491
1492 for (int i = 0; i < 4; ++i) {
1493 ASSERT_TRUE(mapper1.Front() != nullptr);
1494 output1.emplace_back(std::move(*mapper1.Front()));
1495 mapper1.PopFront();
1496 }
1497 ASSERT_TRUE(mapper1.Front() == nullptr);
1498
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001499 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1500 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001501 e + chrono::seconds(100) + chrono::milliseconds(1000));
1502 EXPECT_TRUE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001503
1504 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1505 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001506 e + chrono::seconds(100) + chrono::milliseconds(2000));
1507 EXPECT_TRUE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001508
1509 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1510 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001511 e + chrono::seconds(100) + chrono::milliseconds(2000));
1512 EXPECT_TRUE(output1[2].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001513
1514 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1515 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001516 e + chrono::seconds(100) + chrono::milliseconds(3000));
1517 EXPECT_TRUE(output1[3].data.Verify());
1518 }
Austin Schuh79b30942021-01-24 22:32:21 -08001519
1520 EXPECT_EQ(mapper0_count, 0u);
1521 EXPECT_EQ(mapper1_count, 4u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001522}
1523
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001524// Tests that we properly produce a valid start time.
Austin Schuhd2f96102020-12-01 20:27:29 -08001525TEST_F(TimestampMapperTest, StartTime) {
1526 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1527 {
1528 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1529 writer0.QueueSpan(config0_.span());
1530 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1531 writer1.QueueSpan(config1_.span());
1532 DetachedBufferWriter writer2(logfile2_, std::make_unique<DummyEncoder>());
1533 writer2.QueueSpan(config3_.span());
1534 }
1535
1536 const std::vector<LogFile> parts =
1537 SortParts({logfile0_, logfile1_, logfile2_});
1538
Austin Schuh79b30942021-01-24 22:32:21 -08001539 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001540 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001541 mapper0.set_timestamp_callback(
1542 [&](TimestampedMessage *) { ++mapper0_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001543
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001544 EXPECT_EQ(mapper0.monotonic_start_time(0), e + chrono::milliseconds(1));
1545 EXPECT_EQ(mapper0.realtime_start_time(0),
Austin Schuhd2f96102020-12-01 20:27:29 -08001546 realtime_clock::time_point(chrono::seconds(1000)));
Austin Schuh79b30942021-01-24 22:32:21 -08001547 EXPECT_EQ(mapper0_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001548}
1549
Austin Schuhfecf1d82020-12-19 16:57:28 -08001550// Tests that when a peer isn't registered, we treat that as if there was no
1551// data available.
1552TEST_F(TimestampMapperTest, NoPeer) {
1553 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1554 {
1555 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1556 writer0.QueueSpan(config0_.span());
1557 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1558 writer1.QueueSpan(config2_.span());
1559
1560 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
1561 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1562 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1563
1564 writer0.QueueSizedFlatbuffer(
1565 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1566 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1567 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1568
1569 writer0.QueueSizedFlatbuffer(
1570 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1571 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1572 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1573 }
1574
1575 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1576
1577 ASSERT_EQ(parts[0].logger_node, "pi1");
1578 ASSERT_EQ(parts[1].logger_node, "pi2");
1579
Austin Schuh79b30942021-01-24 22:32:21 -08001580 size_t mapper1_count = 0;
Austin Schuhfecf1d82020-12-19 16:57:28 -08001581 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001582 mapper1.set_timestamp_callback(
1583 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhfecf1d82020-12-19 16:57:28 -08001584
1585 {
1586 std::deque<TimestampedMessage> output1;
1587
1588 ASSERT_TRUE(mapper1.Front() != nullptr);
1589 output1.emplace_back(std::move(*mapper1.Front()));
1590 mapper1.PopFront();
1591 ASSERT_TRUE(mapper1.Front() != nullptr);
1592 output1.emplace_back(std::move(*mapper1.Front()));
1593 mapper1.PopFront();
1594 ASSERT_TRUE(mapper1.Front() != nullptr);
1595 output1.emplace_back(std::move(*mapper1.Front()));
1596 mapper1.PopFront();
1597 ASSERT_TRUE(mapper1.Front() == nullptr);
1598
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001599 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1600 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001601 e + chrono::seconds(100) + chrono::milliseconds(1000));
1602 EXPECT_FALSE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001603
1604 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1605 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001606 e + chrono::seconds(100) + chrono::milliseconds(2000));
1607 EXPECT_FALSE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001608
1609 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1610 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001611 e + chrono::seconds(100) + chrono::milliseconds(3000));
1612 EXPECT_FALSE(output1[2].data.Verify());
1613 }
Austin Schuh79b30942021-01-24 22:32:21 -08001614 EXPECT_EQ(mapper1_count, 3u);
1615}
1616
1617// Tests that we can queue messages and call the timestamp callback for both
1618// nodes.
1619TEST_F(TimestampMapperTest, QueueUntilNode0) {
1620 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1621 {
1622 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1623 writer0.QueueSpan(config0_.span());
1624 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1625 writer1.QueueSpan(config2_.span());
1626
1627 writer0.QueueSizedFlatbuffer(
1628 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1629 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1630 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1631
1632 writer0.QueueSizedFlatbuffer(
1633 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
1634 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1635 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1636
1637 writer0.QueueSizedFlatbuffer(
1638 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
1639 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1640 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1641
1642 writer0.QueueSizedFlatbuffer(
1643 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
1644 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1645 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1646 }
1647
1648 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1649
1650 ASSERT_EQ(parts[0].logger_node, "pi1");
1651 ASSERT_EQ(parts[1].logger_node, "pi2");
1652
1653 size_t mapper0_count = 0;
1654 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1655 mapper0.set_timestamp_callback(
1656 [&](TimestampedMessage *) { ++mapper0_count; });
1657 size_t mapper1_count = 0;
1658 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1659 mapper1.set_timestamp_callback(
1660 [&](TimestampedMessage *) { ++mapper1_count; });
1661
1662 mapper0.AddPeer(&mapper1);
1663 mapper1.AddPeer(&mapper0);
1664
1665 {
1666 std::deque<TimestampedMessage> output0;
1667
1668 EXPECT_EQ(mapper0_count, 0u);
1669 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001670 mapper0.QueueUntil(
1671 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001672 EXPECT_EQ(mapper0_count, 3u);
1673 EXPECT_EQ(mapper1_count, 0u);
1674
1675 ASSERT_TRUE(mapper0.Front() != nullptr);
1676 EXPECT_EQ(mapper0_count, 3u);
1677 EXPECT_EQ(mapper1_count, 0u);
1678
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001679 mapper0.QueueUntil(
1680 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001681 EXPECT_EQ(mapper0_count, 3u);
1682 EXPECT_EQ(mapper1_count, 0u);
1683
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001684 mapper0.QueueUntil(
1685 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001686 EXPECT_EQ(mapper0_count, 4u);
1687 EXPECT_EQ(mapper1_count, 0u);
1688
1689 output0.emplace_back(std::move(*mapper0.Front()));
1690 mapper0.PopFront();
1691 output0.emplace_back(std::move(*mapper0.Front()));
1692 mapper0.PopFront();
1693 output0.emplace_back(std::move(*mapper0.Front()));
1694 mapper0.PopFront();
1695 output0.emplace_back(std::move(*mapper0.Front()));
1696 mapper0.PopFront();
1697
1698 EXPECT_EQ(mapper0_count, 4u);
1699 EXPECT_EQ(mapper1_count, 0u);
1700
1701 ASSERT_TRUE(mapper0.Front() == nullptr);
1702
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001703 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1704 EXPECT_EQ(output0[0].monotonic_event_time.time,
1705 e + chrono::milliseconds(1000));
Austin Schuh79b30942021-01-24 22:32:21 -08001706 EXPECT_TRUE(output0[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001707
1708 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1709 EXPECT_EQ(output0[1].monotonic_event_time.time,
1710 e + chrono::milliseconds(1000));
Austin Schuh79b30942021-01-24 22:32:21 -08001711 EXPECT_TRUE(output0[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001712
1713 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1714 EXPECT_EQ(output0[2].monotonic_event_time.time,
1715 e + chrono::milliseconds(2000));
Austin Schuh79b30942021-01-24 22:32:21 -08001716 EXPECT_TRUE(output0[2].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001717
1718 EXPECT_EQ(output0[3].monotonic_event_time.boot, 0u);
1719 EXPECT_EQ(output0[3].monotonic_event_time.time,
1720 e + chrono::milliseconds(3000));
Austin Schuh79b30942021-01-24 22:32:21 -08001721 EXPECT_TRUE(output0[3].data.Verify());
1722 }
1723
1724 {
1725 SCOPED_TRACE("Trying node1 now");
1726 std::deque<TimestampedMessage> output1;
1727
1728 EXPECT_EQ(mapper0_count, 4u);
1729 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001730 mapper1.QueueUntil(BootTimestamp{
1731 .boot = 0,
1732 .time = e + chrono::seconds(100) + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001733 EXPECT_EQ(mapper0_count, 4u);
1734 EXPECT_EQ(mapper1_count, 3u);
1735
1736 ASSERT_TRUE(mapper1.Front() != nullptr);
1737 EXPECT_EQ(mapper0_count, 4u);
1738 EXPECT_EQ(mapper1_count, 3u);
1739
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001740 mapper1.QueueUntil(BootTimestamp{
1741 .boot = 0,
1742 .time = e + chrono::seconds(100) + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001743 EXPECT_EQ(mapper0_count, 4u);
1744 EXPECT_EQ(mapper1_count, 3u);
1745
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001746 mapper1.QueueUntil(BootTimestamp{
1747 .boot = 0,
1748 .time = e + chrono::seconds(100) + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001749 EXPECT_EQ(mapper0_count, 4u);
1750 EXPECT_EQ(mapper1_count, 4u);
1751
1752 ASSERT_TRUE(mapper1.Front() != nullptr);
1753 EXPECT_EQ(mapper0_count, 4u);
1754 EXPECT_EQ(mapper1_count, 4u);
1755
1756 output1.emplace_back(std::move(*mapper1.Front()));
1757 mapper1.PopFront();
1758 ASSERT_TRUE(mapper1.Front() != nullptr);
1759 output1.emplace_back(std::move(*mapper1.Front()));
1760 mapper1.PopFront();
1761 ASSERT_TRUE(mapper1.Front() != nullptr);
1762 output1.emplace_back(std::move(*mapper1.Front()));
1763 mapper1.PopFront();
1764 ASSERT_TRUE(mapper1.Front() != nullptr);
1765 output1.emplace_back(std::move(*mapper1.Front()));
1766 mapper1.PopFront();
1767
1768 EXPECT_EQ(mapper0_count, 4u);
1769 EXPECT_EQ(mapper1_count, 4u);
1770
1771 ASSERT_TRUE(mapper1.Front() == nullptr);
1772
1773 EXPECT_EQ(mapper0_count, 4u);
1774 EXPECT_EQ(mapper1_count, 4u);
1775
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001776 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1777 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001778 e + chrono::seconds(100) + chrono::milliseconds(1000));
1779 EXPECT_TRUE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001780
1781 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1782 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001783 e + chrono::seconds(100) + chrono::milliseconds(1000));
1784 EXPECT_TRUE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001785
1786 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1787 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001788 e + chrono::seconds(100) + chrono::milliseconds(2000));
1789 EXPECT_TRUE(output1[2].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001790
1791 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1792 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001793 e + chrono::seconds(100) + chrono::milliseconds(3000));
1794 EXPECT_TRUE(output1[3].data.Verify());
1795 }
Austin Schuhfecf1d82020-12-19 16:57:28 -08001796}
1797
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001798class BootMergerTest : public SortingElementTest {
1799 public:
1800 BootMergerTest()
1801 : SortingElementTest(),
1802 boot0_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07001803 /* 100ms */
1804 "max_out_of_order_duration": 100000000,
1805 "node": {
1806 "name": "pi2"
1807 },
1808 "logger_node": {
1809 "name": "pi1"
1810 },
1811 "monotonic_start_time": 1000000,
1812 "realtime_start_time": 1000000000000,
1813 "logger_monotonic_start_time": 1000000,
1814 "logger_realtime_start_time": 1000000000000,
1815 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
1816 "parts_uuid": "1a9e5ca2-31b2-475b-8282-88f6d1ce5109",
1817 "parts_index": 0,
1818 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
1819 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
1820 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464"
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001821})")),
1822 boot1_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07001823 /* 100ms */
1824 "max_out_of_order_duration": 100000000,
1825 "node": {
1826 "name": "pi2"
1827 },
1828 "logger_node": {
1829 "name": "pi1"
1830 },
1831 "monotonic_start_time": 1000000,
1832 "realtime_start_time": 1000000000000,
1833 "logger_monotonic_start_time": 1000000,
1834 "logger_realtime_start_time": 1000000000000,
1835 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
1836 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
1837 "parts_index": 1,
1838 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
1839 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
1840 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2"
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001841})")) {}
Austin Schuh8bee6882021-06-28 21:03:28 -07001842
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001843 protected:
1844 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0_;
1845 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1_;
1846};
1847
1848// This tests that we can properly sort a multi-node log file which has the old
1849// (and buggy) timestamps in the header, and the non-resetting parts_index.
1850// These make it so we can just bairly figure out what happened first and what
1851// happened second, but not in a way that is robust to multiple nodes rebooting.
1852TEST_F(BootMergerTest, OldReboot) {
Austin Schuh8bee6882021-06-28 21:03:28 -07001853 {
1854 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001855 writer.QueueSpan(boot0_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07001856 }
1857 {
1858 DetachedBufferWriter writer(logfile1_, std::make_unique<DummyEncoder>());
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001859 writer.QueueSpan(boot1_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07001860 }
1861
1862 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1863
1864 ASSERT_EQ(parts.size(), 1u);
1865 ASSERT_EQ(parts[0].parts.size(), 2u);
1866
1867 EXPECT_EQ(parts[0].parts[0].boot_count, 0);
1868 EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001869 boot0_.message().source_node_boot_uuid()->string_view());
Austin Schuh8bee6882021-06-28 21:03:28 -07001870
1871 EXPECT_EQ(parts[0].parts[1].boot_count, 1);
1872 EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001873 boot1_.message().source_node_boot_uuid()->string_view());
1874}
1875
1876// This tests that we can produce messages ordered across a reboot.
1877TEST_F(BootMergerTest, SortAcrossReboot) {
1878 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1879 {
1880 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
1881 writer.QueueSpan(boot0_.span());
1882 writer.QueueSizedFlatbuffer(
1883 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1884 writer.QueueSizedFlatbuffer(
1885 MakeLogMessage(e + chrono::milliseconds(2000), 1, 0x105));
1886 }
1887 {
1888 DetachedBufferWriter writer(logfile1_, std::make_unique<DummyEncoder>());
1889 writer.QueueSpan(boot1_.span());
1890 writer.QueueSizedFlatbuffer(
1891 MakeLogMessage(e + chrono::milliseconds(100), 0, 0x006));
1892 writer.QueueSizedFlatbuffer(
1893 MakeLogMessage(e + chrono::milliseconds(200), 1, 0x106));
1894 }
1895
1896 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1897 ASSERT_EQ(parts.size(), 1u);
1898 ASSERT_EQ(parts[0].parts.size(), 2u);
1899
1900 BootMerger merger(FilterPartsForNode(parts, "pi2"));
1901
1902 EXPECT_EQ(merger.node(), 1u);
1903
1904 std::vector<Message> output;
1905 for (int i = 0; i < 4; ++i) {
1906 ASSERT_TRUE(merger.Front() != nullptr);
1907 output.emplace_back(std::move(*merger.Front()));
1908 merger.PopFront();
1909 }
1910
1911 ASSERT_TRUE(merger.Front() == nullptr);
1912
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001913 EXPECT_EQ(output[0].timestamp.boot, 0u);
1914 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
1915 EXPECT_EQ(output[1].timestamp.boot, 0u);
1916 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(2000));
1917
1918 EXPECT_EQ(output[2].timestamp.boot, 1u);
1919 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(100));
1920 EXPECT_EQ(output[3].timestamp.boot, 1u);
1921 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
Austin Schuh8bee6882021-06-28 21:03:28 -07001922}
1923
Austin Schuhc243b422020-10-11 15:35:08 -07001924} // namespace testing
1925} // namespace logger
1926} // namespace aos