blob: f4537980f097db33d1c0abd77e92ff071dacb6a8 [file] [log] [blame]
Austin Schuhc243b422020-10-11 15:35:08 -07001#include "aos/events/logging/logfile_utils.h"
2
Austin Schuhe243aaf2020-10-11 15:46:02 -07003#include <chrono>
Austin Schuhfa30c352022-10-16 11:12:02 -07004#include <random>
Austin Schuhe243aaf2020-10-11 15:46:02 -07005#include <string>
Austin Schuhc243b422020-10-11 15:35:08 -07006
Austin Schuhfa30c352022-10-16 11:12:02 -07007#include "absl/strings/escaping.h"
Austin Schuhc41603c2020-10-11 16:17:37 -07008#include "aos/events/logging/logfile_sorting.h"
Austin Schuhc243b422020-10-11 15:35:08 -07009#include "aos/events/logging/test_message_generated.h"
Austin Schuh4b5c22a2020-11-30 22:58:43 -080010#include "aos/flatbuffer_merge.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070011#include "aos/flatbuffers.h"
Austin Schuhc243b422020-10-11 15:35:08 -070012#include "aos/json_to_flatbuffer.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070013#include "aos/testing/path.h"
14#include "aos/testing/random_seed.h"
Austin Schuhc243b422020-10-11 15:35:08 -070015#include "aos/testing/tmpdir.h"
Austin Schuhfa30c352022-10-16 11:12:02 -070016#include "aos/util/file.h"
17#include "external/com_github_google_flatbuffers/src/annotated_binary_text_gen.h"
18#include "external/com_github_google_flatbuffers/src/binary_annotator.h"
19#include "flatbuffers/reflection_generated.h"
Brian Smarttea913d42021-12-10 15:02:38 -080020#include "gflags/gflags.h"
Austin Schuhe243aaf2020-10-11 15:46:02 -070021#include "gtest/gtest.h"
Austin Schuhc243b422020-10-11 15:35:08 -070022
23namespace aos {
24namespace logger {
25namespace testing {
Austin Schuhe243aaf2020-10-11 15:46:02 -070026namespace chrono = std::chrono;
Austin Schuhf2d0e682022-10-16 14:20:58 -070027using aos::message_bridge::RemoteMessage;
Eric Schmiedebergb38477e2022-12-02 16:08:04 -070028using aos::testing::ArtifactPath;
Austin Schuhc243b422020-10-11 15:35:08 -070029
Austin Schuhd863e6e2022-10-16 15:44:50 -070030// Adapter class to make it easy to test DetachedBufferWriter without adding
31// test only boilerplate to DetachedBufferWriter.
32class TestDetachedBufferWriter : public DetachedBufferWriter {
33 public:
Austin Schuh48d10d62022-10-16 22:19:23 -070034 // Pick a max size that is rather conservative.
35 static constexpr size_t kMaxMessageSize = 128 * 1024;
Austin Schuhd863e6e2022-10-16 15:44:50 -070036 TestDetachedBufferWriter(std::string_view filename)
Austin Schuh48d10d62022-10-16 22:19:23 -070037 : DetachedBufferWriter(filename,
38 std::make_unique<DummyEncoder>(kMaxMessageSize)) {}
Austin Schuhd863e6e2022-10-16 15:44:50 -070039 void WriteSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
40 QueueSpan(absl::Span<const uint8_t>(buffer.data(), buffer.size()));
41 }
42};
43
Austin Schuhe243aaf2020-10-11 15:46:02 -070044// Creates a size prefixed flatbuffer from json.
Austin Schuhc243b422020-10-11 15:35:08 -070045template <typename T>
46SizePrefixedFlatbufferDetachedBuffer<T> JsonToSizedFlatbuffer(
47 const std::string_view data) {
48 flatbuffers::FlatBufferBuilder fbb;
49 fbb.ForceDefaults(true);
50 fbb.FinishSizePrefixed(JsonToFlatbuffer<T>(data, &fbb));
51 return fbb.Release();
52}
53
Austin Schuhe243aaf2020-10-11 15:46:02 -070054// Tests that we can write and read 2 flatbuffers to file.
Austin Schuhc243b422020-10-11 15:35:08 -070055TEST(SpanReaderTest, ReadWrite) {
56 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
57 unlink(logfile.c_str());
58
59 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080060 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
Austin Schuhc243b422020-10-11 15:35:08 -070061 const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
Austin Schuh4b5c22a2020-11-30 22:58:43 -080062 JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
Austin Schuhc243b422020-10-11 15:35:08 -070063
64 {
Austin Schuhd863e6e2022-10-16 15:44:50 -070065 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -080066 writer.QueueSpan(m1.span());
67 writer.QueueSpan(m2.span());
Austin Schuhc243b422020-10-11 15:35:08 -070068 }
69
70 SpanReader reader(logfile);
71
72 EXPECT_EQ(reader.filename(), logfile);
Austin Schuhcf5f6442021-07-06 10:43:28 -070073 EXPECT_EQ(reader.PeekMessage(), m1.span());
74 EXPECT_EQ(reader.PeekMessage(), m1.span());
Austin Schuhadd6eb32020-11-09 21:24:26 -080075 EXPECT_EQ(reader.ReadMessage(), m1.span());
76 EXPECT_EQ(reader.ReadMessage(), m2.span());
Austin Schuhcf5f6442021-07-06 10:43:28 -070077 EXPECT_EQ(reader.PeekMessage(), absl::Span<const uint8_t>());
Austin Schuhc243b422020-10-11 15:35:08 -070078 EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
79}
80
Austin Schuhe243aaf2020-10-11 15:46:02 -070081// Tests that we can actually parse the resulting messages at a basic level
82// through MessageReader.
83TEST(MessageReaderTest, ReadWrite) {
84 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
85 unlink(logfile.c_str());
86
87 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
88 JsonToSizedFlatbuffer<LogFileHeader>(
89 R"({ "max_out_of_order_duration": 100000000 })");
90 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
91 JsonToSizedFlatbuffer<MessageHeader>(
92 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
93 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
94 JsonToSizedFlatbuffer<MessageHeader>(
95 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
96
97 {
Austin Schuhd863e6e2022-10-16 15:44:50 -070098 TestDetachedBufferWriter writer(logfile);
Austin Schuhadd6eb32020-11-09 21:24:26 -080099 writer.QueueSpan(config.span());
100 writer.QueueSpan(m1.span());
101 writer.QueueSpan(m2.span());
Austin Schuhe243aaf2020-10-11 15:46:02 -0700102 }
103
104 MessageReader reader(logfile);
105
106 EXPECT_EQ(reader.filename(), logfile);
107
108 EXPECT_EQ(
109 reader.max_out_of_order_duration(),
110 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
111 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
112 EXPECT_TRUE(reader.ReadMessage());
113 EXPECT_EQ(reader.newest_timestamp(),
114 monotonic_clock::time_point(chrono::nanoseconds(1)));
115 EXPECT_TRUE(reader.ReadMessage());
116 EXPECT_EQ(reader.newest_timestamp(),
117 monotonic_clock::time_point(chrono::nanoseconds(2)));
118 EXPECT_FALSE(reader.ReadMessage());
119}
120
Austin Schuh32f68492020-11-08 21:45:51 -0800121// Tests that we explode when messages are too far out of order.
122TEST(PartsMessageReaderDeathTest, TooFarOutOfOrder) {
123 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
124 unlink(logfile0.c_str());
125
126 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
127 JsonToSizedFlatbuffer<LogFileHeader>(
128 R"({
129 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800130 "configuration": {},
Austin Schuh32f68492020-11-08 21:45:51 -0800131 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
132 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
133 "parts_index": 0
134})");
135
136 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
137 JsonToSizedFlatbuffer<MessageHeader>(
138 R"({ "channel_index": 0, "monotonic_sent_time": 100000000 })");
139 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
140 JsonToSizedFlatbuffer<MessageHeader>(
141 R"({ "channel_index": 0, "monotonic_sent_time": 0 })");
142 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m3 =
143 JsonToSizedFlatbuffer<MessageHeader>(
144 R"({ "channel_index": 0, "monotonic_sent_time": -1 })");
145
146 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700147 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800148 writer.QueueSpan(config0.span());
149 writer.QueueSpan(m1.span());
150 writer.QueueSpan(m2.span());
151 writer.QueueSpan(m3.span());
Austin Schuh32f68492020-11-08 21:45:51 -0800152 }
153
154 const std::vector<LogFile> parts = SortParts({logfile0});
155
156 PartsMessageReader reader(parts[0].parts[0]);
157
158 EXPECT_TRUE(reader.ReadMessage());
159 EXPECT_TRUE(reader.ReadMessage());
160 EXPECT_DEATH({ reader.ReadMessage(); }, "-0.000000001sec vs. 0.000000000sec");
161}
162
Austin Schuhc41603c2020-10-11 16:17:37 -0700163// Tests that we can transparently re-assemble part files with a
164// PartsMessageReader.
165TEST(PartsMessageReaderTest, ReadWrite) {
166 const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
167 const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
168 unlink(logfile0.c_str());
169 unlink(logfile1.c_str());
170
171 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
172 JsonToSizedFlatbuffer<LogFileHeader>(
173 R"({
174 "max_out_of_order_duration": 100000000,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800175 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700176 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
177 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
178 "parts_index": 0
179})");
180 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
181 JsonToSizedFlatbuffer<LogFileHeader>(
182 R"({
183 "max_out_of_order_duration": 200000000,
184 "monotonic_start_time": 0,
185 "realtime_start_time": 0,
Austin Schuh0ca51f32020-12-25 21:51:45 -0800186 "configuration": {},
Austin Schuhc41603c2020-10-11 16:17:37 -0700187 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
188 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
189 "parts_index": 1
190})");
191
192 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
193 JsonToSizedFlatbuffer<MessageHeader>(
194 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
195 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
196 JsonToSizedFlatbuffer<MessageHeader>(
197 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
198
199 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700200 TestDetachedBufferWriter writer(logfile0);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800201 writer.QueueSpan(config0.span());
202 writer.QueueSpan(m1.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700203 }
204 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700205 TestDetachedBufferWriter writer(logfile1);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800206 writer.QueueSpan(config1.span());
207 writer.QueueSpan(m2.span());
Austin Schuhc41603c2020-10-11 16:17:37 -0700208 }
209
210 const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
211
212 PartsMessageReader reader(parts[0].parts[0]);
213
214 EXPECT_EQ(reader.filename(), logfile0);
215
216 // Confirm that the timestamps track, and the filename also updates.
217 // Read the first message.
218 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
219 EXPECT_EQ(
220 reader.max_out_of_order_duration(),
221 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
222 EXPECT_TRUE(reader.ReadMessage());
223 EXPECT_EQ(reader.filename(), logfile0);
224 EXPECT_EQ(reader.newest_timestamp(),
225 monotonic_clock::time_point(chrono::nanoseconds(1)));
226 EXPECT_EQ(
227 reader.max_out_of_order_duration(),
228 std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
229
230 // Read the second message.
231 EXPECT_TRUE(reader.ReadMessage());
232 EXPECT_EQ(reader.filename(), logfile1);
233 EXPECT_EQ(reader.newest_timestamp(),
234 monotonic_clock::time_point(chrono::nanoseconds(2)));
235 EXPECT_EQ(
236 reader.max_out_of_order_duration(),
237 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
238
239 // And then confirm that reading again returns no message.
240 EXPECT_FALSE(reader.ReadMessage());
241 EXPECT_EQ(reader.filename(), logfile1);
242 EXPECT_EQ(
243 reader.max_out_of_order_duration(),
244 std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
Austin Schuh32f68492020-11-08 21:45:51 -0800245 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
Austin Schuhc41603c2020-10-11 16:17:37 -0700246}
Austin Schuh32f68492020-11-08 21:45:51 -0800247
Austin Schuh1be0ce42020-11-29 22:43:26 -0800248// Tests that Message's operator < works as expected.
249TEST(MessageTest, Sorting) {
250 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
251
252 Message m1{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700253 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700254 .timestamp =
255 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
Austin Schuh48507722021-07-17 17:29:24 -0700256 .monotonic_remote_boot = 0xffffff,
257 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700258 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800259 Message m2{.channel_index = 0,
Austin Schuh58646e22021-08-23 23:51:46 -0700260 .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700261 .timestamp =
262 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
Austin Schuh48507722021-07-17 17:29:24 -0700263 .monotonic_remote_boot = 0xffffff,
264 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700265 .data = nullptr};
Austin Schuh1be0ce42020-11-29 22:43:26 -0800266
267 EXPECT_LT(m1, m2);
268 EXPECT_GE(m2, m1);
269
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700270 m1.timestamp.time = e;
271 m2.timestamp.time = e;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800272
273 m1.channel_index = 1;
274 m2.channel_index = 2;
275
276 EXPECT_LT(m1, m2);
277 EXPECT_GE(m2, m1);
278
279 m1.channel_index = 0;
280 m2.channel_index = 0;
Austin Schuh58646e22021-08-23 23:51:46 -0700281 m1.queue_index.index = 0u;
282 m2.queue_index.index = 1u;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800283
284 EXPECT_LT(m1, m2);
285 EXPECT_GE(m2, m1);
286}
287
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800288aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
289 const aos::FlatbufferDetachedBuffer<Configuration> &config,
290 const std::string_view json) {
291 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh1034a832021-03-31 21:53:26 -0700292 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800293 flatbuffers::Offset<Configuration> config_offset =
294 aos::CopyFlatBuffer(config, &fbb);
295 LogFileHeader::Builder header_builder(fbb);
296 header_builder.add_configuration(config_offset);
297 fbb.Finish(header_builder.Finish());
298 aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
299
300 aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
301 JsonToFlatbuffer<LogFileHeader>(json));
302 CHECK(header_updates.Verify());
303 flatbuffers::FlatBufferBuilder fbb2;
Austin Schuh1034a832021-03-31 21:53:26 -0700304 fbb2.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800305 fbb2.FinishSizePrefixed(
306 aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
307 return fbb2.Release();
308}
309
310class SortingElementTest : public ::testing::Test {
311 public:
312 SortingElementTest()
313 : config_(JsonToFlatbuffer<Configuration>(
314 R"({
315 "channels": [
316 {
317 "name": "/a",
318 "type": "aos.logger.testing.TestMessage",
319 "source_node": "pi1",
320 "destination_nodes": [
321 {
322 "name": "pi2"
323 },
324 {
325 "name": "pi3"
326 }
327 ]
328 },
329 {
330 "name": "/b",
331 "type": "aos.logger.testing.TestMessage",
332 "source_node": "pi1"
333 },
334 {
335 "name": "/c",
336 "type": "aos.logger.testing.TestMessage",
337 "source_node": "pi1"
Austin Schuh48507722021-07-17 17:29:24 -0700338 },
339 {
340 "name": "/d",
341 "type": "aos.logger.testing.TestMessage",
342 "source_node": "pi2",
343 "destination_nodes": [
344 {
345 "name": "pi1"
346 }
347 ]
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800348 }
349 ],
350 "nodes": [
351 {
352 "name": "pi1"
353 },
354 {
355 "name": "pi2"
356 },
357 {
358 "name": "pi3"
359 }
360 ]
361}
362)")),
363 config0_(MakeHeader(config_, R"({
364 /* 100ms */
365 "max_out_of_order_duration": 100000000,
366 "node": {
367 "name": "pi1"
368 },
369 "logger_node": {
370 "name": "pi1"
371 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800372 "monotonic_start_time": 1000000,
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800373 "realtime_start_time": 1000000000000,
374 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700375 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
376 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
377 "boot_uuids": [
378 "1d782c63-b3c7-466e-bea9-a01308b43333",
379 "",
380 ""
381 ],
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800382 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
383 "parts_index": 0
384})")),
385 config1_(MakeHeader(config_,
386 R"({
387 /* 100ms */
388 "max_out_of_order_duration": 100000000,
389 "node": {
390 "name": "pi1"
391 },
392 "logger_node": {
393 "name": "pi1"
394 },
Austin Schuhd2f96102020-12-01 20:27:29 -0800395 "monotonic_start_time": 1000000,
396 "realtime_start_time": 1000000000000,
397 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
Austin Schuh48507722021-07-17 17:29:24 -0700398 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
399 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
400 "boot_uuids": [
401 "1d782c63-b3c7-466e-bea9-a01308b43333",
402 "",
403 ""
404 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800405 "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
406 "parts_index": 0
407})")),
408 config2_(MakeHeader(config_,
409 R"({
410 /* 100ms */
411 "max_out_of_order_duration": 100000000,
412 "node": {
413 "name": "pi2"
414 },
415 "logger_node": {
416 "name": "pi2"
417 },
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800418 "monotonic_start_time": 0,
419 "realtime_start_time": 1000000000000,
Austin Schuh48507722021-07-17 17:29:24 -0700420 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
421 "logger_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
422 "boot_uuids": [
423 "",
424 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
425 ""
426 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800427 "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
428 "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
429 "parts_index": 0
430})")),
431 config3_(MakeHeader(config_,
432 R"({
433 /* 100ms */
434 "max_out_of_order_duration": 100000000,
435 "node": {
436 "name": "pi1"
437 },
438 "logger_node": {
439 "name": "pi1"
440 },
441 "monotonic_start_time": 2000000,
442 "realtime_start_time": 1000000000,
443 "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
Austin Schuh48507722021-07-17 17:29:24 -0700444 "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
445 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
446 "boot_uuids": [
447 "1d782c63-b3c7-466e-bea9-a01308b43333",
448 "",
449 ""
450 ],
Austin Schuhd2f96102020-12-01 20:27:29 -0800451 "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800452 "parts_index": 0
Austin Schuh8bf1e632021-01-02 22:41:04 -0800453})")),
454 config4_(MakeHeader(config_,
455 R"({
456 /* 100ms */
457 "max_out_of_order_duration": 100000000,
458 "node": {
459 "name": "pi2"
460 },
461 "logger_node": {
462 "name": "pi1"
463 },
464 "monotonic_start_time": 2000000,
465 "realtime_start_time": 1000000000,
466 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
467 "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
Austin Schuh48507722021-07-17 17:29:24 -0700468 "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
469 "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
470 "boot_uuids": [
471 "1d782c63-b3c7-466e-bea9-a01308b43333",
472 "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
473 ""
474 ],
Austin Schuh8bf1e632021-01-02 22:41:04 -0800475 "parts_index": 0
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800476})")) {
477 unlink(logfile0_.c_str());
478 unlink(logfile1_.c_str());
Austin Schuhd2f96102020-12-01 20:27:29 -0800479 unlink(logfile2_.c_str());
Austin Schuh48507722021-07-17 17:29:24 -0700480 unlink(logfile3_.c_str());
Austin Schuh921b2562021-07-31 19:37:03 -0700481 queue_index_.resize(config_.message().channels()->size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800482 }
483
484 protected:
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800485 flatbuffers::DetachedBuffer MakeLogMessage(
486 const aos::monotonic_clock::time_point monotonic_now, int channel_index,
487 int value) {
488 flatbuffers::FlatBufferBuilder message_fbb;
489 message_fbb.ForceDefaults(true);
490 TestMessage::Builder test_message_builder(message_fbb);
491 test_message_builder.add_value(value);
492 message_fbb.Finish(test_message_builder.Finish());
493
494 aos::Context context;
495 context.monotonic_event_time = monotonic_now;
496 context.realtime_event_time = aos::realtime_clock::epoch() +
497 chrono::seconds(1000) +
498 monotonic_now.time_since_epoch();
Austin Schuh921b2562021-07-31 19:37:03 -0700499 CHECK_LT(static_cast<size_t>(channel_index), queue_index_.size());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800500 context.queue_index = queue_index_[channel_index];
501 context.size = message_fbb.GetSize();
502 context.data = message_fbb.GetBufferPointer();
503
504 ++queue_index_[channel_index];
505
506 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh53e3a812021-11-03 16:57:23 -0700507 fbb.ForceDefaults(true);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800508 fbb.FinishSizePrefixed(
509 PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
510
511 return fbb.Release();
512 }
513
514 flatbuffers::DetachedBuffer MakeTimestampMessage(
515 const aos::monotonic_clock::time_point sender_monotonic_now,
Austin Schuh8bf1e632021-01-02 22:41:04 -0800516 int channel_index, chrono::nanoseconds receiver_monotonic_offset,
517 monotonic_clock::time_point monotonic_timestamp_time =
518 monotonic_clock::min_time) {
519 const monotonic_clock::time_point monotonic_sent_time =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800520 sender_monotonic_now + receiver_monotonic_offset;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800521
522 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800523 fbb.ForceDefaults(true);
524
525 logger::MessageHeader::Builder message_header_builder(fbb);
526
527 message_header_builder.add_channel_index(channel_index);
528
529 message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
530 100);
531 message_header_builder.add_monotonic_sent_time(
532 monotonic_sent_time.time_since_epoch().count());
533 message_header_builder.add_realtime_sent_time(
534 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
535 monotonic_sent_time.time_since_epoch())
536 .time_since_epoch()
537 .count());
538
539 message_header_builder.add_monotonic_remote_time(
540 sender_monotonic_now.time_since_epoch().count());
541 message_header_builder.add_realtime_remote_time(
542 (aos::realtime_clock::epoch() + chrono::seconds(1000) +
543 sender_monotonic_now.time_since_epoch())
544 .time_since_epoch()
545 .count());
546 message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
547 1);
548
549 if (monotonic_timestamp_time != monotonic_clock::min_time) {
550 message_header_builder.add_monotonic_timestamp_time(
551 monotonic_timestamp_time.time_since_epoch().count());
552 }
553
554 fbb.FinishSizePrefixed(message_header_builder.Finish());
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800555 LOG(INFO) << aos::FlatbufferToJson(
556 aos::SizePrefixedFlatbufferSpan<MessageHeader>(
557 absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
558
559 return fbb.Release();
560 }
561
562 const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
563 const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
Austin Schuhd2f96102020-12-01 20:27:29 -0800564 const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
Austin Schuh48507722021-07-17 17:29:24 -0700565 const std::string logfile3_ = aos::testing::TestTmpDir() + "/log3.bfbs";
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800566
567 const aos::FlatbufferDetachedBuffer<Configuration> config_;
568 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
569 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800570 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
571 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800572 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800573
574 std::vector<uint32_t> queue_index_;
575};
576
577using LogPartsSorterTest = SortingElementTest;
578using LogPartsSorterDeathTest = LogPartsSorterTest;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800579using NodeMergerTest = SortingElementTest;
Austin Schuhd2f96102020-12-01 20:27:29 -0800580using TimestampMapperTest = SortingElementTest;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800581
582// Tests that we can pull messages out of a log sorted in order.
583TEST_F(LogPartsSorterTest, Pull) {
584 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
585 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700586 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800587 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700588 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800589 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700590 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800591 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700592 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800593 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700594 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800595 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
596 }
597
598 const std::vector<LogFile> parts = SortParts({logfile0_});
599
600 LogPartsSorter parts_sorter(parts[0].parts[0]);
601
602 // Confirm we aren't sorted until any time until the message is popped.
603 // Peeking shouldn't change the sorted until time.
604 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
605
606 std::deque<Message> output;
607
608 ASSERT_TRUE(parts_sorter.Front() != nullptr);
609 output.emplace_back(std::move(*parts_sorter.Front()));
610 parts_sorter.PopFront();
611 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
612
613 ASSERT_TRUE(parts_sorter.Front() != nullptr);
614 output.emplace_back(std::move(*parts_sorter.Front()));
615 parts_sorter.PopFront();
616 EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
617
618 ASSERT_TRUE(parts_sorter.Front() != nullptr);
619 output.emplace_back(std::move(*parts_sorter.Front()));
620 parts_sorter.PopFront();
621 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
622
623 ASSERT_TRUE(parts_sorter.Front() != nullptr);
624 output.emplace_back(std::move(*parts_sorter.Front()));
625 parts_sorter.PopFront();
626 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
627
628 ASSERT_TRUE(parts_sorter.Front() == nullptr);
629
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700630 EXPECT_EQ(output[0].timestamp.boot, 0);
631 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
632 EXPECT_EQ(output[1].timestamp.boot, 0);
633 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1000));
634 EXPECT_EQ(output[2].timestamp.boot, 0);
635 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1901));
636 EXPECT_EQ(output[3].timestamp.boot, 0);
637 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800638}
639
Austin Schuhb000de62020-12-03 22:00:40 -0800640// Tests that we can pull messages out of a log sorted in order.
641TEST_F(LogPartsSorterTest, WayBeforeStart) {
642 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
643 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700644 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhb000de62020-12-03 22:00:40 -0800645 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700646 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800647 MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700648 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800649 MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700650 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800651 MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700652 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800653 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700654 writer.WriteSizedFlatbuffer(
Austin Schuhb000de62020-12-03 22:00:40 -0800655 MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
656 }
657
658 const std::vector<LogFile> parts = SortParts({logfile0_});
659
660 LogPartsSorter parts_sorter(parts[0].parts[0]);
661
662 // Confirm we aren't sorted until any time until the message is popped.
663 // Peeking shouldn't change the sorted until time.
664 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
665
666 std::deque<Message> output;
667
668 for (monotonic_clock::time_point t :
669 {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
670 e + chrono::milliseconds(1900), monotonic_clock::max_time,
671 monotonic_clock::max_time}) {
672 ASSERT_TRUE(parts_sorter.Front() != nullptr);
673 output.emplace_back(std::move(*parts_sorter.Front()));
674 parts_sorter.PopFront();
675 EXPECT_EQ(parts_sorter.sorted_until(), t);
676 }
677
678 ASSERT_TRUE(parts_sorter.Front() == nullptr);
679
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700680 EXPECT_EQ(output[0].timestamp.boot, 0u);
681 EXPECT_EQ(output[0].timestamp.time, e - chrono::milliseconds(1000));
682 EXPECT_EQ(output[1].timestamp.boot, 0u);
683 EXPECT_EQ(output[1].timestamp.time, e - chrono::milliseconds(500));
684 EXPECT_EQ(output[2].timestamp.boot, 0u);
685 EXPECT_EQ(output[2].timestamp.time, e - chrono::milliseconds(10));
686 EXPECT_EQ(output[3].timestamp.boot, 0u);
687 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(1901));
688 EXPECT_EQ(output[4].timestamp.boot, 0u);
689 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(2000));
Austin Schuhb000de62020-12-03 22:00:40 -0800690}
691
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800692// Tests that messages too far out of order trigger death.
693TEST_F(LogPartsSorterDeathTest, Pull) {
694 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
695 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700696 TestDetachedBufferWriter writer(logfile0_);
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800697 writer.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700698 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800699 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700700 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800701 MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700702 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800703 MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
704 // The following message is too far out of order and will trigger the CHECK.
Austin Schuhd863e6e2022-10-16 15:44:50 -0700705 writer.WriteSizedFlatbuffer(
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800706 MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
707 }
708
709 const std::vector<LogFile> parts = SortParts({logfile0_});
710
711 LogPartsSorter parts_sorter(parts[0].parts[0]);
712
713 // Confirm we aren't sorted until any time until the message is popped.
714 // Peeking shouldn't change the sorted until time.
715 EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
716 std::deque<Message> output;
717
718 ASSERT_TRUE(parts_sorter.Front() != nullptr);
719 parts_sorter.PopFront();
720 ASSERT_TRUE(parts_sorter.Front() != nullptr);
721 ASSERT_TRUE(parts_sorter.Front() != nullptr);
722 parts_sorter.PopFront();
723
Austin Schuh58646e22021-08-23 23:51:46 -0700724 EXPECT_DEATH({ parts_sorter.Front(); },
725 "Max out of order of 100000000ns exceeded.");
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800726}
727
Austin Schuh8f52ed52020-11-30 23:12:39 -0800728// Tests that we can merge data from 2 separate files, including duplicate data.
729TEST_F(NodeMergerTest, TwoFileMerger) {
730 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
731 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700732 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800733 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700734 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800735 writer1.QueueSpan(config1_.span());
736
Austin Schuhd863e6e2022-10-16 15:44:50 -0700737 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800738 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700739 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800740 MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
741
Austin Schuhd863e6e2022-10-16 15:44:50 -0700742 writer0.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800743 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700744 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800745 MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
746
747 // Make a duplicate!
748 SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
749 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
750 writer0.QueueSpan(msg.span());
751 writer1.QueueSpan(msg.span());
752
Austin Schuhd863e6e2022-10-16 15:44:50 -0700753 writer1.WriteSizedFlatbuffer(
Austin Schuh8f52ed52020-11-30 23:12:39 -0800754 MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
755 }
756
757 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
Austin Schuhd2f96102020-12-01 20:27:29 -0800758 ASSERT_EQ(parts.size(), 1u);
Austin Schuh8f52ed52020-11-30 23:12:39 -0800759
Austin Schuhd2f96102020-12-01 20:27:29 -0800760 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800761
762 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
763
764 std::deque<Message> output;
765
766 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
767 ASSERT_TRUE(merger.Front() != nullptr);
768 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
769
770 output.emplace_back(std::move(*merger.Front()));
771 merger.PopFront();
772 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
773
774 ASSERT_TRUE(merger.Front() != nullptr);
775 output.emplace_back(std::move(*merger.Front()));
776 merger.PopFront();
777 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
778
779 ASSERT_TRUE(merger.Front() != nullptr);
780 output.emplace_back(std::move(*merger.Front()));
781 merger.PopFront();
782 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
783
784 ASSERT_TRUE(merger.Front() != nullptr);
785 output.emplace_back(std::move(*merger.Front()));
786 merger.PopFront();
787 EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
788
789 ASSERT_TRUE(merger.Front() != nullptr);
790 output.emplace_back(std::move(*merger.Front()));
791 merger.PopFront();
792 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
793
794 ASSERT_TRUE(merger.Front() != nullptr);
795 output.emplace_back(std::move(*merger.Front()));
796 merger.PopFront();
797 EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
798
799 ASSERT_TRUE(merger.Front() == nullptr);
800
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700801 EXPECT_EQ(output[0].timestamp.boot, 0u);
802 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
803 EXPECT_EQ(output[1].timestamp.boot, 0u);
804 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(1001));
805 EXPECT_EQ(output[2].timestamp.boot, 0u);
806 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(1002));
807 EXPECT_EQ(output[3].timestamp.boot, 0u);
808 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(2000));
809 EXPECT_EQ(output[4].timestamp.boot, 0u);
810 EXPECT_EQ(output[4].timestamp.time, e + chrono::milliseconds(3000));
811 EXPECT_EQ(output[5].timestamp.boot, 0u);
812 EXPECT_EQ(output[5].timestamp.time, e + chrono::milliseconds(3002));
Austin Schuh8f52ed52020-11-30 23:12:39 -0800813}
814
Austin Schuh8bf1e632021-01-02 22:41:04 -0800815// Tests that we can merge timestamps with various combinations of
816// monotonic_timestamp_time.
817TEST_F(NodeMergerTest, TwoFileTimestampMerger) {
818 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
819 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700820 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800821 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700822 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800823 writer1.QueueSpan(config1_.span());
824
825 // Neither has it.
826 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700827 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800828 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700829 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800830 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
831
832 // First only has it.
833 MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700834 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800835 e + chrono::milliseconds(1001), 0, chrono::seconds(100),
836 e + chrono::nanoseconds(971)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700837 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800838 e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
839
840 // Second only has it.
841 MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700842 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800843 e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700844 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800845 e + chrono::milliseconds(1002), 0, chrono::seconds(100),
846 e + chrono::nanoseconds(972)));
847
848 // Both have it.
849 MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
Austin Schuhd863e6e2022-10-16 15:44:50 -0700850 writer0.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800851 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
852 e + chrono::nanoseconds(973)));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700853 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -0800854 e + chrono::milliseconds(1003), 0, chrono::seconds(100),
855 e + chrono::nanoseconds(973)));
856 }
857
858 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
859 ASSERT_EQ(parts.size(), 1u);
860
861 NodeMerger merger(FilterPartsForNode(parts, "pi1"));
862
863 EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
864
865 std::deque<Message> output;
866
867 for (int i = 0; i < 4; ++i) {
868 ASSERT_TRUE(merger.Front() != nullptr);
869 output.emplace_back(std::move(*merger.Front()));
870 merger.PopFront();
871 }
872 ASSERT_TRUE(merger.Front() == nullptr);
873
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700874 EXPECT_EQ(output[0].timestamp.boot, 0u);
875 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700876 EXPECT_FALSE(output[0].data->has_monotonic_timestamp_time);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700877
878 EXPECT_EQ(output[1].timestamp.boot, 0u);
879 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700880 EXPECT_TRUE(output[1].data->has_monotonic_timestamp_time);
881 EXPECT_EQ(output[1].data->monotonic_timestamp_time,
882 monotonic_clock::time_point(std::chrono::nanoseconds(971)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700883
884 EXPECT_EQ(output[2].timestamp.boot, 0u);
885 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700886 EXPECT_TRUE(output[2].data->has_monotonic_timestamp_time);
887 EXPECT_EQ(output[2].data->monotonic_timestamp_time,
888 monotonic_clock::time_point(std::chrono::nanoseconds(972)));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700889
890 EXPECT_EQ(output[3].timestamp.boot, 0u);
891 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700892 EXPECT_TRUE(output[3].data->has_monotonic_timestamp_time);
893 EXPECT_EQ(output[3].data->monotonic_timestamp_time,
894 monotonic_clock::time_point(std::chrono::nanoseconds(973)));
Austin Schuh8bf1e632021-01-02 22:41:04 -0800895}
896
Austin Schuhd2f96102020-12-01 20:27:29 -0800897// Tests that we can match timestamps on delivered messages.
898TEST_F(TimestampMapperTest, ReadNode0First) {
899 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
900 {
Austin Schuhd863e6e2022-10-16 15:44:50 -0700901 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800902 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -0700903 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -0800904 writer1.QueueSpan(config2_.span());
905
Austin Schuhd863e6e2022-10-16 15:44:50 -0700906 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800907 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700908 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800909 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
910
Austin Schuhd863e6e2022-10-16 15:44:50 -0700911 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800912 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700913 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800914 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
915
Austin Schuhd863e6e2022-10-16 15:44:50 -0700916 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -0800917 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -0700918 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -0800919 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
920 }
921
922 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
923
924 ASSERT_EQ(parts[0].logger_node, "pi1");
925 ASSERT_EQ(parts[1].logger_node, "pi2");
926
Austin Schuh79b30942021-01-24 22:32:21 -0800927 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800928 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -0800929 mapper0.set_timestamp_callback(
930 [&](TimestampedMessage *) { ++mapper0_count; });
931 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -0800932 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -0800933 mapper1.set_timestamp_callback(
934 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -0800935
936 mapper0.AddPeer(&mapper1);
937 mapper1.AddPeer(&mapper0);
938
939 {
940 std::deque<TimestampedMessage> output0;
941
Austin Schuh79b30942021-01-24 22:32:21 -0800942 EXPECT_EQ(mapper0_count, 0u);
943 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800944 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800945 EXPECT_EQ(mapper0_count, 1u);
946 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800947 output0.emplace_back(std::move(*mapper0.Front()));
948 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700949 EXPECT_TRUE(mapper0.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800950 EXPECT_EQ(mapper0_count, 1u);
951 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800952
953 ASSERT_TRUE(mapper0.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800954 EXPECT_EQ(mapper0_count, 2u);
955 EXPECT_EQ(mapper1_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800956 output0.emplace_back(std::move(*mapper0.Front()));
957 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700958 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800959
960 ASSERT_TRUE(mapper0.Front() != nullptr);
961 output0.emplace_back(std::move(*mapper0.Front()));
962 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700963 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -0800964
Austin Schuh79b30942021-01-24 22:32:21 -0800965 EXPECT_EQ(mapper0_count, 3u);
966 EXPECT_EQ(mapper1_count, 0u);
967
Austin Schuhd2f96102020-12-01 20:27:29 -0800968 ASSERT_TRUE(mapper0.Front() == nullptr);
969
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700970 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
971 EXPECT_EQ(output0[0].monotonic_event_time.time,
972 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700973 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700974
975 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
976 EXPECT_EQ(output0[1].monotonic_event_time.time,
977 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700978 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700979
980 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
981 EXPECT_EQ(output0[2].monotonic_event_time.time,
982 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700983 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -0800984 }
985
986 {
987 SCOPED_TRACE("Trying node1 now");
988 std::deque<TimestampedMessage> output1;
989
Austin Schuh79b30942021-01-24 22:32:21 -0800990 EXPECT_EQ(mapper0_count, 3u);
991 EXPECT_EQ(mapper1_count, 0u);
992
Austin Schuhd2f96102020-12-01 20:27:29 -0800993 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -0800994 EXPECT_EQ(mapper0_count, 3u);
995 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -0800996 output1.emplace_back(std::move(*mapper1.Front()));
997 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -0700998 EXPECT_TRUE(mapper1.started());
Austin Schuh79b30942021-01-24 22:32:21 -0800999 EXPECT_EQ(mapper0_count, 3u);
1000 EXPECT_EQ(mapper1_count, 1u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001001
1002 ASSERT_TRUE(mapper1.Front() != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001003 EXPECT_EQ(mapper0_count, 3u);
1004 EXPECT_EQ(mapper1_count, 2u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001005 output1.emplace_back(std::move(*mapper1.Front()));
1006 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001007 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001008
1009 ASSERT_TRUE(mapper1.Front() != nullptr);
1010 output1.emplace_back(std::move(*mapper1.Front()));
1011 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001012 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001013
Austin Schuh79b30942021-01-24 22:32:21 -08001014 EXPECT_EQ(mapper0_count, 3u);
1015 EXPECT_EQ(mapper1_count, 3u);
1016
Austin Schuhd2f96102020-12-01 20:27:29 -08001017 ASSERT_TRUE(mapper1.Front() == nullptr);
1018
Austin Schuh79b30942021-01-24 22:32:21 -08001019 EXPECT_EQ(mapper0_count, 3u);
1020 EXPECT_EQ(mapper1_count, 3u);
1021
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001022 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1023 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001024 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001025 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001026
1027 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1028 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001029 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001030 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001031
1032 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1033 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001034 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001035 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001036 }
1037}
1038
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07001039// Tests that we filter messages using the channel filter callback
1040TEST_F(TimestampMapperTest, ReplayChannelsCallbackTest) {
1041 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1042 {
1043 TestDetachedBufferWriter writer0(logfile0_);
1044 writer0.QueueSpan(config0_.span());
1045 TestDetachedBufferWriter writer1(logfile1_);
1046 writer1.QueueSpan(config2_.span());
1047
1048 writer0.WriteSizedFlatbuffer(
1049 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
1050 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1051 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1052
1053 writer0.WriteSizedFlatbuffer(
1054 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
1055 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1056 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1057
1058 writer0.WriteSizedFlatbuffer(
1059 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
1060 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
1061 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1062 }
1063
1064 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1065
1066 ASSERT_EQ(parts[0].logger_node, "pi1");
1067 ASSERT_EQ(parts[1].logger_node, "pi2");
1068
1069 // mapper0 will not provide any messages while mapper1 will provide all
1070 // messages due to the channel filter callbacks used
1071 size_t mapper0_count = 0;
1072 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1073 mapper0.set_timestamp_callback(
1074 [&](TimestampedMessage *) { ++mapper0_count; });
1075 mapper0.set_replay_channels_callback(
1076 [&](const TimestampedMessage &) -> bool { return mapper0_count != 2; });
1077 size_t mapper1_count = 0;
1078 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1079 mapper1.set_timestamp_callback(
1080 [&](TimestampedMessage *) { ++mapper1_count; });
1081 mapper1.set_replay_channels_callback(
1082 [&](const TimestampedMessage &) -> bool { return mapper1_count != 2; });
1083
1084 mapper0.AddPeer(&mapper1);
1085 mapper1.AddPeer(&mapper0);
1086
1087 {
1088 std::deque<TimestampedMessage> output0;
1089
1090 EXPECT_EQ(mapper0_count, 0u);
1091 EXPECT_EQ(mapper1_count, 0u);
1092
1093 ASSERT_TRUE(mapper0.Front() != nullptr);
1094 EXPECT_EQ(mapper0_count, 1u);
1095 EXPECT_EQ(mapper1_count, 0u);
1096 output0.emplace_back(std::move(*mapper0.Front()));
1097 mapper0.PopFront();
1098
1099 EXPECT_TRUE(mapper0.started());
1100 EXPECT_EQ(mapper0_count, 1u);
1101 EXPECT_EQ(mapper1_count, 0u);
1102
1103 // mapper0_count is now at 3 since the second message is not queued, but
1104 // timestamp_callback needs to be called everytime even if Front() does not
1105 // provide a message due to the replay_channels_callback.
1106 ASSERT_TRUE(mapper0.Front() != nullptr);
1107 EXPECT_EQ(mapper0_count, 3u);
1108 EXPECT_EQ(mapper1_count, 0u);
1109 output0.emplace_back(std::move(*mapper0.Front()));
1110 mapper0.PopFront();
1111
1112 EXPECT_TRUE(mapper0.started());
1113 EXPECT_EQ(mapper0_count, 3u);
1114 EXPECT_EQ(mapper1_count, 0u);
1115
1116 ASSERT_TRUE(mapper0.Front() == nullptr);
1117 EXPECT_TRUE(mapper0.started());
1118
1119 EXPECT_EQ(mapper0_count, 3u);
1120 EXPECT_EQ(mapper1_count, 0u);
1121
1122 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1123 EXPECT_EQ(output0[0].monotonic_event_time.time,
1124 e + chrono::milliseconds(1000));
1125 EXPECT_TRUE(output0[0].data != nullptr);
1126
1127 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1128 EXPECT_EQ(output0[1].monotonic_event_time.time,
1129 e + chrono::milliseconds(3000));
1130 EXPECT_TRUE(output0[1].data != nullptr);
1131 }
1132
1133 {
1134 SCOPED_TRACE("Trying node1 now");
1135 std::deque<TimestampedMessage> output1;
1136
1137 EXPECT_EQ(mapper0_count, 3u);
1138 EXPECT_EQ(mapper1_count, 0u);
1139
1140 ASSERT_TRUE(mapper1.Front() != nullptr);
1141 EXPECT_EQ(mapper0_count, 3u);
1142 EXPECT_EQ(mapper1_count, 1u);
1143 output1.emplace_back(std::move(*mapper1.Front()));
1144 mapper1.PopFront();
1145 EXPECT_TRUE(mapper1.started());
1146 EXPECT_EQ(mapper0_count, 3u);
1147 EXPECT_EQ(mapper1_count, 1u);
1148
1149 // mapper1_count is now at 3 since the second message is not queued, but
1150 // timestamp_callback needs to be called everytime even if Front() does not
1151 // provide a message due to the replay_channels_callback.
1152 ASSERT_TRUE(mapper1.Front() != nullptr);
1153 output1.emplace_back(std::move(*mapper1.Front()));
1154 mapper1.PopFront();
1155 EXPECT_TRUE(mapper1.started());
1156
1157 EXPECT_EQ(mapper0_count, 3u);
1158 EXPECT_EQ(mapper1_count, 3u);
1159
1160 ASSERT_TRUE(mapper1.Front() == nullptr);
1161
1162 EXPECT_EQ(mapper0_count, 3u);
1163 EXPECT_EQ(mapper1_count, 3u);
1164
1165 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1166 EXPECT_EQ(output1[0].monotonic_event_time.time,
1167 e + chrono::seconds(100) + chrono::milliseconds(1000));
1168 EXPECT_TRUE(output1[0].data != nullptr);
1169
1170 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1171 EXPECT_EQ(output1[1].monotonic_event_time.time,
1172 e + chrono::seconds(100) + chrono::milliseconds(3000));
1173 EXPECT_TRUE(output1[1].data != nullptr);
1174 }
1175}
Austin Schuh8bf1e632021-01-02 22:41:04 -08001176// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
1177// returned.
1178TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
1179 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1180 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001181 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001182 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001183 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001184 writer1.QueueSpan(config4_.span());
1185
Austin Schuhd863e6e2022-10-16 15:44:50 -07001186 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001187 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001188 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001189 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
1190 e + chrono::nanoseconds(971)));
1191
Austin Schuhd863e6e2022-10-16 15:44:50 -07001192 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001193 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001194 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001195 e + chrono::milliseconds(2000), 0, chrono::seconds(100),
1196 e + chrono::nanoseconds(5458)));
1197
Austin Schuhd863e6e2022-10-16 15:44:50 -07001198 writer0.WriteSizedFlatbuffer(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001199 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001200 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh8bf1e632021-01-02 22:41:04 -08001201 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1202 }
1203
1204 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1205
1206 for (const auto &p : parts) {
1207 LOG(INFO) << p;
1208 }
1209
1210 ASSERT_EQ(parts.size(), 1u);
1211
Austin Schuh79b30942021-01-24 22:32:21 -08001212 size_t mapper0_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001213 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001214 mapper0.set_timestamp_callback(
1215 [&](TimestampedMessage *) { ++mapper0_count; });
1216 size_t mapper1_count = 0;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001217 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001218 mapper1.set_timestamp_callback(
1219 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh8bf1e632021-01-02 22:41:04 -08001220
1221 mapper0.AddPeer(&mapper1);
1222 mapper1.AddPeer(&mapper0);
1223
1224 {
1225 std::deque<TimestampedMessage> output0;
1226
1227 for (int i = 0; i < 3; ++i) {
1228 ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
1229 output0.emplace_back(std::move(*mapper0.Front()));
1230 mapper0.PopFront();
1231 }
1232
1233 ASSERT_TRUE(mapper0.Front() == nullptr);
1234
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001235 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1236 EXPECT_EQ(output0[0].monotonic_event_time.time,
1237 e + chrono::milliseconds(1000));
1238 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
1239 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
1240 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001241 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001242
1243 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1244 EXPECT_EQ(output0[1].monotonic_event_time.time,
1245 e + chrono::milliseconds(2000));
1246 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
1247 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
1248 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001249 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001250
1251 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1252 EXPECT_EQ(output0[2].monotonic_event_time.time,
1253 e + chrono::milliseconds(3000));
1254 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
1255 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
1256 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001257 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001258 }
1259
1260 {
1261 SCOPED_TRACE("Trying node1 now");
1262 std::deque<TimestampedMessage> output1;
1263
1264 for (int i = 0; i < 3; ++i) {
1265 ASSERT_TRUE(mapper1.Front() != nullptr);
1266 output1.emplace_back(std::move(*mapper1.Front()));
1267 mapper1.PopFront();
1268 }
1269
1270 ASSERT_TRUE(mapper1.Front() == nullptr);
1271
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001272 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1273 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001274 e + chrono::seconds(100) + chrono::milliseconds(1000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001275 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
1276 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001277 e + chrono::nanoseconds(971));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001278 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001279
1280 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1281 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001282 e + chrono::seconds(100) + chrono::milliseconds(2000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001283 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
1284 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001285 e + chrono::nanoseconds(5458));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001286 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001287
1288 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1289 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh8bf1e632021-01-02 22:41:04 -08001290 e + chrono::seconds(100) + chrono::milliseconds(3000));
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001291 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
1292 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
1293 monotonic_clock::min_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001294 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001295 }
Austin Schuh79b30942021-01-24 22:32:21 -08001296
1297 EXPECT_EQ(mapper0_count, 3u);
1298 EXPECT_EQ(mapper1_count, 3u);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001299}
1300
Austin Schuhd2f96102020-12-01 20:27:29 -08001301// Tests that we can match timestamps on delivered messages. By doing this in
1302// the reverse order, the second node needs to queue data up from the first node
1303// to find the matching timestamp.
1304TEST_F(TimestampMapperTest, ReadNode1First) {
1305 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1306 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001307 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001308 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001309 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001310 writer1.QueueSpan(config2_.span());
1311
Austin Schuhd863e6e2022-10-16 15:44:50 -07001312 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001313 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001314 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001315 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1316
Austin Schuhd863e6e2022-10-16 15:44:50 -07001317 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001318 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001319 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001320 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1321
Austin Schuhd863e6e2022-10-16 15:44:50 -07001322 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001323 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001324 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001325 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1326 }
1327
1328 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1329
1330 ASSERT_EQ(parts[0].logger_node, "pi1");
1331 ASSERT_EQ(parts[1].logger_node, "pi2");
1332
Austin Schuh79b30942021-01-24 22:32:21 -08001333 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001334 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001335 mapper0.set_timestamp_callback(
1336 [&](TimestampedMessage *) { ++mapper0_count; });
1337 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001338 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001339 mapper1.set_timestamp_callback(
1340 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001341
1342 mapper0.AddPeer(&mapper1);
1343 mapper1.AddPeer(&mapper0);
1344
1345 {
1346 SCOPED_TRACE("Trying node1 now");
1347 std::deque<TimestampedMessage> output1;
1348
1349 ASSERT_TRUE(mapper1.Front() != nullptr);
1350 output1.emplace_back(std::move(*mapper1.Front()));
1351 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001352 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001353
1354 ASSERT_TRUE(mapper1.Front() != nullptr);
1355 output1.emplace_back(std::move(*mapper1.Front()));
1356 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001357 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001358
1359 ASSERT_TRUE(mapper1.Front() != nullptr);
1360 output1.emplace_back(std::move(*mapper1.Front()));
1361 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001362 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001363
1364 ASSERT_TRUE(mapper1.Front() == nullptr);
1365
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001366 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1367 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001368 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001369 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001370
1371 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1372 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001373 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001374 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001375
1376 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1377 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001378 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001379 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001380 }
1381
1382 {
1383 std::deque<TimestampedMessage> output0;
1384
1385 ASSERT_TRUE(mapper0.Front() != nullptr);
1386 output0.emplace_back(std::move(*mapper0.Front()));
1387 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001388 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001389
1390 ASSERT_TRUE(mapper0.Front() != nullptr);
1391 output0.emplace_back(std::move(*mapper0.Front()));
1392 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001393 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001394
1395 ASSERT_TRUE(mapper0.Front() != nullptr);
1396 output0.emplace_back(std::move(*mapper0.Front()));
1397 mapper0.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001398 EXPECT_TRUE(mapper0.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001399
1400 ASSERT_TRUE(mapper0.Front() == nullptr);
1401
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001402 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1403 EXPECT_EQ(output0[0].monotonic_event_time.time,
1404 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001405 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001406
1407 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1408 EXPECT_EQ(output0[1].monotonic_event_time.time,
1409 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001410 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001411
1412 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1413 EXPECT_EQ(output0[2].monotonic_event_time.time,
1414 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001415 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001416 }
Austin Schuh79b30942021-01-24 22:32:21 -08001417
1418 EXPECT_EQ(mapper0_count, 3u);
1419 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001420}
1421
1422// Tests that we return just the timestamps if we couldn't find the data and the
1423// missing data was at the beginning of the file.
1424TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
1425 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1426 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001427 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001428 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001429 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001430 writer1.QueueSpan(config2_.span());
1431
1432 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001433 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001434 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1435
Austin Schuhd863e6e2022-10-16 15:44:50 -07001436 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001437 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001438 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001439 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1440
Austin Schuhd863e6e2022-10-16 15:44:50 -07001441 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001442 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001443 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001444 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1445 }
1446
1447 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1448
1449 ASSERT_EQ(parts[0].logger_node, "pi1");
1450 ASSERT_EQ(parts[1].logger_node, "pi2");
1451
Austin Schuh79b30942021-01-24 22:32:21 -08001452 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001453 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001454 mapper0.set_timestamp_callback(
1455 [&](TimestampedMessage *) { ++mapper0_count; });
1456 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001457 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001458 mapper1.set_timestamp_callback(
1459 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001460
1461 mapper0.AddPeer(&mapper1);
1462 mapper1.AddPeer(&mapper0);
1463
1464 {
1465 SCOPED_TRACE("Trying node1 now");
1466 std::deque<TimestampedMessage> output1;
1467
1468 ASSERT_TRUE(mapper1.Front() != nullptr);
1469 output1.emplace_back(std::move(*mapper1.Front()));
1470 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001471 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001472
1473 ASSERT_TRUE(mapper1.Front() != nullptr);
1474 output1.emplace_back(std::move(*mapper1.Front()));
1475 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001476 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001477
1478 ASSERT_TRUE(mapper1.Front() != nullptr);
1479 output1.emplace_back(std::move(*mapper1.Front()));
1480 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001481 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001482
1483 ASSERT_TRUE(mapper1.Front() == nullptr);
1484
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001485 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1486 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001487 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001488 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001489
1490 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1491 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001492 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001493 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001494
1495 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1496 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001497 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001498 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001499 }
Austin Schuh79b30942021-01-24 22:32:21 -08001500
1501 EXPECT_EQ(mapper0_count, 0u);
1502 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001503}
1504
1505// Tests that we return just the timestamps if we couldn't find the data and the
1506// missing data was at the end of the file.
1507TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
1508 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1509 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001510 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001511 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001512 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001513 writer1.QueueSpan(config2_.span());
1514
Austin Schuhd863e6e2022-10-16 15:44:50 -07001515 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001516 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001517 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001518 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1519
Austin Schuhd863e6e2022-10-16 15:44:50 -07001520 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001521 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001522 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001523 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1524
1525 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001526 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001527 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1528 }
1529
1530 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1531
1532 ASSERT_EQ(parts[0].logger_node, "pi1");
1533 ASSERT_EQ(parts[1].logger_node, "pi2");
1534
Austin Schuh79b30942021-01-24 22:32:21 -08001535 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001536 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001537 mapper0.set_timestamp_callback(
1538 [&](TimestampedMessage *) { ++mapper0_count; });
1539 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001540 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001541 mapper1.set_timestamp_callback(
1542 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001543
1544 mapper0.AddPeer(&mapper1);
1545 mapper1.AddPeer(&mapper0);
1546
1547 {
1548 SCOPED_TRACE("Trying node1 now");
1549 std::deque<TimestampedMessage> output1;
1550
1551 ASSERT_TRUE(mapper1.Front() != nullptr);
1552 output1.emplace_back(std::move(*mapper1.Front()));
1553 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001554 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001555
1556 ASSERT_TRUE(mapper1.Front() != nullptr);
1557 output1.emplace_back(std::move(*mapper1.Front()));
1558 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001559 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001560
1561 ASSERT_TRUE(mapper1.Front() != nullptr);
1562 output1.emplace_back(std::move(*mapper1.Front()));
1563 mapper1.PopFront();
Austin Schuh24bf4972021-06-29 22:09:08 -07001564 EXPECT_TRUE(mapper1.started());
Austin Schuhd2f96102020-12-01 20:27:29 -08001565
1566 ASSERT_TRUE(mapper1.Front() == nullptr);
1567
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001568 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1569 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001570 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001571 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001572
1573 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1574 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001575 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001576 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001577
1578 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1579 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001580 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001581 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001582 }
Austin Schuh79b30942021-01-24 22:32:21 -08001583
1584 EXPECT_EQ(mapper0_count, 0u);
1585 EXPECT_EQ(mapper1_count, 3u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001586}
1587
Austin Schuh993ccb52020-12-12 15:59:32 -08001588// Tests that we handle a message which failed to forward or be logged.
1589TEST_F(TimestampMapperTest, ReadMissingDataMiddle) {
1590 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1591 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001592 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001593 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001594 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh993ccb52020-12-12 15:59:32 -08001595 writer1.QueueSpan(config2_.span());
1596
Austin Schuhd863e6e2022-10-16 15:44:50 -07001597 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001598 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001599 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001600 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1601
1602 // Create both the timestamp and message, but don't log them, simulating a
1603 // forwarding drop.
1604 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006);
1605 MakeTimestampMessage(e + chrono::milliseconds(2000), 0,
1606 chrono::seconds(100));
1607
Austin Schuhd863e6e2022-10-16 15:44:50 -07001608 writer0.WriteSizedFlatbuffer(
Austin Schuh993ccb52020-12-12 15:59:32 -08001609 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001610 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh993ccb52020-12-12 15:59:32 -08001611 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1612 }
1613
1614 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1615
1616 ASSERT_EQ(parts[0].logger_node, "pi1");
1617 ASSERT_EQ(parts[1].logger_node, "pi2");
1618
Austin Schuh79b30942021-01-24 22:32:21 -08001619 size_t mapper0_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001620 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001621 mapper0.set_timestamp_callback(
1622 [&](TimestampedMessage *) { ++mapper0_count; });
1623 size_t mapper1_count = 0;
Austin Schuh993ccb52020-12-12 15:59:32 -08001624 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001625 mapper1.set_timestamp_callback(
1626 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuh993ccb52020-12-12 15:59:32 -08001627
1628 mapper0.AddPeer(&mapper1);
1629 mapper1.AddPeer(&mapper0);
1630
1631 {
1632 std::deque<TimestampedMessage> output1;
1633
1634 ASSERT_TRUE(mapper1.Front() != nullptr);
1635 output1.emplace_back(std::move(*mapper1.Front()));
1636 mapper1.PopFront();
1637
1638 ASSERT_TRUE(mapper1.Front() != nullptr);
1639 output1.emplace_back(std::move(*mapper1.Front()));
1640
1641 ASSERT_FALSE(mapper1.Front() == nullptr);
1642
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001643 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1644 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001645 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001646 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001647
1648 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1649 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001650 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001651 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh993ccb52020-12-12 15:59:32 -08001652 }
Austin Schuh79b30942021-01-24 22:32:21 -08001653
1654 EXPECT_EQ(mapper0_count, 0u);
1655 EXPECT_EQ(mapper1_count, 2u);
Austin Schuh993ccb52020-12-12 15:59:32 -08001656}
1657
Austin Schuhd2f96102020-12-01 20:27:29 -08001658// Tests that we properly sort log files with duplicate timestamps.
1659TEST_F(TimestampMapperTest, ReadSameTimestamp) {
1660 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1661 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001662 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001663 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001664 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001665 writer1.QueueSpan(config2_.span());
1666
Austin Schuhd863e6e2022-10-16 15:44:50 -07001667 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001668 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001669 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001670 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1671
Austin Schuhd863e6e2022-10-16 15:44:50 -07001672 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001673 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001674 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001675 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1676
Austin Schuhd863e6e2022-10-16 15:44:50 -07001677 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001678 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001679 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001680 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1681
Austin Schuhd863e6e2022-10-16 15:44:50 -07001682 writer0.WriteSizedFlatbuffer(
Austin Schuhd2f96102020-12-01 20:27:29 -08001683 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001684 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhd2f96102020-12-01 20:27:29 -08001685 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1686 }
1687
1688 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1689
1690 ASSERT_EQ(parts[0].logger_node, "pi1");
1691 ASSERT_EQ(parts[1].logger_node, "pi2");
1692
Austin Schuh79b30942021-01-24 22:32:21 -08001693 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001694 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001695 mapper0.set_timestamp_callback(
1696 [&](TimestampedMessage *) { ++mapper0_count; });
1697 size_t mapper1_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001698 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001699 mapper1.set_timestamp_callback(
1700 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001701
1702 mapper0.AddPeer(&mapper1);
1703 mapper1.AddPeer(&mapper0);
1704
1705 {
1706 SCOPED_TRACE("Trying node1 now");
1707 std::deque<TimestampedMessage> output1;
1708
1709 for (int i = 0; i < 4; ++i) {
1710 ASSERT_TRUE(mapper1.Front() != nullptr);
1711 output1.emplace_back(std::move(*mapper1.Front()));
1712 mapper1.PopFront();
1713 }
1714 ASSERT_TRUE(mapper1.Front() == nullptr);
1715
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001716 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1717 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001718 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001719 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001720
1721 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1722 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001723 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001724 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001725
1726 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1727 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001728 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001729 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001730
1731 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
1732 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuhd2f96102020-12-01 20:27:29 -08001733 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001734 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuhd2f96102020-12-01 20:27:29 -08001735 }
Austin Schuh79b30942021-01-24 22:32:21 -08001736
1737 EXPECT_EQ(mapper0_count, 0u);
1738 EXPECT_EQ(mapper1_count, 4u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001739}
1740
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001741// Tests that we properly produce a valid start time.
Austin Schuhd2f96102020-12-01 20:27:29 -08001742TEST_F(TimestampMapperTest, StartTime) {
1743 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1744 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001745 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001746 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001747 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001748 writer1.QueueSpan(config1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001749 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuhd2f96102020-12-01 20:27:29 -08001750 writer2.QueueSpan(config3_.span());
1751 }
1752
1753 const std::vector<LogFile> parts =
1754 SortParts({logfile0_, logfile1_, logfile2_});
1755
Austin Schuh79b30942021-01-24 22:32:21 -08001756 size_t mapper0_count = 0;
Austin Schuhd2f96102020-12-01 20:27:29 -08001757 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
Austin Schuh79b30942021-01-24 22:32:21 -08001758 mapper0.set_timestamp_callback(
1759 [&](TimestampedMessage *) { ++mapper0_count; });
Austin Schuhd2f96102020-12-01 20:27:29 -08001760
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001761 EXPECT_EQ(mapper0.monotonic_start_time(0), e + chrono::milliseconds(1));
1762 EXPECT_EQ(mapper0.realtime_start_time(0),
Austin Schuhd2f96102020-12-01 20:27:29 -08001763 realtime_clock::time_point(chrono::seconds(1000)));
Austin Schuh79b30942021-01-24 22:32:21 -08001764 EXPECT_EQ(mapper0_count, 0u);
Austin Schuhd2f96102020-12-01 20:27:29 -08001765}
1766
Austin Schuhfecf1d82020-12-19 16:57:28 -08001767// Tests that when a peer isn't registered, we treat that as if there was no
1768// data available.
1769TEST_F(TimestampMapperTest, NoPeer) {
1770 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1771 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001772 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001773 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001774 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001775 writer1.QueueSpan(config2_.span());
1776
1777 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
Austin Schuhd863e6e2022-10-16 15:44:50 -07001778 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001779 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1780
Austin Schuhd863e6e2022-10-16 15:44:50 -07001781 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001782 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001783 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001784 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1785
Austin Schuhd863e6e2022-10-16 15:44:50 -07001786 writer0.WriteSizedFlatbuffer(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001787 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001788 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuhfecf1d82020-12-19 16:57:28 -08001789 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1790 }
1791
1792 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1793
1794 ASSERT_EQ(parts[0].logger_node, "pi1");
1795 ASSERT_EQ(parts[1].logger_node, "pi2");
1796
Austin Schuh79b30942021-01-24 22:32:21 -08001797 size_t mapper1_count = 0;
Austin Schuhfecf1d82020-12-19 16:57:28 -08001798 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
Austin Schuh79b30942021-01-24 22:32:21 -08001799 mapper1.set_timestamp_callback(
1800 [&](TimestampedMessage *) { ++mapper1_count; });
Austin Schuhfecf1d82020-12-19 16:57:28 -08001801
1802 {
1803 std::deque<TimestampedMessage> output1;
1804
1805 ASSERT_TRUE(mapper1.Front() != nullptr);
1806 output1.emplace_back(std::move(*mapper1.Front()));
1807 mapper1.PopFront();
1808 ASSERT_TRUE(mapper1.Front() != nullptr);
1809 output1.emplace_back(std::move(*mapper1.Front()));
1810 mapper1.PopFront();
1811 ASSERT_TRUE(mapper1.Front() != nullptr);
1812 output1.emplace_back(std::move(*mapper1.Front()));
1813 mapper1.PopFront();
1814 ASSERT_TRUE(mapper1.Front() == nullptr);
1815
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001816 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1817 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001818 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001819 EXPECT_FALSE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001820
1821 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1822 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001823 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001824 EXPECT_FALSE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001825
1826 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
1827 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuhfecf1d82020-12-19 16:57:28 -08001828 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001829 EXPECT_FALSE(output1[2].data != nullptr);
Austin Schuhfecf1d82020-12-19 16:57:28 -08001830 }
Austin Schuh79b30942021-01-24 22:32:21 -08001831 EXPECT_EQ(mapper1_count, 3u);
1832}
1833
1834// Tests that we can queue messages and call the timestamp callback for both
1835// nodes.
1836TEST_F(TimestampMapperTest, QueueUntilNode0) {
1837 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
1838 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07001839 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh79b30942021-01-24 22:32:21 -08001840 writer0.QueueSpan(config0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07001841 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh79b30942021-01-24 22:32:21 -08001842 writer1.QueueSpan(config2_.span());
1843
Austin Schuhd863e6e2022-10-16 15:44:50 -07001844 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001845 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001846 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001847 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1848
Austin Schuhd863e6e2022-10-16 15:44:50 -07001849 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001850 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001851 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001852 e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
1853
Austin Schuhd863e6e2022-10-16 15:44:50 -07001854 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001855 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001856 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001857 e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
1858
Austin Schuhd863e6e2022-10-16 15:44:50 -07001859 writer0.WriteSizedFlatbuffer(
Austin Schuh79b30942021-01-24 22:32:21 -08001860 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
Austin Schuhd863e6e2022-10-16 15:44:50 -07001861 writer1.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh79b30942021-01-24 22:32:21 -08001862 e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
1863 }
1864
1865 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
1866
1867 ASSERT_EQ(parts[0].logger_node, "pi1");
1868 ASSERT_EQ(parts[1].logger_node, "pi2");
1869
1870 size_t mapper0_count = 0;
1871 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
1872 mapper0.set_timestamp_callback(
1873 [&](TimestampedMessage *) { ++mapper0_count; });
1874 size_t mapper1_count = 0;
1875 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
1876 mapper1.set_timestamp_callback(
1877 [&](TimestampedMessage *) { ++mapper1_count; });
1878
1879 mapper0.AddPeer(&mapper1);
1880 mapper1.AddPeer(&mapper0);
1881
1882 {
1883 std::deque<TimestampedMessage> output0;
1884
1885 EXPECT_EQ(mapper0_count, 0u);
1886 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001887 mapper0.QueueUntil(
1888 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001889 EXPECT_EQ(mapper0_count, 3u);
1890 EXPECT_EQ(mapper1_count, 0u);
1891
1892 ASSERT_TRUE(mapper0.Front() != nullptr);
1893 EXPECT_EQ(mapper0_count, 3u);
1894 EXPECT_EQ(mapper1_count, 0u);
1895
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001896 mapper0.QueueUntil(
1897 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001898 EXPECT_EQ(mapper0_count, 3u);
1899 EXPECT_EQ(mapper1_count, 0u);
1900
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001901 mapper0.QueueUntil(
1902 BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001903 EXPECT_EQ(mapper0_count, 4u);
1904 EXPECT_EQ(mapper1_count, 0u);
1905
1906 output0.emplace_back(std::move(*mapper0.Front()));
1907 mapper0.PopFront();
1908 output0.emplace_back(std::move(*mapper0.Front()));
1909 mapper0.PopFront();
1910 output0.emplace_back(std::move(*mapper0.Front()));
1911 mapper0.PopFront();
1912 output0.emplace_back(std::move(*mapper0.Front()));
1913 mapper0.PopFront();
1914
1915 EXPECT_EQ(mapper0_count, 4u);
1916 EXPECT_EQ(mapper1_count, 0u);
1917
1918 ASSERT_TRUE(mapper0.Front() == nullptr);
1919
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001920 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
1921 EXPECT_EQ(output0[0].monotonic_event_time.time,
1922 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001923 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001924
1925 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
1926 EXPECT_EQ(output0[1].monotonic_event_time.time,
1927 e + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001928 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001929
1930 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
1931 EXPECT_EQ(output0[2].monotonic_event_time.time,
1932 e + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001933 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001934
1935 EXPECT_EQ(output0[3].monotonic_event_time.boot, 0u);
1936 EXPECT_EQ(output0[3].monotonic_event_time.time,
1937 e + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001938 EXPECT_TRUE(output0[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08001939 }
1940
1941 {
1942 SCOPED_TRACE("Trying node1 now");
1943 std::deque<TimestampedMessage> output1;
1944
1945 EXPECT_EQ(mapper0_count, 4u);
1946 EXPECT_EQ(mapper1_count, 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001947 mapper1.QueueUntil(BootTimestamp{
1948 .boot = 0,
1949 .time = e + chrono::seconds(100) + chrono::milliseconds(1000)});
Austin Schuh79b30942021-01-24 22:32:21 -08001950 EXPECT_EQ(mapper0_count, 4u);
1951 EXPECT_EQ(mapper1_count, 3u);
1952
1953 ASSERT_TRUE(mapper1.Front() != nullptr);
1954 EXPECT_EQ(mapper0_count, 4u);
1955 EXPECT_EQ(mapper1_count, 3u);
1956
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001957 mapper1.QueueUntil(BootTimestamp{
1958 .boot = 0,
1959 .time = e + chrono::seconds(100) + chrono::milliseconds(1500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001960 EXPECT_EQ(mapper0_count, 4u);
1961 EXPECT_EQ(mapper1_count, 3u);
1962
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001963 mapper1.QueueUntil(BootTimestamp{
1964 .boot = 0,
1965 .time = e + chrono::seconds(100) + chrono::milliseconds(2500)});
Austin Schuh79b30942021-01-24 22:32:21 -08001966 EXPECT_EQ(mapper0_count, 4u);
1967 EXPECT_EQ(mapper1_count, 4u);
1968
1969 ASSERT_TRUE(mapper1.Front() != nullptr);
1970 EXPECT_EQ(mapper0_count, 4u);
1971 EXPECT_EQ(mapper1_count, 4u);
1972
1973 output1.emplace_back(std::move(*mapper1.Front()));
1974 mapper1.PopFront();
1975 ASSERT_TRUE(mapper1.Front() != nullptr);
1976 output1.emplace_back(std::move(*mapper1.Front()));
1977 mapper1.PopFront();
1978 ASSERT_TRUE(mapper1.Front() != nullptr);
1979 output1.emplace_back(std::move(*mapper1.Front()));
1980 mapper1.PopFront();
1981 ASSERT_TRUE(mapper1.Front() != nullptr);
1982 output1.emplace_back(std::move(*mapper1.Front()));
1983 mapper1.PopFront();
1984
1985 EXPECT_EQ(mapper0_count, 4u);
1986 EXPECT_EQ(mapper1_count, 4u);
1987
1988 ASSERT_TRUE(mapper1.Front() == nullptr);
1989
1990 EXPECT_EQ(mapper0_count, 4u);
1991 EXPECT_EQ(mapper1_count, 4u);
1992
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001993 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
1994 EXPECT_EQ(output1[0].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08001995 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001996 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001997
1998 EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
1999 EXPECT_EQ(output1[1].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002000 e + chrono::seconds(100) + chrono::milliseconds(1000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002001 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002002
2003 EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
2004 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002005 e + chrono::seconds(100) + chrono::milliseconds(2000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002006 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002007
2008 EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
2009 EXPECT_EQ(output1[3].monotonic_event_time.time,
Austin Schuh79b30942021-01-24 22:32:21 -08002010 e + chrono::seconds(100) + chrono::milliseconds(3000));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002011 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh79b30942021-01-24 22:32:21 -08002012 }
Austin Schuhfecf1d82020-12-19 16:57:28 -08002013}
2014
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002015class BootMergerTest : public SortingElementTest {
2016 public:
2017 BootMergerTest()
2018 : SortingElementTest(),
2019 boot0_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002020 /* 100ms */
2021 "max_out_of_order_duration": 100000000,
2022 "node": {
2023 "name": "pi2"
2024 },
2025 "logger_node": {
2026 "name": "pi1"
2027 },
2028 "monotonic_start_time": 1000000,
2029 "realtime_start_time": 1000000000000,
2030 "logger_monotonic_start_time": 1000000,
2031 "logger_realtime_start_time": 1000000000000,
2032 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2033 "parts_uuid": "1a9e5ca2-31b2-475b-8282-88f6d1ce5109",
2034 "parts_index": 0,
2035 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2036 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002037 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2038 "boot_uuids": [
2039 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2040 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2041 ""
2042 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002043})")),
2044 boot1_(MakeHeader(config_, R"({
Austin Schuh8bee6882021-06-28 21:03:28 -07002045 /* 100ms */
2046 "max_out_of_order_duration": 100000000,
2047 "node": {
2048 "name": "pi2"
2049 },
2050 "logger_node": {
2051 "name": "pi1"
2052 },
2053 "monotonic_start_time": 1000000,
2054 "realtime_start_time": 1000000000000,
2055 "logger_monotonic_start_time": 1000000,
2056 "logger_realtime_start_time": 1000000000000,
2057 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2058 "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
2059 "parts_index": 1,
2060 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2061 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
Austin Schuh48507722021-07-17 17:29:24 -07002062 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2063 "boot_uuids": [
2064 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2065 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2066 ""
2067 ]
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002068})")) {}
Austin Schuh8bee6882021-06-28 21:03:28 -07002069
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002070 protected:
2071 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0_;
2072 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1_;
2073};
2074
2075// This tests that we can properly sort a multi-node log file which has the old
2076// (and buggy) timestamps in the header, and the non-resetting parts_index.
2077// These make it so we can just bairly figure out what happened first and what
2078// happened second, but not in a way that is robust to multiple nodes rebooting.
2079TEST_F(BootMergerTest, OldReboot) {
Austin Schuh8bee6882021-06-28 21:03:28 -07002080 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002081 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002082 writer.QueueSpan(boot0_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002083 }
2084 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002085 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002086 writer.QueueSpan(boot1_.span());
Austin Schuh8bee6882021-06-28 21:03:28 -07002087 }
2088
2089 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
2090
2091 ASSERT_EQ(parts.size(), 1u);
2092 ASSERT_EQ(parts[0].parts.size(), 2u);
2093
2094 EXPECT_EQ(parts[0].parts[0].boot_count, 0);
2095 EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002096 boot0_.message().source_node_boot_uuid()->string_view());
Austin Schuh8bee6882021-06-28 21:03:28 -07002097
2098 EXPECT_EQ(parts[0].parts[1].boot_count, 1);
2099 EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002100 boot1_.message().source_node_boot_uuid()->string_view());
2101}
2102
2103// This tests that we can produce messages ordered across a reboot.
2104TEST_F(BootMergerTest, SortAcrossReboot) {
2105 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2106 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002107 TestDetachedBufferWriter writer(logfile0_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002108 writer.QueueSpan(boot0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002109 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002110 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002111 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002112 MakeLogMessage(e + chrono::milliseconds(2000), 1, 0x105));
2113 }
2114 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002115 TestDetachedBufferWriter writer(logfile1_);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002116 writer.QueueSpan(boot1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002117 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002118 MakeLogMessage(e + chrono::milliseconds(100), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002119 writer.WriteSizedFlatbuffer(
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002120 MakeLogMessage(e + chrono::milliseconds(200), 1, 0x106));
2121 }
2122
2123 const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
2124 ASSERT_EQ(parts.size(), 1u);
2125 ASSERT_EQ(parts[0].parts.size(), 2u);
2126
2127 BootMerger merger(FilterPartsForNode(parts, "pi2"));
2128
2129 EXPECT_EQ(merger.node(), 1u);
2130
2131 std::vector<Message> output;
2132 for (int i = 0; i < 4; ++i) {
2133 ASSERT_TRUE(merger.Front() != nullptr);
2134 output.emplace_back(std::move(*merger.Front()));
2135 merger.PopFront();
2136 }
2137
2138 ASSERT_TRUE(merger.Front() == nullptr);
2139
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002140 EXPECT_EQ(output[0].timestamp.boot, 0u);
2141 EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
2142 EXPECT_EQ(output[1].timestamp.boot, 0u);
2143 EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(2000));
2144
2145 EXPECT_EQ(output[2].timestamp.boot, 1u);
2146 EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(100));
2147 EXPECT_EQ(output[3].timestamp.boot, 1u);
2148 EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
Austin Schuh8bee6882021-06-28 21:03:28 -07002149}
2150
Austin Schuh48507722021-07-17 17:29:24 -07002151class RebootTimestampMapperTest : public SortingElementTest {
2152 public:
2153 RebootTimestampMapperTest()
2154 : SortingElementTest(),
2155 boot0a_(MakeHeader(config_, R"({
2156 /* 100ms */
2157 "max_out_of_order_duration": 100000000,
2158 "node": {
2159 "name": "pi1"
2160 },
2161 "logger_node": {
2162 "name": "pi1"
2163 },
2164 "monotonic_start_time": 1000000,
2165 "realtime_start_time": 1000000000000,
2166 "logger_monotonic_start_time": 1000000,
2167 "logger_realtime_start_time": 1000000000000,
2168 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2169 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2170 "parts_index": 0,
2171 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2172 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2173 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2174 "boot_uuids": [
2175 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2176 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2177 ""
2178 ]
2179})")),
2180 boot0b_(MakeHeader(config_, R"({
2181 /* 100ms */
2182 "max_out_of_order_duration": 100000000,
2183 "node": {
2184 "name": "pi1"
2185 },
2186 "logger_node": {
2187 "name": "pi1"
2188 },
2189 "monotonic_start_time": 1000000,
2190 "realtime_start_time": 1000000000000,
2191 "logger_monotonic_start_time": 1000000,
2192 "logger_realtime_start_time": 1000000000000,
2193 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2194 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2195 "parts_index": 1,
2196 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2197 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2198 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2199 "boot_uuids": [
2200 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2201 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2202 ""
2203 ]
2204})")),
2205 boot1a_(MakeHeader(config_, R"({
2206 /* 100ms */
2207 "max_out_of_order_duration": 100000000,
2208 "node": {
2209 "name": "pi2"
2210 },
2211 "logger_node": {
2212 "name": "pi1"
2213 },
2214 "monotonic_start_time": 1000000,
2215 "realtime_start_time": 1000000000000,
2216 "logger_monotonic_start_time": 1000000,
2217 "logger_realtime_start_time": 1000000000000,
2218 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2219 "parts_uuid": "f6ab0cdc-a654-456d-bfd9-2bbc09098edf",
2220 "parts_index": 0,
2221 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2222 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2223 "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2224 "boot_uuids": [
2225 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2226 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2227 ""
2228 ]
2229})")),
2230 boot1b_(MakeHeader(config_, R"({
2231 /* 100ms */
2232 "max_out_of_order_duration": 100000000,
2233 "node": {
2234 "name": "pi2"
2235 },
2236 "logger_node": {
2237 "name": "pi1"
2238 },
2239 "monotonic_start_time": 1000000,
2240 "realtime_start_time": 1000000000000,
2241 "logger_monotonic_start_time": 1000000,
2242 "logger_realtime_start_time": 1000000000000,
2243 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2244 "parts_uuid": "3a9d0445-f520-43ca-93f5-e2cc7f54d40a",
2245 "parts_index": 1,
2246 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2247 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2248 "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2249 "boot_uuids": [
2250 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2251 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2252 ""
2253 ]
2254})")) {}
2255
2256 protected:
2257 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0a_;
2258 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0b_;
2259 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1a_;
2260 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
2261};
2262
Austin Schuh48507722021-07-17 17:29:24 -07002263// Tests that we can match timestamps on delivered messages in the presence of
2264// reboots on the node receiving timestamps.
2265TEST_F(RebootTimestampMapperTest, ReadNode0First) {
2266 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2267 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002268 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002269 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002270 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002271 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002272 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002273 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002274 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002275 writer1b.QueueSpan(boot1b_.span());
2276
Austin Schuhd863e6e2022-10-16 15:44:50 -07002277 writer0a.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002278 MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002279 writer1a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002280 e + chrono::milliseconds(1000), 0, chrono::seconds(100),
2281 e + chrono::milliseconds(1001)));
2282
Austin Schuhd863e6e2022-10-16 15:44:50 -07002283 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh58646e22021-08-23 23:51:46 -07002284 e + chrono::milliseconds(1000), 0, chrono::seconds(21),
2285 e + chrono::milliseconds(2001)));
2286
Austin Schuhd863e6e2022-10-16 15:44:50 -07002287 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002288 MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002289 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002290 e + chrono::milliseconds(2000), 0, chrono::seconds(20),
2291 e + chrono::milliseconds(2001)));
2292
Austin Schuhd863e6e2022-10-16 15:44:50 -07002293 writer0b.WriteSizedFlatbuffer(
Austin Schuh48507722021-07-17 17:29:24 -07002294 MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002295 writer1b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002296 e + chrono::milliseconds(3000), 0, chrono::seconds(20),
2297 e + chrono::milliseconds(3001)));
2298 }
2299
Austin Schuh58646e22021-08-23 23:51:46 -07002300 const std::vector<LogFile> parts =
2301 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
Austin Schuh48507722021-07-17 17:29:24 -07002302
2303 for (const auto &x : parts) {
2304 LOG(INFO) << x;
2305 }
2306 ASSERT_EQ(parts.size(), 1u);
2307 ASSERT_EQ(parts[0].logger_node, "pi1");
2308
2309 size_t mapper0_count = 0;
2310 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
2311 mapper0.set_timestamp_callback(
2312 [&](TimestampedMessage *) { ++mapper0_count; });
2313 size_t mapper1_count = 0;
2314 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
2315 mapper1.set_timestamp_callback(
2316 [&](TimestampedMessage *) { ++mapper1_count; });
2317
2318 mapper0.AddPeer(&mapper1);
2319 mapper1.AddPeer(&mapper0);
2320
2321 {
2322 std::deque<TimestampedMessage> output0;
2323
2324 EXPECT_EQ(mapper0_count, 0u);
2325 EXPECT_EQ(mapper1_count, 0u);
2326 ASSERT_TRUE(mapper0.Front() != nullptr);
2327 EXPECT_EQ(mapper0_count, 1u);
2328 EXPECT_EQ(mapper1_count, 0u);
2329 output0.emplace_back(std::move(*mapper0.Front()));
2330 mapper0.PopFront();
2331 EXPECT_TRUE(mapper0.started());
2332 EXPECT_EQ(mapper0_count, 1u);
2333 EXPECT_EQ(mapper1_count, 0u);
2334
2335 ASSERT_TRUE(mapper0.Front() != nullptr);
2336 EXPECT_EQ(mapper0_count, 2u);
2337 EXPECT_EQ(mapper1_count, 0u);
2338 output0.emplace_back(std::move(*mapper0.Front()));
2339 mapper0.PopFront();
2340 EXPECT_TRUE(mapper0.started());
2341
2342 ASSERT_TRUE(mapper0.Front() != nullptr);
2343 output0.emplace_back(std::move(*mapper0.Front()));
2344 mapper0.PopFront();
2345 EXPECT_TRUE(mapper0.started());
2346
2347 EXPECT_EQ(mapper0_count, 3u);
2348 EXPECT_EQ(mapper1_count, 0u);
2349
2350 ASSERT_TRUE(mapper0.Front() == nullptr);
2351
2352 LOG(INFO) << output0[0];
2353 LOG(INFO) << output0[1];
2354 LOG(INFO) << output0[2];
2355
2356 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2357 EXPECT_EQ(output0[0].monotonic_event_time.time,
2358 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002359 EXPECT_EQ(output0[0].queue_index,
2360 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002361 EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
2362 EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002363 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002364
2365 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2366 EXPECT_EQ(output0[1].monotonic_event_time.time,
2367 e + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002368 EXPECT_EQ(output0[1].queue_index,
2369 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002370 EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
2371 EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002372 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002373
2374 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2375 EXPECT_EQ(output0[2].monotonic_event_time.time,
2376 e + chrono::milliseconds(3000));
Austin Schuh58646e22021-08-23 23:51:46 -07002377 EXPECT_EQ(output0[2].queue_index,
2378 (BootQueueIndex{.boot = 0u, .index = 2u}));
Austin Schuh48507722021-07-17 17:29:24 -07002379 EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
2380 EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002381 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002382 }
2383
2384 {
2385 SCOPED_TRACE("Trying node1 now");
2386 std::deque<TimestampedMessage> output1;
2387
2388 EXPECT_EQ(mapper0_count, 3u);
2389 EXPECT_EQ(mapper1_count, 0u);
2390
2391 ASSERT_TRUE(mapper1.Front() != nullptr);
2392 EXPECT_EQ(mapper0_count, 3u);
2393 EXPECT_EQ(mapper1_count, 1u);
2394 output1.emplace_back(std::move(*mapper1.Front()));
2395 mapper1.PopFront();
2396 EXPECT_TRUE(mapper1.started());
2397 EXPECT_EQ(mapper0_count, 3u);
2398 EXPECT_EQ(mapper1_count, 1u);
2399
2400 ASSERT_TRUE(mapper1.Front() != nullptr);
2401 EXPECT_EQ(mapper0_count, 3u);
2402 EXPECT_EQ(mapper1_count, 2u);
2403 output1.emplace_back(std::move(*mapper1.Front()));
2404 mapper1.PopFront();
2405 EXPECT_TRUE(mapper1.started());
2406
2407 ASSERT_TRUE(mapper1.Front() != nullptr);
2408 output1.emplace_back(std::move(*mapper1.Front()));
2409 mapper1.PopFront();
2410 EXPECT_TRUE(mapper1.started());
2411
Austin Schuh58646e22021-08-23 23:51:46 -07002412 ASSERT_TRUE(mapper1.Front() != nullptr);
2413 output1.emplace_back(std::move(*mapper1.Front()));
2414 mapper1.PopFront();
2415 EXPECT_TRUE(mapper1.started());
2416
Austin Schuh48507722021-07-17 17:29:24 -07002417 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002418 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002419
2420 ASSERT_TRUE(mapper1.Front() == nullptr);
2421
2422 EXPECT_EQ(mapper0_count, 3u);
Austin Schuh58646e22021-08-23 23:51:46 -07002423 EXPECT_EQ(mapper1_count, 4u);
Austin Schuh48507722021-07-17 17:29:24 -07002424
2425 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2426 EXPECT_EQ(output1[0].monotonic_event_time.time,
2427 e + chrono::seconds(100) + chrono::milliseconds(1000));
2428 EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
2429 EXPECT_EQ(output1[0].monotonic_remote_time.time,
2430 e + chrono::milliseconds(1000));
Austin Schuh58646e22021-08-23 23:51:46 -07002431 EXPECT_EQ(output1[0].remote_queue_index,
2432 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002433 EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
2434 EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
2435 e + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002436 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002437
2438 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2439 EXPECT_EQ(output1[1].monotonic_event_time.time,
2440 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh58646e22021-08-23 23:51:46 -07002441 EXPECT_EQ(output1[1].remote_queue_index,
2442 (BootQueueIndex{.boot = 0u, .index = 0u}));
Austin Schuh48507722021-07-17 17:29:24 -07002443 EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
2444 EXPECT_EQ(output1[1].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002445 e + chrono::milliseconds(1000));
Austin Schuh48507722021-07-17 17:29:24 -07002446 EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
2447 EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
2448 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002449 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002450
2451 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2452 EXPECT_EQ(output1[2].monotonic_event_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002453 e + chrono::seconds(20) + chrono::milliseconds(2000));
Austin Schuh48507722021-07-17 17:29:24 -07002454 EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
2455 EXPECT_EQ(output1[2].monotonic_remote_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002456 e + chrono::milliseconds(2000));
2457 EXPECT_EQ(output1[2].remote_queue_index,
2458 (BootQueueIndex{.boot = 0u, .index = 1u}));
Austin Schuh48507722021-07-17 17:29:24 -07002459 EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
2460 EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
Austin Schuh58646e22021-08-23 23:51:46 -07002461 e + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002462 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002463
Austin Schuh58646e22021-08-23 23:51:46 -07002464 EXPECT_EQ(output1[3].monotonic_event_time.boot, 1u);
2465 EXPECT_EQ(output1[3].monotonic_event_time.time,
2466 e + chrono::seconds(20) + chrono::milliseconds(3000));
2467 EXPECT_EQ(output1[3].monotonic_remote_time.boot, 0u);
2468 EXPECT_EQ(output1[3].monotonic_remote_time.time,
2469 e + chrono::milliseconds(3000));
2470 EXPECT_EQ(output1[3].remote_queue_index,
2471 (BootQueueIndex{.boot = 0u, .index = 2u}));
2472 EXPECT_EQ(output1[3].monotonic_timestamp_time.boot, 0u);
2473 EXPECT_EQ(output1[3].monotonic_timestamp_time.time,
2474 e + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002475 EXPECT_TRUE(output1[3].data != nullptr);
Austin Schuh58646e22021-08-23 23:51:46 -07002476
Austin Schuh48507722021-07-17 17:29:24 -07002477 LOG(INFO) << output1[0];
2478 LOG(INFO) << output1[1];
2479 LOG(INFO) << output1[2];
Austin Schuh58646e22021-08-23 23:51:46 -07002480 LOG(INFO) << output1[3];
Austin Schuh48507722021-07-17 17:29:24 -07002481 }
2482}
2483
2484TEST_F(RebootTimestampMapperTest, Node2Reboot) {
2485 const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
2486 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002487 TestDetachedBufferWriter writer0a(logfile0_);
Austin Schuh48507722021-07-17 17:29:24 -07002488 writer0a.QueueSpan(boot0a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002489 TestDetachedBufferWriter writer0b(logfile1_);
Austin Schuh48507722021-07-17 17:29:24 -07002490 writer0b.QueueSpan(boot0b_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002491 TestDetachedBufferWriter writer1a(logfile2_);
Austin Schuh48507722021-07-17 17:29:24 -07002492 writer1a.QueueSpan(boot1a_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002493 TestDetachedBufferWriter writer1b(logfile3_);
Austin Schuh48507722021-07-17 17:29:24 -07002494 writer1b.QueueSpan(boot1b_.span());
2495
Austin Schuhd863e6e2022-10-16 15:44:50 -07002496 writer1a.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002497 e + chrono::seconds(100) + chrono::milliseconds(1000), 3, 0x005));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002498 writer0a.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002499 e + chrono::seconds(100) + chrono::milliseconds(1000), 3,
2500 chrono::seconds(-100),
2501 e + chrono::seconds(100) + chrono::milliseconds(1001)));
2502
Austin Schuhd863e6e2022-10-16 15:44:50 -07002503 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002504 e + chrono::seconds(20) + chrono::milliseconds(2000), 3, 0x006));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002505 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002506 e + chrono::seconds(20) + chrono::milliseconds(2000), 3,
2507 chrono::seconds(-20),
2508 e + chrono::seconds(20) + chrono::milliseconds(2001)));
2509
Austin Schuhd863e6e2022-10-16 15:44:50 -07002510 writer1b.WriteSizedFlatbuffer(MakeLogMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002511 e + chrono::seconds(20) + chrono::milliseconds(3000), 3, 0x007));
Austin Schuhd863e6e2022-10-16 15:44:50 -07002512 writer0b.WriteSizedFlatbuffer(MakeTimestampMessage(
Austin Schuh48507722021-07-17 17:29:24 -07002513 e + chrono::seconds(20) + chrono::milliseconds(3000), 3,
2514 chrono::seconds(-20),
2515 e + chrono::seconds(20) + chrono::milliseconds(3001)));
2516 }
2517
2518 const std::vector<LogFile> parts =
2519 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2520
2521 for (const auto &x : parts) {
2522 LOG(INFO) << x;
2523 }
2524 ASSERT_EQ(parts.size(), 1u);
2525 ASSERT_EQ(parts[0].logger_node, "pi1");
2526
2527 size_t mapper0_count = 0;
2528 TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
2529 mapper0.set_timestamp_callback(
2530 [&](TimestampedMessage *) { ++mapper0_count; });
2531 size_t mapper1_count = 0;
2532 TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
2533 mapper1.set_timestamp_callback(
2534 [&](TimestampedMessage *) { ++mapper1_count; });
2535
2536 mapper0.AddPeer(&mapper1);
2537 mapper1.AddPeer(&mapper0);
2538
2539 {
2540 std::deque<TimestampedMessage> output0;
2541
2542 EXPECT_EQ(mapper0_count, 0u);
2543 EXPECT_EQ(mapper1_count, 0u);
2544 ASSERT_TRUE(mapper0.Front() != nullptr);
2545 EXPECT_EQ(mapper0_count, 1u);
2546 EXPECT_EQ(mapper1_count, 0u);
2547 output0.emplace_back(std::move(*mapper0.Front()));
2548 mapper0.PopFront();
2549 EXPECT_TRUE(mapper0.started());
2550 EXPECT_EQ(mapper0_count, 1u);
2551 EXPECT_EQ(mapper1_count, 0u);
2552
2553 ASSERT_TRUE(mapper0.Front() != nullptr);
2554 EXPECT_EQ(mapper0_count, 2u);
2555 EXPECT_EQ(mapper1_count, 0u);
2556 output0.emplace_back(std::move(*mapper0.Front()));
2557 mapper0.PopFront();
2558 EXPECT_TRUE(mapper0.started());
2559
2560 ASSERT_TRUE(mapper0.Front() != nullptr);
2561 output0.emplace_back(std::move(*mapper0.Front()));
2562 mapper0.PopFront();
2563 EXPECT_TRUE(mapper0.started());
2564
2565 EXPECT_EQ(mapper0_count, 3u);
2566 EXPECT_EQ(mapper1_count, 0u);
2567
2568 ASSERT_TRUE(mapper0.Front() == nullptr);
2569
2570 LOG(INFO) << output0[0];
2571 LOG(INFO) << output0[1];
2572 LOG(INFO) << output0[2];
2573
2574 EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
2575 EXPECT_EQ(output0[0].monotonic_event_time.time,
2576 e + chrono::milliseconds(1000));
2577 EXPECT_EQ(output0[0].monotonic_remote_time.boot, 0u);
2578 EXPECT_EQ(output0[0].monotonic_remote_time.time,
2579 e + chrono::seconds(100) + chrono::milliseconds(1000));
2580 EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
2581 EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
2582 e + chrono::seconds(100) + chrono::milliseconds(1001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002583 EXPECT_TRUE(output0[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002584
2585 EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
2586 EXPECT_EQ(output0[1].monotonic_event_time.time,
2587 e + chrono::milliseconds(2000));
2588 EXPECT_EQ(output0[1].monotonic_remote_time.boot, 1u);
2589 EXPECT_EQ(output0[1].monotonic_remote_time.time,
2590 e + chrono::seconds(20) + chrono::milliseconds(2000));
2591 EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
2592 EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
2593 e + chrono::seconds(20) + chrono::milliseconds(2001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002594 EXPECT_TRUE(output0[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002595
2596 EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
2597 EXPECT_EQ(output0[2].monotonic_event_time.time,
2598 e + chrono::milliseconds(3000));
2599 EXPECT_EQ(output0[2].monotonic_remote_time.boot, 1u);
2600 EXPECT_EQ(output0[2].monotonic_remote_time.time,
2601 e + chrono::seconds(20) + chrono::milliseconds(3000));
2602 EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
2603 EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
2604 e + chrono::seconds(20) + chrono::milliseconds(3001));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002605 EXPECT_TRUE(output0[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002606 }
2607
2608 {
2609 SCOPED_TRACE("Trying node1 now");
2610 std::deque<TimestampedMessage> output1;
2611
2612 EXPECT_EQ(mapper0_count, 3u);
2613 EXPECT_EQ(mapper1_count, 0u);
2614
2615 ASSERT_TRUE(mapper1.Front() != nullptr);
2616 EXPECT_EQ(mapper0_count, 3u);
2617 EXPECT_EQ(mapper1_count, 1u);
2618 output1.emplace_back(std::move(*mapper1.Front()));
2619 mapper1.PopFront();
2620 EXPECT_TRUE(mapper1.started());
2621 EXPECT_EQ(mapper0_count, 3u);
2622 EXPECT_EQ(mapper1_count, 1u);
2623
2624 ASSERT_TRUE(mapper1.Front() != nullptr);
2625 EXPECT_EQ(mapper0_count, 3u);
2626 EXPECT_EQ(mapper1_count, 2u);
2627 output1.emplace_back(std::move(*mapper1.Front()));
2628 mapper1.PopFront();
2629 EXPECT_TRUE(mapper1.started());
2630
2631 ASSERT_TRUE(mapper1.Front() != nullptr);
2632 output1.emplace_back(std::move(*mapper1.Front()));
2633 mapper1.PopFront();
2634 EXPECT_TRUE(mapper1.started());
2635
2636 EXPECT_EQ(mapper0_count, 3u);
2637 EXPECT_EQ(mapper1_count, 3u);
2638
2639 ASSERT_TRUE(mapper1.Front() == nullptr);
2640
2641 EXPECT_EQ(mapper0_count, 3u);
2642 EXPECT_EQ(mapper1_count, 3u);
2643
2644 EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
2645 EXPECT_EQ(output1[0].monotonic_event_time.time,
2646 e + chrono::seconds(100) + chrono::milliseconds(1000));
2647 EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
2648 EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002649 EXPECT_TRUE(output1[0].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002650
2651 EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
2652 EXPECT_EQ(output1[1].monotonic_event_time.time,
2653 e + chrono::seconds(20) + chrono::milliseconds(2000));
2654 EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
2655 EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002656 EXPECT_TRUE(output1[1].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002657
2658 EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
2659 EXPECT_EQ(output1[2].monotonic_event_time.time,
2660 e + chrono::seconds(20) + chrono::milliseconds(3000));
2661 EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
2662 EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002663 EXPECT_TRUE(output1[2].data != nullptr);
Austin Schuh48507722021-07-17 17:29:24 -07002664
2665 LOG(INFO) << output1[0];
2666 LOG(INFO) << output1[1];
2667 LOG(INFO) << output1[2];
2668 }
2669}
2670
Austin Schuh44c61472021-11-22 21:04:10 -08002671class SortingDeathTest : public SortingElementTest {
2672 public:
2673 SortingDeathTest()
2674 : SortingElementTest(),
2675 part0_(MakeHeader(config_, R"({
2676 /* 100ms */
2677 "max_out_of_order_duration": 100000000,
2678 "node": {
2679 "name": "pi1"
2680 },
2681 "logger_node": {
2682 "name": "pi1"
2683 },
2684 "monotonic_start_time": 1000000,
2685 "realtime_start_time": 1000000000000,
2686 "logger_monotonic_start_time": 1000000,
2687 "logger_realtime_start_time": 1000000000000,
2688 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2689 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2690 "parts_index": 0,
2691 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2692 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2693 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2694 "boot_uuids": [
2695 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2696 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2697 ""
2698 ],
2699 "oldest_remote_monotonic_timestamps": [
2700 9223372036854775807,
2701 9223372036854775807,
2702 9223372036854775807
2703 ],
2704 "oldest_local_monotonic_timestamps": [
2705 9223372036854775807,
2706 9223372036854775807,
2707 9223372036854775807
2708 ],
2709 "oldest_remote_unreliable_monotonic_timestamps": [
2710 9223372036854775807,
2711 0,
2712 9223372036854775807
2713 ],
2714 "oldest_local_unreliable_monotonic_timestamps": [
2715 9223372036854775807,
2716 0,
2717 9223372036854775807
2718 ]
2719})")),
2720 part1_(MakeHeader(config_, R"({
2721 /* 100ms */
2722 "max_out_of_order_duration": 100000000,
2723 "node": {
2724 "name": "pi1"
2725 },
2726 "logger_node": {
2727 "name": "pi1"
2728 },
2729 "monotonic_start_time": 1000000,
2730 "realtime_start_time": 1000000000000,
2731 "logger_monotonic_start_time": 1000000,
2732 "logger_realtime_start_time": 1000000000000,
2733 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2734 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2735 "parts_index": 1,
2736 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2737 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2738 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2739 "boot_uuids": [
2740 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2741 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2742 ""
2743 ],
2744 "oldest_remote_monotonic_timestamps": [
2745 9223372036854775807,
2746 9223372036854775807,
2747 9223372036854775807
2748 ],
2749 "oldest_local_monotonic_timestamps": [
2750 9223372036854775807,
2751 9223372036854775807,
2752 9223372036854775807
2753 ],
2754 "oldest_remote_unreliable_monotonic_timestamps": [
2755 9223372036854775807,
2756 100000,
2757 9223372036854775807
2758 ],
2759 "oldest_local_unreliable_monotonic_timestamps": [
2760 9223372036854775807,
2761 100000,
2762 9223372036854775807
2763 ]
2764})")),
2765 part2_(MakeHeader(config_, R"({
2766 /* 100ms */
2767 "max_out_of_order_duration": 100000000,
2768 "node": {
2769 "name": "pi1"
2770 },
2771 "logger_node": {
2772 "name": "pi1"
2773 },
2774 "monotonic_start_time": 1000000,
2775 "realtime_start_time": 1000000000000,
2776 "logger_monotonic_start_time": 1000000,
2777 "logger_realtime_start_time": 1000000000000,
2778 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2779 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2780 "parts_index": 2,
2781 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2782 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2783 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2784 "boot_uuids": [
2785 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2786 "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
2787 ""
2788 ],
2789 "oldest_remote_monotonic_timestamps": [
2790 9223372036854775807,
2791 9223372036854775807,
2792 9223372036854775807
2793 ],
2794 "oldest_local_monotonic_timestamps": [
2795 9223372036854775807,
2796 9223372036854775807,
2797 9223372036854775807
2798 ],
2799 "oldest_remote_unreliable_monotonic_timestamps": [
2800 9223372036854775807,
2801 200000,
2802 9223372036854775807
2803 ],
2804 "oldest_local_unreliable_monotonic_timestamps": [
2805 9223372036854775807,
2806 200000,
2807 9223372036854775807
2808 ]
2809})")),
2810 part3_(MakeHeader(config_, R"({
2811 /* 100ms */
2812 "max_out_of_order_duration": 100000000,
2813 "node": {
2814 "name": "pi1"
2815 },
2816 "logger_node": {
2817 "name": "pi1"
2818 },
2819 "monotonic_start_time": 1000000,
2820 "realtime_start_time": 1000000000000,
2821 "logger_monotonic_start_time": 1000000,
2822 "logger_realtime_start_time": 1000000000000,
2823 "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
2824 "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
2825 "parts_index": 3,
2826 "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
2827 "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2828 "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2829 "boot_uuids": [
2830 "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
2831 "b728d27a-9181-4eac-bfc1-5d09b80469d2",
2832 ""
2833 ],
2834 "oldest_remote_monotonic_timestamps": [
2835 9223372036854775807,
2836 9223372036854775807,
2837 9223372036854775807
2838 ],
2839 "oldest_local_monotonic_timestamps": [
2840 9223372036854775807,
2841 9223372036854775807,
2842 9223372036854775807
2843 ],
2844 "oldest_remote_unreliable_monotonic_timestamps": [
2845 9223372036854775807,
2846 300000,
2847 9223372036854775807
2848 ],
2849 "oldest_local_unreliable_monotonic_timestamps": [
2850 9223372036854775807,
2851 300000,
2852 9223372036854775807
2853 ]
2854})")) {}
2855
2856 protected:
2857 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part0_;
2858 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part1_;
2859 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part2_;
2860 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> part3_;
2861};
2862
2863// Tests that if 2 computers go back and forth trying to be the same node, we
2864// die in sorting instead of failing to estimate time.
2865TEST_F(SortingDeathTest, FightingNodes) {
2866 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002867 TestDetachedBufferWriter writer0(logfile0_);
Austin Schuh44c61472021-11-22 21:04:10 -08002868 writer0.QueueSpan(part0_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002869 TestDetachedBufferWriter writer1(logfile1_);
Austin Schuh44c61472021-11-22 21:04:10 -08002870 writer1.QueueSpan(part1_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002871 TestDetachedBufferWriter writer2(logfile2_);
Austin Schuh44c61472021-11-22 21:04:10 -08002872 writer2.QueueSpan(part2_.span());
Austin Schuhd863e6e2022-10-16 15:44:50 -07002873 TestDetachedBufferWriter writer3(logfile3_);
Austin Schuh44c61472021-11-22 21:04:10 -08002874 writer3.QueueSpan(part3_.span());
2875 }
2876
2877 EXPECT_DEATH(
2878 {
2879 const std::vector<LogFile> parts =
2880 SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
2881 },
Austin Schuh22cf7862022-09-19 19:09:42 -07002882 "found overlapping boots on");
Austin Schuh44c61472021-11-22 21:04:10 -08002883}
2884
Brian Smarttea913d42021-12-10 15:02:38 -08002885// Tests that we MessageReader blows up on a bad message.
2886TEST(MessageReaderConfirmCrash, ReadWrite) {
2887 const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
2888 unlink(logfile.c_str());
2889
2890 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
2891 JsonToSizedFlatbuffer<LogFileHeader>(
2892 R"({ "max_out_of_order_duration": 100000000 })");
2893 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
2894 JsonToSizedFlatbuffer<MessageHeader>(
2895 R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
2896 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
2897 JsonToSizedFlatbuffer<MessageHeader>(
2898 R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
2899 const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m4 =
2900 JsonToSizedFlatbuffer<MessageHeader>(
2901 R"({ "channel_index": 0, "monotonic_sent_time": 4 })");
2902
2903 // Starts out like a proper flat buffer header, but it breaks down ...
2904 std::vector<uint8_t> garbage{8, 0, 0, 0, 16, 0, 0, 0, 4, 0, 0, 0};
2905 absl::Span<uint8_t> m3_span(garbage);
2906
2907 {
Austin Schuhd863e6e2022-10-16 15:44:50 -07002908 TestDetachedBufferWriter writer(logfile);
Brian Smarttea913d42021-12-10 15:02:38 -08002909 writer.QueueSpan(config.span());
2910 writer.QueueSpan(m1.span());
2911 writer.QueueSpan(m2.span());
2912 writer.QueueSpan(m3_span);
2913 writer.QueueSpan(m4.span()); // This message is "hidden"
2914 }
2915
2916 {
2917 MessageReader reader(logfile);
2918
2919 EXPECT_EQ(reader.filename(), logfile);
2920
2921 EXPECT_EQ(
2922 reader.max_out_of_order_duration(),
2923 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2924 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2925 EXPECT_TRUE(reader.ReadMessage());
2926 EXPECT_EQ(reader.newest_timestamp(),
2927 monotonic_clock::time_point(chrono::nanoseconds(1)));
2928 EXPECT_TRUE(reader.ReadMessage());
2929 EXPECT_EQ(reader.newest_timestamp(),
2930 monotonic_clock::time_point(chrono::nanoseconds(2)));
2931 // Confirm default crashing behavior
2932 EXPECT_DEATH(reader.ReadMessage(), "Corrupted message at offset");
2933 }
2934
2935 {
2936 gflags::FlagSaver fs;
2937
2938 MessageReader reader(logfile);
2939 reader.set_crash_on_corrupt_message_flag(false);
2940
2941 EXPECT_EQ(reader.filename(), logfile);
2942
2943 EXPECT_EQ(
2944 reader.max_out_of_order_duration(),
2945 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2946 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2947 EXPECT_TRUE(reader.ReadMessage());
2948 EXPECT_EQ(reader.newest_timestamp(),
2949 monotonic_clock::time_point(chrono::nanoseconds(1)));
2950 EXPECT_TRUE(reader.ReadMessage());
2951 EXPECT_EQ(reader.newest_timestamp(),
2952 monotonic_clock::time_point(chrono::nanoseconds(2)));
2953 // Confirm avoiding the corrupted message crash, stopping instead.
2954 EXPECT_FALSE(reader.ReadMessage());
2955 }
2956
2957 {
2958 gflags::FlagSaver fs;
2959
2960 MessageReader reader(logfile);
2961 reader.set_crash_on_corrupt_message_flag(false);
2962 reader.set_ignore_corrupt_messages_flag(true);
2963
2964 EXPECT_EQ(reader.filename(), logfile);
2965
2966 EXPECT_EQ(
2967 reader.max_out_of_order_duration(),
2968 std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
2969 EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
2970 EXPECT_TRUE(reader.ReadMessage());
2971 EXPECT_EQ(reader.newest_timestamp(),
2972 monotonic_clock::time_point(chrono::nanoseconds(1)));
2973 EXPECT_TRUE(reader.ReadMessage());
2974 EXPECT_EQ(reader.newest_timestamp(),
2975 monotonic_clock::time_point(chrono::nanoseconds(2)));
2976 // Confirm skipping of the corrupted message to read the hidden one.
2977 EXPECT_TRUE(reader.ReadMessage());
2978 EXPECT_EQ(reader.newest_timestamp(),
2979 monotonic_clock::time_point(chrono::nanoseconds(4)));
2980 EXPECT_FALSE(reader.ReadMessage());
2981 }
2982}
2983
Austin Schuhfa30c352022-10-16 11:12:02 -07002984class InlinePackMessage : public ::testing::Test {
2985 protected:
2986 aos::Context RandomContext() {
2987 data_ = RandomData();
2988 std::uniform_int_distribution<uint32_t> uint32_distribution(
2989 std::numeric_limits<uint32_t>::min(),
2990 std::numeric_limits<uint32_t>::max());
2991
2992 std::uniform_int_distribution<int64_t> time_distribution(
2993 std::numeric_limits<int64_t>::min(),
2994 std::numeric_limits<int64_t>::max());
2995
2996 aos::Context context;
2997 context.monotonic_event_time =
2998 aos::monotonic_clock::epoch() +
2999 chrono::nanoseconds(time_distribution(random_number_generator_));
3000 context.realtime_event_time =
3001 aos::realtime_clock::epoch() +
3002 chrono::nanoseconds(time_distribution(random_number_generator_));
3003
3004 context.monotonic_remote_time =
3005 aos::monotonic_clock::epoch() +
3006 chrono::nanoseconds(time_distribution(random_number_generator_));
3007 context.realtime_remote_time =
3008 aos::realtime_clock::epoch() +
3009 chrono::nanoseconds(time_distribution(random_number_generator_));
3010
3011 context.queue_index = uint32_distribution(random_number_generator_);
3012 context.remote_queue_index = uint32_distribution(random_number_generator_);
3013 context.size = data_.size();
3014 context.data = data_.data();
3015 return context;
3016 }
3017
Austin Schuhf2d0e682022-10-16 14:20:58 -07003018 aos::monotonic_clock::time_point RandomMonotonic() {
3019 std::uniform_int_distribution<int64_t> time_distribution(
3020 0, std::numeric_limits<int64_t>::max());
3021 return aos::monotonic_clock::epoch() +
3022 chrono::nanoseconds(time_distribution(random_number_generator_));
3023 }
3024
3025 aos::SizePrefixedFlatbufferDetachedBuffer<message_bridge::RemoteMessage>
3026 RandomRemoteMessage() {
3027 std::uniform_int_distribution<uint8_t> uint8_distribution(
3028 std::numeric_limits<uint8_t>::min(),
3029 std::numeric_limits<uint8_t>::max());
3030
3031 std::uniform_int_distribution<int64_t> time_distribution(
3032 std::numeric_limits<int64_t>::min(),
3033 std::numeric_limits<int64_t>::max());
3034
3035 flatbuffers::FlatBufferBuilder fbb;
3036 message_bridge::RemoteMessage::Builder builder(fbb);
3037 builder.add_queue_index(uint8_distribution(random_number_generator_));
3038
3039 builder.add_monotonic_sent_time(
3040 time_distribution(random_number_generator_));
3041 builder.add_realtime_sent_time(time_distribution(random_number_generator_));
3042 builder.add_monotonic_remote_time(
3043 time_distribution(random_number_generator_));
3044 builder.add_realtime_remote_time(
3045 time_distribution(random_number_generator_));
3046
3047 builder.add_remote_queue_index(
3048 uint8_distribution(random_number_generator_));
3049
3050 fbb.FinishSizePrefixed(builder.Finish());
3051 return fbb.Release();
3052 }
3053
Austin Schuhfa30c352022-10-16 11:12:02 -07003054 std::vector<uint8_t> RandomData() {
3055 std::vector<uint8_t> result;
3056 std::uniform_int_distribution<int> length_distribution(1, 32);
3057 std::uniform_int_distribution<uint8_t> data_distribution(
3058 std::numeric_limits<uint8_t>::min(),
3059 std::numeric_limits<uint8_t>::max());
3060
3061 const size_t length = length_distribution(random_number_generator_);
3062
3063 result.reserve(length);
3064 for (size_t i = 0; i < length; ++i) {
3065 result.emplace_back(data_distribution(random_number_generator_));
3066 }
3067 return result;
3068 }
3069
3070 std::mt19937 random_number_generator_{
3071 std::mt19937(::aos::testing::RandomSeed())};
3072
3073 std::vector<uint8_t> data_;
3074};
3075
3076// Uses the binary schema to annotate a provided flatbuffer. Returns the
3077// annotated flatbuffer.
3078std::string AnnotateBinaries(
3079 const aos::NonSizePrefixedFlatbuffer<reflection::Schema> &schema,
3080 const std::string &schema_filename,
3081 flatbuffers::span<uint8_t> binary_data) {
3082 flatbuffers::BinaryAnnotator binary_annotator(
3083 schema.span().data(), schema.span().size(), binary_data.data(),
3084 binary_data.size());
3085
3086 auto annotations = binary_annotator.Annotate();
3087
3088 flatbuffers::AnnotatedBinaryTextGenerator text_generator(
3089 flatbuffers::AnnotatedBinaryTextGenerator::Options{}, annotations,
3090 binary_data.data(), binary_data.size());
3091
3092 text_generator.Generate(aos::testing::TestTmpDir() + "/foo.bfbs",
3093 schema_filename);
3094
3095 return aos::util::ReadFileToStringOrDie(aos::testing::TestTmpDir() +
3096 "/foo.afb");
3097}
3098
3099// Tests that all variations of PackMessage are equivalent to the inline
3100// PackMessage used to avoid allocations.
3101TEST_F(InlinePackMessage, Equivilent) {
3102 std::uniform_int_distribution<uint32_t> uint32_distribution(
3103 std::numeric_limits<uint32_t>::min(),
3104 std::numeric_limits<uint32_t>::max());
3105 aos::FlatbufferVector<reflection::Schema> schema =
3106 FileToFlatbuffer<reflection::Schema>(
3107 ArtifactPath("aos/events/logging/logger.bfbs"));
3108
3109 for (const LogType type :
3110 {LogType::kLogMessage, LogType::kLogDeliveryTimeOnly,
3111 LogType::kLogMessageAndDeliveryTime, LogType::kLogRemoteMessage}) {
3112 for (int i = 0; i < 100; ++i) {
3113 aos::Context context = RandomContext();
3114 const uint32_t channel_index =
3115 uint32_distribution(random_number_generator_);
3116
3117 flatbuffers::FlatBufferBuilder fbb;
3118 fbb.ForceDefaults(true);
3119 fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index, type));
3120
3121 VLOG(1) << absl::BytesToHexString(std::string_view(
3122 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3123 fbb.GetBufferSpan().size()));
3124
3125 // Make sure that both the builder and inline method agree on sizes.
Austin Schuh48d10d62022-10-16 22:19:23 -07003126 ASSERT_EQ(fbb.GetSize(), PackMessageSize(type, context.size))
Austin Schuhfa30c352022-10-16 11:12:02 -07003127 << "log type " << static_cast<int>(type);
3128
3129 // Initialize the buffer to something nonzero to make sure all the padding
3130 // bytes are set to 0.
Austin Schuh48d10d62022-10-16 22:19:23 -07003131 std::vector<uint8_t> repacked_message(PackMessageSize(type, context.size),
3132 67);
Austin Schuhfa30c352022-10-16 11:12:02 -07003133
3134 // And verify packing inline works as expected.
3135 EXPECT_EQ(repacked_message.size(),
3136 PackMessageInline(repacked_message.data(), context,
3137 channel_index, type));
3138 EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
3139 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3140 fbb.GetBufferSpan().size()))
3141 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
3142 fbb.GetBufferSpan());
3143 }
3144 }
3145}
3146
Austin Schuhf2d0e682022-10-16 14:20:58 -07003147// Tests that all variations of PackMessage are equivilent to the inline
3148// PackMessage used to avoid allocations.
3149TEST_F(InlinePackMessage, RemoteEquivilent) {
3150 aos::FlatbufferVector<reflection::Schema> schema =
3151 FileToFlatbuffer<reflection::Schema>(
3152 ArtifactPath("aos/events/logging/logger.bfbs"));
3153 std::uniform_int_distribution<uint8_t> uint8_distribution(
3154 std::numeric_limits<uint8_t>::min(), std::numeric_limits<uint8_t>::max());
3155
3156 for (int i = 0; i < 100; ++i) {
3157 aos::SizePrefixedFlatbufferDetachedBuffer<RemoteMessage> random_msg =
3158 RandomRemoteMessage();
3159 const size_t channel_index = uint8_distribution(random_number_generator_);
3160 const monotonic_clock::time_point monotonic_timestamp_time =
3161 RandomMonotonic();
3162
3163 flatbuffers::FlatBufferBuilder fbb;
3164 fbb.ForceDefaults(true);
3165 fbb.FinishSizePrefixed(PackRemoteMessage(
3166 &fbb, &random_msg.message(), channel_index, monotonic_timestamp_time));
3167
3168 VLOG(1) << absl::BytesToHexString(std::string_view(
3169 reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
3170 fbb.GetBufferSpan().size()));
3171
3172 // Make sure that both the builder and inline method agree on sizes.
3173 ASSERT_EQ(fbb.GetSize(), PackRemoteMessageSize());
3174
3175 // Initialize the buffer to something nonzer to make sure all the padding
3176 // bytes are set to 0.
3177 std::vector<uint8_t> repacked_message(PackRemoteMessageSize(), 67);
3178
3179 // And verify packing inline works as expected.
3180 EXPECT_EQ(
3181 repacked_message.size(),
3182 PackRemoteMessageInline(repacked_message.data(), &random_msg.message(),
3183 channel_index, monotonic_timestamp_time));
3184 EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
3185 absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
3186 fbb.GetBufferSpan().size()))
3187 << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
3188 fbb.GetBufferSpan());
3189 }
3190}
Austin Schuhfa30c352022-10-16 11:12:02 -07003191
Austin Schuhc243b422020-10-11 15:35:08 -07003192} // namespace testing
3193} // namespace logger
3194} // namespace aos