blob: bc1f5b86c440bb852219160058639f5e4181b949 [file] [log] [blame]
Naman Guptaa63aa132023-03-22 20:06:34 -07001#include "aos/events/logging/log_reader.h"
2#include "aos/events/logging/multinode_logger_test_lib.h"
3#include "aos/events/message_counter.h"
4#include "aos/events/ping_lib.h"
5#include "aos/events/pong_lib.h"
6#include "aos/network/remote_message_generated.h"
7#include "aos/network/timestamp_generated.h"
8#include "aos/testing/tmpdir.h"
9#include "gmock/gmock.h"
10#include "gtest/gtest.h"
11
12namespace aos {
13namespace logger {
14namespace testing {
15
16namespace chrono = std::chrono;
17using aos::message_bridge::RemoteMessage;
18using aos::testing::ArtifactPath;
19using aos::testing::MessageCounter;
20
// Expected configuration hashes that are handed to ConfigParams when the test
// suites are instantiated below. Each value is 64 hex characters (256 bits),
// so despite the "Sha1" suffix these are presumably SHA-256 digests
// (TODO(review): confirm against the hashing code).
//
// Combined config: the logged and relogged configs hash identically.
constexpr std::string_view kCombinedConfigSha1 =
    "5d73fe35bacaa59d24f8f0c1a806fe10b783b0fcc80809ee30a9db824e82538b";
// Split config as originally logged.
constexpr std::string_view kSplitConfigSha1 =
    "f25e8f6f90d61f41c41517e652300566228b077e44cd86f1af2af4a9bed31ad4";
// Split config after relogging (passed as the last ConfigParams field).
constexpr std::string_view kReloggedSplitConfigSha1 =
    "f1fabd629bdf8735c3d81bc791d7a454e8e636951c26cba6426545cbc97f911f";
27
28INSTANTIATE_TEST_SUITE_P(
29 All, MultinodeLoggerTest,
30 ::testing::Combine(
31 ::testing::Values(
32 ConfigParams{"multinode_pingpong_combined_config.json", true,
33 kCombinedConfigSha1, kCombinedConfigSha1},
34 ConfigParams{"multinode_pingpong_split_config.json", false,
35 kSplitConfigSha1, kReloggedSplitConfigSha1}),
36 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
37
38INSTANTIATE_TEST_SUITE_P(
39 All, MultinodeLoggerDeathTest,
40 ::testing::Combine(
41 ::testing::Values(
42 ConfigParams{"multinode_pingpong_combined_config.json", true,
43 kCombinedConfigSha1, kCombinedConfigSha1},
44 ConfigParams{"multinode_pingpong_split_config.json", false,
45 kSplitConfigSha1, kReloggedSplitConfigSha1}),
46 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
47
48// Tests that we can write and read simple multi-node log files.
49TEST_P(MultinodeLoggerTest, SimpleMultiNode) {
50 std::vector<std::string> actual_filenames;
51 time_converter_.StartEqual();
52
53 {
54 LoggerState pi1_logger = MakeLogger(pi1_);
55 LoggerState pi2_logger = MakeLogger(pi2_);
56
57 event_loop_factory_.RunFor(chrono::milliseconds(95));
58
59 StartLogger(&pi1_logger);
60 StartLogger(&pi2_logger);
61
62 event_loop_factory_.RunFor(chrono::milliseconds(20000));
63 pi1_logger.AppendAllFilenames(&actual_filenames);
64 pi2_logger.AppendAllFilenames(&actual_filenames);
65 }
66
67 ASSERT_THAT(actual_filenames,
68 ::testing::UnorderedElementsAreArray(logfiles_));
69
70 {
71 std::set<std::string> logfile_uuids;
72 std::set<std::string> parts_uuids;
73 // Confirm that we have the expected number of UUIDs for both the logfile
74 // UUIDs and parts UUIDs.
75 std::vector<SizePrefixedFlatbufferVector<LogFileHeader>> log_header;
76 for (std::string_view f : logfiles_) {
77 log_header.emplace_back(ReadHeader(f).value());
78 if (!log_header.back().message().has_configuration()) {
79 logfile_uuids.insert(
80 log_header.back().message().log_event_uuid()->str());
81 parts_uuids.insert(log_header.back().message().parts_uuid()->str());
82 }
83 }
84
85 EXPECT_EQ(logfile_uuids.size(), 2u);
86 if (shared()) {
87 EXPECT_EQ(parts_uuids.size(), 7u);
88 } else {
89 EXPECT_EQ(parts_uuids.size(), 8u);
90 }
91
92 // And confirm everything is on the correct node.
93 EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi1");
94 EXPECT_EQ(log_header[3].message().node()->name()->string_view(), "pi1");
95 EXPECT_EQ(log_header[4].message().node()->name()->string_view(), "pi1");
96
97 EXPECT_EQ(log_header[5].message().node()->name()->string_view(), "pi2");
98 EXPECT_EQ(log_header[6].message().node()->name()->string_view(), "pi2");
99
100 EXPECT_EQ(log_header[7].message().node()->name()->string_view(), "pi2");
101 EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi2");
102 EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi2");
103
104 EXPECT_EQ(log_header[10].message().node()->name()->string_view(), "pi1");
105 EXPECT_EQ(log_header[11].message().node()->name()->string_view(), "pi1");
106
107 EXPECT_EQ(log_header[12].message().node()->name()->string_view(), "pi2");
108 EXPECT_EQ(log_header[13].message().node()->name()->string_view(), "pi2");
109
110 if (shared()) {
111 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
112 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi2");
113 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi2");
114
115 EXPECT_EQ(log_header[17].message().node()->name()->string_view(), "pi1");
116 EXPECT_EQ(log_header[18].message().node()->name()->string_view(), "pi1");
117 } else {
118 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
119 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi2");
120
121 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi1");
122 EXPECT_EQ(log_header[17].message().node()->name()->string_view(), "pi1");
123
124 EXPECT_EQ(log_header[18].message().node()->name()->string_view(), "pi2");
125 EXPECT_EQ(log_header[19].message().node()->name()->string_view(), "pi2");
126 }
127
128 // And the parts index matches.
129 EXPECT_EQ(log_header[2].message().parts_index(), 0);
130 EXPECT_EQ(log_header[3].message().parts_index(), 1);
131 EXPECT_EQ(log_header[4].message().parts_index(), 2);
132
133 EXPECT_EQ(log_header[5].message().parts_index(), 0);
134 EXPECT_EQ(log_header[6].message().parts_index(), 1);
135
136 EXPECT_EQ(log_header[7].message().parts_index(), 0);
137 EXPECT_EQ(log_header[8].message().parts_index(), 1);
138 EXPECT_EQ(log_header[9].message().parts_index(), 2);
139
140 EXPECT_EQ(log_header[10].message().parts_index(), 0);
141 EXPECT_EQ(log_header[11].message().parts_index(), 1);
142
143 EXPECT_EQ(log_header[12].message().parts_index(), 0);
144 EXPECT_EQ(log_header[13].message().parts_index(), 1);
145
146 if (shared()) {
147 EXPECT_EQ(log_header[14].message().parts_index(), 0);
148 EXPECT_EQ(log_header[15].message().parts_index(), 1);
149 EXPECT_EQ(log_header[16].message().parts_index(), 2);
150
151 EXPECT_EQ(log_header[17].message().parts_index(), 0);
152 EXPECT_EQ(log_header[18].message().parts_index(), 1);
153 } else {
154 EXPECT_EQ(log_header[14].message().parts_index(), 0);
155 EXPECT_EQ(log_header[15].message().parts_index(), 1);
156
157 EXPECT_EQ(log_header[16].message().parts_index(), 0);
158 EXPECT_EQ(log_header[17].message().parts_index(), 1);
159
160 EXPECT_EQ(log_header[18].message().parts_index(), 0);
161 EXPECT_EQ(log_header[19].message().parts_index(), 1);
162 }
163 }
164
165 const std::vector<LogFile> sorted_log_files = SortParts(logfiles_);
166 {
167 using ::testing::UnorderedElementsAre;
168 std::shared_ptr<const aos::Configuration> config =
169 sorted_log_files[0].config;
170
171 // Timing reports, pings
172 EXPECT_THAT(CountChannelsData(config, logfiles_[2]),
173 UnorderedElementsAre(
174 std::make_tuple("/pi1/aos",
175 "aos.message_bridge.ServerStatistics", 1),
176 std::make_tuple("/test", "aos.examples.Ping", 1)))
177 << " : " << logfiles_[2];
178 {
179 std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
180 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 1),
181 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
182 1)};
183 if (!std::get<0>(GetParam()).shared) {
184 channel_counts.push_back(
185 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
186 "aos-message_bridge-Timestamp",
187 "aos.message_bridge.RemoteMessage", 1));
188 }
189 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
190 ::testing::UnorderedElementsAreArray(channel_counts))
191 << " : " << logfiles_[3];
192 }
193 {
194 std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
195 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 199),
196 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
197 20),
198 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
199 199),
200 std::make_tuple("/pi1/aos", "aos.timing.Report", 40),
201 std::make_tuple("/test", "aos.examples.Ping", 2000)};
202 if (!std::get<0>(GetParam()).shared) {
203 channel_counts.push_back(
204 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
205 "aos-message_bridge-Timestamp",
206 "aos.message_bridge.RemoteMessage", 199));
207 }
208 EXPECT_THAT(CountChannelsData(config, logfiles_[4]),
209 ::testing::UnorderedElementsAreArray(channel_counts))
210 << " : " << logfiles_[4];
211 }
212 // Timestamps for pong
213 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
214 UnorderedElementsAre())
215 << " : " << logfiles_[2];
216 EXPECT_THAT(
217 CountChannelsTimestamp(config, logfiles_[3]),
218 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
219 << " : " << logfiles_[3];
220 EXPECT_THAT(
221 CountChannelsTimestamp(config, logfiles_[4]),
222 UnorderedElementsAre(
223 std::make_tuple("/test", "aos.examples.Pong", 2000),
224 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)))
225 << " : " << logfiles_[4];
226
227 // Pong data.
228 EXPECT_THAT(
229 CountChannelsData(config, logfiles_[5]),
230 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 91)))
231 << " : " << logfiles_[5];
232 EXPECT_THAT(CountChannelsData(config, logfiles_[6]),
233 UnorderedElementsAre(
234 std::make_tuple("/test", "aos.examples.Pong", 1910)))
235 << " : " << logfiles_[6];
236
237 // No timestamps
238 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
239 UnorderedElementsAre())
240 << " : " << logfiles_[5];
241 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[6]),
242 UnorderedElementsAre())
243 << " : " << logfiles_[6];
244
245 // Timing reports and pongs.
246 EXPECT_THAT(CountChannelsData(config, logfiles_[7]),
247 UnorderedElementsAre(std::make_tuple(
248 "/pi2/aos", "aos.message_bridge.ServerStatistics", 1)))
249 << " : " << logfiles_[7];
250 EXPECT_THAT(
251 CountChannelsData(config, logfiles_[8]),
252 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
253 << " : " << logfiles_[8];
254 EXPECT_THAT(
255 CountChannelsData(config, logfiles_[9]),
256 UnorderedElementsAre(
257 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
258 std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
259 20),
260 std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
261 200),
262 std::make_tuple("/pi2/aos", "aos.timing.Report", 40),
263 std::make_tuple("/test", "aos.examples.Pong", 2000)))
264 << " : " << logfiles_[9];
265 // And ping timestamps.
266 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[7]),
267 UnorderedElementsAre())
268 << " : " << logfiles_[7];
269 EXPECT_THAT(
270 CountChannelsTimestamp(config, logfiles_[8]),
271 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
272 << " : " << logfiles_[8];
273 EXPECT_THAT(
274 CountChannelsTimestamp(config, logfiles_[9]),
275 UnorderedElementsAre(
276 std::make_tuple("/test", "aos.examples.Ping", 2000),
277 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)))
278 << " : " << logfiles_[9];
279
280 // And then test that the remotely logged timestamp data files only have
281 // timestamps in them.
282 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[10]),
283 UnorderedElementsAre())
284 << " : " << logfiles_[10];
285 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[11]),
286 UnorderedElementsAre())
287 << " : " << logfiles_[11];
288 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[12]),
289 UnorderedElementsAre())
290 << " : " << logfiles_[12];
291 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[13]),
292 UnorderedElementsAre())
293 << " : " << logfiles_[13];
294
295 EXPECT_THAT(CountChannelsData(config, logfiles_[10]),
296 UnorderedElementsAre(std::make_tuple(
297 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
298 << " : " << logfiles_[10];
299 EXPECT_THAT(CountChannelsData(config, logfiles_[11]),
300 UnorderedElementsAre(std::make_tuple(
301 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
302 << " : " << logfiles_[11];
303
304 EXPECT_THAT(CountChannelsData(config, logfiles_[12]),
305 UnorderedElementsAre(std::make_tuple(
306 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
307 << " : " << logfiles_[12];
308 EXPECT_THAT(CountChannelsData(config, logfiles_[13]),
309 UnorderedElementsAre(std::make_tuple(
310 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
311 << " : " << logfiles_[13];
312
313 // Timestamps from pi2 on pi1, and the other way.
314 if (shared()) {
315 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
316 UnorderedElementsAre())
317 << " : " << logfiles_[14];
318 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
319 UnorderedElementsAre())
320 << " : " << logfiles_[15];
321 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
322 UnorderedElementsAre())
323 << " : " << logfiles_[16];
324 EXPECT_THAT(CountChannelsData(config, logfiles_[17]),
325 UnorderedElementsAre())
326 << " : " << logfiles_[17];
327 EXPECT_THAT(CountChannelsData(config, logfiles_[18]),
328 UnorderedElementsAre())
329 << " : " << logfiles_[18];
330
331 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[14]),
332 UnorderedElementsAre(
333 std::make_tuple("/test", "aos.examples.Ping", 1)))
334 << " : " << logfiles_[14];
335 EXPECT_THAT(
336 CountChannelsTimestamp(config, logfiles_[15]),
337 UnorderedElementsAre(
338 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 9),
339 std::make_tuple("/test", "aos.examples.Ping", 90)))
340 << " : " << logfiles_[15];
341 EXPECT_THAT(
342 CountChannelsTimestamp(config, logfiles_[16]),
343 UnorderedElementsAre(
344 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 191),
345 std::make_tuple("/test", "aos.examples.Ping", 1910)))
346 << " : " << logfiles_[16];
347 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[17]),
348 UnorderedElementsAre(std::make_tuple(
349 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
350 << " : " << logfiles_[17];
351 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[18]),
352 UnorderedElementsAre(std::make_tuple(
353 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
354 << " : " << logfiles_[18];
355 } else {
356 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
357 UnorderedElementsAre())
358 << " : " << logfiles_[14];
359 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
360 UnorderedElementsAre())
361 << " : " << logfiles_[15];
362 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
363 UnorderedElementsAre())
364 << " : " << logfiles_[16];
365 EXPECT_THAT(CountChannelsData(config, logfiles_[17]),
366 UnorderedElementsAre())
367 << " : " << logfiles_[17];
368 EXPECT_THAT(CountChannelsData(config, logfiles_[18]),
369 UnorderedElementsAre())
370 << " : " << logfiles_[18];
371 EXPECT_THAT(CountChannelsData(config, logfiles_[19]),
372 UnorderedElementsAre())
373 << " : " << logfiles_[19];
374
375 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[14]),
376 UnorderedElementsAre(std::make_tuple(
377 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
378 << " : " << logfiles_[14];
379 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[15]),
380 UnorderedElementsAre(std::make_tuple(
381 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
382 << " : " << logfiles_[15];
383 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[16]),
384 UnorderedElementsAre(std::make_tuple(
385 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
386 << " : " << logfiles_[16];
387 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[17]),
388 UnorderedElementsAre(std::make_tuple(
389 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
390 << " : " << logfiles_[17];
391 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[18]),
392 UnorderedElementsAre(
393 std::make_tuple("/test", "aos.examples.Ping", 91)))
394 << " : " << logfiles_[18];
395 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[19]),
396 UnorderedElementsAre(
397 std::make_tuple("/test", "aos.examples.Ping", 1910)))
398 << " : " << logfiles_[19];
399 }
400 }
401
402 LogReader reader(sorted_log_files);
403
404 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
405 log_reader_factory.set_send_delay(chrono::microseconds(0));
406
407 // This sends out the fetched messages and advances time to the start of the
408 // log file.
409 reader.Register(&log_reader_factory);
410
411 const Node *pi1 =
412 configuration::GetNode(log_reader_factory.configuration(), "pi1");
413 const Node *pi2 =
414 configuration::GetNode(log_reader_factory.configuration(), "pi2");
415
416 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
417 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
418 LOG(INFO) << "now pi1 "
419 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
420 LOG(INFO) << "now pi2 "
421 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
422
423 EXPECT_THAT(reader.LoggedNodes(),
424 ::testing::ElementsAre(
425 configuration::GetNode(reader.logged_configuration(), pi1),
426 configuration::GetNode(reader.logged_configuration(), pi2)));
427
428 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
429
430 std::unique_ptr<EventLoop> pi1_event_loop =
431 log_reader_factory.MakeEventLoop("test", pi1);
432 std::unique_ptr<EventLoop> pi2_event_loop =
433 log_reader_factory.MakeEventLoop("test", pi2);
434
435 int pi1_ping_count = 10;
436 int pi2_ping_count = 10;
437 int pi1_pong_count = 10;
438 int pi2_pong_count = 10;
439
440 // Confirm that the ping value matches.
441 pi1_event_loop->MakeWatcher(
442 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
443 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping) << " at "
444 << pi1_event_loop->context().monotonic_remote_time << " -> "
445 << pi1_event_loop->context().monotonic_event_time;
446 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
447 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
448 pi1_ping_count * chrono::milliseconds(10) +
449 monotonic_clock::epoch());
450 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
451 pi1_ping_count * chrono::milliseconds(10) +
452 realtime_clock::epoch());
453 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
454 pi1_event_loop->context().monotonic_event_time);
455 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
456 pi1_event_loop->context().realtime_event_time);
457
458 ++pi1_ping_count;
459 });
460 pi2_event_loop->MakeWatcher(
461 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
462 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping) << " at "
463 << pi2_event_loop->context().monotonic_remote_time << " -> "
464 << pi2_event_loop->context().monotonic_event_time;
465 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
466
467 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
468 pi2_ping_count * chrono::milliseconds(10) +
469 monotonic_clock::epoch());
470 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
471 pi2_ping_count * chrono::milliseconds(10) +
472 realtime_clock::epoch());
473 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time +
474 chrono::microseconds(150),
475 pi2_event_loop->context().monotonic_event_time);
476 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time +
477 chrono::microseconds(150),
478 pi2_event_loop->context().realtime_event_time);
479 ++pi2_ping_count;
480 });
481
482 constexpr ssize_t kQueueIndexOffset = -9;
483 // Confirm that the ping and pong counts both match, and the value also
484 // matches.
485 pi1_event_loop->MakeWatcher(
486 "/test", [&pi1_event_loop, &pi1_ping_count,
487 &pi1_pong_count](const examples::Pong &pong) {
488 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
489 << pi1_event_loop->context().monotonic_remote_time << " -> "
490 << pi1_event_loop->context().monotonic_event_time;
491
492 EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
493 pi1_pong_count + kQueueIndexOffset);
494 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
495 chrono::microseconds(200) +
496 pi1_pong_count * chrono::milliseconds(10) +
497 monotonic_clock::epoch());
498 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
499 chrono::microseconds(200) +
500 pi1_pong_count * chrono::milliseconds(10) +
501 realtime_clock::epoch());
502
503 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
504 chrono::microseconds(150),
505 pi1_event_loop->context().monotonic_event_time);
506 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
507 chrono::microseconds(150),
508 pi1_event_loop->context().realtime_event_time);
509
510 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
511 ++pi1_pong_count;
512 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
513 });
514 pi2_event_loop->MakeWatcher(
515 "/test", [&pi2_event_loop, &pi2_ping_count,
516 &pi2_pong_count](const examples::Pong &pong) {
517 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
518 << pi2_event_loop->context().monotonic_remote_time << " -> "
519 << pi2_event_loop->context().monotonic_event_time;
520
521 EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
522 pi2_pong_count + kQueueIndexOffset);
523
524 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
525 chrono::microseconds(200) +
526 pi2_pong_count * chrono::milliseconds(10) +
527 monotonic_clock::epoch());
528 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
529 chrono::microseconds(200) +
530 pi2_pong_count * chrono::milliseconds(10) +
531 realtime_clock::epoch());
532
533 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
534 pi2_event_loop->context().monotonic_event_time);
535 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
536 pi2_event_loop->context().realtime_event_time);
537
538 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
539 ++pi2_pong_count;
540 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
541 });
542
543 log_reader_factory.Run();
544 EXPECT_EQ(pi1_ping_count, 2010);
545 EXPECT_EQ(pi2_ping_count, 2010);
546 EXPECT_EQ(pi1_pong_count, 2010);
547 EXPECT_EQ(pi2_pong_count, 2010);
548
549 reader.Deregister();
550}
551
552// Test that if we feed the replay with a mismatched node list that we die on
553// the LogReader constructor.
554TEST_P(MultinodeLoggerDeathTest, MultiNodeBadReplayConfig) {
555 time_converter_.StartEqual();
556 {
557 LoggerState pi1_logger = MakeLogger(pi1_);
558 LoggerState pi2_logger = MakeLogger(pi2_);
559
560 event_loop_factory_.RunFor(chrono::milliseconds(95));
561
562 StartLogger(&pi1_logger);
563 StartLogger(&pi2_logger);
564
565 event_loop_factory_.RunFor(chrono::milliseconds(20000));
566 }
567
568 // Test that, if we add an additional node to the replay config that the
569 // logger complains about the mismatch in number of nodes.
570 FlatbufferDetachedBuffer<Configuration> extra_nodes_config =
571 configuration::MergeWithConfig(&config_.message(), R"({
572 "nodes": [
573 {
574 "name": "extra-node"
575 }
576 ]
577 }
578 )");
579
580 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
581 EXPECT_DEATH(LogReader(sorted_parts, &extra_nodes_config.message()),
582 "Log file and replay config need to have matching nodes lists.");
583}
584
585// Tests that we can read log files where they don't start at the same monotonic
586// time.
587TEST_P(MultinodeLoggerTest, StaggeredStart) {
588 time_converter_.StartEqual();
589 std::vector<std::string> actual_filenames;
590
591 {
592 LoggerState pi1_logger = MakeLogger(pi1_);
593 LoggerState pi2_logger = MakeLogger(pi2_);
594
595 event_loop_factory_.RunFor(chrono::milliseconds(95));
596
597 StartLogger(&pi1_logger);
598
599 event_loop_factory_.RunFor(chrono::milliseconds(200));
600
601 StartLogger(&pi2_logger);
602
603 event_loop_factory_.RunFor(chrono::milliseconds(20000));
604 pi1_logger.AppendAllFilenames(&actual_filenames);
605 pi2_logger.AppendAllFilenames(&actual_filenames);
606 }
607
608 // Since we delay starting pi2, it already knows about all the timestamps so
609 // we don't end up with extra parts.
610 LogReader reader(SortParts(actual_filenames));
611
612 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
613 log_reader_factory.set_send_delay(chrono::microseconds(0));
614
615 // This sends out the fetched messages and advances time to the start of the
616 // log file.
617 reader.Register(&log_reader_factory);
618
619 const Node *pi1 =
620 configuration::GetNode(log_reader_factory.configuration(), "pi1");
621 const Node *pi2 =
622 configuration::GetNode(log_reader_factory.configuration(), "pi2");
623
624 EXPECT_THAT(reader.LoggedNodes(),
625 ::testing::ElementsAre(
626 configuration::GetNode(reader.logged_configuration(), pi1),
627 configuration::GetNode(reader.logged_configuration(), pi2)));
628
629 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
630
631 std::unique_ptr<EventLoop> pi1_event_loop =
632 log_reader_factory.MakeEventLoop("test", pi1);
633 std::unique_ptr<EventLoop> pi2_event_loop =
634 log_reader_factory.MakeEventLoop("test", pi2);
635
636 int pi1_ping_count = 30;
637 int pi2_ping_count = 30;
638 int pi1_pong_count = 30;
639 int pi2_pong_count = 30;
640
641 // Confirm that the ping value matches.
642 pi1_event_loop->MakeWatcher(
643 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
644 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
645 << pi1_event_loop->context().monotonic_remote_time << " -> "
646 << pi1_event_loop->context().monotonic_event_time;
647 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
648
649 ++pi1_ping_count;
650 });
651 pi2_event_loop->MakeWatcher(
652 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
653 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
654 << pi2_event_loop->context().monotonic_remote_time << " -> "
655 << pi2_event_loop->context().monotonic_event_time;
656 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
657
658 ++pi2_ping_count;
659 });
660
661 // Confirm that the ping and pong counts both match, and the value also
662 // matches.
663 pi1_event_loop->MakeWatcher(
664 "/test", [&pi1_event_loop, &pi1_ping_count,
665 &pi1_pong_count](const examples::Pong &pong) {
666 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
667 << pi1_event_loop->context().monotonic_remote_time << " -> "
668 << pi1_event_loop->context().monotonic_event_time;
669
670 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
671 ++pi1_pong_count;
672 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
673 });
674 pi2_event_loop->MakeWatcher(
675 "/test", [&pi2_event_loop, &pi2_ping_count,
676 &pi2_pong_count](const examples::Pong &pong) {
677 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
678 << pi2_event_loop->context().monotonic_remote_time << " -> "
679 << pi2_event_loop->context().monotonic_event_time;
680
681 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
682 ++pi2_pong_count;
683 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
684 });
685
686 log_reader_factory.Run();
687 EXPECT_EQ(pi1_ping_count, 2030);
688 EXPECT_EQ(pi2_ping_count, 2030);
689 EXPECT_EQ(pi1_pong_count, 2030);
690 EXPECT_EQ(pi2_pong_count, 2030);
691
692 reader.Deregister();
693}
694
695// Tests that we can read log files where the monotonic clocks drift and don't
696// match correctly. While we are here, also test that different ending times
697// also is readable.
698TEST_P(MultinodeLoggerTest, MismatchedClocks) {
699 // TODO(austin): Negate...
700 const chrono::nanoseconds initial_pi2_offset = chrono::seconds(1000);
701
702 time_converter_.AddMonotonic(
703 {BootTimestamp::epoch(), BootTimestamp::epoch() + initial_pi2_offset});
704 // Wait for 95 ms, (~0.1 seconds - 1/2 of the ping/pong period), and set the
705 // skew to be 200 uS/s
706 const chrono::nanoseconds startup_sleep1 = time_converter_.AddMonotonic(
707 {chrono::milliseconds(95),
708 chrono::milliseconds(95) - chrono::nanoseconds(200) * 95});
709 // Run another 200 ms to have one logger start first.
710 const chrono::nanoseconds startup_sleep2 = time_converter_.AddMonotonic(
711 {chrono::milliseconds(200), chrono::milliseconds(200)});
712 // Slew one way then the other at the same 200 uS/S slew rate. Make sure we
713 // go far enough to cause problems if this isn't accounted for.
714 const chrono::nanoseconds logger_run1 = time_converter_.AddMonotonic(
715 {chrono::milliseconds(20000),
716 chrono::milliseconds(20000) - chrono::nanoseconds(200) * 20000});
717 const chrono::nanoseconds logger_run2 = time_converter_.AddMonotonic(
718 {chrono::milliseconds(40000),
719 chrono::milliseconds(40000) + chrono::nanoseconds(200) * 40000});
720 const chrono::nanoseconds logger_run3 = time_converter_.AddMonotonic(
721 {chrono::milliseconds(400), chrono::milliseconds(400)});
722
723 {
724 LoggerState pi2_logger = MakeLogger(pi2_);
725
726 LOG(INFO) << "pi2 times: " << pi2_->monotonic_now() << " "
727 << pi2_->realtime_now() << " distributed "
728 << pi2_->ToDistributedClock(pi2_->monotonic_now());
729
730 LOG(INFO) << "pi2_ times: " << pi2_->monotonic_now() << " "
731 << pi2_->realtime_now() << " distributed "
732 << pi2_->ToDistributedClock(pi2_->monotonic_now());
733
734 event_loop_factory_.RunFor(startup_sleep1);
735
736 StartLogger(&pi2_logger);
737
738 event_loop_factory_.RunFor(startup_sleep2);
739
740 {
741 // Run pi1's logger for only part of the time.
742 LoggerState pi1_logger = MakeLogger(pi1_);
743
744 StartLogger(&pi1_logger);
745 event_loop_factory_.RunFor(logger_run1);
746
747 // Make sure we slewed time far enough so that the difference is greater
748 // than the network delay. This confirms that if we sort incorrectly, it
749 // would show in the results.
750 EXPECT_LT(
751 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
752 -event_loop_factory_.send_delay() -
753 event_loop_factory_.network_delay());
754
755 event_loop_factory_.RunFor(logger_run2);
756
757 // And now check that we went far enough the other way to make sure we
758 // cover both problems.
759 EXPECT_GT(
760 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
761 event_loop_factory_.send_delay() +
762 event_loop_factory_.network_delay());
763 }
764
765 // And log a bit more on pi2.
766 event_loop_factory_.RunFor(logger_run3);
767 }
768
769 LogReader reader(
770 SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 3, 2)));
771
772 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
773 log_reader_factory.set_send_delay(chrono::microseconds(0));
774
775 const Node *pi1 =
776 configuration::GetNode(log_reader_factory.configuration(), "pi1");
777 const Node *pi2 =
778 configuration::GetNode(log_reader_factory.configuration(), "pi2");
779
780 // This sends out the fetched messages and advances time to the start of the
781 // log file.
782 reader.Register(&log_reader_factory);
783
784 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
785 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
786 LOG(INFO) << "now pi1 "
787 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
788 LOG(INFO) << "now pi2 "
789 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
790
791 LOG(INFO) << "Done registering (pi1) "
792 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now()
793 << " "
794 << log_reader_factory.GetNodeEventLoopFactory(pi1)->realtime_now();
795 LOG(INFO) << "Done registering (pi2) "
796 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now()
797 << " "
798 << log_reader_factory.GetNodeEventLoopFactory(pi2)->realtime_now();
799
800 EXPECT_THAT(reader.LoggedNodes(),
801 ::testing::ElementsAre(
802 configuration::GetNode(reader.logged_configuration(), pi1),
803 configuration::GetNode(reader.logged_configuration(), pi2)));
804
805 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
806
807 std::unique_ptr<EventLoop> pi1_event_loop =
808 log_reader_factory.MakeEventLoop("test", pi1);
809 std::unique_ptr<EventLoop> pi2_event_loop =
810 log_reader_factory.MakeEventLoop("test", pi2);
811
812 int pi1_ping_count = 30;
813 int pi2_ping_count = 30;
814 int pi1_pong_count = 30;
815 int pi2_pong_count = 30;
816
817 // Confirm that the ping value matches.
818 pi1_event_loop->MakeWatcher(
819 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
820 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
821 << pi1_event_loop->context().monotonic_remote_time << " -> "
822 << pi1_event_loop->context().monotonic_event_time;
823 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
824
825 ++pi1_ping_count;
826 });
827 pi2_event_loop->MakeWatcher(
828 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
829 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
830 << pi2_event_loop->context().monotonic_remote_time << " -> "
831 << pi2_event_loop->context().monotonic_event_time;
832 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
833
834 ++pi2_ping_count;
835 });
836
837 // Confirm that the ping and pong counts both match, and the value also
838 // matches.
839 pi1_event_loop->MakeWatcher(
840 "/test", [&pi1_event_loop, &pi1_ping_count,
841 &pi1_pong_count](const examples::Pong &pong) {
842 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
843 << pi1_event_loop->context().monotonic_remote_time << " -> "
844 << pi1_event_loop->context().monotonic_event_time;
845
846 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
847 ++pi1_pong_count;
848 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
849 });
850 pi2_event_loop->MakeWatcher(
851 "/test", [&pi2_event_loop, &pi2_ping_count,
852 &pi2_pong_count](const examples::Pong &pong) {
853 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
854 << pi2_event_loop->context().monotonic_remote_time << " -> "
855 << pi2_event_loop->context().monotonic_event_time;
856
857 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
858 ++pi2_pong_count;
859 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
860 });
861
862 log_reader_factory.Run();
863 EXPECT_EQ(pi1_ping_count, 6030);
864 EXPECT_EQ(pi2_ping_count, 6030);
865 EXPECT_EQ(pi1_pong_count, 6030);
866 EXPECT_EQ(pi2_pong_count, 6030);
867
868 reader.Deregister();
869}
870
871// Tests that we can sort a bunch of parts into the pre-determined sorted parts.
872TEST_P(MultinodeLoggerTest, SortParts) {
873 time_converter_.StartEqual();
874 // Make a bunch of parts.
875 {
876 LoggerState pi1_logger = MakeLogger(pi1_);
877 LoggerState pi2_logger = MakeLogger(pi2_);
878
879 event_loop_factory_.RunFor(chrono::milliseconds(95));
880
881 StartLogger(&pi1_logger);
882 StartLogger(&pi2_logger);
883
884 event_loop_factory_.RunFor(chrono::milliseconds(2000));
885 }
886
887 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
888 VerifyParts(sorted_parts);
889}
890
891// Tests that we can sort a bunch of parts with an empty part. We should ignore
892// it and remove it from the sorted list.
893TEST_P(MultinodeLoggerTest, SortEmptyParts) {
894 time_converter_.StartEqual();
895 // Make a bunch of parts.
896 {
897 LoggerState pi1_logger = MakeLogger(pi1_);
898 LoggerState pi2_logger = MakeLogger(pi2_);
899
900 event_loop_factory_.RunFor(chrono::milliseconds(95));
901
902 StartLogger(&pi1_logger);
903 StartLogger(&pi2_logger);
904
905 event_loop_factory_.RunFor(chrono::milliseconds(2000));
906 }
907
908 // TODO(austin): Should we flip out if the file can't open?
909 const std::string kEmptyFile("foobarinvalidfiledoesnotexist" + Extension());
910
911 aos::util::WriteStringToFileOrDie(kEmptyFile, "");
912 logfiles_.emplace_back(kEmptyFile);
913
914 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
915 VerifyParts(sorted_parts, {kEmptyFile});
916}
917
918// Tests that we can sort a bunch of parts with the end missing off a
919// file. We should use the part we can read.
920TEST_P(MultinodeLoggerTest, SortTruncatedParts) {
921 std::vector<std::string> actual_filenames;
922 time_converter_.StartEqual();
923 // Make a bunch of parts.
924 {
925 LoggerState pi1_logger = MakeLogger(pi1_);
926 LoggerState pi2_logger = MakeLogger(pi2_);
927
928 event_loop_factory_.RunFor(chrono::milliseconds(95));
929
930 StartLogger(&pi1_logger);
931 StartLogger(&pi2_logger);
932
933 event_loop_factory_.RunFor(chrono::milliseconds(2000));
934
935 pi1_logger.AppendAllFilenames(&actual_filenames);
936 pi2_logger.AppendAllFilenames(&actual_filenames);
937 }
938
939 ASSERT_THAT(actual_filenames,
940 ::testing::UnorderedElementsAreArray(logfiles_));
941
942 // Strip off the end of one of the files. Pick one with a lot of data.
943 // For snappy, needs to have enough data to be >1 chunk of compressed data so
944 // that we don't corrupt the entire log part.
945 ::std::string compressed_contents =
946 aos::util::ReadFileToStringOrDie(logfiles_[4]);
947
948 aos::util::WriteStringToFileOrDie(
949 logfiles_[4],
950 compressed_contents.substr(0, compressed_contents.size() - 100));
951
952 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
953 VerifyParts(sorted_parts);
954}
955
956// Tests that if we remap a remapped channel, it shows up correctly.
957TEST_P(MultinodeLoggerTest, RemapLoggedChannel) {
958 time_converter_.StartEqual();
959 {
960 LoggerState pi1_logger = MakeLogger(pi1_);
961 LoggerState pi2_logger = MakeLogger(pi2_);
962
963 event_loop_factory_.RunFor(chrono::milliseconds(95));
964
965 StartLogger(&pi1_logger);
966 StartLogger(&pi2_logger);
967
968 event_loop_factory_.RunFor(chrono::milliseconds(20000));
969 }
970
971 LogReader reader(SortParts(logfiles_));
972
973 // Remap just on pi1.
974 reader.RemapLoggedChannel<aos::timing::Report>(
975 "/aos", configuration::GetNode(reader.configuration(), "pi1"));
976
977 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
978 log_reader_factory.set_send_delay(chrono::microseconds(0));
979
980 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
981 // Note: An extra channel gets remapped automatically due to a timestamp
982 // channel being LOCAL_LOGGER'd.
983 ASSERT_EQ(remapped_channels.size(), std::get<0>(GetParam()).shared ? 1u : 2u);
984 EXPECT_EQ(remapped_channels[0]->name()->string_view(), "/original/pi1/aos");
985 EXPECT_EQ(remapped_channels[0]->type()->string_view(), "aos.timing.Report");
986 if (!std::get<0>(GetParam()).shared) {
987 EXPECT_EQ(remapped_channels[1]->name()->string_view(),
988 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
989 "aos-message_bridge-Timestamp");
990 EXPECT_EQ(remapped_channels[1]->type()->string_view(),
991 "aos.message_bridge.RemoteMessage");
992 }
993
994 reader.Register(&log_reader_factory);
995
996 const Node *pi1 =
997 configuration::GetNode(log_reader_factory.configuration(), "pi1");
998 const Node *pi2 =
999 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1000
1001 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1002 // else should have moved.
1003 std::unique_ptr<EventLoop> pi1_event_loop =
1004 log_reader_factory.MakeEventLoop("test", pi1);
1005 pi1_event_loop->SkipTimingReport();
1006 std::unique_ptr<EventLoop> full_pi1_event_loop =
1007 log_reader_factory.MakeEventLoop("test", pi1);
1008 full_pi1_event_loop->SkipTimingReport();
1009 std::unique_ptr<EventLoop> pi2_event_loop =
1010 log_reader_factory.MakeEventLoop("test", pi2);
1011 pi2_event_loop->SkipTimingReport();
1012
1013 MessageCounter<aos::timing::Report> pi1_timing_report(pi1_event_loop.get(),
1014 "/aos");
1015 MessageCounter<aos::timing::Report> full_pi1_timing_report(
1016 full_pi1_event_loop.get(), "/pi1/aos");
1017 MessageCounter<aos::timing::Report> pi1_original_timing_report(
1018 pi1_event_loop.get(), "/original/aos");
1019 MessageCounter<aos::timing::Report> full_pi1_original_timing_report(
1020 full_pi1_event_loop.get(), "/original/pi1/aos");
1021 MessageCounter<aos::timing::Report> pi2_timing_report(pi2_event_loop.get(),
1022 "/aos");
1023
1024 log_reader_factory.Run();
1025
1026 EXPECT_EQ(pi1_timing_report.count(), 0u);
1027 EXPECT_EQ(full_pi1_timing_report.count(), 0u);
1028 EXPECT_NE(pi1_original_timing_report.count(), 0u);
1029 EXPECT_NE(full_pi1_original_timing_report.count(), 0u);
1030 EXPECT_NE(pi2_timing_report.count(), 0u);
1031
1032 reader.Deregister();
1033}
1034
1035// Tests that we can remap a forwarded channel as well.
1036TEST_P(MultinodeLoggerTest, RemapForwardedLoggedChannel) {
1037 time_converter_.StartEqual();
1038 {
1039 LoggerState pi1_logger = MakeLogger(pi1_);
1040 LoggerState pi2_logger = MakeLogger(pi2_);
1041
1042 event_loop_factory_.RunFor(chrono::milliseconds(95));
1043
1044 StartLogger(&pi1_logger);
1045 StartLogger(&pi2_logger);
1046
1047 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1048 }
1049
1050 LogReader reader(SortParts(logfiles_));
1051
1052 reader.RemapLoggedChannel<examples::Ping>("/test");
1053
1054 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1055 log_reader_factory.set_send_delay(chrono::microseconds(0));
1056
1057 reader.Register(&log_reader_factory);
1058
1059 const Node *pi1 =
1060 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1061 const Node *pi2 =
1062 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1063
1064 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1065 // else should have moved.
1066 std::unique_ptr<EventLoop> pi1_event_loop =
1067 log_reader_factory.MakeEventLoop("test", pi1);
1068 pi1_event_loop->SkipTimingReport();
1069 std::unique_ptr<EventLoop> full_pi1_event_loop =
1070 log_reader_factory.MakeEventLoop("test", pi1);
1071 full_pi1_event_loop->SkipTimingReport();
1072 std::unique_ptr<EventLoop> pi2_event_loop =
1073 log_reader_factory.MakeEventLoop("test", pi2);
1074 pi2_event_loop->SkipTimingReport();
1075
1076 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1077 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1078 MessageCounter<examples::Ping> pi1_original_ping(pi1_event_loop.get(),
1079 "/original/test");
1080 MessageCounter<examples::Ping> pi2_original_ping(pi2_event_loop.get(),
1081 "/original/test");
1082
1083 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1084 pi1_original_ping_timestamp;
1085 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1086 pi1_ping_timestamp;
1087 if (!shared()) {
1088 pi1_original_ping_timestamp =
1089 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1090 pi1_event_loop.get(),
1091 "/pi1/aos/remote_timestamps/pi2/original/test/aos-examples-Ping");
1092 pi1_ping_timestamp =
1093 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1094 pi1_event_loop.get(),
1095 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1096 }
1097
1098 log_reader_factory.Run();
1099
1100 EXPECT_EQ(pi1_ping.count(), 0u);
1101 EXPECT_EQ(pi2_ping.count(), 0u);
1102 EXPECT_NE(pi1_original_ping.count(), 0u);
1103 EXPECT_NE(pi2_original_ping.count(), 0u);
1104 if (!shared()) {
1105 EXPECT_NE(pi1_original_ping_timestamp->count(), 0u);
1106 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1107 }
1108
1109 reader.Deregister();
1110}
1111
// Tests that we observe all the same events in log replay (for a given node)
// whether we just register an event loop for that node or if we register a full
// event loop factory.
//
// Phase 1 replays through a full multi-node factory and records, per
// pi1-readable channel, every message pi1 sees. Phase 2 replays pi1 alone
// through a single event loop and consumes those records in order, failing on
// any extra, mismatched, or missing message.
TEST_P(MultinodeLoggerTest, SingleNodeReplay) {
  time_converter_.StartEqual();
  constexpr chrono::milliseconds kStartupDelay(95);
  {
    LoggerState pi1_logger = MakeLogger(pi1_);
    LoggerState pi2_logger = MakeLogger(pi2_);

    event_loop_factory_.RunFor(kStartupDelay);

    StartLogger(&pi1_logger);
    StartLogger(&pi2_logger);

    event_loop_factory_.RunFor(chrono::milliseconds(20000));
  }

  // Two independent readers over the same log files: one drives a full
  // factory, the other drives a single pi1 event loop.
  LogReader full_reader(SortParts(logfiles_));
  LogReader single_node_reader(SortParts(logfiles_));

  SimulatedEventLoopFactory full_factory(full_reader.configuration());
  SimulatedEventLoopFactory single_node_factory(
      single_node_reader.configuration());
  single_node_factory.SkipTimingReport();
  single_node_factory.DisableStatistics();
  std::unique_ptr<EventLoop> replay_event_loop =
      single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
          "log_reader");

  full_reader.Register(&full_factory);
  single_node_reader.Register(replay_event_loop.get());

  const Node *full_pi1 =
      configuration::GetNode(full_factory.configuration(), "pi1");

  // Phase 1: record what pi1 observes when replaying through the full factory.
  std::unique_ptr<EventLoop> full_event_loop =
      full_factory.MakeEventLoop("test", full_pi1);
  full_event_loop->SkipTimingReport();
  full_event_loop->SkipAosLog();
  // maps are indexed on channel index.
  // observed_messages: {channel_index: [(message_sent_time, was_fetched),...]}
  std::map<size_t, std::vector<std::pair<monotonic_clock::time_point, bool>>>
      observed_messages;
  std::map<size_t, std::unique_ptr<RawFetcher>> fetchers;
  for (size_t ii = 0; ii < full_event_loop->configuration()->channels()->size();
       ++ii) {
    const Channel *channel =
        full_event_loop->configuration()->channels()->Get(ii);
    // Skip remote timestamp channels: replaying them isn't currently supported
    // in realtime replay. Remote timestamp channels that were auto-remapped
    // onto a /original channel are still replayed, so those are kept.
    // NOTE(review): the original wording said "unless the remote timestamp
    // channel was not NOT_LOGGED"; the double negative looks like a typo —
    // confirm which logging mode triggers the auto-remap.
    if (channel->name()->string_view().find("remote_timestamp") !=
            std::string_view::npos &&
        channel->name()->string_view().find("/original") ==
            std::string_view::npos) {
      continue;
    }
    if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
      observed_messages[ii] = {};
      fetchers[ii] = full_event_loop->MakeRawFetcher(channel);
      // At startup, record whatever message is already available on the
      // channel, tagged was_fetched = true.
      full_event_loop->OnRun([ii, &observed_messages, &fetchers]() {
        if (fetchers[ii]->Fetch()) {
          observed_messages[ii].push_back(std::make_pair(
              fetchers[ii]->context().monotonic_event_time, true));
        }
      });
      // Record every message delivered while running, tagged
      // was_fetched = false.
      full_event_loop->MakeRawNoArgWatcher(
          channel, [ii, &observed_messages](const Context &context) {
            observed_messages[ii].push_back(
                std::make_pair(context.monotonic_event_time, false));
          });
    }
  }

  full_factory.Run();
  fetchers.clear();
  full_reader.Deregister();

  // Phase 2: replay pi1 alone and check each delivered message against the
  // front of the recorded list for its channel.
  const Node *single_node_pi1 =
      configuration::GetNode(single_node_factory.configuration(), "pi1");
  std::map<size_t, std::unique_ptr<RawFetcher>> single_node_fetchers;

  std::unique_ptr<EventLoop> single_node_event_loop =
      single_node_factory.MakeEventLoop("test", single_node_pi1);
  single_node_event_loop->SkipTimingReport();
  single_node_event_loop->SkipAosLog();
  for (size_t ii = 0;
       ii < single_node_event_loop->configuration()->channels()->size(); ++ii) {
    const Channel *channel =
        single_node_event_loop->configuration()->channels()->Get(ii);
    // Presumably keeps the simulated message bridge from forwarding data, so
    // only the log reader delivers messages — TODO confirm.
    single_node_factory.DisableForwarding(channel);
    if (configuration::ChannelIsReadableOnNode(channel, single_node_pi1)) {
      single_node_fetchers[ii] =
          single_node_event_loop->MakeRawFetcher(channel);
      single_node_event_loop->OnRun([channel, ii, &single_node_fetchers]() {
        EXPECT_FALSE(single_node_fetchers[ii]->Fetch())
            << "Single EventLoop replay doesn't support pre-loading fetchers. "
            << configuration::StrippedChannelToString(channel);
      });
      // Note: observed_messages[ii] default-inserts an empty list for any
      // channel Phase 1 skipped, so a message there is reported as "extra".
      single_node_event_loop->MakeRawNoArgWatcher(
          channel, [ii, &observed_messages, channel,
                    kStartupDelay](const Context &context) {
            if (observed_messages[ii].empty()) {
              FAIL() << "Observed extra message at "
                     << context.monotonic_event_time << " on "
                     << configuration::StrippedChannelToString(channel);
              return;
            }
            const std::pair<monotonic_clock::time_point, bool> &message =
                observed_messages[ii].front();
            // Single-node event times are expected to be exactly kStartupDelay
            // earlier than the full-factory times (NOTE(review): presumably
            // because the two factories' clocks start at different points —
            // confirm). A message fetched at startup in Phase 1 only needs to
            // have been sent no later than the corresponding event here;
            // watcher-delivered messages must match exactly.
            if (message.second) {
              EXPECT_LE(message.first,
                        context.monotonic_event_time + kStartupDelay)
                  << "Mismatched message times " << context.monotonic_event_time
                  << " and " << message.first << " on "
                  << configuration::StrippedChannelToString(channel);
            } else {
              EXPECT_EQ(message.first,
                        context.monotonic_event_time + kStartupDelay)
                  << "Mismatched message times " << context.monotonic_event_time
                  << " and " << message.first << " on "
                  << configuration::StrippedChannelToString(channel);
            }
            observed_messages[ii].erase(observed_messages[ii].begin());
          });
    }
  }

  single_node_factory.Run();

  single_node_fetchers.clear();

  single_node_reader.Deregister();

  // Every message recorded in Phase 1 must have been consumed in Phase 2.
  for (const auto &pair : observed_messages) {
    EXPECT_TRUE(pair.second.empty())
        << "Missed " << pair.second.size() << " messages on "
        << configuration::StrippedChannelToString(
               single_node_event_loop->configuration()->channels()->Get(
                   pair.first));
  }
}
1257
1258// Tests that we properly recreate forwarded timestamps when replaying a log.
1259// This should be enough that we can then re-run the logger and get a valid log
1260// back.
1261TEST_P(MultinodeLoggerTest, MessageHeader) {
1262 time_converter_.StartEqual();
1263 {
1264 LoggerState pi1_logger = MakeLogger(pi1_);
1265 LoggerState pi2_logger = MakeLogger(pi2_);
1266
1267 event_loop_factory_.RunFor(chrono::milliseconds(95));
1268
1269 StartLogger(&pi1_logger);
1270 StartLogger(&pi2_logger);
1271
1272 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1273 }
1274
1275 LogReader reader(SortParts(logfiles_));
1276
1277 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1278 log_reader_factory.set_send_delay(chrono::microseconds(0));
1279
1280 // This sends out the fetched messages and advances time to the start of the
1281 // log file.
1282 reader.Register(&log_reader_factory);
1283
1284 const Node *pi1 =
1285 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1286 const Node *pi2 =
1287 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1288
1289 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
1290 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
1291 LOG(INFO) << "now pi1 "
1292 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1293 LOG(INFO) << "now pi2 "
1294 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1295
1296 EXPECT_THAT(reader.LoggedNodes(),
1297 ::testing::ElementsAre(
1298 configuration::GetNode(reader.logged_configuration(), pi1),
1299 configuration::GetNode(reader.logged_configuration(), pi2)));
1300
1301 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1302
1303 std::unique_ptr<EventLoop> pi1_event_loop =
1304 log_reader_factory.MakeEventLoop("test", pi1);
1305 std::unique_ptr<EventLoop> pi2_event_loop =
1306 log_reader_factory.MakeEventLoop("test", pi2);
1307
1308 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =
1309 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1310 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi2_fetcher =
1311 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1312
1313 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1314 pi1_event_loop->MakeFetcher<examples::Ping>("/test");
1315 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1316 pi2_event_loop->MakeFetcher<examples::Ping>("/test");
1317
1318 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi2_fetcher =
1319 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1320 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi1_fetcher =
1321 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1322
1323 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1324 pi2_event_loop->MakeFetcher<examples::Pong>("/test");
1325 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1326 pi1_event_loop->MakeFetcher<examples::Pong>("/test");
1327
1328 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
1329 pi1_event_loop->configuration(), pi1_timestamp_on_pi1_fetcher.channel());
1330 const size_t ping_timestamp_channel = configuration::ChannelIndex(
1331 pi2_event_loop->configuration(), ping_on_pi2_fetcher.channel());
1332
1333 const size_t pi2_timestamp_channel = configuration::ChannelIndex(
1334 pi2_event_loop->configuration(), pi2_timestamp_on_pi2_fetcher.channel());
1335 const size_t pong_timestamp_channel = configuration::ChannelIndex(
1336 pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
1337
1338 const chrono::nanoseconds network_delay = event_loop_factory_.network_delay();
1339 const chrono::nanoseconds send_delay = event_loop_factory_.send_delay();
1340
1341 for (std::pair<int, std::string> channel :
1342 shared()
1343 ? std::vector<
1344 std::pair<int, std::string>>{{-1,
1345 "/aos/remote_timestamps/pi2"}}
1346 : std::vector<std::pair<int, std::string>>{
1347 {pi1_timestamp_channel,
1348 "/aos/remote_timestamps/pi2/pi1/aos/"
1349 "aos-message_bridge-Timestamp"},
1350 {ping_timestamp_channel,
1351 "/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
1352 pi1_event_loop->MakeWatcher(
1353 channel.second,
1354 [&pi1_event_loop, &pi2_event_loop, pi1_timestamp_channel,
1355 ping_timestamp_channel, &pi1_timestamp_on_pi1_fetcher,
1356 &pi1_timestamp_on_pi2_fetcher, &ping_on_pi1_fetcher,
1357 &ping_on_pi2_fetcher, network_delay, send_delay,
1358 channel_index = channel.first](const RemoteMessage &header) {
1359 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1360 chrono::nanoseconds(header.monotonic_sent_time()));
1361 const aos::realtime_clock::time_point header_realtime_sent_time(
1362 chrono::nanoseconds(header.realtime_sent_time()));
1363 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1364 chrono::nanoseconds(header.monotonic_remote_time()));
1365 const aos::realtime_clock::time_point header_realtime_remote_time(
1366 chrono::nanoseconds(header.realtime_remote_time()));
1367
1368 if (channel_index != -1) {
1369 ASSERT_EQ(channel_index, header.channel_index());
1370 }
1371
1372 const Context *pi1_context = nullptr;
1373 const Context *pi2_context = nullptr;
1374
1375 if (header.channel_index() == pi1_timestamp_channel) {
1376 ASSERT_TRUE(pi1_timestamp_on_pi1_fetcher.FetchNext());
1377 ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
1378 pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
1379 pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
1380 } else if (header.channel_index() == ping_timestamp_channel) {
1381 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
1382 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
1383 pi1_context = &ping_on_pi1_fetcher.context();
1384 pi2_context = &ping_on_pi2_fetcher.context();
1385 } else {
1386 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1387 << configuration::CleanedChannelToString(
1388 pi1_event_loop->configuration()->channels()->Get(
1389 header.channel_index()));
1390 }
1391
1392 ASSERT_TRUE(header.has_boot_uuid());
1393 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1394 pi2_event_loop->boot_uuid());
1395
1396 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
1397 EXPECT_EQ(pi2_context->remote_queue_index,
1398 header.remote_queue_index());
1399 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
1400
1401 EXPECT_EQ(pi2_context->monotonic_event_time,
1402 header_monotonic_sent_time);
1403 EXPECT_EQ(pi2_context->realtime_event_time,
1404 header_realtime_sent_time);
1405 EXPECT_EQ(pi2_context->realtime_remote_time,
1406 header_realtime_remote_time);
1407 EXPECT_EQ(pi2_context->monotonic_remote_time,
1408 header_monotonic_remote_time);
1409
1410 EXPECT_EQ(pi1_context->realtime_event_time,
1411 header_realtime_remote_time);
1412 EXPECT_EQ(pi1_context->monotonic_event_time,
1413 header_monotonic_remote_time);
1414
1415 // Time estimation isn't perfect, but we know the clocks were
1416 // identical when logged, so we know when this should have come back.
1417 // Confirm we got it when we expected.
1418 EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
1419 pi1_context->monotonic_event_time + 2 * network_delay +
1420 send_delay);
1421 });
1422 }
1423 for (std::pair<int, std::string> channel :
1424 shared()
1425 ? std::vector<
1426 std::pair<int, std::string>>{{-1,
1427 "/aos/remote_timestamps/pi1"}}
1428 : std::vector<std::pair<int, std::string>>{
1429 {pi2_timestamp_channel,
1430 "/aos/remote_timestamps/pi1/pi2/aos/"
1431 "aos-message_bridge-Timestamp"}}) {
1432 pi2_event_loop->MakeWatcher(
1433 channel.second,
1434 [&pi2_event_loop, &pi1_event_loop, pi2_timestamp_channel,
1435 pong_timestamp_channel, &pi2_timestamp_on_pi2_fetcher,
1436 &pi2_timestamp_on_pi1_fetcher, &pong_on_pi2_fetcher,
1437 &pong_on_pi1_fetcher, network_delay, send_delay,
1438 channel_index = channel.first](const RemoteMessage &header) {
1439 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1440 chrono::nanoseconds(header.monotonic_sent_time()));
1441 const aos::realtime_clock::time_point header_realtime_sent_time(
1442 chrono::nanoseconds(header.realtime_sent_time()));
1443 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1444 chrono::nanoseconds(header.monotonic_remote_time()));
1445 const aos::realtime_clock::time_point header_realtime_remote_time(
1446 chrono::nanoseconds(header.realtime_remote_time()));
1447
1448 if (channel_index != -1) {
1449 ASSERT_EQ(channel_index, header.channel_index());
1450 }
1451
1452 const Context *pi2_context = nullptr;
1453 const Context *pi1_context = nullptr;
1454
1455 if (header.channel_index() == pi2_timestamp_channel) {
1456 ASSERT_TRUE(pi2_timestamp_on_pi2_fetcher.FetchNext());
1457 ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
1458 pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
1459 pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
1460 } else if (header.channel_index() == pong_timestamp_channel) {
1461 ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
1462 ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
1463 pi2_context = &pong_on_pi2_fetcher.context();
1464 pi1_context = &pong_on_pi1_fetcher.context();
1465 } else {
1466 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1467 << configuration::CleanedChannelToString(
1468 pi2_event_loop->configuration()->channels()->Get(
1469 header.channel_index()));
1470 }
1471
1472 ASSERT_TRUE(header.has_boot_uuid());
1473 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1474 pi1_event_loop->boot_uuid());
1475
1476 EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());
1477 EXPECT_EQ(pi1_context->remote_queue_index,
1478 header.remote_queue_index());
1479 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
1480
1481 EXPECT_EQ(pi1_context->monotonic_event_time,
1482 header_monotonic_sent_time);
1483 EXPECT_EQ(pi1_context->realtime_event_time,
1484 header_realtime_sent_time);
1485 EXPECT_EQ(pi1_context->realtime_remote_time,
1486 header_realtime_remote_time);
1487 EXPECT_EQ(pi1_context->monotonic_remote_time,
1488 header_monotonic_remote_time);
1489
1490 EXPECT_EQ(pi2_context->realtime_event_time,
1491 header_realtime_remote_time);
1492 EXPECT_EQ(pi2_context->monotonic_event_time,
1493 header_monotonic_remote_time);
1494
1495 // Time estimation isn't perfect, but we know the clocks were
1496 // identical when logged, so we know when this should have come back.
1497 // Confirm we got it when we expected.
1498 EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
1499 pi2_context->monotonic_event_time + 2 * network_delay +
1500 send_delay);
1501 });
1502 }
1503
1504 // And confirm we can re-create a log again, while checking the contents.
1505 {
1506 LoggerState pi1_logger = MakeLogger(
1507 log_reader_factory.GetNodeEventLoopFactory("pi1"), &log_reader_factory);
1508 LoggerState pi2_logger = MakeLogger(
1509 log_reader_factory.GetNodeEventLoopFactory("pi2"), &log_reader_factory);
1510
1511 StartLogger(&pi1_logger, tmp_dir_ + "/relogged1");
1512 StartLogger(&pi2_logger, tmp_dir_ + "/relogged2");
1513
1514 log_reader_factory.Run();
1515 }
1516
1517 reader.Deregister();
1518
1519 // And verify that we can run the LogReader over the relogged files without
1520 // hitting any fatal errors.
1521 {
1522 LogReader relogged_reader(SortParts(MakeLogFiles(
1523 tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 3, 3, true)));
1524 relogged_reader.Register();
1525
1526 relogged_reader.event_loop_factory()->Run();
1527 }
1528 // And confirm that we can read the logged file using the reader's
1529 // configuration.
1530 {
1531 LogReader relogged_reader(
1532 SortParts(MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2",
1533 3, 3, true)),
1534 reader.configuration());
1535 relogged_reader.Register();
1536
1537 relogged_reader.event_loop_factory()->Run();
1538 }
1539}
1540
1541// Tests that we properly populate and extract the logger_start time by setting
1542// up a clock difference between 2 nodes and looking at the resulting parts.
1543TEST_P(MultinodeLoggerTest, LoggerStartTime) {
1544 std::vector<std::string> actual_filenames;
1545 time_converter_.AddMonotonic(
1546 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
1547 {
1548 LoggerState pi1_logger = MakeLogger(pi1_);
1549 LoggerState pi2_logger = MakeLogger(pi2_);
1550
1551 StartLogger(&pi1_logger);
1552 StartLogger(&pi2_logger);
1553
1554 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1555
1556 pi1_logger.AppendAllFilenames(&actual_filenames);
1557 pi2_logger.AppendAllFilenames(&actual_filenames);
1558 }
1559
1560 ASSERT_THAT(actual_filenames,
1561 ::testing::UnorderedElementsAreArray(logfiles_));
1562
1563 for (const LogFile &log_file : SortParts(logfiles_)) {
1564 for (const LogParts &log_part : log_file.parts) {
1565 if (log_part.node == log_file.logger_node) {
1566 EXPECT_EQ(log_part.logger_monotonic_start_time,
1567 aos::monotonic_clock::min_time);
1568 EXPECT_EQ(log_part.logger_realtime_start_time,
1569 aos::realtime_clock::min_time);
1570 } else {
1571 const chrono::seconds offset = log_file.logger_node == "pi1"
1572 ? -chrono::seconds(1000)
1573 : chrono::seconds(1000);
1574 EXPECT_EQ(log_part.logger_monotonic_start_time,
1575 log_part.monotonic_start_time + offset);
1576 EXPECT_EQ(log_part.logger_realtime_start_time,
1577 log_file.realtime_start_time +
1578 (log_part.logger_monotonic_start_time -
1579 log_file.monotonic_start_time));
1580 }
1581 }
1582 }
1583}
1584
1585// Test that renaming the base, renames the folder.
1586TEST_P(MultinodeLoggerTest, LoggerRenameFolder) {
1587 util::UnlinkRecursive(tmp_dir_ + "/renamefolder");
1588 util::UnlinkRecursive(tmp_dir_ + "/new-good");
1589 time_converter_.AddMonotonic(
1590 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
1591 logfile_base1_ = tmp_dir_ + "/renamefolder/multi_logfile1";
1592 logfile_base2_ = tmp_dir_ + "/renamefolder/multi_logfile2";
1593 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
1594 LoggerState pi1_logger = MakeLogger(pi1_);
1595 LoggerState pi2_logger = MakeLogger(pi2_);
1596
1597 StartLogger(&pi1_logger);
1598 StartLogger(&pi2_logger);
1599
1600 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1601 logfile_base1_ = tmp_dir_ + "/new-good/multi_logfile1";
1602 logfile_base2_ = tmp_dir_ + "/new-good/multi_logfile2";
1603 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
1604 ASSERT_TRUE(pi1_logger.logger->RenameLogBase(logfile_base1_));
1605 ASSERT_TRUE(pi2_logger.logger->RenameLogBase(logfile_base2_));
1606 for (auto &file : logfiles_) {
1607 struct stat s;
1608 EXPECT_EQ(0, stat(file.c_str(), &s));
1609 }
1610}
1611
1612// Test that renaming the file base dies.
1613TEST_P(MultinodeLoggerDeathTest, LoggerRenameFile) {
1614 time_converter_.AddMonotonic(
1615 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
1616 util::UnlinkRecursive(tmp_dir_ + "/renamefile");
1617 logfile_base1_ = tmp_dir_ + "/renamefile/multi_logfile1";
1618 logfile_base2_ = tmp_dir_ + "/renamefile/multi_logfile2";
1619 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
1620 LoggerState pi1_logger = MakeLogger(pi1_);
1621 StartLogger(&pi1_logger);
1622 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1623 logfile_base1_ = tmp_dir_ + "/new-renamefile/new_multi_logfile1";
1624 EXPECT_DEATH({ pi1_logger.logger->RenameLogBase(logfile_base1_); },
1625 "Rename of file base from");
1626}
1627
1628// TODO(austin): We can write a test which recreates a logfile and confirms that
1629// we get it back. That is the ultimate test.
1630
1631// Tests that we properly recreate forwarded timestamps when replaying a log.
1632// This should be enough that we can then re-run the logger and get a valid log
1633// back.
1634TEST_P(MultinodeLoggerTest, RemoteReboot) {
1635 std::vector<std::string> actual_filenames;
1636
1637 const UUID pi1_boot0 = UUID::Random();
1638 const UUID pi2_boot0 = UUID::Random();
1639 const UUID pi2_boot1 = UUID::Random();
1640 {
1641 CHECK_EQ(pi1_index_, 0u);
1642 CHECK_EQ(pi2_index_, 1u);
1643
1644 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
1645 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
1646 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
1647
1648 time_converter_.AddNextTimestamp(
1649 distributed_clock::epoch(),
1650 {BootTimestamp::epoch(), BootTimestamp::epoch()});
1651 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
1652 time_converter_.AddNextTimestamp(
1653 distributed_clock::epoch() + reboot_time,
1654 {BootTimestamp::epoch() + reboot_time,
1655 BootTimestamp{
1656 .boot = 1,
1657 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
1658 }
1659
1660 {
1661 LoggerState pi1_logger = MakeLogger(pi1_);
1662
1663 event_loop_factory_.RunFor(chrono::milliseconds(95));
1664 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
1665 pi1_boot0);
1666 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
1667 pi2_boot0);
1668
1669 StartLogger(&pi1_logger);
1670
1671 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1672
1673 VLOG(1) << "Reboot now!";
1674
1675 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1676 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
1677 pi1_boot0);
1678 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
1679 pi2_boot1);
1680
1681 pi1_logger.AppendAllFilenames(&actual_filenames);
1682 }
1683
1684 std::sort(actual_filenames.begin(), actual_filenames.end());
1685 std::sort(pi1_reboot_logfiles_.begin(), pi1_reboot_logfiles_.end());
1686 ASSERT_THAT(actual_filenames,
1687 ::testing::UnorderedElementsAreArray(pi1_reboot_logfiles_));
1688
1689 // Confirm that our new oldest timestamps properly update as we reboot and
1690 // rotate.
1691 for (const std::string &file : pi1_reboot_logfiles_) {
1692 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
1693 ReadHeader(file);
1694 CHECK(log_header);
1695 if (log_header->message().has_configuration()) {
1696 continue;
1697 }
1698
1699 const monotonic_clock::time_point monotonic_start_time =
1700 monotonic_clock::time_point(
1701 chrono::nanoseconds(log_header->message().monotonic_start_time()));
1702 const UUID source_node_boot_uuid = UUID::FromString(
1703 log_header->message().source_node_boot_uuid()->string_view());
1704
1705 if (log_header->message().node()->name()->string_view() != "pi1") {
1706 // The remote message channel should rotate later and have more parts.
1707 // This only is true on the log files with shared remote messages.
1708 //
1709 // TODO(austin): I'm not the most thrilled with this test pattern... It
1710 // feels brittle in a different way.
1711 if (file.find("aos.message_bridge.RemoteMessage") == std::string::npos ||
1712 !shared()) {
1713 switch (log_header->message().parts_index()) {
1714 case 0:
1715 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1716 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
1717 break;
1718 case 1:
1719 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1720 ASSERT_EQ(monotonic_start_time,
1721 monotonic_clock::epoch() + chrono::seconds(1));
1722 break;
1723 case 2:
1724 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1725 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
1726 break;
1727 case 3:
1728 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1729 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
1730 chrono::nanoseconds(2322999462))
1731 << " on " << file;
1732 break;
1733 default:
1734 FAIL();
1735 break;
1736 }
1737 } else {
1738 switch (log_header->message().parts_index()) {
1739 case 0:
1740 case 1:
1741 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1742 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
1743 break;
1744 case 2:
1745 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1746 ASSERT_EQ(monotonic_start_time,
1747 monotonic_clock::epoch() + chrono::seconds(1));
1748 break;
1749 case 3:
1750 case 4:
1751 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1752 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
1753 break;
1754 case 5:
1755 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1756 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
1757 chrono::nanoseconds(2322999462))
1758 << " on " << file;
1759 break;
1760 default:
1761 FAIL();
1762 break;
1763 }
1764 }
1765 continue;
1766 }
1767 SCOPED_TRACE(file);
1768 SCOPED_TRACE(aos::FlatbufferToJson(
1769 *log_header, {.multi_line = true, .max_vector_size = 100}));
1770 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
1771 ASSERT_EQ(
1772 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
1773 EXPECT_EQ(
1774 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
1775 monotonic_clock::max_time.time_since_epoch().count());
1776 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
1777 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
1778 2u);
1779 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
1780 monotonic_clock::max_time.time_since_epoch().count());
1781 ASSERT_TRUE(log_header->message()
1782 .has_oldest_remote_unreliable_monotonic_timestamps());
1783 ASSERT_EQ(log_header->message()
1784 .oldest_remote_unreliable_monotonic_timestamps()
1785 ->size(),
1786 2u);
1787 EXPECT_EQ(log_header->message()
1788 .oldest_remote_unreliable_monotonic_timestamps()
1789 ->Get(0),
1790 monotonic_clock::max_time.time_since_epoch().count());
1791 ASSERT_TRUE(log_header->message()
1792 .has_oldest_local_unreliable_monotonic_timestamps());
1793 ASSERT_EQ(log_header->message()
1794 .oldest_local_unreliable_monotonic_timestamps()
1795 ->size(),
1796 2u);
1797 EXPECT_EQ(log_header->message()
1798 .oldest_local_unreliable_monotonic_timestamps()
1799 ->Get(0),
1800 monotonic_clock::max_time.time_since_epoch().count());
1801
1802 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
1803 monotonic_clock::time_point(chrono::nanoseconds(
1804 log_header->message().oldest_remote_monotonic_timestamps()->Get(
1805 1)));
1806 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
1807 monotonic_clock::time_point(chrono::nanoseconds(
1808 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
1809 const monotonic_clock::time_point
1810 oldest_remote_unreliable_monotonic_timestamps =
1811 monotonic_clock::time_point(chrono::nanoseconds(
1812 log_header->message()
1813 .oldest_remote_unreliable_monotonic_timestamps()
1814 ->Get(1)));
1815 const monotonic_clock::time_point
1816 oldest_local_unreliable_monotonic_timestamps =
1817 monotonic_clock::time_point(chrono::nanoseconds(
1818 log_header->message()
1819 .oldest_local_unreliable_monotonic_timestamps()
1820 ->Get(1)));
1821 const monotonic_clock::time_point
1822 oldest_remote_reliable_monotonic_timestamps =
1823 monotonic_clock::time_point(chrono::nanoseconds(
1824 log_header->message()
1825 .oldest_remote_reliable_monotonic_timestamps()
1826 ->Get(1)));
1827 const monotonic_clock::time_point
1828 oldest_local_reliable_monotonic_timestamps =
1829 monotonic_clock::time_point(chrono::nanoseconds(
1830 log_header->message()
1831 .oldest_local_reliable_monotonic_timestamps()
1832 ->Get(1)));
1833 const monotonic_clock::time_point
1834 oldest_logger_remote_unreliable_monotonic_timestamps =
1835 monotonic_clock::time_point(chrono::nanoseconds(
1836 log_header->message()
1837 .oldest_logger_remote_unreliable_monotonic_timestamps()
1838 ->Get(0)));
1839 const monotonic_clock::time_point
1840 oldest_logger_local_unreliable_monotonic_timestamps =
1841 monotonic_clock::time_point(chrono::nanoseconds(
1842 log_header->message()
1843 .oldest_logger_local_unreliable_monotonic_timestamps()
1844 ->Get(0)));
1845 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
1846 monotonic_clock::max_time);
1847 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
1848 monotonic_clock::max_time);
1849 switch (log_header->message().parts_index()) {
1850 case 0:
1851 EXPECT_EQ(oldest_remote_monotonic_timestamps,
1852 monotonic_clock::max_time);
1853 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
1854 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
1855 monotonic_clock::max_time);
1856 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
1857 monotonic_clock::max_time);
1858 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
1859 monotonic_clock::max_time);
1860 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
1861 monotonic_clock::max_time);
1862 break;
1863 case 1:
1864 EXPECT_EQ(oldest_remote_monotonic_timestamps,
1865 monotonic_clock::time_point(chrono::microseconds(90200)));
1866 EXPECT_EQ(oldest_local_monotonic_timestamps,
1867 monotonic_clock::time_point(chrono::microseconds(90350)));
1868 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
1869 monotonic_clock::time_point(chrono::microseconds(90200)));
1870 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
1871 monotonic_clock::time_point(chrono::microseconds(90350)));
1872 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
1873 monotonic_clock::max_time);
1874 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
1875 monotonic_clock::max_time);
1876 break;
1877 case 2:
1878 EXPECT_EQ(oldest_remote_monotonic_timestamps,
1879 monotonic_clock::time_point(chrono::microseconds(90200)))
1880 << file;
1881 EXPECT_EQ(oldest_local_monotonic_timestamps,
1882 monotonic_clock::time_point(chrono::microseconds(90350)))
1883 << file;
1884 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
1885 monotonic_clock::time_point(chrono::microseconds(90200)))
1886 << file;
1887 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
1888 monotonic_clock::time_point(chrono::microseconds(90350)))
1889 << file;
1890 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
1891 monotonic_clock::time_point(chrono::microseconds(100000)))
1892 << file;
1893 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
1894 monotonic_clock::time_point(chrono::microseconds(100150)))
1895 << file;
1896 break;
1897 case 3:
1898 EXPECT_EQ(oldest_remote_monotonic_timestamps,
1899 monotonic_clock::time_point(chrono::milliseconds(1323) +
1900 chrono::microseconds(200)));
1901 EXPECT_EQ(oldest_local_monotonic_timestamps,
1902 monotonic_clock::time_point(chrono::microseconds(10100350)));
1903 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
1904 monotonic_clock::time_point(chrono::milliseconds(1323) +
1905 chrono::microseconds(200)));
1906 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
1907 monotonic_clock::time_point(chrono::microseconds(10100350)));
1908 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
1909 monotonic_clock::max_time)
1910 << file;
1911 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
1912 monotonic_clock::max_time)
1913 << file;
1914 break;
1915 case 4:
1916 EXPECT_EQ(oldest_remote_monotonic_timestamps,
1917 monotonic_clock::time_point(chrono::milliseconds(1323) +
1918 chrono::microseconds(200)));
1919 EXPECT_EQ(oldest_local_monotonic_timestamps,
1920 monotonic_clock::time_point(chrono::microseconds(10100350)));
1921 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
1922 monotonic_clock::time_point(chrono::milliseconds(1323) +
1923 chrono::microseconds(200)));
1924 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
1925 monotonic_clock::time_point(chrono::microseconds(10100350)));
1926 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
1927 monotonic_clock::time_point(chrono::microseconds(1423000)))
1928 << file;
1929 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
1930 monotonic_clock::time_point(chrono::microseconds(10200150)))
1931 << file;
1932 break;
1933 default:
1934 FAIL();
1935 break;
1936 }
1937 }
1938
1939 // Confirm that we refuse to replay logs with missing boot uuids.
1940 {
1941 LogReader reader(SortParts(pi1_reboot_logfiles_));
1942
1943 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1944 log_reader_factory.set_send_delay(chrono::microseconds(0));
1945
1946 // This sends out the fetched messages and advances time to the start of
1947 // the log file.
1948 reader.Register(&log_reader_factory);
1949
1950 log_reader_factory.Run();
1951
1952 reader.Deregister();
1953 }
1954}
1955
1956// Tests that we can sort a log which only has timestamps from the remote
1957// because the local message_bridge_client failed to connect.
1958TEST_P(MultinodeLoggerTest, RemoteRebootOnlyTimestamps) {
1959 const UUID pi1_boot0 = UUID::Random();
1960 const UUID pi2_boot0 = UUID::Random();
1961 const UUID pi2_boot1 = UUID::Random();
1962 {
1963 CHECK_EQ(pi1_index_, 0u);
1964 CHECK_EQ(pi2_index_, 1u);
1965
1966 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
1967 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
1968 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
1969
1970 time_converter_.AddNextTimestamp(
1971 distributed_clock::epoch(),
1972 {BootTimestamp::epoch(), BootTimestamp::epoch()});
1973 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
1974 time_converter_.AddNextTimestamp(
1975 distributed_clock::epoch() + reboot_time,
1976 {BootTimestamp::epoch() + reboot_time,
1977 BootTimestamp{
1978 .boot = 1,
1979 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
1980 }
1981 pi2_->Disconnect(pi1_->node());
1982
1983 std::vector<std::string> filenames;
1984 {
1985 LoggerState pi1_logger = MakeLogger(pi1_);
1986
1987 event_loop_factory_.RunFor(chrono::milliseconds(95));
1988 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
1989 pi1_boot0);
1990 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
1991 pi2_boot0);
1992
1993 StartLogger(&pi1_logger);
1994
1995 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1996
1997 VLOG(1) << "Reboot now!";
1998
1999 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2000 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2001 pi1_boot0);
2002 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2003 pi2_boot1);
2004 pi1_logger.AppendAllFilenames(&filenames);
2005 }
2006
2007 std::sort(filenames.begin(), filenames.end());
2008
2009 // Confirm that our new oldest timestamps properly update as we reboot and
2010 // rotate.
2011 size_t timestamp_file_count = 0;
2012 for (const std::string &file : filenames) {
2013 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2014 ReadHeader(file);
2015 CHECK(log_header);
2016
2017 if (log_header->message().has_configuration()) {
2018 continue;
2019 }
2020
2021 const monotonic_clock::time_point monotonic_start_time =
2022 monotonic_clock::time_point(
2023 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2024 const UUID source_node_boot_uuid = UUID::FromString(
2025 log_header->message().source_node_boot_uuid()->string_view());
2026
2027 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2028 ASSERT_EQ(
2029 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2030 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2031 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2032 2u);
2033 ASSERT_TRUE(log_header->message()
2034 .has_oldest_remote_unreliable_monotonic_timestamps());
2035 ASSERT_EQ(log_header->message()
2036 .oldest_remote_unreliable_monotonic_timestamps()
2037 ->size(),
2038 2u);
2039 ASSERT_TRUE(log_header->message()
2040 .has_oldest_local_unreliable_monotonic_timestamps());
2041 ASSERT_EQ(log_header->message()
2042 .oldest_local_unreliable_monotonic_timestamps()
2043 ->size(),
2044 2u);
2045 ASSERT_TRUE(log_header->message()
2046 .has_oldest_remote_reliable_monotonic_timestamps());
2047 ASSERT_EQ(log_header->message()
2048 .oldest_remote_reliable_monotonic_timestamps()
2049 ->size(),
2050 2u);
2051 ASSERT_TRUE(
2052 log_header->message().has_oldest_local_reliable_monotonic_timestamps());
2053 ASSERT_EQ(log_header->message()
2054 .oldest_local_reliable_monotonic_timestamps()
2055 ->size(),
2056 2u);
2057
2058 ASSERT_TRUE(
2059 log_header->message()
2060 .has_oldest_logger_remote_unreliable_monotonic_timestamps());
2061 ASSERT_EQ(log_header->message()
2062 .oldest_logger_remote_unreliable_monotonic_timestamps()
2063 ->size(),
2064 2u);
2065 ASSERT_TRUE(log_header->message()
2066 .has_oldest_logger_local_unreliable_monotonic_timestamps());
2067 ASSERT_EQ(log_header->message()
2068 .oldest_logger_local_unreliable_monotonic_timestamps()
2069 ->size(),
2070 2u);
2071
2072 if (log_header->message().node()->name()->string_view() != "pi1") {
2073 ASSERT_TRUE(file.find("aos.message_bridge.RemoteMessage") !=
2074 std::string::npos);
2075
2076 const std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
2077 ReadNthMessage(file, 0);
2078 CHECK(msg);
2079
2080 EXPECT_TRUE(msg->message().has_monotonic_sent_time());
2081 EXPECT_TRUE(msg->message().has_monotonic_remote_time());
2082
2083 const monotonic_clock::time_point
2084 expected_oldest_local_monotonic_timestamps(
2085 chrono::nanoseconds(msg->message().monotonic_sent_time()));
2086 const monotonic_clock::time_point
2087 expected_oldest_remote_monotonic_timestamps(
2088 chrono::nanoseconds(msg->message().monotonic_remote_time()));
2089 const monotonic_clock::time_point
2090 expected_oldest_timestamp_monotonic_timestamps(
2091 chrono::nanoseconds(msg->message().monotonic_timestamp_time()));
2092
2093 EXPECT_NE(expected_oldest_local_monotonic_timestamps,
2094 monotonic_clock::min_time);
2095 EXPECT_NE(expected_oldest_remote_monotonic_timestamps,
2096 monotonic_clock::min_time);
2097 EXPECT_NE(expected_oldest_timestamp_monotonic_timestamps,
2098 monotonic_clock::min_time);
2099
2100 ++timestamp_file_count;
2101 // Since the log file is from the perspective of the other node,
2102 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2103 monotonic_clock::time_point(chrono::nanoseconds(
2104 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2105 0)));
2106 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2107 monotonic_clock::time_point(chrono::nanoseconds(
2108 log_header->message().oldest_local_monotonic_timestamps()->Get(
2109 0)));
2110 const monotonic_clock::time_point
2111 oldest_remote_unreliable_monotonic_timestamps =
2112 monotonic_clock::time_point(chrono::nanoseconds(
2113 log_header->message()
2114 .oldest_remote_unreliable_monotonic_timestamps()
2115 ->Get(0)));
2116 const monotonic_clock::time_point
2117 oldest_local_unreliable_monotonic_timestamps =
2118 monotonic_clock::time_point(chrono::nanoseconds(
2119 log_header->message()
2120 .oldest_local_unreliable_monotonic_timestamps()
2121 ->Get(0)));
2122 const monotonic_clock::time_point
2123 oldest_remote_reliable_monotonic_timestamps =
2124 monotonic_clock::time_point(chrono::nanoseconds(
2125 log_header->message()
2126 .oldest_remote_reliable_monotonic_timestamps()
2127 ->Get(0)));
2128 const monotonic_clock::time_point
2129 oldest_local_reliable_monotonic_timestamps =
2130 monotonic_clock::time_point(chrono::nanoseconds(
2131 log_header->message()
2132 .oldest_local_reliable_monotonic_timestamps()
2133 ->Get(0)));
2134 const monotonic_clock::time_point
2135 oldest_logger_remote_unreliable_monotonic_timestamps =
2136 monotonic_clock::time_point(chrono::nanoseconds(
2137 log_header->message()
2138 .oldest_logger_remote_unreliable_monotonic_timestamps()
2139 ->Get(1)));
2140 const monotonic_clock::time_point
2141 oldest_logger_local_unreliable_monotonic_timestamps =
2142 monotonic_clock::time_point(chrono::nanoseconds(
2143 log_header->message()
2144 .oldest_logger_local_unreliable_monotonic_timestamps()
2145 ->Get(1)));
2146
2147 const Channel *channel =
2148 event_loop_factory_.configuration()->channels()->Get(
2149 msg->message().channel_index());
2150 const Connection *connection = configuration::ConnectionToNode(
2151 channel, configuration::GetNode(
2152 event_loop_factory_.configuration(),
2153 log_header->message().node()->name()->string_view()));
2154
2155 const bool reliable = connection->time_to_live() == 0;
2156
2157 SCOPED_TRACE(file);
2158 SCOPED_TRACE(aos::FlatbufferToJson(
2159 *log_header, {.multi_line = true, .max_vector_size = 100}));
2160
2161 if (shared()) {
2162 // Confirm that the oldest timestamps match what we expect. Based on
2163 // what we are doing, we know that the oldest time is the first
2164 // message's time.
2165 //
2166 // This makes the test robust to both the split and combined config
2167 // tests.
2168 switch (log_header->message().parts_index()) {
2169 case 0:
2170 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2171 expected_oldest_remote_monotonic_timestamps);
2172 EXPECT_EQ(oldest_local_monotonic_timestamps,
2173 expected_oldest_local_monotonic_timestamps);
2174 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2175 expected_oldest_local_monotonic_timestamps)
2176 << file;
2177 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2178 expected_oldest_timestamp_monotonic_timestamps)
2179 << file;
2180
2181 if (reliable) {
2182 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2183 expected_oldest_remote_monotonic_timestamps);
2184 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2185 expected_oldest_local_monotonic_timestamps);
2186 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2187 monotonic_clock::max_time);
2188 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2189 monotonic_clock::max_time);
2190 } else {
2191 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2192 monotonic_clock::max_time);
2193 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2194 monotonic_clock::max_time);
2195 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2196 expected_oldest_remote_monotonic_timestamps);
2197 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2198 expected_oldest_local_monotonic_timestamps);
2199 }
2200 break;
2201 case 1:
2202 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2203 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2204 EXPECT_EQ(oldest_local_monotonic_timestamps,
2205 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2206 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2207 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2208 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2209 monotonic_clock::epoch() + chrono::nanoseconds(90250000));
2210 if (reliable) {
2211 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2212 expected_oldest_remote_monotonic_timestamps);
2213 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2214 expected_oldest_local_monotonic_timestamps);
2215 EXPECT_EQ(
2216 oldest_remote_unreliable_monotonic_timestamps,
2217 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2218 EXPECT_EQ(
2219 oldest_local_unreliable_monotonic_timestamps,
2220 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2221 } else {
2222 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2223 monotonic_clock::max_time);
2224 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2225 monotonic_clock::max_time);
2226 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2227 expected_oldest_remote_monotonic_timestamps);
2228 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2229 expected_oldest_local_monotonic_timestamps);
2230 }
2231 break;
2232 case 2:
2233 EXPECT_EQ(
2234 oldest_remote_monotonic_timestamps,
2235 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2236 EXPECT_EQ(
2237 oldest_local_monotonic_timestamps,
2238 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2239 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2240 expected_oldest_local_monotonic_timestamps)
2241 << file;
2242 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2243 expected_oldest_timestamp_monotonic_timestamps)
2244 << file;
2245 if (reliable) {
2246 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2247 expected_oldest_remote_monotonic_timestamps);
2248 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2249 expected_oldest_local_monotonic_timestamps);
2250 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2251 monotonic_clock::max_time);
2252 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2253 monotonic_clock::max_time);
2254 } else {
2255 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2256 monotonic_clock::max_time);
2257 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2258 monotonic_clock::max_time);
2259 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2260 expected_oldest_remote_monotonic_timestamps);
2261 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2262 expected_oldest_local_monotonic_timestamps);
2263 }
2264 break;
2265
2266 case 3:
2267 EXPECT_EQ(
2268 oldest_remote_monotonic_timestamps,
2269 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2270 EXPECT_EQ(
2271 oldest_local_monotonic_timestamps,
2272 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2273 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2274 expected_oldest_remote_monotonic_timestamps);
2275 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2276 expected_oldest_local_monotonic_timestamps);
2277 EXPECT_EQ(
2278 oldest_logger_remote_unreliable_monotonic_timestamps,
2279 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2280 EXPECT_EQ(
2281 oldest_logger_local_unreliable_monotonic_timestamps,
2282 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2283 break;
2284 default:
2285 FAIL();
2286 break;
2287 }
2288
2289 switch (log_header->message().parts_index()) {
2290 case 0:
2291 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2292 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2293 break;
2294 case 1:
2295 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2296 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2297 break;
2298 case 2:
2299 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2300 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2301 break;
2302 case 3:
2303 if (shared()) {
2304 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2305 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2306 break;
2307 }
2308 [[fallthrough]];
2309 default:
2310 FAIL();
2311 break;
2312 }
2313 } else {
2314 switch (log_header->message().parts_index()) {
2315 case 0:
2316 if (reliable) {
2317 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2318 monotonic_clock::max_time);
2319 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2320 monotonic_clock::max_time);
2321 EXPECT_EQ(
2322 oldest_logger_remote_unreliable_monotonic_timestamps,
2323 monotonic_clock::epoch() + chrono::nanoseconds(100150000))
2324 << file;
2325 EXPECT_EQ(
2326 oldest_logger_local_unreliable_monotonic_timestamps,
2327 monotonic_clock::epoch() + chrono::nanoseconds(100250000))
2328 << file;
2329 } else {
2330 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2331 expected_oldest_remote_monotonic_timestamps);
2332 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2333 expected_oldest_local_monotonic_timestamps);
2334 EXPECT_EQ(
2335 oldest_logger_remote_unreliable_monotonic_timestamps,
2336 monotonic_clock::epoch() + chrono::nanoseconds(90150000))
2337 << file;
2338 EXPECT_EQ(
2339 oldest_logger_local_unreliable_monotonic_timestamps,
2340 monotonic_clock::epoch() + chrono::nanoseconds(90250000))
2341 << file;
2342 }
2343 break;
2344 case 1:
2345 if (reliable) {
2346 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2347 monotonic_clock::max_time);
2348 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2349 monotonic_clock::max_time);
2350 EXPECT_EQ(
2351 oldest_logger_remote_unreliable_monotonic_timestamps,
2352 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2353 EXPECT_EQ(
2354 oldest_logger_local_unreliable_monotonic_timestamps,
2355 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2356 } else {
2357 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2358 expected_oldest_remote_monotonic_timestamps);
2359 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2360 expected_oldest_local_monotonic_timestamps);
2361 EXPECT_EQ(
2362 oldest_logger_remote_unreliable_monotonic_timestamps,
2363 monotonic_clock::epoch() + chrono::nanoseconds(1323150000));
2364 EXPECT_EQ(
2365 oldest_logger_local_unreliable_monotonic_timestamps,
2366 monotonic_clock::epoch() + chrono::nanoseconds(10100250000));
2367 }
2368 break;
2369 default:
2370 FAIL();
2371 break;
2372 }
2373
2374 switch (log_header->message().parts_index()) {
2375 case 0:
2376 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2377 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2378 break;
2379 case 1:
2380 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2381 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2382 break;
2383 default:
2384 FAIL();
2385 break;
2386 }
2387 }
2388
2389 continue;
2390 }
2391 EXPECT_EQ(
2392 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2393 monotonic_clock::max_time.time_since_epoch().count());
2394 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2395 monotonic_clock::max_time.time_since_epoch().count());
2396 EXPECT_EQ(log_header->message()
2397 .oldest_remote_unreliable_monotonic_timestamps()
2398 ->Get(0),
2399 monotonic_clock::max_time.time_since_epoch().count());
2400 EXPECT_EQ(log_header->message()
2401 .oldest_local_unreliable_monotonic_timestamps()
2402 ->Get(0),
2403 monotonic_clock::max_time.time_since_epoch().count());
2404
2405 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2406 monotonic_clock::time_point(chrono::nanoseconds(
2407 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2408 1)));
2409 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2410 monotonic_clock::time_point(chrono::nanoseconds(
2411 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2412 const monotonic_clock::time_point
2413 oldest_remote_unreliable_monotonic_timestamps =
2414 monotonic_clock::time_point(chrono::nanoseconds(
2415 log_header->message()
2416 .oldest_remote_unreliable_monotonic_timestamps()
2417 ->Get(1)));
2418 const monotonic_clock::time_point
2419 oldest_local_unreliable_monotonic_timestamps =
2420 monotonic_clock::time_point(chrono::nanoseconds(
2421 log_header->message()
2422 .oldest_local_unreliable_monotonic_timestamps()
2423 ->Get(1)));
2424 switch (log_header->message().parts_index()) {
2425 case 0:
2426 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2427 monotonic_clock::max_time);
2428 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2429 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2430 monotonic_clock::max_time);
2431 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2432 monotonic_clock::max_time);
2433 break;
2434 default:
2435 FAIL();
2436 break;
2437 }
2438 }
2439
2440 if (shared()) {
2441 EXPECT_EQ(timestamp_file_count, 4u);
2442 } else {
2443 EXPECT_EQ(timestamp_file_count, 4u);
2444 }
2445
2446 // Confirm that we can actually sort the resulting log and read it.
2447 {
2448 LogReader reader(SortParts(filenames));
2449
2450 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2451 log_reader_factory.set_send_delay(chrono::microseconds(0));
2452
2453 // This sends out the fetched messages and advances time to the start of
2454 // the log file.
2455 reader.Register(&log_reader_factory);
2456
2457 log_reader_factory.Run();
2458
2459 reader.Deregister();
2460 }
2461}
2462
2463// Tests that we properly handle one direction of message_bridge being
2464// unavailable.
2465TEST_P(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
2466 pi1_->Disconnect(pi2_->node());
2467 time_converter_.AddMonotonic(
2468 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2469
2470 time_converter_.AddMonotonic(
2471 {chrono::milliseconds(10000),
2472 chrono::milliseconds(10000) - chrono::milliseconds(1)});
2473 {
2474 LoggerState pi1_logger = MakeLogger(pi1_);
2475
2476 event_loop_factory_.RunFor(chrono::milliseconds(95));
2477
2478 StartLogger(&pi1_logger);
2479
2480 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2481 }
2482
2483 // Confirm that we can parse the result. LogReader has enough internal CHECKs
2484 // to confirm the right thing happened.
2485 ConfirmReadable(pi1_single_direction_logfiles_);
2486}
2487
2488// Tests that we properly handle one direction of message_bridge being
2489// unavailable.
2490TEST_P(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
2491 pi1_->Disconnect(pi2_->node());
2492 time_converter_.AddMonotonic(
2493 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(500)});
2494
2495 time_converter_.AddMonotonic(
2496 {chrono::milliseconds(10000),
2497 chrono::milliseconds(10000) + chrono::milliseconds(1)});
2498 {
2499 LoggerState pi1_logger = MakeLogger(pi1_);
2500
2501 event_loop_factory_.RunFor(chrono::milliseconds(95));
2502
2503 StartLogger(&pi1_logger);
2504
2505 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2506 }
2507
2508 // Confirm that we can parse the result. LogReader has enough internal CHECKs
2509 // to confirm the right thing happened.
2510 ConfirmReadable(pi1_single_direction_logfiles_);
2511}
2512
2513// Tests that we explode if someone passes in a part file twice with a better
2514// error than an out of order error.
2515TEST_P(MultinodeLoggerTest, DuplicateLogFiles) {
2516 time_converter_.AddMonotonic(
2517 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2518 {
2519 LoggerState pi1_logger = MakeLogger(pi1_);
2520
2521 event_loop_factory_.RunFor(chrono::milliseconds(95));
2522
2523 StartLogger(&pi1_logger);
2524
2525 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2526 }
2527
2528 std::vector<std::string> duplicates;
2529 for (const std::string &f : pi1_single_direction_logfiles_) {
2530 duplicates.emplace_back(f);
2531 duplicates.emplace_back(f);
2532 }
2533 EXPECT_DEATH({ SortParts(duplicates); }, "Found duplicate parts in");
2534}
2535
2536// Tests that we explode if someone loses a part out of the middle of a log.
2537TEST_P(MultinodeLoggerTest, MissingPartsFromMiddle) {
2538 time_converter_.AddMonotonic(
2539 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2540 {
2541 LoggerState pi1_logger = MakeLogger(pi1_);
2542
2543 event_loop_factory_.RunFor(chrono::milliseconds(95));
2544
2545 StartLogger(&pi1_logger);
2546 aos::monotonic_clock::time_point last_rotation_time =
2547 pi1_logger.event_loop->monotonic_now();
2548 pi1_logger.logger->set_on_logged_period([&] {
2549 const auto now = pi1_logger.event_loop->monotonic_now();
2550 if (now > last_rotation_time + std::chrono::seconds(5)) {
2551 pi1_logger.logger->Rotate();
2552 last_rotation_time = now;
2553 }
2554 });
2555
2556 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2557 }
2558
2559 std::vector<std::string> missing_parts;
2560
2561 missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part0" + Extension());
2562 missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part2" + Extension());
2563 missing_parts.emplace_back(absl::StrCat(
2564 logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
2565
2566 EXPECT_DEATH({ SortParts(missing_parts); },
2567 "Broken log, missing part files between");
2568}
2569
2570// Tests that we properly handle a dead node. Do this by just disconnecting it
2571// and only using one nodes of logs.
2572TEST_P(MultinodeLoggerTest, DeadNode) {
2573 pi1_->Disconnect(pi2_->node());
2574 pi2_->Disconnect(pi1_->node());
2575 time_converter_.AddMonotonic(
2576 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2577 {
2578 LoggerState pi1_logger = MakeLogger(pi1_);
2579
2580 event_loop_factory_.RunFor(chrono::milliseconds(95));
2581
2582 StartLogger(&pi1_logger);
2583
2584 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2585 }
2586
2587 // Confirm that we can parse the result. LogReader has enough internal CHECKs
2588 // to confirm the right thing happened.
2589 ConfirmReadable(MakePi1DeadNodeLogfiles());
2590}
2591
// Tests that we can relog with a different config. This makes most sense when
// you are trying to edit a log and want to use channel renaming + the original
// config in the new log.
//
// Steps: log pi1 and pi2 for 20 s, replay with the Ping channel on "/test"
// remapped to "/original", relog both nodes during replay while passing
// reader.logged_configuration() to the new loggers, then replay the relogged
// files.
TEST_P(MultinodeLoggerTest, LogDifferentConfig) {
  time_converter_.StartEqual();
  // Produce the source log from both nodes.
  {
    LoggerState pi1_logger = MakeLogger(pi1_);
    LoggerState pi2_logger = MakeLogger(pi2_);

    event_loop_factory_.RunFor(chrono::milliseconds(95));

    StartLogger(&pi1_logger);
    StartLogger(&pi2_logger);

    event_loop_factory_.RunFor(chrono::milliseconds(20000));
  }

  // Replay, moving the logged Ping channel from "/test" to "/original".
  LogReader reader(SortParts(logfiles_));
  reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");

  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
  log_reader_factory.set_send_delay(chrono::microseconds(0));

  // This sends out the fetched messages and advances time to the start of the
  // log file.
  reader.Register(&log_reader_factory);

  const Node *pi1 =
      configuration::GetNode(log_reader_factory.configuration(), "pi1");
  const Node *pi2 =
      configuration::GetNode(log_reader_factory.configuration(), "pi2");

  LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
  LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
  LOG(INFO) << "now pi1 "
            << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
  LOG(INFO) << "now pi2 "
            << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();

  // Both nodes were logged, so both must be reported, in pi1, pi2 order.
  EXPECT_THAT(reader.LoggedNodes(),
              ::testing::ElementsAre(
                  configuration::GetNode(reader.logged_configuration(), pi1),
                  configuration::GetNode(reader.logged_configuration(), pi2)));

  // NOTE(review): if event_loop_factory() returns the factory passed to
  // Register() above, this repeats the earlier set_send_delay call — confirm.
  reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));

  // And confirm we can re-create a log again, while checking the contents.
  // The new loggers are given reader.logged_configuration() rather than the
  // replay factory's configuration.
  std::vector<std::string> log_files;
  {
    LoggerState pi1_logger =
        MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
                   &log_reader_factory, reader.logged_configuration());
    LoggerState pi2_logger =
        MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
                   &log_reader_factory, reader.logged_configuration());

    pi1_logger.StartLogger(tmp_dir_ + "/relogged1");
    pi2_logger.StartLogger(tmp_dir_ + "/relogged2");

    // Run the replay to completion while the relogging loggers record it.
    log_reader_factory.Run();

    // Turn the log namers' file names into full paths under tmp_dir_.
    for (auto &x : pi1_logger.log_namer->all_filenames()) {
      log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged1_", x));
    }
    for (auto &x : pi2_logger.log_namer->all_filenames()) {
      log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged2_", x));
    }
  }

  reader.Deregister();

  // And verify that we can run the LogReader over the relogged files without
  // hitting any fatal errors.
  {
    LogReader relogged_reader(SortParts(log_files));
    relogged_reader.Register();

    relogged_reader.event_loop_factory()->Run();
  }
}
2672
// Tests that we properly replay a log where the start time for a node is before
// any data on the node. This can happen if the logger starts before data is
// published. While the scenario below is a bit convoluted, we have seen logs
// like this generated out in the wild.
//
// Timeline, from summing the RunFor() calls below (assumes the simulation
// starts at the distributed epoch):
//   0 s   - 1 s   everything quiet (timing reports skipped, statistics off).
//   1 s           pi1, pi2 and pi3 loggers start, still with no data.
//   11 s          statistics on, ping started on pi1 and pong on pi2.
//   14 s          pi2's first logger is destroyed; pi2's statistics are
//                 disabled and pi1 and pi2 are disconnected both ways.
//   20 s          pi2 reboots (second AddNextTimestamp: boot 1, monotonic
//                 1.323 s).
//   22 s          a new pi2 logger starts, again before any pi2 data.
//   32 s          pi2 statistics, the pi1<->pi2 links, pong and ping return.
//   35 s          end of the run.
TEST(MultinodeRebootLoggerTest, StartTimeBeforeData) {
  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(ArtifactPath(
          "aos/events/logging/multinode_pingpong_split3_config.json"));
  message_bridge::TestingTimeConverter time_converter(
      configuration::NodesCount(&config.message()));
  SimulatedEventLoopFactory event_loop_factory(&config.message());
  event_loop_factory.SetTimeConverter(&time_converter);
  NodeEventLoopFactory *const pi1 =
      event_loop_factory.GetNodeEventLoopFactory("pi1");
  const size_t pi1_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi1->node());
  NodeEventLoopFactory *const pi2 =
      event_loop_factory.GetNodeEventLoopFactory("pi2");
  const size_t pi2_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi2->node());
  NodeEventLoopFactory *const pi3 =
      event_loop_factory.GetNodeEventLoopFactory("pi3");
  const size_t pi3_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi3->node());

  // Output directories; wipe anything left over from a previous run.
  const std::string kLogfile1_1 =
      aos::testing::TestTmpDir() + "/multi_logfile1/";
  const std::string kLogfile2_1 =
      aos::testing::TestTmpDir() + "/multi_logfile2.1/";
  const std::string kLogfile2_2 =
      aos::testing::TestTmpDir() + "/multi_logfile2.2/";
  const std::string kLogfile3_1 =
      aos::testing::TestTmpDir() + "/multi_logfile3/";
  util::UnlinkRecursive(kLogfile1_1);
  util::UnlinkRecursive(kLogfile2_1);
  util::UnlinkRecursive(kLogfile2_2);
  util::UnlinkRecursive(kLogfile3_1);
  const UUID pi1_boot0 = UUID::Random();
  const UUID pi2_boot0 = UUID::Random();
  const UUID pi2_boot1 = UUID::Random();
  const UUID pi3_boot0 = UUID::Random();
  {
    // The timestamp lists below are in node-index order, so pin that order.
    CHECK_EQ(pi1_index, 0u);
    CHECK_EQ(pi2_index, 1u);
    CHECK_EQ(pi3_index, 2u);

    time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
    time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
    time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
    time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);

    time_converter.AddNextTimestamp(
        distributed_clock::epoch(),
        {BootTimestamp::epoch(), BootTimestamp::epoch(),
         BootTimestamp::epoch()});
    // At 20 s pi2 moves to boot 1, with its monotonic clock at 1.323 s.
    const chrono::nanoseconds reboot_time = chrono::milliseconds(20000);
    time_converter.AddNextTimestamp(
        distributed_clock::epoch() + reboot_time,
        {BootTimestamp::epoch() + reboot_time,
         BootTimestamp{
             .boot = 1,
             .time = monotonic_clock::epoch() + chrono::milliseconds(1323)},
         BootTimestamp::epoch() + reboot_time});
  }

  // Make everything perfectly quiet.
  event_loop_factory.SkipTimingReport();
  event_loop_factory.DisableStatistics();

  std::vector<std::string> filenames;
  {
    LoggerState pi1_logger = MakeLoggerState(
        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    LoggerState pi3_logger = MakeLoggerState(
        pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    {
      // pi2's first-boot logger lives in this inner scope so that it is
      // destroyed before the reboot. All three loggers are started after 1 s
      // of quiet time.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);

      event_loop_factory.RunFor(chrono::milliseconds(1000));

      pi1_logger.StartLogger(kLogfile1_1);
      pi3_logger.StartLogger(kLogfile3_1);
      pi2_logger.StartLogger(kLogfile2_1);

      event_loop_factory.RunFor(chrono::milliseconds(10000));

      // Now that we've got a start time in the past, turn on data.
      event_loop_factory.EnableStatistics();
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());

      pi2->AlwaysStart<Pong>("pong");

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      pi2_logger.AppendAllFilenames(&filenames);

      // Stop logging on pi2 before rebooting and completely shut off all
      // messages on pi2.
      pi2->DisableStatistics();
      pi1->Disconnect(pi2->node());
      pi2->Disconnect(pi1->node());
    }
    // Runs from 14 s to 21 s, so pi2's 20 s reboot happens inside this call.
    event_loop_factory.RunFor(chrono::milliseconds(7000));
    // pi2 now reboots.
    {
      event_loop_factory.RunFor(chrono::milliseconds(1000));

      // Start logging again on pi2 after it is up.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
      pi2_logger.StartLogger(kLogfile2_2);

      event_loop_factory.RunFor(chrono::milliseconds(10000));
      // And, now that we have a start time in the log, turn data back on.
      pi2->EnableStatistics();
      pi1->Connect(pi2->node());
      pi2->Connect(pi1->node());

      pi2->AlwaysStart<Pong>("pong");
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      pi2_logger.AppendAllFilenames(&filenames);
    }

    pi1_logger.AppendAllFilenames(&filenames);
    pi3_logger.AppendAllFilenames(&filenames);
  }

  // Confirm that we can parse the result. LogReader has enough internal CHECKs
  // to confirm the right thing happened.
  // sorted_parts is not used below; the SortParts call presumably stays for
  // its own validation of the part list.
  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  // result appears to be indexed by node (0 = pi1, 1 = pi2, 2 = pi3), with
  // .first holding realtime start times and .second realtime end times, one
  // entry per boot (pi2 has two). Confirm against ConfirmReadable.
  auto result = ConfirmReadable(filenames);
  EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
                                                       chrono::seconds(1)));
  EXPECT_THAT(result[0].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(34990350)));

  EXPECT_THAT(result[1].first,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::seconds(1),
                  realtime_clock::epoch() + chrono::microseconds(3323000)));
  EXPECT_THAT(result[1].second,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::microseconds(13990200),
                  realtime_clock::epoch() + chrono::microseconds(16313200)));

  EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch() +
                                                       chrono::seconds(1)));
  EXPECT_THAT(result[2].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(34900150)));
}
2834
// Tests that local data before remote data after reboot is properly replayed.
// We only trigger a reboot in the timestamp interpolation function when solving
// the timestamp problem when we actually have a point in the function. This
// originally only happened when a point passes the noncausal filter. At the
// start of time for the second boot, if we aren't careful, we will have
// messages which need to be published at times before the boot. This happens
// when a local message is in the log before a forwarded message, so there is no
// point in the interpolation function. This delays the reboot. So, we need to
// recreate that situation and make sure it doesn't come back.
//
// Timeline, from summing the RunFor() calls below (assumes the simulation
// starts at the distributed epoch):
//   0 s       all three loggers start.
//   1.005 s   ping on pi1 and pong on pi2 start.
//   4.005 s   pi2's first logger is destroyed; pi1 and pi2 are disconnected.
//   5 s       pi2 reboots (boot 1, monotonic clock at 105 s).
//   6 s       pong on pi2 and ping on pi1 start again, still disconnected.
//   7.005 s   pi2's second logger starts, then the links are reconnected.
//   11.005 s  end of the run.
TEST(MultinodeRebootLoggerTest,
     LocalMessageBeforeRemoteBeforeStartAfterReboot) {
  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(ArtifactPath(
          "aos/events/logging/multinode_pingpong_split3_config.json"));
  message_bridge::TestingTimeConverter time_converter(
      configuration::NodesCount(&config.message()));
  SimulatedEventLoopFactory event_loop_factory(&config.message());
  event_loop_factory.SetTimeConverter(&time_converter);
  NodeEventLoopFactory *const pi1 =
      event_loop_factory.GetNodeEventLoopFactory("pi1");
  const size_t pi1_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi1->node());
  NodeEventLoopFactory *const pi2 =
      event_loop_factory.GetNodeEventLoopFactory("pi2");
  const size_t pi2_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi2->node());
  NodeEventLoopFactory *const pi3 =
      event_loop_factory.GetNodeEventLoopFactory("pi3");
  const size_t pi3_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi3->node());

  // Output directories; wipe anything left over from a previous run.
  const std::string kLogfile1_1 =
      aos::testing::TestTmpDir() + "/multi_logfile1/";
  const std::string kLogfile2_1 =
      aos::testing::TestTmpDir() + "/multi_logfile2.1/";
  const std::string kLogfile2_2 =
      aos::testing::TestTmpDir() + "/multi_logfile2.2/";
  const std::string kLogfile3_1 =
      aos::testing::TestTmpDir() + "/multi_logfile3/";
  util::UnlinkRecursive(kLogfile1_1);
  util::UnlinkRecursive(kLogfile2_1);
  util::UnlinkRecursive(kLogfile2_2);
  util::UnlinkRecursive(kLogfile3_1);
  const UUID pi1_boot0 = UUID::Random();
  const UUID pi2_boot0 = UUID::Random();
  const UUID pi2_boot1 = UUID::Random();
  const UUID pi3_boot0 = UUID::Random();
  {
    // The timestamp lists below are in node-index order, so pin that order.
    CHECK_EQ(pi1_index, 0u);
    CHECK_EQ(pi2_index, 1u);
    CHECK_EQ(pi3_index, 2u);

    time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
    time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
    time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
    time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);

    time_converter.AddNextTimestamp(
        distributed_clock::epoch(),
        {BootTimestamp::epoch(), BootTimestamp::epoch(),
         BootTimestamp::epoch()});
    // At 5 s pi2 moves to boot 1, with its monotonic clock 100 s further on
    // (105 s).
    const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
    time_converter.AddNextTimestamp(
        distributed_clock::epoch() + reboot_time,
        {BootTimestamp::epoch() + reboot_time,
         BootTimestamp{.boot = 1,
                       .time = monotonic_clock::epoch() + reboot_time +
                               chrono::seconds(100)},
         BootTimestamp::epoch() + reboot_time});
  }

  std::vector<std::string> filenames;
  {
    LoggerState pi1_logger = MakeLoggerState(
        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    LoggerState pi3_logger = MakeLoggerState(
        pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    {
      // pi2's first-boot logger lives in this inner scope so that it is
      // destroyed before the reboot. All three loggers start immediately.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);

      pi1_logger.StartLogger(kLogfile1_1);
      pi3_logger.StartLogger(kLogfile3_1);
      pi2_logger.StartLogger(kLogfile2_1);

      event_loop_factory.RunFor(chrono::milliseconds(1005));

      // Now that we've got a start time in the past, turn on data.
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());

      pi2->AlwaysStart<Pong>("pong");

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      pi2_logger.AppendAllFilenames(&filenames);

      // Disable any remote messages on pi2.
      pi1->Disconnect(pi2->node());
      pi2->Disconnect(pi1->node());
    }
    // 4.005 s + 0.995 s lands exactly on the 5 s reboot.
    event_loop_factory.RunFor(chrono::milliseconds(995));
    // pi2 now reboots at 5 seconds.
    {
      event_loop_factory.RunFor(chrono::milliseconds(1000));

      // Make local stuff happen before we start logging and connect the remote.
      pi2->AlwaysStart<Pong>("pong");
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());
      event_loop_factory.RunFor(chrono::milliseconds(1005));

      // Start logging again on pi2 after it is up.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
      pi2_logger.StartLogger(kLogfile2_2);

      // And allow remote messages now that we have some local ones.
      pi1->Connect(pi2->node());
      pi2->Connect(pi1->node());

      event_loop_factory.RunFor(chrono::milliseconds(1000));

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      pi2_logger.AppendAllFilenames(&filenames);
    }

    pi1_logger.AppendAllFilenames(&filenames);
    pi3_logger.AppendAllFilenames(&filenames);
  }

  // Confirm that we can parse the result. LogReader has enough internal CHECKs
  // to confirm the right thing happened.
  // sorted_parts is not used below; the SortParts call presumably stays for
  // its own validation of the part list.
  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  // result appears to be indexed by node (0 = pi1, 1 = pi2, 2 = pi3), with
  // .first holding realtime start times and .second realtime end times, one
  // entry per boot (pi2 has two). Confirm against ConfirmReadable.
  auto result = ConfirmReadable(filenames);

  EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
  EXPECT_THAT(result[0].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(11000350)));

  EXPECT_THAT(result[1].first,
              ::testing::ElementsAre(
                  realtime_clock::epoch(),
                  realtime_clock::epoch() + chrono::microseconds(107005000)));
  EXPECT_THAT(result[1].second,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::microseconds(4000150),
                  realtime_clock::epoch() + chrono::microseconds(111000200)));

  EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
  EXPECT_THAT(result[2].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(11000150)));

  // Replay again, restricted (presumably) to the 2 s - 3 s realtime window.
  // Every node should report exactly that window.
  auto start_stop_result = ConfirmReadable(
      filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
      realtime_clock::epoch() + chrono::milliseconds(3000));

  EXPECT_THAT(
      start_stop_result[0].first,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
  EXPECT_THAT(
      start_stop_result[0].second,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
  EXPECT_THAT(
      start_stop_result[1].first,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
  EXPECT_THAT(
      start_stop_result[1].second,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
  EXPECT_THAT(
      start_stop_result[2].first,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
  EXPECT_THAT(
      start_stop_result[2].second,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
}
3017
// Tests that setting the start and stop flags across a reboot works as
// expected.
//
// Timeline, from summing the RunFor() calls below (assumes the simulation
// starts at the distributed epoch):
//   0 s       all three loggers start.
//   1.005 s   ping on pi1 and pong on pi2 start.
//   4.005 s   pi2's first logger is destroyed.
//   5 s       pi2 reboots (boot 1, monotonic clock at 5 s).
//   6 s       pong on pi2 and ping on pi1 start again.
//   6.005 s   pi2's second logger starts.
//   11.005 s  end of the run.
// The bounded replay window (2 s - 8 s) therefore spans pi2's reboot.
TEST(MultinodeRebootLoggerTest, RebootStartStopTimes) {
  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(ArtifactPath(
          "aos/events/logging/multinode_pingpong_split3_config.json"));
  message_bridge::TestingTimeConverter time_converter(
      configuration::NodesCount(&config.message()));
  SimulatedEventLoopFactory event_loop_factory(&config.message());
  event_loop_factory.SetTimeConverter(&time_converter);
  NodeEventLoopFactory *const pi1 =
      event_loop_factory.GetNodeEventLoopFactory("pi1");
  const size_t pi1_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi1->node());
  NodeEventLoopFactory *const pi2 =
      event_loop_factory.GetNodeEventLoopFactory("pi2");
  const size_t pi2_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi2->node());
  NodeEventLoopFactory *const pi3 =
      event_loop_factory.GetNodeEventLoopFactory("pi3");
  const size_t pi3_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi3->node());

  // Output directories; wipe anything left over from a previous run.
  const std::string kLogfile1_1 =
      aos::testing::TestTmpDir() + "/multi_logfile1/";
  const std::string kLogfile2_1 =
      aos::testing::TestTmpDir() + "/multi_logfile2.1/";
  const std::string kLogfile2_2 =
      aos::testing::TestTmpDir() + "/multi_logfile2.2/";
  const std::string kLogfile3_1 =
      aos::testing::TestTmpDir() + "/multi_logfile3/";
  util::UnlinkRecursive(kLogfile1_1);
  util::UnlinkRecursive(kLogfile2_1);
  util::UnlinkRecursive(kLogfile2_2);
  util::UnlinkRecursive(kLogfile3_1);
  {
    // The timestamp lists below are in node-index order, so pin that order.
    CHECK_EQ(pi1_index, 0u);
    CHECK_EQ(pi2_index, 1u);
    CHECK_EQ(pi3_index, 2u);

    time_converter.AddNextTimestamp(
        distributed_clock::epoch(),
        {BootTimestamp::epoch(), BootTimestamp::epoch(),
         BootTimestamp::epoch()});
    // At 5 s pi2 moves to boot 1, with its monotonic clock also at 5 s.
    const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
    time_converter.AddNextTimestamp(
        distributed_clock::epoch() + reboot_time,
        {BootTimestamp::epoch() + reboot_time,
         BootTimestamp{.boot = 1,
                       .time = monotonic_clock::epoch() + reboot_time},
         BootTimestamp::epoch() + reboot_time});
  }

  std::vector<std::string> filenames;
  {
    LoggerState pi1_logger = MakeLoggerState(
        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    LoggerState pi3_logger = MakeLoggerState(
        pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    {
      // pi2's first-boot logger lives in this inner scope so that it is
      // destroyed before the reboot. All three loggers start immediately.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);

      pi1_logger.StartLogger(kLogfile1_1);
      pi3_logger.StartLogger(kLogfile3_1);
      pi2_logger.StartLogger(kLogfile2_1);

      event_loop_factory.RunFor(chrono::milliseconds(1005));

      // Now that we've got a start time in the past, turn on data.
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());

      pi2->AlwaysStart<Pong>("pong");

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      pi2_logger.AppendAllFilenames(&filenames);
    }
    // 4.005 s + 0.995 s lands exactly on the 5 s reboot.
    event_loop_factory.RunFor(chrono::milliseconds(995));
    // pi2 now reboots at 5 seconds.
    {
      event_loop_factory.RunFor(chrono::milliseconds(1000));

      // Make local stuff happen before we start logging and connect the remote.
      pi2->AlwaysStart<Pong>("pong");
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());
      event_loop_factory.RunFor(chrono::milliseconds(5));

      // Start logging again on pi2 after it is up.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
      pi2_logger.StartLogger(kLogfile2_2);

      event_loop_factory.RunFor(chrono::milliseconds(5000));

      pi2_logger.AppendAllFilenames(&filenames);
    }

    pi1_logger.AppendAllFilenames(&filenames);
    pi3_logger.AppendAllFilenames(&filenames);
  }

  // sorted_parts is not used below; the SortParts call presumably stays for
  // its own validation of the part list.
  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  // result appears to be indexed by node (0 = pi1, 1 = pi2, 2 = pi3), with
  // .first holding realtime start times and .second realtime end times, one
  // entry per boot (pi2 has two). Confirm against ConfirmReadable.
  auto result = ConfirmReadable(filenames);

  EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
  EXPECT_THAT(result[0].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(11000350)));

  EXPECT_THAT(result[1].first,
              ::testing::ElementsAre(
                  realtime_clock::epoch(),
                  realtime_clock::epoch() + chrono::microseconds(6005000)));
  EXPECT_THAT(result[1].second,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::microseconds(4900150),
                  realtime_clock::epoch() + chrono::microseconds(11000200)));

  EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
  EXPECT_THAT(result[2].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(11000150)));

  // Confirm we observed the correct start and stop times. We should see the
  // reboot here.
  auto start_stop_result = ConfirmReadable(
      filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
      realtime_clock::epoch() + chrono::milliseconds(8000));

  EXPECT_THAT(
      start_stop_result[0].first,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
  EXPECT_THAT(
      start_stop_result[0].second,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
  // pi2 reports two boots inside the window: the first is clipped at 2 s on
  // the start side, the second at 8 s on the end side.
  EXPECT_THAT(start_stop_result[1].first,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::seconds(2),
                  realtime_clock::epoch() + chrono::microseconds(6005000)));
  EXPECT_THAT(start_stop_result[1].second,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::microseconds(4900150),
                  realtime_clock::epoch() + chrono::seconds(8)));
  EXPECT_THAT(
      start_stop_result[2].first,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
  EXPECT_THAT(
      start_stop_result[2].second,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
}
3174
// Tests that we properly handle one direction being down.
//
// pi2 is disconnected from pi1 at the start and pi1 reboots at 5 s. The link
// is restored at about 6.1 s (0.095 s + 6 s of RunFor), and pi1's logger is
// only created and started at that point, i.e. on pi1's second boot.
TEST(MissingDirectionTest, OneDirection) {
  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(ArtifactPath(
          "aos/events/logging/multinode_pingpong_split4_config.json"));
  message_bridge::TestingTimeConverter time_converter(
      configuration::NodesCount(&config.message()));
  SimulatedEventLoopFactory event_loop_factory(&config.message());
  event_loop_factory.SetTimeConverter(&time_converter);

  NodeEventLoopFactory *const pi1 =
      event_loop_factory.GetNodeEventLoopFactory("pi1");
  const size_t pi1_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi1->node());
  NodeEventLoopFactory *const pi2 =
      event_loop_factory.GetNodeEventLoopFactory("pi2");
  const size_t pi2_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi2->node());
  std::vector<std::string> filenames;

  {
    // The timestamp lists below are in node-index order, so pin that order.
    CHECK_EQ(pi1_index, 0u);
    CHECK_EQ(pi2_index, 1u);

    time_converter.AddNextTimestamp(
        distributed_clock::epoch(),
        {BootTimestamp::epoch(), BootTimestamp::epoch()});

    // At 5 s pi1 moves to boot 1 with its monotonic clock back at the epoch;
    // pi2 keeps running.
    const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
    time_converter.AddNextTimestamp(
        distributed_clock::epoch() + reboot_time,
        {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
         BootTimestamp::epoch() + reboot_time});
  }

  // Output directories; wipe anything left over from a previous run.
  const std::string kLogfile2_1 =
      aos::testing::TestTmpDir() + "/multi_logfile2.1/";
  const std::string kLogfile1_1 =
      aos::testing::TestTmpDir() + "/multi_logfile1.1/";
  util::UnlinkRecursive(kLogfile2_1);
  util::UnlinkRecursive(kLogfile1_1);

  // Only this one direction is disconnected; it is restored further down.
  pi2->Disconnect(pi1->node());

  pi1->AlwaysStart<Ping>("ping");
  pi2->AlwaysStart<Pong>("pong");

  {
    LoggerState pi2_logger = MakeLoggerState(
        pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);

    event_loop_factory.RunFor(chrono::milliseconds(95));

    pi2_logger.StartLogger(kLogfile2_1);

    // Runs across pi1's reboot at 5 s.
    event_loop_factory.RunFor(chrono::milliseconds(6000));

    pi2->Connect(pi1->node());

    LoggerState pi1_logger = MakeLoggerState(
        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    pi1_logger.StartLogger(kLogfile1_1);

    event_loop_factory.RunFor(chrono::milliseconds(5000));
    pi1_logger.AppendAllFilenames(&filenames);
    pi2_logger.AppendAllFilenames(&filenames);
  }

  // sorted_parts is not used below; the SortParts call presumably stays for
  // its own validation of the part list. ConfirmReadable then replays the log,
  // relying on LogReader's internal CHECKs.
  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  ConfirmReadable(filenames);
}
3246
3247// Tests that we properly handle only one direction ever existing after a
3248// reboot.
3249TEST(MissingDirectionTest, OneDirectionAfterReboot) {
3250 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3251 aos::configuration::ReadConfig(ArtifactPath(
3252 "aos/events/logging/multinode_pingpong_split4_config.json"));
3253 message_bridge::TestingTimeConverter time_converter(
3254 configuration::NodesCount(&config.message()));
3255 SimulatedEventLoopFactory event_loop_factory(&config.message());
3256 event_loop_factory.SetTimeConverter(&time_converter);
3257
3258 NodeEventLoopFactory *const pi1 =
3259 event_loop_factory.GetNodeEventLoopFactory("pi1");
3260 const size_t pi1_index = configuration::GetNodeIndex(
3261 event_loop_factory.configuration(), pi1->node());
3262 NodeEventLoopFactory *const pi2 =
3263 event_loop_factory.GetNodeEventLoopFactory("pi2");
3264 const size_t pi2_index = configuration::GetNodeIndex(
3265 event_loop_factory.configuration(), pi2->node());
3266 std::vector<std::string> filenames;
3267
3268 {
3269 CHECK_EQ(pi1_index, 0u);
3270 CHECK_EQ(pi2_index, 1u);
3271
3272 time_converter.AddNextTimestamp(
3273 distributed_clock::epoch(),
3274 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3275
3276 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3277 time_converter.AddNextTimestamp(
3278 distributed_clock::epoch() + reboot_time,
3279 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3280 BootTimestamp::epoch() + reboot_time});
3281 }
3282
3283 const std::string kLogfile2_1 =
3284 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3285 util::UnlinkRecursive(kLogfile2_1);
3286
3287 pi1->AlwaysStart<Ping>("ping");
3288
3289 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3290 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3291 // second boot.
3292 {
3293 LoggerState pi2_logger = MakeLoggerState(
3294 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3295
3296 event_loop_factory.RunFor(chrono::milliseconds(95));
3297
3298 pi2_logger.StartLogger(kLogfile2_1);
3299
3300 event_loop_factory.RunFor(chrono::milliseconds(4000));
3301
3302 pi2->Disconnect(pi1->node());
3303
3304 event_loop_factory.RunFor(chrono::milliseconds(1000));
3305 pi1->AlwaysStart<Ping>("ping");
3306
3307 event_loop_factory.RunFor(chrono::milliseconds(5000));
3308 pi2_logger.AppendAllFilenames(&filenames);
3309 }
3310
3311 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3312 ConfirmReadable(filenames);
3313}
3314
3315// Tests that we properly handle only one direction ever existing after a reboot
3316// with only reliable data.
3317TEST(MissingDirectionTest, OneDirectionAfterRebootReliable) {
3318 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3319 aos::configuration::ReadConfig(ArtifactPath(
3320 "aos/events/logging/multinode_pingpong_split4_reliable_config.json"));
3321 message_bridge::TestingTimeConverter time_converter(
3322 configuration::NodesCount(&config.message()));
3323 SimulatedEventLoopFactory event_loop_factory(&config.message());
3324 event_loop_factory.SetTimeConverter(&time_converter);
3325
3326 NodeEventLoopFactory *const pi1 =
3327 event_loop_factory.GetNodeEventLoopFactory("pi1");
3328 const size_t pi1_index = configuration::GetNodeIndex(
3329 event_loop_factory.configuration(), pi1->node());
3330 NodeEventLoopFactory *const pi2 =
3331 event_loop_factory.GetNodeEventLoopFactory("pi2");
3332 const size_t pi2_index = configuration::GetNodeIndex(
3333 event_loop_factory.configuration(), pi2->node());
3334 std::vector<std::string> filenames;
3335
3336 {
3337 CHECK_EQ(pi1_index, 0u);
3338 CHECK_EQ(pi2_index, 1u);
3339
3340 time_converter.AddNextTimestamp(
3341 distributed_clock::epoch(),
3342 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3343
3344 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3345 time_converter.AddNextTimestamp(
3346 distributed_clock::epoch() + reboot_time,
3347 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3348 BootTimestamp::epoch() + reboot_time});
3349 }
3350
3351 const std::string kLogfile2_1 =
3352 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3353 util::UnlinkRecursive(kLogfile2_1);
3354
3355 pi1->AlwaysStart<Ping>("ping");
3356
3357 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3358 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3359 // second boot.
3360 {
3361 LoggerState pi2_logger = MakeLoggerState(
3362 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3363
3364 event_loop_factory.RunFor(chrono::milliseconds(95));
3365
3366 pi2_logger.StartLogger(kLogfile2_1);
3367
3368 event_loop_factory.RunFor(chrono::milliseconds(4000));
3369
3370 pi2->Disconnect(pi1->node());
3371
3372 event_loop_factory.RunFor(chrono::milliseconds(1000));
3373 pi1->AlwaysStart<Ping>("ping");
3374
3375 event_loop_factory.RunFor(chrono::milliseconds(5000));
3376 pi2_logger.AppendAllFilenames(&filenames);
3377 }
3378
3379 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3380 ConfirmReadable(filenames);
3381}
3382
3383// Tests that we properly handle what used to be a time violation in one
3384// direction. This can occur when one direction goes down after sending some
3385// data, but the other keeps working. The down direction ends up resolving to a
3386// straight line in the noncausal filter, where the direction which is still up
3387// can cross that line. Really, time progressed along just fine but we assumed
3388// that the offset was a line when it could have deviated by up to 1ms/second.
3389TEST_P(MultinodeLoggerTest, OneDirectionTimeDrift) {
3390 std::vector<std::string> filenames;
3391
3392 CHECK_EQ(pi1_index_, 0u);
3393 CHECK_EQ(pi2_index_, 1u);
3394
3395 time_converter_.AddNextTimestamp(
3396 distributed_clock::epoch(),
3397 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3398
3399 const chrono::nanoseconds before_disconnect_duration =
3400 time_converter_.AddMonotonic(
3401 {chrono::milliseconds(1000), chrono::milliseconds(1000)});
3402
3403 const chrono::nanoseconds test_duration =
3404 time_converter_.AddMonotonic(
3405 {chrono::milliseconds(1000), chrono::milliseconds(1000)}) +
3406 time_converter_.AddMonotonic(
3407 {chrono::milliseconds(10000),
3408 chrono::milliseconds(10000) - chrono::milliseconds(5)}) +
3409 time_converter_.AddMonotonic(
3410 {chrono::milliseconds(10000),
3411 chrono::milliseconds(10000) + chrono::milliseconds(5)});
3412
3413 const std::string kLogfile =
3414 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3415 util::UnlinkRecursive(kLogfile);
3416
3417 {
3418 LoggerState pi2_logger = MakeLogger(pi2_);
3419 pi2_logger.StartLogger(kLogfile);
3420 event_loop_factory_.RunFor(before_disconnect_duration);
3421
3422 pi2_->Disconnect(pi1_->node());
3423
3424 event_loop_factory_.RunFor(test_duration);
3425 pi2_->Connect(pi1_->node());
3426
3427 event_loop_factory_.RunFor(chrono::milliseconds(5000));
3428 pi2_logger.AppendAllFilenames(&filenames);
3429 }
3430
3431 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3432 ConfirmReadable(filenames);
3433}
3434
3435// Tests that we can replay a logfile that has timestamps such that at least one
3436// node's epoch is at a positive distributed_clock (and thus will have to be
3437// booted after the other node(s)).
3438TEST_P(MultinodeLoggerTest, StartOneNodeBeforeOther) {
3439 std::vector<std::string> filenames;
3440
3441 CHECK_EQ(pi1_index_, 0u);
3442 CHECK_EQ(pi2_index_, 1u);
3443
3444 time_converter_.AddNextTimestamp(
3445 distributed_clock::epoch(),
3446 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3447
3448 const chrono::nanoseconds before_reboot_duration = chrono::milliseconds(1000);
3449 time_converter_.RebootAt(
3450 0, distributed_clock::time_point(before_reboot_duration));
3451
3452 const chrono::nanoseconds test_duration = time_converter_.AddMonotonic(
3453 {chrono::milliseconds(10000), chrono::milliseconds(10000)});
3454
3455 const std::string kLogfile =
3456 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3457 util::UnlinkRecursive(kLogfile);
3458
3459 pi2_->Disconnect(pi1_->node());
3460 pi1_->Disconnect(pi2_->node());
3461
3462 {
3463 LoggerState pi2_logger = MakeLogger(pi2_);
3464
3465 pi2_logger.StartLogger(kLogfile);
3466 event_loop_factory_.RunFor(before_reboot_duration);
3467
3468 pi2_->Connect(pi1_->node());
3469 pi1_->Connect(pi2_->node());
3470
3471 event_loop_factory_.RunFor(test_duration);
3472
3473 pi2_logger.AppendAllFilenames(&filenames);
3474 }
3475
3476 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3477 ConfirmReadable(filenames);
3478
3479 {
3480 LogReader reader(sorted_parts);
3481 SimulatedEventLoopFactory replay_factory(reader.configuration());
3482 reader.RegisterWithoutStarting(&replay_factory);
3483
3484 NodeEventLoopFactory *const replay_node =
3485 reader.event_loop_factory()->GetNodeEventLoopFactory("pi1");
3486
3487 std::unique_ptr<EventLoop> test_event_loop =
3488 replay_node->MakeEventLoop("test_reader");
3489 replay_node->OnStartup([replay_node]() {
3490 // Check that we didn't boot until at least t=0.
3491 CHECK_LE(monotonic_clock::epoch(), replay_node->monotonic_now());
3492 });
3493 test_event_loop->OnRun([&test_event_loop]() {
3494 // Check that we didn't boot until at least t=0.
3495 EXPECT_LE(monotonic_clock::epoch(), test_event_loop->monotonic_now());
3496 });
3497 reader.event_loop_factory()->Run();
3498 reader.Deregister();
3499 }
3500}
3501
3502// Tests that when we have a loop without all the logs at all points in time, we
3503// can sort it properly.
3504TEST(MultinodeLoggerLoopTest, Loop) {
3505 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3506 aos::configuration::ReadConfig(ArtifactPath(
3507 "aos/events/logging/multinode_pingpong_triangle_split_config.json"));
3508 message_bridge::TestingTimeConverter time_converter(
3509 configuration::NodesCount(&config.message()));
3510 SimulatedEventLoopFactory event_loop_factory(&config.message());
3511 event_loop_factory.SetTimeConverter(&time_converter);
3512
3513 NodeEventLoopFactory *const pi1 =
3514 event_loop_factory.GetNodeEventLoopFactory("pi1");
3515 NodeEventLoopFactory *const pi2 =
3516 event_loop_factory.GetNodeEventLoopFactory("pi2");
3517 NodeEventLoopFactory *const pi3 =
3518 event_loop_factory.GetNodeEventLoopFactory("pi3");
3519
3520 const std::string kLogfile1_1 =
3521 aos::testing::TestTmpDir() + "/multi_logfile1/";
3522 const std::string kLogfile2_1 =
3523 aos::testing::TestTmpDir() + "/multi_logfile2/";
3524 const std::string kLogfile3_1 =
3525 aos::testing::TestTmpDir() + "/multi_logfile3/";
3526 util::UnlinkRecursive(kLogfile1_1);
3527 util::UnlinkRecursive(kLogfile2_1);
3528 util::UnlinkRecursive(kLogfile3_1);
3529
3530 {
3531 // Make pi1 boot before everything else.
3532 time_converter.AddNextTimestamp(
3533 distributed_clock::epoch(),
3534 {BootTimestamp::epoch(),
3535 BootTimestamp::epoch() - chrono::milliseconds(100),
3536 BootTimestamp::epoch() - chrono::milliseconds(300)});
3537 }
3538
3539 // We want to setup a situation such that 2 of the 3 legs of the loop are very
3540 // confident about time being X, and the third leg is pulling the average off
3541 // to one side.
3542 //
3543 // It's easiest to visualize this in timestamp_plotter.
3544
3545 std::vector<std::string> filenames;
3546 {
3547 // Have pi1 send out a reliable message at startup. This sets up a long
3548 // forwarding time message at the start to bias time.
3549 std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
3550 {
3551 aos::Sender<examples::Ping> ping_sender =
3552 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
3553
3554 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
3555 examples::Ping::Builder ping_builder =
3556 builder.MakeBuilder<examples::Ping>();
3557 CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
3558 }
3559
3560 // Wait a while so there's enough data to let the worst case be rather off.
3561 event_loop_factory.RunFor(chrono::seconds(1000));
3562
3563 // Now start a receiving node first. This sets up 2 tight bounds between 2
3564 // of the nodes.
3565 LoggerState pi2_logger = MakeLoggerState(
3566 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3567 pi2_logger.StartLogger(kLogfile2_1);
3568
3569 event_loop_factory.RunFor(chrono::seconds(100));
3570
3571 // And now start the third leg.
3572 LoggerState pi3_logger = MakeLoggerState(
3573 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3574 pi3_logger.StartLogger(kLogfile3_1);
3575
3576 LoggerState pi1_logger = MakeLoggerState(
3577 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3578 pi1_logger.StartLogger(kLogfile1_1);
3579
3580 event_loop_factory.RunFor(chrono::seconds(100));
3581
3582 pi1_logger.AppendAllFilenames(&filenames);
3583 pi2_logger.AppendAllFilenames(&filenames);
3584 pi3_logger.AppendAllFilenames(&filenames);
3585 }
3586
3587 // Make sure we can read this.
3588 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3589 auto result = ConfirmReadable(filenames);
3590}
3591
3592} // namespace testing
3593} // namespace logger
3594} // namespace aos