blob: e2dc8aa5c54e56418f5e960a5d7ccd103720a7ef [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
Alexei Strots01395492023-03-20 13:59:56 -07004#include <filesystem>
Austin Schuhfa30c352022-10-16 11:12:02 -07005#include <random>
Austin Schuhe243aaf2020-10-11 15:46:02 -07006#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07007
Austin Schuhfa30c352022-10-16 11:12:02 -07008#include "absl/strings/escaping.h"
Austin Schuhc41603c2020-10-11 16:17:37 -07009#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -070010#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -080011#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070012#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070013#include "aos/json_to_flatbuffer.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070014#include "aos/testing/path.h"
15#include "aos/testing/random_seed.h"
Austin Schuhc243b422020-10-11 15:35:08 -070016#include "aos/testing/tmpdir.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070017#include "aos/util/file.h"
18#include "external/com_github_google_flatbuffers/src/annotated_binary_text_gen.h"
19#include "external/com_github_google_flatbuffers/src/binary_annotator.h"
20#include "flatbuffers/reflection_generated.h"
Brian Smarttea913d42021-12-10 15:02:38 -080021#include "gflags/gflags.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070022#include "gtest/gtest.h"
Austin Schuhc243b422020-10-11 15:35:08 -070023
24namespace aos {
25namespace logger {
26namespace testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070027namespace chrono = std::chrono;
Austin Schuhf2d0e682022-10-16 14:20:58 -070028using aos::message_bridge::RemoteMessage;
Eric Schmiedebergb38477e2022-12-02 16:08:04 -070029using aos::testing::ArtifactPath;
Austin Schuhc243b422020-10-11 15:35:08 -070030
Austin Schuhd863e6e2022-10-16 15:44:50 -070031// Adapter class to make it easy to test DetachedBufferWriter without adding
32// test only boilerplate to DetachedBufferWriter.
Alexei Strots01395492023-03-20 13:59:56 -070033class TestDetachedBufferWriter : public DetachedBufferFileWriter {
Austin Schuhd863e6e2022-10-16 15:44:50 -070034 public:
Austin Schuh48d10d62022-10-16 22:19:23 -070035 // Pick a max size that is rather conservative.
36 static constexpr size_t kMaxMessageSize = 128 * 1024;
Austin Schuhd863e6e2022-10-16 15:44:50 -070037 TestDetachedBufferWriter(std::string_view filename)
Alexei Strots01395492023-03-20 13:59:56 -070038 : DetachedBufferFileWriter(
39 filename, std::make_unique<DummyEncoder>(kMaxMessageSize)) {}
Austin Schuhd863e6e2022-10-16 15:44:50 -070040 void WriteSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
41 QueueSpan(absl::Span<const uint8_t>(buffer.data(), buffer.size()));
42 }
Austin Schuh7ef11a42023-02-04 17:15:12 -080043 void QueueSpan(absl::Span<const uint8_t> buffer) {
44 DataEncoder::SpanCopier coppier(buffer);
45 CopyMessage(&coppier, monotonic_clock::now());
46 }
Austin Schuhd863e6e2022-10-16 15:44:50 -070047};
48
Austin Schuhe243aaf2020-10-11 15:46:02 -070049// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070050template <typename T>
51SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
52 const std::string_view data) {
53 flatbuffers::FlatBufferBuilder fbb;
54 fbb.ForceDefaults(true);
55 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
56 return fbb.Release();
57}
58
Austin Schuhe243aaf2020-10-11 15:46:02 -070059// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070060TEST(SpanReaderTest, ReadWrite) {
61 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
62 unlink(logfile.c_str());
63
64 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080065 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070066 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080067 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070068
69 {
Austin Schuhd863e6e2022-10-16 15:44:50 -070070 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -080071 writer.QueueSpan(m1.span());
72 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070073 }
74
75 SpanReader reader(logfile);
76
77 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhcf5f6442021-07-06 10:43:28 -070078 EXPECT_EQ(reader.PeekMessage(), m1.span());
79 EXPECT_EQ(reader.PeekMessage(), m1.span());
Austin Schuhadd6eb32020-11-09 21:24:26 -080080 EXPECT_EQ(reader.ReadMessage(), m1.span());
81 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhcf5f6442021-07-06 10:43:28 -070082 EXPECT_EQ(reader.PeekMessage(), absl::Span<const uint8_t>());
Austin Schuhc243b422020-10-11 15:35:08 -070083 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
84}
85
Austin Schuhe243aaf2020-10-11 15:46:02 -070086// Tests that we can actually parse the resulting messages at a basic level
87// through MessageReader.
88TEST(MessageReaderTest, ReadWrite) {
89 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
90 unlink(logfile.c_str());
91
92 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
93 JsonToSizedFlatbuffer<LogFileHeader>(
94 R"({ "max_out_of_order_duration": 100000000 })");
95 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
96 JsonToSizedFlatbuffer<MessageHeader>(
97 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
98 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
99 JsonToSizedFlatbuffer<MessageHeader>(
100 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
101
102 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700103 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800104 writer.QueueSpan(config.span());
105 writer.QueueSpan(m1.span());
106 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -0700107 }
108
109 MessageReader reader(logfile);
110
111 EXPECT_EQ(reader.filename(), logfile);
112
113 EXPECT_EQ(
114 reader.max_out_of_order_duration(),
115 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
116 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
117 EXPECT_TRUE(reader.ReadMessage());
118 EXPECT_EQ(reader.newest_timestamp(),
119 monotonic_clock::time_point(chrono::nanoseconds(1)));
120 EXPECT_TRUE(reader.ReadMessage());
121 EXPECT_EQ(reader.newest_timestamp(),
122 monotonic_clock::time_point(chrono::nanoseconds(2)));
123 EXPECT_FALSE(reader.ReadMessage());
124}
125
Austin Schuh32f68492020-11-08 21:45:51 -0800126// Tests that we explode when messages are too far out of order.
127TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
128 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
129 unlink(logfile0.c_str());
130
131 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
132 JsonToSizedFlatbuffer<LogFileHeader>(
133 R"({
134 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800135 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800136 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
137 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
138 "parts_index": 0
139})");
140
141 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
142 JsonToSizedFlatbuffer<MessageHeader>(
143 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
144 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
145 JsonToSizedFlatbuffer<MessageHeader>(
146 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
147 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
148 JsonToSizedFlatbuffer<MessageHeader>(
149 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
150
151 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700152 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800153 writer.QueueSpan(config0.span());
154 writer.QueueSpan(m1.span());
155 writer.QueueSpan(m2.span());
156 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800157 }
Alexei Strots01395492023-03-20 13:59:56 -0700158 ASSERT_TRUE(std::filesystem::exists(logfile0)) << logfile0;
Austin Schuh32f68492020-11-08 21:45:51 -0800159
160 const std::vector<LogFile> parts = SortParts({logfile0});
161
162 PartsMessageReader reader(parts[0].parts[0]);
163
164 EXPECT_TRUE(reader.ReadMessage());
165 EXPECT_TRUE(reader.ReadMessage());
166 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
167}
168
Austin Schuhc41603c2020-10-11 16:17:37 -0700169// Tests that we can transparently re-assemble part files with a
170// PartsMessageReader.
171TEST(PartsMessageReaderTest, ReadWrite) {
172 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
173 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
174 unlink(logfile0.c_str());
175 unlink(logfile1.c_str());
176
177 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
178 JsonToSizedFlatbuffer<LogFileHeader>(
179 R"({
180 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800181 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700182 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
183 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
184 "parts_index": 0
185})");
186 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
187 JsonToSizedFlatbuffer<LogFileHeader>(
188 R"({
189 "max_out_of_order_duration": 200000000,
190 "monotonic_start_time": 0,
191 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800192 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700193 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
194 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
195 "parts_index": 1
196})");
197
198 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
199 JsonToSizedFlatbuffer<MessageHeader>(
200 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
201 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
202 JsonToSizedFlatbuffer<MessageHeader>(
203 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
204
205 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700206 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800207 writer.QueueSpan(config0.span());
208 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700209 }
210 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700211 TestDetachedBufferWriter writer(logfile1);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800212 writer.QueueSpan(config1.span());
213 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700214 }
215
216 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
217
218 PartsMessageReader reader(parts[0].parts[0]);
219
220 EXPECT_EQ(reader.filename(), logfile0);
221
222 // Confirm that the timestamps track, and the filename also updates.
223 // Read the first message.
224 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
225 EXPECT_EQ(
226 reader.max_out_of_order_duration(),
227 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
228 EXPECT_TRUE(reader.ReadMessage());
229 EXPECT_EQ(reader.filename(), logfile0);
230 EXPECT_EQ(reader.newest_timestamp(),
231 monotonic_clock::time_point(chrono::nanoseconds(1)));
232 EXPECT_EQ(
233 reader.max_out_of_order_duration(),
234 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
235
236 // Read the second message.
237 EXPECT_TRUE(reader.ReadMessage());
238 EXPECT_EQ(reader.filename(), logfile1);
239 EXPECT_EQ(reader.newest_timestamp(),
240 monotonic_clock::time_point(chrono::nanoseconds(2)));
241 EXPECT_EQ(
242 reader.max_out_of_order_duration(),
243 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
244
245 // And then confirm that reading again returns no message.
246 EXPECT_FALSE(reader.ReadMessage());
247 EXPECT_EQ(reader.filename(), logfile1);
248 EXPECT_EQ(
249 reader.max_out_of_order_duration(),
250 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800251 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Austin Schuhc41603c2020-10-11 16:17:37 -0700252}
Austin Schuh32f68492020-11-08 21:45:51 -0800253
Austin Schuh1be0ce42020-11-29 22:43:26 -0800254// Tests that Message's operator < works as expected.
255TEST(MessageTest, Sorting) {
256 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
257
258 Message m1{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700259 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700260 .timestamp =
261 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
Austin Schuh48507722021-07-17 17:29:24 -0700262 .monotonic_remote_boot = 0xffffff,
263 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700264 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800265 Message m2{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700266 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700267 .timestamp =
268 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
Austin Schuh48507722021-07-17 17:29:24 -0700269 .monotonic_remote_boot = 0xffffff,
270 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700271 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800272
273 EXPECT_LT(m1, m2);
274 EXPECT_GE(m2, m1);
275
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700276 m1.timestamp.time = e;
277 m2.timestamp.time = e;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800278
279 m1.channel_index = 1;
280 m2.channel_index = 2;
281
282 EXPECT_LT(m1, m2);
283 EXPECT_GE(m2, m1);
284
285 m1.channel_index = 0;
286 m2.channel_index = 0;
Austin Schuh58646e22021-08-23 23:51:46 -0700287 m1.queue_index.index = 0u;
288 m2.queue_index.index = 1u;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800289
290 EXPECT_LT(m1, m2);
291 EXPECT_GE(m2, m1);
292}
293
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800294aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
295 const aos::FlatbufferDetachedBuffer<Configuration> &config,
296 const std::string_view json) {
297 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh1034a832021-03-31 21:53:26 -0700298 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800299 flatbuffers::Offset<Configuration> config_offset =
300 aos::CopyFlatBuffer(config, &fbb);
301 LogFileHeader::Builder header_builder(fbb);
302 header_builder.add_configuration(config_offset);
303 fbb.Finish(header_builder.Finish());
304 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
305
306 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
307 JsonToFlatbuffer<LogFileHeader>(json));
308 CHECK(header_updates.Verify());
309 flatbuffers::FlatBufferBuilder fbb2;
Austin Schuh1034a832021-03-31 21:53:26 -0700310 fbb2.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800311 fbb2.FinishSizePrefixed(
312 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
313 return fbb2.Release();
314}
315
316class SortingElementTest : public ::testing::Test {
317 public:
318 SortingElementTest()
319 : config_(JsonToFlatbuffer<Configuration>(
320 R"({
321 "channels": [
322 {
323 "name": "/a",
324 "type": "aos.logger.testing.TestMessage",
325 "source_node": "pi1",
326 "destination_nodes": [
327 {
328 "name": "pi2"
329 },
330 {
331 "name": "pi3"
332 }
333 ]
334 },
335 {
336 "name": "/b",
337 "type": "aos.logger.testing.TestMessage",
338 "source_node": "pi1"
339 },
340 {
341 "name": "/c",
342 "type": "aos.logger.testing.TestMessage",
343 "source_node": "pi1"
Austin Schuh48507722021-07-17 17:29:24 -0700344 },
345 {
346 "name": "/d",
347 "type": "aos.logger.testing.TestMessage",
348 "source_node": "pi2",
349 "destination_nodes": [
350 {
351 "name": "pi1"
352 }
353 ]
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800354 }
355 ],
356 "nodes": [
357 {
358 "name": "pi1"
359 },
360 {
361 "name": "pi2"
362 },
363 {
364 "name": "pi3"
365 }
366 ]
367}
368)")),
369 config0_(MakeHeader(config_, R"({
370 /* 100ms */
371 "max_out_of_order_duration": 100000000,
372 "node": {
373 "name": "pi1"
374 },
375 "logger_node": {
376 "name": "pi1"
377 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800378 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800379 "realtime_start_time": 1000000000000,
380 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700381 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
382 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
383 "boot_uuids": [
384 "1d782c63-b3c7-466e-bea9-a01308b43333",
385 "",
386 ""
387 ],
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800388 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
389 "parts_index": 0
390})")),
391 config1_(MakeHeader(config_,
392 R"({
393 /* 100ms */
394 "max_out_of_order_duration": 100000000,
395 "node": {
396 "name": "pi1"
397 },
398 "logger_node": {
399 "name": "pi1"
400 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800401 "monotonic_start_time": 1000000,
402 "realtime_start_time": 1000000000000,
403 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700404 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
405 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
406 "boot_uuids": [
407 "1d782c63-b3c7-466e-bea9-a01308b43333",
408 "",
409 ""
410 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800411 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
412 "parts_index": 0
413})")),
414 config2_(MakeHeader(config_,
415 R"({
416 /* 100ms */
417 "max_out_of_order_duration": 100000000,
418 "node": {
419 "name": "pi2"
420 },
421 "logger_node": {
422 "name": "pi2"
423 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800424 "monotonic_start_time": 0,
425 "realtime_start_time": 1000000000000,
Austin Schuh48507722021-07-17 17:29:24 -0700426 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
427 "logger_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
428 "boot_uuids": [
429 "",
430 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
431 ""
432 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800433 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
434 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
435 "parts_index": 0
436})")),
437 config3_(MakeHeader(config_,
438 R"({
439 /* 100ms */
440 "max_out_of_order_duration": 100000000,
441 "node": {
442 "name": "pi1"
443 },
444 "logger_node": {
445 "name": "pi1"
446 },
447 "monotonic_start_time": 2000000,
448 "realtime_start_time": 1000000000,
449 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
Austin Schuh48507722021-07-17 17:29:24 -0700450 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
451 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
452 "boot_uuids": [
453 "1d782c63-b3c7-466e-bea9-a01308b43333",
454 "",
455 ""
456 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800457 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800458 "parts_index": 0
Austin Schuh8bf1e632021-01-02 22:41:04 -0800459})")),
460 config4_(MakeHeader(config_,
461 R"({
462 /* 100ms */
463 "max_out_of_order_duration": 100000000,
464 "node": {
465 "name": "pi2"
466 },
467 "logger_node": {
468 "name": "pi1"
469 },
470 "monotonic_start_time": 2000000,
471 "realtime_start_time": 1000000000,
472 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
473 "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
Austin Schuh48507722021-07-17 17:29:24 -0700474 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
475 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
476 "boot_uuids": [
477 "1d782c63-b3c7-466e-bea9-a01308b43333",
478 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
479 ""
480 ],
Austin Schuh8bf1e632021-01-02 22:41:04 -0800481 "parts_index": 0
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800482})")) {
483 unlink(logfile0_.c_str());
484 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800485 unlink(logfile2_.c_str());
Austin Schuh48507722021-07-17 17:29:24 -0700486 unlink(logfile3_.c_str());
Austin Schuh921b2562021-07-31 19:37:03 -0700487 queue_index_.resize(config_.message().channels()->size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800488 }
489
490 protected:
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800491 flatbuffers::DetachedBuffer MakeLogMessage(
492 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
493 int value) {
494 flatbuffers::FlatBufferBuilder message_fbb;
495 message_fbb.ForceDefaults(true);
496 TestMessage::Builder test_message_builder(message_fbb);
497 test_message_builder.add_value(value);
498 message_fbb.Finish(test_message_builder.Finish());
499
500 aos::Context context;
501 context.monotonic_event_time = monotonic_now;
502 context.realtime_event_time = aos::realtime_clock::epoch() +
503 chrono::seconds(1000) +
504 monotonic_now.time_since_epoch();
Austin Schuh921b2562021-07-31 19:37:03 -0700505 CHECK_LT(static_cast<size_t>(channel_index), queue_index_.size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800506 context.queue_index = queue_index_[channel_index];
507 context.size = message_fbb.GetSize();
508 context.data = message_fbb.GetBufferPointer();
509
510 ++queue_index_[channel_index];
511
512 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh53e3a812021-11-03 16:57:23 -0700513 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800514 fbb.FinishSizePrefixed(
515 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
516
517 return fbb.Release();
518 }
519
520 flatbuffers::DetachedBuffer MakeTimestampMessage(
521 const aos::monotonic_clock::time_point sender_monotonic_now,
Austin Schuh8bf1e632021-01-02 22:41:04 -0800522 int channel_index, chrono::nanoseconds receiver_monotonic_offset,
523 monotonic_clock::time_point monotonic_timestamp_time =
524 monotonic_clock::min_time) {
525 const monotonic_clock::time_point monotonic_sent_time =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800526 sender_monotonic_now + receiver_monotonic_offset;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800527
528 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800529 fbb.ForceDefaults(true);
530
531 logger::MessageHeader::Builder message_header_builder(fbb);
532
533 message_header_builder.add_channel_index(channel_index);
534
535 message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
536 100);
537 message_header_builder.add_monotonic_sent_time(
538 monotonic_sent_time.time_since_epoch().count());
539 message_header_builder.add_realtime_sent_time(
540 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
541 monotonic_sent_time.time_since_epoch())
542 .time_since_epoch()
543 .count());
544
545 message_header_builder.add_monotonic_remote_time(
546 sender_monotonic_now.time_since_epoch().count());
547 message_header_builder.add_realtime_remote_time(
548 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
549 sender_monotonic_now.time_since_epoch())
550 .time_since_epoch()
551 .count());
552 message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
553 1);
554
555 if (monotonic_timestamp_time != monotonic_clock::min_time) {
556 message_header_builder.add_monotonic_timestamp_time(
557 monotonic_timestamp_time.time_since_epoch().count());
558 }
559
560 fbb.FinishSizePrefixed(message_header_builder.Finish());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800561 LOG(INFO) << aos::FlatbufferToJson(
562 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
563 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
564
565 return fbb.Release();
566 }
567
568 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
569 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800570 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh48507722021-07-17 17:29:24 -0700571 const std::string logfile3_ = aos::testing::TestTmpDir() + "/log3.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800572
573 const aos::FlatbufferDetachedBuffer<Configuration> config_;
574 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
575 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800576 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
577 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800578 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800579
580 std::vector<uint32_t> queue_index_;
581};
582
583using LogPartsSorterTest = SortingElementTest;
584using LogPartsSorterDeathTest = LogPartsSorterTest;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800585using NodeMergerTest = SortingElementTest;
Austin Schuhd2f96102020-12-01 20:27:29 -0800586using TimestampMapperTest = SortingElementTest;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800587
588// Tests that we can pull messages out of a log sorted in order.
589TEST_F(LogPartsSorterTest, Pull) {
590 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
591 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700592 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800593 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700594 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800595 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700596 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800597 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700598 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800599 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700600 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800601 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
602 }
603
604 const std::vector<LogFile> parts = SortParts({logfile0_});
605
606 LogPartsSorter parts_sorter(parts[0].parts[0]);
607
608 // Confirm we aren't sorted until any time until the message is popped.
609 // Peeking shouldn't change the sorted until time.
610 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
611
612 std::deque<Message> output;
613
614 ASSERT_TRUE(parts_sorter.Front() != nullptr);
615 output.emplace_back(std::move(*parts_sorter.Front()));
616 parts_sorter.PopFront();
617 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
618
619 ASSERT_TRUE(parts_sorter.Front() != nullptr);
620 output.emplace_back(std::move(*parts_sorter.Front()));
621 parts_sorter.PopFront();
622 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
623
624 ASSERT_TRUE(parts_sorter.Front() != nullptr);
625 output.emplace_back(std::move(*parts_sorter.Front()));
626 parts_sorter.PopFront();
627 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
628
629 ASSERT_TRUE(parts_sorter.Front() != nullptr);
630 output.emplace_back(std::move(*parts_sorter.Front()));
631 parts_sorter.PopFront();
632 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
633
634 ASSERT_TRUE(parts_sorter.Front() == nullptr);
635
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700636 EXPECT_EQ(output[0].timestamp.boot, 0);
637 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
638 EXPECT_EQ(output[1].timestamp.boot, 0);
639 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1000));
640 EXPECT_EQ(output[2].timestamp.boot, 0);
641 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1901));
642 EXPECT_EQ(output[3].timestamp.boot, 0);
643 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800644}
645
Austin Schuhb000de62020-12-03 22:00:40 -0800646// Tests that we can pull messages out of a log sorted in order.
647TEST_F(LogPartsSorterTest, WayBeforeStart) {
648 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
649 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700650 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhb000de62020-12-03 22:00:40 -0800651 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700652 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800653 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700654 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800655 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700656 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800657 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700658 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800659 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700660 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800661 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
662 }
663
664 const std::vector<LogFile> parts = SortParts({logfile0_});
665
666 LogPartsSorter parts_sorter(parts[0].parts[0]);
667
668 // Confirm we aren't sorted until any time until the message is popped.
669 // Peeking shouldn't change the sorted until time.
670 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
671
672 std::deque<Message> output;
673
674 for (monotonic_clock::time_point t :
675 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
676 e + chrono::milliseconds(1900), monotonic_clock::max_time,
677 monotonic_clock::max_time}) {
678 ASSERT_TRUE(parts_sorter.Front() != nullptr);
679 output.emplace_back(std::move(*parts_sorter.Front()));
680 parts_sorter.PopFront();
681 EXPECT_EQ(parts_sorter.sorted_until(), t);
682 }
683
684 ASSERT_TRUE(parts_sorter.Front() == nullptr);
685
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700686 EXPECT_EQ(output[0].timestamp.boot, 0u);
687 EXPECT_EQ(output[0].timestamp.time, e - chrono::milliseconds(1000));
688 EXPECT_EQ(output[1].timestamp.boot, 0u);
689 EXPECT_EQ(output[1].timestamp.time, e - chrono::milliseconds(500));
690 EXPECT_EQ(output[2].timestamp.boot, 0u);
691 EXPECT_EQ(output[2].timestamp.time, e - chrono::milliseconds(10));
692 EXPECT_EQ(output[3].timestamp.boot, 0u);
693 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(1901));
694 EXPECT_EQ(output[4].timestamp.boot, 0u);
695 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuhb000de62020-12-03 22:00:40 -0800696}
697
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800698// Tests that messages too far out of order trigger death.
699TEST_F(LogPartsSorterDeathTest, Pull) {
700 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
701 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700702 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800703 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700704 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800705 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700706 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800707 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700708 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800709 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
710 // The following message is too far out of order and will trigger the CHECK.
Austin Schuhd863e6e2022-10-16 15:44:50 -0700711 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800712 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
713 }
714
715 const std::vector<LogFile> parts = SortParts({logfile0_});
716
717 LogPartsSorter parts_sorter(parts[0].parts[0]);
718
719 // Confirm we aren't sorted until any time until the message is popped.
720 // Peeking shouldn't change the sorted until time.
721 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
722 std::deque<Message> output;
723
724 ASSERT_TRUE(parts_sorter.Front() != nullptr);
725 parts_sorter.PopFront();
726 ASSERT_TRUE(parts_sorter.Front() != nullptr);
727 ASSERT_TRUE(parts_sorter.Front() != nullptr);
728 parts_sorter.PopFront();
729
Austin Schuh58646e22021-08-23 23:51:46 -0700730 EXPECT_DEATH({ parts_sorter.Front(); },
731 "Max out of order of 100000000ns exceeded.");
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800732}
733
Austin Schuh8f52ed52020-11-30 23:12:39 -0800734// Tests that we can merge data from 2 separate files, including duplicate data.
735TEST_F(NodeMergerTest, TwoFileMerger) {
736 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
737 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700738 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800739 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700740 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800741 writer1.QueueSpan(config1_.span());
742
Austin Schuhd863e6e2022-10-16 15:44:50 -0700743 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800744 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700745 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800746 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
747
Austin Schuhd863e6e2022-10-16 15:44:50 -0700748 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800749 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700750 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800751 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
752
753 // Make a duplicate!
754 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
755 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
756 writer0.QueueSpan(msg.span());
757 writer1.QueueSpan(msg.span());
758
Austin Schuhd863e6e2022-10-16 15:44:50 -0700759 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800760 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
761 }
762
763 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Austin Schuhd2f96102020-12-01 20:27:29 -0800764 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800765
Austin Schuhd2f96102020-12-01 20:27:29 -0800766 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800767
768 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
769
770 std::deque<Message> output;
771
772 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
773 ASSERT_TRUE(merger.Front() != nullptr);
774 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
775
776 output.emplace_back(std::move(*merger.Front()));
777 merger.PopFront();
778 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
779
780 ASSERT_TRUE(merger.Front() != nullptr);
781 output.emplace_back(std::move(*merger.Front()));
782 merger.PopFront();
783 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
784
785 ASSERT_TRUE(merger.Front() != nullptr);
786 output.emplace_back(std::move(*merger.Front()));
787 merger.PopFront();
788 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
789
790 ASSERT_TRUE(merger.Front() != nullptr);
791 output.emplace_back(std::move(*merger.Front()));
792 merger.PopFront();
793 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
794
795 ASSERT_TRUE(merger.Front() != nullptr);
796 output.emplace_back(std::move(*merger.Front()));
797 merger.PopFront();
798 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
799
800 ASSERT_TRUE(merger.Front() != nullptr);
801 output.emplace_back(std::move(*merger.Front()));
802 merger.PopFront();
803 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
804
805 ASSERT_TRUE(merger.Front() == nullptr);
806
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700807 EXPECT_EQ(output[0].timestamp.boot, 0u);
808 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
809 EXPECT_EQ(output[1].timestamp.boot, 0u);
810 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1001));
811 EXPECT_EQ(output[2].timestamp.boot, 0u);
812 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1002));
813 EXPECT_EQ(output[3].timestamp.boot, 0u);
814 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
815 EXPECT_EQ(output[4].timestamp.boot, 0u);
816 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(3000));
817 EXPECT_EQ(output[5].timestamp.boot, 0u);
818 EXPECT_EQ(output[5].timestamp.time, e + chrono::milliseconds(3002));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800819}
820
Austin Schuh8bf1e632021-01-02 22:41:04 -0800821// Tests that we can merge timestamps with various combinations of
822// monotonic_timestamp_time.
823TEST_F(NodeMergerTest, TwoFileTimestampMerger) {
824 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
825 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700826 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800827 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700828 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800829 writer1.QueueSpan(config1_.span());
830
831 // Neither has it.
832 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700833 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800834 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700835 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800836 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
837
838 // First only has it.
839 MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700840 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800841 e + chrono::milliseconds(1001), 0, chrono::seconds(100),
842 e + chrono::nanoseconds(971)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700843 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800844 e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
845
846 // Second only has it.
847 MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700848 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800849 e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700850 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800851 e + chrono::milliseconds(1002), 0, chrono::seconds(100),
852 e + chrono::nanoseconds(972)));
853
854 // Both have it.
855 MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700856 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800857 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
858 e + chrono::nanoseconds(973)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700859 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800860 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
861 e + chrono::nanoseconds(973)));
862 }
863
864 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
865 ASSERT_EQ(parts.size(), 1u);
866
867 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
868
869 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
870
871 std::deque<Message> output;
872
873 for (int i = 0; i < 4; ++i) {
874 ASSERT_TRUE(merger.Front() != nullptr);
875 output.emplace_back(std::move(*merger.Front()));
876 merger.PopFront();
877 }
878 ASSERT_TRUE(merger.Front() == nullptr);
879
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700880 EXPECT_EQ(output[0].timestamp.boot, 0u);
881 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700882 EXPECT_FALSE(output[0].data->has_monotonic_timestamp_time);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700883
884 EXPECT_EQ(output[1].timestamp.boot, 0u);
885 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700886 EXPECT_TRUE(output[1].data->has_monotonic_timestamp_time);
887 EXPECT_EQ(output[1].data->monotonic_timestamp_time,
888 monotonic_clock::time_point(std::chrono::nanoseconds(971)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700889
890 EXPECT_EQ(output[2].timestamp.boot, 0u);
891 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700892 EXPECT_TRUE(output[2].data->has_monotonic_timestamp_time);
893 EXPECT_EQ(output[2].data->monotonic_timestamp_time,
894 monotonic_clock::time_point(std::chrono::nanoseconds(972)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700895
896 EXPECT_EQ(output[3].timestamp.boot, 0u);
897 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700898 EXPECT_TRUE(output[3].data->has_monotonic_timestamp_time);
899 EXPECT_EQ(output[3].data->monotonic_timestamp_time,
900 monotonic_clock::time_point(std::chrono::nanoseconds(973)));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800901}
902
Austin Schuhd2f96102020-12-01 20:27:29 -0800903// Tests that we can match timestamps on delivered messages.
904TEST_F(TimestampMapperTest, ReadNode0First) {
905 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
906 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700907 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800908 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700909 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800910 writer1.QueueSpan(config2_.span());
911
Austin Schuhd863e6e2022-10-16 15:44:50 -0700912 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800913 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700914 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800915 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
916
Austin Schuhd863e6e2022-10-16 15:44:50 -0700917 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800918 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700919 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800920 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
921
Austin Schuhd863e6e2022-10-16 15:44:50 -0700922 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800923 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700924 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800925 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
926 }
927
928 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
929
930 ASSERT_EQ(parts[0].logger_node, "pi1");
931 ASSERT_EQ(parts[1].logger_node, "pi2");
932
Austin Schuh79b30942021-01-24 22:32:21 -0800933 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800934 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -0800935 mapper0.set_timestamp_callback(
936 [&](TimestampedMessage *) { ++mapper0_count; });
937 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800938 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -0800939 mapper1.set_timestamp_callback(
940 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -0800941
942 mapper0.AddPeer(&mapper1);
943 mapper1.AddPeer(&mapper0);
944
945 {
946 std::deque<TimestampedMessage> output0;
947
Austin Schuh79b30942021-01-24 22:32:21 -0800948 EXPECT_EQ(mapper0_count, 0u);
949 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800950 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800951 EXPECT_EQ(mapper0_count, 1u);
952 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800953 output0.emplace_back(std::move(*mapper0.Front()));
954 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700955 EXPECT_TRUE(mapper0.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800956 EXPECT_EQ(mapper0_count, 1u);
957 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800958
959 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800960 EXPECT_EQ(mapper0_count, 2u);
961 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800962 output0.emplace_back(std::move(*mapper0.Front()));
963 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700964 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800965
966 ASSERT_TRUE(mapper0.Front() != nullptr);
967 output0.emplace_back(std::move(*mapper0.Front()));
968 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700969 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800970
Austin Schuh79b30942021-01-24 22:32:21 -0800971 EXPECT_EQ(mapper0_count, 3u);
972 EXPECT_EQ(mapper1_count, 0u);
973
Austin Schuhd2f96102020-12-01 20:27:29 -0800974 ASSERT_TRUE(mapper0.Front() == nullptr);
975
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700976 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
977 EXPECT_EQ(output0[0].monotonic_event_time.time,
978 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700979 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700980
981 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
982 EXPECT_EQ(output0[1].monotonic_event_time.time,
983 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700984 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700985
986 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
987 EXPECT_EQ(output0[2].monotonic_event_time.time,
988 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700989 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -0800990 }
991
992 {
993 SCOPED_TRACE("Trying node1 now");
994 std::deque<TimestampedMessage> output1;
995
Austin Schuh79b30942021-01-24 22:32:21 -0800996 EXPECT_EQ(mapper0_count, 3u);
997 EXPECT_EQ(mapper1_count, 0u);
998
Austin Schuhd2f96102020-12-01 20:27:29 -0800999 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001000 EXPECT_EQ(mapper0_count, 3u);
1001 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001002 output1.emplace_back(std::move(*mapper1.Front()));
1003 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001004 EXPECT_TRUE(mapper1.started());
Austin Schuh79b30942021-01-24 22:32:21 -08001005 EXPECT_EQ(mapper0_count, 3u);
1006 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001007
1008 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001009 EXPECT_EQ(mapper0_count, 3u);
1010 EXPECT_EQ(mapper1_count, 2u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001011 output1.emplace_back(std::move(*mapper1.Front()));
1012 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001013 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001014
1015 ASSERT_TRUE(mapper1.Front() != nullptr);
1016 output1.emplace_back(std::move(*mapper1.Front()));
1017 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001018 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001019
Austin Schuh79b30942021-01-24 22:32:21 -08001020 EXPECT_EQ(mapper0_count, 3u);
1021 EXPECT_EQ(mapper1_count, 3u);
1022
Austin Schuhd2f96102020-12-01 20:27:29 -08001023 ASSERT_TRUE(mapper1.Front() == nullptr);
1024
Austin Schuh79b30942021-01-24 22:32:21 -08001025 EXPECT_EQ(mapper0_count, 3u);
1026 EXPECT_EQ(mapper1_count, 3u);
1027
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001028 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1029 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001030 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001031 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001032
1033 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1034 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001035 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001036 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001037
1038 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1039 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001040 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001041 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001042 }
1043}
1044
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001045// Tests that we filter messages using the channel filter callback
1046TEST_F(TimestampMapperTest, ReplayChannelsCallbackTest) {
1047 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1048 {
1049 TestDetachedBufferWriter writer0(logfile0_);
1050 writer0.QueueSpan(config0_.span());
1051 TestDetachedBufferWriter writer1(logfile1_);
1052 writer1.QueueSpan(config2_.span());
1053
1054 writer0.WriteSizedFlatbuffer(
1055 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1056 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1057 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1058
1059 writer0.WriteSizedFlatbuffer(
1060 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1061 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1062 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1063
1064 writer0.WriteSizedFlatbuffer(
1065 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1066 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1067 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1068 }
1069
1070 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1071
1072 ASSERT_EQ(parts[0].logger_node, "pi1");
1073 ASSERT_EQ(parts[1].logger_node, "pi2");
1074
1075 // mapper0 will not provide any messages while mapper1 will provide all
1076 // messages due to the channel filter callbacks used
1077 size_t mapper0_count = 0;
1078 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1079 mapper0.set_timestamp_callback(
1080 [&](TimestampedMessage *) { ++mapper0_count; });
1081 mapper0.set_replay_channels_callback(
1082 [&](const TimestampedMessage &) -> bool { return mapper0_count != 2; });
1083 size_t mapper1_count = 0;
1084 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1085 mapper1.set_timestamp_callback(
1086 [&](TimestampedMessage *) { ++mapper1_count; });
1087 mapper1.set_replay_channels_callback(
1088 [&](const TimestampedMessage &) -> bool { return mapper1_count != 2; });
1089
1090 mapper0.AddPeer(&mapper1);
1091 mapper1.AddPeer(&mapper0);
1092
1093 {
1094 std::deque<TimestampedMessage> output0;
1095
1096 EXPECT_EQ(mapper0_count, 0u);
1097 EXPECT_EQ(mapper1_count, 0u);
1098
1099 ASSERT_TRUE(mapper0.Front() != nullptr);
1100 EXPECT_EQ(mapper0_count, 1u);
1101 EXPECT_EQ(mapper1_count, 0u);
1102 output0.emplace_back(std::move(*mapper0.Front()));
1103 mapper0.PopFront();
1104
1105 EXPECT_TRUE(mapper0.started());
1106 EXPECT_EQ(mapper0_count, 1u);
1107 EXPECT_EQ(mapper1_count, 0u);
1108
1109 // mapper0_count is now at 3 since the second message is not queued, but
1110 // timestamp_callback needs to be called everytime even if Front() does not
1111 // provide a message due to the replay_channels_callback.
1112 ASSERT_TRUE(mapper0.Front() != nullptr);
1113 EXPECT_EQ(mapper0_count, 3u);
1114 EXPECT_EQ(mapper1_count, 0u);
1115 output0.emplace_back(std::move(*mapper0.Front()));
1116 mapper0.PopFront();
1117
1118 EXPECT_TRUE(mapper0.started());
1119 EXPECT_EQ(mapper0_count, 3u);
1120 EXPECT_EQ(mapper1_count, 0u);
1121
1122 ASSERT_TRUE(mapper0.Front() == nullptr);
1123 EXPECT_TRUE(mapper0.started());
1124
1125 EXPECT_EQ(mapper0_count, 3u);
1126 EXPECT_EQ(mapper1_count, 0u);
1127
1128 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1129 EXPECT_EQ(output0[0].monotonic_event_time.time,
1130 e + chrono::milliseconds(1000));
1131 EXPECT_TRUE(output0[0].data != nullptr);
1132
1133 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1134 EXPECT_EQ(output0[1].monotonic_event_time.time,
1135 e + chrono::milliseconds(3000));
1136 EXPECT_TRUE(output0[1].data != nullptr);
1137 }
1138
1139 {
1140 SCOPED_TRACE("Trying node1 now");
1141 std::deque<TimestampedMessage> output1;
1142
1143 EXPECT_EQ(mapper0_count, 3u);
1144 EXPECT_EQ(mapper1_count, 0u);
1145
1146 ASSERT_TRUE(mapper1.Front() != nullptr);
1147 EXPECT_EQ(mapper0_count, 3u);
1148 EXPECT_EQ(mapper1_count, 1u);
1149 output1.emplace_back(std::move(*mapper1.Front()));
1150 mapper1.PopFront();
1151 EXPECT_TRUE(mapper1.started());
1152 EXPECT_EQ(mapper0_count, 3u);
1153 EXPECT_EQ(mapper1_count, 1u);
1154
1155 // mapper1_count is now at 3 since the second message is not queued, but
1156 // timestamp_callback needs to be called everytime even if Front() does not
1157 // provide a message due to the replay_channels_callback.
1158 ASSERT_TRUE(mapper1.Front() != nullptr);
1159 output1.emplace_back(std::move(*mapper1.Front()));
1160 mapper1.PopFront();
1161 EXPECT_TRUE(mapper1.started());
1162
1163 EXPECT_EQ(mapper0_count, 3u);
1164 EXPECT_EQ(mapper1_count, 3u);
1165
1166 ASSERT_TRUE(mapper1.Front() == nullptr);
1167
1168 EXPECT_EQ(mapper0_count, 3u);
1169 EXPECT_EQ(mapper1_count, 3u);
1170
1171 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1172 EXPECT_EQ(output1[0].monotonic_event_time.time,
1173 e + chrono::seconds(100) + chrono::milliseconds(1000));
1174 EXPECT_TRUE(output1[0].data != nullptr);
1175
1176 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1177 EXPECT_EQ(output1[1].monotonic_event_time.time,
1178 e + chrono::seconds(100) + chrono::milliseconds(3000));
1179 EXPECT_TRUE(output1[1].data != nullptr);
1180 }
1181}
Austin Schuh8bf1e632021-01-02 22:41:04 -08001182// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
1183// returned.
1184TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
1185 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1186 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001187 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001188 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001189 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001190 writer1.QueueSpan(config4_.span());
1191
Austin Schuhd863e6e2022-10-16 15:44:50 -07001192 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001193 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001194 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001195 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
1196 e + chrono::nanoseconds(971)));
1197
Austin Schuhd863e6e2022-10-16 15:44:50 -07001198 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001199 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001200 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001201 e + chrono::milliseconds(2000), 0, chrono::seconds(100),
1202 e + chrono::nanoseconds(5458)));
1203
Austin Schuhd863e6e2022-10-16 15:44:50 -07001204 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001205 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001206 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001207 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1208 }
1209
1210 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1211
1212 for (const auto &p : parts) {
1213 LOG(INFO) << p;
1214 }
1215
1216 ASSERT_EQ(parts.size(), 1u);
1217
Austin Schuh79b30942021-01-24 22:32:21 -08001218 size_t mapper0_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001219 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001220 mapper0.set_timestamp_callback(
1221 [&](TimestampedMessage *) { ++mapper0_count; });
1222 size_t mapper1_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001223 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001224 mapper1.set_timestamp_callback(
1225 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh8bf1e632021-01-02 22:41:04 -08001226
1227 mapper0.AddPeer(&mapper1);
1228 mapper1.AddPeer(&mapper0);
1229
1230 {
1231 std::deque<TimestampedMessage> output0;
1232
1233 for (int i = 0; i < 3; ++i) {
1234 ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
1235 output0.emplace_back(std::move(*mapper0.Front()));
1236 mapper0.PopFront();
1237 }
1238
1239 ASSERT_TRUE(mapper0.Front() == nullptr);
1240
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001241 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1242 EXPECT_EQ(output0[0].monotonic_event_time.time,
1243 e + chrono::milliseconds(1000));
1244 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
1245 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
1246 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001247 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001248
1249 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1250 EXPECT_EQ(output0[1].monotonic_event_time.time,
1251 e + chrono::milliseconds(2000));
1252 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
1253 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
1254 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001255 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001256
1257 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1258 EXPECT_EQ(output0[2].monotonic_event_time.time,
1259 e + chrono::milliseconds(3000));
1260 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
1261 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
1262 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001263 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001264 }
1265
1266 {
1267 SCOPED_TRACE("Trying node1 now");
1268 std::deque<TimestampedMessage> output1;
1269
1270 for (int i = 0; i < 3; ++i) {
1271 ASSERT_TRUE(mapper1.Front() != nullptr);
1272 output1.emplace_back(std::move(*mapper1.Front()));
1273 mapper1.PopFront();
1274 }
1275
1276 ASSERT_TRUE(mapper1.Front() == nullptr);
1277
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001278 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1279 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001280 e + chrono::seconds(100) + chrono::milliseconds(1000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001281 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
1282 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001283 e + chrono::nanoseconds(971));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001284 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001285
1286 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1287 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001288 e + chrono::seconds(100) + chrono::milliseconds(2000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001289 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
1290 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001291 e + chrono::nanoseconds(5458));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001292 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001293
1294 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1295 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001296 e + chrono::seconds(100) + chrono::milliseconds(3000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001297 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
1298 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
1299 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001300 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001301 }
Austin Schuh79b30942021-01-24 22:32:21 -08001302
1303 EXPECT_EQ(mapper0_count, 3u);
1304 EXPECT_EQ(mapper1_count, 3u);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001305}
1306
Austin Schuhd2f96102020-12-01 20:27:29 -08001307// Tests that we can match timestamps on delivered messages. By doing this in
1308// the reverse order, the second node needs to queue data up from the first node
1309// to find the matching timestamp.
1310TEST_F(TimestampMapperTest, ReadNode1First) {
1311 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1312 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001313 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001314 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001315 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001316 writer1.QueueSpan(config2_.span());
1317
Austin Schuhd863e6e2022-10-16 15:44:50 -07001318 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001319 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001320 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001321 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1322
Austin Schuhd863e6e2022-10-16 15:44:50 -07001323 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001324 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001325 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001326 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1327
Austin Schuhd863e6e2022-10-16 15:44:50 -07001328 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001329 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001330 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001331 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1332 }
1333
1334 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1335
1336 ASSERT_EQ(parts[0].logger_node, "pi1");
1337 ASSERT_EQ(parts[1].logger_node, "pi2");
1338
Austin Schuh79b30942021-01-24 22:32:21 -08001339 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001340 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001341 mapper0.set_timestamp_callback(
1342 [&](TimestampedMessage *) { ++mapper0_count; });
1343 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001344 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001345 mapper1.set_timestamp_callback(
1346 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001347
1348 mapper0.AddPeer(&mapper1);
1349 mapper1.AddPeer(&mapper0);
1350
1351 {
1352 SCOPED_TRACE("Trying node1 now");
1353 std::deque<TimestampedMessage> output1;
1354
1355 ASSERT_TRUE(mapper1.Front() != nullptr);
1356 output1.emplace_back(std::move(*mapper1.Front()));
1357 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001358 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001359
1360 ASSERT_TRUE(mapper1.Front() != nullptr);
1361 output1.emplace_back(std::move(*mapper1.Front()));
1362 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001363 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001364
1365 ASSERT_TRUE(mapper1.Front() != nullptr);
1366 output1.emplace_back(std::move(*mapper1.Front()));
1367 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001368 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001369
1370 ASSERT_TRUE(mapper1.Front() == nullptr);
1371
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001372 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1373 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001374 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001375 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001376
1377 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1378 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001379 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001380 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001381
1382 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1383 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001384 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001385 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001386 }
1387
1388 {
1389 std::deque<TimestampedMessage> output0;
1390
1391 ASSERT_TRUE(mapper0.Front() != nullptr);
1392 output0.emplace_back(std::move(*mapper0.Front()));
1393 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001394 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001395
1396 ASSERT_TRUE(mapper0.Front() != nullptr);
1397 output0.emplace_back(std::move(*mapper0.Front()));
1398 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001399 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001400
1401 ASSERT_TRUE(mapper0.Front() != nullptr);
1402 output0.emplace_back(std::move(*mapper0.Front()));
1403 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001404 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001405
1406 ASSERT_TRUE(mapper0.Front() == nullptr);
1407
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001408 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1409 EXPECT_EQ(output0[0].monotonic_event_time.time,
1410 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001411 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001412
1413 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1414 EXPECT_EQ(output0[1].monotonic_event_time.time,
1415 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001416 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001417
1418 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1419 EXPECT_EQ(output0[2].monotonic_event_time.time,
1420 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001421 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001422 }
Austin Schuh79b30942021-01-24 22:32:21 -08001423
1424 EXPECT_EQ(mapper0_count, 3u);
1425 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001426}
1427
1428// Tests that we return just the timestamps if we couldn't find the data and the
1429// missing data was at the beginning of the file.
1430TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
1431 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1432 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001433 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001434 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001435 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001436 writer1.QueueSpan(config2_.span());
1437
1438 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001439 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001440 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1441
Austin Schuhd863e6e2022-10-16 15:44:50 -07001442 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001443 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001444 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001445 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1446
Austin Schuhd863e6e2022-10-16 15:44:50 -07001447 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001448 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001449 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001450 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1451 }
1452
1453 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1454
1455 ASSERT_EQ(parts[0].logger_node, "pi1");
1456 ASSERT_EQ(parts[1].logger_node, "pi2");
1457
Austin Schuh79b30942021-01-24 22:32:21 -08001458 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001459 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001460 mapper0.set_timestamp_callback(
1461 [&](TimestampedMessage *) { ++mapper0_count; });
1462 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001463 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001464 mapper1.set_timestamp_callback(
1465 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001466
1467 mapper0.AddPeer(&mapper1);
1468 mapper1.AddPeer(&mapper0);
1469
1470 {
1471 SCOPED_TRACE("Trying node1 now");
1472 std::deque<TimestampedMessage> output1;
1473
1474 ASSERT_TRUE(mapper1.Front() != nullptr);
1475 output1.emplace_back(std::move(*mapper1.Front()));
1476 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001477 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001478
1479 ASSERT_TRUE(mapper1.Front() != nullptr);
1480 output1.emplace_back(std::move(*mapper1.Front()));
1481 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001482 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001483
1484 ASSERT_TRUE(mapper1.Front() != nullptr);
1485 output1.emplace_back(std::move(*mapper1.Front()));
1486 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001487 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001488
1489 ASSERT_TRUE(mapper1.Front() == nullptr);
1490
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001491 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1492 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001493 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001494 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001495
1496 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1497 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001498 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001499 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001500
1501 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1502 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001503 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001504 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001505 }
Austin Schuh79b30942021-01-24 22:32:21 -08001506
1507 EXPECT_EQ(mapper0_count, 0u);
1508 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001509}
1510
1511// Tests that we return just the timestamps if we couldn't find the data and the
1512// missing data was at the end of the file.
1513TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
1514 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1515 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001516 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001517 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001518 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001519 writer1.QueueSpan(config2_.span());
1520
Austin Schuhd863e6e2022-10-16 15:44:50 -07001521 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001522 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001523 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001524 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1525
Austin Schuhd863e6e2022-10-16 15:44:50 -07001526 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001527 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001528 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001529 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1530
1531 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001532 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001533 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1534 }
1535
1536 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1537
1538 ASSERT_EQ(parts[0].logger_node, "pi1");
1539 ASSERT_EQ(parts[1].logger_node, "pi2");
1540
Austin Schuh79b30942021-01-24 22:32:21 -08001541 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001542 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001543 mapper0.set_timestamp_callback(
1544 [&](TimestampedMessage *) { ++mapper0_count; });
1545 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001546 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001547 mapper1.set_timestamp_callback(
1548 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001549
1550 mapper0.AddPeer(&mapper1);
1551 mapper1.AddPeer(&mapper0);
1552
1553 {
1554 SCOPED_TRACE("Trying node1 now");
1555 std::deque<TimestampedMessage> output1;
1556
1557 ASSERT_TRUE(mapper1.Front() != nullptr);
1558 output1.emplace_back(std::move(*mapper1.Front()));
1559 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001560 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001561
1562 ASSERT_TRUE(mapper1.Front() != nullptr);
1563 output1.emplace_back(std::move(*mapper1.Front()));
1564 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001565 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001566
1567 ASSERT_TRUE(mapper1.Front() != nullptr);
1568 output1.emplace_back(std::move(*mapper1.Front()));
1569 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001570 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001571
1572 ASSERT_TRUE(mapper1.Front() == nullptr);
1573
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001574 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1575 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001576 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001577 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001578
1579 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1580 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001581 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001582 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001583
1584 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1585 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001586 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001587 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001588 }
Austin Schuh79b30942021-01-24 22:32:21 -08001589
1590 EXPECT_EQ(mapper0_count, 0u);
1591 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001592}
1593
Austin Schuh993ccb52020-12-12 15:59:32 -08001594// Tests that we handle a message which failed to forward or be logged.
1595TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1596 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1597 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001598 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001599 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001600 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001601 writer1.QueueSpan(config2_.span());
1602
Austin Schuhd863e6e2022-10-16 15:44:50 -07001603 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001604 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001605 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001606 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1607
1608 // Create both the timestamp and message, but don't log them, simulating a
1609 // forwarding drop.
1610 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1611 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1612 chrono::seconds(100));
1613
Austin Schuhd863e6e2022-10-16 15:44:50 -07001614 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001615 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001616 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001617 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1618 }
1619
1620 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1621
1622 ASSERT_EQ(parts[0].logger_node, "pi1");
1623 ASSERT_EQ(parts[1].logger_node, "pi2");
1624
Austin Schuh79b30942021-01-24 22:32:21 -08001625 size_t mapper0_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001626 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001627 mapper0.set_timestamp_callback(
1628 [&](TimestampedMessage *) { ++mapper0_count; });
1629 size_t mapper1_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001630 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001631 mapper1.set_timestamp_callback(
1632 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh993ccb52020-12-12 15:59:32 -08001633
1634 mapper0.AddPeer(&mapper1);
1635 mapper1.AddPeer(&mapper0);
1636
1637 {
1638 std::deque<TimestampedMessage> output1;
1639
1640 ASSERT_TRUE(mapper1.Front() != nullptr);
1641 output1.emplace_back(std::move(*mapper1.Front()));
1642 mapper1.PopFront();
1643
1644 ASSERT_TRUE(mapper1.Front() != nullptr);
1645 output1.emplace_back(std::move(*mapper1.Front()));
1646
1647 ASSERT_FALSE(mapper1.Front() == nullptr);
1648
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001649 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1650 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001651 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001652 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001653
1654 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1655 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001656 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001657 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh993ccb52020-12-12 15:59:32 -08001658 }
Austin Schuh79b30942021-01-24 22:32:21 -08001659
1660 EXPECT_EQ(mapper0_count, 0u);
1661 EXPECT_EQ(mapper1_count, 2u);
Austin Schuh993ccb52020-12-12 15:59:32 -08001662}
1663
Austin Schuhd2f96102020-12-01 20:27:29 -08001664// Tests that we properly sort log files with duplicate timestamps.
1665TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1666 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1667 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001668 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001669 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001670 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001671 writer1.QueueSpan(config2_.span());
1672
Austin Schuhd863e6e2022-10-16 15:44:50 -07001673 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001674 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001675 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001676 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1677
Austin Schuhd863e6e2022-10-16 15:44:50 -07001678 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001679 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001680 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001681 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1682
Austin Schuhd863e6e2022-10-16 15:44:50 -07001683 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001684 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001685 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001686 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1687
Austin Schuhd863e6e2022-10-16 15:44:50 -07001688 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001689 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001690 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001691 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1692 }
1693
1694 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1695
1696 ASSERT_EQ(parts[0].logger_node, "pi1");
1697 ASSERT_EQ(parts[1].logger_node, "pi2");
1698
Austin Schuh79b30942021-01-24 22:32:21 -08001699 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001700 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001701 mapper0.set_timestamp_callback(
1702 [&](TimestampedMessage *) { ++mapper0_count; });
1703 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001704 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001705 mapper1.set_timestamp_callback(
1706 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001707
1708 mapper0.AddPeer(&mapper1);
1709 mapper1.AddPeer(&mapper0);
1710
1711 {
1712 SCOPED_TRACE("Trying node1 now");
1713 std::deque<TimestampedMessage> output1;
1714
1715 for (int i = 0; i < 4; ++i) {
1716 ASSERT_TRUE(mapper1.Front() != nullptr);
1717 output1.emplace_back(std::move(*mapper1.Front()));
1718 mapper1.PopFront();
1719 }
1720 ASSERT_TRUE(mapper1.Front() == nullptr);
1721
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001722 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1723 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001724 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001725 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001726
1727 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1728 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001729 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001730 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001731
1732 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1733 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001734 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001735 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001736
1737 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1738 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001739 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001740 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001741 }
Austin Schuh79b30942021-01-24 22:32:21 -08001742
1743 EXPECT_EQ(mapper0_count, 0u);
1744 EXPECT_EQ(mapper1_count, 4u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001745}
1746
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001747// Tests that we properly produce a valid start time.
Austin Schuhd2f96102020-12-01 20:27:29 -08001748TEST_F(TimestampMapperTest, StartTime) {
1749 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1750 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001751 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001752 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001753 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001754 writer1.QueueSpan(config1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001755 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001756 writer2.QueueSpan(config3_.span());
1757 }
1758
1759 const std::vector<LogFile> parts =
1760 SortParts({logfile0_, logfile1_, logfile2_});
1761
Austin Schuh79b30942021-01-24 22:32:21 -08001762 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001763 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001764 mapper0.set_timestamp_callback(
1765 [&](TimestampedMessage *) { ++mapper0_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001766
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001767 EXPECT_EQ(mapper0.monotonic_start_time(0), e + chrono::milliseconds(1));
1768 EXPECT_EQ(mapper0.realtime_start_time(0),
Austin Schuhd2f96102020-12-01 20:27:29 -08001769 realtime_clock::time_point(chrono::seconds(1000)));
Austin Schuh79b30942021-01-24 22:32:21 -08001770 EXPECT_EQ(mapper0_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001771}
1772
Austin Schuhfecf1d82020-12-19 16:57:28 -08001773// Tests that when a peer isn't registered, we treat that as if there was no
1774// data available.
1775TEST_F(TimestampMapperTest, NoPeer) {
1776 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1777 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001778 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001779 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001780 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001781 writer1.QueueSpan(config2_.span());
1782
1783 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001784 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001785 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1786
Austin Schuhd863e6e2022-10-16 15:44:50 -07001787 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001788 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001789 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001790 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1791
Austin Schuhd863e6e2022-10-16 15:44:50 -07001792 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001793 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001794 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001795 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1796 }
1797
1798 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1799
1800 ASSERT_EQ(parts[0].logger_node, "pi1");
1801 ASSERT_EQ(parts[1].logger_node, "pi2");
1802
Austin Schuh79b30942021-01-24 22:32:21 -08001803 size_t mapper1_count = 0;
Austin Schuhfecf1d82020-12-19 16:57:28 -08001804 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001805 mapper1.set_timestamp_callback(
1806 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhfecf1d82020-12-19 16:57:28 -08001807
1808 {
1809 std::deque<TimestampedMessage> output1;
1810
1811 ASSERT_TRUE(mapper1.Front() != nullptr);
1812 output1.emplace_back(std::move(*mapper1.Front()));
1813 mapper1.PopFront();
1814 ASSERT_TRUE(mapper1.Front() != nullptr);
1815 output1.emplace_back(std::move(*mapper1.Front()));
1816 mapper1.PopFront();
1817 ASSERT_TRUE(mapper1.Front() != nullptr);
1818 output1.emplace_back(std::move(*mapper1.Front()));
1819 mapper1.PopFront();
1820 ASSERT_TRUE(mapper1.Front() == nullptr);
1821
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001822 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1823 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001824 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001825 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001826
1827 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1828 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001829 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001830 EXPECT_FALSE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001831
1832 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1833 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001834 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001835 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001836 }
Austin Schuh79b30942021-01-24 22:32:21 -08001837 EXPECT_EQ(mapper1_count, 3u);
1838}
1839
1840// Tests that we can queue messages and call the timestamp callback for both
1841// nodes.
1842TEST_F(TimestampMapperTest, QueueUntilNode0) {
1843 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1844 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001845 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh79b30942021-01-24 22:32:21 -08001846 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001847 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh79b30942021-01-24 22:32:21 -08001848 writer1.QueueSpan(config2_.span());
1849
Austin Schuhd863e6e2022-10-16 15:44:50 -07001850 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001851 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001852 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001853 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1854
Austin Schuhd863e6e2022-10-16 15:44:50 -07001855 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001856 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001857 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001858 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1859
Austin Schuhd863e6e2022-10-16 15:44:50 -07001860 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001861 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001862 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001863 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1864
Austin Schuhd863e6e2022-10-16 15:44:50 -07001865 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001866 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001867 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001868 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1869 }
1870
1871 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1872
1873 ASSERT_EQ(parts[0].logger_node, "pi1");
1874 ASSERT_EQ(parts[1].logger_node, "pi2");
1875
1876 size_t mapper0_count = 0;
1877 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1878 mapper0.set_timestamp_callback(
1879 [&](TimestampedMessage *) { ++mapper0_count; });
1880 size_t mapper1_count = 0;
1881 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1882 mapper1.set_timestamp_callback(
1883 [&](TimestampedMessage *) { ++mapper1_count; });
1884
1885 mapper0.AddPeer(&mapper1);
1886 mapper1.AddPeer(&mapper0);
1887
1888 {
1889 std::deque<TimestampedMessage> output0;
1890
1891 EXPECT_EQ(mapper0_count, 0u);
1892 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001893 mapper0.QueueUntil(
1894 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001895 EXPECT_EQ(mapper0_count, 3u);
1896 EXPECT_EQ(mapper1_count, 0u);
1897
1898 ASSERT_TRUE(mapper0.Front() != nullptr);
1899 EXPECT_EQ(mapper0_count, 3u);
1900 EXPECT_EQ(mapper1_count, 0u);
1901
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001902 mapper0.QueueUntil(
1903 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001904 EXPECT_EQ(mapper0_count, 3u);
1905 EXPECT_EQ(mapper1_count, 0u);
1906
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001907 mapper0.QueueUntil(
1908 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001909 EXPECT_EQ(mapper0_count, 4u);
1910 EXPECT_EQ(mapper1_count, 0u);
1911
1912 output0.emplace_back(std::move(*mapper0.Front()));
1913 mapper0.PopFront();
1914 output0.emplace_back(std::move(*mapper0.Front()));
1915 mapper0.PopFront();
1916 output0.emplace_back(std::move(*mapper0.Front()));
1917 mapper0.PopFront();
1918 output0.emplace_back(std::move(*mapper0.Front()));
1919 mapper0.PopFront();
1920
1921 EXPECT_EQ(mapper0_count, 4u);
1922 EXPECT_EQ(mapper1_count, 0u);
1923
1924 ASSERT_TRUE(mapper0.Front() == nullptr);
1925
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001926 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1927 EXPECT_EQ(output0[0].monotonic_event_time.time,
1928 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001929 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001930
1931 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1932 EXPECT_EQ(output0[1].monotonic_event_time.time,
1933 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001934 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001935
1936 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1937 EXPECT_EQ(output0[2].monotonic_event_time.time,
1938 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001939 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001940
1941 EXPECT_EQ(output0[3].monotonic_event_time.boot, 0u);
1942 EXPECT_EQ(output0[3].monotonic_event_time.time,
1943 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001944 EXPECT_TRUE(output0[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001945 }
1946
1947 {
1948 SCOPED_TRACE("Trying node1 now");
1949 std::deque<TimestampedMessage> output1;
1950
1951 EXPECT_EQ(mapper0_count, 4u);
1952 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001953 mapper1.QueueUntil(BootTimestamp{
1954 .boot = 0,
1955 .time = e + chrono::seconds(100) + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001956 EXPECT_EQ(mapper0_count, 4u);
1957 EXPECT_EQ(mapper1_count, 3u);
1958
1959 ASSERT_TRUE(mapper1.Front() != nullptr);
1960 EXPECT_EQ(mapper0_count, 4u);
1961 EXPECT_EQ(mapper1_count, 3u);
1962
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001963 mapper1.QueueUntil(BootTimestamp{
1964 .boot = 0,
1965 .time = e + chrono::seconds(100) + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001966 EXPECT_EQ(mapper0_count, 4u);
1967 EXPECT_EQ(mapper1_count, 3u);
1968
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001969 mapper1.QueueUntil(BootTimestamp{
1970 .boot = 0,
1971 .time = e + chrono::seconds(100) + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001972 EXPECT_EQ(mapper0_count, 4u);
1973 EXPECT_EQ(mapper1_count, 4u);
1974
1975 ASSERT_TRUE(mapper1.Front() != nullptr);
1976 EXPECT_EQ(mapper0_count, 4u);
1977 EXPECT_EQ(mapper1_count, 4u);
1978
1979 output1.emplace_back(std::move(*mapper1.Front()));
1980 mapper1.PopFront();
1981 ASSERT_TRUE(mapper1.Front() != nullptr);
1982 output1.emplace_back(std::move(*mapper1.Front()));
1983 mapper1.PopFront();
1984 ASSERT_TRUE(mapper1.Front() != nullptr);
1985 output1.emplace_back(std::move(*mapper1.Front()));
1986 mapper1.PopFront();
1987 ASSERT_TRUE(mapper1.Front() != nullptr);
1988 output1.emplace_back(std::move(*mapper1.Front()));
1989 mapper1.PopFront();
1990
1991 EXPECT_EQ(mapper0_count, 4u);
1992 EXPECT_EQ(mapper1_count, 4u);
1993
1994 ASSERT_TRUE(mapper1.Front() == nullptr);
1995
1996 EXPECT_EQ(mapper0_count, 4u);
1997 EXPECT_EQ(mapper1_count, 4u);
1998
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001999 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2000 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002001 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002002 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002003
2004 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
2005 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002006 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002007 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002008
2009 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
2010 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002011 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002012 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002013
2014 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
2015 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002016 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002017 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08002018 }
Austin Schuhfecf1d82020-12-19 16:57:28 -08002019}
2020
// Test fixture which provides two log file headers for node pi2 as logged by
// pi1.  Both headers share the same log_event_uuid and logger boot UUID, but
// have different parts_uuid, parts_index (0 and 1) and source_node_boot_uuid
// values, so they describe pi2 data from two different pi2 boots.
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002021class BootMergerTest : public SortingElementTest {
2022 public:
2023 BootMergerTest()
2024 : SortingElementTest(),
      // Header for the first pi2 boot (source boot UUID 6ba4f28d-...).
2025 boot0_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002026 /* 100ms */
2027 "max_out_of_order_duration": 100000000,
2028 "node": {
2029 "name": "pi2"
2030 },
2031 "logger_node": {
2032 "name": "pi1"
2033 },
2034 "monotonic_start_time": 1000000,
2035 "realtime_start_time": 1000000000000,
2036 "logger_monotonic_start_time": 1000000,
2037 "logger_realtime_start_time": 1000000000000,
2038 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2039 "parts_uuid": "1a9e5ca2-31b2-475b-8282-88f6d1ce5109",
2040 "parts_index": 0,
2041 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2042 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002043 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2044 "boot_uuids": [
2045 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2046 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2047 ""
2048 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002049})")),
      // Header for the second pi2 boot (source boot UUID b728d27a-...).  It
      // uses a new parts_uuid, while parts_index keeps counting up to 1.
2050 boot1_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002051 /* 100ms */
2052 "max_out_of_order_duration": 100000000,
2053 "node": {
2054 "name": "pi2"
2055 },
2056 "logger_node": {
2057 "name": "pi1"
2058 },
2059 "monotonic_start_time": 1000000,
2060 "realtime_start_time": 1000000000000,
2061 "logger_monotonic_start_time": 1000000,
2062 "logger_realtime_start_time": 1000000000000,
2063 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2064 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
2065 "parts_index": 1,
2066 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2067 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002068 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2069 "boot_uuids": [
2070 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2071 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2072 ""
2073 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002074})")) {}
Austin Schuh8bee6882021-06-28 21:03:28 -07002075
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002076 protected:
  // Size-prefixed header buffers; tests write these at the start of a log file
  // via QueueSpan(bootN_.span()).
2077 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0_;
2078 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1_;
2079};
2080
2081// This tests that we can properly sort a multi-node log file which has the old
2082// (and buggy) timestamps in the header, and the non-resetting parts_index.
2083// These make it so we can just bairly figure out what happened first and what
2084// happened second, but not in a way that is robust to multiple nodes rebooting.
2085TEST_F(BootMergerTest, OldReboot) {
Austin Schuh8bee6882021-06-28 21:03:28 -07002086 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002087 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002088 writer.QueueSpan(boot0_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002089 }
2090 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002091 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002092 writer.QueueSpan(boot1_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002093 }
2094
2095 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
2096
2097 ASSERT_EQ(parts.size(), 1u);
2098 ASSERT_EQ(parts[0].parts.size(), 2u);
2099
2100 EXPECT_EQ(parts[0].parts[0].boot_count, 0);
2101 EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002102 boot0_.message().source_node_boot_uuid()->string_view());
Austin Schuh8bee6882021-06-28 21:03:28 -07002103
2104 EXPECT_EQ(parts[0].parts[1].boot_count, 1);
2105 EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002106 boot1_.message().source_node_boot_uuid()->string_view());
2107}
2108
2109// This tests that we can produce messages ordered across a reboot.
2110TEST_F(BootMergerTest, SortAcrossReboot) {
2111 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2112 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002113 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002114 writer.QueueSpan(boot0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002115 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002116 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002117 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002118 MakeLogMessage(e + chrono::milliseconds(2000), 1, 0x105));
2119 }
2120 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002121 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002122 writer.QueueSpan(boot1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002123 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002124 MakeLogMessage(e + chrono::milliseconds(100), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002125 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002126 MakeLogMessage(e + chrono::milliseconds(200), 1, 0x106));
2127 }
2128
2129 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
2130 ASSERT_EQ(parts.size(), 1u);
2131 ASSERT_EQ(parts[0].parts.size(), 2u);
2132
2133 BootMerger merger(FilterPartsForNode(parts, "pi2"));
2134
2135 EXPECT_EQ(merger.node(), 1u);
2136
2137 std::vector<Message> output;
2138 for (int i = 0; i < 4; ++i) {
2139 ASSERT_TRUE(merger.Front() != nullptr);
2140 output.emplace_back(std::move(*merger.Front()));
2141 merger.PopFront();
2142 }
2143
2144 ASSERT_TRUE(merger.Front() == nullptr);
2145
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002146 EXPECT_EQ(output[0].timestamp.boot, 0u);
2147 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
2148 EXPECT_EQ(output[1].timestamp.boot, 0u);
2149 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(2000));
2150
2151 EXPECT_EQ(output[2].timestamp.boot, 1u);
2152 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(100));
2153 EXPECT_EQ(output[3].timestamp.boot, 1u);
2154 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
Austin Schuh8bee6882021-06-28 21:03:28 -07002155}
2156
// Test fixture which provides four log file headers, all logged by pi1 on a
// single logger boot (a570df8b-...) and sharing one log_event_uuid:
//   boot0a_, boot0b_: pi1 data, parts 0 and 1 of the same parts_uuid.
//   boot1a_, boot1b_: pi2 data from two different pi2 boots (6ba4f28d-... and
//                     b728d27a-...), each with its own parts_uuid.
// Together they model pi1 staying up while pi2 reboots once.
Austin Schuh48507722021-07-17 17:29:24 -07002157class RebootTimestampMapperTest : public SortingElementTest {
2158 public:
2159 RebootTimestampMapperTest()
2160 : SortingElementTest(),
      // pi1 data, part 0.  The boot_uuids list names the first pi2 boot.
2161 boot0a_(MakeHeader(config_, R"({
2162 /* 100ms */
2163 "max_out_of_order_duration": 100000000,
2164 "node": {
2165 "name": "pi1"
2166 },
2167 "logger_node": {
2168 "name": "pi1"
2169 },
2170 "monotonic_start_time": 1000000,
2171 "realtime_start_time": 1000000000000,
2172 "logger_monotonic_start_time": 1000000,
2173 "logger_realtime_start_time": 1000000000000,
2174 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2175 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2176 "parts_index": 0,
2177 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2178 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2179 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2180 "boot_uuids": [
2181 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2182 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2183 ""
2184 ]
2185})")),
      // pi1 data, part 1 of the same parts_uuid.  Only parts_index and the
      // second boot_uuids entry (now the second pi2 boot) differ from boot0a_.
2186 boot0b_(MakeHeader(config_, R"({
2187 /* 100ms */
2188 "max_out_of_order_duration": 100000000,
2189 "node": {
2190 "name": "pi1"
2191 },
2192 "logger_node": {
2193 "name": "pi1"
2194 },
2195 "monotonic_start_time": 1000000,
2196 "realtime_start_time": 1000000000000,
2197 "logger_monotonic_start_time": 1000000,
2198 "logger_realtime_start_time": 1000000000000,
2199 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2200 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2201 "parts_index": 1,
2202 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2203 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2204 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2205 "boot_uuids": [
2206 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2207 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2208 ""
2209 ]
2210})")),
      // pi2 data from the first pi2 boot (source boot UUID 6ba4f28d-...).
2211 boot1a_(MakeHeader(config_, R"({
2212 /* 100ms */
2213 "max_out_of_order_duration": 100000000,
2214 "node": {
2215 "name": "pi2"
2216 },
2217 "logger_node": {
2218 "name": "pi1"
2219 },
2220 "monotonic_start_time": 1000000,
2221 "realtime_start_time": 1000000000000,
2222 "logger_monotonic_start_time": 1000000,
2223 "logger_realtime_start_time": 1000000000000,
2224 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2225 "parts_uuid": "f6ab0cdc-a654-456d-bfd9-2bbc09098edf",
2226 "parts_index": 0,
2227 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2228 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2229 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2230 "boot_uuids": [
2231 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2232 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2233 ""
2234 ]
2235})")),
      // pi2 data from the second pi2 boot (source boot UUID b728d27a-...),
      // written under a new parts_uuid.
2236 boot1b_(MakeHeader(config_, R"({
2237 /* 100ms */
2238 "max_out_of_order_duration": 100000000,
2239 "node": {
2240 "name": "pi2"
2241 },
2242 "logger_node": {
2243 "name": "pi1"
2244 },
2245 "monotonic_start_time": 1000000,
2246 "realtime_start_time": 1000000000000,
2247 "logger_monotonic_start_time": 1000000,
2248 "logger_realtime_start_time": 1000000000000,
2249 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2250 "parts_uuid": "3a9d0445-f520-43ca-93f5-e2cc7f54d40a",
2251 "parts_index": 1,
2252 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2253 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2254 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2255 "boot_uuids": [
2256 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2257 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2258 ""
2259 ]
2260})")) {}
2261
2262 protected:
  // Size-prefixed header buffers.  Naming is boot<node index><a|b>, where a is
  // the part associated with the first pi2 boot and b with the second.
2263 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0a_;
2264 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0b_;
2265 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1a_;
2266 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
2267};
2268
Austin Schuh48507722021-07-17 17:29:24 -07002269// Tests that we can match timestamps on delivered messages in the presence of
2270// reboots on the node receiving timestamps.
// Scenario: pi1 sends three messages (at 1000ms, 2000ms and 3000ms) which are
// forwarded to pi2, and pi2 reboots after the first delivery.  The pi1 mapper
// is drained completely before the pi2 mapper is touched.
2271TEST_F(RebootTimestampMapperTest, ReadNode0First) {
2272 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2273 {
  // One writer per header: 0a/0b hold the pi1 data, 1a/1b hold the pi2
  // timestamps for the first and second pi2 boot respectively.
Austin Schuhd863e6e2022-10-16 15:44:50 -07002274 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002275 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002276 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002277 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002278 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002279 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002280 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002281 writer1b.QueueSpan(boot1b_.span());
2282
  // First pi1 message at 1000ms, delivered during the first pi2 boot.  Judging
  // by the expectations below, the MakeTimestampMessage arguments are: sender
  // time, an index (presumably the channel), the offset added to the sender
  // time to get the receive time, and the monotonic_timestamp_time.
Austin Schuhd863e6e2022-10-16 15:44:50 -07002283 writer0a.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002284 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002285 writer1a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002286 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
2287 e + chrono::milliseconds(1001)));
2288
  // The same 1000ms pi1 message shows up again in the second pi2 boot, with a
  // 21s offset, so it is received at 22s (the same receive time as the next
  // message).
Austin Schuhd863e6e2022-10-16 15:44:50 -07002289 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh58646e22021-08-23 23:51:46 -07002290 e + chrono::milliseconds(1000), 0, chrono::seconds(21),
2291 e + chrono::milliseconds(2001)));
2292
  // Second and third pi1 messages, delivered during the second pi2 boot with
  // a 20s offset.
Austin Schuhd863e6e2022-10-16 15:44:50 -07002293 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002294 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002295 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002296 e + chrono::milliseconds(2000), 0, chrono::seconds(20),
2297 e + chrono::milliseconds(2001)));
2298
Austin Schuhd863e6e2022-10-16 15:44:50 -07002299 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002300 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002301 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002302 e + chrono::milliseconds(3000), 0, chrono::seconds(20),
2303 e + chrono::milliseconds(3001)));
2304 }
2305
Austin Schuh58646e22021-08-23 23:51:46 -07002306 const std::vector<LogFile> parts =
2307 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Austin Schuh48507722021-07-17 17:29:24 -07002308
2309 for (const auto &x : parts) {
2310 LOG(INFO) << x;
2311 }
  // All four files are expected to sort into a single log, logged on pi1.
2312 ASSERT_EQ(parts.size(), 1u);
2313 ASSERT_EQ(parts[0].logger_node, "pi1");
2314
  // One mapper per node.  Each counts how often its timestamp callback fires,
  // and the two are registered as peers of each other.
2315 size_t mapper0_count = 0;
2316 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
2317 mapper0.set_timestamp_callback(
2318 [&](TimestampedMessage *) { ++mapper0_count; });
2319 size_t mapper1_count = 0;
2320 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
2321 mapper1.set_timestamp_callback(
2322 [&](TimestampedMessage *) { ++mapper1_count; });
2323
2324 mapper0.AddPeer(&mapper1);
2325 mapper1.AddPeer(&mapper0);
2326
2327 {
    // Drain pi1.  The interleaved count checks pin down that the callback
    // count is expected to go up by one on the Front() call for each new
    // message, not on PopFront(), and that the pi2 callback never fires here.
2328 std::deque<TimestampedMessage> output0;
2329
2330 EXPECT_EQ(mapper0_count, 0u);
2331 EXPECT_EQ(mapper1_count, 0u);
2332 ASSERT_TRUE(mapper0.Front() != nullptr);
2333 EXPECT_EQ(mapper0_count, 1u);
2334 EXPECT_EQ(mapper1_count, 0u);
2335 output0.emplace_back(std::move(*mapper0.Front()));
2336 mapper0.PopFront();
2337 EXPECT_TRUE(mapper0.started());
2338 EXPECT_EQ(mapper0_count, 1u);
2339 EXPECT_EQ(mapper1_count, 0u);
2340
2341 ASSERT_TRUE(mapper0.Front() != nullptr);
2342 EXPECT_EQ(mapper0_count, 2u);
2343 EXPECT_EQ(mapper1_count, 0u);
2344 output0.emplace_back(std::move(*mapper0.Front()));
2345 mapper0.PopFront();
2346 EXPECT_TRUE(mapper0.started());
2347
2348 ASSERT_TRUE(mapper0.Front() != nullptr);
2349 output0.emplace_back(std::move(*mapper0.Front()));
2350 mapper0.PopFront();
2351 EXPECT_TRUE(mapper0.started());
2352
2353 EXPECT_EQ(mapper0_count, 3u);
2354 EXPECT_EQ(mapper1_count, 0u);
2355
    // Exactly three messages were written for pi1.
2356 ASSERT_TRUE(mapper0.Front() == nullptr);
2357
2358 LOG(INFO) << output0[0];
2359 LOG(INFO) << output0[1];
2360 LOG(INFO) << output0[2];
2361
    // The pi1 messages come out in time order on boot 0 with queue indices 0,
    // 1 and 2.  Their remote and timestamp times are expected to be min_time
    // (presumably because these are locally sent messages -- confirm).
2362 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2363 EXPECT_EQ(output0[0].monotonic_event_time.time,
2364 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002365 EXPECT_EQ(output0[0].queue_index,
2366 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002367 EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
2368 EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002369 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002370
2371 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2372 EXPECT_EQ(output0[1].monotonic_event_time.time,
2373 e + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002374 EXPECT_EQ(output0[1].queue_index,
2375 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002376 EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
2377 EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002378 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002379
2380 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2381 EXPECT_EQ(output0[2].monotonic_event_time.time,
2382 e + chrono::milliseconds(3000));
Austin Schuh58646e22021-08-23 23:51:46 -07002383 EXPECT_EQ(output0[2].queue_index,
2384 (BootQueueIndex{.boot = 0u, .index = 2u}));
Austin Schuh48507722021-07-17 17:29:24 -07002385 EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
2386 EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002387 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002388 }
2389
2390 {
    // Drain pi2.  Four timestamp messages were written for it, and the pi1
    // callback count has to stay at 3 throughout.
2391 SCOPED_TRACE("Trying node1 now");
2392 std::deque<TimestampedMessage> output1;
2393
2394 EXPECT_EQ(mapper0_count, 3u);
2395 EXPECT_EQ(mapper1_count, 0u);
2396
2397 ASSERT_TRUE(mapper1.Front() != nullptr);
2398 EXPECT_EQ(mapper0_count, 3u);
2399 EXPECT_EQ(mapper1_count, 1u);
2400 output1.emplace_back(std::move(*mapper1.Front()));
2401 mapper1.PopFront();
2402 EXPECT_TRUE(mapper1.started());
2403 EXPECT_EQ(mapper0_count, 3u);
2404 EXPECT_EQ(mapper1_count, 1u);
2405
2406 ASSERT_TRUE(mapper1.Front() != nullptr);
2407 EXPECT_EQ(mapper0_count, 3u);
2408 EXPECT_EQ(mapper1_count, 2u);
2409 output1.emplace_back(std::move(*mapper1.Front()));
2410 mapper1.PopFront();
2411 EXPECT_TRUE(mapper1.started());
2412
2413 ASSERT_TRUE(mapper1.Front() != nullptr);
2414 output1.emplace_back(std::move(*mapper1.Front()));
2415 mapper1.PopFront();
2416 EXPECT_TRUE(mapper1.started());
2417
Austin Schuh58646e22021-08-23 23:51:46 -07002418 ASSERT_TRUE(mapper1.Front() != nullptr);
2419 output1.emplace_back(std::move(*mapper1.Front()));
2420 mapper1.PopFront();
2421 EXPECT_TRUE(mapper1.started());
2422
Austin Schuh48507722021-07-17 17:29:24 -07002423 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002424 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002425
2426 ASSERT_TRUE(mapper1.Front() == nullptr);
2427
    // Looking at the now empty queue must not fire any further callbacks.
2428 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002429 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002430
    // output1[0]: the only delivery during the first pi2 boot (boot 0), at
    // 1000ms + 100s, matched to pi1 queue index 0.
2431 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2432 EXPECT_EQ(output1[0].monotonic_event_time.time,
2433 e + chrono::seconds(100) + chrono::milliseconds(1000));
2434 EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
2435 EXPECT_EQ(output1[0].monotonic_remote_time.time,
2436 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002437 EXPECT_EQ(output1[0].remote_queue_index,
2438 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002439 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
2440 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
2441 e + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002442 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002443
    // output1[1]: the repeat delivery of pi1 queue index 0 during the second
    // pi2 boot (boot 1), received at 1000ms + 21s = 22s.
2444 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2445 EXPECT_EQ(output1[1].monotonic_event_time.time,
2446 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002447 EXPECT_EQ(output1[1].remote_queue_index,
2448 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002449 EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
2450 EXPECT_EQ(output1[1].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002451 e + chrono::milliseconds(1000));
Austin Schuh48507722021-07-17 17:29:24 -07002452 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
2453 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
2454 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002455 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002456
    // output1[2]: pi1 queue index 1 (sent at 2000ms), also received at 22s on
    // boot 1.
2457 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2458 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002459 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh48507722021-07-17 17:29:24 -07002460 EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
2461 EXPECT_EQ(output1[2].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002462 e + chrono::milliseconds(2000));
2463 EXPECT_EQ(output1[2].remote_queue_index,
2464 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002465 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
2466 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002467 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002468 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002469
    // output1[3]: pi1 queue index 2 (sent at 3000ms), received at 23s on
    // boot 1.
Austin Schuh58646e22021-08-23 23:51:46 -07002470 EXPECT_EQ(output1[3].monotonic_event_time.boot, 1u);
2471 EXPECT_EQ(output1[3].monotonic_event_time.time,
2472 e + chrono::seconds(20) + chrono::milliseconds(3000));
2473 EXPECT_EQ(output1[3].monotonic_remote_time.boot, 0u);
2474 EXPECT_EQ(output1[3].monotonic_remote_time.time,
2475 e + chrono::milliseconds(3000));
2476 EXPECT_EQ(output1[3].remote_queue_index,
2477 (BootQueueIndex{.boot = 0u, .index = 2u}));
2478 EXPECT_EQ(output1[3].monotonic_timestamp_time.boot, 0u);
2479 EXPECT_EQ(output1[3].monotonic_timestamp_time.time,
2480 e + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002481 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh58646e22021-08-23 23:51:46 -07002482
Austin Schuh48507722021-07-17 17:29:24 -07002483 LOG(INFO) << output1[0];
2484 LOG(INFO) << output1[1];
2485 LOG(INFO) << output1[2];
Austin Schuh58646e22021-08-23 23:51:46 -07002486 LOG(INFO) << output1[3];
Austin Schuh48507722021-07-17 17:29:24 -07002487 }
2488}
2489
2490TEST_F(RebootTimestampMapperTest, Node2Reboot) {
2491 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2492 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002493 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002494 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002495 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002496 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002497 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002498 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002499 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002500 writer1b.QueueSpan(boot1b_.span());
2501
Austin Schuhd863e6e2022-10-16 15:44:50 -07002502 writer1a.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002503 e + chrono::seconds(100) + chrono::milliseconds(1000), 3, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002504 writer0a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002505 e + chrono::seconds(100) + chrono::milliseconds(1000), 3,
2506 chrono::seconds(-100),
2507 e + chrono::seconds(100) + chrono::milliseconds(1001)));
2508
Austin Schuhd863e6e2022-10-16 15:44:50 -07002509 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002510 e + chrono::seconds(20) + chrono::milliseconds(2000), 3, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002511 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002512 e + chrono::seconds(20) + chrono::milliseconds(2000), 3,
2513 chrono::seconds(-20),
2514 e + chrono::seconds(20) + chrono::milliseconds(2001)));
2515
Austin Schuhd863e6e2022-10-16 15:44:50 -07002516 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002517 e + chrono::seconds(20) + chrono::milliseconds(3000), 3, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002518 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002519 e + chrono::seconds(20) + chrono::milliseconds(3000), 3,
2520 chrono::seconds(-20),
2521 e + chrono::seconds(20) + chrono::milliseconds(3001)));
2522 }
2523
2524 const std::vector<LogFile> parts =
2525 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2526
2527 for (const auto &x : parts) {
2528 LOG(INFO) << x;
2529 }
2530 ASSERT_EQ(parts.size(), 1u);
2531 ASSERT_EQ(parts[0].logger_node, "pi1");
2532
2533 size_t mapper0_count = 0;
2534 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
2535 mapper0.set_timestamp_callback(
2536 [&](TimestampedMessage *) { ++mapper0_count; });
2537 size_t mapper1_count = 0;
2538 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
2539 mapper1.set_timestamp_callback(
2540 [&](TimestampedMessage *) { ++mapper1_count; });
2541
2542 mapper0.AddPeer(&mapper1);
2543 mapper1.AddPeer(&mapper0);
2544
2545 {
2546 std::deque<TimestampedMessage> output0;
2547
2548 EXPECT_EQ(mapper0_count, 0u);
2549 EXPECT_EQ(mapper1_count, 0u);
2550 ASSERT_TRUE(mapper0.Front() != nullptr);
2551 EXPECT_EQ(mapper0_count, 1u);
2552 EXPECT_EQ(mapper1_count, 0u);
2553 output0.emplace_back(std::move(*mapper0.Front()));
2554 mapper0.PopFront();
2555 EXPECT_TRUE(mapper0.started());
2556 EXPECT_EQ(mapper0_count, 1u);
2557 EXPECT_EQ(mapper1_count, 0u);
2558
2559 ASSERT_TRUE(mapper0.Front() != nullptr);
2560 EXPECT_EQ(mapper0_count, 2u);
2561 EXPECT_EQ(mapper1_count, 0u);
2562 output0.emplace_back(std::move(*mapper0.Front()));
2563 mapper0.PopFront();
2564 EXPECT_TRUE(mapper0.started());
2565
2566 ASSERT_TRUE(mapper0.Front() != nullptr);
2567 output0.emplace_back(std::move(*mapper0.Front()));
2568 mapper0.PopFront();
2569 EXPECT_TRUE(mapper0.started());
2570
2571 EXPECT_EQ(mapper0_count, 3u);
2572 EXPECT_EQ(mapper1_count, 0u);
2573
2574 ASSERT_TRUE(mapper0.Front() == nullptr);
2575
2576 LOG(INFO) << output0[0];
2577 LOG(INFO) << output0[1];
2578 LOG(INFO) << output0[2];
2579
2580 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2581 EXPECT_EQ(output0[0].monotonic_event_time.time,
2582 e + chrono::milliseconds(1000));
2583 EXPECT_EQ(output0[0].monotonic_remote_time.boot, 0u);
2584 EXPECT_EQ(output0[0].monotonic_remote_time.time,
2585 e + chrono::seconds(100) + chrono::milliseconds(1000));
2586 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
2587 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
2588 e + chrono::seconds(100) + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002589 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002590
2591 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2592 EXPECT_EQ(output0[1].monotonic_event_time.time,
2593 e + chrono::milliseconds(2000));
2594 EXPECT_EQ(output0[1].monotonic_remote_time.boot, 1u);
2595 EXPECT_EQ(output0[1].monotonic_remote_time.time,
2596 e + chrono::seconds(20) + chrono::milliseconds(2000));
2597 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
2598 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
2599 e + chrono::seconds(20) + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002600 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002601
2602 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2603 EXPECT_EQ(output0[2].monotonic_event_time.time,
2604 e + chrono::milliseconds(3000));
2605 EXPECT_EQ(output0[2].monotonic_remote_time.boot, 1u);
2606 EXPECT_EQ(output0[2].monotonic_remote_time.time,
2607 e + chrono::seconds(20) + chrono::milliseconds(3000));
2608 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
2609 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
2610 e + chrono::seconds(20) + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002611 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002612 }
2613
2614 {
2615 SCOPED_TRACE("Trying node1 now");
2616 std::deque<TimestampedMessage> output1;
2617
2618 EXPECT_EQ(mapper0_count, 3u);
2619 EXPECT_EQ(mapper1_count, 0u);
2620
2621 ASSERT_TRUE(mapper1.Front() != nullptr);
2622 EXPECT_EQ(mapper0_count, 3u);
2623 EXPECT_EQ(mapper1_count, 1u);
2624 output1.emplace_back(std::move(*mapper1.Front()));
2625 mapper1.PopFront();
2626 EXPECT_TRUE(mapper1.started());
2627 EXPECT_EQ(mapper0_count, 3u);
2628 EXPECT_EQ(mapper1_count, 1u);
2629
2630 ASSERT_TRUE(mapper1.Front() != nullptr);
2631 EXPECT_EQ(mapper0_count, 3u);
2632 EXPECT_EQ(mapper1_count, 2u);
2633 output1.emplace_back(std::move(*mapper1.Front()));
2634 mapper1.PopFront();
2635 EXPECT_TRUE(mapper1.started());
2636
2637 ASSERT_TRUE(mapper1.Front() != nullptr);
2638 output1.emplace_back(std::move(*mapper1.Front()));
2639 mapper1.PopFront();
2640 EXPECT_TRUE(mapper1.started());
2641
2642 EXPECT_EQ(mapper0_count, 3u);
2643 EXPECT_EQ(mapper1_count, 3u);
2644
2645 ASSERT_TRUE(mapper1.Front() == nullptr);
2646
2647 EXPECT_EQ(mapper0_count, 3u);
2648 EXPECT_EQ(mapper1_count, 3u);
2649
2650 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2651 EXPECT_EQ(output1[0].monotonic_event_time.time,
2652 e + chrono::seconds(100) + chrono::milliseconds(1000));
2653 EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
2654 EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002655 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002656
2657 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2658 EXPECT_EQ(output1[1].monotonic_event_time.time,
2659 e + chrono::seconds(20) + chrono::milliseconds(2000));
2660 EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
2661 EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002662 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002663
2664 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2665 EXPECT_EQ(output1[2].monotonic_event_time.time,
2666 e + chrono::seconds(20) + chrono::milliseconds(3000));
2667 EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
2668 EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002669 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002670
2671 LOG(INFO) << output1[0];
2672 LOG(INFO) << output1[1];
2673 LOG(INFO) << output1[2];
2674 }
2675}
2676
Austin Schuh44c61472021-11-22 21:04:10 -08002677class SortingDeathTest : public SortingElementTest {
2678 public:
2679 SortingDeathTest()
2680 : SortingElementTest(),
2681 part0_(MakeHeader(config_, R"({
2682 /* 100ms */
2683 "max_out_of_order_duration": 100000000,
2684 "node": {
2685 "name": "pi1"
2686 },
2687 "logger_node": {
2688 "name": "pi1"
2689 },
2690 "monotonic_start_time": 1000000,
2691 "realtime_start_time": 1000000000000,
2692 "logger_monotonic_start_time": 1000000,
2693 "logger_realtime_start_time": 1000000000000,
2694 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2695 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2696 "parts_index": 0,
2697 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2698 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2699 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2700 "boot_uuids": [
2701 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2702 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2703 ""
2704 ],
2705 "oldest_remote_monotonic_timestamps": [
2706 9223372036854775807,
2707 9223372036854775807,
2708 9223372036854775807
2709 ],
2710 "oldest_local_monotonic_timestamps": [
2711 9223372036854775807,
2712 9223372036854775807,
2713 9223372036854775807
2714 ],
2715 "oldest_remote_unreliable_monotonic_timestamps": [
2716 9223372036854775807,
2717 0,
2718 9223372036854775807
2719 ],
2720 "oldest_local_unreliable_monotonic_timestamps": [
2721 9223372036854775807,
2722 0,
2723 9223372036854775807
2724 ]
2725})")),
2726 part1_(MakeHeader(config_, R"({
2727 /* 100ms */
2728 "max_out_of_order_duration": 100000000,
2729 "node": {
2730 "name": "pi1"
2731 },
2732 "logger_node": {
2733 "name": "pi1"
2734 },
2735 "monotonic_start_time": 1000000,
2736 "realtime_start_time": 1000000000000,
2737 "logger_monotonic_start_time": 1000000,
2738 "logger_realtime_start_time": 1000000000000,
2739 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2740 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2741 "parts_index": 1,
2742 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2743 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2744 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2745 "boot_uuids": [
2746 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2747 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2748 ""
2749 ],
2750 "oldest_remote_monotonic_timestamps": [
2751 9223372036854775807,
2752 9223372036854775807,
2753 9223372036854775807
2754 ],
2755 "oldest_local_monotonic_timestamps": [
2756 9223372036854775807,
2757 9223372036854775807,
2758 9223372036854775807
2759 ],
2760 "oldest_remote_unreliable_monotonic_timestamps": [
2761 9223372036854775807,
2762 100000,
2763 9223372036854775807
2764 ],
2765 "oldest_local_unreliable_monotonic_timestamps": [
2766 9223372036854775807,
2767 100000,
2768 9223372036854775807
2769 ]
2770})")),
2771 part2_(MakeHeader(config_, R"({
2772 /* 100ms */
2773 "max_out_of_order_duration": 100000000,
2774 "node": {
2775 "name": "pi1"
2776 },
2777 "logger_node": {
2778 "name": "pi1"
2779 },
2780 "monotonic_start_time": 1000000,
2781 "realtime_start_time": 1000000000000,
2782 "logger_monotonic_start_time": 1000000,
2783 "logger_realtime_start_time": 1000000000000,
2784 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2785 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2786 "parts_index": 2,
2787 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2788 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2789 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2790 "boot_uuids": [
2791 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2792 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2793 ""
2794 ],
2795 "oldest_remote_monotonic_timestamps": [
2796 9223372036854775807,
2797 9223372036854775807,
2798 9223372036854775807
2799 ],
2800 "oldest_local_monotonic_timestamps": [
2801 9223372036854775807,
2802 9223372036854775807,
2803 9223372036854775807
2804 ],
2805 "oldest_remote_unreliable_monotonic_timestamps": [
2806 9223372036854775807,
2807 200000,
2808 9223372036854775807
2809 ],
2810 "oldest_local_unreliable_monotonic_timestamps": [
2811 9223372036854775807,
2812 200000,
2813 9223372036854775807
2814 ]
2815})")),
2816 part3_(MakeHeader(config_, R"({
2817 /* 100ms */
2818 "max_out_of_order_duration": 100000000,
2819 "node": {
2820 "name": "pi1"
2821 },
2822 "logger_node": {
2823 "name": "pi1"
2824 },
2825 "monotonic_start_time": 1000000,
2826 "realtime_start_time": 1000000000000,
2827 "logger_monotonic_start_time": 1000000,
2828 "logger_realtime_start_time": 1000000000000,
2829 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2830 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2831 "parts_index": 3,
2832 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2833 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2834 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2835 "boot_uuids": [
2836 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2837 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2838 ""
2839 ],
2840 "oldest_remote_monotonic_timestamps": [
2841 9223372036854775807,
2842 9223372036854775807,
2843 9223372036854775807
2844 ],
2845 "oldest_local_monotonic_timestamps": [
2846 9223372036854775807,
2847 9223372036854775807,
2848 9223372036854775807
2849 ],
2850 "oldest_remote_unreliable_monotonic_timestamps": [
2851 9223372036854775807,
2852 300000,
2853 9223372036854775807
2854 ],
2855 "oldest_local_unreliable_monotonic_timestamps": [
2856 9223372036854775807,
2857 300000,
2858 9223372036854775807
2859 ]
2860})")) {}
2861
2862 protected:
2863 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part0_;
2864 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part1_;
2865 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part2_;
2866 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part3_;
2867};
2868
2869// Tests that if 2 computers go back and forth trying to be the same node, we
2870// die in sorting instead of failing to estimate time.
2871TEST_F(SortingDeathTest, FightingNodes) {
2872 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002873 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh44c61472021-11-22 21:04:10 -08002874 writer0.QueueSpan(part0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002875 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh44c61472021-11-22 21:04:10 -08002876 writer1.QueueSpan(part1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002877 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuh44c61472021-11-22 21:04:10 -08002878 writer2.QueueSpan(part2_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002879 TestDetachedBufferWriter writer3(logfile3_);
Austin Schuh44c61472021-11-22 21:04:10 -08002880 writer3.QueueSpan(part3_.span());
2881 }
2882
2883 EXPECT_DEATH(
2884 {
2885 const std::vector<LogFile> parts =
2886 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2887 },
Austin Schuh22cf7862022-09-19 19:09:42 -07002888 "found overlapping boots on");
Austin Schuh44c61472021-11-22 21:04:10 -08002889}
2890
Brian Smarttea913d42021-12-10 15:02:38 -08002891// Tests that we MessageReader blows up on a bad message.
2892TEST(MessageReaderConfirmCrash, ReadWrite) {
2893 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
2894 unlink(logfile.c_str());
2895
2896 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
2897 JsonToSizedFlatbuffer<LogFileHeader>(
2898 R"({ "max_out_of_order_duration": 100000000 })");
2899 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
2900 JsonToSizedFlatbuffer<MessageHeader>(
2901 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
2902 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
2903 JsonToSizedFlatbuffer<MessageHeader>(
2904 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
2905 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m4 =
2906 JsonToSizedFlatbuffer<MessageHeader>(
2907 R"({ "channel_index": 0, "monotonic_sent_time": 4 })");
2908
2909 // Starts out like a proper flat buffer header, but it breaks down ...
2910 std::vector<uint8_t> garbage{8, 0, 0, 0, 16, 0, 0, 0, 4, 0, 0, 0};
2911 absl::Span<uint8_t> m3_span(garbage);
2912
2913 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002914 TestDetachedBufferWriter writer(logfile);
Brian Smarttea913d42021-12-10 15:02:38 -08002915 writer.QueueSpan(config.span());
2916 writer.QueueSpan(m1.span());
2917 writer.QueueSpan(m2.span());
2918 writer.QueueSpan(m3_span);
2919 writer.QueueSpan(m4.span()); // This message is "hidden"
2920 }
2921
2922 {
2923 MessageReader reader(logfile);
2924
2925 EXPECT_EQ(reader.filename(), logfile);
2926
2927 EXPECT_EQ(
2928 reader.max_out_of_order_duration(),
2929 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2930 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2931 EXPECT_TRUE(reader.ReadMessage());
2932 EXPECT_EQ(reader.newest_timestamp(),
2933 monotonic_clock::time_point(chrono::nanoseconds(1)));
2934 EXPECT_TRUE(reader.ReadMessage());
2935 EXPECT_EQ(reader.newest_timestamp(),
2936 monotonic_clock::time_point(chrono::nanoseconds(2)));
2937 // Confirm default crashing behavior
2938 EXPECT_DEATH(reader.ReadMessage(), "Corrupted message at offset");
2939 }
2940
2941 {
2942 gflags::FlagSaver fs;
2943
2944 MessageReader reader(logfile);
2945 reader.set_crash_on_corrupt_message_flag(false);
2946
2947 EXPECT_EQ(reader.filename(), logfile);
2948
2949 EXPECT_EQ(
2950 reader.max_out_of_order_duration(),
2951 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2952 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2953 EXPECT_TRUE(reader.ReadMessage());
2954 EXPECT_EQ(reader.newest_timestamp(),
2955 monotonic_clock::time_point(chrono::nanoseconds(1)));
2956 EXPECT_TRUE(reader.ReadMessage());
2957 EXPECT_EQ(reader.newest_timestamp(),
2958 monotonic_clock::time_point(chrono::nanoseconds(2)));
2959 // Confirm avoiding the corrupted message crash, stopping instead.
2960 EXPECT_FALSE(reader.ReadMessage());
2961 }
2962
2963 {
2964 gflags::FlagSaver fs;
2965
2966 MessageReader reader(logfile);
2967 reader.set_crash_on_corrupt_message_flag(false);
2968 reader.set_ignore_corrupt_messages_flag(true);
2969
2970 EXPECT_EQ(reader.filename(), logfile);
2971
2972 EXPECT_EQ(
2973 reader.max_out_of_order_duration(),
2974 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2975 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2976 EXPECT_TRUE(reader.ReadMessage());
2977 EXPECT_EQ(reader.newest_timestamp(),
2978 monotonic_clock::time_point(chrono::nanoseconds(1)));
2979 EXPECT_TRUE(reader.ReadMessage());
2980 EXPECT_EQ(reader.newest_timestamp(),
2981 monotonic_clock::time_point(chrono::nanoseconds(2)));
2982 // Confirm skipping of the corrupted message to read the hidden one.
2983 EXPECT_TRUE(reader.ReadMessage());
2984 EXPECT_EQ(reader.newest_timestamp(),
2985 monotonic_clock::time_point(chrono::nanoseconds(4)));
2986 EXPECT_FALSE(reader.ReadMessage());
2987 }
2988}
2989
Austin Schuhfa30c352022-10-16 11:12:02 -07002990class InlinePackMessage : public ::testing::Test {
2991 protected:
2992 aos::Context RandomContext() {
2993 data_ = RandomData();
2994 std::uniform_int_distribution<uint32_t> uint32_distribution(
2995 std::numeric_limits<uint32_t>::min(),
2996 std::numeric_limits<uint32_t>::max());
2997
2998 std::uniform_int_distribution<int64_t> time_distribution(
2999 std::numeric_limits<int64_t>::min(),
3000 std::numeric_limits<int64_t>::max());
3001
3002 aos::Context context;
3003 context.monotonic_event_time =
3004 aos::monotonic_clock::epoch() +
3005 chrono::nanoseconds(time_distribution(random_number_generator_));
3006 context.realtime_event_time =
3007 aos::realtime_clock::epoch() +
3008 chrono::nanoseconds(time_distribution(random_number_generator_));
3009
3010 context.monotonic_remote_time =
3011 aos::monotonic_clock::epoch() +
3012 chrono::nanoseconds(time_distribution(random_number_generator_));
3013 context.realtime_remote_time =
3014 aos::realtime_clock::epoch() +
3015 chrono::nanoseconds(time_distribution(random_number_generator_));
3016
3017 context.queue_index = uint32_distribution(random_number_generator_);
3018 context.remote_queue_index = uint32_distribution(random_number_generator_);
3019 context.size = data_.size();
3020 context.data = data_.data();
3021 return context;
3022 }
3023
Austin Schuhf2d0e682022-10-16 14:20:58 -07003024 aos::monotonic_clock::time_point RandomMonotonic() {
3025 std::uniform_int_distribution<int64_t> time_distribution(
3026 0, std::numeric_limits<int64_t>::max());
3027 return aos::monotonic_clock::epoch() +
3028 chrono::nanoseconds(time_distribution(random_number_generator_));
3029 }
3030
3031 aos::SizePrefixedFlatbufferDetachedBuffer<message_bridge::RemoteMessage>
3032 RandomRemoteMessage() {
3033 std::uniform_int_distribution<uint8_t> uint8_distribution(
3034 std::numeric_limits<uint8_t>::min(),
3035 std::numeric_limits<uint8_t>::max());
3036
3037 std::uniform_int_distribution<int64_t> time_distribution(
3038 std::numeric_limits<int64_t>::min(),
3039 std::numeric_limits<int64_t>::max());
3040
3041 flatbuffers::FlatBufferBuilder fbb;
3042 message_bridge::RemoteMessage::Builder builder(fbb);
3043 builder.add_queue_index(uint8_distribution(random_number_generator_));
3044
3045 builder.add_monotonic_sent_time(
3046 time_distribution(random_number_generator_));
3047 builder.add_realtime_sent_time(time_distribution(random_number_generator_));
3048 builder.add_monotonic_remote_time(
3049 time_distribution(random_number_generator_));
3050 builder.add_realtime_remote_time(
3051 time_distribution(random_number_generator_));
3052
3053 builder.add_remote_queue_index(
3054 uint8_distribution(random_number_generator_));
3055
3056 fbb.FinishSizePrefixed(builder.Finish());
3057 return fbb.Release();
3058 }
3059
Austin Schuhfa30c352022-10-16 11:12:02 -07003060 std::vector<uint8_t> RandomData() {
3061 std::vector<uint8_t> result;
3062 std::uniform_int_distribution<int> length_distribution(1, 32);
3063 std::uniform_int_distribution<uint8_t> data_distribution(
3064 std::numeric_limits<uint8_t>::min(),
3065 std::numeric_limits<uint8_t>::max());
3066
3067 const size_t length = length_distribution(random_number_generator_);
3068
3069 result.reserve(length);
3070 for (size_t i = 0; i < length; ++i) {
3071 result.emplace_back(data_distribution(random_number_generator_));
3072 }
3073 return result;
3074 }
3075
3076 std::mt19937 random_number_generator_{
3077 std::mt19937(::aos::testing::RandomSeed())};
3078
3079 std::vector<uint8_t> data_;
3080};
3081
3082// Uses the binary schema to annotate a provided flatbuffer. Returns the
3083// annotated flatbuffer.
3084std::string AnnotateBinaries(
3085 const aos::NonSizePrefixedFlatbuffer<reflection::Schema> &schema,
3086 const std::string &schema_filename,
3087 flatbuffers::span<uint8_t> binary_data) {
3088 flatbuffers::BinaryAnnotator binary_annotator(
3089 schema.span().data(), schema.span().size(), binary_data.data(),
3090 binary_data.size());
3091
3092 auto annotations = binary_annotator.Annotate();
3093
3094 flatbuffers::AnnotatedBinaryTextGenerator text_generator(
3095 flatbuffers::AnnotatedBinaryTextGenerator::Options{}, annotations,
3096 binary_data.data(), binary_data.size());
3097
3098 text_generator.Generate(aos::testing::TestTmpDir() + "/foo.bfbs",
3099 schema_filename);
3100
3101 return aos::util::ReadFileToStringOrDie(aos::testing::TestTmpDir() +
3102 "/foo.afb");
3103}
3104
Austin Schuh71a40d42023-02-04 21:22:22 -08003105// Event loop which just has working time functions for the Copier classes
3106// tested below.
3107class TimeEventLoop : public EventLoop {
3108 public:
3109 TimeEventLoop() : EventLoop(nullptr) {}
3110
3111 aos::monotonic_clock::time_point monotonic_now() const final {
3112 return aos::monotonic_clock::min_time;
3113 }
3114 realtime_clock::time_point realtime_now() const final {
3115 return aos::realtime_clock::min_time;
3116 }
3117
3118 void OnRun(::std::function<void()> /*on_run*/) final { LOG(FATAL); }
3119
3120 const std::string_view name() const final { return "time"; }
3121 const Node *node() const final { return nullptr; }
3122
3123 void SetRuntimeAffinity(const cpu_set_t & /*cpuset*/) final { LOG(FATAL); }
3124 void SetRuntimeRealtimePriority(int /*priority*/) final { LOG(FATAL); }
3125
3126 const cpu_set_t &runtime_affinity() const final {
3127 LOG(FATAL);
3128 return cpuset_;
3129 }
3130
3131 TimerHandler *AddTimer(::std::function<void()> /*callback*/) final {
3132 LOG(FATAL);
3133 return nullptr;
3134 }
3135
3136 std::unique_ptr<RawSender> MakeRawSender(const Channel * /*channel*/) final {
3137 LOG(FATAL);
3138 return std::unique_ptr<RawSender>();
3139 }
3140
3141 const UUID &boot_uuid() const final {
3142 LOG(FATAL);
3143 return boot_uuid_;
3144 }
3145
3146 void set_name(const std::string_view name) final { LOG(FATAL) << name; }
3147
3148 pid_t GetTid() final {
3149 LOG(FATAL);
3150 return 0;
3151 }
3152
3153 int NumberBuffers(const Channel * /*channel*/) final {
3154 LOG(FATAL);
3155 return 0;
3156 }
3157
3158 int runtime_realtime_priority() const final {
3159 LOG(FATAL);
3160 return 0;
3161 }
3162
3163 std::unique_ptr<RawFetcher> MakeRawFetcher(
3164 const Channel * /*channel*/) final {
3165 LOG(FATAL);
3166 return std::unique_ptr<RawFetcher>();
3167 }
3168
3169 PhasedLoopHandler *AddPhasedLoop(
3170 ::std::function<void(int)> /*callback*/,
3171 const monotonic_clock::duration /*interval*/,
3172 const monotonic_clock::duration /*offset*/) final {
3173 LOG(FATAL);
3174 return nullptr;
3175 }
3176
3177 void MakeRawWatcher(
3178 const Channel * /*channel*/,
3179 std::function<void(const Context &context, const void *message)>
3180 /*watcher*/) final {
3181 LOG(FATAL);
3182 }
3183
3184 private:
3185 const cpu_set_t cpuset_ = DefaultAffinity();
3186 UUID boot_uuid_ = UUID ::Zero();
3187};
3188
Austin Schuhfa30c352022-10-16 11:12:02 -07003189// Tests that all variations of PackMessage are equivalent to the inline
3190// PackMessage used to avoid allocations.
3191TEST_F(InlinePackMessage, Equivilent) {
3192 std::uniform_int_distribution<uint32_t> uint32_distribution(
3193 std::numeric_limits<uint32_t>::min(),
3194 std::numeric_limits<uint32_t>::max());
3195 aos::FlatbufferVector<reflection::Schema> schema =
3196 FileToFlatbuffer<reflection::Schema>(
3197 ArtifactPath("aos/events/logging/logger.bfbs"));
3198
3199 for (const LogType type :
3200 {LogType::kLogMessage, LogType::kLogDeliveryTimeOnly,
3201 LogType::kLogMessageAndDeliveryTime, LogType::kLogRemoteMessage}) {
3202 for (int i = 0; i < 100; ++i) {
3203 aos::Context context = RandomContext();
3204 const uint32_t channel_index =
3205 uint32_distribution(random_number_generator_);
3206
3207 flatbuffers::FlatBufferBuilder fbb;
3208 fbb.ForceDefaults(true);
3209 fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index, type));
3210
3211 VLOG(1) << absl::BytesToHexString(std::string_view(
3212 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3213 fbb.GetBufferSpan().size()));
3214
3215 // Make sure that both the builder and inline method agree on sizes.
Austin Schuh48d10d62022-10-16 22:19:23 -07003216 ASSERT_EQ(fbb.GetSize(), PackMessageSize(type, context.size))
Austin Schuhfa30c352022-10-16 11:12:02 -07003217 << "log type " << static_cast<int>(type);
3218
3219 // Initialize the buffer to something nonzero to make sure all the padding
3220 // bytes are set to 0.
Austin Schuh48d10d62022-10-16 22:19:23 -07003221 std::vector<uint8_t> repacked_message(PackMessageSize(type, context.size),
3222 67);
Austin Schuhfa30c352022-10-16 11:12:02 -07003223
3224 // And verify packing inline works as expected.
Austin Schuh71a40d42023-02-04 21:22:22 -08003225 EXPECT_EQ(
3226 repacked_message.size(),
3227 PackMessageInline(repacked_message.data(), context, channel_index,
3228 type, 0u, repacked_message.size()));
Austin Schuhfa30c352022-10-16 11:12:02 -07003229 EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
3230 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3231 fbb.GetBufferSpan().size()))
3232 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
3233 fbb.GetBufferSpan());
Austin Schuh71a40d42023-02-04 21:22:22 -08003234
3235 // Ok, now we want to confirm that we can build up arbitrary pieces of
3236 // said flatbuffer. Try all of them since it is cheap.
3237 TimeEventLoop event_loop;
3238 for (size_t i = 0; i < repacked_message.size(); i += 8) {
3239 for (size_t j = i; j < repacked_message.size(); j += 8) {
3240 std::vector<uint8_t> destination(repacked_message.size(), 67u);
3241 ContextDataCopier copier(context, channel_index, type, &event_loop);
3242
3243 copier.Copy(destination.data(), i, j);
3244
3245 size_t index = 0;
3246 for (size_t k = i; k < j; ++k) {
3247 ASSERT_EQ(destination[index], repacked_message[k])
3248 << ": Failed to match type " << static_cast<int>(type)
3249 << ", index " << index << " while testing range " << i << " to "
3250 << j;
3251 ;
3252 ++index;
3253 }
3254 // Now, confirm that none of the other bytes have been touched.
3255 for (; index < destination.size(); ++index) {
3256 ASSERT_EQ(destination[index], 67u);
3257 }
3258 }
3259 }
Austin Schuhfa30c352022-10-16 11:12:02 -07003260 }
3261 }
3262}
3263
Austin Schuhf2d0e682022-10-16 14:20:58 -07003264// Tests that all variations of PackMessage are equivilent to the inline
3265// PackMessage used to avoid allocations.
3266TEST_F(InlinePackMessage, RemoteEquivilent) {
3267 aos::FlatbufferVector<reflection::Schema> schema =
3268 FileToFlatbuffer<reflection::Schema>(
3269 ArtifactPath("aos/events/logging/logger.bfbs"));
3270 std::uniform_int_distribution<uint8_t> uint8_distribution(
3271 std::numeric_limits<uint8_t>::min(), std::numeric_limits<uint8_t>::max());
3272
3273 for (int i = 0; i < 100; ++i) {
3274 aos::SizePrefixedFlatbufferDetachedBuffer<RemoteMessage> random_msg =
3275 RandomRemoteMessage();
3276 const size_t channel_index = uint8_distribution(random_number_generator_);
3277 const monotonic_clock::time_point monotonic_timestamp_time =
3278 RandomMonotonic();
3279
3280 flatbuffers::FlatBufferBuilder fbb;
3281 fbb.ForceDefaults(true);
3282 fbb.FinishSizePrefixed(PackRemoteMessage(
3283 &fbb, &random_msg.message(), channel_index, monotonic_timestamp_time));
3284
3285 VLOG(1) << absl::BytesToHexString(std::string_view(
3286 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3287 fbb.GetBufferSpan().size()));
3288
3289 // Make sure that both the builder and inline method agree on sizes.
3290 ASSERT_EQ(fbb.GetSize(), PackRemoteMessageSize());
3291
3292 // Initialize the buffer to something nonzer to make sure all the padding
3293 // bytes are set to 0.
3294 std::vector<uint8_t> repacked_message(PackRemoteMessageSize(), 67);
3295
3296 // And verify packing inline works as expected.
Austin Schuh71a40d42023-02-04 21:22:22 -08003297 EXPECT_EQ(repacked_message.size(),
3298 PackRemoteMessageInline(
3299 repacked_message.data(), &random_msg.message(), channel_index,
3300 monotonic_timestamp_time, 0u, repacked_message.size()));
Austin Schuhf2d0e682022-10-16 14:20:58 -07003301 EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
3302 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3303 fbb.GetBufferSpan().size()))
3304 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
3305 fbb.GetBufferSpan());
Austin Schuh71a40d42023-02-04 21:22:22 -08003306
3307 // Ok, now we want to confirm that we can build up arbitrary pieces of said
3308 // flatbuffer. Try all of them since it is cheap.
3309 TimeEventLoop event_loop;
3310 for (size_t i = 0; i < repacked_message.size(); i += 8) {
3311 for (size_t j = i; j < repacked_message.size(); j += 8) {
3312 std::vector<uint8_t> destination(repacked_message.size(), 67u);
3313 RemoteMessageCopier copier(&random_msg.message(), channel_index,
3314 monotonic_timestamp_time, &event_loop);
3315
3316 copier.Copy(destination.data(), i, j);
3317
3318 size_t index = 0;
3319 for (size_t k = i; k < j; ++k) {
3320 ASSERT_EQ(destination[index], repacked_message[k]);
3321 ++index;
3322 }
3323 for (; index < destination.size(); ++index) {
3324 ASSERT_EQ(destination[index], 67u);
3325 }
3326 }
3327 }
Austin Schuhf2d0e682022-10-16 14:20:58 -07003328 }
3329}
Austin Schuhfa30c352022-10-16 11:12:02 -07003330
Austin Schuhc243b422020-10-11 15:35:08 -07003331} // namespace testing
3332} // namespace logger
3333} // namespace aos