blob: d9d04b28aec82b9bd7e02f2ad105a49340681872 [file] [log] [blame]
Brian Smartte67d7112023-03-20 12:06:30 -07001#include <algorithm>
2
Naman Guptaa63aa132023-03-22 20:06:34 -07003#include "aos/events/logging/log_reader.h"
4#include "aos/events/logging/multinode_logger_test_lib.h"
5#include "aos/events/message_counter.h"
6#include "aos/events/ping_lib.h"
7#include "aos/events/pong_lib.h"
8#include "aos/network/remote_message_generated.h"
9#include "aos/network/timestamp_generated.h"
10#include "aos/testing/tmpdir.h"
11#include "gmock/gmock.h"
12#include "gtest/gtest.h"
13
14namespace aos {
15namespace logger {
16namespace testing {
17
18namespace chrono = std::chrono;
19using aos::message_bridge::RemoteMessage;
20using aos::testing::ArtifactPath;
21using aos::testing::MessageCounter;
22
Naman Guptaa63aa132023-03-22 20:06:34 -070023INSTANTIATE_TEST_SUITE_P(
24 All, MultinodeLoggerTest,
25 ::testing::Combine(
26 ::testing::Values(
27 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080028 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070029 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080030 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070031 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
32
33INSTANTIATE_TEST_SUITE_P(
34 All, MultinodeLoggerDeathTest,
35 ::testing::Combine(
36 ::testing::Values(
37 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080038 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070039 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080040 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070041 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
42
43// Tests that we can write and read simple multi-node log files.
44TEST_P(MultinodeLoggerTest, SimpleMultiNode) {
45 std::vector<std::string> actual_filenames;
46 time_converter_.StartEqual();
47
48 {
49 LoggerState pi1_logger = MakeLogger(pi1_);
50 LoggerState pi2_logger = MakeLogger(pi2_);
51
52 event_loop_factory_.RunFor(chrono::milliseconds(95));
53
54 StartLogger(&pi1_logger);
55 StartLogger(&pi2_logger);
56
57 event_loop_factory_.RunFor(chrono::milliseconds(20000));
58 pi1_logger.AppendAllFilenames(&actual_filenames);
59 pi2_logger.AppendAllFilenames(&actual_filenames);
60 }
61
62 ASSERT_THAT(actual_filenames,
63 ::testing::UnorderedElementsAreArray(logfiles_));
64
65 {
66 std::set<std::string> logfile_uuids;
67 std::set<std::string> parts_uuids;
68 // Confirm that we have the expected number of UUIDs for both the logfile
69 // UUIDs and parts UUIDs.
70 std::vector<SizePrefixedFlatbufferVector<LogFileHeader>> log_header;
71 for (std::string_view f : logfiles_) {
72 log_header.emplace_back(ReadHeader(f).value());
73 if (!log_header.back().message().has_configuration()) {
74 logfile_uuids.insert(
75 log_header.back().message().log_event_uuid()->str());
76 parts_uuids.insert(log_header.back().message().parts_uuid()->str());
77 }
78 }
79
80 EXPECT_EQ(logfile_uuids.size(), 2u);
81 if (shared()) {
82 EXPECT_EQ(parts_uuids.size(), 7u);
83 } else {
84 EXPECT_EQ(parts_uuids.size(), 8u);
85 }
86
87 // And confirm everything is on the correct node.
88 EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi1");
89 EXPECT_EQ(log_header[3].message().node()->name()->string_view(), "pi1");
90 EXPECT_EQ(log_header[4].message().node()->name()->string_view(), "pi1");
91
92 EXPECT_EQ(log_header[5].message().node()->name()->string_view(), "pi2");
93 EXPECT_EQ(log_header[6].message().node()->name()->string_view(), "pi2");
94
95 EXPECT_EQ(log_header[7].message().node()->name()->string_view(), "pi2");
96 EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi2");
97 EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi2");
98
99 EXPECT_EQ(log_header[10].message().node()->name()->string_view(), "pi1");
100 EXPECT_EQ(log_header[11].message().node()->name()->string_view(), "pi1");
101
102 EXPECT_EQ(log_header[12].message().node()->name()->string_view(), "pi2");
103 EXPECT_EQ(log_header[13].message().node()->name()->string_view(), "pi2");
104
105 if (shared()) {
106 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
107 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi2");
108 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi2");
109
110 EXPECT_EQ(log_header[17].message().node()->name()->string_view(), "pi1");
111 EXPECT_EQ(log_header[18].message().node()->name()->string_view(), "pi1");
112 } else {
113 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
114 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi2");
115
116 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi1");
117 EXPECT_EQ(log_header[17].message().node()->name()->string_view(), "pi1");
118
119 EXPECT_EQ(log_header[18].message().node()->name()->string_view(), "pi2");
120 EXPECT_EQ(log_header[19].message().node()->name()->string_view(), "pi2");
121 }
122
123 // And the parts index matches.
124 EXPECT_EQ(log_header[2].message().parts_index(), 0);
125 EXPECT_EQ(log_header[3].message().parts_index(), 1);
126 EXPECT_EQ(log_header[4].message().parts_index(), 2);
127
128 EXPECT_EQ(log_header[5].message().parts_index(), 0);
129 EXPECT_EQ(log_header[6].message().parts_index(), 1);
130
131 EXPECT_EQ(log_header[7].message().parts_index(), 0);
132 EXPECT_EQ(log_header[8].message().parts_index(), 1);
133 EXPECT_EQ(log_header[9].message().parts_index(), 2);
134
135 EXPECT_EQ(log_header[10].message().parts_index(), 0);
136 EXPECT_EQ(log_header[11].message().parts_index(), 1);
137
138 EXPECT_EQ(log_header[12].message().parts_index(), 0);
139 EXPECT_EQ(log_header[13].message().parts_index(), 1);
140
141 if (shared()) {
142 EXPECT_EQ(log_header[14].message().parts_index(), 0);
143 EXPECT_EQ(log_header[15].message().parts_index(), 1);
144 EXPECT_EQ(log_header[16].message().parts_index(), 2);
145
146 EXPECT_EQ(log_header[17].message().parts_index(), 0);
147 EXPECT_EQ(log_header[18].message().parts_index(), 1);
148 } else {
149 EXPECT_EQ(log_header[14].message().parts_index(), 0);
150 EXPECT_EQ(log_header[15].message().parts_index(), 1);
151
152 EXPECT_EQ(log_header[16].message().parts_index(), 0);
153 EXPECT_EQ(log_header[17].message().parts_index(), 1);
154
155 EXPECT_EQ(log_header[18].message().parts_index(), 0);
156 EXPECT_EQ(log_header[19].message().parts_index(), 1);
157 }
158 }
159
160 const std::vector<LogFile> sorted_log_files = SortParts(logfiles_);
161 {
162 using ::testing::UnorderedElementsAre;
163 std::shared_ptr<const aos::Configuration> config =
164 sorted_log_files[0].config;
165
166 // Timing reports, pings
167 EXPECT_THAT(CountChannelsData(config, logfiles_[2]),
168 UnorderedElementsAre(
169 std::make_tuple("/pi1/aos",
170 "aos.message_bridge.ServerStatistics", 1),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700171 std::make_tuple("/test", "aos.examples.Ping", 1),
172 std::make_tuple("/pi1/aos", "aos.examples.Ping", 1)))
Naman Guptaa63aa132023-03-22 20:06:34 -0700173 << " : " << logfiles_[2];
174 {
175 std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700176 std::make_tuple("/pi1/aos", "aos.examples.Ping", 10),
Naman Guptaa63aa132023-03-22 20:06:34 -0700177 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 1),
178 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
179 1)};
180 if (!std::get<0>(GetParam()).shared) {
181 channel_counts.push_back(
182 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
183 "aos-message_bridge-Timestamp",
184 "aos.message_bridge.RemoteMessage", 1));
185 }
186 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
187 ::testing::UnorderedElementsAreArray(channel_counts))
188 << " : " << logfiles_[3];
189 }
190 {
191 std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700192 std::make_tuple("/pi1/aos", "aos.examples.Ping", 1990),
Naman Guptaa63aa132023-03-22 20:06:34 -0700193 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 199),
194 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
195 20),
196 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
197 199),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700198 std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
Naman Guptaa63aa132023-03-22 20:06:34 -0700199 std::make_tuple("/test", "aos.examples.Ping", 2000)};
200 if (!std::get<0>(GetParam()).shared) {
201 channel_counts.push_back(
202 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
203 "aos-message_bridge-Timestamp",
204 "aos.message_bridge.RemoteMessage", 199));
205 }
206 EXPECT_THAT(CountChannelsData(config, logfiles_[4]),
207 ::testing::UnorderedElementsAreArray(channel_counts))
208 << " : " << logfiles_[4];
209 }
210 // Timestamps for pong
211 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
212 UnorderedElementsAre())
213 << " : " << logfiles_[2];
214 EXPECT_THAT(
215 CountChannelsTimestamp(config, logfiles_[3]),
216 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
217 << " : " << logfiles_[3];
218 EXPECT_THAT(
219 CountChannelsTimestamp(config, logfiles_[4]),
220 UnorderedElementsAre(
221 std::make_tuple("/test", "aos.examples.Pong", 2000),
222 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)))
223 << " : " << logfiles_[4];
224
225 // Pong data.
226 EXPECT_THAT(
227 CountChannelsData(config, logfiles_[5]),
228 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 91)))
229 << " : " << logfiles_[5];
230 EXPECT_THAT(CountChannelsData(config, logfiles_[6]),
231 UnorderedElementsAre(
232 std::make_tuple("/test", "aos.examples.Pong", 1910)))
233 << " : " << logfiles_[6];
234
235 // No timestamps
236 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
237 UnorderedElementsAre())
238 << " : " << logfiles_[5];
239 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[6]),
240 UnorderedElementsAre())
241 << " : " << logfiles_[6];
242
243 // Timing reports and pongs.
244 EXPECT_THAT(CountChannelsData(config, logfiles_[7]),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700245 UnorderedElementsAre(
246 std::make_tuple("/pi2/aos", "aos.examples.Ping", 1),
247 std::make_tuple("/pi2/aos",
248 "aos.message_bridge.ServerStatistics", 1)))
Naman Guptaa63aa132023-03-22 20:06:34 -0700249 << " : " << logfiles_[7];
250 EXPECT_THAT(
251 CountChannelsData(config, logfiles_[8]),
252 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
253 << " : " << logfiles_[8];
254 EXPECT_THAT(
255 CountChannelsData(config, logfiles_[9]),
256 UnorderedElementsAre(
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700257 std::make_tuple("/pi2/aos", "aos.examples.Ping", 2000),
Naman Guptaa63aa132023-03-22 20:06:34 -0700258 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
259 std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
260 20),
261 std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
262 200),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700263 std::make_tuple("/pi2/aos", "aos.timing.Report", 60),
Naman Guptaa63aa132023-03-22 20:06:34 -0700264 std::make_tuple("/test", "aos.examples.Pong", 2000)))
265 << " : " << logfiles_[9];
266 // And ping timestamps.
267 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[7]),
268 UnorderedElementsAre())
269 << " : " << logfiles_[7];
270 EXPECT_THAT(
271 CountChannelsTimestamp(config, logfiles_[8]),
272 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
273 << " : " << logfiles_[8];
274 EXPECT_THAT(
275 CountChannelsTimestamp(config, logfiles_[9]),
276 UnorderedElementsAre(
277 std::make_tuple("/test", "aos.examples.Ping", 2000),
278 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)))
279 << " : " << logfiles_[9];
280
281 // And then test that the remotely logged timestamp data files only have
282 // timestamps in them.
283 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[10]),
284 UnorderedElementsAre())
285 << " : " << logfiles_[10];
286 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[11]),
287 UnorderedElementsAre())
288 << " : " << logfiles_[11];
289 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[12]),
290 UnorderedElementsAre())
291 << " : " << logfiles_[12];
292 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[13]),
293 UnorderedElementsAre())
294 << " : " << logfiles_[13];
295
296 EXPECT_THAT(CountChannelsData(config, logfiles_[10]),
297 UnorderedElementsAre(std::make_tuple(
298 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
299 << " : " << logfiles_[10];
300 EXPECT_THAT(CountChannelsData(config, logfiles_[11]),
301 UnorderedElementsAre(std::make_tuple(
302 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
303 << " : " << logfiles_[11];
304
305 EXPECT_THAT(CountChannelsData(config, logfiles_[12]),
306 UnorderedElementsAre(std::make_tuple(
307 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
308 << " : " << logfiles_[12];
309 EXPECT_THAT(CountChannelsData(config, logfiles_[13]),
310 UnorderedElementsAre(std::make_tuple(
311 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
312 << " : " << logfiles_[13];
313
314 // Timestamps from pi2 on pi1, and the other way.
315 if (shared()) {
316 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
317 UnorderedElementsAre())
318 << " : " << logfiles_[14];
319 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
320 UnorderedElementsAre())
321 << " : " << logfiles_[15];
322 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
323 UnorderedElementsAre())
324 << " : " << logfiles_[16];
325 EXPECT_THAT(CountChannelsData(config, logfiles_[17]),
326 UnorderedElementsAre())
327 << " : " << logfiles_[17];
328 EXPECT_THAT(CountChannelsData(config, logfiles_[18]),
329 UnorderedElementsAre())
330 << " : " << logfiles_[18];
331
332 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[14]),
333 UnorderedElementsAre(
334 std::make_tuple("/test", "aos.examples.Ping", 1)))
335 << " : " << logfiles_[14];
336 EXPECT_THAT(
337 CountChannelsTimestamp(config, logfiles_[15]),
338 UnorderedElementsAre(
339 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 9),
340 std::make_tuple("/test", "aos.examples.Ping", 90)))
341 << " : " << logfiles_[15];
342 EXPECT_THAT(
343 CountChannelsTimestamp(config, logfiles_[16]),
344 UnorderedElementsAre(
345 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 191),
346 std::make_tuple("/test", "aos.examples.Ping", 1910)))
347 << " : " << logfiles_[16];
348 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[17]),
349 UnorderedElementsAre(std::make_tuple(
350 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
351 << " : " << logfiles_[17];
352 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[18]),
353 UnorderedElementsAre(std::make_tuple(
354 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
355 << " : " << logfiles_[18];
356 } else {
357 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
358 UnorderedElementsAre())
359 << " : " << logfiles_[14];
360 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
361 UnorderedElementsAre())
362 << " : " << logfiles_[15];
363 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
364 UnorderedElementsAre())
365 << " : " << logfiles_[16];
366 EXPECT_THAT(CountChannelsData(config, logfiles_[17]),
367 UnorderedElementsAre())
368 << " : " << logfiles_[17];
369 EXPECT_THAT(CountChannelsData(config, logfiles_[18]),
370 UnorderedElementsAre())
371 << " : " << logfiles_[18];
372 EXPECT_THAT(CountChannelsData(config, logfiles_[19]),
373 UnorderedElementsAre())
374 << " : " << logfiles_[19];
375
376 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[14]),
377 UnorderedElementsAre(std::make_tuple(
378 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
379 << " : " << logfiles_[14];
380 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[15]),
381 UnorderedElementsAre(std::make_tuple(
382 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
383 << " : " << logfiles_[15];
384 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[16]),
385 UnorderedElementsAre(std::make_tuple(
386 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
387 << " : " << logfiles_[16];
388 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[17]),
389 UnorderedElementsAre(std::make_tuple(
390 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
391 << " : " << logfiles_[17];
392 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[18]),
393 UnorderedElementsAre(
394 std::make_tuple("/test", "aos.examples.Ping", 91)))
395 << " : " << logfiles_[18];
396 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[19]),
397 UnorderedElementsAre(
398 std::make_tuple("/test", "aos.examples.Ping", 1910)))
399 << " : " << logfiles_[19];
400 }
401 }
402
403 LogReader reader(sorted_log_files);
404
405 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
406 log_reader_factory.set_send_delay(chrono::microseconds(0));
407
408 // This sends out the fetched messages and advances time to the start of the
409 // log file.
410 reader.Register(&log_reader_factory);
411
412 const Node *pi1 =
413 configuration::GetNode(log_reader_factory.configuration(), "pi1");
414 const Node *pi2 =
415 configuration::GetNode(log_reader_factory.configuration(), "pi2");
416
417 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
418 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
419 LOG(INFO) << "now pi1 "
420 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
421 LOG(INFO) << "now pi2 "
422 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
423
424 EXPECT_THAT(reader.LoggedNodes(),
425 ::testing::ElementsAre(
426 configuration::GetNode(reader.logged_configuration(), pi1),
427 configuration::GetNode(reader.logged_configuration(), pi2)));
428
429 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
430
431 std::unique_ptr<EventLoop> pi1_event_loop =
432 log_reader_factory.MakeEventLoop("test", pi1);
433 std::unique_ptr<EventLoop> pi2_event_loop =
434 log_reader_factory.MakeEventLoop("test", pi2);
435
436 int pi1_ping_count = 10;
437 int pi2_ping_count = 10;
438 int pi1_pong_count = 10;
439 int pi2_pong_count = 10;
440
441 // Confirm that the ping value matches.
442 pi1_event_loop->MakeWatcher(
443 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
444 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping) << " at "
445 << pi1_event_loop->context().monotonic_remote_time << " -> "
446 << pi1_event_loop->context().monotonic_event_time;
447 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
448 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
449 pi1_ping_count * chrono::milliseconds(10) +
450 monotonic_clock::epoch());
451 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
452 pi1_ping_count * chrono::milliseconds(10) +
453 realtime_clock::epoch());
454 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
455 pi1_event_loop->context().monotonic_event_time);
456 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
457 pi1_event_loop->context().realtime_event_time);
458
459 ++pi1_ping_count;
460 });
461 pi2_event_loop->MakeWatcher(
462 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
463 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping) << " at "
464 << pi2_event_loop->context().monotonic_remote_time << " -> "
465 << pi2_event_loop->context().monotonic_event_time;
466 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
467
468 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
469 pi2_ping_count * chrono::milliseconds(10) +
470 monotonic_clock::epoch());
471 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
472 pi2_ping_count * chrono::milliseconds(10) +
473 realtime_clock::epoch());
474 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time +
475 chrono::microseconds(150),
476 pi2_event_loop->context().monotonic_event_time);
477 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time +
478 chrono::microseconds(150),
479 pi2_event_loop->context().realtime_event_time);
480 ++pi2_ping_count;
481 });
482
483 constexpr ssize_t kQueueIndexOffset = -9;
484 // Confirm that the ping and pong counts both match, and the value also
485 // matches.
486 pi1_event_loop->MakeWatcher(
487 "/test", [&pi1_event_loop, &pi1_ping_count,
488 &pi1_pong_count](const examples::Pong &pong) {
489 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
490 << pi1_event_loop->context().monotonic_remote_time << " -> "
491 << pi1_event_loop->context().monotonic_event_time;
492
493 EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
494 pi1_pong_count + kQueueIndexOffset);
495 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
496 chrono::microseconds(200) +
497 pi1_pong_count * chrono::milliseconds(10) +
498 monotonic_clock::epoch());
499 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
500 chrono::microseconds(200) +
501 pi1_pong_count * chrono::milliseconds(10) +
502 realtime_clock::epoch());
503
504 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
505 chrono::microseconds(150),
506 pi1_event_loop->context().monotonic_event_time);
507 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
508 chrono::microseconds(150),
509 pi1_event_loop->context().realtime_event_time);
510
511 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
512 ++pi1_pong_count;
513 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
514 });
515 pi2_event_loop->MakeWatcher(
516 "/test", [&pi2_event_loop, &pi2_ping_count,
517 &pi2_pong_count](const examples::Pong &pong) {
518 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
519 << pi2_event_loop->context().monotonic_remote_time << " -> "
520 << pi2_event_loop->context().monotonic_event_time;
521
522 EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
523 pi2_pong_count + kQueueIndexOffset);
524
525 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
526 chrono::microseconds(200) +
527 pi2_pong_count * chrono::milliseconds(10) +
528 monotonic_clock::epoch());
529 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
530 chrono::microseconds(200) +
531 pi2_pong_count * chrono::milliseconds(10) +
532 realtime_clock::epoch());
533
534 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
535 pi2_event_loop->context().monotonic_event_time);
536 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
537 pi2_event_loop->context().realtime_event_time);
538
539 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
540 ++pi2_pong_count;
541 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
542 });
543
544 log_reader_factory.Run();
545 EXPECT_EQ(pi1_ping_count, 2010);
546 EXPECT_EQ(pi2_ping_count, 2010);
547 EXPECT_EQ(pi1_pong_count, 2010);
548 EXPECT_EQ(pi2_pong_count, 2010);
549
550 reader.Deregister();
551}
552
553// Test that if we feed the replay with a mismatched node list that we die on
554// the LogReader constructor.
555TEST_P(MultinodeLoggerDeathTest, MultiNodeBadReplayConfig) {
556 time_converter_.StartEqual();
557 {
558 LoggerState pi1_logger = MakeLogger(pi1_);
559 LoggerState pi2_logger = MakeLogger(pi2_);
560
561 event_loop_factory_.RunFor(chrono::milliseconds(95));
562
563 StartLogger(&pi1_logger);
564 StartLogger(&pi2_logger);
565
566 event_loop_factory_.RunFor(chrono::milliseconds(20000));
567 }
568
569 // Test that, if we add an additional node to the replay config that the
570 // logger complains about the mismatch in number of nodes.
571 FlatbufferDetachedBuffer<Configuration> extra_nodes_config =
572 configuration::MergeWithConfig(&config_.message(), R"({
573 "nodes": [
574 {
575 "name": "extra-node"
576 }
577 ]
578 }
579 )");
580
581 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
582 EXPECT_DEATH(LogReader(sorted_parts, &extra_nodes_config.message()),
583 "Log file and replay config need to have matching nodes lists.");
584}
585
586// Tests that we can read log files where they don't start at the same monotonic
587// time.
588TEST_P(MultinodeLoggerTest, StaggeredStart) {
589 time_converter_.StartEqual();
590 std::vector<std::string> actual_filenames;
591
592 {
593 LoggerState pi1_logger = MakeLogger(pi1_);
594 LoggerState pi2_logger = MakeLogger(pi2_);
595
596 event_loop_factory_.RunFor(chrono::milliseconds(95));
597
598 StartLogger(&pi1_logger);
599
600 event_loop_factory_.RunFor(chrono::milliseconds(200));
601
602 StartLogger(&pi2_logger);
603
604 event_loop_factory_.RunFor(chrono::milliseconds(20000));
605 pi1_logger.AppendAllFilenames(&actual_filenames);
606 pi2_logger.AppendAllFilenames(&actual_filenames);
607 }
608
609 // Since we delay starting pi2, it already knows about all the timestamps so
610 // we don't end up with extra parts.
611 LogReader reader(SortParts(actual_filenames));
612
613 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
614 log_reader_factory.set_send_delay(chrono::microseconds(0));
615
616 // This sends out the fetched messages and advances time to the start of the
617 // log file.
618 reader.Register(&log_reader_factory);
619
620 const Node *pi1 =
621 configuration::GetNode(log_reader_factory.configuration(), "pi1");
622 const Node *pi2 =
623 configuration::GetNode(log_reader_factory.configuration(), "pi2");
624
625 EXPECT_THAT(reader.LoggedNodes(),
626 ::testing::ElementsAre(
627 configuration::GetNode(reader.logged_configuration(), pi1),
628 configuration::GetNode(reader.logged_configuration(), pi2)));
629
630 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
631
632 std::unique_ptr<EventLoop> pi1_event_loop =
633 log_reader_factory.MakeEventLoop("test", pi1);
634 std::unique_ptr<EventLoop> pi2_event_loop =
635 log_reader_factory.MakeEventLoop("test", pi2);
636
637 int pi1_ping_count = 30;
638 int pi2_ping_count = 30;
639 int pi1_pong_count = 30;
640 int pi2_pong_count = 30;
641
642 // Confirm that the ping value matches.
643 pi1_event_loop->MakeWatcher(
644 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
645 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
646 << pi1_event_loop->context().monotonic_remote_time << " -> "
647 << pi1_event_loop->context().monotonic_event_time;
648 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
649
650 ++pi1_ping_count;
651 });
652 pi2_event_loop->MakeWatcher(
653 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
654 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
655 << pi2_event_loop->context().monotonic_remote_time << " -> "
656 << pi2_event_loop->context().monotonic_event_time;
657 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
658
659 ++pi2_ping_count;
660 });
661
662 // Confirm that the ping and pong counts both match, and the value also
663 // matches.
664 pi1_event_loop->MakeWatcher(
665 "/test", [&pi1_event_loop, &pi1_ping_count,
666 &pi1_pong_count](const examples::Pong &pong) {
667 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
668 << pi1_event_loop->context().monotonic_remote_time << " -> "
669 << pi1_event_loop->context().monotonic_event_time;
670
671 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
672 ++pi1_pong_count;
673 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
674 });
675 pi2_event_loop->MakeWatcher(
676 "/test", [&pi2_event_loop, &pi2_ping_count,
677 &pi2_pong_count](const examples::Pong &pong) {
678 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
679 << pi2_event_loop->context().monotonic_remote_time << " -> "
680 << pi2_event_loop->context().monotonic_event_time;
681
682 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
683 ++pi2_pong_count;
684 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
685 });
686
687 log_reader_factory.Run();
688 EXPECT_EQ(pi1_ping_count, 2030);
689 EXPECT_EQ(pi2_ping_count, 2030);
690 EXPECT_EQ(pi1_pong_count, 2030);
691 EXPECT_EQ(pi2_pong_count, 2030);
692
693 reader.Deregister();
694}
695
696// Tests that we can read log files where the monotonic clocks drift and don't
697// match correctly. While we are here, also test that different ending times
698// also is readable.
699TEST_P(MultinodeLoggerTest, MismatchedClocks) {
700 // TODO(austin): Negate...
701 const chrono::nanoseconds initial_pi2_offset = chrono::seconds(1000);
702
703 time_converter_.AddMonotonic(
704 {BootTimestamp::epoch(), BootTimestamp::epoch() + initial_pi2_offset});
705 // Wait for 95 ms, (~0.1 seconds - 1/2 of the ping/pong period), and set the
706 // skew to be 200 uS/s
707 const chrono::nanoseconds startup_sleep1 = time_converter_.AddMonotonic(
708 {chrono::milliseconds(95),
709 chrono::milliseconds(95) - chrono::nanoseconds(200) * 95});
710 // Run another 200 ms to have one logger start first.
711 const chrono::nanoseconds startup_sleep2 = time_converter_.AddMonotonic(
712 {chrono::milliseconds(200), chrono::milliseconds(200)});
713 // Slew one way then the other at the same 200 uS/S slew rate. Make sure we
714 // go far enough to cause problems if this isn't accounted for.
715 const chrono::nanoseconds logger_run1 = time_converter_.AddMonotonic(
716 {chrono::milliseconds(20000),
717 chrono::milliseconds(20000) - chrono::nanoseconds(200) * 20000});
718 const chrono::nanoseconds logger_run2 = time_converter_.AddMonotonic(
719 {chrono::milliseconds(40000),
720 chrono::milliseconds(40000) + chrono::nanoseconds(200) * 40000});
721 const chrono::nanoseconds logger_run3 = time_converter_.AddMonotonic(
722 {chrono::milliseconds(400), chrono::milliseconds(400)});
723
724 {
725 LoggerState pi2_logger = MakeLogger(pi2_);
726
727 LOG(INFO) << "pi2 times: " << pi2_->monotonic_now() << " "
728 << pi2_->realtime_now() << " distributed "
729 << pi2_->ToDistributedClock(pi2_->monotonic_now());
730
731 LOG(INFO) << "pi2_ times: " << pi2_->monotonic_now() << " "
732 << pi2_->realtime_now() << " distributed "
733 << pi2_->ToDistributedClock(pi2_->monotonic_now());
734
735 event_loop_factory_.RunFor(startup_sleep1);
736
737 StartLogger(&pi2_logger);
738
739 event_loop_factory_.RunFor(startup_sleep2);
740
741 {
742 // Run pi1's logger for only part of the time.
743 LoggerState pi1_logger = MakeLogger(pi1_);
744
745 StartLogger(&pi1_logger);
746 event_loop_factory_.RunFor(logger_run1);
747
748 // Make sure we slewed time far enough so that the difference is greater
749 // than the network delay. This confirms that if we sort incorrectly, it
750 // would show in the results.
751 EXPECT_LT(
752 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
753 -event_loop_factory_.send_delay() -
754 event_loop_factory_.network_delay());
755
756 event_loop_factory_.RunFor(logger_run2);
757
758 // And now check that we went far enough the other way to make sure we
759 // cover both problems.
760 EXPECT_GT(
761 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
762 event_loop_factory_.send_delay() +
763 event_loop_factory_.network_delay());
764 }
765
766 // And log a bit more on pi2.
767 event_loop_factory_.RunFor(logger_run3);
768 }
769
770 LogReader reader(
771 SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 3, 2)));
772
773 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
774 log_reader_factory.set_send_delay(chrono::microseconds(0));
775
776 const Node *pi1 =
777 configuration::GetNode(log_reader_factory.configuration(), "pi1");
778 const Node *pi2 =
779 configuration::GetNode(log_reader_factory.configuration(), "pi2");
780
781 // This sends out the fetched messages and advances time to the start of the
782 // log file.
783 reader.Register(&log_reader_factory);
784
785 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
786 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
787 LOG(INFO) << "now pi1 "
788 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
789 LOG(INFO) << "now pi2 "
790 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
791
792 LOG(INFO) << "Done registering (pi1) "
793 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now()
794 << " "
795 << log_reader_factory.GetNodeEventLoopFactory(pi1)->realtime_now();
796 LOG(INFO) << "Done registering (pi2) "
797 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now()
798 << " "
799 << log_reader_factory.GetNodeEventLoopFactory(pi2)->realtime_now();
800
801 EXPECT_THAT(reader.LoggedNodes(),
802 ::testing::ElementsAre(
803 configuration::GetNode(reader.logged_configuration(), pi1),
804 configuration::GetNode(reader.logged_configuration(), pi2)));
805
806 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
807
808 std::unique_ptr<EventLoop> pi1_event_loop =
809 log_reader_factory.MakeEventLoop("test", pi1);
810 std::unique_ptr<EventLoop> pi2_event_loop =
811 log_reader_factory.MakeEventLoop("test", pi2);
812
813 int pi1_ping_count = 30;
814 int pi2_ping_count = 30;
815 int pi1_pong_count = 30;
816 int pi2_pong_count = 30;
817
818 // Confirm that the ping value matches.
819 pi1_event_loop->MakeWatcher(
820 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
821 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
822 << pi1_event_loop->context().monotonic_remote_time << " -> "
823 << pi1_event_loop->context().monotonic_event_time;
824 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
825
826 ++pi1_ping_count;
827 });
828 pi2_event_loop->MakeWatcher(
829 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
830 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
831 << pi2_event_loop->context().monotonic_remote_time << " -> "
832 << pi2_event_loop->context().monotonic_event_time;
833 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
834
835 ++pi2_ping_count;
836 });
837
838 // Confirm that the ping and pong counts both match, and the value also
839 // matches.
840 pi1_event_loop->MakeWatcher(
841 "/test", [&pi1_event_loop, &pi1_ping_count,
842 &pi1_pong_count](const examples::Pong &pong) {
843 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
844 << pi1_event_loop->context().monotonic_remote_time << " -> "
845 << pi1_event_loop->context().monotonic_event_time;
846
847 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
848 ++pi1_pong_count;
849 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
850 });
851 pi2_event_loop->MakeWatcher(
852 "/test", [&pi2_event_loop, &pi2_ping_count,
853 &pi2_pong_count](const examples::Pong &pong) {
854 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
855 << pi2_event_loop->context().monotonic_remote_time << " -> "
856 << pi2_event_loop->context().monotonic_event_time;
857
858 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
859 ++pi2_pong_count;
860 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
861 });
862
863 log_reader_factory.Run();
864 EXPECT_EQ(pi1_ping_count, 6030);
865 EXPECT_EQ(pi2_ping_count, 6030);
866 EXPECT_EQ(pi1_pong_count, 6030);
867 EXPECT_EQ(pi2_pong_count, 6030);
868
869 reader.Deregister();
870}
871
872// Tests that we can sort a bunch of parts into the pre-determined sorted parts.
873TEST_P(MultinodeLoggerTest, SortParts) {
874 time_converter_.StartEqual();
875 // Make a bunch of parts.
876 {
877 LoggerState pi1_logger = MakeLogger(pi1_);
878 LoggerState pi2_logger = MakeLogger(pi2_);
879
880 event_loop_factory_.RunFor(chrono::milliseconds(95));
881
882 StartLogger(&pi1_logger);
883 StartLogger(&pi2_logger);
884
885 event_loop_factory_.RunFor(chrono::milliseconds(2000));
886 }
887
888 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
889 VerifyParts(sorted_parts);
890}
891
892// Tests that we can sort a bunch of parts with an empty part. We should ignore
893// it and remove it from the sorted list.
894TEST_P(MultinodeLoggerTest, SortEmptyParts) {
895 time_converter_.StartEqual();
896 // Make a bunch of parts.
897 {
898 LoggerState pi1_logger = MakeLogger(pi1_);
899 LoggerState pi2_logger = MakeLogger(pi2_);
900
901 event_loop_factory_.RunFor(chrono::milliseconds(95));
902
903 StartLogger(&pi1_logger);
904 StartLogger(&pi2_logger);
905
906 event_loop_factory_.RunFor(chrono::milliseconds(2000));
907 }
908
909 // TODO(austin): Should we flip out if the file can't open?
910 const std::string kEmptyFile("foobarinvalidfiledoesnotexist" + Extension());
911
912 aos::util::WriteStringToFileOrDie(kEmptyFile, "");
913 logfiles_.emplace_back(kEmptyFile);
914
915 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
916 VerifyParts(sorted_parts, {kEmptyFile});
917}
918
919// Tests that we can sort a bunch of parts with the end missing off a
920// file. We should use the part we can read.
921TEST_P(MultinodeLoggerTest, SortTruncatedParts) {
922 std::vector<std::string> actual_filenames;
923 time_converter_.StartEqual();
924 // Make a bunch of parts.
925 {
926 LoggerState pi1_logger = MakeLogger(pi1_);
927 LoggerState pi2_logger = MakeLogger(pi2_);
928
929 event_loop_factory_.RunFor(chrono::milliseconds(95));
930
931 StartLogger(&pi1_logger);
932 StartLogger(&pi2_logger);
933
934 event_loop_factory_.RunFor(chrono::milliseconds(2000));
935
936 pi1_logger.AppendAllFilenames(&actual_filenames);
937 pi2_logger.AppendAllFilenames(&actual_filenames);
938 }
939
940 ASSERT_THAT(actual_filenames,
941 ::testing::UnorderedElementsAreArray(logfiles_));
942
943 // Strip off the end of one of the files. Pick one with a lot of data.
944 // For snappy, needs to have enough data to be >1 chunk of compressed data so
945 // that we don't corrupt the entire log part.
946 ::std::string compressed_contents =
947 aos::util::ReadFileToStringOrDie(logfiles_[4]);
948
949 aos::util::WriteStringToFileOrDie(
950 logfiles_[4],
951 compressed_contents.substr(0, compressed_contents.size() - 100));
952
953 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
954 VerifyParts(sorted_parts);
955}
956
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700957// Tests that if we remap a logged channel, it shows up correctly.
Naman Guptaa63aa132023-03-22 20:06:34 -0700958TEST_P(MultinodeLoggerTest, RemapLoggedChannel) {
959 time_converter_.StartEqual();
960 {
961 LoggerState pi1_logger = MakeLogger(pi1_);
962 LoggerState pi2_logger = MakeLogger(pi2_);
963
964 event_loop_factory_.RunFor(chrono::milliseconds(95));
965
966 StartLogger(&pi1_logger);
967 StartLogger(&pi2_logger);
968
969 event_loop_factory_.RunFor(chrono::milliseconds(20000));
970 }
971
972 LogReader reader(SortParts(logfiles_));
973
974 // Remap just on pi1.
975 reader.RemapLoggedChannel<aos::timing::Report>(
976 "/aos", configuration::GetNode(reader.configuration(), "pi1"));
977
978 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
979 log_reader_factory.set_send_delay(chrono::microseconds(0));
980
981 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
982 // Note: An extra channel gets remapped automatically due to a timestamp
983 // channel being LOCAL_LOGGER'd.
984 ASSERT_EQ(remapped_channels.size(), std::get<0>(GetParam()).shared ? 1u : 2u);
985 EXPECT_EQ(remapped_channels[0]->name()->string_view(), "/original/pi1/aos");
986 EXPECT_EQ(remapped_channels[0]->type()->string_view(), "aos.timing.Report");
987 if (!std::get<0>(GetParam()).shared) {
988 EXPECT_EQ(remapped_channels[1]->name()->string_view(),
989 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
990 "aos-message_bridge-Timestamp");
991 EXPECT_EQ(remapped_channels[1]->type()->string_view(),
992 "aos.message_bridge.RemoteMessage");
993 }
994
995 reader.Register(&log_reader_factory);
996
997 const Node *pi1 =
998 configuration::GetNode(log_reader_factory.configuration(), "pi1");
999 const Node *pi2 =
1000 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1001
1002 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1003 // else should have moved.
1004 std::unique_ptr<EventLoop> pi1_event_loop =
1005 log_reader_factory.MakeEventLoop("test", pi1);
1006 pi1_event_loop->SkipTimingReport();
1007 std::unique_ptr<EventLoop> full_pi1_event_loop =
1008 log_reader_factory.MakeEventLoop("test", pi1);
1009 full_pi1_event_loop->SkipTimingReport();
1010 std::unique_ptr<EventLoop> pi2_event_loop =
1011 log_reader_factory.MakeEventLoop("test", pi2);
1012 pi2_event_loop->SkipTimingReport();
1013
1014 MessageCounter<aos::timing::Report> pi1_timing_report(pi1_event_loop.get(),
1015 "/aos");
1016 MessageCounter<aos::timing::Report> full_pi1_timing_report(
1017 full_pi1_event_loop.get(), "/pi1/aos");
1018 MessageCounter<aos::timing::Report> pi1_original_timing_report(
1019 pi1_event_loop.get(), "/original/aos");
1020 MessageCounter<aos::timing::Report> full_pi1_original_timing_report(
1021 full_pi1_event_loop.get(), "/original/pi1/aos");
1022 MessageCounter<aos::timing::Report> pi2_timing_report(pi2_event_loop.get(),
1023 "/aos");
1024
1025 log_reader_factory.Run();
1026
1027 EXPECT_EQ(pi1_timing_report.count(), 0u);
1028 EXPECT_EQ(full_pi1_timing_report.count(), 0u);
1029 EXPECT_NE(pi1_original_timing_report.count(), 0u);
1030 EXPECT_NE(full_pi1_original_timing_report.count(), 0u);
1031 EXPECT_NE(pi2_timing_report.count(), 0u);
1032
1033 reader.Deregister();
1034}
1035
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001036// Tests that if we rename a logged channel, it shows up correctly.
1037TEST_P(MultinodeLoggerTest, RenameLoggedChannel) {
1038 std::vector<std::string> actual_filenames;
1039 time_converter_.StartEqual();
1040 {
1041 LoggerState pi1_logger = MakeLogger(pi1_);
1042 LoggerState pi2_logger = MakeLogger(pi2_);
1043
1044 event_loop_factory_.RunFor(chrono::milliseconds(95));
1045
1046 StartLogger(&pi1_logger);
1047 StartLogger(&pi2_logger);
1048
1049 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1050
1051 pi1_logger.AppendAllFilenames(&actual_filenames);
1052 pi2_logger.AppendAllFilenames(&actual_filenames);
1053 }
1054
1055 LogReader reader(SortParts(actual_filenames));
1056
1057 // Rename just on pi2. Add some global maps just to verify they get added in
1058 // the config and used correctly.
1059 std::vector<MapT> maps;
1060 {
1061 MapT map;
1062 map.match = std::make_unique<ChannelT>();
1063 map.match->name = "/foo*";
1064 map.match->source_node = "pi1";
1065 map.rename = std::make_unique<ChannelT>();
1066 map.rename->name = "/pi1/foo";
1067 maps.emplace_back(std::move(map));
1068 }
1069 {
1070 MapT map;
1071 map.match = std::make_unique<ChannelT>();
1072 map.match->name = "/foo*";
1073 map.match->source_node = "pi2";
1074 map.rename = std::make_unique<ChannelT>();
1075 map.rename->name = "/pi2/foo";
1076 maps.emplace_back(std::move(map));
1077 }
1078 {
1079 MapT map;
1080 map.match = std::make_unique<ChannelT>();
1081 map.match->name = "/foo";
1082 map.match->type = "aos.examples.Ping";
1083 map.rename = std::make_unique<ChannelT>();
1084 map.rename->name = "/foo/renamed";
1085 maps.emplace_back(std::move(map));
1086 }
1087 reader.RenameLoggedChannel<aos::examples::Ping>(
1088 "/aos", configuration::GetNode(reader.configuration(), "pi2"),
1089 "/pi2/foo/renamed", maps);
1090
1091 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1092 log_reader_factory.set_send_delay(chrono::microseconds(0));
1093
1094 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1095 // Note: An extra channel gets remapped automatically due to a timestamp
1096 // channel being LOCAL_LOGGER'd.
1097 const bool shared = std::get<0>(GetParam()).shared;
1098 ASSERT_EQ(remapped_channels.size(), shared ? 1u : 2u);
1099 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->name()->string_view(),
1100 "/pi2/foo/renamed");
1101 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->type()->string_view(),
1102 "aos.examples.Ping");
1103 if (!shared) {
1104 EXPECT_EQ(remapped_channels[0]->name()->string_view(),
1105 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1106 "aos-message_bridge-Timestamp");
1107 EXPECT_EQ(remapped_channels[0]->type()->string_view(),
1108 "aos.message_bridge.RemoteMessage");
1109 }
1110
1111 reader.Register(&log_reader_factory);
1112
1113 const Node *pi1 =
1114 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1115 const Node *pi2 =
1116 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1117
1118 // Confirm we can read the data on the renamed channel, just for pi2. Nothing
1119 // else should have moved.
1120 std::unique_ptr<EventLoop> pi2_event_loop =
1121 log_reader_factory.MakeEventLoop("test", pi2);
1122 pi2_event_loop->SkipTimingReport();
1123 std::unique_ptr<EventLoop> full_pi2_event_loop =
1124 log_reader_factory.MakeEventLoop("test", pi2);
1125 full_pi2_event_loop->SkipTimingReport();
1126 std::unique_ptr<EventLoop> pi1_event_loop =
1127 log_reader_factory.MakeEventLoop("test", pi1);
1128 pi1_event_loop->SkipTimingReport();
1129
1130 MessageCounter<aos::examples::Ping> pi2_ping(pi2_event_loop.get(), "/aos");
1131 MessageCounter<aos::examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1132 "/foo");
1133 MessageCounter<aos::examples::Ping> full_pi2_renamed_ping(
1134 full_pi2_event_loop.get(), "/pi2/foo/renamed");
1135 MessageCounter<aos::examples::Ping> pi1_ping(pi1_event_loop.get(), "/aos");
1136
1137 log_reader_factory.Run();
1138
1139 EXPECT_EQ(pi2_ping.count(), 0u);
1140 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1141 EXPECT_NE(full_pi2_renamed_ping.count(), 0u);
1142 EXPECT_NE(pi1_ping.count(), 0u);
1143
1144 reader.Deregister();
1145}
1146
Naman Guptaa63aa132023-03-22 20:06:34 -07001147// Tests that we can remap a forwarded channel as well.
1148TEST_P(MultinodeLoggerTest, RemapForwardedLoggedChannel) {
1149 time_converter_.StartEqual();
1150 {
1151 LoggerState pi1_logger = MakeLogger(pi1_);
1152 LoggerState pi2_logger = MakeLogger(pi2_);
1153
1154 event_loop_factory_.RunFor(chrono::milliseconds(95));
1155
1156 StartLogger(&pi1_logger);
1157 StartLogger(&pi2_logger);
1158
1159 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1160 }
1161
1162 LogReader reader(SortParts(logfiles_));
1163
1164 reader.RemapLoggedChannel<examples::Ping>("/test");
1165
1166 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1167 log_reader_factory.set_send_delay(chrono::microseconds(0));
1168
1169 reader.Register(&log_reader_factory);
1170
1171 const Node *pi1 =
1172 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1173 const Node *pi2 =
1174 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1175
1176 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1177 // else should have moved.
1178 std::unique_ptr<EventLoop> pi1_event_loop =
1179 log_reader_factory.MakeEventLoop("test", pi1);
1180 pi1_event_loop->SkipTimingReport();
1181 std::unique_ptr<EventLoop> full_pi1_event_loop =
1182 log_reader_factory.MakeEventLoop("test", pi1);
1183 full_pi1_event_loop->SkipTimingReport();
1184 std::unique_ptr<EventLoop> pi2_event_loop =
1185 log_reader_factory.MakeEventLoop("test", pi2);
1186 pi2_event_loop->SkipTimingReport();
1187
1188 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1189 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1190 MessageCounter<examples::Ping> pi1_original_ping(pi1_event_loop.get(),
1191 "/original/test");
1192 MessageCounter<examples::Ping> pi2_original_ping(pi2_event_loop.get(),
1193 "/original/test");
1194
1195 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1196 pi1_original_ping_timestamp;
1197 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1198 pi1_ping_timestamp;
1199 if (!shared()) {
1200 pi1_original_ping_timestamp =
1201 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1202 pi1_event_loop.get(),
1203 "/pi1/aos/remote_timestamps/pi2/original/test/aos-examples-Ping");
1204 pi1_ping_timestamp =
1205 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1206 pi1_event_loop.get(),
1207 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1208 }
1209
1210 log_reader_factory.Run();
1211
1212 EXPECT_EQ(pi1_ping.count(), 0u);
1213 EXPECT_EQ(pi2_ping.count(), 0u);
1214 EXPECT_NE(pi1_original_ping.count(), 0u);
1215 EXPECT_NE(pi2_original_ping.count(), 0u);
1216 if (!shared()) {
1217 EXPECT_NE(pi1_original_ping_timestamp->count(), 0u);
1218 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1219 }
1220
1221 reader.Deregister();
1222}
1223
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001224// Tests that we can rename a forwarded channel as well.
1225TEST_P(MultinodeLoggerTest, RenameForwardedLoggedChannel) {
1226 std::vector<std::string> actual_filenames;
1227 time_converter_.StartEqual();
1228 {
1229 LoggerState pi1_logger = MakeLogger(pi1_);
1230 LoggerState pi2_logger = MakeLogger(pi2_);
1231
1232 event_loop_factory_.RunFor(chrono::milliseconds(95));
1233
1234 StartLogger(&pi1_logger);
1235 StartLogger(&pi2_logger);
1236
1237 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1238
1239 pi1_logger.AppendAllFilenames(&actual_filenames);
1240 pi2_logger.AppendAllFilenames(&actual_filenames);
1241 }
1242
1243 LogReader reader(SortParts(actual_filenames));
1244
1245 std::vector<MapT> maps;
1246 {
1247 MapT map;
1248 map.match = std::make_unique<ChannelT>();
1249 map.match->name = "/production*";
1250 map.match->source_node = "pi1";
1251 map.rename = std::make_unique<ChannelT>();
1252 map.rename->name = "/pi1/production";
1253 maps.emplace_back(std::move(map));
1254 }
1255 {
1256 MapT map;
1257 map.match = std::make_unique<ChannelT>();
1258 map.match->name = "/production*";
1259 map.match->source_node = "pi2";
1260 map.rename = std::make_unique<ChannelT>();
1261 map.rename->name = "/pi2/production";
1262 maps.emplace_back(std::move(map));
1263 }
1264 reader.RenameLoggedChannel<aos::examples::Ping>(
1265 "/test", configuration::GetNode(reader.configuration(), "pi1"),
1266 "/pi1/production", maps);
1267
1268 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1269 log_reader_factory.set_send_delay(chrono::microseconds(0));
1270
1271 reader.Register(&log_reader_factory);
1272
1273 const Node *pi1 =
1274 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1275 const Node *pi2 =
1276 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1277
1278 // Confirm we can read the data on the renamed channel, on both the source
1279 // node and the remote node. In case of split timestamp channels, confirm that
1280 // we receive the timestamp messages on the renamed channel as well.
1281 std::unique_ptr<EventLoop> pi1_event_loop =
1282 log_reader_factory.MakeEventLoop("test", pi1);
1283 pi1_event_loop->SkipTimingReport();
1284 std::unique_ptr<EventLoop> full_pi1_event_loop =
1285 log_reader_factory.MakeEventLoop("test", pi1);
1286 full_pi1_event_loop->SkipTimingReport();
1287 std::unique_ptr<EventLoop> pi2_event_loop =
1288 log_reader_factory.MakeEventLoop("test", pi2);
1289 pi2_event_loop->SkipTimingReport();
1290
1291 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1292 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1293 MessageCounter<examples::Ping> pi1_renamed_ping(pi1_event_loop.get(),
1294 "/pi1/production");
1295 MessageCounter<examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1296 "/pi1/production");
1297
1298 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1299 pi1_renamed_ping_timestamp;
1300 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1301 pi1_ping_timestamp;
1302 if (!shared()) {
1303 pi1_renamed_ping_timestamp =
1304 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1305 pi1_event_loop.get(),
1306 "/pi1/aos/remote_timestamps/pi2/pi1/production/aos-examples-Ping");
1307 pi1_ping_timestamp =
1308 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1309 pi1_event_loop.get(),
1310 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1311 }
1312
1313 log_reader_factory.Run();
1314
1315 EXPECT_EQ(pi1_ping.count(), 0u);
1316 EXPECT_EQ(pi2_ping.count(), 0u);
1317 EXPECT_NE(pi1_renamed_ping.count(), 0u);
1318 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1319 if (!shared()) {
1320 EXPECT_NE(pi1_renamed_ping_timestamp->count(), 0u);
1321 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1322 }
1323
1324 reader.Deregister();
1325}
1326
Naman Guptaa63aa132023-03-22 20:06:34 -07001327// Tests that we observe all the same events in log replay (for a given node)
1328// whether we just register an event loop for that node or if we register a full
1329// event loop factory.
1330TEST_P(MultinodeLoggerTest, SingleNodeReplay) {
1331 time_converter_.StartEqual();
1332 constexpr chrono::milliseconds kStartupDelay(95);
1333 {
1334 LoggerState pi1_logger = MakeLogger(pi1_);
1335 LoggerState pi2_logger = MakeLogger(pi2_);
1336
1337 event_loop_factory_.RunFor(kStartupDelay);
1338
1339 StartLogger(&pi1_logger);
1340 StartLogger(&pi2_logger);
1341
1342 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1343 }
1344
1345 LogReader full_reader(SortParts(logfiles_));
1346 LogReader single_node_reader(SortParts(logfiles_));
1347
1348 SimulatedEventLoopFactory full_factory(full_reader.configuration());
1349 SimulatedEventLoopFactory single_node_factory(
1350 single_node_reader.configuration());
1351 single_node_factory.SkipTimingReport();
1352 single_node_factory.DisableStatistics();
1353 std::unique_ptr<EventLoop> replay_event_loop =
1354 single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
1355 "log_reader");
1356
1357 full_reader.Register(&full_factory);
1358 single_node_reader.Register(replay_event_loop.get());
1359
1360 const Node *full_pi1 =
1361 configuration::GetNode(full_factory.configuration(), "pi1");
1362
1363 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1364 // else should have moved.
1365 std::unique_ptr<EventLoop> full_event_loop =
1366 full_factory.MakeEventLoop("test", full_pi1);
1367 full_event_loop->SkipTimingReport();
1368 full_event_loop->SkipAosLog();
1369 // maps are indexed on channel index.
1370 // observed_messages: {channel_index: [(message_sent_time, was_fetched),...]}
1371 std::map<size_t, std::vector<std::pair<monotonic_clock::time_point, bool>>>
1372 observed_messages;
1373 std::map<size_t, std::unique_ptr<RawFetcher>> fetchers;
1374 for (size_t ii = 0; ii < full_event_loop->configuration()->channels()->size();
1375 ++ii) {
1376 const Channel *channel =
1377 full_event_loop->configuration()->channels()->Get(ii);
1378 // We currently don't support replaying remote timestamp channels in
1379 // realtime replay (unless the remote timestamp channel was not NOT_LOGGED,
1380 // in which case it gets auto-remapped and replayed on a /original channel).
1381 if (channel->name()->string_view().find("remote_timestamp") !=
1382 std::string_view::npos &&
1383 channel->name()->string_view().find("/original") ==
1384 std::string_view::npos) {
1385 continue;
1386 }
1387 if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
1388 observed_messages[ii] = {};
1389 fetchers[ii] = full_event_loop->MakeRawFetcher(channel);
1390 full_event_loop->OnRun([ii, &observed_messages, &fetchers]() {
1391 if (fetchers[ii]->Fetch()) {
1392 observed_messages[ii].push_back(std::make_pair(
1393 fetchers[ii]->context().monotonic_event_time, true));
1394 }
1395 });
1396 full_event_loop->MakeRawNoArgWatcher(
1397 channel, [ii, &observed_messages](const Context &context) {
1398 observed_messages[ii].push_back(
1399 std::make_pair(context.monotonic_event_time, false));
1400 });
1401 }
1402 }
1403
1404 full_factory.Run();
1405 fetchers.clear();
1406 full_reader.Deregister();
1407
1408 const Node *single_node_pi1 =
1409 configuration::GetNode(single_node_factory.configuration(), "pi1");
1410 std::map<size_t, std::unique_ptr<RawFetcher>> single_node_fetchers;
1411
1412 std::unique_ptr<EventLoop> single_node_event_loop =
1413 single_node_factory.MakeEventLoop("test", single_node_pi1);
1414 single_node_event_loop->SkipTimingReport();
1415 single_node_event_loop->SkipAosLog();
1416 for (size_t ii = 0;
1417 ii < single_node_event_loop->configuration()->channels()->size(); ++ii) {
1418 const Channel *channel =
1419 single_node_event_loop->configuration()->channels()->Get(ii);
1420 single_node_factory.DisableForwarding(channel);
1421 if (configuration::ChannelIsReadableOnNode(channel, single_node_pi1)) {
1422 single_node_fetchers[ii] =
1423 single_node_event_loop->MakeRawFetcher(channel);
1424 single_node_event_loop->OnRun([channel, ii, &single_node_fetchers]() {
1425 EXPECT_FALSE(single_node_fetchers[ii]->Fetch())
1426 << "Single EventLoop replay doesn't support pre-loading fetchers. "
1427 << configuration::StrippedChannelToString(channel);
1428 });
1429 single_node_event_loop->MakeRawNoArgWatcher(
1430 channel, [ii, &observed_messages, channel,
1431 kStartupDelay](const Context &context) {
1432 if (observed_messages[ii].empty()) {
1433 FAIL() << "Observed extra message at "
1434 << context.monotonic_event_time << " on "
1435 << configuration::StrippedChannelToString(channel);
1436 return;
1437 }
1438 const std::pair<monotonic_clock::time_point, bool> &message =
1439 observed_messages[ii].front();
1440 if (message.second) {
1441 EXPECT_LE(message.first,
1442 context.monotonic_event_time + kStartupDelay)
1443 << "Mismatched message times " << context.monotonic_event_time
1444 << " and " << message.first << " on "
1445 << configuration::StrippedChannelToString(channel);
1446 } else {
1447 EXPECT_EQ(message.first,
1448 context.monotonic_event_time + kStartupDelay)
1449 << "Mismatched message times " << context.monotonic_event_time
1450 << " and " << message.first << " on "
1451 << configuration::StrippedChannelToString(channel);
1452 }
1453 observed_messages[ii].erase(observed_messages[ii].begin());
1454 });
1455 }
1456 }
1457
1458 single_node_factory.Run();
1459
1460 single_node_fetchers.clear();
1461
1462 single_node_reader.Deregister();
1463
1464 for (const auto &pair : observed_messages) {
1465 EXPECT_TRUE(pair.second.empty())
1466 << "Missed " << pair.second.size() << " messages on "
1467 << configuration::StrippedChannelToString(
1468 single_node_event_loop->configuration()->channels()->Get(
1469 pair.first));
1470 }
1471}
1472
1473// Tests that we properly recreate forwarded timestamps when replaying a log.
1474// This should be enough that we can then re-run the logger and get a valid log
1475// back.
1476TEST_P(MultinodeLoggerTest, MessageHeader) {
1477 time_converter_.StartEqual();
1478 {
1479 LoggerState pi1_logger = MakeLogger(pi1_);
1480 LoggerState pi2_logger = MakeLogger(pi2_);
1481
1482 event_loop_factory_.RunFor(chrono::milliseconds(95));
1483
1484 StartLogger(&pi1_logger);
1485 StartLogger(&pi2_logger);
1486
1487 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1488 }
1489
1490 LogReader reader(SortParts(logfiles_));
1491
1492 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1493 log_reader_factory.set_send_delay(chrono::microseconds(0));
1494
1495 // This sends out the fetched messages and advances time to the start of the
1496 // log file.
1497 reader.Register(&log_reader_factory);
1498
1499 const Node *pi1 =
1500 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1501 const Node *pi2 =
1502 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1503
1504 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
1505 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
1506 LOG(INFO) << "now pi1 "
1507 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1508 LOG(INFO) << "now pi2 "
1509 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1510
1511 EXPECT_THAT(reader.LoggedNodes(),
1512 ::testing::ElementsAre(
1513 configuration::GetNode(reader.logged_configuration(), pi1),
1514 configuration::GetNode(reader.logged_configuration(), pi2)));
1515
1516 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1517
1518 std::unique_ptr<EventLoop> pi1_event_loop =
1519 log_reader_factory.MakeEventLoop("test", pi1);
1520 std::unique_ptr<EventLoop> pi2_event_loop =
1521 log_reader_factory.MakeEventLoop("test", pi2);
1522
1523 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =
1524 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1525 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi2_fetcher =
1526 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1527
1528 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1529 pi1_event_loop->MakeFetcher<examples::Ping>("/test");
1530 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1531 pi2_event_loop->MakeFetcher<examples::Ping>("/test");
1532
1533 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi2_fetcher =
1534 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1535 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi1_fetcher =
1536 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1537
1538 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1539 pi2_event_loop->MakeFetcher<examples::Pong>("/test");
1540 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1541 pi1_event_loop->MakeFetcher<examples::Pong>("/test");
1542
1543 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
1544 pi1_event_loop->configuration(), pi1_timestamp_on_pi1_fetcher.channel());
1545 const size_t ping_timestamp_channel = configuration::ChannelIndex(
1546 pi2_event_loop->configuration(), ping_on_pi2_fetcher.channel());
1547
1548 const size_t pi2_timestamp_channel = configuration::ChannelIndex(
1549 pi2_event_loop->configuration(), pi2_timestamp_on_pi2_fetcher.channel());
1550 const size_t pong_timestamp_channel = configuration::ChannelIndex(
1551 pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
1552
1553 const chrono::nanoseconds network_delay = event_loop_factory_.network_delay();
1554 const chrono::nanoseconds send_delay = event_loop_factory_.send_delay();
1555
1556 for (std::pair<int, std::string> channel :
1557 shared()
1558 ? std::vector<
1559 std::pair<int, std::string>>{{-1,
1560 "/aos/remote_timestamps/pi2"}}
1561 : std::vector<std::pair<int, std::string>>{
1562 {pi1_timestamp_channel,
1563 "/aos/remote_timestamps/pi2/pi1/aos/"
1564 "aos-message_bridge-Timestamp"},
1565 {ping_timestamp_channel,
1566 "/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
1567 pi1_event_loop->MakeWatcher(
1568 channel.second,
1569 [&pi1_event_loop, &pi2_event_loop, pi1_timestamp_channel,
1570 ping_timestamp_channel, &pi1_timestamp_on_pi1_fetcher,
1571 &pi1_timestamp_on_pi2_fetcher, &ping_on_pi1_fetcher,
1572 &ping_on_pi2_fetcher, network_delay, send_delay,
1573 channel_index = channel.first](const RemoteMessage &header) {
1574 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1575 chrono::nanoseconds(header.monotonic_sent_time()));
1576 const aos::realtime_clock::time_point header_realtime_sent_time(
1577 chrono::nanoseconds(header.realtime_sent_time()));
1578 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1579 chrono::nanoseconds(header.monotonic_remote_time()));
1580 const aos::realtime_clock::time_point header_realtime_remote_time(
1581 chrono::nanoseconds(header.realtime_remote_time()));
1582
1583 if (channel_index != -1) {
1584 ASSERT_EQ(channel_index, header.channel_index());
1585 }
1586
1587 const Context *pi1_context = nullptr;
1588 const Context *pi2_context = nullptr;
1589
1590 if (header.channel_index() == pi1_timestamp_channel) {
1591 ASSERT_TRUE(pi1_timestamp_on_pi1_fetcher.FetchNext());
1592 ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
1593 pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
1594 pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
1595 } else if (header.channel_index() == ping_timestamp_channel) {
1596 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
1597 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
1598 pi1_context = &ping_on_pi1_fetcher.context();
1599 pi2_context = &ping_on_pi2_fetcher.context();
1600 } else {
1601 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1602 << configuration::CleanedChannelToString(
1603 pi1_event_loop->configuration()->channels()->Get(
1604 header.channel_index()));
1605 }
1606
1607 ASSERT_TRUE(header.has_boot_uuid());
1608 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1609 pi2_event_loop->boot_uuid());
1610
1611 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
1612 EXPECT_EQ(pi2_context->remote_queue_index,
1613 header.remote_queue_index());
1614 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
1615
1616 EXPECT_EQ(pi2_context->monotonic_event_time,
1617 header_monotonic_sent_time);
1618 EXPECT_EQ(pi2_context->realtime_event_time,
1619 header_realtime_sent_time);
1620 EXPECT_EQ(pi2_context->realtime_remote_time,
1621 header_realtime_remote_time);
1622 EXPECT_EQ(pi2_context->monotonic_remote_time,
1623 header_monotonic_remote_time);
1624
1625 EXPECT_EQ(pi1_context->realtime_event_time,
1626 header_realtime_remote_time);
1627 EXPECT_EQ(pi1_context->monotonic_event_time,
1628 header_monotonic_remote_time);
1629
1630 // Time estimation isn't perfect, but we know the clocks were
1631 // identical when logged, so we know when this should have come back.
1632 // Confirm we got it when we expected.
1633 EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
1634 pi1_context->monotonic_event_time + 2 * network_delay +
1635 send_delay);
1636 });
1637 }
1638 for (std::pair<int, std::string> channel :
1639 shared()
1640 ? std::vector<
1641 std::pair<int, std::string>>{{-1,
1642 "/aos/remote_timestamps/pi1"}}
1643 : std::vector<std::pair<int, std::string>>{
1644 {pi2_timestamp_channel,
1645 "/aos/remote_timestamps/pi1/pi2/aos/"
1646 "aos-message_bridge-Timestamp"}}) {
1647 pi2_event_loop->MakeWatcher(
1648 channel.second,
1649 [&pi2_event_loop, &pi1_event_loop, pi2_timestamp_channel,
1650 pong_timestamp_channel, &pi2_timestamp_on_pi2_fetcher,
1651 &pi2_timestamp_on_pi1_fetcher, &pong_on_pi2_fetcher,
1652 &pong_on_pi1_fetcher, network_delay, send_delay,
1653 channel_index = channel.first](const RemoteMessage &header) {
1654 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1655 chrono::nanoseconds(header.monotonic_sent_time()));
1656 const aos::realtime_clock::time_point header_realtime_sent_time(
1657 chrono::nanoseconds(header.realtime_sent_time()));
1658 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1659 chrono::nanoseconds(header.monotonic_remote_time()));
1660 const aos::realtime_clock::time_point header_realtime_remote_time(
1661 chrono::nanoseconds(header.realtime_remote_time()));
1662
1663 if (channel_index != -1) {
1664 ASSERT_EQ(channel_index, header.channel_index());
1665 }
1666
1667 const Context *pi2_context = nullptr;
1668 const Context *pi1_context = nullptr;
1669
1670 if (header.channel_index() == pi2_timestamp_channel) {
1671 ASSERT_TRUE(pi2_timestamp_on_pi2_fetcher.FetchNext());
1672 ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
1673 pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
1674 pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
1675 } else if (header.channel_index() == pong_timestamp_channel) {
1676 ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
1677 ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
1678 pi2_context = &pong_on_pi2_fetcher.context();
1679 pi1_context = &pong_on_pi1_fetcher.context();
1680 } else {
1681 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1682 << configuration::CleanedChannelToString(
1683 pi2_event_loop->configuration()->channels()->Get(
1684 header.channel_index()));
1685 }
1686
1687 ASSERT_TRUE(header.has_boot_uuid());
1688 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1689 pi1_event_loop->boot_uuid());
1690
1691 EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());
1692 EXPECT_EQ(pi1_context->remote_queue_index,
1693 header.remote_queue_index());
1694 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
1695
1696 EXPECT_EQ(pi1_context->monotonic_event_time,
1697 header_monotonic_sent_time);
1698 EXPECT_EQ(pi1_context->realtime_event_time,
1699 header_realtime_sent_time);
1700 EXPECT_EQ(pi1_context->realtime_remote_time,
1701 header_realtime_remote_time);
1702 EXPECT_EQ(pi1_context->monotonic_remote_time,
1703 header_monotonic_remote_time);
1704
1705 EXPECT_EQ(pi2_context->realtime_event_time,
1706 header_realtime_remote_time);
1707 EXPECT_EQ(pi2_context->monotonic_event_time,
1708 header_monotonic_remote_time);
1709
1710 // Time estimation isn't perfect, but we know the clocks were
1711 // identical when logged, so we know when this should have come back.
1712 // Confirm we got it when we expected.
1713 EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
1714 pi2_context->monotonic_event_time + 2 * network_delay +
1715 send_delay);
1716 });
1717 }
1718
1719 // And confirm we can re-create a log again, while checking the contents.
1720 {
1721 LoggerState pi1_logger = MakeLogger(
1722 log_reader_factory.GetNodeEventLoopFactory("pi1"), &log_reader_factory);
1723 LoggerState pi2_logger = MakeLogger(
1724 log_reader_factory.GetNodeEventLoopFactory("pi2"), &log_reader_factory);
1725
1726 StartLogger(&pi1_logger, tmp_dir_ + "/relogged1");
1727 StartLogger(&pi2_logger, tmp_dir_ + "/relogged2");
1728
1729 log_reader_factory.Run();
1730 }
1731
1732 reader.Deregister();
1733
1734 // And verify that we can run the LogReader over the relogged files without
1735 // hitting any fatal errors.
1736 {
1737 LogReader relogged_reader(SortParts(MakeLogFiles(
1738 tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 3, 3, true)));
1739 relogged_reader.Register();
1740
1741 relogged_reader.event_loop_factory()->Run();
1742 }
1743 // And confirm that we can read the logged file using the reader's
1744 // configuration.
1745 {
1746 LogReader relogged_reader(
1747 SortParts(MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2",
1748 3, 3, true)),
1749 reader.configuration());
1750 relogged_reader.Register();
1751
1752 relogged_reader.event_loop_factory()->Run();
1753 }
1754}
1755
1756// Tests that we properly populate and extract the logger_start time by setting
1757// up a clock difference between 2 nodes and looking at the resulting parts.
1758TEST_P(MultinodeLoggerTest, LoggerStartTime) {
1759 std::vector<std::string> actual_filenames;
1760 time_converter_.AddMonotonic(
1761 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
1762 {
1763 LoggerState pi1_logger = MakeLogger(pi1_);
1764 LoggerState pi2_logger = MakeLogger(pi2_);
1765
1766 StartLogger(&pi1_logger);
1767 StartLogger(&pi2_logger);
1768
1769 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1770
1771 pi1_logger.AppendAllFilenames(&actual_filenames);
1772 pi2_logger.AppendAllFilenames(&actual_filenames);
1773 }
1774
1775 ASSERT_THAT(actual_filenames,
1776 ::testing::UnorderedElementsAreArray(logfiles_));
1777
1778 for (const LogFile &log_file : SortParts(logfiles_)) {
1779 for (const LogParts &log_part : log_file.parts) {
1780 if (log_part.node == log_file.logger_node) {
1781 EXPECT_EQ(log_part.logger_monotonic_start_time,
1782 aos::monotonic_clock::min_time);
1783 EXPECT_EQ(log_part.logger_realtime_start_time,
1784 aos::realtime_clock::min_time);
1785 } else {
1786 const chrono::seconds offset = log_file.logger_node == "pi1"
1787 ? -chrono::seconds(1000)
1788 : chrono::seconds(1000);
1789 EXPECT_EQ(log_part.logger_monotonic_start_time,
1790 log_part.monotonic_start_time + offset);
1791 EXPECT_EQ(log_part.logger_realtime_start_time,
1792 log_file.realtime_start_time +
1793 (log_part.logger_monotonic_start_time -
1794 log_file.monotonic_start_time));
1795 }
1796 }
1797 }
1798}
1799
1800// Test that renaming the base, renames the folder.
1801TEST_P(MultinodeLoggerTest, LoggerRenameFolder) {
1802 util::UnlinkRecursive(tmp_dir_ + "/renamefolder");
1803 util::UnlinkRecursive(tmp_dir_ + "/new-good");
1804 time_converter_.AddMonotonic(
1805 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
1806 logfile_base1_ = tmp_dir_ + "/renamefolder/multi_logfile1";
1807 logfile_base2_ = tmp_dir_ + "/renamefolder/multi_logfile2";
1808 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
1809 LoggerState pi1_logger = MakeLogger(pi1_);
1810 LoggerState pi2_logger = MakeLogger(pi2_);
1811
1812 StartLogger(&pi1_logger);
1813 StartLogger(&pi2_logger);
1814
1815 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1816 logfile_base1_ = tmp_dir_ + "/new-good/multi_logfile1";
1817 logfile_base2_ = tmp_dir_ + "/new-good/multi_logfile2";
1818 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
1819 ASSERT_TRUE(pi1_logger.logger->RenameLogBase(logfile_base1_));
1820 ASSERT_TRUE(pi2_logger.logger->RenameLogBase(logfile_base2_));
1821 for (auto &file : logfiles_) {
1822 struct stat s;
1823 EXPECT_EQ(0, stat(file.c_str(), &s));
1824 }
1825}
1826
1827// Test that renaming the file base dies.
1828TEST_P(MultinodeLoggerDeathTest, LoggerRenameFile) {
1829 time_converter_.AddMonotonic(
1830 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
1831 util::UnlinkRecursive(tmp_dir_ + "/renamefile");
1832 logfile_base1_ = tmp_dir_ + "/renamefile/multi_logfile1";
1833 logfile_base2_ = tmp_dir_ + "/renamefile/multi_logfile2";
1834 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
1835 LoggerState pi1_logger = MakeLogger(pi1_);
1836 StartLogger(&pi1_logger);
1837 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1838 logfile_base1_ = tmp_dir_ + "/new-renamefile/new_multi_logfile1";
1839 EXPECT_DEATH({ pi1_logger.logger->RenameLogBase(logfile_base1_); },
1840 "Rename of file base from");
1841}
1842
1843// TODO(austin): We can write a test which recreates a logfile and confirms that
1844// we get it back. That is the ultimate test.
1845
1846// Tests that we properly recreate forwarded timestamps when replaying a log.
1847// This should be enough that we can then re-run the logger and get a valid log
1848// back.
1849TEST_P(MultinodeLoggerTest, RemoteReboot) {
1850 std::vector<std::string> actual_filenames;
1851
1852 const UUID pi1_boot0 = UUID::Random();
1853 const UUID pi2_boot0 = UUID::Random();
1854 const UUID pi2_boot1 = UUID::Random();
1855 {
1856 CHECK_EQ(pi1_index_, 0u);
1857 CHECK_EQ(pi2_index_, 1u);
1858
1859 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
1860 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
1861 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
1862
1863 time_converter_.AddNextTimestamp(
1864 distributed_clock::epoch(),
1865 {BootTimestamp::epoch(), BootTimestamp::epoch()});
1866 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
1867 time_converter_.AddNextTimestamp(
1868 distributed_clock::epoch() + reboot_time,
1869 {BootTimestamp::epoch() + reboot_time,
1870 BootTimestamp{
1871 .boot = 1,
1872 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
1873 }
1874
1875 {
1876 LoggerState pi1_logger = MakeLogger(pi1_);
1877
1878 event_loop_factory_.RunFor(chrono::milliseconds(95));
1879 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
1880 pi1_boot0);
1881 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
1882 pi2_boot0);
1883
1884 StartLogger(&pi1_logger);
1885
1886 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1887
1888 VLOG(1) << "Reboot now!";
1889
1890 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1891 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
1892 pi1_boot0);
1893 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
1894 pi2_boot1);
1895
1896 pi1_logger.AppendAllFilenames(&actual_filenames);
1897 }
1898
1899 std::sort(actual_filenames.begin(), actual_filenames.end());
1900 std::sort(pi1_reboot_logfiles_.begin(), pi1_reboot_logfiles_.end());
1901 ASSERT_THAT(actual_filenames,
1902 ::testing::UnorderedElementsAreArray(pi1_reboot_logfiles_));
1903
1904 // Confirm that our new oldest timestamps properly update as we reboot and
1905 // rotate.
1906 for (const std::string &file : pi1_reboot_logfiles_) {
1907 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
1908 ReadHeader(file);
1909 CHECK(log_header);
1910 if (log_header->message().has_configuration()) {
1911 continue;
1912 }
1913
1914 const monotonic_clock::time_point monotonic_start_time =
1915 monotonic_clock::time_point(
1916 chrono::nanoseconds(log_header->message().monotonic_start_time()));
1917 const UUID source_node_boot_uuid = UUID::FromString(
1918 log_header->message().source_node_boot_uuid()->string_view());
1919
1920 if (log_header->message().node()->name()->string_view() != "pi1") {
1921 // The remote message channel should rotate later and have more parts.
1922 // This only is true on the log files with shared remote messages.
1923 //
1924 // TODO(austin): I'm not the most thrilled with this test pattern... It
1925 // feels brittle in a different way.
1926 if (file.find("aos.message_bridge.RemoteMessage") == std::string::npos ||
1927 !shared()) {
1928 switch (log_header->message().parts_index()) {
1929 case 0:
1930 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1931 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
1932 break;
1933 case 1:
1934 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1935 ASSERT_EQ(monotonic_start_time,
1936 monotonic_clock::epoch() + chrono::seconds(1));
1937 break;
1938 case 2:
1939 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1940 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
1941 break;
1942 case 3:
1943 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1944 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
1945 chrono::nanoseconds(2322999462))
1946 << " on " << file;
1947 break;
1948 default:
1949 FAIL();
1950 break;
1951 }
1952 } else {
1953 switch (log_header->message().parts_index()) {
1954 case 0:
1955 case 1:
1956 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1957 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
1958 break;
1959 case 2:
1960 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1961 ASSERT_EQ(monotonic_start_time,
1962 monotonic_clock::epoch() + chrono::seconds(1));
1963 break;
1964 case 3:
1965 case 4:
1966 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1967 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
1968 break;
1969 case 5:
1970 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1971 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
1972 chrono::nanoseconds(2322999462))
1973 << " on " << file;
1974 break;
1975 default:
1976 FAIL();
1977 break;
1978 }
1979 }
1980 continue;
1981 }
1982 SCOPED_TRACE(file);
1983 SCOPED_TRACE(aos::FlatbufferToJson(
1984 *log_header, {.multi_line = true, .max_vector_size = 100}));
1985 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
1986 ASSERT_EQ(
1987 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
1988 EXPECT_EQ(
1989 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
1990 monotonic_clock::max_time.time_since_epoch().count());
1991 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
1992 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
1993 2u);
1994 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
1995 monotonic_clock::max_time.time_since_epoch().count());
1996 ASSERT_TRUE(log_header->message()
1997 .has_oldest_remote_unreliable_monotonic_timestamps());
1998 ASSERT_EQ(log_header->message()
1999 .oldest_remote_unreliable_monotonic_timestamps()
2000 ->size(),
2001 2u);
2002 EXPECT_EQ(log_header->message()
2003 .oldest_remote_unreliable_monotonic_timestamps()
2004 ->Get(0),
2005 monotonic_clock::max_time.time_since_epoch().count());
2006 ASSERT_TRUE(log_header->message()
2007 .has_oldest_local_unreliable_monotonic_timestamps());
2008 ASSERT_EQ(log_header->message()
2009 .oldest_local_unreliable_monotonic_timestamps()
2010 ->size(),
2011 2u);
2012 EXPECT_EQ(log_header->message()
2013 .oldest_local_unreliable_monotonic_timestamps()
2014 ->Get(0),
2015 monotonic_clock::max_time.time_since_epoch().count());
2016
2017 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2018 monotonic_clock::time_point(chrono::nanoseconds(
2019 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2020 1)));
2021 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2022 monotonic_clock::time_point(chrono::nanoseconds(
2023 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2024 const monotonic_clock::time_point
2025 oldest_remote_unreliable_monotonic_timestamps =
2026 monotonic_clock::time_point(chrono::nanoseconds(
2027 log_header->message()
2028 .oldest_remote_unreliable_monotonic_timestamps()
2029 ->Get(1)));
2030 const monotonic_clock::time_point
2031 oldest_local_unreliable_monotonic_timestamps =
2032 monotonic_clock::time_point(chrono::nanoseconds(
2033 log_header->message()
2034 .oldest_local_unreliable_monotonic_timestamps()
2035 ->Get(1)));
2036 const monotonic_clock::time_point
2037 oldest_remote_reliable_monotonic_timestamps =
2038 monotonic_clock::time_point(chrono::nanoseconds(
2039 log_header->message()
2040 .oldest_remote_reliable_monotonic_timestamps()
2041 ->Get(1)));
2042 const monotonic_clock::time_point
2043 oldest_local_reliable_monotonic_timestamps =
2044 monotonic_clock::time_point(chrono::nanoseconds(
2045 log_header->message()
2046 .oldest_local_reliable_monotonic_timestamps()
2047 ->Get(1)));
2048 const monotonic_clock::time_point
2049 oldest_logger_remote_unreliable_monotonic_timestamps =
2050 monotonic_clock::time_point(chrono::nanoseconds(
2051 log_header->message()
2052 .oldest_logger_remote_unreliable_monotonic_timestamps()
2053 ->Get(0)));
2054 const monotonic_clock::time_point
2055 oldest_logger_local_unreliable_monotonic_timestamps =
2056 monotonic_clock::time_point(chrono::nanoseconds(
2057 log_header->message()
2058 .oldest_logger_local_unreliable_monotonic_timestamps()
2059 ->Get(0)));
2060 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2061 monotonic_clock::max_time);
2062 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2063 monotonic_clock::max_time);
2064 switch (log_header->message().parts_index()) {
2065 case 0:
2066 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2067 monotonic_clock::max_time);
2068 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2069 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2070 monotonic_clock::max_time);
2071 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2072 monotonic_clock::max_time);
2073 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2074 monotonic_clock::max_time);
2075 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2076 monotonic_clock::max_time);
2077 break;
2078 case 1:
2079 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2080 monotonic_clock::time_point(chrono::microseconds(90200)));
2081 EXPECT_EQ(oldest_local_monotonic_timestamps,
2082 monotonic_clock::time_point(chrono::microseconds(90350)));
2083 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2084 monotonic_clock::time_point(chrono::microseconds(90200)));
2085 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2086 monotonic_clock::time_point(chrono::microseconds(90350)));
2087 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2088 monotonic_clock::max_time);
2089 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2090 monotonic_clock::max_time);
2091 break;
2092 case 2:
2093 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2094 monotonic_clock::time_point(chrono::microseconds(90200)))
2095 << file;
2096 EXPECT_EQ(oldest_local_monotonic_timestamps,
2097 monotonic_clock::time_point(chrono::microseconds(90350)))
2098 << file;
2099 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2100 monotonic_clock::time_point(chrono::microseconds(90200)))
2101 << file;
2102 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2103 monotonic_clock::time_point(chrono::microseconds(90350)))
2104 << file;
2105 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2106 monotonic_clock::time_point(chrono::microseconds(100000)))
2107 << file;
2108 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2109 monotonic_clock::time_point(chrono::microseconds(100150)))
2110 << file;
2111 break;
2112 case 3:
2113 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2114 monotonic_clock::time_point(chrono::milliseconds(1323) +
2115 chrono::microseconds(200)));
2116 EXPECT_EQ(oldest_local_monotonic_timestamps,
2117 monotonic_clock::time_point(chrono::microseconds(10100350)));
2118 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2119 monotonic_clock::time_point(chrono::milliseconds(1323) +
2120 chrono::microseconds(200)));
2121 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2122 monotonic_clock::time_point(chrono::microseconds(10100350)));
2123 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2124 monotonic_clock::max_time)
2125 << file;
2126 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2127 monotonic_clock::max_time)
2128 << file;
2129 break;
2130 case 4:
2131 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2132 monotonic_clock::time_point(chrono::milliseconds(1323) +
2133 chrono::microseconds(200)));
2134 EXPECT_EQ(oldest_local_monotonic_timestamps,
2135 monotonic_clock::time_point(chrono::microseconds(10100350)));
2136 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2137 monotonic_clock::time_point(chrono::milliseconds(1323) +
2138 chrono::microseconds(200)));
2139 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2140 monotonic_clock::time_point(chrono::microseconds(10100350)));
2141 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2142 monotonic_clock::time_point(chrono::microseconds(1423000)))
2143 << file;
2144 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2145 monotonic_clock::time_point(chrono::microseconds(10200150)))
2146 << file;
2147 break;
2148 default:
2149 FAIL();
2150 break;
2151 }
2152 }
2153
2154 // Confirm that we refuse to replay logs with missing boot uuids.
2155 {
2156 LogReader reader(SortParts(pi1_reboot_logfiles_));
2157
2158 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2159 log_reader_factory.set_send_delay(chrono::microseconds(0));
2160
2161 // This sends out the fetched messages and advances time to the start of
2162 // the log file.
2163 reader.Register(&log_reader_factory);
2164
2165 log_reader_factory.Run();
2166
2167 reader.Deregister();
2168 }
2169}
2170
2171// Tests that we can sort a log which only has timestamps from the remote
2172// because the local message_bridge_client failed to connect.
2173TEST_P(MultinodeLoggerTest, RemoteRebootOnlyTimestamps) {
2174 const UUID pi1_boot0 = UUID::Random();
2175 const UUID pi2_boot0 = UUID::Random();
2176 const UUID pi2_boot1 = UUID::Random();
2177 {
2178 CHECK_EQ(pi1_index_, 0u);
2179 CHECK_EQ(pi2_index_, 1u);
2180
2181 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2182 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2183 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2184
2185 time_converter_.AddNextTimestamp(
2186 distributed_clock::epoch(),
2187 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2188 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2189 time_converter_.AddNextTimestamp(
2190 distributed_clock::epoch() + reboot_time,
2191 {BootTimestamp::epoch() + reboot_time,
2192 BootTimestamp{
2193 .boot = 1,
2194 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2195 }
2196 pi2_->Disconnect(pi1_->node());
2197
2198 std::vector<std::string> filenames;
2199 {
2200 LoggerState pi1_logger = MakeLogger(pi1_);
2201
2202 event_loop_factory_.RunFor(chrono::milliseconds(95));
2203 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2204 pi1_boot0);
2205 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2206 pi2_boot0);
2207
2208 StartLogger(&pi1_logger);
2209
2210 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2211
2212 VLOG(1) << "Reboot now!";
2213
2214 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2215 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2216 pi1_boot0);
2217 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2218 pi2_boot1);
2219 pi1_logger.AppendAllFilenames(&filenames);
2220 }
2221
2222 std::sort(filenames.begin(), filenames.end());
2223
2224 // Confirm that our new oldest timestamps properly update as we reboot and
2225 // rotate.
2226 size_t timestamp_file_count = 0;
2227 for (const std::string &file : filenames) {
2228 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2229 ReadHeader(file);
2230 CHECK(log_header);
2231
2232 if (log_header->message().has_configuration()) {
2233 continue;
2234 }
2235
2236 const monotonic_clock::time_point monotonic_start_time =
2237 monotonic_clock::time_point(
2238 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2239 const UUID source_node_boot_uuid = UUID::FromString(
2240 log_header->message().source_node_boot_uuid()->string_view());
2241
2242 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2243 ASSERT_EQ(
2244 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2245 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2246 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2247 2u);
2248 ASSERT_TRUE(log_header->message()
2249 .has_oldest_remote_unreliable_monotonic_timestamps());
2250 ASSERT_EQ(log_header->message()
2251 .oldest_remote_unreliable_monotonic_timestamps()
2252 ->size(),
2253 2u);
2254 ASSERT_TRUE(log_header->message()
2255 .has_oldest_local_unreliable_monotonic_timestamps());
2256 ASSERT_EQ(log_header->message()
2257 .oldest_local_unreliable_monotonic_timestamps()
2258 ->size(),
2259 2u);
2260 ASSERT_TRUE(log_header->message()
2261 .has_oldest_remote_reliable_monotonic_timestamps());
2262 ASSERT_EQ(log_header->message()
2263 .oldest_remote_reliable_monotonic_timestamps()
2264 ->size(),
2265 2u);
2266 ASSERT_TRUE(
2267 log_header->message().has_oldest_local_reliable_monotonic_timestamps());
2268 ASSERT_EQ(log_header->message()
2269 .oldest_local_reliable_monotonic_timestamps()
2270 ->size(),
2271 2u);
2272
2273 ASSERT_TRUE(
2274 log_header->message()
2275 .has_oldest_logger_remote_unreliable_monotonic_timestamps());
2276 ASSERT_EQ(log_header->message()
2277 .oldest_logger_remote_unreliable_monotonic_timestamps()
2278 ->size(),
2279 2u);
2280 ASSERT_TRUE(log_header->message()
2281 .has_oldest_logger_local_unreliable_monotonic_timestamps());
2282 ASSERT_EQ(log_header->message()
2283 .oldest_logger_local_unreliable_monotonic_timestamps()
2284 ->size(),
2285 2u);
2286
2287 if (log_header->message().node()->name()->string_view() != "pi1") {
2288 ASSERT_TRUE(file.find("aos.message_bridge.RemoteMessage") !=
2289 std::string::npos);
2290
2291 const std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
2292 ReadNthMessage(file, 0);
2293 CHECK(msg);
2294
2295 EXPECT_TRUE(msg->message().has_monotonic_sent_time());
2296 EXPECT_TRUE(msg->message().has_monotonic_remote_time());
2297
2298 const monotonic_clock::time_point
2299 expected_oldest_local_monotonic_timestamps(
2300 chrono::nanoseconds(msg->message().monotonic_sent_time()));
2301 const monotonic_clock::time_point
2302 expected_oldest_remote_monotonic_timestamps(
2303 chrono::nanoseconds(msg->message().monotonic_remote_time()));
2304 const monotonic_clock::time_point
2305 expected_oldest_timestamp_monotonic_timestamps(
2306 chrono::nanoseconds(msg->message().monotonic_timestamp_time()));
2307
2308 EXPECT_NE(expected_oldest_local_monotonic_timestamps,
2309 monotonic_clock::min_time);
2310 EXPECT_NE(expected_oldest_remote_monotonic_timestamps,
2311 monotonic_clock::min_time);
2312 EXPECT_NE(expected_oldest_timestamp_monotonic_timestamps,
2313 monotonic_clock::min_time);
2314
2315 ++timestamp_file_count;
2316 // Since the log file is from the perspective of the other node,
2317 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2318 monotonic_clock::time_point(chrono::nanoseconds(
2319 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2320 0)));
2321 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2322 monotonic_clock::time_point(chrono::nanoseconds(
2323 log_header->message().oldest_local_monotonic_timestamps()->Get(
2324 0)));
2325 const monotonic_clock::time_point
2326 oldest_remote_unreliable_monotonic_timestamps =
2327 monotonic_clock::time_point(chrono::nanoseconds(
2328 log_header->message()
2329 .oldest_remote_unreliable_monotonic_timestamps()
2330 ->Get(0)));
2331 const monotonic_clock::time_point
2332 oldest_local_unreliable_monotonic_timestamps =
2333 monotonic_clock::time_point(chrono::nanoseconds(
2334 log_header->message()
2335 .oldest_local_unreliable_monotonic_timestamps()
2336 ->Get(0)));
2337 const monotonic_clock::time_point
2338 oldest_remote_reliable_monotonic_timestamps =
2339 monotonic_clock::time_point(chrono::nanoseconds(
2340 log_header->message()
2341 .oldest_remote_reliable_monotonic_timestamps()
2342 ->Get(0)));
2343 const monotonic_clock::time_point
2344 oldest_local_reliable_monotonic_timestamps =
2345 monotonic_clock::time_point(chrono::nanoseconds(
2346 log_header->message()
2347 .oldest_local_reliable_monotonic_timestamps()
2348 ->Get(0)));
2349 const monotonic_clock::time_point
2350 oldest_logger_remote_unreliable_monotonic_timestamps =
2351 monotonic_clock::time_point(chrono::nanoseconds(
2352 log_header->message()
2353 .oldest_logger_remote_unreliable_monotonic_timestamps()
2354 ->Get(1)));
2355 const monotonic_clock::time_point
2356 oldest_logger_local_unreliable_monotonic_timestamps =
2357 monotonic_clock::time_point(chrono::nanoseconds(
2358 log_header->message()
2359 .oldest_logger_local_unreliable_monotonic_timestamps()
2360 ->Get(1)));
2361
2362 const Channel *channel =
2363 event_loop_factory_.configuration()->channels()->Get(
2364 msg->message().channel_index());
2365 const Connection *connection = configuration::ConnectionToNode(
2366 channel, configuration::GetNode(
2367 event_loop_factory_.configuration(),
2368 log_header->message().node()->name()->string_view()));
2369
2370 const bool reliable = connection->time_to_live() == 0;
2371
2372 SCOPED_TRACE(file);
2373 SCOPED_TRACE(aos::FlatbufferToJson(
2374 *log_header, {.multi_line = true, .max_vector_size = 100}));
2375
2376 if (shared()) {
2377 // Confirm that the oldest timestamps match what we expect. Based on
2378 // what we are doing, we know that the oldest time is the first
2379 // message's time.
2380 //
2381 // This makes the test robust to both the split and combined config
2382 // tests.
2383 switch (log_header->message().parts_index()) {
2384 case 0:
2385 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2386 expected_oldest_remote_monotonic_timestamps);
2387 EXPECT_EQ(oldest_local_monotonic_timestamps,
2388 expected_oldest_local_monotonic_timestamps);
2389 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2390 expected_oldest_local_monotonic_timestamps)
2391 << file;
2392 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2393 expected_oldest_timestamp_monotonic_timestamps)
2394 << file;
2395
2396 if (reliable) {
2397 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2398 expected_oldest_remote_monotonic_timestamps);
2399 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2400 expected_oldest_local_monotonic_timestamps);
2401 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2402 monotonic_clock::max_time);
2403 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2404 monotonic_clock::max_time);
2405 } else {
2406 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2407 monotonic_clock::max_time);
2408 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2409 monotonic_clock::max_time);
2410 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2411 expected_oldest_remote_monotonic_timestamps);
2412 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2413 expected_oldest_local_monotonic_timestamps);
2414 }
2415 break;
2416 case 1:
2417 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2418 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2419 EXPECT_EQ(oldest_local_monotonic_timestamps,
2420 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2421 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2422 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2423 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2424 monotonic_clock::epoch() + chrono::nanoseconds(90250000));
2425 if (reliable) {
2426 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2427 expected_oldest_remote_monotonic_timestamps);
2428 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2429 expected_oldest_local_monotonic_timestamps);
2430 EXPECT_EQ(
2431 oldest_remote_unreliable_monotonic_timestamps,
2432 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2433 EXPECT_EQ(
2434 oldest_local_unreliable_monotonic_timestamps,
2435 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2436 } else {
2437 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2438 monotonic_clock::max_time);
2439 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2440 monotonic_clock::max_time);
2441 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2442 expected_oldest_remote_monotonic_timestamps);
2443 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2444 expected_oldest_local_monotonic_timestamps);
2445 }
2446 break;
2447 case 2:
2448 EXPECT_EQ(
2449 oldest_remote_monotonic_timestamps,
2450 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2451 EXPECT_EQ(
2452 oldest_local_monotonic_timestamps,
2453 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2454 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2455 expected_oldest_local_monotonic_timestamps)
2456 << file;
2457 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2458 expected_oldest_timestamp_monotonic_timestamps)
2459 << file;
2460 if (reliable) {
2461 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2462 expected_oldest_remote_monotonic_timestamps);
2463 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2464 expected_oldest_local_monotonic_timestamps);
2465 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2466 monotonic_clock::max_time);
2467 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2468 monotonic_clock::max_time);
2469 } else {
2470 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2471 monotonic_clock::max_time);
2472 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2473 monotonic_clock::max_time);
2474 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2475 expected_oldest_remote_monotonic_timestamps);
2476 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2477 expected_oldest_local_monotonic_timestamps);
2478 }
2479 break;
2480
2481 case 3:
2482 EXPECT_EQ(
2483 oldest_remote_monotonic_timestamps,
2484 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2485 EXPECT_EQ(
2486 oldest_local_monotonic_timestamps,
2487 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2488 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2489 expected_oldest_remote_monotonic_timestamps);
2490 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2491 expected_oldest_local_monotonic_timestamps);
2492 EXPECT_EQ(
2493 oldest_logger_remote_unreliable_monotonic_timestamps,
2494 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2495 EXPECT_EQ(
2496 oldest_logger_local_unreliable_monotonic_timestamps,
2497 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2498 break;
2499 default:
2500 FAIL();
2501 break;
2502 }
2503
2504 switch (log_header->message().parts_index()) {
2505 case 0:
2506 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2507 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2508 break;
2509 case 1:
2510 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2511 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2512 break;
2513 case 2:
2514 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2515 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2516 break;
2517 case 3:
2518 if (shared()) {
2519 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2520 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2521 break;
2522 }
2523 [[fallthrough]];
2524 default:
2525 FAIL();
2526 break;
2527 }
2528 } else {
2529 switch (log_header->message().parts_index()) {
2530 case 0:
2531 if (reliable) {
2532 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2533 monotonic_clock::max_time);
2534 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2535 monotonic_clock::max_time);
2536 EXPECT_EQ(
2537 oldest_logger_remote_unreliable_monotonic_timestamps,
2538 monotonic_clock::epoch() + chrono::nanoseconds(100150000))
2539 << file;
2540 EXPECT_EQ(
2541 oldest_logger_local_unreliable_monotonic_timestamps,
2542 monotonic_clock::epoch() + chrono::nanoseconds(100250000))
2543 << file;
2544 } else {
2545 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2546 expected_oldest_remote_monotonic_timestamps);
2547 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2548 expected_oldest_local_monotonic_timestamps);
2549 EXPECT_EQ(
2550 oldest_logger_remote_unreliable_monotonic_timestamps,
2551 monotonic_clock::epoch() + chrono::nanoseconds(90150000))
2552 << file;
2553 EXPECT_EQ(
2554 oldest_logger_local_unreliable_monotonic_timestamps,
2555 monotonic_clock::epoch() + chrono::nanoseconds(90250000))
2556 << file;
2557 }
2558 break;
2559 case 1:
2560 if (reliable) {
2561 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2562 monotonic_clock::max_time);
2563 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2564 monotonic_clock::max_time);
2565 EXPECT_EQ(
2566 oldest_logger_remote_unreliable_monotonic_timestamps,
2567 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2568 EXPECT_EQ(
2569 oldest_logger_local_unreliable_monotonic_timestamps,
2570 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2571 } else {
2572 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2573 expected_oldest_remote_monotonic_timestamps);
2574 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2575 expected_oldest_local_monotonic_timestamps);
2576 EXPECT_EQ(
2577 oldest_logger_remote_unreliable_monotonic_timestamps,
2578 monotonic_clock::epoch() + chrono::nanoseconds(1323150000));
2579 EXPECT_EQ(
2580 oldest_logger_local_unreliable_monotonic_timestamps,
2581 monotonic_clock::epoch() + chrono::nanoseconds(10100250000));
2582 }
2583 break;
2584 default:
2585 FAIL();
2586 break;
2587 }
2588
2589 switch (log_header->message().parts_index()) {
2590 case 0:
2591 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2592 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2593 break;
2594 case 1:
2595 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2596 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2597 break;
2598 default:
2599 FAIL();
2600 break;
2601 }
2602 }
2603
2604 continue;
2605 }
2606 EXPECT_EQ(
2607 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2608 monotonic_clock::max_time.time_since_epoch().count());
2609 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2610 monotonic_clock::max_time.time_since_epoch().count());
2611 EXPECT_EQ(log_header->message()
2612 .oldest_remote_unreliable_monotonic_timestamps()
2613 ->Get(0),
2614 monotonic_clock::max_time.time_since_epoch().count());
2615 EXPECT_EQ(log_header->message()
2616 .oldest_local_unreliable_monotonic_timestamps()
2617 ->Get(0),
2618 monotonic_clock::max_time.time_since_epoch().count());
2619
2620 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2621 monotonic_clock::time_point(chrono::nanoseconds(
2622 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2623 1)));
2624 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2625 monotonic_clock::time_point(chrono::nanoseconds(
2626 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2627 const monotonic_clock::time_point
2628 oldest_remote_unreliable_monotonic_timestamps =
2629 monotonic_clock::time_point(chrono::nanoseconds(
2630 log_header->message()
2631 .oldest_remote_unreliable_monotonic_timestamps()
2632 ->Get(1)));
2633 const monotonic_clock::time_point
2634 oldest_local_unreliable_monotonic_timestamps =
2635 monotonic_clock::time_point(chrono::nanoseconds(
2636 log_header->message()
2637 .oldest_local_unreliable_monotonic_timestamps()
2638 ->Get(1)));
2639 switch (log_header->message().parts_index()) {
2640 case 0:
2641 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2642 monotonic_clock::max_time);
2643 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2644 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2645 monotonic_clock::max_time);
2646 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2647 monotonic_clock::max_time);
2648 break;
2649 default:
2650 FAIL();
2651 break;
2652 }
2653 }
2654
2655 if (shared()) {
2656 EXPECT_EQ(timestamp_file_count, 4u);
2657 } else {
2658 EXPECT_EQ(timestamp_file_count, 4u);
2659 }
2660
2661 // Confirm that we can actually sort the resulting log and read it.
2662 {
2663 LogReader reader(SortParts(filenames));
2664
2665 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2666 log_reader_factory.set_send_delay(chrono::microseconds(0));
2667
2668 // This sends out the fetched messages and advances time to the start of
2669 // the log file.
2670 reader.Register(&log_reader_factory);
2671
2672 log_reader_factory.Run();
2673
2674 reader.Deregister();
2675 }
2676}
2677
2678// Tests that we properly handle one direction of message_bridge being
2679// unavailable.
2680TEST_P(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
2681 pi1_->Disconnect(pi2_->node());
2682 time_converter_.AddMonotonic(
2683 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2684
2685 time_converter_.AddMonotonic(
2686 {chrono::milliseconds(10000),
2687 chrono::milliseconds(10000) - chrono::milliseconds(1)});
2688 {
2689 LoggerState pi1_logger = MakeLogger(pi1_);
2690
2691 event_loop_factory_.RunFor(chrono::milliseconds(95));
2692
2693 StartLogger(&pi1_logger);
2694
2695 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2696 }
2697
2698 // Confirm that we can parse the result. LogReader has enough internal CHECKs
2699 // to confirm the right thing happened.
2700 ConfirmReadable(pi1_single_direction_logfiles_);
2701}
2702
2703// Tests that we properly handle one direction of message_bridge being
2704// unavailable.
2705TEST_P(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
2706 pi1_->Disconnect(pi2_->node());
2707 time_converter_.AddMonotonic(
2708 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(500)});
2709
2710 time_converter_.AddMonotonic(
2711 {chrono::milliseconds(10000),
2712 chrono::milliseconds(10000) + chrono::milliseconds(1)});
2713 {
2714 LoggerState pi1_logger = MakeLogger(pi1_);
2715
2716 event_loop_factory_.RunFor(chrono::milliseconds(95));
2717
2718 StartLogger(&pi1_logger);
2719
2720 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2721 }
2722
2723 // Confirm that we can parse the result. LogReader has enough internal CHECKs
2724 // to confirm the right thing happened.
2725 ConfirmReadable(pi1_single_direction_logfiles_);
2726}
2727
2728// Tests that we explode if someone passes in a part file twice with a better
2729// error than an out of order error.
2730TEST_P(MultinodeLoggerTest, DuplicateLogFiles) {
2731 time_converter_.AddMonotonic(
2732 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2733 {
2734 LoggerState pi1_logger = MakeLogger(pi1_);
2735
2736 event_loop_factory_.RunFor(chrono::milliseconds(95));
2737
2738 StartLogger(&pi1_logger);
2739
2740 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2741 }
2742
2743 std::vector<std::string> duplicates;
2744 for (const std::string &f : pi1_single_direction_logfiles_) {
2745 duplicates.emplace_back(f);
2746 duplicates.emplace_back(f);
2747 }
2748 EXPECT_DEATH({ SortParts(duplicates); }, "Found duplicate parts in");
2749}
2750
2751// Tests that we explode if someone loses a part out of the middle of a log.
2752TEST_P(MultinodeLoggerTest, MissingPartsFromMiddle) {
2753 time_converter_.AddMonotonic(
2754 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2755 {
2756 LoggerState pi1_logger = MakeLogger(pi1_);
2757
2758 event_loop_factory_.RunFor(chrono::milliseconds(95));
2759
2760 StartLogger(&pi1_logger);
2761 aos::monotonic_clock::time_point last_rotation_time =
2762 pi1_logger.event_loop->monotonic_now();
2763 pi1_logger.logger->set_on_logged_period([&] {
2764 const auto now = pi1_logger.event_loop->monotonic_now();
2765 if (now > last_rotation_time + std::chrono::seconds(5)) {
2766 pi1_logger.logger->Rotate();
2767 last_rotation_time = now;
2768 }
2769 });
2770
2771 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2772 }
2773
2774 std::vector<std::string> missing_parts;
2775
2776 missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part0" + Extension());
2777 missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part2" + Extension());
2778 missing_parts.emplace_back(absl::StrCat(
2779 logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
2780
2781 EXPECT_DEATH({ SortParts(missing_parts); },
2782 "Broken log, missing part files between");
2783}
2784
2785// Tests that we properly handle a dead node. Do this by just disconnecting it
2786// and only using one nodes of logs.
2787TEST_P(MultinodeLoggerTest, DeadNode) {
2788 pi1_->Disconnect(pi2_->node());
2789 pi2_->Disconnect(pi1_->node());
2790 time_converter_.AddMonotonic(
2791 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2792 {
2793 LoggerState pi1_logger = MakeLogger(pi1_);
2794
2795 event_loop_factory_.RunFor(chrono::milliseconds(95));
2796
2797 StartLogger(&pi1_logger);
2798
2799 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2800 }
2801
2802 // Confirm that we can parse the result. LogReader has enough internal CHECKs
2803 // to confirm the right thing happened.
2804 ConfirmReadable(MakePi1DeadNodeLogfiles());
2805}
2806
2807// Tests that we can relog with a different config. This makes most sense when
2808// you are trying to edit a log and want to use channel renaming + the original
2809// config in the new log.
2810TEST_P(MultinodeLoggerTest, LogDifferentConfig) {
2811 time_converter_.StartEqual();
2812 {
2813 LoggerState pi1_logger = MakeLogger(pi1_);
2814 LoggerState pi2_logger = MakeLogger(pi2_);
2815
2816 event_loop_factory_.RunFor(chrono::milliseconds(95));
2817
2818 StartLogger(&pi1_logger);
2819 StartLogger(&pi2_logger);
2820
2821 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2822 }
2823
2824 LogReader reader(SortParts(logfiles_));
2825 reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
2826
2827 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2828 log_reader_factory.set_send_delay(chrono::microseconds(0));
2829
2830 // This sends out the fetched messages and advances time to the start of the
2831 // log file.
2832 reader.Register(&log_reader_factory);
2833
2834 const Node *pi1 =
2835 configuration::GetNode(log_reader_factory.configuration(), "pi1");
2836 const Node *pi2 =
2837 configuration::GetNode(log_reader_factory.configuration(), "pi2");
2838
2839 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
2840 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
2841 LOG(INFO) << "now pi1 "
2842 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
2843 LOG(INFO) << "now pi2 "
2844 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
2845
2846 EXPECT_THAT(reader.LoggedNodes(),
2847 ::testing::ElementsAre(
2848 configuration::GetNode(reader.logged_configuration(), pi1),
2849 configuration::GetNode(reader.logged_configuration(), pi2)));
2850
2851 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
2852
2853 // And confirm we can re-create a log again, while checking the contents.
2854 std::vector<std::string> log_files;
2855 {
2856 LoggerState pi1_logger =
2857 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
2858 &log_reader_factory, reader.logged_configuration());
2859 LoggerState pi2_logger =
2860 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
2861 &log_reader_factory, reader.logged_configuration());
2862
2863 pi1_logger.StartLogger(tmp_dir_ + "/relogged1");
2864 pi2_logger.StartLogger(tmp_dir_ + "/relogged2");
2865
2866 log_reader_factory.Run();
2867
2868 for (auto &x : pi1_logger.log_namer->all_filenames()) {
2869 log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged1_", x));
2870 }
2871 for (auto &x : pi2_logger.log_namer->all_filenames()) {
2872 log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged2_", x));
2873 }
2874 }
2875
2876 reader.Deregister();
2877
2878 // And verify that we can run the LogReader over the relogged files without
2879 // hitting any fatal errors.
2880 {
2881 LogReader relogged_reader(SortParts(log_files));
2882 relogged_reader.Register();
2883
2884 relogged_reader.event_loop_factory()->Run();
2885 }
2886}
2887
2888// Tests that we properly replay a log where the start time for a node is before
2889// any data on the node. This can happen if the logger starts before data is
2890// published. While the scenario below is a bit convoluted, we have seen logs
2891// like this generated out in the wild.
2892TEST(MultinodeRebootLoggerTest, StartTimeBeforeData) {
2893 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2894 aos::configuration::ReadConfig(ArtifactPath(
2895 "aos/events/logging/multinode_pingpong_split3_config.json"));
2896 message_bridge::TestingTimeConverter time_converter(
2897 configuration::NodesCount(&config.message()));
2898 SimulatedEventLoopFactory event_loop_factory(&config.message());
2899 event_loop_factory.SetTimeConverter(&time_converter);
2900 NodeEventLoopFactory *const pi1 =
2901 event_loop_factory.GetNodeEventLoopFactory("pi1");
2902 const size_t pi1_index = configuration::GetNodeIndex(
2903 event_loop_factory.configuration(), pi1->node());
2904 NodeEventLoopFactory *const pi2 =
2905 event_loop_factory.GetNodeEventLoopFactory("pi2");
2906 const size_t pi2_index = configuration::GetNodeIndex(
2907 event_loop_factory.configuration(), pi2->node());
2908 NodeEventLoopFactory *const pi3 =
2909 event_loop_factory.GetNodeEventLoopFactory("pi3");
2910 const size_t pi3_index = configuration::GetNodeIndex(
2911 event_loop_factory.configuration(), pi3->node());
2912
2913 const std::string kLogfile1_1 =
2914 aos::testing::TestTmpDir() + "/multi_logfile1/";
2915 const std::string kLogfile2_1 =
2916 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
2917 const std::string kLogfile2_2 =
2918 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
2919 const std::string kLogfile3_1 =
2920 aos::testing::TestTmpDir() + "/multi_logfile3/";
2921 util::UnlinkRecursive(kLogfile1_1);
2922 util::UnlinkRecursive(kLogfile2_1);
2923 util::UnlinkRecursive(kLogfile2_2);
2924 util::UnlinkRecursive(kLogfile3_1);
2925 const UUID pi1_boot0 = UUID::Random();
2926 const UUID pi2_boot0 = UUID::Random();
2927 const UUID pi2_boot1 = UUID::Random();
2928 const UUID pi3_boot0 = UUID::Random();
2929 {
2930 CHECK_EQ(pi1_index, 0u);
2931 CHECK_EQ(pi2_index, 1u);
2932 CHECK_EQ(pi3_index, 2u);
2933
2934 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
2935 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
2936 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
2937 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
2938
2939 time_converter.AddNextTimestamp(
2940 distributed_clock::epoch(),
2941 {BootTimestamp::epoch(), BootTimestamp::epoch(),
2942 BootTimestamp::epoch()});
2943 const chrono::nanoseconds reboot_time = chrono::milliseconds(20000);
2944 time_converter.AddNextTimestamp(
2945 distributed_clock::epoch() + reboot_time,
2946 {BootTimestamp::epoch() + reboot_time,
2947 BootTimestamp{
2948 .boot = 1,
2949 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)},
2950 BootTimestamp::epoch() + reboot_time});
2951 }
2952
2953 // Make everything perfectly quiet.
2954 event_loop_factory.SkipTimingReport();
2955 event_loop_factory.DisableStatistics();
2956
2957 std::vector<std::string> filenames;
2958 {
2959 LoggerState pi1_logger = MakeLoggerState(
2960 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
2961 LoggerState pi3_logger = MakeLoggerState(
2962 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
2963 {
2964 // And now start the logger.
2965 LoggerState pi2_logger = MakeLoggerState(
2966 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
2967
2968 event_loop_factory.RunFor(chrono::milliseconds(1000));
2969
2970 pi1_logger.StartLogger(kLogfile1_1);
2971 pi3_logger.StartLogger(kLogfile3_1);
2972 pi2_logger.StartLogger(kLogfile2_1);
2973
2974 event_loop_factory.RunFor(chrono::milliseconds(10000));
2975
2976 // Now that we've got a start time in the past, turn on data.
2977 event_loop_factory.EnableStatistics();
2978 std::unique_ptr<aos::EventLoop> ping_event_loop =
2979 pi1->MakeEventLoop("ping");
2980 Ping ping(ping_event_loop.get());
2981
2982 pi2->AlwaysStart<Pong>("pong");
2983
2984 event_loop_factory.RunFor(chrono::milliseconds(3000));
2985
2986 pi2_logger.AppendAllFilenames(&filenames);
2987
2988 // Stop logging on pi2 before rebooting and completely shut off all
2989 // messages on pi2.
2990 pi2->DisableStatistics();
2991 pi1->Disconnect(pi2->node());
2992 pi2->Disconnect(pi1->node());
2993 }
2994 event_loop_factory.RunFor(chrono::milliseconds(7000));
2995 // pi2 now reboots.
2996 {
2997 event_loop_factory.RunFor(chrono::milliseconds(1000));
2998
2999 // Start logging again on pi2 after it is up.
3000 LoggerState pi2_logger = MakeLoggerState(
3001 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3002 pi2_logger.StartLogger(kLogfile2_2);
3003
3004 event_loop_factory.RunFor(chrono::milliseconds(10000));
3005 // And, now that we have a start time in the log, turn data back on.
3006 pi2->EnableStatistics();
3007 pi1->Connect(pi2->node());
3008 pi2->Connect(pi1->node());
3009
3010 pi2->AlwaysStart<Pong>("pong");
3011 std::unique_ptr<aos::EventLoop> ping_event_loop =
3012 pi1->MakeEventLoop("ping");
3013 Ping ping(ping_event_loop.get());
3014
3015 event_loop_factory.RunFor(chrono::milliseconds(3000));
3016
3017 pi2_logger.AppendAllFilenames(&filenames);
3018 }
3019
3020 pi1_logger.AppendAllFilenames(&filenames);
3021 pi3_logger.AppendAllFilenames(&filenames);
3022 }
3023
3024 // Confirm that we can parse the result. LogReader has enough internal CHECKs
3025 // to confirm the right thing happened.
3026 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3027 auto result = ConfirmReadable(filenames);
3028 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
3029 chrono::seconds(1)));
3030 EXPECT_THAT(result[0].second,
3031 ::testing::ElementsAre(realtime_clock::epoch() +
3032 chrono::microseconds(34990350)));
3033
3034 EXPECT_THAT(result[1].first,
3035 ::testing::ElementsAre(
3036 realtime_clock::epoch() + chrono::seconds(1),
3037 realtime_clock::epoch() + chrono::microseconds(3323000)));
3038 EXPECT_THAT(result[1].second,
3039 ::testing::ElementsAre(
3040 realtime_clock::epoch() + chrono::microseconds(13990200),
3041 realtime_clock::epoch() + chrono::microseconds(16313200)));
3042
3043 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch() +
3044 chrono::seconds(1)));
3045 EXPECT_THAT(result[2].second,
3046 ::testing::ElementsAre(realtime_clock::epoch() +
3047 chrono::microseconds(34900150)));
3048}
3049
3050// Tests that local data before remote data after reboot is properly replayed.
3051// We only trigger a reboot in the timestamp interpolation function when solving
3052// the timestamp problem when we actually have a point in the function. This
3053// originally only happened when a point passes the noncausal filter. At the
3054// start of time for the second boot, if we aren't careful, we will have
3055// messages which need to be published at times before the boot. This happens
3056// when a local message is in the log before a forwarded message, so there is no
3057// point in the interpolation function. This delays the reboot. So, we need to
3058// recreate that situation and make sure it doesn't come back.
3059TEST(MultinodeRebootLoggerTest,
3060 LocalMessageBeforeRemoteBeforeStartAfterReboot) {
3061 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3062 aos::configuration::ReadConfig(ArtifactPath(
3063 "aos/events/logging/multinode_pingpong_split3_config.json"));
3064 message_bridge::TestingTimeConverter time_converter(
3065 configuration::NodesCount(&config.message()));
3066 SimulatedEventLoopFactory event_loop_factory(&config.message());
3067 event_loop_factory.SetTimeConverter(&time_converter);
3068 NodeEventLoopFactory *const pi1 =
3069 event_loop_factory.GetNodeEventLoopFactory("pi1");
3070 const size_t pi1_index = configuration::GetNodeIndex(
3071 event_loop_factory.configuration(), pi1->node());
3072 NodeEventLoopFactory *const pi2 =
3073 event_loop_factory.GetNodeEventLoopFactory("pi2");
3074 const size_t pi2_index = configuration::GetNodeIndex(
3075 event_loop_factory.configuration(), pi2->node());
3076 NodeEventLoopFactory *const pi3 =
3077 event_loop_factory.GetNodeEventLoopFactory("pi3");
3078 const size_t pi3_index = configuration::GetNodeIndex(
3079 event_loop_factory.configuration(), pi3->node());
3080
3081 const std::string kLogfile1_1 =
3082 aos::testing::TestTmpDir() + "/multi_logfile1/";
3083 const std::string kLogfile2_1 =
3084 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3085 const std::string kLogfile2_2 =
3086 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
3087 const std::string kLogfile3_1 =
3088 aos::testing::TestTmpDir() + "/multi_logfile3/";
3089 util::UnlinkRecursive(kLogfile1_1);
3090 util::UnlinkRecursive(kLogfile2_1);
3091 util::UnlinkRecursive(kLogfile2_2);
3092 util::UnlinkRecursive(kLogfile3_1);
3093 const UUID pi1_boot0 = UUID::Random();
3094 const UUID pi2_boot0 = UUID::Random();
3095 const UUID pi2_boot1 = UUID::Random();
3096 const UUID pi3_boot0 = UUID::Random();
3097 {
3098 CHECK_EQ(pi1_index, 0u);
3099 CHECK_EQ(pi2_index, 1u);
3100 CHECK_EQ(pi3_index, 2u);
3101
3102 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3103 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3104 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3105 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3106
3107 time_converter.AddNextTimestamp(
3108 distributed_clock::epoch(),
3109 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3110 BootTimestamp::epoch()});
3111 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3112 time_converter.AddNextTimestamp(
3113 distributed_clock::epoch() + reboot_time,
3114 {BootTimestamp::epoch() + reboot_time,
3115 BootTimestamp{.boot = 1,
3116 .time = monotonic_clock::epoch() + reboot_time +
3117 chrono::seconds(100)},
3118 BootTimestamp::epoch() + reboot_time});
3119 }
3120
3121 std::vector<std::string> filenames;
3122 {
3123 LoggerState pi1_logger = MakeLoggerState(
3124 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3125 LoggerState pi3_logger = MakeLoggerState(
3126 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3127 {
3128 // And now start the logger.
3129 LoggerState pi2_logger = MakeLoggerState(
3130 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3131
3132 pi1_logger.StartLogger(kLogfile1_1);
3133 pi3_logger.StartLogger(kLogfile3_1);
3134 pi2_logger.StartLogger(kLogfile2_1);
3135
3136 event_loop_factory.RunFor(chrono::milliseconds(1005));
3137
3138 // Now that we've got a start time in the past, turn on data.
3139 std::unique_ptr<aos::EventLoop> ping_event_loop =
3140 pi1->MakeEventLoop("ping");
3141 Ping ping(ping_event_loop.get());
3142
3143 pi2->AlwaysStart<Pong>("pong");
3144
3145 event_loop_factory.RunFor(chrono::milliseconds(3000));
3146
3147 pi2_logger.AppendAllFilenames(&filenames);
3148
3149 // Disable any remote messages on pi2.
3150 pi1->Disconnect(pi2->node());
3151 pi2->Disconnect(pi1->node());
3152 }
3153 event_loop_factory.RunFor(chrono::milliseconds(995));
3154 // pi2 now reboots at 5 seconds.
3155 {
3156 event_loop_factory.RunFor(chrono::milliseconds(1000));
3157
3158 // Make local stuff happen before we start logging and connect the remote.
3159 pi2->AlwaysStart<Pong>("pong");
3160 std::unique_ptr<aos::EventLoop> ping_event_loop =
3161 pi1->MakeEventLoop("ping");
3162 Ping ping(ping_event_loop.get());
3163 event_loop_factory.RunFor(chrono::milliseconds(1005));
3164
3165 // Start logging again on pi2 after it is up.
3166 LoggerState pi2_logger = MakeLoggerState(
3167 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3168 pi2_logger.StartLogger(kLogfile2_2);
3169
3170 // And allow remote messages now that we have some local ones.
3171 pi1->Connect(pi2->node());
3172 pi2->Connect(pi1->node());
3173
3174 event_loop_factory.RunFor(chrono::milliseconds(1000));
3175
3176 event_loop_factory.RunFor(chrono::milliseconds(3000));
3177
3178 pi2_logger.AppendAllFilenames(&filenames);
3179 }
3180
3181 pi1_logger.AppendAllFilenames(&filenames);
3182 pi3_logger.AppendAllFilenames(&filenames);
3183 }
3184
3185 // Confirm that we can parse the result. LogReader has enough internal CHECKs
3186 // to confirm the right thing happened.
3187 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3188 auto result = ConfirmReadable(filenames);
3189
3190 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3191 EXPECT_THAT(result[0].second,
3192 ::testing::ElementsAre(realtime_clock::epoch() +
3193 chrono::microseconds(11000350)));
3194
3195 EXPECT_THAT(result[1].first,
3196 ::testing::ElementsAre(
3197 realtime_clock::epoch(),
3198 realtime_clock::epoch() + chrono::microseconds(107005000)));
3199 EXPECT_THAT(result[1].second,
3200 ::testing::ElementsAre(
3201 realtime_clock::epoch() + chrono::microseconds(4000150),
3202 realtime_clock::epoch() + chrono::microseconds(111000200)));
3203
3204 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3205 EXPECT_THAT(result[2].second,
3206 ::testing::ElementsAre(realtime_clock::epoch() +
3207 chrono::microseconds(11000150)));
3208
3209 auto start_stop_result = ConfirmReadable(
3210 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3211 realtime_clock::epoch() + chrono::milliseconds(3000));
3212
3213 EXPECT_THAT(
3214 start_stop_result[0].first,
3215 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3216 EXPECT_THAT(
3217 start_stop_result[0].second,
3218 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3219 EXPECT_THAT(
3220 start_stop_result[1].first,
3221 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3222 EXPECT_THAT(
3223 start_stop_result[1].second,
3224 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3225 EXPECT_THAT(
3226 start_stop_result[2].first,
3227 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3228 EXPECT_THAT(
3229 start_stop_result[2].second,
3230 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3231}
3232
3233// Tests that setting the start and stop flags across a reboot works as
3234// expected.
3235TEST(MultinodeRebootLoggerTest, RebootStartStopTimes) {
3236 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3237 aos::configuration::ReadConfig(ArtifactPath(
3238 "aos/events/logging/multinode_pingpong_split3_config.json"));
3239 message_bridge::TestingTimeConverter time_converter(
3240 configuration::NodesCount(&config.message()));
3241 SimulatedEventLoopFactory event_loop_factory(&config.message());
3242 event_loop_factory.SetTimeConverter(&time_converter);
3243 NodeEventLoopFactory *const pi1 =
3244 event_loop_factory.GetNodeEventLoopFactory("pi1");
3245 const size_t pi1_index = configuration::GetNodeIndex(
3246 event_loop_factory.configuration(), pi1->node());
3247 NodeEventLoopFactory *const pi2 =
3248 event_loop_factory.GetNodeEventLoopFactory("pi2");
3249 const size_t pi2_index = configuration::GetNodeIndex(
3250 event_loop_factory.configuration(), pi2->node());
3251 NodeEventLoopFactory *const pi3 =
3252 event_loop_factory.GetNodeEventLoopFactory("pi3");
3253 const size_t pi3_index = configuration::GetNodeIndex(
3254 event_loop_factory.configuration(), pi3->node());
3255
3256 const std::string kLogfile1_1 =
3257 aos::testing::TestTmpDir() + "/multi_logfile1/";
3258 const std::string kLogfile2_1 =
3259 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3260 const std::string kLogfile2_2 =
3261 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
3262 const std::string kLogfile3_1 =
3263 aos::testing::TestTmpDir() + "/multi_logfile3/";
3264 util::UnlinkRecursive(kLogfile1_1);
3265 util::UnlinkRecursive(kLogfile2_1);
3266 util::UnlinkRecursive(kLogfile2_2);
3267 util::UnlinkRecursive(kLogfile3_1);
3268 {
3269 CHECK_EQ(pi1_index, 0u);
3270 CHECK_EQ(pi2_index, 1u);
3271 CHECK_EQ(pi3_index, 2u);
3272
3273 time_converter.AddNextTimestamp(
3274 distributed_clock::epoch(),
3275 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3276 BootTimestamp::epoch()});
3277 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3278 time_converter.AddNextTimestamp(
3279 distributed_clock::epoch() + reboot_time,
3280 {BootTimestamp::epoch() + reboot_time,
3281 BootTimestamp{.boot = 1,
3282 .time = monotonic_clock::epoch() + reboot_time},
3283 BootTimestamp::epoch() + reboot_time});
3284 }
3285
3286 std::vector<std::string> filenames;
3287 {
3288 LoggerState pi1_logger = MakeLoggerState(
3289 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3290 LoggerState pi3_logger = MakeLoggerState(
3291 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3292 {
3293 // And now start the logger.
3294 LoggerState pi2_logger = MakeLoggerState(
3295 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3296
3297 pi1_logger.StartLogger(kLogfile1_1);
3298 pi3_logger.StartLogger(kLogfile3_1);
3299 pi2_logger.StartLogger(kLogfile2_1);
3300
3301 event_loop_factory.RunFor(chrono::milliseconds(1005));
3302
3303 // Now that we've got a start time in the past, turn on data.
3304 std::unique_ptr<aos::EventLoop> ping_event_loop =
3305 pi1->MakeEventLoop("ping");
3306 Ping ping(ping_event_loop.get());
3307
3308 pi2->AlwaysStart<Pong>("pong");
3309
3310 event_loop_factory.RunFor(chrono::milliseconds(3000));
3311
3312 pi2_logger.AppendAllFilenames(&filenames);
3313 }
3314 event_loop_factory.RunFor(chrono::milliseconds(995));
3315 // pi2 now reboots at 5 seconds.
3316 {
3317 event_loop_factory.RunFor(chrono::milliseconds(1000));
3318
3319 // Make local stuff happen before we start logging and connect the remote.
3320 pi2->AlwaysStart<Pong>("pong");
3321 std::unique_ptr<aos::EventLoop> ping_event_loop =
3322 pi1->MakeEventLoop("ping");
3323 Ping ping(ping_event_loop.get());
3324 event_loop_factory.RunFor(chrono::milliseconds(5));
3325
3326 // Start logging again on pi2 after it is up.
3327 LoggerState pi2_logger = MakeLoggerState(
3328 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3329 pi2_logger.StartLogger(kLogfile2_2);
3330
3331 event_loop_factory.RunFor(chrono::milliseconds(5000));
3332
3333 pi2_logger.AppendAllFilenames(&filenames);
3334 }
3335
3336 pi1_logger.AppendAllFilenames(&filenames);
3337 pi3_logger.AppendAllFilenames(&filenames);
3338 }
3339
3340 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3341 auto result = ConfirmReadable(filenames);
3342
3343 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3344 EXPECT_THAT(result[0].second,
3345 ::testing::ElementsAre(realtime_clock::epoch() +
3346 chrono::microseconds(11000350)));
3347
3348 EXPECT_THAT(result[1].first,
3349 ::testing::ElementsAre(
3350 realtime_clock::epoch(),
3351 realtime_clock::epoch() + chrono::microseconds(6005000)));
3352 EXPECT_THAT(result[1].second,
3353 ::testing::ElementsAre(
3354 realtime_clock::epoch() + chrono::microseconds(4900150),
3355 realtime_clock::epoch() + chrono::microseconds(11000200)));
3356
3357 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3358 EXPECT_THAT(result[2].second,
3359 ::testing::ElementsAre(realtime_clock::epoch() +
3360 chrono::microseconds(11000150)));
3361
3362 // Confirm we observed the correct start and stop times. We should see the
3363 // reboot here.
3364 auto start_stop_result = ConfirmReadable(
3365 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3366 realtime_clock::epoch() + chrono::milliseconds(8000));
3367
3368 EXPECT_THAT(
3369 start_stop_result[0].first,
3370 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3371 EXPECT_THAT(
3372 start_stop_result[0].second,
3373 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3374 EXPECT_THAT(start_stop_result[1].first,
3375 ::testing::ElementsAre(
3376 realtime_clock::epoch() + chrono::seconds(2),
3377 realtime_clock::epoch() + chrono::microseconds(6005000)));
3378 EXPECT_THAT(start_stop_result[1].second,
3379 ::testing::ElementsAre(
3380 realtime_clock::epoch() + chrono::microseconds(4900150),
3381 realtime_clock::epoch() + chrono::seconds(8)));
3382 EXPECT_THAT(
3383 start_stop_result[2].first,
3384 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3385 EXPECT_THAT(
3386 start_stop_result[2].second,
3387 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3388}
3389
3390// Tests that we properly handle one direction being down.
3391TEST(MissingDirectionTest, OneDirection) {
3392 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3393 aos::configuration::ReadConfig(ArtifactPath(
3394 "aos/events/logging/multinode_pingpong_split4_config.json"));
3395 message_bridge::TestingTimeConverter time_converter(
3396 configuration::NodesCount(&config.message()));
3397 SimulatedEventLoopFactory event_loop_factory(&config.message());
3398 event_loop_factory.SetTimeConverter(&time_converter);
3399
3400 NodeEventLoopFactory *const pi1 =
3401 event_loop_factory.GetNodeEventLoopFactory("pi1");
3402 const size_t pi1_index = configuration::GetNodeIndex(
3403 event_loop_factory.configuration(), pi1->node());
3404 NodeEventLoopFactory *const pi2 =
3405 event_loop_factory.GetNodeEventLoopFactory("pi2");
3406 const size_t pi2_index = configuration::GetNodeIndex(
3407 event_loop_factory.configuration(), pi2->node());
3408 std::vector<std::string> filenames;
3409
3410 {
3411 CHECK_EQ(pi1_index, 0u);
3412 CHECK_EQ(pi2_index, 1u);
3413
3414 time_converter.AddNextTimestamp(
3415 distributed_clock::epoch(),
3416 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3417
3418 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3419 time_converter.AddNextTimestamp(
3420 distributed_clock::epoch() + reboot_time,
3421 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3422 BootTimestamp::epoch() + reboot_time});
3423 }
3424
3425 const std::string kLogfile2_1 =
3426 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3427 const std::string kLogfile1_1 =
3428 aos::testing::TestTmpDir() + "/multi_logfile1.1/";
3429 util::UnlinkRecursive(kLogfile2_1);
3430 util::UnlinkRecursive(kLogfile1_1);
3431
3432 pi2->Disconnect(pi1->node());
3433
3434 pi1->AlwaysStart<Ping>("ping");
3435 pi2->AlwaysStart<Pong>("pong");
3436
3437 {
3438 LoggerState pi2_logger = MakeLoggerState(
3439 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3440
3441 event_loop_factory.RunFor(chrono::milliseconds(95));
3442
3443 pi2_logger.StartLogger(kLogfile2_1);
3444
3445 event_loop_factory.RunFor(chrono::milliseconds(6000));
3446
3447 pi2->Connect(pi1->node());
3448
3449 LoggerState pi1_logger = MakeLoggerState(
3450 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3451 pi1_logger.StartLogger(kLogfile1_1);
3452
3453 event_loop_factory.RunFor(chrono::milliseconds(5000));
3454 pi1_logger.AppendAllFilenames(&filenames);
3455 pi2_logger.AppendAllFilenames(&filenames);
3456 }
3457
3458 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3459 ConfirmReadable(filenames);
3460}
3461
3462// Tests that we properly handle only one direction ever existing after a
3463// reboot.
3464TEST(MissingDirectionTest, OneDirectionAfterReboot) {
3465 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3466 aos::configuration::ReadConfig(ArtifactPath(
3467 "aos/events/logging/multinode_pingpong_split4_config.json"));
3468 message_bridge::TestingTimeConverter time_converter(
3469 configuration::NodesCount(&config.message()));
3470 SimulatedEventLoopFactory event_loop_factory(&config.message());
3471 event_loop_factory.SetTimeConverter(&time_converter);
3472
3473 NodeEventLoopFactory *const pi1 =
3474 event_loop_factory.GetNodeEventLoopFactory("pi1");
3475 const size_t pi1_index = configuration::GetNodeIndex(
3476 event_loop_factory.configuration(), pi1->node());
3477 NodeEventLoopFactory *const pi2 =
3478 event_loop_factory.GetNodeEventLoopFactory("pi2");
3479 const size_t pi2_index = configuration::GetNodeIndex(
3480 event_loop_factory.configuration(), pi2->node());
3481 std::vector<std::string> filenames;
3482
3483 {
3484 CHECK_EQ(pi1_index, 0u);
3485 CHECK_EQ(pi2_index, 1u);
3486
3487 time_converter.AddNextTimestamp(
3488 distributed_clock::epoch(),
3489 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3490
3491 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3492 time_converter.AddNextTimestamp(
3493 distributed_clock::epoch() + reboot_time,
3494 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3495 BootTimestamp::epoch() + reboot_time});
3496 }
3497
3498 const std::string kLogfile2_1 =
3499 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3500 util::UnlinkRecursive(kLogfile2_1);
3501
3502 pi1->AlwaysStart<Ping>("ping");
3503
3504 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3505 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3506 // second boot.
3507 {
3508 LoggerState pi2_logger = MakeLoggerState(
3509 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3510
3511 event_loop_factory.RunFor(chrono::milliseconds(95));
3512
3513 pi2_logger.StartLogger(kLogfile2_1);
3514
3515 event_loop_factory.RunFor(chrono::milliseconds(4000));
3516
3517 pi2->Disconnect(pi1->node());
3518
3519 event_loop_factory.RunFor(chrono::milliseconds(1000));
3520 pi1->AlwaysStart<Ping>("ping");
3521
3522 event_loop_factory.RunFor(chrono::milliseconds(5000));
3523 pi2_logger.AppendAllFilenames(&filenames);
3524 }
3525
3526 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3527 ConfirmReadable(filenames);
3528}
3529
3530// Tests that we properly handle only one direction ever existing after a reboot
3531// with only reliable data.
3532TEST(MissingDirectionTest, OneDirectionAfterRebootReliable) {
3533 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3534 aos::configuration::ReadConfig(ArtifactPath(
3535 "aos/events/logging/multinode_pingpong_split4_reliable_config.json"));
3536 message_bridge::TestingTimeConverter time_converter(
3537 configuration::NodesCount(&config.message()));
3538 SimulatedEventLoopFactory event_loop_factory(&config.message());
3539 event_loop_factory.SetTimeConverter(&time_converter);
3540
3541 NodeEventLoopFactory *const pi1 =
3542 event_loop_factory.GetNodeEventLoopFactory("pi1");
3543 const size_t pi1_index = configuration::GetNodeIndex(
3544 event_loop_factory.configuration(), pi1->node());
3545 NodeEventLoopFactory *const pi2 =
3546 event_loop_factory.GetNodeEventLoopFactory("pi2");
3547 const size_t pi2_index = configuration::GetNodeIndex(
3548 event_loop_factory.configuration(), pi2->node());
3549 std::vector<std::string> filenames;
3550
3551 {
3552 CHECK_EQ(pi1_index, 0u);
3553 CHECK_EQ(pi2_index, 1u);
3554
3555 time_converter.AddNextTimestamp(
3556 distributed_clock::epoch(),
3557 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3558
3559 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3560 time_converter.AddNextTimestamp(
3561 distributed_clock::epoch() + reboot_time,
3562 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3563 BootTimestamp::epoch() + reboot_time});
3564 }
3565
3566 const std::string kLogfile2_1 =
3567 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3568 util::UnlinkRecursive(kLogfile2_1);
3569
3570 pi1->AlwaysStart<Ping>("ping");
3571
3572 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3573 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3574 // second boot.
3575 {
3576 LoggerState pi2_logger = MakeLoggerState(
3577 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3578
3579 event_loop_factory.RunFor(chrono::milliseconds(95));
3580
3581 pi2_logger.StartLogger(kLogfile2_1);
3582
3583 event_loop_factory.RunFor(chrono::milliseconds(4000));
3584
3585 pi2->Disconnect(pi1->node());
3586
3587 event_loop_factory.RunFor(chrono::milliseconds(1000));
3588 pi1->AlwaysStart<Ping>("ping");
3589
3590 event_loop_factory.RunFor(chrono::milliseconds(5000));
3591 pi2_logger.AppendAllFilenames(&filenames);
3592 }
3593
3594 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3595 ConfirmReadable(filenames);
3596}
3597
Brian Smartte67d7112023-03-20 12:06:30 -07003598// Tests that we properly handle only one direction ever existing after a reboot
3599// with mixed unreliable vs reliable, where reliable has an earlier timestamp
3600// than unreliable.
3601TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase1) {
3602 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3603 aos::configuration::ReadConfig(ArtifactPath(
3604 "aos/events/logging/multinode_pingpong_split4_mixed1_config.json"));
3605 message_bridge::TestingTimeConverter time_converter(
3606 configuration::NodesCount(&config.message()));
3607 SimulatedEventLoopFactory event_loop_factory(&config.message());
3608 event_loop_factory.SetTimeConverter(&time_converter);
3609
3610 NodeEventLoopFactory *const pi1 =
3611 event_loop_factory.GetNodeEventLoopFactory("pi1");
3612 const size_t pi1_index = configuration::GetNodeIndex(
3613 event_loop_factory.configuration(), pi1->node());
3614 NodeEventLoopFactory *const pi2 =
3615 event_loop_factory.GetNodeEventLoopFactory("pi2");
3616 const size_t pi2_index = configuration::GetNodeIndex(
3617 event_loop_factory.configuration(), pi2->node());
3618 std::vector<std::string> filenames;
3619
3620 {
3621 CHECK_EQ(pi1_index, 0u);
3622 CHECK_EQ(pi2_index, 1u);
3623
3624 time_converter.AddNextTimestamp(
3625 distributed_clock::epoch(),
3626 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3627
3628 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3629 time_converter.AddNextTimestamp(
3630 distributed_clock::epoch() + reboot_time,
3631 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3632 BootTimestamp::epoch() + reboot_time});
3633 }
3634
3635 const std::string kLogfile2_1 =
3636 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3637 util::UnlinkRecursive(kLogfile2_1);
3638
3639 // The following sequence using the above reference config creates
3640 // a reliable message timestamp < unreliable message timestamp.
3641 {
3642 pi1->DisableStatistics();
3643 pi2->DisableStatistics();
3644
3645 event_loop_factory.RunFor(chrono::milliseconds(95));
3646
3647 pi1->AlwaysStart<Ping>("ping");
3648
3649 event_loop_factory.RunFor(chrono::milliseconds(5250));
3650
3651 pi1->EnableStatistics();
3652
3653 event_loop_factory.RunFor(chrono::milliseconds(1000));
3654
3655 LoggerState pi2_logger = MakeLoggerState(
3656 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3657
3658 pi2_logger.StartLogger(kLogfile2_1);
3659
3660 event_loop_factory.RunFor(chrono::milliseconds(5000));
3661 pi2_logger.AppendAllFilenames(&filenames);
3662 }
3663
3664 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3665 ConfirmReadable(filenames);
3666}
3667
3668// Tests that we properly handle only one direction ever existing after a reboot
3669// with mixed unreliable vs reliable, where unreliable has an earlier timestamp
3670// than reliable.
3671TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase2) {
3672 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3673 aos::configuration::ReadConfig(ArtifactPath(
3674 "aos/events/logging/multinode_pingpong_split4_mixed2_config.json"));
3675 message_bridge::TestingTimeConverter time_converter(
3676 configuration::NodesCount(&config.message()));
3677 SimulatedEventLoopFactory event_loop_factory(&config.message());
3678 event_loop_factory.SetTimeConverter(&time_converter);
3679
3680 NodeEventLoopFactory *const pi1 =
3681 event_loop_factory.GetNodeEventLoopFactory("pi1");
3682 const size_t pi1_index = configuration::GetNodeIndex(
3683 event_loop_factory.configuration(), pi1->node());
3684 NodeEventLoopFactory *const pi2 =
3685 event_loop_factory.GetNodeEventLoopFactory("pi2");
3686 const size_t pi2_index = configuration::GetNodeIndex(
3687 event_loop_factory.configuration(), pi2->node());
3688 std::vector<std::string> filenames;
3689
3690 {
3691 CHECK_EQ(pi1_index, 0u);
3692 CHECK_EQ(pi2_index, 1u);
3693
3694 time_converter.AddNextTimestamp(
3695 distributed_clock::epoch(),
3696 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3697
3698 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3699 time_converter.AddNextTimestamp(
3700 distributed_clock::epoch() + reboot_time,
3701 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3702 BootTimestamp::epoch() + reboot_time});
3703 }
3704
3705 const std::string kLogfile2_1 =
3706 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3707 util::UnlinkRecursive(kLogfile2_1);
3708
3709 // The following sequence using the above reference config creates
3710 // an unreliable message timestamp < reliable message timestamp.
3711 {
3712 pi1->DisableStatistics();
3713 pi2->DisableStatistics();
3714
3715 event_loop_factory.RunFor(chrono::milliseconds(95));
3716
3717 pi1->AlwaysStart<Ping>("ping");
3718
3719 event_loop_factory.RunFor(chrono::milliseconds(5250));
3720
3721 pi1->EnableStatistics();
3722
3723 event_loop_factory.RunFor(chrono::milliseconds(1000));
3724
3725 LoggerState pi2_logger = MakeLoggerState(
3726 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3727
3728 pi2_logger.StartLogger(kLogfile2_1);
3729
3730 event_loop_factory.RunFor(chrono::milliseconds(5000));
3731 pi2_logger.AppendAllFilenames(&filenames);
3732 }
3733
3734 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3735 ConfirmReadable(filenames);
3736}
3737
Naman Guptaa63aa132023-03-22 20:06:34 -07003738// Tests that we properly handle what used to be a time violation in one
3739// direction. This can occur when one direction goes down after sending some
3740// data, but the other keeps working. The down direction ends up resolving to a
3741// straight line in the noncausal filter, where the direction which is still up
3742// can cross that line. Really, time progressed along just fine but we assumed
3743// that the offset was a line when it could have deviated by up to 1ms/second.
3744TEST_P(MultinodeLoggerTest, OneDirectionTimeDrift) {
3745 std::vector<std::string> filenames;
3746
3747 CHECK_EQ(pi1_index_, 0u);
3748 CHECK_EQ(pi2_index_, 1u);
3749
3750 time_converter_.AddNextTimestamp(
3751 distributed_clock::epoch(),
3752 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3753
3754 const chrono::nanoseconds before_disconnect_duration =
3755 time_converter_.AddMonotonic(
3756 {chrono::milliseconds(1000), chrono::milliseconds(1000)});
3757
3758 const chrono::nanoseconds test_duration =
3759 time_converter_.AddMonotonic(
3760 {chrono::milliseconds(1000), chrono::milliseconds(1000)}) +
3761 time_converter_.AddMonotonic(
3762 {chrono::milliseconds(10000),
3763 chrono::milliseconds(10000) - chrono::milliseconds(5)}) +
3764 time_converter_.AddMonotonic(
3765 {chrono::milliseconds(10000),
3766 chrono::milliseconds(10000) + chrono::milliseconds(5)});
3767
3768 const std::string kLogfile =
3769 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3770 util::UnlinkRecursive(kLogfile);
3771
3772 {
3773 LoggerState pi2_logger = MakeLogger(pi2_);
3774 pi2_logger.StartLogger(kLogfile);
3775 event_loop_factory_.RunFor(before_disconnect_duration);
3776
3777 pi2_->Disconnect(pi1_->node());
3778
3779 event_loop_factory_.RunFor(test_duration);
3780 pi2_->Connect(pi1_->node());
3781
3782 event_loop_factory_.RunFor(chrono::milliseconds(5000));
3783 pi2_logger.AppendAllFilenames(&filenames);
3784 }
3785
3786 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3787 ConfirmReadable(filenames);
3788}
3789
3790// Tests that we can replay a logfile that has timestamps such that at least one
3791// node's epoch is at a positive distributed_clock (and thus will have to be
3792// booted after the other node(s)).
3793TEST_P(MultinodeLoggerTest, StartOneNodeBeforeOther) {
3794 std::vector<std::string> filenames;
3795
3796 CHECK_EQ(pi1_index_, 0u);
3797 CHECK_EQ(pi2_index_, 1u);
3798
3799 time_converter_.AddNextTimestamp(
3800 distributed_clock::epoch(),
3801 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3802
3803 const chrono::nanoseconds before_reboot_duration = chrono::milliseconds(1000);
3804 time_converter_.RebootAt(
3805 0, distributed_clock::time_point(before_reboot_duration));
3806
3807 const chrono::nanoseconds test_duration = time_converter_.AddMonotonic(
3808 {chrono::milliseconds(10000), chrono::milliseconds(10000)});
3809
3810 const std::string kLogfile =
3811 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3812 util::UnlinkRecursive(kLogfile);
3813
3814 pi2_->Disconnect(pi1_->node());
3815 pi1_->Disconnect(pi2_->node());
3816
3817 {
3818 LoggerState pi2_logger = MakeLogger(pi2_);
3819
3820 pi2_logger.StartLogger(kLogfile);
3821 event_loop_factory_.RunFor(before_reboot_duration);
3822
3823 pi2_->Connect(pi1_->node());
3824 pi1_->Connect(pi2_->node());
3825
3826 event_loop_factory_.RunFor(test_duration);
3827
3828 pi2_logger.AppendAllFilenames(&filenames);
3829 }
3830
3831 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3832 ConfirmReadable(filenames);
3833
3834 {
3835 LogReader reader(sorted_parts);
3836 SimulatedEventLoopFactory replay_factory(reader.configuration());
3837 reader.RegisterWithoutStarting(&replay_factory);
3838
3839 NodeEventLoopFactory *const replay_node =
3840 reader.event_loop_factory()->GetNodeEventLoopFactory("pi1");
3841
3842 std::unique_ptr<EventLoop> test_event_loop =
3843 replay_node->MakeEventLoop("test_reader");
3844 replay_node->OnStartup([replay_node]() {
3845 // Check that we didn't boot until at least t=0.
3846 CHECK_LE(monotonic_clock::epoch(), replay_node->monotonic_now());
3847 });
3848 test_event_loop->OnRun([&test_event_loop]() {
3849 // Check that we didn't boot until at least t=0.
3850 EXPECT_LE(monotonic_clock::epoch(), test_event_loop->monotonic_now());
3851 });
3852 reader.event_loop_factory()->Run();
3853 reader.Deregister();
3854 }
3855}
3856
3857// Tests that when we have a loop without all the logs at all points in time, we
3858// can sort it properly.
3859TEST(MultinodeLoggerLoopTest, Loop) {
3860 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3861 aos::configuration::ReadConfig(ArtifactPath(
3862 "aos/events/logging/multinode_pingpong_triangle_split_config.json"));
3863 message_bridge::TestingTimeConverter time_converter(
3864 configuration::NodesCount(&config.message()));
3865 SimulatedEventLoopFactory event_loop_factory(&config.message());
3866 event_loop_factory.SetTimeConverter(&time_converter);
3867
3868 NodeEventLoopFactory *const pi1 =
3869 event_loop_factory.GetNodeEventLoopFactory("pi1");
3870 NodeEventLoopFactory *const pi2 =
3871 event_loop_factory.GetNodeEventLoopFactory("pi2");
3872 NodeEventLoopFactory *const pi3 =
3873 event_loop_factory.GetNodeEventLoopFactory("pi3");
3874
3875 const std::string kLogfile1_1 =
3876 aos::testing::TestTmpDir() + "/multi_logfile1/";
3877 const std::string kLogfile2_1 =
3878 aos::testing::TestTmpDir() + "/multi_logfile2/";
3879 const std::string kLogfile3_1 =
3880 aos::testing::TestTmpDir() + "/multi_logfile3/";
3881 util::UnlinkRecursive(kLogfile1_1);
3882 util::UnlinkRecursive(kLogfile2_1);
3883 util::UnlinkRecursive(kLogfile3_1);
3884
3885 {
3886 // Make pi1 boot before everything else.
3887 time_converter.AddNextTimestamp(
3888 distributed_clock::epoch(),
3889 {BootTimestamp::epoch(),
3890 BootTimestamp::epoch() - chrono::milliseconds(100),
3891 BootTimestamp::epoch() - chrono::milliseconds(300)});
3892 }
3893
3894 // We want to setup a situation such that 2 of the 3 legs of the loop are very
3895 // confident about time being X, and the third leg is pulling the average off
3896 // to one side.
3897 //
3898 // It's easiest to visualize this in timestamp_plotter.
3899
3900 std::vector<std::string> filenames;
3901 {
3902 // Have pi1 send out a reliable message at startup. This sets up a long
3903 // forwarding time message at the start to bias time.
3904 std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
3905 {
3906 aos::Sender<examples::Ping> ping_sender =
3907 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
3908
3909 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
3910 examples::Ping::Builder ping_builder =
3911 builder.MakeBuilder<examples::Ping>();
3912 CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
3913 }
3914
3915 // Wait a while so there's enough data to let the worst case be rather off.
3916 event_loop_factory.RunFor(chrono::seconds(1000));
3917
3918 // Now start a receiving node first. This sets up 2 tight bounds between 2
3919 // of the nodes.
3920 LoggerState pi2_logger = MakeLoggerState(
3921 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3922 pi2_logger.StartLogger(kLogfile2_1);
3923
3924 event_loop_factory.RunFor(chrono::seconds(100));
3925
3926 // And now start the third leg.
3927 LoggerState pi3_logger = MakeLoggerState(
3928 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3929 pi3_logger.StartLogger(kLogfile3_1);
3930
3931 LoggerState pi1_logger = MakeLoggerState(
3932 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3933 pi1_logger.StartLogger(kLogfile1_1);
3934
3935 event_loop_factory.RunFor(chrono::seconds(100));
3936
3937 pi1_logger.AppendAllFilenames(&filenames);
3938 pi2_logger.AppendAllFilenames(&filenames);
3939 pi3_logger.AppendAllFilenames(&filenames);
3940 }
3941
3942 // Make sure we can read this.
3943 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3944 auto result = ConfirmReadable(filenames);
3945}
3946
Austin Schuh08dba8f2023-05-01 08:29:30 -07003947// Tests that RestartLogging works in the simple case. Unfortunately, the
3948// failure cases involve simulating time elapsing in callbacks, which is really
3949// hard. The best we can reasonably do is make sure 2 back to back logs are
3950// parseable together.
3951TEST_P(MultinodeLoggerTest, RestartLogging) {
3952 time_converter_.AddMonotonic(
3953 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
3954 std::vector<std::string> filenames;
3955 {
3956 LoggerState pi1_logger = MakeLogger(pi1_);
3957
3958 event_loop_factory_.RunFor(chrono::milliseconds(95));
3959
3960 StartLogger(&pi1_logger, logfile_base1_);
3961 aos::monotonic_clock::time_point last_rotation_time =
3962 pi1_logger.event_loop->monotonic_now();
3963 pi1_logger.logger->set_on_logged_period([&] {
3964 const auto now = pi1_logger.event_loop->monotonic_now();
3965 if (now > last_rotation_time + std::chrono::seconds(5)) {
3966 pi1_logger.AppendAllFilenames(&filenames);
3967 std::unique_ptr<MultiNodeFilesLogNamer> namer =
3968 pi1_logger.MakeLogNamer(logfile_base2_);
3969 pi1_logger.log_namer = namer.get();
3970
3971 pi1_logger.logger->RestartLogging(std::move(namer));
3972 last_rotation_time = now;
3973 }
3974 });
3975
3976 event_loop_factory_.RunFor(chrono::milliseconds(7000));
3977
3978 pi1_logger.AppendAllFilenames(&filenames);
3979 }
3980
3981 for (const auto &x : filenames) {
3982 LOG(INFO) << x;
3983 }
3984
3985 EXPECT_GE(filenames.size(), 2u);
3986
3987 ConfirmReadable(filenames);
3988
3989 // TODO(austin): It would be good to confirm that any one time messages end up
3990 // in both logs correctly.
3991}
3992
Naman Guptaa63aa132023-03-22 20:06:34 -07003993} // namespace testing
3994} // namespace logger
3995} // namespace aos