blob: ea56d8d40aeed07a40ee0e57a5676f587f288bec [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
Alexei Strots01395492023-03-20 13:59:56 -07004#include <filesystem>
Austin Schuhfa30c352022-10-16 11:12:02 -07005#include <random>
Austin Schuhe243aaf2020-10-11 15:46:02 -07006#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07007
Austin Schuhfa30c352022-10-16 11:12:02 -07008#include "absl/strings/escaping.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07009#include "external/com_github_google_flatbuffers/src/annotated_binary_text_gen.h"
10#include "external/com_github_google_flatbuffers/src/binary_annotator.h"
11#include "flatbuffers/reflection_generated.h"
12#include "gflags/gflags.h"
13#include "gtest/gtest.h"
14
Austin Schuhc41603c2020-10-11 16:17:37 -070015#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -070016#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -080017#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070018#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070019#include "aos/json_to_flatbuffer.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070020#include "aos/testing/path.h"
21#include "aos/testing/random_seed.h"
Austin Schuhc243b422020-10-11 15:35:08 -070022#include "aos/testing/tmpdir.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070023#include "aos/util/file.h"
Austin Schuhc243b422020-10-11 15:35:08 -070024
25namespace aos {
26namespace logger {
27namespace testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070028namespace chrono = std::chrono;
Austin Schuhf2d0e682022-10-16 14:20:58 -070029using aos::message_bridge::RemoteMessage;
Eric Schmiedebergb38477e2022-12-02 16:08:04 -070030using aos::testing::ArtifactPath;
Austin Schuhc243b422020-10-11 15:35:08 -070031
Austin Schuhd863e6e2022-10-16 15:44:50 -070032// Adapter class to make it easy to test DetachedBufferWriter without adding
33// test only boilerplate to DetachedBufferWriter.
Alexei Strots15c22b12023-04-04 16:27:17 -070034class TestDetachedBufferWriter : public FileBackend,
35 public DetachedBufferWriter {
Austin Schuhd863e6e2022-10-16 15:44:50 -070036 public:
Austin Schuh48d10d62022-10-16 22:19:23 -070037 // Pick a max size that is rather conservative.
38 static constexpr size_t kMaxMessageSize = 128 * 1024;
Austin Schuhd863e6e2022-10-16 15:44:50 -070039 TestDetachedBufferWriter(std::string_view filename)
colleen61276dc2023-06-01 09:23:29 -070040 : FileBackend("/", false),
Alexei Strots15c22b12023-04-04 16:27:17 -070041 DetachedBufferWriter(FileBackend::RequestFile(filename),
42 std::make_unique<DummyEncoder>(kMaxMessageSize)) {}
Austin Schuhd863e6e2022-10-16 15:44:50 -070043 void WriteSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
44 QueueSpan(absl::Span<const uint8_t>(buffer.data(), buffer.size()));
45 }
Austin Schuh7ef11a42023-02-04 17:15:12 -080046 void QueueSpan(absl::Span<const uint8_t> buffer) {
47 DataEncoder::SpanCopier coppier(buffer);
48 CopyMessage(&coppier, monotonic_clock::now());
49 }
Austin Schuhd863e6e2022-10-16 15:44:50 -070050};
51
Austin Schuhe243aaf2020-10-11 15:46:02 -070052// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070053template <typename T>
54SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
55 const std::string_view data) {
56 flatbuffers::FlatBufferBuilder fbb;
57 fbb.ForceDefaults(true);
58 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
59 return fbb.Release();
60}
61
Austin Schuhe243aaf2020-10-11 15:46:02 -070062// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070063TEST(SpanReaderTest, ReadWrite) {
64 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
65 unlink(logfile.c_str());
66
67 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080068 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070069 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080070 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070071
72 {
Austin Schuhd863e6e2022-10-16 15:44:50 -070073 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -080074 writer.QueueSpan(m1.span());
75 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070076 }
77
78 SpanReader reader(logfile);
79
80 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhcf5f6442021-07-06 10:43:28 -070081 EXPECT_EQ(reader.PeekMessage(), m1.span());
82 EXPECT_EQ(reader.PeekMessage(), m1.span());
Austin Schuhadd6eb32020-11-09 21:24:26 -080083 EXPECT_EQ(reader.ReadMessage(), m1.span());
84 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhcf5f6442021-07-06 10:43:28 -070085 EXPECT_EQ(reader.PeekMessage(), absl::Span<const uint8_t>());
Austin Schuhc243b422020-10-11 15:35:08 -070086 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
87}
88
Austin Schuhe243aaf2020-10-11 15:46:02 -070089// Tests that we can actually parse the resulting messages at a basic level
90// through MessageReader.
91TEST(MessageReaderTest, ReadWrite) {
92 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
93 unlink(logfile.c_str());
94
95 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
96 JsonToSizedFlatbuffer<LogFileHeader>(
97 R"({ "max_out_of_order_duration": 100000000 })");
98 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
99 JsonToSizedFlatbuffer<MessageHeader>(
100 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
101 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
102 JsonToSizedFlatbuffer<MessageHeader>(
103 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
104
105 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700106 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800107 writer.QueueSpan(config.span());
108 writer.QueueSpan(m1.span());
109 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -0700110 }
111
112 MessageReader reader(logfile);
113
114 EXPECT_EQ(reader.filename(), logfile);
115
116 EXPECT_EQ(
117 reader.max_out_of_order_duration(),
118 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
119 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
120 EXPECT_TRUE(reader.ReadMessage());
121 EXPECT_EQ(reader.newest_timestamp(),
122 monotonic_clock::time_point(chrono::nanoseconds(1)));
123 EXPECT_TRUE(reader.ReadMessage());
124 EXPECT_EQ(reader.newest_timestamp(),
125 monotonic_clock::time_point(chrono::nanoseconds(2)));
126 EXPECT_FALSE(reader.ReadMessage());
127}
128
Austin Schuh32f68492020-11-08 21:45:51 -0800129// Tests that we explode when messages are too far out of order.
130TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
131 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
132 unlink(logfile0.c_str());
133
134 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
135 JsonToSizedFlatbuffer<LogFileHeader>(
136 R"({
137 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800138 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800139 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
140 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
141 "parts_index": 0
142})");
143
144 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
145 JsonToSizedFlatbuffer<MessageHeader>(
146 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
147 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
148 JsonToSizedFlatbuffer<MessageHeader>(
149 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
150 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
151 JsonToSizedFlatbuffer<MessageHeader>(
152 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
153
154 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700155 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800156 writer.QueueSpan(config0.span());
157 writer.QueueSpan(m1.span());
158 writer.QueueSpan(m2.span());
159 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800160 }
Alexei Strots01395492023-03-20 13:59:56 -0700161 ASSERT_TRUE(std::filesystem::exists(logfile0)) << logfile0;
Austin Schuh32f68492020-11-08 21:45:51 -0800162
163 const std::vector<LogFile> parts = SortParts({logfile0});
164
165 PartsMessageReader reader(parts[0].parts[0]);
166
167 EXPECT_TRUE(reader.ReadMessage());
168 EXPECT_TRUE(reader.ReadMessage());
169 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
170}
171
Austin Schuhc41603c2020-10-11 16:17:37 -0700172// Tests that we can transparently re-assemble part files with a
173// PartsMessageReader.
174TEST(PartsMessageReaderTest, ReadWrite) {
175 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
176 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
177 unlink(logfile0.c_str());
178 unlink(logfile1.c_str());
179
180 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
181 JsonToSizedFlatbuffer<LogFileHeader>(
182 R"({
183 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800184 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700185 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
186 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
187 "parts_index": 0
188})");
189 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
190 JsonToSizedFlatbuffer<LogFileHeader>(
191 R"({
192 "max_out_of_order_duration": 200000000,
193 "monotonic_start_time": 0,
194 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800195 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700196 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
197 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
198 "parts_index": 1
199})");
200
201 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
202 JsonToSizedFlatbuffer<MessageHeader>(
203 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
204 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
205 JsonToSizedFlatbuffer<MessageHeader>(
206 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
207
208 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700209 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800210 writer.QueueSpan(config0.span());
211 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700212 }
213 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700214 TestDetachedBufferWriter writer(logfile1);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800215 writer.QueueSpan(config1.span());
216 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700217 }
218
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700219 // When parts are sorted, we choose the highest max out of order duration for
220 // all parts with the same part uuid.
Austin Schuhc41603c2020-10-11 16:17:37 -0700221 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
222
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700223 EXPECT_EQ(parts.size(), 1);
224 EXPECT_EQ(parts[0].parts.size(), 1);
225
Austin Schuhc41603c2020-10-11 16:17:37 -0700226 PartsMessageReader reader(parts[0].parts[0]);
227
228 EXPECT_EQ(reader.filename(), logfile0);
229
230 // Confirm that the timestamps track, and the filename also updates.
231 // Read the first message.
232 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700233 // Since config1 has higher max out of order duration, that will be used to
234 // read partfiles with same part uuid, i.e logfile0 and logfile1.
Austin Schuhc41603c2020-10-11 16:17:37 -0700235 EXPECT_EQ(
236 reader.max_out_of_order_duration(),
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700237 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuhc41603c2020-10-11 16:17:37 -0700238 EXPECT_TRUE(reader.ReadMessage());
239 EXPECT_EQ(reader.filename(), logfile0);
240 EXPECT_EQ(reader.newest_timestamp(),
241 monotonic_clock::time_point(chrono::nanoseconds(1)));
242 EXPECT_EQ(
243 reader.max_out_of_order_duration(),
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700244 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuhc41603c2020-10-11 16:17:37 -0700245
246 // Read the second message.
247 EXPECT_TRUE(reader.ReadMessage());
248 EXPECT_EQ(reader.filename(), logfile1);
249 EXPECT_EQ(reader.newest_timestamp(),
250 monotonic_clock::time_point(chrono::nanoseconds(2)));
251 EXPECT_EQ(
252 reader.max_out_of_order_duration(),
253 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
254
255 // And then confirm that reading again returns no message.
256 EXPECT_FALSE(reader.ReadMessage());
257 EXPECT_EQ(reader.filename(), logfile1);
258 EXPECT_EQ(
259 reader.max_out_of_order_duration(),
260 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800261 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700262
263 // Verify that the parts metadata has the correct max out of order duration.
264 EXPECT_EQ(
265 parts[0].parts[0].max_out_of_order_duration,
266 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuhc41603c2020-10-11 16:17:37 -0700267}
Austin Schuh32f68492020-11-08 21:45:51 -0800268
Austin Schuh1be0ce42020-11-29 22:43:26 -0800269// Tests that Message's operator < works as expected.
270TEST(MessageTest, Sorting) {
271 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
272
273 Message m1{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700274 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700275 .timestamp =
276 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
Austin Schuh48507722021-07-17 17:29:24 -0700277 .monotonic_remote_boot = 0xffffff,
278 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700279 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800280 Message m2{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700281 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700282 .timestamp =
283 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
Austin Schuh48507722021-07-17 17:29:24 -0700284 .monotonic_remote_boot = 0xffffff,
285 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700286 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800287
288 EXPECT_LT(m1, m2);
289 EXPECT_GE(m2, m1);
290
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700291 m1.timestamp.time = e;
292 m2.timestamp.time = e;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800293
294 m1.channel_index = 1;
295 m2.channel_index = 2;
296
297 EXPECT_LT(m1, m2);
298 EXPECT_GE(m2, m1);
299
300 m1.channel_index = 0;
301 m2.channel_index = 0;
Austin Schuh58646e22021-08-23 23:51:46 -0700302 m1.queue_index.index = 0u;
303 m2.queue_index.index = 1u;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800304
305 EXPECT_LT(m1, m2);
306 EXPECT_GE(m2, m1);
307}
308
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800309aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
310 const aos::FlatbufferDetachedBuffer<Configuration> &config,
311 const std::string_view json) {
312 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh1034a832021-03-31 21:53:26 -0700313 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800314 flatbuffers::Offset<Configuration> config_offset =
315 aos::CopyFlatBuffer(config, &fbb);
316 LogFileHeader::Builder header_builder(fbb);
317 header_builder.add_configuration(config_offset);
318 fbb.Finish(header_builder.Finish());
319 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
320
321 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
322 JsonToFlatbuffer<LogFileHeader>(json));
323 CHECK(header_updates.Verify());
324 flatbuffers::FlatBufferBuilder fbb2;
Austin Schuh1034a832021-03-31 21:53:26 -0700325 fbb2.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800326 fbb2.FinishSizePrefixed(
327 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
328 return fbb2.Release();
329}
330
331class SortingElementTest : public ::testing::Test {
332 public:
333 SortingElementTest()
334 : config_(JsonToFlatbuffer<Configuration>(
335 R"({
336 "channels": [
337 {
338 "name": "/a",
339 "type": "aos.logger.testing.TestMessage",
340 "source_node": "pi1",
341 "destination_nodes": [
342 {
343 "name": "pi2"
344 },
345 {
346 "name": "pi3"
347 }
348 ]
349 },
350 {
351 "name": "/b",
352 "type": "aos.logger.testing.TestMessage",
353 "source_node": "pi1"
354 },
355 {
356 "name": "/c",
357 "type": "aos.logger.testing.TestMessage",
358 "source_node": "pi1"
Austin Schuh48507722021-07-17 17:29:24 -0700359 },
360 {
361 "name": "/d",
362 "type": "aos.logger.testing.TestMessage",
363 "source_node": "pi2",
364 "destination_nodes": [
365 {
366 "name": "pi1"
367 }
368 ]
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800369 }
370 ],
371 "nodes": [
372 {
373 "name": "pi1"
374 },
375 {
376 "name": "pi2"
377 },
378 {
379 "name": "pi3"
380 }
381 ]
382}
383)")),
384 config0_(MakeHeader(config_, R"({
385 /* 100ms */
386 "max_out_of_order_duration": 100000000,
387 "node": {
388 "name": "pi1"
389 },
390 "logger_node": {
391 "name": "pi1"
392 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800393 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800394 "realtime_start_time": 1000000000000,
395 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700396 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
397 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
398 "boot_uuids": [
399 "1d782c63-b3c7-466e-bea9-a01308b43333",
400 "",
401 ""
402 ],
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800403 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
404 "parts_index": 0
405})")),
406 config1_(MakeHeader(config_,
407 R"({
408 /* 100ms */
409 "max_out_of_order_duration": 100000000,
410 "node": {
411 "name": "pi1"
412 },
413 "logger_node": {
414 "name": "pi1"
415 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800416 "monotonic_start_time": 1000000,
417 "realtime_start_time": 1000000000000,
418 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700419 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
420 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
421 "boot_uuids": [
422 "1d782c63-b3c7-466e-bea9-a01308b43333",
423 "",
424 ""
425 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800426 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
427 "parts_index": 0
428})")),
429 config2_(MakeHeader(config_,
430 R"({
431 /* 100ms */
432 "max_out_of_order_duration": 100000000,
433 "node": {
434 "name": "pi2"
435 },
436 "logger_node": {
437 "name": "pi2"
438 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800439 "monotonic_start_time": 0,
440 "realtime_start_time": 1000000000000,
Austin Schuh48507722021-07-17 17:29:24 -0700441 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
442 "logger_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
443 "boot_uuids": [
444 "",
445 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
446 ""
447 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800448 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
449 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
450 "parts_index": 0
451})")),
452 config3_(MakeHeader(config_,
453 R"({
454 /* 100ms */
455 "max_out_of_order_duration": 100000000,
456 "node": {
457 "name": "pi1"
458 },
459 "logger_node": {
460 "name": "pi1"
461 },
462 "monotonic_start_time": 2000000,
463 "realtime_start_time": 1000000000,
464 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
Austin Schuh48507722021-07-17 17:29:24 -0700465 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
466 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
467 "boot_uuids": [
468 "1d782c63-b3c7-466e-bea9-a01308b43333",
469 "",
470 ""
471 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800472 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800473 "parts_index": 0
Austin Schuh8bf1e632021-01-02 22:41:04 -0800474})")),
475 config4_(MakeHeader(config_,
476 R"({
477 /* 100ms */
478 "max_out_of_order_duration": 100000000,
479 "node": {
480 "name": "pi2"
481 },
482 "logger_node": {
483 "name": "pi1"
484 },
485 "monotonic_start_time": 2000000,
486 "realtime_start_time": 1000000000,
487 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
488 "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
Austin Schuh48507722021-07-17 17:29:24 -0700489 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
490 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
491 "boot_uuids": [
492 "1d782c63-b3c7-466e-bea9-a01308b43333",
493 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
494 ""
495 ],
Austin Schuh8bf1e632021-01-02 22:41:04 -0800496 "parts_index": 0
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800497})")) {
498 unlink(logfile0_.c_str());
499 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800500 unlink(logfile2_.c_str());
Austin Schuh48507722021-07-17 17:29:24 -0700501 unlink(logfile3_.c_str());
Austin Schuh921b2562021-07-31 19:37:03 -0700502 queue_index_.resize(config_.message().channels()->size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800503 }
504
505 protected:
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800506 flatbuffers::DetachedBuffer MakeLogMessage(
507 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
508 int value) {
509 flatbuffers::FlatBufferBuilder message_fbb;
510 message_fbb.ForceDefaults(true);
511 TestMessage::Builder test_message_builder(message_fbb);
512 test_message_builder.add_value(value);
513 message_fbb.Finish(test_message_builder.Finish());
514
515 aos::Context context;
516 context.monotonic_event_time = monotonic_now;
517 context.realtime_event_time = aos::realtime_clock::epoch() +
518 chrono::seconds(1000) +
519 monotonic_now.time_since_epoch();
Austin Schuh921b2562021-07-31 19:37:03 -0700520 CHECK_LT(static_cast<size_t>(channel_index), queue_index_.size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800521 context.queue_index = queue_index_[channel_index];
522 context.size = message_fbb.GetSize();
523 context.data = message_fbb.GetBufferPointer();
524
525 ++queue_index_[channel_index];
526
527 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh53e3a812021-11-03 16:57:23 -0700528 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800529 fbb.FinishSizePrefixed(
530 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
531
532 return fbb.Release();
533 }
534
535 flatbuffers::DetachedBuffer MakeTimestampMessage(
536 const aos::monotonic_clock::time_point sender_monotonic_now,
Austin Schuh8bf1e632021-01-02 22:41:04 -0800537 int channel_index, chrono::nanoseconds receiver_monotonic_offset,
538 monotonic_clock::time_point monotonic_timestamp_time =
539 monotonic_clock::min_time) {
540 const monotonic_clock::time_point monotonic_sent_time =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800541 sender_monotonic_now + receiver_monotonic_offset;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800542
543 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800544 fbb.ForceDefaults(true);
545
546 logger::MessageHeader::Builder message_header_builder(fbb);
547
548 message_header_builder.add_channel_index(channel_index);
549
550 message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
551 100);
552 message_header_builder.add_monotonic_sent_time(
553 monotonic_sent_time.time_since_epoch().count());
554 message_header_builder.add_realtime_sent_time(
555 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
556 monotonic_sent_time.time_since_epoch())
557 .time_since_epoch()
558 .count());
559
560 message_header_builder.add_monotonic_remote_time(
561 sender_monotonic_now.time_since_epoch().count());
562 message_header_builder.add_realtime_remote_time(
563 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
564 sender_monotonic_now.time_since_epoch())
565 .time_since_epoch()
566 .count());
567 message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
568 1);
569
570 if (monotonic_timestamp_time != monotonic_clock::min_time) {
571 message_header_builder.add_monotonic_timestamp_time(
572 monotonic_timestamp_time.time_since_epoch().count());
573 }
574
575 fbb.FinishSizePrefixed(message_header_builder.Finish());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800576 LOG(INFO) << aos::FlatbufferToJson(
577 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
578 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
579
580 return fbb.Release();
581 }
582
583 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
584 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800585 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh48507722021-07-17 17:29:24 -0700586 const std::string logfile3_ = aos::testing::TestTmpDir() + "/log3.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800587
588 const aos::FlatbufferDetachedBuffer<Configuration> config_;
589 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
590 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800591 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
592 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800593 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800594
595 std::vector<uint32_t> queue_index_;
596};
597
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700598using MessageSorterTest = SortingElementTest;
599using MessageSorterDeathTest = MessageSorterTest;
600using PartsMergerTest = SortingElementTest;
Austin Schuhd2f96102020-12-01 20:27:29 -0800601using TimestampMapperTest = SortingElementTest;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800602
603// Tests that we can pull messages out of a log sorted in order.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700604TEST_F(MessageSorterTest, Pull) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800605 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
606 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700607 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800608 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700609 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800610 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700611 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800612 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700613 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800614 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700615 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800616 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
617 }
618
619 const std::vector<LogFile> parts = SortParts({logfile0_});
620
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700621 MessageSorter message_sorter(parts[0].parts[0]);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800622
623 // Confirm we aren't sorted until any time until the message is popped.
624 // Peeking shouldn't change the sorted until time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700625 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800626
627 std::deque<Message> output;
628
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700629 ASSERT_TRUE(message_sorter.Front() != nullptr);
630 output.emplace_back(std::move(*message_sorter.Front()));
631 message_sorter.PopFront();
632 EXPECT_EQ(message_sorter.sorted_until(), e + chrono::milliseconds(1900));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800633
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700634 ASSERT_TRUE(message_sorter.Front() != nullptr);
635 output.emplace_back(std::move(*message_sorter.Front()));
636 message_sorter.PopFront();
637 EXPECT_EQ(message_sorter.sorted_until(), e + chrono::milliseconds(1900));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800638
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700639 ASSERT_TRUE(message_sorter.Front() != nullptr);
640 output.emplace_back(std::move(*message_sorter.Front()));
641 message_sorter.PopFront();
642 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::max_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800643
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700644 ASSERT_TRUE(message_sorter.Front() != nullptr);
645 output.emplace_back(std::move(*message_sorter.Front()));
646 message_sorter.PopFront();
647 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::max_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800648
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700649 ASSERT_TRUE(message_sorter.Front() == nullptr);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800650
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700651 EXPECT_EQ(output[0].timestamp.boot, 0);
652 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
653 EXPECT_EQ(output[1].timestamp.boot, 0);
654 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1000));
655 EXPECT_EQ(output[2].timestamp.boot, 0);
656 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1901));
657 EXPECT_EQ(output[3].timestamp.boot, 0);
658 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800659}
660
Austin Schuhb000de62020-12-03 22:00:40 -0800661// Tests that we can pull messages out of a log sorted in order.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700662TEST_F(MessageSorterTest, WayBeforeStart) {
Austin Schuhb000de62020-12-03 22:00:40 -0800663 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
664 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700665 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhb000de62020-12-03 22:00:40 -0800666 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700667 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800668 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700669 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800670 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700671 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800672 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700673 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800674 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700675 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800676 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
677 }
678
679 const std::vector<LogFile> parts = SortParts({logfile0_});
680
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700681 MessageSorter message_sorter(parts[0].parts[0]);
Austin Schuhb000de62020-12-03 22:00:40 -0800682
683 // Confirm we aren't sorted until any time until the message is popped.
684 // Peeking shouldn't change the sorted until time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700685 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
Austin Schuhb000de62020-12-03 22:00:40 -0800686
687 std::deque<Message> output;
688
689 for (monotonic_clock::time_point t :
690 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
691 e + chrono::milliseconds(1900), monotonic_clock::max_time,
692 monotonic_clock::max_time}) {
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700693 ASSERT_TRUE(message_sorter.Front() != nullptr);
694 output.emplace_back(std::move(*message_sorter.Front()));
695 message_sorter.PopFront();
696 EXPECT_EQ(message_sorter.sorted_until(), t);
Austin Schuhb000de62020-12-03 22:00:40 -0800697 }
698
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700699 ASSERT_TRUE(message_sorter.Front() == nullptr);
Austin Schuhb000de62020-12-03 22:00:40 -0800700
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700701 EXPECT_EQ(output[0].timestamp.boot, 0u);
702 EXPECT_EQ(output[0].timestamp.time, e - chrono::milliseconds(1000));
703 EXPECT_EQ(output[1].timestamp.boot, 0u);
704 EXPECT_EQ(output[1].timestamp.time, e - chrono::milliseconds(500));
705 EXPECT_EQ(output[2].timestamp.boot, 0u);
706 EXPECT_EQ(output[2].timestamp.time, e - chrono::milliseconds(10));
707 EXPECT_EQ(output[3].timestamp.boot, 0u);
708 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(1901));
709 EXPECT_EQ(output[4].timestamp.boot, 0u);
710 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuhb000de62020-12-03 22:00:40 -0800711}
712
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800713// Tests that messages too far out of order trigger death.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700714TEST_F(MessageSorterDeathTest, Pull) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800715 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
716 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700717 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800718 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700719 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800720 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700721 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800722 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700723 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800724 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
725 // The following message is too far out of order and will trigger the CHECK.
Austin Schuhd863e6e2022-10-16 15:44:50 -0700726 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800727 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
728 }
729
730 const std::vector<LogFile> parts = SortParts({logfile0_});
731
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700732 MessageSorter message_sorter(parts[0].parts[0]);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800733
734 // Confirm we aren't sorted until any time until the message is popped.
735 // Peeking shouldn't change the sorted until time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700736 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800737 std::deque<Message> output;
738
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700739 ASSERT_TRUE(message_sorter.Front() != nullptr);
740 message_sorter.PopFront();
741 ASSERT_TRUE(message_sorter.Front() != nullptr);
742 ASSERT_TRUE(message_sorter.Front() != nullptr);
743 message_sorter.PopFront();
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800744
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700745 EXPECT_DEATH({ message_sorter.Front(); },
Austin Schuh58646e22021-08-23 23:51:46 -0700746 "Max out of order of 100000000ns exceeded.");
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800747}
748
Austin Schuh8f52ed52020-11-30 23:12:39 -0800749// Tests that we can merge data from 2 separate files, including duplicate data.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700750TEST_F(PartsMergerTest, TwoFileMerger) {
Austin Schuh8f52ed52020-11-30 23:12:39 -0800751 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
752 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700753 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800754 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700755 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800756 writer1.QueueSpan(config1_.span());
757
Austin Schuhd863e6e2022-10-16 15:44:50 -0700758 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800759 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700760 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800761 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
762
Austin Schuhd863e6e2022-10-16 15:44:50 -0700763 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800764 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700765 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800766 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
767
768 // Make a duplicate!
769 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
770 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
771 writer0.QueueSpan(msg.span());
772 writer1.QueueSpan(msg.span());
773
Austin Schuhd863e6e2022-10-16 15:44:50 -0700774 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800775 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
776 }
777
778 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -0700779 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -0800780 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800781
Austin Schuh63097262023-08-16 17:04:29 -0700782 PartsMerger merger(
783 log_files.SelectParts("pi1", 0,
784 {StoredDataType::DATA, StoredDataType::TIMESTAMPS,
785 StoredDataType::REMOTE_TIMESTAMPS}));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800786
787 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
788
789 std::deque<Message> output;
790
791 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
792 ASSERT_TRUE(merger.Front() != nullptr);
793 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
794
795 output.emplace_back(std::move(*merger.Front()));
796 merger.PopFront();
797 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
798
799 ASSERT_TRUE(merger.Front() != nullptr);
800 output.emplace_back(std::move(*merger.Front()));
801 merger.PopFront();
802 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
803
804 ASSERT_TRUE(merger.Front() != nullptr);
805 output.emplace_back(std::move(*merger.Front()));
806 merger.PopFront();
807 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
808
809 ASSERT_TRUE(merger.Front() != nullptr);
810 output.emplace_back(std::move(*merger.Front()));
811 merger.PopFront();
812 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
813
814 ASSERT_TRUE(merger.Front() != nullptr);
815 output.emplace_back(std::move(*merger.Front()));
816 merger.PopFront();
817 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
818
819 ASSERT_TRUE(merger.Front() != nullptr);
820 output.emplace_back(std::move(*merger.Front()));
821 merger.PopFront();
822 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
823
824 ASSERT_TRUE(merger.Front() == nullptr);
825
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700826 EXPECT_EQ(output[0].timestamp.boot, 0u);
827 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
828 EXPECT_EQ(output[1].timestamp.boot, 0u);
829 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1001));
830 EXPECT_EQ(output[2].timestamp.boot, 0u);
831 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1002));
832 EXPECT_EQ(output[3].timestamp.boot, 0u);
833 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
834 EXPECT_EQ(output[4].timestamp.boot, 0u);
835 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(3000));
836 EXPECT_EQ(output[5].timestamp.boot, 0u);
837 EXPECT_EQ(output[5].timestamp.time, e + chrono::milliseconds(3002));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800838}
839
Austin Schuh8bf1e632021-01-02 22:41:04 -0800840// Tests that we can merge timestamps with various combinations of
841// monotonic_timestamp_time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700842TEST_F(PartsMergerTest, TwoFileTimestampMerger) {
Austin Schuh8bf1e632021-01-02 22:41:04 -0800843 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
844 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700845 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800846 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700847 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800848 writer1.QueueSpan(config1_.span());
849
850 // Neither has it.
851 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700852 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800853 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700854 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800855 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
856
857 // First only has it.
858 MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700859 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800860 e + chrono::milliseconds(1001), 0, chrono::seconds(100),
861 e + chrono::nanoseconds(971)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700862 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800863 e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
864
865 // Second only has it.
866 MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700867 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800868 e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700869 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800870 e + chrono::milliseconds(1002), 0, chrono::seconds(100),
871 e + chrono::nanoseconds(972)));
872
873 // Both have it.
874 MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700875 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800876 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
877 e + chrono::nanoseconds(973)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700878 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800879 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
880 e + chrono::nanoseconds(973)));
881 }
882
883 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -0700884 LogFilesContainer log_files(parts);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800885 ASSERT_EQ(parts.size(), 1u);
886
Austin Schuh63097262023-08-16 17:04:29 -0700887 PartsMerger merger(
888 log_files.SelectParts("pi1", 0,
889 {StoredDataType::DATA, StoredDataType::TIMESTAMPS,
890 StoredDataType::REMOTE_TIMESTAMPS}));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800891
892 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
893
894 std::deque<Message> output;
895
896 for (int i = 0; i < 4; ++i) {
897 ASSERT_TRUE(merger.Front() != nullptr);
898 output.emplace_back(std::move(*merger.Front()));
899 merger.PopFront();
900 }
901 ASSERT_TRUE(merger.Front() == nullptr);
902
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700903 EXPECT_EQ(output[0].timestamp.boot, 0u);
904 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700905 EXPECT_FALSE(output[0].data->has_monotonic_timestamp_time);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700906
907 EXPECT_EQ(output[1].timestamp.boot, 0u);
908 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700909 EXPECT_TRUE(output[1].data->has_monotonic_timestamp_time);
910 EXPECT_EQ(output[1].data->monotonic_timestamp_time,
911 monotonic_clock::time_point(std::chrono::nanoseconds(971)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700912
913 EXPECT_EQ(output[2].timestamp.boot, 0u);
914 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700915 EXPECT_TRUE(output[2].data->has_monotonic_timestamp_time);
916 EXPECT_EQ(output[2].data->monotonic_timestamp_time,
917 monotonic_clock::time_point(std::chrono::nanoseconds(972)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700918
919 EXPECT_EQ(output[3].timestamp.boot, 0u);
920 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700921 EXPECT_TRUE(output[3].data->has_monotonic_timestamp_time);
922 EXPECT_EQ(output[3].data->monotonic_timestamp_time,
923 monotonic_clock::time_point(std::chrono::nanoseconds(973)));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800924}
925
Austin Schuhd2f96102020-12-01 20:27:29 -0800926// Tests that we can match timestamps on delivered messages.
927TEST_F(TimestampMapperTest, ReadNode0First) {
928 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
929 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700930 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800931 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700932 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800933 writer1.QueueSpan(config2_.span());
934
Austin Schuhd863e6e2022-10-16 15:44:50 -0700935 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800936 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700937 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800938 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
939
Austin Schuhd863e6e2022-10-16 15:44:50 -0700940 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800941 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700942 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800943 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
944
Austin Schuhd863e6e2022-10-16 15:44:50 -0700945 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800946 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700947 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800948 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
949 }
950
951 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -0700952 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -0800953 ASSERT_EQ(parts[0].logger_node, "pi1");
954 ASSERT_EQ(parts[1].logger_node, "pi2");
955
Austin Schuh79b30942021-01-24 22:32:21 -0800956 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -0700957
Austin Schuh63097262023-08-16 17:04:29 -0700958 TimestampMapper mapper0("pi1", log_files,
959 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -0800960 mapper0.set_timestamp_callback(
961 [&](TimestampedMessage *) { ++mapper0_count; });
962 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -0700963 TimestampMapper mapper1("pi2", log_files,
964 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -0800965 mapper1.set_timestamp_callback(
966 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -0800967
968 mapper0.AddPeer(&mapper1);
969 mapper1.AddPeer(&mapper0);
970
971 {
972 std::deque<TimestampedMessage> output0;
973
Austin Schuh79b30942021-01-24 22:32:21 -0800974 EXPECT_EQ(mapper0_count, 0u);
975 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800976 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800977 EXPECT_EQ(mapper0_count, 1u);
978 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800979 output0.emplace_back(std::move(*mapper0.Front()));
980 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700981 EXPECT_TRUE(mapper0.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800982 EXPECT_EQ(mapper0_count, 1u);
983 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800984
985 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800986 EXPECT_EQ(mapper0_count, 2u);
987 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800988 output0.emplace_back(std::move(*mapper0.Front()));
989 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700990 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800991
992 ASSERT_TRUE(mapper0.Front() != nullptr);
993 output0.emplace_back(std::move(*mapper0.Front()));
994 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700995 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800996
Austin Schuh79b30942021-01-24 22:32:21 -0800997 EXPECT_EQ(mapper0_count, 3u);
998 EXPECT_EQ(mapper1_count, 0u);
999
Austin Schuhd2f96102020-12-01 20:27:29 -08001000 ASSERT_TRUE(mapper0.Front() == nullptr);
1001
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001002 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1003 EXPECT_EQ(output0[0].monotonic_event_time.time,
1004 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001005 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001006
1007 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1008 EXPECT_EQ(output0[1].monotonic_event_time.time,
1009 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001010 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001011
1012 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1013 EXPECT_EQ(output0[2].monotonic_event_time.time,
1014 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001015 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001016 }
1017
1018 {
1019 SCOPED_TRACE("Trying node1 now");
1020 std::deque<TimestampedMessage> output1;
1021
Austin Schuh79b30942021-01-24 22:32:21 -08001022 EXPECT_EQ(mapper0_count, 3u);
1023 EXPECT_EQ(mapper1_count, 0u);
1024
Austin Schuhd2f96102020-12-01 20:27:29 -08001025 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001026 EXPECT_EQ(mapper0_count, 3u);
1027 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001028 output1.emplace_back(std::move(*mapper1.Front()));
1029 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001030 EXPECT_TRUE(mapper1.started());
Austin Schuh79b30942021-01-24 22:32:21 -08001031 EXPECT_EQ(mapper0_count, 3u);
1032 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001033
1034 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001035 EXPECT_EQ(mapper0_count, 3u);
1036 EXPECT_EQ(mapper1_count, 2u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001037 output1.emplace_back(std::move(*mapper1.Front()));
1038 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001039 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001040
1041 ASSERT_TRUE(mapper1.Front() != nullptr);
1042 output1.emplace_back(std::move(*mapper1.Front()));
1043 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001044 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001045
Austin Schuh79b30942021-01-24 22:32:21 -08001046 EXPECT_EQ(mapper0_count, 3u);
1047 EXPECT_EQ(mapper1_count, 3u);
1048
Austin Schuhd2f96102020-12-01 20:27:29 -08001049 ASSERT_TRUE(mapper1.Front() == nullptr);
1050
Austin Schuh79b30942021-01-24 22:32:21 -08001051 EXPECT_EQ(mapper0_count, 3u);
1052 EXPECT_EQ(mapper1_count, 3u);
1053
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001054 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1055 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001056 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001057 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001058
1059 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1060 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001061 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001062 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001063
1064 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1065 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001066 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001067 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001068 }
1069}
1070
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001071// Tests that we filter messages using the channel filter callback
1072TEST_F(TimestampMapperTest, ReplayChannelsCallbackTest) {
1073 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1074 {
1075 TestDetachedBufferWriter writer0(logfile0_);
1076 writer0.QueueSpan(config0_.span());
1077 TestDetachedBufferWriter writer1(logfile1_);
1078 writer1.QueueSpan(config2_.span());
1079
1080 writer0.WriteSizedFlatbuffer(
1081 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1082 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1083 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1084
1085 writer0.WriteSizedFlatbuffer(
1086 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1087 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1088 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1089
1090 writer0.WriteSizedFlatbuffer(
1091 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1092 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1093 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1094 }
1095
1096 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001097 LogFilesContainer log_files(parts);
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001098 ASSERT_EQ(parts[0].logger_node, "pi1");
1099 ASSERT_EQ(parts[1].logger_node, "pi2");
1100
1101 // mapper0 will not provide any messages while mapper1 will provide all
1102 // messages due to the channel filter callbacks used
1103 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001104
Austin Schuh63097262023-08-16 17:04:29 -07001105 TimestampMapper mapper0("pi1", log_files,
1106 TimestampQueueStrategy::kQueueTogether);
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001107 mapper0.set_timestamp_callback(
1108 [&](TimestampedMessage *) { ++mapper0_count; });
1109 mapper0.set_replay_channels_callback(
1110 [&](const TimestampedMessage &) -> bool { return mapper0_count != 2; });
1111 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001112 TimestampMapper mapper1("pi2", log_files,
1113 TimestampQueueStrategy::kQueueTogether);
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001114 mapper1.set_timestamp_callback(
1115 [&](TimestampedMessage *) { ++mapper1_count; });
1116 mapper1.set_replay_channels_callback(
1117 [&](const TimestampedMessage &) -> bool { return mapper1_count != 2; });
1118
1119 mapper0.AddPeer(&mapper1);
1120 mapper1.AddPeer(&mapper0);
1121
1122 {
1123 std::deque<TimestampedMessage> output0;
1124
1125 EXPECT_EQ(mapper0_count, 0u);
1126 EXPECT_EQ(mapper1_count, 0u);
1127
1128 ASSERT_TRUE(mapper0.Front() != nullptr);
1129 EXPECT_EQ(mapper0_count, 1u);
1130 EXPECT_EQ(mapper1_count, 0u);
1131 output0.emplace_back(std::move(*mapper0.Front()));
1132 mapper0.PopFront();
1133
1134 EXPECT_TRUE(mapper0.started());
1135 EXPECT_EQ(mapper0_count, 1u);
1136 EXPECT_EQ(mapper1_count, 0u);
1137
1138 // mapper0_count is now at 3 since the second message is not queued, but
1139 // timestamp_callback needs to be called everytime even if Front() does not
1140 // provide a message due to the replay_channels_callback.
1141 ASSERT_TRUE(mapper0.Front() != nullptr);
1142 EXPECT_EQ(mapper0_count, 3u);
1143 EXPECT_EQ(mapper1_count, 0u);
1144 output0.emplace_back(std::move(*mapper0.Front()));
1145 mapper0.PopFront();
1146
1147 EXPECT_TRUE(mapper0.started());
1148 EXPECT_EQ(mapper0_count, 3u);
1149 EXPECT_EQ(mapper1_count, 0u);
1150
1151 ASSERT_TRUE(mapper0.Front() == nullptr);
1152 EXPECT_TRUE(mapper0.started());
1153
1154 EXPECT_EQ(mapper0_count, 3u);
1155 EXPECT_EQ(mapper1_count, 0u);
1156
1157 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1158 EXPECT_EQ(output0[0].monotonic_event_time.time,
1159 e + chrono::milliseconds(1000));
1160 EXPECT_TRUE(output0[0].data != nullptr);
1161
1162 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1163 EXPECT_EQ(output0[1].monotonic_event_time.time,
1164 e + chrono::milliseconds(3000));
1165 EXPECT_TRUE(output0[1].data != nullptr);
1166 }
1167
1168 {
1169 SCOPED_TRACE("Trying node1 now");
1170 std::deque<TimestampedMessage> output1;
1171
1172 EXPECT_EQ(mapper0_count, 3u);
1173 EXPECT_EQ(mapper1_count, 0u);
1174
1175 ASSERT_TRUE(mapper1.Front() != nullptr);
1176 EXPECT_EQ(mapper0_count, 3u);
1177 EXPECT_EQ(mapper1_count, 1u);
1178 output1.emplace_back(std::move(*mapper1.Front()));
1179 mapper1.PopFront();
1180 EXPECT_TRUE(mapper1.started());
1181 EXPECT_EQ(mapper0_count, 3u);
1182 EXPECT_EQ(mapper1_count, 1u);
1183
1184 // mapper1_count is now at 3 since the second message is not queued, but
1185 // timestamp_callback needs to be called everytime even if Front() does not
1186 // provide a message due to the replay_channels_callback.
1187 ASSERT_TRUE(mapper1.Front() != nullptr);
1188 output1.emplace_back(std::move(*mapper1.Front()));
1189 mapper1.PopFront();
1190 EXPECT_TRUE(mapper1.started());
1191
1192 EXPECT_EQ(mapper0_count, 3u);
1193 EXPECT_EQ(mapper1_count, 3u);
1194
1195 ASSERT_TRUE(mapper1.Front() == nullptr);
1196
1197 EXPECT_EQ(mapper0_count, 3u);
1198 EXPECT_EQ(mapper1_count, 3u);
1199
1200 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1201 EXPECT_EQ(output1[0].monotonic_event_time.time,
1202 e + chrono::seconds(100) + chrono::milliseconds(1000));
1203 EXPECT_TRUE(output1[0].data != nullptr);
1204
1205 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1206 EXPECT_EQ(output1[1].monotonic_event_time.time,
1207 e + chrono::seconds(100) + chrono::milliseconds(3000));
1208 EXPECT_TRUE(output1[1].data != nullptr);
1209 }
1210}
Austin Schuh8bf1e632021-01-02 22:41:04 -08001211// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
1212// returned.
1213TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
1214 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1215 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001216 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001217 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001218 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001219 writer1.QueueSpan(config4_.span());
1220
Austin Schuhd863e6e2022-10-16 15:44:50 -07001221 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001222 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001223 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001224 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
1225 e + chrono::nanoseconds(971)));
1226
Austin Schuhd863e6e2022-10-16 15:44:50 -07001227 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001228 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001229 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001230 e + chrono::milliseconds(2000), 0, chrono::seconds(100),
1231 e + chrono::nanoseconds(5458)));
1232
Austin Schuhd863e6e2022-10-16 15:44:50 -07001233 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001234 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001235 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001236 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1237 }
1238
1239 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001240 LogFilesContainer log_files(parts);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001241 ASSERT_EQ(parts.size(), 1u);
1242
Austin Schuh79b30942021-01-24 22:32:21 -08001243 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001244 TimestampMapper mapper0("pi1", log_files,
1245 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001246 mapper0.set_timestamp_callback(
1247 [&](TimestampedMessage *) { ++mapper0_count; });
1248 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001249 TimestampMapper mapper1("pi2", log_files,
1250 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001251 mapper1.set_timestamp_callback(
1252 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh8bf1e632021-01-02 22:41:04 -08001253
1254 mapper0.AddPeer(&mapper1);
1255 mapper1.AddPeer(&mapper0);
1256
1257 {
1258 std::deque<TimestampedMessage> output0;
1259
1260 for (int i = 0; i < 3; ++i) {
1261 ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
1262 output0.emplace_back(std::move(*mapper0.Front()));
1263 mapper0.PopFront();
1264 }
1265
1266 ASSERT_TRUE(mapper0.Front() == nullptr);
1267
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001268 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1269 EXPECT_EQ(output0[0].monotonic_event_time.time,
1270 e + chrono::milliseconds(1000));
1271 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
1272 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
1273 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001274 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001275
1276 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1277 EXPECT_EQ(output0[1].monotonic_event_time.time,
1278 e + chrono::milliseconds(2000));
1279 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
1280 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
1281 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001282 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001283
1284 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1285 EXPECT_EQ(output0[2].monotonic_event_time.time,
1286 e + chrono::milliseconds(3000));
1287 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
1288 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
1289 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001290 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001291 }
1292
1293 {
1294 SCOPED_TRACE("Trying node1 now");
1295 std::deque<TimestampedMessage> output1;
1296
1297 for (int i = 0; i < 3; ++i) {
1298 ASSERT_TRUE(mapper1.Front() != nullptr);
1299 output1.emplace_back(std::move(*mapper1.Front()));
1300 mapper1.PopFront();
1301 }
1302
1303 ASSERT_TRUE(mapper1.Front() == nullptr);
1304
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001305 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1306 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001307 e + chrono::seconds(100) + chrono::milliseconds(1000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001308 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
1309 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001310 e + chrono::nanoseconds(971));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001311 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001312
1313 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1314 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001315 e + chrono::seconds(100) + chrono::milliseconds(2000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001316 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
1317 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001318 e + chrono::nanoseconds(5458));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001319 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001320
1321 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1322 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001323 e + chrono::seconds(100) + chrono::milliseconds(3000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001324 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
1325 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
1326 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001327 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001328 }
Austin Schuh79b30942021-01-24 22:32:21 -08001329
1330 EXPECT_EQ(mapper0_count, 3u);
1331 EXPECT_EQ(mapper1_count, 3u);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001332}
1333
Austin Schuhd2f96102020-12-01 20:27:29 -08001334// Tests that we can match timestamps on delivered messages. By doing this in
1335// the reverse order, the second node needs to queue data up from the first node
1336// to find the matching timestamp.
1337TEST_F(TimestampMapperTest, ReadNode1First) {
1338 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1339 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001340 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001341 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001342 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001343 writer1.QueueSpan(config2_.span());
1344
Austin Schuhd863e6e2022-10-16 15:44:50 -07001345 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001346 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001347 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001348 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1349
Austin Schuhd863e6e2022-10-16 15:44:50 -07001350 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001351 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001352 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001353 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1354
Austin Schuhd863e6e2022-10-16 15:44:50 -07001355 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001356 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001357 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001358 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1359 }
1360
1361 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001362 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001363
1364 ASSERT_EQ(parts[0].logger_node, "pi1");
1365 ASSERT_EQ(parts[1].logger_node, "pi2");
1366
Austin Schuh79b30942021-01-24 22:32:21 -08001367 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001368 TimestampMapper mapper0("pi1", log_files,
1369 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001370 mapper0.set_timestamp_callback(
1371 [&](TimestampedMessage *) { ++mapper0_count; });
1372 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001373 TimestampMapper mapper1("pi2", log_files,
1374 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001375 mapper1.set_timestamp_callback(
1376 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001377
1378 mapper0.AddPeer(&mapper1);
1379 mapper1.AddPeer(&mapper0);
1380
1381 {
1382 SCOPED_TRACE("Trying node1 now");
1383 std::deque<TimestampedMessage> output1;
1384
1385 ASSERT_TRUE(mapper1.Front() != nullptr);
1386 output1.emplace_back(std::move(*mapper1.Front()));
1387 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001388 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001389
1390 ASSERT_TRUE(mapper1.Front() != nullptr);
1391 output1.emplace_back(std::move(*mapper1.Front()));
1392 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001393 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001394
1395 ASSERT_TRUE(mapper1.Front() != nullptr);
1396 output1.emplace_back(std::move(*mapper1.Front()));
1397 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001398 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001399
1400 ASSERT_TRUE(mapper1.Front() == nullptr);
1401
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001402 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1403 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001404 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001405 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001406
1407 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1408 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001409 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001410 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001411
1412 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1413 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001414 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001415 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001416 }
1417
1418 {
1419 std::deque<TimestampedMessage> output0;
1420
1421 ASSERT_TRUE(mapper0.Front() != nullptr);
1422 output0.emplace_back(std::move(*mapper0.Front()));
1423 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001424 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001425
1426 ASSERT_TRUE(mapper0.Front() != nullptr);
1427 output0.emplace_back(std::move(*mapper0.Front()));
1428 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001429 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001430
1431 ASSERT_TRUE(mapper0.Front() != nullptr);
1432 output0.emplace_back(std::move(*mapper0.Front()));
1433 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001434 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001435
1436 ASSERT_TRUE(mapper0.Front() == nullptr);
1437
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001438 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1439 EXPECT_EQ(output0[0].monotonic_event_time.time,
1440 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001441 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001442
1443 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1444 EXPECT_EQ(output0[1].monotonic_event_time.time,
1445 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001446 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001447
1448 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1449 EXPECT_EQ(output0[2].monotonic_event_time.time,
1450 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001451 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001452 }
Austin Schuh79b30942021-01-24 22:32:21 -08001453
1454 EXPECT_EQ(mapper0_count, 3u);
1455 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001456}
1457
1458// Tests that we return just the timestamps if we couldn't find the data and the
1459// missing data was at the beginning of the file.
1460TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
1461 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1462 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001463 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001464 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001465 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001466 writer1.QueueSpan(config2_.span());
1467
1468 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001469 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001470 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1471
Austin Schuhd863e6e2022-10-16 15:44:50 -07001472 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001473 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001474 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001475 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1476
Austin Schuhd863e6e2022-10-16 15:44:50 -07001477 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001478 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001479 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001480 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1481 }
1482
1483 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001484 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001485
1486 ASSERT_EQ(parts[0].logger_node, "pi1");
1487 ASSERT_EQ(parts[1].logger_node, "pi2");
1488
Austin Schuh79b30942021-01-24 22:32:21 -08001489 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001490 TimestampMapper mapper0("pi1", log_files,
1491 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001492 mapper0.set_timestamp_callback(
1493 [&](TimestampedMessage *) { ++mapper0_count; });
1494 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001495 TimestampMapper mapper1("pi2", log_files,
1496 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001497 mapper1.set_timestamp_callback(
1498 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001499
1500 mapper0.AddPeer(&mapper1);
1501 mapper1.AddPeer(&mapper0);
1502
1503 {
1504 SCOPED_TRACE("Trying node1 now");
1505 std::deque<TimestampedMessage> output1;
1506
1507 ASSERT_TRUE(mapper1.Front() != nullptr);
1508 output1.emplace_back(std::move(*mapper1.Front()));
1509 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001510 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001511
1512 ASSERT_TRUE(mapper1.Front() != nullptr);
1513 output1.emplace_back(std::move(*mapper1.Front()));
1514 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001515 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001516
1517 ASSERT_TRUE(mapper1.Front() != nullptr);
1518 output1.emplace_back(std::move(*mapper1.Front()));
1519 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001520 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001521
1522 ASSERT_TRUE(mapper1.Front() == nullptr);
1523
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001524 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1525 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001526 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001527 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001528
1529 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1530 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001531 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001532 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001533
1534 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1535 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001536 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001537 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001538 }
Austin Schuh79b30942021-01-24 22:32:21 -08001539
1540 EXPECT_EQ(mapper0_count, 0u);
1541 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001542}
1543
1544// Tests that we return just the timestamps if we couldn't find the data and the
1545// missing data was at the end of the file.
1546TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
1547 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1548 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001549 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001550 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001551 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001552 writer1.QueueSpan(config2_.span());
1553
Austin Schuhd863e6e2022-10-16 15:44:50 -07001554 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001555 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001556 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001557 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1558
Austin Schuhd863e6e2022-10-16 15:44:50 -07001559 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001560 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001561 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001562 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1563
1564 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001565 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001566 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1567 }
1568
1569 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001570 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001571
1572 ASSERT_EQ(parts[0].logger_node, "pi1");
1573 ASSERT_EQ(parts[1].logger_node, "pi2");
1574
Austin Schuh79b30942021-01-24 22:32:21 -08001575 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001576 TimestampMapper mapper0("pi1", log_files,
1577 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001578 mapper0.set_timestamp_callback(
1579 [&](TimestampedMessage *) { ++mapper0_count; });
1580 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001581 TimestampMapper mapper1("pi2", log_files,
1582 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001583 mapper1.set_timestamp_callback(
1584 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001585
1586 mapper0.AddPeer(&mapper1);
1587 mapper1.AddPeer(&mapper0);
1588
1589 {
1590 SCOPED_TRACE("Trying node1 now");
1591 std::deque<TimestampedMessage> output1;
1592
1593 ASSERT_TRUE(mapper1.Front() != nullptr);
1594 output1.emplace_back(std::move(*mapper1.Front()));
1595 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001596 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001597
1598 ASSERT_TRUE(mapper1.Front() != nullptr);
1599 output1.emplace_back(std::move(*mapper1.Front()));
1600 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001601 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001602
1603 ASSERT_TRUE(mapper1.Front() != nullptr);
1604 output1.emplace_back(std::move(*mapper1.Front()));
1605 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001606 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001607
1608 ASSERT_TRUE(mapper1.Front() == nullptr);
1609
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001610 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1611 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001612 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001613 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001614
1615 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1616 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001617 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001618 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001619
1620 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1621 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001622 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001623 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001624 }
Austin Schuh79b30942021-01-24 22:32:21 -08001625
1626 EXPECT_EQ(mapper0_count, 0u);
1627 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001628}
1629
Austin Schuh993ccb52020-12-12 15:59:32 -08001630// Tests that we handle a message which failed to forward or be logged.
1631TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1632 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1633 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001634 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001635 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001636 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001637 writer1.QueueSpan(config2_.span());
1638
Austin Schuhd863e6e2022-10-16 15:44:50 -07001639 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001640 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001641 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001642 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1643
1644 // Create both the timestamp and message, but don't log them, simulating a
1645 // forwarding drop.
1646 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1647 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1648 chrono::seconds(100));
1649
Austin Schuhd863e6e2022-10-16 15:44:50 -07001650 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001651 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001652 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001653 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1654 }
1655
1656 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001657 LogFilesContainer log_files(parts);
Austin Schuh993ccb52020-12-12 15:59:32 -08001658
1659 ASSERT_EQ(parts[0].logger_node, "pi1");
1660 ASSERT_EQ(parts[1].logger_node, "pi2");
1661
Austin Schuh79b30942021-01-24 22:32:21 -08001662 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001663 TimestampMapper mapper0("pi1", log_files,
1664 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001665 mapper0.set_timestamp_callback(
1666 [&](TimestampedMessage *) { ++mapper0_count; });
1667 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001668 TimestampMapper mapper1("pi2", log_files,
1669 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001670 mapper1.set_timestamp_callback(
1671 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh993ccb52020-12-12 15:59:32 -08001672
1673 mapper0.AddPeer(&mapper1);
1674 mapper1.AddPeer(&mapper0);
1675
1676 {
1677 std::deque<TimestampedMessage> output1;
1678
1679 ASSERT_TRUE(mapper1.Front() != nullptr);
1680 output1.emplace_back(std::move(*mapper1.Front()));
1681 mapper1.PopFront();
1682
1683 ASSERT_TRUE(mapper1.Front() != nullptr);
1684 output1.emplace_back(std::move(*mapper1.Front()));
1685
1686 ASSERT_FALSE(mapper1.Front() == nullptr);
1687
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001688 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1689 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001690 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001691 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001692
1693 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1694 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001695 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001696 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh993ccb52020-12-12 15:59:32 -08001697 }
Austin Schuh79b30942021-01-24 22:32:21 -08001698
1699 EXPECT_EQ(mapper0_count, 0u);
1700 EXPECT_EQ(mapper1_count, 2u);
Austin Schuh993ccb52020-12-12 15:59:32 -08001701}
1702
Austin Schuhd2f96102020-12-01 20:27:29 -08001703// Tests that we properly sort log files with duplicate timestamps.
1704TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1705 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1706 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001707 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001708 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001709 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001710 writer1.QueueSpan(config2_.span());
1711
Austin Schuhd863e6e2022-10-16 15:44:50 -07001712 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001713 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001714 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001715 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1716
Austin Schuhd863e6e2022-10-16 15:44:50 -07001717 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001718 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001719 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001720 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1721
Austin Schuhd863e6e2022-10-16 15:44:50 -07001722 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001723 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001724 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001725 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1726
Austin Schuhd863e6e2022-10-16 15:44:50 -07001727 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001728 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001729 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001730 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1731 }
1732
1733 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001734 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001735
1736 ASSERT_EQ(parts[0].logger_node, "pi1");
1737 ASSERT_EQ(parts[1].logger_node, "pi2");
1738
Austin Schuh79b30942021-01-24 22:32:21 -08001739 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001740 TimestampMapper mapper0("pi1", log_files,
1741 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001742 mapper0.set_timestamp_callback(
1743 [&](TimestampedMessage *) { ++mapper0_count; });
1744 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001745 TimestampMapper mapper1("pi2", log_files,
1746 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001747 mapper1.set_timestamp_callback(
1748 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001749
1750 mapper0.AddPeer(&mapper1);
1751 mapper1.AddPeer(&mapper0);
1752
1753 {
1754 SCOPED_TRACE("Trying node1 now");
1755 std::deque<TimestampedMessage> output1;
1756
1757 for (int i = 0; i < 4; ++i) {
1758 ASSERT_TRUE(mapper1.Front() != nullptr);
1759 output1.emplace_back(std::move(*mapper1.Front()));
1760 mapper1.PopFront();
1761 }
1762 ASSERT_TRUE(mapper1.Front() == nullptr);
1763
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001764 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1765 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001766 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001767 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001768
1769 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1770 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001771 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001772 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001773
1774 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1775 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001776 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001777 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001778
1779 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1780 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001781 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001782 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001783 }
Austin Schuh79b30942021-01-24 22:32:21 -08001784
1785 EXPECT_EQ(mapper0_count, 0u);
1786 EXPECT_EQ(mapper1_count, 4u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001787}
1788
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001789// Tests that we properly produce a valid start time.
Austin Schuhd2f96102020-12-01 20:27:29 -08001790TEST_F(TimestampMapperTest, StartTime) {
1791 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1792 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001793 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001794 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001795 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001796 writer1.QueueSpan(config1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001797 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001798 writer2.QueueSpan(config3_.span());
1799 }
1800
1801 const std::vector<LogFile> parts =
1802 SortParts({logfile0_, logfile1_, logfile2_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001803 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001804
Austin Schuh79b30942021-01-24 22:32:21 -08001805 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001806 TimestampMapper mapper0("pi1", log_files,
1807 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001808 mapper0.set_timestamp_callback(
1809 [&](TimestampedMessage *) { ++mapper0_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001810
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001811 EXPECT_EQ(mapper0.monotonic_start_time(0), e + chrono::milliseconds(1));
1812 EXPECT_EQ(mapper0.realtime_start_time(0),
Austin Schuhd2f96102020-12-01 20:27:29 -08001813 realtime_clock::time_point(chrono::seconds(1000)));
Austin Schuh79b30942021-01-24 22:32:21 -08001814 EXPECT_EQ(mapper0_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001815}
1816
Austin Schuhfecf1d82020-12-19 16:57:28 -08001817// Tests that when a peer isn't registered, we treat that as if there was no
1818// data available.
1819TEST_F(TimestampMapperTest, NoPeer) {
1820 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1821 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001822 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001823 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001824 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001825 writer1.QueueSpan(config2_.span());
1826
1827 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001828 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001829 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1830
Austin Schuhd863e6e2022-10-16 15:44:50 -07001831 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001832 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001833 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001834 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1835
Austin Schuhd863e6e2022-10-16 15:44:50 -07001836 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001837 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001838 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001839 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1840 }
1841
1842 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001843 LogFilesContainer log_files(parts);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001844
1845 ASSERT_EQ(parts[0].logger_node, "pi1");
1846 ASSERT_EQ(parts[1].logger_node, "pi2");
1847
Austin Schuh79b30942021-01-24 22:32:21 -08001848 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001849 TimestampMapper mapper1("pi2", log_files,
1850 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001851 mapper1.set_timestamp_callback(
1852 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhfecf1d82020-12-19 16:57:28 -08001853
1854 {
1855 std::deque<TimestampedMessage> output1;
1856
1857 ASSERT_TRUE(mapper1.Front() != nullptr);
1858 output1.emplace_back(std::move(*mapper1.Front()));
1859 mapper1.PopFront();
1860 ASSERT_TRUE(mapper1.Front() != nullptr);
1861 output1.emplace_back(std::move(*mapper1.Front()));
1862 mapper1.PopFront();
1863 ASSERT_TRUE(mapper1.Front() != nullptr);
1864 output1.emplace_back(std::move(*mapper1.Front()));
1865 mapper1.PopFront();
1866 ASSERT_TRUE(mapper1.Front() == nullptr);
1867
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001868 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1869 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001870 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001871 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001872
1873 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1874 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001875 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001876 EXPECT_FALSE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001877
1878 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1879 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001880 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001881 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001882 }
Austin Schuh79b30942021-01-24 22:32:21 -08001883 EXPECT_EQ(mapper1_count, 3u);
1884}
1885
1886// Tests that we can queue messages and call the timestamp callback for both
1887// nodes.
1888TEST_F(TimestampMapperTest, QueueUntilNode0) {
1889 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1890 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001891 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh79b30942021-01-24 22:32:21 -08001892 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001893 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh79b30942021-01-24 22:32:21 -08001894 writer1.QueueSpan(config2_.span());
1895
Austin Schuhd863e6e2022-10-16 15:44:50 -07001896 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001897 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001898 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001899 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1900
Austin Schuhd863e6e2022-10-16 15:44:50 -07001901 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001902 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001903 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001904 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1905
Austin Schuhd863e6e2022-10-16 15:44:50 -07001906 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001907 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001908 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001909 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1910
Austin Schuhd863e6e2022-10-16 15:44:50 -07001911 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001912 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001913 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001914 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1915 }
1916
1917 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001918 LogFilesContainer log_files(parts);
Austin Schuh79b30942021-01-24 22:32:21 -08001919
1920 ASSERT_EQ(parts[0].logger_node, "pi1");
1921 ASSERT_EQ(parts[1].logger_node, "pi2");
1922
1923 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001924 TimestampMapper mapper0("pi1", log_files,
1925 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001926 mapper0.set_timestamp_callback(
1927 [&](TimestampedMessage *) { ++mapper0_count; });
1928 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001929 TimestampMapper mapper1("pi2", log_files,
1930 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001931 mapper1.set_timestamp_callback(
1932 [&](TimestampedMessage *) { ++mapper1_count; });
1933
1934 mapper0.AddPeer(&mapper1);
1935 mapper1.AddPeer(&mapper0);
1936
1937 {
1938 std::deque<TimestampedMessage> output0;
1939
1940 EXPECT_EQ(mapper0_count, 0u);
1941 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001942 mapper0.QueueUntil(
1943 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001944 EXPECT_EQ(mapper0_count, 3u);
1945 EXPECT_EQ(mapper1_count, 0u);
1946
1947 ASSERT_TRUE(mapper0.Front() != nullptr);
1948 EXPECT_EQ(mapper0_count, 3u);
1949 EXPECT_EQ(mapper1_count, 0u);
1950
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001951 mapper0.QueueUntil(
1952 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001953 EXPECT_EQ(mapper0_count, 3u);
1954 EXPECT_EQ(mapper1_count, 0u);
1955
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001956 mapper0.QueueUntil(
1957 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001958 EXPECT_EQ(mapper0_count, 4u);
1959 EXPECT_EQ(mapper1_count, 0u);
1960
1961 output0.emplace_back(std::move(*mapper0.Front()));
1962 mapper0.PopFront();
1963 output0.emplace_back(std::move(*mapper0.Front()));
1964 mapper0.PopFront();
1965 output0.emplace_back(std::move(*mapper0.Front()));
1966 mapper0.PopFront();
1967 output0.emplace_back(std::move(*mapper0.Front()));
1968 mapper0.PopFront();
1969
1970 EXPECT_EQ(mapper0_count, 4u);
1971 EXPECT_EQ(mapper1_count, 0u);
1972
1973 ASSERT_TRUE(mapper0.Front() == nullptr);
1974
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001975 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1976 EXPECT_EQ(output0[0].monotonic_event_time.time,
1977 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001978 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001979
1980 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1981 EXPECT_EQ(output0[1].monotonic_event_time.time,
1982 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001983 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001984
1985 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1986 EXPECT_EQ(output0[2].monotonic_event_time.time,
1987 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001988 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001989
1990 EXPECT_EQ(output0[3].monotonic_event_time.boot, 0u);
1991 EXPECT_EQ(output0[3].monotonic_event_time.time,
1992 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001993 EXPECT_TRUE(output0[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001994 }
1995
1996 {
1997 SCOPED_TRACE("Trying node1 now");
1998 std::deque<TimestampedMessage> output1;
1999
2000 EXPECT_EQ(mapper0_count, 4u);
2001 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002002 mapper1.QueueUntil(BootTimestamp{
2003 .boot = 0,
2004 .time = e + chrono::seconds(100) + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08002005 EXPECT_EQ(mapper0_count, 4u);
2006 EXPECT_EQ(mapper1_count, 3u);
2007
2008 ASSERT_TRUE(mapper1.Front() != nullptr);
2009 EXPECT_EQ(mapper0_count, 4u);
2010 EXPECT_EQ(mapper1_count, 3u);
2011
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002012 mapper1.QueueUntil(BootTimestamp{
2013 .boot = 0,
2014 .time = e + chrono::seconds(100) + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08002015 EXPECT_EQ(mapper0_count, 4u);
2016 EXPECT_EQ(mapper1_count, 3u);
2017
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002018 mapper1.QueueUntil(BootTimestamp{
2019 .boot = 0,
2020 .time = e + chrono::seconds(100) + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08002021 EXPECT_EQ(mapper0_count, 4u);
2022 EXPECT_EQ(mapper1_count, 4u);
2023
2024 ASSERT_TRUE(mapper1.Front() != nullptr);
2025 EXPECT_EQ(mapper0_count, 4u);
2026 EXPECT_EQ(mapper1_count, 4u);
2027
2028 output1.emplace_back(std::move(*mapper1.Front()));
2029 mapper1.PopFront();
2030 ASSERT_TRUE(mapper1.Front() != nullptr);
2031 output1.emplace_back(std::move(*mapper1.Front()));
2032 mapper1.PopFront();
2033 ASSERT_TRUE(mapper1.Front() != nullptr);
2034 output1.emplace_back(std::move(*mapper1.Front()));
2035 mapper1.PopFront();
2036 ASSERT_TRUE(mapper1.Front() != nullptr);
2037 output1.emplace_back(std::move(*mapper1.Front()));
2038 mapper1.PopFront();
2039
2040 EXPECT_EQ(mapper0_count, 4u);
2041 EXPECT_EQ(mapper1_count, 4u);
2042
2043 ASSERT_TRUE(mapper1.Front() == nullptr);
2044
2045 EXPECT_EQ(mapper0_count, 4u);
2046 EXPECT_EQ(mapper1_count, 4u);
2047
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002048 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2049 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002050 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002051 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002052
2053 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
2054 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002055 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002056 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002057
2058 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
2059 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002060 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002061 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002062
2063 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
2064 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002065 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002066 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08002067 }
Austin Schuhfecf1d82020-12-19 16:57:28 -08002068}
2069
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002070class BootMergerTest : public SortingElementTest {
2071 public:
2072 BootMergerTest()
2073 : SortingElementTest(),
2074 boot0_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002075 /* 100ms */
2076 "max_out_of_order_duration": 100000000,
2077 "node": {
2078 "name": "pi2"
2079 },
2080 "logger_node": {
2081 "name": "pi1"
2082 },
2083 "monotonic_start_time": 1000000,
2084 "realtime_start_time": 1000000000000,
2085 "logger_monotonic_start_time": 1000000,
2086 "logger_realtime_start_time": 1000000000000,
2087 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2088 "parts_uuid": "1a9e5ca2-31b2-475b-8282-88f6d1ce5109",
2089 "parts_index": 0,
2090 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2091 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002092 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2093 "boot_uuids": [
2094 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2095 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2096 ""
2097 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002098})")),
2099 boot1_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002100 /* 100ms */
2101 "max_out_of_order_duration": 100000000,
2102 "node": {
2103 "name": "pi2"
2104 },
2105 "logger_node": {
2106 "name": "pi1"
2107 },
2108 "monotonic_start_time": 1000000,
2109 "realtime_start_time": 1000000000000,
2110 "logger_monotonic_start_time": 1000000,
2111 "logger_realtime_start_time": 1000000000000,
2112 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2113 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
2114 "parts_index": 1,
2115 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2116 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002117 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2118 "boot_uuids": [
2119 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2120 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2121 ""
2122 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002123})")) {}
Austin Schuh8bee6882021-06-28 21:03:28 -07002124
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002125 protected:
2126 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0_;
2127 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1_;
2128};
2129
2130// This tests that we can properly sort a multi-node log file which has the old
2131// (and buggy) timestamps in the header, and the non-resetting parts_index.
2132// These make it so we can just bairly figure out what happened first and what
2133// happened second, but not in a way that is robust to multiple nodes rebooting.
2134TEST_F(BootMergerTest, OldReboot) {
Austin Schuh8bee6882021-06-28 21:03:28 -07002135 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002136 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002137 writer.QueueSpan(boot0_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002138 }
2139 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002140 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002141 writer.QueueSpan(boot1_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002142 }
2143
2144 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
2145
2146 ASSERT_EQ(parts.size(), 1u);
2147 ASSERT_EQ(parts[0].parts.size(), 2u);
2148
2149 EXPECT_EQ(parts[0].parts[0].boot_count, 0);
2150 EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002151 boot0_.message().source_node_boot_uuid()->string_view());
Austin Schuh8bee6882021-06-28 21:03:28 -07002152
2153 EXPECT_EQ(parts[0].parts[1].boot_count, 1);
2154 EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002155 boot1_.message().source_node_boot_uuid()->string_view());
2156}
2157
2158// This tests that we can produce messages ordered across a reboot.
2159TEST_F(BootMergerTest, SortAcrossReboot) {
2160 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2161 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002162 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002163 writer.QueueSpan(boot0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002164 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002165 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002166 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002167 MakeLogMessage(e + chrono::milliseconds(2000), 1, 0x105));
2168 }
2169 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002170 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002171 writer.QueueSpan(boot1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002172 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002173 MakeLogMessage(e + chrono::milliseconds(100), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002174 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002175 MakeLogMessage(e + chrono::milliseconds(200), 1, 0x106));
2176 }
2177
2178 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07002179 LogFilesContainer log_files(parts);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002180 ASSERT_EQ(parts.size(), 1u);
2181 ASSERT_EQ(parts[0].parts.size(), 2u);
2182
Austin Schuh63097262023-08-16 17:04:29 -07002183 BootMerger merger("pi2", log_files,
2184 {StoredDataType::DATA, StoredDataType::TIMESTAMPS,
2185 StoredDataType::REMOTE_TIMESTAMPS});
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002186
2187 EXPECT_EQ(merger.node(), 1u);
2188
2189 std::vector<Message> output;
2190 for (int i = 0; i < 4; ++i) {
2191 ASSERT_TRUE(merger.Front() != nullptr);
2192 output.emplace_back(std::move(*merger.Front()));
2193 merger.PopFront();
2194 }
2195
2196 ASSERT_TRUE(merger.Front() == nullptr);
2197
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002198 EXPECT_EQ(output[0].timestamp.boot, 0u);
2199 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
2200 EXPECT_EQ(output[1].timestamp.boot, 0u);
2201 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(2000));
2202
2203 EXPECT_EQ(output[2].timestamp.boot, 1u);
2204 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(100));
2205 EXPECT_EQ(output[3].timestamp.boot, 1u);
2206 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
Austin Schuh8bee6882021-06-28 21:03:28 -07002207}
2208
Austin Schuh48507722021-07-17 17:29:24 -07002209class RebootTimestampMapperTest : public SortingElementTest {
2210 public:
2211 RebootTimestampMapperTest()
2212 : SortingElementTest(),
2213 boot0a_(MakeHeader(config_, R"({
2214 /* 100ms */
2215 "max_out_of_order_duration": 100000000,
2216 "node": {
2217 "name": "pi1"
2218 },
2219 "logger_node": {
2220 "name": "pi1"
2221 },
2222 "monotonic_start_time": 1000000,
2223 "realtime_start_time": 1000000000000,
2224 "logger_monotonic_start_time": 1000000,
2225 "logger_realtime_start_time": 1000000000000,
2226 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2227 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2228 "parts_index": 0,
2229 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2230 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2231 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2232 "boot_uuids": [
2233 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2234 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2235 ""
2236 ]
2237})")),
2238 boot0b_(MakeHeader(config_, R"({
2239 /* 100ms */
2240 "max_out_of_order_duration": 100000000,
2241 "node": {
2242 "name": "pi1"
2243 },
2244 "logger_node": {
2245 "name": "pi1"
2246 },
2247 "monotonic_start_time": 1000000,
2248 "realtime_start_time": 1000000000000,
2249 "logger_monotonic_start_time": 1000000,
2250 "logger_realtime_start_time": 1000000000000,
2251 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2252 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2253 "parts_index": 1,
2254 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2255 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2256 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2257 "boot_uuids": [
2258 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2259 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2260 ""
2261 ]
2262})")),
2263 boot1a_(MakeHeader(config_, R"({
2264 /* 100ms */
2265 "max_out_of_order_duration": 100000000,
2266 "node": {
2267 "name": "pi2"
2268 },
2269 "logger_node": {
2270 "name": "pi1"
2271 },
2272 "monotonic_start_time": 1000000,
2273 "realtime_start_time": 1000000000000,
2274 "logger_monotonic_start_time": 1000000,
2275 "logger_realtime_start_time": 1000000000000,
2276 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2277 "parts_uuid": "f6ab0cdc-a654-456d-bfd9-2bbc09098edf",
2278 "parts_index": 0,
2279 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2280 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2281 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2282 "boot_uuids": [
2283 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2284 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2285 ""
2286 ]
2287})")),
2288 boot1b_(MakeHeader(config_, R"({
2289 /* 100ms */
2290 "max_out_of_order_duration": 100000000,
2291 "node": {
2292 "name": "pi2"
2293 },
2294 "logger_node": {
2295 "name": "pi1"
2296 },
2297 "monotonic_start_time": 1000000,
2298 "realtime_start_time": 1000000000000,
2299 "logger_monotonic_start_time": 1000000,
2300 "logger_realtime_start_time": 1000000000000,
2301 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2302 "parts_uuid": "3a9d0445-f520-43ca-93f5-e2cc7f54d40a",
2303 "parts_index": 1,
2304 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2305 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2306 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2307 "boot_uuids": [
2308 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2309 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2310 ""
2311 ]
2312})")) {}
2313
2314 protected:
2315 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0a_;
2316 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0b_;
2317 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1a_;
2318 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
2319};
2320
Austin Schuh48507722021-07-17 17:29:24 -07002321// Tests that we can match timestamps on delivered messages in the presence of
2322// reboots on the node receiving timestamps.
2323TEST_F(RebootTimestampMapperTest, ReadNode0First) {
2324 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2325 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002326 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002327 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002328 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002329 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002330 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002331 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002332 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002333 writer1b.QueueSpan(boot1b_.span());
2334
Austin Schuhd863e6e2022-10-16 15:44:50 -07002335 writer0a.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002336 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002337 writer1a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002338 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
2339 e + chrono::milliseconds(1001)));
2340
Austin Schuhd863e6e2022-10-16 15:44:50 -07002341 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh58646e22021-08-23 23:51:46 -07002342 e + chrono::milliseconds(1000), 0, chrono::seconds(21),
2343 e + chrono::milliseconds(2001)));
2344
Austin Schuhd863e6e2022-10-16 15:44:50 -07002345 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002346 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002347 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002348 e + chrono::milliseconds(2000), 0, chrono::seconds(20),
2349 e + chrono::milliseconds(2001)));
2350
Austin Schuhd863e6e2022-10-16 15:44:50 -07002351 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002352 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002353 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002354 e + chrono::milliseconds(3000), 0, chrono::seconds(20),
2355 e + chrono::milliseconds(3001)));
2356 }
2357
Austin Schuh58646e22021-08-23 23:51:46 -07002358 const std::vector<LogFile> parts =
2359 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07002360 LogFilesContainer log_files(parts);
Austin Schuh48507722021-07-17 17:29:24 -07002361
2362 for (const auto &x : parts) {
2363 LOG(INFO) << x;
2364 }
2365 ASSERT_EQ(parts.size(), 1u);
2366 ASSERT_EQ(parts[0].logger_node, "pi1");
2367
2368 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07002369 TimestampMapper mapper0("pi1", log_files,
2370 TimestampQueueStrategy::kQueueTogether);
Austin Schuh48507722021-07-17 17:29:24 -07002371 mapper0.set_timestamp_callback(
2372 [&](TimestampedMessage *) { ++mapper0_count; });
2373 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07002374 TimestampMapper mapper1("pi2", log_files,
2375 TimestampQueueStrategy::kQueueTogether);
Austin Schuh48507722021-07-17 17:29:24 -07002376 mapper1.set_timestamp_callback(
2377 [&](TimestampedMessage *) { ++mapper1_count; });
2378
2379 mapper0.AddPeer(&mapper1);
2380 mapper1.AddPeer(&mapper0);
2381
2382 {
2383 std::deque<TimestampedMessage> output0;
2384
2385 EXPECT_EQ(mapper0_count, 0u);
2386 EXPECT_EQ(mapper1_count, 0u);
2387 ASSERT_TRUE(mapper0.Front() != nullptr);
2388 EXPECT_EQ(mapper0_count, 1u);
2389 EXPECT_EQ(mapper1_count, 0u);
2390 output0.emplace_back(std::move(*mapper0.Front()));
2391 mapper0.PopFront();
2392 EXPECT_TRUE(mapper0.started());
2393 EXPECT_EQ(mapper0_count, 1u);
2394 EXPECT_EQ(mapper1_count, 0u);
2395
2396 ASSERT_TRUE(mapper0.Front() != nullptr);
2397 EXPECT_EQ(mapper0_count, 2u);
2398 EXPECT_EQ(mapper1_count, 0u);
2399 output0.emplace_back(std::move(*mapper0.Front()));
2400 mapper0.PopFront();
2401 EXPECT_TRUE(mapper0.started());
2402
2403 ASSERT_TRUE(mapper0.Front() != nullptr);
2404 output0.emplace_back(std::move(*mapper0.Front()));
2405 mapper0.PopFront();
2406 EXPECT_TRUE(mapper0.started());
2407
2408 EXPECT_EQ(mapper0_count, 3u);
2409 EXPECT_EQ(mapper1_count, 0u);
2410
2411 ASSERT_TRUE(mapper0.Front() == nullptr);
2412
2413 LOG(INFO) << output0[0];
2414 LOG(INFO) << output0[1];
2415 LOG(INFO) << output0[2];
2416
2417 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2418 EXPECT_EQ(output0[0].monotonic_event_time.time,
2419 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002420 EXPECT_EQ(output0[0].queue_index,
2421 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002422 EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
2423 EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002424 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002425
2426 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2427 EXPECT_EQ(output0[1].monotonic_event_time.time,
2428 e + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002429 EXPECT_EQ(output0[1].queue_index,
2430 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002431 EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
2432 EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002433 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002434
2435 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2436 EXPECT_EQ(output0[2].monotonic_event_time.time,
2437 e + chrono::milliseconds(3000));
Austin Schuh58646e22021-08-23 23:51:46 -07002438 EXPECT_EQ(output0[2].queue_index,
2439 (BootQueueIndex{.boot = 0u, .index = 2u}));
Austin Schuh48507722021-07-17 17:29:24 -07002440 EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
2441 EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002442 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002443 }
2444
2445 {
2446 SCOPED_TRACE("Trying node1 now");
2447 std::deque<TimestampedMessage> output1;
2448
2449 EXPECT_EQ(mapper0_count, 3u);
2450 EXPECT_EQ(mapper1_count, 0u);
2451
2452 ASSERT_TRUE(mapper1.Front() != nullptr);
2453 EXPECT_EQ(mapper0_count, 3u);
2454 EXPECT_EQ(mapper1_count, 1u);
2455 output1.emplace_back(std::move(*mapper1.Front()));
2456 mapper1.PopFront();
2457 EXPECT_TRUE(mapper1.started());
2458 EXPECT_EQ(mapper0_count, 3u);
2459 EXPECT_EQ(mapper1_count, 1u);
2460
2461 ASSERT_TRUE(mapper1.Front() != nullptr);
2462 EXPECT_EQ(mapper0_count, 3u);
2463 EXPECT_EQ(mapper1_count, 2u);
2464 output1.emplace_back(std::move(*mapper1.Front()));
2465 mapper1.PopFront();
2466 EXPECT_TRUE(mapper1.started());
2467
2468 ASSERT_TRUE(mapper1.Front() != nullptr);
2469 output1.emplace_back(std::move(*mapper1.Front()));
2470 mapper1.PopFront();
2471 EXPECT_TRUE(mapper1.started());
2472
Austin Schuh58646e22021-08-23 23:51:46 -07002473 ASSERT_TRUE(mapper1.Front() != nullptr);
2474 output1.emplace_back(std::move(*mapper1.Front()));
2475 mapper1.PopFront();
2476 EXPECT_TRUE(mapper1.started());
2477
Austin Schuh48507722021-07-17 17:29:24 -07002478 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002479 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002480
2481 ASSERT_TRUE(mapper1.Front() == nullptr);
2482
2483 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002484 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002485
2486 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2487 EXPECT_EQ(output1[0].monotonic_event_time.time,
2488 e + chrono::seconds(100) + chrono::milliseconds(1000));
2489 EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
2490 EXPECT_EQ(output1[0].monotonic_remote_time.time,
2491 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002492 EXPECT_EQ(output1[0].remote_queue_index,
2493 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002494 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
2495 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
2496 e + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002497 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002498
2499 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2500 EXPECT_EQ(output1[1].monotonic_event_time.time,
2501 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002502 EXPECT_EQ(output1[1].remote_queue_index,
2503 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002504 EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
2505 EXPECT_EQ(output1[1].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002506 e + chrono::milliseconds(1000));
Austin Schuh48507722021-07-17 17:29:24 -07002507 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
2508 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
2509 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002510 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002511
2512 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2513 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002514 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh48507722021-07-17 17:29:24 -07002515 EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
2516 EXPECT_EQ(output1[2].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002517 e + chrono::milliseconds(2000));
2518 EXPECT_EQ(output1[2].remote_queue_index,
2519 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002520 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
2521 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002522 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002523 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002524
Austin Schuh58646e22021-08-23 23:51:46 -07002525 EXPECT_EQ(output1[3].monotonic_event_time.boot, 1u);
2526 EXPECT_EQ(output1[3].monotonic_event_time.time,
2527 e + chrono::seconds(20) + chrono::milliseconds(3000));
2528 EXPECT_EQ(output1[3].monotonic_remote_time.boot, 0u);
2529 EXPECT_EQ(output1[3].monotonic_remote_time.time,
2530 e + chrono::milliseconds(3000));
2531 EXPECT_EQ(output1[3].remote_queue_index,
2532 (BootQueueIndex{.boot = 0u, .index = 2u}));
2533 EXPECT_EQ(output1[3].monotonic_timestamp_time.boot, 0u);
2534 EXPECT_EQ(output1[3].monotonic_timestamp_time.time,
2535 e + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002536 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh58646e22021-08-23 23:51:46 -07002537
Austin Schuh48507722021-07-17 17:29:24 -07002538 LOG(INFO) << output1[0];
2539 LOG(INFO) << output1[1];
2540 LOG(INFO) << output1[2];
Austin Schuh58646e22021-08-23 23:51:46 -07002541 LOG(INFO) << output1[3];
Austin Schuh48507722021-07-17 17:29:24 -07002542 }
2543}
2544
2545TEST_F(RebootTimestampMapperTest, Node2Reboot) {
2546 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2547 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002548 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002549 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002550 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002551 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002552 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002553 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002554 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002555 writer1b.QueueSpan(boot1b_.span());
2556
Austin Schuhd863e6e2022-10-16 15:44:50 -07002557 writer1a.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002558 e + chrono::seconds(100) + chrono::milliseconds(1000), 3, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002559 writer0a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002560 e + chrono::seconds(100) + chrono::milliseconds(1000), 3,
2561 chrono::seconds(-100),
2562 e + chrono::seconds(100) + chrono::milliseconds(1001)));
2563
Austin Schuhd863e6e2022-10-16 15:44:50 -07002564 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002565 e + chrono::seconds(20) + chrono::milliseconds(2000), 3, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002566 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002567 e + chrono::seconds(20) + chrono::milliseconds(2000), 3,
2568 chrono::seconds(-20),
2569 e + chrono::seconds(20) + chrono::milliseconds(2001)));
2570
Austin Schuhd863e6e2022-10-16 15:44:50 -07002571 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002572 e + chrono::seconds(20) + chrono::milliseconds(3000), 3, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002573 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002574 e + chrono::seconds(20) + chrono::milliseconds(3000), 3,
2575 chrono::seconds(-20),
2576 e + chrono::seconds(20) + chrono::milliseconds(3001)));
2577 }
2578
2579 const std::vector<LogFile> parts =
2580 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07002581 LogFilesContainer log_files(parts);
Austin Schuh48507722021-07-17 17:29:24 -07002582
2583 for (const auto &x : parts) {
2584 LOG(INFO) << x;
2585 }
2586 ASSERT_EQ(parts.size(), 1u);
2587 ASSERT_EQ(parts[0].logger_node, "pi1");
2588
2589 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07002590 TimestampMapper mapper0("pi1", log_files,
2591 TimestampQueueStrategy::kQueueTogether);
Austin Schuh48507722021-07-17 17:29:24 -07002592 mapper0.set_timestamp_callback(
2593 [&](TimestampedMessage *) { ++mapper0_count; });
2594 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07002595 TimestampMapper mapper1("pi2", log_files,
2596 TimestampQueueStrategy::kQueueTogether);
Austin Schuh48507722021-07-17 17:29:24 -07002597 mapper1.set_timestamp_callback(
2598 [&](TimestampedMessage *) { ++mapper1_count; });
2599
2600 mapper0.AddPeer(&mapper1);
2601 mapper1.AddPeer(&mapper0);
2602
2603 {
2604 std::deque<TimestampedMessage> output0;
2605
2606 EXPECT_EQ(mapper0_count, 0u);
2607 EXPECT_EQ(mapper1_count, 0u);
2608 ASSERT_TRUE(mapper0.Front() != nullptr);
2609 EXPECT_EQ(mapper0_count, 1u);
2610 EXPECT_EQ(mapper1_count, 0u);
2611 output0.emplace_back(std::move(*mapper0.Front()));
2612 mapper0.PopFront();
2613 EXPECT_TRUE(mapper0.started());
2614 EXPECT_EQ(mapper0_count, 1u);
2615 EXPECT_EQ(mapper1_count, 0u);
2616
2617 ASSERT_TRUE(mapper0.Front() != nullptr);
2618 EXPECT_EQ(mapper0_count, 2u);
2619 EXPECT_EQ(mapper1_count, 0u);
2620 output0.emplace_back(std::move(*mapper0.Front()));
2621 mapper0.PopFront();
2622 EXPECT_TRUE(mapper0.started());
2623
2624 ASSERT_TRUE(mapper0.Front() != nullptr);
2625 output0.emplace_back(std::move(*mapper0.Front()));
2626 mapper0.PopFront();
2627 EXPECT_TRUE(mapper0.started());
2628
2629 EXPECT_EQ(mapper0_count, 3u);
2630 EXPECT_EQ(mapper1_count, 0u);
2631
2632 ASSERT_TRUE(mapper0.Front() == nullptr);
2633
2634 LOG(INFO) << output0[0];
2635 LOG(INFO) << output0[1];
2636 LOG(INFO) << output0[2];
2637
2638 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2639 EXPECT_EQ(output0[0].monotonic_event_time.time,
2640 e + chrono::milliseconds(1000));
2641 EXPECT_EQ(output0[0].monotonic_remote_time.boot, 0u);
2642 EXPECT_EQ(output0[0].monotonic_remote_time.time,
2643 e + chrono::seconds(100) + chrono::milliseconds(1000));
2644 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
2645 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
2646 e + chrono::seconds(100) + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002647 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002648
2649 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2650 EXPECT_EQ(output0[1].monotonic_event_time.time,
2651 e + chrono::milliseconds(2000));
2652 EXPECT_EQ(output0[1].monotonic_remote_time.boot, 1u);
2653 EXPECT_EQ(output0[1].monotonic_remote_time.time,
2654 e + chrono::seconds(20) + chrono::milliseconds(2000));
2655 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
2656 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
2657 e + chrono::seconds(20) + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002658 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002659
2660 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2661 EXPECT_EQ(output0[2].monotonic_event_time.time,
2662 e + chrono::milliseconds(3000));
2663 EXPECT_EQ(output0[2].monotonic_remote_time.boot, 1u);
2664 EXPECT_EQ(output0[2].monotonic_remote_time.time,
2665 e + chrono::seconds(20) + chrono::milliseconds(3000));
2666 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
2667 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
2668 e + chrono::seconds(20) + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002669 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002670 }
2671
2672 {
2673 SCOPED_TRACE("Trying node1 now");
2674 std::deque<TimestampedMessage> output1;
2675
2676 EXPECT_EQ(mapper0_count, 3u);
2677 EXPECT_EQ(mapper1_count, 0u);
2678
2679 ASSERT_TRUE(mapper1.Front() != nullptr);
2680 EXPECT_EQ(mapper0_count, 3u);
2681 EXPECT_EQ(mapper1_count, 1u);
2682 output1.emplace_back(std::move(*mapper1.Front()));
2683 mapper1.PopFront();
2684 EXPECT_TRUE(mapper1.started());
2685 EXPECT_EQ(mapper0_count, 3u);
2686 EXPECT_EQ(mapper1_count, 1u);
2687
2688 ASSERT_TRUE(mapper1.Front() != nullptr);
2689 EXPECT_EQ(mapper0_count, 3u);
2690 EXPECT_EQ(mapper1_count, 2u);
2691 output1.emplace_back(std::move(*mapper1.Front()));
2692 mapper1.PopFront();
2693 EXPECT_TRUE(mapper1.started());
2694
2695 ASSERT_TRUE(mapper1.Front() != nullptr);
2696 output1.emplace_back(std::move(*mapper1.Front()));
2697 mapper1.PopFront();
2698 EXPECT_TRUE(mapper1.started());
2699
2700 EXPECT_EQ(mapper0_count, 3u);
2701 EXPECT_EQ(mapper1_count, 3u);
2702
2703 ASSERT_TRUE(mapper1.Front() == nullptr);
2704
2705 EXPECT_EQ(mapper0_count, 3u);
2706 EXPECT_EQ(mapper1_count, 3u);
2707
2708 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2709 EXPECT_EQ(output1[0].monotonic_event_time.time,
2710 e + chrono::seconds(100) + chrono::milliseconds(1000));
2711 EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
2712 EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002713 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002714
2715 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2716 EXPECT_EQ(output1[1].monotonic_event_time.time,
2717 e + chrono::seconds(20) + chrono::milliseconds(2000));
2718 EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
2719 EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002720 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002721
2722 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2723 EXPECT_EQ(output1[2].monotonic_event_time.time,
2724 e + chrono::seconds(20) + chrono::milliseconds(3000));
2725 EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
2726 EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002727 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002728
2729 LOG(INFO) << output1[0];
2730 LOG(INFO) << output1[1];
2731 LOG(INFO) << output1[2];
2732 }
2733}
2734
Austin Schuh44c61472021-11-22 21:04:10 -08002735class SortingDeathTest : public SortingElementTest {
2736 public:
2737 SortingDeathTest()
2738 : SortingElementTest(),
2739 part0_(MakeHeader(config_, R"({
2740 /* 100ms */
2741 "max_out_of_order_duration": 100000000,
2742 "node": {
2743 "name": "pi1"
2744 },
2745 "logger_node": {
2746 "name": "pi1"
2747 },
2748 "monotonic_start_time": 1000000,
2749 "realtime_start_time": 1000000000000,
2750 "logger_monotonic_start_time": 1000000,
2751 "logger_realtime_start_time": 1000000000000,
2752 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2753 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2754 "parts_index": 0,
2755 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2756 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2757 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2758 "boot_uuids": [
2759 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2760 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2761 ""
2762 ],
2763 "oldest_remote_monotonic_timestamps": [
2764 9223372036854775807,
2765 9223372036854775807,
2766 9223372036854775807
2767 ],
2768 "oldest_local_monotonic_timestamps": [
2769 9223372036854775807,
2770 9223372036854775807,
2771 9223372036854775807
2772 ],
2773 "oldest_remote_unreliable_monotonic_timestamps": [
2774 9223372036854775807,
2775 0,
2776 9223372036854775807
2777 ],
2778 "oldest_local_unreliable_monotonic_timestamps": [
2779 9223372036854775807,
2780 0,
2781 9223372036854775807
2782 ]
2783})")),
2784 part1_(MakeHeader(config_, R"({
2785 /* 100ms */
2786 "max_out_of_order_duration": 100000000,
2787 "node": {
2788 "name": "pi1"
2789 },
2790 "logger_node": {
2791 "name": "pi1"
2792 },
2793 "monotonic_start_time": 1000000,
2794 "realtime_start_time": 1000000000000,
2795 "logger_monotonic_start_time": 1000000,
2796 "logger_realtime_start_time": 1000000000000,
2797 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2798 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2799 "parts_index": 1,
2800 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2801 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2802 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2803 "boot_uuids": [
2804 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2805 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2806 ""
2807 ],
2808 "oldest_remote_monotonic_timestamps": [
2809 9223372036854775807,
2810 9223372036854775807,
2811 9223372036854775807
2812 ],
2813 "oldest_local_monotonic_timestamps": [
2814 9223372036854775807,
2815 9223372036854775807,
2816 9223372036854775807
2817 ],
2818 "oldest_remote_unreliable_monotonic_timestamps": [
2819 9223372036854775807,
2820 100000,
2821 9223372036854775807
2822 ],
2823 "oldest_local_unreliable_monotonic_timestamps": [
2824 9223372036854775807,
2825 100000,
2826 9223372036854775807
2827 ]
2828})")),
2829 part2_(MakeHeader(config_, R"({
2830 /* 100ms */
2831 "max_out_of_order_duration": 100000000,
2832 "node": {
2833 "name": "pi1"
2834 },
2835 "logger_node": {
2836 "name": "pi1"
2837 },
2838 "monotonic_start_time": 1000000,
2839 "realtime_start_time": 1000000000000,
2840 "logger_monotonic_start_time": 1000000,
2841 "logger_realtime_start_time": 1000000000000,
2842 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2843 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2844 "parts_index": 2,
2845 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2846 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2847 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2848 "boot_uuids": [
2849 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2850 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2851 ""
2852 ],
2853 "oldest_remote_monotonic_timestamps": [
2854 9223372036854775807,
2855 9223372036854775807,
2856 9223372036854775807
2857 ],
2858 "oldest_local_monotonic_timestamps": [
2859 9223372036854775807,
2860 9223372036854775807,
2861 9223372036854775807
2862 ],
2863 "oldest_remote_unreliable_monotonic_timestamps": [
2864 9223372036854775807,
2865 200000,
2866 9223372036854775807
2867 ],
2868 "oldest_local_unreliable_monotonic_timestamps": [
2869 9223372036854775807,
2870 200000,
2871 9223372036854775807
2872 ]
2873})")),
2874 part3_(MakeHeader(config_, R"({
2875 /* 100ms */
2876 "max_out_of_order_duration": 100000000,
2877 "node": {
2878 "name": "pi1"
2879 },
2880 "logger_node": {
2881 "name": "pi1"
2882 },
2883 "monotonic_start_time": 1000000,
2884 "realtime_start_time": 1000000000000,
2885 "logger_monotonic_start_time": 1000000,
2886 "logger_realtime_start_time": 1000000000000,
2887 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2888 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2889 "parts_index": 3,
2890 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2891 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2892 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2893 "boot_uuids": [
2894 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2895 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2896 ""
2897 ],
2898 "oldest_remote_monotonic_timestamps": [
2899 9223372036854775807,
2900 9223372036854775807,
2901 9223372036854775807
2902 ],
2903 "oldest_local_monotonic_timestamps": [
2904 9223372036854775807,
2905 9223372036854775807,
2906 9223372036854775807
2907 ],
2908 "oldest_remote_unreliable_monotonic_timestamps": [
2909 9223372036854775807,
2910 300000,
2911 9223372036854775807
2912 ],
2913 "oldest_local_unreliable_monotonic_timestamps": [
2914 9223372036854775807,
2915 300000,
2916 9223372036854775807
2917 ]
2918})")) {}
2919
2920 protected:
2921 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part0_;
2922 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part1_;
2923 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part2_;
2924 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part3_;
2925};
2926
2927// Tests that if 2 computers go back and forth trying to be the same node, we
2928// die in sorting instead of failing to estimate time.
2929TEST_F(SortingDeathTest, FightingNodes) {
2930 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002931 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh44c61472021-11-22 21:04:10 -08002932 writer0.QueueSpan(part0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002933 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh44c61472021-11-22 21:04:10 -08002934 writer1.QueueSpan(part1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002935 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuh44c61472021-11-22 21:04:10 -08002936 writer2.QueueSpan(part2_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002937 TestDetachedBufferWriter writer3(logfile3_);
Austin Schuh44c61472021-11-22 21:04:10 -08002938 writer3.QueueSpan(part3_.span());
2939 }
2940
2941 EXPECT_DEATH(
2942 {
2943 const std::vector<LogFile> parts =
2944 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2945 },
Austin Schuh22cf7862022-09-19 19:09:42 -07002946 "found overlapping boots on");
Austin Schuh44c61472021-11-22 21:04:10 -08002947}
2948
Brian Smarttea913d42021-12-10 15:02:38 -08002949// Tests that we MessageReader blows up on a bad message.
2950TEST(MessageReaderConfirmCrash, ReadWrite) {
2951 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
2952 unlink(logfile.c_str());
2953
2954 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
2955 JsonToSizedFlatbuffer<LogFileHeader>(
2956 R"({ "max_out_of_order_duration": 100000000 })");
2957 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
2958 JsonToSizedFlatbuffer<MessageHeader>(
2959 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
2960 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
2961 JsonToSizedFlatbuffer<MessageHeader>(
2962 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
2963 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m4 =
2964 JsonToSizedFlatbuffer<MessageHeader>(
2965 R"({ "channel_index": 0, "monotonic_sent_time": 4 })");
2966
2967 // Starts out like a proper flat buffer header, but it breaks down ...
2968 std::vector<uint8_t> garbage{8, 0, 0, 0, 16, 0, 0, 0, 4, 0, 0, 0};
2969 absl::Span<uint8_t> m3_span(garbage);
2970
2971 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002972 TestDetachedBufferWriter writer(logfile);
Brian Smarttea913d42021-12-10 15:02:38 -08002973 writer.QueueSpan(config.span());
2974 writer.QueueSpan(m1.span());
2975 writer.QueueSpan(m2.span());
2976 writer.QueueSpan(m3_span);
2977 writer.QueueSpan(m4.span()); // This message is "hidden"
2978 }
2979
2980 {
2981 MessageReader reader(logfile);
2982
2983 EXPECT_EQ(reader.filename(), logfile);
2984
2985 EXPECT_EQ(
2986 reader.max_out_of_order_duration(),
2987 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2988 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2989 EXPECT_TRUE(reader.ReadMessage());
2990 EXPECT_EQ(reader.newest_timestamp(),
2991 monotonic_clock::time_point(chrono::nanoseconds(1)));
2992 EXPECT_TRUE(reader.ReadMessage());
2993 EXPECT_EQ(reader.newest_timestamp(),
2994 monotonic_clock::time_point(chrono::nanoseconds(2)));
2995 // Confirm default crashing behavior
2996 EXPECT_DEATH(reader.ReadMessage(), "Corrupted message at offset");
2997 }
2998
2999 {
3000 gflags::FlagSaver fs;
3001
3002 MessageReader reader(logfile);
3003 reader.set_crash_on_corrupt_message_flag(false);
3004
3005 EXPECT_EQ(reader.filename(), logfile);
3006
3007 EXPECT_EQ(
3008 reader.max_out_of_order_duration(),
3009 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
3010 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
3011 EXPECT_TRUE(reader.ReadMessage());
3012 EXPECT_EQ(reader.newest_timestamp(),
3013 monotonic_clock::time_point(chrono::nanoseconds(1)));
3014 EXPECT_TRUE(reader.ReadMessage());
3015 EXPECT_EQ(reader.newest_timestamp(),
3016 monotonic_clock::time_point(chrono::nanoseconds(2)));
3017 // Confirm avoiding the corrupted message crash, stopping instead.
3018 EXPECT_FALSE(reader.ReadMessage());
3019 }
3020
3021 {
3022 gflags::FlagSaver fs;
3023
3024 MessageReader reader(logfile);
3025 reader.set_crash_on_corrupt_message_flag(false);
3026 reader.set_ignore_corrupt_messages_flag(true);
3027
3028 EXPECT_EQ(reader.filename(), logfile);
3029
3030 EXPECT_EQ(
3031 reader.max_out_of_order_duration(),
3032 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
3033 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
3034 EXPECT_TRUE(reader.ReadMessage());
3035 EXPECT_EQ(reader.newest_timestamp(),
3036 monotonic_clock::time_point(chrono::nanoseconds(1)));
3037 EXPECT_TRUE(reader.ReadMessage());
3038 EXPECT_EQ(reader.newest_timestamp(),
3039 monotonic_clock::time_point(chrono::nanoseconds(2)));
3040 // Confirm skipping of the corrupted message to read the hidden one.
3041 EXPECT_TRUE(reader.ReadMessage());
3042 EXPECT_EQ(reader.newest_timestamp(),
3043 monotonic_clock::time_point(chrono::nanoseconds(4)));
3044 EXPECT_FALSE(reader.ReadMessage());
3045 }
3046}
3047
Austin Schuhfa30c352022-10-16 11:12:02 -07003048class InlinePackMessage : public ::testing::Test {
3049 protected:
3050 aos::Context RandomContext() {
3051 data_ = RandomData();
3052 std::uniform_int_distribution<uint32_t> uint32_distribution(
3053 std::numeric_limits<uint32_t>::min(),
3054 std::numeric_limits<uint32_t>::max());
3055
3056 std::uniform_int_distribution<int64_t> time_distribution(
3057 std::numeric_limits<int64_t>::min(),
3058 std::numeric_limits<int64_t>::max());
3059
3060 aos::Context context;
3061 context.monotonic_event_time =
3062 aos::monotonic_clock::epoch() +
3063 chrono::nanoseconds(time_distribution(random_number_generator_));
3064 context.realtime_event_time =
3065 aos::realtime_clock::epoch() +
3066 chrono::nanoseconds(time_distribution(random_number_generator_));
3067
3068 context.monotonic_remote_time =
3069 aos::monotonic_clock::epoch() +
3070 chrono::nanoseconds(time_distribution(random_number_generator_));
3071 context.realtime_remote_time =
3072 aos::realtime_clock::epoch() +
3073 chrono::nanoseconds(time_distribution(random_number_generator_));
3074
3075 context.queue_index = uint32_distribution(random_number_generator_);
3076 context.remote_queue_index = uint32_distribution(random_number_generator_);
3077 context.size = data_.size();
3078 context.data = data_.data();
3079 return context;
3080 }
3081
Austin Schuhf2d0e682022-10-16 14:20:58 -07003082 aos::monotonic_clock::time_point RandomMonotonic() {
3083 std::uniform_int_distribution<int64_t> time_distribution(
3084 0, std::numeric_limits<int64_t>::max());
3085 return aos::monotonic_clock::epoch() +
3086 chrono::nanoseconds(time_distribution(random_number_generator_));
3087 }
3088
3089 aos::SizePrefixedFlatbufferDetachedBuffer<message_bridge::RemoteMessage>
3090 RandomRemoteMessage() {
3091 std::uniform_int_distribution<uint8_t> uint8_distribution(
3092 std::numeric_limits<uint8_t>::min(),
3093 std::numeric_limits<uint8_t>::max());
3094
3095 std::uniform_int_distribution<int64_t> time_distribution(
3096 std::numeric_limits<int64_t>::min(),
3097 std::numeric_limits<int64_t>::max());
3098
3099 flatbuffers::FlatBufferBuilder fbb;
3100 message_bridge::RemoteMessage::Builder builder(fbb);
3101 builder.add_queue_index(uint8_distribution(random_number_generator_));
3102
3103 builder.add_monotonic_sent_time(
3104 time_distribution(random_number_generator_));
3105 builder.add_realtime_sent_time(time_distribution(random_number_generator_));
3106 builder.add_monotonic_remote_time(
3107 time_distribution(random_number_generator_));
3108 builder.add_realtime_remote_time(
3109 time_distribution(random_number_generator_));
3110
3111 builder.add_remote_queue_index(
3112 uint8_distribution(random_number_generator_));
3113
3114 fbb.FinishSizePrefixed(builder.Finish());
3115 return fbb.Release();
3116 }
3117
Austin Schuhfa30c352022-10-16 11:12:02 -07003118 std::vector<uint8_t> RandomData() {
3119 std::vector<uint8_t> result;
3120 std::uniform_int_distribution<int> length_distribution(1, 32);
3121 std::uniform_int_distribution<uint8_t> data_distribution(
3122 std::numeric_limits<uint8_t>::min(),
3123 std::numeric_limits<uint8_t>::max());
3124
3125 const size_t length = length_distribution(random_number_generator_);
3126
3127 result.reserve(length);
3128 for (size_t i = 0; i < length; ++i) {
3129 result.emplace_back(data_distribution(random_number_generator_));
3130 }
3131 return result;
3132 }
3133
3134 std::mt19937 random_number_generator_{
3135 std::mt19937(::aos::testing::RandomSeed())};
3136
3137 std::vector<uint8_t> data_;
3138};
3139
3140// Uses the binary schema to annotate a provided flatbuffer. Returns the
3141// annotated flatbuffer.
3142std::string AnnotateBinaries(
3143 const aos::NonSizePrefixedFlatbuffer<reflection::Schema> &schema,
3144 const std::string &schema_filename,
3145 flatbuffers::span<uint8_t> binary_data) {
3146 flatbuffers::BinaryAnnotator binary_annotator(
3147 schema.span().data(), schema.span().size(), binary_data.data(),
3148 binary_data.size());
3149
3150 auto annotations = binary_annotator.Annotate();
3151
3152 flatbuffers::AnnotatedBinaryTextGenerator text_generator(
3153 flatbuffers::AnnotatedBinaryTextGenerator::Options{}, annotations,
3154 binary_data.data(), binary_data.size());
3155
3156 text_generator.Generate(aos::testing::TestTmpDir() + "/foo.bfbs",
3157 schema_filename);
3158
3159 return aos::util::ReadFileToStringOrDie(aos::testing::TestTmpDir() +
3160 "/foo.afb");
3161}
3162
Austin Schuh71a40d42023-02-04 21:22:22 -08003163// Event loop which just has working time functions for the Copier classes
3164// tested below.
3165class TimeEventLoop : public EventLoop {
3166 public:
3167 TimeEventLoop() : EventLoop(nullptr) {}
3168
3169 aos::monotonic_clock::time_point monotonic_now() const final {
3170 return aos::monotonic_clock::min_time;
3171 }
3172 realtime_clock::time_point realtime_now() const final {
3173 return aos::realtime_clock::min_time;
3174 }
3175
3176 void OnRun(::std::function<void()> /*on_run*/) final { LOG(FATAL); }
3177
3178 const std::string_view name() const final { return "time"; }
3179 const Node *node() const final { return nullptr; }
3180
3181 void SetRuntimeAffinity(const cpu_set_t & /*cpuset*/) final { LOG(FATAL); }
3182 void SetRuntimeRealtimePriority(int /*priority*/) final { LOG(FATAL); }
3183
3184 const cpu_set_t &runtime_affinity() const final {
3185 LOG(FATAL);
3186 return cpuset_;
3187 }
3188
3189 TimerHandler *AddTimer(::std::function<void()> /*callback*/) final {
3190 LOG(FATAL);
3191 return nullptr;
3192 }
3193
3194 std::unique_ptr<RawSender> MakeRawSender(const Channel * /*channel*/) final {
3195 LOG(FATAL);
3196 return std::unique_ptr<RawSender>();
3197 }
3198
3199 const UUID &boot_uuid() const final {
3200 LOG(FATAL);
3201 return boot_uuid_;
3202 }
3203
3204 void set_name(const std::string_view name) final { LOG(FATAL) << name; }
3205
3206 pid_t GetTid() final {
3207 LOG(FATAL);
3208 return 0;
3209 }
3210
3211 int NumberBuffers(const Channel * /*channel*/) final {
3212 LOG(FATAL);
3213 return 0;
3214 }
3215
3216 int runtime_realtime_priority() const final {
3217 LOG(FATAL);
3218 return 0;
3219 }
3220
3221 std::unique_ptr<RawFetcher> MakeRawFetcher(
3222 const Channel * /*channel*/) final {
3223 LOG(FATAL);
3224 return std::unique_ptr<RawFetcher>();
3225 }
3226
3227 PhasedLoopHandler *AddPhasedLoop(
3228 ::std::function<void(int)> /*callback*/,
3229 const monotonic_clock::duration /*interval*/,
3230 const monotonic_clock::duration /*offset*/) final {
3231 LOG(FATAL);
3232 return nullptr;
3233 }
3234
3235 void MakeRawWatcher(
3236 const Channel * /*channel*/,
3237 std::function<void(const Context &context, const void *message)>
3238 /*watcher*/) final {
3239 LOG(FATAL);
3240 }
3241
3242 private:
3243 const cpu_set_t cpuset_ = DefaultAffinity();
3244 UUID boot_uuid_ = UUID ::Zero();
3245};
3246
Austin Schuhfa30c352022-10-16 11:12:02 -07003247// Tests that all variations of PackMessage are equivalent to the inline
3248// PackMessage used to avoid allocations.
3249TEST_F(InlinePackMessage, Equivilent) {
3250 std::uniform_int_distribution<uint32_t> uint32_distribution(
3251 std::numeric_limits<uint32_t>::min(),
3252 std::numeric_limits<uint32_t>::max());
3253 aos::FlatbufferVector<reflection::Schema> schema =
3254 FileToFlatbuffer<reflection::Schema>(
3255 ArtifactPath("aos/events/logging/logger.bfbs"));
3256
3257 for (const LogType type :
3258 {LogType::kLogMessage, LogType::kLogDeliveryTimeOnly,
3259 LogType::kLogMessageAndDeliveryTime, LogType::kLogRemoteMessage}) {
3260 for (int i = 0; i < 100; ++i) {
3261 aos::Context context = RandomContext();
3262 const uint32_t channel_index =
3263 uint32_distribution(random_number_generator_);
3264
3265 flatbuffers::FlatBufferBuilder fbb;
3266 fbb.ForceDefaults(true);
3267 fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index, type));
3268
3269 VLOG(1) << absl::BytesToHexString(std::string_view(
3270 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3271 fbb.GetBufferSpan().size()));
3272
3273 // Make sure that both the builder and inline method agree on sizes.
Austin Schuh48d10d62022-10-16 22:19:23 -07003274 ASSERT_EQ(fbb.GetSize(), PackMessageSize(type, context.size))
Austin Schuhfa30c352022-10-16 11:12:02 -07003275 << "log type " << static_cast<int>(type);
3276
3277 // Initialize the buffer to something nonzero to make sure all the padding
3278 // bytes are set to 0.
Austin Schuh48d10d62022-10-16 22:19:23 -07003279 std::vector<uint8_t> repacked_message(PackMessageSize(type, context.size),
3280 67);
Austin Schuhfa30c352022-10-16 11:12:02 -07003281
3282 // And verify packing inline works as expected.
Austin Schuh71a40d42023-02-04 21:22:22 -08003283 EXPECT_EQ(
3284 repacked_message.size(),
3285 PackMessageInline(repacked_message.data(), context, channel_index,
3286 type, 0u, repacked_message.size()));
Austin Schuhfa30c352022-10-16 11:12:02 -07003287 EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
3288 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3289 fbb.GetBufferSpan().size()))
3290 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
3291 fbb.GetBufferSpan());
Austin Schuh71a40d42023-02-04 21:22:22 -08003292
3293 // Ok, now we want to confirm that we can build up arbitrary pieces of
3294 // said flatbuffer. Try all of them since it is cheap.
3295 TimeEventLoop event_loop;
3296 for (size_t i = 0; i < repacked_message.size(); i += 8) {
3297 for (size_t j = i; j < repacked_message.size(); j += 8) {
3298 std::vector<uint8_t> destination(repacked_message.size(), 67u);
3299 ContextDataCopier copier(context, channel_index, type, &event_loop);
3300
3301 copier.Copy(destination.data(), i, j);
3302
3303 size_t index = 0;
3304 for (size_t k = i; k < j; ++k) {
3305 ASSERT_EQ(destination[index], repacked_message[k])
3306 << ": Failed to match type " << static_cast<int>(type)
3307 << ", index " << index << " while testing range " << i << " to "
3308 << j;
3309 ;
3310 ++index;
3311 }
3312 // Now, confirm that none of the other bytes have been touched.
3313 for (; index < destination.size(); ++index) {
3314 ASSERT_EQ(destination[index], 67u);
3315 }
3316 }
3317 }
Austin Schuhfa30c352022-10-16 11:12:02 -07003318 }
3319 }
3320}
3321
Austin Schuhf2d0e682022-10-16 14:20:58 -07003322// Tests that all variations of PackMessage are equivilent to the inline
3323// PackMessage used to avoid allocations.
3324TEST_F(InlinePackMessage, RemoteEquivilent) {
3325 aos::FlatbufferVector<reflection::Schema> schema =
3326 FileToFlatbuffer<reflection::Schema>(
3327 ArtifactPath("aos/events/logging/logger.bfbs"));
3328 std::uniform_int_distribution<uint8_t> uint8_distribution(
3329 std::numeric_limits<uint8_t>::min(), std::numeric_limits<uint8_t>::max());
3330
3331 for (int i = 0; i < 100; ++i) {
3332 aos::SizePrefixedFlatbufferDetachedBuffer<RemoteMessage> random_msg =
3333 RandomRemoteMessage();
3334 const size_t channel_index = uint8_distribution(random_number_generator_);
3335 const monotonic_clock::time_point monotonic_timestamp_time =
3336 RandomMonotonic();
3337
3338 flatbuffers::FlatBufferBuilder fbb;
3339 fbb.ForceDefaults(true);
3340 fbb.FinishSizePrefixed(PackRemoteMessage(
3341 &fbb, &random_msg.message(), channel_index, monotonic_timestamp_time));
3342
3343 VLOG(1) << absl::BytesToHexString(std::string_view(
3344 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3345 fbb.GetBufferSpan().size()));
3346
3347 // Make sure that both the builder and inline method agree on sizes.
3348 ASSERT_EQ(fbb.GetSize(), PackRemoteMessageSize());
3349
3350 // Initialize the buffer to something nonzer to make sure all the padding
3351 // bytes are set to 0.
3352 std::vector<uint8_t> repacked_message(PackRemoteMessageSize(), 67);
3353
3354 // And verify packing inline works as expected.
Austin Schuh71a40d42023-02-04 21:22:22 -08003355 EXPECT_EQ(repacked_message.size(),
3356 PackRemoteMessageInline(
3357 repacked_message.data(), &random_msg.message(), channel_index,
3358 monotonic_timestamp_time, 0u, repacked_message.size()));
Austin Schuhf2d0e682022-10-16 14:20:58 -07003359 EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
3360 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3361 fbb.GetBufferSpan().size()))
3362 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
3363 fbb.GetBufferSpan());
Austin Schuh71a40d42023-02-04 21:22:22 -08003364
3365 // Ok, now we want to confirm that we can build up arbitrary pieces of said
3366 // flatbuffer. Try all of them since it is cheap.
3367 TimeEventLoop event_loop;
3368 for (size_t i = 0; i < repacked_message.size(); i += 8) {
3369 for (size_t j = i; j < repacked_message.size(); j += 8) {
3370 std::vector<uint8_t> destination(repacked_message.size(), 67u);
3371 RemoteMessageCopier copier(&random_msg.message(), channel_index,
3372 monotonic_timestamp_time, &event_loop);
3373
3374 copier.Copy(destination.data(), i, j);
3375
3376 size_t index = 0;
3377 for (size_t k = i; k < j; ++k) {
3378 ASSERT_EQ(destination[index], repacked_message[k]);
3379 ++index;
3380 }
3381 for (; index < destination.size(); ++index) {
3382 ASSERT_EQ(destination[index], 67u);
3383 }
3384 }
3385 }
Austin Schuhf2d0e682022-10-16 14:20:58 -07003386 }
3387}
Austin Schuhfa30c352022-10-16 11:12:02 -07003388
Austin Schuhc243b422020-10-11 15:35:08 -07003389} // namespace testing
3390} // namespace logger
3391} // namespace aos