blob: 7795e667bba166aa401e52998f78c4f924edadc0 [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
4#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07005
Austin Schuhc41603c2020-10-11 16:17:37 -07006#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -07007#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -08008#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -07009#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070010#include "aos/json_to_flatbuffer.h"
11#include "aos/testing/tmpdir.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070012#include "gtest/gtest.h"
Austin Schuhc243b422020-10-11 15:35:08 -070013
14namespace aos {
15namespace logger {
16namespace testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070017namespace chrono = std::chrono;
Austin Schuhc243b422020-10-11 15:35:08 -070018
Austin Schuhe243aaf2020-10-11 15:46:02 -070019// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070020template <typename T>
21SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
22 const std::string_view data) {
23 flatbuffers::FlatBufferBuilder fbb;
24 fbb.ForceDefaults(true);
25 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
26 return fbb.Release();
27}
28
Austin Schuhe243aaf2020-10-11 15:46:02 -070029// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070030TEST(SpanReaderTest, ReadWrite) {
31 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
32 unlink(logfile.c_str());
33
34 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080035 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070036 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080037 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070038
39 {
40 DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -080041 writer.QueueSpan(m1.span());
42 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070043 }
44
45 SpanReader reader(logfile);
46
47 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -080048 EXPECT_EQ(reader.ReadMessage(), m1.span());
49 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070050 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
51}
52
Austin Schuhe243aaf2020-10-11 15:46:02 -070053// Tests that we can actually parse the resulting messages at a basic level
54// through MessageReader.
55TEST(MessageReaderTest, ReadWrite) {
56 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
57 unlink(logfile.c_str());
58
59 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
60 JsonToSizedFlatbuffer<LogFileHeader>(
61 R"({ "max_out_of_order_duration": 100000000 })");
62 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
63 JsonToSizedFlatbuffer<MessageHeader>(
64 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
65 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
66 JsonToSizedFlatbuffer<MessageHeader>(
67 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
68
69 {
70 DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -080071 writer.QueueSpan(config.span());
72 writer.QueueSpan(m1.span());
73 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -070074 }
75
76 MessageReader reader(logfile);
77
78 EXPECT_EQ(reader.filename(), logfile);
79
80 EXPECT_EQ(
81 reader.max_out_of_order_duration(),
82 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
83 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
84 EXPECT_TRUE(reader.ReadMessage());
85 EXPECT_EQ(reader.newest_timestamp(),
86 monotonic_clock::time_point(chrono::nanoseconds(1)));
87 EXPECT_TRUE(reader.ReadMessage());
88 EXPECT_EQ(reader.newest_timestamp(),
89 monotonic_clock::time_point(chrono::nanoseconds(2)));
90 EXPECT_FALSE(reader.ReadMessage());
91}
92
Austin Schuh32f68492020-11-08 21:45:51 -080093// Tests that we explode when messages are too far out of order.
94TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
95 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
96 unlink(logfile0.c_str());
97
98 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
99 JsonToSizedFlatbuffer<LogFileHeader>(
100 R"({
101 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800102 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800103 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
104 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
105 "parts_index": 0
106})");
107
108 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
109 JsonToSizedFlatbuffer<MessageHeader>(
110 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
111 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
112 JsonToSizedFlatbuffer<MessageHeader>(
113 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
114 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
115 JsonToSizedFlatbuffer<MessageHeader>(
116 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
117
118 {
119 DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800120 writer.QueueSpan(config0.span());
121 writer.QueueSpan(m1.span());
122 writer.QueueSpan(m2.span());
123 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800124 }
125
126 const std::vector<LogFile> parts = SortParts({logfile0});
127
128 PartsMessageReader reader(parts[0].parts[0]);
129
130 EXPECT_TRUE(reader.ReadMessage());
131 EXPECT_TRUE(reader.ReadMessage());
132 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
133}
134
Austin Schuhc41603c2020-10-11 16:17:37 -0700135// Tests that we can transparently re-assemble part files with a
136// PartsMessageReader.
137TEST(PartsMessageReaderTest, ReadWrite) {
138 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
139 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
140 unlink(logfile0.c_str());
141 unlink(logfile1.c_str());
142
143 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
144 JsonToSizedFlatbuffer<LogFileHeader>(
145 R"({
146 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800147 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700148 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
149 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
150 "parts_index": 0
151})");
152 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
153 JsonToSizedFlatbuffer<LogFileHeader>(
154 R"({
155 "max_out_of_order_duration": 200000000,
156 "monotonic_start_time": 0,
157 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800158 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700159 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
160 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
161 "parts_index": 1
162})");
163
164 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
165 JsonToSizedFlatbuffer<MessageHeader>(
166 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
167 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
168 JsonToSizedFlatbuffer<MessageHeader>(
169 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
170
171 {
172 DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800173 writer.QueueSpan(config0.span());
174 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700175 }
176 {
177 DetachedBufferWriter writer(logfile1, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800178 writer.QueueSpan(config1.span());
179 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700180 }
181
182 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
183
184 PartsMessageReader reader(parts[0].parts[0]);
185
186 EXPECT_EQ(reader.filename(), logfile0);
187
188 // Confirm that the timestamps track, and the filename also updates.
189 // Read the first message.
190 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
191 EXPECT_EQ(
192 reader.max_out_of_order_duration(),
193 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
194 EXPECT_TRUE(reader.ReadMessage());
195 EXPECT_EQ(reader.filename(), logfile0);
196 EXPECT_EQ(reader.newest_timestamp(),
197 monotonic_clock::time_point(chrono::nanoseconds(1)));
198 EXPECT_EQ(
199 reader.max_out_of_order_duration(),
200 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
201
202 // Read the second message.
203 EXPECT_TRUE(reader.ReadMessage());
204 EXPECT_EQ(reader.filename(), logfile1);
205 EXPECT_EQ(reader.newest_timestamp(),
206 monotonic_clock::time_point(chrono::nanoseconds(2)));
207 EXPECT_EQ(
208 reader.max_out_of_order_duration(),
209 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
210
211 // And then confirm that reading again returns no message.
212 EXPECT_FALSE(reader.ReadMessage());
213 EXPECT_EQ(reader.filename(), logfile1);
214 EXPECT_EQ(
215 reader.max_out_of_order_duration(),
216 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800217 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Austin Schuhc41603c2020-10-11 16:17:37 -0700218}
Austin Schuh32f68492020-11-08 21:45:51 -0800219
Austin Schuh1be0ce42020-11-29 22:43:26 -0800220// Tests that Message's operator < works as expected.
221TEST(MessageTest, Sorting) {
222 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
223
224 Message m1{.channel_index = 0,
225 .queue_index = 0,
226 .timestamp = e + chrono::milliseconds(1),
227 .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
228 Message m2{.channel_index = 0,
229 .queue_index = 0,
230 .timestamp = e + chrono::milliseconds(2),
231 .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
232
233 EXPECT_LT(m1, m2);
234 EXPECT_GE(m2, m1);
235
236 m1.timestamp = e;
237 m2.timestamp = e;
238
239 m1.channel_index = 1;
240 m2.channel_index = 2;
241
242 EXPECT_LT(m1, m2);
243 EXPECT_GE(m2, m1);
244
245 m1.channel_index = 0;
246 m2.channel_index = 0;
247 m1.queue_index = 0;
248 m2.queue_index = 1;
249
250 EXPECT_LT(m1, m2);
251 EXPECT_GE(m2, m1);
252}
253
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800254aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
255 const aos::FlatbufferDetachedBuffer<Configuration> &config,
256 const std::string_view json) {
257 flatbuffers::FlatBufferBuilder fbb;
258 flatbuffers::Offset<Configuration> config_offset =
259 aos::CopyFlatBuffer(config, &fbb);
260 LogFileHeader::Builder header_builder(fbb);
261 header_builder.add_configuration(config_offset);
262 fbb.Finish(header_builder.Finish());
263 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
264
265 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
266 JsonToFlatbuffer<LogFileHeader>(json));
267 CHECK(header_updates.Verify());
268 flatbuffers::FlatBufferBuilder fbb2;
269 fbb2.FinishSizePrefixed(
270 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
271 return fbb2.Release();
272}
273
274class SortingElementTest : public ::testing::Test {
275 public:
276 SortingElementTest()
277 : config_(JsonToFlatbuffer<Configuration>(
278 R"({
279 "channels": [
280 {
281 "name": "/a",
282 "type": "aos.logger.testing.TestMessage",
283 "source_node": "pi1",
284 "destination_nodes": [
285 {
286 "name": "pi2"
287 },
288 {
289 "name": "pi3"
290 }
291 ]
292 },
293 {
294 "name": "/b",
295 "type": "aos.logger.testing.TestMessage",
296 "source_node": "pi1"
297 },
298 {
299 "name": "/c",
300 "type": "aos.logger.testing.TestMessage",
301 "source_node": "pi1"
302 }
303 ],
304 "nodes": [
305 {
306 "name": "pi1"
307 },
308 {
309 "name": "pi2"
310 },
311 {
312 "name": "pi3"
313 }
314 ]
315}
316)")),
317 config0_(MakeHeader(config_, R"({
318 /* 100ms */
319 "max_out_of_order_duration": 100000000,
320 "node": {
321 "name": "pi1"
322 },
323 "logger_node": {
324 "name": "pi1"
325 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800326 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800327 "realtime_start_time": 1000000000000,
328 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
329 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
330 "parts_index": 0
331})")),
332 config1_(MakeHeader(config_,
333 R"({
334 /* 100ms */
335 "max_out_of_order_duration": 100000000,
336 "node": {
337 "name": "pi1"
338 },
339 "logger_node": {
340 "name": "pi1"
341 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800342 "monotonic_start_time": 1000000,
343 "realtime_start_time": 1000000000000,
344 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
345 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
346 "parts_index": 0
347})")),
348 config2_(MakeHeader(config_,
349 R"({
350 /* 100ms */
351 "max_out_of_order_duration": 100000000,
352 "node": {
353 "name": "pi2"
354 },
355 "logger_node": {
356 "name": "pi2"
357 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800358 "monotonic_start_time": 0,
359 "realtime_start_time": 1000000000000,
Austin Schuhd2f96102020-12-01 20:27:29 -0800360 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
361 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
362 "parts_index": 0
363})")),
364 config3_(MakeHeader(config_,
365 R"({
366 /* 100ms */
367 "max_out_of_order_duration": 100000000,
368 "node": {
369 "name": "pi1"
370 },
371 "logger_node": {
372 "name": "pi1"
373 },
374 "monotonic_start_time": 2000000,
375 "realtime_start_time": 1000000000,
376 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
377 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800378 "parts_index": 0
379})")) {
380 unlink(logfile0_.c_str());
381 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800382 unlink(logfile2_.c_str());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800383 queue_index_.resize(kChannels);
384 }
385
386 protected:
387 static constexpr size_t kChannels = 3u;
388
389 flatbuffers::DetachedBuffer MakeLogMessage(
390 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
391 int value) {
392 flatbuffers::FlatBufferBuilder message_fbb;
393 message_fbb.ForceDefaults(true);
394 TestMessage::Builder test_message_builder(message_fbb);
395 test_message_builder.add_value(value);
396 message_fbb.Finish(test_message_builder.Finish());
397
398 aos::Context context;
399 context.monotonic_event_time = monotonic_now;
400 context.realtime_event_time = aos::realtime_clock::epoch() +
401 chrono::seconds(1000) +
402 monotonic_now.time_since_epoch();
403 context.queue_index = queue_index_[channel_index];
404 context.size = message_fbb.GetSize();
405 context.data = message_fbb.GetBufferPointer();
406
407 ++queue_index_[channel_index];
408
409 flatbuffers::FlatBufferBuilder fbb;
410 fbb.FinishSizePrefixed(
411 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
412
413 return fbb.Release();
414 }
415
416 flatbuffers::DetachedBuffer MakeTimestampMessage(
417 const aos::monotonic_clock::time_point sender_monotonic_now,
418 int channel_index, chrono::nanoseconds receiver_monotonic_offset) {
419 aos::Context context;
420 context.monotonic_remote_time = sender_monotonic_now;
421 context.realtime_remote_time = aos::realtime_clock::epoch() +
422 chrono::seconds(1000) +
423 sender_monotonic_now.time_since_epoch();
424 context.remote_queue_index = queue_index_[channel_index] - 1;
425 context.monotonic_event_time =
426 sender_monotonic_now + receiver_monotonic_offset;
427 context.realtime_event_time =
428 aos::realtime_clock::epoch() + chrono::seconds(1000) +
429 context.monotonic_event_time.time_since_epoch();
430 context.queue_index = queue_index_[channel_index] - 1 + 100;
431 context.size = 0;
432 context.data = nullptr;
433
434 flatbuffers::FlatBufferBuilder fbb;
435 fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index,
436 LogType::kLogDeliveryTimeOnly));
437 LOG(INFO) << aos::FlatbufferToJson(
438 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
439 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
440
441 return fbb.Release();
442 }
443
444 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
445 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800446 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800447
448 const aos::FlatbufferDetachedBuffer<Configuration> config_;
449 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
450 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800451 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
452 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800453
454 std::vector<uint32_t> queue_index_;
455};
456
457using LogPartsSorterTest = SortingElementTest;
458using LogPartsSorterDeathTest = LogPartsSorterTest;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800459using NodeMergerTest = SortingElementTest;
Austin Schuhd2f96102020-12-01 20:27:29 -0800460using TimestampMapperTest = SortingElementTest;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800461
462// Tests that we can pull messages out of a log sorted in order.
463TEST_F(LogPartsSorterTest, Pull) {
464 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
465 {
466 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
467 writer.QueueSpan(config0_.span());
468 writer.QueueSizedFlatbuffer(
469 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
470 writer.QueueSizedFlatbuffer(
471 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
472 writer.QueueSizedFlatbuffer(
473 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
474 writer.QueueSizedFlatbuffer(
475 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
476 }
477
478 const std::vector<LogFile> parts = SortParts({logfile0_});
479
480 LogPartsSorter parts_sorter(parts[0].parts[0]);
481
482 // Confirm we aren't sorted until any time until the message is popped.
483 // Peeking shouldn't change the sorted until time.
484 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
485
486 std::deque<Message> output;
487
488 ASSERT_TRUE(parts_sorter.Front() != nullptr);
489 output.emplace_back(std::move(*parts_sorter.Front()));
490 parts_sorter.PopFront();
491 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
492
493 ASSERT_TRUE(parts_sorter.Front() != nullptr);
494 output.emplace_back(std::move(*parts_sorter.Front()));
495 parts_sorter.PopFront();
496 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
497
498 ASSERT_TRUE(parts_sorter.Front() != nullptr);
499 output.emplace_back(std::move(*parts_sorter.Front()));
500 parts_sorter.PopFront();
501 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
502
503 ASSERT_TRUE(parts_sorter.Front() != nullptr);
504 output.emplace_back(std::move(*parts_sorter.Front()));
505 parts_sorter.PopFront();
506 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
507
508 ASSERT_TRUE(parts_sorter.Front() == nullptr);
509
510 EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(1000));
511 EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(1000));
512 EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(1901));
513 EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(2000));
514}
515
Austin Schuhb000de62020-12-03 22:00:40 -0800516// Tests that we can pull messages out of a log sorted in order.
517TEST_F(LogPartsSorterTest, WayBeforeStart) {
518 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
519 {
520 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
521 writer.QueueSpan(config0_.span());
522 writer.QueueSizedFlatbuffer(
523 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
524 writer.QueueSizedFlatbuffer(
525 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
526 writer.QueueSizedFlatbuffer(
527 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
528 writer.QueueSizedFlatbuffer(
529 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
530 writer.QueueSizedFlatbuffer(
531 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
532 }
533
534 const std::vector<LogFile> parts = SortParts({logfile0_});
535
536 LogPartsSorter parts_sorter(parts[0].parts[0]);
537
538 // Confirm we aren't sorted until any time until the message is popped.
539 // Peeking shouldn't change the sorted until time.
540 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
541
542 std::deque<Message> output;
543
544 for (monotonic_clock::time_point t :
545 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
546 e + chrono::milliseconds(1900), monotonic_clock::max_time,
547 monotonic_clock::max_time}) {
548 ASSERT_TRUE(parts_sorter.Front() != nullptr);
549 output.emplace_back(std::move(*parts_sorter.Front()));
550 parts_sorter.PopFront();
551 EXPECT_EQ(parts_sorter.sorted_until(), t);
552 }
553
554 ASSERT_TRUE(parts_sorter.Front() == nullptr);
555
556 EXPECT_EQ(output[0].timestamp, e - chrono::milliseconds(1000));
557 EXPECT_EQ(output[1].timestamp, e - chrono::milliseconds(500));
558 EXPECT_EQ(output[2].timestamp, e - chrono::milliseconds(10));
559 EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(1901));
560 EXPECT_EQ(output[4].timestamp, e + chrono::milliseconds(2000));
561}
562
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800563// Tests that messages too far out of order trigger death.
564TEST_F(LogPartsSorterDeathTest, Pull) {
565 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
566 {
567 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
568 writer.QueueSpan(config0_.span());
569 writer.QueueSizedFlatbuffer(
570 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
571 writer.QueueSizedFlatbuffer(
572 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
573 writer.QueueSizedFlatbuffer(
574 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
575 // The following message is too far out of order and will trigger the CHECK.
576 writer.QueueSizedFlatbuffer(
577 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
578 }
579
580 const std::vector<LogFile> parts = SortParts({logfile0_});
581
582 LogPartsSorter parts_sorter(parts[0].parts[0]);
583
584 // Confirm we aren't sorted until any time until the message is popped.
585 // Peeking shouldn't change the sorted until time.
586 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
587 std::deque<Message> output;
588
589 ASSERT_TRUE(parts_sorter.Front() != nullptr);
590 parts_sorter.PopFront();
591 ASSERT_TRUE(parts_sorter.Front() != nullptr);
592 ASSERT_TRUE(parts_sorter.Front() != nullptr);
593 parts_sorter.PopFront();
594
595 EXPECT_DEATH({ parts_sorter.Front(); }, "Max out of order exceeded.");
596}
597
Austin Schuh8f52ed52020-11-30 23:12:39 -0800598// Tests that we can merge data from 2 separate files, including duplicate data.
599TEST_F(NodeMergerTest, TwoFileMerger) {
600 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
601 {
602 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
603 writer0.QueueSpan(config0_.span());
604 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
605 writer1.QueueSpan(config1_.span());
606
607 writer0.QueueSizedFlatbuffer(
608 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
609 writer1.QueueSizedFlatbuffer(
610 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
611
612 writer0.QueueSizedFlatbuffer(
613 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
614 writer1.QueueSizedFlatbuffer(
615 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
616
617 // Make a duplicate!
618 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
619 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
620 writer0.QueueSpan(msg.span());
621 writer1.QueueSpan(msg.span());
622
623 writer1.QueueSizedFlatbuffer(
624 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
625 }
626
627 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Austin Schuhd2f96102020-12-01 20:27:29 -0800628 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800629
Austin Schuhd2f96102020-12-01 20:27:29 -0800630 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800631
632 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
633
634 std::deque<Message> output;
635
636 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
637 ASSERT_TRUE(merger.Front() != nullptr);
638 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
639
640 output.emplace_back(std::move(*merger.Front()));
641 merger.PopFront();
642 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
643
644 ASSERT_TRUE(merger.Front() != nullptr);
645 output.emplace_back(std::move(*merger.Front()));
646 merger.PopFront();
647 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
648
649 ASSERT_TRUE(merger.Front() != nullptr);
650 output.emplace_back(std::move(*merger.Front()));
651 merger.PopFront();
652 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
653
654 ASSERT_TRUE(merger.Front() != nullptr);
655 output.emplace_back(std::move(*merger.Front()));
656 merger.PopFront();
657 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
658
659 ASSERT_TRUE(merger.Front() != nullptr);
660 output.emplace_back(std::move(*merger.Front()));
661 merger.PopFront();
662 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
663
664 ASSERT_TRUE(merger.Front() != nullptr);
665 output.emplace_back(std::move(*merger.Front()));
666 merger.PopFront();
667 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
668
669 ASSERT_TRUE(merger.Front() == nullptr);
670
671 EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(1000));
672 EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(1001));
673 EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(1002));
674 EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(2000));
675 EXPECT_EQ(output[4].timestamp, e + chrono::milliseconds(3000));
676 EXPECT_EQ(output[5].timestamp, e + chrono::milliseconds(3002));
677}
678
Austin Schuhd2f96102020-12-01 20:27:29 -0800679// Tests that we can match timestamps on delivered messages.
680TEST_F(TimestampMapperTest, ReadNode0First) {
681 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
682 {
683 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
684 writer0.QueueSpan(config0_.span());
685 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
686 writer1.QueueSpan(config2_.span());
687
688 writer0.QueueSizedFlatbuffer(
689 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
690 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
691 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
692
693 writer0.QueueSizedFlatbuffer(
694 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
695 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
696 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
697
698 writer0.QueueSizedFlatbuffer(
699 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
700 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
701 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
702 }
703
704 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
705
706 ASSERT_EQ(parts[0].logger_node, "pi1");
707 ASSERT_EQ(parts[1].logger_node, "pi2");
708
709 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
710 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
711
712 mapper0.AddPeer(&mapper1);
713 mapper1.AddPeer(&mapper0);
714
715 {
716 std::deque<TimestampedMessage> output0;
717
718 ASSERT_TRUE(mapper0.Front() != nullptr);
719 output0.emplace_back(std::move(*mapper0.Front()));
720 mapper0.PopFront();
721 EXPECT_EQ(mapper0.sorted_until(), e + chrono::milliseconds(1900));
722
723 ASSERT_TRUE(mapper0.Front() != nullptr);
724 output0.emplace_back(std::move(*mapper0.Front()));
725 mapper0.PopFront();
726 EXPECT_EQ(mapper0.sorted_until(), e + chrono::milliseconds(2900));
727
728 ASSERT_TRUE(mapper0.Front() != nullptr);
729 output0.emplace_back(std::move(*mapper0.Front()));
730 mapper0.PopFront();
731 EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
732
733 ASSERT_TRUE(mapper0.Front() == nullptr);
734
735 EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
736 EXPECT_TRUE(output0[0].data.Verify());
737 EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
738 EXPECT_TRUE(output0[1].data.Verify());
739 EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
740 EXPECT_TRUE(output0[2].data.Verify());
741 }
742
743 {
744 SCOPED_TRACE("Trying node1 now");
745 std::deque<TimestampedMessage> output1;
746
747 ASSERT_TRUE(mapper1.Front() != nullptr);
748 output1.emplace_back(std::move(*mapper1.Front()));
749 mapper1.PopFront();
750 EXPECT_EQ(mapper1.sorted_until(),
751 e + chrono::seconds(100) + chrono::milliseconds(1900));
752
753 ASSERT_TRUE(mapper1.Front() != nullptr);
754 output1.emplace_back(std::move(*mapper1.Front()));
755 mapper1.PopFront();
756 EXPECT_EQ(mapper1.sorted_until(),
757 e + chrono::seconds(100) + chrono::milliseconds(2900));
758
759 ASSERT_TRUE(mapper1.Front() != nullptr);
760 output1.emplace_back(std::move(*mapper1.Front()));
761 mapper1.PopFront();
762 EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
763
764 ASSERT_TRUE(mapper1.Front() == nullptr);
765
766 EXPECT_EQ(output1[0].monotonic_event_time,
767 e + chrono::seconds(100) + chrono::milliseconds(1000));
768 EXPECT_TRUE(output1[0].data.Verify());
769 EXPECT_EQ(output1[1].monotonic_event_time,
770 e + chrono::seconds(100) + chrono::milliseconds(2000));
771 EXPECT_TRUE(output1[1].data.Verify());
772 EXPECT_EQ(output1[2].monotonic_event_time,
773 e + chrono::seconds(100) + chrono::milliseconds(3000));
774 EXPECT_TRUE(output1[2].data.Verify());
775 }
776}
777
778// Tests that we can match timestamps on delivered messages. By doing this in
779// the reverse order, the second node needs to queue data up from the first node
780// to find the matching timestamp.
781TEST_F(TimestampMapperTest, ReadNode1First) {
782 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
783 {
784 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
785 writer0.QueueSpan(config0_.span());
786 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
787 writer1.QueueSpan(config2_.span());
788
789 writer0.QueueSizedFlatbuffer(
790 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
791 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
792 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
793
794 writer0.QueueSizedFlatbuffer(
795 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
796 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
797 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
798
799 writer0.QueueSizedFlatbuffer(
800 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
801 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
802 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
803 }
804
805 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
806
807 ASSERT_EQ(parts[0].logger_node, "pi1");
808 ASSERT_EQ(parts[1].logger_node, "pi2");
809
810 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
811 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
812
813 mapper0.AddPeer(&mapper1);
814 mapper1.AddPeer(&mapper0);
815
816 {
817 SCOPED_TRACE("Trying node1 now");
818 std::deque<TimestampedMessage> output1;
819
820 ASSERT_TRUE(mapper1.Front() != nullptr);
821 output1.emplace_back(std::move(*mapper1.Front()));
822 mapper1.PopFront();
823 EXPECT_EQ(mapper1.sorted_until(),
824 e + chrono::seconds(100) + chrono::milliseconds(1900));
825
826 ASSERT_TRUE(mapper1.Front() != nullptr);
827 output1.emplace_back(std::move(*mapper1.Front()));
828 mapper1.PopFront();
829 EXPECT_EQ(mapper1.sorted_until(),
830 e + chrono::seconds(100) + chrono::milliseconds(2900));
831
832 ASSERT_TRUE(mapper1.Front() != nullptr);
833 output1.emplace_back(std::move(*mapper1.Front()));
834 mapper1.PopFront();
835 EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
836
837 ASSERT_TRUE(mapper1.Front() == nullptr);
838
839 EXPECT_EQ(output1[0].monotonic_event_time,
840 e + chrono::seconds(100) + chrono::milliseconds(1000));
841 EXPECT_TRUE(output1[0].data.Verify());
842 EXPECT_EQ(output1[1].monotonic_event_time,
843 e + chrono::seconds(100) + chrono::milliseconds(2000));
844 EXPECT_TRUE(output1[1].data.Verify());
845 EXPECT_EQ(output1[2].monotonic_event_time,
846 e + chrono::seconds(100) + chrono::milliseconds(3000));
847 EXPECT_TRUE(output1[2].data.Verify());
848 }
849
850 {
851 std::deque<TimestampedMessage> output0;
852
853 ASSERT_TRUE(mapper0.Front() != nullptr);
854 output0.emplace_back(std::move(*mapper0.Front()));
855 mapper0.PopFront();
856 EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
857
858 ASSERT_TRUE(mapper0.Front() != nullptr);
859 output0.emplace_back(std::move(*mapper0.Front()));
860 mapper0.PopFront();
861 EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
862
863 ASSERT_TRUE(mapper0.Front() != nullptr);
864 output0.emplace_back(std::move(*mapper0.Front()));
865 mapper0.PopFront();
866 EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
867
868 ASSERT_TRUE(mapper0.Front() == nullptr);
869
870 EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
871 EXPECT_TRUE(output0[0].data.Verify());
872 EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
873 EXPECT_TRUE(output0[1].data.Verify());
874 EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
875 EXPECT_TRUE(output0[2].data.Verify());
876 }
877}
878
879// Tests that we return just the timestamps if we couldn't find the data and the
880// missing data was at the beginning of the file.
881TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
882 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
883 {
884 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
885 writer0.QueueSpan(config0_.span());
886 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
887 writer1.QueueSpan(config2_.span());
888
889 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
890 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
891 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
892
893 writer0.QueueSizedFlatbuffer(
894 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
895 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
896 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
897
898 writer0.QueueSizedFlatbuffer(
899 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
900 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
901 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
902 }
903
904 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
905
906 ASSERT_EQ(parts[0].logger_node, "pi1");
907 ASSERT_EQ(parts[1].logger_node, "pi2");
908
909 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
910 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
911
912 mapper0.AddPeer(&mapper1);
913 mapper1.AddPeer(&mapper0);
914
915 {
916 SCOPED_TRACE("Trying node1 now");
917 std::deque<TimestampedMessage> output1;
918
919 ASSERT_TRUE(mapper1.Front() != nullptr);
920 output1.emplace_back(std::move(*mapper1.Front()));
921 mapper1.PopFront();
922 EXPECT_EQ(mapper1.sorted_until(),
923 e + chrono::seconds(100) + chrono::milliseconds(1900));
924
925 ASSERT_TRUE(mapper1.Front() != nullptr);
926 output1.emplace_back(std::move(*mapper1.Front()));
927 mapper1.PopFront();
928 EXPECT_EQ(mapper1.sorted_until(),
929 e + chrono::seconds(100) + chrono::milliseconds(2900));
930
931 ASSERT_TRUE(mapper1.Front() != nullptr);
932 output1.emplace_back(std::move(*mapper1.Front()));
933 mapper1.PopFront();
934 EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
935
936 ASSERT_TRUE(mapper1.Front() == nullptr);
937
938 EXPECT_EQ(output1[0].monotonic_event_time,
939 e + chrono::seconds(100) + chrono::milliseconds(1000));
940 EXPECT_FALSE(output1[0].data.Verify());
941 EXPECT_EQ(output1[1].monotonic_event_time,
942 e + chrono::seconds(100) + chrono::milliseconds(2000));
943 EXPECT_TRUE(output1[1].data.Verify());
944 EXPECT_EQ(output1[2].monotonic_event_time,
945 e + chrono::seconds(100) + chrono::milliseconds(3000));
946 EXPECT_TRUE(output1[2].data.Verify());
947 }
948}
949
950// Tests that we return just the timestamps if we couldn't find the data and the
951// missing data was at the end of the file.
952TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
953 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
954 {
955 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
956 writer0.QueueSpan(config0_.span());
957 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
958 writer1.QueueSpan(config2_.span());
959
960 writer0.QueueSizedFlatbuffer(
961 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
962 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
963 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
964
965 writer0.QueueSizedFlatbuffer(
966 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
967 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
968 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
969
970 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
971 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
972 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
973 }
974
975 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
976
977 ASSERT_EQ(parts[0].logger_node, "pi1");
978 ASSERT_EQ(parts[1].logger_node, "pi2");
979
980 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
981 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
982
983 mapper0.AddPeer(&mapper1);
984 mapper1.AddPeer(&mapper0);
985
986 {
987 SCOPED_TRACE("Trying node1 now");
988 std::deque<TimestampedMessage> output1;
989
990 ASSERT_TRUE(mapper1.Front() != nullptr);
991 output1.emplace_back(std::move(*mapper1.Front()));
992 mapper1.PopFront();
993 EXPECT_EQ(mapper1.sorted_until(),
994 e + chrono::seconds(100) + chrono::milliseconds(1900));
995
996 ASSERT_TRUE(mapper1.Front() != nullptr);
997 output1.emplace_back(std::move(*mapper1.Front()));
998 mapper1.PopFront();
999 EXPECT_EQ(mapper1.sorted_until(),
1000 e + chrono::seconds(100) + chrono::milliseconds(2900));
1001
1002 ASSERT_TRUE(mapper1.Front() != nullptr);
1003 output1.emplace_back(std::move(*mapper1.Front()));
1004 mapper1.PopFront();
1005 EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
1006
1007 ASSERT_TRUE(mapper1.Front() == nullptr);
1008
1009 EXPECT_EQ(output1[0].monotonic_event_time,
1010 e + chrono::seconds(100) + chrono::milliseconds(1000));
1011 EXPECT_TRUE(output1[0].data.Verify());
1012 EXPECT_EQ(output1[1].monotonic_event_time,
1013 e + chrono::seconds(100) + chrono::milliseconds(2000));
1014 EXPECT_TRUE(output1[1].data.Verify());
1015 EXPECT_EQ(output1[2].monotonic_event_time,
1016 e + chrono::seconds(100) + chrono::milliseconds(3000));
1017 EXPECT_FALSE(output1[2].data.Verify());
1018 }
1019}
1020
Austin Schuh993ccb52020-12-12 15:59:32 -08001021// Tests that we handle a message which failed to forward or be logged.
1022TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1023 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1024 {
1025 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1026 writer0.QueueSpan(config0_.span());
1027 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1028 writer1.QueueSpan(config2_.span());
1029
1030 writer0.QueueSizedFlatbuffer(
1031 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1032 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1033 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1034
1035 // Create both the timestamp and message, but don't log them, simulating a
1036 // forwarding drop.
1037 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1038 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1039 chrono::seconds(100));
1040
1041 writer0.QueueSizedFlatbuffer(
1042 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1043 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1044 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1045 }
1046
1047 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1048
1049 ASSERT_EQ(parts[0].logger_node, "pi1");
1050 ASSERT_EQ(parts[1].logger_node, "pi2");
1051
1052 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1053 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1054
1055 mapper0.AddPeer(&mapper1);
1056 mapper1.AddPeer(&mapper0);
1057
1058 {
1059 std::deque<TimestampedMessage> output1;
1060
1061 ASSERT_TRUE(mapper1.Front() != nullptr);
1062 output1.emplace_back(std::move(*mapper1.Front()));
1063 mapper1.PopFront();
1064
1065 ASSERT_TRUE(mapper1.Front() != nullptr);
1066 output1.emplace_back(std::move(*mapper1.Front()));
1067
1068 ASSERT_FALSE(mapper1.Front() == nullptr);
1069
1070 EXPECT_EQ(output1[0].monotonic_event_time,
1071 e + chrono::seconds(100) + chrono::milliseconds(1000));
1072 EXPECT_TRUE(output1[0].data.Verify());
1073 EXPECT_EQ(output1[1].monotonic_event_time,
1074 e + chrono::seconds(100) + chrono::milliseconds(3000));
1075 EXPECT_TRUE(output1[1].data.Verify());
1076 }
1077}
1078
Austin Schuhd2f96102020-12-01 20:27:29 -08001079// Tests that we properly sort log files with duplicate timestamps.
1080TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1081 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1082 {
1083 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1084 writer0.QueueSpan(config0_.span());
1085 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1086 writer1.QueueSpan(config2_.span());
1087
1088 writer0.QueueSizedFlatbuffer(
1089 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1090 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1091 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1092
1093 writer0.QueueSizedFlatbuffer(
1094 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1095 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1096 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1097
1098 writer0.QueueSizedFlatbuffer(
1099 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
1100 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1101 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1102
1103 writer0.QueueSizedFlatbuffer(
1104 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
1105 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1106 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1107 }
1108
1109 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1110
1111 ASSERT_EQ(parts[0].logger_node, "pi1");
1112 ASSERT_EQ(parts[1].logger_node, "pi2");
1113
1114 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1115 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1116
1117 mapper0.AddPeer(&mapper1);
1118 mapper1.AddPeer(&mapper0);
1119
1120 {
1121 SCOPED_TRACE("Trying node1 now");
1122 std::deque<TimestampedMessage> output1;
1123
1124 for (int i = 0; i < 4; ++i) {
1125 ASSERT_TRUE(mapper1.Front() != nullptr);
1126 output1.emplace_back(std::move(*mapper1.Front()));
1127 mapper1.PopFront();
1128 }
1129 ASSERT_TRUE(mapper1.Front() == nullptr);
1130
1131 EXPECT_EQ(output1[0].monotonic_event_time,
1132 e + chrono::seconds(100) + chrono::milliseconds(1000));
1133 EXPECT_TRUE(output1[0].data.Verify());
1134 EXPECT_EQ(output1[1].monotonic_event_time,
1135 e + chrono::seconds(100) + chrono::milliseconds(2000));
1136 EXPECT_TRUE(output1[1].data.Verify());
1137 EXPECT_EQ(output1[2].monotonic_event_time,
1138 e + chrono::seconds(100) + chrono::milliseconds(2000));
1139 EXPECT_TRUE(output1[2].data.Verify());
1140 EXPECT_EQ(output1[3].monotonic_event_time,
1141 e + chrono::seconds(100) + chrono::milliseconds(3000));
1142 EXPECT_TRUE(output1[3].data.Verify());
1143 }
1144}
1145
1146// Tests that we properly sort log files with duplicate timestamps.
1147TEST_F(TimestampMapperTest, StartTime) {
1148 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1149 {
1150 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1151 writer0.QueueSpan(config0_.span());
1152 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1153 writer1.QueueSpan(config1_.span());
1154 DetachedBufferWriter writer2(logfile2_, std::make_unique<DummyEncoder>());
1155 writer2.QueueSpan(config3_.span());
1156 }
1157
1158 const std::vector<LogFile> parts =
1159 SortParts({logfile0_, logfile1_, logfile2_});
1160
1161 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1162
1163 EXPECT_EQ(mapper0.monotonic_start_time(), e + chrono::milliseconds(1));
1164 EXPECT_EQ(mapper0.realtime_start_time(),
1165 realtime_clock::time_point(chrono::seconds(1000)));
1166}
1167
Austin Schuhfecf1d82020-12-19 16:57:28 -08001168// Tests that when a peer isn't registered, we treat that as if there was no
1169// data available.
1170TEST_F(TimestampMapperTest, NoPeer) {
1171 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1172 {
1173 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1174 writer0.QueueSpan(config0_.span());
1175 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1176 writer1.QueueSpan(config2_.span());
1177
1178 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
1179 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1180 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1181
1182 writer0.QueueSizedFlatbuffer(
1183 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1184 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1185 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1186
1187 writer0.QueueSizedFlatbuffer(
1188 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1189 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1190 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1191 }
1192
1193 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1194
1195 ASSERT_EQ(parts[0].logger_node, "pi1");
1196 ASSERT_EQ(parts[1].logger_node, "pi2");
1197
1198 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1199
1200 {
1201 std::deque<TimestampedMessage> output1;
1202
1203 ASSERT_TRUE(mapper1.Front() != nullptr);
1204 output1.emplace_back(std::move(*mapper1.Front()));
1205 mapper1.PopFront();
1206 ASSERT_TRUE(mapper1.Front() != nullptr);
1207 output1.emplace_back(std::move(*mapper1.Front()));
1208 mapper1.PopFront();
1209 ASSERT_TRUE(mapper1.Front() != nullptr);
1210 output1.emplace_back(std::move(*mapper1.Front()));
1211 mapper1.PopFront();
1212 ASSERT_TRUE(mapper1.Front() == nullptr);
1213
1214 EXPECT_EQ(output1[0].monotonic_event_time,
1215 e + chrono::seconds(100) + chrono::milliseconds(1000));
1216 EXPECT_FALSE(output1[0].data.Verify());
1217 EXPECT_EQ(output1[1].monotonic_event_time,
1218 e + chrono::seconds(100) + chrono::milliseconds(2000));
1219 EXPECT_FALSE(output1[1].data.Verify());
1220 EXPECT_EQ(output1[2].monotonic_event_time,
1221 e + chrono::seconds(100) + chrono::milliseconds(3000));
1222 EXPECT_FALSE(output1[2].data.Verify());
1223 }
1224}
1225
Austin Schuhc243b422020-10-11 15:35:08 -07001226} // namespace testing
1227} // namespace logger
1228} // namespace aos