blob: b97579a7ad0b8af192cb8fdb9c245a21e07db15f [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
Alexei Strots01395492023-03-20 13:59:56 -07004#include <filesystem>
Austin Schuhfa30c352022-10-16 11:12:02 -07005#include <random>
Austin Schuhe243aaf2020-10-11 15:46:02 -07006#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07007
Austin Schuh99f7c6a2024-06-25 22:07:44 -07008#include "absl/flags/flag.h"
9#include "absl/flags/reflection.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070010#include "absl/strings/escaping.h"
Philipp Schrader790cb542023-07-05 21:06:52 -070011#include "external/com_github_google_flatbuffers/src/annotated_binary_text_gen.h"
12#include "external/com_github_google_flatbuffers/src/binary_annotator.h"
13#include "flatbuffers/reflection_generated.h"
Philipp Schrader790cb542023-07-05 21:06:52 -070014#include "gtest/gtest.h"
15
Austin Schuhc41603c2020-10-11 16:17:37 -070016#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -070017#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -080018#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070019#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070020#include "aos/json_to_flatbuffer.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070021#include "aos/testing/path.h"
22#include "aos/testing/random_seed.h"
Austin Schuhc243b422020-10-11 15:35:08 -070023#include "aos/testing/tmpdir.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070024#include "aos/util/file.h"
Austin Schuhc243b422020-10-11 15:35:08 -070025
Stephan Pleinesf63bde82024-01-13 15:59:33 -080026namespace aos::logger::testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070027namespace chrono = std::chrono;
Austin Schuhf2d0e682022-10-16 14:20:58 -070028using aos::message_bridge::RemoteMessage;
Eric Schmiedebergb38477e2022-12-02 16:08:04 -070029using aos::testing::ArtifactPath;
Austin Schuhc243b422020-10-11 15:35:08 -070030
Austin Schuhd863e6e2022-10-16 15:44:50 -070031// Adapter class to make it easy to test DetachedBufferWriter without adding
32// test only boilerplate to DetachedBufferWriter.
Alexei Strots15c22b12023-04-04 16:27:17 -070033class TestDetachedBufferWriter : public FileBackend,
34 public DetachedBufferWriter {
Austin Schuhd863e6e2022-10-16 15:44:50 -070035 public:
Austin Schuh48d10d62022-10-16 22:19:23 -070036 // Pick a max size that is rather conservative.
37 static constexpr size_t kMaxMessageSize = 128 * 1024;
Austin Schuhd863e6e2022-10-16 15:44:50 -070038 TestDetachedBufferWriter(std::string_view filename)
colleen61276dc2023-06-01 09:23:29 -070039 : FileBackend("/", false),
Alexei Strots15c22b12023-04-04 16:27:17 -070040 DetachedBufferWriter(FileBackend::RequestFile(filename),
41 std::make_unique<DummyEncoder>(kMaxMessageSize)) {}
Austin Schuhd863e6e2022-10-16 15:44:50 -070042 void WriteSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
43 QueueSpan(absl::Span<const uint8_t>(buffer.data(), buffer.size()));
44 }
Austin Schuh7ef11a42023-02-04 17:15:12 -080045 void QueueSpan(absl::Span<const uint8_t> buffer) {
46 DataEncoder::SpanCopier coppier(buffer);
47 CopyMessage(&coppier, monotonic_clock::now());
48 }
Austin Schuhd863e6e2022-10-16 15:44:50 -070049};
50
Austin Schuhe243aaf2020-10-11 15:46:02 -070051// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070052template <typename T>
53SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
54 const std::string_view data) {
55 flatbuffers::FlatBufferBuilder fbb;
56 fbb.ForceDefaults(true);
57 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
58 return fbb.Release();
59}
60
Austin Schuhe243aaf2020-10-11 15:46:02 -070061// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070062TEST(SpanReaderTest, ReadWrite) {
63 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
64 unlink(logfile.c_str());
65
66 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080067 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070068 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080069 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070070
71 {
Austin Schuhd863e6e2022-10-16 15:44:50 -070072 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -080073 writer.QueueSpan(m1.span());
74 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070075 }
76
77 SpanReader reader(logfile);
78
79 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhcf5f6442021-07-06 10:43:28 -070080 EXPECT_EQ(reader.PeekMessage(), m1.span());
81 EXPECT_EQ(reader.PeekMessage(), m1.span());
Austin Schuhadd6eb32020-11-09 21:24:26 -080082 EXPECT_EQ(reader.ReadMessage(), m1.span());
83 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhcf5f6442021-07-06 10:43:28 -070084 EXPECT_EQ(reader.PeekMessage(), absl::Span<const uint8_t>());
Austin Schuhc243b422020-10-11 15:35:08 -070085 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
86}
87
Austin Schuhe243aaf2020-10-11 15:46:02 -070088// Tests that we can actually parse the resulting messages at a basic level
89// through MessageReader.
90TEST(MessageReaderTest, ReadWrite) {
91 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
92 unlink(logfile.c_str());
93
94 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
95 JsonToSizedFlatbuffer<LogFileHeader>(
96 R"({ "max_out_of_order_duration": 100000000 })");
97 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
98 JsonToSizedFlatbuffer<MessageHeader>(
99 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
100 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
101 JsonToSizedFlatbuffer<MessageHeader>(
102 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
103
104 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700105 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800106 writer.QueueSpan(config.span());
107 writer.QueueSpan(m1.span());
108 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -0700109 }
110
111 MessageReader reader(logfile);
112
113 EXPECT_EQ(reader.filename(), logfile);
114
115 EXPECT_EQ(
116 reader.max_out_of_order_duration(),
117 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
118 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
119 EXPECT_TRUE(reader.ReadMessage());
120 EXPECT_EQ(reader.newest_timestamp(),
121 monotonic_clock::time_point(chrono::nanoseconds(1)));
122 EXPECT_TRUE(reader.ReadMessage());
123 EXPECT_EQ(reader.newest_timestamp(),
124 monotonic_clock::time_point(chrono::nanoseconds(2)));
125 EXPECT_FALSE(reader.ReadMessage());
126}
127
Austin Schuh32f68492020-11-08 21:45:51 -0800128// Tests that we explode when messages are too far out of order.
129TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
130 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
131 unlink(logfile0.c_str());
132
133 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
134 JsonToSizedFlatbuffer<LogFileHeader>(
135 R"({
136 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800137 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800138 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
139 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
140 "parts_index": 0
141})");
142
143 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
144 JsonToSizedFlatbuffer<MessageHeader>(
145 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
146 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
147 JsonToSizedFlatbuffer<MessageHeader>(
148 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
149 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
150 JsonToSizedFlatbuffer<MessageHeader>(
151 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
152
153 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700154 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800155 writer.QueueSpan(config0.span());
156 writer.QueueSpan(m1.span());
157 writer.QueueSpan(m2.span());
158 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800159 }
Alexei Strots01395492023-03-20 13:59:56 -0700160 ASSERT_TRUE(std::filesystem::exists(logfile0)) << logfile0;
Austin Schuh32f68492020-11-08 21:45:51 -0800161
162 const std::vector<LogFile> parts = SortParts({logfile0});
163
164 PartsMessageReader reader(parts[0].parts[0]);
165
166 EXPECT_TRUE(reader.ReadMessage());
167 EXPECT_TRUE(reader.ReadMessage());
168 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
169}
170
Austin Schuhc41603c2020-10-11 16:17:37 -0700171// Tests that we can transparently re-assemble part files with a
172// PartsMessageReader.
173TEST(PartsMessageReaderTest, ReadWrite) {
174 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
175 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
176 unlink(logfile0.c_str());
177 unlink(logfile1.c_str());
178
179 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
180 JsonToSizedFlatbuffer<LogFileHeader>(
181 R"({
182 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800183 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700184 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
185 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
186 "parts_index": 0
187})");
188 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
189 JsonToSizedFlatbuffer<LogFileHeader>(
190 R"({
191 "max_out_of_order_duration": 200000000,
192 "monotonic_start_time": 0,
193 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800194 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700195 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
196 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
197 "parts_index": 1
198})");
199
200 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
201 JsonToSizedFlatbuffer<MessageHeader>(
202 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
203 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
204 JsonToSizedFlatbuffer<MessageHeader>(
205 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
206
207 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700208 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800209 writer.QueueSpan(config0.span());
210 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700211 }
212 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700213 TestDetachedBufferWriter writer(logfile1);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800214 writer.QueueSpan(config1.span());
215 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700216 }
217
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700218 // When parts are sorted, we choose the highest max out of order duration for
219 // all parts with the same part uuid.
Austin Schuhc41603c2020-10-11 16:17:37 -0700220 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
221
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700222 EXPECT_EQ(parts.size(), 1);
223 EXPECT_EQ(parts[0].parts.size(), 1);
224
Austin Schuhc41603c2020-10-11 16:17:37 -0700225 PartsMessageReader reader(parts[0].parts[0]);
226
227 EXPECT_EQ(reader.filename(), logfile0);
228
229 // Confirm that the timestamps track, and the filename also updates.
230 // Read the first message.
231 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700232 // Since config1 has higher max out of order duration, that will be used to
233 // read partfiles with same part uuid, i.e logfile0 and logfile1.
Austin Schuhc41603c2020-10-11 16:17:37 -0700234 EXPECT_EQ(
235 reader.max_out_of_order_duration(),
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700236 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuhc41603c2020-10-11 16:17:37 -0700237 EXPECT_TRUE(reader.ReadMessage());
238 EXPECT_EQ(reader.filename(), logfile0);
239 EXPECT_EQ(reader.newest_timestamp(),
240 monotonic_clock::time_point(chrono::nanoseconds(1)));
241 EXPECT_EQ(
242 reader.max_out_of_order_duration(),
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700243 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuhc41603c2020-10-11 16:17:37 -0700244
245 // Read the second message.
246 EXPECT_TRUE(reader.ReadMessage());
247 EXPECT_EQ(reader.filename(), logfile1);
248 EXPECT_EQ(reader.newest_timestamp(),
249 monotonic_clock::time_point(chrono::nanoseconds(2)));
250 EXPECT_EQ(
251 reader.max_out_of_order_duration(),
252 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
253
254 // And then confirm that reading again returns no message.
255 EXPECT_FALSE(reader.ReadMessage());
256 EXPECT_EQ(reader.filename(), logfile1);
257 EXPECT_EQ(
258 reader.max_out_of_order_duration(),
259 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800260 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700261
262 // Verify that the parts metadata has the correct max out of order duration.
263 EXPECT_EQ(
264 parts[0].parts[0].max_out_of_order_duration,
265 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuhc41603c2020-10-11 16:17:37 -0700266}
Austin Schuh32f68492020-11-08 21:45:51 -0800267
Austin Schuh1be0ce42020-11-29 22:43:26 -0800268// Tests that Message's operator < works as expected.
269TEST(MessageTest, Sorting) {
270 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
271
272 Message m1{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700273 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700274 .timestamp =
275 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
Austin Schuh48507722021-07-17 17:29:24 -0700276 .monotonic_remote_boot = 0xffffff,
277 .monotonic_timestamp_boot = 0xffffff,
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700278 .header = nullptr,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700279 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800280 Message m2{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700281 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700282 .timestamp =
283 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
Austin Schuh48507722021-07-17 17:29:24 -0700284 .monotonic_remote_boot = 0xffffff,
285 .monotonic_timestamp_boot = 0xffffff,
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700286 .header = nullptr,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700287 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800288
289 EXPECT_LT(m1, m2);
290 EXPECT_GE(m2, m1);
291
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700292 m1.timestamp.time = e;
293 m2.timestamp.time = e;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800294
295 m1.channel_index = 1;
296 m2.channel_index = 2;
297
298 EXPECT_LT(m1, m2);
299 EXPECT_GE(m2, m1);
300
301 m1.channel_index = 0;
302 m2.channel_index = 0;
Austin Schuh58646e22021-08-23 23:51:46 -0700303 m1.queue_index.index = 0u;
304 m2.queue_index.index = 1u;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800305
306 EXPECT_LT(m1, m2);
307 EXPECT_GE(m2, m1);
308}
309
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800310aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
311 const aos::FlatbufferDetachedBuffer<Configuration> &config,
312 const std::string_view json) {
313 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh1034a832021-03-31 21:53:26 -0700314 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800315 flatbuffers::Offset<Configuration> config_offset =
316 aos::CopyFlatBuffer(config, &fbb);
317 LogFileHeader::Builder header_builder(fbb);
318 header_builder.add_configuration(config_offset);
319 fbb.Finish(header_builder.Finish());
320 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
321
322 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
323 JsonToFlatbuffer<LogFileHeader>(json));
324 CHECK(header_updates.Verify());
325 flatbuffers::FlatBufferBuilder fbb2;
Austin Schuh1034a832021-03-31 21:53:26 -0700326 fbb2.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800327 fbb2.FinishSizePrefixed(
328 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
329 return fbb2.Release();
330}
331
// Selects which flavor of configuration a SortingElementTest fixture builds.
enum class SortingElementConfig {
  // Channels only; no nodes are listed in the configuration.
  kSingleNode,
  // Channels spread across the nodes pi1, pi2 and pi3.
  kMultiNode,
};
339
340template <SortingElementConfig sorting_element_config =
341 SortingElementConfig::kMultiNode>
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800342class SortingElementTest : public ::testing::Test {
343 public:
344 SortingElementTest()
345 : config_(JsonToFlatbuffer<Configuration>(
Philipp Schrader416505b2024-03-28 11:59:45 -0700346 sorting_element_config == SortingElementConfig::kSingleNode ?
347 R"({
348 "channels": [
349 {
350 "name": "/a",
351 "type": "aos.logger.testing.TestMessage"
352 },
353 {
354 "name": "/b",
355 "type": "aos.logger.testing.TestMessage"
356 },
357 {
358 "name": "/c",
359 "type": "aos.logger.testing.TestMessage"
360 },
361 {
362 "name": "/d",
363 "type": "aos.logger.testing.TestMessage"
364 }
365 ]
366}
367)"
368 :
369 R"({
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800370 "channels": [
371 {
372 "name": "/a",
373 "type": "aos.logger.testing.TestMessage",
374 "source_node": "pi1",
375 "destination_nodes": [
376 {
377 "name": "pi2"
378 },
379 {
380 "name": "pi3"
381 }
382 ]
383 },
384 {
385 "name": "/b",
386 "type": "aos.logger.testing.TestMessage",
387 "source_node": "pi1"
388 },
389 {
390 "name": "/c",
391 "type": "aos.logger.testing.TestMessage",
392 "source_node": "pi1"
Austin Schuh48507722021-07-17 17:29:24 -0700393 },
394 {
395 "name": "/d",
396 "type": "aos.logger.testing.TestMessage",
397 "source_node": "pi2",
398 "destination_nodes": [
399 {
400 "name": "pi1"
401 }
402 ]
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800403 }
404 ],
405 "nodes": [
406 {
407 "name": "pi1"
408 },
409 {
410 "name": "pi2"
411 },
412 {
413 "name": "pi3"
414 }
415 ]
416}
417)")),
Philipp Schrader416505b2024-03-28 11:59:45 -0700418 config0_(MakeHeader(
419 config_, sorting_element_config == SortingElementConfig::kSingleNode
420 ?
421 R"({
422 /* 100ms */
423 "max_out_of_order_duration": 100000000,
424 "node": {
425 "name": "pi1"
426 },
427 "logger_node": {
428 "name": "pi1"
429 },
430 "monotonic_start_time": 1000000,
431 "realtime_start_time": 1000000000000,
432 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
433 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
434 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
435 "boot_uuids": [
436 "1d782c63-b3c7-466e-bea9-a01308b43333",
437 ],
438 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
439 "parts_index": 0
440})"
441 :
442 R"({
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800443 /* 100ms */
444 "max_out_of_order_duration": 100000000,
445 "node": {
446 "name": "pi1"
447 },
448 "logger_node": {
449 "name": "pi1"
450 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800451 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800452 "realtime_start_time": 1000000000000,
453 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700454 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
455 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
456 "boot_uuids": [
457 "1d782c63-b3c7-466e-bea9-a01308b43333",
458 "",
459 ""
460 ],
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800461 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
462 "parts_index": 0
463})")),
464 config1_(MakeHeader(config_,
465 R"({
466 /* 100ms */
467 "max_out_of_order_duration": 100000000,
468 "node": {
469 "name": "pi1"
470 },
471 "logger_node": {
472 "name": "pi1"
473 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800474 "monotonic_start_time": 1000000,
475 "realtime_start_time": 1000000000000,
476 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700477 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
478 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
479 "boot_uuids": [
480 "1d782c63-b3c7-466e-bea9-a01308b43333",
481 "",
482 ""
483 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800484 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
485 "parts_index": 0
486})")),
487 config2_(MakeHeader(config_,
488 R"({
489 /* 100ms */
490 "max_out_of_order_duration": 100000000,
491 "node": {
492 "name": "pi2"
493 },
494 "logger_node": {
495 "name": "pi2"
496 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800497 "monotonic_start_time": 0,
498 "realtime_start_time": 1000000000000,
Austin Schuh48507722021-07-17 17:29:24 -0700499 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
500 "logger_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
501 "boot_uuids": [
502 "",
503 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
504 ""
505 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800506 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
507 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
508 "parts_index": 0
509})")),
510 config3_(MakeHeader(config_,
511 R"({
512 /* 100ms */
513 "max_out_of_order_duration": 100000000,
514 "node": {
515 "name": "pi1"
516 },
517 "logger_node": {
518 "name": "pi1"
519 },
520 "monotonic_start_time": 2000000,
521 "realtime_start_time": 1000000000,
522 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
Austin Schuh48507722021-07-17 17:29:24 -0700523 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
524 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
525 "boot_uuids": [
526 "1d782c63-b3c7-466e-bea9-a01308b43333",
527 "",
528 ""
529 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800530 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800531 "parts_index": 0
Austin Schuh8bf1e632021-01-02 22:41:04 -0800532})")),
533 config4_(MakeHeader(config_,
534 R"({
535 /* 100ms */
536 "max_out_of_order_duration": 100000000,
537 "node": {
538 "name": "pi2"
539 },
540 "logger_node": {
541 "name": "pi1"
542 },
543 "monotonic_start_time": 2000000,
544 "realtime_start_time": 1000000000,
545 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
546 "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
Austin Schuh48507722021-07-17 17:29:24 -0700547 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
548 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
549 "boot_uuids": [
550 "1d782c63-b3c7-466e-bea9-a01308b43333",
551 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
552 ""
553 ],
Austin Schuh8bf1e632021-01-02 22:41:04 -0800554 "parts_index": 0
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800555})")) {
556 unlink(logfile0_.c_str());
557 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800558 unlink(logfile2_.c_str());
Austin Schuh48507722021-07-17 17:29:24 -0700559 unlink(logfile3_.c_str());
Austin Schuh921b2562021-07-31 19:37:03 -0700560 queue_index_.resize(config_.message().channels()->size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800561 }
562
563 protected:
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800564 flatbuffers::DetachedBuffer MakeLogMessage(
565 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
566 int value) {
567 flatbuffers::FlatBufferBuilder message_fbb;
568 message_fbb.ForceDefaults(true);
569 TestMessage::Builder test_message_builder(message_fbb);
570 test_message_builder.add_value(value);
571 message_fbb.Finish(test_message_builder.Finish());
572
573 aos::Context context;
574 context.monotonic_event_time = monotonic_now;
575 context.realtime_event_time = aos::realtime_clock::epoch() +
576 chrono::seconds(1000) +
577 monotonic_now.time_since_epoch();
Austin Schuh921b2562021-07-31 19:37:03 -0700578 CHECK_LT(static_cast<size_t>(channel_index), queue_index_.size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800579 context.queue_index = queue_index_[channel_index];
580 context.size = message_fbb.GetSize();
581 context.data = message_fbb.GetBufferPointer();
582
583 ++queue_index_[channel_index];
584
585 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh53e3a812021-11-03 16:57:23 -0700586 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800587 fbb.FinishSizePrefixed(
588 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
589
590 return fbb.Release();
591 }
592
593 flatbuffers::DetachedBuffer MakeTimestampMessage(
594 const aos::monotonic_clock::time_point sender_monotonic_now,
Austin Schuh8bf1e632021-01-02 22:41:04 -0800595 int channel_index, chrono::nanoseconds receiver_monotonic_offset,
596 monotonic_clock::time_point monotonic_timestamp_time =
597 monotonic_clock::min_time) {
598 const monotonic_clock::time_point monotonic_sent_time =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800599 sender_monotonic_now + receiver_monotonic_offset;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800600
601 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800602 fbb.ForceDefaults(true);
603
604 logger::MessageHeader::Builder message_header_builder(fbb);
605
606 message_header_builder.add_channel_index(channel_index);
607
608 message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
609 100);
610 message_header_builder.add_monotonic_sent_time(
611 monotonic_sent_time.time_since_epoch().count());
612 message_header_builder.add_realtime_sent_time(
613 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
614 monotonic_sent_time.time_since_epoch())
615 .time_since_epoch()
616 .count());
617
618 message_header_builder.add_monotonic_remote_time(
619 sender_monotonic_now.time_since_epoch().count());
620 message_header_builder.add_realtime_remote_time(
621 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
622 sender_monotonic_now.time_since_epoch())
623 .time_since_epoch()
624 .count());
625 message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
626 1);
627
628 if (monotonic_timestamp_time != monotonic_clock::min_time) {
629 message_header_builder.add_monotonic_timestamp_time(
630 monotonic_timestamp_time.time_since_epoch().count());
631 }
632
633 fbb.FinishSizePrefixed(message_header_builder.Finish());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800634 LOG(INFO) << aos::FlatbufferToJson(
635 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
636 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
637
638 return fbb.Release();
639 }
640
641 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
642 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800643 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh48507722021-07-17 17:29:24 -0700644 const std::string logfile3_ = aos::testing::TestTmpDir() + "/log3.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800645
646 const aos::FlatbufferDetachedBuffer<Configuration> config_;
647 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
648 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800649 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
650 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800651 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800652
653 std::vector<uint32_t> queue_index_;
654};
655
Philipp Schrader416505b2024-03-28 11:59:45 -0700656using MessageSorterTest = SortingElementTest<SortingElementConfig::kMultiNode>;
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700657using MessageSorterDeathTest = MessageSorterTest;
Philipp Schrader416505b2024-03-28 11:59:45 -0700658using PartsMergerTest = SortingElementTest<SortingElementConfig::kMultiNode>;
659using TimestampMapperTest =
660 SortingElementTest<SortingElementConfig::kMultiNode>;
661using SingleNodeTimestampMapperTest =
662 SortingElementTest<SortingElementConfig::kSingleNode>;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800663
664// Tests that we can pull messages out of a log sorted in order.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700665TEST_F(MessageSorterTest, Pull) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800666 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
667 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700668 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800669 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700670 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800671 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700672 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800673 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700674 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800675 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700676 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800677 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
678 }
679
680 const std::vector<LogFile> parts = SortParts({logfile0_});
681
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700682 MessageSorter message_sorter(parts[0].parts[0]);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800683
684 // Confirm we aren't sorted until any time until the message is popped.
685 // Peeking shouldn't change the sorted until time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700686 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800687
688 std::deque<Message> output;
689
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700690 ASSERT_TRUE(message_sorter.Front() != nullptr);
691 output.emplace_back(std::move(*message_sorter.Front()));
692 message_sorter.PopFront();
693 EXPECT_EQ(message_sorter.sorted_until(), e + chrono::milliseconds(1900));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800694
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700695 ASSERT_TRUE(message_sorter.Front() != nullptr);
696 output.emplace_back(std::move(*message_sorter.Front()));
697 message_sorter.PopFront();
698 EXPECT_EQ(message_sorter.sorted_until(), e + chrono::milliseconds(1900));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800699
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700700 ASSERT_TRUE(message_sorter.Front() != nullptr);
701 output.emplace_back(std::move(*message_sorter.Front()));
702 message_sorter.PopFront();
703 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::max_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800704
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700705 ASSERT_TRUE(message_sorter.Front() != nullptr);
706 output.emplace_back(std::move(*message_sorter.Front()));
707 message_sorter.PopFront();
708 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::max_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800709
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700710 ASSERT_TRUE(message_sorter.Front() == nullptr);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800711
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700712 EXPECT_EQ(output[0].timestamp.boot, 0);
713 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
714 EXPECT_EQ(output[1].timestamp.boot, 0);
715 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1000));
716 EXPECT_EQ(output[2].timestamp.boot, 0);
717 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1901));
718 EXPECT_EQ(output[3].timestamp.boot, 0);
719 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800720}
721
Austin Schuhb000de62020-12-03 22:00:40 -0800722// Tests that we can pull messages out of a log sorted in order.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700723TEST_F(MessageSorterTest, WayBeforeStart) {
Austin Schuhb000de62020-12-03 22:00:40 -0800724 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
725 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700726 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhb000de62020-12-03 22:00:40 -0800727 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700728 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800729 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700730 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800731 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700732 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800733 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700734 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800735 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700736 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800737 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
738 }
739
740 const std::vector<LogFile> parts = SortParts({logfile0_});
741
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700742 MessageSorter message_sorter(parts[0].parts[0]);
Austin Schuhb000de62020-12-03 22:00:40 -0800743
744 // Confirm we aren't sorted until any time until the message is popped.
745 // Peeking shouldn't change the sorted until time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700746 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
Austin Schuhb000de62020-12-03 22:00:40 -0800747
748 std::deque<Message> output;
749
750 for (monotonic_clock::time_point t :
751 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
752 e + chrono::milliseconds(1900), monotonic_clock::max_time,
753 monotonic_clock::max_time}) {
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700754 ASSERT_TRUE(message_sorter.Front() != nullptr);
755 output.emplace_back(std::move(*message_sorter.Front()));
756 message_sorter.PopFront();
757 EXPECT_EQ(message_sorter.sorted_until(), t);
Austin Schuhb000de62020-12-03 22:00:40 -0800758 }
759
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700760 ASSERT_TRUE(message_sorter.Front() == nullptr);
Austin Schuhb000de62020-12-03 22:00:40 -0800761
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700762 EXPECT_EQ(output[0].timestamp.boot, 0u);
763 EXPECT_EQ(output[0].timestamp.time, e - chrono::milliseconds(1000));
764 EXPECT_EQ(output[1].timestamp.boot, 0u);
765 EXPECT_EQ(output[1].timestamp.time, e - chrono::milliseconds(500));
766 EXPECT_EQ(output[2].timestamp.boot, 0u);
767 EXPECT_EQ(output[2].timestamp.time, e - chrono::milliseconds(10));
768 EXPECT_EQ(output[3].timestamp.boot, 0u);
769 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(1901));
770 EXPECT_EQ(output[4].timestamp.boot, 0u);
771 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuhb000de62020-12-03 22:00:40 -0800772}
773
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800774// Tests that messages too far out of order trigger death.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700775TEST_F(MessageSorterDeathTest, Pull) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800776 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
777 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700778 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800779 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700780 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800781 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700782 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800783 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700784 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800785 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
786 // The following message is too far out of order and will trigger the CHECK.
Austin Schuhd863e6e2022-10-16 15:44:50 -0700787 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800788 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
789 }
790
791 const std::vector<LogFile> parts = SortParts({logfile0_});
792
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700793 MessageSorter message_sorter(parts[0].parts[0]);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800794
795 // Confirm we aren't sorted until any time until the message is popped.
796 // Peeking shouldn't change the sorted until time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700797 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800798 std::deque<Message> output;
799
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700800 ASSERT_TRUE(message_sorter.Front() != nullptr);
801 message_sorter.PopFront();
802 ASSERT_TRUE(message_sorter.Front() != nullptr);
803 ASSERT_TRUE(message_sorter.Front() != nullptr);
804 message_sorter.PopFront();
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800805
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700806 EXPECT_DEATH({ message_sorter.Front(); },
Austin Schuh58646e22021-08-23 23:51:46 -0700807 "Max out of order of 100000000ns exceeded.");
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800808}
809
Austin Schuh8f52ed52020-11-30 23:12:39 -0800810// Tests that we can merge data from 2 separate files, including duplicate data.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700811TEST_F(PartsMergerTest, TwoFileMerger) {
Austin Schuh8f52ed52020-11-30 23:12:39 -0800812 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
813 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700814 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800815 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700816 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800817 writer1.QueueSpan(config1_.span());
818
Austin Schuhd863e6e2022-10-16 15:44:50 -0700819 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800820 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700821 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800822 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
823
Austin Schuhd863e6e2022-10-16 15:44:50 -0700824 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800825 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700826 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800827 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
828
829 // Make a duplicate!
830 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
831 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
832 writer0.QueueSpan(msg.span());
833 writer1.QueueSpan(msg.span());
834
Austin Schuhd863e6e2022-10-16 15:44:50 -0700835 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800836 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
837 }
838
839 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -0700840 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -0800841 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800842
Austin Schuh63097262023-08-16 17:04:29 -0700843 PartsMerger merger(
844 log_files.SelectParts("pi1", 0,
845 {StoredDataType::DATA, StoredDataType::TIMESTAMPS,
846 StoredDataType::REMOTE_TIMESTAMPS}));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800847
848 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
849
850 std::deque<Message> output;
851
852 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
853 ASSERT_TRUE(merger.Front() != nullptr);
854 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
855
856 output.emplace_back(std::move(*merger.Front()));
857 merger.PopFront();
858 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
859
860 ASSERT_TRUE(merger.Front() != nullptr);
861 output.emplace_back(std::move(*merger.Front()));
862 merger.PopFront();
863 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
864
865 ASSERT_TRUE(merger.Front() != nullptr);
866 output.emplace_back(std::move(*merger.Front()));
867 merger.PopFront();
868 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
869
870 ASSERT_TRUE(merger.Front() != nullptr);
871 output.emplace_back(std::move(*merger.Front()));
872 merger.PopFront();
873 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
874
875 ASSERT_TRUE(merger.Front() != nullptr);
876 output.emplace_back(std::move(*merger.Front()));
877 merger.PopFront();
878 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
879
880 ASSERT_TRUE(merger.Front() != nullptr);
881 output.emplace_back(std::move(*merger.Front()));
882 merger.PopFront();
883 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
884
885 ASSERT_TRUE(merger.Front() == nullptr);
886
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700887 EXPECT_EQ(output[0].timestamp.boot, 0u);
888 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
889 EXPECT_EQ(output[1].timestamp.boot, 0u);
890 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1001));
891 EXPECT_EQ(output[2].timestamp.boot, 0u);
892 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1002));
893 EXPECT_EQ(output[3].timestamp.boot, 0u);
894 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
895 EXPECT_EQ(output[4].timestamp.boot, 0u);
896 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(3000));
897 EXPECT_EQ(output[5].timestamp.boot, 0u);
898 EXPECT_EQ(output[5].timestamp.time, e + chrono::milliseconds(3002));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800899}
900
Austin Schuh8bf1e632021-01-02 22:41:04 -0800901// Tests that we can merge timestamps with various combinations of
902// monotonic_timestamp_time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700903TEST_F(PartsMergerTest, TwoFileTimestampMerger) {
Austin Schuh8bf1e632021-01-02 22:41:04 -0800904 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
905 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700906 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800907 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700908 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800909 writer1.QueueSpan(config1_.span());
910
911 // Neither has it.
912 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700913 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800914 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700915 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800916 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
917
918 // First only has it.
919 MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700920 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800921 e + chrono::milliseconds(1001), 0, chrono::seconds(100),
922 e + chrono::nanoseconds(971)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700923 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800924 e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
925
926 // Second only has it.
927 MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700928 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800929 e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700930 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800931 e + chrono::milliseconds(1002), 0, chrono::seconds(100),
932 e + chrono::nanoseconds(972)));
933
934 // Both have it.
935 MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700936 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800937 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
938 e + chrono::nanoseconds(973)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700939 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800940 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
941 e + chrono::nanoseconds(973)));
942 }
943
944 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -0700945 LogFilesContainer log_files(parts);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800946 ASSERT_EQ(parts.size(), 1u);
947
Austin Schuh63097262023-08-16 17:04:29 -0700948 PartsMerger merger(
949 log_files.SelectParts("pi1", 0,
950 {StoredDataType::DATA, StoredDataType::TIMESTAMPS,
951 StoredDataType::REMOTE_TIMESTAMPS}));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800952
953 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
954
955 std::deque<Message> output;
956
957 for (int i = 0; i < 4; ++i) {
958 ASSERT_TRUE(merger.Front() != nullptr);
959 output.emplace_back(std::move(*merger.Front()));
960 merger.PopFront();
961 }
962 ASSERT_TRUE(merger.Front() == nullptr);
963
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700964 EXPECT_EQ(output[0].timestamp.boot, 0u);
965 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700966 EXPECT_FALSE(output[0].header->has_monotonic_timestamp_time);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700967
968 EXPECT_EQ(output[1].timestamp.boot, 0u);
969 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700970 EXPECT_TRUE(output[1].header->has_monotonic_timestamp_time);
971 EXPECT_EQ(output[1].header->monotonic_timestamp_time,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700972 monotonic_clock::time_point(std::chrono::nanoseconds(971)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700973
974 EXPECT_EQ(output[2].timestamp.boot, 0u);
975 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700976 EXPECT_TRUE(output[2].header->has_monotonic_timestamp_time);
977 EXPECT_EQ(output[2].header->monotonic_timestamp_time,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700978 monotonic_clock::time_point(std::chrono::nanoseconds(972)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700979
980 EXPECT_EQ(output[3].timestamp.boot, 0u);
981 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700982 EXPECT_TRUE(output[3].header->has_monotonic_timestamp_time);
983 EXPECT_EQ(output[3].header->monotonic_timestamp_time,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700984 monotonic_clock::time_point(std::chrono::nanoseconds(973)));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800985}
986
Austin Schuhd2f96102020-12-01 20:27:29 -0800987// Tests that we can match timestamps on delivered messages.
988TEST_F(TimestampMapperTest, ReadNode0First) {
989 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
990 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700991 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800992 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700993 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800994 writer1.QueueSpan(config2_.span());
995
Austin Schuhd863e6e2022-10-16 15:44:50 -0700996 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800997 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700998 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800999 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1000
Austin Schuhd863e6e2022-10-16 15:44:50 -07001001 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001002 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001003 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001004 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1005
Austin Schuhd863e6e2022-10-16 15:44:50 -07001006 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001007 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001008 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001009 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1010 }
1011
1012 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001013 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001014 ASSERT_EQ(parts[0].logger_node, "pi1");
1015 ASSERT_EQ(parts[1].logger_node, "pi2");
1016
Austin Schuh79b30942021-01-24 22:32:21 -08001017 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001018
Austin Schuh63097262023-08-16 17:04:29 -07001019 TimestampMapper mapper0("pi1", log_files,
1020 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001021 mapper0.set_timestamp_callback(
1022 [&](TimestampedMessage *) { ++mapper0_count; });
1023 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001024 TimestampMapper mapper1("pi2", log_files,
1025 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001026 mapper1.set_timestamp_callback(
1027 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001028
1029 mapper0.AddPeer(&mapper1);
1030 mapper1.AddPeer(&mapper0);
1031
1032 {
1033 std::deque<TimestampedMessage> output0;
1034
Austin Schuh79b30942021-01-24 22:32:21 -08001035 EXPECT_EQ(mapper0_count, 0u);
1036 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001037 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001038 EXPECT_EQ(mapper0_count, 1u);
1039 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001040 output0.emplace_back(std::move(*mapper0.Front()));
1041 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001042 EXPECT_TRUE(mapper0.started());
Austin Schuh79b30942021-01-24 22:32:21 -08001043 EXPECT_EQ(mapper0_count, 1u);
1044 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001045
1046 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001047 EXPECT_EQ(mapper0_count, 2u);
1048 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001049 output0.emplace_back(std::move(*mapper0.Front()));
1050 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001051 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001052
1053 ASSERT_TRUE(mapper0.Front() != nullptr);
1054 output0.emplace_back(std::move(*mapper0.Front()));
1055 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001056 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001057
Austin Schuh79b30942021-01-24 22:32:21 -08001058 EXPECT_EQ(mapper0_count, 3u);
1059 EXPECT_EQ(mapper1_count, 0u);
1060
Austin Schuhd2f96102020-12-01 20:27:29 -08001061 ASSERT_TRUE(mapper0.Front() == nullptr);
1062
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001063 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1064 EXPECT_EQ(output0[0].monotonic_event_time.time,
1065 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001066 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001067
1068 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1069 EXPECT_EQ(output0[1].monotonic_event_time.time,
1070 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001071 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001072
1073 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1074 EXPECT_EQ(output0[2].monotonic_event_time.time,
1075 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001076 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001077 }
1078
1079 {
1080 SCOPED_TRACE("Trying node1 now");
1081 std::deque<TimestampedMessage> output1;
1082
Austin Schuh79b30942021-01-24 22:32:21 -08001083 EXPECT_EQ(mapper0_count, 3u);
1084 EXPECT_EQ(mapper1_count, 0u);
1085
Austin Schuhd2f96102020-12-01 20:27:29 -08001086 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001087 EXPECT_EQ(mapper0_count, 3u);
1088 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001089 output1.emplace_back(std::move(*mapper1.Front()));
1090 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001091 EXPECT_TRUE(mapper1.started());
Austin Schuh79b30942021-01-24 22:32:21 -08001092 EXPECT_EQ(mapper0_count, 3u);
1093 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001094
1095 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001096 EXPECT_EQ(mapper0_count, 3u);
1097 EXPECT_EQ(mapper1_count, 2u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001098 output1.emplace_back(std::move(*mapper1.Front()));
1099 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001100 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001101
1102 ASSERT_TRUE(mapper1.Front() != nullptr);
1103 output1.emplace_back(std::move(*mapper1.Front()));
1104 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001105 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001106
Austin Schuh79b30942021-01-24 22:32:21 -08001107 EXPECT_EQ(mapper0_count, 3u);
1108 EXPECT_EQ(mapper1_count, 3u);
1109
Austin Schuhd2f96102020-12-01 20:27:29 -08001110 ASSERT_TRUE(mapper1.Front() == nullptr);
1111
Austin Schuh79b30942021-01-24 22:32:21 -08001112 EXPECT_EQ(mapper0_count, 3u);
1113 EXPECT_EQ(mapper1_count, 3u);
1114
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001115 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1116 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001117 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001118 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001119
1120 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1121 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001122 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001123 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001124
1125 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1126 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001127 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001128 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001129 }
1130}
1131
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001132// Tests that we filter messages using the channel filter callback
1133TEST_F(TimestampMapperTest, ReplayChannelsCallbackTest) {
1134 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1135 {
1136 TestDetachedBufferWriter writer0(logfile0_);
1137 writer0.QueueSpan(config0_.span());
1138 TestDetachedBufferWriter writer1(logfile1_);
1139 writer1.QueueSpan(config2_.span());
1140
1141 writer0.WriteSizedFlatbuffer(
1142 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1143 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1144 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1145
1146 writer0.WriteSizedFlatbuffer(
1147 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1148 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1149 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1150
1151 writer0.WriteSizedFlatbuffer(
1152 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1153 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1154 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1155 }
1156
1157 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001158 LogFilesContainer log_files(parts);
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001159 ASSERT_EQ(parts[0].logger_node, "pi1");
1160 ASSERT_EQ(parts[1].logger_node, "pi2");
1161
1162 // mapper0 will not provide any messages while mapper1 will provide all
1163 // messages due to the channel filter callbacks used
1164 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001165
Austin Schuh63097262023-08-16 17:04:29 -07001166 TimestampMapper mapper0("pi1", log_files,
1167 TimestampQueueStrategy::kQueueTogether);
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001168 mapper0.set_timestamp_callback(
1169 [&](TimestampedMessage *) { ++mapper0_count; });
1170 mapper0.set_replay_channels_callback(
1171 [&](const TimestampedMessage &) -> bool { return mapper0_count != 2; });
1172 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001173 TimestampMapper mapper1("pi2", log_files,
1174 TimestampQueueStrategy::kQueueTogether);
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001175 mapper1.set_timestamp_callback(
1176 [&](TimestampedMessage *) { ++mapper1_count; });
1177 mapper1.set_replay_channels_callback(
1178 [&](const TimestampedMessage &) -> bool { return mapper1_count != 2; });
1179
1180 mapper0.AddPeer(&mapper1);
1181 mapper1.AddPeer(&mapper0);
1182
1183 {
1184 std::deque<TimestampedMessage> output0;
1185
1186 EXPECT_EQ(mapper0_count, 0u);
1187 EXPECT_EQ(mapper1_count, 0u);
1188
1189 ASSERT_TRUE(mapper0.Front() != nullptr);
1190 EXPECT_EQ(mapper0_count, 1u);
1191 EXPECT_EQ(mapper1_count, 0u);
1192 output0.emplace_back(std::move(*mapper0.Front()));
1193 mapper0.PopFront();
1194
1195 EXPECT_TRUE(mapper0.started());
1196 EXPECT_EQ(mapper0_count, 1u);
1197 EXPECT_EQ(mapper1_count, 0u);
1198
1199 // mapper0_count is now at 3 since the second message is not queued, but
1200 // timestamp_callback needs to be called everytime even if Front() does not
1201 // provide a message due to the replay_channels_callback.
1202 ASSERT_TRUE(mapper0.Front() != nullptr);
1203 EXPECT_EQ(mapper0_count, 3u);
1204 EXPECT_EQ(mapper1_count, 0u);
1205 output0.emplace_back(std::move(*mapper0.Front()));
1206 mapper0.PopFront();
1207
1208 EXPECT_TRUE(mapper0.started());
1209 EXPECT_EQ(mapper0_count, 3u);
1210 EXPECT_EQ(mapper1_count, 0u);
1211
1212 ASSERT_TRUE(mapper0.Front() == nullptr);
1213 EXPECT_TRUE(mapper0.started());
1214
1215 EXPECT_EQ(mapper0_count, 3u);
1216 EXPECT_EQ(mapper1_count, 0u);
1217
1218 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1219 EXPECT_EQ(output0[0].monotonic_event_time.time,
1220 e + chrono::milliseconds(1000));
1221 EXPECT_TRUE(output0[0].data != nullptr);
1222
1223 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1224 EXPECT_EQ(output0[1].monotonic_event_time.time,
1225 e + chrono::milliseconds(3000));
1226 EXPECT_TRUE(output0[1].data != nullptr);
1227 }
1228
1229 {
1230 SCOPED_TRACE("Trying node1 now");
1231 std::deque<TimestampedMessage> output1;
1232
1233 EXPECT_EQ(mapper0_count, 3u);
1234 EXPECT_EQ(mapper1_count, 0u);
1235
1236 ASSERT_TRUE(mapper1.Front() != nullptr);
1237 EXPECT_EQ(mapper0_count, 3u);
1238 EXPECT_EQ(mapper1_count, 1u);
1239 output1.emplace_back(std::move(*mapper1.Front()));
1240 mapper1.PopFront();
1241 EXPECT_TRUE(mapper1.started());
1242 EXPECT_EQ(mapper0_count, 3u);
1243 EXPECT_EQ(mapper1_count, 1u);
1244
1245 // mapper1_count is now at 3 since the second message is not queued, but
1246 // timestamp_callback needs to be called everytime even if Front() does not
1247 // provide a message due to the replay_channels_callback.
1248 ASSERT_TRUE(mapper1.Front() != nullptr);
1249 output1.emplace_back(std::move(*mapper1.Front()));
1250 mapper1.PopFront();
1251 EXPECT_TRUE(mapper1.started());
1252
1253 EXPECT_EQ(mapper0_count, 3u);
1254 EXPECT_EQ(mapper1_count, 3u);
1255
1256 ASSERT_TRUE(mapper1.Front() == nullptr);
1257
1258 EXPECT_EQ(mapper0_count, 3u);
1259 EXPECT_EQ(mapper1_count, 3u);
1260
1261 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1262 EXPECT_EQ(output1[0].monotonic_event_time.time,
1263 e + chrono::seconds(100) + chrono::milliseconds(1000));
1264 EXPECT_TRUE(output1[0].data != nullptr);
1265
1266 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1267 EXPECT_EQ(output1[1].monotonic_event_time.time,
1268 e + chrono::seconds(100) + chrono::milliseconds(3000));
1269 EXPECT_TRUE(output1[1].data != nullptr);
1270 }
1271}
Austin Schuh8bf1e632021-01-02 22:41:04 -08001272// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
1273// returned.
1274TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
1275 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1276 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001277 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001278 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001279 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001280 writer1.QueueSpan(config4_.span());
1281
Austin Schuhd863e6e2022-10-16 15:44:50 -07001282 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001283 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001284 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001285 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
1286 e + chrono::nanoseconds(971)));
1287
Austin Schuhd863e6e2022-10-16 15:44:50 -07001288 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001289 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001290 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001291 e + chrono::milliseconds(2000), 0, chrono::seconds(100),
1292 e + chrono::nanoseconds(5458)));
1293
Austin Schuhd863e6e2022-10-16 15:44:50 -07001294 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001295 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001296 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001297 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1298 }
1299
1300 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001301 LogFilesContainer log_files(parts);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001302 ASSERT_EQ(parts.size(), 1u);
1303
Austin Schuh79b30942021-01-24 22:32:21 -08001304 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001305 TimestampMapper mapper0("pi1", log_files,
1306 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001307 mapper0.set_timestamp_callback(
1308 [&](TimestampedMessage *) { ++mapper0_count; });
1309 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001310 TimestampMapper mapper1("pi2", log_files,
1311 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001312 mapper1.set_timestamp_callback(
1313 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh8bf1e632021-01-02 22:41:04 -08001314
1315 mapper0.AddPeer(&mapper1);
1316 mapper1.AddPeer(&mapper0);
1317
1318 {
1319 std::deque<TimestampedMessage> output0;
1320
1321 for (int i = 0; i < 3; ++i) {
1322 ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
1323 output0.emplace_back(std::move(*mapper0.Front()));
1324 mapper0.PopFront();
1325 }
1326
1327 ASSERT_TRUE(mapper0.Front() == nullptr);
1328
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001329 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1330 EXPECT_EQ(output0[0].monotonic_event_time.time,
1331 e + chrono::milliseconds(1000));
1332 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
1333 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
1334 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001335 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001336
1337 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1338 EXPECT_EQ(output0[1].monotonic_event_time.time,
1339 e + chrono::milliseconds(2000));
1340 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
1341 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
1342 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001343 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001344
1345 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1346 EXPECT_EQ(output0[2].monotonic_event_time.time,
1347 e + chrono::milliseconds(3000));
1348 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
1349 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
1350 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001351 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001352 }
1353
1354 {
1355 SCOPED_TRACE("Trying node1 now");
1356 std::deque<TimestampedMessage> output1;
1357
1358 for (int i = 0; i < 3; ++i) {
1359 ASSERT_TRUE(mapper1.Front() != nullptr);
1360 output1.emplace_back(std::move(*mapper1.Front()));
1361 mapper1.PopFront();
1362 }
1363
1364 ASSERT_TRUE(mapper1.Front() == nullptr);
1365
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001366 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1367 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001368 e + chrono::seconds(100) + chrono::milliseconds(1000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001369 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
1370 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001371 e + chrono::nanoseconds(971));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001372 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001373
1374 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1375 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001376 e + chrono::seconds(100) + chrono::milliseconds(2000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001377 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
1378 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001379 e + chrono::nanoseconds(5458));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001380 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001381
1382 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1383 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001384 e + chrono::seconds(100) + chrono::milliseconds(3000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001385 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
1386 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
1387 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001388 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001389 }
Austin Schuh79b30942021-01-24 22:32:21 -08001390
1391 EXPECT_EQ(mapper0_count, 3u);
1392 EXPECT_EQ(mapper1_count, 3u);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001393}
1394
Austin Schuhd2f96102020-12-01 20:27:29 -08001395// Tests that we can match timestamps on delivered messages. By doing this in
1396// the reverse order, the second node needs to queue data up from the first node
1397// to find the matching timestamp.
1398TEST_F(TimestampMapperTest, ReadNode1First) {
1399 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1400 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001401 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001402 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001403 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001404 writer1.QueueSpan(config2_.span());
1405
Austin Schuhd863e6e2022-10-16 15:44:50 -07001406 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001407 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001408 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001409 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1410
Austin Schuhd863e6e2022-10-16 15:44:50 -07001411 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001412 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001413 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001414 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1415
Austin Schuhd863e6e2022-10-16 15:44:50 -07001416 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001417 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001418 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001419 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1420 }
1421
1422 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001423 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001424
1425 ASSERT_EQ(parts[0].logger_node, "pi1");
1426 ASSERT_EQ(parts[1].logger_node, "pi2");
1427
Austin Schuh79b30942021-01-24 22:32:21 -08001428 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001429 TimestampMapper mapper0("pi1", log_files,
1430 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001431 mapper0.set_timestamp_callback(
1432 [&](TimestampedMessage *) { ++mapper0_count; });
1433 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001434 TimestampMapper mapper1("pi2", log_files,
1435 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001436 mapper1.set_timestamp_callback(
1437 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001438
1439 mapper0.AddPeer(&mapper1);
1440 mapper1.AddPeer(&mapper0);
1441
1442 {
1443 SCOPED_TRACE("Trying node1 now");
1444 std::deque<TimestampedMessage> output1;
1445
1446 ASSERT_TRUE(mapper1.Front() != nullptr);
1447 output1.emplace_back(std::move(*mapper1.Front()));
1448 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001449 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001450
1451 ASSERT_TRUE(mapper1.Front() != nullptr);
1452 output1.emplace_back(std::move(*mapper1.Front()));
1453 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001454 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001455
1456 ASSERT_TRUE(mapper1.Front() != nullptr);
1457 output1.emplace_back(std::move(*mapper1.Front()));
1458 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001459 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001460
1461 ASSERT_TRUE(mapper1.Front() == nullptr);
1462
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001463 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1464 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001465 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001466 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001467
1468 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1469 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001470 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001471 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001472
1473 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1474 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001475 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001476 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001477 }
1478
1479 {
1480 std::deque<TimestampedMessage> output0;
1481
1482 ASSERT_TRUE(mapper0.Front() != nullptr);
1483 output0.emplace_back(std::move(*mapper0.Front()));
1484 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001485 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001486
1487 ASSERT_TRUE(mapper0.Front() != nullptr);
1488 output0.emplace_back(std::move(*mapper0.Front()));
1489 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001490 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001491
1492 ASSERT_TRUE(mapper0.Front() != nullptr);
1493 output0.emplace_back(std::move(*mapper0.Front()));
1494 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001495 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001496
1497 ASSERT_TRUE(mapper0.Front() == nullptr);
1498
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001499 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1500 EXPECT_EQ(output0[0].monotonic_event_time.time,
1501 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001502 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001503
1504 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1505 EXPECT_EQ(output0[1].monotonic_event_time.time,
1506 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001507 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001508
1509 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1510 EXPECT_EQ(output0[2].monotonic_event_time.time,
1511 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001512 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001513 }
Austin Schuh79b30942021-01-24 22:32:21 -08001514
1515 EXPECT_EQ(mapper0_count, 3u);
1516 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001517}
1518
1519// Tests that we return just the timestamps if we couldn't find the data and the
1520// missing data was at the beginning of the file.
1521TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
1522 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1523 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001524 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001525 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001526 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001527 writer1.QueueSpan(config2_.span());
1528
1529 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001530 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001531 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1532
Austin Schuhd863e6e2022-10-16 15:44:50 -07001533 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001534 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001535 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001536 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1537
Austin Schuhd863e6e2022-10-16 15:44:50 -07001538 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001539 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001540 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001541 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1542 }
1543
1544 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001545 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001546
1547 ASSERT_EQ(parts[0].logger_node, "pi1");
1548 ASSERT_EQ(parts[1].logger_node, "pi2");
1549
Austin Schuh79b30942021-01-24 22:32:21 -08001550 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001551 TimestampMapper mapper0("pi1", log_files,
1552 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001553 mapper0.set_timestamp_callback(
1554 [&](TimestampedMessage *) { ++mapper0_count; });
1555 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001556 TimestampMapper mapper1("pi2", log_files,
1557 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001558 mapper1.set_timestamp_callback(
1559 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001560
1561 mapper0.AddPeer(&mapper1);
1562 mapper1.AddPeer(&mapper0);
1563
1564 {
1565 SCOPED_TRACE("Trying node1 now");
1566 std::deque<TimestampedMessage> output1;
1567
1568 ASSERT_TRUE(mapper1.Front() != nullptr);
1569 output1.emplace_back(std::move(*mapper1.Front()));
1570 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001571 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001572
1573 ASSERT_TRUE(mapper1.Front() != nullptr);
1574 output1.emplace_back(std::move(*mapper1.Front()));
1575 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001576 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001577
1578 ASSERT_TRUE(mapper1.Front() != nullptr);
1579 output1.emplace_back(std::move(*mapper1.Front()));
1580 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001581 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001582
1583 ASSERT_TRUE(mapper1.Front() == nullptr);
1584
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001585 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1586 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001587 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001588 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001589
1590 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1591 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001592 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001593 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001594
1595 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1596 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001597 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001598 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001599 }
Austin Schuh79b30942021-01-24 22:32:21 -08001600
1601 EXPECT_EQ(mapper0_count, 0u);
1602 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001603}
1604
1605// Tests that we return just the timestamps if we couldn't find the data and the
1606// missing data was at the end of the file.
1607TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
1608 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1609 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001610 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001611 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001612 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001613 writer1.QueueSpan(config2_.span());
1614
Austin Schuhd863e6e2022-10-16 15:44:50 -07001615 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001616 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001617 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001618 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1619
Austin Schuhd863e6e2022-10-16 15:44:50 -07001620 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001621 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001622 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001623 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1624
1625 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001626 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001627 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1628 }
1629
1630 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001631 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001632
1633 ASSERT_EQ(parts[0].logger_node, "pi1");
1634 ASSERT_EQ(parts[1].logger_node, "pi2");
1635
Austin Schuh79b30942021-01-24 22:32:21 -08001636 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001637 TimestampMapper mapper0("pi1", log_files,
1638 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001639 mapper0.set_timestamp_callback(
1640 [&](TimestampedMessage *) { ++mapper0_count; });
1641 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001642 TimestampMapper mapper1("pi2", log_files,
1643 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001644 mapper1.set_timestamp_callback(
1645 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001646
1647 mapper0.AddPeer(&mapper1);
1648 mapper1.AddPeer(&mapper0);
1649
1650 {
1651 SCOPED_TRACE("Trying node1 now");
1652 std::deque<TimestampedMessage> output1;
1653
1654 ASSERT_TRUE(mapper1.Front() != nullptr);
1655 output1.emplace_back(std::move(*mapper1.Front()));
1656 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001657 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001658
1659 ASSERT_TRUE(mapper1.Front() != nullptr);
1660 output1.emplace_back(std::move(*mapper1.Front()));
1661 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001662 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001663
1664 ASSERT_TRUE(mapper1.Front() != nullptr);
1665 output1.emplace_back(std::move(*mapper1.Front()));
1666 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001667 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001668
1669 ASSERT_TRUE(mapper1.Front() == nullptr);
1670
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001671 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1672 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001673 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001674 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001675
1676 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1677 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001678 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001679 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001680
1681 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1682 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001683 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001684 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001685 }
Austin Schuh79b30942021-01-24 22:32:21 -08001686
1687 EXPECT_EQ(mapper0_count, 0u);
1688 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001689}
1690
Austin Schuh993ccb52020-12-12 15:59:32 -08001691// Tests that we handle a message which failed to forward or be logged.
1692TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1693 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1694 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001695 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001696 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001697 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001698 writer1.QueueSpan(config2_.span());
1699
Austin Schuhd863e6e2022-10-16 15:44:50 -07001700 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001701 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001702 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001703 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1704
1705 // Create both the timestamp and message, but don't log them, simulating a
1706 // forwarding drop.
1707 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1708 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1709 chrono::seconds(100));
1710
Austin Schuhd863e6e2022-10-16 15:44:50 -07001711 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001712 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001713 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001714 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1715 }
1716
1717 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001718 LogFilesContainer log_files(parts);
Austin Schuh993ccb52020-12-12 15:59:32 -08001719
1720 ASSERT_EQ(parts[0].logger_node, "pi1");
1721 ASSERT_EQ(parts[1].logger_node, "pi2");
1722
Austin Schuh79b30942021-01-24 22:32:21 -08001723 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001724 TimestampMapper mapper0("pi1", log_files,
1725 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001726 mapper0.set_timestamp_callback(
1727 [&](TimestampedMessage *) { ++mapper0_count; });
1728 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001729 TimestampMapper mapper1("pi2", log_files,
1730 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001731 mapper1.set_timestamp_callback(
1732 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh993ccb52020-12-12 15:59:32 -08001733
1734 mapper0.AddPeer(&mapper1);
1735 mapper1.AddPeer(&mapper0);
1736
1737 {
1738 std::deque<TimestampedMessage> output1;
1739
1740 ASSERT_TRUE(mapper1.Front() != nullptr);
1741 output1.emplace_back(std::move(*mapper1.Front()));
1742 mapper1.PopFront();
1743
1744 ASSERT_TRUE(mapper1.Front() != nullptr);
1745 output1.emplace_back(std::move(*mapper1.Front()));
1746
1747 ASSERT_FALSE(mapper1.Front() == nullptr);
1748
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001749 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1750 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001751 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001752 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001753
1754 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1755 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001756 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001757 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh993ccb52020-12-12 15:59:32 -08001758 }
Austin Schuh79b30942021-01-24 22:32:21 -08001759
1760 EXPECT_EQ(mapper0_count, 0u);
1761 EXPECT_EQ(mapper1_count, 2u);
Austin Schuh993ccb52020-12-12 15:59:32 -08001762}
1763
Austin Schuhd2f96102020-12-01 20:27:29 -08001764// Tests that we properly sort log files with duplicate timestamps.
1765TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1766 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1767 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001768 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001769 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001770 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001771 writer1.QueueSpan(config2_.span());
1772
Austin Schuhd863e6e2022-10-16 15:44:50 -07001773 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001774 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001775 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001776 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1777
Austin Schuhd863e6e2022-10-16 15:44:50 -07001778 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001779 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001780 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001781 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1782
Austin Schuhd863e6e2022-10-16 15:44:50 -07001783 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001784 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001785 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001786 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1787
Austin Schuhd863e6e2022-10-16 15:44:50 -07001788 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001789 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001790 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001791 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1792 }
1793
1794 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001795 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001796
1797 ASSERT_EQ(parts[0].logger_node, "pi1");
1798 ASSERT_EQ(parts[1].logger_node, "pi2");
1799
Austin Schuh79b30942021-01-24 22:32:21 -08001800 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001801 TimestampMapper mapper0("pi1", log_files,
1802 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001803 mapper0.set_timestamp_callback(
1804 [&](TimestampedMessage *) { ++mapper0_count; });
1805 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001806 TimestampMapper mapper1("pi2", log_files,
1807 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001808 mapper1.set_timestamp_callback(
1809 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001810
1811 mapper0.AddPeer(&mapper1);
1812 mapper1.AddPeer(&mapper0);
1813
1814 {
1815 SCOPED_TRACE("Trying node1 now");
1816 std::deque<TimestampedMessage> output1;
1817
1818 for (int i = 0; i < 4; ++i) {
1819 ASSERT_TRUE(mapper1.Front() != nullptr);
1820 output1.emplace_back(std::move(*mapper1.Front()));
1821 mapper1.PopFront();
1822 }
1823 ASSERT_TRUE(mapper1.Front() == nullptr);
1824
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001825 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1826 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001827 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001828 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001829
1830 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1831 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001832 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001833 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001834
1835 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1836 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001837 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001838 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001839
1840 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1841 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001842 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001843 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001844 }
Austin Schuh79b30942021-01-24 22:32:21 -08001845
1846 EXPECT_EQ(mapper0_count, 0u);
1847 EXPECT_EQ(mapper1_count, 4u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001848}
1849
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001850// Tests that we properly produce a valid start time.
Austin Schuhd2f96102020-12-01 20:27:29 -08001851TEST_F(TimestampMapperTest, StartTime) {
1852 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1853 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001854 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001855 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001856 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001857 writer1.QueueSpan(config1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001858 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001859 writer2.QueueSpan(config3_.span());
1860 }
1861
1862 const std::vector<LogFile> parts =
1863 SortParts({logfile0_, logfile1_, logfile2_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001864 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001865
Austin Schuh79b30942021-01-24 22:32:21 -08001866 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001867 TimestampMapper mapper0("pi1", log_files,
1868 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001869 mapper0.set_timestamp_callback(
1870 [&](TimestampedMessage *) { ++mapper0_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001871
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001872 EXPECT_EQ(mapper0.monotonic_start_time(0), e + chrono::milliseconds(1));
1873 EXPECT_EQ(mapper0.realtime_start_time(0),
Austin Schuhd2f96102020-12-01 20:27:29 -08001874 realtime_clock::time_point(chrono::seconds(1000)));
Austin Schuh79b30942021-01-24 22:32:21 -08001875 EXPECT_EQ(mapper0_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001876}
1877
Austin Schuhfecf1d82020-12-19 16:57:28 -08001878// Tests that when a peer isn't registered, we treat that as if there was no
1879// data available.
1880TEST_F(TimestampMapperTest, NoPeer) {
1881 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1882 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001883 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001884 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001885 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001886 writer1.QueueSpan(config2_.span());
1887
1888 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001889 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001890 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1891
Austin Schuhd863e6e2022-10-16 15:44:50 -07001892 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001893 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001894 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001895 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1896
Austin Schuhd863e6e2022-10-16 15:44:50 -07001897 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001898 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001899 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001900 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1901 }
1902
1903 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001904 LogFilesContainer log_files(parts);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001905
1906 ASSERT_EQ(parts[0].logger_node, "pi1");
1907 ASSERT_EQ(parts[1].logger_node, "pi2");
1908
Austin Schuh79b30942021-01-24 22:32:21 -08001909 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001910 TimestampMapper mapper1("pi2", log_files,
1911 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001912 mapper1.set_timestamp_callback(
1913 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhfecf1d82020-12-19 16:57:28 -08001914
1915 {
1916 std::deque<TimestampedMessage> output1;
1917
1918 ASSERT_TRUE(mapper1.Front() != nullptr);
1919 output1.emplace_back(std::move(*mapper1.Front()));
1920 mapper1.PopFront();
1921 ASSERT_TRUE(mapper1.Front() != nullptr);
1922 output1.emplace_back(std::move(*mapper1.Front()));
1923 mapper1.PopFront();
1924 ASSERT_TRUE(mapper1.Front() != nullptr);
1925 output1.emplace_back(std::move(*mapper1.Front()));
1926 mapper1.PopFront();
1927 ASSERT_TRUE(mapper1.Front() == nullptr);
1928
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001929 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1930 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001931 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001932 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001933
1934 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1935 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001936 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001937 EXPECT_FALSE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001938
1939 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1940 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001941 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001942 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001943 }
Austin Schuh79b30942021-01-24 22:32:21 -08001944 EXPECT_EQ(mapper1_count, 3u);
1945}
1946
1947// Tests that we can queue messages and call the timestamp callback for both
1948// nodes.
1949TEST_F(TimestampMapperTest, QueueUntilNode0) {
1950 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1951 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001952 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh79b30942021-01-24 22:32:21 -08001953 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001954 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh79b30942021-01-24 22:32:21 -08001955 writer1.QueueSpan(config2_.span());
1956
Austin Schuhd863e6e2022-10-16 15:44:50 -07001957 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001958 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001959 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001960 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1961
Austin Schuhd863e6e2022-10-16 15:44:50 -07001962 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001963 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001964 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001965 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1966
Austin Schuhd863e6e2022-10-16 15:44:50 -07001967 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001968 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001969 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001970 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1971
Austin Schuhd863e6e2022-10-16 15:44:50 -07001972 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001973 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001974 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001975 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1976 }
1977
1978 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001979 LogFilesContainer log_files(parts);
Austin Schuh79b30942021-01-24 22:32:21 -08001980
1981 ASSERT_EQ(parts[0].logger_node, "pi1");
1982 ASSERT_EQ(parts[1].logger_node, "pi2");
1983
1984 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001985 TimestampMapper mapper0("pi1", log_files,
1986 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001987 mapper0.set_timestamp_callback(
1988 [&](TimestampedMessage *) { ++mapper0_count; });
1989 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001990 TimestampMapper mapper1("pi2", log_files,
1991 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001992 mapper1.set_timestamp_callback(
1993 [&](TimestampedMessage *) { ++mapper1_count; });
1994
1995 mapper0.AddPeer(&mapper1);
1996 mapper1.AddPeer(&mapper0);
1997
1998 {
1999 std::deque<TimestampedMessage> output0;
2000
2001 EXPECT_EQ(mapper0_count, 0u);
2002 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002003 mapper0.QueueUntil(
2004 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08002005 EXPECT_EQ(mapper0_count, 3u);
2006 EXPECT_EQ(mapper1_count, 0u);
2007
2008 ASSERT_TRUE(mapper0.Front() != nullptr);
2009 EXPECT_EQ(mapper0_count, 3u);
2010 EXPECT_EQ(mapper1_count, 0u);
2011
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002012 mapper0.QueueUntil(
2013 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08002014 EXPECT_EQ(mapper0_count, 3u);
2015 EXPECT_EQ(mapper1_count, 0u);
2016
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002017 mapper0.QueueUntil(
2018 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08002019 EXPECT_EQ(mapper0_count, 4u);
2020 EXPECT_EQ(mapper1_count, 0u);
2021
2022 output0.emplace_back(std::move(*mapper0.Front()));
2023 mapper0.PopFront();
2024 output0.emplace_back(std::move(*mapper0.Front()));
2025 mapper0.PopFront();
2026 output0.emplace_back(std::move(*mapper0.Front()));
2027 mapper0.PopFront();
2028 output0.emplace_back(std::move(*mapper0.Front()));
2029 mapper0.PopFront();
2030
2031 EXPECT_EQ(mapper0_count, 4u);
2032 EXPECT_EQ(mapper1_count, 0u);
2033
2034 ASSERT_TRUE(mapper0.Front() == nullptr);
2035
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002036 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2037 EXPECT_EQ(output0[0].monotonic_event_time.time,
2038 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002039 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002040
2041 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2042 EXPECT_EQ(output0[1].monotonic_event_time.time,
2043 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002044 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002045
2046 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2047 EXPECT_EQ(output0[2].monotonic_event_time.time,
2048 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002049 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002050
2051 EXPECT_EQ(output0[3].monotonic_event_time.boot, 0u);
2052 EXPECT_EQ(output0[3].monotonic_event_time.time,
2053 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002054 EXPECT_TRUE(output0[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08002055 }
2056
2057 {
2058 SCOPED_TRACE("Trying node1 now");
2059 std::deque<TimestampedMessage> output1;
2060
2061 EXPECT_EQ(mapper0_count, 4u);
2062 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002063 mapper1.QueueUntil(BootTimestamp{
2064 .boot = 0,
2065 .time = e + chrono::seconds(100) + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08002066 EXPECT_EQ(mapper0_count, 4u);
2067 EXPECT_EQ(mapper1_count, 3u);
2068
2069 ASSERT_TRUE(mapper1.Front() != nullptr);
2070 EXPECT_EQ(mapper0_count, 4u);
2071 EXPECT_EQ(mapper1_count, 3u);
2072
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002073 mapper1.QueueUntil(BootTimestamp{
2074 .boot = 0,
2075 .time = e + chrono::seconds(100) + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08002076 EXPECT_EQ(mapper0_count, 4u);
2077 EXPECT_EQ(mapper1_count, 3u);
2078
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002079 mapper1.QueueUntil(BootTimestamp{
2080 .boot = 0,
2081 .time = e + chrono::seconds(100) + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08002082 EXPECT_EQ(mapper0_count, 4u);
2083 EXPECT_EQ(mapper1_count, 4u);
2084
2085 ASSERT_TRUE(mapper1.Front() != nullptr);
2086 EXPECT_EQ(mapper0_count, 4u);
2087 EXPECT_EQ(mapper1_count, 4u);
2088
2089 output1.emplace_back(std::move(*mapper1.Front()));
2090 mapper1.PopFront();
2091 ASSERT_TRUE(mapper1.Front() != nullptr);
2092 output1.emplace_back(std::move(*mapper1.Front()));
2093 mapper1.PopFront();
2094 ASSERT_TRUE(mapper1.Front() != nullptr);
2095 output1.emplace_back(std::move(*mapper1.Front()));
2096 mapper1.PopFront();
2097 ASSERT_TRUE(mapper1.Front() != nullptr);
2098 output1.emplace_back(std::move(*mapper1.Front()));
2099 mapper1.PopFront();
2100
2101 EXPECT_EQ(mapper0_count, 4u);
2102 EXPECT_EQ(mapper1_count, 4u);
2103
2104 ASSERT_TRUE(mapper1.Front() == nullptr);
2105
2106 EXPECT_EQ(mapper0_count, 4u);
2107 EXPECT_EQ(mapper1_count, 4u);
2108
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002109 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2110 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002111 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002112 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002113
2114 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
2115 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002116 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002117 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002118
2119 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
2120 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002121 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002122 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002123
2124 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
2125 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002126 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002127 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08002128 }
Austin Schuhfecf1d82020-12-19 16:57:28 -08002129}
2130
Philipp Schrader416505b2024-03-28 11:59:45 -07002131// Validates that we can read timestamps on startup even for single-node logs.
2132TEST_F(SingleNodeTimestampMapperTest, QueueTimestampsForSingleNodes) {
2133 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2134 {
2135 TestDetachedBufferWriter writer0(logfile0_);
2136 writer0.QueueSpan(config0_.span());
2137
2138 writer0.WriteSizedFlatbuffer(
2139 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
2140 writer0.WriteSizedFlatbuffer(
2141 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
2142 writer0.WriteSizedFlatbuffer(
2143 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
2144 writer0.WriteSizedFlatbuffer(
2145 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
2146 }
2147
2148 const std::vector<LogFile> parts = SortParts({logfile0_});
2149 LogFilesContainer log_files(parts);
2150
2151 ASSERT_EQ(parts[0].logger_node, "pi1");
2152
2153 size_t mapper0_count = 0;
2154 TimestampMapper mapper0("pi1", log_files,
2155 TimestampQueueStrategy::kQueueTimestampsAtStartup);
2156 mapper0.set_timestamp_callback(
2157 [&](TimestampedMessage *) { ++mapper0_count; });
2158 mapper0.QueueTimestamps();
2159
2160 for (int i = 0; i < 4; ++i) {
2161 ASSERT_TRUE(mapper0.Front() != nullptr);
2162 mapper0.PopFront();
2163 }
2164 EXPECT_TRUE(mapper0.Front() == nullptr);
2165 EXPECT_EQ(mapper0_count, 4u);
2166}
2167
// Test fixture providing two log file headers, boot0_ and boot1_, built with
// MakeHeader() from config_.  Both headers describe node "pi2" as logged by
// "pi1" and share the same log_event_uuid, logger_instance_uuid and
// logger_node_boot_uuid.  They differ in parts_uuid, parts_index (0 vs. 1),
// source_node_boot_uuid, and the second entry of boot_uuids.
2168class BootMergerTest : public SortingElementTest<> {
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002169 public:
2170 BootMergerTest()
2171 : SortingElementTest(),
        // Header with parts_index 0 and source_node_boot_uuid 6ba4f28d-...
2172 boot0_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002173 /* 100ms */
2174 "max_out_of_order_duration": 100000000,
2175 "node": {
2176 "name": "pi2"
2177 },
2178 "logger_node": {
2179 "name": "pi1"
2180 },
2181 "monotonic_start_time": 1000000,
2182 "realtime_start_time": 1000000000000,
2183 "logger_monotonic_start_time": 1000000,
2184 "logger_realtime_start_time": 1000000000000,
2185 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2186 "parts_uuid": "1a9e5ca2-31b2-475b-8282-88f6d1ce5109",
2187 "parts_index": 0,
2188 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2189 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002190 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2191 "boot_uuids": [
2192 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2193 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2194 ""
2195 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002196})")),
        // Header with parts_index 1 and source_node_boot_uuid b728d27a-...
2197 boot1_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002198 /* 100ms */
2199 "max_out_of_order_duration": 100000000,
2200 "node": {
2201 "name": "pi2"
2202 },
2203 "logger_node": {
2204 "name": "pi1"
2205 },
2206 "monotonic_start_time": 1000000,
2207 "realtime_start_time": 1000000000000,
2208 "logger_monotonic_start_time": 1000000,
2209 "logger_realtime_start_time": 1000000000000,
2210 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2211 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
2212 "parts_index": 1,
2213 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2214 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002215 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2216 "boot_uuids": [
2217 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2218 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2219 ""
2220 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002221})")) {}
Austin Schuh8bee6882021-06-28 21:03:28 -07002222
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002223 protected:
  // Size-prefixed LogFileHeader buffers; tests queue these via span() as the
  // first thing written to a log file.
2224 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0_;
2225 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1_;
2226};
2227
2228// This tests that we can properly sort a multi-node log file which has the old
2229// (and buggy) timestamps in the header, and the non-resetting parts_index.
// These make it so we can just barely figure out what happened first and what
2231// happened second, but not in a way that is robust to multiple nodes rebooting.
2232TEST_F(BootMergerTest, OldReboot) {
Austin Schuh8bee6882021-06-28 21:03:28 -07002233 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002234 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002235 writer.QueueSpan(boot0_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002236 }
2237 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002238 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002239 writer.QueueSpan(boot1_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002240 }
2241
2242 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
2243
2244 ASSERT_EQ(parts.size(), 1u);
2245 ASSERT_EQ(parts[0].parts.size(), 2u);
2246
2247 EXPECT_EQ(parts[0].parts[0].boot_count, 0);
2248 EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002249 boot0_.message().source_node_boot_uuid()->string_view());
Austin Schuh8bee6882021-06-28 21:03:28 -07002250
2251 EXPECT_EQ(parts[0].parts[1].boot_count, 1);
2252 EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002253 boot1_.message().source_node_boot_uuid()->string_view());
2254}
2255
2256// This tests that we can produce messages ordered across a reboot.
2257TEST_F(BootMergerTest, SortAcrossReboot) {
2258 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2259 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002260 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002261 writer.QueueSpan(boot0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002262 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002263 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002264 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002265 MakeLogMessage(e + chrono::milliseconds(2000), 1, 0x105));
2266 }
2267 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002268 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002269 writer.QueueSpan(boot1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002270 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002271 MakeLogMessage(e + chrono::milliseconds(100), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002272 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002273 MakeLogMessage(e + chrono::milliseconds(200), 1, 0x106));
2274 }
2275
2276 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07002277 LogFilesContainer log_files(parts);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002278 ASSERT_EQ(parts.size(), 1u);
2279 ASSERT_EQ(parts[0].parts.size(), 2u);
2280
Austin Schuh63097262023-08-16 17:04:29 -07002281 BootMerger merger("pi2", log_files,
2282 {StoredDataType::DATA, StoredDataType::TIMESTAMPS,
2283 StoredDataType::REMOTE_TIMESTAMPS});
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002284
2285 EXPECT_EQ(merger.node(), 1u);
2286
2287 std::vector<Message> output;
2288 for (int i = 0; i < 4; ++i) {
2289 ASSERT_TRUE(merger.Front() != nullptr);
2290 output.emplace_back(std::move(*merger.Front()));
2291 merger.PopFront();
2292 }
2293
2294 ASSERT_TRUE(merger.Front() == nullptr);
2295
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002296 EXPECT_EQ(output[0].timestamp.boot, 0u);
2297 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
2298 EXPECT_EQ(output[1].timestamp.boot, 0u);
2299 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(2000));
2300
2301 EXPECT_EQ(output[2].timestamp.boot, 1u);
2302 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(100));
2303 EXPECT_EQ(output[3].timestamp.boot, 1u);
2304 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
Austin Schuh8bee6882021-06-28 21:03:28 -07002305}
2306
// Test fixture providing four log file headers built with MakeHeader() from
// config_, all with logger_node "pi1" and the same log_event_uuid:
//   * boot0a_ / boot0b_: node "pi1", same parts_uuid, parts_index 0 and 1,
//     same source_node_boot_uuid; the second boot_uuids entry differs
//     (6ba4f28d-... vs. b728d27a-...).
//   * boot1a_ / boot1b_: node "pi2", different parts_uuids, parts_index 0 and
//     1, source_node_boot_uuid 6ba4f28d-... and b728d27a-... respectively.
Philipp Schrader416505b2024-03-28 11:59:45 -07002307class RebootTimestampMapperTest : public SortingElementTest<> {
Austin Schuh48507722021-07-17 17:29:24 -07002308 public:
2309 RebootTimestampMapperTest()
2310 : SortingElementTest(),
        // Node "pi1", parts_index 0; boot_uuids lists 6ba4f28d-... second.
2311 boot0a_(MakeHeader(config_, R"({
2312 /* 100ms */
2313 "max_out_of_order_duration": 100000000,
2314 "node": {
2315 "name": "pi1"
2316 },
2317 "logger_node": {
2318 "name": "pi1"
2319 },
2320 "monotonic_start_time": 1000000,
2321 "realtime_start_time": 1000000000000,
2322 "logger_monotonic_start_time": 1000000,
2323 "logger_realtime_start_time": 1000000000000,
2324 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2325 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2326 "parts_index": 0,
2327 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2328 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2329 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2330 "boot_uuids": [
2331 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2332 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2333 ""
2334 ]
2335})")),
        // Node "pi1", parts_index 1; boot_uuids lists b728d27a-... second.
2336 boot0b_(MakeHeader(config_, R"({
2337 /* 100ms */
2338 "max_out_of_order_duration": 100000000,
2339 "node": {
2340 "name": "pi1"
2341 },
2342 "logger_node": {
2343 "name": "pi1"
2344 },
2345 "monotonic_start_time": 1000000,
2346 "realtime_start_time": 1000000000000,
2347 "logger_monotonic_start_time": 1000000,
2348 "logger_realtime_start_time": 1000000000000,
2349 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2350 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2351 "parts_index": 1,
2352 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2353 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2354 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2355 "boot_uuids": [
2356 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2357 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2358 ""
2359 ]
2360})")),
        // Node "pi2", parts_index 0, source_node_boot_uuid 6ba4f28d-...
2361 boot1a_(MakeHeader(config_, R"({
2362 /* 100ms */
2363 "max_out_of_order_duration": 100000000,
2364 "node": {
2365 "name": "pi2"
2366 },
2367 "logger_node": {
2368 "name": "pi1"
2369 },
2370 "monotonic_start_time": 1000000,
2371 "realtime_start_time": 1000000000000,
2372 "logger_monotonic_start_time": 1000000,
2373 "logger_realtime_start_time": 1000000000000,
2374 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2375 "parts_uuid": "f6ab0cdc-a654-456d-bfd9-2bbc09098edf",
2376 "parts_index": 0,
2377 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2378 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2379 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2380 "boot_uuids": [
2381 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2382 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2383 ""
2384 ]
2385})")),
        // Node "pi2", parts_index 1, source_node_boot_uuid b728d27a-...
2386 boot1b_(MakeHeader(config_, R"({
2387 /* 100ms */
2388 "max_out_of_order_duration": 100000000,
2389 "node": {
2390 "name": "pi2"
2391 },
2392 "logger_node": {
2393 "name": "pi1"
2394 },
2395 "monotonic_start_time": 1000000,
2396 "realtime_start_time": 1000000000000,
2397 "logger_monotonic_start_time": 1000000,
2398 "logger_realtime_start_time": 1000000000000,
2399 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2400 "parts_uuid": "3a9d0445-f520-43ca-93f5-e2cc7f54d40a",
2401 "parts_index": 1,
2402 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2403 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2404 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2405 "boot_uuids": [
2406 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2407 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2408 ""
2409 ]
2410})")) {}
2411
2412 protected:
  // Size-prefixed LogFileHeader buffers; tests queue these via span() as the
  // first thing written to a log file.
2413 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0a_;
2414 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0b_;
2415 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1a_;
2416 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
2417};
2418
Austin Schuh48507722021-07-17 17:29:24 -07002419// Tests that we can match timestamps on delivered messages in the presence of
2420// reboots on the node receiving timestamps.
2421TEST_F(RebootTimestampMapperTest, ReadNode0First) {
2422 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2423 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002424 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002425 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002426 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002427 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002428 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002429 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002430 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002431 writer1b.QueueSpan(boot1b_.span());
2432
Austin Schuhd863e6e2022-10-16 15:44:50 -07002433 writer0a.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002434 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002435 writer1a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002436 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
2437 e + chrono::milliseconds(1001)));
2438
Austin Schuhd863e6e2022-10-16 15:44:50 -07002439 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh58646e22021-08-23 23:51:46 -07002440 e + chrono::milliseconds(1000), 0, chrono::seconds(21),
2441 e + chrono::milliseconds(2001)));
2442
Austin Schuhd863e6e2022-10-16 15:44:50 -07002443 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002444 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002445 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002446 e + chrono::milliseconds(2000), 0, chrono::seconds(20),
2447 e + chrono::milliseconds(2001)));
2448
Austin Schuhd863e6e2022-10-16 15:44:50 -07002449 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002450 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002451 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002452 e + chrono::milliseconds(3000), 0, chrono::seconds(20),
2453 e + chrono::milliseconds(3001)));
2454 }
2455
Austin Schuh58646e22021-08-23 23:51:46 -07002456 const std::vector<LogFile> parts =
2457 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07002458 LogFilesContainer log_files(parts);
Austin Schuh48507722021-07-17 17:29:24 -07002459
2460 for (const auto &x : parts) {
2461 LOG(INFO) << x;
2462 }
2463 ASSERT_EQ(parts.size(), 1u);
2464 ASSERT_EQ(parts[0].logger_node, "pi1");
2465
2466 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07002467 TimestampMapper mapper0("pi1", log_files,
2468 TimestampQueueStrategy::kQueueTogether);
Austin Schuh48507722021-07-17 17:29:24 -07002469 mapper0.set_timestamp_callback(
2470 [&](TimestampedMessage *) { ++mapper0_count; });
2471 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07002472 TimestampMapper mapper1("pi2", log_files,
2473 TimestampQueueStrategy::kQueueTogether);
Austin Schuh48507722021-07-17 17:29:24 -07002474 mapper1.set_timestamp_callback(
2475 [&](TimestampedMessage *) { ++mapper1_count; });
2476
2477 mapper0.AddPeer(&mapper1);
2478 mapper1.AddPeer(&mapper0);
2479
2480 {
2481 std::deque<TimestampedMessage> output0;
2482
2483 EXPECT_EQ(mapper0_count, 0u);
2484 EXPECT_EQ(mapper1_count, 0u);
2485 ASSERT_TRUE(mapper0.Front() != nullptr);
2486 EXPECT_EQ(mapper0_count, 1u);
2487 EXPECT_EQ(mapper1_count, 0u);
2488 output0.emplace_back(std::move(*mapper0.Front()));
2489 mapper0.PopFront();
2490 EXPECT_TRUE(mapper0.started());
2491 EXPECT_EQ(mapper0_count, 1u);
2492 EXPECT_EQ(mapper1_count, 0u);
2493
2494 ASSERT_TRUE(mapper0.Front() != nullptr);
2495 EXPECT_EQ(mapper0_count, 2u);
2496 EXPECT_EQ(mapper1_count, 0u);
2497 output0.emplace_back(std::move(*mapper0.Front()));
2498 mapper0.PopFront();
2499 EXPECT_TRUE(mapper0.started());
2500
2501 ASSERT_TRUE(mapper0.Front() != nullptr);
2502 output0.emplace_back(std::move(*mapper0.Front()));
2503 mapper0.PopFront();
2504 EXPECT_TRUE(mapper0.started());
2505
2506 EXPECT_EQ(mapper0_count, 3u);
2507 EXPECT_EQ(mapper1_count, 0u);
2508
2509 ASSERT_TRUE(mapper0.Front() == nullptr);
2510
2511 LOG(INFO) << output0[0];
2512 LOG(INFO) << output0[1];
2513 LOG(INFO) << output0[2];
2514
2515 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2516 EXPECT_EQ(output0[0].monotonic_event_time.time,
2517 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002518 EXPECT_EQ(output0[0].queue_index,
2519 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002520 EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
2521 EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002522 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002523
2524 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2525 EXPECT_EQ(output0[1].monotonic_event_time.time,
2526 e + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002527 EXPECT_EQ(output0[1].queue_index,
2528 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002529 EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
2530 EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002531 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002532
2533 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2534 EXPECT_EQ(output0[2].monotonic_event_time.time,
2535 e + chrono::milliseconds(3000));
Austin Schuh58646e22021-08-23 23:51:46 -07002536 EXPECT_EQ(output0[2].queue_index,
2537 (BootQueueIndex{.boot = 0u, .index = 2u}));
Austin Schuh48507722021-07-17 17:29:24 -07002538 EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
2539 EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002540 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002541 }
2542
2543 {
2544 SCOPED_TRACE("Trying node1 now");
2545 std::deque<TimestampedMessage> output1;
2546
2547 EXPECT_EQ(mapper0_count, 3u);
2548 EXPECT_EQ(mapper1_count, 0u);
2549
2550 ASSERT_TRUE(mapper1.Front() != nullptr);
2551 EXPECT_EQ(mapper0_count, 3u);
2552 EXPECT_EQ(mapper1_count, 1u);
2553 output1.emplace_back(std::move(*mapper1.Front()));
2554 mapper1.PopFront();
2555 EXPECT_TRUE(mapper1.started());
2556 EXPECT_EQ(mapper0_count, 3u);
2557 EXPECT_EQ(mapper1_count, 1u);
2558
2559 ASSERT_TRUE(mapper1.Front() != nullptr);
2560 EXPECT_EQ(mapper0_count, 3u);
2561 EXPECT_EQ(mapper1_count, 2u);
2562 output1.emplace_back(std::move(*mapper1.Front()));
2563 mapper1.PopFront();
2564 EXPECT_TRUE(mapper1.started());
2565
2566 ASSERT_TRUE(mapper1.Front() != nullptr);
2567 output1.emplace_back(std::move(*mapper1.Front()));
2568 mapper1.PopFront();
2569 EXPECT_TRUE(mapper1.started());
2570
Austin Schuh58646e22021-08-23 23:51:46 -07002571 ASSERT_TRUE(mapper1.Front() != nullptr);
2572 output1.emplace_back(std::move(*mapper1.Front()));
2573 mapper1.PopFront();
2574 EXPECT_TRUE(mapper1.started());
2575
Austin Schuh48507722021-07-17 17:29:24 -07002576 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002577 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002578
2579 ASSERT_TRUE(mapper1.Front() == nullptr);
2580
2581 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002582 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002583
2584 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2585 EXPECT_EQ(output1[0].monotonic_event_time.time,
2586 e + chrono::seconds(100) + chrono::milliseconds(1000));
2587 EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
2588 EXPECT_EQ(output1[0].monotonic_remote_time.time,
2589 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002590 EXPECT_EQ(output1[0].remote_queue_index,
2591 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002592 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
2593 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
2594 e + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002595 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002596
2597 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2598 EXPECT_EQ(output1[1].monotonic_event_time.time,
2599 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002600 EXPECT_EQ(output1[1].remote_queue_index,
2601 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002602 EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
2603 EXPECT_EQ(output1[1].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002604 e + chrono::milliseconds(1000));
Austin Schuh48507722021-07-17 17:29:24 -07002605 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
2606 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
2607 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002608 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002609
2610 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2611 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002612 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh48507722021-07-17 17:29:24 -07002613 EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
2614 EXPECT_EQ(output1[2].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002615 e + chrono::milliseconds(2000));
2616 EXPECT_EQ(output1[2].remote_queue_index,
2617 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002618 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
2619 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002620 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002621 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002622
Austin Schuh58646e22021-08-23 23:51:46 -07002623 EXPECT_EQ(output1[3].monotonic_event_time.boot, 1u);
2624 EXPECT_EQ(output1[3].monotonic_event_time.time,
2625 e + chrono::seconds(20) + chrono::milliseconds(3000));
2626 EXPECT_EQ(output1[3].monotonic_remote_time.boot, 0u);
2627 EXPECT_EQ(output1[3].monotonic_remote_time.time,
2628 e + chrono::milliseconds(3000));
2629 EXPECT_EQ(output1[3].remote_queue_index,
2630 (BootQueueIndex{.boot = 0u, .index = 2u}));
2631 EXPECT_EQ(output1[3].monotonic_timestamp_time.boot, 0u);
2632 EXPECT_EQ(output1[3].monotonic_timestamp_time.time,
2633 e + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002634 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh58646e22021-08-23 23:51:46 -07002635
Austin Schuh48507722021-07-17 17:29:24 -07002636 LOG(INFO) << output1[0];
2637 LOG(INFO) << output1[1];
2638 LOG(INFO) << output1[2];
Austin Schuh58646e22021-08-23 23:51:46 -07002639 LOG(INFO) << output1[3];
Austin Schuh48507722021-07-17 17:29:24 -07002640 }
2641}
2642
2643TEST_F(RebootTimestampMapperTest, Node2Reboot) {
2644 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2645 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002646 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002647 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002648 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002649 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002650 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002651 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002652 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002653 writer1b.QueueSpan(boot1b_.span());
2654
Austin Schuhd863e6e2022-10-16 15:44:50 -07002655 writer1a.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002656 e + chrono::seconds(100) + chrono::milliseconds(1000), 3, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002657 writer0a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002658 e + chrono::seconds(100) + chrono::milliseconds(1000), 3,
2659 chrono::seconds(-100),
2660 e + chrono::seconds(100) + chrono::milliseconds(1001)));
2661
Austin Schuhd863e6e2022-10-16 15:44:50 -07002662 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002663 e + chrono::seconds(20) + chrono::milliseconds(2000), 3, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002664 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002665 e + chrono::seconds(20) + chrono::milliseconds(2000), 3,
2666 chrono::seconds(-20),
2667 e + chrono::seconds(20) + chrono::milliseconds(2001)));
2668
Austin Schuhd863e6e2022-10-16 15:44:50 -07002669 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002670 e + chrono::seconds(20) + chrono::milliseconds(3000), 3, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002671 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002672 e + chrono::seconds(20) + chrono::milliseconds(3000), 3,
2673 chrono::seconds(-20),
2674 e + chrono::seconds(20) + chrono::milliseconds(3001)));
2675 }
2676
2677 const std::vector<LogFile> parts =
2678 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07002679 LogFilesContainer log_files(parts);
Austin Schuh48507722021-07-17 17:29:24 -07002680
2681 for (const auto &x : parts) {
2682 LOG(INFO) << x;
2683 }
2684 ASSERT_EQ(parts.size(), 1u);
2685 ASSERT_EQ(parts[0].logger_node, "pi1");
2686
2687 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07002688 TimestampMapper mapper0("pi1", log_files,
2689 TimestampQueueStrategy::kQueueTogether);
Austin Schuh48507722021-07-17 17:29:24 -07002690 mapper0.set_timestamp_callback(
2691 [&](TimestampedMessage *) { ++mapper0_count; });
2692 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07002693 TimestampMapper mapper1("pi2", log_files,
2694 TimestampQueueStrategy::kQueueTogether);
Austin Schuh48507722021-07-17 17:29:24 -07002695 mapper1.set_timestamp_callback(
2696 [&](TimestampedMessage *) { ++mapper1_count; });
2697
2698 mapper0.AddPeer(&mapper1);
2699 mapper1.AddPeer(&mapper0);
2700
2701 {
2702 std::deque<TimestampedMessage> output0;
2703
2704 EXPECT_EQ(mapper0_count, 0u);
2705 EXPECT_EQ(mapper1_count, 0u);
2706 ASSERT_TRUE(mapper0.Front() != nullptr);
2707 EXPECT_EQ(mapper0_count, 1u);
2708 EXPECT_EQ(mapper1_count, 0u);
2709 output0.emplace_back(std::move(*mapper0.Front()));
2710 mapper0.PopFront();
2711 EXPECT_TRUE(mapper0.started());
2712 EXPECT_EQ(mapper0_count, 1u);
2713 EXPECT_EQ(mapper1_count, 0u);
2714
2715 ASSERT_TRUE(mapper0.Front() != nullptr);
2716 EXPECT_EQ(mapper0_count, 2u);
2717 EXPECT_EQ(mapper1_count, 0u);
2718 output0.emplace_back(std::move(*mapper0.Front()));
2719 mapper0.PopFront();
2720 EXPECT_TRUE(mapper0.started());
2721
2722 ASSERT_TRUE(mapper0.Front() != nullptr);
2723 output0.emplace_back(std::move(*mapper0.Front()));
2724 mapper0.PopFront();
2725 EXPECT_TRUE(mapper0.started());
2726
2727 EXPECT_EQ(mapper0_count, 3u);
2728 EXPECT_EQ(mapper1_count, 0u);
2729
2730 ASSERT_TRUE(mapper0.Front() == nullptr);
2731
2732 LOG(INFO) << output0[0];
2733 LOG(INFO) << output0[1];
2734 LOG(INFO) << output0[2];
2735
2736 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2737 EXPECT_EQ(output0[0].monotonic_event_time.time,
2738 e + chrono::milliseconds(1000));
2739 EXPECT_EQ(output0[0].monotonic_remote_time.boot, 0u);
2740 EXPECT_EQ(output0[0].monotonic_remote_time.time,
2741 e + chrono::seconds(100) + chrono::milliseconds(1000));
2742 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
2743 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
2744 e + chrono::seconds(100) + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002745 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002746
2747 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2748 EXPECT_EQ(output0[1].monotonic_event_time.time,
2749 e + chrono::milliseconds(2000));
2750 EXPECT_EQ(output0[1].monotonic_remote_time.boot, 1u);
2751 EXPECT_EQ(output0[1].monotonic_remote_time.time,
2752 e + chrono::seconds(20) + chrono::milliseconds(2000));
2753 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
2754 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
2755 e + chrono::seconds(20) + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002756 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002757
2758 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2759 EXPECT_EQ(output0[2].monotonic_event_time.time,
2760 e + chrono::milliseconds(3000));
2761 EXPECT_EQ(output0[2].monotonic_remote_time.boot, 1u);
2762 EXPECT_EQ(output0[2].monotonic_remote_time.time,
2763 e + chrono::seconds(20) + chrono::milliseconds(3000));
2764 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
2765 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
2766 e + chrono::seconds(20) + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002767 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002768 }
2769
2770 {
2771 SCOPED_TRACE("Trying node1 now");
2772 std::deque<TimestampedMessage> output1;
2773
2774 EXPECT_EQ(mapper0_count, 3u);
2775 EXPECT_EQ(mapper1_count, 0u);
2776
2777 ASSERT_TRUE(mapper1.Front() != nullptr);
2778 EXPECT_EQ(mapper0_count, 3u);
2779 EXPECT_EQ(mapper1_count, 1u);
2780 output1.emplace_back(std::move(*mapper1.Front()));
2781 mapper1.PopFront();
2782 EXPECT_TRUE(mapper1.started());
2783 EXPECT_EQ(mapper0_count, 3u);
2784 EXPECT_EQ(mapper1_count, 1u);
2785
2786 ASSERT_TRUE(mapper1.Front() != nullptr);
2787 EXPECT_EQ(mapper0_count, 3u);
2788 EXPECT_EQ(mapper1_count, 2u);
2789 output1.emplace_back(std::move(*mapper1.Front()));
2790 mapper1.PopFront();
2791 EXPECT_TRUE(mapper1.started());
2792
2793 ASSERT_TRUE(mapper1.Front() != nullptr);
2794 output1.emplace_back(std::move(*mapper1.Front()));
2795 mapper1.PopFront();
2796 EXPECT_TRUE(mapper1.started());
2797
2798 EXPECT_EQ(mapper0_count, 3u);
2799 EXPECT_EQ(mapper1_count, 3u);
2800
2801 ASSERT_TRUE(mapper1.Front() == nullptr);
2802
2803 EXPECT_EQ(mapper0_count, 3u);
2804 EXPECT_EQ(mapper1_count, 3u);
2805
2806 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2807 EXPECT_EQ(output1[0].monotonic_event_time.time,
2808 e + chrono::seconds(100) + chrono::milliseconds(1000));
2809 EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
2810 EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002811 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002812
2813 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2814 EXPECT_EQ(output1[1].monotonic_event_time.time,
2815 e + chrono::seconds(20) + chrono::milliseconds(2000));
2816 EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
2817 EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002818 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002819
2820 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2821 EXPECT_EQ(output1[2].monotonic_event_time.time,
2822 e + chrono::seconds(20) + chrono::milliseconds(3000));
2823 EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
2824 EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002825 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002826
2827 LOG(INFO) << output1[0];
2828 LOG(INFO) << output1[1];
2829 LOG(INFO) << output1[2];
2830 }
2831}
2832
Philipp Schrader416505b2024-03-28 11:59:45 -07002833class SortingDeathTest : public SortingElementTest<> {
Austin Schuh44c61472021-11-22 21:04:10 -08002834 public:
2835 SortingDeathTest()
2836 : SortingElementTest(),
2837 part0_(MakeHeader(config_, R"({
2838 /* 100ms */
2839 "max_out_of_order_duration": 100000000,
2840 "node": {
2841 "name": "pi1"
2842 },
2843 "logger_node": {
2844 "name": "pi1"
2845 },
2846 "monotonic_start_time": 1000000,
2847 "realtime_start_time": 1000000000000,
2848 "logger_monotonic_start_time": 1000000,
2849 "logger_realtime_start_time": 1000000000000,
2850 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2851 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2852 "parts_index": 0,
2853 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2854 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2855 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2856 "boot_uuids": [
2857 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2858 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2859 ""
2860 ],
2861 "oldest_remote_monotonic_timestamps": [
2862 9223372036854775807,
2863 9223372036854775807,
2864 9223372036854775807
2865 ],
2866 "oldest_local_monotonic_timestamps": [
2867 9223372036854775807,
2868 9223372036854775807,
2869 9223372036854775807
2870 ],
2871 "oldest_remote_unreliable_monotonic_timestamps": [
2872 9223372036854775807,
2873 0,
2874 9223372036854775807
2875 ],
2876 "oldest_local_unreliable_monotonic_timestamps": [
2877 9223372036854775807,
2878 0,
2879 9223372036854775807
2880 ]
2881})")),
2882 part1_(MakeHeader(config_, R"({
2883 /* 100ms */
2884 "max_out_of_order_duration": 100000000,
2885 "node": {
2886 "name": "pi1"
2887 },
2888 "logger_node": {
2889 "name": "pi1"
2890 },
2891 "monotonic_start_time": 1000000,
2892 "realtime_start_time": 1000000000000,
2893 "logger_monotonic_start_time": 1000000,
2894 "logger_realtime_start_time": 1000000000000,
2895 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2896 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2897 "parts_index": 1,
2898 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2899 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2900 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2901 "boot_uuids": [
2902 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2903 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2904 ""
2905 ],
2906 "oldest_remote_monotonic_timestamps": [
2907 9223372036854775807,
2908 9223372036854775807,
2909 9223372036854775807
2910 ],
2911 "oldest_local_monotonic_timestamps": [
2912 9223372036854775807,
2913 9223372036854775807,
2914 9223372036854775807
2915 ],
2916 "oldest_remote_unreliable_monotonic_timestamps": [
2917 9223372036854775807,
2918 100000,
2919 9223372036854775807
2920 ],
2921 "oldest_local_unreliable_monotonic_timestamps": [
2922 9223372036854775807,
2923 100000,
2924 9223372036854775807
2925 ]
2926})")),
2927 part2_(MakeHeader(config_, R"({
2928 /* 100ms */
2929 "max_out_of_order_duration": 100000000,
2930 "node": {
2931 "name": "pi1"
2932 },
2933 "logger_node": {
2934 "name": "pi1"
2935 },
2936 "monotonic_start_time": 1000000,
2937 "realtime_start_time": 1000000000000,
2938 "logger_monotonic_start_time": 1000000,
2939 "logger_realtime_start_time": 1000000000000,
2940 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2941 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2942 "parts_index": 2,
2943 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2944 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2945 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2946 "boot_uuids": [
2947 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2948 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2949 ""
2950 ],
2951 "oldest_remote_monotonic_timestamps": [
2952 9223372036854775807,
2953 9223372036854775807,
2954 9223372036854775807
2955 ],
2956 "oldest_local_monotonic_timestamps": [
2957 9223372036854775807,
2958 9223372036854775807,
2959 9223372036854775807
2960 ],
2961 "oldest_remote_unreliable_monotonic_timestamps": [
2962 9223372036854775807,
2963 200000,
2964 9223372036854775807
2965 ],
2966 "oldest_local_unreliable_monotonic_timestamps": [
2967 9223372036854775807,
2968 200000,
2969 9223372036854775807
2970 ]
2971})")),
2972 part3_(MakeHeader(config_, R"({
2973 /* 100ms */
2974 "max_out_of_order_duration": 100000000,
2975 "node": {
2976 "name": "pi1"
2977 },
2978 "logger_node": {
2979 "name": "pi1"
2980 },
2981 "monotonic_start_time": 1000000,
2982 "realtime_start_time": 1000000000000,
2983 "logger_monotonic_start_time": 1000000,
2984 "logger_realtime_start_time": 1000000000000,
2985 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2986 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2987 "parts_index": 3,
2988 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2989 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2990 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2991 "boot_uuids": [
2992 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2993 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2994 ""
2995 ],
2996 "oldest_remote_monotonic_timestamps": [
2997 9223372036854775807,
2998 9223372036854775807,
2999 9223372036854775807
3000 ],
3001 "oldest_local_monotonic_timestamps": [
3002 9223372036854775807,
3003 9223372036854775807,
3004 9223372036854775807
3005 ],
3006 "oldest_remote_unreliable_monotonic_timestamps": [
3007 9223372036854775807,
3008 300000,
3009 9223372036854775807
3010 ],
3011 "oldest_local_unreliable_monotonic_timestamps": [
3012 9223372036854775807,
3013 300000,
3014 9223372036854775807
3015 ]
3016})")) {}
3017
3018 protected:
3019 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part0_;
3020 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part1_;
3021 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part2_;
3022 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part3_;
3023};
3024
3025// Tests that if 2 computers go back and forth trying to be the same node, we
3026// die in sorting instead of failing to estimate time.
3027TEST_F(SortingDeathTest, FightingNodes) {
3028 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07003029 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh44c61472021-11-22 21:04:10 -08003030 writer0.QueueSpan(part0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07003031 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh44c61472021-11-22 21:04:10 -08003032 writer1.QueueSpan(part1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07003033 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuh44c61472021-11-22 21:04:10 -08003034 writer2.QueueSpan(part2_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07003035 TestDetachedBufferWriter writer3(logfile3_);
Austin Schuh44c61472021-11-22 21:04:10 -08003036 writer3.QueueSpan(part3_.span());
3037 }
3038
3039 EXPECT_DEATH(
3040 {
3041 const std::vector<LogFile> parts =
3042 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
3043 },
Austin Schuh22cf7862022-09-19 19:09:42 -07003044 "found overlapping boots on");
Austin Schuh44c61472021-11-22 21:04:10 -08003045}
3046
// Tests that MessageReader blows up on a bad message.
3048TEST(MessageReaderConfirmCrash, ReadWrite) {
3049 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
3050 unlink(logfile.c_str());
3051
3052 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
3053 JsonToSizedFlatbuffer<LogFileHeader>(
3054 R"({ "max_out_of_order_duration": 100000000 })");
3055 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
3056 JsonToSizedFlatbuffer<MessageHeader>(
3057 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
3058 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
3059 JsonToSizedFlatbuffer<MessageHeader>(
3060 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
3061 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m4 =
3062 JsonToSizedFlatbuffer<MessageHeader>(
3063 R"({ "channel_index": 0, "monotonic_sent_time": 4 })");
3064
3065 // Starts out like a proper flat buffer header, but it breaks down ...
3066 std::vector<uint8_t> garbage{8, 0, 0, 0, 16, 0, 0, 0, 4, 0, 0, 0};
3067 absl::Span<uint8_t> m3_span(garbage);
3068
3069 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07003070 TestDetachedBufferWriter writer(logfile);
Brian Smarttea913d42021-12-10 15:02:38 -08003071 writer.QueueSpan(config.span());
3072 writer.QueueSpan(m1.span());
3073 writer.QueueSpan(m2.span());
3074 writer.QueueSpan(m3_span);
3075 writer.QueueSpan(m4.span()); // This message is "hidden"
3076 }
3077
3078 {
3079 MessageReader reader(logfile);
3080
3081 EXPECT_EQ(reader.filename(), logfile);
3082
3083 EXPECT_EQ(
3084 reader.max_out_of_order_duration(),
3085 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
3086 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
3087 EXPECT_TRUE(reader.ReadMessage());
3088 EXPECT_EQ(reader.newest_timestamp(),
3089 monotonic_clock::time_point(chrono::nanoseconds(1)));
3090 EXPECT_TRUE(reader.ReadMessage());
3091 EXPECT_EQ(reader.newest_timestamp(),
3092 monotonic_clock::time_point(chrono::nanoseconds(2)));
3093 // Confirm default crashing behavior
3094 EXPECT_DEATH(reader.ReadMessage(), "Corrupted message at offset");
3095 }
3096
3097 {
Austin Schuh99f7c6a2024-06-25 22:07:44 -07003098 absl::FlagSaver fs;
Brian Smarttea913d42021-12-10 15:02:38 -08003099
3100 MessageReader reader(logfile);
3101 reader.set_crash_on_corrupt_message_flag(false);
3102
3103 EXPECT_EQ(reader.filename(), logfile);
3104
3105 EXPECT_EQ(
3106 reader.max_out_of_order_duration(),
3107 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
3108 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
3109 EXPECT_TRUE(reader.ReadMessage());
3110 EXPECT_EQ(reader.newest_timestamp(),
3111 monotonic_clock::time_point(chrono::nanoseconds(1)));
3112 EXPECT_TRUE(reader.ReadMessage());
3113 EXPECT_EQ(reader.newest_timestamp(),
3114 monotonic_clock::time_point(chrono::nanoseconds(2)));
3115 // Confirm avoiding the corrupted message crash, stopping instead.
3116 EXPECT_FALSE(reader.ReadMessage());
3117 }
3118
3119 {
Austin Schuh99f7c6a2024-06-25 22:07:44 -07003120 absl::FlagSaver fs;
Brian Smarttea913d42021-12-10 15:02:38 -08003121
3122 MessageReader reader(logfile);
3123 reader.set_crash_on_corrupt_message_flag(false);
3124 reader.set_ignore_corrupt_messages_flag(true);
3125
3126 EXPECT_EQ(reader.filename(), logfile);
3127
3128 EXPECT_EQ(
3129 reader.max_out_of_order_duration(),
3130 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
3131 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
3132 EXPECT_TRUE(reader.ReadMessage());
3133 EXPECT_EQ(reader.newest_timestamp(),
3134 monotonic_clock::time_point(chrono::nanoseconds(1)));
3135 EXPECT_TRUE(reader.ReadMessage());
3136 EXPECT_EQ(reader.newest_timestamp(),
3137 monotonic_clock::time_point(chrono::nanoseconds(2)));
3138 // Confirm skipping of the corrupted message to read the hidden one.
3139 EXPECT_TRUE(reader.ReadMessage());
3140 EXPECT_EQ(reader.newest_timestamp(),
3141 monotonic_clock::time_point(chrono::nanoseconds(4)));
3142 EXPECT_FALSE(reader.ReadMessage());
3143 }
3144}
3145
Austin Schuhfa30c352022-10-16 11:12:02 -07003146class InlinePackMessage : public ::testing::Test {
3147 protected:
3148 aos::Context RandomContext() {
3149 data_ = RandomData();
3150 std::uniform_int_distribution<uint32_t> uint32_distribution(
3151 std::numeric_limits<uint32_t>::min(),
3152 std::numeric_limits<uint32_t>::max());
3153
3154 std::uniform_int_distribution<int64_t> time_distribution(
3155 std::numeric_limits<int64_t>::min(),
3156 std::numeric_limits<int64_t>::max());
3157
3158 aos::Context context;
3159 context.monotonic_event_time =
3160 aos::monotonic_clock::epoch() +
3161 chrono::nanoseconds(time_distribution(random_number_generator_));
3162 context.realtime_event_time =
3163 aos::realtime_clock::epoch() +
3164 chrono::nanoseconds(time_distribution(random_number_generator_));
3165
3166 context.monotonic_remote_time =
3167 aos::monotonic_clock::epoch() +
3168 chrono::nanoseconds(time_distribution(random_number_generator_));
3169 context.realtime_remote_time =
3170 aos::realtime_clock::epoch() +
3171 chrono::nanoseconds(time_distribution(random_number_generator_));
3172
Austin Schuhb5224ec2024-03-27 15:20:09 -07003173 context.monotonic_remote_transmit_time =
3174 aos::monotonic_clock::epoch() +
3175 chrono::nanoseconds(time_distribution(random_number_generator_));
3176
Austin Schuhfa30c352022-10-16 11:12:02 -07003177 context.queue_index = uint32_distribution(random_number_generator_);
3178 context.remote_queue_index = uint32_distribution(random_number_generator_);
3179 context.size = data_.size();
3180 context.data = data_.data();
3181 return context;
3182 }
3183
Austin Schuhf2d0e682022-10-16 14:20:58 -07003184 aos::monotonic_clock::time_point RandomMonotonic() {
3185 std::uniform_int_distribution<int64_t> time_distribution(
3186 0, std::numeric_limits<int64_t>::max());
3187 return aos::monotonic_clock::epoch() +
3188 chrono::nanoseconds(time_distribution(random_number_generator_));
3189 }
3190
3191 aos::SizePrefixedFlatbufferDetachedBuffer<message_bridge::RemoteMessage>
3192 RandomRemoteMessage() {
3193 std::uniform_int_distribution<uint8_t> uint8_distribution(
3194 std::numeric_limits<uint8_t>::min(),
3195 std::numeric_limits<uint8_t>::max());
3196
3197 std::uniform_int_distribution<int64_t> time_distribution(
3198 std::numeric_limits<int64_t>::min(),
3199 std::numeric_limits<int64_t>::max());
3200
3201 flatbuffers::FlatBufferBuilder fbb;
3202 message_bridge::RemoteMessage::Builder builder(fbb);
3203 builder.add_queue_index(uint8_distribution(random_number_generator_));
3204
3205 builder.add_monotonic_sent_time(
3206 time_distribution(random_number_generator_));
3207 builder.add_realtime_sent_time(time_distribution(random_number_generator_));
3208 builder.add_monotonic_remote_time(
3209 time_distribution(random_number_generator_));
3210 builder.add_realtime_remote_time(
3211 time_distribution(random_number_generator_));
3212
3213 builder.add_remote_queue_index(
3214 uint8_distribution(random_number_generator_));
3215
Austin Schuhb5224ec2024-03-27 15:20:09 -07003216 builder.add_monotonic_remote_transmit_time(
3217 time_distribution(random_number_generator_));
3218
Austin Schuhf2d0e682022-10-16 14:20:58 -07003219 fbb.FinishSizePrefixed(builder.Finish());
3220 return fbb.Release();
3221 }
3222
Austin Schuhfa30c352022-10-16 11:12:02 -07003223 std::vector<uint8_t> RandomData() {
3224 std::vector<uint8_t> result;
3225 std::uniform_int_distribution<int> length_distribution(1, 32);
3226 std::uniform_int_distribution<uint8_t> data_distribution(
3227 std::numeric_limits<uint8_t>::min(),
3228 std::numeric_limits<uint8_t>::max());
3229
3230 const size_t length = length_distribution(random_number_generator_);
3231
3232 result.reserve(length);
3233 for (size_t i = 0; i < length; ++i) {
3234 result.emplace_back(data_distribution(random_number_generator_));
3235 }
3236 return result;
3237 }
3238
3239 std::mt19937 random_number_generator_{
3240 std::mt19937(::aos::testing::RandomSeed())};
3241
3242 std::vector<uint8_t> data_;
Austin Schuh99f7c6a2024-06-25 22:07:44 -07003243
3244 const aos::FlatbufferDetachedBuffer<Configuration> config_{
3245 JsonToFlatbuffer<Configuration>(
3246 R"({
3247 "channels": [
3248 {
3249 "name": "/a",
3250 "type": "aos.logger.testing.TestMessage"
3251 },
3252 {
3253 "name": "/b",
3254 "type": "aos.logger.testing.TestMessage"
3255 },
3256 {
3257 "name": "/c",
3258 "type": "aos.logger.testing.TestMessage"
3259 },
3260 {
3261 "name": "/d",
3262 "type": "aos.logger.testing.TestMessage"
3263 }
3264 ]
3265}
3266)")};
Austin Schuhfa30c352022-10-16 11:12:02 -07003267};
3268
3269// Uses the binary schema to annotate a provided flatbuffer. Returns the
3270// annotated flatbuffer.
3271std::string AnnotateBinaries(
3272 const aos::NonSizePrefixedFlatbuffer<reflection::Schema> &schema,
3273 const std::string &schema_filename,
3274 flatbuffers::span<uint8_t> binary_data) {
3275 flatbuffers::BinaryAnnotator binary_annotator(
3276 schema.span().data(), schema.span().size(), binary_data.data(),
3277 binary_data.size());
3278
3279 auto annotations = binary_annotator.Annotate();
3280
3281 flatbuffers::AnnotatedBinaryTextGenerator text_generator(
3282 flatbuffers::AnnotatedBinaryTextGenerator::Options{}, annotations,
3283 binary_data.data(), binary_data.size());
3284
3285 text_generator.Generate(aos::testing::TestTmpDir() + "/foo.bfbs",
3286 schema_filename);
3287
3288 return aos::util::ReadFileToStringOrDie(aos::testing::TestTmpDir() +
3289 "/foo.afb");
3290}
3291
Austin Schuh71a40d42023-02-04 21:22:22 -08003292// Event loop which just has working time functions for the Copier classes
3293// tested below.
3294class TimeEventLoop : public EventLoop {
3295 public:
Austin Schuh99f7c6a2024-06-25 22:07:44 -07003296 TimeEventLoop(const aos::Configuration *config) : EventLoop(config) {}
Austin Schuh71a40d42023-02-04 21:22:22 -08003297
3298 aos::monotonic_clock::time_point monotonic_now() const final {
3299 return aos::monotonic_clock::min_time;
3300 }
3301 realtime_clock::time_point realtime_now() const final {
3302 return aos::realtime_clock::min_time;
3303 }
3304
3305 void OnRun(::std::function<void()> /*on_run*/) final { LOG(FATAL); }
3306
3307 const std::string_view name() const final { return "time"; }
3308 const Node *node() const final { return nullptr; }
3309
3310 void SetRuntimeAffinity(const cpu_set_t & /*cpuset*/) final { LOG(FATAL); }
3311 void SetRuntimeRealtimePriority(int /*priority*/) final { LOG(FATAL); }
3312
3313 const cpu_set_t &runtime_affinity() const final {
3314 LOG(FATAL);
3315 return cpuset_;
3316 }
3317
3318 TimerHandler *AddTimer(::std::function<void()> /*callback*/) final {
3319 LOG(FATAL);
3320 return nullptr;
3321 }
3322
3323 std::unique_ptr<RawSender> MakeRawSender(const Channel * /*channel*/) final {
3324 LOG(FATAL);
3325 return std::unique_ptr<RawSender>();
3326 }
3327
3328 const UUID &boot_uuid() const final {
3329 LOG(FATAL);
3330 return boot_uuid_;
3331 }
3332
3333 void set_name(const std::string_view name) final { LOG(FATAL) << name; }
3334
3335 pid_t GetTid() final {
3336 LOG(FATAL);
3337 return 0;
3338 }
3339
3340 int NumberBuffers(const Channel * /*channel*/) final {
3341 LOG(FATAL);
3342 return 0;
3343 }
3344
3345 int runtime_realtime_priority() const final {
3346 LOG(FATAL);
3347 return 0;
3348 }
3349
3350 std::unique_ptr<RawFetcher> MakeRawFetcher(
3351 const Channel * /*channel*/) final {
3352 LOG(FATAL);
3353 return std::unique_ptr<RawFetcher>();
3354 }
3355
3356 PhasedLoopHandler *AddPhasedLoop(
3357 ::std::function<void(int)> /*callback*/,
3358 const monotonic_clock::duration /*interval*/,
3359 const monotonic_clock::duration /*offset*/) final {
3360 LOG(FATAL);
3361 return nullptr;
3362 }
3363
3364 void MakeRawWatcher(
3365 const Channel * /*channel*/,
3366 std::function<void(const Context &context, const void *message)>
3367 /*watcher*/) final {
3368 LOG(FATAL);
3369 }
3370
3371 private:
3372 const cpu_set_t cpuset_ = DefaultAffinity();
3373 UUID boot_uuid_ = UUID ::Zero();
3374};
3375
Austin Schuhfa30c352022-10-16 11:12:02 -07003376// Tests that all variations of PackMessage are equivalent to the inline
3377// PackMessage used to avoid allocations.
3378TEST_F(InlinePackMessage, Equivilent) {
3379 std::uniform_int_distribution<uint32_t> uint32_distribution(
3380 std::numeric_limits<uint32_t>::min(),
3381 std::numeric_limits<uint32_t>::max());
3382 aos::FlatbufferVector<reflection::Schema> schema =
3383 FileToFlatbuffer<reflection::Schema>(
3384 ArtifactPath("aos/events/logging/logger.bfbs"));
3385
3386 for (const LogType type :
3387 {LogType::kLogMessage, LogType::kLogDeliveryTimeOnly,
Austin Schuh9d50f0d2024-03-27 14:40:43 -07003388 LogType::kLogRemoteMessage}) {
Austin Schuhfa30c352022-10-16 11:12:02 -07003389 for (int i = 0; i < 100; ++i) {
3390 aos::Context context = RandomContext();
3391 const uint32_t channel_index =
3392 uint32_distribution(random_number_generator_);
3393
3394 flatbuffers::FlatBufferBuilder fbb;
3395 fbb.ForceDefaults(true);
3396 fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index, type));
3397
3398 VLOG(1) << absl::BytesToHexString(std::string_view(
3399 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3400 fbb.GetBufferSpan().size()));
3401
3402 // Make sure that both the builder and inline method agree on sizes.
Austin Schuh48d10d62022-10-16 22:19:23 -07003403 ASSERT_EQ(fbb.GetSize(), PackMessageSize(type, context.size))
Austin Schuhfa30c352022-10-16 11:12:02 -07003404 << "log type " << static_cast<int>(type);
3405
3406 // Initialize the buffer to something nonzero to make sure all the padding
3407 // bytes are set to 0.
Austin Schuh48d10d62022-10-16 22:19:23 -07003408 std::vector<uint8_t> repacked_message(PackMessageSize(type, context.size),
3409 67);
Austin Schuhfa30c352022-10-16 11:12:02 -07003410
3411 // And verify packing inline works as expected.
Austin Schuh71a40d42023-02-04 21:22:22 -08003412 EXPECT_EQ(
3413 repacked_message.size(),
3414 PackMessageInline(repacked_message.data(), context, channel_index,
3415 type, 0u, repacked_message.size()));
Austin Schuhb5224ec2024-03-27 15:20:09 -07003416 for (size_t i = 0; i < fbb.GetBufferSpan().size(); ++i) {
3417 ASSERT_EQ(absl::Span<uint8_t>(repacked_message)[i],
3418 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3419 fbb.GetBufferSpan().size())[i])
3420 << ": On index " << i;
3421 }
3422 ASSERT_EQ(absl::Span<uint8_t>(repacked_message),
Austin Schuhfa30c352022-10-16 11:12:02 -07003423 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3424 fbb.GetBufferSpan().size()))
3425 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
Austin Schuhb5224ec2024-03-27 15:20:09 -07003426 fbb.GetBufferSpan())
3427 << " for log type " << static_cast<int>(type);
Austin Schuh71a40d42023-02-04 21:22:22 -08003428
3429 // Ok, now we want to confirm that we can build up arbitrary pieces of
3430 // said flatbuffer. Try all of them since it is cheap.
Austin Schuh99f7c6a2024-06-25 22:07:44 -07003431 TimeEventLoop event_loop(&config_.message());
Austin Schuh71a40d42023-02-04 21:22:22 -08003432 for (size_t i = 0; i < repacked_message.size(); i += 8) {
3433 for (size_t j = i; j < repacked_message.size(); j += 8) {
3434 std::vector<uint8_t> destination(repacked_message.size(), 67u);
3435 ContextDataCopier copier(context, channel_index, type, &event_loop);
3436
3437 copier.Copy(destination.data(), i, j);
3438
3439 size_t index = 0;
3440 for (size_t k = i; k < j; ++k) {
3441 ASSERT_EQ(destination[index], repacked_message[k])
3442 << ": Failed to match type " << static_cast<int>(type)
3443 << ", index " << index << " while testing range " << i << " to "
3444 << j;
3445 ;
3446 ++index;
3447 }
3448 // Now, confirm that none of the other bytes have been touched.
3449 for (; index < destination.size(); ++index) {
3450 ASSERT_EQ(destination[index], 67u);
3451 }
3452 }
3453 }
Austin Schuhfa30c352022-10-16 11:12:02 -07003454 }
3455 }
3456}
3457
Austin Schuhf2d0e682022-10-16 14:20:58 -07003458// Tests that all variations of PackMessage are equivilent to the inline
3459// PackMessage used to avoid allocations.
3460TEST_F(InlinePackMessage, RemoteEquivilent) {
3461 aos::FlatbufferVector<reflection::Schema> schema =
3462 FileToFlatbuffer<reflection::Schema>(
3463 ArtifactPath("aos/events/logging/logger.bfbs"));
3464 std::uniform_int_distribution<uint8_t> uint8_distribution(
3465 std::numeric_limits<uint8_t>::min(), std::numeric_limits<uint8_t>::max());
3466
3467 for (int i = 0; i < 100; ++i) {
3468 aos::SizePrefixedFlatbufferDetachedBuffer<RemoteMessage> random_msg =
3469 RandomRemoteMessage();
3470 const size_t channel_index = uint8_distribution(random_number_generator_);
3471 const monotonic_clock::time_point monotonic_timestamp_time =
3472 RandomMonotonic();
3473
3474 flatbuffers::FlatBufferBuilder fbb;
3475 fbb.ForceDefaults(true);
3476 fbb.FinishSizePrefixed(PackRemoteMessage(
3477 &fbb, &random_msg.message(), channel_index, monotonic_timestamp_time));
3478
3479 VLOG(1) << absl::BytesToHexString(std::string_view(
3480 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3481 fbb.GetBufferSpan().size()));
3482
3483 // Make sure that both the builder and inline method agree on sizes.
3484 ASSERT_EQ(fbb.GetSize(), PackRemoteMessageSize());
3485
3486 // Initialize the buffer to something nonzer to make sure all the padding
3487 // bytes are set to 0.
3488 std::vector<uint8_t> repacked_message(PackRemoteMessageSize(), 67);
3489
3490 // And verify packing inline works as expected.
Austin Schuh71a40d42023-02-04 21:22:22 -08003491 EXPECT_EQ(repacked_message.size(),
3492 PackRemoteMessageInline(
3493 repacked_message.data(), &random_msg.message(), channel_index,
3494 monotonic_timestamp_time, 0u, repacked_message.size()));
Austin Schuhf2d0e682022-10-16 14:20:58 -07003495 EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
3496 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3497 fbb.GetBufferSpan().size()))
3498 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
3499 fbb.GetBufferSpan());
Austin Schuh71a40d42023-02-04 21:22:22 -08003500
3501 // Ok, now we want to confirm that we can build up arbitrary pieces of said
3502 // flatbuffer. Try all of them since it is cheap.
Austin Schuh99f7c6a2024-06-25 22:07:44 -07003503 TimeEventLoop event_loop(&config_.message());
Austin Schuh71a40d42023-02-04 21:22:22 -08003504 for (size_t i = 0; i < repacked_message.size(); i += 8) {
3505 for (size_t j = i; j < repacked_message.size(); j += 8) {
3506 std::vector<uint8_t> destination(repacked_message.size(), 67u);
3507 RemoteMessageCopier copier(&random_msg.message(), channel_index,
3508 monotonic_timestamp_time, &event_loop);
3509
3510 copier.Copy(destination.data(), i, j);
3511
3512 size_t index = 0;
3513 for (size_t k = i; k < j; ++k) {
3514 ASSERT_EQ(destination[index], repacked_message[k]);
3515 ++index;
3516 }
3517 for (; index < destination.size(); ++index) {
3518 ASSERT_EQ(destination[index], 67u);
3519 }
3520 }
3521 }
Austin Schuhf2d0e682022-10-16 14:20:58 -07003522 }
3523}
Austin Schuhfa30c352022-10-16 11:12:02 -07003524
Stephan Pleinesf63bde82024-01-13 15:59:33 -08003525} // namespace aos::logger::testing