blob: d5cb41bb3d6b2f232e31b761462481cd92d5a189 [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
4#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07005
Austin Schuhc41603c2020-10-11 16:17:37 -07006#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -07007#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -08008#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -07009#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070010#include "aos/json_to_flatbuffer.h"
11#include "aos/testing/tmpdir.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070012#include "gtest/gtest.h"
Austin Schuhc243b422020-10-11 15:35:08 -070013
14namespace aos {
15namespace logger {
16namespace testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070017namespace chrono = std::chrono;
Austin Schuhc243b422020-10-11 15:35:08 -070018
Austin Schuhe243aaf2020-10-11 15:46:02 -070019// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070020template <typename T>
21SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
22 const std::string_view data) {
23 flatbuffers::FlatBufferBuilder fbb;
24 fbb.ForceDefaults(true);
25 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
26 return fbb.Release();
27}
28
Austin Schuhe243aaf2020-10-11 15:46:02 -070029// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070030TEST(SpanReaderTest, ReadWrite) {
31 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
32 unlink(logfile.c_str());
33
34 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080035 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070036 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080037 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070038
39 {
40 DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -080041 writer.QueueSpan(m1.span());
42 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070043 }
44
45 SpanReader reader(logfile);
46
47 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -080048 EXPECT_EQ(reader.ReadMessage(), m1.span());
49 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070050 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
51}
52
Austin Schuhe243aaf2020-10-11 15:46:02 -070053// Tests that we can actually parse the resulting messages at a basic level
54// through MessageReader.
55TEST(MessageReaderTest, ReadWrite) {
56 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
57 unlink(logfile.c_str());
58
59 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
60 JsonToSizedFlatbuffer<LogFileHeader>(
61 R"({ "max_out_of_order_duration": 100000000 })");
62 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
63 JsonToSizedFlatbuffer<MessageHeader>(
64 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
65 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
66 JsonToSizedFlatbuffer<MessageHeader>(
67 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
68
69 {
70 DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -080071 writer.QueueSpan(config.span());
72 writer.QueueSpan(m1.span());
73 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -070074 }
75
76 MessageReader reader(logfile);
77
78 EXPECT_EQ(reader.filename(), logfile);
79
80 EXPECT_EQ(
81 reader.max_out_of_order_duration(),
82 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
83 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
84 EXPECT_TRUE(reader.ReadMessage());
85 EXPECT_EQ(reader.newest_timestamp(),
86 monotonic_clock::time_point(chrono::nanoseconds(1)));
87 EXPECT_TRUE(reader.ReadMessage());
88 EXPECT_EQ(reader.newest_timestamp(),
89 monotonic_clock::time_point(chrono::nanoseconds(2)));
90 EXPECT_FALSE(reader.ReadMessage());
91}
92
Austin Schuh32f68492020-11-08 21:45:51 -080093// Tests that we explode when messages are too far out of order.
94TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
95 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
96 unlink(logfile0.c_str());
97
98 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
99 JsonToSizedFlatbuffer<LogFileHeader>(
100 R"({
101 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800102 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800103 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
104 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
105 "parts_index": 0
106})");
107
108 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
109 JsonToSizedFlatbuffer<MessageHeader>(
110 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
111 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
112 JsonToSizedFlatbuffer<MessageHeader>(
113 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
114 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
115 JsonToSizedFlatbuffer<MessageHeader>(
116 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
117
118 {
119 DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800120 writer.QueueSpan(config0.span());
121 writer.QueueSpan(m1.span());
122 writer.QueueSpan(m2.span());
123 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800124 }
125
126 const std::vector<LogFile> parts = SortParts({logfile0});
127
128 PartsMessageReader reader(parts[0].parts[0]);
129
130 EXPECT_TRUE(reader.ReadMessage());
131 EXPECT_TRUE(reader.ReadMessage());
132 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
133}
134
Austin Schuhc41603c2020-10-11 16:17:37 -0700135// Tests that we can transparently re-assemble part files with a
136// PartsMessageReader.
137TEST(PartsMessageReaderTest, ReadWrite) {
138 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
139 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
140 unlink(logfile0.c_str());
141 unlink(logfile1.c_str());
142
143 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
144 JsonToSizedFlatbuffer<LogFileHeader>(
145 R"({
146 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800147 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700148 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
149 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
150 "parts_index": 0
151})");
152 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
153 JsonToSizedFlatbuffer<LogFileHeader>(
154 R"({
155 "max_out_of_order_duration": 200000000,
156 "monotonic_start_time": 0,
157 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800158 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700159 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
160 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
161 "parts_index": 1
162})");
163
164 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
165 JsonToSizedFlatbuffer<MessageHeader>(
166 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
167 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
168 JsonToSizedFlatbuffer<MessageHeader>(
169 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
170
171 {
172 DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800173 writer.QueueSpan(config0.span());
174 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700175 }
176 {
177 DetachedBufferWriter writer(logfile1, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800178 writer.QueueSpan(config1.span());
179 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700180 }
181
182 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
183
184 PartsMessageReader reader(parts[0].parts[0]);
185
186 EXPECT_EQ(reader.filename(), logfile0);
187
188 // Confirm that the timestamps track, and the filename also updates.
189 // Read the first message.
190 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
191 EXPECT_EQ(
192 reader.max_out_of_order_duration(),
193 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
194 EXPECT_TRUE(reader.ReadMessage());
195 EXPECT_EQ(reader.filename(), logfile0);
196 EXPECT_EQ(reader.newest_timestamp(),
197 monotonic_clock::time_point(chrono::nanoseconds(1)));
198 EXPECT_EQ(
199 reader.max_out_of_order_duration(),
200 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
201
202 // Read the second message.
203 EXPECT_TRUE(reader.ReadMessage());
204 EXPECT_EQ(reader.filename(), logfile1);
205 EXPECT_EQ(reader.newest_timestamp(),
206 monotonic_clock::time_point(chrono::nanoseconds(2)));
207 EXPECT_EQ(
208 reader.max_out_of_order_duration(),
209 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
210
211 // And then confirm that reading again returns no message.
212 EXPECT_FALSE(reader.ReadMessage());
213 EXPECT_EQ(reader.filename(), logfile1);
214 EXPECT_EQ(
215 reader.max_out_of_order_duration(),
216 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800217 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Austin Schuhc41603c2020-10-11 16:17:37 -0700218}
Austin Schuh32f68492020-11-08 21:45:51 -0800219
Austin Schuh1be0ce42020-11-29 22:43:26 -0800220// Tests that Message's operator < works as expected.
221TEST(MessageTest, Sorting) {
222 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
223
224 Message m1{.channel_index = 0,
225 .queue_index = 0,
226 .timestamp = e + chrono::milliseconds(1),
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700227 .boot_count = 0,
Austin Schuh1be0ce42020-11-29 22:43:26 -0800228 .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
229 Message m2{.channel_index = 0,
230 .queue_index = 0,
231 .timestamp = e + chrono::milliseconds(2),
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700232 .boot_count = 0,
Austin Schuh1be0ce42020-11-29 22:43:26 -0800233 .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
234
235 EXPECT_LT(m1, m2);
236 EXPECT_GE(m2, m1);
237
238 m1.timestamp = e;
239 m2.timestamp = e;
240
241 m1.channel_index = 1;
242 m2.channel_index = 2;
243
244 EXPECT_LT(m1, m2);
245 EXPECT_GE(m2, m1);
246
247 m1.channel_index = 0;
248 m2.channel_index = 0;
249 m1.queue_index = 0;
250 m2.queue_index = 1;
251
252 EXPECT_LT(m1, m2);
253 EXPECT_GE(m2, m1);
254}
255
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800256aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
257 const aos::FlatbufferDetachedBuffer<Configuration> &config,
258 const std::string_view json) {
259 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh1034a832021-03-31 21:53:26 -0700260 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800261 flatbuffers::Offset<Configuration> config_offset =
262 aos::CopyFlatBuffer(config, &fbb);
263 LogFileHeader::Builder header_builder(fbb);
264 header_builder.add_configuration(config_offset);
265 fbb.Finish(header_builder.Finish());
266 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
267
268 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
269 JsonToFlatbuffer<LogFileHeader>(json));
270 CHECK(header_updates.Verify());
271 flatbuffers::FlatBufferBuilder fbb2;
Austin Schuh1034a832021-03-31 21:53:26 -0700272 fbb2.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800273 fbb2.FinishSizePrefixed(
274 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
275 return fbb2.Release();
276}
277
278class SortingElementTest : public ::testing::Test {
279 public:
280 SortingElementTest()
281 : config_(JsonToFlatbuffer<Configuration>(
282 R"({
283 "channels": [
284 {
285 "name": "/a",
286 "type": "aos.logger.testing.TestMessage",
287 "source_node": "pi1",
288 "destination_nodes": [
289 {
290 "name": "pi2"
291 },
292 {
293 "name": "pi3"
294 }
295 ]
296 },
297 {
298 "name": "/b",
299 "type": "aos.logger.testing.TestMessage",
300 "source_node": "pi1"
301 },
302 {
303 "name": "/c",
304 "type": "aos.logger.testing.TestMessage",
305 "source_node": "pi1"
306 }
307 ],
308 "nodes": [
309 {
310 "name": "pi1"
311 },
312 {
313 "name": "pi2"
314 },
315 {
316 "name": "pi3"
317 }
318 ]
319}
320)")),
321 config0_(MakeHeader(config_, R"({
322 /* 100ms */
323 "max_out_of_order_duration": 100000000,
324 "node": {
325 "name": "pi1"
326 },
327 "logger_node": {
328 "name": "pi1"
329 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800330 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800331 "realtime_start_time": 1000000000000,
332 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
333 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
334 "parts_index": 0
335})")),
336 config1_(MakeHeader(config_,
337 R"({
338 /* 100ms */
339 "max_out_of_order_duration": 100000000,
340 "node": {
341 "name": "pi1"
342 },
343 "logger_node": {
344 "name": "pi1"
345 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800346 "monotonic_start_time": 1000000,
347 "realtime_start_time": 1000000000000,
348 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
349 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
350 "parts_index": 0
351})")),
352 config2_(MakeHeader(config_,
353 R"({
354 /* 100ms */
355 "max_out_of_order_duration": 100000000,
356 "node": {
357 "name": "pi2"
358 },
359 "logger_node": {
360 "name": "pi2"
361 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800362 "monotonic_start_time": 0,
363 "realtime_start_time": 1000000000000,
Austin Schuhd2f96102020-12-01 20:27:29 -0800364 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
365 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
366 "parts_index": 0
367})")),
368 config3_(MakeHeader(config_,
369 R"({
370 /* 100ms */
371 "max_out_of_order_duration": 100000000,
372 "node": {
373 "name": "pi1"
374 },
375 "logger_node": {
376 "name": "pi1"
377 },
378 "monotonic_start_time": 2000000,
379 "realtime_start_time": 1000000000,
380 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
381 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800382 "parts_index": 0
Austin Schuh8bf1e632021-01-02 22:41:04 -0800383})")),
384 config4_(MakeHeader(config_,
385 R"({
386 /* 100ms */
387 "max_out_of_order_duration": 100000000,
388 "node": {
389 "name": "pi2"
390 },
391 "logger_node": {
392 "name": "pi1"
393 },
394 "monotonic_start_time": 2000000,
395 "realtime_start_time": 1000000000,
396 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
397 "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
398 "parts_index": 0
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800399})")) {
400 unlink(logfile0_.c_str());
401 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800402 unlink(logfile2_.c_str());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800403 queue_index_.resize(kChannels);
404 }
405
406 protected:
407 static constexpr size_t kChannels = 3u;
408
409 flatbuffers::DetachedBuffer MakeLogMessage(
410 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
411 int value) {
412 flatbuffers::FlatBufferBuilder message_fbb;
413 message_fbb.ForceDefaults(true);
414 TestMessage::Builder test_message_builder(message_fbb);
415 test_message_builder.add_value(value);
416 message_fbb.Finish(test_message_builder.Finish());
417
418 aos::Context context;
419 context.monotonic_event_time = monotonic_now;
420 context.realtime_event_time = aos::realtime_clock::epoch() +
421 chrono::seconds(1000) +
422 monotonic_now.time_since_epoch();
423 context.queue_index = queue_index_[channel_index];
424 context.size = message_fbb.GetSize();
425 context.data = message_fbb.GetBufferPointer();
426
427 ++queue_index_[channel_index];
428
429 flatbuffers::FlatBufferBuilder fbb;
430 fbb.FinishSizePrefixed(
431 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
432
433 return fbb.Release();
434 }
435
436 flatbuffers::DetachedBuffer MakeTimestampMessage(
437 const aos::monotonic_clock::time_point sender_monotonic_now,
Austin Schuh8bf1e632021-01-02 22:41:04 -0800438 int channel_index, chrono::nanoseconds receiver_monotonic_offset,
439 monotonic_clock::time_point monotonic_timestamp_time =
440 monotonic_clock::min_time) {
441 const monotonic_clock::time_point monotonic_sent_time =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800442 sender_monotonic_now + receiver_monotonic_offset;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800443
444 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800445 fbb.ForceDefaults(true);
446
447 logger::MessageHeader::Builder message_header_builder(fbb);
448
449 message_header_builder.add_channel_index(channel_index);
450
451 message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
452 100);
453 message_header_builder.add_monotonic_sent_time(
454 monotonic_sent_time.time_since_epoch().count());
455 message_header_builder.add_realtime_sent_time(
456 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
457 monotonic_sent_time.time_since_epoch())
458 .time_since_epoch()
459 .count());
460
461 message_header_builder.add_monotonic_remote_time(
462 sender_monotonic_now.time_since_epoch().count());
463 message_header_builder.add_realtime_remote_time(
464 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
465 sender_monotonic_now.time_since_epoch())
466 .time_since_epoch()
467 .count());
468 message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
469 1);
470
471 if (monotonic_timestamp_time != monotonic_clock::min_time) {
472 message_header_builder.add_monotonic_timestamp_time(
473 monotonic_timestamp_time.time_since_epoch().count());
474 }
475
476 fbb.FinishSizePrefixed(message_header_builder.Finish());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800477 LOG(INFO) << aos::FlatbufferToJson(
478 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
479 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
480
481 return fbb.Release();
482 }
483
484 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
485 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800486 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800487
488 const aos::FlatbufferDetachedBuffer<Configuration> config_;
489 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
490 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800491 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
492 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800493 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800494
495 std::vector<uint32_t> queue_index_;
496};
497
498using LogPartsSorterTest = SortingElementTest;
499using LogPartsSorterDeathTest = LogPartsSorterTest;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800500using NodeMergerTest = SortingElementTest;
Austin Schuhd2f96102020-12-01 20:27:29 -0800501using TimestampMapperTest = SortingElementTest;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800502
503// Tests that we can pull messages out of a log sorted in order.
504TEST_F(LogPartsSorterTest, Pull) {
505 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
506 {
507 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
508 writer.QueueSpan(config0_.span());
509 writer.QueueSizedFlatbuffer(
510 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
511 writer.QueueSizedFlatbuffer(
512 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
513 writer.QueueSizedFlatbuffer(
514 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
515 writer.QueueSizedFlatbuffer(
516 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
517 }
518
519 const std::vector<LogFile> parts = SortParts({logfile0_});
520
521 LogPartsSorter parts_sorter(parts[0].parts[0]);
522
523 // Confirm we aren't sorted until any time until the message is popped.
524 // Peeking shouldn't change the sorted until time.
525 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
526
527 std::deque<Message> output;
528
529 ASSERT_TRUE(parts_sorter.Front() != nullptr);
530 output.emplace_back(std::move(*parts_sorter.Front()));
531 parts_sorter.PopFront();
532 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
533
534 ASSERT_TRUE(parts_sorter.Front() != nullptr);
535 output.emplace_back(std::move(*parts_sorter.Front()));
536 parts_sorter.PopFront();
537 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
538
539 ASSERT_TRUE(parts_sorter.Front() != nullptr);
540 output.emplace_back(std::move(*parts_sorter.Front()));
541 parts_sorter.PopFront();
542 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
543
544 ASSERT_TRUE(parts_sorter.Front() != nullptr);
545 output.emplace_back(std::move(*parts_sorter.Front()));
546 parts_sorter.PopFront();
547 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
548
549 ASSERT_TRUE(parts_sorter.Front() == nullptr);
550
551 EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(1000));
552 EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(1000));
553 EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(1901));
554 EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(2000));
555}
556
Austin Schuhb000de62020-12-03 22:00:40 -0800557// Tests that we can pull messages out of a log sorted in order.
558TEST_F(LogPartsSorterTest, WayBeforeStart) {
559 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
560 {
561 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
562 writer.QueueSpan(config0_.span());
563 writer.QueueSizedFlatbuffer(
564 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
565 writer.QueueSizedFlatbuffer(
566 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
567 writer.QueueSizedFlatbuffer(
568 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
569 writer.QueueSizedFlatbuffer(
570 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
571 writer.QueueSizedFlatbuffer(
572 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
573 }
574
575 const std::vector<LogFile> parts = SortParts({logfile0_});
576
577 LogPartsSorter parts_sorter(parts[0].parts[0]);
578
579 // Confirm we aren't sorted until any time until the message is popped.
580 // Peeking shouldn't change the sorted until time.
581 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
582
583 std::deque<Message> output;
584
585 for (monotonic_clock::time_point t :
586 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
587 e + chrono::milliseconds(1900), monotonic_clock::max_time,
588 monotonic_clock::max_time}) {
589 ASSERT_TRUE(parts_sorter.Front() != nullptr);
590 output.emplace_back(std::move(*parts_sorter.Front()));
591 parts_sorter.PopFront();
592 EXPECT_EQ(parts_sorter.sorted_until(), t);
593 }
594
595 ASSERT_TRUE(parts_sorter.Front() == nullptr);
596
597 EXPECT_EQ(output[0].timestamp, e - chrono::milliseconds(1000));
598 EXPECT_EQ(output[1].timestamp, e - chrono::milliseconds(500));
599 EXPECT_EQ(output[2].timestamp, e - chrono::milliseconds(10));
600 EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(1901));
601 EXPECT_EQ(output[4].timestamp, e + chrono::milliseconds(2000));
602}
603
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800604// Tests that messages too far out of order trigger death.
605TEST_F(LogPartsSorterDeathTest, Pull) {
606 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
607 {
608 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
609 writer.QueueSpan(config0_.span());
610 writer.QueueSizedFlatbuffer(
611 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
612 writer.QueueSizedFlatbuffer(
613 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
614 writer.QueueSizedFlatbuffer(
615 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
616 // The following message is too far out of order and will trigger the CHECK.
617 writer.QueueSizedFlatbuffer(
618 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
619 }
620
621 const std::vector<LogFile> parts = SortParts({logfile0_});
622
623 LogPartsSorter parts_sorter(parts[0].parts[0]);
624
625 // Confirm we aren't sorted until any time until the message is popped.
626 // Peeking shouldn't change the sorted until time.
627 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
628 std::deque<Message> output;
629
630 ASSERT_TRUE(parts_sorter.Front() != nullptr);
631 parts_sorter.PopFront();
632 ASSERT_TRUE(parts_sorter.Front() != nullptr);
633 ASSERT_TRUE(parts_sorter.Front() != nullptr);
634 parts_sorter.PopFront();
635
Austin Schuha040c3f2021-02-13 16:09:07 -0800636 EXPECT_DEATH({ parts_sorter.Front(); }, "Max out of order of 100000000ns exceeded.");
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800637}
638
Austin Schuh8f52ed52020-11-30 23:12:39 -0800639// Tests that we can merge data from 2 separate files, including duplicate data.
640TEST_F(NodeMergerTest, TwoFileMerger) {
641 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
642 {
643 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
644 writer0.QueueSpan(config0_.span());
645 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
646 writer1.QueueSpan(config1_.span());
647
648 writer0.QueueSizedFlatbuffer(
649 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
650 writer1.QueueSizedFlatbuffer(
651 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
652
653 writer0.QueueSizedFlatbuffer(
654 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
655 writer1.QueueSizedFlatbuffer(
656 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
657
658 // Make a duplicate!
659 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
660 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
661 writer0.QueueSpan(msg.span());
662 writer1.QueueSpan(msg.span());
663
664 writer1.QueueSizedFlatbuffer(
665 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
666 }
667
668 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Austin Schuhd2f96102020-12-01 20:27:29 -0800669 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800670
Austin Schuhd2f96102020-12-01 20:27:29 -0800671 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800672
673 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
674
675 std::deque<Message> output;
676
677 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
678 ASSERT_TRUE(merger.Front() != nullptr);
679 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
680
681 output.emplace_back(std::move(*merger.Front()));
682 merger.PopFront();
683 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
684
685 ASSERT_TRUE(merger.Front() != nullptr);
686 output.emplace_back(std::move(*merger.Front()));
687 merger.PopFront();
688 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
689
690 ASSERT_TRUE(merger.Front() != nullptr);
691 output.emplace_back(std::move(*merger.Front()));
692 merger.PopFront();
693 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
694
695 ASSERT_TRUE(merger.Front() != nullptr);
696 output.emplace_back(std::move(*merger.Front()));
697 merger.PopFront();
698 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
699
700 ASSERT_TRUE(merger.Front() != nullptr);
701 output.emplace_back(std::move(*merger.Front()));
702 merger.PopFront();
703 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
704
705 ASSERT_TRUE(merger.Front() != nullptr);
706 output.emplace_back(std::move(*merger.Front()));
707 merger.PopFront();
708 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
709
710 ASSERT_TRUE(merger.Front() == nullptr);
711
712 EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(1000));
713 EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(1001));
714 EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(1002));
715 EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(2000));
716 EXPECT_EQ(output[4].timestamp, e + chrono::milliseconds(3000));
717 EXPECT_EQ(output[5].timestamp, e + chrono::milliseconds(3002));
718}
719
Austin Schuh8bf1e632021-01-02 22:41:04 -0800720// Tests that we can merge timestamps with various combinations of
721// monotonic_timestamp_time.
722TEST_F(NodeMergerTest, TwoFileTimestampMerger) {
723 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
724 {
725 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
726 writer0.QueueSpan(config0_.span());
727 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
728 writer1.QueueSpan(config1_.span());
729
730 // Neither has it.
731 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
732 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
733 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
734 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
735 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
736
737 // First only has it.
738 MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
739 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
740 e + chrono::milliseconds(1001), 0, chrono::seconds(100),
741 e + chrono::nanoseconds(971)));
742 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
743 e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
744
745 // Second only has it.
746 MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
747 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
748 e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
749 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
750 e + chrono::milliseconds(1002), 0, chrono::seconds(100),
751 e + chrono::nanoseconds(972)));
752
753 // Both have it.
754 MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
755 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
756 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
757 e + chrono::nanoseconds(973)));
758 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
759 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
760 e + chrono::nanoseconds(973)));
761 }
762
763 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
764 ASSERT_EQ(parts.size(), 1u);
765
766 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
767
768 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
769
770 std::deque<Message> output;
771
772 for (int i = 0; i < 4; ++i) {
773 ASSERT_TRUE(merger.Front() != nullptr);
774 output.emplace_back(std::move(*merger.Front()));
775 merger.PopFront();
776 }
777 ASSERT_TRUE(merger.Front() == nullptr);
778
779 EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(101000));
780 EXPECT_FALSE(output[0].data.message().has_monotonic_timestamp_time());
781 EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(101001));
782 EXPECT_TRUE(output[1].data.message().has_monotonic_timestamp_time());
783 EXPECT_EQ(output[1].data.message().monotonic_timestamp_time(), 971);
784 EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(101002));
785 EXPECT_TRUE(output[2].data.message().has_monotonic_timestamp_time());
786 EXPECT_EQ(output[2].data.message().monotonic_timestamp_time(), 972);
787 EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(101003));
788 EXPECT_TRUE(output[3].data.message().has_monotonic_timestamp_time());
789 EXPECT_EQ(output[3].data.message().monotonic_timestamp_time(), 973);
790}
791
Austin Schuhd2f96102020-12-01 20:27:29 -0800792// Tests that we can match timestamps on delivered messages.
793TEST_F(TimestampMapperTest, ReadNode0First) {
794 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
795 {
796 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
797 writer0.QueueSpan(config0_.span());
798 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
799 writer1.QueueSpan(config2_.span());
800
801 writer0.QueueSizedFlatbuffer(
802 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
803 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
804 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
805
806 writer0.QueueSizedFlatbuffer(
807 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
808 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
809 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
810
811 writer0.QueueSizedFlatbuffer(
812 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
813 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
814 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
815 }
816
817 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
818
819 ASSERT_EQ(parts[0].logger_node, "pi1");
820 ASSERT_EQ(parts[1].logger_node, "pi2");
821
Austin Schuh79b30942021-01-24 22:32:21 -0800822 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800823 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -0800824 mapper0.set_timestamp_callback(
825 [&](TimestampedMessage *) { ++mapper0_count; });
826 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800827 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -0800828 mapper1.set_timestamp_callback(
829 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -0800830
831 mapper0.AddPeer(&mapper1);
832 mapper1.AddPeer(&mapper0);
833
834 {
835 std::deque<TimestampedMessage> output0;
836
Austin Schuh79b30942021-01-24 22:32:21 -0800837 EXPECT_EQ(mapper0_count, 0u);
838 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800839 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800840 EXPECT_EQ(mapper0_count, 1u);
841 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800842 output0.emplace_back(std::move(*mapper0.Front()));
843 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700844 EXPECT_TRUE(mapper0.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800845 EXPECT_EQ(mapper0_count, 1u);
846 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800847
848 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800849 EXPECT_EQ(mapper0_count, 2u);
850 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800851 output0.emplace_back(std::move(*mapper0.Front()));
852 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700853 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800854
855 ASSERT_TRUE(mapper0.Front() != nullptr);
856 output0.emplace_back(std::move(*mapper0.Front()));
857 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700858 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800859
Austin Schuh79b30942021-01-24 22:32:21 -0800860 EXPECT_EQ(mapper0_count, 3u);
861 EXPECT_EQ(mapper1_count, 0u);
862
Austin Schuhd2f96102020-12-01 20:27:29 -0800863 ASSERT_TRUE(mapper0.Front() == nullptr);
864
865 EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
866 EXPECT_TRUE(output0[0].data.Verify());
867 EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
868 EXPECT_TRUE(output0[1].data.Verify());
869 EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
870 EXPECT_TRUE(output0[2].data.Verify());
871 }
872
873 {
874 SCOPED_TRACE("Trying node1 now");
875 std::deque<TimestampedMessage> output1;
876
Austin Schuh79b30942021-01-24 22:32:21 -0800877 EXPECT_EQ(mapper0_count, 3u);
878 EXPECT_EQ(mapper1_count, 0u);
879
Austin Schuhd2f96102020-12-01 20:27:29 -0800880 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800881 EXPECT_EQ(mapper0_count, 3u);
882 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800883 output1.emplace_back(std::move(*mapper1.Front()));
884 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700885 EXPECT_TRUE(mapper1.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800886 EXPECT_EQ(mapper0_count, 3u);
887 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800888
889 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800890 EXPECT_EQ(mapper0_count, 3u);
891 EXPECT_EQ(mapper1_count, 2u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800892 output1.emplace_back(std::move(*mapper1.Front()));
893 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700894 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800895
896 ASSERT_TRUE(mapper1.Front() != nullptr);
897 output1.emplace_back(std::move(*mapper1.Front()));
898 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700899 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800900
Austin Schuh79b30942021-01-24 22:32:21 -0800901 EXPECT_EQ(mapper0_count, 3u);
902 EXPECT_EQ(mapper1_count, 3u);
903
Austin Schuhd2f96102020-12-01 20:27:29 -0800904 ASSERT_TRUE(mapper1.Front() == nullptr);
905
Austin Schuh79b30942021-01-24 22:32:21 -0800906 EXPECT_EQ(mapper0_count, 3u);
907 EXPECT_EQ(mapper1_count, 3u);
908
Austin Schuhd2f96102020-12-01 20:27:29 -0800909 EXPECT_EQ(output1[0].monotonic_event_time,
910 e + chrono::seconds(100) + chrono::milliseconds(1000));
911 EXPECT_TRUE(output1[0].data.Verify());
912 EXPECT_EQ(output1[1].monotonic_event_time,
913 e + chrono::seconds(100) + chrono::milliseconds(2000));
914 EXPECT_TRUE(output1[1].data.Verify());
915 EXPECT_EQ(output1[2].monotonic_event_time,
916 e + chrono::seconds(100) + chrono::milliseconds(3000));
917 EXPECT_TRUE(output1[2].data.Verify());
918 }
919}
920
Austin Schuh8bf1e632021-01-02 22:41:04 -0800921// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
922// returned.
923TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
924 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
925 {
926 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
927 writer0.QueueSpan(config0_.span());
928 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
929 writer1.QueueSpan(config4_.span());
930
931 writer0.QueueSizedFlatbuffer(
932 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
933 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
934 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
935 e + chrono::nanoseconds(971)));
936
937 writer0.QueueSizedFlatbuffer(
938 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
939 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
940 e + chrono::milliseconds(2000), 0, chrono::seconds(100),
941 e + chrono::nanoseconds(5458)));
942
943 writer0.QueueSizedFlatbuffer(
944 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
945 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
946 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
947 }
948
949 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
950
951 for (const auto &p : parts) {
952 LOG(INFO) << p;
953 }
954
955 ASSERT_EQ(parts.size(), 1u);
956
Austin Schuh79b30942021-01-24 22:32:21 -0800957 size_t mapper0_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800958 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -0800959 mapper0.set_timestamp_callback(
960 [&](TimestampedMessage *) { ++mapper0_count; });
961 size_t mapper1_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800962 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -0800963 mapper1.set_timestamp_callback(
964 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh8bf1e632021-01-02 22:41:04 -0800965
966 mapper0.AddPeer(&mapper1);
967 mapper1.AddPeer(&mapper0);
968
969 {
970 std::deque<TimestampedMessage> output0;
971
972 for (int i = 0; i < 3; ++i) {
973 ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
974 output0.emplace_back(std::move(*mapper0.Front()));
975 mapper0.PopFront();
976 }
977
978 ASSERT_TRUE(mapper0.Front() == nullptr);
979
980 EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
981 EXPECT_EQ(output0[0].monotonic_timestamp_time, monotonic_clock::min_time);
982 EXPECT_TRUE(output0[0].data.Verify());
983 EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
984 EXPECT_EQ(output0[1].monotonic_timestamp_time, monotonic_clock::min_time);
985 EXPECT_TRUE(output0[1].data.Verify());
986 EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
987 EXPECT_EQ(output0[2].monotonic_timestamp_time, monotonic_clock::min_time);
988 EXPECT_TRUE(output0[2].data.Verify());
989 }
990
991 {
992 SCOPED_TRACE("Trying node1 now");
993 std::deque<TimestampedMessage> output1;
994
995 for (int i = 0; i < 3; ++i) {
996 ASSERT_TRUE(mapper1.Front() != nullptr);
997 output1.emplace_back(std::move(*mapper1.Front()));
998 mapper1.PopFront();
999 }
1000
1001 ASSERT_TRUE(mapper1.Front() == nullptr);
1002
1003 EXPECT_EQ(output1[0].monotonic_event_time,
1004 e + chrono::seconds(100) + chrono::milliseconds(1000));
1005 EXPECT_EQ(output1[0].monotonic_timestamp_time,
1006 e + chrono::nanoseconds(971));
1007 EXPECT_TRUE(output1[0].data.Verify());
1008 EXPECT_EQ(output1[1].monotonic_event_time,
1009 e + chrono::seconds(100) + chrono::milliseconds(2000));
1010 EXPECT_EQ(output1[1].monotonic_timestamp_time,
1011 e + chrono::nanoseconds(5458));
1012 EXPECT_TRUE(output1[1].data.Verify());
1013 EXPECT_EQ(output1[2].monotonic_event_time,
1014 e + chrono::seconds(100) + chrono::milliseconds(3000));
1015 EXPECT_EQ(output1[2].monotonic_timestamp_time, monotonic_clock::min_time);
1016 EXPECT_TRUE(output1[2].data.Verify());
1017 }
Austin Schuh79b30942021-01-24 22:32:21 -08001018
1019 EXPECT_EQ(mapper0_count, 3u);
1020 EXPECT_EQ(mapper1_count, 3u);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001021}
1022
Austin Schuhd2f96102020-12-01 20:27:29 -08001023// Tests that we can match timestamps on delivered messages. By doing this in
1024// the reverse order, the second node needs to queue data up from the first node
1025// to find the matching timestamp.
1026TEST_F(TimestampMapperTest, ReadNode1First) {
1027 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1028 {
1029 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1030 writer0.QueueSpan(config0_.span());
1031 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1032 writer1.QueueSpan(config2_.span());
1033
1034 writer0.QueueSizedFlatbuffer(
1035 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1036 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1037 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1038
1039 writer0.QueueSizedFlatbuffer(
1040 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1041 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1042 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1043
1044 writer0.QueueSizedFlatbuffer(
1045 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1046 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1047 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1048 }
1049
1050 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1051
1052 ASSERT_EQ(parts[0].logger_node, "pi1");
1053 ASSERT_EQ(parts[1].logger_node, "pi2");
1054
Austin Schuh79b30942021-01-24 22:32:21 -08001055 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001056 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001057 mapper0.set_timestamp_callback(
1058 [&](TimestampedMessage *) { ++mapper0_count; });
1059 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001060 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001061 mapper1.set_timestamp_callback(
1062 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001063
1064 mapper0.AddPeer(&mapper1);
1065 mapper1.AddPeer(&mapper0);
1066
1067 {
1068 SCOPED_TRACE("Trying node1 now");
1069 std::deque<TimestampedMessage> output1;
1070
1071 ASSERT_TRUE(mapper1.Front() != nullptr);
1072 output1.emplace_back(std::move(*mapper1.Front()));
1073 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001074 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001075
1076 ASSERT_TRUE(mapper1.Front() != nullptr);
1077 output1.emplace_back(std::move(*mapper1.Front()));
1078 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001079 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001080
1081 ASSERT_TRUE(mapper1.Front() != nullptr);
1082 output1.emplace_back(std::move(*mapper1.Front()));
1083 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001084 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001085
1086 ASSERT_TRUE(mapper1.Front() == nullptr);
1087
1088 EXPECT_EQ(output1[0].monotonic_event_time,
1089 e + chrono::seconds(100) + chrono::milliseconds(1000));
1090 EXPECT_TRUE(output1[0].data.Verify());
1091 EXPECT_EQ(output1[1].monotonic_event_time,
1092 e + chrono::seconds(100) + chrono::milliseconds(2000));
1093 EXPECT_TRUE(output1[1].data.Verify());
1094 EXPECT_EQ(output1[2].monotonic_event_time,
1095 e + chrono::seconds(100) + chrono::milliseconds(3000));
1096 EXPECT_TRUE(output1[2].data.Verify());
1097 }
1098
1099 {
1100 std::deque<TimestampedMessage> output0;
1101
1102 ASSERT_TRUE(mapper0.Front() != nullptr);
1103 output0.emplace_back(std::move(*mapper0.Front()));
1104 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001105 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001106
1107 ASSERT_TRUE(mapper0.Front() != nullptr);
1108 output0.emplace_back(std::move(*mapper0.Front()));
1109 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001110 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001111
1112 ASSERT_TRUE(mapper0.Front() != nullptr);
1113 output0.emplace_back(std::move(*mapper0.Front()));
1114 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001115 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001116
1117 ASSERT_TRUE(mapper0.Front() == nullptr);
1118
1119 EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
1120 EXPECT_TRUE(output0[0].data.Verify());
1121 EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
1122 EXPECT_TRUE(output0[1].data.Verify());
1123 EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
1124 EXPECT_TRUE(output0[2].data.Verify());
1125 }
Austin Schuh79b30942021-01-24 22:32:21 -08001126
1127 EXPECT_EQ(mapper0_count, 3u);
1128 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001129}
1130
1131// Tests that we return just the timestamps if we couldn't find the data and the
1132// missing data was at the beginning of the file.
1133TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
1134 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1135 {
1136 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1137 writer0.QueueSpan(config0_.span());
1138 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1139 writer1.QueueSpan(config2_.span());
1140
1141 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
1142 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1143 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1144
1145 writer0.QueueSizedFlatbuffer(
1146 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1147 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1148 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1149
1150 writer0.QueueSizedFlatbuffer(
1151 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1152 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1153 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1154 }
1155
1156 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1157
1158 ASSERT_EQ(parts[0].logger_node, "pi1");
1159 ASSERT_EQ(parts[1].logger_node, "pi2");
1160
Austin Schuh79b30942021-01-24 22:32:21 -08001161 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001162 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001163 mapper0.set_timestamp_callback(
1164 [&](TimestampedMessage *) { ++mapper0_count; });
1165 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001166 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001167 mapper1.set_timestamp_callback(
1168 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001169
1170 mapper0.AddPeer(&mapper1);
1171 mapper1.AddPeer(&mapper0);
1172
1173 {
1174 SCOPED_TRACE("Trying node1 now");
1175 std::deque<TimestampedMessage> output1;
1176
1177 ASSERT_TRUE(mapper1.Front() != nullptr);
1178 output1.emplace_back(std::move(*mapper1.Front()));
1179 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001180 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001181
1182 ASSERT_TRUE(mapper1.Front() != nullptr);
1183 output1.emplace_back(std::move(*mapper1.Front()));
1184 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001185 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001186
1187 ASSERT_TRUE(mapper1.Front() != nullptr);
1188 output1.emplace_back(std::move(*mapper1.Front()));
1189 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001190 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001191
1192 ASSERT_TRUE(mapper1.Front() == nullptr);
1193
1194 EXPECT_EQ(output1[0].monotonic_event_time,
1195 e + chrono::seconds(100) + chrono::milliseconds(1000));
1196 EXPECT_FALSE(output1[0].data.Verify());
1197 EXPECT_EQ(output1[1].monotonic_event_time,
1198 e + chrono::seconds(100) + chrono::milliseconds(2000));
1199 EXPECT_TRUE(output1[1].data.Verify());
1200 EXPECT_EQ(output1[2].monotonic_event_time,
1201 e + chrono::seconds(100) + chrono::milliseconds(3000));
1202 EXPECT_TRUE(output1[2].data.Verify());
1203 }
Austin Schuh79b30942021-01-24 22:32:21 -08001204
1205 EXPECT_EQ(mapper0_count, 0u);
1206 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001207}
1208
1209// Tests that we return just the timestamps if we couldn't find the data and the
1210// missing data was at the end of the file.
1211TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
1212 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1213 {
1214 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1215 writer0.QueueSpan(config0_.span());
1216 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1217 writer1.QueueSpan(config2_.span());
1218
1219 writer0.QueueSizedFlatbuffer(
1220 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1221 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1222 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1223
1224 writer0.QueueSizedFlatbuffer(
1225 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1226 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1227 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1228
1229 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
1230 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1231 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1232 }
1233
1234 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1235
1236 ASSERT_EQ(parts[0].logger_node, "pi1");
1237 ASSERT_EQ(parts[1].logger_node, "pi2");
1238
Austin Schuh79b30942021-01-24 22:32:21 -08001239 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001240 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001241 mapper0.set_timestamp_callback(
1242 [&](TimestampedMessage *) { ++mapper0_count; });
1243 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001244 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001245 mapper1.set_timestamp_callback(
1246 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001247
1248 mapper0.AddPeer(&mapper1);
1249 mapper1.AddPeer(&mapper0);
1250
1251 {
1252 SCOPED_TRACE("Trying node1 now");
1253 std::deque<TimestampedMessage> output1;
1254
1255 ASSERT_TRUE(mapper1.Front() != nullptr);
1256 output1.emplace_back(std::move(*mapper1.Front()));
1257 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001258 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001259
1260 ASSERT_TRUE(mapper1.Front() != nullptr);
1261 output1.emplace_back(std::move(*mapper1.Front()));
1262 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001263 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001264
1265 ASSERT_TRUE(mapper1.Front() != nullptr);
1266 output1.emplace_back(std::move(*mapper1.Front()));
1267 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001268 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001269
1270 ASSERT_TRUE(mapper1.Front() == nullptr);
1271
1272 EXPECT_EQ(output1[0].monotonic_event_time,
1273 e + chrono::seconds(100) + chrono::milliseconds(1000));
1274 EXPECT_TRUE(output1[0].data.Verify());
1275 EXPECT_EQ(output1[1].monotonic_event_time,
1276 e + chrono::seconds(100) + chrono::milliseconds(2000));
1277 EXPECT_TRUE(output1[1].data.Verify());
1278 EXPECT_EQ(output1[2].monotonic_event_time,
1279 e + chrono::seconds(100) + chrono::milliseconds(3000));
1280 EXPECT_FALSE(output1[2].data.Verify());
1281 }
Austin Schuh79b30942021-01-24 22:32:21 -08001282
1283 EXPECT_EQ(mapper0_count, 0u);
1284 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001285}
1286
Austin Schuh993ccb52020-12-12 15:59:32 -08001287// Tests that we handle a message which failed to forward or be logged.
1288TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1289 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1290 {
1291 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1292 writer0.QueueSpan(config0_.span());
1293 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1294 writer1.QueueSpan(config2_.span());
1295
1296 writer0.QueueSizedFlatbuffer(
1297 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1298 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1299 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1300
1301 // Create both the timestamp and message, but don't log them, simulating a
1302 // forwarding drop.
1303 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1304 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1305 chrono::seconds(100));
1306
1307 writer0.QueueSizedFlatbuffer(
1308 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1309 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1310 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1311 }
1312
1313 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1314
1315 ASSERT_EQ(parts[0].logger_node, "pi1");
1316 ASSERT_EQ(parts[1].logger_node, "pi2");
1317
Austin Schuh79b30942021-01-24 22:32:21 -08001318 size_t mapper0_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001319 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001320 mapper0.set_timestamp_callback(
1321 [&](TimestampedMessage *) { ++mapper0_count; });
1322 size_t mapper1_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001323 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001324 mapper1.set_timestamp_callback(
1325 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh993ccb52020-12-12 15:59:32 -08001326
1327 mapper0.AddPeer(&mapper1);
1328 mapper1.AddPeer(&mapper0);
1329
1330 {
1331 std::deque<TimestampedMessage> output1;
1332
1333 ASSERT_TRUE(mapper1.Front() != nullptr);
1334 output1.emplace_back(std::move(*mapper1.Front()));
1335 mapper1.PopFront();
1336
1337 ASSERT_TRUE(mapper1.Front() != nullptr);
1338 output1.emplace_back(std::move(*mapper1.Front()));
1339
1340 ASSERT_FALSE(mapper1.Front() == nullptr);
1341
1342 EXPECT_EQ(output1[0].monotonic_event_time,
1343 e + chrono::seconds(100) + chrono::milliseconds(1000));
1344 EXPECT_TRUE(output1[0].data.Verify());
1345 EXPECT_EQ(output1[1].monotonic_event_time,
1346 e + chrono::seconds(100) + chrono::milliseconds(3000));
1347 EXPECT_TRUE(output1[1].data.Verify());
1348 }
Austin Schuh79b30942021-01-24 22:32:21 -08001349
1350 EXPECT_EQ(mapper0_count, 0u);
1351 EXPECT_EQ(mapper1_count, 2u);
Austin Schuh993ccb52020-12-12 15:59:32 -08001352}
1353
Austin Schuhd2f96102020-12-01 20:27:29 -08001354// Tests that we properly sort log files with duplicate timestamps.
1355TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1356 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1357 {
1358 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1359 writer0.QueueSpan(config0_.span());
1360 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1361 writer1.QueueSpan(config2_.span());
1362
1363 writer0.QueueSizedFlatbuffer(
1364 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1365 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1366 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1367
1368 writer0.QueueSizedFlatbuffer(
1369 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1370 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1371 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1372
1373 writer0.QueueSizedFlatbuffer(
1374 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
1375 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1376 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1377
1378 writer0.QueueSizedFlatbuffer(
1379 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
1380 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1381 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1382 }
1383
1384 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1385
1386 ASSERT_EQ(parts[0].logger_node, "pi1");
1387 ASSERT_EQ(parts[1].logger_node, "pi2");
1388
Austin Schuh79b30942021-01-24 22:32:21 -08001389 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001390 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001391 mapper0.set_timestamp_callback(
1392 [&](TimestampedMessage *) { ++mapper0_count; });
1393 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001394 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001395 mapper1.set_timestamp_callback(
1396 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001397
1398 mapper0.AddPeer(&mapper1);
1399 mapper1.AddPeer(&mapper0);
1400
1401 {
1402 SCOPED_TRACE("Trying node1 now");
1403 std::deque<TimestampedMessage> output1;
1404
1405 for (int i = 0; i < 4; ++i) {
1406 ASSERT_TRUE(mapper1.Front() != nullptr);
1407 output1.emplace_back(std::move(*mapper1.Front()));
1408 mapper1.PopFront();
1409 }
1410 ASSERT_TRUE(mapper1.Front() == nullptr);
1411
1412 EXPECT_EQ(output1[0].monotonic_event_time,
1413 e + chrono::seconds(100) + chrono::milliseconds(1000));
1414 EXPECT_TRUE(output1[0].data.Verify());
1415 EXPECT_EQ(output1[1].monotonic_event_time,
1416 e + chrono::seconds(100) + chrono::milliseconds(2000));
1417 EXPECT_TRUE(output1[1].data.Verify());
1418 EXPECT_EQ(output1[2].monotonic_event_time,
1419 e + chrono::seconds(100) + chrono::milliseconds(2000));
1420 EXPECT_TRUE(output1[2].data.Verify());
1421 EXPECT_EQ(output1[3].monotonic_event_time,
1422 e + chrono::seconds(100) + chrono::milliseconds(3000));
1423 EXPECT_TRUE(output1[3].data.Verify());
1424 }
Austin Schuh79b30942021-01-24 22:32:21 -08001425
1426 EXPECT_EQ(mapper0_count, 0u);
1427 EXPECT_EQ(mapper1_count, 4u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001428}
1429
1430// Tests that we properly sort log files with duplicate timestamps.
1431TEST_F(TimestampMapperTest, StartTime) {
1432 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1433 {
1434 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1435 writer0.QueueSpan(config0_.span());
1436 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1437 writer1.QueueSpan(config1_.span());
1438 DetachedBufferWriter writer2(logfile2_, std::make_unique<DummyEncoder>());
1439 writer2.QueueSpan(config3_.span());
1440 }
1441
1442 const std::vector<LogFile> parts =
1443 SortParts({logfile0_, logfile1_, logfile2_});
1444
Austin Schuh79b30942021-01-24 22:32:21 -08001445 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001446 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001447 mapper0.set_timestamp_callback(
1448 [&](TimestampedMessage *) { ++mapper0_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001449
1450 EXPECT_EQ(mapper0.monotonic_start_time(), e + chrono::milliseconds(1));
1451 EXPECT_EQ(mapper0.realtime_start_time(),
1452 realtime_clock::time_point(chrono::seconds(1000)));
Austin Schuh79b30942021-01-24 22:32:21 -08001453 EXPECT_EQ(mapper0_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001454}
1455
Austin Schuhfecf1d82020-12-19 16:57:28 -08001456// Tests that when a peer isn't registered, we treat that as if there was no
1457// data available.
1458TEST_F(TimestampMapperTest, NoPeer) {
1459 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1460 {
1461 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1462 writer0.QueueSpan(config0_.span());
1463 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1464 writer1.QueueSpan(config2_.span());
1465
1466 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
1467 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1468 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1469
1470 writer0.QueueSizedFlatbuffer(
1471 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1472 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1473 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1474
1475 writer0.QueueSizedFlatbuffer(
1476 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1477 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1478 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1479 }
1480
1481 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1482
1483 ASSERT_EQ(parts[0].logger_node, "pi1");
1484 ASSERT_EQ(parts[1].logger_node, "pi2");
1485
Austin Schuh79b30942021-01-24 22:32:21 -08001486 size_t mapper1_count = 0;
Austin Schuhfecf1d82020-12-19 16:57:28 -08001487 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001488 mapper1.set_timestamp_callback(
1489 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhfecf1d82020-12-19 16:57:28 -08001490
1491 {
1492 std::deque<TimestampedMessage> output1;
1493
1494 ASSERT_TRUE(mapper1.Front() != nullptr);
1495 output1.emplace_back(std::move(*mapper1.Front()));
1496 mapper1.PopFront();
1497 ASSERT_TRUE(mapper1.Front() != nullptr);
1498 output1.emplace_back(std::move(*mapper1.Front()));
1499 mapper1.PopFront();
1500 ASSERT_TRUE(mapper1.Front() != nullptr);
1501 output1.emplace_back(std::move(*mapper1.Front()));
1502 mapper1.PopFront();
1503 ASSERT_TRUE(mapper1.Front() == nullptr);
1504
1505 EXPECT_EQ(output1[0].monotonic_event_time,
1506 e + chrono::seconds(100) + chrono::milliseconds(1000));
1507 EXPECT_FALSE(output1[0].data.Verify());
1508 EXPECT_EQ(output1[1].monotonic_event_time,
1509 e + chrono::seconds(100) + chrono::milliseconds(2000));
1510 EXPECT_FALSE(output1[1].data.Verify());
1511 EXPECT_EQ(output1[2].monotonic_event_time,
1512 e + chrono::seconds(100) + chrono::milliseconds(3000));
1513 EXPECT_FALSE(output1[2].data.Verify());
1514 }
Austin Schuh79b30942021-01-24 22:32:21 -08001515 EXPECT_EQ(mapper1_count, 3u);
1516}
1517
1518// Tests that we can queue messages and call the timestamp callback for both
1519// nodes.
1520TEST_F(TimestampMapperTest, QueueUntilNode0) {
1521 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1522 {
1523 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1524 writer0.QueueSpan(config0_.span());
1525 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1526 writer1.QueueSpan(config2_.span());
1527
1528 writer0.QueueSizedFlatbuffer(
1529 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1530 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1531 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1532
1533 writer0.QueueSizedFlatbuffer(
1534 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
1535 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1536 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1537
1538 writer0.QueueSizedFlatbuffer(
1539 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
1540 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1541 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1542
1543 writer0.QueueSizedFlatbuffer(
1544 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
1545 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1546 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1547 }
1548
1549 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1550
1551 ASSERT_EQ(parts[0].logger_node, "pi1");
1552 ASSERT_EQ(parts[1].logger_node, "pi2");
1553
1554 size_t mapper0_count = 0;
1555 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1556 mapper0.set_timestamp_callback(
1557 [&](TimestampedMessage *) { ++mapper0_count; });
1558 size_t mapper1_count = 0;
1559 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1560 mapper1.set_timestamp_callback(
1561 [&](TimestampedMessage *) { ++mapper1_count; });
1562
1563 mapper0.AddPeer(&mapper1);
1564 mapper1.AddPeer(&mapper0);
1565
1566 {
1567 std::deque<TimestampedMessage> output0;
1568
1569 EXPECT_EQ(mapper0_count, 0u);
1570 EXPECT_EQ(mapper1_count, 0u);
1571 mapper0.QueueUntil(e + chrono::milliseconds(1000));
1572 EXPECT_EQ(mapper0_count, 3u);
1573 EXPECT_EQ(mapper1_count, 0u);
1574
1575 ASSERT_TRUE(mapper0.Front() != nullptr);
1576 EXPECT_EQ(mapper0_count, 3u);
1577 EXPECT_EQ(mapper1_count, 0u);
1578
1579 mapper0.QueueUntil(e + chrono::milliseconds(1500));
1580 EXPECT_EQ(mapper0_count, 3u);
1581 EXPECT_EQ(mapper1_count, 0u);
1582
1583 mapper0.QueueUntil(e + chrono::milliseconds(2500));
1584 EXPECT_EQ(mapper0_count, 4u);
1585 EXPECT_EQ(mapper1_count, 0u);
1586
1587 output0.emplace_back(std::move(*mapper0.Front()));
1588 mapper0.PopFront();
1589 output0.emplace_back(std::move(*mapper0.Front()));
1590 mapper0.PopFront();
1591 output0.emplace_back(std::move(*mapper0.Front()));
1592 mapper0.PopFront();
1593 output0.emplace_back(std::move(*mapper0.Front()));
1594 mapper0.PopFront();
1595
1596 EXPECT_EQ(mapper0_count, 4u);
1597 EXPECT_EQ(mapper1_count, 0u);
1598
1599 ASSERT_TRUE(mapper0.Front() == nullptr);
1600
1601 EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
1602 EXPECT_TRUE(output0[0].data.Verify());
1603 EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(1000));
1604 EXPECT_TRUE(output0[1].data.Verify());
1605 EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(2000));
1606 EXPECT_TRUE(output0[2].data.Verify());
1607 EXPECT_EQ(output0[3].monotonic_event_time, e + chrono::milliseconds(3000));
1608 EXPECT_TRUE(output0[3].data.Verify());
1609 }
1610
1611 {
1612 SCOPED_TRACE("Trying node1 now");
1613 std::deque<TimestampedMessage> output1;
1614
1615 EXPECT_EQ(mapper0_count, 4u);
1616 EXPECT_EQ(mapper1_count, 0u);
1617 mapper1.QueueUntil(e + chrono::seconds(100) + chrono::milliseconds(1000));
1618 EXPECT_EQ(mapper0_count, 4u);
1619 EXPECT_EQ(mapper1_count, 3u);
1620
1621 ASSERT_TRUE(mapper1.Front() != nullptr);
1622 EXPECT_EQ(mapper0_count, 4u);
1623 EXPECT_EQ(mapper1_count, 3u);
1624
1625 mapper1.QueueUntil(e + chrono::seconds(100) + chrono::milliseconds(1500));
1626 EXPECT_EQ(mapper0_count, 4u);
1627 EXPECT_EQ(mapper1_count, 3u);
1628
1629 mapper1.QueueUntil(e + chrono::seconds(100) + chrono::milliseconds(2500));
1630 EXPECT_EQ(mapper0_count, 4u);
1631 EXPECT_EQ(mapper1_count, 4u);
1632
1633 ASSERT_TRUE(mapper1.Front() != nullptr);
1634 EXPECT_EQ(mapper0_count, 4u);
1635 EXPECT_EQ(mapper1_count, 4u);
1636
1637 output1.emplace_back(std::move(*mapper1.Front()));
1638 mapper1.PopFront();
1639 ASSERT_TRUE(mapper1.Front() != nullptr);
1640 output1.emplace_back(std::move(*mapper1.Front()));
1641 mapper1.PopFront();
1642 ASSERT_TRUE(mapper1.Front() != nullptr);
1643 output1.emplace_back(std::move(*mapper1.Front()));
1644 mapper1.PopFront();
1645 ASSERT_TRUE(mapper1.Front() != nullptr);
1646 output1.emplace_back(std::move(*mapper1.Front()));
1647 mapper1.PopFront();
1648
1649 EXPECT_EQ(mapper0_count, 4u);
1650 EXPECT_EQ(mapper1_count, 4u);
1651
1652 ASSERT_TRUE(mapper1.Front() == nullptr);
1653
1654 EXPECT_EQ(mapper0_count, 4u);
1655 EXPECT_EQ(mapper1_count, 4u);
1656
1657 EXPECT_EQ(output1[0].monotonic_event_time,
1658 e + chrono::seconds(100) + chrono::milliseconds(1000));
1659 EXPECT_TRUE(output1[0].data.Verify());
1660 EXPECT_EQ(output1[1].monotonic_event_time,
1661 e + chrono::seconds(100) + chrono::milliseconds(1000));
1662 EXPECT_TRUE(output1[1].data.Verify());
1663 EXPECT_EQ(output1[2].monotonic_event_time,
1664 e + chrono::seconds(100) + chrono::milliseconds(2000));
1665 EXPECT_TRUE(output1[2].data.Verify());
1666 EXPECT_EQ(output1[3].monotonic_event_time,
1667 e + chrono::seconds(100) + chrono::milliseconds(3000));
1668 EXPECT_TRUE(output1[3].data.Verify());
1669 }
Austin Schuhfecf1d82020-12-19 16:57:28 -08001670}
1671
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001672class BootMergerTest : public SortingElementTest {
1673 public:
1674 BootMergerTest()
1675 : SortingElementTest(),
1676 boot0_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07001677 /* 100ms */
1678 "max_out_of_order_duration": 100000000,
1679 "node": {
1680 "name": "pi2"
1681 },
1682 "logger_node": {
1683 "name": "pi1"
1684 },
1685 "monotonic_start_time": 1000000,
1686 "realtime_start_time": 1000000000000,
1687 "logger_monotonic_start_time": 1000000,
1688 "logger_realtime_start_time": 1000000000000,
1689 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
1690 "parts_uuid": "1a9e5ca2-31b2-475b-8282-88f6d1ce5109",
1691 "parts_index": 0,
1692 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
1693 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
1694 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464"
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001695})")),
1696 boot1_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07001697 /* 100ms */
1698 "max_out_of_order_duration": 100000000,
1699 "node": {
1700 "name": "pi2"
1701 },
1702 "logger_node": {
1703 "name": "pi1"
1704 },
1705 "monotonic_start_time": 1000000,
1706 "realtime_start_time": 1000000000000,
1707 "logger_monotonic_start_time": 1000000,
1708 "logger_realtime_start_time": 1000000000000,
1709 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
1710 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
1711 "parts_index": 1,
1712 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
1713 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
1714 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2"
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001715})")) {}
Austin Schuh8bee6882021-06-28 21:03:28 -07001716
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001717 protected:
1718 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0_;
1719 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1_;
1720};
1721
1722// This tests that we can properly sort a multi-node log file which has the old
1723// (and buggy) timestamps in the header, and the non-resetting parts_index.
1724// These make it so we can just bairly figure out what happened first and what
1725// happened second, but not in a way that is robust to multiple nodes rebooting.
1726TEST_F(BootMergerTest, OldReboot) {
Austin Schuh8bee6882021-06-28 21:03:28 -07001727 {
1728 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001729 writer.QueueSpan(boot0_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07001730 }
1731 {
1732 DetachedBufferWriter writer(logfile1_, std::make_unique<DummyEncoder>());
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001733 writer.QueueSpan(boot1_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07001734 }
1735
1736 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1737
1738 ASSERT_EQ(parts.size(), 1u);
1739 ASSERT_EQ(parts[0].parts.size(), 2u);
1740
1741 EXPECT_EQ(parts[0].parts[0].boot_count, 0);
1742 EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001743 boot0_.message().source_node_boot_uuid()->string_view());
Austin Schuh8bee6882021-06-28 21:03:28 -07001744
1745 EXPECT_EQ(parts[0].parts[1].boot_count, 1);
1746 EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001747 boot1_.message().source_node_boot_uuid()->string_view());
1748}
1749
1750// This tests that we can produce messages ordered across a reboot.
1751TEST_F(BootMergerTest, SortAcrossReboot) {
1752 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1753 {
1754 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
1755 writer.QueueSpan(boot0_.span());
1756 writer.QueueSizedFlatbuffer(
1757 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1758 writer.QueueSizedFlatbuffer(
1759 MakeLogMessage(e + chrono::milliseconds(2000), 1, 0x105));
1760 }
1761 {
1762 DetachedBufferWriter writer(logfile1_, std::make_unique<DummyEncoder>());
1763 writer.QueueSpan(boot1_.span());
1764 writer.QueueSizedFlatbuffer(
1765 MakeLogMessage(e + chrono::milliseconds(100), 0, 0x006));
1766 writer.QueueSizedFlatbuffer(
1767 MakeLogMessage(e + chrono::milliseconds(200), 1, 0x106));
1768 }
1769
1770 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1771 ASSERT_EQ(parts.size(), 1u);
1772 ASSERT_EQ(parts[0].parts.size(), 2u);
1773
1774 BootMerger merger(FilterPartsForNode(parts, "pi2"));
1775
1776 EXPECT_EQ(merger.node(), 1u);
1777
1778 std::vector<Message> output;
1779 for (int i = 0; i < 4; ++i) {
1780 ASSERT_TRUE(merger.Front() != nullptr);
1781 output.emplace_back(std::move(*merger.Front()));
1782 merger.PopFront();
1783 }
1784
1785 ASSERT_TRUE(merger.Front() == nullptr);
1786
1787 EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(1000));
1788 EXPECT_EQ(output[0].boot_count, 0u);
1789 EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(2000));
1790 EXPECT_EQ(output[1].boot_count, 0u);
1791 EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(100));
1792 EXPECT_EQ(output[2].boot_count, 1u);
1793 EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(200));
1794 EXPECT_EQ(output[3].boot_count, 1u);
Austin Schuh8bee6882021-06-28 21:03:28 -07001795}
1796
Austin Schuhc243b422020-10-11 15:35:08 -07001797} // namespace testing
1798} // namespace logger
1799} // namespace aos