blob: d024ae0d7a553e1d66ed844918850258e3786278 [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
4#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07005
Austin Schuhc41603c2020-10-11 16:17:37 -07006#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -07007#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -08008#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -07009#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070010#include "aos/json_to_flatbuffer.h"
11#include "aos/testing/tmpdir.h"
Brian Smarttea913d42021-12-10 15:02:38 -080012#include "gflags/gflags.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070013#include "gtest/gtest.h"
Austin Schuhc243b422020-10-11 15:35:08 -070014
15namespace aos {
16namespace logger {
17namespace testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070018namespace chrono = std::chrono;
Austin Schuhc243b422020-10-11 15:35:08 -070019
Austin Schuhe243aaf2020-10-11 15:46:02 -070020// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070021template <typename T>
22SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
23 const std::string_view data) {
24 flatbuffers::FlatBufferBuilder fbb;
25 fbb.ForceDefaults(true);
26 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
27 return fbb.Release();
28}
29
Austin Schuhe243aaf2020-10-11 15:46:02 -070030// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070031TEST(SpanReaderTest, ReadWrite) {
32 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
33 unlink(logfile.c_str());
34
35 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080036 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070037 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080038 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070039
40 {
41 DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -080042 writer.QueueSpan(m1.span());
43 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070044 }
45
46 SpanReader reader(logfile);
47
48 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhcf5f6442021-07-06 10:43:28 -070049 EXPECT_EQ(reader.PeekMessage(), m1.span());
50 EXPECT_EQ(reader.PeekMessage(), m1.span());
Austin Schuhadd6eb32020-11-09 21:24:26 -080051 EXPECT_EQ(reader.ReadMessage(), m1.span());
52 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhcf5f6442021-07-06 10:43:28 -070053 EXPECT_EQ(reader.PeekMessage(), absl::Span<const uint8_t>());
Austin Schuhc243b422020-10-11 15:35:08 -070054 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
55}
56
Austin Schuhe243aaf2020-10-11 15:46:02 -070057// Tests that we can actually parse the resulting messages at a basic level
58// through MessageReader.
59TEST(MessageReaderTest, ReadWrite) {
60 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
61 unlink(logfile.c_str());
62
63 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
64 JsonToSizedFlatbuffer<LogFileHeader>(
65 R"({ "max_out_of_order_duration": 100000000 })");
66 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
67 JsonToSizedFlatbuffer<MessageHeader>(
68 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
69 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
70 JsonToSizedFlatbuffer<MessageHeader>(
71 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
72
73 {
74 DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -080075 writer.QueueSpan(config.span());
76 writer.QueueSpan(m1.span());
77 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -070078 }
79
80 MessageReader reader(logfile);
81
82 EXPECT_EQ(reader.filename(), logfile);
83
84 EXPECT_EQ(
85 reader.max_out_of_order_duration(),
86 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
87 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
88 EXPECT_TRUE(reader.ReadMessage());
89 EXPECT_EQ(reader.newest_timestamp(),
90 monotonic_clock::time_point(chrono::nanoseconds(1)));
91 EXPECT_TRUE(reader.ReadMessage());
92 EXPECT_EQ(reader.newest_timestamp(),
93 monotonic_clock::time_point(chrono::nanoseconds(2)));
94 EXPECT_FALSE(reader.ReadMessage());
95}
96
Austin Schuh32f68492020-11-08 21:45:51 -080097// Tests that we explode when messages are too far out of order.
98TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
99 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
100 unlink(logfile0.c_str());
101
102 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
103 JsonToSizedFlatbuffer<LogFileHeader>(
104 R"({
105 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800106 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800107 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
108 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
109 "parts_index": 0
110})");
111
112 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
113 JsonToSizedFlatbuffer<MessageHeader>(
114 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
115 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
116 JsonToSizedFlatbuffer<MessageHeader>(
117 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
118 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
119 JsonToSizedFlatbuffer<MessageHeader>(
120 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
121
122 {
123 DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800124 writer.QueueSpan(config0.span());
125 writer.QueueSpan(m1.span());
126 writer.QueueSpan(m2.span());
127 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800128 }
129
130 const std::vector<LogFile> parts = SortParts({logfile0});
131
132 PartsMessageReader reader(parts[0].parts[0]);
133
134 EXPECT_TRUE(reader.ReadMessage());
135 EXPECT_TRUE(reader.ReadMessage());
136 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
137}
138
Austin Schuhc41603c2020-10-11 16:17:37 -0700139// Tests that we can transparently re-assemble part files with a
140// PartsMessageReader.
141TEST(PartsMessageReaderTest, ReadWrite) {
142 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
143 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
144 unlink(logfile0.c_str());
145 unlink(logfile1.c_str());
146
147 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
148 JsonToSizedFlatbuffer<LogFileHeader>(
149 R"({
150 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800151 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700152 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
153 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
154 "parts_index": 0
155})");
156 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
157 JsonToSizedFlatbuffer<LogFileHeader>(
158 R"({
159 "max_out_of_order_duration": 200000000,
160 "monotonic_start_time": 0,
161 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800162 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700163 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
164 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
165 "parts_index": 1
166})");
167
168 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
169 JsonToSizedFlatbuffer<MessageHeader>(
170 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
171 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
172 JsonToSizedFlatbuffer<MessageHeader>(
173 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
174
175 {
176 DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800177 writer.QueueSpan(config0.span());
178 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700179 }
180 {
181 DetachedBufferWriter writer(logfile1, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800182 writer.QueueSpan(config1.span());
183 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700184 }
185
186 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
187
188 PartsMessageReader reader(parts[0].parts[0]);
189
190 EXPECT_EQ(reader.filename(), logfile0);
191
192 // Confirm that the timestamps track, and the filename also updates.
193 // Read the first message.
194 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
195 EXPECT_EQ(
196 reader.max_out_of_order_duration(),
197 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
198 EXPECT_TRUE(reader.ReadMessage());
199 EXPECT_EQ(reader.filename(), logfile0);
200 EXPECT_EQ(reader.newest_timestamp(),
201 monotonic_clock::time_point(chrono::nanoseconds(1)));
202 EXPECT_EQ(
203 reader.max_out_of_order_duration(),
204 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
205
206 // Read the second message.
207 EXPECT_TRUE(reader.ReadMessage());
208 EXPECT_EQ(reader.filename(), logfile1);
209 EXPECT_EQ(reader.newest_timestamp(),
210 monotonic_clock::time_point(chrono::nanoseconds(2)));
211 EXPECT_EQ(
212 reader.max_out_of_order_duration(),
213 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
214
215 // And then confirm that reading again returns no message.
216 EXPECT_FALSE(reader.ReadMessage());
217 EXPECT_EQ(reader.filename(), logfile1);
218 EXPECT_EQ(
219 reader.max_out_of_order_duration(),
220 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800221 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Austin Schuhc41603c2020-10-11 16:17:37 -0700222}
Austin Schuh32f68492020-11-08 21:45:51 -0800223
Austin Schuh1be0ce42020-11-29 22:43:26 -0800224// Tests that Message's operator < works as expected.
225TEST(MessageTest, Sorting) {
226 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
227
228 Message m1{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700229 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700230 .timestamp =
231 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
Austin Schuh48507722021-07-17 17:29:24 -0700232 .monotonic_remote_boot = 0xffffff,
233 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700234 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800235 Message m2{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700236 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700237 .timestamp =
238 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
Austin Schuh48507722021-07-17 17:29:24 -0700239 .monotonic_remote_boot = 0xffffff,
240 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700241 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800242
243 EXPECT_LT(m1, m2);
244 EXPECT_GE(m2, m1);
245
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700246 m1.timestamp.time = e;
247 m2.timestamp.time = e;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800248
249 m1.channel_index = 1;
250 m2.channel_index = 2;
251
252 EXPECT_LT(m1, m2);
253 EXPECT_GE(m2, m1);
254
255 m1.channel_index = 0;
256 m2.channel_index = 0;
Austin Schuh58646e22021-08-23 23:51:46 -0700257 m1.queue_index.index = 0u;
258 m2.queue_index.index = 1u;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800259
260 EXPECT_LT(m1, m2);
261 EXPECT_GE(m2, m1);
262}
263
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800264aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
265 const aos::FlatbufferDetachedBuffer<Configuration> &config,
266 const std::string_view json) {
267 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh1034a832021-03-31 21:53:26 -0700268 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800269 flatbuffers::Offset<Configuration> config_offset =
270 aos::CopyFlatBuffer(config, &fbb);
271 LogFileHeader::Builder header_builder(fbb);
272 header_builder.add_configuration(config_offset);
273 fbb.Finish(header_builder.Finish());
274 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
275
276 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
277 JsonToFlatbuffer<LogFileHeader>(json));
278 CHECK(header_updates.Verify());
279 flatbuffers::FlatBufferBuilder fbb2;
Austin Schuh1034a832021-03-31 21:53:26 -0700280 fbb2.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800281 fbb2.FinishSizePrefixed(
282 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
283 return fbb2.Release();
284}
285
286class SortingElementTest : public ::testing::Test {
287 public:
288 SortingElementTest()
289 : config_(JsonToFlatbuffer<Configuration>(
290 R"({
291 "channels": [
292 {
293 "name": "/a",
294 "type": "aos.logger.testing.TestMessage",
295 "source_node": "pi1",
296 "destination_nodes": [
297 {
298 "name": "pi2"
299 },
300 {
301 "name": "pi3"
302 }
303 ]
304 },
305 {
306 "name": "/b",
307 "type": "aos.logger.testing.TestMessage",
308 "source_node": "pi1"
309 },
310 {
311 "name": "/c",
312 "type": "aos.logger.testing.TestMessage",
313 "source_node": "pi1"
Austin Schuh48507722021-07-17 17:29:24 -0700314 },
315 {
316 "name": "/d",
317 "type": "aos.logger.testing.TestMessage",
318 "source_node": "pi2",
319 "destination_nodes": [
320 {
321 "name": "pi1"
322 }
323 ]
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800324 }
325 ],
326 "nodes": [
327 {
328 "name": "pi1"
329 },
330 {
331 "name": "pi2"
332 },
333 {
334 "name": "pi3"
335 }
336 ]
337}
338)")),
339 config0_(MakeHeader(config_, R"({
340 /* 100ms */
341 "max_out_of_order_duration": 100000000,
342 "node": {
343 "name": "pi1"
344 },
345 "logger_node": {
346 "name": "pi1"
347 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800348 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800349 "realtime_start_time": 1000000000000,
350 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700351 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
352 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
353 "boot_uuids": [
354 "1d782c63-b3c7-466e-bea9-a01308b43333",
355 "",
356 ""
357 ],
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800358 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
359 "parts_index": 0
360})")),
361 config1_(MakeHeader(config_,
362 R"({
363 /* 100ms */
364 "max_out_of_order_duration": 100000000,
365 "node": {
366 "name": "pi1"
367 },
368 "logger_node": {
369 "name": "pi1"
370 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800371 "monotonic_start_time": 1000000,
372 "realtime_start_time": 1000000000000,
373 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700374 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
375 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
376 "boot_uuids": [
377 "1d782c63-b3c7-466e-bea9-a01308b43333",
378 "",
379 ""
380 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800381 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
382 "parts_index": 0
383})")),
384 config2_(MakeHeader(config_,
385 R"({
386 /* 100ms */
387 "max_out_of_order_duration": 100000000,
388 "node": {
389 "name": "pi2"
390 },
391 "logger_node": {
392 "name": "pi2"
393 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800394 "monotonic_start_time": 0,
395 "realtime_start_time": 1000000000000,
Austin Schuh48507722021-07-17 17:29:24 -0700396 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
397 "logger_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
398 "boot_uuids": [
399 "",
400 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
401 ""
402 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800403 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
404 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
405 "parts_index": 0
406})")),
407 config3_(MakeHeader(config_,
408 R"({
409 /* 100ms */
410 "max_out_of_order_duration": 100000000,
411 "node": {
412 "name": "pi1"
413 },
414 "logger_node": {
415 "name": "pi1"
416 },
417 "monotonic_start_time": 2000000,
418 "realtime_start_time": 1000000000,
419 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
Austin Schuh48507722021-07-17 17:29:24 -0700420 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
421 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
422 "boot_uuids": [
423 "1d782c63-b3c7-466e-bea9-a01308b43333",
424 "",
425 ""
426 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800427 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800428 "parts_index": 0
Austin Schuh8bf1e632021-01-02 22:41:04 -0800429})")),
430 config4_(MakeHeader(config_,
431 R"({
432 /* 100ms */
433 "max_out_of_order_duration": 100000000,
434 "node": {
435 "name": "pi2"
436 },
437 "logger_node": {
438 "name": "pi1"
439 },
440 "monotonic_start_time": 2000000,
441 "realtime_start_time": 1000000000,
442 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
443 "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
Austin Schuh48507722021-07-17 17:29:24 -0700444 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
445 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
446 "boot_uuids": [
447 "1d782c63-b3c7-466e-bea9-a01308b43333",
448 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
449 ""
450 ],
Austin Schuh8bf1e632021-01-02 22:41:04 -0800451 "parts_index": 0
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800452})")) {
453 unlink(logfile0_.c_str());
454 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800455 unlink(logfile2_.c_str());
Austin Schuh48507722021-07-17 17:29:24 -0700456 unlink(logfile3_.c_str());
Austin Schuh921b2562021-07-31 19:37:03 -0700457 queue_index_.resize(config_.message().channels()->size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800458 }
459
460 protected:
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800461 flatbuffers::DetachedBuffer MakeLogMessage(
462 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
463 int value) {
464 flatbuffers::FlatBufferBuilder message_fbb;
465 message_fbb.ForceDefaults(true);
466 TestMessage::Builder test_message_builder(message_fbb);
467 test_message_builder.add_value(value);
468 message_fbb.Finish(test_message_builder.Finish());
469
470 aos::Context context;
471 context.monotonic_event_time = monotonic_now;
472 context.realtime_event_time = aos::realtime_clock::epoch() +
473 chrono::seconds(1000) +
474 monotonic_now.time_since_epoch();
Austin Schuh921b2562021-07-31 19:37:03 -0700475 CHECK_LT(static_cast<size_t>(channel_index), queue_index_.size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800476 context.queue_index = queue_index_[channel_index];
477 context.size = message_fbb.GetSize();
478 context.data = message_fbb.GetBufferPointer();
479
480 ++queue_index_[channel_index];
481
482 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh53e3a812021-11-03 16:57:23 -0700483 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800484 fbb.FinishSizePrefixed(
485 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
486
487 return fbb.Release();
488 }
489
490 flatbuffers::DetachedBuffer MakeTimestampMessage(
491 const aos::monotonic_clock::time_point sender_monotonic_now,
Austin Schuh8bf1e632021-01-02 22:41:04 -0800492 int channel_index, chrono::nanoseconds receiver_monotonic_offset,
493 monotonic_clock::time_point monotonic_timestamp_time =
494 monotonic_clock::min_time) {
495 const monotonic_clock::time_point monotonic_sent_time =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800496 sender_monotonic_now + receiver_monotonic_offset;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800497
498 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800499 fbb.ForceDefaults(true);
500
501 logger::MessageHeader::Builder message_header_builder(fbb);
502
503 message_header_builder.add_channel_index(channel_index);
504
505 message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
506 100);
507 message_header_builder.add_monotonic_sent_time(
508 monotonic_sent_time.time_since_epoch().count());
509 message_header_builder.add_realtime_sent_time(
510 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
511 monotonic_sent_time.time_since_epoch())
512 .time_since_epoch()
513 .count());
514
515 message_header_builder.add_monotonic_remote_time(
516 sender_monotonic_now.time_since_epoch().count());
517 message_header_builder.add_realtime_remote_time(
518 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
519 sender_monotonic_now.time_since_epoch())
520 .time_since_epoch()
521 .count());
522 message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
523 1);
524
525 if (monotonic_timestamp_time != monotonic_clock::min_time) {
526 message_header_builder.add_monotonic_timestamp_time(
527 monotonic_timestamp_time.time_since_epoch().count());
528 }
529
530 fbb.FinishSizePrefixed(message_header_builder.Finish());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800531 LOG(INFO) << aos::FlatbufferToJson(
532 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
533 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
534
535 return fbb.Release();
536 }
537
538 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
539 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800540 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh48507722021-07-17 17:29:24 -0700541 const std::string logfile3_ = aos::testing::TestTmpDir() + "/log3.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800542
543 const aos::FlatbufferDetachedBuffer<Configuration> config_;
544 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
545 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800546 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
547 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800548 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800549
550 std::vector<uint32_t> queue_index_;
551};
552
553using LogPartsSorterTest = SortingElementTest;
554using LogPartsSorterDeathTest = LogPartsSorterTest;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800555using NodeMergerTest = SortingElementTest;
Austin Schuhd2f96102020-12-01 20:27:29 -0800556using TimestampMapperTest = SortingElementTest;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800557
558// Tests that we can pull messages out of a log sorted in order.
559TEST_F(LogPartsSorterTest, Pull) {
560 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
561 {
562 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
563 writer.QueueSpan(config0_.span());
564 writer.QueueSizedFlatbuffer(
565 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
566 writer.QueueSizedFlatbuffer(
567 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
568 writer.QueueSizedFlatbuffer(
569 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
570 writer.QueueSizedFlatbuffer(
571 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
572 }
573
574 const std::vector<LogFile> parts = SortParts({logfile0_});
575
576 LogPartsSorter parts_sorter(parts[0].parts[0]);
577
578 // Confirm we aren't sorted until any time until the message is popped.
579 // Peeking shouldn't change the sorted until time.
580 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
581
582 std::deque<Message> output;
583
584 ASSERT_TRUE(parts_sorter.Front() != nullptr);
585 output.emplace_back(std::move(*parts_sorter.Front()));
586 parts_sorter.PopFront();
587 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
588
589 ASSERT_TRUE(parts_sorter.Front() != nullptr);
590 output.emplace_back(std::move(*parts_sorter.Front()));
591 parts_sorter.PopFront();
592 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
593
594 ASSERT_TRUE(parts_sorter.Front() != nullptr);
595 output.emplace_back(std::move(*parts_sorter.Front()));
596 parts_sorter.PopFront();
597 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
598
599 ASSERT_TRUE(parts_sorter.Front() != nullptr);
600 output.emplace_back(std::move(*parts_sorter.Front()));
601 parts_sorter.PopFront();
602 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
603
604 ASSERT_TRUE(parts_sorter.Front() == nullptr);
605
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700606 EXPECT_EQ(output[0].timestamp.boot, 0);
607 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
608 EXPECT_EQ(output[1].timestamp.boot, 0);
609 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1000));
610 EXPECT_EQ(output[2].timestamp.boot, 0);
611 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1901));
612 EXPECT_EQ(output[3].timestamp.boot, 0);
613 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800614}
615
Austin Schuhb000de62020-12-03 22:00:40 -0800616// Tests that we can pull messages out of a log sorted in order.
617TEST_F(LogPartsSorterTest, WayBeforeStart) {
618 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
619 {
620 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
621 writer.QueueSpan(config0_.span());
622 writer.QueueSizedFlatbuffer(
623 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
624 writer.QueueSizedFlatbuffer(
625 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
626 writer.QueueSizedFlatbuffer(
627 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
628 writer.QueueSizedFlatbuffer(
629 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
630 writer.QueueSizedFlatbuffer(
631 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
632 }
633
634 const std::vector<LogFile> parts = SortParts({logfile0_});
635
636 LogPartsSorter parts_sorter(parts[0].parts[0]);
637
638 // Confirm we aren't sorted until any time until the message is popped.
639 // Peeking shouldn't change the sorted until time.
640 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
641
642 std::deque<Message> output;
643
644 for (monotonic_clock::time_point t :
645 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
646 e + chrono::milliseconds(1900), monotonic_clock::max_time,
647 monotonic_clock::max_time}) {
648 ASSERT_TRUE(parts_sorter.Front() != nullptr);
649 output.emplace_back(std::move(*parts_sorter.Front()));
650 parts_sorter.PopFront();
651 EXPECT_EQ(parts_sorter.sorted_until(), t);
652 }
653
654 ASSERT_TRUE(parts_sorter.Front() == nullptr);
655
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700656 EXPECT_EQ(output[0].timestamp.boot, 0u);
657 EXPECT_EQ(output[0].timestamp.time, e - chrono::milliseconds(1000));
658 EXPECT_EQ(output[1].timestamp.boot, 0u);
659 EXPECT_EQ(output[1].timestamp.time, e - chrono::milliseconds(500));
660 EXPECT_EQ(output[2].timestamp.boot, 0u);
661 EXPECT_EQ(output[2].timestamp.time, e - chrono::milliseconds(10));
662 EXPECT_EQ(output[3].timestamp.boot, 0u);
663 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(1901));
664 EXPECT_EQ(output[4].timestamp.boot, 0u);
665 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuhb000de62020-12-03 22:00:40 -0800666}
667
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800668// Tests that messages too far out of order trigger death.
669TEST_F(LogPartsSorterDeathTest, Pull) {
670 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
671 {
672 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
673 writer.QueueSpan(config0_.span());
674 writer.QueueSizedFlatbuffer(
675 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
676 writer.QueueSizedFlatbuffer(
677 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
678 writer.QueueSizedFlatbuffer(
679 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
680 // The following message is too far out of order and will trigger the CHECK.
681 writer.QueueSizedFlatbuffer(
682 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
683 }
684
685 const std::vector<LogFile> parts = SortParts({logfile0_});
686
687 LogPartsSorter parts_sorter(parts[0].parts[0]);
688
689 // Confirm we aren't sorted until any time until the message is popped.
690 // Peeking shouldn't change the sorted until time.
691 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
692 std::deque<Message> output;
693
694 ASSERT_TRUE(parts_sorter.Front() != nullptr);
695 parts_sorter.PopFront();
696 ASSERT_TRUE(parts_sorter.Front() != nullptr);
697 ASSERT_TRUE(parts_sorter.Front() != nullptr);
698 parts_sorter.PopFront();
699
Austin Schuh58646e22021-08-23 23:51:46 -0700700 EXPECT_DEATH({ parts_sorter.Front(); },
701 "Max out of order of 100000000ns exceeded.");
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800702}
703
Austin Schuh8f52ed52020-11-30 23:12:39 -0800704// Tests that we can merge data from 2 separate files, including duplicate data.
705TEST_F(NodeMergerTest, TwoFileMerger) {
706 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
707 {
708 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
709 writer0.QueueSpan(config0_.span());
710 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
711 writer1.QueueSpan(config1_.span());
712
713 writer0.QueueSizedFlatbuffer(
714 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
715 writer1.QueueSizedFlatbuffer(
716 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
717
718 writer0.QueueSizedFlatbuffer(
719 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
720 writer1.QueueSizedFlatbuffer(
721 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
722
723 // Make a duplicate!
724 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
725 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
726 writer0.QueueSpan(msg.span());
727 writer1.QueueSpan(msg.span());
728
729 writer1.QueueSizedFlatbuffer(
730 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
731 }
732
733 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Austin Schuhd2f96102020-12-01 20:27:29 -0800734 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800735
Austin Schuhd2f96102020-12-01 20:27:29 -0800736 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800737
738 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
739
740 std::deque<Message> output;
741
742 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
743 ASSERT_TRUE(merger.Front() != nullptr);
744 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
745
746 output.emplace_back(std::move(*merger.Front()));
747 merger.PopFront();
748 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
749
750 ASSERT_TRUE(merger.Front() != nullptr);
751 output.emplace_back(std::move(*merger.Front()));
752 merger.PopFront();
753 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
754
755 ASSERT_TRUE(merger.Front() != nullptr);
756 output.emplace_back(std::move(*merger.Front()));
757 merger.PopFront();
758 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
759
760 ASSERT_TRUE(merger.Front() != nullptr);
761 output.emplace_back(std::move(*merger.Front()));
762 merger.PopFront();
763 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
764
765 ASSERT_TRUE(merger.Front() != nullptr);
766 output.emplace_back(std::move(*merger.Front()));
767 merger.PopFront();
768 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
769
770 ASSERT_TRUE(merger.Front() != nullptr);
771 output.emplace_back(std::move(*merger.Front()));
772 merger.PopFront();
773 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
774
775 ASSERT_TRUE(merger.Front() == nullptr);
776
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700777 EXPECT_EQ(output[0].timestamp.boot, 0u);
778 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
779 EXPECT_EQ(output[1].timestamp.boot, 0u);
780 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1001));
781 EXPECT_EQ(output[2].timestamp.boot, 0u);
782 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1002));
783 EXPECT_EQ(output[3].timestamp.boot, 0u);
784 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
785 EXPECT_EQ(output[4].timestamp.boot, 0u);
786 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(3000));
787 EXPECT_EQ(output[5].timestamp.boot, 0u);
788 EXPECT_EQ(output[5].timestamp.time, e + chrono::milliseconds(3002));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800789}
790
Austin Schuh8bf1e632021-01-02 22:41:04 -0800791// Tests that we can merge timestamps with various combinations of
792// monotonic_timestamp_time.
793TEST_F(NodeMergerTest, TwoFileTimestampMerger) {
794 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
795 {
796 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
797 writer0.QueueSpan(config0_.span());
798 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
799 writer1.QueueSpan(config1_.span());
800
801 // Neither has it.
802 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
803 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
804 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
805 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
806 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
807
808 // First only has it.
809 MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
810 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
811 e + chrono::milliseconds(1001), 0, chrono::seconds(100),
812 e + chrono::nanoseconds(971)));
813 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
814 e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
815
816 // Second only has it.
817 MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
818 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
819 e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
820 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
821 e + chrono::milliseconds(1002), 0, chrono::seconds(100),
822 e + chrono::nanoseconds(972)));
823
824 // Both have it.
825 MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
826 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
827 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
828 e + chrono::nanoseconds(973)));
829 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
830 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
831 e + chrono::nanoseconds(973)));
832 }
833
834 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
835 ASSERT_EQ(parts.size(), 1u);
836
837 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
838
839 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
840
841 std::deque<Message> output;
842
843 for (int i = 0; i < 4; ++i) {
844 ASSERT_TRUE(merger.Front() != nullptr);
845 output.emplace_back(std::move(*merger.Front()));
846 merger.PopFront();
847 }
848 ASSERT_TRUE(merger.Front() == nullptr);
849
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700850 EXPECT_EQ(output[0].timestamp.boot, 0u);
851 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700852 EXPECT_FALSE(output[0].data->has_monotonic_timestamp_time);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700853
854 EXPECT_EQ(output[1].timestamp.boot, 0u);
855 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700856 EXPECT_TRUE(output[1].data->has_monotonic_timestamp_time);
857 EXPECT_EQ(output[1].data->monotonic_timestamp_time,
858 monotonic_clock::time_point(std::chrono::nanoseconds(971)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700859
860 EXPECT_EQ(output[2].timestamp.boot, 0u);
861 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700862 EXPECT_TRUE(output[2].data->has_monotonic_timestamp_time);
863 EXPECT_EQ(output[2].data->monotonic_timestamp_time,
864 monotonic_clock::time_point(std::chrono::nanoseconds(972)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700865
866 EXPECT_EQ(output[3].timestamp.boot, 0u);
867 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700868 EXPECT_TRUE(output[3].data->has_monotonic_timestamp_time);
869 EXPECT_EQ(output[3].data->monotonic_timestamp_time,
870 monotonic_clock::time_point(std::chrono::nanoseconds(973)));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800871}
872
Austin Schuhd2f96102020-12-01 20:27:29 -0800873// Tests that we can match timestamps on delivered messages.
874TEST_F(TimestampMapperTest, ReadNode0First) {
875 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
876 {
877 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
878 writer0.QueueSpan(config0_.span());
879 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
880 writer1.QueueSpan(config2_.span());
881
882 writer0.QueueSizedFlatbuffer(
883 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
884 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
885 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
886
887 writer0.QueueSizedFlatbuffer(
888 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
889 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
890 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
891
892 writer0.QueueSizedFlatbuffer(
893 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
894 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
895 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
896 }
897
898 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
899
900 ASSERT_EQ(parts[0].logger_node, "pi1");
901 ASSERT_EQ(parts[1].logger_node, "pi2");
902
Austin Schuh79b30942021-01-24 22:32:21 -0800903 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800904 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -0800905 mapper0.set_timestamp_callback(
906 [&](TimestampedMessage *) { ++mapper0_count; });
907 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800908 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -0800909 mapper1.set_timestamp_callback(
910 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -0800911
912 mapper0.AddPeer(&mapper1);
913 mapper1.AddPeer(&mapper0);
914
915 {
916 std::deque<TimestampedMessage> output0;
917
Austin Schuh79b30942021-01-24 22:32:21 -0800918 EXPECT_EQ(mapper0_count, 0u);
919 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800920 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800921 EXPECT_EQ(mapper0_count, 1u);
922 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800923 output0.emplace_back(std::move(*mapper0.Front()));
924 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700925 EXPECT_TRUE(mapper0.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800926 EXPECT_EQ(mapper0_count, 1u);
927 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800928
929 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800930 EXPECT_EQ(mapper0_count, 2u);
931 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800932 output0.emplace_back(std::move(*mapper0.Front()));
933 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700934 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800935
936 ASSERT_TRUE(mapper0.Front() != nullptr);
937 output0.emplace_back(std::move(*mapper0.Front()));
938 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700939 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800940
Austin Schuh79b30942021-01-24 22:32:21 -0800941 EXPECT_EQ(mapper0_count, 3u);
942 EXPECT_EQ(mapper1_count, 0u);
943
Austin Schuhd2f96102020-12-01 20:27:29 -0800944 ASSERT_TRUE(mapper0.Front() == nullptr);
945
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700946 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
947 EXPECT_EQ(output0[0].monotonic_event_time.time,
948 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700949 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700950
951 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
952 EXPECT_EQ(output0[1].monotonic_event_time.time,
953 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700954 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700955
956 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
957 EXPECT_EQ(output0[2].monotonic_event_time.time,
958 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700959 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -0800960 }
961
962 {
963 SCOPED_TRACE("Trying node1 now");
964 std::deque<TimestampedMessage> output1;
965
Austin Schuh79b30942021-01-24 22:32:21 -0800966 EXPECT_EQ(mapper0_count, 3u);
967 EXPECT_EQ(mapper1_count, 0u);
968
Austin Schuhd2f96102020-12-01 20:27:29 -0800969 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800970 EXPECT_EQ(mapper0_count, 3u);
971 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800972 output1.emplace_back(std::move(*mapper1.Front()));
973 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700974 EXPECT_TRUE(mapper1.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800975 EXPECT_EQ(mapper0_count, 3u);
976 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800977
978 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800979 EXPECT_EQ(mapper0_count, 3u);
980 EXPECT_EQ(mapper1_count, 2u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800981 output1.emplace_back(std::move(*mapper1.Front()));
982 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700983 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800984
985 ASSERT_TRUE(mapper1.Front() != nullptr);
986 output1.emplace_back(std::move(*mapper1.Front()));
987 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700988 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800989
Austin Schuh79b30942021-01-24 22:32:21 -0800990 EXPECT_EQ(mapper0_count, 3u);
991 EXPECT_EQ(mapper1_count, 3u);
992
Austin Schuhd2f96102020-12-01 20:27:29 -0800993 ASSERT_TRUE(mapper1.Front() == nullptr);
994
Austin Schuh79b30942021-01-24 22:32:21 -0800995 EXPECT_EQ(mapper0_count, 3u);
996 EXPECT_EQ(mapper1_count, 3u);
997
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700998 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
999 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001000 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001001 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001002
1003 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1004 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001005 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001006 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001007
1008 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1009 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001010 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001011 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001012 }
1013}
1014
Austin Schuh8bf1e632021-01-02 22:41:04 -08001015// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
1016// returned.
1017TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
1018 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1019 {
1020 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1021 writer0.QueueSpan(config0_.span());
1022 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1023 writer1.QueueSpan(config4_.span());
1024
1025 writer0.QueueSizedFlatbuffer(
1026 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1027 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1028 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
1029 e + chrono::nanoseconds(971)));
1030
1031 writer0.QueueSizedFlatbuffer(
1032 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1033 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1034 e + chrono::milliseconds(2000), 0, chrono::seconds(100),
1035 e + chrono::nanoseconds(5458)));
1036
1037 writer0.QueueSizedFlatbuffer(
1038 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1039 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1040 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1041 }
1042
1043 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1044
1045 for (const auto &p : parts) {
1046 LOG(INFO) << p;
1047 }
1048
1049 ASSERT_EQ(parts.size(), 1u);
1050
Austin Schuh79b30942021-01-24 22:32:21 -08001051 size_t mapper0_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001052 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001053 mapper0.set_timestamp_callback(
1054 [&](TimestampedMessage *) { ++mapper0_count; });
1055 size_t mapper1_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001056 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001057 mapper1.set_timestamp_callback(
1058 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh8bf1e632021-01-02 22:41:04 -08001059
1060 mapper0.AddPeer(&mapper1);
1061 mapper1.AddPeer(&mapper0);
1062
1063 {
1064 std::deque<TimestampedMessage> output0;
1065
1066 for (int i = 0; i < 3; ++i) {
1067 ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
1068 output0.emplace_back(std::move(*mapper0.Front()));
1069 mapper0.PopFront();
1070 }
1071
1072 ASSERT_TRUE(mapper0.Front() == nullptr);
1073
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001074 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1075 EXPECT_EQ(output0[0].monotonic_event_time.time,
1076 e + chrono::milliseconds(1000));
1077 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
1078 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
1079 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001080 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001081
1082 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1083 EXPECT_EQ(output0[1].monotonic_event_time.time,
1084 e + chrono::milliseconds(2000));
1085 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
1086 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
1087 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001088 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001089
1090 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1091 EXPECT_EQ(output0[2].monotonic_event_time.time,
1092 e + chrono::milliseconds(3000));
1093 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
1094 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
1095 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001096 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001097 }
1098
1099 {
1100 SCOPED_TRACE("Trying node1 now");
1101 std::deque<TimestampedMessage> output1;
1102
1103 for (int i = 0; i < 3; ++i) {
1104 ASSERT_TRUE(mapper1.Front() != nullptr);
1105 output1.emplace_back(std::move(*mapper1.Front()));
1106 mapper1.PopFront();
1107 }
1108
1109 ASSERT_TRUE(mapper1.Front() == nullptr);
1110
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001111 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1112 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001113 e + chrono::seconds(100) + chrono::milliseconds(1000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001114 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
1115 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001116 e + chrono::nanoseconds(971));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001117 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001118
1119 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1120 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001121 e + chrono::seconds(100) + chrono::milliseconds(2000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001122 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
1123 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001124 e + chrono::nanoseconds(5458));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001125 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001126
1127 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1128 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001129 e + chrono::seconds(100) + chrono::milliseconds(3000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001130 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
1131 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
1132 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001133 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001134 }
Austin Schuh79b30942021-01-24 22:32:21 -08001135
1136 EXPECT_EQ(mapper0_count, 3u);
1137 EXPECT_EQ(mapper1_count, 3u);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001138}
1139
Austin Schuhd2f96102020-12-01 20:27:29 -08001140// Tests that we can match timestamps on delivered messages. By doing this in
1141// the reverse order, the second node needs to queue data up from the first node
1142// to find the matching timestamp.
1143TEST_F(TimestampMapperTest, ReadNode1First) {
1144 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1145 {
1146 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1147 writer0.QueueSpan(config0_.span());
1148 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1149 writer1.QueueSpan(config2_.span());
1150
1151 writer0.QueueSizedFlatbuffer(
1152 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1153 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1154 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1155
1156 writer0.QueueSizedFlatbuffer(
1157 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1158 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1159 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1160
1161 writer0.QueueSizedFlatbuffer(
1162 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1163 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1164 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1165 }
1166
1167 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1168
1169 ASSERT_EQ(parts[0].logger_node, "pi1");
1170 ASSERT_EQ(parts[1].logger_node, "pi2");
1171
Austin Schuh79b30942021-01-24 22:32:21 -08001172 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001173 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001174 mapper0.set_timestamp_callback(
1175 [&](TimestampedMessage *) { ++mapper0_count; });
1176 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001177 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001178 mapper1.set_timestamp_callback(
1179 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001180
1181 mapper0.AddPeer(&mapper1);
1182 mapper1.AddPeer(&mapper0);
1183
1184 {
1185 SCOPED_TRACE("Trying node1 now");
1186 std::deque<TimestampedMessage> output1;
1187
1188 ASSERT_TRUE(mapper1.Front() != nullptr);
1189 output1.emplace_back(std::move(*mapper1.Front()));
1190 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001191 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001192
1193 ASSERT_TRUE(mapper1.Front() != nullptr);
1194 output1.emplace_back(std::move(*mapper1.Front()));
1195 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001196 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001197
1198 ASSERT_TRUE(mapper1.Front() != nullptr);
1199 output1.emplace_back(std::move(*mapper1.Front()));
1200 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001201 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001202
1203 ASSERT_TRUE(mapper1.Front() == nullptr);
1204
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001205 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1206 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001207 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001208 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001209
1210 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1211 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001212 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001213 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001214
1215 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1216 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001217 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001218 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001219 }
1220
1221 {
1222 std::deque<TimestampedMessage> output0;
1223
1224 ASSERT_TRUE(mapper0.Front() != nullptr);
1225 output0.emplace_back(std::move(*mapper0.Front()));
1226 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001227 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001228
1229 ASSERT_TRUE(mapper0.Front() != nullptr);
1230 output0.emplace_back(std::move(*mapper0.Front()));
1231 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001232 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001233
1234 ASSERT_TRUE(mapper0.Front() != nullptr);
1235 output0.emplace_back(std::move(*mapper0.Front()));
1236 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001237 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001238
1239 ASSERT_TRUE(mapper0.Front() == nullptr);
1240
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001241 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1242 EXPECT_EQ(output0[0].monotonic_event_time.time,
1243 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001244 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001245
1246 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1247 EXPECT_EQ(output0[1].monotonic_event_time.time,
1248 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001249 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001250
1251 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1252 EXPECT_EQ(output0[2].monotonic_event_time.time,
1253 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001254 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001255 }
Austin Schuh79b30942021-01-24 22:32:21 -08001256
1257 EXPECT_EQ(mapper0_count, 3u);
1258 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001259}
1260
1261// Tests that we return just the timestamps if we couldn't find the data and the
1262// missing data was at the beginning of the file.
1263TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
1264 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1265 {
1266 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1267 writer0.QueueSpan(config0_.span());
1268 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1269 writer1.QueueSpan(config2_.span());
1270
1271 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
1272 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1273 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1274
1275 writer0.QueueSizedFlatbuffer(
1276 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1277 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1278 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1279
1280 writer0.QueueSizedFlatbuffer(
1281 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1282 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1283 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1284 }
1285
1286 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1287
1288 ASSERT_EQ(parts[0].logger_node, "pi1");
1289 ASSERT_EQ(parts[1].logger_node, "pi2");
1290
Austin Schuh79b30942021-01-24 22:32:21 -08001291 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001292 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001293 mapper0.set_timestamp_callback(
1294 [&](TimestampedMessage *) { ++mapper0_count; });
1295 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001296 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001297 mapper1.set_timestamp_callback(
1298 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001299
1300 mapper0.AddPeer(&mapper1);
1301 mapper1.AddPeer(&mapper0);
1302
1303 {
1304 SCOPED_TRACE("Trying node1 now");
1305 std::deque<TimestampedMessage> output1;
1306
1307 ASSERT_TRUE(mapper1.Front() != nullptr);
1308 output1.emplace_back(std::move(*mapper1.Front()));
1309 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001310 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001311
1312 ASSERT_TRUE(mapper1.Front() != nullptr);
1313 output1.emplace_back(std::move(*mapper1.Front()));
1314 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001315 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001316
1317 ASSERT_TRUE(mapper1.Front() != nullptr);
1318 output1.emplace_back(std::move(*mapper1.Front()));
1319 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001320 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001321
1322 ASSERT_TRUE(mapper1.Front() == nullptr);
1323
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001324 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1325 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001326 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001327 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001328
1329 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1330 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001331 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001332 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001333
1334 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1335 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001336 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001337 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001338 }
Austin Schuh79b30942021-01-24 22:32:21 -08001339
1340 EXPECT_EQ(mapper0_count, 0u);
1341 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001342}
1343
1344// Tests that we return just the timestamps if we couldn't find the data and the
1345// missing data was at the end of the file.
1346TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
1347 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1348 {
1349 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1350 writer0.QueueSpan(config0_.span());
1351 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1352 writer1.QueueSpan(config2_.span());
1353
1354 writer0.QueueSizedFlatbuffer(
1355 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1356 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1357 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1358
1359 writer0.QueueSizedFlatbuffer(
1360 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1361 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1362 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1363
1364 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
1365 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1366 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1367 }
1368
1369 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1370
1371 ASSERT_EQ(parts[0].logger_node, "pi1");
1372 ASSERT_EQ(parts[1].logger_node, "pi2");
1373
Austin Schuh79b30942021-01-24 22:32:21 -08001374 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001375 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001376 mapper0.set_timestamp_callback(
1377 [&](TimestampedMessage *) { ++mapper0_count; });
1378 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001379 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001380 mapper1.set_timestamp_callback(
1381 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001382
1383 mapper0.AddPeer(&mapper1);
1384 mapper1.AddPeer(&mapper0);
1385
1386 {
1387 SCOPED_TRACE("Trying node1 now");
1388 std::deque<TimestampedMessage> output1;
1389
1390 ASSERT_TRUE(mapper1.Front() != nullptr);
1391 output1.emplace_back(std::move(*mapper1.Front()));
1392 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001393 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001394
1395 ASSERT_TRUE(mapper1.Front() != nullptr);
1396 output1.emplace_back(std::move(*mapper1.Front()));
1397 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001398 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001399
1400 ASSERT_TRUE(mapper1.Front() != nullptr);
1401 output1.emplace_back(std::move(*mapper1.Front()));
1402 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001403 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001404
1405 ASSERT_TRUE(mapper1.Front() == nullptr);
1406
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001407 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1408 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001409 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001410 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001411
1412 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1413 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001414 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001415 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001416
1417 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1418 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001419 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001420 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001421 }
Austin Schuh79b30942021-01-24 22:32:21 -08001422
1423 EXPECT_EQ(mapper0_count, 0u);
1424 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001425}
1426
Austin Schuh993ccb52020-12-12 15:59:32 -08001427// Tests that we handle a message which failed to forward or be logged.
1428TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1429 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1430 {
1431 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1432 writer0.QueueSpan(config0_.span());
1433 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1434 writer1.QueueSpan(config2_.span());
1435
1436 writer0.QueueSizedFlatbuffer(
1437 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1438 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1439 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1440
1441 // Create both the timestamp and message, but don't log them, simulating a
1442 // forwarding drop.
1443 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1444 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1445 chrono::seconds(100));
1446
1447 writer0.QueueSizedFlatbuffer(
1448 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1449 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1450 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1451 }
1452
1453 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1454
1455 ASSERT_EQ(parts[0].logger_node, "pi1");
1456 ASSERT_EQ(parts[1].logger_node, "pi2");
1457
Austin Schuh79b30942021-01-24 22:32:21 -08001458 size_t mapper0_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001459 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001460 mapper0.set_timestamp_callback(
1461 [&](TimestampedMessage *) { ++mapper0_count; });
1462 size_t mapper1_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001463 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001464 mapper1.set_timestamp_callback(
1465 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh993ccb52020-12-12 15:59:32 -08001466
1467 mapper0.AddPeer(&mapper1);
1468 mapper1.AddPeer(&mapper0);
1469
1470 {
1471 std::deque<TimestampedMessage> output1;
1472
1473 ASSERT_TRUE(mapper1.Front() != nullptr);
1474 output1.emplace_back(std::move(*mapper1.Front()));
1475 mapper1.PopFront();
1476
1477 ASSERT_TRUE(mapper1.Front() != nullptr);
1478 output1.emplace_back(std::move(*mapper1.Front()));
1479
1480 ASSERT_FALSE(mapper1.Front() == nullptr);
1481
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001482 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1483 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001484 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001485 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001486
1487 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1488 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001489 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001490 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh993ccb52020-12-12 15:59:32 -08001491 }
Austin Schuh79b30942021-01-24 22:32:21 -08001492
1493 EXPECT_EQ(mapper0_count, 0u);
1494 EXPECT_EQ(mapper1_count, 2u);
Austin Schuh993ccb52020-12-12 15:59:32 -08001495}
1496
Austin Schuhd2f96102020-12-01 20:27:29 -08001497// Tests that we properly sort log files with duplicate timestamps.
1498TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1499 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1500 {
1501 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1502 writer0.QueueSpan(config0_.span());
1503 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1504 writer1.QueueSpan(config2_.span());
1505
1506 writer0.QueueSizedFlatbuffer(
1507 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1508 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1509 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1510
1511 writer0.QueueSizedFlatbuffer(
1512 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1513 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1514 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1515
1516 writer0.QueueSizedFlatbuffer(
1517 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
1518 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1519 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1520
1521 writer0.QueueSizedFlatbuffer(
1522 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
1523 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1524 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1525 }
1526
1527 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1528
1529 ASSERT_EQ(parts[0].logger_node, "pi1");
1530 ASSERT_EQ(parts[1].logger_node, "pi2");
1531
Austin Schuh79b30942021-01-24 22:32:21 -08001532 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001533 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001534 mapper0.set_timestamp_callback(
1535 [&](TimestampedMessage *) { ++mapper0_count; });
1536 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001537 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001538 mapper1.set_timestamp_callback(
1539 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001540
1541 mapper0.AddPeer(&mapper1);
1542 mapper1.AddPeer(&mapper0);
1543
1544 {
1545 SCOPED_TRACE("Trying node1 now");
1546 std::deque<TimestampedMessage> output1;
1547
1548 for (int i = 0; i < 4; ++i) {
1549 ASSERT_TRUE(mapper1.Front() != nullptr);
1550 output1.emplace_back(std::move(*mapper1.Front()));
1551 mapper1.PopFront();
1552 }
1553 ASSERT_TRUE(mapper1.Front() == nullptr);
1554
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001555 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1556 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001557 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001558 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001559
1560 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1561 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001562 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001563 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001564
1565 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1566 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001567 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001568 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001569
1570 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1571 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001572 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001573 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001574 }
Austin Schuh79b30942021-01-24 22:32:21 -08001575
1576 EXPECT_EQ(mapper0_count, 0u);
1577 EXPECT_EQ(mapper1_count, 4u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001578}
1579
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001580// Tests that we properly produce a valid start time.
Austin Schuhd2f96102020-12-01 20:27:29 -08001581TEST_F(TimestampMapperTest, StartTime) {
1582 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1583 {
1584 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1585 writer0.QueueSpan(config0_.span());
1586 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1587 writer1.QueueSpan(config1_.span());
1588 DetachedBufferWriter writer2(logfile2_, std::make_unique<DummyEncoder>());
1589 writer2.QueueSpan(config3_.span());
1590 }
1591
1592 const std::vector<LogFile> parts =
1593 SortParts({logfile0_, logfile1_, logfile2_});
1594
Austin Schuh79b30942021-01-24 22:32:21 -08001595 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001596 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001597 mapper0.set_timestamp_callback(
1598 [&](TimestampedMessage *) { ++mapper0_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001599
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001600 EXPECT_EQ(mapper0.monotonic_start_time(0), e + chrono::milliseconds(1));
1601 EXPECT_EQ(mapper0.realtime_start_time(0),
Austin Schuhd2f96102020-12-01 20:27:29 -08001602 realtime_clock::time_point(chrono::seconds(1000)));
Austin Schuh79b30942021-01-24 22:32:21 -08001603 EXPECT_EQ(mapper0_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001604}
1605
Austin Schuhfecf1d82020-12-19 16:57:28 -08001606// Tests that when a peer isn't registered, we treat that as if there was no
1607// data available.
1608TEST_F(TimestampMapperTest, NoPeer) {
1609 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1610 {
1611 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1612 writer0.QueueSpan(config0_.span());
1613 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1614 writer1.QueueSpan(config2_.span());
1615
1616 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
1617 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1618 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1619
1620 writer0.QueueSizedFlatbuffer(
1621 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1622 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1623 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1624
1625 writer0.QueueSizedFlatbuffer(
1626 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1627 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1628 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1629 }
1630
1631 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1632
1633 ASSERT_EQ(parts[0].logger_node, "pi1");
1634 ASSERT_EQ(parts[1].logger_node, "pi2");
1635
Austin Schuh79b30942021-01-24 22:32:21 -08001636 size_t mapper1_count = 0;
Austin Schuhfecf1d82020-12-19 16:57:28 -08001637 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001638 mapper1.set_timestamp_callback(
1639 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhfecf1d82020-12-19 16:57:28 -08001640
1641 {
1642 std::deque<TimestampedMessage> output1;
1643
1644 ASSERT_TRUE(mapper1.Front() != nullptr);
1645 output1.emplace_back(std::move(*mapper1.Front()));
1646 mapper1.PopFront();
1647 ASSERT_TRUE(mapper1.Front() != nullptr);
1648 output1.emplace_back(std::move(*mapper1.Front()));
1649 mapper1.PopFront();
1650 ASSERT_TRUE(mapper1.Front() != nullptr);
1651 output1.emplace_back(std::move(*mapper1.Front()));
1652 mapper1.PopFront();
1653 ASSERT_TRUE(mapper1.Front() == nullptr);
1654
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001655 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1656 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001657 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001658 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001659
1660 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1661 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001662 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001663 EXPECT_FALSE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001664
1665 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1666 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001667 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001668 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001669 }
Austin Schuh79b30942021-01-24 22:32:21 -08001670 EXPECT_EQ(mapper1_count, 3u);
1671}
1672
1673// Tests that we can queue messages and call the timestamp callback for both
1674// nodes.
1675TEST_F(TimestampMapperTest, QueueUntilNode0) {
1676 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1677 {
1678 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1679 writer0.QueueSpan(config0_.span());
1680 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1681 writer1.QueueSpan(config2_.span());
1682
1683 writer0.QueueSizedFlatbuffer(
1684 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1685 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1686 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1687
1688 writer0.QueueSizedFlatbuffer(
1689 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
1690 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1691 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1692
1693 writer0.QueueSizedFlatbuffer(
1694 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
1695 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1696 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1697
1698 writer0.QueueSizedFlatbuffer(
1699 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
1700 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1701 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1702 }
1703
1704 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1705
1706 ASSERT_EQ(parts[0].logger_node, "pi1");
1707 ASSERT_EQ(parts[1].logger_node, "pi2");
1708
1709 size_t mapper0_count = 0;
1710 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1711 mapper0.set_timestamp_callback(
1712 [&](TimestampedMessage *) { ++mapper0_count; });
1713 size_t mapper1_count = 0;
1714 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1715 mapper1.set_timestamp_callback(
1716 [&](TimestampedMessage *) { ++mapper1_count; });
1717
1718 mapper0.AddPeer(&mapper1);
1719 mapper1.AddPeer(&mapper0);
1720
1721 {
1722 std::deque<TimestampedMessage> output0;
1723
1724 EXPECT_EQ(mapper0_count, 0u);
1725 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001726 mapper0.QueueUntil(
1727 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001728 EXPECT_EQ(mapper0_count, 3u);
1729 EXPECT_EQ(mapper1_count, 0u);
1730
1731 ASSERT_TRUE(mapper0.Front() != nullptr);
1732 EXPECT_EQ(mapper0_count, 3u);
1733 EXPECT_EQ(mapper1_count, 0u);
1734
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001735 mapper0.QueueUntil(
1736 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001737 EXPECT_EQ(mapper0_count, 3u);
1738 EXPECT_EQ(mapper1_count, 0u);
1739
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001740 mapper0.QueueUntil(
1741 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001742 EXPECT_EQ(mapper0_count, 4u);
1743 EXPECT_EQ(mapper1_count, 0u);
1744
1745 output0.emplace_back(std::move(*mapper0.Front()));
1746 mapper0.PopFront();
1747 output0.emplace_back(std::move(*mapper0.Front()));
1748 mapper0.PopFront();
1749 output0.emplace_back(std::move(*mapper0.Front()));
1750 mapper0.PopFront();
1751 output0.emplace_back(std::move(*mapper0.Front()));
1752 mapper0.PopFront();
1753
1754 EXPECT_EQ(mapper0_count, 4u);
1755 EXPECT_EQ(mapper1_count, 0u);
1756
1757 ASSERT_TRUE(mapper0.Front() == nullptr);
1758
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001759 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1760 EXPECT_EQ(output0[0].monotonic_event_time.time,
1761 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001762 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001763
1764 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1765 EXPECT_EQ(output0[1].monotonic_event_time.time,
1766 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001767 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001768
1769 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1770 EXPECT_EQ(output0[2].monotonic_event_time.time,
1771 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001772 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001773
1774 EXPECT_EQ(output0[3].monotonic_event_time.boot, 0u);
1775 EXPECT_EQ(output0[3].monotonic_event_time.time,
1776 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001777 EXPECT_TRUE(output0[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001778 }
1779
1780 {
1781 SCOPED_TRACE("Trying node1 now");
1782 std::deque<TimestampedMessage> output1;
1783
1784 EXPECT_EQ(mapper0_count, 4u);
1785 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001786 mapper1.QueueUntil(BootTimestamp{
1787 .boot = 0,
1788 .time = e + chrono::seconds(100) + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001789 EXPECT_EQ(mapper0_count, 4u);
1790 EXPECT_EQ(mapper1_count, 3u);
1791
1792 ASSERT_TRUE(mapper1.Front() != nullptr);
1793 EXPECT_EQ(mapper0_count, 4u);
1794 EXPECT_EQ(mapper1_count, 3u);
1795
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001796 mapper1.QueueUntil(BootTimestamp{
1797 .boot = 0,
1798 .time = e + chrono::seconds(100) + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001799 EXPECT_EQ(mapper0_count, 4u);
1800 EXPECT_EQ(mapper1_count, 3u);
1801
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001802 mapper1.QueueUntil(BootTimestamp{
1803 .boot = 0,
1804 .time = e + chrono::seconds(100) + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001805 EXPECT_EQ(mapper0_count, 4u);
1806 EXPECT_EQ(mapper1_count, 4u);
1807
1808 ASSERT_TRUE(mapper1.Front() != nullptr);
1809 EXPECT_EQ(mapper0_count, 4u);
1810 EXPECT_EQ(mapper1_count, 4u);
1811
1812 output1.emplace_back(std::move(*mapper1.Front()));
1813 mapper1.PopFront();
1814 ASSERT_TRUE(mapper1.Front() != nullptr);
1815 output1.emplace_back(std::move(*mapper1.Front()));
1816 mapper1.PopFront();
1817 ASSERT_TRUE(mapper1.Front() != nullptr);
1818 output1.emplace_back(std::move(*mapper1.Front()));
1819 mapper1.PopFront();
1820 ASSERT_TRUE(mapper1.Front() != nullptr);
1821 output1.emplace_back(std::move(*mapper1.Front()));
1822 mapper1.PopFront();
1823
1824 EXPECT_EQ(mapper0_count, 4u);
1825 EXPECT_EQ(mapper1_count, 4u);
1826
1827 ASSERT_TRUE(mapper1.Front() == nullptr);
1828
1829 EXPECT_EQ(mapper0_count, 4u);
1830 EXPECT_EQ(mapper1_count, 4u);
1831
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001832 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1833 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001834 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001835 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001836
1837 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1838 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001839 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001840 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001841
1842 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1843 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001844 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001845 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001846
1847 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1848 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001849 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001850 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001851 }
Austin Schuhfecf1d82020-12-19 16:57:28 -08001852}
1853
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001854class BootMergerTest : public SortingElementTest {
1855 public:
1856 BootMergerTest()
1857 : SortingElementTest(),
1858 boot0_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07001859 /* 100ms */
1860 "max_out_of_order_duration": 100000000,
1861 "node": {
1862 "name": "pi2"
1863 },
1864 "logger_node": {
1865 "name": "pi1"
1866 },
1867 "monotonic_start_time": 1000000,
1868 "realtime_start_time": 1000000000000,
1869 "logger_monotonic_start_time": 1000000,
1870 "logger_realtime_start_time": 1000000000000,
1871 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
1872 "parts_uuid": "1a9e5ca2-31b2-475b-8282-88f6d1ce5109",
1873 "parts_index": 0,
1874 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
1875 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07001876 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
1877 "boot_uuids": [
1878 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
1879 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
1880 ""
1881 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001882})")),
1883 boot1_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07001884 /* 100ms */
1885 "max_out_of_order_duration": 100000000,
1886 "node": {
1887 "name": "pi2"
1888 },
1889 "logger_node": {
1890 "name": "pi1"
1891 },
1892 "monotonic_start_time": 1000000,
1893 "realtime_start_time": 1000000000000,
1894 "logger_monotonic_start_time": 1000000,
1895 "logger_realtime_start_time": 1000000000000,
1896 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
1897 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
1898 "parts_index": 1,
1899 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
1900 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07001901 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
1902 "boot_uuids": [
1903 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
1904 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
1905 ""
1906 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001907})")) {}
Austin Schuh8bee6882021-06-28 21:03:28 -07001908
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001909 protected:
1910 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0_;
1911 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1_;
1912};
1913
1914// This tests that we can properly sort a multi-node log file which has the old
1915// (and buggy) timestamps in the header, and the non-resetting parts_index.
1916// These make it so we can just bairly figure out what happened first and what
1917// happened second, but not in a way that is robust to multiple nodes rebooting.
1918TEST_F(BootMergerTest, OldReboot) {
Austin Schuh8bee6882021-06-28 21:03:28 -07001919 {
1920 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001921 writer.QueueSpan(boot0_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07001922 }
1923 {
1924 DetachedBufferWriter writer(logfile1_, std::make_unique<DummyEncoder>());
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001925 writer.QueueSpan(boot1_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07001926 }
1927
1928 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1929
1930 ASSERT_EQ(parts.size(), 1u);
1931 ASSERT_EQ(parts[0].parts.size(), 2u);
1932
1933 EXPECT_EQ(parts[0].parts[0].boot_count, 0);
1934 EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001935 boot0_.message().source_node_boot_uuid()->string_view());
Austin Schuh8bee6882021-06-28 21:03:28 -07001936
1937 EXPECT_EQ(parts[0].parts[1].boot_count, 1);
1938 EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001939 boot1_.message().source_node_boot_uuid()->string_view());
1940}
1941
1942// This tests that we can produce messages ordered across a reboot.
1943TEST_F(BootMergerTest, SortAcrossReboot) {
1944 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1945 {
1946 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
1947 writer.QueueSpan(boot0_.span());
1948 writer.QueueSizedFlatbuffer(
1949 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1950 writer.QueueSizedFlatbuffer(
1951 MakeLogMessage(e + chrono::milliseconds(2000), 1, 0x105));
1952 }
1953 {
1954 DetachedBufferWriter writer(logfile1_, std::make_unique<DummyEncoder>());
1955 writer.QueueSpan(boot1_.span());
1956 writer.QueueSizedFlatbuffer(
1957 MakeLogMessage(e + chrono::milliseconds(100), 0, 0x006));
1958 writer.QueueSizedFlatbuffer(
1959 MakeLogMessage(e + chrono::milliseconds(200), 1, 0x106));
1960 }
1961
1962 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1963 ASSERT_EQ(parts.size(), 1u);
1964 ASSERT_EQ(parts[0].parts.size(), 2u);
1965
1966 BootMerger merger(FilterPartsForNode(parts, "pi2"));
1967
1968 EXPECT_EQ(merger.node(), 1u);
1969
1970 std::vector<Message> output;
1971 for (int i = 0; i < 4; ++i) {
1972 ASSERT_TRUE(merger.Front() != nullptr);
1973 output.emplace_back(std::move(*merger.Front()));
1974 merger.PopFront();
1975 }
1976
1977 ASSERT_TRUE(merger.Front() == nullptr);
1978
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001979 EXPECT_EQ(output[0].timestamp.boot, 0u);
1980 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
1981 EXPECT_EQ(output[1].timestamp.boot, 0u);
1982 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(2000));
1983
1984 EXPECT_EQ(output[2].timestamp.boot, 1u);
1985 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(100));
1986 EXPECT_EQ(output[3].timestamp.boot, 1u);
1987 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
Austin Schuh8bee6882021-06-28 21:03:28 -07001988}
1989
Austin Schuh48507722021-07-17 17:29:24 -07001990class RebootTimestampMapperTest : public SortingElementTest {
1991 public:
1992 RebootTimestampMapperTest()
1993 : SortingElementTest(),
1994 boot0a_(MakeHeader(config_, R"({
1995 /* 100ms */
1996 "max_out_of_order_duration": 100000000,
1997 "node": {
1998 "name": "pi1"
1999 },
2000 "logger_node": {
2001 "name": "pi1"
2002 },
2003 "monotonic_start_time": 1000000,
2004 "realtime_start_time": 1000000000000,
2005 "logger_monotonic_start_time": 1000000,
2006 "logger_realtime_start_time": 1000000000000,
2007 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2008 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2009 "parts_index": 0,
2010 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2011 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2012 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2013 "boot_uuids": [
2014 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2015 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2016 ""
2017 ]
2018})")),
2019 boot0b_(MakeHeader(config_, R"({
2020 /* 100ms */
2021 "max_out_of_order_duration": 100000000,
2022 "node": {
2023 "name": "pi1"
2024 },
2025 "logger_node": {
2026 "name": "pi1"
2027 },
2028 "monotonic_start_time": 1000000,
2029 "realtime_start_time": 1000000000000,
2030 "logger_monotonic_start_time": 1000000,
2031 "logger_realtime_start_time": 1000000000000,
2032 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2033 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2034 "parts_index": 1,
2035 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2036 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2037 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2038 "boot_uuids": [
2039 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2040 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2041 ""
2042 ]
2043})")),
2044 boot1a_(MakeHeader(config_, R"({
2045 /* 100ms */
2046 "max_out_of_order_duration": 100000000,
2047 "node": {
2048 "name": "pi2"
2049 },
2050 "logger_node": {
2051 "name": "pi1"
2052 },
2053 "monotonic_start_time": 1000000,
2054 "realtime_start_time": 1000000000000,
2055 "logger_monotonic_start_time": 1000000,
2056 "logger_realtime_start_time": 1000000000000,
2057 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2058 "parts_uuid": "f6ab0cdc-a654-456d-bfd9-2bbc09098edf",
2059 "parts_index": 0,
2060 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2061 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2062 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2063 "boot_uuids": [
2064 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2065 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2066 ""
2067 ]
2068})")),
2069 boot1b_(MakeHeader(config_, R"({
2070 /* 100ms */
2071 "max_out_of_order_duration": 100000000,
2072 "node": {
2073 "name": "pi2"
2074 },
2075 "logger_node": {
2076 "name": "pi1"
2077 },
2078 "monotonic_start_time": 1000000,
2079 "realtime_start_time": 1000000000000,
2080 "logger_monotonic_start_time": 1000000,
2081 "logger_realtime_start_time": 1000000000000,
2082 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2083 "parts_uuid": "3a9d0445-f520-43ca-93f5-e2cc7f54d40a",
2084 "parts_index": 1,
2085 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2086 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2087 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2088 "boot_uuids": [
2089 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2090 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2091 ""
2092 ]
2093})")) {}
2094
2095 protected:
2096 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0a_;
2097 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0b_;
2098 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1a_;
2099 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
2100};
2101
Austin Schuh48507722021-07-17 17:29:24 -07002102// Tests that we can match timestamps on delivered messages in the presence of
2103// reboots on the node receiving timestamps.
2104TEST_F(RebootTimestampMapperTest, ReadNode0First) {
2105 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2106 {
2107 DetachedBufferWriter writer0a(logfile0_, std::make_unique<DummyEncoder>());
2108 writer0a.QueueSpan(boot0a_.span());
2109 DetachedBufferWriter writer0b(logfile1_, std::make_unique<DummyEncoder>());
2110 writer0b.QueueSpan(boot0b_.span());
2111 DetachedBufferWriter writer1a(logfile2_, std::make_unique<DummyEncoder>());
2112 writer1a.QueueSpan(boot1a_.span());
2113 DetachedBufferWriter writer1b(logfile3_, std::make_unique<DummyEncoder>());
2114 writer1b.QueueSpan(boot1b_.span());
2115
2116 writer0a.QueueSizedFlatbuffer(
2117 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
2118 writer1a.QueueSizedFlatbuffer(MakeTimestampMessage(
2119 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
2120 e + chrono::milliseconds(1001)));
2121
Austin Schuh58646e22021-08-23 23:51:46 -07002122 writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
2123 e + chrono::milliseconds(1000), 0, chrono::seconds(21),
2124 e + chrono::milliseconds(2001)));
2125
Austin Schuh48507722021-07-17 17:29:24 -07002126 writer0b.QueueSizedFlatbuffer(
2127 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
2128 writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
2129 e + chrono::milliseconds(2000), 0, chrono::seconds(20),
2130 e + chrono::milliseconds(2001)));
2131
2132 writer0b.QueueSizedFlatbuffer(
2133 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
2134 writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
2135 e + chrono::milliseconds(3000), 0, chrono::seconds(20),
2136 e + chrono::milliseconds(3001)));
2137 }
2138
Austin Schuh58646e22021-08-23 23:51:46 -07002139 const std::vector<LogFile> parts =
2140 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Austin Schuh48507722021-07-17 17:29:24 -07002141
2142 for (const auto &x : parts) {
2143 LOG(INFO) << x;
2144 }
2145 ASSERT_EQ(parts.size(), 1u);
2146 ASSERT_EQ(parts[0].logger_node, "pi1");
2147
2148 size_t mapper0_count = 0;
2149 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
2150 mapper0.set_timestamp_callback(
2151 [&](TimestampedMessage *) { ++mapper0_count; });
2152 size_t mapper1_count = 0;
2153 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
2154 mapper1.set_timestamp_callback(
2155 [&](TimestampedMessage *) { ++mapper1_count; });
2156
2157 mapper0.AddPeer(&mapper1);
2158 mapper1.AddPeer(&mapper0);
2159
2160 {
2161 std::deque<TimestampedMessage> output0;
2162
2163 EXPECT_EQ(mapper0_count, 0u);
2164 EXPECT_EQ(mapper1_count, 0u);
2165 ASSERT_TRUE(mapper0.Front() != nullptr);
2166 EXPECT_EQ(mapper0_count, 1u);
2167 EXPECT_EQ(mapper1_count, 0u);
2168 output0.emplace_back(std::move(*mapper0.Front()));
2169 mapper0.PopFront();
2170 EXPECT_TRUE(mapper0.started());
2171 EXPECT_EQ(mapper0_count, 1u);
2172 EXPECT_EQ(mapper1_count, 0u);
2173
2174 ASSERT_TRUE(mapper0.Front() != nullptr);
2175 EXPECT_EQ(mapper0_count, 2u);
2176 EXPECT_EQ(mapper1_count, 0u);
2177 output0.emplace_back(std::move(*mapper0.Front()));
2178 mapper0.PopFront();
2179 EXPECT_TRUE(mapper0.started());
2180
2181 ASSERT_TRUE(mapper0.Front() != nullptr);
2182 output0.emplace_back(std::move(*mapper0.Front()));
2183 mapper0.PopFront();
2184 EXPECT_TRUE(mapper0.started());
2185
2186 EXPECT_EQ(mapper0_count, 3u);
2187 EXPECT_EQ(mapper1_count, 0u);
2188
2189 ASSERT_TRUE(mapper0.Front() == nullptr);
2190
2191 LOG(INFO) << output0[0];
2192 LOG(INFO) << output0[1];
2193 LOG(INFO) << output0[2];
2194
2195 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2196 EXPECT_EQ(output0[0].monotonic_event_time.time,
2197 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002198 EXPECT_EQ(output0[0].queue_index,
2199 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002200 EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
2201 EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002202 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002203
2204 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2205 EXPECT_EQ(output0[1].monotonic_event_time.time,
2206 e + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002207 EXPECT_EQ(output0[1].queue_index,
2208 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002209 EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
2210 EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002211 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002212
2213 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2214 EXPECT_EQ(output0[2].monotonic_event_time.time,
2215 e + chrono::milliseconds(3000));
Austin Schuh58646e22021-08-23 23:51:46 -07002216 EXPECT_EQ(output0[2].queue_index,
2217 (BootQueueIndex{.boot = 0u, .index = 2u}));
Austin Schuh48507722021-07-17 17:29:24 -07002218 EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
2219 EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002220 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002221 }
2222
2223 {
2224 SCOPED_TRACE("Trying node1 now");
2225 std::deque<TimestampedMessage> output1;
2226
2227 EXPECT_EQ(mapper0_count, 3u);
2228 EXPECT_EQ(mapper1_count, 0u);
2229
2230 ASSERT_TRUE(mapper1.Front() != nullptr);
2231 EXPECT_EQ(mapper0_count, 3u);
2232 EXPECT_EQ(mapper1_count, 1u);
2233 output1.emplace_back(std::move(*mapper1.Front()));
2234 mapper1.PopFront();
2235 EXPECT_TRUE(mapper1.started());
2236 EXPECT_EQ(mapper0_count, 3u);
2237 EXPECT_EQ(mapper1_count, 1u);
2238
2239 ASSERT_TRUE(mapper1.Front() != nullptr);
2240 EXPECT_EQ(mapper0_count, 3u);
2241 EXPECT_EQ(mapper1_count, 2u);
2242 output1.emplace_back(std::move(*mapper1.Front()));
2243 mapper1.PopFront();
2244 EXPECT_TRUE(mapper1.started());
2245
2246 ASSERT_TRUE(mapper1.Front() != nullptr);
2247 output1.emplace_back(std::move(*mapper1.Front()));
2248 mapper1.PopFront();
2249 EXPECT_TRUE(mapper1.started());
2250
Austin Schuh58646e22021-08-23 23:51:46 -07002251 ASSERT_TRUE(mapper1.Front() != nullptr);
2252 output1.emplace_back(std::move(*mapper1.Front()));
2253 mapper1.PopFront();
2254 EXPECT_TRUE(mapper1.started());
2255
Austin Schuh48507722021-07-17 17:29:24 -07002256 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002257 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002258
2259 ASSERT_TRUE(mapper1.Front() == nullptr);
2260
2261 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002262 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002263
2264 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2265 EXPECT_EQ(output1[0].monotonic_event_time.time,
2266 e + chrono::seconds(100) + chrono::milliseconds(1000));
2267 EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
2268 EXPECT_EQ(output1[0].monotonic_remote_time.time,
2269 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002270 EXPECT_EQ(output1[0].remote_queue_index,
2271 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002272 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
2273 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
2274 e + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002275 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002276
2277 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2278 EXPECT_EQ(output1[1].monotonic_event_time.time,
2279 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002280 EXPECT_EQ(output1[1].remote_queue_index,
2281 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002282 EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
2283 EXPECT_EQ(output1[1].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002284 e + chrono::milliseconds(1000));
Austin Schuh48507722021-07-17 17:29:24 -07002285 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
2286 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
2287 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002288 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002289
2290 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2291 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002292 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh48507722021-07-17 17:29:24 -07002293 EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
2294 EXPECT_EQ(output1[2].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002295 e + chrono::milliseconds(2000));
2296 EXPECT_EQ(output1[2].remote_queue_index,
2297 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002298 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
2299 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002300 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002301 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002302
Austin Schuh58646e22021-08-23 23:51:46 -07002303 EXPECT_EQ(output1[3].monotonic_event_time.boot, 1u);
2304 EXPECT_EQ(output1[3].monotonic_event_time.time,
2305 e + chrono::seconds(20) + chrono::milliseconds(3000));
2306 EXPECT_EQ(output1[3].monotonic_remote_time.boot, 0u);
2307 EXPECT_EQ(output1[3].monotonic_remote_time.time,
2308 e + chrono::milliseconds(3000));
2309 EXPECT_EQ(output1[3].remote_queue_index,
2310 (BootQueueIndex{.boot = 0u, .index = 2u}));
2311 EXPECT_EQ(output1[3].monotonic_timestamp_time.boot, 0u);
2312 EXPECT_EQ(output1[3].monotonic_timestamp_time.time,
2313 e + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002314 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh58646e22021-08-23 23:51:46 -07002315
Austin Schuh48507722021-07-17 17:29:24 -07002316 LOG(INFO) << output1[0];
2317 LOG(INFO) << output1[1];
2318 LOG(INFO) << output1[2];
Austin Schuh58646e22021-08-23 23:51:46 -07002319 LOG(INFO) << output1[3];
Austin Schuh48507722021-07-17 17:29:24 -07002320 }
2321}
2322
2323TEST_F(RebootTimestampMapperTest, Node2Reboot) {
2324 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2325 {
2326 DetachedBufferWriter writer0a(logfile0_, std::make_unique<DummyEncoder>());
2327 writer0a.QueueSpan(boot0a_.span());
2328 DetachedBufferWriter writer0b(logfile1_, std::make_unique<DummyEncoder>());
2329 writer0b.QueueSpan(boot0b_.span());
2330 DetachedBufferWriter writer1a(logfile2_, std::make_unique<DummyEncoder>());
2331 writer1a.QueueSpan(boot1a_.span());
2332 DetachedBufferWriter writer1b(logfile3_, std::make_unique<DummyEncoder>());
2333 writer1b.QueueSpan(boot1b_.span());
2334
2335 writer1a.QueueSizedFlatbuffer(MakeLogMessage(
2336 e + chrono::seconds(100) + chrono::milliseconds(1000), 3, 0x005));
2337 writer0a.QueueSizedFlatbuffer(MakeTimestampMessage(
2338 e + chrono::seconds(100) + chrono::milliseconds(1000), 3,
2339 chrono::seconds(-100),
2340 e + chrono::seconds(100) + chrono::milliseconds(1001)));
2341
2342 writer1b.QueueSizedFlatbuffer(MakeLogMessage(
2343 e + chrono::seconds(20) + chrono::milliseconds(2000), 3, 0x006));
2344 writer0b.QueueSizedFlatbuffer(MakeTimestampMessage(
2345 e + chrono::seconds(20) + chrono::milliseconds(2000), 3,
2346 chrono::seconds(-20),
2347 e + chrono::seconds(20) + chrono::milliseconds(2001)));
2348
2349 writer1b.QueueSizedFlatbuffer(MakeLogMessage(
2350 e + chrono::seconds(20) + chrono::milliseconds(3000), 3, 0x007));
2351 writer0b.QueueSizedFlatbuffer(MakeTimestampMessage(
2352 e + chrono::seconds(20) + chrono::milliseconds(3000), 3,
2353 chrono::seconds(-20),
2354 e + chrono::seconds(20) + chrono::milliseconds(3001)));
2355 }
2356
2357 const std::vector<LogFile> parts =
2358 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2359
2360 for (const auto &x : parts) {
2361 LOG(INFO) << x;
2362 }
2363 ASSERT_EQ(parts.size(), 1u);
2364 ASSERT_EQ(parts[0].logger_node, "pi1");
2365
2366 size_t mapper0_count = 0;
2367 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
2368 mapper0.set_timestamp_callback(
2369 [&](TimestampedMessage *) { ++mapper0_count; });
2370 size_t mapper1_count = 0;
2371 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
2372 mapper1.set_timestamp_callback(
2373 [&](TimestampedMessage *) { ++mapper1_count; });
2374
2375 mapper0.AddPeer(&mapper1);
2376 mapper1.AddPeer(&mapper0);
2377
2378 {
2379 std::deque<TimestampedMessage> output0;
2380
2381 EXPECT_EQ(mapper0_count, 0u);
2382 EXPECT_EQ(mapper1_count, 0u);
2383 ASSERT_TRUE(mapper0.Front() != nullptr);
2384 EXPECT_EQ(mapper0_count, 1u);
2385 EXPECT_EQ(mapper1_count, 0u);
2386 output0.emplace_back(std::move(*mapper0.Front()));
2387 mapper0.PopFront();
2388 EXPECT_TRUE(mapper0.started());
2389 EXPECT_EQ(mapper0_count, 1u);
2390 EXPECT_EQ(mapper1_count, 0u);
2391
2392 ASSERT_TRUE(mapper0.Front() != nullptr);
2393 EXPECT_EQ(mapper0_count, 2u);
2394 EXPECT_EQ(mapper1_count, 0u);
2395 output0.emplace_back(std::move(*mapper0.Front()));
2396 mapper0.PopFront();
2397 EXPECT_TRUE(mapper0.started());
2398
2399 ASSERT_TRUE(mapper0.Front() != nullptr);
2400 output0.emplace_back(std::move(*mapper0.Front()));
2401 mapper0.PopFront();
2402 EXPECT_TRUE(mapper0.started());
2403
2404 EXPECT_EQ(mapper0_count, 3u);
2405 EXPECT_EQ(mapper1_count, 0u);
2406
2407 ASSERT_TRUE(mapper0.Front() == nullptr);
2408
2409 LOG(INFO) << output0[0];
2410 LOG(INFO) << output0[1];
2411 LOG(INFO) << output0[2];
2412
2413 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2414 EXPECT_EQ(output0[0].monotonic_event_time.time,
2415 e + chrono::milliseconds(1000));
2416 EXPECT_EQ(output0[0].monotonic_remote_time.boot, 0u);
2417 EXPECT_EQ(output0[0].monotonic_remote_time.time,
2418 e + chrono::seconds(100) + chrono::milliseconds(1000));
2419 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
2420 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
2421 e + chrono::seconds(100) + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002422 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002423
2424 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2425 EXPECT_EQ(output0[1].monotonic_event_time.time,
2426 e + chrono::milliseconds(2000));
2427 EXPECT_EQ(output0[1].monotonic_remote_time.boot, 1u);
2428 EXPECT_EQ(output0[1].monotonic_remote_time.time,
2429 e + chrono::seconds(20) + chrono::milliseconds(2000));
2430 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
2431 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
2432 e + chrono::seconds(20) + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002433 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002434
2435 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2436 EXPECT_EQ(output0[2].monotonic_event_time.time,
2437 e + chrono::milliseconds(3000));
2438 EXPECT_EQ(output0[2].monotonic_remote_time.boot, 1u);
2439 EXPECT_EQ(output0[2].monotonic_remote_time.time,
2440 e + chrono::seconds(20) + chrono::milliseconds(3000));
2441 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
2442 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
2443 e + chrono::seconds(20) + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002444 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002445 }
2446
2447 {
2448 SCOPED_TRACE("Trying node1 now");
2449 std::deque<TimestampedMessage> output1;
2450
2451 EXPECT_EQ(mapper0_count, 3u);
2452 EXPECT_EQ(mapper1_count, 0u);
2453
2454 ASSERT_TRUE(mapper1.Front() != nullptr);
2455 EXPECT_EQ(mapper0_count, 3u);
2456 EXPECT_EQ(mapper1_count, 1u);
2457 output1.emplace_back(std::move(*mapper1.Front()));
2458 mapper1.PopFront();
2459 EXPECT_TRUE(mapper1.started());
2460 EXPECT_EQ(mapper0_count, 3u);
2461 EXPECT_EQ(mapper1_count, 1u);
2462
2463 ASSERT_TRUE(mapper1.Front() != nullptr);
2464 EXPECT_EQ(mapper0_count, 3u);
2465 EXPECT_EQ(mapper1_count, 2u);
2466 output1.emplace_back(std::move(*mapper1.Front()));
2467 mapper1.PopFront();
2468 EXPECT_TRUE(mapper1.started());
2469
2470 ASSERT_TRUE(mapper1.Front() != nullptr);
2471 output1.emplace_back(std::move(*mapper1.Front()));
2472 mapper1.PopFront();
2473 EXPECT_TRUE(mapper1.started());
2474
2475 EXPECT_EQ(mapper0_count, 3u);
2476 EXPECT_EQ(mapper1_count, 3u);
2477
2478 ASSERT_TRUE(mapper1.Front() == nullptr);
2479
2480 EXPECT_EQ(mapper0_count, 3u);
2481 EXPECT_EQ(mapper1_count, 3u);
2482
2483 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2484 EXPECT_EQ(output1[0].monotonic_event_time.time,
2485 e + chrono::seconds(100) + chrono::milliseconds(1000));
2486 EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
2487 EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002488 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002489
2490 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2491 EXPECT_EQ(output1[1].monotonic_event_time.time,
2492 e + chrono::seconds(20) + chrono::milliseconds(2000));
2493 EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
2494 EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002495 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002496
2497 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2498 EXPECT_EQ(output1[2].monotonic_event_time.time,
2499 e + chrono::seconds(20) + chrono::milliseconds(3000));
2500 EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
2501 EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002502 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002503
2504 LOG(INFO) << output1[0];
2505 LOG(INFO) << output1[1];
2506 LOG(INFO) << output1[2];
2507 }
2508}
2509
Austin Schuh44c61472021-11-22 21:04:10 -08002510class SortingDeathTest : public SortingElementTest {
2511 public:
2512 SortingDeathTest()
2513 : SortingElementTest(),
2514 part0_(MakeHeader(config_, R"({
2515 /* 100ms */
2516 "max_out_of_order_duration": 100000000,
2517 "node": {
2518 "name": "pi1"
2519 },
2520 "logger_node": {
2521 "name": "pi1"
2522 },
2523 "monotonic_start_time": 1000000,
2524 "realtime_start_time": 1000000000000,
2525 "logger_monotonic_start_time": 1000000,
2526 "logger_realtime_start_time": 1000000000000,
2527 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2528 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2529 "parts_index": 0,
2530 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2531 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2532 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2533 "boot_uuids": [
2534 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2535 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2536 ""
2537 ],
2538 "oldest_remote_monotonic_timestamps": [
2539 9223372036854775807,
2540 9223372036854775807,
2541 9223372036854775807
2542 ],
2543 "oldest_local_monotonic_timestamps": [
2544 9223372036854775807,
2545 9223372036854775807,
2546 9223372036854775807
2547 ],
2548 "oldest_remote_unreliable_monotonic_timestamps": [
2549 9223372036854775807,
2550 0,
2551 9223372036854775807
2552 ],
2553 "oldest_local_unreliable_monotonic_timestamps": [
2554 9223372036854775807,
2555 0,
2556 9223372036854775807
2557 ]
2558})")),
2559 part1_(MakeHeader(config_, R"({
2560 /* 100ms */
2561 "max_out_of_order_duration": 100000000,
2562 "node": {
2563 "name": "pi1"
2564 },
2565 "logger_node": {
2566 "name": "pi1"
2567 },
2568 "monotonic_start_time": 1000000,
2569 "realtime_start_time": 1000000000000,
2570 "logger_monotonic_start_time": 1000000,
2571 "logger_realtime_start_time": 1000000000000,
2572 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2573 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2574 "parts_index": 1,
2575 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2576 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2577 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2578 "boot_uuids": [
2579 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2580 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2581 ""
2582 ],
2583 "oldest_remote_monotonic_timestamps": [
2584 9223372036854775807,
2585 9223372036854775807,
2586 9223372036854775807
2587 ],
2588 "oldest_local_monotonic_timestamps": [
2589 9223372036854775807,
2590 9223372036854775807,
2591 9223372036854775807
2592 ],
2593 "oldest_remote_unreliable_monotonic_timestamps": [
2594 9223372036854775807,
2595 100000,
2596 9223372036854775807
2597 ],
2598 "oldest_local_unreliable_monotonic_timestamps": [
2599 9223372036854775807,
2600 100000,
2601 9223372036854775807
2602 ]
2603})")),
2604 part2_(MakeHeader(config_, R"({
2605 /* 100ms */
2606 "max_out_of_order_duration": 100000000,
2607 "node": {
2608 "name": "pi1"
2609 },
2610 "logger_node": {
2611 "name": "pi1"
2612 },
2613 "monotonic_start_time": 1000000,
2614 "realtime_start_time": 1000000000000,
2615 "logger_monotonic_start_time": 1000000,
2616 "logger_realtime_start_time": 1000000000000,
2617 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2618 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2619 "parts_index": 2,
2620 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2621 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2622 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2623 "boot_uuids": [
2624 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2625 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2626 ""
2627 ],
2628 "oldest_remote_monotonic_timestamps": [
2629 9223372036854775807,
2630 9223372036854775807,
2631 9223372036854775807
2632 ],
2633 "oldest_local_monotonic_timestamps": [
2634 9223372036854775807,
2635 9223372036854775807,
2636 9223372036854775807
2637 ],
2638 "oldest_remote_unreliable_monotonic_timestamps": [
2639 9223372036854775807,
2640 200000,
2641 9223372036854775807
2642 ],
2643 "oldest_local_unreliable_monotonic_timestamps": [
2644 9223372036854775807,
2645 200000,
2646 9223372036854775807
2647 ]
2648})")),
2649 part3_(MakeHeader(config_, R"({
2650 /* 100ms */
2651 "max_out_of_order_duration": 100000000,
2652 "node": {
2653 "name": "pi1"
2654 },
2655 "logger_node": {
2656 "name": "pi1"
2657 },
2658 "monotonic_start_time": 1000000,
2659 "realtime_start_time": 1000000000000,
2660 "logger_monotonic_start_time": 1000000,
2661 "logger_realtime_start_time": 1000000000000,
2662 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2663 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2664 "parts_index": 3,
2665 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2666 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2667 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2668 "boot_uuids": [
2669 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2670 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2671 ""
2672 ],
2673 "oldest_remote_monotonic_timestamps": [
2674 9223372036854775807,
2675 9223372036854775807,
2676 9223372036854775807
2677 ],
2678 "oldest_local_monotonic_timestamps": [
2679 9223372036854775807,
2680 9223372036854775807,
2681 9223372036854775807
2682 ],
2683 "oldest_remote_unreliable_monotonic_timestamps": [
2684 9223372036854775807,
2685 300000,
2686 9223372036854775807
2687 ],
2688 "oldest_local_unreliable_monotonic_timestamps": [
2689 9223372036854775807,
2690 300000,
2691 9223372036854775807
2692 ]
2693})")) {}
2694
2695 protected:
2696 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part0_;
2697 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part1_;
2698 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part2_;
2699 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part3_;
2700};
2701
2702// Tests that if 2 computers go back and forth trying to be the same node, we
2703// die in sorting instead of failing to estimate time.
2704TEST_F(SortingDeathTest, FightingNodes) {
2705 {
2706 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
2707 writer0.QueueSpan(part0_.span());
2708 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
2709 writer1.QueueSpan(part1_.span());
2710 DetachedBufferWriter writer2(logfile2_, std::make_unique<DummyEncoder>());
2711 writer2.QueueSpan(part2_.span());
2712 DetachedBufferWriter writer3(logfile3_, std::make_unique<DummyEncoder>());
2713 writer3.QueueSpan(part3_.span());
2714 }
2715
2716 EXPECT_DEATH(
2717 {
2718 const std::vector<LogFile> parts =
2719 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2720 },
2721 "Found overlapping boots on");
2722}
2723
Brian Smarttea913d42021-12-10 15:02:38 -08002724// Tests that we MessageReader blows up on a bad message.
2725TEST(MessageReaderConfirmCrash, ReadWrite) {
2726 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
2727 unlink(logfile.c_str());
2728
2729 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
2730 JsonToSizedFlatbuffer<LogFileHeader>(
2731 R"({ "max_out_of_order_duration": 100000000 })");
2732 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
2733 JsonToSizedFlatbuffer<MessageHeader>(
2734 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
2735 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
2736 JsonToSizedFlatbuffer<MessageHeader>(
2737 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
2738 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m4 =
2739 JsonToSizedFlatbuffer<MessageHeader>(
2740 R"({ "channel_index": 0, "monotonic_sent_time": 4 })");
2741
2742 // Starts out like a proper flat buffer header, but it breaks down ...
2743 std::vector<uint8_t> garbage{8, 0, 0, 0, 16, 0, 0, 0, 4, 0, 0, 0};
2744 absl::Span<uint8_t> m3_span(garbage);
2745
2746 {
2747 DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
2748 writer.QueueSpan(config.span());
2749 writer.QueueSpan(m1.span());
2750 writer.QueueSpan(m2.span());
2751 writer.QueueSpan(m3_span);
2752 writer.QueueSpan(m4.span()); // This message is "hidden"
2753 }
2754
2755 {
2756 MessageReader reader(logfile);
2757
2758 EXPECT_EQ(reader.filename(), logfile);
2759
2760 EXPECT_EQ(
2761 reader.max_out_of_order_duration(),
2762 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2763 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2764 EXPECT_TRUE(reader.ReadMessage());
2765 EXPECT_EQ(reader.newest_timestamp(),
2766 monotonic_clock::time_point(chrono::nanoseconds(1)));
2767 EXPECT_TRUE(reader.ReadMessage());
2768 EXPECT_EQ(reader.newest_timestamp(),
2769 monotonic_clock::time_point(chrono::nanoseconds(2)));
2770 // Confirm default crashing behavior
2771 EXPECT_DEATH(reader.ReadMessage(), "Corrupted message at offset");
2772 }
2773
2774 {
2775 gflags::FlagSaver fs;
2776
2777 MessageReader reader(logfile);
2778 reader.set_crash_on_corrupt_message_flag(false);
2779
2780 EXPECT_EQ(reader.filename(), logfile);
2781
2782 EXPECT_EQ(
2783 reader.max_out_of_order_duration(),
2784 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2785 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2786 EXPECT_TRUE(reader.ReadMessage());
2787 EXPECT_EQ(reader.newest_timestamp(),
2788 monotonic_clock::time_point(chrono::nanoseconds(1)));
2789 EXPECT_TRUE(reader.ReadMessage());
2790 EXPECT_EQ(reader.newest_timestamp(),
2791 monotonic_clock::time_point(chrono::nanoseconds(2)));
2792 // Confirm avoiding the corrupted message crash, stopping instead.
2793 EXPECT_FALSE(reader.ReadMessage());
2794 }
2795
2796 {
2797 gflags::FlagSaver fs;
2798
2799 MessageReader reader(logfile);
2800 reader.set_crash_on_corrupt_message_flag(false);
2801 reader.set_ignore_corrupt_messages_flag(true);
2802
2803 EXPECT_EQ(reader.filename(), logfile);
2804
2805 EXPECT_EQ(
2806 reader.max_out_of_order_duration(),
2807 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2808 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2809 EXPECT_TRUE(reader.ReadMessage());
2810 EXPECT_EQ(reader.newest_timestamp(),
2811 monotonic_clock::time_point(chrono::nanoseconds(1)));
2812 EXPECT_TRUE(reader.ReadMessage());
2813 EXPECT_EQ(reader.newest_timestamp(),
2814 monotonic_clock::time_point(chrono::nanoseconds(2)));
2815 // Confirm skipping of the corrupted message to read the hidden one.
2816 EXPECT_TRUE(reader.ReadMessage());
2817 EXPECT_EQ(reader.newest_timestamp(),
2818 monotonic_clock::time_point(chrono::nanoseconds(4)));
2819 EXPECT_FALSE(reader.ReadMessage());
2820 }
2821}
2822
Austin Schuhc243b422020-10-11 15:35:08 -07002823} // namespace testing
2824} // namespace logger
2825} // namespace aos