blob: 8d5b0fb8ca953dd229925587704256f6a556305c [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
Alexei Strots01395492023-03-20 13:59:56 -07004#include <filesystem>
Austin Schuhfa30c352022-10-16 11:12:02 -07005#include <random>
Austin Schuhe243aaf2020-10-11 15:46:02 -07006#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07007
Austin Schuhfa30c352022-10-16 11:12:02 -07008#include "absl/strings/escaping.h"
Austin Schuhc41603c2020-10-11 16:17:37 -07009#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -070010#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -080011#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070012#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070013#include "aos/json_to_flatbuffer.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070014#include "aos/testing/path.h"
15#include "aos/testing/random_seed.h"
Austin Schuhc243b422020-10-11 15:35:08 -070016#include "aos/testing/tmpdir.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070017#include "aos/util/file.h"
18#include "external/com_github_google_flatbuffers/src/annotated_binary_text_gen.h"
19#include "external/com_github_google_flatbuffers/src/binary_annotator.h"
20#include "flatbuffers/reflection_generated.h"
Brian Smarttea913d42021-12-10 15:02:38 -080021#include "gflags/gflags.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070022#include "gtest/gtest.h"
Austin Schuhc243b422020-10-11 15:35:08 -070023
24namespace aos {
25namespace logger {
26namespace testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070027namespace chrono = std::chrono;
Austin Schuhf2d0e682022-10-16 14:20:58 -070028using aos::message_bridge::RemoteMessage;
Eric Schmiedebergb38477e2022-12-02 16:08:04 -070029using aos::testing::ArtifactPath;
Austin Schuhc243b422020-10-11 15:35:08 -070030
Austin Schuhd863e6e2022-10-16 15:44:50 -070031// Adapter class to make it easy to test DetachedBufferWriter without adding
32// test only boilerplate to DetachedBufferWriter.
Alexei Strots15c22b12023-04-04 16:27:17 -070033class TestDetachedBufferWriter : public FileBackend,
34 public DetachedBufferWriter {
Austin Schuhd863e6e2022-10-16 15:44:50 -070035 public:
Austin Schuh48d10d62022-10-16 22:19:23 -070036 // Pick a max size that is rather conservative.
37 static constexpr size_t kMaxMessageSize = 128 * 1024;
Austin Schuhd863e6e2022-10-16 15:44:50 -070038 TestDetachedBufferWriter(std::string_view filename)
Alexei Strots15c22b12023-04-04 16:27:17 -070039 : FileBackend("/"),
40 DetachedBufferWriter(FileBackend::RequestFile(filename),
41 std::make_unique<DummyEncoder>(kMaxMessageSize)) {}
Austin Schuhd863e6e2022-10-16 15:44:50 -070042 void WriteSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
43 QueueSpan(absl::Span<const uint8_t>(buffer.data(), buffer.size()));
44 }
Austin Schuh7ef11a42023-02-04 17:15:12 -080045 void QueueSpan(absl::Span<const uint8_t> buffer) {
46 DataEncoder::SpanCopier coppier(buffer);
47 CopyMessage(&coppier, monotonic_clock::now());
48 }
Austin Schuhd863e6e2022-10-16 15:44:50 -070049};
50
Austin Schuhe243aaf2020-10-11 15:46:02 -070051// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070052template <typename T>
53SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
54 const std::string_view data) {
55 flatbuffers::FlatBufferBuilder fbb;
56 fbb.ForceDefaults(true);
57 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
58 return fbb.Release();
59}
60
Austin Schuhe243aaf2020-10-11 15:46:02 -070061// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070062TEST(SpanReaderTest, ReadWrite) {
63 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
64 unlink(logfile.c_str());
65
66 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080067 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070068 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080069 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070070
71 {
Austin Schuhd863e6e2022-10-16 15:44:50 -070072 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -080073 writer.QueueSpan(m1.span());
74 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070075 }
76
77 SpanReader reader(logfile);
78
79 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhcf5f6442021-07-06 10:43:28 -070080 EXPECT_EQ(reader.PeekMessage(), m1.span());
81 EXPECT_EQ(reader.PeekMessage(), m1.span());
Austin Schuhadd6eb32020-11-09 21:24:26 -080082 EXPECT_EQ(reader.ReadMessage(), m1.span());
83 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhcf5f6442021-07-06 10:43:28 -070084 EXPECT_EQ(reader.PeekMessage(), absl::Span<const uint8_t>());
Austin Schuhc243b422020-10-11 15:35:08 -070085 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
86}
87
Austin Schuhe243aaf2020-10-11 15:46:02 -070088// Tests that we can actually parse the resulting messages at a basic level
89// through MessageReader.
90TEST(MessageReaderTest, ReadWrite) {
91 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
92 unlink(logfile.c_str());
93
94 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
95 JsonToSizedFlatbuffer<LogFileHeader>(
96 R"({ "max_out_of_order_duration": 100000000 })");
97 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
98 JsonToSizedFlatbuffer<MessageHeader>(
99 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
100 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
101 JsonToSizedFlatbuffer<MessageHeader>(
102 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
103
104 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700105 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800106 writer.QueueSpan(config.span());
107 writer.QueueSpan(m1.span());
108 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -0700109 }
110
111 MessageReader reader(logfile);
112
113 EXPECT_EQ(reader.filename(), logfile);
114
115 EXPECT_EQ(
116 reader.max_out_of_order_duration(),
117 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
118 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
119 EXPECT_TRUE(reader.ReadMessage());
120 EXPECT_EQ(reader.newest_timestamp(),
121 monotonic_clock::time_point(chrono::nanoseconds(1)));
122 EXPECT_TRUE(reader.ReadMessage());
123 EXPECT_EQ(reader.newest_timestamp(),
124 monotonic_clock::time_point(chrono::nanoseconds(2)));
125 EXPECT_FALSE(reader.ReadMessage());
126}
127
Austin Schuh32f68492020-11-08 21:45:51 -0800128// Tests that we explode when messages are too far out of order.
129TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
130 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
131 unlink(logfile0.c_str());
132
133 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
134 JsonToSizedFlatbuffer<LogFileHeader>(
135 R"({
136 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800137 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800138 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
139 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
140 "parts_index": 0
141})");
142
143 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
144 JsonToSizedFlatbuffer<MessageHeader>(
145 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
146 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
147 JsonToSizedFlatbuffer<MessageHeader>(
148 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
149 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
150 JsonToSizedFlatbuffer<MessageHeader>(
151 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
152
153 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700154 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800155 writer.QueueSpan(config0.span());
156 writer.QueueSpan(m1.span());
157 writer.QueueSpan(m2.span());
158 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800159 }
Alexei Strots01395492023-03-20 13:59:56 -0700160 ASSERT_TRUE(std::filesystem::exists(logfile0)) << logfile0;
Austin Schuh32f68492020-11-08 21:45:51 -0800161
162 const std::vector<LogFile> parts = SortParts({logfile0});
163
164 PartsMessageReader reader(parts[0].parts[0]);
165
166 EXPECT_TRUE(reader.ReadMessage());
167 EXPECT_TRUE(reader.ReadMessage());
168 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
169}
170
Austin Schuhc41603c2020-10-11 16:17:37 -0700171// Tests that we can transparently re-assemble part files with a
172// PartsMessageReader.
173TEST(PartsMessageReaderTest, ReadWrite) {
174 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
175 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
176 unlink(logfile0.c_str());
177 unlink(logfile1.c_str());
178
179 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
180 JsonToSizedFlatbuffer<LogFileHeader>(
181 R"({
182 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800183 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700184 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
185 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
186 "parts_index": 0
187})");
188 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
189 JsonToSizedFlatbuffer<LogFileHeader>(
190 R"({
191 "max_out_of_order_duration": 200000000,
192 "monotonic_start_time": 0,
193 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800194 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700195 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
196 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
197 "parts_index": 1
198})");
199
200 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
201 JsonToSizedFlatbuffer<MessageHeader>(
202 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
203 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
204 JsonToSizedFlatbuffer<MessageHeader>(
205 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
206
207 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700208 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800209 writer.QueueSpan(config0.span());
210 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700211 }
212 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700213 TestDetachedBufferWriter writer(logfile1);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800214 writer.QueueSpan(config1.span());
215 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700216 }
217
218 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
219
220 PartsMessageReader reader(parts[0].parts[0]);
221
222 EXPECT_EQ(reader.filename(), logfile0);
223
224 // Confirm that the timestamps track, and the filename also updates.
225 // Read the first message.
226 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
227 EXPECT_EQ(
228 reader.max_out_of_order_duration(),
229 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
230 EXPECT_TRUE(reader.ReadMessage());
231 EXPECT_EQ(reader.filename(), logfile0);
232 EXPECT_EQ(reader.newest_timestamp(),
233 monotonic_clock::time_point(chrono::nanoseconds(1)));
234 EXPECT_EQ(
235 reader.max_out_of_order_duration(),
236 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
237
238 // Read the second message.
239 EXPECT_TRUE(reader.ReadMessage());
240 EXPECT_EQ(reader.filename(), logfile1);
241 EXPECT_EQ(reader.newest_timestamp(),
242 monotonic_clock::time_point(chrono::nanoseconds(2)));
243 EXPECT_EQ(
244 reader.max_out_of_order_duration(),
245 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
246
247 // And then confirm that reading again returns no message.
248 EXPECT_FALSE(reader.ReadMessage());
249 EXPECT_EQ(reader.filename(), logfile1);
250 EXPECT_EQ(
251 reader.max_out_of_order_duration(),
252 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800253 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Austin Schuhc41603c2020-10-11 16:17:37 -0700254}
Austin Schuh32f68492020-11-08 21:45:51 -0800255
Austin Schuh1be0ce42020-11-29 22:43:26 -0800256// Tests that Message's operator < works as expected.
257TEST(MessageTest, Sorting) {
258 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
259
260 Message m1{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700261 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700262 .timestamp =
263 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
Austin Schuh48507722021-07-17 17:29:24 -0700264 .monotonic_remote_boot = 0xffffff,
265 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700266 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800267 Message m2{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700268 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700269 .timestamp =
270 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
Austin Schuh48507722021-07-17 17:29:24 -0700271 .monotonic_remote_boot = 0xffffff,
272 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700273 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800274
275 EXPECT_LT(m1, m2);
276 EXPECT_GE(m2, m1);
277
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700278 m1.timestamp.time = e;
279 m2.timestamp.time = e;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800280
281 m1.channel_index = 1;
282 m2.channel_index = 2;
283
284 EXPECT_LT(m1, m2);
285 EXPECT_GE(m2, m1);
286
287 m1.channel_index = 0;
288 m2.channel_index = 0;
Austin Schuh58646e22021-08-23 23:51:46 -0700289 m1.queue_index.index = 0u;
290 m2.queue_index.index = 1u;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800291
292 EXPECT_LT(m1, m2);
293 EXPECT_GE(m2, m1);
294}
295
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800296aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
297 const aos::FlatbufferDetachedBuffer<Configuration> &config,
298 const std::string_view json) {
299 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh1034a832021-03-31 21:53:26 -0700300 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800301 flatbuffers::Offset<Configuration> config_offset =
302 aos::CopyFlatBuffer(config, &fbb);
303 LogFileHeader::Builder header_builder(fbb);
304 header_builder.add_configuration(config_offset);
305 fbb.Finish(header_builder.Finish());
306 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
307
308 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
309 JsonToFlatbuffer<LogFileHeader>(json));
310 CHECK(header_updates.Verify());
311 flatbuffers::FlatBufferBuilder fbb2;
Austin Schuh1034a832021-03-31 21:53:26 -0700312 fbb2.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800313 fbb2.FinishSizePrefixed(
314 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
315 return fbb2.Release();
316}
317
318class SortingElementTest : public ::testing::Test {
319 public:
320 SortingElementTest()
321 : config_(JsonToFlatbuffer<Configuration>(
322 R"({
323 "channels": [
324 {
325 "name": "/a",
326 "type": "aos.logger.testing.TestMessage",
327 "source_node": "pi1",
328 "destination_nodes": [
329 {
330 "name": "pi2"
331 },
332 {
333 "name": "pi3"
334 }
335 ]
336 },
337 {
338 "name": "/b",
339 "type": "aos.logger.testing.TestMessage",
340 "source_node": "pi1"
341 },
342 {
343 "name": "/c",
344 "type": "aos.logger.testing.TestMessage",
345 "source_node": "pi1"
Austin Schuh48507722021-07-17 17:29:24 -0700346 },
347 {
348 "name": "/d",
349 "type": "aos.logger.testing.TestMessage",
350 "source_node": "pi2",
351 "destination_nodes": [
352 {
353 "name": "pi1"
354 }
355 ]
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800356 }
357 ],
358 "nodes": [
359 {
360 "name": "pi1"
361 },
362 {
363 "name": "pi2"
364 },
365 {
366 "name": "pi3"
367 }
368 ]
369}
370)")),
371 config0_(MakeHeader(config_, R"({
372 /* 100ms */
373 "max_out_of_order_duration": 100000000,
374 "node": {
375 "name": "pi1"
376 },
377 "logger_node": {
378 "name": "pi1"
379 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800380 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800381 "realtime_start_time": 1000000000000,
382 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700383 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
384 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
385 "boot_uuids": [
386 "1d782c63-b3c7-466e-bea9-a01308b43333",
387 "",
388 ""
389 ],
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800390 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
391 "parts_index": 0
392})")),
393 config1_(MakeHeader(config_,
394 R"({
395 /* 100ms */
396 "max_out_of_order_duration": 100000000,
397 "node": {
398 "name": "pi1"
399 },
400 "logger_node": {
401 "name": "pi1"
402 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800403 "monotonic_start_time": 1000000,
404 "realtime_start_time": 1000000000000,
405 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700406 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
407 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
408 "boot_uuids": [
409 "1d782c63-b3c7-466e-bea9-a01308b43333",
410 "",
411 ""
412 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800413 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
414 "parts_index": 0
415})")),
416 config2_(MakeHeader(config_,
417 R"({
418 /* 100ms */
419 "max_out_of_order_duration": 100000000,
420 "node": {
421 "name": "pi2"
422 },
423 "logger_node": {
424 "name": "pi2"
425 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800426 "monotonic_start_time": 0,
427 "realtime_start_time": 1000000000000,
Austin Schuh48507722021-07-17 17:29:24 -0700428 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
429 "logger_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
430 "boot_uuids": [
431 "",
432 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
433 ""
434 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800435 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
436 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
437 "parts_index": 0
438})")),
439 config3_(MakeHeader(config_,
440 R"({
441 /* 100ms */
442 "max_out_of_order_duration": 100000000,
443 "node": {
444 "name": "pi1"
445 },
446 "logger_node": {
447 "name": "pi1"
448 },
449 "monotonic_start_time": 2000000,
450 "realtime_start_time": 1000000000,
451 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
Austin Schuh48507722021-07-17 17:29:24 -0700452 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
453 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
454 "boot_uuids": [
455 "1d782c63-b3c7-466e-bea9-a01308b43333",
456 "",
457 ""
458 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800459 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800460 "parts_index": 0
Austin Schuh8bf1e632021-01-02 22:41:04 -0800461})")),
462 config4_(MakeHeader(config_,
463 R"({
464 /* 100ms */
465 "max_out_of_order_duration": 100000000,
466 "node": {
467 "name": "pi2"
468 },
469 "logger_node": {
470 "name": "pi1"
471 },
472 "monotonic_start_time": 2000000,
473 "realtime_start_time": 1000000000,
474 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
475 "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
Austin Schuh48507722021-07-17 17:29:24 -0700476 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
477 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
478 "boot_uuids": [
479 "1d782c63-b3c7-466e-bea9-a01308b43333",
480 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
481 ""
482 ],
Austin Schuh8bf1e632021-01-02 22:41:04 -0800483 "parts_index": 0
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800484})")) {
485 unlink(logfile0_.c_str());
486 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800487 unlink(logfile2_.c_str());
Austin Schuh48507722021-07-17 17:29:24 -0700488 unlink(logfile3_.c_str());
Austin Schuh921b2562021-07-31 19:37:03 -0700489 queue_index_.resize(config_.message().channels()->size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800490 }
491
492 protected:
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800493 flatbuffers::DetachedBuffer MakeLogMessage(
494 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
495 int value) {
496 flatbuffers::FlatBufferBuilder message_fbb;
497 message_fbb.ForceDefaults(true);
498 TestMessage::Builder test_message_builder(message_fbb);
499 test_message_builder.add_value(value);
500 message_fbb.Finish(test_message_builder.Finish());
501
502 aos::Context context;
503 context.monotonic_event_time = monotonic_now;
504 context.realtime_event_time = aos::realtime_clock::epoch() +
505 chrono::seconds(1000) +
506 monotonic_now.time_since_epoch();
Austin Schuh921b2562021-07-31 19:37:03 -0700507 CHECK_LT(static_cast<size_t>(channel_index), queue_index_.size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800508 context.queue_index = queue_index_[channel_index];
509 context.size = message_fbb.GetSize();
510 context.data = message_fbb.GetBufferPointer();
511
512 ++queue_index_[channel_index];
513
514 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh53e3a812021-11-03 16:57:23 -0700515 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800516 fbb.FinishSizePrefixed(
517 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
518
519 return fbb.Release();
520 }
521
522 flatbuffers::DetachedBuffer MakeTimestampMessage(
523 const aos::monotonic_clock::time_point sender_monotonic_now,
Austin Schuh8bf1e632021-01-02 22:41:04 -0800524 int channel_index, chrono::nanoseconds receiver_monotonic_offset,
525 monotonic_clock::time_point monotonic_timestamp_time =
526 monotonic_clock::min_time) {
527 const monotonic_clock::time_point monotonic_sent_time =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800528 sender_monotonic_now + receiver_monotonic_offset;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800529
530 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800531 fbb.ForceDefaults(true);
532
533 logger::MessageHeader::Builder message_header_builder(fbb);
534
535 message_header_builder.add_channel_index(channel_index);
536
537 message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
538 100);
539 message_header_builder.add_monotonic_sent_time(
540 monotonic_sent_time.time_since_epoch().count());
541 message_header_builder.add_realtime_sent_time(
542 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
543 monotonic_sent_time.time_since_epoch())
544 .time_since_epoch()
545 .count());
546
547 message_header_builder.add_monotonic_remote_time(
548 sender_monotonic_now.time_since_epoch().count());
549 message_header_builder.add_realtime_remote_time(
550 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
551 sender_monotonic_now.time_since_epoch())
552 .time_since_epoch()
553 .count());
554 message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
555 1);
556
557 if (monotonic_timestamp_time != monotonic_clock::min_time) {
558 message_header_builder.add_monotonic_timestamp_time(
559 monotonic_timestamp_time.time_since_epoch().count());
560 }
561
562 fbb.FinishSizePrefixed(message_header_builder.Finish());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800563 LOG(INFO) << aos::FlatbufferToJson(
564 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
565 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
566
567 return fbb.Release();
568 }
569
570 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
571 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800572 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh48507722021-07-17 17:29:24 -0700573 const std::string logfile3_ = aos::testing::TestTmpDir() + "/log3.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800574
575 const aos::FlatbufferDetachedBuffer<Configuration> config_;
576 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
577 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800578 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
579 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800580 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800581
582 std::vector<uint32_t> queue_index_;
583};
584
585using LogPartsSorterTest = SortingElementTest;
586using LogPartsSorterDeathTest = LogPartsSorterTest;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800587using NodeMergerTest = SortingElementTest;
Austin Schuhd2f96102020-12-01 20:27:29 -0800588using TimestampMapperTest = SortingElementTest;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800589
590// Tests that we can pull messages out of a log sorted in order.
591TEST_F(LogPartsSorterTest, Pull) {
592 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
593 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700594 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800595 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700596 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800597 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700598 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800599 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700600 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800601 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700602 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800603 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
604 }
605
606 const std::vector<LogFile> parts = SortParts({logfile0_});
607
608 LogPartsSorter parts_sorter(parts[0].parts[0]);
609
610 // Confirm we aren't sorted until any time until the message is popped.
611 // Peeking shouldn't change the sorted until time.
612 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
613
614 std::deque<Message> output;
615
616 ASSERT_TRUE(parts_sorter.Front() != nullptr);
617 output.emplace_back(std::move(*parts_sorter.Front()));
618 parts_sorter.PopFront();
619 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
620
621 ASSERT_TRUE(parts_sorter.Front() != nullptr);
622 output.emplace_back(std::move(*parts_sorter.Front()));
623 parts_sorter.PopFront();
624 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
625
626 ASSERT_TRUE(parts_sorter.Front() != nullptr);
627 output.emplace_back(std::move(*parts_sorter.Front()));
628 parts_sorter.PopFront();
629 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
630
631 ASSERT_TRUE(parts_sorter.Front() != nullptr);
632 output.emplace_back(std::move(*parts_sorter.Front()));
633 parts_sorter.PopFront();
634 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
635
636 ASSERT_TRUE(parts_sorter.Front() == nullptr);
637
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700638 EXPECT_EQ(output[0].timestamp.boot, 0);
639 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
640 EXPECT_EQ(output[1].timestamp.boot, 0);
641 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1000));
642 EXPECT_EQ(output[2].timestamp.boot, 0);
643 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1901));
644 EXPECT_EQ(output[3].timestamp.boot, 0);
645 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800646}
647
Austin Schuhb000de62020-12-03 22:00:40 -0800648// Tests that we can pull messages out of a log sorted in order.
649TEST_F(LogPartsSorterTest, WayBeforeStart) {
650 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
651 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700652 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhb000de62020-12-03 22:00:40 -0800653 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700654 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800655 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700656 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800657 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700658 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800659 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700660 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800661 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700662 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800663 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
664 }
665
666 const std::vector<LogFile> parts = SortParts({logfile0_});
667
668 LogPartsSorter parts_sorter(parts[0].parts[0]);
669
670 // Confirm we aren't sorted until any time until the message is popped.
671 // Peeking shouldn't change the sorted until time.
672 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
673
674 std::deque<Message> output;
675
676 for (monotonic_clock::time_point t :
677 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
678 e + chrono::milliseconds(1900), monotonic_clock::max_time,
679 monotonic_clock::max_time}) {
680 ASSERT_TRUE(parts_sorter.Front() != nullptr);
681 output.emplace_back(std::move(*parts_sorter.Front()));
682 parts_sorter.PopFront();
683 EXPECT_EQ(parts_sorter.sorted_until(), t);
684 }
685
686 ASSERT_TRUE(parts_sorter.Front() == nullptr);
687
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700688 EXPECT_EQ(output[0].timestamp.boot, 0u);
689 EXPECT_EQ(output[0].timestamp.time, e - chrono::milliseconds(1000));
690 EXPECT_EQ(output[1].timestamp.boot, 0u);
691 EXPECT_EQ(output[1].timestamp.time, e - chrono::milliseconds(500));
692 EXPECT_EQ(output[2].timestamp.boot, 0u);
693 EXPECT_EQ(output[2].timestamp.time, e - chrono::milliseconds(10));
694 EXPECT_EQ(output[3].timestamp.boot, 0u);
695 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(1901));
696 EXPECT_EQ(output[4].timestamp.boot, 0u);
697 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuhb000de62020-12-03 22:00:40 -0800698}
699
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800700// Tests that messages too far out of order trigger death.
701TEST_F(LogPartsSorterDeathTest, Pull) {
702 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
703 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700704 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800705 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700706 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800707 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700708 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800709 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700710 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800711 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
712 // The following message is too far out of order and will trigger the CHECK.
Austin Schuhd863e6e2022-10-16 15:44:50 -0700713 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800714 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
715 }
716
717 const std::vector<LogFile> parts = SortParts({logfile0_});
718
719 LogPartsSorter parts_sorter(parts[0].parts[0]);
720
721 // Confirm we aren't sorted until any time until the message is popped.
722 // Peeking shouldn't change the sorted until time.
723 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
724 std::deque<Message> output;
725
726 ASSERT_TRUE(parts_sorter.Front() != nullptr);
727 parts_sorter.PopFront();
728 ASSERT_TRUE(parts_sorter.Front() != nullptr);
729 ASSERT_TRUE(parts_sorter.Front() != nullptr);
730 parts_sorter.PopFront();
731
Austin Schuh58646e22021-08-23 23:51:46 -0700732 EXPECT_DEATH({ parts_sorter.Front(); },
733 "Max out of order of 100000000ns exceeded.");
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800734}
735
Austin Schuh8f52ed52020-11-30 23:12:39 -0800736// Tests that we can merge data from 2 separate files, including duplicate data.
737TEST_F(NodeMergerTest, TwoFileMerger) {
738 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
739 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700740 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800741 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700742 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800743 writer1.QueueSpan(config1_.span());
744
Austin Schuhd863e6e2022-10-16 15:44:50 -0700745 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800746 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700747 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800748 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
749
Austin Schuhd863e6e2022-10-16 15:44:50 -0700750 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800751 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700752 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800753 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
754
755 // Make a duplicate!
756 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
757 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
758 writer0.QueueSpan(msg.span());
759 writer1.QueueSpan(msg.span());
760
Austin Schuhd863e6e2022-10-16 15:44:50 -0700761 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800762 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
763 }
764
765 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Austin Schuhd2f96102020-12-01 20:27:29 -0800766 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800767
Austin Schuhd2f96102020-12-01 20:27:29 -0800768 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800769
770 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
771
772 std::deque<Message> output;
773
774 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
775 ASSERT_TRUE(merger.Front() != nullptr);
776 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
777
778 output.emplace_back(std::move(*merger.Front()));
779 merger.PopFront();
780 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
781
782 ASSERT_TRUE(merger.Front() != nullptr);
783 output.emplace_back(std::move(*merger.Front()));
784 merger.PopFront();
785 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
786
787 ASSERT_TRUE(merger.Front() != nullptr);
788 output.emplace_back(std::move(*merger.Front()));
789 merger.PopFront();
790 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
791
792 ASSERT_TRUE(merger.Front() != nullptr);
793 output.emplace_back(std::move(*merger.Front()));
794 merger.PopFront();
795 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
796
797 ASSERT_TRUE(merger.Front() != nullptr);
798 output.emplace_back(std::move(*merger.Front()));
799 merger.PopFront();
800 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
801
802 ASSERT_TRUE(merger.Front() != nullptr);
803 output.emplace_back(std::move(*merger.Front()));
804 merger.PopFront();
805 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
806
807 ASSERT_TRUE(merger.Front() == nullptr);
808
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700809 EXPECT_EQ(output[0].timestamp.boot, 0u);
810 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
811 EXPECT_EQ(output[1].timestamp.boot, 0u);
812 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1001));
813 EXPECT_EQ(output[2].timestamp.boot, 0u);
814 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1002));
815 EXPECT_EQ(output[3].timestamp.boot, 0u);
816 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
817 EXPECT_EQ(output[4].timestamp.boot, 0u);
818 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(3000));
819 EXPECT_EQ(output[5].timestamp.boot, 0u);
820 EXPECT_EQ(output[5].timestamp.time, e + chrono::milliseconds(3002));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800821}
822
Austin Schuh8bf1e632021-01-02 22:41:04 -0800823// Tests that we can merge timestamps with various combinations of
824// monotonic_timestamp_time.
825TEST_F(NodeMergerTest, TwoFileTimestampMerger) {
826 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
827 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700828 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800829 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700830 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800831 writer1.QueueSpan(config1_.span());
832
833 // Neither has it.
834 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700835 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800836 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700837 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800838 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
839
840 // First only has it.
841 MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700842 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800843 e + chrono::milliseconds(1001), 0, chrono::seconds(100),
844 e + chrono::nanoseconds(971)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700845 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800846 e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
847
848 // Second only has it.
849 MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700850 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800851 e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700852 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800853 e + chrono::milliseconds(1002), 0, chrono::seconds(100),
854 e + chrono::nanoseconds(972)));
855
856 // Both have it.
857 MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700858 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800859 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
860 e + chrono::nanoseconds(973)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700861 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800862 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
863 e + chrono::nanoseconds(973)));
864 }
865
866 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
867 ASSERT_EQ(parts.size(), 1u);
868
869 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
870
871 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
872
873 std::deque<Message> output;
874
875 for (int i = 0; i < 4; ++i) {
876 ASSERT_TRUE(merger.Front() != nullptr);
877 output.emplace_back(std::move(*merger.Front()));
878 merger.PopFront();
879 }
880 ASSERT_TRUE(merger.Front() == nullptr);
881
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700882 EXPECT_EQ(output[0].timestamp.boot, 0u);
883 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700884 EXPECT_FALSE(output[0].data->has_monotonic_timestamp_time);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700885
886 EXPECT_EQ(output[1].timestamp.boot, 0u);
887 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700888 EXPECT_TRUE(output[1].data->has_monotonic_timestamp_time);
889 EXPECT_EQ(output[1].data->monotonic_timestamp_time,
890 monotonic_clock::time_point(std::chrono::nanoseconds(971)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700891
892 EXPECT_EQ(output[2].timestamp.boot, 0u);
893 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700894 EXPECT_TRUE(output[2].data->has_monotonic_timestamp_time);
895 EXPECT_EQ(output[2].data->monotonic_timestamp_time,
896 monotonic_clock::time_point(std::chrono::nanoseconds(972)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700897
898 EXPECT_EQ(output[3].timestamp.boot, 0u);
899 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700900 EXPECT_TRUE(output[3].data->has_monotonic_timestamp_time);
901 EXPECT_EQ(output[3].data->monotonic_timestamp_time,
902 monotonic_clock::time_point(std::chrono::nanoseconds(973)));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800903}
904
Austin Schuhd2f96102020-12-01 20:27:29 -0800905// Tests that we can match timestamps on delivered messages.
906TEST_F(TimestampMapperTest, ReadNode0First) {
907 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
908 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700909 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800910 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700911 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800912 writer1.QueueSpan(config2_.span());
913
Austin Schuhd863e6e2022-10-16 15:44:50 -0700914 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800915 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700916 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800917 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
918
Austin Schuhd863e6e2022-10-16 15:44:50 -0700919 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800920 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700921 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800922 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
923
Austin Schuhd863e6e2022-10-16 15:44:50 -0700924 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800925 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700926 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800927 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
928 }
929
930 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
931
932 ASSERT_EQ(parts[0].logger_node, "pi1");
933 ASSERT_EQ(parts[1].logger_node, "pi2");
934
Austin Schuh79b30942021-01-24 22:32:21 -0800935 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800936 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -0800937 mapper0.set_timestamp_callback(
938 [&](TimestampedMessage *) { ++mapper0_count; });
939 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800940 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -0800941 mapper1.set_timestamp_callback(
942 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -0800943
944 mapper0.AddPeer(&mapper1);
945 mapper1.AddPeer(&mapper0);
946
947 {
948 std::deque<TimestampedMessage> output0;
949
Austin Schuh79b30942021-01-24 22:32:21 -0800950 EXPECT_EQ(mapper0_count, 0u);
951 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800952 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800953 EXPECT_EQ(mapper0_count, 1u);
954 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800955 output0.emplace_back(std::move(*mapper0.Front()));
956 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700957 EXPECT_TRUE(mapper0.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800958 EXPECT_EQ(mapper0_count, 1u);
959 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800960
961 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800962 EXPECT_EQ(mapper0_count, 2u);
963 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800964 output0.emplace_back(std::move(*mapper0.Front()));
965 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700966 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800967
968 ASSERT_TRUE(mapper0.Front() != nullptr);
969 output0.emplace_back(std::move(*mapper0.Front()));
970 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700971 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800972
Austin Schuh79b30942021-01-24 22:32:21 -0800973 EXPECT_EQ(mapper0_count, 3u);
974 EXPECT_EQ(mapper1_count, 0u);
975
Austin Schuhd2f96102020-12-01 20:27:29 -0800976 ASSERT_TRUE(mapper0.Front() == nullptr);
977
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700978 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
979 EXPECT_EQ(output0[0].monotonic_event_time.time,
980 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700981 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700982
983 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
984 EXPECT_EQ(output0[1].monotonic_event_time.time,
985 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700986 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700987
988 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
989 EXPECT_EQ(output0[2].monotonic_event_time.time,
990 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700991 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -0800992 }
993
994 {
995 SCOPED_TRACE("Trying node1 now");
996 std::deque<TimestampedMessage> output1;
997
Austin Schuh79b30942021-01-24 22:32:21 -0800998 EXPECT_EQ(mapper0_count, 3u);
999 EXPECT_EQ(mapper1_count, 0u);
1000
Austin Schuhd2f96102020-12-01 20:27:29 -08001001 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001002 EXPECT_EQ(mapper0_count, 3u);
1003 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001004 output1.emplace_back(std::move(*mapper1.Front()));
1005 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001006 EXPECT_TRUE(mapper1.started());
Austin Schuh79b30942021-01-24 22:32:21 -08001007 EXPECT_EQ(mapper0_count, 3u);
1008 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001009
1010 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001011 EXPECT_EQ(mapper0_count, 3u);
1012 EXPECT_EQ(mapper1_count, 2u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001013 output1.emplace_back(std::move(*mapper1.Front()));
1014 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001015 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001016
1017 ASSERT_TRUE(mapper1.Front() != nullptr);
1018 output1.emplace_back(std::move(*mapper1.Front()));
1019 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001020 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001021
Austin Schuh79b30942021-01-24 22:32:21 -08001022 EXPECT_EQ(mapper0_count, 3u);
1023 EXPECT_EQ(mapper1_count, 3u);
1024
Austin Schuhd2f96102020-12-01 20:27:29 -08001025 ASSERT_TRUE(mapper1.Front() == nullptr);
1026
Austin Schuh79b30942021-01-24 22:32:21 -08001027 EXPECT_EQ(mapper0_count, 3u);
1028 EXPECT_EQ(mapper1_count, 3u);
1029
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001030 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1031 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001032 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001033 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001034
1035 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1036 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001037 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001038 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001039
1040 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1041 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001042 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001043 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001044 }
1045}
1046
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001047// Tests that we filter messages using the channel filter callback
1048TEST_F(TimestampMapperTest, ReplayChannelsCallbackTest) {
1049 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1050 {
1051 TestDetachedBufferWriter writer0(logfile0_);
1052 writer0.QueueSpan(config0_.span());
1053 TestDetachedBufferWriter writer1(logfile1_);
1054 writer1.QueueSpan(config2_.span());
1055
1056 writer0.WriteSizedFlatbuffer(
1057 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1058 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1059 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1060
1061 writer0.WriteSizedFlatbuffer(
1062 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1063 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1064 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1065
1066 writer0.WriteSizedFlatbuffer(
1067 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1068 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1069 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1070 }
1071
1072 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1073
1074 ASSERT_EQ(parts[0].logger_node, "pi1");
1075 ASSERT_EQ(parts[1].logger_node, "pi2");
1076
1077 // mapper0 will not provide any messages while mapper1 will provide all
1078 // messages due to the channel filter callbacks used
1079 size_t mapper0_count = 0;
1080 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1081 mapper0.set_timestamp_callback(
1082 [&](TimestampedMessage *) { ++mapper0_count; });
1083 mapper0.set_replay_channels_callback(
1084 [&](const TimestampedMessage &) -> bool { return mapper0_count != 2; });
1085 size_t mapper1_count = 0;
1086 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1087 mapper1.set_timestamp_callback(
1088 [&](TimestampedMessage *) { ++mapper1_count; });
1089 mapper1.set_replay_channels_callback(
1090 [&](const TimestampedMessage &) -> bool { return mapper1_count != 2; });
1091
1092 mapper0.AddPeer(&mapper1);
1093 mapper1.AddPeer(&mapper0);
1094
1095 {
1096 std::deque<TimestampedMessage> output0;
1097
1098 EXPECT_EQ(mapper0_count, 0u);
1099 EXPECT_EQ(mapper1_count, 0u);
1100
1101 ASSERT_TRUE(mapper0.Front() != nullptr);
1102 EXPECT_EQ(mapper0_count, 1u);
1103 EXPECT_EQ(mapper1_count, 0u);
1104 output0.emplace_back(std::move(*mapper0.Front()));
1105 mapper0.PopFront();
1106
1107 EXPECT_TRUE(mapper0.started());
1108 EXPECT_EQ(mapper0_count, 1u);
1109 EXPECT_EQ(mapper1_count, 0u);
1110
1111 // mapper0_count is now at 3 since the second message is not queued, but
1112 // timestamp_callback needs to be called everytime even if Front() does not
1113 // provide a message due to the replay_channels_callback.
1114 ASSERT_TRUE(mapper0.Front() != nullptr);
1115 EXPECT_EQ(mapper0_count, 3u);
1116 EXPECT_EQ(mapper1_count, 0u);
1117 output0.emplace_back(std::move(*mapper0.Front()));
1118 mapper0.PopFront();
1119
1120 EXPECT_TRUE(mapper0.started());
1121 EXPECT_EQ(mapper0_count, 3u);
1122 EXPECT_EQ(mapper1_count, 0u);
1123
1124 ASSERT_TRUE(mapper0.Front() == nullptr);
1125 EXPECT_TRUE(mapper0.started());
1126
1127 EXPECT_EQ(mapper0_count, 3u);
1128 EXPECT_EQ(mapper1_count, 0u);
1129
1130 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1131 EXPECT_EQ(output0[0].monotonic_event_time.time,
1132 e + chrono::milliseconds(1000));
1133 EXPECT_TRUE(output0[0].data != nullptr);
1134
1135 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1136 EXPECT_EQ(output0[1].monotonic_event_time.time,
1137 e + chrono::milliseconds(3000));
1138 EXPECT_TRUE(output0[1].data != nullptr);
1139 }
1140
1141 {
1142 SCOPED_TRACE("Trying node1 now");
1143 std::deque<TimestampedMessage> output1;
1144
1145 EXPECT_EQ(mapper0_count, 3u);
1146 EXPECT_EQ(mapper1_count, 0u);
1147
1148 ASSERT_TRUE(mapper1.Front() != nullptr);
1149 EXPECT_EQ(mapper0_count, 3u);
1150 EXPECT_EQ(mapper1_count, 1u);
1151 output1.emplace_back(std::move(*mapper1.Front()));
1152 mapper1.PopFront();
1153 EXPECT_TRUE(mapper1.started());
1154 EXPECT_EQ(mapper0_count, 3u);
1155 EXPECT_EQ(mapper1_count, 1u);
1156
1157 // mapper1_count is now at 3 since the second message is not queued, but
1158 // timestamp_callback needs to be called everytime even if Front() does not
1159 // provide a message due to the replay_channels_callback.
1160 ASSERT_TRUE(mapper1.Front() != nullptr);
1161 output1.emplace_back(std::move(*mapper1.Front()));
1162 mapper1.PopFront();
1163 EXPECT_TRUE(mapper1.started());
1164
1165 EXPECT_EQ(mapper0_count, 3u);
1166 EXPECT_EQ(mapper1_count, 3u);
1167
1168 ASSERT_TRUE(mapper1.Front() == nullptr);
1169
1170 EXPECT_EQ(mapper0_count, 3u);
1171 EXPECT_EQ(mapper1_count, 3u);
1172
1173 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1174 EXPECT_EQ(output1[0].monotonic_event_time.time,
1175 e + chrono::seconds(100) + chrono::milliseconds(1000));
1176 EXPECT_TRUE(output1[0].data != nullptr);
1177
1178 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1179 EXPECT_EQ(output1[1].monotonic_event_time.time,
1180 e + chrono::seconds(100) + chrono::milliseconds(3000));
1181 EXPECT_TRUE(output1[1].data != nullptr);
1182 }
1183}
Austin Schuh8bf1e632021-01-02 22:41:04 -08001184// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
1185// returned.
1186TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
1187 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1188 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001189 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001190 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001191 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001192 writer1.QueueSpan(config4_.span());
1193
Austin Schuhd863e6e2022-10-16 15:44:50 -07001194 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001195 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001196 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001197 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
1198 e + chrono::nanoseconds(971)));
1199
Austin Schuhd863e6e2022-10-16 15:44:50 -07001200 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001201 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001202 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001203 e + chrono::milliseconds(2000), 0, chrono::seconds(100),
1204 e + chrono::nanoseconds(5458)));
1205
Austin Schuhd863e6e2022-10-16 15:44:50 -07001206 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001207 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001208 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001209 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1210 }
1211
1212 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1213
1214 for (const auto &p : parts) {
1215 LOG(INFO) << p;
1216 }
1217
1218 ASSERT_EQ(parts.size(), 1u);
1219
Austin Schuh79b30942021-01-24 22:32:21 -08001220 size_t mapper0_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001221 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001222 mapper0.set_timestamp_callback(
1223 [&](TimestampedMessage *) { ++mapper0_count; });
1224 size_t mapper1_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001225 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001226 mapper1.set_timestamp_callback(
1227 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh8bf1e632021-01-02 22:41:04 -08001228
1229 mapper0.AddPeer(&mapper1);
1230 mapper1.AddPeer(&mapper0);
1231
1232 {
1233 std::deque<TimestampedMessage> output0;
1234
1235 for (int i = 0; i < 3; ++i) {
1236 ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
1237 output0.emplace_back(std::move(*mapper0.Front()));
1238 mapper0.PopFront();
1239 }
1240
1241 ASSERT_TRUE(mapper0.Front() == nullptr);
1242
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001243 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1244 EXPECT_EQ(output0[0].monotonic_event_time.time,
1245 e + chrono::milliseconds(1000));
1246 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
1247 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
1248 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001249 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001250
1251 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1252 EXPECT_EQ(output0[1].monotonic_event_time.time,
1253 e + chrono::milliseconds(2000));
1254 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
1255 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
1256 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001257 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001258
1259 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1260 EXPECT_EQ(output0[2].monotonic_event_time.time,
1261 e + chrono::milliseconds(3000));
1262 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
1263 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
1264 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001265 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001266 }
1267
1268 {
1269 SCOPED_TRACE("Trying node1 now");
1270 std::deque<TimestampedMessage> output1;
1271
1272 for (int i = 0; i < 3; ++i) {
1273 ASSERT_TRUE(mapper1.Front() != nullptr);
1274 output1.emplace_back(std::move(*mapper1.Front()));
1275 mapper1.PopFront();
1276 }
1277
1278 ASSERT_TRUE(mapper1.Front() == nullptr);
1279
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001280 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1281 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001282 e + chrono::seconds(100) + chrono::milliseconds(1000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001283 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
1284 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001285 e + chrono::nanoseconds(971));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001286 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001287
1288 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1289 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001290 e + chrono::seconds(100) + chrono::milliseconds(2000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001291 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
1292 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001293 e + chrono::nanoseconds(5458));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001294 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001295
1296 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1297 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001298 e + chrono::seconds(100) + chrono::milliseconds(3000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001299 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
1300 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
1301 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001302 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001303 }
Austin Schuh79b30942021-01-24 22:32:21 -08001304
1305 EXPECT_EQ(mapper0_count, 3u);
1306 EXPECT_EQ(mapper1_count, 3u);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001307}
1308
Austin Schuhd2f96102020-12-01 20:27:29 -08001309// Tests that we can match timestamps on delivered messages. By doing this in
1310// the reverse order, the second node needs to queue data up from the first node
1311// to find the matching timestamp.
1312TEST_F(TimestampMapperTest, ReadNode1First) {
1313 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1314 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001315 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001316 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001317 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001318 writer1.QueueSpan(config2_.span());
1319
Austin Schuhd863e6e2022-10-16 15:44:50 -07001320 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001321 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001322 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001323 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1324
Austin Schuhd863e6e2022-10-16 15:44:50 -07001325 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001326 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001327 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001328 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1329
Austin Schuhd863e6e2022-10-16 15:44:50 -07001330 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001331 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001332 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001333 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1334 }
1335
1336 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1337
1338 ASSERT_EQ(parts[0].logger_node, "pi1");
1339 ASSERT_EQ(parts[1].logger_node, "pi2");
1340
Austin Schuh79b30942021-01-24 22:32:21 -08001341 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001342 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001343 mapper0.set_timestamp_callback(
1344 [&](TimestampedMessage *) { ++mapper0_count; });
1345 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001346 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001347 mapper1.set_timestamp_callback(
1348 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001349
1350 mapper0.AddPeer(&mapper1);
1351 mapper1.AddPeer(&mapper0);
1352
1353 {
1354 SCOPED_TRACE("Trying node1 now");
1355 std::deque<TimestampedMessage> output1;
1356
1357 ASSERT_TRUE(mapper1.Front() != nullptr);
1358 output1.emplace_back(std::move(*mapper1.Front()));
1359 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001360 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001361
1362 ASSERT_TRUE(mapper1.Front() != nullptr);
1363 output1.emplace_back(std::move(*mapper1.Front()));
1364 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001365 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001366
1367 ASSERT_TRUE(mapper1.Front() != nullptr);
1368 output1.emplace_back(std::move(*mapper1.Front()));
1369 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001370 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001371
1372 ASSERT_TRUE(mapper1.Front() == nullptr);
1373
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001374 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1375 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001376 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001377 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001378
1379 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1380 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001381 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001382 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001383
1384 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1385 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001386 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001387 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001388 }
1389
1390 {
1391 std::deque<TimestampedMessage> output0;
1392
1393 ASSERT_TRUE(mapper0.Front() != nullptr);
1394 output0.emplace_back(std::move(*mapper0.Front()));
1395 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001396 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001397
1398 ASSERT_TRUE(mapper0.Front() != nullptr);
1399 output0.emplace_back(std::move(*mapper0.Front()));
1400 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001401 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001402
1403 ASSERT_TRUE(mapper0.Front() != nullptr);
1404 output0.emplace_back(std::move(*mapper0.Front()));
1405 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001406 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001407
1408 ASSERT_TRUE(mapper0.Front() == nullptr);
1409
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001410 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1411 EXPECT_EQ(output0[0].monotonic_event_time.time,
1412 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001413 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001414
1415 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1416 EXPECT_EQ(output0[1].monotonic_event_time.time,
1417 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001418 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001419
1420 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1421 EXPECT_EQ(output0[2].monotonic_event_time.time,
1422 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001423 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001424 }
Austin Schuh79b30942021-01-24 22:32:21 -08001425
1426 EXPECT_EQ(mapper0_count, 3u);
1427 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001428}
1429
1430// Tests that we return just the timestamps if we couldn't find the data and the
1431// missing data was at the beginning of the file.
1432TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
1433 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1434 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001435 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001436 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001437 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001438 writer1.QueueSpan(config2_.span());
1439
1440 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001441 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001442 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1443
Austin Schuhd863e6e2022-10-16 15:44:50 -07001444 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001445 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001446 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001447 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1448
Austin Schuhd863e6e2022-10-16 15:44:50 -07001449 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001450 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001451 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001452 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1453 }
1454
1455 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1456
1457 ASSERT_EQ(parts[0].logger_node, "pi1");
1458 ASSERT_EQ(parts[1].logger_node, "pi2");
1459
Austin Schuh79b30942021-01-24 22:32:21 -08001460 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001461 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001462 mapper0.set_timestamp_callback(
1463 [&](TimestampedMessage *) { ++mapper0_count; });
1464 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001465 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001466 mapper1.set_timestamp_callback(
1467 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001468
1469 mapper0.AddPeer(&mapper1);
1470 mapper1.AddPeer(&mapper0);
1471
1472 {
1473 SCOPED_TRACE("Trying node1 now");
1474 std::deque<TimestampedMessage> output1;
1475
1476 ASSERT_TRUE(mapper1.Front() != nullptr);
1477 output1.emplace_back(std::move(*mapper1.Front()));
1478 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001479 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001480
1481 ASSERT_TRUE(mapper1.Front() != nullptr);
1482 output1.emplace_back(std::move(*mapper1.Front()));
1483 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001484 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001485
1486 ASSERT_TRUE(mapper1.Front() != nullptr);
1487 output1.emplace_back(std::move(*mapper1.Front()));
1488 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001489 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001490
1491 ASSERT_TRUE(mapper1.Front() == nullptr);
1492
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001493 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1494 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001495 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001496 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001497
1498 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1499 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001500 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001501 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001502
1503 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1504 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001505 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001506 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001507 }
Austin Schuh79b30942021-01-24 22:32:21 -08001508
1509 EXPECT_EQ(mapper0_count, 0u);
1510 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001511}
1512
1513// Tests that we return just the timestamps if we couldn't find the data and the
1514// missing data was at the end of the file.
1515TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
1516 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1517 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001518 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001519 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001520 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001521 writer1.QueueSpan(config2_.span());
1522
Austin Schuhd863e6e2022-10-16 15:44:50 -07001523 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001524 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001525 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001526 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1527
Austin Schuhd863e6e2022-10-16 15:44:50 -07001528 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001529 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001530 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001531 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1532
1533 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001534 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001535 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1536 }
1537
1538 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1539
1540 ASSERT_EQ(parts[0].logger_node, "pi1");
1541 ASSERT_EQ(parts[1].logger_node, "pi2");
1542
Austin Schuh79b30942021-01-24 22:32:21 -08001543 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001544 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001545 mapper0.set_timestamp_callback(
1546 [&](TimestampedMessage *) { ++mapper0_count; });
1547 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001548 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001549 mapper1.set_timestamp_callback(
1550 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001551
1552 mapper0.AddPeer(&mapper1);
1553 mapper1.AddPeer(&mapper0);
1554
1555 {
1556 SCOPED_TRACE("Trying node1 now");
1557 std::deque<TimestampedMessage> output1;
1558
1559 ASSERT_TRUE(mapper1.Front() != nullptr);
1560 output1.emplace_back(std::move(*mapper1.Front()));
1561 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001562 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001563
1564 ASSERT_TRUE(mapper1.Front() != nullptr);
1565 output1.emplace_back(std::move(*mapper1.Front()));
1566 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001567 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001568
1569 ASSERT_TRUE(mapper1.Front() != nullptr);
1570 output1.emplace_back(std::move(*mapper1.Front()));
1571 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001572 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001573
1574 ASSERT_TRUE(mapper1.Front() == nullptr);
1575
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001576 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1577 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001578 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001579 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001580
1581 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1582 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001583 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001584 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001585
1586 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1587 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001588 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001589 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001590 }
Austin Schuh79b30942021-01-24 22:32:21 -08001591
1592 EXPECT_EQ(mapper0_count, 0u);
1593 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001594}
1595
Austin Schuh993ccb52020-12-12 15:59:32 -08001596// Tests that we handle a message which failed to forward or be logged.
1597TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1598 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1599 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001600 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001601 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001602 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001603 writer1.QueueSpan(config2_.span());
1604
Austin Schuhd863e6e2022-10-16 15:44:50 -07001605 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001606 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001607 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001608 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1609
1610 // Create both the timestamp and message, but don't log them, simulating a
1611 // forwarding drop.
1612 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1613 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1614 chrono::seconds(100));
1615
Austin Schuhd863e6e2022-10-16 15:44:50 -07001616 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001617 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001618 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001619 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1620 }
1621
1622 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1623
1624 ASSERT_EQ(parts[0].logger_node, "pi1");
1625 ASSERT_EQ(parts[1].logger_node, "pi2");
1626
Austin Schuh79b30942021-01-24 22:32:21 -08001627 size_t mapper0_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001628 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001629 mapper0.set_timestamp_callback(
1630 [&](TimestampedMessage *) { ++mapper0_count; });
1631 size_t mapper1_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001632 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001633 mapper1.set_timestamp_callback(
1634 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh993ccb52020-12-12 15:59:32 -08001635
1636 mapper0.AddPeer(&mapper1);
1637 mapper1.AddPeer(&mapper0);
1638
1639 {
1640 std::deque<TimestampedMessage> output1;
1641
1642 ASSERT_TRUE(mapper1.Front() != nullptr);
1643 output1.emplace_back(std::move(*mapper1.Front()));
1644 mapper1.PopFront();
1645
1646 ASSERT_TRUE(mapper1.Front() != nullptr);
1647 output1.emplace_back(std::move(*mapper1.Front()));
1648
1649 ASSERT_FALSE(mapper1.Front() == nullptr);
1650
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001651 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1652 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001653 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001654 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001655
1656 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1657 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001658 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001659 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh993ccb52020-12-12 15:59:32 -08001660 }
Austin Schuh79b30942021-01-24 22:32:21 -08001661
1662 EXPECT_EQ(mapper0_count, 0u);
1663 EXPECT_EQ(mapper1_count, 2u);
Austin Schuh993ccb52020-12-12 15:59:32 -08001664}
1665
Austin Schuhd2f96102020-12-01 20:27:29 -08001666// Tests that we properly sort log files with duplicate timestamps.
1667TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1668 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1669 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001670 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001671 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001672 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001673 writer1.QueueSpan(config2_.span());
1674
Austin Schuhd863e6e2022-10-16 15:44:50 -07001675 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001676 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001677 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001678 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1679
Austin Schuhd863e6e2022-10-16 15:44:50 -07001680 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001681 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001682 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001683 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1684
Austin Schuhd863e6e2022-10-16 15:44:50 -07001685 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001686 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001687 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001688 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1689
Austin Schuhd863e6e2022-10-16 15:44:50 -07001690 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001691 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001692 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001693 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1694 }
1695
1696 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1697
1698 ASSERT_EQ(parts[0].logger_node, "pi1");
1699 ASSERT_EQ(parts[1].logger_node, "pi2");
1700
Austin Schuh79b30942021-01-24 22:32:21 -08001701 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001702 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001703 mapper0.set_timestamp_callback(
1704 [&](TimestampedMessage *) { ++mapper0_count; });
1705 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001706 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001707 mapper1.set_timestamp_callback(
1708 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001709
1710 mapper0.AddPeer(&mapper1);
1711 mapper1.AddPeer(&mapper0);
1712
1713 {
1714 SCOPED_TRACE("Trying node1 now");
1715 std::deque<TimestampedMessage> output1;
1716
1717 for (int i = 0; i < 4; ++i) {
1718 ASSERT_TRUE(mapper1.Front() != nullptr);
1719 output1.emplace_back(std::move(*mapper1.Front()));
1720 mapper1.PopFront();
1721 }
1722 ASSERT_TRUE(mapper1.Front() == nullptr);
1723
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001724 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1725 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001726 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001727 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001728
1729 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1730 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001731 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001732 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001733
1734 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1735 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001736 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001737 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001738
1739 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1740 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001741 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001742 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001743 }
Austin Schuh79b30942021-01-24 22:32:21 -08001744
1745 EXPECT_EQ(mapper0_count, 0u);
1746 EXPECT_EQ(mapper1_count, 4u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001747}
1748
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001749// Tests that we properly produce a valid start time.
Austin Schuhd2f96102020-12-01 20:27:29 -08001750TEST_F(TimestampMapperTest, StartTime) {
1751 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1752 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001753 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001754 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001755 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001756 writer1.QueueSpan(config1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001757 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001758 writer2.QueueSpan(config3_.span());
1759 }
1760
1761 const std::vector<LogFile> parts =
1762 SortParts({logfile0_, logfile1_, logfile2_});
1763
Austin Schuh79b30942021-01-24 22:32:21 -08001764 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001765 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001766 mapper0.set_timestamp_callback(
1767 [&](TimestampedMessage *) { ++mapper0_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001768
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001769 EXPECT_EQ(mapper0.monotonic_start_time(0), e + chrono::milliseconds(1));
1770 EXPECT_EQ(mapper0.realtime_start_time(0),
Austin Schuhd2f96102020-12-01 20:27:29 -08001771 realtime_clock::time_point(chrono::seconds(1000)));
Austin Schuh79b30942021-01-24 22:32:21 -08001772 EXPECT_EQ(mapper0_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001773}
1774
Austin Schuhfecf1d82020-12-19 16:57:28 -08001775// Tests that when a peer isn't registered, we treat that as if there was no
1776// data available.
1777TEST_F(TimestampMapperTest, NoPeer) {
1778 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1779 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001780 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001781 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001782 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001783 writer1.QueueSpan(config2_.span());
1784
1785 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001786 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001787 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1788
Austin Schuhd863e6e2022-10-16 15:44:50 -07001789 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001790 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001791 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001792 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1793
Austin Schuhd863e6e2022-10-16 15:44:50 -07001794 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001795 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001796 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001797 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1798 }
1799
1800 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1801
1802 ASSERT_EQ(parts[0].logger_node, "pi1");
1803 ASSERT_EQ(parts[1].logger_node, "pi2");
1804
Austin Schuh79b30942021-01-24 22:32:21 -08001805 size_t mapper1_count = 0;
Austin Schuhfecf1d82020-12-19 16:57:28 -08001806 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001807 mapper1.set_timestamp_callback(
1808 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhfecf1d82020-12-19 16:57:28 -08001809
1810 {
1811 std::deque<TimestampedMessage> output1;
1812
1813 ASSERT_TRUE(mapper1.Front() != nullptr);
1814 output1.emplace_back(std::move(*mapper1.Front()));
1815 mapper1.PopFront();
1816 ASSERT_TRUE(mapper1.Front() != nullptr);
1817 output1.emplace_back(std::move(*mapper1.Front()));
1818 mapper1.PopFront();
1819 ASSERT_TRUE(mapper1.Front() != nullptr);
1820 output1.emplace_back(std::move(*mapper1.Front()));
1821 mapper1.PopFront();
1822 ASSERT_TRUE(mapper1.Front() == nullptr);
1823
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001824 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1825 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001826 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001827 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001828
1829 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1830 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001831 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001832 EXPECT_FALSE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001833
1834 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1835 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001836 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001837 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001838 }
Austin Schuh79b30942021-01-24 22:32:21 -08001839 EXPECT_EQ(mapper1_count, 3u);
1840}
1841
1842// Tests that we can queue messages and call the timestamp callback for both
1843// nodes.
1844TEST_F(TimestampMapperTest, QueueUntilNode0) {
1845 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1846 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001847 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh79b30942021-01-24 22:32:21 -08001848 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001849 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh79b30942021-01-24 22:32:21 -08001850 writer1.QueueSpan(config2_.span());
1851
Austin Schuhd863e6e2022-10-16 15:44:50 -07001852 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001853 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001854 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001855 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1856
Austin Schuhd863e6e2022-10-16 15:44:50 -07001857 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001858 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001859 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001860 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1861
Austin Schuhd863e6e2022-10-16 15:44:50 -07001862 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001863 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001864 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001865 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1866
Austin Schuhd863e6e2022-10-16 15:44:50 -07001867 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001868 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001869 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001870 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1871 }
1872
1873 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1874
1875 ASSERT_EQ(parts[0].logger_node, "pi1");
1876 ASSERT_EQ(parts[1].logger_node, "pi2");
1877
1878 size_t mapper0_count = 0;
1879 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1880 mapper0.set_timestamp_callback(
1881 [&](TimestampedMessage *) { ++mapper0_count; });
1882 size_t mapper1_count = 0;
1883 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1884 mapper1.set_timestamp_callback(
1885 [&](TimestampedMessage *) { ++mapper1_count; });
1886
1887 mapper0.AddPeer(&mapper1);
1888 mapper1.AddPeer(&mapper0);
1889
1890 {
1891 std::deque<TimestampedMessage> output0;
1892
1893 EXPECT_EQ(mapper0_count, 0u);
1894 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001895 mapper0.QueueUntil(
1896 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001897 EXPECT_EQ(mapper0_count, 3u);
1898 EXPECT_EQ(mapper1_count, 0u);
1899
1900 ASSERT_TRUE(mapper0.Front() != nullptr);
1901 EXPECT_EQ(mapper0_count, 3u);
1902 EXPECT_EQ(mapper1_count, 0u);
1903
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001904 mapper0.QueueUntil(
1905 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001906 EXPECT_EQ(mapper0_count, 3u);
1907 EXPECT_EQ(mapper1_count, 0u);
1908
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001909 mapper0.QueueUntil(
1910 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001911 EXPECT_EQ(mapper0_count, 4u);
1912 EXPECT_EQ(mapper1_count, 0u);
1913
1914 output0.emplace_back(std::move(*mapper0.Front()));
1915 mapper0.PopFront();
1916 output0.emplace_back(std::move(*mapper0.Front()));
1917 mapper0.PopFront();
1918 output0.emplace_back(std::move(*mapper0.Front()));
1919 mapper0.PopFront();
1920 output0.emplace_back(std::move(*mapper0.Front()));
1921 mapper0.PopFront();
1922
1923 EXPECT_EQ(mapper0_count, 4u);
1924 EXPECT_EQ(mapper1_count, 0u);
1925
1926 ASSERT_TRUE(mapper0.Front() == nullptr);
1927
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001928 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1929 EXPECT_EQ(output0[0].monotonic_event_time.time,
1930 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001931 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001932
1933 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1934 EXPECT_EQ(output0[1].monotonic_event_time.time,
1935 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001936 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001937
1938 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1939 EXPECT_EQ(output0[2].monotonic_event_time.time,
1940 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001941 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001942
1943 EXPECT_EQ(output0[3].monotonic_event_time.boot, 0u);
1944 EXPECT_EQ(output0[3].monotonic_event_time.time,
1945 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001946 EXPECT_TRUE(output0[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001947 }
1948
1949 {
1950 SCOPED_TRACE("Trying node1 now");
1951 std::deque<TimestampedMessage> output1;
1952
1953 EXPECT_EQ(mapper0_count, 4u);
1954 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001955 mapper1.QueueUntil(BootTimestamp{
1956 .boot = 0,
1957 .time = e + chrono::seconds(100) + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001958 EXPECT_EQ(mapper0_count, 4u);
1959 EXPECT_EQ(mapper1_count, 3u);
1960
1961 ASSERT_TRUE(mapper1.Front() != nullptr);
1962 EXPECT_EQ(mapper0_count, 4u);
1963 EXPECT_EQ(mapper1_count, 3u);
1964
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001965 mapper1.QueueUntil(BootTimestamp{
1966 .boot = 0,
1967 .time = e + chrono::seconds(100) + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001968 EXPECT_EQ(mapper0_count, 4u);
1969 EXPECT_EQ(mapper1_count, 3u);
1970
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001971 mapper1.QueueUntil(BootTimestamp{
1972 .boot = 0,
1973 .time = e + chrono::seconds(100) + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001974 EXPECT_EQ(mapper0_count, 4u);
1975 EXPECT_EQ(mapper1_count, 4u);
1976
1977 ASSERT_TRUE(mapper1.Front() != nullptr);
1978 EXPECT_EQ(mapper0_count, 4u);
1979 EXPECT_EQ(mapper1_count, 4u);
1980
1981 output1.emplace_back(std::move(*mapper1.Front()));
1982 mapper1.PopFront();
1983 ASSERT_TRUE(mapper1.Front() != nullptr);
1984 output1.emplace_back(std::move(*mapper1.Front()));
1985 mapper1.PopFront();
1986 ASSERT_TRUE(mapper1.Front() != nullptr);
1987 output1.emplace_back(std::move(*mapper1.Front()));
1988 mapper1.PopFront();
1989 ASSERT_TRUE(mapper1.Front() != nullptr);
1990 output1.emplace_back(std::move(*mapper1.Front()));
1991 mapper1.PopFront();
1992
1993 EXPECT_EQ(mapper0_count, 4u);
1994 EXPECT_EQ(mapper1_count, 4u);
1995
1996 ASSERT_TRUE(mapper1.Front() == nullptr);
1997
1998 EXPECT_EQ(mapper0_count, 4u);
1999 EXPECT_EQ(mapper1_count, 4u);
2000
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002001 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2002 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002003 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002004 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002005
2006 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
2007 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002008 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002009 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002010
2011 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
2012 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002013 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002014 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002015
2016 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
2017 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002018 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002019 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08002020 }
Austin Schuhfecf1d82020-12-19 16:57:28 -08002021}
2022
// Test fixture that builds two LogFileHeader flatbuffers from JSON via
// MakeHeader().  Both headers describe node "pi2" as logged by "pi1" and share
// the same log_event_uuid, logger_instance_uuid and logger_node_boot_uuid.
// They differ in parts_uuid, parts_index (0 vs 1), source_node_boot_uuid and
// the second boot_uuids entry, which the tests in this file use to represent
// pi2 before and after a reboot.
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002023class BootMergerTest : public SortingElementTest {
2024 public:
// Parses both headers once at construction; the members are const afterwards.
2025 BootMergerTest()
2026 : SortingElementTest(),
2027 boot0_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002028 /* 100ms */
2029 "max_out_of_order_duration": 100000000,
2030 "node": {
2031 "name": "pi2"
2032 },
2033 "logger_node": {
2034 "name": "pi1"
2035 },
2036 "monotonic_start_time": 1000000,
2037 "realtime_start_time": 1000000000000,
2038 "logger_monotonic_start_time": 1000000,
2039 "logger_realtime_start_time": 1000000000000,
2040 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2041 "parts_uuid": "1a9e5ca2-31b2-475b-8282-88f6d1ce5109",
2042 "parts_index": 0,
2043 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2044 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002045 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2046 "boot_uuids": [
2047 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2048 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2049 ""
2050 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002051})")),
// Same logger and log event as boot0_, but parts_index 1 and a different
// source_node_boot_uuid / parts_uuid.
2052 boot1_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002053 /* 100ms */
2054 "max_out_of_order_duration": 100000000,
2055 "node": {
2056 "name": "pi2"
2057 },
2058 "logger_node": {
2059 "name": "pi1"
2060 },
2061 "monotonic_start_time": 1000000,
2062 "realtime_start_time": 1000000000000,
2063 "logger_monotonic_start_time": 1000000,
2064 "logger_realtime_start_time": 1000000000000,
2065 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2066 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
2067 "parts_index": 1,
2068 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2069 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002070 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2071 "boot_uuids": [
2072 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2073 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2074 ""
2075 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002076})")) {}
Austin Schuh8bee6882021-06-28 21:03:28 -07002077
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002078 protected:
// Header for pi2 with source_node_boot_uuid 6ba4f28d-... and parts_index 0.
2079 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0_;
// Header for pi2 with source_node_boot_uuid b728d27a-... and parts_index 1.
2080 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1_;
2081};
2082
2083// This tests that we can properly sort a multi-node log file which has the old
2084// (and buggy) timestamps in the header, and the non-resetting parts_index.
2085// These make it so we can just barely figure out what happened first and what
2086// happened second, but not in a way that is robust to multiple nodes rebooting.
2087TEST_F(BootMergerTest, OldReboot) {
Austin Schuh8bee6882021-06-28 21:03:28 -07002088 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002089 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002090 writer.QueueSpan(boot0_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002091 }
2092 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002093 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002094 writer.QueueSpan(boot1_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002095 }
2096
2097 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
2098
2099 ASSERT_EQ(parts.size(), 1u);
2100 ASSERT_EQ(parts[0].parts.size(), 2u);
2101
2102 EXPECT_EQ(parts[0].parts[0].boot_count, 0);
2103 EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002104 boot0_.message().source_node_boot_uuid()->string_view());
Austin Schuh8bee6882021-06-28 21:03:28 -07002105
2106 EXPECT_EQ(parts[0].parts[1].boot_count, 1);
2107 EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002108 boot1_.message().source_node_boot_uuid()->string_view());
2109}
2110
2111// This tests that we can produce messages ordered across a reboot.
2112TEST_F(BootMergerTest, SortAcrossReboot) {
2113 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2114 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002115 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002116 writer.QueueSpan(boot0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002117 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002118 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002119 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002120 MakeLogMessage(e + chrono::milliseconds(2000), 1, 0x105));
2121 }
2122 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002123 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002124 writer.QueueSpan(boot1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002125 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002126 MakeLogMessage(e + chrono::milliseconds(100), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002127 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002128 MakeLogMessage(e + chrono::milliseconds(200), 1, 0x106));
2129 }
2130
2131 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
2132 ASSERT_EQ(parts.size(), 1u);
2133 ASSERT_EQ(parts[0].parts.size(), 2u);
2134
2135 BootMerger merger(FilterPartsForNode(parts, "pi2"));
2136
2137 EXPECT_EQ(merger.node(), 1u);
2138
2139 std::vector<Message> output;
2140 for (int i = 0; i < 4; ++i) {
2141 ASSERT_TRUE(merger.Front() != nullptr);
2142 output.emplace_back(std::move(*merger.Front()));
2143 merger.PopFront();
2144 }
2145
2146 ASSERT_TRUE(merger.Front() == nullptr);
2147
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002148 EXPECT_EQ(output[0].timestamp.boot, 0u);
2149 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
2150 EXPECT_EQ(output[1].timestamp.boot, 0u);
2151 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(2000));
2152
2153 EXPECT_EQ(output[2].timestamp.boot, 1u);
2154 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(100));
2155 EXPECT_EQ(output[3].timestamp.boot, 1u);
2156 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
Austin Schuh8bee6882021-06-28 21:03:28 -07002157}
2158
// Test fixture that builds four LogFileHeader flatbuffers from JSON via
// MakeHeader(), all logged by node "pi1" within the same log event:
//   * boot0a_ / boot0b_ describe node "pi1" itself; they are parts_index 0 and
//     1 of the same parts_uuid and differ only in the second boot_uuids entry.
//   * boot1a_ / boot1b_ describe node "pi2"; they have different parts_uuid
//     and source_node_boot_uuid values, which the tests in this file use to
//     represent pi2 before and after a reboot.
// NOTE(review): boot_uuids looks like it is indexed by node (pi1, pi2, and an
// empty third entry) -- confirm against the LogFileHeader schema.
Austin Schuh48507722021-07-17 17:29:24 -07002159class RebootTimestampMapperTest : public SortingElementTest {
2160 public:
// Parses all four headers once at construction; the members are const
// afterwards.
2161 RebootTimestampMapperTest()
2162 : SortingElementTest(),
2163 boot0a_(MakeHeader(config_, R"({
2164 /* 100ms */
2165 "max_out_of_order_duration": 100000000,
2166 "node": {
2167 "name": "pi1"
2168 },
2169 "logger_node": {
2170 "name": "pi1"
2171 },
2172 "monotonic_start_time": 1000000,
2173 "realtime_start_time": 1000000000000,
2174 "logger_monotonic_start_time": 1000000,
2175 "logger_realtime_start_time": 1000000000000,
2176 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2177 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2178 "parts_index": 0,
2179 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2180 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2181 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2182 "boot_uuids": [
2183 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2184 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2185 ""
2186 ]
2187})")),
// Second part (parts_index 1) of the same pi1 parts_uuid as boot0a_; only the
// second boot_uuids entry changes.
2188 boot0b_(MakeHeader(config_, R"({
2189 /* 100ms */
2190 "max_out_of_order_duration": 100000000,
2191 "node": {
2192 "name": "pi1"
2193 },
2194 "logger_node": {
2195 "name": "pi1"
2196 },
2197 "monotonic_start_time": 1000000,
2198 "realtime_start_time": 1000000000000,
2199 "logger_monotonic_start_time": 1000000,
2200 "logger_realtime_start_time": 1000000000000,
2201 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2202 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2203 "parts_index": 1,
2204 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2205 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2206 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2207 "boot_uuids": [
2208 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2209 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2210 ""
2211 ]
2212})")),
// pi2 data logged on pi1, with pi2's first source_node_boot_uuid (6ba4f28d-...).
2213 boot1a_(MakeHeader(config_, R"({
2214 /* 100ms */
2215 "max_out_of_order_duration": 100000000,
2216 "node": {
2217 "name": "pi2"
2218 },
2219 "logger_node": {
2220 "name": "pi1"
2221 },
2222 "monotonic_start_time": 1000000,
2223 "realtime_start_time": 1000000000000,
2224 "logger_monotonic_start_time": 1000000,
2225 "logger_realtime_start_time": 1000000000000,
2226 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2227 "parts_uuid": "f6ab0cdc-a654-456d-bfd9-2bbc09098edf",
2228 "parts_index": 0,
2229 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2230 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2231 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2232 "boot_uuids": [
2233 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2234 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2235 ""
2236 ]
2237})")),
// pi2 data logged on pi1, with pi2's second source_node_boot_uuid
// (b728d27a-...) and a new parts_uuid.
2238 boot1b_(MakeHeader(config_, R"({
2239 /* 100ms */
2240 "max_out_of_order_duration": 100000000,
2241 "node": {
2242 "name": "pi2"
2243 },
2244 "logger_node": {
2245 "name": "pi1"
2246 },
2247 "monotonic_start_time": 1000000,
2248 "realtime_start_time": 1000000000000,
2249 "logger_monotonic_start_time": 1000000,
2250 "logger_realtime_start_time": 1000000000000,
2251 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2252 "parts_uuid": "3a9d0445-f520-43ca-93f5-e2cc7f54d40a",
2253 "parts_index": 1,
2254 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2255 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2256 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2257 "boot_uuids": [
2258 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2259 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2260 ""
2261 ]
2262})")) {}
2263
2264 protected:
// Headers for node pi1 (parts_index 0 and 1 of one parts_uuid).
2265 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0a_;
2266 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0b_;
// Headers for node pi2 with its first and second source_node_boot_uuid.
2267 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1a_;
2268 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
2269};
2270
Austin Schuh48507722021-07-17 17:29:24 -07002271// Tests that we can match timestamps on delivered messages in the presence of
2272// reboots on the node receiving timestamps.
2273TEST_F(RebootTimestampMapperTest, ReadNode0First) {
2274 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2275 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002276 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002277 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002278 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002279 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002280 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002281 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002282 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002283 writer1b.QueueSpan(boot1b_.span());
2284
Austin Schuhd863e6e2022-10-16 15:44:50 -07002285 writer0a.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002286 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002287 writer1a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002288 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
2289 e + chrono::milliseconds(1001)));
2290
Austin Schuhd863e6e2022-10-16 15:44:50 -07002291 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh58646e22021-08-23 23:51:46 -07002292 e + chrono::milliseconds(1000), 0, chrono::seconds(21),
2293 e + chrono::milliseconds(2001)));
2294
Austin Schuhd863e6e2022-10-16 15:44:50 -07002295 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002296 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002297 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002298 e + chrono::milliseconds(2000), 0, chrono::seconds(20),
2299 e + chrono::milliseconds(2001)));
2300
Austin Schuhd863e6e2022-10-16 15:44:50 -07002301 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002302 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002303 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002304 e + chrono::milliseconds(3000), 0, chrono::seconds(20),
2305 e + chrono::milliseconds(3001)));
2306 }
2307
Austin Schuh58646e22021-08-23 23:51:46 -07002308 const std::vector<LogFile> parts =
2309 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Austin Schuh48507722021-07-17 17:29:24 -07002310
2311 for (const auto &x : parts) {
2312 LOG(INFO) << x;
2313 }
2314 ASSERT_EQ(parts.size(), 1u);
2315 ASSERT_EQ(parts[0].logger_node, "pi1");
2316
2317 size_t mapper0_count = 0;
2318 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
2319 mapper0.set_timestamp_callback(
2320 [&](TimestampedMessage *) { ++mapper0_count; });
2321 size_t mapper1_count = 0;
2322 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
2323 mapper1.set_timestamp_callback(
2324 [&](TimestampedMessage *) { ++mapper1_count; });
2325
2326 mapper0.AddPeer(&mapper1);
2327 mapper1.AddPeer(&mapper0);
2328
2329 {
2330 std::deque<TimestampedMessage> output0;
2331
2332 EXPECT_EQ(mapper0_count, 0u);
2333 EXPECT_EQ(mapper1_count, 0u);
2334 ASSERT_TRUE(mapper0.Front() != nullptr);
2335 EXPECT_EQ(mapper0_count, 1u);
2336 EXPECT_EQ(mapper1_count, 0u);
2337 output0.emplace_back(std::move(*mapper0.Front()));
2338 mapper0.PopFront();
2339 EXPECT_TRUE(mapper0.started());
2340 EXPECT_EQ(mapper0_count, 1u);
2341 EXPECT_EQ(mapper1_count, 0u);
2342
2343 ASSERT_TRUE(mapper0.Front() != nullptr);
2344 EXPECT_EQ(mapper0_count, 2u);
2345 EXPECT_EQ(mapper1_count, 0u);
2346 output0.emplace_back(std::move(*mapper0.Front()));
2347 mapper0.PopFront();
2348 EXPECT_TRUE(mapper0.started());
2349
2350 ASSERT_TRUE(mapper0.Front() != nullptr);
2351 output0.emplace_back(std::move(*mapper0.Front()));
2352 mapper0.PopFront();
2353 EXPECT_TRUE(mapper0.started());
2354
2355 EXPECT_EQ(mapper0_count, 3u);
2356 EXPECT_EQ(mapper1_count, 0u);
2357
2358 ASSERT_TRUE(mapper0.Front() == nullptr);
2359
2360 LOG(INFO) << output0[0];
2361 LOG(INFO) << output0[1];
2362 LOG(INFO) << output0[2];
2363
2364 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2365 EXPECT_EQ(output0[0].monotonic_event_time.time,
2366 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002367 EXPECT_EQ(output0[0].queue_index,
2368 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002369 EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
2370 EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002371 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002372
2373 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2374 EXPECT_EQ(output0[1].monotonic_event_time.time,
2375 e + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002376 EXPECT_EQ(output0[1].queue_index,
2377 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002378 EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
2379 EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002380 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002381
2382 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2383 EXPECT_EQ(output0[2].monotonic_event_time.time,
2384 e + chrono::milliseconds(3000));
Austin Schuh58646e22021-08-23 23:51:46 -07002385 EXPECT_EQ(output0[2].queue_index,
2386 (BootQueueIndex{.boot = 0u, .index = 2u}));
Austin Schuh48507722021-07-17 17:29:24 -07002387 EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
2388 EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002389 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002390 }
2391
2392 {
2393 SCOPED_TRACE("Trying node1 now");
2394 std::deque<TimestampedMessage> output1;
2395
2396 EXPECT_EQ(mapper0_count, 3u);
2397 EXPECT_EQ(mapper1_count, 0u);
2398
2399 ASSERT_TRUE(mapper1.Front() != nullptr);
2400 EXPECT_EQ(mapper0_count, 3u);
2401 EXPECT_EQ(mapper1_count, 1u);
2402 output1.emplace_back(std::move(*mapper1.Front()));
2403 mapper1.PopFront();
2404 EXPECT_TRUE(mapper1.started());
2405 EXPECT_EQ(mapper0_count, 3u);
2406 EXPECT_EQ(mapper1_count, 1u);
2407
2408 ASSERT_TRUE(mapper1.Front() != nullptr);
2409 EXPECT_EQ(mapper0_count, 3u);
2410 EXPECT_EQ(mapper1_count, 2u);
2411 output1.emplace_back(std::move(*mapper1.Front()));
2412 mapper1.PopFront();
2413 EXPECT_TRUE(mapper1.started());
2414
2415 ASSERT_TRUE(mapper1.Front() != nullptr);
2416 output1.emplace_back(std::move(*mapper1.Front()));
2417 mapper1.PopFront();
2418 EXPECT_TRUE(mapper1.started());
2419
Austin Schuh58646e22021-08-23 23:51:46 -07002420 ASSERT_TRUE(mapper1.Front() != nullptr);
2421 output1.emplace_back(std::move(*mapper1.Front()));
2422 mapper1.PopFront();
2423 EXPECT_TRUE(mapper1.started());
2424
Austin Schuh48507722021-07-17 17:29:24 -07002425 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002426 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002427
2428 ASSERT_TRUE(mapper1.Front() == nullptr);
2429
2430 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002431 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002432
2433 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2434 EXPECT_EQ(output1[0].monotonic_event_time.time,
2435 e + chrono::seconds(100) + chrono::milliseconds(1000));
2436 EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
2437 EXPECT_EQ(output1[0].monotonic_remote_time.time,
2438 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002439 EXPECT_EQ(output1[0].remote_queue_index,
2440 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002441 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
2442 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
2443 e + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002444 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002445
2446 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2447 EXPECT_EQ(output1[1].monotonic_event_time.time,
2448 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002449 EXPECT_EQ(output1[1].remote_queue_index,
2450 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002451 EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
2452 EXPECT_EQ(output1[1].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002453 e + chrono::milliseconds(1000));
Austin Schuh48507722021-07-17 17:29:24 -07002454 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
2455 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
2456 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002457 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002458
2459 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2460 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002461 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh48507722021-07-17 17:29:24 -07002462 EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
2463 EXPECT_EQ(output1[2].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002464 e + chrono::milliseconds(2000));
2465 EXPECT_EQ(output1[2].remote_queue_index,
2466 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002467 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
2468 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002469 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002470 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002471
Austin Schuh58646e22021-08-23 23:51:46 -07002472 EXPECT_EQ(output1[3].monotonic_event_time.boot, 1u);
2473 EXPECT_EQ(output1[3].monotonic_event_time.time,
2474 e + chrono::seconds(20) + chrono::milliseconds(3000));
2475 EXPECT_EQ(output1[3].monotonic_remote_time.boot, 0u);
2476 EXPECT_EQ(output1[3].monotonic_remote_time.time,
2477 e + chrono::milliseconds(3000));
2478 EXPECT_EQ(output1[3].remote_queue_index,
2479 (BootQueueIndex{.boot = 0u, .index = 2u}));
2480 EXPECT_EQ(output1[3].monotonic_timestamp_time.boot, 0u);
2481 EXPECT_EQ(output1[3].monotonic_timestamp_time.time,
2482 e + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002483 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh58646e22021-08-23 23:51:46 -07002484
Austin Schuh48507722021-07-17 17:29:24 -07002485 LOG(INFO) << output1[0];
2486 LOG(INFO) << output1[1];
2487 LOG(INFO) << output1[2];
Austin Schuh58646e22021-08-23 23:51:46 -07002488 LOG(INFO) << output1[3];
Austin Schuh48507722021-07-17 17:29:24 -07002489 }
2490}
2491
2492TEST_F(RebootTimestampMapperTest, Node2Reboot) {
2493 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2494 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002495 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002496 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002497 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002498 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002499 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002500 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002501 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002502 writer1b.QueueSpan(boot1b_.span());
2503
Austin Schuhd863e6e2022-10-16 15:44:50 -07002504 writer1a.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002505 e + chrono::seconds(100) + chrono::milliseconds(1000), 3, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002506 writer0a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002507 e + chrono::seconds(100) + chrono::milliseconds(1000), 3,
2508 chrono::seconds(-100),
2509 e + chrono::seconds(100) + chrono::milliseconds(1001)));
2510
Austin Schuhd863e6e2022-10-16 15:44:50 -07002511 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002512 e + chrono::seconds(20) + chrono::milliseconds(2000), 3, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002513 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002514 e + chrono::seconds(20) + chrono::milliseconds(2000), 3,
2515 chrono::seconds(-20),
2516 e + chrono::seconds(20) + chrono::milliseconds(2001)));
2517
Austin Schuhd863e6e2022-10-16 15:44:50 -07002518 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002519 e + chrono::seconds(20) + chrono::milliseconds(3000), 3, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002520 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002521 e + chrono::seconds(20) + chrono::milliseconds(3000), 3,
2522 chrono::seconds(-20),
2523 e + chrono::seconds(20) + chrono::milliseconds(3001)));
2524 }
2525
2526 const std::vector<LogFile> parts =
2527 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2528
2529 for (const auto &x : parts) {
2530 LOG(INFO) << x;
2531 }
2532 ASSERT_EQ(parts.size(), 1u);
2533 ASSERT_EQ(parts[0].logger_node, "pi1");
2534
2535 size_t mapper0_count = 0;
2536 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
2537 mapper0.set_timestamp_callback(
2538 [&](TimestampedMessage *) { ++mapper0_count; });
2539 size_t mapper1_count = 0;
2540 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
2541 mapper1.set_timestamp_callback(
2542 [&](TimestampedMessage *) { ++mapper1_count; });
2543
2544 mapper0.AddPeer(&mapper1);
2545 mapper1.AddPeer(&mapper0);
2546
2547 {
2548 std::deque<TimestampedMessage> output0;
2549
2550 EXPECT_EQ(mapper0_count, 0u);
2551 EXPECT_EQ(mapper1_count, 0u);
2552 ASSERT_TRUE(mapper0.Front() != nullptr);
2553 EXPECT_EQ(mapper0_count, 1u);
2554 EXPECT_EQ(mapper1_count, 0u);
2555 output0.emplace_back(std::move(*mapper0.Front()));
2556 mapper0.PopFront();
2557 EXPECT_TRUE(mapper0.started());
2558 EXPECT_EQ(mapper0_count, 1u);
2559 EXPECT_EQ(mapper1_count, 0u);
2560
2561 ASSERT_TRUE(mapper0.Front() != nullptr);
2562 EXPECT_EQ(mapper0_count, 2u);
2563 EXPECT_EQ(mapper1_count, 0u);
2564 output0.emplace_back(std::move(*mapper0.Front()));
2565 mapper0.PopFront();
2566 EXPECT_TRUE(mapper0.started());
2567
2568 ASSERT_TRUE(mapper0.Front() != nullptr);
2569 output0.emplace_back(std::move(*mapper0.Front()));
2570 mapper0.PopFront();
2571 EXPECT_TRUE(mapper0.started());
2572
2573 EXPECT_EQ(mapper0_count, 3u);
2574 EXPECT_EQ(mapper1_count, 0u);
2575
2576 ASSERT_TRUE(mapper0.Front() == nullptr);
2577
2578 LOG(INFO) << output0[0];
2579 LOG(INFO) << output0[1];
2580 LOG(INFO) << output0[2];
2581
2582 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2583 EXPECT_EQ(output0[0].monotonic_event_time.time,
2584 e + chrono::milliseconds(1000));
2585 EXPECT_EQ(output0[0].monotonic_remote_time.boot, 0u);
2586 EXPECT_EQ(output0[0].monotonic_remote_time.time,
2587 e + chrono::seconds(100) + chrono::milliseconds(1000));
2588 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
2589 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
2590 e + chrono::seconds(100) + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002591 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002592
2593 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2594 EXPECT_EQ(output0[1].monotonic_event_time.time,
2595 e + chrono::milliseconds(2000));
2596 EXPECT_EQ(output0[1].monotonic_remote_time.boot, 1u);
2597 EXPECT_EQ(output0[1].monotonic_remote_time.time,
2598 e + chrono::seconds(20) + chrono::milliseconds(2000));
2599 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
2600 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
2601 e + chrono::seconds(20) + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002602 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002603
2604 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2605 EXPECT_EQ(output0[2].monotonic_event_time.time,
2606 e + chrono::milliseconds(3000));
2607 EXPECT_EQ(output0[2].monotonic_remote_time.boot, 1u);
2608 EXPECT_EQ(output0[2].monotonic_remote_time.time,
2609 e + chrono::seconds(20) + chrono::milliseconds(3000));
2610 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
2611 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
2612 e + chrono::seconds(20) + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002613 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002614 }
2615
2616 {
2617 SCOPED_TRACE("Trying node1 now");
2618 std::deque<TimestampedMessage> output1;
2619
2620 EXPECT_EQ(mapper0_count, 3u);
2621 EXPECT_EQ(mapper1_count, 0u);
2622
2623 ASSERT_TRUE(mapper1.Front() != nullptr);
2624 EXPECT_EQ(mapper0_count, 3u);
2625 EXPECT_EQ(mapper1_count, 1u);
2626 output1.emplace_back(std::move(*mapper1.Front()));
2627 mapper1.PopFront();
2628 EXPECT_TRUE(mapper1.started());
2629 EXPECT_EQ(mapper0_count, 3u);
2630 EXPECT_EQ(mapper1_count, 1u);
2631
2632 ASSERT_TRUE(mapper1.Front() != nullptr);
2633 EXPECT_EQ(mapper0_count, 3u);
2634 EXPECT_EQ(mapper1_count, 2u);
2635 output1.emplace_back(std::move(*mapper1.Front()));
2636 mapper1.PopFront();
2637 EXPECT_TRUE(mapper1.started());
2638
2639 ASSERT_TRUE(mapper1.Front() != nullptr);
2640 output1.emplace_back(std::move(*mapper1.Front()));
2641 mapper1.PopFront();
2642 EXPECT_TRUE(mapper1.started());
2643
2644 EXPECT_EQ(mapper0_count, 3u);
2645 EXPECT_EQ(mapper1_count, 3u);
2646
2647 ASSERT_TRUE(mapper1.Front() == nullptr);
2648
2649 EXPECT_EQ(mapper0_count, 3u);
2650 EXPECT_EQ(mapper1_count, 3u);
2651
2652 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2653 EXPECT_EQ(output1[0].monotonic_event_time.time,
2654 e + chrono::seconds(100) + chrono::milliseconds(1000));
2655 EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
2656 EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002657 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002658
2659 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2660 EXPECT_EQ(output1[1].monotonic_event_time.time,
2661 e + chrono::seconds(20) + chrono::milliseconds(2000));
2662 EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
2663 EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002664 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002665
2666 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2667 EXPECT_EQ(output1[2].monotonic_event_time.time,
2668 e + chrono::seconds(20) + chrono::milliseconds(3000));
2669 EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
2670 EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002671 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002672
2673 LOG(INFO) << output1[0];
2674 LOG(INFO) << output1[1];
2675 LOG(INFO) << output1[2];
2676 }
2677}
2678
Austin Schuh44c61472021-11-22 21:04:10 -08002679class SortingDeathTest : public SortingElementTest {
2680 public:
2681 SortingDeathTest()
2682 : SortingElementTest(),
2683 part0_(MakeHeader(config_, R"({
2684 /* 100ms */
2685 "max_out_of_order_duration": 100000000,
2686 "node": {
2687 "name": "pi1"
2688 },
2689 "logger_node": {
2690 "name": "pi1"
2691 },
2692 "monotonic_start_time": 1000000,
2693 "realtime_start_time": 1000000000000,
2694 "logger_monotonic_start_time": 1000000,
2695 "logger_realtime_start_time": 1000000000000,
2696 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2697 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2698 "parts_index": 0,
2699 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2700 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2701 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2702 "boot_uuids": [
2703 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2704 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2705 ""
2706 ],
2707 "oldest_remote_monotonic_timestamps": [
2708 9223372036854775807,
2709 9223372036854775807,
2710 9223372036854775807
2711 ],
2712 "oldest_local_monotonic_timestamps": [
2713 9223372036854775807,
2714 9223372036854775807,
2715 9223372036854775807
2716 ],
2717 "oldest_remote_unreliable_monotonic_timestamps": [
2718 9223372036854775807,
2719 0,
2720 9223372036854775807
2721 ],
2722 "oldest_local_unreliable_monotonic_timestamps": [
2723 9223372036854775807,
2724 0,
2725 9223372036854775807
2726 ]
2727})")),
2728 part1_(MakeHeader(config_, R"({
2729 /* 100ms */
2730 "max_out_of_order_duration": 100000000,
2731 "node": {
2732 "name": "pi1"
2733 },
2734 "logger_node": {
2735 "name": "pi1"
2736 },
2737 "monotonic_start_time": 1000000,
2738 "realtime_start_time": 1000000000000,
2739 "logger_monotonic_start_time": 1000000,
2740 "logger_realtime_start_time": 1000000000000,
2741 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2742 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2743 "parts_index": 1,
2744 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2745 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2746 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2747 "boot_uuids": [
2748 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2749 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2750 ""
2751 ],
2752 "oldest_remote_monotonic_timestamps": [
2753 9223372036854775807,
2754 9223372036854775807,
2755 9223372036854775807
2756 ],
2757 "oldest_local_monotonic_timestamps": [
2758 9223372036854775807,
2759 9223372036854775807,
2760 9223372036854775807
2761 ],
2762 "oldest_remote_unreliable_monotonic_timestamps": [
2763 9223372036854775807,
2764 100000,
2765 9223372036854775807
2766 ],
2767 "oldest_local_unreliable_monotonic_timestamps": [
2768 9223372036854775807,
2769 100000,
2770 9223372036854775807
2771 ]
2772})")),
2773 part2_(MakeHeader(config_, R"({
2774 /* 100ms */
2775 "max_out_of_order_duration": 100000000,
2776 "node": {
2777 "name": "pi1"
2778 },
2779 "logger_node": {
2780 "name": "pi1"
2781 },
2782 "monotonic_start_time": 1000000,
2783 "realtime_start_time": 1000000000000,
2784 "logger_monotonic_start_time": 1000000,
2785 "logger_realtime_start_time": 1000000000000,
2786 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2787 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2788 "parts_index": 2,
2789 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2790 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2791 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2792 "boot_uuids": [
2793 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2794 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2795 ""
2796 ],
2797 "oldest_remote_monotonic_timestamps": [
2798 9223372036854775807,
2799 9223372036854775807,
2800 9223372036854775807
2801 ],
2802 "oldest_local_monotonic_timestamps": [
2803 9223372036854775807,
2804 9223372036854775807,
2805 9223372036854775807
2806 ],
2807 "oldest_remote_unreliable_monotonic_timestamps": [
2808 9223372036854775807,
2809 200000,
2810 9223372036854775807
2811 ],
2812 "oldest_local_unreliable_monotonic_timestamps": [
2813 9223372036854775807,
2814 200000,
2815 9223372036854775807
2816 ]
2817})")),
2818 part3_(MakeHeader(config_, R"({
2819 /* 100ms */
2820 "max_out_of_order_duration": 100000000,
2821 "node": {
2822 "name": "pi1"
2823 },
2824 "logger_node": {
2825 "name": "pi1"
2826 },
2827 "monotonic_start_time": 1000000,
2828 "realtime_start_time": 1000000000000,
2829 "logger_monotonic_start_time": 1000000,
2830 "logger_realtime_start_time": 1000000000000,
2831 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2832 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2833 "parts_index": 3,
2834 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2835 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2836 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2837 "boot_uuids": [
2838 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2839 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2840 ""
2841 ],
2842 "oldest_remote_monotonic_timestamps": [
2843 9223372036854775807,
2844 9223372036854775807,
2845 9223372036854775807
2846 ],
2847 "oldest_local_monotonic_timestamps": [
2848 9223372036854775807,
2849 9223372036854775807,
2850 9223372036854775807
2851 ],
2852 "oldest_remote_unreliable_monotonic_timestamps": [
2853 9223372036854775807,
2854 300000,
2855 9223372036854775807
2856 ],
2857 "oldest_local_unreliable_monotonic_timestamps": [
2858 9223372036854775807,
2859 300000,
2860 9223372036854775807
2861 ]
2862})")) {}
2863
2864 protected:
2865 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part0_;
2866 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part1_;
2867 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part2_;
2868 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part3_;
2869};
2870
2871// Tests that if 2 computers go back and forth trying to be the same node, we
2872// die in sorting instead of failing to estimate time.
2873TEST_F(SortingDeathTest, FightingNodes) {
2874 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002875 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh44c61472021-11-22 21:04:10 -08002876 writer0.QueueSpan(part0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002877 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh44c61472021-11-22 21:04:10 -08002878 writer1.QueueSpan(part1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002879 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuh44c61472021-11-22 21:04:10 -08002880 writer2.QueueSpan(part2_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002881 TestDetachedBufferWriter writer3(logfile3_);
Austin Schuh44c61472021-11-22 21:04:10 -08002882 writer3.QueueSpan(part3_.span());
2883 }
2884
2885 EXPECT_DEATH(
2886 {
2887 const std::vector<LogFile> parts =
2888 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2889 },
Austin Schuh22cf7862022-09-19 19:09:42 -07002890 "found overlapping boots on");
Austin Schuh44c61472021-11-22 21:04:10 -08002891}
2892
Brian Smarttea913d42021-12-10 15:02:38 -08002893// Tests that we MessageReader blows up on a bad message.
2894TEST(MessageReaderConfirmCrash, ReadWrite) {
2895 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
2896 unlink(logfile.c_str());
2897
2898 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
2899 JsonToSizedFlatbuffer<LogFileHeader>(
2900 R"({ "max_out_of_order_duration": 100000000 })");
2901 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
2902 JsonToSizedFlatbuffer<MessageHeader>(
2903 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
2904 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
2905 JsonToSizedFlatbuffer<MessageHeader>(
2906 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
2907 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m4 =
2908 JsonToSizedFlatbuffer<MessageHeader>(
2909 R"({ "channel_index": 0, "monotonic_sent_time": 4 })");
2910
2911 // Starts out like a proper flat buffer header, but it breaks down ...
2912 std::vector<uint8_t> garbage{8, 0, 0, 0, 16, 0, 0, 0, 4, 0, 0, 0};
2913 absl::Span<uint8_t> m3_span(garbage);
2914
2915 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002916 TestDetachedBufferWriter writer(logfile);
Brian Smarttea913d42021-12-10 15:02:38 -08002917 writer.QueueSpan(config.span());
2918 writer.QueueSpan(m1.span());
2919 writer.QueueSpan(m2.span());
2920 writer.QueueSpan(m3_span);
2921 writer.QueueSpan(m4.span()); // This message is "hidden"
2922 }
2923
2924 {
2925 MessageReader reader(logfile);
2926
2927 EXPECT_EQ(reader.filename(), logfile);
2928
2929 EXPECT_EQ(
2930 reader.max_out_of_order_duration(),
2931 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2932 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2933 EXPECT_TRUE(reader.ReadMessage());
2934 EXPECT_EQ(reader.newest_timestamp(),
2935 monotonic_clock::time_point(chrono::nanoseconds(1)));
2936 EXPECT_TRUE(reader.ReadMessage());
2937 EXPECT_EQ(reader.newest_timestamp(),
2938 monotonic_clock::time_point(chrono::nanoseconds(2)));
2939 // Confirm default crashing behavior
2940 EXPECT_DEATH(reader.ReadMessage(), "Corrupted message at offset");
2941 }
2942
2943 {
2944 gflags::FlagSaver fs;
2945
2946 MessageReader reader(logfile);
2947 reader.set_crash_on_corrupt_message_flag(false);
2948
2949 EXPECT_EQ(reader.filename(), logfile);
2950
2951 EXPECT_EQ(
2952 reader.max_out_of_order_duration(),
2953 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2954 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2955 EXPECT_TRUE(reader.ReadMessage());
2956 EXPECT_EQ(reader.newest_timestamp(),
2957 monotonic_clock::time_point(chrono::nanoseconds(1)));
2958 EXPECT_TRUE(reader.ReadMessage());
2959 EXPECT_EQ(reader.newest_timestamp(),
2960 monotonic_clock::time_point(chrono::nanoseconds(2)));
2961 // Confirm avoiding the corrupted message crash, stopping instead.
2962 EXPECT_FALSE(reader.ReadMessage());
2963 }
2964
2965 {
2966 gflags::FlagSaver fs;
2967
2968 MessageReader reader(logfile);
2969 reader.set_crash_on_corrupt_message_flag(false);
2970 reader.set_ignore_corrupt_messages_flag(true);
2971
2972 EXPECT_EQ(reader.filename(), logfile);
2973
2974 EXPECT_EQ(
2975 reader.max_out_of_order_duration(),
2976 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2977 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2978 EXPECT_TRUE(reader.ReadMessage());
2979 EXPECT_EQ(reader.newest_timestamp(),
2980 monotonic_clock::time_point(chrono::nanoseconds(1)));
2981 EXPECT_TRUE(reader.ReadMessage());
2982 EXPECT_EQ(reader.newest_timestamp(),
2983 monotonic_clock::time_point(chrono::nanoseconds(2)));
2984 // Confirm skipping of the corrupted message to read the hidden one.
2985 EXPECT_TRUE(reader.ReadMessage());
2986 EXPECT_EQ(reader.newest_timestamp(),
2987 monotonic_clock::time_point(chrono::nanoseconds(4)));
2988 EXPECT_FALSE(reader.ReadMessage());
2989 }
2990}
2991
Austin Schuhfa30c352022-10-16 11:12:02 -07002992class InlinePackMessage : public ::testing::Test {
2993 protected:
2994 aos::Context RandomContext() {
2995 data_ = RandomData();
2996 std::uniform_int_distribution<uint32_t> uint32_distribution(
2997 std::numeric_limits<uint32_t>::min(),
2998 std::numeric_limits<uint32_t>::max());
2999
3000 std::uniform_int_distribution<int64_t> time_distribution(
3001 std::numeric_limits<int64_t>::min(),
3002 std::numeric_limits<int64_t>::max());
3003
3004 aos::Context context;
3005 context.monotonic_event_time =
3006 aos::monotonic_clock::epoch() +
3007 chrono::nanoseconds(time_distribution(random_number_generator_));
3008 context.realtime_event_time =
3009 aos::realtime_clock::epoch() +
3010 chrono::nanoseconds(time_distribution(random_number_generator_));
3011
3012 context.monotonic_remote_time =
3013 aos::monotonic_clock::epoch() +
3014 chrono::nanoseconds(time_distribution(random_number_generator_));
3015 context.realtime_remote_time =
3016 aos::realtime_clock::epoch() +
3017 chrono::nanoseconds(time_distribution(random_number_generator_));
3018
3019 context.queue_index = uint32_distribution(random_number_generator_);
3020 context.remote_queue_index = uint32_distribution(random_number_generator_);
3021 context.size = data_.size();
3022 context.data = data_.data();
3023 return context;
3024 }
3025
Austin Schuhf2d0e682022-10-16 14:20:58 -07003026 aos::monotonic_clock::time_point RandomMonotonic() {
3027 std::uniform_int_distribution<int64_t> time_distribution(
3028 0, std::numeric_limits<int64_t>::max());
3029 return aos::monotonic_clock::epoch() +
3030 chrono::nanoseconds(time_distribution(random_number_generator_));
3031 }
3032
3033 aos::SizePrefixedFlatbufferDetachedBuffer<message_bridge::RemoteMessage>
3034 RandomRemoteMessage() {
3035 std::uniform_int_distribution<uint8_t> uint8_distribution(
3036 std::numeric_limits<uint8_t>::min(),
3037 std::numeric_limits<uint8_t>::max());
3038
3039 std::uniform_int_distribution<int64_t> time_distribution(
3040 std::numeric_limits<int64_t>::min(),
3041 std::numeric_limits<int64_t>::max());
3042
3043 flatbuffers::FlatBufferBuilder fbb;
3044 message_bridge::RemoteMessage::Builder builder(fbb);
3045 builder.add_queue_index(uint8_distribution(random_number_generator_));
3046
3047 builder.add_monotonic_sent_time(
3048 time_distribution(random_number_generator_));
3049 builder.add_realtime_sent_time(time_distribution(random_number_generator_));
3050 builder.add_monotonic_remote_time(
3051 time_distribution(random_number_generator_));
3052 builder.add_realtime_remote_time(
3053 time_distribution(random_number_generator_));
3054
3055 builder.add_remote_queue_index(
3056 uint8_distribution(random_number_generator_));
3057
3058 fbb.FinishSizePrefixed(builder.Finish());
3059 return fbb.Release();
3060 }
3061
Austin Schuhfa30c352022-10-16 11:12:02 -07003062 std::vector<uint8_t> RandomData() {
3063 std::vector<uint8_t> result;
3064 std::uniform_int_distribution<int> length_distribution(1, 32);
3065 std::uniform_int_distribution<uint8_t> data_distribution(
3066 std::numeric_limits<uint8_t>::min(),
3067 std::numeric_limits<uint8_t>::max());
3068
3069 const size_t length = length_distribution(random_number_generator_);
3070
3071 result.reserve(length);
3072 for (size_t i = 0; i < length; ++i) {
3073 result.emplace_back(data_distribution(random_number_generator_));
3074 }
3075 return result;
3076 }
3077
3078 std::mt19937 random_number_generator_{
3079 std::mt19937(::aos::testing::RandomSeed())};
3080
3081 std::vector<uint8_t> data_;
3082};
3083
3084// Uses the binary schema to annotate a provided flatbuffer. Returns the
3085// annotated flatbuffer.
3086std::string AnnotateBinaries(
3087 const aos::NonSizePrefixedFlatbuffer<reflection::Schema> &schema,
3088 const std::string &schema_filename,
3089 flatbuffers::span<uint8_t> binary_data) {
3090 flatbuffers::BinaryAnnotator binary_annotator(
3091 schema.span().data(), schema.span().size(), binary_data.data(),
3092 binary_data.size());
3093
3094 auto annotations = binary_annotator.Annotate();
3095
3096 flatbuffers::AnnotatedBinaryTextGenerator text_generator(
3097 flatbuffers::AnnotatedBinaryTextGenerator::Options{}, annotations,
3098 binary_data.data(), binary_data.size());
3099
3100 text_generator.Generate(aos::testing::TestTmpDir() + "/foo.bfbs",
3101 schema_filename);
3102
3103 return aos::util::ReadFileToStringOrDie(aos::testing::TestTmpDir() +
3104 "/foo.afb");
3105}
3106
Austin Schuh71a40d42023-02-04 21:22:22 -08003107// Event loop which just has working time functions for the Copier classes
3108// tested below.
3109class TimeEventLoop : public EventLoop {
3110 public:
3111 TimeEventLoop() : EventLoop(nullptr) {}
3112
3113 aos::monotonic_clock::time_point monotonic_now() const final {
3114 return aos::monotonic_clock::min_time;
3115 }
3116 realtime_clock::time_point realtime_now() const final {
3117 return aos::realtime_clock::min_time;
3118 }
3119
3120 void OnRun(::std::function<void()> /*on_run*/) final { LOG(FATAL); }
3121
3122 const std::string_view name() const final { return "time"; }
3123 const Node *node() const final { return nullptr; }
3124
3125 void SetRuntimeAffinity(const cpu_set_t & /*cpuset*/) final { LOG(FATAL); }
3126 void SetRuntimeRealtimePriority(int /*priority*/) final { LOG(FATAL); }
3127
3128 const cpu_set_t &runtime_affinity() const final {
3129 LOG(FATAL);
3130 return cpuset_;
3131 }
3132
3133 TimerHandler *AddTimer(::std::function<void()> /*callback*/) final {
3134 LOG(FATAL);
3135 return nullptr;
3136 }
3137
3138 std::unique_ptr<RawSender> MakeRawSender(const Channel * /*channel*/) final {
3139 LOG(FATAL);
3140 return std::unique_ptr<RawSender>();
3141 }
3142
3143 const UUID &boot_uuid() const final {
3144 LOG(FATAL);
3145 return boot_uuid_;
3146 }
3147
3148 void set_name(const std::string_view name) final { LOG(FATAL) << name; }
3149
3150 pid_t GetTid() final {
3151 LOG(FATAL);
3152 return 0;
3153 }
3154
3155 int NumberBuffers(const Channel * /*channel*/) final {
3156 LOG(FATAL);
3157 return 0;
3158 }
3159
3160 int runtime_realtime_priority() const final {
3161 LOG(FATAL);
3162 return 0;
3163 }
3164
3165 std::unique_ptr<RawFetcher> MakeRawFetcher(
3166 const Channel * /*channel*/) final {
3167 LOG(FATAL);
3168 return std::unique_ptr<RawFetcher>();
3169 }
3170
3171 PhasedLoopHandler *AddPhasedLoop(
3172 ::std::function<void(int)> /*callback*/,
3173 const monotonic_clock::duration /*interval*/,
3174 const monotonic_clock::duration /*offset*/) final {
3175 LOG(FATAL);
3176 return nullptr;
3177 }
3178
3179 void MakeRawWatcher(
3180 const Channel * /*channel*/,
3181 std::function<void(const Context &context, const void *message)>
3182 /*watcher*/) final {
3183 LOG(FATAL);
3184 }
3185
3186 private:
3187 const cpu_set_t cpuset_ = DefaultAffinity();
3188 UUID boot_uuid_ = UUID ::Zero();
3189};
3190
Austin Schuhfa30c352022-10-16 11:12:02 -07003191// Tests that all variations of PackMessage are equivalent to the inline
3192// PackMessage used to avoid allocations.
3193TEST_F(InlinePackMessage, Equivilent) {
3194 std::uniform_int_distribution<uint32_t> uint32_distribution(
3195 std::numeric_limits<uint32_t>::min(),
3196 std::numeric_limits<uint32_t>::max());
3197 aos::FlatbufferVector<reflection::Schema> schema =
3198 FileToFlatbuffer<reflection::Schema>(
3199 ArtifactPath("aos/events/logging/logger.bfbs"));
3200
3201 for (const LogType type :
3202 {LogType::kLogMessage, LogType::kLogDeliveryTimeOnly,
3203 LogType::kLogMessageAndDeliveryTime, LogType::kLogRemoteMessage}) {
3204 for (int i = 0; i < 100; ++i) {
3205 aos::Context context = RandomContext();
3206 const uint32_t channel_index =
3207 uint32_distribution(random_number_generator_);
3208
3209 flatbuffers::FlatBufferBuilder fbb;
3210 fbb.ForceDefaults(true);
3211 fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index, type));
3212
3213 VLOG(1) << absl::BytesToHexString(std::string_view(
3214 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3215 fbb.GetBufferSpan().size()));
3216
3217 // Make sure that both the builder and inline method agree on sizes.
Austin Schuh48d10d62022-10-16 22:19:23 -07003218 ASSERT_EQ(fbb.GetSize(), PackMessageSize(type, context.size))
Austin Schuhfa30c352022-10-16 11:12:02 -07003219 << "log type " << static_cast<int>(type);
3220
3221 // Initialize the buffer to something nonzero to make sure all the padding
3222 // bytes are set to 0.
Austin Schuh48d10d62022-10-16 22:19:23 -07003223 std::vector<uint8_t> repacked_message(PackMessageSize(type, context.size),
3224 67);
Austin Schuhfa30c352022-10-16 11:12:02 -07003225
3226 // And verify packing inline works as expected.
Austin Schuh71a40d42023-02-04 21:22:22 -08003227 EXPECT_EQ(
3228 repacked_message.size(),
3229 PackMessageInline(repacked_message.data(), context, channel_index,
3230 type, 0u, repacked_message.size()));
Austin Schuhfa30c352022-10-16 11:12:02 -07003231 EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
3232 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3233 fbb.GetBufferSpan().size()))
3234 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
3235 fbb.GetBufferSpan());
Austin Schuh71a40d42023-02-04 21:22:22 -08003236
3237 // Ok, now we want to confirm that we can build up arbitrary pieces of
3238 // said flatbuffer. Try all of them since it is cheap.
3239 TimeEventLoop event_loop;
3240 for (size_t i = 0; i < repacked_message.size(); i += 8) {
3241 for (size_t j = i; j < repacked_message.size(); j += 8) {
3242 std::vector<uint8_t> destination(repacked_message.size(), 67u);
3243 ContextDataCopier copier(context, channel_index, type, &event_loop);
3244
3245 copier.Copy(destination.data(), i, j);
3246
3247 size_t index = 0;
3248 for (size_t k = i; k < j; ++k) {
3249 ASSERT_EQ(destination[index], repacked_message[k])
3250 << ": Failed to match type " << static_cast<int>(type)
3251 << ", index " << index << " while testing range " << i << " to "
3252 << j;
3253 ;
3254 ++index;
3255 }
3256 // Now, confirm that none of the other bytes have been touched.
3257 for (; index < destination.size(); ++index) {
3258 ASSERT_EQ(destination[index], 67u);
3259 }
3260 }
3261 }
Austin Schuhfa30c352022-10-16 11:12:02 -07003262 }
3263 }
3264}
3265
Austin Schuhf2d0e682022-10-16 14:20:58 -07003266// Tests that all variations of PackMessage are equivilent to the inline
3267// PackMessage used to avoid allocations.
3268TEST_F(InlinePackMessage, RemoteEquivilent) {
3269 aos::FlatbufferVector<reflection::Schema> schema =
3270 FileToFlatbuffer<reflection::Schema>(
3271 ArtifactPath("aos/events/logging/logger.bfbs"));
3272 std::uniform_int_distribution<uint8_t> uint8_distribution(
3273 std::numeric_limits<uint8_t>::min(), std::numeric_limits<uint8_t>::max());
3274
3275 for (int i = 0; i < 100; ++i) {
3276 aos::SizePrefixedFlatbufferDetachedBuffer<RemoteMessage> random_msg =
3277 RandomRemoteMessage();
3278 const size_t channel_index = uint8_distribution(random_number_generator_);
3279 const monotonic_clock::time_point monotonic_timestamp_time =
3280 RandomMonotonic();
3281
3282 flatbuffers::FlatBufferBuilder fbb;
3283 fbb.ForceDefaults(true);
3284 fbb.FinishSizePrefixed(PackRemoteMessage(
3285 &fbb, &random_msg.message(), channel_index, monotonic_timestamp_time));
3286
3287 VLOG(1) << absl::BytesToHexString(std::string_view(
3288 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3289 fbb.GetBufferSpan().size()));
3290
3291 // Make sure that both the builder and inline method agree on sizes.
3292 ASSERT_EQ(fbb.GetSize(), PackRemoteMessageSize());
3293
3294 // Initialize the buffer to something nonzer to make sure all the padding
3295 // bytes are set to 0.
3296 std::vector<uint8_t> repacked_message(PackRemoteMessageSize(), 67);
3297
3298 // And verify packing inline works as expected.
Austin Schuh71a40d42023-02-04 21:22:22 -08003299 EXPECT_EQ(repacked_message.size(),
3300 PackRemoteMessageInline(
3301 repacked_message.data(), &random_msg.message(), channel_index,
3302 monotonic_timestamp_time, 0u, repacked_message.size()));
Austin Schuhf2d0e682022-10-16 14:20:58 -07003303 EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
3304 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3305 fbb.GetBufferSpan().size()))
3306 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
3307 fbb.GetBufferSpan());
Austin Schuh71a40d42023-02-04 21:22:22 -08003308
3309 // Ok, now we want to confirm that we can build up arbitrary pieces of said
3310 // flatbuffer. Try all of them since it is cheap.
3311 TimeEventLoop event_loop;
3312 for (size_t i = 0; i < repacked_message.size(); i += 8) {
3313 for (size_t j = i; j < repacked_message.size(); j += 8) {
3314 std::vector<uint8_t> destination(repacked_message.size(), 67u);
3315 RemoteMessageCopier copier(&random_msg.message(), channel_index,
3316 monotonic_timestamp_time, &event_loop);
3317
3318 copier.Copy(destination.data(), i, j);
3319
3320 size_t index = 0;
3321 for (size_t k = i; k < j; ++k) {
3322 ASSERT_EQ(destination[index], repacked_message[k]);
3323 ++index;
3324 }
3325 for (; index < destination.size(); ++index) {
3326 ASSERT_EQ(destination[index], 67u);
3327 }
3328 }
3329 }
Austin Schuhf2d0e682022-10-16 14:20:58 -07003330 }
3331}
Austin Schuhfa30c352022-10-16 11:12:02 -07003332
Austin Schuhc243b422020-10-11 15:35:08 -07003333} // namespace testing
3334} // namespace logger
3335} // namespace aos