blob: 18337bebc58c5e6a4f51bda59d95f8124f5a4076 [file] [log] [blame]
Naman Guptaa63aa132023-03-22 20:06:34 -07001#include "aos/events/logging/log_reader.h"
2#include "aos/events/logging/multinode_logger_test_lib.h"
3#include "aos/events/message_counter.h"
4#include "aos/events/ping_lib.h"
5#include "aos/events/pong_lib.h"
6#include "aos/network/remote_message_generated.h"
7#include "aos/network/timestamp_generated.h"
8#include "aos/testing/tmpdir.h"
9#include "gmock/gmock.h"
10#include "gtest/gtest.h"
11
12namespace aos {
13namespace logger {
14namespace testing {
15
16namespace chrono = std::chrono;
17using aos::message_bridge::RemoteMessage;
18using aos::testing::ArtifactPath;
19using aos::testing::MessageCounter;
20
Naman Guptaa63aa132023-03-22 20:06:34 -070021INSTANTIATE_TEST_SUITE_P(
22 All, MultinodeLoggerTest,
23 ::testing::Combine(
24 ::testing::Values(
25 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080026 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070027 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080028 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070029 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
30
31INSTANTIATE_TEST_SUITE_P(
32 All, MultinodeLoggerDeathTest,
33 ::testing::Combine(
34 ::testing::Values(
35 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080036 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070037 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080038 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070039 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
40
41// Tests that we can write and read simple multi-node log files.
42TEST_P(MultinodeLoggerTest, SimpleMultiNode) {
43 std::vector<std::string> actual_filenames;
44 time_converter_.StartEqual();
45
46 {
47 LoggerState pi1_logger = MakeLogger(pi1_);
48 LoggerState pi2_logger = MakeLogger(pi2_);
49
50 event_loop_factory_.RunFor(chrono::milliseconds(95));
51
52 StartLogger(&pi1_logger);
53 StartLogger(&pi2_logger);
54
55 event_loop_factory_.RunFor(chrono::milliseconds(20000));
56 pi1_logger.AppendAllFilenames(&actual_filenames);
57 pi2_logger.AppendAllFilenames(&actual_filenames);
58 }
59
60 ASSERT_THAT(actual_filenames,
61 ::testing::UnorderedElementsAreArray(logfiles_));
62
63 {
64 std::set<std::string> logfile_uuids;
65 std::set<std::string> parts_uuids;
66 // Confirm that we have the expected number of UUIDs for both the logfile
67 // UUIDs and parts UUIDs.
68 std::vector<SizePrefixedFlatbufferVector<LogFileHeader>> log_header;
69 for (std::string_view f : logfiles_) {
70 log_header.emplace_back(ReadHeader(f).value());
71 if (!log_header.back().message().has_configuration()) {
72 logfile_uuids.insert(
73 log_header.back().message().log_event_uuid()->str());
74 parts_uuids.insert(log_header.back().message().parts_uuid()->str());
75 }
76 }
77
78 EXPECT_EQ(logfile_uuids.size(), 2u);
79 if (shared()) {
80 EXPECT_EQ(parts_uuids.size(), 7u);
81 } else {
82 EXPECT_EQ(parts_uuids.size(), 8u);
83 }
84
85 // And confirm everything is on the correct node.
86 EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi1");
87 EXPECT_EQ(log_header[3].message().node()->name()->string_view(), "pi1");
88 EXPECT_EQ(log_header[4].message().node()->name()->string_view(), "pi1");
89
90 EXPECT_EQ(log_header[5].message().node()->name()->string_view(), "pi2");
91 EXPECT_EQ(log_header[6].message().node()->name()->string_view(), "pi2");
92
93 EXPECT_EQ(log_header[7].message().node()->name()->string_view(), "pi2");
94 EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi2");
95 EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi2");
96
97 EXPECT_EQ(log_header[10].message().node()->name()->string_view(), "pi1");
98 EXPECT_EQ(log_header[11].message().node()->name()->string_view(), "pi1");
99
100 EXPECT_EQ(log_header[12].message().node()->name()->string_view(), "pi2");
101 EXPECT_EQ(log_header[13].message().node()->name()->string_view(), "pi2");
102
103 if (shared()) {
104 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
105 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi2");
106 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi2");
107
108 EXPECT_EQ(log_header[17].message().node()->name()->string_view(), "pi1");
109 EXPECT_EQ(log_header[18].message().node()->name()->string_view(), "pi1");
110 } else {
111 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
112 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi2");
113
114 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi1");
115 EXPECT_EQ(log_header[17].message().node()->name()->string_view(), "pi1");
116
117 EXPECT_EQ(log_header[18].message().node()->name()->string_view(), "pi2");
118 EXPECT_EQ(log_header[19].message().node()->name()->string_view(), "pi2");
119 }
120
121 // And the parts index matches.
122 EXPECT_EQ(log_header[2].message().parts_index(), 0);
123 EXPECT_EQ(log_header[3].message().parts_index(), 1);
124 EXPECT_EQ(log_header[4].message().parts_index(), 2);
125
126 EXPECT_EQ(log_header[5].message().parts_index(), 0);
127 EXPECT_EQ(log_header[6].message().parts_index(), 1);
128
129 EXPECT_EQ(log_header[7].message().parts_index(), 0);
130 EXPECT_EQ(log_header[8].message().parts_index(), 1);
131 EXPECT_EQ(log_header[9].message().parts_index(), 2);
132
133 EXPECT_EQ(log_header[10].message().parts_index(), 0);
134 EXPECT_EQ(log_header[11].message().parts_index(), 1);
135
136 EXPECT_EQ(log_header[12].message().parts_index(), 0);
137 EXPECT_EQ(log_header[13].message().parts_index(), 1);
138
139 if (shared()) {
140 EXPECT_EQ(log_header[14].message().parts_index(), 0);
141 EXPECT_EQ(log_header[15].message().parts_index(), 1);
142 EXPECT_EQ(log_header[16].message().parts_index(), 2);
143
144 EXPECT_EQ(log_header[17].message().parts_index(), 0);
145 EXPECT_EQ(log_header[18].message().parts_index(), 1);
146 } else {
147 EXPECT_EQ(log_header[14].message().parts_index(), 0);
148 EXPECT_EQ(log_header[15].message().parts_index(), 1);
149
150 EXPECT_EQ(log_header[16].message().parts_index(), 0);
151 EXPECT_EQ(log_header[17].message().parts_index(), 1);
152
153 EXPECT_EQ(log_header[18].message().parts_index(), 0);
154 EXPECT_EQ(log_header[19].message().parts_index(), 1);
155 }
156 }
157
158 const std::vector<LogFile> sorted_log_files = SortParts(logfiles_);
159 {
160 using ::testing::UnorderedElementsAre;
161 std::shared_ptr<const aos::Configuration> config =
162 sorted_log_files[0].config;
163
164 // Timing reports, pings
165 EXPECT_THAT(CountChannelsData(config, logfiles_[2]),
166 UnorderedElementsAre(
167 std::make_tuple("/pi1/aos",
168 "aos.message_bridge.ServerStatistics", 1),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700169 std::make_tuple("/test", "aos.examples.Ping", 1),
170 std::make_tuple("/pi1/aos", "aos.examples.Ping", 1)))
Naman Guptaa63aa132023-03-22 20:06:34 -0700171 << " : " << logfiles_[2];
172 {
173 std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700174 std::make_tuple("/pi1/aos", "aos.examples.Ping", 10),
Naman Guptaa63aa132023-03-22 20:06:34 -0700175 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 1),
176 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
177 1)};
178 if (!std::get<0>(GetParam()).shared) {
179 channel_counts.push_back(
180 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
181 "aos-message_bridge-Timestamp",
182 "aos.message_bridge.RemoteMessage", 1));
183 }
184 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
185 ::testing::UnorderedElementsAreArray(channel_counts))
186 << " : " << logfiles_[3];
187 }
188 {
189 std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700190 std::make_tuple("/pi1/aos", "aos.examples.Ping", 1990),
Naman Guptaa63aa132023-03-22 20:06:34 -0700191 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 199),
192 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
193 20),
194 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
195 199),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700196 std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
Naman Guptaa63aa132023-03-22 20:06:34 -0700197 std::make_tuple("/test", "aos.examples.Ping", 2000)};
198 if (!std::get<0>(GetParam()).shared) {
199 channel_counts.push_back(
200 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
201 "aos-message_bridge-Timestamp",
202 "aos.message_bridge.RemoteMessage", 199));
203 }
204 EXPECT_THAT(CountChannelsData(config, logfiles_[4]),
205 ::testing::UnorderedElementsAreArray(channel_counts))
206 << " : " << logfiles_[4];
207 }
208 // Timestamps for pong
209 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
210 UnorderedElementsAre())
211 << " : " << logfiles_[2];
212 EXPECT_THAT(
213 CountChannelsTimestamp(config, logfiles_[3]),
214 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
215 << " : " << logfiles_[3];
216 EXPECT_THAT(
217 CountChannelsTimestamp(config, logfiles_[4]),
218 UnorderedElementsAre(
219 std::make_tuple("/test", "aos.examples.Pong", 2000),
220 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)))
221 << " : " << logfiles_[4];
222
223 // Pong data.
224 EXPECT_THAT(
225 CountChannelsData(config, logfiles_[5]),
226 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 91)))
227 << " : " << logfiles_[5];
228 EXPECT_THAT(CountChannelsData(config, logfiles_[6]),
229 UnorderedElementsAre(
230 std::make_tuple("/test", "aos.examples.Pong", 1910)))
231 << " : " << logfiles_[6];
232
233 // No timestamps
234 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
235 UnorderedElementsAre())
236 << " : " << logfiles_[5];
237 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[6]),
238 UnorderedElementsAre())
239 << " : " << logfiles_[6];
240
241 // Timing reports and pongs.
242 EXPECT_THAT(CountChannelsData(config, logfiles_[7]),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700243 UnorderedElementsAre(
244 std::make_tuple("/pi2/aos", "aos.examples.Ping", 1),
245 std::make_tuple("/pi2/aos",
246 "aos.message_bridge.ServerStatistics", 1)))
Naman Guptaa63aa132023-03-22 20:06:34 -0700247 << " : " << logfiles_[7];
248 EXPECT_THAT(
249 CountChannelsData(config, logfiles_[8]),
250 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
251 << " : " << logfiles_[8];
252 EXPECT_THAT(
253 CountChannelsData(config, logfiles_[9]),
254 UnorderedElementsAre(
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700255 std::make_tuple("/pi2/aos", "aos.examples.Ping", 2000),
Naman Guptaa63aa132023-03-22 20:06:34 -0700256 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
257 std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
258 20),
259 std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
260 200),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700261 std::make_tuple("/pi2/aos", "aos.timing.Report", 60),
Naman Guptaa63aa132023-03-22 20:06:34 -0700262 std::make_tuple("/test", "aos.examples.Pong", 2000)))
263 << " : " << logfiles_[9];
264 // And ping timestamps.
265 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[7]),
266 UnorderedElementsAre())
267 << " : " << logfiles_[7];
268 EXPECT_THAT(
269 CountChannelsTimestamp(config, logfiles_[8]),
270 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
271 << " : " << logfiles_[8];
272 EXPECT_THAT(
273 CountChannelsTimestamp(config, logfiles_[9]),
274 UnorderedElementsAre(
275 std::make_tuple("/test", "aos.examples.Ping", 2000),
276 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)))
277 << " : " << logfiles_[9];
278
279 // And then test that the remotely logged timestamp data files only have
280 // timestamps in them.
281 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[10]),
282 UnorderedElementsAre())
283 << " : " << logfiles_[10];
284 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[11]),
285 UnorderedElementsAre())
286 << " : " << logfiles_[11];
287 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[12]),
288 UnorderedElementsAre())
289 << " : " << logfiles_[12];
290 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[13]),
291 UnorderedElementsAre())
292 << " : " << logfiles_[13];
293
294 EXPECT_THAT(CountChannelsData(config, logfiles_[10]),
295 UnorderedElementsAre(std::make_tuple(
296 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
297 << " : " << logfiles_[10];
298 EXPECT_THAT(CountChannelsData(config, logfiles_[11]),
299 UnorderedElementsAre(std::make_tuple(
300 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
301 << " : " << logfiles_[11];
302
303 EXPECT_THAT(CountChannelsData(config, logfiles_[12]),
304 UnorderedElementsAre(std::make_tuple(
305 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
306 << " : " << logfiles_[12];
307 EXPECT_THAT(CountChannelsData(config, logfiles_[13]),
308 UnorderedElementsAre(std::make_tuple(
309 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
310 << " : " << logfiles_[13];
311
312 // Timestamps from pi2 on pi1, and the other way.
313 if (shared()) {
314 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
315 UnorderedElementsAre())
316 << " : " << logfiles_[14];
317 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
318 UnorderedElementsAre())
319 << " : " << logfiles_[15];
320 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
321 UnorderedElementsAre())
322 << " : " << logfiles_[16];
323 EXPECT_THAT(CountChannelsData(config, logfiles_[17]),
324 UnorderedElementsAre())
325 << " : " << logfiles_[17];
326 EXPECT_THAT(CountChannelsData(config, logfiles_[18]),
327 UnorderedElementsAre())
328 << " : " << logfiles_[18];
329
330 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[14]),
331 UnorderedElementsAre(
332 std::make_tuple("/test", "aos.examples.Ping", 1)))
333 << " : " << logfiles_[14];
334 EXPECT_THAT(
335 CountChannelsTimestamp(config, logfiles_[15]),
336 UnorderedElementsAre(
337 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 9),
338 std::make_tuple("/test", "aos.examples.Ping", 90)))
339 << " : " << logfiles_[15];
340 EXPECT_THAT(
341 CountChannelsTimestamp(config, logfiles_[16]),
342 UnorderedElementsAre(
343 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 191),
344 std::make_tuple("/test", "aos.examples.Ping", 1910)))
345 << " : " << logfiles_[16];
346 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[17]),
347 UnorderedElementsAre(std::make_tuple(
348 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
349 << " : " << logfiles_[17];
350 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[18]),
351 UnorderedElementsAre(std::make_tuple(
352 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
353 << " : " << logfiles_[18];
354 } else {
355 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
356 UnorderedElementsAre())
357 << " : " << logfiles_[14];
358 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
359 UnorderedElementsAre())
360 << " : " << logfiles_[15];
361 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
362 UnorderedElementsAre())
363 << " : " << logfiles_[16];
364 EXPECT_THAT(CountChannelsData(config, logfiles_[17]),
365 UnorderedElementsAre())
366 << " : " << logfiles_[17];
367 EXPECT_THAT(CountChannelsData(config, logfiles_[18]),
368 UnorderedElementsAre())
369 << " : " << logfiles_[18];
370 EXPECT_THAT(CountChannelsData(config, logfiles_[19]),
371 UnorderedElementsAre())
372 << " : " << logfiles_[19];
373
374 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[14]),
375 UnorderedElementsAre(std::make_tuple(
376 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
377 << " : " << logfiles_[14];
378 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[15]),
379 UnorderedElementsAre(std::make_tuple(
380 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
381 << " : " << logfiles_[15];
382 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[16]),
383 UnorderedElementsAre(std::make_tuple(
384 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
385 << " : " << logfiles_[16];
386 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[17]),
387 UnorderedElementsAre(std::make_tuple(
388 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
389 << " : " << logfiles_[17];
390 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[18]),
391 UnorderedElementsAre(
392 std::make_tuple("/test", "aos.examples.Ping", 91)))
393 << " : " << logfiles_[18];
394 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[19]),
395 UnorderedElementsAre(
396 std::make_tuple("/test", "aos.examples.Ping", 1910)))
397 << " : " << logfiles_[19];
398 }
399 }
400
401 LogReader reader(sorted_log_files);
402
403 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
404 log_reader_factory.set_send_delay(chrono::microseconds(0));
405
406 // This sends out the fetched messages and advances time to the start of the
407 // log file.
408 reader.Register(&log_reader_factory);
409
410 const Node *pi1 =
411 configuration::GetNode(log_reader_factory.configuration(), "pi1");
412 const Node *pi2 =
413 configuration::GetNode(log_reader_factory.configuration(), "pi2");
414
415 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
416 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
417 LOG(INFO) << "now pi1 "
418 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
419 LOG(INFO) << "now pi2 "
420 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
421
422 EXPECT_THAT(reader.LoggedNodes(),
423 ::testing::ElementsAre(
424 configuration::GetNode(reader.logged_configuration(), pi1),
425 configuration::GetNode(reader.logged_configuration(), pi2)));
426
427 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
428
429 std::unique_ptr<EventLoop> pi1_event_loop =
430 log_reader_factory.MakeEventLoop("test", pi1);
431 std::unique_ptr<EventLoop> pi2_event_loop =
432 log_reader_factory.MakeEventLoop("test", pi2);
433
434 int pi1_ping_count = 10;
435 int pi2_ping_count = 10;
436 int pi1_pong_count = 10;
437 int pi2_pong_count = 10;
438
439 // Confirm that the ping value matches.
440 pi1_event_loop->MakeWatcher(
441 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
442 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping) << " at "
443 << pi1_event_loop->context().monotonic_remote_time << " -> "
444 << pi1_event_loop->context().monotonic_event_time;
445 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
446 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
447 pi1_ping_count * chrono::milliseconds(10) +
448 monotonic_clock::epoch());
449 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
450 pi1_ping_count * chrono::milliseconds(10) +
451 realtime_clock::epoch());
452 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
453 pi1_event_loop->context().monotonic_event_time);
454 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
455 pi1_event_loop->context().realtime_event_time);
456
457 ++pi1_ping_count;
458 });
459 pi2_event_loop->MakeWatcher(
460 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
461 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping) << " at "
462 << pi2_event_loop->context().monotonic_remote_time << " -> "
463 << pi2_event_loop->context().monotonic_event_time;
464 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
465
466 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
467 pi2_ping_count * chrono::milliseconds(10) +
468 monotonic_clock::epoch());
469 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
470 pi2_ping_count * chrono::milliseconds(10) +
471 realtime_clock::epoch());
472 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time +
473 chrono::microseconds(150),
474 pi2_event_loop->context().monotonic_event_time);
475 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time +
476 chrono::microseconds(150),
477 pi2_event_loop->context().realtime_event_time);
478 ++pi2_ping_count;
479 });
480
481 constexpr ssize_t kQueueIndexOffset = -9;
482 // Confirm that the ping and pong counts both match, and the value also
483 // matches.
484 pi1_event_loop->MakeWatcher(
485 "/test", [&pi1_event_loop, &pi1_ping_count,
486 &pi1_pong_count](const examples::Pong &pong) {
487 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
488 << pi1_event_loop->context().monotonic_remote_time << " -> "
489 << pi1_event_loop->context().monotonic_event_time;
490
491 EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
492 pi1_pong_count + kQueueIndexOffset);
493 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
494 chrono::microseconds(200) +
495 pi1_pong_count * chrono::milliseconds(10) +
496 monotonic_clock::epoch());
497 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
498 chrono::microseconds(200) +
499 pi1_pong_count * chrono::milliseconds(10) +
500 realtime_clock::epoch());
501
502 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
503 chrono::microseconds(150),
504 pi1_event_loop->context().monotonic_event_time);
505 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
506 chrono::microseconds(150),
507 pi1_event_loop->context().realtime_event_time);
508
509 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
510 ++pi1_pong_count;
511 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
512 });
513 pi2_event_loop->MakeWatcher(
514 "/test", [&pi2_event_loop, &pi2_ping_count,
515 &pi2_pong_count](const examples::Pong &pong) {
516 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
517 << pi2_event_loop->context().monotonic_remote_time << " -> "
518 << pi2_event_loop->context().monotonic_event_time;
519
520 EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
521 pi2_pong_count + kQueueIndexOffset);
522
523 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
524 chrono::microseconds(200) +
525 pi2_pong_count * chrono::milliseconds(10) +
526 monotonic_clock::epoch());
527 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
528 chrono::microseconds(200) +
529 pi2_pong_count * chrono::milliseconds(10) +
530 realtime_clock::epoch());
531
532 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
533 pi2_event_loop->context().monotonic_event_time);
534 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
535 pi2_event_loop->context().realtime_event_time);
536
537 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
538 ++pi2_pong_count;
539 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
540 });
541
542 log_reader_factory.Run();
543 EXPECT_EQ(pi1_ping_count, 2010);
544 EXPECT_EQ(pi2_ping_count, 2010);
545 EXPECT_EQ(pi1_pong_count, 2010);
546 EXPECT_EQ(pi2_pong_count, 2010);
547
548 reader.Deregister();
549}
550
551// Test that if we feed the replay with a mismatched node list that we die on
552// the LogReader constructor.
553TEST_P(MultinodeLoggerDeathTest, MultiNodeBadReplayConfig) {
554 time_converter_.StartEqual();
555 {
556 LoggerState pi1_logger = MakeLogger(pi1_);
557 LoggerState pi2_logger = MakeLogger(pi2_);
558
559 event_loop_factory_.RunFor(chrono::milliseconds(95));
560
561 StartLogger(&pi1_logger);
562 StartLogger(&pi2_logger);
563
564 event_loop_factory_.RunFor(chrono::milliseconds(20000));
565 }
566
567 // Test that, if we add an additional node to the replay config that the
568 // logger complains about the mismatch in number of nodes.
569 FlatbufferDetachedBuffer<Configuration> extra_nodes_config =
570 configuration::MergeWithConfig(&config_.message(), R"({
571 "nodes": [
572 {
573 "name": "extra-node"
574 }
575 ]
576 }
577 )");
578
579 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
580 EXPECT_DEATH(LogReader(sorted_parts, &extra_nodes_config.message()),
581 "Log file and replay config need to have matching nodes lists.");
582}
583
584// Tests that we can read log files where they don't start at the same monotonic
585// time.
586TEST_P(MultinodeLoggerTest, StaggeredStart) {
587 time_converter_.StartEqual();
588 std::vector<std::string> actual_filenames;
589
590 {
591 LoggerState pi1_logger = MakeLogger(pi1_);
592 LoggerState pi2_logger = MakeLogger(pi2_);
593
594 event_loop_factory_.RunFor(chrono::milliseconds(95));
595
596 StartLogger(&pi1_logger);
597
598 event_loop_factory_.RunFor(chrono::milliseconds(200));
599
600 StartLogger(&pi2_logger);
601
602 event_loop_factory_.RunFor(chrono::milliseconds(20000));
603 pi1_logger.AppendAllFilenames(&actual_filenames);
604 pi2_logger.AppendAllFilenames(&actual_filenames);
605 }
606
607 // Since we delay starting pi2, it already knows about all the timestamps so
608 // we don't end up with extra parts.
609 LogReader reader(SortParts(actual_filenames));
610
611 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
612 log_reader_factory.set_send_delay(chrono::microseconds(0));
613
614 // This sends out the fetched messages and advances time to the start of the
615 // log file.
616 reader.Register(&log_reader_factory);
617
618 const Node *pi1 =
619 configuration::GetNode(log_reader_factory.configuration(), "pi1");
620 const Node *pi2 =
621 configuration::GetNode(log_reader_factory.configuration(), "pi2");
622
623 EXPECT_THAT(reader.LoggedNodes(),
624 ::testing::ElementsAre(
625 configuration::GetNode(reader.logged_configuration(), pi1),
626 configuration::GetNode(reader.logged_configuration(), pi2)));
627
628 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
629
630 std::unique_ptr<EventLoop> pi1_event_loop =
631 log_reader_factory.MakeEventLoop("test", pi1);
632 std::unique_ptr<EventLoop> pi2_event_loop =
633 log_reader_factory.MakeEventLoop("test", pi2);
634
635 int pi1_ping_count = 30;
636 int pi2_ping_count = 30;
637 int pi1_pong_count = 30;
638 int pi2_pong_count = 30;
639
640 // Confirm that the ping value matches.
641 pi1_event_loop->MakeWatcher(
642 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
643 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
644 << pi1_event_loop->context().monotonic_remote_time << " -> "
645 << pi1_event_loop->context().monotonic_event_time;
646 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
647
648 ++pi1_ping_count;
649 });
650 pi2_event_loop->MakeWatcher(
651 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
652 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
653 << pi2_event_loop->context().monotonic_remote_time << " -> "
654 << pi2_event_loop->context().monotonic_event_time;
655 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
656
657 ++pi2_ping_count;
658 });
659
660 // Confirm that the ping and pong counts both match, and the value also
661 // matches.
662 pi1_event_loop->MakeWatcher(
663 "/test", [&pi1_event_loop, &pi1_ping_count,
664 &pi1_pong_count](const examples::Pong &pong) {
665 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
666 << pi1_event_loop->context().monotonic_remote_time << " -> "
667 << pi1_event_loop->context().monotonic_event_time;
668
669 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
670 ++pi1_pong_count;
671 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
672 });
673 pi2_event_loop->MakeWatcher(
674 "/test", [&pi2_event_loop, &pi2_ping_count,
675 &pi2_pong_count](const examples::Pong &pong) {
676 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
677 << pi2_event_loop->context().monotonic_remote_time << " -> "
678 << pi2_event_loop->context().monotonic_event_time;
679
680 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
681 ++pi2_pong_count;
682 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
683 });
684
685 log_reader_factory.Run();
686 EXPECT_EQ(pi1_ping_count, 2030);
687 EXPECT_EQ(pi2_ping_count, 2030);
688 EXPECT_EQ(pi1_pong_count, 2030);
689 EXPECT_EQ(pi2_pong_count, 2030);
690
691 reader.Deregister();
692}
693
694// Tests that we can read log files where the monotonic clocks drift and don't
695// match correctly. While we are here, also test that different ending times
696// also is readable.
697TEST_P(MultinodeLoggerTest, MismatchedClocks) {
698 // TODO(austin): Negate...
699 const chrono::nanoseconds initial_pi2_offset = chrono::seconds(1000);
700
701 time_converter_.AddMonotonic(
702 {BootTimestamp::epoch(), BootTimestamp::epoch() + initial_pi2_offset});
703 // Wait for 95 ms, (~0.1 seconds - 1/2 of the ping/pong period), and set the
704 // skew to be 200 uS/s
705 const chrono::nanoseconds startup_sleep1 = time_converter_.AddMonotonic(
706 {chrono::milliseconds(95),
707 chrono::milliseconds(95) - chrono::nanoseconds(200) * 95});
708 // Run another 200 ms to have one logger start first.
709 const chrono::nanoseconds startup_sleep2 = time_converter_.AddMonotonic(
710 {chrono::milliseconds(200), chrono::milliseconds(200)});
711 // Slew one way then the other at the same 200 uS/S slew rate. Make sure we
712 // go far enough to cause problems if this isn't accounted for.
713 const chrono::nanoseconds logger_run1 = time_converter_.AddMonotonic(
714 {chrono::milliseconds(20000),
715 chrono::milliseconds(20000) - chrono::nanoseconds(200) * 20000});
716 const chrono::nanoseconds logger_run2 = time_converter_.AddMonotonic(
717 {chrono::milliseconds(40000),
718 chrono::milliseconds(40000) + chrono::nanoseconds(200) * 40000});
719 const chrono::nanoseconds logger_run3 = time_converter_.AddMonotonic(
720 {chrono::milliseconds(400), chrono::milliseconds(400)});
721
722 {
723 LoggerState pi2_logger = MakeLogger(pi2_);
724
725 LOG(INFO) << "pi2 times: " << pi2_->monotonic_now() << " "
726 << pi2_->realtime_now() << " distributed "
727 << pi2_->ToDistributedClock(pi2_->monotonic_now());
728
729 LOG(INFO) << "pi2_ times: " << pi2_->monotonic_now() << " "
730 << pi2_->realtime_now() << " distributed "
731 << pi2_->ToDistributedClock(pi2_->monotonic_now());
732
733 event_loop_factory_.RunFor(startup_sleep1);
734
735 StartLogger(&pi2_logger);
736
737 event_loop_factory_.RunFor(startup_sleep2);
738
739 {
740 // Run pi1's logger for only part of the time.
741 LoggerState pi1_logger = MakeLogger(pi1_);
742
743 StartLogger(&pi1_logger);
744 event_loop_factory_.RunFor(logger_run1);
745
746 // Make sure we slewed time far enough so that the difference is greater
747 // than the network delay. This confirms that if we sort incorrectly, it
748 // would show in the results.
749 EXPECT_LT(
750 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
751 -event_loop_factory_.send_delay() -
752 event_loop_factory_.network_delay());
753
754 event_loop_factory_.RunFor(logger_run2);
755
756 // And now check that we went far enough the other way to make sure we
757 // cover both problems.
758 EXPECT_GT(
759 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
760 event_loop_factory_.send_delay() +
761 event_loop_factory_.network_delay());
762 }
763
764 // And log a bit more on pi2.
765 event_loop_factory_.RunFor(logger_run3);
766 }
767
768 LogReader reader(
769 SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 3, 2)));
770
771 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
772 log_reader_factory.set_send_delay(chrono::microseconds(0));
773
774 const Node *pi1 =
775 configuration::GetNode(log_reader_factory.configuration(), "pi1");
776 const Node *pi2 =
777 configuration::GetNode(log_reader_factory.configuration(), "pi2");
778
779 // This sends out the fetched messages and advances time to the start of the
780 // log file.
781 reader.Register(&log_reader_factory);
782
783 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
784 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
785 LOG(INFO) << "now pi1 "
786 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
787 LOG(INFO) << "now pi2 "
788 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
789
790 LOG(INFO) << "Done registering (pi1) "
791 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now()
792 << " "
793 << log_reader_factory.GetNodeEventLoopFactory(pi1)->realtime_now();
794 LOG(INFO) << "Done registering (pi2) "
795 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now()
796 << " "
797 << log_reader_factory.GetNodeEventLoopFactory(pi2)->realtime_now();
798
799 EXPECT_THAT(reader.LoggedNodes(),
800 ::testing::ElementsAre(
801 configuration::GetNode(reader.logged_configuration(), pi1),
802 configuration::GetNode(reader.logged_configuration(), pi2)));
803
804 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
805
806 std::unique_ptr<EventLoop> pi1_event_loop =
807 log_reader_factory.MakeEventLoop("test", pi1);
808 std::unique_ptr<EventLoop> pi2_event_loop =
809 log_reader_factory.MakeEventLoop("test", pi2);
810
811 int pi1_ping_count = 30;
812 int pi2_ping_count = 30;
813 int pi1_pong_count = 30;
814 int pi2_pong_count = 30;
815
816 // Confirm that the ping value matches.
817 pi1_event_loop->MakeWatcher(
818 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
819 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
820 << pi1_event_loop->context().monotonic_remote_time << " -> "
821 << pi1_event_loop->context().monotonic_event_time;
822 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
823
824 ++pi1_ping_count;
825 });
826 pi2_event_loop->MakeWatcher(
827 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
828 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
829 << pi2_event_loop->context().monotonic_remote_time << " -> "
830 << pi2_event_loop->context().monotonic_event_time;
831 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
832
833 ++pi2_ping_count;
834 });
835
836 // Confirm that the ping and pong counts both match, and the value also
837 // matches.
838 pi1_event_loop->MakeWatcher(
839 "/test", [&pi1_event_loop, &pi1_ping_count,
840 &pi1_pong_count](const examples::Pong &pong) {
841 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
842 << pi1_event_loop->context().monotonic_remote_time << " -> "
843 << pi1_event_loop->context().monotonic_event_time;
844
845 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
846 ++pi1_pong_count;
847 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
848 });
849 pi2_event_loop->MakeWatcher(
850 "/test", [&pi2_event_loop, &pi2_ping_count,
851 &pi2_pong_count](const examples::Pong &pong) {
852 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
853 << pi2_event_loop->context().monotonic_remote_time << " -> "
854 << pi2_event_loop->context().monotonic_event_time;
855
856 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
857 ++pi2_pong_count;
858 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
859 });
860
861 log_reader_factory.Run();
862 EXPECT_EQ(pi1_ping_count, 6030);
863 EXPECT_EQ(pi2_ping_count, 6030);
864 EXPECT_EQ(pi1_pong_count, 6030);
865 EXPECT_EQ(pi2_pong_count, 6030);
866
867 reader.Deregister();
868}
869
870// Tests that we can sort a bunch of parts into the pre-determined sorted parts.
871TEST_P(MultinodeLoggerTest, SortParts) {
872 time_converter_.StartEqual();
873 // Make a bunch of parts.
874 {
875 LoggerState pi1_logger = MakeLogger(pi1_);
876 LoggerState pi2_logger = MakeLogger(pi2_);
877
878 event_loop_factory_.RunFor(chrono::milliseconds(95));
879
880 StartLogger(&pi1_logger);
881 StartLogger(&pi2_logger);
882
883 event_loop_factory_.RunFor(chrono::milliseconds(2000));
884 }
885
886 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
887 VerifyParts(sorted_parts);
888}
889
890// Tests that we can sort a bunch of parts with an empty part. We should ignore
891// it and remove it from the sorted list.
892TEST_P(MultinodeLoggerTest, SortEmptyParts) {
893 time_converter_.StartEqual();
894 // Make a bunch of parts.
895 {
896 LoggerState pi1_logger = MakeLogger(pi1_);
897 LoggerState pi2_logger = MakeLogger(pi2_);
898
899 event_loop_factory_.RunFor(chrono::milliseconds(95));
900
901 StartLogger(&pi1_logger);
902 StartLogger(&pi2_logger);
903
904 event_loop_factory_.RunFor(chrono::milliseconds(2000));
905 }
906
907 // TODO(austin): Should we flip out if the file can't open?
908 const std::string kEmptyFile("foobarinvalidfiledoesnotexist" + Extension());
909
910 aos::util::WriteStringToFileOrDie(kEmptyFile, "");
911 logfiles_.emplace_back(kEmptyFile);
912
913 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
914 VerifyParts(sorted_parts, {kEmptyFile});
915}
916
917// Tests that we can sort a bunch of parts with the end missing off a
918// file. We should use the part we can read.
919TEST_P(MultinodeLoggerTest, SortTruncatedParts) {
920 std::vector<std::string> actual_filenames;
921 time_converter_.StartEqual();
922 // Make a bunch of parts.
923 {
924 LoggerState pi1_logger = MakeLogger(pi1_);
925 LoggerState pi2_logger = MakeLogger(pi2_);
926
927 event_loop_factory_.RunFor(chrono::milliseconds(95));
928
929 StartLogger(&pi1_logger);
930 StartLogger(&pi2_logger);
931
932 event_loop_factory_.RunFor(chrono::milliseconds(2000));
933
934 pi1_logger.AppendAllFilenames(&actual_filenames);
935 pi2_logger.AppendAllFilenames(&actual_filenames);
936 }
937
938 ASSERT_THAT(actual_filenames,
939 ::testing::UnorderedElementsAreArray(logfiles_));
940
941 // Strip off the end of one of the files. Pick one with a lot of data.
942 // For snappy, needs to have enough data to be >1 chunk of compressed data so
943 // that we don't corrupt the entire log part.
944 ::std::string compressed_contents =
945 aos::util::ReadFileToStringOrDie(logfiles_[4]);
946
947 aos::util::WriteStringToFileOrDie(
948 logfiles_[4],
949 compressed_contents.substr(0, compressed_contents.size() - 100));
950
951 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
952 VerifyParts(sorted_parts);
953}
954
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700955// Tests that if we remap a logged channel, it shows up correctly.
Naman Guptaa63aa132023-03-22 20:06:34 -0700956TEST_P(MultinodeLoggerTest, RemapLoggedChannel) {
957 time_converter_.StartEqual();
958 {
959 LoggerState pi1_logger = MakeLogger(pi1_);
960 LoggerState pi2_logger = MakeLogger(pi2_);
961
962 event_loop_factory_.RunFor(chrono::milliseconds(95));
963
964 StartLogger(&pi1_logger);
965 StartLogger(&pi2_logger);
966
967 event_loop_factory_.RunFor(chrono::milliseconds(20000));
968 }
969
970 LogReader reader(SortParts(logfiles_));
971
972 // Remap just on pi1.
973 reader.RemapLoggedChannel<aos::timing::Report>(
974 "/aos", configuration::GetNode(reader.configuration(), "pi1"));
975
976 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
977 log_reader_factory.set_send_delay(chrono::microseconds(0));
978
979 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
980 // Note: An extra channel gets remapped automatically due to a timestamp
981 // channel being LOCAL_LOGGER'd.
982 ASSERT_EQ(remapped_channels.size(), std::get<0>(GetParam()).shared ? 1u : 2u);
983 EXPECT_EQ(remapped_channels[0]->name()->string_view(), "/original/pi1/aos");
984 EXPECT_EQ(remapped_channels[0]->type()->string_view(), "aos.timing.Report");
985 if (!std::get<0>(GetParam()).shared) {
986 EXPECT_EQ(remapped_channels[1]->name()->string_view(),
987 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
988 "aos-message_bridge-Timestamp");
989 EXPECT_EQ(remapped_channels[1]->type()->string_view(),
990 "aos.message_bridge.RemoteMessage");
991 }
992
993 reader.Register(&log_reader_factory);
994
995 const Node *pi1 =
996 configuration::GetNode(log_reader_factory.configuration(), "pi1");
997 const Node *pi2 =
998 configuration::GetNode(log_reader_factory.configuration(), "pi2");
999
1000 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1001 // else should have moved.
1002 std::unique_ptr<EventLoop> pi1_event_loop =
1003 log_reader_factory.MakeEventLoop("test", pi1);
1004 pi1_event_loop->SkipTimingReport();
1005 std::unique_ptr<EventLoop> full_pi1_event_loop =
1006 log_reader_factory.MakeEventLoop("test", pi1);
1007 full_pi1_event_loop->SkipTimingReport();
1008 std::unique_ptr<EventLoop> pi2_event_loop =
1009 log_reader_factory.MakeEventLoop("test", pi2);
1010 pi2_event_loop->SkipTimingReport();
1011
1012 MessageCounter<aos::timing::Report> pi1_timing_report(pi1_event_loop.get(),
1013 "/aos");
1014 MessageCounter<aos::timing::Report> full_pi1_timing_report(
1015 full_pi1_event_loop.get(), "/pi1/aos");
1016 MessageCounter<aos::timing::Report> pi1_original_timing_report(
1017 pi1_event_loop.get(), "/original/aos");
1018 MessageCounter<aos::timing::Report> full_pi1_original_timing_report(
1019 full_pi1_event_loop.get(), "/original/pi1/aos");
1020 MessageCounter<aos::timing::Report> pi2_timing_report(pi2_event_loop.get(),
1021 "/aos");
1022
1023 log_reader_factory.Run();
1024
1025 EXPECT_EQ(pi1_timing_report.count(), 0u);
1026 EXPECT_EQ(full_pi1_timing_report.count(), 0u);
1027 EXPECT_NE(pi1_original_timing_report.count(), 0u);
1028 EXPECT_NE(full_pi1_original_timing_report.count(), 0u);
1029 EXPECT_NE(pi2_timing_report.count(), 0u);
1030
1031 reader.Deregister();
1032}
1033
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001034// Tests that if we rename a logged channel, it shows up correctly.
1035TEST_P(MultinodeLoggerTest, RenameLoggedChannel) {
1036 std::vector<std::string> actual_filenames;
1037 time_converter_.StartEqual();
1038 {
1039 LoggerState pi1_logger = MakeLogger(pi1_);
1040 LoggerState pi2_logger = MakeLogger(pi2_);
1041
1042 event_loop_factory_.RunFor(chrono::milliseconds(95));
1043
1044 StartLogger(&pi1_logger);
1045 StartLogger(&pi2_logger);
1046
1047 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1048
1049 pi1_logger.AppendAllFilenames(&actual_filenames);
1050 pi2_logger.AppendAllFilenames(&actual_filenames);
1051 }
1052
1053 LogReader reader(SortParts(actual_filenames));
1054
1055 // Rename just on pi2. Add some global maps just to verify they get added in
1056 // the config and used correctly.
1057 std::vector<MapT> maps;
1058 {
1059 MapT map;
1060 map.match = std::make_unique<ChannelT>();
1061 map.match->name = "/foo*";
1062 map.match->source_node = "pi1";
1063 map.rename = std::make_unique<ChannelT>();
1064 map.rename->name = "/pi1/foo";
1065 maps.emplace_back(std::move(map));
1066 }
1067 {
1068 MapT map;
1069 map.match = std::make_unique<ChannelT>();
1070 map.match->name = "/foo*";
1071 map.match->source_node = "pi2";
1072 map.rename = std::make_unique<ChannelT>();
1073 map.rename->name = "/pi2/foo";
1074 maps.emplace_back(std::move(map));
1075 }
1076 {
1077 MapT map;
1078 map.match = std::make_unique<ChannelT>();
1079 map.match->name = "/foo";
1080 map.match->type = "aos.examples.Ping";
1081 map.rename = std::make_unique<ChannelT>();
1082 map.rename->name = "/foo/renamed";
1083 maps.emplace_back(std::move(map));
1084 }
1085 reader.RenameLoggedChannel<aos::examples::Ping>(
1086 "/aos", configuration::GetNode(reader.configuration(), "pi2"),
1087 "/pi2/foo/renamed", maps);
1088
1089 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1090 log_reader_factory.set_send_delay(chrono::microseconds(0));
1091
1092 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1093 // Note: An extra channel gets remapped automatically due to a timestamp
1094 // channel being LOCAL_LOGGER'd.
1095 const bool shared = std::get<0>(GetParam()).shared;
1096 ASSERT_EQ(remapped_channels.size(), shared ? 1u : 2u);
1097 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->name()->string_view(),
1098 "/pi2/foo/renamed");
1099 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->type()->string_view(),
1100 "aos.examples.Ping");
1101 if (!shared) {
1102 EXPECT_EQ(remapped_channels[0]->name()->string_view(),
1103 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1104 "aos-message_bridge-Timestamp");
1105 EXPECT_EQ(remapped_channels[0]->type()->string_view(),
1106 "aos.message_bridge.RemoteMessage");
1107 }
1108
1109 reader.Register(&log_reader_factory);
1110
1111 const Node *pi1 =
1112 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1113 const Node *pi2 =
1114 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1115
1116 // Confirm we can read the data on the renamed channel, just for pi2. Nothing
1117 // else should have moved.
1118 std::unique_ptr<EventLoop> pi2_event_loop =
1119 log_reader_factory.MakeEventLoop("test", pi2);
1120 pi2_event_loop->SkipTimingReport();
1121 std::unique_ptr<EventLoop> full_pi2_event_loop =
1122 log_reader_factory.MakeEventLoop("test", pi2);
1123 full_pi2_event_loop->SkipTimingReport();
1124 std::unique_ptr<EventLoop> pi1_event_loop =
1125 log_reader_factory.MakeEventLoop("test", pi1);
1126 pi1_event_loop->SkipTimingReport();
1127
1128 MessageCounter<aos::examples::Ping> pi2_ping(pi2_event_loop.get(), "/aos");
1129 MessageCounter<aos::examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1130 "/foo");
1131 MessageCounter<aos::examples::Ping> full_pi2_renamed_ping(
1132 full_pi2_event_loop.get(), "/pi2/foo/renamed");
1133 MessageCounter<aos::examples::Ping> pi1_ping(pi1_event_loop.get(), "/aos");
1134
1135 log_reader_factory.Run();
1136
1137 EXPECT_EQ(pi2_ping.count(), 0u);
1138 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1139 EXPECT_NE(full_pi2_renamed_ping.count(), 0u);
1140 EXPECT_NE(pi1_ping.count(), 0u);
1141
1142 reader.Deregister();
1143}
1144
Naman Guptaa63aa132023-03-22 20:06:34 -07001145// Tests that we can remap a forwarded channel as well.
1146TEST_P(MultinodeLoggerTest, RemapForwardedLoggedChannel) {
1147 time_converter_.StartEqual();
1148 {
1149 LoggerState pi1_logger = MakeLogger(pi1_);
1150 LoggerState pi2_logger = MakeLogger(pi2_);
1151
1152 event_loop_factory_.RunFor(chrono::milliseconds(95));
1153
1154 StartLogger(&pi1_logger);
1155 StartLogger(&pi2_logger);
1156
1157 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1158 }
1159
1160 LogReader reader(SortParts(logfiles_));
1161
1162 reader.RemapLoggedChannel<examples::Ping>("/test");
1163
1164 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1165 log_reader_factory.set_send_delay(chrono::microseconds(0));
1166
1167 reader.Register(&log_reader_factory);
1168
1169 const Node *pi1 =
1170 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1171 const Node *pi2 =
1172 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1173
1174 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1175 // else should have moved.
1176 std::unique_ptr<EventLoop> pi1_event_loop =
1177 log_reader_factory.MakeEventLoop("test", pi1);
1178 pi1_event_loop->SkipTimingReport();
1179 std::unique_ptr<EventLoop> full_pi1_event_loop =
1180 log_reader_factory.MakeEventLoop("test", pi1);
1181 full_pi1_event_loop->SkipTimingReport();
1182 std::unique_ptr<EventLoop> pi2_event_loop =
1183 log_reader_factory.MakeEventLoop("test", pi2);
1184 pi2_event_loop->SkipTimingReport();
1185
1186 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1187 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1188 MessageCounter<examples::Ping> pi1_original_ping(pi1_event_loop.get(),
1189 "/original/test");
1190 MessageCounter<examples::Ping> pi2_original_ping(pi2_event_loop.get(),
1191 "/original/test");
1192
1193 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1194 pi1_original_ping_timestamp;
1195 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1196 pi1_ping_timestamp;
1197 if (!shared()) {
1198 pi1_original_ping_timestamp =
1199 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1200 pi1_event_loop.get(),
1201 "/pi1/aos/remote_timestamps/pi2/original/test/aos-examples-Ping");
1202 pi1_ping_timestamp =
1203 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1204 pi1_event_loop.get(),
1205 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1206 }
1207
1208 log_reader_factory.Run();
1209
1210 EXPECT_EQ(pi1_ping.count(), 0u);
1211 EXPECT_EQ(pi2_ping.count(), 0u);
1212 EXPECT_NE(pi1_original_ping.count(), 0u);
1213 EXPECT_NE(pi2_original_ping.count(), 0u);
1214 if (!shared()) {
1215 EXPECT_NE(pi1_original_ping_timestamp->count(), 0u);
1216 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1217 }
1218
1219 reader.Deregister();
1220}
1221
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001222// Tests that we can rename a forwarded channel as well.
1223TEST_P(MultinodeLoggerTest, RenameForwardedLoggedChannel) {
1224 std::vector<std::string> actual_filenames;
1225 time_converter_.StartEqual();
1226 {
1227 LoggerState pi1_logger = MakeLogger(pi1_);
1228 LoggerState pi2_logger = MakeLogger(pi2_);
1229
1230 event_loop_factory_.RunFor(chrono::milliseconds(95));
1231
1232 StartLogger(&pi1_logger);
1233 StartLogger(&pi2_logger);
1234
1235 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1236
1237 pi1_logger.AppendAllFilenames(&actual_filenames);
1238 pi2_logger.AppendAllFilenames(&actual_filenames);
1239 }
1240
1241 LogReader reader(SortParts(actual_filenames));
1242
1243 std::vector<MapT> maps;
1244 {
1245 MapT map;
1246 map.match = std::make_unique<ChannelT>();
1247 map.match->name = "/production*";
1248 map.match->source_node = "pi1";
1249 map.rename = std::make_unique<ChannelT>();
1250 map.rename->name = "/pi1/production";
1251 maps.emplace_back(std::move(map));
1252 }
1253 {
1254 MapT map;
1255 map.match = std::make_unique<ChannelT>();
1256 map.match->name = "/production*";
1257 map.match->source_node = "pi2";
1258 map.rename = std::make_unique<ChannelT>();
1259 map.rename->name = "/pi2/production";
1260 maps.emplace_back(std::move(map));
1261 }
1262 reader.RenameLoggedChannel<aos::examples::Ping>(
1263 "/test", configuration::GetNode(reader.configuration(), "pi1"),
1264 "/pi1/production", maps);
1265
1266 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1267 log_reader_factory.set_send_delay(chrono::microseconds(0));
1268
1269 reader.Register(&log_reader_factory);
1270
1271 const Node *pi1 =
1272 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1273 const Node *pi2 =
1274 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1275
1276 // Confirm we can read the data on the renamed channel, on both the source
1277 // node and the remote node. In case of split timestamp channels, confirm that
1278 // we receive the timestamp messages on the renamed channel as well.
1279 std::unique_ptr<EventLoop> pi1_event_loop =
1280 log_reader_factory.MakeEventLoop("test", pi1);
1281 pi1_event_loop->SkipTimingReport();
1282 std::unique_ptr<EventLoop> full_pi1_event_loop =
1283 log_reader_factory.MakeEventLoop("test", pi1);
1284 full_pi1_event_loop->SkipTimingReport();
1285 std::unique_ptr<EventLoop> pi2_event_loop =
1286 log_reader_factory.MakeEventLoop("test", pi2);
1287 pi2_event_loop->SkipTimingReport();
1288
1289 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1290 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1291 MessageCounter<examples::Ping> pi1_renamed_ping(pi1_event_loop.get(),
1292 "/pi1/production");
1293 MessageCounter<examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1294 "/pi1/production");
1295
1296 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1297 pi1_renamed_ping_timestamp;
1298 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1299 pi1_ping_timestamp;
1300 if (!shared()) {
1301 pi1_renamed_ping_timestamp =
1302 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1303 pi1_event_loop.get(),
1304 "/pi1/aos/remote_timestamps/pi2/pi1/production/aos-examples-Ping");
1305 pi1_ping_timestamp =
1306 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1307 pi1_event_loop.get(),
1308 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1309 }
1310
1311 log_reader_factory.Run();
1312
1313 EXPECT_EQ(pi1_ping.count(), 0u);
1314 EXPECT_EQ(pi2_ping.count(), 0u);
1315 EXPECT_NE(pi1_renamed_ping.count(), 0u);
1316 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1317 if (!shared()) {
1318 EXPECT_NE(pi1_renamed_ping_timestamp->count(), 0u);
1319 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1320 }
1321
1322 reader.Deregister();
1323}
1324
Naman Guptaa63aa132023-03-22 20:06:34 -07001325// Tests that we observe all the same events in log replay (for a given node)
1326// whether we just register an event loop for that node or if we register a full
1327// event loop factory.
1328TEST_P(MultinodeLoggerTest, SingleNodeReplay) {
1329 time_converter_.StartEqual();
1330 constexpr chrono::milliseconds kStartupDelay(95);
1331 {
1332 LoggerState pi1_logger = MakeLogger(pi1_);
1333 LoggerState pi2_logger = MakeLogger(pi2_);
1334
1335 event_loop_factory_.RunFor(kStartupDelay);
1336
1337 StartLogger(&pi1_logger);
1338 StartLogger(&pi2_logger);
1339
1340 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1341 }
1342
1343 LogReader full_reader(SortParts(logfiles_));
1344 LogReader single_node_reader(SortParts(logfiles_));
1345
1346 SimulatedEventLoopFactory full_factory(full_reader.configuration());
1347 SimulatedEventLoopFactory single_node_factory(
1348 single_node_reader.configuration());
1349 single_node_factory.SkipTimingReport();
1350 single_node_factory.DisableStatistics();
1351 std::unique_ptr<EventLoop> replay_event_loop =
1352 single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
1353 "log_reader");
1354
1355 full_reader.Register(&full_factory);
1356 single_node_reader.Register(replay_event_loop.get());
1357
1358 const Node *full_pi1 =
1359 configuration::GetNode(full_factory.configuration(), "pi1");
1360
1361 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1362 // else should have moved.
1363 std::unique_ptr<EventLoop> full_event_loop =
1364 full_factory.MakeEventLoop("test", full_pi1);
1365 full_event_loop->SkipTimingReport();
1366 full_event_loop->SkipAosLog();
1367 // maps are indexed on channel index.
1368 // observed_messages: {channel_index: [(message_sent_time, was_fetched),...]}
1369 std::map<size_t, std::vector<std::pair<monotonic_clock::time_point, bool>>>
1370 observed_messages;
1371 std::map<size_t, std::unique_ptr<RawFetcher>> fetchers;
1372 for (size_t ii = 0; ii < full_event_loop->configuration()->channels()->size();
1373 ++ii) {
1374 const Channel *channel =
1375 full_event_loop->configuration()->channels()->Get(ii);
1376 // We currently don't support replaying remote timestamp channels in
1377 // realtime replay (unless the remote timestamp channel was not NOT_LOGGED,
1378 // in which case it gets auto-remapped and replayed on a /original channel).
1379 if (channel->name()->string_view().find("remote_timestamp") !=
1380 std::string_view::npos &&
1381 channel->name()->string_view().find("/original") ==
1382 std::string_view::npos) {
1383 continue;
1384 }
1385 if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
1386 observed_messages[ii] = {};
1387 fetchers[ii] = full_event_loop->MakeRawFetcher(channel);
1388 full_event_loop->OnRun([ii, &observed_messages, &fetchers]() {
1389 if (fetchers[ii]->Fetch()) {
1390 observed_messages[ii].push_back(std::make_pair(
1391 fetchers[ii]->context().monotonic_event_time, true));
1392 }
1393 });
1394 full_event_loop->MakeRawNoArgWatcher(
1395 channel, [ii, &observed_messages](const Context &context) {
1396 observed_messages[ii].push_back(
1397 std::make_pair(context.monotonic_event_time, false));
1398 });
1399 }
1400 }
1401
1402 full_factory.Run();
1403 fetchers.clear();
1404 full_reader.Deregister();
1405
1406 const Node *single_node_pi1 =
1407 configuration::GetNode(single_node_factory.configuration(), "pi1");
1408 std::map<size_t, std::unique_ptr<RawFetcher>> single_node_fetchers;
1409
1410 std::unique_ptr<EventLoop> single_node_event_loop =
1411 single_node_factory.MakeEventLoop("test", single_node_pi1);
1412 single_node_event_loop->SkipTimingReport();
1413 single_node_event_loop->SkipAosLog();
1414 for (size_t ii = 0;
1415 ii < single_node_event_loop->configuration()->channels()->size(); ++ii) {
1416 const Channel *channel =
1417 single_node_event_loop->configuration()->channels()->Get(ii);
1418 single_node_factory.DisableForwarding(channel);
1419 if (configuration::ChannelIsReadableOnNode(channel, single_node_pi1)) {
1420 single_node_fetchers[ii] =
1421 single_node_event_loop->MakeRawFetcher(channel);
1422 single_node_event_loop->OnRun([channel, ii, &single_node_fetchers]() {
1423 EXPECT_FALSE(single_node_fetchers[ii]->Fetch())
1424 << "Single EventLoop replay doesn't support pre-loading fetchers. "
1425 << configuration::StrippedChannelToString(channel);
1426 });
1427 single_node_event_loop->MakeRawNoArgWatcher(
1428 channel, [ii, &observed_messages, channel,
1429 kStartupDelay](const Context &context) {
1430 if (observed_messages[ii].empty()) {
1431 FAIL() << "Observed extra message at "
1432 << context.monotonic_event_time << " on "
1433 << configuration::StrippedChannelToString(channel);
1434 return;
1435 }
1436 const std::pair<monotonic_clock::time_point, bool> &message =
1437 observed_messages[ii].front();
1438 if (message.second) {
1439 EXPECT_LE(message.first,
1440 context.monotonic_event_time + kStartupDelay)
1441 << "Mismatched message times " << context.monotonic_event_time
1442 << " and " << message.first << " on "
1443 << configuration::StrippedChannelToString(channel);
1444 } else {
1445 EXPECT_EQ(message.first,
1446 context.monotonic_event_time + kStartupDelay)
1447 << "Mismatched message times " << context.monotonic_event_time
1448 << " and " << message.first << " on "
1449 << configuration::StrippedChannelToString(channel);
1450 }
1451 observed_messages[ii].erase(observed_messages[ii].begin());
1452 });
1453 }
1454 }
1455
1456 single_node_factory.Run();
1457
1458 single_node_fetchers.clear();
1459
1460 single_node_reader.Deregister();
1461
1462 for (const auto &pair : observed_messages) {
1463 EXPECT_TRUE(pair.second.empty())
1464 << "Missed " << pair.second.size() << " messages on "
1465 << configuration::StrippedChannelToString(
1466 single_node_event_loop->configuration()->channels()->Get(
1467 pair.first));
1468 }
1469}
1470
1471// Tests that we properly recreate forwarded timestamps when replaying a log.
1472// This should be enough that we can then re-run the logger and get a valid log
1473// back.
1474TEST_P(MultinodeLoggerTest, MessageHeader) {
1475 time_converter_.StartEqual();
1476 {
1477 LoggerState pi1_logger = MakeLogger(pi1_);
1478 LoggerState pi2_logger = MakeLogger(pi2_);
1479
1480 event_loop_factory_.RunFor(chrono::milliseconds(95));
1481
1482 StartLogger(&pi1_logger);
1483 StartLogger(&pi2_logger);
1484
1485 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1486 }
1487
1488 LogReader reader(SortParts(logfiles_));
1489
1490 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1491 log_reader_factory.set_send_delay(chrono::microseconds(0));
1492
1493 // This sends out the fetched messages and advances time to the start of the
1494 // log file.
1495 reader.Register(&log_reader_factory);
1496
1497 const Node *pi1 =
1498 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1499 const Node *pi2 =
1500 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1501
1502 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
1503 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
1504 LOG(INFO) << "now pi1 "
1505 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1506 LOG(INFO) << "now pi2 "
1507 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1508
1509 EXPECT_THAT(reader.LoggedNodes(),
1510 ::testing::ElementsAre(
1511 configuration::GetNode(reader.logged_configuration(), pi1),
1512 configuration::GetNode(reader.logged_configuration(), pi2)));
1513
1514 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1515
1516 std::unique_ptr<EventLoop> pi1_event_loop =
1517 log_reader_factory.MakeEventLoop("test", pi1);
1518 std::unique_ptr<EventLoop> pi2_event_loop =
1519 log_reader_factory.MakeEventLoop("test", pi2);
1520
1521 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =
1522 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1523 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi2_fetcher =
1524 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1525
1526 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1527 pi1_event_loop->MakeFetcher<examples::Ping>("/test");
1528 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1529 pi2_event_loop->MakeFetcher<examples::Ping>("/test");
1530
1531 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi2_fetcher =
1532 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1533 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi1_fetcher =
1534 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1535
1536 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1537 pi2_event_loop->MakeFetcher<examples::Pong>("/test");
1538 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1539 pi1_event_loop->MakeFetcher<examples::Pong>("/test");
1540
1541 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
1542 pi1_event_loop->configuration(), pi1_timestamp_on_pi1_fetcher.channel());
1543 const size_t ping_timestamp_channel = configuration::ChannelIndex(
1544 pi2_event_loop->configuration(), ping_on_pi2_fetcher.channel());
1545
1546 const size_t pi2_timestamp_channel = configuration::ChannelIndex(
1547 pi2_event_loop->configuration(), pi2_timestamp_on_pi2_fetcher.channel());
1548 const size_t pong_timestamp_channel = configuration::ChannelIndex(
1549 pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
1550
1551 const chrono::nanoseconds network_delay = event_loop_factory_.network_delay();
1552 const chrono::nanoseconds send_delay = event_loop_factory_.send_delay();
1553
1554 for (std::pair<int, std::string> channel :
1555 shared()
1556 ? std::vector<
1557 std::pair<int, std::string>>{{-1,
1558 "/aos/remote_timestamps/pi2"}}
1559 : std::vector<std::pair<int, std::string>>{
1560 {pi1_timestamp_channel,
1561 "/aos/remote_timestamps/pi2/pi1/aos/"
1562 "aos-message_bridge-Timestamp"},
1563 {ping_timestamp_channel,
1564 "/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
1565 pi1_event_loop->MakeWatcher(
1566 channel.second,
1567 [&pi1_event_loop, &pi2_event_loop, pi1_timestamp_channel,
1568 ping_timestamp_channel, &pi1_timestamp_on_pi1_fetcher,
1569 &pi1_timestamp_on_pi2_fetcher, &ping_on_pi1_fetcher,
1570 &ping_on_pi2_fetcher, network_delay, send_delay,
1571 channel_index = channel.first](const RemoteMessage &header) {
1572 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1573 chrono::nanoseconds(header.monotonic_sent_time()));
1574 const aos::realtime_clock::time_point header_realtime_sent_time(
1575 chrono::nanoseconds(header.realtime_sent_time()));
1576 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1577 chrono::nanoseconds(header.monotonic_remote_time()));
1578 const aos::realtime_clock::time_point header_realtime_remote_time(
1579 chrono::nanoseconds(header.realtime_remote_time()));
1580
1581 if (channel_index != -1) {
1582 ASSERT_EQ(channel_index, header.channel_index());
1583 }
1584
1585 const Context *pi1_context = nullptr;
1586 const Context *pi2_context = nullptr;
1587
1588 if (header.channel_index() == pi1_timestamp_channel) {
1589 ASSERT_TRUE(pi1_timestamp_on_pi1_fetcher.FetchNext());
1590 ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
1591 pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
1592 pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
1593 } else if (header.channel_index() == ping_timestamp_channel) {
1594 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
1595 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
1596 pi1_context = &ping_on_pi1_fetcher.context();
1597 pi2_context = &ping_on_pi2_fetcher.context();
1598 } else {
1599 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1600 << configuration::CleanedChannelToString(
1601 pi1_event_loop->configuration()->channels()->Get(
1602 header.channel_index()));
1603 }
1604
1605 ASSERT_TRUE(header.has_boot_uuid());
1606 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1607 pi2_event_loop->boot_uuid());
1608
1609 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
1610 EXPECT_EQ(pi2_context->remote_queue_index,
1611 header.remote_queue_index());
1612 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
1613
1614 EXPECT_EQ(pi2_context->monotonic_event_time,
1615 header_monotonic_sent_time);
1616 EXPECT_EQ(pi2_context->realtime_event_time,
1617 header_realtime_sent_time);
1618 EXPECT_EQ(pi2_context->realtime_remote_time,
1619 header_realtime_remote_time);
1620 EXPECT_EQ(pi2_context->monotonic_remote_time,
1621 header_monotonic_remote_time);
1622
1623 EXPECT_EQ(pi1_context->realtime_event_time,
1624 header_realtime_remote_time);
1625 EXPECT_EQ(pi1_context->monotonic_event_time,
1626 header_monotonic_remote_time);
1627
1628 // Time estimation isn't perfect, but we know the clocks were
1629 // identical when logged, so we know when this should have come back.
1630 // Confirm we got it when we expected.
1631 EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
1632 pi1_context->monotonic_event_time + 2 * network_delay +
1633 send_delay);
1634 });
1635 }
1636 for (std::pair<int, std::string> channel :
1637 shared()
1638 ? std::vector<
1639 std::pair<int, std::string>>{{-1,
1640 "/aos/remote_timestamps/pi1"}}
1641 : std::vector<std::pair<int, std::string>>{
1642 {pi2_timestamp_channel,
1643 "/aos/remote_timestamps/pi1/pi2/aos/"
1644 "aos-message_bridge-Timestamp"}}) {
1645 pi2_event_loop->MakeWatcher(
1646 channel.second,
1647 [&pi2_event_loop, &pi1_event_loop, pi2_timestamp_channel,
1648 pong_timestamp_channel, &pi2_timestamp_on_pi2_fetcher,
1649 &pi2_timestamp_on_pi1_fetcher, &pong_on_pi2_fetcher,
1650 &pong_on_pi1_fetcher, network_delay, send_delay,
1651 channel_index = channel.first](const RemoteMessage &header) {
1652 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1653 chrono::nanoseconds(header.monotonic_sent_time()));
1654 const aos::realtime_clock::time_point header_realtime_sent_time(
1655 chrono::nanoseconds(header.realtime_sent_time()));
1656 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1657 chrono::nanoseconds(header.monotonic_remote_time()));
1658 const aos::realtime_clock::time_point header_realtime_remote_time(
1659 chrono::nanoseconds(header.realtime_remote_time()));
1660
1661 if (channel_index != -1) {
1662 ASSERT_EQ(channel_index, header.channel_index());
1663 }
1664
1665 const Context *pi2_context = nullptr;
1666 const Context *pi1_context = nullptr;
1667
1668 if (header.channel_index() == pi2_timestamp_channel) {
1669 ASSERT_TRUE(pi2_timestamp_on_pi2_fetcher.FetchNext());
1670 ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
1671 pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
1672 pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
1673 } else if (header.channel_index() == pong_timestamp_channel) {
1674 ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
1675 ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
1676 pi2_context = &pong_on_pi2_fetcher.context();
1677 pi1_context = &pong_on_pi1_fetcher.context();
1678 } else {
1679 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1680 << configuration::CleanedChannelToString(
1681 pi2_event_loop->configuration()->channels()->Get(
1682 header.channel_index()));
1683 }
1684
1685 ASSERT_TRUE(header.has_boot_uuid());
1686 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1687 pi1_event_loop->boot_uuid());
1688
1689 EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());
1690 EXPECT_EQ(pi1_context->remote_queue_index,
1691 header.remote_queue_index());
1692 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
1693
1694 EXPECT_EQ(pi1_context->monotonic_event_time,
1695 header_monotonic_sent_time);
1696 EXPECT_EQ(pi1_context->realtime_event_time,
1697 header_realtime_sent_time);
1698 EXPECT_EQ(pi1_context->realtime_remote_time,
1699 header_realtime_remote_time);
1700 EXPECT_EQ(pi1_context->monotonic_remote_time,
1701 header_monotonic_remote_time);
1702
1703 EXPECT_EQ(pi2_context->realtime_event_time,
1704 header_realtime_remote_time);
1705 EXPECT_EQ(pi2_context->monotonic_event_time,
1706 header_monotonic_remote_time);
1707
1708 // Time estimation isn't perfect, but we know the clocks were
1709 // identical when logged, so we know when this should have come back.
1710 // Confirm we got it when we expected.
1711 EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
1712 pi2_context->monotonic_event_time + 2 * network_delay +
1713 send_delay);
1714 });
1715 }
1716
1717 // And confirm we can re-create a log again, while checking the contents.
1718 {
1719 LoggerState pi1_logger = MakeLogger(
1720 log_reader_factory.GetNodeEventLoopFactory("pi1"), &log_reader_factory);
1721 LoggerState pi2_logger = MakeLogger(
1722 log_reader_factory.GetNodeEventLoopFactory("pi2"), &log_reader_factory);
1723
1724 StartLogger(&pi1_logger, tmp_dir_ + "/relogged1");
1725 StartLogger(&pi2_logger, tmp_dir_ + "/relogged2");
1726
1727 log_reader_factory.Run();
1728 }
1729
1730 reader.Deregister();
1731
1732 // And verify that we can run the LogReader over the relogged files without
1733 // hitting any fatal errors.
1734 {
1735 LogReader relogged_reader(SortParts(MakeLogFiles(
1736 tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 3, 3, true)));
1737 relogged_reader.Register();
1738
1739 relogged_reader.event_loop_factory()->Run();
1740 }
1741 // And confirm that we can read the logged file using the reader's
1742 // configuration.
1743 {
1744 LogReader relogged_reader(
1745 SortParts(MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2",
1746 3, 3, true)),
1747 reader.configuration());
1748 relogged_reader.Register();
1749
1750 relogged_reader.event_loop_factory()->Run();
1751 }
1752}
1753
1754// Tests that we properly populate and extract the logger_start time by setting
1755// up a clock difference between 2 nodes and looking at the resulting parts.
1756TEST_P(MultinodeLoggerTest, LoggerStartTime) {
1757 std::vector<std::string> actual_filenames;
1758 time_converter_.AddMonotonic(
1759 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
1760 {
1761 LoggerState pi1_logger = MakeLogger(pi1_);
1762 LoggerState pi2_logger = MakeLogger(pi2_);
1763
1764 StartLogger(&pi1_logger);
1765 StartLogger(&pi2_logger);
1766
1767 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1768
1769 pi1_logger.AppendAllFilenames(&actual_filenames);
1770 pi2_logger.AppendAllFilenames(&actual_filenames);
1771 }
1772
1773 ASSERT_THAT(actual_filenames,
1774 ::testing::UnorderedElementsAreArray(logfiles_));
1775
1776 for (const LogFile &log_file : SortParts(logfiles_)) {
1777 for (const LogParts &log_part : log_file.parts) {
1778 if (log_part.node == log_file.logger_node) {
1779 EXPECT_EQ(log_part.logger_monotonic_start_time,
1780 aos::monotonic_clock::min_time);
1781 EXPECT_EQ(log_part.logger_realtime_start_time,
1782 aos::realtime_clock::min_time);
1783 } else {
1784 const chrono::seconds offset = log_file.logger_node == "pi1"
1785 ? -chrono::seconds(1000)
1786 : chrono::seconds(1000);
1787 EXPECT_EQ(log_part.logger_monotonic_start_time,
1788 log_part.monotonic_start_time + offset);
1789 EXPECT_EQ(log_part.logger_realtime_start_time,
1790 log_file.realtime_start_time +
1791 (log_part.logger_monotonic_start_time -
1792 log_file.monotonic_start_time));
1793 }
1794 }
1795 }
1796}
1797
1798// Test that renaming the base, renames the folder.
1799TEST_P(MultinodeLoggerTest, LoggerRenameFolder) {
1800 util::UnlinkRecursive(tmp_dir_ + "/renamefolder");
1801 util::UnlinkRecursive(tmp_dir_ + "/new-good");
1802 time_converter_.AddMonotonic(
1803 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
1804 logfile_base1_ = tmp_dir_ + "/renamefolder/multi_logfile1";
1805 logfile_base2_ = tmp_dir_ + "/renamefolder/multi_logfile2";
1806 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
1807 LoggerState pi1_logger = MakeLogger(pi1_);
1808 LoggerState pi2_logger = MakeLogger(pi2_);
1809
1810 StartLogger(&pi1_logger);
1811 StartLogger(&pi2_logger);
1812
1813 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1814 logfile_base1_ = tmp_dir_ + "/new-good/multi_logfile1";
1815 logfile_base2_ = tmp_dir_ + "/new-good/multi_logfile2";
1816 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
1817 ASSERT_TRUE(pi1_logger.logger->RenameLogBase(logfile_base1_));
1818 ASSERT_TRUE(pi2_logger.logger->RenameLogBase(logfile_base2_));
1819 for (auto &file : logfiles_) {
1820 struct stat s;
1821 EXPECT_EQ(0, stat(file.c_str(), &s));
1822 }
1823}
1824
1825// Test that renaming the file base dies.
1826TEST_P(MultinodeLoggerDeathTest, LoggerRenameFile) {
1827 time_converter_.AddMonotonic(
1828 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
1829 util::UnlinkRecursive(tmp_dir_ + "/renamefile");
1830 logfile_base1_ = tmp_dir_ + "/renamefile/multi_logfile1";
1831 logfile_base2_ = tmp_dir_ + "/renamefile/multi_logfile2";
1832 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
1833 LoggerState pi1_logger = MakeLogger(pi1_);
1834 StartLogger(&pi1_logger);
1835 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1836 logfile_base1_ = tmp_dir_ + "/new-renamefile/new_multi_logfile1";
1837 EXPECT_DEATH({ pi1_logger.logger->RenameLogBase(logfile_base1_); },
1838 "Rename of file base from");
1839}
1840
1841// TODO(austin): We can write a test which recreates a logfile and confirms that
1842// we get it back. That is the ultimate test.
1843
1844// Tests that we properly recreate forwarded timestamps when replaying a log.
1845// This should be enough that we can then re-run the logger and get a valid log
1846// back.
1847TEST_P(MultinodeLoggerTest, RemoteReboot) {
1848 std::vector<std::string> actual_filenames;
1849
1850 const UUID pi1_boot0 = UUID::Random();
1851 const UUID pi2_boot0 = UUID::Random();
1852 const UUID pi2_boot1 = UUID::Random();
1853 {
1854 CHECK_EQ(pi1_index_, 0u);
1855 CHECK_EQ(pi2_index_, 1u);
1856
1857 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
1858 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
1859 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
1860
1861 time_converter_.AddNextTimestamp(
1862 distributed_clock::epoch(),
1863 {BootTimestamp::epoch(), BootTimestamp::epoch()});
1864 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
1865 time_converter_.AddNextTimestamp(
1866 distributed_clock::epoch() + reboot_time,
1867 {BootTimestamp::epoch() + reboot_time,
1868 BootTimestamp{
1869 .boot = 1,
1870 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
1871 }
1872
1873 {
1874 LoggerState pi1_logger = MakeLogger(pi1_);
1875
1876 event_loop_factory_.RunFor(chrono::milliseconds(95));
1877 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
1878 pi1_boot0);
1879 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
1880 pi2_boot0);
1881
1882 StartLogger(&pi1_logger);
1883
1884 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1885
1886 VLOG(1) << "Reboot now!";
1887
1888 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1889 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
1890 pi1_boot0);
1891 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
1892 pi2_boot1);
1893
1894 pi1_logger.AppendAllFilenames(&actual_filenames);
1895 }
1896
1897 std::sort(actual_filenames.begin(), actual_filenames.end());
1898 std::sort(pi1_reboot_logfiles_.begin(), pi1_reboot_logfiles_.end());
1899 ASSERT_THAT(actual_filenames,
1900 ::testing::UnorderedElementsAreArray(pi1_reboot_logfiles_));
1901
1902 // Confirm that our new oldest timestamps properly update as we reboot and
1903 // rotate.
1904 for (const std::string &file : pi1_reboot_logfiles_) {
1905 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
1906 ReadHeader(file);
1907 CHECK(log_header);
1908 if (log_header->message().has_configuration()) {
1909 continue;
1910 }
1911
1912 const monotonic_clock::time_point monotonic_start_time =
1913 monotonic_clock::time_point(
1914 chrono::nanoseconds(log_header->message().monotonic_start_time()));
1915 const UUID source_node_boot_uuid = UUID::FromString(
1916 log_header->message().source_node_boot_uuid()->string_view());
1917
1918 if (log_header->message().node()->name()->string_view() != "pi1") {
1919 // The remote message channel should rotate later and have more parts.
1920 // This only is true on the log files with shared remote messages.
1921 //
1922 // TODO(austin): I'm not the most thrilled with this test pattern... It
1923 // feels brittle in a different way.
1924 if (file.find("aos.message_bridge.RemoteMessage") == std::string::npos ||
1925 !shared()) {
1926 switch (log_header->message().parts_index()) {
1927 case 0:
1928 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1929 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
1930 break;
1931 case 1:
1932 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1933 ASSERT_EQ(monotonic_start_time,
1934 monotonic_clock::epoch() + chrono::seconds(1));
1935 break;
1936 case 2:
1937 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1938 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
1939 break;
1940 case 3:
1941 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1942 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
1943 chrono::nanoseconds(2322999462))
1944 << " on " << file;
1945 break;
1946 default:
1947 FAIL();
1948 break;
1949 }
1950 } else {
1951 switch (log_header->message().parts_index()) {
1952 case 0:
1953 case 1:
1954 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1955 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
1956 break;
1957 case 2:
1958 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1959 ASSERT_EQ(monotonic_start_time,
1960 monotonic_clock::epoch() + chrono::seconds(1));
1961 break;
1962 case 3:
1963 case 4:
1964 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1965 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
1966 break;
1967 case 5:
1968 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1969 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
1970 chrono::nanoseconds(2322999462))
1971 << " on " << file;
1972 break;
1973 default:
1974 FAIL();
1975 break;
1976 }
1977 }
1978 continue;
1979 }
1980 SCOPED_TRACE(file);
1981 SCOPED_TRACE(aos::FlatbufferToJson(
1982 *log_header, {.multi_line = true, .max_vector_size = 100}));
1983 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
1984 ASSERT_EQ(
1985 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
1986 EXPECT_EQ(
1987 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
1988 monotonic_clock::max_time.time_since_epoch().count());
1989 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
1990 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
1991 2u);
1992 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
1993 monotonic_clock::max_time.time_since_epoch().count());
1994 ASSERT_TRUE(log_header->message()
1995 .has_oldest_remote_unreliable_monotonic_timestamps());
1996 ASSERT_EQ(log_header->message()
1997 .oldest_remote_unreliable_monotonic_timestamps()
1998 ->size(),
1999 2u);
2000 EXPECT_EQ(log_header->message()
2001 .oldest_remote_unreliable_monotonic_timestamps()
2002 ->Get(0),
2003 monotonic_clock::max_time.time_since_epoch().count());
2004 ASSERT_TRUE(log_header->message()
2005 .has_oldest_local_unreliable_monotonic_timestamps());
2006 ASSERT_EQ(log_header->message()
2007 .oldest_local_unreliable_monotonic_timestamps()
2008 ->size(),
2009 2u);
2010 EXPECT_EQ(log_header->message()
2011 .oldest_local_unreliable_monotonic_timestamps()
2012 ->Get(0),
2013 monotonic_clock::max_time.time_since_epoch().count());
2014
2015 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2016 monotonic_clock::time_point(chrono::nanoseconds(
2017 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2018 1)));
2019 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2020 monotonic_clock::time_point(chrono::nanoseconds(
2021 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2022 const monotonic_clock::time_point
2023 oldest_remote_unreliable_monotonic_timestamps =
2024 monotonic_clock::time_point(chrono::nanoseconds(
2025 log_header->message()
2026 .oldest_remote_unreliable_monotonic_timestamps()
2027 ->Get(1)));
2028 const monotonic_clock::time_point
2029 oldest_local_unreliable_monotonic_timestamps =
2030 monotonic_clock::time_point(chrono::nanoseconds(
2031 log_header->message()
2032 .oldest_local_unreliable_monotonic_timestamps()
2033 ->Get(1)));
2034 const monotonic_clock::time_point
2035 oldest_remote_reliable_monotonic_timestamps =
2036 monotonic_clock::time_point(chrono::nanoseconds(
2037 log_header->message()
2038 .oldest_remote_reliable_monotonic_timestamps()
2039 ->Get(1)));
2040 const monotonic_clock::time_point
2041 oldest_local_reliable_monotonic_timestamps =
2042 monotonic_clock::time_point(chrono::nanoseconds(
2043 log_header->message()
2044 .oldest_local_reliable_monotonic_timestamps()
2045 ->Get(1)));
2046 const monotonic_clock::time_point
2047 oldest_logger_remote_unreliable_monotonic_timestamps =
2048 monotonic_clock::time_point(chrono::nanoseconds(
2049 log_header->message()
2050 .oldest_logger_remote_unreliable_monotonic_timestamps()
2051 ->Get(0)));
2052 const monotonic_clock::time_point
2053 oldest_logger_local_unreliable_monotonic_timestamps =
2054 monotonic_clock::time_point(chrono::nanoseconds(
2055 log_header->message()
2056 .oldest_logger_local_unreliable_monotonic_timestamps()
2057 ->Get(0)));
2058 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2059 monotonic_clock::max_time);
2060 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2061 monotonic_clock::max_time);
2062 switch (log_header->message().parts_index()) {
2063 case 0:
2064 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2065 monotonic_clock::max_time);
2066 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2067 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2068 monotonic_clock::max_time);
2069 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2070 monotonic_clock::max_time);
2071 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2072 monotonic_clock::max_time);
2073 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2074 monotonic_clock::max_time);
2075 break;
2076 case 1:
2077 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2078 monotonic_clock::time_point(chrono::microseconds(90200)));
2079 EXPECT_EQ(oldest_local_monotonic_timestamps,
2080 monotonic_clock::time_point(chrono::microseconds(90350)));
2081 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2082 monotonic_clock::time_point(chrono::microseconds(90200)));
2083 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2084 monotonic_clock::time_point(chrono::microseconds(90350)));
2085 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2086 monotonic_clock::max_time);
2087 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2088 monotonic_clock::max_time);
2089 break;
2090 case 2:
2091 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2092 monotonic_clock::time_point(chrono::microseconds(90200)))
2093 << file;
2094 EXPECT_EQ(oldest_local_monotonic_timestamps,
2095 monotonic_clock::time_point(chrono::microseconds(90350)))
2096 << file;
2097 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2098 monotonic_clock::time_point(chrono::microseconds(90200)))
2099 << file;
2100 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2101 monotonic_clock::time_point(chrono::microseconds(90350)))
2102 << file;
2103 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2104 monotonic_clock::time_point(chrono::microseconds(100000)))
2105 << file;
2106 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2107 monotonic_clock::time_point(chrono::microseconds(100150)))
2108 << file;
2109 break;
2110 case 3:
2111 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2112 monotonic_clock::time_point(chrono::milliseconds(1323) +
2113 chrono::microseconds(200)));
2114 EXPECT_EQ(oldest_local_monotonic_timestamps,
2115 monotonic_clock::time_point(chrono::microseconds(10100350)));
2116 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2117 monotonic_clock::time_point(chrono::milliseconds(1323) +
2118 chrono::microseconds(200)));
2119 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2120 monotonic_clock::time_point(chrono::microseconds(10100350)));
2121 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2122 monotonic_clock::max_time)
2123 << file;
2124 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2125 monotonic_clock::max_time)
2126 << file;
2127 break;
2128 case 4:
2129 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2130 monotonic_clock::time_point(chrono::milliseconds(1323) +
2131 chrono::microseconds(200)));
2132 EXPECT_EQ(oldest_local_monotonic_timestamps,
2133 monotonic_clock::time_point(chrono::microseconds(10100350)));
2134 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2135 monotonic_clock::time_point(chrono::milliseconds(1323) +
2136 chrono::microseconds(200)));
2137 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2138 monotonic_clock::time_point(chrono::microseconds(10100350)));
2139 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2140 monotonic_clock::time_point(chrono::microseconds(1423000)))
2141 << file;
2142 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2143 monotonic_clock::time_point(chrono::microseconds(10200150)))
2144 << file;
2145 break;
2146 default:
2147 FAIL();
2148 break;
2149 }
2150 }
2151
2152 // Confirm that we refuse to replay logs with missing boot uuids.
2153 {
2154 LogReader reader(SortParts(pi1_reboot_logfiles_));
2155
2156 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2157 log_reader_factory.set_send_delay(chrono::microseconds(0));
2158
2159 // This sends out the fetched messages and advances time to the start of
2160 // the log file.
2161 reader.Register(&log_reader_factory);
2162
2163 log_reader_factory.Run();
2164
2165 reader.Deregister();
2166 }
2167}
2168
2169// Tests that we can sort a log which only has timestamps from the remote
2170// because the local message_bridge_client failed to connect.
2171TEST_P(MultinodeLoggerTest, RemoteRebootOnlyTimestamps) {
2172 const UUID pi1_boot0 = UUID::Random();
2173 const UUID pi2_boot0 = UUID::Random();
2174 const UUID pi2_boot1 = UUID::Random();
2175 {
2176 CHECK_EQ(pi1_index_, 0u);
2177 CHECK_EQ(pi2_index_, 1u);
2178
2179 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2180 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2181 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2182
2183 time_converter_.AddNextTimestamp(
2184 distributed_clock::epoch(),
2185 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2186 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2187 time_converter_.AddNextTimestamp(
2188 distributed_clock::epoch() + reboot_time,
2189 {BootTimestamp::epoch() + reboot_time,
2190 BootTimestamp{
2191 .boot = 1,
2192 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2193 }
2194 pi2_->Disconnect(pi1_->node());
2195
2196 std::vector<std::string> filenames;
2197 {
2198 LoggerState pi1_logger = MakeLogger(pi1_);
2199
2200 event_loop_factory_.RunFor(chrono::milliseconds(95));
2201 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2202 pi1_boot0);
2203 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2204 pi2_boot0);
2205
2206 StartLogger(&pi1_logger);
2207
2208 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2209
2210 VLOG(1) << "Reboot now!";
2211
2212 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2213 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2214 pi1_boot0);
2215 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2216 pi2_boot1);
2217 pi1_logger.AppendAllFilenames(&filenames);
2218 }
2219
2220 std::sort(filenames.begin(), filenames.end());
2221
2222 // Confirm that our new oldest timestamps properly update as we reboot and
2223 // rotate.
2224 size_t timestamp_file_count = 0;
2225 for (const std::string &file : filenames) {
2226 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2227 ReadHeader(file);
2228 CHECK(log_header);
2229
2230 if (log_header->message().has_configuration()) {
2231 continue;
2232 }
2233
2234 const monotonic_clock::time_point monotonic_start_time =
2235 monotonic_clock::time_point(
2236 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2237 const UUID source_node_boot_uuid = UUID::FromString(
2238 log_header->message().source_node_boot_uuid()->string_view());
2239
2240 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2241 ASSERT_EQ(
2242 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2243 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2244 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2245 2u);
2246 ASSERT_TRUE(log_header->message()
2247 .has_oldest_remote_unreliable_monotonic_timestamps());
2248 ASSERT_EQ(log_header->message()
2249 .oldest_remote_unreliable_monotonic_timestamps()
2250 ->size(),
2251 2u);
2252 ASSERT_TRUE(log_header->message()
2253 .has_oldest_local_unreliable_monotonic_timestamps());
2254 ASSERT_EQ(log_header->message()
2255 .oldest_local_unreliable_monotonic_timestamps()
2256 ->size(),
2257 2u);
2258 ASSERT_TRUE(log_header->message()
2259 .has_oldest_remote_reliable_monotonic_timestamps());
2260 ASSERT_EQ(log_header->message()
2261 .oldest_remote_reliable_monotonic_timestamps()
2262 ->size(),
2263 2u);
2264 ASSERT_TRUE(
2265 log_header->message().has_oldest_local_reliable_monotonic_timestamps());
2266 ASSERT_EQ(log_header->message()
2267 .oldest_local_reliable_monotonic_timestamps()
2268 ->size(),
2269 2u);
2270
2271 ASSERT_TRUE(
2272 log_header->message()
2273 .has_oldest_logger_remote_unreliable_monotonic_timestamps());
2274 ASSERT_EQ(log_header->message()
2275 .oldest_logger_remote_unreliable_monotonic_timestamps()
2276 ->size(),
2277 2u);
2278 ASSERT_TRUE(log_header->message()
2279 .has_oldest_logger_local_unreliable_monotonic_timestamps());
2280 ASSERT_EQ(log_header->message()
2281 .oldest_logger_local_unreliable_monotonic_timestamps()
2282 ->size(),
2283 2u);
2284
2285 if (log_header->message().node()->name()->string_view() != "pi1") {
2286 ASSERT_TRUE(file.find("aos.message_bridge.RemoteMessage") !=
2287 std::string::npos);
2288
2289 const std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
2290 ReadNthMessage(file, 0);
2291 CHECK(msg);
2292
2293 EXPECT_TRUE(msg->message().has_monotonic_sent_time());
2294 EXPECT_TRUE(msg->message().has_monotonic_remote_time());
2295
2296 const monotonic_clock::time_point
2297 expected_oldest_local_monotonic_timestamps(
2298 chrono::nanoseconds(msg->message().monotonic_sent_time()));
2299 const monotonic_clock::time_point
2300 expected_oldest_remote_monotonic_timestamps(
2301 chrono::nanoseconds(msg->message().monotonic_remote_time()));
2302 const monotonic_clock::time_point
2303 expected_oldest_timestamp_monotonic_timestamps(
2304 chrono::nanoseconds(msg->message().monotonic_timestamp_time()));
2305
2306 EXPECT_NE(expected_oldest_local_monotonic_timestamps,
2307 monotonic_clock::min_time);
2308 EXPECT_NE(expected_oldest_remote_monotonic_timestamps,
2309 monotonic_clock::min_time);
2310 EXPECT_NE(expected_oldest_timestamp_monotonic_timestamps,
2311 monotonic_clock::min_time);
2312
2313 ++timestamp_file_count;
2314 // Since the log file is from the perspective of the other node,
2315 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2316 monotonic_clock::time_point(chrono::nanoseconds(
2317 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2318 0)));
2319 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2320 monotonic_clock::time_point(chrono::nanoseconds(
2321 log_header->message().oldest_local_monotonic_timestamps()->Get(
2322 0)));
2323 const monotonic_clock::time_point
2324 oldest_remote_unreliable_monotonic_timestamps =
2325 monotonic_clock::time_point(chrono::nanoseconds(
2326 log_header->message()
2327 .oldest_remote_unreliable_monotonic_timestamps()
2328 ->Get(0)));
2329 const monotonic_clock::time_point
2330 oldest_local_unreliable_monotonic_timestamps =
2331 monotonic_clock::time_point(chrono::nanoseconds(
2332 log_header->message()
2333 .oldest_local_unreliable_monotonic_timestamps()
2334 ->Get(0)));
2335 const monotonic_clock::time_point
2336 oldest_remote_reliable_monotonic_timestamps =
2337 monotonic_clock::time_point(chrono::nanoseconds(
2338 log_header->message()
2339 .oldest_remote_reliable_monotonic_timestamps()
2340 ->Get(0)));
2341 const monotonic_clock::time_point
2342 oldest_local_reliable_monotonic_timestamps =
2343 monotonic_clock::time_point(chrono::nanoseconds(
2344 log_header->message()
2345 .oldest_local_reliable_monotonic_timestamps()
2346 ->Get(0)));
2347 const monotonic_clock::time_point
2348 oldest_logger_remote_unreliable_monotonic_timestamps =
2349 monotonic_clock::time_point(chrono::nanoseconds(
2350 log_header->message()
2351 .oldest_logger_remote_unreliable_monotonic_timestamps()
2352 ->Get(1)));
2353 const monotonic_clock::time_point
2354 oldest_logger_local_unreliable_monotonic_timestamps =
2355 monotonic_clock::time_point(chrono::nanoseconds(
2356 log_header->message()
2357 .oldest_logger_local_unreliable_monotonic_timestamps()
2358 ->Get(1)));
2359
2360 const Channel *channel =
2361 event_loop_factory_.configuration()->channels()->Get(
2362 msg->message().channel_index());
2363 const Connection *connection = configuration::ConnectionToNode(
2364 channel, configuration::GetNode(
2365 event_loop_factory_.configuration(),
2366 log_header->message().node()->name()->string_view()));
2367
2368 const bool reliable = connection->time_to_live() == 0;
2369
2370 SCOPED_TRACE(file);
2371 SCOPED_TRACE(aos::FlatbufferToJson(
2372 *log_header, {.multi_line = true, .max_vector_size = 100}));
2373
2374 if (shared()) {
2375 // Confirm that the oldest timestamps match what we expect. Based on
2376 // what we are doing, we know that the oldest time is the first
2377 // message's time.
2378 //
2379 // This makes the test robust to both the split and combined config
2380 // tests.
2381 switch (log_header->message().parts_index()) {
2382 case 0:
2383 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2384 expected_oldest_remote_monotonic_timestamps);
2385 EXPECT_EQ(oldest_local_monotonic_timestamps,
2386 expected_oldest_local_monotonic_timestamps);
2387 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2388 expected_oldest_local_monotonic_timestamps)
2389 << file;
2390 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2391 expected_oldest_timestamp_monotonic_timestamps)
2392 << file;
2393
2394 if (reliable) {
2395 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2396 expected_oldest_remote_monotonic_timestamps);
2397 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2398 expected_oldest_local_monotonic_timestamps);
2399 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2400 monotonic_clock::max_time);
2401 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2402 monotonic_clock::max_time);
2403 } else {
2404 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2405 monotonic_clock::max_time);
2406 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2407 monotonic_clock::max_time);
2408 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2409 expected_oldest_remote_monotonic_timestamps);
2410 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2411 expected_oldest_local_monotonic_timestamps);
2412 }
2413 break;
2414 case 1:
2415 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2416 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2417 EXPECT_EQ(oldest_local_monotonic_timestamps,
2418 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2419 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2420 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2421 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2422 monotonic_clock::epoch() + chrono::nanoseconds(90250000));
2423 if (reliable) {
2424 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2425 expected_oldest_remote_monotonic_timestamps);
2426 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2427 expected_oldest_local_monotonic_timestamps);
2428 EXPECT_EQ(
2429 oldest_remote_unreliable_monotonic_timestamps,
2430 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2431 EXPECT_EQ(
2432 oldest_local_unreliable_monotonic_timestamps,
2433 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2434 } else {
2435 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2436 monotonic_clock::max_time);
2437 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2438 monotonic_clock::max_time);
2439 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2440 expected_oldest_remote_monotonic_timestamps);
2441 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2442 expected_oldest_local_monotonic_timestamps);
2443 }
2444 break;
2445 case 2:
2446 EXPECT_EQ(
2447 oldest_remote_monotonic_timestamps,
2448 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2449 EXPECT_EQ(
2450 oldest_local_monotonic_timestamps,
2451 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2452 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2453 expected_oldest_local_monotonic_timestamps)
2454 << file;
2455 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2456 expected_oldest_timestamp_monotonic_timestamps)
2457 << file;
2458 if (reliable) {
2459 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2460 expected_oldest_remote_monotonic_timestamps);
2461 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2462 expected_oldest_local_monotonic_timestamps);
2463 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2464 monotonic_clock::max_time);
2465 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2466 monotonic_clock::max_time);
2467 } else {
2468 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2469 monotonic_clock::max_time);
2470 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2471 monotonic_clock::max_time);
2472 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2473 expected_oldest_remote_monotonic_timestamps);
2474 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2475 expected_oldest_local_monotonic_timestamps);
2476 }
2477 break;
2478
2479 case 3:
2480 EXPECT_EQ(
2481 oldest_remote_monotonic_timestamps,
2482 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2483 EXPECT_EQ(
2484 oldest_local_monotonic_timestamps,
2485 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2486 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2487 expected_oldest_remote_monotonic_timestamps);
2488 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2489 expected_oldest_local_monotonic_timestamps);
2490 EXPECT_EQ(
2491 oldest_logger_remote_unreliable_monotonic_timestamps,
2492 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2493 EXPECT_EQ(
2494 oldest_logger_local_unreliable_monotonic_timestamps,
2495 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2496 break;
2497 default:
2498 FAIL();
2499 break;
2500 }
2501
2502 switch (log_header->message().parts_index()) {
2503 case 0:
2504 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2505 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2506 break;
2507 case 1:
2508 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2509 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2510 break;
2511 case 2:
2512 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2513 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2514 break;
2515 case 3:
2516 if (shared()) {
2517 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2518 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2519 break;
2520 }
2521 [[fallthrough]];
2522 default:
2523 FAIL();
2524 break;
2525 }
2526 } else {
2527 switch (log_header->message().parts_index()) {
2528 case 0:
2529 if (reliable) {
2530 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2531 monotonic_clock::max_time);
2532 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2533 monotonic_clock::max_time);
2534 EXPECT_EQ(
2535 oldest_logger_remote_unreliable_monotonic_timestamps,
2536 monotonic_clock::epoch() + chrono::nanoseconds(100150000))
2537 << file;
2538 EXPECT_EQ(
2539 oldest_logger_local_unreliable_monotonic_timestamps,
2540 monotonic_clock::epoch() + chrono::nanoseconds(100250000))
2541 << file;
2542 } else {
2543 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2544 expected_oldest_remote_monotonic_timestamps);
2545 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2546 expected_oldest_local_monotonic_timestamps);
2547 EXPECT_EQ(
2548 oldest_logger_remote_unreliable_monotonic_timestamps,
2549 monotonic_clock::epoch() + chrono::nanoseconds(90150000))
2550 << file;
2551 EXPECT_EQ(
2552 oldest_logger_local_unreliable_monotonic_timestamps,
2553 monotonic_clock::epoch() + chrono::nanoseconds(90250000))
2554 << file;
2555 }
2556 break;
2557 case 1:
2558 if (reliable) {
2559 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2560 monotonic_clock::max_time);
2561 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2562 monotonic_clock::max_time);
2563 EXPECT_EQ(
2564 oldest_logger_remote_unreliable_monotonic_timestamps,
2565 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2566 EXPECT_EQ(
2567 oldest_logger_local_unreliable_monotonic_timestamps,
2568 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2569 } else {
2570 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2571 expected_oldest_remote_monotonic_timestamps);
2572 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2573 expected_oldest_local_monotonic_timestamps);
2574 EXPECT_EQ(
2575 oldest_logger_remote_unreliable_monotonic_timestamps,
2576 monotonic_clock::epoch() + chrono::nanoseconds(1323150000));
2577 EXPECT_EQ(
2578 oldest_logger_local_unreliable_monotonic_timestamps,
2579 monotonic_clock::epoch() + chrono::nanoseconds(10100250000));
2580 }
2581 break;
2582 default:
2583 FAIL();
2584 break;
2585 }
2586
2587 switch (log_header->message().parts_index()) {
2588 case 0:
2589 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2590 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2591 break;
2592 case 1:
2593 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2594 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2595 break;
2596 default:
2597 FAIL();
2598 break;
2599 }
2600 }
2601
2602 continue;
2603 }
2604 EXPECT_EQ(
2605 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2606 monotonic_clock::max_time.time_since_epoch().count());
2607 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2608 monotonic_clock::max_time.time_since_epoch().count());
2609 EXPECT_EQ(log_header->message()
2610 .oldest_remote_unreliable_monotonic_timestamps()
2611 ->Get(0),
2612 monotonic_clock::max_time.time_since_epoch().count());
2613 EXPECT_EQ(log_header->message()
2614 .oldest_local_unreliable_monotonic_timestamps()
2615 ->Get(0),
2616 monotonic_clock::max_time.time_since_epoch().count());
2617
2618 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2619 monotonic_clock::time_point(chrono::nanoseconds(
2620 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2621 1)));
2622 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2623 monotonic_clock::time_point(chrono::nanoseconds(
2624 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2625 const monotonic_clock::time_point
2626 oldest_remote_unreliable_monotonic_timestamps =
2627 monotonic_clock::time_point(chrono::nanoseconds(
2628 log_header->message()
2629 .oldest_remote_unreliable_monotonic_timestamps()
2630 ->Get(1)));
2631 const monotonic_clock::time_point
2632 oldest_local_unreliable_monotonic_timestamps =
2633 monotonic_clock::time_point(chrono::nanoseconds(
2634 log_header->message()
2635 .oldest_local_unreliable_monotonic_timestamps()
2636 ->Get(1)));
2637 switch (log_header->message().parts_index()) {
2638 case 0:
2639 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2640 monotonic_clock::max_time);
2641 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2642 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2643 monotonic_clock::max_time);
2644 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2645 monotonic_clock::max_time);
2646 break;
2647 default:
2648 FAIL();
2649 break;
2650 }
2651 }
2652
2653 if (shared()) {
2654 EXPECT_EQ(timestamp_file_count, 4u);
2655 } else {
2656 EXPECT_EQ(timestamp_file_count, 4u);
2657 }
2658
2659 // Confirm that we can actually sort the resulting log and read it.
2660 {
2661 LogReader reader(SortParts(filenames));
2662
2663 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2664 log_reader_factory.set_send_delay(chrono::microseconds(0));
2665
2666 // This sends out the fetched messages and advances time to the start of
2667 // the log file.
2668 reader.Register(&log_reader_factory);
2669
2670 log_reader_factory.Run();
2671
2672 reader.Deregister();
2673 }
2674}
2675
2676// Tests that we properly handle one direction of message_bridge being
2677// unavailable.
2678TEST_P(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
2679 pi1_->Disconnect(pi2_->node());
2680 time_converter_.AddMonotonic(
2681 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2682
2683 time_converter_.AddMonotonic(
2684 {chrono::milliseconds(10000),
2685 chrono::milliseconds(10000) - chrono::milliseconds(1)});
2686 {
2687 LoggerState pi1_logger = MakeLogger(pi1_);
2688
2689 event_loop_factory_.RunFor(chrono::milliseconds(95));
2690
2691 StartLogger(&pi1_logger);
2692
2693 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2694 }
2695
2696 // Confirm that we can parse the result. LogReader has enough internal CHECKs
2697 // to confirm the right thing happened.
2698 ConfirmReadable(pi1_single_direction_logfiles_);
2699}
2700
2701// Tests that we properly handle one direction of message_bridge being
2702// unavailable.
2703TEST_P(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
2704 pi1_->Disconnect(pi2_->node());
2705 time_converter_.AddMonotonic(
2706 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(500)});
2707
2708 time_converter_.AddMonotonic(
2709 {chrono::milliseconds(10000),
2710 chrono::milliseconds(10000) + chrono::milliseconds(1)});
2711 {
2712 LoggerState pi1_logger = MakeLogger(pi1_);
2713
2714 event_loop_factory_.RunFor(chrono::milliseconds(95));
2715
2716 StartLogger(&pi1_logger);
2717
2718 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2719 }
2720
2721 // Confirm that we can parse the result. LogReader has enough internal CHECKs
2722 // to confirm the right thing happened.
2723 ConfirmReadable(pi1_single_direction_logfiles_);
2724}
2725
2726// Tests that we explode if someone passes in a part file twice with a better
2727// error than an out of order error.
2728TEST_P(MultinodeLoggerTest, DuplicateLogFiles) {
2729 time_converter_.AddMonotonic(
2730 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2731 {
2732 LoggerState pi1_logger = MakeLogger(pi1_);
2733
2734 event_loop_factory_.RunFor(chrono::milliseconds(95));
2735
2736 StartLogger(&pi1_logger);
2737
2738 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2739 }
2740
2741 std::vector<std::string> duplicates;
2742 for (const std::string &f : pi1_single_direction_logfiles_) {
2743 duplicates.emplace_back(f);
2744 duplicates.emplace_back(f);
2745 }
2746 EXPECT_DEATH({ SortParts(duplicates); }, "Found duplicate parts in");
2747}
2748
2749// Tests that we explode if someone loses a part out of the middle of a log.
2750TEST_P(MultinodeLoggerTest, MissingPartsFromMiddle) {
2751 time_converter_.AddMonotonic(
2752 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2753 {
2754 LoggerState pi1_logger = MakeLogger(pi1_);
2755
2756 event_loop_factory_.RunFor(chrono::milliseconds(95));
2757
2758 StartLogger(&pi1_logger);
2759 aos::monotonic_clock::time_point last_rotation_time =
2760 pi1_logger.event_loop->monotonic_now();
2761 pi1_logger.logger->set_on_logged_period([&] {
2762 const auto now = pi1_logger.event_loop->monotonic_now();
2763 if (now > last_rotation_time + std::chrono::seconds(5)) {
2764 pi1_logger.logger->Rotate();
2765 last_rotation_time = now;
2766 }
2767 });
2768
2769 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2770 }
2771
2772 std::vector<std::string> missing_parts;
2773
2774 missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part0" + Extension());
2775 missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part2" + Extension());
2776 missing_parts.emplace_back(absl::StrCat(
2777 logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
2778
2779 EXPECT_DEATH({ SortParts(missing_parts); },
2780 "Broken log, missing part files between");
2781}
2782
2783// Tests that we properly handle a dead node. Do this by just disconnecting it
2784// and only using one nodes of logs.
2785TEST_P(MultinodeLoggerTest, DeadNode) {
2786 pi1_->Disconnect(pi2_->node());
2787 pi2_->Disconnect(pi1_->node());
2788 time_converter_.AddMonotonic(
2789 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2790 {
2791 LoggerState pi1_logger = MakeLogger(pi1_);
2792
2793 event_loop_factory_.RunFor(chrono::milliseconds(95));
2794
2795 StartLogger(&pi1_logger);
2796
2797 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2798 }
2799
2800 // Confirm that we can parse the result. LogReader has enough internal CHECKs
2801 // to confirm the right thing happened.
2802 ConfirmReadable(MakePi1DeadNodeLogfiles());
2803}
2804
2805// Tests that we can relog with a different config. This makes most sense when
2806// you are trying to edit a log and want to use channel renaming + the original
2807// config in the new log.
2808TEST_P(MultinodeLoggerTest, LogDifferentConfig) {
2809 time_converter_.StartEqual();
2810 {
2811 LoggerState pi1_logger = MakeLogger(pi1_);
2812 LoggerState pi2_logger = MakeLogger(pi2_);
2813
2814 event_loop_factory_.RunFor(chrono::milliseconds(95));
2815
2816 StartLogger(&pi1_logger);
2817 StartLogger(&pi2_logger);
2818
2819 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2820 }
2821
2822 LogReader reader(SortParts(logfiles_));
2823 reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
2824
2825 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2826 log_reader_factory.set_send_delay(chrono::microseconds(0));
2827
2828 // This sends out the fetched messages and advances time to the start of the
2829 // log file.
2830 reader.Register(&log_reader_factory);
2831
2832 const Node *pi1 =
2833 configuration::GetNode(log_reader_factory.configuration(), "pi1");
2834 const Node *pi2 =
2835 configuration::GetNode(log_reader_factory.configuration(), "pi2");
2836
2837 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
2838 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
2839 LOG(INFO) << "now pi1 "
2840 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
2841 LOG(INFO) << "now pi2 "
2842 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
2843
2844 EXPECT_THAT(reader.LoggedNodes(),
2845 ::testing::ElementsAre(
2846 configuration::GetNode(reader.logged_configuration(), pi1),
2847 configuration::GetNode(reader.logged_configuration(), pi2)));
2848
2849 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
2850
2851 // And confirm we can re-create a log again, while checking the contents.
2852 std::vector<std::string> log_files;
2853 {
2854 LoggerState pi1_logger =
2855 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
2856 &log_reader_factory, reader.logged_configuration());
2857 LoggerState pi2_logger =
2858 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
2859 &log_reader_factory, reader.logged_configuration());
2860
2861 pi1_logger.StartLogger(tmp_dir_ + "/relogged1");
2862 pi2_logger.StartLogger(tmp_dir_ + "/relogged2");
2863
2864 log_reader_factory.Run();
2865
2866 for (auto &x : pi1_logger.log_namer->all_filenames()) {
2867 log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged1_", x));
2868 }
2869 for (auto &x : pi2_logger.log_namer->all_filenames()) {
2870 log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged2_", x));
2871 }
2872 }
2873
2874 reader.Deregister();
2875
2876 // And verify that we can run the LogReader over the relogged files without
2877 // hitting any fatal errors.
2878 {
2879 LogReader relogged_reader(SortParts(log_files));
2880 relogged_reader.Register();
2881
2882 relogged_reader.event_loop_factory()->Run();
2883 }
2884}
2885
// Tests that we properly replay a log where the start time for a node is before
// any data on the node. This can happen if the logger starts before data is
// published. While the scenario below is a bit convoluted, we have seen logs
// like this generated out in the wild.
//
// Timeline (elapsed = cumulative RunFor() durations):
//   * Timing reports and statistics are disabled, and ping/pong are not yet
//     running.
//   * 1 s: the pi1, pi3 and pi2 (boot 0) loggers start.
//   * 11 s: statistics are re-enabled and ping (pi1) / pong (pi2) start.
//   * 14 s: pi2's boot-0 filenames are recorded, pi2 statistics are disabled,
//     pi1<->pi2 is disconnected, and the boot-0 pi2 logger goes out of scope.
//   * 20 s: pi2 reboots (distributed time, see the time converter below).
//   * 22 s: a new pi2 logger starts on boot 1.
//   * 32 s: pi2 statistics, the pi1<->pi2 connection, pong and ping resume.
//   * 35 s: the remaining loggers go out of scope and the logs are read back.
TEST(MultinodeRebootLoggerTest, StartTimeBeforeData) {
  // Three-node (pi1, pi2, pi3) configuration.
  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(ArtifactPath(
          "aos/events/logging/multinode_pingpong_split3_config.json"));
  message_bridge::TestingTimeConverter time_converter(
      configuration::NodesCount(&config.message()));
  SimulatedEventLoopFactory event_loop_factory(&config.message());
  event_loop_factory.SetTimeConverter(&time_converter);
  NodeEventLoopFactory *const pi1 =
      event_loop_factory.GetNodeEventLoopFactory("pi1");
  const size_t pi1_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi1->node());
  NodeEventLoopFactory *const pi2 =
      event_loop_factory.GetNodeEventLoopFactory("pi2");
  const size_t pi2_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi2->node());
  NodeEventLoopFactory *const pi3 =
      event_loop_factory.GetNodeEventLoopFactory("pi3");
  const size_t pi3_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi3->node());

  // Per-node log directories; pi2 gets one per boot. Clear any leftovers.
  const std::string kLogfile1_1 =
      aos::testing::TestTmpDir() + "/multi_logfile1/";
  const std::string kLogfile2_1 =
      aos::testing::TestTmpDir() + "/multi_logfile2.1/";
  const std::string kLogfile2_2 =
      aos::testing::TestTmpDir() + "/multi_logfile2.2/";
  const std::string kLogfile3_1 =
      aos::testing::TestTmpDir() + "/multi_logfile3/";
  util::UnlinkRecursive(kLogfile1_1);
  util::UnlinkRecursive(kLogfile2_1);
  util::UnlinkRecursive(kLogfile2_2);
  util::UnlinkRecursive(kLogfile3_1);
  const UUID pi1_boot0 = UUID::Random();
  const UUID pi2_boot0 = UUID::Random();
  const UUID pi2_boot1 = UUID::Random();
  const UUID pi3_boot0 = UUID::Random();
  {
    // The vectors passed to AddNextTimestamp below are ordered by these
    // indices.
    CHECK_EQ(pi1_index, 0u);
    CHECK_EQ(pi2_index, 1u);
    CHECK_EQ(pi3_index, 2u);

    // One boot UUID per node boot; pi2 has two boots.
    time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
    time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
    time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
    time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);

    // All three nodes start at their epoch at distributed epoch.
    time_converter.AddNextTimestamp(
        distributed_clock::epoch(),
        {BootTimestamp::epoch(), BootTimestamp::epoch(),
         BootTimestamp::epoch()});
    // At distributed time 20 s, pi1 and pi3 read 20 s, while pi2 is in boot 1
    // with its monotonic clock at 1.323 s.
    const chrono::nanoseconds reboot_time = chrono::milliseconds(20000);
    time_converter.AddNextTimestamp(
        distributed_clock::epoch() + reboot_time,
        {BootTimestamp::epoch() + reboot_time,
         BootTimestamp{
             .boot = 1,
             .time = monotonic_clock::epoch() + chrono::milliseconds(1323)},
         BootTimestamp::epoch() + reboot_time});
  }

  // Make everything perfectly quiet.
  event_loop_factory.SkipTimingReport();
  event_loop_factory.DisableStatistics();

  std::vector<std::string> filenames;
  {
    LoggerState pi1_logger = MakeLoggerState(
        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    LoggerState pi3_logger = MakeLoggerState(
        pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    {
      // Create the boot-0 pi2 logger; all three loggers are started below.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);

      event_loop_factory.RunFor(chrono::milliseconds(1000));

      pi1_logger.StartLogger(kLogfile1_1);
      pi3_logger.StartLogger(kLogfile3_1);
      pi2_logger.StartLogger(kLogfile2_1);

      event_loop_factory.RunFor(chrono::milliseconds(10000));

      // Now that we've got a start time in the past, turn on data.
      event_loop_factory.EnableStatistics();
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());

      pi2->AlwaysStart<Pong>("pong");

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      // Record pi2's boot-0 files while its logger still exists.
      pi2_logger.AppendAllFilenames(&filenames);

      // Stop logging on pi2 before rebooting and completely shut off all
      // messages on pi2. The boot-0 pi2 logger (and ping) are destroyed when
      // this scope closes.
      pi2->DisableStatistics();
      pi1->Disconnect(pi2->node());
      pi2->Disconnect(pi1->node());
    }
    // Runs past the 20 s reboot point configured above.
    event_loop_factory.RunFor(chrono::milliseconds(7000));
    // pi2 now reboots.
    {
      event_loop_factory.RunFor(chrono::milliseconds(1000));

      // Start logging again on pi2 after it is up.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
      pi2_logger.StartLogger(kLogfile2_2);

      event_loop_factory.RunFor(chrono::milliseconds(10000));
      // And, now that we have a start time in the log, turn data back on.
      pi2->EnableStatistics();
      pi1->Connect(pi2->node());
      pi2->Connect(pi1->node());

      pi2->AlwaysStart<Pong>("pong");
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      pi2_logger.AppendAllFilenames(&filenames);
    }

    pi1_logger.AppendAllFilenames(&filenames);
    pi3_logger.AppendAllFilenames(&filenames);
  }

  // Confirm that we can parse the result. LogReader has enough internal CHECKs
  // to confirm the right thing happened.
  // Sorting must succeed on its own; sorted_parts is not used further here.
  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  auto result = ConfirmReadable(filenames);
  // NOTE(review): result appears to be indexed by node (pi1, pi2, pi3), with
  // .first/.second holding realtime start/end times, one entry per boot (pi2
  // has two). Confirm against ConfirmReadable.
  EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
                                                      chrono::seconds(1)));
  EXPECT_THAT(result[0].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(34990350)));

  EXPECT_THAT(result[1].first,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::seconds(1),
                  realtime_clock::epoch() + chrono::microseconds(3323000)));
  EXPECT_THAT(result[1].second,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::microseconds(13990200),
                  realtime_clock::epoch() + chrono::microseconds(16313200)));

  EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch() +
                                                      chrono::seconds(1)));
  EXPECT_THAT(result[2].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(34900150)));
}
3047
// Tests that local data before remote data after reboot is properly replayed.
// We only trigger a reboot in the timestamp interpolation function when solving
// the timestamp problem when we actually have a point in the function. This
// originally only happened when a point passes the noncausal filter. At the
// start of time for the second boot, if we aren't careful, we will have
// messages which need to be published at times before the boot. This happens
// when a local message is in the log before a forwarded message, so there is no
// point in the interpolation function. This delays the reboot. So, we need to
// recreate that situation and make sure it doesn't come back.
//
// Timeline (elapsed = cumulative RunFor() durations):
//   * 0 s: the pi1, pi3 and pi2 (boot 0) loggers start.
//   * 1.005 s: ping (pi1) / pong (pi2) start.
//   * 4.005 s: pi2's boot-0 filenames are recorded, pi1<->pi2 is
//     disconnected, and the boot-0 pi2 logger goes out of scope.
//   * 5 s: pi2 reboots; its boot-1 monotonic clock reads 105 s at that point.
//   * 6 s: pong and ping start again while pi2 is disconnected and unlogged.
//   * 7.005 s: a new pi2 logger starts, then pi1<->pi2 is reconnected.
//   * 11.005 s: the remaining loggers go out of scope and the logs are read.
TEST(MultinodeRebootLoggerTest,
     LocalMessageBeforeRemoteBeforeStartAfterReboot) {
  // Three-node (pi1, pi2, pi3) configuration.
  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(ArtifactPath(
          "aos/events/logging/multinode_pingpong_split3_config.json"));
  message_bridge::TestingTimeConverter time_converter(
      configuration::NodesCount(&config.message()));
  SimulatedEventLoopFactory event_loop_factory(&config.message());
  event_loop_factory.SetTimeConverter(&time_converter);
  NodeEventLoopFactory *const pi1 =
      event_loop_factory.GetNodeEventLoopFactory("pi1");
  const size_t pi1_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi1->node());
  NodeEventLoopFactory *const pi2 =
      event_loop_factory.GetNodeEventLoopFactory("pi2");
  const size_t pi2_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi2->node());
  NodeEventLoopFactory *const pi3 =
      event_loop_factory.GetNodeEventLoopFactory("pi3");
  const size_t pi3_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi3->node());

  // Per-node log directories; pi2 gets one per boot. Clear any leftovers.
  const std::string kLogfile1_1 =
      aos::testing::TestTmpDir() + "/multi_logfile1/";
  const std::string kLogfile2_1 =
      aos::testing::TestTmpDir() + "/multi_logfile2.1/";
  const std::string kLogfile2_2 =
      aos::testing::TestTmpDir() + "/multi_logfile2.2/";
  const std::string kLogfile3_1 =
      aos::testing::TestTmpDir() + "/multi_logfile3/";
  util::UnlinkRecursive(kLogfile1_1);
  util::UnlinkRecursive(kLogfile2_1);
  util::UnlinkRecursive(kLogfile2_2);
  util::UnlinkRecursive(kLogfile3_1);
  const UUID pi1_boot0 = UUID::Random();
  const UUID pi2_boot0 = UUID::Random();
  const UUID pi2_boot1 = UUID::Random();
  const UUID pi3_boot0 = UUID::Random();
  {
    // The vectors passed to AddNextTimestamp below are ordered by these
    // indices.
    CHECK_EQ(pi1_index, 0u);
    CHECK_EQ(pi2_index, 1u);
    CHECK_EQ(pi3_index, 2u);

    // One boot UUID per node boot; pi2 has two boots.
    time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
    time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
    time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
    time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);

    // All three nodes start at their epoch at distributed epoch.
    time_converter.AddNextTimestamp(
        distributed_clock::epoch(),
        {BootTimestamp::epoch(), BootTimestamp::epoch(),
         BootTimestamp::epoch()});
    // At distributed time 5 s, pi1 and pi3 read 5 s, while pi2 is in boot 1
    // with its monotonic clock at 5 s + 100 s = 105 s.
    const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
    time_converter.AddNextTimestamp(
        distributed_clock::epoch() + reboot_time,
        {BootTimestamp::epoch() + reboot_time,
         BootTimestamp{.boot = 1,
                       .time = monotonic_clock::epoch() + reboot_time +
                               chrono::seconds(100)},
         BootTimestamp::epoch() + reboot_time});
  }

  std::vector<std::string> filenames;
  {
    LoggerState pi1_logger = MakeLoggerState(
        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    LoggerState pi3_logger = MakeLoggerState(
        pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    {
      // Create the boot-0 pi2 logger, then start all three loggers at once.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);

      pi1_logger.StartLogger(kLogfile1_1);
      pi3_logger.StartLogger(kLogfile3_1);
      pi2_logger.StartLogger(kLogfile2_1);

      event_loop_factory.RunFor(chrono::milliseconds(1005));

      // Now that we've got a start time in the past, turn on data.
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());

      pi2->AlwaysStart<Pong>("pong");

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      // Record pi2's boot-0 files while its logger still exists.
      pi2_logger.AppendAllFilenames(&filenames);

      // Disable any remote messages on pi2. The boot-0 pi2 logger (and ping)
      // are destroyed when this scope closes.
      pi1->Disconnect(pi2->node());
      pi2->Disconnect(pi1->node());
    }
    // Brings elapsed time to exactly the 5 s reboot point configured above.
    event_loop_factory.RunFor(chrono::milliseconds(995));
    // pi2 now reboots at 5 seconds.
    {
      event_loop_factory.RunFor(chrono::milliseconds(1000));

      // Make local stuff happen before we start logging and connect the remote.
      pi2->AlwaysStart<Pong>("pong");
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());
      event_loop_factory.RunFor(chrono::milliseconds(1005));

      // Start logging again on pi2 after it is up.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
      pi2_logger.StartLogger(kLogfile2_2);

      // And allow remote messages now that we have some local ones.
      pi1->Connect(pi2->node());
      pi2->Connect(pi1->node());

      event_loop_factory.RunFor(chrono::milliseconds(1000));

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      pi2_logger.AppendAllFilenames(&filenames);
    }

    pi1_logger.AppendAllFilenames(&filenames);
    pi3_logger.AppendAllFilenames(&filenames);
  }

  // Confirm that we can parse the result. LogReader has enough internal CHECKs
  // to confirm the right thing happened.
  // Sorting must succeed on its own; sorted_parts is not used further here.
  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  auto result = ConfirmReadable(filenames);

  // NOTE(review): result appears to be indexed by node (pi1, pi2, pi3), with
  // .first/.second holding realtime start/end times, one entry per boot (pi2
  // has two). Confirm against ConfirmReadable.
  EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
  EXPECT_THAT(result[0].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(11000350)));

  EXPECT_THAT(result[1].first,
              ::testing::ElementsAre(
                  realtime_clock::epoch(),
                  realtime_clock::epoch() + chrono::microseconds(107005000)));
  EXPECT_THAT(result[1].second,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::microseconds(4000150),
                  realtime_clock::epoch() + chrono::microseconds(111000200)));

  EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
  EXPECT_THAT(result[2].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(11000150)));

  // Replay again, restricted to the realtime window [2 s, 3 s]. Every node is
  // then expected to report exactly that window.
  auto start_stop_result = ConfirmReadable(
      filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
      realtime_clock::epoch() + chrono::milliseconds(3000));

  EXPECT_THAT(
      start_stop_result[0].first,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
  EXPECT_THAT(
      start_stop_result[0].second,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
  EXPECT_THAT(
      start_stop_result[1].first,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
  EXPECT_THAT(
      start_stop_result[1].second,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
  EXPECT_THAT(
      start_stop_result[2].first,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
  EXPECT_THAT(
      start_stop_result[2].second,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
}
3230
// Tests that setting the start and stop flags across a reboot works as
// expected.
//
// Three nodes log to separate directories. The time converter reboots pi2 at
// distributed time 5s while pi1 and pi3 stay on their first boot.
// - pi1 and pi3 log continuously from t=0 until their loggers go out of scope
//   at t=11.005s.
// - pi2 logs its first boot from t=0 until its logger is destroyed at
//   t=4.005s.
// - pi2 logs its second boot into a second directory starting at t=6.005s,
//   for 5s.
// The logs are then read twice: once with no bounds, and once with realtime
// bounds of 2s and 8s (presumably the replay start/stop times). The bounded
// read should clip each node's outermost times to that window while still
// showing pi2's two boots.
TEST(MultinodeRebootLoggerTest, RebootStartStopTimes) {
  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(ArtifactPath(
          "aos/events/logging/multinode_pingpong_split3_config.json"));
  message_bridge::TestingTimeConverter time_converter(
      configuration::NodesCount(&config.message()));
  SimulatedEventLoopFactory event_loop_factory(&config.message());
  event_loop_factory.SetTimeConverter(&time_converter);
  NodeEventLoopFactory *const pi1 =
      event_loop_factory.GetNodeEventLoopFactory("pi1");
  const size_t pi1_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi1->node());
  NodeEventLoopFactory *const pi2 =
      event_loop_factory.GetNodeEventLoopFactory("pi2");
  const size_t pi2_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi2->node());
  NodeEventLoopFactory *const pi3 =
      event_loop_factory.GetNodeEventLoopFactory("pi3");
  const size_t pi3_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi3->node());

  // One directory per node, plus a second directory for pi2's second boot.
  const std::string kLogfile1_1 =
      aos::testing::TestTmpDir() + "/multi_logfile1/";
  const std::string kLogfile2_1 =
      aos::testing::TestTmpDir() + "/multi_logfile2.1/";
  const std::string kLogfile2_2 =
      aos::testing::TestTmpDir() + "/multi_logfile2.2/";
  const std::string kLogfile3_1 =
      aos::testing::TestTmpDir() + "/multi_logfile3/";
  util::UnlinkRecursive(kLogfile1_1);
  util::UnlinkRecursive(kLogfile2_1);
  util::UnlinkRecursive(kLogfile2_2);
  util::UnlinkRecursive(kLogfile3_1);
  {
    // The boot timestamp lists below are written in pi1, pi2, pi3 order,
    // presumably indexed by node index; pin that assumption.
    CHECK_EQ(pi1_index, 0u);
    CHECK_EQ(pi2_index, 1u);
    CHECK_EQ(pi3_index, 2u);

    time_converter.AddNextTimestamp(
        distributed_clock::epoch(),
        {BootTimestamp::epoch(), BootTimestamp::epoch(),
         BootTimestamp::epoch()});
    // At distributed 5s, pi2 is on boot 1 while pi1 and pi3 stay on boot 0.
    const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
    time_converter.AddNextTimestamp(
        distributed_clock::epoch() + reboot_time,
        {BootTimestamp::epoch() + reboot_time,
         BootTimestamp{.boot = 1,
                       .time = monotonic_clock::epoch() + reboot_time},
         BootTimestamp::epoch() + reboot_time});
  }

  std::vector<std::string> filenames;
  {
    // pi1 and pi3 loggers live for the whole run; pi2 gets one logger per boot.
    LoggerState pi1_logger = MakeLoggerState(
        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    LoggerState pi3_logger = MakeLoggerState(
        pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    {
      // And now start the logger.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);

      pi1_logger.StartLogger(kLogfile1_1);
      pi3_logger.StartLogger(kLogfile3_1);
      pi2_logger.StartLogger(kLogfile2_1);

      event_loop_factory.RunFor(chrono::milliseconds(1005));

      // Now that we've got a start time in the past, turn on data.
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());

      pi2->AlwaysStart<Pong>("pong");

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      // t=4.005s: pi2's first-boot logger (and pi1's ping) end with this scope.
      pi2_logger.AppendAllFilenames(&filenames);
    }
    event_loop_factory.RunFor(chrono::milliseconds(995));
    // pi2 now reboots at 5 seconds.
    {
      event_loop_factory.RunFor(chrono::milliseconds(1000));

      // Make local stuff happen before we start logging and connect the remote.
      // NOTE(review): pong was already registered with AlwaysStart above; if
      // AlwaysStart re-launches on every boot, as its name suggests, this adds
      // a second Pong on pi2's second boot. TODO confirm that is intended.
      pi2->AlwaysStart<Pong>("pong");
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());
      event_loop_factory.RunFor(chrono::milliseconds(5));

      // Start logging again on pi2 after it is up.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
      pi2_logger.StartLogger(kLogfile2_2);

      event_loop_factory.RunFor(chrono::milliseconds(5000));

      pi2_logger.AppendAllFilenames(&filenames);
    }

    pi1_logger.AppendAllFilenames(&filenames);
    pi3_logger.AppendAllFilenames(&filenames);
  }

  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  // result[i] appears to be a (start times, end times) pair for node i, with
  // one entry per boot.
  auto result = ConfirmReadable(filenames);

  EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
  EXPECT_THAT(result[0].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(11000350)));

  // pi2 has two boots, so two start and two end times.
  EXPECT_THAT(result[1].first,
              ::testing::ElementsAre(
                  realtime_clock::epoch(),
                  realtime_clock::epoch() + chrono::microseconds(6005000)));
  EXPECT_THAT(result[1].second,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::microseconds(4900150),
                  realtime_clock::epoch() + chrono::microseconds(11000200)));

  EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
  EXPECT_THAT(result[2].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(11000150)));

  // Confirm we observed the correct start and stop times. We should see the
  // reboot here.
  auto start_stop_result = ConfirmReadable(
      filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
      realtime_clock::epoch() + chrono::milliseconds(8000));

  EXPECT_THAT(
      start_stop_result[0].first,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
  EXPECT_THAT(
      start_stop_result[0].second,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
  // Only pi2's outermost times are clipped; the reboot boundary is unchanged.
  EXPECT_THAT(start_stop_result[1].first,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::seconds(2),
                  realtime_clock::epoch() + chrono::microseconds(6005000)));
  EXPECT_THAT(start_stop_result[1].second,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::microseconds(4900150),
                  realtime_clock::epoch() + chrono::seconds(8)));
  EXPECT_THAT(
      start_stop_result[2].first,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
  EXPECT_THAT(
      start_stop_result[2].second,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
}
3387
// Tests that we properly handle one direction being down.
//
// Setup and timeline:
// - pi2 is disconnected from pi1 (pi2->Disconnect(pi1->node())) before
//   anything runs.
// - pi1 runs Ping and pi2 runs Pong.
// - pi2 starts logging at t=95ms, and pi1 reboots at distributed time 5s.
// - At t=6.095s the link is reconnected and pi1 starts logging too.
// - Both then log for another 5s.
// The combined logs must sort and replay.
TEST(MissingDirectionTest, OneDirection) {
  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(ArtifactPath(
          "aos/events/logging/multinode_pingpong_split4_config.json"));
  message_bridge::TestingTimeConverter time_converter(
      configuration::NodesCount(&config.message()));
  SimulatedEventLoopFactory event_loop_factory(&config.message());
  event_loop_factory.SetTimeConverter(&time_converter);

  NodeEventLoopFactory *const pi1 =
      event_loop_factory.GetNodeEventLoopFactory("pi1");
  const size_t pi1_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi1->node());
  NodeEventLoopFactory *const pi2 =
      event_loop_factory.GetNodeEventLoopFactory("pi2");
  const size_t pi2_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi2->node());
  std::vector<std::string> filenames;

  {
    // The boot timestamp lists below are written in pi1, pi2 order.
    CHECK_EQ(pi1_index, 0u);
    CHECK_EQ(pi2_index, 1u);

    time_converter.AddNextTimestamp(
        distributed_clock::epoch(),
        {BootTimestamp::epoch(), BootTimestamp::epoch()});

    // At distributed 5s pi1 starts boot 1 at its monotonic epoch; pi2 keeps
    // running its first boot.
    const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
    time_converter.AddNextTimestamp(
        distributed_clock::epoch() + reboot_time,
        {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
         BootTimestamp::epoch() + reboot_time});
  }

  const std::string kLogfile2_1 =
      aos::testing::TestTmpDir() + "/multi_logfile2.1/";
  const std::string kLogfile1_1 =
      aos::testing::TestTmpDir() + "/multi_logfile1.1/";
  util::UnlinkRecursive(kLogfile2_1);
  util::UnlinkRecursive(kLogfile1_1);

  // Take one direction down before any data flows.
  pi2->Disconnect(pi1->node());

  pi1->AlwaysStart<Ping>("ping");
  pi2->AlwaysStart<Pong>("pong");

  {
    LoggerState pi2_logger = MakeLoggerState(
        pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);

    event_loop_factory.RunFor(chrono::milliseconds(95));

    pi2_logger.StartLogger(kLogfile2_1);

    // Runs across pi1's reboot at 5s.
    event_loop_factory.RunFor(chrono::milliseconds(6000));

    pi2->Connect(pi1->node());

    LoggerState pi1_logger = MakeLoggerState(
        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    pi1_logger.StartLogger(kLogfile1_1);

    event_loop_factory.RunFor(chrono::milliseconds(5000));
    pi1_logger.AppendAllFilenames(&filenames);
    pi2_logger.AppendAllFilenames(&filenames);
  }

  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  ConfirmReadable(filenames);
}
3459
3460// Tests that we properly handle only one direction ever existing after a
3461// reboot.
3462TEST(MissingDirectionTest, OneDirectionAfterReboot) {
3463 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3464 aos::configuration::ReadConfig(ArtifactPath(
3465 "aos/events/logging/multinode_pingpong_split4_config.json"));
3466 message_bridge::TestingTimeConverter time_converter(
3467 configuration::NodesCount(&config.message()));
3468 SimulatedEventLoopFactory event_loop_factory(&config.message());
3469 event_loop_factory.SetTimeConverter(&time_converter);
3470
3471 NodeEventLoopFactory *const pi1 =
3472 event_loop_factory.GetNodeEventLoopFactory("pi1");
3473 const size_t pi1_index = configuration::GetNodeIndex(
3474 event_loop_factory.configuration(), pi1->node());
3475 NodeEventLoopFactory *const pi2 =
3476 event_loop_factory.GetNodeEventLoopFactory("pi2");
3477 const size_t pi2_index = configuration::GetNodeIndex(
3478 event_loop_factory.configuration(), pi2->node());
3479 std::vector<std::string> filenames;
3480
3481 {
3482 CHECK_EQ(pi1_index, 0u);
3483 CHECK_EQ(pi2_index, 1u);
3484
3485 time_converter.AddNextTimestamp(
3486 distributed_clock::epoch(),
3487 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3488
3489 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3490 time_converter.AddNextTimestamp(
3491 distributed_clock::epoch() + reboot_time,
3492 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3493 BootTimestamp::epoch() + reboot_time});
3494 }
3495
3496 const std::string kLogfile2_1 =
3497 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3498 util::UnlinkRecursive(kLogfile2_1);
3499
3500 pi1->AlwaysStart<Ping>("ping");
3501
3502 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3503 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3504 // second boot.
3505 {
3506 LoggerState pi2_logger = MakeLoggerState(
3507 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3508
3509 event_loop_factory.RunFor(chrono::milliseconds(95));
3510
3511 pi2_logger.StartLogger(kLogfile2_1);
3512
3513 event_loop_factory.RunFor(chrono::milliseconds(4000));
3514
3515 pi2->Disconnect(pi1->node());
3516
3517 event_loop_factory.RunFor(chrono::milliseconds(1000));
3518 pi1->AlwaysStart<Ping>("ping");
3519
3520 event_loop_factory.RunFor(chrono::milliseconds(5000));
3521 pi2_logger.AppendAllFilenames(&filenames);
3522 }
3523
3524 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3525 ConfirmReadable(filenames);
3526}
3527
3528// Tests that we properly handle only one direction ever existing after a reboot
3529// with only reliable data.
3530TEST(MissingDirectionTest, OneDirectionAfterRebootReliable) {
3531 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3532 aos::configuration::ReadConfig(ArtifactPath(
3533 "aos/events/logging/multinode_pingpong_split4_reliable_config.json"));
3534 message_bridge::TestingTimeConverter time_converter(
3535 configuration::NodesCount(&config.message()));
3536 SimulatedEventLoopFactory event_loop_factory(&config.message());
3537 event_loop_factory.SetTimeConverter(&time_converter);
3538
3539 NodeEventLoopFactory *const pi1 =
3540 event_loop_factory.GetNodeEventLoopFactory("pi1");
3541 const size_t pi1_index = configuration::GetNodeIndex(
3542 event_loop_factory.configuration(), pi1->node());
3543 NodeEventLoopFactory *const pi2 =
3544 event_loop_factory.GetNodeEventLoopFactory("pi2");
3545 const size_t pi2_index = configuration::GetNodeIndex(
3546 event_loop_factory.configuration(), pi2->node());
3547 std::vector<std::string> filenames;
3548
3549 {
3550 CHECK_EQ(pi1_index, 0u);
3551 CHECK_EQ(pi2_index, 1u);
3552
3553 time_converter.AddNextTimestamp(
3554 distributed_clock::epoch(),
3555 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3556
3557 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3558 time_converter.AddNextTimestamp(
3559 distributed_clock::epoch() + reboot_time,
3560 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3561 BootTimestamp::epoch() + reboot_time});
3562 }
3563
3564 const std::string kLogfile2_1 =
3565 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3566 util::UnlinkRecursive(kLogfile2_1);
3567
3568 pi1->AlwaysStart<Ping>("ping");
3569
3570 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3571 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3572 // second boot.
3573 {
3574 LoggerState pi2_logger = MakeLoggerState(
3575 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3576
3577 event_loop_factory.RunFor(chrono::milliseconds(95));
3578
3579 pi2_logger.StartLogger(kLogfile2_1);
3580
3581 event_loop_factory.RunFor(chrono::milliseconds(4000));
3582
3583 pi2->Disconnect(pi1->node());
3584
3585 event_loop_factory.RunFor(chrono::milliseconds(1000));
3586 pi1->AlwaysStart<Ping>("ping");
3587
3588 event_loop_factory.RunFor(chrono::milliseconds(5000));
3589 pi2_logger.AppendAllFilenames(&filenames);
3590 }
3591
3592 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3593 ConfirmReadable(filenames);
3594}
3595
3596// Tests that we properly handle what used to be a time violation in one
3597// direction. This can occur when one direction goes down after sending some
3598// data, but the other keeps working. The down direction ends up resolving to a
3599// straight line in the noncausal filter, where the direction which is still up
3600// can cross that line. Really, time progressed along just fine but we assumed
3601// that the offset was a line when it could have deviated by up to 1ms/second.
3602TEST_P(MultinodeLoggerTest, OneDirectionTimeDrift) {
3603 std::vector<std::string> filenames;
3604
3605 CHECK_EQ(pi1_index_, 0u);
3606 CHECK_EQ(pi2_index_, 1u);
3607
3608 time_converter_.AddNextTimestamp(
3609 distributed_clock::epoch(),
3610 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3611
3612 const chrono::nanoseconds before_disconnect_duration =
3613 time_converter_.AddMonotonic(
3614 {chrono::milliseconds(1000), chrono::milliseconds(1000)});
3615
3616 const chrono::nanoseconds test_duration =
3617 time_converter_.AddMonotonic(
3618 {chrono::milliseconds(1000), chrono::milliseconds(1000)}) +
3619 time_converter_.AddMonotonic(
3620 {chrono::milliseconds(10000),
3621 chrono::milliseconds(10000) - chrono::milliseconds(5)}) +
3622 time_converter_.AddMonotonic(
3623 {chrono::milliseconds(10000),
3624 chrono::milliseconds(10000) + chrono::milliseconds(5)});
3625
3626 const std::string kLogfile =
3627 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3628 util::UnlinkRecursive(kLogfile);
3629
3630 {
3631 LoggerState pi2_logger = MakeLogger(pi2_);
3632 pi2_logger.StartLogger(kLogfile);
3633 event_loop_factory_.RunFor(before_disconnect_duration);
3634
3635 pi2_->Disconnect(pi1_->node());
3636
3637 event_loop_factory_.RunFor(test_duration);
3638 pi2_->Connect(pi1_->node());
3639
3640 event_loop_factory_.RunFor(chrono::milliseconds(5000));
3641 pi2_logger.AppendAllFilenames(&filenames);
3642 }
3643
3644 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3645 ConfirmReadable(filenames);
3646}
3647
// Tests that we can replay a logfile that has timestamps such that at least one
// node's epoch is at a positive distributed_clock (and thus will have to be
// booted after the other node(s)).
//
// RebootAt(0, 1s) reboots pi1 (node 0) at distributed time 1s. pi1 and pi2 are
// disconnected in both directions until then, and only pi2 logs. Presumably
// the log therefore only sees pi1's second boot, whose epoch is at a positive
// distributed time. On replay, pi1 must not start before its monotonic epoch.
TEST_P(MultinodeLoggerTest, StartOneNodeBeforeOther) {
  std::vector<std::string> filenames;

  CHECK_EQ(pi1_index_, 0u);
  CHECK_EQ(pi2_index_, 1u);

  time_converter_.AddNextTimestamp(
      distributed_clock::epoch(),
      {BootTimestamp::epoch(), BootTimestamp::epoch()});

  // Reboot node 0 (pi1) at distributed 1s.
  const chrono::nanoseconds before_reboot_duration = chrono::milliseconds(1000);
  time_converter_.RebootAt(
      0, distributed_clock::time_point(before_reboot_duration));

  const chrono::nanoseconds test_duration = time_converter_.AddMonotonic(
      {chrono::milliseconds(10000), chrono::milliseconds(10000)});

  const std::string kLogfile =
      aos::testing::TestTmpDir() + "/multi_logfile2.1/";
  util::UnlinkRecursive(kLogfile);

  // Keep both directions down for pi1's first boot.
  pi2_->Disconnect(pi1_->node());
  pi1_->Disconnect(pi2_->node());

  {
    LoggerState pi2_logger = MakeLogger(pi2_);

    pi2_logger.StartLogger(kLogfile);
    event_loop_factory_.RunFor(before_reboot_duration);

    // pi1 has just rebooted; reconnect both directions.
    pi2_->Connect(pi1_->node());
    pi1_->Connect(pi2_->node());

    event_loop_factory_.RunFor(test_duration);

    pi2_logger.AppendAllFilenames(&filenames);
  }

  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  ConfirmReadable(filenames);

  // Replay the log and check pi1's boot time from both a startup callback and
  // an event loop's OnRun.
  {
    LogReader reader(sorted_parts);
    SimulatedEventLoopFactory replay_factory(reader.configuration());
    reader.RegisterWithoutStarting(&replay_factory);

    NodeEventLoopFactory *const replay_node =
        reader.event_loop_factory()->GetNodeEventLoopFactory("pi1");

    std::unique_ptr<EventLoop> test_event_loop =
        replay_node->MakeEventLoop("test_reader");
    replay_node->OnStartup([replay_node]() {
      // Check that we didn't boot until at least t=0.
      CHECK_LE(monotonic_clock::epoch(), replay_node->monotonic_now());
    });
    test_event_loop->OnRun([&test_event_loop]() {
      // Check that we didn't boot until at least t=0.
      EXPECT_LE(monotonic_clock::epoch(), test_event_loop->monotonic_now());
    });
    reader.event_loop_factory()->Run();
    reader.Deregister();
  }
}
3714
3715// Tests that when we have a loop without all the logs at all points in time, we
3716// can sort it properly.
3717TEST(MultinodeLoggerLoopTest, Loop) {
3718 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3719 aos::configuration::ReadConfig(ArtifactPath(
3720 "aos/events/logging/multinode_pingpong_triangle_split_config.json"));
3721 message_bridge::TestingTimeConverter time_converter(
3722 configuration::NodesCount(&config.message()));
3723 SimulatedEventLoopFactory event_loop_factory(&config.message());
3724 event_loop_factory.SetTimeConverter(&time_converter);
3725
3726 NodeEventLoopFactory *const pi1 =
3727 event_loop_factory.GetNodeEventLoopFactory("pi1");
3728 NodeEventLoopFactory *const pi2 =
3729 event_loop_factory.GetNodeEventLoopFactory("pi2");
3730 NodeEventLoopFactory *const pi3 =
3731 event_loop_factory.GetNodeEventLoopFactory("pi3");
3732
3733 const std::string kLogfile1_1 =
3734 aos::testing::TestTmpDir() + "/multi_logfile1/";
3735 const std::string kLogfile2_1 =
3736 aos::testing::TestTmpDir() + "/multi_logfile2/";
3737 const std::string kLogfile3_1 =
3738 aos::testing::TestTmpDir() + "/multi_logfile3/";
3739 util::UnlinkRecursive(kLogfile1_1);
3740 util::UnlinkRecursive(kLogfile2_1);
3741 util::UnlinkRecursive(kLogfile3_1);
3742
3743 {
3744 // Make pi1 boot before everything else.
3745 time_converter.AddNextTimestamp(
3746 distributed_clock::epoch(),
3747 {BootTimestamp::epoch(),
3748 BootTimestamp::epoch() - chrono::milliseconds(100),
3749 BootTimestamp::epoch() - chrono::milliseconds(300)});
3750 }
3751
3752 // We want to setup a situation such that 2 of the 3 legs of the loop are very
3753 // confident about time being X, and the third leg is pulling the average off
3754 // to one side.
3755 //
3756 // It's easiest to visualize this in timestamp_plotter.
3757
3758 std::vector<std::string> filenames;
3759 {
3760 // Have pi1 send out a reliable message at startup. This sets up a long
3761 // forwarding time message at the start to bias time.
3762 std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
3763 {
3764 aos::Sender<examples::Ping> ping_sender =
3765 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
3766
3767 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
3768 examples::Ping::Builder ping_builder =
3769 builder.MakeBuilder<examples::Ping>();
3770 CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
3771 }
3772
3773 // Wait a while so there's enough data to let the worst case be rather off.
3774 event_loop_factory.RunFor(chrono::seconds(1000));
3775
3776 // Now start a receiving node first. This sets up 2 tight bounds between 2
3777 // of the nodes.
3778 LoggerState pi2_logger = MakeLoggerState(
3779 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3780 pi2_logger.StartLogger(kLogfile2_1);
3781
3782 event_loop_factory.RunFor(chrono::seconds(100));
3783
3784 // And now start the third leg.
3785 LoggerState pi3_logger = MakeLoggerState(
3786 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3787 pi3_logger.StartLogger(kLogfile3_1);
3788
3789 LoggerState pi1_logger = MakeLoggerState(
3790 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3791 pi1_logger.StartLogger(kLogfile1_1);
3792
3793 event_loop_factory.RunFor(chrono::seconds(100));
3794
3795 pi1_logger.AppendAllFilenames(&filenames);
3796 pi2_logger.AppendAllFilenames(&filenames);
3797 pi3_logger.AppendAllFilenames(&filenames);
3798 }
3799
3800 // Make sure we can read this.
3801 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3802 auto result = ConfirmReadable(filenames);
3803}
3804
3805} // namespace testing
3806} // namespace logger
3807} // namespace aos