blob: 87c6f5d777654599296e52db0484bc1914640ce9 [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
Alexei Strots01395492023-03-20 13:59:56 -07004#include <filesystem>
Austin Schuhfa30c352022-10-16 11:12:02 -07005#include <random>
Austin Schuhe243aaf2020-10-11 15:46:02 -07006#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07007
Austin Schuhfa30c352022-10-16 11:12:02 -07008#include "absl/strings/escaping.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07009#include "external/com_github_google_flatbuffers/src/annotated_binary_text_gen.h"
10#include "external/com_github_google_flatbuffers/src/binary_annotator.h"
11#include "flatbuffers/reflection_generated.h"
12#include "gflags/gflags.h"
13#include "gtest/gtest.h"
14
Austin Schuhc41603c2020-10-11 16:17:37 -070015#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -070016#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -080017#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070018#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070019#include "aos/json_to_flatbuffer.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070020#include "aos/testing/path.h"
21#include "aos/testing/random_seed.h"
Austin Schuhc243b422020-10-11 15:35:08 -070022#include "aos/testing/tmpdir.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070023#include "aos/util/file.h"
Austin Schuhc243b422020-10-11 15:35:08 -070024
Stephan Pleinesf63bde82024-01-13 15:59:33 -080025namespace aos::logger::testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070026namespace chrono = std::chrono;
Austin Schuhf2d0e682022-10-16 14:20:58 -070027using aos::message_bridge::RemoteMessage;
Eric Schmiedebergb38477e2022-12-02 16:08:04 -070028using aos::testing::ArtifactPath;
Austin Schuhc243b422020-10-11 15:35:08 -070029
Austin Schuhd863e6e2022-10-16 15:44:50 -070030// Adapter class to make it easy to test DetachedBufferWriter without adding
31// test only boilerplate to DetachedBufferWriter.
Alexei Strots15c22b12023-04-04 16:27:17 -070032class TestDetachedBufferWriter : public FileBackend,
33 public DetachedBufferWriter {
Austin Schuhd863e6e2022-10-16 15:44:50 -070034 public:
Austin Schuh48d10d62022-10-16 22:19:23 -070035 // Pick a max size that is rather conservative.
36 static constexpr size_t kMaxMessageSize = 128 * 1024;
Austin Schuhd863e6e2022-10-16 15:44:50 -070037 TestDetachedBufferWriter(std::string_view filename)
colleen61276dc2023-06-01 09:23:29 -070038 : FileBackend("/", false),
Alexei Strots15c22b12023-04-04 16:27:17 -070039 DetachedBufferWriter(FileBackend::RequestFile(filename),
40 std::make_unique<DummyEncoder>(kMaxMessageSize)) {}
Austin Schuhd863e6e2022-10-16 15:44:50 -070041 void WriteSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
42 QueueSpan(absl::Span<const uint8_t>(buffer.data(), buffer.size()));
43 }
Austin Schuh7ef11a42023-02-04 17:15:12 -080044 void QueueSpan(absl::Span<const uint8_t> buffer) {
45 DataEncoder::SpanCopier coppier(buffer);
46 CopyMessage(&coppier, monotonic_clock::now());
47 }
Austin Schuhd863e6e2022-10-16 15:44:50 -070048};
49
Austin Schuhe243aaf2020-10-11 15:46:02 -070050// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070051template <typename T>
52SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
53 const std::string_view data) {
54 flatbuffers::FlatBufferBuilder fbb;
55 fbb.ForceDefaults(true);
56 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
57 return fbb.Release();
58}
59
Austin Schuhe243aaf2020-10-11 15:46:02 -070060// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070061TEST(SpanReaderTest, ReadWrite) {
62 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
63 unlink(logfile.c_str());
64
65 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080066 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070067 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080068 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070069
70 {
Austin Schuhd863e6e2022-10-16 15:44:50 -070071 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -080072 writer.QueueSpan(m1.span());
73 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070074 }
75
76 SpanReader reader(logfile);
77
78 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhcf5f6442021-07-06 10:43:28 -070079 EXPECT_EQ(reader.PeekMessage(), m1.span());
80 EXPECT_EQ(reader.PeekMessage(), m1.span());
Austin Schuhadd6eb32020-11-09 21:24:26 -080081 EXPECT_EQ(reader.ReadMessage(), m1.span());
82 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhcf5f6442021-07-06 10:43:28 -070083 EXPECT_EQ(reader.PeekMessage(), absl::Span<const uint8_t>());
Austin Schuhc243b422020-10-11 15:35:08 -070084 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
85}
86
Austin Schuhe243aaf2020-10-11 15:46:02 -070087// Tests that we can actually parse the resulting messages at a basic level
88// through MessageReader.
89TEST(MessageReaderTest, ReadWrite) {
90 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
91 unlink(logfile.c_str());
92
93 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
94 JsonToSizedFlatbuffer<LogFileHeader>(
95 R"({ "max_out_of_order_duration": 100000000 })");
96 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
97 JsonToSizedFlatbuffer<MessageHeader>(
98 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
99 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
100 JsonToSizedFlatbuffer<MessageHeader>(
101 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
102
103 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700104 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800105 writer.QueueSpan(config.span());
106 writer.QueueSpan(m1.span());
107 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -0700108 }
109
110 MessageReader reader(logfile);
111
112 EXPECT_EQ(reader.filename(), logfile);
113
114 EXPECT_EQ(
115 reader.max_out_of_order_duration(),
116 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
117 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
118 EXPECT_TRUE(reader.ReadMessage());
119 EXPECT_EQ(reader.newest_timestamp(),
120 monotonic_clock::time_point(chrono::nanoseconds(1)));
121 EXPECT_TRUE(reader.ReadMessage());
122 EXPECT_EQ(reader.newest_timestamp(),
123 monotonic_clock::time_point(chrono::nanoseconds(2)));
124 EXPECT_FALSE(reader.ReadMessage());
125}
126
Austin Schuh32f68492020-11-08 21:45:51 -0800127// Tests that we explode when messages are too far out of order.
128TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
129 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
130 unlink(logfile0.c_str());
131
132 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
133 JsonToSizedFlatbuffer<LogFileHeader>(
134 R"({
135 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800136 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800137 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
138 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
139 "parts_index": 0
140})");
141
142 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
143 JsonToSizedFlatbuffer<MessageHeader>(
144 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
145 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
146 JsonToSizedFlatbuffer<MessageHeader>(
147 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
148 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
149 JsonToSizedFlatbuffer<MessageHeader>(
150 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
151
152 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700153 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800154 writer.QueueSpan(config0.span());
155 writer.QueueSpan(m1.span());
156 writer.QueueSpan(m2.span());
157 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800158 }
Alexei Strots01395492023-03-20 13:59:56 -0700159 ASSERT_TRUE(std::filesystem::exists(logfile0)) << logfile0;
Austin Schuh32f68492020-11-08 21:45:51 -0800160
161 const std::vector<LogFile> parts = SortParts({logfile0});
162
163 PartsMessageReader reader(parts[0].parts[0]);
164
165 EXPECT_TRUE(reader.ReadMessage());
166 EXPECT_TRUE(reader.ReadMessage());
167 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
168}
169
Austin Schuhc41603c2020-10-11 16:17:37 -0700170// Tests that we can transparently re-assemble part files with a
171// PartsMessageReader.
172TEST(PartsMessageReaderTest, ReadWrite) {
173 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
174 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
175 unlink(logfile0.c_str());
176 unlink(logfile1.c_str());
177
178 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
179 JsonToSizedFlatbuffer<LogFileHeader>(
180 R"({
181 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800182 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700183 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
184 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
185 "parts_index": 0
186})");
187 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
188 JsonToSizedFlatbuffer<LogFileHeader>(
189 R"({
190 "max_out_of_order_duration": 200000000,
191 "monotonic_start_time": 0,
192 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800193 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700194 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
195 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
196 "parts_index": 1
197})");
198
199 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
200 JsonToSizedFlatbuffer<MessageHeader>(
201 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
202 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
203 JsonToSizedFlatbuffer<MessageHeader>(
204 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
205
206 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700207 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800208 writer.QueueSpan(config0.span());
209 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700210 }
211 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700212 TestDetachedBufferWriter writer(logfile1);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800213 writer.QueueSpan(config1.span());
214 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700215 }
216
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700217 // When parts are sorted, we choose the highest max out of order duration for
218 // all parts with the same part uuid.
Austin Schuhc41603c2020-10-11 16:17:37 -0700219 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
220
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700221 EXPECT_EQ(parts.size(), 1);
222 EXPECT_EQ(parts[0].parts.size(), 1);
223
Austin Schuhc41603c2020-10-11 16:17:37 -0700224 PartsMessageReader reader(parts[0].parts[0]);
225
226 EXPECT_EQ(reader.filename(), logfile0);
227
228 // Confirm that the timestamps track, and the filename also updates.
229 // Read the first message.
230 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700231 // Since config1 has higher max out of order duration, that will be used to
232 // read partfiles with same part uuid, i.e logfile0 and logfile1.
Austin Schuhc41603c2020-10-11 16:17:37 -0700233 EXPECT_EQ(
234 reader.max_out_of_order_duration(),
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700235 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuhc41603c2020-10-11 16:17:37 -0700236 EXPECT_TRUE(reader.ReadMessage());
237 EXPECT_EQ(reader.filename(), logfile0);
238 EXPECT_EQ(reader.newest_timestamp(),
239 monotonic_clock::time_point(chrono::nanoseconds(1)));
240 EXPECT_EQ(
241 reader.max_out_of_order_duration(),
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700242 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuhc41603c2020-10-11 16:17:37 -0700243
244 // Read the second message.
245 EXPECT_TRUE(reader.ReadMessage());
246 EXPECT_EQ(reader.filename(), logfile1);
247 EXPECT_EQ(reader.newest_timestamp(),
248 monotonic_clock::time_point(chrono::nanoseconds(2)));
249 EXPECT_EQ(
250 reader.max_out_of_order_duration(),
251 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
252
253 // And then confirm that reading again returns no message.
254 EXPECT_FALSE(reader.ReadMessage());
255 EXPECT_EQ(reader.filename(), logfile1);
256 EXPECT_EQ(
257 reader.max_out_of_order_duration(),
258 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800259 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700260
261 // Verify that the parts metadata has the correct max out of order duration.
262 EXPECT_EQ(
263 parts[0].parts[0].max_out_of_order_duration,
264 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuhc41603c2020-10-11 16:17:37 -0700265}
Austin Schuh32f68492020-11-08 21:45:51 -0800266
Austin Schuh1be0ce42020-11-29 22:43:26 -0800267// Tests that Message's operator < works as expected.
268TEST(MessageTest, Sorting) {
269 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
270
271 Message m1{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700272 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700273 .timestamp =
274 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
Austin Schuh48507722021-07-17 17:29:24 -0700275 .monotonic_remote_boot = 0xffffff,
276 .monotonic_timestamp_boot = 0xffffff,
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700277 .header = nullptr,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700278 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800279 Message m2{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700280 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700281 .timestamp =
282 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
Austin Schuh48507722021-07-17 17:29:24 -0700283 .monotonic_remote_boot = 0xffffff,
284 .monotonic_timestamp_boot = 0xffffff,
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700285 .header = nullptr,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700286 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800287
288 EXPECT_LT(m1, m2);
289 EXPECT_GE(m2, m1);
290
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700291 m1.timestamp.time = e;
292 m2.timestamp.time = e;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800293
294 m1.channel_index = 1;
295 m2.channel_index = 2;
296
297 EXPECT_LT(m1, m2);
298 EXPECT_GE(m2, m1);
299
300 m1.channel_index = 0;
301 m2.channel_index = 0;
Austin Schuh58646e22021-08-23 23:51:46 -0700302 m1.queue_index.index = 0u;
303 m2.queue_index.index = 1u;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800304
305 EXPECT_LT(m1, m2);
306 EXPECT_GE(m2, m1);
307}
308
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800309aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
310 const aos::FlatbufferDetachedBuffer<Configuration> &config,
311 const std::string_view json) {
312 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh1034a832021-03-31 21:53:26 -0700313 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800314 flatbuffers::Offset<Configuration> config_offset =
315 aos::CopyFlatBuffer(config, &fbb);
316 LogFileHeader::Builder header_builder(fbb);
317 header_builder.add_configuration(config_offset);
318 fbb.Finish(header_builder.Finish());
319 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
320
321 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
322 JsonToFlatbuffer<LogFileHeader>(json));
323 CHECK(header_updates.Verify());
324 flatbuffers::FlatBufferBuilder fbb2;
Austin Schuh1034a832021-03-31 21:53:26 -0700325 fbb2.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800326 fbb2.FinishSizePrefixed(
327 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
328 return fbb2.Release();
329}
330
Philipp Schrader416505b2024-03-28 11:59:45 -0700331// Allows for some customization of a SortingElementTest.
332enum class SortingElementConfig {
333 // Create a single node configuration.
334 kSingleNode,
335 // Create a multi-node configuration.
336 kMultiNode,
337};
338
339template <SortingElementConfig sorting_element_config =
340 SortingElementConfig::kMultiNode>
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800341class SortingElementTest : public ::testing::Test {
342 public:
343 SortingElementTest()
344 : config_(JsonToFlatbuffer<Configuration>(
Philipp Schrader416505b2024-03-28 11:59:45 -0700345 sorting_element_config == SortingElementConfig::kSingleNode ?
346 R"({
347 "channels": [
348 {
349 "name": "/a",
350 "type": "aos.logger.testing.TestMessage"
351 },
352 {
353 "name": "/b",
354 "type": "aos.logger.testing.TestMessage"
355 },
356 {
357 "name": "/c",
358 "type": "aos.logger.testing.TestMessage"
359 },
360 {
361 "name": "/d",
362 "type": "aos.logger.testing.TestMessage"
363 }
364 ]
365}
366)"
367 :
368 R"({
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800369 "channels": [
370 {
371 "name": "/a",
372 "type": "aos.logger.testing.TestMessage",
373 "source_node": "pi1",
374 "destination_nodes": [
375 {
376 "name": "pi2"
377 },
378 {
379 "name": "pi3"
380 }
381 ]
382 },
383 {
384 "name": "/b",
385 "type": "aos.logger.testing.TestMessage",
386 "source_node": "pi1"
387 },
388 {
389 "name": "/c",
390 "type": "aos.logger.testing.TestMessage",
391 "source_node": "pi1"
Austin Schuh48507722021-07-17 17:29:24 -0700392 },
393 {
394 "name": "/d",
395 "type": "aos.logger.testing.TestMessage",
396 "source_node": "pi2",
397 "destination_nodes": [
398 {
399 "name": "pi1"
400 }
401 ]
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800402 }
403 ],
404 "nodes": [
405 {
406 "name": "pi1"
407 },
408 {
409 "name": "pi2"
410 },
411 {
412 "name": "pi3"
413 }
414 ]
415}
416)")),
Philipp Schrader416505b2024-03-28 11:59:45 -0700417 config0_(MakeHeader(
418 config_, sorting_element_config == SortingElementConfig::kSingleNode
419 ?
420 R"({
421 /* 100ms */
422 "max_out_of_order_duration": 100000000,
423 "node": {
424 "name": "pi1"
425 },
426 "logger_node": {
427 "name": "pi1"
428 },
429 "monotonic_start_time": 1000000,
430 "realtime_start_time": 1000000000000,
431 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
432 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
433 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
434 "boot_uuids": [
435 "1d782c63-b3c7-466e-bea9-a01308b43333",
436 ],
437 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
438 "parts_index": 0
439})"
440 :
441 R"({
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800442 /* 100ms */
443 "max_out_of_order_duration": 100000000,
444 "node": {
445 "name": "pi1"
446 },
447 "logger_node": {
448 "name": "pi1"
449 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800450 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800451 "realtime_start_time": 1000000000000,
452 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700453 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
454 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
455 "boot_uuids": [
456 "1d782c63-b3c7-466e-bea9-a01308b43333",
457 "",
458 ""
459 ],
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800460 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
461 "parts_index": 0
462})")),
463 config1_(MakeHeader(config_,
464 R"({
465 /* 100ms */
466 "max_out_of_order_duration": 100000000,
467 "node": {
468 "name": "pi1"
469 },
470 "logger_node": {
471 "name": "pi1"
472 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800473 "monotonic_start_time": 1000000,
474 "realtime_start_time": 1000000000000,
475 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700476 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
477 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
478 "boot_uuids": [
479 "1d782c63-b3c7-466e-bea9-a01308b43333",
480 "",
481 ""
482 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800483 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
484 "parts_index": 0
485})")),
486 config2_(MakeHeader(config_,
487 R"({
488 /* 100ms */
489 "max_out_of_order_duration": 100000000,
490 "node": {
491 "name": "pi2"
492 },
493 "logger_node": {
494 "name": "pi2"
495 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800496 "monotonic_start_time": 0,
497 "realtime_start_time": 1000000000000,
Austin Schuh48507722021-07-17 17:29:24 -0700498 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
499 "logger_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
500 "boot_uuids": [
501 "",
502 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
503 ""
504 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800505 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
506 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
507 "parts_index": 0
508})")),
509 config3_(MakeHeader(config_,
510 R"({
511 /* 100ms */
512 "max_out_of_order_duration": 100000000,
513 "node": {
514 "name": "pi1"
515 },
516 "logger_node": {
517 "name": "pi1"
518 },
519 "monotonic_start_time": 2000000,
520 "realtime_start_time": 1000000000,
521 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
Austin Schuh48507722021-07-17 17:29:24 -0700522 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
523 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
524 "boot_uuids": [
525 "1d782c63-b3c7-466e-bea9-a01308b43333",
526 "",
527 ""
528 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800529 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800530 "parts_index": 0
Austin Schuh8bf1e632021-01-02 22:41:04 -0800531})")),
532 config4_(MakeHeader(config_,
533 R"({
534 /* 100ms */
535 "max_out_of_order_duration": 100000000,
536 "node": {
537 "name": "pi2"
538 },
539 "logger_node": {
540 "name": "pi1"
541 },
542 "monotonic_start_time": 2000000,
543 "realtime_start_time": 1000000000,
544 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
545 "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
Austin Schuh48507722021-07-17 17:29:24 -0700546 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
547 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
548 "boot_uuids": [
549 "1d782c63-b3c7-466e-bea9-a01308b43333",
550 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
551 ""
552 ],
Austin Schuh8bf1e632021-01-02 22:41:04 -0800553 "parts_index": 0
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800554})")) {
555 unlink(logfile0_.c_str());
556 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800557 unlink(logfile2_.c_str());
Austin Schuh48507722021-07-17 17:29:24 -0700558 unlink(logfile3_.c_str());
Austin Schuh921b2562021-07-31 19:37:03 -0700559 queue_index_.resize(config_.message().channels()->size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800560 }
561
562 protected:
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800563 flatbuffers::DetachedBuffer MakeLogMessage(
564 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
565 int value) {
566 flatbuffers::FlatBufferBuilder message_fbb;
567 message_fbb.ForceDefaults(true);
568 TestMessage::Builder test_message_builder(message_fbb);
569 test_message_builder.add_value(value);
570 message_fbb.Finish(test_message_builder.Finish());
571
572 aos::Context context;
573 context.monotonic_event_time = monotonic_now;
574 context.realtime_event_time = aos::realtime_clock::epoch() +
575 chrono::seconds(1000) +
576 monotonic_now.time_since_epoch();
Austin Schuh921b2562021-07-31 19:37:03 -0700577 CHECK_LT(static_cast<size_t>(channel_index), queue_index_.size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800578 context.queue_index = queue_index_[channel_index];
579 context.size = message_fbb.GetSize();
580 context.data = message_fbb.GetBufferPointer();
581
582 ++queue_index_[channel_index];
583
584 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh53e3a812021-11-03 16:57:23 -0700585 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800586 fbb.FinishSizePrefixed(
587 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
588
589 return fbb.Release();
590 }
591
592 flatbuffers::DetachedBuffer MakeTimestampMessage(
593 const aos::monotonic_clock::time_point sender_monotonic_now,
Austin Schuh8bf1e632021-01-02 22:41:04 -0800594 int channel_index, chrono::nanoseconds receiver_monotonic_offset,
595 monotonic_clock::time_point monotonic_timestamp_time =
596 monotonic_clock::min_time) {
597 const monotonic_clock::time_point monotonic_sent_time =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800598 sender_monotonic_now + receiver_monotonic_offset;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800599
600 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800601 fbb.ForceDefaults(true);
602
603 logger::MessageHeader::Builder message_header_builder(fbb);
604
605 message_header_builder.add_channel_index(channel_index);
606
607 message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
608 100);
609 message_header_builder.add_monotonic_sent_time(
610 monotonic_sent_time.time_since_epoch().count());
611 message_header_builder.add_realtime_sent_time(
612 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
613 monotonic_sent_time.time_since_epoch())
614 .time_since_epoch()
615 .count());
616
617 message_header_builder.add_monotonic_remote_time(
618 sender_monotonic_now.time_since_epoch().count());
619 message_header_builder.add_realtime_remote_time(
620 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
621 sender_monotonic_now.time_since_epoch())
622 .time_since_epoch()
623 .count());
624 message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
625 1);
626
627 if (monotonic_timestamp_time != monotonic_clock::min_time) {
628 message_header_builder.add_monotonic_timestamp_time(
629 monotonic_timestamp_time.time_since_epoch().count());
630 }
631
632 fbb.FinishSizePrefixed(message_header_builder.Finish());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800633 LOG(INFO) << aos::FlatbufferToJson(
634 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
635 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
636
637 return fbb.Release();
638 }
639
640 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
641 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800642 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh48507722021-07-17 17:29:24 -0700643 const std::string logfile3_ = aos::testing::TestTmpDir() + "/log3.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800644
645 const aos::FlatbufferDetachedBuffer<Configuration> config_;
646 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
647 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800648 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
649 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800650 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800651
652 std::vector<uint32_t> queue_index_;
653};
654
Philipp Schrader416505b2024-03-28 11:59:45 -0700655using MessageSorterTest = SortingElementTest<SortingElementConfig::kMultiNode>;
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700656using MessageSorterDeathTest = MessageSorterTest;
Philipp Schrader416505b2024-03-28 11:59:45 -0700657using PartsMergerTest = SortingElementTest<SortingElementConfig::kMultiNode>;
658using TimestampMapperTest =
659 SortingElementTest<SortingElementConfig::kMultiNode>;
660using SingleNodeTimestampMapperTest =
661 SortingElementTest<SortingElementConfig::kSingleNode>;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800662
663// Tests that we can pull messages out of a log sorted in order.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700664TEST_F(MessageSorterTest, Pull) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800665 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
666 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700667 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800668 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700669 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800670 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700671 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800672 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700673 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800674 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700675 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800676 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
677 }
678
679 const std::vector<LogFile> parts = SortParts({logfile0_});
680
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700681 MessageSorter message_sorter(parts[0].parts[0]);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800682
683 // Confirm we aren't sorted until any time until the message is popped.
684 // Peeking shouldn't change the sorted until time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700685 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800686
687 std::deque<Message> output;
688
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700689 ASSERT_TRUE(message_sorter.Front() != nullptr);
690 output.emplace_back(std::move(*message_sorter.Front()));
691 message_sorter.PopFront();
692 EXPECT_EQ(message_sorter.sorted_until(), e + chrono::milliseconds(1900));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800693
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700694 ASSERT_TRUE(message_sorter.Front() != nullptr);
695 output.emplace_back(std::move(*message_sorter.Front()));
696 message_sorter.PopFront();
697 EXPECT_EQ(message_sorter.sorted_until(), e + chrono::milliseconds(1900));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800698
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700699 ASSERT_TRUE(message_sorter.Front() != nullptr);
700 output.emplace_back(std::move(*message_sorter.Front()));
701 message_sorter.PopFront();
702 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::max_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800703
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700704 ASSERT_TRUE(message_sorter.Front() != nullptr);
705 output.emplace_back(std::move(*message_sorter.Front()));
706 message_sorter.PopFront();
707 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::max_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800708
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700709 ASSERT_TRUE(message_sorter.Front() == nullptr);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800710
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700711 EXPECT_EQ(output[0].timestamp.boot, 0);
712 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
713 EXPECT_EQ(output[1].timestamp.boot, 0);
714 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1000));
715 EXPECT_EQ(output[2].timestamp.boot, 0);
716 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1901));
717 EXPECT_EQ(output[3].timestamp.boot, 0);
718 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800719}
720
Austin Schuhb000de62020-12-03 22:00:40 -0800721// Tests that we can pull messages out of a log sorted in order.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700722TEST_F(MessageSorterTest, WayBeforeStart) {
Austin Schuhb000de62020-12-03 22:00:40 -0800723 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
724 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700725 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhb000de62020-12-03 22:00:40 -0800726 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700727 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800728 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700729 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800730 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700731 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800732 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700733 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800734 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700735 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800736 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
737 }
738
739 const std::vector<LogFile> parts = SortParts({logfile0_});
740
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700741 MessageSorter message_sorter(parts[0].parts[0]);
Austin Schuhb000de62020-12-03 22:00:40 -0800742
743 // Confirm we aren't sorted until any time until the message is popped.
744 // Peeking shouldn't change the sorted until time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700745 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
Austin Schuhb000de62020-12-03 22:00:40 -0800746
747 std::deque<Message> output;
748
749 for (monotonic_clock::time_point t :
750 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
751 e + chrono::milliseconds(1900), monotonic_clock::max_time,
752 monotonic_clock::max_time}) {
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700753 ASSERT_TRUE(message_sorter.Front() != nullptr);
754 output.emplace_back(std::move(*message_sorter.Front()));
755 message_sorter.PopFront();
756 EXPECT_EQ(message_sorter.sorted_until(), t);
Austin Schuhb000de62020-12-03 22:00:40 -0800757 }
758
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700759 ASSERT_TRUE(message_sorter.Front() == nullptr);
Austin Schuhb000de62020-12-03 22:00:40 -0800760
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700761 EXPECT_EQ(output[0].timestamp.boot, 0u);
762 EXPECT_EQ(output[0].timestamp.time, e - chrono::milliseconds(1000));
763 EXPECT_EQ(output[1].timestamp.boot, 0u);
764 EXPECT_EQ(output[1].timestamp.time, e - chrono::milliseconds(500));
765 EXPECT_EQ(output[2].timestamp.boot, 0u);
766 EXPECT_EQ(output[2].timestamp.time, e - chrono::milliseconds(10));
767 EXPECT_EQ(output[3].timestamp.boot, 0u);
768 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(1901));
769 EXPECT_EQ(output[4].timestamp.boot, 0u);
770 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuhb000de62020-12-03 22:00:40 -0800771}
772
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800773// Tests that messages too far out of order trigger death.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700774TEST_F(MessageSorterDeathTest, Pull) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800775 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
776 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700777 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800778 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700779 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800780 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700781 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800782 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700783 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800784 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
785 // The following message is too far out of order and will trigger the CHECK.
Austin Schuhd863e6e2022-10-16 15:44:50 -0700786 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800787 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
788 }
789
790 const std::vector<LogFile> parts = SortParts({logfile0_});
791
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700792 MessageSorter message_sorter(parts[0].parts[0]);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800793
794 // Confirm we aren't sorted until any time until the message is popped.
795 // Peeking shouldn't change the sorted until time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700796 EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800797 std::deque<Message> output;
798
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700799 ASSERT_TRUE(message_sorter.Front() != nullptr);
800 message_sorter.PopFront();
801 ASSERT_TRUE(message_sorter.Front() != nullptr);
802 ASSERT_TRUE(message_sorter.Front() != nullptr);
803 message_sorter.PopFront();
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800804
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700805 EXPECT_DEATH({ message_sorter.Front(); },
Austin Schuh58646e22021-08-23 23:51:46 -0700806 "Max out of order of 100000000ns exceeded.");
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800807}
808
Austin Schuh8f52ed52020-11-30 23:12:39 -0800809// Tests that we can merge data from 2 separate files, including duplicate data.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700810TEST_F(PartsMergerTest, TwoFileMerger) {
Austin Schuh8f52ed52020-11-30 23:12:39 -0800811 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
812 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700813 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800814 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700815 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800816 writer1.QueueSpan(config1_.span());
817
Austin Schuhd863e6e2022-10-16 15:44:50 -0700818 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800819 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700820 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800821 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
822
Austin Schuhd863e6e2022-10-16 15:44:50 -0700823 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800824 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700825 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800826 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
827
828 // Make a duplicate!
829 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
830 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
831 writer0.QueueSpan(msg.span());
832 writer1.QueueSpan(msg.span());
833
Austin Schuhd863e6e2022-10-16 15:44:50 -0700834 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800835 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
836 }
837
838 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -0700839 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -0800840 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800841
Austin Schuh63097262023-08-16 17:04:29 -0700842 PartsMerger merger(
843 log_files.SelectParts("pi1", 0,
844 {StoredDataType::DATA, StoredDataType::TIMESTAMPS,
845 StoredDataType::REMOTE_TIMESTAMPS}));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800846
847 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
848
849 std::deque<Message> output;
850
851 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
852 ASSERT_TRUE(merger.Front() != nullptr);
853 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
854
855 output.emplace_back(std::move(*merger.Front()));
856 merger.PopFront();
857 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
858
859 ASSERT_TRUE(merger.Front() != nullptr);
860 output.emplace_back(std::move(*merger.Front()));
861 merger.PopFront();
862 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
863
864 ASSERT_TRUE(merger.Front() != nullptr);
865 output.emplace_back(std::move(*merger.Front()));
866 merger.PopFront();
867 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
868
869 ASSERT_TRUE(merger.Front() != nullptr);
870 output.emplace_back(std::move(*merger.Front()));
871 merger.PopFront();
872 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
873
874 ASSERT_TRUE(merger.Front() != nullptr);
875 output.emplace_back(std::move(*merger.Front()));
876 merger.PopFront();
877 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
878
879 ASSERT_TRUE(merger.Front() != nullptr);
880 output.emplace_back(std::move(*merger.Front()));
881 merger.PopFront();
882 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
883
884 ASSERT_TRUE(merger.Front() == nullptr);
885
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700886 EXPECT_EQ(output[0].timestamp.boot, 0u);
887 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
888 EXPECT_EQ(output[1].timestamp.boot, 0u);
889 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1001));
890 EXPECT_EQ(output[2].timestamp.boot, 0u);
891 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1002));
892 EXPECT_EQ(output[3].timestamp.boot, 0u);
893 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
894 EXPECT_EQ(output[4].timestamp.boot, 0u);
895 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(3000));
896 EXPECT_EQ(output[5].timestamp.boot, 0u);
897 EXPECT_EQ(output[5].timestamp.time, e + chrono::milliseconds(3002));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800898}
899
Austin Schuh8bf1e632021-01-02 22:41:04 -0800900// Tests that we can merge timestamps with various combinations of
901// monotonic_timestamp_time.
Alexei Strotsa8dadd12023-04-28 15:19:23 -0700902TEST_F(PartsMergerTest, TwoFileTimestampMerger) {
Austin Schuh8bf1e632021-01-02 22:41:04 -0800903 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
904 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700905 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800906 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700907 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800908 writer1.QueueSpan(config1_.span());
909
910 // Neither has it.
911 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700912 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800913 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700914 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800915 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
916
917 // First only has it.
918 MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700919 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800920 e + chrono::milliseconds(1001), 0, chrono::seconds(100),
921 e + chrono::nanoseconds(971)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700922 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800923 e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
924
925 // Second only has it.
926 MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700927 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800928 e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700929 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800930 e + chrono::milliseconds(1002), 0, chrono::seconds(100),
931 e + chrono::nanoseconds(972)));
932
933 // Both have it.
934 MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700935 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800936 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
937 e + chrono::nanoseconds(973)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700938 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800939 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
940 e + chrono::nanoseconds(973)));
941 }
942
943 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -0700944 LogFilesContainer log_files(parts);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800945 ASSERT_EQ(parts.size(), 1u);
946
Austin Schuh63097262023-08-16 17:04:29 -0700947 PartsMerger merger(
948 log_files.SelectParts("pi1", 0,
949 {StoredDataType::DATA, StoredDataType::TIMESTAMPS,
950 StoredDataType::REMOTE_TIMESTAMPS}));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800951
952 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
953
954 std::deque<Message> output;
955
956 for (int i = 0; i < 4; ++i) {
957 ASSERT_TRUE(merger.Front() != nullptr);
958 output.emplace_back(std::move(*merger.Front()));
959 merger.PopFront();
960 }
961 ASSERT_TRUE(merger.Front() == nullptr);
962
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700963 EXPECT_EQ(output[0].timestamp.boot, 0u);
964 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700965 EXPECT_FALSE(output[0].header->has_monotonic_timestamp_time);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700966
967 EXPECT_EQ(output[1].timestamp.boot, 0u);
968 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700969 EXPECT_TRUE(output[1].header->has_monotonic_timestamp_time);
970 EXPECT_EQ(output[1].header->monotonic_timestamp_time,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700971 monotonic_clock::time_point(std::chrono::nanoseconds(971)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700972
973 EXPECT_EQ(output[2].timestamp.boot, 0u);
974 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700975 EXPECT_TRUE(output[2].header->has_monotonic_timestamp_time);
976 EXPECT_EQ(output[2].header->monotonic_timestamp_time,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700977 monotonic_clock::time_point(std::chrono::nanoseconds(972)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700978
979 EXPECT_EQ(output[3].timestamp.boot, 0u);
980 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700981 EXPECT_TRUE(output[3].header->has_monotonic_timestamp_time);
982 EXPECT_EQ(output[3].header->monotonic_timestamp_time,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700983 monotonic_clock::time_point(std::chrono::nanoseconds(973)));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800984}
985
Austin Schuhd2f96102020-12-01 20:27:29 -0800986// Tests that we can match timestamps on delivered messages.
987TEST_F(TimestampMapperTest, ReadNode0First) {
988 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
989 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700990 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800991 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700992 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800993 writer1.QueueSpan(config2_.span());
994
Austin Schuhd863e6e2022-10-16 15:44:50 -0700995 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800996 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700997 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800998 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
999
Austin Schuhd863e6e2022-10-16 15:44:50 -07001000 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001001 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001002 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001003 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1004
Austin Schuhd863e6e2022-10-16 15:44:50 -07001005 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001006 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001007 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001008 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1009 }
1010
1011 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001012 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001013 ASSERT_EQ(parts[0].logger_node, "pi1");
1014 ASSERT_EQ(parts[1].logger_node, "pi2");
1015
Austin Schuh79b30942021-01-24 22:32:21 -08001016 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001017
Austin Schuh63097262023-08-16 17:04:29 -07001018 TimestampMapper mapper0("pi1", log_files,
1019 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001020 mapper0.set_timestamp_callback(
1021 [&](TimestampedMessage *) { ++mapper0_count; });
1022 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001023 TimestampMapper mapper1("pi2", log_files,
1024 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001025 mapper1.set_timestamp_callback(
1026 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001027
1028 mapper0.AddPeer(&mapper1);
1029 mapper1.AddPeer(&mapper0);
1030
1031 {
1032 std::deque<TimestampedMessage> output0;
1033
Austin Schuh79b30942021-01-24 22:32:21 -08001034 EXPECT_EQ(mapper0_count, 0u);
1035 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001036 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001037 EXPECT_EQ(mapper0_count, 1u);
1038 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001039 output0.emplace_back(std::move(*mapper0.Front()));
1040 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001041 EXPECT_TRUE(mapper0.started());
Austin Schuh79b30942021-01-24 22:32:21 -08001042 EXPECT_EQ(mapper0_count, 1u);
1043 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001044
1045 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001046 EXPECT_EQ(mapper0_count, 2u);
1047 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001048 output0.emplace_back(std::move(*mapper0.Front()));
1049 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001050 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001051
1052 ASSERT_TRUE(mapper0.Front() != nullptr);
1053 output0.emplace_back(std::move(*mapper0.Front()));
1054 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001055 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001056
Austin Schuh79b30942021-01-24 22:32:21 -08001057 EXPECT_EQ(mapper0_count, 3u);
1058 EXPECT_EQ(mapper1_count, 0u);
1059
Austin Schuhd2f96102020-12-01 20:27:29 -08001060 ASSERT_TRUE(mapper0.Front() == nullptr);
1061
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001062 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1063 EXPECT_EQ(output0[0].monotonic_event_time.time,
1064 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001065 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001066
1067 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1068 EXPECT_EQ(output0[1].monotonic_event_time.time,
1069 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001070 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001071
1072 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1073 EXPECT_EQ(output0[2].monotonic_event_time.time,
1074 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001075 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001076 }
1077
1078 {
1079 SCOPED_TRACE("Trying node1 now");
1080 std::deque<TimestampedMessage> output1;
1081
Austin Schuh79b30942021-01-24 22:32:21 -08001082 EXPECT_EQ(mapper0_count, 3u);
1083 EXPECT_EQ(mapper1_count, 0u);
1084
Austin Schuhd2f96102020-12-01 20:27:29 -08001085 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001086 EXPECT_EQ(mapper0_count, 3u);
1087 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001088 output1.emplace_back(std::move(*mapper1.Front()));
1089 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001090 EXPECT_TRUE(mapper1.started());
Austin Schuh79b30942021-01-24 22:32:21 -08001091 EXPECT_EQ(mapper0_count, 3u);
1092 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001093
1094 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001095 EXPECT_EQ(mapper0_count, 3u);
1096 EXPECT_EQ(mapper1_count, 2u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001097 output1.emplace_back(std::move(*mapper1.Front()));
1098 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001099 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001100
1101 ASSERT_TRUE(mapper1.Front() != nullptr);
1102 output1.emplace_back(std::move(*mapper1.Front()));
1103 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001104 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001105
Austin Schuh79b30942021-01-24 22:32:21 -08001106 EXPECT_EQ(mapper0_count, 3u);
1107 EXPECT_EQ(mapper1_count, 3u);
1108
Austin Schuhd2f96102020-12-01 20:27:29 -08001109 ASSERT_TRUE(mapper1.Front() == nullptr);
1110
Austin Schuh79b30942021-01-24 22:32:21 -08001111 EXPECT_EQ(mapper0_count, 3u);
1112 EXPECT_EQ(mapper1_count, 3u);
1113
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001114 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1115 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001116 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001117 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001118
1119 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1120 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001121 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001122 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001123
1124 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1125 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001126 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001127 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001128 }
1129}
1130
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001131// Tests that we filter messages using the channel filter callback
1132TEST_F(TimestampMapperTest, ReplayChannelsCallbackTest) {
1133 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1134 {
1135 TestDetachedBufferWriter writer0(logfile0_);
1136 writer0.QueueSpan(config0_.span());
1137 TestDetachedBufferWriter writer1(logfile1_);
1138 writer1.QueueSpan(config2_.span());
1139
1140 writer0.WriteSizedFlatbuffer(
1141 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1142 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1143 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1144
1145 writer0.WriteSizedFlatbuffer(
1146 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1147 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1148 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1149
1150 writer0.WriteSizedFlatbuffer(
1151 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1152 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1153 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1154 }
1155
1156 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001157 LogFilesContainer log_files(parts);
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001158 ASSERT_EQ(parts[0].logger_node, "pi1");
1159 ASSERT_EQ(parts[1].logger_node, "pi2");
1160
1161 // mapper0 will not provide any messages while mapper1 will provide all
1162 // messages due to the channel filter callbacks used
1163 size_t mapper0_count = 0;
Alexei Strots1f51ac72023-05-15 10:14:54 -07001164
Austin Schuh63097262023-08-16 17:04:29 -07001165 TimestampMapper mapper0("pi1", log_files,
1166 TimestampQueueStrategy::kQueueTogether);
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001167 mapper0.set_timestamp_callback(
1168 [&](TimestampedMessage *) { ++mapper0_count; });
1169 mapper0.set_replay_channels_callback(
1170 [&](const TimestampedMessage &) -> bool { return mapper0_count != 2; });
1171 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001172 TimestampMapper mapper1("pi2", log_files,
1173 TimestampQueueStrategy::kQueueTogether);
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001174 mapper1.set_timestamp_callback(
1175 [&](TimestampedMessage *) { ++mapper1_count; });
1176 mapper1.set_replay_channels_callback(
1177 [&](const TimestampedMessage &) -> bool { return mapper1_count != 2; });
1178
1179 mapper0.AddPeer(&mapper1);
1180 mapper1.AddPeer(&mapper0);
1181
1182 {
1183 std::deque<TimestampedMessage> output0;
1184
1185 EXPECT_EQ(mapper0_count, 0u);
1186 EXPECT_EQ(mapper1_count, 0u);
1187
1188 ASSERT_TRUE(mapper0.Front() != nullptr);
1189 EXPECT_EQ(mapper0_count, 1u);
1190 EXPECT_EQ(mapper1_count, 0u);
1191 output0.emplace_back(std::move(*mapper0.Front()));
1192 mapper0.PopFront();
1193
1194 EXPECT_TRUE(mapper0.started());
1195 EXPECT_EQ(mapper0_count, 1u);
1196 EXPECT_EQ(mapper1_count, 0u);
1197
1198 // mapper0_count is now at 3 since the second message is not queued, but
1199 // timestamp_callback needs to be called everytime even if Front() does not
1200 // provide a message due to the replay_channels_callback.
1201 ASSERT_TRUE(mapper0.Front() != nullptr);
1202 EXPECT_EQ(mapper0_count, 3u);
1203 EXPECT_EQ(mapper1_count, 0u);
1204 output0.emplace_back(std::move(*mapper0.Front()));
1205 mapper0.PopFront();
1206
1207 EXPECT_TRUE(mapper0.started());
1208 EXPECT_EQ(mapper0_count, 3u);
1209 EXPECT_EQ(mapper1_count, 0u);
1210
1211 ASSERT_TRUE(mapper0.Front() == nullptr);
1212 EXPECT_TRUE(mapper0.started());
1213
1214 EXPECT_EQ(mapper0_count, 3u);
1215 EXPECT_EQ(mapper1_count, 0u);
1216
1217 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1218 EXPECT_EQ(output0[0].monotonic_event_time.time,
1219 e + chrono::milliseconds(1000));
1220 EXPECT_TRUE(output0[0].data != nullptr);
1221
1222 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1223 EXPECT_EQ(output0[1].monotonic_event_time.time,
1224 e + chrono::milliseconds(3000));
1225 EXPECT_TRUE(output0[1].data != nullptr);
1226 }
1227
1228 {
1229 SCOPED_TRACE("Trying node1 now");
1230 std::deque<TimestampedMessage> output1;
1231
1232 EXPECT_EQ(mapper0_count, 3u);
1233 EXPECT_EQ(mapper1_count, 0u);
1234
1235 ASSERT_TRUE(mapper1.Front() != nullptr);
1236 EXPECT_EQ(mapper0_count, 3u);
1237 EXPECT_EQ(mapper1_count, 1u);
1238 output1.emplace_back(std::move(*mapper1.Front()));
1239 mapper1.PopFront();
1240 EXPECT_TRUE(mapper1.started());
1241 EXPECT_EQ(mapper0_count, 3u);
1242 EXPECT_EQ(mapper1_count, 1u);
1243
1244 // mapper1_count is now at 3 since the second message is not queued, but
1245 // timestamp_callback needs to be called everytime even if Front() does not
1246 // provide a message due to the replay_channels_callback.
1247 ASSERT_TRUE(mapper1.Front() != nullptr);
1248 output1.emplace_back(std::move(*mapper1.Front()));
1249 mapper1.PopFront();
1250 EXPECT_TRUE(mapper1.started());
1251
1252 EXPECT_EQ(mapper0_count, 3u);
1253 EXPECT_EQ(mapper1_count, 3u);
1254
1255 ASSERT_TRUE(mapper1.Front() == nullptr);
1256
1257 EXPECT_EQ(mapper0_count, 3u);
1258 EXPECT_EQ(mapper1_count, 3u);
1259
1260 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1261 EXPECT_EQ(output1[0].monotonic_event_time.time,
1262 e + chrono::seconds(100) + chrono::milliseconds(1000));
1263 EXPECT_TRUE(output1[0].data != nullptr);
1264
1265 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1266 EXPECT_EQ(output1[1].monotonic_event_time.time,
1267 e + chrono::seconds(100) + chrono::milliseconds(3000));
1268 EXPECT_TRUE(output1[1].data != nullptr);
1269 }
1270}
Austin Schuh8bf1e632021-01-02 22:41:04 -08001271// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
1272// returned.
1273TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
1274 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1275 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001276 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001277 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001278 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001279 writer1.QueueSpan(config4_.span());
1280
Austin Schuhd863e6e2022-10-16 15:44:50 -07001281 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001282 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001283 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001284 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
1285 e + chrono::nanoseconds(971)));
1286
Austin Schuhd863e6e2022-10-16 15:44:50 -07001287 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001288 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001289 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001290 e + chrono::milliseconds(2000), 0, chrono::seconds(100),
1291 e + chrono::nanoseconds(5458)));
1292
Austin Schuhd863e6e2022-10-16 15:44:50 -07001293 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001294 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001295 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001296 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1297 }
1298
1299 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001300 LogFilesContainer log_files(parts);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001301 ASSERT_EQ(parts.size(), 1u);
1302
Austin Schuh79b30942021-01-24 22:32:21 -08001303 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001304 TimestampMapper mapper0("pi1", log_files,
1305 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001306 mapper0.set_timestamp_callback(
1307 [&](TimestampedMessage *) { ++mapper0_count; });
1308 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001309 TimestampMapper mapper1("pi2", log_files,
1310 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001311 mapper1.set_timestamp_callback(
1312 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh8bf1e632021-01-02 22:41:04 -08001313
1314 mapper0.AddPeer(&mapper1);
1315 mapper1.AddPeer(&mapper0);
1316
1317 {
1318 std::deque<TimestampedMessage> output0;
1319
1320 for (int i = 0; i < 3; ++i) {
1321 ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
1322 output0.emplace_back(std::move(*mapper0.Front()));
1323 mapper0.PopFront();
1324 }
1325
1326 ASSERT_TRUE(mapper0.Front() == nullptr);
1327
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001328 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1329 EXPECT_EQ(output0[0].monotonic_event_time.time,
1330 e + chrono::milliseconds(1000));
1331 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
1332 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
1333 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001334 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001335
1336 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1337 EXPECT_EQ(output0[1].monotonic_event_time.time,
1338 e + chrono::milliseconds(2000));
1339 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
1340 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
1341 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001342 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001343
1344 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1345 EXPECT_EQ(output0[2].monotonic_event_time.time,
1346 e + chrono::milliseconds(3000));
1347 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
1348 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
1349 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001350 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001351 }
1352
1353 {
1354 SCOPED_TRACE("Trying node1 now");
1355 std::deque<TimestampedMessage> output1;
1356
1357 for (int i = 0; i < 3; ++i) {
1358 ASSERT_TRUE(mapper1.Front() != nullptr);
1359 output1.emplace_back(std::move(*mapper1.Front()));
1360 mapper1.PopFront();
1361 }
1362
1363 ASSERT_TRUE(mapper1.Front() == nullptr);
1364
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001365 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1366 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001367 e + chrono::seconds(100) + chrono::milliseconds(1000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001368 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
1369 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001370 e + chrono::nanoseconds(971));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001371 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001372
1373 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1374 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001375 e + chrono::seconds(100) + chrono::milliseconds(2000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001376 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
1377 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001378 e + chrono::nanoseconds(5458));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001379 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001380
1381 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1382 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001383 e + chrono::seconds(100) + chrono::milliseconds(3000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001384 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
1385 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
1386 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001387 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001388 }
Austin Schuh79b30942021-01-24 22:32:21 -08001389
1390 EXPECT_EQ(mapper0_count, 3u);
1391 EXPECT_EQ(mapper1_count, 3u);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001392}
1393
Austin Schuhd2f96102020-12-01 20:27:29 -08001394// Tests that we can match timestamps on delivered messages. By doing this in
1395// the reverse order, the second node needs to queue data up from the first node
1396// to find the matching timestamp.
1397TEST_F(TimestampMapperTest, ReadNode1First) {
1398 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1399 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001400 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001401 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001402 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001403 writer1.QueueSpan(config2_.span());
1404
Austin Schuhd863e6e2022-10-16 15:44:50 -07001405 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001406 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001407 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001408 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1409
Austin Schuhd863e6e2022-10-16 15:44:50 -07001410 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001411 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001412 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001413 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1414
Austin Schuhd863e6e2022-10-16 15:44:50 -07001415 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001416 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001417 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001418 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1419 }
1420
1421 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001422 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001423
1424 ASSERT_EQ(parts[0].logger_node, "pi1");
1425 ASSERT_EQ(parts[1].logger_node, "pi2");
1426
Austin Schuh79b30942021-01-24 22:32:21 -08001427 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001428 TimestampMapper mapper0("pi1", log_files,
1429 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001430 mapper0.set_timestamp_callback(
1431 [&](TimestampedMessage *) { ++mapper0_count; });
1432 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001433 TimestampMapper mapper1("pi2", log_files,
1434 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001435 mapper1.set_timestamp_callback(
1436 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001437
1438 mapper0.AddPeer(&mapper1);
1439 mapper1.AddPeer(&mapper0);
1440
1441 {
1442 SCOPED_TRACE("Trying node1 now");
1443 std::deque<TimestampedMessage> output1;
1444
1445 ASSERT_TRUE(mapper1.Front() != nullptr);
1446 output1.emplace_back(std::move(*mapper1.Front()));
1447 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001448 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001449
1450 ASSERT_TRUE(mapper1.Front() != nullptr);
1451 output1.emplace_back(std::move(*mapper1.Front()));
1452 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001453 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001454
1455 ASSERT_TRUE(mapper1.Front() != nullptr);
1456 output1.emplace_back(std::move(*mapper1.Front()));
1457 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001458 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001459
1460 ASSERT_TRUE(mapper1.Front() == nullptr);
1461
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001462 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1463 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001464 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001465 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001466
1467 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1468 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001469 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001470 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001471
1472 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1473 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001474 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001475 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001476 }
1477
1478 {
1479 std::deque<TimestampedMessage> output0;
1480
1481 ASSERT_TRUE(mapper0.Front() != nullptr);
1482 output0.emplace_back(std::move(*mapper0.Front()));
1483 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001484 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001485
1486 ASSERT_TRUE(mapper0.Front() != nullptr);
1487 output0.emplace_back(std::move(*mapper0.Front()));
1488 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001489 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001490
1491 ASSERT_TRUE(mapper0.Front() != nullptr);
1492 output0.emplace_back(std::move(*mapper0.Front()));
1493 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001494 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001495
1496 ASSERT_TRUE(mapper0.Front() == nullptr);
1497
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001498 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1499 EXPECT_EQ(output0[0].monotonic_event_time.time,
1500 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001501 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001502
1503 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1504 EXPECT_EQ(output0[1].monotonic_event_time.time,
1505 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001506 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001507
1508 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1509 EXPECT_EQ(output0[2].monotonic_event_time.time,
1510 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001511 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001512 }
Austin Schuh79b30942021-01-24 22:32:21 -08001513
1514 EXPECT_EQ(mapper0_count, 3u);
1515 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001516}
1517
1518// Tests that we return just the timestamps if we couldn't find the data and the
1519// missing data was at the beginning of the file.
1520TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
1521 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1522 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001523 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001524 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001525 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001526 writer1.QueueSpan(config2_.span());
1527
1528 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001529 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001530 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1531
Austin Schuhd863e6e2022-10-16 15:44:50 -07001532 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001533 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001534 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001535 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1536
Austin Schuhd863e6e2022-10-16 15:44:50 -07001537 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001538 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001539 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001540 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1541 }
1542
1543 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001544 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001545
1546 ASSERT_EQ(parts[0].logger_node, "pi1");
1547 ASSERT_EQ(parts[1].logger_node, "pi2");
1548
Austin Schuh79b30942021-01-24 22:32:21 -08001549 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001550 TimestampMapper mapper0("pi1", log_files,
1551 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001552 mapper0.set_timestamp_callback(
1553 [&](TimestampedMessage *) { ++mapper0_count; });
1554 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001555 TimestampMapper mapper1("pi2", log_files,
1556 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001557 mapper1.set_timestamp_callback(
1558 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001559
1560 mapper0.AddPeer(&mapper1);
1561 mapper1.AddPeer(&mapper0);
1562
1563 {
1564 SCOPED_TRACE("Trying node1 now");
1565 std::deque<TimestampedMessage> output1;
1566
1567 ASSERT_TRUE(mapper1.Front() != nullptr);
1568 output1.emplace_back(std::move(*mapper1.Front()));
1569 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001570 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001571
1572 ASSERT_TRUE(mapper1.Front() != nullptr);
1573 output1.emplace_back(std::move(*mapper1.Front()));
1574 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001575 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001576
1577 ASSERT_TRUE(mapper1.Front() != nullptr);
1578 output1.emplace_back(std::move(*mapper1.Front()));
1579 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001580 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001581
1582 ASSERT_TRUE(mapper1.Front() == nullptr);
1583
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001584 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1585 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001586 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001587 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001588
1589 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1590 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001591 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001592 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001593
1594 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1595 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001596 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001597 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001598 }
Austin Schuh79b30942021-01-24 22:32:21 -08001599
1600 EXPECT_EQ(mapper0_count, 0u);
1601 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001602}
1603
1604// Tests that we return just the timestamps if we couldn't find the data and the
1605// missing data was at the end of the file.
1606TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
1607 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1608 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001609 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001610 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001611 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001612 writer1.QueueSpan(config2_.span());
1613
Austin Schuhd863e6e2022-10-16 15:44:50 -07001614 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001615 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001616 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001617 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1618
Austin Schuhd863e6e2022-10-16 15:44:50 -07001619 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001620 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001621 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001622 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1623
1624 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001625 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001626 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1627 }
1628
1629 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001630 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001631
1632 ASSERT_EQ(parts[0].logger_node, "pi1");
1633 ASSERT_EQ(parts[1].logger_node, "pi2");
1634
Austin Schuh79b30942021-01-24 22:32:21 -08001635 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001636 TimestampMapper mapper0("pi1", log_files,
1637 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001638 mapper0.set_timestamp_callback(
1639 [&](TimestampedMessage *) { ++mapper0_count; });
1640 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001641 TimestampMapper mapper1("pi2", log_files,
1642 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001643 mapper1.set_timestamp_callback(
1644 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001645
1646 mapper0.AddPeer(&mapper1);
1647 mapper1.AddPeer(&mapper0);
1648
1649 {
1650 SCOPED_TRACE("Trying node1 now");
1651 std::deque<TimestampedMessage> output1;
1652
1653 ASSERT_TRUE(mapper1.Front() != nullptr);
1654 output1.emplace_back(std::move(*mapper1.Front()));
1655 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001656 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001657
1658 ASSERT_TRUE(mapper1.Front() != nullptr);
1659 output1.emplace_back(std::move(*mapper1.Front()));
1660 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001661 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001662
1663 ASSERT_TRUE(mapper1.Front() != nullptr);
1664 output1.emplace_back(std::move(*mapper1.Front()));
1665 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001666 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001667
1668 ASSERT_TRUE(mapper1.Front() == nullptr);
1669
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001670 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1671 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001672 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001673 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001674
1675 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1676 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001677 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001678 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001679
1680 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1681 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001682 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001683 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001684 }
Austin Schuh79b30942021-01-24 22:32:21 -08001685
1686 EXPECT_EQ(mapper0_count, 0u);
1687 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001688}
1689
Austin Schuh993ccb52020-12-12 15:59:32 -08001690// Tests that we handle a message which failed to forward or be logged.
1691TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1692 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1693 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001694 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001695 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001696 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001697 writer1.QueueSpan(config2_.span());
1698
Austin Schuhd863e6e2022-10-16 15:44:50 -07001699 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001700 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001701 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001702 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1703
1704 // Create both the timestamp and message, but don't log them, simulating a
1705 // forwarding drop.
1706 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1707 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1708 chrono::seconds(100));
1709
Austin Schuhd863e6e2022-10-16 15:44:50 -07001710 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001711 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001712 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001713 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1714 }
1715
1716 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001717 LogFilesContainer log_files(parts);
Austin Schuh993ccb52020-12-12 15:59:32 -08001718
1719 ASSERT_EQ(parts[0].logger_node, "pi1");
1720 ASSERT_EQ(parts[1].logger_node, "pi2");
1721
Austin Schuh79b30942021-01-24 22:32:21 -08001722 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001723 TimestampMapper mapper0("pi1", log_files,
1724 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001725 mapper0.set_timestamp_callback(
1726 [&](TimestampedMessage *) { ++mapper0_count; });
1727 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001728 TimestampMapper mapper1("pi2", log_files,
1729 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001730 mapper1.set_timestamp_callback(
1731 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh993ccb52020-12-12 15:59:32 -08001732
1733 mapper0.AddPeer(&mapper1);
1734 mapper1.AddPeer(&mapper0);
1735
1736 {
1737 std::deque<TimestampedMessage> output1;
1738
1739 ASSERT_TRUE(mapper1.Front() != nullptr);
1740 output1.emplace_back(std::move(*mapper1.Front()));
1741 mapper1.PopFront();
1742
1743 ASSERT_TRUE(mapper1.Front() != nullptr);
1744 output1.emplace_back(std::move(*mapper1.Front()));
1745
1746 ASSERT_FALSE(mapper1.Front() == nullptr);
1747
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001748 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1749 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001750 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001751 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001752
1753 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1754 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001755 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001756 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh993ccb52020-12-12 15:59:32 -08001757 }
Austin Schuh79b30942021-01-24 22:32:21 -08001758
1759 EXPECT_EQ(mapper0_count, 0u);
1760 EXPECT_EQ(mapper1_count, 2u);
Austin Schuh993ccb52020-12-12 15:59:32 -08001761}
1762
Austin Schuhd2f96102020-12-01 20:27:29 -08001763// Tests that we properly sort log files with duplicate timestamps.
1764TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1765 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1766 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001767 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001768 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001769 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001770 writer1.QueueSpan(config2_.span());
1771
Austin Schuhd863e6e2022-10-16 15:44:50 -07001772 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001773 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001774 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001775 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1776
Austin Schuhd863e6e2022-10-16 15:44:50 -07001777 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001778 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001779 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001780 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1781
Austin Schuhd863e6e2022-10-16 15:44:50 -07001782 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001783 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001784 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001785 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1786
Austin Schuhd863e6e2022-10-16 15:44:50 -07001787 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001788 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001789 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001790 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1791 }
1792
1793 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001794 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001795
1796 ASSERT_EQ(parts[0].logger_node, "pi1");
1797 ASSERT_EQ(parts[1].logger_node, "pi2");
1798
Austin Schuh79b30942021-01-24 22:32:21 -08001799 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001800 TimestampMapper mapper0("pi1", log_files,
1801 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001802 mapper0.set_timestamp_callback(
1803 [&](TimestampedMessage *) { ++mapper0_count; });
1804 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001805 TimestampMapper mapper1("pi2", log_files,
1806 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001807 mapper1.set_timestamp_callback(
1808 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001809
1810 mapper0.AddPeer(&mapper1);
1811 mapper1.AddPeer(&mapper0);
1812
1813 {
1814 SCOPED_TRACE("Trying node1 now");
1815 std::deque<TimestampedMessage> output1;
1816
1817 for (int i = 0; i < 4; ++i) {
1818 ASSERT_TRUE(mapper1.Front() != nullptr);
1819 output1.emplace_back(std::move(*mapper1.Front()));
1820 mapper1.PopFront();
1821 }
1822 ASSERT_TRUE(mapper1.Front() == nullptr);
1823
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001824 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1825 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001826 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001827 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001828
1829 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1830 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001831 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001832 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001833
1834 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1835 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001836 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001837 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001838
1839 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1840 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001841 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001842 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001843 }
Austin Schuh79b30942021-01-24 22:32:21 -08001844
1845 EXPECT_EQ(mapper0_count, 0u);
1846 EXPECT_EQ(mapper1_count, 4u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001847}
1848
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001849// Tests that we properly produce a valid start time.
Austin Schuhd2f96102020-12-01 20:27:29 -08001850TEST_F(TimestampMapperTest, StartTime) {
1851 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1852 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001853 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001854 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001855 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001856 writer1.QueueSpan(config1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001857 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001858 writer2.QueueSpan(config3_.span());
1859 }
1860
1861 const std::vector<LogFile> parts =
1862 SortParts({logfile0_, logfile1_, logfile2_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001863 LogFilesContainer log_files(parts);
Austin Schuhd2f96102020-12-01 20:27:29 -08001864
Austin Schuh79b30942021-01-24 22:32:21 -08001865 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001866 TimestampMapper mapper0("pi1", log_files,
1867 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001868 mapper0.set_timestamp_callback(
1869 [&](TimestampedMessage *) { ++mapper0_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001870
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001871 EXPECT_EQ(mapper0.monotonic_start_time(0), e + chrono::milliseconds(1));
1872 EXPECT_EQ(mapper0.realtime_start_time(0),
Austin Schuhd2f96102020-12-01 20:27:29 -08001873 realtime_clock::time_point(chrono::seconds(1000)));
Austin Schuh79b30942021-01-24 22:32:21 -08001874 EXPECT_EQ(mapper0_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001875}
1876
Austin Schuhfecf1d82020-12-19 16:57:28 -08001877// Tests that when a peer isn't registered, we treat that as if there was no
1878// data available.
1879TEST_F(TimestampMapperTest, NoPeer) {
1880 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1881 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001882 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001883 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001884 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001885 writer1.QueueSpan(config2_.span());
1886
1887 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001888 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001889 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1890
Austin Schuhd863e6e2022-10-16 15:44:50 -07001891 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001892 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001893 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001894 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1895
Austin Schuhd863e6e2022-10-16 15:44:50 -07001896 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001897 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001898 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001899 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1900 }
1901
1902 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001903 LogFilesContainer log_files(parts);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001904
1905 ASSERT_EQ(parts[0].logger_node, "pi1");
1906 ASSERT_EQ(parts[1].logger_node, "pi2");
1907
Austin Schuh79b30942021-01-24 22:32:21 -08001908 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001909 TimestampMapper mapper1("pi2", log_files,
1910 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001911 mapper1.set_timestamp_callback(
1912 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhfecf1d82020-12-19 16:57:28 -08001913
1914 {
1915 std::deque<TimestampedMessage> output1;
1916
1917 ASSERT_TRUE(mapper1.Front() != nullptr);
1918 output1.emplace_back(std::move(*mapper1.Front()));
1919 mapper1.PopFront();
1920 ASSERT_TRUE(mapper1.Front() != nullptr);
1921 output1.emplace_back(std::move(*mapper1.Front()));
1922 mapper1.PopFront();
1923 ASSERT_TRUE(mapper1.Front() != nullptr);
1924 output1.emplace_back(std::move(*mapper1.Front()));
1925 mapper1.PopFront();
1926 ASSERT_TRUE(mapper1.Front() == nullptr);
1927
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001928 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1929 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001930 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001931 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001932
1933 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1934 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001935 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001936 EXPECT_FALSE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001937
1938 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1939 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001940 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001941 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001942 }
Austin Schuh79b30942021-01-24 22:32:21 -08001943 EXPECT_EQ(mapper1_count, 3u);
1944}
1945
1946// Tests that we can queue messages and call the timestamp callback for both
1947// nodes.
1948TEST_F(TimestampMapperTest, QueueUntilNode0) {
1949 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1950 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001951 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh79b30942021-01-24 22:32:21 -08001952 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001953 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh79b30942021-01-24 22:32:21 -08001954 writer1.QueueSpan(config2_.span());
1955
Austin Schuhd863e6e2022-10-16 15:44:50 -07001956 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001957 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001958 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001959 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1960
Austin Schuhd863e6e2022-10-16 15:44:50 -07001961 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001962 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001963 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001964 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1965
Austin Schuhd863e6e2022-10-16 15:44:50 -07001966 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001967 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001968 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001969 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1970
Austin Schuhd863e6e2022-10-16 15:44:50 -07001971 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001972 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001973 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001974 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1975 }
1976
1977 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07001978 LogFilesContainer log_files(parts);
Austin Schuh79b30942021-01-24 22:32:21 -08001979
1980 ASSERT_EQ(parts[0].logger_node, "pi1");
1981 ASSERT_EQ(parts[1].logger_node, "pi2");
1982
1983 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001984 TimestampMapper mapper0("pi1", log_files,
1985 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001986 mapper0.set_timestamp_callback(
1987 [&](TimestampedMessage *) { ++mapper0_count; });
1988 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001989 TimestampMapper mapper1("pi2", log_files,
1990 TimestampQueueStrategy::kQueueTogether);
Austin Schuh79b30942021-01-24 22:32:21 -08001991 mapper1.set_timestamp_callback(
1992 [&](TimestampedMessage *) { ++mapper1_count; });
1993
1994 mapper0.AddPeer(&mapper1);
1995 mapper1.AddPeer(&mapper0);
1996
1997 {
1998 std::deque<TimestampedMessage> output0;
1999
2000 EXPECT_EQ(mapper0_count, 0u);
2001 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002002 mapper0.QueueUntil(
2003 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08002004 EXPECT_EQ(mapper0_count, 3u);
2005 EXPECT_EQ(mapper1_count, 0u);
2006
2007 ASSERT_TRUE(mapper0.Front() != nullptr);
2008 EXPECT_EQ(mapper0_count, 3u);
2009 EXPECT_EQ(mapper1_count, 0u);
2010
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002011 mapper0.QueueUntil(
2012 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08002013 EXPECT_EQ(mapper0_count, 3u);
2014 EXPECT_EQ(mapper1_count, 0u);
2015
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002016 mapper0.QueueUntil(
2017 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08002018 EXPECT_EQ(mapper0_count, 4u);
2019 EXPECT_EQ(mapper1_count, 0u);
2020
2021 output0.emplace_back(std::move(*mapper0.Front()));
2022 mapper0.PopFront();
2023 output0.emplace_back(std::move(*mapper0.Front()));
2024 mapper0.PopFront();
2025 output0.emplace_back(std::move(*mapper0.Front()));
2026 mapper0.PopFront();
2027 output0.emplace_back(std::move(*mapper0.Front()));
2028 mapper0.PopFront();
2029
2030 EXPECT_EQ(mapper0_count, 4u);
2031 EXPECT_EQ(mapper1_count, 0u);
2032
2033 ASSERT_TRUE(mapper0.Front() == nullptr);
2034
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002035 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2036 EXPECT_EQ(output0[0].monotonic_event_time.time,
2037 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002038 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002039
2040 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2041 EXPECT_EQ(output0[1].monotonic_event_time.time,
2042 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002043 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002044
2045 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2046 EXPECT_EQ(output0[2].monotonic_event_time.time,
2047 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002048 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002049
2050 EXPECT_EQ(output0[3].monotonic_event_time.boot, 0u);
2051 EXPECT_EQ(output0[3].monotonic_event_time.time,
2052 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002053 EXPECT_TRUE(output0[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08002054 }
2055
2056 {
2057 SCOPED_TRACE("Trying node1 now");
2058 std::deque<TimestampedMessage> output1;
2059
2060 EXPECT_EQ(mapper0_count, 4u);
2061 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002062 mapper1.QueueUntil(BootTimestamp{
2063 .boot = 0,
2064 .time = e + chrono::seconds(100) + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08002065 EXPECT_EQ(mapper0_count, 4u);
2066 EXPECT_EQ(mapper1_count, 3u);
2067
2068 ASSERT_TRUE(mapper1.Front() != nullptr);
2069 EXPECT_EQ(mapper0_count, 4u);
2070 EXPECT_EQ(mapper1_count, 3u);
2071
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002072 mapper1.QueueUntil(BootTimestamp{
2073 .boot = 0,
2074 .time = e + chrono::seconds(100) + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08002075 EXPECT_EQ(mapper0_count, 4u);
2076 EXPECT_EQ(mapper1_count, 3u);
2077
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002078 mapper1.QueueUntil(BootTimestamp{
2079 .boot = 0,
2080 .time = e + chrono::seconds(100) + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08002081 EXPECT_EQ(mapper0_count, 4u);
2082 EXPECT_EQ(mapper1_count, 4u);
2083
2084 ASSERT_TRUE(mapper1.Front() != nullptr);
2085 EXPECT_EQ(mapper0_count, 4u);
2086 EXPECT_EQ(mapper1_count, 4u);
2087
2088 output1.emplace_back(std::move(*mapper1.Front()));
2089 mapper1.PopFront();
2090 ASSERT_TRUE(mapper1.Front() != nullptr);
2091 output1.emplace_back(std::move(*mapper1.Front()));
2092 mapper1.PopFront();
2093 ASSERT_TRUE(mapper1.Front() != nullptr);
2094 output1.emplace_back(std::move(*mapper1.Front()));
2095 mapper1.PopFront();
2096 ASSERT_TRUE(mapper1.Front() != nullptr);
2097 output1.emplace_back(std::move(*mapper1.Front()));
2098 mapper1.PopFront();
2099
2100 EXPECT_EQ(mapper0_count, 4u);
2101 EXPECT_EQ(mapper1_count, 4u);
2102
2103 ASSERT_TRUE(mapper1.Front() == nullptr);
2104
2105 EXPECT_EQ(mapper0_count, 4u);
2106 EXPECT_EQ(mapper1_count, 4u);
2107
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002108 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2109 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002110 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002111 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002112
2113 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
2114 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002115 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002116 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002117
2118 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
2119 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002120 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002121 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002122
2123 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
2124 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002125 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002126 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08002127 }
Austin Schuhfecf1d82020-12-19 16:57:28 -08002128}
2129
Philipp Schrader416505b2024-03-28 11:59:45 -07002130// Validates that we can read timestamps on startup even for single-node logs.
2131TEST_F(SingleNodeTimestampMapperTest, QueueTimestampsForSingleNodes) {
2132 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2133 {
2134 TestDetachedBufferWriter writer0(logfile0_);
2135 writer0.QueueSpan(config0_.span());
2136
2137 writer0.WriteSizedFlatbuffer(
2138 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
2139 writer0.WriteSizedFlatbuffer(
2140 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
2141 writer0.WriteSizedFlatbuffer(
2142 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
2143 writer0.WriteSizedFlatbuffer(
2144 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
2145 }
2146
2147 const std::vector<LogFile> parts = SortParts({logfile0_});
2148 LogFilesContainer log_files(parts);
2149
2150 ASSERT_EQ(parts[0].logger_node, "pi1");
2151
2152 size_t mapper0_count = 0;
2153 TimestampMapper mapper0("pi1", log_files,
2154 TimestampQueueStrategy::kQueueTimestampsAtStartup);
2155 mapper0.set_timestamp_callback(
2156 [&](TimestampedMessage *) { ++mapper0_count; });
2157 mapper0.QueueTimestamps();
2158
2159 for (int i = 0; i < 4; ++i) {
2160 ASSERT_TRUE(mapper0.Front() != nullptr);
2161 mapper0.PopFront();
2162 }
2163 EXPECT_TRUE(mapper0.Front() == nullptr);
2164 EXPECT_EQ(mapper0_count, 4u);
2165}
2166
2167class BootMergerTest : public SortingElementTest<> {
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002168 public:
2169 BootMergerTest()
2170 : SortingElementTest(),
2171 boot0_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002172 /* 100ms */
2173 "max_out_of_order_duration": 100000000,
2174 "node": {
2175 "name": "pi2"
2176 },
2177 "logger_node": {
2178 "name": "pi1"
2179 },
2180 "monotonic_start_time": 1000000,
2181 "realtime_start_time": 1000000000000,
2182 "logger_monotonic_start_time": 1000000,
2183 "logger_realtime_start_time": 1000000000000,
2184 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2185 "parts_uuid": "1a9e5ca2-31b2-475b-8282-88f6d1ce5109",
2186 "parts_index": 0,
2187 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2188 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002189 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2190 "boot_uuids": [
2191 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2192 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2193 ""
2194 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002195})")),
2196 boot1_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002197 /* 100ms */
2198 "max_out_of_order_duration": 100000000,
2199 "node": {
2200 "name": "pi2"
2201 },
2202 "logger_node": {
2203 "name": "pi1"
2204 },
2205 "monotonic_start_time": 1000000,
2206 "realtime_start_time": 1000000000000,
2207 "logger_monotonic_start_time": 1000000,
2208 "logger_realtime_start_time": 1000000000000,
2209 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2210 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
2211 "parts_index": 1,
2212 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2213 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002214 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2215 "boot_uuids": [
2216 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2217 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2218 ""
2219 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002220})")) {}
Austin Schuh8bee6882021-06-28 21:03:28 -07002221
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002222 protected:
2223 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0_;
2224 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1_;
2225};
2226
2227// This tests that we can properly sort a multi-node log file which has the old
2228// (and buggy) timestamps in the header, and the non-resetting parts_index.
2229// These make it so we can just bairly figure out what happened first and what
2230// happened second, but not in a way that is robust to multiple nodes rebooting.
2231TEST_F(BootMergerTest, OldReboot) {
Austin Schuh8bee6882021-06-28 21:03:28 -07002232 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002233 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002234 writer.QueueSpan(boot0_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002235 }
2236 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002237 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002238 writer.QueueSpan(boot1_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002239 }
2240
2241 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
2242
2243 ASSERT_EQ(parts.size(), 1u);
2244 ASSERT_EQ(parts[0].parts.size(), 2u);
2245
2246 EXPECT_EQ(parts[0].parts[0].boot_count, 0);
2247 EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002248 boot0_.message().source_node_boot_uuid()->string_view());
Austin Schuh8bee6882021-06-28 21:03:28 -07002249
2250 EXPECT_EQ(parts[0].parts[1].boot_count, 1);
2251 EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002252 boot1_.message().source_node_boot_uuid()->string_view());
2253}
2254
2255// This tests that we can produce messages ordered across a reboot.
2256TEST_F(BootMergerTest, SortAcrossReboot) {
2257 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2258 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002259 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002260 writer.QueueSpan(boot0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002261 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002262 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002263 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002264 MakeLogMessage(e + chrono::milliseconds(2000), 1, 0x105));
2265 }
2266 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002267 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002268 writer.QueueSpan(boot1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002269 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002270 MakeLogMessage(e + chrono::milliseconds(100), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002271 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002272 MakeLogMessage(e + chrono::milliseconds(200), 1, 0x106));
2273 }
2274
2275 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07002276 LogFilesContainer log_files(parts);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002277 ASSERT_EQ(parts.size(), 1u);
2278 ASSERT_EQ(parts[0].parts.size(), 2u);
2279
Austin Schuh63097262023-08-16 17:04:29 -07002280 BootMerger merger("pi2", log_files,
2281 {StoredDataType::DATA, StoredDataType::TIMESTAMPS,
2282 StoredDataType::REMOTE_TIMESTAMPS});
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002283
2284 EXPECT_EQ(merger.node(), 1u);
2285
2286 std::vector<Message> output;
2287 for (int i = 0; i < 4; ++i) {
2288 ASSERT_TRUE(merger.Front() != nullptr);
2289 output.emplace_back(std::move(*merger.Front()));
2290 merger.PopFront();
2291 }
2292
2293 ASSERT_TRUE(merger.Front() == nullptr);
2294
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002295 EXPECT_EQ(output[0].timestamp.boot, 0u);
2296 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
2297 EXPECT_EQ(output[1].timestamp.boot, 0u);
2298 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(2000));
2299
2300 EXPECT_EQ(output[2].timestamp.boot, 1u);
2301 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(100));
2302 EXPECT_EQ(output[3].timestamp.boot, 1u);
2303 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
Austin Schuh8bee6882021-06-28 21:03:28 -07002304}
2305
Philipp Schrader416505b2024-03-28 11:59:45 -07002306class RebootTimestampMapperTest : public SortingElementTest<> {
Austin Schuh48507722021-07-17 17:29:24 -07002307 public:
2308 RebootTimestampMapperTest()
2309 : SortingElementTest(),
2310 boot0a_(MakeHeader(config_, R"({
2311 /* 100ms */
2312 "max_out_of_order_duration": 100000000,
2313 "node": {
2314 "name": "pi1"
2315 },
2316 "logger_node": {
2317 "name": "pi1"
2318 },
2319 "monotonic_start_time": 1000000,
2320 "realtime_start_time": 1000000000000,
2321 "logger_monotonic_start_time": 1000000,
2322 "logger_realtime_start_time": 1000000000000,
2323 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2324 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2325 "parts_index": 0,
2326 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2327 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2328 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2329 "boot_uuids": [
2330 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2331 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2332 ""
2333 ]
2334})")),
2335 boot0b_(MakeHeader(config_, R"({
2336 /* 100ms */
2337 "max_out_of_order_duration": 100000000,
2338 "node": {
2339 "name": "pi1"
2340 },
2341 "logger_node": {
2342 "name": "pi1"
2343 },
2344 "monotonic_start_time": 1000000,
2345 "realtime_start_time": 1000000000000,
2346 "logger_monotonic_start_time": 1000000,
2347 "logger_realtime_start_time": 1000000000000,
2348 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2349 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2350 "parts_index": 1,
2351 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2352 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2353 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2354 "boot_uuids": [
2355 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2356 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2357 ""
2358 ]
2359})")),
2360 boot1a_(MakeHeader(config_, R"({
2361 /* 100ms */
2362 "max_out_of_order_duration": 100000000,
2363 "node": {
2364 "name": "pi2"
2365 },
2366 "logger_node": {
2367 "name": "pi1"
2368 },
2369 "monotonic_start_time": 1000000,
2370 "realtime_start_time": 1000000000000,
2371 "logger_monotonic_start_time": 1000000,
2372 "logger_realtime_start_time": 1000000000000,
2373 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2374 "parts_uuid": "f6ab0cdc-a654-456d-bfd9-2bbc09098edf",
2375 "parts_index": 0,
2376 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2377 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2378 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2379 "boot_uuids": [
2380 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2381 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2382 ""
2383 ]
2384})")),
2385 boot1b_(MakeHeader(config_, R"({
2386 /* 100ms */
2387 "max_out_of_order_duration": 100000000,
2388 "node": {
2389 "name": "pi2"
2390 },
2391 "logger_node": {
2392 "name": "pi1"
2393 },
2394 "monotonic_start_time": 1000000,
2395 "realtime_start_time": 1000000000000,
2396 "logger_monotonic_start_time": 1000000,
2397 "logger_realtime_start_time": 1000000000000,
2398 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2399 "parts_uuid": "3a9d0445-f520-43ca-93f5-e2cc7f54d40a",
2400 "parts_index": 1,
2401 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2402 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2403 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2404 "boot_uuids": [
2405 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2406 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2407 ""
2408 ]
2409})")) {}
2410
2411 protected:
2412 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0a_;
2413 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0b_;
2414 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1a_;
2415 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
2416};
2417
Austin Schuh48507722021-07-17 17:29:24 -07002418// Tests that we can match timestamps on delivered messages in the presence of
2419// reboots on the node receiving timestamps.
2420TEST_F(RebootTimestampMapperTest, ReadNode0First) {
2421 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2422 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002423 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002424 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002425 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002426 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002427 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002428 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002429 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002430 writer1b.QueueSpan(boot1b_.span());
2431
Austin Schuhd863e6e2022-10-16 15:44:50 -07002432 writer0a.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002433 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002434 writer1a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002435 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
2436 e + chrono::milliseconds(1001)));
2437
Austin Schuhd863e6e2022-10-16 15:44:50 -07002438 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh58646e22021-08-23 23:51:46 -07002439 e + chrono::milliseconds(1000), 0, chrono::seconds(21),
2440 e + chrono::milliseconds(2001)));
2441
Austin Schuhd863e6e2022-10-16 15:44:50 -07002442 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002443 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002444 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002445 e + chrono::milliseconds(2000), 0, chrono::seconds(20),
2446 e + chrono::milliseconds(2001)));
2447
Austin Schuhd863e6e2022-10-16 15:44:50 -07002448 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002449 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002450 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002451 e + chrono::milliseconds(3000), 0, chrono::seconds(20),
2452 e + chrono::milliseconds(3001)));
2453 }
2454
Austin Schuh58646e22021-08-23 23:51:46 -07002455 const std::vector<LogFile> parts =
2456 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07002457 LogFilesContainer log_files(parts);
Austin Schuh48507722021-07-17 17:29:24 -07002458
2459 for (const auto &x : parts) {
2460 LOG(INFO) << x;
2461 }
2462 ASSERT_EQ(parts.size(), 1u);
2463 ASSERT_EQ(parts[0].logger_node, "pi1");
2464
2465 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07002466 TimestampMapper mapper0("pi1", log_files,
2467 TimestampQueueStrategy::kQueueTogether);
Austin Schuh48507722021-07-17 17:29:24 -07002468 mapper0.set_timestamp_callback(
2469 [&](TimestampedMessage *) { ++mapper0_count; });
2470 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07002471 TimestampMapper mapper1("pi2", log_files,
2472 TimestampQueueStrategy::kQueueTogether);
Austin Schuh48507722021-07-17 17:29:24 -07002473 mapper1.set_timestamp_callback(
2474 [&](TimestampedMessage *) { ++mapper1_count; });
2475
2476 mapper0.AddPeer(&mapper1);
2477 mapper1.AddPeer(&mapper0);
2478
2479 {
2480 std::deque<TimestampedMessage> output0;
2481
2482 EXPECT_EQ(mapper0_count, 0u);
2483 EXPECT_EQ(mapper1_count, 0u);
2484 ASSERT_TRUE(mapper0.Front() != nullptr);
2485 EXPECT_EQ(mapper0_count, 1u);
2486 EXPECT_EQ(mapper1_count, 0u);
2487 output0.emplace_back(std::move(*mapper0.Front()));
2488 mapper0.PopFront();
2489 EXPECT_TRUE(mapper0.started());
2490 EXPECT_EQ(mapper0_count, 1u);
2491 EXPECT_EQ(mapper1_count, 0u);
2492
2493 ASSERT_TRUE(mapper0.Front() != nullptr);
2494 EXPECT_EQ(mapper0_count, 2u);
2495 EXPECT_EQ(mapper1_count, 0u);
2496 output0.emplace_back(std::move(*mapper0.Front()));
2497 mapper0.PopFront();
2498 EXPECT_TRUE(mapper0.started());
2499
2500 ASSERT_TRUE(mapper0.Front() != nullptr);
2501 output0.emplace_back(std::move(*mapper0.Front()));
2502 mapper0.PopFront();
2503 EXPECT_TRUE(mapper0.started());
2504
2505 EXPECT_EQ(mapper0_count, 3u);
2506 EXPECT_EQ(mapper1_count, 0u);
2507
2508 ASSERT_TRUE(mapper0.Front() == nullptr);
2509
2510 LOG(INFO) << output0[0];
2511 LOG(INFO) << output0[1];
2512 LOG(INFO) << output0[2];
2513
2514 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2515 EXPECT_EQ(output0[0].monotonic_event_time.time,
2516 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002517 EXPECT_EQ(output0[0].queue_index,
2518 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002519 EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
2520 EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002521 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002522
2523 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2524 EXPECT_EQ(output0[1].monotonic_event_time.time,
2525 e + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002526 EXPECT_EQ(output0[1].queue_index,
2527 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002528 EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
2529 EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002530 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002531
2532 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2533 EXPECT_EQ(output0[2].monotonic_event_time.time,
2534 e + chrono::milliseconds(3000));
Austin Schuh58646e22021-08-23 23:51:46 -07002535 EXPECT_EQ(output0[2].queue_index,
2536 (BootQueueIndex{.boot = 0u, .index = 2u}));
Austin Schuh48507722021-07-17 17:29:24 -07002537 EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
2538 EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002539 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002540 }
2541
2542 {
2543 SCOPED_TRACE("Trying node1 now");
2544 std::deque<TimestampedMessage> output1;
2545
2546 EXPECT_EQ(mapper0_count, 3u);
2547 EXPECT_EQ(mapper1_count, 0u);
2548
2549 ASSERT_TRUE(mapper1.Front() != nullptr);
2550 EXPECT_EQ(mapper0_count, 3u);
2551 EXPECT_EQ(mapper1_count, 1u);
2552 output1.emplace_back(std::move(*mapper1.Front()));
2553 mapper1.PopFront();
2554 EXPECT_TRUE(mapper1.started());
2555 EXPECT_EQ(mapper0_count, 3u);
2556 EXPECT_EQ(mapper1_count, 1u);
2557
2558 ASSERT_TRUE(mapper1.Front() != nullptr);
2559 EXPECT_EQ(mapper0_count, 3u);
2560 EXPECT_EQ(mapper1_count, 2u);
2561 output1.emplace_back(std::move(*mapper1.Front()));
2562 mapper1.PopFront();
2563 EXPECT_TRUE(mapper1.started());
2564
2565 ASSERT_TRUE(mapper1.Front() != nullptr);
2566 output1.emplace_back(std::move(*mapper1.Front()));
2567 mapper1.PopFront();
2568 EXPECT_TRUE(mapper1.started());
2569
Austin Schuh58646e22021-08-23 23:51:46 -07002570 ASSERT_TRUE(mapper1.Front() != nullptr);
2571 output1.emplace_back(std::move(*mapper1.Front()));
2572 mapper1.PopFront();
2573 EXPECT_TRUE(mapper1.started());
2574
Austin Schuh48507722021-07-17 17:29:24 -07002575 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002576 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002577
2578 ASSERT_TRUE(mapper1.Front() == nullptr);
2579
2580 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002581 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002582
2583 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2584 EXPECT_EQ(output1[0].monotonic_event_time.time,
2585 e + chrono::seconds(100) + chrono::milliseconds(1000));
2586 EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
2587 EXPECT_EQ(output1[0].monotonic_remote_time.time,
2588 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002589 EXPECT_EQ(output1[0].remote_queue_index,
2590 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002591 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
2592 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
2593 e + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002594 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002595
2596 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2597 EXPECT_EQ(output1[1].monotonic_event_time.time,
2598 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002599 EXPECT_EQ(output1[1].remote_queue_index,
2600 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002601 EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
2602 EXPECT_EQ(output1[1].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002603 e + chrono::milliseconds(1000));
Austin Schuh48507722021-07-17 17:29:24 -07002604 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
2605 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
2606 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002607 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002608
2609 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2610 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002611 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh48507722021-07-17 17:29:24 -07002612 EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
2613 EXPECT_EQ(output1[2].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002614 e + chrono::milliseconds(2000));
2615 EXPECT_EQ(output1[2].remote_queue_index,
2616 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002617 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
2618 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002619 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002620 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002621
Austin Schuh58646e22021-08-23 23:51:46 -07002622 EXPECT_EQ(output1[3].monotonic_event_time.boot, 1u);
2623 EXPECT_EQ(output1[3].monotonic_event_time.time,
2624 e + chrono::seconds(20) + chrono::milliseconds(3000));
2625 EXPECT_EQ(output1[3].monotonic_remote_time.boot, 0u);
2626 EXPECT_EQ(output1[3].monotonic_remote_time.time,
2627 e + chrono::milliseconds(3000));
2628 EXPECT_EQ(output1[3].remote_queue_index,
2629 (BootQueueIndex{.boot = 0u, .index = 2u}));
2630 EXPECT_EQ(output1[3].monotonic_timestamp_time.boot, 0u);
2631 EXPECT_EQ(output1[3].monotonic_timestamp_time.time,
2632 e + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002633 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh58646e22021-08-23 23:51:46 -07002634
Austin Schuh48507722021-07-17 17:29:24 -07002635 LOG(INFO) << output1[0];
2636 LOG(INFO) << output1[1];
2637 LOG(INFO) << output1[2];
Austin Schuh58646e22021-08-23 23:51:46 -07002638 LOG(INFO) << output1[3];
Austin Schuh48507722021-07-17 17:29:24 -07002639 }
2640}
2641
2642TEST_F(RebootTimestampMapperTest, Node2Reboot) {
2643 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2644 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002645 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002646 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002647 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002648 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002649 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002650 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002651 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002652 writer1b.QueueSpan(boot1b_.span());
2653
Austin Schuhd863e6e2022-10-16 15:44:50 -07002654 writer1a.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002655 e + chrono::seconds(100) + chrono::milliseconds(1000), 3, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002656 writer0a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002657 e + chrono::seconds(100) + chrono::milliseconds(1000), 3,
2658 chrono::seconds(-100),
2659 e + chrono::seconds(100) + chrono::milliseconds(1001)));
2660
Austin Schuhd863e6e2022-10-16 15:44:50 -07002661 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002662 e + chrono::seconds(20) + chrono::milliseconds(2000), 3, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002663 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002664 e + chrono::seconds(20) + chrono::milliseconds(2000), 3,
2665 chrono::seconds(-20),
2666 e + chrono::seconds(20) + chrono::milliseconds(2001)));
2667
Austin Schuhd863e6e2022-10-16 15:44:50 -07002668 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002669 e + chrono::seconds(20) + chrono::milliseconds(3000), 3, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002670 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002671 e + chrono::seconds(20) + chrono::milliseconds(3000), 3,
2672 chrono::seconds(-20),
2673 e + chrono::seconds(20) + chrono::milliseconds(3001)));
2674 }
2675
2676 const std::vector<LogFile> parts =
2677 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Alexei Strots1f51ac72023-05-15 10:14:54 -07002678 LogFilesContainer log_files(parts);
Austin Schuh48507722021-07-17 17:29:24 -07002679
2680 for (const auto &x : parts) {
2681 LOG(INFO) << x;
2682 }
2683 ASSERT_EQ(parts.size(), 1u);
2684 ASSERT_EQ(parts[0].logger_node, "pi1");
2685
2686 size_t mapper0_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07002687 TimestampMapper mapper0("pi1", log_files,
2688 TimestampQueueStrategy::kQueueTogether);
Austin Schuh48507722021-07-17 17:29:24 -07002689 mapper0.set_timestamp_callback(
2690 [&](TimestampedMessage *) { ++mapper0_count; });
2691 size_t mapper1_count = 0;
Austin Schuh63097262023-08-16 17:04:29 -07002692 TimestampMapper mapper1("pi2", log_files,
2693 TimestampQueueStrategy::kQueueTogether);
Austin Schuh48507722021-07-17 17:29:24 -07002694 mapper1.set_timestamp_callback(
2695 [&](TimestampedMessage *) { ++mapper1_count; });
2696
2697 mapper0.AddPeer(&mapper1);
2698 mapper1.AddPeer(&mapper0);
2699
2700 {
2701 std::deque<TimestampedMessage> output0;
2702
2703 EXPECT_EQ(mapper0_count, 0u);
2704 EXPECT_EQ(mapper1_count, 0u);
2705 ASSERT_TRUE(mapper0.Front() != nullptr);
2706 EXPECT_EQ(mapper0_count, 1u);
2707 EXPECT_EQ(mapper1_count, 0u);
2708 output0.emplace_back(std::move(*mapper0.Front()));
2709 mapper0.PopFront();
2710 EXPECT_TRUE(mapper0.started());
2711 EXPECT_EQ(mapper0_count, 1u);
2712 EXPECT_EQ(mapper1_count, 0u);
2713
2714 ASSERT_TRUE(mapper0.Front() != nullptr);
2715 EXPECT_EQ(mapper0_count, 2u);
2716 EXPECT_EQ(mapper1_count, 0u);
2717 output0.emplace_back(std::move(*mapper0.Front()));
2718 mapper0.PopFront();
2719 EXPECT_TRUE(mapper0.started());
2720
2721 ASSERT_TRUE(mapper0.Front() != nullptr);
2722 output0.emplace_back(std::move(*mapper0.Front()));
2723 mapper0.PopFront();
2724 EXPECT_TRUE(mapper0.started());
2725
2726 EXPECT_EQ(mapper0_count, 3u);
2727 EXPECT_EQ(mapper1_count, 0u);
2728
2729 ASSERT_TRUE(mapper0.Front() == nullptr);
2730
2731 LOG(INFO) << output0[0];
2732 LOG(INFO) << output0[1];
2733 LOG(INFO) << output0[2];
2734
2735 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2736 EXPECT_EQ(output0[0].monotonic_event_time.time,
2737 e + chrono::milliseconds(1000));
2738 EXPECT_EQ(output0[0].monotonic_remote_time.boot, 0u);
2739 EXPECT_EQ(output0[0].monotonic_remote_time.time,
2740 e + chrono::seconds(100) + chrono::milliseconds(1000));
2741 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
2742 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
2743 e + chrono::seconds(100) + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002744 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002745
2746 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2747 EXPECT_EQ(output0[1].monotonic_event_time.time,
2748 e + chrono::milliseconds(2000));
2749 EXPECT_EQ(output0[1].monotonic_remote_time.boot, 1u);
2750 EXPECT_EQ(output0[1].monotonic_remote_time.time,
2751 e + chrono::seconds(20) + chrono::milliseconds(2000));
2752 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
2753 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
2754 e + chrono::seconds(20) + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002755 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002756
2757 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2758 EXPECT_EQ(output0[2].monotonic_event_time.time,
2759 e + chrono::milliseconds(3000));
2760 EXPECT_EQ(output0[2].monotonic_remote_time.boot, 1u);
2761 EXPECT_EQ(output0[2].monotonic_remote_time.time,
2762 e + chrono::seconds(20) + chrono::milliseconds(3000));
2763 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
2764 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
2765 e + chrono::seconds(20) + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002766 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002767 }
2768
2769 {
2770 SCOPED_TRACE("Trying node1 now");
2771 std::deque<TimestampedMessage> output1;
2772
2773 EXPECT_EQ(mapper0_count, 3u);
2774 EXPECT_EQ(mapper1_count, 0u);
2775
2776 ASSERT_TRUE(mapper1.Front() != nullptr);
2777 EXPECT_EQ(mapper0_count, 3u);
2778 EXPECT_EQ(mapper1_count, 1u);
2779 output1.emplace_back(std::move(*mapper1.Front()));
2780 mapper1.PopFront();
2781 EXPECT_TRUE(mapper1.started());
2782 EXPECT_EQ(mapper0_count, 3u);
2783 EXPECT_EQ(mapper1_count, 1u);
2784
2785 ASSERT_TRUE(mapper1.Front() != nullptr);
2786 EXPECT_EQ(mapper0_count, 3u);
2787 EXPECT_EQ(mapper1_count, 2u);
2788 output1.emplace_back(std::move(*mapper1.Front()));
2789 mapper1.PopFront();
2790 EXPECT_TRUE(mapper1.started());
2791
2792 ASSERT_TRUE(mapper1.Front() != nullptr);
2793 output1.emplace_back(std::move(*mapper1.Front()));
2794 mapper1.PopFront();
2795 EXPECT_TRUE(mapper1.started());
2796
2797 EXPECT_EQ(mapper0_count, 3u);
2798 EXPECT_EQ(mapper1_count, 3u);
2799
2800 ASSERT_TRUE(mapper1.Front() == nullptr);
2801
2802 EXPECT_EQ(mapper0_count, 3u);
2803 EXPECT_EQ(mapper1_count, 3u);
2804
2805 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2806 EXPECT_EQ(output1[0].monotonic_event_time.time,
2807 e + chrono::seconds(100) + chrono::milliseconds(1000));
2808 EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
2809 EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002810 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002811
2812 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2813 EXPECT_EQ(output1[1].monotonic_event_time.time,
2814 e + chrono::seconds(20) + chrono::milliseconds(2000));
2815 EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
2816 EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002817 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002818
2819 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2820 EXPECT_EQ(output1[2].monotonic_event_time.time,
2821 e + chrono::seconds(20) + chrono::milliseconds(3000));
2822 EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
2823 EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002824 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002825
2826 LOG(INFO) << output1[0];
2827 LOG(INFO) << output1[1];
2828 LOG(INFO) << output1[2];
2829 }
2830}
2831
Philipp Schrader416505b2024-03-28 11:59:45 -07002832class SortingDeathTest : public SortingElementTest<> {
Austin Schuh44c61472021-11-22 21:04:10 -08002833 public:
2834 SortingDeathTest()
2835 : SortingElementTest(),
2836 part0_(MakeHeader(config_, R"({
2837 /* 100ms */
2838 "max_out_of_order_duration": 100000000,
2839 "node": {
2840 "name": "pi1"
2841 },
2842 "logger_node": {
2843 "name": "pi1"
2844 },
2845 "monotonic_start_time": 1000000,
2846 "realtime_start_time": 1000000000000,
2847 "logger_monotonic_start_time": 1000000,
2848 "logger_realtime_start_time": 1000000000000,
2849 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2850 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2851 "parts_index": 0,
2852 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2853 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2854 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2855 "boot_uuids": [
2856 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2857 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2858 ""
2859 ],
2860 "oldest_remote_monotonic_timestamps": [
2861 9223372036854775807,
2862 9223372036854775807,
2863 9223372036854775807
2864 ],
2865 "oldest_local_monotonic_timestamps": [
2866 9223372036854775807,
2867 9223372036854775807,
2868 9223372036854775807
2869 ],
2870 "oldest_remote_unreliable_monotonic_timestamps": [
2871 9223372036854775807,
2872 0,
2873 9223372036854775807
2874 ],
2875 "oldest_local_unreliable_monotonic_timestamps": [
2876 9223372036854775807,
2877 0,
2878 9223372036854775807
2879 ]
2880})")),
2881 part1_(MakeHeader(config_, R"({
2882 /* 100ms */
2883 "max_out_of_order_duration": 100000000,
2884 "node": {
2885 "name": "pi1"
2886 },
2887 "logger_node": {
2888 "name": "pi1"
2889 },
2890 "monotonic_start_time": 1000000,
2891 "realtime_start_time": 1000000000000,
2892 "logger_monotonic_start_time": 1000000,
2893 "logger_realtime_start_time": 1000000000000,
2894 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2895 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2896 "parts_index": 1,
2897 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2898 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2899 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2900 "boot_uuids": [
2901 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2902 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2903 ""
2904 ],
2905 "oldest_remote_monotonic_timestamps": [
2906 9223372036854775807,
2907 9223372036854775807,
2908 9223372036854775807
2909 ],
2910 "oldest_local_monotonic_timestamps": [
2911 9223372036854775807,
2912 9223372036854775807,
2913 9223372036854775807
2914 ],
2915 "oldest_remote_unreliable_monotonic_timestamps": [
2916 9223372036854775807,
2917 100000,
2918 9223372036854775807
2919 ],
2920 "oldest_local_unreliable_monotonic_timestamps": [
2921 9223372036854775807,
2922 100000,
2923 9223372036854775807
2924 ]
2925})")),
2926 part2_(MakeHeader(config_, R"({
2927 /* 100ms */
2928 "max_out_of_order_duration": 100000000,
2929 "node": {
2930 "name": "pi1"
2931 },
2932 "logger_node": {
2933 "name": "pi1"
2934 },
2935 "monotonic_start_time": 1000000,
2936 "realtime_start_time": 1000000000000,
2937 "logger_monotonic_start_time": 1000000,
2938 "logger_realtime_start_time": 1000000000000,
2939 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2940 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2941 "parts_index": 2,
2942 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2943 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2944 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2945 "boot_uuids": [
2946 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2947 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2948 ""
2949 ],
2950 "oldest_remote_monotonic_timestamps": [
2951 9223372036854775807,
2952 9223372036854775807,
2953 9223372036854775807
2954 ],
2955 "oldest_local_monotonic_timestamps": [
2956 9223372036854775807,
2957 9223372036854775807,
2958 9223372036854775807
2959 ],
2960 "oldest_remote_unreliable_monotonic_timestamps": [
2961 9223372036854775807,
2962 200000,
2963 9223372036854775807
2964 ],
2965 "oldest_local_unreliable_monotonic_timestamps": [
2966 9223372036854775807,
2967 200000,
2968 9223372036854775807
2969 ]
2970})")),
2971 part3_(MakeHeader(config_, R"({
2972 /* 100ms */
2973 "max_out_of_order_duration": 100000000,
2974 "node": {
2975 "name": "pi1"
2976 },
2977 "logger_node": {
2978 "name": "pi1"
2979 },
2980 "monotonic_start_time": 1000000,
2981 "realtime_start_time": 1000000000000,
2982 "logger_monotonic_start_time": 1000000,
2983 "logger_realtime_start_time": 1000000000000,
2984 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2985 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2986 "parts_index": 3,
2987 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2988 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2989 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2990 "boot_uuids": [
2991 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2992 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2993 ""
2994 ],
2995 "oldest_remote_monotonic_timestamps": [
2996 9223372036854775807,
2997 9223372036854775807,
2998 9223372036854775807
2999 ],
3000 "oldest_local_monotonic_timestamps": [
3001 9223372036854775807,
3002 9223372036854775807,
3003 9223372036854775807
3004 ],
3005 "oldest_remote_unreliable_monotonic_timestamps": [
3006 9223372036854775807,
3007 300000,
3008 9223372036854775807
3009 ],
3010 "oldest_local_unreliable_monotonic_timestamps": [
3011 9223372036854775807,
3012 300000,
3013 9223372036854775807
3014 ]
3015})")) {}
3016
3017 protected:
3018 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part0_;
3019 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part1_;
3020 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part2_;
3021 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part3_;
3022};
3023
3024// Tests that if 2 computers go back and forth trying to be the same node, we
3025// die in sorting instead of failing to estimate time.
3026TEST_F(SortingDeathTest, FightingNodes) {
3027 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07003028 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh44c61472021-11-22 21:04:10 -08003029 writer0.QueueSpan(part0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07003030 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh44c61472021-11-22 21:04:10 -08003031 writer1.QueueSpan(part1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07003032 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuh44c61472021-11-22 21:04:10 -08003033 writer2.QueueSpan(part2_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07003034 TestDetachedBufferWriter writer3(logfile3_);
Austin Schuh44c61472021-11-22 21:04:10 -08003035 writer3.QueueSpan(part3_.span());
3036 }
3037
3038 EXPECT_DEATH(
3039 {
3040 const std::vector<LogFile> parts =
3041 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
3042 },
Austin Schuh22cf7862022-09-19 19:09:42 -07003043 "found overlapping boots on");
Austin Schuh44c61472021-11-22 21:04:10 -08003044}
3045
Brian Smarttea913d42021-12-10 15:02:38 -08003046// Tests that we MessageReader blows up on a bad message.
3047TEST(MessageReaderConfirmCrash, ReadWrite) {
3048 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
3049 unlink(logfile.c_str());
3050
3051 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
3052 JsonToSizedFlatbuffer<LogFileHeader>(
3053 R"({ "max_out_of_order_duration": 100000000 })");
3054 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
3055 JsonToSizedFlatbuffer<MessageHeader>(
3056 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
3057 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
3058 JsonToSizedFlatbuffer<MessageHeader>(
3059 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
3060 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m4 =
3061 JsonToSizedFlatbuffer<MessageHeader>(
3062 R"({ "channel_index": 0, "monotonic_sent_time": 4 })");
3063
3064 // Starts out like a proper flat buffer header, but it breaks down ...
3065 std::vector<uint8_t> garbage{8, 0, 0, 0, 16, 0, 0, 0, 4, 0, 0, 0};
3066 absl::Span<uint8_t> m3_span(garbage);
3067
3068 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07003069 TestDetachedBufferWriter writer(logfile);
Brian Smarttea913d42021-12-10 15:02:38 -08003070 writer.QueueSpan(config.span());
3071 writer.QueueSpan(m1.span());
3072 writer.QueueSpan(m2.span());
3073 writer.QueueSpan(m3_span);
3074 writer.QueueSpan(m4.span()); // This message is "hidden"
3075 }
3076
3077 {
3078 MessageReader reader(logfile);
3079
3080 EXPECT_EQ(reader.filename(), logfile);
3081
3082 EXPECT_EQ(
3083 reader.max_out_of_order_duration(),
3084 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
3085 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
3086 EXPECT_TRUE(reader.ReadMessage());
3087 EXPECT_EQ(reader.newest_timestamp(),
3088 monotonic_clock::time_point(chrono::nanoseconds(1)));
3089 EXPECT_TRUE(reader.ReadMessage());
3090 EXPECT_EQ(reader.newest_timestamp(),
3091 monotonic_clock::time_point(chrono::nanoseconds(2)));
3092 // Confirm default crashing behavior
3093 EXPECT_DEATH(reader.ReadMessage(), "Corrupted message at offset");
3094 }
3095
3096 {
3097 gflags::FlagSaver fs;
3098
3099 MessageReader reader(logfile);
3100 reader.set_crash_on_corrupt_message_flag(false);
3101
3102 EXPECT_EQ(reader.filename(), logfile);
3103
3104 EXPECT_EQ(
3105 reader.max_out_of_order_duration(),
3106 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
3107 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
3108 EXPECT_TRUE(reader.ReadMessage());
3109 EXPECT_EQ(reader.newest_timestamp(),
3110 monotonic_clock::time_point(chrono::nanoseconds(1)));
3111 EXPECT_TRUE(reader.ReadMessage());
3112 EXPECT_EQ(reader.newest_timestamp(),
3113 monotonic_clock::time_point(chrono::nanoseconds(2)));
3114 // Confirm avoiding the corrupted message crash, stopping instead.
3115 EXPECT_FALSE(reader.ReadMessage());
3116 }
3117
3118 {
3119 gflags::FlagSaver fs;
3120
3121 MessageReader reader(logfile);
3122 reader.set_crash_on_corrupt_message_flag(false);
3123 reader.set_ignore_corrupt_messages_flag(true);
3124
3125 EXPECT_EQ(reader.filename(), logfile);
3126
3127 EXPECT_EQ(
3128 reader.max_out_of_order_duration(),
3129 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
3130 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
3131 EXPECT_TRUE(reader.ReadMessage());
3132 EXPECT_EQ(reader.newest_timestamp(),
3133 monotonic_clock::time_point(chrono::nanoseconds(1)));
3134 EXPECT_TRUE(reader.ReadMessage());
3135 EXPECT_EQ(reader.newest_timestamp(),
3136 monotonic_clock::time_point(chrono::nanoseconds(2)));
3137 // Confirm skipping of the corrupted message to read the hidden one.
3138 EXPECT_TRUE(reader.ReadMessage());
3139 EXPECT_EQ(reader.newest_timestamp(),
3140 monotonic_clock::time_point(chrono::nanoseconds(4)));
3141 EXPECT_FALSE(reader.ReadMessage());
3142 }
3143}
3144
Austin Schuhfa30c352022-10-16 11:12:02 -07003145class InlinePackMessage : public ::testing::Test {
3146 protected:
3147 aos::Context RandomContext() {
3148 data_ = RandomData();
3149 std::uniform_int_distribution<uint32_t> uint32_distribution(
3150 std::numeric_limits<uint32_t>::min(),
3151 std::numeric_limits<uint32_t>::max());
3152
3153 std::uniform_int_distribution<int64_t> time_distribution(
3154 std::numeric_limits<int64_t>::min(),
3155 std::numeric_limits<int64_t>::max());
3156
3157 aos::Context context;
3158 context.monotonic_event_time =
3159 aos::monotonic_clock::epoch() +
3160 chrono::nanoseconds(time_distribution(random_number_generator_));
3161 context.realtime_event_time =
3162 aos::realtime_clock::epoch() +
3163 chrono::nanoseconds(time_distribution(random_number_generator_));
3164
3165 context.monotonic_remote_time =
3166 aos::monotonic_clock::epoch() +
3167 chrono::nanoseconds(time_distribution(random_number_generator_));
3168 context.realtime_remote_time =
3169 aos::realtime_clock::epoch() +
3170 chrono::nanoseconds(time_distribution(random_number_generator_));
3171
Austin Schuhb5224ec2024-03-27 15:20:09 -07003172 context.monotonic_remote_transmit_time =
3173 aos::monotonic_clock::epoch() +
3174 chrono::nanoseconds(time_distribution(random_number_generator_));
3175
Austin Schuhfa30c352022-10-16 11:12:02 -07003176 context.queue_index = uint32_distribution(random_number_generator_);
3177 context.remote_queue_index = uint32_distribution(random_number_generator_);
3178 context.size = data_.size();
3179 context.data = data_.data();
3180 return context;
3181 }
3182
Austin Schuhf2d0e682022-10-16 14:20:58 -07003183 aos::monotonic_clock::time_point RandomMonotonic() {
3184 std::uniform_int_distribution<int64_t> time_distribution(
3185 0, std::numeric_limits<int64_t>::max());
3186 return aos::monotonic_clock::epoch() +
3187 chrono::nanoseconds(time_distribution(random_number_generator_));
3188 }
3189
3190 aos::SizePrefixedFlatbufferDetachedBuffer<message_bridge::RemoteMessage>
3191 RandomRemoteMessage() {
3192 std::uniform_int_distribution<uint8_t> uint8_distribution(
3193 std::numeric_limits<uint8_t>::min(),
3194 std::numeric_limits<uint8_t>::max());
3195
3196 std::uniform_int_distribution<int64_t> time_distribution(
3197 std::numeric_limits<int64_t>::min(),
3198 std::numeric_limits<int64_t>::max());
3199
3200 flatbuffers::FlatBufferBuilder fbb;
3201 message_bridge::RemoteMessage::Builder builder(fbb);
3202 builder.add_queue_index(uint8_distribution(random_number_generator_));
3203
3204 builder.add_monotonic_sent_time(
3205 time_distribution(random_number_generator_));
3206 builder.add_realtime_sent_time(time_distribution(random_number_generator_));
3207 builder.add_monotonic_remote_time(
3208 time_distribution(random_number_generator_));
3209 builder.add_realtime_remote_time(
3210 time_distribution(random_number_generator_));
3211
3212 builder.add_remote_queue_index(
3213 uint8_distribution(random_number_generator_));
3214
Austin Schuhb5224ec2024-03-27 15:20:09 -07003215 builder.add_monotonic_remote_transmit_time(
3216 time_distribution(random_number_generator_));
3217
Austin Schuhf2d0e682022-10-16 14:20:58 -07003218 fbb.FinishSizePrefixed(builder.Finish());
3219 return fbb.Release();
3220 }
3221
Austin Schuhfa30c352022-10-16 11:12:02 -07003222 std::vector<uint8_t> RandomData() {
3223 std::vector<uint8_t> result;
3224 std::uniform_int_distribution<int> length_distribution(1, 32);
3225 std::uniform_int_distribution<uint8_t> data_distribution(
3226 std::numeric_limits<uint8_t>::min(),
3227 std::numeric_limits<uint8_t>::max());
3228
3229 const size_t length = length_distribution(random_number_generator_);
3230
3231 result.reserve(length);
3232 for (size_t i = 0; i < length; ++i) {
3233 result.emplace_back(data_distribution(random_number_generator_));
3234 }
3235 return result;
3236 }
3237
3238 std::mt19937 random_number_generator_{
3239 std::mt19937(::aos::testing::RandomSeed())};
3240
3241 std::vector<uint8_t> data_;
3242};
3243
3244// Uses the binary schema to annotate a provided flatbuffer. Returns the
3245// annotated flatbuffer.
3246std::string AnnotateBinaries(
3247 const aos::NonSizePrefixedFlatbuffer<reflection::Schema> &schema,
3248 const std::string &schema_filename,
3249 flatbuffers::span<uint8_t> binary_data) {
3250 flatbuffers::BinaryAnnotator binary_annotator(
3251 schema.span().data(), schema.span().size(), binary_data.data(),
3252 binary_data.size());
3253
3254 auto annotations = binary_annotator.Annotate();
3255
3256 flatbuffers::AnnotatedBinaryTextGenerator text_generator(
3257 flatbuffers::AnnotatedBinaryTextGenerator::Options{}, annotations,
3258 binary_data.data(), binary_data.size());
3259
3260 text_generator.Generate(aos::testing::TestTmpDir() + "/foo.bfbs",
3261 schema_filename);
3262
3263 return aos::util::ReadFileToStringOrDie(aos::testing::TestTmpDir() +
3264 "/foo.afb");
3265}
3266
Austin Schuh71a40d42023-02-04 21:22:22 -08003267// Event loop which just has working time functions for the Copier classes
3268// tested below.
3269class TimeEventLoop : public EventLoop {
3270 public:
3271 TimeEventLoop() : EventLoop(nullptr) {}
3272
3273 aos::monotonic_clock::time_point monotonic_now() const final {
3274 return aos::monotonic_clock::min_time;
3275 }
3276 realtime_clock::time_point realtime_now() const final {
3277 return aos::realtime_clock::min_time;
3278 }
3279
3280 void OnRun(::std::function<void()> /*on_run*/) final { LOG(FATAL); }
3281
3282 const std::string_view name() const final { return "time"; }
3283 const Node *node() const final { return nullptr; }
3284
3285 void SetRuntimeAffinity(const cpu_set_t & /*cpuset*/) final { LOG(FATAL); }
3286 void SetRuntimeRealtimePriority(int /*priority*/) final { LOG(FATAL); }
3287
3288 const cpu_set_t &runtime_affinity() const final {
3289 LOG(FATAL);
3290 return cpuset_;
3291 }
3292
3293 TimerHandler *AddTimer(::std::function<void()> /*callback*/) final {
3294 LOG(FATAL);
3295 return nullptr;
3296 }
3297
3298 std::unique_ptr<RawSender> MakeRawSender(const Channel * /*channel*/) final {
3299 LOG(FATAL);
3300 return std::unique_ptr<RawSender>();
3301 }
3302
3303 const UUID &boot_uuid() const final {
3304 LOG(FATAL);
3305 return boot_uuid_;
3306 }
3307
3308 void set_name(const std::string_view name) final { LOG(FATAL) << name; }
3309
3310 pid_t GetTid() final {
3311 LOG(FATAL);
3312 return 0;
3313 }
3314
3315 int NumberBuffers(const Channel * /*channel*/) final {
3316 LOG(FATAL);
3317 return 0;
3318 }
3319
3320 int runtime_realtime_priority() const final {
3321 LOG(FATAL);
3322 return 0;
3323 }
3324
3325 std::unique_ptr<RawFetcher> MakeRawFetcher(
3326 const Channel * /*channel*/) final {
3327 LOG(FATAL);
3328 return std::unique_ptr<RawFetcher>();
3329 }
3330
3331 PhasedLoopHandler *AddPhasedLoop(
3332 ::std::function<void(int)> /*callback*/,
3333 const monotonic_clock::duration /*interval*/,
3334 const monotonic_clock::duration /*offset*/) final {
3335 LOG(FATAL);
3336 return nullptr;
3337 }
3338
3339 void MakeRawWatcher(
3340 const Channel * /*channel*/,
3341 std::function<void(const Context &context, const void *message)>
3342 /*watcher*/) final {
3343 LOG(FATAL);
3344 }
3345
3346 private:
3347 const cpu_set_t cpuset_ = DefaultAffinity();
3348 UUID boot_uuid_ = UUID ::Zero();
3349};
3350
Austin Schuhfa30c352022-10-16 11:12:02 -07003351// Tests that all variations of PackMessage are equivalent to the inline
3352// PackMessage used to avoid allocations.
3353TEST_F(InlinePackMessage, Equivilent) {
3354 std::uniform_int_distribution<uint32_t> uint32_distribution(
3355 std::numeric_limits<uint32_t>::min(),
3356 std::numeric_limits<uint32_t>::max());
3357 aos::FlatbufferVector<reflection::Schema> schema =
3358 FileToFlatbuffer<reflection::Schema>(
3359 ArtifactPath("aos/events/logging/logger.bfbs"));
3360
3361 for (const LogType type :
3362 {LogType::kLogMessage, LogType::kLogDeliveryTimeOnly,
Austin Schuh9d50f0d2024-03-27 14:40:43 -07003363 LogType::kLogRemoteMessage}) {
Austin Schuhfa30c352022-10-16 11:12:02 -07003364 for (int i = 0; i < 100; ++i) {
3365 aos::Context context = RandomContext();
3366 const uint32_t channel_index =
3367 uint32_distribution(random_number_generator_);
3368
3369 flatbuffers::FlatBufferBuilder fbb;
3370 fbb.ForceDefaults(true);
3371 fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index, type));
3372
3373 VLOG(1) << absl::BytesToHexString(std::string_view(
3374 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3375 fbb.GetBufferSpan().size()));
3376
3377 // Make sure that both the builder and inline method agree on sizes.
Austin Schuh48d10d62022-10-16 22:19:23 -07003378 ASSERT_EQ(fbb.GetSize(), PackMessageSize(type, context.size))
Austin Schuhfa30c352022-10-16 11:12:02 -07003379 << "log type " << static_cast<int>(type);
3380
3381 // Initialize the buffer to something nonzero to make sure all the padding
3382 // bytes are set to 0.
Austin Schuh48d10d62022-10-16 22:19:23 -07003383 std::vector<uint8_t> repacked_message(PackMessageSize(type, context.size),
3384 67);
Austin Schuhfa30c352022-10-16 11:12:02 -07003385
3386 // And verify packing inline works as expected.
Austin Schuh71a40d42023-02-04 21:22:22 -08003387 EXPECT_EQ(
3388 repacked_message.size(),
3389 PackMessageInline(repacked_message.data(), context, channel_index,
3390 type, 0u, repacked_message.size()));
Austin Schuhb5224ec2024-03-27 15:20:09 -07003391 for (size_t i = 0; i < fbb.GetBufferSpan().size(); ++i) {
3392 ASSERT_EQ(absl::Span<uint8_t>(repacked_message)[i],
3393 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3394 fbb.GetBufferSpan().size())[i])
3395 << ": On index " << i;
3396 }
3397 ASSERT_EQ(absl::Span<uint8_t>(repacked_message),
Austin Schuhfa30c352022-10-16 11:12:02 -07003398 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3399 fbb.GetBufferSpan().size()))
3400 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
Austin Schuhb5224ec2024-03-27 15:20:09 -07003401 fbb.GetBufferSpan())
3402 << " for log type " << static_cast<int>(type);
Austin Schuh71a40d42023-02-04 21:22:22 -08003403
3404 // Ok, now we want to confirm that we can build up arbitrary pieces of
3405 // said flatbuffer. Try all of them since it is cheap.
3406 TimeEventLoop event_loop;
3407 for (size_t i = 0; i < repacked_message.size(); i += 8) {
3408 for (size_t j = i; j < repacked_message.size(); j += 8) {
3409 std::vector<uint8_t> destination(repacked_message.size(), 67u);
3410 ContextDataCopier copier(context, channel_index, type, &event_loop);
3411
3412 copier.Copy(destination.data(), i, j);
3413
3414 size_t index = 0;
3415 for (size_t k = i; k < j; ++k) {
3416 ASSERT_EQ(destination[index], repacked_message[k])
3417 << ": Failed to match type " << static_cast<int>(type)
3418 << ", index " << index << " while testing range " << i << " to "
3419 << j;
3420 ;
3421 ++index;
3422 }
3423 // Now, confirm that none of the other bytes have been touched.
3424 for (; index < destination.size(); ++index) {
3425 ASSERT_EQ(destination[index], 67u);
3426 }
3427 }
3428 }
Austin Schuhfa30c352022-10-16 11:12:02 -07003429 }
3430 }
3431}
3432
Austin Schuhf2d0e682022-10-16 14:20:58 -07003433// Tests that all variations of PackMessage are equivilent to the inline
3434// PackMessage used to avoid allocations.
3435TEST_F(InlinePackMessage, RemoteEquivilent) {
3436 aos::FlatbufferVector<reflection::Schema> schema =
3437 FileToFlatbuffer<reflection::Schema>(
3438 ArtifactPath("aos/events/logging/logger.bfbs"));
3439 std::uniform_int_distribution<uint8_t> uint8_distribution(
3440 std::numeric_limits<uint8_t>::min(), std::numeric_limits<uint8_t>::max());
3441
3442 for (int i = 0; i < 100; ++i) {
3443 aos::SizePrefixedFlatbufferDetachedBuffer<RemoteMessage> random_msg =
3444 RandomRemoteMessage();
3445 const size_t channel_index = uint8_distribution(random_number_generator_);
3446 const monotonic_clock::time_point monotonic_timestamp_time =
3447 RandomMonotonic();
3448
3449 flatbuffers::FlatBufferBuilder fbb;
3450 fbb.ForceDefaults(true);
3451 fbb.FinishSizePrefixed(PackRemoteMessage(
3452 &fbb, &random_msg.message(), channel_index, monotonic_timestamp_time));
3453
3454 VLOG(1) << absl::BytesToHexString(std::string_view(
3455 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3456 fbb.GetBufferSpan().size()));
3457
3458 // Make sure that both the builder and inline method agree on sizes.
3459 ASSERT_EQ(fbb.GetSize(), PackRemoteMessageSize());
3460
3461 // Initialize the buffer to something nonzer to make sure all the padding
3462 // bytes are set to 0.
3463 std::vector<uint8_t> repacked_message(PackRemoteMessageSize(), 67);
3464
3465 // And verify packing inline works as expected.
Austin Schuh71a40d42023-02-04 21:22:22 -08003466 EXPECT_EQ(repacked_message.size(),
3467 PackRemoteMessageInline(
3468 repacked_message.data(), &random_msg.message(), channel_index,
3469 monotonic_timestamp_time, 0u, repacked_message.size()));
Austin Schuhf2d0e682022-10-16 14:20:58 -07003470 EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
3471 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3472 fbb.GetBufferSpan().size()))
3473 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
3474 fbb.GetBufferSpan());
Austin Schuh71a40d42023-02-04 21:22:22 -08003475
3476 // Ok, now we want to confirm that we can build up arbitrary pieces of said
3477 // flatbuffer. Try all of them since it is cheap.
3478 TimeEventLoop event_loop;
3479 for (size_t i = 0; i < repacked_message.size(); i += 8) {
3480 for (size_t j = i; j < repacked_message.size(); j += 8) {
3481 std::vector<uint8_t> destination(repacked_message.size(), 67u);
3482 RemoteMessageCopier copier(&random_msg.message(), channel_index,
3483 monotonic_timestamp_time, &event_loop);
3484
3485 copier.Copy(destination.data(), i, j);
3486
3487 size_t index = 0;
3488 for (size_t k = i; k < j; ++k) {
3489 ASSERT_EQ(destination[index], repacked_message[k]);
3490 ++index;
3491 }
3492 for (; index < destination.size(); ++index) {
3493 ASSERT_EQ(destination[index], 67u);
3494 }
3495 }
3496 }
Austin Schuhf2d0e682022-10-16 14:20:58 -07003497 }
3498}
Austin Schuhfa30c352022-10-16 11:12:02 -07003499
Stephan Pleinesf63bde82024-01-13 15:59:33 -08003500} // namespace aos::logger::testing