blob: 30a0ada98628f39839ece7bd9084bf5bad865841 [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
Alexei Strots01395492023-03-20 13:59:56 -07004#include <filesystem>
Austin Schuhfa30c352022-10-16 11:12:02 -07005#include <random>
Austin Schuhe243aaf2020-10-11 15:46:02 -07006#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07007
Austin Schuhfa30c352022-10-16 11:12:02 -07008#include "absl/strings/escaping.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07009#include "external/com_github_google_flatbuffers/src/annotated_binary_text_gen.h"
10#include "external/com_github_google_flatbuffers/src/binary_annotator.h"
11#include "flatbuffers/reflection_generated.h"
12#include "gflags/gflags.h"
13#include "gtest/gtest.h"
14
Austin Schuhc41603c2020-10-11 16:17:37 -070015#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -070016#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -080017#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070018#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070019#include "aos/json_to_flatbuffer.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070020#include "aos/testing/path.h"
21#include "aos/testing/random_seed.h"
Austin Schuhc243b422020-10-11 15:35:08 -070022#include "aos/testing/tmpdir.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070023#include "aos/util/file.h"
Austin Schuhc243b422020-10-11 15:35:08 -070024
Stephan Pleinesf63bde82024-01-13 15:59:33 -080025namespace aos::logger::testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070026namespace chrono = std::chrono;
Austin Schuhf2d0e682022-10-16 14:20:58 -070027using aos::message_bridge::RemoteMessage;
Eric Schmiedebergb38477e2022-12-02 16:08:04 -070028using aos::testing::ArtifactPath;
Austin Schuhc243b422020-10-11 15:35:08 -070029
Austin Schuhd863e6e2022-10-16 15:44:50 -070030// Adapter class to make it easy to test DetachedBufferWriter without adding
31// test only boilerplate to DetachedBufferWriter.
Alexei Strots15c22b12023-04-04 16:27:17 -070032class TestDetachedBufferWriter : public FileBackend,
33 public DetachedBufferWriter {
Austin Schuhd863e6e2022-10-16 15:44:50 -070034 public:
Austin Schuh48d10d62022-10-16 22:19:23 -070035 // Pick a max size that is rather conservative.
36 static constexpr size_t kMaxMessageSize = 128 * 1024;
Austin Schuhd863e6e2022-10-16 15:44:50 -070037 TestDetachedBufferWriter(std::string_view filename)
colleen61276dc2023-06-01 09:23:29 -070038 : FileBackend("/", false),
Alexei Strots15c22b12023-04-04 16:27:17 -070039 DetachedBufferWriter(FileBackend::RequestFile(filename),
40 std::make_unique<DummyEncoder>(kMaxMessageSize)) {}
Austin Schuhd863e6e2022-10-16 15:44:50 -070041 void WriteSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
42 QueueSpan(absl::Span<const uint8_t>(buffer.data(), buffer.size()));
43 }
Austin Schuh7ef11a42023-02-04 17:15:12 -080044 void QueueSpan(absl::Span<const uint8_t> buffer) {
45 DataEncoder::SpanCopier coppier(buffer);
46 CopyMessage(&coppier, monotonic_clock::now());
47 }
Austin Schuhd863e6e2022-10-16 15:44:50 -070048};
49
Austin Schuhe243aaf2020-10-11 15:46:02 -070050// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070051template <typename T>
52SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
53 const std::string_view data) {
54 flatbuffers::FlatBufferBuilder fbb;
55 fbb.ForceDefaults(true);
56 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
57 return fbb.Release();
58}
59
Austin Schuhe243aaf2020-10-11 15:46:02 -070060// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070061TEST(SpanReaderTest, ReadWrite) {
62 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
63 unlink(logfile.c_str());
64
65 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080066 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070067 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080068 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070069
70 {
Austin Schuhd863e6e2022-10-16 15:44:50 -070071 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -080072 writer.QueueSpan(m1.span());
73 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070074 }
75
76 SpanReader reader(logfile);
77
78 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhcf5f6442021-07-06 10:43:28 -070079 EXPECT_EQ(reader.PeekMessage(), m1.span());
80 EXPECT_EQ(reader.PeekMessage(), m1.span());
Austin Schuhadd6eb32020-11-09 21:24:26 -080081 EXPECT_EQ(reader.ReadMessage(), m1.span());
82 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhcf5f6442021-07-06 10:43:28 -070083 EXPECT_EQ(reader.PeekMessage(), absl::Span<const uint8_t>());
Austin Schuhc243b422020-10-11 15:35:08 -070084 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
85}
86
Austin Schuhe243aaf2020-10-11 15:46:02 -070087// Tests that we can actually parse the resulting messages at a basic level
88// through MessageReader.
89TEST(MessageReaderTest, ReadWrite) {
90 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
91 unlink(logfile.c_str());
92
93 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
94 JsonToSizedFlatbuffer<LogFileHeader>(
95 R"({ "max_out_of_order_duration": 100000000 })");
96 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
97 JsonToSizedFlatbuffer<MessageHeader>(
98 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
99 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
100 JsonToSizedFlatbuffer<MessageHeader>(
101 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
102
103 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700104 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800105 writer.QueueSpan(config.span());
106 writer.QueueSpan(m1.span());
107 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -0700108 }
109
110 MessageReader reader(logfile);
111
112 EXPECT_EQ(reader.filename(), logfile);
113
114 EXPECT_EQ(
115 reader.max_out_of_order_duration(),
116 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
117 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
118 EXPECT_TRUE(reader.ReadMessage());
119 EXPECT_EQ(reader.newest_timestamp(),
120 monotonic_clock::time_point(chrono::nanoseconds(1)));
121 EXPECT_TRUE(reader.ReadMessage());
122 EXPECT_EQ(reader.newest_timestamp(),
123 monotonic_clock::time_point(chrono::nanoseconds(2)));
124 EXPECT_FALSE(reader.ReadMessage());
125}
126
Austin Schuh32f68492020-11-08 21:45:51 -0800127// Tests that we explode when messages are too far out of order.
128TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
129 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
130 unlink(logfile0.c_str());
131
132 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
133 JsonToSizedFlatbuffer<LogFileHeader>(
134 R"({
135 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800136 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800137 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
138 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
139 "parts_index": 0
140})");
141
142 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
143 JsonToSizedFlatbuffer<MessageHeader>(
144 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
145 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
146 JsonToSizedFlatbuffer<MessageHeader>(
147 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
148 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
149 JsonToSizedFlatbuffer<MessageHeader>(
150 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
151
152 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700153 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800154 writer.QueueSpan(config0.span());
155 writer.QueueSpan(m1.span());
156 writer.QueueSpan(m2.span());
157 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800158 }
Alexei Strots01395492023-03-20 13:59:56 -0700159 ASSERT_TRUE(std::filesystem::exists(logfile0)) << logfile0;
Austin Schuh32f68492020-11-08 21:45:51 -0800160
161 const std::vector<LogFile> parts = SortParts({logfile0});
162
163 PartsMessageReader reader(parts[0].parts[0]);
164
165 EXPECT_TRUE(reader.ReadMessage());
166 EXPECT_TRUE(reader.ReadMessage());
167 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
168}
169
Austin Schuhc41603c2020-10-11 16:17:37 -0700170// Tests that we can transparently re-assemble part files with a
171// PartsMessageReader.
172TEST(PartsMessageReaderTest, ReadWrite) {
173 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
174 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
175 unlink(logfile0.c_str());
176 unlink(logfile1.c_str());
177
178 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
179 JsonToSizedFlatbuffer<LogFileHeader>(
180 R"({
181 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800182 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700183 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
184 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
185 "parts_index": 0
186})");
187 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
188 JsonToSizedFlatbuffer<LogFileHeader>(
189 R"({
190 "max_out_of_order_duration": 200000000,
191 "monotonic_start_time": 0,
192 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800193 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700194 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
195 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
196 "parts_index": 1
197})");
198
199 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
200 JsonToSizedFlatbuffer<MessageHeader>(
201 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
202 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
203 JsonToSizedFlatbuffer<MessageHeader>(
204 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
205
206 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700207 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800208 writer.QueueSpan(config0.span());
209 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700210 }
211 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700212 TestDetachedBufferWriter writer(logfile1);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800213 writer.QueueSpan(config1.span());
214 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700215 }
216
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700217 // When parts are sorted, we choose the highest max out of order duration for
218 // all parts with the same part uuid.
Austin Schuhc41603c2020-10-11 16:17:37 -0700219 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
220
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700221 EXPECT_EQ(parts.size(), 1);
222 EXPECT_EQ(parts[0].parts.size(), 1);
223
Austin Schuhc41603c2020-10-11 16:17:37 -0700224 PartsMessageReader reader(parts[0].parts[0]);
225
226 EXPECT_EQ(reader.filename(), logfile0);
227
228 // Confirm that the timestamps track, and the filename also updates.
229 // Read the first message.
230 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700231 // Since config1 has higher max out of order duration, that will be used to
232 // read partfiles with same part uuid, i.e logfile0 and logfile1.
Austin Schuhc41603c2020-10-11 16:17:37 -0700233 EXPECT_EQ(
234 reader.max_out_of_order_duration(),
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700235 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuhc41603c2020-10-11 16:17:37 -0700236 EXPECT_TRUE(reader.ReadMessage());
237 EXPECT_EQ(reader.filename(), logfile0);
238 EXPECT_EQ(reader.newest_timestamp(),
239 monotonic_clock::time_point(chrono::nanoseconds(1)));
240 EXPECT_EQ(
241 reader.max_out_of_order_duration(),
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700242 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuhc41603c2020-10-11 16:17:37 -0700243
244 // Read the second message.
245 EXPECT_TRUE(reader.ReadMessage());
246 EXPECT_EQ(reader.filename(), logfile1);
247 EXPECT_EQ(reader.newest_timestamp(),
248 monotonic_clock::time_point(chrono::nanoseconds(2)));
249 EXPECT_EQ(
250 reader.max_out_of_order_duration(),
251 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
252
253 // And then confirm that reading again returns no message.
254 EXPECT_FALSE(reader.ReadMessage());
255 EXPECT_EQ(reader.filename(), logfile1);
256 EXPECT_EQ(
257 reader.max_out_of_order_duration(),
258 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800259 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700260
261 // Verify that the parts metadata has the correct max out of order duration.
262 EXPECT_EQ(
263 parts[0].parts[0].max_out_of_order_duration,
264 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuhc41603c2020-10-11 16:17:37 -0700265}
Austin Schuh32f68492020-11-08 21:45:51 -0800266
Austin Schuh1be0ce42020-11-29 22:43:26 -0800267// Tests that Message's operator < works as expected.
268TEST(MessageTest, Sorting) {
269 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
270
271 Message m1{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700272 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700273 .timestamp =
274 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
Austin Schuh48507722021-07-17 17:29:24 -0700275 .monotonic_remote_boot = 0xffffff,
276 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700277 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800278 Message m2{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700279 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700280 .timestamp =
281 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
Austin Schuh48507722021-07-17 17:29:24 -0700282 .monotonic_remote_boot = 0xffffff,
283 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700284 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800285
286 EXPECT_LT(m1, m2);
287 EXPECT_GE(m2, m1);
288
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700289 m1.timestamp.time = e;
290 m2.timestamp.time = e;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800291
292 m1.channel_index = 1;
293 m2.channel_index = 2;
294
295 EXPECT_LT(m1, m2);
296 EXPECT_GE(m2, m1);
297
298 m1.channel_index = 0;
299 m2.channel_index = 0;
Austin Schuh58646e22021-08-23 23:51:46 -0700300 m1.queue_index.index = 0u;
301 m2.queue_index.index = 1u;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800302
303 EXPECT_LT(m1, m2);
304 EXPECT_GE(m2, m1);
305}
306
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800307aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
308 const aos::FlatbufferDetachedBuffer<Configuration> &config,
309 const std::string_view json) {
310 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh1034a832021-03-31 21:53:26 -0700311 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800312 flatbuffers::Offset<Configuration> config_offset =
313 aos::CopyFlatBuffer(config, &fbb);
314 LogFileHeader::Builder header_builder(fbb);
315 header_builder.add_configuration(config_offset);
316 fbb.Finish(header_builder.Finish());
317 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
318
319 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
320 JsonToFlatbuffer<LogFileHeader>(json));
321 CHECK(header_updates.Verify());
322 flatbuffers::FlatBufferBuilder fbb2;
Austin Schuh1034a832021-03-31 21:53:26 -0700323 fbb2.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800324 fbb2.FinishSizePrefixed(
325 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
326 return fbb2.Release();
327}
328
329class SortingElementTest : public ::testing::Test {
330 public:
331 SortingElementTest()
332 : config_(JsonToFlatbuffer<Configuration>(
333 R"({
334 "channels": [
335 {
336 "name": "/a",
337 "type": "aos.logger.testing.TestMessage",
338 "source_node": "pi1",
339 "destination_nodes": [
340 {
341 "name": "pi2"
342 },
343 {
344 "name": "pi3"
345 }
346 ]
347 },
348 {
349 "name": "/b",
350 "type": "aos.logger.testing.TestMessage",
351 "source_node": "pi1"
352 },
353 {
354 "name": "/c",
355 "type": "aos.logger.testing.TestMessage",
356 "source_node": "pi1"
Austin Schuh48507722021-07-17 17:29:24 -0700357 },
358 {
359 "name": "/d",
360 "type": "aos.logger.testing.TestMessage",
361 "source_node": "pi2",
362 "destination_nodes": [
363 {
364 "name": "pi1"
365 }
366 ]
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800367 }
368 ],
369 "nodes": [
370 {
371 "name": "pi1"
372 },
373 {
374 "name": "pi2"
375 },
376 {
377 "name": "pi3"
378 }
379 ]
380}
381)")),
382 config0_(MakeHeader(config_, R"({
383 /* 100ms */
384 "max_out_of_order_duration": 100000000,
385 "node": {
386 "name": "pi1"
387 },
388 "logger_node": {
389 "name": "pi1"
390 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800391 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800392 "realtime_start_time": 1000000000000,
393 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700394 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
395 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
396 "boot_uuids": [
397 "1d782c63-b3c7-466e-bea9-a01308b43333",
398 "",
399 ""
400 ],
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800401 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
402 "parts_index": 0
403})")),
404 config1_(MakeHeader(config_,
405 R"({
406 /* 100ms */
407 "max_out_of_order_duration": 100000000,
408 "node": {
409 "name": "pi1"
410 },
411 "logger_node": {
412 "name": "pi1"
413 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800414 "monotonic_start_time": 1000000,
415 "realtime_start_time": 1000000000000,
416 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700417 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
418 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
419 "boot_uuids": [
420 "1d782c63-b3c7-466e-bea9-a01308b43333",
421 "",
422 ""
423 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800424 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
425 "parts_index": 0
426})")),
427 config2_(MakeHeader(config_,
428 R"({
429 /* 100ms */
430 "max_out_of_order_duration": 100000000,
431 "node": {
432 "name": "pi2"
433 },
434 "logger_node": {
435 "name": "pi2"
436 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800437 "monotonic_start_time": 0,
438 "realtime_start_time": 1000000000000,
Austin Schuh48507722021-07-17 17:29:24 -0700439 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
440 "logger_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
441 "boot_uuids": [
442 "",
443 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
444 ""
445 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800446 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
447 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
448 "parts_index": 0
449})")),
450 config3_(MakeHeader(config_,
451 R"({
452 /* 100ms */
453 "max_out_of_order_duration": 100000000,
454 "node": {
455 "name": "pi1"
456 },
457 "logger_node": {
458 "name": "pi1"
459 },
460 "monotonic_start_time": 2000000,
461 "realtime_start_time": 1000000000,
462 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
Austin Schuh48507722021-07-17 17:29:24 -0700463 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
464 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
465 "boot_uuids": [
466 "1d782c63-b3c7-466e-bea9-a01308b43333",
467 "",
468 ""
469 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800470 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800471 "parts_index": 0
Austin Schuh8bf1e632021-01-02 22:41:04 -0800472})")),
473 config4_(MakeHeader(config_,
474 R"({
475 /* 100ms */
476 "max_out_of_order_duration": 100000000,
477 "node": {
478 "name": "pi2"
479 },
480 "logger_node": {
481 "name": "pi1"
482 },
483 "monotonic_start_time": 2000000,
484 "realtime_start_time": 1000000000,
485 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
486 "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
Austin Schuh48507722021-07-17 17:29:24 -0700487 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
488 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
489 "boot_uuids": [
490 "1d782c63-b3c7-466e-bea9-a01308b43333",
491 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
492 ""
493 ],
Austin Schuh8bf1e632021-01-02 22:41:04 -0800494 "parts_index": 0
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800495})")) {
496 unlink(logfile0_.c_str());
497 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800498 unlink(logfile2_.c_str());
Austin Schuh48507722021-07-17 17:29:24 -0700499 unlink(logfile3_.c_str());
Austin Schuh921b2562021-07-31 19:37:03 -0700500 queue_index_.resize(config_.message().channels()->size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800501 }
502
503 protected:
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800504 flatbuffers::DetachedBuffer MakeLogMessage(
505 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
506 int value) {
507 flatbuffers::FlatBufferBuilder message_fbb;
508 message_fbb.ForceDefaults(true);
509 TestMessage::Builder test_message_builder(message_fbb);
510 test_message_builder.add_value(value);
511 message_fbb.Finish(test_message_builder.Finish());
512
513 aos::Context context;
514 context.monotonic_event_time = monotonic_now;
515 context.realtime_event_time = aos::realtime_clock::epoch() +
516 chrono::seconds(1000) +
517 monotonic_now.time_since_epoch();
Austin Schuh921b2562021-07-31 19:37:03 -0700518 CHECK_LT(static_cast<size_t>(channel_index), queue_index_.size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800519 context.queue_index = queue_index_[channel_index];
520 context.size = message_fbb.GetSize();
521 context.data = message_fbb.GetBufferPointer();
522
523 ++queue_index_[channel_index];
524
525 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh53e3a812021-11-03 16:57:23 -0700526 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800527 fbb.FinishSizePrefixed(
528 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
529
530 return fbb.Release();
531 }
532
533 flatbuffers::DetachedBuffer MakeTimestampMessage(
534 const aos::monotonic_clock::time_point sender_monotonic_now,
Austin Schuh8bf1e632021-01-02 22:41:04 -0800535 int channel_index, chrono::nanoseconds receiver_monotonic_offset,
536 monotonic_clock::time_point monotonic_timestamp_time =
537 monotonic_clock::min_time) {
538 const monotonic_clock::time_point monotonic_sent_time =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800539 sender_monotonic_now + receiver_monotonic_offset;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800540
541 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800542 fbb.ForceDefaults(true);
543
544 logger::MessageHeader::Builder message_header_builder(fbb);
545
546 message_header_builder.add_channel_index(channel_index);
547
548 message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
549 100);
550 message_header_builder.add_monotonic_sent_time(
551 monotonic_sent_time.time_since_epoch().count());
552 message_header_builder.add_realtime_sent_time(
553 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
554 monotonic_sent_time.time_since_epoch())
555 .time_since_epoch()
556 .count());
557
558 message_header_builder.add_monotonic_remote_time(
559 sender_monotonic_now.time_since_epoch().count());
560 message_header_builder.add_realtime_remote_time(
561 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
562 sender_monotonic_now.time_since_epoch())
563 .time_since_epoch()
564 .count());
565 message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
566 1);
567
568 if (monotonic_timestamp_time != monotonic_clock::min_time) {
569 message_header_builder.add_monotonic_timestamp_time(
570 monotonic_timestamp_time.time_since_epoch().count());
571 }
572
573 fbb.FinishSizePrefixed(message_header_builder.Finish());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800574 LOG(INFO) << aos::FlatbufferToJson(
575 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
576 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
577
578 return fbb.Release();
579 }
580
581 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
582 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800583 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh48507722021-07-17 17:29:24 -0700584 const std::string logfile3_ = aos::testing::TestTmpDir() + "/log3.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800585
586 const aos::FlatbufferDetachedBuffer<Configuration> config_;
587 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
588 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800589 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
590 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800591 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800592
593 std::vector<uint32_t> queue_index_;
594};
595
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700596using MessageSorterTest = SortingElementTest;
597using MessageSorterDeathTest = MessageSorterTest;
598using PartsMergerTest = SortingElementTest;
Austin Schuhd2f96102020-12-01 20:27:29 -0800599using TimestampMapperTest = SortingElementTest;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800600
601// Tests that we can pull messages out of a log sorted in order.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700602TEST_F(MessageSorterTest, Pull) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800603 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
604 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700605 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800606 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700607 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800608 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700609 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800610 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700611 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800612 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700613 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800614 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
615 }
616
617 const std::vector<LogFile> parts = SortParts({logfile0_});
618
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700619 MessageSorter message_sorter(parts[0].parts[0]);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800620
621 // Confirm we aren't sorted until any time until the message is popped.
622 // Peeking shouldn't change the sorted until time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700623 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800624
625 std::deque<Message> output;
626
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700627 ASSERT_TRUE(message_sorter.Front() != nullptr);
628 output.emplace_back(std::move(*message_sorter.Front()));
629 message_sorter.PopFront();
630 EXPECT_EQ(message_sorter.sorted_until(), e + chrono::milliseconds(1900));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800631
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700632 ASSERT_TRUE(message_sorter.Front() != nullptr);
633 output.emplace_back(std::move(*message_sorter.Front()));
634 message_sorter.PopFront();
635 EXPECT_EQ(message_sorter.sorted_until(), e + chrono::milliseconds(1900));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800636
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700637 ASSERT_TRUE(message_sorter.Front() != nullptr);
638 output.emplace_back(std::move(*message_sorter.Front()));
639 message_sorter.PopFront();
640 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::max_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800641
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700642 ASSERT_TRUE(message_sorter.Front() != nullptr);
643 output.emplace_back(std::move(*message_sorter.Front()));
644 message_sorter.PopFront();
645 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::max_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800646
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700647 ASSERT_TRUE(message_sorter.Front() == nullptr);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800648
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700649 EXPECT_EQ(output[0].timestamp.boot, 0);
650 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
651 EXPECT_EQ(output[1].timestamp.boot, 0);
652 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1000));
653 EXPECT_EQ(output[2].timestamp.boot, 0);
654 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1901));
655 EXPECT_EQ(output[3].timestamp.boot, 0);
656 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800657}
658
Austin Schuhb000de62020-12-03 22:00:40 -0800659// Tests that we can pull messages out of a log sorted in order.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700660TEST_F(MessageSorterTest, WayBeforeStart) {
Austin Schuhb000de62020-12-03 22:00:40 -0800661 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
662 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700663 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhb000de62020-12-03 22:00:40 -0800664 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700665 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800666 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700667 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800668 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700669 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800670 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700671 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800672 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700673 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800674 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
675 }
676
677 const std::vector<LogFile> parts = SortParts({logfile0_});
678
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700679 MessageSorter message_sorter(parts[0].parts[0]);
Austin Schuhb000de62020-12-03 22:00:40 -0800680
681 // Confirm we aren't sorted until any time until the message is popped.
682 // Peeking shouldn't change the sorted until time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700683 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
Austin Schuhb000de62020-12-03 22:00:40 -0800684
685 std::deque<Message> output;
686
687 for (monotonic_clock::time_point t :
688 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
689 e + chrono::milliseconds(1900), monotonic_clock::max_time,
690 monotonic_clock::max_time}) {
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700691 ASSERT_TRUE(message_sorter.Front() != nullptr);
692 output.emplace_back(std::move(*message_sorter.Front()));
693 message_sorter.PopFront();
694 EXPECT_EQ(message_sorter.sorted_until(), t);
Austin Schuhb000de62020-12-03 22:00:40 -0800695 }
696
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700697 ASSERT_TRUE(message_sorter.Front() == nullptr);
Austin Schuhb000de62020-12-03 22:00:40 -0800698
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700699 EXPECT_EQ(output[0].timestamp.boot, 0u);
700 EXPECT_EQ(output[0].timestamp.time, e - chrono::milliseconds(1000));
701 EXPECT_EQ(output[1].timestamp.boot, 0u);
702 EXPECT_EQ(output[1].timestamp.time, e - chrono::milliseconds(500));
703 EXPECT_EQ(output[2].timestamp.boot, 0u);
704 EXPECT_EQ(output[2].timestamp.time, e - chrono::milliseconds(10));
705 EXPECT_EQ(output[3].timestamp.boot, 0u);
706 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(1901));
707 EXPECT_EQ(output[4].timestamp.boot, 0u);
708 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuhb000de62020-12-03 22:00:40 -0800709}
710
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800711// Tests that messages too far out of order trigger death.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700712TEST_F(MessageSorterDeathTest, Pull) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800713 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
714 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700715 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800716 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700717 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800718 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700719 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800720 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700721 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800722 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
723 // The following message is too far out of order and will trigger the CHECK.
Austin Schuhd863e6e2022-10-16 15:44:50 -0700724 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800725 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
726 }
727
728 const std::vector<LogFile> parts = SortParts({logfile0_});
729
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700730 MessageSorter message_sorter(parts[0].parts[0]);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800731
732 // Confirm we aren't sorted until any time until the message is popped.
733 // Peeking shouldn't change the sorted until time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700734 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800735 std::deque<Message> output;
736
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700737 ASSERT_TRUE(message_sorter.Front() != nullptr);
738 message_sorter.PopFront();
739 ASSERT_TRUE(message_sorter.Front() != nullptr);
740 ASSERT_TRUE(message_sorter.Front() != nullptr);
741 message_sorter.PopFront();
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800742
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700743 EXPECT_DEATH({ message_sorter.Front(); },
Austin Schuh58646e22021-08-23 23:51:46 -0700744 "Max out of order of 100000000ns exceeded.");
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800745}
746
Austin Schuh8f52ed52020-11-30 23:12:39 -0800747// Tests that we can merge data from 2 separate files, including duplicate data.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700748TEST_F(PartsMergerTest, TwoFileMerger) {
Austin Schuh8f52ed52020-11-30 23:12:39 -0800749 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
750 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700751 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800752 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700753 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800754 writer1.QueueSpan(config1_.span());
755
Austin Schuhd863e6e2022-10-16 15:44:50 -0700756 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800757 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700758 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800759 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
760
Austin Schuhd863e6e2022-10-16 15:44:50 -0700761 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800762 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700763 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800764 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
765
766 // Make a duplicate!
767 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
768 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
769 writer0.QueueSpan(msg.span());
770 writer1.QueueSpan(msg.span());
771
Austin Schuhd863e6e2022-10-16 15:44:50 -0700772 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800773 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
774 }
775
776 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -0700777 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -0800778 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800779
Austin Schuh63097262023-08-16 17:04:29 -0700780 PartsMerger merger(
781 log_files.SelectParts("pi1", 0,
782 {StoredDataType::DATA, StoredDataType::TIMESTAMPS,
783 StoredDataType::REMOTE_TIMESTAMPS}));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800784
785 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
786
787 std::deque<Message> output;
788
789 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
790 ASSERT_TRUE(merger.Front() != nullptr);
791 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
792
793 output.emplace_back(std::move(*merger.Front()));
794 merger.PopFront();
795 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
796
797 ASSERT_TRUE(merger.Front() != nullptr);
798 output.emplace_back(std::move(*merger.Front()));
799 merger.PopFront();
800 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
801
802 ASSERT_TRUE(merger.Front() != nullptr);
803 output.emplace_back(std::move(*merger.Front()));
804 merger.PopFront();
805 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
806
807 ASSERT_TRUE(merger.Front() != nullptr);
808 output.emplace_back(std::move(*merger.Front()));
809 merger.PopFront();
810 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
811
812 ASSERT_TRUE(merger.Front() != nullptr);
813 output.emplace_back(std::move(*merger.Front()));
814 merger.PopFront();
815 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
816
817 ASSERT_TRUE(merger.Front() != nullptr);
818 output.emplace_back(std::move(*merger.Front()));
819 merger.PopFront();
820 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
821
822 ASSERT_TRUE(merger.Front() == nullptr);
823
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700824 EXPECT_EQ(output[0].timestamp.boot, 0u);
825 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
826 EXPECT_EQ(output[1].timestamp.boot, 0u);
827 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1001));
828 EXPECT_EQ(output[2].timestamp.boot, 0u);
829 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1002));
830 EXPECT_EQ(output[3].timestamp.boot, 0u);
831 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
832 EXPECT_EQ(output[4].timestamp.boot, 0u);
833 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(3000));
834 EXPECT_EQ(output[5].timestamp.boot, 0u);
835 EXPECT_EQ(output[5].timestamp.time, e + chrono::milliseconds(3002));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800836}
837
Austin Schuh8bf1e632021-01-02 22:41:04 -0800838// Tests that we can merge timestamps with various combinations of
839// monotonic_timestamp_time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700840TEST_F(PartsMergerTest, TwoFileTimestampMerger) {
Austin Schuh8bf1e632021-01-02 22:41:04 -0800841 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
842 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700843 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800844 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700845 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800846 writer1.QueueSpan(config1_.span());
847
848 // Neither has it.
849 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700850 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800851 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700852 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800853 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
854
855 // First only has it.
856 MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700857 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800858 e + chrono::milliseconds(1001), 0, chrono::seconds(100),
859 e + chrono::nanoseconds(971)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700860 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800861 e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
862
863 // Second only has it.
864 MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700865 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800866 e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700867 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800868 e + chrono::milliseconds(1002), 0, chrono::seconds(100),
869 e + chrono::nanoseconds(972)));
870
871 // Both have it.
872 MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700873 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800874 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
875 e + chrono::nanoseconds(973)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700876 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800877 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
878 e + chrono::nanoseconds(973)));
879 }
880
881 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -0700882 LogFilesContainer log_files(parts);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800883 ASSERT_EQ(parts.size(), 1u);
884
Austin Schuh63097262023-08-16 17:04:29 -0700885 PartsMerger merger(
886 log_files.SelectParts("pi1", 0,
887 {StoredDataType::DATA, StoredDataType::TIMESTAMPS,
888 StoredDataType::REMOTE_TIMESTAMPS}));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800889
890 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
891
892 std::deque<Message> output;
893
894 for (int i = 0; i < 4; ++i) {
895 ASSERT_TRUE(merger.Front() != nullptr);
896 output.emplace_back(std::move(*merger.Front()));
897 merger.PopFront();
898 }
899 ASSERT_TRUE(merger.Front() == nullptr);
900
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700901 EXPECT_EQ(output[0].timestamp.boot, 0u);
902 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700903 EXPECT_FALSE(output[0].data->has_monotonic_timestamp_time);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700904
905 EXPECT_EQ(output[1].timestamp.boot, 0u);
906 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700907 EXPECT_TRUE(output[1].data->has_monotonic_timestamp_time);
908 EXPECT_EQ(output[1].data->monotonic_timestamp_time,
909 monotonic_clock::time_point(std::chrono::nanoseconds(971)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700910
911 EXPECT_EQ(output[2].timestamp.boot, 0u);
912 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700913 EXPECT_TRUE(output[2].data->has_monotonic_timestamp_time);
914 EXPECT_EQ(output[2].data->monotonic_timestamp_time,
915 monotonic_clock::time_point(std::chrono::nanoseconds(972)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700916
917 EXPECT_EQ(output[3].timestamp.boot, 0u);
918 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700919 EXPECT_TRUE(output[3].data->has_monotonic_timestamp_time);
920 EXPECT_EQ(output[3].data->monotonic_timestamp_time,
921 monotonic_clock::time_point(std::chrono::nanoseconds(973)));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800922}
923
Austin Schuhd2f96102020-12-01 20:27:29 -0800924// Tests that we can match timestamps on delivered messages.
925TEST_F(TimestampMapperTest, ReadNode0First) {
926 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
927 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700928 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800929 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700930 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800931 writer1.QueueSpan(config2_.span());
932
Austin Schuhd863e6e2022-10-16 15:44:50 -0700933 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800934 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700935 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800936 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
937
Austin Schuhd863e6e2022-10-16 15:44:50 -0700938 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800939 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700940 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800941 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
942
Austin Schuhd863e6e2022-10-16 15:44:50 -0700943 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800944 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700945 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800946 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
947 }
948
949 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -0700950 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -0800951 ASSERT_EQ(parts[0].logger_node, "pi1");
952 ASSERT_EQ(parts[1].logger_node, "pi2");
953
Austin Schuh79b30942021-01-24 22:32:21 -0800954 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -0700955
Austin Schuh63097262023-08-16 17:04:29 -0700956 TimestampMapper mapper0("pi1", log_files,
957 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -0800958 mapper0.set_timestamp_callback(
959 [&](TimestampedMessage *) { ++mapper0_count; });
960 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -0700961 TimestampMapper mapper1("pi2", log_files,
962 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -0800963 mapper1.set_timestamp_callback(
964 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -0800965
966 mapper0.AddPeer(&mapper1);
967 mapper1.AddPeer(&mapper0);
968
969 {
970 std::deque<TimestampedMessage> output0;
971
Austin Schuh79b30942021-01-24 22:32:21 -0800972 EXPECT_EQ(mapper0_count, 0u);
973 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800974 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800975 EXPECT_EQ(mapper0_count, 1u);
976 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800977 output0.emplace_back(std::move(*mapper0.Front()));
978 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700979 EXPECT_TRUE(mapper0.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800980 EXPECT_EQ(mapper0_count, 1u);
981 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800982
983 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800984 EXPECT_EQ(mapper0_count, 2u);
985 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800986 output0.emplace_back(std::move(*mapper0.Front()));
987 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700988 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800989
990 ASSERT_TRUE(mapper0.Front() != nullptr);
991 output0.emplace_back(std::move(*mapper0.Front()));
992 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700993 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800994
Austin Schuh79b30942021-01-24 22:32:21 -0800995 EXPECT_EQ(mapper0_count, 3u);
996 EXPECT_EQ(mapper1_count, 0u);
997
Austin Schuhd2f96102020-12-01 20:27:29 -0800998 ASSERT_TRUE(mapper0.Front() == nullptr);
999
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001000 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1001 EXPECT_EQ(output0[0].monotonic_event_time.time,
1002 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001003 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001004
1005 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1006 EXPECT_EQ(output0[1].monotonic_event_time.time,
1007 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001008 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001009
1010 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1011 EXPECT_EQ(output0[2].monotonic_event_time.time,
1012 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001013 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001014 }
1015
1016 {
1017 SCOPED_TRACE("Trying node1 now");
1018 std::deque<TimestampedMessage> output1;
1019
Austin Schuh79b30942021-01-24 22:32:21 -08001020 EXPECT_EQ(mapper0_count, 3u);
1021 EXPECT_EQ(mapper1_count, 0u);
1022
Austin Schuhd2f96102020-12-01 20:27:29 -08001023 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001024 EXPECT_EQ(mapper0_count, 3u);
1025 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001026 output1.emplace_back(std::move(*mapper1.Front()));
1027 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001028 EXPECT_TRUE(mapper1.started());
Austin Schuh79b30942021-01-24 22:32:21 -08001029 EXPECT_EQ(mapper0_count, 3u);
1030 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001031
1032 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001033 EXPECT_EQ(mapper0_count, 3u);
1034 EXPECT_EQ(mapper1_count, 2u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001035 output1.emplace_back(std::move(*mapper1.Front()));
1036 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001037 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001038
1039 ASSERT_TRUE(mapper1.Front() != nullptr);
1040 output1.emplace_back(std::move(*mapper1.Front()));
1041 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001042 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001043
Austin Schuh79b30942021-01-24 22:32:21 -08001044 EXPECT_EQ(mapper0_count, 3u);
1045 EXPECT_EQ(mapper1_count, 3u);
1046
Austin Schuhd2f96102020-12-01 20:27:29 -08001047 ASSERT_TRUE(mapper1.Front() == nullptr);
1048
Austin Schuh79b30942021-01-24 22:32:21 -08001049 EXPECT_EQ(mapper0_count, 3u);
1050 EXPECT_EQ(mapper1_count, 3u);
1051
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001052 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1053 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001054 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001055 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001056
1057 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1058 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001059 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001060 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001061
1062 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1063 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001064 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001065 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001066 }
1067}
1068
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001069// Tests that we filter messages using the channel filter callback
1070TEST_F(TimestampMapperTest, ReplayChannelsCallbackTest) {
1071 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1072 {
1073 TestDetachedBufferWriter writer0(logfile0_);
1074 writer0.QueueSpan(config0_.span());
1075 TestDetachedBufferWriter writer1(logfile1_);
1076 writer1.QueueSpan(config2_.span());
1077
1078 writer0.WriteSizedFlatbuffer(
1079 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1080 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1081 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1082
1083 writer0.WriteSizedFlatbuffer(
1084 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1085 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1086 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1087
1088 writer0.WriteSizedFlatbuffer(
1089 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1090 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1091 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1092 }
1093
1094 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001095 LogFilesContainer log_files(parts);
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001096 ASSERT_EQ(parts[0].logger_node, "pi1");
1097 ASSERT_EQ(parts[1].logger_node, "pi2");
1098
1099 // mapper0 will not provide any messages while mapper1 will provide all
1100 // messages due to the channel filter callbacks used
1101 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001102
Austin Schuh63097262023-08-16 17:04:29 -07001103 TimestampMapper mapper0("pi1", log_files,
1104 TimestampQueueStrategy::kQueueTogether);
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001105 mapper0.set_timestamp_callback(
1106 [&](TimestampedMessage *) { ++mapper0_count; });
1107 mapper0.set_replay_channels_callback(
1108 [&](const TimestampedMessage &) -> bool { return mapper0_count != 2; });
1109 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001110 TimestampMapper mapper1("pi2", log_files,
1111 TimestampQueueStrategy::kQueueTogether);
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001112 mapper1.set_timestamp_callback(
1113 [&](TimestampedMessage *) { ++mapper1_count; });
1114 mapper1.set_replay_channels_callback(
1115 [&](const TimestampedMessage &) -> bool { return mapper1_count != 2; });
1116
1117 mapper0.AddPeer(&mapper1);
1118 mapper1.AddPeer(&mapper0);
1119
1120 {
1121 std::deque<TimestampedMessage> output0;
1122
1123 EXPECT_EQ(mapper0_count, 0u);
1124 EXPECT_EQ(mapper1_count, 0u);
1125
1126 ASSERT_TRUE(mapper0.Front() != nullptr);
1127 EXPECT_EQ(mapper0_count, 1u);
1128 EXPECT_EQ(mapper1_count, 0u);
1129 output0.emplace_back(std::move(*mapper0.Front()));
1130 mapper0.PopFront();
1131
1132 EXPECT_TRUE(mapper0.started());
1133 EXPECT_EQ(mapper0_count, 1u);
1134 EXPECT_EQ(mapper1_count, 0u);
1135
1136 // mapper0_count is now at 3 since the second message is not queued, but
1137 // timestamp_callback needs to be called everytime even if Front() does not
1138 // provide a message due to the replay_channels_callback.
1139 ASSERT_TRUE(mapper0.Front() != nullptr);
1140 EXPECT_EQ(mapper0_count, 3u);
1141 EXPECT_EQ(mapper1_count, 0u);
1142 output0.emplace_back(std::move(*mapper0.Front()));
1143 mapper0.PopFront();
1144
1145 EXPECT_TRUE(mapper0.started());
1146 EXPECT_EQ(mapper0_count, 3u);
1147 EXPECT_EQ(mapper1_count, 0u);
1148
1149 ASSERT_TRUE(mapper0.Front() == nullptr);
1150 EXPECT_TRUE(mapper0.started());
1151
1152 EXPECT_EQ(mapper0_count, 3u);
1153 EXPECT_EQ(mapper1_count, 0u);
1154
1155 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1156 EXPECT_EQ(output0[0].monotonic_event_time.time,
1157 e + chrono::milliseconds(1000));
1158 EXPECT_TRUE(output0[0].data != nullptr);
1159
1160 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1161 EXPECT_EQ(output0[1].monotonic_event_time.time,
1162 e + chrono::milliseconds(3000));
1163 EXPECT_TRUE(output0[1].data != nullptr);
1164 }
1165
1166 {
1167 SCOPED_TRACE("Trying node1 now");
1168 std::deque<TimestampedMessage> output1;
1169
1170 EXPECT_EQ(mapper0_count, 3u);
1171 EXPECT_EQ(mapper1_count, 0u);
1172
1173 ASSERT_TRUE(mapper1.Front() != nullptr);
1174 EXPECT_EQ(mapper0_count, 3u);
1175 EXPECT_EQ(mapper1_count, 1u);
1176 output1.emplace_back(std::move(*mapper1.Front()));
1177 mapper1.PopFront();
1178 EXPECT_TRUE(mapper1.started());
1179 EXPECT_EQ(mapper0_count, 3u);
1180 EXPECT_EQ(mapper1_count, 1u);
1181
1182 // mapper1_count is now at 3 since the second message is not queued, but
1183 // timestamp_callback needs to be called everytime even if Front() does not
1184 // provide a message due to the replay_channels_callback.
1185 ASSERT_TRUE(mapper1.Front() != nullptr);
1186 output1.emplace_back(std::move(*mapper1.Front()));
1187 mapper1.PopFront();
1188 EXPECT_TRUE(mapper1.started());
1189
1190 EXPECT_EQ(mapper0_count, 3u);
1191 EXPECT_EQ(mapper1_count, 3u);
1192
1193 ASSERT_TRUE(mapper1.Front() == nullptr);
1194
1195 EXPECT_EQ(mapper0_count, 3u);
1196 EXPECT_EQ(mapper1_count, 3u);
1197
1198 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1199 EXPECT_EQ(output1[0].monotonic_event_time.time,
1200 e + chrono::seconds(100) + chrono::milliseconds(1000));
1201 EXPECT_TRUE(output1[0].data != nullptr);
1202
1203 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1204 EXPECT_EQ(output1[1].monotonic_event_time.time,
1205 e + chrono::seconds(100) + chrono::milliseconds(3000));
1206 EXPECT_TRUE(output1[1].data != nullptr);
1207 }
1208}
Austin Schuh8bf1e632021-01-02 22:41:04 -08001209// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
1210// returned.
1211TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
1212 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1213 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001214 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001215 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001216 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001217 writer1.QueueSpan(config4_.span());
1218
Austin Schuhd863e6e2022-10-16 15:44:50 -07001219 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001220 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001221 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001222 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
1223 e + chrono::nanoseconds(971)));
1224
Austin Schuhd863e6e2022-10-16 15:44:50 -07001225 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001226 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001227 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001228 e + chrono::milliseconds(2000), 0, chrono::seconds(100),
1229 e + chrono::nanoseconds(5458)));
1230
Austin Schuhd863e6e2022-10-16 15:44:50 -07001231 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001232 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001233 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001234 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1235 }
1236
1237 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001238 LogFilesContainer log_files(parts);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001239 ASSERT_EQ(parts.size(), 1u);
1240
Austin Schuh79b30942021-01-24 22:32:21 -08001241 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001242 TimestampMapper mapper0("pi1", log_files,
1243 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001244 mapper0.set_timestamp_callback(
1245 [&](TimestampedMessage *) { ++mapper0_count; });
1246 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001247 TimestampMapper mapper1("pi2", log_files,
1248 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001249 mapper1.set_timestamp_callback(
1250 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh8bf1e632021-01-02 22:41:04 -08001251
1252 mapper0.AddPeer(&mapper1);
1253 mapper1.AddPeer(&mapper0);
1254
1255 {
1256 std::deque<TimestampedMessage> output0;
1257
1258 for (int i = 0; i < 3; ++i) {
1259 ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
1260 output0.emplace_back(std::move(*mapper0.Front()));
1261 mapper0.PopFront();
1262 }
1263
1264 ASSERT_TRUE(mapper0.Front() == nullptr);
1265
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001266 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1267 EXPECT_EQ(output0[0].monotonic_event_time.time,
1268 e + chrono::milliseconds(1000));
1269 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
1270 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
1271 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001272 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001273
1274 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1275 EXPECT_EQ(output0[1].monotonic_event_time.time,
1276 e + chrono::milliseconds(2000));
1277 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
1278 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
1279 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001280 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001281
1282 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1283 EXPECT_EQ(output0[2].monotonic_event_time.time,
1284 e + chrono::milliseconds(3000));
1285 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
1286 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
1287 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001288 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001289 }
1290
1291 {
1292 SCOPED_TRACE("Trying node1 now");
1293 std::deque<TimestampedMessage> output1;
1294
1295 for (int i = 0; i < 3; ++i) {
1296 ASSERT_TRUE(mapper1.Front() != nullptr);
1297 output1.emplace_back(std::move(*mapper1.Front()));
1298 mapper1.PopFront();
1299 }
1300
1301 ASSERT_TRUE(mapper1.Front() == nullptr);
1302
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001303 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1304 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001305 e + chrono::seconds(100) + chrono::milliseconds(1000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001306 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
1307 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001308 e + chrono::nanoseconds(971));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001309 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001310
1311 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1312 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001313 e + chrono::seconds(100) + chrono::milliseconds(2000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001314 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
1315 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001316 e + chrono::nanoseconds(5458));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001317 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001318
1319 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1320 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001321 e + chrono::seconds(100) + chrono::milliseconds(3000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001322 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
1323 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
1324 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001325 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001326 }
Austin Schuh79b30942021-01-24 22:32:21 -08001327
1328 EXPECT_EQ(mapper0_count, 3u);
1329 EXPECT_EQ(mapper1_count, 3u);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001330}
1331
Austin Schuhd2f96102020-12-01 20:27:29 -08001332// Tests that we can match timestamps on delivered messages. By doing this in
1333// the reverse order, the second node needs to queue data up from the first node
1334// to find the matching timestamp.
1335TEST_F(TimestampMapperTest, ReadNode1First) {
1336 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1337 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001338 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001339 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001340 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001341 writer1.QueueSpan(config2_.span());
1342
Austin Schuhd863e6e2022-10-16 15:44:50 -07001343 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001344 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001345 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001346 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1347
Austin Schuhd863e6e2022-10-16 15:44:50 -07001348 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001349 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001350 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001351 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1352
Austin Schuhd863e6e2022-10-16 15:44:50 -07001353 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001354 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001355 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001356 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1357 }
1358
1359 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001360 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001361
1362 ASSERT_EQ(parts[0].logger_node, "pi1");
1363 ASSERT_EQ(parts[1].logger_node, "pi2");
1364
Austin Schuh79b30942021-01-24 22:32:21 -08001365 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001366 TimestampMapper mapper0("pi1", log_files,
1367 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001368 mapper0.set_timestamp_callback(
1369 [&](TimestampedMessage *) { ++mapper0_count; });
1370 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001371 TimestampMapper mapper1("pi2", log_files,
1372 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001373 mapper1.set_timestamp_callback(
1374 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001375
1376 mapper0.AddPeer(&mapper1);
1377 mapper1.AddPeer(&mapper0);
1378
1379 {
1380 SCOPED_TRACE("Trying node1 now");
1381 std::deque<TimestampedMessage> output1;
1382
1383 ASSERT_TRUE(mapper1.Front() != nullptr);
1384 output1.emplace_back(std::move(*mapper1.Front()));
1385 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001386 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001387
1388 ASSERT_TRUE(mapper1.Front() != nullptr);
1389 output1.emplace_back(std::move(*mapper1.Front()));
1390 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001391 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001392
1393 ASSERT_TRUE(mapper1.Front() != nullptr);
1394 output1.emplace_back(std::move(*mapper1.Front()));
1395 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001396 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001397
1398 ASSERT_TRUE(mapper1.Front() == nullptr);
1399
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001400 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1401 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001402 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001403 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001404
1405 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1406 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001407 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001408 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001409
1410 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1411 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001412 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001413 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001414 }
1415
1416 {
1417 std::deque<TimestampedMessage> output0;
1418
1419 ASSERT_TRUE(mapper0.Front() != nullptr);
1420 output0.emplace_back(std::move(*mapper0.Front()));
1421 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001422 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001423
1424 ASSERT_TRUE(mapper0.Front() != nullptr);
1425 output0.emplace_back(std::move(*mapper0.Front()));
1426 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001427 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001428
1429 ASSERT_TRUE(mapper0.Front() != nullptr);
1430 output0.emplace_back(std::move(*mapper0.Front()));
1431 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001432 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001433
1434 ASSERT_TRUE(mapper0.Front() == nullptr);
1435
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001436 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1437 EXPECT_EQ(output0[0].monotonic_event_time.time,
1438 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001439 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001440
1441 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1442 EXPECT_EQ(output0[1].monotonic_event_time.time,
1443 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001444 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001445
1446 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1447 EXPECT_EQ(output0[2].monotonic_event_time.time,
1448 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001449 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001450 }
Austin Schuh79b30942021-01-24 22:32:21 -08001451
1452 EXPECT_EQ(mapper0_count, 3u);
1453 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001454}
1455
1456// Tests that we return just the timestamps if we couldn't find the data and the
1457// missing data was at the beginning of the file.
1458TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
1459 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1460 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001461 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001462 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001463 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001464 writer1.QueueSpan(config2_.span());
1465
1466 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001467 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001468 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1469
Austin Schuhd863e6e2022-10-16 15:44:50 -07001470 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001471 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001472 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001473 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1474
Austin Schuhd863e6e2022-10-16 15:44:50 -07001475 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001476 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001477 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001478 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1479 }
1480
1481 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001482 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001483
1484 ASSERT_EQ(parts[0].logger_node, "pi1");
1485 ASSERT_EQ(parts[1].logger_node, "pi2");
1486
Austin Schuh79b30942021-01-24 22:32:21 -08001487 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001488 TimestampMapper mapper0("pi1", log_files,
1489 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001490 mapper0.set_timestamp_callback(
1491 [&](TimestampedMessage *) { ++mapper0_count; });
1492 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001493 TimestampMapper mapper1("pi2", log_files,
1494 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001495 mapper1.set_timestamp_callback(
1496 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001497
1498 mapper0.AddPeer(&mapper1);
1499 mapper1.AddPeer(&mapper0);
1500
1501 {
1502 SCOPED_TRACE("Trying node1 now");
1503 std::deque<TimestampedMessage> output1;
1504
1505 ASSERT_TRUE(mapper1.Front() != nullptr);
1506 output1.emplace_back(std::move(*mapper1.Front()));
1507 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001508 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001509
1510 ASSERT_TRUE(mapper1.Front() != nullptr);
1511 output1.emplace_back(std::move(*mapper1.Front()));
1512 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001513 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001514
1515 ASSERT_TRUE(mapper1.Front() != nullptr);
1516 output1.emplace_back(std::move(*mapper1.Front()));
1517 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001518 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001519
1520 ASSERT_TRUE(mapper1.Front() == nullptr);
1521
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001522 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1523 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001524 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001525 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001526
1527 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1528 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001529 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001530 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001531
1532 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1533 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001534 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001535 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001536 }
Austin Schuh79b30942021-01-24 22:32:21 -08001537
1538 EXPECT_EQ(mapper0_count, 0u);
1539 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001540}
1541
1542// Tests that we return just the timestamps if we couldn't find the data and the
1543// missing data was at the end of the file.
1544TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
1545 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1546 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001547 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001548 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001549 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001550 writer1.QueueSpan(config2_.span());
1551
Austin Schuhd863e6e2022-10-16 15:44:50 -07001552 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001553 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001554 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001555 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1556
Austin Schuhd863e6e2022-10-16 15:44:50 -07001557 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001558 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001559 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001560 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1561
1562 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001563 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001564 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1565 }
1566
1567 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001568 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001569
1570 ASSERT_EQ(parts[0].logger_node, "pi1");
1571 ASSERT_EQ(parts[1].logger_node, "pi2");
1572
Austin Schuh79b30942021-01-24 22:32:21 -08001573 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001574 TimestampMapper mapper0("pi1", log_files,
1575 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001576 mapper0.set_timestamp_callback(
1577 [&](TimestampedMessage *) { ++mapper0_count; });
1578 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001579 TimestampMapper mapper1("pi2", log_files,
1580 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001581 mapper1.set_timestamp_callback(
1582 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001583
1584 mapper0.AddPeer(&mapper1);
1585 mapper1.AddPeer(&mapper0);
1586
1587 {
1588 SCOPED_TRACE("Trying node1 now");
1589 std::deque<TimestampedMessage> output1;
1590
1591 ASSERT_TRUE(mapper1.Front() != nullptr);
1592 output1.emplace_back(std::move(*mapper1.Front()));
1593 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001594 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001595
1596 ASSERT_TRUE(mapper1.Front() != nullptr);
1597 output1.emplace_back(std::move(*mapper1.Front()));
1598 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001599 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001600
1601 ASSERT_TRUE(mapper1.Front() != nullptr);
1602 output1.emplace_back(std::move(*mapper1.Front()));
1603 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001604 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001605
1606 ASSERT_TRUE(mapper1.Front() == nullptr);
1607
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001608 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1609 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001610 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001611 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001612
1613 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1614 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001615 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001616 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001617
1618 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1619 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001620 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001621 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001622 }
Austin Schuh79b30942021-01-24 22:32:21 -08001623
1624 EXPECT_EQ(mapper0_count, 0u);
1625 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001626}
1627
Austin Schuh993ccb52020-12-12 15:59:32 -08001628// Tests that we handle a message which failed to forward or be logged.
1629TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1630 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1631 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001632 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001633 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001634 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001635 writer1.QueueSpan(config2_.span());
1636
Austin Schuhd863e6e2022-10-16 15:44:50 -07001637 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001638 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001639 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001640 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1641
1642 // Create both the timestamp and message, but don't log them, simulating a
1643 // forwarding drop.
1644 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1645 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1646 chrono::seconds(100));
1647
Austin Schuhd863e6e2022-10-16 15:44:50 -07001648 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001649 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001650 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001651 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1652 }
1653
1654 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001655 LogFilesContainer log_files(parts);
Austin Schuh993ccb52020-12-12 15:59:32 -08001656
1657 ASSERT_EQ(parts[0].logger_node, "pi1");
1658 ASSERT_EQ(parts[1].logger_node, "pi2");
1659
Austin Schuh79b30942021-01-24 22:32:21 -08001660 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001661 TimestampMapper mapper0("pi1", log_files,
1662 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001663 mapper0.set_timestamp_callback(
1664 [&](TimestampedMessage *) { ++mapper0_count; });
1665 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001666 TimestampMapper mapper1("pi2", log_files,
1667 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001668 mapper1.set_timestamp_callback(
1669 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh993ccb52020-12-12 15:59:32 -08001670
1671 mapper0.AddPeer(&mapper1);
1672 mapper1.AddPeer(&mapper0);
1673
1674 {
1675 std::deque<TimestampedMessage> output1;
1676
1677 ASSERT_TRUE(mapper1.Front() != nullptr);
1678 output1.emplace_back(std::move(*mapper1.Front()));
1679 mapper1.PopFront();
1680
1681 ASSERT_TRUE(mapper1.Front() != nullptr);
1682 output1.emplace_back(std::move(*mapper1.Front()));
1683
1684 ASSERT_FALSE(mapper1.Front() == nullptr);
1685
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001686 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1687 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001688 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001689 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001690
1691 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1692 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001693 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001694 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh993ccb52020-12-12 15:59:32 -08001695 }
Austin Schuh79b30942021-01-24 22:32:21 -08001696
1697 EXPECT_EQ(mapper0_count, 0u);
1698 EXPECT_EQ(mapper1_count, 2u);
Austin Schuh993ccb52020-12-12 15:59:32 -08001699}
1700
Austin Schuhd2f96102020-12-01 20:27:29 -08001701// Tests that we properly sort log files with duplicate timestamps.
1702TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1703 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1704 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001705 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001706 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001707 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001708 writer1.QueueSpan(config2_.span());
1709
Austin Schuhd863e6e2022-10-16 15:44:50 -07001710 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001711 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001712 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001713 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1714
Austin Schuhd863e6e2022-10-16 15:44:50 -07001715 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001716 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001717 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001718 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1719
Austin Schuhd863e6e2022-10-16 15:44:50 -07001720 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001721 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001722 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001723 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1724
Austin Schuhd863e6e2022-10-16 15:44:50 -07001725 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001726 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001727 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001728 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1729 }
1730
1731 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001732 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001733
1734 ASSERT_EQ(parts[0].logger_node, "pi1");
1735 ASSERT_EQ(parts[1].logger_node, "pi2");
1736
Austin Schuh79b30942021-01-24 22:32:21 -08001737 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001738 TimestampMapper mapper0("pi1", log_files,
1739 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001740 mapper0.set_timestamp_callback(
1741 [&](TimestampedMessage *) { ++mapper0_count; });
1742 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001743 TimestampMapper mapper1("pi2", log_files,
1744 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001745 mapper1.set_timestamp_callback(
1746 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001747
1748 mapper0.AddPeer(&mapper1);
1749 mapper1.AddPeer(&mapper0);
1750
1751 {
1752 SCOPED_TRACE("Trying node1 now");
1753 std::deque<TimestampedMessage> output1;
1754
1755 for (int i = 0; i < 4; ++i) {
1756 ASSERT_TRUE(mapper1.Front() != nullptr);
1757 output1.emplace_back(std::move(*mapper1.Front()));
1758 mapper1.PopFront();
1759 }
1760 ASSERT_TRUE(mapper1.Front() == nullptr);
1761
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001762 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1763 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001764 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001765 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001766
1767 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1768 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001769 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001770 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001771
1772 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1773 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001774 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001775 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001776
1777 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1778 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001779 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001780 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001781 }
Austin Schuh79b30942021-01-24 22:32:21 -08001782
1783 EXPECT_EQ(mapper0_count, 0u);
1784 EXPECT_EQ(mapper1_count, 4u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001785}
1786
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001787// Tests that we properly produce a valid start time.
Austin Schuhd2f96102020-12-01 20:27:29 -08001788TEST_F(TimestampMapperTest, StartTime) {
1789 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1790 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001791 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001792 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001793 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001794 writer1.QueueSpan(config1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001795 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001796 writer2.QueueSpan(config3_.span());
1797 }
1798
1799 const std::vector<LogFile> parts =
1800 SortParts({logfile0_, logfile1_, logfile2_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001801 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001802
Austin Schuh79b30942021-01-24 22:32:21 -08001803 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001804 TimestampMapper mapper0("pi1", log_files,
1805 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001806 mapper0.set_timestamp_callback(
1807 [&](TimestampedMessage *) { ++mapper0_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001808
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001809 EXPECT_EQ(mapper0.monotonic_start_time(0), e + chrono::milliseconds(1));
1810 EXPECT_EQ(mapper0.realtime_start_time(0),
Austin Schuhd2f96102020-12-01 20:27:29 -08001811 realtime_clock::time_point(chrono::seconds(1000)));
Austin Schuh79b30942021-01-24 22:32:21 -08001812 EXPECT_EQ(mapper0_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001813}
1814
Austin Schuhfecf1d82020-12-19 16:57:28 -08001815// Tests that when a peer isn't registered, we treat that as if there was no
1816// data available.
1817TEST_F(TimestampMapperTest, NoPeer) {
1818 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1819 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001820 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001821 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001822 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001823 writer1.QueueSpan(config2_.span());
1824
1825 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001826 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001827 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1828
Austin Schuhd863e6e2022-10-16 15:44:50 -07001829 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001830 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001831 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001832 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1833
Austin Schuhd863e6e2022-10-16 15:44:50 -07001834 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001835 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001836 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001837 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1838 }
1839
1840 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001841 LogFilesContainer log_files(parts);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001842
1843 ASSERT_EQ(parts[0].logger_node, "pi1");
1844 ASSERT_EQ(parts[1].logger_node, "pi2");
1845
Austin Schuh79b30942021-01-24 22:32:21 -08001846 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001847 TimestampMapper mapper1("pi2", log_files,
1848 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001849 mapper1.set_timestamp_callback(
1850 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhfecf1d82020-12-19 16:57:28 -08001851
1852 {
1853 std::deque<TimestampedMessage> output1;
1854
1855 ASSERT_TRUE(mapper1.Front() != nullptr);
1856 output1.emplace_back(std::move(*mapper1.Front()));
1857 mapper1.PopFront();
1858 ASSERT_TRUE(mapper1.Front() != nullptr);
1859 output1.emplace_back(std::move(*mapper1.Front()));
1860 mapper1.PopFront();
1861 ASSERT_TRUE(mapper1.Front() != nullptr);
1862 output1.emplace_back(std::move(*mapper1.Front()));
1863 mapper1.PopFront();
1864 ASSERT_TRUE(mapper1.Front() == nullptr);
1865
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001866 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1867 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001868 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001869 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001870
1871 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1872 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001873 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001874 EXPECT_FALSE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001875
1876 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1877 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001878 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001879 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001880 }
Austin Schuh79b30942021-01-24 22:32:21 -08001881 EXPECT_EQ(mapper1_count, 3u);
1882}
1883
1884// Tests that we can queue messages and call the timestamp callback for both
1885// nodes.
1886TEST_F(TimestampMapperTest, QueueUntilNode0) {
1887 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1888 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001889 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh79b30942021-01-24 22:32:21 -08001890 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001891 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh79b30942021-01-24 22:32:21 -08001892 writer1.QueueSpan(config2_.span());
1893
Austin Schuhd863e6e2022-10-16 15:44:50 -07001894 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001895 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001896 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001897 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1898
Austin Schuhd863e6e2022-10-16 15:44:50 -07001899 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001900 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001901 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001902 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1903
Austin Schuhd863e6e2022-10-16 15:44:50 -07001904 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001905 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001906 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001907 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1908
Austin Schuhd863e6e2022-10-16 15:44:50 -07001909 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001910 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001911 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001912 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1913 }
1914
1915 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001916 LogFilesContainer log_files(parts);
Austin Schuh79b30942021-01-24 22:32:21 -08001917
1918 ASSERT_EQ(parts[0].logger_node, "pi1");
1919 ASSERT_EQ(parts[1].logger_node, "pi2");
1920
1921 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001922 TimestampMapper mapper0("pi1", log_files,
1923 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001924 mapper0.set_timestamp_callback(
1925 [&](TimestampedMessage *) { ++mapper0_count; });
1926 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001927 TimestampMapper mapper1("pi2", log_files,
1928 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001929 mapper1.set_timestamp_callback(
1930 [&](TimestampedMessage *) { ++mapper1_count; });
1931
1932 mapper0.AddPeer(&mapper1);
1933 mapper1.AddPeer(&mapper0);
1934
1935 {
1936 std::deque<TimestampedMessage> output0;
1937
1938 EXPECT_EQ(mapper0_count, 0u);
1939 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001940 mapper0.QueueUntil(
1941 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001942 EXPECT_EQ(mapper0_count, 3u);
1943 EXPECT_EQ(mapper1_count, 0u);
1944
1945 ASSERT_TRUE(mapper0.Front() != nullptr);
1946 EXPECT_EQ(mapper0_count, 3u);
1947 EXPECT_EQ(mapper1_count, 0u);
1948
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001949 mapper0.QueueUntil(
1950 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001951 EXPECT_EQ(mapper0_count, 3u);
1952 EXPECT_EQ(mapper1_count, 0u);
1953
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001954 mapper0.QueueUntil(
1955 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001956 EXPECT_EQ(mapper0_count, 4u);
1957 EXPECT_EQ(mapper1_count, 0u);
1958
1959 output0.emplace_back(std::move(*mapper0.Front()));
1960 mapper0.PopFront();
1961 output0.emplace_back(std::move(*mapper0.Front()));
1962 mapper0.PopFront();
1963 output0.emplace_back(std::move(*mapper0.Front()));
1964 mapper0.PopFront();
1965 output0.emplace_back(std::move(*mapper0.Front()));
1966 mapper0.PopFront();
1967
1968 EXPECT_EQ(mapper0_count, 4u);
1969 EXPECT_EQ(mapper1_count, 0u);
1970
1971 ASSERT_TRUE(mapper0.Front() == nullptr);
1972
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001973 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1974 EXPECT_EQ(output0[0].monotonic_event_time.time,
1975 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001976 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001977
1978 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1979 EXPECT_EQ(output0[1].monotonic_event_time.time,
1980 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001981 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001982
1983 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1984 EXPECT_EQ(output0[2].monotonic_event_time.time,
1985 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001986 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001987
1988 EXPECT_EQ(output0[3].monotonic_event_time.boot, 0u);
1989 EXPECT_EQ(output0[3].monotonic_event_time.time,
1990 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001991 EXPECT_TRUE(output0[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001992 }
1993
1994 {
1995 SCOPED_TRACE("Trying node1 now");
1996 std::deque<TimestampedMessage> output1;
1997
1998 EXPECT_EQ(mapper0_count, 4u);
1999 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002000 mapper1.QueueUntil(BootTimestamp{
2001 .boot = 0,
2002 .time = e + chrono::seconds(100) + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08002003 EXPECT_EQ(mapper0_count, 4u);
2004 EXPECT_EQ(mapper1_count, 3u);
2005
2006 ASSERT_TRUE(mapper1.Front() != nullptr);
2007 EXPECT_EQ(mapper0_count, 4u);
2008 EXPECT_EQ(mapper1_count, 3u);
2009
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002010 mapper1.QueueUntil(BootTimestamp{
2011 .boot = 0,
2012 .time = e + chrono::seconds(100) + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08002013 EXPECT_EQ(mapper0_count, 4u);
2014 EXPECT_EQ(mapper1_count, 3u);
2015
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002016 mapper1.QueueUntil(BootTimestamp{
2017 .boot = 0,
2018 .time = e + chrono::seconds(100) + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08002019 EXPECT_EQ(mapper0_count, 4u);
2020 EXPECT_EQ(mapper1_count, 4u);
2021
2022 ASSERT_TRUE(mapper1.Front() != nullptr);
2023 EXPECT_EQ(mapper0_count, 4u);
2024 EXPECT_EQ(mapper1_count, 4u);
2025
2026 output1.emplace_back(std::move(*mapper1.Front()));
2027 mapper1.PopFront();
2028 ASSERT_TRUE(mapper1.Front() != nullptr);
2029 output1.emplace_back(std::move(*mapper1.Front()));
2030 mapper1.PopFront();
2031 ASSERT_TRUE(mapper1.Front() != nullptr);
2032 output1.emplace_back(std::move(*mapper1.Front()));
2033 mapper1.PopFront();
2034 ASSERT_TRUE(mapper1.Front() != nullptr);
2035 output1.emplace_back(std::move(*mapper1.Front()));
2036 mapper1.PopFront();
2037
2038 EXPECT_EQ(mapper0_count, 4u);
2039 EXPECT_EQ(mapper1_count, 4u);
2040
2041 ASSERT_TRUE(mapper1.Front() == nullptr);
2042
2043 EXPECT_EQ(mapper0_count, 4u);
2044 EXPECT_EQ(mapper1_count, 4u);
2045
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002046 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2047 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002048 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002049 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002050
2051 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
2052 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002053 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002054 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002055
2056 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
2057 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002058 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002059 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002060
2061 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
2062 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002063 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002064 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08002065 }
Austin Schuhfecf1d82020-12-19 16:57:28 -08002066}
2067
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002068class BootMergerTest : public SortingElementTest {
2069 public:
2070 BootMergerTest()
2071 : SortingElementTest(),
2072 boot0_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002073 /* 100ms */
2074 "max_out_of_order_duration": 100000000,
2075 "node": {
2076 "name": "pi2"
2077 },
2078 "logger_node": {
2079 "name": "pi1"
2080 },
2081 "monotonic_start_time": 1000000,
2082 "realtime_start_time": 1000000000000,
2083 "logger_monotonic_start_time": 1000000,
2084 "logger_realtime_start_time": 1000000000000,
2085 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2086 "parts_uuid": "1a9e5ca2-31b2-475b-8282-88f6d1ce5109",
2087 "parts_index": 0,
2088 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2089 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002090 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2091 "boot_uuids": [
2092 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2093 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2094 ""
2095 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002096})")),
2097 boot1_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002098 /* 100ms */
2099 "max_out_of_order_duration": 100000000,
2100 "node": {
2101 "name": "pi2"
2102 },
2103 "logger_node": {
2104 "name": "pi1"
2105 },
2106 "monotonic_start_time": 1000000,
2107 "realtime_start_time": 1000000000000,
2108 "logger_monotonic_start_time": 1000000,
2109 "logger_realtime_start_time": 1000000000000,
2110 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2111 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
2112 "parts_index": 1,
2113 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2114 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002115 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2116 "boot_uuids": [
2117 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2118 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2119 ""
2120 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002121})")) {}
Austin Schuh8bee6882021-06-28 21:03:28 -07002122
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002123 protected:
2124 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0_;
2125 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1_;
2126};
2127
2128// This tests that we can properly sort a multi-node log file which has the old
2129// (and buggy) timestamps in the header, and the non-resetting parts_index.
2130// These make it so we can just bairly figure out what happened first and what
2131// happened second, but not in a way that is robust to multiple nodes rebooting.
2132TEST_F(BootMergerTest, OldReboot) {
Austin Schuh8bee6882021-06-28 21:03:28 -07002133 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002134 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002135 writer.QueueSpan(boot0_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002136 }
2137 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002138 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002139 writer.QueueSpan(boot1_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002140 }
2141
2142 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
2143
2144 ASSERT_EQ(parts.size(), 1u);
2145 ASSERT_EQ(parts[0].parts.size(), 2u);
2146
2147 EXPECT_EQ(parts[0].parts[0].boot_count, 0);
2148 EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002149 boot0_.message().source_node_boot_uuid()->string_view());
Austin Schuh8bee6882021-06-28 21:03:28 -07002150
2151 EXPECT_EQ(parts[0].parts[1].boot_count, 1);
2152 EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002153 boot1_.message().source_node_boot_uuid()->string_view());
2154}
2155
2156// This tests that we can produce messages ordered across a reboot.
2157TEST_F(BootMergerTest, SortAcrossReboot) {
2158 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2159 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002160 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002161 writer.QueueSpan(boot0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002162 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002163 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002164 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002165 MakeLogMessage(e + chrono::milliseconds(2000), 1, 0x105));
2166 }
2167 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002168 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002169 writer.QueueSpan(boot1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002170 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002171 MakeLogMessage(e + chrono::milliseconds(100), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002172 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002173 MakeLogMessage(e + chrono::milliseconds(200), 1, 0x106));
2174 }
2175
2176 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07002177 LogFilesContainer log_files(parts);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002178 ASSERT_EQ(parts.size(), 1u);
2179 ASSERT_EQ(parts[0].parts.size(), 2u);
2180
Austin Schuh63097262023-08-16 17:04:29 -07002181 BootMerger merger("pi2", log_files,
2182 {StoredDataType::DATA, StoredDataType::TIMESTAMPS,
2183 StoredDataType::REMOTE_TIMESTAMPS});
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002184
2185 EXPECT_EQ(merger.node(), 1u);
2186
2187 std::vector<Message> output;
2188 for (int i = 0; i < 4; ++i) {
2189 ASSERT_TRUE(merger.Front() != nullptr);
2190 output.emplace_back(std::move(*merger.Front()));
2191 merger.PopFront();
2192 }
2193
2194 ASSERT_TRUE(merger.Front() == nullptr);
2195
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002196 EXPECT_EQ(output[0].timestamp.boot, 0u);
2197 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
2198 EXPECT_EQ(output[1].timestamp.boot, 0u);
2199 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(2000));
2200
2201 EXPECT_EQ(output[2].timestamp.boot, 1u);
2202 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(100));
2203 EXPECT_EQ(output[3].timestamp.boot, 1u);
2204 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
Austin Schuh8bee6882021-06-28 21:03:28 -07002205}
2206
Austin Schuh48507722021-07-17 17:29:24 -07002207class RebootTimestampMapperTest : public SortingElementTest {
2208 public:
2209 RebootTimestampMapperTest()
2210 : SortingElementTest(),
2211 boot0a_(MakeHeader(config_, R"({
2212 /* 100ms */
2213 "max_out_of_order_duration": 100000000,
2214 "node": {
2215 "name": "pi1"
2216 },
2217 "logger_node": {
2218 "name": "pi1"
2219 },
2220 "monotonic_start_time": 1000000,
2221 "realtime_start_time": 1000000000000,
2222 "logger_monotonic_start_time": 1000000,
2223 "logger_realtime_start_time": 1000000000000,
2224 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2225 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2226 "parts_index": 0,
2227 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2228 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2229 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2230 "boot_uuids": [
2231 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2232 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2233 ""
2234 ]
2235})")),
2236 boot0b_(MakeHeader(config_, R"({
2237 /* 100ms */
2238 "max_out_of_order_duration": 100000000,
2239 "node": {
2240 "name": "pi1"
2241 },
2242 "logger_node": {
2243 "name": "pi1"
2244 },
2245 "monotonic_start_time": 1000000,
2246 "realtime_start_time": 1000000000000,
2247 "logger_monotonic_start_time": 1000000,
2248 "logger_realtime_start_time": 1000000000000,
2249 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2250 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2251 "parts_index": 1,
2252 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2253 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2254 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2255 "boot_uuids": [
2256 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2257 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2258 ""
2259 ]
2260})")),
2261 boot1a_(MakeHeader(config_, R"({
2262 /* 100ms */
2263 "max_out_of_order_duration": 100000000,
2264 "node": {
2265 "name": "pi2"
2266 },
2267 "logger_node": {
2268 "name": "pi1"
2269 },
2270 "monotonic_start_time": 1000000,
2271 "realtime_start_time": 1000000000000,
2272 "logger_monotonic_start_time": 1000000,
2273 "logger_realtime_start_time": 1000000000000,
2274 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2275 "parts_uuid": "f6ab0cdc-a654-456d-bfd9-2bbc09098edf",
2276 "parts_index": 0,
2277 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2278 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2279 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2280 "boot_uuids": [
2281 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2282 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2283 ""
2284 ]
2285})")),
2286 boot1b_(MakeHeader(config_, R"({
2287 /* 100ms */
2288 "max_out_of_order_duration": 100000000,
2289 "node": {
2290 "name": "pi2"
2291 },
2292 "logger_node": {
2293 "name": "pi1"
2294 },
2295 "monotonic_start_time": 1000000,
2296 "realtime_start_time": 1000000000000,
2297 "logger_monotonic_start_time": 1000000,
2298 "logger_realtime_start_time": 1000000000000,
2299 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2300 "parts_uuid": "3a9d0445-f520-43ca-93f5-e2cc7f54d40a",
2301 "parts_index": 1,
2302 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2303 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2304 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2305 "boot_uuids": [
2306 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2307 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2308 ""
2309 ]
2310})")) {}
2311
2312 protected:
2313 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0a_;
2314 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0b_;
2315 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1a_;
2316 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
2317};
2318
Austin Schuh48507722021-07-17 17:29:24 -07002319// Tests that we can match timestamps on delivered messages in the presence of
2320// reboots on the node receiving timestamps.
2321TEST_F(RebootTimestampMapperTest, ReadNode0First) {
2322 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2323 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002324 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002325 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002326 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002327 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002328 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002329 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002330 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002331 writer1b.QueueSpan(boot1b_.span());
2332
Austin Schuhd863e6e2022-10-16 15:44:50 -07002333 writer0a.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002334 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002335 writer1a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002336 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
2337 e + chrono::milliseconds(1001)));
2338
Austin Schuhd863e6e2022-10-16 15:44:50 -07002339 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh58646e22021-08-23 23:51:46 -07002340 e + chrono::milliseconds(1000), 0, chrono::seconds(21),
2341 e + chrono::milliseconds(2001)));
2342
Austin Schuhd863e6e2022-10-16 15:44:50 -07002343 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002344 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002345 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002346 e + chrono::milliseconds(2000), 0, chrono::seconds(20),
2347 e + chrono::milliseconds(2001)));
2348
Austin Schuhd863e6e2022-10-16 15:44:50 -07002349 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002350 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002351 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002352 e + chrono::milliseconds(3000), 0, chrono::seconds(20),
2353 e + chrono::milliseconds(3001)));
2354 }
2355
Austin Schuh58646e22021-08-23 23:51:46 -07002356 const std::vector<LogFile> parts =
2357 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07002358 LogFilesContainer log_files(parts);
Austin Schuh48507722021-07-17 17:29:24 -07002359
2360 for (const auto &x : parts) {
2361 LOG(INFO) << x;
2362 }
2363 ASSERT_EQ(parts.size(), 1u);
2364 ASSERT_EQ(parts[0].logger_node, "pi1");
2365
2366 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07002367 TimestampMapper mapper0("pi1", log_files,
2368 TimestampQueueStrategy::kQueueTogether);
Austin Schuh48507722021-07-17 17:29:24 -07002369 mapper0.set_timestamp_callback(
2370 [&](TimestampedMessage *) { ++mapper0_count; });
2371 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07002372 TimestampMapper mapper1("pi2", log_files,
2373 TimestampQueueStrategy::kQueueTogether);
Austin Schuh48507722021-07-17 17:29:24 -07002374 mapper1.set_timestamp_callback(
2375 [&](TimestampedMessage *) { ++mapper1_count; });
2376
2377 mapper0.AddPeer(&mapper1);
2378 mapper1.AddPeer(&mapper0);
2379
2380 {
2381 std::deque<TimestampedMessage> output0;
2382
2383 EXPECT_EQ(mapper0_count, 0u);
2384 EXPECT_EQ(mapper1_count, 0u);
2385 ASSERT_TRUE(mapper0.Front() != nullptr);
2386 EXPECT_EQ(mapper0_count, 1u);
2387 EXPECT_EQ(mapper1_count, 0u);
2388 output0.emplace_back(std::move(*mapper0.Front()));
2389 mapper0.PopFront();
2390 EXPECT_TRUE(mapper0.started());
2391 EXPECT_EQ(mapper0_count, 1u);
2392 EXPECT_EQ(mapper1_count, 0u);
2393
2394 ASSERT_TRUE(mapper0.Front() != nullptr);
2395 EXPECT_EQ(mapper0_count, 2u);
2396 EXPECT_EQ(mapper1_count, 0u);
2397 output0.emplace_back(std::move(*mapper0.Front()));
2398 mapper0.PopFront();
2399 EXPECT_TRUE(mapper0.started());
2400
2401 ASSERT_TRUE(mapper0.Front() != nullptr);
2402 output0.emplace_back(std::move(*mapper0.Front()));
2403 mapper0.PopFront();
2404 EXPECT_TRUE(mapper0.started());
2405
2406 EXPECT_EQ(mapper0_count, 3u);
2407 EXPECT_EQ(mapper1_count, 0u);
2408
2409 ASSERT_TRUE(mapper0.Front() == nullptr);
2410
2411 LOG(INFO) << output0[0];
2412 LOG(INFO) << output0[1];
2413 LOG(INFO) << output0[2];
2414
2415 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2416 EXPECT_EQ(output0[0].monotonic_event_time.time,
2417 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002418 EXPECT_EQ(output0[0].queue_index,
2419 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002420 EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
2421 EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002422 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002423
2424 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2425 EXPECT_EQ(output0[1].monotonic_event_time.time,
2426 e + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002427 EXPECT_EQ(output0[1].queue_index,
2428 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002429 EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
2430 EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002431 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002432
2433 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2434 EXPECT_EQ(output0[2].monotonic_event_time.time,
2435 e + chrono::milliseconds(3000));
Austin Schuh58646e22021-08-23 23:51:46 -07002436 EXPECT_EQ(output0[2].queue_index,
2437 (BootQueueIndex{.boot = 0u, .index = 2u}));
Austin Schuh48507722021-07-17 17:29:24 -07002438 EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
2439 EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002440 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002441 }
2442
2443 {
2444 SCOPED_TRACE("Trying node1 now");
2445 std::deque<TimestampedMessage> output1;
2446
2447 EXPECT_EQ(mapper0_count, 3u);
2448 EXPECT_EQ(mapper1_count, 0u);
2449
2450 ASSERT_TRUE(mapper1.Front() != nullptr);
2451 EXPECT_EQ(mapper0_count, 3u);
2452 EXPECT_EQ(mapper1_count, 1u);
2453 output1.emplace_back(std::move(*mapper1.Front()));
2454 mapper1.PopFront();
2455 EXPECT_TRUE(mapper1.started());
2456 EXPECT_EQ(mapper0_count, 3u);
2457 EXPECT_EQ(mapper1_count, 1u);
2458
2459 ASSERT_TRUE(mapper1.Front() != nullptr);
2460 EXPECT_EQ(mapper0_count, 3u);
2461 EXPECT_EQ(mapper1_count, 2u);
2462 output1.emplace_back(std::move(*mapper1.Front()));
2463 mapper1.PopFront();
2464 EXPECT_TRUE(mapper1.started());
2465
2466 ASSERT_TRUE(mapper1.Front() != nullptr);
2467 output1.emplace_back(std::move(*mapper1.Front()));
2468 mapper1.PopFront();
2469 EXPECT_TRUE(mapper1.started());
2470
Austin Schuh58646e22021-08-23 23:51:46 -07002471 ASSERT_TRUE(mapper1.Front() != nullptr);
2472 output1.emplace_back(std::move(*mapper1.Front()));
2473 mapper1.PopFront();
2474 EXPECT_TRUE(mapper1.started());
2475
Austin Schuh48507722021-07-17 17:29:24 -07002476 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002477 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002478
2479 ASSERT_TRUE(mapper1.Front() == nullptr);
2480
2481 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002482 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002483
2484 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2485 EXPECT_EQ(output1[0].monotonic_event_time.time,
2486 e + chrono::seconds(100) + chrono::milliseconds(1000));
2487 EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
2488 EXPECT_EQ(output1[0].monotonic_remote_time.time,
2489 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002490 EXPECT_EQ(output1[0].remote_queue_index,
2491 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002492 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
2493 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
2494 e + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002495 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002496
2497 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2498 EXPECT_EQ(output1[1].monotonic_event_time.time,
2499 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002500 EXPECT_EQ(output1[1].remote_queue_index,
2501 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002502 EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
2503 EXPECT_EQ(output1[1].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002504 e + chrono::milliseconds(1000));
Austin Schuh48507722021-07-17 17:29:24 -07002505 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
2506 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
2507 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002508 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002509
2510 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2511 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002512 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh48507722021-07-17 17:29:24 -07002513 EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
2514 EXPECT_EQ(output1[2].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002515 e + chrono::milliseconds(2000));
2516 EXPECT_EQ(output1[2].remote_queue_index,
2517 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002518 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
2519 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002520 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002521 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002522
Austin Schuh58646e22021-08-23 23:51:46 -07002523 EXPECT_EQ(output1[3].monotonic_event_time.boot, 1u);
2524 EXPECT_EQ(output1[3].monotonic_event_time.time,
2525 e + chrono::seconds(20) + chrono::milliseconds(3000));
2526 EXPECT_EQ(output1[3].monotonic_remote_time.boot, 0u);
2527 EXPECT_EQ(output1[3].monotonic_remote_time.time,
2528 e + chrono::milliseconds(3000));
2529 EXPECT_EQ(output1[3].remote_queue_index,
2530 (BootQueueIndex{.boot = 0u, .index = 2u}));
2531 EXPECT_EQ(output1[3].monotonic_timestamp_time.boot, 0u);
2532 EXPECT_EQ(output1[3].monotonic_timestamp_time.time,
2533 e + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002534 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh58646e22021-08-23 23:51:46 -07002535
Austin Schuh48507722021-07-17 17:29:24 -07002536 LOG(INFO) << output1[0];
2537 LOG(INFO) << output1[1];
2538 LOG(INFO) << output1[2];
Austin Schuh58646e22021-08-23 23:51:46 -07002539 LOG(INFO) << output1[3];
Austin Schuh48507722021-07-17 17:29:24 -07002540 }
2541}
2542
2543TEST_F(RebootTimestampMapperTest, Node2Reboot) {
2544 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2545 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002546 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002547 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002548 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002549 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002550 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002551 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002552 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002553 writer1b.QueueSpan(boot1b_.span());
2554
Austin Schuhd863e6e2022-10-16 15:44:50 -07002555 writer1a.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002556 e + chrono::seconds(100) + chrono::milliseconds(1000), 3, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002557 writer0a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002558 e + chrono::seconds(100) + chrono::milliseconds(1000), 3,
2559 chrono::seconds(-100),
2560 e + chrono::seconds(100) + chrono::milliseconds(1001)));
2561
Austin Schuhd863e6e2022-10-16 15:44:50 -07002562 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002563 e + chrono::seconds(20) + chrono::milliseconds(2000), 3, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002564 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002565 e + chrono::seconds(20) + chrono::milliseconds(2000), 3,
2566 chrono::seconds(-20),
2567 e + chrono::seconds(20) + chrono::milliseconds(2001)));
2568
Austin Schuhd863e6e2022-10-16 15:44:50 -07002569 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002570 e + chrono::seconds(20) + chrono::milliseconds(3000), 3, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002571 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002572 e + chrono::seconds(20) + chrono::milliseconds(3000), 3,
2573 chrono::seconds(-20),
2574 e + chrono::seconds(20) + chrono::milliseconds(3001)));
2575 }
2576
2577 const std::vector<LogFile> parts =
2578 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07002579 LogFilesContainer log_files(parts);
Austin Schuh48507722021-07-17 17:29:24 -07002580
2581 for (const auto &x : parts) {
2582 LOG(INFO) << x;
2583 }
2584 ASSERT_EQ(parts.size(), 1u);
2585 ASSERT_EQ(parts[0].logger_node, "pi1");
2586
2587 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07002588 TimestampMapper mapper0("pi1", log_files,
2589 TimestampQueueStrategy::kQueueTogether);
Austin Schuh48507722021-07-17 17:29:24 -07002590 mapper0.set_timestamp_callback(
2591 [&](TimestampedMessage *) { ++mapper0_count; });
2592 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07002593 TimestampMapper mapper1("pi2", log_files,
2594 TimestampQueueStrategy::kQueueTogether);
Austin Schuh48507722021-07-17 17:29:24 -07002595 mapper1.set_timestamp_callback(
2596 [&](TimestampedMessage *) { ++mapper1_count; });
2597
2598 mapper0.AddPeer(&mapper1);
2599 mapper1.AddPeer(&mapper0);
2600
2601 {
2602 std::deque<TimestampedMessage> output0;
2603
2604 EXPECT_EQ(mapper0_count, 0u);
2605 EXPECT_EQ(mapper1_count, 0u);
2606 ASSERT_TRUE(mapper0.Front() != nullptr);
2607 EXPECT_EQ(mapper0_count, 1u);
2608 EXPECT_EQ(mapper1_count, 0u);
2609 output0.emplace_back(std::move(*mapper0.Front()));
2610 mapper0.PopFront();
2611 EXPECT_TRUE(mapper0.started());
2612 EXPECT_EQ(mapper0_count, 1u);
2613 EXPECT_EQ(mapper1_count, 0u);
2614
2615 ASSERT_TRUE(mapper0.Front() != nullptr);
2616 EXPECT_EQ(mapper0_count, 2u);
2617 EXPECT_EQ(mapper1_count, 0u);
2618 output0.emplace_back(std::move(*mapper0.Front()));
2619 mapper0.PopFront();
2620 EXPECT_TRUE(mapper0.started());
2621
2622 ASSERT_TRUE(mapper0.Front() != nullptr);
2623 output0.emplace_back(std::move(*mapper0.Front()));
2624 mapper0.PopFront();
2625 EXPECT_TRUE(mapper0.started());
2626
2627 EXPECT_EQ(mapper0_count, 3u);
2628 EXPECT_EQ(mapper1_count, 0u);
2629
2630 ASSERT_TRUE(mapper0.Front() == nullptr);
2631
2632 LOG(INFO) << output0[0];
2633 LOG(INFO) << output0[1];
2634 LOG(INFO) << output0[2];
2635
2636 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2637 EXPECT_EQ(output0[0].monotonic_event_time.time,
2638 e + chrono::milliseconds(1000));
2639 EXPECT_EQ(output0[0].monotonic_remote_time.boot, 0u);
2640 EXPECT_EQ(output0[0].monotonic_remote_time.time,
2641 e + chrono::seconds(100) + chrono::milliseconds(1000));
2642 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
2643 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
2644 e + chrono::seconds(100) + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002645 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002646
2647 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2648 EXPECT_EQ(output0[1].monotonic_event_time.time,
2649 e + chrono::milliseconds(2000));
2650 EXPECT_EQ(output0[1].monotonic_remote_time.boot, 1u);
2651 EXPECT_EQ(output0[1].monotonic_remote_time.time,
2652 e + chrono::seconds(20) + chrono::milliseconds(2000));
2653 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
2654 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
2655 e + chrono::seconds(20) + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002656 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002657
2658 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2659 EXPECT_EQ(output0[2].monotonic_event_time.time,
2660 e + chrono::milliseconds(3000));
2661 EXPECT_EQ(output0[2].monotonic_remote_time.boot, 1u);
2662 EXPECT_EQ(output0[2].monotonic_remote_time.time,
2663 e + chrono::seconds(20) + chrono::milliseconds(3000));
2664 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
2665 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
2666 e + chrono::seconds(20) + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002667 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002668 }
2669
2670 {
2671 SCOPED_TRACE("Trying node1 now");
2672 std::deque<TimestampedMessage> output1;
2673
2674 EXPECT_EQ(mapper0_count, 3u);
2675 EXPECT_EQ(mapper1_count, 0u);
2676
2677 ASSERT_TRUE(mapper1.Front() != nullptr);
2678 EXPECT_EQ(mapper0_count, 3u);
2679 EXPECT_EQ(mapper1_count, 1u);
2680 output1.emplace_back(std::move(*mapper1.Front()));
2681 mapper1.PopFront();
2682 EXPECT_TRUE(mapper1.started());
2683 EXPECT_EQ(mapper0_count, 3u);
2684 EXPECT_EQ(mapper1_count, 1u);
2685
2686 ASSERT_TRUE(mapper1.Front() != nullptr);
2687 EXPECT_EQ(mapper0_count, 3u);
2688 EXPECT_EQ(mapper1_count, 2u);
2689 output1.emplace_back(std::move(*mapper1.Front()));
2690 mapper1.PopFront();
2691 EXPECT_TRUE(mapper1.started());
2692
2693 ASSERT_TRUE(mapper1.Front() != nullptr);
2694 output1.emplace_back(std::move(*mapper1.Front()));
2695 mapper1.PopFront();
2696 EXPECT_TRUE(mapper1.started());
2697
2698 EXPECT_EQ(mapper0_count, 3u);
2699 EXPECT_EQ(mapper1_count, 3u);
2700
2701 ASSERT_TRUE(mapper1.Front() == nullptr);
2702
2703 EXPECT_EQ(mapper0_count, 3u);
2704 EXPECT_EQ(mapper1_count, 3u);
2705
2706 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2707 EXPECT_EQ(output1[0].monotonic_event_time.time,
2708 e + chrono::seconds(100) + chrono::milliseconds(1000));
2709 EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
2710 EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002711 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002712
2713 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2714 EXPECT_EQ(output1[1].monotonic_event_time.time,
2715 e + chrono::seconds(20) + chrono::milliseconds(2000));
2716 EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
2717 EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002718 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002719
2720 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2721 EXPECT_EQ(output1[2].monotonic_event_time.time,
2722 e + chrono::seconds(20) + chrono::milliseconds(3000));
2723 EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
2724 EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002725 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002726
2727 LOG(INFO) << output1[0];
2728 LOG(INFO) << output1[1];
2729 LOG(INFO) << output1[2];
2730 }
2731}
2732
Austin Schuh44c61472021-11-22 21:04:10 -08002733class SortingDeathTest : public SortingElementTest {
2734 public:
2735 SortingDeathTest()
2736 : SortingElementTest(),
2737 part0_(MakeHeader(config_, R"({
2738 /* 100ms */
2739 "max_out_of_order_duration": 100000000,
2740 "node": {
2741 "name": "pi1"
2742 },
2743 "logger_node": {
2744 "name": "pi1"
2745 },
2746 "monotonic_start_time": 1000000,
2747 "realtime_start_time": 1000000000000,
2748 "logger_monotonic_start_time": 1000000,
2749 "logger_realtime_start_time": 1000000000000,
2750 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2751 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2752 "parts_index": 0,
2753 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2754 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2755 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2756 "boot_uuids": [
2757 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2758 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2759 ""
2760 ],
2761 "oldest_remote_monotonic_timestamps": [
2762 9223372036854775807,
2763 9223372036854775807,
2764 9223372036854775807
2765 ],
2766 "oldest_local_monotonic_timestamps": [
2767 9223372036854775807,
2768 9223372036854775807,
2769 9223372036854775807
2770 ],
2771 "oldest_remote_unreliable_monotonic_timestamps": [
2772 9223372036854775807,
2773 0,
2774 9223372036854775807
2775 ],
2776 "oldest_local_unreliable_monotonic_timestamps": [
2777 9223372036854775807,
2778 0,
2779 9223372036854775807
2780 ]
2781})")),
2782 part1_(MakeHeader(config_, R"({
2783 /* 100ms */
2784 "max_out_of_order_duration": 100000000,
2785 "node": {
2786 "name": "pi1"
2787 },
2788 "logger_node": {
2789 "name": "pi1"
2790 },
2791 "monotonic_start_time": 1000000,
2792 "realtime_start_time": 1000000000000,
2793 "logger_monotonic_start_time": 1000000,
2794 "logger_realtime_start_time": 1000000000000,
2795 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2796 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2797 "parts_index": 1,
2798 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2799 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2800 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2801 "boot_uuids": [
2802 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2803 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2804 ""
2805 ],
2806 "oldest_remote_monotonic_timestamps": [
2807 9223372036854775807,
2808 9223372036854775807,
2809 9223372036854775807
2810 ],
2811 "oldest_local_monotonic_timestamps": [
2812 9223372036854775807,
2813 9223372036854775807,
2814 9223372036854775807
2815 ],
2816 "oldest_remote_unreliable_monotonic_timestamps": [
2817 9223372036854775807,
2818 100000,
2819 9223372036854775807
2820 ],
2821 "oldest_local_unreliable_monotonic_timestamps": [
2822 9223372036854775807,
2823 100000,
2824 9223372036854775807
2825 ]
2826})")),
2827 part2_(MakeHeader(config_, R"({
2828 /* 100ms */
2829 "max_out_of_order_duration": 100000000,
2830 "node": {
2831 "name": "pi1"
2832 },
2833 "logger_node": {
2834 "name": "pi1"
2835 },
2836 "monotonic_start_time": 1000000,
2837 "realtime_start_time": 1000000000000,
2838 "logger_monotonic_start_time": 1000000,
2839 "logger_realtime_start_time": 1000000000000,
2840 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2841 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2842 "parts_index": 2,
2843 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2844 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2845 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2846 "boot_uuids": [
2847 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2848 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2849 ""
2850 ],
2851 "oldest_remote_monotonic_timestamps": [
2852 9223372036854775807,
2853 9223372036854775807,
2854 9223372036854775807
2855 ],
2856 "oldest_local_monotonic_timestamps": [
2857 9223372036854775807,
2858 9223372036854775807,
2859 9223372036854775807
2860 ],
2861 "oldest_remote_unreliable_monotonic_timestamps": [
2862 9223372036854775807,
2863 200000,
2864 9223372036854775807
2865 ],
2866 "oldest_local_unreliable_monotonic_timestamps": [
2867 9223372036854775807,
2868 200000,
2869 9223372036854775807
2870 ]
2871})")),
2872 part3_(MakeHeader(config_, R"({
2873 /* 100ms */
2874 "max_out_of_order_duration": 100000000,
2875 "node": {
2876 "name": "pi1"
2877 },
2878 "logger_node": {
2879 "name": "pi1"
2880 },
2881 "monotonic_start_time": 1000000,
2882 "realtime_start_time": 1000000000000,
2883 "logger_monotonic_start_time": 1000000,
2884 "logger_realtime_start_time": 1000000000000,
2885 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2886 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2887 "parts_index": 3,
2888 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2889 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2890 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2891 "boot_uuids": [
2892 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2893 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2894 ""
2895 ],
2896 "oldest_remote_monotonic_timestamps": [
2897 9223372036854775807,
2898 9223372036854775807,
2899 9223372036854775807
2900 ],
2901 "oldest_local_monotonic_timestamps": [
2902 9223372036854775807,
2903 9223372036854775807,
2904 9223372036854775807
2905 ],
2906 "oldest_remote_unreliable_monotonic_timestamps": [
2907 9223372036854775807,
2908 300000,
2909 9223372036854775807
2910 ],
2911 "oldest_local_unreliable_monotonic_timestamps": [
2912 9223372036854775807,
2913 300000,
2914 9223372036854775807
2915 ]
2916})")) {}
2917
2918 protected:
2919 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part0_;
2920 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part1_;
2921 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part2_;
2922 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part3_;
2923};
2924
2925// Tests that if 2 computers go back and forth trying to be the same node, we
2926// die in sorting instead of failing to estimate time.
2927TEST_F(SortingDeathTest, FightingNodes) {
2928 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002929 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh44c61472021-11-22 21:04:10 -08002930 writer0.QueueSpan(part0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002931 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh44c61472021-11-22 21:04:10 -08002932 writer1.QueueSpan(part1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002933 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuh44c61472021-11-22 21:04:10 -08002934 writer2.QueueSpan(part2_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002935 TestDetachedBufferWriter writer3(logfile3_);
Austin Schuh44c61472021-11-22 21:04:10 -08002936 writer3.QueueSpan(part3_.span());
2937 }
2938
2939 EXPECT_DEATH(
2940 {
2941 const std::vector<LogFile> parts =
2942 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2943 },
Austin Schuh22cf7862022-09-19 19:09:42 -07002944 "found overlapping boots on");
Austin Schuh44c61472021-11-22 21:04:10 -08002945}
2946
Brian Smarttea913d42021-12-10 15:02:38 -08002947// Tests that we MessageReader blows up on a bad message.
2948TEST(MessageReaderConfirmCrash, ReadWrite) {
2949 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
2950 unlink(logfile.c_str());
2951
2952 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
2953 JsonToSizedFlatbuffer<LogFileHeader>(
2954 R"({ "max_out_of_order_duration": 100000000 })");
2955 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
2956 JsonToSizedFlatbuffer<MessageHeader>(
2957 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
2958 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
2959 JsonToSizedFlatbuffer<MessageHeader>(
2960 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
2961 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m4 =
2962 JsonToSizedFlatbuffer<MessageHeader>(
2963 R"({ "channel_index": 0, "monotonic_sent_time": 4 })");
2964
2965 // Starts out like a proper flat buffer header, but it breaks down ...
2966 std::vector<uint8_t> garbage{8, 0, 0, 0, 16, 0, 0, 0, 4, 0, 0, 0};
2967 absl::Span<uint8_t> m3_span(garbage);
2968
2969 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002970 TestDetachedBufferWriter writer(logfile);
Brian Smarttea913d42021-12-10 15:02:38 -08002971 writer.QueueSpan(config.span());
2972 writer.QueueSpan(m1.span());
2973 writer.QueueSpan(m2.span());
2974 writer.QueueSpan(m3_span);
2975 writer.QueueSpan(m4.span()); // This message is "hidden"
2976 }
2977
2978 {
2979 MessageReader reader(logfile);
2980
2981 EXPECT_EQ(reader.filename(), logfile);
2982
2983 EXPECT_EQ(
2984 reader.max_out_of_order_duration(),
2985 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2986 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2987 EXPECT_TRUE(reader.ReadMessage());
2988 EXPECT_EQ(reader.newest_timestamp(),
2989 monotonic_clock::time_point(chrono::nanoseconds(1)));
2990 EXPECT_TRUE(reader.ReadMessage());
2991 EXPECT_EQ(reader.newest_timestamp(),
2992 monotonic_clock::time_point(chrono::nanoseconds(2)));
2993 // Confirm default crashing behavior
2994 EXPECT_DEATH(reader.ReadMessage(), "Corrupted message at offset");
2995 }
2996
2997 {
2998 gflags::FlagSaver fs;
2999
3000 MessageReader reader(logfile);
3001 reader.set_crash_on_corrupt_message_flag(false);
3002
3003 EXPECT_EQ(reader.filename(), logfile);
3004
3005 EXPECT_EQ(
3006 reader.max_out_of_order_duration(),
3007 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
3008 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
3009 EXPECT_TRUE(reader.ReadMessage());
3010 EXPECT_EQ(reader.newest_timestamp(),
3011 monotonic_clock::time_point(chrono::nanoseconds(1)));
3012 EXPECT_TRUE(reader.ReadMessage());
3013 EXPECT_EQ(reader.newest_timestamp(),
3014 monotonic_clock::time_point(chrono::nanoseconds(2)));
3015 // Confirm avoiding the corrupted message crash, stopping instead.
3016 EXPECT_FALSE(reader.ReadMessage());
3017 }
3018
3019 {
3020 gflags::FlagSaver fs;
3021
3022 MessageReader reader(logfile);
3023 reader.set_crash_on_corrupt_message_flag(false);
3024 reader.set_ignore_corrupt_messages_flag(true);
3025
3026 EXPECT_EQ(reader.filename(), logfile);
3027
3028 EXPECT_EQ(
3029 reader.max_out_of_order_duration(),
3030 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
3031 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
3032 EXPECT_TRUE(reader.ReadMessage());
3033 EXPECT_EQ(reader.newest_timestamp(),
3034 monotonic_clock::time_point(chrono::nanoseconds(1)));
3035 EXPECT_TRUE(reader.ReadMessage());
3036 EXPECT_EQ(reader.newest_timestamp(),
3037 monotonic_clock::time_point(chrono::nanoseconds(2)));
3038 // Confirm skipping of the corrupted message to read the hidden one.
3039 EXPECT_TRUE(reader.ReadMessage());
3040 EXPECT_EQ(reader.newest_timestamp(),
3041 monotonic_clock::time_point(chrono::nanoseconds(4)));
3042 EXPECT_FALSE(reader.ReadMessage());
3043 }
3044}
3045
Austin Schuhfa30c352022-10-16 11:12:02 -07003046class InlinePackMessage : public ::testing::Test {
3047 protected:
3048 aos::Context RandomContext() {
3049 data_ = RandomData();
3050 std::uniform_int_distribution<uint32_t> uint32_distribution(
3051 std::numeric_limits<uint32_t>::min(),
3052 std::numeric_limits<uint32_t>::max());
3053
3054 std::uniform_int_distribution<int64_t> time_distribution(
3055 std::numeric_limits<int64_t>::min(),
3056 std::numeric_limits<int64_t>::max());
3057
3058 aos::Context context;
3059 context.monotonic_event_time =
3060 aos::monotonic_clock::epoch() +
3061 chrono::nanoseconds(time_distribution(random_number_generator_));
3062 context.realtime_event_time =
3063 aos::realtime_clock::epoch() +
3064 chrono::nanoseconds(time_distribution(random_number_generator_));
3065
3066 context.monotonic_remote_time =
3067 aos::monotonic_clock::epoch() +
3068 chrono::nanoseconds(time_distribution(random_number_generator_));
3069 context.realtime_remote_time =
3070 aos::realtime_clock::epoch() +
3071 chrono::nanoseconds(time_distribution(random_number_generator_));
3072
3073 context.queue_index = uint32_distribution(random_number_generator_);
3074 context.remote_queue_index = uint32_distribution(random_number_generator_);
3075 context.size = data_.size();
3076 context.data = data_.data();
3077 return context;
3078 }
3079
Austin Schuhf2d0e682022-10-16 14:20:58 -07003080 aos::monotonic_clock::time_point RandomMonotonic() {
3081 std::uniform_int_distribution<int64_t> time_distribution(
3082 0, std::numeric_limits<int64_t>::max());
3083 return aos::monotonic_clock::epoch() +
3084 chrono::nanoseconds(time_distribution(random_number_generator_));
3085 }
3086
3087 aos::SizePrefixedFlatbufferDetachedBuffer<message_bridge::RemoteMessage>
3088 RandomRemoteMessage() {
3089 std::uniform_int_distribution<uint8_t> uint8_distribution(
3090 std::numeric_limits<uint8_t>::min(),
3091 std::numeric_limits<uint8_t>::max());
3092
3093 std::uniform_int_distribution<int64_t> time_distribution(
3094 std::numeric_limits<int64_t>::min(),
3095 std::numeric_limits<int64_t>::max());
3096
3097 flatbuffers::FlatBufferBuilder fbb;
3098 message_bridge::RemoteMessage::Builder builder(fbb);
3099 builder.add_queue_index(uint8_distribution(random_number_generator_));
3100
3101 builder.add_monotonic_sent_time(
3102 time_distribution(random_number_generator_));
3103 builder.add_realtime_sent_time(time_distribution(random_number_generator_));
3104 builder.add_monotonic_remote_time(
3105 time_distribution(random_number_generator_));
3106 builder.add_realtime_remote_time(
3107 time_distribution(random_number_generator_));
3108
3109 builder.add_remote_queue_index(
3110 uint8_distribution(random_number_generator_));
3111
3112 fbb.FinishSizePrefixed(builder.Finish());
3113 return fbb.Release();
3114 }
3115
Austin Schuhfa30c352022-10-16 11:12:02 -07003116 std::vector<uint8_t> RandomData() {
3117 std::vector<uint8_t> result;
3118 std::uniform_int_distribution<int> length_distribution(1, 32);
3119 std::uniform_int_distribution<uint8_t> data_distribution(
3120 std::numeric_limits<uint8_t>::min(),
3121 std::numeric_limits<uint8_t>::max());
3122
3123 const size_t length = length_distribution(random_number_generator_);
3124
3125 result.reserve(length);
3126 for (size_t i = 0; i < length; ++i) {
3127 result.emplace_back(data_distribution(random_number_generator_));
3128 }
3129 return result;
3130 }
3131
3132 std::mt19937 random_number_generator_{
3133 std::mt19937(::aos::testing::RandomSeed())};
3134
3135 std::vector<uint8_t> data_;
3136};
3137
3138// Uses the binary schema to annotate a provided flatbuffer. Returns the
3139// annotated flatbuffer.
3140std::string AnnotateBinaries(
3141 const aos::NonSizePrefixedFlatbuffer<reflection::Schema> &schema,
3142 const std::string &schema_filename,
3143 flatbuffers::span<uint8_t> binary_data) {
3144 flatbuffers::BinaryAnnotator binary_annotator(
3145 schema.span().data(), schema.span().size(), binary_data.data(),
3146 binary_data.size());
3147
3148 auto annotations = binary_annotator.Annotate();
3149
3150 flatbuffers::AnnotatedBinaryTextGenerator text_generator(
3151 flatbuffers::AnnotatedBinaryTextGenerator::Options{}, annotations,
3152 binary_data.data(), binary_data.size());
3153
3154 text_generator.Generate(aos::testing::TestTmpDir() + "/foo.bfbs",
3155 schema_filename);
3156
3157 return aos::util::ReadFileToStringOrDie(aos::testing::TestTmpDir() +
3158 "/foo.afb");
3159}
3160
Austin Schuh71a40d42023-02-04 21:22:22 -08003161// Event loop which just has working time functions for the Copier classes
3162// tested below.
3163class TimeEventLoop : public EventLoop {
3164 public:
3165 TimeEventLoop() : EventLoop(nullptr) {}
3166
3167 aos::monotonic_clock::time_point monotonic_now() const final {
3168 return aos::monotonic_clock::min_time;
3169 }
3170 realtime_clock::time_point realtime_now() const final {
3171 return aos::realtime_clock::min_time;
3172 }
3173
3174 void OnRun(::std::function<void()> /*on_run*/) final { LOG(FATAL); }
3175
3176 const std::string_view name() const final { return "time"; }
3177 const Node *node() const final { return nullptr; }
3178
3179 void SetRuntimeAffinity(const cpu_set_t & /*cpuset*/) final { LOG(FATAL); }
3180 void SetRuntimeRealtimePriority(int /*priority*/) final { LOG(FATAL); }
3181
3182 const cpu_set_t &runtime_affinity() const final {
3183 LOG(FATAL);
3184 return cpuset_;
3185 }
3186
3187 TimerHandler *AddTimer(::std::function<void()> /*callback*/) final {
3188 LOG(FATAL);
3189 return nullptr;
3190 }
3191
3192 std::unique_ptr<RawSender> MakeRawSender(const Channel * /*channel*/) final {
3193 LOG(FATAL);
3194 return std::unique_ptr<RawSender>();
3195 }
3196
3197 const UUID &boot_uuid() const final {
3198 LOG(FATAL);
3199 return boot_uuid_;
3200 }
3201
3202 void set_name(const std::string_view name) final { LOG(FATAL) << name; }
3203
3204 pid_t GetTid() final {
3205 LOG(FATAL);
3206 return 0;
3207 }
3208
3209 int NumberBuffers(const Channel * /*channel*/) final {
3210 LOG(FATAL);
3211 return 0;
3212 }
3213
3214 int runtime_realtime_priority() const final {
3215 LOG(FATAL);
3216 return 0;
3217 }
3218
3219 std::unique_ptr<RawFetcher> MakeRawFetcher(
3220 const Channel * /*channel*/) final {
3221 LOG(FATAL);
3222 return std::unique_ptr<RawFetcher>();
3223 }
3224
3225 PhasedLoopHandler *AddPhasedLoop(
3226 ::std::function<void(int)> /*callback*/,
3227 const monotonic_clock::duration /*interval*/,
3228 const monotonic_clock::duration /*offset*/) final {
3229 LOG(FATAL);
3230 return nullptr;
3231 }
3232
3233 void MakeRawWatcher(
3234 const Channel * /*channel*/,
3235 std::function<void(const Context &context, const void *message)>
3236 /*watcher*/) final {
3237 LOG(FATAL);
3238 }
3239
3240 private:
3241 const cpu_set_t cpuset_ = DefaultAffinity();
3242 UUID boot_uuid_ = UUID ::Zero();
3243};
3244
Austin Schuhfa30c352022-10-16 11:12:02 -07003245// Tests that all variations of PackMessage are equivalent to the inline
3246// PackMessage used to avoid allocations.
3247TEST_F(InlinePackMessage, Equivilent) {
3248 std::uniform_int_distribution<uint32_t> uint32_distribution(
3249 std::numeric_limits<uint32_t>::min(),
3250 std::numeric_limits<uint32_t>::max());
3251 aos::FlatbufferVector<reflection::Schema> schema =
3252 FileToFlatbuffer<reflection::Schema>(
3253 ArtifactPath("aos/events/logging/logger.bfbs"));
3254
3255 for (const LogType type :
3256 {LogType::kLogMessage, LogType::kLogDeliveryTimeOnly,
3257 LogType::kLogMessageAndDeliveryTime, LogType::kLogRemoteMessage}) {
3258 for (int i = 0; i < 100; ++i) {
3259 aos::Context context = RandomContext();
3260 const uint32_t channel_index =
3261 uint32_distribution(random_number_generator_);
3262
3263 flatbuffers::FlatBufferBuilder fbb;
3264 fbb.ForceDefaults(true);
3265 fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index, type));
3266
3267 VLOG(1) << absl::BytesToHexString(std::string_view(
3268 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3269 fbb.GetBufferSpan().size()));
3270
3271 // Make sure that both the builder and inline method agree on sizes.
Austin Schuh48d10d62022-10-16 22:19:23 -07003272 ASSERT_EQ(fbb.GetSize(), PackMessageSize(type, context.size))
Austin Schuhfa30c352022-10-16 11:12:02 -07003273 << "log type " << static_cast<int>(type);
3274
3275 // Initialize the buffer to something nonzero to make sure all the padding
3276 // bytes are set to 0.
Austin Schuh48d10d62022-10-16 22:19:23 -07003277 std::vector<uint8_t> repacked_message(PackMessageSize(type, context.size),
3278 67);
Austin Schuhfa30c352022-10-16 11:12:02 -07003279
3280 // And verify packing inline works as expected.
Austin Schuh71a40d42023-02-04 21:22:22 -08003281 EXPECT_EQ(
3282 repacked_message.size(),
3283 PackMessageInline(repacked_message.data(), context, channel_index,
3284 type, 0u, repacked_message.size()));
Austin Schuhfa30c352022-10-16 11:12:02 -07003285 EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
3286 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3287 fbb.GetBufferSpan().size()))
3288 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
3289 fbb.GetBufferSpan());
Austin Schuh71a40d42023-02-04 21:22:22 -08003290
3291 // Ok, now we want to confirm that we can build up arbitrary pieces of
3292 // said flatbuffer. Try all of them since it is cheap.
3293 TimeEventLoop event_loop;
3294 for (size_t i = 0; i < repacked_message.size(); i += 8) {
3295 for (size_t j = i; j < repacked_message.size(); j += 8) {
3296 std::vector<uint8_t> destination(repacked_message.size(), 67u);
3297 ContextDataCopier copier(context, channel_index, type, &event_loop);
3298
3299 copier.Copy(destination.data(), i, j);
3300
3301 size_t index = 0;
3302 for (size_t k = i; k < j; ++k) {
3303 ASSERT_EQ(destination[index], repacked_message[k])
3304 << ": Failed to match type " << static_cast<int>(type)
3305 << ", index " << index << " while testing range " << i << " to "
3306 << j;
3307 ;
3308 ++index;
3309 }
3310 // Now, confirm that none of the other bytes have been touched.
3311 for (; index < destination.size(); ++index) {
3312 ASSERT_EQ(destination[index], 67u);
3313 }
3314 }
3315 }
Austin Schuhfa30c352022-10-16 11:12:02 -07003316 }
3317 }
3318}
3319
Austin Schuhf2d0e682022-10-16 14:20:58 -07003320// Tests that all variations of PackMessage are equivilent to the inline
3321// PackMessage used to avoid allocations.
3322TEST_F(InlinePackMessage, RemoteEquivilent) {
3323 aos::FlatbufferVector<reflection::Schema> schema =
3324 FileToFlatbuffer<reflection::Schema>(
3325 ArtifactPath("aos/events/logging/logger.bfbs"));
3326 std::uniform_int_distribution<uint8_t> uint8_distribution(
3327 std::numeric_limits<uint8_t>::min(), std::numeric_limits<uint8_t>::max());
3328
3329 for (int i = 0; i < 100; ++i) {
3330 aos::SizePrefixedFlatbufferDetachedBuffer<RemoteMessage> random_msg =
3331 RandomRemoteMessage();
3332 const size_t channel_index = uint8_distribution(random_number_generator_);
3333 const monotonic_clock::time_point monotonic_timestamp_time =
3334 RandomMonotonic();
3335
3336 flatbuffers::FlatBufferBuilder fbb;
3337 fbb.ForceDefaults(true);
3338 fbb.FinishSizePrefixed(PackRemoteMessage(
3339 &fbb, &random_msg.message(), channel_index, monotonic_timestamp_time));
3340
3341 VLOG(1) << absl::BytesToHexString(std::string_view(
3342 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3343 fbb.GetBufferSpan().size()));
3344
3345 // Make sure that both the builder and inline method agree on sizes.
3346 ASSERT_EQ(fbb.GetSize(), PackRemoteMessageSize());
3347
3348 // Initialize the buffer to something nonzer to make sure all the padding
3349 // bytes are set to 0.
3350 std::vector<uint8_t> repacked_message(PackRemoteMessageSize(), 67);
3351
3352 // And verify packing inline works as expected.
Austin Schuh71a40d42023-02-04 21:22:22 -08003353 EXPECT_EQ(repacked_message.size(),
3354 PackRemoteMessageInline(
3355 repacked_message.data(), &random_msg.message(), channel_index,
3356 monotonic_timestamp_time, 0u, repacked_message.size()));
Austin Schuhf2d0e682022-10-16 14:20:58 -07003357 EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
3358 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3359 fbb.GetBufferSpan().size()))
3360 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
3361 fbb.GetBufferSpan());
Austin Schuh71a40d42023-02-04 21:22:22 -08003362
3363 // Ok, now we want to confirm that we can build up arbitrary pieces of said
3364 // flatbuffer. Try all of them since it is cheap.
3365 TimeEventLoop event_loop;
3366 for (size_t i = 0; i < repacked_message.size(); i += 8) {
3367 for (size_t j = i; j < repacked_message.size(); j += 8) {
3368 std::vector<uint8_t> destination(repacked_message.size(), 67u);
3369 RemoteMessageCopier copier(&random_msg.message(), channel_index,
3370 monotonic_timestamp_time, &event_loop);
3371
3372 copier.Copy(destination.data(), i, j);
3373
3374 size_t index = 0;
3375 for (size_t k = i; k < j; ++k) {
3376 ASSERT_EQ(destination[index], repacked_message[k]);
3377 ++index;
3378 }
3379 for (; index < destination.size(); ++index) {
3380 ASSERT_EQ(destination[index], 67u);
3381 }
3382 }
3383 }
Austin Schuhf2d0e682022-10-16 14:20:58 -07003384 }
3385}
Austin Schuhfa30c352022-10-16 11:12:02 -07003386
Stephan Pleinesf63bde82024-01-13 15:59:33 -08003387} // namespace aos::logger::testing