blob: 9029ce1cddc35f6dcf295818673239401fbef343 [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
4#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07005
Austin Schuhc41603c2020-10-11 16:17:37 -07006#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -07007#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -08008#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -07009#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070010#include "aos/json_to_flatbuffer.h"
11#include "aos/testing/tmpdir.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070012#include "gtest/gtest.h"
Austin Schuhc243b422020-10-11 15:35:08 -070013
14namespace aos {
15namespace logger {
16namespace testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070017namespace chrono = std::chrono;
Austin Schuhc243b422020-10-11 15:35:08 -070018
Austin Schuhe243aaf2020-10-11 15:46:02 -070019// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070020template <typename T>
21SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
22 const std::string_view data) {
23 flatbuffers::FlatBufferBuilder fbb;
24 fbb.ForceDefaults(true);
25 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
26 return fbb.Release();
27}
28
Austin Schuhe243aaf2020-10-11 15:46:02 -070029// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070030TEST(SpanReaderTest, ReadWrite) {
31 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
32 unlink(logfile.c_str());
33
34 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080035 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070036 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080037 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070038
39 {
40 DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -080041 writer.QueueSpan(m1.span());
42 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070043 }
44
45 SpanReader reader(logfile);
46
47 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -080048 EXPECT_EQ(reader.ReadMessage(), m1.span());
49 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070050 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
51}
52
Austin Schuhe243aaf2020-10-11 15:46:02 -070053// Tests that we can actually parse the resulting messages at a basic level
54// through MessageReader.
55TEST(MessageReaderTest, ReadWrite) {
56 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
57 unlink(logfile.c_str());
58
59 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
60 JsonToSizedFlatbuffer<LogFileHeader>(
61 R"({ "max_out_of_order_duration": 100000000 })");
62 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
63 JsonToSizedFlatbuffer<MessageHeader>(
64 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
65 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
66 JsonToSizedFlatbuffer<MessageHeader>(
67 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
68
69 {
70 DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -080071 writer.QueueSpan(config.span());
72 writer.QueueSpan(m1.span());
73 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -070074 }
75
76 MessageReader reader(logfile);
77
78 EXPECT_EQ(reader.filename(), logfile);
79
80 EXPECT_EQ(
81 reader.max_out_of_order_duration(),
82 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
83 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
84 EXPECT_TRUE(reader.ReadMessage());
85 EXPECT_EQ(reader.newest_timestamp(),
86 monotonic_clock::time_point(chrono::nanoseconds(1)));
87 EXPECT_TRUE(reader.ReadMessage());
88 EXPECT_EQ(reader.newest_timestamp(),
89 monotonic_clock::time_point(chrono::nanoseconds(2)));
90 EXPECT_FALSE(reader.ReadMessage());
91}
92
Austin Schuh32f68492020-11-08 21:45:51 -080093// Tests that we explode when messages are too far out of order.
94TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
95 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
96 unlink(logfile0.c_str());
97
98 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
99 JsonToSizedFlatbuffer<LogFileHeader>(
100 R"({
101 "max_out_of_order_duration": 100000000,
102 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
103 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
104 "parts_index": 0
105})");
106
107 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
108 JsonToSizedFlatbuffer<MessageHeader>(
109 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
110 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
111 JsonToSizedFlatbuffer<MessageHeader>(
112 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
113 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
114 JsonToSizedFlatbuffer<MessageHeader>(
115 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
116
117 {
118 DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800119 writer.QueueSpan(config0.span());
120 writer.QueueSpan(m1.span());
121 writer.QueueSpan(m2.span());
122 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800123 }
124
125 const std::vector<LogFile> parts = SortParts({logfile0});
126
127 PartsMessageReader reader(parts[0].parts[0]);
128
129 EXPECT_TRUE(reader.ReadMessage());
130 EXPECT_TRUE(reader.ReadMessage());
131 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
132}
133
Austin Schuhc41603c2020-10-11 16:17:37 -0700134// Tests that we can transparently re-assemble part files with a
135// PartsMessageReader.
136TEST(PartsMessageReaderTest, ReadWrite) {
137 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
138 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
139 unlink(logfile0.c_str());
140 unlink(logfile1.c_str());
141
142 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
143 JsonToSizedFlatbuffer<LogFileHeader>(
144 R"({
145 "max_out_of_order_duration": 100000000,
146 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
147 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
148 "parts_index": 0
149})");
150 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
151 JsonToSizedFlatbuffer<LogFileHeader>(
152 R"({
153 "max_out_of_order_duration": 200000000,
154 "monotonic_start_time": 0,
155 "realtime_start_time": 0,
156 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
157 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
158 "parts_index": 1
159})");
160
161 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
162 JsonToSizedFlatbuffer<MessageHeader>(
163 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
164 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
165 JsonToSizedFlatbuffer<MessageHeader>(
166 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
167
168 {
169 DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800170 writer.QueueSpan(config0.span());
171 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700172 }
173 {
174 DetachedBufferWriter writer(logfile1, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800175 writer.QueueSpan(config1.span());
176 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700177 }
178
179 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
180
181 PartsMessageReader reader(parts[0].parts[0]);
182
183 EXPECT_EQ(reader.filename(), logfile0);
184
185 // Confirm that the timestamps track, and the filename also updates.
186 // Read the first message.
187 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
188 EXPECT_EQ(
189 reader.max_out_of_order_duration(),
190 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
191 EXPECT_TRUE(reader.ReadMessage());
192 EXPECT_EQ(reader.filename(), logfile0);
193 EXPECT_EQ(reader.newest_timestamp(),
194 monotonic_clock::time_point(chrono::nanoseconds(1)));
195 EXPECT_EQ(
196 reader.max_out_of_order_duration(),
197 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
198
199 // Read the second message.
200 EXPECT_TRUE(reader.ReadMessage());
201 EXPECT_EQ(reader.filename(), logfile1);
202 EXPECT_EQ(reader.newest_timestamp(),
203 monotonic_clock::time_point(chrono::nanoseconds(2)));
204 EXPECT_EQ(
205 reader.max_out_of_order_duration(),
206 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
207
208 // And then confirm that reading again returns no message.
209 EXPECT_FALSE(reader.ReadMessage());
210 EXPECT_EQ(reader.filename(), logfile1);
211 EXPECT_EQ(
212 reader.max_out_of_order_duration(),
213 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800214 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Austin Schuhc41603c2020-10-11 16:17:37 -0700215}
Austin Schuh32f68492020-11-08 21:45:51 -0800216
Austin Schuh1be0ce42020-11-29 22:43:26 -0800217// Tests that Message's operator < works as expected.
218TEST(MessageTest, Sorting) {
219 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
220
221 Message m1{.channel_index = 0,
222 .queue_index = 0,
223 .timestamp = e + chrono::milliseconds(1),
224 .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
225 Message m2{.channel_index = 0,
226 .queue_index = 0,
227 .timestamp = e + chrono::milliseconds(2),
228 .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
229
230 EXPECT_LT(m1, m2);
231 EXPECT_GE(m2, m1);
232
233 m1.timestamp = e;
234 m2.timestamp = e;
235
236 m1.channel_index = 1;
237 m2.channel_index = 2;
238
239 EXPECT_LT(m1, m2);
240 EXPECT_GE(m2, m1);
241
242 m1.channel_index = 0;
243 m2.channel_index = 0;
244 m1.queue_index = 0;
245 m2.queue_index = 1;
246
247 EXPECT_LT(m1, m2);
248 EXPECT_GE(m2, m1);
249}
250
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800251aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
252 const aos::FlatbufferDetachedBuffer<Configuration> &config,
253 const std::string_view json) {
254 flatbuffers::FlatBufferBuilder fbb;
255 flatbuffers::Offset<Configuration> config_offset =
256 aos::CopyFlatBuffer(config, &fbb);
257 LogFileHeader::Builder header_builder(fbb);
258 header_builder.add_configuration(config_offset);
259 fbb.Finish(header_builder.Finish());
260 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
261
262 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
263 JsonToFlatbuffer<LogFileHeader>(json));
264 CHECK(header_updates.Verify());
265 flatbuffers::FlatBufferBuilder fbb2;
266 fbb2.FinishSizePrefixed(
267 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
268 return fbb2.Release();
269}
270
271class SortingElementTest : public ::testing::Test {
272 public:
273 SortingElementTest()
274 : config_(JsonToFlatbuffer<Configuration>(
275 R"({
276 "channels": [
277 {
278 "name": "/a",
279 "type": "aos.logger.testing.TestMessage",
280 "source_node": "pi1",
281 "destination_nodes": [
282 {
283 "name": "pi2"
284 },
285 {
286 "name": "pi3"
287 }
288 ]
289 },
290 {
291 "name": "/b",
292 "type": "aos.logger.testing.TestMessage",
293 "source_node": "pi1"
294 },
295 {
296 "name": "/c",
297 "type": "aos.logger.testing.TestMessage",
298 "source_node": "pi1"
299 }
300 ],
301 "nodes": [
302 {
303 "name": "pi1"
304 },
305 {
306 "name": "pi2"
307 },
308 {
309 "name": "pi3"
310 }
311 ]
312}
313)")),
314 config0_(MakeHeader(config_, R"({
315 /* 100ms */
316 "max_out_of_order_duration": 100000000,
317 "node": {
318 "name": "pi1"
319 },
320 "logger_node": {
321 "name": "pi1"
322 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800323 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800324 "realtime_start_time": 1000000000000,
325 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
326 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
327 "parts_index": 0
328})")),
329 config1_(MakeHeader(config_,
330 R"({
331 /* 100ms */
332 "max_out_of_order_duration": 100000000,
333 "node": {
334 "name": "pi1"
335 },
336 "logger_node": {
337 "name": "pi1"
338 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800339 "monotonic_start_time": 1000000,
340 "realtime_start_time": 1000000000000,
341 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
342 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
343 "parts_index": 0
344})")),
345 config2_(MakeHeader(config_,
346 R"({
347 /* 100ms */
348 "max_out_of_order_duration": 100000000,
349 "node": {
350 "name": "pi2"
351 },
352 "logger_node": {
353 "name": "pi2"
354 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800355 "monotonic_start_time": 0,
356 "realtime_start_time": 1000000000000,
Austin Schuhd2f96102020-12-01 20:27:29 -0800357 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
358 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
359 "parts_index": 0
360})")),
361 config3_(MakeHeader(config_,
362 R"({
363 /* 100ms */
364 "max_out_of_order_duration": 100000000,
365 "node": {
366 "name": "pi1"
367 },
368 "logger_node": {
369 "name": "pi1"
370 },
371 "monotonic_start_time": 2000000,
372 "realtime_start_time": 1000000000,
373 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
374 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800375 "parts_index": 0
376})")) {
377 unlink(logfile0_.c_str());
378 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800379 unlink(logfile2_.c_str());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800380 queue_index_.resize(kChannels);
381 }
382
383 protected:
384 static constexpr size_t kChannels = 3u;
385
386 flatbuffers::DetachedBuffer MakeLogMessage(
387 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
388 int value) {
389 flatbuffers::FlatBufferBuilder message_fbb;
390 message_fbb.ForceDefaults(true);
391 TestMessage::Builder test_message_builder(message_fbb);
392 test_message_builder.add_value(value);
393 message_fbb.Finish(test_message_builder.Finish());
394
395 aos::Context context;
396 context.monotonic_event_time = monotonic_now;
397 context.realtime_event_time = aos::realtime_clock::epoch() +
398 chrono::seconds(1000) +
399 monotonic_now.time_since_epoch();
400 context.queue_index = queue_index_[channel_index];
401 context.size = message_fbb.GetSize();
402 context.data = message_fbb.GetBufferPointer();
403
404 ++queue_index_[channel_index];
405
406 flatbuffers::FlatBufferBuilder fbb;
407 fbb.FinishSizePrefixed(
408 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
409
410 return fbb.Release();
411 }
412
413 flatbuffers::DetachedBuffer MakeTimestampMessage(
414 const aos::monotonic_clock::time_point sender_monotonic_now,
415 int channel_index, chrono::nanoseconds receiver_monotonic_offset) {
416 aos::Context context;
417 context.monotonic_remote_time = sender_monotonic_now;
418 context.realtime_remote_time = aos::realtime_clock::epoch() +
419 chrono::seconds(1000) +
420 sender_monotonic_now.time_since_epoch();
421 context.remote_queue_index = queue_index_[channel_index] - 1;
422 context.monotonic_event_time =
423 sender_monotonic_now + receiver_monotonic_offset;
424 context.realtime_event_time =
425 aos::realtime_clock::epoch() + chrono::seconds(1000) +
426 context.monotonic_event_time.time_since_epoch();
427 context.queue_index = queue_index_[channel_index] - 1 + 100;
428 context.size = 0;
429 context.data = nullptr;
430
431 flatbuffers::FlatBufferBuilder fbb;
432 fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index,
433 LogType::kLogDeliveryTimeOnly));
434 LOG(INFO) << aos::FlatbufferToJson(
435 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
436 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
437
438 return fbb.Release();
439 }
440
441 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
442 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800443 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800444
445 const aos::FlatbufferDetachedBuffer<Configuration> config_;
446 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
447 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800448 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
449 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800450
451 std::vector<uint32_t> queue_index_;
452};
453
454using LogPartsSorterTest = SortingElementTest;
455using LogPartsSorterDeathTest = LogPartsSorterTest;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800456using NodeMergerTest = SortingElementTest;
Austin Schuhd2f96102020-12-01 20:27:29 -0800457using TimestampMapperTest = SortingElementTest;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800458
459// Tests that we can pull messages out of a log sorted in order.
460TEST_F(LogPartsSorterTest, Pull) {
461 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
462 {
463 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
464 writer.QueueSpan(config0_.span());
465 writer.QueueSizedFlatbuffer(
466 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
467 writer.QueueSizedFlatbuffer(
468 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
469 writer.QueueSizedFlatbuffer(
470 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
471 writer.QueueSizedFlatbuffer(
472 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
473 }
474
475 const std::vector<LogFile> parts = SortParts({logfile0_});
476
477 LogPartsSorter parts_sorter(parts[0].parts[0]);
478
479 // Confirm we aren't sorted until any time until the message is popped.
480 // Peeking shouldn't change the sorted until time.
481 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
482
483 std::deque<Message> output;
484
485 ASSERT_TRUE(parts_sorter.Front() != nullptr);
486 output.emplace_back(std::move(*parts_sorter.Front()));
487 parts_sorter.PopFront();
488 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
489
490 ASSERT_TRUE(parts_sorter.Front() != nullptr);
491 output.emplace_back(std::move(*parts_sorter.Front()));
492 parts_sorter.PopFront();
493 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
494
495 ASSERT_TRUE(parts_sorter.Front() != nullptr);
496 output.emplace_back(std::move(*parts_sorter.Front()));
497 parts_sorter.PopFront();
498 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
499
500 ASSERT_TRUE(parts_sorter.Front() != nullptr);
501 output.emplace_back(std::move(*parts_sorter.Front()));
502 parts_sorter.PopFront();
503 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
504
505 ASSERT_TRUE(parts_sorter.Front() == nullptr);
506
507 EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(1000));
508 EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(1000));
509 EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(1901));
510 EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(2000));
511}
512
Austin Schuhb000de62020-12-03 22:00:40 -0800513// Tests that we can pull messages out of a log sorted in order.
514TEST_F(LogPartsSorterTest, WayBeforeStart) {
515 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
516 {
517 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
518 writer.QueueSpan(config0_.span());
519 writer.QueueSizedFlatbuffer(
520 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
521 writer.QueueSizedFlatbuffer(
522 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
523 writer.QueueSizedFlatbuffer(
524 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
525 writer.QueueSizedFlatbuffer(
526 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
527 writer.QueueSizedFlatbuffer(
528 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
529 }
530
531 const std::vector<LogFile> parts = SortParts({logfile0_});
532
533 LogPartsSorter parts_sorter(parts[0].parts[0]);
534
535 // Confirm we aren't sorted until any time until the message is popped.
536 // Peeking shouldn't change the sorted until time.
537 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
538
539 std::deque<Message> output;
540
541 for (monotonic_clock::time_point t :
542 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
543 e + chrono::milliseconds(1900), monotonic_clock::max_time,
544 monotonic_clock::max_time}) {
545 ASSERT_TRUE(parts_sorter.Front() != nullptr);
546 output.emplace_back(std::move(*parts_sorter.Front()));
547 parts_sorter.PopFront();
548 EXPECT_EQ(parts_sorter.sorted_until(), t);
549 }
550
551 ASSERT_TRUE(parts_sorter.Front() == nullptr);
552
553 EXPECT_EQ(output[0].timestamp, e - chrono::milliseconds(1000));
554 EXPECT_EQ(output[1].timestamp, e - chrono::milliseconds(500));
555 EXPECT_EQ(output[2].timestamp, e - chrono::milliseconds(10));
556 EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(1901));
557 EXPECT_EQ(output[4].timestamp, e + chrono::milliseconds(2000));
558}
559
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800560// Tests that messages too far out of order trigger death.
561TEST_F(LogPartsSorterDeathTest, Pull) {
562 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
563 {
564 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
565 writer.QueueSpan(config0_.span());
566 writer.QueueSizedFlatbuffer(
567 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
568 writer.QueueSizedFlatbuffer(
569 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
570 writer.QueueSizedFlatbuffer(
571 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
572 // The following message is too far out of order and will trigger the CHECK.
573 writer.QueueSizedFlatbuffer(
574 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
575 }
576
577 const std::vector<LogFile> parts = SortParts({logfile0_});
578
579 LogPartsSorter parts_sorter(parts[0].parts[0]);
580
581 // Confirm we aren't sorted until any time until the message is popped.
582 // Peeking shouldn't change the sorted until time.
583 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
584 std::deque<Message> output;
585
586 ASSERT_TRUE(parts_sorter.Front() != nullptr);
587 parts_sorter.PopFront();
588 ASSERT_TRUE(parts_sorter.Front() != nullptr);
589 ASSERT_TRUE(parts_sorter.Front() != nullptr);
590 parts_sorter.PopFront();
591
592 EXPECT_DEATH({ parts_sorter.Front(); }, "Max out of order exceeded.");
593}
594
Austin Schuh8f52ed52020-11-30 23:12:39 -0800595// Tests that we can merge data from 2 separate files, including duplicate data.
596TEST_F(NodeMergerTest, TwoFileMerger) {
597 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
598 {
599 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
600 writer0.QueueSpan(config0_.span());
601 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
602 writer1.QueueSpan(config1_.span());
603
604 writer0.QueueSizedFlatbuffer(
605 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
606 writer1.QueueSizedFlatbuffer(
607 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
608
609 writer0.QueueSizedFlatbuffer(
610 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
611 writer1.QueueSizedFlatbuffer(
612 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
613
614 // Make a duplicate!
615 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
616 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
617 writer0.QueueSpan(msg.span());
618 writer1.QueueSpan(msg.span());
619
620 writer1.QueueSizedFlatbuffer(
621 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
622 }
623
624 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Austin Schuhd2f96102020-12-01 20:27:29 -0800625 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800626
Austin Schuhd2f96102020-12-01 20:27:29 -0800627 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800628
629 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
630
631 std::deque<Message> output;
632
633 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
634 ASSERT_TRUE(merger.Front() != nullptr);
635 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
636
637 output.emplace_back(std::move(*merger.Front()));
638 merger.PopFront();
639 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
640
641 ASSERT_TRUE(merger.Front() != nullptr);
642 output.emplace_back(std::move(*merger.Front()));
643 merger.PopFront();
644 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
645
646 ASSERT_TRUE(merger.Front() != nullptr);
647 output.emplace_back(std::move(*merger.Front()));
648 merger.PopFront();
649 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
650
651 ASSERT_TRUE(merger.Front() != nullptr);
652 output.emplace_back(std::move(*merger.Front()));
653 merger.PopFront();
654 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
655
656 ASSERT_TRUE(merger.Front() != nullptr);
657 output.emplace_back(std::move(*merger.Front()));
658 merger.PopFront();
659 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
660
661 ASSERT_TRUE(merger.Front() != nullptr);
662 output.emplace_back(std::move(*merger.Front()));
663 merger.PopFront();
664 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
665
666 ASSERT_TRUE(merger.Front() == nullptr);
667
668 EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(1000));
669 EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(1001));
670 EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(1002));
671 EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(2000));
672 EXPECT_EQ(output[4].timestamp, e + chrono::milliseconds(3000));
673 EXPECT_EQ(output[5].timestamp, e + chrono::milliseconds(3002));
674}
675
Austin Schuhd2f96102020-12-01 20:27:29 -0800676// Tests that we can match timestamps on delivered messages.
677TEST_F(TimestampMapperTest, ReadNode0First) {
678 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
679 {
680 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
681 writer0.QueueSpan(config0_.span());
682 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
683 writer1.QueueSpan(config2_.span());
684
685 writer0.QueueSizedFlatbuffer(
686 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
687 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
688 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
689
690 writer0.QueueSizedFlatbuffer(
691 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
692 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
693 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
694
695 writer0.QueueSizedFlatbuffer(
696 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
697 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
698 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
699 }
700
701 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
702
703 ASSERT_EQ(parts[0].logger_node, "pi1");
704 ASSERT_EQ(parts[1].logger_node, "pi2");
705
706 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
707 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
708
709 mapper0.AddPeer(&mapper1);
710 mapper1.AddPeer(&mapper0);
711
712 {
713 std::deque<TimestampedMessage> output0;
714
715 ASSERT_TRUE(mapper0.Front() != nullptr);
716 output0.emplace_back(std::move(*mapper0.Front()));
717 mapper0.PopFront();
718 EXPECT_EQ(mapper0.sorted_until(), e + chrono::milliseconds(1900));
719
720 ASSERT_TRUE(mapper0.Front() != nullptr);
721 output0.emplace_back(std::move(*mapper0.Front()));
722 mapper0.PopFront();
723 EXPECT_EQ(mapper0.sorted_until(), e + chrono::milliseconds(2900));
724
725 ASSERT_TRUE(mapper0.Front() != nullptr);
726 output0.emplace_back(std::move(*mapper0.Front()));
727 mapper0.PopFront();
728 EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
729
730 ASSERT_TRUE(mapper0.Front() == nullptr);
731
732 EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
733 EXPECT_TRUE(output0[0].data.Verify());
734 EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
735 EXPECT_TRUE(output0[1].data.Verify());
736 EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
737 EXPECT_TRUE(output0[2].data.Verify());
738 }
739
740 {
741 SCOPED_TRACE("Trying node1 now");
742 std::deque<TimestampedMessage> output1;
743
744 ASSERT_TRUE(mapper1.Front() != nullptr);
745 output1.emplace_back(std::move(*mapper1.Front()));
746 mapper1.PopFront();
747 EXPECT_EQ(mapper1.sorted_until(),
748 e + chrono::seconds(100) + chrono::milliseconds(1900));
749
750 ASSERT_TRUE(mapper1.Front() != nullptr);
751 output1.emplace_back(std::move(*mapper1.Front()));
752 mapper1.PopFront();
753 EXPECT_EQ(mapper1.sorted_until(),
754 e + chrono::seconds(100) + chrono::milliseconds(2900));
755
756 ASSERT_TRUE(mapper1.Front() != nullptr);
757 output1.emplace_back(std::move(*mapper1.Front()));
758 mapper1.PopFront();
759 EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
760
761 ASSERT_TRUE(mapper1.Front() == nullptr);
762
763 EXPECT_EQ(output1[0].monotonic_event_time,
764 e + chrono::seconds(100) + chrono::milliseconds(1000));
765 EXPECT_TRUE(output1[0].data.Verify());
766 EXPECT_EQ(output1[1].monotonic_event_time,
767 e + chrono::seconds(100) + chrono::milliseconds(2000));
768 EXPECT_TRUE(output1[1].data.Verify());
769 EXPECT_EQ(output1[2].monotonic_event_time,
770 e + chrono::seconds(100) + chrono::milliseconds(3000));
771 EXPECT_TRUE(output1[2].data.Verify());
772 }
773}
774
775// Tests that we can match timestamps on delivered messages. By doing this in
776// the reverse order, the second node needs to queue data up from the first node
777// to find the matching timestamp.
778TEST_F(TimestampMapperTest, ReadNode1First) {
779 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
780 {
781 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
782 writer0.QueueSpan(config0_.span());
783 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
784 writer1.QueueSpan(config2_.span());
785
786 writer0.QueueSizedFlatbuffer(
787 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
788 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
789 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
790
791 writer0.QueueSizedFlatbuffer(
792 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
793 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
794 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
795
796 writer0.QueueSizedFlatbuffer(
797 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
798 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
799 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
800 }
801
802 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
803
804 ASSERT_EQ(parts[0].logger_node, "pi1");
805 ASSERT_EQ(parts[1].logger_node, "pi2");
806
807 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
808 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
809
810 mapper0.AddPeer(&mapper1);
811 mapper1.AddPeer(&mapper0);
812
813 {
814 SCOPED_TRACE("Trying node1 now");
815 std::deque<TimestampedMessage> output1;
816
817 ASSERT_TRUE(mapper1.Front() != nullptr);
818 output1.emplace_back(std::move(*mapper1.Front()));
819 mapper1.PopFront();
820 EXPECT_EQ(mapper1.sorted_until(),
821 e + chrono::seconds(100) + chrono::milliseconds(1900));
822
823 ASSERT_TRUE(mapper1.Front() != nullptr);
824 output1.emplace_back(std::move(*mapper1.Front()));
825 mapper1.PopFront();
826 EXPECT_EQ(mapper1.sorted_until(),
827 e + chrono::seconds(100) + chrono::milliseconds(2900));
828
829 ASSERT_TRUE(mapper1.Front() != nullptr);
830 output1.emplace_back(std::move(*mapper1.Front()));
831 mapper1.PopFront();
832 EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
833
834 ASSERT_TRUE(mapper1.Front() == nullptr);
835
836 EXPECT_EQ(output1[0].monotonic_event_time,
837 e + chrono::seconds(100) + chrono::milliseconds(1000));
838 EXPECT_TRUE(output1[0].data.Verify());
839 EXPECT_EQ(output1[1].monotonic_event_time,
840 e + chrono::seconds(100) + chrono::milliseconds(2000));
841 EXPECT_TRUE(output1[1].data.Verify());
842 EXPECT_EQ(output1[2].monotonic_event_time,
843 e + chrono::seconds(100) + chrono::milliseconds(3000));
844 EXPECT_TRUE(output1[2].data.Verify());
845 }
846
847 {
848 std::deque<TimestampedMessage> output0;
849
850 ASSERT_TRUE(mapper0.Front() != nullptr);
851 output0.emplace_back(std::move(*mapper0.Front()));
852 mapper0.PopFront();
853 EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
854
855 ASSERT_TRUE(mapper0.Front() != nullptr);
856 output0.emplace_back(std::move(*mapper0.Front()));
857 mapper0.PopFront();
858 EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
859
860 ASSERT_TRUE(mapper0.Front() != nullptr);
861 output0.emplace_back(std::move(*mapper0.Front()));
862 mapper0.PopFront();
863 EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
864
865 ASSERT_TRUE(mapper0.Front() == nullptr);
866
867 EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
868 EXPECT_TRUE(output0[0].data.Verify());
869 EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
870 EXPECT_TRUE(output0[1].data.Verify());
871 EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
872 EXPECT_TRUE(output0[2].data.Verify());
873 }
874}
875
876// Tests that we return just the timestamps if we couldn't find the data and the
877// missing data was at the beginning of the file.
878TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
879 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
880 {
881 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
882 writer0.QueueSpan(config0_.span());
883 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
884 writer1.QueueSpan(config2_.span());
885
886 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
887 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
888 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
889
890 writer0.QueueSizedFlatbuffer(
891 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
892 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
893 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
894
895 writer0.QueueSizedFlatbuffer(
896 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
897 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
898 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
899 }
900
901 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
902
903 ASSERT_EQ(parts[0].logger_node, "pi1");
904 ASSERT_EQ(parts[1].logger_node, "pi2");
905
906 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
907 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
908
909 mapper0.AddPeer(&mapper1);
910 mapper1.AddPeer(&mapper0);
911
912 {
913 SCOPED_TRACE("Trying node1 now");
914 std::deque<TimestampedMessage> output1;
915
916 ASSERT_TRUE(mapper1.Front() != nullptr);
917 output1.emplace_back(std::move(*mapper1.Front()));
918 mapper1.PopFront();
919 EXPECT_EQ(mapper1.sorted_until(),
920 e + chrono::seconds(100) + chrono::milliseconds(1900));
921
922 ASSERT_TRUE(mapper1.Front() != nullptr);
923 output1.emplace_back(std::move(*mapper1.Front()));
924 mapper1.PopFront();
925 EXPECT_EQ(mapper1.sorted_until(),
926 e + chrono::seconds(100) + chrono::milliseconds(2900));
927
928 ASSERT_TRUE(mapper1.Front() != nullptr);
929 output1.emplace_back(std::move(*mapper1.Front()));
930 mapper1.PopFront();
931 EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
932
933 ASSERT_TRUE(mapper1.Front() == nullptr);
934
935 EXPECT_EQ(output1[0].monotonic_event_time,
936 e + chrono::seconds(100) + chrono::milliseconds(1000));
937 EXPECT_FALSE(output1[0].data.Verify());
938 EXPECT_EQ(output1[1].monotonic_event_time,
939 e + chrono::seconds(100) + chrono::milliseconds(2000));
940 EXPECT_TRUE(output1[1].data.Verify());
941 EXPECT_EQ(output1[2].monotonic_event_time,
942 e + chrono::seconds(100) + chrono::milliseconds(3000));
943 EXPECT_TRUE(output1[2].data.Verify());
944 }
945}
946
947// Tests that we return just the timestamps if we couldn't find the data and the
948// missing data was at the end of the file.
949TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
950 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
951 {
952 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
953 writer0.QueueSpan(config0_.span());
954 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
955 writer1.QueueSpan(config2_.span());
956
957 writer0.QueueSizedFlatbuffer(
958 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
959 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
960 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
961
962 writer0.QueueSizedFlatbuffer(
963 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
964 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
965 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
966
967 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
968 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
969 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
970 }
971
972 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
973
974 ASSERT_EQ(parts[0].logger_node, "pi1");
975 ASSERT_EQ(parts[1].logger_node, "pi2");
976
977 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
978 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
979
980 mapper0.AddPeer(&mapper1);
981 mapper1.AddPeer(&mapper0);
982
983 {
984 SCOPED_TRACE("Trying node1 now");
985 std::deque<TimestampedMessage> output1;
986
987 ASSERT_TRUE(mapper1.Front() != nullptr);
988 output1.emplace_back(std::move(*mapper1.Front()));
989 mapper1.PopFront();
990 EXPECT_EQ(mapper1.sorted_until(),
991 e + chrono::seconds(100) + chrono::milliseconds(1900));
992
993 ASSERT_TRUE(mapper1.Front() != nullptr);
994 output1.emplace_back(std::move(*mapper1.Front()));
995 mapper1.PopFront();
996 EXPECT_EQ(mapper1.sorted_until(),
997 e + chrono::seconds(100) + chrono::milliseconds(2900));
998
999 ASSERT_TRUE(mapper1.Front() != nullptr);
1000 output1.emplace_back(std::move(*mapper1.Front()));
1001 mapper1.PopFront();
1002 EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
1003
1004 ASSERT_TRUE(mapper1.Front() == nullptr);
1005
1006 EXPECT_EQ(output1[0].monotonic_event_time,
1007 e + chrono::seconds(100) + chrono::milliseconds(1000));
1008 EXPECT_TRUE(output1[0].data.Verify());
1009 EXPECT_EQ(output1[1].monotonic_event_time,
1010 e + chrono::seconds(100) + chrono::milliseconds(2000));
1011 EXPECT_TRUE(output1[1].data.Verify());
1012 EXPECT_EQ(output1[2].monotonic_event_time,
1013 e + chrono::seconds(100) + chrono::milliseconds(3000));
1014 EXPECT_FALSE(output1[2].data.Verify());
1015 }
1016}
1017
Austin Schuh993ccb52020-12-12 15:59:32 -08001018// Tests that we handle a message which failed to forward or be logged.
1019TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1020 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1021 {
1022 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1023 writer0.QueueSpan(config0_.span());
1024 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1025 writer1.QueueSpan(config2_.span());
1026
1027 writer0.QueueSizedFlatbuffer(
1028 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1029 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1030 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1031
1032 // Create both the timestamp and message, but don't log them, simulating a
1033 // forwarding drop.
1034 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1035 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1036 chrono::seconds(100));
1037
1038 writer0.QueueSizedFlatbuffer(
1039 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1040 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1041 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1042 }
1043
1044 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1045
1046 ASSERT_EQ(parts[0].logger_node, "pi1");
1047 ASSERT_EQ(parts[1].logger_node, "pi2");
1048
1049 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1050 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1051
1052 mapper0.AddPeer(&mapper1);
1053 mapper1.AddPeer(&mapper0);
1054
1055 {
1056 std::deque<TimestampedMessage> output1;
1057
1058 ASSERT_TRUE(mapper1.Front() != nullptr);
1059 output1.emplace_back(std::move(*mapper1.Front()));
1060 mapper1.PopFront();
1061
1062 ASSERT_TRUE(mapper1.Front() != nullptr);
1063 output1.emplace_back(std::move(*mapper1.Front()));
1064
1065 ASSERT_FALSE(mapper1.Front() == nullptr);
1066
1067 EXPECT_EQ(output1[0].monotonic_event_time,
1068 e + chrono::seconds(100) + chrono::milliseconds(1000));
1069 EXPECT_TRUE(output1[0].data.Verify());
1070 EXPECT_EQ(output1[1].monotonic_event_time,
1071 e + chrono::seconds(100) + chrono::milliseconds(3000));
1072 EXPECT_TRUE(output1[1].data.Verify());
1073 }
1074}
1075
Austin Schuhd2f96102020-12-01 20:27:29 -08001076// Tests that we properly sort log files with duplicate timestamps.
1077TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1078 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1079 {
1080 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1081 writer0.QueueSpan(config0_.span());
1082 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1083 writer1.QueueSpan(config2_.span());
1084
1085 writer0.QueueSizedFlatbuffer(
1086 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1087 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1088 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1089
1090 writer0.QueueSizedFlatbuffer(
1091 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1092 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1093 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1094
1095 writer0.QueueSizedFlatbuffer(
1096 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
1097 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1098 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1099
1100 writer0.QueueSizedFlatbuffer(
1101 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
1102 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1103 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1104 }
1105
1106 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1107
1108 ASSERT_EQ(parts[0].logger_node, "pi1");
1109 ASSERT_EQ(parts[1].logger_node, "pi2");
1110
1111 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1112 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1113
1114 mapper0.AddPeer(&mapper1);
1115 mapper1.AddPeer(&mapper0);
1116
1117 {
1118 SCOPED_TRACE("Trying node1 now");
1119 std::deque<TimestampedMessage> output1;
1120
1121 for (int i = 0; i < 4; ++i) {
1122 ASSERT_TRUE(mapper1.Front() != nullptr);
1123 output1.emplace_back(std::move(*mapper1.Front()));
1124 mapper1.PopFront();
1125 }
1126 ASSERT_TRUE(mapper1.Front() == nullptr);
1127
1128 EXPECT_EQ(output1[0].monotonic_event_time,
1129 e + chrono::seconds(100) + chrono::milliseconds(1000));
1130 EXPECT_TRUE(output1[0].data.Verify());
1131 EXPECT_EQ(output1[1].monotonic_event_time,
1132 e + chrono::seconds(100) + chrono::milliseconds(2000));
1133 EXPECT_TRUE(output1[1].data.Verify());
1134 EXPECT_EQ(output1[2].monotonic_event_time,
1135 e + chrono::seconds(100) + chrono::milliseconds(2000));
1136 EXPECT_TRUE(output1[2].data.Verify());
1137 EXPECT_EQ(output1[3].monotonic_event_time,
1138 e + chrono::seconds(100) + chrono::milliseconds(3000));
1139 EXPECT_TRUE(output1[3].data.Verify());
1140 }
1141}
1142
1143// Tests that we properly sort log files with duplicate timestamps.
1144TEST_F(TimestampMapperTest, StartTime) {
1145 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1146 {
1147 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1148 writer0.QueueSpan(config0_.span());
1149 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1150 writer1.QueueSpan(config1_.span());
1151 DetachedBufferWriter writer2(logfile2_, std::make_unique<DummyEncoder>());
1152 writer2.QueueSpan(config3_.span());
1153 }
1154
1155 const std::vector<LogFile> parts =
1156 SortParts({logfile0_, logfile1_, logfile2_});
1157
1158 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1159
1160 EXPECT_EQ(mapper0.monotonic_start_time(), e + chrono::milliseconds(1));
1161 EXPECT_EQ(mapper0.realtime_start_time(),
1162 realtime_clock::time_point(chrono::seconds(1000)));
1163}
1164
Austin Schuhfecf1d82020-12-19 16:57:28 -08001165// Tests that when a peer isn't registered, we treat that as if there was no
1166// data available.
1167TEST_F(TimestampMapperTest, NoPeer) {
1168 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1169 {
1170 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1171 writer0.QueueSpan(config0_.span());
1172 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1173 writer1.QueueSpan(config2_.span());
1174
1175 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
1176 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1177 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1178
1179 writer0.QueueSizedFlatbuffer(
1180 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1181 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1182 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1183
1184 writer0.QueueSizedFlatbuffer(
1185 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1186 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1187 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1188 }
1189
1190 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1191
1192 ASSERT_EQ(parts[0].logger_node, "pi1");
1193 ASSERT_EQ(parts[1].logger_node, "pi2");
1194
1195 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1196
1197 {
1198 std::deque<TimestampedMessage> output1;
1199
1200 ASSERT_TRUE(mapper1.Front() != nullptr);
1201 output1.emplace_back(std::move(*mapper1.Front()));
1202 mapper1.PopFront();
1203 ASSERT_TRUE(mapper1.Front() != nullptr);
1204 output1.emplace_back(std::move(*mapper1.Front()));
1205 mapper1.PopFront();
1206 ASSERT_TRUE(mapper1.Front() != nullptr);
1207 output1.emplace_back(std::move(*mapper1.Front()));
1208 mapper1.PopFront();
1209 ASSERT_TRUE(mapper1.Front() == nullptr);
1210
1211 EXPECT_EQ(output1[0].monotonic_event_time,
1212 e + chrono::seconds(100) + chrono::milliseconds(1000));
1213 EXPECT_FALSE(output1[0].data.Verify());
1214 EXPECT_EQ(output1[1].monotonic_event_time,
1215 e + chrono::seconds(100) + chrono::milliseconds(2000));
1216 EXPECT_FALSE(output1[1].data.Verify());
1217 EXPECT_EQ(output1[2].monotonic_event_time,
1218 e + chrono::seconds(100) + chrono::milliseconds(3000));
1219 EXPECT_FALSE(output1[2].data.Verify());
1220 }
1221}
1222
Austin Schuhc243b422020-10-11 15:35:08 -07001223} // namespace testing
1224} // namespace logger
1225} // namespace aos