blob: 44736a59cb20bdf5bf2327291e9d487f7be7eace [file] [log] [blame]
Brian Smartte67d7112023-03-20 12:06:30 -07001#include <algorithm>
2
Philipp Schrader790cb542023-07-05 21:06:52 -07003#include "gmock/gmock.h"
4#include "gtest/gtest.h"
5
Naman Guptaa63aa132023-03-22 20:06:34 -07006#include "aos/events/logging/log_reader.h"
7#include "aos/events/logging/multinode_logger_test_lib.h"
8#include "aos/events/message_counter.h"
9#include "aos/events/ping_lib.h"
10#include "aos/events/pong_lib.h"
11#include "aos/network/remote_message_generated.h"
12#include "aos/network/timestamp_generated.h"
13#include "aos/testing/tmpdir.h"
Naman Guptaa63aa132023-03-22 20:06:34 -070014
15namespace aos {
16namespace logger {
17namespace testing {
18
19namespace chrono = std::chrono;
20using aos::message_bridge::RemoteMessage;
21using aos::testing::ArtifactPath;
22using aos::testing::MessageCounter;
23
Naman Guptaa63aa132023-03-22 20:06:34 -070024INSTANTIATE_TEST_SUITE_P(
25 All, MultinodeLoggerTest,
26 ::testing::Combine(
27 ::testing::Values(
28 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080029 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070030 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080031 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070032 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
33
34INSTANTIATE_TEST_SUITE_P(
35 All, MultinodeLoggerDeathTest,
36 ::testing::Combine(
37 ::testing::Values(
38 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080039 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070040 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080041 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070042 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
43
44// Tests that we can write and read simple multi-node log files.
45TEST_P(MultinodeLoggerTest, SimpleMultiNode) {
46 std::vector<std::string> actual_filenames;
47 time_converter_.StartEqual();
48
49 {
50 LoggerState pi1_logger = MakeLogger(pi1_);
51 LoggerState pi2_logger = MakeLogger(pi2_);
52
53 event_loop_factory_.RunFor(chrono::milliseconds(95));
54
55 StartLogger(&pi1_logger);
56 StartLogger(&pi2_logger);
57
58 event_loop_factory_.RunFor(chrono::milliseconds(20000));
59 pi1_logger.AppendAllFilenames(&actual_filenames);
60 pi2_logger.AppendAllFilenames(&actual_filenames);
61 }
62
63 ASSERT_THAT(actual_filenames,
64 ::testing::UnorderedElementsAreArray(logfiles_));
65
66 {
67 std::set<std::string> logfile_uuids;
68 std::set<std::string> parts_uuids;
69 // Confirm that we have the expected number of UUIDs for both the logfile
70 // UUIDs and parts UUIDs.
71 std::vector<SizePrefixedFlatbufferVector<LogFileHeader>> log_header;
72 for (std::string_view f : logfiles_) {
73 log_header.emplace_back(ReadHeader(f).value());
74 if (!log_header.back().message().has_configuration()) {
75 logfile_uuids.insert(
76 log_header.back().message().log_event_uuid()->str());
77 parts_uuids.insert(log_header.back().message().parts_uuid()->str());
78 }
79 }
80
81 EXPECT_EQ(logfile_uuids.size(), 2u);
82 if (shared()) {
83 EXPECT_EQ(parts_uuids.size(), 7u);
84 } else {
85 EXPECT_EQ(parts_uuids.size(), 8u);
86 }
87
88 // And confirm everything is on the correct node.
89 EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi1");
90 EXPECT_EQ(log_header[3].message().node()->name()->string_view(), "pi1");
91 EXPECT_EQ(log_header[4].message().node()->name()->string_view(), "pi1");
92
93 EXPECT_EQ(log_header[5].message().node()->name()->string_view(), "pi2");
94 EXPECT_EQ(log_header[6].message().node()->name()->string_view(), "pi2");
95
96 EXPECT_EQ(log_header[7].message().node()->name()->string_view(), "pi2");
97 EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi2");
98 EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi2");
99
100 EXPECT_EQ(log_header[10].message().node()->name()->string_view(), "pi1");
101 EXPECT_EQ(log_header[11].message().node()->name()->string_view(), "pi1");
102
103 EXPECT_EQ(log_header[12].message().node()->name()->string_view(), "pi2");
104 EXPECT_EQ(log_header[13].message().node()->name()->string_view(), "pi2");
105
106 if (shared()) {
107 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
108 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi2");
109 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi2");
110
111 EXPECT_EQ(log_header[17].message().node()->name()->string_view(), "pi1");
112 EXPECT_EQ(log_header[18].message().node()->name()->string_view(), "pi1");
113 } else {
114 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
115 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi2");
116
117 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi1");
118 EXPECT_EQ(log_header[17].message().node()->name()->string_view(), "pi1");
119
120 EXPECT_EQ(log_header[18].message().node()->name()->string_view(), "pi2");
121 EXPECT_EQ(log_header[19].message().node()->name()->string_view(), "pi2");
122 }
123
124 // And the parts index matches.
125 EXPECT_EQ(log_header[2].message().parts_index(), 0);
126 EXPECT_EQ(log_header[3].message().parts_index(), 1);
127 EXPECT_EQ(log_header[4].message().parts_index(), 2);
128
129 EXPECT_EQ(log_header[5].message().parts_index(), 0);
130 EXPECT_EQ(log_header[6].message().parts_index(), 1);
131
132 EXPECT_EQ(log_header[7].message().parts_index(), 0);
133 EXPECT_EQ(log_header[8].message().parts_index(), 1);
134 EXPECT_EQ(log_header[9].message().parts_index(), 2);
135
136 EXPECT_EQ(log_header[10].message().parts_index(), 0);
137 EXPECT_EQ(log_header[11].message().parts_index(), 1);
138
139 EXPECT_EQ(log_header[12].message().parts_index(), 0);
140 EXPECT_EQ(log_header[13].message().parts_index(), 1);
141
142 if (shared()) {
143 EXPECT_EQ(log_header[14].message().parts_index(), 0);
144 EXPECT_EQ(log_header[15].message().parts_index(), 1);
145 EXPECT_EQ(log_header[16].message().parts_index(), 2);
146
147 EXPECT_EQ(log_header[17].message().parts_index(), 0);
148 EXPECT_EQ(log_header[18].message().parts_index(), 1);
149 } else {
150 EXPECT_EQ(log_header[14].message().parts_index(), 0);
151 EXPECT_EQ(log_header[15].message().parts_index(), 1);
152
153 EXPECT_EQ(log_header[16].message().parts_index(), 0);
154 EXPECT_EQ(log_header[17].message().parts_index(), 1);
155
156 EXPECT_EQ(log_header[18].message().parts_index(), 0);
157 EXPECT_EQ(log_header[19].message().parts_index(), 1);
158 }
159 }
160
161 const std::vector<LogFile> sorted_log_files = SortParts(logfiles_);
162 {
163 using ::testing::UnorderedElementsAre;
164 std::shared_ptr<const aos::Configuration> config =
165 sorted_log_files[0].config;
166
167 // Timing reports, pings
168 EXPECT_THAT(CountChannelsData(config, logfiles_[2]),
169 UnorderedElementsAre(
170 std::make_tuple("/pi1/aos",
171 "aos.message_bridge.ServerStatistics", 1),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700172 std::make_tuple("/test", "aos.examples.Ping", 1),
173 std::make_tuple("/pi1/aos", "aos.examples.Ping", 1)))
Naman Guptaa63aa132023-03-22 20:06:34 -0700174 << " : " << logfiles_[2];
175 {
176 std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700177 std::make_tuple("/pi1/aos", "aos.examples.Ping", 10),
Naman Guptaa63aa132023-03-22 20:06:34 -0700178 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 1),
179 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
180 1)};
181 if (!std::get<0>(GetParam()).shared) {
182 channel_counts.push_back(
183 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
184 "aos-message_bridge-Timestamp",
185 "aos.message_bridge.RemoteMessage", 1));
186 }
187 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
188 ::testing::UnorderedElementsAreArray(channel_counts))
189 << " : " << logfiles_[3];
190 }
191 {
192 std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700193 std::make_tuple("/pi1/aos", "aos.examples.Ping", 1990),
Naman Guptaa63aa132023-03-22 20:06:34 -0700194 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 199),
195 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
196 20),
197 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
198 199),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700199 std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
Naman Guptaa63aa132023-03-22 20:06:34 -0700200 std::make_tuple("/test", "aos.examples.Ping", 2000)};
201 if (!std::get<0>(GetParam()).shared) {
202 channel_counts.push_back(
203 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
204 "aos-message_bridge-Timestamp",
205 "aos.message_bridge.RemoteMessage", 199));
206 }
207 EXPECT_THAT(CountChannelsData(config, logfiles_[4]),
208 ::testing::UnorderedElementsAreArray(channel_counts))
209 << " : " << logfiles_[4];
210 }
211 // Timestamps for pong
212 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
213 UnorderedElementsAre())
214 << " : " << logfiles_[2];
215 EXPECT_THAT(
216 CountChannelsTimestamp(config, logfiles_[3]),
217 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
218 << " : " << logfiles_[3];
219 EXPECT_THAT(
220 CountChannelsTimestamp(config, logfiles_[4]),
221 UnorderedElementsAre(
222 std::make_tuple("/test", "aos.examples.Pong", 2000),
223 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)))
224 << " : " << logfiles_[4];
225
226 // Pong data.
227 EXPECT_THAT(
228 CountChannelsData(config, logfiles_[5]),
229 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 91)))
230 << " : " << logfiles_[5];
231 EXPECT_THAT(CountChannelsData(config, logfiles_[6]),
232 UnorderedElementsAre(
233 std::make_tuple("/test", "aos.examples.Pong", 1910)))
234 << " : " << logfiles_[6];
235
236 // No timestamps
237 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
238 UnorderedElementsAre())
239 << " : " << logfiles_[5];
240 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[6]),
241 UnorderedElementsAre())
242 << " : " << logfiles_[6];
243
244 // Timing reports and pongs.
245 EXPECT_THAT(CountChannelsData(config, logfiles_[7]),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700246 UnorderedElementsAre(
247 std::make_tuple("/pi2/aos", "aos.examples.Ping", 1),
248 std::make_tuple("/pi2/aos",
249 "aos.message_bridge.ServerStatistics", 1)))
Naman Guptaa63aa132023-03-22 20:06:34 -0700250 << " : " << logfiles_[7];
251 EXPECT_THAT(
252 CountChannelsData(config, logfiles_[8]),
253 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
254 << " : " << logfiles_[8];
255 EXPECT_THAT(
256 CountChannelsData(config, logfiles_[9]),
257 UnorderedElementsAre(
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700258 std::make_tuple("/pi2/aos", "aos.examples.Ping", 2000),
Naman Guptaa63aa132023-03-22 20:06:34 -0700259 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
260 std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
261 20),
262 std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
263 200),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700264 std::make_tuple("/pi2/aos", "aos.timing.Report", 60),
Naman Guptaa63aa132023-03-22 20:06:34 -0700265 std::make_tuple("/test", "aos.examples.Pong", 2000)))
266 << " : " << logfiles_[9];
267 // And ping timestamps.
268 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[7]),
269 UnorderedElementsAre())
270 << " : " << logfiles_[7];
271 EXPECT_THAT(
272 CountChannelsTimestamp(config, logfiles_[8]),
273 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
274 << " : " << logfiles_[8];
275 EXPECT_THAT(
276 CountChannelsTimestamp(config, logfiles_[9]),
277 UnorderedElementsAre(
278 std::make_tuple("/test", "aos.examples.Ping", 2000),
279 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)))
280 << " : " << logfiles_[9];
281
282 // And then test that the remotely logged timestamp data files only have
283 // timestamps in them.
284 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[10]),
285 UnorderedElementsAre())
286 << " : " << logfiles_[10];
287 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[11]),
288 UnorderedElementsAre())
289 << " : " << logfiles_[11];
290 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[12]),
291 UnorderedElementsAre())
292 << " : " << logfiles_[12];
293 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[13]),
294 UnorderedElementsAre())
295 << " : " << logfiles_[13];
296
297 EXPECT_THAT(CountChannelsData(config, logfiles_[10]),
298 UnorderedElementsAre(std::make_tuple(
299 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
300 << " : " << logfiles_[10];
301 EXPECT_THAT(CountChannelsData(config, logfiles_[11]),
302 UnorderedElementsAre(std::make_tuple(
303 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
304 << " : " << logfiles_[11];
305
306 EXPECT_THAT(CountChannelsData(config, logfiles_[12]),
307 UnorderedElementsAre(std::make_tuple(
308 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
309 << " : " << logfiles_[12];
310 EXPECT_THAT(CountChannelsData(config, logfiles_[13]),
311 UnorderedElementsAre(std::make_tuple(
312 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
313 << " : " << logfiles_[13];
314
315 // Timestamps from pi2 on pi1, and the other way.
316 if (shared()) {
317 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
318 UnorderedElementsAre())
319 << " : " << logfiles_[14];
320 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
321 UnorderedElementsAre())
322 << " : " << logfiles_[15];
323 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
324 UnorderedElementsAre())
325 << " : " << logfiles_[16];
326 EXPECT_THAT(CountChannelsData(config, logfiles_[17]),
327 UnorderedElementsAre())
328 << " : " << logfiles_[17];
329 EXPECT_THAT(CountChannelsData(config, logfiles_[18]),
330 UnorderedElementsAre())
331 << " : " << logfiles_[18];
332
333 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[14]),
334 UnorderedElementsAre(
335 std::make_tuple("/test", "aos.examples.Ping", 1)))
336 << " : " << logfiles_[14];
337 EXPECT_THAT(
338 CountChannelsTimestamp(config, logfiles_[15]),
339 UnorderedElementsAre(
340 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 9),
341 std::make_tuple("/test", "aos.examples.Ping", 90)))
342 << " : " << logfiles_[15];
343 EXPECT_THAT(
344 CountChannelsTimestamp(config, logfiles_[16]),
345 UnorderedElementsAre(
346 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 191),
347 std::make_tuple("/test", "aos.examples.Ping", 1910)))
348 << " : " << logfiles_[16];
349 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[17]),
350 UnorderedElementsAre(std::make_tuple(
351 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
352 << " : " << logfiles_[17];
353 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[18]),
354 UnorderedElementsAre(std::make_tuple(
355 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
356 << " : " << logfiles_[18];
357 } else {
358 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
359 UnorderedElementsAre())
360 << " : " << logfiles_[14];
361 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
362 UnorderedElementsAre())
363 << " : " << logfiles_[15];
364 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
365 UnorderedElementsAre())
366 << " : " << logfiles_[16];
367 EXPECT_THAT(CountChannelsData(config, logfiles_[17]),
368 UnorderedElementsAre())
369 << " : " << logfiles_[17];
370 EXPECT_THAT(CountChannelsData(config, logfiles_[18]),
371 UnorderedElementsAre())
372 << " : " << logfiles_[18];
373 EXPECT_THAT(CountChannelsData(config, logfiles_[19]),
374 UnorderedElementsAre())
375 << " : " << logfiles_[19];
376
377 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[14]),
378 UnorderedElementsAre(std::make_tuple(
379 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
380 << " : " << logfiles_[14];
381 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[15]),
382 UnorderedElementsAre(std::make_tuple(
383 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
384 << " : " << logfiles_[15];
385 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[16]),
386 UnorderedElementsAre(std::make_tuple(
387 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
388 << " : " << logfiles_[16];
389 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[17]),
390 UnorderedElementsAre(std::make_tuple(
391 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
392 << " : " << logfiles_[17];
393 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[18]),
394 UnorderedElementsAre(
395 std::make_tuple("/test", "aos.examples.Ping", 91)))
396 << " : " << logfiles_[18];
397 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[19]),
398 UnorderedElementsAre(
399 std::make_tuple("/test", "aos.examples.Ping", 1910)))
400 << " : " << logfiles_[19];
401 }
402 }
403
404 LogReader reader(sorted_log_files);
405
406 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
407 log_reader_factory.set_send_delay(chrono::microseconds(0));
408
409 // This sends out the fetched messages and advances time to the start of the
410 // log file.
411 reader.Register(&log_reader_factory);
412
413 const Node *pi1 =
414 configuration::GetNode(log_reader_factory.configuration(), "pi1");
415 const Node *pi2 =
416 configuration::GetNode(log_reader_factory.configuration(), "pi2");
417
418 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
419 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
420 LOG(INFO) << "now pi1 "
421 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
422 LOG(INFO) << "now pi2 "
423 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
424
425 EXPECT_THAT(reader.LoggedNodes(),
426 ::testing::ElementsAre(
427 configuration::GetNode(reader.logged_configuration(), pi1),
428 configuration::GetNode(reader.logged_configuration(), pi2)));
429
430 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
431
432 std::unique_ptr<EventLoop> pi1_event_loop =
433 log_reader_factory.MakeEventLoop("test", pi1);
434 std::unique_ptr<EventLoop> pi2_event_loop =
435 log_reader_factory.MakeEventLoop("test", pi2);
436
437 int pi1_ping_count = 10;
438 int pi2_ping_count = 10;
439 int pi1_pong_count = 10;
440 int pi2_pong_count = 10;
441
442 // Confirm that the ping value matches.
443 pi1_event_loop->MakeWatcher(
444 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
445 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping) << " at "
446 << pi1_event_loop->context().monotonic_remote_time << " -> "
447 << pi1_event_loop->context().monotonic_event_time;
448 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
449 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
450 pi1_ping_count * chrono::milliseconds(10) +
451 monotonic_clock::epoch());
452 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
453 pi1_ping_count * chrono::milliseconds(10) +
454 realtime_clock::epoch());
455 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
456 pi1_event_loop->context().monotonic_event_time);
457 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
458 pi1_event_loop->context().realtime_event_time);
459
460 ++pi1_ping_count;
461 });
462 pi2_event_loop->MakeWatcher(
463 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
464 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping) << " at "
465 << pi2_event_loop->context().monotonic_remote_time << " -> "
466 << pi2_event_loop->context().monotonic_event_time;
467 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
468
469 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
470 pi2_ping_count * chrono::milliseconds(10) +
471 monotonic_clock::epoch());
472 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
473 pi2_ping_count * chrono::milliseconds(10) +
474 realtime_clock::epoch());
475 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time +
476 chrono::microseconds(150),
477 pi2_event_loop->context().monotonic_event_time);
478 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time +
479 chrono::microseconds(150),
480 pi2_event_loop->context().realtime_event_time);
481 ++pi2_ping_count;
482 });
483
484 constexpr ssize_t kQueueIndexOffset = -9;
485 // Confirm that the ping and pong counts both match, and the value also
486 // matches.
487 pi1_event_loop->MakeWatcher(
488 "/test", [&pi1_event_loop, &pi1_ping_count,
489 &pi1_pong_count](const examples::Pong &pong) {
490 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
491 << pi1_event_loop->context().monotonic_remote_time << " -> "
492 << pi1_event_loop->context().monotonic_event_time;
493
494 EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
495 pi1_pong_count + kQueueIndexOffset);
496 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
497 chrono::microseconds(200) +
498 pi1_pong_count * chrono::milliseconds(10) +
499 monotonic_clock::epoch());
500 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
501 chrono::microseconds(200) +
502 pi1_pong_count * chrono::milliseconds(10) +
503 realtime_clock::epoch());
504
505 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
506 chrono::microseconds(150),
507 pi1_event_loop->context().monotonic_event_time);
508 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
509 chrono::microseconds(150),
510 pi1_event_loop->context().realtime_event_time);
511
512 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
513 ++pi1_pong_count;
514 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
515 });
516 pi2_event_loop->MakeWatcher(
517 "/test", [&pi2_event_loop, &pi2_ping_count,
518 &pi2_pong_count](const examples::Pong &pong) {
519 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
520 << pi2_event_loop->context().monotonic_remote_time << " -> "
521 << pi2_event_loop->context().monotonic_event_time;
522
523 EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
524 pi2_pong_count + kQueueIndexOffset);
525
526 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
527 chrono::microseconds(200) +
528 pi2_pong_count * chrono::milliseconds(10) +
529 monotonic_clock::epoch());
530 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
531 chrono::microseconds(200) +
532 pi2_pong_count * chrono::milliseconds(10) +
533 realtime_clock::epoch());
534
535 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
536 pi2_event_loop->context().monotonic_event_time);
537 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
538 pi2_event_loop->context().realtime_event_time);
539
540 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
541 ++pi2_pong_count;
542 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
543 });
544
545 log_reader_factory.Run();
546 EXPECT_EQ(pi1_ping_count, 2010);
547 EXPECT_EQ(pi2_ping_count, 2010);
548 EXPECT_EQ(pi1_pong_count, 2010);
549 EXPECT_EQ(pi2_pong_count, 2010);
550
551 reader.Deregister();
552}
553
554// Test that if we feed the replay with a mismatched node list that we die on
555// the LogReader constructor.
556TEST_P(MultinodeLoggerDeathTest, MultiNodeBadReplayConfig) {
557 time_converter_.StartEqual();
558 {
559 LoggerState pi1_logger = MakeLogger(pi1_);
560 LoggerState pi2_logger = MakeLogger(pi2_);
561
562 event_loop_factory_.RunFor(chrono::milliseconds(95));
563
564 StartLogger(&pi1_logger);
565 StartLogger(&pi2_logger);
566
567 event_loop_factory_.RunFor(chrono::milliseconds(20000));
568 }
569
570 // Test that, if we add an additional node to the replay config that the
571 // logger complains about the mismatch in number of nodes.
572 FlatbufferDetachedBuffer<Configuration> extra_nodes_config =
573 configuration::MergeWithConfig(&config_.message(), R"({
574 "nodes": [
575 {
576 "name": "extra-node"
577 }
578 ]
579 }
580 )");
581
582 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
583 EXPECT_DEATH(LogReader(sorted_parts, &extra_nodes_config.message()),
584 "Log file and replay config need to have matching nodes lists.");
585}
586
587// Tests that we can read log files where they don't start at the same monotonic
588// time.
589TEST_P(MultinodeLoggerTest, StaggeredStart) {
590 time_converter_.StartEqual();
591 std::vector<std::string> actual_filenames;
592
593 {
594 LoggerState pi1_logger = MakeLogger(pi1_);
595 LoggerState pi2_logger = MakeLogger(pi2_);
596
597 event_loop_factory_.RunFor(chrono::milliseconds(95));
598
599 StartLogger(&pi1_logger);
600
601 event_loop_factory_.RunFor(chrono::milliseconds(200));
602
603 StartLogger(&pi2_logger);
604
605 event_loop_factory_.RunFor(chrono::milliseconds(20000));
606 pi1_logger.AppendAllFilenames(&actual_filenames);
607 pi2_logger.AppendAllFilenames(&actual_filenames);
608 }
609
610 // Since we delay starting pi2, it already knows about all the timestamps so
611 // we don't end up with extra parts.
612 LogReader reader(SortParts(actual_filenames));
613
614 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
615 log_reader_factory.set_send_delay(chrono::microseconds(0));
616
617 // This sends out the fetched messages and advances time to the start of the
618 // log file.
619 reader.Register(&log_reader_factory);
620
621 const Node *pi1 =
622 configuration::GetNode(log_reader_factory.configuration(), "pi1");
623 const Node *pi2 =
624 configuration::GetNode(log_reader_factory.configuration(), "pi2");
625
626 EXPECT_THAT(reader.LoggedNodes(),
627 ::testing::ElementsAre(
628 configuration::GetNode(reader.logged_configuration(), pi1),
629 configuration::GetNode(reader.logged_configuration(), pi2)));
630
631 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
632
633 std::unique_ptr<EventLoop> pi1_event_loop =
634 log_reader_factory.MakeEventLoop("test", pi1);
635 std::unique_ptr<EventLoop> pi2_event_loop =
636 log_reader_factory.MakeEventLoop("test", pi2);
637
638 int pi1_ping_count = 30;
639 int pi2_ping_count = 30;
640 int pi1_pong_count = 30;
641 int pi2_pong_count = 30;
642
643 // Confirm that the ping value matches.
644 pi1_event_loop->MakeWatcher(
645 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
646 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
647 << pi1_event_loop->context().monotonic_remote_time << " -> "
648 << pi1_event_loop->context().monotonic_event_time;
649 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
650
651 ++pi1_ping_count;
652 });
653 pi2_event_loop->MakeWatcher(
654 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
655 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
656 << pi2_event_loop->context().monotonic_remote_time << " -> "
657 << pi2_event_loop->context().monotonic_event_time;
658 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
659
660 ++pi2_ping_count;
661 });
662
663 // Confirm that the ping and pong counts both match, and the value also
664 // matches.
665 pi1_event_loop->MakeWatcher(
666 "/test", [&pi1_event_loop, &pi1_ping_count,
667 &pi1_pong_count](const examples::Pong &pong) {
668 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
669 << pi1_event_loop->context().monotonic_remote_time << " -> "
670 << pi1_event_loop->context().monotonic_event_time;
671
672 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
673 ++pi1_pong_count;
674 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
675 });
676 pi2_event_loop->MakeWatcher(
677 "/test", [&pi2_event_loop, &pi2_ping_count,
678 &pi2_pong_count](const examples::Pong &pong) {
679 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
680 << pi2_event_loop->context().monotonic_remote_time << " -> "
681 << pi2_event_loop->context().monotonic_event_time;
682
683 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
684 ++pi2_pong_count;
685 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
686 });
687
688 log_reader_factory.Run();
689 EXPECT_EQ(pi1_ping_count, 2030);
690 EXPECT_EQ(pi2_ping_count, 2030);
691 EXPECT_EQ(pi1_pong_count, 2030);
692 EXPECT_EQ(pi2_pong_count, 2030);
693
694 reader.Deregister();
695}
696
697// Tests that we can read log files where the monotonic clocks drift and don't
698// match correctly. While we are here, also test that different ending times
699// also is readable.
700TEST_P(MultinodeLoggerTest, MismatchedClocks) {
701 // TODO(austin): Negate...
702 const chrono::nanoseconds initial_pi2_offset = chrono::seconds(1000);
703
704 time_converter_.AddMonotonic(
705 {BootTimestamp::epoch(), BootTimestamp::epoch() + initial_pi2_offset});
706 // Wait for 95 ms, (~0.1 seconds - 1/2 of the ping/pong period), and set the
707 // skew to be 200 uS/s
708 const chrono::nanoseconds startup_sleep1 = time_converter_.AddMonotonic(
709 {chrono::milliseconds(95),
710 chrono::milliseconds(95) - chrono::nanoseconds(200) * 95});
711 // Run another 200 ms to have one logger start first.
712 const chrono::nanoseconds startup_sleep2 = time_converter_.AddMonotonic(
713 {chrono::milliseconds(200), chrono::milliseconds(200)});
714 // Slew one way then the other at the same 200 uS/S slew rate. Make sure we
715 // go far enough to cause problems if this isn't accounted for.
716 const chrono::nanoseconds logger_run1 = time_converter_.AddMonotonic(
717 {chrono::milliseconds(20000),
718 chrono::milliseconds(20000) - chrono::nanoseconds(200) * 20000});
719 const chrono::nanoseconds logger_run2 = time_converter_.AddMonotonic(
720 {chrono::milliseconds(40000),
721 chrono::milliseconds(40000) + chrono::nanoseconds(200) * 40000});
722 const chrono::nanoseconds logger_run3 = time_converter_.AddMonotonic(
723 {chrono::milliseconds(400), chrono::milliseconds(400)});
724
725 {
726 LoggerState pi2_logger = MakeLogger(pi2_);
727
728 LOG(INFO) << "pi2 times: " << pi2_->monotonic_now() << " "
729 << pi2_->realtime_now() << " distributed "
730 << pi2_->ToDistributedClock(pi2_->monotonic_now());
731
732 LOG(INFO) << "pi2_ times: " << pi2_->monotonic_now() << " "
733 << pi2_->realtime_now() << " distributed "
734 << pi2_->ToDistributedClock(pi2_->monotonic_now());
735
736 event_loop_factory_.RunFor(startup_sleep1);
737
738 StartLogger(&pi2_logger);
739
740 event_loop_factory_.RunFor(startup_sleep2);
741
742 {
743 // Run pi1's logger for only part of the time.
744 LoggerState pi1_logger = MakeLogger(pi1_);
745
746 StartLogger(&pi1_logger);
747 event_loop_factory_.RunFor(logger_run1);
748
749 // Make sure we slewed time far enough so that the difference is greater
750 // than the network delay. This confirms that if we sort incorrectly, it
751 // would show in the results.
752 EXPECT_LT(
753 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
754 -event_loop_factory_.send_delay() -
755 event_loop_factory_.network_delay());
756
757 event_loop_factory_.RunFor(logger_run2);
758
759 // And now check that we went far enough the other way to make sure we
760 // cover both problems.
761 EXPECT_GT(
762 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
763 event_loop_factory_.send_delay() +
764 event_loop_factory_.network_delay());
765 }
766
767 // And log a bit more on pi2.
768 event_loop_factory_.RunFor(logger_run3);
769 }
770
771 LogReader reader(
James Kuszmaul298b4a22023-06-28 20:01:03 -0700772 SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 3, 3)));
Naman Guptaa63aa132023-03-22 20:06:34 -0700773
774 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
775 log_reader_factory.set_send_delay(chrono::microseconds(0));
776
777 const Node *pi1 =
778 configuration::GetNode(log_reader_factory.configuration(), "pi1");
779 const Node *pi2 =
780 configuration::GetNode(log_reader_factory.configuration(), "pi2");
781
782 // This sends out the fetched messages and advances time to the start of the
783 // log file.
784 reader.Register(&log_reader_factory);
785
786 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
787 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
788 LOG(INFO) << "now pi1 "
789 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
790 LOG(INFO) << "now pi2 "
791 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
792
793 LOG(INFO) << "Done registering (pi1) "
794 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now()
795 << " "
796 << log_reader_factory.GetNodeEventLoopFactory(pi1)->realtime_now();
797 LOG(INFO) << "Done registering (pi2) "
798 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now()
799 << " "
800 << log_reader_factory.GetNodeEventLoopFactory(pi2)->realtime_now();
801
802 EXPECT_THAT(reader.LoggedNodes(),
803 ::testing::ElementsAre(
804 configuration::GetNode(reader.logged_configuration(), pi1),
805 configuration::GetNode(reader.logged_configuration(), pi2)));
806
807 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
808
809 std::unique_ptr<EventLoop> pi1_event_loop =
810 log_reader_factory.MakeEventLoop("test", pi1);
811 std::unique_ptr<EventLoop> pi2_event_loop =
812 log_reader_factory.MakeEventLoop("test", pi2);
813
814 int pi1_ping_count = 30;
815 int pi2_ping_count = 30;
816 int pi1_pong_count = 30;
817 int pi2_pong_count = 30;
818
819 // Confirm that the ping value matches.
820 pi1_event_loop->MakeWatcher(
821 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
822 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
823 << pi1_event_loop->context().monotonic_remote_time << " -> "
824 << pi1_event_loop->context().monotonic_event_time;
825 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
826
827 ++pi1_ping_count;
828 });
829 pi2_event_loop->MakeWatcher(
830 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
831 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
832 << pi2_event_loop->context().monotonic_remote_time << " -> "
833 << pi2_event_loop->context().monotonic_event_time;
834 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
835
836 ++pi2_ping_count;
837 });
838
839 // Confirm that the ping and pong counts both match, and the value also
840 // matches.
841 pi1_event_loop->MakeWatcher(
842 "/test", [&pi1_event_loop, &pi1_ping_count,
843 &pi1_pong_count](const examples::Pong &pong) {
844 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
845 << pi1_event_loop->context().monotonic_remote_time << " -> "
846 << pi1_event_loop->context().monotonic_event_time;
847
848 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
849 ++pi1_pong_count;
850 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
851 });
852 pi2_event_loop->MakeWatcher(
853 "/test", [&pi2_event_loop, &pi2_ping_count,
854 &pi2_pong_count](const examples::Pong &pong) {
855 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
856 << pi2_event_loop->context().monotonic_remote_time << " -> "
857 << pi2_event_loop->context().monotonic_event_time;
858
859 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
860 ++pi2_pong_count;
861 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
862 });
863
864 log_reader_factory.Run();
865 EXPECT_EQ(pi1_ping_count, 6030);
866 EXPECT_EQ(pi2_ping_count, 6030);
867 EXPECT_EQ(pi1_pong_count, 6030);
868 EXPECT_EQ(pi2_pong_count, 6030);
869
870 reader.Deregister();
871}
872
873// Tests that we can sort a bunch of parts into the pre-determined sorted parts.
874TEST_P(MultinodeLoggerTest, SortParts) {
875 time_converter_.StartEqual();
876 // Make a bunch of parts.
877 {
878 LoggerState pi1_logger = MakeLogger(pi1_);
879 LoggerState pi2_logger = MakeLogger(pi2_);
880
881 event_loop_factory_.RunFor(chrono::milliseconds(95));
882
883 StartLogger(&pi1_logger);
884 StartLogger(&pi2_logger);
885
886 event_loop_factory_.RunFor(chrono::milliseconds(2000));
887 }
888
889 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
890 VerifyParts(sorted_parts);
891}
892
893// Tests that we can sort a bunch of parts with an empty part. We should ignore
894// it and remove it from the sorted list.
895TEST_P(MultinodeLoggerTest, SortEmptyParts) {
896 time_converter_.StartEqual();
897 // Make a bunch of parts.
898 {
899 LoggerState pi1_logger = MakeLogger(pi1_);
900 LoggerState pi2_logger = MakeLogger(pi2_);
901
902 event_loop_factory_.RunFor(chrono::milliseconds(95));
903
904 StartLogger(&pi1_logger);
905 StartLogger(&pi2_logger);
906
907 event_loop_factory_.RunFor(chrono::milliseconds(2000));
908 }
909
910 // TODO(austin): Should we flip out if the file can't open?
911 const std::string kEmptyFile("foobarinvalidfiledoesnotexist" + Extension());
912
913 aos::util::WriteStringToFileOrDie(kEmptyFile, "");
914 logfiles_.emplace_back(kEmptyFile);
915
916 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
917 VerifyParts(sorted_parts, {kEmptyFile});
918}
919
920// Tests that we can sort a bunch of parts with the end missing off a
921// file. We should use the part we can read.
922TEST_P(MultinodeLoggerTest, SortTruncatedParts) {
923 std::vector<std::string> actual_filenames;
924 time_converter_.StartEqual();
925 // Make a bunch of parts.
926 {
927 LoggerState pi1_logger = MakeLogger(pi1_);
928 LoggerState pi2_logger = MakeLogger(pi2_);
929
930 event_loop_factory_.RunFor(chrono::milliseconds(95));
931
932 StartLogger(&pi1_logger);
933 StartLogger(&pi2_logger);
934
935 event_loop_factory_.RunFor(chrono::milliseconds(2000));
936
937 pi1_logger.AppendAllFilenames(&actual_filenames);
938 pi2_logger.AppendAllFilenames(&actual_filenames);
939 }
940
941 ASSERT_THAT(actual_filenames,
942 ::testing::UnorderedElementsAreArray(logfiles_));
943
944 // Strip off the end of one of the files. Pick one with a lot of data.
945 // For snappy, needs to have enough data to be >1 chunk of compressed data so
946 // that we don't corrupt the entire log part.
947 ::std::string compressed_contents =
948 aos::util::ReadFileToStringOrDie(logfiles_[4]);
949
950 aos::util::WriteStringToFileOrDie(
951 logfiles_[4],
952 compressed_contents.substr(0, compressed_contents.size() - 100));
953
954 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
955 VerifyParts(sorted_parts);
956}
957
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700958// Tests that if we remap a logged channel, it shows up correctly.
Naman Guptaa63aa132023-03-22 20:06:34 -0700959TEST_P(MultinodeLoggerTest, RemapLoggedChannel) {
960 time_converter_.StartEqual();
961 {
962 LoggerState pi1_logger = MakeLogger(pi1_);
963 LoggerState pi2_logger = MakeLogger(pi2_);
964
965 event_loop_factory_.RunFor(chrono::milliseconds(95));
966
967 StartLogger(&pi1_logger);
968 StartLogger(&pi2_logger);
969
970 event_loop_factory_.RunFor(chrono::milliseconds(20000));
971 }
972
973 LogReader reader(SortParts(logfiles_));
974
975 // Remap just on pi1.
976 reader.RemapLoggedChannel<aos::timing::Report>(
977 "/aos", configuration::GetNode(reader.configuration(), "pi1"));
978
979 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
980 log_reader_factory.set_send_delay(chrono::microseconds(0));
981
982 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
983 // Note: An extra channel gets remapped automatically due to a timestamp
984 // channel being LOCAL_LOGGER'd.
985 ASSERT_EQ(remapped_channels.size(), std::get<0>(GetParam()).shared ? 1u : 2u);
986 EXPECT_EQ(remapped_channels[0]->name()->string_view(), "/original/pi1/aos");
987 EXPECT_EQ(remapped_channels[0]->type()->string_view(), "aos.timing.Report");
988 if (!std::get<0>(GetParam()).shared) {
989 EXPECT_EQ(remapped_channels[1]->name()->string_view(),
990 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
991 "aos-message_bridge-Timestamp");
992 EXPECT_EQ(remapped_channels[1]->type()->string_view(),
993 "aos.message_bridge.RemoteMessage");
994 }
995
996 reader.Register(&log_reader_factory);
997
998 const Node *pi1 =
999 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1000 const Node *pi2 =
1001 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1002
1003 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1004 // else should have moved.
1005 std::unique_ptr<EventLoop> pi1_event_loop =
1006 log_reader_factory.MakeEventLoop("test", pi1);
1007 pi1_event_loop->SkipTimingReport();
1008 std::unique_ptr<EventLoop> full_pi1_event_loop =
1009 log_reader_factory.MakeEventLoop("test", pi1);
1010 full_pi1_event_loop->SkipTimingReport();
1011 std::unique_ptr<EventLoop> pi2_event_loop =
1012 log_reader_factory.MakeEventLoop("test", pi2);
1013 pi2_event_loop->SkipTimingReport();
1014
1015 MessageCounter<aos::timing::Report> pi1_timing_report(pi1_event_loop.get(),
1016 "/aos");
1017 MessageCounter<aos::timing::Report> full_pi1_timing_report(
1018 full_pi1_event_loop.get(), "/pi1/aos");
1019 MessageCounter<aos::timing::Report> pi1_original_timing_report(
1020 pi1_event_loop.get(), "/original/aos");
1021 MessageCounter<aos::timing::Report> full_pi1_original_timing_report(
1022 full_pi1_event_loop.get(), "/original/pi1/aos");
1023 MessageCounter<aos::timing::Report> pi2_timing_report(pi2_event_loop.get(),
1024 "/aos");
1025
1026 log_reader_factory.Run();
1027
1028 EXPECT_EQ(pi1_timing_report.count(), 0u);
1029 EXPECT_EQ(full_pi1_timing_report.count(), 0u);
1030 EXPECT_NE(pi1_original_timing_report.count(), 0u);
1031 EXPECT_NE(full_pi1_original_timing_report.count(), 0u);
1032 EXPECT_NE(pi2_timing_report.count(), 0u);
1033
1034 reader.Deregister();
1035}
1036
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001037// Tests that if we rename a logged channel, it shows up correctly.
1038TEST_P(MultinodeLoggerTest, RenameLoggedChannel) {
1039 std::vector<std::string> actual_filenames;
1040 time_converter_.StartEqual();
1041 {
1042 LoggerState pi1_logger = MakeLogger(pi1_);
1043 LoggerState pi2_logger = MakeLogger(pi2_);
1044
1045 event_loop_factory_.RunFor(chrono::milliseconds(95));
1046
1047 StartLogger(&pi1_logger);
1048 StartLogger(&pi2_logger);
1049
1050 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1051
1052 pi1_logger.AppendAllFilenames(&actual_filenames);
1053 pi2_logger.AppendAllFilenames(&actual_filenames);
1054 }
1055
1056 LogReader reader(SortParts(actual_filenames));
1057
1058 // Rename just on pi2. Add some global maps just to verify they get added in
1059 // the config and used correctly.
1060 std::vector<MapT> maps;
1061 {
1062 MapT map;
1063 map.match = std::make_unique<ChannelT>();
1064 map.match->name = "/foo*";
1065 map.match->source_node = "pi1";
1066 map.rename = std::make_unique<ChannelT>();
1067 map.rename->name = "/pi1/foo";
1068 maps.emplace_back(std::move(map));
1069 }
1070 {
1071 MapT map;
1072 map.match = std::make_unique<ChannelT>();
1073 map.match->name = "/foo*";
1074 map.match->source_node = "pi2";
1075 map.rename = std::make_unique<ChannelT>();
1076 map.rename->name = "/pi2/foo";
1077 maps.emplace_back(std::move(map));
1078 }
1079 {
1080 MapT map;
1081 map.match = std::make_unique<ChannelT>();
1082 map.match->name = "/foo";
1083 map.match->type = "aos.examples.Ping";
1084 map.rename = std::make_unique<ChannelT>();
1085 map.rename->name = "/foo/renamed";
1086 maps.emplace_back(std::move(map));
1087 }
1088 reader.RenameLoggedChannel<aos::examples::Ping>(
1089 "/aos", configuration::GetNode(reader.configuration(), "pi2"),
1090 "/pi2/foo/renamed", maps);
1091
1092 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1093 log_reader_factory.set_send_delay(chrono::microseconds(0));
1094
1095 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1096 // Note: An extra channel gets remapped automatically due to a timestamp
1097 // channel being LOCAL_LOGGER'd.
1098 const bool shared = std::get<0>(GetParam()).shared;
1099 ASSERT_EQ(remapped_channels.size(), shared ? 1u : 2u);
1100 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->name()->string_view(),
1101 "/pi2/foo/renamed");
1102 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->type()->string_view(),
1103 "aos.examples.Ping");
1104 if (!shared) {
1105 EXPECT_EQ(remapped_channels[0]->name()->string_view(),
1106 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1107 "aos-message_bridge-Timestamp");
1108 EXPECT_EQ(remapped_channels[0]->type()->string_view(),
1109 "aos.message_bridge.RemoteMessage");
1110 }
1111
1112 reader.Register(&log_reader_factory);
1113
1114 const Node *pi1 =
1115 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1116 const Node *pi2 =
1117 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1118
1119 // Confirm we can read the data on the renamed channel, just for pi2. Nothing
1120 // else should have moved.
1121 std::unique_ptr<EventLoop> pi2_event_loop =
1122 log_reader_factory.MakeEventLoop("test", pi2);
1123 pi2_event_loop->SkipTimingReport();
1124 std::unique_ptr<EventLoop> full_pi2_event_loop =
1125 log_reader_factory.MakeEventLoop("test", pi2);
1126 full_pi2_event_loop->SkipTimingReport();
1127 std::unique_ptr<EventLoop> pi1_event_loop =
1128 log_reader_factory.MakeEventLoop("test", pi1);
1129 pi1_event_loop->SkipTimingReport();
1130
1131 MessageCounter<aos::examples::Ping> pi2_ping(pi2_event_loop.get(), "/aos");
1132 MessageCounter<aos::examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1133 "/foo");
1134 MessageCounter<aos::examples::Ping> full_pi2_renamed_ping(
1135 full_pi2_event_loop.get(), "/pi2/foo/renamed");
1136 MessageCounter<aos::examples::Ping> pi1_ping(pi1_event_loop.get(), "/aos");
1137
1138 log_reader_factory.Run();
1139
1140 EXPECT_EQ(pi2_ping.count(), 0u);
1141 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1142 EXPECT_NE(full_pi2_renamed_ping.count(), 0u);
1143 EXPECT_NE(pi1_ping.count(), 0u);
1144
1145 reader.Deregister();
1146}
1147
Naman Guptaa63aa132023-03-22 20:06:34 -07001148// Tests that we can remap a forwarded channel as well.
1149TEST_P(MultinodeLoggerTest, RemapForwardedLoggedChannel) {
1150 time_converter_.StartEqual();
1151 {
1152 LoggerState pi1_logger = MakeLogger(pi1_);
1153 LoggerState pi2_logger = MakeLogger(pi2_);
1154
1155 event_loop_factory_.RunFor(chrono::milliseconds(95));
1156
1157 StartLogger(&pi1_logger);
1158 StartLogger(&pi2_logger);
1159
1160 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1161 }
1162
1163 LogReader reader(SortParts(logfiles_));
1164
1165 reader.RemapLoggedChannel<examples::Ping>("/test");
1166
1167 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1168 log_reader_factory.set_send_delay(chrono::microseconds(0));
1169
1170 reader.Register(&log_reader_factory);
1171
1172 const Node *pi1 =
1173 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1174 const Node *pi2 =
1175 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1176
1177 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1178 // else should have moved.
1179 std::unique_ptr<EventLoop> pi1_event_loop =
1180 log_reader_factory.MakeEventLoop("test", pi1);
1181 pi1_event_loop->SkipTimingReport();
1182 std::unique_ptr<EventLoop> full_pi1_event_loop =
1183 log_reader_factory.MakeEventLoop("test", pi1);
1184 full_pi1_event_loop->SkipTimingReport();
1185 std::unique_ptr<EventLoop> pi2_event_loop =
1186 log_reader_factory.MakeEventLoop("test", pi2);
1187 pi2_event_loop->SkipTimingReport();
1188
1189 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1190 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1191 MessageCounter<examples::Ping> pi1_original_ping(pi1_event_loop.get(),
1192 "/original/test");
1193 MessageCounter<examples::Ping> pi2_original_ping(pi2_event_loop.get(),
1194 "/original/test");
1195
1196 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1197 pi1_original_ping_timestamp;
1198 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1199 pi1_ping_timestamp;
1200 if (!shared()) {
1201 pi1_original_ping_timestamp =
1202 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1203 pi1_event_loop.get(),
1204 "/pi1/aos/remote_timestamps/pi2/original/test/aos-examples-Ping");
1205 pi1_ping_timestamp =
1206 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1207 pi1_event_loop.get(),
1208 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1209 }
1210
1211 log_reader_factory.Run();
1212
1213 EXPECT_EQ(pi1_ping.count(), 0u);
1214 EXPECT_EQ(pi2_ping.count(), 0u);
1215 EXPECT_NE(pi1_original_ping.count(), 0u);
1216 EXPECT_NE(pi2_original_ping.count(), 0u);
1217 if (!shared()) {
1218 EXPECT_NE(pi1_original_ping_timestamp->count(), 0u);
1219 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1220 }
1221
1222 reader.Deregister();
1223}
1224
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001225// Tests that we can rename a forwarded channel as well.
1226TEST_P(MultinodeLoggerTest, RenameForwardedLoggedChannel) {
1227 std::vector<std::string> actual_filenames;
1228 time_converter_.StartEqual();
1229 {
1230 LoggerState pi1_logger = MakeLogger(pi1_);
1231 LoggerState pi2_logger = MakeLogger(pi2_);
1232
1233 event_loop_factory_.RunFor(chrono::milliseconds(95));
1234
1235 StartLogger(&pi1_logger);
1236 StartLogger(&pi2_logger);
1237
1238 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1239
1240 pi1_logger.AppendAllFilenames(&actual_filenames);
1241 pi2_logger.AppendAllFilenames(&actual_filenames);
1242 }
1243
1244 LogReader reader(SortParts(actual_filenames));
1245
1246 std::vector<MapT> maps;
1247 {
1248 MapT map;
1249 map.match = std::make_unique<ChannelT>();
1250 map.match->name = "/production*";
1251 map.match->source_node = "pi1";
1252 map.rename = std::make_unique<ChannelT>();
1253 map.rename->name = "/pi1/production";
1254 maps.emplace_back(std::move(map));
1255 }
1256 {
1257 MapT map;
1258 map.match = std::make_unique<ChannelT>();
1259 map.match->name = "/production*";
1260 map.match->source_node = "pi2";
1261 map.rename = std::make_unique<ChannelT>();
1262 map.rename->name = "/pi2/production";
1263 maps.emplace_back(std::move(map));
1264 }
1265 reader.RenameLoggedChannel<aos::examples::Ping>(
1266 "/test", configuration::GetNode(reader.configuration(), "pi1"),
1267 "/pi1/production", maps);
1268
1269 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1270 log_reader_factory.set_send_delay(chrono::microseconds(0));
1271
1272 reader.Register(&log_reader_factory);
1273
1274 const Node *pi1 =
1275 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1276 const Node *pi2 =
1277 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1278
1279 // Confirm we can read the data on the renamed channel, on both the source
1280 // node and the remote node. In case of split timestamp channels, confirm that
1281 // we receive the timestamp messages on the renamed channel as well.
1282 std::unique_ptr<EventLoop> pi1_event_loop =
1283 log_reader_factory.MakeEventLoop("test", pi1);
1284 pi1_event_loop->SkipTimingReport();
1285 std::unique_ptr<EventLoop> full_pi1_event_loop =
1286 log_reader_factory.MakeEventLoop("test", pi1);
1287 full_pi1_event_loop->SkipTimingReport();
1288 std::unique_ptr<EventLoop> pi2_event_loop =
1289 log_reader_factory.MakeEventLoop("test", pi2);
1290 pi2_event_loop->SkipTimingReport();
1291
1292 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1293 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1294 MessageCounter<examples::Ping> pi1_renamed_ping(pi1_event_loop.get(),
1295 "/pi1/production");
1296 MessageCounter<examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1297 "/pi1/production");
1298
1299 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1300 pi1_renamed_ping_timestamp;
1301 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1302 pi1_ping_timestamp;
1303 if (!shared()) {
1304 pi1_renamed_ping_timestamp =
1305 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1306 pi1_event_loop.get(),
1307 "/pi1/aos/remote_timestamps/pi2/pi1/production/aos-examples-Ping");
1308 pi1_ping_timestamp =
1309 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1310 pi1_event_loop.get(),
1311 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1312 }
1313
1314 log_reader_factory.Run();
1315
1316 EXPECT_EQ(pi1_ping.count(), 0u);
1317 EXPECT_EQ(pi2_ping.count(), 0u);
1318 EXPECT_NE(pi1_renamed_ping.count(), 0u);
1319 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1320 if (!shared()) {
1321 EXPECT_NE(pi1_renamed_ping_timestamp->count(), 0u);
1322 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1323 }
1324
1325 reader.Deregister();
1326}
1327
// Tests that we observe all the same events in log replay (for a given node)
// whether we just register an event loop for that node or if we register a full
// event loop factory.
//
// The test first replays the log through a full SimulatedEventLoopFactory and
// records, per pi1-readable channel, every message time seen. It then replays
// the same log through a single LogReader registered on one pi1 event loop and
// checks that the messages arrive in the same order with matching times.
// Finally it checks that every recorded message was matched.
TEST_P(MultinodeLoggerTest, SingleNodeReplay) {
  time_converter_.StartEqual();
  constexpr chrono::milliseconds kStartupDelay(95);
  {
    LoggerState pi1_logger = MakeLogger(pi1_);
    LoggerState pi2_logger = MakeLogger(pi2_);

    event_loop_factory_.RunFor(kStartupDelay);

    StartLogger(&pi1_logger);
    StartLogger(&pi2_logger);

    event_loop_factory_.RunFor(chrono::milliseconds(20000));
  }

  // Two independent readers over the same parts: one drives a full factory,
  // the other is registered on a single event loop.
  LogReader full_reader(SortParts(logfiles_));
  LogReader single_node_reader(SortParts(logfiles_));

  SimulatedEventLoopFactory full_factory(full_reader.configuration());
  SimulatedEventLoopFactory single_node_factory(
      single_node_reader.configuration());
  single_node_factory.SkipTimingReport();
  single_node_factory.DisableStatistics();
  std::unique_ptr<EventLoop> replay_event_loop =
      single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
          "log_reader");

  full_reader.Register(&full_factory);
  single_node_reader.Register(replay_event_loop.get());

  const Node *full_pi1 =
      configuration::GetNode(full_factory.configuration(), "pi1");

  // Pass 1: record every message that a pi1 event loop observes when the log is
  // replayed through the full factory.
  std::unique_ptr<EventLoop> full_event_loop =
      full_factory.MakeEventLoop("test", full_pi1);
  full_event_loop->SkipTimingReport();
  full_event_loop->SkipAosLog();
  // maps are indexed on channel index.
  // observed_messages: {channel_index: [(monotonic_event_time, was_fetched),...]}
  // was_fetched is true for the message picked up by the fetcher in OnRun and
  // false for messages delivered to the watcher.
  std::map<size_t, std::vector<std::pair<monotonic_clock::time_point, bool>>>
      observed_messages;
  std::map<size_t, std::unique_ptr<RawFetcher>> fetchers;
  for (size_t ii = 0; ii < full_event_loop->configuration()->channels()->size();
       ++ii) {
    const Channel *channel =
        full_event_loop->configuration()->channels()->Get(ii);
    // We currently don't support replaying remote timestamp channels in
    // realtime replay. The exception is a remote timestamp channel that was
    // logged (i.e. not NOT_LOGGED): it gets auto-remapped and replayed on a
    // /original channel, so names containing "/original" are kept.
    if (channel->name()->string_view().find("remote_timestamp") !=
            std::string_view::npos &&
        channel->name()->string_view().find("/original") ==
            std::string_view::npos) {
      continue;
    }
    if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
      observed_messages[ii] = {};
      fetchers[ii] = full_event_loop->MakeRawFetcher(channel);
      // At startup, record whatever message is already fetchable.
      full_event_loop->OnRun([ii, &observed_messages, &fetchers]() {
        if (fetchers[ii]->Fetch()) {
          observed_messages[ii].push_back(std::make_pair(
              fetchers[ii]->context().monotonic_event_time, true));
        }
      });
      // Afterwards, record every message delivered on the channel.
      full_event_loop->MakeRawNoArgWatcher(
          channel, [ii, &observed_messages](const Context &context) {
            observed_messages[ii].push_back(
                std::make_pair(context.monotonic_event_time, false));
          });
    }
  }

  full_factory.Run();
  fetchers.clear();
  full_reader.Deregister();

  // Pass 2: replay through the single event loop and consume the recorded
  // observations in order.
  const Node *single_node_pi1 =
      configuration::GetNode(single_node_factory.configuration(), "pi1");
  std::map<size_t, std::unique_ptr<RawFetcher>> single_node_fetchers;

  std::unique_ptr<EventLoop> single_node_event_loop =
      single_node_factory.MakeEventLoop("test", single_node_pi1);
  single_node_event_loop->SkipTimingReport();
  single_node_event_loop->SkipAosLog();
  for (size_t ii = 0;
       ii < single_node_event_loop->configuration()->channels()->size(); ++ii) {
    const Channel *channel =
        single_node_event_loop->configuration()->channels()->Get(ii);
    // Forwarding is disabled for every channel in the single-node factory.
    // NOTE(review): presumably so that only the single-node LogReader produces
    // data here; confirm.
    single_node_factory.DisableForwarding(channel);
    if (configuration::ChannelIsReadableOnNode(channel, single_node_pi1)) {
      single_node_fetchers[ii] =
          single_node_event_loop->MakeRawFetcher(channel);
      // Nothing may be fetchable at startup in single-event-loop replay.
      single_node_event_loop->OnRun([channel, ii, &single_node_fetchers]() {
        EXPECT_FALSE(single_node_fetchers[ii]->Fetch())
            << "Single EventLoop replay doesn't support pre-loading fetchers. "
            << configuration::StrippedChannelToString(channel);
      });
      // Each delivered message must match the next recorded observation.
      // Times are compared after adding kStartupDelay to the single-node event
      // time. NOTE(review): presumably the single-node clock is offset by the
      // startup delay; confirm.
      // operator[] creates an empty entry for channels the first pass skipped,
      // so any message on those channels is reported as extra.
      single_node_event_loop->MakeRawNoArgWatcher(
          channel, [ii, &observed_messages, channel,
                    kStartupDelay](const Context &context) {
            if (observed_messages[ii].empty()) {
              FAIL() << "Observed extra message at "
                     << context.monotonic_event_time << " on "
                     << configuration::StrippedChannelToString(channel);
              return;
            }
            const std::pair<monotonic_clock::time_point, bool> &message =
                observed_messages[ii].front();
            if (message.second) {
              // A message that was only fetched at startup in the full replay
              // may predate the matching single-node delivery, so only an
              // upper bound is checked.
              EXPECT_LE(message.first,
                        context.monotonic_event_time + kStartupDelay)
                  << "Mismatched message times " << context.monotonic_event_time
                  << " and " << message.first << " on "
                  << configuration::StrippedChannelToString(channel);
            } else {
              EXPECT_EQ(message.first,
                        context.monotonic_event_time + kStartupDelay)
                  << "Mismatched message times " << context.monotonic_event_time
                  << " and " << message.first << " on "
                  << configuration::StrippedChannelToString(channel);
            }
            observed_messages[ii].erase(observed_messages[ii].begin());
          });
    }
  }

  single_node_factory.Run();

  single_node_fetchers.clear();

  single_node_reader.Deregister();

  // Every observation from the full replay must have been matched.
  for (const auto &pair : observed_messages) {
    EXPECT_TRUE(pair.second.empty())
        << "Missed " << pair.second.size() << " messages on "
        << configuration::StrippedChannelToString(
               single_node_event_loop->configuration()->channels()->Get(
                   pair.first));
  }
}
1473
1474// Tests that we properly recreate forwarded timestamps when replaying a log.
1475// This should be enough that we can then re-run the logger and get a valid log
1476// back.
1477TEST_P(MultinodeLoggerTest, MessageHeader) {
1478 time_converter_.StartEqual();
1479 {
1480 LoggerState pi1_logger = MakeLogger(pi1_);
1481 LoggerState pi2_logger = MakeLogger(pi2_);
1482
1483 event_loop_factory_.RunFor(chrono::milliseconds(95));
1484
1485 StartLogger(&pi1_logger);
1486 StartLogger(&pi2_logger);
1487
1488 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1489 }
1490
1491 LogReader reader(SortParts(logfiles_));
1492
1493 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1494 log_reader_factory.set_send_delay(chrono::microseconds(0));
1495
1496 // This sends out the fetched messages and advances time to the start of the
1497 // log file.
1498 reader.Register(&log_reader_factory);
1499
1500 const Node *pi1 =
1501 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1502 const Node *pi2 =
1503 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1504
1505 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
1506 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
1507 LOG(INFO) << "now pi1 "
1508 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1509 LOG(INFO) << "now pi2 "
1510 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1511
1512 EXPECT_THAT(reader.LoggedNodes(),
1513 ::testing::ElementsAre(
1514 configuration::GetNode(reader.logged_configuration(), pi1),
1515 configuration::GetNode(reader.logged_configuration(), pi2)));
1516
1517 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1518
1519 std::unique_ptr<EventLoop> pi1_event_loop =
1520 log_reader_factory.MakeEventLoop("test", pi1);
1521 std::unique_ptr<EventLoop> pi2_event_loop =
1522 log_reader_factory.MakeEventLoop("test", pi2);
1523
1524 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =
1525 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1526 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi2_fetcher =
1527 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1528
1529 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1530 pi1_event_loop->MakeFetcher<examples::Ping>("/test");
1531 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1532 pi2_event_loop->MakeFetcher<examples::Ping>("/test");
1533
1534 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi2_fetcher =
1535 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1536 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi1_fetcher =
1537 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1538
1539 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1540 pi2_event_loop->MakeFetcher<examples::Pong>("/test");
1541 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1542 pi1_event_loop->MakeFetcher<examples::Pong>("/test");
1543
1544 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
1545 pi1_event_loop->configuration(), pi1_timestamp_on_pi1_fetcher.channel());
1546 const size_t ping_timestamp_channel = configuration::ChannelIndex(
1547 pi2_event_loop->configuration(), ping_on_pi2_fetcher.channel());
1548
1549 const size_t pi2_timestamp_channel = configuration::ChannelIndex(
1550 pi2_event_loop->configuration(), pi2_timestamp_on_pi2_fetcher.channel());
1551 const size_t pong_timestamp_channel = configuration::ChannelIndex(
1552 pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
1553
1554 const chrono::nanoseconds network_delay = event_loop_factory_.network_delay();
1555 const chrono::nanoseconds send_delay = event_loop_factory_.send_delay();
1556
1557 for (std::pair<int, std::string> channel :
1558 shared()
1559 ? std::vector<
1560 std::pair<int, std::string>>{{-1,
1561 "/aos/remote_timestamps/pi2"}}
1562 : std::vector<std::pair<int, std::string>>{
1563 {pi1_timestamp_channel,
1564 "/aos/remote_timestamps/pi2/pi1/aos/"
1565 "aos-message_bridge-Timestamp"},
1566 {ping_timestamp_channel,
1567 "/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
1568 pi1_event_loop->MakeWatcher(
1569 channel.second,
1570 [&pi1_event_loop, &pi2_event_loop, pi1_timestamp_channel,
1571 ping_timestamp_channel, &pi1_timestamp_on_pi1_fetcher,
1572 &pi1_timestamp_on_pi2_fetcher, &ping_on_pi1_fetcher,
1573 &ping_on_pi2_fetcher, network_delay, send_delay,
1574 channel_index = channel.first](const RemoteMessage &header) {
1575 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1576 chrono::nanoseconds(header.monotonic_sent_time()));
1577 const aos::realtime_clock::time_point header_realtime_sent_time(
1578 chrono::nanoseconds(header.realtime_sent_time()));
1579 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1580 chrono::nanoseconds(header.monotonic_remote_time()));
1581 const aos::realtime_clock::time_point header_realtime_remote_time(
1582 chrono::nanoseconds(header.realtime_remote_time()));
1583
1584 if (channel_index != -1) {
1585 ASSERT_EQ(channel_index, header.channel_index());
1586 }
1587
1588 const Context *pi1_context = nullptr;
1589 const Context *pi2_context = nullptr;
1590
1591 if (header.channel_index() == pi1_timestamp_channel) {
1592 ASSERT_TRUE(pi1_timestamp_on_pi1_fetcher.FetchNext());
1593 ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
1594 pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
1595 pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
1596 } else if (header.channel_index() == ping_timestamp_channel) {
1597 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
1598 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
1599 pi1_context = &ping_on_pi1_fetcher.context();
1600 pi2_context = &ping_on_pi2_fetcher.context();
1601 } else {
1602 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1603 << configuration::CleanedChannelToString(
1604 pi1_event_loop->configuration()->channels()->Get(
1605 header.channel_index()));
1606 }
1607
1608 ASSERT_TRUE(header.has_boot_uuid());
1609 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1610 pi2_event_loop->boot_uuid());
1611
1612 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
1613 EXPECT_EQ(pi2_context->remote_queue_index,
1614 header.remote_queue_index());
1615 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
1616
1617 EXPECT_EQ(pi2_context->monotonic_event_time,
1618 header_monotonic_sent_time);
1619 EXPECT_EQ(pi2_context->realtime_event_time,
1620 header_realtime_sent_time);
1621 EXPECT_EQ(pi2_context->realtime_remote_time,
1622 header_realtime_remote_time);
1623 EXPECT_EQ(pi2_context->monotonic_remote_time,
1624 header_monotonic_remote_time);
1625
1626 EXPECT_EQ(pi1_context->realtime_event_time,
1627 header_realtime_remote_time);
1628 EXPECT_EQ(pi1_context->monotonic_event_time,
1629 header_monotonic_remote_time);
1630
1631 // Time estimation isn't perfect, but we know the clocks were
1632 // identical when logged, so we know when this should have come back.
1633 // Confirm we got it when we expected.
1634 EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
1635 pi1_context->monotonic_event_time + 2 * network_delay +
1636 send_delay);
1637 });
1638 }
1639 for (std::pair<int, std::string> channel :
1640 shared()
1641 ? std::vector<
1642 std::pair<int, std::string>>{{-1,
1643 "/aos/remote_timestamps/pi1"}}
1644 : std::vector<std::pair<int, std::string>>{
1645 {pi2_timestamp_channel,
1646 "/aos/remote_timestamps/pi1/pi2/aos/"
1647 "aos-message_bridge-Timestamp"}}) {
1648 pi2_event_loop->MakeWatcher(
1649 channel.second,
1650 [&pi2_event_loop, &pi1_event_loop, pi2_timestamp_channel,
1651 pong_timestamp_channel, &pi2_timestamp_on_pi2_fetcher,
1652 &pi2_timestamp_on_pi1_fetcher, &pong_on_pi2_fetcher,
1653 &pong_on_pi1_fetcher, network_delay, send_delay,
1654 channel_index = channel.first](const RemoteMessage &header) {
1655 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1656 chrono::nanoseconds(header.monotonic_sent_time()));
1657 const aos::realtime_clock::time_point header_realtime_sent_time(
1658 chrono::nanoseconds(header.realtime_sent_time()));
1659 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1660 chrono::nanoseconds(header.monotonic_remote_time()));
1661 const aos::realtime_clock::time_point header_realtime_remote_time(
1662 chrono::nanoseconds(header.realtime_remote_time()));
1663
1664 if (channel_index != -1) {
1665 ASSERT_EQ(channel_index, header.channel_index());
1666 }
1667
1668 const Context *pi2_context = nullptr;
1669 const Context *pi1_context = nullptr;
1670
1671 if (header.channel_index() == pi2_timestamp_channel) {
1672 ASSERT_TRUE(pi2_timestamp_on_pi2_fetcher.FetchNext());
1673 ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
1674 pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
1675 pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
1676 } else if (header.channel_index() == pong_timestamp_channel) {
1677 ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
1678 ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
1679 pi2_context = &pong_on_pi2_fetcher.context();
1680 pi1_context = &pong_on_pi1_fetcher.context();
1681 } else {
1682 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1683 << configuration::CleanedChannelToString(
1684 pi2_event_loop->configuration()->channels()->Get(
1685 header.channel_index()));
1686 }
1687
1688 ASSERT_TRUE(header.has_boot_uuid());
1689 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1690 pi1_event_loop->boot_uuid());
1691
1692 EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());
1693 EXPECT_EQ(pi1_context->remote_queue_index,
1694 header.remote_queue_index());
1695 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
1696
1697 EXPECT_EQ(pi1_context->monotonic_event_time,
1698 header_monotonic_sent_time);
1699 EXPECT_EQ(pi1_context->realtime_event_time,
1700 header_realtime_sent_time);
1701 EXPECT_EQ(pi1_context->realtime_remote_time,
1702 header_realtime_remote_time);
1703 EXPECT_EQ(pi1_context->monotonic_remote_time,
1704 header_monotonic_remote_time);
1705
1706 EXPECT_EQ(pi2_context->realtime_event_time,
1707 header_realtime_remote_time);
1708 EXPECT_EQ(pi2_context->monotonic_event_time,
1709 header_monotonic_remote_time);
1710
1711 // Time estimation isn't perfect, but we know the clocks were
1712 // identical when logged, so we know when this should have come back.
1713 // Confirm we got it when we expected.
1714 EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
1715 pi2_context->monotonic_event_time + 2 * network_delay +
1716 send_delay);
1717 });
1718 }
1719
1720 // And confirm we can re-create a log again, while checking the contents.
1721 {
1722 LoggerState pi1_logger = MakeLogger(
1723 log_reader_factory.GetNodeEventLoopFactory("pi1"), &log_reader_factory);
1724 LoggerState pi2_logger = MakeLogger(
1725 log_reader_factory.GetNodeEventLoopFactory("pi2"), &log_reader_factory);
1726
1727 StartLogger(&pi1_logger, tmp_dir_ + "/relogged1");
1728 StartLogger(&pi2_logger, tmp_dir_ + "/relogged2");
1729
1730 log_reader_factory.Run();
1731 }
1732
1733 reader.Deregister();
1734
1735 // And verify that we can run the LogReader over the relogged files without
1736 // hitting any fatal errors.
1737 {
1738 LogReader relogged_reader(SortParts(MakeLogFiles(
1739 tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 3, 3, true)));
1740 relogged_reader.Register();
1741
1742 relogged_reader.event_loop_factory()->Run();
1743 }
1744 // And confirm that we can read the logged file using the reader's
1745 // configuration.
1746 {
1747 LogReader relogged_reader(
1748 SortParts(MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2",
1749 3, 3, true)),
1750 reader.configuration());
1751 relogged_reader.Register();
1752
1753 relogged_reader.event_loop_factory()->Run();
1754 }
1755}
1756
1757// Tests that we properly populate and extract the logger_start time by setting
1758// up a clock difference between 2 nodes and looking at the resulting parts.
1759TEST_P(MultinodeLoggerTest, LoggerStartTime) {
1760 std::vector<std::string> actual_filenames;
1761 time_converter_.AddMonotonic(
1762 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
1763 {
1764 LoggerState pi1_logger = MakeLogger(pi1_);
1765 LoggerState pi2_logger = MakeLogger(pi2_);
1766
1767 StartLogger(&pi1_logger);
1768 StartLogger(&pi2_logger);
1769
1770 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1771
1772 pi1_logger.AppendAllFilenames(&actual_filenames);
1773 pi2_logger.AppendAllFilenames(&actual_filenames);
1774 }
1775
1776 ASSERT_THAT(actual_filenames,
1777 ::testing::UnorderedElementsAreArray(logfiles_));
1778
1779 for (const LogFile &log_file : SortParts(logfiles_)) {
1780 for (const LogParts &log_part : log_file.parts) {
1781 if (log_part.node == log_file.logger_node) {
1782 EXPECT_EQ(log_part.logger_monotonic_start_time,
1783 aos::monotonic_clock::min_time);
1784 EXPECT_EQ(log_part.logger_realtime_start_time,
1785 aos::realtime_clock::min_time);
1786 } else {
1787 const chrono::seconds offset = log_file.logger_node == "pi1"
1788 ? -chrono::seconds(1000)
1789 : chrono::seconds(1000);
1790 EXPECT_EQ(log_part.logger_monotonic_start_time,
1791 log_part.monotonic_start_time + offset);
1792 EXPECT_EQ(log_part.logger_realtime_start_time,
1793 log_file.realtime_start_time +
1794 (log_part.logger_monotonic_start_time -
1795 log_file.monotonic_start_time));
1796 }
1797 }
1798 }
1799}
1800
1801// Test that renaming the base, renames the folder.
1802TEST_P(MultinodeLoggerTest, LoggerRenameFolder) {
1803 util::UnlinkRecursive(tmp_dir_ + "/renamefolder");
1804 util::UnlinkRecursive(tmp_dir_ + "/new-good");
1805 time_converter_.AddMonotonic(
1806 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
1807 logfile_base1_ = tmp_dir_ + "/renamefolder/multi_logfile1";
1808 logfile_base2_ = tmp_dir_ + "/renamefolder/multi_logfile2";
1809 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
1810 LoggerState pi1_logger = MakeLogger(pi1_);
1811 LoggerState pi2_logger = MakeLogger(pi2_);
1812
1813 StartLogger(&pi1_logger);
1814 StartLogger(&pi2_logger);
1815
1816 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1817 logfile_base1_ = tmp_dir_ + "/new-good/multi_logfile1";
1818 logfile_base2_ = tmp_dir_ + "/new-good/multi_logfile2";
1819 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
Alexei Strotscaf17d32023-04-03 22:31:11 -07001820
1821 // Sequence of set_base_name and Rotate simulates rename operation. Since
1822 // rename is not supported by all namers, RenameLogBase moved from logger to
1823 // the higher level abstraction, yet log_namers support rename, and it is
1824 // legal to test it here.
1825 pi1_logger.log_namer->set_base_name(logfile_base1_);
1826 pi1_logger.logger->Rotate();
1827 pi2_logger.log_namer->set_base_name(logfile_base2_);
1828 pi2_logger.logger->Rotate();
1829
Naman Guptaa63aa132023-03-22 20:06:34 -07001830 for (auto &file : logfiles_) {
1831 struct stat s;
1832 EXPECT_EQ(0, stat(file.c_str(), &s));
1833 }
1834}
1835
1836// Test that renaming the file base dies.
1837TEST_P(MultinodeLoggerDeathTest, LoggerRenameFile) {
1838 time_converter_.AddMonotonic(
1839 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
1840 util::UnlinkRecursive(tmp_dir_ + "/renamefile");
1841 logfile_base1_ = tmp_dir_ + "/renamefile/multi_logfile1";
1842 logfile_base2_ = tmp_dir_ + "/renamefile/multi_logfile2";
1843 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
1844 LoggerState pi1_logger = MakeLogger(pi1_);
1845 StartLogger(&pi1_logger);
1846 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1847 logfile_base1_ = tmp_dir_ + "/new-renamefile/new_multi_logfile1";
Alexei Strotscaf17d32023-04-03 22:31:11 -07001848 EXPECT_DEATH({ pi1_logger.log_namer->set_base_name(logfile_base1_); },
Naman Guptaa63aa132023-03-22 20:06:34 -07001849 "Rename of file base from");
1850}
1851
1852// TODO(austin): We can write a test which recreates a logfile and confirms that
1853// we get it back. That is the ultimate test.
1854
1855// Tests that we properly recreate forwarded timestamps when replaying a log.
1856// This should be enough that we can then re-run the logger and get a valid log
1857// back.
1858TEST_P(MultinodeLoggerTest, RemoteReboot) {
1859 std::vector<std::string> actual_filenames;
1860
1861 const UUID pi1_boot0 = UUID::Random();
1862 const UUID pi2_boot0 = UUID::Random();
1863 const UUID pi2_boot1 = UUID::Random();
1864 {
1865 CHECK_EQ(pi1_index_, 0u);
1866 CHECK_EQ(pi2_index_, 1u);
1867
1868 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
1869 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
1870 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
1871
1872 time_converter_.AddNextTimestamp(
1873 distributed_clock::epoch(),
1874 {BootTimestamp::epoch(), BootTimestamp::epoch()});
1875 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
1876 time_converter_.AddNextTimestamp(
1877 distributed_clock::epoch() + reboot_time,
1878 {BootTimestamp::epoch() + reboot_time,
1879 BootTimestamp{
1880 .boot = 1,
1881 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
1882 }
1883
1884 {
1885 LoggerState pi1_logger = MakeLogger(pi1_);
1886
1887 event_loop_factory_.RunFor(chrono::milliseconds(95));
1888 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
1889 pi1_boot0);
1890 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
1891 pi2_boot0);
1892
1893 StartLogger(&pi1_logger);
1894
1895 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1896
1897 VLOG(1) << "Reboot now!";
1898
1899 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1900 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
1901 pi1_boot0);
1902 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
1903 pi2_boot1);
1904
1905 pi1_logger.AppendAllFilenames(&actual_filenames);
1906 }
1907
1908 std::sort(actual_filenames.begin(), actual_filenames.end());
1909 std::sort(pi1_reboot_logfiles_.begin(), pi1_reboot_logfiles_.end());
1910 ASSERT_THAT(actual_filenames,
1911 ::testing::UnorderedElementsAreArray(pi1_reboot_logfiles_));
1912
1913 // Confirm that our new oldest timestamps properly update as we reboot and
1914 // rotate.
1915 for (const std::string &file : pi1_reboot_logfiles_) {
1916 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
1917 ReadHeader(file);
1918 CHECK(log_header);
1919 if (log_header->message().has_configuration()) {
1920 continue;
1921 }
1922
1923 const monotonic_clock::time_point monotonic_start_time =
1924 monotonic_clock::time_point(
1925 chrono::nanoseconds(log_header->message().monotonic_start_time()));
1926 const UUID source_node_boot_uuid = UUID::FromString(
1927 log_header->message().source_node_boot_uuid()->string_view());
1928
1929 if (log_header->message().node()->name()->string_view() != "pi1") {
1930 // The remote message channel should rotate later and have more parts.
1931 // This only is true on the log files with shared remote messages.
1932 //
1933 // TODO(austin): I'm not the most thrilled with this test pattern... It
1934 // feels brittle in a different way.
1935 if (file.find("aos.message_bridge.RemoteMessage") == std::string::npos ||
1936 !shared()) {
1937 switch (log_header->message().parts_index()) {
1938 case 0:
1939 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1940 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
1941 break;
1942 case 1:
1943 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1944 ASSERT_EQ(monotonic_start_time,
1945 monotonic_clock::epoch() + chrono::seconds(1));
1946 break;
1947 case 2:
1948 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1949 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
1950 break;
1951 case 3:
1952 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1953 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
1954 chrono::nanoseconds(2322999462))
1955 << " on " << file;
1956 break;
1957 default:
1958 FAIL();
1959 break;
1960 }
1961 } else {
1962 switch (log_header->message().parts_index()) {
1963 case 0:
1964 case 1:
1965 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1966 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
1967 break;
1968 case 2:
1969 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
1970 ASSERT_EQ(monotonic_start_time,
1971 monotonic_clock::epoch() + chrono::seconds(1));
1972 break;
1973 case 3:
1974 case 4:
1975 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1976 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
1977 break;
1978 case 5:
1979 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
1980 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
1981 chrono::nanoseconds(2322999462))
1982 << " on " << file;
1983 break;
1984 default:
1985 FAIL();
1986 break;
1987 }
1988 }
1989 continue;
1990 }
1991 SCOPED_TRACE(file);
1992 SCOPED_TRACE(aos::FlatbufferToJson(
1993 *log_header, {.multi_line = true, .max_vector_size = 100}));
1994 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
1995 ASSERT_EQ(
1996 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
1997 EXPECT_EQ(
1998 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
1999 monotonic_clock::max_time.time_since_epoch().count());
2000 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2001 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2002 2u);
2003 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2004 monotonic_clock::max_time.time_since_epoch().count());
2005 ASSERT_TRUE(log_header->message()
2006 .has_oldest_remote_unreliable_monotonic_timestamps());
2007 ASSERT_EQ(log_header->message()
2008 .oldest_remote_unreliable_monotonic_timestamps()
2009 ->size(),
2010 2u);
2011 EXPECT_EQ(log_header->message()
2012 .oldest_remote_unreliable_monotonic_timestamps()
2013 ->Get(0),
2014 monotonic_clock::max_time.time_since_epoch().count());
2015 ASSERT_TRUE(log_header->message()
2016 .has_oldest_local_unreliable_monotonic_timestamps());
2017 ASSERT_EQ(log_header->message()
2018 .oldest_local_unreliable_monotonic_timestamps()
2019 ->size(),
2020 2u);
2021 EXPECT_EQ(log_header->message()
2022 .oldest_local_unreliable_monotonic_timestamps()
2023 ->Get(0),
2024 monotonic_clock::max_time.time_since_epoch().count());
2025
2026 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2027 monotonic_clock::time_point(chrono::nanoseconds(
2028 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2029 1)));
2030 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2031 monotonic_clock::time_point(chrono::nanoseconds(
2032 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2033 const monotonic_clock::time_point
2034 oldest_remote_unreliable_monotonic_timestamps =
2035 monotonic_clock::time_point(chrono::nanoseconds(
2036 log_header->message()
2037 .oldest_remote_unreliable_monotonic_timestamps()
2038 ->Get(1)));
2039 const monotonic_clock::time_point
2040 oldest_local_unreliable_monotonic_timestamps =
2041 monotonic_clock::time_point(chrono::nanoseconds(
2042 log_header->message()
2043 .oldest_local_unreliable_monotonic_timestamps()
2044 ->Get(1)));
2045 const monotonic_clock::time_point
2046 oldest_remote_reliable_monotonic_timestamps =
2047 monotonic_clock::time_point(chrono::nanoseconds(
2048 log_header->message()
2049 .oldest_remote_reliable_monotonic_timestamps()
2050 ->Get(1)));
2051 const monotonic_clock::time_point
2052 oldest_local_reliable_monotonic_timestamps =
2053 monotonic_clock::time_point(chrono::nanoseconds(
2054 log_header->message()
2055 .oldest_local_reliable_monotonic_timestamps()
2056 ->Get(1)));
2057 const monotonic_clock::time_point
2058 oldest_logger_remote_unreliable_monotonic_timestamps =
2059 monotonic_clock::time_point(chrono::nanoseconds(
2060 log_header->message()
2061 .oldest_logger_remote_unreliable_monotonic_timestamps()
2062 ->Get(0)));
2063 const monotonic_clock::time_point
2064 oldest_logger_local_unreliable_monotonic_timestamps =
2065 monotonic_clock::time_point(chrono::nanoseconds(
2066 log_header->message()
2067 .oldest_logger_local_unreliable_monotonic_timestamps()
2068 ->Get(0)));
2069 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2070 monotonic_clock::max_time);
2071 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2072 monotonic_clock::max_time);
2073 switch (log_header->message().parts_index()) {
2074 case 0:
2075 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2076 monotonic_clock::max_time);
2077 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2078 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2079 monotonic_clock::max_time);
2080 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2081 monotonic_clock::max_time);
2082 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2083 monotonic_clock::max_time);
2084 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2085 monotonic_clock::max_time);
2086 break;
2087 case 1:
2088 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2089 monotonic_clock::time_point(chrono::microseconds(90200)));
2090 EXPECT_EQ(oldest_local_monotonic_timestamps,
2091 monotonic_clock::time_point(chrono::microseconds(90350)));
2092 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2093 monotonic_clock::time_point(chrono::microseconds(90200)));
2094 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2095 monotonic_clock::time_point(chrono::microseconds(90350)));
2096 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2097 monotonic_clock::max_time);
2098 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2099 monotonic_clock::max_time);
2100 break;
2101 case 2:
2102 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2103 monotonic_clock::time_point(chrono::microseconds(90200)))
2104 << file;
2105 EXPECT_EQ(oldest_local_monotonic_timestamps,
2106 monotonic_clock::time_point(chrono::microseconds(90350)))
2107 << file;
2108 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2109 monotonic_clock::time_point(chrono::microseconds(90200)))
2110 << file;
2111 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2112 monotonic_clock::time_point(chrono::microseconds(90350)))
2113 << file;
2114 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2115 monotonic_clock::time_point(chrono::microseconds(100000)))
2116 << file;
2117 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2118 monotonic_clock::time_point(chrono::microseconds(100150)))
2119 << file;
2120 break;
2121 case 3:
2122 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2123 monotonic_clock::time_point(chrono::milliseconds(1323) +
2124 chrono::microseconds(200)));
2125 EXPECT_EQ(oldest_local_monotonic_timestamps,
2126 monotonic_clock::time_point(chrono::microseconds(10100350)));
2127 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2128 monotonic_clock::time_point(chrono::milliseconds(1323) +
2129 chrono::microseconds(200)));
2130 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2131 monotonic_clock::time_point(chrono::microseconds(10100350)));
2132 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2133 monotonic_clock::max_time)
2134 << file;
2135 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2136 monotonic_clock::max_time)
2137 << file;
2138 break;
2139 case 4:
2140 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2141 monotonic_clock::time_point(chrono::milliseconds(1323) +
2142 chrono::microseconds(200)));
2143 EXPECT_EQ(oldest_local_monotonic_timestamps,
2144 monotonic_clock::time_point(chrono::microseconds(10100350)));
2145 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2146 monotonic_clock::time_point(chrono::milliseconds(1323) +
2147 chrono::microseconds(200)));
2148 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2149 monotonic_clock::time_point(chrono::microseconds(10100350)));
2150 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2151 monotonic_clock::time_point(chrono::microseconds(1423000)))
2152 << file;
2153 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2154 monotonic_clock::time_point(chrono::microseconds(10200150)))
2155 << file;
2156 break;
2157 default:
2158 FAIL();
2159 break;
2160 }
2161 }
2162
2163 // Confirm that we refuse to replay logs with missing boot uuids.
2164 {
2165 LogReader reader(SortParts(pi1_reboot_logfiles_));
2166
2167 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2168 log_reader_factory.set_send_delay(chrono::microseconds(0));
2169
2170 // This sends out the fetched messages and advances time to the start of
2171 // the log file.
2172 reader.Register(&log_reader_factory);
2173
2174 log_reader_factory.Run();
2175
2176 reader.Deregister();
2177 }
2178}
2179
2180// Tests that we can sort a log which only has timestamps from the remote
2181// because the local message_bridge_client failed to connect.
2182TEST_P(MultinodeLoggerTest, RemoteRebootOnlyTimestamps) {
2183 const UUID pi1_boot0 = UUID::Random();
2184 const UUID pi2_boot0 = UUID::Random();
2185 const UUID pi2_boot1 = UUID::Random();
2186 {
2187 CHECK_EQ(pi1_index_, 0u);
2188 CHECK_EQ(pi2_index_, 1u);
2189
2190 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2191 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2192 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2193
2194 time_converter_.AddNextTimestamp(
2195 distributed_clock::epoch(),
2196 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2197 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2198 time_converter_.AddNextTimestamp(
2199 distributed_clock::epoch() + reboot_time,
2200 {BootTimestamp::epoch() + reboot_time,
2201 BootTimestamp{
2202 .boot = 1,
2203 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2204 }
2205 pi2_->Disconnect(pi1_->node());
2206
2207 std::vector<std::string> filenames;
2208 {
2209 LoggerState pi1_logger = MakeLogger(pi1_);
2210
2211 event_loop_factory_.RunFor(chrono::milliseconds(95));
2212 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2213 pi1_boot0);
2214 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2215 pi2_boot0);
2216
2217 StartLogger(&pi1_logger);
2218
2219 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2220
2221 VLOG(1) << "Reboot now!";
2222
2223 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2224 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2225 pi1_boot0);
2226 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2227 pi2_boot1);
2228 pi1_logger.AppendAllFilenames(&filenames);
2229 }
2230
2231 std::sort(filenames.begin(), filenames.end());
2232
2233 // Confirm that our new oldest timestamps properly update as we reboot and
2234 // rotate.
2235 size_t timestamp_file_count = 0;
2236 for (const std::string &file : filenames) {
2237 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2238 ReadHeader(file);
2239 CHECK(log_header);
2240
2241 if (log_header->message().has_configuration()) {
2242 continue;
2243 }
2244
2245 const monotonic_clock::time_point monotonic_start_time =
2246 monotonic_clock::time_point(
2247 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2248 const UUID source_node_boot_uuid = UUID::FromString(
2249 log_header->message().source_node_boot_uuid()->string_view());
2250
2251 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2252 ASSERT_EQ(
2253 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2254 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2255 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2256 2u);
2257 ASSERT_TRUE(log_header->message()
2258 .has_oldest_remote_unreliable_monotonic_timestamps());
2259 ASSERT_EQ(log_header->message()
2260 .oldest_remote_unreliable_monotonic_timestamps()
2261 ->size(),
2262 2u);
2263 ASSERT_TRUE(log_header->message()
2264 .has_oldest_local_unreliable_monotonic_timestamps());
2265 ASSERT_EQ(log_header->message()
2266 .oldest_local_unreliable_monotonic_timestamps()
2267 ->size(),
2268 2u);
2269 ASSERT_TRUE(log_header->message()
2270 .has_oldest_remote_reliable_monotonic_timestamps());
2271 ASSERT_EQ(log_header->message()
2272 .oldest_remote_reliable_monotonic_timestamps()
2273 ->size(),
2274 2u);
2275 ASSERT_TRUE(
2276 log_header->message().has_oldest_local_reliable_monotonic_timestamps());
2277 ASSERT_EQ(log_header->message()
2278 .oldest_local_reliable_monotonic_timestamps()
2279 ->size(),
2280 2u);
2281
2282 ASSERT_TRUE(
2283 log_header->message()
2284 .has_oldest_logger_remote_unreliable_monotonic_timestamps());
2285 ASSERT_EQ(log_header->message()
2286 .oldest_logger_remote_unreliable_monotonic_timestamps()
2287 ->size(),
2288 2u);
2289 ASSERT_TRUE(log_header->message()
2290 .has_oldest_logger_local_unreliable_monotonic_timestamps());
2291 ASSERT_EQ(log_header->message()
2292 .oldest_logger_local_unreliable_monotonic_timestamps()
2293 ->size(),
2294 2u);
2295
2296 if (log_header->message().node()->name()->string_view() != "pi1") {
2297 ASSERT_TRUE(file.find("aos.message_bridge.RemoteMessage") !=
2298 std::string::npos);
2299
2300 const std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
2301 ReadNthMessage(file, 0);
2302 CHECK(msg);
2303
2304 EXPECT_TRUE(msg->message().has_monotonic_sent_time());
2305 EXPECT_TRUE(msg->message().has_monotonic_remote_time());
2306
2307 const monotonic_clock::time_point
2308 expected_oldest_local_monotonic_timestamps(
2309 chrono::nanoseconds(msg->message().monotonic_sent_time()));
2310 const monotonic_clock::time_point
2311 expected_oldest_remote_monotonic_timestamps(
2312 chrono::nanoseconds(msg->message().monotonic_remote_time()));
2313 const monotonic_clock::time_point
2314 expected_oldest_timestamp_monotonic_timestamps(
2315 chrono::nanoseconds(msg->message().monotonic_timestamp_time()));
2316
2317 EXPECT_NE(expected_oldest_local_monotonic_timestamps,
2318 monotonic_clock::min_time);
2319 EXPECT_NE(expected_oldest_remote_monotonic_timestamps,
2320 monotonic_clock::min_time);
2321 EXPECT_NE(expected_oldest_timestamp_monotonic_timestamps,
2322 monotonic_clock::min_time);
2323
2324 ++timestamp_file_count;
2325 // Since the log file is from the perspective of the other node,
2326 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2327 monotonic_clock::time_point(chrono::nanoseconds(
2328 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2329 0)));
2330 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2331 monotonic_clock::time_point(chrono::nanoseconds(
2332 log_header->message().oldest_local_monotonic_timestamps()->Get(
2333 0)));
2334 const monotonic_clock::time_point
2335 oldest_remote_unreliable_monotonic_timestamps =
2336 monotonic_clock::time_point(chrono::nanoseconds(
2337 log_header->message()
2338 .oldest_remote_unreliable_monotonic_timestamps()
2339 ->Get(0)));
2340 const monotonic_clock::time_point
2341 oldest_local_unreliable_monotonic_timestamps =
2342 monotonic_clock::time_point(chrono::nanoseconds(
2343 log_header->message()
2344 .oldest_local_unreliable_monotonic_timestamps()
2345 ->Get(0)));
2346 const monotonic_clock::time_point
2347 oldest_remote_reliable_monotonic_timestamps =
2348 monotonic_clock::time_point(chrono::nanoseconds(
2349 log_header->message()
2350 .oldest_remote_reliable_monotonic_timestamps()
2351 ->Get(0)));
2352 const monotonic_clock::time_point
2353 oldest_local_reliable_monotonic_timestamps =
2354 monotonic_clock::time_point(chrono::nanoseconds(
2355 log_header->message()
2356 .oldest_local_reliable_monotonic_timestamps()
2357 ->Get(0)));
2358 const monotonic_clock::time_point
2359 oldest_logger_remote_unreliable_monotonic_timestamps =
2360 monotonic_clock::time_point(chrono::nanoseconds(
2361 log_header->message()
2362 .oldest_logger_remote_unreliable_monotonic_timestamps()
2363 ->Get(1)));
2364 const monotonic_clock::time_point
2365 oldest_logger_local_unreliable_monotonic_timestamps =
2366 monotonic_clock::time_point(chrono::nanoseconds(
2367 log_header->message()
2368 .oldest_logger_local_unreliable_monotonic_timestamps()
2369 ->Get(1)));
2370
2371 const Channel *channel =
2372 event_loop_factory_.configuration()->channels()->Get(
2373 msg->message().channel_index());
2374 const Connection *connection = configuration::ConnectionToNode(
2375 channel, configuration::GetNode(
2376 event_loop_factory_.configuration(),
2377 log_header->message().node()->name()->string_view()));
2378
2379 const bool reliable = connection->time_to_live() == 0;
2380
2381 SCOPED_TRACE(file);
2382 SCOPED_TRACE(aos::FlatbufferToJson(
2383 *log_header, {.multi_line = true, .max_vector_size = 100}));
2384
2385 if (shared()) {
2386 // Confirm that the oldest timestamps match what we expect. Based on
2387 // what we are doing, we know that the oldest time is the first
2388 // message's time.
2389 //
2390 // This makes the test robust to both the split and combined config
2391 // tests.
2392 switch (log_header->message().parts_index()) {
2393 case 0:
2394 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2395 expected_oldest_remote_monotonic_timestamps);
2396 EXPECT_EQ(oldest_local_monotonic_timestamps,
2397 expected_oldest_local_monotonic_timestamps);
2398 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2399 expected_oldest_local_monotonic_timestamps)
2400 << file;
2401 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2402 expected_oldest_timestamp_monotonic_timestamps)
2403 << file;
2404
2405 if (reliable) {
2406 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2407 expected_oldest_remote_monotonic_timestamps);
2408 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2409 expected_oldest_local_monotonic_timestamps);
2410 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2411 monotonic_clock::max_time);
2412 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2413 monotonic_clock::max_time);
2414 } else {
2415 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2416 monotonic_clock::max_time);
2417 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2418 monotonic_clock::max_time);
2419 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2420 expected_oldest_remote_monotonic_timestamps);
2421 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2422 expected_oldest_local_monotonic_timestamps);
2423 }
2424 break;
2425 case 1:
2426 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2427 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2428 EXPECT_EQ(oldest_local_monotonic_timestamps,
2429 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2430 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2431 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2432 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2433 monotonic_clock::epoch() + chrono::nanoseconds(90250000));
2434 if (reliable) {
2435 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2436 expected_oldest_remote_monotonic_timestamps);
2437 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2438 expected_oldest_local_monotonic_timestamps);
2439 EXPECT_EQ(
2440 oldest_remote_unreliable_monotonic_timestamps,
2441 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2442 EXPECT_EQ(
2443 oldest_local_unreliable_monotonic_timestamps,
2444 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2445 } else {
2446 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2447 monotonic_clock::max_time);
2448 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2449 monotonic_clock::max_time);
2450 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2451 expected_oldest_remote_monotonic_timestamps);
2452 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2453 expected_oldest_local_monotonic_timestamps);
2454 }
2455 break;
2456 case 2:
2457 EXPECT_EQ(
2458 oldest_remote_monotonic_timestamps,
2459 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2460 EXPECT_EQ(
2461 oldest_local_monotonic_timestamps,
2462 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2463 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2464 expected_oldest_local_monotonic_timestamps)
2465 << file;
2466 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2467 expected_oldest_timestamp_monotonic_timestamps)
2468 << file;
2469 if (reliable) {
2470 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2471 expected_oldest_remote_monotonic_timestamps);
2472 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2473 expected_oldest_local_monotonic_timestamps);
2474 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2475 monotonic_clock::max_time);
2476 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2477 monotonic_clock::max_time);
2478 } else {
2479 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2480 monotonic_clock::max_time);
2481 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2482 monotonic_clock::max_time);
2483 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2484 expected_oldest_remote_monotonic_timestamps);
2485 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2486 expected_oldest_local_monotonic_timestamps);
2487 }
2488 break;
2489
2490 case 3:
2491 EXPECT_EQ(
2492 oldest_remote_monotonic_timestamps,
2493 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2494 EXPECT_EQ(
2495 oldest_local_monotonic_timestamps,
2496 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2497 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2498 expected_oldest_remote_monotonic_timestamps);
2499 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2500 expected_oldest_local_monotonic_timestamps);
2501 EXPECT_EQ(
2502 oldest_logger_remote_unreliable_monotonic_timestamps,
2503 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2504 EXPECT_EQ(
2505 oldest_logger_local_unreliable_monotonic_timestamps,
2506 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2507 break;
2508 default:
2509 FAIL();
2510 break;
2511 }
2512
2513 switch (log_header->message().parts_index()) {
2514 case 0:
2515 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2516 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2517 break;
2518 case 1:
2519 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2520 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2521 break;
2522 case 2:
2523 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2524 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2525 break;
2526 case 3:
2527 if (shared()) {
2528 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2529 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2530 break;
2531 }
2532 [[fallthrough]];
2533 default:
2534 FAIL();
2535 break;
2536 }
2537 } else {
2538 switch (log_header->message().parts_index()) {
2539 case 0:
2540 if (reliable) {
2541 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2542 monotonic_clock::max_time);
2543 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2544 monotonic_clock::max_time);
2545 EXPECT_EQ(
2546 oldest_logger_remote_unreliable_monotonic_timestamps,
2547 monotonic_clock::epoch() + chrono::nanoseconds(100150000))
2548 << file;
2549 EXPECT_EQ(
2550 oldest_logger_local_unreliable_monotonic_timestamps,
2551 monotonic_clock::epoch() + chrono::nanoseconds(100250000))
2552 << file;
2553 } else {
2554 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2555 expected_oldest_remote_monotonic_timestamps);
2556 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2557 expected_oldest_local_monotonic_timestamps);
2558 EXPECT_EQ(
2559 oldest_logger_remote_unreliable_monotonic_timestamps,
2560 monotonic_clock::epoch() + chrono::nanoseconds(90150000))
2561 << file;
2562 EXPECT_EQ(
2563 oldest_logger_local_unreliable_monotonic_timestamps,
2564 monotonic_clock::epoch() + chrono::nanoseconds(90250000))
2565 << file;
2566 }
2567 break;
2568 case 1:
2569 if (reliable) {
2570 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2571 monotonic_clock::max_time);
2572 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2573 monotonic_clock::max_time);
2574 EXPECT_EQ(
2575 oldest_logger_remote_unreliable_monotonic_timestamps,
2576 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2577 EXPECT_EQ(
2578 oldest_logger_local_unreliable_monotonic_timestamps,
2579 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2580 } else {
2581 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2582 expected_oldest_remote_monotonic_timestamps);
2583 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2584 expected_oldest_local_monotonic_timestamps);
2585 EXPECT_EQ(
2586 oldest_logger_remote_unreliable_monotonic_timestamps,
2587 monotonic_clock::epoch() + chrono::nanoseconds(1323150000));
2588 EXPECT_EQ(
2589 oldest_logger_local_unreliable_monotonic_timestamps,
2590 monotonic_clock::epoch() + chrono::nanoseconds(10100250000));
2591 }
2592 break;
2593 default:
2594 FAIL();
2595 break;
2596 }
2597
2598 switch (log_header->message().parts_index()) {
2599 case 0:
2600 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2601 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2602 break;
2603 case 1:
2604 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2605 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2606 break;
2607 default:
2608 FAIL();
2609 break;
2610 }
2611 }
2612
2613 continue;
2614 }
2615 EXPECT_EQ(
2616 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2617 monotonic_clock::max_time.time_since_epoch().count());
2618 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2619 monotonic_clock::max_time.time_since_epoch().count());
2620 EXPECT_EQ(log_header->message()
2621 .oldest_remote_unreliable_monotonic_timestamps()
2622 ->Get(0),
2623 monotonic_clock::max_time.time_since_epoch().count());
2624 EXPECT_EQ(log_header->message()
2625 .oldest_local_unreliable_monotonic_timestamps()
2626 ->Get(0),
2627 monotonic_clock::max_time.time_since_epoch().count());
2628
2629 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2630 monotonic_clock::time_point(chrono::nanoseconds(
2631 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2632 1)));
2633 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2634 monotonic_clock::time_point(chrono::nanoseconds(
2635 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2636 const monotonic_clock::time_point
2637 oldest_remote_unreliable_monotonic_timestamps =
2638 monotonic_clock::time_point(chrono::nanoseconds(
2639 log_header->message()
2640 .oldest_remote_unreliable_monotonic_timestamps()
2641 ->Get(1)));
2642 const monotonic_clock::time_point
2643 oldest_local_unreliable_monotonic_timestamps =
2644 monotonic_clock::time_point(chrono::nanoseconds(
2645 log_header->message()
2646 .oldest_local_unreliable_monotonic_timestamps()
2647 ->Get(1)));
2648 switch (log_header->message().parts_index()) {
2649 case 0:
2650 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2651 monotonic_clock::max_time);
2652 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2653 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2654 monotonic_clock::max_time);
2655 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2656 monotonic_clock::max_time);
2657 break;
2658 default:
2659 FAIL();
2660 break;
2661 }
2662 }
2663
2664 if (shared()) {
2665 EXPECT_EQ(timestamp_file_count, 4u);
2666 } else {
2667 EXPECT_EQ(timestamp_file_count, 4u);
2668 }
2669
2670 // Confirm that we can actually sort the resulting log and read it.
2671 {
2672 LogReader reader(SortParts(filenames));
2673
2674 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2675 log_reader_factory.set_send_delay(chrono::microseconds(0));
2676
2677 // This sends out the fetched messages and advances time to the start of
2678 // the log file.
2679 reader.Register(&log_reader_factory);
2680
2681 log_reader_factory.Run();
2682
2683 reader.Deregister();
2684 }
2685}
2686
// Tests that we properly handle one direction of message_bridge being
// unavailable while the offset between the node clocks has a negative slope.
2689TEST_P(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
2690 pi1_->Disconnect(pi2_->node());
2691 time_converter_.AddMonotonic(
2692 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2693
2694 time_converter_.AddMonotonic(
2695 {chrono::milliseconds(10000),
2696 chrono::milliseconds(10000) - chrono::milliseconds(1)});
2697 {
2698 LoggerState pi1_logger = MakeLogger(pi1_);
2699
2700 event_loop_factory_.RunFor(chrono::milliseconds(95));
2701
2702 StartLogger(&pi1_logger);
2703
2704 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2705 }
2706
2707 // Confirm that we can parse the result. LogReader has enough internal CHECKs
2708 // to confirm the right thing happened.
2709 ConfirmReadable(pi1_single_direction_logfiles_);
2710}
2711
// Tests that we properly handle one direction of message_bridge being
// unavailable while the offset between the node clocks has a positive slope.
2714TEST_P(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
2715 pi1_->Disconnect(pi2_->node());
2716 time_converter_.AddMonotonic(
2717 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(500)});
2718
2719 time_converter_.AddMonotonic(
2720 {chrono::milliseconds(10000),
2721 chrono::milliseconds(10000) + chrono::milliseconds(1)});
2722 {
2723 LoggerState pi1_logger = MakeLogger(pi1_);
2724
2725 event_loop_factory_.RunFor(chrono::milliseconds(95));
2726
2727 StartLogger(&pi1_logger);
2728
2729 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2730 }
2731
2732 // Confirm that we can parse the result. LogReader has enough internal CHECKs
2733 // to confirm the right thing happened.
2734 ConfirmReadable(pi1_single_direction_logfiles_);
2735}
2736
// Tests that passing the same part file twice dies with a clear duplicate-parts
// error rather than a confusing out-of-order error.
2739TEST_P(MultinodeLoggerTest, DuplicateLogFiles) {
2740 time_converter_.AddMonotonic(
2741 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2742 {
2743 LoggerState pi1_logger = MakeLogger(pi1_);
2744
2745 event_loop_factory_.RunFor(chrono::milliseconds(95));
2746
2747 StartLogger(&pi1_logger);
2748
2749 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2750 }
2751
2752 std::vector<std::string> duplicates;
2753 for (const std::string &f : pi1_single_direction_logfiles_) {
2754 duplicates.emplace_back(f);
2755 duplicates.emplace_back(f);
2756 }
2757 EXPECT_DEATH({ SortParts(duplicates); }, "Found duplicate parts in");
2758}
2759
2760// Tests that we explode if someone loses a part out of the middle of a log.
2761TEST_P(MultinodeLoggerTest, MissingPartsFromMiddle) {
2762 time_converter_.AddMonotonic(
2763 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2764 {
2765 LoggerState pi1_logger = MakeLogger(pi1_);
2766
2767 event_loop_factory_.RunFor(chrono::milliseconds(95));
2768
2769 StartLogger(&pi1_logger);
2770 aos::monotonic_clock::time_point last_rotation_time =
2771 pi1_logger.event_loop->monotonic_now();
2772 pi1_logger.logger->set_on_logged_period([&] {
2773 const auto now = pi1_logger.event_loop->monotonic_now();
2774 if (now > last_rotation_time + std::chrono::seconds(5)) {
2775 pi1_logger.logger->Rotate();
2776 last_rotation_time = now;
2777 }
2778 });
2779
2780 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2781 }
2782
2783 std::vector<std::string> missing_parts;
2784
2785 missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part0" + Extension());
2786 missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part2" + Extension());
2787 missing_parts.emplace_back(absl::StrCat(
2788 logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
2789
2790 EXPECT_DEATH({ SortParts(missing_parts); },
2791 "Broken log, missing part files between");
2792}
2793
// Tests that we properly handle a dead node.  Do this by disconnecting it in
// both directions and only using one node's logs.
2796TEST_P(MultinodeLoggerTest, DeadNode) {
2797 pi1_->Disconnect(pi2_->node());
2798 pi2_->Disconnect(pi1_->node());
2799 time_converter_.AddMonotonic(
2800 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2801 {
2802 LoggerState pi1_logger = MakeLogger(pi1_);
2803
2804 event_loop_factory_.RunFor(chrono::milliseconds(95));
2805
2806 StartLogger(&pi1_logger);
2807
2808 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2809 }
2810
2811 // Confirm that we can parse the result. LogReader has enough internal CHECKs
2812 // to confirm the right thing happened.
2813 ConfirmReadable(MakePi1DeadNodeLogfiles());
2814}
2815
2816// Tests that we can relog with a different config. This makes most sense when
2817// you are trying to edit a log and want to use channel renaming + the original
2818// config in the new log.
TEST_P(MultinodeLoggerTest, LogDifferentConfig) {
  // Step 1: log pi1 and pi2 together for 20 s of simulated time.
  time_converter_.StartEqual();
  {
    LoggerState pi1_logger = MakeLogger(pi1_);
    LoggerState pi2_logger = MakeLogger(pi2_);

    event_loop_factory_.RunFor(chrono::milliseconds(95));

    StartLogger(&pi1_logger);
    StartLogger(&pi2_logger);

    event_loop_factory_.RunFor(chrono::milliseconds(20000));
  }

  // Step 2: replay the log with the Ping channel on "/test" remapped to
  // "/original".
  LogReader reader(SortParts(logfiles_));
  reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");

  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
  log_reader_factory.set_send_delay(chrono::microseconds(0));

  // This sends out the fetched messages and advances time to the start of the
  // log file.
  reader.Register(&log_reader_factory);

  const Node *pi1 =
      configuration::GetNode(log_reader_factory.configuration(), "pi1");
  const Node *pi2 =
      configuration::GetNode(log_reader_factory.configuration(), "pi2");

  // Diagnostic output only; nothing below depends on these logs.
  LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
  LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
  LOG(INFO) << "now pi1 "
            << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
  LOG(INFO) << "now pi2 "
            << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();

  // Both nodes must be reported as logged, in pi1, pi2 order, as Node
  // pointers from the logged (not the replay) configuration.
  EXPECT_THAT(reader.LoggedNodes(),
              ::testing::ElementsAre(
                  configuration::GetNode(reader.logged_configuration(), pi1),
                  configuration::GetNode(reader.logged_configuration(), pi2)));

  // NOTE(review): presumably the same factory passed to Register() above, so
  // this repeats the zero send delay already set -- confirm.
  reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));

  // And confirm we can re-create a log again, while checking the contents.
  // The relogging loggers are built with reader.logged_configuration(), i.e.
  // the config stored in the original log rather than the remapped one.
  std::vector<std::string> log_files;
  {
    LoggerState pi1_logger =
        MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
                   &log_reader_factory, reader.logged_configuration());
    LoggerState pi2_logger =
        MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
                   &log_reader_factory, reader.logged_configuration());

    pi1_logger.StartLogger(tmp_dir_ + "/relogged1");
    pi2_logger.StartLogger(tmp_dir_ + "/relogged2");

    // Replay the whole log, relogging as it goes.
    log_reader_factory.Run();

    // Collect the relogged file paths: each base prefix plus "_" plus the
    // name reported by that logger's namer.
    for (auto &x : pi1_logger.log_namer->all_filenames()) {
      log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged1_", x));
    }
    for (auto &x : pi2_logger.log_namer->all_filenames()) {
      log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged2_", x));
    }
  }

  reader.Deregister();

  // And verify that we can run the LogReader over the relogged files without
  // hitting any fatal errors.
  {
    LogReader relogged_reader(SortParts(log_files));
    relogged_reader.Register();

    relogged_reader.event_loop_factory()->Run();
  }
}
2896
2897// Tests that we properly replay a log where the start time for a node is before
2898// any data on the node. This can happen if the logger starts before data is
2899// published. While the scenario below is a bit convoluted, we have seen logs
2900// like this generated out in the wild.
TEST(MultinodeRebootLoggerTest, StartTimeBeforeData) {
  // Three-node (pi1, pi2, pi3) simulation in which pi2 reboots once.
  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(ArtifactPath(
          "aos/events/logging/multinode_pingpong_split3_config.json"));
  message_bridge::TestingTimeConverter time_converter(
      configuration::NodesCount(&config.message()));
  SimulatedEventLoopFactory event_loop_factory(&config.message());
  event_loop_factory.SetTimeConverter(&time_converter);
  NodeEventLoopFactory *const pi1 =
      event_loop_factory.GetNodeEventLoopFactory("pi1");
  const size_t pi1_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi1->node());
  NodeEventLoopFactory *const pi2 =
      event_loop_factory.GetNodeEventLoopFactory("pi2");
  const size_t pi2_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi2->node());
  NodeEventLoopFactory *const pi3 =
      event_loop_factory.GetNodeEventLoopFactory("pi3");
  const size_t pi3_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi3->node());

  // Output directories: one for pi1, one for pi3, and one per pi2 boot.
  // Anything left over from a previous run is removed first.
  const std::string kLogfile1_1 =
      aos::testing::TestTmpDir() + "/multi_logfile1/";
  const std::string kLogfile2_1 =
      aos::testing::TestTmpDir() + "/multi_logfile2.1/";
  const std::string kLogfile2_2 =
      aos::testing::TestTmpDir() + "/multi_logfile2.2/";
  const std::string kLogfile3_1 =
      aos::testing::TestTmpDir() + "/multi_logfile3/";
  util::UnlinkRecursive(kLogfile1_1);
  util::UnlinkRecursive(kLogfile2_1);
  util::UnlinkRecursive(kLogfile2_2);
  util::UnlinkRecursive(kLogfile3_1);
  const UUID pi1_boot0 = UUID::Random();
  const UUID pi2_boot0 = UUID::Random();
  const UUID pi2_boot1 = UUID::Random();
  const UUID pi3_boot0 = UUID::Random();
  {
    // The timestamp lists below are written in pi1, pi2, pi3 order, so pin
    // the node indices this test assumes.
    CHECK_EQ(pi1_index, 0u);
    CHECK_EQ(pi2_index, 1u);
    CHECK_EQ(pi3_index, 2u);

    time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
    time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
    time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
    time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);

    // All nodes start together at their epoch...
    time_converter.AddNextTimestamp(
        distributed_clock::epoch(),
        {BootTimestamp::epoch(), BootTimestamp::epoch(),
         BootTimestamp::epoch()});
    // ...and at 20 s of distributed time pi2 is on boot 1, whose monotonic
    // clock reads 1.323 s at that point.  pi1 and pi3 keep running.
    const chrono::nanoseconds reboot_time = chrono::milliseconds(20000);
    time_converter.AddNextTimestamp(
        distributed_clock::epoch() + reboot_time,
        {BootTimestamp::epoch() + reboot_time,
         BootTimestamp{
             .boot = 1,
             .time = monotonic_clock::epoch() + chrono::milliseconds(1323)},
         BootTimestamp::epoch() + reboot_time});
  }

  // Make everything perfectly quiet.
  event_loop_factory.SkipTimingReport();
  event_loop_factory.DisableStatistics();

  std::vector<std::string> filenames;
  {
    LoggerState pi1_logger = MakeLoggerState(
        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    LoggerState pi3_logger = MakeLoggerState(
        pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    {
      // Create pi2's logger for its first boot.  All three loggers are
      // started together after 1 s of simulated time.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);

      event_loop_factory.RunFor(chrono::milliseconds(1000));

      pi1_logger.StartLogger(kLogfile1_1);
      pi3_logger.StartLogger(kLogfile3_1);
      pi2_logger.StartLogger(kLogfile2_1);

      // Log for 10 s with statistics disabled and no ping/pong running, so
      // the recorded start time predates the data.
      event_loop_factory.RunFor(chrono::milliseconds(10000));

      // Now that we've got a start time in the past, turn on data.
      event_loop_factory.EnableStatistics();
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());

      pi2->AlwaysStart<Pong>("pong");

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      pi2_logger.AppendAllFilenames(&filenames);

      // Stop logging on pi2 before rebooting and completely shut off all
      // messages on pi2.
      pi2->DisableStatistics();
      pi1->Disconnect(pi2->node());
      pi2->Disconnect(pi1->node());
    }
    // pi2's first-boot logger is out of scope.  1 + 10 + 3 + 7 s carries
    // distributed time to 21 s, past the 20 s reboot point.
    event_loop_factory.RunFor(chrono::milliseconds(7000));
    // pi2 now reboots.
    {
      event_loop_factory.RunFor(chrono::milliseconds(1000));

      // Start logging again on pi2 after it is up.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
      pi2_logger.StartLogger(kLogfile2_2);

      event_loop_factory.RunFor(chrono::milliseconds(10000));
      // And, now that we have a start time in the log, turn data back on.
      pi2->EnableStatistics();
      pi1->Connect(pi2->node());
      pi2->Connect(pi1->node());

      pi2->AlwaysStart<Pong>("pong");
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      pi2_logger.AppendAllFilenames(&filenames);
    }

    pi1_logger.AppendAllFilenames(&filenames);
    pi3_logger.AppendAllFilenames(&filenames);
  }

  // Confirm that we can parse the result.  LogReader has enough internal CHECKs
  // to confirm the right thing happened.
  // NOTE(review): sorted_parts is never read afterwards.  The SortParts() call
  // still runs its own consistency checks on the collected file set.
  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  auto result = ConfirmReadable(filenames);
  // result[i] appears to be a (start realtimes, end realtimes) pair for node i,
  // with one entry per boot -- pi2 (index 1) has two.  Confirm against
  // ConfirmReadable before relying on this reading.
  EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
                                                      chrono::seconds(1)));
  EXPECT_THAT(result[0].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(34990350)));

  EXPECT_THAT(result[1].first,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::seconds(1),
                  realtime_clock::epoch() + chrono::microseconds(3323000)));
  EXPECT_THAT(result[1].second,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::microseconds(13990200),
                  realtime_clock::epoch() + chrono::microseconds(16313200)));

  EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch() +
                                                      chrono::seconds(1)));
  EXPECT_THAT(result[2].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(34900150)));
}
3058
3059// Tests that local data before remote data after reboot is properly replayed.
3060// We only trigger a reboot in the timestamp interpolation function when solving
3061// the timestamp problem when we actually have a point in the function. This
3062// originally only happened when a point passes the noncausal filter. At the
3063// start of time for the second boot, if we aren't careful, we will have
3064// messages which need to be published at times before the boot. This happens
3065// when a local message is in the log before a forwarded message, so there is no
3066// point in the interpolation function. This delays the reboot. So, we need to
3067// recreate that situation and make sure it doesn't come back.
TEST(MultinodeRebootLoggerTest,
     LocalMessageBeforeRemoteBeforeStartAfterReboot) {
  // Three-node (pi1, pi2, pi3) simulation in which pi2 reboots at 5 s.
  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(ArtifactPath(
          "aos/events/logging/multinode_pingpong_split3_config.json"));
  message_bridge::TestingTimeConverter time_converter(
      configuration::NodesCount(&config.message()));
  SimulatedEventLoopFactory event_loop_factory(&config.message());
  event_loop_factory.SetTimeConverter(&time_converter);
  NodeEventLoopFactory *const pi1 =
      event_loop_factory.GetNodeEventLoopFactory("pi1");
  const size_t pi1_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi1->node());
  NodeEventLoopFactory *const pi2 =
      event_loop_factory.GetNodeEventLoopFactory("pi2");
  const size_t pi2_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi2->node());
  NodeEventLoopFactory *const pi3 =
      event_loop_factory.GetNodeEventLoopFactory("pi3");
  const size_t pi3_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi3->node());

  // Output directories: one for pi1, one for pi3, and one per pi2 boot.
  // Anything left over from a previous run is removed first.
  const std::string kLogfile1_1 =
      aos::testing::TestTmpDir() + "/multi_logfile1/";
  const std::string kLogfile2_1 =
      aos::testing::TestTmpDir() + "/multi_logfile2.1/";
  const std::string kLogfile2_2 =
      aos::testing::TestTmpDir() + "/multi_logfile2.2/";
  const std::string kLogfile3_1 =
      aos::testing::TestTmpDir() + "/multi_logfile3/";
  util::UnlinkRecursive(kLogfile1_1);
  util::UnlinkRecursive(kLogfile2_1);
  util::UnlinkRecursive(kLogfile2_2);
  util::UnlinkRecursive(kLogfile3_1);
  const UUID pi1_boot0 = UUID::Random();
  const UUID pi2_boot0 = UUID::Random();
  const UUID pi2_boot1 = UUID::Random();
  const UUID pi3_boot0 = UUID::Random();
  {
    // The timestamp lists below are written in pi1, pi2, pi3 order, so pin
    // the node indices this test assumes.
    CHECK_EQ(pi1_index, 0u);
    CHECK_EQ(pi2_index, 1u);
    CHECK_EQ(pi3_index, 2u);

    time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
    time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
    time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
    time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);

    // All nodes start together at their epoch...
    time_converter.AddNextTimestamp(
        distributed_clock::epoch(),
        {BootTimestamp::epoch(), BootTimestamp::epoch(),
         BootTimestamp::epoch()});
    // ...and at 5 s of distributed time pi2 is on boot 1, whose monotonic
    // clock reads 105 s (reboot_time + 100 s) at that point.
    const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
    time_converter.AddNextTimestamp(
        distributed_clock::epoch() + reboot_time,
        {BootTimestamp::epoch() + reboot_time,
         BootTimestamp{.boot = 1,
                       .time = monotonic_clock::epoch() + reboot_time +
                               chrono::seconds(100)},
         BootTimestamp::epoch() + reboot_time});
  }

  std::vector<std::string> filenames;
  {
    LoggerState pi1_logger = MakeLoggerState(
        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    LoggerState pi3_logger = MakeLoggerState(
        pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
    {
      // And now start the logger.  All three loggers start at time 0.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);

      pi1_logger.StartLogger(kLogfile1_1);
      pi3_logger.StartLogger(kLogfile3_1);
      pi2_logger.StartLogger(kLogfile2_1);

      event_loop_factory.RunFor(chrono::milliseconds(1005));

      // Now that we've got a start time in the past, turn on data.
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());

      pi2->AlwaysStart<Pong>("pong");

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      pi2_logger.AppendAllFilenames(&filenames);

      // Disable any remote messages on pi2.
      pi1->Disconnect(pi2->node());
      pi2->Disconnect(pi1->node());
    }
    // 1005 + 3000 + 995 ms brings distributed time to exactly 5 s.
    event_loop_factory.RunFor(chrono::milliseconds(995));
    // pi2 now reboots at 5 seconds.
    {
      event_loop_factory.RunFor(chrono::milliseconds(1000));

      // Make local stuff happen before we start logging and connect the remote.
      pi2->AlwaysStart<Pong>("pong");
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());
      event_loop_factory.RunFor(chrono::milliseconds(1005));

      // Start logging again on pi2 after it is up.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
      pi2_logger.StartLogger(kLogfile2_2);

      // And allow remote messages now that we have some local ones.
      pi1->Connect(pi2->node());
      pi2->Connect(pi1->node());

      event_loop_factory.RunFor(chrono::milliseconds(1000));

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      pi2_logger.AppendAllFilenames(&filenames);
    }

    pi1_logger.AppendAllFilenames(&filenames);
    pi3_logger.AppendAllFilenames(&filenames);
  }

  // Confirm that we can parse the result.  LogReader has enough internal CHECKs
  // to confirm the right thing happened.
  // NOTE(review): sorted_parts is never read afterwards.  The SortParts() call
  // still runs its own consistency checks on the collected file set.
  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  auto result = ConfirmReadable(filenames);

  // result[i] appears to be a (start realtimes, end realtimes) pair for node i,
  // with one entry per boot -- pi2 (index 1) has two.  Confirm against
  // ConfirmReadable before relying on this reading.
  EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
  EXPECT_THAT(result[0].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(11000350)));

  EXPECT_THAT(result[1].first,
              ::testing::ElementsAre(
                  realtime_clock::epoch(),
                  realtime_clock::epoch() + chrono::microseconds(107005000)));
  EXPECT_THAT(result[1].second,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::microseconds(4000150),
                  realtime_clock::epoch() + chrono::microseconds(111000200)));

  EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
  EXPECT_THAT(result[2].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(11000150)));

  // Replay again, restricted to the realtime window [2 s, 3 s].  Every node is
  // expected to report exactly that window as its start and end.
  auto start_stop_result = ConfirmReadable(
      filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
      realtime_clock::epoch() + chrono::milliseconds(3000));

  EXPECT_THAT(
      start_stop_result[0].first,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
  EXPECT_THAT(
      start_stop_result[0].second,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
  EXPECT_THAT(
      start_stop_result[1].first,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
  EXPECT_THAT(
      start_stop_result[1].second,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
  EXPECT_THAT(
      start_stop_result[2].first,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
  EXPECT_THAT(
      start_stop_result[2].second,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
}
3241
3242// Tests that setting the start and stop flags across a reboot works as
3243// expected.
3244TEST(MultinodeRebootLoggerTest, RebootStartStopTimes) {
3245 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3246 aos::configuration::ReadConfig(ArtifactPath(
3247 "aos/events/logging/multinode_pingpong_split3_config.json"));
3248 message_bridge::TestingTimeConverter time_converter(
3249 configuration::NodesCount(&config.message()));
3250 SimulatedEventLoopFactory event_loop_factory(&config.message());
3251 event_loop_factory.SetTimeConverter(&time_converter);
3252 NodeEventLoopFactory *const pi1 =
3253 event_loop_factory.GetNodeEventLoopFactory("pi1");
3254 const size_t pi1_index = configuration::GetNodeIndex(
3255 event_loop_factory.configuration(), pi1->node());
3256 NodeEventLoopFactory *const pi2 =
3257 event_loop_factory.GetNodeEventLoopFactory("pi2");
3258 const size_t pi2_index = configuration::GetNodeIndex(
3259 event_loop_factory.configuration(), pi2->node());
3260 NodeEventLoopFactory *const pi3 =
3261 event_loop_factory.GetNodeEventLoopFactory("pi3");
3262 const size_t pi3_index = configuration::GetNodeIndex(
3263 event_loop_factory.configuration(), pi3->node());
3264
3265 const std::string kLogfile1_1 =
3266 aos::testing::TestTmpDir() + "/multi_logfile1/";
3267 const std::string kLogfile2_1 =
3268 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3269 const std::string kLogfile2_2 =
3270 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
3271 const std::string kLogfile3_1 =
3272 aos::testing::TestTmpDir() + "/multi_logfile3/";
3273 util::UnlinkRecursive(kLogfile1_1);
3274 util::UnlinkRecursive(kLogfile2_1);
3275 util::UnlinkRecursive(kLogfile2_2);
3276 util::UnlinkRecursive(kLogfile3_1);
3277 {
3278 CHECK_EQ(pi1_index, 0u);
3279 CHECK_EQ(pi2_index, 1u);
3280 CHECK_EQ(pi3_index, 2u);
3281
3282 time_converter.AddNextTimestamp(
3283 distributed_clock::epoch(),
3284 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3285 BootTimestamp::epoch()});
3286 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3287 time_converter.AddNextTimestamp(
3288 distributed_clock::epoch() + reboot_time,
3289 {BootTimestamp::epoch() + reboot_time,
3290 BootTimestamp{.boot = 1,
3291 .time = monotonic_clock::epoch() + reboot_time},
3292 BootTimestamp::epoch() + reboot_time});
3293 }
3294
3295 std::vector<std::string> filenames;
3296 {
3297 LoggerState pi1_logger = MakeLoggerState(
3298 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3299 LoggerState pi3_logger = MakeLoggerState(
3300 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3301 {
3302 // And now start the logger.
3303 LoggerState pi2_logger = MakeLoggerState(
3304 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3305
3306 pi1_logger.StartLogger(kLogfile1_1);
3307 pi3_logger.StartLogger(kLogfile3_1);
3308 pi2_logger.StartLogger(kLogfile2_1);
3309
3310 event_loop_factory.RunFor(chrono::milliseconds(1005));
3311
3312 // Now that we've got a start time in the past, turn on data.
3313 std::unique_ptr<aos::EventLoop> ping_event_loop =
3314 pi1->MakeEventLoop("ping");
3315 Ping ping(ping_event_loop.get());
3316
3317 pi2->AlwaysStart<Pong>("pong");
3318
3319 event_loop_factory.RunFor(chrono::milliseconds(3000));
3320
3321 pi2_logger.AppendAllFilenames(&filenames);
3322 }
3323 event_loop_factory.RunFor(chrono::milliseconds(995));
3324 // pi2 now reboots at 5 seconds.
3325 {
3326 event_loop_factory.RunFor(chrono::milliseconds(1000));
3327
3328 // Make local stuff happen before we start logging and connect the remote.
3329 pi2->AlwaysStart<Pong>("pong");
3330 std::unique_ptr<aos::EventLoop> ping_event_loop =
3331 pi1->MakeEventLoop("ping");
3332 Ping ping(ping_event_loop.get());
3333 event_loop_factory.RunFor(chrono::milliseconds(5));
3334
3335 // Start logging again on pi2 after it is up.
3336 LoggerState pi2_logger = MakeLoggerState(
3337 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3338 pi2_logger.StartLogger(kLogfile2_2);
3339
3340 event_loop_factory.RunFor(chrono::milliseconds(5000));
3341
3342 pi2_logger.AppendAllFilenames(&filenames);
3343 }
3344
3345 pi1_logger.AppendAllFilenames(&filenames);
3346 pi3_logger.AppendAllFilenames(&filenames);
3347 }
3348
3349 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3350 auto result = ConfirmReadable(filenames);
3351
3352 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3353 EXPECT_THAT(result[0].second,
3354 ::testing::ElementsAre(realtime_clock::epoch() +
3355 chrono::microseconds(11000350)));
3356
3357 EXPECT_THAT(result[1].first,
3358 ::testing::ElementsAre(
3359 realtime_clock::epoch(),
3360 realtime_clock::epoch() + chrono::microseconds(6005000)));
3361 EXPECT_THAT(result[1].second,
3362 ::testing::ElementsAre(
3363 realtime_clock::epoch() + chrono::microseconds(4900150),
3364 realtime_clock::epoch() + chrono::microseconds(11000200)));
3365
3366 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3367 EXPECT_THAT(result[2].second,
3368 ::testing::ElementsAre(realtime_clock::epoch() +
3369 chrono::microseconds(11000150)));
3370
3371 // Confirm we observed the correct start and stop times. We should see the
3372 // reboot here.
3373 auto start_stop_result = ConfirmReadable(
3374 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3375 realtime_clock::epoch() + chrono::milliseconds(8000));
3376
3377 EXPECT_THAT(
3378 start_stop_result[0].first,
3379 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3380 EXPECT_THAT(
3381 start_stop_result[0].second,
3382 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3383 EXPECT_THAT(start_stop_result[1].first,
3384 ::testing::ElementsAre(
3385 realtime_clock::epoch() + chrono::seconds(2),
3386 realtime_clock::epoch() + chrono::microseconds(6005000)));
3387 EXPECT_THAT(start_stop_result[1].second,
3388 ::testing::ElementsAre(
3389 realtime_clock::epoch() + chrono::microseconds(4900150),
3390 realtime_clock::epoch() + chrono::seconds(8)));
3391 EXPECT_THAT(
3392 start_stop_result[2].first,
3393 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3394 EXPECT_THAT(
3395 start_stop_result[2].second,
3396 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3397}
3398
3399// Tests that we properly handle one direction being down.
3400TEST(MissingDirectionTest, OneDirection) {
3401 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3402 aos::configuration::ReadConfig(ArtifactPath(
3403 "aos/events/logging/multinode_pingpong_split4_config.json"));
3404 message_bridge::TestingTimeConverter time_converter(
3405 configuration::NodesCount(&config.message()));
3406 SimulatedEventLoopFactory event_loop_factory(&config.message());
3407 event_loop_factory.SetTimeConverter(&time_converter);
3408
3409 NodeEventLoopFactory *const pi1 =
3410 event_loop_factory.GetNodeEventLoopFactory("pi1");
3411 const size_t pi1_index = configuration::GetNodeIndex(
3412 event_loop_factory.configuration(), pi1->node());
3413 NodeEventLoopFactory *const pi2 =
3414 event_loop_factory.GetNodeEventLoopFactory("pi2");
3415 const size_t pi2_index = configuration::GetNodeIndex(
3416 event_loop_factory.configuration(), pi2->node());
3417 std::vector<std::string> filenames;
3418
3419 {
3420 CHECK_EQ(pi1_index, 0u);
3421 CHECK_EQ(pi2_index, 1u);
3422
3423 time_converter.AddNextTimestamp(
3424 distributed_clock::epoch(),
3425 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3426
3427 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3428 time_converter.AddNextTimestamp(
3429 distributed_clock::epoch() + reboot_time,
3430 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3431 BootTimestamp::epoch() + reboot_time});
3432 }
3433
3434 const std::string kLogfile2_1 =
3435 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3436 const std::string kLogfile1_1 =
3437 aos::testing::TestTmpDir() + "/multi_logfile1.1/";
3438 util::UnlinkRecursive(kLogfile2_1);
3439 util::UnlinkRecursive(kLogfile1_1);
3440
3441 pi2->Disconnect(pi1->node());
3442
3443 pi1->AlwaysStart<Ping>("ping");
3444 pi2->AlwaysStart<Pong>("pong");
3445
3446 {
3447 LoggerState pi2_logger = MakeLoggerState(
3448 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3449
3450 event_loop_factory.RunFor(chrono::milliseconds(95));
3451
3452 pi2_logger.StartLogger(kLogfile2_1);
3453
3454 event_loop_factory.RunFor(chrono::milliseconds(6000));
3455
3456 pi2->Connect(pi1->node());
3457
3458 LoggerState pi1_logger = MakeLoggerState(
3459 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3460 pi1_logger.StartLogger(kLogfile1_1);
3461
3462 event_loop_factory.RunFor(chrono::milliseconds(5000));
3463 pi1_logger.AppendAllFilenames(&filenames);
3464 pi2_logger.AppendAllFilenames(&filenames);
3465 }
3466
3467 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3468 ConfirmReadable(filenames);
3469}
3470
3471// Tests that we properly handle only one direction ever existing after a
3472// reboot.
3473TEST(MissingDirectionTest, OneDirectionAfterReboot) {
3474 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3475 aos::configuration::ReadConfig(ArtifactPath(
3476 "aos/events/logging/multinode_pingpong_split4_config.json"));
3477 message_bridge::TestingTimeConverter time_converter(
3478 configuration::NodesCount(&config.message()));
3479 SimulatedEventLoopFactory event_loop_factory(&config.message());
3480 event_loop_factory.SetTimeConverter(&time_converter);
3481
3482 NodeEventLoopFactory *const pi1 =
3483 event_loop_factory.GetNodeEventLoopFactory("pi1");
3484 const size_t pi1_index = configuration::GetNodeIndex(
3485 event_loop_factory.configuration(), pi1->node());
3486 NodeEventLoopFactory *const pi2 =
3487 event_loop_factory.GetNodeEventLoopFactory("pi2");
3488 const size_t pi2_index = configuration::GetNodeIndex(
3489 event_loop_factory.configuration(), pi2->node());
3490 std::vector<std::string> filenames;
3491
3492 {
3493 CHECK_EQ(pi1_index, 0u);
3494 CHECK_EQ(pi2_index, 1u);
3495
3496 time_converter.AddNextTimestamp(
3497 distributed_clock::epoch(),
3498 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3499
3500 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3501 time_converter.AddNextTimestamp(
3502 distributed_clock::epoch() + reboot_time,
3503 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3504 BootTimestamp::epoch() + reboot_time});
3505 }
3506
3507 const std::string kLogfile2_1 =
3508 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3509 util::UnlinkRecursive(kLogfile2_1);
3510
3511 pi1->AlwaysStart<Ping>("ping");
3512
3513 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3514 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3515 // second boot.
3516 {
3517 LoggerState pi2_logger = MakeLoggerState(
3518 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3519
3520 event_loop_factory.RunFor(chrono::milliseconds(95));
3521
3522 pi2_logger.StartLogger(kLogfile2_1);
3523
3524 event_loop_factory.RunFor(chrono::milliseconds(4000));
3525
3526 pi2->Disconnect(pi1->node());
3527
3528 event_loop_factory.RunFor(chrono::milliseconds(1000));
3529 pi1->AlwaysStart<Ping>("ping");
3530
3531 event_loop_factory.RunFor(chrono::milliseconds(5000));
3532 pi2_logger.AppendAllFilenames(&filenames);
3533 }
3534
3535 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3536 ConfirmReadable(filenames);
3537}
3538
3539// Tests that we properly handle only one direction ever existing after a reboot
3540// with only reliable data.
3541TEST(MissingDirectionTest, OneDirectionAfterRebootReliable) {
3542 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3543 aos::configuration::ReadConfig(ArtifactPath(
3544 "aos/events/logging/multinode_pingpong_split4_reliable_config.json"));
3545 message_bridge::TestingTimeConverter time_converter(
3546 configuration::NodesCount(&config.message()));
3547 SimulatedEventLoopFactory event_loop_factory(&config.message());
3548 event_loop_factory.SetTimeConverter(&time_converter);
3549
3550 NodeEventLoopFactory *const pi1 =
3551 event_loop_factory.GetNodeEventLoopFactory("pi1");
3552 const size_t pi1_index = configuration::GetNodeIndex(
3553 event_loop_factory.configuration(), pi1->node());
3554 NodeEventLoopFactory *const pi2 =
3555 event_loop_factory.GetNodeEventLoopFactory("pi2");
3556 const size_t pi2_index = configuration::GetNodeIndex(
3557 event_loop_factory.configuration(), pi2->node());
3558 std::vector<std::string> filenames;
3559
3560 {
3561 CHECK_EQ(pi1_index, 0u);
3562 CHECK_EQ(pi2_index, 1u);
3563
3564 time_converter.AddNextTimestamp(
3565 distributed_clock::epoch(),
3566 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3567
3568 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3569 time_converter.AddNextTimestamp(
3570 distributed_clock::epoch() + reboot_time,
3571 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3572 BootTimestamp::epoch() + reboot_time});
3573 }
3574
3575 const std::string kLogfile2_1 =
3576 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3577 util::UnlinkRecursive(kLogfile2_1);
3578
3579 pi1->AlwaysStart<Ping>("ping");
3580
3581 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3582 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3583 // second boot.
3584 {
3585 LoggerState pi2_logger = MakeLoggerState(
3586 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3587
3588 event_loop_factory.RunFor(chrono::milliseconds(95));
3589
3590 pi2_logger.StartLogger(kLogfile2_1);
3591
3592 event_loop_factory.RunFor(chrono::milliseconds(4000));
3593
3594 pi2->Disconnect(pi1->node());
3595
3596 event_loop_factory.RunFor(chrono::milliseconds(1000));
3597 pi1->AlwaysStart<Ping>("ping");
3598
3599 event_loop_factory.RunFor(chrono::milliseconds(5000));
3600 pi2_logger.AppendAllFilenames(&filenames);
3601 }
3602
3603 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3604 ConfirmReadable(filenames);
3605}
3606
Brian Smartte67d7112023-03-20 12:06:30 -07003607// Tests that we properly handle only one direction ever existing after a reboot
3608// with mixed unreliable vs reliable, where reliable has an earlier timestamp
3609// than unreliable.
3610TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase1) {
3611 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3612 aos::configuration::ReadConfig(ArtifactPath(
3613 "aos/events/logging/multinode_pingpong_split4_mixed1_config.json"));
3614 message_bridge::TestingTimeConverter time_converter(
3615 configuration::NodesCount(&config.message()));
3616 SimulatedEventLoopFactory event_loop_factory(&config.message());
3617 event_loop_factory.SetTimeConverter(&time_converter);
3618
3619 NodeEventLoopFactory *const pi1 =
3620 event_loop_factory.GetNodeEventLoopFactory("pi1");
3621 const size_t pi1_index = configuration::GetNodeIndex(
3622 event_loop_factory.configuration(), pi1->node());
3623 NodeEventLoopFactory *const pi2 =
3624 event_loop_factory.GetNodeEventLoopFactory("pi2");
3625 const size_t pi2_index = configuration::GetNodeIndex(
3626 event_loop_factory.configuration(), pi2->node());
3627 std::vector<std::string> filenames;
3628
3629 {
3630 CHECK_EQ(pi1_index, 0u);
3631 CHECK_EQ(pi2_index, 1u);
3632
3633 time_converter.AddNextTimestamp(
3634 distributed_clock::epoch(),
3635 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3636
3637 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3638 time_converter.AddNextTimestamp(
3639 distributed_clock::epoch() + reboot_time,
3640 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3641 BootTimestamp::epoch() + reboot_time});
3642 }
3643
3644 const std::string kLogfile2_1 =
3645 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3646 util::UnlinkRecursive(kLogfile2_1);
3647
3648 // The following sequence using the above reference config creates
3649 // a reliable message timestamp < unreliable message timestamp.
3650 {
3651 pi1->DisableStatistics();
3652 pi2->DisableStatistics();
3653
3654 event_loop_factory.RunFor(chrono::milliseconds(95));
3655
3656 pi1->AlwaysStart<Ping>("ping");
3657
3658 event_loop_factory.RunFor(chrono::milliseconds(5250));
3659
3660 pi1->EnableStatistics();
3661
3662 event_loop_factory.RunFor(chrono::milliseconds(1000));
3663
3664 LoggerState pi2_logger = MakeLoggerState(
3665 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3666
3667 pi2_logger.StartLogger(kLogfile2_1);
3668
3669 event_loop_factory.RunFor(chrono::milliseconds(5000));
3670 pi2_logger.AppendAllFilenames(&filenames);
3671 }
3672
3673 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3674 ConfirmReadable(filenames);
3675}
3676
3677// Tests that we properly handle only one direction ever existing after a reboot
3678// with mixed unreliable vs reliable, where unreliable has an earlier timestamp
3679// than reliable.
3680TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase2) {
3681 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3682 aos::configuration::ReadConfig(ArtifactPath(
3683 "aos/events/logging/multinode_pingpong_split4_mixed2_config.json"));
3684 message_bridge::TestingTimeConverter time_converter(
3685 configuration::NodesCount(&config.message()));
3686 SimulatedEventLoopFactory event_loop_factory(&config.message());
3687 event_loop_factory.SetTimeConverter(&time_converter);
3688
3689 NodeEventLoopFactory *const pi1 =
3690 event_loop_factory.GetNodeEventLoopFactory("pi1");
3691 const size_t pi1_index = configuration::GetNodeIndex(
3692 event_loop_factory.configuration(), pi1->node());
3693 NodeEventLoopFactory *const pi2 =
3694 event_loop_factory.GetNodeEventLoopFactory("pi2");
3695 const size_t pi2_index = configuration::GetNodeIndex(
3696 event_loop_factory.configuration(), pi2->node());
3697 std::vector<std::string> filenames;
3698
3699 {
3700 CHECK_EQ(pi1_index, 0u);
3701 CHECK_EQ(pi2_index, 1u);
3702
3703 time_converter.AddNextTimestamp(
3704 distributed_clock::epoch(),
3705 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3706
3707 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3708 time_converter.AddNextTimestamp(
3709 distributed_clock::epoch() + reboot_time,
3710 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3711 BootTimestamp::epoch() + reboot_time});
3712 }
3713
3714 const std::string kLogfile2_1 =
3715 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3716 util::UnlinkRecursive(kLogfile2_1);
3717
3718 // The following sequence using the above reference config creates
3719 // an unreliable message timestamp < reliable message timestamp.
3720 {
3721 pi1->DisableStatistics();
3722 pi2->DisableStatistics();
3723
3724 event_loop_factory.RunFor(chrono::milliseconds(95));
3725
3726 pi1->AlwaysStart<Ping>("ping");
3727
3728 event_loop_factory.RunFor(chrono::milliseconds(5250));
3729
3730 pi1->EnableStatistics();
3731
3732 event_loop_factory.RunFor(chrono::milliseconds(1000));
3733
3734 LoggerState pi2_logger = MakeLoggerState(
3735 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3736
3737 pi2_logger.StartLogger(kLogfile2_1);
3738
3739 event_loop_factory.RunFor(chrono::milliseconds(5000));
3740 pi2_logger.AppendAllFilenames(&filenames);
3741 }
3742
3743 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3744 ConfirmReadable(filenames);
3745}
3746
Naman Guptaa63aa132023-03-22 20:06:34 -07003747// Tests that we properly handle what used to be a time violation in one
3748// direction. This can occur when one direction goes down after sending some
3749// data, but the other keeps working. The down direction ends up resolving to a
3750// straight line in the noncausal filter, where the direction which is still up
3751// can cross that line. Really, time progressed along just fine but we assumed
3752// that the offset was a line when it could have deviated by up to 1ms/second.
3753TEST_P(MultinodeLoggerTest, OneDirectionTimeDrift) {
3754 std::vector<std::string> filenames;
3755
3756 CHECK_EQ(pi1_index_, 0u);
3757 CHECK_EQ(pi2_index_, 1u);
3758
3759 time_converter_.AddNextTimestamp(
3760 distributed_clock::epoch(),
3761 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3762
3763 const chrono::nanoseconds before_disconnect_duration =
3764 time_converter_.AddMonotonic(
3765 {chrono::milliseconds(1000), chrono::milliseconds(1000)});
3766
3767 const chrono::nanoseconds test_duration =
3768 time_converter_.AddMonotonic(
3769 {chrono::milliseconds(1000), chrono::milliseconds(1000)}) +
3770 time_converter_.AddMonotonic(
3771 {chrono::milliseconds(10000),
3772 chrono::milliseconds(10000) - chrono::milliseconds(5)}) +
3773 time_converter_.AddMonotonic(
3774 {chrono::milliseconds(10000),
3775 chrono::milliseconds(10000) + chrono::milliseconds(5)});
3776
3777 const std::string kLogfile =
3778 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3779 util::UnlinkRecursive(kLogfile);
3780
3781 {
3782 LoggerState pi2_logger = MakeLogger(pi2_);
3783 pi2_logger.StartLogger(kLogfile);
3784 event_loop_factory_.RunFor(before_disconnect_duration);
3785
3786 pi2_->Disconnect(pi1_->node());
3787
3788 event_loop_factory_.RunFor(test_duration);
3789 pi2_->Connect(pi1_->node());
3790
3791 event_loop_factory_.RunFor(chrono::milliseconds(5000));
3792 pi2_logger.AppendAllFilenames(&filenames);
3793 }
3794
3795 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3796 ConfirmReadable(filenames);
3797}
3798
3799// Tests that we can replay a logfile that has timestamps such that at least one
3800// node's epoch is at a positive distributed_clock (and thus will have to be
3801// booted after the other node(s)).
3802TEST_P(MultinodeLoggerTest, StartOneNodeBeforeOther) {
3803 std::vector<std::string> filenames;
3804
3805 CHECK_EQ(pi1_index_, 0u);
3806 CHECK_EQ(pi2_index_, 1u);
3807
3808 time_converter_.AddNextTimestamp(
3809 distributed_clock::epoch(),
3810 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3811
3812 const chrono::nanoseconds before_reboot_duration = chrono::milliseconds(1000);
3813 time_converter_.RebootAt(
3814 0, distributed_clock::time_point(before_reboot_duration));
3815
3816 const chrono::nanoseconds test_duration = time_converter_.AddMonotonic(
3817 {chrono::milliseconds(10000), chrono::milliseconds(10000)});
3818
3819 const std::string kLogfile =
3820 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3821 util::UnlinkRecursive(kLogfile);
3822
3823 pi2_->Disconnect(pi1_->node());
3824 pi1_->Disconnect(pi2_->node());
3825
3826 {
3827 LoggerState pi2_logger = MakeLogger(pi2_);
3828
3829 pi2_logger.StartLogger(kLogfile);
3830 event_loop_factory_.RunFor(before_reboot_duration);
3831
3832 pi2_->Connect(pi1_->node());
3833 pi1_->Connect(pi2_->node());
3834
3835 event_loop_factory_.RunFor(test_duration);
3836
3837 pi2_logger.AppendAllFilenames(&filenames);
3838 }
3839
3840 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3841 ConfirmReadable(filenames);
3842
3843 {
3844 LogReader reader(sorted_parts);
3845 SimulatedEventLoopFactory replay_factory(reader.configuration());
3846 reader.RegisterWithoutStarting(&replay_factory);
3847
3848 NodeEventLoopFactory *const replay_node =
3849 reader.event_loop_factory()->GetNodeEventLoopFactory("pi1");
3850
3851 std::unique_ptr<EventLoop> test_event_loop =
3852 replay_node->MakeEventLoop("test_reader");
3853 replay_node->OnStartup([replay_node]() {
3854 // Check that we didn't boot until at least t=0.
3855 CHECK_LE(monotonic_clock::epoch(), replay_node->monotonic_now());
3856 });
3857 test_event_loop->OnRun([&test_event_loop]() {
3858 // Check that we didn't boot until at least t=0.
3859 EXPECT_LE(monotonic_clock::epoch(), test_event_loop->monotonic_now());
3860 });
3861 reader.event_loop_factory()->Run();
3862 reader.Deregister();
3863 }
3864}
3865
3866// Tests that when we have a loop without all the logs at all points in time, we
3867// can sort it properly.
3868TEST(MultinodeLoggerLoopTest, Loop) {
3869 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3870 aos::configuration::ReadConfig(ArtifactPath(
3871 "aos/events/logging/multinode_pingpong_triangle_split_config.json"));
3872 message_bridge::TestingTimeConverter time_converter(
3873 configuration::NodesCount(&config.message()));
3874 SimulatedEventLoopFactory event_loop_factory(&config.message());
3875 event_loop_factory.SetTimeConverter(&time_converter);
3876
3877 NodeEventLoopFactory *const pi1 =
3878 event_loop_factory.GetNodeEventLoopFactory("pi1");
3879 NodeEventLoopFactory *const pi2 =
3880 event_loop_factory.GetNodeEventLoopFactory("pi2");
3881 NodeEventLoopFactory *const pi3 =
3882 event_loop_factory.GetNodeEventLoopFactory("pi3");
3883
3884 const std::string kLogfile1_1 =
3885 aos::testing::TestTmpDir() + "/multi_logfile1/";
3886 const std::string kLogfile2_1 =
3887 aos::testing::TestTmpDir() + "/multi_logfile2/";
3888 const std::string kLogfile3_1 =
3889 aos::testing::TestTmpDir() + "/multi_logfile3/";
3890 util::UnlinkRecursive(kLogfile1_1);
3891 util::UnlinkRecursive(kLogfile2_1);
3892 util::UnlinkRecursive(kLogfile3_1);
3893
3894 {
3895 // Make pi1 boot before everything else.
3896 time_converter.AddNextTimestamp(
3897 distributed_clock::epoch(),
3898 {BootTimestamp::epoch(),
3899 BootTimestamp::epoch() - chrono::milliseconds(100),
3900 BootTimestamp::epoch() - chrono::milliseconds(300)});
3901 }
3902
3903 // We want to setup a situation such that 2 of the 3 legs of the loop are very
3904 // confident about time being X, and the third leg is pulling the average off
3905 // to one side.
3906 //
3907 // It's easiest to visualize this in timestamp_plotter.
3908
3909 std::vector<std::string> filenames;
3910 {
3911 // Have pi1 send out a reliable message at startup. This sets up a long
3912 // forwarding time message at the start to bias time.
3913 std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
3914 {
3915 aos::Sender<examples::Ping> ping_sender =
3916 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
3917
3918 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
3919 examples::Ping::Builder ping_builder =
3920 builder.MakeBuilder<examples::Ping>();
3921 CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
3922 }
3923
3924 // Wait a while so there's enough data to let the worst case be rather off.
3925 event_loop_factory.RunFor(chrono::seconds(1000));
3926
3927 // Now start a receiving node first. This sets up 2 tight bounds between 2
3928 // of the nodes.
3929 LoggerState pi2_logger = MakeLoggerState(
3930 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3931 pi2_logger.StartLogger(kLogfile2_1);
3932
3933 event_loop_factory.RunFor(chrono::seconds(100));
3934
3935 // And now start the third leg.
3936 LoggerState pi3_logger = MakeLoggerState(
3937 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3938 pi3_logger.StartLogger(kLogfile3_1);
3939
3940 LoggerState pi1_logger = MakeLoggerState(
3941 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3942 pi1_logger.StartLogger(kLogfile1_1);
3943
3944 event_loop_factory.RunFor(chrono::seconds(100));
3945
3946 pi1_logger.AppendAllFilenames(&filenames);
3947 pi2_logger.AppendAllFilenames(&filenames);
3948 pi3_logger.AppendAllFilenames(&filenames);
3949 }
3950
3951 // Make sure we can read this.
3952 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3953 auto result = ConfirmReadable(filenames);
3954}
3955
Austin Schuh08dba8f2023-05-01 08:29:30 -07003956// Tests that RestartLogging works in the simple case. Unfortunately, the
3957// failure cases involve simulating time elapsing in callbacks, which is really
3958// hard. The best we can reasonably do is make sure 2 back to back logs are
3959// parseable together.
3960TEST_P(MultinodeLoggerTest, RestartLogging) {
3961 time_converter_.AddMonotonic(
3962 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
3963 std::vector<std::string> filenames;
3964 {
3965 LoggerState pi1_logger = MakeLogger(pi1_);
3966
3967 event_loop_factory_.RunFor(chrono::milliseconds(95));
3968
3969 StartLogger(&pi1_logger, logfile_base1_);
3970 aos::monotonic_clock::time_point last_rotation_time =
3971 pi1_logger.event_loop->monotonic_now();
3972 pi1_logger.logger->set_on_logged_period([&] {
3973 const auto now = pi1_logger.event_loop->monotonic_now();
3974 if (now > last_rotation_time + std::chrono::seconds(5)) {
3975 pi1_logger.AppendAllFilenames(&filenames);
3976 std::unique_ptr<MultiNodeFilesLogNamer> namer =
3977 pi1_logger.MakeLogNamer(logfile_base2_);
3978 pi1_logger.log_namer = namer.get();
3979
3980 pi1_logger.logger->RestartLogging(std::move(namer));
3981 last_rotation_time = now;
3982 }
3983 });
3984
3985 event_loop_factory_.RunFor(chrono::milliseconds(7000));
3986
3987 pi1_logger.AppendAllFilenames(&filenames);
3988 }
3989
3990 for (const auto &x : filenames) {
3991 LOG(INFO) << x;
3992 }
3993
3994 EXPECT_GE(filenames.size(), 2u);
3995
3996 ConfirmReadable(filenames);
3997
3998 // TODO(austin): It would be good to confirm that any one time messages end up
3999 // in both logs correctly.
4000}
4001
Naman Guptaa63aa132023-03-22 20:06:34 -07004002} // namespace testing
4003} // namespace logger
4004} // namespace aos