blob: a0abcd8bcd99c4ddd509ab4f48b23705f477b546 [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
Alexei Strots01395492023-03-20 13:59:56 -07004#include <filesystem>
Austin Schuhfa30c352022-10-16 11:12:02 -07005#include <random>
Austin Schuhe243aaf2020-10-11 15:46:02 -07006#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07007
Austin Schuhfa30c352022-10-16 11:12:02 -07008#include "absl/strings/escaping.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07009#include "external/com_github_google_flatbuffers/src/annotated_binary_text_gen.h"
10#include "external/com_github_google_flatbuffers/src/binary_annotator.h"
11#include "flatbuffers/reflection_generated.h"
12#include "gflags/gflags.h"
13#include "gtest/gtest.h"
14
Austin Schuhc41603c2020-10-11 16:17:37 -070015#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -070016#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -080017#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070018#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070019#include "aos/json_to_flatbuffer.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070020#include "aos/testing/path.h"
21#include "aos/testing/random_seed.h"
Austin Schuhc243b422020-10-11 15:35:08 -070022#include "aos/testing/tmpdir.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070023#include "aos/util/file.h"
Austin Schuhc243b422020-10-11 15:35:08 -070024
25namespace aos {
26namespace logger {
27namespace testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070028namespace chrono = std::chrono;
Austin Schuhf2d0e682022-10-16 14:20:58 -070029using aos::message_bridge::RemoteMessage;
Eric Schmiedebergb38477e2022-12-02 16:08:04 -070030using aos::testing::ArtifactPath;
Austin Schuhc243b422020-10-11 15:35:08 -070031
Austin Schuhd863e6e2022-10-16 15:44:50 -070032// Adapter class to make it easy to test DetachedBufferWriter without adding
33// test only boilerplate to DetachedBufferWriter.
Alexei Strots15c22b12023-04-04 16:27:17 -070034class TestDetachedBufferWriter : public FileBackend,
35 public DetachedBufferWriter {
Austin Schuhd863e6e2022-10-16 15:44:50 -070036 public:
Austin Schuh48d10d62022-10-16 22:19:23 -070037 // Pick a max size that is rather conservative.
38 static constexpr size_t kMaxMessageSize = 128 * 1024;
Austin Schuhd863e6e2022-10-16 15:44:50 -070039 TestDetachedBufferWriter(std::string_view filename)
colleen61276dc2023-06-01 09:23:29 -070040 : FileBackend("/", false),
Alexei Strots15c22b12023-04-04 16:27:17 -070041 DetachedBufferWriter(FileBackend::RequestFile(filename),
42 std::make_unique<DummyEncoder>(kMaxMessageSize)) {}
Austin Schuhd863e6e2022-10-16 15:44:50 -070043 void WriteSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
44 QueueSpan(absl::Span<const uint8_t>(buffer.data(), buffer.size()));
45 }
Austin Schuh7ef11a42023-02-04 17:15:12 -080046 void QueueSpan(absl::Span<const uint8_t> buffer) {
47 DataEncoder::SpanCopier coppier(buffer);
48 CopyMessage(&coppier, monotonic_clock::now());
49 }
Austin Schuhd863e6e2022-10-16 15:44:50 -070050};
51
Austin Schuhe243aaf2020-10-11 15:46:02 -070052// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070053template <typename T>
54SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
55 const std::string_view data) {
56 flatbuffers::FlatBufferBuilder fbb;
57 fbb.ForceDefaults(true);
58 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
59 return fbb.Release();
60}
61
Austin Schuhe243aaf2020-10-11 15:46:02 -070062// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070063TEST(SpanReaderTest, ReadWrite) {
64 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
65 unlink(logfile.c_str());
66
67 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080068 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070069 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080070 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070071
72 {
Austin Schuhd863e6e2022-10-16 15:44:50 -070073 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -080074 writer.QueueSpan(m1.span());
75 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070076 }
77
78 SpanReader reader(logfile);
79
80 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhcf5f6442021-07-06 10:43:28 -070081 EXPECT_EQ(reader.PeekMessage(), m1.span());
82 EXPECT_EQ(reader.PeekMessage(), m1.span());
Austin Schuhadd6eb32020-11-09 21:24:26 -080083 EXPECT_EQ(reader.ReadMessage(), m1.span());
84 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhcf5f6442021-07-06 10:43:28 -070085 EXPECT_EQ(reader.PeekMessage(), absl::Span<const uint8_t>());
Austin Schuhc243b422020-10-11 15:35:08 -070086 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
87}
88
Austin Schuhe243aaf2020-10-11 15:46:02 -070089// Tests that we can actually parse the resulting messages at a basic level
90// through MessageReader.
91TEST(MessageReaderTest, ReadWrite) {
92 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
93 unlink(logfile.c_str());
94
95 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
96 JsonToSizedFlatbuffer<LogFileHeader>(
97 R"({ "max_out_of_order_duration": 100000000 })");
98 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
99 JsonToSizedFlatbuffer<MessageHeader>(
100 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
101 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
102 JsonToSizedFlatbuffer<MessageHeader>(
103 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
104
105 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700106 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800107 writer.QueueSpan(config.span());
108 writer.QueueSpan(m1.span());
109 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -0700110 }
111
112 MessageReader reader(logfile);
113
114 EXPECT_EQ(reader.filename(), logfile);
115
116 EXPECT_EQ(
117 reader.max_out_of_order_duration(),
118 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
119 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
120 EXPECT_TRUE(reader.ReadMessage());
121 EXPECT_EQ(reader.newest_timestamp(),
122 monotonic_clock::time_point(chrono::nanoseconds(1)));
123 EXPECT_TRUE(reader.ReadMessage());
124 EXPECT_EQ(reader.newest_timestamp(),
125 monotonic_clock::time_point(chrono::nanoseconds(2)));
126 EXPECT_FALSE(reader.ReadMessage());
127}
128
Austin Schuh32f68492020-11-08 21:45:51 -0800129// Tests that we explode when messages are too far out of order.
130TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
131 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
132 unlink(logfile0.c_str());
133
134 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
135 JsonToSizedFlatbuffer<LogFileHeader>(
136 R"({
137 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800138 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800139 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
140 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
141 "parts_index": 0
142})");
143
144 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
145 JsonToSizedFlatbuffer<MessageHeader>(
146 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
147 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
148 JsonToSizedFlatbuffer<MessageHeader>(
149 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
150 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
151 JsonToSizedFlatbuffer<MessageHeader>(
152 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
153
154 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700155 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800156 writer.QueueSpan(config0.span());
157 writer.QueueSpan(m1.span());
158 writer.QueueSpan(m2.span());
159 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800160 }
Alexei Strots01395492023-03-20 13:59:56 -0700161 ASSERT_TRUE(std::filesystem::exists(logfile0)) << logfile0;
Austin Schuh32f68492020-11-08 21:45:51 -0800162
163 const std::vector<LogFile> parts = SortParts({logfile0});
164
165 PartsMessageReader reader(parts[0].parts[0]);
166
167 EXPECT_TRUE(reader.ReadMessage());
168 EXPECT_TRUE(reader.ReadMessage());
169 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
170}
171
Austin Schuhc41603c2020-10-11 16:17:37 -0700172// Tests that we can transparently re-assemble part files with a
173// PartsMessageReader.
174TEST(PartsMessageReaderTest, ReadWrite) {
175 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
176 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
177 unlink(logfile0.c_str());
178 unlink(logfile1.c_str());
179
180 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
181 JsonToSizedFlatbuffer<LogFileHeader>(
182 R"({
183 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800184 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700185 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
186 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
187 "parts_index": 0
188})");
189 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
190 JsonToSizedFlatbuffer<LogFileHeader>(
191 R"({
192 "max_out_of_order_duration": 200000000,
193 "monotonic_start_time": 0,
194 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800195 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700196 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
197 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
198 "parts_index": 1
199})");
200
201 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
202 JsonToSizedFlatbuffer<MessageHeader>(
203 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
204 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
205 JsonToSizedFlatbuffer<MessageHeader>(
206 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
207
208 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700209 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800210 writer.QueueSpan(config0.span());
211 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700212 }
213 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700214 TestDetachedBufferWriter writer(logfile1);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800215 writer.QueueSpan(config1.span());
216 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700217 }
218
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700219 // When parts are sorted, we choose the highest max out of order duration for
220 // all parts with the same part uuid.
Austin Schuhc41603c2020-10-11 16:17:37 -0700221 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
222
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700223 EXPECT_EQ(parts.size(), 1);
224 EXPECT_EQ(parts[0].parts.size(), 1);
225
Austin Schuhc41603c2020-10-11 16:17:37 -0700226 PartsMessageReader reader(parts[0].parts[0]);
227
228 EXPECT_EQ(reader.filename(), logfile0);
229
230 // Confirm that the timestamps track, and the filename also updates.
231 // Read the first message.
232 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700233 // Since config1 has higher max out of order duration, that will be used to
234 // read partfiles with same part uuid, i.e logfile0 and logfile1.
Austin Schuhc41603c2020-10-11 16:17:37 -0700235 EXPECT_EQ(
236 reader.max_out_of_order_duration(),
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700237 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuhc41603c2020-10-11 16:17:37 -0700238 EXPECT_TRUE(reader.ReadMessage());
239 EXPECT_EQ(reader.filename(), logfile0);
240 EXPECT_EQ(reader.newest_timestamp(),
241 monotonic_clock::time_point(chrono::nanoseconds(1)));
242 EXPECT_EQ(
243 reader.max_out_of_order_duration(),
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700244 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuhc41603c2020-10-11 16:17:37 -0700245
246 // Read the second message.
247 EXPECT_TRUE(reader.ReadMessage());
248 EXPECT_EQ(reader.filename(), logfile1);
249 EXPECT_EQ(reader.newest_timestamp(),
250 monotonic_clock::time_point(chrono::nanoseconds(2)));
251 EXPECT_EQ(
252 reader.max_out_of_order_duration(),
253 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
254
255 // And then confirm that reading again returns no message.
256 EXPECT_FALSE(reader.ReadMessage());
257 EXPECT_EQ(reader.filename(), logfile1);
258 EXPECT_EQ(
259 reader.max_out_of_order_duration(),
260 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800261 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700262
263 // Verify that the parts metadata has the correct max out of order duration.
264 EXPECT_EQ(
265 parts[0].parts[0].max_out_of_order_duration,
266 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuhc41603c2020-10-11 16:17:37 -0700267}
Austin Schuh32f68492020-11-08 21:45:51 -0800268
Austin Schuh1be0ce42020-11-29 22:43:26 -0800269// Tests that Message's operator < works as expected.
270TEST(MessageTest, Sorting) {
271 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
272
273 Message m1{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700274 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700275 .timestamp =
276 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
Austin Schuh48507722021-07-17 17:29:24 -0700277 .monotonic_remote_boot = 0xffffff,
278 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700279 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800280 Message m2{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700281 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700282 .timestamp =
283 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
Austin Schuh48507722021-07-17 17:29:24 -0700284 .monotonic_remote_boot = 0xffffff,
285 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700286 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800287
288 EXPECT_LT(m1, m2);
289 EXPECT_GE(m2, m1);
290
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700291 m1.timestamp.time = e;
292 m2.timestamp.time = e;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800293
294 m1.channel_index = 1;
295 m2.channel_index = 2;
296
297 EXPECT_LT(m1, m2);
298 EXPECT_GE(m2, m1);
299
300 m1.channel_index = 0;
301 m2.channel_index = 0;
Austin Schuh58646e22021-08-23 23:51:46 -0700302 m1.queue_index.index = 0u;
303 m2.queue_index.index = 1u;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800304
305 EXPECT_LT(m1, m2);
306 EXPECT_GE(m2, m1);
307}
308
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800309aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
310 const aos::FlatbufferDetachedBuffer<Configuration> &config,
311 const std::string_view json) {
312 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh1034a832021-03-31 21:53:26 -0700313 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800314 flatbuffers::Offset<Configuration> config_offset =
315 aos::CopyFlatBuffer(config, &fbb);
316 LogFileHeader::Builder header_builder(fbb);
317 header_builder.add_configuration(config_offset);
318 fbb.Finish(header_builder.Finish());
319 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
320
321 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
322 JsonToFlatbuffer<LogFileHeader>(json));
323 CHECK(header_updates.Verify());
324 flatbuffers::FlatBufferBuilder fbb2;
Austin Schuh1034a832021-03-31 21:53:26 -0700325 fbb2.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800326 fbb2.FinishSizePrefixed(
327 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
328 return fbb2.Release();
329}
330
331class SortingElementTest : public ::testing::Test {
332 public:
333 SortingElementTest()
334 : config_(JsonToFlatbuffer<Configuration>(
335 R"({
336 "channels": [
337 {
338 "name": "/a",
339 "type": "aos.logger.testing.TestMessage",
340 "source_node": "pi1",
341 "destination_nodes": [
342 {
343 "name": "pi2"
344 },
345 {
346 "name": "pi3"
347 }
348 ]
349 },
350 {
351 "name": "/b",
352 "type": "aos.logger.testing.TestMessage",
353 "source_node": "pi1"
354 },
355 {
356 "name": "/c",
357 "type": "aos.logger.testing.TestMessage",
358 "source_node": "pi1"
Austin Schuh48507722021-07-17 17:29:24 -0700359 },
360 {
361 "name": "/d",
362 "type": "aos.logger.testing.TestMessage",
363 "source_node": "pi2",
364 "destination_nodes": [
365 {
366 "name": "pi1"
367 }
368 ]
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800369 }
370 ],
371 "nodes": [
372 {
373 "name": "pi1"
374 },
375 {
376 "name": "pi2"
377 },
378 {
379 "name": "pi3"
380 }
381 ]
382}
383)")),
384 config0_(MakeHeader(config_, R"({
385 /* 100ms */
386 "max_out_of_order_duration": 100000000,
387 "node": {
388 "name": "pi1"
389 },
390 "logger_node": {
391 "name": "pi1"
392 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800393 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800394 "realtime_start_time": 1000000000000,
395 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700396 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
397 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
398 "boot_uuids": [
399 "1d782c63-b3c7-466e-bea9-a01308b43333",
400 "",
401 ""
402 ],
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800403 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
404 "parts_index": 0
405})")),
406 config1_(MakeHeader(config_,
407 R"({
408 /* 100ms */
409 "max_out_of_order_duration": 100000000,
410 "node": {
411 "name": "pi1"
412 },
413 "logger_node": {
414 "name": "pi1"
415 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800416 "monotonic_start_time": 1000000,
417 "realtime_start_time": 1000000000000,
418 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700419 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
420 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
421 "boot_uuids": [
422 "1d782c63-b3c7-466e-bea9-a01308b43333",
423 "",
424 ""
425 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800426 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
427 "parts_index": 0
428})")),
429 config2_(MakeHeader(config_,
430 R"({
431 /* 100ms */
432 "max_out_of_order_duration": 100000000,
433 "node": {
434 "name": "pi2"
435 },
436 "logger_node": {
437 "name": "pi2"
438 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800439 "monotonic_start_time": 0,
440 "realtime_start_time": 1000000000000,
Austin Schuh48507722021-07-17 17:29:24 -0700441 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
442 "logger_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
443 "boot_uuids": [
444 "",
445 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
446 ""
447 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800448 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
449 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
450 "parts_index": 0
451})")),
452 config3_(MakeHeader(config_,
453 R"({
454 /* 100ms */
455 "max_out_of_order_duration": 100000000,
456 "node": {
457 "name": "pi1"
458 },
459 "logger_node": {
460 "name": "pi1"
461 },
462 "monotonic_start_time": 2000000,
463 "realtime_start_time": 1000000000,
464 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
Austin Schuh48507722021-07-17 17:29:24 -0700465 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
466 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
467 "boot_uuids": [
468 "1d782c63-b3c7-466e-bea9-a01308b43333",
469 "",
470 ""
471 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800472 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800473 "parts_index": 0
Austin Schuh8bf1e632021-01-02 22:41:04 -0800474})")),
475 config4_(MakeHeader(config_,
476 R"({
477 /* 100ms */
478 "max_out_of_order_duration": 100000000,
479 "node": {
480 "name": "pi2"
481 },
482 "logger_node": {
483 "name": "pi1"
484 },
485 "monotonic_start_time": 2000000,
486 "realtime_start_time": 1000000000,
487 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
488 "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
Austin Schuh48507722021-07-17 17:29:24 -0700489 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
490 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
491 "boot_uuids": [
492 "1d782c63-b3c7-466e-bea9-a01308b43333",
493 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
494 ""
495 ],
Austin Schuh8bf1e632021-01-02 22:41:04 -0800496 "parts_index": 0
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800497})")) {
498 unlink(logfile0_.c_str());
499 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800500 unlink(logfile2_.c_str());
Austin Schuh48507722021-07-17 17:29:24 -0700501 unlink(logfile3_.c_str());
Austin Schuh921b2562021-07-31 19:37:03 -0700502 queue_index_.resize(config_.message().channels()->size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800503 }
504
505 protected:
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800506 flatbuffers::DetachedBuffer MakeLogMessage(
507 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
508 int value) {
509 flatbuffers::FlatBufferBuilder message_fbb;
510 message_fbb.ForceDefaults(true);
511 TestMessage::Builder test_message_builder(message_fbb);
512 test_message_builder.add_value(value);
513 message_fbb.Finish(test_message_builder.Finish());
514
515 aos::Context context;
516 context.monotonic_event_time = monotonic_now;
517 context.realtime_event_time = aos::realtime_clock::epoch() +
518 chrono::seconds(1000) +
519 monotonic_now.time_since_epoch();
Austin Schuh921b2562021-07-31 19:37:03 -0700520 CHECK_LT(static_cast<size_t>(channel_index), queue_index_.size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800521 context.queue_index = queue_index_[channel_index];
522 context.size = message_fbb.GetSize();
523 context.data = message_fbb.GetBufferPointer();
524
525 ++queue_index_[channel_index];
526
527 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh53e3a812021-11-03 16:57:23 -0700528 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800529 fbb.FinishSizePrefixed(
530 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
531
532 return fbb.Release();
533 }
534
535 flatbuffers::DetachedBuffer MakeTimestampMessage(
536 const aos::monotonic_clock::time_point sender_monotonic_now,
Austin Schuh8bf1e632021-01-02 22:41:04 -0800537 int channel_index, chrono::nanoseconds receiver_monotonic_offset,
538 monotonic_clock::time_point monotonic_timestamp_time =
539 monotonic_clock::min_time) {
540 const monotonic_clock::time_point monotonic_sent_time =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800541 sender_monotonic_now + receiver_monotonic_offset;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800542
543 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800544 fbb.ForceDefaults(true);
545
546 logger::MessageHeader::Builder message_header_builder(fbb);
547
548 message_header_builder.add_channel_index(channel_index);
549
550 message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
551 100);
552 message_header_builder.add_monotonic_sent_time(
553 monotonic_sent_time.time_since_epoch().count());
554 message_header_builder.add_realtime_sent_time(
555 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
556 monotonic_sent_time.time_since_epoch())
557 .time_since_epoch()
558 .count());
559
560 message_header_builder.add_monotonic_remote_time(
561 sender_monotonic_now.time_since_epoch().count());
562 message_header_builder.add_realtime_remote_time(
563 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
564 sender_monotonic_now.time_since_epoch())
565 .time_since_epoch()
566 .count());
567 message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
568 1);
569
570 if (monotonic_timestamp_time != monotonic_clock::min_time) {
571 message_header_builder.add_monotonic_timestamp_time(
572 monotonic_timestamp_time.time_since_epoch().count());
573 }
574
575 fbb.FinishSizePrefixed(message_header_builder.Finish());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800576 LOG(INFO) << aos::FlatbufferToJson(
577 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
578 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
579
580 return fbb.Release();
581 }
582
583 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
584 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800585 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh48507722021-07-17 17:29:24 -0700586 const std::string logfile3_ = aos::testing::TestTmpDir() + "/log3.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800587
588 const aos::FlatbufferDetachedBuffer<Configuration> config_;
589 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
590 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800591 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
592 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800593 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800594
595 std::vector<uint32_t> queue_index_;
596};
597
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700598using MessageSorterTest = SortingElementTest;
599using MessageSorterDeathTest = MessageSorterTest;
600using PartsMergerTest = SortingElementTest;
Austin Schuhd2f96102020-12-01 20:27:29 -0800601using TimestampMapperTest = SortingElementTest;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800602
603// Tests that we can pull messages out of a log sorted in order.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700604TEST_F(MessageSorterTest, Pull) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800605 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
606 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700607 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800608 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700609 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800610 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700611 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800612 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700613 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800614 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700615 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800616 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
617 }
618
619 const std::vector<LogFile> parts = SortParts({logfile0_});
620
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700621 MessageSorter message_sorter(parts[0].parts[0]);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800622
623 // Confirm we aren't sorted until any time until the message is popped.
624 // Peeking shouldn't change the sorted until time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700625 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800626
627 std::deque<Message> output;
628
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700629 ASSERT_TRUE(message_sorter.Front() != nullptr);
630 output.emplace_back(std::move(*message_sorter.Front()));
631 message_sorter.PopFront();
632 EXPECT_EQ(message_sorter.sorted_until(), e + chrono::milliseconds(1900));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800633
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700634 ASSERT_TRUE(message_sorter.Front() != nullptr);
635 output.emplace_back(std::move(*message_sorter.Front()));
636 message_sorter.PopFront();
637 EXPECT_EQ(message_sorter.sorted_until(), e + chrono::milliseconds(1900));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800638
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700639 ASSERT_TRUE(message_sorter.Front() != nullptr);
640 output.emplace_back(std::move(*message_sorter.Front()));
641 message_sorter.PopFront();
642 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::max_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800643
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700644 ASSERT_TRUE(message_sorter.Front() != nullptr);
645 output.emplace_back(std::move(*message_sorter.Front()));
646 message_sorter.PopFront();
647 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::max_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800648
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700649 ASSERT_TRUE(message_sorter.Front() == nullptr);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800650
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700651 EXPECT_EQ(output[0].timestamp.boot, 0);
652 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
653 EXPECT_EQ(output[1].timestamp.boot, 0);
654 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1000));
655 EXPECT_EQ(output[2].timestamp.boot, 0);
656 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1901));
657 EXPECT_EQ(output[3].timestamp.boot, 0);
658 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800659}
660
Austin Schuhb000de62020-12-03 22:00:40 -0800661// Tests that we can pull messages out of a log sorted in order.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700662TEST_F(MessageSorterTest, WayBeforeStart) {
Austin Schuhb000de62020-12-03 22:00:40 -0800663 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
664 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700665 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhb000de62020-12-03 22:00:40 -0800666 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700667 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800668 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700669 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800670 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700671 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800672 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700673 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800674 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700675 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800676 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
677 }
678
679 const std::vector<LogFile> parts = SortParts({logfile0_});
680
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700681 MessageSorter message_sorter(parts[0].parts[0]);
Austin Schuhb000de62020-12-03 22:00:40 -0800682
683 // Confirm we aren't sorted until any time until the message is popped.
684 // Peeking shouldn't change the sorted until time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700685 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
Austin Schuhb000de62020-12-03 22:00:40 -0800686
687 std::deque<Message> output;
688
689 for (monotonic_clock::time_point t :
690 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
691 e + chrono::milliseconds(1900), monotonic_clock::max_time,
692 monotonic_clock::max_time}) {
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700693 ASSERT_TRUE(message_sorter.Front() != nullptr);
694 output.emplace_back(std::move(*message_sorter.Front()));
695 message_sorter.PopFront();
696 EXPECT_EQ(message_sorter.sorted_until(), t);
Austin Schuhb000de62020-12-03 22:00:40 -0800697 }
698
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700699 ASSERT_TRUE(message_sorter.Front() == nullptr);
Austin Schuhb000de62020-12-03 22:00:40 -0800700
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700701 EXPECT_EQ(output[0].timestamp.boot, 0u);
702 EXPECT_EQ(output[0].timestamp.time, e - chrono::milliseconds(1000));
703 EXPECT_EQ(output[1].timestamp.boot, 0u);
704 EXPECT_EQ(output[1].timestamp.time, e - chrono::milliseconds(500));
705 EXPECT_EQ(output[2].timestamp.boot, 0u);
706 EXPECT_EQ(output[2].timestamp.time, e - chrono::milliseconds(10));
707 EXPECT_EQ(output[3].timestamp.boot, 0u);
708 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(1901));
709 EXPECT_EQ(output[4].timestamp.boot, 0u);
710 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuhb000de62020-12-03 22:00:40 -0800711}
712
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800713// Tests that messages too far out of order trigger death.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700714TEST_F(MessageSorterDeathTest, Pull) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800715 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
716 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700717 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800718 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700719 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800720 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700721 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800722 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700723 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800724 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
725 // The following message is too far out of order and will trigger the CHECK.
Austin Schuhd863e6e2022-10-16 15:44:50 -0700726 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800727 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
728 }
729
730 const std::vector<LogFile> parts = SortParts({logfile0_});
731
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700732 MessageSorter message_sorter(parts[0].parts[0]);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800733
734 // Confirm we aren't sorted until any time until the message is popped.
735 // Peeking shouldn't change the sorted until time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700736 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800737 std::deque<Message> output;
738
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700739 ASSERT_TRUE(message_sorter.Front() != nullptr);
740 message_sorter.PopFront();
741 ASSERT_TRUE(message_sorter.Front() != nullptr);
742 ASSERT_TRUE(message_sorter.Front() != nullptr);
743 message_sorter.PopFront();
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800744
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700745 EXPECT_DEATH({ message_sorter.Front(); },
Austin Schuh58646e22021-08-23 23:51:46 -0700746 "Max out of order of 100000000ns exceeded.");
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800747}
748
Austin Schuh8f52ed52020-11-30 23:12:39 -0800749// Tests that we can merge data from 2 separate files, including duplicate data.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700750TEST_F(PartsMergerTest, TwoFileMerger) {
Austin Schuh8f52ed52020-11-30 23:12:39 -0800751 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
752 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700753 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800754 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700755 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800756 writer1.QueueSpan(config1_.span());
757
Austin Schuhd863e6e2022-10-16 15:44:50 -0700758 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800759 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700760 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800761 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
762
Austin Schuhd863e6e2022-10-16 15:44:50 -0700763 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800764 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700765 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800766 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
767
768 // Make a duplicate!
769 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
770 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
771 writer0.QueueSpan(msg.span());
772 writer1.QueueSpan(msg.span());
773
Austin Schuhd863e6e2022-10-16 15:44:50 -0700774 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800775 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
776 }
777
778 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -0700779 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -0800780 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800781
Alexei Strots1f51ac72023-05-15 10:14:54 -0700782 PartsMerger merger("pi1", 0, log_files);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800783
784 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
785
786 std::deque<Message> output;
787
788 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
789 ASSERT_TRUE(merger.Front() != nullptr);
790 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
791
792 output.emplace_back(std::move(*merger.Front()));
793 merger.PopFront();
794 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
795
796 ASSERT_TRUE(merger.Front() != nullptr);
797 output.emplace_back(std::move(*merger.Front()));
798 merger.PopFront();
799 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
800
801 ASSERT_TRUE(merger.Front() != nullptr);
802 output.emplace_back(std::move(*merger.Front()));
803 merger.PopFront();
804 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
805
806 ASSERT_TRUE(merger.Front() != nullptr);
807 output.emplace_back(std::move(*merger.Front()));
808 merger.PopFront();
809 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
810
811 ASSERT_TRUE(merger.Front() != nullptr);
812 output.emplace_back(std::move(*merger.Front()));
813 merger.PopFront();
814 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
815
816 ASSERT_TRUE(merger.Front() != nullptr);
817 output.emplace_back(std::move(*merger.Front()));
818 merger.PopFront();
819 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
820
821 ASSERT_TRUE(merger.Front() == nullptr);
822
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700823 EXPECT_EQ(output[0].timestamp.boot, 0u);
824 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
825 EXPECT_EQ(output[1].timestamp.boot, 0u);
826 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1001));
827 EXPECT_EQ(output[2].timestamp.boot, 0u);
828 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1002));
829 EXPECT_EQ(output[3].timestamp.boot, 0u);
830 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
831 EXPECT_EQ(output[4].timestamp.boot, 0u);
832 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(3000));
833 EXPECT_EQ(output[5].timestamp.boot, 0u);
834 EXPECT_EQ(output[5].timestamp.time, e + chrono::milliseconds(3002));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800835}
836
Austin Schuh8bf1e632021-01-02 22:41:04 -0800837// Tests that we can merge timestamps with various combinations of
838// monotonic_timestamp_time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700839TEST_F(PartsMergerTest, TwoFileTimestampMerger) {
Austin Schuh8bf1e632021-01-02 22:41:04 -0800840 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
841 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700842 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800843 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700844 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800845 writer1.QueueSpan(config1_.span());
846
847 // Neither has it.
848 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700849 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800850 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700851 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800852 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
853
854 // First only has it.
855 MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700856 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800857 e + chrono::milliseconds(1001), 0, chrono::seconds(100),
858 e + chrono::nanoseconds(971)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700859 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800860 e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
861
862 // Second only has it.
863 MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700864 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800865 e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700866 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800867 e + chrono::milliseconds(1002), 0, chrono::seconds(100),
868 e + chrono::nanoseconds(972)));
869
870 // Both have it.
871 MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700872 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800873 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
874 e + chrono::nanoseconds(973)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700875 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800876 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
877 e + chrono::nanoseconds(973)));
878 }
879
880 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -0700881 LogFilesContainer log_files(parts);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800882 ASSERT_EQ(parts.size(), 1u);
883
Alexei Strots1f51ac72023-05-15 10:14:54 -0700884 PartsMerger merger("pi1", 0, log_files);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800885
886 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
887
888 std::deque<Message> output;
889
890 for (int i = 0; i < 4; ++i) {
891 ASSERT_TRUE(merger.Front() != nullptr);
892 output.emplace_back(std::move(*merger.Front()));
893 merger.PopFront();
894 }
895 ASSERT_TRUE(merger.Front() == nullptr);
896
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700897 EXPECT_EQ(output[0].timestamp.boot, 0u);
898 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700899 EXPECT_FALSE(output[0].data->has_monotonic_timestamp_time);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700900
901 EXPECT_EQ(output[1].timestamp.boot, 0u);
902 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700903 EXPECT_TRUE(output[1].data->has_monotonic_timestamp_time);
904 EXPECT_EQ(output[1].data->monotonic_timestamp_time,
905 monotonic_clock::time_point(std::chrono::nanoseconds(971)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700906
907 EXPECT_EQ(output[2].timestamp.boot, 0u);
908 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700909 EXPECT_TRUE(output[2].data->has_monotonic_timestamp_time);
910 EXPECT_EQ(output[2].data->monotonic_timestamp_time,
911 monotonic_clock::time_point(std::chrono::nanoseconds(972)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700912
913 EXPECT_EQ(output[3].timestamp.boot, 0u);
914 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700915 EXPECT_TRUE(output[3].data->has_monotonic_timestamp_time);
916 EXPECT_EQ(output[3].data->monotonic_timestamp_time,
917 monotonic_clock::time_point(std::chrono::nanoseconds(973)));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800918}
919
Austin Schuhd2f96102020-12-01 20:27:29 -0800920// Tests that we can match timestamps on delivered messages.
921TEST_F(TimestampMapperTest, ReadNode0First) {
922 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
923 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700924 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800925 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700926 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800927 writer1.QueueSpan(config2_.span());
928
Austin Schuhd863e6e2022-10-16 15:44:50 -0700929 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800930 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700931 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800932 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
933
Austin Schuhd863e6e2022-10-16 15:44:50 -0700934 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800935 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700936 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800937 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
938
Austin Schuhd863e6e2022-10-16 15:44:50 -0700939 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800940 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700941 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800942 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
943 }
944
945 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -0700946 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -0800947 ASSERT_EQ(parts[0].logger_node, "pi1");
948 ASSERT_EQ(parts[1].logger_node, "pi2");
949
Austin Schuh79b30942021-01-24 22:32:21 -0800950 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -0700951
952 TimestampMapper mapper0("pi1", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -0800953 mapper0.set_timestamp_callback(
954 [&](TimestampedMessage *) { ++mapper0_count; });
955 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -0700956 TimestampMapper mapper1("pi2", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -0800957 mapper1.set_timestamp_callback(
958 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -0800959
960 mapper0.AddPeer(&mapper1);
961 mapper1.AddPeer(&mapper0);
962
963 {
964 std::deque<TimestampedMessage> output0;
965
Austin Schuh79b30942021-01-24 22:32:21 -0800966 EXPECT_EQ(mapper0_count, 0u);
967 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800968 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800969 EXPECT_EQ(mapper0_count, 1u);
970 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800971 output0.emplace_back(std::move(*mapper0.Front()));
972 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700973 EXPECT_TRUE(mapper0.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800974 EXPECT_EQ(mapper0_count, 1u);
975 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800976
977 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800978 EXPECT_EQ(mapper0_count, 2u);
979 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800980 output0.emplace_back(std::move(*mapper0.Front()));
981 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700982 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800983
984 ASSERT_TRUE(mapper0.Front() != nullptr);
985 output0.emplace_back(std::move(*mapper0.Front()));
986 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700987 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800988
Austin Schuh79b30942021-01-24 22:32:21 -0800989 EXPECT_EQ(mapper0_count, 3u);
990 EXPECT_EQ(mapper1_count, 0u);
991
Austin Schuhd2f96102020-12-01 20:27:29 -0800992 ASSERT_TRUE(mapper0.Front() == nullptr);
993
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700994 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
995 EXPECT_EQ(output0[0].monotonic_event_time.time,
996 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700997 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700998
999 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1000 EXPECT_EQ(output0[1].monotonic_event_time.time,
1001 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001002 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001003
1004 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1005 EXPECT_EQ(output0[2].monotonic_event_time.time,
1006 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001007 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001008 }
1009
1010 {
1011 SCOPED_TRACE("Trying node1 now");
1012 std::deque<TimestampedMessage> output1;
1013
Austin Schuh79b30942021-01-24 22:32:21 -08001014 EXPECT_EQ(mapper0_count, 3u);
1015 EXPECT_EQ(mapper1_count, 0u);
1016
Austin Schuhd2f96102020-12-01 20:27:29 -08001017 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001018 EXPECT_EQ(mapper0_count, 3u);
1019 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001020 output1.emplace_back(std::move(*mapper1.Front()));
1021 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001022 EXPECT_TRUE(mapper1.started());
Austin Schuh79b30942021-01-24 22:32:21 -08001023 EXPECT_EQ(mapper0_count, 3u);
1024 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001025
1026 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001027 EXPECT_EQ(mapper0_count, 3u);
1028 EXPECT_EQ(mapper1_count, 2u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001029 output1.emplace_back(std::move(*mapper1.Front()));
1030 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001031 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001032
1033 ASSERT_TRUE(mapper1.Front() != nullptr);
1034 output1.emplace_back(std::move(*mapper1.Front()));
1035 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001036 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001037
Austin Schuh79b30942021-01-24 22:32:21 -08001038 EXPECT_EQ(mapper0_count, 3u);
1039 EXPECT_EQ(mapper1_count, 3u);
1040
Austin Schuhd2f96102020-12-01 20:27:29 -08001041 ASSERT_TRUE(mapper1.Front() == nullptr);
1042
Austin Schuh79b30942021-01-24 22:32:21 -08001043 EXPECT_EQ(mapper0_count, 3u);
1044 EXPECT_EQ(mapper1_count, 3u);
1045
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001046 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1047 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001048 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001049 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001050
1051 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1052 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001053 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001054 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001055
1056 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1057 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001058 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001059 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001060 }
1061}
1062
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001063// Tests that we filter messages using the channel filter callback
1064TEST_F(TimestampMapperTest, ReplayChannelsCallbackTest) {
1065 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1066 {
1067 TestDetachedBufferWriter writer0(logfile0_);
1068 writer0.QueueSpan(config0_.span());
1069 TestDetachedBufferWriter writer1(logfile1_);
1070 writer1.QueueSpan(config2_.span());
1071
1072 writer0.WriteSizedFlatbuffer(
1073 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1074 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1075 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1076
1077 writer0.WriteSizedFlatbuffer(
1078 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1079 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1080 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1081
1082 writer0.WriteSizedFlatbuffer(
1083 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1084 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1085 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1086 }
1087
1088 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001089 LogFilesContainer log_files(parts);
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001090 ASSERT_EQ(parts[0].logger_node, "pi1");
1091 ASSERT_EQ(parts[1].logger_node, "pi2");
1092
1093 // mapper0 will not provide any messages while mapper1 will provide all
1094 // messages due to the channel filter callbacks used
1095 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001096
1097 TimestampMapper mapper0("pi1", log_files);
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001098 mapper0.set_timestamp_callback(
1099 [&](TimestampedMessage *) { ++mapper0_count; });
1100 mapper0.set_replay_channels_callback(
1101 [&](const TimestampedMessage &) -> bool { return mapper0_count != 2; });
1102 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001103 TimestampMapper mapper1("pi2", log_files);
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001104 mapper1.set_timestamp_callback(
1105 [&](TimestampedMessage *) { ++mapper1_count; });
1106 mapper1.set_replay_channels_callback(
1107 [&](const TimestampedMessage &) -> bool { return mapper1_count != 2; });
1108
1109 mapper0.AddPeer(&mapper1);
1110 mapper1.AddPeer(&mapper0);
1111
1112 {
1113 std::deque<TimestampedMessage> output0;
1114
1115 EXPECT_EQ(mapper0_count, 0u);
1116 EXPECT_EQ(mapper1_count, 0u);
1117
1118 ASSERT_TRUE(mapper0.Front() != nullptr);
1119 EXPECT_EQ(mapper0_count, 1u);
1120 EXPECT_EQ(mapper1_count, 0u);
1121 output0.emplace_back(std::move(*mapper0.Front()));
1122 mapper0.PopFront();
1123
1124 EXPECT_TRUE(mapper0.started());
1125 EXPECT_EQ(mapper0_count, 1u);
1126 EXPECT_EQ(mapper1_count, 0u);
1127
1128 // mapper0_count is now at 3 since the second message is not queued, but
1129 // timestamp_callback needs to be called everytime even if Front() does not
1130 // provide a message due to the replay_channels_callback.
1131 ASSERT_TRUE(mapper0.Front() != nullptr);
1132 EXPECT_EQ(mapper0_count, 3u);
1133 EXPECT_EQ(mapper1_count, 0u);
1134 output0.emplace_back(std::move(*mapper0.Front()));
1135 mapper0.PopFront();
1136
1137 EXPECT_TRUE(mapper0.started());
1138 EXPECT_EQ(mapper0_count, 3u);
1139 EXPECT_EQ(mapper1_count, 0u);
1140
1141 ASSERT_TRUE(mapper0.Front() == nullptr);
1142 EXPECT_TRUE(mapper0.started());
1143
1144 EXPECT_EQ(mapper0_count, 3u);
1145 EXPECT_EQ(mapper1_count, 0u);
1146
1147 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1148 EXPECT_EQ(output0[0].monotonic_event_time.time,
1149 e + chrono::milliseconds(1000));
1150 EXPECT_TRUE(output0[0].data != nullptr);
1151
1152 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1153 EXPECT_EQ(output0[1].monotonic_event_time.time,
1154 e + chrono::milliseconds(3000));
1155 EXPECT_TRUE(output0[1].data != nullptr);
1156 }
1157
1158 {
1159 SCOPED_TRACE("Trying node1 now");
1160 std::deque<TimestampedMessage> output1;
1161
1162 EXPECT_EQ(mapper0_count, 3u);
1163 EXPECT_EQ(mapper1_count, 0u);
1164
1165 ASSERT_TRUE(mapper1.Front() != nullptr);
1166 EXPECT_EQ(mapper0_count, 3u);
1167 EXPECT_EQ(mapper1_count, 1u);
1168 output1.emplace_back(std::move(*mapper1.Front()));
1169 mapper1.PopFront();
1170 EXPECT_TRUE(mapper1.started());
1171 EXPECT_EQ(mapper0_count, 3u);
1172 EXPECT_EQ(mapper1_count, 1u);
1173
1174 // mapper1_count is now at 3 since the second message is not queued, but
1175 // timestamp_callback needs to be called everytime even if Front() does not
1176 // provide a message due to the replay_channels_callback.
1177 ASSERT_TRUE(mapper1.Front() != nullptr);
1178 output1.emplace_back(std::move(*mapper1.Front()));
1179 mapper1.PopFront();
1180 EXPECT_TRUE(mapper1.started());
1181
1182 EXPECT_EQ(mapper0_count, 3u);
1183 EXPECT_EQ(mapper1_count, 3u);
1184
1185 ASSERT_TRUE(mapper1.Front() == nullptr);
1186
1187 EXPECT_EQ(mapper0_count, 3u);
1188 EXPECT_EQ(mapper1_count, 3u);
1189
1190 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1191 EXPECT_EQ(output1[0].monotonic_event_time.time,
1192 e + chrono::seconds(100) + chrono::milliseconds(1000));
1193 EXPECT_TRUE(output1[0].data != nullptr);
1194
1195 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1196 EXPECT_EQ(output1[1].monotonic_event_time.time,
1197 e + chrono::seconds(100) + chrono::milliseconds(3000));
1198 EXPECT_TRUE(output1[1].data != nullptr);
1199 }
1200}
Austin Schuh8bf1e632021-01-02 22:41:04 -08001201// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
1202// returned.
1203TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
1204 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1205 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001206 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001207 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001208 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001209 writer1.QueueSpan(config4_.span());
1210
Austin Schuhd863e6e2022-10-16 15:44:50 -07001211 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001212 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001213 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001214 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
1215 e + chrono::nanoseconds(971)));
1216
Austin Schuhd863e6e2022-10-16 15:44:50 -07001217 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001218 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001219 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001220 e + chrono::milliseconds(2000), 0, chrono::seconds(100),
1221 e + chrono::nanoseconds(5458)));
1222
Austin Schuhd863e6e2022-10-16 15:44:50 -07001223 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001224 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001225 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001226 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1227 }
1228
1229 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001230 LogFilesContainer log_files(parts);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001231 ASSERT_EQ(parts.size(), 1u);
1232
Austin Schuh79b30942021-01-24 22:32:21 -08001233 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001234 TimestampMapper mapper0("pi1", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001235 mapper0.set_timestamp_callback(
1236 [&](TimestampedMessage *) { ++mapper0_count; });
1237 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001238 TimestampMapper mapper1("pi2", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001239 mapper1.set_timestamp_callback(
1240 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh8bf1e632021-01-02 22:41:04 -08001241
1242 mapper0.AddPeer(&mapper1);
1243 mapper1.AddPeer(&mapper0);
1244
1245 {
1246 std::deque<TimestampedMessage> output0;
1247
1248 for (int i = 0; i < 3; ++i) {
1249 ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
1250 output0.emplace_back(std::move(*mapper0.Front()));
1251 mapper0.PopFront();
1252 }
1253
1254 ASSERT_TRUE(mapper0.Front() == nullptr);
1255
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001256 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1257 EXPECT_EQ(output0[0].monotonic_event_time.time,
1258 e + chrono::milliseconds(1000));
1259 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
1260 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
1261 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001262 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001263
1264 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1265 EXPECT_EQ(output0[1].monotonic_event_time.time,
1266 e + chrono::milliseconds(2000));
1267 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
1268 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
1269 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001270 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001271
1272 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1273 EXPECT_EQ(output0[2].monotonic_event_time.time,
1274 e + chrono::milliseconds(3000));
1275 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
1276 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
1277 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001278 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001279 }
1280
1281 {
1282 SCOPED_TRACE("Trying node1 now");
1283 std::deque<TimestampedMessage> output1;
1284
1285 for (int i = 0; i < 3; ++i) {
1286 ASSERT_TRUE(mapper1.Front() != nullptr);
1287 output1.emplace_back(std::move(*mapper1.Front()));
1288 mapper1.PopFront();
1289 }
1290
1291 ASSERT_TRUE(mapper1.Front() == nullptr);
1292
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001293 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1294 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001295 e + chrono::seconds(100) + chrono::milliseconds(1000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001296 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
1297 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001298 e + chrono::nanoseconds(971));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001299 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001300
1301 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1302 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001303 e + chrono::seconds(100) + chrono::milliseconds(2000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001304 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
1305 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001306 e + chrono::nanoseconds(5458));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001307 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001308
1309 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1310 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001311 e + chrono::seconds(100) + chrono::milliseconds(3000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001312 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
1313 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
1314 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001315 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001316 }
Austin Schuh79b30942021-01-24 22:32:21 -08001317
1318 EXPECT_EQ(mapper0_count, 3u);
1319 EXPECT_EQ(mapper1_count, 3u);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001320}
1321
Austin Schuhd2f96102020-12-01 20:27:29 -08001322// Tests that we can match timestamps on delivered messages. By doing this in
1323// the reverse order, the second node needs to queue data up from the first node
1324// to find the matching timestamp.
1325TEST_F(TimestampMapperTest, ReadNode1First) {
1326 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1327 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001328 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001329 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001330 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001331 writer1.QueueSpan(config2_.span());
1332
Austin Schuhd863e6e2022-10-16 15:44:50 -07001333 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001334 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001335 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001336 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1337
Austin Schuhd863e6e2022-10-16 15:44:50 -07001338 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001339 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001340 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001341 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1342
Austin Schuhd863e6e2022-10-16 15:44:50 -07001343 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001344 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001345 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001346 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1347 }
1348
1349 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001350 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001351
1352 ASSERT_EQ(parts[0].logger_node, "pi1");
1353 ASSERT_EQ(parts[1].logger_node, "pi2");
1354
Austin Schuh79b30942021-01-24 22:32:21 -08001355 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001356 TimestampMapper mapper0("pi1", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001357 mapper0.set_timestamp_callback(
1358 [&](TimestampedMessage *) { ++mapper0_count; });
1359 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001360 TimestampMapper mapper1("pi2", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001361 mapper1.set_timestamp_callback(
1362 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001363
1364 mapper0.AddPeer(&mapper1);
1365 mapper1.AddPeer(&mapper0);
1366
1367 {
1368 SCOPED_TRACE("Trying node1 now");
1369 std::deque<TimestampedMessage> output1;
1370
1371 ASSERT_TRUE(mapper1.Front() != nullptr);
1372 output1.emplace_back(std::move(*mapper1.Front()));
1373 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001374 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001375
1376 ASSERT_TRUE(mapper1.Front() != nullptr);
1377 output1.emplace_back(std::move(*mapper1.Front()));
1378 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001379 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001380
1381 ASSERT_TRUE(mapper1.Front() != nullptr);
1382 output1.emplace_back(std::move(*mapper1.Front()));
1383 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001384 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001385
1386 ASSERT_TRUE(mapper1.Front() == nullptr);
1387
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001388 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1389 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001390 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001391 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001392
1393 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1394 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001395 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001396 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001397
1398 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1399 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001400 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001401 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001402 }
1403
1404 {
1405 std::deque<TimestampedMessage> output0;
1406
1407 ASSERT_TRUE(mapper0.Front() != nullptr);
1408 output0.emplace_back(std::move(*mapper0.Front()));
1409 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001410 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001411
1412 ASSERT_TRUE(mapper0.Front() != nullptr);
1413 output0.emplace_back(std::move(*mapper0.Front()));
1414 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001415 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001416
1417 ASSERT_TRUE(mapper0.Front() != nullptr);
1418 output0.emplace_back(std::move(*mapper0.Front()));
1419 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001420 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001421
1422 ASSERT_TRUE(mapper0.Front() == nullptr);
1423
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001424 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1425 EXPECT_EQ(output0[0].monotonic_event_time.time,
1426 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001427 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001428
1429 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1430 EXPECT_EQ(output0[1].monotonic_event_time.time,
1431 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001432 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001433
1434 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1435 EXPECT_EQ(output0[2].monotonic_event_time.time,
1436 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001437 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001438 }
Austin Schuh79b30942021-01-24 22:32:21 -08001439
1440 EXPECT_EQ(mapper0_count, 3u);
1441 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001442}
1443
1444// Tests that we return just the timestamps if we couldn't find the data and the
1445// missing data was at the beginning of the file.
1446TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
1447 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1448 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001449 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001450 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001451 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001452 writer1.QueueSpan(config2_.span());
1453
1454 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001455 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001456 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1457
Austin Schuhd863e6e2022-10-16 15:44:50 -07001458 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001459 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001460 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001461 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1462
Austin Schuhd863e6e2022-10-16 15:44:50 -07001463 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001464 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001465 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001466 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1467 }
1468
1469 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001470 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001471
1472 ASSERT_EQ(parts[0].logger_node, "pi1");
1473 ASSERT_EQ(parts[1].logger_node, "pi2");
1474
Austin Schuh79b30942021-01-24 22:32:21 -08001475 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001476 TimestampMapper mapper0("pi1", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001477 mapper0.set_timestamp_callback(
1478 [&](TimestampedMessage *) { ++mapper0_count; });
1479 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001480 TimestampMapper mapper1("pi2", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001481 mapper1.set_timestamp_callback(
1482 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001483
1484 mapper0.AddPeer(&mapper1);
1485 mapper1.AddPeer(&mapper0);
1486
1487 {
1488 SCOPED_TRACE("Trying node1 now");
1489 std::deque<TimestampedMessage> output1;
1490
1491 ASSERT_TRUE(mapper1.Front() != nullptr);
1492 output1.emplace_back(std::move(*mapper1.Front()));
1493 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001494 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001495
1496 ASSERT_TRUE(mapper1.Front() != nullptr);
1497 output1.emplace_back(std::move(*mapper1.Front()));
1498 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001499 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001500
1501 ASSERT_TRUE(mapper1.Front() != nullptr);
1502 output1.emplace_back(std::move(*mapper1.Front()));
1503 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001504 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001505
1506 ASSERT_TRUE(mapper1.Front() == nullptr);
1507
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001508 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1509 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001510 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001511 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001512
1513 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1514 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001515 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001516 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001517
1518 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1519 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001520 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001521 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001522 }
Austin Schuh79b30942021-01-24 22:32:21 -08001523
1524 EXPECT_EQ(mapper0_count, 0u);
1525 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001526}
1527
1528// Tests that we return just the timestamps if we couldn't find the data and the
1529// missing data was at the end of the file.
1530TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
1531 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1532 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001533 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001534 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001535 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001536 writer1.QueueSpan(config2_.span());
1537
Austin Schuhd863e6e2022-10-16 15:44:50 -07001538 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001539 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001540 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001541 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1542
Austin Schuhd863e6e2022-10-16 15:44:50 -07001543 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001544 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001545 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001546 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1547
1548 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001549 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001550 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1551 }
1552
1553 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001554 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001555
1556 ASSERT_EQ(parts[0].logger_node, "pi1");
1557 ASSERT_EQ(parts[1].logger_node, "pi2");
1558
Austin Schuh79b30942021-01-24 22:32:21 -08001559 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001560 TimestampMapper mapper0("pi1", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001561 mapper0.set_timestamp_callback(
1562 [&](TimestampedMessage *) { ++mapper0_count; });
1563 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001564 TimestampMapper mapper1("pi2", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001565 mapper1.set_timestamp_callback(
1566 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001567
1568 mapper0.AddPeer(&mapper1);
1569 mapper1.AddPeer(&mapper0);
1570
1571 {
1572 SCOPED_TRACE("Trying node1 now");
1573 std::deque<TimestampedMessage> output1;
1574
1575 ASSERT_TRUE(mapper1.Front() != nullptr);
1576 output1.emplace_back(std::move(*mapper1.Front()));
1577 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001578 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001579
1580 ASSERT_TRUE(mapper1.Front() != nullptr);
1581 output1.emplace_back(std::move(*mapper1.Front()));
1582 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001583 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001584
1585 ASSERT_TRUE(mapper1.Front() != nullptr);
1586 output1.emplace_back(std::move(*mapper1.Front()));
1587 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001588 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001589
1590 ASSERT_TRUE(mapper1.Front() == nullptr);
1591
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001592 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1593 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001594 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001595 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001596
1597 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1598 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001599 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001600 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001601
1602 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1603 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001604 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001605 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001606 }
Austin Schuh79b30942021-01-24 22:32:21 -08001607
1608 EXPECT_EQ(mapper0_count, 0u);
1609 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001610}
1611
Austin Schuh993ccb52020-12-12 15:59:32 -08001612// Tests that we handle a message which failed to forward or be logged.
1613TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1614 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1615 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001616 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001617 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001618 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001619 writer1.QueueSpan(config2_.span());
1620
Austin Schuhd863e6e2022-10-16 15:44:50 -07001621 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001622 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001623 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001624 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1625
1626 // Create both the timestamp and message, but don't log them, simulating a
1627 // forwarding drop.
1628 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1629 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1630 chrono::seconds(100));
1631
Austin Schuhd863e6e2022-10-16 15:44:50 -07001632 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001633 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001634 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001635 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1636 }
1637
1638 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001639 LogFilesContainer log_files(parts);
Austin Schuh993ccb52020-12-12 15:59:32 -08001640
1641 ASSERT_EQ(parts[0].logger_node, "pi1");
1642 ASSERT_EQ(parts[1].logger_node, "pi2");
1643
Austin Schuh79b30942021-01-24 22:32:21 -08001644 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001645 TimestampMapper mapper0("pi1", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001646 mapper0.set_timestamp_callback(
1647 [&](TimestampedMessage *) { ++mapper0_count; });
1648 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001649 TimestampMapper mapper1("pi2", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001650 mapper1.set_timestamp_callback(
1651 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh993ccb52020-12-12 15:59:32 -08001652
1653 mapper0.AddPeer(&mapper1);
1654 mapper1.AddPeer(&mapper0);
1655
1656 {
1657 std::deque<TimestampedMessage> output1;
1658
1659 ASSERT_TRUE(mapper1.Front() != nullptr);
1660 output1.emplace_back(std::move(*mapper1.Front()));
1661 mapper1.PopFront();
1662
1663 ASSERT_TRUE(mapper1.Front() != nullptr);
1664 output1.emplace_back(std::move(*mapper1.Front()));
1665
1666 ASSERT_FALSE(mapper1.Front() == nullptr);
1667
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001668 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1669 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001670 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001671 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001672
1673 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1674 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001675 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001676 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh993ccb52020-12-12 15:59:32 -08001677 }
Austin Schuh79b30942021-01-24 22:32:21 -08001678
1679 EXPECT_EQ(mapper0_count, 0u);
1680 EXPECT_EQ(mapper1_count, 2u);
Austin Schuh993ccb52020-12-12 15:59:32 -08001681}
1682
Austin Schuhd2f96102020-12-01 20:27:29 -08001683// Tests that we properly sort log files with duplicate timestamps.
1684TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1685 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1686 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001687 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001688 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001689 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001690 writer1.QueueSpan(config2_.span());
1691
Austin Schuhd863e6e2022-10-16 15:44:50 -07001692 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001693 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001694 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001695 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1696
Austin Schuhd863e6e2022-10-16 15:44:50 -07001697 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001698 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001699 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001700 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1701
Austin Schuhd863e6e2022-10-16 15:44:50 -07001702 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001703 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001704 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001705 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1706
Austin Schuhd863e6e2022-10-16 15:44:50 -07001707 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001708 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001709 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001710 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1711 }
1712
1713 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001714 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001715
1716 ASSERT_EQ(parts[0].logger_node, "pi1");
1717 ASSERT_EQ(parts[1].logger_node, "pi2");
1718
Austin Schuh79b30942021-01-24 22:32:21 -08001719 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001720 TimestampMapper mapper0("pi1", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001721 mapper0.set_timestamp_callback(
1722 [&](TimestampedMessage *) { ++mapper0_count; });
1723 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001724 TimestampMapper mapper1("pi2", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001725 mapper1.set_timestamp_callback(
1726 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001727
1728 mapper0.AddPeer(&mapper1);
1729 mapper1.AddPeer(&mapper0);
1730
1731 {
1732 SCOPED_TRACE("Trying node1 now");
1733 std::deque<TimestampedMessage> output1;
1734
1735 for (int i = 0; i < 4; ++i) {
1736 ASSERT_TRUE(mapper1.Front() != nullptr);
1737 output1.emplace_back(std::move(*mapper1.Front()));
1738 mapper1.PopFront();
1739 }
1740 ASSERT_TRUE(mapper1.Front() == nullptr);
1741
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001742 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1743 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001744 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001745 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001746
1747 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1748 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001749 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001750 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001751
1752 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1753 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001754 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001755 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001756
1757 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1758 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001759 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001760 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001761 }
Austin Schuh79b30942021-01-24 22:32:21 -08001762
1763 EXPECT_EQ(mapper0_count, 0u);
1764 EXPECT_EQ(mapper1_count, 4u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001765}
1766
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001767// Tests that we properly produce a valid start time.
Austin Schuhd2f96102020-12-01 20:27:29 -08001768TEST_F(TimestampMapperTest, StartTime) {
1769 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1770 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001771 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001772 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001773 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001774 writer1.QueueSpan(config1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001775 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001776 writer2.QueueSpan(config3_.span());
1777 }
1778
1779 const std::vector<LogFile> parts =
1780 SortParts({logfile0_, logfile1_, logfile2_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001781 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001782
Austin Schuh79b30942021-01-24 22:32:21 -08001783 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001784 TimestampMapper mapper0("pi1", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001785 mapper0.set_timestamp_callback(
1786 [&](TimestampedMessage *) { ++mapper0_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001787
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001788 EXPECT_EQ(mapper0.monotonic_start_time(0), e + chrono::milliseconds(1));
1789 EXPECT_EQ(mapper0.realtime_start_time(0),
Austin Schuhd2f96102020-12-01 20:27:29 -08001790 realtime_clock::time_point(chrono::seconds(1000)));
Austin Schuh79b30942021-01-24 22:32:21 -08001791 EXPECT_EQ(mapper0_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001792}
1793
Austin Schuhfecf1d82020-12-19 16:57:28 -08001794// Tests that when a peer isn't registered, we treat that as if there was no
1795// data available.
//
// Only the "pi2" mapper is constructed below and AddPeer() is never called on
// it; every message it returns is expected to carry a null data pointer.
1796TEST_F(TimestampMapperTest, NoPeer) {
1797 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
  // Populate the two log files: each gets a header through QueueSpan(),
  // logfile1_ gets three timestamp messages, and logfile0_ gets two log
  // messages.
1798 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001799 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001800 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001801 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001802 writer1.QueueSpan(config2_.span());
1803
    // NOTE(review): the result of this MakeLogMessage() call is discarded,
    // while the two later log messages are written through writer0. Confirm
    // whether leaving this one out of logfile0_ is intentional.
1804 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001805 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001806 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1807
Austin Schuhd863e6e2022-10-16 15:44:50 -07001808 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001809 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001810 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001811 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1812
Austin Schuhd863e6e2022-10-16 15:44:50 -07001813 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001814 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001815 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001816 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1817 }
1818
  // Sort both files and check that they are attributed to logger nodes "pi1"
  // and "pi2", in that order.
1819 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001820 LogFilesContainer log_files(parts);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001821
1822 ASSERT_EQ(parts[0].logger_node, "pi1");
1823 ASSERT_EQ(parts[1].logger_node, "pi2");
1824
  // Count how many times the timestamp callback runs for the "pi2" mapper.
Austin Schuh79b30942021-01-24 22:32:21 -08001825 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001826 TimestampMapper mapper1("pi2", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001827 mapper1.set_timestamp_callback(
1828 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhfecf1d82020-12-19 16:57:28 -08001829
1830 {
1831 std::deque<TimestampedMessage> output1;
1832
    // Drain exactly three messages, then confirm the mapper has nothing left.
1833 ASSERT_TRUE(mapper1.Front() != nullptr);
1834 output1.emplace_back(std::move(*mapper1.Front()));
1835 mapper1.PopFront();
1836 ASSERT_TRUE(mapper1.Front() != nullptr);
1837 output1.emplace_back(std::move(*mapper1.Front()));
1838 mapper1.PopFront();
1839 ASSERT_TRUE(mapper1.Front() != nullptr);
1840 output1.emplace_back(std::move(*mapper1.Front()));
1841 mapper1.PopFront();
1842 ASSERT_TRUE(mapper1.Front() == nullptr);
1843
    // Each expected event time is the time passed to MakeTimestampMessage()
    // plus its chrono::seconds(100) argument, on boot 0. The data pointer is
    // expected to be null for every message.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001844 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1845 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001846 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001847 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001848
1849 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1850 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001851 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001852 EXPECT_FALSE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001853
1854 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1855 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001856 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001857 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001858 }
  // The callback is expected to have run once per returned message.
Austin Schuh79b30942021-01-24 22:32:21 -08001859 EXPECT_EQ(mapper1_count, 3u);
1860}
1861
1862// Tests that we can queue messages and call the timestamp callback for both
1863// nodes.
//
// The expectations pin how many times each mapper's timestamp callback has run
// after every QueueUntil(), Front() and PopFront() step.
1864TEST_F(TimestampMapperTest, QueueUntilNode0) {
1865 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
  // Write four log messages to logfile0_ (two of them share the 1000ms time)
  // and four timestamp messages at the same times to logfile1_.
1866 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001867 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh79b30942021-01-24 22:32:21 -08001868 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001869 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh79b30942021-01-24 22:32:21 -08001870 writer1.QueueSpan(config2_.span());
1871
Austin Schuhd863e6e2022-10-16 15:44:50 -07001872 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001873 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001874 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001875 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1876
Austin Schuhd863e6e2022-10-16 15:44:50 -07001877 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001878 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001879 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001880 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1881
Austin Schuhd863e6e2022-10-16 15:44:50 -07001882 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001883 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001884 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001885 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1886
Austin Schuhd863e6e2022-10-16 15:44:50 -07001887 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001888 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001889 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001890 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1891 }
1892
1893 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001894 LogFilesContainer log_files(parts);
Austin Schuh79b30942021-01-24 22:32:21 -08001895
1896 ASSERT_EQ(parts[0].logger_node, "pi1");
1897 ASSERT_EQ(parts[1].logger_node, "pi2");
1898
  // One mapper per node, each with its own callback counter.
1899 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001900 TimestampMapper mapper0("pi1", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001901 mapper0.set_timestamp_callback(
1902 [&](TimestampedMessage *) { ++mapper0_count; });
1903 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001904 TimestampMapper mapper1("pi2", log_files);
Austin Schuh79b30942021-01-24 22:32:21 -08001905 mapper1.set_timestamp_callback(
1906 [&](TimestampedMessage *) { ++mapper1_count; });
1907
  // Register each mapper as the other's peer.
1908 mapper0.AddPeer(&mapper1);
1909 mapper1.AddPeer(&mapper0);
1910
1911 {
1912 std::deque<TimestampedMessage> output0;
1913
    // Neither callback is expected to have run before anything is queued.
1914 EXPECT_EQ(mapper0_count, 0u);
1915 EXPECT_EQ(mapper1_count, 0u);
    // Queueing to 1000ms is expected to run mapper0's callback 3 times.
    // Presumably that is the two 1000ms messages plus the first message past
    // the requested time; confirm against TimestampMapper::QueueUntil.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001916 mapper0.QueueUntil(
1917 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001918 EXPECT_EQ(mapper0_count, 3u);
1919 EXPECT_EQ(mapper1_count, 0u);
1920
    // Calling Front() is not expected to change either count here.
1921 ASSERT_TRUE(mapper0.Front() != nullptr);
1922 EXPECT_EQ(mapper0_count, 3u);
1923 EXPECT_EQ(mapper1_count, 0u);
1924
    // Queueing to 1500ms is expected to leave the counts unchanged.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001925 mapper0.QueueUntil(
1926 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001927 EXPECT_EQ(mapper0_count, 3u);
1928 EXPECT_EQ(mapper1_count, 0u);
1929
    // Queueing to 2500ms is expected to run mapper0's callback once more.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001930 mapper0.QueueUntil(
1931 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001932 EXPECT_EQ(mapper0_count, 4u);
1933 EXPECT_EQ(mapper1_count, 0u);
1934
    // NOTE(review): Front() is dereferenced four times here without a
    // preceding non-null ASSERT, unlike the node1 section below.
1935 output0.emplace_back(std::move(*mapper0.Front()));
1936 mapper0.PopFront();
1937 output0.emplace_back(std::move(*mapper0.Front()));
1938 mapper0.PopFront();
1939 output0.emplace_back(std::move(*mapper0.Front()));
1940 mapper0.PopFront();
1941 output0.emplace_back(std::move(*mapper0.Front()));
1942 mapper0.PopFront();
1943
1944 EXPECT_EQ(mapper0_count, 4u);
1945 EXPECT_EQ(mapper1_count, 0u);
1946
1947 ASSERT_TRUE(mapper0.Front() == nullptr);
1948
    // The four messages are expected in time order on boot 0 with non-null
    // data: 1000ms, 1000ms, 2000ms, 3000ms.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001949 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1950 EXPECT_EQ(output0[0].monotonic_event_time.time,
1951 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001952 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001953
1954 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1955 EXPECT_EQ(output0[1].monotonic_event_time.time,
1956 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001957 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001958
1959 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1960 EXPECT_EQ(output0[2].monotonic_event_time.time,
1961 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001962 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001963
1964 EXPECT_EQ(output0[3].monotonic_event_time.boot, 0u);
1965 EXPECT_EQ(output0[3].monotonic_event_time.time,
1966 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001967 EXPECT_TRUE(output0[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001968 }
1969
  // Repeat the same sequence on the "pi2" mapper. Every time is shifted by the
  // chrono::seconds(100) that was passed to MakeTimestampMessage(), and
  // mapper0's count is expected to stay at 4 throughout.
1970 {
1971 SCOPED_TRACE("Trying node1 now");
1972 std::deque<TimestampedMessage> output1;
1973
1974 EXPECT_EQ(mapper0_count, 4u);
1975 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001976 mapper1.QueueUntil(BootTimestamp{
1977 .boot = 0,
1978 .time = e + chrono::seconds(100) + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001979 EXPECT_EQ(mapper0_count, 4u);
1980 EXPECT_EQ(mapper1_count, 3u);
1981
1982 ASSERT_TRUE(mapper1.Front() != nullptr);
1983 EXPECT_EQ(mapper0_count, 4u);
1984 EXPECT_EQ(mapper1_count, 3u);
1985
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001986 mapper1.QueueUntil(BootTimestamp{
1987 .boot = 0,
1988 .time = e + chrono::seconds(100) + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001989 EXPECT_EQ(mapper0_count, 4u);
1990 EXPECT_EQ(mapper1_count, 3u);
1991
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001992 mapper1.QueueUntil(BootTimestamp{
1993 .boot = 0,
1994 .time = e + chrono::seconds(100) + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001995 EXPECT_EQ(mapper0_count, 4u);
1996 EXPECT_EQ(mapper1_count, 4u);
1997
1998 ASSERT_TRUE(mapper1.Front() != nullptr);
1999 EXPECT_EQ(mapper0_count, 4u);
2000 EXPECT_EQ(mapper1_count, 4u);
2001
    // Drain all four messages, asserting that each one exists before it is
    // moved out.
2002 output1.emplace_back(std::move(*mapper1.Front()));
2003 mapper1.PopFront();
2004 ASSERT_TRUE(mapper1.Front() != nullptr);
2005 output1.emplace_back(std::move(*mapper1.Front()));
2006 mapper1.PopFront();
2007 ASSERT_TRUE(mapper1.Front() != nullptr);
2008 output1.emplace_back(std::move(*mapper1.Front()));
2009 mapper1.PopFront();
2010 ASSERT_TRUE(mapper1.Front() != nullptr);
2011 output1.emplace_back(std::move(*mapper1.Front()));
2012 mapper1.PopFront();
2013
2014 EXPECT_EQ(mapper0_count, 4u);
2015 EXPECT_EQ(mapper1_count, 4u);
2016
2017 ASSERT_TRUE(mapper1.Front() == nullptr);
2018
2019 EXPECT_EQ(mapper0_count, 4u);
2020 EXPECT_EQ(mapper1_count, 4u);
2021
    // data is expected to be non-null here, presumably because mapper0 is
    // registered as a peer (compare with the NoPeer test).
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002022 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2023 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002024 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002025 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002026
2027 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
2028 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002029 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002030 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002031
2032 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
2033 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002034 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002035 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002036
2037 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
2038 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002039 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002040 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08002041 }
Austin Schuhfecf1d82020-12-19 16:57:28 -08002042}
2043
// Test fixture providing two log file headers, boot0_ and boot1_, built with
// MakeHeader() from config_. Both describe node "pi2" as logged by "pi1" and
// share the same log_event_uuid, logger_instance_uuid and
// logger_node_boot_uuid. They differ in parts_uuid, parts_index (0 vs 1),
// source_node_boot_uuid, and the second boot_uuids entry.
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002044class BootMergerTest : public SortingElementTest {
2045 public:
2046 BootMergerTest()
2047 : SortingElementTest(),
        // Header with parts_index 0 and source boot UUID 6ba4f28d-...
2048 boot0_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002049 /* 100ms */
2050 "max_out_of_order_duration": 100000000,
2051 "node": {
2052 "name": "pi2"
2053 },
2054 "logger_node": {
2055 "name": "pi1"
2056 },
2057 "monotonic_start_time": 1000000,
2058 "realtime_start_time": 1000000000000,
2059 "logger_monotonic_start_time": 1000000,
2060 "logger_realtime_start_time": 1000000000000,
2061 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2062 "parts_uuid": "1a9e5ca2-31b2-475b-8282-88f6d1ce5109",
2063 "parts_index": 0,
2064 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2065 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002066 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2067 "boot_uuids": [
2068 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2069 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2070 ""
2071 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002072})")),
        // Header with parts_index 1 and source boot UUID b728d27a-...
2073 boot1_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002074 /* 100ms */
2075 "max_out_of_order_duration": 100000000,
2076 "node": {
2077 "name": "pi2"
2078 },
2079 "logger_node": {
2080 "name": "pi1"
2081 },
2082 "monotonic_start_time": 1000000,
2083 "realtime_start_time": 1000000000000,
2084 "logger_monotonic_start_time": 1000000,
2085 "logger_realtime_start_time": 1000000000000,
2086 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2087 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
2088 "parts_index": 1,
2089 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2090 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002091 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2092 "boot_uuids": [
2093 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2094 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2095 ""
2096 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002097})")) {}
Austin Schuh8bee6882021-06-28 21:03:28 -07002098
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002099 protected:
  // Size-prefixed LogFileHeader buffers; tests write them with QueueSpan().
2100 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0_;
2101 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1_;
2102};
2103
2104// This tests that we can properly sort a multi-node log file which has the old
2105// (and buggy) timestamps in the header, and the non-resetting parts_index.
2106// These make it so we can just barely figure out what happened first and what
2107// happened second, but not in a way that is robust to multiple nodes rebooting.
2108TEST_F(BootMergerTest, OldReboot) {
  // Each log file contains only a header: boot0_ in logfile0_ and boot1_ in
  // logfile1_.
Austin Schuh8bee6882021-06-28 21:03:28 -07002109 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002110 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002111 writer.QueueSpan(boot0_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002112 }
2113 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002114 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002115 writer.QueueSpan(boot1_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002116 }
2117
2118 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
2119
  // Both files are expected to be grouped under a single LogFile holding two
  // parts.
2120 ASSERT_EQ(parts.size(), 1u);
2121 ASSERT_EQ(parts[0].parts.size(), 2u);
2122
  // The boot0_ part is expected first with boot_count 0, followed by the
  // boot1_ part with boot_count 1. Each part's source_boot_uuid is expected to
  // match the source_node_boot_uuid of its header.
2123 EXPECT_EQ(parts[0].parts[0].boot_count, 0);
2124 EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002125 boot0_.message().source_node_boot_uuid()->string_view());
Austin Schuh8bee6882021-06-28 21:03:28 -07002126
2127 EXPECT_EQ(parts[0].parts[1].boot_count, 1);
2128 EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002129 boot1_.message().source_node_boot_uuid()->string_view());
2130}
2131
2132// This tests that we can produce messages ordered across a reboot.
2133TEST_F(BootMergerTest, SortAcrossReboot) {
2134 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
  // logfile0_ (boot0_ header) holds messages at 1000ms and 2000ms.
2135 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002136 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002137 writer.QueueSpan(boot0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002138 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002139 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002140 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002141 MakeLogMessage(e + chrono::milliseconds(2000), 1, 0x105));
2142 }
  // logfile1_ (boot1_ header) holds messages at 100ms and 200ms, which are
  // earlier than the times written to logfile0_.
2143 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002144 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002145 writer.QueueSpan(boot1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002146 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002147 MakeLogMessage(e + chrono::milliseconds(100), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002148 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002149 MakeLogMessage(e + chrono::milliseconds(200), 1, 0x106));
2150 }
2151
2152 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07002153 LogFilesContainer log_files(parts);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002154 ASSERT_EQ(parts.size(), 1u);
2155 ASSERT_EQ(parts[0].parts.size(), 2u);
2156
Alexei Strots1f51ac72023-05-15 10:14:54 -07002157 BootMerger merger("pi2", log_files);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002158
  // "pi2" is expected to map to node index 1.
2159 EXPECT_EQ(merger.node(), 1u);
2160
  // Drain exactly four messages, then confirm nothing is left.
2161 std::vector<Message> output;
2162 for (int i = 0; i < 4; ++i) {
2163 ASSERT_TRUE(merger.Front() != nullptr);
2164 output.emplace_back(std::move(*merger.Front()));
2165 merger.PopFront();
2166 }
2167
2168 ASSERT_TRUE(merger.Front() == nullptr);
2169
  // The boot 0 messages are expected first even though their times are later
  // than those of the boot 1 messages.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002170 EXPECT_EQ(output[0].timestamp.boot, 0u);
2171 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
2172 EXPECT_EQ(output[1].timestamp.boot, 0u);
2173 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(2000));
2174
2175 EXPECT_EQ(output[2].timestamp.boot, 1u);
2176 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(100));
2177 EXPECT_EQ(output[3].timestamp.boot, 1u);
2178 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
Austin Schuh8bee6882021-06-28 21:03:28 -07002179}
2180
// Test fixture providing four log file headers built with MakeHeader() from
// config_, all with logger_node "pi1":
//  * boot0a_ / boot0b_ describe node "pi1". They share one parts_uuid and have
//    parts_index 0 and 1; the second boot_uuids entry differs between them.
//  * boot1a_ / boot1b_ describe node "pi2". They have different parts_uuid
//    values, parts_index 0 and 1, and different source_node_boot_uuid values.
Austin Schuh48507722021-07-17 17:29:24 -07002181class RebootTimestampMapperTest : public SortingElementTest {
2182 public:
2183 RebootTimestampMapperTest()
2184 : SortingElementTest(),
        // Node "pi1", parts_index 0.
2185 boot0a_(MakeHeader(config_, R"({
2186 /* 100ms */
2187 "max_out_of_order_duration": 100000000,
2188 "node": {
2189 "name": "pi1"
2190 },
2191 "logger_node": {
2192 "name": "pi1"
2193 },
2194 "monotonic_start_time": 1000000,
2195 "realtime_start_time": 1000000000000,
2196 "logger_monotonic_start_time": 1000000,
2197 "logger_realtime_start_time": 1000000000000,
2198 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2199 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2200 "parts_index": 0,
2201 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2202 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2203 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2204 "boot_uuids": [
2205 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2206 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2207 ""
2208 ]
2209})")),
        // Node "pi1", parts_index 1, same parts_uuid as boot0a_.
2210 boot0b_(MakeHeader(config_, R"({
2211 /* 100ms */
2212 "max_out_of_order_duration": 100000000,
2213 "node": {
2214 "name": "pi1"
2215 },
2216 "logger_node": {
2217 "name": "pi1"
2218 },
2219 "monotonic_start_time": 1000000,
2220 "realtime_start_time": 1000000000000,
2221 "logger_monotonic_start_time": 1000000,
2222 "logger_realtime_start_time": 1000000000000,
2223 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2224 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2225 "parts_index": 1,
2226 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2227 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2228 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2229 "boot_uuids": [
2230 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2231 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2232 ""
2233 ]
2234})")),
        // Node "pi2", parts_index 0, source boot UUID 6ba4f28d-...
2235 boot1a_(MakeHeader(config_, R"({
2236 /* 100ms */
2237 "max_out_of_order_duration": 100000000,
2238 "node": {
2239 "name": "pi2"
2240 },
2241 "logger_node": {
2242 "name": "pi1"
2243 },
2244 "monotonic_start_time": 1000000,
2245 "realtime_start_time": 1000000000000,
2246 "logger_monotonic_start_time": 1000000,
2247 "logger_realtime_start_time": 1000000000000,
2248 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2249 "parts_uuid": "f6ab0cdc-a654-456d-bfd9-2bbc09098edf",
2250 "parts_index": 0,
2251 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2252 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2253 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2254 "boot_uuids": [
2255 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2256 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2257 ""
2258 ]
2259})")),
        // Node "pi2", parts_index 1, source boot UUID b728d27a-...
2260 boot1b_(MakeHeader(config_, R"({
2261 /* 100ms */
2262 "max_out_of_order_duration": 100000000,
2263 "node": {
2264 "name": "pi2"
2265 },
2266 "logger_node": {
2267 "name": "pi1"
2268 },
2269 "monotonic_start_time": 1000000,
2270 "realtime_start_time": 1000000000000,
2271 "logger_monotonic_start_time": 1000000,
2272 "logger_realtime_start_time": 1000000000000,
2273 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2274 "parts_uuid": "3a9d0445-f520-43ca-93f5-e2cc7f54d40a",
2275 "parts_index": 1,
2276 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2277 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2278 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2279 "boot_uuids": [
2280 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2281 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2282 ""
2283 ]
2284})")) {}
2285
2286 protected:
  // Size-prefixed LogFileHeader buffers; tests write them with QueueSpan().
2287 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0a_;
2288 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0b_;
2289 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1a_;
2290 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
2291};
2292
Austin Schuh48507722021-07-17 17:29:24 -07002293// Tests that we can match timestamps on delivered messages in the presence of
2294// reboots on the node receiving timestamps.
//
// The "pi1" mapper is drained completely before the "pi2" mapper is touched.
2295TEST_F(RebootTimestampMapperTest, ReadNode0First) {
2296 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
  // Four log files, one per fixture header:
  //   logfile0_ <- boot0a_, logfile1_ <- boot0b_,
  //   logfile2_ <- boot1a_, logfile3_ <- boot1b_.
2297 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002298 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002299 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002300 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002301 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002302 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002303 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002304 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002305 writer1b.QueueSpan(boot1b_.span());
2306
    // The 1000ms log message goes to writer0a, with its timestamp message in
    // writer1a using a chrono::seconds(100) argument.
Austin Schuhd863e6e2022-10-16 15:44:50 -07002307 writer0a.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002308 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002309 writer1a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002310 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
2311 e + chrono::milliseconds(1001)));
2312
    // A second timestamp message for the same 1000ms time is written to
    // writer1b, this time with a chrono::seconds(21) argument.
Austin Schuhd863e6e2022-10-16 15:44:50 -07002313 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh58646e22021-08-23 23:51:46 -07002314 e + chrono::milliseconds(1000), 0, chrono::seconds(21),
2315 e + chrono::milliseconds(2001)));
2316
    // The 2000ms and 3000ms log messages go to writer0b, with timestamp
    // messages in writer1b using a chrono::seconds(20) argument.
Austin Schuhd863e6e2022-10-16 15:44:50 -07002317 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002318 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002319 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002320 e + chrono::milliseconds(2000), 0, chrono::seconds(20),
2321 e + chrono::milliseconds(2001)));
2322
Austin Schuhd863e6e2022-10-16 15:44:50 -07002323 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002324 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002325 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002326 e + chrono::milliseconds(3000), 0, chrono::seconds(20),
2327 e + chrono::milliseconds(3001)));
2328 }
2329
Austin Schuh58646e22021-08-23 23:51:46 -07002330 const std::vector<LogFile> parts =
2331 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07002332 LogFilesContainer log_files(parts);
Austin Schuh48507722021-07-17 17:29:24 -07002333
  // Log the sorted parts, then check that all four files were grouped into a
  // single LogFile logged by "pi1".
2334 for (const auto &x : parts) {
2335 LOG(INFO) << x;
2336 }
2337 ASSERT_EQ(parts.size(), 1u);
2338 ASSERT_EQ(parts[0].logger_node, "pi1");
2339
  // One mapper per node, each with its own callback counter, registered as
  // each other's peer.
2340 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07002341 TimestampMapper mapper0("pi1", log_files);
Austin Schuh48507722021-07-17 17:29:24 -07002342 mapper0.set_timestamp_callback(
2343 [&](TimestampedMessage *) { ++mapper0_count; });
2344 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07002345 TimestampMapper mapper1("pi2", log_files);
Austin Schuh48507722021-07-17 17:29:24 -07002346 mapper1.set_timestamp_callback(
2347 [&](TimestampedMessage *) { ++mapper1_count; });
2348
2349 mapper0.AddPeer(&mapper1);
2350 mapper1.AddPeer(&mapper0);
2351
2352 {
2353 std::deque<TimestampedMessage> output0;
2354
    // Each Front() call on a fresh message is expected to bump mapper0's
    // count by one; PopFront() is not expected to change it. mapper1's count
    // is expected to stay at 0 while mapper0 is drained.
2355 EXPECT_EQ(mapper0_count, 0u);
2356 EXPECT_EQ(mapper1_count, 0u);
2357 ASSERT_TRUE(mapper0.Front() != nullptr);
2358 EXPECT_EQ(mapper0_count, 1u);
2359 EXPECT_EQ(mapper1_count, 0u);
2360 output0.emplace_back(std::move(*mapper0.Front()));
2361 mapper0.PopFront();
2362 EXPECT_TRUE(mapper0.started());
2363 EXPECT_EQ(mapper0_count, 1u);
2364 EXPECT_EQ(mapper1_count, 0u);
2365
2366 ASSERT_TRUE(mapper0.Front() != nullptr);
2367 EXPECT_EQ(mapper0_count, 2u);
2368 EXPECT_EQ(mapper1_count, 0u);
2369 output0.emplace_back(std::move(*mapper0.Front()));
2370 mapper0.PopFront();
2371 EXPECT_TRUE(mapper0.started());
2372
2373 ASSERT_TRUE(mapper0.Front() != nullptr);
2374 output0.emplace_back(std::move(*mapper0.Front()));
2375 mapper0.PopFront();
2376 EXPECT_TRUE(mapper0.started());
2377
2378 EXPECT_EQ(mapper0_count, 3u);
2379 EXPECT_EQ(mapper1_count, 0u);
2380
2381 ASSERT_TRUE(mapper0.Front() == nullptr);
2382
2383 LOG(INFO) << output0[0];
2384 LOG(INFO) << output0[1];
2385 LOG(INFO) << output0[2];
2386
    // Three messages on boot 0 at 1000ms, 2000ms and 3000ms with queue
    // indices 0, 1 and 2. The remote and timestamp times are expected to be
    // BootTimestamp::min_time(), and data is expected to be non-null.
2387 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2388 EXPECT_EQ(output0[0].monotonic_event_time.time,
2389 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002390 EXPECT_EQ(output0[0].queue_index,
2391 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002392 EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
2393 EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002394 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002395
2396 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2397 EXPECT_EQ(output0[1].monotonic_event_time.time,
2398 e + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002399 EXPECT_EQ(output0[1].queue_index,
2400 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002401 EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
2402 EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002403 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002404
2405 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2406 EXPECT_EQ(output0[2].monotonic_event_time.time,
2407 e + chrono::milliseconds(3000));
Austin Schuh58646e22021-08-23 23:51:46 -07002408 EXPECT_EQ(output0[2].queue_index,
2409 (BootQueueIndex{.boot = 0u, .index = 2u}));
Austin Schuh48507722021-07-17 17:29:24 -07002410 EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
2411 EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002412 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002413 }
2414
2415 {
2416 SCOPED_TRACE("Trying node1 now");
2417 std::deque<TimestampedMessage> output1;
2418
    // Draining mapper1 is expected to leave mapper0's count at 3 and to yield
    // four messages, one per timestamp message written above.
2419 EXPECT_EQ(mapper0_count, 3u);
2420 EXPECT_EQ(mapper1_count, 0u);
2421
2422 ASSERT_TRUE(mapper1.Front() != nullptr);
2423 EXPECT_EQ(mapper0_count, 3u);
2424 EXPECT_EQ(mapper1_count, 1u);
2425 output1.emplace_back(std::move(*mapper1.Front()));
2426 mapper1.PopFront();
2427 EXPECT_TRUE(mapper1.started());
2428 EXPECT_EQ(mapper0_count, 3u);
2429 EXPECT_EQ(mapper1_count, 1u);
2430
2431 ASSERT_TRUE(mapper1.Front() != nullptr);
2432 EXPECT_EQ(mapper0_count, 3u);
2433 EXPECT_EQ(mapper1_count, 2u);
2434 output1.emplace_back(std::move(*mapper1.Front()));
2435 mapper1.PopFront();
2436 EXPECT_TRUE(mapper1.started());
2437
2438 ASSERT_TRUE(mapper1.Front() != nullptr);
2439 output1.emplace_back(std::move(*mapper1.Front()));
2440 mapper1.PopFront();
2441 EXPECT_TRUE(mapper1.started());
2442
Austin Schuh58646e22021-08-23 23:51:46 -07002443 ASSERT_TRUE(mapper1.Front() != nullptr);
2444 output1.emplace_back(std::move(*mapper1.Front()));
2445 mapper1.PopFront();
2446 EXPECT_TRUE(mapper1.started());
2447
Austin Schuh48507722021-07-17 17:29:24 -07002448 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002449 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002450
2451 ASSERT_TRUE(mapper1.Front() == nullptr);
2452
2453 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002454 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002455
    // First message: event time on boot 0 (1000ms + 100s), matched to remote
    // queue index 0 at remote time 1000ms.
2456 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2457 EXPECT_EQ(output1[0].monotonic_event_time.time,
2458 e + chrono::seconds(100) + chrono::milliseconds(1000));
2459 EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
2460 EXPECT_EQ(output1[0].monotonic_remote_time.time,
2461 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002462 EXPECT_EQ(output1[0].remote_queue_index,
2463 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002464 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
2465 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
2466 e + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002467 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002468
    // Second message: event time on boot 1. It refers to the same remote
    // message as the first one (remote queue index 0, remote time 1000ms);
    // its expected event time 20s + 2000ms equals 1000ms + 21s.
2469 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2470 EXPECT_EQ(output1[1].monotonic_event_time.time,
2471 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002472 EXPECT_EQ(output1[1].remote_queue_index,
2473 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002474 EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
2475 EXPECT_EQ(output1[1].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002476 e + chrono::milliseconds(1000));
Austin Schuh48507722021-07-17 17:29:24 -07002477 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
2478 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
2479 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002480 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002481
    // Third message: same expected event time as the second, but matched to
    // remote queue index 1 at remote time 2000ms.
2482 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2483 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002484 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh48507722021-07-17 17:29:24 -07002485 EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
2486 EXPECT_EQ(output1[2].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002487 e + chrono::milliseconds(2000));
2488 EXPECT_EQ(output1[2].remote_queue_index,
2489 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002490 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
2491 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002492 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002493 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002494
    // Fourth message: boot 1 event time (3000ms + 20s), matched to remote
    // queue index 2 at remote time 3000ms.
Austin Schuh58646e22021-08-23 23:51:46 -07002495 EXPECT_EQ(output1[3].monotonic_event_time.boot, 1u);
2496 EXPECT_EQ(output1[3].monotonic_event_time.time,
2497 e + chrono::seconds(20) + chrono::milliseconds(3000));
2498 EXPECT_EQ(output1[3].monotonic_remote_time.boot, 0u);
2499 EXPECT_EQ(output1[3].monotonic_remote_time.time,
2500 e + chrono::milliseconds(3000));
2501 EXPECT_EQ(output1[3].remote_queue_index,
2502 (BootQueueIndex{.boot = 0u, .index = 2u}));
2503 EXPECT_EQ(output1[3].monotonic_timestamp_time.boot, 0u);
2504 EXPECT_EQ(output1[3].monotonic_timestamp_time.time,
2505 e + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002506 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh58646e22021-08-23 23:51:46 -07002507
Austin Schuh48507722021-07-17 17:29:24 -07002508 LOG(INFO) << output1[0];
2509 LOG(INFO) << output1[1];
2510 LOG(INFO) << output1[2];
Austin Schuh58646e22021-08-23 23:51:46 -07002511 LOG(INFO) << output1[3];
Austin Schuh48507722021-07-17 17:29:24 -07002512 }
2513}
2514
2515TEST_F(RebootTimestampMapperTest, Node2Reboot) {
2516 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2517 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002518 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002519 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002520 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002521 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002522 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002523 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002524 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002525 writer1b.QueueSpan(boot1b_.span());
2526
Austin Schuhd863e6e2022-10-16 15:44:50 -07002527 writer1a.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002528 e + chrono::seconds(100) + chrono::milliseconds(1000), 3, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002529 writer0a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002530 e + chrono::seconds(100) + chrono::milliseconds(1000), 3,
2531 chrono::seconds(-100),
2532 e + chrono::seconds(100) + chrono::milliseconds(1001)));
2533
Austin Schuhd863e6e2022-10-16 15:44:50 -07002534 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002535 e + chrono::seconds(20) + chrono::milliseconds(2000), 3, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002536 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002537 e + chrono::seconds(20) + chrono::milliseconds(2000), 3,
2538 chrono::seconds(-20),
2539 e + chrono::seconds(20) + chrono::milliseconds(2001)));
2540
Austin Schuhd863e6e2022-10-16 15:44:50 -07002541 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002542 e + chrono::seconds(20) + chrono::milliseconds(3000), 3, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002543 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002544 e + chrono::seconds(20) + chrono::milliseconds(3000), 3,
2545 chrono::seconds(-20),
2546 e + chrono::seconds(20) + chrono::milliseconds(3001)));
2547 }
2548
2549 const std::vector<LogFile> parts =
2550 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07002551 LogFilesContainer log_files(parts);
Austin Schuh48507722021-07-17 17:29:24 -07002552
2553 for (const auto &x : parts) {
2554 LOG(INFO) << x;
2555 }
2556 ASSERT_EQ(parts.size(), 1u);
2557 ASSERT_EQ(parts[0].logger_node, "pi1");
2558
2559 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07002560 TimestampMapper mapper0("pi1", log_files);
Austin Schuh48507722021-07-17 17:29:24 -07002561 mapper0.set_timestamp_callback(
2562 [&](TimestampedMessage *) { ++mapper0_count; });
2563 size_t mapper1_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07002564 TimestampMapper mapper1("pi2", log_files);
Austin Schuh48507722021-07-17 17:29:24 -07002565 mapper1.set_timestamp_callback(
2566 [&](TimestampedMessage *) { ++mapper1_count; });
2567
2568 mapper0.AddPeer(&mapper1);
2569 mapper1.AddPeer(&mapper0);
2570
2571 {
2572 std::deque<TimestampedMessage> output0;
2573
2574 EXPECT_EQ(mapper0_count, 0u);
2575 EXPECT_EQ(mapper1_count, 0u);
2576 ASSERT_TRUE(mapper0.Front() != nullptr);
2577 EXPECT_EQ(mapper0_count, 1u);
2578 EXPECT_EQ(mapper1_count, 0u);
2579 output0.emplace_back(std::move(*mapper0.Front()));
2580 mapper0.PopFront();
2581 EXPECT_TRUE(mapper0.started());
2582 EXPECT_EQ(mapper0_count, 1u);
2583 EXPECT_EQ(mapper1_count, 0u);
2584
2585 ASSERT_TRUE(mapper0.Front() != nullptr);
2586 EXPECT_EQ(mapper0_count, 2u);
2587 EXPECT_EQ(mapper1_count, 0u);
2588 output0.emplace_back(std::move(*mapper0.Front()));
2589 mapper0.PopFront();
2590 EXPECT_TRUE(mapper0.started());
2591
2592 ASSERT_TRUE(mapper0.Front() != nullptr);
2593 output0.emplace_back(std::move(*mapper0.Front()));
2594 mapper0.PopFront();
2595 EXPECT_TRUE(mapper0.started());
2596
2597 EXPECT_EQ(mapper0_count, 3u);
2598 EXPECT_EQ(mapper1_count, 0u);
2599
2600 ASSERT_TRUE(mapper0.Front() == nullptr);
2601
2602 LOG(INFO) << output0[0];
2603 LOG(INFO) << output0[1];
2604 LOG(INFO) << output0[2];
2605
2606 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2607 EXPECT_EQ(output0[0].monotonic_event_time.time,
2608 e + chrono::milliseconds(1000));
2609 EXPECT_EQ(output0[0].monotonic_remote_time.boot, 0u);
2610 EXPECT_EQ(output0[0].monotonic_remote_time.time,
2611 e + chrono::seconds(100) + chrono::milliseconds(1000));
2612 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
2613 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
2614 e + chrono::seconds(100) + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002615 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002616
2617 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2618 EXPECT_EQ(output0[1].monotonic_event_time.time,
2619 e + chrono::milliseconds(2000));
2620 EXPECT_EQ(output0[1].monotonic_remote_time.boot, 1u);
2621 EXPECT_EQ(output0[1].monotonic_remote_time.time,
2622 e + chrono::seconds(20) + chrono::milliseconds(2000));
2623 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
2624 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
2625 e + chrono::seconds(20) + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002626 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002627
2628 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2629 EXPECT_EQ(output0[2].monotonic_event_time.time,
2630 e + chrono::milliseconds(3000));
2631 EXPECT_EQ(output0[2].monotonic_remote_time.boot, 1u);
2632 EXPECT_EQ(output0[2].monotonic_remote_time.time,
2633 e + chrono::seconds(20) + chrono::milliseconds(3000));
2634 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
2635 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
2636 e + chrono::seconds(20) + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002637 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002638 }
2639
2640 {
2641 SCOPED_TRACE("Trying node1 now");
2642 std::deque<TimestampedMessage> output1;
2643
2644 EXPECT_EQ(mapper0_count, 3u);
2645 EXPECT_EQ(mapper1_count, 0u);
2646
2647 ASSERT_TRUE(mapper1.Front() != nullptr);
2648 EXPECT_EQ(mapper0_count, 3u);
2649 EXPECT_EQ(mapper1_count, 1u);
2650 output1.emplace_back(std::move(*mapper1.Front()));
2651 mapper1.PopFront();
2652 EXPECT_TRUE(mapper1.started());
2653 EXPECT_EQ(mapper0_count, 3u);
2654 EXPECT_EQ(mapper1_count, 1u);
2655
2656 ASSERT_TRUE(mapper1.Front() != nullptr);
2657 EXPECT_EQ(mapper0_count, 3u);
2658 EXPECT_EQ(mapper1_count, 2u);
2659 output1.emplace_back(std::move(*mapper1.Front()));
2660 mapper1.PopFront();
2661 EXPECT_TRUE(mapper1.started());
2662
2663 ASSERT_TRUE(mapper1.Front() != nullptr);
2664 output1.emplace_back(std::move(*mapper1.Front()));
2665 mapper1.PopFront();
2666 EXPECT_TRUE(mapper1.started());
2667
2668 EXPECT_EQ(mapper0_count, 3u);
2669 EXPECT_EQ(mapper1_count, 3u);
2670
2671 ASSERT_TRUE(mapper1.Front() == nullptr);
2672
2673 EXPECT_EQ(mapper0_count, 3u);
2674 EXPECT_EQ(mapper1_count, 3u);
2675
2676 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2677 EXPECT_EQ(output1[0].monotonic_event_time.time,
2678 e + chrono::seconds(100) + chrono::milliseconds(1000));
2679 EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
2680 EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002681 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002682
2683 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2684 EXPECT_EQ(output1[1].monotonic_event_time.time,
2685 e + chrono::seconds(20) + chrono::milliseconds(2000));
2686 EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
2687 EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002688 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002689
2690 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2691 EXPECT_EQ(output1[2].monotonic_event_time.time,
2692 e + chrono::seconds(20) + chrono::milliseconds(3000));
2693 EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
2694 EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002695 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002696
2697 LOG(INFO) << output1[0];
2698 LOG(INFO) << output1[1];
2699 LOG(INFO) << output1[2];
2700 }
2701}
2702
Austin Schuh44c61472021-11-22 21:04:10 -08002703class SortingDeathTest : public SortingElementTest {
2704 public:
2705 SortingDeathTest()
2706 : SortingElementTest(),
2707 part0_(MakeHeader(config_, R"({
2708 /* 100ms */
2709 "max_out_of_order_duration": 100000000,
2710 "node": {
2711 "name": "pi1"
2712 },
2713 "logger_node": {
2714 "name": "pi1"
2715 },
2716 "monotonic_start_time": 1000000,
2717 "realtime_start_time": 1000000000000,
2718 "logger_monotonic_start_time": 1000000,
2719 "logger_realtime_start_time": 1000000000000,
2720 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2721 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2722 "parts_index": 0,
2723 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2724 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2725 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2726 "boot_uuids": [
2727 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2728 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2729 ""
2730 ],
2731 "oldest_remote_monotonic_timestamps": [
2732 9223372036854775807,
2733 9223372036854775807,
2734 9223372036854775807
2735 ],
2736 "oldest_local_monotonic_timestamps": [
2737 9223372036854775807,
2738 9223372036854775807,
2739 9223372036854775807
2740 ],
2741 "oldest_remote_unreliable_monotonic_timestamps": [
2742 9223372036854775807,
2743 0,
2744 9223372036854775807
2745 ],
2746 "oldest_local_unreliable_monotonic_timestamps": [
2747 9223372036854775807,
2748 0,
2749 9223372036854775807
2750 ]
2751})")),
2752 part1_(MakeHeader(config_, R"({
2753 /* 100ms */
2754 "max_out_of_order_duration": 100000000,
2755 "node": {
2756 "name": "pi1"
2757 },
2758 "logger_node": {
2759 "name": "pi1"
2760 },
2761 "monotonic_start_time": 1000000,
2762 "realtime_start_time": 1000000000000,
2763 "logger_monotonic_start_time": 1000000,
2764 "logger_realtime_start_time": 1000000000000,
2765 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2766 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2767 "parts_index": 1,
2768 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2769 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2770 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2771 "boot_uuids": [
2772 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2773 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2774 ""
2775 ],
2776 "oldest_remote_monotonic_timestamps": [
2777 9223372036854775807,
2778 9223372036854775807,
2779 9223372036854775807
2780 ],
2781 "oldest_local_monotonic_timestamps": [
2782 9223372036854775807,
2783 9223372036854775807,
2784 9223372036854775807
2785 ],
2786 "oldest_remote_unreliable_monotonic_timestamps": [
2787 9223372036854775807,
2788 100000,
2789 9223372036854775807
2790 ],
2791 "oldest_local_unreliable_monotonic_timestamps": [
2792 9223372036854775807,
2793 100000,
2794 9223372036854775807
2795 ]
2796})")),
2797 part2_(MakeHeader(config_, R"({
2798 /* 100ms */
2799 "max_out_of_order_duration": 100000000,
2800 "node": {
2801 "name": "pi1"
2802 },
2803 "logger_node": {
2804 "name": "pi1"
2805 },
2806 "monotonic_start_time": 1000000,
2807 "realtime_start_time": 1000000000000,
2808 "logger_monotonic_start_time": 1000000,
2809 "logger_realtime_start_time": 1000000000000,
2810 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2811 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2812 "parts_index": 2,
2813 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2814 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2815 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2816 "boot_uuids": [
2817 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2818 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2819 ""
2820 ],
2821 "oldest_remote_monotonic_timestamps": [
2822 9223372036854775807,
2823 9223372036854775807,
2824 9223372036854775807
2825 ],
2826 "oldest_local_monotonic_timestamps": [
2827 9223372036854775807,
2828 9223372036854775807,
2829 9223372036854775807
2830 ],
2831 "oldest_remote_unreliable_monotonic_timestamps": [
2832 9223372036854775807,
2833 200000,
2834 9223372036854775807
2835 ],
2836 "oldest_local_unreliable_monotonic_timestamps": [
2837 9223372036854775807,
2838 200000,
2839 9223372036854775807
2840 ]
2841})")),
2842 part3_(MakeHeader(config_, R"({
2843 /* 100ms */
2844 "max_out_of_order_duration": 100000000,
2845 "node": {
2846 "name": "pi1"
2847 },
2848 "logger_node": {
2849 "name": "pi1"
2850 },
2851 "monotonic_start_time": 1000000,
2852 "realtime_start_time": 1000000000000,
2853 "logger_monotonic_start_time": 1000000,
2854 "logger_realtime_start_time": 1000000000000,
2855 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2856 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2857 "parts_index": 3,
2858 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2859 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2860 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2861 "boot_uuids": [
2862 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2863 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2864 ""
2865 ],
2866 "oldest_remote_monotonic_timestamps": [
2867 9223372036854775807,
2868 9223372036854775807,
2869 9223372036854775807
2870 ],
2871 "oldest_local_monotonic_timestamps": [
2872 9223372036854775807,
2873 9223372036854775807,
2874 9223372036854775807
2875 ],
2876 "oldest_remote_unreliable_monotonic_timestamps": [
2877 9223372036854775807,
2878 300000,
2879 9223372036854775807
2880 ],
2881 "oldest_local_unreliable_monotonic_timestamps": [
2882 9223372036854775807,
2883 300000,
2884 9223372036854775807
2885 ]
2886})")) {}
2887
2888 protected:
2889 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part0_;
2890 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part1_;
2891 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part2_;
2892 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part3_;
2893};
2894
2895// Tests that if 2 computers go back and forth trying to be the same node, we
2896// die in sorting instead of failing to estimate time.
2897TEST_F(SortingDeathTest, FightingNodes) {
2898 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002899 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh44c61472021-11-22 21:04:10 -08002900 writer0.QueueSpan(part0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002901 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh44c61472021-11-22 21:04:10 -08002902 writer1.QueueSpan(part1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002903 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuh44c61472021-11-22 21:04:10 -08002904 writer2.QueueSpan(part2_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002905 TestDetachedBufferWriter writer3(logfile3_);
Austin Schuh44c61472021-11-22 21:04:10 -08002906 writer3.QueueSpan(part3_.span());
2907 }
2908
2909 EXPECT_DEATH(
2910 {
2911 const std::vector<LogFile> parts =
2912 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2913 },
Austin Schuh22cf7862022-09-19 19:09:42 -07002914 "found overlapping boots on");
Austin Schuh44c61472021-11-22 21:04:10 -08002915}
2916
Brian Smarttea913d42021-12-10 15:02:38 -08002917// Tests that we MessageReader blows up on a bad message.
2918TEST(MessageReaderConfirmCrash, ReadWrite) {
2919 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
2920 unlink(logfile.c_str());
2921
2922 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
2923 JsonToSizedFlatbuffer<LogFileHeader>(
2924 R"({ "max_out_of_order_duration": 100000000 })");
2925 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
2926 JsonToSizedFlatbuffer<MessageHeader>(
2927 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
2928 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
2929 JsonToSizedFlatbuffer<MessageHeader>(
2930 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
2931 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m4 =
2932 JsonToSizedFlatbuffer<MessageHeader>(
2933 R"({ "channel_index": 0, "monotonic_sent_time": 4 })");
2934
2935 // Starts out like a proper flat buffer header, but it breaks down ...
2936 std::vector<uint8_t> garbage{8, 0, 0, 0, 16, 0, 0, 0, 4, 0, 0, 0};
2937 absl::Span<uint8_t> m3_span(garbage);
2938
2939 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002940 TestDetachedBufferWriter writer(logfile);
Brian Smarttea913d42021-12-10 15:02:38 -08002941 writer.QueueSpan(config.span());
2942 writer.QueueSpan(m1.span());
2943 writer.QueueSpan(m2.span());
2944 writer.QueueSpan(m3_span);
2945 writer.QueueSpan(m4.span()); // This message is "hidden"
2946 }
2947
2948 {
2949 MessageReader reader(logfile);
2950
2951 EXPECT_EQ(reader.filename(), logfile);
2952
2953 EXPECT_EQ(
2954 reader.max_out_of_order_duration(),
2955 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2956 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2957 EXPECT_TRUE(reader.ReadMessage());
2958 EXPECT_EQ(reader.newest_timestamp(),
2959 monotonic_clock::time_point(chrono::nanoseconds(1)));
2960 EXPECT_TRUE(reader.ReadMessage());
2961 EXPECT_EQ(reader.newest_timestamp(),
2962 monotonic_clock::time_point(chrono::nanoseconds(2)));
2963 // Confirm default crashing behavior
2964 EXPECT_DEATH(reader.ReadMessage(), "Corrupted message at offset");
2965 }
2966
2967 {
2968 gflags::FlagSaver fs;
2969
2970 MessageReader reader(logfile);
2971 reader.set_crash_on_corrupt_message_flag(false);
2972
2973 EXPECT_EQ(reader.filename(), logfile);
2974
2975 EXPECT_EQ(
2976 reader.max_out_of_order_duration(),
2977 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2978 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2979 EXPECT_TRUE(reader.ReadMessage());
2980 EXPECT_EQ(reader.newest_timestamp(),
2981 monotonic_clock::time_point(chrono::nanoseconds(1)));
2982 EXPECT_TRUE(reader.ReadMessage());
2983 EXPECT_EQ(reader.newest_timestamp(),
2984 monotonic_clock::time_point(chrono::nanoseconds(2)));
2985 // Confirm avoiding the corrupted message crash, stopping instead.
2986 EXPECT_FALSE(reader.ReadMessage());
2987 }
2988
2989 {
2990 gflags::FlagSaver fs;
2991
2992 MessageReader reader(logfile);
2993 reader.set_crash_on_corrupt_message_flag(false);
2994 reader.set_ignore_corrupt_messages_flag(true);
2995
2996 EXPECT_EQ(reader.filename(), logfile);
2997
2998 EXPECT_EQ(
2999 reader.max_out_of_order_duration(),
3000 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
3001 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
3002 EXPECT_TRUE(reader.ReadMessage());
3003 EXPECT_EQ(reader.newest_timestamp(),
3004 monotonic_clock::time_point(chrono::nanoseconds(1)));
3005 EXPECT_TRUE(reader.ReadMessage());
3006 EXPECT_EQ(reader.newest_timestamp(),
3007 monotonic_clock::time_point(chrono::nanoseconds(2)));
3008 // Confirm skipping of the corrupted message to read the hidden one.
3009 EXPECT_TRUE(reader.ReadMessage());
3010 EXPECT_EQ(reader.newest_timestamp(),
3011 monotonic_clock::time_point(chrono::nanoseconds(4)));
3012 EXPECT_FALSE(reader.ReadMessage());
3013 }
3014}
3015
Austin Schuhfa30c352022-10-16 11:12:02 -07003016class InlinePackMessage : public ::testing::Test {
3017 protected:
3018 aos::Context RandomContext() {
3019 data_ = RandomData();
3020 std::uniform_int_distribution<uint32_t> uint32_distribution(
3021 std::numeric_limits<uint32_t>::min(),
3022 std::numeric_limits<uint32_t>::max());
3023
3024 std::uniform_int_distribution<int64_t> time_distribution(
3025 std::numeric_limits<int64_t>::min(),
3026 std::numeric_limits<int64_t>::max());
3027
3028 aos::Context context;
3029 context.monotonic_event_time =
3030 aos::monotonic_clock::epoch() +
3031 chrono::nanoseconds(time_distribution(random_number_generator_));
3032 context.realtime_event_time =
3033 aos::realtime_clock::epoch() +
3034 chrono::nanoseconds(time_distribution(random_number_generator_));
3035
3036 context.monotonic_remote_time =
3037 aos::monotonic_clock::epoch() +
3038 chrono::nanoseconds(time_distribution(random_number_generator_));
3039 context.realtime_remote_time =
3040 aos::realtime_clock::epoch() +
3041 chrono::nanoseconds(time_distribution(random_number_generator_));
3042
3043 context.queue_index = uint32_distribution(random_number_generator_);
3044 context.remote_queue_index = uint32_distribution(random_number_generator_);
3045 context.size = data_.size();
3046 context.data = data_.data();
3047 return context;
3048 }
3049
Austin Schuhf2d0e682022-10-16 14:20:58 -07003050 aos::monotonic_clock::time_point RandomMonotonic() {
3051 std::uniform_int_distribution<int64_t> time_distribution(
3052 0, std::numeric_limits<int64_t>::max());
3053 return aos::monotonic_clock::epoch() +
3054 chrono::nanoseconds(time_distribution(random_number_generator_));
3055 }
3056
3057 aos::SizePrefixedFlatbufferDetachedBuffer<message_bridge::RemoteMessage>
3058 RandomRemoteMessage() {
3059 std::uniform_int_distribution<uint8_t> uint8_distribution(
3060 std::numeric_limits<uint8_t>::min(),
3061 std::numeric_limits<uint8_t>::max());
3062
3063 std::uniform_int_distribution<int64_t> time_distribution(
3064 std::numeric_limits<int64_t>::min(),
3065 std::numeric_limits<int64_t>::max());
3066
3067 flatbuffers::FlatBufferBuilder fbb;
3068 message_bridge::RemoteMessage::Builder builder(fbb);
3069 builder.add_queue_index(uint8_distribution(random_number_generator_));
3070
3071 builder.add_monotonic_sent_time(
3072 time_distribution(random_number_generator_));
3073 builder.add_realtime_sent_time(time_distribution(random_number_generator_));
3074 builder.add_monotonic_remote_time(
3075 time_distribution(random_number_generator_));
3076 builder.add_realtime_remote_time(
3077 time_distribution(random_number_generator_));
3078
3079 builder.add_remote_queue_index(
3080 uint8_distribution(random_number_generator_));
3081
3082 fbb.FinishSizePrefixed(builder.Finish());
3083 return fbb.Release();
3084 }
3085
Austin Schuhfa30c352022-10-16 11:12:02 -07003086 std::vector<uint8_t> RandomData() {
3087 std::vector<uint8_t> result;
3088 std::uniform_int_distribution<int> length_distribution(1, 32);
3089 std::uniform_int_distribution<uint8_t> data_distribution(
3090 std::numeric_limits<uint8_t>::min(),
3091 std::numeric_limits<uint8_t>::max());
3092
3093 const size_t length = length_distribution(random_number_generator_);
3094
3095 result.reserve(length);
3096 for (size_t i = 0; i < length; ++i) {
3097 result.emplace_back(data_distribution(random_number_generator_));
3098 }
3099 return result;
3100 }
3101
3102 std::mt19937 random_number_generator_{
3103 std::mt19937(::aos::testing::RandomSeed())};
3104
3105 std::vector<uint8_t> data_;
3106};
3107
3108// Uses the binary schema to annotate a provided flatbuffer. Returns the
3109// annotated flatbuffer.
3110std::string AnnotateBinaries(
3111 const aos::NonSizePrefixedFlatbuffer<reflection::Schema> &schema,
3112 const std::string &schema_filename,
3113 flatbuffers::span<uint8_t> binary_data) {
3114 flatbuffers::BinaryAnnotator binary_annotator(
3115 schema.span().data(), schema.span().size(), binary_data.data(),
3116 binary_data.size());
3117
3118 auto annotations = binary_annotator.Annotate();
3119
3120 flatbuffers::AnnotatedBinaryTextGenerator text_generator(
3121 flatbuffers::AnnotatedBinaryTextGenerator::Options{}, annotations,
3122 binary_data.data(), binary_data.size());
3123
3124 text_generator.Generate(aos::testing::TestTmpDir() + "/foo.bfbs",
3125 schema_filename);
3126
3127 return aos::util::ReadFileToStringOrDie(aos::testing::TestTmpDir() +
3128 "/foo.afb");
3129}
3130
Austin Schuh71a40d42023-02-04 21:22:22 -08003131// Event loop which just has working time functions for the Copier classes
3132// tested below.
3133class TimeEventLoop : public EventLoop {
3134 public:
3135 TimeEventLoop() : EventLoop(nullptr) {}
3136
3137 aos::monotonic_clock::time_point monotonic_now() const final {
3138 return aos::monotonic_clock::min_time;
3139 }
3140 realtime_clock::time_point realtime_now() const final {
3141 return aos::realtime_clock::min_time;
3142 }
3143
3144 void OnRun(::std::function<void()> /*on_run*/) final { LOG(FATAL); }
3145
3146 const std::string_view name() const final { return "time"; }
3147 const Node *node() const final { return nullptr; }
3148
3149 void SetRuntimeAffinity(const cpu_set_t & /*cpuset*/) final { LOG(FATAL); }
3150 void SetRuntimeRealtimePriority(int /*priority*/) final { LOG(FATAL); }
3151
3152 const cpu_set_t &runtime_affinity() const final {
3153 LOG(FATAL);
3154 return cpuset_;
3155 }
3156
3157 TimerHandler *AddTimer(::std::function<void()> /*callback*/) final {
3158 LOG(FATAL);
3159 return nullptr;
3160 }
3161
3162 std::unique_ptr<RawSender> MakeRawSender(const Channel * /*channel*/) final {
3163 LOG(FATAL);
3164 return std::unique_ptr<RawSender>();
3165 }
3166
3167 const UUID &boot_uuid() const final {
3168 LOG(FATAL);
3169 return boot_uuid_;
3170 }
3171
3172 void set_name(const std::string_view name) final { LOG(FATAL) << name; }
3173
3174 pid_t GetTid() final {
3175 LOG(FATAL);
3176 return 0;
3177 }
3178
3179 int NumberBuffers(const Channel * /*channel*/) final {
3180 LOG(FATAL);
3181 return 0;
3182 }
3183
3184 int runtime_realtime_priority() const final {
3185 LOG(FATAL);
3186 return 0;
3187 }
3188
3189 std::unique_ptr<RawFetcher> MakeRawFetcher(
3190 const Channel * /*channel*/) final {
3191 LOG(FATAL);
3192 return std::unique_ptr<RawFetcher>();
3193 }
3194
3195 PhasedLoopHandler *AddPhasedLoop(
3196 ::std::function<void(int)> /*callback*/,
3197 const monotonic_clock::duration /*interval*/,
3198 const monotonic_clock::duration /*offset*/) final {
3199 LOG(FATAL);
3200 return nullptr;
3201 }
3202
3203 void MakeRawWatcher(
3204 const Channel * /*channel*/,
3205 std::function<void(const Context &context, const void *message)>
3206 /*watcher*/) final {
3207 LOG(FATAL);
3208 }
3209
3210 private:
3211 const cpu_set_t cpuset_ = DefaultAffinity();
3212 UUID boot_uuid_ = UUID ::Zero();
3213};
3214
Austin Schuhfa30c352022-10-16 11:12:02 -07003215// Tests that all variations of PackMessage are equivalent to the inline
3216// PackMessage used to avoid allocations.
3217TEST_F(InlinePackMessage, Equivilent) {
3218 std::uniform_int_distribution<uint32_t> uint32_distribution(
3219 std::numeric_limits<uint32_t>::min(),
3220 std::numeric_limits<uint32_t>::max());
3221 aos::FlatbufferVector<reflection::Schema> schema =
3222 FileToFlatbuffer<reflection::Schema>(
3223 ArtifactPath("aos/events/logging/logger.bfbs"));
3224
3225 for (const LogType type :
3226 {LogType::kLogMessage, LogType::kLogDeliveryTimeOnly,
3227 LogType::kLogMessageAndDeliveryTime, LogType::kLogRemoteMessage}) {
3228 for (int i = 0; i < 100; ++i) {
3229 aos::Context context = RandomContext();
3230 const uint32_t channel_index =
3231 uint32_distribution(random_number_generator_);
3232
3233 flatbuffers::FlatBufferBuilder fbb;
3234 fbb.ForceDefaults(true);
3235 fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index, type));
3236
3237 VLOG(1) << absl::BytesToHexString(std::string_view(
3238 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3239 fbb.GetBufferSpan().size()));
3240
3241 // Make sure that both the builder and inline method agree on sizes.
Austin Schuh48d10d62022-10-16 22:19:23 -07003242 ASSERT_EQ(fbb.GetSize(), PackMessageSize(type, context.size))
Austin Schuhfa30c352022-10-16 11:12:02 -07003243 << "log type " << static_cast<int>(type);
3244
3245 // Initialize the buffer to something nonzero to make sure all the padding
3246 // bytes are set to 0.
Austin Schuh48d10d62022-10-16 22:19:23 -07003247 std::vector<uint8_t> repacked_message(PackMessageSize(type, context.size),
3248 67);
Austin Schuhfa30c352022-10-16 11:12:02 -07003249
3250 // And verify packing inline works as expected.
Austin Schuh71a40d42023-02-04 21:22:22 -08003251 EXPECT_EQ(
3252 repacked_message.size(),
3253 PackMessageInline(repacked_message.data(), context, channel_index,
3254 type, 0u, repacked_message.size()));
Austin Schuhfa30c352022-10-16 11:12:02 -07003255 EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
3256 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3257 fbb.GetBufferSpan().size()))
3258 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
3259 fbb.GetBufferSpan());
Austin Schuh71a40d42023-02-04 21:22:22 -08003260
3261 // Ok, now we want to confirm that we can build up arbitrary pieces of
3262 // said flatbuffer. Try all of them since it is cheap.
3263 TimeEventLoop event_loop;
3264 for (size_t i = 0; i < repacked_message.size(); i += 8) {
3265 for (size_t j = i; j < repacked_message.size(); j += 8) {
3266 std::vector<uint8_t> destination(repacked_message.size(), 67u);
3267 ContextDataCopier copier(context, channel_index, type, &event_loop);
3268
3269 copier.Copy(destination.data(), i, j);
3270
3271 size_t index = 0;
3272 for (size_t k = i; k < j; ++k) {
3273 ASSERT_EQ(destination[index], repacked_message[k])
3274 << ": Failed to match type " << static_cast<int>(type)
3275 << ", index " << index << " while testing range " << i << " to "
3276 << j;
3277 ;
3278 ++index;
3279 }
3280 // Now, confirm that none of the other bytes have been touched.
3281 for (; index < destination.size(); ++index) {
3282 ASSERT_EQ(destination[index], 67u);
3283 }
3284 }
3285 }
Austin Schuhfa30c352022-10-16 11:12:02 -07003286 }
3287 }
3288}
3289
// Tests that PackRemoteMessage is equivalent to the inline
// PackRemoteMessageInline used to avoid allocations.
3292TEST_F(InlinePackMessage, RemoteEquivilent) {
3293 aos::FlatbufferVector<reflection::Schema> schema =
3294 FileToFlatbuffer<reflection::Schema>(
3295 ArtifactPath("aos/events/logging/logger.bfbs"));
3296 std::uniform_int_distribution<uint8_t> uint8_distribution(
3297 std::numeric_limits<uint8_t>::min(), std::numeric_limits<uint8_t>::max());
3298
3299 for (int i = 0; i < 100; ++i) {
3300 aos::SizePrefixedFlatbufferDetachedBuffer<RemoteMessage> random_msg =
3301 RandomRemoteMessage();
3302 const size_t channel_index = uint8_distribution(random_number_generator_);
3303 const monotonic_clock::time_point monotonic_timestamp_time =
3304 RandomMonotonic();
3305
3306 flatbuffers::FlatBufferBuilder fbb;
3307 fbb.ForceDefaults(true);
3308 fbb.FinishSizePrefixed(PackRemoteMessage(
3309 &fbb, &random_msg.message(), channel_index, monotonic_timestamp_time));
3310
3311 VLOG(1) << absl::BytesToHexString(std::string_view(
3312 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3313 fbb.GetBufferSpan().size()));
3314
3315 // Make sure that both the builder and inline method agree on sizes.
3316 ASSERT_EQ(fbb.GetSize(), PackRemoteMessageSize());
3317
3318 // Initialize the buffer to something nonzer to make sure all the padding
3319 // bytes are set to 0.
3320 std::vector<uint8_t> repacked_message(PackRemoteMessageSize(), 67);
3321
3322 // And verify packing inline works as expected.
Austin Schuh71a40d42023-02-04 21:22:22 -08003323 EXPECT_EQ(repacked_message.size(),
3324 PackRemoteMessageInline(
3325 repacked_message.data(), &random_msg.message(), channel_index,
3326 monotonic_timestamp_time, 0u, repacked_message.size()));
Austin Schuhf2d0e682022-10-16 14:20:58 -07003327 EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
3328 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3329 fbb.GetBufferSpan().size()))
3330 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
3331 fbb.GetBufferSpan());
Austin Schuh71a40d42023-02-04 21:22:22 -08003332
3333 // Ok, now we want to confirm that we can build up arbitrary pieces of said
3334 // flatbuffer. Try all of them since it is cheap.
3335 TimeEventLoop event_loop;
3336 for (size_t i = 0; i < repacked_message.size(); i += 8) {
3337 for (size_t j = i; j < repacked_message.size(); j += 8) {
3338 std::vector<uint8_t> destination(repacked_message.size(), 67u);
3339 RemoteMessageCopier copier(&random_msg.message(), channel_index,
3340 monotonic_timestamp_time, &event_loop);
3341
3342 copier.Copy(destination.data(), i, j);
3343
3344 size_t index = 0;
3345 for (size_t k = i; k < j; ++k) {
3346 ASSERT_EQ(destination[index], repacked_message[k]);
3347 ++index;
3348 }
3349 for (; index < destination.size(); ++index) {
3350 ASSERT_EQ(destination[index], 67u);
3351 }
3352 }
3353 }
Austin Schuhf2d0e682022-10-16 14:20:58 -07003354 }
3355}
Austin Schuhfa30c352022-10-16 11:12:02 -07003356
Austin Schuhc243b422020-10-11 15:35:08 -07003357} // namespace testing
3358} // namespace logger
3359} // namespace aos