blob: f4071e49e3ea0de66b28f72358acbfc7ec970405 [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
Alexei Strots01395492023-03-20 13:59:56 -07004#include <filesystem>
Austin Schuhfa30c352022-10-16 11:12:02 -07005#include <random>
Austin Schuhe243aaf2020-10-11 15:46:02 -07006#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07007
Austin Schuhfa30c352022-10-16 11:12:02 -07008#include "absl/strings/escaping.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07009#include "external/com_github_google_flatbuffers/src/annotated_binary_text_gen.h"
10#include "external/com_github_google_flatbuffers/src/binary_annotator.h"
11#include "flatbuffers/reflection_generated.h"
12#include "gflags/gflags.h"
13#include "gtest/gtest.h"
14
Austin Schuhc41603c2020-10-11 16:17:37 -070015#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -070016#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -080017#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070018#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070019#include "aos/json_to_flatbuffer.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070020#include "aos/testing/path.h"
21#include "aos/testing/random_seed.h"
Austin Schuhc243b422020-10-11 15:35:08 -070022#include "aos/testing/tmpdir.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070023#include "aos/util/file.h"
Austin Schuhc243b422020-10-11 15:35:08 -070024
25namespace aos {
26namespace logger {
27namespace testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070028namespace chrono = std::chrono;
Austin Schuhf2d0e682022-10-16 14:20:58 -070029using aos::message_bridge::RemoteMessage;
Eric Schmiedebergb38477e2022-12-02 16:08:04 -070030using aos::testing::ArtifactPath;
Austin Schuhc243b422020-10-11 15:35:08 -070031
Austin Schuhd863e6e2022-10-16 15:44:50 -070032// Adapter class to make it easy to test DetachedBufferWriter without adding
33// test only boilerplate to DetachedBufferWriter.
Alexei Strots15c22b12023-04-04 16:27:17 -070034class TestDetachedBufferWriter : public FileBackend,
35 public DetachedBufferWriter {
Austin Schuhd863e6e2022-10-16 15:44:50 -070036 public:
Austin Schuh48d10d62022-10-16 22:19:23 -070037 // Pick a max size that is rather conservative.
38 static constexpr size_t kMaxMessageSize = 128 * 1024;
Austin Schuhd863e6e2022-10-16 15:44:50 -070039 TestDetachedBufferWriter(std::string_view filename)
colleen61276dc2023-06-01 09:23:29 -070040 : FileBackend("/", false),
Alexei Strots15c22b12023-04-04 16:27:17 -070041 DetachedBufferWriter(FileBackend::RequestFile(filename),
42 std::make_unique<DummyEncoder>(kMaxMessageSize)) {}
Austin Schuhd863e6e2022-10-16 15:44:50 -070043 void WriteSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
44 QueueSpan(absl::Span<const uint8_t>(buffer.data(), buffer.size()));
45 }
Austin Schuh7ef11a42023-02-04 17:15:12 -080046 void QueueSpan(absl::Span<const uint8_t> buffer) {
47 DataEncoder::SpanCopier coppier(buffer);
48 CopyMessage(&coppier, monotonic_clock::now());
49 }
Austin Schuhd863e6e2022-10-16 15:44:50 -070050};
51
Austin Schuhe243aaf2020-10-11 15:46:02 -070052// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070053template <typename T>
54SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
55 const std::string_view data) {
56 flatbuffers::FlatBufferBuilder fbb;
57 fbb.ForceDefaults(true);
58 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
59 return fbb.Release();
60}
61
Austin Schuhe243aaf2020-10-11 15:46:02 -070062// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070063TEST(SpanReaderTest, ReadWrite) {
64 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
65 unlink(logfile.c_str());
66
67 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080068 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070069 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080070 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070071
72 {
Austin Schuhd863e6e2022-10-16 15:44:50 -070073 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -080074 writer.QueueSpan(m1.span());
75 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070076 }
77
78 SpanReader reader(logfile);
79
80 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhcf5f6442021-07-06 10:43:28 -070081 EXPECT_EQ(reader.PeekMessage(), m1.span());
82 EXPECT_EQ(reader.PeekMessage(), m1.span());
Austin Schuhadd6eb32020-11-09 21:24:26 -080083 EXPECT_EQ(reader.ReadMessage(), m1.span());
84 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhcf5f6442021-07-06 10:43:28 -070085 EXPECT_EQ(reader.PeekMessage(), absl::Span<const uint8_t>());
Austin Schuhc243b422020-10-11 15:35:08 -070086 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
87}
88
Austin Schuhe243aaf2020-10-11 15:46:02 -070089// Tests that we can actually parse the resulting messages at a basic level
90// through MessageReader.
91TEST(MessageReaderTest, ReadWrite) {
92 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
93 unlink(logfile.c_str());
94
95 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
96 JsonToSizedFlatbuffer<LogFileHeader>(
97 R"({ "max_out_of_order_duration": 100000000 })");
98 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
99 JsonToSizedFlatbuffer<MessageHeader>(
100 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
101 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
102 JsonToSizedFlatbuffer<MessageHeader>(
103 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
104
105 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700106 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800107 writer.QueueSpan(config.span());
108 writer.QueueSpan(m1.span());
109 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -0700110 }
111
112 MessageReader reader(logfile);
113
114 EXPECT_EQ(reader.filename(), logfile);
115
116 EXPECT_EQ(
117 reader.max_out_of_order_duration(),
118 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
119 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
120 EXPECT_TRUE(reader.ReadMessage());
121 EXPECT_EQ(reader.newest_timestamp(),
122 monotonic_clock::time_point(chrono::nanoseconds(1)));
123 EXPECT_TRUE(reader.ReadMessage());
124 EXPECT_EQ(reader.newest_timestamp(),
125 monotonic_clock::time_point(chrono::nanoseconds(2)));
126 EXPECT_FALSE(reader.ReadMessage());
127}
128
Austin Schuh32f68492020-11-08 21:45:51 -0800129// Tests that we explode when messages are too far out of order.
130TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
131 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
132 unlink(logfile0.c_str());
133
134 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
135 JsonToSizedFlatbuffer<LogFileHeader>(
136 R"({
137 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800138 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800139 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
140 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
141 "parts_index": 0
142})");
143
144 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
145 JsonToSizedFlatbuffer<MessageHeader>(
146 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
147 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
148 JsonToSizedFlatbuffer<MessageHeader>(
149 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
150 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
151 JsonToSizedFlatbuffer<MessageHeader>(
152 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
153
154 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700155 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800156 writer.QueueSpan(config0.span());
157 writer.QueueSpan(m1.span());
158 writer.QueueSpan(m2.span());
159 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800160 }
Alexei Strots01395492023-03-20 13:59:56 -0700161 ASSERT_TRUE(std::filesystem::exists(logfile0)) << logfile0;
Austin Schuh32f68492020-11-08 21:45:51 -0800162
163 const std::vector<LogFile> parts = SortParts({logfile0});
164
165 PartsMessageReader reader(parts[0].parts[0]);
166
167 EXPECT_TRUE(reader.ReadMessage());
168 EXPECT_TRUE(reader.ReadMessage());
169 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
170}
171
Austin Schuhc41603c2020-10-11 16:17:37 -0700172// Tests that we can transparently re-assemble part files with a
173// PartsMessageReader.
174TEST(PartsMessageReaderTest, ReadWrite) {
175 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
176 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
177 unlink(logfile0.c_str());
178 unlink(logfile1.c_str());
179
180 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
181 JsonToSizedFlatbuffer<LogFileHeader>(
182 R"({
183 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800184 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700185 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
186 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
187 "parts_index": 0
188})");
189 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
190 JsonToSizedFlatbuffer<LogFileHeader>(
191 R"({
192 "max_out_of_order_duration": 200000000,
193 "monotonic_start_time": 0,
194 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800195 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700196 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
197 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
198 "parts_index": 1
199})");
200
201 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
202 JsonToSizedFlatbuffer<MessageHeader>(
203 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
204 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
205 JsonToSizedFlatbuffer<MessageHeader>(
206 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
207
208 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700209 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800210 writer.QueueSpan(config0.span());
211 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700212 }
213 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700214 TestDetachedBufferWriter writer(logfile1);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800215 writer.QueueSpan(config1.span());
216 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700217 }
218
219 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
220
221 PartsMessageReader reader(parts[0].parts[0]);
222
223 EXPECT_EQ(reader.filename(), logfile0);
224
225 // Confirm that the timestamps track, and the filename also updates.
226 // Read the first message.
227 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
228 EXPECT_EQ(
229 reader.max_out_of_order_duration(),
230 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
231 EXPECT_TRUE(reader.ReadMessage());
232 EXPECT_EQ(reader.filename(), logfile0);
233 EXPECT_EQ(reader.newest_timestamp(),
234 monotonic_clock::time_point(chrono::nanoseconds(1)));
235 EXPECT_EQ(
236 reader.max_out_of_order_duration(),
237 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
238
239 // Read the second message.
240 EXPECT_TRUE(reader.ReadMessage());
241 EXPECT_EQ(reader.filename(), logfile1);
242 EXPECT_EQ(reader.newest_timestamp(),
243 monotonic_clock::time_point(chrono::nanoseconds(2)));
244 EXPECT_EQ(
245 reader.max_out_of_order_duration(),
246 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
247
248 // And then confirm that reading again returns no message.
249 EXPECT_FALSE(reader.ReadMessage());
250 EXPECT_EQ(reader.filename(), logfile1);
251 EXPECT_EQ(
252 reader.max_out_of_order_duration(),
253 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800254 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Austin Schuhc41603c2020-10-11 16:17:37 -0700255}
Austin Schuh32f68492020-11-08 21:45:51 -0800256
Austin Schuh1be0ce42020-11-29 22:43:26 -0800257// Tests that Message's operator < works as expected.
258TEST(MessageTest, Sorting) {
259 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
260
261 Message m1{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700262 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700263 .timestamp =
264 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
Austin Schuh48507722021-07-17 17:29:24 -0700265 .monotonic_remote_boot = 0xffffff,
266 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700267 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800268 Message m2{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700269 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700270 .timestamp =
271 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
Austin Schuh48507722021-07-17 17:29:24 -0700272 .monotonic_remote_boot = 0xffffff,
273 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700274 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800275
276 EXPECT_LT(m1, m2);
277 EXPECT_GE(m2, m1);
278
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700279 m1.timestamp.time = e;
280 m2.timestamp.time = e;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800281
282 m1.channel_index = 1;
283 m2.channel_index = 2;
284
285 EXPECT_LT(m1, m2);
286 EXPECT_GE(m2, m1);
287
288 m1.channel_index = 0;
289 m2.channel_index = 0;
Austin Schuh58646e22021-08-23 23:51:46 -0700290 m1.queue_index.index = 0u;
291 m2.queue_index.index = 1u;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800292
293 EXPECT_LT(m1, m2);
294 EXPECT_GE(m2, m1);
295}
296
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800297aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
298 const aos::FlatbufferDetachedBuffer<Configuration> &config,
299 const std::string_view json) {
300 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh1034a832021-03-31 21:53:26 -0700301 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800302 flatbuffers::Offset<Configuration> config_offset =
303 aos::CopyFlatBuffer(config, &fbb);
304 LogFileHeader::Builder header_builder(fbb);
305 header_builder.add_configuration(config_offset);
306 fbb.Finish(header_builder.Finish());
307 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
308
309 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
310 JsonToFlatbuffer<LogFileHeader>(json));
311 CHECK(header_updates.Verify());
312 flatbuffers::FlatBufferBuilder fbb2;
Austin Schuh1034a832021-03-31 21:53:26 -0700313 fbb2.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800314 fbb2.FinishSizePrefixed(
315 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
316 return fbb2.Release();
317}
318
319class SortingElementTest : public ::testing::Test {
320 public:
321 SortingElementTest()
322 : config_(JsonToFlatbuffer<Configuration>(
323 R"({
324 "channels": [
325 {
326 "name": "/a",
327 "type": "aos.logger.testing.TestMessage",
328 "source_node": "pi1",
329 "destination_nodes": [
330 {
331 "name": "pi2"
332 },
333 {
334 "name": "pi3"
335 }
336 ]
337 },
338 {
339 "name": "/b",
340 "type": "aos.logger.testing.TestMessage",
341 "source_node": "pi1"
342 },
343 {
344 "name": "/c",
345 "type": "aos.logger.testing.TestMessage",
346 "source_node": "pi1"
Austin Schuh48507722021-07-17 17:29:24 -0700347 },
348 {
349 "name": "/d",
350 "type": "aos.logger.testing.TestMessage",
351 "source_node": "pi2",
352 "destination_nodes": [
353 {
354 "name": "pi1"
355 }
356 ]
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800357 }
358 ],
359 "nodes": [
360 {
361 "name": "pi1"
362 },
363 {
364 "name": "pi2"
365 },
366 {
367 "name": "pi3"
368 }
369 ]
370}
371)")),
372 config0_(MakeHeader(config_, R"({
373 /* 100ms */
374 "max_out_of_order_duration": 100000000,
375 "node": {
376 "name": "pi1"
377 },
378 "logger_node": {
379 "name": "pi1"
380 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800381 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800382 "realtime_start_time": 1000000000000,
383 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700384 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
385 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
386 "boot_uuids": [
387 "1d782c63-b3c7-466e-bea9-a01308b43333",
388 "",
389 ""
390 ],
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800391 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
392 "parts_index": 0
393})")),
394 config1_(MakeHeader(config_,
395 R"({
396 /* 100ms */
397 "max_out_of_order_duration": 100000000,
398 "node": {
399 "name": "pi1"
400 },
401 "logger_node": {
402 "name": "pi1"
403 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800404 "monotonic_start_time": 1000000,
405 "realtime_start_time": 1000000000000,
406 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700407 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
408 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
409 "boot_uuids": [
410 "1d782c63-b3c7-466e-bea9-a01308b43333",
411 "",
412 ""
413 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800414 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
415 "parts_index": 0
416})")),
417 config2_(MakeHeader(config_,
418 R"({
419 /* 100ms */
420 "max_out_of_order_duration": 100000000,
421 "node": {
422 "name": "pi2"
423 },
424 "logger_node": {
425 "name": "pi2"
426 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800427 "monotonic_start_time": 0,
428 "realtime_start_time": 1000000000000,
Austin Schuh48507722021-07-17 17:29:24 -0700429 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
430 "logger_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
431 "boot_uuids": [
432 "",
433 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
434 ""
435 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800436 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
437 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
438 "parts_index": 0
439})")),
440 config3_(MakeHeader(config_,
441 R"({
442 /* 100ms */
443 "max_out_of_order_duration": 100000000,
444 "node": {
445 "name": "pi1"
446 },
447 "logger_node": {
448 "name": "pi1"
449 },
450 "monotonic_start_time": 2000000,
451 "realtime_start_time": 1000000000,
452 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
Austin Schuh48507722021-07-17 17:29:24 -0700453 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
454 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
455 "boot_uuids": [
456 "1d782c63-b3c7-466e-bea9-a01308b43333",
457 "",
458 ""
459 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800460 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800461 "parts_index": 0
Austin Schuh8bf1e632021-01-02 22:41:04 -0800462})")),
463 config4_(MakeHeader(config_,
464 R"({
465 /* 100ms */
466 "max_out_of_order_duration": 100000000,
467 "node": {
468 "name": "pi2"
469 },
470 "logger_node": {
471 "name": "pi1"
472 },
473 "monotonic_start_time": 2000000,
474 "realtime_start_time": 1000000000,
475 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
476 "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
Austin Schuh48507722021-07-17 17:29:24 -0700477 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
478 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
479 "boot_uuids": [
480 "1d782c63-b3c7-466e-bea9-a01308b43333",
481 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
482 ""
483 ],
Austin Schuh8bf1e632021-01-02 22:41:04 -0800484 "parts_index": 0
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800485})")) {
486 unlink(logfile0_.c_str());
487 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800488 unlink(logfile2_.c_str());
Austin Schuh48507722021-07-17 17:29:24 -0700489 unlink(logfile3_.c_str());
Austin Schuh921b2562021-07-31 19:37:03 -0700490 queue_index_.resize(config_.message().channels()->size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800491 }
492
493 protected:
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800494 flatbuffers::DetachedBuffer MakeLogMessage(
495 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
496 int value) {
497 flatbuffers::FlatBufferBuilder message_fbb;
498 message_fbb.ForceDefaults(true);
499 TestMessage::Builder test_message_builder(message_fbb);
500 test_message_builder.add_value(value);
501 message_fbb.Finish(test_message_builder.Finish());
502
503 aos::Context context;
504 context.monotonic_event_time = monotonic_now;
505 context.realtime_event_time = aos::realtime_clock::epoch() +
506 chrono::seconds(1000) +
507 monotonic_now.time_since_epoch();
Austin Schuh921b2562021-07-31 19:37:03 -0700508 CHECK_LT(static_cast<size_t>(channel_index), queue_index_.size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800509 context.queue_index = queue_index_[channel_index];
510 context.size = message_fbb.GetSize();
511 context.data = message_fbb.GetBufferPointer();
512
513 ++queue_index_[channel_index];
514
515 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh53e3a812021-11-03 16:57:23 -0700516 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800517 fbb.FinishSizePrefixed(
518 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
519
520 return fbb.Release();
521 }
522
523 flatbuffers::DetachedBuffer MakeTimestampMessage(
524 const aos::monotonic_clock::time_point sender_monotonic_now,
Austin Schuh8bf1e632021-01-02 22:41:04 -0800525 int channel_index, chrono::nanoseconds receiver_monotonic_offset,
526 monotonic_clock::time_point monotonic_timestamp_time =
527 monotonic_clock::min_time) {
528 const monotonic_clock::time_point monotonic_sent_time =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800529 sender_monotonic_now + receiver_monotonic_offset;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800530
531 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800532 fbb.ForceDefaults(true);
533
534 logger::MessageHeader::Builder message_header_builder(fbb);
535
536 message_header_builder.add_channel_index(channel_index);
537
538 message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
539 100);
540 message_header_builder.add_monotonic_sent_time(
541 monotonic_sent_time.time_since_epoch().count());
542 message_header_builder.add_realtime_sent_time(
543 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
544 monotonic_sent_time.time_since_epoch())
545 .time_since_epoch()
546 .count());
547
548 message_header_builder.add_monotonic_remote_time(
549 sender_monotonic_now.time_since_epoch().count());
550 message_header_builder.add_realtime_remote_time(
551 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
552 sender_monotonic_now.time_since_epoch())
553 .time_since_epoch()
554 .count());
555 message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
556 1);
557
558 if (monotonic_timestamp_time != monotonic_clock::min_time) {
559 message_header_builder.add_monotonic_timestamp_time(
560 monotonic_timestamp_time.time_since_epoch().count());
561 }
562
563 fbb.FinishSizePrefixed(message_header_builder.Finish());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800564 LOG(INFO) << aos::FlatbufferToJson(
565 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
566 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
567
568 return fbb.Release();
569 }
570
571 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
572 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800573 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh48507722021-07-17 17:29:24 -0700574 const std::string logfile3_ = aos::testing::TestTmpDir() + "/log3.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800575
576 const aos::FlatbufferDetachedBuffer<Configuration> config_;
577 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
578 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800579 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
580 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800581 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800582
583 std::vector<uint32_t> queue_index_;
584};
585
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700586using MessageSorterTest = SortingElementTest;
587using MessageSorterDeathTest = MessageSorterTest;
588using PartsMergerTest = SortingElementTest;
Austin Schuhd2f96102020-12-01 20:27:29 -0800589using TimestampMapperTest = SortingElementTest;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800590
591// Tests that we can pull messages out of a log sorted in order.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700592TEST_F(MessageSorterTest, Pull) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800593 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
594 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700595 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800596 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700597 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800598 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700599 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800600 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700601 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800602 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700603 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800604 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
605 }
606
607 const std::vector<LogFile> parts = SortParts({logfile0_});
608
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700609 MessageSorter message_sorter(parts[0].parts[0]);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800610
611 // Confirm we aren't sorted until any time until the message is popped.
612 // Peeking shouldn't change the sorted until time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700613 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800614
615 std::deque<Message> output;
616
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700617 ASSERT_TRUE(message_sorter.Front() != nullptr);
618 output.emplace_back(std::move(*message_sorter.Front()));
619 message_sorter.PopFront();
620 EXPECT_EQ(message_sorter.sorted_until(), e + chrono::milliseconds(1900));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800621
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700622 ASSERT_TRUE(message_sorter.Front() != nullptr);
623 output.emplace_back(std::move(*message_sorter.Front()));
624 message_sorter.PopFront();
625 EXPECT_EQ(message_sorter.sorted_until(), e + chrono::milliseconds(1900));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800626
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700627 ASSERT_TRUE(message_sorter.Front() != nullptr);
628 output.emplace_back(std::move(*message_sorter.Front()));
629 message_sorter.PopFront();
630 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::max_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800631
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700632 ASSERT_TRUE(message_sorter.Front() != nullptr);
633 output.emplace_back(std::move(*message_sorter.Front()));
634 message_sorter.PopFront();
635 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::max_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800636
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700637 ASSERT_TRUE(message_sorter.Front() == nullptr);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800638
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700639 EXPECT_EQ(output[0].timestamp.boot, 0);
640 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
641 EXPECT_EQ(output[1].timestamp.boot, 0);
642 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1000));
643 EXPECT_EQ(output[2].timestamp.boot, 0);
644 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1901));
645 EXPECT_EQ(output[3].timestamp.boot, 0);
646 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800647}
648
Austin Schuhb000de62020-12-03 22:00:40 -0800649// Tests that we can pull messages out of a log sorted in order.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700650TEST_F(MessageSorterTest, WayBeforeStart) {
Austin Schuhb000de62020-12-03 22:00:40 -0800651 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
652 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700653 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhb000de62020-12-03 22:00:40 -0800654 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700655 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800656 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700657 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800658 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700659 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800660 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700661 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800662 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700663 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800664 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
665 }
666
667 const std::vector<LogFile> parts = SortParts({logfile0_});
668
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700669 MessageSorter message_sorter(parts[0].parts[0]);
Austin Schuhb000de62020-12-03 22:00:40 -0800670
671 // Confirm we aren't sorted until any time until the message is popped.
672 // Peeking shouldn't change the sorted until time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700673 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
Austin Schuhb000de62020-12-03 22:00:40 -0800674
675 std::deque<Message> output;
676
677 for (monotonic_clock::time_point t :
678 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
679 e + chrono::milliseconds(1900), monotonic_clock::max_time,
680 monotonic_clock::max_time}) {
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700681 ASSERT_TRUE(message_sorter.Front() != nullptr);
682 output.emplace_back(std::move(*message_sorter.Front()));
683 message_sorter.PopFront();
684 EXPECT_EQ(message_sorter.sorted_until(), t);
Austin Schuhb000de62020-12-03 22:00:40 -0800685 }
686
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700687 ASSERT_TRUE(message_sorter.Front() == nullptr);
Austin Schuhb000de62020-12-03 22:00:40 -0800688
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700689 EXPECT_EQ(output[0].timestamp.boot, 0u);
690 EXPECT_EQ(output[0].timestamp.time, e - chrono::milliseconds(1000));
691 EXPECT_EQ(output[1].timestamp.boot, 0u);
692 EXPECT_EQ(output[1].timestamp.time, e - chrono::milliseconds(500));
693 EXPECT_EQ(output[2].timestamp.boot, 0u);
694 EXPECT_EQ(output[2].timestamp.time, e - chrono::milliseconds(10));
695 EXPECT_EQ(output[3].timestamp.boot, 0u);
696 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(1901));
697 EXPECT_EQ(output[4].timestamp.boot, 0u);
698 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuhb000de62020-12-03 22:00:40 -0800699}
700
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800701// Tests that messages too far out of order trigger death.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700702TEST_F(MessageSorterDeathTest, Pull) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800703 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
704 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700705 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800706 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700707 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800708 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700709 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800710 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700711 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800712 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
713 // The following message is too far out of order and will trigger the CHECK.
Austin Schuhd863e6e2022-10-16 15:44:50 -0700714 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800715 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
716 }
717
718 const std::vector<LogFile> parts = SortParts({logfile0_});
719
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700720 MessageSorter message_sorter(parts[0].parts[0]);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800721
722 // Confirm we aren't sorted until any time until the message is popped.
723 // Peeking shouldn't change the sorted until time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700724 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800725 std::deque<Message> output;
726
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700727 ASSERT_TRUE(message_sorter.Front() != nullptr);
728 message_sorter.PopFront();
729 ASSERT_TRUE(message_sorter.Front() != nullptr);
730 ASSERT_TRUE(message_sorter.Front() != nullptr);
731 message_sorter.PopFront();
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800732
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700733 EXPECT_DEATH({ message_sorter.Front(); },
Austin Schuh58646e22021-08-23 23:51:46 -0700734 "Max out of order of 100000000ns exceeded.");
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800735}
736
Austin Schuh8f52ed52020-11-30 23:12:39 -0800737// Tests that we can merge data from 2 separate files, including duplicate data.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700738TEST_F(PartsMergerTest, TwoFileMerger) {
Austin Schuh8f52ed52020-11-30 23:12:39 -0800739 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
740 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700741 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800742 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700743 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800744 writer1.QueueSpan(config1_.span());
745
Austin Schuhd863e6e2022-10-16 15:44:50 -0700746 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800747 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700748 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800749 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
750
Austin Schuhd863e6e2022-10-16 15:44:50 -0700751 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800752 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700753 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800754 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
755
756 // Make a duplicate!
757 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
758 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
759 writer0.QueueSpan(msg.span());
760 writer1.QueueSpan(msg.span());
761
Austin Schuhd863e6e2022-10-16 15:44:50 -0700762 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800763 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
764 }
765
766 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -0700767 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -0800768 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800769
Alexei Strots1f51ac72023-05-15 10:14:54 -0700770 PartsMerger merger("pi1", 0, log_files);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800771
772 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
773
774 std::deque<Message> output;
775
776 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
777 ASSERT_TRUE(merger.Front() != nullptr);
778 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
779
780 output.emplace_back(std::move(*merger.Front()));
781 merger.PopFront();
782 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
783
784 ASSERT_TRUE(merger.Front() != nullptr);
785 output.emplace_back(std::move(*merger.Front()));
786 merger.PopFront();
787 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
788
789 ASSERT_TRUE(merger.Front() != nullptr);
790 output.emplace_back(std::move(*merger.Front()));
791 merger.PopFront();
792 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
793
794 ASSERT_TRUE(merger.Front() != nullptr);
795 output.emplace_back(std::move(*merger.Front()));
796 merger.PopFront();
797 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
798
799 ASSERT_TRUE(merger.Front() != nullptr);
800 output.emplace_back(std::move(*merger.Front()));
801 merger.PopFront();
802 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
803
804 ASSERT_TRUE(merger.Front() != nullptr);
805 output.emplace_back(std::move(*merger.Front()));
806 merger.PopFront();
807 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
808
809 ASSERT_TRUE(merger.Front() == nullptr);
810
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700811 EXPECT_EQ(output[0].timestamp.boot, 0u);
812 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
813 EXPECT_EQ(output[1].timestamp.boot, 0u);
814 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1001));
815 EXPECT_EQ(output[2].timestamp.boot, 0u);
816 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1002));
817 EXPECT_EQ(output[3].timestamp.boot, 0u);
818 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
819 EXPECT_EQ(output[4].timestamp.boot, 0u);
820 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(3000));
821 EXPECT_EQ(output[5].timestamp.boot, 0u);
822 EXPECT_EQ(output[5].timestamp.time, e + chrono::milliseconds(3002));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800823}
824
Austin Schuh8bf1e632021-01-02 22:41:04 -0800825// Tests that we can merge timestamps with various combinations of
826// monotonic_timestamp_time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700827TEST_F(PartsMergerTest, TwoFileTimestampMerger) {
Austin Schuh8bf1e632021-01-02 22:41:04 -0800828 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
829 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700830 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800831 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700832 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800833 writer1.QueueSpan(config1_.span());
834
835 // Neither has it.
836 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700837 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800838 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700839 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800840 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
841
842 // First only has it.
843 MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700844 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800845 e + chrono::milliseconds(1001), 0, chrono::seconds(100),
846 e + chrono::nanoseconds(971)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700847 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800848 e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
849
850 // Second only has it.
851 MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700852 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800853 e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700854 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800855 e + chrono::milliseconds(1002), 0, chrono::seconds(100),
856 e + chrono::nanoseconds(972)));
857
858 // Both have it.
859 MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700860 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800861 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
862 e + chrono::nanoseconds(973)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700863 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800864 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
865 e + chrono::nanoseconds(973)));
866 }
867
868 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -0700869 LogFilesContainer log_files(parts);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800870 ASSERT_EQ(parts.size(), 1u);
871
Alexei Strots1f51ac72023-05-15 10:14:54 -0700872 PartsMerger merger("pi1", 0, log_files);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800873
874 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
875
876 std::deque<Message> output;
877
878 for (int i = 0; i < 4; ++i) {
879 ASSERT_TRUE(merger.Front() != nullptr);
880 output.emplace_back(std::move(*merger.Front()));
881 merger.PopFront();
882 }
883 ASSERT_TRUE(merger.Front() == nullptr);
884
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700885 EXPECT_EQ(output[0].timestamp.boot, 0u);
886 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700887 EXPECT_FALSE(output[0].data->has_monotonic_timestamp_time);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700888
889 EXPECT_EQ(output[1].timestamp.boot, 0u);
890 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700891 EXPECT_TRUE(output[1].data->has_monotonic_timestamp_time);
892 EXPECT_EQ(output[1].data->monotonic_timestamp_time,
893 monotonic_clock::time_point(std::chrono::nanoseconds(971)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700894
895 EXPECT_EQ(output[2].timestamp.boot, 0u);
896 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700897 EXPECT_TRUE(output[2].data->has_monotonic_timestamp_time);
898 EXPECT_EQ(output[2].data->monotonic_timestamp_time,
899 monotonic_clock::time_point(std::chrono::nanoseconds(972)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700900
901 EXPECT_EQ(output[3].timestamp.boot, 0u);
902 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700903 EXPECT_TRUE(output[3].data->has_monotonic_timestamp_time);
904 EXPECT_EQ(output[3].data->monotonic_timestamp_time,
905 monotonic_clock::time_point(std::chrono::nanoseconds(973)));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800906}
907
Austin Schuhd2f96102020-12-01 20:27:29 -0800908// Tests that we can match timestamps on delivered messages.
909TEST_F(TimestampMapperTest, ReadNode0First) {
910 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
911 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700912 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800913 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700914 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800915 writer1.QueueSpan(config2_.span());
916
Austin Schuhd863e6e2022-10-16 15:44:50 -0700917 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800918 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700919 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800920 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
921
Austin Schuhd863e6e2022-10-16 15:44:50 -0700922 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800923 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700924 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800925 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
926
Austin Schuhd863e6e2022-10-16 15:44:50 -0700927 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800928 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700929 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800930 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
931 }
932
933 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -0700934 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -0800935 ASSERT_EQ(parts[0].logger_node, "pi1");
936 ASSERT_EQ(parts[1].logger_node, "pi2");
937
Austin Schuh79b30942021-01-24 22:32:21 -0800938 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -0700939
940 TimestampMapper mapper0("pi1", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -0800941 mapper0.set_timestamp_callback(
942 [&](TimestampedMessage *) { ++mapper0_count; });
943 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -0700944 TimestampMapper mapper1("pi2", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -0800945 mapper1.set_timestamp_callback(
946 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -0800947
948 mapper0.AddPeer(&mapper1);
949 mapper1.AddPeer(&mapper0);
950
951 {
952 std::deque<TimestampedMessage> output0;
953
Austin Schuh79b30942021-01-24 22:32:21 -0800954 EXPECT_EQ(mapper0_count, 0u);
955 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800956 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800957 EXPECT_EQ(mapper0_count, 1u);
958 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800959 output0.emplace_back(std::move(*mapper0.Front()));
960 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700961 EXPECT_TRUE(mapper0.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800962 EXPECT_EQ(mapper0_count, 1u);
963 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800964
965 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800966 EXPECT_EQ(mapper0_count, 2u);
967 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800968 output0.emplace_back(std::move(*mapper0.Front()));
969 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700970 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800971
972 ASSERT_TRUE(mapper0.Front() != nullptr);
973 output0.emplace_back(std::move(*mapper0.Front()));
974 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700975 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800976
Austin Schuh79b30942021-01-24 22:32:21 -0800977 EXPECT_EQ(mapper0_count, 3u);
978 EXPECT_EQ(mapper1_count, 0u);
979
Austin Schuhd2f96102020-12-01 20:27:29 -0800980 ASSERT_TRUE(mapper0.Front() == nullptr);
981
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700982 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
983 EXPECT_EQ(output0[0].monotonic_event_time.time,
984 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700985 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700986
987 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
988 EXPECT_EQ(output0[1].monotonic_event_time.time,
989 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700990 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700991
992 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
993 EXPECT_EQ(output0[2].monotonic_event_time.time,
994 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700995 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -0800996 }
997
998 {
999 SCOPED_TRACE("Trying node1 now");
1000 std::deque<TimestampedMessage> output1;
1001
Austin Schuh79b30942021-01-24 22:32:21 -08001002 EXPECT_EQ(mapper0_count, 3u);
1003 EXPECT_EQ(mapper1_count, 0u);
1004
Austin Schuhd2f96102020-12-01 20:27:29 -08001005 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001006 EXPECT_EQ(mapper0_count, 3u);
1007 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001008 output1.emplace_back(std::move(*mapper1.Front()));
1009 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001010 EXPECT_TRUE(mapper1.started());
Austin Schuh79b30942021-01-24 22:32:21 -08001011 EXPECT_EQ(mapper0_count, 3u);
1012 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001013
1014 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001015 EXPECT_EQ(mapper0_count, 3u);
1016 EXPECT_EQ(mapper1_count, 2u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001017 output1.emplace_back(std::move(*mapper1.Front()));
1018 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001019 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001020
1021 ASSERT_TRUE(mapper1.Front() != nullptr);
1022 output1.emplace_back(std::move(*mapper1.Front()));
1023 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001024 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001025
Austin Schuh79b30942021-01-24 22:32:21 -08001026 EXPECT_EQ(mapper0_count, 3u);
1027 EXPECT_EQ(mapper1_count, 3u);
1028
Austin Schuhd2f96102020-12-01 20:27:29 -08001029 ASSERT_TRUE(mapper1.Front() == nullptr);
1030
Austin Schuh79b30942021-01-24 22:32:21 -08001031 EXPECT_EQ(mapper0_count, 3u);
1032 EXPECT_EQ(mapper1_count, 3u);
1033
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001034 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1035 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001036 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001037 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001038
1039 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1040 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001041 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001042 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001043
1044 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1045 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001046 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001047 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001048 }
1049}
1050
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001051// Tests that we filter messages using the channel filter callback
1052TEST_F(TimestampMapperTest, ReplayChannelsCallbackTest) {
1053 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1054 {
1055 TestDetachedBufferWriter writer0(logfile0_);
1056 writer0.QueueSpan(config0_.span());
1057 TestDetachedBufferWriter writer1(logfile1_);
1058 writer1.QueueSpan(config2_.span());
1059
1060 writer0.WriteSizedFlatbuffer(
1061 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1062 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1063 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1064
1065 writer0.WriteSizedFlatbuffer(
1066 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1067 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1068 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1069
1070 writer0.WriteSizedFlatbuffer(
1071 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1072 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1073 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1074 }
1075
1076 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001077 LogFilesContainer log_files(parts);
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001078 ASSERT_EQ(parts[0].logger_node, "pi1");
1079 ASSERT_EQ(parts[1].logger_node, "pi2");
1080
1081 // mapper0 will not provide any messages while mapper1 will provide all
1082 // messages due to the channel filter callbacks used
1083 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001084
1085 TimestampMapper mapper0("pi1", log_files);
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001086 mapper0.set_timestamp_callback(
1087 [&](TimestampedMessage *) { ++mapper0_count; });
1088 mapper0.set_replay_channels_callback(
1089 [&](const TimestampedMessage &) -> bool { return mapper0_count != 2; });
1090 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001091 TimestampMapper mapper1("pi2", log_files);
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001092 mapper1.set_timestamp_callback(
1093 [&](TimestampedMessage *) { ++mapper1_count; });
1094 mapper1.set_replay_channels_callback(
1095 [&](const TimestampedMessage &) -> bool { return mapper1_count != 2; });
1096
1097 mapper0.AddPeer(&mapper1);
1098 mapper1.AddPeer(&mapper0);
1099
1100 {
1101 std::deque<TimestampedMessage> output0;
1102
1103 EXPECT_EQ(mapper0_count, 0u);
1104 EXPECT_EQ(mapper1_count, 0u);
1105
1106 ASSERT_TRUE(mapper0.Front() != nullptr);
1107 EXPECT_EQ(mapper0_count, 1u);
1108 EXPECT_EQ(mapper1_count, 0u);
1109 output0.emplace_back(std::move(*mapper0.Front()));
1110 mapper0.PopFront();
1111
1112 EXPECT_TRUE(mapper0.started());
1113 EXPECT_EQ(mapper0_count, 1u);
1114 EXPECT_EQ(mapper1_count, 0u);
1115
1116 // mapper0_count is now at 3 since the second message is not queued, but
1117 // timestamp_callback needs to be called everytime even if Front() does not
1118 // provide a message due to the replay_channels_callback.
1119 ASSERT_TRUE(mapper0.Front() != nullptr);
1120 EXPECT_EQ(mapper0_count, 3u);
1121 EXPECT_EQ(mapper1_count, 0u);
1122 output0.emplace_back(std::move(*mapper0.Front()));
1123 mapper0.PopFront();
1124
1125 EXPECT_TRUE(mapper0.started());
1126 EXPECT_EQ(mapper0_count, 3u);
1127 EXPECT_EQ(mapper1_count, 0u);
1128
1129 ASSERT_TRUE(mapper0.Front() == nullptr);
1130 EXPECT_TRUE(mapper0.started());
1131
1132 EXPECT_EQ(mapper0_count, 3u);
1133 EXPECT_EQ(mapper1_count, 0u);
1134
1135 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1136 EXPECT_EQ(output0[0].monotonic_event_time.time,
1137 e + chrono::milliseconds(1000));
1138 EXPECT_TRUE(output0[0].data != nullptr);
1139
1140 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1141 EXPECT_EQ(output0[1].monotonic_event_time.time,
1142 e + chrono::milliseconds(3000));
1143 EXPECT_TRUE(output0[1].data != nullptr);
1144 }
1145
1146 {
1147 SCOPED_TRACE("Trying node1 now");
1148 std::deque<TimestampedMessage> output1;
1149
1150 EXPECT_EQ(mapper0_count, 3u);
1151 EXPECT_EQ(mapper1_count, 0u);
1152
1153 ASSERT_TRUE(mapper1.Front() != nullptr);
1154 EXPECT_EQ(mapper0_count, 3u);
1155 EXPECT_EQ(mapper1_count, 1u);
1156 output1.emplace_back(std::move(*mapper1.Front()));
1157 mapper1.PopFront();
1158 EXPECT_TRUE(mapper1.started());
1159 EXPECT_EQ(mapper0_count, 3u);
1160 EXPECT_EQ(mapper1_count, 1u);
1161
1162 // mapper1_count is now at 3 since the second message is not queued, but
1163 // timestamp_callback needs to be called everytime even if Front() does not
1164 // provide a message due to the replay_channels_callback.
1165 ASSERT_TRUE(mapper1.Front() != nullptr);
1166 output1.emplace_back(std::move(*mapper1.Front()));
1167 mapper1.PopFront();
1168 EXPECT_TRUE(mapper1.started());
1169
1170 EXPECT_EQ(mapper0_count, 3u);
1171 EXPECT_EQ(mapper1_count, 3u);
1172
1173 ASSERT_TRUE(mapper1.Front() == nullptr);
1174
1175 EXPECT_EQ(mapper0_count, 3u);
1176 EXPECT_EQ(mapper1_count, 3u);
1177
1178 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1179 EXPECT_EQ(output1[0].monotonic_event_time.time,
1180 e + chrono::seconds(100) + chrono::milliseconds(1000));
1181 EXPECT_TRUE(output1[0].data != nullptr);
1182
1183 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1184 EXPECT_EQ(output1[1].monotonic_event_time.time,
1185 e + chrono::seconds(100) + chrono::milliseconds(3000));
1186 EXPECT_TRUE(output1[1].data != nullptr);
1187 }
1188}
Austin Schuh8bf1e632021-01-02 22:41:04 -08001189// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
1190// returned.
1191TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
1192 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1193 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001194 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001195 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001196 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001197 writer1.QueueSpan(config4_.span());
1198
Austin Schuhd863e6e2022-10-16 15:44:50 -07001199 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001200 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001201 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001202 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
1203 e + chrono::nanoseconds(971)));
1204
Austin Schuhd863e6e2022-10-16 15:44:50 -07001205 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001206 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001207 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001208 e + chrono::milliseconds(2000), 0, chrono::seconds(100),
1209 e + chrono::nanoseconds(5458)));
1210
Austin Schuhd863e6e2022-10-16 15:44:50 -07001211 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001212 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001213 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001214 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1215 }
1216
1217 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001218 LogFilesContainer log_files(parts);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001219 ASSERT_EQ(parts.size(), 1u);
1220
Austin Schuh79b30942021-01-24 22:32:21 -08001221 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001222 TimestampMapper mapper0("pi1", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001223 mapper0.set_timestamp_callback(
1224 [&](TimestampedMessage *) { ++mapper0_count; });
1225 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001226 TimestampMapper mapper1("pi2", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001227 mapper1.set_timestamp_callback(
1228 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh8bf1e632021-01-02 22:41:04 -08001229
1230 mapper0.AddPeer(&mapper1);
1231 mapper1.AddPeer(&mapper0);
1232
1233 {
1234 std::deque<TimestampedMessage> output0;
1235
1236 for (int i = 0; i < 3; ++i) {
1237 ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
1238 output0.emplace_back(std::move(*mapper0.Front()));
1239 mapper0.PopFront();
1240 }
1241
1242 ASSERT_TRUE(mapper0.Front() == nullptr);
1243
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001244 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1245 EXPECT_EQ(output0[0].monotonic_event_time.time,
1246 e + chrono::milliseconds(1000));
1247 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
1248 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
1249 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001250 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001251
1252 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1253 EXPECT_EQ(output0[1].monotonic_event_time.time,
1254 e + chrono::milliseconds(2000));
1255 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
1256 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
1257 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001258 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001259
1260 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1261 EXPECT_EQ(output0[2].monotonic_event_time.time,
1262 e + chrono::milliseconds(3000));
1263 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
1264 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
1265 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001266 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001267 }
1268
1269 {
1270 SCOPED_TRACE("Trying node1 now");
1271 std::deque<TimestampedMessage> output1;
1272
1273 for (int i = 0; i < 3; ++i) {
1274 ASSERT_TRUE(mapper1.Front() != nullptr);
1275 output1.emplace_back(std::move(*mapper1.Front()));
1276 mapper1.PopFront();
1277 }
1278
1279 ASSERT_TRUE(mapper1.Front() == nullptr);
1280
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001281 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1282 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001283 e + chrono::seconds(100) + chrono::milliseconds(1000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001284 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
1285 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001286 e + chrono::nanoseconds(971));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001287 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001288
1289 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1290 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001291 e + chrono::seconds(100) + chrono::milliseconds(2000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001292 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
1293 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001294 e + chrono::nanoseconds(5458));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001295 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001296
1297 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1298 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001299 e + chrono::seconds(100) + chrono::milliseconds(3000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001300 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
1301 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
1302 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001303 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001304 }
Austin Schuh79b30942021-01-24 22:32:21 -08001305
1306 EXPECT_EQ(mapper0_count, 3u);
1307 EXPECT_EQ(mapper1_count, 3u);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001308}
1309
Austin Schuhd2f96102020-12-01 20:27:29 -08001310// Tests that we can match timestamps on delivered messages. By doing this in
1311// the reverse order, the second node needs to queue data up from the first node
1312// to find the matching timestamp.
1313TEST_F(TimestampMapperTest, ReadNode1First) {
1314 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1315 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001316 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001317 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001318 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001319 writer1.QueueSpan(config2_.span());
1320
Austin Schuhd863e6e2022-10-16 15:44:50 -07001321 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001322 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001323 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001324 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1325
Austin Schuhd863e6e2022-10-16 15:44:50 -07001326 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001327 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001328 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001329 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1330
Austin Schuhd863e6e2022-10-16 15:44:50 -07001331 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001332 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001333 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001334 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1335 }
1336
1337 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001338 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001339
1340 ASSERT_EQ(parts[0].logger_node, "pi1");
1341 ASSERT_EQ(parts[1].logger_node, "pi2");
1342
Austin Schuh79b30942021-01-24 22:32:21 -08001343 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001344 TimestampMapper mapper0("pi1", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001345 mapper0.set_timestamp_callback(
1346 [&](TimestampedMessage *) { ++mapper0_count; });
1347 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001348 TimestampMapper mapper1("pi2", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001349 mapper1.set_timestamp_callback(
1350 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001351
1352 mapper0.AddPeer(&mapper1);
1353 mapper1.AddPeer(&mapper0);
1354
1355 {
1356 SCOPED_TRACE("Trying node1 now");
1357 std::deque<TimestampedMessage> output1;
1358
1359 ASSERT_TRUE(mapper1.Front() != nullptr);
1360 output1.emplace_back(std::move(*mapper1.Front()));
1361 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001362 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001363
1364 ASSERT_TRUE(mapper1.Front() != nullptr);
1365 output1.emplace_back(std::move(*mapper1.Front()));
1366 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001367 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001368
1369 ASSERT_TRUE(mapper1.Front() != nullptr);
1370 output1.emplace_back(std::move(*mapper1.Front()));
1371 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001372 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001373
1374 ASSERT_TRUE(mapper1.Front() == nullptr);
1375
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001376 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1377 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001378 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001379 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001380
1381 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1382 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001383 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001384 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001385
1386 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1387 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001388 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001389 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001390 }
1391
1392 {
1393 std::deque<TimestampedMessage> output0;
1394
1395 ASSERT_TRUE(mapper0.Front() != nullptr);
1396 output0.emplace_back(std::move(*mapper0.Front()));
1397 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001398 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001399
1400 ASSERT_TRUE(mapper0.Front() != nullptr);
1401 output0.emplace_back(std::move(*mapper0.Front()));
1402 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001403 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001404
1405 ASSERT_TRUE(mapper0.Front() != nullptr);
1406 output0.emplace_back(std::move(*mapper0.Front()));
1407 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001408 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001409
1410 ASSERT_TRUE(mapper0.Front() == nullptr);
1411
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001412 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1413 EXPECT_EQ(output0[0].monotonic_event_time.time,
1414 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001415 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001416
1417 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1418 EXPECT_EQ(output0[1].monotonic_event_time.time,
1419 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001420 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001421
1422 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1423 EXPECT_EQ(output0[2].monotonic_event_time.time,
1424 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001425 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001426 }
Austin Schuh79b30942021-01-24 22:32:21 -08001427
1428 EXPECT_EQ(mapper0_count, 3u);
1429 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001430}
1431
1432// Tests that we return just the timestamps if we couldn't find the data and the
1433// missing data was at the beginning of the file.
1434TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
1435 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1436 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001437 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001438 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001439 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001440 writer1.QueueSpan(config2_.span());
1441
1442 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001443 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001444 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1445
Austin Schuhd863e6e2022-10-16 15:44:50 -07001446 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001447 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001448 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001449 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1450
Austin Schuhd863e6e2022-10-16 15:44:50 -07001451 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001452 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001453 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001454 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1455 }
1456
1457 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001458 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001459
1460 ASSERT_EQ(parts[0].logger_node, "pi1");
1461 ASSERT_EQ(parts[1].logger_node, "pi2");
1462
Austin Schuh79b30942021-01-24 22:32:21 -08001463 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001464 TimestampMapper mapper0("pi1", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001465 mapper0.set_timestamp_callback(
1466 [&](TimestampedMessage *) { ++mapper0_count; });
1467 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001468 TimestampMapper mapper1("pi2", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001469 mapper1.set_timestamp_callback(
1470 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001471
1472 mapper0.AddPeer(&mapper1);
1473 mapper1.AddPeer(&mapper0);
1474
1475 {
1476 SCOPED_TRACE("Trying node1 now");
1477 std::deque<TimestampedMessage> output1;
1478
1479 ASSERT_TRUE(mapper1.Front() != nullptr);
1480 output1.emplace_back(std::move(*mapper1.Front()));
1481 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001482 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001483
1484 ASSERT_TRUE(mapper1.Front() != nullptr);
1485 output1.emplace_back(std::move(*mapper1.Front()));
1486 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001487 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001488
1489 ASSERT_TRUE(mapper1.Front() != nullptr);
1490 output1.emplace_back(std::move(*mapper1.Front()));
1491 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001492 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001493
1494 ASSERT_TRUE(mapper1.Front() == nullptr);
1495
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001496 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1497 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001498 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001499 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001500
1501 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1502 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001503 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001504 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001505
1506 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1507 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001508 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001509 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001510 }
Austin Schuh79b30942021-01-24 22:32:21 -08001511
1512 EXPECT_EQ(mapper0_count, 0u);
1513 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001514}
1515
1516// Tests that we return just the timestamps if we couldn't find the data and the
1517// missing data was at the end of the file.
1518TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
1519 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1520 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001521 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001522 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001523 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001524 writer1.QueueSpan(config2_.span());
1525
Austin Schuhd863e6e2022-10-16 15:44:50 -07001526 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001527 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001528 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001529 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1530
Austin Schuhd863e6e2022-10-16 15:44:50 -07001531 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001532 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001533 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001534 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1535
1536 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001537 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001538 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1539 }
1540
1541 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001542 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001543
1544 ASSERT_EQ(parts[0].logger_node, "pi1");
1545 ASSERT_EQ(parts[1].logger_node, "pi2");
1546
Austin Schuh79b30942021-01-24 22:32:21 -08001547 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001548 TimestampMapper mapper0("pi1", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001549 mapper0.set_timestamp_callback(
1550 [&](TimestampedMessage *) { ++mapper0_count; });
1551 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001552 TimestampMapper mapper1("pi2", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001553 mapper1.set_timestamp_callback(
1554 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001555
1556 mapper0.AddPeer(&mapper1);
1557 mapper1.AddPeer(&mapper0);
1558
1559 {
1560 SCOPED_TRACE("Trying node1 now");
1561 std::deque<TimestampedMessage> output1;
1562
1563 ASSERT_TRUE(mapper1.Front() != nullptr);
1564 output1.emplace_back(std::move(*mapper1.Front()));
1565 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001566 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001567
1568 ASSERT_TRUE(mapper1.Front() != nullptr);
1569 output1.emplace_back(std::move(*mapper1.Front()));
1570 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001571 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001572
1573 ASSERT_TRUE(mapper1.Front() != nullptr);
1574 output1.emplace_back(std::move(*mapper1.Front()));
1575 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001576 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001577
1578 ASSERT_TRUE(mapper1.Front() == nullptr);
1579
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001580 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1581 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001582 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001583 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001584
1585 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1586 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001587 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001588 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001589
1590 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1591 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001592 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001593 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001594 }
Austin Schuh79b30942021-01-24 22:32:21 -08001595
1596 EXPECT_EQ(mapper0_count, 0u);
1597 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001598}
1599
Austin Schuh993ccb52020-12-12 15:59:32 -08001600// Tests that we handle a message which failed to forward or be logged.
1601TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1602 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1603 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001604 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001605 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001606 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001607 writer1.QueueSpan(config2_.span());
1608
Austin Schuhd863e6e2022-10-16 15:44:50 -07001609 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001610 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001611 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001612 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1613
1614 // Create both the timestamp and message, but don't log them, simulating a
1615 // forwarding drop.
1616 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1617 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1618 chrono::seconds(100));
1619
Austin Schuhd863e6e2022-10-16 15:44:50 -07001620 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001621 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001622 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001623 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1624 }
1625
1626 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001627 LogFilesContainer log_files(parts);
Austin Schuh993ccb52020-12-12 15:59:32 -08001628
1629 ASSERT_EQ(parts[0].logger_node, "pi1");
1630 ASSERT_EQ(parts[1].logger_node, "pi2");
1631
Austin Schuh79b30942021-01-24 22:32:21 -08001632 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001633 TimestampMapper mapper0("pi1", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001634 mapper0.set_timestamp_callback(
1635 [&](TimestampedMessage *) { ++mapper0_count; });
1636 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001637 TimestampMapper mapper1("pi2", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001638 mapper1.set_timestamp_callback(
1639 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh993ccb52020-12-12 15:59:32 -08001640
1641 mapper0.AddPeer(&mapper1);
1642 mapper1.AddPeer(&mapper0);
1643
1644 {
1645 std::deque<TimestampedMessage> output1;
1646
1647 ASSERT_TRUE(mapper1.Front() != nullptr);
1648 output1.emplace_back(std::move(*mapper1.Front()));
1649 mapper1.PopFront();
1650
1651 ASSERT_TRUE(mapper1.Front() != nullptr);
1652 output1.emplace_back(std::move(*mapper1.Front()));
1653
1654 ASSERT_FALSE(mapper1.Front() == nullptr);
1655
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001656 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1657 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001658 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001659 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001660
1661 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1662 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001663 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001664 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh993ccb52020-12-12 15:59:32 -08001665 }
Austin Schuh79b30942021-01-24 22:32:21 -08001666
1667 EXPECT_EQ(mapper0_count, 0u);
1668 EXPECT_EQ(mapper1_count, 2u);
Austin Schuh993ccb52020-12-12 15:59:32 -08001669}
1670
Austin Schuhd2f96102020-12-01 20:27:29 -08001671// Tests that we properly sort log files with duplicate timestamps.
1672TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1673 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1674 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001675 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001676 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001677 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001678 writer1.QueueSpan(config2_.span());
1679
Austin Schuhd863e6e2022-10-16 15:44:50 -07001680 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001681 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001682 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001683 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1684
Austin Schuhd863e6e2022-10-16 15:44:50 -07001685 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001686 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001687 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001688 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1689
Austin Schuhd863e6e2022-10-16 15:44:50 -07001690 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001691 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001692 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001693 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1694
Austin Schuhd863e6e2022-10-16 15:44:50 -07001695 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001696 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001697 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001698 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1699 }
1700
1701 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001702 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001703
1704 ASSERT_EQ(parts[0].logger_node, "pi1");
1705 ASSERT_EQ(parts[1].logger_node, "pi2");
1706
Austin Schuh79b30942021-01-24 22:32:21 -08001707 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001708 TimestampMapper mapper0("pi1", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001709 mapper0.set_timestamp_callback(
1710 [&](TimestampedMessage *) { ++mapper0_count; });
1711 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001712 TimestampMapper mapper1("pi2", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001713 mapper1.set_timestamp_callback(
1714 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001715
1716 mapper0.AddPeer(&mapper1);
1717 mapper1.AddPeer(&mapper0);
1718
1719 {
1720 SCOPED_TRACE("Trying node1 now");
1721 std::deque<TimestampedMessage> output1;
1722
1723 for (int i = 0; i < 4; ++i) {
1724 ASSERT_TRUE(mapper1.Front() != nullptr);
1725 output1.emplace_back(std::move(*mapper1.Front()));
1726 mapper1.PopFront();
1727 }
1728 ASSERT_TRUE(mapper1.Front() == nullptr);
1729
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001730 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1731 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001732 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001733 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001734
1735 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1736 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001737 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001738 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001739
1740 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1741 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001742 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001743 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001744
1745 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1746 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001747 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001748 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001749 }
Austin Schuh79b30942021-01-24 22:32:21 -08001750
1751 EXPECT_EQ(mapper0_count, 0u);
1752 EXPECT_EQ(mapper1_count, 4u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001753}
1754
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001755// Tests that we properly produce a valid start time.
Austin Schuhd2f96102020-12-01 20:27:29 -08001756TEST_F(TimestampMapperTest, StartTime) {
1757 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1758 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001759 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001760 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001761 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001762 writer1.QueueSpan(config1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001763 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001764 writer2.QueueSpan(config3_.span());
1765 }
1766
1767 const std::vector<LogFile> parts =
1768 SortParts({logfile0_, logfile1_, logfile2_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001769 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001770
Austin Schuh79b30942021-01-24 22:32:21 -08001771 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001772 TimestampMapper mapper0("pi1", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001773 mapper0.set_timestamp_callback(
1774 [&](TimestampedMessage *) { ++mapper0_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001775
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001776 EXPECT_EQ(mapper0.monotonic_start_time(0), e + chrono::milliseconds(1));
1777 EXPECT_EQ(mapper0.realtime_start_time(0),
Austin Schuhd2f96102020-12-01 20:27:29 -08001778 realtime_clock::time_point(chrono::seconds(1000)));
Austin Schuh79b30942021-01-24 22:32:21 -08001779 EXPECT_EQ(mapper0_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001780}
1781
Austin Schuhfecf1d82020-12-19 16:57:28 -08001782// Tests that when a peer isn't registered, we treat that as if there was no
1783// data available.
1784TEST_F(TimestampMapperTest, NoPeer) {
1785 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1786 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001787 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001788 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001789 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001790 writer1.QueueSpan(config2_.span());
1791
1792 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001793 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001794 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1795
Austin Schuhd863e6e2022-10-16 15:44:50 -07001796 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001797 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001798 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001799 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1800
Austin Schuhd863e6e2022-10-16 15:44:50 -07001801 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001802 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001803 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001804 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1805 }
1806
1807 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001808 LogFilesContainer log_files(parts);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001809
1810 ASSERT_EQ(parts[0].logger_node, "pi1");
1811 ASSERT_EQ(parts[1].logger_node, "pi2");
1812
Austin Schuh79b30942021-01-24 22:32:21 -08001813 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001814 TimestampMapper mapper1("pi2", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001815 mapper1.set_timestamp_callback(
1816 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhfecf1d82020-12-19 16:57:28 -08001817
1818 {
1819 std::deque<TimestampedMessage> output1;
1820
1821 ASSERT_TRUE(mapper1.Front() != nullptr);
1822 output1.emplace_back(std::move(*mapper1.Front()));
1823 mapper1.PopFront();
1824 ASSERT_TRUE(mapper1.Front() != nullptr);
1825 output1.emplace_back(std::move(*mapper1.Front()));
1826 mapper1.PopFront();
1827 ASSERT_TRUE(mapper1.Front() != nullptr);
1828 output1.emplace_back(std::move(*mapper1.Front()));
1829 mapper1.PopFront();
1830 ASSERT_TRUE(mapper1.Front() == nullptr);
1831
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001832 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1833 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001834 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001835 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001836
1837 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1838 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001839 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001840 EXPECT_FALSE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001841
1842 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1843 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001844 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001845 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001846 }
Austin Schuh79b30942021-01-24 22:32:21 -08001847 EXPECT_EQ(mapper1_count, 3u);
1848}
1849
1850// Tests that we can queue messages and call the timestamp callback for both
1851// nodes.
1852TEST_F(TimestampMapperTest, QueueUntilNode0) {
1853 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1854 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001855 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh79b30942021-01-24 22:32:21 -08001856 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001857 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh79b30942021-01-24 22:32:21 -08001858 writer1.QueueSpan(config2_.span());
1859
Austin Schuhd863e6e2022-10-16 15:44:50 -07001860 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001861 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001862 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001863 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1864
Austin Schuhd863e6e2022-10-16 15:44:50 -07001865 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001866 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001867 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001868 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1869
Austin Schuhd863e6e2022-10-16 15:44:50 -07001870 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001871 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001872 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001873 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1874
Austin Schuhd863e6e2022-10-16 15:44:50 -07001875 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001876 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001877 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001878 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1879 }
1880
1881 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001882 LogFilesContainer log_files(parts);
Austin Schuh79b30942021-01-24 22:32:21 -08001883
1884 ASSERT_EQ(parts[0].logger_node, "pi1");
1885 ASSERT_EQ(parts[1].logger_node, "pi2");
1886
1887 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001888 TimestampMapper mapper0("pi1", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001889 mapper0.set_timestamp_callback(
1890 [&](TimestampedMessage *) { ++mapper0_count; });
1891 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001892 TimestampMapper mapper1("pi2", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001893 mapper1.set_timestamp_callback(
1894 [&](TimestampedMessage *) { ++mapper1_count; });
1895
1896 mapper0.AddPeer(&mapper1);
1897 mapper1.AddPeer(&mapper0);
1898
1899 {
1900 std::deque<TimestampedMessage> output0;
1901
1902 EXPECT_EQ(mapper0_count, 0u);
1903 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001904 mapper0.QueueUntil(
1905 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001906 EXPECT_EQ(mapper0_count, 3u);
1907 EXPECT_EQ(mapper1_count, 0u);
1908
1909 ASSERT_TRUE(mapper0.Front() != nullptr);
1910 EXPECT_EQ(mapper0_count, 3u);
1911 EXPECT_EQ(mapper1_count, 0u);
1912
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001913 mapper0.QueueUntil(
1914 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001915 EXPECT_EQ(mapper0_count, 3u);
1916 EXPECT_EQ(mapper1_count, 0u);
1917
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001918 mapper0.QueueUntil(
1919 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001920 EXPECT_EQ(mapper0_count, 4u);
1921 EXPECT_EQ(mapper1_count, 0u);
1922
1923 output0.emplace_back(std::move(*mapper0.Front()));
1924 mapper0.PopFront();
1925 output0.emplace_back(std::move(*mapper0.Front()));
1926 mapper0.PopFront();
1927 output0.emplace_back(std::move(*mapper0.Front()));
1928 mapper0.PopFront();
1929 output0.emplace_back(std::move(*mapper0.Front()));
1930 mapper0.PopFront();
1931
1932 EXPECT_EQ(mapper0_count, 4u);
1933 EXPECT_EQ(mapper1_count, 0u);
1934
1935 ASSERT_TRUE(mapper0.Front() == nullptr);
1936
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001937 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1938 EXPECT_EQ(output0[0].monotonic_event_time.time,
1939 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001940 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001941
1942 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1943 EXPECT_EQ(output0[1].monotonic_event_time.time,
1944 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001945 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001946
1947 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1948 EXPECT_EQ(output0[2].monotonic_event_time.time,
1949 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001950 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001951
1952 EXPECT_EQ(output0[3].monotonic_event_time.boot, 0u);
1953 EXPECT_EQ(output0[3].monotonic_event_time.time,
1954 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001955 EXPECT_TRUE(output0[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001956 }
1957
1958 {
1959 SCOPED_TRACE("Trying node1 now");
1960 std::deque<TimestampedMessage> output1;
1961
1962 EXPECT_EQ(mapper0_count, 4u);
1963 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001964 mapper1.QueueUntil(BootTimestamp{
1965 .boot = 0,
1966 .time = e + chrono::seconds(100) + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001967 EXPECT_EQ(mapper0_count, 4u);
1968 EXPECT_EQ(mapper1_count, 3u);
1969
1970 ASSERT_TRUE(mapper1.Front() != nullptr);
1971 EXPECT_EQ(mapper0_count, 4u);
1972 EXPECT_EQ(mapper1_count, 3u);
1973
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001974 mapper1.QueueUntil(BootTimestamp{
1975 .boot = 0,
1976 .time = e + chrono::seconds(100) + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001977 EXPECT_EQ(mapper0_count, 4u);
1978 EXPECT_EQ(mapper1_count, 3u);
1979
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001980 mapper1.QueueUntil(BootTimestamp{
1981 .boot = 0,
1982 .time = e + chrono::seconds(100) + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001983 EXPECT_EQ(mapper0_count, 4u);
1984 EXPECT_EQ(mapper1_count, 4u);
1985
1986 ASSERT_TRUE(mapper1.Front() != nullptr);
1987 EXPECT_EQ(mapper0_count, 4u);
1988 EXPECT_EQ(mapper1_count, 4u);
1989
1990 output1.emplace_back(std::move(*mapper1.Front()));
1991 mapper1.PopFront();
1992 ASSERT_TRUE(mapper1.Front() != nullptr);
1993 output1.emplace_back(std::move(*mapper1.Front()));
1994 mapper1.PopFront();
1995 ASSERT_TRUE(mapper1.Front() != nullptr);
1996 output1.emplace_back(std::move(*mapper1.Front()));
1997 mapper1.PopFront();
1998 ASSERT_TRUE(mapper1.Front() != nullptr);
1999 output1.emplace_back(std::move(*mapper1.Front()));
2000 mapper1.PopFront();
2001
2002 EXPECT_EQ(mapper0_count, 4u);
2003 EXPECT_EQ(mapper1_count, 4u);
2004
2005 ASSERT_TRUE(mapper1.Front() == nullptr);
2006
2007 EXPECT_EQ(mapper0_count, 4u);
2008 EXPECT_EQ(mapper1_count, 4u);
2009
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002010 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2011 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002012 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002013 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002014
2015 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
2016 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002017 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002018 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002019
2020 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
2021 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002022 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002023 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002024
2025 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
2026 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002027 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002028 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08002029 }
Austin Schuhfecf1d82020-12-19 16:57:28 -08002030}
2031
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002032class BootMergerTest : public SortingElementTest {
2033 public:
2034 BootMergerTest()
2035 : SortingElementTest(),
2036 boot0_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002037 /* 100ms */
2038 "max_out_of_order_duration": 100000000,
2039 "node": {
2040 "name": "pi2"
2041 },
2042 "logger_node": {
2043 "name": "pi1"
2044 },
2045 "monotonic_start_time": 1000000,
2046 "realtime_start_time": 1000000000000,
2047 "logger_monotonic_start_time": 1000000,
2048 "logger_realtime_start_time": 1000000000000,
2049 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2050 "parts_uuid": "1a9e5ca2-31b2-475b-8282-88f6d1ce5109",
2051 "parts_index": 0,
2052 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2053 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002054 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2055 "boot_uuids": [
2056 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2057 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2058 ""
2059 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002060})")),
2061 boot1_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002062 /* 100ms */
2063 "max_out_of_order_duration": 100000000,
2064 "node": {
2065 "name": "pi2"
2066 },
2067 "logger_node": {
2068 "name": "pi1"
2069 },
2070 "monotonic_start_time": 1000000,
2071 "realtime_start_time": 1000000000000,
2072 "logger_monotonic_start_time": 1000000,
2073 "logger_realtime_start_time": 1000000000000,
2074 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2075 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
2076 "parts_index": 1,
2077 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2078 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002079 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2080 "boot_uuids": [
2081 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2082 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2083 ""
2084 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002085})")) {}
Austin Schuh8bee6882021-06-28 21:03:28 -07002086
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002087 protected:
2088 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0_;
2089 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1_;
2090};
2091
2092// This tests that we can properly sort a multi-node log file which has the old
2093// (and buggy) timestamps in the header, and the non-resetting parts_index.
2094// These make it so we can just bairly figure out what happened first and what
2095// happened second, but not in a way that is robust to multiple nodes rebooting.
2096TEST_F(BootMergerTest, OldReboot) {
Austin Schuh8bee6882021-06-28 21:03:28 -07002097 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002098 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002099 writer.QueueSpan(boot0_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002100 }
2101 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002102 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002103 writer.QueueSpan(boot1_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002104 }
2105
2106 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
2107
2108 ASSERT_EQ(parts.size(), 1u);
2109 ASSERT_EQ(parts[0].parts.size(), 2u);
2110
2111 EXPECT_EQ(parts[0].parts[0].boot_count, 0);
2112 EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002113 boot0_.message().source_node_boot_uuid()->string_view());
Austin Schuh8bee6882021-06-28 21:03:28 -07002114
2115 EXPECT_EQ(parts[0].parts[1].boot_count, 1);
2116 EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002117 boot1_.message().source_node_boot_uuid()->string_view());
2118}
2119
2120// This tests that we can produce messages ordered across a reboot.
2121TEST_F(BootMergerTest, SortAcrossReboot) {
2122 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2123 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002124 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002125 writer.QueueSpan(boot0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002126 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002127 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002128 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002129 MakeLogMessage(e + chrono::milliseconds(2000), 1, 0x105));
2130 }
2131 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002132 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002133 writer.QueueSpan(boot1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002134 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002135 MakeLogMessage(e + chrono::milliseconds(100), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002136 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002137 MakeLogMessage(e + chrono::milliseconds(200), 1, 0x106));
2138 }
2139
2140 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07002141 LogFilesContainer log_files(parts);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002142 ASSERT_EQ(parts.size(), 1u);
2143 ASSERT_EQ(parts[0].parts.size(), 2u);
2144
Alexei Strots1f51ac72023-05-15 10:14:54 -07002145 BootMerger merger("pi2", log_files);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002146
2147 EXPECT_EQ(merger.node(), 1u);
2148
2149 std::vector<Message> output;
2150 for (int i = 0; i < 4; ++i) {
2151 ASSERT_TRUE(merger.Front() != nullptr);
2152 output.emplace_back(std::move(*merger.Front()));
2153 merger.PopFront();
2154 }
2155
2156 ASSERT_TRUE(merger.Front() == nullptr);
2157
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002158 EXPECT_EQ(output[0].timestamp.boot, 0u);
2159 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
2160 EXPECT_EQ(output[1].timestamp.boot, 0u);
2161 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(2000));
2162
2163 EXPECT_EQ(output[2].timestamp.boot, 1u);
2164 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(100));
2165 EXPECT_EQ(output[3].timestamp.boot, 1u);
2166 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
Austin Schuh8bee6882021-06-28 21:03:28 -07002167}
2168
Austin Schuh48507722021-07-17 17:29:24 -07002169class RebootTimestampMapperTest : public SortingElementTest {
2170 public:
2171 RebootTimestampMapperTest()
2172 : SortingElementTest(),
2173 boot0a_(MakeHeader(config_, R"({
2174 /* 100ms */
2175 "max_out_of_order_duration": 100000000,
2176 "node": {
2177 "name": "pi1"
2178 },
2179 "logger_node": {
2180 "name": "pi1"
2181 },
2182 "monotonic_start_time": 1000000,
2183 "realtime_start_time": 1000000000000,
2184 "logger_monotonic_start_time": 1000000,
2185 "logger_realtime_start_time": 1000000000000,
2186 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2187 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2188 "parts_index": 0,
2189 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2190 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2191 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2192 "boot_uuids": [
2193 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2194 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2195 ""
2196 ]
2197})")),
2198 boot0b_(MakeHeader(config_, R"({
2199 /* 100ms */
2200 "max_out_of_order_duration": 100000000,
2201 "node": {
2202 "name": "pi1"
2203 },
2204 "logger_node": {
2205 "name": "pi1"
2206 },
2207 "monotonic_start_time": 1000000,
2208 "realtime_start_time": 1000000000000,
2209 "logger_monotonic_start_time": 1000000,
2210 "logger_realtime_start_time": 1000000000000,
2211 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2212 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2213 "parts_index": 1,
2214 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2215 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2216 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2217 "boot_uuids": [
2218 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2219 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2220 ""
2221 ]
2222})")),
2223 boot1a_(MakeHeader(config_, R"({
2224 /* 100ms */
2225 "max_out_of_order_duration": 100000000,
2226 "node": {
2227 "name": "pi2"
2228 },
2229 "logger_node": {
2230 "name": "pi1"
2231 },
2232 "monotonic_start_time": 1000000,
2233 "realtime_start_time": 1000000000000,
2234 "logger_monotonic_start_time": 1000000,
2235 "logger_realtime_start_time": 1000000000000,
2236 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2237 "parts_uuid": "f6ab0cdc-a654-456d-bfd9-2bbc09098edf",
2238 "parts_index": 0,
2239 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2240 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2241 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2242 "boot_uuids": [
2243 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2244 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2245 ""
2246 ]
2247})")),
2248 boot1b_(MakeHeader(config_, R"({
2249 /* 100ms */
2250 "max_out_of_order_duration": 100000000,
2251 "node": {
2252 "name": "pi2"
2253 },
2254 "logger_node": {
2255 "name": "pi1"
2256 },
2257 "monotonic_start_time": 1000000,
2258 "realtime_start_time": 1000000000000,
2259 "logger_monotonic_start_time": 1000000,
2260 "logger_realtime_start_time": 1000000000000,
2261 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2262 "parts_uuid": "3a9d0445-f520-43ca-93f5-e2cc7f54d40a",
2263 "parts_index": 1,
2264 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2265 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2266 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2267 "boot_uuids": [
2268 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2269 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2270 ""
2271 ]
2272})")) {}
2273
2274 protected:
2275 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0a_;
2276 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0b_;
2277 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1a_;
2278 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
2279};
2280
Austin Schuh48507722021-07-17 17:29:24 -07002281// Tests that we can match timestamps on delivered messages in the presence of
2282// reboots on the node receiving timestamps.
2283TEST_F(RebootTimestampMapperTest, ReadNode0First) {
2284 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2285 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002286 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002287 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002288 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002289 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002290 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002291 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002292 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002293 writer1b.QueueSpan(boot1b_.span());
2294
Austin Schuhd863e6e2022-10-16 15:44:50 -07002295 writer0a.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002296 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002297 writer1a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002298 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
2299 e + chrono::milliseconds(1001)));
2300
Austin Schuhd863e6e2022-10-16 15:44:50 -07002301 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh58646e22021-08-23 23:51:46 -07002302 e + chrono::milliseconds(1000), 0, chrono::seconds(21),
2303 e + chrono::milliseconds(2001)));
2304
Austin Schuhd863e6e2022-10-16 15:44:50 -07002305 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002306 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002307 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002308 e + chrono::milliseconds(2000), 0, chrono::seconds(20),
2309 e + chrono::milliseconds(2001)));
2310
Austin Schuhd863e6e2022-10-16 15:44:50 -07002311 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002312 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002313 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002314 e + chrono::milliseconds(3000), 0, chrono::seconds(20),
2315 e + chrono::milliseconds(3001)));
2316 }
2317
Austin Schuh58646e22021-08-23 23:51:46 -07002318 const std::vector<LogFile> parts =
2319 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07002320 LogFilesContainer log_files(parts);
Austin Schuh48507722021-07-17 17:29:24 -07002321
2322 for (const auto &x : parts) {
2323 LOG(INFO) << x;
2324 }
2325 ASSERT_EQ(parts.size(), 1u);
2326 ASSERT_EQ(parts[0].logger_node, "pi1");
2327
2328 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07002329 TimestampMapper mapper0("pi1", log_files);
Austin Schuh48507722021-07-17 17:29:24 -07002330 mapper0.set_timestamp_callback(
2331 [&](TimestampedMessage *) { ++mapper0_count; });
2332 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07002333 TimestampMapper mapper1("pi2", log_files);
Austin Schuh48507722021-07-17 17:29:24 -07002334 mapper1.set_timestamp_callback(
2335 [&](TimestampedMessage *) { ++mapper1_count; });
2336
2337 mapper0.AddPeer(&mapper1);
2338 mapper1.AddPeer(&mapper0);
2339
2340 {
2341 std::deque<TimestampedMessage> output0;
2342
2343 EXPECT_EQ(mapper0_count, 0u);
2344 EXPECT_EQ(mapper1_count, 0u);
2345 ASSERT_TRUE(mapper0.Front() != nullptr);
2346 EXPECT_EQ(mapper0_count, 1u);
2347 EXPECT_EQ(mapper1_count, 0u);
2348 output0.emplace_back(std::move(*mapper0.Front()));
2349 mapper0.PopFront();
2350 EXPECT_TRUE(mapper0.started());
2351 EXPECT_EQ(mapper0_count, 1u);
2352 EXPECT_EQ(mapper1_count, 0u);
2353
2354 ASSERT_TRUE(mapper0.Front() != nullptr);
2355 EXPECT_EQ(mapper0_count, 2u);
2356 EXPECT_EQ(mapper1_count, 0u);
2357 output0.emplace_back(std::move(*mapper0.Front()));
2358 mapper0.PopFront();
2359 EXPECT_TRUE(mapper0.started());
2360
2361 ASSERT_TRUE(mapper0.Front() != nullptr);
2362 output0.emplace_back(std::move(*mapper0.Front()));
2363 mapper0.PopFront();
2364 EXPECT_TRUE(mapper0.started());
2365
2366 EXPECT_EQ(mapper0_count, 3u);
2367 EXPECT_EQ(mapper1_count, 0u);
2368
2369 ASSERT_TRUE(mapper0.Front() == nullptr);
2370
2371 LOG(INFO) << output0[0];
2372 LOG(INFO) << output0[1];
2373 LOG(INFO) << output0[2];
2374
2375 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2376 EXPECT_EQ(output0[0].monotonic_event_time.time,
2377 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002378 EXPECT_EQ(output0[0].queue_index,
2379 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002380 EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
2381 EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002382 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002383
2384 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2385 EXPECT_EQ(output0[1].monotonic_event_time.time,
2386 e + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002387 EXPECT_EQ(output0[1].queue_index,
2388 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002389 EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
2390 EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002391 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002392
2393 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2394 EXPECT_EQ(output0[2].monotonic_event_time.time,
2395 e + chrono::milliseconds(3000));
Austin Schuh58646e22021-08-23 23:51:46 -07002396 EXPECT_EQ(output0[2].queue_index,
2397 (BootQueueIndex{.boot = 0u, .index = 2u}));
Austin Schuh48507722021-07-17 17:29:24 -07002398 EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
2399 EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002400 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002401 }
2402
2403 {
2404 SCOPED_TRACE("Trying node1 now");
2405 std::deque<TimestampedMessage> output1;
2406
2407 EXPECT_EQ(mapper0_count, 3u);
2408 EXPECT_EQ(mapper1_count, 0u);
2409
2410 ASSERT_TRUE(mapper1.Front() != nullptr);
2411 EXPECT_EQ(mapper0_count, 3u);
2412 EXPECT_EQ(mapper1_count, 1u);
2413 output1.emplace_back(std::move(*mapper1.Front()));
2414 mapper1.PopFront();
2415 EXPECT_TRUE(mapper1.started());
2416 EXPECT_EQ(mapper0_count, 3u);
2417 EXPECT_EQ(mapper1_count, 1u);
2418
2419 ASSERT_TRUE(mapper1.Front() != nullptr);
2420 EXPECT_EQ(mapper0_count, 3u);
2421 EXPECT_EQ(mapper1_count, 2u);
2422 output1.emplace_back(std::move(*mapper1.Front()));
2423 mapper1.PopFront();
2424 EXPECT_TRUE(mapper1.started());
2425
2426 ASSERT_TRUE(mapper1.Front() != nullptr);
2427 output1.emplace_back(std::move(*mapper1.Front()));
2428 mapper1.PopFront();
2429 EXPECT_TRUE(mapper1.started());
2430
Austin Schuh58646e22021-08-23 23:51:46 -07002431 ASSERT_TRUE(mapper1.Front() != nullptr);
2432 output1.emplace_back(std::move(*mapper1.Front()));
2433 mapper1.PopFront();
2434 EXPECT_TRUE(mapper1.started());
2435
Austin Schuh48507722021-07-17 17:29:24 -07002436 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002437 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002438
2439 ASSERT_TRUE(mapper1.Front() == nullptr);
2440
2441 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002442 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002443
2444 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2445 EXPECT_EQ(output1[0].monotonic_event_time.time,
2446 e + chrono::seconds(100) + chrono::milliseconds(1000));
2447 EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
2448 EXPECT_EQ(output1[0].monotonic_remote_time.time,
2449 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002450 EXPECT_EQ(output1[0].remote_queue_index,
2451 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002452 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
2453 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
2454 e + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002455 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002456
2457 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2458 EXPECT_EQ(output1[1].monotonic_event_time.time,
2459 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002460 EXPECT_EQ(output1[1].remote_queue_index,
2461 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002462 EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
2463 EXPECT_EQ(output1[1].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002464 e + chrono::milliseconds(1000));
Austin Schuh48507722021-07-17 17:29:24 -07002465 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
2466 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
2467 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002468 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002469
2470 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2471 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002472 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh48507722021-07-17 17:29:24 -07002473 EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
2474 EXPECT_EQ(output1[2].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002475 e + chrono::milliseconds(2000));
2476 EXPECT_EQ(output1[2].remote_queue_index,
2477 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002478 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
2479 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002480 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002481 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002482
Austin Schuh58646e22021-08-23 23:51:46 -07002483 EXPECT_EQ(output1[3].monotonic_event_time.boot, 1u);
2484 EXPECT_EQ(output1[3].monotonic_event_time.time,
2485 e + chrono::seconds(20) + chrono::milliseconds(3000));
2486 EXPECT_EQ(output1[3].monotonic_remote_time.boot, 0u);
2487 EXPECT_EQ(output1[3].monotonic_remote_time.time,
2488 e + chrono::milliseconds(3000));
2489 EXPECT_EQ(output1[3].remote_queue_index,
2490 (BootQueueIndex{.boot = 0u, .index = 2u}));
2491 EXPECT_EQ(output1[3].monotonic_timestamp_time.boot, 0u);
2492 EXPECT_EQ(output1[3].monotonic_timestamp_time.time,
2493 e + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002494 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh58646e22021-08-23 23:51:46 -07002495
Austin Schuh48507722021-07-17 17:29:24 -07002496 LOG(INFO) << output1[0];
2497 LOG(INFO) << output1[1];
2498 LOG(INFO) << output1[2];
Austin Schuh58646e22021-08-23 23:51:46 -07002499 LOG(INFO) << output1[3];
Austin Schuh48507722021-07-17 17:29:24 -07002500 }
2501}
2502
2503TEST_F(RebootTimestampMapperTest, Node2Reboot) {
2504 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2505 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002506 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002507 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002508 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002509 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002510 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002511 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002512 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002513 writer1b.QueueSpan(boot1b_.span());
2514
Austin Schuhd863e6e2022-10-16 15:44:50 -07002515 writer1a.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002516 e + chrono::seconds(100) + chrono::milliseconds(1000), 3, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002517 writer0a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002518 e + chrono::seconds(100) + chrono::milliseconds(1000), 3,
2519 chrono::seconds(-100),
2520 e + chrono::seconds(100) + chrono::milliseconds(1001)));
2521
Austin Schuhd863e6e2022-10-16 15:44:50 -07002522 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002523 e + chrono::seconds(20) + chrono::milliseconds(2000), 3, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002524 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002525 e + chrono::seconds(20) + chrono::milliseconds(2000), 3,
2526 chrono::seconds(-20),
2527 e + chrono::seconds(20) + chrono::milliseconds(2001)));
2528
Austin Schuhd863e6e2022-10-16 15:44:50 -07002529 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002530 e + chrono::seconds(20) + chrono::milliseconds(3000), 3, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002531 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002532 e + chrono::seconds(20) + chrono::milliseconds(3000), 3,
2533 chrono::seconds(-20),
2534 e + chrono::seconds(20) + chrono::milliseconds(3001)));
2535 }
2536
2537 const std::vector<LogFile> parts =
2538 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07002539 LogFilesContainer log_files(parts);
Austin Schuh48507722021-07-17 17:29:24 -07002540
2541 for (const auto &x : parts) {
2542 LOG(INFO) << x;
2543 }
2544 ASSERT_EQ(parts.size(), 1u);
2545 ASSERT_EQ(parts[0].logger_node, "pi1");
2546
2547 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07002548 TimestampMapper mapper0("pi1", log_files);
Austin Schuh48507722021-07-17 17:29:24 -07002549 mapper0.set_timestamp_callback(
2550 [&](TimestampedMessage *) { ++mapper0_count; });
2551 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07002552 TimestampMapper mapper1("pi2", log_files);
Austin Schuh48507722021-07-17 17:29:24 -07002553 mapper1.set_timestamp_callback(
2554 [&](TimestampedMessage *) { ++mapper1_count; });
2555
2556 mapper0.AddPeer(&mapper1);
2557 mapper1.AddPeer(&mapper0);
2558
2559 {
2560 std::deque<TimestampedMessage> output0;
2561
2562 EXPECT_EQ(mapper0_count, 0u);
2563 EXPECT_EQ(mapper1_count, 0u);
2564 ASSERT_TRUE(mapper0.Front() != nullptr);
2565 EXPECT_EQ(mapper0_count, 1u);
2566 EXPECT_EQ(mapper1_count, 0u);
2567 output0.emplace_back(std::move(*mapper0.Front()));
2568 mapper0.PopFront();
2569 EXPECT_TRUE(mapper0.started());
2570 EXPECT_EQ(mapper0_count, 1u);
2571 EXPECT_EQ(mapper1_count, 0u);
2572
2573 ASSERT_TRUE(mapper0.Front() != nullptr);
2574 EXPECT_EQ(mapper0_count, 2u);
2575 EXPECT_EQ(mapper1_count, 0u);
2576 output0.emplace_back(std::move(*mapper0.Front()));
2577 mapper0.PopFront();
2578 EXPECT_TRUE(mapper0.started());
2579
2580 ASSERT_TRUE(mapper0.Front() != nullptr);
2581 output0.emplace_back(std::move(*mapper0.Front()));
2582 mapper0.PopFront();
2583 EXPECT_TRUE(mapper0.started());
2584
2585 EXPECT_EQ(mapper0_count, 3u);
2586 EXPECT_EQ(mapper1_count, 0u);
2587
2588 ASSERT_TRUE(mapper0.Front() == nullptr);
2589
2590 LOG(INFO) << output0[0];
2591 LOG(INFO) << output0[1];
2592 LOG(INFO) << output0[2];
2593
2594 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2595 EXPECT_EQ(output0[0].monotonic_event_time.time,
2596 e + chrono::milliseconds(1000));
2597 EXPECT_EQ(output0[0].monotonic_remote_time.boot, 0u);
2598 EXPECT_EQ(output0[0].monotonic_remote_time.time,
2599 e + chrono::seconds(100) + chrono::milliseconds(1000));
2600 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
2601 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
2602 e + chrono::seconds(100) + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002603 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002604
2605 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2606 EXPECT_EQ(output0[1].monotonic_event_time.time,
2607 e + chrono::milliseconds(2000));
2608 EXPECT_EQ(output0[1].monotonic_remote_time.boot, 1u);
2609 EXPECT_EQ(output0[1].monotonic_remote_time.time,
2610 e + chrono::seconds(20) + chrono::milliseconds(2000));
2611 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
2612 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
2613 e + chrono::seconds(20) + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002614 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002615
2616 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2617 EXPECT_EQ(output0[2].monotonic_event_time.time,
2618 e + chrono::milliseconds(3000));
2619 EXPECT_EQ(output0[2].monotonic_remote_time.boot, 1u);
2620 EXPECT_EQ(output0[2].monotonic_remote_time.time,
2621 e + chrono::seconds(20) + chrono::milliseconds(3000));
2622 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
2623 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
2624 e + chrono::seconds(20) + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002625 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002626 }
2627
2628 {
2629 SCOPED_TRACE("Trying node1 now");
2630 std::deque<TimestampedMessage> output1;
2631
2632 EXPECT_EQ(mapper0_count, 3u);
2633 EXPECT_EQ(mapper1_count, 0u);
2634
2635 ASSERT_TRUE(mapper1.Front() != nullptr);
2636 EXPECT_EQ(mapper0_count, 3u);
2637 EXPECT_EQ(mapper1_count, 1u);
2638 output1.emplace_back(std::move(*mapper1.Front()));
2639 mapper1.PopFront();
2640 EXPECT_TRUE(mapper1.started());
2641 EXPECT_EQ(mapper0_count, 3u);
2642 EXPECT_EQ(mapper1_count, 1u);
2643
2644 ASSERT_TRUE(mapper1.Front() != nullptr);
2645 EXPECT_EQ(mapper0_count, 3u);
2646 EXPECT_EQ(mapper1_count, 2u);
2647 output1.emplace_back(std::move(*mapper1.Front()));
2648 mapper1.PopFront();
2649 EXPECT_TRUE(mapper1.started());
2650
2651 ASSERT_TRUE(mapper1.Front() != nullptr);
2652 output1.emplace_back(std::move(*mapper1.Front()));
2653 mapper1.PopFront();
2654 EXPECT_TRUE(mapper1.started());
2655
2656 EXPECT_EQ(mapper0_count, 3u);
2657 EXPECT_EQ(mapper1_count, 3u);
2658
2659 ASSERT_TRUE(mapper1.Front() == nullptr);
2660
2661 EXPECT_EQ(mapper0_count, 3u);
2662 EXPECT_EQ(mapper1_count, 3u);
2663
2664 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2665 EXPECT_EQ(output1[0].monotonic_event_time.time,
2666 e + chrono::seconds(100) + chrono::milliseconds(1000));
2667 EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
2668 EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002669 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002670
2671 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2672 EXPECT_EQ(output1[1].monotonic_event_time.time,
2673 e + chrono::seconds(20) + chrono::milliseconds(2000));
2674 EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
2675 EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002676 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002677
2678 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2679 EXPECT_EQ(output1[2].monotonic_event_time.time,
2680 e + chrono::seconds(20) + chrono::milliseconds(3000));
2681 EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
2682 EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002683 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002684
2685 LOG(INFO) << output1[0];
2686 LOG(INFO) << output1[1];
2687 LOG(INFO) << output1[2];
2688 }
2689}
2690
Austin Schuh44c61472021-11-22 21:04:10 -08002691class SortingDeathTest : public SortingElementTest {
2692 public:
2693 SortingDeathTest()
2694 : SortingElementTest(),
2695 part0_(MakeHeader(config_, R"({
2696 /* 100ms */
2697 "max_out_of_order_duration": 100000000,
2698 "node": {
2699 "name": "pi1"
2700 },
2701 "logger_node": {
2702 "name": "pi1"
2703 },
2704 "monotonic_start_time": 1000000,
2705 "realtime_start_time": 1000000000000,
2706 "logger_monotonic_start_time": 1000000,
2707 "logger_realtime_start_time": 1000000000000,
2708 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2709 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2710 "parts_index": 0,
2711 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2712 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2713 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2714 "boot_uuids": [
2715 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2716 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2717 ""
2718 ],
2719 "oldest_remote_monotonic_timestamps": [
2720 9223372036854775807,
2721 9223372036854775807,
2722 9223372036854775807
2723 ],
2724 "oldest_local_monotonic_timestamps": [
2725 9223372036854775807,
2726 9223372036854775807,
2727 9223372036854775807
2728 ],
2729 "oldest_remote_unreliable_monotonic_timestamps": [
2730 9223372036854775807,
2731 0,
2732 9223372036854775807
2733 ],
2734 "oldest_local_unreliable_monotonic_timestamps": [
2735 9223372036854775807,
2736 0,
2737 9223372036854775807
2738 ]
2739})")),
2740 part1_(MakeHeader(config_, R"({
2741 /* 100ms */
2742 "max_out_of_order_duration": 100000000,
2743 "node": {
2744 "name": "pi1"
2745 },
2746 "logger_node": {
2747 "name": "pi1"
2748 },
2749 "monotonic_start_time": 1000000,
2750 "realtime_start_time": 1000000000000,
2751 "logger_monotonic_start_time": 1000000,
2752 "logger_realtime_start_time": 1000000000000,
2753 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2754 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2755 "parts_index": 1,
2756 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2757 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2758 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2759 "boot_uuids": [
2760 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2761 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2762 ""
2763 ],
2764 "oldest_remote_monotonic_timestamps": [
2765 9223372036854775807,
2766 9223372036854775807,
2767 9223372036854775807
2768 ],
2769 "oldest_local_monotonic_timestamps": [
2770 9223372036854775807,
2771 9223372036854775807,
2772 9223372036854775807
2773 ],
2774 "oldest_remote_unreliable_monotonic_timestamps": [
2775 9223372036854775807,
2776 100000,
2777 9223372036854775807
2778 ],
2779 "oldest_local_unreliable_monotonic_timestamps": [
2780 9223372036854775807,
2781 100000,
2782 9223372036854775807
2783 ]
2784})")),
2785 part2_(MakeHeader(config_, R"({
2786 /* 100ms */
2787 "max_out_of_order_duration": 100000000,
2788 "node": {
2789 "name": "pi1"
2790 },
2791 "logger_node": {
2792 "name": "pi1"
2793 },
2794 "monotonic_start_time": 1000000,
2795 "realtime_start_time": 1000000000000,
2796 "logger_monotonic_start_time": 1000000,
2797 "logger_realtime_start_time": 1000000000000,
2798 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2799 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2800 "parts_index": 2,
2801 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2802 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2803 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2804 "boot_uuids": [
2805 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2806 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2807 ""
2808 ],
2809 "oldest_remote_monotonic_timestamps": [
2810 9223372036854775807,
2811 9223372036854775807,
2812 9223372036854775807
2813 ],
2814 "oldest_local_monotonic_timestamps": [
2815 9223372036854775807,
2816 9223372036854775807,
2817 9223372036854775807
2818 ],
2819 "oldest_remote_unreliable_monotonic_timestamps": [
2820 9223372036854775807,
2821 200000,
2822 9223372036854775807
2823 ],
2824 "oldest_local_unreliable_monotonic_timestamps": [
2825 9223372036854775807,
2826 200000,
2827 9223372036854775807
2828 ]
2829})")),
2830 part3_(MakeHeader(config_, R"({
2831 /* 100ms */
2832 "max_out_of_order_duration": 100000000,
2833 "node": {
2834 "name": "pi1"
2835 },
2836 "logger_node": {
2837 "name": "pi1"
2838 },
2839 "monotonic_start_time": 1000000,
2840 "realtime_start_time": 1000000000000,
2841 "logger_monotonic_start_time": 1000000,
2842 "logger_realtime_start_time": 1000000000000,
2843 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2844 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2845 "parts_index": 3,
2846 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2847 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2848 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2849 "boot_uuids": [
2850 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2851 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2852 ""
2853 ],
2854 "oldest_remote_monotonic_timestamps": [
2855 9223372036854775807,
2856 9223372036854775807,
2857 9223372036854775807
2858 ],
2859 "oldest_local_monotonic_timestamps": [
2860 9223372036854775807,
2861 9223372036854775807,
2862 9223372036854775807
2863 ],
2864 "oldest_remote_unreliable_monotonic_timestamps": [
2865 9223372036854775807,
2866 300000,
2867 9223372036854775807
2868 ],
2869 "oldest_local_unreliable_monotonic_timestamps": [
2870 9223372036854775807,
2871 300000,
2872 9223372036854775807
2873 ]
2874})")) {}
2875
2876 protected:
2877 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part0_;
2878 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part1_;
2879 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part2_;
2880 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part3_;
2881};
2882
2883// Tests that if 2 computers go back and forth trying to be the same node, we
2884// die in sorting instead of failing to estimate time.
2885TEST_F(SortingDeathTest, FightingNodes) {
2886 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002887 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh44c61472021-11-22 21:04:10 -08002888 writer0.QueueSpan(part0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002889 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh44c61472021-11-22 21:04:10 -08002890 writer1.QueueSpan(part1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002891 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuh44c61472021-11-22 21:04:10 -08002892 writer2.QueueSpan(part2_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002893 TestDetachedBufferWriter writer3(logfile3_);
Austin Schuh44c61472021-11-22 21:04:10 -08002894 writer3.QueueSpan(part3_.span());
2895 }
2896
2897 EXPECT_DEATH(
2898 {
2899 const std::vector<LogFile> parts =
2900 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2901 },
Austin Schuh22cf7862022-09-19 19:09:42 -07002902 "found overlapping boots on");
Austin Schuh44c61472021-11-22 21:04:10 -08002903}
2904
Brian Smarttea913d42021-12-10 15:02:38 -08002905// Tests that we MessageReader blows up on a bad message.
2906TEST(MessageReaderConfirmCrash, ReadWrite) {
2907 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
2908 unlink(logfile.c_str());
2909
2910 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
2911 JsonToSizedFlatbuffer<LogFileHeader>(
2912 R"({ "max_out_of_order_duration": 100000000 })");
2913 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
2914 JsonToSizedFlatbuffer<MessageHeader>(
2915 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
2916 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
2917 JsonToSizedFlatbuffer<MessageHeader>(
2918 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
2919 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m4 =
2920 JsonToSizedFlatbuffer<MessageHeader>(
2921 R"({ "channel_index": 0, "monotonic_sent_time": 4 })");
2922
2923 // Starts out like a proper flat buffer header, but it breaks down ...
2924 std::vector<uint8_t> garbage{8, 0, 0, 0, 16, 0, 0, 0, 4, 0, 0, 0};
2925 absl::Span<uint8_t> m3_span(garbage);
2926
2927 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002928 TestDetachedBufferWriter writer(logfile);
Brian Smarttea913d42021-12-10 15:02:38 -08002929 writer.QueueSpan(config.span());
2930 writer.QueueSpan(m1.span());
2931 writer.QueueSpan(m2.span());
2932 writer.QueueSpan(m3_span);
2933 writer.QueueSpan(m4.span()); // This message is "hidden"
2934 }
2935
2936 {
2937 MessageReader reader(logfile);
2938
2939 EXPECT_EQ(reader.filename(), logfile);
2940
2941 EXPECT_EQ(
2942 reader.max_out_of_order_duration(),
2943 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2944 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2945 EXPECT_TRUE(reader.ReadMessage());
2946 EXPECT_EQ(reader.newest_timestamp(),
2947 monotonic_clock::time_point(chrono::nanoseconds(1)));
2948 EXPECT_TRUE(reader.ReadMessage());
2949 EXPECT_EQ(reader.newest_timestamp(),
2950 monotonic_clock::time_point(chrono::nanoseconds(2)));
2951 // Confirm default crashing behavior
2952 EXPECT_DEATH(reader.ReadMessage(), "Corrupted message at offset");
2953 }
2954
2955 {
2956 gflags::FlagSaver fs;
2957
2958 MessageReader reader(logfile);
2959 reader.set_crash_on_corrupt_message_flag(false);
2960
2961 EXPECT_EQ(reader.filename(), logfile);
2962
2963 EXPECT_EQ(
2964 reader.max_out_of_order_duration(),
2965 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2966 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2967 EXPECT_TRUE(reader.ReadMessage());
2968 EXPECT_EQ(reader.newest_timestamp(),
2969 monotonic_clock::time_point(chrono::nanoseconds(1)));
2970 EXPECT_TRUE(reader.ReadMessage());
2971 EXPECT_EQ(reader.newest_timestamp(),
2972 monotonic_clock::time_point(chrono::nanoseconds(2)));
2973 // Confirm avoiding the corrupted message crash, stopping instead.
2974 EXPECT_FALSE(reader.ReadMessage());
2975 }
2976
2977 {
2978 gflags::FlagSaver fs;
2979
2980 MessageReader reader(logfile);
2981 reader.set_crash_on_corrupt_message_flag(false);
2982 reader.set_ignore_corrupt_messages_flag(true);
2983
2984 EXPECT_EQ(reader.filename(), logfile);
2985
2986 EXPECT_EQ(
2987 reader.max_out_of_order_duration(),
2988 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2989 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2990 EXPECT_TRUE(reader.ReadMessage());
2991 EXPECT_EQ(reader.newest_timestamp(),
2992 monotonic_clock::time_point(chrono::nanoseconds(1)));
2993 EXPECT_TRUE(reader.ReadMessage());
2994 EXPECT_EQ(reader.newest_timestamp(),
2995 monotonic_clock::time_point(chrono::nanoseconds(2)));
2996 // Confirm skipping of the corrupted message to read the hidden one.
2997 EXPECT_TRUE(reader.ReadMessage());
2998 EXPECT_EQ(reader.newest_timestamp(),
2999 monotonic_clock::time_point(chrono::nanoseconds(4)));
3000 EXPECT_FALSE(reader.ReadMessage());
3001 }
3002}
3003
Austin Schuhfa30c352022-10-16 11:12:02 -07003004class InlinePackMessage : public ::testing::Test {
3005 protected:
3006 aos::Context RandomContext() {
3007 data_ = RandomData();
3008 std::uniform_int_distribution<uint32_t> uint32_distribution(
3009 std::numeric_limits<uint32_t>::min(),
3010 std::numeric_limits<uint32_t>::max());
3011
3012 std::uniform_int_distribution<int64_t> time_distribution(
3013 std::numeric_limits<int64_t>::min(),
3014 std::numeric_limits<int64_t>::max());
3015
3016 aos::Context context;
3017 context.monotonic_event_time =
3018 aos::monotonic_clock::epoch() +
3019 chrono::nanoseconds(time_distribution(random_number_generator_));
3020 context.realtime_event_time =
3021 aos::realtime_clock::epoch() +
3022 chrono::nanoseconds(time_distribution(random_number_generator_));
3023
3024 context.monotonic_remote_time =
3025 aos::monotonic_clock::epoch() +
3026 chrono::nanoseconds(time_distribution(random_number_generator_));
3027 context.realtime_remote_time =
3028 aos::realtime_clock::epoch() +
3029 chrono::nanoseconds(time_distribution(random_number_generator_));
3030
3031 context.queue_index = uint32_distribution(random_number_generator_);
3032 context.remote_queue_index = uint32_distribution(random_number_generator_);
3033 context.size = data_.size();
3034 context.data = data_.data();
3035 return context;
3036 }
3037
Austin Schuhf2d0e682022-10-16 14:20:58 -07003038 aos::monotonic_clock::time_point RandomMonotonic() {
3039 std::uniform_int_distribution<int64_t> time_distribution(
3040 0, std::numeric_limits<int64_t>::max());
3041 return aos::monotonic_clock::epoch() +
3042 chrono::nanoseconds(time_distribution(random_number_generator_));
3043 }
3044
3045 aos::SizePrefixedFlatbufferDetachedBuffer<message_bridge::RemoteMessage>
3046 RandomRemoteMessage() {
3047 std::uniform_int_distribution<uint8_t> uint8_distribution(
3048 std::numeric_limits<uint8_t>::min(),
3049 std::numeric_limits<uint8_t>::max());
3050
3051 std::uniform_int_distribution<int64_t> time_distribution(
3052 std::numeric_limits<int64_t>::min(),
3053 std::numeric_limits<int64_t>::max());
3054
3055 flatbuffers::FlatBufferBuilder fbb;
3056 message_bridge::RemoteMessage::Builder builder(fbb);
3057 builder.add_queue_index(uint8_distribution(random_number_generator_));
3058
3059 builder.add_monotonic_sent_time(
3060 time_distribution(random_number_generator_));
3061 builder.add_realtime_sent_time(time_distribution(random_number_generator_));
3062 builder.add_monotonic_remote_time(
3063 time_distribution(random_number_generator_));
3064 builder.add_realtime_remote_time(
3065 time_distribution(random_number_generator_));
3066
3067 builder.add_remote_queue_index(
3068 uint8_distribution(random_number_generator_));
3069
3070 fbb.FinishSizePrefixed(builder.Finish());
3071 return fbb.Release();
3072 }
3073
Austin Schuhfa30c352022-10-16 11:12:02 -07003074 std::vector<uint8_t> RandomData() {
3075 std::vector<uint8_t> result;
3076 std::uniform_int_distribution<int> length_distribution(1, 32);
3077 std::uniform_int_distribution<uint8_t> data_distribution(
3078 std::numeric_limits<uint8_t>::min(),
3079 std::numeric_limits<uint8_t>::max());
3080
3081 const size_t length = length_distribution(random_number_generator_);
3082
3083 result.reserve(length);
3084 for (size_t i = 0; i < length; ++i) {
3085 result.emplace_back(data_distribution(random_number_generator_));
3086 }
3087 return result;
3088 }
3089
3090 std::mt19937 random_number_generator_{
3091 std::mt19937(::aos::testing::RandomSeed())};
3092
3093 std::vector<uint8_t> data_;
3094};
3095
3096// Uses the binary schema to annotate a provided flatbuffer. Returns the
3097// annotated flatbuffer.
3098std::string AnnotateBinaries(
3099 const aos::NonSizePrefixedFlatbuffer<reflection::Schema> &schema,
3100 const std::string &schema_filename,
3101 flatbuffers::span<uint8_t> binary_data) {
3102 flatbuffers::BinaryAnnotator binary_annotator(
3103 schema.span().data(), schema.span().size(), binary_data.data(),
3104 binary_data.size());
3105
3106 auto annotations = binary_annotator.Annotate();
3107
3108 flatbuffers::AnnotatedBinaryTextGenerator text_generator(
3109 flatbuffers::AnnotatedBinaryTextGenerator::Options{}, annotations,
3110 binary_data.data(), binary_data.size());
3111
3112 text_generator.Generate(aos::testing::TestTmpDir() + "/foo.bfbs",
3113 schema_filename);
3114
3115 return aos::util::ReadFileToStringOrDie(aos::testing::TestTmpDir() +
3116 "/foo.afb");
3117}
3118
Austin Schuh71a40d42023-02-04 21:22:22 -08003119// Event loop which just has working time functions for the Copier classes
3120// tested below.
3121class TimeEventLoop : public EventLoop {
3122 public:
3123 TimeEventLoop() : EventLoop(nullptr) {}
3124
3125 aos::monotonic_clock::time_point monotonic_now() const final {
3126 return aos::monotonic_clock::min_time;
3127 }
3128 realtime_clock::time_point realtime_now() const final {
3129 return aos::realtime_clock::min_time;
3130 }
3131
3132 void OnRun(::std::function<void()> /*on_run*/) final { LOG(FATAL); }
3133
3134 const std::string_view name() const final { return "time"; }
3135 const Node *node() const final { return nullptr; }
3136
3137 void SetRuntimeAffinity(const cpu_set_t & /*cpuset*/) final { LOG(FATAL); }
3138 void SetRuntimeRealtimePriority(int /*priority*/) final { LOG(FATAL); }
3139
3140 const cpu_set_t &runtime_affinity() const final {
3141 LOG(FATAL);
3142 return cpuset_;
3143 }
3144
3145 TimerHandler *AddTimer(::std::function<void()> /*callback*/) final {
3146 LOG(FATAL);
3147 return nullptr;
3148 }
3149
3150 std::unique_ptr<RawSender> MakeRawSender(const Channel * /*channel*/) final {
3151 LOG(FATAL);
3152 return std::unique_ptr<RawSender>();
3153 }
3154
3155 const UUID &boot_uuid() const final {
3156 LOG(FATAL);
3157 return boot_uuid_;
3158 }
3159
3160 void set_name(const std::string_view name) final { LOG(FATAL) << name; }
3161
3162 pid_t GetTid() final {
3163 LOG(FATAL);
3164 return 0;
3165 }
3166
3167 int NumberBuffers(const Channel * /*channel*/) final {
3168 LOG(FATAL);
3169 return 0;
3170 }
3171
3172 int runtime_realtime_priority() const final {
3173 LOG(FATAL);
3174 return 0;
3175 }
3176
3177 std::unique_ptr<RawFetcher> MakeRawFetcher(
3178 const Channel * /*channel*/) final {
3179 LOG(FATAL);
3180 return std::unique_ptr<RawFetcher>();
3181 }
3182
3183 PhasedLoopHandler *AddPhasedLoop(
3184 ::std::function<void(int)> /*callback*/,
3185 const monotonic_clock::duration /*interval*/,
3186 const monotonic_clock::duration /*offset*/) final {
3187 LOG(FATAL);
3188 return nullptr;
3189 }
3190
3191 void MakeRawWatcher(
3192 const Channel * /*channel*/,
3193 std::function<void(const Context &context, const void *message)>
3194 /*watcher*/) final {
3195 LOG(FATAL);
3196 }
3197
3198 private:
3199 const cpu_set_t cpuset_ = DefaultAffinity();
3200 UUID boot_uuid_ = UUID ::Zero();
3201};
3202
Austin Schuhfa30c352022-10-16 11:12:02 -07003203// Tests that all variations of PackMessage are equivalent to the inline
3204// PackMessage used to avoid allocations.
3205TEST_F(InlinePackMessage, Equivilent) {
3206 std::uniform_int_distribution<uint32_t> uint32_distribution(
3207 std::numeric_limits<uint32_t>::min(),
3208 std::numeric_limits<uint32_t>::max());
3209 aos::FlatbufferVector<reflection::Schema> schema =
3210 FileToFlatbuffer<reflection::Schema>(
3211 ArtifactPath("aos/events/logging/logger.bfbs"));
3212
3213 for (const LogType type :
3214 {LogType::kLogMessage, LogType::kLogDeliveryTimeOnly,
3215 LogType::kLogMessageAndDeliveryTime, LogType::kLogRemoteMessage}) {
3216 for (int i = 0; i < 100; ++i) {
3217 aos::Context context = RandomContext();
3218 const uint32_t channel_index =
3219 uint32_distribution(random_number_generator_);
3220
3221 flatbuffers::FlatBufferBuilder fbb;
3222 fbb.ForceDefaults(true);
3223 fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index, type));
3224
3225 VLOG(1) << absl::BytesToHexString(std::string_view(
3226 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3227 fbb.GetBufferSpan().size()));
3228
3229 // Make sure that both the builder and inline method agree on sizes.
Austin Schuh48d10d62022-10-16 22:19:23 -07003230 ASSERT_EQ(fbb.GetSize(), PackMessageSize(type, context.size))
Austin Schuhfa30c352022-10-16 11:12:02 -07003231 << "log type " << static_cast<int>(type);
3232
3233 // Initialize the buffer to something nonzero to make sure all the padding
3234 // bytes are set to 0.
Austin Schuh48d10d62022-10-16 22:19:23 -07003235 std::vector<uint8_t> repacked_message(PackMessageSize(type, context.size),
3236 67);
Austin Schuhfa30c352022-10-16 11:12:02 -07003237
3238 // And verify packing inline works as expected.
Austin Schuh71a40d42023-02-04 21:22:22 -08003239 EXPECT_EQ(
3240 repacked_message.size(),
3241 PackMessageInline(repacked_message.data(), context, channel_index,
3242 type, 0u, repacked_message.size()));
Austin Schuhfa30c352022-10-16 11:12:02 -07003243 EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
3244 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3245 fbb.GetBufferSpan().size()))
3246 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
3247 fbb.GetBufferSpan());
Austin Schuh71a40d42023-02-04 21:22:22 -08003248
3249 // Ok, now we want to confirm that we can build up arbitrary pieces of
3250 // said flatbuffer. Try all of them since it is cheap.
3251 TimeEventLoop event_loop;
3252 for (size_t i = 0; i < repacked_message.size(); i += 8) {
3253 for (size_t j = i; j < repacked_message.size(); j += 8) {
3254 std::vector<uint8_t> destination(repacked_message.size(), 67u);
3255 ContextDataCopier copier(context, channel_index, type, &event_loop);
3256
3257 copier.Copy(destination.data(), i, j);
3258
3259 size_t index = 0;
3260 for (size_t k = i; k < j; ++k) {
3261 ASSERT_EQ(destination[index], repacked_message[k])
3262 << ": Failed to match type " << static_cast<int>(type)
3263 << ", index " << index << " while testing range " << i << " to "
3264 << j;
3265 ;
3266 ++index;
3267 }
3268 // Now, confirm that none of the other bytes have been touched.
3269 for (; index < destination.size(); ++index) {
3270 ASSERT_EQ(destination[index], 67u);
3271 }
3272 }
3273 }
Austin Schuhfa30c352022-10-16 11:12:02 -07003274 }
3275 }
3276}
3277
Austin Schuhf2d0e682022-10-16 14:20:58 -07003278// Tests that all variations of PackMessage are equivilent to the inline
3279// PackMessage used to avoid allocations.
3280TEST_F(InlinePackMessage, RemoteEquivilent) {
3281 aos::FlatbufferVector<reflection::Schema> schema =
3282 FileToFlatbuffer<reflection::Schema>(
3283 ArtifactPath("aos/events/logging/logger.bfbs"));
3284 std::uniform_int_distribution<uint8_t> uint8_distribution(
3285 std::numeric_limits<uint8_t>::min(), std::numeric_limits<uint8_t>::max());
3286
3287 for (int i = 0; i < 100; ++i) {
3288 aos::SizePrefixedFlatbufferDetachedBuffer<RemoteMessage> random_msg =
3289 RandomRemoteMessage();
3290 const size_t channel_index = uint8_distribution(random_number_generator_);
3291 const monotonic_clock::time_point monotonic_timestamp_time =
3292 RandomMonotonic();
3293
3294 flatbuffers::FlatBufferBuilder fbb;
3295 fbb.ForceDefaults(true);
3296 fbb.FinishSizePrefixed(PackRemoteMessage(
3297 &fbb, &random_msg.message(), channel_index, monotonic_timestamp_time));
3298
3299 VLOG(1) << absl::BytesToHexString(std::string_view(
3300 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3301 fbb.GetBufferSpan().size()));
3302
3303 // Make sure that both the builder and inline method agree on sizes.
3304 ASSERT_EQ(fbb.GetSize(), PackRemoteMessageSize());
3305
3306 // Initialize the buffer to something nonzer to make sure all the padding
3307 // bytes are set to 0.
3308 std::vector<uint8_t> repacked_message(PackRemoteMessageSize(), 67);
3309
3310 // And verify packing inline works as expected.
Austin Schuh71a40d42023-02-04 21:22:22 -08003311 EXPECT_EQ(repacked_message.size(),
3312 PackRemoteMessageInline(
3313 repacked_message.data(), &random_msg.message(), channel_index,
3314 monotonic_timestamp_time, 0u, repacked_message.size()));
Austin Schuhf2d0e682022-10-16 14:20:58 -07003315 EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
3316 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3317 fbb.GetBufferSpan().size()))
3318 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
3319 fbb.GetBufferSpan());
Austin Schuh71a40d42023-02-04 21:22:22 -08003320
3321 // Ok, now we want to confirm that we can build up arbitrary pieces of said
3322 // flatbuffer. Try all of them since it is cheap.
3323 TimeEventLoop event_loop;
3324 for (size_t i = 0; i < repacked_message.size(); i += 8) {
3325 for (size_t j = i; j < repacked_message.size(); j += 8) {
3326 std::vector<uint8_t> destination(repacked_message.size(), 67u);
3327 RemoteMessageCopier copier(&random_msg.message(), channel_index,
3328 monotonic_timestamp_time, &event_loop);
3329
3330 copier.Copy(destination.data(), i, j);
3331
3332 size_t index = 0;
3333 for (size_t k = i; k < j; ++k) {
3334 ASSERT_EQ(destination[index], repacked_message[k]);
3335 ++index;
3336 }
3337 for (; index < destination.size(); ++index) {
3338 ASSERT_EQ(destination[index], 67u);
3339 }
3340 }
3341 }
Austin Schuhf2d0e682022-10-16 14:20:58 -07003342 }
3343}
Austin Schuhfa30c352022-10-16 11:12:02 -07003344
Austin Schuhc243b422020-10-11 15:35:08 -07003345} // namespace testing
3346} // namespace logger
3347} // namespace aos