blob: fc7635ac7e553086cd038ace89b3d38951b08c90 [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
4#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07005
Austin Schuhc41603c2020-10-11 16:17:37 -07006#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -07007#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -08008#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -07009#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070010#include "aos/json_to_flatbuffer.h"
11#include "aos/testing/tmpdir.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070012#include "gtest/gtest.h"
Austin Schuhc243b422020-10-11 15:35:08 -070013
14namespace aos {
15namespace logger {
16namespace testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070017namespace chrono = std::chrono;
Austin Schuhc243b422020-10-11 15:35:08 -070018
Austin Schuhe243aaf2020-10-11 15:46:02 -070019// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070020template <typename T>
21SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
22 const std::string_view data) {
23 flatbuffers::FlatBufferBuilder fbb;
24 fbb.ForceDefaults(true);
25 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
26 return fbb.Release();
27}
28
Austin Schuhe243aaf2020-10-11 15:46:02 -070029// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070030TEST(SpanReaderTest, ReadWrite) {
31 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
32 unlink(logfile.c_str());
33
34 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080035 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070036 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080037 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070038
39 {
40 DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -080041 writer.QueueSpan(m1.span());
42 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070043 }
44
45 SpanReader reader(logfile);
46
47 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -080048 EXPECT_EQ(reader.ReadMessage(), m1.span());
49 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070050 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
51}
52
Austin Schuhe243aaf2020-10-11 15:46:02 -070053// Tests that we can actually parse the resulting messages at a basic level
54// through MessageReader.
55TEST(MessageReaderTest, ReadWrite) {
56 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
57 unlink(logfile.c_str());
58
59 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
60 JsonToSizedFlatbuffer<LogFileHeader>(
61 R"({ "max_out_of_order_duration": 100000000 })");
62 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
63 JsonToSizedFlatbuffer<MessageHeader>(
64 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
65 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
66 JsonToSizedFlatbuffer<MessageHeader>(
67 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
68
69 {
70 DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -080071 writer.QueueSpan(config.span());
72 writer.QueueSpan(m1.span());
73 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -070074 }
75
76 MessageReader reader(logfile);
77
78 EXPECT_EQ(reader.filename(), logfile);
79
80 EXPECT_EQ(
81 reader.max_out_of_order_duration(),
82 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
83 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
84 EXPECT_TRUE(reader.ReadMessage());
85 EXPECT_EQ(reader.newest_timestamp(),
86 monotonic_clock::time_point(chrono::nanoseconds(1)));
87 EXPECT_TRUE(reader.ReadMessage());
88 EXPECT_EQ(reader.newest_timestamp(),
89 monotonic_clock::time_point(chrono::nanoseconds(2)));
90 EXPECT_FALSE(reader.ReadMessage());
91}
92
Austin Schuh32f68492020-11-08 21:45:51 -080093// Tests that we explode when messages are too far out of order.
94TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
95 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
96 unlink(logfile0.c_str());
97
98 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
99 JsonToSizedFlatbuffer<LogFileHeader>(
100 R"({
101 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800102 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800103 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
104 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
105 "parts_index": 0
106})");
107
108 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
109 JsonToSizedFlatbuffer<MessageHeader>(
110 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
111 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
112 JsonToSizedFlatbuffer<MessageHeader>(
113 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
114 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
115 JsonToSizedFlatbuffer<MessageHeader>(
116 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
117
118 {
119 DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800120 writer.QueueSpan(config0.span());
121 writer.QueueSpan(m1.span());
122 writer.QueueSpan(m2.span());
123 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800124 }
125
126 const std::vector<LogFile> parts = SortParts({logfile0});
127
128 PartsMessageReader reader(parts[0].parts[0]);
129
130 EXPECT_TRUE(reader.ReadMessage());
131 EXPECT_TRUE(reader.ReadMessage());
132 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
133}
134
Austin Schuhc41603c2020-10-11 16:17:37 -0700135// Tests that we can transparently re-assemble part files with a
136// PartsMessageReader.
137TEST(PartsMessageReaderTest, ReadWrite) {
138 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
139 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
140 unlink(logfile0.c_str());
141 unlink(logfile1.c_str());
142
143 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
144 JsonToSizedFlatbuffer<LogFileHeader>(
145 R"({
146 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800147 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700148 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
149 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
150 "parts_index": 0
151})");
152 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
153 JsonToSizedFlatbuffer<LogFileHeader>(
154 R"({
155 "max_out_of_order_duration": 200000000,
156 "monotonic_start_time": 0,
157 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800158 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700159 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
160 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
161 "parts_index": 1
162})");
163
164 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
165 JsonToSizedFlatbuffer<MessageHeader>(
166 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
167 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
168 JsonToSizedFlatbuffer<MessageHeader>(
169 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
170
171 {
172 DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800173 writer.QueueSpan(config0.span());
174 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700175 }
176 {
177 DetachedBufferWriter writer(logfile1, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800178 writer.QueueSpan(config1.span());
179 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700180 }
181
182 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
183
184 PartsMessageReader reader(parts[0].parts[0]);
185
186 EXPECT_EQ(reader.filename(), logfile0);
187
188 // Confirm that the timestamps track, and the filename also updates.
189 // Read the first message.
190 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
191 EXPECT_EQ(
192 reader.max_out_of_order_duration(),
193 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
194 EXPECT_TRUE(reader.ReadMessage());
195 EXPECT_EQ(reader.filename(), logfile0);
196 EXPECT_EQ(reader.newest_timestamp(),
197 monotonic_clock::time_point(chrono::nanoseconds(1)));
198 EXPECT_EQ(
199 reader.max_out_of_order_duration(),
200 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
201
202 // Read the second message.
203 EXPECT_TRUE(reader.ReadMessage());
204 EXPECT_EQ(reader.filename(), logfile1);
205 EXPECT_EQ(reader.newest_timestamp(),
206 monotonic_clock::time_point(chrono::nanoseconds(2)));
207 EXPECT_EQ(
208 reader.max_out_of_order_duration(),
209 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
210
211 // And then confirm that reading again returns no message.
212 EXPECT_FALSE(reader.ReadMessage());
213 EXPECT_EQ(reader.filename(), logfile1);
214 EXPECT_EQ(
215 reader.max_out_of_order_duration(),
216 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800217 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Austin Schuhc41603c2020-10-11 16:17:37 -0700218}
Austin Schuh32f68492020-11-08 21:45:51 -0800219
Austin Schuh1be0ce42020-11-29 22:43:26 -0800220// Tests that Message's operator < works as expected.
221TEST(MessageTest, Sorting) {
222 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
223
224 Message m1{.channel_index = 0,
225 .queue_index = 0,
226 .timestamp = e + chrono::milliseconds(1),
227 .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
228 Message m2{.channel_index = 0,
229 .queue_index = 0,
230 .timestamp = e + chrono::milliseconds(2),
231 .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
232
233 EXPECT_LT(m1, m2);
234 EXPECT_GE(m2, m1);
235
236 m1.timestamp = e;
237 m2.timestamp = e;
238
239 m1.channel_index = 1;
240 m2.channel_index = 2;
241
242 EXPECT_LT(m1, m2);
243 EXPECT_GE(m2, m1);
244
245 m1.channel_index = 0;
246 m2.channel_index = 0;
247 m1.queue_index = 0;
248 m2.queue_index = 1;
249
250 EXPECT_LT(m1, m2);
251 EXPECT_GE(m2, m1);
252}
253
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800254aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
255 const aos::FlatbufferDetachedBuffer<Configuration> &config,
256 const std::string_view json) {
257 flatbuffers::FlatBufferBuilder fbb;
258 flatbuffers::Offset<Configuration> config_offset =
259 aos::CopyFlatBuffer(config, &fbb);
260 LogFileHeader::Builder header_builder(fbb);
261 header_builder.add_configuration(config_offset);
262 fbb.Finish(header_builder.Finish());
263 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
264
265 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
266 JsonToFlatbuffer<LogFileHeader>(json));
267 CHECK(header_updates.Verify());
268 flatbuffers::FlatBufferBuilder fbb2;
269 fbb2.FinishSizePrefixed(
270 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
271 return fbb2.Release();
272}
273
274class SortingElementTest : public ::testing::Test {
275 public:
276 SortingElementTest()
277 : config_(JsonToFlatbuffer<Configuration>(
278 R"({
279 "channels": [
280 {
281 "name": "/a",
282 "type": "aos.logger.testing.TestMessage",
283 "source_node": "pi1",
284 "destination_nodes": [
285 {
286 "name": "pi2"
287 },
288 {
289 "name": "pi3"
290 }
291 ]
292 },
293 {
294 "name": "/b",
295 "type": "aos.logger.testing.TestMessage",
296 "source_node": "pi1"
297 },
298 {
299 "name": "/c",
300 "type": "aos.logger.testing.TestMessage",
301 "source_node": "pi1"
302 }
303 ],
304 "nodes": [
305 {
306 "name": "pi1"
307 },
308 {
309 "name": "pi2"
310 },
311 {
312 "name": "pi3"
313 }
314 ]
315}
316)")),
317 config0_(MakeHeader(config_, R"({
318 /* 100ms */
319 "max_out_of_order_duration": 100000000,
320 "node": {
321 "name": "pi1"
322 },
323 "logger_node": {
324 "name": "pi1"
325 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800326 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800327 "realtime_start_time": 1000000000000,
328 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
329 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
330 "parts_index": 0
331})")),
332 config1_(MakeHeader(config_,
333 R"({
334 /* 100ms */
335 "max_out_of_order_duration": 100000000,
336 "node": {
337 "name": "pi1"
338 },
339 "logger_node": {
340 "name": "pi1"
341 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800342 "monotonic_start_time": 1000000,
343 "realtime_start_time": 1000000000000,
344 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
345 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
346 "parts_index": 0
347})")),
348 config2_(MakeHeader(config_,
349 R"({
350 /* 100ms */
351 "max_out_of_order_duration": 100000000,
352 "node": {
353 "name": "pi2"
354 },
355 "logger_node": {
356 "name": "pi2"
357 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800358 "monotonic_start_time": 0,
359 "realtime_start_time": 1000000000000,
Austin Schuhd2f96102020-12-01 20:27:29 -0800360 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
361 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
362 "parts_index": 0
363})")),
364 config3_(MakeHeader(config_,
365 R"({
366 /* 100ms */
367 "max_out_of_order_duration": 100000000,
368 "node": {
369 "name": "pi1"
370 },
371 "logger_node": {
372 "name": "pi1"
373 },
374 "monotonic_start_time": 2000000,
375 "realtime_start_time": 1000000000,
376 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
377 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800378 "parts_index": 0
Austin Schuh8bf1e632021-01-02 22:41:04 -0800379})")),
380 config4_(MakeHeader(config_,
381 R"({
382 /* 100ms */
383 "max_out_of_order_duration": 100000000,
384 "node": {
385 "name": "pi2"
386 },
387 "logger_node": {
388 "name": "pi1"
389 },
390 "monotonic_start_time": 2000000,
391 "realtime_start_time": 1000000000,
392 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
393 "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
394 "parts_index": 0
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800395})")) {
396 unlink(logfile0_.c_str());
397 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800398 unlink(logfile2_.c_str());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800399 queue_index_.resize(kChannels);
400 }
401
402 protected:
403 static constexpr size_t kChannels = 3u;
404
405 flatbuffers::DetachedBuffer MakeLogMessage(
406 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
407 int value) {
408 flatbuffers::FlatBufferBuilder message_fbb;
409 message_fbb.ForceDefaults(true);
410 TestMessage::Builder test_message_builder(message_fbb);
411 test_message_builder.add_value(value);
412 message_fbb.Finish(test_message_builder.Finish());
413
414 aos::Context context;
415 context.monotonic_event_time = monotonic_now;
416 context.realtime_event_time = aos::realtime_clock::epoch() +
417 chrono::seconds(1000) +
418 monotonic_now.time_since_epoch();
419 context.queue_index = queue_index_[channel_index];
420 context.size = message_fbb.GetSize();
421 context.data = message_fbb.GetBufferPointer();
422
423 ++queue_index_[channel_index];
424
425 flatbuffers::FlatBufferBuilder fbb;
426 fbb.FinishSizePrefixed(
427 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
428
429 return fbb.Release();
430 }
431
432 flatbuffers::DetachedBuffer MakeTimestampMessage(
433 const aos::monotonic_clock::time_point sender_monotonic_now,
Austin Schuh8bf1e632021-01-02 22:41:04 -0800434 int channel_index, chrono::nanoseconds receiver_monotonic_offset,
435 monotonic_clock::time_point monotonic_timestamp_time =
436 monotonic_clock::min_time) {
437 const monotonic_clock::time_point monotonic_sent_time =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800438 sender_monotonic_now + receiver_monotonic_offset;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800439
440 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800441 fbb.ForceDefaults(true);
442
443 logger::MessageHeader::Builder message_header_builder(fbb);
444
445 message_header_builder.add_channel_index(channel_index);
446
447 message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
448 100);
449 message_header_builder.add_monotonic_sent_time(
450 monotonic_sent_time.time_since_epoch().count());
451 message_header_builder.add_realtime_sent_time(
452 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
453 monotonic_sent_time.time_since_epoch())
454 .time_since_epoch()
455 .count());
456
457 message_header_builder.add_monotonic_remote_time(
458 sender_monotonic_now.time_since_epoch().count());
459 message_header_builder.add_realtime_remote_time(
460 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
461 sender_monotonic_now.time_since_epoch())
462 .time_since_epoch()
463 .count());
464 message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
465 1);
466
467 if (monotonic_timestamp_time != monotonic_clock::min_time) {
468 message_header_builder.add_monotonic_timestamp_time(
469 monotonic_timestamp_time.time_since_epoch().count());
470 }
471
472 fbb.FinishSizePrefixed(message_header_builder.Finish());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800473 LOG(INFO) << aos::FlatbufferToJson(
474 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
475 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
476
477 return fbb.Release();
478 }
479
480 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
481 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800482 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800483
484 const aos::FlatbufferDetachedBuffer<Configuration> config_;
485 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
486 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800487 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
488 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800489 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800490
491 std::vector<uint32_t> queue_index_;
492};
493
494using LogPartsSorterTest = SortingElementTest;
495using LogPartsSorterDeathTest = LogPartsSorterTest;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800496using NodeMergerTest = SortingElementTest;
Austin Schuhd2f96102020-12-01 20:27:29 -0800497using TimestampMapperTest = SortingElementTest;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800498
499// Tests that we can pull messages out of a log sorted in order.
500TEST_F(LogPartsSorterTest, Pull) {
501 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
502 {
503 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
504 writer.QueueSpan(config0_.span());
505 writer.QueueSizedFlatbuffer(
506 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
507 writer.QueueSizedFlatbuffer(
508 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
509 writer.QueueSizedFlatbuffer(
510 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
511 writer.QueueSizedFlatbuffer(
512 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
513 }
514
515 const std::vector<LogFile> parts = SortParts({logfile0_});
516
517 LogPartsSorter parts_sorter(parts[0].parts[0]);
518
519 // Confirm we aren't sorted until any time until the message is popped.
520 // Peeking shouldn't change the sorted until time.
521 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
522
523 std::deque<Message> output;
524
525 ASSERT_TRUE(parts_sorter.Front() != nullptr);
526 output.emplace_back(std::move(*parts_sorter.Front()));
527 parts_sorter.PopFront();
528 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
529
530 ASSERT_TRUE(parts_sorter.Front() != nullptr);
531 output.emplace_back(std::move(*parts_sorter.Front()));
532 parts_sorter.PopFront();
533 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
534
535 ASSERT_TRUE(parts_sorter.Front() != nullptr);
536 output.emplace_back(std::move(*parts_sorter.Front()));
537 parts_sorter.PopFront();
538 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
539
540 ASSERT_TRUE(parts_sorter.Front() != nullptr);
541 output.emplace_back(std::move(*parts_sorter.Front()));
542 parts_sorter.PopFront();
543 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
544
545 ASSERT_TRUE(parts_sorter.Front() == nullptr);
546
547 EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(1000));
548 EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(1000));
549 EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(1901));
550 EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(2000));
551}
552
Austin Schuhb000de62020-12-03 22:00:40 -0800553// Tests that we can pull messages out of a log sorted in order.
554TEST_F(LogPartsSorterTest, WayBeforeStart) {
555 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
556 {
557 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
558 writer.QueueSpan(config0_.span());
559 writer.QueueSizedFlatbuffer(
560 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
561 writer.QueueSizedFlatbuffer(
562 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
563 writer.QueueSizedFlatbuffer(
564 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
565 writer.QueueSizedFlatbuffer(
566 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
567 writer.QueueSizedFlatbuffer(
568 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
569 }
570
571 const std::vector<LogFile> parts = SortParts({logfile0_});
572
573 LogPartsSorter parts_sorter(parts[0].parts[0]);
574
575 // Confirm we aren't sorted until any time until the message is popped.
576 // Peeking shouldn't change the sorted until time.
577 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
578
579 std::deque<Message> output;
580
581 for (monotonic_clock::time_point t :
582 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
583 e + chrono::milliseconds(1900), monotonic_clock::max_time,
584 monotonic_clock::max_time}) {
585 ASSERT_TRUE(parts_sorter.Front() != nullptr);
586 output.emplace_back(std::move(*parts_sorter.Front()));
587 parts_sorter.PopFront();
588 EXPECT_EQ(parts_sorter.sorted_until(), t);
589 }
590
591 ASSERT_TRUE(parts_sorter.Front() == nullptr);
592
593 EXPECT_EQ(output[0].timestamp, e - chrono::milliseconds(1000));
594 EXPECT_EQ(output[1].timestamp, e - chrono::milliseconds(500));
595 EXPECT_EQ(output[2].timestamp, e - chrono::milliseconds(10));
596 EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(1901));
597 EXPECT_EQ(output[4].timestamp, e + chrono::milliseconds(2000));
598}
599
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800600// Tests that messages too far out of order trigger death.
601TEST_F(LogPartsSorterDeathTest, Pull) {
602 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
603 {
604 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
605 writer.QueueSpan(config0_.span());
606 writer.QueueSizedFlatbuffer(
607 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
608 writer.QueueSizedFlatbuffer(
609 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
610 writer.QueueSizedFlatbuffer(
611 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
612 // The following message is too far out of order and will trigger the CHECK.
613 writer.QueueSizedFlatbuffer(
614 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
615 }
616
617 const std::vector<LogFile> parts = SortParts({logfile0_});
618
619 LogPartsSorter parts_sorter(parts[0].parts[0]);
620
621 // Confirm we aren't sorted until any time until the message is popped.
622 // Peeking shouldn't change the sorted until time.
623 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
624 std::deque<Message> output;
625
626 ASSERT_TRUE(parts_sorter.Front() != nullptr);
627 parts_sorter.PopFront();
628 ASSERT_TRUE(parts_sorter.Front() != nullptr);
629 ASSERT_TRUE(parts_sorter.Front() != nullptr);
630 parts_sorter.PopFront();
631
632 EXPECT_DEATH({ parts_sorter.Front(); }, "Max out of order exceeded.");
633}
634
Austin Schuh8f52ed52020-11-30 23:12:39 -0800635// Tests that we can merge data from 2 separate files, including duplicate data.
636TEST_F(NodeMergerTest, TwoFileMerger) {
637 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
638 {
639 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
640 writer0.QueueSpan(config0_.span());
641 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
642 writer1.QueueSpan(config1_.span());
643
644 writer0.QueueSizedFlatbuffer(
645 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
646 writer1.QueueSizedFlatbuffer(
647 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
648
649 writer0.QueueSizedFlatbuffer(
650 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
651 writer1.QueueSizedFlatbuffer(
652 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
653
654 // Make a duplicate!
655 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
656 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
657 writer0.QueueSpan(msg.span());
658 writer1.QueueSpan(msg.span());
659
660 writer1.QueueSizedFlatbuffer(
661 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
662 }
663
664 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Austin Schuhd2f96102020-12-01 20:27:29 -0800665 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800666
Austin Schuhd2f96102020-12-01 20:27:29 -0800667 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800668
669 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
670
671 std::deque<Message> output;
672
673 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
674 ASSERT_TRUE(merger.Front() != nullptr);
675 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
676
677 output.emplace_back(std::move(*merger.Front()));
678 merger.PopFront();
679 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
680
681 ASSERT_TRUE(merger.Front() != nullptr);
682 output.emplace_back(std::move(*merger.Front()));
683 merger.PopFront();
684 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
685
686 ASSERT_TRUE(merger.Front() != nullptr);
687 output.emplace_back(std::move(*merger.Front()));
688 merger.PopFront();
689 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
690
691 ASSERT_TRUE(merger.Front() != nullptr);
692 output.emplace_back(std::move(*merger.Front()));
693 merger.PopFront();
694 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
695
696 ASSERT_TRUE(merger.Front() != nullptr);
697 output.emplace_back(std::move(*merger.Front()));
698 merger.PopFront();
699 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
700
701 ASSERT_TRUE(merger.Front() != nullptr);
702 output.emplace_back(std::move(*merger.Front()));
703 merger.PopFront();
704 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
705
706 ASSERT_TRUE(merger.Front() == nullptr);
707
708 EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(1000));
709 EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(1001));
710 EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(1002));
711 EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(2000));
712 EXPECT_EQ(output[4].timestamp, e + chrono::milliseconds(3000));
713 EXPECT_EQ(output[5].timestamp, e + chrono::milliseconds(3002));
714}
715
Austin Schuh8bf1e632021-01-02 22:41:04 -0800716// Tests that we can merge timestamps with various combinations of
717// monotonic_timestamp_time.
718TEST_F(NodeMergerTest, TwoFileTimestampMerger) {
719 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
720 {
721 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
722 writer0.QueueSpan(config0_.span());
723 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
724 writer1.QueueSpan(config1_.span());
725
726 // Neither has it.
727 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
728 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
729 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
730 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
731 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
732
733 // First only has it.
734 MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
735 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
736 e + chrono::milliseconds(1001), 0, chrono::seconds(100),
737 e + chrono::nanoseconds(971)));
738 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
739 e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
740
741 // Second only has it.
742 MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
743 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
744 e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
745 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
746 e + chrono::milliseconds(1002), 0, chrono::seconds(100),
747 e + chrono::nanoseconds(972)));
748
749 // Both have it.
750 MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
751 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
752 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
753 e + chrono::nanoseconds(973)));
754 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
755 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
756 e + chrono::nanoseconds(973)));
757 }
758
759 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
760 ASSERT_EQ(parts.size(), 1u);
761
762 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
763
764 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
765
766 std::deque<Message> output;
767
768 for (int i = 0; i < 4; ++i) {
769 ASSERT_TRUE(merger.Front() != nullptr);
770 output.emplace_back(std::move(*merger.Front()));
771 merger.PopFront();
772 }
773 ASSERT_TRUE(merger.Front() == nullptr);
774
775 EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(101000));
776 EXPECT_FALSE(output[0].data.message().has_monotonic_timestamp_time());
777 EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(101001));
778 EXPECT_TRUE(output[1].data.message().has_monotonic_timestamp_time());
779 EXPECT_EQ(output[1].data.message().monotonic_timestamp_time(), 971);
780 EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(101002));
781 EXPECT_TRUE(output[2].data.message().has_monotonic_timestamp_time());
782 EXPECT_EQ(output[2].data.message().monotonic_timestamp_time(), 972);
783 EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(101003));
784 EXPECT_TRUE(output[3].data.message().has_monotonic_timestamp_time());
785 EXPECT_EQ(output[3].data.message().monotonic_timestamp_time(), 973);
786}
787
Austin Schuhd2f96102020-12-01 20:27:29 -0800788// Tests that we can match timestamps on delivered messages.
789TEST_F(TimestampMapperTest, ReadNode0First) {
790 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
791 {
792 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
793 writer0.QueueSpan(config0_.span());
794 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
795 writer1.QueueSpan(config2_.span());
796
797 writer0.QueueSizedFlatbuffer(
798 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
799 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
800 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
801
802 writer0.QueueSizedFlatbuffer(
803 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
804 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
805 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
806
807 writer0.QueueSizedFlatbuffer(
808 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
809 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
810 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
811 }
812
813 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
814
815 ASSERT_EQ(parts[0].logger_node, "pi1");
816 ASSERT_EQ(parts[1].logger_node, "pi2");
817
Austin Schuh79b30942021-01-24 22:32:21 -0800818 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800819 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -0800820 mapper0.set_timestamp_callback(
821 [&](TimestampedMessage *) { ++mapper0_count; });
822 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800823 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -0800824 mapper1.set_timestamp_callback(
825 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -0800826
827 mapper0.AddPeer(&mapper1);
828 mapper1.AddPeer(&mapper0);
829
830 {
831 std::deque<TimestampedMessage> output0;
832
Austin Schuh79b30942021-01-24 22:32:21 -0800833 EXPECT_EQ(mapper0_count, 0u);
834 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800835 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800836 EXPECT_EQ(mapper0_count, 1u);
837 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800838 output0.emplace_back(std::move(*mapper0.Front()));
839 mapper0.PopFront();
840 EXPECT_EQ(mapper0.sorted_until(), e + chrono::milliseconds(1900));
Austin Schuh79b30942021-01-24 22:32:21 -0800841 EXPECT_EQ(mapper0_count, 1u);
842 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800843
844 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800845 EXPECT_EQ(mapper0_count, 2u);
846 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800847 output0.emplace_back(std::move(*mapper0.Front()));
848 mapper0.PopFront();
849 EXPECT_EQ(mapper0.sorted_until(), e + chrono::milliseconds(2900));
850
851 ASSERT_TRUE(mapper0.Front() != nullptr);
852 output0.emplace_back(std::move(*mapper0.Front()));
853 mapper0.PopFront();
854 EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
855
Austin Schuh79b30942021-01-24 22:32:21 -0800856 EXPECT_EQ(mapper0_count, 3u);
857 EXPECT_EQ(mapper1_count, 0u);
858
Austin Schuhd2f96102020-12-01 20:27:29 -0800859 ASSERT_TRUE(mapper0.Front() == nullptr);
860
861 EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
862 EXPECT_TRUE(output0[0].data.Verify());
863 EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
864 EXPECT_TRUE(output0[1].data.Verify());
865 EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
866 EXPECT_TRUE(output0[2].data.Verify());
867 }
868
869 {
870 SCOPED_TRACE("Trying node1 now");
871 std::deque<TimestampedMessage> output1;
872
Austin Schuh79b30942021-01-24 22:32:21 -0800873 EXPECT_EQ(mapper0_count, 3u);
874 EXPECT_EQ(mapper1_count, 0u);
875
Austin Schuhd2f96102020-12-01 20:27:29 -0800876 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800877 EXPECT_EQ(mapper0_count, 3u);
878 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800879 output1.emplace_back(std::move(*mapper1.Front()));
880 mapper1.PopFront();
881 EXPECT_EQ(mapper1.sorted_until(),
882 e + chrono::seconds(100) + chrono::milliseconds(1900));
Austin Schuh79b30942021-01-24 22:32:21 -0800883 EXPECT_EQ(mapper0_count, 3u);
884 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800885
886 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800887 EXPECT_EQ(mapper0_count, 3u);
888 EXPECT_EQ(mapper1_count, 2u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800889 output1.emplace_back(std::move(*mapper1.Front()));
890 mapper1.PopFront();
891 EXPECT_EQ(mapper1.sorted_until(),
892 e + chrono::seconds(100) + chrono::milliseconds(2900));
893
894 ASSERT_TRUE(mapper1.Front() != nullptr);
895 output1.emplace_back(std::move(*mapper1.Front()));
896 mapper1.PopFront();
897 EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
898
Austin Schuh79b30942021-01-24 22:32:21 -0800899 EXPECT_EQ(mapper0_count, 3u);
900 EXPECT_EQ(mapper1_count, 3u);
901
Austin Schuhd2f96102020-12-01 20:27:29 -0800902 ASSERT_TRUE(mapper1.Front() == nullptr);
903
Austin Schuh79b30942021-01-24 22:32:21 -0800904 EXPECT_EQ(mapper0_count, 3u);
905 EXPECT_EQ(mapper1_count, 3u);
906
Austin Schuhd2f96102020-12-01 20:27:29 -0800907 EXPECT_EQ(output1[0].monotonic_event_time,
908 e + chrono::seconds(100) + chrono::milliseconds(1000));
909 EXPECT_TRUE(output1[0].data.Verify());
910 EXPECT_EQ(output1[1].monotonic_event_time,
911 e + chrono::seconds(100) + chrono::milliseconds(2000));
912 EXPECT_TRUE(output1[1].data.Verify());
913 EXPECT_EQ(output1[2].monotonic_event_time,
914 e + chrono::seconds(100) + chrono::milliseconds(3000));
915 EXPECT_TRUE(output1[2].data.Verify());
916 }
917}
918
Austin Schuh8bf1e632021-01-02 22:41:04 -0800919// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
920// returned.
921TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
922 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
923 {
924 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
925 writer0.QueueSpan(config0_.span());
926 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
927 writer1.QueueSpan(config4_.span());
928
929 writer0.QueueSizedFlatbuffer(
930 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
931 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
932 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
933 e + chrono::nanoseconds(971)));
934
935 writer0.QueueSizedFlatbuffer(
936 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
937 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
938 e + chrono::milliseconds(2000), 0, chrono::seconds(100),
939 e + chrono::nanoseconds(5458)));
940
941 writer0.QueueSizedFlatbuffer(
942 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
943 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
944 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
945 }
946
947 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
948
949 for (const auto &p : parts) {
950 LOG(INFO) << p;
951 }
952
953 ASSERT_EQ(parts.size(), 1u);
954
Austin Schuh79b30942021-01-24 22:32:21 -0800955 size_t mapper0_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800956 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -0800957 mapper0.set_timestamp_callback(
958 [&](TimestampedMessage *) { ++mapper0_count; });
959 size_t mapper1_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800960 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -0800961 mapper1.set_timestamp_callback(
962 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh8bf1e632021-01-02 22:41:04 -0800963
964 mapper0.AddPeer(&mapper1);
965 mapper1.AddPeer(&mapper0);
966
967 {
968 std::deque<TimestampedMessage> output0;
969
970 for (int i = 0; i < 3; ++i) {
971 ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
972 output0.emplace_back(std::move(*mapper0.Front()));
973 mapper0.PopFront();
974 }
975
976 ASSERT_TRUE(mapper0.Front() == nullptr);
977
978 EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
979 EXPECT_EQ(output0[0].monotonic_timestamp_time, monotonic_clock::min_time);
980 EXPECT_TRUE(output0[0].data.Verify());
981 EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
982 EXPECT_EQ(output0[1].monotonic_timestamp_time, monotonic_clock::min_time);
983 EXPECT_TRUE(output0[1].data.Verify());
984 EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
985 EXPECT_EQ(output0[2].monotonic_timestamp_time, monotonic_clock::min_time);
986 EXPECT_TRUE(output0[2].data.Verify());
987 }
988
989 {
990 SCOPED_TRACE("Trying node1 now");
991 std::deque<TimestampedMessage> output1;
992
993 for (int i = 0; i < 3; ++i) {
994 ASSERT_TRUE(mapper1.Front() != nullptr);
995 output1.emplace_back(std::move(*mapper1.Front()));
996 mapper1.PopFront();
997 }
998
999 ASSERT_TRUE(mapper1.Front() == nullptr);
1000
1001 EXPECT_EQ(output1[0].monotonic_event_time,
1002 e + chrono::seconds(100) + chrono::milliseconds(1000));
1003 EXPECT_EQ(output1[0].monotonic_timestamp_time,
1004 e + chrono::nanoseconds(971));
1005 EXPECT_TRUE(output1[0].data.Verify());
1006 EXPECT_EQ(output1[1].monotonic_event_time,
1007 e + chrono::seconds(100) + chrono::milliseconds(2000));
1008 EXPECT_EQ(output1[1].monotonic_timestamp_time,
1009 e + chrono::nanoseconds(5458));
1010 EXPECT_TRUE(output1[1].data.Verify());
1011 EXPECT_EQ(output1[2].monotonic_event_time,
1012 e + chrono::seconds(100) + chrono::milliseconds(3000));
1013 EXPECT_EQ(output1[2].monotonic_timestamp_time, monotonic_clock::min_time);
1014 EXPECT_TRUE(output1[2].data.Verify());
1015 }
Austin Schuh79b30942021-01-24 22:32:21 -08001016
1017 EXPECT_EQ(mapper0_count, 3u);
1018 EXPECT_EQ(mapper1_count, 3u);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001019}
1020
Austin Schuhd2f96102020-12-01 20:27:29 -08001021// Tests that we can match timestamps on delivered messages. By doing this in
1022// the reverse order, the second node needs to queue data up from the first node
1023// to find the matching timestamp.
1024TEST_F(TimestampMapperTest, ReadNode1First) {
1025 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1026 {
1027 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1028 writer0.QueueSpan(config0_.span());
1029 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1030 writer1.QueueSpan(config2_.span());
1031
1032 writer0.QueueSizedFlatbuffer(
1033 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1034 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1035 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1036
1037 writer0.QueueSizedFlatbuffer(
1038 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1039 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1040 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1041
1042 writer0.QueueSizedFlatbuffer(
1043 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1044 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1045 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1046 }
1047
1048 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1049
1050 ASSERT_EQ(parts[0].logger_node, "pi1");
1051 ASSERT_EQ(parts[1].logger_node, "pi2");
1052
Austin Schuh79b30942021-01-24 22:32:21 -08001053 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001054 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001055 mapper0.set_timestamp_callback(
1056 [&](TimestampedMessage *) { ++mapper0_count; });
1057 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001058 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001059 mapper1.set_timestamp_callback(
1060 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001061
1062 mapper0.AddPeer(&mapper1);
1063 mapper1.AddPeer(&mapper0);
1064
1065 {
1066 SCOPED_TRACE("Trying node1 now");
1067 std::deque<TimestampedMessage> output1;
1068
1069 ASSERT_TRUE(mapper1.Front() != nullptr);
1070 output1.emplace_back(std::move(*mapper1.Front()));
1071 mapper1.PopFront();
1072 EXPECT_EQ(mapper1.sorted_until(),
1073 e + chrono::seconds(100) + chrono::milliseconds(1900));
1074
1075 ASSERT_TRUE(mapper1.Front() != nullptr);
1076 output1.emplace_back(std::move(*mapper1.Front()));
1077 mapper1.PopFront();
1078 EXPECT_EQ(mapper1.sorted_until(),
1079 e + chrono::seconds(100) + chrono::milliseconds(2900));
1080
1081 ASSERT_TRUE(mapper1.Front() != nullptr);
1082 output1.emplace_back(std::move(*mapper1.Front()));
1083 mapper1.PopFront();
1084 EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
1085
1086 ASSERT_TRUE(mapper1.Front() == nullptr);
1087
1088 EXPECT_EQ(output1[0].monotonic_event_time,
1089 e + chrono::seconds(100) + chrono::milliseconds(1000));
1090 EXPECT_TRUE(output1[0].data.Verify());
1091 EXPECT_EQ(output1[1].monotonic_event_time,
1092 e + chrono::seconds(100) + chrono::milliseconds(2000));
1093 EXPECT_TRUE(output1[1].data.Verify());
1094 EXPECT_EQ(output1[2].monotonic_event_time,
1095 e + chrono::seconds(100) + chrono::milliseconds(3000));
1096 EXPECT_TRUE(output1[2].data.Verify());
1097 }
1098
1099 {
1100 std::deque<TimestampedMessage> output0;
1101
1102 ASSERT_TRUE(mapper0.Front() != nullptr);
1103 output0.emplace_back(std::move(*mapper0.Front()));
1104 mapper0.PopFront();
1105 EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
1106
1107 ASSERT_TRUE(mapper0.Front() != nullptr);
1108 output0.emplace_back(std::move(*mapper0.Front()));
1109 mapper0.PopFront();
1110 EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
1111
1112 ASSERT_TRUE(mapper0.Front() != nullptr);
1113 output0.emplace_back(std::move(*mapper0.Front()));
1114 mapper0.PopFront();
1115 EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
1116
1117 ASSERT_TRUE(mapper0.Front() == nullptr);
1118
1119 EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
1120 EXPECT_TRUE(output0[0].data.Verify());
1121 EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
1122 EXPECT_TRUE(output0[1].data.Verify());
1123 EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
1124 EXPECT_TRUE(output0[2].data.Verify());
1125 }
Austin Schuh79b30942021-01-24 22:32:21 -08001126
1127 EXPECT_EQ(mapper0_count, 3u);
1128 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001129}
1130
1131// Tests that we return just the timestamps if we couldn't find the data and the
1132// missing data was at the beginning of the file.
1133TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
1134 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1135 {
1136 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1137 writer0.QueueSpan(config0_.span());
1138 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1139 writer1.QueueSpan(config2_.span());
1140
1141 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
1142 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1143 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1144
1145 writer0.QueueSizedFlatbuffer(
1146 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1147 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1148 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1149
1150 writer0.QueueSizedFlatbuffer(
1151 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1152 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1153 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1154 }
1155
1156 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1157
1158 ASSERT_EQ(parts[0].logger_node, "pi1");
1159 ASSERT_EQ(parts[1].logger_node, "pi2");
1160
Austin Schuh79b30942021-01-24 22:32:21 -08001161 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001162 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001163 mapper0.set_timestamp_callback(
1164 [&](TimestampedMessage *) { ++mapper0_count; });
1165 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001166 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001167 mapper1.set_timestamp_callback(
1168 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001169
1170 mapper0.AddPeer(&mapper1);
1171 mapper1.AddPeer(&mapper0);
1172
1173 {
1174 SCOPED_TRACE("Trying node1 now");
1175 std::deque<TimestampedMessage> output1;
1176
1177 ASSERT_TRUE(mapper1.Front() != nullptr);
1178 output1.emplace_back(std::move(*mapper1.Front()));
1179 mapper1.PopFront();
1180 EXPECT_EQ(mapper1.sorted_until(),
1181 e + chrono::seconds(100) + chrono::milliseconds(1900));
1182
1183 ASSERT_TRUE(mapper1.Front() != nullptr);
1184 output1.emplace_back(std::move(*mapper1.Front()));
1185 mapper1.PopFront();
1186 EXPECT_EQ(mapper1.sorted_until(),
1187 e + chrono::seconds(100) + chrono::milliseconds(2900));
1188
1189 ASSERT_TRUE(mapper1.Front() != nullptr);
1190 output1.emplace_back(std::move(*mapper1.Front()));
1191 mapper1.PopFront();
1192 EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
1193
1194 ASSERT_TRUE(mapper1.Front() == nullptr);
1195
1196 EXPECT_EQ(output1[0].monotonic_event_time,
1197 e + chrono::seconds(100) + chrono::milliseconds(1000));
1198 EXPECT_FALSE(output1[0].data.Verify());
1199 EXPECT_EQ(output1[1].monotonic_event_time,
1200 e + chrono::seconds(100) + chrono::milliseconds(2000));
1201 EXPECT_TRUE(output1[1].data.Verify());
1202 EXPECT_EQ(output1[2].monotonic_event_time,
1203 e + chrono::seconds(100) + chrono::milliseconds(3000));
1204 EXPECT_TRUE(output1[2].data.Verify());
1205 }
Austin Schuh79b30942021-01-24 22:32:21 -08001206
1207 EXPECT_EQ(mapper0_count, 0u);
1208 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001209}
1210
1211// Tests that we return just the timestamps if we couldn't find the data and the
1212// missing data was at the end of the file.
1213TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
1214 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1215 {
1216 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1217 writer0.QueueSpan(config0_.span());
1218 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1219 writer1.QueueSpan(config2_.span());
1220
1221 writer0.QueueSizedFlatbuffer(
1222 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1223 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1224 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1225
1226 writer0.QueueSizedFlatbuffer(
1227 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1228 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1229 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1230
1231 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
1232 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1233 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1234 }
1235
1236 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1237
1238 ASSERT_EQ(parts[0].logger_node, "pi1");
1239 ASSERT_EQ(parts[1].logger_node, "pi2");
1240
Austin Schuh79b30942021-01-24 22:32:21 -08001241 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001242 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001243 mapper0.set_timestamp_callback(
1244 [&](TimestampedMessage *) { ++mapper0_count; });
1245 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001246 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001247 mapper1.set_timestamp_callback(
1248 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001249
1250 mapper0.AddPeer(&mapper1);
1251 mapper1.AddPeer(&mapper0);
1252
1253 {
1254 SCOPED_TRACE("Trying node1 now");
1255 std::deque<TimestampedMessage> output1;
1256
1257 ASSERT_TRUE(mapper1.Front() != nullptr);
1258 output1.emplace_back(std::move(*mapper1.Front()));
1259 mapper1.PopFront();
1260 EXPECT_EQ(mapper1.sorted_until(),
1261 e + chrono::seconds(100) + chrono::milliseconds(1900));
1262
1263 ASSERT_TRUE(mapper1.Front() != nullptr);
1264 output1.emplace_back(std::move(*mapper1.Front()));
1265 mapper1.PopFront();
1266 EXPECT_EQ(mapper1.sorted_until(),
1267 e + chrono::seconds(100) + chrono::milliseconds(2900));
1268
1269 ASSERT_TRUE(mapper1.Front() != nullptr);
1270 output1.emplace_back(std::move(*mapper1.Front()));
1271 mapper1.PopFront();
1272 EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
1273
1274 ASSERT_TRUE(mapper1.Front() == nullptr);
1275
1276 EXPECT_EQ(output1[0].monotonic_event_time,
1277 e + chrono::seconds(100) + chrono::milliseconds(1000));
1278 EXPECT_TRUE(output1[0].data.Verify());
1279 EXPECT_EQ(output1[1].monotonic_event_time,
1280 e + chrono::seconds(100) + chrono::milliseconds(2000));
1281 EXPECT_TRUE(output1[1].data.Verify());
1282 EXPECT_EQ(output1[2].monotonic_event_time,
1283 e + chrono::seconds(100) + chrono::milliseconds(3000));
1284 EXPECT_FALSE(output1[2].data.Verify());
1285 }
Austin Schuh79b30942021-01-24 22:32:21 -08001286
1287 EXPECT_EQ(mapper0_count, 0u);
1288 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001289}
1290
Austin Schuh993ccb52020-12-12 15:59:32 -08001291// Tests that we handle a message which failed to forward or be logged.
1292TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1293 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1294 {
1295 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1296 writer0.QueueSpan(config0_.span());
1297 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1298 writer1.QueueSpan(config2_.span());
1299
1300 writer0.QueueSizedFlatbuffer(
1301 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1302 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1303 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1304
1305 // Create both the timestamp and message, but don't log them, simulating a
1306 // forwarding drop.
1307 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1308 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1309 chrono::seconds(100));
1310
1311 writer0.QueueSizedFlatbuffer(
1312 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1313 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1314 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1315 }
1316
1317 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1318
1319 ASSERT_EQ(parts[0].logger_node, "pi1");
1320 ASSERT_EQ(parts[1].logger_node, "pi2");
1321
Austin Schuh79b30942021-01-24 22:32:21 -08001322 size_t mapper0_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001323 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001324 mapper0.set_timestamp_callback(
1325 [&](TimestampedMessage *) { ++mapper0_count; });
1326 size_t mapper1_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001327 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001328 mapper1.set_timestamp_callback(
1329 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh993ccb52020-12-12 15:59:32 -08001330
1331 mapper0.AddPeer(&mapper1);
1332 mapper1.AddPeer(&mapper0);
1333
1334 {
1335 std::deque<TimestampedMessage> output1;
1336
1337 ASSERT_TRUE(mapper1.Front() != nullptr);
1338 output1.emplace_back(std::move(*mapper1.Front()));
1339 mapper1.PopFront();
1340
1341 ASSERT_TRUE(mapper1.Front() != nullptr);
1342 output1.emplace_back(std::move(*mapper1.Front()));
1343
1344 ASSERT_FALSE(mapper1.Front() == nullptr);
1345
1346 EXPECT_EQ(output1[0].monotonic_event_time,
1347 e + chrono::seconds(100) + chrono::milliseconds(1000));
1348 EXPECT_TRUE(output1[0].data.Verify());
1349 EXPECT_EQ(output1[1].monotonic_event_time,
1350 e + chrono::seconds(100) + chrono::milliseconds(3000));
1351 EXPECT_TRUE(output1[1].data.Verify());
1352 }
Austin Schuh79b30942021-01-24 22:32:21 -08001353
1354 EXPECT_EQ(mapper0_count, 0u);
1355 EXPECT_EQ(mapper1_count, 2u);
Austin Schuh993ccb52020-12-12 15:59:32 -08001356}
1357
Austin Schuhd2f96102020-12-01 20:27:29 -08001358// Tests that we properly sort log files with duplicate timestamps.
1359TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1360 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1361 {
1362 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1363 writer0.QueueSpan(config0_.span());
1364 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1365 writer1.QueueSpan(config2_.span());
1366
1367 writer0.QueueSizedFlatbuffer(
1368 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1369 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1370 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1371
1372 writer0.QueueSizedFlatbuffer(
1373 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1374 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1375 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1376
1377 writer0.QueueSizedFlatbuffer(
1378 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
1379 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1380 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1381
1382 writer0.QueueSizedFlatbuffer(
1383 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
1384 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1385 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1386 }
1387
1388 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1389
1390 ASSERT_EQ(parts[0].logger_node, "pi1");
1391 ASSERT_EQ(parts[1].logger_node, "pi2");
1392
Austin Schuh79b30942021-01-24 22:32:21 -08001393 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001394 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001395 mapper0.set_timestamp_callback(
1396 [&](TimestampedMessage *) { ++mapper0_count; });
1397 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001398 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001399 mapper1.set_timestamp_callback(
1400 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001401
1402 mapper0.AddPeer(&mapper1);
1403 mapper1.AddPeer(&mapper0);
1404
1405 {
1406 SCOPED_TRACE("Trying node1 now");
1407 std::deque<TimestampedMessage> output1;
1408
1409 for (int i = 0; i < 4; ++i) {
1410 ASSERT_TRUE(mapper1.Front() != nullptr);
1411 output1.emplace_back(std::move(*mapper1.Front()));
1412 mapper1.PopFront();
1413 }
1414 ASSERT_TRUE(mapper1.Front() == nullptr);
1415
1416 EXPECT_EQ(output1[0].monotonic_event_time,
1417 e + chrono::seconds(100) + chrono::milliseconds(1000));
1418 EXPECT_TRUE(output1[0].data.Verify());
1419 EXPECT_EQ(output1[1].monotonic_event_time,
1420 e + chrono::seconds(100) + chrono::milliseconds(2000));
1421 EXPECT_TRUE(output1[1].data.Verify());
1422 EXPECT_EQ(output1[2].monotonic_event_time,
1423 e + chrono::seconds(100) + chrono::milliseconds(2000));
1424 EXPECT_TRUE(output1[2].data.Verify());
1425 EXPECT_EQ(output1[3].monotonic_event_time,
1426 e + chrono::seconds(100) + chrono::milliseconds(3000));
1427 EXPECT_TRUE(output1[3].data.Verify());
1428 }
Austin Schuh79b30942021-01-24 22:32:21 -08001429
1430 EXPECT_EQ(mapper0_count, 0u);
1431 EXPECT_EQ(mapper1_count, 4u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001432}
1433
1434// Tests that we properly sort log files with duplicate timestamps.
1435TEST_F(TimestampMapperTest, StartTime) {
1436 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1437 {
1438 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1439 writer0.QueueSpan(config0_.span());
1440 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1441 writer1.QueueSpan(config1_.span());
1442 DetachedBufferWriter writer2(logfile2_, std::make_unique<DummyEncoder>());
1443 writer2.QueueSpan(config3_.span());
1444 }
1445
1446 const std::vector<LogFile> parts =
1447 SortParts({logfile0_, logfile1_, logfile2_});
1448
Austin Schuh79b30942021-01-24 22:32:21 -08001449 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001450 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001451 mapper0.set_timestamp_callback(
1452 [&](TimestampedMessage *) { ++mapper0_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001453
1454 EXPECT_EQ(mapper0.monotonic_start_time(), e + chrono::milliseconds(1));
1455 EXPECT_EQ(mapper0.realtime_start_time(),
1456 realtime_clock::time_point(chrono::seconds(1000)));
Austin Schuh79b30942021-01-24 22:32:21 -08001457 EXPECT_EQ(mapper0_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001458}
1459
Austin Schuhfecf1d82020-12-19 16:57:28 -08001460// Tests that when a peer isn't registered, we treat that as if there was no
1461// data available.
1462TEST_F(TimestampMapperTest, NoPeer) {
1463 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1464 {
1465 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1466 writer0.QueueSpan(config0_.span());
1467 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1468 writer1.QueueSpan(config2_.span());
1469
1470 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
1471 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1472 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1473
1474 writer0.QueueSizedFlatbuffer(
1475 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1476 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1477 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1478
1479 writer0.QueueSizedFlatbuffer(
1480 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1481 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1482 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1483 }
1484
1485 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1486
1487 ASSERT_EQ(parts[0].logger_node, "pi1");
1488 ASSERT_EQ(parts[1].logger_node, "pi2");
1489
Austin Schuh79b30942021-01-24 22:32:21 -08001490 size_t mapper1_count = 0;
Austin Schuhfecf1d82020-12-19 16:57:28 -08001491 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001492 mapper1.set_timestamp_callback(
1493 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhfecf1d82020-12-19 16:57:28 -08001494
1495 {
1496 std::deque<TimestampedMessage> output1;
1497
1498 ASSERT_TRUE(mapper1.Front() != nullptr);
1499 output1.emplace_back(std::move(*mapper1.Front()));
1500 mapper1.PopFront();
1501 ASSERT_TRUE(mapper1.Front() != nullptr);
1502 output1.emplace_back(std::move(*mapper1.Front()));
1503 mapper1.PopFront();
1504 ASSERT_TRUE(mapper1.Front() != nullptr);
1505 output1.emplace_back(std::move(*mapper1.Front()));
1506 mapper1.PopFront();
1507 ASSERT_TRUE(mapper1.Front() == nullptr);
1508
1509 EXPECT_EQ(output1[0].monotonic_event_time,
1510 e + chrono::seconds(100) + chrono::milliseconds(1000));
1511 EXPECT_FALSE(output1[0].data.Verify());
1512 EXPECT_EQ(output1[1].monotonic_event_time,
1513 e + chrono::seconds(100) + chrono::milliseconds(2000));
1514 EXPECT_FALSE(output1[1].data.Verify());
1515 EXPECT_EQ(output1[2].monotonic_event_time,
1516 e + chrono::seconds(100) + chrono::milliseconds(3000));
1517 EXPECT_FALSE(output1[2].data.Verify());
1518 }
Austin Schuh79b30942021-01-24 22:32:21 -08001519 EXPECT_EQ(mapper1_count, 3u);
1520}
1521
1522// Tests that we can queue messages and call the timestamp callback for both
1523// nodes.
1524TEST_F(TimestampMapperTest, QueueUntilNode0) {
1525 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1526 {
1527 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1528 writer0.QueueSpan(config0_.span());
1529 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1530 writer1.QueueSpan(config2_.span());
1531
1532 writer0.QueueSizedFlatbuffer(
1533 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1534 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1535 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1536
1537 writer0.QueueSizedFlatbuffer(
1538 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
1539 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1540 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1541
1542 writer0.QueueSizedFlatbuffer(
1543 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
1544 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1545 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1546
1547 writer0.QueueSizedFlatbuffer(
1548 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
1549 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1550 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1551 }
1552
1553 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1554
1555 ASSERT_EQ(parts[0].logger_node, "pi1");
1556 ASSERT_EQ(parts[1].logger_node, "pi2");
1557
1558 size_t mapper0_count = 0;
1559 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1560 mapper0.set_timestamp_callback(
1561 [&](TimestampedMessage *) { ++mapper0_count; });
1562 size_t mapper1_count = 0;
1563 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1564 mapper1.set_timestamp_callback(
1565 [&](TimestampedMessage *) { ++mapper1_count; });
1566
1567 mapper0.AddPeer(&mapper1);
1568 mapper1.AddPeer(&mapper0);
1569
1570 {
1571 std::deque<TimestampedMessage> output0;
1572
1573 EXPECT_EQ(mapper0_count, 0u);
1574 EXPECT_EQ(mapper1_count, 0u);
1575 mapper0.QueueUntil(e + chrono::milliseconds(1000));
1576 EXPECT_EQ(mapper0_count, 3u);
1577 EXPECT_EQ(mapper1_count, 0u);
1578
1579 ASSERT_TRUE(mapper0.Front() != nullptr);
1580 EXPECT_EQ(mapper0_count, 3u);
1581 EXPECT_EQ(mapper1_count, 0u);
1582
1583 mapper0.QueueUntil(e + chrono::milliseconds(1500));
1584 EXPECT_EQ(mapper0_count, 3u);
1585 EXPECT_EQ(mapper1_count, 0u);
1586
1587 mapper0.QueueUntil(e + chrono::milliseconds(2500));
1588 EXPECT_EQ(mapper0_count, 4u);
1589 EXPECT_EQ(mapper1_count, 0u);
1590
1591 output0.emplace_back(std::move(*mapper0.Front()));
1592 mapper0.PopFront();
1593 output0.emplace_back(std::move(*mapper0.Front()));
1594 mapper0.PopFront();
1595 output0.emplace_back(std::move(*mapper0.Front()));
1596 mapper0.PopFront();
1597 output0.emplace_back(std::move(*mapper0.Front()));
1598 mapper0.PopFront();
1599
1600 EXPECT_EQ(mapper0_count, 4u);
1601 EXPECT_EQ(mapper1_count, 0u);
1602
1603 ASSERT_TRUE(mapper0.Front() == nullptr);
1604
1605 EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
1606 EXPECT_TRUE(output0[0].data.Verify());
1607 EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(1000));
1608 EXPECT_TRUE(output0[1].data.Verify());
1609 EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(2000));
1610 EXPECT_TRUE(output0[2].data.Verify());
1611 EXPECT_EQ(output0[3].monotonic_event_time, e + chrono::milliseconds(3000));
1612 EXPECT_TRUE(output0[3].data.Verify());
1613 }
1614
1615 {
1616 SCOPED_TRACE("Trying node1 now");
1617 std::deque<TimestampedMessage> output1;
1618
1619 EXPECT_EQ(mapper0_count, 4u);
1620 EXPECT_EQ(mapper1_count, 0u);
1621 mapper1.QueueUntil(e + chrono::seconds(100) + chrono::milliseconds(1000));
1622 EXPECT_EQ(mapper0_count, 4u);
1623 EXPECT_EQ(mapper1_count, 3u);
1624
1625 ASSERT_TRUE(mapper1.Front() != nullptr);
1626 EXPECT_EQ(mapper0_count, 4u);
1627 EXPECT_EQ(mapper1_count, 3u);
1628
1629 mapper1.QueueUntil(e + chrono::seconds(100) + chrono::milliseconds(1500));
1630 EXPECT_EQ(mapper0_count, 4u);
1631 EXPECT_EQ(mapper1_count, 3u);
1632
1633 mapper1.QueueUntil(e + chrono::seconds(100) + chrono::milliseconds(2500));
1634 EXPECT_EQ(mapper0_count, 4u);
1635 EXPECT_EQ(mapper1_count, 4u);
1636
1637 ASSERT_TRUE(mapper1.Front() != nullptr);
1638 EXPECT_EQ(mapper0_count, 4u);
1639 EXPECT_EQ(mapper1_count, 4u);
1640
1641 output1.emplace_back(std::move(*mapper1.Front()));
1642 mapper1.PopFront();
1643 ASSERT_TRUE(mapper1.Front() != nullptr);
1644 output1.emplace_back(std::move(*mapper1.Front()));
1645 mapper1.PopFront();
1646 ASSERT_TRUE(mapper1.Front() != nullptr);
1647 output1.emplace_back(std::move(*mapper1.Front()));
1648 mapper1.PopFront();
1649 ASSERT_TRUE(mapper1.Front() != nullptr);
1650 output1.emplace_back(std::move(*mapper1.Front()));
1651 mapper1.PopFront();
1652
1653 EXPECT_EQ(mapper0_count, 4u);
1654 EXPECT_EQ(mapper1_count, 4u);
1655
1656 ASSERT_TRUE(mapper1.Front() == nullptr);
1657
1658 EXPECT_EQ(mapper0_count, 4u);
1659 EXPECT_EQ(mapper1_count, 4u);
1660
1661 EXPECT_EQ(output1[0].monotonic_event_time,
1662 e + chrono::seconds(100) + chrono::milliseconds(1000));
1663 EXPECT_TRUE(output1[0].data.Verify());
1664 EXPECT_EQ(output1[1].monotonic_event_time,
1665 e + chrono::seconds(100) + chrono::milliseconds(1000));
1666 EXPECT_TRUE(output1[1].data.Verify());
1667 EXPECT_EQ(output1[2].monotonic_event_time,
1668 e + chrono::seconds(100) + chrono::milliseconds(2000));
1669 EXPECT_TRUE(output1[2].data.Verify());
1670 EXPECT_EQ(output1[3].monotonic_event_time,
1671 e + chrono::seconds(100) + chrono::milliseconds(3000));
1672 EXPECT_TRUE(output1[3].data.Verify());
1673 }
Austin Schuhfecf1d82020-12-19 16:57:28 -08001674}
1675
Austin Schuhc243b422020-10-11 15:35:08 -07001676} // namespace testing
1677} // namespace logger
1678} // namespace aos