blob: 4ceca20458cf75550fabbec240216b482b5a6f3a [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
Alexei Strots01395492023-03-20 13:59:56 -07004#include <filesystem>
Austin Schuhfa30c352022-10-16 11:12:02 -07005#include <random>
Austin Schuhe243aaf2020-10-11 15:46:02 -07006#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07007
Austin Schuhfa30c352022-10-16 11:12:02 -07008#include "absl/strings/escaping.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07009#include "external/com_github_google_flatbuffers/src/annotated_binary_text_gen.h"
10#include "external/com_github_google_flatbuffers/src/binary_annotator.h"
11#include "flatbuffers/reflection_generated.h"
12#include "gflags/gflags.h"
13#include "gtest/gtest.h"
14
Austin Schuhc41603c2020-10-11 16:17:37 -070015#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -070016#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -080017#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070018#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070019#include "aos/json_to_flatbuffer.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070020#include "aos/testing/path.h"
21#include "aos/testing/random_seed.h"
Austin Schuhc243b422020-10-11 15:35:08 -070022#include "aos/testing/tmpdir.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070023#include "aos/util/file.h"
Austin Schuhc243b422020-10-11 15:35:08 -070024
Stephan Pleinesf63bde82024-01-13 15:59:33 -080025namespace aos::logger::testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070026namespace chrono = std::chrono;
Austin Schuhf2d0e682022-10-16 14:20:58 -070027using aos::message_bridge::RemoteMessage;
Eric Schmiedebergb38477e2022-12-02 16:08:04 -070028using aos::testing::ArtifactPath;
Austin Schuhc243b422020-10-11 15:35:08 -070029
Austin Schuhd863e6e2022-10-16 15:44:50 -070030// Adapter class to make it easy to test DetachedBufferWriter without adding
31// test only boilerplate to DetachedBufferWriter.
Alexei Strots15c22b12023-04-04 16:27:17 -070032class TestDetachedBufferWriter : public FileBackend,
33 public DetachedBufferWriter {
Austin Schuhd863e6e2022-10-16 15:44:50 -070034 public:
Austin Schuh48d10d62022-10-16 22:19:23 -070035 // Pick a max size that is rather conservative.
36 static constexpr size_t kMaxMessageSize = 128 * 1024;
Austin Schuhd863e6e2022-10-16 15:44:50 -070037 TestDetachedBufferWriter(std::string_view filename)
colleen61276dc2023-06-01 09:23:29 -070038 : FileBackend("/", false),
Alexei Strots15c22b12023-04-04 16:27:17 -070039 DetachedBufferWriter(FileBackend::RequestFile(filename),
40 std::make_unique<DummyEncoder>(kMaxMessageSize)) {}
Austin Schuhd863e6e2022-10-16 15:44:50 -070041 void WriteSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
42 QueueSpan(absl::Span<const uint8_t>(buffer.data(), buffer.size()));
43 }
Austin Schuh7ef11a42023-02-04 17:15:12 -080044 void QueueSpan(absl::Span<const uint8_t> buffer) {
45 DataEncoder::SpanCopier coppier(buffer);
46 CopyMessage(&coppier, monotonic_clock::now());
47 }
Austin Schuhd863e6e2022-10-16 15:44:50 -070048};
49
Austin Schuhe243aaf2020-10-11 15:46:02 -070050// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070051template <typename T>
52SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
53 const std::string_view data) {
54 flatbuffers::FlatBufferBuilder fbb;
55 fbb.ForceDefaults(true);
56 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
57 return fbb.Release();
58}
59
Austin Schuhe243aaf2020-10-11 15:46:02 -070060// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070061TEST(SpanReaderTest, ReadWrite) {
62 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
63 unlink(logfile.c_str());
64
65 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080066 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070067 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080068 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070069
70 {
Austin Schuhd863e6e2022-10-16 15:44:50 -070071 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -080072 writer.QueueSpan(m1.span());
73 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070074 }
75
76 SpanReader reader(logfile);
77
78 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhcf5f6442021-07-06 10:43:28 -070079 EXPECT_EQ(reader.PeekMessage(), m1.span());
80 EXPECT_EQ(reader.PeekMessage(), m1.span());
Austin Schuhadd6eb32020-11-09 21:24:26 -080081 EXPECT_EQ(reader.ReadMessage(), m1.span());
82 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhcf5f6442021-07-06 10:43:28 -070083 EXPECT_EQ(reader.PeekMessage(), absl::Span<const uint8_t>());
Austin Schuhc243b422020-10-11 15:35:08 -070084 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
85}
86
Austin Schuhe243aaf2020-10-11 15:46:02 -070087// Tests that we can actually parse the resulting messages at a basic level
88// through MessageReader.
89TEST(MessageReaderTest, ReadWrite) {
90 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
91 unlink(logfile.c_str());
92
93 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
94 JsonToSizedFlatbuffer<LogFileHeader>(
95 R"({ "max_out_of_order_duration": 100000000 })");
96 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
97 JsonToSizedFlatbuffer<MessageHeader>(
98 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
99 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
100 JsonToSizedFlatbuffer<MessageHeader>(
101 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
102
103 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700104 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800105 writer.QueueSpan(config.span());
106 writer.QueueSpan(m1.span());
107 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -0700108 }
109
110 MessageReader reader(logfile);
111
112 EXPECT_EQ(reader.filename(), logfile);
113
114 EXPECT_EQ(
115 reader.max_out_of_order_duration(),
116 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
117 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
118 EXPECT_TRUE(reader.ReadMessage());
119 EXPECT_EQ(reader.newest_timestamp(),
120 monotonic_clock::time_point(chrono::nanoseconds(1)));
121 EXPECT_TRUE(reader.ReadMessage());
122 EXPECT_EQ(reader.newest_timestamp(),
123 monotonic_clock::time_point(chrono::nanoseconds(2)));
124 EXPECT_FALSE(reader.ReadMessage());
125}
126
Austin Schuh32f68492020-11-08 21:45:51 -0800127// Tests that we explode when messages are too far out of order.
128TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
129 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
130 unlink(logfile0.c_str());
131
132 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
133 JsonToSizedFlatbuffer<LogFileHeader>(
134 R"({
135 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800136 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800137 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
138 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
139 "parts_index": 0
140})");
141
142 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
143 JsonToSizedFlatbuffer<MessageHeader>(
144 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
145 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
146 JsonToSizedFlatbuffer<MessageHeader>(
147 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
148 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
149 JsonToSizedFlatbuffer<MessageHeader>(
150 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
151
152 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700153 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800154 writer.QueueSpan(config0.span());
155 writer.QueueSpan(m1.span());
156 writer.QueueSpan(m2.span());
157 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800158 }
Alexei Strots01395492023-03-20 13:59:56 -0700159 ASSERT_TRUE(std::filesystem::exists(logfile0)) << logfile0;
Austin Schuh32f68492020-11-08 21:45:51 -0800160
161 const std::vector<LogFile> parts = SortParts({logfile0});
162
163 PartsMessageReader reader(parts[0].parts[0]);
164
165 EXPECT_TRUE(reader.ReadMessage());
166 EXPECT_TRUE(reader.ReadMessage());
167 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
168}
169
Austin Schuhc41603c2020-10-11 16:17:37 -0700170// Tests that we can transparently re-assemble part files with a
171// PartsMessageReader.
172TEST(PartsMessageReaderTest, ReadWrite) {
173 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
174 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
175 unlink(logfile0.c_str());
176 unlink(logfile1.c_str());
177
178 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
179 JsonToSizedFlatbuffer<LogFileHeader>(
180 R"({
181 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800182 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700183 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
184 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
185 "parts_index": 0
186})");
187 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
188 JsonToSizedFlatbuffer<LogFileHeader>(
189 R"({
190 "max_out_of_order_duration": 200000000,
191 "monotonic_start_time": 0,
192 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800193 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700194 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
195 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
196 "parts_index": 1
197})");
198
199 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
200 JsonToSizedFlatbuffer<MessageHeader>(
201 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
202 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
203 JsonToSizedFlatbuffer<MessageHeader>(
204 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
205
206 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700207 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800208 writer.QueueSpan(config0.span());
209 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700210 }
211 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700212 TestDetachedBufferWriter writer(logfile1);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800213 writer.QueueSpan(config1.span());
214 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700215 }
216
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700217 // When parts are sorted, we choose the highest max out of order duration for
218 // all parts with the same part uuid.
Austin Schuhc41603c2020-10-11 16:17:37 -0700219 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
220
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700221 EXPECT_EQ(parts.size(), 1);
222 EXPECT_EQ(parts[0].parts.size(), 1);
223
Austin Schuhc41603c2020-10-11 16:17:37 -0700224 PartsMessageReader reader(parts[0].parts[0]);
225
226 EXPECT_EQ(reader.filename(), logfile0);
227
228 // Confirm that the timestamps track, and the filename also updates.
229 // Read the first message.
230 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700231 // Since config1 has higher max out of order duration, that will be used to
232 // read partfiles with same part uuid, i.e logfile0 and logfile1.
Austin Schuhc41603c2020-10-11 16:17:37 -0700233 EXPECT_EQ(
234 reader.max_out_of_order_duration(),
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700235 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuhc41603c2020-10-11 16:17:37 -0700236 EXPECT_TRUE(reader.ReadMessage());
237 EXPECT_EQ(reader.filename(), logfile0);
238 EXPECT_EQ(reader.newest_timestamp(),
239 monotonic_clock::time_point(chrono::nanoseconds(1)));
240 EXPECT_EQ(
241 reader.max_out_of_order_duration(),
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700242 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuhc41603c2020-10-11 16:17:37 -0700243
244 // Read the second message.
245 EXPECT_TRUE(reader.ReadMessage());
246 EXPECT_EQ(reader.filename(), logfile1);
247 EXPECT_EQ(reader.newest_timestamp(),
248 monotonic_clock::time_point(chrono::nanoseconds(2)));
249 EXPECT_EQ(
250 reader.max_out_of_order_duration(),
251 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
252
253 // And then confirm that reading again returns no message.
254 EXPECT_FALSE(reader.ReadMessage());
255 EXPECT_EQ(reader.filename(), logfile1);
256 EXPECT_EQ(
257 reader.max_out_of_order_duration(),
258 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800259 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700260
261 // Verify that the parts metadata has the correct max out of order duration.
262 EXPECT_EQ(
263 parts[0].parts[0].max_out_of_order_duration,
264 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuhc41603c2020-10-11 16:17:37 -0700265}
Austin Schuh32f68492020-11-08 21:45:51 -0800266
Austin Schuh1be0ce42020-11-29 22:43:26 -0800267// Tests that Message's operator < works as expected.
268TEST(MessageTest, Sorting) {
269 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
270
271 Message m1{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700272 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700273 .timestamp =
274 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
Austin Schuh48507722021-07-17 17:29:24 -0700275 .monotonic_remote_boot = 0xffffff,
276 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700277 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800278 Message m2{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700279 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700280 .timestamp =
281 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
Austin Schuh48507722021-07-17 17:29:24 -0700282 .monotonic_remote_boot = 0xffffff,
283 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700284 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800285
286 EXPECT_LT(m1, m2);
287 EXPECT_GE(m2, m1);
288
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700289 m1.timestamp.time = e;
290 m2.timestamp.time = e;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800291
292 m1.channel_index = 1;
293 m2.channel_index = 2;
294
295 EXPECT_LT(m1, m2);
296 EXPECT_GE(m2, m1);
297
298 m1.channel_index = 0;
299 m2.channel_index = 0;
Austin Schuh58646e22021-08-23 23:51:46 -0700300 m1.queue_index.index = 0u;
301 m2.queue_index.index = 1u;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800302
303 EXPECT_LT(m1, m2);
304 EXPECT_GE(m2, m1);
305}
306
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800307aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
308 const aos::FlatbufferDetachedBuffer<Configuration> &config,
309 const std::string_view json) {
310 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh1034a832021-03-31 21:53:26 -0700311 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800312 flatbuffers::Offset<Configuration> config_offset =
313 aos::CopyFlatBuffer(config, &fbb);
314 LogFileHeader::Builder header_builder(fbb);
315 header_builder.add_configuration(config_offset);
316 fbb.Finish(header_builder.Finish());
317 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
318
319 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
320 JsonToFlatbuffer<LogFileHeader>(json));
321 CHECK(header_updates.Verify());
322 flatbuffers::FlatBufferBuilder fbb2;
Austin Schuh1034a832021-03-31 21:53:26 -0700323 fbb2.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800324 fbb2.FinishSizePrefixed(
325 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
326 return fbb2.Release();
327}
328
// Selects which configuration a SortingElementTest fixture is built with.
enum class SortingElementConfig {
  // Channels only; the configuration declares no nodes.
  kSingleNode,
  // Channels with source/destination nodes, plus nodes pi1, pi2 and pi3.
  kMultiNode,
};
336
337template <SortingElementConfig sorting_element_config =
338 SortingElementConfig::kMultiNode>
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800339class SortingElementTest : public ::testing::Test {
340 public:
341 SortingElementTest()
342 : config_(JsonToFlatbuffer<Configuration>(
Philipp Schrader416505b2024-03-28 11:59:45 -0700343 sorting_element_config == SortingElementConfig::kSingleNode ?
344 R"({
345 "channels": [
346 {
347 "name": "/a",
348 "type": "aos.logger.testing.TestMessage"
349 },
350 {
351 "name": "/b",
352 "type": "aos.logger.testing.TestMessage"
353 },
354 {
355 "name": "/c",
356 "type": "aos.logger.testing.TestMessage"
357 },
358 {
359 "name": "/d",
360 "type": "aos.logger.testing.TestMessage"
361 }
362 ]
363}
364)"
365 :
366 R"({
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800367 "channels": [
368 {
369 "name": "/a",
370 "type": "aos.logger.testing.TestMessage",
371 "source_node": "pi1",
372 "destination_nodes": [
373 {
374 "name": "pi2"
375 },
376 {
377 "name": "pi3"
378 }
379 ]
380 },
381 {
382 "name": "/b",
383 "type": "aos.logger.testing.TestMessage",
384 "source_node": "pi1"
385 },
386 {
387 "name": "/c",
388 "type": "aos.logger.testing.TestMessage",
389 "source_node": "pi1"
Austin Schuh48507722021-07-17 17:29:24 -0700390 },
391 {
392 "name": "/d",
393 "type": "aos.logger.testing.TestMessage",
394 "source_node": "pi2",
395 "destination_nodes": [
396 {
397 "name": "pi1"
398 }
399 ]
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800400 }
401 ],
402 "nodes": [
403 {
404 "name": "pi1"
405 },
406 {
407 "name": "pi2"
408 },
409 {
410 "name": "pi3"
411 }
412 ]
413}
414)")),
Philipp Schrader416505b2024-03-28 11:59:45 -0700415 config0_(MakeHeader(
416 config_, sorting_element_config == SortingElementConfig::kSingleNode
417 ?
418 R"({
419 /* 100ms */
420 "max_out_of_order_duration": 100000000,
421 "node": {
422 "name": "pi1"
423 },
424 "logger_node": {
425 "name": "pi1"
426 },
427 "monotonic_start_time": 1000000,
428 "realtime_start_time": 1000000000000,
429 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
430 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
431 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
432 "boot_uuids": [
433 "1d782c63-b3c7-466e-bea9-a01308b43333",
434 ],
435 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
436 "parts_index": 0
437})"
438 :
439 R"({
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800440 /* 100ms */
441 "max_out_of_order_duration": 100000000,
442 "node": {
443 "name": "pi1"
444 },
445 "logger_node": {
446 "name": "pi1"
447 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800448 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800449 "realtime_start_time": 1000000000000,
450 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700451 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
452 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
453 "boot_uuids": [
454 "1d782c63-b3c7-466e-bea9-a01308b43333",
455 "",
456 ""
457 ],
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800458 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
459 "parts_index": 0
460})")),
461 config1_(MakeHeader(config_,
462 R"({
463 /* 100ms */
464 "max_out_of_order_duration": 100000000,
465 "node": {
466 "name": "pi1"
467 },
468 "logger_node": {
469 "name": "pi1"
470 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800471 "monotonic_start_time": 1000000,
472 "realtime_start_time": 1000000000000,
473 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700474 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
475 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
476 "boot_uuids": [
477 "1d782c63-b3c7-466e-bea9-a01308b43333",
478 "",
479 ""
480 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800481 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
482 "parts_index": 0
483})")),
484 config2_(MakeHeader(config_,
485 R"({
486 /* 100ms */
487 "max_out_of_order_duration": 100000000,
488 "node": {
489 "name": "pi2"
490 },
491 "logger_node": {
492 "name": "pi2"
493 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800494 "monotonic_start_time": 0,
495 "realtime_start_time": 1000000000000,
Austin Schuh48507722021-07-17 17:29:24 -0700496 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
497 "logger_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
498 "boot_uuids": [
499 "",
500 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
501 ""
502 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800503 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
504 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
505 "parts_index": 0
506})")),
507 config3_(MakeHeader(config_,
508 R"({
509 /* 100ms */
510 "max_out_of_order_duration": 100000000,
511 "node": {
512 "name": "pi1"
513 },
514 "logger_node": {
515 "name": "pi1"
516 },
517 "monotonic_start_time": 2000000,
518 "realtime_start_time": 1000000000,
519 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
Austin Schuh48507722021-07-17 17:29:24 -0700520 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
521 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
522 "boot_uuids": [
523 "1d782c63-b3c7-466e-bea9-a01308b43333",
524 "",
525 ""
526 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800527 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800528 "parts_index": 0
Austin Schuh8bf1e632021-01-02 22:41:04 -0800529})")),
530 config4_(MakeHeader(config_,
531 R"({
532 /* 100ms */
533 "max_out_of_order_duration": 100000000,
534 "node": {
535 "name": "pi2"
536 },
537 "logger_node": {
538 "name": "pi1"
539 },
540 "monotonic_start_time": 2000000,
541 "realtime_start_time": 1000000000,
542 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
543 "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
Austin Schuh48507722021-07-17 17:29:24 -0700544 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
545 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
546 "boot_uuids": [
547 "1d782c63-b3c7-466e-bea9-a01308b43333",
548 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
549 ""
550 ],
Austin Schuh8bf1e632021-01-02 22:41:04 -0800551 "parts_index": 0
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800552})")) {
553 unlink(logfile0_.c_str());
554 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800555 unlink(logfile2_.c_str());
Austin Schuh48507722021-07-17 17:29:24 -0700556 unlink(logfile3_.c_str());
Austin Schuh921b2562021-07-31 19:37:03 -0700557 queue_index_.resize(config_.message().channels()->size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800558 }
559
560 protected:
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800561 flatbuffers::DetachedBuffer MakeLogMessage(
562 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
563 int value) {
564 flatbuffers::FlatBufferBuilder message_fbb;
565 message_fbb.ForceDefaults(true);
566 TestMessage::Builder test_message_builder(message_fbb);
567 test_message_builder.add_value(value);
568 message_fbb.Finish(test_message_builder.Finish());
569
570 aos::Context context;
571 context.monotonic_event_time = monotonic_now;
572 context.realtime_event_time = aos::realtime_clock::epoch() +
573 chrono::seconds(1000) +
574 monotonic_now.time_since_epoch();
Austin Schuh921b2562021-07-31 19:37:03 -0700575 CHECK_LT(static_cast<size_t>(channel_index), queue_index_.size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800576 context.queue_index = queue_index_[channel_index];
577 context.size = message_fbb.GetSize();
578 context.data = message_fbb.GetBufferPointer();
579
580 ++queue_index_[channel_index];
581
582 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh53e3a812021-11-03 16:57:23 -0700583 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800584 fbb.FinishSizePrefixed(
585 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
586
587 return fbb.Release();
588 }
589
590 flatbuffers::DetachedBuffer MakeTimestampMessage(
591 const aos::monotonic_clock::time_point sender_monotonic_now,
Austin Schuh8bf1e632021-01-02 22:41:04 -0800592 int channel_index, chrono::nanoseconds receiver_monotonic_offset,
593 monotonic_clock::time_point monotonic_timestamp_time =
594 monotonic_clock::min_time) {
595 const monotonic_clock::time_point monotonic_sent_time =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800596 sender_monotonic_now + receiver_monotonic_offset;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800597
598 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800599 fbb.ForceDefaults(true);
600
601 logger::MessageHeader::Builder message_header_builder(fbb);
602
603 message_header_builder.add_channel_index(channel_index);
604
605 message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
606 100);
607 message_header_builder.add_monotonic_sent_time(
608 monotonic_sent_time.time_since_epoch().count());
609 message_header_builder.add_realtime_sent_time(
610 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
611 monotonic_sent_time.time_since_epoch())
612 .time_since_epoch()
613 .count());
614
615 message_header_builder.add_monotonic_remote_time(
616 sender_monotonic_now.time_since_epoch().count());
617 message_header_builder.add_realtime_remote_time(
618 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
619 sender_monotonic_now.time_since_epoch())
620 .time_since_epoch()
621 .count());
622 message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
623 1);
624
625 if (monotonic_timestamp_time != monotonic_clock::min_time) {
626 message_header_builder.add_monotonic_timestamp_time(
627 monotonic_timestamp_time.time_since_epoch().count());
628 }
629
630 fbb.FinishSizePrefixed(message_header_builder.Finish());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800631 LOG(INFO) << aos::FlatbufferToJson(
632 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
633 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
634
635 return fbb.Release();
636 }
637
638 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
639 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800640 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh48507722021-07-17 17:29:24 -0700641 const std::string logfile3_ = aos::testing::TestTmpDir() + "/log3.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800642
643 const aos::FlatbufferDetachedBuffer<Configuration> config_;
644 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
645 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800646 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
647 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800648 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800649
650 std::vector<uint32_t> queue_index_;
651};
652
Philipp Schrader416505b2024-03-28 11:59:45 -0700653using MessageSorterTest = SortingElementTest<SortingElementConfig::kMultiNode>;
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700654using MessageSorterDeathTest = MessageSorterTest;
Philipp Schrader416505b2024-03-28 11:59:45 -0700655using PartsMergerTest = SortingElementTest<SortingElementConfig::kMultiNode>;
656using TimestampMapperTest =
657 SortingElementTest<SortingElementConfig::kMultiNode>;
658using SingleNodeTimestampMapperTest =
659 SortingElementTest<SortingElementConfig::kSingleNode>;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800660
661// Tests that we can pull messages out of a log sorted in order.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700662TEST_F(MessageSorterTest, Pull) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800663 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
664 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700665 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800666 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700667 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800668 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700669 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800670 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700671 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800672 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700673 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800674 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
675 }
676
677 const std::vector<LogFile> parts = SortParts({logfile0_});
678
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700679 MessageSorter message_sorter(parts[0].parts[0]);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800680
681 // Confirm we aren't sorted until any time until the message is popped.
682 // Peeking shouldn't change the sorted until time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700683 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800684
685 std::deque<Message> output;
686
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700687 ASSERT_TRUE(message_sorter.Front() != nullptr);
688 output.emplace_back(std::move(*message_sorter.Front()));
689 message_sorter.PopFront();
690 EXPECT_EQ(message_sorter.sorted_until(), e + chrono::milliseconds(1900));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800691
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700692 ASSERT_TRUE(message_sorter.Front() != nullptr);
693 output.emplace_back(std::move(*message_sorter.Front()));
694 message_sorter.PopFront();
695 EXPECT_EQ(message_sorter.sorted_until(), e + chrono::milliseconds(1900));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800696
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700697 ASSERT_TRUE(message_sorter.Front() != nullptr);
698 output.emplace_back(std::move(*message_sorter.Front()));
699 message_sorter.PopFront();
700 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::max_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800701
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700702 ASSERT_TRUE(message_sorter.Front() != nullptr);
703 output.emplace_back(std::move(*message_sorter.Front()));
704 message_sorter.PopFront();
705 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::max_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800706
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700707 ASSERT_TRUE(message_sorter.Front() == nullptr);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800708
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700709 EXPECT_EQ(output[0].timestamp.boot, 0);
710 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
711 EXPECT_EQ(output[1].timestamp.boot, 0);
712 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1000));
713 EXPECT_EQ(output[2].timestamp.boot, 0);
714 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1901));
715 EXPECT_EQ(output[3].timestamp.boot, 0);
716 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800717}
718
Austin Schuhb000de62020-12-03 22:00:40 -0800719// Tests that we can pull messages out of a log sorted in order.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700720TEST_F(MessageSorterTest, WayBeforeStart) {
Austin Schuhb000de62020-12-03 22:00:40 -0800721 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
722 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700723 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhb000de62020-12-03 22:00:40 -0800724 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700725 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800726 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700727 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800728 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700729 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800730 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700731 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800732 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700733 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800734 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
735 }
736
737 const std::vector<LogFile> parts = SortParts({logfile0_});
738
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700739 MessageSorter message_sorter(parts[0].parts[0]);
Austin Schuhb000de62020-12-03 22:00:40 -0800740
741 // Confirm we aren't sorted until any time until the message is popped.
742 // Peeking shouldn't change the sorted until time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700743 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
Austin Schuhb000de62020-12-03 22:00:40 -0800744
745 std::deque<Message> output;
746
747 for (monotonic_clock::time_point t :
748 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
749 e + chrono::milliseconds(1900), monotonic_clock::max_time,
750 monotonic_clock::max_time}) {
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700751 ASSERT_TRUE(message_sorter.Front() != nullptr);
752 output.emplace_back(std::move(*message_sorter.Front()));
753 message_sorter.PopFront();
754 EXPECT_EQ(message_sorter.sorted_until(), t);
Austin Schuhb000de62020-12-03 22:00:40 -0800755 }
756
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700757 ASSERT_TRUE(message_sorter.Front() == nullptr);
Austin Schuhb000de62020-12-03 22:00:40 -0800758
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700759 EXPECT_EQ(output[0].timestamp.boot, 0u);
760 EXPECT_EQ(output[0].timestamp.time, e - chrono::milliseconds(1000));
761 EXPECT_EQ(output[1].timestamp.boot, 0u);
762 EXPECT_EQ(output[1].timestamp.time, e - chrono::milliseconds(500));
763 EXPECT_EQ(output[2].timestamp.boot, 0u);
764 EXPECT_EQ(output[2].timestamp.time, e - chrono::milliseconds(10));
765 EXPECT_EQ(output[3].timestamp.boot, 0u);
766 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(1901));
767 EXPECT_EQ(output[4].timestamp.boot, 0u);
768 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuhb000de62020-12-03 22:00:40 -0800769}
770
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800771// Tests that messages too far out of order trigger death.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700772TEST_F(MessageSorterDeathTest, Pull) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800773 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
774 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700775 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800776 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700777 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800778 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700779 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800780 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700781 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800782 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
783 // The following message is too far out of order and will trigger the CHECK.
Austin Schuhd863e6e2022-10-16 15:44:50 -0700784 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800785 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
786 }
787
788 const std::vector<LogFile> parts = SortParts({logfile0_});
789
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700790 MessageSorter message_sorter(parts[0].parts[0]);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800791
792 // Confirm we aren't sorted until any time until the message is popped.
793 // Peeking shouldn't change the sorted until time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700794 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800795 std::deque<Message> output;
796
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700797 ASSERT_TRUE(message_sorter.Front() != nullptr);
798 message_sorter.PopFront();
799 ASSERT_TRUE(message_sorter.Front() != nullptr);
800 ASSERT_TRUE(message_sorter.Front() != nullptr);
801 message_sorter.PopFront();
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800802
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700803 EXPECT_DEATH({ message_sorter.Front(); },
Austin Schuh58646e22021-08-23 23:51:46 -0700804 "Max out of order of 100000000ns exceeded.");
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800805}
806
Austin Schuh8f52ed52020-11-30 23:12:39 -0800807// Tests that we can merge data from 2 separate files, including duplicate data.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700808TEST_F(PartsMergerTest, TwoFileMerger) {
Austin Schuh8f52ed52020-11-30 23:12:39 -0800809 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
810 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700811 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800812 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700813 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800814 writer1.QueueSpan(config1_.span());
815
Austin Schuhd863e6e2022-10-16 15:44:50 -0700816 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800817 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700818 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800819 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
820
Austin Schuhd863e6e2022-10-16 15:44:50 -0700821 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800822 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700823 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800824 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
825
826 // Make a duplicate!
827 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
828 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
829 writer0.QueueSpan(msg.span());
830 writer1.QueueSpan(msg.span());
831
Austin Schuhd863e6e2022-10-16 15:44:50 -0700832 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800833 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
834 }
835
836 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -0700837 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -0800838 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800839
Austin Schuh63097262023-08-16 17:04:29 -0700840 PartsMerger merger(
841 log_files.SelectParts("pi1", 0,
842 {StoredDataType::DATA, StoredDataType::TIMESTAMPS,
843 StoredDataType::REMOTE_TIMESTAMPS}));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800844
845 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
846
847 std::deque<Message> output;
848
849 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
850 ASSERT_TRUE(merger.Front() != nullptr);
851 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
852
853 output.emplace_back(std::move(*merger.Front()));
854 merger.PopFront();
855 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
856
857 ASSERT_TRUE(merger.Front() != nullptr);
858 output.emplace_back(std::move(*merger.Front()));
859 merger.PopFront();
860 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
861
862 ASSERT_TRUE(merger.Front() != nullptr);
863 output.emplace_back(std::move(*merger.Front()));
864 merger.PopFront();
865 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
866
867 ASSERT_TRUE(merger.Front() != nullptr);
868 output.emplace_back(std::move(*merger.Front()));
869 merger.PopFront();
870 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
871
872 ASSERT_TRUE(merger.Front() != nullptr);
873 output.emplace_back(std::move(*merger.Front()));
874 merger.PopFront();
875 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
876
877 ASSERT_TRUE(merger.Front() != nullptr);
878 output.emplace_back(std::move(*merger.Front()));
879 merger.PopFront();
880 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
881
882 ASSERT_TRUE(merger.Front() == nullptr);
883
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700884 EXPECT_EQ(output[0].timestamp.boot, 0u);
885 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
886 EXPECT_EQ(output[1].timestamp.boot, 0u);
887 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1001));
888 EXPECT_EQ(output[2].timestamp.boot, 0u);
889 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1002));
890 EXPECT_EQ(output[3].timestamp.boot, 0u);
891 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
892 EXPECT_EQ(output[4].timestamp.boot, 0u);
893 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(3000));
894 EXPECT_EQ(output[5].timestamp.boot, 0u);
895 EXPECT_EQ(output[5].timestamp.time, e + chrono::milliseconds(3002));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800896}
897
Austin Schuh8bf1e632021-01-02 22:41:04 -0800898// Tests that we can merge timestamps with various combinations of
899// monotonic_timestamp_time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700900TEST_F(PartsMergerTest, TwoFileTimestampMerger) {
Austin Schuh8bf1e632021-01-02 22:41:04 -0800901 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
902 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700903 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800904 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700905 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800906 writer1.QueueSpan(config1_.span());
907
908 // Neither has it.
909 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700910 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800911 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700912 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800913 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
914
915 // First only has it.
916 MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700917 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800918 e + chrono::milliseconds(1001), 0, chrono::seconds(100),
919 e + chrono::nanoseconds(971)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700920 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800921 e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
922
923 // Second only has it.
924 MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700925 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800926 e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700927 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800928 e + chrono::milliseconds(1002), 0, chrono::seconds(100),
929 e + chrono::nanoseconds(972)));
930
931 // Both have it.
932 MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700933 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800934 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
935 e + chrono::nanoseconds(973)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700936 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800937 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
938 e + chrono::nanoseconds(973)));
939 }
940
941 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -0700942 LogFilesContainer log_files(parts);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800943 ASSERT_EQ(parts.size(), 1u);
944
Austin Schuh63097262023-08-16 17:04:29 -0700945 PartsMerger merger(
946 log_files.SelectParts("pi1", 0,
947 {StoredDataType::DATA, StoredDataType::TIMESTAMPS,
948 StoredDataType::REMOTE_TIMESTAMPS}));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800949
950 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
951
952 std::deque<Message> output;
953
954 for (int i = 0; i < 4; ++i) {
955 ASSERT_TRUE(merger.Front() != nullptr);
956 output.emplace_back(std::move(*merger.Front()));
957 merger.PopFront();
958 }
959 ASSERT_TRUE(merger.Front() == nullptr);
960
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700961 EXPECT_EQ(output[0].timestamp.boot, 0u);
962 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700963 EXPECT_FALSE(output[0].data->has_monotonic_timestamp_time);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700964
965 EXPECT_EQ(output[1].timestamp.boot, 0u);
966 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700967 EXPECT_TRUE(output[1].data->has_monotonic_timestamp_time);
968 EXPECT_EQ(output[1].data->monotonic_timestamp_time,
969 monotonic_clock::time_point(std::chrono::nanoseconds(971)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700970
971 EXPECT_EQ(output[2].timestamp.boot, 0u);
972 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700973 EXPECT_TRUE(output[2].data->has_monotonic_timestamp_time);
974 EXPECT_EQ(output[2].data->monotonic_timestamp_time,
975 monotonic_clock::time_point(std::chrono::nanoseconds(972)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700976
977 EXPECT_EQ(output[3].timestamp.boot, 0u);
978 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700979 EXPECT_TRUE(output[3].data->has_monotonic_timestamp_time);
980 EXPECT_EQ(output[3].data->monotonic_timestamp_time,
981 monotonic_clock::time_point(std::chrono::nanoseconds(973)));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800982}
983
Austin Schuhd2f96102020-12-01 20:27:29 -0800984// Tests that we can match timestamps on delivered messages.
985TEST_F(TimestampMapperTest, ReadNode0First) {
986 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
987 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700988 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800989 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700990 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800991 writer1.QueueSpan(config2_.span());
992
Austin Schuhd863e6e2022-10-16 15:44:50 -0700993 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800994 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700995 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800996 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
997
Austin Schuhd863e6e2022-10-16 15:44:50 -0700998 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800999 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001000 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001001 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1002
Austin Schuhd863e6e2022-10-16 15:44:50 -07001003 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001004 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001005 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001006 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1007 }
1008
1009 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001010 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001011 ASSERT_EQ(parts[0].logger_node, "pi1");
1012 ASSERT_EQ(parts[1].logger_node, "pi2");
1013
Austin Schuh79b30942021-01-24 22:32:21 -08001014 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001015
Austin Schuh63097262023-08-16 17:04:29 -07001016 TimestampMapper mapper0("pi1", log_files,
1017 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001018 mapper0.set_timestamp_callback(
1019 [&](TimestampedMessage *) { ++mapper0_count; });
1020 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001021 TimestampMapper mapper1("pi2", log_files,
1022 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001023 mapper1.set_timestamp_callback(
1024 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001025
1026 mapper0.AddPeer(&mapper1);
1027 mapper1.AddPeer(&mapper0);
1028
1029 {
1030 std::deque<TimestampedMessage> output0;
1031
Austin Schuh79b30942021-01-24 22:32:21 -08001032 EXPECT_EQ(mapper0_count, 0u);
1033 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001034 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001035 EXPECT_EQ(mapper0_count, 1u);
1036 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001037 output0.emplace_back(std::move(*mapper0.Front()));
1038 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001039 EXPECT_TRUE(mapper0.started());
Austin Schuh79b30942021-01-24 22:32:21 -08001040 EXPECT_EQ(mapper0_count, 1u);
1041 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001042
1043 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001044 EXPECT_EQ(mapper0_count, 2u);
1045 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001046 output0.emplace_back(std::move(*mapper0.Front()));
1047 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001048 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001049
1050 ASSERT_TRUE(mapper0.Front() != nullptr);
1051 output0.emplace_back(std::move(*mapper0.Front()));
1052 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001053 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001054
Austin Schuh79b30942021-01-24 22:32:21 -08001055 EXPECT_EQ(mapper0_count, 3u);
1056 EXPECT_EQ(mapper1_count, 0u);
1057
Austin Schuhd2f96102020-12-01 20:27:29 -08001058 ASSERT_TRUE(mapper0.Front() == nullptr);
1059
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001060 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1061 EXPECT_EQ(output0[0].monotonic_event_time.time,
1062 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001063 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001064
1065 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1066 EXPECT_EQ(output0[1].monotonic_event_time.time,
1067 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001068 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001069
1070 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1071 EXPECT_EQ(output0[2].monotonic_event_time.time,
1072 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001073 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001074 }
1075
1076 {
1077 SCOPED_TRACE("Trying node1 now");
1078 std::deque<TimestampedMessage> output1;
1079
Austin Schuh79b30942021-01-24 22:32:21 -08001080 EXPECT_EQ(mapper0_count, 3u);
1081 EXPECT_EQ(mapper1_count, 0u);
1082
Austin Schuhd2f96102020-12-01 20:27:29 -08001083 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001084 EXPECT_EQ(mapper0_count, 3u);
1085 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001086 output1.emplace_back(std::move(*mapper1.Front()));
1087 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001088 EXPECT_TRUE(mapper1.started());
Austin Schuh79b30942021-01-24 22:32:21 -08001089 EXPECT_EQ(mapper0_count, 3u);
1090 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001091
1092 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001093 EXPECT_EQ(mapper0_count, 3u);
1094 EXPECT_EQ(mapper1_count, 2u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001095 output1.emplace_back(std::move(*mapper1.Front()));
1096 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001097 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001098
1099 ASSERT_TRUE(mapper1.Front() != nullptr);
1100 output1.emplace_back(std::move(*mapper1.Front()));
1101 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001102 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001103
Austin Schuh79b30942021-01-24 22:32:21 -08001104 EXPECT_EQ(mapper0_count, 3u);
1105 EXPECT_EQ(mapper1_count, 3u);
1106
Austin Schuhd2f96102020-12-01 20:27:29 -08001107 ASSERT_TRUE(mapper1.Front() == nullptr);
1108
Austin Schuh79b30942021-01-24 22:32:21 -08001109 EXPECT_EQ(mapper0_count, 3u);
1110 EXPECT_EQ(mapper1_count, 3u);
1111
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001112 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1113 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001114 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001115 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001116
1117 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1118 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001119 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001120 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001121
1122 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1123 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001124 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001125 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001126 }
1127}
1128
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001129// Tests that we filter messages using the channel filter callback
1130TEST_F(TimestampMapperTest, ReplayChannelsCallbackTest) {
1131 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1132 {
1133 TestDetachedBufferWriter writer0(logfile0_);
1134 writer0.QueueSpan(config0_.span());
1135 TestDetachedBufferWriter writer1(logfile1_);
1136 writer1.QueueSpan(config2_.span());
1137
1138 writer0.WriteSizedFlatbuffer(
1139 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1140 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1141 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1142
1143 writer0.WriteSizedFlatbuffer(
1144 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1145 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1146 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1147
1148 writer0.WriteSizedFlatbuffer(
1149 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1150 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1151 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1152 }
1153
1154 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001155 LogFilesContainer log_files(parts);
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001156 ASSERT_EQ(parts[0].logger_node, "pi1");
1157 ASSERT_EQ(parts[1].logger_node, "pi2");
1158
1159 // mapper0 will not provide any messages while mapper1 will provide all
1160 // messages due to the channel filter callbacks used
1161 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001162
Austin Schuh63097262023-08-16 17:04:29 -07001163 TimestampMapper mapper0("pi1", log_files,
1164 TimestampQueueStrategy::kQueueTogether);
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001165 mapper0.set_timestamp_callback(
1166 [&](TimestampedMessage *) { ++mapper0_count; });
1167 mapper0.set_replay_channels_callback(
1168 [&](const TimestampedMessage &) -> bool { return mapper0_count != 2; });
1169 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001170 TimestampMapper mapper1("pi2", log_files,
1171 TimestampQueueStrategy::kQueueTogether);
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001172 mapper1.set_timestamp_callback(
1173 [&](TimestampedMessage *) { ++mapper1_count; });
1174 mapper1.set_replay_channels_callback(
1175 [&](const TimestampedMessage &) -> bool { return mapper1_count != 2; });
1176
1177 mapper0.AddPeer(&mapper1);
1178 mapper1.AddPeer(&mapper0);
1179
1180 {
1181 std::deque<TimestampedMessage> output0;
1182
1183 EXPECT_EQ(mapper0_count, 0u);
1184 EXPECT_EQ(mapper1_count, 0u);
1185
1186 ASSERT_TRUE(mapper0.Front() != nullptr);
1187 EXPECT_EQ(mapper0_count, 1u);
1188 EXPECT_EQ(mapper1_count, 0u);
1189 output0.emplace_back(std::move(*mapper0.Front()));
1190 mapper0.PopFront();
1191
1192 EXPECT_TRUE(mapper0.started());
1193 EXPECT_EQ(mapper0_count, 1u);
1194 EXPECT_EQ(mapper1_count, 0u);
1195
1196 // mapper0_count is now at 3 since the second message is not queued, but
1197 // timestamp_callback needs to be called everytime even if Front() does not
1198 // provide a message due to the replay_channels_callback.
1199 ASSERT_TRUE(mapper0.Front() != nullptr);
1200 EXPECT_EQ(mapper0_count, 3u);
1201 EXPECT_EQ(mapper1_count, 0u);
1202 output0.emplace_back(std::move(*mapper0.Front()));
1203 mapper0.PopFront();
1204
1205 EXPECT_TRUE(mapper0.started());
1206 EXPECT_EQ(mapper0_count, 3u);
1207 EXPECT_EQ(mapper1_count, 0u);
1208
1209 ASSERT_TRUE(mapper0.Front() == nullptr);
1210 EXPECT_TRUE(mapper0.started());
1211
1212 EXPECT_EQ(mapper0_count, 3u);
1213 EXPECT_EQ(mapper1_count, 0u);
1214
1215 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1216 EXPECT_EQ(output0[0].monotonic_event_time.time,
1217 e + chrono::milliseconds(1000));
1218 EXPECT_TRUE(output0[0].data != nullptr);
1219
1220 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1221 EXPECT_EQ(output0[1].monotonic_event_time.time,
1222 e + chrono::milliseconds(3000));
1223 EXPECT_TRUE(output0[1].data != nullptr);
1224 }
1225
1226 {
1227 SCOPED_TRACE("Trying node1 now");
1228 std::deque<TimestampedMessage> output1;
1229
1230 EXPECT_EQ(mapper0_count, 3u);
1231 EXPECT_EQ(mapper1_count, 0u);
1232
1233 ASSERT_TRUE(mapper1.Front() != nullptr);
1234 EXPECT_EQ(mapper0_count, 3u);
1235 EXPECT_EQ(mapper1_count, 1u);
1236 output1.emplace_back(std::move(*mapper1.Front()));
1237 mapper1.PopFront();
1238 EXPECT_TRUE(mapper1.started());
1239 EXPECT_EQ(mapper0_count, 3u);
1240 EXPECT_EQ(mapper1_count, 1u);
1241
1242 // mapper1_count is now at 3 since the second message is not queued, but
1243 // timestamp_callback needs to be called everytime even if Front() does not
1244 // provide a message due to the replay_channels_callback.
1245 ASSERT_TRUE(mapper1.Front() != nullptr);
1246 output1.emplace_back(std::move(*mapper1.Front()));
1247 mapper1.PopFront();
1248 EXPECT_TRUE(mapper1.started());
1249
1250 EXPECT_EQ(mapper0_count, 3u);
1251 EXPECT_EQ(mapper1_count, 3u);
1252
1253 ASSERT_TRUE(mapper1.Front() == nullptr);
1254
1255 EXPECT_EQ(mapper0_count, 3u);
1256 EXPECT_EQ(mapper1_count, 3u);
1257
1258 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1259 EXPECT_EQ(output1[0].monotonic_event_time.time,
1260 e + chrono::seconds(100) + chrono::milliseconds(1000));
1261 EXPECT_TRUE(output1[0].data != nullptr);
1262
1263 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1264 EXPECT_EQ(output1[1].monotonic_event_time.time,
1265 e + chrono::seconds(100) + chrono::milliseconds(3000));
1266 EXPECT_TRUE(output1[1].data != nullptr);
1267 }
1268}
Austin Schuh8bf1e632021-01-02 22:41:04 -08001269// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
1270// returned.
1271TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
1272 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1273 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001274 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001275 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001276 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001277 writer1.QueueSpan(config4_.span());
1278
Austin Schuhd863e6e2022-10-16 15:44:50 -07001279 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001280 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001281 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001282 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
1283 e + chrono::nanoseconds(971)));
1284
Austin Schuhd863e6e2022-10-16 15:44:50 -07001285 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001286 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001287 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001288 e + chrono::milliseconds(2000), 0, chrono::seconds(100),
1289 e + chrono::nanoseconds(5458)));
1290
Austin Schuhd863e6e2022-10-16 15:44:50 -07001291 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001292 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001293 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001294 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1295 }
1296
1297 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001298 LogFilesContainer log_files(parts);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001299 ASSERT_EQ(parts.size(), 1u);
1300
Austin Schuh79b30942021-01-24 22:32:21 -08001301 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001302 TimestampMapper mapper0("pi1", log_files,
1303 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001304 mapper0.set_timestamp_callback(
1305 [&](TimestampedMessage *) { ++mapper0_count; });
1306 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001307 TimestampMapper mapper1("pi2", log_files,
1308 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001309 mapper1.set_timestamp_callback(
1310 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh8bf1e632021-01-02 22:41:04 -08001311
1312 mapper0.AddPeer(&mapper1);
1313 mapper1.AddPeer(&mapper0);
1314
1315 {
1316 std::deque<TimestampedMessage> output0;
1317
1318 for (int i = 0; i < 3; ++i) {
1319 ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
1320 output0.emplace_back(std::move(*mapper0.Front()));
1321 mapper0.PopFront();
1322 }
1323
1324 ASSERT_TRUE(mapper0.Front() == nullptr);
1325
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001326 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1327 EXPECT_EQ(output0[0].monotonic_event_time.time,
1328 e + chrono::milliseconds(1000));
1329 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
1330 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
1331 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001332 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001333
1334 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1335 EXPECT_EQ(output0[1].monotonic_event_time.time,
1336 e + chrono::milliseconds(2000));
1337 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
1338 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
1339 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001340 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001341
1342 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1343 EXPECT_EQ(output0[2].monotonic_event_time.time,
1344 e + chrono::milliseconds(3000));
1345 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
1346 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
1347 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001348 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001349 }
1350
1351 {
1352 SCOPED_TRACE("Trying node1 now");
1353 std::deque<TimestampedMessage> output1;
1354
1355 for (int i = 0; i < 3; ++i) {
1356 ASSERT_TRUE(mapper1.Front() != nullptr);
1357 output1.emplace_back(std::move(*mapper1.Front()));
1358 mapper1.PopFront();
1359 }
1360
1361 ASSERT_TRUE(mapper1.Front() == nullptr);
1362
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001363 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1364 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001365 e + chrono::seconds(100) + chrono::milliseconds(1000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001366 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
1367 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001368 e + chrono::nanoseconds(971));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001369 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001370
1371 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1372 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001373 e + chrono::seconds(100) + chrono::milliseconds(2000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001374 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
1375 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001376 e + chrono::nanoseconds(5458));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001377 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001378
1379 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1380 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001381 e + chrono::seconds(100) + chrono::milliseconds(3000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001382 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
1383 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
1384 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001385 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001386 }
Austin Schuh79b30942021-01-24 22:32:21 -08001387
1388 EXPECT_EQ(mapper0_count, 3u);
1389 EXPECT_EQ(mapper1_count, 3u);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001390}
1391
Austin Schuhd2f96102020-12-01 20:27:29 -08001392// Tests that we can match timestamps on delivered messages. By doing this in
1393// the reverse order, the second node needs to queue data up from the first node
1394// to find the matching timestamp.
1395TEST_F(TimestampMapperTest, ReadNode1First) {
1396 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1397 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001398 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001399 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001400 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001401 writer1.QueueSpan(config2_.span());
1402
Austin Schuhd863e6e2022-10-16 15:44:50 -07001403 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001404 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001405 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001406 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1407
Austin Schuhd863e6e2022-10-16 15:44:50 -07001408 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001409 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001410 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001411 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1412
Austin Schuhd863e6e2022-10-16 15:44:50 -07001413 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001414 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001415 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001416 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1417 }
1418
1419 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001420 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001421
1422 ASSERT_EQ(parts[0].logger_node, "pi1");
1423 ASSERT_EQ(parts[1].logger_node, "pi2");
1424
Austin Schuh79b30942021-01-24 22:32:21 -08001425 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001426 TimestampMapper mapper0("pi1", log_files,
1427 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001428 mapper0.set_timestamp_callback(
1429 [&](TimestampedMessage *) { ++mapper0_count; });
1430 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001431 TimestampMapper mapper1("pi2", log_files,
1432 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001433 mapper1.set_timestamp_callback(
1434 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001435
1436 mapper0.AddPeer(&mapper1);
1437 mapper1.AddPeer(&mapper0);
1438
1439 {
1440 SCOPED_TRACE("Trying node1 now");
1441 std::deque<TimestampedMessage> output1;
1442
1443 ASSERT_TRUE(mapper1.Front() != nullptr);
1444 output1.emplace_back(std::move(*mapper1.Front()));
1445 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001446 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001447
1448 ASSERT_TRUE(mapper1.Front() != nullptr);
1449 output1.emplace_back(std::move(*mapper1.Front()));
1450 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001451 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001452
1453 ASSERT_TRUE(mapper1.Front() != nullptr);
1454 output1.emplace_back(std::move(*mapper1.Front()));
1455 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001456 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001457
1458 ASSERT_TRUE(mapper1.Front() == nullptr);
1459
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001460 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1461 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001462 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001463 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001464
1465 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1466 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001467 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001468 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001469
1470 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1471 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001472 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001473 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001474 }
1475
1476 {
1477 std::deque<TimestampedMessage> output0;
1478
1479 ASSERT_TRUE(mapper0.Front() != nullptr);
1480 output0.emplace_back(std::move(*mapper0.Front()));
1481 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001482 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001483
1484 ASSERT_TRUE(mapper0.Front() != nullptr);
1485 output0.emplace_back(std::move(*mapper0.Front()));
1486 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001487 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001488
1489 ASSERT_TRUE(mapper0.Front() != nullptr);
1490 output0.emplace_back(std::move(*mapper0.Front()));
1491 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001492 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001493
1494 ASSERT_TRUE(mapper0.Front() == nullptr);
1495
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001496 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1497 EXPECT_EQ(output0[0].monotonic_event_time.time,
1498 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001499 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001500
1501 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1502 EXPECT_EQ(output0[1].monotonic_event_time.time,
1503 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001504 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001505
1506 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1507 EXPECT_EQ(output0[2].monotonic_event_time.time,
1508 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001509 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001510 }
Austin Schuh79b30942021-01-24 22:32:21 -08001511
1512 EXPECT_EQ(mapper0_count, 3u);
1513 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001514}
1515
1516// Tests that we return just the timestamps if we couldn't find the data and the
1517// missing data was at the beginning of the file.
1518TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
1519 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1520 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001521 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001522 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001523 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001524 writer1.QueueSpan(config2_.span());
1525
1526 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001527 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001528 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1529
Austin Schuhd863e6e2022-10-16 15:44:50 -07001530 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001531 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001532 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001533 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1534
Austin Schuhd863e6e2022-10-16 15:44:50 -07001535 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001536 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001537 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001538 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1539 }
1540
1541 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001542 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001543
1544 ASSERT_EQ(parts[0].logger_node, "pi1");
1545 ASSERT_EQ(parts[1].logger_node, "pi2");
1546
Austin Schuh79b30942021-01-24 22:32:21 -08001547 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001548 TimestampMapper mapper0("pi1", log_files,
1549 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001550 mapper0.set_timestamp_callback(
1551 [&](TimestampedMessage *) { ++mapper0_count; });
1552 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001553 TimestampMapper mapper1("pi2", log_files,
1554 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001555 mapper1.set_timestamp_callback(
1556 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001557
1558 mapper0.AddPeer(&mapper1);
1559 mapper1.AddPeer(&mapper0);
1560
1561 {
1562 SCOPED_TRACE("Trying node1 now");
1563 std::deque<TimestampedMessage> output1;
1564
1565 ASSERT_TRUE(mapper1.Front() != nullptr);
1566 output1.emplace_back(std::move(*mapper1.Front()));
1567 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001568 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001569
1570 ASSERT_TRUE(mapper1.Front() != nullptr);
1571 output1.emplace_back(std::move(*mapper1.Front()));
1572 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001573 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001574
1575 ASSERT_TRUE(mapper1.Front() != nullptr);
1576 output1.emplace_back(std::move(*mapper1.Front()));
1577 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001578 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001579
1580 ASSERT_TRUE(mapper1.Front() == nullptr);
1581
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001582 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1583 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001584 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001585 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001586
1587 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1588 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001589 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001590 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001591
1592 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1593 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001594 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001595 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001596 }
Austin Schuh79b30942021-01-24 22:32:21 -08001597
1598 EXPECT_EQ(mapper0_count, 0u);
1599 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001600}
1601
1602// Tests that we return just the timestamps if we couldn't find the data and the
1603// missing data was at the end of the file.
1604TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
1605 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1606 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001607 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001608 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001609 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001610 writer1.QueueSpan(config2_.span());
1611
Austin Schuhd863e6e2022-10-16 15:44:50 -07001612 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001613 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001614 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001615 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1616
Austin Schuhd863e6e2022-10-16 15:44:50 -07001617 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001618 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001619 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001620 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1621
1622 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001623 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001624 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1625 }
1626
1627 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001628 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001629
1630 ASSERT_EQ(parts[0].logger_node, "pi1");
1631 ASSERT_EQ(parts[1].logger_node, "pi2");
1632
Austin Schuh79b30942021-01-24 22:32:21 -08001633 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001634 TimestampMapper mapper0("pi1", log_files,
1635 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001636 mapper0.set_timestamp_callback(
1637 [&](TimestampedMessage *) { ++mapper0_count; });
1638 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001639 TimestampMapper mapper1("pi2", log_files,
1640 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001641 mapper1.set_timestamp_callback(
1642 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001643
1644 mapper0.AddPeer(&mapper1);
1645 mapper1.AddPeer(&mapper0);
1646
1647 {
1648 SCOPED_TRACE("Trying node1 now");
1649 std::deque<TimestampedMessage> output1;
1650
1651 ASSERT_TRUE(mapper1.Front() != nullptr);
1652 output1.emplace_back(std::move(*mapper1.Front()));
1653 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001654 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001655
1656 ASSERT_TRUE(mapper1.Front() != nullptr);
1657 output1.emplace_back(std::move(*mapper1.Front()));
1658 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001659 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001660
1661 ASSERT_TRUE(mapper1.Front() != nullptr);
1662 output1.emplace_back(std::move(*mapper1.Front()));
1663 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001664 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001665
1666 ASSERT_TRUE(mapper1.Front() == nullptr);
1667
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001668 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1669 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001670 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001671 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001672
1673 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1674 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001675 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001676 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001677
1678 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1679 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001680 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001681 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001682 }
Austin Schuh79b30942021-01-24 22:32:21 -08001683
1684 EXPECT_EQ(mapper0_count, 0u);
1685 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001686}
1687
Austin Schuh993ccb52020-12-12 15:59:32 -08001688// Tests that we handle a message which failed to forward or be logged.
1689TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1690 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1691 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001692 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001693 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001694 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001695 writer1.QueueSpan(config2_.span());
1696
Austin Schuhd863e6e2022-10-16 15:44:50 -07001697 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001698 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001699 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001700 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1701
1702 // Create both the timestamp and message, but don't log them, simulating a
1703 // forwarding drop.
1704 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1705 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1706 chrono::seconds(100));
1707
Austin Schuhd863e6e2022-10-16 15:44:50 -07001708 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001709 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001710 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001711 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1712 }
1713
1714 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001715 LogFilesContainer log_files(parts);
Austin Schuh993ccb52020-12-12 15:59:32 -08001716
1717 ASSERT_EQ(parts[0].logger_node, "pi1");
1718 ASSERT_EQ(parts[1].logger_node, "pi2");
1719
Austin Schuh79b30942021-01-24 22:32:21 -08001720 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001721 TimestampMapper mapper0("pi1", log_files,
1722 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001723 mapper0.set_timestamp_callback(
1724 [&](TimestampedMessage *) { ++mapper0_count; });
1725 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001726 TimestampMapper mapper1("pi2", log_files,
1727 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001728 mapper1.set_timestamp_callback(
1729 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh993ccb52020-12-12 15:59:32 -08001730
1731 mapper0.AddPeer(&mapper1);
1732 mapper1.AddPeer(&mapper0);
1733
1734 {
1735 std::deque<TimestampedMessage> output1;
1736
1737 ASSERT_TRUE(mapper1.Front() != nullptr);
1738 output1.emplace_back(std::move(*mapper1.Front()));
1739 mapper1.PopFront();
1740
1741 ASSERT_TRUE(mapper1.Front() != nullptr);
1742 output1.emplace_back(std::move(*mapper1.Front()));
1743
1744 ASSERT_FALSE(mapper1.Front() == nullptr);
1745
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001746 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1747 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001748 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001749 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001750
1751 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1752 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001753 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001754 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh993ccb52020-12-12 15:59:32 -08001755 }
Austin Schuh79b30942021-01-24 22:32:21 -08001756
1757 EXPECT_EQ(mapper0_count, 0u);
1758 EXPECT_EQ(mapper1_count, 2u);
Austin Schuh993ccb52020-12-12 15:59:32 -08001759}
1760
Austin Schuhd2f96102020-12-01 20:27:29 -08001761// Tests that we properly sort log files with duplicate timestamps.
1762TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1763 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1764 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001765 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001766 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001767 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001768 writer1.QueueSpan(config2_.span());
1769
Austin Schuhd863e6e2022-10-16 15:44:50 -07001770 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001771 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001772 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001773 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1774
Austin Schuhd863e6e2022-10-16 15:44:50 -07001775 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001776 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001777 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001778 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1779
Austin Schuhd863e6e2022-10-16 15:44:50 -07001780 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001781 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001782 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001783 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1784
Austin Schuhd863e6e2022-10-16 15:44:50 -07001785 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001786 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001787 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001788 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1789 }
1790
1791 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001792 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001793
1794 ASSERT_EQ(parts[0].logger_node, "pi1");
1795 ASSERT_EQ(parts[1].logger_node, "pi2");
1796
Austin Schuh79b30942021-01-24 22:32:21 -08001797 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001798 TimestampMapper mapper0("pi1", log_files,
1799 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001800 mapper0.set_timestamp_callback(
1801 [&](TimestampedMessage *) { ++mapper0_count; });
1802 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001803 TimestampMapper mapper1("pi2", log_files,
1804 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001805 mapper1.set_timestamp_callback(
1806 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001807
1808 mapper0.AddPeer(&mapper1);
1809 mapper1.AddPeer(&mapper0);
1810
1811 {
1812 SCOPED_TRACE("Trying node1 now");
1813 std::deque<TimestampedMessage> output1;
1814
1815 for (int i = 0; i < 4; ++i) {
1816 ASSERT_TRUE(mapper1.Front() != nullptr);
1817 output1.emplace_back(std::move(*mapper1.Front()));
1818 mapper1.PopFront();
1819 }
1820 ASSERT_TRUE(mapper1.Front() == nullptr);
1821
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001822 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1823 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001824 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001825 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001826
1827 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1828 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001829 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001830 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001831
1832 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1833 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001834 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001835 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001836
1837 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1838 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001839 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001840 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001841 }
Austin Schuh79b30942021-01-24 22:32:21 -08001842
1843 EXPECT_EQ(mapper0_count, 0u);
1844 EXPECT_EQ(mapper1_count, 4u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001845}
1846
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001847// Tests that we properly produce a valid start time.
Austin Schuhd2f96102020-12-01 20:27:29 -08001848TEST_F(TimestampMapperTest, StartTime) {
1849 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1850 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001851 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001852 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001853 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001854 writer1.QueueSpan(config1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001855 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001856 writer2.QueueSpan(config3_.span());
1857 }
1858
1859 const std::vector<LogFile> parts =
1860 SortParts({logfile0_, logfile1_, logfile2_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001861 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001862
Austin Schuh79b30942021-01-24 22:32:21 -08001863 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001864 TimestampMapper mapper0("pi1", log_files,
1865 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001866 mapper0.set_timestamp_callback(
1867 [&](TimestampedMessage *) { ++mapper0_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001868
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001869 EXPECT_EQ(mapper0.monotonic_start_time(0), e + chrono::milliseconds(1));
1870 EXPECT_EQ(mapper0.realtime_start_time(0),
Austin Schuhd2f96102020-12-01 20:27:29 -08001871 realtime_clock::time_point(chrono::seconds(1000)));
Austin Schuh79b30942021-01-24 22:32:21 -08001872 EXPECT_EQ(mapper0_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001873}
1874
Austin Schuhfecf1d82020-12-19 16:57:28 -08001875// Tests that when a peer isn't registered, we treat that as if there was no
1876// data available.
1877TEST_F(TimestampMapperTest, NoPeer) {
1878 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1879 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001880 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001881 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001882 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001883 writer1.QueueSpan(config2_.span());
1884
1885 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001886 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001887 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1888
Austin Schuhd863e6e2022-10-16 15:44:50 -07001889 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001890 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001891 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001892 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1893
Austin Schuhd863e6e2022-10-16 15:44:50 -07001894 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001895 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001896 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001897 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1898 }
1899
1900 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001901 LogFilesContainer log_files(parts);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001902
1903 ASSERT_EQ(parts[0].logger_node, "pi1");
1904 ASSERT_EQ(parts[1].logger_node, "pi2");
1905
Austin Schuh79b30942021-01-24 22:32:21 -08001906 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001907 TimestampMapper mapper1("pi2", log_files,
1908 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001909 mapper1.set_timestamp_callback(
1910 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhfecf1d82020-12-19 16:57:28 -08001911
1912 {
1913 std::deque<TimestampedMessage> output1;
1914
1915 ASSERT_TRUE(mapper1.Front() != nullptr);
1916 output1.emplace_back(std::move(*mapper1.Front()));
1917 mapper1.PopFront();
1918 ASSERT_TRUE(mapper1.Front() != nullptr);
1919 output1.emplace_back(std::move(*mapper1.Front()));
1920 mapper1.PopFront();
1921 ASSERT_TRUE(mapper1.Front() != nullptr);
1922 output1.emplace_back(std::move(*mapper1.Front()));
1923 mapper1.PopFront();
1924 ASSERT_TRUE(mapper1.Front() == nullptr);
1925
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001926 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1927 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001928 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001929 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001930
1931 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1932 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001933 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001934 EXPECT_FALSE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001935
1936 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1937 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001938 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001939 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001940 }
Austin Schuh79b30942021-01-24 22:32:21 -08001941 EXPECT_EQ(mapper1_count, 3u);
1942}
1943
1944// Tests that we can queue messages and call the timestamp callback for both
1945// nodes.
1946TEST_F(TimestampMapperTest, QueueUntilNode0) {
1947 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1948 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001949 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh79b30942021-01-24 22:32:21 -08001950 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001951 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh79b30942021-01-24 22:32:21 -08001952 writer1.QueueSpan(config2_.span());
1953
Austin Schuhd863e6e2022-10-16 15:44:50 -07001954 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001955 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001956 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001957 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1958
Austin Schuhd863e6e2022-10-16 15:44:50 -07001959 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001960 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001961 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001962 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1963
Austin Schuhd863e6e2022-10-16 15:44:50 -07001964 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001965 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001966 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001967 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1968
Austin Schuhd863e6e2022-10-16 15:44:50 -07001969 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001970 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001971 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001972 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1973 }
1974
1975 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001976 LogFilesContainer log_files(parts);
Austin Schuh79b30942021-01-24 22:32:21 -08001977
1978 ASSERT_EQ(parts[0].logger_node, "pi1");
1979 ASSERT_EQ(parts[1].logger_node, "pi2");
1980
1981 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001982 TimestampMapper mapper0("pi1", log_files,
1983 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001984 mapper0.set_timestamp_callback(
1985 [&](TimestampedMessage *) { ++mapper0_count; });
1986 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001987 TimestampMapper mapper1("pi2", log_files,
1988 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001989 mapper1.set_timestamp_callback(
1990 [&](TimestampedMessage *) { ++mapper1_count; });
1991
1992 mapper0.AddPeer(&mapper1);
1993 mapper1.AddPeer(&mapper0);
1994
1995 {
1996 std::deque<TimestampedMessage> output0;
1997
1998 EXPECT_EQ(mapper0_count, 0u);
1999 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002000 mapper0.QueueUntil(
2001 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08002002 EXPECT_EQ(mapper0_count, 3u);
2003 EXPECT_EQ(mapper1_count, 0u);
2004
2005 ASSERT_TRUE(mapper0.Front() != nullptr);
2006 EXPECT_EQ(mapper0_count, 3u);
2007 EXPECT_EQ(mapper1_count, 0u);
2008
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002009 mapper0.QueueUntil(
2010 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08002011 EXPECT_EQ(mapper0_count, 3u);
2012 EXPECT_EQ(mapper1_count, 0u);
2013
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002014 mapper0.QueueUntil(
2015 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08002016 EXPECT_EQ(mapper0_count, 4u);
2017 EXPECT_EQ(mapper1_count, 0u);
2018
2019 output0.emplace_back(std::move(*mapper0.Front()));
2020 mapper0.PopFront();
2021 output0.emplace_back(std::move(*mapper0.Front()));
2022 mapper0.PopFront();
2023 output0.emplace_back(std::move(*mapper0.Front()));
2024 mapper0.PopFront();
2025 output0.emplace_back(std::move(*mapper0.Front()));
2026 mapper0.PopFront();
2027
2028 EXPECT_EQ(mapper0_count, 4u);
2029 EXPECT_EQ(mapper1_count, 0u);
2030
2031 ASSERT_TRUE(mapper0.Front() == nullptr);
2032
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002033 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2034 EXPECT_EQ(output0[0].monotonic_event_time.time,
2035 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002036 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002037
2038 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2039 EXPECT_EQ(output0[1].monotonic_event_time.time,
2040 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002041 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002042
2043 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2044 EXPECT_EQ(output0[2].monotonic_event_time.time,
2045 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002046 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002047
2048 EXPECT_EQ(output0[3].monotonic_event_time.boot, 0u);
2049 EXPECT_EQ(output0[3].monotonic_event_time.time,
2050 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002051 EXPECT_TRUE(output0[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08002052 }
2053
2054 {
2055 SCOPED_TRACE("Trying node1 now");
2056 std::deque<TimestampedMessage> output1;
2057
2058 EXPECT_EQ(mapper0_count, 4u);
2059 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002060 mapper1.QueueUntil(BootTimestamp{
2061 .boot = 0,
2062 .time = e + chrono::seconds(100) + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08002063 EXPECT_EQ(mapper0_count, 4u);
2064 EXPECT_EQ(mapper1_count, 3u);
2065
2066 ASSERT_TRUE(mapper1.Front() != nullptr);
2067 EXPECT_EQ(mapper0_count, 4u);
2068 EXPECT_EQ(mapper1_count, 3u);
2069
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002070 mapper1.QueueUntil(BootTimestamp{
2071 .boot = 0,
2072 .time = e + chrono::seconds(100) + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08002073 EXPECT_EQ(mapper0_count, 4u);
2074 EXPECT_EQ(mapper1_count, 3u);
2075
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002076 mapper1.QueueUntil(BootTimestamp{
2077 .boot = 0,
2078 .time = e + chrono::seconds(100) + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08002079 EXPECT_EQ(mapper0_count, 4u);
2080 EXPECT_EQ(mapper1_count, 4u);
2081
2082 ASSERT_TRUE(mapper1.Front() != nullptr);
2083 EXPECT_EQ(mapper0_count, 4u);
2084 EXPECT_EQ(mapper1_count, 4u);
2085
2086 output1.emplace_back(std::move(*mapper1.Front()));
2087 mapper1.PopFront();
2088 ASSERT_TRUE(mapper1.Front() != nullptr);
2089 output1.emplace_back(std::move(*mapper1.Front()));
2090 mapper1.PopFront();
2091 ASSERT_TRUE(mapper1.Front() != nullptr);
2092 output1.emplace_back(std::move(*mapper1.Front()));
2093 mapper1.PopFront();
2094 ASSERT_TRUE(mapper1.Front() != nullptr);
2095 output1.emplace_back(std::move(*mapper1.Front()));
2096 mapper1.PopFront();
2097
2098 EXPECT_EQ(mapper0_count, 4u);
2099 EXPECT_EQ(mapper1_count, 4u);
2100
2101 ASSERT_TRUE(mapper1.Front() == nullptr);
2102
2103 EXPECT_EQ(mapper0_count, 4u);
2104 EXPECT_EQ(mapper1_count, 4u);
2105
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002106 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2107 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002108 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002109 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002110
2111 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
2112 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002113 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002114 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002115
2116 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
2117 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002118 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002119 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002120
2121 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
2122 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002123 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002124 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08002125 }
Austin Schuhfecf1d82020-12-19 16:57:28 -08002126}
2127
Philipp Schrader416505b2024-03-28 11:59:45 -07002128// Validates that we can read timestamps on startup even for single-node logs.
2129TEST_F(SingleNodeTimestampMapperTest, QueueTimestampsForSingleNodes) {
2130 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2131 {
2132 TestDetachedBufferWriter writer0(logfile0_);
2133 writer0.QueueSpan(config0_.span());
2134
2135 writer0.WriteSizedFlatbuffer(
2136 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
2137 writer0.WriteSizedFlatbuffer(
2138 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
2139 writer0.WriteSizedFlatbuffer(
2140 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
2141 writer0.WriteSizedFlatbuffer(
2142 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
2143 }
2144
2145 const std::vector<LogFile> parts = SortParts({logfile0_});
2146 LogFilesContainer log_files(parts);
2147
2148 ASSERT_EQ(parts[0].logger_node, "pi1");
2149
2150 size_t mapper0_count = 0;
2151 TimestampMapper mapper0("pi1", log_files,
2152 TimestampQueueStrategy::kQueueTimestampsAtStartup);
2153 mapper0.set_timestamp_callback(
2154 [&](TimestampedMessage *) { ++mapper0_count; });
2155 mapper0.QueueTimestamps();
2156
2157 for (int i = 0; i < 4; ++i) {
2158 ASSERT_TRUE(mapper0.Front() != nullptr);
2159 mapper0.PopFront();
2160 }
2161 EXPECT_TRUE(mapper0.Front() == nullptr);
2162 EXPECT_EQ(mapper0_count, 4u);
2163}
2164
2165class BootMergerTest : public SortingElementTest<> {
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002166 public:
2167 BootMergerTest()
2168 : SortingElementTest(),
2169 boot0_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002170 /* 100ms */
2171 "max_out_of_order_duration": 100000000,
2172 "node": {
2173 "name": "pi2"
2174 },
2175 "logger_node": {
2176 "name": "pi1"
2177 },
2178 "monotonic_start_time": 1000000,
2179 "realtime_start_time": 1000000000000,
2180 "logger_monotonic_start_time": 1000000,
2181 "logger_realtime_start_time": 1000000000000,
2182 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2183 "parts_uuid": "1a9e5ca2-31b2-475b-8282-88f6d1ce5109",
2184 "parts_index": 0,
2185 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2186 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002187 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2188 "boot_uuids": [
2189 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2190 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2191 ""
2192 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002193})")),
2194 boot1_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002195 /* 100ms */
2196 "max_out_of_order_duration": 100000000,
2197 "node": {
2198 "name": "pi2"
2199 },
2200 "logger_node": {
2201 "name": "pi1"
2202 },
2203 "monotonic_start_time": 1000000,
2204 "realtime_start_time": 1000000000000,
2205 "logger_monotonic_start_time": 1000000,
2206 "logger_realtime_start_time": 1000000000000,
2207 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2208 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
2209 "parts_index": 1,
2210 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2211 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002212 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2213 "boot_uuids": [
2214 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2215 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2216 ""
2217 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002218})")) {}
Austin Schuh8bee6882021-06-28 21:03:28 -07002219
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002220 protected:
2221 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0_;
2222 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1_;
2223};
2224
2225// This tests that we can properly sort a multi-node log file which has the old
2226// (and buggy) timestamps in the header, and the non-resetting parts_index.
2227// These make it so we can just bairly figure out what happened first and what
2228// happened second, but not in a way that is robust to multiple nodes rebooting.
2229TEST_F(BootMergerTest, OldReboot) {
Austin Schuh8bee6882021-06-28 21:03:28 -07002230 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002231 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002232 writer.QueueSpan(boot0_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002233 }
2234 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002235 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002236 writer.QueueSpan(boot1_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002237 }
2238
2239 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
2240
2241 ASSERT_EQ(parts.size(), 1u);
2242 ASSERT_EQ(parts[0].parts.size(), 2u);
2243
2244 EXPECT_EQ(parts[0].parts[0].boot_count, 0);
2245 EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002246 boot0_.message().source_node_boot_uuid()->string_view());
Austin Schuh8bee6882021-06-28 21:03:28 -07002247
2248 EXPECT_EQ(parts[0].parts[1].boot_count, 1);
2249 EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002250 boot1_.message().source_node_boot_uuid()->string_view());
2251}
2252
2253// This tests that we can produce messages ordered across a reboot.
2254TEST_F(BootMergerTest, SortAcrossReboot) {
2255 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2256 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002257 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002258 writer.QueueSpan(boot0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002259 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002260 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002261 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002262 MakeLogMessage(e + chrono::milliseconds(2000), 1, 0x105));
2263 }
2264 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002265 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002266 writer.QueueSpan(boot1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002267 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002268 MakeLogMessage(e + chrono::milliseconds(100), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002269 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002270 MakeLogMessage(e + chrono::milliseconds(200), 1, 0x106));
2271 }
2272
2273 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07002274 LogFilesContainer log_files(parts);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002275 ASSERT_EQ(parts.size(), 1u);
2276 ASSERT_EQ(parts[0].parts.size(), 2u);
2277
Austin Schuh63097262023-08-16 17:04:29 -07002278 BootMerger merger("pi2", log_files,
2279 {StoredDataType::DATA, StoredDataType::TIMESTAMPS,
2280 StoredDataType::REMOTE_TIMESTAMPS});
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002281
2282 EXPECT_EQ(merger.node(), 1u);
2283
2284 std::vector<Message> output;
2285 for (int i = 0; i < 4; ++i) {
2286 ASSERT_TRUE(merger.Front() != nullptr);
2287 output.emplace_back(std::move(*merger.Front()));
2288 merger.PopFront();
2289 }
2290
2291 ASSERT_TRUE(merger.Front() == nullptr);
2292
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002293 EXPECT_EQ(output[0].timestamp.boot, 0u);
2294 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
2295 EXPECT_EQ(output[1].timestamp.boot, 0u);
2296 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(2000));
2297
2298 EXPECT_EQ(output[2].timestamp.boot, 1u);
2299 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(100));
2300 EXPECT_EQ(output[3].timestamp.boot, 1u);
2301 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
Austin Schuh8bee6882021-06-28 21:03:28 -07002302}
2303
Philipp Schrader416505b2024-03-28 11:59:45 -07002304class RebootTimestampMapperTest : public SortingElementTest<> {
Austin Schuh48507722021-07-17 17:29:24 -07002305 public:
2306 RebootTimestampMapperTest()
2307 : SortingElementTest(),
2308 boot0a_(MakeHeader(config_, R"({
2309 /* 100ms */
2310 "max_out_of_order_duration": 100000000,
2311 "node": {
2312 "name": "pi1"
2313 },
2314 "logger_node": {
2315 "name": "pi1"
2316 },
2317 "monotonic_start_time": 1000000,
2318 "realtime_start_time": 1000000000000,
2319 "logger_monotonic_start_time": 1000000,
2320 "logger_realtime_start_time": 1000000000000,
2321 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2322 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2323 "parts_index": 0,
2324 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2325 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2326 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2327 "boot_uuids": [
2328 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2329 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2330 ""
2331 ]
2332})")),
2333 boot0b_(MakeHeader(config_, R"({
2334 /* 100ms */
2335 "max_out_of_order_duration": 100000000,
2336 "node": {
2337 "name": "pi1"
2338 },
2339 "logger_node": {
2340 "name": "pi1"
2341 },
2342 "monotonic_start_time": 1000000,
2343 "realtime_start_time": 1000000000000,
2344 "logger_monotonic_start_time": 1000000,
2345 "logger_realtime_start_time": 1000000000000,
2346 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2347 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2348 "parts_index": 1,
2349 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2350 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2351 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2352 "boot_uuids": [
2353 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2354 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2355 ""
2356 ]
2357})")),
2358 boot1a_(MakeHeader(config_, R"({
2359 /* 100ms */
2360 "max_out_of_order_duration": 100000000,
2361 "node": {
2362 "name": "pi2"
2363 },
2364 "logger_node": {
2365 "name": "pi1"
2366 },
2367 "monotonic_start_time": 1000000,
2368 "realtime_start_time": 1000000000000,
2369 "logger_monotonic_start_time": 1000000,
2370 "logger_realtime_start_time": 1000000000000,
2371 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2372 "parts_uuid": "f6ab0cdc-a654-456d-bfd9-2bbc09098edf",
2373 "parts_index": 0,
2374 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2375 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2376 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2377 "boot_uuids": [
2378 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2379 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2380 ""
2381 ]
2382})")),
2383 boot1b_(MakeHeader(config_, R"({
2384 /* 100ms */
2385 "max_out_of_order_duration": 100000000,
2386 "node": {
2387 "name": "pi2"
2388 },
2389 "logger_node": {
2390 "name": "pi1"
2391 },
2392 "monotonic_start_time": 1000000,
2393 "realtime_start_time": 1000000000000,
2394 "logger_monotonic_start_time": 1000000,
2395 "logger_realtime_start_time": 1000000000000,
2396 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2397 "parts_uuid": "3a9d0445-f520-43ca-93f5-e2cc7f54d40a",
2398 "parts_index": 1,
2399 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2400 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2401 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2402 "boot_uuids": [
2403 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2404 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2405 ""
2406 ]
2407})")) {}
2408
2409 protected:
2410 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0a_;
2411 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0b_;
2412 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1a_;
2413 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
2414};
2415
Austin Schuh48507722021-07-17 17:29:24 -07002416// Tests that we can match timestamps on delivered messages in the presence of
2417// reboots on the node receiving timestamps.
2418TEST_F(RebootTimestampMapperTest, ReadNode0First) {
2419 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2420 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002421 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002422 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002423 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002424 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002425 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002426 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002427 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002428 writer1b.QueueSpan(boot1b_.span());
2429
Austin Schuhd863e6e2022-10-16 15:44:50 -07002430 writer0a.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002431 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002432 writer1a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002433 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
2434 e + chrono::milliseconds(1001)));
2435
Austin Schuhd863e6e2022-10-16 15:44:50 -07002436 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh58646e22021-08-23 23:51:46 -07002437 e + chrono::milliseconds(1000), 0, chrono::seconds(21),
2438 e + chrono::milliseconds(2001)));
2439
Austin Schuhd863e6e2022-10-16 15:44:50 -07002440 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002441 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002442 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002443 e + chrono::milliseconds(2000), 0, chrono::seconds(20),
2444 e + chrono::milliseconds(2001)));
2445
Austin Schuhd863e6e2022-10-16 15:44:50 -07002446 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002447 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002448 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002449 e + chrono::milliseconds(3000), 0, chrono::seconds(20),
2450 e + chrono::milliseconds(3001)));
2451 }
2452
Austin Schuh58646e22021-08-23 23:51:46 -07002453 const std::vector<LogFile> parts =
2454 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07002455 LogFilesContainer log_files(parts);
Austin Schuh48507722021-07-17 17:29:24 -07002456
2457 for (const auto &x : parts) {
2458 LOG(INFO) << x;
2459 }
2460 ASSERT_EQ(parts.size(), 1u);
2461 ASSERT_EQ(parts[0].logger_node, "pi1");
2462
2463 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07002464 TimestampMapper mapper0("pi1", log_files,
2465 TimestampQueueStrategy::kQueueTogether);
Austin Schuh48507722021-07-17 17:29:24 -07002466 mapper0.set_timestamp_callback(
2467 [&](TimestampedMessage *) { ++mapper0_count; });
2468 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07002469 TimestampMapper mapper1("pi2", log_files,
2470 TimestampQueueStrategy::kQueueTogether);
Austin Schuh48507722021-07-17 17:29:24 -07002471 mapper1.set_timestamp_callback(
2472 [&](TimestampedMessage *) { ++mapper1_count; });
2473
2474 mapper0.AddPeer(&mapper1);
2475 mapper1.AddPeer(&mapper0);
2476
2477 {
2478 std::deque<TimestampedMessage> output0;
2479
2480 EXPECT_EQ(mapper0_count, 0u);
2481 EXPECT_EQ(mapper1_count, 0u);
2482 ASSERT_TRUE(mapper0.Front() != nullptr);
2483 EXPECT_EQ(mapper0_count, 1u);
2484 EXPECT_EQ(mapper1_count, 0u);
2485 output0.emplace_back(std::move(*mapper0.Front()));
2486 mapper0.PopFront();
2487 EXPECT_TRUE(mapper0.started());
2488 EXPECT_EQ(mapper0_count, 1u);
2489 EXPECT_EQ(mapper1_count, 0u);
2490
2491 ASSERT_TRUE(mapper0.Front() != nullptr);
2492 EXPECT_EQ(mapper0_count, 2u);
2493 EXPECT_EQ(mapper1_count, 0u);
2494 output0.emplace_back(std::move(*mapper0.Front()));
2495 mapper0.PopFront();
2496 EXPECT_TRUE(mapper0.started());
2497
2498 ASSERT_TRUE(mapper0.Front() != nullptr);
2499 output0.emplace_back(std::move(*mapper0.Front()));
2500 mapper0.PopFront();
2501 EXPECT_TRUE(mapper0.started());
2502
2503 EXPECT_EQ(mapper0_count, 3u);
2504 EXPECT_EQ(mapper1_count, 0u);
2505
2506 ASSERT_TRUE(mapper0.Front() == nullptr);
2507
2508 LOG(INFO) << output0[0];
2509 LOG(INFO) << output0[1];
2510 LOG(INFO) << output0[2];
2511
2512 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2513 EXPECT_EQ(output0[0].monotonic_event_time.time,
2514 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002515 EXPECT_EQ(output0[0].queue_index,
2516 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002517 EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
2518 EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002519 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002520
2521 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2522 EXPECT_EQ(output0[1].monotonic_event_time.time,
2523 e + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002524 EXPECT_EQ(output0[1].queue_index,
2525 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002526 EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
2527 EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002528 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002529
2530 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2531 EXPECT_EQ(output0[2].monotonic_event_time.time,
2532 e + chrono::milliseconds(3000));
Austin Schuh58646e22021-08-23 23:51:46 -07002533 EXPECT_EQ(output0[2].queue_index,
2534 (BootQueueIndex{.boot = 0u, .index = 2u}));
Austin Schuh48507722021-07-17 17:29:24 -07002535 EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
2536 EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002537 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002538 }
2539
2540 {
2541 SCOPED_TRACE("Trying node1 now");
2542 std::deque<TimestampedMessage> output1;
2543
2544 EXPECT_EQ(mapper0_count, 3u);
2545 EXPECT_EQ(mapper1_count, 0u);
2546
2547 ASSERT_TRUE(mapper1.Front() != nullptr);
2548 EXPECT_EQ(mapper0_count, 3u);
2549 EXPECT_EQ(mapper1_count, 1u);
2550 output1.emplace_back(std::move(*mapper1.Front()));
2551 mapper1.PopFront();
2552 EXPECT_TRUE(mapper1.started());
2553 EXPECT_EQ(mapper0_count, 3u);
2554 EXPECT_EQ(mapper1_count, 1u);
2555
2556 ASSERT_TRUE(mapper1.Front() != nullptr);
2557 EXPECT_EQ(mapper0_count, 3u);
2558 EXPECT_EQ(mapper1_count, 2u);
2559 output1.emplace_back(std::move(*mapper1.Front()));
2560 mapper1.PopFront();
2561 EXPECT_TRUE(mapper1.started());
2562
2563 ASSERT_TRUE(mapper1.Front() != nullptr);
2564 output1.emplace_back(std::move(*mapper1.Front()));
2565 mapper1.PopFront();
2566 EXPECT_TRUE(mapper1.started());
2567
Austin Schuh58646e22021-08-23 23:51:46 -07002568 ASSERT_TRUE(mapper1.Front() != nullptr);
2569 output1.emplace_back(std::move(*mapper1.Front()));
2570 mapper1.PopFront();
2571 EXPECT_TRUE(mapper1.started());
2572
Austin Schuh48507722021-07-17 17:29:24 -07002573 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002574 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002575
2576 ASSERT_TRUE(mapper1.Front() == nullptr);
2577
2578 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002579 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002580
2581 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2582 EXPECT_EQ(output1[0].monotonic_event_time.time,
2583 e + chrono::seconds(100) + chrono::milliseconds(1000));
2584 EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
2585 EXPECT_EQ(output1[0].monotonic_remote_time.time,
2586 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002587 EXPECT_EQ(output1[0].remote_queue_index,
2588 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002589 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
2590 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
2591 e + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002592 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002593
2594 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2595 EXPECT_EQ(output1[1].monotonic_event_time.time,
2596 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002597 EXPECT_EQ(output1[1].remote_queue_index,
2598 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002599 EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
2600 EXPECT_EQ(output1[1].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002601 e + chrono::milliseconds(1000));
Austin Schuh48507722021-07-17 17:29:24 -07002602 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
2603 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
2604 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002605 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002606
2607 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2608 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002609 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh48507722021-07-17 17:29:24 -07002610 EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
2611 EXPECT_EQ(output1[2].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002612 e + chrono::milliseconds(2000));
2613 EXPECT_EQ(output1[2].remote_queue_index,
2614 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002615 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
2616 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002617 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002618 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002619
Austin Schuh58646e22021-08-23 23:51:46 -07002620 EXPECT_EQ(output1[3].monotonic_event_time.boot, 1u);
2621 EXPECT_EQ(output1[3].monotonic_event_time.time,
2622 e + chrono::seconds(20) + chrono::milliseconds(3000));
2623 EXPECT_EQ(output1[3].monotonic_remote_time.boot, 0u);
2624 EXPECT_EQ(output1[3].monotonic_remote_time.time,
2625 e + chrono::milliseconds(3000));
2626 EXPECT_EQ(output1[3].remote_queue_index,
2627 (BootQueueIndex{.boot = 0u, .index = 2u}));
2628 EXPECT_EQ(output1[3].monotonic_timestamp_time.boot, 0u);
2629 EXPECT_EQ(output1[3].monotonic_timestamp_time.time,
2630 e + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002631 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh58646e22021-08-23 23:51:46 -07002632
Austin Schuh48507722021-07-17 17:29:24 -07002633 LOG(INFO) << output1[0];
2634 LOG(INFO) << output1[1];
2635 LOG(INFO) << output1[2];
Austin Schuh58646e22021-08-23 23:51:46 -07002636 LOG(INFO) << output1[3];
Austin Schuh48507722021-07-17 17:29:24 -07002637 }
2638}
2639
2640TEST_F(RebootTimestampMapperTest, Node2Reboot) {
2641 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2642 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002643 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002644 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002645 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002646 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002647 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002648 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002649 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002650 writer1b.QueueSpan(boot1b_.span());
2651
Austin Schuhd863e6e2022-10-16 15:44:50 -07002652 writer1a.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002653 e + chrono::seconds(100) + chrono::milliseconds(1000), 3, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002654 writer0a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002655 e + chrono::seconds(100) + chrono::milliseconds(1000), 3,
2656 chrono::seconds(-100),
2657 e + chrono::seconds(100) + chrono::milliseconds(1001)));
2658
Austin Schuhd863e6e2022-10-16 15:44:50 -07002659 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002660 e + chrono::seconds(20) + chrono::milliseconds(2000), 3, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002661 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002662 e + chrono::seconds(20) + chrono::milliseconds(2000), 3,
2663 chrono::seconds(-20),
2664 e + chrono::seconds(20) + chrono::milliseconds(2001)));
2665
Austin Schuhd863e6e2022-10-16 15:44:50 -07002666 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002667 e + chrono::seconds(20) + chrono::milliseconds(3000), 3, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002668 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002669 e + chrono::seconds(20) + chrono::milliseconds(3000), 3,
2670 chrono::seconds(-20),
2671 e + chrono::seconds(20) + chrono::milliseconds(3001)));
2672 }
2673
2674 const std::vector<LogFile> parts =
2675 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07002676 LogFilesContainer log_files(parts);
Austin Schuh48507722021-07-17 17:29:24 -07002677
2678 for (const auto &x : parts) {
2679 LOG(INFO) << x;
2680 }
2681 ASSERT_EQ(parts.size(), 1u);
2682 ASSERT_EQ(parts[0].logger_node, "pi1");
2683
2684 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07002685 TimestampMapper mapper0("pi1", log_files,
2686 TimestampQueueStrategy::kQueueTogether);
Austin Schuh48507722021-07-17 17:29:24 -07002687 mapper0.set_timestamp_callback(
2688 [&](TimestampedMessage *) { ++mapper0_count; });
2689 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07002690 TimestampMapper mapper1("pi2", log_files,
2691 TimestampQueueStrategy::kQueueTogether);
Austin Schuh48507722021-07-17 17:29:24 -07002692 mapper1.set_timestamp_callback(
2693 [&](TimestampedMessage *) { ++mapper1_count; });
2694
2695 mapper0.AddPeer(&mapper1);
2696 mapper1.AddPeer(&mapper0);
2697
2698 {
2699 std::deque<TimestampedMessage> output0;
2700
2701 EXPECT_EQ(mapper0_count, 0u);
2702 EXPECT_EQ(mapper1_count, 0u);
2703 ASSERT_TRUE(mapper0.Front() != nullptr);
2704 EXPECT_EQ(mapper0_count, 1u);
2705 EXPECT_EQ(mapper1_count, 0u);
2706 output0.emplace_back(std::move(*mapper0.Front()));
2707 mapper0.PopFront();
2708 EXPECT_TRUE(mapper0.started());
2709 EXPECT_EQ(mapper0_count, 1u);
2710 EXPECT_EQ(mapper1_count, 0u);
2711
2712 ASSERT_TRUE(mapper0.Front() != nullptr);
2713 EXPECT_EQ(mapper0_count, 2u);
2714 EXPECT_EQ(mapper1_count, 0u);
2715 output0.emplace_back(std::move(*mapper0.Front()));
2716 mapper0.PopFront();
2717 EXPECT_TRUE(mapper0.started());
2718
2719 ASSERT_TRUE(mapper0.Front() != nullptr);
2720 output0.emplace_back(std::move(*mapper0.Front()));
2721 mapper0.PopFront();
2722 EXPECT_TRUE(mapper0.started());
2723
2724 EXPECT_EQ(mapper0_count, 3u);
2725 EXPECT_EQ(mapper1_count, 0u);
2726
2727 ASSERT_TRUE(mapper0.Front() == nullptr);
2728
2729 LOG(INFO) << output0[0];
2730 LOG(INFO) << output0[1];
2731 LOG(INFO) << output0[2];
2732
2733 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2734 EXPECT_EQ(output0[0].monotonic_event_time.time,
2735 e + chrono::milliseconds(1000));
2736 EXPECT_EQ(output0[0].monotonic_remote_time.boot, 0u);
2737 EXPECT_EQ(output0[0].monotonic_remote_time.time,
2738 e + chrono::seconds(100) + chrono::milliseconds(1000));
2739 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
2740 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
2741 e + chrono::seconds(100) + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002742 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002743
2744 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2745 EXPECT_EQ(output0[1].monotonic_event_time.time,
2746 e + chrono::milliseconds(2000));
2747 EXPECT_EQ(output0[1].monotonic_remote_time.boot, 1u);
2748 EXPECT_EQ(output0[1].monotonic_remote_time.time,
2749 e + chrono::seconds(20) + chrono::milliseconds(2000));
2750 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
2751 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
2752 e + chrono::seconds(20) + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002753 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002754
2755 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2756 EXPECT_EQ(output0[2].monotonic_event_time.time,
2757 e + chrono::milliseconds(3000));
2758 EXPECT_EQ(output0[2].monotonic_remote_time.boot, 1u);
2759 EXPECT_EQ(output0[2].monotonic_remote_time.time,
2760 e + chrono::seconds(20) + chrono::milliseconds(3000));
2761 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
2762 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
2763 e + chrono::seconds(20) + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002764 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002765 }
2766
2767 {
2768 SCOPED_TRACE("Trying node1 now");
2769 std::deque<TimestampedMessage> output1;
2770
2771 EXPECT_EQ(mapper0_count, 3u);
2772 EXPECT_EQ(mapper1_count, 0u);
2773
2774 ASSERT_TRUE(mapper1.Front() != nullptr);
2775 EXPECT_EQ(mapper0_count, 3u);
2776 EXPECT_EQ(mapper1_count, 1u);
2777 output1.emplace_back(std::move(*mapper1.Front()));
2778 mapper1.PopFront();
2779 EXPECT_TRUE(mapper1.started());
2780 EXPECT_EQ(mapper0_count, 3u);
2781 EXPECT_EQ(mapper1_count, 1u);
2782
2783 ASSERT_TRUE(mapper1.Front() != nullptr);
2784 EXPECT_EQ(mapper0_count, 3u);
2785 EXPECT_EQ(mapper1_count, 2u);
2786 output1.emplace_back(std::move(*mapper1.Front()));
2787 mapper1.PopFront();
2788 EXPECT_TRUE(mapper1.started());
2789
2790 ASSERT_TRUE(mapper1.Front() != nullptr);
2791 output1.emplace_back(std::move(*mapper1.Front()));
2792 mapper1.PopFront();
2793 EXPECT_TRUE(mapper1.started());
2794
2795 EXPECT_EQ(mapper0_count, 3u);
2796 EXPECT_EQ(mapper1_count, 3u);
2797
2798 ASSERT_TRUE(mapper1.Front() == nullptr);
2799
2800 EXPECT_EQ(mapper0_count, 3u);
2801 EXPECT_EQ(mapper1_count, 3u);
2802
2803 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2804 EXPECT_EQ(output1[0].monotonic_event_time.time,
2805 e + chrono::seconds(100) + chrono::milliseconds(1000));
2806 EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
2807 EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002808 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002809
2810 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2811 EXPECT_EQ(output1[1].monotonic_event_time.time,
2812 e + chrono::seconds(20) + chrono::milliseconds(2000));
2813 EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
2814 EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002815 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002816
2817 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2818 EXPECT_EQ(output1[2].monotonic_event_time.time,
2819 e + chrono::seconds(20) + chrono::milliseconds(3000));
2820 EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
2821 EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002822 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002823
2824 LOG(INFO) << output1[0];
2825 LOG(INFO) << output1[1];
2826 LOG(INFO) << output1[2];
2827 }
2828}
2829
Philipp Schrader416505b2024-03-28 11:59:45 -07002830class SortingDeathTest : public SortingElementTest<> {
Austin Schuh44c61472021-11-22 21:04:10 -08002831 public:
2832 SortingDeathTest()
2833 : SortingElementTest(),
2834 part0_(MakeHeader(config_, R"({
2835 /* 100ms */
2836 "max_out_of_order_duration": 100000000,
2837 "node": {
2838 "name": "pi1"
2839 },
2840 "logger_node": {
2841 "name": "pi1"
2842 },
2843 "monotonic_start_time": 1000000,
2844 "realtime_start_time": 1000000000000,
2845 "logger_monotonic_start_time": 1000000,
2846 "logger_realtime_start_time": 1000000000000,
2847 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2848 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2849 "parts_index": 0,
2850 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2851 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2852 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2853 "boot_uuids": [
2854 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2855 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2856 ""
2857 ],
2858 "oldest_remote_monotonic_timestamps": [
2859 9223372036854775807,
2860 9223372036854775807,
2861 9223372036854775807
2862 ],
2863 "oldest_local_monotonic_timestamps": [
2864 9223372036854775807,
2865 9223372036854775807,
2866 9223372036854775807
2867 ],
2868 "oldest_remote_unreliable_monotonic_timestamps": [
2869 9223372036854775807,
2870 0,
2871 9223372036854775807
2872 ],
2873 "oldest_local_unreliable_monotonic_timestamps": [
2874 9223372036854775807,
2875 0,
2876 9223372036854775807
2877 ]
2878})")),
2879 part1_(MakeHeader(config_, R"({
2880 /* 100ms */
2881 "max_out_of_order_duration": 100000000,
2882 "node": {
2883 "name": "pi1"
2884 },
2885 "logger_node": {
2886 "name": "pi1"
2887 },
2888 "monotonic_start_time": 1000000,
2889 "realtime_start_time": 1000000000000,
2890 "logger_monotonic_start_time": 1000000,
2891 "logger_realtime_start_time": 1000000000000,
2892 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2893 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2894 "parts_index": 1,
2895 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2896 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2897 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2898 "boot_uuids": [
2899 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2900 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2901 ""
2902 ],
2903 "oldest_remote_monotonic_timestamps": [
2904 9223372036854775807,
2905 9223372036854775807,
2906 9223372036854775807
2907 ],
2908 "oldest_local_monotonic_timestamps": [
2909 9223372036854775807,
2910 9223372036854775807,
2911 9223372036854775807
2912 ],
2913 "oldest_remote_unreliable_monotonic_timestamps": [
2914 9223372036854775807,
2915 100000,
2916 9223372036854775807
2917 ],
2918 "oldest_local_unreliable_monotonic_timestamps": [
2919 9223372036854775807,
2920 100000,
2921 9223372036854775807
2922 ]
2923})")),
2924 part2_(MakeHeader(config_, R"({
2925 /* 100ms */
2926 "max_out_of_order_duration": 100000000,
2927 "node": {
2928 "name": "pi1"
2929 },
2930 "logger_node": {
2931 "name": "pi1"
2932 },
2933 "monotonic_start_time": 1000000,
2934 "realtime_start_time": 1000000000000,
2935 "logger_monotonic_start_time": 1000000,
2936 "logger_realtime_start_time": 1000000000000,
2937 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2938 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2939 "parts_index": 2,
2940 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2941 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2942 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2943 "boot_uuids": [
2944 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2945 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2946 ""
2947 ],
2948 "oldest_remote_monotonic_timestamps": [
2949 9223372036854775807,
2950 9223372036854775807,
2951 9223372036854775807
2952 ],
2953 "oldest_local_monotonic_timestamps": [
2954 9223372036854775807,
2955 9223372036854775807,
2956 9223372036854775807
2957 ],
2958 "oldest_remote_unreliable_monotonic_timestamps": [
2959 9223372036854775807,
2960 200000,
2961 9223372036854775807
2962 ],
2963 "oldest_local_unreliable_monotonic_timestamps": [
2964 9223372036854775807,
2965 200000,
2966 9223372036854775807
2967 ]
2968})")),
2969 part3_(MakeHeader(config_, R"({
2970 /* 100ms */
2971 "max_out_of_order_duration": 100000000,
2972 "node": {
2973 "name": "pi1"
2974 },
2975 "logger_node": {
2976 "name": "pi1"
2977 },
2978 "monotonic_start_time": 1000000,
2979 "realtime_start_time": 1000000000000,
2980 "logger_monotonic_start_time": 1000000,
2981 "logger_realtime_start_time": 1000000000000,
2982 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2983 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2984 "parts_index": 3,
2985 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2986 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2987 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2988 "boot_uuids": [
2989 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2990 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2991 ""
2992 ],
2993 "oldest_remote_monotonic_timestamps": [
2994 9223372036854775807,
2995 9223372036854775807,
2996 9223372036854775807
2997 ],
2998 "oldest_local_monotonic_timestamps": [
2999 9223372036854775807,
3000 9223372036854775807,
3001 9223372036854775807
3002 ],
3003 "oldest_remote_unreliable_monotonic_timestamps": [
3004 9223372036854775807,
3005 300000,
3006 9223372036854775807
3007 ],
3008 "oldest_local_unreliable_monotonic_timestamps": [
3009 9223372036854775807,
3010 300000,
3011 9223372036854775807
3012 ]
3013})")) {}
3014
3015 protected:
3016 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part0_;
3017 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part1_;
3018 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part2_;
3019 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part3_;
3020};
3021
3022// Tests that if 2 computers go back and forth trying to be the same node, we
3023// die in sorting instead of failing to estimate time.
3024TEST_F(SortingDeathTest, FightingNodes) {
3025 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07003026 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh44c61472021-11-22 21:04:10 -08003027 writer0.QueueSpan(part0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07003028 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh44c61472021-11-22 21:04:10 -08003029 writer1.QueueSpan(part1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07003030 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuh44c61472021-11-22 21:04:10 -08003031 writer2.QueueSpan(part2_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07003032 TestDetachedBufferWriter writer3(logfile3_);
Austin Schuh44c61472021-11-22 21:04:10 -08003033 writer3.QueueSpan(part3_.span());
3034 }
3035
3036 EXPECT_DEATH(
3037 {
3038 const std::vector<LogFile> parts =
3039 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
3040 },
Austin Schuh22cf7862022-09-19 19:09:42 -07003041 "found overlapping boots on");
Austin Schuh44c61472021-11-22 21:04:10 -08003042}
3043
Brian Smarttea913d42021-12-10 15:02:38 -08003044// Tests that we MessageReader blows up on a bad message.
3045TEST(MessageReaderConfirmCrash, ReadWrite) {
3046 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
3047 unlink(logfile.c_str());
3048
3049 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
3050 JsonToSizedFlatbuffer<LogFileHeader>(
3051 R"({ "max_out_of_order_duration": 100000000 })");
3052 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
3053 JsonToSizedFlatbuffer<MessageHeader>(
3054 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
3055 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
3056 JsonToSizedFlatbuffer<MessageHeader>(
3057 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
3058 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m4 =
3059 JsonToSizedFlatbuffer<MessageHeader>(
3060 R"({ "channel_index": 0, "monotonic_sent_time": 4 })");
3061
3062 // Starts out like a proper flat buffer header, but it breaks down ...
3063 std::vector<uint8_t> garbage{8, 0, 0, 0, 16, 0, 0, 0, 4, 0, 0, 0};
3064 absl::Span<uint8_t> m3_span(garbage);
3065
3066 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07003067 TestDetachedBufferWriter writer(logfile);
Brian Smarttea913d42021-12-10 15:02:38 -08003068 writer.QueueSpan(config.span());
3069 writer.QueueSpan(m1.span());
3070 writer.QueueSpan(m2.span());
3071 writer.QueueSpan(m3_span);
3072 writer.QueueSpan(m4.span()); // This message is "hidden"
3073 }
3074
3075 {
3076 MessageReader reader(logfile);
3077
3078 EXPECT_EQ(reader.filename(), logfile);
3079
3080 EXPECT_EQ(
3081 reader.max_out_of_order_duration(),
3082 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
3083 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
3084 EXPECT_TRUE(reader.ReadMessage());
3085 EXPECT_EQ(reader.newest_timestamp(),
3086 monotonic_clock::time_point(chrono::nanoseconds(1)));
3087 EXPECT_TRUE(reader.ReadMessage());
3088 EXPECT_EQ(reader.newest_timestamp(),
3089 monotonic_clock::time_point(chrono::nanoseconds(2)));
3090 // Confirm default crashing behavior
3091 EXPECT_DEATH(reader.ReadMessage(), "Corrupted message at offset");
3092 }
3093
3094 {
3095 gflags::FlagSaver fs;
3096
3097 MessageReader reader(logfile);
3098 reader.set_crash_on_corrupt_message_flag(false);
3099
3100 EXPECT_EQ(reader.filename(), logfile);
3101
3102 EXPECT_EQ(
3103 reader.max_out_of_order_duration(),
3104 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
3105 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
3106 EXPECT_TRUE(reader.ReadMessage());
3107 EXPECT_EQ(reader.newest_timestamp(),
3108 monotonic_clock::time_point(chrono::nanoseconds(1)));
3109 EXPECT_TRUE(reader.ReadMessage());
3110 EXPECT_EQ(reader.newest_timestamp(),
3111 monotonic_clock::time_point(chrono::nanoseconds(2)));
3112 // Confirm avoiding the corrupted message crash, stopping instead.
3113 EXPECT_FALSE(reader.ReadMessage());
3114 }
3115
3116 {
3117 gflags::FlagSaver fs;
3118
3119 MessageReader reader(logfile);
3120 reader.set_crash_on_corrupt_message_flag(false);
3121 reader.set_ignore_corrupt_messages_flag(true);
3122
3123 EXPECT_EQ(reader.filename(), logfile);
3124
3125 EXPECT_EQ(
3126 reader.max_out_of_order_duration(),
3127 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
3128 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
3129 EXPECT_TRUE(reader.ReadMessage());
3130 EXPECT_EQ(reader.newest_timestamp(),
3131 monotonic_clock::time_point(chrono::nanoseconds(1)));
3132 EXPECT_TRUE(reader.ReadMessage());
3133 EXPECT_EQ(reader.newest_timestamp(),
3134 monotonic_clock::time_point(chrono::nanoseconds(2)));
3135 // Confirm skipping of the corrupted message to read the hidden one.
3136 EXPECT_TRUE(reader.ReadMessage());
3137 EXPECT_EQ(reader.newest_timestamp(),
3138 monotonic_clock::time_point(chrono::nanoseconds(4)));
3139 EXPECT_FALSE(reader.ReadMessage());
3140 }
3141}
3142
Austin Schuhfa30c352022-10-16 11:12:02 -07003143class InlinePackMessage : public ::testing::Test {
3144 protected:
3145 aos::Context RandomContext() {
3146 data_ = RandomData();
3147 std::uniform_int_distribution<uint32_t> uint32_distribution(
3148 std::numeric_limits<uint32_t>::min(),
3149 std::numeric_limits<uint32_t>::max());
3150
3151 std::uniform_int_distribution<int64_t> time_distribution(
3152 std::numeric_limits<int64_t>::min(),
3153 std::numeric_limits<int64_t>::max());
3154
3155 aos::Context context;
3156 context.monotonic_event_time =
3157 aos::monotonic_clock::epoch() +
3158 chrono::nanoseconds(time_distribution(random_number_generator_));
3159 context.realtime_event_time =
3160 aos::realtime_clock::epoch() +
3161 chrono::nanoseconds(time_distribution(random_number_generator_));
3162
3163 context.monotonic_remote_time =
3164 aos::monotonic_clock::epoch() +
3165 chrono::nanoseconds(time_distribution(random_number_generator_));
3166 context.realtime_remote_time =
3167 aos::realtime_clock::epoch() +
3168 chrono::nanoseconds(time_distribution(random_number_generator_));
3169
Austin Schuhb5224ec2024-03-27 15:20:09 -07003170 context.monotonic_remote_transmit_time =
3171 aos::monotonic_clock::epoch() +
3172 chrono::nanoseconds(time_distribution(random_number_generator_));
3173
Austin Schuhfa30c352022-10-16 11:12:02 -07003174 context.queue_index = uint32_distribution(random_number_generator_);
3175 context.remote_queue_index = uint32_distribution(random_number_generator_);
3176 context.size = data_.size();
3177 context.data = data_.data();
3178 return context;
3179 }
3180
Austin Schuhf2d0e682022-10-16 14:20:58 -07003181 aos::monotonic_clock::time_point RandomMonotonic() {
3182 std::uniform_int_distribution<int64_t> time_distribution(
3183 0, std::numeric_limits<int64_t>::max());
3184 return aos::monotonic_clock::epoch() +
3185 chrono::nanoseconds(time_distribution(random_number_generator_));
3186 }
3187
3188 aos::SizePrefixedFlatbufferDetachedBuffer<message_bridge::RemoteMessage>
3189 RandomRemoteMessage() {
3190 std::uniform_int_distribution<uint8_t> uint8_distribution(
3191 std::numeric_limits<uint8_t>::min(),
3192 std::numeric_limits<uint8_t>::max());
3193
3194 std::uniform_int_distribution<int64_t> time_distribution(
3195 std::numeric_limits<int64_t>::min(),
3196 std::numeric_limits<int64_t>::max());
3197
3198 flatbuffers::FlatBufferBuilder fbb;
3199 message_bridge::RemoteMessage::Builder builder(fbb);
3200 builder.add_queue_index(uint8_distribution(random_number_generator_));
3201
3202 builder.add_monotonic_sent_time(
3203 time_distribution(random_number_generator_));
3204 builder.add_realtime_sent_time(time_distribution(random_number_generator_));
3205 builder.add_monotonic_remote_time(
3206 time_distribution(random_number_generator_));
3207 builder.add_realtime_remote_time(
3208 time_distribution(random_number_generator_));
3209
3210 builder.add_remote_queue_index(
3211 uint8_distribution(random_number_generator_));
3212
Austin Schuhb5224ec2024-03-27 15:20:09 -07003213 builder.add_monotonic_remote_transmit_time(
3214 time_distribution(random_number_generator_));
3215
Austin Schuhf2d0e682022-10-16 14:20:58 -07003216 fbb.FinishSizePrefixed(builder.Finish());
3217 return fbb.Release();
3218 }
3219
Austin Schuhfa30c352022-10-16 11:12:02 -07003220 std::vector<uint8_t> RandomData() {
3221 std::vector<uint8_t> result;
3222 std::uniform_int_distribution<int> length_distribution(1, 32);
3223 std::uniform_int_distribution<uint8_t> data_distribution(
3224 std::numeric_limits<uint8_t>::min(),
3225 std::numeric_limits<uint8_t>::max());
3226
3227 const size_t length = length_distribution(random_number_generator_);
3228
3229 result.reserve(length);
3230 for (size_t i = 0; i < length; ++i) {
3231 result.emplace_back(data_distribution(random_number_generator_));
3232 }
3233 return result;
3234 }
3235
3236 std::mt19937 random_number_generator_{
3237 std::mt19937(::aos::testing::RandomSeed())};
3238
3239 std::vector<uint8_t> data_;
3240};
3241
3242// Uses the binary schema to annotate a provided flatbuffer. Returns the
3243// annotated flatbuffer.
3244std::string AnnotateBinaries(
3245 const aos::NonSizePrefixedFlatbuffer<reflection::Schema> &schema,
3246 const std::string &schema_filename,
3247 flatbuffers::span<uint8_t> binary_data) {
3248 flatbuffers::BinaryAnnotator binary_annotator(
3249 schema.span().data(), schema.span().size(), binary_data.data(),
3250 binary_data.size());
3251
3252 auto annotations = binary_annotator.Annotate();
3253
3254 flatbuffers::AnnotatedBinaryTextGenerator text_generator(
3255 flatbuffers::AnnotatedBinaryTextGenerator::Options{}, annotations,
3256 binary_data.data(), binary_data.size());
3257
3258 text_generator.Generate(aos::testing::TestTmpDir() + "/foo.bfbs",
3259 schema_filename);
3260
3261 return aos::util::ReadFileToStringOrDie(aos::testing::TestTmpDir() +
3262 "/foo.afb");
3263}
3264
Austin Schuh71a40d42023-02-04 21:22:22 -08003265// Event loop which just has working time functions for the Copier classes
3266// tested below.
3267class TimeEventLoop : public EventLoop {
3268 public:
3269 TimeEventLoop() : EventLoop(nullptr) {}
3270
3271 aos::monotonic_clock::time_point monotonic_now() const final {
3272 return aos::monotonic_clock::min_time;
3273 }
3274 realtime_clock::time_point realtime_now() const final {
3275 return aos::realtime_clock::min_time;
3276 }
3277
3278 void OnRun(::std::function<void()> /*on_run*/) final { LOG(FATAL); }
3279
3280 const std::string_view name() const final { return "time"; }
3281 const Node *node() const final { return nullptr; }
3282
3283 void SetRuntimeAffinity(const cpu_set_t & /*cpuset*/) final { LOG(FATAL); }
3284 void SetRuntimeRealtimePriority(int /*priority*/) final { LOG(FATAL); }
3285
3286 const cpu_set_t &runtime_affinity() const final {
3287 LOG(FATAL);
3288 return cpuset_;
3289 }
3290
3291 TimerHandler *AddTimer(::std::function<void()> /*callback*/) final {
3292 LOG(FATAL);
3293 return nullptr;
3294 }
3295
3296 std::unique_ptr<RawSender> MakeRawSender(const Channel * /*channel*/) final {
3297 LOG(FATAL);
3298 return std::unique_ptr<RawSender>();
3299 }
3300
3301 const UUID &boot_uuid() const final {
3302 LOG(FATAL);
3303 return boot_uuid_;
3304 }
3305
3306 void set_name(const std::string_view name) final { LOG(FATAL) << name; }
3307
3308 pid_t GetTid() final {
3309 LOG(FATAL);
3310 return 0;
3311 }
3312
3313 int NumberBuffers(const Channel * /*channel*/) final {
3314 LOG(FATAL);
3315 return 0;
3316 }
3317
3318 int runtime_realtime_priority() const final {
3319 LOG(FATAL);
3320 return 0;
3321 }
3322
3323 std::unique_ptr<RawFetcher> MakeRawFetcher(
3324 const Channel * /*channel*/) final {
3325 LOG(FATAL);
3326 return std::unique_ptr<RawFetcher>();
3327 }
3328
3329 PhasedLoopHandler *AddPhasedLoop(
3330 ::std::function<void(int)> /*callback*/,
3331 const monotonic_clock::duration /*interval*/,
3332 const monotonic_clock::duration /*offset*/) final {
3333 LOG(FATAL);
3334 return nullptr;
3335 }
3336
3337 void MakeRawWatcher(
3338 const Channel * /*channel*/,
3339 std::function<void(const Context &context, const void *message)>
3340 /*watcher*/) final {
3341 LOG(FATAL);
3342 }
3343
3344 private:
3345 const cpu_set_t cpuset_ = DefaultAffinity();
3346 UUID boot_uuid_ = UUID ::Zero();
3347};
3348
Austin Schuhfa30c352022-10-16 11:12:02 -07003349// Tests that all variations of PackMessage are equivalent to the inline
3350// PackMessage used to avoid allocations.
3351TEST_F(InlinePackMessage, Equivilent) {
3352 std::uniform_int_distribution<uint32_t> uint32_distribution(
3353 std::numeric_limits<uint32_t>::min(),
3354 std::numeric_limits<uint32_t>::max());
3355 aos::FlatbufferVector<reflection::Schema> schema =
3356 FileToFlatbuffer<reflection::Schema>(
3357 ArtifactPath("aos/events/logging/logger.bfbs"));
3358
3359 for (const LogType type :
3360 {LogType::kLogMessage, LogType::kLogDeliveryTimeOnly,
Austin Schuh9d50f0d2024-03-27 14:40:43 -07003361 LogType::kLogRemoteMessage}) {
Austin Schuhfa30c352022-10-16 11:12:02 -07003362 for (int i = 0; i < 100; ++i) {
3363 aos::Context context = RandomContext();
3364 const uint32_t channel_index =
3365 uint32_distribution(random_number_generator_);
3366
3367 flatbuffers::FlatBufferBuilder fbb;
3368 fbb.ForceDefaults(true);
3369 fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index, type));
3370
3371 VLOG(1) << absl::BytesToHexString(std::string_view(
3372 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3373 fbb.GetBufferSpan().size()));
3374
3375 // Make sure that both the builder and inline method agree on sizes.
Austin Schuh48d10d62022-10-16 22:19:23 -07003376 ASSERT_EQ(fbb.GetSize(), PackMessageSize(type, context.size))
Austin Schuhfa30c352022-10-16 11:12:02 -07003377 << "log type " << static_cast<int>(type);
3378
3379 // Initialize the buffer to something nonzero to make sure all the padding
3380 // bytes are set to 0.
Austin Schuh48d10d62022-10-16 22:19:23 -07003381 std::vector<uint8_t> repacked_message(PackMessageSize(type, context.size),
3382 67);
Austin Schuhfa30c352022-10-16 11:12:02 -07003383
3384 // And verify packing inline works as expected.
Austin Schuh71a40d42023-02-04 21:22:22 -08003385 EXPECT_EQ(
3386 repacked_message.size(),
3387 PackMessageInline(repacked_message.data(), context, channel_index,
3388 type, 0u, repacked_message.size()));
Austin Schuhb5224ec2024-03-27 15:20:09 -07003389 for (size_t i = 0; i < fbb.GetBufferSpan().size(); ++i) {
3390 ASSERT_EQ(absl::Span<uint8_t>(repacked_message)[i],
3391 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3392 fbb.GetBufferSpan().size())[i])
3393 << ": On index " << i;
3394 }
3395 ASSERT_EQ(absl::Span<uint8_t>(repacked_message),
Austin Schuhfa30c352022-10-16 11:12:02 -07003396 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3397 fbb.GetBufferSpan().size()))
3398 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
Austin Schuhb5224ec2024-03-27 15:20:09 -07003399 fbb.GetBufferSpan())
3400 << " for log type " << static_cast<int>(type);
Austin Schuh71a40d42023-02-04 21:22:22 -08003401
3402 // Ok, now we want to confirm that we can build up arbitrary pieces of
3403 // said flatbuffer. Try all of them since it is cheap.
3404 TimeEventLoop event_loop;
3405 for (size_t i = 0; i < repacked_message.size(); i += 8) {
3406 for (size_t j = i; j < repacked_message.size(); j += 8) {
3407 std::vector<uint8_t> destination(repacked_message.size(), 67u);
3408 ContextDataCopier copier(context, channel_index, type, &event_loop);
3409
3410 copier.Copy(destination.data(), i, j);
3411
3412 size_t index = 0;
3413 for (size_t k = i; k < j; ++k) {
3414 ASSERT_EQ(destination[index], repacked_message[k])
3415 << ": Failed to match type " << static_cast<int>(type)
3416 << ", index " << index << " while testing range " << i << " to "
3417 << j;
3418 ;
3419 ++index;
3420 }
3421 // Now, confirm that none of the other bytes have been touched.
3422 for (; index < destination.size(); ++index) {
3423 ASSERT_EQ(destination[index], 67u);
3424 }
3425 }
3426 }
Austin Schuhfa30c352022-10-16 11:12:02 -07003427 }
3428 }
3429}
3430
Austin Schuhf2d0e682022-10-16 14:20:58 -07003431// Tests that all variations of PackMessage are equivilent to the inline
3432// PackMessage used to avoid allocations.
3433TEST_F(InlinePackMessage, RemoteEquivilent) {
3434 aos::FlatbufferVector<reflection::Schema> schema =
3435 FileToFlatbuffer<reflection::Schema>(
3436 ArtifactPath("aos/events/logging/logger.bfbs"));
3437 std::uniform_int_distribution<uint8_t> uint8_distribution(
3438 std::numeric_limits<uint8_t>::min(), std::numeric_limits<uint8_t>::max());
3439
3440 for (int i = 0; i < 100; ++i) {
3441 aos::SizePrefixedFlatbufferDetachedBuffer<RemoteMessage> random_msg =
3442 RandomRemoteMessage();
3443 const size_t channel_index = uint8_distribution(random_number_generator_);
3444 const monotonic_clock::time_point monotonic_timestamp_time =
3445 RandomMonotonic();
3446
3447 flatbuffers::FlatBufferBuilder fbb;
3448 fbb.ForceDefaults(true);
3449 fbb.FinishSizePrefixed(PackRemoteMessage(
3450 &fbb, &random_msg.message(), channel_index, monotonic_timestamp_time));
3451
3452 VLOG(1) << absl::BytesToHexString(std::string_view(
3453 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3454 fbb.GetBufferSpan().size()));
3455
3456 // Make sure that both the builder and inline method agree on sizes.
3457 ASSERT_EQ(fbb.GetSize(), PackRemoteMessageSize());
3458
3459 // Initialize the buffer to something nonzer to make sure all the padding
3460 // bytes are set to 0.
3461 std::vector<uint8_t> repacked_message(PackRemoteMessageSize(), 67);
3462
3463 // And verify packing inline works as expected.
Austin Schuh71a40d42023-02-04 21:22:22 -08003464 EXPECT_EQ(repacked_message.size(),
3465 PackRemoteMessageInline(
3466 repacked_message.data(), &random_msg.message(), channel_index,
3467 monotonic_timestamp_time, 0u, repacked_message.size()));
Austin Schuhf2d0e682022-10-16 14:20:58 -07003468 EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
3469 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3470 fbb.GetBufferSpan().size()))
3471 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
3472 fbb.GetBufferSpan());
Austin Schuh71a40d42023-02-04 21:22:22 -08003473
3474 // Ok, now we want to confirm that we can build up arbitrary pieces of said
3475 // flatbuffer. Try all of them since it is cheap.
3476 TimeEventLoop event_loop;
3477 for (size_t i = 0; i < repacked_message.size(); i += 8) {
3478 for (size_t j = i; j < repacked_message.size(); j += 8) {
3479 std::vector<uint8_t> destination(repacked_message.size(), 67u);
3480 RemoteMessageCopier copier(&random_msg.message(), channel_index,
3481 monotonic_timestamp_time, &event_loop);
3482
3483 copier.Copy(destination.data(), i, j);
3484
3485 size_t index = 0;
3486 for (size_t k = i; k < j; ++k) {
3487 ASSERT_EQ(destination[index], repacked_message[k]);
3488 ++index;
3489 }
3490 for (; index < destination.size(); ++index) {
3491 ASSERT_EQ(destination[index], 67u);
3492 }
3493 }
3494 }
Austin Schuhf2d0e682022-10-16 14:20:58 -07003495 }
3496}
Austin Schuhfa30c352022-10-16 11:12:02 -07003497
Stephan Pleinesf63bde82024-01-13 15:59:33 -08003498} // namespace aos::logger::testing