blob: eaeb4b7be5d7b3c55cdfaa80eace233b23437934 [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
Alexei Strots01395492023-03-20 13:59:56 -07004#include <filesystem>
Austin Schuhfa30c352022-10-16 11:12:02 -07005#include <random>
Austin Schuhe243aaf2020-10-11 15:46:02 -07006#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07007
Austin Schuhfa30c352022-10-16 11:12:02 -07008#include "absl/strings/escaping.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07009#include "external/com_github_google_flatbuffers/src/annotated_binary_text_gen.h"
10#include "external/com_github_google_flatbuffers/src/binary_annotator.h"
11#include "flatbuffers/reflection_generated.h"
12#include "gflags/gflags.h"
13#include "gtest/gtest.h"
14
Austin Schuhc41603c2020-10-11 16:17:37 -070015#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -070016#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -080017#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070018#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070019#include "aos/json_to_flatbuffer.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070020#include "aos/testing/path.h"
21#include "aos/testing/random_seed.h"
Austin Schuhc243b422020-10-11 15:35:08 -070022#include "aos/testing/tmpdir.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070023#include "aos/util/file.h"
Austin Schuhc243b422020-10-11 15:35:08 -070024
25namespace aos {
26namespace logger {
27namespace testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070028namespace chrono = std::chrono;
Austin Schuhf2d0e682022-10-16 14:20:58 -070029using aos::message_bridge::RemoteMessage;
Eric Schmiedebergb38477e2022-12-02 16:08:04 -070030using aos::testing::ArtifactPath;
Austin Schuhc243b422020-10-11 15:35:08 -070031
Austin Schuhd863e6e2022-10-16 15:44:50 -070032// Adapter class to make it easy to test DetachedBufferWriter without adding
33// test only boilerplate to DetachedBufferWriter.
Alexei Strots15c22b12023-04-04 16:27:17 -070034class TestDetachedBufferWriter : public FileBackend,
35 public DetachedBufferWriter {
Austin Schuhd863e6e2022-10-16 15:44:50 -070036 public:
Austin Schuh48d10d62022-10-16 22:19:23 -070037 // Pick a max size that is rather conservative.
38 static constexpr size_t kMaxMessageSize = 128 * 1024;
Austin Schuhd863e6e2022-10-16 15:44:50 -070039 TestDetachedBufferWriter(std::string_view filename)
Alexei Strots15c22b12023-04-04 16:27:17 -070040 : FileBackend("/"),
41 DetachedBufferWriter(FileBackend::RequestFile(filename),
42 std::make_unique<DummyEncoder>(kMaxMessageSize)) {}
Austin Schuhd863e6e2022-10-16 15:44:50 -070043 void WriteSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
44 QueueSpan(absl::Span<const uint8_t>(buffer.data(), buffer.size()));
45 }
Austin Schuh7ef11a42023-02-04 17:15:12 -080046 void QueueSpan(absl::Span<const uint8_t> buffer) {
47 DataEncoder::SpanCopier coppier(buffer);
48 CopyMessage(&coppier, monotonic_clock::now());
49 }
Austin Schuhd863e6e2022-10-16 15:44:50 -070050};
51
Austin Schuhe243aaf2020-10-11 15:46:02 -070052// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070053template <typename T>
54SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
55 const std::string_view data) {
56 flatbuffers::FlatBufferBuilder fbb;
57 fbb.ForceDefaults(true);
58 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
59 return fbb.Release();
60}
61
Austin Schuhe243aaf2020-10-11 15:46:02 -070062// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070063TEST(SpanReaderTest, ReadWrite) {
64 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
65 unlink(logfile.c_str());
66
67 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080068 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070069 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080070 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070071
72 {
Austin Schuhd863e6e2022-10-16 15:44:50 -070073 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -080074 writer.QueueSpan(m1.span());
75 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070076 }
77
78 SpanReader reader(logfile);
79
80 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhcf5f6442021-07-06 10:43:28 -070081 EXPECT_EQ(reader.PeekMessage(), m1.span());
82 EXPECT_EQ(reader.PeekMessage(), m1.span());
Austin Schuhadd6eb32020-11-09 21:24:26 -080083 EXPECT_EQ(reader.ReadMessage(), m1.span());
84 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhcf5f6442021-07-06 10:43:28 -070085 EXPECT_EQ(reader.PeekMessage(), absl::Span<const uint8_t>());
Austin Schuhc243b422020-10-11 15:35:08 -070086 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
87}
88
Austin Schuhe243aaf2020-10-11 15:46:02 -070089// Tests that we can actually parse the resulting messages at a basic level
90// through MessageReader.
91TEST(MessageReaderTest, ReadWrite) {
92 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
93 unlink(logfile.c_str());
94
95 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
96 JsonToSizedFlatbuffer<LogFileHeader>(
97 R"({ "max_out_of_order_duration": 100000000 })");
98 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
99 JsonToSizedFlatbuffer<MessageHeader>(
100 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
101 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
102 JsonToSizedFlatbuffer<MessageHeader>(
103 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
104
105 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700106 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800107 writer.QueueSpan(config.span());
108 writer.QueueSpan(m1.span());
109 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -0700110 }
111
112 MessageReader reader(logfile);
113
114 EXPECT_EQ(reader.filename(), logfile);
115
116 EXPECT_EQ(
117 reader.max_out_of_order_duration(),
118 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
119 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
120 EXPECT_TRUE(reader.ReadMessage());
121 EXPECT_EQ(reader.newest_timestamp(),
122 monotonic_clock::time_point(chrono::nanoseconds(1)));
123 EXPECT_TRUE(reader.ReadMessage());
124 EXPECT_EQ(reader.newest_timestamp(),
125 monotonic_clock::time_point(chrono::nanoseconds(2)));
126 EXPECT_FALSE(reader.ReadMessage());
127}
128
Austin Schuh32f68492020-11-08 21:45:51 -0800129// Tests that we explode when messages are too far out of order.
130TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
131 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
132 unlink(logfile0.c_str());
133
134 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
135 JsonToSizedFlatbuffer<LogFileHeader>(
136 R"({
137 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800138 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800139 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
140 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
141 "parts_index": 0
142})");
143
144 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
145 JsonToSizedFlatbuffer<MessageHeader>(
146 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
147 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
148 JsonToSizedFlatbuffer<MessageHeader>(
149 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
150 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
151 JsonToSizedFlatbuffer<MessageHeader>(
152 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
153
154 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700155 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800156 writer.QueueSpan(config0.span());
157 writer.QueueSpan(m1.span());
158 writer.QueueSpan(m2.span());
159 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800160 }
Alexei Strots01395492023-03-20 13:59:56 -0700161 ASSERT_TRUE(std::filesystem::exists(logfile0)) << logfile0;
Austin Schuh32f68492020-11-08 21:45:51 -0800162
163 const std::vector<LogFile> parts = SortParts({logfile0});
164
165 PartsMessageReader reader(parts[0].parts[0]);
166
167 EXPECT_TRUE(reader.ReadMessage());
168 EXPECT_TRUE(reader.ReadMessage());
169 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
170}
171
Austin Schuhc41603c2020-10-11 16:17:37 -0700172// Tests that we can transparently re-assemble part files with a
173// PartsMessageReader.
174TEST(PartsMessageReaderTest, ReadWrite) {
175 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
176 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
177 unlink(logfile0.c_str());
178 unlink(logfile1.c_str());
179
180 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
181 JsonToSizedFlatbuffer<LogFileHeader>(
182 R"({
183 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800184 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700185 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
186 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
187 "parts_index": 0
188})");
189 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
190 JsonToSizedFlatbuffer<LogFileHeader>(
191 R"({
192 "max_out_of_order_duration": 200000000,
193 "monotonic_start_time": 0,
194 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800195 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700196 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
197 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
198 "parts_index": 1
199})");
200
201 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
202 JsonToSizedFlatbuffer<MessageHeader>(
203 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
204 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
205 JsonToSizedFlatbuffer<MessageHeader>(
206 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
207
208 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700209 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800210 writer.QueueSpan(config0.span());
211 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700212 }
213 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700214 TestDetachedBufferWriter writer(logfile1);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800215 writer.QueueSpan(config1.span());
216 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700217 }
218
219 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
220
221 PartsMessageReader reader(parts[0].parts[0]);
222
223 EXPECT_EQ(reader.filename(), logfile0);
224
225 // Confirm that the timestamps track, and the filename also updates.
226 // Read the first message.
227 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
228 EXPECT_EQ(
229 reader.max_out_of_order_duration(),
230 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
231 EXPECT_TRUE(reader.ReadMessage());
232 EXPECT_EQ(reader.filename(), logfile0);
233 EXPECT_EQ(reader.newest_timestamp(),
234 monotonic_clock::time_point(chrono::nanoseconds(1)));
235 EXPECT_EQ(
236 reader.max_out_of_order_duration(),
237 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
238
239 // Read the second message.
240 EXPECT_TRUE(reader.ReadMessage());
241 EXPECT_EQ(reader.filename(), logfile1);
242 EXPECT_EQ(reader.newest_timestamp(),
243 monotonic_clock::time_point(chrono::nanoseconds(2)));
244 EXPECT_EQ(
245 reader.max_out_of_order_duration(),
246 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
247
248 // And then confirm that reading again returns no message.
249 EXPECT_FALSE(reader.ReadMessage());
250 EXPECT_EQ(reader.filename(), logfile1);
251 EXPECT_EQ(
252 reader.max_out_of_order_duration(),
253 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800254 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Austin Schuhc41603c2020-10-11 16:17:37 -0700255}
Austin Schuh32f68492020-11-08 21:45:51 -0800256
Austin Schuh1be0ce42020-11-29 22:43:26 -0800257// Tests that Message's operator < works as expected.
258TEST(MessageTest, Sorting) {
259 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
260
261 Message m1{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700262 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700263 .timestamp =
264 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
Austin Schuh48507722021-07-17 17:29:24 -0700265 .monotonic_remote_boot = 0xffffff,
266 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700267 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800268 Message m2{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700269 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700270 .timestamp =
271 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
Austin Schuh48507722021-07-17 17:29:24 -0700272 .monotonic_remote_boot = 0xffffff,
273 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700274 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800275
276 EXPECT_LT(m1, m2);
277 EXPECT_GE(m2, m1);
278
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700279 m1.timestamp.time = e;
280 m2.timestamp.time = e;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800281
282 m1.channel_index = 1;
283 m2.channel_index = 2;
284
285 EXPECT_LT(m1, m2);
286 EXPECT_GE(m2, m1);
287
288 m1.channel_index = 0;
289 m2.channel_index = 0;
Austin Schuh58646e22021-08-23 23:51:46 -0700290 m1.queue_index.index = 0u;
291 m2.queue_index.index = 1u;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800292
293 EXPECT_LT(m1, m2);
294 EXPECT_GE(m2, m1);
295}
296
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800297aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
298 const aos::FlatbufferDetachedBuffer<Configuration> &config,
299 const std::string_view json) {
300 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh1034a832021-03-31 21:53:26 -0700301 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800302 flatbuffers::Offset<Configuration> config_offset =
303 aos::CopyFlatBuffer(config, &fbb);
304 LogFileHeader::Builder header_builder(fbb);
305 header_builder.add_configuration(config_offset);
306 fbb.Finish(header_builder.Finish());
307 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
308
309 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
310 JsonToFlatbuffer<LogFileHeader>(json));
311 CHECK(header_updates.Verify());
312 flatbuffers::FlatBufferBuilder fbb2;
Austin Schuh1034a832021-03-31 21:53:26 -0700313 fbb2.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800314 fbb2.FinishSizePrefixed(
315 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
316 return fbb2.Release();
317}
318
319class SortingElementTest : public ::testing::Test {
320 public:
321 SortingElementTest()
322 : config_(JsonToFlatbuffer<Configuration>(
323 R"({
324 "channels": [
325 {
326 "name": "/a",
327 "type": "aos.logger.testing.TestMessage",
328 "source_node": "pi1",
329 "destination_nodes": [
330 {
331 "name": "pi2"
332 },
333 {
334 "name": "pi3"
335 }
336 ]
337 },
338 {
339 "name": "/b",
340 "type": "aos.logger.testing.TestMessage",
341 "source_node": "pi1"
342 },
343 {
344 "name": "/c",
345 "type": "aos.logger.testing.TestMessage",
346 "source_node": "pi1"
Austin Schuh48507722021-07-17 17:29:24 -0700347 },
348 {
349 "name": "/d",
350 "type": "aos.logger.testing.TestMessage",
351 "source_node": "pi2",
352 "destination_nodes": [
353 {
354 "name": "pi1"
355 }
356 ]
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800357 }
358 ],
359 "nodes": [
360 {
361 "name": "pi1"
362 },
363 {
364 "name": "pi2"
365 },
366 {
367 "name": "pi3"
368 }
369 ]
370}
371)")),
372 config0_(MakeHeader(config_, R"({
373 /* 100ms */
374 "max_out_of_order_duration": 100000000,
375 "node": {
376 "name": "pi1"
377 },
378 "logger_node": {
379 "name": "pi1"
380 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800381 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800382 "realtime_start_time": 1000000000000,
383 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700384 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
385 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
386 "boot_uuids": [
387 "1d782c63-b3c7-466e-bea9-a01308b43333",
388 "",
389 ""
390 ],
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800391 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
392 "parts_index": 0
393})")),
394 config1_(MakeHeader(config_,
395 R"({
396 /* 100ms */
397 "max_out_of_order_duration": 100000000,
398 "node": {
399 "name": "pi1"
400 },
401 "logger_node": {
402 "name": "pi1"
403 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800404 "monotonic_start_time": 1000000,
405 "realtime_start_time": 1000000000000,
406 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700407 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
408 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
409 "boot_uuids": [
410 "1d782c63-b3c7-466e-bea9-a01308b43333",
411 "",
412 ""
413 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800414 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
415 "parts_index": 0
416})")),
417 config2_(MakeHeader(config_,
418 R"({
419 /* 100ms */
420 "max_out_of_order_duration": 100000000,
421 "node": {
422 "name": "pi2"
423 },
424 "logger_node": {
425 "name": "pi2"
426 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800427 "monotonic_start_time": 0,
428 "realtime_start_time": 1000000000000,
Austin Schuh48507722021-07-17 17:29:24 -0700429 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
430 "logger_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
431 "boot_uuids": [
432 "",
433 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
434 ""
435 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800436 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
437 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
438 "parts_index": 0
439})")),
440 config3_(MakeHeader(config_,
441 R"({
442 /* 100ms */
443 "max_out_of_order_duration": 100000000,
444 "node": {
445 "name": "pi1"
446 },
447 "logger_node": {
448 "name": "pi1"
449 },
450 "monotonic_start_time": 2000000,
451 "realtime_start_time": 1000000000,
452 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
Austin Schuh48507722021-07-17 17:29:24 -0700453 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
454 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
455 "boot_uuids": [
456 "1d782c63-b3c7-466e-bea9-a01308b43333",
457 "",
458 ""
459 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800460 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800461 "parts_index": 0
Austin Schuh8bf1e632021-01-02 22:41:04 -0800462})")),
463 config4_(MakeHeader(config_,
464 R"({
465 /* 100ms */
466 "max_out_of_order_duration": 100000000,
467 "node": {
468 "name": "pi2"
469 },
470 "logger_node": {
471 "name": "pi1"
472 },
473 "monotonic_start_time": 2000000,
474 "realtime_start_time": 1000000000,
475 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
476 "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
Austin Schuh48507722021-07-17 17:29:24 -0700477 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
478 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
479 "boot_uuids": [
480 "1d782c63-b3c7-466e-bea9-a01308b43333",
481 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
482 ""
483 ],
Austin Schuh8bf1e632021-01-02 22:41:04 -0800484 "parts_index": 0
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800485})")) {
486 unlink(logfile0_.c_str());
487 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800488 unlink(logfile2_.c_str());
Austin Schuh48507722021-07-17 17:29:24 -0700489 unlink(logfile3_.c_str());
Austin Schuh921b2562021-07-31 19:37:03 -0700490 queue_index_.resize(config_.message().channels()->size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800491 }
492
493 protected:
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800494 flatbuffers::DetachedBuffer MakeLogMessage(
495 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
496 int value) {
497 flatbuffers::FlatBufferBuilder message_fbb;
498 message_fbb.ForceDefaults(true);
499 TestMessage::Builder test_message_builder(message_fbb);
500 test_message_builder.add_value(value);
501 message_fbb.Finish(test_message_builder.Finish());
502
503 aos::Context context;
504 context.monotonic_event_time = monotonic_now;
505 context.realtime_event_time = aos::realtime_clock::epoch() +
506 chrono::seconds(1000) +
507 monotonic_now.time_since_epoch();
Austin Schuh921b2562021-07-31 19:37:03 -0700508 CHECK_LT(static_cast<size_t>(channel_index), queue_index_.size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800509 context.queue_index = queue_index_[channel_index];
510 context.size = message_fbb.GetSize();
511 context.data = message_fbb.GetBufferPointer();
512
513 ++queue_index_[channel_index];
514
515 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh53e3a812021-11-03 16:57:23 -0700516 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800517 fbb.FinishSizePrefixed(
518 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
519
520 return fbb.Release();
521 }
522
523 flatbuffers::DetachedBuffer MakeTimestampMessage(
524 const aos::monotonic_clock::time_point sender_monotonic_now,
Austin Schuh8bf1e632021-01-02 22:41:04 -0800525 int channel_index, chrono::nanoseconds receiver_monotonic_offset,
526 monotonic_clock::time_point monotonic_timestamp_time =
527 monotonic_clock::min_time) {
528 const monotonic_clock::time_point monotonic_sent_time =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800529 sender_monotonic_now + receiver_monotonic_offset;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800530
531 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800532 fbb.ForceDefaults(true);
533
534 logger::MessageHeader::Builder message_header_builder(fbb);
535
536 message_header_builder.add_channel_index(channel_index);
537
538 message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
539 100);
540 message_header_builder.add_monotonic_sent_time(
541 monotonic_sent_time.time_since_epoch().count());
542 message_header_builder.add_realtime_sent_time(
543 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
544 monotonic_sent_time.time_since_epoch())
545 .time_since_epoch()
546 .count());
547
548 message_header_builder.add_monotonic_remote_time(
549 sender_monotonic_now.time_since_epoch().count());
550 message_header_builder.add_realtime_remote_time(
551 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
552 sender_monotonic_now.time_since_epoch())
553 .time_since_epoch()
554 .count());
555 message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
556 1);
557
558 if (monotonic_timestamp_time != monotonic_clock::min_time) {
559 message_header_builder.add_monotonic_timestamp_time(
560 monotonic_timestamp_time.time_since_epoch().count());
561 }
562
563 fbb.FinishSizePrefixed(message_header_builder.Finish());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800564 LOG(INFO) << aos::FlatbufferToJson(
565 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
566 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
567
568 return fbb.Release();
569 }
570
571 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
572 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800573 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh48507722021-07-17 17:29:24 -0700574 const std::string logfile3_ = aos::testing::TestTmpDir() + "/log3.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800575
576 const aos::FlatbufferDetachedBuffer<Configuration> config_;
577 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
578 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800579 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
580 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800581 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800582
583 std::vector<uint32_t> queue_index_;
584};
585
586using LogPartsSorterTest = SortingElementTest;
587using LogPartsSorterDeathTest = LogPartsSorterTest;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800588using NodeMergerTest = SortingElementTest;
Austin Schuhd2f96102020-12-01 20:27:29 -0800589using TimestampMapperTest = SortingElementTest;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800590
591// Tests that we can pull messages out of a log sorted in order.
592TEST_F(LogPartsSorterTest, Pull) {
593 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
594 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700595 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800596 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700597 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800598 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700599 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800600 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700601 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800602 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700603 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800604 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
605 }
606
607 const std::vector<LogFile> parts = SortParts({logfile0_});
608
609 LogPartsSorter parts_sorter(parts[0].parts[0]);
610
611 // Confirm we aren't sorted until any time until the message is popped.
612 // Peeking shouldn't change the sorted until time.
613 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
614
615 std::deque<Message> output;
616
617 ASSERT_TRUE(parts_sorter.Front() != nullptr);
618 output.emplace_back(std::move(*parts_sorter.Front()));
619 parts_sorter.PopFront();
620 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
621
622 ASSERT_TRUE(parts_sorter.Front() != nullptr);
623 output.emplace_back(std::move(*parts_sorter.Front()));
624 parts_sorter.PopFront();
625 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
626
627 ASSERT_TRUE(parts_sorter.Front() != nullptr);
628 output.emplace_back(std::move(*parts_sorter.Front()));
629 parts_sorter.PopFront();
630 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
631
632 ASSERT_TRUE(parts_sorter.Front() != nullptr);
633 output.emplace_back(std::move(*parts_sorter.Front()));
634 parts_sorter.PopFront();
635 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
636
637 ASSERT_TRUE(parts_sorter.Front() == nullptr);
638
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700639 EXPECT_EQ(output[0].timestamp.boot, 0);
640 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
641 EXPECT_EQ(output[1].timestamp.boot, 0);
642 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1000));
643 EXPECT_EQ(output[2].timestamp.boot, 0);
644 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1901));
645 EXPECT_EQ(output[3].timestamp.boot, 0);
646 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800647}
648
Austin Schuhb000de62020-12-03 22:00:40 -0800649// Tests that we can pull messages out of a log sorted in order.
650TEST_F(LogPartsSorterTest, WayBeforeStart) {
651 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
652 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700653 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhb000de62020-12-03 22:00:40 -0800654 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700655 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800656 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700657 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800658 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700659 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800660 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700661 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800662 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700663 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800664 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
665 }
666
667 const std::vector<LogFile> parts = SortParts({logfile0_});
668
669 LogPartsSorter parts_sorter(parts[0].parts[0]);
670
671 // Confirm we aren't sorted until any time until the message is popped.
672 // Peeking shouldn't change the sorted until time.
673 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
674
675 std::deque<Message> output;
676
677 for (monotonic_clock::time_point t :
678 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
679 e + chrono::milliseconds(1900), monotonic_clock::max_time,
680 monotonic_clock::max_time}) {
681 ASSERT_TRUE(parts_sorter.Front() != nullptr);
682 output.emplace_back(std::move(*parts_sorter.Front()));
683 parts_sorter.PopFront();
684 EXPECT_EQ(parts_sorter.sorted_until(), t);
685 }
686
687 ASSERT_TRUE(parts_sorter.Front() == nullptr);
688
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700689 EXPECT_EQ(output[0].timestamp.boot, 0u);
690 EXPECT_EQ(output[0].timestamp.time, e - chrono::milliseconds(1000));
691 EXPECT_EQ(output[1].timestamp.boot, 0u);
692 EXPECT_EQ(output[1].timestamp.time, e - chrono::milliseconds(500));
693 EXPECT_EQ(output[2].timestamp.boot, 0u);
694 EXPECT_EQ(output[2].timestamp.time, e - chrono::milliseconds(10));
695 EXPECT_EQ(output[3].timestamp.boot, 0u);
696 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(1901));
697 EXPECT_EQ(output[4].timestamp.boot, 0u);
698 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuhb000de62020-12-03 22:00:40 -0800699}
700
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800701// Tests that messages too far out of order trigger death.
702TEST_F(LogPartsSorterDeathTest, Pull) {
703 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
704 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700705 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800706 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700707 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800708 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700709 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800710 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700711 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800712 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
713 // The following message is too far out of order and will trigger the CHECK.
Austin Schuhd863e6e2022-10-16 15:44:50 -0700714 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800715 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
716 }
717
718 const std::vector<LogFile> parts = SortParts({logfile0_});
719
720 LogPartsSorter parts_sorter(parts[0].parts[0]);
721
722 // Confirm we aren't sorted until any time until the message is popped.
723 // Peeking shouldn't change the sorted until time.
724 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
725 std::deque<Message> output;
726
727 ASSERT_TRUE(parts_sorter.Front() != nullptr);
728 parts_sorter.PopFront();
729 ASSERT_TRUE(parts_sorter.Front() != nullptr);
730 ASSERT_TRUE(parts_sorter.Front() != nullptr);
731 parts_sorter.PopFront();
732
Austin Schuh58646e22021-08-23 23:51:46 -0700733 EXPECT_DEATH({ parts_sorter.Front(); },
734 "Max out of order of 100000000ns exceeded.");
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800735}
736
Austin Schuh8f52ed52020-11-30 23:12:39 -0800737// Tests that we can merge data from 2 separate files, including duplicate data.
//
// writer0 receives messages at 1000ms, 2000ms and 3000ms; writer1 receives
// 1001ms, 1002ms, the very same 3000ms buffer, and 3002ms.  The expectations
// at the end check six merged messages in time order, with 3000ms appearing
// only once.
738TEST_F(NodeMergerTest, TwoFileMerger) {
739 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
740 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700741 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800742 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700743 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800744 writer1.QueueSpan(config1_.span());
745
Austin Schuhd863e6e2022-10-16 15:44:50 -0700746 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800747 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700748 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800749 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
750
Austin Schuhd863e6e2022-10-16 15:44:50 -0700751 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800752 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700753 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800754 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
755
756 // Make a duplicate!
 // The identical serialized buffer is queued to both files.
757 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
758 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
759 writer0.QueueSpan(msg.span());
760 writer1.QueueSpan(msg.span());
761
Austin Schuhd863e6e2022-10-16 15:44:50 -0700762 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800763 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
764 }
765
766 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
 // Both files are expected to sort into a single LogFile entry.
Austin Schuhd2f96102020-12-01 20:27:29 -0800767 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800768
Austin Schuhd2f96102020-12-01 20:27:29 -0800769 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800770
771 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
772
773 std::deque<Message> output;
774
 // sorted_until() is expected to stay at min_time until the first Front().
775 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
776 ASSERT_TRUE(merger.Front() != nullptr);
777 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
778
 // Drain the merger one message at a time, checking sorted_until() after
 // every pop.
779 output.emplace_back(std::move(*merger.Front()));
780 merger.PopFront();
781 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
782
783 ASSERT_TRUE(merger.Front() != nullptr);
784 output.emplace_back(std::move(*merger.Front()));
785 merger.PopFront();
786 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
787
788 ASSERT_TRUE(merger.Front() != nullptr);
789 output.emplace_back(std::move(*merger.Front()));
790 merger.PopFront();
791 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
792
793 ASSERT_TRUE(merger.Front() != nullptr);
794 output.emplace_back(std::move(*merger.Front()));
795 merger.PopFront();
796 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
797
798 ASSERT_TRUE(merger.Front() != nullptr);
799 output.emplace_back(std::move(*merger.Front()));
800 merger.PopFront();
801 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
802
803 ASSERT_TRUE(merger.Front() != nullptr);
804 output.emplace_back(std::move(*merger.Front()));
805 merger.PopFront();
806 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
807
 // Six messages were popped; nothing should remain.
808 ASSERT_TRUE(merger.Front() == nullptr);
809
 // Merged output must be in time order, all on boot 0, with the duplicated
 // 3000ms message present exactly once.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700810 EXPECT_EQ(output[0].timestamp.boot, 0u);
811 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
812 EXPECT_EQ(output[1].timestamp.boot, 0u);
813 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1001));
814 EXPECT_EQ(output[2].timestamp.boot, 0u);
815 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1002));
816 EXPECT_EQ(output[3].timestamp.boot, 0u);
817 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
818 EXPECT_EQ(output[4].timestamp.boot, 0u);
819 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(3000));
820 EXPECT_EQ(output[5].timestamp.boot, 0u);
821 EXPECT_EQ(output[5].timestamp.time, e + chrono::milliseconds(3002));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800822}
823
Austin Schuh8bf1e632021-01-02 22:41:04 -0800824// Tests that we can merge timestamps with various combinations of
825// monotonic_timestamp_time.
826TEST_F(NodeMergerTest, TwoFileTimestampMerger) {
827 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
828 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700829 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800830 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700831 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800832 writer1.QueueSpan(config1_.span());
833
834 // Neither has it.
835 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700836 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800837 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700838 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800839 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
840
841 // First only has it.
842 MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700843 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800844 e + chrono::milliseconds(1001), 0, chrono::seconds(100),
845 e + chrono::nanoseconds(971)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700846 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800847 e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
848
849 // Second only has it.
850 MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700851 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800852 e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700853 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800854 e + chrono::milliseconds(1002), 0, chrono::seconds(100),
855 e + chrono::nanoseconds(972)));
856
857 // Both have it.
858 MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700859 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800860 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
861 e + chrono::nanoseconds(973)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700862 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800863 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
864 e + chrono::nanoseconds(973)));
865 }
866
867 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
868 ASSERT_EQ(parts.size(), 1u);
869
870 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
871
872 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
873
874 std::deque<Message> output;
875
876 for (int i = 0; i < 4; ++i) {
877 ASSERT_TRUE(merger.Front() != nullptr);
878 output.emplace_back(std::move(*merger.Front()));
879 merger.PopFront();
880 }
881 ASSERT_TRUE(merger.Front() == nullptr);
882
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700883 EXPECT_EQ(output[0].timestamp.boot, 0u);
884 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700885 EXPECT_FALSE(output[0].data->has_monotonic_timestamp_time);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700886
887 EXPECT_EQ(output[1].timestamp.boot, 0u);
888 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700889 EXPECT_TRUE(output[1].data->has_monotonic_timestamp_time);
890 EXPECT_EQ(output[1].data->monotonic_timestamp_time,
891 monotonic_clock::time_point(std::chrono::nanoseconds(971)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700892
893 EXPECT_EQ(output[2].timestamp.boot, 0u);
894 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700895 EXPECT_TRUE(output[2].data->has_monotonic_timestamp_time);
896 EXPECT_EQ(output[2].data->monotonic_timestamp_time,
897 monotonic_clock::time_point(std::chrono::nanoseconds(972)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700898
899 EXPECT_EQ(output[3].timestamp.boot, 0u);
900 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700901 EXPECT_TRUE(output[3].data->has_monotonic_timestamp_time);
902 EXPECT_EQ(output[3].data->monotonic_timestamp_time,
903 monotonic_clock::time_point(std::chrono::nanoseconds(973)));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800904}
905
Austin Schuhd2f96102020-12-01 20:27:29 -0800906// Tests that we can match timestamps on delivered messages.
907TEST_F(TimestampMapperTest, ReadNode0First) {
908 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
909 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700910 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800911 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700912 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800913 writer1.QueueSpan(config2_.span());
914
Austin Schuhd863e6e2022-10-16 15:44:50 -0700915 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800916 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700917 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800918 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
919
Austin Schuhd863e6e2022-10-16 15:44:50 -0700920 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800921 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700922 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800923 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
924
Austin Schuhd863e6e2022-10-16 15:44:50 -0700925 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800926 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700927 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800928 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
929 }
930
931 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
932
933 ASSERT_EQ(parts[0].logger_node, "pi1");
934 ASSERT_EQ(parts[1].logger_node, "pi2");
935
Austin Schuh79b30942021-01-24 22:32:21 -0800936 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800937 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -0800938 mapper0.set_timestamp_callback(
939 [&](TimestampedMessage *) { ++mapper0_count; });
940 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800941 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -0800942 mapper1.set_timestamp_callback(
943 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -0800944
945 mapper0.AddPeer(&mapper1);
946 mapper1.AddPeer(&mapper0);
947
948 {
949 std::deque<TimestampedMessage> output0;
950
Austin Schuh79b30942021-01-24 22:32:21 -0800951 EXPECT_EQ(mapper0_count, 0u);
952 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800953 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800954 EXPECT_EQ(mapper0_count, 1u);
955 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800956 output0.emplace_back(std::move(*mapper0.Front()));
957 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700958 EXPECT_TRUE(mapper0.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800959 EXPECT_EQ(mapper0_count, 1u);
960 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800961
962 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800963 EXPECT_EQ(mapper0_count, 2u);
964 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800965 output0.emplace_back(std::move(*mapper0.Front()));
966 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700967 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800968
969 ASSERT_TRUE(mapper0.Front() != nullptr);
970 output0.emplace_back(std::move(*mapper0.Front()));
971 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700972 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800973
Austin Schuh79b30942021-01-24 22:32:21 -0800974 EXPECT_EQ(mapper0_count, 3u);
975 EXPECT_EQ(mapper1_count, 0u);
976
Austin Schuhd2f96102020-12-01 20:27:29 -0800977 ASSERT_TRUE(mapper0.Front() == nullptr);
978
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700979 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
980 EXPECT_EQ(output0[0].monotonic_event_time.time,
981 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700982 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700983
984 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
985 EXPECT_EQ(output0[1].monotonic_event_time.time,
986 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700987 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700988
989 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
990 EXPECT_EQ(output0[2].monotonic_event_time.time,
991 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700992 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -0800993 }
994
995 {
996 SCOPED_TRACE("Trying node1 now");
997 std::deque<TimestampedMessage> output1;
998
Austin Schuh79b30942021-01-24 22:32:21 -0800999 EXPECT_EQ(mapper0_count, 3u);
1000 EXPECT_EQ(mapper1_count, 0u);
1001
Austin Schuhd2f96102020-12-01 20:27:29 -08001002 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001003 EXPECT_EQ(mapper0_count, 3u);
1004 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001005 output1.emplace_back(std::move(*mapper1.Front()));
1006 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001007 EXPECT_TRUE(mapper1.started());
Austin Schuh79b30942021-01-24 22:32:21 -08001008 EXPECT_EQ(mapper0_count, 3u);
1009 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001010
1011 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001012 EXPECT_EQ(mapper0_count, 3u);
1013 EXPECT_EQ(mapper1_count, 2u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001014 output1.emplace_back(std::move(*mapper1.Front()));
1015 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001016 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001017
1018 ASSERT_TRUE(mapper1.Front() != nullptr);
1019 output1.emplace_back(std::move(*mapper1.Front()));
1020 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001021 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001022
Austin Schuh79b30942021-01-24 22:32:21 -08001023 EXPECT_EQ(mapper0_count, 3u);
1024 EXPECT_EQ(mapper1_count, 3u);
1025
Austin Schuhd2f96102020-12-01 20:27:29 -08001026 ASSERT_TRUE(mapper1.Front() == nullptr);
1027
Austin Schuh79b30942021-01-24 22:32:21 -08001028 EXPECT_EQ(mapper0_count, 3u);
1029 EXPECT_EQ(mapper1_count, 3u);
1030
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001031 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1032 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001033 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001034 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001035
1036 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1037 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001038 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001039 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001040
1041 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1042 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001043 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001044 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001045 }
1046}
1047
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001048// Tests that we filter messages using the channel filter callback
1049TEST_F(TimestampMapperTest, ReplayChannelsCallbackTest) {
1050 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1051 {
1052 TestDetachedBufferWriter writer0(logfile0_);
1053 writer0.QueueSpan(config0_.span());
1054 TestDetachedBufferWriter writer1(logfile1_);
1055 writer1.QueueSpan(config2_.span());
1056
1057 writer0.WriteSizedFlatbuffer(
1058 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1059 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1060 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1061
1062 writer0.WriteSizedFlatbuffer(
1063 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1064 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1065 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1066
1067 writer0.WriteSizedFlatbuffer(
1068 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1069 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1070 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1071 }
1072
1073 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1074
1075 ASSERT_EQ(parts[0].logger_node, "pi1");
1076 ASSERT_EQ(parts[1].logger_node, "pi2");
1077
1078 // mapper0 will not provide any messages while mapper1 will provide all
1079 // messages due to the channel filter callbacks used
1080 size_t mapper0_count = 0;
1081 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1082 mapper0.set_timestamp_callback(
1083 [&](TimestampedMessage *) { ++mapper0_count; });
1084 mapper0.set_replay_channels_callback(
1085 [&](const TimestampedMessage &) -> bool { return mapper0_count != 2; });
1086 size_t mapper1_count = 0;
1087 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1088 mapper1.set_timestamp_callback(
1089 [&](TimestampedMessage *) { ++mapper1_count; });
1090 mapper1.set_replay_channels_callback(
1091 [&](const TimestampedMessage &) -> bool { return mapper1_count != 2; });
1092
1093 mapper0.AddPeer(&mapper1);
1094 mapper1.AddPeer(&mapper0);
1095
1096 {
1097 std::deque<TimestampedMessage> output0;
1098
1099 EXPECT_EQ(mapper0_count, 0u);
1100 EXPECT_EQ(mapper1_count, 0u);
1101
1102 ASSERT_TRUE(mapper0.Front() != nullptr);
1103 EXPECT_EQ(mapper0_count, 1u);
1104 EXPECT_EQ(mapper1_count, 0u);
1105 output0.emplace_back(std::move(*mapper0.Front()));
1106 mapper0.PopFront();
1107
1108 EXPECT_TRUE(mapper0.started());
1109 EXPECT_EQ(mapper0_count, 1u);
1110 EXPECT_EQ(mapper1_count, 0u);
1111
1112 // mapper0_count is now at 3 since the second message is not queued, but
1113 // timestamp_callback needs to be called everytime even if Front() does not
1114 // provide a message due to the replay_channels_callback.
1115 ASSERT_TRUE(mapper0.Front() != nullptr);
1116 EXPECT_EQ(mapper0_count, 3u);
1117 EXPECT_EQ(mapper1_count, 0u);
1118 output0.emplace_back(std::move(*mapper0.Front()));
1119 mapper0.PopFront();
1120
1121 EXPECT_TRUE(mapper0.started());
1122 EXPECT_EQ(mapper0_count, 3u);
1123 EXPECT_EQ(mapper1_count, 0u);
1124
1125 ASSERT_TRUE(mapper0.Front() == nullptr);
1126 EXPECT_TRUE(mapper0.started());
1127
1128 EXPECT_EQ(mapper0_count, 3u);
1129 EXPECT_EQ(mapper1_count, 0u);
1130
1131 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1132 EXPECT_EQ(output0[0].monotonic_event_time.time,
1133 e + chrono::milliseconds(1000));
1134 EXPECT_TRUE(output0[0].data != nullptr);
1135
1136 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1137 EXPECT_EQ(output0[1].monotonic_event_time.time,
1138 e + chrono::milliseconds(3000));
1139 EXPECT_TRUE(output0[1].data != nullptr);
1140 }
1141
1142 {
1143 SCOPED_TRACE("Trying node1 now");
1144 std::deque<TimestampedMessage> output1;
1145
1146 EXPECT_EQ(mapper0_count, 3u);
1147 EXPECT_EQ(mapper1_count, 0u);
1148
1149 ASSERT_TRUE(mapper1.Front() != nullptr);
1150 EXPECT_EQ(mapper0_count, 3u);
1151 EXPECT_EQ(mapper1_count, 1u);
1152 output1.emplace_back(std::move(*mapper1.Front()));
1153 mapper1.PopFront();
1154 EXPECT_TRUE(mapper1.started());
1155 EXPECT_EQ(mapper0_count, 3u);
1156 EXPECT_EQ(mapper1_count, 1u);
1157
1158 // mapper1_count is now at 3 since the second message is not queued, but
1159 // timestamp_callback needs to be called everytime even if Front() does not
1160 // provide a message due to the replay_channels_callback.
1161 ASSERT_TRUE(mapper1.Front() != nullptr);
1162 output1.emplace_back(std::move(*mapper1.Front()));
1163 mapper1.PopFront();
1164 EXPECT_TRUE(mapper1.started());
1165
1166 EXPECT_EQ(mapper0_count, 3u);
1167 EXPECT_EQ(mapper1_count, 3u);
1168
1169 ASSERT_TRUE(mapper1.Front() == nullptr);
1170
1171 EXPECT_EQ(mapper0_count, 3u);
1172 EXPECT_EQ(mapper1_count, 3u);
1173
1174 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1175 EXPECT_EQ(output1[0].monotonic_event_time.time,
1176 e + chrono::seconds(100) + chrono::milliseconds(1000));
1177 EXPECT_TRUE(output1[0].data != nullptr);
1178
1179 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1180 EXPECT_EQ(output1[1].monotonic_event_time.time,
1181 e + chrono::seconds(100) + chrono::milliseconds(3000));
1182 EXPECT_TRUE(output1[1].data != nullptr);
1183 }
1184}
Austin Schuh8bf1e632021-01-02 22:41:04 -08001185// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
1186// returned.
//
// logfile0_ gets three data messages.  logfile1_ gets three timestamp
// messages; the first two pass a fourth argument (971ns and 5458ns past the
// epoch) and the third omits it.  The node-1 expectations below require those
// values to come back as monotonic_timestamp_time, and min_time where none
// was given.
1187TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
1188 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1189 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001190 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001191 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001192 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001193 writer1.QueueSpan(config4_.span());
1194
Austin Schuhd863e6e2022-10-16 15:44:50 -07001195 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001196 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001197 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001198 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
1199 e + chrono::nanoseconds(971)));
1200
Austin Schuhd863e6e2022-10-16 15:44:50 -07001201 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001202 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001203 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001204 e + chrono::milliseconds(2000), 0, chrono::seconds(100),
1205 e + chrono::nanoseconds(5458)));
1206
 // The last timestamp message deliberately has no fourth argument.
Austin Schuhd863e6e2022-10-16 15:44:50 -07001207 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001208 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001209 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001210 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1211 }
1212
1213 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1214
 // Log the sorted parts to make failures easier to diagnose.
1215 for (const auto &p : parts) {
1216 LOG(INFO) << p;
1217 }
1218
 // Both files are expected to sort into a single LogFile entry here.
1219 ASSERT_EQ(parts.size(), 1u);
1220
 // Each mapper counts how often its timestamp callback fires; the totals are
 // checked at the very end of the test.
Austin Schuh79b30942021-01-24 22:32:21 -08001221 size_t mapper0_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001222 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001223 mapper0.set_timestamp_callback(
1224 [&](TimestampedMessage *) { ++mapper0_count; });
1225 size_t mapper1_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001226 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001227 mapper1.set_timestamp_callback(
1228 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh8bf1e632021-01-02 22:41:04 -08001229
1230 mapper0.AddPeer(&mapper1);
1231 mapper1.AddPeer(&mapper0);
1232
1233 {
1234 std::deque<TimestampedMessage> output0;
1235
 // Drain all three node-0 messages; the streamed index identifies which
 // iteration failed.
1236 for (int i = 0; i < 3; ++i) {
1237 ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
1238 output0.emplace_back(std::move(*mapper0.Front()));
1239 mapper0.PopFront();
1240 }
1241
1242 ASSERT_TRUE(mapper0.Front() == nullptr);
1243
 // Node-0 messages are expected to report min_time as their
 // monotonic_timestamp_time.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001244 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1245 EXPECT_EQ(output0[0].monotonic_event_time.time,
1246 e + chrono::milliseconds(1000));
1247 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
1248 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
1249 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001250 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001251
1252 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1253 EXPECT_EQ(output0[1].monotonic_event_time.time,
1254 e + chrono::milliseconds(2000));
1255 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
1256 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
1257 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001258 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001259
1260 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1261 EXPECT_EQ(output0[2].monotonic_event_time.time,
1262 e + chrono::milliseconds(3000));
1263 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
1264 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
1265 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001266 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001267 }
1268
1269 {
1270 SCOPED_TRACE("Trying node1 now");
1271 std::deque<TimestampedMessage> output1;
1272
1273 for (int i = 0; i < 3; ++i) {
1274 ASSERT_TRUE(mapper1.Front() != nullptr);
1275 output1.emplace_back(std::move(*mapper1.Front()));
1276 mapper1.PopFront();
1277 }
1278
1279 ASSERT_TRUE(mapper1.Front() == nullptr);
1280
 // Node-1 event times are shifted by 100 seconds.  The first two messages
 // must carry the 971ns / 5458ns values written above; the third must
 // report min_time.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001281 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1282 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001283 e + chrono::seconds(100) + chrono::milliseconds(1000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001284 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
1285 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001286 e + chrono::nanoseconds(971));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001287 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001288
1289 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1290 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001291 e + chrono::seconds(100) + chrono::milliseconds(2000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001292 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
1293 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001294 e + chrono::nanoseconds(5458));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001295 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001296
1297 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1298 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001299 e + chrono::seconds(100) + chrono::milliseconds(3000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001300 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
1301 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
1302 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001303 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001304 }
Austin Schuh79b30942021-01-24 22:32:21 -08001305
 // Each mapper's timestamp callback must have fired once per message.
1306 EXPECT_EQ(mapper0_count, 3u);
1307 EXPECT_EQ(mapper1_count, 3u);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001308}
1309
Austin Schuhd2f96102020-12-01 20:27:29 -08001310// Tests that we can match timestamps on delivered messages. By doing this in
1311// the reverse order, the second node needs to queue data up from the first node
1312// to find the matching timestamp.
1313TEST_F(TimestampMapperTest, ReadNode1First) {
1314 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1315 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001316 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001317 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001318 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001319 writer1.QueueSpan(config2_.span());
1320
Austin Schuhd863e6e2022-10-16 15:44:50 -07001321 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001322 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001323 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001324 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1325
Austin Schuhd863e6e2022-10-16 15:44:50 -07001326 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001327 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001328 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001329 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1330
Austin Schuhd863e6e2022-10-16 15:44:50 -07001331 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001332 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001333 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001334 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1335 }
1336
1337 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1338
1339 ASSERT_EQ(parts[0].logger_node, "pi1");
1340 ASSERT_EQ(parts[1].logger_node, "pi2");
1341
Austin Schuh79b30942021-01-24 22:32:21 -08001342 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001343 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001344 mapper0.set_timestamp_callback(
1345 [&](TimestampedMessage *) { ++mapper0_count; });
1346 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001347 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001348 mapper1.set_timestamp_callback(
1349 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001350
1351 mapper0.AddPeer(&mapper1);
1352 mapper1.AddPeer(&mapper0);
1353
1354 {
1355 SCOPED_TRACE("Trying node1 now");
1356 std::deque<TimestampedMessage> output1;
1357
1358 ASSERT_TRUE(mapper1.Front() != nullptr);
1359 output1.emplace_back(std::move(*mapper1.Front()));
1360 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001361 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001362
1363 ASSERT_TRUE(mapper1.Front() != nullptr);
1364 output1.emplace_back(std::move(*mapper1.Front()));
1365 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001366 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001367
1368 ASSERT_TRUE(mapper1.Front() != nullptr);
1369 output1.emplace_back(std::move(*mapper1.Front()));
1370 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001371 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001372
1373 ASSERT_TRUE(mapper1.Front() == nullptr);
1374
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001375 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1376 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001377 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001378 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001379
1380 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1381 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001382 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001383 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001384
1385 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1386 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001387 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001388 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001389 }
1390
1391 {
1392 std::deque<TimestampedMessage> output0;
1393
1394 ASSERT_TRUE(mapper0.Front() != nullptr);
1395 output0.emplace_back(std::move(*mapper0.Front()));
1396 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001397 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001398
1399 ASSERT_TRUE(mapper0.Front() != nullptr);
1400 output0.emplace_back(std::move(*mapper0.Front()));
1401 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001402 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001403
1404 ASSERT_TRUE(mapper0.Front() != nullptr);
1405 output0.emplace_back(std::move(*mapper0.Front()));
1406 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001407 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001408
1409 ASSERT_TRUE(mapper0.Front() == nullptr);
1410
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001411 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1412 EXPECT_EQ(output0[0].monotonic_event_time.time,
1413 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001414 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001415
1416 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1417 EXPECT_EQ(output0[1].monotonic_event_time.time,
1418 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001419 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001420
1421 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1422 EXPECT_EQ(output0[2].monotonic_event_time.time,
1423 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001424 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001425 }
Austin Schuh79b30942021-01-24 22:32:21 -08001426
1427 EXPECT_EQ(mapper0_count, 3u);
1428 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001429}
1430
1431// Tests that we return just the timestamps if we couldn't find the data and the
1432// missing data was at the beginning of the file.
1433TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
1434 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1435 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001436 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001437 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001438 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001439 writer1.QueueSpan(config2_.span());
1440
1441 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001442 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001443 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1444
Austin Schuhd863e6e2022-10-16 15:44:50 -07001445 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001446 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001447 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001448 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1449
Austin Schuhd863e6e2022-10-16 15:44:50 -07001450 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001451 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001452 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001453 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1454 }
1455
1456 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1457
1458 ASSERT_EQ(parts[0].logger_node, "pi1");
1459 ASSERT_EQ(parts[1].logger_node, "pi2");
1460
Austin Schuh79b30942021-01-24 22:32:21 -08001461 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001462 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001463 mapper0.set_timestamp_callback(
1464 [&](TimestampedMessage *) { ++mapper0_count; });
1465 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001466 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001467 mapper1.set_timestamp_callback(
1468 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001469
1470 mapper0.AddPeer(&mapper1);
1471 mapper1.AddPeer(&mapper0);
1472
1473 {
1474 SCOPED_TRACE("Trying node1 now");
1475 std::deque<TimestampedMessage> output1;
1476
1477 ASSERT_TRUE(mapper1.Front() != nullptr);
1478 output1.emplace_back(std::move(*mapper1.Front()));
1479 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001480 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001481
1482 ASSERT_TRUE(mapper1.Front() != nullptr);
1483 output1.emplace_back(std::move(*mapper1.Front()));
1484 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001485 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001486
1487 ASSERT_TRUE(mapper1.Front() != nullptr);
1488 output1.emplace_back(std::move(*mapper1.Front()));
1489 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001490 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001491
1492 ASSERT_TRUE(mapper1.Front() == nullptr);
1493
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001494 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1495 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001496 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001497 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001498
1499 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1500 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001501 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001502 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001503
1504 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1505 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001506 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001507 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001508 }
Austin Schuh79b30942021-01-24 22:32:21 -08001509
1510 EXPECT_EQ(mapper0_count, 0u);
1511 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001512}
1513
1514// Tests that we return just the timestamps if we couldn't find the data and the
1515// missing data was at the end of the file.
1516TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
1517 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1518 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001519 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001520 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001521 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001522 writer1.QueueSpan(config2_.span());
1523
Austin Schuhd863e6e2022-10-16 15:44:50 -07001524 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001525 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001526 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001527 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1528
Austin Schuhd863e6e2022-10-16 15:44:50 -07001529 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001530 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001531 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001532 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1533
1534 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001535 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001536 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1537 }
1538
1539 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1540
1541 ASSERT_EQ(parts[0].logger_node, "pi1");
1542 ASSERT_EQ(parts[1].logger_node, "pi2");
1543
Austin Schuh79b30942021-01-24 22:32:21 -08001544 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001545 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001546 mapper0.set_timestamp_callback(
1547 [&](TimestampedMessage *) { ++mapper0_count; });
1548 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001549 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001550 mapper1.set_timestamp_callback(
1551 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001552
1553 mapper0.AddPeer(&mapper1);
1554 mapper1.AddPeer(&mapper0);
1555
1556 {
1557 SCOPED_TRACE("Trying node1 now");
1558 std::deque<TimestampedMessage> output1;
1559
1560 ASSERT_TRUE(mapper1.Front() != nullptr);
1561 output1.emplace_back(std::move(*mapper1.Front()));
1562 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001563 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001564
1565 ASSERT_TRUE(mapper1.Front() != nullptr);
1566 output1.emplace_back(std::move(*mapper1.Front()));
1567 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001568 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001569
1570 ASSERT_TRUE(mapper1.Front() != nullptr);
1571 output1.emplace_back(std::move(*mapper1.Front()));
1572 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001573 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001574
1575 ASSERT_TRUE(mapper1.Front() == nullptr);
1576
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001577 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1578 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001579 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001580 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001581
1582 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1583 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001584 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001585 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001586
1587 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1588 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001589 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001590 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001591 }
Austin Schuh79b30942021-01-24 22:32:21 -08001592
1593 EXPECT_EQ(mapper0_count, 0u);
1594 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001595}
1596
Austin Schuh993ccb52020-12-12 15:59:32 -08001597// Tests that we handle a message which failed to forward or be logged.
1598TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1599 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1600 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001601 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001602 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001603 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001604 writer1.QueueSpan(config2_.span());
1605
Austin Schuhd863e6e2022-10-16 15:44:50 -07001606 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001607 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001608 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001609 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1610
1611 // Create both the timestamp and message, but don't log them, simulating a
1612 // forwarding drop.
1613 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1614 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1615 chrono::seconds(100));
1616
Austin Schuhd863e6e2022-10-16 15:44:50 -07001617 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001618 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001619 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001620 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1621 }
1622
1623 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1624
1625 ASSERT_EQ(parts[0].logger_node, "pi1");
1626 ASSERT_EQ(parts[1].logger_node, "pi2");
1627
Austin Schuh79b30942021-01-24 22:32:21 -08001628 size_t mapper0_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001629 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001630 mapper0.set_timestamp_callback(
1631 [&](TimestampedMessage *) { ++mapper0_count; });
1632 size_t mapper1_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001633 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001634 mapper1.set_timestamp_callback(
1635 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh993ccb52020-12-12 15:59:32 -08001636
1637 mapper0.AddPeer(&mapper1);
1638 mapper1.AddPeer(&mapper0);
1639
1640 {
1641 std::deque<TimestampedMessage> output1;
1642
1643 ASSERT_TRUE(mapper1.Front() != nullptr);
1644 output1.emplace_back(std::move(*mapper1.Front()));
1645 mapper1.PopFront();
1646
1647 ASSERT_TRUE(mapper1.Front() != nullptr);
1648 output1.emplace_back(std::move(*mapper1.Front()));
1649
1650 ASSERT_FALSE(mapper1.Front() == nullptr);
1651
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001652 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1653 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001654 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001655 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001656
1657 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1658 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001659 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001660 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh993ccb52020-12-12 15:59:32 -08001661 }
Austin Schuh79b30942021-01-24 22:32:21 -08001662
1663 EXPECT_EQ(mapper0_count, 0u);
1664 EXPECT_EQ(mapper1_count, 2u);
Austin Schuh993ccb52020-12-12 15:59:32 -08001665}
1666
Austin Schuhd2f96102020-12-01 20:27:29 -08001667// Tests that we properly sort log files with duplicate timestamps.
1668TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1669 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1670 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001671 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001672 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001673 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001674 writer1.QueueSpan(config2_.span());
1675
Austin Schuhd863e6e2022-10-16 15:44:50 -07001676 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001677 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001678 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001679 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1680
Austin Schuhd863e6e2022-10-16 15:44:50 -07001681 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001682 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001683 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001684 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1685
Austin Schuhd863e6e2022-10-16 15:44:50 -07001686 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001687 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001688 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001689 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1690
Austin Schuhd863e6e2022-10-16 15:44:50 -07001691 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001692 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001693 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001694 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1695 }
1696
1697 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1698
1699 ASSERT_EQ(parts[0].logger_node, "pi1");
1700 ASSERT_EQ(parts[1].logger_node, "pi2");
1701
Austin Schuh79b30942021-01-24 22:32:21 -08001702 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001703 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001704 mapper0.set_timestamp_callback(
1705 [&](TimestampedMessage *) { ++mapper0_count; });
1706 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001707 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001708 mapper1.set_timestamp_callback(
1709 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001710
1711 mapper0.AddPeer(&mapper1);
1712 mapper1.AddPeer(&mapper0);
1713
1714 {
1715 SCOPED_TRACE("Trying node1 now");
1716 std::deque<TimestampedMessage> output1;
1717
1718 for (int i = 0; i < 4; ++i) {
1719 ASSERT_TRUE(mapper1.Front() != nullptr);
1720 output1.emplace_back(std::move(*mapper1.Front()));
1721 mapper1.PopFront();
1722 }
1723 ASSERT_TRUE(mapper1.Front() == nullptr);
1724
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001725 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1726 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001727 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001728 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001729
1730 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1731 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001732 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001733 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001734
1735 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1736 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001737 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001738 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001739
1740 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1741 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001742 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001743 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001744 }
Austin Schuh79b30942021-01-24 22:32:21 -08001745
1746 EXPECT_EQ(mapper0_count, 0u);
1747 EXPECT_EQ(mapper1_count, 4u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001748}
1749
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001750// Tests that we properly produce a valid start time.
Austin Schuhd2f96102020-12-01 20:27:29 -08001751TEST_F(TimestampMapperTest, StartTime) {
1752 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1753 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001754 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001755 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001756 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001757 writer1.QueueSpan(config1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001758 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001759 writer2.QueueSpan(config3_.span());
1760 }
1761
1762 const std::vector<LogFile> parts =
1763 SortParts({logfile0_, logfile1_, logfile2_});
1764
Austin Schuh79b30942021-01-24 22:32:21 -08001765 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001766 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001767 mapper0.set_timestamp_callback(
1768 [&](TimestampedMessage *) { ++mapper0_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001769
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001770 EXPECT_EQ(mapper0.monotonic_start_time(0), e + chrono::milliseconds(1));
1771 EXPECT_EQ(mapper0.realtime_start_time(0),
Austin Schuhd2f96102020-12-01 20:27:29 -08001772 realtime_clock::time_point(chrono::seconds(1000)));
Austin Schuh79b30942021-01-24 22:32:21 -08001773 EXPECT_EQ(mapper0_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001774}
1775
Austin Schuhfecf1d82020-12-19 16:57:28 -08001776// Tests that when a peer isn't registered, we treat that as if there was no
1777// data available.
1778TEST_F(TimestampMapperTest, NoPeer) {
1779 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1780 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001781 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001782 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001783 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001784 writer1.QueueSpan(config2_.span());
1785
1786 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001787 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001788 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1789
Austin Schuhd863e6e2022-10-16 15:44:50 -07001790 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001791 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001792 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001793 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1794
Austin Schuhd863e6e2022-10-16 15:44:50 -07001795 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001796 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001797 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001798 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1799 }
1800
1801 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1802
1803 ASSERT_EQ(parts[0].logger_node, "pi1");
1804 ASSERT_EQ(parts[1].logger_node, "pi2");
1805
Austin Schuh79b30942021-01-24 22:32:21 -08001806 size_t mapper1_count = 0;
Austin Schuhfecf1d82020-12-19 16:57:28 -08001807 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001808 mapper1.set_timestamp_callback(
1809 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhfecf1d82020-12-19 16:57:28 -08001810
1811 {
1812 std::deque<TimestampedMessage> output1;
1813
1814 ASSERT_TRUE(mapper1.Front() != nullptr);
1815 output1.emplace_back(std::move(*mapper1.Front()));
1816 mapper1.PopFront();
1817 ASSERT_TRUE(mapper1.Front() != nullptr);
1818 output1.emplace_back(std::move(*mapper1.Front()));
1819 mapper1.PopFront();
1820 ASSERT_TRUE(mapper1.Front() != nullptr);
1821 output1.emplace_back(std::move(*mapper1.Front()));
1822 mapper1.PopFront();
1823 ASSERT_TRUE(mapper1.Front() == nullptr);
1824
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001825 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1826 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001827 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001828 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001829
1830 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1831 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001832 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001833 EXPECT_FALSE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001834
1835 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1836 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001837 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001838 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001839 }
Austin Schuh79b30942021-01-24 22:32:21 -08001840 EXPECT_EQ(mapper1_count, 3u);
1841}
1842
1843// Tests that we can queue messages and call the timestamp callback for both
1844// nodes.
1845TEST_F(TimestampMapperTest, QueueUntilNode0) {
1846 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1847 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001848 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh79b30942021-01-24 22:32:21 -08001849 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001850 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh79b30942021-01-24 22:32:21 -08001851 writer1.QueueSpan(config2_.span());
1852
Austin Schuhd863e6e2022-10-16 15:44:50 -07001853 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001854 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001855 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001856 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1857
Austin Schuhd863e6e2022-10-16 15:44:50 -07001858 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001859 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001860 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001861 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1862
Austin Schuhd863e6e2022-10-16 15:44:50 -07001863 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001864 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001865 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001866 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1867
Austin Schuhd863e6e2022-10-16 15:44:50 -07001868 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001869 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001870 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001871 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1872 }
1873
1874 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1875
1876 ASSERT_EQ(parts[0].logger_node, "pi1");
1877 ASSERT_EQ(parts[1].logger_node, "pi2");
1878
1879 size_t mapper0_count = 0;
1880 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1881 mapper0.set_timestamp_callback(
1882 [&](TimestampedMessage *) { ++mapper0_count; });
1883 size_t mapper1_count = 0;
1884 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1885 mapper1.set_timestamp_callback(
1886 [&](TimestampedMessage *) { ++mapper1_count; });
1887
1888 mapper0.AddPeer(&mapper1);
1889 mapper1.AddPeer(&mapper0);
1890
1891 {
1892 std::deque<TimestampedMessage> output0;
1893
1894 EXPECT_EQ(mapper0_count, 0u);
1895 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001896 mapper0.QueueUntil(
1897 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001898 EXPECT_EQ(mapper0_count, 3u);
1899 EXPECT_EQ(mapper1_count, 0u);
1900
1901 ASSERT_TRUE(mapper0.Front() != nullptr);
1902 EXPECT_EQ(mapper0_count, 3u);
1903 EXPECT_EQ(mapper1_count, 0u);
1904
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001905 mapper0.QueueUntil(
1906 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001907 EXPECT_EQ(mapper0_count, 3u);
1908 EXPECT_EQ(mapper1_count, 0u);
1909
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001910 mapper0.QueueUntil(
1911 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001912 EXPECT_EQ(mapper0_count, 4u);
1913 EXPECT_EQ(mapper1_count, 0u);
1914
1915 output0.emplace_back(std::move(*mapper0.Front()));
1916 mapper0.PopFront();
1917 output0.emplace_back(std::move(*mapper0.Front()));
1918 mapper0.PopFront();
1919 output0.emplace_back(std::move(*mapper0.Front()));
1920 mapper0.PopFront();
1921 output0.emplace_back(std::move(*mapper0.Front()));
1922 mapper0.PopFront();
1923
1924 EXPECT_EQ(mapper0_count, 4u);
1925 EXPECT_EQ(mapper1_count, 0u);
1926
1927 ASSERT_TRUE(mapper0.Front() == nullptr);
1928
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001929 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1930 EXPECT_EQ(output0[0].monotonic_event_time.time,
1931 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001932 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001933
1934 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1935 EXPECT_EQ(output0[1].monotonic_event_time.time,
1936 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001937 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001938
1939 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1940 EXPECT_EQ(output0[2].monotonic_event_time.time,
1941 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001942 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001943
1944 EXPECT_EQ(output0[3].monotonic_event_time.boot, 0u);
1945 EXPECT_EQ(output0[3].monotonic_event_time.time,
1946 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001947 EXPECT_TRUE(output0[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001948 }
1949
1950 {
1951 SCOPED_TRACE("Trying node1 now");
1952 std::deque<TimestampedMessage> output1;
1953
1954 EXPECT_EQ(mapper0_count, 4u);
1955 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001956 mapper1.QueueUntil(BootTimestamp{
1957 .boot = 0,
1958 .time = e + chrono::seconds(100) + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001959 EXPECT_EQ(mapper0_count, 4u);
1960 EXPECT_EQ(mapper1_count, 3u);
1961
1962 ASSERT_TRUE(mapper1.Front() != nullptr);
1963 EXPECT_EQ(mapper0_count, 4u);
1964 EXPECT_EQ(mapper1_count, 3u);
1965
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001966 mapper1.QueueUntil(BootTimestamp{
1967 .boot = 0,
1968 .time = e + chrono::seconds(100) + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001969 EXPECT_EQ(mapper0_count, 4u);
1970 EXPECT_EQ(mapper1_count, 3u);
1971
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001972 mapper1.QueueUntil(BootTimestamp{
1973 .boot = 0,
1974 .time = e + chrono::seconds(100) + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001975 EXPECT_EQ(mapper0_count, 4u);
1976 EXPECT_EQ(mapper1_count, 4u);
1977
1978 ASSERT_TRUE(mapper1.Front() != nullptr);
1979 EXPECT_EQ(mapper0_count, 4u);
1980 EXPECT_EQ(mapper1_count, 4u);
1981
1982 output1.emplace_back(std::move(*mapper1.Front()));
1983 mapper1.PopFront();
1984 ASSERT_TRUE(mapper1.Front() != nullptr);
1985 output1.emplace_back(std::move(*mapper1.Front()));
1986 mapper1.PopFront();
1987 ASSERT_TRUE(mapper1.Front() != nullptr);
1988 output1.emplace_back(std::move(*mapper1.Front()));
1989 mapper1.PopFront();
1990 ASSERT_TRUE(mapper1.Front() != nullptr);
1991 output1.emplace_back(std::move(*mapper1.Front()));
1992 mapper1.PopFront();
1993
1994 EXPECT_EQ(mapper0_count, 4u);
1995 EXPECT_EQ(mapper1_count, 4u);
1996
1997 ASSERT_TRUE(mapper1.Front() == nullptr);
1998
1999 EXPECT_EQ(mapper0_count, 4u);
2000 EXPECT_EQ(mapper1_count, 4u);
2001
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002002 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2003 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002004 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002005 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002006
2007 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
2008 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002009 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002010 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002011
2012 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
2013 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002014 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002015 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002016
2017 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
2018 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002019 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002020 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08002021 }
Austin Schuhfecf1d82020-12-19 16:57:28 -08002022}
2023
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002024class BootMergerTest : public SortingElementTest {
2025 public:
2026 BootMergerTest()
2027 : SortingElementTest(),
2028 boot0_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002029 /* 100ms */
2030 "max_out_of_order_duration": 100000000,
2031 "node": {
2032 "name": "pi2"
2033 },
2034 "logger_node": {
2035 "name": "pi1"
2036 },
2037 "monotonic_start_time": 1000000,
2038 "realtime_start_time": 1000000000000,
2039 "logger_monotonic_start_time": 1000000,
2040 "logger_realtime_start_time": 1000000000000,
2041 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2042 "parts_uuid": "1a9e5ca2-31b2-475b-8282-88f6d1ce5109",
2043 "parts_index": 0,
2044 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2045 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002046 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2047 "boot_uuids": [
2048 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2049 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2050 ""
2051 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002052})")),
2053 boot1_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002054 /* 100ms */
2055 "max_out_of_order_duration": 100000000,
2056 "node": {
2057 "name": "pi2"
2058 },
2059 "logger_node": {
2060 "name": "pi1"
2061 },
2062 "monotonic_start_time": 1000000,
2063 "realtime_start_time": 1000000000000,
2064 "logger_monotonic_start_time": 1000000,
2065 "logger_realtime_start_time": 1000000000000,
2066 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2067 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
2068 "parts_index": 1,
2069 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2070 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002071 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2072 "boot_uuids": [
2073 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2074 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2075 ""
2076 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002077})")) {}
Austin Schuh8bee6882021-06-28 21:03:28 -07002078
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002079 protected:
2080 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0_;
2081 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1_;
2082};
2083
// This tests that we can properly sort a multi-node log file which has the old
// (and buggy) timestamps in the header, and the non-resetting parts_index.
// These make it so we can just barely figure out what happened first and what
// happened second, but not in a way that is robust to multiple nodes rebooting.
2088TEST_F(BootMergerTest, OldReboot) {
Austin Schuh8bee6882021-06-28 21:03:28 -07002089 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002090 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002091 writer.QueueSpan(boot0_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002092 }
2093 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002094 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002095 writer.QueueSpan(boot1_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002096 }
2097
2098 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
2099
2100 ASSERT_EQ(parts.size(), 1u);
2101 ASSERT_EQ(parts[0].parts.size(), 2u);
2102
2103 EXPECT_EQ(parts[0].parts[0].boot_count, 0);
2104 EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002105 boot0_.message().source_node_boot_uuid()->string_view());
Austin Schuh8bee6882021-06-28 21:03:28 -07002106
2107 EXPECT_EQ(parts[0].parts[1].boot_count, 1);
2108 EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002109 boot1_.message().source_node_boot_uuid()->string_view());
2110}
2111
2112// This tests that we can produce messages ordered across a reboot.
2113TEST_F(BootMergerTest, SortAcrossReboot) {
2114 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2115 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002116 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002117 writer.QueueSpan(boot0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002118 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002119 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002120 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002121 MakeLogMessage(e + chrono::milliseconds(2000), 1, 0x105));
2122 }
2123 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002124 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002125 writer.QueueSpan(boot1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002126 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002127 MakeLogMessage(e + chrono::milliseconds(100), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002128 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002129 MakeLogMessage(e + chrono::milliseconds(200), 1, 0x106));
2130 }
2131
2132 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
2133 ASSERT_EQ(parts.size(), 1u);
2134 ASSERT_EQ(parts[0].parts.size(), 2u);
2135
2136 BootMerger merger(FilterPartsForNode(parts, "pi2"));
2137
2138 EXPECT_EQ(merger.node(), 1u);
2139
2140 std::vector<Message> output;
2141 for (int i = 0; i < 4; ++i) {
2142 ASSERT_TRUE(merger.Front() != nullptr);
2143 output.emplace_back(std::move(*merger.Front()));
2144 merger.PopFront();
2145 }
2146
2147 ASSERT_TRUE(merger.Front() == nullptr);
2148
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002149 EXPECT_EQ(output[0].timestamp.boot, 0u);
2150 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
2151 EXPECT_EQ(output[1].timestamp.boot, 0u);
2152 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(2000));
2153
2154 EXPECT_EQ(output[2].timestamp.boot, 1u);
2155 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(100));
2156 EXPECT_EQ(output[3].timestamp.boot, 1u);
2157 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
Austin Schuh8bee6882021-06-28 21:03:28 -07002158}
2159
Austin Schuh48507722021-07-17 17:29:24 -07002160class RebootTimestampMapperTest : public SortingElementTest {
2161 public:
2162 RebootTimestampMapperTest()
2163 : SortingElementTest(),
2164 boot0a_(MakeHeader(config_, R"({
2165 /* 100ms */
2166 "max_out_of_order_duration": 100000000,
2167 "node": {
2168 "name": "pi1"
2169 },
2170 "logger_node": {
2171 "name": "pi1"
2172 },
2173 "monotonic_start_time": 1000000,
2174 "realtime_start_time": 1000000000000,
2175 "logger_monotonic_start_time": 1000000,
2176 "logger_realtime_start_time": 1000000000000,
2177 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2178 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2179 "parts_index": 0,
2180 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2181 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2182 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2183 "boot_uuids": [
2184 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2185 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2186 ""
2187 ]
2188})")),
2189 boot0b_(MakeHeader(config_, R"({
2190 /* 100ms */
2191 "max_out_of_order_duration": 100000000,
2192 "node": {
2193 "name": "pi1"
2194 },
2195 "logger_node": {
2196 "name": "pi1"
2197 },
2198 "monotonic_start_time": 1000000,
2199 "realtime_start_time": 1000000000000,
2200 "logger_monotonic_start_time": 1000000,
2201 "logger_realtime_start_time": 1000000000000,
2202 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2203 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2204 "parts_index": 1,
2205 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2206 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2207 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2208 "boot_uuids": [
2209 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2210 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2211 ""
2212 ]
2213})")),
2214 boot1a_(MakeHeader(config_, R"({
2215 /* 100ms */
2216 "max_out_of_order_duration": 100000000,
2217 "node": {
2218 "name": "pi2"
2219 },
2220 "logger_node": {
2221 "name": "pi1"
2222 },
2223 "monotonic_start_time": 1000000,
2224 "realtime_start_time": 1000000000000,
2225 "logger_monotonic_start_time": 1000000,
2226 "logger_realtime_start_time": 1000000000000,
2227 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2228 "parts_uuid": "f6ab0cdc-a654-456d-bfd9-2bbc09098edf",
2229 "parts_index": 0,
2230 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2231 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2232 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2233 "boot_uuids": [
2234 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2235 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2236 ""
2237 ]
2238})")),
2239 boot1b_(MakeHeader(config_, R"({
2240 /* 100ms */
2241 "max_out_of_order_duration": 100000000,
2242 "node": {
2243 "name": "pi2"
2244 },
2245 "logger_node": {
2246 "name": "pi1"
2247 },
2248 "monotonic_start_time": 1000000,
2249 "realtime_start_time": 1000000000000,
2250 "logger_monotonic_start_time": 1000000,
2251 "logger_realtime_start_time": 1000000000000,
2252 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2253 "parts_uuid": "3a9d0445-f520-43ca-93f5-e2cc7f54d40a",
2254 "parts_index": 1,
2255 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2256 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2257 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2258 "boot_uuids": [
2259 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2260 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2261 ""
2262 ]
2263})")) {}
2264
2265 protected:
2266 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0a_;
2267 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0b_;
2268 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1a_;
2269 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
2270};
2271
Austin Schuh48507722021-07-17 17:29:24 -07002272// Tests that we can match timestamps on delivered messages in the presence of
2273// reboots on the node receiving timestamps.
2274TEST_F(RebootTimestampMapperTest, ReadNode0First) {
2275 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2276 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002277 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002278 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002279 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002280 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002281 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002282 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002283 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002284 writer1b.QueueSpan(boot1b_.span());
2285
Austin Schuhd863e6e2022-10-16 15:44:50 -07002286 writer0a.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002287 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002288 writer1a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002289 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
2290 e + chrono::milliseconds(1001)));
2291
Austin Schuhd863e6e2022-10-16 15:44:50 -07002292 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh58646e22021-08-23 23:51:46 -07002293 e + chrono::milliseconds(1000), 0, chrono::seconds(21),
2294 e + chrono::milliseconds(2001)));
2295
Austin Schuhd863e6e2022-10-16 15:44:50 -07002296 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002297 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002298 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002299 e + chrono::milliseconds(2000), 0, chrono::seconds(20),
2300 e + chrono::milliseconds(2001)));
2301
Austin Schuhd863e6e2022-10-16 15:44:50 -07002302 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002303 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002304 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002305 e + chrono::milliseconds(3000), 0, chrono::seconds(20),
2306 e + chrono::milliseconds(3001)));
2307 }
2308
Austin Schuh58646e22021-08-23 23:51:46 -07002309 const std::vector<LogFile> parts =
2310 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Austin Schuh48507722021-07-17 17:29:24 -07002311
2312 for (const auto &x : parts) {
2313 LOG(INFO) << x;
2314 }
2315 ASSERT_EQ(parts.size(), 1u);
2316 ASSERT_EQ(parts[0].logger_node, "pi1");
2317
2318 size_t mapper0_count = 0;
2319 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
2320 mapper0.set_timestamp_callback(
2321 [&](TimestampedMessage *) { ++mapper0_count; });
2322 size_t mapper1_count = 0;
2323 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
2324 mapper1.set_timestamp_callback(
2325 [&](TimestampedMessage *) { ++mapper1_count; });
2326
2327 mapper0.AddPeer(&mapper1);
2328 mapper1.AddPeer(&mapper0);
2329
2330 {
2331 std::deque<TimestampedMessage> output0;
2332
2333 EXPECT_EQ(mapper0_count, 0u);
2334 EXPECT_EQ(mapper1_count, 0u);
2335 ASSERT_TRUE(mapper0.Front() != nullptr);
2336 EXPECT_EQ(mapper0_count, 1u);
2337 EXPECT_EQ(mapper1_count, 0u);
2338 output0.emplace_back(std::move(*mapper0.Front()));
2339 mapper0.PopFront();
2340 EXPECT_TRUE(mapper0.started());
2341 EXPECT_EQ(mapper0_count, 1u);
2342 EXPECT_EQ(mapper1_count, 0u);
2343
2344 ASSERT_TRUE(mapper0.Front() != nullptr);
2345 EXPECT_EQ(mapper0_count, 2u);
2346 EXPECT_EQ(mapper1_count, 0u);
2347 output0.emplace_back(std::move(*mapper0.Front()));
2348 mapper0.PopFront();
2349 EXPECT_TRUE(mapper0.started());
2350
2351 ASSERT_TRUE(mapper0.Front() != nullptr);
2352 output0.emplace_back(std::move(*mapper0.Front()));
2353 mapper0.PopFront();
2354 EXPECT_TRUE(mapper0.started());
2355
2356 EXPECT_EQ(mapper0_count, 3u);
2357 EXPECT_EQ(mapper1_count, 0u);
2358
2359 ASSERT_TRUE(mapper0.Front() == nullptr);
2360
2361 LOG(INFO) << output0[0];
2362 LOG(INFO) << output0[1];
2363 LOG(INFO) << output0[2];
2364
2365 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2366 EXPECT_EQ(output0[0].monotonic_event_time.time,
2367 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002368 EXPECT_EQ(output0[0].queue_index,
2369 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002370 EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
2371 EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002372 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002373
2374 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2375 EXPECT_EQ(output0[1].monotonic_event_time.time,
2376 e + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002377 EXPECT_EQ(output0[1].queue_index,
2378 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002379 EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
2380 EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002381 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002382
2383 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2384 EXPECT_EQ(output0[2].monotonic_event_time.time,
2385 e + chrono::milliseconds(3000));
Austin Schuh58646e22021-08-23 23:51:46 -07002386 EXPECT_EQ(output0[2].queue_index,
2387 (BootQueueIndex{.boot = 0u, .index = 2u}));
Austin Schuh48507722021-07-17 17:29:24 -07002388 EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
2389 EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002390 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002391 }
2392
2393 {
2394 SCOPED_TRACE("Trying node1 now");
2395 std::deque<TimestampedMessage> output1;
2396
2397 EXPECT_EQ(mapper0_count, 3u);
2398 EXPECT_EQ(mapper1_count, 0u);
2399
2400 ASSERT_TRUE(mapper1.Front() != nullptr);
2401 EXPECT_EQ(mapper0_count, 3u);
2402 EXPECT_EQ(mapper1_count, 1u);
2403 output1.emplace_back(std::move(*mapper1.Front()));
2404 mapper1.PopFront();
2405 EXPECT_TRUE(mapper1.started());
2406 EXPECT_EQ(mapper0_count, 3u);
2407 EXPECT_EQ(mapper1_count, 1u);
2408
2409 ASSERT_TRUE(mapper1.Front() != nullptr);
2410 EXPECT_EQ(mapper0_count, 3u);
2411 EXPECT_EQ(mapper1_count, 2u);
2412 output1.emplace_back(std::move(*mapper1.Front()));
2413 mapper1.PopFront();
2414 EXPECT_TRUE(mapper1.started());
2415
2416 ASSERT_TRUE(mapper1.Front() != nullptr);
2417 output1.emplace_back(std::move(*mapper1.Front()));
2418 mapper1.PopFront();
2419 EXPECT_TRUE(mapper1.started());
2420
Austin Schuh58646e22021-08-23 23:51:46 -07002421 ASSERT_TRUE(mapper1.Front() != nullptr);
2422 output1.emplace_back(std::move(*mapper1.Front()));
2423 mapper1.PopFront();
2424 EXPECT_TRUE(mapper1.started());
2425
Austin Schuh48507722021-07-17 17:29:24 -07002426 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002427 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002428
2429 ASSERT_TRUE(mapper1.Front() == nullptr);
2430
2431 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002432 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002433
2434 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2435 EXPECT_EQ(output1[0].monotonic_event_time.time,
2436 e + chrono::seconds(100) + chrono::milliseconds(1000));
2437 EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
2438 EXPECT_EQ(output1[0].monotonic_remote_time.time,
2439 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002440 EXPECT_EQ(output1[0].remote_queue_index,
2441 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002442 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
2443 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
2444 e + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002445 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002446
2447 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2448 EXPECT_EQ(output1[1].monotonic_event_time.time,
2449 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002450 EXPECT_EQ(output1[1].remote_queue_index,
2451 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002452 EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
2453 EXPECT_EQ(output1[1].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002454 e + chrono::milliseconds(1000));
Austin Schuh48507722021-07-17 17:29:24 -07002455 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
2456 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
2457 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002458 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002459
2460 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2461 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002462 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh48507722021-07-17 17:29:24 -07002463 EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
2464 EXPECT_EQ(output1[2].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002465 e + chrono::milliseconds(2000));
2466 EXPECT_EQ(output1[2].remote_queue_index,
2467 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002468 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
2469 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002470 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002471 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002472
Austin Schuh58646e22021-08-23 23:51:46 -07002473 EXPECT_EQ(output1[3].monotonic_event_time.boot, 1u);
2474 EXPECT_EQ(output1[3].monotonic_event_time.time,
2475 e + chrono::seconds(20) + chrono::milliseconds(3000));
2476 EXPECT_EQ(output1[3].monotonic_remote_time.boot, 0u);
2477 EXPECT_EQ(output1[3].monotonic_remote_time.time,
2478 e + chrono::milliseconds(3000));
2479 EXPECT_EQ(output1[3].remote_queue_index,
2480 (BootQueueIndex{.boot = 0u, .index = 2u}));
2481 EXPECT_EQ(output1[3].monotonic_timestamp_time.boot, 0u);
2482 EXPECT_EQ(output1[3].monotonic_timestamp_time.time,
2483 e + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002484 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh58646e22021-08-23 23:51:46 -07002485
Austin Schuh48507722021-07-17 17:29:24 -07002486 LOG(INFO) << output1[0];
2487 LOG(INFO) << output1[1];
2488 LOG(INFO) << output1[2];
Austin Schuh58646e22021-08-23 23:51:46 -07002489 LOG(INFO) << output1[3];
Austin Schuh48507722021-07-17 17:29:24 -07002490 }
2491}
2492
2493TEST_F(RebootTimestampMapperTest, Node2Reboot) {
2494 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2495 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002496 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002497 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002498 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002499 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002500 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002501 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002502 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002503 writer1b.QueueSpan(boot1b_.span());
2504
Austin Schuhd863e6e2022-10-16 15:44:50 -07002505 writer1a.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002506 e + chrono::seconds(100) + chrono::milliseconds(1000), 3, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002507 writer0a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002508 e + chrono::seconds(100) + chrono::milliseconds(1000), 3,
2509 chrono::seconds(-100),
2510 e + chrono::seconds(100) + chrono::milliseconds(1001)));
2511
Austin Schuhd863e6e2022-10-16 15:44:50 -07002512 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002513 e + chrono::seconds(20) + chrono::milliseconds(2000), 3, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002514 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002515 e + chrono::seconds(20) + chrono::milliseconds(2000), 3,
2516 chrono::seconds(-20),
2517 e + chrono::seconds(20) + chrono::milliseconds(2001)));
2518
Austin Schuhd863e6e2022-10-16 15:44:50 -07002519 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002520 e + chrono::seconds(20) + chrono::milliseconds(3000), 3, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002521 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002522 e + chrono::seconds(20) + chrono::milliseconds(3000), 3,
2523 chrono::seconds(-20),
2524 e + chrono::seconds(20) + chrono::milliseconds(3001)));
2525 }
2526
2527 const std::vector<LogFile> parts =
2528 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2529
2530 for (const auto &x : parts) {
2531 LOG(INFO) << x;
2532 }
2533 ASSERT_EQ(parts.size(), 1u);
2534 ASSERT_EQ(parts[0].logger_node, "pi1");
2535
2536 size_t mapper0_count = 0;
2537 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
2538 mapper0.set_timestamp_callback(
2539 [&](TimestampedMessage *) { ++mapper0_count; });
2540 size_t mapper1_count = 0;
2541 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
2542 mapper1.set_timestamp_callback(
2543 [&](TimestampedMessage *) { ++mapper1_count; });
2544
2545 mapper0.AddPeer(&mapper1);
2546 mapper1.AddPeer(&mapper0);
2547
2548 {
2549 std::deque<TimestampedMessage> output0;
2550
2551 EXPECT_EQ(mapper0_count, 0u);
2552 EXPECT_EQ(mapper1_count, 0u);
2553 ASSERT_TRUE(mapper0.Front() != nullptr);
2554 EXPECT_EQ(mapper0_count, 1u);
2555 EXPECT_EQ(mapper1_count, 0u);
2556 output0.emplace_back(std::move(*mapper0.Front()));
2557 mapper0.PopFront();
2558 EXPECT_TRUE(mapper0.started());
2559 EXPECT_EQ(mapper0_count, 1u);
2560 EXPECT_EQ(mapper1_count, 0u);
2561
2562 ASSERT_TRUE(mapper0.Front() != nullptr);
2563 EXPECT_EQ(mapper0_count, 2u);
2564 EXPECT_EQ(mapper1_count, 0u);
2565 output0.emplace_back(std::move(*mapper0.Front()));
2566 mapper0.PopFront();
2567 EXPECT_TRUE(mapper0.started());
2568
2569 ASSERT_TRUE(mapper0.Front() != nullptr);
2570 output0.emplace_back(std::move(*mapper0.Front()));
2571 mapper0.PopFront();
2572 EXPECT_TRUE(mapper0.started());
2573
2574 EXPECT_EQ(mapper0_count, 3u);
2575 EXPECT_EQ(mapper1_count, 0u);
2576
2577 ASSERT_TRUE(mapper0.Front() == nullptr);
2578
2579 LOG(INFO) << output0[0];
2580 LOG(INFO) << output0[1];
2581 LOG(INFO) << output0[2];
2582
2583 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2584 EXPECT_EQ(output0[0].monotonic_event_time.time,
2585 e + chrono::milliseconds(1000));
2586 EXPECT_EQ(output0[0].monotonic_remote_time.boot, 0u);
2587 EXPECT_EQ(output0[0].monotonic_remote_time.time,
2588 e + chrono::seconds(100) + chrono::milliseconds(1000));
2589 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
2590 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
2591 e + chrono::seconds(100) + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002592 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002593
2594 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2595 EXPECT_EQ(output0[1].monotonic_event_time.time,
2596 e + chrono::milliseconds(2000));
2597 EXPECT_EQ(output0[1].monotonic_remote_time.boot, 1u);
2598 EXPECT_EQ(output0[1].monotonic_remote_time.time,
2599 e + chrono::seconds(20) + chrono::milliseconds(2000));
2600 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
2601 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
2602 e + chrono::seconds(20) + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002603 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002604
2605 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2606 EXPECT_EQ(output0[2].monotonic_event_time.time,
2607 e + chrono::milliseconds(3000));
2608 EXPECT_EQ(output0[2].monotonic_remote_time.boot, 1u);
2609 EXPECT_EQ(output0[2].monotonic_remote_time.time,
2610 e + chrono::seconds(20) + chrono::milliseconds(3000));
2611 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
2612 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
2613 e + chrono::seconds(20) + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002614 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002615 }
2616
2617 {
2618 SCOPED_TRACE("Trying node1 now");
2619 std::deque<TimestampedMessage> output1;
2620
2621 EXPECT_EQ(mapper0_count, 3u);
2622 EXPECT_EQ(mapper1_count, 0u);
2623
2624 ASSERT_TRUE(mapper1.Front() != nullptr);
2625 EXPECT_EQ(mapper0_count, 3u);
2626 EXPECT_EQ(mapper1_count, 1u);
2627 output1.emplace_back(std::move(*mapper1.Front()));
2628 mapper1.PopFront();
2629 EXPECT_TRUE(mapper1.started());
2630 EXPECT_EQ(mapper0_count, 3u);
2631 EXPECT_EQ(mapper1_count, 1u);
2632
2633 ASSERT_TRUE(mapper1.Front() != nullptr);
2634 EXPECT_EQ(mapper0_count, 3u);
2635 EXPECT_EQ(mapper1_count, 2u);
2636 output1.emplace_back(std::move(*mapper1.Front()));
2637 mapper1.PopFront();
2638 EXPECT_TRUE(mapper1.started());
2639
2640 ASSERT_TRUE(mapper1.Front() != nullptr);
2641 output1.emplace_back(std::move(*mapper1.Front()));
2642 mapper1.PopFront();
2643 EXPECT_TRUE(mapper1.started());
2644
2645 EXPECT_EQ(mapper0_count, 3u);
2646 EXPECT_EQ(mapper1_count, 3u);
2647
2648 ASSERT_TRUE(mapper1.Front() == nullptr);
2649
2650 EXPECT_EQ(mapper0_count, 3u);
2651 EXPECT_EQ(mapper1_count, 3u);
2652
2653 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2654 EXPECT_EQ(output1[0].monotonic_event_time.time,
2655 e + chrono::seconds(100) + chrono::milliseconds(1000));
2656 EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
2657 EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002658 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002659
2660 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2661 EXPECT_EQ(output1[1].monotonic_event_time.time,
2662 e + chrono::seconds(20) + chrono::milliseconds(2000));
2663 EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
2664 EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002665 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002666
2667 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2668 EXPECT_EQ(output1[2].monotonic_event_time.time,
2669 e + chrono::seconds(20) + chrono::milliseconds(3000));
2670 EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
2671 EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002672 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002673
2674 LOG(INFO) << output1[0];
2675 LOG(INFO) << output1[1];
2676 LOG(INFO) << output1[2];
2677 }
2678}
2679
Austin Schuh44c61472021-11-22 21:04:10 -08002680class SortingDeathTest : public SortingElementTest {
2681 public:
2682 SortingDeathTest()
2683 : SortingElementTest(),
2684 part0_(MakeHeader(config_, R"({
2685 /* 100ms */
2686 "max_out_of_order_duration": 100000000,
2687 "node": {
2688 "name": "pi1"
2689 },
2690 "logger_node": {
2691 "name": "pi1"
2692 },
2693 "monotonic_start_time": 1000000,
2694 "realtime_start_time": 1000000000000,
2695 "logger_monotonic_start_time": 1000000,
2696 "logger_realtime_start_time": 1000000000000,
2697 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2698 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2699 "parts_index": 0,
2700 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2701 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2702 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2703 "boot_uuids": [
2704 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2705 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2706 ""
2707 ],
2708 "oldest_remote_monotonic_timestamps": [
2709 9223372036854775807,
2710 9223372036854775807,
2711 9223372036854775807
2712 ],
2713 "oldest_local_monotonic_timestamps": [
2714 9223372036854775807,
2715 9223372036854775807,
2716 9223372036854775807
2717 ],
2718 "oldest_remote_unreliable_monotonic_timestamps": [
2719 9223372036854775807,
2720 0,
2721 9223372036854775807
2722 ],
2723 "oldest_local_unreliable_monotonic_timestamps": [
2724 9223372036854775807,
2725 0,
2726 9223372036854775807
2727 ]
2728})")),
2729 part1_(MakeHeader(config_, R"({
2730 /* 100ms */
2731 "max_out_of_order_duration": 100000000,
2732 "node": {
2733 "name": "pi1"
2734 },
2735 "logger_node": {
2736 "name": "pi1"
2737 },
2738 "monotonic_start_time": 1000000,
2739 "realtime_start_time": 1000000000000,
2740 "logger_monotonic_start_time": 1000000,
2741 "logger_realtime_start_time": 1000000000000,
2742 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2743 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2744 "parts_index": 1,
2745 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2746 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2747 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2748 "boot_uuids": [
2749 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2750 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2751 ""
2752 ],
2753 "oldest_remote_monotonic_timestamps": [
2754 9223372036854775807,
2755 9223372036854775807,
2756 9223372036854775807
2757 ],
2758 "oldest_local_monotonic_timestamps": [
2759 9223372036854775807,
2760 9223372036854775807,
2761 9223372036854775807
2762 ],
2763 "oldest_remote_unreliable_monotonic_timestamps": [
2764 9223372036854775807,
2765 100000,
2766 9223372036854775807
2767 ],
2768 "oldest_local_unreliable_monotonic_timestamps": [
2769 9223372036854775807,
2770 100000,
2771 9223372036854775807
2772 ]
2773})")),
2774 part2_(MakeHeader(config_, R"({
2775 /* 100ms */
2776 "max_out_of_order_duration": 100000000,
2777 "node": {
2778 "name": "pi1"
2779 },
2780 "logger_node": {
2781 "name": "pi1"
2782 },
2783 "monotonic_start_time": 1000000,
2784 "realtime_start_time": 1000000000000,
2785 "logger_monotonic_start_time": 1000000,
2786 "logger_realtime_start_time": 1000000000000,
2787 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2788 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2789 "parts_index": 2,
2790 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2791 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2792 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2793 "boot_uuids": [
2794 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2795 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2796 ""
2797 ],
2798 "oldest_remote_monotonic_timestamps": [
2799 9223372036854775807,
2800 9223372036854775807,
2801 9223372036854775807
2802 ],
2803 "oldest_local_monotonic_timestamps": [
2804 9223372036854775807,
2805 9223372036854775807,
2806 9223372036854775807
2807 ],
2808 "oldest_remote_unreliable_monotonic_timestamps": [
2809 9223372036854775807,
2810 200000,
2811 9223372036854775807
2812 ],
2813 "oldest_local_unreliable_monotonic_timestamps": [
2814 9223372036854775807,
2815 200000,
2816 9223372036854775807
2817 ]
2818})")),
2819 part3_(MakeHeader(config_, R"({
2820 /* 100ms */
2821 "max_out_of_order_duration": 100000000,
2822 "node": {
2823 "name": "pi1"
2824 },
2825 "logger_node": {
2826 "name": "pi1"
2827 },
2828 "monotonic_start_time": 1000000,
2829 "realtime_start_time": 1000000000000,
2830 "logger_monotonic_start_time": 1000000,
2831 "logger_realtime_start_time": 1000000000000,
2832 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2833 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2834 "parts_index": 3,
2835 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2836 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2837 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2838 "boot_uuids": [
2839 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2840 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2841 ""
2842 ],
2843 "oldest_remote_monotonic_timestamps": [
2844 9223372036854775807,
2845 9223372036854775807,
2846 9223372036854775807
2847 ],
2848 "oldest_local_monotonic_timestamps": [
2849 9223372036854775807,
2850 9223372036854775807,
2851 9223372036854775807
2852 ],
2853 "oldest_remote_unreliable_monotonic_timestamps": [
2854 9223372036854775807,
2855 300000,
2856 9223372036854775807
2857 ],
2858 "oldest_local_unreliable_monotonic_timestamps": [
2859 9223372036854775807,
2860 300000,
2861 9223372036854775807
2862 ]
2863})")) {}
2864
2865 protected:
2866 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part0_;
2867 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part1_;
2868 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part2_;
2869 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part3_;
2870};
2871
2872// Tests that if 2 computers go back and forth trying to be the same node, we
2873// die in sorting instead of failing to estimate time.
2874TEST_F(SortingDeathTest, FightingNodes) {
2875 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002876 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh44c61472021-11-22 21:04:10 -08002877 writer0.QueueSpan(part0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002878 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh44c61472021-11-22 21:04:10 -08002879 writer1.QueueSpan(part1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002880 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuh44c61472021-11-22 21:04:10 -08002881 writer2.QueueSpan(part2_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002882 TestDetachedBufferWriter writer3(logfile3_);
Austin Schuh44c61472021-11-22 21:04:10 -08002883 writer3.QueueSpan(part3_.span());
2884 }
2885
2886 EXPECT_DEATH(
2887 {
2888 const std::vector<LogFile> parts =
2889 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2890 },
Austin Schuh22cf7862022-09-19 19:09:42 -07002891 "found overlapping boots on");
Austin Schuh44c61472021-11-22 21:04:10 -08002892}
2893
Brian Smarttea913d42021-12-10 15:02:38 -08002894// Tests that we MessageReader blows up on a bad message.
2895TEST(MessageReaderConfirmCrash, ReadWrite) {
2896 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
2897 unlink(logfile.c_str());
2898
2899 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
2900 JsonToSizedFlatbuffer<LogFileHeader>(
2901 R"({ "max_out_of_order_duration": 100000000 })");
2902 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
2903 JsonToSizedFlatbuffer<MessageHeader>(
2904 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
2905 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
2906 JsonToSizedFlatbuffer<MessageHeader>(
2907 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
2908 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m4 =
2909 JsonToSizedFlatbuffer<MessageHeader>(
2910 R"({ "channel_index": 0, "monotonic_sent_time": 4 })");
2911
2912 // Starts out like a proper flat buffer header, but it breaks down ...
2913 std::vector<uint8_t> garbage{8, 0, 0, 0, 16, 0, 0, 0, 4, 0, 0, 0};
2914 absl::Span<uint8_t> m3_span(garbage);
2915
2916 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002917 TestDetachedBufferWriter writer(logfile);
Brian Smarttea913d42021-12-10 15:02:38 -08002918 writer.QueueSpan(config.span());
2919 writer.QueueSpan(m1.span());
2920 writer.QueueSpan(m2.span());
2921 writer.QueueSpan(m3_span);
2922 writer.QueueSpan(m4.span()); // This message is "hidden"
2923 }
2924
2925 {
2926 MessageReader reader(logfile);
2927
2928 EXPECT_EQ(reader.filename(), logfile);
2929
2930 EXPECT_EQ(
2931 reader.max_out_of_order_duration(),
2932 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2933 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2934 EXPECT_TRUE(reader.ReadMessage());
2935 EXPECT_EQ(reader.newest_timestamp(),
2936 monotonic_clock::time_point(chrono::nanoseconds(1)));
2937 EXPECT_TRUE(reader.ReadMessage());
2938 EXPECT_EQ(reader.newest_timestamp(),
2939 monotonic_clock::time_point(chrono::nanoseconds(2)));
2940 // Confirm default crashing behavior
2941 EXPECT_DEATH(reader.ReadMessage(), "Corrupted message at offset");
2942 }
2943
2944 {
2945 gflags::FlagSaver fs;
2946
2947 MessageReader reader(logfile);
2948 reader.set_crash_on_corrupt_message_flag(false);
2949
2950 EXPECT_EQ(reader.filename(), logfile);
2951
2952 EXPECT_EQ(
2953 reader.max_out_of_order_duration(),
2954 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2955 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2956 EXPECT_TRUE(reader.ReadMessage());
2957 EXPECT_EQ(reader.newest_timestamp(),
2958 monotonic_clock::time_point(chrono::nanoseconds(1)));
2959 EXPECT_TRUE(reader.ReadMessage());
2960 EXPECT_EQ(reader.newest_timestamp(),
2961 monotonic_clock::time_point(chrono::nanoseconds(2)));
2962 // Confirm avoiding the corrupted message crash, stopping instead.
2963 EXPECT_FALSE(reader.ReadMessage());
2964 }
2965
2966 {
2967 gflags::FlagSaver fs;
2968
2969 MessageReader reader(logfile);
2970 reader.set_crash_on_corrupt_message_flag(false);
2971 reader.set_ignore_corrupt_messages_flag(true);
2972
2973 EXPECT_EQ(reader.filename(), logfile);
2974
2975 EXPECT_EQ(
2976 reader.max_out_of_order_duration(),
2977 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2978 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2979 EXPECT_TRUE(reader.ReadMessage());
2980 EXPECT_EQ(reader.newest_timestamp(),
2981 monotonic_clock::time_point(chrono::nanoseconds(1)));
2982 EXPECT_TRUE(reader.ReadMessage());
2983 EXPECT_EQ(reader.newest_timestamp(),
2984 monotonic_clock::time_point(chrono::nanoseconds(2)));
2985 // Confirm skipping of the corrupted message to read the hidden one.
2986 EXPECT_TRUE(reader.ReadMessage());
2987 EXPECT_EQ(reader.newest_timestamp(),
2988 monotonic_clock::time_point(chrono::nanoseconds(4)));
2989 EXPECT_FALSE(reader.ReadMessage());
2990 }
2991}
2992
Austin Schuhfa30c352022-10-16 11:12:02 -07002993class InlinePackMessage : public ::testing::Test {
2994 protected:
2995 aos::Context RandomContext() {
2996 data_ = RandomData();
2997 std::uniform_int_distribution<uint32_t> uint32_distribution(
2998 std::numeric_limits<uint32_t>::min(),
2999 std::numeric_limits<uint32_t>::max());
3000
3001 std::uniform_int_distribution<int64_t> time_distribution(
3002 std::numeric_limits<int64_t>::min(),
3003 std::numeric_limits<int64_t>::max());
3004
3005 aos::Context context;
3006 context.monotonic_event_time =
3007 aos::monotonic_clock::epoch() +
3008 chrono::nanoseconds(time_distribution(random_number_generator_));
3009 context.realtime_event_time =
3010 aos::realtime_clock::epoch() +
3011 chrono::nanoseconds(time_distribution(random_number_generator_));
3012
3013 context.monotonic_remote_time =
3014 aos::monotonic_clock::epoch() +
3015 chrono::nanoseconds(time_distribution(random_number_generator_));
3016 context.realtime_remote_time =
3017 aos::realtime_clock::epoch() +
3018 chrono::nanoseconds(time_distribution(random_number_generator_));
3019
3020 context.queue_index = uint32_distribution(random_number_generator_);
3021 context.remote_queue_index = uint32_distribution(random_number_generator_);
3022 context.size = data_.size();
3023 context.data = data_.data();
3024 return context;
3025 }
3026
Austin Schuhf2d0e682022-10-16 14:20:58 -07003027 aos::monotonic_clock::time_point RandomMonotonic() {
3028 std::uniform_int_distribution<int64_t> time_distribution(
3029 0, std::numeric_limits<int64_t>::max());
3030 return aos::monotonic_clock::epoch() +
3031 chrono::nanoseconds(time_distribution(random_number_generator_));
3032 }
3033
3034 aos::SizePrefixedFlatbufferDetachedBuffer<message_bridge::RemoteMessage>
3035 RandomRemoteMessage() {
3036 std::uniform_int_distribution<uint8_t> uint8_distribution(
3037 std::numeric_limits<uint8_t>::min(),
3038 std::numeric_limits<uint8_t>::max());
3039
3040 std::uniform_int_distribution<int64_t> time_distribution(
3041 std::numeric_limits<int64_t>::min(),
3042 std::numeric_limits<int64_t>::max());
3043
3044 flatbuffers::FlatBufferBuilder fbb;
3045 message_bridge::RemoteMessage::Builder builder(fbb);
3046 builder.add_queue_index(uint8_distribution(random_number_generator_));
3047
3048 builder.add_monotonic_sent_time(
3049 time_distribution(random_number_generator_));
3050 builder.add_realtime_sent_time(time_distribution(random_number_generator_));
3051 builder.add_monotonic_remote_time(
3052 time_distribution(random_number_generator_));
3053 builder.add_realtime_remote_time(
3054 time_distribution(random_number_generator_));
3055
3056 builder.add_remote_queue_index(
3057 uint8_distribution(random_number_generator_));
3058
3059 fbb.FinishSizePrefixed(builder.Finish());
3060 return fbb.Release();
3061 }
3062
Austin Schuhfa30c352022-10-16 11:12:02 -07003063 std::vector<uint8_t> RandomData() {
3064 std::vector<uint8_t> result;
3065 std::uniform_int_distribution<int> length_distribution(1, 32);
3066 std::uniform_int_distribution<uint8_t> data_distribution(
3067 std::numeric_limits<uint8_t>::min(),
3068 std::numeric_limits<uint8_t>::max());
3069
3070 const size_t length = length_distribution(random_number_generator_);
3071
3072 result.reserve(length);
3073 for (size_t i = 0; i < length; ++i) {
3074 result.emplace_back(data_distribution(random_number_generator_));
3075 }
3076 return result;
3077 }
3078
3079 std::mt19937 random_number_generator_{
3080 std::mt19937(::aos::testing::RandomSeed())};
3081
3082 std::vector<uint8_t> data_;
3083};
3084
3085// Uses the binary schema to annotate a provided flatbuffer. Returns the
3086// annotated flatbuffer.
3087std::string AnnotateBinaries(
3088 const aos::NonSizePrefixedFlatbuffer<reflection::Schema> &schema,
3089 const std::string &schema_filename,
3090 flatbuffers::span<uint8_t> binary_data) {
3091 flatbuffers::BinaryAnnotator binary_annotator(
3092 schema.span().data(), schema.span().size(), binary_data.data(),
3093 binary_data.size());
3094
3095 auto annotations = binary_annotator.Annotate();
3096
3097 flatbuffers::AnnotatedBinaryTextGenerator text_generator(
3098 flatbuffers::AnnotatedBinaryTextGenerator::Options{}, annotations,
3099 binary_data.data(), binary_data.size());
3100
3101 text_generator.Generate(aos::testing::TestTmpDir() + "/foo.bfbs",
3102 schema_filename);
3103
3104 return aos::util::ReadFileToStringOrDie(aos::testing::TestTmpDir() +
3105 "/foo.afb");
3106}
3107
Austin Schuh71a40d42023-02-04 21:22:22 -08003108// Event loop which just has working time functions for the Copier classes
3109// tested below.
3110class TimeEventLoop : public EventLoop {
3111 public:
3112 TimeEventLoop() : EventLoop(nullptr) {}
3113
3114 aos::monotonic_clock::time_point monotonic_now() const final {
3115 return aos::monotonic_clock::min_time;
3116 }
3117 realtime_clock::time_point realtime_now() const final {
3118 return aos::realtime_clock::min_time;
3119 }
3120
3121 void OnRun(::std::function<void()> /*on_run*/) final { LOG(FATAL); }
3122
3123 const std::string_view name() const final { return "time"; }
3124 const Node *node() const final { return nullptr; }
3125
3126 void SetRuntimeAffinity(const cpu_set_t & /*cpuset*/) final { LOG(FATAL); }
3127 void SetRuntimeRealtimePriority(int /*priority*/) final { LOG(FATAL); }
3128
3129 const cpu_set_t &runtime_affinity() const final {
3130 LOG(FATAL);
3131 return cpuset_;
3132 }
3133
3134 TimerHandler *AddTimer(::std::function<void()> /*callback*/) final {
3135 LOG(FATAL);
3136 return nullptr;
3137 }
3138
3139 std::unique_ptr<RawSender> MakeRawSender(const Channel * /*channel*/) final {
3140 LOG(FATAL);
3141 return std::unique_ptr<RawSender>();
3142 }
3143
3144 const UUID &boot_uuid() const final {
3145 LOG(FATAL);
3146 return boot_uuid_;
3147 }
3148
3149 void set_name(const std::string_view name) final { LOG(FATAL) << name; }
3150
3151 pid_t GetTid() final {
3152 LOG(FATAL);
3153 return 0;
3154 }
3155
3156 int NumberBuffers(const Channel * /*channel*/) final {
3157 LOG(FATAL);
3158 return 0;
3159 }
3160
3161 int runtime_realtime_priority() const final {
3162 LOG(FATAL);
3163 return 0;
3164 }
3165
3166 std::unique_ptr<RawFetcher> MakeRawFetcher(
3167 const Channel * /*channel*/) final {
3168 LOG(FATAL);
3169 return std::unique_ptr<RawFetcher>();
3170 }
3171
3172 PhasedLoopHandler *AddPhasedLoop(
3173 ::std::function<void(int)> /*callback*/,
3174 const monotonic_clock::duration /*interval*/,
3175 const monotonic_clock::duration /*offset*/) final {
3176 LOG(FATAL);
3177 return nullptr;
3178 }
3179
3180 void MakeRawWatcher(
3181 const Channel * /*channel*/,
3182 std::function<void(const Context &context, const void *message)>
3183 /*watcher*/) final {
3184 LOG(FATAL);
3185 }
3186
3187 private:
3188 const cpu_set_t cpuset_ = DefaultAffinity();
3189 UUID boot_uuid_ = UUID ::Zero();
3190};
3191
Austin Schuhfa30c352022-10-16 11:12:02 -07003192// Tests that all variations of PackMessage are equivalent to the inline
3193// PackMessage used to avoid allocations.
3194TEST_F(InlinePackMessage, Equivilent) {
3195 std::uniform_int_distribution<uint32_t> uint32_distribution(
3196 std::numeric_limits<uint32_t>::min(),
3197 std::numeric_limits<uint32_t>::max());
3198 aos::FlatbufferVector<reflection::Schema> schema =
3199 FileToFlatbuffer<reflection::Schema>(
3200 ArtifactPath("aos/events/logging/logger.bfbs"));
3201
3202 for (const LogType type :
3203 {LogType::kLogMessage, LogType::kLogDeliveryTimeOnly,
3204 LogType::kLogMessageAndDeliveryTime, LogType::kLogRemoteMessage}) {
3205 for (int i = 0; i < 100; ++i) {
3206 aos::Context context = RandomContext();
3207 const uint32_t channel_index =
3208 uint32_distribution(random_number_generator_);
3209
3210 flatbuffers::FlatBufferBuilder fbb;
3211 fbb.ForceDefaults(true);
3212 fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index, type));
3213
3214 VLOG(1) << absl::BytesToHexString(std::string_view(
3215 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3216 fbb.GetBufferSpan().size()));
3217
3218 // Make sure that both the builder and inline method agree on sizes.
Austin Schuh48d10d62022-10-16 22:19:23 -07003219 ASSERT_EQ(fbb.GetSize(), PackMessageSize(type, context.size))
Austin Schuhfa30c352022-10-16 11:12:02 -07003220 << "log type " << static_cast<int>(type);
3221
3222 // Initialize the buffer to something nonzero to make sure all the padding
3223 // bytes are set to 0.
Austin Schuh48d10d62022-10-16 22:19:23 -07003224 std::vector<uint8_t> repacked_message(PackMessageSize(type, context.size),
3225 67);
Austin Schuhfa30c352022-10-16 11:12:02 -07003226
3227 // And verify packing inline works as expected.
Austin Schuh71a40d42023-02-04 21:22:22 -08003228 EXPECT_EQ(
3229 repacked_message.size(),
3230 PackMessageInline(repacked_message.data(), context, channel_index,
3231 type, 0u, repacked_message.size()));
Austin Schuhfa30c352022-10-16 11:12:02 -07003232 EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
3233 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3234 fbb.GetBufferSpan().size()))
3235 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
3236 fbb.GetBufferSpan());
Austin Schuh71a40d42023-02-04 21:22:22 -08003237
3238 // Ok, now we want to confirm that we can build up arbitrary pieces of
3239 // said flatbuffer. Try all of them since it is cheap.
3240 TimeEventLoop event_loop;
3241 for (size_t i = 0; i < repacked_message.size(); i += 8) {
3242 for (size_t j = i; j < repacked_message.size(); j += 8) {
3243 std::vector<uint8_t> destination(repacked_message.size(), 67u);
3244 ContextDataCopier copier(context, channel_index, type, &event_loop);
3245
3246 copier.Copy(destination.data(), i, j);
3247
3248 size_t index = 0;
3249 for (size_t k = i; k < j; ++k) {
3250 ASSERT_EQ(destination[index], repacked_message[k])
3251 << ": Failed to match type " << static_cast<int>(type)
3252 << ", index " << index << " while testing range " << i << " to "
3253 << j;
3254 ;
3255 ++index;
3256 }
3257 // Now, confirm that none of the other bytes have been touched.
3258 for (; index < destination.size(); ++index) {
3259 ASSERT_EQ(destination[index], 67u);
3260 }
3261 }
3262 }
Austin Schuhfa30c352022-10-16 11:12:02 -07003263 }
3264 }
3265}
3266
Austin Schuhf2d0e682022-10-16 14:20:58 -07003267// Tests that all variations of PackMessage are equivilent to the inline
3268// PackMessage used to avoid allocations.
3269TEST_F(InlinePackMessage, RemoteEquivilent) {
3270 aos::FlatbufferVector<reflection::Schema> schema =
3271 FileToFlatbuffer<reflection::Schema>(
3272 ArtifactPath("aos/events/logging/logger.bfbs"));
3273 std::uniform_int_distribution<uint8_t> uint8_distribution(
3274 std::numeric_limits<uint8_t>::min(), std::numeric_limits<uint8_t>::max());
3275
3276 for (int i = 0; i < 100; ++i) {
3277 aos::SizePrefixedFlatbufferDetachedBuffer<RemoteMessage> random_msg =
3278 RandomRemoteMessage();
3279 const size_t channel_index = uint8_distribution(random_number_generator_);
3280 const monotonic_clock::time_point monotonic_timestamp_time =
3281 RandomMonotonic();
3282
3283 flatbuffers::FlatBufferBuilder fbb;
3284 fbb.ForceDefaults(true);
3285 fbb.FinishSizePrefixed(PackRemoteMessage(
3286 &fbb, &random_msg.message(), channel_index, monotonic_timestamp_time));
3287
3288 VLOG(1) << absl::BytesToHexString(std::string_view(
3289 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3290 fbb.GetBufferSpan().size()));
3291
3292 // Make sure that both the builder and inline method agree on sizes.
3293 ASSERT_EQ(fbb.GetSize(), PackRemoteMessageSize());
3294
3295 // Initialize the buffer to something nonzer to make sure all the padding
3296 // bytes are set to 0.
3297 std::vector<uint8_t> repacked_message(PackRemoteMessageSize(), 67);
3298
3299 // And verify packing inline works as expected.
Austin Schuh71a40d42023-02-04 21:22:22 -08003300 EXPECT_EQ(repacked_message.size(),
3301 PackRemoteMessageInline(
3302 repacked_message.data(), &random_msg.message(), channel_index,
3303 monotonic_timestamp_time, 0u, repacked_message.size()));
Austin Schuhf2d0e682022-10-16 14:20:58 -07003304 EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
3305 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3306 fbb.GetBufferSpan().size()))
3307 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
3308 fbb.GetBufferSpan());
Austin Schuh71a40d42023-02-04 21:22:22 -08003309
3310 // Ok, now we want to confirm that we can build up arbitrary pieces of said
3311 // flatbuffer. Try all of them since it is cheap.
3312 TimeEventLoop event_loop;
3313 for (size_t i = 0; i < repacked_message.size(); i += 8) {
3314 for (size_t j = i; j < repacked_message.size(); j += 8) {
3315 std::vector<uint8_t> destination(repacked_message.size(), 67u);
3316 RemoteMessageCopier copier(&random_msg.message(), channel_index,
3317 monotonic_timestamp_time, &event_loop);
3318
3319 copier.Copy(destination.data(), i, j);
3320
3321 size_t index = 0;
3322 for (size_t k = i; k < j; ++k) {
3323 ASSERT_EQ(destination[index], repacked_message[k]);
3324 ++index;
3325 }
3326 for (; index < destination.size(); ++index) {
3327 ASSERT_EQ(destination[index], 67u);
3328 }
3329 }
3330 }
Austin Schuhf2d0e682022-10-16 14:20:58 -07003331 }
3332}
Austin Schuhfa30c352022-10-16 11:12:02 -07003333
Austin Schuhc243b422020-10-11 15:35:08 -07003334} // namespace testing
3335} // namespace logger
3336} // namespace aos