blob: c9ec37e17041ea71468a44883d1abb2d72ebf126 [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
4#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07005
Austin Schuhc41603c2020-10-11 16:17:37 -07006#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -07007#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -08008#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -07009#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070010#include "aos/json_to_flatbuffer.h"
11#include "aos/testing/tmpdir.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070012#include "gtest/gtest.h"
Austin Schuhc243b422020-10-11 15:35:08 -070013
14namespace aos {
15namespace logger {
16namespace testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070017namespace chrono = std::chrono;
Austin Schuhc243b422020-10-11 15:35:08 -070018
Austin Schuhe243aaf2020-10-11 15:46:02 -070019// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070020template <typename T>
21SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
22 const std::string_view data) {
23 flatbuffers::FlatBufferBuilder fbb;
24 fbb.ForceDefaults(true);
25 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
26 return fbb.Release();
27}
28
Austin Schuhe243aaf2020-10-11 15:46:02 -070029// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070030TEST(SpanReaderTest, ReadWrite) {
31 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
32 unlink(logfile.c_str());
33
34 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080035 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070036 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080037 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070038
39 {
40 DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -080041 writer.QueueSpan(m1.span());
42 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070043 }
44
45 SpanReader reader(logfile);
46
47 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhcf5f6442021-07-06 10:43:28 -070048 EXPECT_EQ(reader.PeekMessage(), m1.span());
49 EXPECT_EQ(reader.PeekMessage(), m1.span());
Austin Schuhadd6eb32020-11-09 21:24:26 -080050 EXPECT_EQ(reader.ReadMessage(), m1.span());
51 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhcf5f6442021-07-06 10:43:28 -070052 EXPECT_EQ(reader.PeekMessage(), absl::Span<const uint8_t>());
Austin Schuhc243b422020-10-11 15:35:08 -070053 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
54}
55
Austin Schuhe243aaf2020-10-11 15:46:02 -070056// Tests that we can actually parse the resulting messages at a basic level
57// through MessageReader.
58TEST(MessageReaderTest, ReadWrite) {
59 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
60 unlink(logfile.c_str());
61
62 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
63 JsonToSizedFlatbuffer<LogFileHeader>(
64 R"({ "max_out_of_order_duration": 100000000 })");
65 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
66 JsonToSizedFlatbuffer<MessageHeader>(
67 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
68 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
69 JsonToSizedFlatbuffer<MessageHeader>(
70 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
71
72 {
73 DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -080074 writer.QueueSpan(config.span());
75 writer.QueueSpan(m1.span());
76 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -070077 }
78
79 MessageReader reader(logfile);
80
81 EXPECT_EQ(reader.filename(), logfile);
82
83 EXPECT_EQ(
84 reader.max_out_of_order_duration(),
85 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
86 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
87 EXPECT_TRUE(reader.ReadMessage());
88 EXPECT_EQ(reader.newest_timestamp(),
89 monotonic_clock::time_point(chrono::nanoseconds(1)));
90 EXPECT_TRUE(reader.ReadMessage());
91 EXPECT_EQ(reader.newest_timestamp(),
92 monotonic_clock::time_point(chrono::nanoseconds(2)));
93 EXPECT_FALSE(reader.ReadMessage());
94}
95
Austin Schuh32f68492020-11-08 21:45:51 -080096// Tests that we explode when messages are too far out of order.
97TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
98 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
99 unlink(logfile0.c_str());
100
101 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
102 JsonToSizedFlatbuffer<LogFileHeader>(
103 R"({
104 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800105 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800106 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
107 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
108 "parts_index": 0
109})");
110
111 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
112 JsonToSizedFlatbuffer<MessageHeader>(
113 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
114 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
115 JsonToSizedFlatbuffer<MessageHeader>(
116 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
117 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
118 JsonToSizedFlatbuffer<MessageHeader>(
119 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
120
121 {
122 DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800123 writer.QueueSpan(config0.span());
124 writer.QueueSpan(m1.span());
125 writer.QueueSpan(m2.span());
126 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800127 }
128
129 const std::vector<LogFile> parts = SortParts({logfile0});
130
131 PartsMessageReader reader(parts[0].parts[0]);
132
133 EXPECT_TRUE(reader.ReadMessage());
134 EXPECT_TRUE(reader.ReadMessage());
135 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
136}
137
Austin Schuhc41603c2020-10-11 16:17:37 -0700138// Tests that we can transparently re-assemble part files with a
139// PartsMessageReader.
140TEST(PartsMessageReaderTest, ReadWrite) {
141 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
142 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
143 unlink(logfile0.c_str());
144 unlink(logfile1.c_str());
145
146 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
147 JsonToSizedFlatbuffer<LogFileHeader>(
148 R"({
149 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800150 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700151 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
152 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
153 "parts_index": 0
154})");
155 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
156 JsonToSizedFlatbuffer<LogFileHeader>(
157 R"({
158 "max_out_of_order_duration": 200000000,
159 "monotonic_start_time": 0,
160 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800161 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700162 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
163 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
164 "parts_index": 1
165})");
166
167 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
168 JsonToSizedFlatbuffer<MessageHeader>(
169 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
170 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
171 JsonToSizedFlatbuffer<MessageHeader>(
172 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
173
174 {
175 DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800176 writer.QueueSpan(config0.span());
177 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700178 }
179 {
180 DetachedBufferWriter writer(logfile1, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800181 writer.QueueSpan(config1.span());
182 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700183 }
184
185 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
186
187 PartsMessageReader reader(parts[0].parts[0]);
188
189 EXPECT_EQ(reader.filename(), logfile0);
190
191 // Confirm that the timestamps track, and the filename also updates.
192 // Read the first message.
193 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
194 EXPECT_EQ(
195 reader.max_out_of_order_duration(),
196 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
197 EXPECT_TRUE(reader.ReadMessage());
198 EXPECT_EQ(reader.filename(), logfile0);
199 EXPECT_EQ(reader.newest_timestamp(),
200 monotonic_clock::time_point(chrono::nanoseconds(1)));
201 EXPECT_EQ(
202 reader.max_out_of_order_duration(),
203 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
204
205 // Read the second message.
206 EXPECT_TRUE(reader.ReadMessage());
207 EXPECT_EQ(reader.filename(), logfile1);
208 EXPECT_EQ(reader.newest_timestamp(),
209 monotonic_clock::time_point(chrono::nanoseconds(2)));
210 EXPECT_EQ(
211 reader.max_out_of_order_duration(),
212 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
213
214 // And then confirm that reading again returns no message.
215 EXPECT_FALSE(reader.ReadMessage());
216 EXPECT_EQ(reader.filename(), logfile1);
217 EXPECT_EQ(
218 reader.max_out_of_order_duration(),
219 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800220 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Austin Schuhc41603c2020-10-11 16:17:37 -0700221}
Austin Schuh32f68492020-11-08 21:45:51 -0800222
Austin Schuh1be0ce42020-11-29 22:43:26 -0800223// Tests that Message's operator < works as expected.
224TEST(MessageTest, Sorting) {
225 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
226
227 Message m1{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700228 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700229 .timestamp =
230 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
Austin Schuh48507722021-07-17 17:29:24 -0700231 .monotonic_remote_boot = 0xffffff,
232 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700233 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800234 Message m2{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700235 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700236 .timestamp =
237 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
Austin Schuh48507722021-07-17 17:29:24 -0700238 .monotonic_remote_boot = 0xffffff,
239 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700240 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800241
242 EXPECT_LT(m1, m2);
243 EXPECT_GE(m2, m1);
244
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700245 m1.timestamp.time = e;
246 m2.timestamp.time = e;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800247
248 m1.channel_index = 1;
249 m2.channel_index = 2;
250
251 EXPECT_LT(m1, m2);
252 EXPECT_GE(m2, m1);
253
254 m1.channel_index = 0;
255 m2.channel_index = 0;
Austin Schuh58646e22021-08-23 23:51:46 -0700256 m1.queue_index.index = 0u;
257 m2.queue_index.index = 1u;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800258
259 EXPECT_LT(m1, m2);
260 EXPECT_GE(m2, m1);
261}
262
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800263aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
264 const aos::FlatbufferDetachedBuffer<Configuration> &config,
265 const std::string_view json) {
266 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh1034a832021-03-31 21:53:26 -0700267 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800268 flatbuffers::Offset<Configuration> config_offset =
269 aos::CopyFlatBuffer(config, &fbb);
270 LogFileHeader::Builder header_builder(fbb);
271 header_builder.add_configuration(config_offset);
272 fbb.Finish(header_builder.Finish());
273 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
274
275 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
276 JsonToFlatbuffer<LogFileHeader>(json));
277 CHECK(header_updates.Verify());
278 flatbuffers::FlatBufferBuilder fbb2;
Austin Schuh1034a832021-03-31 21:53:26 -0700279 fbb2.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800280 fbb2.FinishSizePrefixed(
281 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
282 return fbb2.Release();
283}
284
285class SortingElementTest : public ::testing::Test {
286 public:
287 SortingElementTest()
288 : config_(JsonToFlatbuffer<Configuration>(
289 R"({
290 "channels": [
291 {
292 "name": "/a",
293 "type": "aos.logger.testing.TestMessage",
294 "source_node": "pi1",
295 "destination_nodes": [
296 {
297 "name": "pi2"
298 },
299 {
300 "name": "pi3"
301 }
302 ]
303 },
304 {
305 "name": "/b",
306 "type": "aos.logger.testing.TestMessage",
307 "source_node": "pi1"
308 },
309 {
310 "name": "/c",
311 "type": "aos.logger.testing.TestMessage",
312 "source_node": "pi1"
Austin Schuh48507722021-07-17 17:29:24 -0700313 },
314 {
315 "name": "/d",
316 "type": "aos.logger.testing.TestMessage",
317 "source_node": "pi2",
318 "destination_nodes": [
319 {
320 "name": "pi1"
321 }
322 ]
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800323 }
324 ],
325 "nodes": [
326 {
327 "name": "pi1"
328 },
329 {
330 "name": "pi2"
331 },
332 {
333 "name": "pi3"
334 }
335 ]
336}
337)")),
338 config0_(MakeHeader(config_, R"({
339 /* 100ms */
340 "max_out_of_order_duration": 100000000,
341 "node": {
342 "name": "pi1"
343 },
344 "logger_node": {
345 "name": "pi1"
346 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800347 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800348 "realtime_start_time": 1000000000000,
349 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700350 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
351 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
352 "boot_uuids": [
353 "1d782c63-b3c7-466e-bea9-a01308b43333",
354 "",
355 ""
356 ],
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800357 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
358 "parts_index": 0
359})")),
360 config1_(MakeHeader(config_,
361 R"({
362 /* 100ms */
363 "max_out_of_order_duration": 100000000,
364 "node": {
365 "name": "pi1"
366 },
367 "logger_node": {
368 "name": "pi1"
369 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800370 "monotonic_start_time": 1000000,
371 "realtime_start_time": 1000000000000,
372 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700373 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
374 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
375 "boot_uuids": [
376 "1d782c63-b3c7-466e-bea9-a01308b43333",
377 "",
378 ""
379 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800380 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
381 "parts_index": 0
382})")),
383 config2_(MakeHeader(config_,
384 R"({
385 /* 100ms */
386 "max_out_of_order_duration": 100000000,
387 "node": {
388 "name": "pi2"
389 },
390 "logger_node": {
391 "name": "pi2"
392 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800393 "monotonic_start_time": 0,
394 "realtime_start_time": 1000000000000,
Austin Schuh48507722021-07-17 17:29:24 -0700395 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
396 "logger_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
397 "boot_uuids": [
398 "",
399 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
400 ""
401 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800402 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
403 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
404 "parts_index": 0
405})")),
406 config3_(MakeHeader(config_,
407 R"({
408 /* 100ms */
409 "max_out_of_order_duration": 100000000,
410 "node": {
411 "name": "pi1"
412 },
413 "logger_node": {
414 "name": "pi1"
415 },
416 "monotonic_start_time": 2000000,
417 "realtime_start_time": 1000000000,
418 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
Austin Schuh48507722021-07-17 17:29:24 -0700419 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
420 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
421 "boot_uuids": [
422 "1d782c63-b3c7-466e-bea9-a01308b43333",
423 "",
424 ""
425 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800426 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800427 "parts_index": 0
Austin Schuh8bf1e632021-01-02 22:41:04 -0800428})")),
429 config4_(MakeHeader(config_,
430 R"({
431 /* 100ms */
432 "max_out_of_order_duration": 100000000,
433 "node": {
434 "name": "pi2"
435 },
436 "logger_node": {
437 "name": "pi1"
438 },
439 "monotonic_start_time": 2000000,
440 "realtime_start_time": 1000000000,
441 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
442 "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
Austin Schuh48507722021-07-17 17:29:24 -0700443 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
444 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
445 "boot_uuids": [
446 "1d782c63-b3c7-466e-bea9-a01308b43333",
447 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
448 ""
449 ],
Austin Schuh8bf1e632021-01-02 22:41:04 -0800450 "parts_index": 0
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800451})")) {
452 unlink(logfile0_.c_str());
453 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800454 unlink(logfile2_.c_str());
Austin Schuh48507722021-07-17 17:29:24 -0700455 unlink(logfile3_.c_str());
Austin Schuh921b2562021-07-31 19:37:03 -0700456 queue_index_.resize(config_.message().channels()->size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800457 }
458
459 protected:
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800460 flatbuffers::DetachedBuffer MakeLogMessage(
461 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
462 int value) {
463 flatbuffers::FlatBufferBuilder message_fbb;
464 message_fbb.ForceDefaults(true);
465 TestMessage::Builder test_message_builder(message_fbb);
466 test_message_builder.add_value(value);
467 message_fbb.Finish(test_message_builder.Finish());
468
469 aos::Context context;
470 context.monotonic_event_time = monotonic_now;
471 context.realtime_event_time = aos::realtime_clock::epoch() +
472 chrono::seconds(1000) +
473 monotonic_now.time_since_epoch();
Austin Schuh921b2562021-07-31 19:37:03 -0700474 CHECK_LT(static_cast<size_t>(channel_index), queue_index_.size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800475 context.queue_index = queue_index_[channel_index];
476 context.size = message_fbb.GetSize();
477 context.data = message_fbb.GetBufferPointer();
478
479 ++queue_index_[channel_index];
480
481 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh53e3a812021-11-03 16:57:23 -0700482 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800483 fbb.FinishSizePrefixed(
484 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
485
486 return fbb.Release();
487 }
488
489 flatbuffers::DetachedBuffer MakeTimestampMessage(
490 const aos::monotonic_clock::time_point sender_monotonic_now,
Austin Schuh8bf1e632021-01-02 22:41:04 -0800491 int channel_index, chrono::nanoseconds receiver_monotonic_offset,
492 monotonic_clock::time_point monotonic_timestamp_time =
493 monotonic_clock::min_time) {
494 const monotonic_clock::time_point monotonic_sent_time =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800495 sender_monotonic_now + receiver_monotonic_offset;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800496
497 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800498 fbb.ForceDefaults(true);
499
500 logger::MessageHeader::Builder message_header_builder(fbb);
501
502 message_header_builder.add_channel_index(channel_index);
503
504 message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
505 100);
506 message_header_builder.add_monotonic_sent_time(
507 monotonic_sent_time.time_since_epoch().count());
508 message_header_builder.add_realtime_sent_time(
509 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
510 monotonic_sent_time.time_since_epoch())
511 .time_since_epoch()
512 .count());
513
514 message_header_builder.add_monotonic_remote_time(
515 sender_monotonic_now.time_since_epoch().count());
516 message_header_builder.add_realtime_remote_time(
517 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
518 sender_monotonic_now.time_since_epoch())
519 .time_since_epoch()
520 .count());
521 message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
522 1);
523
524 if (monotonic_timestamp_time != monotonic_clock::min_time) {
525 message_header_builder.add_monotonic_timestamp_time(
526 monotonic_timestamp_time.time_since_epoch().count());
527 }
528
529 fbb.FinishSizePrefixed(message_header_builder.Finish());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800530 LOG(INFO) << aos::FlatbufferToJson(
531 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
532 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
533
534 return fbb.Release();
535 }
536
537 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
538 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800539 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh48507722021-07-17 17:29:24 -0700540 const std::string logfile3_ = aos::testing::TestTmpDir() + "/log3.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800541
542 const aos::FlatbufferDetachedBuffer<Configuration> config_;
543 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
544 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800545 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
546 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800547 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800548
549 std::vector<uint32_t> queue_index_;
550};
551
552using LogPartsSorterTest = SortingElementTest;
553using LogPartsSorterDeathTest = LogPartsSorterTest;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800554using NodeMergerTest = SortingElementTest;
Austin Schuhd2f96102020-12-01 20:27:29 -0800555using TimestampMapperTest = SortingElementTest;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800556
557// Tests that we can pull messages out of a log sorted in order.
558TEST_F(LogPartsSorterTest, Pull) {
559 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
560 {
561 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
562 writer.QueueSpan(config0_.span());
563 writer.QueueSizedFlatbuffer(
564 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
565 writer.QueueSizedFlatbuffer(
566 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
567 writer.QueueSizedFlatbuffer(
568 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
569 writer.QueueSizedFlatbuffer(
570 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
571 }
572
573 const std::vector<LogFile> parts = SortParts({logfile0_});
574
575 LogPartsSorter parts_sorter(parts[0].parts[0]);
576
577 // Confirm we aren't sorted until any time until the message is popped.
578 // Peeking shouldn't change the sorted until time.
579 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
580
581 std::deque<Message> output;
582
583 ASSERT_TRUE(parts_sorter.Front() != nullptr);
584 output.emplace_back(std::move(*parts_sorter.Front()));
585 parts_sorter.PopFront();
586 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
587
588 ASSERT_TRUE(parts_sorter.Front() != nullptr);
589 output.emplace_back(std::move(*parts_sorter.Front()));
590 parts_sorter.PopFront();
591 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
592
593 ASSERT_TRUE(parts_sorter.Front() != nullptr);
594 output.emplace_back(std::move(*parts_sorter.Front()));
595 parts_sorter.PopFront();
596 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
597
598 ASSERT_TRUE(parts_sorter.Front() != nullptr);
599 output.emplace_back(std::move(*parts_sorter.Front()));
600 parts_sorter.PopFront();
601 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
602
603 ASSERT_TRUE(parts_sorter.Front() == nullptr);
604
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700605 EXPECT_EQ(output[0].timestamp.boot, 0);
606 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
607 EXPECT_EQ(output[1].timestamp.boot, 0);
608 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1000));
609 EXPECT_EQ(output[2].timestamp.boot, 0);
610 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1901));
611 EXPECT_EQ(output[3].timestamp.boot, 0);
612 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800613}
614
Austin Schuhb000de62020-12-03 22:00:40 -0800615// Tests that we can pull messages out of a log sorted in order.
616TEST_F(LogPartsSorterTest, WayBeforeStart) {
617 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
618 {
619 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
620 writer.QueueSpan(config0_.span());
621 writer.QueueSizedFlatbuffer(
622 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
623 writer.QueueSizedFlatbuffer(
624 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
625 writer.QueueSizedFlatbuffer(
626 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
627 writer.QueueSizedFlatbuffer(
628 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
629 writer.QueueSizedFlatbuffer(
630 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
631 }
632
633 const std::vector<LogFile> parts = SortParts({logfile0_});
634
635 LogPartsSorter parts_sorter(parts[0].parts[0]);
636
637 // Confirm we aren't sorted until any time until the message is popped.
638 // Peeking shouldn't change the sorted until time.
639 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
640
641 std::deque<Message> output;
642
643 for (monotonic_clock::time_point t :
644 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
645 e + chrono::milliseconds(1900), monotonic_clock::max_time,
646 monotonic_clock::max_time}) {
647 ASSERT_TRUE(parts_sorter.Front() != nullptr);
648 output.emplace_back(std::move(*parts_sorter.Front()));
649 parts_sorter.PopFront();
650 EXPECT_EQ(parts_sorter.sorted_until(), t);
651 }
652
653 ASSERT_TRUE(parts_sorter.Front() == nullptr);
654
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700655 EXPECT_EQ(output[0].timestamp.boot, 0u);
656 EXPECT_EQ(output[0].timestamp.time, e - chrono::milliseconds(1000));
657 EXPECT_EQ(output[1].timestamp.boot, 0u);
658 EXPECT_EQ(output[1].timestamp.time, e - chrono::milliseconds(500));
659 EXPECT_EQ(output[2].timestamp.boot, 0u);
660 EXPECT_EQ(output[2].timestamp.time, e - chrono::milliseconds(10));
661 EXPECT_EQ(output[3].timestamp.boot, 0u);
662 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(1901));
663 EXPECT_EQ(output[4].timestamp.boot, 0u);
664 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuhb000de62020-12-03 22:00:40 -0800665}
666
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800667// Tests that messages too far out of order trigger death.
668TEST_F(LogPartsSorterDeathTest, Pull) {
669 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
670 {
671 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
672 writer.QueueSpan(config0_.span());
673 writer.QueueSizedFlatbuffer(
674 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
675 writer.QueueSizedFlatbuffer(
676 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
677 writer.QueueSizedFlatbuffer(
678 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
679 // The following message is too far out of order and will trigger the CHECK.
680 writer.QueueSizedFlatbuffer(
681 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
682 }
683
684 const std::vector<LogFile> parts = SortParts({logfile0_});
685
686 LogPartsSorter parts_sorter(parts[0].parts[0]);
687
688 // Confirm we aren't sorted until any time until the message is popped.
689 // Peeking shouldn't change the sorted until time.
690 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
691 std::deque<Message> output;
692
693 ASSERT_TRUE(parts_sorter.Front() != nullptr);
694 parts_sorter.PopFront();
695 ASSERT_TRUE(parts_sorter.Front() != nullptr);
696 ASSERT_TRUE(parts_sorter.Front() != nullptr);
697 parts_sorter.PopFront();
698
Austin Schuh58646e22021-08-23 23:51:46 -0700699 EXPECT_DEATH({ parts_sorter.Front(); },
700 "Max out of order of 100000000ns exceeded.");
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800701}
702
Austin Schuh8f52ed52020-11-30 23:12:39 -0800703// Tests that we can merge data from 2 separate files, including duplicate data.
704TEST_F(NodeMergerTest, TwoFileMerger) {
705 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
706 {
707 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
708 writer0.QueueSpan(config0_.span());
709 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
710 writer1.QueueSpan(config1_.span());
711
712 writer0.QueueSizedFlatbuffer(
713 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
714 writer1.QueueSizedFlatbuffer(
715 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
716
717 writer0.QueueSizedFlatbuffer(
718 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
719 writer1.QueueSizedFlatbuffer(
720 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
721
722 // Make a duplicate!
723 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
724 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
725 writer0.QueueSpan(msg.span());
726 writer1.QueueSpan(msg.span());
727
728 writer1.QueueSizedFlatbuffer(
729 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
730 }
731
732 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Austin Schuhd2f96102020-12-01 20:27:29 -0800733 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800734
Austin Schuhd2f96102020-12-01 20:27:29 -0800735 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800736
737 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
738
739 std::deque<Message> output;
740
741 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
742 ASSERT_TRUE(merger.Front() != nullptr);
743 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
744
745 output.emplace_back(std::move(*merger.Front()));
746 merger.PopFront();
747 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
748
749 ASSERT_TRUE(merger.Front() != nullptr);
750 output.emplace_back(std::move(*merger.Front()));
751 merger.PopFront();
752 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
753
754 ASSERT_TRUE(merger.Front() != nullptr);
755 output.emplace_back(std::move(*merger.Front()));
756 merger.PopFront();
757 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
758
759 ASSERT_TRUE(merger.Front() != nullptr);
760 output.emplace_back(std::move(*merger.Front()));
761 merger.PopFront();
762 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
763
764 ASSERT_TRUE(merger.Front() != nullptr);
765 output.emplace_back(std::move(*merger.Front()));
766 merger.PopFront();
767 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
768
769 ASSERT_TRUE(merger.Front() != nullptr);
770 output.emplace_back(std::move(*merger.Front()));
771 merger.PopFront();
772 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
773
774 ASSERT_TRUE(merger.Front() == nullptr);
775
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700776 EXPECT_EQ(output[0].timestamp.boot, 0u);
777 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
778 EXPECT_EQ(output[1].timestamp.boot, 0u);
779 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1001));
780 EXPECT_EQ(output[2].timestamp.boot, 0u);
781 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1002));
782 EXPECT_EQ(output[3].timestamp.boot, 0u);
783 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
784 EXPECT_EQ(output[4].timestamp.boot, 0u);
785 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(3000));
786 EXPECT_EQ(output[5].timestamp.boot, 0u);
787 EXPECT_EQ(output[5].timestamp.time, e + chrono::milliseconds(3002));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800788}
789
Austin Schuh8bf1e632021-01-02 22:41:04 -0800790// Tests that we can merge timestamps with various combinations of
791// monotonic_timestamp_time.
792TEST_F(NodeMergerTest, TwoFileTimestampMerger) {
793 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
794 {
795 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
796 writer0.QueueSpan(config0_.span());
797 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
798 writer1.QueueSpan(config1_.span());
799
800 // Neither has it.
801 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
802 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
803 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
804 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
805 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
806
807 // First only has it.
808 MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
809 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
810 e + chrono::milliseconds(1001), 0, chrono::seconds(100),
811 e + chrono::nanoseconds(971)));
812 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
813 e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
814
815 // Second only has it.
816 MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
817 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
818 e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
819 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
820 e + chrono::milliseconds(1002), 0, chrono::seconds(100),
821 e + chrono::nanoseconds(972)));
822
823 // Both have it.
824 MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
825 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
826 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
827 e + chrono::nanoseconds(973)));
828 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
829 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
830 e + chrono::nanoseconds(973)));
831 }
832
833 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
834 ASSERT_EQ(parts.size(), 1u);
835
836 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
837
838 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
839
840 std::deque<Message> output;
841
842 for (int i = 0; i < 4; ++i) {
843 ASSERT_TRUE(merger.Front() != nullptr);
844 output.emplace_back(std::move(*merger.Front()));
845 merger.PopFront();
846 }
847 ASSERT_TRUE(merger.Front() == nullptr);
848
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700849 EXPECT_EQ(output[0].timestamp.boot, 0u);
850 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700851 EXPECT_FALSE(output[0].data->has_monotonic_timestamp_time);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700852
853 EXPECT_EQ(output[1].timestamp.boot, 0u);
854 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700855 EXPECT_TRUE(output[1].data->has_monotonic_timestamp_time);
856 EXPECT_EQ(output[1].data->monotonic_timestamp_time,
857 monotonic_clock::time_point(std::chrono::nanoseconds(971)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700858
859 EXPECT_EQ(output[2].timestamp.boot, 0u);
860 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700861 EXPECT_TRUE(output[2].data->has_monotonic_timestamp_time);
862 EXPECT_EQ(output[2].data->monotonic_timestamp_time,
863 monotonic_clock::time_point(std::chrono::nanoseconds(972)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700864
865 EXPECT_EQ(output[3].timestamp.boot, 0u);
866 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700867 EXPECT_TRUE(output[3].data->has_monotonic_timestamp_time);
868 EXPECT_EQ(output[3].data->monotonic_timestamp_time,
869 monotonic_clock::time_point(std::chrono::nanoseconds(973)));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800870}
871
Austin Schuhd2f96102020-12-01 20:27:29 -0800872// Tests that we can match timestamps on delivered messages.
873TEST_F(TimestampMapperTest, ReadNode0First) {
874 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
875 {
876 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
877 writer0.QueueSpan(config0_.span());
878 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
879 writer1.QueueSpan(config2_.span());
880
881 writer0.QueueSizedFlatbuffer(
882 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
883 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
884 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
885
886 writer0.QueueSizedFlatbuffer(
887 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
888 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
889 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
890
891 writer0.QueueSizedFlatbuffer(
892 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
893 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
894 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
895 }
896
897 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
898
899 ASSERT_EQ(parts[0].logger_node, "pi1");
900 ASSERT_EQ(parts[1].logger_node, "pi2");
901
Austin Schuh79b30942021-01-24 22:32:21 -0800902 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800903 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -0800904 mapper0.set_timestamp_callback(
905 [&](TimestampedMessage *) { ++mapper0_count; });
906 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800907 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -0800908 mapper1.set_timestamp_callback(
909 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -0800910
911 mapper0.AddPeer(&mapper1);
912 mapper1.AddPeer(&mapper0);
913
914 {
915 std::deque<TimestampedMessage> output0;
916
Austin Schuh79b30942021-01-24 22:32:21 -0800917 EXPECT_EQ(mapper0_count, 0u);
918 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800919 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800920 EXPECT_EQ(mapper0_count, 1u);
921 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800922 output0.emplace_back(std::move(*mapper0.Front()));
923 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700924 EXPECT_TRUE(mapper0.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800925 EXPECT_EQ(mapper0_count, 1u);
926 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800927
928 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800929 EXPECT_EQ(mapper0_count, 2u);
930 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800931 output0.emplace_back(std::move(*mapper0.Front()));
932 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700933 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800934
935 ASSERT_TRUE(mapper0.Front() != nullptr);
936 output0.emplace_back(std::move(*mapper0.Front()));
937 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700938 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800939
Austin Schuh79b30942021-01-24 22:32:21 -0800940 EXPECT_EQ(mapper0_count, 3u);
941 EXPECT_EQ(mapper1_count, 0u);
942
Austin Schuhd2f96102020-12-01 20:27:29 -0800943 ASSERT_TRUE(mapper0.Front() == nullptr);
944
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700945 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
946 EXPECT_EQ(output0[0].monotonic_event_time.time,
947 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700948 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700949
950 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
951 EXPECT_EQ(output0[1].monotonic_event_time.time,
952 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700953 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700954
955 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
956 EXPECT_EQ(output0[2].monotonic_event_time.time,
957 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700958 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -0800959 }
960
961 {
962 SCOPED_TRACE("Trying node1 now");
963 std::deque<TimestampedMessage> output1;
964
Austin Schuh79b30942021-01-24 22:32:21 -0800965 EXPECT_EQ(mapper0_count, 3u);
966 EXPECT_EQ(mapper1_count, 0u);
967
Austin Schuhd2f96102020-12-01 20:27:29 -0800968 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800969 EXPECT_EQ(mapper0_count, 3u);
970 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800971 output1.emplace_back(std::move(*mapper1.Front()));
972 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700973 EXPECT_TRUE(mapper1.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800974 EXPECT_EQ(mapper0_count, 3u);
975 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800976
977 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800978 EXPECT_EQ(mapper0_count, 3u);
979 EXPECT_EQ(mapper1_count, 2u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800980 output1.emplace_back(std::move(*mapper1.Front()));
981 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700982 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800983
984 ASSERT_TRUE(mapper1.Front() != nullptr);
985 output1.emplace_back(std::move(*mapper1.Front()));
986 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700987 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800988
Austin Schuh79b30942021-01-24 22:32:21 -0800989 EXPECT_EQ(mapper0_count, 3u);
990 EXPECT_EQ(mapper1_count, 3u);
991
Austin Schuhd2f96102020-12-01 20:27:29 -0800992 ASSERT_TRUE(mapper1.Front() == nullptr);
993
Austin Schuh79b30942021-01-24 22:32:21 -0800994 EXPECT_EQ(mapper0_count, 3u);
995 EXPECT_EQ(mapper1_count, 3u);
996
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700997 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
998 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -0800999 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001000 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001001
1002 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1003 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001004 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001005 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001006
1007 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1008 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001009 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001010 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001011 }
1012}
1013
Austin Schuh8bf1e632021-01-02 22:41:04 -08001014// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
1015// returned.
1016TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
1017 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1018 {
1019 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1020 writer0.QueueSpan(config0_.span());
1021 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1022 writer1.QueueSpan(config4_.span());
1023
1024 writer0.QueueSizedFlatbuffer(
1025 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1026 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1027 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
1028 e + chrono::nanoseconds(971)));
1029
1030 writer0.QueueSizedFlatbuffer(
1031 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1032 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1033 e + chrono::milliseconds(2000), 0, chrono::seconds(100),
1034 e + chrono::nanoseconds(5458)));
1035
1036 writer0.QueueSizedFlatbuffer(
1037 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1038 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1039 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1040 }
1041
1042 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1043
1044 for (const auto &p : parts) {
1045 LOG(INFO) << p;
1046 }
1047
1048 ASSERT_EQ(parts.size(), 1u);
1049
Austin Schuh79b30942021-01-24 22:32:21 -08001050 size_t mapper0_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001051 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001052 mapper0.set_timestamp_callback(
1053 [&](TimestampedMessage *) { ++mapper0_count; });
1054 size_t mapper1_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001055 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001056 mapper1.set_timestamp_callback(
1057 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh8bf1e632021-01-02 22:41:04 -08001058
1059 mapper0.AddPeer(&mapper1);
1060 mapper1.AddPeer(&mapper0);
1061
1062 {
1063 std::deque<TimestampedMessage> output0;
1064
1065 for (int i = 0; i < 3; ++i) {
1066 ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
1067 output0.emplace_back(std::move(*mapper0.Front()));
1068 mapper0.PopFront();
1069 }
1070
1071 ASSERT_TRUE(mapper0.Front() == nullptr);
1072
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001073 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1074 EXPECT_EQ(output0[0].monotonic_event_time.time,
1075 e + chrono::milliseconds(1000));
1076 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
1077 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
1078 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001079 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001080
1081 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1082 EXPECT_EQ(output0[1].monotonic_event_time.time,
1083 e + chrono::milliseconds(2000));
1084 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
1085 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
1086 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001087 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001088
1089 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1090 EXPECT_EQ(output0[2].monotonic_event_time.time,
1091 e + chrono::milliseconds(3000));
1092 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
1093 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
1094 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001095 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001096 }
1097
1098 {
1099 SCOPED_TRACE("Trying node1 now");
1100 std::deque<TimestampedMessage> output1;
1101
1102 for (int i = 0; i < 3; ++i) {
1103 ASSERT_TRUE(mapper1.Front() != nullptr);
1104 output1.emplace_back(std::move(*mapper1.Front()));
1105 mapper1.PopFront();
1106 }
1107
1108 ASSERT_TRUE(mapper1.Front() == nullptr);
1109
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001110 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1111 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001112 e + chrono::seconds(100) + chrono::milliseconds(1000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001113 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
1114 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001115 e + chrono::nanoseconds(971));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001116 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001117
1118 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1119 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001120 e + chrono::seconds(100) + chrono::milliseconds(2000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001121 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
1122 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001123 e + chrono::nanoseconds(5458));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001124 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001125
1126 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1127 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001128 e + chrono::seconds(100) + chrono::milliseconds(3000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001129 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
1130 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
1131 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001132 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001133 }
Austin Schuh79b30942021-01-24 22:32:21 -08001134
1135 EXPECT_EQ(mapper0_count, 3u);
1136 EXPECT_EQ(mapper1_count, 3u);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001137}
1138
Austin Schuhd2f96102020-12-01 20:27:29 -08001139// Tests that we can match timestamps on delivered messages. By doing this in
1140// the reverse order, the second node needs to queue data up from the first node
1141// to find the matching timestamp.
1142TEST_F(TimestampMapperTest, ReadNode1First) {
1143 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1144 {
1145 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1146 writer0.QueueSpan(config0_.span());
1147 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1148 writer1.QueueSpan(config2_.span());
1149
1150 writer0.QueueSizedFlatbuffer(
1151 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1152 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1153 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1154
1155 writer0.QueueSizedFlatbuffer(
1156 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1157 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1158 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1159
1160 writer0.QueueSizedFlatbuffer(
1161 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1162 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1163 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1164 }
1165
1166 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1167
1168 ASSERT_EQ(parts[0].logger_node, "pi1");
1169 ASSERT_EQ(parts[1].logger_node, "pi2");
1170
Austin Schuh79b30942021-01-24 22:32:21 -08001171 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001172 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001173 mapper0.set_timestamp_callback(
1174 [&](TimestampedMessage *) { ++mapper0_count; });
1175 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001176 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001177 mapper1.set_timestamp_callback(
1178 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001179
1180 mapper0.AddPeer(&mapper1);
1181 mapper1.AddPeer(&mapper0);
1182
1183 {
1184 SCOPED_TRACE("Trying node1 now");
1185 std::deque<TimestampedMessage> output1;
1186
1187 ASSERT_TRUE(mapper1.Front() != nullptr);
1188 output1.emplace_back(std::move(*mapper1.Front()));
1189 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001190 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001191
1192 ASSERT_TRUE(mapper1.Front() != nullptr);
1193 output1.emplace_back(std::move(*mapper1.Front()));
1194 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001195 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001196
1197 ASSERT_TRUE(mapper1.Front() != nullptr);
1198 output1.emplace_back(std::move(*mapper1.Front()));
1199 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001200 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001201
1202 ASSERT_TRUE(mapper1.Front() == nullptr);
1203
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001204 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1205 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001206 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001207 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001208
1209 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1210 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001211 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001212 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001213
1214 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1215 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001216 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001217 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001218 }
1219
1220 {
1221 std::deque<TimestampedMessage> output0;
1222
1223 ASSERT_TRUE(mapper0.Front() != nullptr);
1224 output0.emplace_back(std::move(*mapper0.Front()));
1225 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001226 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001227
1228 ASSERT_TRUE(mapper0.Front() != nullptr);
1229 output0.emplace_back(std::move(*mapper0.Front()));
1230 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001231 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001232
1233 ASSERT_TRUE(mapper0.Front() != nullptr);
1234 output0.emplace_back(std::move(*mapper0.Front()));
1235 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001236 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001237
1238 ASSERT_TRUE(mapper0.Front() == nullptr);
1239
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001240 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1241 EXPECT_EQ(output0[0].monotonic_event_time.time,
1242 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001243 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001244
1245 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1246 EXPECT_EQ(output0[1].monotonic_event_time.time,
1247 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001248 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001249
1250 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1251 EXPECT_EQ(output0[2].monotonic_event_time.time,
1252 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001253 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001254 }
Austin Schuh79b30942021-01-24 22:32:21 -08001255
1256 EXPECT_EQ(mapper0_count, 3u);
1257 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001258}
1259
1260// Tests that we return just the timestamps if we couldn't find the data and the
1261// missing data was at the beginning of the file.
1262TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
1263 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1264 {
1265 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1266 writer0.QueueSpan(config0_.span());
1267 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1268 writer1.QueueSpan(config2_.span());
1269
1270 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
1271 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1272 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1273
1274 writer0.QueueSizedFlatbuffer(
1275 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1276 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1277 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1278
1279 writer0.QueueSizedFlatbuffer(
1280 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1281 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1282 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1283 }
1284
1285 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1286
1287 ASSERT_EQ(parts[0].logger_node, "pi1");
1288 ASSERT_EQ(parts[1].logger_node, "pi2");
1289
Austin Schuh79b30942021-01-24 22:32:21 -08001290 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001291 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001292 mapper0.set_timestamp_callback(
1293 [&](TimestampedMessage *) { ++mapper0_count; });
1294 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001295 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001296 mapper1.set_timestamp_callback(
1297 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001298
1299 mapper0.AddPeer(&mapper1);
1300 mapper1.AddPeer(&mapper0);
1301
1302 {
1303 SCOPED_TRACE("Trying node1 now");
1304 std::deque<TimestampedMessage> output1;
1305
1306 ASSERT_TRUE(mapper1.Front() != nullptr);
1307 output1.emplace_back(std::move(*mapper1.Front()));
1308 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001309 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001310
1311 ASSERT_TRUE(mapper1.Front() != nullptr);
1312 output1.emplace_back(std::move(*mapper1.Front()));
1313 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001314 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001315
1316 ASSERT_TRUE(mapper1.Front() != nullptr);
1317 output1.emplace_back(std::move(*mapper1.Front()));
1318 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001319 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001320
1321 ASSERT_TRUE(mapper1.Front() == nullptr);
1322
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001323 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1324 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001325 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001326 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001327
1328 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1329 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001330 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001331 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001332
1333 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1334 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001335 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001336 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001337 }
Austin Schuh79b30942021-01-24 22:32:21 -08001338
1339 EXPECT_EQ(mapper0_count, 0u);
1340 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001341}
1342
1343// Tests that we return just the timestamps if we couldn't find the data and the
1344// missing data was at the end of the file.
1345TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
1346 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1347 {
1348 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1349 writer0.QueueSpan(config0_.span());
1350 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1351 writer1.QueueSpan(config2_.span());
1352
1353 writer0.QueueSizedFlatbuffer(
1354 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1355 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1356 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1357
1358 writer0.QueueSizedFlatbuffer(
1359 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1360 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1361 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1362
1363 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
1364 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1365 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1366 }
1367
1368 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1369
1370 ASSERT_EQ(parts[0].logger_node, "pi1");
1371 ASSERT_EQ(parts[1].logger_node, "pi2");
1372
Austin Schuh79b30942021-01-24 22:32:21 -08001373 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001374 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001375 mapper0.set_timestamp_callback(
1376 [&](TimestampedMessage *) { ++mapper0_count; });
1377 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001378 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001379 mapper1.set_timestamp_callback(
1380 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001381
1382 mapper0.AddPeer(&mapper1);
1383 mapper1.AddPeer(&mapper0);
1384
1385 {
1386 SCOPED_TRACE("Trying node1 now");
1387 std::deque<TimestampedMessage> output1;
1388
1389 ASSERT_TRUE(mapper1.Front() != nullptr);
1390 output1.emplace_back(std::move(*mapper1.Front()));
1391 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001392 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001393
1394 ASSERT_TRUE(mapper1.Front() != nullptr);
1395 output1.emplace_back(std::move(*mapper1.Front()));
1396 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001397 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001398
1399 ASSERT_TRUE(mapper1.Front() != nullptr);
1400 output1.emplace_back(std::move(*mapper1.Front()));
1401 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001402 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001403
1404 ASSERT_TRUE(mapper1.Front() == nullptr);
1405
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001406 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1407 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001408 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001409 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001410
1411 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1412 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001413 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001414 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001415
1416 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1417 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001418 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001419 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001420 }
Austin Schuh79b30942021-01-24 22:32:21 -08001421
1422 EXPECT_EQ(mapper0_count, 0u);
1423 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001424}
1425
Austin Schuh993ccb52020-12-12 15:59:32 -08001426// Tests that we handle a message which failed to forward or be logged.
1427TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1428 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1429 {
1430 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1431 writer0.QueueSpan(config0_.span());
1432 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1433 writer1.QueueSpan(config2_.span());
1434
1435 writer0.QueueSizedFlatbuffer(
1436 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1437 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1438 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1439
1440 // Create both the timestamp and message, but don't log them, simulating a
1441 // forwarding drop.
1442 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1443 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1444 chrono::seconds(100));
1445
1446 writer0.QueueSizedFlatbuffer(
1447 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1448 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1449 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1450 }
1451
1452 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1453
1454 ASSERT_EQ(parts[0].logger_node, "pi1");
1455 ASSERT_EQ(parts[1].logger_node, "pi2");
1456
Austin Schuh79b30942021-01-24 22:32:21 -08001457 size_t mapper0_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001458 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001459 mapper0.set_timestamp_callback(
1460 [&](TimestampedMessage *) { ++mapper0_count; });
1461 size_t mapper1_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001462 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001463 mapper1.set_timestamp_callback(
1464 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh993ccb52020-12-12 15:59:32 -08001465
1466 mapper0.AddPeer(&mapper1);
1467 mapper1.AddPeer(&mapper0);
1468
1469 {
1470 std::deque<TimestampedMessage> output1;
1471
1472 ASSERT_TRUE(mapper1.Front() != nullptr);
1473 output1.emplace_back(std::move(*mapper1.Front()));
1474 mapper1.PopFront();
1475
1476 ASSERT_TRUE(mapper1.Front() != nullptr);
1477 output1.emplace_back(std::move(*mapper1.Front()));
1478
1479 ASSERT_FALSE(mapper1.Front() == nullptr);
1480
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001481 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1482 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001483 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001484 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001485
1486 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1487 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001488 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001489 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh993ccb52020-12-12 15:59:32 -08001490 }
Austin Schuh79b30942021-01-24 22:32:21 -08001491
1492 EXPECT_EQ(mapper0_count, 0u);
1493 EXPECT_EQ(mapper1_count, 2u);
Austin Schuh993ccb52020-12-12 15:59:32 -08001494}
1495
Austin Schuhd2f96102020-12-01 20:27:29 -08001496// Tests that we properly sort log files with duplicate timestamps.
1497TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1498 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1499 {
1500 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1501 writer0.QueueSpan(config0_.span());
1502 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1503 writer1.QueueSpan(config2_.span());
1504
1505 writer0.QueueSizedFlatbuffer(
1506 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1507 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1508 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1509
1510 writer0.QueueSizedFlatbuffer(
1511 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1512 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1513 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1514
1515 writer0.QueueSizedFlatbuffer(
1516 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
1517 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1518 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1519
1520 writer0.QueueSizedFlatbuffer(
1521 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
1522 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1523 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1524 }
1525
1526 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1527
1528 ASSERT_EQ(parts[0].logger_node, "pi1");
1529 ASSERT_EQ(parts[1].logger_node, "pi2");
1530
Austin Schuh79b30942021-01-24 22:32:21 -08001531 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001532 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001533 mapper0.set_timestamp_callback(
1534 [&](TimestampedMessage *) { ++mapper0_count; });
1535 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001536 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001537 mapper1.set_timestamp_callback(
1538 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001539
1540 mapper0.AddPeer(&mapper1);
1541 mapper1.AddPeer(&mapper0);
1542
1543 {
1544 SCOPED_TRACE("Trying node1 now");
1545 std::deque<TimestampedMessage> output1;
1546
1547 for (int i = 0; i < 4; ++i) {
1548 ASSERT_TRUE(mapper1.Front() != nullptr);
1549 output1.emplace_back(std::move(*mapper1.Front()));
1550 mapper1.PopFront();
1551 }
1552 ASSERT_TRUE(mapper1.Front() == nullptr);
1553
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001554 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1555 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001556 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001557 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001558
1559 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1560 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001561 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001562 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001563
1564 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1565 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001566 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001567 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001568
1569 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1570 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001571 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001572 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001573 }
Austin Schuh79b30942021-01-24 22:32:21 -08001574
1575 EXPECT_EQ(mapper0_count, 0u);
1576 EXPECT_EQ(mapper1_count, 4u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001577}
1578
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001579// Tests that we properly produce a valid start time.
Austin Schuhd2f96102020-12-01 20:27:29 -08001580TEST_F(TimestampMapperTest, StartTime) {
1581 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1582 {
1583 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1584 writer0.QueueSpan(config0_.span());
1585 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1586 writer1.QueueSpan(config1_.span());
1587 DetachedBufferWriter writer2(logfile2_, std::make_unique<DummyEncoder>());
1588 writer2.QueueSpan(config3_.span());
1589 }
1590
1591 const std::vector<LogFile> parts =
1592 SortParts({logfile0_, logfile1_, logfile2_});
1593
Austin Schuh79b30942021-01-24 22:32:21 -08001594 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001595 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001596 mapper0.set_timestamp_callback(
1597 [&](TimestampedMessage *) { ++mapper0_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001598
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001599 EXPECT_EQ(mapper0.monotonic_start_time(0), e + chrono::milliseconds(1));
1600 EXPECT_EQ(mapper0.realtime_start_time(0),
Austin Schuhd2f96102020-12-01 20:27:29 -08001601 realtime_clock::time_point(chrono::seconds(1000)));
Austin Schuh79b30942021-01-24 22:32:21 -08001602 EXPECT_EQ(mapper0_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001603}
1604
Austin Schuhfecf1d82020-12-19 16:57:28 -08001605// Tests that when a peer isn't registered, we treat that as if there was no
1606// data available.
1607TEST_F(TimestampMapperTest, NoPeer) {
1608 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1609 {
1610 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1611 writer0.QueueSpan(config0_.span());
1612 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1613 writer1.QueueSpan(config2_.span());
1614
1615 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
1616 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1617 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1618
1619 writer0.QueueSizedFlatbuffer(
1620 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1621 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1622 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1623
1624 writer0.QueueSizedFlatbuffer(
1625 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1626 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1627 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1628 }
1629
1630 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1631
1632 ASSERT_EQ(parts[0].logger_node, "pi1");
1633 ASSERT_EQ(parts[1].logger_node, "pi2");
1634
Austin Schuh79b30942021-01-24 22:32:21 -08001635 size_t mapper1_count = 0;
Austin Schuhfecf1d82020-12-19 16:57:28 -08001636 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001637 mapper1.set_timestamp_callback(
1638 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhfecf1d82020-12-19 16:57:28 -08001639
1640 {
1641 std::deque<TimestampedMessage> output1;
1642
1643 ASSERT_TRUE(mapper1.Front() != nullptr);
1644 output1.emplace_back(std::move(*mapper1.Front()));
1645 mapper1.PopFront();
1646 ASSERT_TRUE(mapper1.Front() != nullptr);
1647 output1.emplace_back(std::move(*mapper1.Front()));
1648 mapper1.PopFront();
1649 ASSERT_TRUE(mapper1.Front() != nullptr);
1650 output1.emplace_back(std::move(*mapper1.Front()));
1651 mapper1.PopFront();
1652 ASSERT_TRUE(mapper1.Front() == nullptr);
1653
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001654 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1655 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001656 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001657 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001658
1659 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1660 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001661 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001662 EXPECT_FALSE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001663
1664 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1665 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001666 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001667 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001668 }
Austin Schuh79b30942021-01-24 22:32:21 -08001669 EXPECT_EQ(mapper1_count, 3u);
1670}
1671
1672// Tests that we can queue messages and call the timestamp callback for both
1673// nodes.
1674TEST_F(TimestampMapperTest, QueueUntilNode0) {
1675 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1676 {
1677 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1678 writer0.QueueSpan(config0_.span());
1679 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1680 writer1.QueueSpan(config2_.span());
1681
1682 writer0.QueueSizedFlatbuffer(
1683 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1684 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1685 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1686
1687 writer0.QueueSizedFlatbuffer(
1688 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
1689 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1690 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1691
1692 writer0.QueueSizedFlatbuffer(
1693 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
1694 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1695 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1696
1697 writer0.QueueSizedFlatbuffer(
1698 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
1699 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1700 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1701 }
1702
1703 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1704
1705 ASSERT_EQ(parts[0].logger_node, "pi1");
1706 ASSERT_EQ(parts[1].logger_node, "pi2");
1707
1708 size_t mapper0_count = 0;
1709 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1710 mapper0.set_timestamp_callback(
1711 [&](TimestampedMessage *) { ++mapper0_count; });
1712 size_t mapper1_count = 0;
1713 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1714 mapper1.set_timestamp_callback(
1715 [&](TimestampedMessage *) { ++mapper1_count; });
1716
1717 mapper0.AddPeer(&mapper1);
1718 mapper1.AddPeer(&mapper0);
1719
1720 {
1721 std::deque<TimestampedMessage> output0;
1722
1723 EXPECT_EQ(mapper0_count, 0u);
1724 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001725 mapper0.QueueUntil(
1726 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001727 EXPECT_EQ(mapper0_count, 3u);
1728 EXPECT_EQ(mapper1_count, 0u);
1729
1730 ASSERT_TRUE(mapper0.Front() != nullptr);
1731 EXPECT_EQ(mapper0_count, 3u);
1732 EXPECT_EQ(mapper1_count, 0u);
1733
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001734 mapper0.QueueUntil(
1735 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001736 EXPECT_EQ(mapper0_count, 3u);
1737 EXPECT_EQ(mapper1_count, 0u);
1738
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001739 mapper0.QueueUntil(
1740 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001741 EXPECT_EQ(mapper0_count, 4u);
1742 EXPECT_EQ(mapper1_count, 0u);
1743
1744 output0.emplace_back(std::move(*mapper0.Front()));
1745 mapper0.PopFront();
1746 output0.emplace_back(std::move(*mapper0.Front()));
1747 mapper0.PopFront();
1748 output0.emplace_back(std::move(*mapper0.Front()));
1749 mapper0.PopFront();
1750 output0.emplace_back(std::move(*mapper0.Front()));
1751 mapper0.PopFront();
1752
1753 EXPECT_EQ(mapper0_count, 4u);
1754 EXPECT_EQ(mapper1_count, 0u);
1755
1756 ASSERT_TRUE(mapper0.Front() == nullptr);
1757
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001758 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1759 EXPECT_EQ(output0[0].monotonic_event_time.time,
1760 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001761 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001762
1763 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1764 EXPECT_EQ(output0[1].monotonic_event_time.time,
1765 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001766 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001767
1768 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1769 EXPECT_EQ(output0[2].monotonic_event_time.time,
1770 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001771 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001772
1773 EXPECT_EQ(output0[3].monotonic_event_time.boot, 0u);
1774 EXPECT_EQ(output0[3].monotonic_event_time.time,
1775 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001776 EXPECT_TRUE(output0[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001777 }
1778
1779 {
1780 SCOPED_TRACE("Trying node1 now");
1781 std::deque<TimestampedMessage> output1;
1782
1783 EXPECT_EQ(mapper0_count, 4u);
1784 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001785 mapper1.QueueUntil(BootTimestamp{
1786 .boot = 0,
1787 .time = e + chrono::seconds(100) + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001788 EXPECT_EQ(mapper0_count, 4u);
1789 EXPECT_EQ(mapper1_count, 3u);
1790
1791 ASSERT_TRUE(mapper1.Front() != nullptr);
1792 EXPECT_EQ(mapper0_count, 4u);
1793 EXPECT_EQ(mapper1_count, 3u);
1794
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001795 mapper1.QueueUntil(BootTimestamp{
1796 .boot = 0,
1797 .time = e + chrono::seconds(100) + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001798 EXPECT_EQ(mapper0_count, 4u);
1799 EXPECT_EQ(mapper1_count, 3u);
1800
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001801 mapper1.QueueUntil(BootTimestamp{
1802 .boot = 0,
1803 .time = e + chrono::seconds(100) + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001804 EXPECT_EQ(mapper0_count, 4u);
1805 EXPECT_EQ(mapper1_count, 4u);
1806
1807 ASSERT_TRUE(mapper1.Front() != nullptr);
1808 EXPECT_EQ(mapper0_count, 4u);
1809 EXPECT_EQ(mapper1_count, 4u);
1810
1811 output1.emplace_back(std::move(*mapper1.Front()));
1812 mapper1.PopFront();
1813 ASSERT_TRUE(mapper1.Front() != nullptr);
1814 output1.emplace_back(std::move(*mapper1.Front()));
1815 mapper1.PopFront();
1816 ASSERT_TRUE(mapper1.Front() != nullptr);
1817 output1.emplace_back(std::move(*mapper1.Front()));
1818 mapper1.PopFront();
1819 ASSERT_TRUE(mapper1.Front() != nullptr);
1820 output1.emplace_back(std::move(*mapper1.Front()));
1821 mapper1.PopFront();
1822
1823 EXPECT_EQ(mapper0_count, 4u);
1824 EXPECT_EQ(mapper1_count, 4u);
1825
1826 ASSERT_TRUE(mapper1.Front() == nullptr);
1827
1828 EXPECT_EQ(mapper0_count, 4u);
1829 EXPECT_EQ(mapper1_count, 4u);
1830
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001831 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1832 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001833 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001834 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001835
1836 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1837 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001838 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001839 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001840
1841 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1842 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001843 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001844 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001845
1846 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1847 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001848 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001849 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001850 }
Austin Schuhfecf1d82020-12-19 16:57:28 -08001851}
1852
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001853class BootMergerTest : public SortingElementTest {
1854 public:
1855 BootMergerTest()
1856 : SortingElementTest(),
1857 boot0_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07001858 /* 100ms */
1859 "max_out_of_order_duration": 100000000,
1860 "node": {
1861 "name": "pi2"
1862 },
1863 "logger_node": {
1864 "name": "pi1"
1865 },
1866 "monotonic_start_time": 1000000,
1867 "realtime_start_time": 1000000000000,
1868 "logger_monotonic_start_time": 1000000,
1869 "logger_realtime_start_time": 1000000000000,
1870 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
1871 "parts_uuid": "1a9e5ca2-31b2-475b-8282-88f6d1ce5109",
1872 "parts_index": 0,
1873 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
1874 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07001875 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
1876 "boot_uuids": [
1877 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
1878 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
1879 ""
1880 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001881})")),
1882 boot1_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07001883 /* 100ms */
1884 "max_out_of_order_duration": 100000000,
1885 "node": {
1886 "name": "pi2"
1887 },
1888 "logger_node": {
1889 "name": "pi1"
1890 },
1891 "monotonic_start_time": 1000000,
1892 "realtime_start_time": 1000000000000,
1893 "logger_monotonic_start_time": 1000000,
1894 "logger_realtime_start_time": 1000000000000,
1895 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
1896 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
1897 "parts_index": 1,
1898 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
1899 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07001900 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
1901 "boot_uuids": [
1902 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
1903 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
1904 ""
1905 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001906})")) {}
Austin Schuh8bee6882021-06-28 21:03:28 -07001907
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001908 protected:
1909 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0_;
1910 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1_;
1911};
1912
1913// This tests that we can properly sort a multi-node log file which has the old
1914// (and buggy) timestamps in the header, and the non-resetting parts_index.
1915// These make it so we can just bairly figure out what happened first and what
1916// happened second, but not in a way that is robust to multiple nodes rebooting.
1917TEST_F(BootMergerTest, OldReboot) {
Austin Schuh8bee6882021-06-28 21:03:28 -07001918 {
1919 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001920 writer.QueueSpan(boot0_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07001921 }
1922 {
1923 DetachedBufferWriter writer(logfile1_, std::make_unique<DummyEncoder>());
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001924 writer.QueueSpan(boot1_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07001925 }
1926
1927 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1928
1929 ASSERT_EQ(parts.size(), 1u);
1930 ASSERT_EQ(parts[0].parts.size(), 2u);
1931
1932 EXPECT_EQ(parts[0].parts[0].boot_count, 0);
1933 EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001934 boot0_.message().source_node_boot_uuid()->string_view());
Austin Schuh8bee6882021-06-28 21:03:28 -07001935
1936 EXPECT_EQ(parts[0].parts[1].boot_count, 1);
1937 EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001938 boot1_.message().source_node_boot_uuid()->string_view());
1939}
1940
1941// This tests that we can produce messages ordered across a reboot.
1942TEST_F(BootMergerTest, SortAcrossReboot) {
1943 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1944 {
1945 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
1946 writer.QueueSpan(boot0_.span());
1947 writer.QueueSizedFlatbuffer(
1948 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1949 writer.QueueSizedFlatbuffer(
1950 MakeLogMessage(e + chrono::milliseconds(2000), 1, 0x105));
1951 }
1952 {
1953 DetachedBufferWriter writer(logfile1_, std::make_unique<DummyEncoder>());
1954 writer.QueueSpan(boot1_.span());
1955 writer.QueueSizedFlatbuffer(
1956 MakeLogMessage(e + chrono::milliseconds(100), 0, 0x006));
1957 writer.QueueSizedFlatbuffer(
1958 MakeLogMessage(e + chrono::milliseconds(200), 1, 0x106));
1959 }
1960
1961 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1962 ASSERT_EQ(parts.size(), 1u);
1963 ASSERT_EQ(parts[0].parts.size(), 2u);
1964
1965 BootMerger merger(FilterPartsForNode(parts, "pi2"));
1966
1967 EXPECT_EQ(merger.node(), 1u);
1968
1969 std::vector<Message> output;
1970 for (int i = 0; i < 4; ++i) {
1971 ASSERT_TRUE(merger.Front() != nullptr);
1972 output.emplace_back(std::move(*merger.Front()));
1973 merger.PopFront();
1974 }
1975
1976 ASSERT_TRUE(merger.Front() == nullptr);
1977
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001978 EXPECT_EQ(output[0].timestamp.boot, 0u);
1979 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
1980 EXPECT_EQ(output[1].timestamp.boot, 0u);
1981 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(2000));
1982
1983 EXPECT_EQ(output[2].timestamp.boot, 1u);
1984 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(100));
1985 EXPECT_EQ(output[3].timestamp.boot, 1u);
1986 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
Austin Schuh8bee6882021-06-28 21:03:28 -07001987}
1988
Austin Schuh48507722021-07-17 17:29:24 -07001989class RebootTimestampMapperTest : public SortingElementTest {
1990 public:
1991 RebootTimestampMapperTest()
1992 : SortingElementTest(),
1993 boot0a_(MakeHeader(config_, R"({
1994 /* 100ms */
1995 "max_out_of_order_duration": 100000000,
1996 "node": {
1997 "name": "pi1"
1998 },
1999 "logger_node": {
2000 "name": "pi1"
2001 },
2002 "monotonic_start_time": 1000000,
2003 "realtime_start_time": 1000000000000,
2004 "logger_monotonic_start_time": 1000000,
2005 "logger_realtime_start_time": 1000000000000,
2006 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2007 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2008 "parts_index": 0,
2009 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2010 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2011 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2012 "boot_uuids": [
2013 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2014 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2015 ""
2016 ]
2017})")),
2018 boot0b_(MakeHeader(config_, R"({
2019 /* 100ms */
2020 "max_out_of_order_duration": 100000000,
2021 "node": {
2022 "name": "pi1"
2023 },
2024 "logger_node": {
2025 "name": "pi1"
2026 },
2027 "monotonic_start_time": 1000000,
2028 "realtime_start_time": 1000000000000,
2029 "logger_monotonic_start_time": 1000000,
2030 "logger_realtime_start_time": 1000000000000,
2031 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2032 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2033 "parts_index": 1,
2034 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2035 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2036 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2037 "boot_uuids": [
2038 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2039 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2040 ""
2041 ]
2042})")),
2043 boot1a_(MakeHeader(config_, R"({
2044 /* 100ms */
2045 "max_out_of_order_duration": 100000000,
2046 "node": {
2047 "name": "pi2"
2048 },
2049 "logger_node": {
2050 "name": "pi1"
2051 },
2052 "monotonic_start_time": 1000000,
2053 "realtime_start_time": 1000000000000,
2054 "logger_monotonic_start_time": 1000000,
2055 "logger_realtime_start_time": 1000000000000,
2056 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2057 "parts_uuid": "f6ab0cdc-a654-456d-bfd9-2bbc09098edf",
2058 "parts_index": 0,
2059 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2060 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2061 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2062 "boot_uuids": [
2063 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2064 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2065 ""
2066 ]
2067})")),
2068 boot1b_(MakeHeader(config_, R"({
2069 /* 100ms */
2070 "max_out_of_order_duration": 100000000,
2071 "node": {
2072 "name": "pi2"
2073 },
2074 "logger_node": {
2075 "name": "pi1"
2076 },
2077 "monotonic_start_time": 1000000,
2078 "realtime_start_time": 1000000000000,
2079 "logger_monotonic_start_time": 1000000,
2080 "logger_realtime_start_time": 1000000000000,
2081 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2082 "parts_uuid": "3a9d0445-f520-43ca-93f5-e2cc7f54d40a",
2083 "parts_index": 1,
2084 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2085 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2086 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2087 "boot_uuids": [
2088 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2089 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2090 ""
2091 ]
2092})")) {}
2093
2094 protected:
2095 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0a_;
2096 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0b_;
2097 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1a_;
2098 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
2099};
2100
Austin Schuh48507722021-07-17 17:29:24 -07002101// Tests that we can match timestamps on delivered messages in the presence of
2102// reboots on the node receiving timestamps.
2103TEST_F(RebootTimestampMapperTest, ReadNode0First) {
2104 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2105 {
2106 DetachedBufferWriter writer0a(logfile0_, std::make_unique<DummyEncoder>());
2107 writer0a.QueueSpan(boot0a_.span());
2108 DetachedBufferWriter writer0b(logfile1_, std::make_unique<DummyEncoder>());
2109 writer0b.QueueSpan(boot0b_.span());
2110 DetachedBufferWriter writer1a(logfile2_, std::make_unique<DummyEncoder>());
2111 writer1a.QueueSpan(boot1a_.span());
2112 DetachedBufferWriter writer1b(logfile3_, std::make_unique<DummyEncoder>());
2113 writer1b.QueueSpan(boot1b_.span());
2114
2115 writer0a.QueueSizedFlatbuffer(
2116 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
2117 writer1a.QueueSizedFlatbuffer(MakeTimestampMessage(
2118 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
2119 e + chrono::milliseconds(1001)));
2120
Austin Schuh58646e22021-08-23 23:51:46 -07002121 writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
2122 e + chrono::milliseconds(1000), 0, chrono::seconds(21),
2123 e + chrono::milliseconds(2001)));
2124
Austin Schuh48507722021-07-17 17:29:24 -07002125 writer0b.QueueSizedFlatbuffer(
2126 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
2127 writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
2128 e + chrono::milliseconds(2000), 0, chrono::seconds(20),
2129 e + chrono::milliseconds(2001)));
2130
2131 writer0b.QueueSizedFlatbuffer(
2132 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
2133 writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
2134 e + chrono::milliseconds(3000), 0, chrono::seconds(20),
2135 e + chrono::milliseconds(3001)));
2136 }
2137
Austin Schuh58646e22021-08-23 23:51:46 -07002138 const std::vector<LogFile> parts =
2139 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Austin Schuh48507722021-07-17 17:29:24 -07002140
2141 for (const auto &x : parts) {
2142 LOG(INFO) << x;
2143 }
2144 ASSERT_EQ(parts.size(), 1u);
2145 ASSERT_EQ(parts[0].logger_node, "pi1");
2146
2147 size_t mapper0_count = 0;
2148 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
2149 mapper0.set_timestamp_callback(
2150 [&](TimestampedMessage *) { ++mapper0_count; });
2151 size_t mapper1_count = 0;
2152 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
2153 mapper1.set_timestamp_callback(
2154 [&](TimestampedMessage *) { ++mapper1_count; });
2155
2156 mapper0.AddPeer(&mapper1);
2157 mapper1.AddPeer(&mapper0);
2158
2159 {
2160 std::deque<TimestampedMessage> output0;
2161
2162 EXPECT_EQ(mapper0_count, 0u);
2163 EXPECT_EQ(mapper1_count, 0u);
2164 ASSERT_TRUE(mapper0.Front() != nullptr);
2165 EXPECT_EQ(mapper0_count, 1u);
2166 EXPECT_EQ(mapper1_count, 0u);
2167 output0.emplace_back(std::move(*mapper0.Front()));
2168 mapper0.PopFront();
2169 EXPECT_TRUE(mapper0.started());
2170 EXPECT_EQ(mapper0_count, 1u);
2171 EXPECT_EQ(mapper1_count, 0u);
2172
2173 ASSERT_TRUE(mapper0.Front() != nullptr);
2174 EXPECT_EQ(mapper0_count, 2u);
2175 EXPECT_EQ(mapper1_count, 0u);
2176 output0.emplace_back(std::move(*mapper0.Front()));
2177 mapper0.PopFront();
2178 EXPECT_TRUE(mapper0.started());
2179
2180 ASSERT_TRUE(mapper0.Front() != nullptr);
2181 output0.emplace_back(std::move(*mapper0.Front()));
2182 mapper0.PopFront();
2183 EXPECT_TRUE(mapper0.started());
2184
2185 EXPECT_EQ(mapper0_count, 3u);
2186 EXPECT_EQ(mapper1_count, 0u);
2187
2188 ASSERT_TRUE(mapper0.Front() == nullptr);
2189
2190 LOG(INFO) << output0[0];
2191 LOG(INFO) << output0[1];
2192 LOG(INFO) << output0[2];
2193
2194 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2195 EXPECT_EQ(output0[0].monotonic_event_time.time,
2196 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002197 EXPECT_EQ(output0[0].queue_index,
2198 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002199 EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
2200 EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002201 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002202
2203 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2204 EXPECT_EQ(output0[1].monotonic_event_time.time,
2205 e + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002206 EXPECT_EQ(output0[1].queue_index,
2207 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002208 EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
2209 EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002210 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002211
2212 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2213 EXPECT_EQ(output0[2].monotonic_event_time.time,
2214 e + chrono::milliseconds(3000));
Austin Schuh58646e22021-08-23 23:51:46 -07002215 EXPECT_EQ(output0[2].queue_index,
2216 (BootQueueIndex{.boot = 0u, .index = 2u}));
Austin Schuh48507722021-07-17 17:29:24 -07002217 EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
2218 EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002219 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002220 }
2221
2222 {
2223 SCOPED_TRACE("Trying node1 now");
2224 std::deque<TimestampedMessage> output1;
2225
2226 EXPECT_EQ(mapper0_count, 3u);
2227 EXPECT_EQ(mapper1_count, 0u);
2228
2229 ASSERT_TRUE(mapper1.Front() != nullptr);
2230 EXPECT_EQ(mapper0_count, 3u);
2231 EXPECT_EQ(mapper1_count, 1u);
2232 output1.emplace_back(std::move(*mapper1.Front()));
2233 mapper1.PopFront();
2234 EXPECT_TRUE(mapper1.started());
2235 EXPECT_EQ(mapper0_count, 3u);
2236 EXPECT_EQ(mapper1_count, 1u);
2237
2238 ASSERT_TRUE(mapper1.Front() != nullptr);
2239 EXPECT_EQ(mapper0_count, 3u);
2240 EXPECT_EQ(mapper1_count, 2u);
2241 output1.emplace_back(std::move(*mapper1.Front()));
2242 mapper1.PopFront();
2243 EXPECT_TRUE(mapper1.started());
2244
2245 ASSERT_TRUE(mapper1.Front() != nullptr);
2246 output1.emplace_back(std::move(*mapper1.Front()));
2247 mapper1.PopFront();
2248 EXPECT_TRUE(mapper1.started());
2249
Austin Schuh58646e22021-08-23 23:51:46 -07002250 ASSERT_TRUE(mapper1.Front() != nullptr);
2251 output1.emplace_back(std::move(*mapper1.Front()));
2252 mapper1.PopFront();
2253 EXPECT_TRUE(mapper1.started());
2254
Austin Schuh48507722021-07-17 17:29:24 -07002255 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002256 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002257
2258 ASSERT_TRUE(mapper1.Front() == nullptr);
2259
2260 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002261 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002262
2263 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2264 EXPECT_EQ(output1[0].monotonic_event_time.time,
2265 e + chrono::seconds(100) + chrono::milliseconds(1000));
2266 EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
2267 EXPECT_EQ(output1[0].monotonic_remote_time.time,
2268 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002269 EXPECT_EQ(output1[0].remote_queue_index,
2270 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002271 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
2272 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
2273 e + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002274 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002275
2276 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2277 EXPECT_EQ(output1[1].monotonic_event_time.time,
2278 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002279 EXPECT_EQ(output1[1].remote_queue_index,
2280 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002281 EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
2282 EXPECT_EQ(output1[1].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002283 e + chrono::milliseconds(1000));
Austin Schuh48507722021-07-17 17:29:24 -07002284 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
2285 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
2286 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002287 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002288
2289 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2290 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002291 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh48507722021-07-17 17:29:24 -07002292 EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
2293 EXPECT_EQ(output1[2].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002294 e + chrono::milliseconds(2000));
2295 EXPECT_EQ(output1[2].remote_queue_index,
2296 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002297 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
2298 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002299 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002300 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002301
Austin Schuh58646e22021-08-23 23:51:46 -07002302 EXPECT_EQ(output1[3].monotonic_event_time.boot, 1u);
2303 EXPECT_EQ(output1[3].monotonic_event_time.time,
2304 e + chrono::seconds(20) + chrono::milliseconds(3000));
2305 EXPECT_EQ(output1[3].monotonic_remote_time.boot, 0u);
2306 EXPECT_EQ(output1[3].monotonic_remote_time.time,
2307 e + chrono::milliseconds(3000));
2308 EXPECT_EQ(output1[3].remote_queue_index,
2309 (BootQueueIndex{.boot = 0u, .index = 2u}));
2310 EXPECT_EQ(output1[3].monotonic_timestamp_time.boot, 0u);
2311 EXPECT_EQ(output1[3].monotonic_timestamp_time.time,
2312 e + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002313 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh58646e22021-08-23 23:51:46 -07002314
Austin Schuh48507722021-07-17 17:29:24 -07002315 LOG(INFO) << output1[0];
2316 LOG(INFO) << output1[1];
2317 LOG(INFO) << output1[2];
Austin Schuh58646e22021-08-23 23:51:46 -07002318 LOG(INFO) << output1[3];
Austin Schuh48507722021-07-17 17:29:24 -07002319 }
2320}
2321
2322TEST_F(RebootTimestampMapperTest, Node2Reboot) {
2323 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2324 {
2325 DetachedBufferWriter writer0a(logfile0_, std::make_unique<DummyEncoder>());
2326 writer0a.QueueSpan(boot0a_.span());
2327 DetachedBufferWriter writer0b(logfile1_, std::make_unique<DummyEncoder>());
2328 writer0b.QueueSpan(boot0b_.span());
2329 DetachedBufferWriter writer1a(logfile2_, std::make_unique<DummyEncoder>());
2330 writer1a.QueueSpan(boot1a_.span());
2331 DetachedBufferWriter writer1b(logfile3_, std::make_unique<DummyEncoder>());
2332 writer1b.QueueSpan(boot1b_.span());
2333
2334 writer1a.QueueSizedFlatbuffer(MakeLogMessage(
2335 e + chrono::seconds(100) + chrono::milliseconds(1000), 3, 0x005));
2336 writer0a.QueueSizedFlatbuffer(MakeTimestampMessage(
2337 e + chrono::seconds(100) + chrono::milliseconds(1000), 3,
2338 chrono::seconds(-100),
2339 e + chrono::seconds(100) + chrono::milliseconds(1001)));
2340
2341 writer1b.QueueSizedFlatbuffer(MakeLogMessage(
2342 e + chrono::seconds(20) + chrono::milliseconds(2000), 3, 0x006));
2343 writer0b.QueueSizedFlatbuffer(MakeTimestampMessage(
2344 e + chrono::seconds(20) + chrono::milliseconds(2000), 3,
2345 chrono::seconds(-20),
2346 e + chrono::seconds(20) + chrono::milliseconds(2001)));
2347
2348 writer1b.QueueSizedFlatbuffer(MakeLogMessage(
2349 e + chrono::seconds(20) + chrono::milliseconds(3000), 3, 0x007));
2350 writer0b.QueueSizedFlatbuffer(MakeTimestampMessage(
2351 e + chrono::seconds(20) + chrono::milliseconds(3000), 3,
2352 chrono::seconds(-20),
2353 e + chrono::seconds(20) + chrono::milliseconds(3001)));
2354 }
2355
2356 const std::vector<LogFile> parts =
2357 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2358
2359 for (const auto &x : parts) {
2360 LOG(INFO) << x;
2361 }
2362 ASSERT_EQ(parts.size(), 1u);
2363 ASSERT_EQ(parts[0].logger_node, "pi1");
2364
2365 size_t mapper0_count = 0;
2366 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
2367 mapper0.set_timestamp_callback(
2368 [&](TimestampedMessage *) { ++mapper0_count; });
2369 size_t mapper1_count = 0;
2370 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
2371 mapper1.set_timestamp_callback(
2372 [&](TimestampedMessage *) { ++mapper1_count; });
2373
2374 mapper0.AddPeer(&mapper1);
2375 mapper1.AddPeer(&mapper0);
2376
2377 {
2378 std::deque<TimestampedMessage> output0;
2379
2380 EXPECT_EQ(mapper0_count, 0u);
2381 EXPECT_EQ(mapper1_count, 0u);
2382 ASSERT_TRUE(mapper0.Front() != nullptr);
2383 EXPECT_EQ(mapper0_count, 1u);
2384 EXPECT_EQ(mapper1_count, 0u);
2385 output0.emplace_back(std::move(*mapper0.Front()));
2386 mapper0.PopFront();
2387 EXPECT_TRUE(mapper0.started());
2388 EXPECT_EQ(mapper0_count, 1u);
2389 EXPECT_EQ(mapper1_count, 0u);
2390
2391 ASSERT_TRUE(mapper0.Front() != nullptr);
2392 EXPECT_EQ(mapper0_count, 2u);
2393 EXPECT_EQ(mapper1_count, 0u);
2394 output0.emplace_back(std::move(*mapper0.Front()));
2395 mapper0.PopFront();
2396 EXPECT_TRUE(mapper0.started());
2397
2398 ASSERT_TRUE(mapper0.Front() != nullptr);
2399 output0.emplace_back(std::move(*mapper0.Front()));
2400 mapper0.PopFront();
2401 EXPECT_TRUE(mapper0.started());
2402
2403 EXPECT_EQ(mapper0_count, 3u);
2404 EXPECT_EQ(mapper1_count, 0u);
2405
2406 ASSERT_TRUE(mapper0.Front() == nullptr);
2407
2408 LOG(INFO) << output0[0];
2409 LOG(INFO) << output0[1];
2410 LOG(INFO) << output0[2];
2411
2412 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2413 EXPECT_EQ(output0[0].monotonic_event_time.time,
2414 e + chrono::milliseconds(1000));
2415 EXPECT_EQ(output0[0].monotonic_remote_time.boot, 0u);
2416 EXPECT_EQ(output0[0].monotonic_remote_time.time,
2417 e + chrono::seconds(100) + chrono::milliseconds(1000));
2418 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
2419 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
2420 e + chrono::seconds(100) + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002421 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002422
2423 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2424 EXPECT_EQ(output0[1].monotonic_event_time.time,
2425 e + chrono::milliseconds(2000));
2426 EXPECT_EQ(output0[1].monotonic_remote_time.boot, 1u);
2427 EXPECT_EQ(output0[1].monotonic_remote_time.time,
2428 e + chrono::seconds(20) + chrono::milliseconds(2000));
2429 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
2430 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
2431 e + chrono::seconds(20) + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002432 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002433
2434 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2435 EXPECT_EQ(output0[2].monotonic_event_time.time,
2436 e + chrono::milliseconds(3000));
2437 EXPECT_EQ(output0[2].monotonic_remote_time.boot, 1u);
2438 EXPECT_EQ(output0[2].monotonic_remote_time.time,
2439 e + chrono::seconds(20) + chrono::milliseconds(3000));
2440 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
2441 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
2442 e + chrono::seconds(20) + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002443 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002444 }
2445
2446 {
2447 SCOPED_TRACE("Trying node1 now");
2448 std::deque<TimestampedMessage> output1;
2449
2450 EXPECT_EQ(mapper0_count, 3u);
2451 EXPECT_EQ(mapper1_count, 0u);
2452
2453 ASSERT_TRUE(mapper1.Front() != nullptr);
2454 EXPECT_EQ(mapper0_count, 3u);
2455 EXPECT_EQ(mapper1_count, 1u);
2456 output1.emplace_back(std::move(*mapper1.Front()));
2457 mapper1.PopFront();
2458 EXPECT_TRUE(mapper1.started());
2459 EXPECT_EQ(mapper0_count, 3u);
2460 EXPECT_EQ(mapper1_count, 1u);
2461
2462 ASSERT_TRUE(mapper1.Front() != nullptr);
2463 EXPECT_EQ(mapper0_count, 3u);
2464 EXPECT_EQ(mapper1_count, 2u);
2465 output1.emplace_back(std::move(*mapper1.Front()));
2466 mapper1.PopFront();
2467 EXPECT_TRUE(mapper1.started());
2468
2469 ASSERT_TRUE(mapper1.Front() != nullptr);
2470 output1.emplace_back(std::move(*mapper1.Front()));
2471 mapper1.PopFront();
2472 EXPECT_TRUE(mapper1.started());
2473
2474 EXPECT_EQ(mapper0_count, 3u);
2475 EXPECT_EQ(mapper1_count, 3u);
2476
2477 ASSERT_TRUE(mapper1.Front() == nullptr);
2478
2479 EXPECT_EQ(mapper0_count, 3u);
2480 EXPECT_EQ(mapper1_count, 3u);
2481
2482 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2483 EXPECT_EQ(output1[0].monotonic_event_time.time,
2484 e + chrono::seconds(100) + chrono::milliseconds(1000));
2485 EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
2486 EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002487 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002488
2489 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2490 EXPECT_EQ(output1[1].monotonic_event_time.time,
2491 e + chrono::seconds(20) + chrono::milliseconds(2000));
2492 EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
2493 EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002494 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002495
2496 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2497 EXPECT_EQ(output1[2].monotonic_event_time.time,
2498 e + chrono::seconds(20) + chrono::milliseconds(3000));
2499 EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
2500 EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002501 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002502
2503 LOG(INFO) << output1[0];
2504 LOG(INFO) << output1[1];
2505 LOG(INFO) << output1[2];
2506 }
2507}
2508
Austin Schuh44c61472021-11-22 21:04:10 -08002509class SortingDeathTest : public SortingElementTest {
2510 public:
2511 SortingDeathTest()
2512 : SortingElementTest(),
2513 part0_(MakeHeader(config_, R"({
2514 /* 100ms */
2515 "max_out_of_order_duration": 100000000,
2516 "node": {
2517 "name": "pi1"
2518 },
2519 "logger_node": {
2520 "name": "pi1"
2521 },
2522 "monotonic_start_time": 1000000,
2523 "realtime_start_time": 1000000000000,
2524 "logger_monotonic_start_time": 1000000,
2525 "logger_realtime_start_time": 1000000000000,
2526 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2527 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2528 "parts_index": 0,
2529 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2530 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2531 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2532 "boot_uuids": [
2533 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2534 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2535 ""
2536 ],
2537 "oldest_remote_monotonic_timestamps": [
2538 9223372036854775807,
2539 9223372036854775807,
2540 9223372036854775807
2541 ],
2542 "oldest_local_monotonic_timestamps": [
2543 9223372036854775807,
2544 9223372036854775807,
2545 9223372036854775807
2546 ],
2547 "oldest_remote_unreliable_monotonic_timestamps": [
2548 9223372036854775807,
2549 0,
2550 9223372036854775807
2551 ],
2552 "oldest_local_unreliable_monotonic_timestamps": [
2553 9223372036854775807,
2554 0,
2555 9223372036854775807
2556 ]
2557})")),
2558 part1_(MakeHeader(config_, R"({
2559 /* 100ms */
2560 "max_out_of_order_duration": 100000000,
2561 "node": {
2562 "name": "pi1"
2563 },
2564 "logger_node": {
2565 "name": "pi1"
2566 },
2567 "monotonic_start_time": 1000000,
2568 "realtime_start_time": 1000000000000,
2569 "logger_monotonic_start_time": 1000000,
2570 "logger_realtime_start_time": 1000000000000,
2571 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2572 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2573 "parts_index": 1,
2574 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2575 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2576 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2577 "boot_uuids": [
2578 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2579 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2580 ""
2581 ],
2582 "oldest_remote_monotonic_timestamps": [
2583 9223372036854775807,
2584 9223372036854775807,
2585 9223372036854775807
2586 ],
2587 "oldest_local_monotonic_timestamps": [
2588 9223372036854775807,
2589 9223372036854775807,
2590 9223372036854775807
2591 ],
2592 "oldest_remote_unreliable_monotonic_timestamps": [
2593 9223372036854775807,
2594 100000,
2595 9223372036854775807
2596 ],
2597 "oldest_local_unreliable_monotonic_timestamps": [
2598 9223372036854775807,
2599 100000,
2600 9223372036854775807
2601 ]
2602})")),
2603 part2_(MakeHeader(config_, R"({
2604 /* 100ms */
2605 "max_out_of_order_duration": 100000000,
2606 "node": {
2607 "name": "pi1"
2608 },
2609 "logger_node": {
2610 "name": "pi1"
2611 },
2612 "monotonic_start_time": 1000000,
2613 "realtime_start_time": 1000000000000,
2614 "logger_monotonic_start_time": 1000000,
2615 "logger_realtime_start_time": 1000000000000,
2616 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2617 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2618 "parts_index": 2,
2619 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2620 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2621 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2622 "boot_uuids": [
2623 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2624 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2625 ""
2626 ],
2627 "oldest_remote_monotonic_timestamps": [
2628 9223372036854775807,
2629 9223372036854775807,
2630 9223372036854775807
2631 ],
2632 "oldest_local_monotonic_timestamps": [
2633 9223372036854775807,
2634 9223372036854775807,
2635 9223372036854775807
2636 ],
2637 "oldest_remote_unreliable_monotonic_timestamps": [
2638 9223372036854775807,
2639 200000,
2640 9223372036854775807
2641 ],
2642 "oldest_local_unreliable_monotonic_timestamps": [
2643 9223372036854775807,
2644 200000,
2645 9223372036854775807
2646 ]
2647})")),
2648 part3_(MakeHeader(config_, R"({
2649 /* 100ms */
2650 "max_out_of_order_duration": 100000000,
2651 "node": {
2652 "name": "pi1"
2653 },
2654 "logger_node": {
2655 "name": "pi1"
2656 },
2657 "monotonic_start_time": 1000000,
2658 "realtime_start_time": 1000000000000,
2659 "logger_monotonic_start_time": 1000000,
2660 "logger_realtime_start_time": 1000000000000,
2661 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2662 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2663 "parts_index": 3,
2664 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2665 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2666 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2667 "boot_uuids": [
2668 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2669 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2670 ""
2671 ],
2672 "oldest_remote_monotonic_timestamps": [
2673 9223372036854775807,
2674 9223372036854775807,
2675 9223372036854775807
2676 ],
2677 "oldest_local_monotonic_timestamps": [
2678 9223372036854775807,
2679 9223372036854775807,
2680 9223372036854775807
2681 ],
2682 "oldest_remote_unreliable_monotonic_timestamps": [
2683 9223372036854775807,
2684 300000,
2685 9223372036854775807
2686 ],
2687 "oldest_local_unreliable_monotonic_timestamps": [
2688 9223372036854775807,
2689 300000,
2690 9223372036854775807
2691 ]
2692})")) {}
2693
2694 protected:
2695 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part0_;
2696 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part1_;
2697 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part2_;
2698 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part3_;
2699};
2700
2701// Tests that if 2 computers go back and forth trying to be the same node, we
2702// die in sorting instead of failing to estimate time.
2703TEST_F(SortingDeathTest, FightingNodes) {
2704 {
2705 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
2706 writer0.QueueSpan(part0_.span());
2707 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
2708 writer1.QueueSpan(part1_.span());
2709 DetachedBufferWriter writer2(logfile2_, std::make_unique<DummyEncoder>());
2710 writer2.QueueSpan(part2_.span());
2711 DetachedBufferWriter writer3(logfile3_, std::make_unique<DummyEncoder>());
2712 writer3.QueueSpan(part3_.span());
2713 }
2714
2715 EXPECT_DEATH(
2716 {
2717 const std::vector<LogFile> parts =
2718 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2719 },
2720 "Found overlapping boots on");
2721}
2722
Austin Schuhc243b422020-10-11 15:35:08 -07002723} // namespace testing
2724} // namespace logger
2725} // namespace aos