blob: b3a9bbd8931a0ea9c028bab5e4d274472453b8d2 [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
Austin Schuhfa30c352022-10-16 11:12:02 -07004#include <random>
Austin Schuhe243aaf2020-10-11 15:46:02 -07005#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07006
Austin Schuhfa30c352022-10-16 11:12:02 -07007#include "absl/strings/escaping.h"
Austin Schuhc41603c2020-10-11 16:17:37 -07008#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -07009#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -080010#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070011#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070012#include "aos/json_to_flatbuffer.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070013#include "aos/testing/path.h"
14#include "aos/testing/random_seed.h"
Austin Schuhc243b422020-10-11 15:35:08 -070015#include "aos/testing/tmpdir.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070016#include "aos/util/file.h"
17#include "external/com_github_google_flatbuffers/src/annotated_binary_text_gen.h"
18#include "external/com_github_google_flatbuffers/src/binary_annotator.h"
19#include "flatbuffers/reflection_generated.h"
Brian Smarttea913d42021-12-10 15:02:38 -080020#include "gflags/gflags.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070021#include "gtest/gtest.h"
Austin Schuhc243b422020-10-11 15:35:08 -070022
23namespace aos {
24namespace logger {
25namespace testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070026namespace chrono = std::chrono;
Austin Schuhf2d0e682022-10-16 14:20:58 -070027using aos::message_bridge::RemoteMessage;
Eric Schmiedebergb38477e2022-12-02 16:08:04 -070028using aos::testing::ArtifactPath;
Austin Schuhc243b422020-10-11 15:35:08 -070029
Austin Schuhd863e6e2022-10-16 15:44:50 -070030// Adapter class to make it easy to test DetachedBufferWriter without adding
31// test only boilerplate to DetachedBufferWriter.
32class TestDetachedBufferWriter : public DetachedBufferWriter {
33 public:
Austin Schuh48d10d62022-10-16 22:19:23 -070034 // Pick a max size that is rather conservative.
35 static constexpr size_t kMaxMessageSize = 128 * 1024;
Austin Schuhd863e6e2022-10-16 15:44:50 -070036 TestDetachedBufferWriter(std::string_view filename)
Austin Schuh48d10d62022-10-16 22:19:23 -070037 : DetachedBufferWriter(filename,
38 std::make_unique<DummyEncoder>(kMaxMessageSize)) {}
Austin Schuhd863e6e2022-10-16 15:44:50 -070039 void WriteSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
40 QueueSpan(absl::Span<const uint8_t>(buffer.data(), buffer.size()));
41 }
Austin Schuh7ef11a42023-02-04 17:15:12 -080042 void QueueSpan(absl::Span<const uint8_t> buffer) {
43 DataEncoder::SpanCopier coppier(buffer);
44 CopyMessage(&coppier, monotonic_clock::now());
45 }
Austin Schuhd863e6e2022-10-16 15:44:50 -070046};
47
Austin Schuhe243aaf2020-10-11 15:46:02 -070048// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070049template <typename T>
50SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
51 const std::string_view data) {
52 flatbuffers::FlatBufferBuilder fbb;
53 fbb.ForceDefaults(true);
54 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
55 return fbb.Release();
56}
57
Austin Schuhe243aaf2020-10-11 15:46:02 -070058// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070059TEST(SpanReaderTest, ReadWrite) {
60 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
61 unlink(logfile.c_str());
62
63 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080064 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070065 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080066 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070067
68 {
Austin Schuhd863e6e2022-10-16 15:44:50 -070069 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -080070 writer.QueueSpan(m1.span());
71 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070072 }
73
74 SpanReader reader(logfile);
75
76 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhcf5f6442021-07-06 10:43:28 -070077 EXPECT_EQ(reader.PeekMessage(), m1.span());
78 EXPECT_EQ(reader.PeekMessage(), m1.span());
Austin Schuhadd6eb32020-11-09 21:24:26 -080079 EXPECT_EQ(reader.ReadMessage(), m1.span());
80 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhcf5f6442021-07-06 10:43:28 -070081 EXPECT_EQ(reader.PeekMessage(), absl::Span<const uint8_t>());
Austin Schuhc243b422020-10-11 15:35:08 -070082 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
83}
84
Austin Schuhe243aaf2020-10-11 15:46:02 -070085// Tests that we can actually parse the resulting messages at a basic level
86// through MessageReader.
87TEST(MessageReaderTest, ReadWrite) {
88 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
89 unlink(logfile.c_str());
90
91 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
92 JsonToSizedFlatbuffer<LogFileHeader>(
93 R"({ "max_out_of_order_duration": 100000000 })");
94 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
95 JsonToSizedFlatbuffer<MessageHeader>(
96 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
97 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
98 JsonToSizedFlatbuffer<MessageHeader>(
99 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
100
101 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700102 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800103 writer.QueueSpan(config.span());
104 writer.QueueSpan(m1.span());
105 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -0700106 }
107
108 MessageReader reader(logfile);
109
110 EXPECT_EQ(reader.filename(), logfile);
111
112 EXPECT_EQ(
113 reader.max_out_of_order_duration(),
114 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
115 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
116 EXPECT_TRUE(reader.ReadMessage());
117 EXPECT_EQ(reader.newest_timestamp(),
118 monotonic_clock::time_point(chrono::nanoseconds(1)));
119 EXPECT_TRUE(reader.ReadMessage());
120 EXPECT_EQ(reader.newest_timestamp(),
121 monotonic_clock::time_point(chrono::nanoseconds(2)));
122 EXPECT_FALSE(reader.ReadMessage());
123}
124
Austin Schuh32f68492020-11-08 21:45:51 -0800125// Tests that we explode when messages are too far out of order.
126TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
127 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
128 unlink(logfile0.c_str());
129
130 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
131 JsonToSizedFlatbuffer<LogFileHeader>(
132 R"({
133 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800134 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800135 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
136 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
137 "parts_index": 0
138})");
139
140 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
141 JsonToSizedFlatbuffer<MessageHeader>(
142 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
143 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
144 JsonToSizedFlatbuffer<MessageHeader>(
145 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
146 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
147 JsonToSizedFlatbuffer<MessageHeader>(
148 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
149
150 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700151 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800152 writer.QueueSpan(config0.span());
153 writer.QueueSpan(m1.span());
154 writer.QueueSpan(m2.span());
155 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800156 }
157
158 const std::vector<LogFile> parts = SortParts({logfile0});
159
160 PartsMessageReader reader(parts[0].parts[0]);
161
162 EXPECT_TRUE(reader.ReadMessage());
163 EXPECT_TRUE(reader.ReadMessage());
164 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
165}
166
Austin Schuhc41603c2020-10-11 16:17:37 -0700167// Tests that we can transparently re-assemble part files with a
168// PartsMessageReader.
169TEST(PartsMessageReaderTest, ReadWrite) {
170 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
171 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
172 unlink(logfile0.c_str());
173 unlink(logfile1.c_str());
174
175 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
176 JsonToSizedFlatbuffer<LogFileHeader>(
177 R"({
178 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800179 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700180 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
181 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
182 "parts_index": 0
183})");
184 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
185 JsonToSizedFlatbuffer<LogFileHeader>(
186 R"({
187 "max_out_of_order_duration": 200000000,
188 "monotonic_start_time": 0,
189 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800190 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700191 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
192 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
193 "parts_index": 1
194})");
195
196 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
197 JsonToSizedFlatbuffer<MessageHeader>(
198 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
199 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
200 JsonToSizedFlatbuffer<MessageHeader>(
201 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
202
203 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700204 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800205 writer.QueueSpan(config0.span());
206 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700207 }
208 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700209 TestDetachedBufferWriter writer(logfile1);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800210 writer.QueueSpan(config1.span());
211 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700212 }
213
214 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
215
216 PartsMessageReader reader(parts[0].parts[0]);
217
218 EXPECT_EQ(reader.filename(), logfile0);
219
220 // Confirm that the timestamps track, and the filename also updates.
221 // Read the first message.
222 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
223 EXPECT_EQ(
224 reader.max_out_of_order_duration(),
225 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
226 EXPECT_TRUE(reader.ReadMessage());
227 EXPECT_EQ(reader.filename(), logfile0);
228 EXPECT_EQ(reader.newest_timestamp(),
229 monotonic_clock::time_point(chrono::nanoseconds(1)));
230 EXPECT_EQ(
231 reader.max_out_of_order_duration(),
232 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
233
234 // Read the second message.
235 EXPECT_TRUE(reader.ReadMessage());
236 EXPECT_EQ(reader.filename(), logfile1);
237 EXPECT_EQ(reader.newest_timestamp(),
238 monotonic_clock::time_point(chrono::nanoseconds(2)));
239 EXPECT_EQ(
240 reader.max_out_of_order_duration(),
241 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
242
243 // And then confirm that reading again returns no message.
244 EXPECT_FALSE(reader.ReadMessage());
245 EXPECT_EQ(reader.filename(), logfile1);
246 EXPECT_EQ(
247 reader.max_out_of_order_duration(),
248 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800249 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Austin Schuhc41603c2020-10-11 16:17:37 -0700250}
Austin Schuh32f68492020-11-08 21:45:51 -0800251
Austin Schuh1be0ce42020-11-29 22:43:26 -0800252// Tests that Message's operator < works as expected.
253TEST(MessageTest, Sorting) {
254 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
255
256 Message m1{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700257 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700258 .timestamp =
259 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
Austin Schuh48507722021-07-17 17:29:24 -0700260 .monotonic_remote_boot = 0xffffff,
261 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700262 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800263 Message m2{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700264 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700265 .timestamp =
266 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
Austin Schuh48507722021-07-17 17:29:24 -0700267 .monotonic_remote_boot = 0xffffff,
268 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700269 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800270
271 EXPECT_LT(m1, m2);
272 EXPECT_GE(m2, m1);
273
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700274 m1.timestamp.time = e;
275 m2.timestamp.time = e;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800276
277 m1.channel_index = 1;
278 m2.channel_index = 2;
279
280 EXPECT_LT(m1, m2);
281 EXPECT_GE(m2, m1);
282
283 m1.channel_index = 0;
284 m2.channel_index = 0;
Austin Schuh58646e22021-08-23 23:51:46 -0700285 m1.queue_index.index = 0u;
286 m2.queue_index.index = 1u;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800287
288 EXPECT_LT(m1, m2);
289 EXPECT_GE(m2, m1);
290}
291
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800292aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
293 const aos::FlatbufferDetachedBuffer<Configuration> &config,
294 const std::string_view json) {
295 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh1034a832021-03-31 21:53:26 -0700296 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800297 flatbuffers::Offset<Configuration> config_offset =
298 aos::CopyFlatBuffer(config, &fbb);
299 LogFileHeader::Builder header_builder(fbb);
300 header_builder.add_configuration(config_offset);
301 fbb.Finish(header_builder.Finish());
302 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
303
304 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
305 JsonToFlatbuffer<LogFileHeader>(json));
306 CHECK(header_updates.Verify());
307 flatbuffers::FlatBufferBuilder fbb2;
Austin Schuh1034a832021-03-31 21:53:26 -0700308 fbb2.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800309 fbb2.FinishSizePrefixed(
310 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
311 return fbb2.Release();
312}
313
314class SortingElementTest : public ::testing::Test {
315 public:
316 SortingElementTest()
317 : config_(JsonToFlatbuffer<Configuration>(
318 R"({
319 "channels": [
320 {
321 "name": "/a",
322 "type": "aos.logger.testing.TestMessage",
323 "source_node": "pi1",
324 "destination_nodes": [
325 {
326 "name": "pi2"
327 },
328 {
329 "name": "pi3"
330 }
331 ]
332 },
333 {
334 "name": "/b",
335 "type": "aos.logger.testing.TestMessage",
336 "source_node": "pi1"
337 },
338 {
339 "name": "/c",
340 "type": "aos.logger.testing.TestMessage",
341 "source_node": "pi1"
Austin Schuh48507722021-07-17 17:29:24 -0700342 },
343 {
344 "name": "/d",
345 "type": "aos.logger.testing.TestMessage",
346 "source_node": "pi2",
347 "destination_nodes": [
348 {
349 "name": "pi1"
350 }
351 ]
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800352 }
353 ],
354 "nodes": [
355 {
356 "name": "pi1"
357 },
358 {
359 "name": "pi2"
360 },
361 {
362 "name": "pi3"
363 }
364 ]
365}
366)")),
367 config0_(MakeHeader(config_, R"({
368 /* 100ms */
369 "max_out_of_order_duration": 100000000,
370 "node": {
371 "name": "pi1"
372 },
373 "logger_node": {
374 "name": "pi1"
375 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800376 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800377 "realtime_start_time": 1000000000000,
378 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700379 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
380 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
381 "boot_uuids": [
382 "1d782c63-b3c7-466e-bea9-a01308b43333",
383 "",
384 ""
385 ],
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800386 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
387 "parts_index": 0
388})")),
389 config1_(MakeHeader(config_,
390 R"({
391 /* 100ms */
392 "max_out_of_order_duration": 100000000,
393 "node": {
394 "name": "pi1"
395 },
396 "logger_node": {
397 "name": "pi1"
398 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800399 "monotonic_start_time": 1000000,
400 "realtime_start_time": 1000000000000,
401 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700402 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
403 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
404 "boot_uuids": [
405 "1d782c63-b3c7-466e-bea9-a01308b43333",
406 "",
407 ""
408 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800409 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
410 "parts_index": 0
411})")),
412 config2_(MakeHeader(config_,
413 R"({
414 /* 100ms */
415 "max_out_of_order_duration": 100000000,
416 "node": {
417 "name": "pi2"
418 },
419 "logger_node": {
420 "name": "pi2"
421 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800422 "monotonic_start_time": 0,
423 "realtime_start_time": 1000000000000,
Austin Schuh48507722021-07-17 17:29:24 -0700424 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
425 "logger_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
426 "boot_uuids": [
427 "",
428 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
429 ""
430 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800431 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
432 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
433 "parts_index": 0
434})")),
435 config3_(MakeHeader(config_,
436 R"({
437 /* 100ms */
438 "max_out_of_order_duration": 100000000,
439 "node": {
440 "name": "pi1"
441 },
442 "logger_node": {
443 "name": "pi1"
444 },
445 "monotonic_start_time": 2000000,
446 "realtime_start_time": 1000000000,
447 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
Austin Schuh48507722021-07-17 17:29:24 -0700448 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
449 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
450 "boot_uuids": [
451 "1d782c63-b3c7-466e-bea9-a01308b43333",
452 "",
453 ""
454 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800455 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800456 "parts_index": 0
Austin Schuh8bf1e632021-01-02 22:41:04 -0800457})")),
458 config4_(MakeHeader(config_,
459 R"({
460 /* 100ms */
461 "max_out_of_order_duration": 100000000,
462 "node": {
463 "name": "pi2"
464 },
465 "logger_node": {
466 "name": "pi1"
467 },
468 "monotonic_start_time": 2000000,
469 "realtime_start_time": 1000000000,
470 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
471 "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
Austin Schuh48507722021-07-17 17:29:24 -0700472 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
473 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
474 "boot_uuids": [
475 "1d782c63-b3c7-466e-bea9-a01308b43333",
476 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
477 ""
478 ],
Austin Schuh8bf1e632021-01-02 22:41:04 -0800479 "parts_index": 0
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800480})")) {
481 unlink(logfile0_.c_str());
482 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800483 unlink(logfile2_.c_str());
Austin Schuh48507722021-07-17 17:29:24 -0700484 unlink(logfile3_.c_str());
Austin Schuh921b2562021-07-31 19:37:03 -0700485 queue_index_.resize(config_.message().channels()->size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800486 }
487
488 protected:
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800489 flatbuffers::DetachedBuffer MakeLogMessage(
490 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
491 int value) {
492 flatbuffers::FlatBufferBuilder message_fbb;
493 message_fbb.ForceDefaults(true);
494 TestMessage::Builder test_message_builder(message_fbb);
495 test_message_builder.add_value(value);
496 message_fbb.Finish(test_message_builder.Finish());
497
498 aos::Context context;
499 context.monotonic_event_time = monotonic_now;
500 context.realtime_event_time = aos::realtime_clock::epoch() +
501 chrono::seconds(1000) +
502 monotonic_now.time_since_epoch();
Austin Schuh921b2562021-07-31 19:37:03 -0700503 CHECK_LT(static_cast<size_t>(channel_index), queue_index_.size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800504 context.queue_index = queue_index_[channel_index];
505 context.size = message_fbb.GetSize();
506 context.data = message_fbb.GetBufferPointer();
507
508 ++queue_index_[channel_index];
509
510 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh53e3a812021-11-03 16:57:23 -0700511 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800512 fbb.FinishSizePrefixed(
513 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
514
515 return fbb.Release();
516 }
517
518 flatbuffers::DetachedBuffer MakeTimestampMessage(
519 const aos::monotonic_clock::time_point sender_monotonic_now,
Austin Schuh8bf1e632021-01-02 22:41:04 -0800520 int channel_index, chrono::nanoseconds receiver_monotonic_offset,
521 monotonic_clock::time_point monotonic_timestamp_time =
522 monotonic_clock::min_time) {
523 const monotonic_clock::time_point monotonic_sent_time =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800524 sender_monotonic_now + receiver_monotonic_offset;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800525
526 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800527 fbb.ForceDefaults(true);
528
529 logger::MessageHeader::Builder message_header_builder(fbb);
530
531 message_header_builder.add_channel_index(channel_index);
532
533 message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
534 100);
535 message_header_builder.add_monotonic_sent_time(
536 monotonic_sent_time.time_since_epoch().count());
537 message_header_builder.add_realtime_sent_time(
538 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
539 monotonic_sent_time.time_since_epoch())
540 .time_since_epoch()
541 .count());
542
543 message_header_builder.add_monotonic_remote_time(
544 sender_monotonic_now.time_since_epoch().count());
545 message_header_builder.add_realtime_remote_time(
546 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
547 sender_monotonic_now.time_since_epoch())
548 .time_since_epoch()
549 .count());
550 message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
551 1);
552
553 if (monotonic_timestamp_time != monotonic_clock::min_time) {
554 message_header_builder.add_monotonic_timestamp_time(
555 monotonic_timestamp_time.time_since_epoch().count());
556 }
557
558 fbb.FinishSizePrefixed(message_header_builder.Finish());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800559 LOG(INFO) << aos::FlatbufferToJson(
560 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
561 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
562
563 return fbb.Release();
564 }
565
566 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
567 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800568 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh48507722021-07-17 17:29:24 -0700569 const std::string logfile3_ = aos::testing::TestTmpDir() + "/log3.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800570
571 const aos::FlatbufferDetachedBuffer<Configuration> config_;
572 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
573 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800574 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
575 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800576 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800577
578 std::vector<uint32_t> queue_index_;
579};
580
581using LogPartsSorterTest = SortingElementTest;
582using LogPartsSorterDeathTest = LogPartsSorterTest;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800583using NodeMergerTest = SortingElementTest;
Austin Schuhd2f96102020-12-01 20:27:29 -0800584using TimestampMapperTest = SortingElementTest;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800585
586// Tests that we can pull messages out of a log sorted in order.
587TEST_F(LogPartsSorterTest, Pull) {
588 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
589 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700590 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800591 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700592 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800593 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700594 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800595 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700596 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800597 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700598 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800599 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
600 }
601
602 const std::vector<LogFile> parts = SortParts({logfile0_});
603
604 LogPartsSorter parts_sorter(parts[0].parts[0]);
605
606 // Confirm we aren't sorted until any time until the message is popped.
607 // Peeking shouldn't change the sorted until time.
608 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
609
610 std::deque<Message> output;
611
612 ASSERT_TRUE(parts_sorter.Front() != nullptr);
613 output.emplace_back(std::move(*parts_sorter.Front()));
614 parts_sorter.PopFront();
615 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
616
617 ASSERT_TRUE(parts_sorter.Front() != nullptr);
618 output.emplace_back(std::move(*parts_sorter.Front()));
619 parts_sorter.PopFront();
620 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
621
622 ASSERT_TRUE(parts_sorter.Front() != nullptr);
623 output.emplace_back(std::move(*parts_sorter.Front()));
624 parts_sorter.PopFront();
625 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
626
627 ASSERT_TRUE(parts_sorter.Front() != nullptr);
628 output.emplace_back(std::move(*parts_sorter.Front()));
629 parts_sorter.PopFront();
630 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
631
632 ASSERT_TRUE(parts_sorter.Front() == nullptr);
633
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700634 EXPECT_EQ(output[0].timestamp.boot, 0);
635 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
636 EXPECT_EQ(output[1].timestamp.boot, 0);
637 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1000));
638 EXPECT_EQ(output[2].timestamp.boot, 0);
639 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1901));
640 EXPECT_EQ(output[3].timestamp.boot, 0);
641 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800642}
643
Austin Schuhb000de62020-12-03 22:00:40 -0800644// Tests that we can pull messages out of a log sorted in order.
645TEST_F(LogPartsSorterTest, WayBeforeStart) {
646 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
647 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700648 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhb000de62020-12-03 22:00:40 -0800649 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700650 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800651 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700652 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800653 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700654 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800655 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700656 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800657 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700658 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800659 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
660 }
661
662 const std::vector<LogFile> parts = SortParts({logfile0_});
663
664 LogPartsSorter parts_sorter(parts[0].parts[0]);
665
666 // Confirm we aren't sorted until any time until the message is popped.
667 // Peeking shouldn't change the sorted until time.
668 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
669
670 std::deque<Message> output;
671
672 for (monotonic_clock::time_point t :
673 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
674 e + chrono::milliseconds(1900), monotonic_clock::max_time,
675 monotonic_clock::max_time}) {
676 ASSERT_TRUE(parts_sorter.Front() != nullptr);
677 output.emplace_back(std::move(*parts_sorter.Front()));
678 parts_sorter.PopFront();
679 EXPECT_EQ(parts_sorter.sorted_until(), t);
680 }
681
682 ASSERT_TRUE(parts_sorter.Front() == nullptr);
683
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700684 EXPECT_EQ(output[0].timestamp.boot, 0u);
685 EXPECT_EQ(output[0].timestamp.time, e - chrono::milliseconds(1000));
686 EXPECT_EQ(output[1].timestamp.boot, 0u);
687 EXPECT_EQ(output[1].timestamp.time, e - chrono::milliseconds(500));
688 EXPECT_EQ(output[2].timestamp.boot, 0u);
689 EXPECT_EQ(output[2].timestamp.time, e - chrono::milliseconds(10));
690 EXPECT_EQ(output[3].timestamp.boot, 0u);
691 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(1901));
692 EXPECT_EQ(output[4].timestamp.boot, 0u);
693 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuhb000de62020-12-03 22:00:40 -0800694}
695
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800696// Tests that messages too far out of order trigger death.
697TEST_F(LogPartsSorterDeathTest, Pull) {
698 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
699 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700700 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800701 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700702 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800703 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700704 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800705 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700706 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800707 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
708 // The following message is too far out of order and will trigger the CHECK.
Austin Schuhd863e6e2022-10-16 15:44:50 -0700709 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800710 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
711 }
712
713 const std::vector<LogFile> parts = SortParts({logfile0_});
714
715 LogPartsSorter parts_sorter(parts[0].parts[0]);
716
717 // Confirm we aren't sorted until any time until the message is popped.
718 // Peeking shouldn't change the sorted until time.
719 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
720 std::deque<Message> output;
721
722 ASSERT_TRUE(parts_sorter.Front() != nullptr);
723 parts_sorter.PopFront();
724 ASSERT_TRUE(parts_sorter.Front() != nullptr);
725 ASSERT_TRUE(parts_sorter.Front() != nullptr);
726 parts_sorter.PopFront();
727
Austin Schuh58646e22021-08-23 23:51:46 -0700728 EXPECT_DEATH({ parts_sorter.Front(); },
729 "Max out of order of 100000000ns exceeded.");
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800730}
731
Austin Schuh8f52ed52020-11-30 23:12:39 -0800732// Tests that we can merge data from 2 separate files, including duplicate data.
733TEST_F(NodeMergerTest, TwoFileMerger) {
734 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
735 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700736 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800737 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700738 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800739 writer1.QueueSpan(config1_.span());
740
Austin Schuhd863e6e2022-10-16 15:44:50 -0700741 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800742 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700743 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800744 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
745
Austin Schuhd863e6e2022-10-16 15:44:50 -0700746 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800747 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700748 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800749 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
750
751 // Make a duplicate!
752 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
753 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
754 writer0.QueueSpan(msg.span());
755 writer1.QueueSpan(msg.span());
756
Austin Schuhd863e6e2022-10-16 15:44:50 -0700757 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800758 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
759 }
760
761 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Austin Schuhd2f96102020-12-01 20:27:29 -0800762 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800763
Austin Schuhd2f96102020-12-01 20:27:29 -0800764 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800765
766 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
767
768 std::deque<Message> output;
769
770 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
771 ASSERT_TRUE(merger.Front() != nullptr);
772 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
773
774 output.emplace_back(std::move(*merger.Front()));
775 merger.PopFront();
776 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
777
778 ASSERT_TRUE(merger.Front() != nullptr);
779 output.emplace_back(std::move(*merger.Front()));
780 merger.PopFront();
781 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
782
783 ASSERT_TRUE(merger.Front() != nullptr);
784 output.emplace_back(std::move(*merger.Front()));
785 merger.PopFront();
786 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
787
788 ASSERT_TRUE(merger.Front() != nullptr);
789 output.emplace_back(std::move(*merger.Front()));
790 merger.PopFront();
791 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
792
793 ASSERT_TRUE(merger.Front() != nullptr);
794 output.emplace_back(std::move(*merger.Front()));
795 merger.PopFront();
796 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
797
798 ASSERT_TRUE(merger.Front() != nullptr);
799 output.emplace_back(std::move(*merger.Front()));
800 merger.PopFront();
801 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
802
803 ASSERT_TRUE(merger.Front() == nullptr);
804
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700805 EXPECT_EQ(output[0].timestamp.boot, 0u);
806 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
807 EXPECT_EQ(output[1].timestamp.boot, 0u);
808 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1001));
809 EXPECT_EQ(output[2].timestamp.boot, 0u);
810 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1002));
811 EXPECT_EQ(output[3].timestamp.boot, 0u);
812 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
813 EXPECT_EQ(output[4].timestamp.boot, 0u);
814 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(3000));
815 EXPECT_EQ(output[5].timestamp.boot, 0u);
816 EXPECT_EQ(output[5].timestamp.time, e + chrono::milliseconds(3002));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800817}
818
Austin Schuh8bf1e632021-01-02 22:41:04 -0800819// Tests that we can merge timestamps with various combinations of
820// monotonic_timestamp_time.
821TEST_F(NodeMergerTest, TwoFileTimestampMerger) {
822 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
823 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700824 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800825 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700826 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800827 writer1.QueueSpan(config1_.span());
828
829 // Neither has it.
830 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700831 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800832 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700833 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800834 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
835
836 // First only has it.
837 MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700838 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800839 e + chrono::milliseconds(1001), 0, chrono::seconds(100),
840 e + chrono::nanoseconds(971)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700841 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800842 e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
843
844 // Second only has it.
845 MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700846 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800847 e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700848 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800849 e + chrono::milliseconds(1002), 0, chrono::seconds(100),
850 e + chrono::nanoseconds(972)));
851
852 // Both have it.
853 MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700854 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800855 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
856 e + chrono::nanoseconds(973)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700857 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800858 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
859 e + chrono::nanoseconds(973)));
860 }
861
862 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
863 ASSERT_EQ(parts.size(), 1u);
864
865 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
866
867 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
868
869 std::deque<Message> output;
870
871 for (int i = 0; i < 4; ++i) {
872 ASSERT_TRUE(merger.Front() != nullptr);
873 output.emplace_back(std::move(*merger.Front()));
874 merger.PopFront();
875 }
876 ASSERT_TRUE(merger.Front() == nullptr);
877
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700878 EXPECT_EQ(output[0].timestamp.boot, 0u);
879 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700880 EXPECT_FALSE(output[0].data->has_monotonic_timestamp_time);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700881
882 EXPECT_EQ(output[1].timestamp.boot, 0u);
883 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700884 EXPECT_TRUE(output[1].data->has_monotonic_timestamp_time);
885 EXPECT_EQ(output[1].data->monotonic_timestamp_time,
886 monotonic_clock::time_point(std::chrono::nanoseconds(971)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700887
888 EXPECT_EQ(output[2].timestamp.boot, 0u);
889 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700890 EXPECT_TRUE(output[2].data->has_monotonic_timestamp_time);
891 EXPECT_EQ(output[2].data->monotonic_timestamp_time,
892 monotonic_clock::time_point(std::chrono::nanoseconds(972)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700893
894 EXPECT_EQ(output[3].timestamp.boot, 0u);
895 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700896 EXPECT_TRUE(output[3].data->has_monotonic_timestamp_time);
897 EXPECT_EQ(output[3].data->monotonic_timestamp_time,
898 monotonic_clock::time_point(std::chrono::nanoseconds(973)));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800899}
900
Austin Schuhd2f96102020-12-01 20:27:29 -0800901// Tests that we can match timestamps on delivered messages.
902TEST_F(TimestampMapperTest, ReadNode0First) {
903 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
904 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700905 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800906 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700907 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800908 writer1.QueueSpan(config2_.span());
909
Austin Schuhd863e6e2022-10-16 15:44:50 -0700910 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800911 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700912 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800913 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
914
Austin Schuhd863e6e2022-10-16 15:44:50 -0700915 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800916 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700917 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800918 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
919
Austin Schuhd863e6e2022-10-16 15:44:50 -0700920 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800921 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700922 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800923 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
924 }
925
926 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
927
928 ASSERT_EQ(parts[0].logger_node, "pi1");
929 ASSERT_EQ(parts[1].logger_node, "pi2");
930
Austin Schuh79b30942021-01-24 22:32:21 -0800931 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800932 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -0800933 mapper0.set_timestamp_callback(
934 [&](TimestampedMessage *) { ++mapper0_count; });
935 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800936 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -0800937 mapper1.set_timestamp_callback(
938 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -0800939
940 mapper0.AddPeer(&mapper1);
941 mapper1.AddPeer(&mapper0);
942
943 {
944 std::deque<TimestampedMessage> output0;
945
Austin Schuh79b30942021-01-24 22:32:21 -0800946 EXPECT_EQ(mapper0_count, 0u);
947 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800948 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800949 EXPECT_EQ(mapper0_count, 1u);
950 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800951 output0.emplace_back(std::move(*mapper0.Front()));
952 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700953 EXPECT_TRUE(mapper0.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800954 EXPECT_EQ(mapper0_count, 1u);
955 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800956
957 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800958 EXPECT_EQ(mapper0_count, 2u);
959 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800960 output0.emplace_back(std::move(*mapper0.Front()));
961 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700962 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800963
964 ASSERT_TRUE(mapper0.Front() != nullptr);
965 output0.emplace_back(std::move(*mapper0.Front()));
966 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700967 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800968
Austin Schuh79b30942021-01-24 22:32:21 -0800969 EXPECT_EQ(mapper0_count, 3u);
970 EXPECT_EQ(mapper1_count, 0u);
971
Austin Schuhd2f96102020-12-01 20:27:29 -0800972 ASSERT_TRUE(mapper0.Front() == nullptr);
973
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700974 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
975 EXPECT_EQ(output0[0].monotonic_event_time.time,
976 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700977 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700978
979 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
980 EXPECT_EQ(output0[1].monotonic_event_time.time,
981 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700982 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700983
984 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
985 EXPECT_EQ(output0[2].monotonic_event_time.time,
986 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700987 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -0800988 }
989
990 {
991 SCOPED_TRACE("Trying node1 now");
992 std::deque<TimestampedMessage> output1;
993
Austin Schuh79b30942021-01-24 22:32:21 -0800994 EXPECT_EQ(mapper0_count, 3u);
995 EXPECT_EQ(mapper1_count, 0u);
996
Austin Schuhd2f96102020-12-01 20:27:29 -0800997 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800998 EXPECT_EQ(mapper0_count, 3u);
999 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001000 output1.emplace_back(std::move(*mapper1.Front()));
1001 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001002 EXPECT_TRUE(mapper1.started());
Austin Schuh79b30942021-01-24 22:32:21 -08001003 EXPECT_EQ(mapper0_count, 3u);
1004 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001005
1006 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001007 EXPECT_EQ(mapper0_count, 3u);
1008 EXPECT_EQ(mapper1_count, 2u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001009 output1.emplace_back(std::move(*mapper1.Front()));
1010 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001011 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001012
1013 ASSERT_TRUE(mapper1.Front() != nullptr);
1014 output1.emplace_back(std::move(*mapper1.Front()));
1015 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001016 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001017
Austin Schuh79b30942021-01-24 22:32:21 -08001018 EXPECT_EQ(mapper0_count, 3u);
1019 EXPECT_EQ(mapper1_count, 3u);
1020
Austin Schuhd2f96102020-12-01 20:27:29 -08001021 ASSERT_TRUE(mapper1.Front() == nullptr);
1022
Austin Schuh79b30942021-01-24 22:32:21 -08001023 EXPECT_EQ(mapper0_count, 3u);
1024 EXPECT_EQ(mapper1_count, 3u);
1025
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001026 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1027 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001028 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001029 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001030
1031 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1032 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001033 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001034 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001035
1036 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1037 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001038 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001039 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001040 }
1041}
1042
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001043// Tests that we filter messages using the channel filter callback
1044TEST_F(TimestampMapperTest, ReplayChannelsCallbackTest) {
1045 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1046 {
1047 TestDetachedBufferWriter writer0(logfile0_);
1048 writer0.QueueSpan(config0_.span());
1049 TestDetachedBufferWriter writer1(logfile1_);
1050 writer1.QueueSpan(config2_.span());
1051
1052 writer0.WriteSizedFlatbuffer(
1053 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1054 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1055 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1056
1057 writer0.WriteSizedFlatbuffer(
1058 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1059 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1060 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1061
1062 writer0.WriteSizedFlatbuffer(
1063 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1064 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1065 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1066 }
1067
1068 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1069
1070 ASSERT_EQ(parts[0].logger_node, "pi1");
1071 ASSERT_EQ(parts[1].logger_node, "pi2");
1072
1073 // mapper0 will not provide any messages while mapper1 will provide all
1074 // messages due to the channel filter callbacks used
1075 size_t mapper0_count = 0;
1076 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1077 mapper0.set_timestamp_callback(
1078 [&](TimestampedMessage *) { ++mapper0_count; });
1079 mapper0.set_replay_channels_callback(
1080 [&](const TimestampedMessage &) -> bool { return mapper0_count != 2; });
1081 size_t mapper1_count = 0;
1082 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1083 mapper1.set_timestamp_callback(
1084 [&](TimestampedMessage *) { ++mapper1_count; });
1085 mapper1.set_replay_channels_callback(
1086 [&](const TimestampedMessage &) -> bool { return mapper1_count != 2; });
1087
1088 mapper0.AddPeer(&mapper1);
1089 mapper1.AddPeer(&mapper0);
1090
1091 {
1092 std::deque<TimestampedMessage> output0;
1093
1094 EXPECT_EQ(mapper0_count, 0u);
1095 EXPECT_EQ(mapper1_count, 0u);
1096
1097 ASSERT_TRUE(mapper0.Front() != nullptr);
1098 EXPECT_EQ(mapper0_count, 1u);
1099 EXPECT_EQ(mapper1_count, 0u);
1100 output0.emplace_back(std::move(*mapper0.Front()));
1101 mapper0.PopFront();
1102
1103 EXPECT_TRUE(mapper0.started());
1104 EXPECT_EQ(mapper0_count, 1u);
1105 EXPECT_EQ(mapper1_count, 0u);
1106
1107 // mapper0_count is now at 3 since the second message is not queued, but
1108 // timestamp_callback needs to be called everytime even if Front() does not
1109 // provide a message due to the replay_channels_callback.
1110 ASSERT_TRUE(mapper0.Front() != nullptr);
1111 EXPECT_EQ(mapper0_count, 3u);
1112 EXPECT_EQ(mapper1_count, 0u);
1113 output0.emplace_back(std::move(*mapper0.Front()));
1114 mapper0.PopFront();
1115
1116 EXPECT_TRUE(mapper0.started());
1117 EXPECT_EQ(mapper0_count, 3u);
1118 EXPECT_EQ(mapper1_count, 0u);
1119
1120 ASSERT_TRUE(mapper0.Front() == nullptr);
1121 EXPECT_TRUE(mapper0.started());
1122
1123 EXPECT_EQ(mapper0_count, 3u);
1124 EXPECT_EQ(mapper1_count, 0u);
1125
1126 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1127 EXPECT_EQ(output0[0].monotonic_event_time.time,
1128 e + chrono::milliseconds(1000));
1129 EXPECT_TRUE(output0[0].data != nullptr);
1130
1131 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1132 EXPECT_EQ(output0[1].monotonic_event_time.time,
1133 e + chrono::milliseconds(3000));
1134 EXPECT_TRUE(output0[1].data != nullptr);
1135 }
1136
1137 {
1138 SCOPED_TRACE("Trying node1 now");
1139 std::deque<TimestampedMessage> output1;
1140
1141 EXPECT_EQ(mapper0_count, 3u);
1142 EXPECT_EQ(mapper1_count, 0u);
1143
1144 ASSERT_TRUE(mapper1.Front() != nullptr);
1145 EXPECT_EQ(mapper0_count, 3u);
1146 EXPECT_EQ(mapper1_count, 1u);
1147 output1.emplace_back(std::move(*mapper1.Front()));
1148 mapper1.PopFront();
1149 EXPECT_TRUE(mapper1.started());
1150 EXPECT_EQ(mapper0_count, 3u);
1151 EXPECT_EQ(mapper1_count, 1u);
1152
1153 // mapper1_count is now at 3 since the second message is not queued, but
1154 // timestamp_callback needs to be called everytime even if Front() does not
1155 // provide a message due to the replay_channels_callback.
1156 ASSERT_TRUE(mapper1.Front() != nullptr);
1157 output1.emplace_back(std::move(*mapper1.Front()));
1158 mapper1.PopFront();
1159 EXPECT_TRUE(mapper1.started());
1160
1161 EXPECT_EQ(mapper0_count, 3u);
1162 EXPECT_EQ(mapper1_count, 3u);
1163
1164 ASSERT_TRUE(mapper1.Front() == nullptr);
1165
1166 EXPECT_EQ(mapper0_count, 3u);
1167 EXPECT_EQ(mapper1_count, 3u);
1168
1169 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1170 EXPECT_EQ(output1[0].monotonic_event_time.time,
1171 e + chrono::seconds(100) + chrono::milliseconds(1000));
1172 EXPECT_TRUE(output1[0].data != nullptr);
1173
1174 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1175 EXPECT_EQ(output1[1].monotonic_event_time.time,
1176 e + chrono::seconds(100) + chrono::milliseconds(3000));
1177 EXPECT_TRUE(output1[1].data != nullptr);
1178 }
1179}
Austin Schuh8bf1e632021-01-02 22:41:04 -08001180// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
1181// returned.
1182TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
1183 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1184 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001185 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001186 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001187 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001188 writer1.QueueSpan(config4_.span());
1189
Austin Schuhd863e6e2022-10-16 15:44:50 -07001190 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001191 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001192 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001193 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
1194 e + chrono::nanoseconds(971)));
1195
Austin Schuhd863e6e2022-10-16 15:44:50 -07001196 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001197 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001198 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001199 e + chrono::milliseconds(2000), 0, chrono::seconds(100),
1200 e + chrono::nanoseconds(5458)));
1201
Austin Schuhd863e6e2022-10-16 15:44:50 -07001202 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001203 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001204 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001205 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1206 }
1207
1208 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1209
1210 for (const auto &p : parts) {
1211 LOG(INFO) << p;
1212 }
1213
1214 ASSERT_EQ(parts.size(), 1u);
1215
Austin Schuh79b30942021-01-24 22:32:21 -08001216 size_t mapper0_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001217 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001218 mapper0.set_timestamp_callback(
1219 [&](TimestampedMessage *) { ++mapper0_count; });
1220 size_t mapper1_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001221 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001222 mapper1.set_timestamp_callback(
1223 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh8bf1e632021-01-02 22:41:04 -08001224
1225 mapper0.AddPeer(&mapper1);
1226 mapper1.AddPeer(&mapper0);
1227
1228 {
1229 std::deque<TimestampedMessage> output0;
1230
1231 for (int i = 0; i < 3; ++i) {
1232 ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
1233 output0.emplace_back(std::move(*mapper0.Front()));
1234 mapper0.PopFront();
1235 }
1236
1237 ASSERT_TRUE(mapper0.Front() == nullptr);
1238
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001239 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1240 EXPECT_EQ(output0[0].monotonic_event_time.time,
1241 e + chrono::milliseconds(1000));
1242 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
1243 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
1244 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001245 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001246
1247 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1248 EXPECT_EQ(output0[1].monotonic_event_time.time,
1249 e + chrono::milliseconds(2000));
1250 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
1251 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
1252 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001253 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001254
1255 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1256 EXPECT_EQ(output0[2].monotonic_event_time.time,
1257 e + chrono::milliseconds(3000));
1258 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
1259 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
1260 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001261 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001262 }
1263
1264 {
1265 SCOPED_TRACE("Trying node1 now");
1266 std::deque<TimestampedMessage> output1;
1267
1268 for (int i = 0; i < 3; ++i) {
1269 ASSERT_TRUE(mapper1.Front() != nullptr);
1270 output1.emplace_back(std::move(*mapper1.Front()));
1271 mapper1.PopFront();
1272 }
1273
1274 ASSERT_TRUE(mapper1.Front() == nullptr);
1275
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001276 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1277 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001278 e + chrono::seconds(100) + chrono::milliseconds(1000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001279 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
1280 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001281 e + chrono::nanoseconds(971));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001282 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001283
1284 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1285 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001286 e + chrono::seconds(100) + chrono::milliseconds(2000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001287 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
1288 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001289 e + chrono::nanoseconds(5458));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001290 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001291
1292 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1293 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001294 e + chrono::seconds(100) + chrono::milliseconds(3000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001295 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
1296 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
1297 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001298 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001299 }
Austin Schuh79b30942021-01-24 22:32:21 -08001300
1301 EXPECT_EQ(mapper0_count, 3u);
1302 EXPECT_EQ(mapper1_count, 3u);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001303}
1304
Austin Schuhd2f96102020-12-01 20:27:29 -08001305// Tests that we can match timestamps on delivered messages. By doing this in
1306// the reverse order, the second node needs to queue data up from the first node
1307// to find the matching timestamp.
1308TEST_F(TimestampMapperTest, ReadNode1First) {
1309 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1310 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001311 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001312 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001313 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001314 writer1.QueueSpan(config2_.span());
1315
Austin Schuhd863e6e2022-10-16 15:44:50 -07001316 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001317 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001318 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001319 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1320
Austin Schuhd863e6e2022-10-16 15:44:50 -07001321 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001322 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001323 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001324 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1325
Austin Schuhd863e6e2022-10-16 15:44:50 -07001326 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001327 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001328 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001329 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1330 }
1331
1332 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1333
1334 ASSERT_EQ(parts[0].logger_node, "pi1");
1335 ASSERT_EQ(parts[1].logger_node, "pi2");
1336
Austin Schuh79b30942021-01-24 22:32:21 -08001337 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001338 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001339 mapper0.set_timestamp_callback(
1340 [&](TimestampedMessage *) { ++mapper0_count; });
1341 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001342 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001343 mapper1.set_timestamp_callback(
1344 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001345
1346 mapper0.AddPeer(&mapper1);
1347 mapper1.AddPeer(&mapper0);
1348
1349 {
1350 SCOPED_TRACE("Trying node1 now");
1351 std::deque<TimestampedMessage> output1;
1352
1353 ASSERT_TRUE(mapper1.Front() != nullptr);
1354 output1.emplace_back(std::move(*mapper1.Front()));
1355 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001356 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001357
1358 ASSERT_TRUE(mapper1.Front() != nullptr);
1359 output1.emplace_back(std::move(*mapper1.Front()));
1360 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001361 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001362
1363 ASSERT_TRUE(mapper1.Front() != nullptr);
1364 output1.emplace_back(std::move(*mapper1.Front()));
1365 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001366 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001367
1368 ASSERT_TRUE(mapper1.Front() == nullptr);
1369
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001370 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1371 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001372 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001373 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001374
1375 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1376 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001377 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001378 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001379
1380 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1381 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001382 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001383 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001384 }
1385
1386 {
1387 std::deque<TimestampedMessage> output0;
1388
1389 ASSERT_TRUE(mapper0.Front() != nullptr);
1390 output0.emplace_back(std::move(*mapper0.Front()));
1391 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001392 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001393
1394 ASSERT_TRUE(mapper0.Front() != nullptr);
1395 output0.emplace_back(std::move(*mapper0.Front()));
1396 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001397 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001398
1399 ASSERT_TRUE(mapper0.Front() != nullptr);
1400 output0.emplace_back(std::move(*mapper0.Front()));
1401 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001402 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001403
1404 ASSERT_TRUE(mapper0.Front() == nullptr);
1405
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001406 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1407 EXPECT_EQ(output0[0].monotonic_event_time.time,
1408 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001409 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001410
1411 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1412 EXPECT_EQ(output0[1].monotonic_event_time.time,
1413 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001414 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001415
1416 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1417 EXPECT_EQ(output0[2].monotonic_event_time.time,
1418 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001419 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001420 }
Austin Schuh79b30942021-01-24 22:32:21 -08001421
1422 EXPECT_EQ(mapper0_count, 3u);
1423 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001424}
1425
1426// Tests that we return just the timestamps if we couldn't find the data and the
1427// missing data was at the beginning of the file.
1428TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
1429 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1430 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001431 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001432 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001433 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001434 writer1.QueueSpan(config2_.span());
1435
1436 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001437 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001438 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1439
Austin Schuhd863e6e2022-10-16 15:44:50 -07001440 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001441 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001442 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001443 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1444
Austin Schuhd863e6e2022-10-16 15:44:50 -07001445 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001446 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001447 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001448 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1449 }
1450
1451 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1452
1453 ASSERT_EQ(parts[0].logger_node, "pi1");
1454 ASSERT_EQ(parts[1].logger_node, "pi2");
1455
Austin Schuh79b30942021-01-24 22:32:21 -08001456 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001457 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001458 mapper0.set_timestamp_callback(
1459 [&](TimestampedMessage *) { ++mapper0_count; });
1460 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001461 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001462 mapper1.set_timestamp_callback(
1463 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001464
1465 mapper0.AddPeer(&mapper1);
1466 mapper1.AddPeer(&mapper0);
1467
1468 {
1469 SCOPED_TRACE("Trying node1 now");
1470 std::deque<TimestampedMessage> output1;
1471
1472 ASSERT_TRUE(mapper1.Front() != nullptr);
1473 output1.emplace_back(std::move(*mapper1.Front()));
1474 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001475 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001476
1477 ASSERT_TRUE(mapper1.Front() != nullptr);
1478 output1.emplace_back(std::move(*mapper1.Front()));
1479 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001480 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001481
1482 ASSERT_TRUE(mapper1.Front() != nullptr);
1483 output1.emplace_back(std::move(*mapper1.Front()));
1484 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001485 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001486
1487 ASSERT_TRUE(mapper1.Front() == nullptr);
1488
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001489 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1490 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001491 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001492 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001493
1494 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1495 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001496 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001497 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001498
1499 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1500 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001501 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001502 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001503 }
Austin Schuh79b30942021-01-24 22:32:21 -08001504
1505 EXPECT_EQ(mapper0_count, 0u);
1506 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001507}
1508
1509// Tests that we return just the timestamps if we couldn't find the data and the
1510// missing data was at the end of the file.
1511TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
1512 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1513 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001514 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001515 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001516 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001517 writer1.QueueSpan(config2_.span());
1518
Austin Schuhd863e6e2022-10-16 15:44:50 -07001519 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001520 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001521 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001522 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1523
Austin Schuhd863e6e2022-10-16 15:44:50 -07001524 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001525 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001526 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001527 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1528
1529 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001530 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001531 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1532 }
1533
1534 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1535
1536 ASSERT_EQ(parts[0].logger_node, "pi1");
1537 ASSERT_EQ(parts[1].logger_node, "pi2");
1538
Austin Schuh79b30942021-01-24 22:32:21 -08001539 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001540 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001541 mapper0.set_timestamp_callback(
1542 [&](TimestampedMessage *) { ++mapper0_count; });
1543 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001544 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001545 mapper1.set_timestamp_callback(
1546 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001547
1548 mapper0.AddPeer(&mapper1);
1549 mapper1.AddPeer(&mapper0);
1550
1551 {
1552 SCOPED_TRACE("Trying node1 now");
1553 std::deque<TimestampedMessage> output1;
1554
1555 ASSERT_TRUE(mapper1.Front() != nullptr);
1556 output1.emplace_back(std::move(*mapper1.Front()));
1557 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001558 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001559
1560 ASSERT_TRUE(mapper1.Front() != nullptr);
1561 output1.emplace_back(std::move(*mapper1.Front()));
1562 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001563 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001564
1565 ASSERT_TRUE(mapper1.Front() != nullptr);
1566 output1.emplace_back(std::move(*mapper1.Front()));
1567 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001568 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001569
1570 ASSERT_TRUE(mapper1.Front() == nullptr);
1571
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001572 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1573 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001574 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001575 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001576
1577 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1578 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001579 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001580 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001581
1582 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1583 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001584 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001585 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001586 }
Austin Schuh79b30942021-01-24 22:32:21 -08001587
1588 EXPECT_EQ(mapper0_count, 0u);
1589 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001590}
1591
Austin Schuh993ccb52020-12-12 15:59:32 -08001592// Tests that we handle a message which failed to forward or be logged.
1593TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1594 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1595 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001596 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001597 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001598 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001599 writer1.QueueSpan(config2_.span());
1600
Austin Schuhd863e6e2022-10-16 15:44:50 -07001601 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001602 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001603 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001604 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1605
1606 // Create both the timestamp and message, but don't log them, simulating a
1607 // forwarding drop.
1608 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1609 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1610 chrono::seconds(100));
1611
Austin Schuhd863e6e2022-10-16 15:44:50 -07001612 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001613 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001614 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001615 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1616 }
1617
1618 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1619
1620 ASSERT_EQ(parts[0].logger_node, "pi1");
1621 ASSERT_EQ(parts[1].logger_node, "pi2");
1622
Austin Schuh79b30942021-01-24 22:32:21 -08001623 size_t mapper0_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001624 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001625 mapper0.set_timestamp_callback(
1626 [&](TimestampedMessage *) { ++mapper0_count; });
1627 size_t mapper1_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001628 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001629 mapper1.set_timestamp_callback(
1630 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh993ccb52020-12-12 15:59:32 -08001631
1632 mapper0.AddPeer(&mapper1);
1633 mapper1.AddPeer(&mapper0);
1634
1635 {
1636 std::deque<TimestampedMessage> output1;
1637
1638 ASSERT_TRUE(mapper1.Front() != nullptr);
1639 output1.emplace_back(std::move(*mapper1.Front()));
1640 mapper1.PopFront();
1641
1642 ASSERT_TRUE(mapper1.Front() != nullptr);
1643 output1.emplace_back(std::move(*mapper1.Front()));
1644
1645 ASSERT_FALSE(mapper1.Front() == nullptr);
1646
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001647 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1648 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001649 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001650 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001651
1652 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1653 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001654 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001655 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh993ccb52020-12-12 15:59:32 -08001656 }
Austin Schuh79b30942021-01-24 22:32:21 -08001657
1658 EXPECT_EQ(mapper0_count, 0u);
1659 EXPECT_EQ(mapper1_count, 2u);
Austin Schuh993ccb52020-12-12 15:59:32 -08001660}
1661
Austin Schuhd2f96102020-12-01 20:27:29 -08001662// Tests that we properly sort log files with duplicate timestamps.
1663TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1664 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1665 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001666 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001667 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001668 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001669 writer1.QueueSpan(config2_.span());
1670
Austin Schuhd863e6e2022-10-16 15:44:50 -07001671 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001672 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001673 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001674 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1675
Austin Schuhd863e6e2022-10-16 15:44:50 -07001676 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001677 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001678 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001679 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1680
Austin Schuhd863e6e2022-10-16 15:44:50 -07001681 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001682 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001683 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001684 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1685
Austin Schuhd863e6e2022-10-16 15:44:50 -07001686 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001687 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001688 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001689 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1690 }
1691
1692 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1693
1694 ASSERT_EQ(parts[0].logger_node, "pi1");
1695 ASSERT_EQ(parts[1].logger_node, "pi2");
1696
Austin Schuh79b30942021-01-24 22:32:21 -08001697 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001698 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001699 mapper0.set_timestamp_callback(
1700 [&](TimestampedMessage *) { ++mapper0_count; });
1701 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001702 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001703 mapper1.set_timestamp_callback(
1704 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001705
1706 mapper0.AddPeer(&mapper1);
1707 mapper1.AddPeer(&mapper0);
1708
1709 {
1710 SCOPED_TRACE("Trying node1 now");
1711 std::deque<TimestampedMessage> output1;
1712
1713 for (int i = 0; i < 4; ++i) {
1714 ASSERT_TRUE(mapper1.Front() != nullptr);
1715 output1.emplace_back(std::move(*mapper1.Front()));
1716 mapper1.PopFront();
1717 }
1718 ASSERT_TRUE(mapper1.Front() == nullptr);
1719
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001720 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1721 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001722 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001723 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001724
1725 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1726 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001727 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001728 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001729
1730 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1731 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001732 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001733 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001734
1735 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1736 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001737 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001738 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001739 }
Austin Schuh79b30942021-01-24 22:32:21 -08001740
1741 EXPECT_EQ(mapper0_count, 0u);
1742 EXPECT_EQ(mapper1_count, 4u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001743}
1744
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001745// Tests that we properly produce a valid start time.
Austin Schuhd2f96102020-12-01 20:27:29 -08001746TEST_F(TimestampMapperTest, StartTime) {
1747 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1748 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001749 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001750 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001751 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001752 writer1.QueueSpan(config1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001753 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001754 writer2.QueueSpan(config3_.span());
1755 }
1756
1757 const std::vector<LogFile> parts =
1758 SortParts({logfile0_, logfile1_, logfile2_});
1759
Austin Schuh79b30942021-01-24 22:32:21 -08001760 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001761 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001762 mapper0.set_timestamp_callback(
1763 [&](TimestampedMessage *) { ++mapper0_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001764
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001765 EXPECT_EQ(mapper0.monotonic_start_time(0), e + chrono::milliseconds(1));
1766 EXPECT_EQ(mapper0.realtime_start_time(0),
Austin Schuhd2f96102020-12-01 20:27:29 -08001767 realtime_clock::time_point(chrono::seconds(1000)));
Austin Schuh79b30942021-01-24 22:32:21 -08001768 EXPECT_EQ(mapper0_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001769}
1770
Austin Schuhfecf1d82020-12-19 16:57:28 -08001771// Tests that when a peer isn't registered, we treat that as if there was no
1772// data available.
1773TEST_F(TimestampMapperTest, NoPeer) {
1774 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1775 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001776 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001777 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001778 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001779 writer1.QueueSpan(config2_.span());
1780
1781 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001782 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001783 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1784
Austin Schuhd863e6e2022-10-16 15:44:50 -07001785 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001786 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001787 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001788 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1789
Austin Schuhd863e6e2022-10-16 15:44:50 -07001790 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001791 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001792 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001793 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1794 }
1795
1796 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1797
1798 ASSERT_EQ(parts[0].logger_node, "pi1");
1799 ASSERT_EQ(parts[1].logger_node, "pi2");
1800
Austin Schuh79b30942021-01-24 22:32:21 -08001801 size_t mapper1_count = 0;
Austin Schuhfecf1d82020-12-19 16:57:28 -08001802 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001803 mapper1.set_timestamp_callback(
1804 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhfecf1d82020-12-19 16:57:28 -08001805
1806 {
1807 std::deque<TimestampedMessage> output1;
1808
1809 ASSERT_TRUE(mapper1.Front() != nullptr);
1810 output1.emplace_back(std::move(*mapper1.Front()));
1811 mapper1.PopFront();
1812 ASSERT_TRUE(mapper1.Front() != nullptr);
1813 output1.emplace_back(std::move(*mapper1.Front()));
1814 mapper1.PopFront();
1815 ASSERT_TRUE(mapper1.Front() != nullptr);
1816 output1.emplace_back(std::move(*mapper1.Front()));
1817 mapper1.PopFront();
1818 ASSERT_TRUE(mapper1.Front() == nullptr);
1819
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001820 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1821 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001822 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001823 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001824
1825 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1826 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001827 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001828 EXPECT_FALSE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001829
1830 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1831 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001832 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001833 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001834 }
Austin Schuh79b30942021-01-24 22:32:21 -08001835 EXPECT_EQ(mapper1_count, 3u);
1836}
1837
1838// Tests that we can queue messages and call the timestamp callback for both
1839// nodes.
1840TEST_F(TimestampMapperTest, QueueUntilNode0) {
1841 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1842 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001843 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh79b30942021-01-24 22:32:21 -08001844 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001845 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh79b30942021-01-24 22:32:21 -08001846 writer1.QueueSpan(config2_.span());
1847
Austin Schuhd863e6e2022-10-16 15:44:50 -07001848 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001849 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001850 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001851 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1852
Austin Schuhd863e6e2022-10-16 15:44:50 -07001853 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001854 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001855 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001856 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1857
Austin Schuhd863e6e2022-10-16 15:44:50 -07001858 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001859 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001860 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001861 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1862
Austin Schuhd863e6e2022-10-16 15:44:50 -07001863 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001864 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001865 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001866 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1867 }
1868
1869 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1870
1871 ASSERT_EQ(parts[0].logger_node, "pi1");
1872 ASSERT_EQ(parts[1].logger_node, "pi2");
1873
1874 size_t mapper0_count = 0;
1875 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1876 mapper0.set_timestamp_callback(
1877 [&](TimestampedMessage *) { ++mapper0_count; });
1878 size_t mapper1_count = 0;
1879 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1880 mapper1.set_timestamp_callback(
1881 [&](TimestampedMessage *) { ++mapper1_count; });
1882
1883 mapper0.AddPeer(&mapper1);
1884 mapper1.AddPeer(&mapper0);
1885
1886 {
1887 std::deque<TimestampedMessage> output0;
1888
1889 EXPECT_EQ(mapper0_count, 0u);
1890 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001891 mapper0.QueueUntil(
1892 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001893 EXPECT_EQ(mapper0_count, 3u);
1894 EXPECT_EQ(mapper1_count, 0u);
1895
1896 ASSERT_TRUE(mapper0.Front() != nullptr);
1897 EXPECT_EQ(mapper0_count, 3u);
1898 EXPECT_EQ(mapper1_count, 0u);
1899
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001900 mapper0.QueueUntil(
1901 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001902 EXPECT_EQ(mapper0_count, 3u);
1903 EXPECT_EQ(mapper1_count, 0u);
1904
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001905 mapper0.QueueUntil(
1906 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001907 EXPECT_EQ(mapper0_count, 4u);
1908 EXPECT_EQ(mapper1_count, 0u);
1909
1910 output0.emplace_back(std::move(*mapper0.Front()));
1911 mapper0.PopFront();
1912 output0.emplace_back(std::move(*mapper0.Front()));
1913 mapper0.PopFront();
1914 output0.emplace_back(std::move(*mapper0.Front()));
1915 mapper0.PopFront();
1916 output0.emplace_back(std::move(*mapper0.Front()));
1917 mapper0.PopFront();
1918
1919 EXPECT_EQ(mapper0_count, 4u);
1920 EXPECT_EQ(mapper1_count, 0u);
1921
1922 ASSERT_TRUE(mapper0.Front() == nullptr);
1923
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001924 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1925 EXPECT_EQ(output0[0].monotonic_event_time.time,
1926 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001927 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001928
1929 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1930 EXPECT_EQ(output0[1].monotonic_event_time.time,
1931 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001932 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001933
1934 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1935 EXPECT_EQ(output0[2].monotonic_event_time.time,
1936 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001937 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001938
1939 EXPECT_EQ(output0[3].monotonic_event_time.boot, 0u);
1940 EXPECT_EQ(output0[3].monotonic_event_time.time,
1941 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001942 EXPECT_TRUE(output0[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001943 }
1944
1945 {
1946 SCOPED_TRACE("Trying node1 now");
1947 std::deque<TimestampedMessage> output1;
1948
1949 EXPECT_EQ(mapper0_count, 4u);
1950 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001951 mapper1.QueueUntil(BootTimestamp{
1952 .boot = 0,
1953 .time = e + chrono::seconds(100) + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001954 EXPECT_EQ(mapper0_count, 4u);
1955 EXPECT_EQ(mapper1_count, 3u);
1956
1957 ASSERT_TRUE(mapper1.Front() != nullptr);
1958 EXPECT_EQ(mapper0_count, 4u);
1959 EXPECT_EQ(mapper1_count, 3u);
1960
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001961 mapper1.QueueUntil(BootTimestamp{
1962 .boot = 0,
1963 .time = e + chrono::seconds(100) + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001964 EXPECT_EQ(mapper0_count, 4u);
1965 EXPECT_EQ(mapper1_count, 3u);
1966
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001967 mapper1.QueueUntil(BootTimestamp{
1968 .boot = 0,
1969 .time = e + chrono::seconds(100) + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001970 EXPECT_EQ(mapper0_count, 4u);
1971 EXPECT_EQ(mapper1_count, 4u);
1972
1973 ASSERT_TRUE(mapper1.Front() != nullptr);
1974 EXPECT_EQ(mapper0_count, 4u);
1975 EXPECT_EQ(mapper1_count, 4u);
1976
1977 output1.emplace_back(std::move(*mapper1.Front()));
1978 mapper1.PopFront();
1979 ASSERT_TRUE(mapper1.Front() != nullptr);
1980 output1.emplace_back(std::move(*mapper1.Front()));
1981 mapper1.PopFront();
1982 ASSERT_TRUE(mapper1.Front() != nullptr);
1983 output1.emplace_back(std::move(*mapper1.Front()));
1984 mapper1.PopFront();
1985 ASSERT_TRUE(mapper1.Front() != nullptr);
1986 output1.emplace_back(std::move(*mapper1.Front()));
1987 mapper1.PopFront();
1988
1989 EXPECT_EQ(mapper0_count, 4u);
1990 EXPECT_EQ(mapper1_count, 4u);
1991
1992 ASSERT_TRUE(mapper1.Front() == nullptr);
1993
1994 EXPECT_EQ(mapper0_count, 4u);
1995 EXPECT_EQ(mapper1_count, 4u);
1996
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001997 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1998 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001999 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002000 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002001
2002 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
2003 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002004 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002005 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002006
2007 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
2008 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002009 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002010 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002011
2012 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
2013 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002014 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002015 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08002016 }
Austin Schuhfecf1d82020-12-19 16:57:28 -08002017}
2018
// Fixture for the BootMerger tests.  On top of SortingElementTest it builds two
// log file headers for data from node "pi2" as logged on "pi1".  The headers
// share log_event_uuid and logger_node_boot_uuid but carry different
// parts_uuid, parts_index (0 and 1) and source_node_boot_uuid values.
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002019class BootMergerTest : public SortingElementTest {
2020 public:
2021 BootMergerTest()
2022 : SortingElementTest(),
2023 boot0_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002024 /* 100ms */
2025 "max_out_of_order_duration": 100000000,
2026 "node": {
2027 "name": "pi2"
2028 },
2029 "logger_node": {
2030 "name": "pi1"
2031 },
2032 "monotonic_start_time": 1000000,
2033 "realtime_start_time": 1000000000000,
2034 "logger_monotonic_start_time": 1000000,
2035 "logger_realtime_start_time": 1000000000000,
2036 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2037 "parts_uuid": "1a9e5ca2-31b2-475b-8282-88f6d1ce5109",
2038 "parts_index": 0,
2039 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2040 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002041 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2042 "boot_uuids": [
2043 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2044 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2045 ""
2046 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002047})")),
2048 boot1_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002049 /* 100ms */
2050 "max_out_of_order_duration": 100000000,
2051 "node": {
2052 "name": "pi2"
2053 },
2054 "logger_node": {
2055 "name": "pi1"
2056 },
2057 "monotonic_start_time": 1000000,
2058 "realtime_start_time": 1000000000000,
2059 "logger_monotonic_start_time": 1000000,
2060 "logger_realtime_start_time": 1000000000000,
2061 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2062 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
2063 "parts_index": 1,
2064 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2065 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002066 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2067 "boot_uuids": [
2068 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2069 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2070 ""
2071 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002072})")) {}
Austin Schuh8bee6882021-06-28 21:03:28 -07002073
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002074 protected:
// Header for pi2 data with parts_index 0 and source_node_boot_uuid
// 6ba4f28d-...; the tests in this file expect it to sort as boot 0.
2075 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0_;
// Header for pi2 data with parts_index 1 and source_node_boot_uuid
// b728d27a-...; the tests in this file expect it to sort as boot 1.
2076 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1_;
2077};
2078
2079// This tests that we can properly sort a multi-node log file which has the old
2080// (and buggy) timestamps in the header, and the non-resetting parts_index.
2081// These make it so we can just bairly figure out what happened first and what
2082// happened second, but not in a way that is robust to multiple nodes rebooting.
2083TEST_F(BootMergerTest, OldReboot) {
Austin Schuh8bee6882021-06-28 21:03:28 -07002084 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002085 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002086 writer.QueueSpan(boot0_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002087 }
2088 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002089 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002090 writer.QueueSpan(boot1_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002091 }
2092
2093 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
2094
2095 ASSERT_EQ(parts.size(), 1u);
2096 ASSERT_EQ(parts[0].parts.size(), 2u);
2097
2098 EXPECT_EQ(parts[0].parts[0].boot_count, 0);
2099 EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002100 boot0_.message().source_node_boot_uuid()->string_view());
Austin Schuh8bee6882021-06-28 21:03:28 -07002101
2102 EXPECT_EQ(parts[0].parts[1].boot_count, 1);
2103 EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002104 boot1_.message().source_node_boot_uuid()->string_view());
2105}
2106
2107// This tests that we can produce messages ordered across a reboot.
2108TEST_F(BootMergerTest, SortAcrossReboot) {
2109 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2110 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002111 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002112 writer.QueueSpan(boot0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002113 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002114 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002115 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002116 MakeLogMessage(e + chrono::milliseconds(2000), 1, 0x105));
2117 }
2118 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002119 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002120 writer.QueueSpan(boot1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002121 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002122 MakeLogMessage(e + chrono::milliseconds(100), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002123 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002124 MakeLogMessage(e + chrono::milliseconds(200), 1, 0x106));
2125 }
2126
2127 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
2128 ASSERT_EQ(parts.size(), 1u);
2129 ASSERT_EQ(parts[0].parts.size(), 2u);
2130
2131 BootMerger merger(FilterPartsForNode(parts, "pi2"));
2132
2133 EXPECT_EQ(merger.node(), 1u);
2134
2135 std::vector<Message> output;
2136 for (int i = 0; i < 4; ++i) {
2137 ASSERT_TRUE(merger.Front() != nullptr);
2138 output.emplace_back(std::move(*merger.Front()));
2139 merger.PopFront();
2140 }
2141
2142 ASSERT_TRUE(merger.Front() == nullptr);
2143
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002144 EXPECT_EQ(output[0].timestamp.boot, 0u);
2145 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
2146 EXPECT_EQ(output[1].timestamp.boot, 0u);
2147 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(2000));
2148
2149 EXPECT_EQ(output[2].timestamp.boot, 1u);
2150 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(100));
2151 EXPECT_EQ(output[3].timestamp.boot, 1u);
2152 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
Austin Schuh8bee6882021-06-28 21:03:28 -07002153}
2154
// Fixture for the TimestampMapper reboot tests.  On top of SortingElementTest
// it builds four log file headers, all logged on "pi1": two for node "pi1"
// (boot0a_, boot0b_) and two for node "pi2" (boot1a_, boot1b_).  Within each
// pair the headers differ in parts_index (0 and 1) and in the second
// boot_uuids entry (6ba4f28d-... versus b728d27a-...).
Austin Schuh48507722021-07-17 17:29:24 -07002155class RebootTimestampMapperTest : public SortingElementTest {
2156 public:
2157 RebootTimestampMapperTest()
2158 : SortingElementTest(),
2159 boot0a_(MakeHeader(config_, R"({
2160 /* 100ms */
2161 "max_out_of_order_duration": 100000000,
2162 "node": {
2163 "name": "pi1"
2164 },
2165 "logger_node": {
2166 "name": "pi1"
2167 },
2168 "monotonic_start_time": 1000000,
2169 "realtime_start_time": 1000000000000,
2170 "logger_monotonic_start_time": 1000000,
2171 "logger_realtime_start_time": 1000000000000,
2172 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2173 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2174 "parts_index": 0,
2175 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2176 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2177 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2178 "boot_uuids": [
2179 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2180 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2181 ""
2182 ]
2183})")),
2184 boot0b_(MakeHeader(config_, R"({
2185 /* 100ms */
2186 "max_out_of_order_duration": 100000000,
2187 "node": {
2188 "name": "pi1"
2189 },
2190 "logger_node": {
2191 "name": "pi1"
2192 },
2193 "monotonic_start_time": 1000000,
2194 "realtime_start_time": 1000000000000,
2195 "logger_monotonic_start_time": 1000000,
2196 "logger_realtime_start_time": 1000000000000,
2197 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2198 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2199 "parts_index": 1,
2200 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2201 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2202 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2203 "boot_uuids": [
2204 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2205 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2206 ""
2207 ]
2208})")),
2209 boot1a_(MakeHeader(config_, R"({
2210 /* 100ms */
2211 "max_out_of_order_duration": 100000000,
2212 "node": {
2213 "name": "pi2"
2214 },
2215 "logger_node": {
2216 "name": "pi1"
2217 },
2218 "monotonic_start_time": 1000000,
2219 "realtime_start_time": 1000000000000,
2220 "logger_monotonic_start_time": 1000000,
2221 "logger_realtime_start_time": 1000000000000,
2222 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2223 "parts_uuid": "f6ab0cdc-a654-456d-bfd9-2bbc09098edf",
2224 "parts_index": 0,
2225 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2226 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2227 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2228 "boot_uuids": [
2229 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2230 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2231 ""
2232 ]
2233})")),
2234 boot1b_(MakeHeader(config_, R"({
2235 /* 100ms */
2236 "max_out_of_order_duration": 100000000,
2237 "node": {
2238 "name": "pi2"
2239 },
2240 "logger_node": {
2241 "name": "pi1"
2242 },
2243 "monotonic_start_time": 1000000,
2244 "realtime_start_time": 1000000000000,
2245 "logger_monotonic_start_time": 1000000,
2246 "logger_realtime_start_time": 1000000000000,
2247 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2248 "parts_uuid": "3a9d0445-f520-43ca-93f5-e2cc7f54d40a",
2249 "parts_index": 1,
2250 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2251 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2252 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2253 "boot_uuids": [
2254 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2255 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2256 ""
2257 ]
2258})")) {}
2259
2260 protected:
// pi1 header, parts_index 0; boot_uuids lists 6ba4f28d-... as its second entry.
2261 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0a_;
// pi1 header, same parts_uuid as boot0a_ but parts_index 1; boot_uuids lists
// b728d27a-... as its second entry.
2262 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0b_;
// pi2 header (logged on pi1) with source_node_boot_uuid 6ba4f28d-....
2263 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1a_;
// pi2 header (logged on pi1) with source_node_boot_uuid b728d27a-... and its
// own parts_uuid.
2264 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
2265};
2266
Austin Schuh48507722021-07-17 17:29:24 -07002267// Tests that we can match timestamps on delivered messages in the presence of
2268// reboots on the node receiving timestamps.
2269TEST_F(RebootTimestampMapperTest, ReadNode0First) {
2270 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2271 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002272 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002273 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002274 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002275 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002276 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002277 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002278 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002279 writer1b.QueueSpan(boot1b_.span());
2280
Austin Schuhd863e6e2022-10-16 15:44:50 -07002281 writer0a.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002282 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002283 writer1a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002284 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
2285 e + chrono::milliseconds(1001)));
2286
Austin Schuhd863e6e2022-10-16 15:44:50 -07002287 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh58646e22021-08-23 23:51:46 -07002288 e + chrono::milliseconds(1000), 0, chrono::seconds(21),
2289 e + chrono::milliseconds(2001)));
2290
Austin Schuhd863e6e2022-10-16 15:44:50 -07002291 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002292 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002293 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002294 e + chrono::milliseconds(2000), 0, chrono::seconds(20),
2295 e + chrono::milliseconds(2001)));
2296
Austin Schuhd863e6e2022-10-16 15:44:50 -07002297 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002298 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002299 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002300 e + chrono::milliseconds(3000), 0, chrono::seconds(20),
2301 e + chrono::milliseconds(3001)));
2302 }
2303
Austin Schuh58646e22021-08-23 23:51:46 -07002304 const std::vector<LogFile> parts =
2305 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Austin Schuh48507722021-07-17 17:29:24 -07002306
2307 for (const auto &x : parts) {
2308 LOG(INFO) << x;
2309 }
2310 ASSERT_EQ(parts.size(), 1u);
2311 ASSERT_EQ(parts[0].logger_node, "pi1");
2312
2313 size_t mapper0_count = 0;
2314 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
2315 mapper0.set_timestamp_callback(
2316 [&](TimestampedMessage *) { ++mapper0_count; });
2317 size_t mapper1_count = 0;
2318 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
2319 mapper1.set_timestamp_callback(
2320 [&](TimestampedMessage *) { ++mapper1_count; });
2321
2322 mapper0.AddPeer(&mapper1);
2323 mapper1.AddPeer(&mapper0);
2324
2325 {
2326 std::deque<TimestampedMessage> output0;
2327
2328 EXPECT_EQ(mapper0_count, 0u);
2329 EXPECT_EQ(mapper1_count, 0u);
2330 ASSERT_TRUE(mapper0.Front() != nullptr);
2331 EXPECT_EQ(mapper0_count, 1u);
2332 EXPECT_EQ(mapper1_count, 0u);
2333 output0.emplace_back(std::move(*mapper0.Front()));
2334 mapper0.PopFront();
2335 EXPECT_TRUE(mapper0.started());
2336 EXPECT_EQ(mapper0_count, 1u);
2337 EXPECT_EQ(mapper1_count, 0u);
2338
2339 ASSERT_TRUE(mapper0.Front() != nullptr);
2340 EXPECT_EQ(mapper0_count, 2u);
2341 EXPECT_EQ(mapper1_count, 0u);
2342 output0.emplace_back(std::move(*mapper0.Front()));
2343 mapper0.PopFront();
2344 EXPECT_TRUE(mapper0.started());
2345
2346 ASSERT_TRUE(mapper0.Front() != nullptr);
2347 output0.emplace_back(std::move(*mapper0.Front()));
2348 mapper0.PopFront();
2349 EXPECT_TRUE(mapper0.started());
2350
2351 EXPECT_EQ(mapper0_count, 3u);
2352 EXPECT_EQ(mapper1_count, 0u);
2353
2354 ASSERT_TRUE(mapper0.Front() == nullptr);
2355
2356 LOG(INFO) << output0[0];
2357 LOG(INFO) << output0[1];
2358 LOG(INFO) << output0[2];
2359
2360 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2361 EXPECT_EQ(output0[0].monotonic_event_time.time,
2362 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002363 EXPECT_EQ(output0[0].queue_index,
2364 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002365 EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
2366 EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002367 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002368
2369 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2370 EXPECT_EQ(output0[1].monotonic_event_time.time,
2371 e + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002372 EXPECT_EQ(output0[1].queue_index,
2373 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002374 EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
2375 EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002376 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002377
2378 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2379 EXPECT_EQ(output0[2].monotonic_event_time.time,
2380 e + chrono::milliseconds(3000));
Austin Schuh58646e22021-08-23 23:51:46 -07002381 EXPECT_EQ(output0[2].queue_index,
2382 (BootQueueIndex{.boot = 0u, .index = 2u}));
Austin Schuh48507722021-07-17 17:29:24 -07002383 EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
2384 EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002385 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002386 }
2387
2388 {
2389 SCOPED_TRACE("Trying node1 now");
2390 std::deque<TimestampedMessage> output1;
2391
2392 EXPECT_EQ(mapper0_count, 3u);
2393 EXPECT_EQ(mapper1_count, 0u);
2394
2395 ASSERT_TRUE(mapper1.Front() != nullptr);
2396 EXPECT_EQ(mapper0_count, 3u);
2397 EXPECT_EQ(mapper1_count, 1u);
2398 output1.emplace_back(std::move(*mapper1.Front()));
2399 mapper1.PopFront();
2400 EXPECT_TRUE(mapper1.started());
2401 EXPECT_EQ(mapper0_count, 3u);
2402 EXPECT_EQ(mapper1_count, 1u);
2403
2404 ASSERT_TRUE(mapper1.Front() != nullptr);
2405 EXPECT_EQ(mapper0_count, 3u);
2406 EXPECT_EQ(mapper1_count, 2u);
2407 output1.emplace_back(std::move(*mapper1.Front()));
2408 mapper1.PopFront();
2409 EXPECT_TRUE(mapper1.started());
2410
2411 ASSERT_TRUE(mapper1.Front() != nullptr);
2412 output1.emplace_back(std::move(*mapper1.Front()));
2413 mapper1.PopFront();
2414 EXPECT_TRUE(mapper1.started());
2415
Austin Schuh58646e22021-08-23 23:51:46 -07002416 ASSERT_TRUE(mapper1.Front() != nullptr);
2417 output1.emplace_back(std::move(*mapper1.Front()));
2418 mapper1.PopFront();
2419 EXPECT_TRUE(mapper1.started());
2420
Austin Schuh48507722021-07-17 17:29:24 -07002421 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002422 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002423
2424 ASSERT_TRUE(mapper1.Front() == nullptr);
2425
2426 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002427 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002428
2429 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2430 EXPECT_EQ(output1[0].monotonic_event_time.time,
2431 e + chrono::seconds(100) + chrono::milliseconds(1000));
2432 EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
2433 EXPECT_EQ(output1[0].monotonic_remote_time.time,
2434 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002435 EXPECT_EQ(output1[0].remote_queue_index,
2436 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002437 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
2438 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
2439 e + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002440 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002441
2442 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2443 EXPECT_EQ(output1[1].monotonic_event_time.time,
2444 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002445 EXPECT_EQ(output1[1].remote_queue_index,
2446 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002447 EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
2448 EXPECT_EQ(output1[1].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002449 e + chrono::milliseconds(1000));
Austin Schuh48507722021-07-17 17:29:24 -07002450 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
2451 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
2452 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002453 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002454
2455 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2456 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002457 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh48507722021-07-17 17:29:24 -07002458 EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
2459 EXPECT_EQ(output1[2].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002460 e + chrono::milliseconds(2000));
2461 EXPECT_EQ(output1[2].remote_queue_index,
2462 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002463 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
2464 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002465 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002466 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002467
Austin Schuh58646e22021-08-23 23:51:46 -07002468 EXPECT_EQ(output1[3].monotonic_event_time.boot, 1u);
2469 EXPECT_EQ(output1[3].monotonic_event_time.time,
2470 e + chrono::seconds(20) + chrono::milliseconds(3000));
2471 EXPECT_EQ(output1[3].monotonic_remote_time.boot, 0u);
2472 EXPECT_EQ(output1[3].monotonic_remote_time.time,
2473 e + chrono::milliseconds(3000));
2474 EXPECT_EQ(output1[3].remote_queue_index,
2475 (BootQueueIndex{.boot = 0u, .index = 2u}));
2476 EXPECT_EQ(output1[3].monotonic_timestamp_time.boot, 0u);
2477 EXPECT_EQ(output1[3].monotonic_timestamp_time.time,
2478 e + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002479 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh58646e22021-08-23 23:51:46 -07002480
Austin Schuh48507722021-07-17 17:29:24 -07002481 LOG(INFO) << output1[0];
2482 LOG(INFO) << output1[1];
2483 LOG(INFO) << output1[2];
Austin Schuh58646e22021-08-23 23:51:46 -07002484 LOG(INFO) << output1[3];
Austin Schuh48507722021-07-17 17:29:24 -07002485 }
2486}
2487
2488TEST_F(RebootTimestampMapperTest, Node2Reboot) {
2489 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2490 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002491 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002492 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002493 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002494 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002495 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002496 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002497 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002498 writer1b.QueueSpan(boot1b_.span());
2499
Austin Schuhd863e6e2022-10-16 15:44:50 -07002500 writer1a.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002501 e + chrono::seconds(100) + chrono::milliseconds(1000), 3, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002502 writer0a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002503 e + chrono::seconds(100) + chrono::milliseconds(1000), 3,
2504 chrono::seconds(-100),
2505 e + chrono::seconds(100) + chrono::milliseconds(1001)));
2506
Austin Schuhd863e6e2022-10-16 15:44:50 -07002507 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002508 e + chrono::seconds(20) + chrono::milliseconds(2000), 3, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002509 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002510 e + chrono::seconds(20) + chrono::milliseconds(2000), 3,
2511 chrono::seconds(-20),
2512 e + chrono::seconds(20) + chrono::milliseconds(2001)));
2513
Austin Schuhd863e6e2022-10-16 15:44:50 -07002514 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002515 e + chrono::seconds(20) + chrono::milliseconds(3000), 3, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002516 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002517 e + chrono::seconds(20) + chrono::milliseconds(3000), 3,
2518 chrono::seconds(-20),
2519 e + chrono::seconds(20) + chrono::milliseconds(3001)));
2520 }
2521
2522 const std::vector<LogFile> parts =
2523 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2524
2525 for (const auto &x : parts) {
2526 LOG(INFO) << x;
2527 }
2528 ASSERT_EQ(parts.size(), 1u);
2529 ASSERT_EQ(parts[0].logger_node, "pi1");
2530
2531 size_t mapper0_count = 0;
2532 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
2533 mapper0.set_timestamp_callback(
2534 [&](TimestampedMessage *) { ++mapper0_count; });
2535 size_t mapper1_count = 0;
2536 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
2537 mapper1.set_timestamp_callback(
2538 [&](TimestampedMessage *) { ++mapper1_count; });
2539
2540 mapper0.AddPeer(&mapper1);
2541 mapper1.AddPeer(&mapper0);
2542
2543 {
2544 std::deque<TimestampedMessage> output0;
2545
2546 EXPECT_EQ(mapper0_count, 0u);
2547 EXPECT_EQ(mapper1_count, 0u);
2548 ASSERT_TRUE(mapper0.Front() != nullptr);
2549 EXPECT_EQ(mapper0_count, 1u);
2550 EXPECT_EQ(mapper1_count, 0u);
2551 output0.emplace_back(std::move(*mapper0.Front()));
2552 mapper0.PopFront();
2553 EXPECT_TRUE(mapper0.started());
2554 EXPECT_EQ(mapper0_count, 1u);
2555 EXPECT_EQ(mapper1_count, 0u);
2556
2557 ASSERT_TRUE(mapper0.Front() != nullptr);
2558 EXPECT_EQ(mapper0_count, 2u);
2559 EXPECT_EQ(mapper1_count, 0u);
2560 output0.emplace_back(std::move(*mapper0.Front()));
2561 mapper0.PopFront();
2562 EXPECT_TRUE(mapper0.started());
2563
2564 ASSERT_TRUE(mapper0.Front() != nullptr);
2565 output0.emplace_back(std::move(*mapper0.Front()));
2566 mapper0.PopFront();
2567 EXPECT_TRUE(mapper0.started());
2568
2569 EXPECT_EQ(mapper0_count, 3u);
2570 EXPECT_EQ(mapper1_count, 0u);
2571
2572 ASSERT_TRUE(mapper0.Front() == nullptr);
2573
2574 LOG(INFO) << output0[0];
2575 LOG(INFO) << output0[1];
2576 LOG(INFO) << output0[2];
2577
2578 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2579 EXPECT_EQ(output0[0].monotonic_event_time.time,
2580 e + chrono::milliseconds(1000));
2581 EXPECT_EQ(output0[0].monotonic_remote_time.boot, 0u);
2582 EXPECT_EQ(output0[0].monotonic_remote_time.time,
2583 e + chrono::seconds(100) + chrono::milliseconds(1000));
2584 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
2585 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
2586 e + chrono::seconds(100) + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002587 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002588
2589 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2590 EXPECT_EQ(output0[1].monotonic_event_time.time,
2591 e + chrono::milliseconds(2000));
2592 EXPECT_EQ(output0[1].monotonic_remote_time.boot, 1u);
2593 EXPECT_EQ(output0[1].monotonic_remote_time.time,
2594 e + chrono::seconds(20) + chrono::milliseconds(2000));
2595 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
2596 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
2597 e + chrono::seconds(20) + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002598 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002599
2600 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2601 EXPECT_EQ(output0[2].monotonic_event_time.time,
2602 e + chrono::milliseconds(3000));
2603 EXPECT_EQ(output0[2].monotonic_remote_time.boot, 1u);
2604 EXPECT_EQ(output0[2].monotonic_remote_time.time,
2605 e + chrono::seconds(20) + chrono::milliseconds(3000));
2606 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
2607 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
2608 e + chrono::seconds(20) + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002609 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002610 }
2611
2612 {
2613 SCOPED_TRACE("Trying node1 now");
2614 std::deque<TimestampedMessage> output1;
2615
2616 EXPECT_EQ(mapper0_count, 3u);
2617 EXPECT_EQ(mapper1_count, 0u);
2618
2619 ASSERT_TRUE(mapper1.Front() != nullptr);
2620 EXPECT_EQ(mapper0_count, 3u);
2621 EXPECT_EQ(mapper1_count, 1u);
2622 output1.emplace_back(std::move(*mapper1.Front()));
2623 mapper1.PopFront();
2624 EXPECT_TRUE(mapper1.started());
2625 EXPECT_EQ(mapper0_count, 3u);
2626 EXPECT_EQ(mapper1_count, 1u);
2627
2628 ASSERT_TRUE(mapper1.Front() != nullptr);
2629 EXPECT_EQ(mapper0_count, 3u);
2630 EXPECT_EQ(mapper1_count, 2u);
2631 output1.emplace_back(std::move(*mapper1.Front()));
2632 mapper1.PopFront();
2633 EXPECT_TRUE(mapper1.started());
2634
2635 ASSERT_TRUE(mapper1.Front() != nullptr);
2636 output1.emplace_back(std::move(*mapper1.Front()));
2637 mapper1.PopFront();
2638 EXPECT_TRUE(mapper1.started());
2639
2640 EXPECT_EQ(mapper0_count, 3u);
2641 EXPECT_EQ(mapper1_count, 3u);
2642
2643 ASSERT_TRUE(mapper1.Front() == nullptr);
2644
2645 EXPECT_EQ(mapper0_count, 3u);
2646 EXPECT_EQ(mapper1_count, 3u);
2647
2648 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2649 EXPECT_EQ(output1[0].monotonic_event_time.time,
2650 e + chrono::seconds(100) + chrono::milliseconds(1000));
2651 EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
2652 EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002653 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002654
2655 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2656 EXPECT_EQ(output1[1].monotonic_event_time.time,
2657 e + chrono::seconds(20) + chrono::milliseconds(2000));
2658 EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
2659 EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002660 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002661
2662 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2663 EXPECT_EQ(output1[2].monotonic_event_time.time,
2664 e + chrono::seconds(20) + chrono::milliseconds(3000));
2665 EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
2666 EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002667 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002668
2669 LOG(INFO) << output1[0];
2670 LOG(INFO) << output1[1];
2671 LOG(INFO) << output1[2];
2672 }
2673}
2674
Austin Schuh44c61472021-11-22 21:04:10 -08002675class SortingDeathTest : public SortingElementTest {
2676 public:
2677 SortingDeathTest()
2678 : SortingElementTest(),
2679 part0_(MakeHeader(config_, R"({
2680 /* 100ms */
2681 "max_out_of_order_duration": 100000000,
2682 "node": {
2683 "name": "pi1"
2684 },
2685 "logger_node": {
2686 "name": "pi1"
2687 },
2688 "monotonic_start_time": 1000000,
2689 "realtime_start_time": 1000000000000,
2690 "logger_monotonic_start_time": 1000000,
2691 "logger_realtime_start_time": 1000000000000,
2692 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2693 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2694 "parts_index": 0,
2695 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2696 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2697 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2698 "boot_uuids": [
2699 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2700 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2701 ""
2702 ],
2703 "oldest_remote_monotonic_timestamps": [
2704 9223372036854775807,
2705 9223372036854775807,
2706 9223372036854775807
2707 ],
2708 "oldest_local_monotonic_timestamps": [
2709 9223372036854775807,
2710 9223372036854775807,
2711 9223372036854775807
2712 ],
2713 "oldest_remote_unreliable_monotonic_timestamps": [
2714 9223372036854775807,
2715 0,
2716 9223372036854775807
2717 ],
2718 "oldest_local_unreliable_monotonic_timestamps": [
2719 9223372036854775807,
2720 0,
2721 9223372036854775807
2722 ]
2723})")),
2724 part1_(MakeHeader(config_, R"({
2725 /* 100ms */
2726 "max_out_of_order_duration": 100000000,
2727 "node": {
2728 "name": "pi1"
2729 },
2730 "logger_node": {
2731 "name": "pi1"
2732 },
2733 "monotonic_start_time": 1000000,
2734 "realtime_start_time": 1000000000000,
2735 "logger_monotonic_start_time": 1000000,
2736 "logger_realtime_start_time": 1000000000000,
2737 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2738 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2739 "parts_index": 1,
2740 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2741 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2742 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2743 "boot_uuids": [
2744 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2745 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2746 ""
2747 ],
2748 "oldest_remote_monotonic_timestamps": [
2749 9223372036854775807,
2750 9223372036854775807,
2751 9223372036854775807
2752 ],
2753 "oldest_local_monotonic_timestamps": [
2754 9223372036854775807,
2755 9223372036854775807,
2756 9223372036854775807
2757 ],
2758 "oldest_remote_unreliable_monotonic_timestamps": [
2759 9223372036854775807,
2760 100000,
2761 9223372036854775807
2762 ],
2763 "oldest_local_unreliable_monotonic_timestamps": [
2764 9223372036854775807,
2765 100000,
2766 9223372036854775807
2767 ]
2768})")),
2769 part2_(MakeHeader(config_, R"({
2770 /* 100ms */
2771 "max_out_of_order_duration": 100000000,
2772 "node": {
2773 "name": "pi1"
2774 },
2775 "logger_node": {
2776 "name": "pi1"
2777 },
2778 "monotonic_start_time": 1000000,
2779 "realtime_start_time": 1000000000000,
2780 "logger_monotonic_start_time": 1000000,
2781 "logger_realtime_start_time": 1000000000000,
2782 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2783 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2784 "parts_index": 2,
2785 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2786 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2787 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2788 "boot_uuids": [
2789 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2790 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2791 ""
2792 ],
2793 "oldest_remote_monotonic_timestamps": [
2794 9223372036854775807,
2795 9223372036854775807,
2796 9223372036854775807
2797 ],
2798 "oldest_local_monotonic_timestamps": [
2799 9223372036854775807,
2800 9223372036854775807,
2801 9223372036854775807
2802 ],
2803 "oldest_remote_unreliable_monotonic_timestamps": [
2804 9223372036854775807,
2805 200000,
2806 9223372036854775807
2807 ],
2808 "oldest_local_unreliable_monotonic_timestamps": [
2809 9223372036854775807,
2810 200000,
2811 9223372036854775807
2812 ]
2813})")),
2814 part3_(MakeHeader(config_, R"({
2815 /* 100ms */
2816 "max_out_of_order_duration": 100000000,
2817 "node": {
2818 "name": "pi1"
2819 },
2820 "logger_node": {
2821 "name": "pi1"
2822 },
2823 "monotonic_start_time": 1000000,
2824 "realtime_start_time": 1000000000000,
2825 "logger_monotonic_start_time": 1000000,
2826 "logger_realtime_start_time": 1000000000000,
2827 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2828 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2829 "parts_index": 3,
2830 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2831 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2832 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2833 "boot_uuids": [
2834 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2835 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2836 ""
2837 ],
2838 "oldest_remote_monotonic_timestamps": [
2839 9223372036854775807,
2840 9223372036854775807,
2841 9223372036854775807
2842 ],
2843 "oldest_local_monotonic_timestamps": [
2844 9223372036854775807,
2845 9223372036854775807,
2846 9223372036854775807
2847 ],
2848 "oldest_remote_unreliable_monotonic_timestamps": [
2849 9223372036854775807,
2850 300000,
2851 9223372036854775807
2852 ],
2853 "oldest_local_unreliable_monotonic_timestamps": [
2854 9223372036854775807,
2855 300000,
2856 9223372036854775807
2857 ]
2858})")) {}
2859
2860 protected:
2861 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part0_;
2862 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part1_;
2863 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part2_;
2864 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part3_;
2865};
2866
2867// Tests that if 2 computers go back and forth trying to be the same node, we
2868// die in sorting instead of failing to estimate time.
2869TEST_F(SortingDeathTest, FightingNodes) {
2870 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002871 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh44c61472021-11-22 21:04:10 -08002872 writer0.QueueSpan(part0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002873 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh44c61472021-11-22 21:04:10 -08002874 writer1.QueueSpan(part1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002875 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuh44c61472021-11-22 21:04:10 -08002876 writer2.QueueSpan(part2_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002877 TestDetachedBufferWriter writer3(logfile3_);
Austin Schuh44c61472021-11-22 21:04:10 -08002878 writer3.QueueSpan(part3_.span());
2879 }
2880
2881 EXPECT_DEATH(
2882 {
2883 const std::vector<LogFile> parts =
2884 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2885 },
Austin Schuh22cf7862022-09-19 19:09:42 -07002886 "found overlapping boots on");
Austin Schuh44c61472021-11-22 21:04:10 -08002887}
2888
Brian Smarttea913d42021-12-10 15:02:38 -08002889// Tests that we MessageReader blows up on a bad message.
2890TEST(MessageReaderConfirmCrash, ReadWrite) {
2891 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
2892 unlink(logfile.c_str());
2893
2894 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
2895 JsonToSizedFlatbuffer<LogFileHeader>(
2896 R"({ "max_out_of_order_duration": 100000000 })");
2897 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
2898 JsonToSizedFlatbuffer<MessageHeader>(
2899 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
2900 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
2901 JsonToSizedFlatbuffer<MessageHeader>(
2902 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
2903 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m4 =
2904 JsonToSizedFlatbuffer<MessageHeader>(
2905 R"({ "channel_index": 0, "monotonic_sent_time": 4 })");
2906
2907 // Starts out like a proper flat buffer header, but it breaks down ...
2908 std::vector<uint8_t> garbage{8, 0, 0, 0, 16, 0, 0, 0, 4, 0, 0, 0};
2909 absl::Span<uint8_t> m3_span(garbage);
2910
2911 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002912 TestDetachedBufferWriter writer(logfile);
Brian Smarttea913d42021-12-10 15:02:38 -08002913 writer.QueueSpan(config.span());
2914 writer.QueueSpan(m1.span());
2915 writer.QueueSpan(m2.span());
2916 writer.QueueSpan(m3_span);
2917 writer.QueueSpan(m4.span()); // This message is "hidden"
2918 }
2919
2920 {
2921 MessageReader reader(logfile);
2922
2923 EXPECT_EQ(reader.filename(), logfile);
2924
2925 EXPECT_EQ(
2926 reader.max_out_of_order_duration(),
2927 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2928 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2929 EXPECT_TRUE(reader.ReadMessage());
2930 EXPECT_EQ(reader.newest_timestamp(),
2931 monotonic_clock::time_point(chrono::nanoseconds(1)));
2932 EXPECT_TRUE(reader.ReadMessage());
2933 EXPECT_EQ(reader.newest_timestamp(),
2934 monotonic_clock::time_point(chrono::nanoseconds(2)));
2935 // Confirm default crashing behavior
2936 EXPECT_DEATH(reader.ReadMessage(), "Corrupted message at offset");
2937 }
2938
2939 {
2940 gflags::FlagSaver fs;
2941
2942 MessageReader reader(logfile);
2943 reader.set_crash_on_corrupt_message_flag(false);
2944
2945 EXPECT_EQ(reader.filename(), logfile);
2946
2947 EXPECT_EQ(
2948 reader.max_out_of_order_duration(),
2949 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2950 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2951 EXPECT_TRUE(reader.ReadMessage());
2952 EXPECT_EQ(reader.newest_timestamp(),
2953 monotonic_clock::time_point(chrono::nanoseconds(1)));
2954 EXPECT_TRUE(reader.ReadMessage());
2955 EXPECT_EQ(reader.newest_timestamp(),
2956 monotonic_clock::time_point(chrono::nanoseconds(2)));
2957 // Confirm avoiding the corrupted message crash, stopping instead.
2958 EXPECT_FALSE(reader.ReadMessage());
2959 }
2960
2961 {
2962 gflags::FlagSaver fs;
2963
2964 MessageReader reader(logfile);
2965 reader.set_crash_on_corrupt_message_flag(false);
2966 reader.set_ignore_corrupt_messages_flag(true);
2967
2968 EXPECT_EQ(reader.filename(), logfile);
2969
2970 EXPECT_EQ(
2971 reader.max_out_of_order_duration(),
2972 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2973 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2974 EXPECT_TRUE(reader.ReadMessage());
2975 EXPECT_EQ(reader.newest_timestamp(),
2976 monotonic_clock::time_point(chrono::nanoseconds(1)));
2977 EXPECT_TRUE(reader.ReadMessage());
2978 EXPECT_EQ(reader.newest_timestamp(),
2979 monotonic_clock::time_point(chrono::nanoseconds(2)));
2980 // Confirm skipping of the corrupted message to read the hidden one.
2981 EXPECT_TRUE(reader.ReadMessage());
2982 EXPECT_EQ(reader.newest_timestamp(),
2983 monotonic_clock::time_point(chrono::nanoseconds(4)));
2984 EXPECT_FALSE(reader.ReadMessage());
2985 }
2986}
2987
Austin Schuhfa30c352022-10-16 11:12:02 -07002988class InlinePackMessage : public ::testing::Test {
2989 protected:
2990 aos::Context RandomContext() {
2991 data_ = RandomData();
2992 std::uniform_int_distribution<uint32_t> uint32_distribution(
2993 std::numeric_limits<uint32_t>::min(),
2994 std::numeric_limits<uint32_t>::max());
2995
2996 std::uniform_int_distribution<int64_t> time_distribution(
2997 std::numeric_limits<int64_t>::min(),
2998 std::numeric_limits<int64_t>::max());
2999
3000 aos::Context context;
3001 context.monotonic_event_time =
3002 aos::monotonic_clock::epoch() +
3003 chrono::nanoseconds(time_distribution(random_number_generator_));
3004 context.realtime_event_time =
3005 aos::realtime_clock::epoch() +
3006 chrono::nanoseconds(time_distribution(random_number_generator_));
3007
3008 context.monotonic_remote_time =
3009 aos::monotonic_clock::epoch() +
3010 chrono::nanoseconds(time_distribution(random_number_generator_));
3011 context.realtime_remote_time =
3012 aos::realtime_clock::epoch() +
3013 chrono::nanoseconds(time_distribution(random_number_generator_));
3014
3015 context.queue_index = uint32_distribution(random_number_generator_);
3016 context.remote_queue_index = uint32_distribution(random_number_generator_);
3017 context.size = data_.size();
3018 context.data = data_.data();
3019 return context;
3020 }
3021
Austin Schuhf2d0e682022-10-16 14:20:58 -07003022 aos::monotonic_clock::time_point RandomMonotonic() {
3023 std::uniform_int_distribution<int64_t> time_distribution(
3024 0, std::numeric_limits<int64_t>::max());
3025 return aos::monotonic_clock::epoch() +
3026 chrono::nanoseconds(time_distribution(random_number_generator_));
3027 }
3028
3029 aos::SizePrefixedFlatbufferDetachedBuffer<message_bridge::RemoteMessage>
3030 RandomRemoteMessage() {
3031 std::uniform_int_distribution<uint8_t> uint8_distribution(
3032 std::numeric_limits<uint8_t>::min(),
3033 std::numeric_limits<uint8_t>::max());
3034
3035 std::uniform_int_distribution<int64_t> time_distribution(
3036 std::numeric_limits<int64_t>::min(),
3037 std::numeric_limits<int64_t>::max());
3038
3039 flatbuffers::FlatBufferBuilder fbb;
3040 message_bridge::RemoteMessage::Builder builder(fbb);
3041 builder.add_queue_index(uint8_distribution(random_number_generator_));
3042
3043 builder.add_monotonic_sent_time(
3044 time_distribution(random_number_generator_));
3045 builder.add_realtime_sent_time(time_distribution(random_number_generator_));
3046 builder.add_monotonic_remote_time(
3047 time_distribution(random_number_generator_));
3048 builder.add_realtime_remote_time(
3049 time_distribution(random_number_generator_));
3050
3051 builder.add_remote_queue_index(
3052 uint8_distribution(random_number_generator_));
3053
3054 fbb.FinishSizePrefixed(builder.Finish());
3055 return fbb.Release();
3056 }
3057
Austin Schuhfa30c352022-10-16 11:12:02 -07003058 std::vector<uint8_t> RandomData() {
3059 std::vector<uint8_t> result;
3060 std::uniform_int_distribution<int> length_distribution(1, 32);
3061 std::uniform_int_distribution<uint8_t> data_distribution(
3062 std::numeric_limits<uint8_t>::min(),
3063 std::numeric_limits<uint8_t>::max());
3064
3065 const size_t length = length_distribution(random_number_generator_);
3066
3067 result.reserve(length);
3068 for (size_t i = 0; i < length; ++i) {
3069 result.emplace_back(data_distribution(random_number_generator_));
3070 }
3071 return result;
3072 }
3073
3074 std::mt19937 random_number_generator_{
3075 std::mt19937(::aos::testing::RandomSeed())};
3076
3077 std::vector<uint8_t> data_;
3078};
3079
3080// Uses the binary schema to annotate a provided flatbuffer. Returns the
3081// annotated flatbuffer.
3082std::string AnnotateBinaries(
3083 const aos::NonSizePrefixedFlatbuffer<reflection::Schema> &schema,
3084 const std::string &schema_filename,
3085 flatbuffers::span<uint8_t> binary_data) {
3086 flatbuffers::BinaryAnnotator binary_annotator(
3087 schema.span().data(), schema.span().size(), binary_data.data(),
3088 binary_data.size());
3089
3090 auto annotations = binary_annotator.Annotate();
3091
3092 flatbuffers::AnnotatedBinaryTextGenerator text_generator(
3093 flatbuffers::AnnotatedBinaryTextGenerator::Options{}, annotations,
3094 binary_data.data(), binary_data.size());
3095
3096 text_generator.Generate(aos::testing::TestTmpDir() + "/foo.bfbs",
3097 schema_filename);
3098
3099 return aos::util::ReadFileToStringOrDie(aos::testing::TestTmpDir() +
3100 "/foo.afb");
3101}
3102
3103// Tests that all variations of PackMessage are equivalent to the inline
3104// PackMessage used to avoid allocations.
3105TEST_F(InlinePackMessage, Equivilent) {
3106 std::uniform_int_distribution<uint32_t> uint32_distribution(
3107 std::numeric_limits<uint32_t>::min(),
3108 std::numeric_limits<uint32_t>::max());
3109 aos::FlatbufferVector<reflection::Schema> schema =
3110 FileToFlatbuffer<reflection::Schema>(
3111 ArtifactPath("aos/events/logging/logger.bfbs"));
3112
3113 for (const LogType type :
3114 {LogType::kLogMessage, LogType::kLogDeliveryTimeOnly,
3115 LogType::kLogMessageAndDeliveryTime, LogType::kLogRemoteMessage}) {
3116 for (int i = 0; i < 100; ++i) {
3117 aos::Context context = RandomContext();
3118 const uint32_t channel_index =
3119 uint32_distribution(random_number_generator_);
3120
3121 flatbuffers::FlatBufferBuilder fbb;
3122 fbb.ForceDefaults(true);
3123 fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index, type));
3124
3125 VLOG(1) << absl::BytesToHexString(std::string_view(
3126 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3127 fbb.GetBufferSpan().size()));
3128
3129 // Make sure that both the builder and inline method agree on sizes.
Austin Schuh48d10d62022-10-16 22:19:23 -07003130 ASSERT_EQ(fbb.GetSize(), PackMessageSize(type, context.size))
Austin Schuhfa30c352022-10-16 11:12:02 -07003131 << "log type " << static_cast<int>(type);
3132
3133 // Initialize the buffer to something nonzero to make sure all the padding
3134 // bytes are set to 0.
Austin Schuh48d10d62022-10-16 22:19:23 -07003135 std::vector<uint8_t> repacked_message(PackMessageSize(type, context.size),
3136 67);
Austin Schuhfa30c352022-10-16 11:12:02 -07003137
3138 // And verify packing inline works as expected.
3139 EXPECT_EQ(repacked_message.size(),
3140 PackMessageInline(repacked_message.data(), context,
3141 channel_index, type));
3142 EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
3143 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3144 fbb.GetBufferSpan().size()))
3145 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
3146 fbb.GetBufferSpan());
3147 }
3148 }
3149}
3150
Austin Schuhf2d0e682022-10-16 14:20:58 -07003151// Tests that all variations of PackMessage are equivilent to the inline
3152// PackMessage used to avoid allocations.
3153TEST_F(InlinePackMessage, RemoteEquivilent) {
3154 aos::FlatbufferVector<reflection::Schema> schema =
3155 FileToFlatbuffer<reflection::Schema>(
3156 ArtifactPath("aos/events/logging/logger.bfbs"));
3157 std::uniform_int_distribution<uint8_t> uint8_distribution(
3158 std::numeric_limits<uint8_t>::min(), std::numeric_limits<uint8_t>::max());
3159
3160 for (int i = 0; i < 100; ++i) {
3161 aos::SizePrefixedFlatbufferDetachedBuffer<RemoteMessage> random_msg =
3162 RandomRemoteMessage();
3163 const size_t channel_index = uint8_distribution(random_number_generator_);
3164 const monotonic_clock::time_point monotonic_timestamp_time =
3165 RandomMonotonic();
3166
3167 flatbuffers::FlatBufferBuilder fbb;
3168 fbb.ForceDefaults(true);
3169 fbb.FinishSizePrefixed(PackRemoteMessage(
3170 &fbb, &random_msg.message(), channel_index, monotonic_timestamp_time));
3171
3172 VLOG(1) << absl::BytesToHexString(std::string_view(
3173 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3174 fbb.GetBufferSpan().size()));
3175
3176 // Make sure that both the builder and inline method agree on sizes.
3177 ASSERT_EQ(fbb.GetSize(), PackRemoteMessageSize());
3178
3179 // Initialize the buffer to something nonzer to make sure all the padding
3180 // bytes are set to 0.
3181 std::vector<uint8_t> repacked_message(PackRemoteMessageSize(), 67);
3182
3183 // And verify packing inline works as expected.
3184 EXPECT_EQ(
3185 repacked_message.size(),
3186 PackRemoteMessageInline(repacked_message.data(), &random_msg.message(),
3187 channel_index, monotonic_timestamp_time));
3188 EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
3189 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3190 fbb.GetBufferSpan().size()))
3191 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
3192 fbb.GetBufferSpan());
3193 }
3194}
Austin Schuhfa30c352022-10-16 11:12:02 -07003195
Austin Schuhc243b422020-10-11 15:35:08 -07003196} // namespace testing
3197} // namespace logger
3198} // namespace aos