blob: dc491c1341d45cfe9ac123140aa4e600479637a3 [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
4#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07005
Austin Schuhc41603c2020-10-11 16:17:37 -07006#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -07007#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -08008#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -07009#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070010#include "aos/json_to_flatbuffer.h"
11#include "aos/testing/tmpdir.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070012#include "gtest/gtest.h"
Austin Schuhc243b422020-10-11 15:35:08 -070013
14namespace aos {
15namespace logger {
16namespace testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070017namespace chrono = std::chrono;
Austin Schuhc243b422020-10-11 15:35:08 -070018
Austin Schuhe243aaf2020-10-11 15:46:02 -070019// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070020template <typename T>
21SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
22 const std::string_view data) {
23 flatbuffers::FlatBufferBuilder fbb;
24 fbb.ForceDefaults(true);
25 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
26 return fbb.Release();
27}
28
Austin Schuhe243aaf2020-10-11 15:46:02 -070029// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070030TEST(SpanReaderTest, ReadWrite) {
31 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
32 unlink(logfile.c_str());
33
34 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080035 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070036 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080037 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070038
39 {
40 DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -080041 writer.QueueSpan(m1.span());
42 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070043 }
44
45 SpanReader reader(logfile);
46
47 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -080048 EXPECT_EQ(reader.ReadMessage(), m1.span());
49 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070050 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
51}
52
Austin Schuhe243aaf2020-10-11 15:46:02 -070053// Tests that we can actually parse the resulting messages at a basic level
54// through MessageReader.
55TEST(MessageReaderTest, ReadWrite) {
56 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
57 unlink(logfile.c_str());
58
59 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
60 JsonToSizedFlatbuffer<LogFileHeader>(
61 R"({ "max_out_of_order_duration": 100000000 })");
62 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
63 JsonToSizedFlatbuffer<MessageHeader>(
64 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
65 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
66 JsonToSizedFlatbuffer<MessageHeader>(
67 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
68
69 {
70 DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -080071 writer.QueueSpan(config.span());
72 writer.QueueSpan(m1.span());
73 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -070074 }
75
76 MessageReader reader(logfile);
77
78 EXPECT_EQ(reader.filename(), logfile);
79
80 EXPECT_EQ(
81 reader.max_out_of_order_duration(),
82 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
83 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
84 EXPECT_TRUE(reader.ReadMessage());
85 EXPECT_EQ(reader.newest_timestamp(),
86 monotonic_clock::time_point(chrono::nanoseconds(1)));
87 EXPECT_TRUE(reader.ReadMessage());
88 EXPECT_EQ(reader.newest_timestamp(),
89 monotonic_clock::time_point(chrono::nanoseconds(2)));
90 EXPECT_FALSE(reader.ReadMessage());
91}
92
Austin Schuh32f68492020-11-08 21:45:51 -080093// Tests that we explode when messages are too far out of order.
94TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
95 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
96 unlink(logfile0.c_str());
97
98 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
99 JsonToSizedFlatbuffer<LogFileHeader>(
100 R"({
101 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800102 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800103 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
104 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
105 "parts_index": 0
106})");
107
108 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
109 JsonToSizedFlatbuffer<MessageHeader>(
110 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
111 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
112 JsonToSizedFlatbuffer<MessageHeader>(
113 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
114 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
115 JsonToSizedFlatbuffer<MessageHeader>(
116 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
117
118 {
119 DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800120 writer.QueueSpan(config0.span());
121 writer.QueueSpan(m1.span());
122 writer.QueueSpan(m2.span());
123 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800124 }
125
126 const std::vector<LogFile> parts = SortParts({logfile0});
127
128 PartsMessageReader reader(parts[0].parts[0]);
129
130 EXPECT_TRUE(reader.ReadMessage());
131 EXPECT_TRUE(reader.ReadMessage());
132 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
133}
134
Austin Schuhc41603c2020-10-11 16:17:37 -0700135// Tests that we can transparently re-assemble part files with a
136// PartsMessageReader.
137TEST(PartsMessageReaderTest, ReadWrite) {
138 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
139 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
140 unlink(logfile0.c_str());
141 unlink(logfile1.c_str());
142
143 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
144 JsonToSizedFlatbuffer<LogFileHeader>(
145 R"({
146 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800147 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700148 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
149 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
150 "parts_index": 0
151})");
152 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
153 JsonToSizedFlatbuffer<LogFileHeader>(
154 R"({
155 "max_out_of_order_duration": 200000000,
156 "monotonic_start_time": 0,
157 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800158 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700159 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
160 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
161 "parts_index": 1
162})");
163
164 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
165 JsonToSizedFlatbuffer<MessageHeader>(
166 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
167 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
168 JsonToSizedFlatbuffer<MessageHeader>(
169 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
170
171 {
172 DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800173 writer.QueueSpan(config0.span());
174 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700175 }
176 {
177 DetachedBufferWriter writer(logfile1, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800178 writer.QueueSpan(config1.span());
179 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700180 }
181
182 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
183
184 PartsMessageReader reader(parts[0].parts[0]);
185
186 EXPECT_EQ(reader.filename(), logfile0);
187
188 // Confirm that the timestamps track, and the filename also updates.
189 // Read the first message.
190 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
191 EXPECT_EQ(
192 reader.max_out_of_order_duration(),
193 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
194 EXPECT_TRUE(reader.ReadMessage());
195 EXPECT_EQ(reader.filename(), logfile0);
196 EXPECT_EQ(reader.newest_timestamp(),
197 monotonic_clock::time_point(chrono::nanoseconds(1)));
198 EXPECT_EQ(
199 reader.max_out_of_order_duration(),
200 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
201
202 // Read the second message.
203 EXPECT_TRUE(reader.ReadMessage());
204 EXPECT_EQ(reader.filename(), logfile1);
205 EXPECT_EQ(reader.newest_timestamp(),
206 monotonic_clock::time_point(chrono::nanoseconds(2)));
207 EXPECT_EQ(
208 reader.max_out_of_order_duration(),
209 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
210
211 // And then confirm that reading again returns no message.
212 EXPECT_FALSE(reader.ReadMessage());
213 EXPECT_EQ(reader.filename(), logfile1);
214 EXPECT_EQ(
215 reader.max_out_of_order_duration(),
216 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800217 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Austin Schuhc41603c2020-10-11 16:17:37 -0700218}
Austin Schuh32f68492020-11-08 21:45:51 -0800219
Austin Schuh1be0ce42020-11-29 22:43:26 -0800220// Tests that Message's operator < works as expected.
221TEST(MessageTest, Sorting) {
222 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
223
224 Message m1{.channel_index = 0,
225 .queue_index = 0,
226 .timestamp = e + chrono::milliseconds(1),
227 .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
228 Message m2{.channel_index = 0,
229 .queue_index = 0,
230 .timestamp = e + chrono::milliseconds(2),
231 .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
232
233 EXPECT_LT(m1, m2);
234 EXPECT_GE(m2, m1);
235
236 m1.timestamp = e;
237 m2.timestamp = e;
238
239 m1.channel_index = 1;
240 m2.channel_index = 2;
241
242 EXPECT_LT(m1, m2);
243 EXPECT_GE(m2, m1);
244
245 m1.channel_index = 0;
246 m2.channel_index = 0;
247 m1.queue_index = 0;
248 m2.queue_index = 1;
249
250 EXPECT_LT(m1, m2);
251 EXPECT_GE(m2, m1);
252}
253
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800254aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
255 const aos::FlatbufferDetachedBuffer<Configuration> &config,
256 const std::string_view json) {
257 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh1034a832021-03-31 21:53:26 -0700258 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800259 flatbuffers::Offset<Configuration> config_offset =
260 aos::CopyFlatBuffer(config, &fbb);
261 LogFileHeader::Builder header_builder(fbb);
262 header_builder.add_configuration(config_offset);
263 fbb.Finish(header_builder.Finish());
264 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
265
266 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
267 JsonToFlatbuffer<LogFileHeader>(json));
268 CHECK(header_updates.Verify());
269 flatbuffers::FlatBufferBuilder fbb2;
Austin Schuh1034a832021-03-31 21:53:26 -0700270 fbb2.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800271 fbb2.FinishSizePrefixed(
272 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
273 return fbb2.Release();
274}
275
276class SortingElementTest : public ::testing::Test {
277 public:
278 SortingElementTest()
279 : config_(JsonToFlatbuffer<Configuration>(
280 R"({
281 "channels": [
282 {
283 "name": "/a",
284 "type": "aos.logger.testing.TestMessage",
285 "source_node": "pi1",
286 "destination_nodes": [
287 {
288 "name": "pi2"
289 },
290 {
291 "name": "pi3"
292 }
293 ]
294 },
295 {
296 "name": "/b",
297 "type": "aos.logger.testing.TestMessage",
298 "source_node": "pi1"
299 },
300 {
301 "name": "/c",
302 "type": "aos.logger.testing.TestMessage",
303 "source_node": "pi1"
304 }
305 ],
306 "nodes": [
307 {
308 "name": "pi1"
309 },
310 {
311 "name": "pi2"
312 },
313 {
314 "name": "pi3"
315 }
316 ]
317}
318)")),
319 config0_(MakeHeader(config_, R"({
320 /* 100ms */
321 "max_out_of_order_duration": 100000000,
322 "node": {
323 "name": "pi1"
324 },
325 "logger_node": {
326 "name": "pi1"
327 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800328 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800329 "realtime_start_time": 1000000000000,
330 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
331 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
332 "parts_index": 0
333})")),
334 config1_(MakeHeader(config_,
335 R"({
336 /* 100ms */
337 "max_out_of_order_duration": 100000000,
338 "node": {
339 "name": "pi1"
340 },
341 "logger_node": {
342 "name": "pi1"
343 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800344 "monotonic_start_time": 1000000,
345 "realtime_start_time": 1000000000000,
346 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
347 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
348 "parts_index": 0
349})")),
350 config2_(MakeHeader(config_,
351 R"({
352 /* 100ms */
353 "max_out_of_order_duration": 100000000,
354 "node": {
355 "name": "pi2"
356 },
357 "logger_node": {
358 "name": "pi2"
359 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800360 "monotonic_start_time": 0,
361 "realtime_start_time": 1000000000000,
Austin Schuhd2f96102020-12-01 20:27:29 -0800362 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
363 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
364 "parts_index": 0
365})")),
366 config3_(MakeHeader(config_,
367 R"({
368 /* 100ms */
369 "max_out_of_order_duration": 100000000,
370 "node": {
371 "name": "pi1"
372 },
373 "logger_node": {
374 "name": "pi1"
375 },
376 "monotonic_start_time": 2000000,
377 "realtime_start_time": 1000000000,
378 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
379 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800380 "parts_index": 0
Austin Schuh8bf1e632021-01-02 22:41:04 -0800381})")),
382 config4_(MakeHeader(config_,
383 R"({
384 /* 100ms */
385 "max_out_of_order_duration": 100000000,
386 "node": {
387 "name": "pi2"
388 },
389 "logger_node": {
390 "name": "pi1"
391 },
392 "monotonic_start_time": 2000000,
393 "realtime_start_time": 1000000000,
394 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
395 "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
396 "parts_index": 0
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800397})")) {
398 unlink(logfile0_.c_str());
399 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800400 unlink(logfile2_.c_str());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800401 queue_index_.resize(kChannels);
402 }
403
404 protected:
405 static constexpr size_t kChannels = 3u;
406
407 flatbuffers::DetachedBuffer MakeLogMessage(
408 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
409 int value) {
410 flatbuffers::FlatBufferBuilder message_fbb;
411 message_fbb.ForceDefaults(true);
412 TestMessage::Builder test_message_builder(message_fbb);
413 test_message_builder.add_value(value);
414 message_fbb.Finish(test_message_builder.Finish());
415
416 aos::Context context;
417 context.monotonic_event_time = monotonic_now;
418 context.realtime_event_time = aos::realtime_clock::epoch() +
419 chrono::seconds(1000) +
420 monotonic_now.time_since_epoch();
421 context.queue_index = queue_index_[channel_index];
422 context.size = message_fbb.GetSize();
423 context.data = message_fbb.GetBufferPointer();
424
425 ++queue_index_[channel_index];
426
427 flatbuffers::FlatBufferBuilder fbb;
428 fbb.FinishSizePrefixed(
429 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
430
431 return fbb.Release();
432 }
433
434 flatbuffers::DetachedBuffer MakeTimestampMessage(
435 const aos::monotonic_clock::time_point sender_monotonic_now,
Austin Schuh8bf1e632021-01-02 22:41:04 -0800436 int channel_index, chrono::nanoseconds receiver_monotonic_offset,
437 monotonic_clock::time_point monotonic_timestamp_time =
438 monotonic_clock::min_time) {
439 const monotonic_clock::time_point monotonic_sent_time =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800440 sender_monotonic_now + receiver_monotonic_offset;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800441
442 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800443 fbb.ForceDefaults(true);
444
445 logger::MessageHeader::Builder message_header_builder(fbb);
446
447 message_header_builder.add_channel_index(channel_index);
448
449 message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
450 100);
451 message_header_builder.add_monotonic_sent_time(
452 monotonic_sent_time.time_since_epoch().count());
453 message_header_builder.add_realtime_sent_time(
454 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
455 monotonic_sent_time.time_since_epoch())
456 .time_since_epoch()
457 .count());
458
459 message_header_builder.add_monotonic_remote_time(
460 sender_monotonic_now.time_since_epoch().count());
461 message_header_builder.add_realtime_remote_time(
462 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
463 sender_monotonic_now.time_since_epoch())
464 .time_since_epoch()
465 .count());
466 message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
467 1);
468
469 if (monotonic_timestamp_time != monotonic_clock::min_time) {
470 message_header_builder.add_monotonic_timestamp_time(
471 monotonic_timestamp_time.time_since_epoch().count());
472 }
473
474 fbb.FinishSizePrefixed(message_header_builder.Finish());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800475 LOG(INFO) << aos::FlatbufferToJson(
476 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
477 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
478
479 return fbb.Release();
480 }
481
482 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
483 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800484 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800485
486 const aos::FlatbufferDetachedBuffer<Configuration> config_;
487 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
488 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800489 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
490 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800491 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800492
493 std::vector<uint32_t> queue_index_;
494};
495
496using LogPartsSorterTest = SortingElementTest;
497using LogPartsSorterDeathTest = LogPartsSorterTest;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800498using NodeMergerTest = SortingElementTest;
Austin Schuhd2f96102020-12-01 20:27:29 -0800499using TimestampMapperTest = SortingElementTest;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800500
501// Tests that we can pull messages out of a log sorted in order.
502TEST_F(LogPartsSorterTest, Pull) {
503 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
504 {
505 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
506 writer.QueueSpan(config0_.span());
507 writer.QueueSizedFlatbuffer(
508 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
509 writer.QueueSizedFlatbuffer(
510 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
511 writer.QueueSizedFlatbuffer(
512 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
513 writer.QueueSizedFlatbuffer(
514 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
515 }
516
517 const std::vector<LogFile> parts = SortParts({logfile0_});
518
519 LogPartsSorter parts_sorter(parts[0].parts[0]);
520
521 // Confirm we aren't sorted until any time until the message is popped.
522 // Peeking shouldn't change the sorted until time.
523 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
524
525 std::deque<Message> output;
526
527 ASSERT_TRUE(parts_sorter.Front() != nullptr);
528 output.emplace_back(std::move(*parts_sorter.Front()));
529 parts_sorter.PopFront();
530 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
531
532 ASSERT_TRUE(parts_sorter.Front() != nullptr);
533 output.emplace_back(std::move(*parts_sorter.Front()));
534 parts_sorter.PopFront();
535 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
536
537 ASSERT_TRUE(parts_sorter.Front() != nullptr);
538 output.emplace_back(std::move(*parts_sorter.Front()));
539 parts_sorter.PopFront();
540 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
541
542 ASSERT_TRUE(parts_sorter.Front() != nullptr);
543 output.emplace_back(std::move(*parts_sorter.Front()));
544 parts_sorter.PopFront();
545 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
546
547 ASSERT_TRUE(parts_sorter.Front() == nullptr);
548
549 EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(1000));
550 EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(1000));
551 EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(1901));
552 EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(2000));
553}
554
Austin Schuhb000de62020-12-03 22:00:40 -0800555// Tests that we can pull messages out of a log sorted in order.
556TEST_F(LogPartsSorterTest, WayBeforeStart) {
557 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
558 {
559 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
560 writer.QueueSpan(config0_.span());
561 writer.QueueSizedFlatbuffer(
562 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
563 writer.QueueSizedFlatbuffer(
564 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
565 writer.QueueSizedFlatbuffer(
566 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
567 writer.QueueSizedFlatbuffer(
568 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
569 writer.QueueSizedFlatbuffer(
570 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
571 }
572
573 const std::vector<LogFile> parts = SortParts({logfile0_});
574
575 LogPartsSorter parts_sorter(parts[0].parts[0]);
576
577 // Confirm we aren't sorted until any time until the message is popped.
578 // Peeking shouldn't change the sorted until time.
579 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
580
581 std::deque<Message> output;
582
583 for (monotonic_clock::time_point t :
584 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
585 e + chrono::milliseconds(1900), monotonic_clock::max_time,
586 monotonic_clock::max_time}) {
587 ASSERT_TRUE(parts_sorter.Front() != nullptr);
588 output.emplace_back(std::move(*parts_sorter.Front()));
589 parts_sorter.PopFront();
590 EXPECT_EQ(parts_sorter.sorted_until(), t);
591 }
592
593 ASSERT_TRUE(parts_sorter.Front() == nullptr);
594
595 EXPECT_EQ(output[0].timestamp, e - chrono::milliseconds(1000));
596 EXPECT_EQ(output[1].timestamp, e - chrono::milliseconds(500));
597 EXPECT_EQ(output[2].timestamp, e - chrono::milliseconds(10));
598 EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(1901));
599 EXPECT_EQ(output[4].timestamp, e + chrono::milliseconds(2000));
600}
601
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800602// Tests that messages too far out of order trigger death.
603TEST_F(LogPartsSorterDeathTest, Pull) {
604 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
605 {
606 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
607 writer.QueueSpan(config0_.span());
608 writer.QueueSizedFlatbuffer(
609 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
610 writer.QueueSizedFlatbuffer(
611 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
612 writer.QueueSizedFlatbuffer(
613 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
614 // The following message is too far out of order and will trigger the CHECK.
615 writer.QueueSizedFlatbuffer(
616 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
617 }
618
619 const std::vector<LogFile> parts = SortParts({logfile0_});
620
621 LogPartsSorter parts_sorter(parts[0].parts[0]);
622
623 // Confirm we aren't sorted until any time until the message is popped.
624 // Peeking shouldn't change the sorted until time.
625 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
626 std::deque<Message> output;
627
628 ASSERT_TRUE(parts_sorter.Front() != nullptr);
629 parts_sorter.PopFront();
630 ASSERT_TRUE(parts_sorter.Front() != nullptr);
631 ASSERT_TRUE(parts_sorter.Front() != nullptr);
632 parts_sorter.PopFront();
633
Austin Schuha040c3f2021-02-13 16:09:07 -0800634 EXPECT_DEATH({ parts_sorter.Front(); }, "Max out of order of 100000000ns exceeded.");
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800635}
636
Austin Schuh8f52ed52020-11-30 23:12:39 -0800637// Tests that we can merge data from 2 separate files, including duplicate data.
638TEST_F(NodeMergerTest, TwoFileMerger) {
639 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
640 {
641 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
642 writer0.QueueSpan(config0_.span());
643 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
644 writer1.QueueSpan(config1_.span());
645
646 writer0.QueueSizedFlatbuffer(
647 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
648 writer1.QueueSizedFlatbuffer(
649 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
650
651 writer0.QueueSizedFlatbuffer(
652 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
653 writer1.QueueSizedFlatbuffer(
654 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
655
656 // Make a duplicate!
657 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
658 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
659 writer0.QueueSpan(msg.span());
660 writer1.QueueSpan(msg.span());
661
662 writer1.QueueSizedFlatbuffer(
663 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
664 }
665
666 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Austin Schuhd2f96102020-12-01 20:27:29 -0800667 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800668
Austin Schuhd2f96102020-12-01 20:27:29 -0800669 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800670
671 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
672
673 std::deque<Message> output;
674
675 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
676 ASSERT_TRUE(merger.Front() != nullptr);
677 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
678
679 output.emplace_back(std::move(*merger.Front()));
680 merger.PopFront();
681 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
682
683 ASSERT_TRUE(merger.Front() != nullptr);
684 output.emplace_back(std::move(*merger.Front()));
685 merger.PopFront();
686 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
687
688 ASSERT_TRUE(merger.Front() != nullptr);
689 output.emplace_back(std::move(*merger.Front()));
690 merger.PopFront();
691 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
692
693 ASSERT_TRUE(merger.Front() != nullptr);
694 output.emplace_back(std::move(*merger.Front()));
695 merger.PopFront();
696 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
697
698 ASSERT_TRUE(merger.Front() != nullptr);
699 output.emplace_back(std::move(*merger.Front()));
700 merger.PopFront();
701 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
702
703 ASSERT_TRUE(merger.Front() != nullptr);
704 output.emplace_back(std::move(*merger.Front()));
705 merger.PopFront();
706 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
707
708 ASSERT_TRUE(merger.Front() == nullptr);
709
710 EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(1000));
711 EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(1001));
712 EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(1002));
713 EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(2000));
714 EXPECT_EQ(output[4].timestamp, e + chrono::milliseconds(3000));
715 EXPECT_EQ(output[5].timestamp, e + chrono::milliseconds(3002));
716}
717
Austin Schuh8bf1e632021-01-02 22:41:04 -0800718// Tests that we can merge timestamps with various combinations of
719// monotonic_timestamp_time.
720TEST_F(NodeMergerTest, TwoFileTimestampMerger) {
721 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
722 {
723 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
724 writer0.QueueSpan(config0_.span());
725 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
726 writer1.QueueSpan(config1_.span());
727
728 // Neither has it.
729 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
730 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
731 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
732 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
733 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
734
735 // First only has it.
736 MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
737 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
738 e + chrono::milliseconds(1001), 0, chrono::seconds(100),
739 e + chrono::nanoseconds(971)));
740 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
741 e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
742
743 // Second only has it.
744 MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
745 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
746 e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
747 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
748 e + chrono::milliseconds(1002), 0, chrono::seconds(100),
749 e + chrono::nanoseconds(972)));
750
751 // Both have it.
752 MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
753 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
754 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
755 e + chrono::nanoseconds(973)));
756 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
757 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
758 e + chrono::nanoseconds(973)));
759 }
760
761 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
762 ASSERT_EQ(parts.size(), 1u);
763
764 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
765
766 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
767
768 std::deque<Message> output;
769
770 for (int i = 0; i < 4; ++i) {
771 ASSERT_TRUE(merger.Front() != nullptr);
772 output.emplace_back(std::move(*merger.Front()));
773 merger.PopFront();
774 }
775 ASSERT_TRUE(merger.Front() == nullptr);
776
777 EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(101000));
778 EXPECT_FALSE(output[0].data.message().has_monotonic_timestamp_time());
779 EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(101001));
780 EXPECT_TRUE(output[1].data.message().has_monotonic_timestamp_time());
781 EXPECT_EQ(output[1].data.message().monotonic_timestamp_time(), 971);
782 EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(101002));
783 EXPECT_TRUE(output[2].data.message().has_monotonic_timestamp_time());
784 EXPECT_EQ(output[2].data.message().monotonic_timestamp_time(), 972);
785 EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(101003));
786 EXPECT_TRUE(output[3].data.message().has_monotonic_timestamp_time());
787 EXPECT_EQ(output[3].data.message().monotonic_timestamp_time(), 973);
788}
789
Austin Schuhd2f96102020-12-01 20:27:29 -0800790// Tests that we can match timestamps on delivered messages.
791TEST_F(TimestampMapperTest, ReadNode0First) {
792 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
793 {
794 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
795 writer0.QueueSpan(config0_.span());
796 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
797 writer1.QueueSpan(config2_.span());
798
799 writer0.QueueSizedFlatbuffer(
800 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
801 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
802 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
803
804 writer0.QueueSizedFlatbuffer(
805 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
806 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
807 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
808
809 writer0.QueueSizedFlatbuffer(
810 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
811 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
812 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
813 }
814
815 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
816
817 ASSERT_EQ(parts[0].logger_node, "pi1");
818 ASSERT_EQ(parts[1].logger_node, "pi2");
819
Austin Schuh79b30942021-01-24 22:32:21 -0800820 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800821 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -0800822 mapper0.set_timestamp_callback(
823 [&](TimestampedMessage *) { ++mapper0_count; });
824 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800825 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -0800826 mapper1.set_timestamp_callback(
827 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -0800828
829 mapper0.AddPeer(&mapper1);
830 mapper1.AddPeer(&mapper0);
831
832 {
833 std::deque<TimestampedMessage> output0;
834
Austin Schuh79b30942021-01-24 22:32:21 -0800835 EXPECT_EQ(mapper0_count, 0u);
836 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800837 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800838 EXPECT_EQ(mapper0_count, 1u);
839 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800840 output0.emplace_back(std::move(*mapper0.Front()));
841 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700842 EXPECT_TRUE(mapper0.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800843 EXPECT_EQ(mapper0_count, 1u);
844 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800845
846 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800847 EXPECT_EQ(mapper0_count, 2u);
848 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800849 output0.emplace_back(std::move(*mapper0.Front()));
850 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700851 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800852
853 ASSERT_TRUE(mapper0.Front() != nullptr);
854 output0.emplace_back(std::move(*mapper0.Front()));
855 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700856 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800857
Austin Schuh79b30942021-01-24 22:32:21 -0800858 EXPECT_EQ(mapper0_count, 3u);
859 EXPECT_EQ(mapper1_count, 0u);
860
Austin Schuhd2f96102020-12-01 20:27:29 -0800861 ASSERT_TRUE(mapper0.Front() == nullptr);
862
863 EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
864 EXPECT_TRUE(output0[0].data.Verify());
865 EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
866 EXPECT_TRUE(output0[1].data.Verify());
867 EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
868 EXPECT_TRUE(output0[2].data.Verify());
869 }
870
871 {
872 SCOPED_TRACE("Trying node1 now");
873 std::deque<TimestampedMessage> output1;
874
Austin Schuh79b30942021-01-24 22:32:21 -0800875 EXPECT_EQ(mapper0_count, 3u);
876 EXPECT_EQ(mapper1_count, 0u);
877
Austin Schuhd2f96102020-12-01 20:27:29 -0800878 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800879 EXPECT_EQ(mapper0_count, 3u);
880 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800881 output1.emplace_back(std::move(*mapper1.Front()));
882 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700883 EXPECT_TRUE(mapper1.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800884 EXPECT_EQ(mapper0_count, 3u);
885 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800886
887 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800888 EXPECT_EQ(mapper0_count, 3u);
889 EXPECT_EQ(mapper1_count, 2u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800890 output1.emplace_back(std::move(*mapper1.Front()));
891 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700892 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800893
894 ASSERT_TRUE(mapper1.Front() != nullptr);
895 output1.emplace_back(std::move(*mapper1.Front()));
896 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700897 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800898
Austin Schuh79b30942021-01-24 22:32:21 -0800899 EXPECT_EQ(mapper0_count, 3u);
900 EXPECT_EQ(mapper1_count, 3u);
901
Austin Schuhd2f96102020-12-01 20:27:29 -0800902 ASSERT_TRUE(mapper1.Front() == nullptr);
903
Austin Schuh79b30942021-01-24 22:32:21 -0800904 EXPECT_EQ(mapper0_count, 3u);
905 EXPECT_EQ(mapper1_count, 3u);
906
Austin Schuhd2f96102020-12-01 20:27:29 -0800907 EXPECT_EQ(output1[0].monotonic_event_time,
908 e + chrono::seconds(100) + chrono::milliseconds(1000));
909 EXPECT_TRUE(output1[0].data.Verify());
910 EXPECT_EQ(output1[1].monotonic_event_time,
911 e + chrono::seconds(100) + chrono::milliseconds(2000));
912 EXPECT_TRUE(output1[1].data.Verify());
913 EXPECT_EQ(output1[2].monotonic_event_time,
914 e + chrono::seconds(100) + chrono::milliseconds(3000));
915 EXPECT_TRUE(output1[2].data.Verify());
916 }
917}
918
Austin Schuh8bf1e632021-01-02 22:41:04 -0800919// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
920// returned.
921TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
922 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
923 {
924 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
925 writer0.QueueSpan(config0_.span());
926 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
927 writer1.QueueSpan(config4_.span());
928
929 writer0.QueueSizedFlatbuffer(
930 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
931 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
932 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
933 e + chrono::nanoseconds(971)));
934
935 writer0.QueueSizedFlatbuffer(
936 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
937 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
938 e + chrono::milliseconds(2000), 0, chrono::seconds(100),
939 e + chrono::nanoseconds(5458)));
940
941 writer0.QueueSizedFlatbuffer(
942 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
943 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
944 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
945 }
946
947 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
948
949 for (const auto &p : parts) {
950 LOG(INFO) << p;
951 }
952
953 ASSERT_EQ(parts.size(), 1u);
954
Austin Schuh79b30942021-01-24 22:32:21 -0800955 size_t mapper0_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800956 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -0800957 mapper0.set_timestamp_callback(
958 [&](TimestampedMessage *) { ++mapper0_count; });
959 size_t mapper1_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800960 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -0800961 mapper1.set_timestamp_callback(
962 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh8bf1e632021-01-02 22:41:04 -0800963
964 mapper0.AddPeer(&mapper1);
965 mapper1.AddPeer(&mapper0);
966
967 {
968 std::deque<TimestampedMessage> output0;
969
970 for (int i = 0; i < 3; ++i) {
971 ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
972 output0.emplace_back(std::move(*mapper0.Front()));
973 mapper0.PopFront();
974 }
975
976 ASSERT_TRUE(mapper0.Front() == nullptr);
977
978 EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
979 EXPECT_EQ(output0[0].monotonic_timestamp_time, monotonic_clock::min_time);
980 EXPECT_TRUE(output0[0].data.Verify());
981 EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
982 EXPECT_EQ(output0[1].monotonic_timestamp_time, monotonic_clock::min_time);
983 EXPECT_TRUE(output0[1].data.Verify());
984 EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
985 EXPECT_EQ(output0[2].monotonic_timestamp_time, monotonic_clock::min_time);
986 EXPECT_TRUE(output0[2].data.Verify());
987 }
988
989 {
990 SCOPED_TRACE("Trying node1 now");
991 std::deque<TimestampedMessage> output1;
992
993 for (int i = 0; i < 3; ++i) {
994 ASSERT_TRUE(mapper1.Front() != nullptr);
995 output1.emplace_back(std::move(*mapper1.Front()));
996 mapper1.PopFront();
997 }
998
999 ASSERT_TRUE(mapper1.Front() == nullptr);
1000
1001 EXPECT_EQ(output1[0].monotonic_event_time,
1002 e + chrono::seconds(100) + chrono::milliseconds(1000));
1003 EXPECT_EQ(output1[0].monotonic_timestamp_time,
1004 e + chrono::nanoseconds(971));
1005 EXPECT_TRUE(output1[0].data.Verify());
1006 EXPECT_EQ(output1[1].monotonic_event_time,
1007 e + chrono::seconds(100) + chrono::milliseconds(2000));
1008 EXPECT_EQ(output1[1].monotonic_timestamp_time,
1009 e + chrono::nanoseconds(5458));
1010 EXPECT_TRUE(output1[1].data.Verify());
1011 EXPECT_EQ(output1[2].monotonic_event_time,
1012 e + chrono::seconds(100) + chrono::milliseconds(3000));
1013 EXPECT_EQ(output1[2].monotonic_timestamp_time, monotonic_clock::min_time);
1014 EXPECT_TRUE(output1[2].data.Verify());
1015 }
Austin Schuh79b30942021-01-24 22:32:21 -08001016
1017 EXPECT_EQ(mapper0_count, 3u);
1018 EXPECT_EQ(mapper1_count, 3u);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001019}
1020
Austin Schuhd2f96102020-12-01 20:27:29 -08001021// Tests that we can match timestamps on delivered messages. By doing this in
1022// the reverse order, the second node needs to queue data up from the first node
1023// to find the matching timestamp.
1024TEST_F(TimestampMapperTest, ReadNode1First) {
1025 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1026 {
1027 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1028 writer0.QueueSpan(config0_.span());
1029 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1030 writer1.QueueSpan(config2_.span());
1031
1032 writer0.QueueSizedFlatbuffer(
1033 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1034 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1035 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1036
1037 writer0.QueueSizedFlatbuffer(
1038 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1039 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1040 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1041
1042 writer0.QueueSizedFlatbuffer(
1043 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1044 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1045 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1046 }
1047
1048 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1049
1050 ASSERT_EQ(parts[0].logger_node, "pi1");
1051 ASSERT_EQ(parts[1].logger_node, "pi2");
1052
Austin Schuh79b30942021-01-24 22:32:21 -08001053 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001054 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001055 mapper0.set_timestamp_callback(
1056 [&](TimestampedMessage *) { ++mapper0_count; });
1057 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001058 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001059 mapper1.set_timestamp_callback(
1060 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001061
1062 mapper0.AddPeer(&mapper1);
1063 mapper1.AddPeer(&mapper0);
1064
1065 {
1066 SCOPED_TRACE("Trying node1 now");
1067 std::deque<TimestampedMessage> output1;
1068
1069 ASSERT_TRUE(mapper1.Front() != nullptr);
1070 output1.emplace_back(std::move(*mapper1.Front()));
1071 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001072 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001073
1074 ASSERT_TRUE(mapper1.Front() != nullptr);
1075 output1.emplace_back(std::move(*mapper1.Front()));
1076 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001077 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001078
1079 ASSERT_TRUE(mapper1.Front() != nullptr);
1080 output1.emplace_back(std::move(*mapper1.Front()));
1081 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001082 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001083
1084 ASSERT_TRUE(mapper1.Front() == nullptr);
1085
1086 EXPECT_EQ(output1[0].monotonic_event_time,
1087 e + chrono::seconds(100) + chrono::milliseconds(1000));
1088 EXPECT_TRUE(output1[0].data.Verify());
1089 EXPECT_EQ(output1[1].monotonic_event_time,
1090 e + chrono::seconds(100) + chrono::milliseconds(2000));
1091 EXPECT_TRUE(output1[1].data.Verify());
1092 EXPECT_EQ(output1[2].monotonic_event_time,
1093 e + chrono::seconds(100) + chrono::milliseconds(3000));
1094 EXPECT_TRUE(output1[2].data.Verify());
1095 }
1096
1097 {
1098 std::deque<TimestampedMessage> output0;
1099
1100 ASSERT_TRUE(mapper0.Front() != nullptr);
1101 output0.emplace_back(std::move(*mapper0.Front()));
1102 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001103 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001104
1105 ASSERT_TRUE(mapper0.Front() != nullptr);
1106 output0.emplace_back(std::move(*mapper0.Front()));
1107 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001108 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001109
1110 ASSERT_TRUE(mapper0.Front() != nullptr);
1111 output0.emplace_back(std::move(*mapper0.Front()));
1112 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001113 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001114
1115 ASSERT_TRUE(mapper0.Front() == nullptr);
1116
1117 EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
1118 EXPECT_TRUE(output0[0].data.Verify());
1119 EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
1120 EXPECT_TRUE(output0[1].data.Verify());
1121 EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
1122 EXPECT_TRUE(output0[2].data.Verify());
1123 }
Austin Schuh79b30942021-01-24 22:32:21 -08001124
1125 EXPECT_EQ(mapper0_count, 3u);
1126 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001127}
1128
1129// Tests that we return just the timestamps if we couldn't find the data and the
1130// missing data was at the beginning of the file.
1131TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
1132 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1133 {
1134 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1135 writer0.QueueSpan(config0_.span());
1136 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1137 writer1.QueueSpan(config2_.span());
1138
1139 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
1140 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1141 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1142
1143 writer0.QueueSizedFlatbuffer(
1144 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1145 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1146 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1147
1148 writer0.QueueSizedFlatbuffer(
1149 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1150 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1151 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1152 }
1153
1154 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1155
1156 ASSERT_EQ(parts[0].logger_node, "pi1");
1157 ASSERT_EQ(parts[1].logger_node, "pi2");
1158
Austin Schuh79b30942021-01-24 22:32:21 -08001159 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001160 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001161 mapper0.set_timestamp_callback(
1162 [&](TimestampedMessage *) { ++mapper0_count; });
1163 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001164 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001165 mapper1.set_timestamp_callback(
1166 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001167
1168 mapper0.AddPeer(&mapper1);
1169 mapper1.AddPeer(&mapper0);
1170
1171 {
1172 SCOPED_TRACE("Trying node1 now");
1173 std::deque<TimestampedMessage> output1;
1174
1175 ASSERT_TRUE(mapper1.Front() != nullptr);
1176 output1.emplace_back(std::move(*mapper1.Front()));
1177 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001178 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001179
1180 ASSERT_TRUE(mapper1.Front() != nullptr);
1181 output1.emplace_back(std::move(*mapper1.Front()));
1182 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001183 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001184
1185 ASSERT_TRUE(mapper1.Front() != nullptr);
1186 output1.emplace_back(std::move(*mapper1.Front()));
1187 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001188 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001189
1190 ASSERT_TRUE(mapper1.Front() == nullptr);
1191
1192 EXPECT_EQ(output1[0].monotonic_event_time,
1193 e + chrono::seconds(100) + chrono::milliseconds(1000));
1194 EXPECT_FALSE(output1[0].data.Verify());
1195 EXPECT_EQ(output1[1].monotonic_event_time,
1196 e + chrono::seconds(100) + chrono::milliseconds(2000));
1197 EXPECT_TRUE(output1[1].data.Verify());
1198 EXPECT_EQ(output1[2].monotonic_event_time,
1199 e + chrono::seconds(100) + chrono::milliseconds(3000));
1200 EXPECT_TRUE(output1[2].data.Verify());
1201 }
Austin Schuh79b30942021-01-24 22:32:21 -08001202
1203 EXPECT_EQ(mapper0_count, 0u);
1204 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001205}
1206
1207// Tests that we return just the timestamps if we couldn't find the data and the
1208// missing data was at the end of the file.
1209TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
1210 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1211 {
1212 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1213 writer0.QueueSpan(config0_.span());
1214 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1215 writer1.QueueSpan(config2_.span());
1216
1217 writer0.QueueSizedFlatbuffer(
1218 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1219 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1220 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1221
1222 writer0.QueueSizedFlatbuffer(
1223 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1224 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1225 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1226
1227 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
1228 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1229 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1230 }
1231
1232 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1233
1234 ASSERT_EQ(parts[0].logger_node, "pi1");
1235 ASSERT_EQ(parts[1].logger_node, "pi2");
1236
Austin Schuh79b30942021-01-24 22:32:21 -08001237 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001238 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001239 mapper0.set_timestamp_callback(
1240 [&](TimestampedMessage *) { ++mapper0_count; });
1241 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001242 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001243 mapper1.set_timestamp_callback(
1244 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001245
1246 mapper0.AddPeer(&mapper1);
1247 mapper1.AddPeer(&mapper0);
1248
1249 {
1250 SCOPED_TRACE("Trying node1 now");
1251 std::deque<TimestampedMessage> output1;
1252
1253 ASSERT_TRUE(mapper1.Front() != nullptr);
1254 output1.emplace_back(std::move(*mapper1.Front()));
1255 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001256 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001257
1258 ASSERT_TRUE(mapper1.Front() != nullptr);
1259 output1.emplace_back(std::move(*mapper1.Front()));
1260 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001261 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001262
1263 ASSERT_TRUE(mapper1.Front() != nullptr);
1264 output1.emplace_back(std::move(*mapper1.Front()));
1265 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001266 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001267
1268 ASSERT_TRUE(mapper1.Front() == nullptr);
1269
1270 EXPECT_EQ(output1[0].monotonic_event_time,
1271 e + chrono::seconds(100) + chrono::milliseconds(1000));
1272 EXPECT_TRUE(output1[0].data.Verify());
1273 EXPECT_EQ(output1[1].monotonic_event_time,
1274 e + chrono::seconds(100) + chrono::milliseconds(2000));
1275 EXPECT_TRUE(output1[1].data.Verify());
1276 EXPECT_EQ(output1[2].monotonic_event_time,
1277 e + chrono::seconds(100) + chrono::milliseconds(3000));
1278 EXPECT_FALSE(output1[2].data.Verify());
1279 }
Austin Schuh79b30942021-01-24 22:32:21 -08001280
1281 EXPECT_EQ(mapper0_count, 0u);
1282 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001283}
1284
Austin Schuh993ccb52020-12-12 15:59:32 -08001285// Tests that we handle a message which failed to forward or be logged.
1286TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1287 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1288 {
1289 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1290 writer0.QueueSpan(config0_.span());
1291 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1292 writer1.QueueSpan(config2_.span());
1293
1294 writer0.QueueSizedFlatbuffer(
1295 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1296 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1297 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1298
1299 // Create both the timestamp and message, but don't log them, simulating a
1300 // forwarding drop.
1301 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1302 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1303 chrono::seconds(100));
1304
1305 writer0.QueueSizedFlatbuffer(
1306 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1307 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1308 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1309 }
1310
1311 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1312
1313 ASSERT_EQ(parts[0].logger_node, "pi1");
1314 ASSERT_EQ(parts[1].logger_node, "pi2");
1315
Austin Schuh79b30942021-01-24 22:32:21 -08001316 size_t mapper0_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001317 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001318 mapper0.set_timestamp_callback(
1319 [&](TimestampedMessage *) { ++mapper0_count; });
1320 size_t mapper1_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001321 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001322 mapper1.set_timestamp_callback(
1323 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh993ccb52020-12-12 15:59:32 -08001324
1325 mapper0.AddPeer(&mapper1);
1326 mapper1.AddPeer(&mapper0);
1327
1328 {
1329 std::deque<TimestampedMessage> output1;
1330
1331 ASSERT_TRUE(mapper1.Front() != nullptr);
1332 output1.emplace_back(std::move(*mapper1.Front()));
1333 mapper1.PopFront();
1334
1335 ASSERT_TRUE(mapper1.Front() != nullptr);
1336 output1.emplace_back(std::move(*mapper1.Front()));
1337
1338 ASSERT_FALSE(mapper1.Front() == nullptr);
1339
1340 EXPECT_EQ(output1[0].monotonic_event_time,
1341 e + chrono::seconds(100) + chrono::milliseconds(1000));
1342 EXPECT_TRUE(output1[0].data.Verify());
1343 EXPECT_EQ(output1[1].monotonic_event_time,
1344 e + chrono::seconds(100) + chrono::milliseconds(3000));
1345 EXPECT_TRUE(output1[1].data.Verify());
1346 }
Austin Schuh79b30942021-01-24 22:32:21 -08001347
1348 EXPECT_EQ(mapper0_count, 0u);
1349 EXPECT_EQ(mapper1_count, 2u);
Austin Schuh993ccb52020-12-12 15:59:32 -08001350}
1351
Austin Schuhd2f96102020-12-01 20:27:29 -08001352// Tests that we properly sort log files with duplicate timestamps.
1353TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1354 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1355 {
1356 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1357 writer0.QueueSpan(config0_.span());
1358 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1359 writer1.QueueSpan(config2_.span());
1360
1361 writer0.QueueSizedFlatbuffer(
1362 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1363 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1364 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1365
1366 writer0.QueueSizedFlatbuffer(
1367 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1368 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1369 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1370
1371 writer0.QueueSizedFlatbuffer(
1372 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
1373 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1374 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1375
1376 writer0.QueueSizedFlatbuffer(
1377 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
1378 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1379 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1380 }
1381
1382 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1383
1384 ASSERT_EQ(parts[0].logger_node, "pi1");
1385 ASSERT_EQ(parts[1].logger_node, "pi2");
1386
Austin Schuh79b30942021-01-24 22:32:21 -08001387 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001388 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001389 mapper0.set_timestamp_callback(
1390 [&](TimestampedMessage *) { ++mapper0_count; });
1391 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001392 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001393 mapper1.set_timestamp_callback(
1394 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001395
1396 mapper0.AddPeer(&mapper1);
1397 mapper1.AddPeer(&mapper0);
1398
1399 {
1400 SCOPED_TRACE("Trying node1 now");
1401 std::deque<TimestampedMessage> output1;
1402
1403 for (int i = 0; i < 4; ++i) {
1404 ASSERT_TRUE(mapper1.Front() != nullptr);
1405 output1.emplace_back(std::move(*mapper1.Front()));
1406 mapper1.PopFront();
1407 }
1408 ASSERT_TRUE(mapper1.Front() == nullptr);
1409
1410 EXPECT_EQ(output1[0].monotonic_event_time,
1411 e + chrono::seconds(100) + chrono::milliseconds(1000));
1412 EXPECT_TRUE(output1[0].data.Verify());
1413 EXPECT_EQ(output1[1].monotonic_event_time,
1414 e + chrono::seconds(100) + chrono::milliseconds(2000));
1415 EXPECT_TRUE(output1[1].data.Verify());
1416 EXPECT_EQ(output1[2].monotonic_event_time,
1417 e + chrono::seconds(100) + chrono::milliseconds(2000));
1418 EXPECT_TRUE(output1[2].data.Verify());
1419 EXPECT_EQ(output1[3].monotonic_event_time,
1420 e + chrono::seconds(100) + chrono::milliseconds(3000));
1421 EXPECT_TRUE(output1[3].data.Verify());
1422 }
Austin Schuh79b30942021-01-24 22:32:21 -08001423
1424 EXPECT_EQ(mapper0_count, 0u);
1425 EXPECT_EQ(mapper1_count, 4u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001426}
1427
1428// Tests that we properly sort log files with duplicate timestamps.
1429TEST_F(TimestampMapperTest, StartTime) {
1430 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1431 {
1432 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1433 writer0.QueueSpan(config0_.span());
1434 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1435 writer1.QueueSpan(config1_.span());
1436 DetachedBufferWriter writer2(logfile2_, std::make_unique<DummyEncoder>());
1437 writer2.QueueSpan(config3_.span());
1438 }
1439
1440 const std::vector<LogFile> parts =
1441 SortParts({logfile0_, logfile1_, logfile2_});
1442
Austin Schuh79b30942021-01-24 22:32:21 -08001443 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001444 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001445 mapper0.set_timestamp_callback(
1446 [&](TimestampedMessage *) { ++mapper0_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001447
1448 EXPECT_EQ(mapper0.monotonic_start_time(), e + chrono::milliseconds(1));
1449 EXPECT_EQ(mapper0.realtime_start_time(),
1450 realtime_clock::time_point(chrono::seconds(1000)));
Austin Schuh79b30942021-01-24 22:32:21 -08001451 EXPECT_EQ(mapper0_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001452}
1453
Austin Schuhfecf1d82020-12-19 16:57:28 -08001454// Tests that when a peer isn't registered, we treat that as if there was no
1455// data available.
1456TEST_F(TimestampMapperTest, NoPeer) {
1457 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1458 {
1459 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1460 writer0.QueueSpan(config0_.span());
1461 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1462 writer1.QueueSpan(config2_.span());
1463
1464 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
1465 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1466 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1467
1468 writer0.QueueSizedFlatbuffer(
1469 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1470 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1471 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1472
1473 writer0.QueueSizedFlatbuffer(
1474 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1475 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1476 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1477 }
1478
1479 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1480
1481 ASSERT_EQ(parts[0].logger_node, "pi1");
1482 ASSERT_EQ(parts[1].logger_node, "pi2");
1483
Austin Schuh79b30942021-01-24 22:32:21 -08001484 size_t mapper1_count = 0;
Austin Schuhfecf1d82020-12-19 16:57:28 -08001485 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001486 mapper1.set_timestamp_callback(
1487 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhfecf1d82020-12-19 16:57:28 -08001488
1489 {
1490 std::deque<TimestampedMessage> output1;
1491
1492 ASSERT_TRUE(mapper1.Front() != nullptr);
1493 output1.emplace_back(std::move(*mapper1.Front()));
1494 mapper1.PopFront();
1495 ASSERT_TRUE(mapper1.Front() != nullptr);
1496 output1.emplace_back(std::move(*mapper1.Front()));
1497 mapper1.PopFront();
1498 ASSERT_TRUE(mapper1.Front() != nullptr);
1499 output1.emplace_back(std::move(*mapper1.Front()));
1500 mapper1.PopFront();
1501 ASSERT_TRUE(mapper1.Front() == nullptr);
1502
1503 EXPECT_EQ(output1[0].monotonic_event_time,
1504 e + chrono::seconds(100) + chrono::milliseconds(1000));
1505 EXPECT_FALSE(output1[0].data.Verify());
1506 EXPECT_EQ(output1[1].monotonic_event_time,
1507 e + chrono::seconds(100) + chrono::milliseconds(2000));
1508 EXPECT_FALSE(output1[1].data.Verify());
1509 EXPECT_EQ(output1[2].monotonic_event_time,
1510 e + chrono::seconds(100) + chrono::milliseconds(3000));
1511 EXPECT_FALSE(output1[2].data.Verify());
1512 }
Austin Schuh79b30942021-01-24 22:32:21 -08001513 EXPECT_EQ(mapper1_count, 3u);
1514}
1515
1516// Tests that we can queue messages and call the timestamp callback for both
1517// nodes.
1518TEST_F(TimestampMapperTest, QueueUntilNode0) {
1519 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1520 {
1521 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1522 writer0.QueueSpan(config0_.span());
1523 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1524 writer1.QueueSpan(config2_.span());
1525
1526 writer0.QueueSizedFlatbuffer(
1527 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1528 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1529 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1530
1531 writer0.QueueSizedFlatbuffer(
1532 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
1533 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1534 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1535
1536 writer0.QueueSizedFlatbuffer(
1537 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
1538 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1539 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1540
1541 writer0.QueueSizedFlatbuffer(
1542 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
1543 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1544 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1545 }
1546
1547 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1548
1549 ASSERT_EQ(parts[0].logger_node, "pi1");
1550 ASSERT_EQ(parts[1].logger_node, "pi2");
1551
1552 size_t mapper0_count = 0;
1553 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1554 mapper0.set_timestamp_callback(
1555 [&](TimestampedMessage *) { ++mapper0_count; });
1556 size_t mapper1_count = 0;
1557 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1558 mapper1.set_timestamp_callback(
1559 [&](TimestampedMessage *) { ++mapper1_count; });
1560
1561 mapper0.AddPeer(&mapper1);
1562 mapper1.AddPeer(&mapper0);
1563
1564 {
1565 std::deque<TimestampedMessage> output0;
1566
1567 EXPECT_EQ(mapper0_count, 0u);
1568 EXPECT_EQ(mapper1_count, 0u);
1569 mapper0.QueueUntil(e + chrono::milliseconds(1000));
1570 EXPECT_EQ(mapper0_count, 3u);
1571 EXPECT_EQ(mapper1_count, 0u);
1572
1573 ASSERT_TRUE(mapper0.Front() != nullptr);
1574 EXPECT_EQ(mapper0_count, 3u);
1575 EXPECT_EQ(mapper1_count, 0u);
1576
1577 mapper0.QueueUntil(e + chrono::milliseconds(1500));
1578 EXPECT_EQ(mapper0_count, 3u);
1579 EXPECT_EQ(mapper1_count, 0u);
1580
1581 mapper0.QueueUntil(e + chrono::milliseconds(2500));
1582 EXPECT_EQ(mapper0_count, 4u);
1583 EXPECT_EQ(mapper1_count, 0u);
1584
1585 output0.emplace_back(std::move(*mapper0.Front()));
1586 mapper0.PopFront();
1587 output0.emplace_back(std::move(*mapper0.Front()));
1588 mapper0.PopFront();
1589 output0.emplace_back(std::move(*mapper0.Front()));
1590 mapper0.PopFront();
1591 output0.emplace_back(std::move(*mapper0.Front()));
1592 mapper0.PopFront();
1593
1594 EXPECT_EQ(mapper0_count, 4u);
1595 EXPECT_EQ(mapper1_count, 0u);
1596
1597 ASSERT_TRUE(mapper0.Front() == nullptr);
1598
1599 EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
1600 EXPECT_TRUE(output0[0].data.Verify());
1601 EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(1000));
1602 EXPECT_TRUE(output0[1].data.Verify());
1603 EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(2000));
1604 EXPECT_TRUE(output0[2].data.Verify());
1605 EXPECT_EQ(output0[3].monotonic_event_time, e + chrono::milliseconds(3000));
1606 EXPECT_TRUE(output0[3].data.Verify());
1607 }
1608
1609 {
1610 SCOPED_TRACE("Trying node1 now");
1611 std::deque<TimestampedMessage> output1;
1612
1613 EXPECT_EQ(mapper0_count, 4u);
1614 EXPECT_EQ(mapper1_count, 0u);
1615 mapper1.QueueUntil(e + chrono::seconds(100) + chrono::milliseconds(1000));
1616 EXPECT_EQ(mapper0_count, 4u);
1617 EXPECT_EQ(mapper1_count, 3u);
1618
1619 ASSERT_TRUE(mapper1.Front() != nullptr);
1620 EXPECT_EQ(mapper0_count, 4u);
1621 EXPECT_EQ(mapper1_count, 3u);
1622
1623 mapper1.QueueUntil(e + chrono::seconds(100) + chrono::milliseconds(1500));
1624 EXPECT_EQ(mapper0_count, 4u);
1625 EXPECT_EQ(mapper1_count, 3u);
1626
1627 mapper1.QueueUntil(e + chrono::seconds(100) + chrono::milliseconds(2500));
1628 EXPECT_EQ(mapper0_count, 4u);
1629 EXPECT_EQ(mapper1_count, 4u);
1630
1631 ASSERT_TRUE(mapper1.Front() != nullptr);
1632 EXPECT_EQ(mapper0_count, 4u);
1633 EXPECT_EQ(mapper1_count, 4u);
1634
1635 output1.emplace_back(std::move(*mapper1.Front()));
1636 mapper1.PopFront();
1637 ASSERT_TRUE(mapper1.Front() != nullptr);
1638 output1.emplace_back(std::move(*mapper1.Front()));
1639 mapper1.PopFront();
1640 ASSERT_TRUE(mapper1.Front() != nullptr);
1641 output1.emplace_back(std::move(*mapper1.Front()));
1642 mapper1.PopFront();
1643 ASSERT_TRUE(mapper1.Front() != nullptr);
1644 output1.emplace_back(std::move(*mapper1.Front()));
1645 mapper1.PopFront();
1646
1647 EXPECT_EQ(mapper0_count, 4u);
1648 EXPECT_EQ(mapper1_count, 4u);
1649
1650 ASSERT_TRUE(mapper1.Front() == nullptr);
1651
1652 EXPECT_EQ(mapper0_count, 4u);
1653 EXPECT_EQ(mapper1_count, 4u);
1654
1655 EXPECT_EQ(output1[0].monotonic_event_time,
1656 e + chrono::seconds(100) + chrono::milliseconds(1000));
1657 EXPECT_TRUE(output1[0].data.Verify());
1658 EXPECT_EQ(output1[1].monotonic_event_time,
1659 e + chrono::seconds(100) + chrono::milliseconds(1000));
1660 EXPECT_TRUE(output1[1].data.Verify());
1661 EXPECT_EQ(output1[2].monotonic_event_time,
1662 e + chrono::seconds(100) + chrono::milliseconds(2000));
1663 EXPECT_TRUE(output1[2].data.Verify());
1664 EXPECT_EQ(output1[3].monotonic_event_time,
1665 e + chrono::seconds(100) + chrono::milliseconds(3000));
1666 EXPECT_TRUE(output1[3].data.Verify());
1667 }
Austin Schuhfecf1d82020-12-19 16:57:28 -08001668}
1669
// This tests that we can properly sort a multi-node log file which has the old
// (and buggy) timestamps in the header, and the non-resetting parts_index.
// These make it so we can just barely figure out what happened first and what
// happened second, but not in a way that is robust to multiple nodes rebooting.
1674TEST_F(SortingElementTest, OldReboot) {
1675 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0 =
1676 MakeHeader(config_, R"({
1677 /* 100ms */
1678 "max_out_of_order_duration": 100000000,
1679 "node": {
1680 "name": "pi2"
1681 },
1682 "logger_node": {
1683 "name": "pi1"
1684 },
1685 "monotonic_start_time": 1000000,
1686 "realtime_start_time": 1000000000000,
1687 "logger_monotonic_start_time": 1000000,
1688 "logger_realtime_start_time": 1000000000000,
1689 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
1690 "parts_uuid": "1a9e5ca2-31b2-475b-8282-88f6d1ce5109",
1691 "parts_index": 0,
1692 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
1693 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
1694 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464"
1695})");
1696 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1 =
1697 MakeHeader(config_, R"({
1698 /* 100ms */
1699 "max_out_of_order_duration": 100000000,
1700 "node": {
1701 "name": "pi2"
1702 },
1703 "logger_node": {
1704 "name": "pi1"
1705 },
1706 "monotonic_start_time": 1000000,
1707 "realtime_start_time": 1000000000000,
1708 "logger_monotonic_start_time": 1000000,
1709 "logger_realtime_start_time": 1000000000000,
1710 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
1711 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
1712 "parts_index": 1,
1713 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
1714 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
1715 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2"
1716})");
1717
1718 {
1719 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
1720 writer.QueueSpan(boot0.span());
1721 }
1722 {
1723 DetachedBufferWriter writer(logfile1_, std::make_unique<DummyEncoder>());
1724 writer.QueueSpan(boot1.span());
1725 }
1726
1727 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1728
1729 ASSERT_EQ(parts.size(), 1u);
1730 ASSERT_EQ(parts[0].parts.size(), 2u);
1731
1732 EXPECT_EQ(parts[0].parts[0].boot_count, 0);
1733 EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
1734 boot0.message().source_node_boot_uuid()->string_view());
1735
1736 EXPECT_EQ(parts[0].parts[1].boot_count, 1);
1737 EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
1738 boot1.message().source_node_boot_uuid()->string_view());
1739}
1740
Austin Schuhc243b422020-10-11 15:35:08 -07001741} // namespace testing
1742} // namespace logger
1743} // namespace aos