blob: e1e2ebdd9d71338c6068b627694a44fdd6173db0 [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
4#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07005
Austin Schuhc41603c2020-10-11 16:17:37 -07006#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -07007#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -08008#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -07009#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070010#include "aos/json_to_flatbuffer.h"
11#include "aos/testing/tmpdir.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070012#include "gtest/gtest.h"
Austin Schuhc243b422020-10-11 15:35:08 -070013
14namespace aos {
15namespace logger {
16namespace testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070017namespace chrono = std::chrono;
Austin Schuhc243b422020-10-11 15:35:08 -070018
Austin Schuhe243aaf2020-10-11 15:46:02 -070019// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070020template <typename T>
21SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
22 const std::string_view data) {
23 flatbuffers::FlatBufferBuilder fbb;
24 fbb.ForceDefaults(true);
25 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
26 return fbb.Release();
27}
28
Austin Schuhe243aaf2020-10-11 15:46:02 -070029// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070030TEST(SpanReaderTest, ReadWrite) {
31 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
32 unlink(logfile.c_str());
33
34 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080035 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070036 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080037 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070038
39 {
40 DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -080041 writer.QueueSpan(m1.span());
42 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070043 }
44
45 SpanReader reader(logfile);
46
47 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhcf5f6442021-07-06 10:43:28 -070048 EXPECT_EQ(reader.PeekMessage(), m1.span());
49 EXPECT_EQ(reader.PeekMessage(), m1.span());
Austin Schuhadd6eb32020-11-09 21:24:26 -080050 EXPECT_EQ(reader.ReadMessage(), m1.span());
51 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhcf5f6442021-07-06 10:43:28 -070052 EXPECT_EQ(reader.PeekMessage(), absl::Span<const uint8_t>());
Austin Schuhc243b422020-10-11 15:35:08 -070053 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
54}
55
Austin Schuhe243aaf2020-10-11 15:46:02 -070056// Tests that we can actually parse the resulting messages at a basic level
57// through MessageReader.
58TEST(MessageReaderTest, ReadWrite) {
59 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
60 unlink(logfile.c_str());
61
62 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
63 JsonToSizedFlatbuffer<LogFileHeader>(
64 R"({ "max_out_of_order_duration": 100000000 })");
65 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
66 JsonToSizedFlatbuffer<MessageHeader>(
67 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
68 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
69 JsonToSizedFlatbuffer<MessageHeader>(
70 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
71
72 {
73 DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -080074 writer.QueueSpan(config.span());
75 writer.QueueSpan(m1.span());
76 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -070077 }
78
79 MessageReader reader(logfile);
80
81 EXPECT_EQ(reader.filename(), logfile);
82
83 EXPECT_EQ(
84 reader.max_out_of_order_duration(),
85 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
86 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
87 EXPECT_TRUE(reader.ReadMessage());
88 EXPECT_EQ(reader.newest_timestamp(),
89 monotonic_clock::time_point(chrono::nanoseconds(1)));
90 EXPECT_TRUE(reader.ReadMessage());
91 EXPECT_EQ(reader.newest_timestamp(),
92 monotonic_clock::time_point(chrono::nanoseconds(2)));
93 EXPECT_FALSE(reader.ReadMessage());
94}
95
Austin Schuh32f68492020-11-08 21:45:51 -080096// Tests that we explode when messages are too far out of order.
97TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
98 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
99 unlink(logfile0.c_str());
100
101 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
102 JsonToSizedFlatbuffer<LogFileHeader>(
103 R"({
104 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800105 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800106 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
107 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
108 "parts_index": 0
109})");
110
111 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
112 JsonToSizedFlatbuffer<MessageHeader>(
113 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
114 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
115 JsonToSizedFlatbuffer<MessageHeader>(
116 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
117 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
118 JsonToSizedFlatbuffer<MessageHeader>(
119 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
120
121 {
122 DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800123 writer.QueueSpan(config0.span());
124 writer.QueueSpan(m1.span());
125 writer.QueueSpan(m2.span());
126 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800127 }
128
129 const std::vector<LogFile> parts = SortParts({logfile0});
130
131 PartsMessageReader reader(parts[0].parts[0]);
132
133 EXPECT_TRUE(reader.ReadMessage());
134 EXPECT_TRUE(reader.ReadMessage());
135 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
136}
137
Austin Schuhc41603c2020-10-11 16:17:37 -0700138// Tests that we can transparently re-assemble part files with a
139// PartsMessageReader.
140TEST(PartsMessageReaderTest, ReadWrite) {
141 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
142 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
143 unlink(logfile0.c_str());
144 unlink(logfile1.c_str());
145
146 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
147 JsonToSizedFlatbuffer<LogFileHeader>(
148 R"({
149 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800150 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700151 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
152 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
153 "parts_index": 0
154})");
155 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
156 JsonToSizedFlatbuffer<LogFileHeader>(
157 R"({
158 "max_out_of_order_duration": 200000000,
159 "monotonic_start_time": 0,
160 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800161 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700162 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
163 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
164 "parts_index": 1
165})");
166
167 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
168 JsonToSizedFlatbuffer<MessageHeader>(
169 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
170 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
171 JsonToSizedFlatbuffer<MessageHeader>(
172 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
173
174 {
175 DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800176 writer.QueueSpan(config0.span());
177 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700178 }
179 {
180 DetachedBufferWriter writer(logfile1, std::make_unique<DummyEncoder>());
Austin Schuhadd6eb32020-11-09 21:24:26 -0800181 writer.QueueSpan(config1.span());
182 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700183 }
184
185 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
186
187 PartsMessageReader reader(parts[0].parts[0]);
188
189 EXPECT_EQ(reader.filename(), logfile0);
190
191 // Confirm that the timestamps track, and the filename also updates.
192 // Read the first message.
193 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
194 EXPECT_EQ(
195 reader.max_out_of_order_duration(),
196 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
197 EXPECT_TRUE(reader.ReadMessage());
198 EXPECT_EQ(reader.filename(), logfile0);
199 EXPECT_EQ(reader.newest_timestamp(),
200 monotonic_clock::time_point(chrono::nanoseconds(1)));
201 EXPECT_EQ(
202 reader.max_out_of_order_duration(),
203 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
204
205 // Read the second message.
206 EXPECT_TRUE(reader.ReadMessage());
207 EXPECT_EQ(reader.filename(), logfile1);
208 EXPECT_EQ(reader.newest_timestamp(),
209 monotonic_clock::time_point(chrono::nanoseconds(2)));
210 EXPECT_EQ(
211 reader.max_out_of_order_duration(),
212 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
213
214 // And then confirm that reading again returns no message.
215 EXPECT_FALSE(reader.ReadMessage());
216 EXPECT_EQ(reader.filename(), logfile1);
217 EXPECT_EQ(
218 reader.max_out_of_order_duration(),
219 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800220 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Austin Schuhc41603c2020-10-11 16:17:37 -0700221}
Austin Schuh32f68492020-11-08 21:45:51 -0800222
Austin Schuh1be0ce42020-11-29 22:43:26 -0800223// Tests that Message's operator < works as expected.
224TEST(MessageTest, Sorting) {
225 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
226
227 Message m1{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700228 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700229 .timestamp =
230 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
Austin Schuh48507722021-07-17 17:29:24 -0700231 .monotonic_remote_boot = 0xffffff,
232 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700233 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800234 Message m2{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700235 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700236 .timestamp =
237 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
Austin Schuh48507722021-07-17 17:29:24 -0700238 .monotonic_remote_boot = 0xffffff,
239 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700240 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800241
242 EXPECT_LT(m1, m2);
243 EXPECT_GE(m2, m1);
244
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700245 m1.timestamp.time = e;
246 m2.timestamp.time = e;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800247
248 m1.channel_index = 1;
249 m2.channel_index = 2;
250
251 EXPECT_LT(m1, m2);
252 EXPECT_GE(m2, m1);
253
254 m1.channel_index = 0;
255 m2.channel_index = 0;
Austin Schuh58646e22021-08-23 23:51:46 -0700256 m1.queue_index.index = 0u;
257 m2.queue_index.index = 1u;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800258
259 EXPECT_LT(m1, m2);
260 EXPECT_GE(m2, m1);
261}
262
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800263aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
264 const aos::FlatbufferDetachedBuffer<Configuration> &config,
265 const std::string_view json) {
266 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh1034a832021-03-31 21:53:26 -0700267 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800268 flatbuffers::Offset<Configuration> config_offset =
269 aos::CopyFlatBuffer(config, &fbb);
270 LogFileHeader::Builder header_builder(fbb);
271 header_builder.add_configuration(config_offset);
272 fbb.Finish(header_builder.Finish());
273 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
274
275 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
276 JsonToFlatbuffer<LogFileHeader>(json));
277 CHECK(header_updates.Verify());
278 flatbuffers::FlatBufferBuilder fbb2;
Austin Schuh1034a832021-03-31 21:53:26 -0700279 fbb2.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800280 fbb2.FinishSizePrefixed(
281 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
282 return fbb2.Release();
283}
284
285class SortingElementTest : public ::testing::Test {
286 public:
287 SortingElementTest()
288 : config_(JsonToFlatbuffer<Configuration>(
289 R"({
290 "channels": [
291 {
292 "name": "/a",
293 "type": "aos.logger.testing.TestMessage",
294 "source_node": "pi1",
295 "destination_nodes": [
296 {
297 "name": "pi2"
298 },
299 {
300 "name": "pi3"
301 }
302 ]
303 },
304 {
305 "name": "/b",
306 "type": "aos.logger.testing.TestMessage",
307 "source_node": "pi1"
308 },
309 {
310 "name": "/c",
311 "type": "aos.logger.testing.TestMessage",
312 "source_node": "pi1"
Austin Schuh48507722021-07-17 17:29:24 -0700313 },
314 {
315 "name": "/d",
316 "type": "aos.logger.testing.TestMessage",
317 "source_node": "pi2",
318 "destination_nodes": [
319 {
320 "name": "pi1"
321 }
322 ]
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800323 }
324 ],
325 "nodes": [
326 {
327 "name": "pi1"
328 },
329 {
330 "name": "pi2"
331 },
332 {
333 "name": "pi3"
334 }
335 ]
336}
337)")),
338 config0_(MakeHeader(config_, R"({
339 /* 100ms */
340 "max_out_of_order_duration": 100000000,
341 "node": {
342 "name": "pi1"
343 },
344 "logger_node": {
345 "name": "pi1"
346 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800347 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800348 "realtime_start_time": 1000000000000,
349 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700350 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
351 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
352 "boot_uuids": [
353 "1d782c63-b3c7-466e-bea9-a01308b43333",
354 "",
355 ""
356 ],
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800357 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
358 "parts_index": 0
359})")),
360 config1_(MakeHeader(config_,
361 R"({
362 /* 100ms */
363 "max_out_of_order_duration": 100000000,
364 "node": {
365 "name": "pi1"
366 },
367 "logger_node": {
368 "name": "pi1"
369 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800370 "monotonic_start_time": 1000000,
371 "realtime_start_time": 1000000000000,
372 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700373 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
374 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
375 "boot_uuids": [
376 "1d782c63-b3c7-466e-bea9-a01308b43333",
377 "",
378 ""
379 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800380 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
381 "parts_index": 0
382})")),
383 config2_(MakeHeader(config_,
384 R"({
385 /* 100ms */
386 "max_out_of_order_duration": 100000000,
387 "node": {
388 "name": "pi2"
389 },
390 "logger_node": {
391 "name": "pi2"
392 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800393 "monotonic_start_time": 0,
394 "realtime_start_time": 1000000000000,
Austin Schuh48507722021-07-17 17:29:24 -0700395 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
396 "logger_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
397 "boot_uuids": [
398 "",
399 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
400 ""
401 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800402 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
403 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
404 "parts_index": 0
405})")),
406 config3_(MakeHeader(config_,
407 R"({
408 /* 100ms */
409 "max_out_of_order_duration": 100000000,
410 "node": {
411 "name": "pi1"
412 },
413 "logger_node": {
414 "name": "pi1"
415 },
416 "monotonic_start_time": 2000000,
417 "realtime_start_time": 1000000000,
418 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
Austin Schuh48507722021-07-17 17:29:24 -0700419 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
420 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
421 "boot_uuids": [
422 "1d782c63-b3c7-466e-bea9-a01308b43333",
423 "",
424 ""
425 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800426 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800427 "parts_index": 0
Austin Schuh8bf1e632021-01-02 22:41:04 -0800428})")),
429 config4_(MakeHeader(config_,
430 R"({
431 /* 100ms */
432 "max_out_of_order_duration": 100000000,
433 "node": {
434 "name": "pi2"
435 },
436 "logger_node": {
437 "name": "pi1"
438 },
439 "monotonic_start_time": 2000000,
440 "realtime_start_time": 1000000000,
441 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
442 "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
Austin Schuh48507722021-07-17 17:29:24 -0700443 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
444 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
445 "boot_uuids": [
446 "1d782c63-b3c7-466e-bea9-a01308b43333",
447 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
448 ""
449 ],
Austin Schuh8bf1e632021-01-02 22:41:04 -0800450 "parts_index": 0
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800451})")) {
452 unlink(logfile0_.c_str());
453 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800454 unlink(logfile2_.c_str());
Austin Schuh48507722021-07-17 17:29:24 -0700455 unlink(logfile3_.c_str());
Austin Schuh921b2562021-07-31 19:37:03 -0700456 queue_index_.resize(config_.message().channels()->size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800457 }
458
459 protected:
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800460 flatbuffers::DetachedBuffer MakeLogMessage(
461 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
462 int value) {
463 flatbuffers::FlatBufferBuilder message_fbb;
464 message_fbb.ForceDefaults(true);
465 TestMessage::Builder test_message_builder(message_fbb);
466 test_message_builder.add_value(value);
467 message_fbb.Finish(test_message_builder.Finish());
468
469 aos::Context context;
470 context.monotonic_event_time = monotonic_now;
471 context.realtime_event_time = aos::realtime_clock::epoch() +
472 chrono::seconds(1000) +
473 monotonic_now.time_since_epoch();
Austin Schuh921b2562021-07-31 19:37:03 -0700474 CHECK_LT(static_cast<size_t>(channel_index), queue_index_.size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800475 context.queue_index = queue_index_[channel_index];
476 context.size = message_fbb.GetSize();
477 context.data = message_fbb.GetBufferPointer();
478
479 ++queue_index_[channel_index];
480
481 flatbuffers::FlatBufferBuilder fbb;
482 fbb.FinishSizePrefixed(
483 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
484
485 return fbb.Release();
486 }
487
488 flatbuffers::DetachedBuffer MakeTimestampMessage(
489 const aos::monotonic_clock::time_point sender_monotonic_now,
Austin Schuh8bf1e632021-01-02 22:41:04 -0800490 int channel_index, chrono::nanoseconds receiver_monotonic_offset,
491 monotonic_clock::time_point monotonic_timestamp_time =
492 monotonic_clock::min_time) {
493 const monotonic_clock::time_point monotonic_sent_time =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800494 sender_monotonic_now + receiver_monotonic_offset;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800495
496 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800497 fbb.ForceDefaults(true);
498
499 logger::MessageHeader::Builder message_header_builder(fbb);
500
501 message_header_builder.add_channel_index(channel_index);
502
503 message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
504 100);
505 message_header_builder.add_monotonic_sent_time(
506 monotonic_sent_time.time_since_epoch().count());
507 message_header_builder.add_realtime_sent_time(
508 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
509 monotonic_sent_time.time_since_epoch())
510 .time_since_epoch()
511 .count());
512
513 message_header_builder.add_monotonic_remote_time(
514 sender_monotonic_now.time_since_epoch().count());
515 message_header_builder.add_realtime_remote_time(
516 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
517 sender_monotonic_now.time_since_epoch())
518 .time_since_epoch()
519 .count());
520 message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
521 1);
522
523 if (monotonic_timestamp_time != monotonic_clock::min_time) {
524 message_header_builder.add_monotonic_timestamp_time(
525 monotonic_timestamp_time.time_since_epoch().count());
526 }
527
528 fbb.FinishSizePrefixed(message_header_builder.Finish());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800529 LOG(INFO) << aos::FlatbufferToJson(
530 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
531 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
532
533 return fbb.Release();
534 }
535
536 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
537 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800538 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh48507722021-07-17 17:29:24 -0700539 const std::string logfile3_ = aos::testing::TestTmpDir() + "/log3.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800540
541 const aos::FlatbufferDetachedBuffer<Configuration> config_;
542 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
543 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800544 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
545 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800546 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800547
548 std::vector<uint32_t> queue_index_;
549};
550
551using LogPartsSorterTest = SortingElementTest;
552using LogPartsSorterDeathTest = LogPartsSorterTest;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800553using NodeMergerTest = SortingElementTest;
Austin Schuhd2f96102020-12-01 20:27:29 -0800554using TimestampMapperTest = SortingElementTest;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800555
556// Tests that we can pull messages out of a log sorted in order.
557TEST_F(LogPartsSorterTest, Pull) {
558 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
559 {
560 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
561 writer.QueueSpan(config0_.span());
562 writer.QueueSizedFlatbuffer(
563 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
564 writer.QueueSizedFlatbuffer(
565 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
566 writer.QueueSizedFlatbuffer(
567 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
568 writer.QueueSizedFlatbuffer(
569 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
570 }
571
572 const std::vector<LogFile> parts = SortParts({logfile0_});
573
574 LogPartsSorter parts_sorter(parts[0].parts[0]);
575
576 // Confirm we aren't sorted until any time until the message is popped.
577 // Peeking shouldn't change the sorted until time.
578 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
579
580 std::deque<Message> output;
581
582 ASSERT_TRUE(parts_sorter.Front() != nullptr);
583 output.emplace_back(std::move(*parts_sorter.Front()));
584 parts_sorter.PopFront();
585 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
586
587 ASSERT_TRUE(parts_sorter.Front() != nullptr);
588 output.emplace_back(std::move(*parts_sorter.Front()));
589 parts_sorter.PopFront();
590 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
591
592 ASSERT_TRUE(parts_sorter.Front() != nullptr);
593 output.emplace_back(std::move(*parts_sorter.Front()));
594 parts_sorter.PopFront();
595 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
596
597 ASSERT_TRUE(parts_sorter.Front() != nullptr);
598 output.emplace_back(std::move(*parts_sorter.Front()));
599 parts_sorter.PopFront();
600 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
601
602 ASSERT_TRUE(parts_sorter.Front() == nullptr);
603
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700604 EXPECT_EQ(output[0].timestamp.boot, 0);
605 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
606 EXPECT_EQ(output[1].timestamp.boot, 0);
607 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1000));
608 EXPECT_EQ(output[2].timestamp.boot, 0);
609 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1901));
610 EXPECT_EQ(output[3].timestamp.boot, 0);
611 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800612}
613
Austin Schuhb000de62020-12-03 22:00:40 -0800614// Tests that we can pull messages out of a log sorted in order.
615TEST_F(LogPartsSorterTest, WayBeforeStart) {
616 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
617 {
618 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
619 writer.QueueSpan(config0_.span());
620 writer.QueueSizedFlatbuffer(
621 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
622 writer.QueueSizedFlatbuffer(
623 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
624 writer.QueueSizedFlatbuffer(
625 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
626 writer.QueueSizedFlatbuffer(
627 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
628 writer.QueueSizedFlatbuffer(
629 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
630 }
631
632 const std::vector<LogFile> parts = SortParts({logfile0_});
633
634 LogPartsSorter parts_sorter(parts[0].parts[0]);
635
636 // Confirm we aren't sorted until any time until the message is popped.
637 // Peeking shouldn't change the sorted until time.
638 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
639
640 std::deque<Message> output;
641
642 for (monotonic_clock::time_point t :
643 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
644 e + chrono::milliseconds(1900), monotonic_clock::max_time,
645 monotonic_clock::max_time}) {
646 ASSERT_TRUE(parts_sorter.Front() != nullptr);
647 output.emplace_back(std::move(*parts_sorter.Front()));
648 parts_sorter.PopFront();
649 EXPECT_EQ(parts_sorter.sorted_until(), t);
650 }
651
652 ASSERT_TRUE(parts_sorter.Front() == nullptr);
653
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700654 EXPECT_EQ(output[0].timestamp.boot, 0u);
655 EXPECT_EQ(output[0].timestamp.time, e - chrono::milliseconds(1000));
656 EXPECT_EQ(output[1].timestamp.boot, 0u);
657 EXPECT_EQ(output[1].timestamp.time, e - chrono::milliseconds(500));
658 EXPECT_EQ(output[2].timestamp.boot, 0u);
659 EXPECT_EQ(output[2].timestamp.time, e - chrono::milliseconds(10));
660 EXPECT_EQ(output[3].timestamp.boot, 0u);
661 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(1901));
662 EXPECT_EQ(output[4].timestamp.boot, 0u);
663 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuhb000de62020-12-03 22:00:40 -0800664}
665
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800666// Tests that messages too far out of order trigger death.
667TEST_F(LogPartsSorterDeathTest, Pull) {
668 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
669 {
670 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
671 writer.QueueSpan(config0_.span());
672 writer.QueueSizedFlatbuffer(
673 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
674 writer.QueueSizedFlatbuffer(
675 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
676 writer.QueueSizedFlatbuffer(
677 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
678 // The following message is too far out of order and will trigger the CHECK.
679 writer.QueueSizedFlatbuffer(
680 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
681 }
682
683 const std::vector<LogFile> parts = SortParts({logfile0_});
684
685 LogPartsSorter parts_sorter(parts[0].parts[0]);
686
687 // Confirm we aren't sorted until any time until the message is popped.
688 // Peeking shouldn't change the sorted until time.
689 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
690 std::deque<Message> output;
691
692 ASSERT_TRUE(parts_sorter.Front() != nullptr);
693 parts_sorter.PopFront();
694 ASSERT_TRUE(parts_sorter.Front() != nullptr);
695 ASSERT_TRUE(parts_sorter.Front() != nullptr);
696 parts_sorter.PopFront();
697
Austin Schuh58646e22021-08-23 23:51:46 -0700698 EXPECT_DEATH({ parts_sorter.Front(); },
699 "Max out of order of 100000000ns exceeded.");
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800700}
701
Austin Schuh8f52ed52020-11-30 23:12:39 -0800702// Tests that we can merge data from 2 separate files, including duplicate data.
703TEST_F(NodeMergerTest, TwoFileMerger) {
704 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
705 {
706 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
707 writer0.QueueSpan(config0_.span());
708 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
709 writer1.QueueSpan(config1_.span());
710
711 writer0.QueueSizedFlatbuffer(
712 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
713 writer1.QueueSizedFlatbuffer(
714 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
715
716 writer0.QueueSizedFlatbuffer(
717 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
718 writer1.QueueSizedFlatbuffer(
719 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
720
721 // Make a duplicate!
722 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
723 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
724 writer0.QueueSpan(msg.span());
725 writer1.QueueSpan(msg.span());
726
727 writer1.QueueSizedFlatbuffer(
728 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
729 }
730
731 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Austin Schuhd2f96102020-12-01 20:27:29 -0800732 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800733
Austin Schuhd2f96102020-12-01 20:27:29 -0800734 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800735
736 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
737
738 std::deque<Message> output;
739
740 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
741 ASSERT_TRUE(merger.Front() != nullptr);
742 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
743
744 output.emplace_back(std::move(*merger.Front()));
745 merger.PopFront();
746 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
747
748 ASSERT_TRUE(merger.Front() != nullptr);
749 output.emplace_back(std::move(*merger.Front()));
750 merger.PopFront();
751 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
752
753 ASSERT_TRUE(merger.Front() != nullptr);
754 output.emplace_back(std::move(*merger.Front()));
755 merger.PopFront();
756 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
757
758 ASSERT_TRUE(merger.Front() != nullptr);
759 output.emplace_back(std::move(*merger.Front()));
760 merger.PopFront();
761 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
762
763 ASSERT_TRUE(merger.Front() != nullptr);
764 output.emplace_back(std::move(*merger.Front()));
765 merger.PopFront();
766 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
767
768 ASSERT_TRUE(merger.Front() != nullptr);
769 output.emplace_back(std::move(*merger.Front()));
770 merger.PopFront();
771 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
772
773 ASSERT_TRUE(merger.Front() == nullptr);
774
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700775 EXPECT_EQ(output[0].timestamp.boot, 0u);
776 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
777 EXPECT_EQ(output[1].timestamp.boot, 0u);
778 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1001));
779 EXPECT_EQ(output[2].timestamp.boot, 0u);
780 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1002));
781 EXPECT_EQ(output[3].timestamp.boot, 0u);
782 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
783 EXPECT_EQ(output[4].timestamp.boot, 0u);
784 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(3000));
785 EXPECT_EQ(output[5].timestamp.boot, 0u);
786 EXPECT_EQ(output[5].timestamp.time, e + chrono::milliseconds(3002));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800787}
788
Austin Schuh8bf1e632021-01-02 22:41:04 -0800789// Tests that we can merge timestamps with various combinations of
790// monotonic_timestamp_time.
791TEST_F(NodeMergerTest, TwoFileTimestampMerger) {
792 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
793 {
794 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
795 writer0.QueueSpan(config0_.span());
796 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
797 writer1.QueueSpan(config1_.span());
798
799 // Neither has it.
800 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
801 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
802 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
803 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
804 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
805
806 // First only has it.
807 MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
808 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
809 e + chrono::milliseconds(1001), 0, chrono::seconds(100),
810 e + chrono::nanoseconds(971)));
811 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
812 e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
813
814 // Second only has it.
815 MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
816 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
817 e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
818 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
819 e + chrono::milliseconds(1002), 0, chrono::seconds(100),
820 e + chrono::nanoseconds(972)));
821
822 // Both have it.
823 MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
824 writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
825 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
826 e + chrono::nanoseconds(973)));
827 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
828 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
829 e + chrono::nanoseconds(973)));
830 }
831
832 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
833 ASSERT_EQ(parts.size(), 1u);
834
835 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
836
837 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
838
839 std::deque<Message> output;
840
841 for (int i = 0; i < 4; ++i) {
842 ASSERT_TRUE(merger.Front() != nullptr);
843 output.emplace_back(std::move(*merger.Front()));
844 merger.PopFront();
845 }
846 ASSERT_TRUE(merger.Front() == nullptr);
847
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700848 EXPECT_EQ(output[0].timestamp.boot, 0u);
849 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700850 EXPECT_FALSE(output[0].data->has_monotonic_timestamp_time);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700851
852 EXPECT_EQ(output[1].timestamp.boot, 0u);
853 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700854 EXPECT_TRUE(output[1].data->has_monotonic_timestamp_time);
855 EXPECT_EQ(output[1].data->monotonic_timestamp_time,
856 monotonic_clock::time_point(std::chrono::nanoseconds(971)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700857
858 EXPECT_EQ(output[2].timestamp.boot, 0u);
859 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700860 EXPECT_TRUE(output[2].data->has_monotonic_timestamp_time);
861 EXPECT_EQ(output[2].data->monotonic_timestamp_time,
862 monotonic_clock::time_point(std::chrono::nanoseconds(972)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700863
864 EXPECT_EQ(output[3].timestamp.boot, 0u);
865 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700866 EXPECT_TRUE(output[3].data->has_monotonic_timestamp_time);
867 EXPECT_EQ(output[3].data->monotonic_timestamp_time,
868 monotonic_clock::time_point(std::chrono::nanoseconds(973)));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800869}
870
Austin Schuhd2f96102020-12-01 20:27:29 -0800871// Tests that we can match timestamps on delivered messages.
872TEST_F(TimestampMapperTest, ReadNode0First) {
873 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
874 {
875 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
876 writer0.QueueSpan(config0_.span());
877 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
878 writer1.QueueSpan(config2_.span());
879
880 writer0.QueueSizedFlatbuffer(
881 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
882 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
883 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
884
885 writer0.QueueSizedFlatbuffer(
886 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
887 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
888 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
889
890 writer0.QueueSizedFlatbuffer(
891 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
892 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
893 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
894 }
895
896 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
897
898 ASSERT_EQ(parts[0].logger_node, "pi1");
899 ASSERT_EQ(parts[1].logger_node, "pi2");
900
Austin Schuh79b30942021-01-24 22:32:21 -0800901 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800902 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -0800903 mapper0.set_timestamp_callback(
904 [&](TimestampedMessage *) { ++mapper0_count; });
905 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800906 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -0800907 mapper1.set_timestamp_callback(
908 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -0800909
910 mapper0.AddPeer(&mapper1);
911 mapper1.AddPeer(&mapper0);
912
913 {
914 std::deque<TimestampedMessage> output0;
915
Austin Schuh79b30942021-01-24 22:32:21 -0800916 EXPECT_EQ(mapper0_count, 0u);
917 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800918 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800919 EXPECT_EQ(mapper0_count, 1u);
920 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800921 output0.emplace_back(std::move(*mapper0.Front()));
922 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700923 EXPECT_TRUE(mapper0.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800924 EXPECT_EQ(mapper0_count, 1u);
925 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800926
927 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800928 EXPECT_EQ(mapper0_count, 2u);
929 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800930 output0.emplace_back(std::move(*mapper0.Front()));
931 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700932 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800933
934 ASSERT_TRUE(mapper0.Front() != nullptr);
935 output0.emplace_back(std::move(*mapper0.Front()));
936 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700937 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800938
Austin Schuh79b30942021-01-24 22:32:21 -0800939 EXPECT_EQ(mapper0_count, 3u);
940 EXPECT_EQ(mapper1_count, 0u);
941
Austin Schuhd2f96102020-12-01 20:27:29 -0800942 ASSERT_TRUE(mapper0.Front() == nullptr);
943
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700944 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
945 EXPECT_EQ(output0[0].monotonic_event_time.time,
946 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700947 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700948
949 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
950 EXPECT_EQ(output0[1].monotonic_event_time.time,
951 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700952 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700953
954 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
955 EXPECT_EQ(output0[2].monotonic_event_time.time,
956 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700957 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -0800958 }
959
960 {
961 SCOPED_TRACE("Trying node1 now");
962 std::deque<TimestampedMessage> output1;
963
Austin Schuh79b30942021-01-24 22:32:21 -0800964 EXPECT_EQ(mapper0_count, 3u);
965 EXPECT_EQ(mapper1_count, 0u);
966
Austin Schuhd2f96102020-12-01 20:27:29 -0800967 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800968 EXPECT_EQ(mapper0_count, 3u);
969 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800970 output1.emplace_back(std::move(*mapper1.Front()));
971 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700972 EXPECT_TRUE(mapper1.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800973 EXPECT_EQ(mapper0_count, 3u);
974 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800975
976 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800977 EXPECT_EQ(mapper0_count, 3u);
978 EXPECT_EQ(mapper1_count, 2u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800979 output1.emplace_back(std::move(*mapper1.Front()));
980 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700981 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800982
983 ASSERT_TRUE(mapper1.Front() != nullptr);
984 output1.emplace_back(std::move(*mapper1.Front()));
985 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700986 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800987
Austin Schuh79b30942021-01-24 22:32:21 -0800988 EXPECT_EQ(mapper0_count, 3u);
989 EXPECT_EQ(mapper1_count, 3u);
990
Austin Schuhd2f96102020-12-01 20:27:29 -0800991 ASSERT_TRUE(mapper1.Front() == nullptr);
992
Austin Schuh79b30942021-01-24 22:32:21 -0800993 EXPECT_EQ(mapper0_count, 3u);
994 EXPECT_EQ(mapper1_count, 3u);
995
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700996 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
997 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -0800998 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700999 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001000
1001 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1002 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001003 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001004 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001005
1006 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1007 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001008 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001009 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001010 }
1011}
1012
Austin Schuh8bf1e632021-01-02 22:41:04 -08001013// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
1014// returned.
1015TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
1016 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1017 {
1018 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1019 writer0.QueueSpan(config0_.span());
1020 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1021 writer1.QueueSpan(config4_.span());
1022
1023 writer0.QueueSizedFlatbuffer(
1024 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1025 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1026 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
1027 e + chrono::nanoseconds(971)));
1028
1029 writer0.QueueSizedFlatbuffer(
1030 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1031 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1032 e + chrono::milliseconds(2000), 0, chrono::seconds(100),
1033 e + chrono::nanoseconds(5458)));
1034
1035 writer0.QueueSizedFlatbuffer(
1036 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1037 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1038 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1039 }
1040
1041 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1042
1043 for (const auto &p : parts) {
1044 LOG(INFO) << p;
1045 }
1046
1047 ASSERT_EQ(parts.size(), 1u);
1048
Austin Schuh79b30942021-01-24 22:32:21 -08001049 size_t mapper0_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001050 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001051 mapper0.set_timestamp_callback(
1052 [&](TimestampedMessage *) { ++mapper0_count; });
1053 size_t mapper1_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001054 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001055 mapper1.set_timestamp_callback(
1056 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh8bf1e632021-01-02 22:41:04 -08001057
1058 mapper0.AddPeer(&mapper1);
1059 mapper1.AddPeer(&mapper0);
1060
1061 {
1062 std::deque<TimestampedMessage> output0;
1063
1064 for (int i = 0; i < 3; ++i) {
1065 ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
1066 output0.emplace_back(std::move(*mapper0.Front()));
1067 mapper0.PopFront();
1068 }
1069
1070 ASSERT_TRUE(mapper0.Front() == nullptr);
1071
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001072 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1073 EXPECT_EQ(output0[0].monotonic_event_time.time,
1074 e + chrono::milliseconds(1000));
1075 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
1076 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
1077 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001078 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001079
1080 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1081 EXPECT_EQ(output0[1].monotonic_event_time.time,
1082 e + chrono::milliseconds(2000));
1083 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
1084 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
1085 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001086 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001087
1088 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1089 EXPECT_EQ(output0[2].monotonic_event_time.time,
1090 e + chrono::milliseconds(3000));
1091 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
1092 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
1093 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001094 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001095 }
1096
1097 {
1098 SCOPED_TRACE("Trying node1 now");
1099 std::deque<TimestampedMessage> output1;
1100
1101 for (int i = 0; i < 3; ++i) {
1102 ASSERT_TRUE(mapper1.Front() != nullptr);
1103 output1.emplace_back(std::move(*mapper1.Front()));
1104 mapper1.PopFront();
1105 }
1106
1107 ASSERT_TRUE(mapper1.Front() == nullptr);
1108
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001109 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1110 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001111 e + chrono::seconds(100) + chrono::milliseconds(1000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001112 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
1113 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001114 e + chrono::nanoseconds(971));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001115 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001116
1117 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1118 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001119 e + chrono::seconds(100) + chrono::milliseconds(2000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001120 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
1121 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001122 e + chrono::nanoseconds(5458));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001123 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001124
1125 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1126 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001127 e + chrono::seconds(100) + chrono::milliseconds(3000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001128 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
1129 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
1130 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001131 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001132 }
Austin Schuh79b30942021-01-24 22:32:21 -08001133
1134 EXPECT_EQ(mapper0_count, 3u);
1135 EXPECT_EQ(mapper1_count, 3u);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001136}
1137
Austin Schuhd2f96102020-12-01 20:27:29 -08001138// Tests that we can match timestamps on delivered messages. By doing this in
1139// the reverse order, the second node needs to queue data up from the first node
1140// to find the matching timestamp.
1141TEST_F(TimestampMapperTest, ReadNode1First) {
1142 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1143 {
1144 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1145 writer0.QueueSpan(config0_.span());
1146 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1147 writer1.QueueSpan(config2_.span());
1148
1149 writer0.QueueSizedFlatbuffer(
1150 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1151 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1152 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1153
1154 writer0.QueueSizedFlatbuffer(
1155 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1156 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1157 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1158
1159 writer0.QueueSizedFlatbuffer(
1160 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1161 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1162 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1163 }
1164
1165 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1166
1167 ASSERT_EQ(parts[0].logger_node, "pi1");
1168 ASSERT_EQ(parts[1].logger_node, "pi2");
1169
Austin Schuh79b30942021-01-24 22:32:21 -08001170 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001171 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001172 mapper0.set_timestamp_callback(
1173 [&](TimestampedMessage *) { ++mapper0_count; });
1174 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001175 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001176 mapper1.set_timestamp_callback(
1177 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001178
1179 mapper0.AddPeer(&mapper1);
1180 mapper1.AddPeer(&mapper0);
1181
1182 {
1183 SCOPED_TRACE("Trying node1 now");
1184 std::deque<TimestampedMessage> output1;
1185
1186 ASSERT_TRUE(mapper1.Front() != nullptr);
1187 output1.emplace_back(std::move(*mapper1.Front()));
1188 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001189 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001190
1191 ASSERT_TRUE(mapper1.Front() != nullptr);
1192 output1.emplace_back(std::move(*mapper1.Front()));
1193 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001194 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001195
1196 ASSERT_TRUE(mapper1.Front() != nullptr);
1197 output1.emplace_back(std::move(*mapper1.Front()));
1198 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001199 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001200
1201 ASSERT_TRUE(mapper1.Front() == nullptr);
1202
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001203 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1204 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001205 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001206 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001207
1208 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1209 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001210 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001211 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001212
1213 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1214 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001215 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001216 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001217 }
1218
1219 {
1220 std::deque<TimestampedMessage> output0;
1221
1222 ASSERT_TRUE(mapper0.Front() != nullptr);
1223 output0.emplace_back(std::move(*mapper0.Front()));
1224 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001225 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001226
1227 ASSERT_TRUE(mapper0.Front() != nullptr);
1228 output0.emplace_back(std::move(*mapper0.Front()));
1229 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001230 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001231
1232 ASSERT_TRUE(mapper0.Front() != nullptr);
1233 output0.emplace_back(std::move(*mapper0.Front()));
1234 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001235 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001236
1237 ASSERT_TRUE(mapper0.Front() == nullptr);
1238
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001239 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1240 EXPECT_EQ(output0[0].monotonic_event_time.time,
1241 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001242 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001243
1244 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1245 EXPECT_EQ(output0[1].monotonic_event_time.time,
1246 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001247 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001248
1249 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1250 EXPECT_EQ(output0[2].monotonic_event_time.time,
1251 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001252 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001253 }
Austin Schuh79b30942021-01-24 22:32:21 -08001254
1255 EXPECT_EQ(mapper0_count, 3u);
1256 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001257}
1258
1259// Tests that we return just the timestamps if we couldn't find the data and the
1260// missing data was at the beginning of the file.
1261TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
1262 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1263 {
1264 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1265 writer0.QueueSpan(config0_.span());
1266 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1267 writer1.QueueSpan(config2_.span());
1268
1269 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
1270 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1271 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1272
1273 writer0.QueueSizedFlatbuffer(
1274 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1275 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1276 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1277
1278 writer0.QueueSizedFlatbuffer(
1279 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1280 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1281 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1282 }
1283
1284 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1285
1286 ASSERT_EQ(parts[0].logger_node, "pi1");
1287 ASSERT_EQ(parts[1].logger_node, "pi2");
1288
Austin Schuh79b30942021-01-24 22:32:21 -08001289 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001290 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001291 mapper0.set_timestamp_callback(
1292 [&](TimestampedMessage *) { ++mapper0_count; });
1293 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001294 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001295 mapper1.set_timestamp_callback(
1296 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001297
1298 mapper0.AddPeer(&mapper1);
1299 mapper1.AddPeer(&mapper0);
1300
1301 {
1302 SCOPED_TRACE("Trying node1 now");
1303 std::deque<TimestampedMessage> output1;
1304
1305 ASSERT_TRUE(mapper1.Front() != nullptr);
1306 output1.emplace_back(std::move(*mapper1.Front()));
1307 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001308 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001309
1310 ASSERT_TRUE(mapper1.Front() != nullptr);
1311 output1.emplace_back(std::move(*mapper1.Front()));
1312 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001313 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001314
1315 ASSERT_TRUE(mapper1.Front() != nullptr);
1316 output1.emplace_back(std::move(*mapper1.Front()));
1317 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001318 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001319
1320 ASSERT_TRUE(mapper1.Front() == nullptr);
1321
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001322 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1323 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001324 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001325 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001326
1327 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1328 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001329 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001330 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001331
1332 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1333 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001334 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001335 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001336 }
Austin Schuh79b30942021-01-24 22:32:21 -08001337
1338 EXPECT_EQ(mapper0_count, 0u);
1339 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001340}
1341
1342// Tests that we return just the timestamps if we couldn't find the data and the
1343// missing data was at the end of the file.
1344TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
1345 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1346 {
1347 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1348 writer0.QueueSpan(config0_.span());
1349 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1350 writer1.QueueSpan(config2_.span());
1351
1352 writer0.QueueSizedFlatbuffer(
1353 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1354 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1355 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1356
1357 writer0.QueueSizedFlatbuffer(
1358 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1359 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1360 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1361
1362 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
1363 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1364 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1365 }
1366
1367 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1368
1369 ASSERT_EQ(parts[0].logger_node, "pi1");
1370 ASSERT_EQ(parts[1].logger_node, "pi2");
1371
Austin Schuh79b30942021-01-24 22:32:21 -08001372 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001373 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001374 mapper0.set_timestamp_callback(
1375 [&](TimestampedMessage *) { ++mapper0_count; });
1376 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001377 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001378 mapper1.set_timestamp_callback(
1379 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001380
1381 mapper0.AddPeer(&mapper1);
1382 mapper1.AddPeer(&mapper0);
1383
1384 {
1385 SCOPED_TRACE("Trying node1 now");
1386 std::deque<TimestampedMessage> output1;
1387
1388 ASSERT_TRUE(mapper1.Front() != nullptr);
1389 output1.emplace_back(std::move(*mapper1.Front()));
1390 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001391 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001392
1393 ASSERT_TRUE(mapper1.Front() != nullptr);
1394 output1.emplace_back(std::move(*mapper1.Front()));
1395 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001396 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001397
1398 ASSERT_TRUE(mapper1.Front() != nullptr);
1399 output1.emplace_back(std::move(*mapper1.Front()));
1400 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001401 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001402
1403 ASSERT_TRUE(mapper1.Front() == nullptr);
1404
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001405 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1406 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001407 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001408 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001409
1410 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1411 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001412 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001413 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001414
1415 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1416 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001417 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001418 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001419 }
Austin Schuh79b30942021-01-24 22:32:21 -08001420
1421 EXPECT_EQ(mapper0_count, 0u);
1422 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001423}
1424
Austin Schuh993ccb52020-12-12 15:59:32 -08001425// Tests that we handle a message which failed to forward or be logged.
1426TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1427 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1428 {
1429 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1430 writer0.QueueSpan(config0_.span());
1431 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1432 writer1.QueueSpan(config2_.span());
1433
1434 writer0.QueueSizedFlatbuffer(
1435 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1436 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1437 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1438
1439 // Create both the timestamp and message, but don't log them, simulating a
1440 // forwarding drop.
1441 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1442 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1443 chrono::seconds(100));
1444
1445 writer0.QueueSizedFlatbuffer(
1446 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1447 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1448 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1449 }
1450
1451 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1452
1453 ASSERT_EQ(parts[0].logger_node, "pi1");
1454 ASSERT_EQ(parts[1].logger_node, "pi2");
1455
Austin Schuh79b30942021-01-24 22:32:21 -08001456 size_t mapper0_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001457 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001458 mapper0.set_timestamp_callback(
1459 [&](TimestampedMessage *) { ++mapper0_count; });
1460 size_t mapper1_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001461 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001462 mapper1.set_timestamp_callback(
1463 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh993ccb52020-12-12 15:59:32 -08001464
1465 mapper0.AddPeer(&mapper1);
1466 mapper1.AddPeer(&mapper0);
1467
1468 {
1469 std::deque<TimestampedMessage> output1;
1470
1471 ASSERT_TRUE(mapper1.Front() != nullptr);
1472 output1.emplace_back(std::move(*mapper1.Front()));
1473 mapper1.PopFront();
1474
1475 ASSERT_TRUE(mapper1.Front() != nullptr);
1476 output1.emplace_back(std::move(*mapper1.Front()));
1477
1478 ASSERT_FALSE(mapper1.Front() == nullptr);
1479
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001480 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1481 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001482 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001483 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001484
1485 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1486 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001487 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001488 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh993ccb52020-12-12 15:59:32 -08001489 }
Austin Schuh79b30942021-01-24 22:32:21 -08001490
1491 EXPECT_EQ(mapper0_count, 0u);
1492 EXPECT_EQ(mapper1_count, 2u);
Austin Schuh993ccb52020-12-12 15:59:32 -08001493}
1494
Austin Schuhd2f96102020-12-01 20:27:29 -08001495// Tests that we properly sort log files with duplicate timestamps.
1496TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1497 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1498 {
1499 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1500 writer0.QueueSpan(config0_.span());
1501 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1502 writer1.QueueSpan(config2_.span());
1503
1504 writer0.QueueSizedFlatbuffer(
1505 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1506 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1507 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1508
1509 writer0.QueueSizedFlatbuffer(
1510 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1511 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1512 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1513
1514 writer0.QueueSizedFlatbuffer(
1515 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
1516 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1517 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1518
1519 writer0.QueueSizedFlatbuffer(
1520 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
1521 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1522 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1523 }
1524
1525 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1526
1527 ASSERT_EQ(parts[0].logger_node, "pi1");
1528 ASSERT_EQ(parts[1].logger_node, "pi2");
1529
Austin Schuh79b30942021-01-24 22:32:21 -08001530 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001531 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001532 mapper0.set_timestamp_callback(
1533 [&](TimestampedMessage *) { ++mapper0_count; });
1534 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001535 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001536 mapper1.set_timestamp_callback(
1537 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001538
1539 mapper0.AddPeer(&mapper1);
1540 mapper1.AddPeer(&mapper0);
1541
1542 {
1543 SCOPED_TRACE("Trying node1 now");
1544 std::deque<TimestampedMessage> output1;
1545
1546 for (int i = 0; i < 4; ++i) {
1547 ASSERT_TRUE(mapper1.Front() != nullptr);
1548 output1.emplace_back(std::move(*mapper1.Front()));
1549 mapper1.PopFront();
1550 }
1551 ASSERT_TRUE(mapper1.Front() == nullptr);
1552
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001553 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1554 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001555 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001556 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001557
1558 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1559 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001560 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001561 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001562
1563 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1564 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001565 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001566 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001567
1568 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1569 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001570 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001571 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001572 }
Austin Schuh79b30942021-01-24 22:32:21 -08001573
1574 EXPECT_EQ(mapper0_count, 0u);
1575 EXPECT_EQ(mapper1_count, 4u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001576}
1577
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001578// Tests that we properly produce a valid start time.
Austin Schuhd2f96102020-12-01 20:27:29 -08001579TEST_F(TimestampMapperTest, StartTime) {
1580 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1581 {
1582 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1583 writer0.QueueSpan(config0_.span());
1584 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1585 writer1.QueueSpan(config1_.span());
1586 DetachedBufferWriter writer2(logfile2_, std::make_unique<DummyEncoder>());
1587 writer2.QueueSpan(config3_.span());
1588 }
1589
1590 const std::vector<LogFile> parts =
1591 SortParts({logfile0_, logfile1_, logfile2_});
1592
Austin Schuh79b30942021-01-24 22:32:21 -08001593 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001594 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001595 mapper0.set_timestamp_callback(
1596 [&](TimestampedMessage *) { ++mapper0_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001597
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001598 EXPECT_EQ(mapper0.monotonic_start_time(0), e + chrono::milliseconds(1));
1599 EXPECT_EQ(mapper0.realtime_start_time(0),
Austin Schuhd2f96102020-12-01 20:27:29 -08001600 realtime_clock::time_point(chrono::seconds(1000)));
Austin Schuh79b30942021-01-24 22:32:21 -08001601 EXPECT_EQ(mapper0_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001602}
1603
Austin Schuhfecf1d82020-12-19 16:57:28 -08001604// Tests that when a peer isn't registered, we treat that as if there was no
1605// data available.
1606TEST_F(TimestampMapperTest, NoPeer) {
1607 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1608 {
1609 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1610 writer0.QueueSpan(config0_.span());
1611 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1612 writer1.QueueSpan(config2_.span());
1613
1614 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
1615 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1616 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1617
1618 writer0.QueueSizedFlatbuffer(
1619 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1620 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1621 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1622
1623 writer0.QueueSizedFlatbuffer(
1624 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1625 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1626 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1627 }
1628
1629 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1630
1631 ASSERT_EQ(parts[0].logger_node, "pi1");
1632 ASSERT_EQ(parts[1].logger_node, "pi2");
1633
Austin Schuh79b30942021-01-24 22:32:21 -08001634 size_t mapper1_count = 0;
Austin Schuhfecf1d82020-12-19 16:57:28 -08001635 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001636 mapper1.set_timestamp_callback(
1637 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhfecf1d82020-12-19 16:57:28 -08001638
1639 {
1640 std::deque<TimestampedMessage> output1;
1641
1642 ASSERT_TRUE(mapper1.Front() != nullptr);
1643 output1.emplace_back(std::move(*mapper1.Front()));
1644 mapper1.PopFront();
1645 ASSERT_TRUE(mapper1.Front() != nullptr);
1646 output1.emplace_back(std::move(*mapper1.Front()));
1647 mapper1.PopFront();
1648 ASSERT_TRUE(mapper1.Front() != nullptr);
1649 output1.emplace_back(std::move(*mapper1.Front()));
1650 mapper1.PopFront();
1651 ASSERT_TRUE(mapper1.Front() == nullptr);
1652
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001653 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1654 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001655 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001656 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001657
1658 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1659 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001660 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001661 EXPECT_FALSE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001662
1663 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1664 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001665 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001666 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001667 }
Austin Schuh79b30942021-01-24 22:32:21 -08001668 EXPECT_EQ(mapper1_count, 3u);
1669}
1670
1671// Tests that we can queue messages and call the timestamp callback for both
1672// nodes.
1673TEST_F(TimestampMapperTest, QueueUntilNode0) {
1674 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1675 {
1676 DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
1677 writer0.QueueSpan(config0_.span());
1678 DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
1679 writer1.QueueSpan(config2_.span());
1680
1681 writer0.QueueSizedFlatbuffer(
1682 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1683 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1684 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1685
1686 writer0.QueueSizedFlatbuffer(
1687 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
1688 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1689 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1690
1691 writer0.QueueSizedFlatbuffer(
1692 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
1693 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1694 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1695
1696 writer0.QueueSizedFlatbuffer(
1697 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
1698 writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
1699 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1700 }
1701
1702 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1703
1704 ASSERT_EQ(parts[0].logger_node, "pi1");
1705 ASSERT_EQ(parts[1].logger_node, "pi2");
1706
1707 size_t mapper0_count = 0;
1708 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1709 mapper0.set_timestamp_callback(
1710 [&](TimestampedMessage *) { ++mapper0_count; });
1711 size_t mapper1_count = 0;
1712 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1713 mapper1.set_timestamp_callback(
1714 [&](TimestampedMessage *) { ++mapper1_count; });
1715
1716 mapper0.AddPeer(&mapper1);
1717 mapper1.AddPeer(&mapper0);
1718
1719 {
1720 std::deque<TimestampedMessage> output0;
1721
1722 EXPECT_EQ(mapper0_count, 0u);
1723 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001724 mapper0.QueueUntil(
1725 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001726 EXPECT_EQ(mapper0_count, 3u);
1727 EXPECT_EQ(mapper1_count, 0u);
1728
1729 ASSERT_TRUE(mapper0.Front() != nullptr);
1730 EXPECT_EQ(mapper0_count, 3u);
1731 EXPECT_EQ(mapper1_count, 0u);
1732
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001733 mapper0.QueueUntil(
1734 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001735 EXPECT_EQ(mapper0_count, 3u);
1736 EXPECT_EQ(mapper1_count, 0u);
1737
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001738 mapper0.QueueUntil(
1739 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001740 EXPECT_EQ(mapper0_count, 4u);
1741 EXPECT_EQ(mapper1_count, 0u);
1742
1743 output0.emplace_back(std::move(*mapper0.Front()));
1744 mapper0.PopFront();
1745 output0.emplace_back(std::move(*mapper0.Front()));
1746 mapper0.PopFront();
1747 output0.emplace_back(std::move(*mapper0.Front()));
1748 mapper0.PopFront();
1749 output0.emplace_back(std::move(*mapper0.Front()));
1750 mapper0.PopFront();
1751
1752 EXPECT_EQ(mapper0_count, 4u);
1753 EXPECT_EQ(mapper1_count, 0u);
1754
1755 ASSERT_TRUE(mapper0.Front() == nullptr);
1756
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001757 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1758 EXPECT_EQ(output0[0].monotonic_event_time.time,
1759 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001760 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001761
1762 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1763 EXPECT_EQ(output0[1].monotonic_event_time.time,
1764 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001765 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001766
1767 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1768 EXPECT_EQ(output0[2].monotonic_event_time.time,
1769 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001770 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001771
1772 EXPECT_EQ(output0[3].monotonic_event_time.boot, 0u);
1773 EXPECT_EQ(output0[3].monotonic_event_time.time,
1774 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001775 EXPECT_TRUE(output0[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001776 }
1777
1778 {
1779 SCOPED_TRACE("Trying node1 now");
1780 std::deque<TimestampedMessage> output1;
1781
1782 EXPECT_EQ(mapper0_count, 4u);
1783 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001784 mapper1.QueueUntil(BootTimestamp{
1785 .boot = 0,
1786 .time = e + chrono::seconds(100) + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001787 EXPECT_EQ(mapper0_count, 4u);
1788 EXPECT_EQ(mapper1_count, 3u);
1789
1790 ASSERT_TRUE(mapper1.Front() != nullptr);
1791 EXPECT_EQ(mapper0_count, 4u);
1792 EXPECT_EQ(mapper1_count, 3u);
1793
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001794 mapper1.QueueUntil(BootTimestamp{
1795 .boot = 0,
1796 .time = e + chrono::seconds(100) + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001797 EXPECT_EQ(mapper0_count, 4u);
1798 EXPECT_EQ(mapper1_count, 3u);
1799
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001800 mapper1.QueueUntil(BootTimestamp{
1801 .boot = 0,
1802 .time = e + chrono::seconds(100) + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001803 EXPECT_EQ(mapper0_count, 4u);
1804 EXPECT_EQ(mapper1_count, 4u);
1805
1806 ASSERT_TRUE(mapper1.Front() != nullptr);
1807 EXPECT_EQ(mapper0_count, 4u);
1808 EXPECT_EQ(mapper1_count, 4u);
1809
1810 output1.emplace_back(std::move(*mapper1.Front()));
1811 mapper1.PopFront();
1812 ASSERT_TRUE(mapper1.Front() != nullptr);
1813 output1.emplace_back(std::move(*mapper1.Front()));
1814 mapper1.PopFront();
1815 ASSERT_TRUE(mapper1.Front() != nullptr);
1816 output1.emplace_back(std::move(*mapper1.Front()));
1817 mapper1.PopFront();
1818 ASSERT_TRUE(mapper1.Front() != nullptr);
1819 output1.emplace_back(std::move(*mapper1.Front()));
1820 mapper1.PopFront();
1821
1822 EXPECT_EQ(mapper0_count, 4u);
1823 EXPECT_EQ(mapper1_count, 4u);
1824
1825 ASSERT_TRUE(mapper1.Front() == nullptr);
1826
1827 EXPECT_EQ(mapper0_count, 4u);
1828 EXPECT_EQ(mapper1_count, 4u);
1829
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001830 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1831 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001832 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001833 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001834
1835 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1836 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001837 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001838 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001839
1840 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1841 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001842 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001843 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001844
1845 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1846 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001847 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001848 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001849 }
Austin Schuhfecf1d82020-12-19 16:57:28 -08001850}
1851
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001852class BootMergerTest : public SortingElementTest {
1853 public:
1854 BootMergerTest()
1855 : SortingElementTest(),
1856 boot0_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07001857 /* 100ms */
1858 "max_out_of_order_duration": 100000000,
1859 "node": {
1860 "name": "pi2"
1861 },
1862 "logger_node": {
1863 "name": "pi1"
1864 },
1865 "monotonic_start_time": 1000000,
1866 "realtime_start_time": 1000000000000,
1867 "logger_monotonic_start_time": 1000000,
1868 "logger_realtime_start_time": 1000000000000,
1869 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
1870 "parts_uuid": "1a9e5ca2-31b2-475b-8282-88f6d1ce5109",
1871 "parts_index": 0,
1872 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
1873 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07001874 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
1875 "boot_uuids": [
1876 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
1877 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
1878 ""
1879 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001880})")),
1881 boot1_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07001882 /* 100ms */
1883 "max_out_of_order_duration": 100000000,
1884 "node": {
1885 "name": "pi2"
1886 },
1887 "logger_node": {
1888 "name": "pi1"
1889 },
1890 "monotonic_start_time": 1000000,
1891 "realtime_start_time": 1000000000000,
1892 "logger_monotonic_start_time": 1000000,
1893 "logger_realtime_start_time": 1000000000000,
1894 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
1895 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
1896 "parts_index": 1,
1897 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
1898 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07001899 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
1900 "boot_uuids": [
1901 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
1902 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
1903 ""
1904 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001905})")) {}
Austin Schuh8bee6882021-06-28 21:03:28 -07001906
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001907 protected:
1908 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0_;
1909 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1_;
1910};
1911
1912// This tests that we can properly sort a multi-node log file which has the old
1913// (and buggy) timestamps in the header, and the non-resetting parts_index.
1914// These make it so we can just bairly figure out what happened first and what
1915// happened second, but not in a way that is robust to multiple nodes rebooting.
1916TEST_F(BootMergerTest, OldReboot) {
Austin Schuh8bee6882021-06-28 21:03:28 -07001917 {
1918 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001919 writer.QueueSpan(boot0_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07001920 }
1921 {
1922 DetachedBufferWriter writer(logfile1_, std::make_unique<DummyEncoder>());
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001923 writer.QueueSpan(boot1_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07001924 }
1925
1926 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1927
1928 ASSERT_EQ(parts.size(), 1u);
1929 ASSERT_EQ(parts[0].parts.size(), 2u);
1930
1931 EXPECT_EQ(parts[0].parts[0].boot_count, 0);
1932 EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001933 boot0_.message().source_node_boot_uuid()->string_view());
Austin Schuh8bee6882021-06-28 21:03:28 -07001934
1935 EXPECT_EQ(parts[0].parts[1].boot_count, 1);
1936 EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001937 boot1_.message().source_node_boot_uuid()->string_view());
1938}
1939
1940// This tests that we can produce messages ordered across a reboot.
1941TEST_F(BootMergerTest, SortAcrossReboot) {
1942 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1943 {
1944 DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
1945 writer.QueueSpan(boot0_.span());
1946 writer.QueueSizedFlatbuffer(
1947 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1948 writer.QueueSizedFlatbuffer(
1949 MakeLogMessage(e + chrono::milliseconds(2000), 1, 0x105));
1950 }
1951 {
1952 DetachedBufferWriter writer(logfile1_, std::make_unique<DummyEncoder>());
1953 writer.QueueSpan(boot1_.span());
1954 writer.QueueSizedFlatbuffer(
1955 MakeLogMessage(e + chrono::milliseconds(100), 0, 0x006));
1956 writer.QueueSizedFlatbuffer(
1957 MakeLogMessage(e + chrono::milliseconds(200), 1, 0x106));
1958 }
1959
1960 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1961 ASSERT_EQ(parts.size(), 1u);
1962 ASSERT_EQ(parts[0].parts.size(), 2u);
1963
1964 BootMerger merger(FilterPartsForNode(parts, "pi2"));
1965
1966 EXPECT_EQ(merger.node(), 1u);
1967
1968 std::vector<Message> output;
1969 for (int i = 0; i < 4; ++i) {
1970 ASSERT_TRUE(merger.Front() != nullptr);
1971 output.emplace_back(std::move(*merger.Front()));
1972 merger.PopFront();
1973 }
1974
1975 ASSERT_TRUE(merger.Front() == nullptr);
1976
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001977 EXPECT_EQ(output[0].timestamp.boot, 0u);
1978 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
1979 EXPECT_EQ(output[1].timestamp.boot, 0u);
1980 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(2000));
1981
1982 EXPECT_EQ(output[2].timestamp.boot, 1u);
1983 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(100));
1984 EXPECT_EQ(output[3].timestamp.boot, 1u);
1985 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
Austin Schuh8bee6882021-06-28 21:03:28 -07001986}
1987
Austin Schuh48507722021-07-17 17:29:24 -07001988class RebootTimestampMapperTest : public SortingElementTest {
1989 public:
1990 RebootTimestampMapperTest()
1991 : SortingElementTest(),
1992 boot0a_(MakeHeader(config_, R"({
1993 /* 100ms */
1994 "max_out_of_order_duration": 100000000,
1995 "node": {
1996 "name": "pi1"
1997 },
1998 "logger_node": {
1999 "name": "pi1"
2000 },
2001 "monotonic_start_time": 1000000,
2002 "realtime_start_time": 1000000000000,
2003 "logger_monotonic_start_time": 1000000,
2004 "logger_realtime_start_time": 1000000000000,
2005 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2006 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2007 "parts_index": 0,
2008 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2009 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2010 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2011 "boot_uuids": [
2012 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2013 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2014 ""
2015 ]
2016})")),
2017 boot0b_(MakeHeader(config_, R"({
2018 /* 100ms */
2019 "max_out_of_order_duration": 100000000,
2020 "node": {
2021 "name": "pi1"
2022 },
2023 "logger_node": {
2024 "name": "pi1"
2025 },
2026 "monotonic_start_time": 1000000,
2027 "realtime_start_time": 1000000000000,
2028 "logger_monotonic_start_time": 1000000,
2029 "logger_realtime_start_time": 1000000000000,
2030 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2031 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2032 "parts_index": 1,
2033 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2034 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2035 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2036 "boot_uuids": [
2037 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2038 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2039 ""
2040 ]
2041})")),
2042 boot1a_(MakeHeader(config_, R"({
2043 /* 100ms */
2044 "max_out_of_order_duration": 100000000,
2045 "node": {
2046 "name": "pi2"
2047 },
2048 "logger_node": {
2049 "name": "pi1"
2050 },
2051 "monotonic_start_time": 1000000,
2052 "realtime_start_time": 1000000000000,
2053 "logger_monotonic_start_time": 1000000,
2054 "logger_realtime_start_time": 1000000000000,
2055 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2056 "parts_uuid": "f6ab0cdc-a654-456d-bfd9-2bbc09098edf",
2057 "parts_index": 0,
2058 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2059 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2060 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2061 "boot_uuids": [
2062 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2063 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2064 ""
2065 ]
2066})")),
2067 boot1b_(MakeHeader(config_, R"({
2068 /* 100ms */
2069 "max_out_of_order_duration": 100000000,
2070 "node": {
2071 "name": "pi2"
2072 },
2073 "logger_node": {
2074 "name": "pi1"
2075 },
2076 "monotonic_start_time": 1000000,
2077 "realtime_start_time": 1000000000000,
2078 "logger_monotonic_start_time": 1000000,
2079 "logger_realtime_start_time": 1000000000000,
2080 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2081 "parts_uuid": "3a9d0445-f520-43ca-93f5-e2cc7f54d40a",
2082 "parts_index": 1,
2083 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2084 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2085 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2086 "boot_uuids": [
2087 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2088 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2089 ""
2090 ]
2091})")) {}
2092
2093 protected:
2094 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0a_;
2095 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0b_;
2096 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1a_;
2097 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
2098};
2099
Austin Schuh48507722021-07-17 17:29:24 -07002100// Tests that we can match timestamps on delivered messages in the presence of
2101// reboots on the node receiving timestamps.
//
// Four parts are written.  logfile0_/logfile1_ carry the boot0a_/boot0b_
// headers plus the data messages; logfile2_/logfile3_ carry the
// boot1a_/boot1b_ headers (node "pi2", logged by "pi1", with two different
// source_node_boot_uuid values) plus the matching timestamp messages.
// mapper0 (the "pi1" parts) is drained completely before mapper1 (the "pi2"
// parts) is touched, hence "ReadNode0First".
2102TEST_F(RebootTimestampMapperTest, ReadNode0First) {
2103 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2104 {
  // All four writers live only inside this scope, so they are destroyed
  // before SortParts() is handed the file names below (presumably destruction
  // is what flushes the parts to disk -- confirm in DetachedBufferWriter).
2105 DetachedBufferWriter writer0a(logfile0_, std::make_unique<DummyEncoder>());
2106 writer0a.QueueSpan(boot0a_.span());
2107 DetachedBufferWriter writer0b(logfile1_, std::make_unique<DummyEncoder>());
2108 writer0b.QueueSpan(boot0b_.span());
2109 DetachedBufferWriter writer1a(logfile2_, std::make_unique<DummyEncoder>());
2110 writer1a.QueueSpan(boot1a_.span());
2111 DetachedBufferWriter writer1b(logfile3_, std::make_unique<DummyEncoder>());
2112 writer1b.QueueSpan(boot1b_.span());
2113
  // Data message at 1000ms and its timestamp in the first pi2 part.  The
  // expectations further down treat MakeTimestampMessage's first argument as
  // the remote time, the third as the offset added to it to obtain the local
  // event time, and the fourth as the time the timestamp itself was logged.
2114 writer0a.QueueSizedFlatbuffer(
2115 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
2116 writer1a.QueueSizedFlatbuffer(MakeTimestampMessage(
2117 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
2118 e + chrono::milliseconds(1001)));
2119
  // A second timestamp for the same 1000ms message, this time in the second
  // pi2 part.  1000ms + 21s is the 20s + 2000ms event time that output1[1]
  // is checked against.
Austin Schuh58646e22021-08-23 23:51:46 -07002120 writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
2121 e + chrono::milliseconds(1000), 0, chrono::seconds(21),
2122 e + chrono::milliseconds(2001)));
2123
  // Data messages at 2000ms and 3000ms, each with one timestamp in the second
  // pi2 part at an offset of 20s.
Austin Schuh48507722021-07-17 17:29:24 -07002124 writer0b.QueueSizedFlatbuffer(
2125 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
2126 writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
2127 e + chrono::milliseconds(2000), 0, chrono::seconds(20),
2128 e + chrono::milliseconds(2001)));
2129
2130 writer0b.QueueSizedFlatbuffer(
2131 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
2132 writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
2133 e + chrono::milliseconds(3000), 0, chrono::seconds(20),
2134 e + chrono::milliseconds(3001)));
2135 }
2136
Austin Schuh58646e22021-08-23 23:51:46 -07002137 const std::vector<LogFile> parts =
2138 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Austin Schuh48507722021-07-17 17:29:24 -07002139
2140 for (const auto &x : parts) {
2141 LOG(INFO) << x;
2142 }
  // All four parts are expected to sort into a single log whose logger node
  // is pi1.  ASSERT so that parts[0] is never indexed on an empty result.
2143 ASSERT_EQ(parts.size(), 1u);
2144 ASSERT_EQ(parts[0].logger_node, "pi1");
2145
  // One mapper per node.  Each callback only counts how often it fired, which
  // lets the checks below pin down exactly which call triggers it.
2146 size_t mapper0_count = 0;
2147 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
2148 mapper0.set_timestamp_callback(
2149 [&](TimestampedMessage *) { ++mapper0_count; });
2150 size_t mapper1_count = 0;
2151 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
2152 mapper1.set_timestamp_callback(
2153 [&](TimestampedMessage *) { ++mapper1_count; });
2154
2155 mapper0.AddPeer(&mapper1);
2156 mapper1.AddPeer(&mapper0);
2157
2158 {
2159 std::deque<TimestampedMessage> output0;
2160
  // The count is sampled before Front(), after Front() and after PopFront():
  // it is expected to go up on Front() and to stay put on PopFront().
  // mapper1's callback must not fire while only mapper0 is being read.
2161 EXPECT_EQ(mapper0_count, 0u);
2162 EXPECT_EQ(mapper1_count, 0u);
2163 ASSERT_TRUE(mapper0.Front() != nullptr);
2164 EXPECT_EQ(mapper0_count, 1u);
2165 EXPECT_EQ(mapper1_count, 0u);
2166 output0.emplace_back(std::move(*mapper0.Front()));
2167 mapper0.PopFront();
2168 EXPECT_TRUE(mapper0.started());
2169 EXPECT_EQ(mapper0_count, 1u);
2170 EXPECT_EQ(mapper1_count, 0u);
2171
2172 ASSERT_TRUE(mapper0.Front() != nullptr);
2173 EXPECT_EQ(mapper0_count, 2u);
2174 EXPECT_EQ(mapper1_count, 0u);
2175 output0.emplace_back(std::move(*mapper0.Front()));
2176 mapper0.PopFront();
2177 EXPECT_TRUE(mapper0.started());
2178
2179 ASSERT_TRUE(mapper0.Front() != nullptr);
2180 output0.emplace_back(std::move(*mapper0.Front()));
2181 mapper0.PopFront();
2182 EXPECT_TRUE(mapper0.started());
2183
2184 EXPECT_EQ(mapper0_count, 3u);
2185 EXPECT_EQ(mapper1_count, 0u);
2186
  // Exactly three messages: the mapper must now be empty.
2187 ASSERT_TRUE(mapper0.Front() == nullptr);
2188
2189 LOG(INFO) << output0[0];
2190 LOG(INFO) << output0[1];
2191 LOG(INFO) << output0[2];
2192
  // The three data messages come back on boot 0, in time order, with queue
  // indices 0, 1 and 2.  Their remote and timestamp times are expected to be
  // min_time.
2193 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2194 EXPECT_EQ(output0[0].monotonic_event_time.time,
2195 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002196 EXPECT_EQ(output0[0].queue_index,
2197 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002198 EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
2199 EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002200 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002201
2202 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2203 EXPECT_EQ(output0[1].monotonic_event_time.time,
2204 e + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002205 EXPECT_EQ(output0[1].queue_index,
2206 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002207 EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
2208 EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002209 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002210
2211 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2212 EXPECT_EQ(output0[2].monotonic_event_time.time,
2213 e + chrono::milliseconds(3000));
Austin Schuh58646e22021-08-23 23:51:46 -07002214 EXPECT_EQ(output0[2].queue_index,
2215 (BootQueueIndex{.boot = 0u, .index = 2u}));
Austin Schuh48507722021-07-17 17:29:24 -07002216 EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
2217 EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002218 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002219 }
2220
2221 {
2222 SCOPED_TRACE("Trying node1 now");
2223 std::deque<TimestampedMessage> output1;
2224
  // Reading mapper1 must not fire mapper0's callback again: mapper0_count is
  // expected to stay at 3 for the rest of the test.
2225 EXPECT_EQ(mapper0_count, 3u);
2226 EXPECT_EQ(mapper1_count, 0u);
2227
2228 ASSERT_TRUE(mapper1.Front() != nullptr);
2229 EXPECT_EQ(mapper0_count, 3u);
2230 EXPECT_EQ(mapper1_count, 1u);
2231 output1.emplace_back(std::move(*mapper1.Front()));
2232 mapper1.PopFront();
2233 EXPECT_TRUE(mapper1.started());
2234 EXPECT_EQ(mapper0_count, 3u);
2235 EXPECT_EQ(mapper1_count, 1u);
2236
2237 ASSERT_TRUE(mapper1.Front() != nullptr);
2238 EXPECT_EQ(mapper0_count, 3u);
2239 EXPECT_EQ(mapper1_count, 2u);
2240 output1.emplace_back(std::move(*mapper1.Front()));
2241 mapper1.PopFront();
2242 EXPECT_TRUE(mapper1.started());
2243
2244 ASSERT_TRUE(mapper1.Front() != nullptr);
2245 output1.emplace_back(std::move(*mapper1.Front()));
2246 mapper1.PopFront();
2247 EXPECT_TRUE(mapper1.started());
2248
  // Fourth message: one output per timestamp message written above, including
  // the duplicate timestamp for the 1000ms message.
Austin Schuh58646e22021-08-23 23:51:46 -07002249 ASSERT_TRUE(mapper1.Front() != nullptr);
2250 output1.emplace_back(std::move(*mapper1.Front()));
2251 mapper1.PopFront();
2252 EXPECT_TRUE(mapper1.started());
2253
Austin Schuh48507722021-07-17 17:29:24 -07002254 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002255 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002256
  // Hitting the end of mapper1 must not fire either callback.
2257 ASSERT_TRUE(mapper1.Front() == nullptr);
2258
2259 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002260 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002261
  // output1[0]: the timestamp from the first pi2 part.  Event time is on
  // boot 0 at 1000ms + 100s; the remote side is the 1000ms message (remote
  // queue index 0).
2262 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2263 EXPECT_EQ(output1[0].monotonic_event_time.time,
2264 e + chrono::seconds(100) + chrono::milliseconds(1000));
2265 EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
2266 EXPECT_EQ(output1[0].monotonic_remote_time.time,
2267 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002268 EXPECT_EQ(output1[0].remote_queue_index,
2269 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002270 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
2271 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
2272 e + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002273 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002274
  // output1[1]: the duplicate timestamp for the 1000ms message from the second
  // pi2 part.  Event time is now on boot 1 (1000ms + 21s), while the remote
  // side still refers to the 1000ms message at remote queue index 0.
2275 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2276 EXPECT_EQ(output1[1].monotonic_event_time.time,
2277 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002278 EXPECT_EQ(output1[1].remote_queue_index,
2279 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002280 EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
2281 EXPECT_EQ(output1[1].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002282 e + chrono::milliseconds(1000));
Austin Schuh48507722021-07-17 17:29:24 -07002283 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
2284 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
2285 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002286 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002287
  // output1[2]: the 2000ms message (remote queue index 1).  It has the same
  // event time as output1[1], since 2000ms + 20s == 1000ms + 21s.
2288 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2289 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002290 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh48507722021-07-17 17:29:24 -07002291 EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
2292 EXPECT_EQ(output1[2].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002293 e + chrono::milliseconds(2000));
2294 EXPECT_EQ(output1[2].remote_queue_index,
2295 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002296 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
2297 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002298 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002299 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002300
  // output1[3]: the 3000ms message (remote queue index 2) at 3000ms + 20s.
Austin Schuh58646e22021-08-23 23:51:46 -07002301 EXPECT_EQ(output1[3].monotonic_event_time.boot, 1u);
2302 EXPECT_EQ(output1[3].monotonic_event_time.time,
2303 e + chrono::seconds(20) + chrono::milliseconds(3000));
2304 EXPECT_EQ(output1[3].monotonic_remote_time.boot, 0u);
2305 EXPECT_EQ(output1[3].monotonic_remote_time.time,
2306 e + chrono::milliseconds(3000));
2307 EXPECT_EQ(output1[3].remote_queue_index,
2308 (BootQueueIndex{.boot = 0u, .index = 2u}));
2309 EXPECT_EQ(output1[3].monotonic_timestamp_time.boot, 0u);
2310 EXPECT_EQ(output1[3].monotonic_timestamp_time.time,
2311 e + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002312 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh58646e22021-08-23 23:51:46 -07002313
Austin Schuh48507722021-07-17 17:29:24 -07002314 LOG(INFO) << output1[0];
2315 LOG(INFO) << output1[1];
2316 LOG(INFO) << output1[2];
Austin Schuh58646e22021-08-23 23:51:46 -07002317 LOG(INFO) << output1[3];
Austin Schuh48507722021-07-17 17:29:24 -07002318 }
2319}
2320
// Tests timestamp matching when the data messages live in the parts whose
// headers change source_node_boot_uuid (boot1a_/boot1b_, node "pi2") and the
// timestamps for them live in the boot0a_/boot0b_ parts.  mapper0's output is
// therefore the one carrying remote times, and those are expected to move
// from remote boot 0 to remote boot 1; mapper1's output carries the plain
// data messages on boots 0 and 1.
2321TEST_F(RebootTimestampMapperTest, Node2Reboot) {
2322 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2323 {
  // All four writers live only inside this scope, so they are destroyed
  // before SortParts() is handed the file names below (presumably destruction
  // is what flushes the parts to disk -- confirm in DetachedBufferWriter).
2324 DetachedBufferWriter writer0a(logfile0_, std::make_unique<DummyEncoder>());
2325 writer0a.QueueSpan(boot0a_.span());
2326 DetachedBufferWriter writer0b(logfile1_, std::make_unique<DummyEncoder>());
2327 writer0b.QueueSpan(boot0b_.span());
2328 DetachedBufferWriter writer1a(logfile2_, std::make_unique<DummyEncoder>());
2329 writer1a.QueueSpan(boot1a_.span());
2330 DetachedBufferWriter writer1b(logfile3_, std::make_unique<DummyEncoder>());
2331 writer1b.QueueSpan(boot1b_.span());
2332
  // Data message in the first pi2 part at 100s + 1000ms.  Its timestamp uses
  // an offset of -100s, which is why output0[0] is expected at 1000ms.
2333 writer1a.QueueSizedFlatbuffer(MakeLogMessage(
2334 e + chrono::seconds(100) + chrono::milliseconds(1000), 3, 0x005));
2335 writer0a.QueueSizedFlatbuffer(MakeTimestampMessage(
2336 e + chrono::seconds(100) + chrono::milliseconds(1000), 3,
2337 chrono::seconds(-100),
2338 e + chrono::seconds(100) + chrono::milliseconds(1001)));
2339
  // Two data messages in the second pi2 part.  The times restart near 20s and
  // the offset becomes -20s, so they are expected at 2000ms and 3000ms in
  // mapper0's output.
2340 writer1b.QueueSizedFlatbuffer(MakeLogMessage(
2341 e + chrono::seconds(20) + chrono::milliseconds(2000), 3, 0x006));
2342 writer0b.QueueSizedFlatbuffer(MakeTimestampMessage(
2343 e + chrono::seconds(20) + chrono::milliseconds(2000), 3,
2344 chrono::seconds(-20),
2345 e + chrono::seconds(20) + chrono::milliseconds(2001)));
2346
2347 writer1b.QueueSizedFlatbuffer(MakeLogMessage(
2348 e + chrono::seconds(20) + chrono::milliseconds(3000), 3, 0x007));
2349 writer0b.QueueSizedFlatbuffer(MakeTimestampMessage(
2350 e + chrono::seconds(20) + chrono::milliseconds(3000), 3,
2351 chrono::seconds(-20),
2352 e + chrono::seconds(20) + chrono::milliseconds(3001)));
2353 }
2354
2355 const std::vector<LogFile> parts =
2356 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2357
2358 for (const auto &x : parts) {
2359 LOG(INFO) << x;
2360 }
  // All four parts are expected to sort into a single log whose logger node
  // is pi1.  ASSERT so that parts[0] is never indexed on an empty result.
2361 ASSERT_EQ(parts.size(), 1u);
2362 ASSERT_EQ(parts[0].logger_node, "pi1");
2363
  // One mapper per node.  Each callback only counts how often it fired.
2364 size_t mapper0_count = 0;
2365 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
2366 mapper0.set_timestamp_callback(
2367 [&](TimestampedMessage *) { ++mapper0_count; });
2368 size_t mapper1_count = 0;
2369 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
2370 mapper1.set_timestamp_callback(
2371 [&](TimestampedMessage *) { ++mapper1_count; });
2372
2373 mapper0.AddPeer(&mapper1);
2374 mapper1.AddPeer(&mapper0);
2375
2376 {
2377 std::deque<TimestampedMessage> output0;
2378
  // The count is sampled before Front(), after Front() and after PopFront():
  // it is expected to go up on Front() and to stay put on PopFront().
  // mapper1's callback must stay at 0 while only mapper0 is being read.
2379 EXPECT_EQ(mapper0_count, 0u);
2380 EXPECT_EQ(mapper1_count, 0u);
2381 ASSERT_TRUE(mapper0.Front() != nullptr);
2382 EXPECT_EQ(mapper0_count, 1u);
2383 EXPECT_EQ(mapper1_count, 0u);
2384 output0.emplace_back(std::move(*mapper0.Front()));
2385 mapper0.PopFront();
2386 EXPECT_TRUE(mapper0.started());
2387 EXPECT_EQ(mapper0_count, 1u);
2388 EXPECT_EQ(mapper1_count, 0u);
2389
2390 ASSERT_TRUE(mapper0.Front() != nullptr);
2391 EXPECT_EQ(mapper0_count, 2u);
2392 EXPECT_EQ(mapper1_count, 0u);
2393 output0.emplace_back(std::move(*mapper0.Front()));
2394 mapper0.PopFront();
2395 EXPECT_TRUE(mapper0.started());
2396
2397 ASSERT_TRUE(mapper0.Front() != nullptr);
2398 output0.emplace_back(std::move(*mapper0.Front()));
2399 mapper0.PopFront();
2400 EXPECT_TRUE(mapper0.started());
2401
2402 EXPECT_EQ(mapper0_count, 3u);
2403 EXPECT_EQ(mapper1_count, 0u);
2404
  // Exactly three messages: the mapper must now be empty.
2405 ASSERT_TRUE(mapper0.Front() == nullptr);
2406
2407 LOG(INFO) << output0[0];
2408 LOG(INFO) << output0[1];
2409 LOG(INFO) << output0[2];
2410
  // Local event and timestamp times stay on boot 0 for all three messages.
  // Only monotonic_remote_time.boot changes: 0 for the message from the first
  // pi2 part, 1 for the two messages from the second pi2 part.
2411 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2412 EXPECT_EQ(output0[0].monotonic_event_time.time,
2413 e + chrono::milliseconds(1000));
2414 EXPECT_EQ(output0[0].monotonic_remote_time.boot, 0u);
2415 EXPECT_EQ(output0[0].monotonic_remote_time.time,
2416 e + chrono::seconds(100) + chrono::milliseconds(1000));
2417 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
2418 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
2419 e + chrono::seconds(100) + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002420 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002421
2422 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2423 EXPECT_EQ(output0[1].monotonic_event_time.time,
2424 e + chrono::milliseconds(2000));
2425 EXPECT_EQ(output0[1].monotonic_remote_time.boot, 1u);
2426 EXPECT_EQ(output0[1].monotonic_remote_time.time,
2427 e + chrono::seconds(20) + chrono::milliseconds(2000));
2428 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
2429 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
2430 e + chrono::seconds(20) + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002431 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002432
2433 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2434 EXPECT_EQ(output0[2].monotonic_event_time.time,
2435 e + chrono::milliseconds(3000));
2436 EXPECT_EQ(output0[2].monotonic_remote_time.boot, 1u);
2437 EXPECT_EQ(output0[2].monotonic_remote_time.time,
2438 e + chrono::seconds(20) + chrono::milliseconds(3000));
2439 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
2440 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
2441 e + chrono::seconds(20) + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002442 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002443 }
2444
2445 {
2446 SCOPED_TRACE("Trying node1 now");
2447 std::deque<TimestampedMessage> output1;
2448
  // Reading mapper1 must not fire mapper0's callback again: mapper0_count is
  // expected to stay at 3 for the rest of the test.
2449 EXPECT_EQ(mapper0_count, 3u);
2450 EXPECT_EQ(mapper1_count, 0u);
2451
2452 ASSERT_TRUE(mapper1.Front() != nullptr);
2453 EXPECT_EQ(mapper0_count, 3u);
2454 EXPECT_EQ(mapper1_count, 1u);
2455 output1.emplace_back(std::move(*mapper1.Front()));
2456 mapper1.PopFront();
2457 EXPECT_TRUE(mapper1.started());
2458 EXPECT_EQ(mapper0_count, 3u);
2459 EXPECT_EQ(mapper1_count, 1u);
2460
2461 ASSERT_TRUE(mapper1.Front() != nullptr);
2462 EXPECT_EQ(mapper0_count, 3u);
2463 EXPECT_EQ(mapper1_count, 2u);
2464 output1.emplace_back(std::move(*mapper1.Front()));
2465 mapper1.PopFront();
2466 EXPECT_TRUE(mapper1.started());
2467
2468 ASSERT_TRUE(mapper1.Front() != nullptr);
2469 output1.emplace_back(std::move(*mapper1.Front()));
2470 mapper1.PopFront();
2471 EXPECT_TRUE(mapper1.started());
2472
2473 EXPECT_EQ(mapper0_count, 3u);
2474 EXPECT_EQ(mapper1_count, 3u);
2475
  // Hitting the end of mapper1 must not fire either callback.
2476 ASSERT_TRUE(mapper1.Front() == nullptr);
2477
2478 EXPECT_EQ(mapper0_count, 3u);
2479 EXPECT_EQ(mapper1_count, 3u);
2480
  // The data messages themselves: the first is on boot 0, the next two on
  // boot 1, each at the time passed to MakeLogMessage above.  Their remote
  // and timestamp times are expected to be min_time.
2481 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2482 EXPECT_EQ(output1[0].monotonic_event_time.time,
2483 e + chrono::seconds(100) + chrono::milliseconds(1000));
2484 EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
2485 EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002486 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002487
2488 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2489 EXPECT_EQ(output1[1].monotonic_event_time.time,
2490 e + chrono::seconds(20) + chrono::milliseconds(2000));
2491 EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
2492 EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002493 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002494
2495 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2496 EXPECT_EQ(output1[2].monotonic_event_time.time,
2497 e + chrono::seconds(20) + chrono::milliseconds(3000));
2498 EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
2499 EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002500 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002501
2502 LOG(INFO) << output1[0];
2503 LOG(INFO) << output1[1];
2504 LOG(INFO) << output1[2];
2505 }
2506}
2507
Austin Schuhc243b422020-10-11 15:35:08 -07002508} // namespace testing
2509} // namespace logger
2510} // namespace aos