blob: f9eb11ec06f85a4d42c2f662f87d10d9b8df326b [file] [log] [blame]
Brian Smartte67d7112023-03-20 12:06:30 -07001#include <algorithm>
2
Naman Guptaa63aa132023-03-22 20:06:34 -07003#include "aos/events/logging/log_reader.h"
4#include "aos/events/logging/multinode_logger_test_lib.h"
5#include "aos/events/message_counter.h"
6#include "aos/events/ping_lib.h"
7#include "aos/events/pong_lib.h"
8#include "aos/network/remote_message_generated.h"
9#include "aos/network/timestamp_generated.h"
10#include "aos/testing/tmpdir.h"
11#include "gmock/gmock.h"
12#include "gtest/gtest.h"
13
14namespace aos {
15namespace logger {
16namespace testing {
17
18namespace chrono = std::chrono;
19using aos::message_bridge::RemoteMessage;
20using aos::testing::ArtifactPath;
21using aos::testing::MessageCounter;
22
Naman Guptaa63aa132023-03-22 20:06:34 -070023INSTANTIATE_TEST_SUITE_P(
24 All, MultinodeLoggerTest,
25 ::testing::Combine(
26 ::testing::Values(
27 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080028 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070029 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080030 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070031 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
32
33INSTANTIATE_TEST_SUITE_P(
34 All, MultinodeLoggerDeathTest,
35 ::testing::Combine(
36 ::testing::Values(
37 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080038 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070039 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080040 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070041 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
42
43// Tests that we can write and read simple multi-node log files.
44TEST_P(MultinodeLoggerTest, SimpleMultiNode) {
45 std::vector<std::string> actual_filenames;
46 time_converter_.StartEqual();
47
48 {
49 LoggerState pi1_logger = MakeLogger(pi1_);
50 LoggerState pi2_logger = MakeLogger(pi2_);
51
52 event_loop_factory_.RunFor(chrono::milliseconds(95));
53
54 StartLogger(&pi1_logger);
55 StartLogger(&pi2_logger);
56
57 event_loop_factory_.RunFor(chrono::milliseconds(20000));
58 pi1_logger.AppendAllFilenames(&actual_filenames);
59 pi2_logger.AppendAllFilenames(&actual_filenames);
60 }
61
62 ASSERT_THAT(actual_filenames,
63 ::testing::UnorderedElementsAreArray(logfiles_));
64
65 {
66 std::set<std::string> logfile_uuids;
67 std::set<std::string> parts_uuids;
68 // Confirm that we have the expected number of UUIDs for both the logfile
69 // UUIDs and parts UUIDs.
70 std::vector<SizePrefixedFlatbufferVector<LogFileHeader>> log_header;
71 for (std::string_view f : logfiles_) {
72 log_header.emplace_back(ReadHeader(f).value());
73 if (!log_header.back().message().has_configuration()) {
74 logfile_uuids.insert(
75 log_header.back().message().log_event_uuid()->str());
76 parts_uuids.insert(log_header.back().message().parts_uuid()->str());
77 }
78 }
79
80 EXPECT_EQ(logfile_uuids.size(), 2u);
81 if (shared()) {
82 EXPECT_EQ(parts_uuids.size(), 7u);
83 } else {
84 EXPECT_EQ(parts_uuids.size(), 8u);
85 }
86
87 // And confirm everything is on the correct node.
88 EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi1");
89 EXPECT_EQ(log_header[3].message().node()->name()->string_view(), "pi1");
90 EXPECT_EQ(log_header[4].message().node()->name()->string_view(), "pi1");
91
92 EXPECT_EQ(log_header[5].message().node()->name()->string_view(), "pi2");
93 EXPECT_EQ(log_header[6].message().node()->name()->string_view(), "pi2");
94
95 EXPECT_EQ(log_header[7].message().node()->name()->string_view(), "pi2");
96 EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi2");
97 EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi2");
98
99 EXPECT_EQ(log_header[10].message().node()->name()->string_view(), "pi1");
100 EXPECT_EQ(log_header[11].message().node()->name()->string_view(), "pi1");
101
102 EXPECT_EQ(log_header[12].message().node()->name()->string_view(), "pi2");
103 EXPECT_EQ(log_header[13].message().node()->name()->string_view(), "pi2");
104
105 if (shared()) {
106 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
107 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi2");
108 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi2");
109
110 EXPECT_EQ(log_header[17].message().node()->name()->string_view(), "pi1");
111 EXPECT_EQ(log_header[18].message().node()->name()->string_view(), "pi1");
112 } else {
113 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
114 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi2");
115
116 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi1");
117 EXPECT_EQ(log_header[17].message().node()->name()->string_view(), "pi1");
118
119 EXPECT_EQ(log_header[18].message().node()->name()->string_view(), "pi2");
120 EXPECT_EQ(log_header[19].message().node()->name()->string_view(), "pi2");
121 }
122
123 // And the parts index matches.
124 EXPECT_EQ(log_header[2].message().parts_index(), 0);
125 EXPECT_EQ(log_header[3].message().parts_index(), 1);
126 EXPECT_EQ(log_header[4].message().parts_index(), 2);
127
128 EXPECT_EQ(log_header[5].message().parts_index(), 0);
129 EXPECT_EQ(log_header[6].message().parts_index(), 1);
130
131 EXPECT_EQ(log_header[7].message().parts_index(), 0);
132 EXPECT_EQ(log_header[8].message().parts_index(), 1);
133 EXPECT_EQ(log_header[9].message().parts_index(), 2);
134
135 EXPECT_EQ(log_header[10].message().parts_index(), 0);
136 EXPECT_EQ(log_header[11].message().parts_index(), 1);
137
138 EXPECT_EQ(log_header[12].message().parts_index(), 0);
139 EXPECT_EQ(log_header[13].message().parts_index(), 1);
140
141 if (shared()) {
142 EXPECT_EQ(log_header[14].message().parts_index(), 0);
143 EXPECT_EQ(log_header[15].message().parts_index(), 1);
144 EXPECT_EQ(log_header[16].message().parts_index(), 2);
145
146 EXPECT_EQ(log_header[17].message().parts_index(), 0);
147 EXPECT_EQ(log_header[18].message().parts_index(), 1);
148 } else {
149 EXPECT_EQ(log_header[14].message().parts_index(), 0);
150 EXPECT_EQ(log_header[15].message().parts_index(), 1);
151
152 EXPECT_EQ(log_header[16].message().parts_index(), 0);
153 EXPECT_EQ(log_header[17].message().parts_index(), 1);
154
155 EXPECT_EQ(log_header[18].message().parts_index(), 0);
156 EXPECT_EQ(log_header[19].message().parts_index(), 1);
157 }
158 }
159
160 const std::vector<LogFile> sorted_log_files = SortParts(logfiles_);
161 {
162 using ::testing::UnorderedElementsAre;
163 std::shared_ptr<const aos::Configuration> config =
164 sorted_log_files[0].config;
165
166 // Timing reports, pings
167 EXPECT_THAT(CountChannelsData(config, logfiles_[2]),
168 UnorderedElementsAre(
169 std::make_tuple("/pi1/aos",
170 "aos.message_bridge.ServerStatistics", 1),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700171 std::make_tuple("/test", "aos.examples.Ping", 1),
172 std::make_tuple("/pi1/aos", "aos.examples.Ping", 1)))
Naman Guptaa63aa132023-03-22 20:06:34 -0700173 << " : " << logfiles_[2];
174 {
175 std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700176 std::make_tuple("/pi1/aos", "aos.examples.Ping", 10),
Naman Guptaa63aa132023-03-22 20:06:34 -0700177 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 1),
178 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
179 1)};
180 if (!std::get<0>(GetParam()).shared) {
181 channel_counts.push_back(
182 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
183 "aos-message_bridge-Timestamp",
184 "aos.message_bridge.RemoteMessage", 1));
185 }
186 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
187 ::testing::UnorderedElementsAreArray(channel_counts))
188 << " : " << logfiles_[3];
189 }
190 {
191 std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700192 std::make_tuple("/pi1/aos", "aos.examples.Ping", 1990),
Naman Guptaa63aa132023-03-22 20:06:34 -0700193 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 199),
194 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
195 20),
196 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
197 199),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700198 std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
Naman Guptaa63aa132023-03-22 20:06:34 -0700199 std::make_tuple("/test", "aos.examples.Ping", 2000)};
200 if (!std::get<0>(GetParam()).shared) {
201 channel_counts.push_back(
202 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
203 "aos-message_bridge-Timestamp",
204 "aos.message_bridge.RemoteMessage", 199));
205 }
206 EXPECT_THAT(CountChannelsData(config, logfiles_[4]),
207 ::testing::UnorderedElementsAreArray(channel_counts))
208 << " : " << logfiles_[4];
209 }
210 // Timestamps for pong
211 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
212 UnorderedElementsAre())
213 << " : " << logfiles_[2];
214 EXPECT_THAT(
215 CountChannelsTimestamp(config, logfiles_[3]),
216 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
217 << " : " << logfiles_[3];
218 EXPECT_THAT(
219 CountChannelsTimestamp(config, logfiles_[4]),
220 UnorderedElementsAre(
221 std::make_tuple("/test", "aos.examples.Pong", 2000),
222 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)))
223 << " : " << logfiles_[4];
224
225 // Pong data.
226 EXPECT_THAT(
227 CountChannelsData(config, logfiles_[5]),
228 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 91)))
229 << " : " << logfiles_[5];
230 EXPECT_THAT(CountChannelsData(config, logfiles_[6]),
231 UnorderedElementsAre(
232 std::make_tuple("/test", "aos.examples.Pong", 1910)))
233 << " : " << logfiles_[6];
234
235 // No timestamps
236 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
237 UnorderedElementsAre())
238 << " : " << logfiles_[5];
239 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[6]),
240 UnorderedElementsAre())
241 << " : " << logfiles_[6];
242
243 // Timing reports and pongs.
244 EXPECT_THAT(CountChannelsData(config, logfiles_[7]),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700245 UnorderedElementsAre(
246 std::make_tuple("/pi2/aos", "aos.examples.Ping", 1),
247 std::make_tuple("/pi2/aos",
248 "aos.message_bridge.ServerStatistics", 1)))
Naman Guptaa63aa132023-03-22 20:06:34 -0700249 << " : " << logfiles_[7];
250 EXPECT_THAT(
251 CountChannelsData(config, logfiles_[8]),
252 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
253 << " : " << logfiles_[8];
254 EXPECT_THAT(
255 CountChannelsData(config, logfiles_[9]),
256 UnorderedElementsAre(
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700257 std::make_tuple("/pi2/aos", "aos.examples.Ping", 2000),
Naman Guptaa63aa132023-03-22 20:06:34 -0700258 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
259 std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
260 20),
261 std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
262 200),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700263 std::make_tuple("/pi2/aos", "aos.timing.Report", 60),
Naman Guptaa63aa132023-03-22 20:06:34 -0700264 std::make_tuple("/test", "aos.examples.Pong", 2000)))
265 << " : " << logfiles_[9];
266 // And ping timestamps.
267 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[7]),
268 UnorderedElementsAre())
269 << " : " << logfiles_[7];
270 EXPECT_THAT(
271 CountChannelsTimestamp(config, logfiles_[8]),
272 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
273 << " : " << logfiles_[8];
274 EXPECT_THAT(
275 CountChannelsTimestamp(config, logfiles_[9]),
276 UnorderedElementsAre(
277 std::make_tuple("/test", "aos.examples.Ping", 2000),
278 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)))
279 << " : " << logfiles_[9];
280
281 // And then test that the remotely logged timestamp data files only have
282 // timestamps in them.
283 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[10]),
284 UnorderedElementsAre())
285 << " : " << logfiles_[10];
286 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[11]),
287 UnorderedElementsAre())
288 << " : " << logfiles_[11];
289 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[12]),
290 UnorderedElementsAre())
291 << " : " << logfiles_[12];
292 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[13]),
293 UnorderedElementsAre())
294 << " : " << logfiles_[13];
295
296 EXPECT_THAT(CountChannelsData(config, logfiles_[10]),
297 UnorderedElementsAre(std::make_tuple(
298 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
299 << " : " << logfiles_[10];
300 EXPECT_THAT(CountChannelsData(config, logfiles_[11]),
301 UnorderedElementsAre(std::make_tuple(
302 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
303 << " : " << logfiles_[11];
304
305 EXPECT_THAT(CountChannelsData(config, logfiles_[12]),
306 UnorderedElementsAre(std::make_tuple(
307 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
308 << " : " << logfiles_[12];
309 EXPECT_THAT(CountChannelsData(config, logfiles_[13]),
310 UnorderedElementsAre(std::make_tuple(
311 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
312 << " : " << logfiles_[13];
313
314 // Timestamps from pi2 on pi1, and the other way.
315 if (shared()) {
316 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
317 UnorderedElementsAre())
318 << " : " << logfiles_[14];
319 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
320 UnorderedElementsAre())
321 << " : " << logfiles_[15];
322 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
323 UnorderedElementsAre())
324 << " : " << logfiles_[16];
325 EXPECT_THAT(CountChannelsData(config, logfiles_[17]),
326 UnorderedElementsAre())
327 << " : " << logfiles_[17];
328 EXPECT_THAT(CountChannelsData(config, logfiles_[18]),
329 UnorderedElementsAre())
330 << " : " << logfiles_[18];
331
332 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[14]),
333 UnorderedElementsAre(
334 std::make_tuple("/test", "aos.examples.Ping", 1)))
335 << " : " << logfiles_[14];
336 EXPECT_THAT(
337 CountChannelsTimestamp(config, logfiles_[15]),
338 UnorderedElementsAre(
339 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 9),
340 std::make_tuple("/test", "aos.examples.Ping", 90)))
341 << " : " << logfiles_[15];
342 EXPECT_THAT(
343 CountChannelsTimestamp(config, logfiles_[16]),
344 UnorderedElementsAre(
345 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 191),
346 std::make_tuple("/test", "aos.examples.Ping", 1910)))
347 << " : " << logfiles_[16];
348 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[17]),
349 UnorderedElementsAre(std::make_tuple(
350 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
351 << " : " << logfiles_[17];
352 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[18]),
353 UnorderedElementsAre(std::make_tuple(
354 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
355 << " : " << logfiles_[18];
356 } else {
357 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
358 UnorderedElementsAre())
359 << " : " << logfiles_[14];
360 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
361 UnorderedElementsAre())
362 << " : " << logfiles_[15];
363 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
364 UnorderedElementsAre())
365 << " : " << logfiles_[16];
366 EXPECT_THAT(CountChannelsData(config, logfiles_[17]),
367 UnorderedElementsAre())
368 << " : " << logfiles_[17];
369 EXPECT_THAT(CountChannelsData(config, logfiles_[18]),
370 UnorderedElementsAre())
371 << " : " << logfiles_[18];
372 EXPECT_THAT(CountChannelsData(config, logfiles_[19]),
373 UnorderedElementsAre())
374 << " : " << logfiles_[19];
375
376 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[14]),
377 UnorderedElementsAre(std::make_tuple(
378 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
379 << " : " << logfiles_[14];
380 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[15]),
381 UnorderedElementsAre(std::make_tuple(
382 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
383 << " : " << logfiles_[15];
384 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[16]),
385 UnorderedElementsAre(std::make_tuple(
386 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
387 << " : " << logfiles_[16];
388 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[17]),
389 UnorderedElementsAre(std::make_tuple(
390 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
391 << " : " << logfiles_[17];
392 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[18]),
393 UnorderedElementsAre(
394 std::make_tuple("/test", "aos.examples.Ping", 91)))
395 << " : " << logfiles_[18];
396 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[19]),
397 UnorderedElementsAre(
398 std::make_tuple("/test", "aos.examples.Ping", 1910)))
399 << " : " << logfiles_[19];
400 }
401 }
402
403 LogReader reader(sorted_log_files);
404
405 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
406 log_reader_factory.set_send_delay(chrono::microseconds(0));
407
408 // This sends out the fetched messages and advances time to the start of the
409 // log file.
410 reader.Register(&log_reader_factory);
411
412 const Node *pi1 =
413 configuration::GetNode(log_reader_factory.configuration(), "pi1");
414 const Node *pi2 =
415 configuration::GetNode(log_reader_factory.configuration(), "pi2");
416
417 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
418 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
419 LOG(INFO) << "now pi1 "
420 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
421 LOG(INFO) << "now pi2 "
422 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
423
424 EXPECT_THAT(reader.LoggedNodes(),
425 ::testing::ElementsAre(
426 configuration::GetNode(reader.logged_configuration(), pi1),
427 configuration::GetNode(reader.logged_configuration(), pi2)));
428
429 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
430
431 std::unique_ptr<EventLoop> pi1_event_loop =
432 log_reader_factory.MakeEventLoop("test", pi1);
433 std::unique_ptr<EventLoop> pi2_event_loop =
434 log_reader_factory.MakeEventLoop("test", pi2);
435
436 int pi1_ping_count = 10;
437 int pi2_ping_count = 10;
438 int pi1_pong_count = 10;
439 int pi2_pong_count = 10;
440
441 // Confirm that the ping value matches.
442 pi1_event_loop->MakeWatcher(
443 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
444 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping) << " at "
445 << pi1_event_loop->context().monotonic_remote_time << " -> "
446 << pi1_event_loop->context().monotonic_event_time;
447 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
448 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
449 pi1_ping_count * chrono::milliseconds(10) +
450 monotonic_clock::epoch());
451 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
452 pi1_ping_count * chrono::milliseconds(10) +
453 realtime_clock::epoch());
454 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
455 pi1_event_loop->context().monotonic_event_time);
456 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
457 pi1_event_loop->context().realtime_event_time);
458
459 ++pi1_ping_count;
460 });
461 pi2_event_loop->MakeWatcher(
462 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
463 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping) << " at "
464 << pi2_event_loop->context().monotonic_remote_time << " -> "
465 << pi2_event_loop->context().monotonic_event_time;
466 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
467
468 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
469 pi2_ping_count * chrono::milliseconds(10) +
470 monotonic_clock::epoch());
471 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
472 pi2_ping_count * chrono::milliseconds(10) +
473 realtime_clock::epoch());
474 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time +
475 chrono::microseconds(150),
476 pi2_event_loop->context().monotonic_event_time);
477 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time +
478 chrono::microseconds(150),
479 pi2_event_loop->context().realtime_event_time);
480 ++pi2_ping_count;
481 });
482
483 constexpr ssize_t kQueueIndexOffset = -9;
484 // Confirm that the ping and pong counts both match, and the value also
485 // matches.
486 pi1_event_loop->MakeWatcher(
487 "/test", [&pi1_event_loop, &pi1_ping_count,
488 &pi1_pong_count](const examples::Pong &pong) {
489 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
490 << pi1_event_loop->context().monotonic_remote_time << " -> "
491 << pi1_event_loop->context().monotonic_event_time;
492
493 EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
494 pi1_pong_count + kQueueIndexOffset);
495 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
496 chrono::microseconds(200) +
497 pi1_pong_count * chrono::milliseconds(10) +
498 monotonic_clock::epoch());
499 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
500 chrono::microseconds(200) +
501 pi1_pong_count * chrono::milliseconds(10) +
502 realtime_clock::epoch());
503
504 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
505 chrono::microseconds(150),
506 pi1_event_loop->context().monotonic_event_time);
507 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
508 chrono::microseconds(150),
509 pi1_event_loop->context().realtime_event_time);
510
511 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
512 ++pi1_pong_count;
513 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
514 });
515 pi2_event_loop->MakeWatcher(
516 "/test", [&pi2_event_loop, &pi2_ping_count,
517 &pi2_pong_count](const examples::Pong &pong) {
518 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
519 << pi2_event_loop->context().monotonic_remote_time << " -> "
520 << pi2_event_loop->context().monotonic_event_time;
521
522 EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
523 pi2_pong_count + kQueueIndexOffset);
524
525 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
526 chrono::microseconds(200) +
527 pi2_pong_count * chrono::milliseconds(10) +
528 monotonic_clock::epoch());
529 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
530 chrono::microseconds(200) +
531 pi2_pong_count * chrono::milliseconds(10) +
532 realtime_clock::epoch());
533
534 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
535 pi2_event_loop->context().monotonic_event_time);
536 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
537 pi2_event_loop->context().realtime_event_time);
538
539 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
540 ++pi2_pong_count;
541 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
542 });
543
544 log_reader_factory.Run();
545 EXPECT_EQ(pi1_ping_count, 2010);
546 EXPECT_EQ(pi2_ping_count, 2010);
547 EXPECT_EQ(pi1_pong_count, 2010);
548 EXPECT_EQ(pi2_pong_count, 2010);
549
550 reader.Deregister();
551}
552
553// Test that if we feed the replay with a mismatched node list that we die on
554// the LogReader constructor.
555TEST_P(MultinodeLoggerDeathTest, MultiNodeBadReplayConfig) {
556 time_converter_.StartEqual();
557 {
558 LoggerState pi1_logger = MakeLogger(pi1_);
559 LoggerState pi2_logger = MakeLogger(pi2_);
560
561 event_loop_factory_.RunFor(chrono::milliseconds(95));
562
563 StartLogger(&pi1_logger);
564 StartLogger(&pi2_logger);
565
566 event_loop_factory_.RunFor(chrono::milliseconds(20000));
567 }
568
569 // Test that, if we add an additional node to the replay config that the
570 // logger complains about the mismatch in number of nodes.
571 FlatbufferDetachedBuffer<Configuration> extra_nodes_config =
572 configuration::MergeWithConfig(&config_.message(), R"({
573 "nodes": [
574 {
575 "name": "extra-node"
576 }
577 ]
578 }
579 )");
580
581 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
582 EXPECT_DEATH(LogReader(sorted_parts, &extra_nodes_config.message()),
583 "Log file and replay config need to have matching nodes lists.");
584}
585
586// Tests that we can read log files where they don't start at the same monotonic
587// time.
588TEST_P(MultinodeLoggerTest, StaggeredStart) {
589 time_converter_.StartEqual();
590 std::vector<std::string> actual_filenames;
591
592 {
593 LoggerState pi1_logger = MakeLogger(pi1_);
594 LoggerState pi2_logger = MakeLogger(pi2_);
595
596 event_loop_factory_.RunFor(chrono::milliseconds(95));
597
598 StartLogger(&pi1_logger);
599
600 event_loop_factory_.RunFor(chrono::milliseconds(200));
601
602 StartLogger(&pi2_logger);
603
604 event_loop_factory_.RunFor(chrono::milliseconds(20000));
605 pi1_logger.AppendAllFilenames(&actual_filenames);
606 pi2_logger.AppendAllFilenames(&actual_filenames);
607 }
608
609 // Since we delay starting pi2, it already knows about all the timestamps so
610 // we don't end up with extra parts.
611 LogReader reader(SortParts(actual_filenames));
612
613 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
614 log_reader_factory.set_send_delay(chrono::microseconds(0));
615
616 // This sends out the fetched messages and advances time to the start of the
617 // log file.
618 reader.Register(&log_reader_factory);
619
620 const Node *pi1 =
621 configuration::GetNode(log_reader_factory.configuration(), "pi1");
622 const Node *pi2 =
623 configuration::GetNode(log_reader_factory.configuration(), "pi2");
624
625 EXPECT_THAT(reader.LoggedNodes(),
626 ::testing::ElementsAre(
627 configuration::GetNode(reader.logged_configuration(), pi1),
628 configuration::GetNode(reader.logged_configuration(), pi2)));
629
630 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
631
632 std::unique_ptr<EventLoop> pi1_event_loop =
633 log_reader_factory.MakeEventLoop("test", pi1);
634 std::unique_ptr<EventLoop> pi2_event_loop =
635 log_reader_factory.MakeEventLoop("test", pi2);
636
637 int pi1_ping_count = 30;
638 int pi2_ping_count = 30;
639 int pi1_pong_count = 30;
640 int pi2_pong_count = 30;
641
642 // Confirm that the ping value matches.
643 pi1_event_loop->MakeWatcher(
644 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
645 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
646 << pi1_event_loop->context().monotonic_remote_time << " -> "
647 << pi1_event_loop->context().monotonic_event_time;
648 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
649
650 ++pi1_ping_count;
651 });
652 pi2_event_loop->MakeWatcher(
653 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
654 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
655 << pi2_event_loop->context().monotonic_remote_time << " -> "
656 << pi2_event_loop->context().monotonic_event_time;
657 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
658
659 ++pi2_ping_count;
660 });
661
662 // Confirm that the ping and pong counts both match, and the value also
663 // matches.
664 pi1_event_loop->MakeWatcher(
665 "/test", [&pi1_event_loop, &pi1_ping_count,
666 &pi1_pong_count](const examples::Pong &pong) {
667 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
668 << pi1_event_loop->context().monotonic_remote_time << " -> "
669 << pi1_event_loop->context().monotonic_event_time;
670
671 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
672 ++pi1_pong_count;
673 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
674 });
675 pi2_event_loop->MakeWatcher(
676 "/test", [&pi2_event_loop, &pi2_ping_count,
677 &pi2_pong_count](const examples::Pong &pong) {
678 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
679 << pi2_event_loop->context().monotonic_remote_time << " -> "
680 << pi2_event_loop->context().monotonic_event_time;
681
682 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
683 ++pi2_pong_count;
684 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
685 });
686
687 log_reader_factory.Run();
688 EXPECT_EQ(pi1_ping_count, 2030);
689 EXPECT_EQ(pi2_ping_count, 2030);
690 EXPECT_EQ(pi1_pong_count, 2030);
691 EXPECT_EQ(pi2_pong_count, 2030);
692
693 reader.Deregister();
694}
695
// Tests that we can read log files where the monotonic clocks drift and don't
// match correctly. While we are here, also test that logs with different
// ending times are readable.
699TEST_P(MultinodeLoggerTest, MismatchedClocks) {
700 // TODO(austin): Negate...
701 const chrono::nanoseconds initial_pi2_offset = chrono::seconds(1000);
702
703 time_converter_.AddMonotonic(
704 {BootTimestamp::epoch(), BootTimestamp::epoch() + initial_pi2_offset});
705 // Wait for 95 ms, (~0.1 seconds - 1/2 of the ping/pong period), and set the
706 // skew to be 200 uS/s
707 const chrono::nanoseconds startup_sleep1 = time_converter_.AddMonotonic(
708 {chrono::milliseconds(95),
709 chrono::milliseconds(95) - chrono::nanoseconds(200) * 95});
710 // Run another 200 ms to have one logger start first.
711 const chrono::nanoseconds startup_sleep2 = time_converter_.AddMonotonic(
712 {chrono::milliseconds(200), chrono::milliseconds(200)});
713 // Slew one way then the other at the same 200 uS/S slew rate. Make sure we
714 // go far enough to cause problems if this isn't accounted for.
715 const chrono::nanoseconds logger_run1 = time_converter_.AddMonotonic(
716 {chrono::milliseconds(20000),
717 chrono::milliseconds(20000) - chrono::nanoseconds(200) * 20000});
718 const chrono::nanoseconds logger_run2 = time_converter_.AddMonotonic(
719 {chrono::milliseconds(40000),
720 chrono::milliseconds(40000) + chrono::nanoseconds(200) * 40000});
721 const chrono::nanoseconds logger_run3 = time_converter_.AddMonotonic(
722 {chrono::milliseconds(400), chrono::milliseconds(400)});
723
724 {
725 LoggerState pi2_logger = MakeLogger(pi2_);
726
727 LOG(INFO) << "pi2 times: " << pi2_->monotonic_now() << " "
728 << pi2_->realtime_now() << " distributed "
729 << pi2_->ToDistributedClock(pi2_->monotonic_now());
730
731 LOG(INFO) << "pi2_ times: " << pi2_->monotonic_now() << " "
732 << pi2_->realtime_now() << " distributed "
733 << pi2_->ToDistributedClock(pi2_->monotonic_now());
734
735 event_loop_factory_.RunFor(startup_sleep1);
736
737 StartLogger(&pi2_logger);
738
739 event_loop_factory_.RunFor(startup_sleep2);
740
741 {
742 // Run pi1's logger for only part of the time.
743 LoggerState pi1_logger = MakeLogger(pi1_);
744
745 StartLogger(&pi1_logger);
746 event_loop_factory_.RunFor(logger_run1);
747
748 // Make sure we slewed time far enough so that the difference is greater
749 // than the network delay. This confirms that if we sort incorrectly, it
750 // would show in the results.
751 EXPECT_LT(
752 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
753 -event_loop_factory_.send_delay() -
754 event_loop_factory_.network_delay());
755
756 event_loop_factory_.RunFor(logger_run2);
757
758 // And now check that we went far enough the other way to make sure we
759 // cover both problems.
760 EXPECT_GT(
761 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
762 event_loop_factory_.send_delay() +
763 event_loop_factory_.network_delay());
764 }
765
766 // And log a bit more on pi2.
767 event_loop_factory_.RunFor(logger_run3);
768 }
769
770 LogReader reader(
James Kuszmaul298b4a22023-06-28 20:01:03 -0700771 SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 3, 3)));
Naman Guptaa63aa132023-03-22 20:06:34 -0700772
773 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
774 log_reader_factory.set_send_delay(chrono::microseconds(0));
775
776 const Node *pi1 =
777 configuration::GetNode(log_reader_factory.configuration(), "pi1");
778 const Node *pi2 =
779 configuration::GetNode(log_reader_factory.configuration(), "pi2");
780
781 // This sends out the fetched messages and advances time to the start of the
782 // log file.
783 reader.Register(&log_reader_factory);
784
785 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
786 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
787 LOG(INFO) << "now pi1 "
788 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
789 LOG(INFO) << "now pi2 "
790 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
791
792 LOG(INFO) << "Done registering (pi1) "
793 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now()
794 << " "
795 << log_reader_factory.GetNodeEventLoopFactory(pi1)->realtime_now();
796 LOG(INFO) << "Done registering (pi2) "
797 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now()
798 << " "
799 << log_reader_factory.GetNodeEventLoopFactory(pi2)->realtime_now();
800
801 EXPECT_THAT(reader.LoggedNodes(),
802 ::testing::ElementsAre(
803 configuration::GetNode(reader.logged_configuration(), pi1),
804 configuration::GetNode(reader.logged_configuration(), pi2)));
805
806 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
807
808 std::unique_ptr<EventLoop> pi1_event_loop =
809 log_reader_factory.MakeEventLoop("test", pi1);
810 std::unique_ptr<EventLoop> pi2_event_loop =
811 log_reader_factory.MakeEventLoop("test", pi2);
812
813 int pi1_ping_count = 30;
814 int pi2_ping_count = 30;
815 int pi1_pong_count = 30;
816 int pi2_pong_count = 30;
817
818 // Confirm that the ping value matches.
819 pi1_event_loop->MakeWatcher(
820 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
821 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
822 << pi1_event_loop->context().monotonic_remote_time << " -> "
823 << pi1_event_loop->context().monotonic_event_time;
824 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
825
826 ++pi1_ping_count;
827 });
828 pi2_event_loop->MakeWatcher(
829 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
830 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
831 << pi2_event_loop->context().monotonic_remote_time << " -> "
832 << pi2_event_loop->context().monotonic_event_time;
833 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
834
835 ++pi2_ping_count;
836 });
837
838 // Confirm that the ping and pong counts both match, and the value also
839 // matches.
840 pi1_event_loop->MakeWatcher(
841 "/test", [&pi1_event_loop, &pi1_ping_count,
842 &pi1_pong_count](const examples::Pong &pong) {
843 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
844 << pi1_event_loop->context().monotonic_remote_time << " -> "
845 << pi1_event_loop->context().monotonic_event_time;
846
847 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
848 ++pi1_pong_count;
849 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
850 });
851 pi2_event_loop->MakeWatcher(
852 "/test", [&pi2_event_loop, &pi2_ping_count,
853 &pi2_pong_count](const examples::Pong &pong) {
854 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
855 << pi2_event_loop->context().monotonic_remote_time << " -> "
856 << pi2_event_loop->context().monotonic_event_time;
857
858 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
859 ++pi2_pong_count;
860 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
861 });
862
863 log_reader_factory.Run();
864 EXPECT_EQ(pi1_ping_count, 6030);
865 EXPECT_EQ(pi2_ping_count, 6030);
866 EXPECT_EQ(pi1_pong_count, 6030);
867 EXPECT_EQ(pi2_pong_count, 6030);
868
869 reader.Deregister();
870}
871
872// Tests that we can sort a bunch of parts into the pre-determined sorted parts.
873TEST_P(MultinodeLoggerTest, SortParts) {
874 time_converter_.StartEqual();
875 // Make a bunch of parts.
876 {
877 LoggerState pi1_logger = MakeLogger(pi1_);
878 LoggerState pi2_logger = MakeLogger(pi2_);
879
880 event_loop_factory_.RunFor(chrono::milliseconds(95));
881
882 StartLogger(&pi1_logger);
883 StartLogger(&pi2_logger);
884
885 event_loop_factory_.RunFor(chrono::milliseconds(2000));
886 }
887
888 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
889 VerifyParts(sorted_parts);
890}
891
892// Tests that we can sort a bunch of parts with an empty part. We should ignore
893// it and remove it from the sorted list.
894TEST_P(MultinodeLoggerTest, SortEmptyParts) {
895 time_converter_.StartEqual();
896 // Make a bunch of parts.
897 {
898 LoggerState pi1_logger = MakeLogger(pi1_);
899 LoggerState pi2_logger = MakeLogger(pi2_);
900
901 event_loop_factory_.RunFor(chrono::milliseconds(95));
902
903 StartLogger(&pi1_logger);
904 StartLogger(&pi2_logger);
905
906 event_loop_factory_.RunFor(chrono::milliseconds(2000));
907 }
908
909 // TODO(austin): Should we flip out if the file can't open?
910 const std::string kEmptyFile("foobarinvalidfiledoesnotexist" + Extension());
911
912 aos::util::WriteStringToFileOrDie(kEmptyFile, "");
913 logfiles_.emplace_back(kEmptyFile);
914
915 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
916 VerifyParts(sorted_parts, {kEmptyFile});
917}
918
919// Tests that we can sort a bunch of parts with the end missing off a
920// file. We should use the part we can read.
921TEST_P(MultinodeLoggerTest, SortTruncatedParts) {
922 std::vector<std::string> actual_filenames;
923 time_converter_.StartEqual();
924 // Make a bunch of parts.
925 {
926 LoggerState pi1_logger = MakeLogger(pi1_);
927 LoggerState pi2_logger = MakeLogger(pi2_);
928
929 event_loop_factory_.RunFor(chrono::milliseconds(95));
930
931 StartLogger(&pi1_logger);
932 StartLogger(&pi2_logger);
933
934 event_loop_factory_.RunFor(chrono::milliseconds(2000));
935
936 pi1_logger.AppendAllFilenames(&actual_filenames);
937 pi2_logger.AppendAllFilenames(&actual_filenames);
938 }
939
940 ASSERT_THAT(actual_filenames,
941 ::testing::UnorderedElementsAreArray(logfiles_));
942
943 // Strip off the end of one of the files. Pick one with a lot of data.
944 // For snappy, needs to have enough data to be >1 chunk of compressed data so
945 // that we don't corrupt the entire log part.
946 ::std::string compressed_contents =
947 aos::util::ReadFileToStringOrDie(logfiles_[4]);
948
949 aos::util::WriteStringToFileOrDie(
950 logfiles_[4],
951 compressed_contents.substr(0, compressed_contents.size() - 100));
952
953 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
954 VerifyParts(sorted_parts);
955}
956
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700957// Tests that if we remap a logged channel, it shows up correctly.
Naman Guptaa63aa132023-03-22 20:06:34 -0700958TEST_P(MultinodeLoggerTest, RemapLoggedChannel) {
959 time_converter_.StartEqual();
960 {
961 LoggerState pi1_logger = MakeLogger(pi1_);
962 LoggerState pi2_logger = MakeLogger(pi2_);
963
964 event_loop_factory_.RunFor(chrono::milliseconds(95));
965
966 StartLogger(&pi1_logger);
967 StartLogger(&pi2_logger);
968
969 event_loop_factory_.RunFor(chrono::milliseconds(20000));
970 }
971
972 LogReader reader(SortParts(logfiles_));
973
974 // Remap just on pi1.
975 reader.RemapLoggedChannel<aos::timing::Report>(
976 "/aos", configuration::GetNode(reader.configuration(), "pi1"));
977
978 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
979 log_reader_factory.set_send_delay(chrono::microseconds(0));
980
981 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
982 // Note: An extra channel gets remapped automatically due to a timestamp
983 // channel being LOCAL_LOGGER'd.
984 ASSERT_EQ(remapped_channels.size(), std::get<0>(GetParam()).shared ? 1u : 2u);
985 EXPECT_EQ(remapped_channels[0]->name()->string_view(), "/original/pi1/aos");
986 EXPECT_EQ(remapped_channels[0]->type()->string_view(), "aos.timing.Report");
987 if (!std::get<0>(GetParam()).shared) {
988 EXPECT_EQ(remapped_channels[1]->name()->string_view(),
989 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
990 "aos-message_bridge-Timestamp");
991 EXPECT_EQ(remapped_channels[1]->type()->string_view(),
992 "aos.message_bridge.RemoteMessage");
993 }
994
995 reader.Register(&log_reader_factory);
996
997 const Node *pi1 =
998 configuration::GetNode(log_reader_factory.configuration(), "pi1");
999 const Node *pi2 =
1000 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1001
1002 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1003 // else should have moved.
1004 std::unique_ptr<EventLoop> pi1_event_loop =
1005 log_reader_factory.MakeEventLoop("test", pi1);
1006 pi1_event_loop->SkipTimingReport();
1007 std::unique_ptr<EventLoop> full_pi1_event_loop =
1008 log_reader_factory.MakeEventLoop("test", pi1);
1009 full_pi1_event_loop->SkipTimingReport();
1010 std::unique_ptr<EventLoop> pi2_event_loop =
1011 log_reader_factory.MakeEventLoop("test", pi2);
1012 pi2_event_loop->SkipTimingReport();
1013
1014 MessageCounter<aos::timing::Report> pi1_timing_report(pi1_event_loop.get(),
1015 "/aos");
1016 MessageCounter<aos::timing::Report> full_pi1_timing_report(
1017 full_pi1_event_loop.get(), "/pi1/aos");
1018 MessageCounter<aos::timing::Report> pi1_original_timing_report(
1019 pi1_event_loop.get(), "/original/aos");
1020 MessageCounter<aos::timing::Report> full_pi1_original_timing_report(
1021 full_pi1_event_loop.get(), "/original/pi1/aos");
1022 MessageCounter<aos::timing::Report> pi2_timing_report(pi2_event_loop.get(),
1023 "/aos");
1024
1025 log_reader_factory.Run();
1026
1027 EXPECT_EQ(pi1_timing_report.count(), 0u);
1028 EXPECT_EQ(full_pi1_timing_report.count(), 0u);
1029 EXPECT_NE(pi1_original_timing_report.count(), 0u);
1030 EXPECT_NE(full_pi1_original_timing_report.count(), 0u);
1031 EXPECT_NE(pi2_timing_report.count(), 0u);
1032
1033 reader.Deregister();
1034}
1035
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001036// Tests that if we rename a logged channel, it shows up correctly.
1037TEST_P(MultinodeLoggerTest, RenameLoggedChannel) {
1038 std::vector<std::string> actual_filenames;
1039 time_converter_.StartEqual();
1040 {
1041 LoggerState pi1_logger = MakeLogger(pi1_);
1042 LoggerState pi2_logger = MakeLogger(pi2_);
1043
1044 event_loop_factory_.RunFor(chrono::milliseconds(95));
1045
1046 StartLogger(&pi1_logger);
1047 StartLogger(&pi2_logger);
1048
1049 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1050
1051 pi1_logger.AppendAllFilenames(&actual_filenames);
1052 pi2_logger.AppendAllFilenames(&actual_filenames);
1053 }
1054
1055 LogReader reader(SortParts(actual_filenames));
1056
1057 // Rename just on pi2. Add some global maps just to verify they get added in
1058 // the config and used correctly.
1059 std::vector<MapT> maps;
1060 {
1061 MapT map;
1062 map.match = std::make_unique<ChannelT>();
1063 map.match->name = "/foo*";
1064 map.match->source_node = "pi1";
1065 map.rename = std::make_unique<ChannelT>();
1066 map.rename->name = "/pi1/foo";
1067 maps.emplace_back(std::move(map));
1068 }
1069 {
1070 MapT map;
1071 map.match = std::make_unique<ChannelT>();
1072 map.match->name = "/foo*";
1073 map.match->source_node = "pi2";
1074 map.rename = std::make_unique<ChannelT>();
1075 map.rename->name = "/pi2/foo";
1076 maps.emplace_back(std::move(map));
1077 }
1078 {
1079 MapT map;
1080 map.match = std::make_unique<ChannelT>();
1081 map.match->name = "/foo";
1082 map.match->type = "aos.examples.Ping";
1083 map.rename = std::make_unique<ChannelT>();
1084 map.rename->name = "/foo/renamed";
1085 maps.emplace_back(std::move(map));
1086 }
1087 reader.RenameLoggedChannel<aos::examples::Ping>(
1088 "/aos", configuration::GetNode(reader.configuration(), "pi2"),
1089 "/pi2/foo/renamed", maps);
1090
1091 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1092 log_reader_factory.set_send_delay(chrono::microseconds(0));
1093
1094 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1095 // Note: An extra channel gets remapped automatically due to a timestamp
1096 // channel being LOCAL_LOGGER'd.
1097 const bool shared = std::get<0>(GetParam()).shared;
1098 ASSERT_EQ(remapped_channels.size(), shared ? 1u : 2u);
1099 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->name()->string_view(),
1100 "/pi2/foo/renamed");
1101 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->type()->string_view(),
1102 "aos.examples.Ping");
1103 if (!shared) {
1104 EXPECT_EQ(remapped_channels[0]->name()->string_view(),
1105 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1106 "aos-message_bridge-Timestamp");
1107 EXPECT_EQ(remapped_channels[0]->type()->string_view(),
1108 "aos.message_bridge.RemoteMessage");
1109 }
1110
1111 reader.Register(&log_reader_factory);
1112
1113 const Node *pi1 =
1114 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1115 const Node *pi2 =
1116 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1117
1118 // Confirm we can read the data on the renamed channel, just for pi2. Nothing
1119 // else should have moved.
1120 std::unique_ptr<EventLoop> pi2_event_loop =
1121 log_reader_factory.MakeEventLoop("test", pi2);
1122 pi2_event_loop->SkipTimingReport();
1123 std::unique_ptr<EventLoop> full_pi2_event_loop =
1124 log_reader_factory.MakeEventLoop("test", pi2);
1125 full_pi2_event_loop->SkipTimingReport();
1126 std::unique_ptr<EventLoop> pi1_event_loop =
1127 log_reader_factory.MakeEventLoop("test", pi1);
1128 pi1_event_loop->SkipTimingReport();
1129
1130 MessageCounter<aos::examples::Ping> pi2_ping(pi2_event_loop.get(), "/aos");
1131 MessageCounter<aos::examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1132 "/foo");
1133 MessageCounter<aos::examples::Ping> full_pi2_renamed_ping(
1134 full_pi2_event_loop.get(), "/pi2/foo/renamed");
1135 MessageCounter<aos::examples::Ping> pi1_ping(pi1_event_loop.get(), "/aos");
1136
1137 log_reader_factory.Run();
1138
1139 EXPECT_EQ(pi2_ping.count(), 0u);
1140 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1141 EXPECT_NE(full_pi2_renamed_ping.count(), 0u);
1142 EXPECT_NE(pi1_ping.count(), 0u);
1143
1144 reader.Deregister();
1145}
1146
Naman Guptaa63aa132023-03-22 20:06:34 -07001147// Tests that we can remap a forwarded channel as well.
1148TEST_P(MultinodeLoggerTest, RemapForwardedLoggedChannel) {
1149 time_converter_.StartEqual();
1150 {
1151 LoggerState pi1_logger = MakeLogger(pi1_);
1152 LoggerState pi2_logger = MakeLogger(pi2_);
1153
1154 event_loop_factory_.RunFor(chrono::milliseconds(95));
1155
1156 StartLogger(&pi1_logger);
1157 StartLogger(&pi2_logger);
1158
1159 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1160 }
1161
1162 LogReader reader(SortParts(logfiles_));
1163
1164 reader.RemapLoggedChannel<examples::Ping>("/test");
1165
1166 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1167 log_reader_factory.set_send_delay(chrono::microseconds(0));
1168
1169 reader.Register(&log_reader_factory);
1170
1171 const Node *pi1 =
1172 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1173 const Node *pi2 =
1174 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1175
1176 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1177 // else should have moved.
1178 std::unique_ptr<EventLoop> pi1_event_loop =
1179 log_reader_factory.MakeEventLoop("test", pi1);
1180 pi1_event_loop->SkipTimingReport();
1181 std::unique_ptr<EventLoop> full_pi1_event_loop =
1182 log_reader_factory.MakeEventLoop("test", pi1);
1183 full_pi1_event_loop->SkipTimingReport();
1184 std::unique_ptr<EventLoop> pi2_event_loop =
1185 log_reader_factory.MakeEventLoop("test", pi2);
1186 pi2_event_loop->SkipTimingReport();
1187
1188 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1189 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1190 MessageCounter<examples::Ping> pi1_original_ping(pi1_event_loop.get(),
1191 "/original/test");
1192 MessageCounter<examples::Ping> pi2_original_ping(pi2_event_loop.get(),
1193 "/original/test");
1194
1195 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1196 pi1_original_ping_timestamp;
1197 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1198 pi1_ping_timestamp;
1199 if (!shared()) {
1200 pi1_original_ping_timestamp =
1201 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1202 pi1_event_loop.get(),
1203 "/pi1/aos/remote_timestamps/pi2/original/test/aos-examples-Ping");
1204 pi1_ping_timestamp =
1205 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1206 pi1_event_loop.get(),
1207 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1208 }
1209
1210 log_reader_factory.Run();
1211
1212 EXPECT_EQ(pi1_ping.count(), 0u);
1213 EXPECT_EQ(pi2_ping.count(), 0u);
1214 EXPECT_NE(pi1_original_ping.count(), 0u);
1215 EXPECT_NE(pi2_original_ping.count(), 0u);
1216 if (!shared()) {
1217 EXPECT_NE(pi1_original_ping_timestamp->count(), 0u);
1218 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1219 }
1220
1221 reader.Deregister();
1222}
1223
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001224// Tests that we can rename a forwarded channel as well.
1225TEST_P(MultinodeLoggerTest, RenameForwardedLoggedChannel) {
1226 std::vector<std::string> actual_filenames;
1227 time_converter_.StartEqual();
1228 {
1229 LoggerState pi1_logger = MakeLogger(pi1_);
1230 LoggerState pi2_logger = MakeLogger(pi2_);
1231
1232 event_loop_factory_.RunFor(chrono::milliseconds(95));
1233
1234 StartLogger(&pi1_logger);
1235 StartLogger(&pi2_logger);
1236
1237 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1238
1239 pi1_logger.AppendAllFilenames(&actual_filenames);
1240 pi2_logger.AppendAllFilenames(&actual_filenames);
1241 }
1242
1243 LogReader reader(SortParts(actual_filenames));
1244
1245 std::vector<MapT> maps;
1246 {
1247 MapT map;
1248 map.match = std::make_unique<ChannelT>();
1249 map.match->name = "/production*";
1250 map.match->source_node = "pi1";
1251 map.rename = std::make_unique<ChannelT>();
1252 map.rename->name = "/pi1/production";
1253 maps.emplace_back(std::move(map));
1254 }
1255 {
1256 MapT map;
1257 map.match = std::make_unique<ChannelT>();
1258 map.match->name = "/production*";
1259 map.match->source_node = "pi2";
1260 map.rename = std::make_unique<ChannelT>();
1261 map.rename->name = "/pi2/production";
1262 maps.emplace_back(std::move(map));
1263 }
1264 reader.RenameLoggedChannel<aos::examples::Ping>(
1265 "/test", configuration::GetNode(reader.configuration(), "pi1"),
1266 "/pi1/production", maps);
1267
1268 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1269 log_reader_factory.set_send_delay(chrono::microseconds(0));
1270
1271 reader.Register(&log_reader_factory);
1272
1273 const Node *pi1 =
1274 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1275 const Node *pi2 =
1276 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1277
1278 // Confirm we can read the data on the renamed channel, on both the source
1279 // node and the remote node. In case of split timestamp channels, confirm that
1280 // we receive the timestamp messages on the renamed channel as well.
1281 std::unique_ptr<EventLoop> pi1_event_loop =
1282 log_reader_factory.MakeEventLoop("test", pi1);
1283 pi1_event_loop->SkipTimingReport();
1284 std::unique_ptr<EventLoop> full_pi1_event_loop =
1285 log_reader_factory.MakeEventLoop("test", pi1);
1286 full_pi1_event_loop->SkipTimingReport();
1287 std::unique_ptr<EventLoop> pi2_event_loop =
1288 log_reader_factory.MakeEventLoop("test", pi2);
1289 pi2_event_loop->SkipTimingReport();
1290
1291 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1292 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1293 MessageCounter<examples::Ping> pi1_renamed_ping(pi1_event_loop.get(),
1294 "/pi1/production");
1295 MessageCounter<examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1296 "/pi1/production");
1297
1298 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1299 pi1_renamed_ping_timestamp;
1300 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1301 pi1_ping_timestamp;
1302 if (!shared()) {
1303 pi1_renamed_ping_timestamp =
1304 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1305 pi1_event_loop.get(),
1306 "/pi1/aos/remote_timestamps/pi2/pi1/production/aos-examples-Ping");
1307 pi1_ping_timestamp =
1308 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1309 pi1_event_loop.get(),
1310 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1311 }
1312
1313 log_reader_factory.Run();
1314
1315 EXPECT_EQ(pi1_ping.count(), 0u);
1316 EXPECT_EQ(pi2_ping.count(), 0u);
1317 EXPECT_NE(pi1_renamed_ping.count(), 0u);
1318 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1319 if (!shared()) {
1320 EXPECT_NE(pi1_renamed_ping_timestamp->count(), 0u);
1321 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1322 }
1323
1324 reader.Deregister();
1325}
1326
Naman Guptaa63aa132023-03-22 20:06:34 -07001327// Tests that we observe all the same events in log replay (for a given node)
1328// whether we just register an event loop for that node or if we register a full
1329// event loop factory.
1330TEST_P(MultinodeLoggerTest, SingleNodeReplay) {
1331 time_converter_.StartEqual();
1332 constexpr chrono::milliseconds kStartupDelay(95);
1333 {
1334 LoggerState pi1_logger = MakeLogger(pi1_);
1335 LoggerState pi2_logger = MakeLogger(pi2_);
1336
1337 event_loop_factory_.RunFor(kStartupDelay);
1338
1339 StartLogger(&pi1_logger);
1340 StartLogger(&pi2_logger);
1341
1342 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1343 }
1344
1345 LogReader full_reader(SortParts(logfiles_));
1346 LogReader single_node_reader(SortParts(logfiles_));
1347
1348 SimulatedEventLoopFactory full_factory(full_reader.configuration());
1349 SimulatedEventLoopFactory single_node_factory(
1350 single_node_reader.configuration());
1351 single_node_factory.SkipTimingReport();
1352 single_node_factory.DisableStatistics();
1353 std::unique_ptr<EventLoop> replay_event_loop =
1354 single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
1355 "log_reader");
1356
1357 full_reader.Register(&full_factory);
1358 single_node_reader.Register(replay_event_loop.get());
1359
1360 const Node *full_pi1 =
1361 configuration::GetNode(full_factory.configuration(), "pi1");
1362
1363 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1364 // else should have moved.
1365 std::unique_ptr<EventLoop> full_event_loop =
1366 full_factory.MakeEventLoop("test", full_pi1);
1367 full_event_loop->SkipTimingReport();
1368 full_event_loop->SkipAosLog();
1369 // maps are indexed on channel index.
1370 // observed_messages: {channel_index: [(message_sent_time, was_fetched),...]}
1371 std::map<size_t, std::vector<std::pair<monotonic_clock::time_point, bool>>>
1372 observed_messages;
1373 std::map<size_t, std::unique_ptr<RawFetcher>> fetchers;
1374 for (size_t ii = 0; ii < full_event_loop->configuration()->channels()->size();
1375 ++ii) {
1376 const Channel *channel =
1377 full_event_loop->configuration()->channels()->Get(ii);
1378 // We currently don't support replaying remote timestamp channels in
1379 // realtime replay (unless the remote timestamp channel was not NOT_LOGGED,
1380 // in which case it gets auto-remapped and replayed on a /original channel).
1381 if (channel->name()->string_view().find("remote_timestamp") !=
1382 std::string_view::npos &&
1383 channel->name()->string_view().find("/original") ==
1384 std::string_view::npos) {
1385 continue;
1386 }
1387 if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
1388 observed_messages[ii] = {};
1389 fetchers[ii] = full_event_loop->MakeRawFetcher(channel);
1390 full_event_loop->OnRun([ii, &observed_messages, &fetchers]() {
1391 if (fetchers[ii]->Fetch()) {
1392 observed_messages[ii].push_back(std::make_pair(
1393 fetchers[ii]->context().monotonic_event_time, true));
1394 }
1395 });
1396 full_event_loop->MakeRawNoArgWatcher(
1397 channel, [ii, &observed_messages](const Context &context) {
1398 observed_messages[ii].push_back(
1399 std::make_pair(context.monotonic_event_time, false));
1400 });
1401 }
1402 }
1403
1404 full_factory.Run();
1405 fetchers.clear();
1406 full_reader.Deregister();
1407
1408 const Node *single_node_pi1 =
1409 configuration::GetNode(single_node_factory.configuration(), "pi1");
1410 std::map<size_t, std::unique_ptr<RawFetcher>> single_node_fetchers;
1411
1412 std::unique_ptr<EventLoop> single_node_event_loop =
1413 single_node_factory.MakeEventLoop("test", single_node_pi1);
1414 single_node_event_loop->SkipTimingReport();
1415 single_node_event_loop->SkipAosLog();
1416 for (size_t ii = 0;
1417 ii < single_node_event_loop->configuration()->channels()->size(); ++ii) {
1418 const Channel *channel =
1419 single_node_event_loop->configuration()->channels()->Get(ii);
1420 single_node_factory.DisableForwarding(channel);
1421 if (configuration::ChannelIsReadableOnNode(channel, single_node_pi1)) {
1422 single_node_fetchers[ii] =
1423 single_node_event_loop->MakeRawFetcher(channel);
1424 single_node_event_loop->OnRun([channel, ii, &single_node_fetchers]() {
1425 EXPECT_FALSE(single_node_fetchers[ii]->Fetch())
1426 << "Single EventLoop replay doesn't support pre-loading fetchers. "
1427 << configuration::StrippedChannelToString(channel);
1428 });
1429 single_node_event_loop->MakeRawNoArgWatcher(
1430 channel, [ii, &observed_messages, channel,
1431 kStartupDelay](const Context &context) {
1432 if (observed_messages[ii].empty()) {
1433 FAIL() << "Observed extra message at "
1434 << context.monotonic_event_time << " on "
1435 << configuration::StrippedChannelToString(channel);
1436 return;
1437 }
1438 const std::pair<monotonic_clock::time_point, bool> &message =
1439 observed_messages[ii].front();
1440 if (message.second) {
1441 EXPECT_LE(message.first,
1442 context.monotonic_event_time + kStartupDelay)
1443 << "Mismatched message times " << context.monotonic_event_time
1444 << " and " << message.first << " on "
1445 << configuration::StrippedChannelToString(channel);
1446 } else {
1447 EXPECT_EQ(message.first,
1448 context.monotonic_event_time + kStartupDelay)
1449 << "Mismatched message times " << context.monotonic_event_time
1450 << " and " << message.first << " on "
1451 << configuration::StrippedChannelToString(channel);
1452 }
1453 observed_messages[ii].erase(observed_messages[ii].begin());
1454 });
1455 }
1456 }
1457
1458 single_node_factory.Run();
1459
1460 single_node_fetchers.clear();
1461
1462 single_node_reader.Deregister();
1463
1464 for (const auto &pair : observed_messages) {
1465 EXPECT_TRUE(pair.second.empty())
1466 << "Missed " << pair.second.size() << " messages on "
1467 << configuration::StrippedChannelToString(
1468 single_node_event_loop->configuration()->channels()->Get(
1469 pair.first));
1470 }
1471}
1472
1473// Tests that we properly recreate forwarded timestamps when replaying a log.
1474// This should be enough that we can then re-run the logger and get a valid log
1475// back.
1476TEST_P(MultinodeLoggerTest, MessageHeader) {
1477 time_converter_.StartEqual();
1478 {
1479 LoggerState pi1_logger = MakeLogger(pi1_);
1480 LoggerState pi2_logger = MakeLogger(pi2_);
1481
1482 event_loop_factory_.RunFor(chrono::milliseconds(95));
1483
1484 StartLogger(&pi1_logger);
1485 StartLogger(&pi2_logger);
1486
1487 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1488 }
1489
1490 LogReader reader(SortParts(logfiles_));
1491
1492 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1493 log_reader_factory.set_send_delay(chrono::microseconds(0));
1494
1495 // This sends out the fetched messages and advances time to the start of the
1496 // log file.
1497 reader.Register(&log_reader_factory);
1498
1499 const Node *pi1 =
1500 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1501 const Node *pi2 =
1502 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1503
1504 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
1505 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
1506 LOG(INFO) << "now pi1 "
1507 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1508 LOG(INFO) << "now pi2 "
1509 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1510
1511 EXPECT_THAT(reader.LoggedNodes(),
1512 ::testing::ElementsAre(
1513 configuration::GetNode(reader.logged_configuration(), pi1),
1514 configuration::GetNode(reader.logged_configuration(), pi2)));
1515
1516 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1517
1518 std::unique_ptr<EventLoop> pi1_event_loop =
1519 log_reader_factory.MakeEventLoop("test", pi1);
1520 std::unique_ptr<EventLoop> pi2_event_loop =
1521 log_reader_factory.MakeEventLoop("test", pi2);
1522
1523 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =
1524 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1525 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi2_fetcher =
1526 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1527
1528 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1529 pi1_event_loop->MakeFetcher<examples::Ping>("/test");
1530 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1531 pi2_event_loop->MakeFetcher<examples::Ping>("/test");
1532
1533 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi2_fetcher =
1534 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1535 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi1_fetcher =
1536 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1537
1538 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1539 pi2_event_loop->MakeFetcher<examples::Pong>("/test");
1540 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1541 pi1_event_loop->MakeFetcher<examples::Pong>("/test");
1542
1543 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
1544 pi1_event_loop->configuration(), pi1_timestamp_on_pi1_fetcher.channel());
1545 const size_t ping_timestamp_channel = configuration::ChannelIndex(
1546 pi2_event_loop->configuration(), ping_on_pi2_fetcher.channel());
1547
1548 const size_t pi2_timestamp_channel = configuration::ChannelIndex(
1549 pi2_event_loop->configuration(), pi2_timestamp_on_pi2_fetcher.channel());
1550 const size_t pong_timestamp_channel = configuration::ChannelIndex(
1551 pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
1552
1553 const chrono::nanoseconds network_delay = event_loop_factory_.network_delay();
1554 const chrono::nanoseconds send_delay = event_loop_factory_.send_delay();
1555
1556 for (std::pair<int, std::string> channel :
1557 shared()
1558 ? std::vector<
1559 std::pair<int, std::string>>{{-1,
1560 "/aos/remote_timestamps/pi2"}}
1561 : std::vector<std::pair<int, std::string>>{
1562 {pi1_timestamp_channel,
1563 "/aos/remote_timestamps/pi2/pi1/aos/"
1564 "aos-message_bridge-Timestamp"},
1565 {ping_timestamp_channel,
1566 "/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
1567 pi1_event_loop->MakeWatcher(
1568 channel.second,
1569 [&pi1_event_loop, &pi2_event_loop, pi1_timestamp_channel,
1570 ping_timestamp_channel, &pi1_timestamp_on_pi1_fetcher,
1571 &pi1_timestamp_on_pi2_fetcher, &ping_on_pi1_fetcher,
1572 &ping_on_pi2_fetcher, network_delay, send_delay,
1573 channel_index = channel.first](const RemoteMessage &header) {
1574 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1575 chrono::nanoseconds(header.monotonic_sent_time()));
1576 const aos::realtime_clock::time_point header_realtime_sent_time(
1577 chrono::nanoseconds(header.realtime_sent_time()));
1578 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1579 chrono::nanoseconds(header.monotonic_remote_time()));
1580 const aos::realtime_clock::time_point header_realtime_remote_time(
1581 chrono::nanoseconds(header.realtime_remote_time()));
1582
1583 if (channel_index != -1) {
1584 ASSERT_EQ(channel_index, header.channel_index());
1585 }
1586
1587 const Context *pi1_context = nullptr;
1588 const Context *pi2_context = nullptr;
1589
1590 if (header.channel_index() == pi1_timestamp_channel) {
1591 ASSERT_TRUE(pi1_timestamp_on_pi1_fetcher.FetchNext());
1592 ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
1593 pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
1594 pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
1595 } else if (header.channel_index() == ping_timestamp_channel) {
1596 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
1597 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
1598 pi1_context = &ping_on_pi1_fetcher.context();
1599 pi2_context = &ping_on_pi2_fetcher.context();
1600 } else {
1601 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1602 << configuration::CleanedChannelToString(
1603 pi1_event_loop->configuration()->channels()->Get(
1604 header.channel_index()));
1605 }
1606
1607 ASSERT_TRUE(header.has_boot_uuid());
1608 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1609 pi2_event_loop->boot_uuid());
1610
1611 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
1612 EXPECT_EQ(pi2_context->remote_queue_index,
1613 header.remote_queue_index());
1614 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
1615
1616 EXPECT_EQ(pi2_context->monotonic_event_time,
1617 header_monotonic_sent_time);
1618 EXPECT_EQ(pi2_context->realtime_event_time,
1619 header_realtime_sent_time);
1620 EXPECT_EQ(pi2_context->realtime_remote_time,
1621 header_realtime_remote_time);
1622 EXPECT_EQ(pi2_context->monotonic_remote_time,
1623 header_monotonic_remote_time);
1624
1625 EXPECT_EQ(pi1_context->realtime_event_time,
1626 header_realtime_remote_time);
1627 EXPECT_EQ(pi1_context->monotonic_event_time,
1628 header_monotonic_remote_time);
1629
1630 // Time estimation isn't perfect, but we know the clocks were
1631 // identical when logged, so we know when this should have come back.
1632 // Confirm we got it when we expected.
1633 EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
1634 pi1_context->monotonic_event_time + 2 * network_delay +
1635 send_delay);
1636 });
1637 }
1638 for (std::pair<int, std::string> channel :
1639 shared()
1640 ? std::vector<
1641 std::pair<int, std::string>>{{-1,
1642 "/aos/remote_timestamps/pi1"}}
1643 : std::vector<std::pair<int, std::string>>{
1644 {pi2_timestamp_channel,
1645 "/aos/remote_timestamps/pi1/pi2/aos/"
1646 "aos-message_bridge-Timestamp"}}) {
1647 pi2_event_loop->MakeWatcher(
1648 channel.second,
1649 [&pi2_event_loop, &pi1_event_loop, pi2_timestamp_channel,
1650 pong_timestamp_channel, &pi2_timestamp_on_pi2_fetcher,
1651 &pi2_timestamp_on_pi1_fetcher, &pong_on_pi2_fetcher,
1652 &pong_on_pi1_fetcher, network_delay, send_delay,
1653 channel_index = channel.first](const RemoteMessage &header) {
1654 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1655 chrono::nanoseconds(header.monotonic_sent_time()));
1656 const aos::realtime_clock::time_point header_realtime_sent_time(
1657 chrono::nanoseconds(header.realtime_sent_time()));
1658 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1659 chrono::nanoseconds(header.monotonic_remote_time()));
1660 const aos::realtime_clock::time_point header_realtime_remote_time(
1661 chrono::nanoseconds(header.realtime_remote_time()));
1662
1663 if (channel_index != -1) {
1664 ASSERT_EQ(channel_index, header.channel_index());
1665 }
1666
1667 const Context *pi2_context = nullptr;
1668 const Context *pi1_context = nullptr;
1669
1670 if (header.channel_index() == pi2_timestamp_channel) {
1671 ASSERT_TRUE(pi2_timestamp_on_pi2_fetcher.FetchNext());
1672 ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
1673 pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
1674 pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
1675 } else if (header.channel_index() == pong_timestamp_channel) {
1676 ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
1677 ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
1678 pi2_context = &pong_on_pi2_fetcher.context();
1679 pi1_context = &pong_on_pi1_fetcher.context();
1680 } else {
1681 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1682 << configuration::CleanedChannelToString(
1683 pi2_event_loop->configuration()->channels()->Get(
1684 header.channel_index()));
1685 }
1686
1687 ASSERT_TRUE(header.has_boot_uuid());
1688 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1689 pi1_event_loop->boot_uuid());
1690
1691 EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());
1692 EXPECT_EQ(pi1_context->remote_queue_index,
1693 header.remote_queue_index());
1694 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
1695
1696 EXPECT_EQ(pi1_context->monotonic_event_time,
1697 header_monotonic_sent_time);
1698 EXPECT_EQ(pi1_context->realtime_event_time,
1699 header_realtime_sent_time);
1700 EXPECT_EQ(pi1_context->realtime_remote_time,
1701 header_realtime_remote_time);
1702 EXPECT_EQ(pi1_context->monotonic_remote_time,
1703 header_monotonic_remote_time);
1704
1705 EXPECT_EQ(pi2_context->realtime_event_time,
1706 header_realtime_remote_time);
1707 EXPECT_EQ(pi2_context->monotonic_event_time,
1708 header_monotonic_remote_time);
1709
1710 // Time estimation isn't perfect, but we know the clocks were
1711 // identical when logged, so we know when this should have come back.
1712 // Confirm we got it when we expected.
1713 EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
1714 pi2_context->monotonic_event_time + 2 * network_delay +
1715 send_delay);
1716 });
1717 }
1718
1719 // And confirm we can re-create a log again, while checking the contents.
1720 {
1721 LoggerState pi1_logger = MakeLogger(
1722 log_reader_factory.GetNodeEventLoopFactory("pi1"), &log_reader_factory);
1723 LoggerState pi2_logger = MakeLogger(
1724 log_reader_factory.GetNodeEventLoopFactory("pi2"), &log_reader_factory);
1725
1726 StartLogger(&pi1_logger, tmp_dir_ + "/relogged1");
1727 StartLogger(&pi2_logger, tmp_dir_ + "/relogged2");
1728
1729 log_reader_factory.Run();
1730 }
1731
1732 reader.Deregister();
1733
1734 // And verify that we can run the LogReader over the relogged files without
1735 // hitting any fatal errors.
1736 {
1737 LogReader relogged_reader(SortParts(MakeLogFiles(
1738 tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 3, 3, true)));
1739 relogged_reader.Register();
1740
1741 relogged_reader.event_loop_factory()->Run();
1742 }
1743 // And confirm that we can read the logged file using the reader's
1744 // configuration.
1745 {
1746 LogReader relogged_reader(
1747 SortParts(MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2",
1748 3, 3, true)),
1749 reader.configuration());
1750 relogged_reader.Register();
1751
1752 relogged_reader.event_loop_factory()->Run();
1753 }
1754}
1755
1756// Tests that we properly populate and extract the logger_start time by setting
1757// up a clock difference between 2 nodes and looking at the resulting parts.
1758TEST_P(MultinodeLoggerTest, LoggerStartTime) {
1759 std::vector<std::string> actual_filenames;
1760 time_converter_.AddMonotonic(
1761 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
1762 {
1763 LoggerState pi1_logger = MakeLogger(pi1_);
1764 LoggerState pi2_logger = MakeLogger(pi2_);
1765
1766 StartLogger(&pi1_logger);
1767 StartLogger(&pi2_logger);
1768
1769 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1770
1771 pi1_logger.AppendAllFilenames(&actual_filenames);
1772 pi2_logger.AppendAllFilenames(&actual_filenames);
1773 }
1774
1775 ASSERT_THAT(actual_filenames,
1776 ::testing::UnorderedElementsAreArray(logfiles_));
1777
1778 for (const LogFile &log_file : SortParts(logfiles_)) {
1779 for (const LogParts &log_part : log_file.parts) {
1780 if (log_part.node == log_file.logger_node) {
1781 EXPECT_EQ(log_part.logger_monotonic_start_time,
1782 aos::monotonic_clock::min_time);
1783 EXPECT_EQ(log_part.logger_realtime_start_time,
1784 aos::realtime_clock::min_time);
1785 } else {
1786 const chrono::seconds offset = log_file.logger_node == "pi1"
1787 ? -chrono::seconds(1000)
1788 : chrono::seconds(1000);
1789 EXPECT_EQ(log_part.logger_monotonic_start_time,
1790 log_part.monotonic_start_time + offset);
1791 EXPECT_EQ(log_part.logger_realtime_start_time,
1792 log_file.realtime_start_time +
1793 (log_part.logger_monotonic_start_time -
1794 log_file.monotonic_start_time));
1795 }
1796 }
1797 }
1798}
1799
1800// Test that renaming the base, renames the folder.
1801TEST_P(MultinodeLoggerTest, LoggerRenameFolder) {
1802 util::UnlinkRecursive(tmp_dir_ + "/renamefolder");
1803 util::UnlinkRecursive(tmp_dir_ + "/new-good");
1804 time_converter_.AddMonotonic(
1805 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
1806 logfile_base1_ = tmp_dir_ + "/renamefolder/multi_logfile1";
1807 logfile_base2_ = tmp_dir_ + "/renamefolder/multi_logfile2";
1808 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
1809 LoggerState pi1_logger = MakeLogger(pi1_);
1810 LoggerState pi2_logger = MakeLogger(pi2_);
1811
1812 StartLogger(&pi1_logger);
1813 StartLogger(&pi2_logger);
1814
1815 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1816 logfile_base1_ = tmp_dir_ + "/new-good/multi_logfile1";
1817 logfile_base2_ = tmp_dir_ + "/new-good/multi_logfile2";
1818 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
Alexei Strotscaf17d32023-04-03 22:31:11 -07001819
1820 // Sequence of set_base_name and Rotate simulates rename operation. Since
1821 // rename is not supported by all namers, RenameLogBase moved from logger to
1822 // the higher level abstraction, yet log_namers support rename, and it is
1823 // legal to test it here.
1824 pi1_logger.log_namer->set_base_name(logfile_base1_);
1825 pi1_logger.logger->Rotate();
1826 pi2_logger.log_namer->set_base_name(logfile_base2_);
1827 pi2_logger.logger->Rotate();
1828
Naman Guptaa63aa132023-03-22 20:06:34 -07001829 for (auto &file : logfiles_) {
1830 struct stat s;
1831 EXPECT_EQ(0, stat(file.c_str(), &s));
1832 }
1833}
1834
1835// Test that renaming the file base dies.
1836TEST_P(MultinodeLoggerDeathTest, LoggerRenameFile) {
1837 time_converter_.AddMonotonic(
1838 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
1839 util::UnlinkRecursive(tmp_dir_ + "/renamefile");
1840 logfile_base1_ = tmp_dir_ + "/renamefile/multi_logfile1";
1841 logfile_base2_ = tmp_dir_ + "/renamefile/multi_logfile2";
1842 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
1843 LoggerState pi1_logger = MakeLogger(pi1_);
1844 StartLogger(&pi1_logger);
1845 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1846 logfile_base1_ = tmp_dir_ + "/new-renamefile/new_multi_logfile1";
Alexei Strotscaf17d32023-04-03 22:31:11 -07001847 EXPECT_DEATH({ pi1_logger.log_namer->set_base_name(logfile_base1_); },
Naman Guptaa63aa132023-03-22 20:06:34 -07001848 "Rename of file base from");
1849}
1850
1851// TODO(austin): We can write a test which recreates a logfile and confirms that
1852// we get it back. That is the ultimate test.
1853
1854// Tests that we properly recreate forwarded timestamps when replaying a log.
1855// This should be enough that we can then re-run the logger and get a valid log
1856// back.
1857TEST_P(MultinodeLoggerTest, RemoteReboot) {
1858 std::vector<std::string> actual_filenames;
1859
1860 const UUID pi1_boot0 = UUID::Random();
1861 const UUID pi2_boot0 = UUID::Random();
1862 const UUID pi2_boot1 = UUID::Random();
1863 {
1864 CHECK_EQ(pi1_index_, 0u);
1865 CHECK_EQ(pi2_index_, 1u);
1866
1867 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
1868 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
1869 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
1870
1871 time_converter_.AddNextTimestamp(
1872 distributed_clock::epoch(),
1873 {BootTimestamp::epoch(), BootTimestamp::epoch()});
1874 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
1875 time_converter_.AddNextTimestamp(
1876 distributed_clock::epoch() + reboot_time,
1877 {BootTimestamp::epoch() + reboot_time,
1878 BootTimestamp{
1879 .boot = 1,
1880 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
1881 }
1882
1883 {
1884 LoggerState pi1_logger = MakeLogger(pi1_);
1885
1886 event_loop_factory_.RunFor(chrono::milliseconds(95));
1887 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
1888 pi1_boot0);
1889 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
1890 pi2_boot0);
1891
1892 StartLogger(&pi1_logger);
1893
1894 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1895
1896 VLOG(1) << "Reboot now!";
1897
1898 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1899 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
1900 pi1_boot0);
1901 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
1902 pi2_boot1);
1903
1904 pi1_logger.AppendAllFilenames(&actual_filenames);
1905 }
1906
1907 std::sort(actual_filenames.begin(), actual_filenames.end());
1908 std::sort(pi1_reboot_logfiles_.begin(), pi1_reboot_logfiles_.end());
1909 ASSERT_THAT(actual_filenames,
1910 ::testing::UnorderedElementsAreArray(pi1_reboot_logfiles_));
1911
1912 // Confirm that our new oldest timestamps properly update as we reboot and
1913 // rotate.
1914 for (const std::string &file : pi1_reboot_logfiles_) {
1915 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
1916 ReadHeader(file);
1917 CHECK(log_header);
1918 if (log_header->message().has_configuration()) {
1919 continue;
1920 }
1921
1922 const monotonic_clock::time_point monotonic_start_time =
1923 monotonic_clock::time_point(
1924 chrono::nanoseconds(log_header->message().monotonic_start_time()));
1925 const UUID source_node_boot_uuid = UUID::FromString(
1926 log_header->message().source_node_boot_uuid()->string_view());
1927
1928 if (log_header->message().node()->name()->string_view() != "pi1") {
1929 // The remote message channel should rotate later and have more parts.
1930 // This only is true on the log files with shared remote messages.
1931 //
1932 // TODO(austin): I'm not the most thrilled with this test pattern... It
1933 // feels brittle in a different way.
1934 if (file.find("aos.message_bridge.RemoteMessage") == std::string::npos ||
1935 !shared()) {
1936 switch (log_header->message().parts_index()) {
1937 case 0:
1938 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1939 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
1940 break;
1941 case 1:
1942 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1943 ASSERT_EQ(monotonic_start_time,
1944 monotonic_clock::epoch() + chrono::seconds(1));
1945 break;
1946 case 2:
1947 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1948 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
1949 break;
1950 case 3:
1951 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1952 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
1953 chrono::nanoseconds(2322999462))
1954 << " on " << file;
1955 break;
1956 default:
1957 FAIL();
1958 break;
1959 }
1960 } else {
1961 switch (log_header->message().parts_index()) {
1962 case 0:
1963 case 1:
1964 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1965 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
1966 break;
1967 case 2:
1968 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1969 ASSERT_EQ(monotonic_start_time,
1970 monotonic_clock::epoch() + chrono::seconds(1));
1971 break;
1972 case 3:
1973 case 4:
1974 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1975 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
1976 break;
1977 case 5:
1978 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1979 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
1980 chrono::nanoseconds(2322999462))
1981 << " on " << file;
1982 break;
1983 default:
1984 FAIL();
1985 break;
1986 }
1987 }
1988 continue;
1989 }
1990 SCOPED_TRACE(file);
1991 SCOPED_TRACE(aos::FlatbufferToJson(
1992 *log_header, {.multi_line = true, .max_vector_size = 100}));
1993 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
1994 ASSERT_EQ(
1995 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
1996 EXPECT_EQ(
1997 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
1998 monotonic_clock::max_time.time_since_epoch().count());
1999 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2000 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2001 2u);
2002 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2003 monotonic_clock::max_time.time_since_epoch().count());
2004 ASSERT_TRUE(log_header->message()
2005 .has_oldest_remote_unreliable_monotonic_timestamps());
2006 ASSERT_EQ(log_header->message()
2007 .oldest_remote_unreliable_monotonic_timestamps()
2008 ->size(),
2009 2u);
2010 EXPECT_EQ(log_header->message()
2011 .oldest_remote_unreliable_monotonic_timestamps()
2012 ->Get(0),
2013 monotonic_clock::max_time.time_since_epoch().count());
2014 ASSERT_TRUE(log_header->message()
2015 .has_oldest_local_unreliable_monotonic_timestamps());
2016 ASSERT_EQ(log_header->message()
2017 .oldest_local_unreliable_monotonic_timestamps()
2018 ->size(),
2019 2u);
2020 EXPECT_EQ(log_header->message()
2021 .oldest_local_unreliable_monotonic_timestamps()
2022 ->Get(0),
2023 monotonic_clock::max_time.time_since_epoch().count());
2024
2025 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2026 monotonic_clock::time_point(chrono::nanoseconds(
2027 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2028 1)));
2029 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2030 monotonic_clock::time_point(chrono::nanoseconds(
2031 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2032 const monotonic_clock::time_point
2033 oldest_remote_unreliable_monotonic_timestamps =
2034 monotonic_clock::time_point(chrono::nanoseconds(
2035 log_header->message()
2036 .oldest_remote_unreliable_monotonic_timestamps()
2037 ->Get(1)));
2038 const monotonic_clock::time_point
2039 oldest_local_unreliable_monotonic_timestamps =
2040 monotonic_clock::time_point(chrono::nanoseconds(
2041 log_header->message()
2042 .oldest_local_unreliable_monotonic_timestamps()
2043 ->Get(1)));
2044 const monotonic_clock::time_point
2045 oldest_remote_reliable_monotonic_timestamps =
2046 monotonic_clock::time_point(chrono::nanoseconds(
2047 log_header->message()
2048 .oldest_remote_reliable_monotonic_timestamps()
2049 ->Get(1)));
2050 const monotonic_clock::time_point
2051 oldest_local_reliable_monotonic_timestamps =
2052 monotonic_clock::time_point(chrono::nanoseconds(
2053 log_header->message()
2054 .oldest_local_reliable_monotonic_timestamps()
2055 ->Get(1)));
2056 const monotonic_clock::time_point
2057 oldest_logger_remote_unreliable_monotonic_timestamps =
2058 monotonic_clock::time_point(chrono::nanoseconds(
2059 log_header->message()
2060 .oldest_logger_remote_unreliable_monotonic_timestamps()
2061 ->Get(0)));
2062 const monotonic_clock::time_point
2063 oldest_logger_local_unreliable_monotonic_timestamps =
2064 monotonic_clock::time_point(chrono::nanoseconds(
2065 log_header->message()
2066 .oldest_logger_local_unreliable_monotonic_timestamps()
2067 ->Get(0)));
2068 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2069 monotonic_clock::max_time);
2070 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2071 monotonic_clock::max_time);
2072 switch (log_header->message().parts_index()) {
2073 case 0:
2074 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2075 monotonic_clock::max_time);
2076 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2077 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2078 monotonic_clock::max_time);
2079 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2080 monotonic_clock::max_time);
2081 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2082 monotonic_clock::max_time);
2083 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2084 monotonic_clock::max_time);
2085 break;
2086 case 1:
2087 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2088 monotonic_clock::time_point(chrono::microseconds(90200)));
2089 EXPECT_EQ(oldest_local_monotonic_timestamps,
2090 monotonic_clock::time_point(chrono::microseconds(90350)));
2091 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2092 monotonic_clock::time_point(chrono::microseconds(90200)));
2093 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2094 monotonic_clock::time_point(chrono::microseconds(90350)));
2095 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2096 monotonic_clock::max_time);
2097 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2098 monotonic_clock::max_time);
2099 break;
2100 case 2:
2101 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2102 monotonic_clock::time_point(chrono::microseconds(90200)))
2103 << file;
2104 EXPECT_EQ(oldest_local_monotonic_timestamps,
2105 monotonic_clock::time_point(chrono::microseconds(90350)))
2106 << file;
2107 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2108 monotonic_clock::time_point(chrono::microseconds(90200)))
2109 << file;
2110 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2111 monotonic_clock::time_point(chrono::microseconds(90350)))
2112 << file;
2113 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2114 monotonic_clock::time_point(chrono::microseconds(100000)))
2115 << file;
2116 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2117 monotonic_clock::time_point(chrono::microseconds(100150)))
2118 << file;
2119 break;
2120 case 3:
2121 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2122 monotonic_clock::time_point(chrono::milliseconds(1323) +
2123 chrono::microseconds(200)));
2124 EXPECT_EQ(oldest_local_monotonic_timestamps,
2125 monotonic_clock::time_point(chrono::microseconds(10100350)));
2126 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2127 monotonic_clock::time_point(chrono::milliseconds(1323) +
2128 chrono::microseconds(200)));
2129 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2130 monotonic_clock::time_point(chrono::microseconds(10100350)));
2131 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2132 monotonic_clock::max_time)
2133 << file;
2134 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2135 monotonic_clock::max_time)
2136 << file;
2137 break;
2138 case 4:
2139 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2140 monotonic_clock::time_point(chrono::milliseconds(1323) +
2141 chrono::microseconds(200)));
2142 EXPECT_EQ(oldest_local_monotonic_timestamps,
2143 monotonic_clock::time_point(chrono::microseconds(10100350)));
2144 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2145 monotonic_clock::time_point(chrono::milliseconds(1323) +
2146 chrono::microseconds(200)));
2147 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2148 monotonic_clock::time_point(chrono::microseconds(10100350)));
2149 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2150 monotonic_clock::time_point(chrono::microseconds(1423000)))
2151 << file;
2152 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2153 monotonic_clock::time_point(chrono::microseconds(10200150)))
2154 << file;
2155 break;
2156 default:
2157 FAIL();
2158 break;
2159 }
2160 }
2161
2162 // Confirm that we refuse to replay logs with missing boot uuids.
2163 {
2164 LogReader reader(SortParts(pi1_reboot_logfiles_));
2165
2166 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2167 log_reader_factory.set_send_delay(chrono::microseconds(0));
2168
2169 // This sends out the fetched messages and advances time to the start of
2170 // the log file.
2171 reader.Register(&log_reader_factory);
2172
2173 log_reader_factory.Run();
2174
2175 reader.Deregister();
2176 }
2177}
2178
2179// Tests that we can sort a log which only has timestamps from the remote
2180// because the local message_bridge_client failed to connect.
2181TEST_P(MultinodeLoggerTest, RemoteRebootOnlyTimestamps) {
2182 const UUID pi1_boot0 = UUID::Random();
2183 const UUID pi2_boot0 = UUID::Random();
2184 const UUID pi2_boot1 = UUID::Random();
2185 {
2186 CHECK_EQ(pi1_index_, 0u);
2187 CHECK_EQ(pi2_index_, 1u);
2188
2189 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2190 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2191 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2192
2193 time_converter_.AddNextTimestamp(
2194 distributed_clock::epoch(),
2195 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2196 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2197 time_converter_.AddNextTimestamp(
2198 distributed_clock::epoch() + reboot_time,
2199 {BootTimestamp::epoch() + reboot_time,
2200 BootTimestamp{
2201 .boot = 1,
2202 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2203 }
2204 pi2_->Disconnect(pi1_->node());
2205
2206 std::vector<std::string> filenames;
2207 {
2208 LoggerState pi1_logger = MakeLogger(pi1_);
2209
2210 event_loop_factory_.RunFor(chrono::milliseconds(95));
2211 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2212 pi1_boot0);
2213 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2214 pi2_boot0);
2215
2216 StartLogger(&pi1_logger);
2217
2218 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2219
2220 VLOG(1) << "Reboot now!";
2221
2222 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2223 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2224 pi1_boot0);
2225 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2226 pi2_boot1);
2227 pi1_logger.AppendAllFilenames(&filenames);
2228 }
2229
2230 std::sort(filenames.begin(), filenames.end());
2231
2232 // Confirm that our new oldest timestamps properly update as we reboot and
2233 // rotate.
2234 size_t timestamp_file_count = 0;
2235 for (const std::string &file : filenames) {
2236 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2237 ReadHeader(file);
2238 CHECK(log_header);
2239
2240 if (log_header->message().has_configuration()) {
2241 continue;
2242 }
2243
2244 const monotonic_clock::time_point monotonic_start_time =
2245 monotonic_clock::time_point(
2246 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2247 const UUID source_node_boot_uuid = UUID::FromString(
2248 log_header->message().source_node_boot_uuid()->string_view());
2249
2250 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2251 ASSERT_EQ(
2252 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2253 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2254 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2255 2u);
2256 ASSERT_TRUE(log_header->message()
2257 .has_oldest_remote_unreliable_monotonic_timestamps());
2258 ASSERT_EQ(log_header->message()
2259 .oldest_remote_unreliable_monotonic_timestamps()
2260 ->size(),
2261 2u);
2262 ASSERT_TRUE(log_header->message()
2263 .has_oldest_local_unreliable_monotonic_timestamps());
2264 ASSERT_EQ(log_header->message()
2265 .oldest_local_unreliable_monotonic_timestamps()
2266 ->size(),
2267 2u);
2268 ASSERT_TRUE(log_header->message()
2269 .has_oldest_remote_reliable_monotonic_timestamps());
2270 ASSERT_EQ(log_header->message()
2271 .oldest_remote_reliable_monotonic_timestamps()
2272 ->size(),
2273 2u);
2274 ASSERT_TRUE(
2275 log_header->message().has_oldest_local_reliable_monotonic_timestamps());
2276 ASSERT_EQ(log_header->message()
2277 .oldest_local_reliable_monotonic_timestamps()
2278 ->size(),
2279 2u);
2280
2281 ASSERT_TRUE(
2282 log_header->message()
2283 .has_oldest_logger_remote_unreliable_monotonic_timestamps());
2284 ASSERT_EQ(log_header->message()
2285 .oldest_logger_remote_unreliable_monotonic_timestamps()
2286 ->size(),
2287 2u);
2288 ASSERT_TRUE(log_header->message()
2289 .has_oldest_logger_local_unreliable_monotonic_timestamps());
2290 ASSERT_EQ(log_header->message()
2291 .oldest_logger_local_unreliable_monotonic_timestamps()
2292 ->size(),
2293 2u);
2294
2295 if (log_header->message().node()->name()->string_view() != "pi1") {
2296 ASSERT_TRUE(file.find("aos.message_bridge.RemoteMessage") !=
2297 std::string::npos);
2298
2299 const std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
2300 ReadNthMessage(file, 0);
2301 CHECK(msg);
2302
2303 EXPECT_TRUE(msg->message().has_monotonic_sent_time());
2304 EXPECT_TRUE(msg->message().has_monotonic_remote_time());
2305
2306 const monotonic_clock::time_point
2307 expected_oldest_local_monotonic_timestamps(
2308 chrono::nanoseconds(msg->message().monotonic_sent_time()));
2309 const monotonic_clock::time_point
2310 expected_oldest_remote_monotonic_timestamps(
2311 chrono::nanoseconds(msg->message().monotonic_remote_time()));
2312 const monotonic_clock::time_point
2313 expected_oldest_timestamp_monotonic_timestamps(
2314 chrono::nanoseconds(msg->message().monotonic_timestamp_time()));
2315
2316 EXPECT_NE(expected_oldest_local_monotonic_timestamps,
2317 monotonic_clock::min_time);
2318 EXPECT_NE(expected_oldest_remote_monotonic_timestamps,
2319 monotonic_clock::min_time);
2320 EXPECT_NE(expected_oldest_timestamp_monotonic_timestamps,
2321 monotonic_clock::min_time);
2322
2323 ++timestamp_file_count;
2324 // Since the log file is from the perspective of the other node,
2325 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2326 monotonic_clock::time_point(chrono::nanoseconds(
2327 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2328 0)));
2329 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2330 monotonic_clock::time_point(chrono::nanoseconds(
2331 log_header->message().oldest_local_monotonic_timestamps()->Get(
2332 0)));
2333 const monotonic_clock::time_point
2334 oldest_remote_unreliable_monotonic_timestamps =
2335 monotonic_clock::time_point(chrono::nanoseconds(
2336 log_header->message()
2337 .oldest_remote_unreliable_monotonic_timestamps()
2338 ->Get(0)));
2339 const monotonic_clock::time_point
2340 oldest_local_unreliable_monotonic_timestamps =
2341 monotonic_clock::time_point(chrono::nanoseconds(
2342 log_header->message()
2343 .oldest_local_unreliable_monotonic_timestamps()
2344 ->Get(0)));
2345 const monotonic_clock::time_point
2346 oldest_remote_reliable_monotonic_timestamps =
2347 monotonic_clock::time_point(chrono::nanoseconds(
2348 log_header->message()
2349 .oldest_remote_reliable_monotonic_timestamps()
2350 ->Get(0)));
2351 const monotonic_clock::time_point
2352 oldest_local_reliable_monotonic_timestamps =
2353 monotonic_clock::time_point(chrono::nanoseconds(
2354 log_header->message()
2355 .oldest_local_reliable_monotonic_timestamps()
2356 ->Get(0)));
2357 const monotonic_clock::time_point
2358 oldest_logger_remote_unreliable_monotonic_timestamps =
2359 monotonic_clock::time_point(chrono::nanoseconds(
2360 log_header->message()
2361 .oldest_logger_remote_unreliable_monotonic_timestamps()
2362 ->Get(1)));
2363 const monotonic_clock::time_point
2364 oldest_logger_local_unreliable_monotonic_timestamps =
2365 monotonic_clock::time_point(chrono::nanoseconds(
2366 log_header->message()
2367 .oldest_logger_local_unreliable_monotonic_timestamps()
2368 ->Get(1)));
2369
2370 const Channel *channel =
2371 event_loop_factory_.configuration()->channels()->Get(
2372 msg->message().channel_index());
2373 const Connection *connection = configuration::ConnectionToNode(
2374 channel, configuration::GetNode(
2375 event_loop_factory_.configuration(),
2376 log_header->message().node()->name()->string_view()));
2377
2378 const bool reliable = connection->time_to_live() == 0;
2379
2380 SCOPED_TRACE(file);
2381 SCOPED_TRACE(aos::FlatbufferToJson(
2382 *log_header, {.multi_line = true, .max_vector_size = 100}));
2383
2384 if (shared()) {
2385 // Confirm that the oldest timestamps match what we expect. Based on
2386 // what we are doing, we know that the oldest time is the first
2387 // message's time.
2388 //
2389 // This makes the test robust to both the split and combined config
2390 // tests.
2391 switch (log_header->message().parts_index()) {
2392 case 0:
2393 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2394 expected_oldest_remote_monotonic_timestamps);
2395 EXPECT_EQ(oldest_local_monotonic_timestamps,
2396 expected_oldest_local_monotonic_timestamps);
2397 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2398 expected_oldest_local_monotonic_timestamps)
2399 << file;
2400 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2401 expected_oldest_timestamp_monotonic_timestamps)
2402 << file;
2403
2404 if (reliable) {
2405 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2406 expected_oldest_remote_monotonic_timestamps);
2407 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2408 expected_oldest_local_monotonic_timestamps);
2409 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2410 monotonic_clock::max_time);
2411 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2412 monotonic_clock::max_time);
2413 } else {
2414 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2415 monotonic_clock::max_time);
2416 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2417 monotonic_clock::max_time);
2418 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2419 expected_oldest_remote_monotonic_timestamps);
2420 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2421 expected_oldest_local_monotonic_timestamps);
2422 }
2423 break;
2424 case 1:
2425 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2426 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2427 EXPECT_EQ(oldest_local_monotonic_timestamps,
2428 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2429 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2430 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2431 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2432 monotonic_clock::epoch() + chrono::nanoseconds(90250000));
2433 if (reliable) {
2434 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2435 expected_oldest_remote_monotonic_timestamps);
2436 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2437 expected_oldest_local_monotonic_timestamps);
2438 EXPECT_EQ(
2439 oldest_remote_unreliable_monotonic_timestamps,
2440 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2441 EXPECT_EQ(
2442 oldest_local_unreliable_monotonic_timestamps,
2443 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2444 } else {
2445 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2446 monotonic_clock::max_time);
2447 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2448 monotonic_clock::max_time);
2449 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2450 expected_oldest_remote_monotonic_timestamps);
2451 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2452 expected_oldest_local_monotonic_timestamps);
2453 }
2454 break;
2455 case 2:
2456 EXPECT_EQ(
2457 oldest_remote_monotonic_timestamps,
2458 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2459 EXPECT_EQ(
2460 oldest_local_monotonic_timestamps,
2461 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2462 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2463 expected_oldest_local_monotonic_timestamps)
2464 << file;
2465 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2466 expected_oldest_timestamp_monotonic_timestamps)
2467 << file;
2468 if (reliable) {
2469 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2470 expected_oldest_remote_monotonic_timestamps);
2471 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2472 expected_oldest_local_monotonic_timestamps);
2473 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2474 monotonic_clock::max_time);
2475 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2476 monotonic_clock::max_time);
2477 } else {
2478 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2479 monotonic_clock::max_time);
2480 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2481 monotonic_clock::max_time);
2482 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2483 expected_oldest_remote_monotonic_timestamps);
2484 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2485 expected_oldest_local_monotonic_timestamps);
2486 }
2487 break;
2488
2489 case 3:
2490 EXPECT_EQ(
2491 oldest_remote_monotonic_timestamps,
2492 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2493 EXPECT_EQ(
2494 oldest_local_monotonic_timestamps,
2495 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2496 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2497 expected_oldest_remote_monotonic_timestamps);
2498 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2499 expected_oldest_local_monotonic_timestamps);
2500 EXPECT_EQ(
2501 oldest_logger_remote_unreliable_monotonic_timestamps,
2502 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2503 EXPECT_EQ(
2504 oldest_logger_local_unreliable_monotonic_timestamps,
2505 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2506 break;
2507 default:
2508 FAIL();
2509 break;
2510 }
2511
2512 switch (log_header->message().parts_index()) {
2513 case 0:
2514 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2515 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2516 break;
2517 case 1:
2518 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2519 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2520 break;
2521 case 2:
2522 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2523 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2524 break;
2525 case 3:
2526 if (shared()) {
2527 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2528 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2529 break;
2530 }
2531 [[fallthrough]];
2532 default:
2533 FAIL();
2534 break;
2535 }
2536 } else {
2537 switch (log_header->message().parts_index()) {
2538 case 0:
2539 if (reliable) {
2540 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2541 monotonic_clock::max_time);
2542 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2543 monotonic_clock::max_time);
2544 EXPECT_EQ(
2545 oldest_logger_remote_unreliable_monotonic_timestamps,
2546 monotonic_clock::epoch() + chrono::nanoseconds(100150000))
2547 << file;
2548 EXPECT_EQ(
2549 oldest_logger_local_unreliable_monotonic_timestamps,
2550 monotonic_clock::epoch() + chrono::nanoseconds(100250000))
2551 << file;
2552 } else {
2553 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2554 expected_oldest_remote_monotonic_timestamps);
2555 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2556 expected_oldest_local_monotonic_timestamps);
2557 EXPECT_EQ(
2558 oldest_logger_remote_unreliable_monotonic_timestamps,
2559 monotonic_clock::epoch() + chrono::nanoseconds(90150000))
2560 << file;
2561 EXPECT_EQ(
2562 oldest_logger_local_unreliable_monotonic_timestamps,
2563 monotonic_clock::epoch() + chrono::nanoseconds(90250000))
2564 << file;
2565 }
2566 break;
2567 case 1:
2568 if (reliable) {
2569 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2570 monotonic_clock::max_time);
2571 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2572 monotonic_clock::max_time);
2573 EXPECT_EQ(
2574 oldest_logger_remote_unreliable_monotonic_timestamps,
2575 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2576 EXPECT_EQ(
2577 oldest_logger_local_unreliable_monotonic_timestamps,
2578 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2579 } else {
2580 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2581 expected_oldest_remote_monotonic_timestamps);
2582 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2583 expected_oldest_local_monotonic_timestamps);
2584 EXPECT_EQ(
2585 oldest_logger_remote_unreliable_monotonic_timestamps,
2586 monotonic_clock::epoch() + chrono::nanoseconds(1323150000));
2587 EXPECT_EQ(
2588 oldest_logger_local_unreliable_monotonic_timestamps,
2589 monotonic_clock::epoch() + chrono::nanoseconds(10100250000));
2590 }
2591 break;
2592 default:
2593 FAIL();
2594 break;
2595 }
2596
2597 switch (log_header->message().parts_index()) {
2598 case 0:
2599 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2600 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2601 break;
2602 case 1:
2603 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2604 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2605 break;
2606 default:
2607 FAIL();
2608 break;
2609 }
2610 }
2611
2612 continue;
2613 }
2614 EXPECT_EQ(
2615 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2616 monotonic_clock::max_time.time_since_epoch().count());
2617 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2618 monotonic_clock::max_time.time_since_epoch().count());
2619 EXPECT_EQ(log_header->message()
2620 .oldest_remote_unreliable_monotonic_timestamps()
2621 ->Get(0),
2622 monotonic_clock::max_time.time_since_epoch().count());
2623 EXPECT_EQ(log_header->message()
2624 .oldest_local_unreliable_monotonic_timestamps()
2625 ->Get(0),
2626 monotonic_clock::max_time.time_since_epoch().count());
2627
2628 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2629 monotonic_clock::time_point(chrono::nanoseconds(
2630 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2631 1)));
2632 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2633 monotonic_clock::time_point(chrono::nanoseconds(
2634 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2635 const monotonic_clock::time_point
2636 oldest_remote_unreliable_monotonic_timestamps =
2637 monotonic_clock::time_point(chrono::nanoseconds(
2638 log_header->message()
2639 .oldest_remote_unreliable_monotonic_timestamps()
2640 ->Get(1)));
2641 const monotonic_clock::time_point
2642 oldest_local_unreliable_monotonic_timestamps =
2643 monotonic_clock::time_point(chrono::nanoseconds(
2644 log_header->message()
2645 .oldest_local_unreliable_monotonic_timestamps()
2646 ->Get(1)));
2647 switch (log_header->message().parts_index()) {
2648 case 0:
2649 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2650 monotonic_clock::max_time);
2651 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2652 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2653 monotonic_clock::max_time);
2654 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2655 monotonic_clock::max_time);
2656 break;
2657 default:
2658 FAIL();
2659 break;
2660 }
2661 }
2662
2663 if (shared()) {
2664 EXPECT_EQ(timestamp_file_count, 4u);
2665 } else {
2666 EXPECT_EQ(timestamp_file_count, 4u);
2667 }
2668
2669 // Confirm that we can actually sort the resulting log and read it.
2670 {
2671 LogReader reader(SortParts(filenames));
2672
2673 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2674 log_reader_factory.set_send_delay(chrono::microseconds(0));
2675
2676 // This sends out the fetched messages and advances time to the start of
2677 // the log file.
2678 reader.Register(&log_reader_factory);
2679
2680 log_reader_factory.Run();
2681
2682 reader.Deregister();
2683 }
2684}
2685
2686// Tests that we properly handle one direction of message_bridge being
2687// unavailable.
2688TEST_P(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
2689 pi1_->Disconnect(pi2_->node());
2690 time_converter_.AddMonotonic(
2691 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2692
2693 time_converter_.AddMonotonic(
2694 {chrono::milliseconds(10000),
2695 chrono::milliseconds(10000) - chrono::milliseconds(1)});
2696 {
2697 LoggerState pi1_logger = MakeLogger(pi1_);
2698
2699 event_loop_factory_.RunFor(chrono::milliseconds(95));
2700
2701 StartLogger(&pi1_logger);
2702
2703 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2704 }
2705
2706 // Confirm that we can parse the result. LogReader has enough internal CHECKs
2707 // to confirm the right thing happened.
2708 ConfirmReadable(pi1_single_direction_logfiles_);
2709}
2710
2711// Tests that we properly handle one direction of message_bridge being
2712// unavailable.
2713TEST_P(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
2714 pi1_->Disconnect(pi2_->node());
2715 time_converter_.AddMonotonic(
2716 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(500)});
2717
2718 time_converter_.AddMonotonic(
2719 {chrono::milliseconds(10000),
2720 chrono::milliseconds(10000) + chrono::milliseconds(1)});
2721 {
2722 LoggerState pi1_logger = MakeLogger(pi1_);
2723
2724 event_loop_factory_.RunFor(chrono::milliseconds(95));
2725
2726 StartLogger(&pi1_logger);
2727
2728 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2729 }
2730
2731 // Confirm that we can parse the result. LogReader has enough internal CHECKs
2732 // to confirm the right thing happened.
2733 ConfirmReadable(pi1_single_direction_logfiles_);
2734}
2735
2736// Tests that we explode if someone passes in a part file twice with a better
2737// error than an out of order error.
2738TEST_P(MultinodeLoggerTest, DuplicateLogFiles) {
2739 time_converter_.AddMonotonic(
2740 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2741 {
2742 LoggerState pi1_logger = MakeLogger(pi1_);
2743
2744 event_loop_factory_.RunFor(chrono::milliseconds(95));
2745
2746 StartLogger(&pi1_logger);
2747
2748 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2749 }
2750
2751 std::vector<std::string> duplicates;
2752 for (const std::string &f : pi1_single_direction_logfiles_) {
2753 duplicates.emplace_back(f);
2754 duplicates.emplace_back(f);
2755 }
2756 EXPECT_DEATH({ SortParts(duplicates); }, "Found duplicate parts in");
2757}
2758
2759// Tests that we explode if someone loses a part out of the middle of a log.
2760TEST_P(MultinodeLoggerTest, MissingPartsFromMiddle) {
2761 time_converter_.AddMonotonic(
2762 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2763 {
2764 LoggerState pi1_logger = MakeLogger(pi1_);
2765
2766 event_loop_factory_.RunFor(chrono::milliseconds(95));
2767
2768 StartLogger(&pi1_logger);
2769 aos::monotonic_clock::time_point last_rotation_time =
2770 pi1_logger.event_loop->monotonic_now();
2771 pi1_logger.logger->set_on_logged_period([&] {
2772 const auto now = pi1_logger.event_loop->monotonic_now();
2773 if (now > last_rotation_time + std::chrono::seconds(5)) {
2774 pi1_logger.logger->Rotate();
2775 last_rotation_time = now;
2776 }
2777 });
2778
2779 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2780 }
2781
2782 std::vector<std::string> missing_parts;
2783
2784 missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part0" + Extension());
2785 missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part2" + Extension());
2786 missing_parts.emplace_back(absl::StrCat(
2787 logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
2788
2789 EXPECT_DEATH({ SortParts(missing_parts); },
2790 "Broken log, missing part files between");
2791}
2792
2793// Tests that we properly handle a dead node. Do this by just disconnecting it
2794// and only using one nodes of logs.
2795TEST_P(MultinodeLoggerTest, DeadNode) {
2796 pi1_->Disconnect(pi2_->node());
2797 pi2_->Disconnect(pi1_->node());
2798 time_converter_.AddMonotonic(
2799 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2800 {
2801 LoggerState pi1_logger = MakeLogger(pi1_);
2802
2803 event_loop_factory_.RunFor(chrono::milliseconds(95));
2804
2805 StartLogger(&pi1_logger);
2806
2807 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2808 }
2809
2810 // Confirm that we can parse the result. LogReader has enough internal CHECKs
2811 // to confirm the right thing happened.
2812 ConfirmReadable(MakePi1DeadNodeLogfiles());
2813}
2814
2815// Tests that we can relog with a different config. This makes most sense when
2816// you are trying to edit a log and want to use channel renaming + the original
2817// config in the new log.
2818TEST_P(MultinodeLoggerTest, LogDifferentConfig) {
2819 time_converter_.StartEqual();
2820 {
2821 LoggerState pi1_logger = MakeLogger(pi1_);
2822 LoggerState pi2_logger = MakeLogger(pi2_);
2823
2824 event_loop_factory_.RunFor(chrono::milliseconds(95));
2825
2826 StartLogger(&pi1_logger);
2827 StartLogger(&pi2_logger);
2828
2829 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2830 }
2831
2832 LogReader reader(SortParts(logfiles_));
2833 reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
2834
2835 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2836 log_reader_factory.set_send_delay(chrono::microseconds(0));
2837
2838 // This sends out the fetched messages and advances time to the start of the
2839 // log file.
2840 reader.Register(&log_reader_factory);
2841
2842 const Node *pi1 =
2843 configuration::GetNode(log_reader_factory.configuration(), "pi1");
2844 const Node *pi2 =
2845 configuration::GetNode(log_reader_factory.configuration(), "pi2");
2846
2847 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
2848 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
2849 LOG(INFO) << "now pi1 "
2850 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
2851 LOG(INFO) << "now pi2 "
2852 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
2853
2854 EXPECT_THAT(reader.LoggedNodes(),
2855 ::testing::ElementsAre(
2856 configuration::GetNode(reader.logged_configuration(), pi1),
2857 configuration::GetNode(reader.logged_configuration(), pi2)));
2858
2859 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
2860
2861 // And confirm we can re-create a log again, while checking the contents.
2862 std::vector<std::string> log_files;
2863 {
2864 LoggerState pi1_logger =
2865 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
2866 &log_reader_factory, reader.logged_configuration());
2867 LoggerState pi2_logger =
2868 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
2869 &log_reader_factory, reader.logged_configuration());
2870
2871 pi1_logger.StartLogger(tmp_dir_ + "/relogged1");
2872 pi2_logger.StartLogger(tmp_dir_ + "/relogged2");
2873
2874 log_reader_factory.Run();
2875
2876 for (auto &x : pi1_logger.log_namer->all_filenames()) {
2877 log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged1_", x));
2878 }
2879 for (auto &x : pi2_logger.log_namer->all_filenames()) {
2880 log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged2_", x));
2881 }
2882 }
2883
2884 reader.Deregister();
2885
2886 // And verify that we can run the LogReader over the relogged files without
2887 // hitting any fatal errors.
2888 {
2889 LogReader relogged_reader(SortParts(log_files));
2890 relogged_reader.Register();
2891
2892 relogged_reader.event_loop_factory()->Run();
2893 }
2894}
2895
2896// Tests that we properly replay a log where the start time for a node is before
2897// any data on the node. This can happen if the logger starts before data is
2898// published. While the scenario below is a bit convoluted, we have seen logs
2899// like this generated out in the wild.
2900TEST(MultinodeRebootLoggerTest, StartTimeBeforeData) {
2901 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2902 aos::configuration::ReadConfig(ArtifactPath(
2903 "aos/events/logging/multinode_pingpong_split3_config.json"));
2904 message_bridge::TestingTimeConverter time_converter(
2905 configuration::NodesCount(&config.message()));
2906 SimulatedEventLoopFactory event_loop_factory(&config.message());
2907 event_loop_factory.SetTimeConverter(&time_converter);
2908 NodeEventLoopFactory *const pi1 =
2909 event_loop_factory.GetNodeEventLoopFactory("pi1");
2910 const size_t pi1_index = configuration::GetNodeIndex(
2911 event_loop_factory.configuration(), pi1->node());
2912 NodeEventLoopFactory *const pi2 =
2913 event_loop_factory.GetNodeEventLoopFactory("pi2");
2914 const size_t pi2_index = configuration::GetNodeIndex(
2915 event_loop_factory.configuration(), pi2->node());
2916 NodeEventLoopFactory *const pi3 =
2917 event_loop_factory.GetNodeEventLoopFactory("pi3");
2918 const size_t pi3_index = configuration::GetNodeIndex(
2919 event_loop_factory.configuration(), pi3->node());
2920
2921 const std::string kLogfile1_1 =
2922 aos::testing::TestTmpDir() + "/multi_logfile1/";
2923 const std::string kLogfile2_1 =
2924 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
2925 const std::string kLogfile2_2 =
2926 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
2927 const std::string kLogfile3_1 =
2928 aos::testing::TestTmpDir() + "/multi_logfile3/";
2929 util::UnlinkRecursive(kLogfile1_1);
2930 util::UnlinkRecursive(kLogfile2_1);
2931 util::UnlinkRecursive(kLogfile2_2);
2932 util::UnlinkRecursive(kLogfile3_1);
2933 const UUID pi1_boot0 = UUID::Random();
2934 const UUID pi2_boot0 = UUID::Random();
2935 const UUID pi2_boot1 = UUID::Random();
2936 const UUID pi3_boot0 = UUID::Random();
2937 {
2938 CHECK_EQ(pi1_index, 0u);
2939 CHECK_EQ(pi2_index, 1u);
2940 CHECK_EQ(pi3_index, 2u);
2941
2942 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
2943 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
2944 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
2945 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
2946
2947 time_converter.AddNextTimestamp(
2948 distributed_clock::epoch(),
2949 {BootTimestamp::epoch(), BootTimestamp::epoch(),
2950 BootTimestamp::epoch()});
2951 const chrono::nanoseconds reboot_time = chrono::milliseconds(20000);
2952 time_converter.AddNextTimestamp(
2953 distributed_clock::epoch() + reboot_time,
2954 {BootTimestamp::epoch() + reboot_time,
2955 BootTimestamp{
2956 .boot = 1,
2957 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)},
2958 BootTimestamp::epoch() + reboot_time});
2959 }
2960
2961 // Make everything perfectly quiet.
2962 event_loop_factory.SkipTimingReport();
2963 event_loop_factory.DisableStatistics();
2964
2965 std::vector<std::string> filenames;
2966 {
2967 LoggerState pi1_logger = MakeLoggerState(
2968 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
2969 LoggerState pi3_logger = MakeLoggerState(
2970 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
2971 {
2972 // And now start the logger.
2973 LoggerState pi2_logger = MakeLoggerState(
2974 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
2975
2976 event_loop_factory.RunFor(chrono::milliseconds(1000));
2977
2978 pi1_logger.StartLogger(kLogfile1_1);
2979 pi3_logger.StartLogger(kLogfile3_1);
2980 pi2_logger.StartLogger(kLogfile2_1);
2981
2982 event_loop_factory.RunFor(chrono::milliseconds(10000));
2983
2984 // Now that we've got a start time in the past, turn on data.
2985 event_loop_factory.EnableStatistics();
2986 std::unique_ptr<aos::EventLoop> ping_event_loop =
2987 pi1->MakeEventLoop("ping");
2988 Ping ping(ping_event_loop.get());
2989
2990 pi2->AlwaysStart<Pong>("pong");
2991
2992 event_loop_factory.RunFor(chrono::milliseconds(3000));
2993
2994 pi2_logger.AppendAllFilenames(&filenames);
2995
2996 // Stop logging on pi2 before rebooting and completely shut off all
2997 // messages on pi2.
2998 pi2->DisableStatistics();
2999 pi1->Disconnect(pi2->node());
3000 pi2->Disconnect(pi1->node());
3001 }
3002 event_loop_factory.RunFor(chrono::milliseconds(7000));
3003 // pi2 now reboots.
3004 {
3005 event_loop_factory.RunFor(chrono::milliseconds(1000));
3006
3007 // Start logging again on pi2 after it is up.
3008 LoggerState pi2_logger = MakeLoggerState(
3009 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3010 pi2_logger.StartLogger(kLogfile2_2);
3011
3012 event_loop_factory.RunFor(chrono::milliseconds(10000));
3013 // And, now that we have a start time in the log, turn data back on.
3014 pi2->EnableStatistics();
3015 pi1->Connect(pi2->node());
3016 pi2->Connect(pi1->node());
3017
3018 pi2->AlwaysStart<Pong>("pong");
3019 std::unique_ptr<aos::EventLoop> ping_event_loop =
3020 pi1->MakeEventLoop("ping");
3021 Ping ping(ping_event_loop.get());
3022
3023 event_loop_factory.RunFor(chrono::milliseconds(3000));
3024
3025 pi2_logger.AppendAllFilenames(&filenames);
3026 }
3027
3028 pi1_logger.AppendAllFilenames(&filenames);
3029 pi3_logger.AppendAllFilenames(&filenames);
3030 }
3031
3032 // Confirm that we can parse the result. LogReader has enough internal CHECKs
3033 // to confirm the right thing happened.
3034 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3035 auto result = ConfirmReadable(filenames);
3036 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
3037 chrono::seconds(1)));
3038 EXPECT_THAT(result[0].second,
3039 ::testing::ElementsAre(realtime_clock::epoch() +
3040 chrono::microseconds(34990350)));
3041
3042 EXPECT_THAT(result[1].first,
3043 ::testing::ElementsAre(
3044 realtime_clock::epoch() + chrono::seconds(1),
3045 realtime_clock::epoch() + chrono::microseconds(3323000)));
3046 EXPECT_THAT(result[1].second,
3047 ::testing::ElementsAre(
3048 realtime_clock::epoch() + chrono::microseconds(13990200),
3049 realtime_clock::epoch() + chrono::microseconds(16313200)));
3050
3051 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch() +
3052 chrono::seconds(1)));
3053 EXPECT_THAT(result[2].second,
3054 ::testing::ElementsAre(realtime_clock::epoch() +
3055 chrono::microseconds(34900150)));
3056}
3057
3058// Tests that local data before remote data after reboot is properly replayed.
3059// We only trigger a reboot in the timestamp interpolation function when solving
3060// the timestamp problem when we actually have a point in the function. This
3061// originally only happened when a point passes the noncausal filter. At the
3062// start of time for the second boot, if we aren't careful, we will have
3063// messages which need to be published at times before the boot. This happens
3064// when a local message is in the log before a forwarded message, so there is no
3065// point in the interpolation function. This delays the reboot. So, we need to
3066// recreate that situation and make sure it doesn't come back.
3067TEST(MultinodeRebootLoggerTest,
3068 LocalMessageBeforeRemoteBeforeStartAfterReboot) {
3069 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3070 aos::configuration::ReadConfig(ArtifactPath(
3071 "aos/events/logging/multinode_pingpong_split3_config.json"));
3072 message_bridge::TestingTimeConverter time_converter(
3073 configuration::NodesCount(&config.message()));
3074 SimulatedEventLoopFactory event_loop_factory(&config.message());
3075 event_loop_factory.SetTimeConverter(&time_converter);
3076 NodeEventLoopFactory *const pi1 =
3077 event_loop_factory.GetNodeEventLoopFactory("pi1");
3078 const size_t pi1_index = configuration::GetNodeIndex(
3079 event_loop_factory.configuration(), pi1->node());
3080 NodeEventLoopFactory *const pi2 =
3081 event_loop_factory.GetNodeEventLoopFactory("pi2");
3082 const size_t pi2_index = configuration::GetNodeIndex(
3083 event_loop_factory.configuration(), pi2->node());
3084 NodeEventLoopFactory *const pi3 =
3085 event_loop_factory.GetNodeEventLoopFactory("pi3");
3086 const size_t pi3_index = configuration::GetNodeIndex(
3087 event_loop_factory.configuration(), pi3->node());
3088
3089 const std::string kLogfile1_1 =
3090 aos::testing::TestTmpDir() + "/multi_logfile1/";
3091 const std::string kLogfile2_1 =
3092 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3093 const std::string kLogfile2_2 =
3094 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
3095 const std::string kLogfile3_1 =
3096 aos::testing::TestTmpDir() + "/multi_logfile3/";
3097 util::UnlinkRecursive(kLogfile1_1);
3098 util::UnlinkRecursive(kLogfile2_1);
3099 util::UnlinkRecursive(kLogfile2_2);
3100 util::UnlinkRecursive(kLogfile3_1);
3101 const UUID pi1_boot0 = UUID::Random();
3102 const UUID pi2_boot0 = UUID::Random();
3103 const UUID pi2_boot1 = UUID::Random();
3104 const UUID pi3_boot0 = UUID::Random();
3105 {
3106 CHECK_EQ(pi1_index, 0u);
3107 CHECK_EQ(pi2_index, 1u);
3108 CHECK_EQ(pi3_index, 2u);
3109
3110 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3111 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3112 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3113 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3114
3115 time_converter.AddNextTimestamp(
3116 distributed_clock::epoch(),
3117 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3118 BootTimestamp::epoch()});
3119 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3120 time_converter.AddNextTimestamp(
3121 distributed_clock::epoch() + reboot_time,
3122 {BootTimestamp::epoch() + reboot_time,
3123 BootTimestamp{.boot = 1,
3124 .time = monotonic_clock::epoch() + reboot_time +
3125 chrono::seconds(100)},
3126 BootTimestamp::epoch() + reboot_time});
3127 }
3128
3129 std::vector<std::string> filenames;
3130 {
3131 LoggerState pi1_logger = MakeLoggerState(
3132 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3133 LoggerState pi3_logger = MakeLoggerState(
3134 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3135 {
3136 // And now start the logger.
3137 LoggerState pi2_logger = MakeLoggerState(
3138 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3139
3140 pi1_logger.StartLogger(kLogfile1_1);
3141 pi3_logger.StartLogger(kLogfile3_1);
3142 pi2_logger.StartLogger(kLogfile2_1);
3143
3144 event_loop_factory.RunFor(chrono::milliseconds(1005));
3145
3146 // Now that we've got a start time in the past, turn on data.
3147 std::unique_ptr<aos::EventLoop> ping_event_loop =
3148 pi1->MakeEventLoop("ping");
3149 Ping ping(ping_event_loop.get());
3150
3151 pi2->AlwaysStart<Pong>("pong");
3152
3153 event_loop_factory.RunFor(chrono::milliseconds(3000));
3154
3155 pi2_logger.AppendAllFilenames(&filenames);
3156
3157 // Disable any remote messages on pi2.
3158 pi1->Disconnect(pi2->node());
3159 pi2->Disconnect(pi1->node());
3160 }
3161 event_loop_factory.RunFor(chrono::milliseconds(995));
3162 // pi2 now reboots at 5 seconds.
3163 {
3164 event_loop_factory.RunFor(chrono::milliseconds(1000));
3165
3166 // Make local stuff happen before we start logging and connect the remote.
3167 pi2->AlwaysStart<Pong>("pong");
3168 std::unique_ptr<aos::EventLoop> ping_event_loop =
3169 pi1->MakeEventLoop("ping");
3170 Ping ping(ping_event_loop.get());
3171 event_loop_factory.RunFor(chrono::milliseconds(1005));
3172
3173 // Start logging again on pi2 after it is up.
3174 LoggerState pi2_logger = MakeLoggerState(
3175 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3176 pi2_logger.StartLogger(kLogfile2_2);
3177
3178 // And allow remote messages now that we have some local ones.
3179 pi1->Connect(pi2->node());
3180 pi2->Connect(pi1->node());
3181
3182 event_loop_factory.RunFor(chrono::milliseconds(1000));
3183
3184 event_loop_factory.RunFor(chrono::milliseconds(3000));
3185
3186 pi2_logger.AppendAllFilenames(&filenames);
3187 }
3188
3189 pi1_logger.AppendAllFilenames(&filenames);
3190 pi3_logger.AppendAllFilenames(&filenames);
3191 }
3192
3193 // Confirm that we can parse the result. LogReader has enough internal CHECKs
3194 // to confirm the right thing happened.
3195 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3196 auto result = ConfirmReadable(filenames);
3197
3198 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3199 EXPECT_THAT(result[0].second,
3200 ::testing::ElementsAre(realtime_clock::epoch() +
3201 chrono::microseconds(11000350)));
3202
3203 EXPECT_THAT(result[1].first,
3204 ::testing::ElementsAre(
3205 realtime_clock::epoch(),
3206 realtime_clock::epoch() + chrono::microseconds(107005000)));
3207 EXPECT_THAT(result[1].second,
3208 ::testing::ElementsAre(
3209 realtime_clock::epoch() + chrono::microseconds(4000150),
3210 realtime_clock::epoch() + chrono::microseconds(111000200)));
3211
3212 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3213 EXPECT_THAT(result[2].second,
3214 ::testing::ElementsAre(realtime_clock::epoch() +
3215 chrono::microseconds(11000150)));
3216
3217 auto start_stop_result = ConfirmReadable(
3218 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3219 realtime_clock::epoch() + chrono::milliseconds(3000));
3220
3221 EXPECT_THAT(
3222 start_stop_result[0].first,
3223 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3224 EXPECT_THAT(
3225 start_stop_result[0].second,
3226 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3227 EXPECT_THAT(
3228 start_stop_result[1].first,
3229 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3230 EXPECT_THAT(
3231 start_stop_result[1].second,
3232 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3233 EXPECT_THAT(
3234 start_stop_result[2].first,
3235 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3236 EXPECT_THAT(
3237 start_stop_result[2].second,
3238 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3239}
3240
3241// Tests that setting the start and stop flags across a reboot works as
3242// expected.
3243TEST(MultinodeRebootLoggerTest, RebootStartStopTimes) {
3244 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3245 aos::configuration::ReadConfig(ArtifactPath(
3246 "aos/events/logging/multinode_pingpong_split3_config.json"));
3247 message_bridge::TestingTimeConverter time_converter(
3248 configuration::NodesCount(&config.message()));
3249 SimulatedEventLoopFactory event_loop_factory(&config.message());
3250 event_loop_factory.SetTimeConverter(&time_converter);
3251 NodeEventLoopFactory *const pi1 =
3252 event_loop_factory.GetNodeEventLoopFactory("pi1");
3253 const size_t pi1_index = configuration::GetNodeIndex(
3254 event_loop_factory.configuration(), pi1->node());
3255 NodeEventLoopFactory *const pi2 =
3256 event_loop_factory.GetNodeEventLoopFactory("pi2");
3257 const size_t pi2_index = configuration::GetNodeIndex(
3258 event_loop_factory.configuration(), pi2->node());
3259 NodeEventLoopFactory *const pi3 =
3260 event_loop_factory.GetNodeEventLoopFactory("pi3");
3261 const size_t pi3_index = configuration::GetNodeIndex(
3262 event_loop_factory.configuration(), pi3->node());
3263
3264 const std::string kLogfile1_1 =
3265 aos::testing::TestTmpDir() + "/multi_logfile1/";
3266 const std::string kLogfile2_1 =
3267 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3268 const std::string kLogfile2_2 =
3269 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
3270 const std::string kLogfile3_1 =
3271 aos::testing::TestTmpDir() + "/multi_logfile3/";
3272 util::UnlinkRecursive(kLogfile1_1);
3273 util::UnlinkRecursive(kLogfile2_1);
3274 util::UnlinkRecursive(kLogfile2_2);
3275 util::UnlinkRecursive(kLogfile3_1);
3276 {
3277 CHECK_EQ(pi1_index, 0u);
3278 CHECK_EQ(pi2_index, 1u);
3279 CHECK_EQ(pi3_index, 2u);
3280
3281 time_converter.AddNextTimestamp(
3282 distributed_clock::epoch(),
3283 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3284 BootTimestamp::epoch()});
3285 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3286 time_converter.AddNextTimestamp(
3287 distributed_clock::epoch() + reboot_time,
3288 {BootTimestamp::epoch() + reboot_time,
3289 BootTimestamp{.boot = 1,
3290 .time = monotonic_clock::epoch() + reboot_time},
3291 BootTimestamp::epoch() + reboot_time});
3292 }
3293
3294 std::vector<std::string> filenames;
3295 {
3296 LoggerState pi1_logger = MakeLoggerState(
3297 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3298 LoggerState pi3_logger = MakeLoggerState(
3299 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3300 {
3301 // And now start the logger.
3302 LoggerState pi2_logger = MakeLoggerState(
3303 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3304
3305 pi1_logger.StartLogger(kLogfile1_1);
3306 pi3_logger.StartLogger(kLogfile3_1);
3307 pi2_logger.StartLogger(kLogfile2_1);
3308
3309 event_loop_factory.RunFor(chrono::milliseconds(1005));
3310
3311 // Now that we've got a start time in the past, turn on data.
3312 std::unique_ptr<aos::EventLoop> ping_event_loop =
3313 pi1->MakeEventLoop("ping");
3314 Ping ping(ping_event_loop.get());
3315
3316 pi2->AlwaysStart<Pong>("pong");
3317
3318 event_loop_factory.RunFor(chrono::milliseconds(3000));
3319
3320 pi2_logger.AppendAllFilenames(&filenames);
3321 }
3322 event_loop_factory.RunFor(chrono::milliseconds(995));
3323 // pi2 now reboots at 5 seconds.
3324 {
3325 event_loop_factory.RunFor(chrono::milliseconds(1000));
3326
3327 // Make local stuff happen before we start logging and connect the remote.
3328 pi2->AlwaysStart<Pong>("pong");
3329 std::unique_ptr<aos::EventLoop> ping_event_loop =
3330 pi1->MakeEventLoop("ping");
3331 Ping ping(ping_event_loop.get());
3332 event_loop_factory.RunFor(chrono::milliseconds(5));
3333
3334 // Start logging again on pi2 after it is up.
3335 LoggerState pi2_logger = MakeLoggerState(
3336 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3337 pi2_logger.StartLogger(kLogfile2_2);
3338
3339 event_loop_factory.RunFor(chrono::milliseconds(5000));
3340
3341 pi2_logger.AppendAllFilenames(&filenames);
3342 }
3343
3344 pi1_logger.AppendAllFilenames(&filenames);
3345 pi3_logger.AppendAllFilenames(&filenames);
3346 }
3347
3348 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3349 auto result = ConfirmReadable(filenames);
3350
3351 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3352 EXPECT_THAT(result[0].second,
3353 ::testing::ElementsAre(realtime_clock::epoch() +
3354 chrono::microseconds(11000350)));
3355
3356 EXPECT_THAT(result[1].first,
3357 ::testing::ElementsAre(
3358 realtime_clock::epoch(),
3359 realtime_clock::epoch() + chrono::microseconds(6005000)));
3360 EXPECT_THAT(result[1].second,
3361 ::testing::ElementsAre(
3362 realtime_clock::epoch() + chrono::microseconds(4900150),
3363 realtime_clock::epoch() + chrono::microseconds(11000200)));
3364
3365 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3366 EXPECT_THAT(result[2].second,
3367 ::testing::ElementsAre(realtime_clock::epoch() +
3368 chrono::microseconds(11000150)));
3369
3370 // Confirm we observed the correct start and stop times. We should see the
3371 // reboot here.
3372 auto start_stop_result = ConfirmReadable(
3373 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3374 realtime_clock::epoch() + chrono::milliseconds(8000));
3375
3376 EXPECT_THAT(
3377 start_stop_result[0].first,
3378 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3379 EXPECT_THAT(
3380 start_stop_result[0].second,
3381 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3382 EXPECT_THAT(start_stop_result[1].first,
3383 ::testing::ElementsAre(
3384 realtime_clock::epoch() + chrono::seconds(2),
3385 realtime_clock::epoch() + chrono::microseconds(6005000)));
3386 EXPECT_THAT(start_stop_result[1].second,
3387 ::testing::ElementsAre(
3388 realtime_clock::epoch() + chrono::microseconds(4900150),
3389 realtime_clock::epoch() + chrono::seconds(8)));
3390 EXPECT_THAT(
3391 start_stop_result[2].first,
3392 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3393 EXPECT_THAT(
3394 start_stop_result[2].second,
3395 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3396}
3397
3398// Tests that we properly handle one direction being down.
3399TEST(MissingDirectionTest, OneDirection) {
3400 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3401 aos::configuration::ReadConfig(ArtifactPath(
3402 "aos/events/logging/multinode_pingpong_split4_config.json"));
3403 message_bridge::TestingTimeConverter time_converter(
3404 configuration::NodesCount(&config.message()));
3405 SimulatedEventLoopFactory event_loop_factory(&config.message());
3406 event_loop_factory.SetTimeConverter(&time_converter);
3407
3408 NodeEventLoopFactory *const pi1 =
3409 event_loop_factory.GetNodeEventLoopFactory("pi1");
3410 const size_t pi1_index = configuration::GetNodeIndex(
3411 event_loop_factory.configuration(), pi1->node());
3412 NodeEventLoopFactory *const pi2 =
3413 event_loop_factory.GetNodeEventLoopFactory("pi2");
3414 const size_t pi2_index = configuration::GetNodeIndex(
3415 event_loop_factory.configuration(), pi2->node());
3416 std::vector<std::string> filenames;
3417
3418 {
3419 CHECK_EQ(pi1_index, 0u);
3420 CHECK_EQ(pi2_index, 1u);
3421
3422 time_converter.AddNextTimestamp(
3423 distributed_clock::epoch(),
3424 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3425
3426 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3427 time_converter.AddNextTimestamp(
3428 distributed_clock::epoch() + reboot_time,
3429 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3430 BootTimestamp::epoch() + reboot_time});
3431 }
3432
3433 const std::string kLogfile2_1 =
3434 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3435 const std::string kLogfile1_1 =
3436 aos::testing::TestTmpDir() + "/multi_logfile1.1/";
3437 util::UnlinkRecursive(kLogfile2_1);
3438 util::UnlinkRecursive(kLogfile1_1);
3439
3440 pi2->Disconnect(pi1->node());
3441
3442 pi1->AlwaysStart<Ping>("ping");
3443 pi2->AlwaysStart<Pong>("pong");
3444
3445 {
3446 LoggerState pi2_logger = MakeLoggerState(
3447 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3448
3449 event_loop_factory.RunFor(chrono::milliseconds(95));
3450
3451 pi2_logger.StartLogger(kLogfile2_1);
3452
3453 event_loop_factory.RunFor(chrono::milliseconds(6000));
3454
3455 pi2->Connect(pi1->node());
3456
3457 LoggerState pi1_logger = MakeLoggerState(
3458 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3459 pi1_logger.StartLogger(kLogfile1_1);
3460
3461 event_loop_factory.RunFor(chrono::milliseconds(5000));
3462 pi1_logger.AppendAllFilenames(&filenames);
3463 pi2_logger.AppendAllFilenames(&filenames);
3464 }
3465
3466 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3467 ConfirmReadable(filenames);
3468}
3469
3470// Tests that we properly handle only one direction ever existing after a
3471// reboot.
3472TEST(MissingDirectionTest, OneDirectionAfterReboot) {
3473 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3474 aos::configuration::ReadConfig(ArtifactPath(
3475 "aos/events/logging/multinode_pingpong_split4_config.json"));
3476 message_bridge::TestingTimeConverter time_converter(
3477 configuration::NodesCount(&config.message()));
3478 SimulatedEventLoopFactory event_loop_factory(&config.message());
3479 event_loop_factory.SetTimeConverter(&time_converter);
3480
3481 NodeEventLoopFactory *const pi1 =
3482 event_loop_factory.GetNodeEventLoopFactory("pi1");
3483 const size_t pi1_index = configuration::GetNodeIndex(
3484 event_loop_factory.configuration(), pi1->node());
3485 NodeEventLoopFactory *const pi2 =
3486 event_loop_factory.GetNodeEventLoopFactory("pi2");
3487 const size_t pi2_index = configuration::GetNodeIndex(
3488 event_loop_factory.configuration(), pi2->node());
3489 std::vector<std::string> filenames;
3490
3491 {
3492 CHECK_EQ(pi1_index, 0u);
3493 CHECK_EQ(pi2_index, 1u);
3494
3495 time_converter.AddNextTimestamp(
3496 distributed_clock::epoch(),
3497 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3498
3499 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3500 time_converter.AddNextTimestamp(
3501 distributed_clock::epoch() + reboot_time,
3502 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3503 BootTimestamp::epoch() + reboot_time});
3504 }
3505
3506 const std::string kLogfile2_1 =
3507 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3508 util::UnlinkRecursive(kLogfile2_1);
3509
3510 pi1->AlwaysStart<Ping>("ping");
3511
3512 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3513 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3514 // second boot.
3515 {
3516 LoggerState pi2_logger = MakeLoggerState(
3517 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3518
3519 event_loop_factory.RunFor(chrono::milliseconds(95));
3520
3521 pi2_logger.StartLogger(kLogfile2_1);
3522
3523 event_loop_factory.RunFor(chrono::milliseconds(4000));
3524
3525 pi2->Disconnect(pi1->node());
3526
3527 event_loop_factory.RunFor(chrono::milliseconds(1000));
3528 pi1->AlwaysStart<Ping>("ping");
3529
3530 event_loop_factory.RunFor(chrono::milliseconds(5000));
3531 pi2_logger.AppendAllFilenames(&filenames);
3532 }
3533
3534 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3535 ConfirmReadable(filenames);
3536}
3537
3538// Tests that we properly handle only one direction ever existing after a reboot
3539// with only reliable data.
3540TEST(MissingDirectionTest, OneDirectionAfterRebootReliable) {
3541 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3542 aos::configuration::ReadConfig(ArtifactPath(
3543 "aos/events/logging/multinode_pingpong_split4_reliable_config.json"));
3544 message_bridge::TestingTimeConverter time_converter(
3545 configuration::NodesCount(&config.message()));
3546 SimulatedEventLoopFactory event_loop_factory(&config.message());
3547 event_loop_factory.SetTimeConverter(&time_converter);
3548
3549 NodeEventLoopFactory *const pi1 =
3550 event_loop_factory.GetNodeEventLoopFactory("pi1");
3551 const size_t pi1_index = configuration::GetNodeIndex(
3552 event_loop_factory.configuration(), pi1->node());
3553 NodeEventLoopFactory *const pi2 =
3554 event_loop_factory.GetNodeEventLoopFactory("pi2");
3555 const size_t pi2_index = configuration::GetNodeIndex(
3556 event_loop_factory.configuration(), pi2->node());
3557 std::vector<std::string> filenames;
3558
3559 {
3560 CHECK_EQ(pi1_index, 0u);
3561 CHECK_EQ(pi2_index, 1u);
3562
3563 time_converter.AddNextTimestamp(
3564 distributed_clock::epoch(),
3565 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3566
3567 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3568 time_converter.AddNextTimestamp(
3569 distributed_clock::epoch() + reboot_time,
3570 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3571 BootTimestamp::epoch() + reboot_time});
3572 }
3573
3574 const std::string kLogfile2_1 =
3575 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3576 util::UnlinkRecursive(kLogfile2_1);
3577
3578 pi1->AlwaysStart<Ping>("ping");
3579
3580 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3581 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3582 // second boot.
3583 {
3584 LoggerState pi2_logger = MakeLoggerState(
3585 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3586
3587 event_loop_factory.RunFor(chrono::milliseconds(95));
3588
3589 pi2_logger.StartLogger(kLogfile2_1);
3590
3591 event_loop_factory.RunFor(chrono::milliseconds(4000));
3592
3593 pi2->Disconnect(pi1->node());
3594
3595 event_loop_factory.RunFor(chrono::milliseconds(1000));
3596 pi1->AlwaysStart<Ping>("ping");
3597
3598 event_loop_factory.RunFor(chrono::milliseconds(5000));
3599 pi2_logger.AppendAllFilenames(&filenames);
3600 }
3601
3602 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3603 ConfirmReadable(filenames);
3604}
3605
Brian Smartte67d7112023-03-20 12:06:30 -07003606// Tests that we properly handle only one direction ever existing after a reboot
3607// with mixed unreliable vs reliable, where reliable has an earlier timestamp
3608// than unreliable.
3609TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase1) {
3610 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3611 aos::configuration::ReadConfig(ArtifactPath(
3612 "aos/events/logging/multinode_pingpong_split4_mixed1_config.json"));
3613 message_bridge::TestingTimeConverter time_converter(
3614 configuration::NodesCount(&config.message()));
3615 SimulatedEventLoopFactory event_loop_factory(&config.message());
3616 event_loop_factory.SetTimeConverter(&time_converter);
3617
3618 NodeEventLoopFactory *const pi1 =
3619 event_loop_factory.GetNodeEventLoopFactory("pi1");
3620 const size_t pi1_index = configuration::GetNodeIndex(
3621 event_loop_factory.configuration(), pi1->node());
3622 NodeEventLoopFactory *const pi2 =
3623 event_loop_factory.GetNodeEventLoopFactory("pi2");
3624 const size_t pi2_index = configuration::GetNodeIndex(
3625 event_loop_factory.configuration(), pi2->node());
3626 std::vector<std::string> filenames;
3627
3628 {
3629 CHECK_EQ(pi1_index, 0u);
3630 CHECK_EQ(pi2_index, 1u);
3631
3632 time_converter.AddNextTimestamp(
3633 distributed_clock::epoch(),
3634 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3635
3636 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3637 time_converter.AddNextTimestamp(
3638 distributed_clock::epoch() + reboot_time,
3639 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3640 BootTimestamp::epoch() + reboot_time});
3641 }
3642
3643 const std::string kLogfile2_1 =
3644 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3645 util::UnlinkRecursive(kLogfile2_1);
3646
3647 // The following sequence using the above reference config creates
3648 // a reliable message timestamp < unreliable message timestamp.
3649 {
3650 pi1->DisableStatistics();
3651 pi2->DisableStatistics();
3652
3653 event_loop_factory.RunFor(chrono::milliseconds(95));
3654
3655 pi1->AlwaysStart<Ping>("ping");
3656
3657 event_loop_factory.RunFor(chrono::milliseconds(5250));
3658
3659 pi1->EnableStatistics();
3660
3661 event_loop_factory.RunFor(chrono::milliseconds(1000));
3662
3663 LoggerState pi2_logger = MakeLoggerState(
3664 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3665
3666 pi2_logger.StartLogger(kLogfile2_1);
3667
3668 event_loop_factory.RunFor(chrono::milliseconds(5000));
3669 pi2_logger.AppendAllFilenames(&filenames);
3670 }
3671
3672 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3673 ConfirmReadable(filenames);
3674}
3675
3676// Tests that we properly handle only one direction ever existing after a reboot
3677// with mixed unreliable vs reliable, where unreliable has an earlier timestamp
3678// than reliable.
3679TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase2) {
3680 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3681 aos::configuration::ReadConfig(ArtifactPath(
3682 "aos/events/logging/multinode_pingpong_split4_mixed2_config.json"));
3683 message_bridge::TestingTimeConverter time_converter(
3684 configuration::NodesCount(&config.message()));
3685 SimulatedEventLoopFactory event_loop_factory(&config.message());
3686 event_loop_factory.SetTimeConverter(&time_converter);
3687
3688 NodeEventLoopFactory *const pi1 =
3689 event_loop_factory.GetNodeEventLoopFactory("pi1");
3690 const size_t pi1_index = configuration::GetNodeIndex(
3691 event_loop_factory.configuration(), pi1->node());
3692 NodeEventLoopFactory *const pi2 =
3693 event_loop_factory.GetNodeEventLoopFactory("pi2");
3694 const size_t pi2_index = configuration::GetNodeIndex(
3695 event_loop_factory.configuration(), pi2->node());
3696 std::vector<std::string> filenames;
3697
3698 {
3699 CHECK_EQ(pi1_index, 0u);
3700 CHECK_EQ(pi2_index, 1u);
3701
3702 time_converter.AddNextTimestamp(
3703 distributed_clock::epoch(),
3704 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3705
3706 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3707 time_converter.AddNextTimestamp(
3708 distributed_clock::epoch() + reboot_time,
3709 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3710 BootTimestamp::epoch() + reboot_time});
3711 }
3712
3713 const std::string kLogfile2_1 =
3714 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3715 util::UnlinkRecursive(kLogfile2_1);
3716
3717 // The following sequence using the above reference config creates
3718 // an unreliable message timestamp < reliable message timestamp.
3719 {
3720 pi1->DisableStatistics();
3721 pi2->DisableStatistics();
3722
3723 event_loop_factory.RunFor(chrono::milliseconds(95));
3724
3725 pi1->AlwaysStart<Ping>("ping");
3726
3727 event_loop_factory.RunFor(chrono::milliseconds(5250));
3728
3729 pi1->EnableStatistics();
3730
3731 event_loop_factory.RunFor(chrono::milliseconds(1000));
3732
3733 LoggerState pi2_logger = MakeLoggerState(
3734 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3735
3736 pi2_logger.StartLogger(kLogfile2_1);
3737
3738 event_loop_factory.RunFor(chrono::milliseconds(5000));
3739 pi2_logger.AppendAllFilenames(&filenames);
3740 }
3741
3742 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3743 ConfirmReadable(filenames);
3744}
3745
Naman Guptaa63aa132023-03-22 20:06:34 -07003746// Tests that we properly handle what used to be a time violation in one
3747// direction. This can occur when one direction goes down after sending some
3748// data, but the other keeps working. The down direction ends up resolving to a
3749// straight line in the noncausal filter, where the direction which is still up
3750// can cross that line. Really, time progressed along just fine but we assumed
3751// that the offset was a line when it could have deviated by up to 1ms/second.
3752TEST_P(MultinodeLoggerTest, OneDirectionTimeDrift) {
3753 std::vector<std::string> filenames;
3754
3755 CHECK_EQ(pi1_index_, 0u);
3756 CHECK_EQ(pi2_index_, 1u);
3757
3758 time_converter_.AddNextTimestamp(
3759 distributed_clock::epoch(),
3760 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3761
3762 const chrono::nanoseconds before_disconnect_duration =
3763 time_converter_.AddMonotonic(
3764 {chrono::milliseconds(1000), chrono::milliseconds(1000)});
3765
3766 const chrono::nanoseconds test_duration =
3767 time_converter_.AddMonotonic(
3768 {chrono::milliseconds(1000), chrono::milliseconds(1000)}) +
3769 time_converter_.AddMonotonic(
3770 {chrono::milliseconds(10000),
3771 chrono::milliseconds(10000) - chrono::milliseconds(5)}) +
3772 time_converter_.AddMonotonic(
3773 {chrono::milliseconds(10000),
3774 chrono::milliseconds(10000) + chrono::milliseconds(5)});
3775
3776 const std::string kLogfile =
3777 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3778 util::UnlinkRecursive(kLogfile);
3779
3780 {
3781 LoggerState pi2_logger = MakeLogger(pi2_);
3782 pi2_logger.StartLogger(kLogfile);
3783 event_loop_factory_.RunFor(before_disconnect_duration);
3784
3785 pi2_->Disconnect(pi1_->node());
3786
3787 event_loop_factory_.RunFor(test_duration);
3788 pi2_->Connect(pi1_->node());
3789
3790 event_loop_factory_.RunFor(chrono::milliseconds(5000));
3791 pi2_logger.AppendAllFilenames(&filenames);
3792 }
3793
3794 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3795 ConfirmReadable(filenames);
3796}
3797
3798// Tests that we can replay a logfile that has timestamps such that at least one
3799// node's epoch is at a positive distributed_clock (and thus will have to be
3800// booted after the other node(s)).
3801TEST_P(MultinodeLoggerTest, StartOneNodeBeforeOther) {
3802 std::vector<std::string> filenames;
3803
3804 CHECK_EQ(pi1_index_, 0u);
3805 CHECK_EQ(pi2_index_, 1u);
3806
3807 time_converter_.AddNextTimestamp(
3808 distributed_clock::epoch(),
3809 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3810
3811 const chrono::nanoseconds before_reboot_duration = chrono::milliseconds(1000);
3812 time_converter_.RebootAt(
3813 0, distributed_clock::time_point(before_reboot_duration));
3814
3815 const chrono::nanoseconds test_duration = time_converter_.AddMonotonic(
3816 {chrono::milliseconds(10000), chrono::milliseconds(10000)});
3817
3818 const std::string kLogfile =
3819 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3820 util::UnlinkRecursive(kLogfile);
3821
3822 pi2_->Disconnect(pi1_->node());
3823 pi1_->Disconnect(pi2_->node());
3824
3825 {
3826 LoggerState pi2_logger = MakeLogger(pi2_);
3827
3828 pi2_logger.StartLogger(kLogfile);
3829 event_loop_factory_.RunFor(before_reboot_duration);
3830
3831 pi2_->Connect(pi1_->node());
3832 pi1_->Connect(pi2_->node());
3833
3834 event_loop_factory_.RunFor(test_duration);
3835
3836 pi2_logger.AppendAllFilenames(&filenames);
3837 }
3838
3839 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3840 ConfirmReadable(filenames);
3841
3842 {
3843 LogReader reader(sorted_parts);
3844 SimulatedEventLoopFactory replay_factory(reader.configuration());
3845 reader.RegisterWithoutStarting(&replay_factory);
3846
3847 NodeEventLoopFactory *const replay_node =
3848 reader.event_loop_factory()->GetNodeEventLoopFactory("pi1");
3849
3850 std::unique_ptr<EventLoop> test_event_loop =
3851 replay_node->MakeEventLoop("test_reader");
3852 replay_node->OnStartup([replay_node]() {
3853 // Check that we didn't boot until at least t=0.
3854 CHECK_LE(monotonic_clock::epoch(), replay_node->monotonic_now());
3855 });
3856 test_event_loop->OnRun([&test_event_loop]() {
3857 // Check that we didn't boot until at least t=0.
3858 EXPECT_LE(monotonic_clock::epoch(), test_event_loop->monotonic_now());
3859 });
3860 reader.event_loop_factory()->Run();
3861 reader.Deregister();
3862 }
3863}
3864
3865// Tests that when we have a loop without all the logs at all points in time, we
3866// can sort it properly.
3867TEST(MultinodeLoggerLoopTest, Loop) {
3868 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3869 aos::configuration::ReadConfig(ArtifactPath(
3870 "aos/events/logging/multinode_pingpong_triangle_split_config.json"));
3871 message_bridge::TestingTimeConverter time_converter(
3872 configuration::NodesCount(&config.message()));
3873 SimulatedEventLoopFactory event_loop_factory(&config.message());
3874 event_loop_factory.SetTimeConverter(&time_converter);
3875
3876 NodeEventLoopFactory *const pi1 =
3877 event_loop_factory.GetNodeEventLoopFactory("pi1");
3878 NodeEventLoopFactory *const pi2 =
3879 event_loop_factory.GetNodeEventLoopFactory("pi2");
3880 NodeEventLoopFactory *const pi3 =
3881 event_loop_factory.GetNodeEventLoopFactory("pi3");
3882
3883 const std::string kLogfile1_1 =
3884 aos::testing::TestTmpDir() + "/multi_logfile1/";
3885 const std::string kLogfile2_1 =
3886 aos::testing::TestTmpDir() + "/multi_logfile2/";
3887 const std::string kLogfile3_1 =
3888 aos::testing::TestTmpDir() + "/multi_logfile3/";
3889 util::UnlinkRecursive(kLogfile1_1);
3890 util::UnlinkRecursive(kLogfile2_1);
3891 util::UnlinkRecursive(kLogfile3_1);
3892
3893 {
3894 // Make pi1 boot before everything else.
3895 time_converter.AddNextTimestamp(
3896 distributed_clock::epoch(),
3897 {BootTimestamp::epoch(),
3898 BootTimestamp::epoch() - chrono::milliseconds(100),
3899 BootTimestamp::epoch() - chrono::milliseconds(300)});
3900 }
3901
3902 // We want to setup a situation such that 2 of the 3 legs of the loop are very
3903 // confident about time being X, and the third leg is pulling the average off
3904 // to one side.
3905 //
3906 // It's easiest to visualize this in timestamp_plotter.
3907
3908 std::vector<std::string> filenames;
3909 {
3910 // Have pi1 send out a reliable message at startup. This sets up a long
3911 // forwarding time message at the start to bias time.
3912 std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
3913 {
3914 aos::Sender<examples::Ping> ping_sender =
3915 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
3916
3917 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
3918 examples::Ping::Builder ping_builder =
3919 builder.MakeBuilder<examples::Ping>();
3920 CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
3921 }
3922
3923 // Wait a while so there's enough data to let the worst case be rather off.
3924 event_loop_factory.RunFor(chrono::seconds(1000));
3925
3926 // Now start a receiving node first. This sets up 2 tight bounds between 2
3927 // of the nodes.
3928 LoggerState pi2_logger = MakeLoggerState(
3929 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3930 pi2_logger.StartLogger(kLogfile2_1);
3931
3932 event_loop_factory.RunFor(chrono::seconds(100));
3933
3934 // And now start the third leg.
3935 LoggerState pi3_logger = MakeLoggerState(
3936 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3937 pi3_logger.StartLogger(kLogfile3_1);
3938
3939 LoggerState pi1_logger = MakeLoggerState(
3940 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3941 pi1_logger.StartLogger(kLogfile1_1);
3942
3943 event_loop_factory.RunFor(chrono::seconds(100));
3944
3945 pi1_logger.AppendAllFilenames(&filenames);
3946 pi2_logger.AppendAllFilenames(&filenames);
3947 pi3_logger.AppendAllFilenames(&filenames);
3948 }
3949
3950 // Make sure we can read this.
3951 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3952 auto result = ConfirmReadable(filenames);
3953}
3954
Austin Schuh08dba8f2023-05-01 08:29:30 -07003955// Tests that RestartLogging works in the simple case. Unfortunately, the
3956// failure cases involve simulating time elapsing in callbacks, which is really
3957// hard. The best we can reasonably do is make sure 2 back to back logs are
3958// parseable together.
3959TEST_P(MultinodeLoggerTest, RestartLogging) {
3960 time_converter_.AddMonotonic(
3961 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
3962 std::vector<std::string> filenames;
3963 {
3964 LoggerState pi1_logger = MakeLogger(pi1_);
3965
3966 event_loop_factory_.RunFor(chrono::milliseconds(95));
3967
3968 StartLogger(&pi1_logger, logfile_base1_);
3969 aos::monotonic_clock::time_point last_rotation_time =
3970 pi1_logger.event_loop->monotonic_now();
3971 pi1_logger.logger->set_on_logged_period([&] {
3972 const auto now = pi1_logger.event_loop->monotonic_now();
3973 if (now > last_rotation_time + std::chrono::seconds(5)) {
3974 pi1_logger.AppendAllFilenames(&filenames);
3975 std::unique_ptr<MultiNodeFilesLogNamer> namer =
3976 pi1_logger.MakeLogNamer(logfile_base2_);
3977 pi1_logger.log_namer = namer.get();
3978
3979 pi1_logger.logger->RestartLogging(std::move(namer));
3980 last_rotation_time = now;
3981 }
3982 });
3983
3984 event_loop_factory_.RunFor(chrono::milliseconds(7000));
3985
3986 pi1_logger.AppendAllFilenames(&filenames);
3987 }
3988
3989 for (const auto &x : filenames) {
3990 LOG(INFO) << x;
3991 }
3992
3993 EXPECT_GE(filenames.size(), 2u);
3994
3995 ConfirmReadable(filenames);
3996
3997 // TODO(austin): It would be good to confirm that any one time messages end up
3998 // in both logs correctly.
3999}
4000
Naman Guptaa63aa132023-03-22 20:06:34 -07004001} // namespace testing
4002} // namespace logger
4003} // namespace aos