blob: 90a716eec9f6ee9d933d2b9669c9af8326cbb49e [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
4#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07005
Austin Schuhc41603c2020-10-11 16:17:37 -07006#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -07007#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -08008#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -07009#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070010#include "aos/json_to_flatbuffer.h"
11#include "aos/testing/tmpdir.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070012#include "gtest/gtest.h"
Austin Schuhc243b422020-10-11 15:35:08 -070013
14namespace aos {
15namespace logger {
16namespace testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070017namespace chrono = std::chrono;
Austin Schuhc243b422020-10-11 15:35:08 -070018
Austin Schuhe243aaf2020-10-11 15:46:02 -070019// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070020template <typename T>
21SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
22 const std::string_view data) {
23 flatbuffers::FlatBufferBuilder fbb;
24 fbb.ForceDefaults(true);
25 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
26 return fbb.Release();
27}
28
Austin Schuhe243aaf2020-10-11 15:46:02 -070029// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070030TEST(SpanReaderTest, ReadWrite) {
31 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
32 unlink(logfile.c_str());
33
34 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080035 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070036 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080037 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070038
39 {
40 DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -080041 writer.QueueSpan(m1.span());
42 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070043 }
44
45 SpanReader reader(logfile);
46
47 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -080048 EXPECT_EQ(reader.ReadMessage(), m1.span());
49 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070050 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
51}
52
Austin Schuhe243aaf2020-10-11 15:46:02 -070053// Tests that we can actually parse the resulting messages at a basic level
54// through MessageReader.
55TEST(MessageReaderTest, ReadWrite) {
56 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
57 unlink(logfile.c_str());
58
59 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
60 JsonToSizedFlatbuffer<LogFileHeader>(
61 R"({ "max_out_of_order_duration": 100000000 })");
62 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
63 JsonToSizedFlatbuffer<MessageHeader>(
64 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
65 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
66 JsonToSizedFlatbuffer<MessageHeader>(
67 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
68
69 {
70 DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -080071 writer.QueueSpan(config.span());
72 writer.QueueSpan(m1.span());
73 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -070074 }
75
76 MessageReader reader(logfile);
77
78 EXPECT_EQ(reader.filename(), logfile);
79
80 EXPECT_EQ(
81 reader.max_out_of_order_duration(),
82 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
83 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
84 EXPECT_TRUE(reader.ReadMessage());
85 EXPECT_EQ(reader.newest_timestamp(),
86 monotonic_clock::time_point(chrono::nanoseconds(1)));
87 EXPECT_TRUE(reader.ReadMessage());
88 EXPECT_EQ(reader.newest_timestamp(),
89 monotonic_clock::time_point(chrono::nanoseconds(2)));
90 EXPECT_FALSE(reader.ReadMessage());
91}
92
Austin Schuh32f68492020-11-08 21:45:51 -080093// Tests that we explode when messages are too far out of order.
94TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
95 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
96 unlink(logfile0.c_str());
97
98 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
99 JsonToSizedFlatbuffer<LogFileHeader>(
100 R"({
101 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800102 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800103 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
104 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
105 "parts_index": 0
106})");
107
108 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
109 JsonToSizedFlatbuffer<MessageHeader>(
110 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
111 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
112 JsonToSizedFlatbuffer<MessageHeader>(
113 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
114 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
115 JsonToSizedFlatbuffer<MessageHeader>(
116 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
117
118 {
119 DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800120 writer.QueueSpan(config0.span());
121 writer.QueueSpan(m1.span());
122 writer.QueueSpan(m2.span());
123 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800124 }
125
126 const std::vector<LogFile> parts = SortParts({logfile0});
127
128 PartsMessageReader reader(parts[0].parts[0]);
129
130 EXPECT_TRUE(reader.ReadMessage());
131 EXPECT_TRUE(reader.ReadMessage());
132 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
133}
134
Austin Schuhc41603c2020-10-11 16:17:37 -0700135// Tests that we can transparently re-assemble part files with a
136// PartsMessageReader.
137TEST(PartsMessageReaderTest, ReadWrite) {
138 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
139 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
140 unlink(logfile0.c_str());
141 unlink(logfile1.c_str());
142
143 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
144 JsonToSizedFlatbuffer<LogFileHeader>(
145 R"({
146 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800147 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700148 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
149 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
150 "parts_index": 0
151})");
152 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
153 JsonToSizedFlatbuffer<LogFileHeader>(
154 R"({
155 "max_out_of_order_duration": 200000000,
156 "monotonic_start_time": 0,
157 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800158 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700159 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
160 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
161 "parts_index": 1
162})");
163
164 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
165 JsonToSizedFlatbuffer<MessageHeader>(
166 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
167 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
168 JsonToSizedFlatbuffer<MessageHeader>(
169 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
170
171 {
172 DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800173 writer.QueueSpan(config0.span());
174 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700175 }
176 {
177 DetachedBufferWriter writer(logfile1, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800178 writer.QueueSpan(config1.span());
179 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700180 }
181
182 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
183
184 PartsMessageReader reader(parts[0].parts[0]);
185
186 EXPECT_EQ(reader.filename(), logfile0);
187
188 // Confirm that the timestamps track, and the filename also updates.
189 // Read the first message.
190 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
191 EXPECT_EQ(
192 reader.max_out_of_order_duration(),
193 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
194 EXPECT_TRUE(reader.ReadMessage());
195 EXPECT_EQ(reader.filename(), logfile0);
196 EXPECT_EQ(reader.newest_timestamp(),
197 monotonic_clock::time_point(chrono::nanoseconds(1)));
198 EXPECT_EQ(
199 reader.max_out_of_order_duration(),
200 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
201
202 // Read the second message.
203 EXPECT_TRUE(reader.ReadMessage());
204 EXPECT_EQ(reader.filename(), logfile1);
205 EXPECT_EQ(reader.newest_timestamp(),
206 monotonic_clock::time_point(chrono::nanoseconds(2)));
207 EXPECT_EQ(
208 reader.max_out_of_order_duration(),
209 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
210
211 // And then confirm that reading again returns no message.
212 EXPECT_FALSE(reader.ReadMessage());
213 EXPECT_EQ(reader.filename(), logfile1);
214 EXPECT_EQ(
215 reader.max_out_of_order_duration(),
216 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800217 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Austin Schuhc41603c2020-10-11 16:17:37 -0700218}
Austin Schuh32f68492020-11-08 21:45:51 -0800219
Austin Schuh1be0ce42020-11-29 22:43:26 -0800220// Tests that Message's operator < works as expected.
221TEST(MessageTest, Sorting) {
222 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
223
224 Message m1{.channel_index = 0,
225 .queue_index = 0,
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700226 .timestamp =
227 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
Austin Schuh1be0ce42020-11-29 22:43:26 -0800228 .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
229 Message m2{.channel_index = 0,
230 .queue_index = 0,
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700231 .timestamp =
232 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
Austin Schuh1be0ce42020-11-29 22:43:26 -0800233 .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
234
235 EXPECT_LT(m1, m2);
236 EXPECT_GE(m2, m1);
237
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700238 m1.timestamp.time = e;
239 m2.timestamp.time = e;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800240
241 m1.channel_index = 1;
242 m2.channel_index = 2;
243
244 EXPECT_LT(m1, m2);
245 EXPECT_GE(m2, m1);
246
247 m1.channel_index = 0;
248 m2.channel_index = 0;
249 m1.queue_index = 0;
250 m2.queue_index = 1;
251
252 EXPECT_LT(m1, m2);
253 EXPECT_GE(m2, m1);
254}
255
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800256aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
257 const aos::FlatbufferDetachedBuffer<Configuration> &config,
258 const std::string_view json) {
259 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh1034a832021-03-31 21:53:26 -0700260 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800261 flatbuffers::Offset<Configuration> config_offset =
262 aos::CopyFlatBuffer(config, &fbb);
263 LogFileHeader::Builder header_builder(fbb);
264 header_builder.add_configuration(config_offset);
265 fbb.Finish(header_builder.Finish());
266 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
267
268 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
269 JsonToFlatbuffer<LogFileHeader>(json));
270 CHECK(header_updates.Verify());
271 flatbuffers::FlatBufferBuilder fbb2;
Austin Schuh1034a832021-03-31 21:53:26 -0700272 fbb2.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800273 fbb2.FinishSizePrefixed(
274 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
275 return fbb2.Release();
276}
277
278class SortingElementTest : public ::testing::Test {
279 public:
280 SortingElementTest()
281 : config_(JsonToFlatbuffer<Configuration>(
282 R"({
283 "channels": [
284 {
285 "name": "/a",
286 "type": "aos.logger.testing.TestMessage",
287 "source_node": "pi1",
288 "destination_nodes": [
289 {
290 "name": "pi2"
291 },
292 {
293 "name": "pi3"
294 }
295 ]
296 },
297 {
298 "name": "/b",
299 "type": "aos.logger.testing.TestMessage",
300 "source_node": "pi1"
301 },
302 {
303 "name": "/c",
304 "type": "aos.logger.testing.TestMessage",
305 "source_node": "pi1"
306 }
307 ],
308 "nodes": [
309 {
310 "name": "pi1"
311 },
312 {
313 "name": "pi2"
314 },
315 {
316 "name": "pi3"
317 }
318 ]
319}
320)")),
321 config0_(MakeHeader(config_, R"({
322 /* 100ms */
323 "max_out_of_order_duration": 100000000,
324 "node": {
325 "name": "pi1"
326 },
327 "logger_node": {
328 "name": "pi1"
329 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800330 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800331 "realtime_start_time": 1000000000000,
332 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
333 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
334 "parts_index": 0
335})")),
336 config1_(MakeHeader(config_,
337 R"({
338 /* 100ms */
339 "max_out_of_order_duration": 100000000,
340 "node": {
341 "name": "pi1"
342 },
343 "logger_node": {
344 "name": "pi1"
345 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800346 "monotonic_start_time": 1000000,
347 "realtime_start_time": 1000000000000,
348 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
349 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
350 "parts_index": 0
351})")),
352 config2_(MakeHeader(config_,
353 R"({
354 /* 100ms */
355 "max_out_of_order_duration": 100000000,
356 "node": {
357 "name": "pi2"
358 },
359 "logger_node": {
360 "name": "pi2"
361 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800362 "monotonic_start_time": 0,
363 "realtime_start_time": 1000000000000,
Austin Schuhd2f96102020-12-01 20:27:29 -0800364 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
365 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
366 "parts_index": 0
367})")),
368 config3_(MakeHeader(config_,
369 R"({
370 /* 100ms */
371 "max_out_of_order_duration": 100000000,
372 "node": {
373 "name": "pi1"
374 },
375 "logger_node": {
376 "name": "pi1"
377 },
378 "monotonic_start_time": 2000000,
379 "realtime_start_time": 1000000000,
380 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
381 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800382 "parts_index": 0
Austin Schuh8bf1e632021-01-02 22:41:04 -0800383})")),
384 config4_(MakeHeader(config_,
385 R"({
386 /* 100ms */
387 "max_out_of_order_duration": 100000000,
388 "node": {
389 "name": "pi2"
390 },
391 "logger_node": {
392 "name": "pi1"
393 },
394 "monotonic_start_time": 2000000,
395 "realtime_start_time": 1000000000,
396 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
397 "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
398 "parts_index": 0
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800399})")) {
400 unlink(logfile0_.c_str());
401 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800402 unlink(logfile2_.c_str());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800403 queue_index_.resize(kChannels);
404 }
405
406 protected:
407 static constexpr size_t kChannels = 3u;
408
409 flatbuffers::DetachedBuffer MakeLogMessage(
410 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
411 int value) {
412 flatbuffers::FlatBufferBuilder message_fbb;
413 message_fbb.ForceDefaults(true);
414 TestMessage::Builder test_message_builder(message_fbb);
415 test_message_builder.add_value(value);
416 message_fbb.Finish(test_message_builder.Finish());
417
418 aos::Context context;
419 context.monotonic_event_time = monotonic_now;
420 context.realtime_event_time = aos::realtime_clock::epoch() +
421 chrono::seconds(1000) +
422 monotonic_now.time_since_epoch();
423 context.queue_index = queue_index_[channel_index];
424 context.size = message_fbb.GetSize();
425 context.data = message_fbb.GetBufferPointer();
426
427 ++queue_index_[channel_index];
428
429 flatbuffers::FlatBufferBuilder fbb;
430 fbb.FinishSizePrefixed(
431 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
432
433 return fbb.Release();
434 }
435
436 flatbuffers::DetachedBuffer MakeTimestampMessage(
437 const aos::monotonic_clock::time_point sender_monotonic_now,
Austin Schuh8bf1e632021-01-02 22:41:04 -0800438 int channel_index, chrono::nanoseconds receiver_monotonic_offset,
439 monotonic_clock::time_point monotonic_timestamp_time =
440 monotonic_clock::min_time) {
441 const monotonic_clock::time_point monotonic_sent_time =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800442 sender_monotonic_now + receiver_monotonic_offset;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800443
444 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800445 fbb.ForceDefaults(true);
446
447 logger::MessageHeader::Builder message_header_builder(fbb);
448
449 message_header_builder.add_channel_index(channel_index);
450
451 message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
452 100);
453 message_header_builder.add_monotonic_sent_time(
454 monotonic_sent_time.time_since_epoch().count());
455 message_header_builder.add_realtime_sent_time(
456 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
457 monotonic_sent_time.time_since_epoch())
458 .time_since_epoch()
459 .count());
460
461 message_header_builder.add_monotonic_remote_time(
462 sender_monotonic_now.time_since_epoch().count());
463 message_header_builder.add_realtime_remote_time(
464 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
465 sender_monotonic_now.time_since_epoch())
466 .time_since_epoch()
467 .count());
468 message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
469 1);
470
471 if (monotonic_timestamp_time != monotonic_clock::min_time) {
472 message_header_builder.add_monotonic_timestamp_time(
473 monotonic_timestamp_time.time_since_epoch().count());
474 }
475
476 fbb.FinishSizePrefixed(message_header_builder.Finish());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800477 LOG(INFO) << aos::FlatbufferToJson(
478 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
479 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
480
481 return fbb.Release();
482 }
483
484 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
485 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800486 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800487
488 const aos::FlatbufferDetachedBuffer<Configuration> config_;
489 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
490 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800491 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
492 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800493 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800494
495 std::vector<uint32_t> queue_index_;
496};
497
498using LogPartsSorterTest = SortingElementTest;
499using LogPartsSorterDeathTest = LogPartsSorterTest;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800500using NodeMergerTest = SortingElementTest;
Austin Schuhd2f96102020-12-01 20:27:29 -0800501using TimestampMapperTest = SortingElementTest;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800502
503// Tests that we can pull messages out of a log sorted in order.
504TEST_F(LogPartsSorterTest, Pull) {
505 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
506 {
507 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
508 writer.QueueSpan(config0_.span());
509 writer.QueueSizedFlatbuffer(
510 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
511 writer.QueueSizedFlatbuffer(
512 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
513 writer.QueueSizedFlatbuffer(
514 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
515 writer.QueueSizedFlatbuffer(
516 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
517 }
518
519 const std::vector<LogFile> parts = SortParts({logfile0_});
520
521 LogPartsSorter parts_sorter(parts[0].parts[0]);
522
523 // Confirm we aren't sorted until any time until the message is popped.
524 // Peeking shouldn't change the sorted until time.
525 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
526
527 std::deque<Message> output;
528
529 ASSERT_TRUE(parts_sorter.Front() != nullptr);
530 output.emplace_back(std::move(*parts_sorter.Front()));
531 parts_sorter.PopFront();
532 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
533
534 ASSERT_TRUE(parts_sorter.Front() != nullptr);
535 output.emplace_back(std::move(*parts_sorter.Front()));
536 parts_sorter.PopFront();
537 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
538
539 ASSERT_TRUE(parts_sorter.Front() != nullptr);
540 output.emplace_back(std::move(*parts_sorter.Front()));
541 parts_sorter.PopFront();
542 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
543
544 ASSERT_TRUE(parts_sorter.Front() != nullptr);
545 output.emplace_back(std::move(*parts_sorter.Front()));
546 parts_sorter.PopFront();
547 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
548
549 ASSERT_TRUE(parts_sorter.Front() == nullptr);
550
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700551 EXPECT_EQ(output[0].timestamp.boot, 0);
552 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
553 EXPECT_EQ(output[1].timestamp.boot, 0);
554 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1000));
555 EXPECT_EQ(output[2].timestamp.boot, 0);
556 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1901));
557 EXPECT_EQ(output[3].timestamp.boot, 0);
558 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800559}
560
Austin Schuhb000de62020-12-03 22:00:40 -0800561// Tests that we can pull messages out of a log sorted in order.
562TEST_F(LogPartsSorterTest, WayBeforeStart) {
563 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
564 {
565 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
566 writer.QueueSpan(config0_.span());
567 writer.QueueSizedFlatbuffer(
568 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
569 writer.QueueSizedFlatbuffer(
570 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
571 writer.QueueSizedFlatbuffer(
572 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
573 writer.QueueSizedFlatbuffer(
574 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
575 writer.QueueSizedFlatbuffer(
576 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
577 }
578
579 const std::vector<LogFile> parts = SortParts({logfile0_});
580
581 LogPartsSorter parts_sorter(parts[0].parts[0]);
582
583 // Confirm we aren't sorted until any time until the message is popped.
584 // Peeking shouldn't change the sorted until time.
585 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
586
587 std::deque<Message> output;
588
589 for (monotonic_clock::time_point t :
590 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
591 e + chrono::milliseconds(1900), monotonic_clock::max_time,
592 monotonic_clock::max_time}) {
593 ASSERT_TRUE(parts_sorter.Front() != nullptr);
594 output.emplace_back(std::move(*parts_sorter.Front()));
595 parts_sorter.PopFront();
596 EXPECT_EQ(parts_sorter.sorted_until(), t);
597 }
598
599 ASSERT_TRUE(parts_sorter.Front() == nullptr);
600
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700601 EXPECT_EQ(output[0].timestamp.boot, 0u);
602 EXPECT_EQ(output[0].timestamp.time, e - chrono::milliseconds(1000));
603 EXPECT_EQ(output[1].timestamp.boot, 0u);
604 EXPECT_EQ(output[1].timestamp.time, e - chrono::milliseconds(500));
605 EXPECT_EQ(output[2].timestamp.boot, 0u);
606 EXPECT_EQ(output[2].timestamp.time, e - chrono::milliseconds(10));
607 EXPECT_EQ(output[3].timestamp.boot, 0u);
608 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(1901));
609 EXPECT_EQ(output[4].timestamp.boot, 0u);
610 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuhb000de62020-12-03 22:00:40 -0800611}
612
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800613// Tests that messages too far out of order trigger death.
614TEST_F(LogPartsSorterDeathTest, Pull) {
615 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
616 {
617 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
618 writer.QueueSpan(config0_.span());
619 writer.QueueSizedFlatbuffer(
620 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
621 writer.QueueSizedFlatbuffer(
622 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
623 writer.QueueSizedFlatbuffer(
624 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
625 // The following message is too far out of order and will trigger the CHECK.
626 writer.QueueSizedFlatbuffer(
627 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
628 }
629
630 const std::vector<LogFile> parts = SortParts({logfile0_});
631
632 LogPartsSorter parts_sorter(parts[0].parts[0]);
633
634 // Confirm we aren't sorted until any time until the message is popped.
635 // Peeking shouldn't change the sorted until time.
636 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
637 std::deque<Message> output;
638
639 ASSERT_TRUE(parts_sorter.Front() != nullptr);
640 parts_sorter.PopFront();
641 ASSERT_TRUE(parts_sorter.Front() != nullptr);
642 ASSERT_TRUE(parts_sorter.Front() != nullptr);
643 parts_sorter.PopFront();
644
Austin Schuha040c3f2021-02-13 16:09:07 -0800645 EXPECT_DEATH({ parts_sorter.Front(); }, "Max out of order of 100000000ns exceeded.");
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800646}
647
Austin Schuh8f52ed52020-11-30 23:12:39 -0800648// Tests that we can merge data from 2 separate files, including duplicate data.
649TEST_F(NodeMergerTest, TwoFileMerger) {
650 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
651 {
652 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
653 writer0.QueueSpan(config0_.span());
654 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
655 writer1.QueueSpan(config1_.span());
656
657 writer0.QueueSizedFlatbuffer(
658 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
659 writer1.QueueSizedFlatbuffer(
660 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
661
662 writer0.QueueSizedFlatbuffer(
663 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
664 writer1.QueueSizedFlatbuffer(
665 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
666
667 // Make a duplicate!
668 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
669 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
670 writer0.QueueSpan(msg.span());
671 writer1.QueueSpan(msg.span());
672
673 writer1.QueueSizedFlatbuffer(
674 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
675 }
676
677 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Austin Schuhd2f96102020-12-01 20:27:29 -0800678 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800679
Austin Schuhd2f96102020-12-01 20:27:29 -0800680 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800681
682 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
683
684 std::deque<Message> output;
685
686 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
687 ASSERT_TRUE(merger.Front() != nullptr);
688 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
689
690 output.emplace_back(std::move(*merger.Front()));
691 merger.PopFront();
692 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
693
694 ASSERT_TRUE(merger.Front() != nullptr);
695 output.emplace_back(std::move(*merger.Front()));
696 merger.PopFront();
697 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
698
699 ASSERT_TRUE(merger.Front() != nullptr);
700 output.emplace_back(std::move(*merger.Front()));
701 merger.PopFront();
702 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
703
704 ASSERT_TRUE(merger.Front() != nullptr);
705 output.emplace_back(std::move(*merger.Front()));
706 merger.PopFront();
707 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
708
709 ASSERT_TRUE(merger.Front() != nullptr);
710 output.emplace_back(std::move(*merger.Front()));
711 merger.PopFront();
712 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
713
714 ASSERT_TRUE(merger.Front() != nullptr);
715 output.emplace_back(std::move(*merger.Front()));
716 merger.PopFront();
717 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
718
719 ASSERT_TRUE(merger.Front() == nullptr);
720
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700721 EXPECT_EQ(output[0].timestamp.boot, 0u);
722 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
723 EXPECT_EQ(output[1].timestamp.boot, 0u);
724 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1001));
725 EXPECT_EQ(output[2].timestamp.boot, 0u);
726 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1002));
727 EXPECT_EQ(output[3].timestamp.boot, 0u);
728 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
729 EXPECT_EQ(output[4].timestamp.boot, 0u);
730 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(3000));
731 EXPECT_EQ(output[5].timestamp.boot, 0u);
732 EXPECT_EQ(output[5].timestamp.time, e + chrono::milliseconds(3002));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800733}
734
Austin Schuh8bf1e632021-01-02 22:41:04 -0800735// Tests that we can merge timestamps with various combinations of
736// monotonic_timestamp_time.
737TEST_F(NodeMergerTest, TwoFileTimestampMerger) {
738 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
739 {
740 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
741 writer0.QueueSpan(config0_.span());
742 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
743 writer1.QueueSpan(config1_.span());
744
745 // Neither has it.
746 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
747 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
748 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
749 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
750 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
751
752 // First only has it.
753 MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
754 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
755 e + chrono::milliseconds(1001), 0, chrono::seconds(100),
756 e + chrono::nanoseconds(971)));
757 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
758 e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
759
760 // Second only has it.
761 MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
762 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
763 e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
764 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
765 e + chrono::milliseconds(1002), 0, chrono::seconds(100),
766 e + chrono::nanoseconds(972)));
767
768 // Both have it.
769 MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
770 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
771 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
772 e + chrono::nanoseconds(973)));
773 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
774 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
775 e + chrono::nanoseconds(973)));
776 }
777
778 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
779 ASSERT_EQ(parts.size(), 1u);
780
781 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
782
783 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
784
785 std::deque<Message> output;
786
787 for (int i = 0; i < 4; ++i) {
788 ASSERT_TRUE(merger.Front() != nullptr);
789 output.emplace_back(std::move(*merger.Front()));
790 merger.PopFront();
791 }
792 ASSERT_TRUE(merger.Front() == nullptr);
793
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700794 EXPECT_EQ(output[0].timestamp.boot, 0u);
795 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800796 EXPECT_FALSE(output[0].data.message().has_monotonic_timestamp_time());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700797
798 EXPECT_EQ(output[1].timestamp.boot, 0u);
799 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800800 EXPECT_TRUE(output[1].data.message().has_monotonic_timestamp_time());
801 EXPECT_EQ(output[1].data.message().monotonic_timestamp_time(), 971);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700802
803 EXPECT_EQ(output[2].timestamp.boot, 0u);
804 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800805 EXPECT_TRUE(output[2].data.message().has_monotonic_timestamp_time());
806 EXPECT_EQ(output[2].data.message().monotonic_timestamp_time(), 972);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700807
808 EXPECT_EQ(output[3].timestamp.boot, 0u);
809 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800810 EXPECT_TRUE(output[3].data.message().has_monotonic_timestamp_time());
811 EXPECT_EQ(output[3].data.message().monotonic_timestamp_time(), 973);
812}
813
Austin Schuhd2f96102020-12-01 20:27:29 -0800814// Tests that we can match timestamps on delivered messages.
815TEST_F(TimestampMapperTest, ReadNode0First) {
816 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
817 {
818 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
819 writer0.QueueSpan(config0_.span());
820 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
821 writer1.QueueSpan(config2_.span());
822
823 writer0.QueueSizedFlatbuffer(
824 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
825 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
826 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
827
828 writer0.QueueSizedFlatbuffer(
829 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
830 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
831 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
832
833 writer0.QueueSizedFlatbuffer(
834 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
835 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
836 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
837 }
838
839 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
840
841 ASSERT_EQ(parts[0].logger_node, "pi1");
842 ASSERT_EQ(parts[1].logger_node, "pi2");
843
Austin Schuh79b30942021-01-24 22:32:21 -0800844 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800845 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -0800846 mapper0.set_timestamp_callback(
847 [&](TimestampedMessage *) { ++mapper0_count; });
848 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800849 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -0800850 mapper1.set_timestamp_callback(
851 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -0800852
853 mapper0.AddPeer(&mapper1);
854 mapper1.AddPeer(&mapper0);
855
856 {
857 std::deque<TimestampedMessage> output0;
858
Austin Schuh79b30942021-01-24 22:32:21 -0800859 EXPECT_EQ(mapper0_count, 0u);
860 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800861 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800862 EXPECT_EQ(mapper0_count, 1u);
863 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800864 output0.emplace_back(std::move(*mapper0.Front()));
865 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700866 EXPECT_TRUE(mapper0.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800867 EXPECT_EQ(mapper0_count, 1u);
868 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800869
870 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800871 EXPECT_EQ(mapper0_count, 2u);
872 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800873 output0.emplace_back(std::move(*mapper0.Front()));
874 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700875 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800876
877 ASSERT_TRUE(mapper0.Front() != nullptr);
878 output0.emplace_back(std::move(*mapper0.Front()));
879 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700880 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800881
Austin Schuh79b30942021-01-24 22:32:21 -0800882 EXPECT_EQ(mapper0_count, 3u);
883 EXPECT_EQ(mapper1_count, 0u);
884
Austin Schuhd2f96102020-12-01 20:27:29 -0800885 ASSERT_TRUE(mapper0.Front() == nullptr);
886
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700887 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
888 EXPECT_EQ(output0[0].monotonic_event_time.time,
889 e + chrono::milliseconds(1000));
Austin Schuhd2f96102020-12-01 20:27:29 -0800890 EXPECT_TRUE(output0[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700891
892 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
893 EXPECT_EQ(output0[1].monotonic_event_time.time,
894 e + chrono::milliseconds(2000));
Austin Schuhd2f96102020-12-01 20:27:29 -0800895 EXPECT_TRUE(output0[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700896
897 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
898 EXPECT_EQ(output0[2].monotonic_event_time.time,
899 e + chrono::milliseconds(3000));
Austin Schuhd2f96102020-12-01 20:27:29 -0800900 EXPECT_TRUE(output0[2].data.Verify());
901 }
902
903 {
904 SCOPED_TRACE("Trying node1 now");
905 std::deque<TimestampedMessage> output1;
906
Austin Schuh79b30942021-01-24 22:32:21 -0800907 EXPECT_EQ(mapper0_count, 3u);
908 EXPECT_EQ(mapper1_count, 0u);
909
Austin Schuhd2f96102020-12-01 20:27:29 -0800910 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800911 EXPECT_EQ(mapper0_count, 3u);
912 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800913 output1.emplace_back(std::move(*mapper1.Front()));
914 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700915 EXPECT_TRUE(mapper1.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800916 EXPECT_EQ(mapper0_count, 3u);
917 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800918
919 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800920 EXPECT_EQ(mapper0_count, 3u);
921 EXPECT_EQ(mapper1_count, 2u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800922 output1.emplace_back(std::move(*mapper1.Front()));
923 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700924 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800925
926 ASSERT_TRUE(mapper1.Front() != nullptr);
927 output1.emplace_back(std::move(*mapper1.Front()));
928 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700929 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800930
Austin Schuh79b30942021-01-24 22:32:21 -0800931 EXPECT_EQ(mapper0_count, 3u);
932 EXPECT_EQ(mapper1_count, 3u);
933
Austin Schuhd2f96102020-12-01 20:27:29 -0800934 ASSERT_TRUE(mapper1.Front() == nullptr);
935
Austin Schuh79b30942021-01-24 22:32:21 -0800936 EXPECT_EQ(mapper0_count, 3u);
937 EXPECT_EQ(mapper1_count, 3u);
938
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700939 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
940 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -0800941 e + chrono::seconds(100) + chrono::milliseconds(1000));
942 EXPECT_TRUE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700943
944 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
945 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -0800946 e + chrono::seconds(100) + chrono::milliseconds(2000));
947 EXPECT_TRUE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700948
949 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
950 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -0800951 e + chrono::seconds(100) + chrono::milliseconds(3000));
952 EXPECT_TRUE(output1[2].data.Verify());
953 }
954}
955
Austin Schuh8bf1e632021-01-02 22:41:04 -0800956// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
957// returned.
958TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
959 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
960 {
961 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
962 writer0.QueueSpan(config0_.span());
963 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
964 writer1.QueueSpan(config4_.span());
965
966 writer0.QueueSizedFlatbuffer(
967 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
968 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
969 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
970 e + chrono::nanoseconds(971)));
971
972 writer0.QueueSizedFlatbuffer(
973 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
974 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
975 e + chrono::milliseconds(2000), 0, chrono::seconds(100),
976 e + chrono::nanoseconds(5458)));
977
978 writer0.QueueSizedFlatbuffer(
979 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
980 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
981 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
982 }
983
984 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
985
986 for (const auto &p : parts) {
987 LOG(INFO) << p;
988 }
989
990 ASSERT_EQ(parts.size(), 1u);
991
Austin Schuh79b30942021-01-24 22:32:21 -0800992 size_t mapper0_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800993 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -0800994 mapper0.set_timestamp_callback(
995 [&](TimestampedMessage *) { ++mapper0_count; });
996 size_t mapper1_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800997 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -0800998 mapper1.set_timestamp_callback(
999 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh8bf1e632021-01-02 22:41:04 -08001000
1001 mapper0.AddPeer(&mapper1);
1002 mapper1.AddPeer(&mapper0);
1003
1004 {
1005 std::deque<TimestampedMessage> output0;
1006
1007 for (int i = 0; i < 3; ++i) {
1008 ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
1009 output0.emplace_back(std::move(*mapper0.Front()));
1010 mapper0.PopFront();
1011 }
1012
1013 ASSERT_TRUE(mapper0.Front() == nullptr);
1014
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001015 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1016 EXPECT_EQ(output0[0].monotonic_event_time.time,
1017 e + chrono::milliseconds(1000));
1018 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
1019 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
1020 monotonic_clock::min_time);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001021 EXPECT_TRUE(output0[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001022
1023 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1024 EXPECT_EQ(output0[1].monotonic_event_time.time,
1025 e + chrono::milliseconds(2000));
1026 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
1027 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
1028 monotonic_clock::min_time);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001029 EXPECT_TRUE(output0[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001030
1031 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1032 EXPECT_EQ(output0[2].monotonic_event_time.time,
1033 e + chrono::milliseconds(3000));
1034 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
1035 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
1036 monotonic_clock::min_time);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001037 EXPECT_TRUE(output0[2].data.Verify());
1038 }
1039
1040 {
1041 SCOPED_TRACE("Trying node1 now");
1042 std::deque<TimestampedMessage> output1;
1043
1044 for (int i = 0; i < 3; ++i) {
1045 ASSERT_TRUE(mapper1.Front() != nullptr);
1046 output1.emplace_back(std::move(*mapper1.Front()));
1047 mapper1.PopFront();
1048 }
1049
1050 ASSERT_TRUE(mapper1.Front() == nullptr);
1051
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001052 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1053 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001054 e + chrono::seconds(100) + chrono::milliseconds(1000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001055 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
1056 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001057 e + chrono::nanoseconds(971));
1058 EXPECT_TRUE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001059
1060 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1061 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001062 e + chrono::seconds(100) + chrono::milliseconds(2000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001063 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
1064 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001065 e + chrono::nanoseconds(5458));
1066 EXPECT_TRUE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001067
1068 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1069 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001070 e + chrono::seconds(100) + chrono::milliseconds(3000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001071 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
1072 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
1073 monotonic_clock::min_time);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001074 EXPECT_TRUE(output1[2].data.Verify());
1075 }
Austin Schuh79b30942021-01-24 22:32:21 -08001076
1077 EXPECT_EQ(mapper0_count, 3u);
1078 EXPECT_EQ(mapper1_count, 3u);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001079}
1080
Austin Schuhd2f96102020-12-01 20:27:29 -08001081// Tests that we can match timestamps on delivered messages. By doing this in
1082// the reverse order, the second node needs to queue data up from the first node
1083// to find the matching timestamp.
1084TEST_F(TimestampMapperTest, ReadNode1First) {
1085 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1086 {
1087 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1088 writer0.QueueSpan(config0_.span());
1089 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1090 writer1.QueueSpan(config2_.span());
1091
1092 writer0.QueueSizedFlatbuffer(
1093 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1094 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1095 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1096
1097 writer0.QueueSizedFlatbuffer(
1098 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1099 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1100 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1101
1102 writer0.QueueSizedFlatbuffer(
1103 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1104 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1105 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1106 }
1107
1108 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1109
1110 ASSERT_EQ(parts[0].logger_node, "pi1");
1111 ASSERT_EQ(parts[1].logger_node, "pi2");
1112
Austin Schuh79b30942021-01-24 22:32:21 -08001113 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001114 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001115 mapper0.set_timestamp_callback(
1116 [&](TimestampedMessage *) { ++mapper0_count; });
1117 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001118 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001119 mapper1.set_timestamp_callback(
1120 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001121
1122 mapper0.AddPeer(&mapper1);
1123 mapper1.AddPeer(&mapper0);
1124
1125 {
1126 SCOPED_TRACE("Trying node1 now");
1127 std::deque<TimestampedMessage> output1;
1128
1129 ASSERT_TRUE(mapper1.Front() != nullptr);
1130 output1.emplace_back(std::move(*mapper1.Front()));
1131 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001132 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001133
1134 ASSERT_TRUE(mapper1.Front() != nullptr);
1135 output1.emplace_back(std::move(*mapper1.Front()));
1136 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001137 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001138
1139 ASSERT_TRUE(mapper1.Front() != nullptr);
1140 output1.emplace_back(std::move(*mapper1.Front()));
1141 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001142 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001143
1144 ASSERT_TRUE(mapper1.Front() == nullptr);
1145
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001146 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1147 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001148 e + chrono::seconds(100) + chrono::milliseconds(1000));
1149 EXPECT_TRUE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001150
1151 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1152 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001153 e + chrono::seconds(100) + chrono::milliseconds(2000));
1154 EXPECT_TRUE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001155
1156 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1157 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001158 e + chrono::seconds(100) + chrono::milliseconds(3000));
1159 EXPECT_TRUE(output1[2].data.Verify());
1160 }
1161
1162 {
1163 std::deque<TimestampedMessage> output0;
1164
1165 ASSERT_TRUE(mapper0.Front() != nullptr);
1166 output0.emplace_back(std::move(*mapper0.Front()));
1167 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001168 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001169
1170 ASSERT_TRUE(mapper0.Front() != nullptr);
1171 output0.emplace_back(std::move(*mapper0.Front()));
1172 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001173 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001174
1175 ASSERT_TRUE(mapper0.Front() != nullptr);
1176 output0.emplace_back(std::move(*mapper0.Front()));
1177 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001178 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001179
1180 ASSERT_TRUE(mapper0.Front() == nullptr);
1181
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001182 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1183 EXPECT_EQ(output0[0].monotonic_event_time.time,
1184 e + chrono::milliseconds(1000));
Austin Schuhd2f96102020-12-01 20:27:29 -08001185 EXPECT_TRUE(output0[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001186
1187 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1188 EXPECT_EQ(output0[1].monotonic_event_time.time,
1189 e + chrono::milliseconds(2000));
Austin Schuhd2f96102020-12-01 20:27:29 -08001190 EXPECT_TRUE(output0[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001191
1192 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1193 EXPECT_EQ(output0[2].monotonic_event_time.time,
1194 e + chrono::milliseconds(3000));
Austin Schuhd2f96102020-12-01 20:27:29 -08001195 EXPECT_TRUE(output0[2].data.Verify());
1196 }
Austin Schuh79b30942021-01-24 22:32:21 -08001197
1198 EXPECT_EQ(mapper0_count, 3u);
1199 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001200}
1201
1202// Tests that we return just the timestamps if we couldn't find the data and the
1203// missing data was at the beginning of the file.
1204TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
1205 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1206 {
1207 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1208 writer0.QueueSpan(config0_.span());
1209 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1210 writer1.QueueSpan(config2_.span());
1211
1212 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
1213 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1214 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1215
1216 writer0.QueueSizedFlatbuffer(
1217 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1218 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1219 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1220
1221 writer0.QueueSizedFlatbuffer(
1222 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1223 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1224 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1225 }
1226
1227 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1228
1229 ASSERT_EQ(parts[0].logger_node, "pi1");
1230 ASSERT_EQ(parts[1].logger_node, "pi2");
1231
Austin Schuh79b30942021-01-24 22:32:21 -08001232 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001233 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001234 mapper0.set_timestamp_callback(
1235 [&](TimestampedMessage *) { ++mapper0_count; });
1236 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001237 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001238 mapper1.set_timestamp_callback(
1239 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001240
1241 mapper0.AddPeer(&mapper1);
1242 mapper1.AddPeer(&mapper0);
1243
1244 {
1245 SCOPED_TRACE("Trying node1 now");
1246 std::deque<TimestampedMessage> output1;
1247
1248 ASSERT_TRUE(mapper1.Front() != nullptr);
1249 output1.emplace_back(std::move(*mapper1.Front()));
1250 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001251 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001252
1253 ASSERT_TRUE(mapper1.Front() != nullptr);
1254 output1.emplace_back(std::move(*mapper1.Front()));
1255 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001256 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001257
1258 ASSERT_TRUE(mapper1.Front() != nullptr);
1259 output1.emplace_back(std::move(*mapper1.Front()));
1260 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001261 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001262
1263 ASSERT_TRUE(mapper1.Front() == nullptr);
1264
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001265 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1266 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001267 e + chrono::seconds(100) + chrono::milliseconds(1000));
1268 EXPECT_FALSE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001269
1270 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1271 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001272 e + chrono::seconds(100) + chrono::milliseconds(2000));
1273 EXPECT_TRUE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001274
1275 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1276 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001277 e + chrono::seconds(100) + chrono::milliseconds(3000));
1278 EXPECT_TRUE(output1[2].data.Verify());
1279 }
Austin Schuh79b30942021-01-24 22:32:21 -08001280
1281 EXPECT_EQ(mapper0_count, 0u);
1282 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001283}
1284
1285// Tests that we return just the timestamps if we couldn't find the data and the
1286// missing data was at the end of the file.
1287TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
1288 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1289 {
1290 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1291 writer0.QueueSpan(config0_.span());
1292 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1293 writer1.QueueSpan(config2_.span());
1294
1295 writer0.QueueSizedFlatbuffer(
1296 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1297 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1298 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1299
1300 writer0.QueueSizedFlatbuffer(
1301 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1302 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1303 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1304
1305 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
1306 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1307 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1308 }
1309
1310 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1311
1312 ASSERT_EQ(parts[0].logger_node, "pi1");
1313 ASSERT_EQ(parts[1].logger_node, "pi2");
1314
Austin Schuh79b30942021-01-24 22:32:21 -08001315 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001316 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001317 mapper0.set_timestamp_callback(
1318 [&](TimestampedMessage *) { ++mapper0_count; });
1319 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001320 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001321 mapper1.set_timestamp_callback(
1322 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001323
1324 mapper0.AddPeer(&mapper1);
1325 mapper1.AddPeer(&mapper0);
1326
1327 {
1328 SCOPED_TRACE("Trying node1 now");
1329 std::deque<TimestampedMessage> output1;
1330
1331 ASSERT_TRUE(mapper1.Front() != nullptr);
1332 output1.emplace_back(std::move(*mapper1.Front()));
1333 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001334 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001335
1336 ASSERT_TRUE(mapper1.Front() != nullptr);
1337 output1.emplace_back(std::move(*mapper1.Front()));
1338 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001339 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001340
1341 ASSERT_TRUE(mapper1.Front() != nullptr);
1342 output1.emplace_back(std::move(*mapper1.Front()));
1343 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001344 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001345
1346 ASSERT_TRUE(mapper1.Front() == nullptr);
1347
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001348 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1349 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001350 e + chrono::seconds(100) + chrono::milliseconds(1000));
1351 EXPECT_TRUE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001352
1353 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1354 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001355 e + chrono::seconds(100) + chrono::milliseconds(2000));
1356 EXPECT_TRUE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001357
1358 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1359 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001360 e + chrono::seconds(100) + chrono::milliseconds(3000));
1361 EXPECT_FALSE(output1[2].data.Verify());
1362 }
Austin Schuh79b30942021-01-24 22:32:21 -08001363
1364 EXPECT_EQ(mapper0_count, 0u);
1365 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001366}
1367
Austin Schuh993ccb52020-12-12 15:59:32 -08001368// Tests that we handle a message which failed to forward or be logged.
1369TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1370 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1371 {
1372 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1373 writer0.QueueSpan(config0_.span());
1374 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1375 writer1.QueueSpan(config2_.span());
1376
1377 writer0.QueueSizedFlatbuffer(
1378 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1379 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1380 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1381
1382 // Create both the timestamp and message, but don't log them, simulating a
1383 // forwarding drop.
1384 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1385 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1386 chrono::seconds(100));
1387
1388 writer0.QueueSizedFlatbuffer(
1389 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1390 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1391 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1392 }
1393
1394 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1395
1396 ASSERT_EQ(parts[0].logger_node, "pi1");
1397 ASSERT_EQ(parts[1].logger_node, "pi2");
1398
Austin Schuh79b30942021-01-24 22:32:21 -08001399 size_t mapper0_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001400 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001401 mapper0.set_timestamp_callback(
1402 [&](TimestampedMessage *) { ++mapper0_count; });
1403 size_t mapper1_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001404 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001405 mapper1.set_timestamp_callback(
1406 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh993ccb52020-12-12 15:59:32 -08001407
1408 mapper0.AddPeer(&mapper1);
1409 mapper1.AddPeer(&mapper0);
1410
1411 {
1412 std::deque<TimestampedMessage> output1;
1413
1414 ASSERT_TRUE(mapper1.Front() != nullptr);
1415 output1.emplace_back(std::move(*mapper1.Front()));
1416 mapper1.PopFront();
1417
1418 ASSERT_TRUE(mapper1.Front() != nullptr);
1419 output1.emplace_back(std::move(*mapper1.Front()));
1420
1421 ASSERT_FALSE(mapper1.Front() == nullptr);
1422
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001423 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1424 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001425 e + chrono::seconds(100) + chrono::milliseconds(1000));
1426 EXPECT_TRUE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001427
1428 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1429 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001430 e + chrono::seconds(100) + chrono::milliseconds(3000));
1431 EXPECT_TRUE(output1[1].data.Verify());
1432 }
Austin Schuh79b30942021-01-24 22:32:21 -08001433
1434 EXPECT_EQ(mapper0_count, 0u);
1435 EXPECT_EQ(mapper1_count, 2u);
Austin Schuh993ccb52020-12-12 15:59:32 -08001436}
1437
Austin Schuhd2f96102020-12-01 20:27:29 -08001438// Tests that we properly sort log files with duplicate timestamps.
1439TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1440 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1441 {
1442 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1443 writer0.QueueSpan(config0_.span());
1444 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1445 writer1.QueueSpan(config2_.span());
1446
1447 writer0.QueueSizedFlatbuffer(
1448 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1449 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1450 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1451
1452 writer0.QueueSizedFlatbuffer(
1453 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1454 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1455 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1456
1457 writer0.QueueSizedFlatbuffer(
1458 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
1459 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1460 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1461
1462 writer0.QueueSizedFlatbuffer(
1463 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
1464 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1465 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1466 }
1467
1468 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1469
1470 ASSERT_EQ(parts[0].logger_node, "pi1");
1471 ASSERT_EQ(parts[1].logger_node, "pi2");
1472
Austin Schuh79b30942021-01-24 22:32:21 -08001473 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001474 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001475 mapper0.set_timestamp_callback(
1476 [&](TimestampedMessage *) { ++mapper0_count; });
1477 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001478 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001479 mapper1.set_timestamp_callback(
1480 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001481
1482 mapper0.AddPeer(&mapper1);
1483 mapper1.AddPeer(&mapper0);
1484
1485 {
1486 SCOPED_TRACE("Trying node1 now");
1487 std::deque<TimestampedMessage> output1;
1488
1489 for (int i = 0; i < 4; ++i) {
1490 ASSERT_TRUE(mapper1.Front() != nullptr);
1491 output1.emplace_back(std::move(*mapper1.Front()));
1492 mapper1.PopFront();
1493 }
1494 ASSERT_TRUE(mapper1.Front() == nullptr);
1495
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001496 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1497 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001498 e + chrono::seconds(100) + chrono::milliseconds(1000));
1499 EXPECT_TRUE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001500
1501 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1502 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001503 e + chrono::seconds(100) + chrono::milliseconds(2000));
1504 EXPECT_TRUE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001505
1506 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1507 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001508 e + chrono::seconds(100) + chrono::milliseconds(2000));
1509 EXPECT_TRUE(output1[2].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001510
1511 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1512 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001513 e + chrono::seconds(100) + chrono::milliseconds(3000));
1514 EXPECT_TRUE(output1[3].data.Verify());
1515 }
Austin Schuh79b30942021-01-24 22:32:21 -08001516
1517 EXPECT_EQ(mapper0_count, 0u);
1518 EXPECT_EQ(mapper1_count, 4u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001519}
1520
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001521// Tests that we properly produce a valid start time.
Austin Schuhd2f96102020-12-01 20:27:29 -08001522TEST_F(TimestampMapperTest, StartTime) {
1523 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1524 {
1525 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1526 writer0.QueueSpan(config0_.span());
1527 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1528 writer1.QueueSpan(config1_.span());
1529 DetachedBufferWriter writer2(logfile2_, std::make_unique<DummyEncoder>());
1530 writer2.QueueSpan(config3_.span());
1531 }
1532
1533 const std::vector<LogFile> parts =
1534 SortParts({logfile0_, logfile1_, logfile2_});
1535
Austin Schuh79b30942021-01-24 22:32:21 -08001536 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001537 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001538 mapper0.set_timestamp_callback(
1539 [&](TimestampedMessage *) { ++mapper0_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001540
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001541 EXPECT_EQ(mapper0.monotonic_start_time(0), e + chrono::milliseconds(1));
1542 EXPECT_EQ(mapper0.realtime_start_time(0),
Austin Schuhd2f96102020-12-01 20:27:29 -08001543 realtime_clock::time_point(chrono::seconds(1000)));
Austin Schuh79b30942021-01-24 22:32:21 -08001544 EXPECT_EQ(mapper0_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001545}
1546
Austin Schuhfecf1d82020-12-19 16:57:28 -08001547// Tests that when a peer isn't registered, we treat that as if there was no
1548// data available.
1549TEST_F(TimestampMapperTest, NoPeer) {
1550 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1551 {
1552 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1553 writer0.QueueSpan(config0_.span());
1554 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1555 writer1.QueueSpan(config2_.span());
1556
1557 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
1558 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1559 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1560
1561 writer0.QueueSizedFlatbuffer(
1562 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1563 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1564 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1565
1566 writer0.QueueSizedFlatbuffer(
1567 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1568 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1569 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1570 }
1571
1572 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1573
1574 ASSERT_EQ(parts[0].logger_node, "pi1");
1575 ASSERT_EQ(parts[1].logger_node, "pi2");
1576
Austin Schuh79b30942021-01-24 22:32:21 -08001577 size_t mapper1_count = 0;
Austin Schuhfecf1d82020-12-19 16:57:28 -08001578 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001579 mapper1.set_timestamp_callback(
1580 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhfecf1d82020-12-19 16:57:28 -08001581
1582 {
1583 std::deque<TimestampedMessage> output1;
1584
1585 ASSERT_TRUE(mapper1.Front() != nullptr);
1586 output1.emplace_back(std::move(*mapper1.Front()));
1587 mapper1.PopFront();
1588 ASSERT_TRUE(mapper1.Front() != nullptr);
1589 output1.emplace_back(std::move(*mapper1.Front()));
1590 mapper1.PopFront();
1591 ASSERT_TRUE(mapper1.Front() != nullptr);
1592 output1.emplace_back(std::move(*mapper1.Front()));
1593 mapper1.PopFront();
1594 ASSERT_TRUE(mapper1.Front() == nullptr);
1595
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001596 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1597 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001598 e + chrono::seconds(100) + chrono::milliseconds(1000));
1599 EXPECT_FALSE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001600
1601 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1602 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001603 e + chrono::seconds(100) + chrono::milliseconds(2000));
1604 EXPECT_FALSE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001605
1606 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1607 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001608 e + chrono::seconds(100) + chrono::milliseconds(3000));
1609 EXPECT_FALSE(output1[2].data.Verify());
1610 }
Austin Schuh79b30942021-01-24 22:32:21 -08001611 EXPECT_EQ(mapper1_count, 3u);
1612}
1613
1614// Tests that we can queue messages and call the timestamp callback for both
1615// nodes.
1616TEST_F(TimestampMapperTest, QueueUntilNode0) {
1617 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1618 {
1619 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1620 writer0.QueueSpan(config0_.span());
1621 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1622 writer1.QueueSpan(config2_.span());
1623
1624 writer0.QueueSizedFlatbuffer(
1625 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1626 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1627 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1628
1629 writer0.QueueSizedFlatbuffer(
1630 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
1631 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1632 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1633
1634 writer0.QueueSizedFlatbuffer(
1635 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
1636 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1637 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1638
1639 writer0.QueueSizedFlatbuffer(
1640 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
1641 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1642 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1643 }
1644
1645 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1646
1647 ASSERT_EQ(parts[0].logger_node, "pi1");
1648 ASSERT_EQ(parts[1].logger_node, "pi2");
1649
1650 size_t mapper0_count = 0;
1651 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1652 mapper0.set_timestamp_callback(
1653 [&](TimestampedMessage *) { ++mapper0_count; });
1654 size_t mapper1_count = 0;
1655 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1656 mapper1.set_timestamp_callback(
1657 [&](TimestampedMessage *) { ++mapper1_count; });
1658
1659 mapper0.AddPeer(&mapper1);
1660 mapper1.AddPeer(&mapper0);
1661
1662 {
1663 std::deque<TimestampedMessage> output0;
1664
1665 EXPECT_EQ(mapper0_count, 0u);
1666 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001667 mapper0.QueueUntil(
1668 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001669 EXPECT_EQ(mapper0_count, 3u);
1670 EXPECT_EQ(mapper1_count, 0u);
1671
1672 ASSERT_TRUE(mapper0.Front() != nullptr);
1673 EXPECT_EQ(mapper0_count, 3u);
1674 EXPECT_EQ(mapper1_count, 0u);
1675
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001676 mapper0.QueueUntil(
1677 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001678 EXPECT_EQ(mapper0_count, 3u);
1679 EXPECT_EQ(mapper1_count, 0u);
1680
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001681 mapper0.QueueUntil(
1682 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001683 EXPECT_EQ(mapper0_count, 4u);
1684 EXPECT_EQ(mapper1_count, 0u);
1685
1686 output0.emplace_back(std::move(*mapper0.Front()));
1687 mapper0.PopFront();
1688 output0.emplace_back(std::move(*mapper0.Front()));
1689 mapper0.PopFront();
1690 output0.emplace_back(std::move(*mapper0.Front()));
1691 mapper0.PopFront();
1692 output0.emplace_back(std::move(*mapper0.Front()));
1693 mapper0.PopFront();
1694
1695 EXPECT_EQ(mapper0_count, 4u);
1696 EXPECT_EQ(mapper1_count, 0u);
1697
1698 ASSERT_TRUE(mapper0.Front() == nullptr);
1699
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001700 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1701 EXPECT_EQ(output0[0].monotonic_event_time.time,
1702 e + chrono::milliseconds(1000));
Austin Schuh79b30942021-01-24 22:32:21 -08001703 EXPECT_TRUE(output0[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001704
1705 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1706 EXPECT_EQ(output0[1].monotonic_event_time.time,
1707 e + chrono::milliseconds(1000));
Austin Schuh79b30942021-01-24 22:32:21 -08001708 EXPECT_TRUE(output0[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001709
1710 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1711 EXPECT_EQ(output0[2].monotonic_event_time.time,
1712 e + chrono::milliseconds(2000));
Austin Schuh79b30942021-01-24 22:32:21 -08001713 EXPECT_TRUE(output0[2].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001714
1715 EXPECT_EQ(output0[3].monotonic_event_time.boot, 0u);
1716 EXPECT_EQ(output0[3].monotonic_event_time.time,
1717 e + chrono::milliseconds(3000));
Austin Schuh79b30942021-01-24 22:32:21 -08001718 EXPECT_TRUE(output0[3].data.Verify());
1719 }
1720
1721 {
1722 SCOPED_TRACE("Trying node1 now");
1723 std::deque<TimestampedMessage> output1;
1724
1725 EXPECT_EQ(mapper0_count, 4u);
1726 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001727 mapper1.QueueUntil(BootTimestamp{
1728 .boot = 0,
1729 .time = e + chrono::seconds(100) + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001730 EXPECT_EQ(mapper0_count, 4u);
1731 EXPECT_EQ(mapper1_count, 3u);
1732
1733 ASSERT_TRUE(mapper1.Front() != nullptr);
1734 EXPECT_EQ(mapper0_count, 4u);
1735 EXPECT_EQ(mapper1_count, 3u);
1736
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001737 mapper1.QueueUntil(BootTimestamp{
1738 .boot = 0,
1739 .time = e + chrono::seconds(100) + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001740 EXPECT_EQ(mapper0_count, 4u);
1741 EXPECT_EQ(mapper1_count, 3u);
1742
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001743 mapper1.QueueUntil(BootTimestamp{
1744 .boot = 0,
1745 .time = e + chrono::seconds(100) + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001746 EXPECT_EQ(mapper0_count, 4u);
1747 EXPECT_EQ(mapper1_count, 4u);
1748
1749 ASSERT_TRUE(mapper1.Front() != nullptr);
1750 EXPECT_EQ(mapper0_count, 4u);
1751 EXPECT_EQ(mapper1_count, 4u);
1752
1753 output1.emplace_back(std::move(*mapper1.Front()));
1754 mapper1.PopFront();
1755 ASSERT_TRUE(mapper1.Front() != nullptr);
1756 output1.emplace_back(std::move(*mapper1.Front()));
1757 mapper1.PopFront();
1758 ASSERT_TRUE(mapper1.Front() != nullptr);
1759 output1.emplace_back(std::move(*mapper1.Front()));
1760 mapper1.PopFront();
1761 ASSERT_TRUE(mapper1.Front() != nullptr);
1762 output1.emplace_back(std::move(*mapper1.Front()));
1763 mapper1.PopFront();
1764
1765 EXPECT_EQ(mapper0_count, 4u);
1766 EXPECT_EQ(mapper1_count, 4u);
1767
1768 ASSERT_TRUE(mapper1.Front() == nullptr);
1769
1770 EXPECT_EQ(mapper0_count, 4u);
1771 EXPECT_EQ(mapper1_count, 4u);
1772
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001773 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1774 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001775 e + chrono::seconds(100) + chrono::milliseconds(1000));
1776 EXPECT_TRUE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001777
1778 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1779 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001780 e + chrono::seconds(100) + chrono::milliseconds(1000));
1781 EXPECT_TRUE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001782
1783 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1784 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001785 e + chrono::seconds(100) + chrono::milliseconds(2000));
1786 EXPECT_TRUE(output1[2].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001787
1788 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1789 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001790 e + chrono::seconds(100) + chrono::milliseconds(3000));
1791 EXPECT_TRUE(output1[3].data.Verify());
1792 }
Austin Schuhfecf1d82020-12-19 16:57:28 -08001793}
1794
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001795class BootMergerTest : public SortingElementTest {
1796 public:
1797 BootMergerTest()
1798 : SortingElementTest(),
1799 boot0_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07001800 /* 100ms */
1801 "max_out_of_order_duration": 100000000,
1802 "node": {
1803 "name": "pi2"
1804 },
1805 "logger_node": {
1806 "name": "pi1"
1807 },
1808 "monotonic_start_time": 1000000,
1809 "realtime_start_time": 1000000000000,
1810 "logger_monotonic_start_time": 1000000,
1811 "logger_realtime_start_time": 1000000000000,
1812 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
1813 "parts_uuid": "1a9e5ca2-31b2-475b-8282-88f6d1ce5109",
1814 "parts_index": 0,
1815 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
1816 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
1817 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464"
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001818})")),
1819 boot1_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07001820 /* 100ms */
1821 "max_out_of_order_duration": 100000000,
1822 "node": {
1823 "name": "pi2"
1824 },
1825 "logger_node": {
1826 "name": "pi1"
1827 },
1828 "monotonic_start_time": 1000000,
1829 "realtime_start_time": 1000000000000,
1830 "logger_monotonic_start_time": 1000000,
1831 "logger_realtime_start_time": 1000000000000,
1832 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
1833 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
1834 "parts_index": 1,
1835 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
1836 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
1837 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2"
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001838})")) {}
Austin Schuh8bee6882021-06-28 21:03:28 -07001839
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001840 protected:
1841 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0_;
1842 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1_;
1843};
1844
1845// This tests that we can properly sort a multi-node log file which has the old
1846// (and buggy) timestamps in the header, and the non-resetting parts_index.
1847// These make it so we can just bairly figure out what happened first and what
1848// happened second, but not in a way that is robust to multiple nodes rebooting.
1849TEST_F(BootMergerTest, OldReboot) {
Austin Schuh8bee6882021-06-28 21:03:28 -07001850 {
1851 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001852 writer.QueueSpan(boot0_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07001853 }
1854 {
1855 DetachedBufferWriter writer(logfile1_, std::make_unique<DummyEncoder>());
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001856 writer.QueueSpan(boot1_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07001857 }
1858
1859 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1860
1861 ASSERT_EQ(parts.size(), 1u);
1862 ASSERT_EQ(parts[0].parts.size(), 2u);
1863
1864 EXPECT_EQ(parts[0].parts[0].boot_count, 0);
1865 EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001866 boot0_.message().source_node_boot_uuid()->string_view());
Austin Schuh8bee6882021-06-28 21:03:28 -07001867
1868 EXPECT_EQ(parts[0].parts[1].boot_count, 1);
1869 EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001870 boot1_.message().source_node_boot_uuid()->string_view());
1871}
1872
1873// This tests that we can produce messages ordered across a reboot.
1874TEST_F(BootMergerTest, SortAcrossReboot) {
1875 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1876 {
1877 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
1878 writer.QueueSpan(boot0_.span());
1879 writer.QueueSizedFlatbuffer(
1880 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1881 writer.QueueSizedFlatbuffer(
1882 MakeLogMessage(e + chrono::milliseconds(2000), 1, 0x105));
1883 }
1884 {
1885 DetachedBufferWriter writer(logfile1_, std::make_unique<DummyEncoder>());
1886 writer.QueueSpan(boot1_.span());
1887 writer.QueueSizedFlatbuffer(
1888 MakeLogMessage(e + chrono::milliseconds(100), 0, 0x006));
1889 writer.QueueSizedFlatbuffer(
1890 MakeLogMessage(e + chrono::milliseconds(200), 1, 0x106));
1891 }
1892
1893 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1894 ASSERT_EQ(parts.size(), 1u);
1895 ASSERT_EQ(parts[0].parts.size(), 2u);
1896
1897 BootMerger merger(FilterPartsForNode(parts, "pi2"));
1898
1899 EXPECT_EQ(merger.node(), 1u);
1900
1901 std::vector<Message> output;
1902 for (int i = 0; i < 4; ++i) {
1903 ASSERT_TRUE(merger.Front() != nullptr);
1904 output.emplace_back(std::move(*merger.Front()));
1905 merger.PopFront();
1906 }
1907
1908 ASSERT_TRUE(merger.Front() == nullptr);
1909
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001910 EXPECT_EQ(output[0].timestamp.boot, 0u);
1911 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
1912 EXPECT_EQ(output[1].timestamp.boot, 0u);
1913 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(2000));
1914
1915 EXPECT_EQ(output[2].timestamp.boot, 1u);
1916 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(100));
1917 EXPECT_EQ(output[3].timestamp.boot, 1u);
1918 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
Austin Schuh8bee6882021-06-28 21:03:28 -07001919}
1920
Austin Schuhc243b422020-10-11 15:35:08 -07001921} // namespace testing
1922} // namespace logger
1923} // namespace aos