blob: f1784bf64a63a602688a0a72d9866373efb57878 [file] [log] [blame]
Naman Guptaa63aa132023-03-22 20:06:34 -07001#include "aos/events/logging/log_reader.h"
2#include "aos/events/logging/multinode_logger_test_lib.h"
3#include "aos/events/message_counter.h"
4#include "aos/events/ping_lib.h"
5#include "aos/events/pong_lib.h"
6#include "aos/network/remote_message_generated.h"
7#include "aos/network/timestamp_generated.h"
8#include "aos/testing/tmpdir.h"
9#include "gmock/gmock.h"
10#include "gtest/gtest.h"
11
12namespace aos {
13namespace logger {
14namespace testing {
15
16namespace chrono = std::chrono;
17using aos::message_bridge::RemoteMessage;
18using aos::testing::ArtifactPath;
19using aos::testing::MessageCounter;
20
Naman Guptaa63aa132023-03-22 20:06:34 -070021INSTANTIATE_TEST_SUITE_P(
22 All, MultinodeLoggerTest,
23 ::testing::Combine(
24 ::testing::Values(
25 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080026 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070027 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080028 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070029 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
30
31INSTANTIATE_TEST_SUITE_P(
32 All, MultinodeLoggerDeathTest,
33 ::testing::Combine(
34 ::testing::Values(
35 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080036 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070037 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080038 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070039 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
40
41// Tests that we can write and read simple multi-node log files.
42TEST_P(MultinodeLoggerTest, SimpleMultiNode) {
43 std::vector<std::string> actual_filenames;
44 time_converter_.StartEqual();
45
46 {
47 LoggerState pi1_logger = MakeLogger(pi1_);
48 LoggerState pi2_logger = MakeLogger(pi2_);
49
50 event_loop_factory_.RunFor(chrono::milliseconds(95));
51
52 StartLogger(&pi1_logger);
53 StartLogger(&pi2_logger);
54
55 event_loop_factory_.RunFor(chrono::milliseconds(20000));
56 pi1_logger.AppendAllFilenames(&actual_filenames);
57 pi2_logger.AppendAllFilenames(&actual_filenames);
58 }
59
60 ASSERT_THAT(actual_filenames,
61 ::testing::UnorderedElementsAreArray(logfiles_));
62
63 {
64 std::set<std::string> logfile_uuids;
65 std::set<std::string> parts_uuids;
66 // Confirm that we have the expected number of UUIDs for both the logfile
67 // UUIDs and parts UUIDs.
68 std::vector<SizePrefixedFlatbufferVector<LogFileHeader>> log_header;
69 for (std::string_view f : logfiles_) {
70 log_header.emplace_back(ReadHeader(f).value());
71 if (!log_header.back().message().has_configuration()) {
72 logfile_uuids.insert(
73 log_header.back().message().log_event_uuid()->str());
74 parts_uuids.insert(log_header.back().message().parts_uuid()->str());
75 }
76 }
77
78 EXPECT_EQ(logfile_uuids.size(), 2u);
79 if (shared()) {
80 EXPECT_EQ(parts_uuids.size(), 7u);
81 } else {
82 EXPECT_EQ(parts_uuids.size(), 8u);
83 }
84
85 // And confirm everything is on the correct node.
86 EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi1");
87 EXPECT_EQ(log_header[3].message().node()->name()->string_view(), "pi1");
88 EXPECT_EQ(log_header[4].message().node()->name()->string_view(), "pi1");
89
90 EXPECT_EQ(log_header[5].message().node()->name()->string_view(), "pi2");
91 EXPECT_EQ(log_header[6].message().node()->name()->string_view(), "pi2");
92
93 EXPECT_EQ(log_header[7].message().node()->name()->string_view(), "pi2");
94 EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi2");
95 EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi2");
96
97 EXPECT_EQ(log_header[10].message().node()->name()->string_view(), "pi1");
98 EXPECT_EQ(log_header[11].message().node()->name()->string_view(), "pi1");
99
100 EXPECT_EQ(log_header[12].message().node()->name()->string_view(), "pi2");
101 EXPECT_EQ(log_header[13].message().node()->name()->string_view(), "pi2");
102
103 if (shared()) {
104 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
105 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi2");
106 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi2");
107
108 EXPECT_EQ(log_header[17].message().node()->name()->string_view(), "pi1");
109 EXPECT_EQ(log_header[18].message().node()->name()->string_view(), "pi1");
110 } else {
111 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
112 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi2");
113
114 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi1");
115 EXPECT_EQ(log_header[17].message().node()->name()->string_view(), "pi1");
116
117 EXPECT_EQ(log_header[18].message().node()->name()->string_view(), "pi2");
118 EXPECT_EQ(log_header[19].message().node()->name()->string_view(), "pi2");
119 }
120
121 // And the parts index matches.
122 EXPECT_EQ(log_header[2].message().parts_index(), 0);
123 EXPECT_EQ(log_header[3].message().parts_index(), 1);
124 EXPECT_EQ(log_header[4].message().parts_index(), 2);
125
126 EXPECT_EQ(log_header[5].message().parts_index(), 0);
127 EXPECT_EQ(log_header[6].message().parts_index(), 1);
128
129 EXPECT_EQ(log_header[7].message().parts_index(), 0);
130 EXPECT_EQ(log_header[8].message().parts_index(), 1);
131 EXPECT_EQ(log_header[9].message().parts_index(), 2);
132
133 EXPECT_EQ(log_header[10].message().parts_index(), 0);
134 EXPECT_EQ(log_header[11].message().parts_index(), 1);
135
136 EXPECT_EQ(log_header[12].message().parts_index(), 0);
137 EXPECT_EQ(log_header[13].message().parts_index(), 1);
138
139 if (shared()) {
140 EXPECT_EQ(log_header[14].message().parts_index(), 0);
141 EXPECT_EQ(log_header[15].message().parts_index(), 1);
142 EXPECT_EQ(log_header[16].message().parts_index(), 2);
143
144 EXPECT_EQ(log_header[17].message().parts_index(), 0);
145 EXPECT_EQ(log_header[18].message().parts_index(), 1);
146 } else {
147 EXPECT_EQ(log_header[14].message().parts_index(), 0);
148 EXPECT_EQ(log_header[15].message().parts_index(), 1);
149
150 EXPECT_EQ(log_header[16].message().parts_index(), 0);
151 EXPECT_EQ(log_header[17].message().parts_index(), 1);
152
153 EXPECT_EQ(log_header[18].message().parts_index(), 0);
154 EXPECT_EQ(log_header[19].message().parts_index(), 1);
155 }
156 }
157
158 const std::vector<LogFile> sorted_log_files = SortParts(logfiles_);
159 {
160 using ::testing::UnorderedElementsAre;
161 std::shared_ptr<const aos::Configuration> config =
162 sorted_log_files[0].config;
163
164 // Timing reports, pings
165 EXPECT_THAT(CountChannelsData(config, logfiles_[2]),
166 UnorderedElementsAre(
167 std::make_tuple("/pi1/aos",
168 "aos.message_bridge.ServerStatistics", 1),
169 std::make_tuple("/test", "aos.examples.Ping", 1)))
170 << " : " << logfiles_[2];
171 {
172 std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
173 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 1),
174 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
175 1)};
176 if (!std::get<0>(GetParam()).shared) {
177 channel_counts.push_back(
178 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
179 "aos-message_bridge-Timestamp",
180 "aos.message_bridge.RemoteMessage", 1));
181 }
182 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
183 ::testing::UnorderedElementsAreArray(channel_counts))
184 << " : " << logfiles_[3];
185 }
186 {
187 std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
188 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 199),
189 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
190 20),
191 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
192 199),
193 std::make_tuple("/pi1/aos", "aos.timing.Report", 40),
194 std::make_tuple("/test", "aos.examples.Ping", 2000)};
195 if (!std::get<0>(GetParam()).shared) {
196 channel_counts.push_back(
197 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
198 "aos-message_bridge-Timestamp",
199 "aos.message_bridge.RemoteMessage", 199));
200 }
201 EXPECT_THAT(CountChannelsData(config, logfiles_[4]),
202 ::testing::UnorderedElementsAreArray(channel_counts))
203 << " : " << logfiles_[4];
204 }
205 // Timestamps for pong
206 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
207 UnorderedElementsAre())
208 << " : " << logfiles_[2];
209 EXPECT_THAT(
210 CountChannelsTimestamp(config, logfiles_[3]),
211 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
212 << " : " << logfiles_[3];
213 EXPECT_THAT(
214 CountChannelsTimestamp(config, logfiles_[4]),
215 UnorderedElementsAre(
216 std::make_tuple("/test", "aos.examples.Pong", 2000),
217 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)))
218 << " : " << logfiles_[4];
219
220 // Pong data.
221 EXPECT_THAT(
222 CountChannelsData(config, logfiles_[5]),
223 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 91)))
224 << " : " << logfiles_[5];
225 EXPECT_THAT(CountChannelsData(config, logfiles_[6]),
226 UnorderedElementsAre(
227 std::make_tuple("/test", "aos.examples.Pong", 1910)))
228 << " : " << logfiles_[6];
229
230 // No timestamps
231 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
232 UnorderedElementsAre())
233 << " : " << logfiles_[5];
234 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[6]),
235 UnorderedElementsAre())
236 << " : " << logfiles_[6];
237
238 // Timing reports and pongs.
239 EXPECT_THAT(CountChannelsData(config, logfiles_[7]),
240 UnorderedElementsAre(std::make_tuple(
241 "/pi2/aos", "aos.message_bridge.ServerStatistics", 1)))
242 << " : " << logfiles_[7];
243 EXPECT_THAT(
244 CountChannelsData(config, logfiles_[8]),
245 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
246 << " : " << logfiles_[8];
247 EXPECT_THAT(
248 CountChannelsData(config, logfiles_[9]),
249 UnorderedElementsAre(
250 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
251 std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
252 20),
253 std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
254 200),
255 std::make_tuple("/pi2/aos", "aos.timing.Report", 40),
256 std::make_tuple("/test", "aos.examples.Pong", 2000)))
257 << " : " << logfiles_[9];
258 // And ping timestamps.
259 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[7]),
260 UnorderedElementsAre())
261 << " : " << logfiles_[7];
262 EXPECT_THAT(
263 CountChannelsTimestamp(config, logfiles_[8]),
264 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
265 << " : " << logfiles_[8];
266 EXPECT_THAT(
267 CountChannelsTimestamp(config, logfiles_[9]),
268 UnorderedElementsAre(
269 std::make_tuple("/test", "aos.examples.Ping", 2000),
270 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)))
271 << " : " << logfiles_[9];
272
273 // And then test that the remotely logged timestamp data files only have
274 // timestamps in them.
275 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[10]),
276 UnorderedElementsAre())
277 << " : " << logfiles_[10];
278 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[11]),
279 UnorderedElementsAre())
280 << " : " << logfiles_[11];
281 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[12]),
282 UnorderedElementsAre())
283 << " : " << logfiles_[12];
284 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[13]),
285 UnorderedElementsAre())
286 << " : " << logfiles_[13];
287
288 EXPECT_THAT(CountChannelsData(config, logfiles_[10]),
289 UnorderedElementsAre(std::make_tuple(
290 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
291 << " : " << logfiles_[10];
292 EXPECT_THAT(CountChannelsData(config, logfiles_[11]),
293 UnorderedElementsAre(std::make_tuple(
294 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
295 << " : " << logfiles_[11];
296
297 EXPECT_THAT(CountChannelsData(config, logfiles_[12]),
298 UnorderedElementsAre(std::make_tuple(
299 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
300 << " : " << logfiles_[12];
301 EXPECT_THAT(CountChannelsData(config, logfiles_[13]),
302 UnorderedElementsAre(std::make_tuple(
303 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
304 << " : " << logfiles_[13];
305
306 // Timestamps from pi2 on pi1, and the other way.
307 if (shared()) {
308 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
309 UnorderedElementsAre())
310 << " : " << logfiles_[14];
311 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
312 UnorderedElementsAre())
313 << " : " << logfiles_[15];
314 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
315 UnorderedElementsAre())
316 << " : " << logfiles_[16];
317 EXPECT_THAT(CountChannelsData(config, logfiles_[17]),
318 UnorderedElementsAre())
319 << " : " << logfiles_[17];
320 EXPECT_THAT(CountChannelsData(config, logfiles_[18]),
321 UnorderedElementsAre())
322 << " : " << logfiles_[18];
323
324 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[14]),
325 UnorderedElementsAre(
326 std::make_tuple("/test", "aos.examples.Ping", 1)))
327 << " : " << logfiles_[14];
328 EXPECT_THAT(
329 CountChannelsTimestamp(config, logfiles_[15]),
330 UnorderedElementsAre(
331 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 9),
332 std::make_tuple("/test", "aos.examples.Ping", 90)))
333 << " : " << logfiles_[15];
334 EXPECT_THAT(
335 CountChannelsTimestamp(config, logfiles_[16]),
336 UnorderedElementsAre(
337 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 191),
338 std::make_tuple("/test", "aos.examples.Ping", 1910)))
339 << " : " << logfiles_[16];
340 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[17]),
341 UnorderedElementsAre(std::make_tuple(
342 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
343 << " : " << logfiles_[17];
344 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[18]),
345 UnorderedElementsAre(std::make_tuple(
346 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
347 << " : " << logfiles_[18];
348 } else {
349 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
350 UnorderedElementsAre())
351 << " : " << logfiles_[14];
352 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
353 UnorderedElementsAre())
354 << " : " << logfiles_[15];
355 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
356 UnorderedElementsAre())
357 << " : " << logfiles_[16];
358 EXPECT_THAT(CountChannelsData(config, logfiles_[17]),
359 UnorderedElementsAre())
360 << " : " << logfiles_[17];
361 EXPECT_THAT(CountChannelsData(config, logfiles_[18]),
362 UnorderedElementsAre())
363 << " : " << logfiles_[18];
364 EXPECT_THAT(CountChannelsData(config, logfiles_[19]),
365 UnorderedElementsAre())
366 << " : " << logfiles_[19];
367
368 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[14]),
369 UnorderedElementsAre(std::make_tuple(
370 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
371 << " : " << logfiles_[14];
372 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[15]),
373 UnorderedElementsAre(std::make_tuple(
374 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
375 << " : " << logfiles_[15];
376 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[16]),
377 UnorderedElementsAre(std::make_tuple(
378 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
379 << " : " << logfiles_[16];
380 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[17]),
381 UnorderedElementsAre(std::make_tuple(
382 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
383 << " : " << logfiles_[17];
384 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[18]),
385 UnorderedElementsAre(
386 std::make_tuple("/test", "aos.examples.Ping", 91)))
387 << " : " << logfiles_[18];
388 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[19]),
389 UnorderedElementsAre(
390 std::make_tuple("/test", "aos.examples.Ping", 1910)))
391 << " : " << logfiles_[19];
392 }
393 }
394
395 LogReader reader(sorted_log_files);
396
397 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
398 log_reader_factory.set_send_delay(chrono::microseconds(0));
399
400 // This sends out the fetched messages and advances time to the start of the
401 // log file.
402 reader.Register(&log_reader_factory);
403
404 const Node *pi1 =
405 configuration::GetNode(log_reader_factory.configuration(), "pi1");
406 const Node *pi2 =
407 configuration::GetNode(log_reader_factory.configuration(), "pi2");
408
409 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
410 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
411 LOG(INFO) << "now pi1 "
412 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
413 LOG(INFO) << "now pi2 "
414 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
415
416 EXPECT_THAT(reader.LoggedNodes(),
417 ::testing::ElementsAre(
418 configuration::GetNode(reader.logged_configuration(), pi1),
419 configuration::GetNode(reader.logged_configuration(), pi2)));
420
421 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
422
423 std::unique_ptr<EventLoop> pi1_event_loop =
424 log_reader_factory.MakeEventLoop("test", pi1);
425 std::unique_ptr<EventLoop> pi2_event_loop =
426 log_reader_factory.MakeEventLoop("test", pi2);
427
428 int pi1_ping_count = 10;
429 int pi2_ping_count = 10;
430 int pi1_pong_count = 10;
431 int pi2_pong_count = 10;
432
433 // Confirm that the ping value matches.
434 pi1_event_loop->MakeWatcher(
435 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
436 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping) << " at "
437 << pi1_event_loop->context().monotonic_remote_time << " -> "
438 << pi1_event_loop->context().monotonic_event_time;
439 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
440 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
441 pi1_ping_count * chrono::milliseconds(10) +
442 monotonic_clock::epoch());
443 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
444 pi1_ping_count * chrono::milliseconds(10) +
445 realtime_clock::epoch());
446 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
447 pi1_event_loop->context().monotonic_event_time);
448 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
449 pi1_event_loop->context().realtime_event_time);
450
451 ++pi1_ping_count;
452 });
453 pi2_event_loop->MakeWatcher(
454 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
455 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping) << " at "
456 << pi2_event_loop->context().monotonic_remote_time << " -> "
457 << pi2_event_loop->context().monotonic_event_time;
458 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
459
460 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
461 pi2_ping_count * chrono::milliseconds(10) +
462 monotonic_clock::epoch());
463 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
464 pi2_ping_count * chrono::milliseconds(10) +
465 realtime_clock::epoch());
466 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time +
467 chrono::microseconds(150),
468 pi2_event_loop->context().monotonic_event_time);
469 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time +
470 chrono::microseconds(150),
471 pi2_event_loop->context().realtime_event_time);
472 ++pi2_ping_count;
473 });
474
475 constexpr ssize_t kQueueIndexOffset = -9;
476 // Confirm that the ping and pong counts both match, and the value also
477 // matches.
478 pi1_event_loop->MakeWatcher(
479 "/test", [&pi1_event_loop, &pi1_ping_count,
480 &pi1_pong_count](const examples::Pong &pong) {
481 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
482 << pi1_event_loop->context().monotonic_remote_time << " -> "
483 << pi1_event_loop->context().monotonic_event_time;
484
485 EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
486 pi1_pong_count + kQueueIndexOffset);
487 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
488 chrono::microseconds(200) +
489 pi1_pong_count * chrono::milliseconds(10) +
490 monotonic_clock::epoch());
491 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
492 chrono::microseconds(200) +
493 pi1_pong_count * chrono::milliseconds(10) +
494 realtime_clock::epoch());
495
496 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
497 chrono::microseconds(150),
498 pi1_event_loop->context().monotonic_event_time);
499 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
500 chrono::microseconds(150),
501 pi1_event_loop->context().realtime_event_time);
502
503 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
504 ++pi1_pong_count;
505 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
506 });
507 pi2_event_loop->MakeWatcher(
508 "/test", [&pi2_event_loop, &pi2_ping_count,
509 &pi2_pong_count](const examples::Pong &pong) {
510 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
511 << pi2_event_loop->context().monotonic_remote_time << " -> "
512 << pi2_event_loop->context().monotonic_event_time;
513
514 EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
515 pi2_pong_count + kQueueIndexOffset);
516
517 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
518 chrono::microseconds(200) +
519 pi2_pong_count * chrono::milliseconds(10) +
520 monotonic_clock::epoch());
521 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
522 chrono::microseconds(200) +
523 pi2_pong_count * chrono::milliseconds(10) +
524 realtime_clock::epoch());
525
526 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
527 pi2_event_loop->context().monotonic_event_time);
528 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
529 pi2_event_loop->context().realtime_event_time);
530
531 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
532 ++pi2_pong_count;
533 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
534 });
535
536 log_reader_factory.Run();
537 EXPECT_EQ(pi1_ping_count, 2010);
538 EXPECT_EQ(pi2_ping_count, 2010);
539 EXPECT_EQ(pi1_pong_count, 2010);
540 EXPECT_EQ(pi2_pong_count, 2010);
541
542 reader.Deregister();
543}
544
545// Test that if we feed the replay with a mismatched node list that we die on
546// the LogReader constructor.
547TEST_P(MultinodeLoggerDeathTest, MultiNodeBadReplayConfig) {
548 time_converter_.StartEqual();
549 {
550 LoggerState pi1_logger = MakeLogger(pi1_);
551 LoggerState pi2_logger = MakeLogger(pi2_);
552
553 event_loop_factory_.RunFor(chrono::milliseconds(95));
554
555 StartLogger(&pi1_logger);
556 StartLogger(&pi2_logger);
557
558 event_loop_factory_.RunFor(chrono::milliseconds(20000));
559 }
560
561 // Test that, if we add an additional node to the replay config that the
562 // logger complains about the mismatch in number of nodes.
563 FlatbufferDetachedBuffer<Configuration> extra_nodes_config =
564 configuration::MergeWithConfig(&config_.message(), R"({
565 "nodes": [
566 {
567 "name": "extra-node"
568 }
569 ]
570 }
571 )");
572
573 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
574 EXPECT_DEATH(LogReader(sorted_parts, &extra_nodes_config.message()),
575 "Log file and replay config need to have matching nodes lists.");
576}
577
578// Tests that we can read log files where they don't start at the same monotonic
579// time.
580TEST_P(MultinodeLoggerTest, StaggeredStart) {
581 time_converter_.StartEqual();
582 std::vector<std::string> actual_filenames;
583
584 {
585 LoggerState pi1_logger = MakeLogger(pi1_);
586 LoggerState pi2_logger = MakeLogger(pi2_);
587
588 event_loop_factory_.RunFor(chrono::milliseconds(95));
589
590 StartLogger(&pi1_logger);
591
592 event_loop_factory_.RunFor(chrono::milliseconds(200));
593
594 StartLogger(&pi2_logger);
595
596 event_loop_factory_.RunFor(chrono::milliseconds(20000));
597 pi1_logger.AppendAllFilenames(&actual_filenames);
598 pi2_logger.AppendAllFilenames(&actual_filenames);
599 }
600
601 // Since we delay starting pi2, it already knows about all the timestamps so
602 // we don't end up with extra parts.
603 LogReader reader(SortParts(actual_filenames));
604
605 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
606 log_reader_factory.set_send_delay(chrono::microseconds(0));
607
608 // This sends out the fetched messages and advances time to the start of the
609 // log file.
610 reader.Register(&log_reader_factory);
611
612 const Node *pi1 =
613 configuration::GetNode(log_reader_factory.configuration(), "pi1");
614 const Node *pi2 =
615 configuration::GetNode(log_reader_factory.configuration(), "pi2");
616
617 EXPECT_THAT(reader.LoggedNodes(),
618 ::testing::ElementsAre(
619 configuration::GetNode(reader.logged_configuration(), pi1),
620 configuration::GetNode(reader.logged_configuration(), pi2)));
621
622 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
623
624 std::unique_ptr<EventLoop> pi1_event_loop =
625 log_reader_factory.MakeEventLoop("test", pi1);
626 std::unique_ptr<EventLoop> pi2_event_loop =
627 log_reader_factory.MakeEventLoop("test", pi2);
628
629 int pi1_ping_count = 30;
630 int pi2_ping_count = 30;
631 int pi1_pong_count = 30;
632 int pi2_pong_count = 30;
633
634 // Confirm that the ping value matches.
635 pi1_event_loop->MakeWatcher(
636 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
637 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
638 << pi1_event_loop->context().monotonic_remote_time << " -> "
639 << pi1_event_loop->context().monotonic_event_time;
640 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
641
642 ++pi1_ping_count;
643 });
644 pi2_event_loop->MakeWatcher(
645 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
646 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
647 << pi2_event_loop->context().monotonic_remote_time << " -> "
648 << pi2_event_loop->context().monotonic_event_time;
649 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
650
651 ++pi2_ping_count;
652 });
653
654 // Confirm that the ping and pong counts both match, and the value also
655 // matches.
656 pi1_event_loop->MakeWatcher(
657 "/test", [&pi1_event_loop, &pi1_ping_count,
658 &pi1_pong_count](const examples::Pong &pong) {
659 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
660 << pi1_event_loop->context().monotonic_remote_time << " -> "
661 << pi1_event_loop->context().monotonic_event_time;
662
663 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
664 ++pi1_pong_count;
665 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
666 });
667 pi2_event_loop->MakeWatcher(
668 "/test", [&pi2_event_loop, &pi2_ping_count,
669 &pi2_pong_count](const examples::Pong &pong) {
670 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
671 << pi2_event_loop->context().monotonic_remote_time << " -> "
672 << pi2_event_loop->context().monotonic_event_time;
673
674 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
675 ++pi2_pong_count;
676 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
677 });
678
679 log_reader_factory.Run();
680 EXPECT_EQ(pi1_ping_count, 2030);
681 EXPECT_EQ(pi2_ping_count, 2030);
682 EXPECT_EQ(pi1_pong_count, 2030);
683 EXPECT_EQ(pi2_pong_count, 2030);
684
685 reader.Deregister();
686}
687
688// Tests that we can read log files where the monotonic clocks drift and don't
689// match correctly. While we are here, also test that different ending times
690// also is readable.
691TEST_P(MultinodeLoggerTest, MismatchedClocks) {
692 // TODO(austin): Negate...
693 const chrono::nanoseconds initial_pi2_offset = chrono::seconds(1000);
694
695 time_converter_.AddMonotonic(
696 {BootTimestamp::epoch(), BootTimestamp::epoch() + initial_pi2_offset});
697 // Wait for 95 ms, (~0.1 seconds - 1/2 of the ping/pong period), and set the
698 // skew to be 200 uS/s
699 const chrono::nanoseconds startup_sleep1 = time_converter_.AddMonotonic(
700 {chrono::milliseconds(95),
701 chrono::milliseconds(95) - chrono::nanoseconds(200) * 95});
702 // Run another 200 ms to have one logger start first.
703 const chrono::nanoseconds startup_sleep2 = time_converter_.AddMonotonic(
704 {chrono::milliseconds(200), chrono::milliseconds(200)});
705 // Slew one way then the other at the same 200 uS/S slew rate. Make sure we
706 // go far enough to cause problems if this isn't accounted for.
707 const chrono::nanoseconds logger_run1 = time_converter_.AddMonotonic(
708 {chrono::milliseconds(20000),
709 chrono::milliseconds(20000) - chrono::nanoseconds(200) * 20000});
710 const chrono::nanoseconds logger_run2 = time_converter_.AddMonotonic(
711 {chrono::milliseconds(40000),
712 chrono::milliseconds(40000) + chrono::nanoseconds(200) * 40000});
713 const chrono::nanoseconds logger_run3 = time_converter_.AddMonotonic(
714 {chrono::milliseconds(400), chrono::milliseconds(400)});
715
716 {
717 LoggerState pi2_logger = MakeLogger(pi2_);
718
719 LOG(INFO) << "pi2 times: " << pi2_->monotonic_now() << " "
720 << pi2_->realtime_now() << " distributed "
721 << pi2_->ToDistributedClock(pi2_->monotonic_now());
722
723 LOG(INFO) << "pi2_ times: " << pi2_->monotonic_now() << " "
724 << pi2_->realtime_now() << " distributed "
725 << pi2_->ToDistributedClock(pi2_->monotonic_now());
726
727 event_loop_factory_.RunFor(startup_sleep1);
728
729 StartLogger(&pi2_logger);
730
731 event_loop_factory_.RunFor(startup_sleep2);
732
733 {
734 // Run pi1's logger for only part of the time.
735 LoggerState pi1_logger = MakeLogger(pi1_);
736
737 StartLogger(&pi1_logger);
738 event_loop_factory_.RunFor(logger_run1);
739
740 // Make sure we slewed time far enough so that the difference is greater
741 // than the network delay. This confirms that if we sort incorrectly, it
742 // would show in the results.
743 EXPECT_LT(
744 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
745 -event_loop_factory_.send_delay() -
746 event_loop_factory_.network_delay());
747
748 event_loop_factory_.RunFor(logger_run2);
749
750 // And now check that we went far enough the other way to make sure we
751 // cover both problems.
752 EXPECT_GT(
753 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
754 event_loop_factory_.send_delay() +
755 event_loop_factory_.network_delay());
756 }
757
758 // And log a bit more on pi2.
759 event_loop_factory_.RunFor(logger_run3);
760 }
761
762 LogReader reader(
763 SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 3, 2)));
764
765 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
766 log_reader_factory.set_send_delay(chrono::microseconds(0));
767
768 const Node *pi1 =
769 configuration::GetNode(log_reader_factory.configuration(), "pi1");
770 const Node *pi2 =
771 configuration::GetNode(log_reader_factory.configuration(), "pi2");
772
773 // This sends out the fetched messages and advances time to the start of the
774 // log file.
775 reader.Register(&log_reader_factory);
776
777 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
778 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
779 LOG(INFO) << "now pi1 "
780 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
781 LOG(INFO) << "now pi2 "
782 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
783
784 LOG(INFO) << "Done registering (pi1) "
785 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now()
786 << " "
787 << log_reader_factory.GetNodeEventLoopFactory(pi1)->realtime_now();
788 LOG(INFO) << "Done registering (pi2) "
789 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now()
790 << " "
791 << log_reader_factory.GetNodeEventLoopFactory(pi2)->realtime_now();
792
793 EXPECT_THAT(reader.LoggedNodes(),
794 ::testing::ElementsAre(
795 configuration::GetNode(reader.logged_configuration(), pi1),
796 configuration::GetNode(reader.logged_configuration(), pi2)));
797
798 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
799
800 std::unique_ptr<EventLoop> pi1_event_loop =
801 log_reader_factory.MakeEventLoop("test", pi1);
802 std::unique_ptr<EventLoop> pi2_event_loop =
803 log_reader_factory.MakeEventLoop("test", pi2);
804
805 int pi1_ping_count = 30;
806 int pi2_ping_count = 30;
807 int pi1_pong_count = 30;
808 int pi2_pong_count = 30;
809
810 // Confirm that the ping value matches.
811 pi1_event_loop->MakeWatcher(
812 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
813 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
814 << pi1_event_loop->context().monotonic_remote_time << " -> "
815 << pi1_event_loop->context().monotonic_event_time;
816 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
817
818 ++pi1_ping_count;
819 });
820 pi2_event_loop->MakeWatcher(
821 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
822 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
823 << pi2_event_loop->context().monotonic_remote_time << " -> "
824 << pi2_event_loop->context().monotonic_event_time;
825 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
826
827 ++pi2_ping_count;
828 });
829
830 // Confirm that the ping and pong counts both match, and the value also
831 // matches.
832 pi1_event_loop->MakeWatcher(
833 "/test", [&pi1_event_loop, &pi1_ping_count,
834 &pi1_pong_count](const examples::Pong &pong) {
835 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
836 << pi1_event_loop->context().monotonic_remote_time << " -> "
837 << pi1_event_loop->context().monotonic_event_time;
838
839 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
840 ++pi1_pong_count;
841 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
842 });
843 pi2_event_loop->MakeWatcher(
844 "/test", [&pi2_event_loop, &pi2_ping_count,
845 &pi2_pong_count](const examples::Pong &pong) {
846 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
847 << pi2_event_loop->context().monotonic_remote_time << " -> "
848 << pi2_event_loop->context().monotonic_event_time;
849
850 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
851 ++pi2_pong_count;
852 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
853 });
854
855 log_reader_factory.Run();
856 EXPECT_EQ(pi1_ping_count, 6030);
857 EXPECT_EQ(pi2_ping_count, 6030);
858 EXPECT_EQ(pi1_pong_count, 6030);
859 EXPECT_EQ(pi2_pong_count, 6030);
860
861 reader.Deregister();
862}
863
864// Tests that we can sort a bunch of parts into the pre-determined sorted parts.
865TEST_P(MultinodeLoggerTest, SortParts) {
866 time_converter_.StartEqual();
867 // Make a bunch of parts.
868 {
869 LoggerState pi1_logger = MakeLogger(pi1_);
870 LoggerState pi2_logger = MakeLogger(pi2_);
871
872 event_loop_factory_.RunFor(chrono::milliseconds(95));
873
874 StartLogger(&pi1_logger);
875 StartLogger(&pi2_logger);
876
877 event_loop_factory_.RunFor(chrono::milliseconds(2000));
878 }
879
880 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
881 VerifyParts(sorted_parts);
882}
883
884// Tests that we can sort a bunch of parts with an empty part. We should ignore
885// it and remove it from the sorted list.
886TEST_P(MultinodeLoggerTest, SortEmptyParts) {
887 time_converter_.StartEqual();
888 // Make a bunch of parts.
889 {
890 LoggerState pi1_logger = MakeLogger(pi1_);
891 LoggerState pi2_logger = MakeLogger(pi2_);
892
893 event_loop_factory_.RunFor(chrono::milliseconds(95));
894
895 StartLogger(&pi1_logger);
896 StartLogger(&pi2_logger);
897
898 event_loop_factory_.RunFor(chrono::milliseconds(2000));
899 }
900
901 // TODO(austin): Should we flip out if the file can't open?
902 const std::string kEmptyFile("foobarinvalidfiledoesnotexist" + Extension());
903
904 aos::util::WriteStringToFileOrDie(kEmptyFile, "");
905 logfiles_.emplace_back(kEmptyFile);
906
907 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
908 VerifyParts(sorted_parts, {kEmptyFile});
909}
910
911// Tests that we can sort a bunch of parts with the end missing off a
912// file. We should use the part we can read.
913TEST_P(MultinodeLoggerTest, SortTruncatedParts) {
914 std::vector<std::string> actual_filenames;
915 time_converter_.StartEqual();
916 // Make a bunch of parts.
917 {
918 LoggerState pi1_logger = MakeLogger(pi1_);
919 LoggerState pi2_logger = MakeLogger(pi2_);
920
921 event_loop_factory_.RunFor(chrono::milliseconds(95));
922
923 StartLogger(&pi1_logger);
924 StartLogger(&pi2_logger);
925
926 event_loop_factory_.RunFor(chrono::milliseconds(2000));
927
928 pi1_logger.AppendAllFilenames(&actual_filenames);
929 pi2_logger.AppendAllFilenames(&actual_filenames);
930 }
931
932 ASSERT_THAT(actual_filenames,
933 ::testing::UnorderedElementsAreArray(logfiles_));
934
935 // Strip off the end of one of the files. Pick one with a lot of data.
936 // For snappy, needs to have enough data to be >1 chunk of compressed data so
937 // that we don't corrupt the entire log part.
938 ::std::string compressed_contents =
939 aos::util::ReadFileToStringOrDie(logfiles_[4]);
940
941 aos::util::WriteStringToFileOrDie(
942 logfiles_[4],
943 compressed_contents.substr(0, compressed_contents.size() - 100));
944
945 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
946 VerifyParts(sorted_parts);
947}
948
949// Tests that if we remap a remapped channel, it shows up correctly.
950TEST_P(MultinodeLoggerTest, RemapLoggedChannel) {
951 time_converter_.StartEqual();
952 {
953 LoggerState pi1_logger = MakeLogger(pi1_);
954 LoggerState pi2_logger = MakeLogger(pi2_);
955
956 event_loop_factory_.RunFor(chrono::milliseconds(95));
957
958 StartLogger(&pi1_logger);
959 StartLogger(&pi2_logger);
960
961 event_loop_factory_.RunFor(chrono::milliseconds(20000));
962 }
963
964 LogReader reader(SortParts(logfiles_));
965
966 // Remap just on pi1.
967 reader.RemapLoggedChannel<aos::timing::Report>(
968 "/aos", configuration::GetNode(reader.configuration(), "pi1"));
969
970 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
971 log_reader_factory.set_send_delay(chrono::microseconds(0));
972
973 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
974 // Note: An extra channel gets remapped automatically due to a timestamp
975 // channel being LOCAL_LOGGER'd.
976 ASSERT_EQ(remapped_channels.size(), std::get<0>(GetParam()).shared ? 1u : 2u);
977 EXPECT_EQ(remapped_channels[0]->name()->string_view(), "/original/pi1/aos");
978 EXPECT_EQ(remapped_channels[0]->type()->string_view(), "aos.timing.Report");
979 if (!std::get<0>(GetParam()).shared) {
980 EXPECT_EQ(remapped_channels[1]->name()->string_view(),
981 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
982 "aos-message_bridge-Timestamp");
983 EXPECT_EQ(remapped_channels[1]->type()->string_view(),
984 "aos.message_bridge.RemoteMessage");
985 }
986
987 reader.Register(&log_reader_factory);
988
989 const Node *pi1 =
990 configuration::GetNode(log_reader_factory.configuration(), "pi1");
991 const Node *pi2 =
992 configuration::GetNode(log_reader_factory.configuration(), "pi2");
993
994 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
995 // else should have moved.
996 std::unique_ptr<EventLoop> pi1_event_loop =
997 log_reader_factory.MakeEventLoop("test", pi1);
998 pi1_event_loop->SkipTimingReport();
999 std::unique_ptr<EventLoop> full_pi1_event_loop =
1000 log_reader_factory.MakeEventLoop("test", pi1);
1001 full_pi1_event_loop->SkipTimingReport();
1002 std::unique_ptr<EventLoop> pi2_event_loop =
1003 log_reader_factory.MakeEventLoop("test", pi2);
1004 pi2_event_loop->SkipTimingReport();
1005
1006 MessageCounter<aos::timing::Report> pi1_timing_report(pi1_event_loop.get(),
1007 "/aos");
1008 MessageCounter<aos::timing::Report> full_pi1_timing_report(
1009 full_pi1_event_loop.get(), "/pi1/aos");
1010 MessageCounter<aos::timing::Report> pi1_original_timing_report(
1011 pi1_event_loop.get(), "/original/aos");
1012 MessageCounter<aos::timing::Report> full_pi1_original_timing_report(
1013 full_pi1_event_loop.get(), "/original/pi1/aos");
1014 MessageCounter<aos::timing::Report> pi2_timing_report(pi2_event_loop.get(),
1015 "/aos");
1016
1017 log_reader_factory.Run();
1018
1019 EXPECT_EQ(pi1_timing_report.count(), 0u);
1020 EXPECT_EQ(full_pi1_timing_report.count(), 0u);
1021 EXPECT_NE(pi1_original_timing_report.count(), 0u);
1022 EXPECT_NE(full_pi1_original_timing_report.count(), 0u);
1023 EXPECT_NE(pi2_timing_report.count(), 0u);
1024
1025 reader.Deregister();
1026}
1027
// Tests that we can remap a forwarded channel as well.
//
// The "/test" Ping channel is remapped without naming a node, so the Ping data
// should move to "/original/test" on both pi1 and pi2. When remote timestamps
// are not shared, pi1's remote timestamps for the forwarded Ping (see the
// remote_timestamps/pi2/... channel names below) should move to the matching
// "/original" remote timestamp channel too.
TEST_P(MultinodeLoggerTest, RemapForwardedLoggedChannel) {
  time_converter_.StartEqual();
  {
    LoggerState pi1_logger = MakeLogger(pi1_);
    LoggerState pi2_logger = MakeLogger(pi2_);

    event_loop_factory_.RunFor(chrono::milliseconds(95));

    StartLogger(&pi1_logger);
    StartLogger(&pi2_logger);

    event_loop_factory_.RunFor(chrono::milliseconds(20000));
  }

  LogReader reader(SortParts(logfiles_));

  // No node argument: remap "/test" Ping on every node.
  reader.RemapLoggedChannel<examples::Ping>("/test");

  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
  log_reader_factory.set_send_delay(chrono::microseconds(0));

  reader.Register(&log_reader_factory);

  const Node *pi1 =
      configuration::GetNode(log_reader_factory.configuration(), "pi1");
  const Node *pi2 =
      configuration::GetNode(log_reader_factory.configuration(), "pi2");

  // Confirm that Ping data shows up only on "/original/test" (and no longer on
  // "/test") on both pi1 and pi2.
  std::unique_ptr<EventLoop> pi1_event_loop =
      log_reader_factory.MakeEventLoop("test", pi1);
  pi1_event_loop->SkipTimingReport();
  std::unique_ptr<EventLoop> full_pi1_event_loop =
      log_reader_factory.MakeEventLoop("test", pi1);
  full_pi1_event_loop->SkipTimingReport();
  std::unique_ptr<EventLoop> pi2_event_loop =
      log_reader_factory.MakeEventLoop("test", pi2);
  pi2_event_loop->SkipTimingReport();

  MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
  MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
  MessageCounter<examples::Ping> pi1_original_ping(pi1_event_loop.get(),
                                                   "/original/test");
  MessageCounter<examples::Ping> pi2_original_ping(pi2_event_loop.get(),
                                                   "/original/test");

  // Per-channel remote timestamp channels only exist when timestamps are not
  // shared, so these counters are created (and checked) only in that case.
  std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
      pi1_original_ping_timestamp;
  std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
      pi1_ping_timestamp;
  if (!shared()) {
    pi1_original_ping_timestamp =
        std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
            pi1_event_loop.get(),
            "/pi1/aos/remote_timestamps/pi2/original/test/aos-examples-Ping");
    pi1_ping_timestamp =
        std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
            pi1_event_loop.get(),
            "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
  }

  log_reader_factory.Run();

  EXPECT_EQ(pi1_ping.count(), 0u);
  EXPECT_EQ(pi2_ping.count(), 0u);
  EXPECT_NE(pi1_original_ping.count(), 0u);
  EXPECT_NE(pi2_original_ping.count(), 0u);
  if (!shared()) {
    EXPECT_NE(pi1_original_ping_timestamp->count(), 0u);
    EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
  }

  reader.Deregister();
}
1104
// Tests that we observe all the same events in log replay (for a given node)
// whether we just register an event loop for that node or if we register a full
// event loop factory.
//
// The log is replayed twice:
//  1. With a full SimulatedEventLoopFactory. For every channel readable on pi1,
//     the monotonic event time of each message is recorded, along with whether
//     it came from a fetcher in OnRun or from a watcher.
//  2. With the reader registered against a single pi1 event loop. Each
//     delivered message is checked against, and consumed from, the recorded
//     list.
// Event times from the second replay are compared against the first with an
// offset of kStartupDelay.
TEST_P(MultinodeLoggerTest, SingleNodeReplay) {
  time_converter_.StartEqual();
  constexpr chrono::milliseconds kStartupDelay(95);
  {
    LoggerState pi1_logger = MakeLogger(pi1_);
    LoggerState pi2_logger = MakeLogger(pi2_);

    event_loop_factory_.RunFor(kStartupDelay);

    StartLogger(&pi1_logger);
    StartLogger(&pi2_logger);

    event_loop_factory_.RunFor(chrono::milliseconds(20000));
  }

  LogReader full_reader(SortParts(logfiles_));
  LogReader single_node_reader(SortParts(logfiles_));

  SimulatedEventLoopFactory full_factory(full_reader.configuration());
  SimulatedEventLoopFactory single_node_factory(
      single_node_reader.configuration());
  single_node_factory.SkipTimingReport();
  single_node_factory.DisableStatistics();
  std::unique_ptr<EventLoop> replay_event_loop =
      single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
          "log_reader");

  full_reader.Register(&full_factory);
  single_node_reader.Register(replay_event_loop.get());

  const Node *full_pi1 =
      configuration::GetNode(full_factory.configuration(), "pi1");

  // First pass: record every message seen on pi1-readable channels during the
  // full-factory replay. Timing reports and AOS logging are disabled on this
  // test event loop.
  std::unique_ptr<EventLoop> full_event_loop =
      full_factory.MakeEventLoop("test", full_pi1);
  full_event_loop->SkipTimingReport();
  full_event_loop->SkipAosLog();
  // maps are indexed on channel index.
  // observed_messages: {channel_index: [(message_sent_time, was_fetched),...]}
  std::map<size_t, std::vector<std::pair<monotonic_clock::time_point, bool>>>
      observed_messages;
  std::map<size_t, std::unique_ptr<RawFetcher>> fetchers;
  for (size_t ii = 0; ii < full_event_loop->configuration()->channels()->size();
       ++ii) {
    const Channel *channel =
        full_event_loop->configuration()->channels()->Get(ii);
    // We currently don't support replaying remote timestamp channels in
    // realtime replay. The exception is a remote timestamp channel that was
    // logged (i.e. not NOT_LOGGED): it gets auto-remapped and replayed on a
    // /original channel.
    if (channel->name()->string_view().find("remote_timestamp") !=
            std::string_view::npos &&
        channel->name()->string_view().find("/original") ==
            std::string_view::npos) {
      continue;
    }
    if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
      observed_messages[ii] = {};
      fetchers[ii] = full_event_loop->MakeRawFetcher(channel);
      // Record any message already available when the event loop starts
      // (marked with was_fetched = true).
      full_event_loop->OnRun([ii, &observed_messages, &fetchers]() {
        if (fetchers[ii]->Fetch()) {
          observed_messages[ii].push_back(std::make_pair(
              fetchers[ii]->context().monotonic_event_time, true));
        }
      });
      full_event_loop->MakeRawNoArgWatcher(
          channel, [ii, &observed_messages](const Context &context) {
            observed_messages[ii].push_back(
                std::make_pair(context.monotonic_event_time, false));
          });
    }
  }

  full_factory.Run();
  fetchers.clear();
  full_reader.Deregister();

  // Second pass: replay through the single pi1 event loop and match each
  // delivered message against the list recorded above.
  const Node *single_node_pi1 =
      configuration::GetNode(single_node_factory.configuration(), "pi1");
  std::map<size_t, std::unique_ptr<RawFetcher>> single_node_fetchers;

  std::unique_ptr<EventLoop> single_node_event_loop =
      single_node_factory.MakeEventLoop("test", single_node_pi1);
  single_node_event_loop->SkipTimingReport();
  single_node_event_loop->SkipAosLog();
  for (size_t ii = 0;
       ii < single_node_event_loop->configuration()->channels()->size(); ++ii) {
    const Channel *channel =
        single_node_event_loop->configuration()->channels()->Get(ii);
    single_node_factory.DisableForwarding(channel);
    if (configuration::ChannelIsReadableOnNode(channel, single_node_pi1)) {
      single_node_fetchers[ii] =
          single_node_event_loop->MakeRawFetcher(channel);
      // Nothing may be pre-loaded when the single-node event loop starts.
      single_node_event_loop->OnRun([channel, ii, &single_node_fetchers]() {
        EXPECT_FALSE(single_node_fetchers[ii]->Fetch())
            << "Single EventLoop replay doesn't support pre-loading fetchers. "
            << configuration::StrippedChannelToString(channel);
      });
      single_node_event_loop->MakeRawNoArgWatcher(
          channel, [ii, &observed_messages, channel,
                    kStartupDelay](const Context &context) {
            // Channels skipped in the first pass get an empty entry here via
            // operator[], so any message on them is reported as extra.
            if (observed_messages[ii].empty()) {
              FAIL() << "Observed extra message at "
                     << context.monotonic_event_time << " on "
                     << configuration::StrippedChannelToString(channel);
              return;
            }
            const std::pair<monotonic_clock::time_point, bool> &message =
                observed_messages[ii].front();
            // Messages that were pre-loaded via a fetcher in the first pass
            // only need to be no later than this delivery. Watched messages
            // must match exactly.
            if (message.second) {
              EXPECT_LE(message.first,
                        context.monotonic_event_time + kStartupDelay)
                  << "Mismatched message times " << context.monotonic_event_time
                  << " and " << message.first << " on "
                  << configuration::StrippedChannelToString(channel);
            } else {
              EXPECT_EQ(message.first,
                        context.monotonic_event_time + kStartupDelay)
                  << "Mismatched message times " << context.monotonic_event_time
                  << " and " << message.first << " on "
                  << configuration::StrippedChannelToString(channel);
            }
            observed_messages[ii].erase(observed_messages[ii].begin());
          });
    }
  }

  single_node_factory.Run();

  single_node_fetchers.clear();

  single_node_reader.Deregister();

  // Anything left over was seen in the full replay but never delivered in the
  // single-node replay.
  for (const auto &pair : observed_messages) {
    EXPECT_TRUE(pair.second.empty())
        << "Missed " << pair.second.size() << " messages on "
        << configuration::StrippedChannelToString(
               single_node_event_loop->configuration()->channels()->Get(
                   pair.first));
  }
}
1250
// Tests that we properly recreate forwarded timestamps when replaying a log.
// This should be enough that we can then re-run the logger and get a valid log
// back.
//
// The log is replayed with a full SimulatedEventLoopFactory while the
// RemoteMessage remote timestamp channels on both nodes are watched. For each
// remote timestamp, the matching forwarded message is fetched on both the
// sending and the receiving node, and both contexts are compared against the
// header. The replay is then re-logged under tmp_dir_, and the relogged files
// are read back twice: once with their own configuration and once with the
// original reader's configuration.
TEST_P(MultinodeLoggerTest, MessageHeader) {
  time_converter_.StartEqual();
  {
    LoggerState pi1_logger = MakeLogger(pi1_);
    LoggerState pi2_logger = MakeLogger(pi2_);

    event_loop_factory_.RunFor(chrono::milliseconds(95));

    StartLogger(&pi1_logger);
    StartLogger(&pi2_logger);

    event_loop_factory_.RunFor(chrono::milliseconds(20000));
  }

  LogReader reader(SortParts(logfiles_));

  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
  log_reader_factory.set_send_delay(chrono::microseconds(0));

  // This sends out the fetched messages and advances time to the start of the
  // log file.
  reader.Register(&log_reader_factory);

  const Node *pi1 =
      configuration::GetNode(log_reader_factory.configuration(), "pi1");
  const Node *pi2 =
      configuration::GetNode(log_reader_factory.configuration(), "pi2");

  LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
  LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
  LOG(INFO) << "now pi1 "
            << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
  LOG(INFO) << "now pi2 "
            << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();

  EXPECT_THAT(reader.LoggedNodes(),
              ::testing::ElementsAre(
                  configuration::GetNode(reader.logged_configuration(), pi1),
                  configuration::GetNode(reader.logged_configuration(), pi2)));

  reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));

  std::unique_ptr<EventLoop> pi1_event_loop =
      log_reader_factory.MakeEventLoop("test", pi1);
  std::unique_ptr<EventLoop> pi2_event_loop =
      log_reader_factory.MakeEventLoop("test", pi2);

  // Fetchers for each forwarded channel, on both its sending and its receiving
  // node. The watchers below advance them with FetchNext() once per remote
  // timestamp they see.
  aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =
      pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
  aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi2_fetcher =
      pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");

  aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
      pi1_event_loop->MakeFetcher<examples::Ping>("/test");
  aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
      pi2_event_loop->MakeFetcher<examples::Ping>("/test");

  aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi2_fetcher =
      pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
  aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi1_fetcher =
      pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");

  aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
      pi2_event_loop->MakeFetcher<examples::Pong>("/test");
  aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
      pi1_event_loop->MakeFetcher<examples::Pong>("/test");

  // Channel indices that a RemoteMessage header's channel_index() is matched
  // against to decide which fetchers to step.
  const size_t pi1_timestamp_channel = configuration::ChannelIndex(
      pi1_event_loop->configuration(), pi1_timestamp_on_pi1_fetcher.channel());
  const size_t ping_timestamp_channel = configuration::ChannelIndex(
      pi2_event_loop->configuration(), ping_on_pi2_fetcher.channel());

  const size_t pi2_timestamp_channel = configuration::ChannelIndex(
      pi2_event_loop->configuration(), pi2_timestamp_on_pi2_fetcher.channel());
  const size_t pong_timestamp_channel = configuration::ChannelIndex(
      pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());

  // Delays taken from the factory the log was recorded with. They are used to
  // predict when each remote timestamp arrives back at the sending node.
  const chrono::nanoseconds network_delay = event_loop_factory_.network_delay();
  const chrono::nanoseconds send_delay = event_loop_factory_.send_delay();

  // Remote timestamps on pi1 for messages forwarded from pi1 to pi2. With
  // shared timestamps there is a single channel whose expected channel index
  // is unknown (-1). Otherwise there is one channel per forwarded channel, and
  // the header must reference that channel's index.
  for (std::pair<int, std::string> channel :
       shared()
           ? std::vector<
                 std::pair<int, std::string>>{{-1,
                                               "/aos/remote_timestamps/pi2"}}
           : std::vector<std::pair<int, std::string>>{
                 {pi1_timestamp_channel,
                  "/aos/remote_timestamps/pi2/pi1/aos/"
                  "aos-message_bridge-Timestamp"},
                 {ping_timestamp_channel,
                  "/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
    pi1_event_loop->MakeWatcher(
        channel.second,
        [&pi1_event_loop, &pi2_event_loop, pi1_timestamp_channel,
         ping_timestamp_channel, &pi1_timestamp_on_pi1_fetcher,
         &pi1_timestamp_on_pi2_fetcher, &ping_on_pi1_fetcher,
         &ping_on_pi2_fetcher, network_delay, send_delay,
         channel_index = channel.first](const RemoteMessage &header) {
          const aos::monotonic_clock::time_point header_monotonic_sent_time(
              chrono::nanoseconds(header.monotonic_sent_time()));
          const aos::realtime_clock::time_point header_realtime_sent_time(
              chrono::nanoseconds(header.realtime_sent_time()));
          const aos::monotonic_clock::time_point header_monotonic_remote_time(
              chrono::nanoseconds(header.monotonic_remote_time()));
          const aos::realtime_clock::time_point header_realtime_remote_time(
              chrono::nanoseconds(header.realtime_remote_time()));

          if (channel_index != -1) {
            ASSERT_EQ(channel_index, header.channel_index());
          }

          const Context *pi1_context = nullptr;
          const Context *pi2_context = nullptr;

          // Step the sender (pi1) and receiver (pi2) fetchers for the channel
          // this timestamp refers to.
          if (header.channel_index() == pi1_timestamp_channel) {
            ASSERT_TRUE(pi1_timestamp_on_pi1_fetcher.FetchNext());
            ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
            pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
            pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
          } else if (header.channel_index() == ping_timestamp_channel) {
            ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
            ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
            pi1_context = &ping_on_pi1_fetcher.context();
            pi2_context = &ping_on_pi2_fetcher.context();
          } else {
            LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
                       << configuration::CleanedChannelToString(
                              pi1_event_loop->configuration()->channels()->Get(
                                  header.channel_index()));
          }

          ASSERT_TRUE(header.has_boot_uuid());
          EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
                    pi2_event_loop->boot_uuid());

          // The header's "remote" fields describe the send on pi1. Its
          // "sent"/queue_index fields describe the delivery on pi2.
          EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
          EXPECT_EQ(pi2_context->remote_queue_index,
                    header.remote_queue_index());
          EXPECT_EQ(pi2_context->queue_index, header.queue_index());

          EXPECT_EQ(pi2_context->monotonic_event_time,
                    header_monotonic_sent_time);
          EXPECT_EQ(pi2_context->realtime_event_time,
                    header_realtime_sent_time);
          EXPECT_EQ(pi2_context->realtime_remote_time,
                    header_realtime_remote_time);
          EXPECT_EQ(pi2_context->monotonic_remote_time,
                    header_monotonic_remote_time);

          EXPECT_EQ(pi1_context->realtime_event_time,
                    header_realtime_remote_time);
          EXPECT_EQ(pi1_context->monotonic_event_time,
                    header_monotonic_remote_time);

          // Time estimation isn't perfect, but we know the clocks were
          // identical when logged, so we know when this should have come back.
          // Confirm we got it when we expected.
          EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
                    pi1_context->monotonic_event_time + 2 * network_delay +
                        send_delay);
        });
  }
  // The same checks for remote timestamps on pi2, for messages forwarded from
  // pi2 to pi1, with the roles of the two nodes swapped.
  for (std::pair<int, std::string> channel :
       shared()
           ? std::vector<
                 std::pair<int, std::string>>{{-1,
                                               "/aos/remote_timestamps/pi1"}}
           : std::vector<std::pair<int, std::string>>{
                 {pi2_timestamp_channel,
                  "/aos/remote_timestamps/pi1/pi2/aos/"
                  "aos-message_bridge-Timestamp"}}) {
    pi2_event_loop->MakeWatcher(
        channel.second,
        [&pi2_event_loop, &pi1_event_loop, pi2_timestamp_channel,
         pong_timestamp_channel, &pi2_timestamp_on_pi2_fetcher,
         &pi2_timestamp_on_pi1_fetcher, &pong_on_pi2_fetcher,
         &pong_on_pi1_fetcher, network_delay, send_delay,
         channel_index = channel.first](const RemoteMessage &header) {
          const aos::monotonic_clock::time_point header_monotonic_sent_time(
              chrono::nanoseconds(header.monotonic_sent_time()));
          const aos::realtime_clock::time_point header_realtime_sent_time(
              chrono::nanoseconds(header.realtime_sent_time()));
          const aos::monotonic_clock::time_point header_monotonic_remote_time(
              chrono::nanoseconds(header.monotonic_remote_time()));
          const aos::realtime_clock::time_point header_realtime_remote_time(
              chrono::nanoseconds(header.realtime_remote_time()));

          if (channel_index != -1) {
            ASSERT_EQ(channel_index, header.channel_index());
          }

          const Context *pi2_context = nullptr;
          const Context *pi1_context = nullptr;

          // Step the sender (pi2) and receiver (pi1) fetchers for the channel
          // this timestamp refers to.
          if (header.channel_index() == pi2_timestamp_channel) {
            ASSERT_TRUE(pi2_timestamp_on_pi2_fetcher.FetchNext());
            ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
            pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
            pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
          } else if (header.channel_index() == pong_timestamp_channel) {
            ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
            ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
            pi2_context = &pong_on_pi2_fetcher.context();
            pi1_context = &pong_on_pi1_fetcher.context();
          } else {
            LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
                       << configuration::CleanedChannelToString(
                              pi2_event_loop->configuration()->channels()->Get(
                                  header.channel_index()));
          }

          ASSERT_TRUE(header.has_boot_uuid());
          EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
                    pi1_event_loop->boot_uuid());

          EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());
          EXPECT_EQ(pi1_context->remote_queue_index,
                    header.remote_queue_index());
          EXPECT_EQ(pi1_context->queue_index, header.queue_index());

          EXPECT_EQ(pi1_context->monotonic_event_time,
                    header_monotonic_sent_time);
          EXPECT_EQ(pi1_context->realtime_event_time,
                    header_realtime_sent_time);
          EXPECT_EQ(pi1_context->realtime_remote_time,
                    header_realtime_remote_time);
          EXPECT_EQ(pi1_context->monotonic_remote_time,
                    header_monotonic_remote_time);

          EXPECT_EQ(pi2_context->realtime_event_time,
                    header_realtime_remote_time);
          EXPECT_EQ(pi2_context->monotonic_event_time,
                    header_monotonic_remote_time);

          // Time estimation isn't perfect, but we know the clocks were
          // identical when logged, so we know when this should have come back.
          // Confirm we got it when we expected.
          EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
                    pi2_context->monotonic_event_time + 2 * network_delay +
                        send_delay);
        });
  }

  // And confirm we can re-create a log again, while checking the contents.
  {
    LoggerState pi1_logger = MakeLogger(
        log_reader_factory.GetNodeEventLoopFactory("pi1"), &log_reader_factory);
    LoggerState pi2_logger = MakeLogger(
        log_reader_factory.GetNodeEventLoopFactory("pi2"), &log_reader_factory);

    StartLogger(&pi1_logger, tmp_dir_ + "/relogged1");
    StartLogger(&pi2_logger, tmp_dir_ + "/relogged2");

    log_reader_factory.Run();
  }

  reader.Deregister();

  // And verify that we can run the LogReader over the relogged files without
  // hitting any fatal errors.
  {
    LogReader relogged_reader(SortParts(MakeLogFiles(
        tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 3, 3, true)));
    relogged_reader.Register();

    relogged_reader.event_loop_factory()->Run();
  }
  // And confirm that we can read the logged file using the reader's
  // configuration.
  {
    LogReader relogged_reader(
        SortParts(MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2",
                               3, 3, true)),
        reader.configuration());
    relogged_reader.Register();

    relogged_reader.event_loop_factory()->Run();
  }
}
1533
1534// Tests that we properly populate and extract the logger_start time by setting
1535// up a clock difference between 2 nodes and looking at the resulting parts.
1536TEST_P(MultinodeLoggerTest, LoggerStartTime) {
1537 std::vector<std::string> actual_filenames;
1538 time_converter_.AddMonotonic(
1539 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
1540 {
1541 LoggerState pi1_logger = MakeLogger(pi1_);
1542 LoggerState pi2_logger = MakeLogger(pi2_);
1543
1544 StartLogger(&pi1_logger);
1545 StartLogger(&pi2_logger);
1546
1547 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1548
1549 pi1_logger.AppendAllFilenames(&actual_filenames);
1550 pi2_logger.AppendAllFilenames(&actual_filenames);
1551 }
1552
1553 ASSERT_THAT(actual_filenames,
1554 ::testing::UnorderedElementsAreArray(logfiles_));
1555
1556 for (const LogFile &log_file : SortParts(logfiles_)) {
1557 for (const LogParts &log_part : log_file.parts) {
1558 if (log_part.node == log_file.logger_node) {
1559 EXPECT_EQ(log_part.logger_monotonic_start_time,
1560 aos::monotonic_clock::min_time);
1561 EXPECT_EQ(log_part.logger_realtime_start_time,
1562 aos::realtime_clock::min_time);
1563 } else {
1564 const chrono::seconds offset = log_file.logger_node == "pi1"
1565 ? -chrono::seconds(1000)
1566 : chrono::seconds(1000);
1567 EXPECT_EQ(log_part.logger_monotonic_start_time,
1568 log_part.monotonic_start_time + offset);
1569 EXPECT_EQ(log_part.logger_realtime_start_time,
1570 log_file.realtime_start_time +
1571 (log_part.logger_monotonic_start_time -
1572 log_file.monotonic_start_time));
1573 }
1574 }
1575 }
1576}
1577
1578// Test that renaming the base, renames the folder.
1579TEST_P(MultinodeLoggerTest, LoggerRenameFolder) {
1580 util::UnlinkRecursive(tmp_dir_ + "/renamefolder");
1581 util::UnlinkRecursive(tmp_dir_ + "/new-good");
1582 time_converter_.AddMonotonic(
1583 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
1584 logfile_base1_ = tmp_dir_ + "/renamefolder/multi_logfile1";
1585 logfile_base2_ = tmp_dir_ + "/renamefolder/multi_logfile2";
1586 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
1587 LoggerState pi1_logger = MakeLogger(pi1_);
1588 LoggerState pi2_logger = MakeLogger(pi2_);
1589
1590 StartLogger(&pi1_logger);
1591 StartLogger(&pi2_logger);
1592
1593 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1594 logfile_base1_ = tmp_dir_ + "/new-good/multi_logfile1";
1595 logfile_base2_ = tmp_dir_ + "/new-good/multi_logfile2";
1596 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
1597 ASSERT_TRUE(pi1_logger.logger->RenameLogBase(logfile_base1_));
1598 ASSERT_TRUE(pi2_logger.logger->RenameLogBase(logfile_base2_));
1599 for (auto &file : logfiles_) {
1600 struct stat s;
1601 EXPECT_EQ(0, stat(file.c_str(), &s));
1602 }
1603}
1604
1605// Test that renaming the file base dies.
1606TEST_P(MultinodeLoggerDeathTest, LoggerRenameFile) {
1607 time_converter_.AddMonotonic(
1608 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
1609 util::UnlinkRecursive(tmp_dir_ + "/renamefile");
1610 logfile_base1_ = tmp_dir_ + "/renamefile/multi_logfile1";
1611 logfile_base2_ = tmp_dir_ + "/renamefile/multi_logfile2";
1612 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
1613 LoggerState pi1_logger = MakeLogger(pi1_);
1614 StartLogger(&pi1_logger);
1615 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1616 logfile_base1_ = tmp_dir_ + "/new-renamefile/new_multi_logfile1";
1617 EXPECT_DEATH({ pi1_logger.logger->RenameLogBase(logfile_base1_); },
1618 "Rename of file base from");
1619}
1620
1621// TODO(austin): We can write a test which recreates a logfile and confirms that
1622// we get it back. That is the ultimate test.
1623
1624// Tests that we properly recreate forwarded timestamps when replaying a log.
1625// This should be enough that we can then re-run the logger and get a valid log
1626// back.
1627TEST_P(MultinodeLoggerTest, RemoteReboot) {
1628 std::vector<std::string> actual_filenames;
1629
1630 const UUID pi1_boot0 = UUID::Random();
1631 const UUID pi2_boot0 = UUID::Random();
1632 const UUID pi2_boot1 = UUID::Random();
1633 {
1634 CHECK_EQ(pi1_index_, 0u);
1635 CHECK_EQ(pi2_index_, 1u);
1636
1637 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
1638 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
1639 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
1640
1641 time_converter_.AddNextTimestamp(
1642 distributed_clock::epoch(),
1643 {BootTimestamp::epoch(), BootTimestamp::epoch()});
1644 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
1645 time_converter_.AddNextTimestamp(
1646 distributed_clock::epoch() + reboot_time,
1647 {BootTimestamp::epoch() + reboot_time,
1648 BootTimestamp{
1649 .boot = 1,
1650 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
1651 }
1652
1653 {
1654 LoggerState pi1_logger = MakeLogger(pi1_);
1655
1656 event_loop_factory_.RunFor(chrono::milliseconds(95));
1657 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
1658 pi1_boot0);
1659 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
1660 pi2_boot0);
1661
1662 StartLogger(&pi1_logger);
1663
1664 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1665
1666 VLOG(1) << "Reboot now!";
1667
1668 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1669 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
1670 pi1_boot0);
1671 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
1672 pi2_boot1);
1673
1674 pi1_logger.AppendAllFilenames(&actual_filenames);
1675 }
1676
1677 std::sort(actual_filenames.begin(), actual_filenames.end());
1678 std::sort(pi1_reboot_logfiles_.begin(), pi1_reboot_logfiles_.end());
1679 ASSERT_THAT(actual_filenames,
1680 ::testing::UnorderedElementsAreArray(pi1_reboot_logfiles_));
1681
1682 // Confirm that our new oldest timestamps properly update as we reboot and
1683 // rotate.
1684 for (const std::string &file : pi1_reboot_logfiles_) {
1685 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
1686 ReadHeader(file);
1687 CHECK(log_header);
1688 if (log_header->message().has_configuration()) {
1689 continue;
1690 }
1691
1692 const monotonic_clock::time_point monotonic_start_time =
1693 monotonic_clock::time_point(
1694 chrono::nanoseconds(log_header->message().monotonic_start_time()));
1695 const UUID source_node_boot_uuid = UUID::FromString(
1696 log_header->message().source_node_boot_uuid()->string_view());
1697
1698 if (log_header->message().node()->name()->string_view() != "pi1") {
1699 // The remote message channel should rotate later and have more parts.
1700 // This only is true on the log files with shared remote messages.
1701 //
1702 // TODO(austin): I'm not the most thrilled with this test pattern... It
1703 // feels brittle in a different way.
1704 if (file.find("aos.message_bridge.RemoteMessage") == std::string::npos ||
1705 !shared()) {
1706 switch (log_header->message().parts_index()) {
1707 case 0:
1708 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1709 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
1710 break;
1711 case 1:
1712 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1713 ASSERT_EQ(monotonic_start_time,
1714 monotonic_clock::epoch() + chrono::seconds(1));
1715 break;
1716 case 2:
1717 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1718 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
1719 break;
1720 case 3:
1721 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1722 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
1723 chrono::nanoseconds(2322999462))
1724 << " on " << file;
1725 break;
1726 default:
1727 FAIL();
1728 break;
1729 }
1730 } else {
1731 switch (log_header->message().parts_index()) {
1732 case 0:
1733 case 1:
1734 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1735 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
1736 break;
1737 case 2:
1738 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1739 ASSERT_EQ(monotonic_start_time,
1740 monotonic_clock::epoch() + chrono::seconds(1));
1741 break;
1742 case 3:
1743 case 4:
1744 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1745 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
1746 break;
1747 case 5:
1748 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1749 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
1750 chrono::nanoseconds(2322999462))
1751 << " on " << file;
1752 break;
1753 default:
1754 FAIL();
1755 break;
1756 }
1757 }
1758 continue;
1759 }
1760 SCOPED_TRACE(file);
1761 SCOPED_TRACE(aos::FlatbufferToJson(
1762 *log_header, {.multi_line = true, .max_vector_size = 100}));
1763 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
1764 ASSERT_EQ(
1765 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
1766 EXPECT_EQ(
1767 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
1768 monotonic_clock::max_time.time_since_epoch().count());
1769 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
1770 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
1771 2u);
1772 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
1773 monotonic_clock::max_time.time_since_epoch().count());
1774 ASSERT_TRUE(log_header->message()
1775 .has_oldest_remote_unreliable_monotonic_timestamps());
1776 ASSERT_EQ(log_header->message()
1777 .oldest_remote_unreliable_monotonic_timestamps()
1778 ->size(),
1779 2u);
1780 EXPECT_EQ(log_header->message()
1781 .oldest_remote_unreliable_monotonic_timestamps()
1782 ->Get(0),
1783 monotonic_clock::max_time.time_since_epoch().count());
1784 ASSERT_TRUE(log_header->message()
1785 .has_oldest_local_unreliable_monotonic_timestamps());
1786 ASSERT_EQ(log_header->message()
1787 .oldest_local_unreliable_monotonic_timestamps()
1788 ->size(),
1789 2u);
1790 EXPECT_EQ(log_header->message()
1791 .oldest_local_unreliable_monotonic_timestamps()
1792 ->Get(0),
1793 monotonic_clock::max_time.time_since_epoch().count());
1794
1795 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
1796 monotonic_clock::time_point(chrono::nanoseconds(
1797 log_header->message().oldest_remote_monotonic_timestamps()->Get(
1798 1)));
1799 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
1800 monotonic_clock::time_point(chrono::nanoseconds(
1801 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
1802 const monotonic_clock::time_point
1803 oldest_remote_unreliable_monotonic_timestamps =
1804 monotonic_clock::time_point(chrono::nanoseconds(
1805 log_header->message()
1806 .oldest_remote_unreliable_monotonic_timestamps()
1807 ->Get(1)));
1808 const monotonic_clock::time_point
1809 oldest_local_unreliable_monotonic_timestamps =
1810 monotonic_clock::time_point(chrono::nanoseconds(
1811 log_header->message()
1812 .oldest_local_unreliable_monotonic_timestamps()
1813 ->Get(1)));
1814 const monotonic_clock::time_point
1815 oldest_remote_reliable_monotonic_timestamps =
1816 monotonic_clock::time_point(chrono::nanoseconds(
1817 log_header->message()
1818 .oldest_remote_reliable_monotonic_timestamps()
1819 ->Get(1)));
1820 const monotonic_clock::time_point
1821 oldest_local_reliable_monotonic_timestamps =
1822 monotonic_clock::time_point(chrono::nanoseconds(
1823 log_header->message()
1824 .oldest_local_reliable_monotonic_timestamps()
1825 ->Get(1)));
1826 const monotonic_clock::time_point
1827 oldest_logger_remote_unreliable_monotonic_timestamps =
1828 monotonic_clock::time_point(chrono::nanoseconds(
1829 log_header->message()
1830 .oldest_logger_remote_unreliable_monotonic_timestamps()
1831 ->Get(0)));
1832 const monotonic_clock::time_point
1833 oldest_logger_local_unreliable_monotonic_timestamps =
1834 monotonic_clock::time_point(chrono::nanoseconds(
1835 log_header->message()
1836 .oldest_logger_local_unreliable_monotonic_timestamps()
1837 ->Get(0)));
1838 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
1839 monotonic_clock::max_time);
1840 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
1841 monotonic_clock::max_time);
1842 switch (log_header->message().parts_index()) {
1843 case 0:
1844 EXPECT_EQ(oldest_remote_monotonic_timestamps,
1845 monotonic_clock::max_time);
1846 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
1847 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
1848 monotonic_clock::max_time);
1849 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
1850 monotonic_clock::max_time);
1851 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
1852 monotonic_clock::max_time);
1853 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
1854 monotonic_clock::max_time);
1855 break;
1856 case 1:
1857 EXPECT_EQ(oldest_remote_monotonic_timestamps,
1858 monotonic_clock::time_point(chrono::microseconds(90200)));
1859 EXPECT_EQ(oldest_local_monotonic_timestamps,
1860 monotonic_clock::time_point(chrono::microseconds(90350)));
1861 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
1862 monotonic_clock::time_point(chrono::microseconds(90200)));
1863 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
1864 monotonic_clock::time_point(chrono::microseconds(90350)));
1865 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
1866 monotonic_clock::max_time);
1867 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
1868 monotonic_clock::max_time);
1869 break;
1870 case 2:
1871 EXPECT_EQ(oldest_remote_monotonic_timestamps,
1872 monotonic_clock::time_point(chrono::microseconds(90200)))
1873 << file;
1874 EXPECT_EQ(oldest_local_monotonic_timestamps,
1875 monotonic_clock::time_point(chrono::microseconds(90350)))
1876 << file;
1877 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
1878 monotonic_clock::time_point(chrono::microseconds(90200)))
1879 << file;
1880 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
1881 monotonic_clock::time_point(chrono::microseconds(90350)))
1882 << file;
1883 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
1884 monotonic_clock::time_point(chrono::microseconds(100000)))
1885 << file;
1886 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
1887 monotonic_clock::time_point(chrono::microseconds(100150)))
1888 << file;
1889 break;
1890 case 3:
1891 EXPECT_EQ(oldest_remote_monotonic_timestamps,
1892 monotonic_clock::time_point(chrono::milliseconds(1323) +
1893 chrono::microseconds(200)));
1894 EXPECT_EQ(oldest_local_monotonic_timestamps,
1895 monotonic_clock::time_point(chrono::microseconds(10100350)));
1896 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
1897 monotonic_clock::time_point(chrono::milliseconds(1323) +
1898 chrono::microseconds(200)));
1899 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
1900 monotonic_clock::time_point(chrono::microseconds(10100350)));
1901 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
1902 monotonic_clock::max_time)
1903 << file;
1904 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
1905 monotonic_clock::max_time)
1906 << file;
1907 break;
1908 case 4:
1909 EXPECT_EQ(oldest_remote_monotonic_timestamps,
1910 monotonic_clock::time_point(chrono::milliseconds(1323) +
1911 chrono::microseconds(200)));
1912 EXPECT_EQ(oldest_local_monotonic_timestamps,
1913 monotonic_clock::time_point(chrono::microseconds(10100350)));
1914 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
1915 monotonic_clock::time_point(chrono::milliseconds(1323) +
1916 chrono::microseconds(200)));
1917 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
1918 monotonic_clock::time_point(chrono::microseconds(10100350)));
1919 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
1920 monotonic_clock::time_point(chrono::microseconds(1423000)))
1921 << file;
1922 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
1923 monotonic_clock::time_point(chrono::microseconds(10200150)))
1924 << file;
1925 break;
1926 default:
1927 FAIL();
1928 break;
1929 }
1930 }
1931
1932 // Confirm that we refuse to replay logs with missing boot uuids.
1933 {
1934 LogReader reader(SortParts(pi1_reboot_logfiles_));
1935
1936 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1937 log_reader_factory.set_send_delay(chrono::microseconds(0));
1938
1939 // This sends out the fetched messages and advances time to the start of
1940 // the log file.
1941 reader.Register(&log_reader_factory);
1942
1943 log_reader_factory.Run();
1944
1945 reader.Deregister();
1946 }
1947}
1948
1949// Tests that we can sort a log which only has timestamps from the remote
1950// because the local message_bridge_client failed to connect.
1951TEST_P(MultinodeLoggerTest, RemoteRebootOnlyTimestamps) {
1952 const UUID pi1_boot0 = UUID::Random();
1953 const UUID pi2_boot0 = UUID::Random();
1954 const UUID pi2_boot1 = UUID::Random();
1955 {
1956 CHECK_EQ(pi1_index_, 0u);
1957 CHECK_EQ(pi2_index_, 1u);
1958
1959 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
1960 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
1961 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
1962
1963 time_converter_.AddNextTimestamp(
1964 distributed_clock::epoch(),
1965 {BootTimestamp::epoch(), BootTimestamp::epoch()});
1966 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
1967 time_converter_.AddNextTimestamp(
1968 distributed_clock::epoch() + reboot_time,
1969 {BootTimestamp::epoch() + reboot_time,
1970 BootTimestamp{
1971 .boot = 1,
1972 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
1973 }
1974 pi2_->Disconnect(pi1_->node());
1975
1976 std::vector<std::string> filenames;
1977 {
1978 LoggerState pi1_logger = MakeLogger(pi1_);
1979
1980 event_loop_factory_.RunFor(chrono::milliseconds(95));
1981 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
1982 pi1_boot0);
1983 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
1984 pi2_boot0);
1985
1986 StartLogger(&pi1_logger);
1987
1988 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1989
1990 VLOG(1) << "Reboot now!";
1991
1992 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1993 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
1994 pi1_boot0);
1995 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
1996 pi2_boot1);
1997 pi1_logger.AppendAllFilenames(&filenames);
1998 }
1999
2000 std::sort(filenames.begin(), filenames.end());
2001
2002 // Confirm that our new oldest timestamps properly update as we reboot and
2003 // rotate.
2004 size_t timestamp_file_count = 0;
2005 for (const std::string &file : filenames) {
2006 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2007 ReadHeader(file);
2008 CHECK(log_header);
2009
2010 if (log_header->message().has_configuration()) {
2011 continue;
2012 }
2013
2014 const monotonic_clock::time_point monotonic_start_time =
2015 monotonic_clock::time_point(
2016 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2017 const UUID source_node_boot_uuid = UUID::FromString(
2018 log_header->message().source_node_boot_uuid()->string_view());
2019
2020 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2021 ASSERT_EQ(
2022 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2023 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2024 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2025 2u);
2026 ASSERT_TRUE(log_header->message()
2027 .has_oldest_remote_unreliable_monotonic_timestamps());
2028 ASSERT_EQ(log_header->message()
2029 .oldest_remote_unreliable_monotonic_timestamps()
2030 ->size(),
2031 2u);
2032 ASSERT_TRUE(log_header->message()
2033 .has_oldest_local_unreliable_monotonic_timestamps());
2034 ASSERT_EQ(log_header->message()
2035 .oldest_local_unreliable_monotonic_timestamps()
2036 ->size(),
2037 2u);
2038 ASSERT_TRUE(log_header->message()
2039 .has_oldest_remote_reliable_monotonic_timestamps());
2040 ASSERT_EQ(log_header->message()
2041 .oldest_remote_reliable_monotonic_timestamps()
2042 ->size(),
2043 2u);
2044 ASSERT_TRUE(
2045 log_header->message().has_oldest_local_reliable_monotonic_timestamps());
2046 ASSERT_EQ(log_header->message()
2047 .oldest_local_reliable_monotonic_timestamps()
2048 ->size(),
2049 2u);
2050
2051 ASSERT_TRUE(
2052 log_header->message()
2053 .has_oldest_logger_remote_unreliable_monotonic_timestamps());
2054 ASSERT_EQ(log_header->message()
2055 .oldest_logger_remote_unreliable_monotonic_timestamps()
2056 ->size(),
2057 2u);
2058 ASSERT_TRUE(log_header->message()
2059 .has_oldest_logger_local_unreliable_monotonic_timestamps());
2060 ASSERT_EQ(log_header->message()
2061 .oldest_logger_local_unreliable_monotonic_timestamps()
2062 ->size(),
2063 2u);
2064
2065 if (log_header->message().node()->name()->string_view() != "pi1") {
2066 ASSERT_TRUE(file.find("aos.message_bridge.RemoteMessage") !=
2067 std::string::npos);
2068
2069 const std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
2070 ReadNthMessage(file, 0);
2071 CHECK(msg);
2072
2073 EXPECT_TRUE(msg->message().has_monotonic_sent_time());
2074 EXPECT_TRUE(msg->message().has_monotonic_remote_time());
2075
2076 const monotonic_clock::time_point
2077 expected_oldest_local_monotonic_timestamps(
2078 chrono::nanoseconds(msg->message().monotonic_sent_time()));
2079 const monotonic_clock::time_point
2080 expected_oldest_remote_monotonic_timestamps(
2081 chrono::nanoseconds(msg->message().monotonic_remote_time()));
2082 const monotonic_clock::time_point
2083 expected_oldest_timestamp_monotonic_timestamps(
2084 chrono::nanoseconds(msg->message().monotonic_timestamp_time()));
2085
2086 EXPECT_NE(expected_oldest_local_monotonic_timestamps,
2087 monotonic_clock::min_time);
2088 EXPECT_NE(expected_oldest_remote_monotonic_timestamps,
2089 monotonic_clock::min_time);
2090 EXPECT_NE(expected_oldest_timestamp_monotonic_timestamps,
2091 monotonic_clock::min_time);
2092
2093 ++timestamp_file_count;
2094 // Since the log file is from the perspective of the other node,
2095 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2096 monotonic_clock::time_point(chrono::nanoseconds(
2097 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2098 0)));
2099 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2100 monotonic_clock::time_point(chrono::nanoseconds(
2101 log_header->message().oldest_local_monotonic_timestamps()->Get(
2102 0)));
2103 const monotonic_clock::time_point
2104 oldest_remote_unreliable_monotonic_timestamps =
2105 monotonic_clock::time_point(chrono::nanoseconds(
2106 log_header->message()
2107 .oldest_remote_unreliable_monotonic_timestamps()
2108 ->Get(0)));
2109 const monotonic_clock::time_point
2110 oldest_local_unreliable_monotonic_timestamps =
2111 monotonic_clock::time_point(chrono::nanoseconds(
2112 log_header->message()
2113 .oldest_local_unreliable_monotonic_timestamps()
2114 ->Get(0)));
2115 const monotonic_clock::time_point
2116 oldest_remote_reliable_monotonic_timestamps =
2117 monotonic_clock::time_point(chrono::nanoseconds(
2118 log_header->message()
2119 .oldest_remote_reliable_monotonic_timestamps()
2120 ->Get(0)));
2121 const monotonic_clock::time_point
2122 oldest_local_reliable_monotonic_timestamps =
2123 monotonic_clock::time_point(chrono::nanoseconds(
2124 log_header->message()
2125 .oldest_local_reliable_monotonic_timestamps()
2126 ->Get(0)));
2127 const monotonic_clock::time_point
2128 oldest_logger_remote_unreliable_monotonic_timestamps =
2129 monotonic_clock::time_point(chrono::nanoseconds(
2130 log_header->message()
2131 .oldest_logger_remote_unreliable_monotonic_timestamps()
2132 ->Get(1)));
2133 const monotonic_clock::time_point
2134 oldest_logger_local_unreliable_monotonic_timestamps =
2135 monotonic_clock::time_point(chrono::nanoseconds(
2136 log_header->message()
2137 .oldest_logger_local_unreliable_monotonic_timestamps()
2138 ->Get(1)));
2139
2140 const Channel *channel =
2141 event_loop_factory_.configuration()->channels()->Get(
2142 msg->message().channel_index());
2143 const Connection *connection = configuration::ConnectionToNode(
2144 channel, configuration::GetNode(
2145 event_loop_factory_.configuration(),
2146 log_header->message().node()->name()->string_view()));
2147
2148 const bool reliable = connection->time_to_live() == 0;
2149
2150 SCOPED_TRACE(file);
2151 SCOPED_TRACE(aos::FlatbufferToJson(
2152 *log_header, {.multi_line = true, .max_vector_size = 100}));
2153
2154 if (shared()) {
2155 // Confirm that the oldest timestamps match what we expect. Based on
2156 // what we are doing, we know that the oldest time is the first
2157 // message's time.
2158 //
2159 // This makes the test robust to both the split and combined config
2160 // tests.
2161 switch (log_header->message().parts_index()) {
2162 case 0:
2163 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2164 expected_oldest_remote_monotonic_timestamps);
2165 EXPECT_EQ(oldest_local_monotonic_timestamps,
2166 expected_oldest_local_monotonic_timestamps);
2167 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2168 expected_oldest_local_monotonic_timestamps)
2169 << file;
2170 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2171 expected_oldest_timestamp_monotonic_timestamps)
2172 << file;
2173
2174 if (reliable) {
2175 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2176 expected_oldest_remote_monotonic_timestamps);
2177 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2178 expected_oldest_local_monotonic_timestamps);
2179 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2180 monotonic_clock::max_time);
2181 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2182 monotonic_clock::max_time);
2183 } else {
2184 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2185 monotonic_clock::max_time);
2186 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2187 monotonic_clock::max_time);
2188 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2189 expected_oldest_remote_monotonic_timestamps);
2190 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2191 expected_oldest_local_monotonic_timestamps);
2192 }
2193 break;
2194 case 1:
2195 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2196 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2197 EXPECT_EQ(oldest_local_monotonic_timestamps,
2198 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2199 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2200 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2201 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2202 monotonic_clock::epoch() + chrono::nanoseconds(90250000));
2203 if (reliable) {
2204 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2205 expected_oldest_remote_monotonic_timestamps);
2206 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2207 expected_oldest_local_monotonic_timestamps);
2208 EXPECT_EQ(
2209 oldest_remote_unreliable_monotonic_timestamps,
2210 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2211 EXPECT_EQ(
2212 oldest_local_unreliable_monotonic_timestamps,
2213 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2214 } else {
2215 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2216 monotonic_clock::max_time);
2217 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2218 monotonic_clock::max_time);
2219 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2220 expected_oldest_remote_monotonic_timestamps);
2221 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2222 expected_oldest_local_monotonic_timestamps);
2223 }
2224 break;
2225 case 2:
2226 EXPECT_EQ(
2227 oldest_remote_monotonic_timestamps,
2228 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2229 EXPECT_EQ(
2230 oldest_local_monotonic_timestamps,
2231 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2232 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2233 expected_oldest_local_monotonic_timestamps)
2234 << file;
2235 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2236 expected_oldest_timestamp_monotonic_timestamps)
2237 << file;
2238 if (reliable) {
2239 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2240 expected_oldest_remote_monotonic_timestamps);
2241 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2242 expected_oldest_local_monotonic_timestamps);
2243 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2244 monotonic_clock::max_time);
2245 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2246 monotonic_clock::max_time);
2247 } else {
2248 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2249 monotonic_clock::max_time);
2250 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2251 monotonic_clock::max_time);
2252 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2253 expected_oldest_remote_monotonic_timestamps);
2254 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2255 expected_oldest_local_monotonic_timestamps);
2256 }
2257 break;
2258
2259 case 3:
2260 EXPECT_EQ(
2261 oldest_remote_monotonic_timestamps,
2262 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2263 EXPECT_EQ(
2264 oldest_local_monotonic_timestamps,
2265 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2266 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2267 expected_oldest_remote_monotonic_timestamps);
2268 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2269 expected_oldest_local_monotonic_timestamps);
2270 EXPECT_EQ(
2271 oldest_logger_remote_unreliable_monotonic_timestamps,
2272 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2273 EXPECT_EQ(
2274 oldest_logger_local_unreliable_monotonic_timestamps,
2275 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2276 break;
2277 default:
2278 FAIL();
2279 break;
2280 }
2281
2282 switch (log_header->message().parts_index()) {
2283 case 0:
2284 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2285 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2286 break;
2287 case 1:
2288 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2289 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2290 break;
2291 case 2:
2292 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2293 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2294 break;
2295 case 3:
2296 if (shared()) {
2297 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2298 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2299 break;
2300 }
2301 [[fallthrough]];
2302 default:
2303 FAIL();
2304 break;
2305 }
2306 } else {
2307 switch (log_header->message().parts_index()) {
2308 case 0:
2309 if (reliable) {
2310 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2311 monotonic_clock::max_time);
2312 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2313 monotonic_clock::max_time);
2314 EXPECT_EQ(
2315 oldest_logger_remote_unreliable_monotonic_timestamps,
2316 monotonic_clock::epoch() + chrono::nanoseconds(100150000))
2317 << file;
2318 EXPECT_EQ(
2319 oldest_logger_local_unreliable_monotonic_timestamps,
2320 monotonic_clock::epoch() + chrono::nanoseconds(100250000))
2321 << file;
2322 } else {
2323 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2324 expected_oldest_remote_monotonic_timestamps);
2325 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2326 expected_oldest_local_monotonic_timestamps);
2327 EXPECT_EQ(
2328 oldest_logger_remote_unreliable_monotonic_timestamps,
2329 monotonic_clock::epoch() + chrono::nanoseconds(90150000))
2330 << file;
2331 EXPECT_EQ(
2332 oldest_logger_local_unreliable_monotonic_timestamps,
2333 monotonic_clock::epoch() + chrono::nanoseconds(90250000))
2334 << file;
2335 }
2336 break;
2337 case 1:
2338 if (reliable) {
2339 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2340 monotonic_clock::max_time);
2341 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2342 monotonic_clock::max_time);
2343 EXPECT_EQ(
2344 oldest_logger_remote_unreliable_monotonic_timestamps,
2345 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2346 EXPECT_EQ(
2347 oldest_logger_local_unreliable_monotonic_timestamps,
2348 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2349 } else {
2350 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2351 expected_oldest_remote_monotonic_timestamps);
2352 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2353 expected_oldest_local_monotonic_timestamps);
2354 EXPECT_EQ(
2355 oldest_logger_remote_unreliable_monotonic_timestamps,
2356 monotonic_clock::epoch() + chrono::nanoseconds(1323150000));
2357 EXPECT_EQ(
2358 oldest_logger_local_unreliable_monotonic_timestamps,
2359 monotonic_clock::epoch() + chrono::nanoseconds(10100250000));
2360 }
2361 break;
2362 default:
2363 FAIL();
2364 break;
2365 }
2366
2367 switch (log_header->message().parts_index()) {
2368 case 0:
2369 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2370 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2371 break;
2372 case 1:
2373 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2374 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2375 break;
2376 default:
2377 FAIL();
2378 break;
2379 }
2380 }
2381
2382 continue;
2383 }
2384 EXPECT_EQ(
2385 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2386 monotonic_clock::max_time.time_since_epoch().count());
2387 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2388 monotonic_clock::max_time.time_since_epoch().count());
2389 EXPECT_EQ(log_header->message()
2390 .oldest_remote_unreliable_monotonic_timestamps()
2391 ->Get(0),
2392 monotonic_clock::max_time.time_since_epoch().count());
2393 EXPECT_EQ(log_header->message()
2394 .oldest_local_unreliable_monotonic_timestamps()
2395 ->Get(0),
2396 monotonic_clock::max_time.time_since_epoch().count());
2397
2398 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2399 monotonic_clock::time_point(chrono::nanoseconds(
2400 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2401 1)));
2402 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2403 monotonic_clock::time_point(chrono::nanoseconds(
2404 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2405 const monotonic_clock::time_point
2406 oldest_remote_unreliable_monotonic_timestamps =
2407 monotonic_clock::time_point(chrono::nanoseconds(
2408 log_header->message()
2409 .oldest_remote_unreliable_monotonic_timestamps()
2410 ->Get(1)));
2411 const monotonic_clock::time_point
2412 oldest_local_unreliable_monotonic_timestamps =
2413 monotonic_clock::time_point(chrono::nanoseconds(
2414 log_header->message()
2415 .oldest_local_unreliable_monotonic_timestamps()
2416 ->Get(1)));
2417 switch (log_header->message().parts_index()) {
2418 case 0:
2419 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2420 monotonic_clock::max_time);
2421 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2422 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2423 monotonic_clock::max_time);
2424 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2425 monotonic_clock::max_time);
2426 break;
2427 default:
2428 FAIL();
2429 break;
2430 }
2431 }
2432
2433 if (shared()) {
2434 EXPECT_EQ(timestamp_file_count, 4u);
2435 } else {
2436 EXPECT_EQ(timestamp_file_count, 4u);
2437 }
2438
2439 // Confirm that we can actually sort the resulting log and read it.
2440 {
2441 LogReader reader(SortParts(filenames));
2442
2443 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2444 log_reader_factory.set_send_delay(chrono::microseconds(0));
2445
2446 // This sends out the fetched messages and advances time to the start of
2447 // the log file.
2448 reader.Register(&log_reader_factory);
2449
2450 log_reader_factory.Run();
2451
2452 reader.Deregister();
2453 }
2454}
2455
// Tests that we properly handle one direction of message_bridge being
// unavailable while the relative clock slope between pi1 and pi2 is negative.
2458TEST_P(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
2459 pi1_->Disconnect(pi2_->node());
2460 time_converter_.AddMonotonic(
2461 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2462
2463 time_converter_.AddMonotonic(
2464 {chrono::milliseconds(10000),
2465 chrono::milliseconds(10000) - chrono::milliseconds(1)});
2466 {
2467 LoggerState pi1_logger = MakeLogger(pi1_);
2468
2469 event_loop_factory_.RunFor(chrono::milliseconds(95));
2470
2471 StartLogger(&pi1_logger);
2472
2473 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2474 }
2475
2476 // Confirm that we can parse the result. LogReader has enough internal CHECKs
2477 // to confirm the right thing happened.
2478 ConfirmReadable(pi1_single_direction_logfiles_);
2479}
2480
// Tests that we properly handle one direction of message_bridge being
// unavailable while the relative clock slope between pi1 and pi2 is positive.
2483TEST_P(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
2484 pi1_->Disconnect(pi2_->node());
2485 time_converter_.AddMonotonic(
2486 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(500)});
2487
2488 time_converter_.AddMonotonic(
2489 {chrono::milliseconds(10000),
2490 chrono::milliseconds(10000) + chrono::milliseconds(1)});
2491 {
2492 LoggerState pi1_logger = MakeLogger(pi1_);
2493
2494 event_loop_factory_.RunFor(chrono::milliseconds(95));
2495
2496 StartLogger(&pi1_logger);
2497
2498 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2499 }
2500
2501 // Confirm that we can parse the result. LogReader has enough internal CHECKs
2502 // to confirm the right thing happened.
2503 ConfirmReadable(pi1_single_direction_logfiles_);
2504}
2505
// Tests that we explode, with a clearer error than an out-of-order error, if
// someone passes in the same part file twice.
2508TEST_P(MultinodeLoggerTest, DuplicateLogFiles) {
2509 time_converter_.AddMonotonic(
2510 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2511 {
2512 LoggerState pi1_logger = MakeLogger(pi1_);
2513
2514 event_loop_factory_.RunFor(chrono::milliseconds(95));
2515
2516 StartLogger(&pi1_logger);
2517
2518 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2519 }
2520
2521 std::vector<std::string> duplicates;
2522 for (const std::string &f : pi1_single_direction_logfiles_) {
2523 duplicates.emplace_back(f);
2524 duplicates.emplace_back(f);
2525 }
2526 EXPECT_DEATH({ SortParts(duplicates); }, "Found duplicate parts in");
2527}
2528
2529// Tests that we explode if someone loses a part out of the middle of a log.
2530TEST_P(MultinodeLoggerTest, MissingPartsFromMiddle) {
2531 time_converter_.AddMonotonic(
2532 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2533 {
2534 LoggerState pi1_logger = MakeLogger(pi1_);
2535
2536 event_loop_factory_.RunFor(chrono::milliseconds(95));
2537
2538 StartLogger(&pi1_logger);
2539 aos::monotonic_clock::time_point last_rotation_time =
2540 pi1_logger.event_loop->monotonic_now();
2541 pi1_logger.logger->set_on_logged_period([&] {
2542 const auto now = pi1_logger.event_loop->monotonic_now();
2543 if (now > last_rotation_time + std::chrono::seconds(5)) {
2544 pi1_logger.logger->Rotate();
2545 last_rotation_time = now;
2546 }
2547 });
2548
2549 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2550 }
2551
2552 std::vector<std::string> missing_parts;
2553
2554 missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part0" + Extension());
2555 missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part2" + Extension());
2556 missing_parts.emplace_back(absl::StrCat(
2557 logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
2558
2559 EXPECT_DEATH({ SortParts(missing_parts); },
2560 "Broken log, missing part files between");
2561}
2562
// Tests that we properly handle a dead node. Do this by just disconnecting it
// and only using one node's logs.
2565TEST_P(MultinodeLoggerTest, DeadNode) {
2566 pi1_->Disconnect(pi2_->node());
2567 pi2_->Disconnect(pi1_->node());
2568 time_converter_.AddMonotonic(
2569 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2570 {
2571 LoggerState pi1_logger = MakeLogger(pi1_);
2572
2573 event_loop_factory_.RunFor(chrono::milliseconds(95));
2574
2575 StartLogger(&pi1_logger);
2576
2577 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2578 }
2579
2580 // Confirm that we can parse the result. LogReader has enough internal CHECKs
2581 // to confirm the right thing happened.
2582 ConfirmReadable(MakePi1DeadNodeLogfiles());
2583}
2584
2585// Tests that we can relog with a different config. This makes most sense when
2586// you are trying to edit a log and want to use channel renaming + the original
2587// config in the new log.
TEST_P(MultinodeLoggerTest, LogDifferentConfig) {
  // Phase 1: log both pi1 and pi2 for 20 s, starting from
  // time_converter_.StartEqual().
  time_converter_.StartEqual();
  {
    LoggerState pi1_logger = MakeLogger(pi1_);
    LoggerState pi2_logger = MakeLogger(pi2_);

    event_loop_factory_.RunFor(chrono::milliseconds(95));

    StartLogger(&pi1_logger);
    StartLogger(&pi2_logger);

    event_loop_factory_.RunFor(chrono::milliseconds(20000));
  }

  // Phase 2: replay the log with the logged Ping channel on "/test" remapped
  // to "/original".
  LogReader reader(SortParts(logfiles_));
  reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");

  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
  log_reader_factory.set_send_delay(chrono::microseconds(0));

  // This sends out the fetched messages and advances time to the start of the
  // log file.
  reader.Register(&log_reader_factory);

  const Node *pi1 =
      configuration::GetNode(log_reader_factory.configuration(), "pi1");
  const Node *pi2 =
      configuration::GetNode(log_reader_factory.configuration(), "pi2");

  LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
  LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
  LOG(INFO) << "now pi1 "
            << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
  LOG(INFO) << "now pi2 "
            << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();

  // Both nodes must be reported as logged, looked up in the logged (not the
  // replay) configuration.
  EXPECT_THAT(reader.LoggedNodes(),
              ::testing::ElementsAre(
                  configuration::GetNode(reader.logged_configuration(), pi1),
                  configuration::GetNode(reader.logged_configuration(), pi2)));

  // NOTE(review): if event_loop_factory() returns the factory passed to
  // Register() above, this repeats log_reader_factory.set_send_delay(0) --
  // confirm before removing.
  reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));

  // And confirm we can re-create a log again, while checking the contents.
  // The relog loggers are built against reader.logged_configuration() rather
  // than the replay configuration log_reader_factory was constructed from;
  // that difference is what this test exercises.
  std::vector<std::string> log_files;
  {
    LoggerState pi1_logger =
        MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
                   &log_reader_factory, reader.logged_configuration());
    LoggerState pi2_logger =
        MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
                   &log_reader_factory, reader.logged_configuration());

    pi1_logger.StartLogger(tmp_dir_ + "/relogged1");
    pi2_logger.StartLogger(tmp_dir_ + "/relogged2");

    log_reader_factory.Run();

    // Rebuild full paths for every file each relogger wrote, using the bases
    // passed to StartLogger above.
    for (auto &x : pi1_logger.log_namer->all_filenames()) {
      log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged1_", x));
    }
    for (auto &x : pi2_logger.log_namer->all_filenames()) {
      log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged2_", x));
    }
  }

  reader.Deregister();

  // And verify that we can run the LogReader over the relogged files without
  // hitting any fatal errors.
  {
    LogReader relogged_reader(SortParts(log_files));
    relogged_reader.Register();

    relogged_reader.event_loop_factory()->Run();
  }
}
2665
2666// Tests that we properly replay a log where the start time for a node is before
2667// any data on the node. This can happen if the logger starts before data is
2668// published. While the scenario below is a bit convoluted, we have seen logs
2669// like this generated out in the wild.
TEST(MultinodeRebootLoggerTest, StartTimeBeforeData) {
  // Three-node setup from the split3 config. pi2 reboots at distributed time
  // 20 s into a second boot whose monotonic clock starts at 1.323 s; pi1 and
  // pi3 stay on boot 0.
  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(ArtifactPath(
          "aos/events/logging/multinode_pingpong_split3_config.json"));
  message_bridge::TestingTimeConverter time_converter(
      configuration::NodesCount(&config.message()));
  SimulatedEventLoopFactory event_loop_factory(&config.message());
  event_loop_factory.SetTimeConverter(&time_converter);
  NodeEventLoopFactory *const pi1 =
      event_loop_factory.GetNodeEventLoopFactory("pi1");
  const size_t pi1_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi1->node());
  NodeEventLoopFactory *const pi2 =
      event_loop_factory.GetNodeEventLoopFactory("pi2");
  const size_t pi2_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi2->node());
  NodeEventLoopFactory *const pi3 =
      event_loop_factory.GetNodeEventLoopFactory("pi3");
  const size_t pi3_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi3->node());

  // One directory per node and boot; pi2 gets two because it reboots.
  const std::string kLogfile1_1 =
      aos::testing::TestTmpDir() + "/multi_logfile1/";
  const std::string kLogfile2_1 =
      aos::testing::TestTmpDir() + "/multi_logfile2.1/";
  const std::string kLogfile2_2 =
      aos::testing::TestTmpDir() + "/multi_logfile2.2/";
  const std::string kLogfile3_1 =
      aos::testing::TestTmpDir() + "/multi_logfile3/";
  util::UnlinkRecursive(kLogfile1_1);
  util::UnlinkRecursive(kLogfile2_1);
  util::UnlinkRecursive(kLogfile2_2);
  util::UnlinkRecursive(kLogfile3_1);
  const UUID pi1_boot0 = UUID::Random();
  const UUID pi2_boot0 = UUID::Random();
  const UUID pi2_boot1 = UUID::Random();
  const UUID pi3_boot0 = UUID::Random();
  {
    // The timestamp lists below are positional, so pin the node order.
    CHECK_EQ(pi1_index, 0u);
    CHECK_EQ(pi2_index, 1u);
    CHECK_EQ(pi3_index, 2u);

    time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
    time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
    time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
    time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);

    time_converter.AddNextTimestamp(
        distributed_clock::epoch(),
        {BootTimestamp::epoch(), BootTimestamp::epoch(),
         BootTimestamp::epoch()});
    const chrono::nanoseconds reboot_time = chrono::milliseconds(20000);
    time_converter.AddNextTimestamp(
        distributed_clock::epoch() + reboot_time,
        {BootTimestamp::epoch() + reboot_time,
         BootTimestamp{
             .boot = 1,
             .time = monotonic_clock::epoch() + chrono::milliseconds(1323)},
         BootTimestamp::epoch() + reboot_time});
  }

  // Make everything perfectly quiet.
  event_loop_factory.SkipTimingReport();
  event_loop_factory.DisableStatistics();

  std::vector<std::string> filenames;
  {
    LoggerState pi1_logger = MakeLoggerState(
        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    LoggerState pi3_logger = MakeLoggerState(
        pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    {
      // Create pi2's first-boot logger here. All three loggers are started
      // below, after 1 s of quiet running.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);

      event_loop_factory.RunFor(chrono::milliseconds(1000));

      pi1_logger.StartLogger(kLogfile1_1);
      pi3_logger.StartLogger(kLogfile3_1);
      pi2_logger.StartLogger(kLogfile2_1);

      // Log 10 s of silence so each log's start time precedes any data.
      event_loop_factory.RunFor(chrono::milliseconds(10000));

      // Now that we've got a start time in the past, turn on data.
      event_loop_factory.EnableStatistics();
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());

      pi2->AlwaysStart<Pong>("pong");

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      pi2_logger.AppendAllFilenames(&filenames);

      // Stop logging on pi2 before rebooting and completely shut off all
      // messages on pi2.
      pi2->DisableStatistics();
      pi1->Disconnect(pi2->node());
      pi2->Disconnect(pi1->node());
    }
    event_loop_factory.RunFor(chrono::milliseconds(7000));
    // pi2 now reboots. (1 + 10 + 3 + 7 = 21 s of RunFor have elapsed, which is
    // past the 20 s reboot point configured above, assuming RunFor advances
    // distributed time.)
    {
      event_loop_factory.RunFor(chrono::milliseconds(1000));

      // Start logging again on pi2 after it is up.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
      pi2_logger.StartLogger(kLogfile2_2);

      event_loop_factory.RunFor(chrono::milliseconds(10000));
      // And, now that we have a start time in the log, turn data back on.
      pi2->EnableStatistics();
      pi1->Connect(pi2->node());
      pi2->Connect(pi1->node());

      pi2->AlwaysStart<Pong>("pong");
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      pi2_logger.AppendAllFilenames(&filenames);
    }

    pi1_logger.AppendAllFilenames(&filenames);
    pi3_logger.AppendAllFilenames(&filenames);
  }

  // Confirm that we can parse the result. LogReader has enough internal CHECKs
  // to confirm the right thing happened.
  // NOTE(review): sorted_parts is never read. It presumably exists only to
  // check that SortParts() succeeds on its own.
  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  auto result = ConfirmReadable(filenames);
  // NOTE(review): result appears to be indexed by node (pi1, pi2, pi3), with
  // .first/.second holding one start/end realtime per boot; pi2 has two
  // entries. pi2's second start (3.323 s) numerically matches its second-boot
  // clock when kLogfile2_2 starts: 1.323 s + 1 s + 1 s.
  EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
                                                     chrono::seconds(1)));
  EXPECT_THAT(result[0].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(34990350)));

  EXPECT_THAT(result[1].first,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::seconds(1),
                  realtime_clock::epoch() + chrono::microseconds(3323000)));
  EXPECT_THAT(result[1].second,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::microseconds(13990200),
                  realtime_clock::epoch() + chrono::microseconds(16313200)));

  EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch() +
                                                     chrono::seconds(1)));
  EXPECT_THAT(result[2].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(34900150)));
}
2827
2828// Tests that local data before remote data after reboot is properly replayed.
2829// We only trigger a reboot in the timestamp interpolation function when solving
2830// the timestamp problem when we actually have a point in the function. This
2831// originally only happened when a point passes the noncausal filter. At the
2832// start of time for the second boot, if we aren't careful, we will have
2833// messages which need to be published at times before the boot. This happens
2834// when a local message is in the log before a forwarded message, so there is no
2835// point in the interpolation function. This delays the reboot. So, we need to
2836// recreate that situation and make sure it doesn't come back.
TEST(MultinodeRebootLoggerTest,
     LocalMessageBeforeRemoteBeforeStartAfterReboot) {
  // pi2 reboots at distributed time 5 s into a second boot whose monotonic
  // clock starts at 105 s (reboot_time + 100 s); pi1 and pi3 stay on boot 0.
  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(ArtifactPath(
          "aos/events/logging/multinode_pingpong_split3_config.json"));
  message_bridge::TestingTimeConverter time_converter(
      configuration::NodesCount(&config.message()));
  SimulatedEventLoopFactory event_loop_factory(&config.message());
  event_loop_factory.SetTimeConverter(&time_converter);
  NodeEventLoopFactory *const pi1 =
      event_loop_factory.GetNodeEventLoopFactory("pi1");
  const size_t pi1_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi1->node());
  NodeEventLoopFactory *const pi2 =
      event_loop_factory.GetNodeEventLoopFactory("pi2");
  const size_t pi2_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi2->node());
  NodeEventLoopFactory *const pi3 =
      event_loop_factory.GetNodeEventLoopFactory("pi3");
  const size_t pi3_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi3->node());

  // One directory per node and boot; pi2 gets two because it reboots.
  const std::string kLogfile1_1 =
      aos::testing::TestTmpDir() + "/multi_logfile1/";
  const std::string kLogfile2_1 =
      aos::testing::TestTmpDir() + "/multi_logfile2.1/";
  const std::string kLogfile2_2 =
      aos::testing::TestTmpDir() + "/multi_logfile2.2/";
  const std::string kLogfile3_1 =
      aos::testing::TestTmpDir() + "/multi_logfile3/";
  util::UnlinkRecursive(kLogfile1_1);
  util::UnlinkRecursive(kLogfile2_1);
  util::UnlinkRecursive(kLogfile2_2);
  util::UnlinkRecursive(kLogfile3_1);
  const UUID pi1_boot0 = UUID::Random();
  const UUID pi2_boot0 = UUID::Random();
  const UUID pi2_boot1 = UUID::Random();
  const UUID pi3_boot0 = UUID::Random();
  {
    // The timestamp lists below are positional, so pin the node order.
    CHECK_EQ(pi1_index, 0u);
    CHECK_EQ(pi2_index, 1u);
    CHECK_EQ(pi3_index, 2u);

    time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
    time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
    time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
    time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);

    time_converter.AddNextTimestamp(
        distributed_clock::epoch(),
        {BootTimestamp::epoch(), BootTimestamp::epoch(),
         BootTimestamp::epoch()});
    const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
    time_converter.AddNextTimestamp(
        distributed_clock::epoch() + reboot_time,
        {BootTimestamp::epoch() + reboot_time,
         BootTimestamp{.boot = 1,
                       .time = monotonic_clock::epoch() + reboot_time +
                               chrono::seconds(100)},
         BootTimestamp::epoch() + reboot_time});
  }

  std::vector<std::string> filenames;
  {
    LoggerState pi1_logger = MakeLoggerState(
        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    LoggerState pi3_logger = MakeLoggerState(
        pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    {
      // Create pi2's first-boot logger; all three loggers start right away.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);

      pi1_logger.StartLogger(kLogfile1_1);
      pi3_logger.StartLogger(kLogfile3_1);
      pi2_logger.StartLogger(kLogfile2_1);

      event_loop_factory.RunFor(chrono::milliseconds(1005));

      // Now that we've got a start time in the past, turn on data.
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());

      pi2->AlwaysStart<Pong>("pong");

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      pi2_logger.AppendAllFilenames(&filenames);

      // Disable any remote messages on pi2.
      pi1->Disconnect(pi2->node());
      pi2->Disconnect(pi1->node());
    }
    // 1005 + 3000 + 995 = 5000 ms of RunFor, which lands on the reboot point.
    event_loop_factory.RunFor(chrono::milliseconds(995));
    // pi2 now reboots at 5 seconds.
    {
      event_loop_factory.RunFor(chrono::milliseconds(1000));

      // Make local stuff happen before we start logging and connect the remote.
      pi2->AlwaysStart<Pong>("pong");
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());
      event_loop_factory.RunFor(chrono::milliseconds(1005));

      // Start logging again on pi2 after it is up.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
      pi2_logger.StartLogger(kLogfile2_2);

      // And allow remote messages now that we have some local ones.
      pi1->Connect(pi2->node());
      pi2->Connect(pi1->node());

      event_loop_factory.RunFor(chrono::milliseconds(1000));

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      pi2_logger.AppendAllFilenames(&filenames);
    }

    pi1_logger.AppendAllFilenames(&filenames);
    pi3_logger.AppendAllFilenames(&filenames);
  }

  // Confirm that we can parse the result. LogReader has enough internal CHECKs
  // to confirm the right thing happened.
  // NOTE(review): sorted_parts is never read. It presumably exists only to
  // check that SortParts() succeeds on its own.
  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  auto result = ConfirmReadable(filenames);

  // NOTE(review): result appears to be per node (pi1, pi2, pi3), with one
  // start/end realtime per boot. pi2's second start (107.005 s) numerically
  // matches its second-boot clock when kLogfile2_2 starts: 105 s + 1 s +
  // 1.005 s.
  EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
  EXPECT_THAT(result[0].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(11000350)));

  EXPECT_THAT(result[1].first,
              ::testing::ElementsAre(
                  realtime_clock::epoch(),
                  realtime_clock::epoch() + chrono::microseconds(107005000)));
  EXPECT_THAT(result[1].second,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::microseconds(4000150),
                  realtime_clock::epoch() + chrono::microseconds(111000200)));

  EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
  EXPECT_THAT(result[2].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(11000150)));

  // Replay again, restricted to realtime [2 s, 3 s]. That window lies inside
  // pi2's first boot, so every node reports exactly that window.
  auto start_stop_result = ConfirmReadable(
      filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
      realtime_clock::epoch() + chrono::milliseconds(3000));

  EXPECT_THAT(
      start_stop_result[0].first,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
  EXPECT_THAT(
      start_stop_result[0].second,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
  EXPECT_THAT(
      start_stop_result[1].first,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
  EXPECT_THAT(
      start_stop_result[1].second,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
  EXPECT_THAT(
      start_stop_result[2].first,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
  EXPECT_THAT(
      start_stop_result[2].second,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
}
3010
3011// Tests that setting the start and stop flags across a reboot works as
3012// expected.
TEST(MultinodeRebootLoggerTest, RebootStartStopTimes) {
  // pi2 reboots at distributed time 5 s into a second boot whose monotonic
  // clock also reads 5 s. Unlike the other reboot tests here, no boot UUIDs
  // are assigned and pi1 <-> pi2 is never disconnected.
  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(ArtifactPath(
          "aos/events/logging/multinode_pingpong_split3_config.json"));
  message_bridge::TestingTimeConverter time_converter(
      configuration::NodesCount(&config.message()));
  SimulatedEventLoopFactory event_loop_factory(&config.message());
  event_loop_factory.SetTimeConverter(&time_converter);
  NodeEventLoopFactory *const pi1 =
      event_loop_factory.GetNodeEventLoopFactory("pi1");
  const size_t pi1_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi1->node());
  NodeEventLoopFactory *const pi2 =
      event_loop_factory.GetNodeEventLoopFactory("pi2");
  const size_t pi2_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi2->node());
  NodeEventLoopFactory *const pi3 =
      event_loop_factory.GetNodeEventLoopFactory("pi3");
  const size_t pi3_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi3->node());

  // One directory per node and boot; pi2 gets two because it reboots.
  const std::string kLogfile1_1 =
      aos::testing::TestTmpDir() + "/multi_logfile1/";
  const std::string kLogfile2_1 =
      aos::testing::TestTmpDir() + "/multi_logfile2.1/";
  const std::string kLogfile2_2 =
      aos::testing::TestTmpDir() + "/multi_logfile2.2/";
  const std::string kLogfile3_1 =
      aos::testing::TestTmpDir() + "/multi_logfile3/";
  util::UnlinkRecursive(kLogfile1_1);
  util::UnlinkRecursive(kLogfile2_1);
  util::UnlinkRecursive(kLogfile2_2);
  util::UnlinkRecursive(kLogfile3_1);
  {
    // The timestamp lists below are positional, so pin the node order.
    CHECK_EQ(pi1_index, 0u);
    CHECK_EQ(pi2_index, 1u);
    CHECK_EQ(pi3_index, 2u);

    time_converter.AddNextTimestamp(
        distributed_clock::epoch(),
        {BootTimestamp::epoch(), BootTimestamp::epoch(),
         BootTimestamp::epoch()});
    const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
    time_converter.AddNextTimestamp(
        distributed_clock::epoch() + reboot_time,
        {BootTimestamp::epoch() + reboot_time,
         BootTimestamp{.boot = 1,
                       .time = monotonic_clock::epoch() + reboot_time},
         BootTimestamp::epoch() + reboot_time});
  }

  std::vector<std::string> filenames;
  {
    LoggerState pi1_logger = MakeLoggerState(
        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    LoggerState pi3_logger = MakeLoggerState(
        pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    {
      // Create pi2's first-boot logger; all three loggers start right away.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);

      pi1_logger.StartLogger(kLogfile1_1);
      pi3_logger.StartLogger(kLogfile3_1);
      pi2_logger.StartLogger(kLogfile2_1);

      event_loop_factory.RunFor(chrono::milliseconds(1005));

      // Now that we've got a start time in the past, turn on data.
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());

      pi2->AlwaysStart<Pong>("pong");

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      pi2_logger.AppendAllFilenames(&filenames);
    }
    // 1005 + 3000 + 995 = 5000 ms of RunFor, which lands on the reboot point.
    event_loop_factory.RunFor(chrono::milliseconds(995));
    // pi2 now reboots at 5 seconds.
    {
      event_loop_factory.RunFor(chrono::milliseconds(1000));

      // Make local stuff happen before we start logging again. (Nothing is
      // disconnected in this test, so there is no remote to reconnect.)
      pi2->AlwaysStart<Pong>("pong");
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());
      event_loop_factory.RunFor(chrono::milliseconds(5));

      // Start logging again on pi2 after it is up.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
      pi2_logger.StartLogger(kLogfile2_2);

      event_loop_factory.RunFor(chrono::milliseconds(5000));

      pi2_logger.AppendAllFilenames(&filenames);
    }

    pi1_logger.AppendAllFilenames(&filenames);
    pi3_logger.AppendAllFilenames(&filenames);
  }

  // NOTE(review): sorted_parts is never read. It presumably exists only to
  // check that SortParts() succeeds on its own.
  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  auto result = ConfirmReadable(filenames);

  // pi2's second start (6.005 s) numerically matches its second-boot clock
  // when kLogfile2_2 starts: 5 s + 1 s + 5 ms.
  EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
  EXPECT_THAT(result[0].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(11000350)));

  EXPECT_THAT(result[1].first,
              ::testing::ElementsAre(
                  realtime_clock::epoch(),
                  realtime_clock::epoch() + chrono::microseconds(6005000)));
  EXPECT_THAT(result[1].second,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::microseconds(4900150),
                  realtime_clock::epoch() + chrono::microseconds(11000200)));

  EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
  EXPECT_THAT(result[2].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(11000150)));

  // Confirm we observed the correct start and stop times. We should see the
  // reboot here.
  // The [2 s, 8 s] window spans pi2's reboot. pi2 therefore reports two
  // ranges, [2 s, end of boot 0] and [start of boot 1, 8 s], while pi1 and pi3
  // report the window unchanged.
  auto start_stop_result = ConfirmReadable(
      filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
      realtime_clock::epoch() + chrono::milliseconds(8000));

  EXPECT_THAT(
      start_stop_result[0].first,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
  EXPECT_THAT(
      start_stop_result[0].second,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
  EXPECT_THAT(start_stop_result[1].first,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::seconds(2),
                  realtime_clock::epoch() + chrono::microseconds(6005000)));
  EXPECT_THAT(start_stop_result[1].second,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::microseconds(4900150),
                  realtime_clock::epoch() + chrono::seconds(8)));
  EXPECT_THAT(
      start_stop_result[2].first,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
  EXPECT_THAT(
      start_stop_result[2].second,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
}
3167
3168// Tests that we properly handle one direction being down.
TEST(MissingDirectionTest, OneDirection) {
  // Two-node setup from the split4 config. pi1 (index 0, the first entry in
  // the timestamp lists below) reboots at distributed time 5 s into a boot
  // whose monotonic clock starts at 0; pi2 does not reboot.
  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(ArtifactPath(
          "aos/events/logging/multinode_pingpong_split4_config.json"));
  message_bridge::TestingTimeConverter time_converter(
      configuration::NodesCount(&config.message()));
  SimulatedEventLoopFactory event_loop_factory(&config.message());
  event_loop_factory.SetTimeConverter(&time_converter);

  NodeEventLoopFactory *const pi1 =
      event_loop_factory.GetNodeEventLoopFactory("pi1");
  const size_t pi1_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi1->node());
  NodeEventLoopFactory *const pi2 =
      event_loop_factory.GetNodeEventLoopFactory("pi2");
  const size_t pi2_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi2->node());
  std::vector<std::string> filenames;

  {
    // The timestamp lists below are positional, so pin the node order.
    CHECK_EQ(pi1_index, 0u);
    CHECK_EQ(pi2_index, 1u);

    time_converter.AddNextTimestamp(
        distributed_clock::epoch(),
        {BootTimestamp::epoch(), BootTimestamp::epoch()});

    const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
    time_converter.AddNextTimestamp(
        distributed_clock::epoch() + reboot_time,
        {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
         BootTimestamp::epoch() + reboot_time});
  }

  const std::string kLogfile2_1 =
      aos::testing::TestTmpDir() + "/multi_logfile2.1/";
  const std::string kLogfile1_1 =
      aos::testing::TestTmpDir() + "/multi_logfile1.1/";
  util::UnlinkRecursive(kLogfile2_1);
  util::UnlinkRecursive(kLogfile1_1);

  // Take down one direction only: pi2 disconnects from pi1, but
  // pi1->Disconnect() is never called.
  pi2->Disconnect(pi1->node());

  pi1->AlwaysStart<Ping>("ping");
  pi2->AlwaysStart<Pong>("pong");

  {
    LoggerState pi2_logger = MakeLoggerState(
        pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);

    event_loop_factory.RunFor(chrono::milliseconds(95));

    pi2_logger.StartLogger(kLogfile2_1);

    event_loop_factory.RunFor(chrono::milliseconds(6000));

    // 95 + 6000 ms of RunFor have elapsed, which is past pi1's 5 s reboot.
    // Restore the missing direction and start logging pi1 on its new boot too.
    pi2->Connect(pi1->node());

    LoggerState pi1_logger = MakeLoggerState(
        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    pi1_logger.StartLogger(kLogfile1_1);

    event_loop_factory.RunFor(chrono::milliseconds(5000));
    pi1_logger.AppendAllFilenames(&filenames);
    pi2_logger.AppendAllFilenames(&filenames);
  }

  // NOTE(review): sorted_parts is never read. It presumably exists only to
  // check that SortParts() succeeds on its own.
  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  ConfirmReadable(filenames);
}
3239
3240// Tests that we properly handle only one direction ever existing after a
3241// reboot.
3242TEST(MissingDirectionTest, OneDirectionAfterReboot) {
3243 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3244 aos::configuration::ReadConfig(ArtifactPath(
3245 "aos/events/logging/multinode_pingpong_split4_config.json"));
3246 message_bridge::TestingTimeConverter time_converter(
3247 configuration::NodesCount(&config.message()));
3248 SimulatedEventLoopFactory event_loop_factory(&config.message());
3249 event_loop_factory.SetTimeConverter(&time_converter);
3250
3251 NodeEventLoopFactory *const pi1 =
3252 event_loop_factory.GetNodeEventLoopFactory("pi1");
3253 const size_t pi1_index = configuration::GetNodeIndex(
3254 event_loop_factory.configuration(), pi1->node());
3255 NodeEventLoopFactory *const pi2 =
3256 event_loop_factory.GetNodeEventLoopFactory("pi2");
3257 const size_t pi2_index = configuration::GetNodeIndex(
3258 event_loop_factory.configuration(), pi2->node());
3259 std::vector<std::string> filenames;
3260
3261 {
3262 CHECK_EQ(pi1_index, 0u);
3263 CHECK_EQ(pi2_index, 1u);
3264
3265 time_converter.AddNextTimestamp(
3266 distributed_clock::epoch(),
3267 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3268
3269 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3270 time_converter.AddNextTimestamp(
3271 distributed_clock::epoch() + reboot_time,
3272 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3273 BootTimestamp::epoch() + reboot_time});
3274 }
3275
3276 const std::string kLogfile2_1 =
3277 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3278 util::UnlinkRecursive(kLogfile2_1);
3279
3280 pi1->AlwaysStart<Ping>("ping");
3281
3282 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3283 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3284 // second boot.
3285 {
3286 LoggerState pi2_logger = MakeLoggerState(
3287 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3288
3289 event_loop_factory.RunFor(chrono::milliseconds(95));
3290
3291 pi2_logger.StartLogger(kLogfile2_1);
3292
3293 event_loop_factory.RunFor(chrono::milliseconds(4000));
3294
3295 pi2->Disconnect(pi1->node());
3296
3297 event_loop_factory.RunFor(chrono::milliseconds(1000));
3298 pi1->AlwaysStart<Ping>("ping");
3299
3300 event_loop_factory.RunFor(chrono::milliseconds(5000));
3301 pi2_logger.AppendAllFilenames(&filenames);
3302 }
3303
3304 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3305 ConfirmReadable(filenames);
3306}
3307
3308// Tests that we properly handle only one direction ever existing after a reboot
3309// with only reliable data.
3310TEST(MissingDirectionTest, OneDirectionAfterRebootReliable) {
3311 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3312 aos::configuration::ReadConfig(ArtifactPath(
3313 "aos/events/logging/multinode_pingpong_split4_reliable_config.json"));
3314 message_bridge::TestingTimeConverter time_converter(
3315 configuration::NodesCount(&config.message()));
3316 SimulatedEventLoopFactory event_loop_factory(&config.message());
3317 event_loop_factory.SetTimeConverter(&time_converter);
3318
3319 NodeEventLoopFactory *const pi1 =
3320 event_loop_factory.GetNodeEventLoopFactory("pi1");
3321 const size_t pi1_index = configuration::GetNodeIndex(
3322 event_loop_factory.configuration(), pi1->node());
3323 NodeEventLoopFactory *const pi2 =
3324 event_loop_factory.GetNodeEventLoopFactory("pi2");
3325 const size_t pi2_index = configuration::GetNodeIndex(
3326 event_loop_factory.configuration(), pi2->node());
3327 std::vector<std::string> filenames;
3328
3329 {
3330 CHECK_EQ(pi1_index, 0u);
3331 CHECK_EQ(pi2_index, 1u);
3332
3333 time_converter.AddNextTimestamp(
3334 distributed_clock::epoch(),
3335 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3336
3337 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3338 time_converter.AddNextTimestamp(
3339 distributed_clock::epoch() + reboot_time,
3340 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3341 BootTimestamp::epoch() + reboot_time});
3342 }
3343
3344 const std::string kLogfile2_1 =
3345 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3346 util::UnlinkRecursive(kLogfile2_1);
3347
3348 pi1->AlwaysStart<Ping>("ping");
3349
3350 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3351 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3352 // second boot.
3353 {
3354 LoggerState pi2_logger = MakeLoggerState(
3355 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3356
3357 event_loop_factory.RunFor(chrono::milliseconds(95));
3358
3359 pi2_logger.StartLogger(kLogfile2_1);
3360
3361 event_loop_factory.RunFor(chrono::milliseconds(4000));
3362
3363 pi2->Disconnect(pi1->node());
3364
3365 event_loop_factory.RunFor(chrono::milliseconds(1000));
3366 pi1->AlwaysStart<Ping>("ping");
3367
3368 event_loop_factory.RunFor(chrono::milliseconds(5000));
3369 pi2_logger.AppendAllFilenames(&filenames);
3370 }
3371
3372 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3373 ConfirmReadable(filenames);
3374}
3375
3376// Tests that we properly handle what used to be a time violation in one
3377// direction. This can occur when one direction goes down after sending some
3378// data, but the other keeps working. The down direction ends up resolving to a
3379// straight line in the noncausal filter, where the direction which is still up
3380// can cross that line. Really, time progressed along just fine but we assumed
3381// that the offset was a line when it could have deviated by up to 1ms/second.
3382TEST_P(MultinodeLoggerTest, OneDirectionTimeDrift) {
3383 std::vector<std::string> filenames;
3384
3385 CHECK_EQ(pi1_index_, 0u);
3386 CHECK_EQ(pi2_index_, 1u);
3387
3388 time_converter_.AddNextTimestamp(
3389 distributed_clock::epoch(),
3390 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3391
3392 const chrono::nanoseconds before_disconnect_duration =
3393 time_converter_.AddMonotonic(
3394 {chrono::milliseconds(1000), chrono::milliseconds(1000)});
3395
3396 const chrono::nanoseconds test_duration =
3397 time_converter_.AddMonotonic(
3398 {chrono::milliseconds(1000), chrono::milliseconds(1000)}) +
3399 time_converter_.AddMonotonic(
3400 {chrono::milliseconds(10000),
3401 chrono::milliseconds(10000) - chrono::milliseconds(5)}) +
3402 time_converter_.AddMonotonic(
3403 {chrono::milliseconds(10000),
3404 chrono::milliseconds(10000) + chrono::milliseconds(5)});
3405
3406 const std::string kLogfile =
3407 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3408 util::UnlinkRecursive(kLogfile);
3409
3410 {
3411 LoggerState pi2_logger = MakeLogger(pi2_);
3412 pi2_logger.StartLogger(kLogfile);
3413 event_loop_factory_.RunFor(before_disconnect_duration);
3414
3415 pi2_->Disconnect(pi1_->node());
3416
3417 event_loop_factory_.RunFor(test_duration);
3418 pi2_->Connect(pi1_->node());
3419
3420 event_loop_factory_.RunFor(chrono::milliseconds(5000));
3421 pi2_logger.AppendAllFilenames(&filenames);
3422 }
3423
3424 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3425 ConfirmReadable(filenames);
3426}
3427
3428// Tests that we can replay a logfile that has timestamps such that at least one
3429// node's epoch is at a positive distributed_clock (and thus will have to be
3430// booted after the other node(s)).
3431TEST_P(MultinodeLoggerTest, StartOneNodeBeforeOther) {
3432 std::vector<std::string> filenames;
3433
3434 CHECK_EQ(pi1_index_, 0u);
3435 CHECK_EQ(pi2_index_, 1u);
3436
3437 time_converter_.AddNextTimestamp(
3438 distributed_clock::epoch(),
3439 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3440
3441 const chrono::nanoseconds before_reboot_duration = chrono::milliseconds(1000);
3442 time_converter_.RebootAt(
3443 0, distributed_clock::time_point(before_reboot_duration));
3444
3445 const chrono::nanoseconds test_duration = time_converter_.AddMonotonic(
3446 {chrono::milliseconds(10000), chrono::milliseconds(10000)});
3447
3448 const std::string kLogfile =
3449 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3450 util::UnlinkRecursive(kLogfile);
3451
3452 pi2_->Disconnect(pi1_->node());
3453 pi1_->Disconnect(pi2_->node());
3454
3455 {
3456 LoggerState pi2_logger = MakeLogger(pi2_);
3457
3458 pi2_logger.StartLogger(kLogfile);
3459 event_loop_factory_.RunFor(before_reboot_duration);
3460
3461 pi2_->Connect(pi1_->node());
3462 pi1_->Connect(pi2_->node());
3463
3464 event_loop_factory_.RunFor(test_duration);
3465
3466 pi2_logger.AppendAllFilenames(&filenames);
3467 }
3468
3469 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3470 ConfirmReadable(filenames);
3471
3472 {
3473 LogReader reader(sorted_parts);
3474 SimulatedEventLoopFactory replay_factory(reader.configuration());
3475 reader.RegisterWithoutStarting(&replay_factory);
3476
3477 NodeEventLoopFactory *const replay_node =
3478 reader.event_loop_factory()->GetNodeEventLoopFactory("pi1");
3479
3480 std::unique_ptr<EventLoop> test_event_loop =
3481 replay_node->MakeEventLoop("test_reader");
3482 replay_node->OnStartup([replay_node]() {
3483 // Check that we didn't boot until at least t=0.
3484 CHECK_LE(monotonic_clock::epoch(), replay_node->monotonic_now());
3485 });
3486 test_event_loop->OnRun([&test_event_loop]() {
3487 // Check that we didn't boot until at least t=0.
3488 EXPECT_LE(monotonic_clock::epoch(), test_event_loop->monotonic_now());
3489 });
3490 reader.event_loop_factory()->Run();
3491 reader.Deregister();
3492 }
3493}
3494
3495// Tests that when we have a loop without all the logs at all points in time, we
3496// can sort it properly.
3497TEST(MultinodeLoggerLoopTest, Loop) {
3498 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3499 aos::configuration::ReadConfig(ArtifactPath(
3500 "aos/events/logging/multinode_pingpong_triangle_split_config.json"));
3501 message_bridge::TestingTimeConverter time_converter(
3502 configuration::NodesCount(&config.message()));
3503 SimulatedEventLoopFactory event_loop_factory(&config.message());
3504 event_loop_factory.SetTimeConverter(&time_converter);
3505
3506 NodeEventLoopFactory *const pi1 =
3507 event_loop_factory.GetNodeEventLoopFactory("pi1");
3508 NodeEventLoopFactory *const pi2 =
3509 event_loop_factory.GetNodeEventLoopFactory("pi2");
3510 NodeEventLoopFactory *const pi3 =
3511 event_loop_factory.GetNodeEventLoopFactory("pi3");
3512
3513 const std::string kLogfile1_1 =
3514 aos::testing::TestTmpDir() + "/multi_logfile1/";
3515 const std::string kLogfile2_1 =
3516 aos::testing::TestTmpDir() + "/multi_logfile2/";
3517 const std::string kLogfile3_1 =
3518 aos::testing::TestTmpDir() + "/multi_logfile3/";
3519 util::UnlinkRecursive(kLogfile1_1);
3520 util::UnlinkRecursive(kLogfile2_1);
3521 util::UnlinkRecursive(kLogfile3_1);
3522
3523 {
3524 // Make pi1 boot before everything else.
3525 time_converter.AddNextTimestamp(
3526 distributed_clock::epoch(),
3527 {BootTimestamp::epoch(),
3528 BootTimestamp::epoch() - chrono::milliseconds(100),
3529 BootTimestamp::epoch() - chrono::milliseconds(300)});
3530 }
3531
3532 // We want to setup a situation such that 2 of the 3 legs of the loop are very
3533 // confident about time being X, and the third leg is pulling the average off
3534 // to one side.
3535 //
3536 // It's easiest to visualize this in timestamp_plotter.
3537
3538 std::vector<std::string> filenames;
3539 {
3540 // Have pi1 send out a reliable message at startup. This sets up a long
3541 // forwarding time message at the start to bias time.
3542 std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
3543 {
3544 aos::Sender<examples::Ping> ping_sender =
3545 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
3546
3547 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
3548 examples::Ping::Builder ping_builder =
3549 builder.MakeBuilder<examples::Ping>();
3550 CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
3551 }
3552
3553 // Wait a while so there's enough data to let the worst case be rather off.
3554 event_loop_factory.RunFor(chrono::seconds(1000));
3555
3556 // Now start a receiving node first. This sets up 2 tight bounds between 2
3557 // of the nodes.
3558 LoggerState pi2_logger = MakeLoggerState(
3559 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3560 pi2_logger.StartLogger(kLogfile2_1);
3561
3562 event_loop_factory.RunFor(chrono::seconds(100));
3563
3564 // And now start the third leg.
3565 LoggerState pi3_logger = MakeLoggerState(
3566 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3567 pi3_logger.StartLogger(kLogfile3_1);
3568
3569 LoggerState pi1_logger = MakeLoggerState(
3570 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3571 pi1_logger.StartLogger(kLogfile1_1);
3572
3573 event_loop_factory.RunFor(chrono::seconds(100));
3574
3575 pi1_logger.AppendAllFilenames(&filenames);
3576 pi2_logger.AppendAllFilenames(&filenames);
3577 pi3_logger.AppendAllFilenames(&filenames);
3578 }
3579
3580 // Make sure we can read this.
3581 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3582 auto result = ConfirmReadable(filenames);
3583}
3584
3585} // namespace testing
3586} // namespace logger
3587} // namespace aos