blob: b7cf90bc17a2033e3e8a880e697f517f9f19fb3a [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
Austin Schuhfa30c352022-10-16 11:12:02 -07004#include <random>
Austin Schuhe243aaf2020-10-11 15:46:02 -07005#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07006
Austin Schuhfa30c352022-10-16 11:12:02 -07007#include "absl/strings/escaping.h"
Austin Schuhc41603c2020-10-11 16:17:37 -07008#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -07009#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -080010#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070011#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070012#include "aos/json_to_flatbuffer.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070013#include "aos/testing/path.h"
14#include "aos/testing/random_seed.h"
Austin Schuhc243b422020-10-11 15:35:08 -070015#include "aos/testing/tmpdir.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070016#include "aos/util/file.h"
17#include "external/com_github_google_flatbuffers/src/annotated_binary_text_gen.h"
18#include "external/com_github_google_flatbuffers/src/binary_annotator.h"
19#include "flatbuffers/reflection_generated.h"
Brian Smarttea913d42021-12-10 15:02:38 -080020#include "gflags/gflags.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070021#include "gtest/gtest.h"
Austin Schuhc243b422020-10-11 15:35:08 -070022
23namespace aos {
24namespace logger {
25namespace testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070026namespace chrono = std::chrono;
Austin Schuhfa30c352022-10-16 11:12:02 -070027using aos::testing::ArtifactPath;
Austin Schuhc243b422020-10-11 15:35:08 -070028
Austin Schuhd863e6e2022-10-16 15:44:50 -070029// Adapter class to make it easy to test DetachedBufferWriter without adding
30// test only boilerplate to DetachedBufferWriter.
31class TestDetachedBufferWriter : public DetachedBufferWriter {
32 public:
33 TestDetachedBufferWriter(std::string_view filename)
34 : DetachedBufferWriter(filename, std::make_unique<DummyEncoder>()) {}
35 void WriteSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
36 QueueSpan(absl::Span<const uint8_t>(buffer.data(), buffer.size()));
37 }
38};
39
Austin Schuhe243aaf2020-10-11 15:46:02 -070040// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070041template <typename T>
42SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
43 const std::string_view data) {
44 flatbuffers::FlatBufferBuilder fbb;
45 fbb.ForceDefaults(true);
46 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
47 return fbb.Release();
48}
49
Austin Schuhe243aaf2020-10-11 15:46:02 -070050// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070051TEST(SpanReaderTest, ReadWrite) {
52 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
53 unlink(logfile.c_str());
54
55 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080056 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070057 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080058 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070059
60 {
Austin Schuhd863e6e2022-10-16 15:44:50 -070061 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -080062 writer.QueueSpan(m1.span());
63 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070064 }
65
66 SpanReader reader(logfile);
67
68 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhcf5f6442021-07-06 10:43:28 -070069 EXPECT_EQ(reader.PeekMessage(), m1.span());
70 EXPECT_EQ(reader.PeekMessage(), m1.span());
Austin Schuhadd6eb32020-11-09 21:24:26 -080071 EXPECT_EQ(reader.ReadMessage(), m1.span());
72 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhcf5f6442021-07-06 10:43:28 -070073 EXPECT_EQ(reader.PeekMessage(), absl::Span<const uint8_t>());
Austin Schuhc243b422020-10-11 15:35:08 -070074 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
75}
76
Austin Schuhe243aaf2020-10-11 15:46:02 -070077// Tests that we can actually parse the resulting messages at a basic level
78// through MessageReader.
79TEST(MessageReaderTest, ReadWrite) {
80 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
81 unlink(logfile.c_str());
82
83 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
84 JsonToSizedFlatbuffer<LogFileHeader>(
85 R"({ "max_out_of_order_duration": 100000000 })");
86 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
87 JsonToSizedFlatbuffer<MessageHeader>(
88 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
89 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
90 JsonToSizedFlatbuffer<MessageHeader>(
91 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
92
93 {
Austin Schuhd863e6e2022-10-16 15:44:50 -070094 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -080095 writer.QueueSpan(config.span());
96 writer.QueueSpan(m1.span());
97 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -070098 }
99
100 MessageReader reader(logfile);
101
102 EXPECT_EQ(reader.filename(), logfile);
103
104 EXPECT_EQ(
105 reader.max_out_of_order_duration(),
106 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
107 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
108 EXPECT_TRUE(reader.ReadMessage());
109 EXPECT_EQ(reader.newest_timestamp(),
110 monotonic_clock::time_point(chrono::nanoseconds(1)));
111 EXPECT_TRUE(reader.ReadMessage());
112 EXPECT_EQ(reader.newest_timestamp(),
113 monotonic_clock::time_point(chrono::nanoseconds(2)));
114 EXPECT_FALSE(reader.ReadMessage());
115}
116
Austin Schuh32f68492020-11-08 21:45:51 -0800117// Tests that we explode when messages are too far out of order.
118TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
119 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
120 unlink(logfile0.c_str());
121
122 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
123 JsonToSizedFlatbuffer<LogFileHeader>(
124 R"({
125 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800126 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800127 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
128 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
129 "parts_index": 0
130})");
131
132 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
133 JsonToSizedFlatbuffer<MessageHeader>(
134 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
135 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
136 JsonToSizedFlatbuffer<MessageHeader>(
137 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
138 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
139 JsonToSizedFlatbuffer<MessageHeader>(
140 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
141
142 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700143 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800144 writer.QueueSpan(config0.span());
145 writer.QueueSpan(m1.span());
146 writer.QueueSpan(m2.span());
147 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800148 }
149
150 const std::vector<LogFile> parts = SortParts({logfile0});
151
152 PartsMessageReader reader(parts[0].parts[0]);
153
154 EXPECT_TRUE(reader.ReadMessage());
155 EXPECT_TRUE(reader.ReadMessage());
156 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
157}
158
Austin Schuhc41603c2020-10-11 16:17:37 -0700159// Tests that we can transparently re-assemble part files with a
160// PartsMessageReader.
161TEST(PartsMessageReaderTest, ReadWrite) {
162 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
163 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
164 unlink(logfile0.c_str());
165 unlink(logfile1.c_str());
166
167 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
168 JsonToSizedFlatbuffer<LogFileHeader>(
169 R"({
170 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800171 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700172 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
173 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
174 "parts_index": 0
175})");
176 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
177 JsonToSizedFlatbuffer<LogFileHeader>(
178 R"({
179 "max_out_of_order_duration": 200000000,
180 "monotonic_start_time": 0,
181 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800182 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700183 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
184 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
185 "parts_index": 1
186})");
187
188 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
189 JsonToSizedFlatbuffer<MessageHeader>(
190 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
191 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
192 JsonToSizedFlatbuffer<MessageHeader>(
193 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
194
195 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700196 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800197 writer.QueueSpan(config0.span());
198 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700199 }
200 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700201 TestDetachedBufferWriter writer(logfile1);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800202 writer.QueueSpan(config1.span());
203 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700204 }
205
206 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
207
208 PartsMessageReader reader(parts[0].parts[0]);
209
210 EXPECT_EQ(reader.filename(), logfile0);
211
212 // Confirm that the timestamps track, and the filename also updates.
213 // Read the first message.
214 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
215 EXPECT_EQ(
216 reader.max_out_of_order_duration(),
217 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
218 EXPECT_TRUE(reader.ReadMessage());
219 EXPECT_EQ(reader.filename(), logfile0);
220 EXPECT_EQ(reader.newest_timestamp(),
221 monotonic_clock::time_point(chrono::nanoseconds(1)));
222 EXPECT_EQ(
223 reader.max_out_of_order_duration(),
224 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
225
226 // Read the second message.
227 EXPECT_TRUE(reader.ReadMessage());
228 EXPECT_EQ(reader.filename(), logfile1);
229 EXPECT_EQ(reader.newest_timestamp(),
230 monotonic_clock::time_point(chrono::nanoseconds(2)));
231 EXPECT_EQ(
232 reader.max_out_of_order_duration(),
233 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
234
235 // And then confirm that reading again returns no message.
236 EXPECT_FALSE(reader.ReadMessage());
237 EXPECT_EQ(reader.filename(), logfile1);
238 EXPECT_EQ(
239 reader.max_out_of_order_duration(),
240 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800241 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Austin Schuhc41603c2020-10-11 16:17:37 -0700242}
Austin Schuh32f68492020-11-08 21:45:51 -0800243
Austin Schuh1be0ce42020-11-29 22:43:26 -0800244// Tests that Message's operator < works as expected.
245TEST(MessageTest, Sorting) {
246 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
247
248 Message m1{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700249 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700250 .timestamp =
251 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
Austin Schuh48507722021-07-17 17:29:24 -0700252 .monotonic_remote_boot = 0xffffff,
253 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700254 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800255 Message m2{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700256 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700257 .timestamp =
258 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
Austin Schuh48507722021-07-17 17:29:24 -0700259 .monotonic_remote_boot = 0xffffff,
260 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700261 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800262
263 EXPECT_LT(m1, m2);
264 EXPECT_GE(m2, m1);
265
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700266 m1.timestamp.time = e;
267 m2.timestamp.time = e;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800268
269 m1.channel_index = 1;
270 m2.channel_index = 2;
271
272 EXPECT_LT(m1, m2);
273 EXPECT_GE(m2, m1);
274
275 m1.channel_index = 0;
276 m2.channel_index = 0;
Austin Schuh58646e22021-08-23 23:51:46 -0700277 m1.queue_index.index = 0u;
278 m2.queue_index.index = 1u;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800279
280 EXPECT_LT(m1, m2);
281 EXPECT_GE(m2, m1);
282}
283
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800284aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
285 const aos::FlatbufferDetachedBuffer<Configuration> &config,
286 const std::string_view json) {
287 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh1034a832021-03-31 21:53:26 -0700288 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800289 flatbuffers::Offset<Configuration> config_offset =
290 aos::CopyFlatBuffer(config, &fbb);
291 LogFileHeader::Builder header_builder(fbb);
292 header_builder.add_configuration(config_offset);
293 fbb.Finish(header_builder.Finish());
294 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
295
296 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
297 JsonToFlatbuffer<LogFileHeader>(json));
298 CHECK(header_updates.Verify());
299 flatbuffers::FlatBufferBuilder fbb2;
Austin Schuh1034a832021-03-31 21:53:26 -0700300 fbb2.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800301 fbb2.FinishSizePrefixed(
302 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
303 return fbb2.Release();
304}
305
306class SortingElementTest : public ::testing::Test {
307 public:
308 SortingElementTest()
309 : config_(JsonToFlatbuffer<Configuration>(
310 R"({
311 "channels": [
312 {
313 "name": "/a",
314 "type": "aos.logger.testing.TestMessage",
315 "source_node": "pi1",
316 "destination_nodes": [
317 {
318 "name": "pi2"
319 },
320 {
321 "name": "pi3"
322 }
323 ]
324 },
325 {
326 "name": "/b",
327 "type": "aos.logger.testing.TestMessage",
328 "source_node": "pi1"
329 },
330 {
331 "name": "/c",
332 "type": "aos.logger.testing.TestMessage",
333 "source_node": "pi1"
Austin Schuh48507722021-07-17 17:29:24 -0700334 },
335 {
336 "name": "/d",
337 "type": "aos.logger.testing.TestMessage",
338 "source_node": "pi2",
339 "destination_nodes": [
340 {
341 "name": "pi1"
342 }
343 ]
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800344 }
345 ],
346 "nodes": [
347 {
348 "name": "pi1"
349 },
350 {
351 "name": "pi2"
352 },
353 {
354 "name": "pi3"
355 }
356 ]
357}
358)")),
359 config0_(MakeHeader(config_, R"({
360 /* 100ms */
361 "max_out_of_order_duration": 100000000,
362 "node": {
363 "name": "pi1"
364 },
365 "logger_node": {
366 "name": "pi1"
367 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800368 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800369 "realtime_start_time": 1000000000000,
370 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700371 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
372 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
373 "boot_uuids": [
374 "1d782c63-b3c7-466e-bea9-a01308b43333",
375 "",
376 ""
377 ],
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800378 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
379 "parts_index": 0
380})")),
381 config1_(MakeHeader(config_,
382 R"({
383 /* 100ms */
384 "max_out_of_order_duration": 100000000,
385 "node": {
386 "name": "pi1"
387 },
388 "logger_node": {
389 "name": "pi1"
390 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800391 "monotonic_start_time": 1000000,
392 "realtime_start_time": 1000000000000,
393 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700394 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
395 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
396 "boot_uuids": [
397 "1d782c63-b3c7-466e-bea9-a01308b43333",
398 "",
399 ""
400 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800401 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
402 "parts_index": 0
403})")),
404 config2_(MakeHeader(config_,
405 R"({
406 /* 100ms */
407 "max_out_of_order_duration": 100000000,
408 "node": {
409 "name": "pi2"
410 },
411 "logger_node": {
412 "name": "pi2"
413 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800414 "monotonic_start_time": 0,
415 "realtime_start_time": 1000000000000,
Austin Schuh48507722021-07-17 17:29:24 -0700416 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
417 "logger_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
418 "boot_uuids": [
419 "",
420 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
421 ""
422 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800423 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
424 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
425 "parts_index": 0
426})")),
427 config3_(MakeHeader(config_,
428 R"({
429 /* 100ms */
430 "max_out_of_order_duration": 100000000,
431 "node": {
432 "name": "pi1"
433 },
434 "logger_node": {
435 "name": "pi1"
436 },
437 "monotonic_start_time": 2000000,
438 "realtime_start_time": 1000000000,
439 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
Austin Schuh48507722021-07-17 17:29:24 -0700440 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
441 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
442 "boot_uuids": [
443 "1d782c63-b3c7-466e-bea9-a01308b43333",
444 "",
445 ""
446 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800447 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800448 "parts_index": 0
Austin Schuh8bf1e632021-01-02 22:41:04 -0800449})")),
450 config4_(MakeHeader(config_,
451 R"({
452 /* 100ms */
453 "max_out_of_order_duration": 100000000,
454 "node": {
455 "name": "pi2"
456 },
457 "logger_node": {
458 "name": "pi1"
459 },
460 "monotonic_start_time": 2000000,
461 "realtime_start_time": 1000000000,
462 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
463 "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
Austin Schuh48507722021-07-17 17:29:24 -0700464 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
465 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
466 "boot_uuids": [
467 "1d782c63-b3c7-466e-bea9-a01308b43333",
468 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
469 ""
470 ],
Austin Schuh8bf1e632021-01-02 22:41:04 -0800471 "parts_index": 0
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800472})")) {
473 unlink(logfile0_.c_str());
474 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800475 unlink(logfile2_.c_str());
Austin Schuh48507722021-07-17 17:29:24 -0700476 unlink(logfile3_.c_str());
Austin Schuh921b2562021-07-31 19:37:03 -0700477 queue_index_.resize(config_.message().channels()->size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800478 }
479
480 protected:
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800481 flatbuffers::DetachedBuffer MakeLogMessage(
482 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
483 int value) {
484 flatbuffers::FlatBufferBuilder message_fbb;
485 message_fbb.ForceDefaults(true);
486 TestMessage::Builder test_message_builder(message_fbb);
487 test_message_builder.add_value(value);
488 message_fbb.Finish(test_message_builder.Finish());
489
490 aos::Context context;
491 context.monotonic_event_time = monotonic_now;
492 context.realtime_event_time = aos::realtime_clock::epoch() +
493 chrono::seconds(1000) +
494 monotonic_now.time_since_epoch();
Austin Schuh921b2562021-07-31 19:37:03 -0700495 CHECK_LT(static_cast<size_t>(channel_index), queue_index_.size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800496 context.queue_index = queue_index_[channel_index];
497 context.size = message_fbb.GetSize();
498 context.data = message_fbb.GetBufferPointer();
499
500 ++queue_index_[channel_index];
501
502 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh53e3a812021-11-03 16:57:23 -0700503 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800504 fbb.FinishSizePrefixed(
505 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
506
507 return fbb.Release();
508 }
509
510 flatbuffers::DetachedBuffer MakeTimestampMessage(
511 const aos::monotonic_clock::time_point sender_monotonic_now,
Austin Schuh8bf1e632021-01-02 22:41:04 -0800512 int channel_index, chrono::nanoseconds receiver_monotonic_offset,
513 monotonic_clock::time_point monotonic_timestamp_time =
514 monotonic_clock::min_time) {
515 const monotonic_clock::time_point monotonic_sent_time =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800516 sender_monotonic_now + receiver_monotonic_offset;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800517
518 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800519 fbb.ForceDefaults(true);
520
521 logger::MessageHeader::Builder message_header_builder(fbb);
522
523 message_header_builder.add_channel_index(channel_index);
524
525 message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
526 100);
527 message_header_builder.add_monotonic_sent_time(
528 monotonic_sent_time.time_since_epoch().count());
529 message_header_builder.add_realtime_sent_time(
530 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
531 monotonic_sent_time.time_since_epoch())
532 .time_since_epoch()
533 .count());
534
535 message_header_builder.add_monotonic_remote_time(
536 sender_monotonic_now.time_since_epoch().count());
537 message_header_builder.add_realtime_remote_time(
538 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
539 sender_monotonic_now.time_since_epoch())
540 .time_since_epoch()
541 .count());
542 message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
543 1);
544
545 if (monotonic_timestamp_time != monotonic_clock::min_time) {
546 message_header_builder.add_monotonic_timestamp_time(
547 monotonic_timestamp_time.time_since_epoch().count());
548 }
549
550 fbb.FinishSizePrefixed(message_header_builder.Finish());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800551 LOG(INFO) << aos::FlatbufferToJson(
552 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
553 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
554
555 return fbb.Release();
556 }
557
558 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
559 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800560 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh48507722021-07-17 17:29:24 -0700561 const std::string logfile3_ = aos::testing::TestTmpDir() + "/log3.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800562
563 const aos::FlatbufferDetachedBuffer<Configuration> config_;
564 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
565 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800566 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
567 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800568 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800569
570 std::vector<uint32_t> queue_index_;
571};
572
573using LogPartsSorterTest = SortingElementTest;
574using LogPartsSorterDeathTest = LogPartsSorterTest;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800575using NodeMergerTest = SortingElementTest;
Austin Schuhd2f96102020-12-01 20:27:29 -0800576using TimestampMapperTest = SortingElementTest;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800577
578// Tests that we can pull messages out of a log sorted in order.
579TEST_F(LogPartsSorterTest, Pull) {
580 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
581 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700582 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800583 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700584 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800585 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700586 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800587 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700588 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800589 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700590 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800591 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
592 }
593
594 const std::vector<LogFile> parts = SortParts({logfile0_});
595
596 LogPartsSorter parts_sorter(parts[0].parts[0]);
597
598 // Confirm we aren't sorted until any time until the message is popped.
599 // Peeking shouldn't change the sorted until time.
600 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
601
602 std::deque<Message> output;
603
604 ASSERT_TRUE(parts_sorter.Front() != nullptr);
605 output.emplace_back(std::move(*parts_sorter.Front()));
606 parts_sorter.PopFront();
607 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
608
609 ASSERT_TRUE(parts_sorter.Front() != nullptr);
610 output.emplace_back(std::move(*parts_sorter.Front()));
611 parts_sorter.PopFront();
612 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
613
614 ASSERT_TRUE(parts_sorter.Front() != nullptr);
615 output.emplace_back(std::move(*parts_sorter.Front()));
616 parts_sorter.PopFront();
617 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
618
619 ASSERT_TRUE(parts_sorter.Front() != nullptr);
620 output.emplace_back(std::move(*parts_sorter.Front()));
621 parts_sorter.PopFront();
622 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
623
624 ASSERT_TRUE(parts_sorter.Front() == nullptr);
625
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700626 EXPECT_EQ(output[0].timestamp.boot, 0);
627 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
628 EXPECT_EQ(output[1].timestamp.boot, 0);
629 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1000));
630 EXPECT_EQ(output[2].timestamp.boot, 0);
631 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1901));
632 EXPECT_EQ(output[3].timestamp.boot, 0);
633 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800634}
635
Austin Schuhb000de62020-12-03 22:00:40 -0800636// Tests that we can pull messages out of a log sorted in order.
637TEST_F(LogPartsSorterTest, WayBeforeStart) {
638 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
639 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700640 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhb000de62020-12-03 22:00:40 -0800641 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700642 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800643 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700644 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800645 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700646 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800647 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700648 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800649 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700650 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800651 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
652 }
653
654 const std::vector<LogFile> parts = SortParts({logfile0_});
655
656 LogPartsSorter parts_sorter(parts[0].parts[0]);
657
658 // Confirm we aren't sorted until any time until the message is popped.
659 // Peeking shouldn't change the sorted until time.
660 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
661
662 std::deque<Message> output;
663
664 for (monotonic_clock::time_point t :
665 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
666 e + chrono::milliseconds(1900), monotonic_clock::max_time,
667 monotonic_clock::max_time}) {
668 ASSERT_TRUE(parts_sorter.Front() != nullptr);
669 output.emplace_back(std::move(*parts_sorter.Front()));
670 parts_sorter.PopFront();
671 EXPECT_EQ(parts_sorter.sorted_until(), t);
672 }
673
674 ASSERT_TRUE(parts_sorter.Front() == nullptr);
675
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700676 EXPECT_EQ(output[0].timestamp.boot, 0u);
677 EXPECT_EQ(output[0].timestamp.time, e - chrono::milliseconds(1000));
678 EXPECT_EQ(output[1].timestamp.boot, 0u);
679 EXPECT_EQ(output[1].timestamp.time, e - chrono::milliseconds(500));
680 EXPECT_EQ(output[2].timestamp.boot, 0u);
681 EXPECT_EQ(output[2].timestamp.time, e - chrono::milliseconds(10));
682 EXPECT_EQ(output[3].timestamp.boot, 0u);
683 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(1901));
684 EXPECT_EQ(output[4].timestamp.boot, 0u);
685 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuhb000de62020-12-03 22:00:40 -0800686}
687
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800688// Tests that messages too far out of order trigger death.
689TEST_F(LogPartsSorterDeathTest, Pull) {
690 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
691 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700692 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800693 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700694 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800695 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700696 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800697 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700698 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800699 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
700 // The following message is too far out of order and will trigger the CHECK.
Austin Schuhd863e6e2022-10-16 15:44:50 -0700701 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800702 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
703 }
704
705 const std::vector<LogFile> parts = SortParts({logfile0_});
706
707 LogPartsSorter parts_sorter(parts[0].parts[0]);
708
709 // Confirm we aren't sorted until any time until the message is popped.
710 // Peeking shouldn't change the sorted until time.
711 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
712 std::deque<Message> output;
713
714 ASSERT_TRUE(parts_sorter.Front() != nullptr);
715 parts_sorter.PopFront();
716 ASSERT_TRUE(parts_sorter.Front() != nullptr);
717 ASSERT_TRUE(parts_sorter.Front() != nullptr);
718 parts_sorter.PopFront();
719
Austin Schuh58646e22021-08-23 23:51:46 -0700720 EXPECT_DEATH({ parts_sorter.Front(); },
721 "Max out of order of 100000000ns exceeded.");
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800722}
723
Austin Schuh8f52ed52020-11-30 23:12:39 -0800724// Tests that we can merge data from 2 separate files, including duplicate data.
725TEST_F(NodeMergerTest, TwoFileMerger) {
726 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
727 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700728 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800729 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700730 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800731 writer1.QueueSpan(config1_.span());
732
Austin Schuhd863e6e2022-10-16 15:44:50 -0700733 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800734 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700735 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800736 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
737
Austin Schuhd863e6e2022-10-16 15:44:50 -0700738 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800739 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700740 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800741 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
742
743 // Make a duplicate!
744 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
745 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
746 writer0.QueueSpan(msg.span());
747 writer1.QueueSpan(msg.span());
748
Austin Schuhd863e6e2022-10-16 15:44:50 -0700749 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800750 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
751 }
752
753 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Austin Schuhd2f96102020-12-01 20:27:29 -0800754 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800755
Austin Schuhd2f96102020-12-01 20:27:29 -0800756 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800757
758 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
759
760 std::deque<Message> output;
761
762 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
763 ASSERT_TRUE(merger.Front() != nullptr);
764 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
765
766 output.emplace_back(std::move(*merger.Front()));
767 merger.PopFront();
768 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
769
770 ASSERT_TRUE(merger.Front() != nullptr);
771 output.emplace_back(std::move(*merger.Front()));
772 merger.PopFront();
773 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
774
775 ASSERT_TRUE(merger.Front() != nullptr);
776 output.emplace_back(std::move(*merger.Front()));
777 merger.PopFront();
778 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
779
780 ASSERT_TRUE(merger.Front() != nullptr);
781 output.emplace_back(std::move(*merger.Front()));
782 merger.PopFront();
783 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
784
785 ASSERT_TRUE(merger.Front() != nullptr);
786 output.emplace_back(std::move(*merger.Front()));
787 merger.PopFront();
788 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
789
790 ASSERT_TRUE(merger.Front() != nullptr);
791 output.emplace_back(std::move(*merger.Front()));
792 merger.PopFront();
793 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
794
795 ASSERT_TRUE(merger.Front() == nullptr);
796
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700797 EXPECT_EQ(output[0].timestamp.boot, 0u);
798 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
799 EXPECT_EQ(output[1].timestamp.boot, 0u);
800 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1001));
801 EXPECT_EQ(output[2].timestamp.boot, 0u);
802 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1002));
803 EXPECT_EQ(output[3].timestamp.boot, 0u);
804 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
805 EXPECT_EQ(output[4].timestamp.boot, 0u);
806 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(3000));
807 EXPECT_EQ(output[5].timestamp.boot, 0u);
808 EXPECT_EQ(output[5].timestamp.time, e + chrono::milliseconds(3002));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800809}
810
Austin Schuh8bf1e632021-01-02 22:41:04 -0800811// Tests that we can merge timestamps with various combinations of
812// monotonic_timestamp_time.
813TEST_F(NodeMergerTest, TwoFileTimestampMerger) {
814 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
815 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700816 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800817 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700818 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800819 writer1.QueueSpan(config1_.span());
820
821 // Neither has it.
822 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700823 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800824 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700825 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800826 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
827
828 // First only has it.
829 MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700830 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800831 e + chrono::milliseconds(1001), 0, chrono::seconds(100),
832 e + chrono::nanoseconds(971)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700833 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800834 e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
835
836 // Second only has it.
837 MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700838 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800839 e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700840 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800841 e + chrono::milliseconds(1002), 0, chrono::seconds(100),
842 e + chrono::nanoseconds(972)));
843
844 // Both have it.
845 MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700846 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800847 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
848 e + chrono::nanoseconds(973)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700849 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800850 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
851 e + chrono::nanoseconds(973)));
852 }
853
854 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
855 ASSERT_EQ(parts.size(), 1u);
856
857 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
858
859 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
860
861 std::deque<Message> output;
862
863 for (int i = 0; i < 4; ++i) {
864 ASSERT_TRUE(merger.Front() != nullptr);
865 output.emplace_back(std::move(*merger.Front()));
866 merger.PopFront();
867 }
868 ASSERT_TRUE(merger.Front() == nullptr);
869
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700870 EXPECT_EQ(output[0].timestamp.boot, 0u);
871 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700872 EXPECT_FALSE(output[0].data->has_monotonic_timestamp_time);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700873
874 EXPECT_EQ(output[1].timestamp.boot, 0u);
875 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700876 EXPECT_TRUE(output[1].data->has_monotonic_timestamp_time);
877 EXPECT_EQ(output[1].data->monotonic_timestamp_time,
878 monotonic_clock::time_point(std::chrono::nanoseconds(971)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700879
880 EXPECT_EQ(output[2].timestamp.boot, 0u);
881 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700882 EXPECT_TRUE(output[2].data->has_monotonic_timestamp_time);
883 EXPECT_EQ(output[2].data->monotonic_timestamp_time,
884 monotonic_clock::time_point(std::chrono::nanoseconds(972)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700885
886 EXPECT_EQ(output[3].timestamp.boot, 0u);
887 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700888 EXPECT_TRUE(output[3].data->has_monotonic_timestamp_time);
889 EXPECT_EQ(output[3].data->monotonic_timestamp_time,
890 monotonic_clock::time_point(std::chrono::nanoseconds(973)));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800891}
892
Austin Schuhd2f96102020-12-01 20:27:29 -0800893// Tests that we can match timestamps on delivered messages.
894TEST_F(TimestampMapperTest, ReadNode0First) {
895 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
896 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700897 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800898 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700899 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800900 writer1.QueueSpan(config2_.span());
901
Austin Schuhd863e6e2022-10-16 15:44:50 -0700902 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800903 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700904 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800905 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
906
Austin Schuhd863e6e2022-10-16 15:44:50 -0700907 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800908 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700909 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800910 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
911
Austin Schuhd863e6e2022-10-16 15:44:50 -0700912 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800913 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700914 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800915 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
916 }
917
918 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
919
920 ASSERT_EQ(parts[0].logger_node, "pi1");
921 ASSERT_EQ(parts[1].logger_node, "pi2");
922
Austin Schuh79b30942021-01-24 22:32:21 -0800923 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800924 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -0800925 mapper0.set_timestamp_callback(
926 [&](TimestampedMessage *) { ++mapper0_count; });
927 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800928 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -0800929 mapper1.set_timestamp_callback(
930 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -0800931
932 mapper0.AddPeer(&mapper1);
933 mapper1.AddPeer(&mapper0);
934
935 {
936 std::deque<TimestampedMessage> output0;
937
Austin Schuh79b30942021-01-24 22:32:21 -0800938 EXPECT_EQ(mapper0_count, 0u);
939 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800940 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800941 EXPECT_EQ(mapper0_count, 1u);
942 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800943 output0.emplace_back(std::move(*mapper0.Front()));
944 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700945 EXPECT_TRUE(mapper0.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800946 EXPECT_EQ(mapper0_count, 1u);
947 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800948
949 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800950 EXPECT_EQ(mapper0_count, 2u);
951 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800952 output0.emplace_back(std::move(*mapper0.Front()));
953 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700954 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800955
956 ASSERT_TRUE(mapper0.Front() != nullptr);
957 output0.emplace_back(std::move(*mapper0.Front()));
958 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700959 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800960
Austin Schuh79b30942021-01-24 22:32:21 -0800961 EXPECT_EQ(mapper0_count, 3u);
962 EXPECT_EQ(mapper1_count, 0u);
963
Austin Schuhd2f96102020-12-01 20:27:29 -0800964 ASSERT_TRUE(mapper0.Front() == nullptr);
965
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700966 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
967 EXPECT_EQ(output0[0].monotonic_event_time.time,
968 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700969 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700970
971 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
972 EXPECT_EQ(output0[1].monotonic_event_time.time,
973 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700974 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700975
976 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
977 EXPECT_EQ(output0[2].monotonic_event_time.time,
978 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700979 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -0800980 }
981
982 {
983 SCOPED_TRACE("Trying node1 now");
984 std::deque<TimestampedMessage> output1;
985
Austin Schuh79b30942021-01-24 22:32:21 -0800986 EXPECT_EQ(mapper0_count, 3u);
987 EXPECT_EQ(mapper1_count, 0u);
988
Austin Schuhd2f96102020-12-01 20:27:29 -0800989 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800990 EXPECT_EQ(mapper0_count, 3u);
991 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800992 output1.emplace_back(std::move(*mapper1.Front()));
993 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700994 EXPECT_TRUE(mapper1.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800995 EXPECT_EQ(mapper0_count, 3u);
996 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800997
998 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800999 EXPECT_EQ(mapper0_count, 3u);
1000 EXPECT_EQ(mapper1_count, 2u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001001 output1.emplace_back(std::move(*mapper1.Front()));
1002 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001003 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001004
1005 ASSERT_TRUE(mapper1.Front() != nullptr);
1006 output1.emplace_back(std::move(*mapper1.Front()));
1007 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001008 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001009
Austin Schuh79b30942021-01-24 22:32:21 -08001010 EXPECT_EQ(mapper0_count, 3u);
1011 EXPECT_EQ(mapper1_count, 3u);
1012
Austin Schuhd2f96102020-12-01 20:27:29 -08001013 ASSERT_TRUE(mapper1.Front() == nullptr);
1014
Austin Schuh79b30942021-01-24 22:32:21 -08001015 EXPECT_EQ(mapper0_count, 3u);
1016 EXPECT_EQ(mapper1_count, 3u);
1017
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001018 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1019 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001020 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001021 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001022
1023 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1024 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001025 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001026 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001027
1028 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1029 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001030 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001031 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001032 }
1033}
1034
Austin Schuh8bf1e632021-01-02 22:41:04 -08001035// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
1036// returned.
1037TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
1038 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1039 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001040 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001041 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001042 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001043 writer1.QueueSpan(config4_.span());
1044
Austin Schuhd863e6e2022-10-16 15:44:50 -07001045 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001046 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001047 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001048 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
1049 e + chrono::nanoseconds(971)));
1050
Austin Schuhd863e6e2022-10-16 15:44:50 -07001051 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001052 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001053 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001054 e + chrono::milliseconds(2000), 0, chrono::seconds(100),
1055 e + chrono::nanoseconds(5458)));
1056
Austin Schuhd863e6e2022-10-16 15:44:50 -07001057 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001058 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001059 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001060 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1061 }
1062
1063 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1064
1065 for (const auto &p : parts) {
1066 LOG(INFO) << p;
1067 }
1068
1069 ASSERT_EQ(parts.size(), 1u);
1070
Austin Schuh79b30942021-01-24 22:32:21 -08001071 size_t mapper0_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001072 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001073 mapper0.set_timestamp_callback(
1074 [&](TimestampedMessage *) { ++mapper0_count; });
1075 size_t mapper1_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001076 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001077 mapper1.set_timestamp_callback(
1078 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh8bf1e632021-01-02 22:41:04 -08001079
1080 mapper0.AddPeer(&mapper1);
1081 mapper1.AddPeer(&mapper0);
1082
1083 {
1084 std::deque<TimestampedMessage> output0;
1085
1086 for (int i = 0; i < 3; ++i) {
1087 ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
1088 output0.emplace_back(std::move(*mapper0.Front()));
1089 mapper0.PopFront();
1090 }
1091
1092 ASSERT_TRUE(mapper0.Front() == nullptr);
1093
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001094 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1095 EXPECT_EQ(output0[0].monotonic_event_time.time,
1096 e + chrono::milliseconds(1000));
1097 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
1098 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
1099 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001100 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001101
1102 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1103 EXPECT_EQ(output0[1].monotonic_event_time.time,
1104 e + chrono::milliseconds(2000));
1105 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
1106 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
1107 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001108 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001109
1110 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1111 EXPECT_EQ(output0[2].monotonic_event_time.time,
1112 e + chrono::milliseconds(3000));
1113 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
1114 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
1115 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001116 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001117 }
1118
1119 {
1120 SCOPED_TRACE("Trying node1 now");
1121 std::deque<TimestampedMessage> output1;
1122
1123 for (int i = 0; i < 3; ++i) {
1124 ASSERT_TRUE(mapper1.Front() != nullptr);
1125 output1.emplace_back(std::move(*mapper1.Front()));
1126 mapper1.PopFront();
1127 }
1128
1129 ASSERT_TRUE(mapper1.Front() == nullptr);
1130
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001131 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1132 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001133 e + chrono::seconds(100) + chrono::milliseconds(1000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001134 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
1135 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001136 e + chrono::nanoseconds(971));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001137 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001138
1139 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1140 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001141 e + chrono::seconds(100) + chrono::milliseconds(2000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001142 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
1143 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001144 e + chrono::nanoseconds(5458));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001145 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001146
1147 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1148 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001149 e + chrono::seconds(100) + chrono::milliseconds(3000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001150 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
1151 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
1152 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001153 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001154 }
Austin Schuh79b30942021-01-24 22:32:21 -08001155
1156 EXPECT_EQ(mapper0_count, 3u);
1157 EXPECT_EQ(mapper1_count, 3u);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001158}
1159
Austin Schuhd2f96102020-12-01 20:27:29 -08001160// Tests that we can match timestamps on delivered messages. By doing this in
1161// the reverse order, the second node needs to queue data up from the first node
1162// to find the matching timestamp.
1163TEST_F(TimestampMapperTest, ReadNode1First) {
1164 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1165 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001166 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001167 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001168 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001169 writer1.QueueSpan(config2_.span());
1170
Austin Schuhd863e6e2022-10-16 15:44:50 -07001171 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001172 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001173 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001174 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1175
Austin Schuhd863e6e2022-10-16 15:44:50 -07001176 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001177 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001178 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001179 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1180
Austin Schuhd863e6e2022-10-16 15:44:50 -07001181 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001182 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001183 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001184 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1185 }
1186
1187 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1188
1189 ASSERT_EQ(parts[0].logger_node, "pi1");
1190 ASSERT_EQ(parts[1].logger_node, "pi2");
1191
Austin Schuh79b30942021-01-24 22:32:21 -08001192 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001193 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001194 mapper0.set_timestamp_callback(
1195 [&](TimestampedMessage *) { ++mapper0_count; });
1196 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001197 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001198 mapper1.set_timestamp_callback(
1199 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001200
1201 mapper0.AddPeer(&mapper1);
1202 mapper1.AddPeer(&mapper0);
1203
1204 {
1205 SCOPED_TRACE("Trying node1 now");
1206 std::deque<TimestampedMessage> output1;
1207
1208 ASSERT_TRUE(mapper1.Front() != nullptr);
1209 output1.emplace_back(std::move(*mapper1.Front()));
1210 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001211 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001212
1213 ASSERT_TRUE(mapper1.Front() != nullptr);
1214 output1.emplace_back(std::move(*mapper1.Front()));
1215 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001216 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001217
1218 ASSERT_TRUE(mapper1.Front() != nullptr);
1219 output1.emplace_back(std::move(*mapper1.Front()));
1220 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001221 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001222
1223 ASSERT_TRUE(mapper1.Front() == nullptr);
1224
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001225 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1226 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001227 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001228 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001229
1230 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1231 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001232 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001233 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001234
1235 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1236 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001237 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001238 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001239 }
1240
1241 {
1242 std::deque<TimestampedMessage> output0;
1243
1244 ASSERT_TRUE(mapper0.Front() != nullptr);
1245 output0.emplace_back(std::move(*mapper0.Front()));
1246 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001247 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001248
1249 ASSERT_TRUE(mapper0.Front() != nullptr);
1250 output0.emplace_back(std::move(*mapper0.Front()));
1251 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001252 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001253
1254 ASSERT_TRUE(mapper0.Front() != nullptr);
1255 output0.emplace_back(std::move(*mapper0.Front()));
1256 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001257 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001258
1259 ASSERT_TRUE(mapper0.Front() == nullptr);
1260
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001261 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1262 EXPECT_EQ(output0[0].monotonic_event_time.time,
1263 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001264 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001265
1266 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1267 EXPECT_EQ(output0[1].monotonic_event_time.time,
1268 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001269 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001270
1271 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1272 EXPECT_EQ(output0[2].monotonic_event_time.time,
1273 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001274 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001275 }
Austin Schuh79b30942021-01-24 22:32:21 -08001276
1277 EXPECT_EQ(mapper0_count, 3u);
1278 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001279}
1280
1281// Tests that we return just the timestamps if we couldn't find the data and the
1282// missing data was at the beginning of the file.
1283TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
1284 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1285 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001286 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001287 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001288 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001289 writer1.QueueSpan(config2_.span());
1290
1291 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001292 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001293 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1294
Austin Schuhd863e6e2022-10-16 15:44:50 -07001295 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001296 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001297 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001298 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1299
Austin Schuhd863e6e2022-10-16 15:44:50 -07001300 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001301 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001302 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001303 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1304 }
1305
1306 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1307
1308 ASSERT_EQ(parts[0].logger_node, "pi1");
1309 ASSERT_EQ(parts[1].logger_node, "pi2");
1310
Austin Schuh79b30942021-01-24 22:32:21 -08001311 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001312 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001313 mapper0.set_timestamp_callback(
1314 [&](TimestampedMessage *) { ++mapper0_count; });
1315 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001316 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001317 mapper1.set_timestamp_callback(
1318 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001319
1320 mapper0.AddPeer(&mapper1);
1321 mapper1.AddPeer(&mapper0);
1322
1323 {
1324 SCOPED_TRACE("Trying node1 now");
1325 std::deque<TimestampedMessage> output1;
1326
1327 ASSERT_TRUE(mapper1.Front() != nullptr);
1328 output1.emplace_back(std::move(*mapper1.Front()));
1329 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001330 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001331
1332 ASSERT_TRUE(mapper1.Front() != nullptr);
1333 output1.emplace_back(std::move(*mapper1.Front()));
1334 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001335 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001336
1337 ASSERT_TRUE(mapper1.Front() != nullptr);
1338 output1.emplace_back(std::move(*mapper1.Front()));
1339 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001340 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001341
1342 ASSERT_TRUE(mapper1.Front() == nullptr);
1343
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001344 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1345 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001346 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001347 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001348
1349 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1350 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001351 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001352 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001353
1354 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1355 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001356 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001357 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001358 }
Austin Schuh79b30942021-01-24 22:32:21 -08001359
1360 EXPECT_EQ(mapper0_count, 0u);
1361 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001362}
1363
1364// Tests that we return just the timestamps if we couldn't find the data and the
1365// missing data was at the end of the file.
1366TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
1367 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1368 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001369 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001370 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001371 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001372 writer1.QueueSpan(config2_.span());
1373
Austin Schuhd863e6e2022-10-16 15:44:50 -07001374 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001375 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001376 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001377 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1378
Austin Schuhd863e6e2022-10-16 15:44:50 -07001379 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001380 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001381 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001382 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1383
1384 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001385 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001386 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1387 }
1388
1389 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1390
1391 ASSERT_EQ(parts[0].logger_node, "pi1");
1392 ASSERT_EQ(parts[1].logger_node, "pi2");
1393
Austin Schuh79b30942021-01-24 22:32:21 -08001394 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001395 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001396 mapper0.set_timestamp_callback(
1397 [&](TimestampedMessage *) { ++mapper0_count; });
1398 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001399 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001400 mapper1.set_timestamp_callback(
1401 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001402
1403 mapper0.AddPeer(&mapper1);
1404 mapper1.AddPeer(&mapper0);
1405
1406 {
1407 SCOPED_TRACE("Trying node1 now");
1408 std::deque<TimestampedMessage> output1;
1409
1410 ASSERT_TRUE(mapper1.Front() != nullptr);
1411 output1.emplace_back(std::move(*mapper1.Front()));
1412 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001413 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001414
1415 ASSERT_TRUE(mapper1.Front() != nullptr);
1416 output1.emplace_back(std::move(*mapper1.Front()));
1417 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001418 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001419
1420 ASSERT_TRUE(mapper1.Front() != nullptr);
1421 output1.emplace_back(std::move(*mapper1.Front()));
1422 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001423 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001424
1425 ASSERT_TRUE(mapper1.Front() == nullptr);
1426
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001427 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1428 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001429 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001430 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001431
1432 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1433 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001434 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001435 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001436
1437 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1438 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001439 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001440 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001441 }
Austin Schuh79b30942021-01-24 22:32:21 -08001442
1443 EXPECT_EQ(mapper0_count, 0u);
1444 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001445}
1446
Austin Schuh993ccb52020-12-12 15:59:32 -08001447// Tests that we handle a message which failed to forward or be logged.
1448TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1449 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1450 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001451 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001452 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001453 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001454 writer1.QueueSpan(config2_.span());
1455
Austin Schuhd863e6e2022-10-16 15:44:50 -07001456 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001457 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001458 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001459 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1460
1461 // Create both the timestamp and message, but don't log them, simulating a
1462 // forwarding drop.
1463 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1464 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1465 chrono::seconds(100));
1466
Austin Schuhd863e6e2022-10-16 15:44:50 -07001467 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001468 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001469 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001470 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1471 }
1472
1473 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1474
1475 ASSERT_EQ(parts[0].logger_node, "pi1");
1476 ASSERT_EQ(parts[1].logger_node, "pi2");
1477
Austin Schuh79b30942021-01-24 22:32:21 -08001478 size_t mapper0_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001479 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001480 mapper0.set_timestamp_callback(
1481 [&](TimestampedMessage *) { ++mapper0_count; });
1482 size_t mapper1_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001483 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001484 mapper1.set_timestamp_callback(
1485 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh993ccb52020-12-12 15:59:32 -08001486
1487 mapper0.AddPeer(&mapper1);
1488 mapper1.AddPeer(&mapper0);
1489
1490 {
1491 std::deque<TimestampedMessage> output1;
1492
1493 ASSERT_TRUE(mapper1.Front() != nullptr);
1494 output1.emplace_back(std::move(*mapper1.Front()));
1495 mapper1.PopFront();
1496
1497 ASSERT_TRUE(mapper1.Front() != nullptr);
1498 output1.emplace_back(std::move(*mapper1.Front()));
1499
1500 ASSERT_FALSE(mapper1.Front() == nullptr);
1501
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001502 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1503 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001504 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001505 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001506
1507 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1508 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001509 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001510 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh993ccb52020-12-12 15:59:32 -08001511 }
Austin Schuh79b30942021-01-24 22:32:21 -08001512
1513 EXPECT_EQ(mapper0_count, 0u);
1514 EXPECT_EQ(mapper1_count, 2u);
Austin Schuh993ccb52020-12-12 15:59:32 -08001515}
1516
Austin Schuhd2f96102020-12-01 20:27:29 -08001517// Tests that we properly sort log files with duplicate timestamps.
1518TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1519 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1520 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001521 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001522 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001523 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001524 writer1.QueueSpan(config2_.span());
1525
Austin Schuhd863e6e2022-10-16 15:44:50 -07001526 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001527 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001528 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001529 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1530
Austin Schuhd863e6e2022-10-16 15:44:50 -07001531 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001532 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001533 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001534 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1535
Austin Schuhd863e6e2022-10-16 15:44:50 -07001536 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001537 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001538 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001539 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1540
Austin Schuhd863e6e2022-10-16 15:44:50 -07001541 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001542 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001543 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001544 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1545 }
1546
1547 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1548
1549 ASSERT_EQ(parts[0].logger_node, "pi1");
1550 ASSERT_EQ(parts[1].logger_node, "pi2");
1551
Austin Schuh79b30942021-01-24 22:32:21 -08001552 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001553 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001554 mapper0.set_timestamp_callback(
1555 [&](TimestampedMessage *) { ++mapper0_count; });
1556 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001557 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001558 mapper1.set_timestamp_callback(
1559 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001560
1561 mapper0.AddPeer(&mapper1);
1562 mapper1.AddPeer(&mapper0);
1563
1564 {
1565 SCOPED_TRACE("Trying node1 now");
1566 std::deque<TimestampedMessage> output1;
1567
1568 for (int i = 0; i < 4; ++i) {
1569 ASSERT_TRUE(mapper1.Front() != nullptr);
1570 output1.emplace_back(std::move(*mapper1.Front()));
1571 mapper1.PopFront();
1572 }
1573 ASSERT_TRUE(mapper1.Front() == nullptr);
1574
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001575 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1576 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001577 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001578 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001579
1580 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1581 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001582 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001583 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001584
1585 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1586 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001587 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001588 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001589
1590 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1591 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001592 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001593 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001594 }
Austin Schuh79b30942021-01-24 22:32:21 -08001595
1596 EXPECT_EQ(mapper0_count, 0u);
1597 EXPECT_EQ(mapper1_count, 4u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001598}
1599
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001600// Tests that we properly produce a valid start time.
Austin Schuhd2f96102020-12-01 20:27:29 -08001601TEST_F(TimestampMapperTest, StartTime) {
1602 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1603 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001604 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001605 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001606 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001607 writer1.QueueSpan(config1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001608 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001609 writer2.QueueSpan(config3_.span());
1610 }
1611
1612 const std::vector<LogFile> parts =
1613 SortParts({logfile0_, logfile1_, logfile2_});
1614
Austin Schuh79b30942021-01-24 22:32:21 -08001615 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001616 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001617 mapper0.set_timestamp_callback(
1618 [&](TimestampedMessage *) { ++mapper0_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001619
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001620 EXPECT_EQ(mapper0.monotonic_start_time(0), e + chrono::milliseconds(1));
1621 EXPECT_EQ(mapper0.realtime_start_time(0),
Austin Schuhd2f96102020-12-01 20:27:29 -08001622 realtime_clock::time_point(chrono::seconds(1000)));
Austin Schuh79b30942021-01-24 22:32:21 -08001623 EXPECT_EQ(mapper0_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001624}
1625
Austin Schuhfecf1d82020-12-19 16:57:28 -08001626// Tests that when a peer isn't registered, we treat that as if there was no
1627// data available.
1628TEST_F(TimestampMapperTest, NoPeer) {
1629 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1630 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001631 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001632 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001633 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001634 writer1.QueueSpan(config2_.span());
1635
1636 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001637 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001638 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1639
Austin Schuhd863e6e2022-10-16 15:44:50 -07001640 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001641 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001642 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001643 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1644
Austin Schuhd863e6e2022-10-16 15:44:50 -07001645 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001646 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001647 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001648 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1649 }
1650
1651 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1652
1653 ASSERT_EQ(parts[0].logger_node, "pi1");
1654 ASSERT_EQ(parts[1].logger_node, "pi2");
1655
Austin Schuh79b30942021-01-24 22:32:21 -08001656 size_t mapper1_count = 0;
Austin Schuhfecf1d82020-12-19 16:57:28 -08001657 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001658 mapper1.set_timestamp_callback(
1659 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhfecf1d82020-12-19 16:57:28 -08001660
1661 {
1662 std::deque<TimestampedMessage> output1;
1663
1664 ASSERT_TRUE(mapper1.Front() != nullptr);
1665 output1.emplace_back(std::move(*mapper1.Front()));
1666 mapper1.PopFront();
1667 ASSERT_TRUE(mapper1.Front() != nullptr);
1668 output1.emplace_back(std::move(*mapper1.Front()));
1669 mapper1.PopFront();
1670 ASSERT_TRUE(mapper1.Front() != nullptr);
1671 output1.emplace_back(std::move(*mapper1.Front()));
1672 mapper1.PopFront();
1673 ASSERT_TRUE(mapper1.Front() == nullptr);
1674
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001675 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1676 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001677 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001678 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001679
1680 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1681 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001682 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001683 EXPECT_FALSE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001684
1685 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1686 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001687 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001688 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001689 }
Austin Schuh79b30942021-01-24 22:32:21 -08001690 EXPECT_EQ(mapper1_count, 3u);
1691}
1692
1693// Tests that we can queue messages and call the timestamp callback for both
1694// nodes.
1695TEST_F(TimestampMapperTest, QueueUntilNode0) {
1696 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1697 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001698 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh79b30942021-01-24 22:32:21 -08001699 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001700 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh79b30942021-01-24 22:32:21 -08001701 writer1.QueueSpan(config2_.span());
1702
Austin Schuhd863e6e2022-10-16 15:44:50 -07001703 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001704 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001705 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001706 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1707
Austin Schuhd863e6e2022-10-16 15:44:50 -07001708 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001709 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001710 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001711 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1712
Austin Schuhd863e6e2022-10-16 15:44:50 -07001713 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001714 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001715 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001716 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1717
Austin Schuhd863e6e2022-10-16 15:44:50 -07001718 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001719 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001720 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001721 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1722 }
1723
1724 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1725
1726 ASSERT_EQ(parts[0].logger_node, "pi1");
1727 ASSERT_EQ(parts[1].logger_node, "pi2");
1728
1729 size_t mapper0_count = 0;
1730 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1731 mapper0.set_timestamp_callback(
1732 [&](TimestampedMessage *) { ++mapper0_count; });
1733 size_t mapper1_count = 0;
1734 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1735 mapper1.set_timestamp_callback(
1736 [&](TimestampedMessage *) { ++mapper1_count; });
1737
1738 mapper0.AddPeer(&mapper1);
1739 mapper1.AddPeer(&mapper0);
1740
1741 {
1742 std::deque<TimestampedMessage> output0;
1743
1744 EXPECT_EQ(mapper0_count, 0u);
1745 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001746 mapper0.QueueUntil(
1747 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001748 EXPECT_EQ(mapper0_count, 3u);
1749 EXPECT_EQ(mapper1_count, 0u);
1750
1751 ASSERT_TRUE(mapper0.Front() != nullptr);
1752 EXPECT_EQ(mapper0_count, 3u);
1753 EXPECT_EQ(mapper1_count, 0u);
1754
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001755 mapper0.QueueUntil(
1756 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001757 EXPECT_EQ(mapper0_count, 3u);
1758 EXPECT_EQ(mapper1_count, 0u);
1759
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001760 mapper0.QueueUntil(
1761 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001762 EXPECT_EQ(mapper0_count, 4u);
1763 EXPECT_EQ(mapper1_count, 0u);
1764
1765 output0.emplace_back(std::move(*mapper0.Front()));
1766 mapper0.PopFront();
1767 output0.emplace_back(std::move(*mapper0.Front()));
1768 mapper0.PopFront();
1769 output0.emplace_back(std::move(*mapper0.Front()));
1770 mapper0.PopFront();
1771 output0.emplace_back(std::move(*mapper0.Front()));
1772 mapper0.PopFront();
1773
1774 EXPECT_EQ(mapper0_count, 4u);
1775 EXPECT_EQ(mapper1_count, 0u);
1776
1777 ASSERT_TRUE(mapper0.Front() == nullptr);
1778
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001779 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1780 EXPECT_EQ(output0[0].monotonic_event_time.time,
1781 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001782 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001783
1784 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1785 EXPECT_EQ(output0[1].monotonic_event_time.time,
1786 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001787 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001788
1789 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1790 EXPECT_EQ(output0[2].monotonic_event_time.time,
1791 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001792 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001793
1794 EXPECT_EQ(output0[3].monotonic_event_time.boot, 0u);
1795 EXPECT_EQ(output0[3].monotonic_event_time.time,
1796 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001797 EXPECT_TRUE(output0[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001798 }
1799
1800 {
1801 SCOPED_TRACE("Trying node1 now");
1802 std::deque<TimestampedMessage> output1;
1803
1804 EXPECT_EQ(mapper0_count, 4u);
1805 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001806 mapper1.QueueUntil(BootTimestamp{
1807 .boot = 0,
1808 .time = e + chrono::seconds(100) + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001809 EXPECT_EQ(mapper0_count, 4u);
1810 EXPECT_EQ(mapper1_count, 3u);
1811
1812 ASSERT_TRUE(mapper1.Front() != nullptr);
1813 EXPECT_EQ(mapper0_count, 4u);
1814 EXPECT_EQ(mapper1_count, 3u);
1815
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001816 mapper1.QueueUntil(BootTimestamp{
1817 .boot = 0,
1818 .time = e + chrono::seconds(100) + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001819 EXPECT_EQ(mapper0_count, 4u);
1820 EXPECT_EQ(mapper1_count, 3u);
1821
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001822 mapper1.QueueUntil(BootTimestamp{
1823 .boot = 0,
1824 .time = e + chrono::seconds(100) + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001825 EXPECT_EQ(mapper0_count, 4u);
1826 EXPECT_EQ(mapper1_count, 4u);
1827
1828 ASSERT_TRUE(mapper1.Front() != nullptr);
1829 EXPECT_EQ(mapper0_count, 4u);
1830 EXPECT_EQ(mapper1_count, 4u);
1831
1832 output1.emplace_back(std::move(*mapper1.Front()));
1833 mapper1.PopFront();
1834 ASSERT_TRUE(mapper1.Front() != nullptr);
1835 output1.emplace_back(std::move(*mapper1.Front()));
1836 mapper1.PopFront();
1837 ASSERT_TRUE(mapper1.Front() != nullptr);
1838 output1.emplace_back(std::move(*mapper1.Front()));
1839 mapper1.PopFront();
1840 ASSERT_TRUE(mapper1.Front() != nullptr);
1841 output1.emplace_back(std::move(*mapper1.Front()));
1842 mapper1.PopFront();
1843
1844 EXPECT_EQ(mapper0_count, 4u);
1845 EXPECT_EQ(mapper1_count, 4u);
1846
1847 ASSERT_TRUE(mapper1.Front() == nullptr);
1848
1849 EXPECT_EQ(mapper0_count, 4u);
1850 EXPECT_EQ(mapper1_count, 4u);
1851
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001852 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1853 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001854 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001855 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001856
1857 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1858 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001859 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001860 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001861
1862 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1863 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001864 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001865 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001866
1867 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1868 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001869 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001870 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001871 }
Austin Schuhfecf1d82020-12-19 16:57:28 -08001872}
1873
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001874class BootMergerTest : public SortingElementTest {
1875 public:
1876 BootMergerTest()
1877 : SortingElementTest(),
1878 boot0_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07001879 /* 100ms */
1880 "max_out_of_order_duration": 100000000,
1881 "node": {
1882 "name": "pi2"
1883 },
1884 "logger_node": {
1885 "name": "pi1"
1886 },
1887 "monotonic_start_time": 1000000,
1888 "realtime_start_time": 1000000000000,
1889 "logger_monotonic_start_time": 1000000,
1890 "logger_realtime_start_time": 1000000000000,
1891 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
1892 "parts_uuid": "1a9e5ca2-31b2-475b-8282-88f6d1ce5109",
1893 "parts_index": 0,
1894 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
1895 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07001896 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
1897 "boot_uuids": [
1898 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
1899 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
1900 ""
1901 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001902})")),
1903 boot1_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07001904 /* 100ms */
1905 "max_out_of_order_duration": 100000000,
1906 "node": {
1907 "name": "pi2"
1908 },
1909 "logger_node": {
1910 "name": "pi1"
1911 },
1912 "monotonic_start_time": 1000000,
1913 "realtime_start_time": 1000000000000,
1914 "logger_monotonic_start_time": 1000000,
1915 "logger_realtime_start_time": 1000000000000,
1916 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
1917 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
1918 "parts_index": 1,
1919 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
1920 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07001921 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
1922 "boot_uuids": [
1923 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
1924 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
1925 ""
1926 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001927})")) {}
Austin Schuh8bee6882021-06-28 21:03:28 -07001928
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001929 protected:
1930 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0_;
1931 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1_;
1932};
1933
1934// This tests that we can properly sort a multi-node log file which has the old
1935// (and buggy) timestamps in the header, and the non-resetting parts_index.
1936// These make it so we can just bairly figure out what happened first and what
1937// happened second, but not in a way that is robust to multiple nodes rebooting.
1938TEST_F(BootMergerTest, OldReboot) {
Austin Schuh8bee6882021-06-28 21:03:28 -07001939 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001940 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001941 writer.QueueSpan(boot0_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07001942 }
1943 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001944 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001945 writer.QueueSpan(boot1_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07001946 }
1947
1948 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1949
1950 ASSERT_EQ(parts.size(), 1u);
1951 ASSERT_EQ(parts[0].parts.size(), 2u);
1952
1953 EXPECT_EQ(parts[0].parts[0].boot_count, 0);
1954 EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001955 boot0_.message().source_node_boot_uuid()->string_view());
Austin Schuh8bee6882021-06-28 21:03:28 -07001956
1957 EXPECT_EQ(parts[0].parts[1].boot_count, 1);
1958 EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001959 boot1_.message().source_node_boot_uuid()->string_view());
1960}
1961
1962// This tests that we can produce messages ordered across a reboot.
1963TEST_F(BootMergerTest, SortAcrossReboot) {
1964 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1965 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001966 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001967 writer.QueueSpan(boot0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001968 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001969 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001970 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001971 MakeLogMessage(e + chrono::milliseconds(2000), 1, 0x105));
1972 }
1973 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001974 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001975 writer.QueueSpan(boot1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001976 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001977 MakeLogMessage(e + chrono::milliseconds(100), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001978 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001979 MakeLogMessage(e + chrono::milliseconds(200), 1, 0x106));
1980 }
1981
1982 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1983 ASSERT_EQ(parts.size(), 1u);
1984 ASSERT_EQ(parts[0].parts.size(), 2u);
1985
1986 BootMerger merger(FilterPartsForNode(parts, "pi2"));
1987
1988 EXPECT_EQ(merger.node(), 1u);
1989
1990 std::vector<Message> output;
1991 for (int i = 0; i < 4; ++i) {
1992 ASSERT_TRUE(merger.Front() != nullptr);
1993 output.emplace_back(std::move(*merger.Front()));
1994 merger.PopFront();
1995 }
1996
1997 ASSERT_TRUE(merger.Front() == nullptr);
1998
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001999 EXPECT_EQ(output[0].timestamp.boot, 0u);
2000 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
2001 EXPECT_EQ(output[1].timestamp.boot, 0u);
2002 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(2000));
2003
2004 EXPECT_EQ(output[2].timestamp.boot, 1u);
2005 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(100));
2006 EXPECT_EQ(output[3].timestamp.boot, 1u);
2007 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
Austin Schuh8bee6882021-06-28 21:03:28 -07002008}
2009
Austin Schuh48507722021-07-17 17:29:24 -07002010class RebootTimestampMapperTest : public SortingElementTest {
2011 public:
2012 RebootTimestampMapperTest()
2013 : SortingElementTest(),
2014 boot0a_(MakeHeader(config_, R"({
2015 /* 100ms */
2016 "max_out_of_order_duration": 100000000,
2017 "node": {
2018 "name": "pi1"
2019 },
2020 "logger_node": {
2021 "name": "pi1"
2022 },
2023 "monotonic_start_time": 1000000,
2024 "realtime_start_time": 1000000000000,
2025 "logger_monotonic_start_time": 1000000,
2026 "logger_realtime_start_time": 1000000000000,
2027 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2028 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2029 "parts_index": 0,
2030 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2031 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2032 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2033 "boot_uuids": [
2034 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2035 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2036 ""
2037 ]
2038})")),
2039 boot0b_(MakeHeader(config_, R"({
2040 /* 100ms */
2041 "max_out_of_order_duration": 100000000,
2042 "node": {
2043 "name": "pi1"
2044 },
2045 "logger_node": {
2046 "name": "pi1"
2047 },
2048 "monotonic_start_time": 1000000,
2049 "realtime_start_time": 1000000000000,
2050 "logger_monotonic_start_time": 1000000,
2051 "logger_realtime_start_time": 1000000000000,
2052 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2053 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2054 "parts_index": 1,
2055 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2056 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2057 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2058 "boot_uuids": [
2059 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2060 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2061 ""
2062 ]
2063})")),
2064 boot1a_(MakeHeader(config_, R"({
2065 /* 100ms */
2066 "max_out_of_order_duration": 100000000,
2067 "node": {
2068 "name": "pi2"
2069 },
2070 "logger_node": {
2071 "name": "pi1"
2072 },
2073 "monotonic_start_time": 1000000,
2074 "realtime_start_time": 1000000000000,
2075 "logger_monotonic_start_time": 1000000,
2076 "logger_realtime_start_time": 1000000000000,
2077 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2078 "parts_uuid": "f6ab0cdc-a654-456d-bfd9-2bbc09098edf",
2079 "parts_index": 0,
2080 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2081 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2082 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2083 "boot_uuids": [
2084 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2085 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2086 ""
2087 ]
2088})")),
2089 boot1b_(MakeHeader(config_, R"({
2090 /* 100ms */
2091 "max_out_of_order_duration": 100000000,
2092 "node": {
2093 "name": "pi2"
2094 },
2095 "logger_node": {
2096 "name": "pi1"
2097 },
2098 "monotonic_start_time": 1000000,
2099 "realtime_start_time": 1000000000000,
2100 "logger_monotonic_start_time": 1000000,
2101 "logger_realtime_start_time": 1000000000000,
2102 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2103 "parts_uuid": "3a9d0445-f520-43ca-93f5-e2cc7f54d40a",
2104 "parts_index": 1,
2105 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2106 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2107 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2108 "boot_uuids": [
2109 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2110 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2111 ""
2112 ]
2113})")) {}
2114
2115 protected:
2116 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0a_;
2117 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0b_;
2118 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1a_;
2119 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
2120};
2121
Austin Schuh48507722021-07-17 17:29:24 -07002122// Tests that we can match timestamps on delivered messages in the presence of
2123// reboots on the node receiving timestamps.
2124TEST_F(RebootTimestampMapperTest, ReadNode0First) {
2125 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2126 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002127 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002128 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002129 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002130 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002131 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002132 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002133 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002134 writer1b.QueueSpan(boot1b_.span());
2135
Austin Schuhd863e6e2022-10-16 15:44:50 -07002136 writer0a.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002137 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002138 writer1a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002139 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
2140 e + chrono::milliseconds(1001)));
2141
Austin Schuhd863e6e2022-10-16 15:44:50 -07002142 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh58646e22021-08-23 23:51:46 -07002143 e + chrono::milliseconds(1000), 0, chrono::seconds(21),
2144 e + chrono::milliseconds(2001)));
2145
Austin Schuhd863e6e2022-10-16 15:44:50 -07002146 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002147 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002148 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002149 e + chrono::milliseconds(2000), 0, chrono::seconds(20),
2150 e + chrono::milliseconds(2001)));
2151
Austin Schuhd863e6e2022-10-16 15:44:50 -07002152 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002153 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002154 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002155 e + chrono::milliseconds(3000), 0, chrono::seconds(20),
2156 e + chrono::milliseconds(3001)));
2157 }
2158
Austin Schuh58646e22021-08-23 23:51:46 -07002159 const std::vector<LogFile> parts =
2160 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Austin Schuh48507722021-07-17 17:29:24 -07002161
2162 for (const auto &x : parts) {
2163 LOG(INFO) << x;
2164 }
2165 ASSERT_EQ(parts.size(), 1u);
2166 ASSERT_EQ(parts[0].logger_node, "pi1");
2167
2168 size_t mapper0_count = 0;
2169 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
2170 mapper0.set_timestamp_callback(
2171 [&](TimestampedMessage *) { ++mapper0_count; });
2172 size_t mapper1_count = 0;
2173 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
2174 mapper1.set_timestamp_callback(
2175 [&](TimestampedMessage *) { ++mapper1_count; });
2176
2177 mapper0.AddPeer(&mapper1);
2178 mapper1.AddPeer(&mapper0);
2179
2180 {
2181 std::deque<TimestampedMessage> output0;
2182
2183 EXPECT_EQ(mapper0_count, 0u);
2184 EXPECT_EQ(mapper1_count, 0u);
2185 ASSERT_TRUE(mapper0.Front() != nullptr);
2186 EXPECT_EQ(mapper0_count, 1u);
2187 EXPECT_EQ(mapper1_count, 0u);
2188 output0.emplace_back(std::move(*mapper0.Front()));
2189 mapper0.PopFront();
2190 EXPECT_TRUE(mapper0.started());
2191 EXPECT_EQ(mapper0_count, 1u);
2192 EXPECT_EQ(mapper1_count, 0u);
2193
2194 ASSERT_TRUE(mapper0.Front() != nullptr);
2195 EXPECT_EQ(mapper0_count, 2u);
2196 EXPECT_EQ(mapper1_count, 0u);
2197 output0.emplace_back(std::move(*mapper0.Front()));
2198 mapper0.PopFront();
2199 EXPECT_TRUE(mapper0.started());
2200
2201 ASSERT_TRUE(mapper0.Front() != nullptr);
2202 output0.emplace_back(std::move(*mapper0.Front()));
2203 mapper0.PopFront();
2204 EXPECT_TRUE(mapper0.started());
2205
2206 EXPECT_EQ(mapper0_count, 3u);
2207 EXPECT_EQ(mapper1_count, 0u);
2208
2209 ASSERT_TRUE(mapper0.Front() == nullptr);
2210
2211 LOG(INFO) << output0[0];
2212 LOG(INFO) << output0[1];
2213 LOG(INFO) << output0[2];
2214
2215 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2216 EXPECT_EQ(output0[0].monotonic_event_time.time,
2217 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002218 EXPECT_EQ(output0[0].queue_index,
2219 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002220 EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
2221 EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002222 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002223
2224 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2225 EXPECT_EQ(output0[1].monotonic_event_time.time,
2226 e + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002227 EXPECT_EQ(output0[1].queue_index,
2228 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002229 EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
2230 EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002231 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002232
2233 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2234 EXPECT_EQ(output0[2].monotonic_event_time.time,
2235 e + chrono::milliseconds(3000));
Austin Schuh58646e22021-08-23 23:51:46 -07002236 EXPECT_EQ(output0[2].queue_index,
2237 (BootQueueIndex{.boot = 0u, .index = 2u}));
Austin Schuh48507722021-07-17 17:29:24 -07002238 EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
2239 EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002240 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002241 }
2242
2243 {
2244 SCOPED_TRACE("Trying node1 now");
2245 std::deque<TimestampedMessage> output1;
2246
2247 EXPECT_EQ(mapper0_count, 3u);
2248 EXPECT_EQ(mapper1_count, 0u);
2249
2250 ASSERT_TRUE(mapper1.Front() != nullptr);
2251 EXPECT_EQ(mapper0_count, 3u);
2252 EXPECT_EQ(mapper1_count, 1u);
2253 output1.emplace_back(std::move(*mapper1.Front()));
2254 mapper1.PopFront();
2255 EXPECT_TRUE(mapper1.started());
2256 EXPECT_EQ(mapper0_count, 3u);
2257 EXPECT_EQ(mapper1_count, 1u);
2258
2259 ASSERT_TRUE(mapper1.Front() != nullptr);
2260 EXPECT_EQ(mapper0_count, 3u);
2261 EXPECT_EQ(mapper1_count, 2u);
2262 output1.emplace_back(std::move(*mapper1.Front()));
2263 mapper1.PopFront();
2264 EXPECT_TRUE(mapper1.started());
2265
2266 ASSERT_TRUE(mapper1.Front() != nullptr);
2267 output1.emplace_back(std::move(*mapper1.Front()));
2268 mapper1.PopFront();
2269 EXPECT_TRUE(mapper1.started());
2270
Austin Schuh58646e22021-08-23 23:51:46 -07002271 ASSERT_TRUE(mapper1.Front() != nullptr);
2272 output1.emplace_back(std::move(*mapper1.Front()));
2273 mapper1.PopFront();
2274 EXPECT_TRUE(mapper1.started());
2275
Austin Schuh48507722021-07-17 17:29:24 -07002276 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002277 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002278
2279 ASSERT_TRUE(mapper1.Front() == nullptr);
2280
2281 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002282 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002283
2284 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2285 EXPECT_EQ(output1[0].monotonic_event_time.time,
2286 e + chrono::seconds(100) + chrono::milliseconds(1000));
2287 EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
2288 EXPECT_EQ(output1[0].monotonic_remote_time.time,
2289 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002290 EXPECT_EQ(output1[0].remote_queue_index,
2291 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002292 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
2293 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
2294 e + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002295 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002296
2297 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2298 EXPECT_EQ(output1[1].monotonic_event_time.time,
2299 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002300 EXPECT_EQ(output1[1].remote_queue_index,
2301 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002302 EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
2303 EXPECT_EQ(output1[1].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002304 e + chrono::milliseconds(1000));
Austin Schuh48507722021-07-17 17:29:24 -07002305 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
2306 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
2307 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002308 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002309
2310 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2311 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002312 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh48507722021-07-17 17:29:24 -07002313 EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
2314 EXPECT_EQ(output1[2].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002315 e + chrono::milliseconds(2000));
2316 EXPECT_EQ(output1[2].remote_queue_index,
2317 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002318 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
2319 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002320 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002321 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002322
Austin Schuh58646e22021-08-23 23:51:46 -07002323 EXPECT_EQ(output1[3].monotonic_event_time.boot, 1u);
2324 EXPECT_EQ(output1[3].monotonic_event_time.time,
2325 e + chrono::seconds(20) + chrono::milliseconds(3000));
2326 EXPECT_EQ(output1[3].monotonic_remote_time.boot, 0u);
2327 EXPECT_EQ(output1[3].monotonic_remote_time.time,
2328 e + chrono::milliseconds(3000));
2329 EXPECT_EQ(output1[3].remote_queue_index,
2330 (BootQueueIndex{.boot = 0u, .index = 2u}));
2331 EXPECT_EQ(output1[3].monotonic_timestamp_time.boot, 0u);
2332 EXPECT_EQ(output1[3].monotonic_timestamp_time.time,
2333 e + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002334 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh58646e22021-08-23 23:51:46 -07002335
Austin Schuh48507722021-07-17 17:29:24 -07002336 LOG(INFO) << output1[0];
2337 LOG(INFO) << output1[1];
2338 LOG(INFO) << output1[2];
Austin Schuh58646e22021-08-23 23:51:46 -07002339 LOG(INFO) << output1[3];
Austin Schuh48507722021-07-17 17:29:24 -07002340 }
2341}
2342
2343TEST_F(RebootTimestampMapperTest, Node2Reboot) {
2344 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2345 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002346 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002347 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002348 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002349 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002350 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002351 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002352 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002353 writer1b.QueueSpan(boot1b_.span());
2354
Austin Schuhd863e6e2022-10-16 15:44:50 -07002355 writer1a.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002356 e + chrono::seconds(100) + chrono::milliseconds(1000), 3, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002357 writer0a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002358 e + chrono::seconds(100) + chrono::milliseconds(1000), 3,
2359 chrono::seconds(-100),
2360 e + chrono::seconds(100) + chrono::milliseconds(1001)));
2361
Austin Schuhd863e6e2022-10-16 15:44:50 -07002362 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002363 e + chrono::seconds(20) + chrono::milliseconds(2000), 3, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002364 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002365 e + chrono::seconds(20) + chrono::milliseconds(2000), 3,
2366 chrono::seconds(-20),
2367 e + chrono::seconds(20) + chrono::milliseconds(2001)));
2368
Austin Schuhd863e6e2022-10-16 15:44:50 -07002369 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002370 e + chrono::seconds(20) + chrono::milliseconds(3000), 3, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002371 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002372 e + chrono::seconds(20) + chrono::milliseconds(3000), 3,
2373 chrono::seconds(-20),
2374 e + chrono::seconds(20) + chrono::milliseconds(3001)));
2375 }
2376
2377 const std::vector<LogFile> parts =
2378 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2379
2380 for (const auto &x : parts) {
2381 LOG(INFO) << x;
2382 }
2383 ASSERT_EQ(parts.size(), 1u);
2384 ASSERT_EQ(parts[0].logger_node, "pi1");
2385
2386 size_t mapper0_count = 0;
2387 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
2388 mapper0.set_timestamp_callback(
2389 [&](TimestampedMessage *) { ++mapper0_count; });
2390 size_t mapper1_count = 0;
2391 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
2392 mapper1.set_timestamp_callback(
2393 [&](TimestampedMessage *) { ++mapper1_count; });
2394
2395 mapper0.AddPeer(&mapper1);
2396 mapper1.AddPeer(&mapper0);
2397
2398 {
2399 std::deque<TimestampedMessage> output0;
2400
2401 EXPECT_EQ(mapper0_count, 0u);
2402 EXPECT_EQ(mapper1_count, 0u);
2403 ASSERT_TRUE(mapper0.Front() != nullptr);
2404 EXPECT_EQ(mapper0_count, 1u);
2405 EXPECT_EQ(mapper1_count, 0u);
2406 output0.emplace_back(std::move(*mapper0.Front()));
2407 mapper0.PopFront();
2408 EXPECT_TRUE(mapper0.started());
2409 EXPECT_EQ(mapper0_count, 1u);
2410 EXPECT_EQ(mapper1_count, 0u);
2411
2412 ASSERT_TRUE(mapper0.Front() != nullptr);
2413 EXPECT_EQ(mapper0_count, 2u);
2414 EXPECT_EQ(mapper1_count, 0u);
2415 output0.emplace_back(std::move(*mapper0.Front()));
2416 mapper0.PopFront();
2417 EXPECT_TRUE(mapper0.started());
2418
2419 ASSERT_TRUE(mapper0.Front() != nullptr);
2420 output0.emplace_back(std::move(*mapper0.Front()));
2421 mapper0.PopFront();
2422 EXPECT_TRUE(mapper0.started());
2423
2424 EXPECT_EQ(mapper0_count, 3u);
2425 EXPECT_EQ(mapper1_count, 0u);
2426
2427 ASSERT_TRUE(mapper0.Front() == nullptr);
2428
2429 LOG(INFO) << output0[0];
2430 LOG(INFO) << output0[1];
2431 LOG(INFO) << output0[2];
2432
2433 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2434 EXPECT_EQ(output0[0].monotonic_event_time.time,
2435 e + chrono::milliseconds(1000));
2436 EXPECT_EQ(output0[0].monotonic_remote_time.boot, 0u);
2437 EXPECT_EQ(output0[0].monotonic_remote_time.time,
2438 e + chrono::seconds(100) + chrono::milliseconds(1000));
2439 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
2440 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
2441 e + chrono::seconds(100) + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002442 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002443
2444 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2445 EXPECT_EQ(output0[1].monotonic_event_time.time,
2446 e + chrono::milliseconds(2000));
2447 EXPECT_EQ(output0[1].monotonic_remote_time.boot, 1u);
2448 EXPECT_EQ(output0[1].monotonic_remote_time.time,
2449 e + chrono::seconds(20) + chrono::milliseconds(2000));
2450 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
2451 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
2452 e + chrono::seconds(20) + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002453 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002454
2455 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2456 EXPECT_EQ(output0[2].monotonic_event_time.time,
2457 e + chrono::milliseconds(3000));
2458 EXPECT_EQ(output0[2].monotonic_remote_time.boot, 1u);
2459 EXPECT_EQ(output0[2].monotonic_remote_time.time,
2460 e + chrono::seconds(20) + chrono::milliseconds(3000));
2461 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
2462 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
2463 e + chrono::seconds(20) + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002464 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002465 }
2466
2467 {
2468 SCOPED_TRACE("Trying node1 now");
2469 std::deque<TimestampedMessage> output1;
2470
2471 EXPECT_EQ(mapper0_count, 3u);
2472 EXPECT_EQ(mapper1_count, 0u);
2473
2474 ASSERT_TRUE(mapper1.Front() != nullptr);
2475 EXPECT_EQ(mapper0_count, 3u);
2476 EXPECT_EQ(mapper1_count, 1u);
2477 output1.emplace_back(std::move(*mapper1.Front()));
2478 mapper1.PopFront();
2479 EXPECT_TRUE(mapper1.started());
2480 EXPECT_EQ(mapper0_count, 3u);
2481 EXPECT_EQ(mapper1_count, 1u);
2482
2483 ASSERT_TRUE(mapper1.Front() != nullptr);
2484 EXPECT_EQ(mapper0_count, 3u);
2485 EXPECT_EQ(mapper1_count, 2u);
2486 output1.emplace_back(std::move(*mapper1.Front()));
2487 mapper1.PopFront();
2488 EXPECT_TRUE(mapper1.started());
2489
2490 ASSERT_TRUE(mapper1.Front() != nullptr);
2491 output1.emplace_back(std::move(*mapper1.Front()));
2492 mapper1.PopFront();
2493 EXPECT_TRUE(mapper1.started());
2494
2495 EXPECT_EQ(mapper0_count, 3u);
2496 EXPECT_EQ(mapper1_count, 3u);
2497
2498 ASSERT_TRUE(mapper1.Front() == nullptr);
2499
2500 EXPECT_EQ(mapper0_count, 3u);
2501 EXPECT_EQ(mapper1_count, 3u);
2502
2503 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2504 EXPECT_EQ(output1[0].monotonic_event_time.time,
2505 e + chrono::seconds(100) + chrono::milliseconds(1000));
2506 EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
2507 EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002508 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002509
2510 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2511 EXPECT_EQ(output1[1].monotonic_event_time.time,
2512 e + chrono::seconds(20) + chrono::milliseconds(2000));
2513 EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
2514 EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002515 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002516
2517 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2518 EXPECT_EQ(output1[2].monotonic_event_time.time,
2519 e + chrono::seconds(20) + chrono::milliseconds(3000));
2520 EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
2521 EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002522 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002523
2524 LOG(INFO) << output1[0];
2525 LOG(INFO) << output1[1];
2526 LOG(INFO) << output1[2];
2527 }
2528}
2529
Austin Schuh44c61472021-11-22 21:04:10 -08002530class SortingDeathTest : public SortingElementTest {
2531 public:
2532 SortingDeathTest()
2533 : SortingElementTest(),
2534 part0_(MakeHeader(config_, R"({
2535 /* 100ms */
2536 "max_out_of_order_duration": 100000000,
2537 "node": {
2538 "name": "pi1"
2539 },
2540 "logger_node": {
2541 "name": "pi1"
2542 },
2543 "monotonic_start_time": 1000000,
2544 "realtime_start_time": 1000000000000,
2545 "logger_monotonic_start_time": 1000000,
2546 "logger_realtime_start_time": 1000000000000,
2547 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2548 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2549 "parts_index": 0,
2550 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2551 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2552 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2553 "boot_uuids": [
2554 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2555 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2556 ""
2557 ],
2558 "oldest_remote_monotonic_timestamps": [
2559 9223372036854775807,
2560 9223372036854775807,
2561 9223372036854775807
2562 ],
2563 "oldest_local_monotonic_timestamps": [
2564 9223372036854775807,
2565 9223372036854775807,
2566 9223372036854775807
2567 ],
2568 "oldest_remote_unreliable_monotonic_timestamps": [
2569 9223372036854775807,
2570 0,
2571 9223372036854775807
2572 ],
2573 "oldest_local_unreliable_monotonic_timestamps": [
2574 9223372036854775807,
2575 0,
2576 9223372036854775807
2577 ]
2578})")),
2579 part1_(MakeHeader(config_, R"({
2580 /* 100ms */
2581 "max_out_of_order_duration": 100000000,
2582 "node": {
2583 "name": "pi1"
2584 },
2585 "logger_node": {
2586 "name": "pi1"
2587 },
2588 "monotonic_start_time": 1000000,
2589 "realtime_start_time": 1000000000000,
2590 "logger_monotonic_start_time": 1000000,
2591 "logger_realtime_start_time": 1000000000000,
2592 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2593 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2594 "parts_index": 1,
2595 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2596 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2597 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2598 "boot_uuids": [
2599 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2600 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2601 ""
2602 ],
2603 "oldest_remote_monotonic_timestamps": [
2604 9223372036854775807,
2605 9223372036854775807,
2606 9223372036854775807
2607 ],
2608 "oldest_local_monotonic_timestamps": [
2609 9223372036854775807,
2610 9223372036854775807,
2611 9223372036854775807
2612 ],
2613 "oldest_remote_unreliable_monotonic_timestamps": [
2614 9223372036854775807,
2615 100000,
2616 9223372036854775807
2617 ],
2618 "oldest_local_unreliable_monotonic_timestamps": [
2619 9223372036854775807,
2620 100000,
2621 9223372036854775807
2622 ]
2623})")),
2624 part2_(MakeHeader(config_, R"({
2625 /* 100ms */
2626 "max_out_of_order_duration": 100000000,
2627 "node": {
2628 "name": "pi1"
2629 },
2630 "logger_node": {
2631 "name": "pi1"
2632 },
2633 "monotonic_start_time": 1000000,
2634 "realtime_start_time": 1000000000000,
2635 "logger_monotonic_start_time": 1000000,
2636 "logger_realtime_start_time": 1000000000000,
2637 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2638 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2639 "parts_index": 2,
2640 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2641 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2642 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2643 "boot_uuids": [
2644 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2645 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2646 ""
2647 ],
2648 "oldest_remote_monotonic_timestamps": [
2649 9223372036854775807,
2650 9223372036854775807,
2651 9223372036854775807
2652 ],
2653 "oldest_local_monotonic_timestamps": [
2654 9223372036854775807,
2655 9223372036854775807,
2656 9223372036854775807
2657 ],
2658 "oldest_remote_unreliable_monotonic_timestamps": [
2659 9223372036854775807,
2660 200000,
2661 9223372036854775807
2662 ],
2663 "oldest_local_unreliable_monotonic_timestamps": [
2664 9223372036854775807,
2665 200000,
2666 9223372036854775807
2667 ]
2668})")),
2669 part3_(MakeHeader(config_, R"({
2670 /* 100ms */
2671 "max_out_of_order_duration": 100000000,
2672 "node": {
2673 "name": "pi1"
2674 },
2675 "logger_node": {
2676 "name": "pi1"
2677 },
2678 "monotonic_start_time": 1000000,
2679 "realtime_start_time": 1000000000000,
2680 "logger_monotonic_start_time": 1000000,
2681 "logger_realtime_start_time": 1000000000000,
2682 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2683 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2684 "parts_index": 3,
2685 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2686 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2687 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2688 "boot_uuids": [
2689 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2690 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2691 ""
2692 ],
2693 "oldest_remote_monotonic_timestamps": [
2694 9223372036854775807,
2695 9223372036854775807,
2696 9223372036854775807
2697 ],
2698 "oldest_local_monotonic_timestamps": [
2699 9223372036854775807,
2700 9223372036854775807,
2701 9223372036854775807
2702 ],
2703 "oldest_remote_unreliable_monotonic_timestamps": [
2704 9223372036854775807,
2705 300000,
2706 9223372036854775807
2707 ],
2708 "oldest_local_unreliable_monotonic_timestamps": [
2709 9223372036854775807,
2710 300000,
2711 9223372036854775807
2712 ]
2713})")) {}
2714
2715 protected:
2716 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part0_;
2717 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part1_;
2718 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part2_;
2719 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part3_;
2720};
2721
2722// Tests that if 2 computers go back and forth trying to be the same node, we
2723// die in sorting instead of failing to estimate time.
2724TEST_F(SortingDeathTest, FightingNodes) {
2725 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002726 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh44c61472021-11-22 21:04:10 -08002727 writer0.QueueSpan(part0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002728 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh44c61472021-11-22 21:04:10 -08002729 writer1.QueueSpan(part1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002730 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuh44c61472021-11-22 21:04:10 -08002731 writer2.QueueSpan(part2_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002732 TestDetachedBufferWriter writer3(logfile3_);
Austin Schuh44c61472021-11-22 21:04:10 -08002733 writer3.QueueSpan(part3_.span());
2734 }
2735
2736 EXPECT_DEATH(
2737 {
2738 const std::vector<LogFile> parts =
2739 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2740 },
Austin Schuh22cf7862022-09-19 19:09:42 -07002741 "found overlapping boots on");
Austin Schuh44c61472021-11-22 21:04:10 -08002742}
2743
Brian Smarttea913d42021-12-10 15:02:38 -08002744// Tests that we MessageReader blows up on a bad message.
2745TEST(MessageReaderConfirmCrash, ReadWrite) {
2746 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
2747 unlink(logfile.c_str());
2748
2749 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
2750 JsonToSizedFlatbuffer<LogFileHeader>(
2751 R"({ "max_out_of_order_duration": 100000000 })");
2752 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
2753 JsonToSizedFlatbuffer<MessageHeader>(
2754 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
2755 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
2756 JsonToSizedFlatbuffer<MessageHeader>(
2757 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
2758 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m4 =
2759 JsonToSizedFlatbuffer<MessageHeader>(
2760 R"({ "channel_index": 0, "monotonic_sent_time": 4 })");
2761
2762 // Starts out like a proper flat buffer header, but it breaks down ...
2763 std::vector<uint8_t> garbage{8, 0, 0, 0, 16, 0, 0, 0, 4, 0, 0, 0};
2764 absl::Span<uint8_t> m3_span(garbage);
2765
2766 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002767 TestDetachedBufferWriter writer(logfile);
Brian Smarttea913d42021-12-10 15:02:38 -08002768 writer.QueueSpan(config.span());
2769 writer.QueueSpan(m1.span());
2770 writer.QueueSpan(m2.span());
2771 writer.QueueSpan(m3_span);
2772 writer.QueueSpan(m4.span()); // This message is "hidden"
2773 }
2774
2775 {
2776 MessageReader reader(logfile);
2777
2778 EXPECT_EQ(reader.filename(), logfile);
2779
2780 EXPECT_EQ(
2781 reader.max_out_of_order_duration(),
2782 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2783 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2784 EXPECT_TRUE(reader.ReadMessage());
2785 EXPECT_EQ(reader.newest_timestamp(),
2786 monotonic_clock::time_point(chrono::nanoseconds(1)));
2787 EXPECT_TRUE(reader.ReadMessage());
2788 EXPECT_EQ(reader.newest_timestamp(),
2789 monotonic_clock::time_point(chrono::nanoseconds(2)));
2790 // Confirm default crashing behavior
2791 EXPECT_DEATH(reader.ReadMessage(), "Corrupted message at offset");
2792 }
2793
2794 {
2795 gflags::FlagSaver fs;
2796
2797 MessageReader reader(logfile);
2798 reader.set_crash_on_corrupt_message_flag(false);
2799
2800 EXPECT_EQ(reader.filename(), logfile);
2801
2802 EXPECT_EQ(
2803 reader.max_out_of_order_duration(),
2804 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2805 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2806 EXPECT_TRUE(reader.ReadMessage());
2807 EXPECT_EQ(reader.newest_timestamp(),
2808 monotonic_clock::time_point(chrono::nanoseconds(1)));
2809 EXPECT_TRUE(reader.ReadMessage());
2810 EXPECT_EQ(reader.newest_timestamp(),
2811 monotonic_clock::time_point(chrono::nanoseconds(2)));
2812 // Confirm avoiding the corrupted message crash, stopping instead.
2813 EXPECT_FALSE(reader.ReadMessage());
2814 }
2815
2816 {
2817 gflags::FlagSaver fs;
2818
2819 MessageReader reader(logfile);
2820 reader.set_crash_on_corrupt_message_flag(false);
2821 reader.set_ignore_corrupt_messages_flag(true);
2822
2823 EXPECT_EQ(reader.filename(), logfile);
2824
2825 EXPECT_EQ(
2826 reader.max_out_of_order_duration(),
2827 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2828 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2829 EXPECT_TRUE(reader.ReadMessage());
2830 EXPECT_EQ(reader.newest_timestamp(),
2831 monotonic_clock::time_point(chrono::nanoseconds(1)));
2832 EXPECT_TRUE(reader.ReadMessage());
2833 EXPECT_EQ(reader.newest_timestamp(),
2834 monotonic_clock::time_point(chrono::nanoseconds(2)));
2835 // Confirm skipping of the corrupted message to read the hidden one.
2836 EXPECT_TRUE(reader.ReadMessage());
2837 EXPECT_EQ(reader.newest_timestamp(),
2838 monotonic_clock::time_point(chrono::nanoseconds(4)));
2839 EXPECT_FALSE(reader.ReadMessage());
2840 }
2841}
2842
Austin Schuhfa30c352022-10-16 11:12:02 -07002843class InlinePackMessage : public ::testing::Test {
2844 protected:
2845 aos::Context RandomContext() {
2846 data_ = RandomData();
2847 std::uniform_int_distribution<uint32_t> uint32_distribution(
2848 std::numeric_limits<uint32_t>::min(),
2849 std::numeric_limits<uint32_t>::max());
2850
2851 std::uniform_int_distribution<int64_t> time_distribution(
2852 std::numeric_limits<int64_t>::min(),
2853 std::numeric_limits<int64_t>::max());
2854
2855 aos::Context context;
2856 context.monotonic_event_time =
2857 aos::monotonic_clock::epoch() +
2858 chrono::nanoseconds(time_distribution(random_number_generator_));
2859 context.realtime_event_time =
2860 aos::realtime_clock::epoch() +
2861 chrono::nanoseconds(time_distribution(random_number_generator_));
2862
2863 context.monotonic_remote_time =
2864 aos::monotonic_clock::epoch() +
2865 chrono::nanoseconds(time_distribution(random_number_generator_));
2866 context.realtime_remote_time =
2867 aos::realtime_clock::epoch() +
2868 chrono::nanoseconds(time_distribution(random_number_generator_));
2869
2870 context.queue_index = uint32_distribution(random_number_generator_);
2871 context.remote_queue_index = uint32_distribution(random_number_generator_);
2872 context.size = data_.size();
2873 context.data = data_.data();
2874 return context;
2875 }
2876
2877 std::vector<uint8_t> RandomData() {
2878 std::vector<uint8_t> result;
2879 std::uniform_int_distribution<int> length_distribution(1, 32);
2880 std::uniform_int_distribution<uint8_t> data_distribution(
2881 std::numeric_limits<uint8_t>::min(),
2882 std::numeric_limits<uint8_t>::max());
2883
2884 const size_t length = length_distribution(random_number_generator_);
2885
2886 result.reserve(length);
2887 for (size_t i = 0; i < length; ++i) {
2888 result.emplace_back(data_distribution(random_number_generator_));
2889 }
2890 return result;
2891 }
2892
2893 std::mt19937 random_number_generator_{
2894 std::mt19937(::aos::testing::RandomSeed())};
2895
2896 std::vector<uint8_t> data_;
2897};
2898
2899// Uses the binary schema to annotate a provided flatbuffer. Returns the
2900// annotated flatbuffer.
2901std::string AnnotateBinaries(
2902 const aos::NonSizePrefixedFlatbuffer<reflection::Schema> &schema,
2903 const std::string &schema_filename,
2904 flatbuffers::span<uint8_t> binary_data) {
2905 flatbuffers::BinaryAnnotator binary_annotator(
2906 schema.span().data(), schema.span().size(), binary_data.data(),
2907 binary_data.size());
2908
2909 auto annotations = binary_annotator.Annotate();
2910
2911 flatbuffers::AnnotatedBinaryTextGenerator text_generator(
2912 flatbuffers::AnnotatedBinaryTextGenerator::Options{}, annotations,
2913 binary_data.data(), binary_data.size());
2914
2915 text_generator.Generate(aos::testing::TestTmpDir() + "/foo.bfbs",
2916 schema_filename);
2917
2918 return aos::util::ReadFileToStringOrDie(aos::testing::TestTmpDir() +
2919 "/foo.afb");
2920}
2921
2922// Tests that all variations of PackMessage are equivalent to the inline
2923// PackMessage used to avoid allocations.
2924TEST_F(InlinePackMessage, Equivilent) {
2925 std::uniform_int_distribution<uint32_t> uint32_distribution(
2926 std::numeric_limits<uint32_t>::min(),
2927 std::numeric_limits<uint32_t>::max());
2928 aos::FlatbufferVector<reflection::Schema> schema =
2929 FileToFlatbuffer<reflection::Schema>(
2930 ArtifactPath("aos/events/logging/logger.bfbs"));
2931
2932 for (const LogType type :
2933 {LogType::kLogMessage, LogType::kLogDeliveryTimeOnly,
2934 LogType::kLogMessageAndDeliveryTime, LogType::kLogRemoteMessage}) {
2935 for (int i = 0; i < 100; ++i) {
2936 aos::Context context = RandomContext();
2937 const uint32_t channel_index =
2938 uint32_distribution(random_number_generator_);
2939
2940 flatbuffers::FlatBufferBuilder fbb;
2941 fbb.ForceDefaults(true);
2942 fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index, type));
2943
2944 VLOG(1) << absl::BytesToHexString(std::string_view(
2945 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
2946 fbb.GetBufferSpan().size()));
2947
2948 // Make sure that both the builder and inline method agree on sizes.
2949 ASSERT_EQ(fbb.GetSize(), PackMessageSize(type, context))
2950 << "log type " << static_cast<int>(type);
2951
2952 // Initialize the buffer to something nonzero to make sure all the padding
2953 // bytes are set to 0.
2954 std::vector<uint8_t> repacked_message(PackMessageSize(type, context), 67);
2955
2956 // And verify packing inline works as expected.
2957 EXPECT_EQ(repacked_message.size(),
2958 PackMessageInline(repacked_message.data(), context,
2959 channel_index, type));
2960 EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
2961 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
2962 fbb.GetBufferSpan().size()))
2963 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
2964 fbb.GetBufferSpan());
2965 }
2966 }
2967}
2968
2969// TODO(austin): I need a method to cpoy the RemoteMessage without mallocing
2970// too.
2971
Austin Schuhc243b422020-10-11 15:35:08 -07002972} // namespace testing
2973} // namespace logger
2974} // namespace aos