blob: 763bb7ff563e0190f6fdd10baf9ffc37f79c06a2 [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
4#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07005
Austin Schuhc41603c2020-10-11 16:17:37 -07006#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -07007#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -08008#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -07009#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070010#include "aos/json_to_flatbuffer.h"
11#include "aos/testing/tmpdir.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070012#include "gtest/gtest.h"
Austin Schuhc243b422020-10-11 15:35:08 -070013
14namespace aos {
15namespace logger {
16namespace testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070017namespace chrono = std::chrono;
Austin Schuhc243b422020-10-11 15:35:08 -070018
Austin Schuhe243aaf2020-10-11 15:46:02 -070019// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070020template <typename T>
21SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
22 const std::string_view data) {
23 flatbuffers::FlatBufferBuilder fbb;
24 fbb.ForceDefaults(true);
25 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
26 return fbb.Release();
27}
28
Austin Schuhe243aaf2020-10-11 15:46:02 -070029// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070030TEST(SpanReaderTest, ReadWrite) {
31 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
32 unlink(logfile.c_str());
33
34 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080035 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070036 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080037 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070038
39 {
40 DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -080041 writer.QueueSpan(m1.span());
42 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070043 }
44
45 SpanReader reader(logfile);
46
47 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhcf5f6442021-07-06 10:43:28 -070048 EXPECT_EQ(reader.PeekMessage(), m1.span());
49 EXPECT_EQ(reader.PeekMessage(), m1.span());
Austin Schuhadd6eb32020-11-09 21:24:26 -080050 EXPECT_EQ(reader.ReadMessage(), m1.span());
51 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhcf5f6442021-07-06 10:43:28 -070052 EXPECT_EQ(reader.PeekMessage(), absl::Span<const uint8_t>());
Austin Schuhc243b422020-10-11 15:35:08 -070053 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
54}
55
Austin Schuhe243aaf2020-10-11 15:46:02 -070056// Tests that we can actually parse the resulting messages at a basic level
57// through MessageReader.
58TEST(MessageReaderTest, ReadWrite) {
59 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
60 unlink(logfile.c_str());
61
62 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
63 JsonToSizedFlatbuffer<LogFileHeader>(
64 R"({ "max_out_of_order_duration": 100000000 })");
65 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
66 JsonToSizedFlatbuffer<MessageHeader>(
67 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
68 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
69 JsonToSizedFlatbuffer<MessageHeader>(
70 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
71
72 {
73 DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -080074 writer.QueueSpan(config.span());
75 writer.QueueSpan(m1.span());
76 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -070077 }
78
79 MessageReader reader(logfile);
80
81 EXPECT_EQ(reader.filename(), logfile);
82
83 EXPECT_EQ(
84 reader.max_out_of_order_duration(),
85 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
86 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
87 EXPECT_TRUE(reader.ReadMessage());
88 EXPECT_EQ(reader.newest_timestamp(),
89 monotonic_clock::time_point(chrono::nanoseconds(1)));
90 EXPECT_TRUE(reader.ReadMessage());
91 EXPECT_EQ(reader.newest_timestamp(),
92 monotonic_clock::time_point(chrono::nanoseconds(2)));
93 EXPECT_FALSE(reader.ReadMessage());
94}
95
Austin Schuh32f68492020-11-08 21:45:51 -080096// Tests that we explode when messages are too far out of order.
97TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
98 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
99 unlink(logfile0.c_str());
100
101 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
102 JsonToSizedFlatbuffer<LogFileHeader>(
103 R"({
104 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800105 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800106 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
107 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
108 "parts_index": 0
109})");
110
111 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
112 JsonToSizedFlatbuffer<MessageHeader>(
113 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
114 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
115 JsonToSizedFlatbuffer<MessageHeader>(
116 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
117 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
118 JsonToSizedFlatbuffer<MessageHeader>(
119 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
120
121 {
122 DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800123 writer.QueueSpan(config0.span());
124 writer.QueueSpan(m1.span());
125 writer.QueueSpan(m2.span());
126 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800127 }
128
129 const std::vector<LogFile> parts = SortParts({logfile0});
130
131 PartsMessageReader reader(parts[0].parts[0]);
132
133 EXPECT_TRUE(reader.ReadMessage());
134 EXPECT_TRUE(reader.ReadMessage());
135 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
136}
137
Austin Schuhc41603c2020-10-11 16:17:37 -0700138// Tests that we can transparently re-assemble part files with a
139// PartsMessageReader.
140TEST(PartsMessageReaderTest, ReadWrite) {
141 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
142 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
143 unlink(logfile0.c_str());
144 unlink(logfile1.c_str());
145
146 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
147 JsonToSizedFlatbuffer<LogFileHeader>(
148 R"({
149 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800150 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700151 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
152 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
153 "parts_index": 0
154})");
155 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
156 JsonToSizedFlatbuffer<LogFileHeader>(
157 R"({
158 "max_out_of_order_duration": 200000000,
159 "monotonic_start_time": 0,
160 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800161 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700162 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
163 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
164 "parts_index": 1
165})");
166
167 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
168 JsonToSizedFlatbuffer<MessageHeader>(
169 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
170 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
171 JsonToSizedFlatbuffer<MessageHeader>(
172 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
173
174 {
175 DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800176 writer.QueueSpan(config0.span());
177 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700178 }
179 {
180 DetachedBufferWriter writer(logfile1, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800181 writer.QueueSpan(config1.span());
182 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700183 }
184
185 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
186
187 PartsMessageReader reader(parts[0].parts[0]);
188
189 EXPECT_EQ(reader.filename(), logfile0);
190
191 // Confirm that the timestamps track, and the filename also updates.
192 // Read the first message.
193 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
194 EXPECT_EQ(
195 reader.max_out_of_order_duration(),
196 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
197 EXPECT_TRUE(reader.ReadMessage());
198 EXPECT_EQ(reader.filename(), logfile0);
199 EXPECT_EQ(reader.newest_timestamp(),
200 monotonic_clock::time_point(chrono::nanoseconds(1)));
201 EXPECT_EQ(
202 reader.max_out_of_order_duration(),
203 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
204
205 // Read the second message.
206 EXPECT_TRUE(reader.ReadMessage());
207 EXPECT_EQ(reader.filename(), logfile1);
208 EXPECT_EQ(reader.newest_timestamp(),
209 monotonic_clock::time_point(chrono::nanoseconds(2)));
210 EXPECT_EQ(
211 reader.max_out_of_order_duration(),
212 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
213
214 // And then confirm that reading again returns no message.
215 EXPECT_FALSE(reader.ReadMessage());
216 EXPECT_EQ(reader.filename(), logfile1);
217 EXPECT_EQ(
218 reader.max_out_of_order_duration(),
219 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800220 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Austin Schuhc41603c2020-10-11 16:17:37 -0700221}
Austin Schuh32f68492020-11-08 21:45:51 -0800222
Austin Schuh1be0ce42020-11-29 22:43:26 -0800223// Tests that Message's operator < works as expected.
224TEST(MessageTest, Sorting) {
225 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
226
227 Message m1{.channel_index = 0,
228 .queue_index = 0,
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700229 .timestamp =
230 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
Austin Schuh48507722021-07-17 17:29:24 -0700231 .monotonic_remote_boot = 0xffffff,
232 .monotonic_timestamp_boot = 0xffffff,
Austin Schuh1be0ce42020-11-29 22:43:26 -0800233 .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
234 Message m2{.channel_index = 0,
235 .queue_index = 0,
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700236 .timestamp =
237 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
Austin Schuh48507722021-07-17 17:29:24 -0700238 .monotonic_remote_boot = 0xffffff,
239 .monotonic_timestamp_boot = 0xffffff,
Austin Schuh1be0ce42020-11-29 22:43:26 -0800240 .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
241
242 EXPECT_LT(m1, m2);
243 EXPECT_GE(m2, m1);
244
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700245 m1.timestamp.time = e;
246 m2.timestamp.time = e;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800247
248 m1.channel_index = 1;
249 m2.channel_index = 2;
250
251 EXPECT_LT(m1, m2);
252 EXPECT_GE(m2, m1);
253
254 m1.channel_index = 0;
255 m2.channel_index = 0;
256 m1.queue_index = 0;
257 m2.queue_index = 1;
258
259 EXPECT_LT(m1, m2);
260 EXPECT_GE(m2, m1);
261}
262
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800263aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
264 const aos::FlatbufferDetachedBuffer<Configuration> &config,
265 const std::string_view json) {
266 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh1034a832021-03-31 21:53:26 -0700267 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800268 flatbuffers::Offset<Configuration> config_offset =
269 aos::CopyFlatBuffer(config, &fbb);
270 LogFileHeader::Builder header_builder(fbb);
271 header_builder.add_configuration(config_offset);
272 fbb.Finish(header_builder.Finish());
273 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
274
275 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
276 JsonToFlatbuffer<LogFileHeader>(json));
277 CHECK(header_updates.Verify());
278 flatbuffers::FlatBufferBuilder fbb2;
Austin Schuh1034a832021-03-31 21:53:26 -0700279 fbb2.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800280 fbb2.FinishSizePrefixed(
281 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
282 return fbb2.Release();
283}
284
285class SortingElementTest : public ::testing::Test {
286 public:
287 SortingElementTest()
288 : config_(JsonToFlatbuffer<Configuration>(
289 R"({
290 "channels": [
291 {
292 "name": "/a",
293 "type": "aos.logger.testing.TestMessage",
294 "source_node": "pi1",
295 "destination_nodes": [
296 {
297 "name": "pi2"
298 },
299 {
300 "name": "pi3"
301 }
302 ]
303 },
304 {
305 "name": "/b",
306 "type": "aos.logger.testing.TestMessage",
307 "source_node": "pi1"
308 },
309 {
310 "name": "/c",
311 "type": "aos.logger.testing.TestMessage",
312 "source_node": "pi1"
Austin Schuh48507722021-07-17 17:29:24 -0700313 },
314 {
315 "name": "/d",
316 "type": "aos.logger.testing.TestMessage",
317 "source_node": "pi2",
318 "destination_nodes": [
319 {
320 "name": "pi1"
321 }
322 ]
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800323 }
324 ],
325 "nodes": [
326 {
327 "name": "pi1"
328 },
329 {
330 "name": "pi2"
331 },
332 {
333 "name": "pi3"
334 }
335 ]
336}
337)")),
338 config0_(MakeHeader(config_, R"({
339 /* 100ms */
340 "max_out_of_order_duration": 100000000,
341 "node": {
342 "name": "pi1"
343 },
344 "logger_node": {
345 "name": "pi1"
346 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800347 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800348 "realtime_start_time": 1000000000000,
349 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700350 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
351 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
352 "boot_uuids": [
353 "1d782c63-b3c7-466e-bea9-a01308b43333",
354 "",
355 ""
356 ],
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800357 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
358 "parts_index": 0
359})")),
360 config1_(MakeHeader(config_,
361 R"({
362 /* 100ms */
363 "max_out_of_order_duration": 100000000,
364 "node": {
365 "name": "pi1"
366 },
367 "logger_node": {
368 "name": "pi1"
369 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800370 "monotonic_start_time": 1000000,
371 "realtime_start_time": 1000000000000,
372 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700373 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
374 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
375 "boot_uuids": [
376 "1d782c63-b3c7-466e-bea9-a01308b43333",
377 "",
378 ""
379 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800380 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
381 "parts_index": 0
382})")),
383 config2_(MakeHeader(config_,
384 R"({
385 /* 100ms */
386 "max_out_of_order_duration": 100000000,
387 "node": {
388 "name": "pi2"
389 },
390 "logger_node": {
391 "name": "pi2"
392 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800393 "monotonic_start_time": 0,
394 "realtime_start_time": 1000000000000,
Austin Schuh48507722021-07-17 17:29:24 -0700395 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
396 "logger_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
397 "boot_uuids": [
398 "",
399 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
400 ""
401 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800402 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
403 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
404 "parts_index": 0
405})")),
406 config3_(MakeHeader(config_,
407 R"({
408 /* 100ms */
409 "max_out_of_order_duration": 100000000,
410 "node": {
411 "name": "pi1"
412 },
413 "logger_node": {
414 "name": "pi1"
415 },
416 "monotonic_start_time": 2000000,
417 "realtime_start_time": 1000000000,
418 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
Austin Schuh48507722021-07-17 17:29:24 -0700419 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
420 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
421 "boot_uuids": [
422 "1d782c63-b3c7-466e-bea9-a01308b43333",
423 "",
424 ""
425 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800426 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800427 "parts_index": 0
Austin Schuh8bf1e632021-01-02 22:41:04 -0800428})")),
429 config4_(MakeHeader(config_,
430 R"({
431 /* 100ms */
432 "max_out_of_order_duration": 100000000,
433 "node": {
434 "name": "pi2"
435 },
436 "logger_node": {
437 "name": "pi1"
438 },
439 "monotonic_start_time": 2000000,
440 "realtime_start_time": 1000000000,
441 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
442 "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
Austin Schuh48507722021-07-17 17:29:24 -0700443 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
444 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
445 "boot_uuids": [
446 "1d782c63-b3c7-466e-bea9-a01308b43333",
447 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
448 ""
449 ],
Austin Schuh8bf1e632021-01-02 22:41:04 -0800450 "parts_index": 0
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800451})")) {
452 unlink(logfile0_.c_str());
453 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800454 unlink(logfile2_.c_str());
Austin Schuh48507722021-07-17 17:29:24 -0700455 unlink(logfile3_.c_str());
Austin Schuh921b2562021-07-31 19:37:03 -0700456 queue_index_.resize(config_.message().channels()->size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800457 }
458
459 protected:
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800460 flatbuffers::DetachedBuffer MakeLogMessage(
461 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
462 int value) {
463 flatbuffers::FlatBufferBuilder message_fbb;
464 message_fbb.ForceDefaults(true);
465 TestMessage::Builder test_message_builder(message_fbb);
466 test_message_builder.add_value(value);
467 message_fbb.Finish(test_message_builder.Finish());
468
469 aos::Context context;
470 context.monotonic_event_time = monotonic_now;
471 context.realtime_event_time = aos::realtime_clock::epoch() +
472 chrono::seconds(1000) +
473 monotonic_now.time_since_epoch();
Austin Schuh921b2562021-07-31 19:37:03 -0700474 CHECK_LT(static_cast<size_t>(channel_index), queue_index_.size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800475 context.queue_index = queue_index_[channel_index];
476 context.size = message_fbb.GetSize();
477 context.data = message_fbb.GetBufferPointer();
478
479 ++queue_index_[channel_index];
480
481 flatbuffers::FlatBufferBuilder fbb;
482 fbb.FinishSizePrefixed(
483 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
484
485 return fbb.Release();
486 }
487
488 flatbuffers::DetachedBuffer MakeTimestampMessage(
489 const aos::monotonic_clock::time_point sender_monotonic_now,
Austin Schuh8bf1e632021-01-02 22:41:04 -0800490 int channel_index, chrono::nanoseconds receiver_monotonic_offset,
491 monotonic_clock::time_point monotonic_timestamp_time =
492 monotonic_clock::min_time) {
493 const monotonic_clock::time_point monotonic_sent_time =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800494 sender_monotonic_now + receiver_monotonic_offset;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800495
496 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800497 fbb.ForceDefaults(true);
498
499 logger::MessageHeader::Builder message_header_builder(fbb);
500
501 message_header_builder.add_channel_index(channel_index);
502
503 message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
504 100);
505 message_header_builder.add_monotonic_sent_time(
506 monotonic_sent_time.time_since_epoch().count());
507 message_header_builder.add_realtime_sent_time(
508 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
509 monotonic_sent_time.time_since_epoch())
510 .time_since_epoch()
511 .count());
512
513 message_header_builder.add_monotonic_remote_time(
514 sender_monotonic_now.time_since_epoch().count());
515 message_header_builder.add_realtime_remote_time(
516 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
517 sender_monotonic_now.time_since_epoch())
518 .time_since_epoch()
519 .count());
520 message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
521 1);
522
523 if (monotonic_timestamp_time != monotonic_clock::min_time) {
524 message_header_builder.add_monotonic_timestamp_time(
525 monotonic_timestamp_time.time_since_epoch().count());
526 }
527
528 fbb.FinishSizePrefixed(message_header_builder.Finish());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800529 LOG(INFO) << aos::FlatbufferToJson(
530 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
531 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
532
533 return fbb.Release();
534 }
535
536 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
537 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800538 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh48507722021-07-17 17:29:24 -0700539 const std::string logfile3_ = aos::testing::TestTmpDir() + "/log3.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800540
541 const aos::FlatbufferDetachedBuffer<Configuration> config_;
542 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
543 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800544 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
545 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800546 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800547
548 std::vector<uint32_t> queue_index_;
549};
550
551using LogPartsSorterTest = SortingElementTest;
552using LogPartsSorterDeathTest = LogPartsSorterTest;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800553using NodeMergerTest = SortingElementTest;
Austin Schuhd2f96102020-12-01 20:27:29 -0800554using TimestampMapperTest = SortingElementTest;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800555
556// Tests that we can pull messages out of a log sorted in order.
557TEST_F(LogPartsSorterTest, Pull) {
558 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
559 {
560 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
561 writer.QueueSpan(config0_.span());
562 writer.QueueSizedFlatbuffer(
563 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
564 writer.QueueSizedFlatbuffer(
565 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
566 writer.QueueSizedFlatbuffer(
567 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
568 writer.QueueSizedFlatbuffer(
569 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
570 }
571
572 const std::vector<LogFile> parts = SortParts({logfile0_});
573
574 LogPartsSorter parts_sorter(parts[0].parts[0]);
575
576 // Confirm we aren't sorted until any time until the message is popped.
577 // Peeking shouldn't change the sorted until time.
578 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
579
580 std::deque<Message> output;
581
582 ASSERT_TRUE(parts_sorter.Front() != nullptr);
583 output.emplace_back(std::move(*parts_sorter.Front()));
584 parts_sorter.PopFront();
585 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
586
587 ASSERT_TRUE(parts_sorter.Front() != nullptr);
588 output.emplace_back(std::move(*parts_sorter.Front()));
589 parts_sorter.PopFront();
590 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
591
592 ASSERT_TRUE(parts_sorter.Front() != nullptr);
593 output.emplace_back(std::move(*parts_sorter.Front()));
594 parts_sorter.PopFront();
595 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
596
597 ASSERT_TRUE(parts_sorter.Front() != nullptr);
598 output.emplace_back(std::move(*parts_sorter.Front()));
599 parts_sorter.PopFront();
600 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
601
602 ASSERT_TRUE(parts_sorter.Front() == nullptr);
603
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700604 EXPECT_EQ(output[0].timestamp.boot, 0);
605 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
606 EXPECT_EQ(output[1].timestamp.boot, 0);
607 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1000));
608 EXPECT_EQ(output[2].timestamp.boot, 0);
609 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1901));
610 EXPECT_EQ(output[3].timestamp.boot, 0);
611 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800612}
613
Austin Schuhb000de62020-12-03 22:00:40 -0800614// Tests that we can pull messages out of a log sorted in order.
615TEST_F(LogPartsSorterTest, WayBeforeStart) {
616 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
617 {
618 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
619 writer.QueueSpan(config0_.span());
620 writer.QueueSizedFlatbuffer(
621 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
622 writer.QueueSizedFlatbuffer(
623 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
624 writer.QueueSizedFlatbuffer(
625 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
626 writer.QueueSizedFlatbuffer(
627 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
628 writer.QueueSizedFlatbuffer(
629 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
630 }
631
632 const std::vector<LogFile> parts = SortParts({logfile0_});
633
634 LogPartsSorter parts_sorter(parts[0].parts[0]);
635
636 // Confirm we aren't sorted until any time until the message is popped.
637 // Peeking shouldn't change the sorted until time.
638 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
639
640 std::deque<Message> output;
641
642 for (monotonic_clock::time_point t :
643 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
644 e + chrono::milliseconds(1900), monotonic_clock::max_time,
645 monotonic_clock::max_time}) {
646 ASSERT_TRUE(parts_sorter.Front() != nullptr);
647 output.emplace_back(std::move(*parts_sorter.Front()));
648 parts_sorter.PopFront();
649 EXPECT_EQ(parts_sorter.sorted_until(), t);
650 }
651
652 ASSERT_TRUE(parts_sorter.Front() == nullptr);
653
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700654 EXPECT_EQ(output[0].timestamp.boot, 0u);
655 EXPECT_EQ(output[0].timestamp.time, e - chrono::milliseconds(1000));
656 EXPECT_EQ(output[1].timestamp.boot, 0u);
657 EXPECT_EQ(output[1].timestamp.time, e - chrono::milliseconds(500));
658 EXPECT_EQ(output[2].timestamp.boot, 0u);
659 EXPECT_EQ(output[2].timestamp.time, e - chrono::milliseconds(10));
660 EXPECT_EQ(output[3].timestamp.boot, 0u);
661 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(1901));
662 EXPECT_EQ(output[4].timestamp.boot, 0u);
663 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuhb000de62020-12-03 22:00:40 -0800664}
665
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800666// Tests that messages too far out of order trigger death.
667TEST_F(LogPartsSorterDeathTest, Pull) {
668 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
669 {
670 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
671 writer.QueueSpan(config0_.span());
672 writer.QueueSizedFlatbuffer(
673 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
674 writer.QueueSizedFlatbuffer(
675 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
676 writer.QueueSizedFlatbuffer(
677 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
678 // The following message is too far out of order and will trigger the CHECK.
679 writer.QueueSizedFlatbuffer(
680 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
681 }
682
683 const std::vector<LogFile> parts = SortParts({logfile0_});
684
685 LogPartsSorter parts_sorter(parts[0].parts[0]);
686
687 // Confirm we aren't sorted until any time until the message is popped.
688 // Peeking shouldn't change the sorted until time.
689 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
690 std::deque<Message> output;
691
692 ASSERT_TRUE(parts_sorter.Front() != nullptr);
693 parts_sorter.PopFront();
694 ASSERT_TRUE(parts_sorter.Front() != nullptr);
695 ASSERT_TRUE(parts_sorter.Front() != nullptr);
696 parts_sorter.PopFront();
697
Austin Schuha040c3f2021-02-13 16:09:07 -0800698 EXPECT_DEATH({ parts_sorter.Front(); }, "Max out of order of 100000000ns exceeded.");
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800699}
700
Austin Schuh8f52ed52020-11-30 23:12:39 -0800701// Tests that we can merge data from 2 separate files, including duplicate data.
702TEST_F(NodeMergerTest, TwoFileMerger) {
703 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
704 {
705 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
706 writer0.QueueSpan(config0_.span());
707 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
708 writer1.QueueSpan(config1_.span());
709
710 writer0.QueueSizedFlatbuffer(
711 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
712 writer1.QueueSizedFlatbuffer(
713 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
714
715 writer0.QueueSizedFlatbuffer(
716 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
717 writer1.QueueSizedFlatbuffer(
718 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
719
720 // Make a duplicate!
721 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
722 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
723 writer0.QueueSpan(msg.span());
724 writer1.QueueSpan(msg.span());
725
726 writer1.QueueSizedFlatbuffer(
727 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
728 }
729
730 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Austin Schuhd2f96102020-12-01 20:27:29 -0800731 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800732
Austin Schuhd2f96102020-12-01 20:27:29 -0800733 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800734
735 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
736
737 std::deque<Message> output;
738
739 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
740 ASSERT_TRUE(merger.Front() != nullptr);
741 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
742
743 output.emplace_back(std::move(*merger.Front()));
744 merger.PopFront();
745 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
746
747 ASSERT_TRUE(merger.Front() != nullptr);
748 output.emplace_back(std::move(*merger.Front()));
749 merger.PopFront();
750 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
751
752 ASSERT_TRUE(merger.Front() != nullptr);
753 output.emplace_back(std::move(*merger.Front()));
754 merger.PopFront();
755 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
756
757 ASSERT_TRUE(merger.Front() != nullptr);
758 output.emplace_back(std::move(*merger.Front()));
759 merger.PopFront();
760 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
761
762 ASSERT_TRUE(merger.Front() != nullptr);
763 output.emplace_back(std::move(*merger.Front()));
764 merger.PopFront();
765 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
766
767 ASSERT_TRUE(merger.Front() != nullptr);
768 output.emplace_back(std::move(*merger.Front()));
769 merger.PopFront();
770 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
771
772 ASSERT_TRUE(merger.Front() == nullptr);
773
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700774 EXPECT_EQ(output[0].timestamp.boot, 0u);
775 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
776 EXPECT_EQ(output[1].timestamp.boot, 0u);
777 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1001));
778 EXPECT_EQ(output[2].timestamp.boot, 0u);
779 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1002));
780 EXPECT_EQ(output[3].timestamp.boot, 0u);
781 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
782 EXPECT_EQ(output[4].timestamp.boot, 0u);
783 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(3000));
784 EXPECT_EQ(output[5].timestamp.boot, 0u);
785 EXPECT_EQ(output[5].timestamp.time, e + chrono::milliseconds(3002));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800786}
787
Austin Schuh8bf1e632021-01-02 22:41:04 -0800788// Tests that we can merge timestamps with various combinations of
789// monotonic_timestamp_time.
790TEST_F(NodeMergerTest, TwoFileTimestampMerger) {
791 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
792 {
793 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
794 writer0.QueueSpan(config0_.span());
795 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
796 writer1.QueueSpan(config1_.span());
797
798 // Neither has it.
799 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
800 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
801 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
802 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
803 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
804
805 // First only has it.
806 MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
807 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
808 e + chrono::milliseconds(1001), 0, chrono::seconds(100),
809 e + chrono::nanoseconds(971)));
810 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
811 e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
812
813 // Second only has it.
814 MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
815 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
816 e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
817 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
818 e + chrono::milliseconds(1002), 0, chrono::seconds(100),
819 e + chrono::nanoseconds(972)));
820
821 // Both have it.
822 MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
823 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
824 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
825 e + chrono::nanoseconds(973)));
826 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
827 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
828 e + chrono::nanoseconds(973)));
829 }
830
831 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
832 ASSERT_EQ(parts.size(), 1u);
833
834 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
835
836 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
837
838 std::deque<Message> output;
839
840 for (int i = 0; i < 4; ++i) {
841 ASSERT_TRUE(merger.Front() != nullptr);
842 output.emplace_back(std::move(*merger.Front()));
843 merger.PopFront();
844 }
845 ASSERT_TRUE(merger.Front() == nullptr);
846
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700847 EXPECT_EQ(output[0].timestamp.boot, 0u);
848 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800849 EXPECT_FALSE(output[0].data.message().has_monotonic_timestamp_time());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700850
851 EXPECT_EQ(output[1].timestamp.boot, 0u);
852 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800853 EXPECT_TRUE(output[1].data.message().has_monotonic_timestamp_time());
854 EXPECT_EQ(output[1].data.message().monotonic_timestamp_time(), 971);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700855
856 EXPECT_EQ(output[2].timestamp.boot, 0u);
857 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800858 EXPECT_TRUE(output[2].data.message().has_monotonic_timestamp_time());
859 EXPECT_EQ(output[2].data.message().monotonic_timestamp_time(), 972);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700860
861 EXPECT_EQ(output[3].timestamp.boot, 0u);
862 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800863 EXPECT_TRUE(output[3].data.message().has_monotonic_timestamp_time());
864 EXPECT_EQ(output[3].data.message().monotonic_timestamp_time(), 973);
865}
866
Austin Schuhd2f96102020-12-01 20:27:29 -0800867// Tests that we can match timestamps on delivered messages.
868TEST_F(TimestampMapperTest, ReadNode0First) {
869 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
870 {
871 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
872 writer0.QueueSpan(config0_.span());
873 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
874 writer1.QueueSpan(config2_.span());
875
876 writer0.QueueSizedFlatbuffer(
877 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
878 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
879 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
880
881 writer0.QueueSizedFlatbuffer(
882 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
883 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
884 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
885
886 writer0.QueueSizedFlatbuffer(
887 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
888 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
889 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
890 }
891
892 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
893
894 ASSERT_EQ(parts[0].logger_node, "pi1");
895 ASSERT_EQ(parts[1].logger_node, "pi2");
896
Austin Schuh79b30942021-01-24 22:32:21 -0800897 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800898 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -0800899 mapper0.set_timestamp_callback(
900 [&](TimestampedMessage *) { ++mapper0_count; });
901 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800902 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -0800903 mapper1.set_timestamp_callback(
904 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -0800905
906 mapper0.AddPeer(&mapper1);
907 mapper1.AddPeer(&mapper0);
908
909 {
910 std::deque<TimestampedMessage> output0;
911
Austin Schuh79b30942021-01-24 22:32:21 -0800912 EXPECT_EQ(mapper0_count, 0u);
913 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800914 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800915 EXPECT_EQ(mapper0_count, 1u);
916 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800917 output0.emplace_back(std::move(*mapper0.Front()));
918 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700919 EXPECT_TRUE(mapper0.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800920 EXPECT_EQ(mapper0_count, 1u);
921 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800922
923 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800924 EXPECT_EQ(mapper0_count, 2u);
925 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800926 output0.emplace_back(std::move(*mapper0.Front()));
927 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700928 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800929
930 ASSERT_TRUE(mapper0.Front() != nullptr);
931 output0.emplace_back(std::move(*mapper0.Front()));
932 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700933 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800934
Austin Schuh79b30942021-01-24 22:32:21 -0800935 EXPECT_EQ(mapper0_count, 3u);
936 EXPECT_EQ(mapper1_count, 0u);
937
Austin Schuhd2f96102020-12-01 20:27:29 -0800938 ASSERT_TRUE(mapper0.Front() == nullptr);
939
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700940 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
941 EXPECT_EQ(output0[0].monotonic_event_time.time,
942 e + chrono::milliseconds(1000));
Austin Schuhd2f96102020-12-01 20:27:29 -0800943 EXPECT_TRUE(output0[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700944
945 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
946 EXPECT_EQ(output0[1].monotonic_event_time.time,
947 e + chrono::milliseconds(2000));
Austin Schuhd2f96102020-12-01 20:27:29 -0800948 EXPECT_TRUE(output0[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700949
950 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
951 EXPECT_EQ(output0[2].monotonic_event_time.time,
952 e + chrono::milliseconds(3000));
Austin Schuhd2f96102020-12-01 20:27:29 -0800953 EXPECT_TRUE(output0[2].data.Verify());
954 }
955
956 {
957 SCOPED_TRACE("Trying node1 now");
958 std::deque<TimestampedMessage> output1;
959
Austin Schuh79b30942021-01-24 22:32:21 -0800960 EXPECT_EQ(mapper0_count, 3u);
961 EXPECT_EQ(mapper1_count, 0u);
962
Austin Schuhd2f96102020-12-01 20:27:29 -0800963 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800964 EXPECT_EQ(mapper0_count, 3u);
965 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800966 output1.emplace_back(std::move(*mapper1.Front()));
967 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700968 EXPECT_TRUE(mapper1.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800969 EXPECT_EQ(mapper0_count, 3u);
970 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800971
972 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800973 EXPECT_EQ(mapper0_count, 3u);
974 EXPECT_EQ(mapper1_count, 2u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800975 output1.emplace_back(std::move(*mapper1.Front()));
976 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700977 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800978
979 ASSERT_TRUE(mapper1.Front() != nullptr);
980 output1.emplace_back(std::move(*mapper1.Front()));
981 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700982 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800983
Austin Schuh79b30942021-01-24 22:32:21 -0800984 EXPECT_EQ(mapper0_count, 3u);
985 EXPECT_EQ(mapper1_count, 3u);
986
Austin Schuhd2f96102020-12-01 20:27:29 -0800987 ASSERT_TRUE(mapper1.Front() == nullptr);
988
Austin Schuh79b30942021-01-24 22:32:21 -0800989 EXPECT_EQ(mapper0_count, 3u);
990 EXPECT_EQ(mapper1_count, 3u);
991
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700992 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
993 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -0800994 e + chrono::seconds(100) + chrono::milliseconds(1000));
995 EXPECT_TRUE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700996
997 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
998 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -0800999 e + chrono::seconds(100) + chrono::milliseconds(2000));
1000 EXPECT_TRUE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001001
1002 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1003 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001004 e + chrono::seconds(100) + chrono::milliseconds(3000));
1005 EXPECT_TRUE(output1[2].data.Verify());
1006 }
1007}
1008
Austin Schuh8bf1e632021-01-02 22:41:04 -08001009// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
1010// returned.
1011TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
1012 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1013 {
1014 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1015 writer0.QueueSpan(config0_.span());
1016 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1017 writer1.QueueSpan(config4_.span());
1018
1019 writer0.QueueSizedFlatbuffer(
1020 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1021 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1022 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
1023 e + chrono::nanoseconds(971)));
1024
1025 writer0.QueueSizedFlatbuffer(
1026 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1027 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1028 e + chrono::milliseconds(2000), 0, chrono::seconds(100),
1029 e + chrono::nanoseconds(5458)));
1030
1031 writer0.QueueSizedFlatbuffer(
1032 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1033 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1034 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1035 }
1036
1037 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1038
1039 for (const auto &p : parts) {
1040 LOG(INFO) << p;
1041 }
1042
1043 ASSERT_EQ(parts.size(), 1u);
1044
Austin Schuh79b30942021-01-24 22:32:21 -08001045 size_t mapper0_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001046 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001047 mapper0.set_timestamp_callback(
1048 [&](TimestampedMessage *) { ++mapper0_count; });
1049 size_t mapper1_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001050 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001051 mapper1.set_timestamp_callback(
1052 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh8bf1e632021-01-02 22:41:04 -08001053
1054 mapper0.AddPeer(&mapper1);
1055 mapper1.AddPeer(&mapper0);
1056
1057 {
1058 std::deque<TimestampedMessage> output0;
1059
1060 for (int i = 0; i < 3; ++i) {
1061 ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
1062 output0.emplace_back(std::move(*mapper0.Front()));
1063 mapper0.PopFront();
1064 }
1065
1066 ASSERT_TRUE(mapper0.Front() == nullptr);
1067
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001068 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1069 EXPECT_EQ(output0[0].monotonic_event_time.time,
1070 e + chrono::milliseconds(1000));
1071 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
1072 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
1073 monotonic_clock::min_time);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001074 EXPECT_TRUE(output0[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001075
1076 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1077 EXPECT_EQ(output0[1].monotonic_event_time.time,
1078 e + chrono::milliseconds(2000));
1079 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
1080 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
1081 monotonic_clock::min_time);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001082 EXPECT_TRUE(output0[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001083
1084 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1085 EXPECT_EQ(output0[2].monotonic_event_time.time,
1086 e + chrono::milliseconds(3000));
1087 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
1088 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
1089 monotonic_clock::min_time);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001090 EXPECT_TRUE(output0[2].data.Verify());
1091 }
1092
1093 {
1094 SCOPED_TRACE("Trying node1 now");
1095 std::deque<TimestampedMessage> output1;
1096
1097 for (int i = 0; i < 3; ++i) {
1098 ASSERT_TRUE(mapper1.Front() != nullptr);
1099 output1.emplace_back(std::move(*mapper1.Front()));
1100 mapper1.PopFront();
1101 }
1102
1103 ASSERT_TRUE(mapper1.Front() == nullptr);
1104
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001105 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1106 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001107 e + chrono::seconds(100) + chrono::milliseconds(1000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001108 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
1109 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001110 e + chrono::nanoseconds(971));
1111 EXPECT_TRUE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001112
1113 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1114 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001115 e + chrono::seconds(100) + chrono::milliseconds(2000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001116 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
1117 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001118 e + chrono::nanoseconds(5458));
1119 EXPECT_TRUE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001120
1121 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1122 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001123 e + chrono::seconds(100) + chrono::milliseconds(3000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001124 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
1125 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
1126 monotonic_clock::min_time);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001127 EXPECT_TRUE(output1[2].data.Verify());
1128 }
Austin Schuh79b30942021-01-24 22:32:21 -08001129
1130 EXPECT_EQ(mapper0_count, 3u);
1131 EXPECT_EQ(mapper1_count, 3u);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001132}
1133
Austin Schuhd2f96102020-12-01 20:27:29 -08001134// Tests that we can match timestamps on delivered messages. By doing this in
1135// the reverse order, the second node needs to queue data up from the first node
1136// to find the matching timestamp.
1137TEST_F(TimestampMapperTest, ReadNode1First) {
1138 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1139 {
1140 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1141 writer0.QueueSpan(config0_.span());
1142 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1143 writer1.QueueSpan(config2_.span());
1144
1145 writer0.QueueSizedFlatbuffer(
1146 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1147 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1148 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1149
1150 writer0.QueueSizedFlatbuffer(
1151 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1152 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1153 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1154
1155 writer0.QueueSizedFlatbuffer(
1156 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1157 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1158 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1159 }
1160
1161 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1162
1163 ASSERT_EQ(parts[0].logger_node, "pi1");
1164 ASSERT_EQ(parts[1].logger_node, "pi2");
1165
Austin Schuh79b30942021-01-24 22:32:21 -08001166 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001167 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001168 mapper0.set_timestamp_callback(
1169 [&](TimestampedMessage *) { ++mapper0_count; });
1170 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001171 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001172 mapper1.set_timestamp_callback(
1173 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001174
1175 mapper0.AddPeer(&mapper1);
1176 mapper1.AddPeer(&mapper0);
1177
1178 {
1179 SCOPED_TRACE("Trying node1 now");
1180 std::deque<TimestampedMessage> output1;
1181
1182 ASSERT_TRUE(mapper1.Front() != nullptr);
1183 output1.emplace_back(std::move(*mapper1.Front()));
1184 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001185 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001186
1187 ASSERT_TRUE(mapper1.Front() != nullptr);
1188 output1.emplace_back(std::move(*mapper1.Front()));
1189 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001190 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001191
1192 ASSERT_TRUE(mapper1.Front() != nullptr);
1193 output1.emplace_back(std::move(*mapper1.Front()));
1194 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001195 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001196
1197 ASSERT_TRUE(mapper1.Front() == nullptr);
1198
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001199 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1200 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001201 e + chrono::seconds(100) + chrono::milliseconds(1000));
1202 EXPECT_TRUE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001203
1204 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1205 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001206 e + chrono::seconds(100) + chrono::milliseconds(2000));
1207 EXPECT_TRUE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001208
1209 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1210 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001211 e + chrono::seconds(100) + chrono::milliseconds(3000));
1212 EXPECT_TRUE(output1[2].data.Verify());
1213 }
1214
1215 {
1216 std::deque<TimestampedMessage> output0;
1217
1218 ASSERT_TRUE(mapper0.Front() != nullptr);
1219 output0.emplace_back(std::move(*mapper0.Front()));
1220 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001221 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001222
1223 ASSERT_TRUE(mapper0.Front() != nullptr);
1224 output0.emplace_back(std::move(*mapper0.Front()));
1225 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001226 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001227
1228 ASSERT_TRUE(mapper0.Front() != nullptr);
1229 output0.emplace_back(std::move(*mapper0.Front()));
1230 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001231 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001232
1233 ASSERT_TRUE(mapper0.Front() == nullptr);
1234
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001235 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1236 EXPECT_EQ(output0[0].monotonic_event_time.time,
1237 e + chrono::milliseconds(1000));
Austin Schuhd2f96102020-12-01 20:27:29 -08001238 EXPECT_TRUE(output0[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001239
1240 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1241 EXPECT_EQ(output0[1].monotonic_event_time.time,
1242 e + chrono::milliseconds(2000));
Austin Schuhd2f96102020-12-01 20:27:29 -08001243 EXPECT_TRUE(output0[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001244
1245 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1246 EXPECT_EQ(output0[2].monotonic_event_time.time,
1247 e + chrono::milliseconds(3000));
Austin Schuhd2f96102020-12-01 20:27:29 -08001248 EXPECT_TRUE(output0[2].data.Verify());
1249 }
Austin Schuh79b30942021-01-24 22:32:21 -08001250
1251 EXPECT_EQ(mapper0_count, 3u);
1252 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001253}
1254
1255// Tests that we return just the timestamps if we couldn't find the data and the
1256// missing data was at the beginning of the file.
1257TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
1258 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1259 {
1260 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1261 writer0.QueueSpan(config0_.span());
1262 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1263 writer1.QueueSpan(config2_.span());
1264
1265 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
1266 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1267 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1268
1269 writer0.QueueSizedFlatbuffer(
1270 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1271 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1272 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1273
1274 writer0.QueueSizedFlatbuffer(
1275 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1276 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1277 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1278 }
1279
1280 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1281
1282 ASSERT_EQ(parts[0].logger_node, "pi1");
1283 ASSERT_EQ(parts[1].logger_node, "pi2");
1284
Austin Schuh79b30942021-01-24 22:32:21 -08001285 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001286 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001287 mapper0.set_timestamp_callback(
1288 [&](TimestampedMessage *) { ++mapper0_count; });
1289 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001290 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001291 mapper1.set_timestamp_callback(
1292 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001293
1294 mapper0.AddPeer(&mapper1);
1295 mapper1.AddPeer(&mapper0);
1296
1297 {
1298 SCOPED_TRACE("Trying node1 now");
1299 std::deque<TimestampedMessage> output1;
1300
1301 ASSERT_TRUE(mapper1.Front() != nullptr);
1302 output1.emplace_back(std::move(*mapper1.Front()));
1303 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001304 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001305
1306 ASSERT_TRUE(mapper1.Front() != nullptr);
1307 output1.emplace_back(std::move(*mapper1.Front()));
1308 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001309 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001310
1311 ASSERT_TRUE(mapper1.Front() != nullptr);
1312 output1.emplace_back(std::move(*mapper1.Front()));
1313 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001314 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001315
1316 ASSERT_TRUE(mapper1.Front() == nullptr);
1317
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001318 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1319 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001320 e + chrono::seconds(100) + chrono::milliseconds(1000));
1321 EXPECT_FALSE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001322
1323 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1324 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001325 e + chrono::seconds(100) + chrono::milliseconds(2000));
1326 EXPECT_TRUE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001327
1328 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1329 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001330 e + chrono::seconds(100) + chrono::milliseconds(3000));
1331 EXPECT_TRUE(output1[2].data.Verify());
1332 }
Austin Schuh79b30942021-01-24 22:32:21 -08001333
1334 EXPECT_EQ(mapper0_count, 0u);
1335 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001336}
1337
1338// Tests that we return just the timestamps if we couldn't find the data and the
1339// missing data was at the end of the file.
1340TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
1341 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1342 {
1343 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1344 writer0.QueueSpan(config0_.span());
1345 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1346 writer1.QueueSpan(config2_.span());
1347
1348 writer0.QueueSizedFlatbuffer(
1349 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1350 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1351 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1352
1353 writer0.QueueSizedFlatbuffer(
1354 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1355 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1356 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1357
1358 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
1359 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1360 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1361 }
1362
1363 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1364
1365 ASSERT_EQ(parts[0].logger_node, "pi1");
1366 ASSERT_EQ(parts[1].logger_node, "pi2");
1367
Austin Schuh79b30942021-01-24 22:32:21 -08001368 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001369 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001370 mapper0.set_timestamp_callback(
1371 [&](TimestampedMessage *) { ++mapper0_count; });
1372 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001373 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001374 mapper1.set_timestamp_callback(
1375 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001376
1377 mapper0.AddPeer(&mapper1);
1378 mapper1.AddPeer(&mapper0);
1379
1380 {
1381 SCOPED_TRACE("Trying node1 now");
1382 std::deque<TimestampedMessage> output1;
1383
1384 ASSERT_TRUE(mapper1.Front() != nullptr);
1385 output1.emplace_back(std::move(*mapper1.Front()));
1386 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001387 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001388
1389 ASSERT_TRUE(mapper1.Front() != nullptr);
1390 output1.emplace_back(std::move(*mapper1.Front()));
1391 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001392 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001393
1394 ASSERT_TRUE(mapper1.Front() != nullptr);
1395 output1.emplace_back(std::move(*mapper1.Front()));
1396 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001397 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001398
1399 ASSERT_TRUE(mapper1.Front() == nullptr);
1400
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001401 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1402 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001403 e + chrono::seconds(100) + chrono::milliseconds(1000));
1404 EXPECT_TRUE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001405
1406 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1407 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001408 e + chrono::seconds(100) + chrono::milliseconds(2000));
1409 EXPECT_TRUE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001410
1411 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1412 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001413 e + chrono::seconds(100) + chrono::milliseconds(3000));
1414 EXPECT_FALSE(output1[2].data.Verify());
1415 }
Austin Schuh79b30942021-01-24 22:32:21 -08001416
1417 EXPECT_EQ(mapper0_count, 0u);
1418 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001419}
1420
Austin Schuh993ccb52020-12-12 15:59:32 -08001421// Tests that we handle a message which failed to forward or be logged.
1422TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1423 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1424 {
1425 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1426 writer0.QueueSpan(config0_.span());
1427 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1428 writer1.QueueSpan(config2_.span());
1429
1430 writer0.QueueSizedFlatbuffer(
1431 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1432 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1433 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1434
1435 // Create both the timestamp and message, but don't log them, simulating a
1436 // forwarding drop.
1437 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1438 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1439 chrono::seconds(100));
1440
1441 writer0.QueueSizedFlatbuffer(
1442 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1443 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1444 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1445 }
1446
1447 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1448
1449 ASSERT_EQ(parts[0].logger_node, "pi1");
1450 ASSERT_EQ(parts[1].logger_node, "pi2");
1451
Austin Schuh79b30942021-01-24 22:32:21 -08001452 size_t mapper0_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001453 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001454 mapper0.set_timestamp_callback(
1455 [&](TimestampedMessage *) { ++mapper0_count; });
1456 size_t mapper1_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001457 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001458 mapper1.set_timestamp_callback(
1459 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh993ccb52020-12-12 15:59:32 -08001460
1461 mapper0.AddPeer(&mapper1);
1462 mapper1.AddPeer(&mapper0);
1463
1464 {
1465 std::deque<TimestampedMessage> output1;
1466
1467 ASSERT_TRUE(mapper1.Front() != nullptr);
1468 output1.emplace_back(std::move(*mapper1.Front()));
1469 mapper1.PopFront();
1470
1471 ASSERT_TRUE(mapper1.Front() != nullptr);
1472 output1.emplace_back(std::move(*mapper1.Front()));
1473
1474 ASSERT_FALSE(mapper1.Front() == nullptr);
1475
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001476 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1477 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001478 e + chrono::seconds(100) + chrono::milliseconds(1000));
1479 EXPECT_TRUE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001480
1481 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1482 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001483 e + chrono::seconds(100) + chrono::milliseconds(3000));
1484 EXPECT_TRUE(output1[1].data.Verify());
1485 }
Austin Schuh79b30942021-01-24 22:32:21 -08001486
1487 EXPECT_EQ(mapper0_count, 0u);
1488 EXPECT_EQ(mapper1_count, 2u);
Austin Schuh993ccb52020-12-12 15:59:32 -08001489}
1490
Austin Schuhd2f96102020-12-01 20:27:29 -08001491// Tests that we properly sort log files with duplicate timestamps.
1492TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1493 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1494 {
1495 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1496 writer0.QueueSpan(config0_.span());
1497 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1498 writer1.QueueSpan(config2_.span());
1499
1500 writer0.QueueSizedFlatbuffer(
1501 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1502 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1503 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1504
1505 writer0.QueueSizedFlatbuffer(
1506 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1507 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1508 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1509
1510 writer0.QueueSizedFlatbuffer(
1511 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
1512 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1513 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1514
1515 writer0.QueueSizedFlatbuffer(
1516 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
1517 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1518 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1519 }
1520
1521 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1522
1523 ASSERT_EQ(parts[0].logger_node, "pi1");
1524 ASSERT_EQ(parts[1].logger_node, "pi2");
1525
Austin Schuh79b30942021-01-24 22:32:21 -08001526 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001527 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001528 mapper0.set_timestamp_callback(
1529 [&](TimestampedMessage *) { ++mapper0_count; });
1530 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001531 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001532 mapper1.set_timestamp_callback(
1533 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001534
1535 mapper0.AddPeer(&mapper1);
1536 mapper1.AddPeer(&mapper0);
1537
1538 {
1539 SCOPED_TRACE("Trying node1 now");
1540 std::deque<TimestampedMessage> output1;
1541
1542 for (int i = 0; i < 4; ++i) {
1543 ASSERT_TRUE(mapper1.Front() != nullptr);
1544 output1.emplace_back(std::move(*mapper1.Front()));
1545 mapper1.PopFront();
1546 }
1547 ASSERT_TRUE(mapper1.Front() == nullptr);
1548
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001549 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1550 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001551 e + chrono::seconds(100) + chrono::milliseconds(1000));
1552 EXPECT_TRUE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001553
1554 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1555 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001556 e + chrono::seconds(100) + chrono::milliseconds(2000));
1557 EXPECT_TRUE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001558
1559 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1560 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001561 e + chrono::seconds(100) + chrono::milliseconds(2000));
1562 EXPECT_TRUE(output1[2].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001563
1564 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1565 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001566 e + chrono::seconds(100) + chrono::milliseconds(3000));
1567 EXPECT_TRUE(output1[3].data.Verify());
1568 }
Austin Schuh79b30942021-01-24 22:32:21 -08001569
1570 EXPECT_EQ(mapper0_count, 0u);
1571 EXPECT_EQ(mapper1_count, 4u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001572}
1573
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001574// Tests that we properly produce a valid start time.
Austin Schuhd2f96102020-12-01 20:27:29 -08001575TEST_F(TimestampMapperTest, StartTime) {
1576 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1577 {
1578 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1579 writer0.QueueSpan(config0_.span());
1580 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1581 writer1.QueueSpan(config1_.span());
1582 DetachedBufferWriter writer2(logfile2_, std::make_unique<DummyEncoder>());
1583 writer2.QueueSpan(config3_.span());
1584 }
1585
1586 const std::vector<LogFile> parts =
1587 SortParts({logfile0_, logfile1_, logfile2_});
1588
Austin Schuh79b30942021-01-24 22:32:21 -08001589 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001590 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001591 mapper0.set_timestamp_callback(
1592 [&](TimestampedMessage *) { ++mapper0_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001593
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001594 EXPECT_EQ(mapper0.monotonic_start_time(0), e + chrono::milliseconds(1));
1595 EXPECT_EQ(mapper0.realtime_start_time(0),
Austin Schuhd2f96102020-12-01 20:27:29 -08001596 realtime_clock::time_point(chrono::seconds(1000)));
Austin Schuh79b30942021-01-24 22:32:21 -08001597 EXPECT_EQ(mapper0_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001598}
1599
Austin Schuhfecf1d82020-12-19 16:57:28 -08001600// Tests that when a peer isn't registered, we treat that as if there was no
1601// data available.
1602TEST_F(TimestampMapperTest, NoPeer) {
1603 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1604 {
1605 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1606 writer0.QueueSpan(config0_.span());
1607 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1608 writer1.QueueSpan(config2_.span());
1609
1610 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
1611 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1612 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1613
1614 writer0.QueueSizedFlatbuffer(
1615 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1616 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1617 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1618
1619 writer0.QueueSizedFlatbuffer(
1620 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1621 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1622 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1623 }
1624
1625 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1626
1627 ASSERT_EQ(parts[0].logger_node, "pi1");
1628 ASSERT_EQ(parts[1].logger_node, "pi2");
1629
Austin Schuh79b30942021-01-24 22:32:21 -08001630 size_t mapper1_count = 0;
Austin Schuhfecf1d82020-12-19 16:57:28 -08001631 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001632 mapper1.set_timestamp_callback(
1633 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhfecf1d82020-12-19 16:57:28 -08001634
1635 {
1636 std::deque<TimestampedMessage> output1;
1637
1638 ASSERT_TRUE(mapper1.Front() != nullptr);
1639 output1.emplace_back(std::move(*mapper1.Front()));
1640 mapper1.PopFront();
1641 ASSERT_TRUE(mapper1.Front() != nullptr);
1642 output1.emplace_back(std::move(*mapper1.Front()));
1643 mapper1.PopFront();
1644 ASSERT_TRUE(mapper1.Front() != nullptr);
1645 output1.emplace_back(std::move(*mapper1.Front()));
1646 mapper1.PopFront();
1647 ASSERT_TRUE(mapper1.Front() == nullptr);
1648
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001649 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1650 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001651 e + chrono::seconds(100) + chrono::milliseconds(1000));
1652 EXPECT_FALSE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001653
1654 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1655 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001656 e + chrono::seconds(100) + chrono::milliseconds(2000));
1657 EXPECT_FALSE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001658
1659 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1660 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001661 e + chrono::seconds(100) + chrono::milliseconds(3000));
1662 EXPECT_FALSE(output1[2].data.Verify());
1663 }
Austin Schuh79b30942021-01-24 22:32:21 -08001664 EXPECT_EQ(mapper1_count, 3u);
1665}
1666
1667// Tests that we can queue messages and call the timestamp callback for both
1668// nodes.
1669TEST_F(TimestampMapperTest, QueueUntilNode0) {
1670 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1671 {
1672 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1673 writer0.QueueSpan(config0_.span());
1674 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1675 writer1.QueueSpan(config2_.span());
1676
1677 writer0.QueueSizedFlatbuffer(
1678 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1679 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1680 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1681
1682 writer0.QueueSizedFlatbuffer(
1683 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
1684 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1685 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1686
1687 writer0.QueueSizedFlatbuffer(
1688 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
1689 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1690 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1691
1692 writer0.QueueSizedFlatbuffer(
1693 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
1694 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1695 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1696 }
1697
1698 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1699
1700 ASSERT_EQ(parts[0].logger_node, "pi1");
1701 ASSERT_EQ(parts[1].logger_node, "pi2");
1702
1703 size_t mapper0_count = 0;
1704 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1705 mapper0.set_timestamp_callback(
1706 [&](TimestampedMessage *) { ++mapper0_count; });
1707 size_t mapper1_count = 0;
1708 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1709 mapper1.set_timestamp_callback(
1710 [&](TimestampedMessage *) { ++mapper1_count; });
1711
1712 mapper0.AddPeer(&mapper1);
1713 mapper1.AddPeer(&mapper0);
1714
1715 {
1716 std::deque<TimestampedMessage> output0;
1717
1718 EXPECT_EQ(mapper0_count, 0u);
1719 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001720 mapper0.QueueUntil(
1721 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001722 EXPECT_EQ(mapper0_count, 3u);
1723 EXPECT_EQ(mapper1_count, 0u);
1724
1725 ASSERT_TRUE(mapper0.Front() != nullptr);
1726 EXPECT_EQ(mapper0_count, 3u);
1727 EXPECT_EQ(mapper1_count, 0u);
1728
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001729 mapper0.QueueUntil(
1730 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001731 EXPECT_EQ(mapper0_count, 3u);
1732 EXPECT_EQ(mapper1_count, 0u);
1733
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001734 mapper0.QueueUntil(
1735 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001736 EXPECT_EQ(mapper0_count, 4u);
1737 EXPECT_EQ(mapper1_count, 0u);
1738
1739 output0.emplace_back(std::move(*mapper0.Front()));
1740 mapper0.PopFront();
1741 output0.emplace_back(std::move(*mapper0.Front()));
1742 mapper0.PopFront();
1743 output0.emplace_back(std::move(*mapper0.Front()));
1744 mapper0.PopFront();
1745 output0.emplace_back(std::move(*mapper0.Front()));
1746 mapper0.PopFront();
1747
1748 EXPECT_EQ(mapper0_count, 4u);
1749 EXPECT_EQ(mapper1_count, 0u);
1750
1751 ASSERT_TRUE(mapper0.Front() == nullptr);
1752
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001753 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1754 EXPECT_EQ(output0[0].monotonic_event_time.time,
1755 e + chrono::milliseconds(1000));
Austin Schuh79b30942021-01-24 22:32:21 -08001756 EXPECT_TRUE(output0[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001757
1758 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1759 EXPECT_EQ(output0[1].monotonic_event_time.time,
1760 e + chrono::milliseconds(1000));
Austin Schuh79b30942021-01-24 22:32:21 -08001761 EXPECT_TRUE(output0[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001762
1763 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1764 EXPECT_EQ(output0[2].monotonic_event_time.time,
1765 e + chrono::milliseconds(2000));
Austin Schuh79b30942021-01-24 22:32:21 -08001766 EXPECT_TRUE(output0[2].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001767
1768 EXPECT_EQ(output0[3].monotonic_event_time.boot, 0u);
1769 EXPECT_EQ(output0[3].monotonic_event_time.time,
1770 e + chrono::milliseconds(3000));
Austin Schuh79b30942021-01-24 22:32:21 -08001771 EXPECT_TRUE(output0[3].data.Verify());
1772 }
1773
1774 {
1775 SCOPED_TRACE("Trying node1 now");
1776 std::deque<TimestampedMessage> output1;
1777
1778 EXPECT_EQ(mapper0_count, 4u);
1779 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001780 mapper1.QueueUntil(BootTimestamp{
1781 .boot = 0,
1782 .time = e + chrono::seconds(100) + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001783 EXPECT_EQ(mapper0_count, 4u);
1784 EXPECT_EQ(mapper1_count, 3u);
1785
1786 ASSERT_TRUE(mapper1.Front() != nullptr);
1787 EXPECT_EQ(mapper0_count, 4u);
1788 EXPECT_EQ(mapper1_count, 3u);
1789
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001790 mapper1.QueueUntil(BootTimestamp{
1791 .boot = 0,
1792 .time = e + chrono::seconds(100) + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001793 EXPECT_EQ(mapper0_count, 4u);
1794 EXPECT_EQ(mapper1_count, 3u);
1795
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001796 mapper1.QueueUntil(BootTimestamp{
1797 .boot = 0,
1798 .time = e + chrono::seconds(100) + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001799 EXPECT_EQ(mapper0_count, 4u);
1800 EXPECT_EQ(mapper1_count, 4u);
1801
1802 ASSERT_TRUE(mapper1.Front() != nullptr);
1803 EXPECT_EQ(mapper0_count, 4u);
1804 EXPECT_EQ(mapper1_count, 4u);
1805
1806 output1.emplace_back(std::move(*mapper1.Front()));
1807 mapper1.PopFront();
1808 ASSERT_TRUE(mapper1.Front() != nullptr);
1809 output1.emplace_back(std::move(*mapper1.Front()));
1810 mapper1.PopFront();
1811 ASSERT_TRUE(mapper1.Front() != nullptr);
1812 output1.emplace_back(std::move(*mapper1.Front()));
1813 mapper1.PopFront();
1814 ASSERT_TRUE(mapper1.Front() != nullptr);
1815 output1.emplace_back(std::move(*mapper1.Front()));
1816 mapper1.PopFront();
1817
1818 EXPECT_EQ(mapper0_count, 4u);
1819 EXPECT_EQ(mapper1_count, 4u);
1820
1821 ASSERT_TRUE(mapper1.Front() == nullptr);
1822
1823 EXPECT_EQ(mapper0_count, 4u);
1824 EXPECT_EQ(mapper1_count, 4u);
1825
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001826 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1827 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001828 e + chrono::seconds(100) + chrono::milliseconds(1000));
1829 EXPECT_TRUE(output1[0].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001830
1831 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1832 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001833 e + chrono::seconds(100) + chrono::milliseconds(1000));
1834 EXPECT_TRUE(output1[1].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001835
1836 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1837 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001838 e + chrono::seconds(100) + chrono::milliseconds(2000));
1839 EXPECT_TRUE(output1[2].data.Verify());
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001840
1841 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1842 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001843 e + chrono::seconds(100) + chrono::milliseconds(3000));
1844 EXPECT_TRUE(output1[3].data.Verify());
1845 }
Austin Schuhfecf1d82020-12-19 16:57:28 -08001846}
1847
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001848class BootMergerTest : public SortingElementTest {
1849 public:
1850 BootMergerTest()
1851 : SortingElementTest(),
1852 boot0_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07001853 /* 100ms */
1854 "max_out_of_order_duration": 100000000,
1855 "node": {
1856 "name": "pi2"
1857 },
1858 "logger_node": {
1859 "name": "pi1"
1860 },
1861 "monotonic_start_time": 1000000,
1862 "realtime_start_time": 1000000000000,
1863 "logger_monotonic_start_time": 1000000,
1864 "logger_realtime_start_time": 1000000000000,
1865 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
1866 "parts_uuid": "1a9e5ca2-31b2-475b-8282-88f6d1ce5109",
1867 "parts_index": 0,
1868 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
1869 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07001870 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
1871 "boot_uuids": [
1872 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
1873 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
1874 ""
1875 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001876})")),
1877 boot1_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07001878 /* 100ms */
1879 "max_out_of_order_duration": 100000000,
1880 "node": {
1881 "name": "pi2"
1882 },
1883 "logger_node": {
1884 "name": "pi1"
1885 },
1886 "monotonic_start_time": 1000000,
1887 "realtime_start_time": 1000000000000,
1888 "logger_monotonic_start_time": 1000000,
1889 "logger_realtime_start_time": 1000000000000,
1890 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
1891 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
1892 "parts_index": 1,
1893 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
1894 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07001895 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
1896 "boot_uuids": [
1897 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
1898 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
1899 ""
1900 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001901})")) {}
Austin Schuh8bee6882021-06-28 21:03:28 -07001902
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001903 protected:
1904 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0_;
1905 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1_;
1906};
1907
1908// This tests that we can properly sort a multi-node log file which has the old
1909// (and buggy) timestamps in the header, and the non-resetting parts_index.
1910// These make it so we can just bairly figure out what happened first and what
1911// happened second, but not in a way that is robust to multiple nodes rebooting.
1912TEST_F(BootMergerTest, OldReboot) {
Austin Schuh8bee6882021-06-28 21:03:28 -07001913 {
1914 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001915 writer.QueueSpan(boot0_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07001916 }
1917 {
1918 DetachedBufferWriter writer(logfile1_, std::make_unique<DummyEncoder>());
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001919 writer.QueueSpan(boot1_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07001920 }
1921
1922 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1923
1924 ASSERT_EQ(parts.size(), 1u);
1925 ASSERT_EQ(parts[0].parts.size(), 2u);
1926
1927 EXPECT_EQ(parts[0].parts[0].boot_count, 0);
1928 EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001929 boot0_.message().source_node_boot_uuid()->string_view());
Austin Schuh8bee6882021-06-28 21:03:28 -07001930
1931 EXPECT_EQ(parts[0].parts[1].boot_count, 1);
1932 EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001933 boot1_.message().source_node_boot_uuid()->string_view());
1934}
1935
1936// This tests that we can produce messages ordered across a reboot.
1937TEST_F(BootMergerTest, SortAcrossReboot) {
1938 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1939 {
1940 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
1941 writer.QueueSpan(boot0_.span());
1942 writer.QueueSizedFlatbuffer(
1943 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1944 writer.QueueSizedFlatbuffer(
1945 MakeLogMessage(e + chrono::milliseconds(2000), 1, 0x105));
1946 }
1947 {
1948 DetachedBufferWriter writer(logfile1_, std::make_unique<DummyEncoder>());
1949 writer.QueueSpan(boot1_.span());
1950 writer.QueueSizedFlatbuffer(
1951 MakeLogMessage(e + chrono::milliseconds(100), 0, 0x006));
1952 writer.QueueSizedFlatbuffer(
1953 MakeLogMessage(e + chrono::milliseconds(200), 1, 0x106));
1954 }
1955
1956 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1957 ASSERT_EQ(parts.size(), 1u);
1958 ASSERT_EQ(parts[0].parts.size(), 2u);
1959
1960 BootMerger merger(FilterPartsForNode(parts, "pi2"));
1961
1962 EXPECT_EQ(merger.node(), 1u);
1963
1964 std::vector<Message> output;
1965 for (int i = 0; i < 4; ++i) {
1966 ASSERT_TRUE(merger.Front() != nullptr);
1967 output.emplace_back(std::move(*merger.Front()));
1968 merger.PopFront();
1969 }
1970
1971 ASSERT_TRUE(merger.Front() == nullptr);
1972
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001973 EXPECT_EQ(output[0].timestamp.boot, 0u);
1974 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
1975 EXPECT_EQ(output[1].timestamp.boot, 0u);
1976 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(2000));
1977
1978 EXPECT_EQ(output[2].timestamp.boot, 1u);
1979 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(100));
1980 EXPECT_EQ(output[3].timestamp.boot, 1u);
1981 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
Austin Schuh8bee6882021-06-28 21:03:28 -07001982}
1983
Austin Schuh48507722021-07-17 17:29:24 -07001984class RebootTimestampMapperTest : public SortingElementTest {
1985 public:
1986 RebootTimestampMapperTest()
1987 : SortingElementTest(),
1988 boot0a_(MakeHeader(config_, R"({
1989 /* 100ms */
1990 "max_out_of_order_duration": 100000000,
1991 "node": {
1992 "name": "pi1"
1993 },
1994 "logger_node": {
1995 "name": "pi1"
1996 },
1997 "monotonic_start_time": 1000000,
1998 "realtime_start_time": 1000000000000,
1999 "logger_monotonic_start_time": 1000000,
2000 "logger_realtime_start_time": 1000000000000,
2001 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2002 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2003 "parts_index": 0,
2004 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2005 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2006 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2007 "boot_uuids": [
2008 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2009 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2010 ""
2011 ]
2012})")),
2013 boot0b_(MakeHeader(config_, R"({
2014 /* 100ms */
2015 "max_out_of_order_duration": 100000000,
2016 "node": {
2017 "name": "pi1"
2018 },
2019 "logger_node": {
2020 "name": "pi1"
2021 },
2022 "monotonic_start_time": 1000000,
2023 "realtime_start_time": 1000000000000,
2024 "logger_monotonic_start_time": 1000000,
2025 "logger_realtime_start_time": 1000000000000,
2026 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2027 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2028 "parts_index": 1,
2029 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2030 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2031 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2032 "boot_uuids": [
2033 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2034 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2035 ""
2036 ]
2037})")),
2038 boot1a_(MakeHeader(config_, R"({
2039 /* 100ms */
2040 "max_out_of_order_duration": 100000000,
2041 "node": {
2042 "name": "pi2"
2043 },
2044 "logger_node": {
2045 "name": "pi1"
2046 },
2047 "monotonic_start_time": 1000000,
2048 "realtime_start_time": 1000000000000,
2049 "logger_monotonic_start_time": 1000000,
2050 "logger_realtime_start_time": 1000000000000,
2051 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2052 "parts_uuid": "f6ab0cdc-a654-456d-bfd9-2bbc09098edf",
2053 "parts_index": 0,
2054 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2055 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2056 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2057 "boot_uuids": [
2058 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2059 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2060 ""
2061 ]
2062})")),
2063 boot1b_(MakeHeader(config_, R"({
2064 /* 100ms */
2065 "max_out_of_order_duration": 100000000,
2066 "node": {
2067 "name": "pi2"
2068 },
2069 "logger_node": {
2070 "name": "pi1"
2071 },
2072 "monotonic_start_time": 1000000,
2073 "realtime_start_time": 1000000000000,
2074 "logger_monotonic_start_time": 1000000,
2075 "logger_realtime_start_time": 1000000000000,
2076 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2077 "parts_uuid": "3a9d0445-f520-43ca-93f5-e2cc7f54d40a",
2078 "parts_index": 1,
2079 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2080 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2081 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2082 "boot_uuids": [
2083 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2084 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2085 ""
2086 ]
2087})")) {}
2088
2089 protected:
2090 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0a_;
2091 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0b_;
2092 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1a_;
2093 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
2094};
2095
2096
2097// Tests that we can match timestamps on delivered messages in the presence of
2098// reboots on the node receiving timestamps.
2099TEST_F(RebootTimestampMapperTest, ReadNode0First) {
     // Drains the "pi1" mapper completely before touching the "pi2" mapper, and
     // checks the boot counts and times each of them reports.
2100 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2101 {
     // One writer per log file, each started with its own fixture header
     // (boot0a_, boot0b_, boot1a_, boot1b_).  The writers go out of scope at the
     // end of this block, before the files are sorted and read back.
2102 DetachedBufferWriter writer0a(logfile0_, std::make_unique<DummyEncoder>());
2103 writer0a.QueueSpan(boot0a_.span());
2104 DetachedBufferWriter writer0b(logfile1_, std::make_unique<DummyEncoder>());
2105 writer0b.QueueSpan(boot0b_.span());
2106 DetachedBufferWriter writer1a(logfile2_, std::make_unique<DummyEncoder>());
2107 writer1a.QueueSpan(boot1a_.span());
2108 DetachedBufferWriter writer1b(logfile3_, std::make_unique<DummyEncoder>());
2109 writer1b.QueueSpan(boot1b_.span());
2110
     // The data messages go to the 0a/0b files and the matching timestamp
     // messages go to the 1a/1b files.  The duration passed to
     // MakeTimestampMessage (100 s for the "a" file, 20 s for the "b" file)
     // shows up in the expectations below as the difference between the remote
     // time and the event time reported by mapper1.
2111 writer0a.QueueSizedFlatbuffer(
2112 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
2113 writer1a.QueueSizedFlatbuffer(MakeTimestampMessage(
2114 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
2115 e + chrono::milliseconds(1001)));
2116
2117 writer0b.QueueSizedFlatbuffer(
2118 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
2119 writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
2120 e + chrono::milliseconds(2000), 0, chrono::seconds(20),
2121 e + chrono::milliseconds(2001)));
2122
2123 writer0b.QueueSizedFlatbuffer(
2124 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
2125 writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
2126 e + chrono::milliseconds(3000), 0, chrono::seconds(20),
2127 e + chrono::milliseconds(3001)));
2128 }
2129
2130 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2131
2132 for (const auto &x : parts) {
2133 LOG(INFO) << x;
2134 }
     // All four files are expected to sort into a single log whose logger node
     // is pi1.
2135 ASSERT_EQ(parts.size(), 1u);
2136 ASSERT_EQ(parts[0].logger_node, "pi1");
2137
     // One mapper per node.  Each one counts how many times its timestamp
     // callback has been invoked so the expectations below can pin down when it
     // fires.
2138 size_t mapper0_count = 0;
2139 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
2140 mapper0.set_timestamp_callback(
2141 [&](TimestampedMessage *) { ++mapper0_count; });
2142 size_t mapper1_count = 0;
2143 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
2144 mapper1.set_timestamp_callback(
2145 [&](TimestampedMessage *) { ++mapper1_count; });
2146
     // The two mappers are registered as peers of each other.
2147 mapper0.AddPeer(&mapper1);
2148 mapper1.AddPeer(&mapper0);
2149
2150 {
2151 std::deque<TimestampedMessage> output0;
2152
     // The counter expectations show that mapper0's callback count goes up by
     // one after Front() and is unchanged by PopFront(), and that reading
     // mapper0 never triggers mapper1's callback.
2153 EXPECT_EQ(mapper0_count, 0u);
2154 EXPECT_EQ(mapper1_count, 0u);
2155 ASSERT_TRUE(mapper0.Front() != nullptr);
2156 EXPECT_EQ(mapper0_count, 1u);
2157 EXPECT_EQ(mapper1_count, 0u);
2158 output0.emplace_back(std::move(*mapper0.Front()));
2159 mapper0.PopFront();
2160 EXPECT_TRUE(mapper0.started());
2161 EXPECT_EQ(mapper0_count, 1u);
2162 EXPECT_EQ(mapper1_count, 0u);
2163
2164 ASSERT_TRUE(mapper0.Front() != nullptr);
2165 EXPECT_EQ(mapper0_count, 2u);
2166 EXPECT_EQ(mapper1_count, 0u);
2167 output0.emplace_back(std::move(*mapper0.Front()));
2168 mapper0.PopFront();
2169 EXPECT_TRUE(mapper0.started());
2170
2171 ASSERT_TRUE(mapper0.Front() != nullptr);
2172 output0.emplace_back(std::move(*mapper0.Front()));
2173 mapper0.PopFront();
2174 EXPECT_TRUE(mapper0.started());
2175
2176 EXPECT_EQ(mapper0_count, 3u);
2177 EXPECT_EQ(mapper1_count, 0u);
2178
     // Exactly three messages were available; mapper0 is now exhausted.
2179 ASSERT_TRUE(mapper0.Front() == nullptr);
2180
2181 LOG(INFO) << output0[0];
2182 LOG(INFO) << output0[1];
2183 LOG(INFO) << output0[2];
2184
     // mapper0 reports the three data messages at the times they were written,
     // all on boot 0, and with no remote or timestamp time (both are
     // BootTimestamp::min_time()).
2185 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2186 EXPECT_EQ(output0[0].monotonic_event_time.time,
2187 e + chrono::milliseconds(1000));
2188 EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
2189 EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
2190 EXPECT_TRUE(output0[0].data.Verify());
2191
2192 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2193 EXPECT_EQ(output0[1].monotonic_event_time.time,
2194 e + chrono::milliseconds(2000));
2195 EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
2196 EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
2197 EXPECT_TRUE(output0[1].data.Verify());
2198
2199 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2200 EXPECT_EQ(output0[2].monotonic_event_time.time,
2201 e + chrono::milliseconds(3000));
2202 EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
2203 EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
2204 EXPECT_TRUE(output0[2].data.Verify());
2205 }
2206
2207 {
2208 SCOPED_TRACE("Trying node1 now");
2209 std::deque<TimestampedMessage> output1;
2210
     // Reading mapper1 only advances mapper1's counter; mapper0's counter is
     // expected to stay at 3 throughout.
2211 EXPECT_EQ(mapper0_count, 3u);
2212 EXPECT_EQ(mapper1_count, 0u);
2213
2214 ASSERT_TRUE(mapper1.Front() != nullptr);
2215 EXPECT_EQ(mapper0_count, 3u);
2216 EXPECT_EQ(mapper1_count, 1u);
2217 output1.emplace_back(std::move(*mapper1.Front()));
2218 mapper1.PopFront();
2219 EXPECT_TRUE(mapper1.started());
2220 EXPECT_EQ(mapper0_count, 3u);
2221 EXPECT_EQ(mapper1_count, 1u);
2222
2223 ASSERT_TRUE(mapper1.Front() != nullptr);
2224 EXPECT_EQ(mapper0_count, 3u);
2225 EXPECT_EQ(mapper1_count, 2u);
2226 output1.emplace_back(std::move(*mapper1.Front()));
2227 mapper1.PopFront();
2228 EXPECT_TRUE(mapper1.started());
2229
2230 ASSERT_TRUE(mapper1.Front() != nullptr);
2231 output1.emplace_back(std::move(*mapper1.Front()));
2232 mapper1.PopFront();
2233 EXPECT_TRUE(mapper1.started());
2234
2235 EXPECT_EQ(mapper0_count, 3u);
2236 EXPECT_EQ(mapper1_count, 3u);
2237
     // mapper1 is exhausted after three messages, and checking for a fourth
     // must not fire either callback again.
2238 ASSERT_TRUE(mapper1.Front() == nullptr);
2239
2240 EXPECT_EQ(mapper0_count, 3u);
2241 EXPECT_EQ(mapper1_count, 3u);
2242
     // First message: event time on boot 0, 100 s ahead of the remote time.
     // The remote and timestamp times are on boot 0.
2243 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2244 EXPECT_EQ(output1[0].monotonic_event_time.time,
2245 e + chrono::seconds(100) + chrono::milliseconds(1000));
2246 EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
2247 EXPECT_EQ(output1[0].monotonic_remote_time.time,
2248 e + chrono::milliseconds(1000));
2249 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
2250 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
2251 e + chrono::milliseconds(1001));
2252 EXPECT_TRUE(output1[0].data.Verify());
2253
     // Second and third messages: the event time is now on boot 1 and only
     // 20 s ahead of the remote time, while the remote and timestamp times
     // remain on boot 0.
2254 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2255 EXPECT_EQ(output1[1].monotonic_event_time.time,
2256 e + chrono::seconds(20) + chrono::milliseconds(2000));
2257 EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
2258 EXPECT_EQ(output1[1].monotonic_remote_time.time,
2259 e + chrono::milliseconds(2000));
2260 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
2261 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
2262 e + chrono::milliseconds(2001));
2263 EXPECT_TRUE(output1[1].data.Verify());
2264
2265 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2266 EXPECT_EQ(output1[2].monotonic_event_time.time,
2267 e + chrono::seconds(20) + chrono::milliseconds(3000));
2268 EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
2269 EXPECT_EQ(output1[2].monotonic_remote_time.time,
2270 e + chrono::milliseconds(3000));
2271 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
2272 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
2273 e + chrono::milliseconds(3001));
2274 EXPECT_TRUE(output1[2].data.Verify());
2275
2276 LOG(INFO) << output1[0];
2277 LOG(INFO) << output1[1];
2278 LOG(INFO) << output1[2];
2279 }
2280}
2281
// Tests timestamp matching when the boot change is on the remote side as seen
// from pi1: the "pi1" mapper is expected to report remote times on boot 0 for
// the first message and on boot 1 for the next two, while the "pi2" mapper
// reports the same boot change in its own event times.
2282TEST_F(RebootTimestampMapperTest, Node2Reboot) {
2283 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2284 {
     // One writer per log file, each started with its own fixture header
     // (boot0a_, boot0b_, boot1a_, boot1b_).  The writers go out of scope at the
     // end of this block, before the files are sorted and read back.
2285 DetachedBufferWriter writer0a(logfile0_, std::make_unique<DummyEncoder>());
2286 writer0a.QueueSpan(boot0a_.span());
2287 DetachedBufferWriter writer0b(logfile1_, std::make_unique<DummyEncoder>());
2288 writer0b.QueueSpan(boot0b_.span());
2289 DetachedBufferWriter writer1a(logfile2_, std::make_unique<DummyEncoder>());
2290 writer1a.QueueSpan(boot1a_.span());
2291 DetachedBufferWriter writer1b(logfile3_, std::make_unique<DummyEncoder>());
2292 writer1b.QueueSpan(boot1b_.span());
2293
     // Here the data messages go to the 1a/1b files and the matching timestamp
     // messages go to the 0a/0b files.  The negative durations (-100 s for the
     // "a" file, -20 s for the "b" file) show up in the expectations below:
     // mapper0's event times are the remote times minus 100 s / 20 s.
2294 writer1a.QueueSizedFlatbuffer(MakeLogMessage(
2295 e + chrono::seconds(100) + chrono::milliseconds(1000), 3, 0x005));
2296 writer0a.QueueSizedFlatbuffer(MakeTimestampMessage(
2297 e + chrono::seconds(100) + chrono::milliseconds(1000), 3,
2298 chrono::seconds(-100),
2299 e + chrono::seconds(100) + chrono::milliseconds(1001)));
2300
2301 writer1b.QueueSizedFlatbuffer(MakeLogMessage(
2302 e + chrono::seconds(20) + chrono::milliseconds(2000), 3, 0x006));
2303 writer0b.QueueSizedFlatbuffer(MakeTimestampMessage(
2304 e + chrono::seconds(20) + chrono::milliseconds(2000), 3,
2305 chrono::seconds(-20),
2306 e + chrono::seconds(20) + chrono::milliseconds(2001)));
2307
2308 writer1b.QueueSizedFlatbuffer(MakeLogMessage(
2309 e + chrono::seconds(20) + chrono::milliseconds(3000), 3, 0x007));
2310 writer0b.QueueSizedFlatbuffer(MakeTimestampMessage(
2311 e + chrono::seconds(20) + chrono::milliseconds(3000), 3,
2312 chrono::seconds(-20),
2313 e + chrono::seconds(20) + chrono::milliseconds(3001)));
2314 }
2315
2316 const std::vector<LogFile> parts =
2317 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2318
2319 for (const auto &x : parts) {
2320 LOG(INFO) << x;
2321 }
     // All four files are expected to sort into a single log whose logger node
     // is pi1.
2322 ASSERT_EQ(parts.size(), 1u);
2323 ASSERT_EQ(parts[0].logger_node, "pi1");
2324
     // One mapper per node.  Each one counts how many times its timestamp
     // callback has been invoked so the expectations below can pin down when it
     // fires.
2325 size_t mapper0_count = 0;
2326 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
2327 mapper0.set_timestamp_callback(
2328 [&](TimestampedMessage *) { ++mapper0_count; });
2329 size_t mapper1_count = 0;
2330 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
2331 mapper1.set_timestamp_callback(
2332 [&](TimestampedMessage *) { ++mapper1_count; });
2333
     // The two mappers are registered as peers of each other.
2334 mapper0.AddPeer(&mapper1);
2335 mapper1.AddPeer(&mapper0);
2336
2337 {
2338 std::deque<TimestampedMessage> output0;
2339
     // The counter expectations show that mapper0's callback count goes up by
     // one after Front() and is unchanged by PopFront(), and that reading
     // mapper0 never triggers mapper1's callback.
2340 EXPECT_EQ(mapper0_count, 0u);
2341 EXPECT_EQ(mapper1_count, 0u);
2342 ASSERT_TRUE(mapper0.Front() != nullptr);
2343 EXPECT_EQ(mapper0_count, 1u);
2344 EXPECT_EQ(mapper1_count, 0u);
2345 output0.emplace_back(std::move(*mapper0.Front()));
2346 mapper0.PopFront();
2347 EXPECT_TRUE(mapper0.started());
2348 EXPECT_EQ(mapper0_count, 1u);
2349 EXPECT_EQ(mapper1_count, 0u);
2350
2351 ASSERT_TRUE(mapper0.Front() != nullptr);
2352 EXPECT_EQ(mapper0_count, 2u);
2353 EXPECT_EQ(mapper1_count, 0u);
2354 output0.emplace_back(std::move(*mapper0.Front()));
2355 mapper0.PopFront();
2356 EXPECT_TRUE(mapper0.started());
2357
2358 ASSERT_TRUE(mapper0.Front() != nullptr);
2359 output0.emplace_back(std::move(*mapper0.Front()));
2360 mapper0.PopFront();
2361 EXPECT_TRUE(mapper0.started());
2362
2363 EXPECT_EQ(mapper0_count, 3u);
2364 EXPECT_EQ(mapper1_count, 0u);
2365
     // Exactly three messages were available; mapper0 is now exhausted.
2366 ASSERT_TRUE(mapper0.Front() == nullptr);
2367
2368 LOG(INFO) << output0[0];
2369 LOG(INFO) << output0[1];
2370 LOG(INFO) << output0[2];
2371
     // First message: event time on boot 0 at 1000 ms, with the remote time on
     // boot 0 (100 s later on the remote clock).
2372 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2373 EXPECT_EQ(output0[0].monotonic_event_time.time,
2374 e + chrono::milliseconds(1000));
2375 EXPECT_EQ(output0[0].monotonic_remote_time.boot, 0u);
2376 EXPECT_EQ(output0[0].monotonic_remote_time.time,
2377 e + chrono::seconds(100) + chrono::milliseconds(1000));
2378 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
2379 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
2380 e + chrono::seconds(100) + chrono::milliseconds(1001));
2381 EXPECT_TRUE(output0[0].data.Verify());
2382
     // Second and third messages: the event time stays on boot 0, but the
     // remote time is now on boot 1.  The timestamp time is still expected to
     // report boot 0.
2383 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2384 EXPECT_EQ(output0[1].monotonic_event_time.time,
2385 e + chrono::milliseconds(2000));
2386 EXPECT_EQ(output0[1].monotonic_remote_time.boot, 1u);
2387 EXPECT_EQ(output0[1].monotonic_remote_time.time,
2388 e + chrono::seconds(20) + chrono::milliseconds(2000));
2389 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
2390 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
2391 e + chrono::seconds(20) + chrono::milliseconds(2001));
2392 EXPECT_TRUE(output0[1].data.Verify());
2393
2394 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2395 EXPECT_EQ(output0[2].monotonic_event_time.time,
2396 e + chrono::milliseconds(3000));
2397 EXPECT_EQ(output0[2].monotonic_remote_time.boot, 1u);
2398 EXPECT_EQ(output0[2].monotonic_remote_time.time,
2399 e + chrono::seconds(20) + chrono::milliseconds(3000));
2400 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
2401 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
2402 e + chrono::seconds(20) + chrono::milliseconds(3001));
2403 EXPECT_TRUE(output0[2].data.Verify());
2404 }
2405
2406 {
2407 SCOPED_TRACE("Trying node1 now");
2408 std::deque<TimestampedMessage> output1;
2409
     // Reading mapper1 only advances mapper1's counter; mapper0's counter is
     // expected to stay at 3 throughout.
2410 EXPECT_EQ(mapper0_count, 3u);
2411 EXPECT_EQ(mapper1_count, 0u);
2412
2413 ASSERT_TRUE(mapper1.Front() != nullptr);
2414 EXPECT_EQ(mapper0_count, 3u);
2415 EXPECT_EQ(mapper1_count, 1u);
2416 output1.emplace_back(std::move(*mapper1.Front()));
2417 mapper1.PopFront();
2418 EXPECT_TRUE(mapper1.started());
2419 EXPECT_EQ(mapper0_count, 3u);
2420 EXPECT_EQ(mapper1_count, 1u);
2421
2422 ASSERT_TRUE(mapper1.Front() != nullptr);
2423 EXPECT_EQ(mapper0_count, 3u);
2424 EXPECT_EQ(mapper1_count, 2u);
2425 output1.emplace_back(std::move(*mapper1.Front()));
2426 mapper1.PopFront();
2427 EXPECT_TRUE(mapper1.started());
2428
2429 ASSERT_TRUE(mapper1.Front() != nullptr);
2430 output1.emplace_back(std::move(*mapper1.Front()));
2431 mapper1.PopFront();
2432 EXPECT_TRUE(mapper1.started());
2433
2434 EXPECT_EQ(mapper0_count, 3u);
2435 EXPECT_EQ(mapper1_count, 3u);
2436
     // mapper1 is exhausted after three messages, and checking for a fourth
     // must not fire either callback again.
2437 ASSERT_TRUE(mapper1.Front() == nullptr);
2438
2439 EXPECT_EQ(mapper0_count, 3u);
2440 EXPECT_EQ(mapper1_count, 3u);
2441
     // mapper1 reports the three data messages at the times they were written:
     // the first on boot 0 and the next two on boot 1.  None of them carries a
     // remote or timestamp time (both are BootTimestamp::min_time()).
2442 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2443 EXPECT_EQ(output1[0].monotonic_event_time.time,
2444 e + chrono::seconds(100) + chrono::milliseconds(1000));
2445 EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
2446 EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
2447 EXPECT_TRUE(output1[0].data.Verify());
2448
2449 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2450 EXPECT_EQ(output1[1].monotonic_event_time.time,
2451 e + chrono::seconds(20) + chrono::milliseconds(2000));
2452 EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
2453 EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
2454 EXPECT_TRUE(output1[1].data.Verify());
2455
2456 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2457 EXPECT_EQ(output1[2].monotonic_event_time.time,
2458 e + chrono::seconds(20) + chrono::milliseconds(3000));
2459 EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
2460 EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
2461 EXPECT_TRUE(output1[2].data.Verify());
2462
2463 LOG(INFO) << output1[0];
2464 LOG(INFO) << output1[1];
2465 LOG(INFO) << output1[2];
2466 }
2467}
2468
Austin Schuhc243b422020-10-11 15:35:08 -07002469} // namespace testing
2470} // namespace logger
2471} // namespace aos