blob: 677fc72635b908b13ca6040e4b4cab531ae429c6 [file] [log] [blame]
Brian Smartte67d7112023-03-20 12:06:30 -07001#include <algorithm>
2
Philipp Schrader790cb542023-07-05 21:06:52 -07003#include "gmock/gmock.h"
4#include "gtest/gtest.h"
5
Naman Guptaa63aa132023-03-22 20:06:34 -07006#include "aos/events/logging/log_reader.h"
7#include "aos/events/logging/multinode_logger_test_lib.h"
8#include "aos/events/message_counter.h"
9#include "aos/events/ping_lib.h"
10#include "aos/events/pong_lib.h"
11#include "aos/network/remote_message_generated.h"
12#include "aos/network/timestamp_generated.h"
13#include "aos/testing/tmpdir.h"
Naman Guptaa63aa132023-03-22 20:06:34 -070014
15namespace aos {
16namespace logger {
17namespace testing {
18
19namespace chrono = std::chrono;
20using aos::message_bridge::RemoteMessage;
21using aos::testing::ArtifactPath;
22using aos::testing::MessageCounter;
23
Naman Guptaa63aa132023-03-22 20:06:34 -070024INSTANTIATE_TEST_SUITE_P(
25 All, MultinodeLoggerTest,
26 ::testing::Combine(
27 ::testing::Values(
28 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080029 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070030 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080031 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070032 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
33
34INSTANTIATE_TEST_SUITE_P(
35 All, MultinodeLoggerDeathTest,
36 ::testing::Combine(
37 ::testing::Values(
38 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080039 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070040 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080041 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070042 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
43
44// Tests that we can write and read simple multi-node log files.
45TEST_P(MultinodeLoggerTest, SimpleMultiNode) {
46 std::vector<std::string> actual_filenames;
47 time_converter_.StartEqual();
48
49 {
50 LoggerState pi1_logger = MakeLogger(pi1_);
51 LoggerState pi2_logger = MakeLogger(pi2_);
52
53 event_loop_factory_.RunFor(chrono::milliseconds(95));
54
55 StartLogger(&pi1_logger);
56 StartLogger(&pi2_logger);
57
58 event_loop_factory_.RunFor(chrono::milliseconds(20000));
59 pi1_logger.AppendAllFilenames(&actual_filenames);
60 pi2_logger.AppendAllFilenames(&actual_filenames);
61 }
62
63 ASSERT_THAT(actual_filenames,
64 ::testing::UnorderedElementsAreArray(logfiles_));
65
66 {
67 std::set<std::string> logfile_uuids;
68 std::set<std::string> parts_uuids;
69 // Confirm that we have the expected number of UUIDs for both the logfile
70 // UUIDs and parts UUIDs.
71 std::vector<SizePrefixedFlatbufferVector<LogFileHeader>> log_header;
72 for (std::string_view f : logfiles_) {
73 log_header.emplace_back(ReadHeader(f).value());
74 if (!log_header.back().message().has_configuration()) {
75 logfile_uuids.insert(
76 log_header.back().message().log_event_uuid()->str());
77 parts_uuids.insert(log_header.back().message().parts_uuid()->str());
78 }
79 }
80
81 EXPECT_EQ(logfile_uuids.size(), 2u);
82 if (shared()) {
83 EXPECT_EQ(parts_uuids.size(), 7u);
84 } else {
85 EXPECT_EQ(parts_uuids.size(), 8u);
86 }
87
88 // And confirm everything is on the correct node.
89 EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi1");
90 EXPECT_EQ(log_header[3].message().node()->name()->string_view(), "pi1");
91 EXPECT_EQ(log_header[4].message().node()->name()->string_view(), "pi1");
92
93 EXPECT_EQ(log_header[5].message().node()->name()->string_view(), "pi2");
94 EXPECT_EQ(log_header[6].message().node()->name()->string_view(), "pi2");
95
96 EXPECT_EQ(log_header[7].message().node()->name()->string_view(), "pi2");
97 EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi2");
98 EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi2");
99
100 EXPECT_EQ(log_header[10].message().node()->name()->string_view(), "pi1");
101 EXPECT_EQ(log_header[11].message().node()->name()->string_view(), "pi1");
102
103 EXPECT_EQ(log_header[12].message().node()->name()->string_view(), "pi2");
104 EXPECT_EQ(log_header[13].message().node()->name()->string_view(), "pi2");
105
106 if (shared()) {
107 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
108 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi2");
109 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi2");
110
111 EXPECT_EQ(log_header[17].message().node()->name()->string_view(), "pi1");
112 EXPECT_EQ(log_header[18].message().node()->name()->string_view(), "pi1");
113 } else {
114 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
115 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi2");
116
117 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi1");
118 EXPECT_EQ(log_header[17].message().node()->name()->string_view(), "pi1");
119
120 EXPECT_EQ(log_header[18].message().node()->name()->string_view(), "pi2");
121 EXPECT_EQ(log_header[19].message().node()->name()->string_view(), "pi2");
122 }
123
124 // And the parts index matches.
125 EXPECT_EQ(log_header[2].message().parts_index(), 0);
126 EXPECT_EQ(log_header[3].message().parts_index(), 1);
127 EXPECT_EQ(log_header[4].message().parts_index(), 2);
128
129 EXPECT_EQ(log_header[5].message().parts_index(), 0);
130 EXPECT_EQ(log_header[6].message().parts_index(), 1);
131
132 EXPECT_EQ(log_header[7].message().parts_index(), 0);
133 EXPECT_EQ(log_header[8].message().parts_index(), 1);
134 EXPECT_EQ(log_header[9].message().parts_index(), 2);
135
136 EXPECT_EQ(log_header[10].message().parts_index(), 0);
137 EXPECT_EQ(log_header[11].message().parts_index(), 1);
138
139 EXPECT_EQ(log_header[12].message().parts_index(), 0);
140 EXPECT_EQ(log_header[13].message().parts_index(), 1);
141
142 if (shared()) {
143 EXPECT_EQ(log_header[14].message().parts_index(), 0);
144 EXPECT_EQ(log_header[15].message().parts_index(), 1);
145 EXPECT_EQ(log_header[16].message().parts_index(), 2);
146
147 EXPECT_EQ(log_header[17].message().parts_index(), 0);
148 EXPECT_EQ(log_header[18].message().parts_index(), 1);
149 } else {
150 EXPECT_EQ(log_header[14].message().parts_index(), 0);
151 EXPECT_EQ(log_header[15].message().parts_index(), 1);
152
153 EXPECT_EQ(log_header[16].message().parts_index(), 0);
154 EXPECT_EQ(log_header[17].message().parts_index(), 1);
155
156 EXPECT_EQ(log_header[18].message().parts_index(), 0);
157 EXPECT_EQ(log_header[19].message().parts_index(), 1);
158 }
159 }
160
161 const std::vector<LogFile> sorted_log_files = SortParts(logfiles_);
162 {
163 using ::testing::UnorderedElementsAre;
164 std::shared_ptr<const aos::Configuration> config =
165 sorted_log_files[0].config;
166
167 // Timing reports, pings
168 EXPECT_THAT(CountChannelsData(config, logfiles_[2]),
169 UnorderedElementsAre(
170 std::make_tuple("/pi1/aos",
171 "aos.message_bridge.ServerStatistics", 1),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700172 std::make_tuple("/test", "aos.examples.Ping", 1),
173 std::make_tuple("/pi1/aos", "aos.examples.Ping", 1)))
Naman Guptaa63aa132023-03-22 20:06:34 -0700174 << " : " << logfiles_[2];
175 {
176 std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700177 std::make_tuple("/pi1/aos", "aos.examples.Ping", 10),
Naman Guptaa63aa132023-03-22 20:06:34 -0700178 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 1),
179 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
180 1)};
181 if (!std::get<0>(GetParam()).shared) {
182 channel_counts.push_back(
183 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
184 "aos-message_bridge-Timestamp",
185 "aos.message_bridge.RemoteMessage", 1));
186 }
187 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
188 ::testing::UnorderedElementsAreArray(channel_counts))
189 << " : " << logfiles_[3];
190 }
191 {
192 std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700193 std::make_tuple("/pi1/aos", "aos.examples.Ping", 1990),
Naman Guptaa63aa132023-03-22 20:06:34 -0700194 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 199),
195 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
196 20),
197 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
198 199),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700199 std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
Naman Guptaa63aa132023-03-22 20:06:34 -0700200 std::make_tuple("/test", "aos.examples.Ping", 2000)};
201 if (!std::get<0>(GetParam()).shared) {
202 channel_counts.push_back(
203 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
204 "aos-message_bridge-Timestamp",
205 "aos.message_bridge.RemoteMessage", 199));
206 }
207 EXPECT_THAT(CountChannelsData(config, logfiles_[4]),
208 ::testing::UnorderedElementsAreArray(channel_counts))
209 << " : " << logfiles_[4];
210 }
211 // Timestamps for pong
212 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
213 UnorderedElementsAre())
214 << " : " << logfiles_[2];
215 EXPECT_THAT(
216 CountChannelsTimestamp(config, logfiles_[3]),
217 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
218 << " : " << logfiles_[3];
219 EXPECT_THAT(
220 CountChannelsTimestamp(config, logfiles_[4]),
221 UnorderedElementsAre(
222 std::make_tuple("/test", "aos.examples.Pong", 2000),
223 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)))
224 << " : " << logfiles_[4];
225
226 // Pong data.
227 EXPECT_THAT(
228 CountChannelsData(config, logfiles_[5]),
229 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 91)))
230 << " : " << logfiles_[5];
231 EXPECT_THAT(CountChannelsData(config, logfiles_[6]),
232 UnorderedElementsAre(
233 std::make_tuple("/test", "aos.examples.Pong", 1910)))
234 << " : " << logfiles_[6];
235
236 // No timestamps
237 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
238 UnorderedElementsAre())
239 << " : " << logfiles_[5];
240 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[6]),
241 UnorderedElementsAre())
242 << " : " << logfiles_[6];
243
244 // Timing reports and pongs.
245 EXPECT_THAT(CountChannelsData(config, logfiles_[7]),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700246 UnorderedElementsAre(
247 std::make_tuple("/pi2/aos", "aos.examples.Ping", 1),
248 std::make_tuple("/pi2/aos",
249 "aos.message_bridge.ServerStatistics", 1)))
Naman Guptaa63aa132023-03-22 20:06:34 -0700250 << " : " << logfiles_[7];
251 EXPECT_THAT(
252 CountChannelsData(config, logfiles_[8]),
253 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
254 << " : " << logfiles_[8];
255 EXPECT_THAT(
256 CountChannelsData(config, logfiles_[9]),
257 UnorderedElementsAre(
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700258 std::make_tuple("/pi2/aos", "aos.examples.Ping", 2000),
Naman Guptaa63aa132023-03-22 20:06:34 -0700259 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
260 std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
261 20),
262 std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
263 200),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700264 std::make_tuple("/pi2/aos", "aos.timing.Report", 60),
Naman Guptaa63aa132023-03-22 20:06:34 -0700265 std::make_tuple("/test", "aos.examples.Pong", 2000)))
266 << " : " << logfiles_[9];
267 // And ping timestamps.
268 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[7]),
269 UnorderedElementsAre())
270 << " : " << logfiles_[7];
271 EXPECT_THAT(
272 CountChannelsTimestamp(config, logfiles_[8]),
273 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
274 << " : " << logfiles_[8];
275 EXPECT_THAT(
276 CountChannelsTimestamp(config, logfiles_[9]),
277 UnorderedElementsAre(
278 std::make_tuple("/test", "aos.examples.Ping", 2000),
279 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)))
280 << " : " << logfiles_[9];
281
282 // And then test that the remotely logged timestamp data files only have
283 // timestamps in them.
284 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[10]),
285 UnorderedElementsAre())
286 << " : " << logfiles_[10];
287 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[11]),
288 UnorderedElementsAre())
289 << " : " << logfiles_[11];
290 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[12]),
291 UnorderedElementsAre())
292 << " : " << logfiles_[12];
293 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[13]),
294 UnorderedElementsAre())
295 << " : " << logfiles_[13];
296
297 EXPECT_THAT(CountChannelsData(config, logfiles_[10]),
298 UnorderedElementsAre(std::make_tuple(
299 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
300 << " : " << logfiles_[10];
301 EXPECT_THAT(CountChannelsData(config, logfiles_[11]),
302 UnorderedElementsAre(std::make_tuple(
303 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
304 << " : " << logfiles_[11];
305
306 EXPECT_THAT(CountChannelsData(config, logfiles_[12]),
307 UnorderedElementsAre(std::make_tuple(
308 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
309 << " : " << logfiles_[12];
310 EXPECT_THAT(CountChannelsData(config, logfiles_[13]),
311 UnorderedElementsAre(std::make_tuple(
312 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
313 << " : " << logfiles_[13];
314
315 // Timestamps from pi2 on pi1, and the other way.
316 if (shared()) {
317 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
318 UnorderedElementsAre())
319 << " : " << logfiles_[14];
320 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
321 UnorderedElementsAre())
322 << " : " << logfiles_[15];
323 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
324 UnorderedElementsAre())
325 << " : " << logfiles_[16];
326 EXPECT_THAT(CountChannelsData(config, logfiles_[17]),
327 UnorderedElementsAre())
328 << " : " << logfiles_[17];
329 EXPECT_THAT(CountChannelsData(config, logfiles_[18]),
330 UnorderedElementsAre())
331 << " : " << logfiles_[18];
332
333 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[14]),
334 UnorderedElementsAre(
335 std::make_tuple("/test", "aos.examples.Ping", 1)))
336 << " : " << logfiles_[14];
337 EXPECT_THAT(
338 CountChannelsTimestamp(config, logfiles_[15]),
339 UnorderedElementsAre(
340 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 9),
341 std::make_tuple("/test", "aos.examples.Ping", 90)))
342 << " : " << logfiles_[15];
343 EXPECT_THAT(
344 CountChannelsTimestamp(config, logfiles_[16]),
345 UnorderedElementsAre(
346 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 191),
347 std::make_tuple("/test", "aos.examples.Ping", 1910)))
348 << " : " << logfiles_[16];
349 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[17]),
350 UnorderedElementsAre(std::make_tuple(
351 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
352 << " : " << logfiles_[17];
353 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[18]),
354 UnorderedElementsAre(std::make_tuple(
355 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
356 << " : " << logfiles_[18];
357 } else {
358 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
359 UnorderedElementsAre())
360 << " : " << logfiles_[14];
361 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
362 UnorderedElementsAre())
363 << " : " << logfiles_[15];
364 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
365 UnorderedElementsAre())
366 << " : " << logfiles_[16];
367 EXPECT_THAT(CountChannelsData(config, logfiles_[17]),
368 UnorderedElementsAre())
369 << " : " << logfiles_[17];
370 EXPECT_THAT(CountChannelsData(config, logfiles_[18]),
371 UnorderedElementsAre())
372 << " : " << logfiles_[18];
373 EXPECT_THAT(CountChannelsData(config, logfiles_[19]),
374 UnorderedElementsAre())
375 << " : " << logfiles_[19];
376
377 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[14]),
378 UnorderedElementsAre(std::make_tuple(
379 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
380 << " : " << logfiles_[14];
381 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[15]),
382 UnorderedElementsAre(std::make_tuple(
383 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
384 << " : " << logfiles_[15];
385 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[16]),
386 UnorderedElementsAre(std::make_tuple(
387 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
388 << " : " << logfiles_[16];
389 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[17]),
390 UnorderedElementsAre(std::make_tuple(
391 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
392 << " : " << logfiles_[17];
393 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[18]),
394 UnorderedElementsAre(
395 std::make_tuple("/test", "aos.examples.Ping", 91)))
396 << " : " << logfiles_[18];
397 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[19]),
398 UnorderedElementsAre(
399 std::make_tuple("/test", "aos.examples.Ping", 1910)))
400 << " : " << logfiles_[19];
401 }
402 }
403
404 LogReader reader(sorted_log_files);
405
406 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
407 log_reader_factory.set_send_delay(chrono::microseconds(0));
408
409 // This sends out the fetched messages and advances time to the start of the
410 // log file.
411 reader.Register(&log_reader_factory);
412
413 const Node *pi1 =
414 configuration::GetNode(log_reader_factory.configuration(), "pi1");
415 const Node *pi2 =
416 configuration::GetNode(log_reader_factory.configuration(), "pi2");
417
418 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
419 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
420 LOG(INFO) << "now pi1 "
421 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
422 LOG(INFO) << "now pi2 "
423 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
424
425 EXPECT_THAT(reader.LoggedNodes(),
426 ::testing::ElementsAre(
427 configuration::GetNode(reader.logged_configuration(), pi1),
428 configuration::GetNode(reader.logged_configuration(), pi2)));
429
430 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
431
432 std::unique_ptr<EventLoop> pi1_event_loop =
433 log_reader_factory.MakeEventLoop("test", pi1);
434 std::unique_ptr<EventLoop> pi2_event_loop =
435 log_reader_factory.MakeEventLoop("test", pi2);
436
437 int pi1_ping_count = 10;
438 int pi2_ping_count = 10;
439 int pi1_pong_count = 10;
440 int pi2_pong_count = 10;
441
442 // Confirm that the ping value matches.
443 pi1_event_loop->MakeWatcher(
444 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
445 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping) << " at "
446 << pi1_event_loop->context().monotonic_remote_time << " -> "
447 << pi1_event_loop->context().monotonic_event_time;
448 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
449 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
450 pi1_ping_count * chrono::milliseconds(10) +
451 monotonic_clock::epoch());
452 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
453 pi1_ping_count * chrono::milliseconds(10) +
454 realtime_clock::epoch());
455 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
456 pi1_event_loop->context().monotonic_event_time);
457 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
458 pi1_event_loop->context().realtime_event_time);
459
460 ++pi1_ping_count;
461 });
462 pi2_event_loop->MakeWatcher(
463 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
464 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping) << " at "
465 << pi2_event_loop->context().monotonic_remote_time << " -> "
466 << pi2_event_loop->context().monotonic_event_time;
467 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
468
469 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
470 pi2_ping_count * chrono::milliseconds(10) +
471 monotonic_clock::epoch());
472 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
473 pi2_ping_count * chrono::milliseconds(10) +
474 realtime_clock::epoch());
475 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time +
476 chrono::microseconds(150),
477 pi2_event_loop->context().monotonic_event_time);
478 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time +
479 chrono::microseconds(150),
480 pi2_event_loop->context().realtime_event_time);
481 ++pi2_ping_count;
482 });
483
484 constexpr ssize_t kQueueIndexOffset = -9;
485 // Confirm that the ping and pong counts both match, and the value also
486 // matches.
487 pi1_event_loop->MakeWatcher(
488 "/test", [&pi1_event_loop, &pi1_ping_count,
489 &pi1_pong_count](const examples::Pong &pong) {
490 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
491 << pi1_event_loop->context().monotonic_remote_time << " -> "
492 << pi1_event_loop->context().monotonic_event_time;
493
494 EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
495 pi1_pong_count + kQueueIndexOffset);
496 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
497 chrono::microseconds(200) +
498 pi1_pong_count * chrono::milliseconds(10) +
499 monotonic_clock::epoch());
500 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
501 chrono::microseconds(200) +
502 pi1_pong_count * chrono::milliseconds(10) +
503 realtime_clock::epoch());
504
505 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
506 chrono::microseconds(150),
507 pi1_event_loop->context().monotonic_event_time);
508 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
509 chrono::microseconds(150),
510 pi1_event_loop->context().realtime_event_time);
511
512 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
513 ++pi1_pong_count;
514 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
515 });
516 pi2_event_loop->MakeWatcher(
517 "/test", [&pi2_event_loop, &pi2_ping_count,
518 &pi2_pong_count](const examples::Pong &pong) {
519 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
520 << pi2_event_loop->context().monotonic_remote_time << " -> "
521 << pi2_event_loop->context().monotonic_event_time;
522
523 EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
524 pi2_pong_count + kQueueIndexOffset);
525
526 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
527 chrono::microseconds(200) +
528 pi2_pong_count * chrono::milliseconds(10) +
529 monotonic_clock::epoch());
530 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
531 chrono::microseconds(200) +
532 pi2_pong_count * chrono::milliseconds(10) +
533 realtime_clock::epoch());
534
535 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
536 pi2_event_loop->context().monotonic_event_time);
537 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
538 pi2_event_loop->context().realtime_event_time);
539
540 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
541 ++pi2_pong_count;
542 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
543 });
544
545 log_reader_factory.Run();
546 EXPECT_EQ(pi1_ping_count, 2010);
547 EXPECT_EQ(pi2_ping_count, 2010);
548 EXPECT_EQ(pi1_pong_count, 2010);
549 EXPECT_EQ(pi2_pong_count, 2010);
550
551 reader.Deregister();
552}
553
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600554// MultinodeLoggerTest that tests the mutate callback works across multiple
555// nodes with remapping
556TEST_P(MultinodeLoggerTest, MultiNodeRemapMutateCallback) {
557 time_converter_.StartEqual();
558 std::vector<std::string> actual_filenames;
559
560 {
561 LoggerState pi1_logger = MakeLogger(pi1_);
562 LoggerState pi2_logger = MakeLogger(pi2_);
563
564 event_loop_factory_.RunFor(chrono::milliseconds(95));
565
566 StartLogger(&pi1_logger);
567 StartLogger(&pi2_logger);
568
569 event_loop_factory_.RunFor(chrono::milliseconds(20000));
570 pi1_logger.AppendAllFilenames(&actual_filenames);
571 pi2_logger.AppendAllFilenames(&actual_filenames);
572 }
573
574 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
575
576 LogReader reader(sorted_parts, &config_.message());
577 // Remap just on pi1.
578 reader.RemapLoggedChannel<examples::Pong>(
579 "/test", configuration::GetNode(reader.configuration(), "pi1"));
580
581 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
582
583 int pong_count = 0;
584 // Adds a callback which mutates the value of the pong message before the
585 // message is sent which is the feature we are testing here
586 reader.AddBeforeSendCallback("/test",
587 [&pong_count](aos::examples::Pong *pong) {
588 pong->mutate_value(pong->value() + 1);
589 pong_count = pong->value();
590 });
591
592 // This sends out the fetched messages and advances time to the start of the
593 // log file.
594 reader.Register(&log_reader_factory);
595
596 const Node *pi1 =
597 configuration::GetNode(log_reader_factory.configuration(), "pi1");
598 const Node *pi2 =
599 configuration::GetNode(log_reader_factory.configuration(), "pi2");
600
601 EXPECT_THAT(reader.LoggedNodes(),
602 ::testing::ElementsAre(
603 configuration::GetNode(reader.logged_configuration(), pi1),
604 configuration::GetNode(reader.logged_configuration(), pi2)));
605
606 std::unique_ptr<EventLoop> pi1_event_loop =
607 log_reader_factory.MakeEventLoop("test", pi1);
608 std::unique_ptr<EventLoop> pi2_event_loop =
609 log_reader_factory.MakeEventLoop("test", pi2);
610
611 pi1_event_loop->MakeWatcher("/original/test",
612 [&pong_count](const examples::Pong &pong) {
613 EXPECT_EQ(pong_count, pong.value());
614 });
615
616 pi2_event_loop->MakeWatcher("/test",
617 [&pong_count](const examples::Pong &pong) {
618 EXPECT_EQ(pong_count, pong.value());
619 });
620
621 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
622 reader.Deregister();
623
624 EXPECT_EQ(pong_count, 2011);
625}
626
627// MultinodeLoggerTest that tests the mutate callback works across multiple
628// nodes
629TEST_P(MultinodeLoggerTest, MultiNodeMutateCallback) {
630 time_converter_.StartEqual();
631 std::vector<std::string> actual_filenames;
632
633 {
634 LoggerState pi1_logger = MakeLogger(pi1_);
635 LoggerState pi2_logger = MakeLogger(pi2_);
636
637 event_loop_factory_.RunFor(chrono::milliseconds(95));
638
639 StartLogger(&pi1_logger);
640 StartLogger(&pi2_logger);
641
642 event_loop_factory_.RunFor(chrono::milliseconds(20000));
643 pi1_logger.AppendAllFilenames(&actual_filenames);
644 pi2_logger.AppendAllFilenames(&actual_filenames);
645 }
646
647 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
648
649 LogReader reader(sorted_parts, &config_.message());
650
651 int pong_count = 0;
652 // Adds a callback which mutates the value of the pong message before the
653 // message is sent which is the feature we are testing here
654 reader.AddBeforeSendCallback("/test",
655 [&pong_count](aos::examples::Pong *pong) {
656 pong->mutate_value(pong->value() + 1);
657 pong_count = pong->value();
658 });
659
660 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
661
662 // This sends out the fetched messages and advances time to the start of the
663 // log file.
664 reader.Register(&log_reader_factory);
665
666 const Node *pi1 =
667 configuration::GetNode(log_reader_factory.configuration(), "pi1");
668 const Node *pi2 =
669 configuration::GetNode(log_reader_factory.configuration(), "pi2");
670
671 EXPECT_THAT(reader.LoggedNodes(),
672 ::testing::ElementsAre(
673 configuration::GetNode(reader.logged_configuration(), pi1),
674 configuration::GetNode(reader.logged_configuration(), pi2)));
675
676 std::unique_ptr<EventLoop> pi1_event_loop =
677 log_reader_factory.MakeEventLoop("test", pi1);
678 std::unique_ptr<EventLoop> pi2_event_loop =
679 log_reader_factory.MakeEventLoop("test", pi2);
680
681 pi1_event_loop->MakeWatcher("/test",
682 [&pong_count](const examples::Pong &pong) {
683 EXPECT_EQ(pong_count, pong.value());
684 });
685
686 pi2_event_loop->MakeWatcher("/test",
687 [&pong_count](const examples::Pong &pong) {
688 EXPECT_EQ(pong_count, pong.value());
689 });
690
691 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
692 reader.Deregister();
693
694 EXPECT_EQ(pong_count, 2011);
695}
696
697// Tests that the before send callback is only called from the sender node if it
698// is forwarded
699TEST_P(MultinodeLoggerTest, OnlyDoBeforeSendCallbackOnSenderNode) {
700 time_converter_.StartEqual();
701 {
702 LoggerState pi1_logger = MakeLogger(pi1_);
703 LoggerState pi2_logger = MakeLogger(pi2_);
704
705 event_loop_factory_.RunFor(chrono::milliseconds(95));
706
707 StartLogger(&pi1_logger);
708 StartLogger(&pi2_logger);
709
710 event_loop_factory_.RunFor(chrono::milliseconds(20000));
711 }
712
713 LogReader reader(SortParts(logfiles_));
714
715 int ping_count = 0;
716 // Adds a callback which mutates the value of the pong message before the
717 // message is sent which is the feature we are testing here
718 reader.AddBeforeSendCallback("/test",
719 [&ping_count](aos::examples::Ping *ping) {
720 ++ping_count;
721 ping->mutate_value(ping_count);
722 });
723
724 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
725 log_reader_factory.set_send_delay(chrono::microseconds(0));
726
727 reader.Register(&log_reader_factory);
728
729 const Node *pi1 =
730 configuration::GetNode(log_reader_factory.configuration(), "pi1");
731 const Node *pi2 =
732 configuration::GetNode(log_reader_factory.configuration(), "pi2");
733
734 std::unique_ptr<EventLoop> pi1_event_loop =
735 log_reader_factory.MakeEventLoop("test", pi1);
736 pi1_event_loop->SkipTimingReport();
737 std::unique_ptr<EventLoop> pi2_event_loop =
738 log_reader_factory.MakeEventLoop("test", pi2);
739 pi2_event_loop->SkipTimingReport();
740
741 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
742 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
743
744 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
745 pi1_ping_timestamp;
746 if (!shared()) {
747 pi1_ping_timestamp =
748 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
749 pi1_event_loop.get(),
750 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
751 }
752
753 log_reader_factory.Run();
754
755 EXPECT_EQ(pi1_ping.count(), 2000u);
756 EXPECT_EQ(pi2_ping.count(), 2000u);
757 // If the BeforeSendCallback is called on both nodes, then the ping count
758 // would be 4002 instead of 2001
759 EXPECT_EQ(ping_count, 2001u);
760 if (!shared()) {
761 EXPECT_EQ(pi1_ping_timestamp->count(), 2000u);
762 }
763
764 reader.Deregister();
765}
766
767// Tests that we do not allow adding callbacks after Register is called
768TEST_P(MultinodeLoggerDeathTest, AddCallbackAfterRegister) {
769 time_converter_.StartEqual();
770 std::vector<std::string> actual_filenames;
771
772 {
773 LoggerState pi1_logger = MakeLogger(pi1_);
774 LoggerState pi2_logger = MakeLogger(pi2_);
775
776 event_loop_factory_.RunFor(chrono::milliseconds(95));
777
778 StartLogger(&pi1_logger);
779 StartLogger(&pi2_logger);
780
781 event_loop_factory_.RunFor(chrono::milliseconds(20000));
782 pi1_logger.AppendAllFilenames(&actual_filenames);
783 pi2_logger.AppendAllFilenames(&actual_filenames);
784 }
785
786 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
787
788 LogReader reader(sorted_parts, &config_.message());
789 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
790 reader.Register(&log_reader_factory);
791 EXPECT_DEATH(
792 {
793 reader.AddBeforeSendCallback("/test", [](aos::examples::Pong *) {
794 LOG(FATAL) << "This should not be called";
795 });
796 },
797 "Cannot add callbacks after calling Register");
798 reader.Deregister();
799}
800
Naman Guptaa63aa132023-03-22 20:06:34 -0700801// Test that if we feed the replay with a mismatched node list that we die on
802// the LogReader constructor.
803TEST_P(MultinodeLoggerDeathTest, MultiNodeBadReplayConfig) {
804 time_converter_.StartEqual();
805 {
806 LoggerState pi1_logger = MakeLogger(pi1_);
807 LoggerState pi2_logger = MakeLogger(pi2_);
808
809 event_loop_factory_.RunFor(chrono::milliseconds(95));
810
811 StartLogger(&pi1_logger);
812 StartLogger(&pi2_logger);
813
814 event_loop_factory_.RunFor(chrono::milliseconds(20000));
815 }
816
817 // Test that, if we add an additional node to the replay config that the
818 // logger complains about the mismatch in number of nodes.
819 FlatbufferDetachedBuffer<Configuration> extra_nodes_config =
820 configuration::MergeWithConfig(&config_.message(), R"({
821 "nodes": [
822 {
823 "name": "extra-node"
824 }
825 ]
826 }
827 )");
828
829 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
830 EXPECT_DEATH(LogReader(sorted_parts, &extra_nodes_config.message()),
831 "Log file and replay config need to have matching nodes lists.");
832}
833
834// Tests that we can read log files where they don't start at the same monotonic
835// time.
836TEST_P(MultinodeLoggerTest, StaggeredStart) {
837 time_converter_.StartEqual();
838 std::vector<std::string> actual_filenames;
839
840 {
841 LoggerState pi1_logger = MakeLogger(pi1_);
842 LoggerState pi2_logger = MakeLogger(pi2_);
843
844 event_loop_factory_.RunFor(chrono::milliseconds(95));
845
846 StartLogger(&pi1_logger);
847
848 event_loop_factory_.RunFor(chrono::milliseconds(200));
849
850 StartLogger(&pi2_logger);
851
852 event_loop_factory_.RunFor(chrono::milliseconds(20000));
853 pi1_logger.AppendAllFilenames(&actual_filenames);
854 pi2_logger.AppendAllFilenames(&actual_filenames);
855 }
856
857 // Since we delay starting pi2, it already knows about all the timestamps so
858 // we don't end up with extra parts.
859 LogReader reader(SortParts(actual_filenames));
860
861 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
862 log_reader_factory.set_send_delay(chrono::microseconds(0));
863
864 // This sends out the fetched messages and advances time to the start of the
865 // log file.
866 reader.Register(&log_reader_factory);
867
868 const Node *pi1 =
869 configuration::GetNode(log_reader_factory.configuration(), "pi1");
870 const Node *pi2 =
871 configuration::GetNode(log_reader_factory.configuration(), "pi2");
872
873 EXPECT_THAT(reader.LoggedNodes(),
874 ::testing::ElementsAre(
875 configuration::GetNode(reader.logged_configuration(), pi1),
876 configuration::GetNode(reader.logged_configuration(), pi2)));
877
878 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
879
880 std::unique_ptr<EventLoop> pi1_event_loop =
881 log_reader_factory.MakeEventLoop("test", pi1);
882 std::unique_ptr<EventLoop> pi2_event_loop =
883 log_reader_factory.MakeEventLoop("test", pi2);
884
885 int pi1_ping_count = 30;
886 int pi2_ping_count = 30;
887 int pi1_pong_count = 30;
888 int pi2_pong_count = 30;
889
890 // Confirm that the ping value matches.
891 pi1_event_loop->MakeWatcher(
892 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
893 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
894 << pi1_event_loop->context().monotonic_remote_time << " -> "
895 << pi1_event_loop->context().monotonic_event_time;
896 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
897
898 ++pi1_ping_count;
899 });
900 pi2_event_loop->MakeWatcher(
901 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
902 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
903 << pi2_event_loop->context().monotonic_remote_time << " -> "
904 << pi2_event_loop->context().monotonic_event_time;
905 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
906
907 ++pi2_ping_count;
908 });
909
910 // Confirm that the ping and pong counts both match, and the value also
911 // matches.
912 pi1_event_loop->MakeWatcher(
913 "/test", [&pi1_event_loop, &pi1_ping_count,
914 &pi1_pong_count](const examples::Pong &pong) {
915 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
916 << pi1_event_loop->context().monotonic_remote_time << " -> "
917 << pi1_event_loop->context().monotonic_event_time;
918
919 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
920 ++pi1_pong_count;
921 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
922 });
923 pi2_event_loop->MakeWatcher(
924 "/test", [&pi2_event_loop, &pi2_ping_count,
925 &pi2_pong_count](const examples::Pong &pong) {
926 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
927 << pi2_event_loop->context().monotonic_remote_time << " -> "
928 << pi2_event_loop->context().monotonic_event_time;
929
930 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
931 ++pi2_pong_count;
932 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
933 });
934
935 log_reader_factory.Run();
936 EXPECT_EQ(pi1_ping_count, 2030);
937 EXPECT_EQ(pi2_ping_count, 2030);
938 EXPECT_EQ(pi1_pong_count, 2030);
939 EXPECT_EQ(pi2_pong_count, 2030);
940
941 reader.Deregister();
942}
943
944// Tests that we can read log files where the monotonic clocks drift and don't
945// match correctly. While we are here, also test that different ending times
946// also is readable.
947TEST_P(MultinodeLoggerTest, MismatchedClocks) {
948 // TODO(austin): Negate...
949 const chrono::nanoseconds initial_pi2_offset = chrono::seconds(1000);
950
951 time_converter_.AddMonotonic(
952 {BootTimestamp::epoch(), BootTimestamp::epoch() + initial_pi2_offset});
953 // Wait for 95 ms, (~0.1 seconds - 1/2 of the ping/pong period), and set the
954 // skew to be 200 uS/s
955 const chrono::nanoseconds startup_sleep1 = time_converter_.AddMonotonic(
956 {chrono::milliseconds(95),
957 chrono::milliseconds(95) - chrono::nanoseconds(200) * 95});
958 // Run another 200 ms to have one logger start first.
959 const chrono::nanoseconds startup_sleep2 = time_converter_.AddMonotonic(
960 {chrono::milliseconds(200), chrono::milliseconds(200)});
961 // Slew one way then the other at the same 200 uS/S slew rate. Make sure we
962 // go far enough to cause problems if this isn't accounted for.
963 const chrono::nanoseconds logger_run1 = time_converter_.AddMonotonic(
964 {chrono::milliseconds(20000),
965 chrono::milliseconds(20000) - chrono::nanoseconds(200) * 20000});
966 const chrono::nanoseconds logger_run2 = time_converter_.AddMonotonic(
967 {chrono::milliseconds(40000),
968 chrono::milliseconds(40000) + chrono::nanoseconds(200) * 40000});
969 const chrono::nanoseconds logger_run3 = time_converter_.AddMonotonic(
970 {chrono::milliseconds(400), chrono::milliseconds(400)});
971
972 {
973 LoggerState pi2_logger = MakeLogger(pi2_);
974
975 LOG(INFO) << "pi2 times: " << pi2_->monotonic_now() << " "
976 << pi2_->realtime_now() << " distributed "
977 << pi2_->ToDistributedClock(pi2_->monotonic_now());
978
979 LOG(INFO) << "pi2_ times: " << pi2_->monotonic_now() << " "
980 << pi2_->realtime_now() << " distributed "
981 << pi2_->ToDistributedClock(pi2_->monotonic_now());
982
983 event_loop_factory_.RunFor(startup_sleep1);
984
985 StartLogger(&pi2_logger);
986
987 event_loop_factory_.RunFor(startup_sleep2);
988
989 {
990 // Run pi1's logger for only part of the time.
991 LoggerState pi1_logger = MakeLogger(pi1_);
992
993 StartLogger(&pi1_logger);
994 event_loop_factory_.RunFor(logger_run1);
995
996 // Make sure we slewed time far enough so that the difference is greater
997 // than the network delay. This confirms that if we sort incorrectly, it
998 // would show in the results.
999 EXPECT_LT(
1000 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
1001 -event_loop_factory_.send_delay() -
1002 event_loop_factory_.network_delay());
1003
1004 event_loop_factory_.RunFor(logger_run2);
1005
1006 // And now check that we went far enough the other way to make sure we
1007 // cover both problems.
1008 EXPECT_GT(
1009 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
1010 event_loop_factory_.send_delay() +
1011 event_loop_factory_.network_delay());
1012 }
1013
1014 // And log a bit more on pi2.
1015 event_loop_factory_.RunFor(logger_run3);
1016 }
1017
1018 LogReader reader(
James Kuszmaul298b4a22023-06-28 20:01:03 -07001019 SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 3, 3)));
Naman Guptaa63aa132023-03-22 20:06:34 -07001020
1021 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1022 log_reader_factory.set_send_delay(chrono::microseconds(0));
1023
1024 const Node *pi1 =
1025 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1026 const Node *pi2 =
1027 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1028
1029 // This sends out the fetched messages and advances time to the start of the
1030 // log file.
1031 reader.Register(&log_reader_factory);
1032
1033 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
1034 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
1035 LOG(INFO) << "now pi1 "
1036 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1037 LOG(INFO) << "now pi2 "
1038 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1039
1040 LOG(INFO) << "Done registering (pi1) "
1041 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now()
1042 << " "
1043 << log_reader_factory.GetNodeEventLoopFactory(pi1)->realtime_now();
1044 LOG(INFO) << "Done registering (pi2) "
1045 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now()
1046 << " "
1047 << log_reader_factory.GetNodeEventLoopFactory(pi2)->realtime_now();
1048
1049 EXPECT_THAT(reader.LoggedNodes(),
1050 ::testing::ElementsAre(
1051 configuration::GetNode(reader.logged_configuration(), pi1),
1052 configuration::GetNode(reader.logged_configuration(), pi2)));
1053
1054 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1055
1056 std::unique_ptr<EventLoop> pi1_event_loop =
1057 log_reader_factory.MakeEventLoop("test", pi1);
1058 std::unique_ptr<EventLoop> pi2_event_loop =
1059 log_reader_factory.MakeEventLoop("test", pi2);
1060
1061 int pi1_ping_count = 30;
1062 int pi2_ping_count = 30;
1063 int pi1_pong_count = 30;
1064 int pi2_pong_count = 30;
1065
1066 // Confirm that the ping value matches.
1067 pi1_event_loop->MakeWatcher(
1068 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
1069 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
1070 << pi1_event_loop->context().monotonic_remote_time << " -> "
1071 << pi1_event_loop->context().monotonic_event_time;
1072 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
1073
1074 ++pi1_ping_count;
1075 });
1076 pi2_event_loop->MakeWatcher(
1077 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
1078 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
1079 << pi2_event_loop->context().monotonic_remote_time << " -> "
1080 << pi2_event_loop->context().monotonic_event_time;
1081 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
1082
1083 ++pi2_ping_count;
1084 });
1085
1086 // Confirm that the ping and pong counts both match, and the value also
1087 // matches.
1088 pi1_event_loop->MakeWatcher(
1089 "/test", [&pi1_event_loop, &pi1_ping_count,
1090 &pi1_pong_count](const examples::Pong &pong) {
1091 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
1092 << pi1_event_loop->context().monotonic_remote_time << " -> "
1093 << pi1_event_loop->context().monotonic_event_time;
1094
1095 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
1096 ++pi1_pong_count;
1097 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
1098 });
1099 pi2_event_loop->MakeWatcher(
1100 "/test", [&pi2_event_loop, &pi2_ping_count,
1101 &pi2_pong_count](const examples::Pong &pong) {
1102 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
1103 << pi2_event_loop->context().monotonic_remote_time << " -> "
1104 << pi2_event_loop->context().monotonic_event_time;
1105
1106 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
1107 ++pi2_pong_count;
1108 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
1109 });
1110
1111 log_reader_factory.Run();
1112 EXPECT_EQ(pi1_ping_count, 6030);
1113 EXPECT_EQ(pi2_ping_count, 6030);
1114 EXPECT_EQ(pi1_pong_count, 6030);
1115 EXPECT_EQ(pi2_pong_count, 6030);
1116
1117 reader.Deregister();
1118}
1119
1120// Tests that we can sort a bunch of parts into the pre-determined sorted parts.
1121TEST_P(MultinodeLoggerTest, SortParts) {
1122 time_converter_.StartEqual();
1123 // Make a bunch of parts.
1124 {
1125 LoggerState pi1_logger = MakeLogger(pi1_);
1126 LoggerState pi2_logger = MakeLogger(pi2_);
1127
1128 event_loop_factory_.RunFor(chrono::milliseconds(95));
1129
1130 StartLogger(&pi1_logger);
1131 StartLogger(&pi2_logger);
1132
1133 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1134 }
1135
1136 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1137 VerifyParts(sorted_parts);
1138}
1139
1140// Tests that we can sort a bunch of parts with an empty part. We should ignore
1141// it and remove it from the sorted list.
1142TEST_P(MultinodeLoggerTest, SortEmptyParts) {
1143 time_converter_.StartEqual();
1144 // Make a bunch of parts.
1145 {
1146 LoggerState pi1_logger = MakeLogger(pi1_);
1147 LoggerState pi2_logger = MakeLogger(pi2_);
1148
1149 event_loop_factory_.RunFor(chrono::milliseconds(95));
1150
1151 StartLogger(&pi1_logger);
1152 StartLogger(&pi2_logger);
1153
1154 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1155 }
1156
1157 // TODO(austin): Should we flip out if the file can't open?
1158 const std::string kEmptyFile("foobarinvalidfiledoesnotexist" + Extension());
1159
1160 aos::util::WriteStringToFileOrDie(kEmptyFile, "");
1161 logfiles_.emplace_back(kEmptyFile);
1162
1163 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1164 VerifyParts(sorted_parts, {kEmptyFile});
1165}
1166
1167// Tests that we can sort a bunch of parts with the end missing off a
1168// file. We should use the part we can read.
1169TEST_P(MultinodeLoggerTest, SortTruncatedParts) {
1170 std::vector<std::string> actual_filenames;
1171 time_converter_.StartEqual();
1172 // Make a bunch of parts.
1173 {
1174 LoggerState pi1_logger = MakeLogger(pi1_);
1175 LoggerState pi2_logger = MakeLogger(pi2_);
1176
1177 event_loop_factory_.RunFor(chrono::milliseconds(95));
1178
1179 StartLogger(&pi1_logger);
1180 StartLogger(&pi2_logger);
1181
1182 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1183
1184 pi1_logger.AppendAllFilenames(&actual_filenames);
1185 pi2_logger.AppendAllFilenames(&actual_filenames);
1186 }
1187
1188 ASSERT_THAT(actual_filenames,
1189 ::testing::UnorderedElementsAreArray(logfiles_));
1190
1191 // Strip off the end of one of the files. Pick one with a lot of data.
1192 // For snappy, needs to have enough data to be >1 chunk of compressed data so
1193 // that we don't corrupt the entire log part.
1194 ::std::string compressed_contents =
1195 aos::util::ReadFileToStringOrDie(logfiles_[4]);
1196
1197 aos::util::WriteStringToFileOrDie(
1198 logfiles_[4],
1199 compressed_contents.substr(0, compressed_contents.size() - 100));
1200
1201 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1202 VerifyParts(sorted_parts);
1203}
1204
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001205// Tests that if we remap a logged channel, it shows up correctly.
Naman Guptaa63aa132023-03-22 20:06:34 -07001206TEST_P(MultinodeLoggerTest, RemapLoggedChannel) {
1207 time_converter_.StartEqual();
1208 {
1209 LoggerState pi1_logger = MakeLogger(pi1_);
1210 LoggerState pi2_logger = MakeLogger(pi2_);
1211
1212 event_loop_factory_.RunFor(chrono::milliseconds(95));
1213
1214 StartLogger(&pi1_logger);
1215 StartLogger(&pi2_logger);
1216
1217 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1218 }
1219
1220 LogReader reader(SortParts(logfiles_));
1221
1222 // Remap just on pi1.
1223 reader.RemapLoggedChannel<aos::timing::Report>(
1224 "/aos", configuration::GetNode(reader.configuration(), "pi1"));
1225
1226 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1227 log_reader_factory.set_send_delay(chrono::microseconds(0));
1228
1229 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1230 // Note: An extra channel gets remapped automatically due to a timestamp
1231 // channel being LOCAL_LOGGER'd.
1232 ASSERT_EQ(remapped_channels.size(), std::get<0>(GetParam()).shared ? 1u : 2u);
1233 EXPECT_EQ(remapped_channels[0]->name()->string_view(), "/original/pi1/aos");
1234 EXPECT_EQ(remapped_channels[0]->type()->string_view(), "aos.timing.Report");
1235 if (!std::get<0>(GetParam()).shared) {
1236 EXPECT_EQ(remapped_channels[1]->name()->string_view(),
1237 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1238 "aos-message_bridge-Timestamp");
1239 EXPECT_EQ(remapped_channels[1]->type()->string_view(),
1240 "aos.message_bridge.RemoteMessage");
1241 }
1242
1243 reader.Register(&log_reader_factory);
1244
1245 const Node *pi1 =
1246 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1247 const Node *pi2 =
1248 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1249
1250 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1251 // else should have moved.
1252 std::unique_ptr<EventLoop> pi1_event_loop =
1253 log_reader_factory.MakeEventLoop("test", pi1);
1254 pi1_event_loop->SkipTimingReport();
1255 std::unique_ptr<EventLoop> full_pi1_event_loop =
1256 log_reader_factory.MakeEventLoop("test", pi1);
1257 full_pi1_event_loop->SkipTimingReport();
1258 std::unique_ptr<EventLoop> pi2_event_loop =
1259 log_reader_factory.MakeEventLoop("test", pi2);
1260 pi2_event_loop->SkipTimingReport();
1261
1262 MessageCounter<aos::timing::Report> pi1_timing_report(pi1_event_loop.get(),
1263 "/aos");
1264 MessageCounter<aos::timing::Report> full_pi1_timing_report(
1265 full_pi1_event_loop.get(), "/pi1/aos");
1266 MessageCounter<aos::timing::Report> pi1_original_timing_report(
1267 pi1_event_loop.get(), "/original/aos");
1268 MessageCounter<aos::timing::Report> full_pi1_original_timing_report(
1269 full_pi1_event_loop.get(), "/original/pi1/aos");
1270 MessageCounter<aos::timing::Report> pi2_timing_report(pi2_event_loop.get(),
1271 "/aos");
1272
1273 log_reader_factory.Run();
1274
1275 EXPECT_EQ(pi1_timing_report.count(), 0u);
1276 EXPECT_EQ(full_pi1_timing_report.count(), 0u);
1277 EXPECT_NE(pi1_original_timing_report.count(), 0u);
1278 EXPECT_NE(full_pi1_original_timing_report.count(), 0u);
1279 EXPECT_NE(pi2_timing_report.count(), 0u);
1280
1281 reader.Deregister();
1282}
1283
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001284// Tests that if we rename a logged channel, it shows up correctly.
1285TEST_P(MultinodeLoggerTest, RenameLoggedChannel) {
1286 std::vector<std::string> actual_filenames;
1287 time_converter_.StartEqual();
1288 {
1289 LoggerState pi1_logger = MakeLogger(pi1_);
1290 LoggerState pi2_logger = MakeLogger(pi2_);
1291
1292 event_loop_factory_.RunFor(chrono::milliseconds(95));
1293
1294 StartLogger(&pi1_logger);
1295 StartLogger(&pi2_logger);
1296
1297 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1298
1299 pi1_logger.AppendAllFilenames(&actual_filenames);
1300 pi2_logger.AppendAllFilenames(&actual_filenames);
1301 }
1302
1303 LogReader reader(SortParts(actual_filenames));
1304
1305 // Rename just on pi2. Add some global maps just to verify they get added in
1306 // the config and used correctly.
1307 std::vector<MapT> maps;
1308 {
1309 MapT map;
1310 map.match = std::make_unique<ChannelT>();
1311 map.match->name = "/foo*";
1312 map.match->source_node = "pi1";
1313 map.rename = std::make_unique<ChannelT>();
1314 map.rename->name = "/pi1/foo";
1315 maps.emplace_back(std::move(map));
1316 }
1317 {
1318 MapT map;
1319 map.match = std::make_unique<ChannelT>();
1320 map.match->name = "/foo*";
1321 map.match->source_node = "pi2";
1322 map.rename = std::make_unique<ChannelT>();
1323 map.rename->name = "/pi2/foo";
1324 maps.emplace_back(std::move(map));
1325 }
1326 {
1327 MapT map;
1328 map.match = std::make_unique<ChannelT>();
1329 map.match->name = "/foo";
1330 map.match->type = "aos.examples.Ping";
1331 map.rename = std::make_unique<ChannelT>();
1332 map.rename->name = "/foo/renamed";
1333 maps.emplace_back(std::move(map));
1334 }
1335 reader.RenameLoggedChannel<aos::examples::Ping>(
1336 "/aos", configuration::GetNode(reader.configuration(), "pi2"),
1337 "/pi2/foo/renamed", maps);
1338
1339 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1340 log_reader_factory.set_send_delay(chrono::microseconds(0));
1341
1342 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1343 // Note: An extra channel gets remapped automatically due to a timestamp
1344 // channel being LOCAL_LOGGER'd.
1345 const bool shared = std::get<0>(GetParam()).shared;
1346 ASSERT_EQ(remapped_channels.size(), shared ? 1u : 2u);
1347 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->name()->string_view(),
1348 "/pi2/foo/renamed");
1349 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->type()->string_view(),
1350 "aos.examples.Ping");
1351 if (!shared) {
1352 EXPECT_EQ(remapped_channels[0]->name()->string_view(),
1353 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1354 "aos-message_bridge-Timestamp");
1355 EXPECT_EQ(remapped_channels[0]->type()->string_view(),
1356 "aos.message_bridge.RemoteMessage");
1357 }
1358
1359 reader.Register(&log_reader_factory);
1360
1361 const Node *pi1 =
1362 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1363 const Node *pi2 =
1364 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1365
1366 // Confirm we can read the data on the renamed channel, just for pi2. Nothing
1367 // else should have moved.
1368 std::unique_ptr<EventLoop> pi2_event_loop =
1369 log_reader_factory.MakeEventLoop("test", pi2);
1370 pi2_event_loop->SkipTimingReport();
1371 std::unique_ptr<EventLoop> full_pi2_event_loop =
1372 log_reader_factory.MakeEventLoop("test", pi2);
1373 full_pi2_event_loop->SkipTimingReport();
1374 std::unique_ptr<EventLoop> pi1_event_loop =
1375 log_reader_factory.MakeEventLoop("test", pi1);
1376 pi1_event_loop->SkipTimingReport();
1377
1378 MessageCounter<aos::examples::Ping> pi2_ping(pi2_event_loop.get(), "/aos");
1379 MessageCounter<aos::examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1380 "/foo");
1381 MessageCounter<aos::examples::Ping> full_pi2_renamed_ping(
1382 full_pi2_event_loop.get(), "/pi2/foo/renamed");
1383 MessageCounter<aos::examples::Ping> pi1_ping(pi1_event_loop.get(), "/aos");
1384
1385 log_reader_factory.Run();
1386
1387 EXPECT_EQ(pi2_ping.count(), 0u);
1388 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1389 EXPECT_NE(full_pi2_renamed_ping.count(), 0u);
1390 EXPECT_NE(pi1_ping.count(), 0u);
1391
1392 reader.Deregister();
1393}
1394
Naman Guptaa63aa132023-03-22 20:06:34 -07001395// Tests that we can remap a forwarded channel as well.
1396TEST_P(MultinodeLoggerTest, RemapForwardedLoggedChannel) {
1397 time_converter_.StartEqual();
1398 {
1399 LoggerState pi1_logger = MakeLogger(pi1_);
1400 LoggerState pi2_logger = MakeLogger(pi2_);
1401
1402 event_loop_factory_.RunFor(chrono::milliseconds(95));
1403
1404 StartLogger(&pi1_logger);
1405 StartLogger(&pi2_logger);
1406
1407 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1408 }
1409
1410 LogReader reader(SortParts(logfiles_));
1411
1412 reader.RemapLoggedChannel<examples::Ping>("/test");
1413
1414 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1415 log_reader_factory.set_send_delay(chrono::microseconds(0));
1416
1417 reader.Register(&log_reader_factory);
1418
1419 const Node *pi1 =
1420 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1421 const Node *pi2 =
1422 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1423
1424 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1425 // else should have moved.
1426 std::unique_ptr<EventLoop> pi1_event_loop =
1427 log_reader_factory.MakeEventLoop("test", pi1);
1428 pi1_event_loop->SkipTimingReport();
1429 std::unique_ptr<EventLoop> full_pi1_event_loop =
1430 log_reader_factory.MakeEventLoop("test", pi1);
1431 full_pi1_event_loop->SkipTimingReport();
1432 std::unique_ptr<EventLoop> pi2_event_loop =
1433 log_reader_factory.MakeEventLoop("test", pi2);
1434 pi2_event_loop->SkipTimingReport();
1435
1436 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1437 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1438 MessageCounter<examples::Ping> pi1_original_ping(pi1_event_loop.get(),
1439 "/original/test");
1440 MessageCounter<examples::Ping> pi2_original_ping(pi2_event_loop.get(),
1441 "/original/test");
1442
1443 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1444 pi1_original_ping_timestamp;
1445 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1446 pi1_ping_timestamp;
1447 if (!shared()) {
1448 pi1_original_ping_timestamp =
1449 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1450 pi1_event_loop.get(),
1451 "/pi1/aos/remote_timestamps/pi2/original/test/aos-examples-Ping");
1452 pi1_ping_timestamp =
1453 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1454 pi1_event_loop.get(),
1455 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1456 }
1457
1458 log_reader_factory.Run();
1459
1460 EXPECT_EQ(pi1_ping.count(), 0u);
1461 EXPECT_EQ(pi2_ping.count(), 0u);
1462 EXPECT_NE(pi1_original_ping.count(), 0u);
1463 EXPECT_NE(pi2_original_ping.count(), 0u);
1464 if (!shared()) {
1465 EXPECT_NE(pi1_original_ping_timestamp->count(), 0u);
1466 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1467 }
1468
1469 reader.Deregister();
1470}
1471
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001472// Tests that we can rename a forwarded channel as well.
1473TEST_P(MultinodeLoggerTest, RenameForwardedLoggedChannel) {
1474 std::vector<std::string> actual_filenames;
1475 time_converter_.StartEqual();
1476 {
1477 LoggerState pi1_logger = MakeLogger(pi1_);
1478 LoggerState pi2_logger = MakeLogger(pi2_);
1479
1480 event_loop_factory_.RunFor(chrono::milliseconds(95));
1481
1482 StartLogger(&pi1_logger);
1483 StartLogger(&pi2_logger);
1484
1485 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1486
1487 pi1_logger.AppendAllFilenames(&actual_filenames);
1488 pi2_logger.AppendAllFilenames(&actual_filenames);
1489 }
1490
1491 LogReader reader(SortParts(actual_filenames));
1492
1493 std::vector<MapT> maps;
1494 {
1495 MapT map;
1496 map.match = std::make_unique<ChannelT>();
1497 map.match->name = "/production*";
1498 map.match->source_node = "pi1";
1499 map.rename = std::make_unique<ChannelT>();
1500 map.rename->name = "/pi1/production";
1501 maps.emplace_back(std::move(map));
1502 }
1503 {
1504 MapT map;
1505 map.match = std::make_unique<ChannelT>();
1506 map.match->name = "/production*";
1507 map.match->source_node = "pi2";
1508 map.rename = std::make_unique<ChannelT>();
1509 map.rename->name = "/pi2/production";
1510 maps.emplace_back(std::move(map));
1511 }
1512 reader.RenameLoggedChannel<aos::examples::Ping>(
1513 "/test", configuration::GetNode(reader.configuration(), "pi1"),
1514 "/pi1/production", maps);
1515
1516 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1517 log_reader_factory.set_send_delay(chrono::microseconds(0));
1518
1519 reader.Register(&log_reader_factory);
1520
1521 const Node *pi1 =
1522 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1523 const Node *pi2 =
1524 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1525
1526 // Confirm we can read the data on the renamed channel, on both the source
1527 // node and the remote node. In case of split timestamp channels, confirm that
1528 // we receive the timestamp messages on the renamed channel as well.
1529 std::unique_ptr<EventLoop> pi1_event_loop =
1530 log_reader_factory.MakeEventLoop("test", pi1);
1531 pi1_event_loop->SkipTimingReport();
1532 std::unique_ptr<EventLoop> full_pi1_event_loop =
1533 log_reader_factory.MakeEventLoop("test", pi1);
1534 full_pi1_event_loop->SkipTimingReport();
1535 std::unique_ptr<EventLoop> pi2_event_loop =
1536 log_reader_factory.MakeEventLoop("test", pi2);
1537 pi2_event_loop->SkipTimingReport();
1538
1539 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1540 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1541 MessageCounter<examples::Ping> pi1_renamed_ping(pi1_event_loop.get(),
1542 "/pi1/production");
1543 MessageCounter<examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1544 "/pi1/production");
1545
1546 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1547 pi1_renamed_ping_timestamp;
1548 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1549 pi1_ping_timestamp;
1550 if (!shared()) {
1551 pi1_renamed_ping_timestamp =
1552 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1553 pi1_event_loop.get(),
1554 "/pi1/aos/remote_timestamps/pi2/pi1/production/aos-examples-Ping");
1555 pi1_ping_timestamp =
1556 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1557 pi1_event_loop.get(),
1558 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1559 }
1560
1561 log_reader_factory.Run();
1562
1563 EXPECT_EQ(pi1_ping.count(), 0u);
1564 EXPECT_EQ(pi2_ping.count(), 0u);
1565 EXPECT_NE(pi1_renamed_ping.count(), 0u);
1566 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1567 if (!shared()) {
1568 EXPECT_NE(pi1_renamed_ping_timestamp->count(), 0u);
1569 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1570 }
1571
1572 reader.Deregister();
1573}
1574
Naman Guptaa63aa132023-03-22 20:06:34 -07001575// Tests that we observe all the same events in log replay (for a given node)
1576// whether we just register an event loop for that node or if we register a full
1577// event loop factory.
1578TEST_P(MultinodeLoggerTest, SingleNodeReplay) {
1579 time_converter_.StartEqual();
1580 constexpr chrono::milliseconds kStartupDelay(95);
1581 {
1582 LoggerState pi1_logger = MakeLogger(pi1_);
1583 LoggerState pi2_logger = MakeLogger(pi2_);
1584
1585 event_loop_factory_.RunFor(kStartupDelay);
1586
1587 StartLogger(&pi1_logger);
1588 StartLogger(&pi2_logger);
1589
1590 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1591 }
1592
1593 LogReader full_reader(SortParts(logfiles_));
1594 LogReader single_node_reader(SortParts(logfiles_));
1595
1596 SimulatedEventLoopFactory full_factory(full_reader.configuration());
1597 SimulatedEventLoopFactory single_node_factory(
1598 single_node_reader.configuration());
1599 single_node_factory.SkipTimingReport();
1600 single_node_factory.DisableStatistics();
1601 std::unique_ptr<EventLoop> replay_event_loop =
1602 single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
1603 "log_reader");
1604
1605 full_reader.Register(&full_factory);
1606 single_node_reader.Register(replay_event_loop.get());
1607
1608 const Node *full_pi1 =
1609 configuration::GetNode(full_factory.configuration(), "pi1");
1610
1611 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1612 // else should have moved.
1613 std::unique_ptr<EventLoop> full_event_loop =
1614 full_factory.MakeEventLoop("test", full_pi1);
1615 full_event_loop->SkipTimingReport();
1616 full_event_loop->SkipAosLog();
1617 // maps are indexed on channel index.
1618 // observed_messages: {channel_index: [(message_sent_time, was_fetched),...]}
1619 std::map<size_t, std::vector<std::pair<monotonic_clock::time_point, bool>>>
1620 observed_messages;
1621 std::map<size_t, std::unique_ptr<RawFetcher>> fetchers;
1622 for (size_t ii = 0; ii < full_event_loop->configuration()->channels()->size();
1623 ++ii) {
1624 const Channel *channel =
1625 full_event_loop->configuration()->channels()->Get(ii);
1626 // We currently don't support replaying remote timestamp channels in
1627 // realtime replay (unless the remote timestamp channel was not NOT_LOGGED,
1628 // in which case it gets auto-remapped and replayed on a /original channel).
1629 if (channel->name()->string_view().find("remote_timestamp") !=
1630 std::string_view::npos &&
1631 channel->name()->string_view().find("/original") ==
1632 std::string_view::npos) {
1633 continue;
1634 }
1635 if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
1636 observed_messages[ii] = {};
1637 fetchers[ii] = full_event_loop->MakeRawFetcher(channel);
1638 full_event_loop->OnRun([ii, &observed_messages, &fetchers]() {
1639 if (fetchers[ii]->Fetch()) {
1640 observed_messages[ii].push_back(std::make_pair(
1641 fetchers[ii]->context().monotonic_event_time, true));
1642 }
1643 });
1644 full_event_loop->MakeRawNoArgWatcher(
1645 channel, [ii, &observed_messages](const Context &context) {
1646 observed_messages[ii].push_back(
1647 std::make_pair(context.monotonic_event_time, false));
1648 });
1649 }
1650 }
1651
1652 full_factory.Run();
1653 fetchers.clear();
1654 full_reader.Deregister();
1655
1656 const Node *single_node_pi1 =
1657 configuration::GetNode(single_node_factory.configuration(), "pi1");
1658 std::map<size_t, std::unique_ptr<RawFetcher>> single_node_fetchers;
1659
1660 std::unique_ptr<EventLoop> single_node_event_loop =
1661 single_node_factory.MakeEventLoop("test", single_node_pi1);
1662 single_node_event_loop->SkipTimingReport();
1663 single_node_event_loop->SkipAosLog();
1664 for (size_t ii = 0;
1665 ii < single_node_event_loop->configuration()->channels()->size(); ++ii) {
1666 const Channel *channel =
1667 single_node_event_loop->configuration()->channels()->Get(ii);
1668 single_node_factory.DisableForwarding(channel);
1669 if (configuration::ChannelIsReadableOnNode(channel, single_node_pi1)) {
1670 single_node_fetchers[ii] =
1671 single_node_event_loop->MakeRawFetcher(channel);
1672 single_node_event_loop->OnRun([channel, ii, &single_node_fetchers]() {
1673 EXPECT_FALSE(single_node_fetchers[ii]->Fetch())
1674 << "Single EventLoop replay doesn't support pre-loading fetchers. "
1675 << configuration::StrippedChannelToString(channel);
1676 });
1677 single_node_event_loop->MakeRawNoArgWatcher(
1678 channel, [ii, &observed_messages, channel,
1679 kStartupDelay](const Context &context) {
1680 if (observed_messages[ii].empty()) {
1681 FAIL() << "Observed extra message at "
1682 << context.monotonic_event_time << " on "
1683 << configuration::StrippedChannelToString(channel);
1684 return;
1685 }
1686 const std::pair<monotonic_clock::time_point, bool> &message =
1687 observed_messages[ii].front();
1688 if (message.second) {
1689 EXPECT_LE(message.first,
1690 context.monotonic_event_time + kStartupDelay)
1691 << "Mismatched message times " << context.monotonic_event_time
1692 << " and " << message.first << " on "
1693 << configuration::StrippedChannelToString(channel);
1694 } else {
1695 EXPECT_EQ(message.first,
1696 context.monotonic_event_time + kStartupDelay)
1697 << "Mismatched message times " << context.monotonic_event_time
1698 << " and " << message.first << " on "
1699 << configuration::StrippedChannelToString(channel);
1700 }
1701 observed_messages[ii].erase(observed_messages[ii].begin());
1702 });
1703 }
1704 }
1705
1706 single_node_factory.Run();
1707
1708 single_node_fetchers.clear();
1709
1710 single_node_reader.Deregister();
1711
1712 for (const auto &pair : observed_messages) {
1713 EXPECT_TRUE(pair.second.empty())
1714 << "Missed " << pair.second.size() << " messages on "
1715 << configuration::StrippedChannelToString(
1716 single_node_event_loop->configuration()->channels()->Get(
1717 pair.first));
1718 }
1719}
1720
1721// Tests that we properly recreate forwarded timestamps when replaying a log.
1722// This should be enough that we can then re-run the logger and get a valid log
1723// back.
1724TEST_P(MultinodeLoggerTest, MessageHeader) {
1725 time_converter_.StartEqual();
1726 {
1727 LoggerState pi1_logger = MakeLogger(pi1_);
1728 LoggerState pi2_logger = MakeLogger(pi2_);
1729
1730 event_loop_factory_.RunFor(chrono::milliseconds(95));
1731
1732 StartLogger(&pi1_logger);
1733 StartLogger(&pi2_logger);
1734
1735 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1736 }
1737
1738 LogReader reader(SortParts(logfiles_));
1739
1740 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1741 log_reader_factory.set_send_delay(chrono::microseconds(0));
1742
1743 // This sends out the fetched messages and advances time to the start of the
1744 // log file.
1745 reader.Register(&log_reader_factory);
1746
1747 const Node *pi1 =
1748 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1749 const Node *pi2 =
1750 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1751
1752 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
1753 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
1754 LOG(INFO) << "now pi1 "
1755 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1756 LOG(INFO) << "now pi2 "
1757 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1758
1759 EXPECT_THAT(reader.LoggedNodes(),
1760 ::testing::ElementsAre(
1761 configuration::GetNode(reader.logged_configuration(), pi1),
1762 configuration::GetNode(reader.logged_configuration(), pi2)));
1763
1764 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1765
1766 std::unique_ptr<EventLoop> pi1_event_loop =
1767 log_reader_factory.MakeEventLoop("test", pi1);
1768 std::unique_ptr<EventLoop> pi2_event_loop =
1769 log_reader_factory.MakeEventLoop("test", pi2);
1770
1771 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =
1772 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1773 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi2_fetcher =
1774 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1775
1776 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1777 pi1_event_loop->MakeFetcher<examples::Ping>("/test");
1778 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1779 pi2_event_loop->MakeFetcher<examples::Ping>("/test");
1780
1781 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi2_fetcher =
1782 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1783 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi1_fetcher =
1784 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1785
1786 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1787 pi2_event_loop->MakeFetcher<examples::Pong>("/test");
1788 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1789 pi1_event_loop->MakeFetcher<examples::Pong>("/test");
1790
1791 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
1792 pi1_event_loop->configuration(), pi1_timestamp_on_pi1_fetcher.channel());
1793 const size_t ping_timestamp_channel = configuration::ChannelIndex(
1794 pi2_event_loop->configuration(), ping_on_pi2_fetcher.channel());
1795
1796 const size_t pi2_timestamp_channel = configuration::ChannelIndex(
1797 pi2_event_loop->configuration(), pi2_timestamp_on_pi2_fetcher.channel());
1798 const size_t pong_timestamp_channel = configuration::ChannelIndex(
1799 pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
1800
1801 const chrono::nanoseconds network_delay = event_loop_factory_.network_delay();
1802 const chrono::nanoseconds send_delay = event_loop_factory_.send_delay();
1803
1804 for (std::pair<int, std::string> channel :
1805 shared()
1806 ? std::vector<
1807 std::pair<int, std::string>>{{-1,
1808 "/aos/remote_timestamps/pi2"}}
1809 : std::vector<std::pair<int, std::string>>{
1810 {pi1_timestamp_channel,
1811 "/aos/remote_timestamps/pi2/pi1/aos/"
1812 "aos-message_bridge-Timestamp"},
1813 {ping_timestamp_channel,
1814 "/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
1815 pi1_event_loop->MakeWatcher(
1816 channel.second,
1817 [&pi1_event_loop, &pi2_event_loop, pi1_timestamp_channel,
1818 ping_timestamp_channel, &pi1_timestamp_on_pi1_fetcher,
1819 &pi1_timestamp_on_pi2_fetcher, &ping_on_pi1_fetcher,
1820 &ping_on_pi2_fetcher, network_delay, send_delay,
1821 channel_index = channel.first](const RemoteMessage &header) {
1822 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1823 chrono::nanoseconds(header.monotonic_sent_time()));
1824 const aos::realtime_clock::time_point header_realtime_sent_time(
1825 chrono::nanoseconds(header.realtime_sent_time()));
1826 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1827 chrono::nanoseconds(header.monotonic_remote_time()));
1828 const aos::realtime_clock::time_point header_realtime_remote_time(
1829 chrono::nanoseconds(header.realtime_remote_time()));
1830
1831 if (channel_index != -1) {
1832 ASSERT_EQ(channel_index, header.channel_index());
1833 }
1834
1835 const Context *pi1_context = nullptr;
1836 const Context *pi2_context = nullptr;
1837
1838 if (header.channel_index() == pi1_timestamp_channel) {
1839 ASSERT_TRUE(pi1_timestamp_on_pi1_fetcher.FetchNext());
1840 ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
1841 pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
1842 pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
1843 } else if (header.channel_index() == ping_timestamp_channel) {
1844 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
1845 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
1846 pi1_context = &ping_on_pi1_fetcher.context();
1847 pi2_context = &ping_on_pi2_fetcher.context();
1848 } else {
1849 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1850 << configuration::CleanedChannelToString(
1851 pi1_event_loop->configuration()->channels()->Get(
1852 header.channel_index()));
1853 }
1854
1855 ASSERT_TRUE(header.has_boot_uuid());
1856 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1857 pi2_event_loop->boot_uuid());
1858
1859 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
1860 EXPECT_EQ(pi2_context->remote_queue_index,
1861 header.remote_queue_index());
1862 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
1863
1864 EXPECT_EQ(pi2_context->monotonic_event_time,
1865 header_monotonic_sent_time);
1866 EXPECT_EQ(pi2_context->realtime_event_time,
1867 header_realtime_sent_time);
1868 EXPECT_EQ(pi2_context->realtime_remote_time,
1869 header_realtime_remote_time);
1870 EXPECT_EQ(pi2_context->monotonic_remote_time,
1871 header_monotonic_remote_time);
1872
1873 EXPECT_EQ(pi1_context->realtime_event_time,
1874 header_realtime_remote_time);
1875 EXPECT_EQ(pi1_context->monotonic_event_time,
1876 header_monotonic_remote_time);
1877
1878 // Time estimation isn't perfect, but we know the clocks were
1879 // identical when logged, so we know when this should have come back.
1880 // Confirm we got it when we expected.
1881 EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
1882 pi1_context->monotonic_event_time + 2 * network_delay +
1883 send_delay);
1884 });
1885 }
1886 for (std::pair<int, std::string> channel :
1887 shared()
1888 ? std::vector<
1889 std::pair<int, std::string>>{{-1,
1890 "/aos/remote_timestamps/pi1"}}
1891 : std::vector<std::pair<int, std::string>>{
1892 {pi2_timestamp_channel,
1893 "/aos/remote_timestamps/pi1/pi2/aos/"
1894 "aos-message_bridge-Timestamp"}}) {
1895 pi2_event_loop->MakeWatcher(
1896 channel.second,
1897 [&pi2_event_loop, &pi1_event_loop, pi2_timestamp_channel,
1898 pong_timestamp_channel, &pi2_timestamp_on_pi2_fetcher,
1899 &pi2_timestamp_on_pi1_fetcher, &pong_on_pi2_fetcher,
1900 &pong_on_pi1_fetcher, network_delay, send_delay,
1901 channel_index = channel.first](const RemoteMessage &header) {
1902 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1903 chrono::nanoseconds(header.monotonic_sent_time()));
1904 const aos::realtime_clock::time_point header_realtime_sent_time(
1905 chrono::nanoseconds(header.realtime_sent_time()));
1906 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1907 chrono::nanoseconds(header.monotonic_remote_time()));
1908 const aos::realtime_clock::time_point header_realtime_remote_time(
1909 chrono::nanoseconds(header.realtime_remote_time()));
1910
1911 if (channel_index != -1) {
1912 ASSERT_EQ(channel_index, header.channel_index());
1913 }
1914
1915 const Context *pi2_context = nullptr;
1916 const Context *pi1_context = nullptr;
1917
1918 if (header.channel_index() == pi2_timestamp_channel) {
1919 ASSERT_TRUE(pi2_timestamp_on_pi2_fetcher.FetchNext());
1920 ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
1921 pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
1922 pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
1923 } else if (header.channel_index() == pong_timestamp_channel) {
1924 ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
1925 ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
1926 pi2_context = &pong_on_pi2_fetcher.context();
1927 pi1_context = &pong_on_pi1_fetcher.context();
1928 } else {
1929 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1930 << configuration::CleanedChannelToString(
1931 pi2_event_loop->configuration()->channels()->Get(
1932 header.channel_index()));
1933 }
1934
1935 ASSERT_TRUE(header.has_boot_uuid());
1936 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1937 pi1_event_loop->boot_uuid());
1938
1939 EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());
1940 EXPECT_EQ(pi1_context->remote_queue_index,
1941 header.remote_queue_index());
1942 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
1943
1944 EXPECT_EQ(pi1_context->monotonic_event_time,
1945 header_monotonic_sent_time);
1946 EXPECT_EQ(pi1_context->realtime_event_time,
1947 header_realtime_sent_time);
1948 EXPECT_EQ(pi1_context->realtime_remote_time,
1949 header_realtime_remote_time);
1950 EXPECT_EQ(pi1_context->monotonic_remote_time,
1951 header_monotonic_remote_time);
1952
1953 EXPECT_EQ(pi2_context->realtime_event_time,
1954 header_realtime_remote_time);
1955 EXPECT_EQ(pi2_context->monotonic_event_time,
1956 header_monotonic_remote_time);
1957
1958 // Time estimation isn't perfect, but we know the clocks were
1959 // identical when logged, so we know when this should have come back.
1960 // Confirm we got it when we expected.
1961 EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
1962 pi2_context->monotonic_event_time + 2 * network_delay +
1963 send_delay);
1964 });
1965 }
1966
1967 // And confirm we can re-create a log again, while checking the contents.
1968 {
1969 LoggerState pi1_logger = MakeLogger(
1970 log_reader_factory.GetNodeEventLoopFactory("pi1"), &log_reader_factory);
1971 LoggerState pi2_logger = MakeLogger(
1972 log_reader_factory.GetNodeEventLoopFactory("pi2"), &log_reader_factory);
1973
1974 StartLogger(&pi1_logger, tmp_dir_ + "/relogged1");
1975 StartLogger(&pi2_logger, tmp_dir_ + "/relogged2");
1976
1977 log_reader_factory.Run();
1978 }
1979
1980 reader.Deregister();
1981
1982 // And verify that we can run the LogReader over the relogged files without
1983 // hitting any fatal errors.
1984 {
1985 LogReader relogged_reader(SortParts(MakeLogFiles(
1986 tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 3, 3, true)));
1987 relogged_reader.Register();
1988
1989 relogged_reader.event_loop_factory()->Run();
1990 }
1991 // And confirm that we can read the logged file using the reader's
1992 // configuration.
1993 {
1994 LogReader relogged_reader(
1995 SortParts(MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2",
1996 3, 3, true)),
1997 reader.configuration());
1998 relogged_reader.Register();
1999
2000 relogged_reader.event_loop_factory()->Run();
2001 }
2002}
2003
2004// Tests that we properly populate and extract the logger_start time by setting
2005// up a clock difference between 2 nodes and looking at the resulting parts.
2006TEST_P(MultinodeLoggerTest, LoggerStartTime) {
2007 std::vector<std::string> actual_filenames;
2008 time_converter_.AddMonotonic(
2009 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2010 {
2011 LoggerState pi1_logger = MakeLogger(pi1_);
2012 LoggerState pi2_logger = MakeLogger(pi2_);
2013
2014 StartLogger(&pi1_logger);
2015 StartLogger(&pi2_logger);
2016
2017 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2018
2019 pi1_logger.AppendAllFilenames(&actual_filenames);
2020 pi2_logger.AppendAllFilenames(&actual_filenames);
2021 }
2022
2023 ASSERT_THAT(actual_filenames,
2024 ::testing::UnorderedElementsAreArray(logfiles_));
2025
2026 for (const LogFile &log_file : SortParts(logfiles_)) {
2027 for (const LogParts &log_part : log_file.parts) {
2028 if (log_part.node == log_file.logger_node) {
2029 EXPECT_EQ(log_part.logger_monotonic_start_time,
2030 aos::monotonic_clock::min_time);
2031 EXPECT_EQ(log_part.logger_realtime_start_time,
2032 aos::realtime_clock::min_time);
2033 } else {
2034 const chrono::seconds offset = log_file.logger_node == "pi1"
2035 ? -chrono::seconds(1000)
2036 : chrono::seconds(1000);
2037 EXPECT_EQ(log_part.logger_monotonic_start_time,
2038 log_part.monotonic_start_time + offset);
2039 EXPECT_EQ(log_part.logger_realtime_start_time,
2040 log_file.realtime_start_time +
2041 (log_part.logger_monotonic_start_time -
2042 log_file.monotonic_start_time));
2043 }
2044 }
2045 }
2046}
2047
2048// Test that renaming the base, renames the folder.
2049TEST_P(MultinodeLoggerTest, LoggerRenameFolder) {
2050 util::UnlinkRecursive(tmp_dir_ + "/renamefolder");
2051 util::UnlinkRecursive(tmp_dir_ + "/new-good");
2052 time_converter_.AddMonotonic(
2053 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2054 logfile_base1_ = tmp_dir_ + "/renamefolder/multi_logfile1";
2055 logfile_base2_ = tmp_dir_ + "/renamefolder/multi_logfile2";
2056 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
2057 LoggerState pi1_logger = MakeLogger(pi1_);
2058 LoggerState pi2_logger = MakeLogger(pi2_);
2059
2060 StartLogger(&pi1_logger);
2061 StartLogger(&pi2_logger);
2062
2063 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2064 logfile_base1_ = tmp_dir_ + "/new-good/multi_logfile1";
2065 logfile_base2_ = tmp_dir_ + "/new-good/multi_logfile2";
2066 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
Alexei Strotscaf17d32023-04-03 22:31:11 -07002067
2068 // Sequence of set_base_name and Rotate simulates rename operation. Since
2069 // rename is not supported by all namers, RenameLogBase moved from logger to
2070 // the higher level abstraction, yet log_namers support rename, and it is
2071 // legal to test it here.
2072 pi1_logger.log_namer->set_base_name(logfile_base1_);
2073 pi1_logger.logger->Rotate();
2074 pi2_logger.log_namer->set_base_name(logfile_base2_);
2075 pi2_logger.logger->Rotate();
2076
Naman Guptaa63aa132023-03-22 20:06:34 -07002077 for (auto &file : logfiles_) {
2078 struct stat s;
2079 EXPECT_EQ(0, stat(file.c_str(), &s));
2080 }
2081}
2082
2083// Test that renaming the file base dies.
2084TEST_P(MultinodeLoggerDeathTest, LoggerRenameFile) {
2085 time_converter_.AddMonotonic(
2086 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2087 util::UnlinkRecursive(tmp_dir_ + "/renamefile");
2088 logfile_base1_ = tmp_dir_ + "/renamefile/multi_logfile1";
2089 logfile_base2_ = tmp_dir_ + "/renamefile/multi_logfile2";
2090 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
2091 LoggerState pi1_logger = MakeLogger(pi1_);
2092 StartLogger(&pi1_logger);
2093 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2094 logfile_base1_ = tmp_dir_ + "/new-renamefile/new_multi_logfile1";
Alexei Strotscaf17d32023-04-03 22:31:11 -07002095 EXPECT_DEATH({ pi1_logger.log_namer->set_base_name(logfile_base1_); },
Naman Guptaa63aa132023-03-22 20:06:34 -07002096 "Rename of file base from");
2097}
2098
2099// TODO(austin): We can write a test which recreates a logfile and confirms that
2100// we get it back. That is the ultimate test.
2101
2102// Tests that we properly recreate forwarded timestamps when replaying a log.
2103// This should be enough that we can then re-run the logger and get a valid log
2104// back.
2105TEST_P(MultinodeLoggerTest, RemoteReboot) {
2106 std::vector<std::string> actual_filenames;
2107
2108 const UUID pi1_boot0 = UUID::Random();
2109 const UUID pi2_boot0 = UUID::Random();
2110 const UUID pi2_boot1 = UUID::Random();
2111 {
2112 CHECK_EQ(pi1_index_, 0u);
2113 CHECK_EQ(pi2_index_, 1u);
2114
2115 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2116 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2117 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2118
2119 time_converter_.AddNextTimestamp(
2120 distributed_clock::epoch(),
2121 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2122 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2123 time_converter_.AddNextTimestamp(
2124 distributed_clock::epoch() + reboot_time,
2125 {BootTimestamp::epoch() + reboot_time,
2126 BootTimestamp{
2127 .boot = 1,
2128 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2129 }
2130
2131 {
2132 LoggerState pi1_logger = MakeLogger(pi1_);
2133
2134 event_loop_factory_.RunFor(chrono::milliseconds(95));
2135 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2136 pi1_boot0);
2137 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2138 pi2_boot0);
2139
2140 StartLogger(&pi1_logger);
2141
2142 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2143
2144 VLOG(1) << "Reboot now!";
2145
2146 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2147 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2148 pi1_boot0);
2149 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2150 pi2_boot1);
2151
2152 pi1_logger.AppendAllFilenames(&actual_filenames);
2153 }
2154
2155 std::sort(actual_filenames.begin(), actual_filenames.end());
2156 std::sort(pi1_reboot_logfiles_.begin(), pi1_reboot_logfiles_.end());
2157 ASSERT_THAT(actual_filenames,
2158 ::testing::UnorderedElementsAreArray(pi1_reboot_logfiles_));
2159
2160 // Confirm that our new oldest timestamps properly update as we reboot and
2161 // rotate.
2162 for (const std::string &file : pi1_reboot_logfiles_) {
2163 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2164 ReadHeader(file);
2165 CHECK(log_header);
2166 if (log_header->message().has_configuration()) {
2167 continue;
2168 }
2169
2170 const monotonic_clock::time_point monotonic_start_time =
2171 monotonic_clock::time_point(
2172 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2173 const UUID source_node_boot_uuid = UUID::FromString(
2174 log_header->message().source_node_boot_uuid()->string_view());
2175
2176 if (log_header->message().node()->name()->string_view() != "pi1") {
2177 // The remote message channel should rotate later and have more parts.
2178 // This only is true on the log files with shared remote messages.
2179 //
2180 // TODO(austin): I'm not the most thrilled with this test pattern... It
2181 // feels brittle in a different way.
2182 if (file.find("aos.message_bridge.RemoteMessage") == std::string::npos ||
2183 !shared()) {
2184 switch (log_header->message().parts_index()) {
2185 case 0:
2186 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2187 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2188 break;
2189 case 1:
2190 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2191 ASSERT_EQ(monotonic_start_time,
2192 monotonic_clock::epoch() + chrono::seconds(1));
2193 break;
2194 case 2:
2195 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2196 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2197 break;
2198 case 3:
2199 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2200 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
2201 chrono::nanoseconds(2322999462))
2202 << " on " << file;
2203 break;
2204 default:
2205 FAIL();
2206 break;
2207 }
2208 } else {
2209 switch (log_header->message().parts_index()) {
2210 case 0:
2211 case 1:
2212 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2213 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2214 break;
2215 case 2:
2216 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2217 ASSERT_EQ(monotonic_start_time,
2218 monotonic_clock::epoch() + chrono::seconds(1));
2219 break;
2220 case 3:
2221 case 4:
2222 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2223 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2224 break;
2225 case 5:
2226 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2227 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
2228 chrono::nanoseconds(2322999462))
2229 << " on " << file;
2230 break;
2231 default:
2232 FAIL();
2233 break;
2234 }
2235 }
2236 continue;
2237 }
2238 SCOPED_TRACE(file);
2239 SCOPED_TRACE(aos::FlatbufferToJson(
2240 *log_header, {.multi_line = true, .max_vector_size = 100}));
2241 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2242 ASSERT_EQ(
2243 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2244 EXPECT_EQ(
2245 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2246 monotonic_clock::max_time.time_since_epoch().count());
2247 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2248 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2249 2u);
2250 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2251 monotonic_clock::max_time.time_since_epoch().count());
2252 ASSERT_TRUE(log_header->message()
2253 .has_oldest_remote_unreliable_monotonic_timestamps());
2254 ASSERT_EQ(log_header->message()
2255 .oldest_remote_unreliable_monotonic_timestamps()
2256 ->size(),
2257 2u);
2258 EXPECT_EQ(log_header->message()
2259 .oldest_remote_unreliable_monotonic_timestamps()
2260 ->Get(0),
2261 monotonic_clock::max_time.time_since_epoch().count());
2262 ASSERT_TRUE(log_header->message()
2263 .has_oldest_local_unreliable_monotonic_timestamps());
2264 ASSERT_EQ(log_header->message()
2265 .oldest_local_unreliable_monotonic_timestamps()
2266 ->size(),
2267 2u);
2268 EXPECT_EQ(log_header->message()
2269 .oldest_local_unreliable_monotonic_timestamps()
2270 ->Get(0),
2271 monotonic_clock::max_time.time_since_epoch().count());
2272
2273 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2274 monotonic_clock::time_point(chrono::nanoseconds(
2275 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2276 1)));
2277 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2278 monotonic_clock::time_point(chrono::nanoseconds(
2279 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2280 const monotonic_clock::time_point
2281 oldest_remote_unreliable_monotonic_timestamps =
2282 monotonic_clock::time_point(chrono::nanoseconds(
2283 log_header->message()
2284 .oldest_remote_unreliable_monotonic_timestamps()
2285 ->Get(1)));
2286 const monotonic_clock::time_point
2287 oldest_local_unreliable_monotonic_timestamps =
2288 monotonic_clock::time_point(chrono::nanoseconds(
2289 log_header->message()
2290 .oldest_local_unreliable_monotonic_timestamps()
2291 ->Get(1)));
2292 const monotonic_clock::time_point
2293 oldest_remote_reliable_monotonic_timestamps =
2294 monotonic_clock::time_point(chrono::nanoseconds(
2295 log_header->message()
2296 .oldest_remote_reliable_monotonic_timestamps()
2297 ->Get(1)));
2298 const monotonic_clock::time_point
2299 oldest_local_reliable_monotonic_timestamps =
2300 monotonic_clock::time_point(chrono::nanoseconds(
2301 log_header->message()
2302 .oldest_local_reliable_monotonic_timestamps()
2303 ->Get(1)));
2304 const monotonic_clock::time_point
2305 oldest_logger_remote_unreliable_monotonic_timestamps =
2306 monotonic_clock::time_point(chrono::nanoseconds(
2307 log_header->message()
2308 .oldest_logger_remote_unreliable_monotonic_timestamps()
2309 ->Get(0)));
2310 const monotonic_clock::time_point
2311 oldest_logger_local_unreliable_monotonic_timestamps =
2312 monotonic_clock::time_point(chrono::nanoseconds(
2313 log_header->message()
2314 .oldest_logger_local_unreliable_monotonic_timestamps()
2315 ->Get(0)));
2316 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2317 monotonic_clock::max_time);
2318 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2319 monotonic_clock::max_time);
2320 switch (log_header->message().parts_index()) {
2321 case 0:
2322 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2323 monotonic_clock::max_time);
2324 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2325 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2326 monotonic_clock::max_time);
2327 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2328 monotonic_clock::max_time);
2329 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2330 monotonic_clock::max_time);
2331 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2332 monotonic_clock::max_time);
2333 break;
2334 case 1:
2335 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2336 monotonic_clock::time_point(chrono::microseconds(90200)));
2337 EXPECT_EQ(oldest_local_monotonic_timestamps,
2338 monotonic_clock::time_point(chrono::microseconds(90350)));
2339 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2340 monotonic_clock::time_point(chrono::microseconds(90200)));
2341 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2342 monotonic_clock::time_point(chrono::microseconds(90350)));
2343 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2344 monotonic_clock::max_time);
2345 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2346 monotonic_clock::max_time);
2347 break;
2348 case 2:
2349 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2350 monotonic_clock::time_point(chrono::microseconds(90200)))
2351 << file;
2352 EXPECT_EQ(oldest_local_monotonic_timestamps,
2353 monotonic_clock::time_point(chrono::microseconds(90350)))
2354 << file;
2355 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2356 monotonic_clock::time_point(chrono::microseconds(90200)))
2357 << file;
2358 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2359 monotonic_clock::time_point(chrono::microseconds(90350)))
2360 << file;
2361 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2362 monotonic_clock::time_point(chrono::microseconds(100000)))
2363 << file;
2364 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2365 monotonic_clock::time_point(chrono::microseconds(100150)))
2366 << file;
2367 break;
2368 case 3:
2369 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2370 monotonic_clock::time_point(chrono::milliseconds(1323) +
2371 chrono::microseconds(200)));
2372 EXPECT_EQ(oldest_local_monotonic_timestamps,
2373 monotonic_clock::time_point(chrono::microseconds(10100350)));
2374 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2375 monotonic_clock::time_point(chrono::milliseconds(1323) +
2376 chrono::microseconds(200)));
2377 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2378 monotonic_clock::time_point(chrono::microseconds(10100350)));
2379 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2380 monotonic_clock::max_time)
2381 << file;
2382 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2383 monotonic_clock::max_time)
2384 << file;
2385 break;
2386 case 4:
2387 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2388 monotonic_clock::time_point(chrono::milliseconds(1323) +
2389 chrono::microseconds(200)));
2390 EXPECT_EQ(oldest_local_monotonic_timestamps,
2391 monotonic_clock::time_point(chrono::microseconds(10100350)));
2392 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2393 monotonic_clock::time_point(chrono::milliseconds(1323) +
2394 chrono::microseconds(200)));
2395 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2396 monotonic_clock::time_point(chrono::microseconds(10100350)));
2397 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2398 monotonic_clock::time_point(chrono::microseconds(1423000)))
2399 << file;
2400 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2401 monotonic_clock::time_point(chrono::microseconds(10200150)))
2402 << file;
2403 break;
2404 default:
2405 FAIL();
2406 break;
2407 }
2408 }
2409
2410 // Confirm that we refuse to replay logs with missing boot uuids.
2411 {
2412 LogReader reader(SortParts(pi1_reboot_logfiles_));
2413
2414 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2415 log_reader_factory.set_send_delay(chrono::microseconds(0));
2416
2417 // This sends out the fetched messages and advances time to the start of
2418 // the log file.
2419 reader.Register(&log_reader_factory);
2420
2421 log_reader_factory.Run();
2422
2423 reader.Deregister();
2424 }
2425}
2426
2427// Tests that we can sort a log which only has timestamps from the remote
2428// because the local message_bridge_client failed to connect.
2429TEST_P(MultinodeLoggerTest, RemoteRebootOnlyTimestamps) {
2430 const UUID pi1_boot0 = UUID::Random();
2431 const UUID pi2_boot0 = UUID::Random();
2432 const UUID pi2_boot1 = UUID::Random();
2433 {
2434 CHECK_EQ(pi1_index_, 0u);
2435 CHECK_EQ(pi2_index_, 1u);
2436
2437 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2438 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2439 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2440
2441 time_converter_.AddNextTimestamp(
2442 distributed_clock::epoch(),
2443 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2444 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2445 time_converter_.AddNextTimestamp(
2446 distributed_clock::epoch() + reboot_time,
2447 {BootTimestamp::epoch() + reboot_time,
2448 BootTimestamp{
2449 .boot = 1,
2450 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2451 }
2452 pi2_->Disconnect(pi1_->node());
2453
2454 std::vector<std::string> filenames;
2455 {
2456 LoggerState pi1_logger = MakeLogger(pi1_);
2457
2458 event_loop_factory_.RunFor(chrono::milliseconds(95));
2459 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2460 pi1_boot0);
2461 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2462 pi2_boot0);
2463
2464 StartLogger(&pi1_logger);
2465
2466 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2467
2468 VLOG(1) << "Reboot now!";
2469
2470 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2471 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2472 pi1_boot0);
2473 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2474 pi2_boot1);
2475 pi1_logger.AppendAllFilenames(&filenames);
2476 }
2477
2478 std::sort(filenames.begin(), filenames.end());
2479
2480 // Confirm that our new oldest timestamps properly update as we reboot and
2481 // rotate.
2482 size_t timestamp_file_count = 0;
2483 for (const std::string &file : filenames) {
2484 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2485 ReadHeader(file);
2486 CHECK(log_header);
2487
2488 if (log_header->message().has_configuration()) {
2489 continue;
2490 }
2491
2492 const monotonic_clock::time_point monotonic_start_time =
2493 monotonic_clock::time_point(
2494 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2495 const UUID source_node_boot_uuid = UUID::FromString(
2496 log_header->message().source_node_boot_uuid()->string_view());
2497
2498 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2499 ASSERT_EQ(
2500 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2501 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2502 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2503 2u);
2504 ASSERT_TRUE(log_header->message()
2505 .has_oldest_remote_unreliable_monotonic_timestamps());
2506 ASSERT_EQ(log_header->message()
2507 .oldest_remote_unreliable_monotonic_timestamps()
2508 ->size(),
2509 2u);
2510 ASSERT_TRUE(log_header->message()
2511 .has_oldest_local_unreliable_monotonic_timestamps());
2512 ASSERT_EQ(log_header->message()
2513 .oldest_local_unreliable_monotonic_timestamps()
2514 ->size(),
2515 2u);
2516 ASSERT_TRUE(log_header->message()
2517 .has_oldest_remote_reliable_monotonic_timestamps());
2518 ASSERT_EQ(log_header->message()
2519 .oldest_remote_reliable_monotonic_timestamps()
2520 ->size(),
2521 2u);
2522 ASSERT_TRUE(
2523 log_header->message().has_oldest_local_reliable_monotonic_timestamps());
2524 ASSERT_EQ(log_header->message()
2525 .oldest_local_reliable_monotonic_timestamps()
2526 ->size(),
2527 2u);
2528
2529 ASSERT_TRUE(
2530 log_header->message()
2531 .has_oldest_logger_remote_unreliable_monotonic_timestamps());
2532 ASSERT_EQ(log_header->message()
2533 .oldest_logger_remote_unreliable_monotonic_timestamps()
2534 ->size(),
2535 2u);
2536 ASSERT_TRUE(log_header->message()
2537 .has_oldest_logger_local_unreliable_monotonic_timestamps());
2538 ASSERT_EQ(log_header->message()
2539 .oldest_logger_local_unreliable_monotonic_timestamps()
2540 ->size(),
2541 2u);
2542
2543 if (log_header->message().node()->name()->string_view() != "pi1") {
2544 ASSERT_TRUE(file.find("aos.message_bridge.RemoteMessage") !=
2545 std::string::npos);
2546
2547 const std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
2548 ReadNthMessage(file, 0);
2549 CHECK(msg);
2550
2551 EXPECT_TRUE(msg->message().has_monotonic_sent_time());
2552 EXPECT_TRUE(msg->message().has_monotonic_remote_time());
2553
2554 const monotonic_clock::time_point
2555 expected_oldest_local_monotonic_timestamps(
2556 chrono::nanoseconds(msg->message().monotonic_sent_time()));
2557 const monotonic_clock::time_point
2558 expected_oldest_remote_monotonic_timestamps(
2559 chrono::nanoseconds(msg->message().monotonic_remote_time()));
2560 const monotonic_clock::time_point
2561 expected_oldest_timestamp_monotonic_timestamps(
2562 chrono::nanoseconds(msg->message().monotonic_timestamp_time()));
2563
2564 EXPECT_NE(expected_oldest_local_monotonic_timestamps,
2565 monotonic_clock::min_time);
2566 EXPECT_NE(expected_oldest_remote_monotonic_timestamps,
2567 monotonic_clock::min_time);
2568 EXPECT_NE(expected_oldest_timestamp_monotonic_timestamps,
2569 monotonic_clock::min_time);
2570
2571 ++timestamp_file_count;
2572 // Since the log file is from the perspective of the other node,
2573 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2574 monotonic_clock::time_point(chrono::nanoseconds(
2575 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2576 0)));
2577 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2578 monotonic_clock::time_point(chrono::nanoseconds(
2579 log_header->message().oldest_local_monotonic_timestamps()->Get(
2580 0)));
2581 const monotonic_clock::time_point
2582 oldest_remote_unreliable_monotonic_timestamps =
2583 monotonic_clock::time_point(chrono::nanoseconds(
2584 log_header->message()
2585 .oldest_remote_unreliable_monotonic_timestamps()
2586 ->Get(0)));
2587 const monotonic_clock::time_point
2588 oldest_local_unreliable_monotonic_timestamps =
2589 monotonic_clock::time_point(chrono::nanoseconds(
2590 log_header->message()
2591 .oldest_local_unreliable_monotonic_timestamps()
2592 ->Get(0)));
2593 const monotonic_clock::time_point
2594 oldest_remote_reliable_monotonic_timestamps =
2595 monotonic_clock::time_point(chrono::nanoseconds(
2596 log_header->message()
2597 .oldest_remote_reliable_monotonic_timestamps()
2598 ->Get(0)));
2599 const monotonic_clock::time_point
2600 oldest_local_reliable_monotonic_timestamps =
2601 monotonic_clock::time_point(chrono::nanoseconds(
2602 log_header->message()
2603 .oldest_local_reliable_monotonic_timestamps()
2604 ->Get(0)));
2605 const monotonic_clock::time_point
2606 oldest_logger_remote_unreliable_monotonic_timestamps =
2607 monotonic_clock::time_point(chrono::nanoseconds(
2608 log_header->message()
2609 .oldest_logger_remote_unreliable_monotonic_timestamps()
2610 ->Get(1)));
2611 const monotonic_clock::time_point
2612 oldest_logger_local_unreliable_monotonic_timestamps =
2613 monotonic_clock::time_point(chrono::nanoseconds(
2614 log_header->message()
2615 .oldest_logger_local_unreliable_monotonic_timestamps()
2616 ->Get(1)));
2617
2618 const Channel *channel =
2619 event_loop_factory_.configuration()->channels()->Get(
2620 msg->message().channel_index());
2621 const Connection *connection = configuration::ConnectionToNode(
2622 channel, configuration::GetNode(
2623 event_loop_factory_.configuration(),
2624 log_header->message().node()->name()->string_view()));
2625
2626 const bool reliable = connection->time_to_live() == 0;
2627
2628 SCOPED_TRACE(file);
2629 SCOPED_TRACE(aos::FlatbufferToJson(
2630 *log_header, {.multi_line = true, .max_vector_size = 100}));
2631
2632 if (shared()) {
2633 // Confirm that the oldest timestamps match what we expect. Based on
2634 // what we are doing, we know that the oldest time is the first
2635 // message's time.
2636 //
2637 // This makes the test robust to both the split and combined config
2638 // tests.
2639 switch (log_header->message().parts_index()) {
2640 case 0:
2641 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2642 expected_oldest_remote_monotonic_timestamps);
2643 EXPECT_EQ(oldest_local_monotonic_timestamps,
2644 expected_oldest_local_monotonic_timestamps);
2645 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2646 expected_oldest_local_monotonic_timestamps)
2647 << file;
2648 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2649 expected_oldest_timestamp_monotonic_timestamps)
2650 << file;
2651
2652 if (reliable) {
2653 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2654 expected_oldest_remote_monotonic_timestamps);
2655 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2656 expected_oldest_local_monotonic_timestamps);
2657 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2658 monotonic_clock::max_time);
2659 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2660 monotonic_clock::max_time);
2661 } else {
2662 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2663 monotonic_clock::max_time);
2664 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2665 monotonic_clock::max_time);
2666 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2667 expected_oldest_remote_monotonic_timestamps);
2668 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2669 expected_oldest_local_monotonic_timestamps);
2670 }
2671 break;
2672 case 1:
2673 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2674 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2675 EXPECT_EQ(oldest_local_monotonic_timestamps,
2676 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2677 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2678 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2679 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2680 monotonic_clock::epoch() + chrono::nanoseconds(90250000));
2681 if (reliable) {
2682 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2683 expected_oldest_remote_monotonic_timestamps);
2684 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2685 expected_oldest_local_monotonic_timestamps);
2686 EXPECT_EQ(
2687 oldest_remote_unreliable_monotonic_timestamps,
2688 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2689 EXPECT_EQ(
2690 oldest_local_unreliable_monotonic_timestamps,
2691 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2692 } else {
2693 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2694 monotonic_clock::max_time);
2695 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2696 monotonic_clock::max_time);
2697 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2698 expected_oldest_remote_monotonic_timestamps);
2699 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2700 expected_oldest_local_monotonic_timestamps);
2701 }
2702 break;
2703 case 2:
2704 EXPECT_EQ(
2705 oldest_remote_monotonic_timestamps,
2706 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2707 EXPECT_EQ(
2708 oldest_local_monotonic_timestamps,
2709 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2710 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2711 expected_oldest_local_monotonic_timestamps)
2712 << file;
2713 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2714 expected_oldest_timestamp_monotonic_timestamps)
2715 << file;
2716 if (reliable) {
2717 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2718 expected_oldest_remote_monotonic_timestamps);
2719 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2720 expected_oldest_local_monotonic_timestamps);
2721 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2722 monotonic_clock::max_time);
2723 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2724 monotonic_clock::max_time);
2725 } else {
2726 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2727 monotonic_clock::max_time);
2728 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2729 monotonic_clock::max_time);
2730 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2731 expected_oldest_remote_monotonic_timestamps);
2732 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2733 expected_oldest_local_monotonic_timestamps);
2734 }
2735 break;
2736
2737 case 3:
2738 EXPECT_EQ(
2739 oldest_remote_monotonic_timestamps,
2740 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2741 EXPECT_EQ(
2742 oldest_local_monotonic_timestamps,
2743 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2744 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2745 expected_oldest_remote_monotonic_timestamps);
2746 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2747 expected_oldest_local_monotonic_timestamps);
2748 EXPECT_EQ(
2749 oldest_logger_remote_unreliable_monotonic_timestamps,
2750 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2751 EXPECT_EQ(
2752 oldest_logger_local_unreliable_monotonic_timestamps,
2753 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2754 break;
2755 default:
2756 FAIL();
2757 break;
2758 }
2759
2760 switch (log_header->message().parts_index()) {
2761 case 0:
2762 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2763 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2764 break;
2765 case 1:
2766 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2767 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2768 break;
2769 case 2:
2770 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2771 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2772 break;
2773 case 3:
2774 if (shared()) {
2775 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2776 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2777 break;
2778 }
2779 [[fallthrough]];
2780 default:
2781 FAIL();
2782 break;
2783 }
2784 } else {
2785 switch (log_header->message().parts_index()) {
2786 case 0:
2787 if (reliable) {
2788 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2789 monotonic_clock::max_time);
2790 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2791 monotonic_clock::max_time);
2792 EXPECT_EQ(
2793 oldest_logger_remote_unreliable_monotonic_timestamps,
2794 monotonic_clock::epoch() + chrono::nanoseconds(100150000))
2795 << file;
2796 EXPECT_EQ(
2797 oldest_logger_local_unreliable_monotonic_timestamps,
2798 monotonic_clock::epoch() + chrono::nanoseconds(100250000))
2799 << file;
2800 } else {
2801 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2802 expected_oldest_remote_monotonic_timestamps);
2803 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2804 expected_oldest_local_monotonic_timestamps);
2805 EXPECT_EQ(
2806 oldest_logger_remote_unreliable_monotonic_timestamps,
2807 monotonic_clock::epoch() + chrono::nanoseconds(90150000))
2808 << file;
2809 EXPECT_EQ(
2810 oldest_logger_local_unreliable_monotonic_timestamps,
2811 monotonic_clock::epoch() + chrono::nanoseconds(90250000))
2812 << file;
2813 }
2814 break;
2815 case 1:
2816 if (reliable) {
2817 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2818 monotonic_clock::max_time);
2819 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2820 monotonic_clock::max_time);
2821 EXPECT_EQ(
2822 oldest_logger_remote_unreliable_monotonic_timestamps,
2823 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2824 EXPECT_EQ(
2825 oldest_logger_local_unreliable_monotonic_timestamps,
2826 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2827 } else {
2828 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2829 expected_oldest_remote_monotonic_timestamps);
2830 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2831 expected_oldest_local_monotonic_timestamps);
2832 EXPECT_EQ(
2833 oldest_logger_remote_unreliable_monotonic_timestamps,
2834 monotonic_clock::epoch() + chrono::nanoseconds(1323150000));
2835 EXPECT_EQ(
2836 oldest_logger_local_unreliable_monotonic_timestamps,
2837 monotonic_clock::epoch() + chrono::nanoseconds(10100250000));
2838 }
2839 break;
2840 default:
2841 FAIL();
2842 break;
2843 }
2844
2845 switch (log_header->message().parts_index()) {
2846 case 0:
2847 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2848 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2849 break;
2850 case 1:
2851 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2852 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2853 break;
2854 default:
2855 FAIL();
2856 break;
2857 }
2858 }
2859
2860 continue;
2861 }
2862 EXPECT_EQ(
2863 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2864 monotonic_clock::max_time.time_since_epoch().count());
2865 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2866 monotonic_clock::max_time.time_since_epoch().count());
2867 EXPECT_EQ(log_header->message()
2868 .oldest_remote_unreliable_monotonic_timestamps()
2869 ->Get(0),
2870 monotonic_clock::max_time.time_since_epoch().count());
2871 EXPECT_EQ(log_header->message()
2872 .oldest_local_unreliable_monotonic_timestamps()
2873 ->Get(0),
2874 monotonic_clock::max_time.time_since_epoch().count());
2875
2876 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2877 monotonic_clock::time_point(chrono::nanoseconds(
2878 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2879 1)));
2880 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2881 monotonic_clock::time_point(chrono::nanoseconds(
2882 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2883 const monotonic_clock::time_point
2884 oldest_remote_unreliable_monotonic_timestamps =
2885 monotonic_clock::time_point(chrono::nanoseconds(
2886 log_header->message()
2887 .oldest_remote_unreliable_monotonic_timestamps()
2888 ->Get(1)));
2889 const monotonic_clock::time_point
2890 oldest_local_unreliable_monotonic_timestamps =
2891 monotonic_clock::time_point(chrono::nanoseconds(
2892 log_header->message()
2893 .oldest_local_unreliable_monotonic_timestamps()
2894 ->Get(1)));
2895 switch (log_header->message().parts_index()) {
2896 case 0:
2897 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2898 monotonic_clock::max_time);
2899 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2900 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2901 monotonic_clock::max_time);
2902 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2903 monotonic_clock::max_time);
2904 break;
2905 default:
2906 FAIL();
2907 break;
2908 }
2909 }
2910
2911 if (shared()) {
2912 EXPECT_EQ(timestamp_file_count, 4u);
2913 } else {
2914 EXPECT_EQ(timestamp_file_count, 4u);
2915 }
2916
2917 // Confirm that we can actually sort the resulting log and read it.
2918 {
2919 LogReader reader(SortParts(filenames));
2920
2921 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2922 log_reader_factory.set_send_delay(chrono::microseconds(0));
2923
2924 // This sends out the fetched messages and advances time to the start of
2925 // the log file.
2926 reader.Register(&log_reader_factory);
2927
2928 log_reader_factory.Run();
2929
2930 reader.Deregister();
2931 }
2932}
2933
2934// Tests that we properly handle one direction of message_bridge being
2935// unavailable.
2936TEST_P(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
2937 pi1_->Disconnect(pi2_->node());
2938 time_converter_.AddMonotonic(
2939 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2940
2941 time_converter_.AddMonotonic(
2942 {chrono::milliseconds(10000),
2943 chrono::milliseconds(10000) - chrono::milliseconds(1)});
2944 {
2945 LoggerState pi1_logger = MakeLogger(pi1_);
2946
2947 event_loop_factory_.RunFor(chrono::milliseconds(95));
2948
2949 StartLogger(&pi1_logger);
2950
2951 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2952 }
2953
2954 // Confirm that we can parse the result. LogReader has enough internal CHECKs
2955 // to confirm the right thing happened.
2956 ConfirmReadable(pi1_single_direction_logfiles_);
2957}
2958
2959// Tests that we properly handle one direction of message_bridge being
2960// unavailable.
2961TEST_P(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
2962 pi1_->Disconnect(pi2_->node());
2963 time_converter_.AddMonotonic(
2964 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(500)});
2965
2966 time_converter_.AddMonotonic(
2967 {chrono::milliseconds(10000),
2968 chrono::milliseconds(10000) + chrono::milliseconds(1)});
2969 {
2970 LoggerState pi1_logger = MakeLogger(pi1_);
2971
2972 event_loop_factory_.RunFor(chrono::milliseconds(95));
2973
2974 StartLogger(&pi1_logger);
2975
2976 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2977 }
2978
2979 // Confirm that we can parse the result. LogReader has enough internal CHECKs
2980 // to confirm the right thing happened.
2981 ConfirmReadable(pi1_single_direction_logfiles_);
2982}
2983
2984// Tests that we explode if someone passes in a part file twice with a better
2985// error than an out of order error.
2986TEST_P(MultinodeLoggerTest, DuplicateLogFiles) {
2987 time_converter_.AddMonotonic(
2988 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2989 {
2990 LoggerState pi1_logger = MakeLogger(pi1_);
2991
2992 event_loop_factory_.RunFor(chrono::milliseconds(95));
2993
2994 StartLogger(&pi1_logger);
2995
2996 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2997 }
2998
2999 std::vector<std::string> duplicates;
3000 for (const std::string &f : pi1_single_direction_logfiles_) {
3001 duplicates.emplace_back(f);
3002 duplicates.emplace_back(f);
3003 }
3004 EXPECT_DEATH({ SortParts(duplicates); }, "Found duplicate parts in");
3005}
3006
3007// Tests that we explode if someone loses a part out of the middle of a log.
3008TEST_P(MultinodeLoggerTest, MissingPartsFromMiddle) {
3009 time_converter_.AddMonotonic(
3010 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
3011 {
3012 LoggerState pi1_logger = MakeLogger(pi1_);
3013
3014 event_loop_factory_.RunFor(chrono::milliseconds(95));
3015
3016 StartLogger(&pi1_logger);
3017 aos::monotonic_clock::time_point last_rotation_time =
3018 pi1_logger.event_loop->monotonic_now();
3019 pi1_logger.logger->set_on_logged_period([&] {
3020 const auto now = pi1_logger.event_loop->monotonic_now();
3021 if (now > last_rotation_time + std::chrono::seconds(5)) {
3022 pi1_logger.logger->Rotate();
3023 last_rotation_time = now;
3024 }
3025 });
3026
3027 event_loop_factory_.RunFor(chrono::milliseconds(10000));
3028 }
3029
3030 std::vector<std::string> missing_parts;
3031
3032 missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part0" + Extension());
3033 missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part2" + Extension());
3034 missing_parts.emplace_back(absl::StrCat(
3035 logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
3036
3037 EXPECT_DEATH({ SortParts(missing_parts); },
3038 "Broken log, missing part files between");
3039}
3040
3041// Tests that we properly handle a dead node. Do this by just disconnecting it
3042// and only using one nodes of logs.
3043TEST_P(MultinodeLoggerTest, DeadNode) {
3044 pi1_->Disconnect(pi2_->node());
3045 pi2_->Disconnect(pi1_->node());
3046 time_converter_.AddMonotonic(
3047 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
3048 {
3049 LoggerState pi1_logger = MakeLogger(pi1_);
3050
3051 event_loop_factory_.RunFor(chrono::milliseconds(95));
3052
3053 StartLogger(&pi1_logger);
3054
3055 event_loop_factory_.RunFor(chrono::milliseconds(10000));
3056 }
3057
3058 // Confirm that we can parse the result. LogReader has enough internal CHECKs
3059 // to confirm the right thing happened.
3060 ConfirmReadable(MakePi1DeadNodeLogfiles());
3061}
3062
3063// Tests that we can relog with a different config. This makes most sense when
3064// you are trying to edit a log and want to use channel renaming + the original
3065// config in the new log.
3066TEST_P(MultinodeLoggerTest, LogDifferentConfig) {
3067 time_converter_.StartEqual();
3068 {
3069 LoggerState pi1_logger = MakeLogger(pi1_);
3070 LoggerState pi2_logger = MakeLogger(pi2_);
3071
3072 event_loop_factory_.RunFor(chrono::milliseconds(95));
3073
3074 StartLogger(&pi1_logger);
3075 StartLogger(&pi2_logger);
3076
3077 event_loop_factory_.RunFor(chrono::milliseconds(20000));
3078 }
3079
3080 LogReader reader(SortParts(logfiles_));
3081 reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
3082
3083 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
3084 log_reader_factory.set_send_delay(chrono::microseconds(0));
3085
3086 // This sends out the fetched messages and advances time to the start of the
3087 // log file.
3088 reader.Register(&log_reader_factory);
3089
3090 const Node *pi1 =
3091 configuration::GetNode(log_reader_factory.configuration(), "pi1");
3092 const Node *pi2 =
3093 configuration::GetNode(log_reader_factory.configuration(), "pi2");
3094
3095 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
3096 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
3097 LOG(INFO) << "now pi1 "
3098 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
3099 LOG(INFO) << "now pi2 "
3100 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
3101
3102 EXPECT_THAT(reader.LoggedNodes(),
3103 ::testing::ElementsAre(
3104 configuration::GetNode(reader.logged_configuration(), pi1),
3105 configuration::GetNode(reader.logged_configuration(), pi2)));
3106
3107 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
3108
3109 // And confirm we can re-create a log again, while checking the contents.
3110 std::vector<std::string> log_files;
3111 {
3112 LoggerState pi1_logger =
3113 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
3114 &log_reader_factory, reader.logged_configuration());
3115 LoggerState pi2_logger =
3116 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
3117 &log_reader_factory, reader.logged_configuration());
3118
3119 pi1_logger.StartLogger(tmp_dir_ + "/relogged1");
3120 pi2_logger.StartLogger(tmp_dir_ + "/relogged2");
3121
3122 log_reader_factory.Run();
3123
3124 for (auto &x : pi1_logger.log_namer->all_filenames()) {
3125 log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged1_", x));
3126 }
3127 for (auto &x : pi2_logger.log_namer->all_filenames()) {
3128 log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged2_", x));
3129 }
3130 }
3131
3132 reader.Deregister();
3133
3134 // And verify that we can run the LogReader over the relogged files without
3135 // hitting any fatal errors.
3136 {
3137 LogReader relogged_reader(SortParts(log_files));
3138 relogged_reader.Register();
3139
3140 relogged_reader.event_loop_factory()->Run();
3141 }
3142}
3143
3144// Tests that we properly replay a log where the start time for a node is before
3145// any data on the node. This can happen if the logger starts before data is
3146// published. While the scenario below is a bit convoluted, we have seen logs
3147// like this generated out in the wild.
3148TEST(MultinodeRebootLoggerTest, StartTimeBeforeData) {
3149 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3150 aos::configuration::ReadConfig(ArtifactPath(
3151 "aos/events/logging/multinode_pingpong_split3_config.json"));
3152 message_bridge::TestingTimeConverter time_converter(
3153 configuration::NodesCount(&config.message()));
3154 SimulatedEventLoopFactory event_loop_factory(&config.message());
3155 event_loop_factory.SetTimeConverter(&time_converter);
3156 NodeEventLoopFactory *const pi1 =
3157 event_loop_factory.GetNodeEventLoopFactory("pi1");
3158 const size_t pi1_index = configuration::GetNodeIndex(
3159 event_loop_factory.configuration(), pi1->node());
3160 NodeEventLoopFactory *const pi2 =
3161 event_loop_factory.GetNodeEventLoopFactory("pi2");
3162 const size_t pi2_index = configuration::GetNodeIndex(
3163 event_loop_factory.configuration(), pi2->node());
3164 NodeEventLoopFactory *const pi3 =
3165 event_loop_factory.GetNodeEventLoopFactory("pi3");
3166 const size_t pi3_index = configuration::GetNodeIndex(
3167 event_loop_factory.configuration(), pi3->node());
3168
3169 const std::string kLogfile1_1 =
3170 aos::testing::TestTmpDir() + "/multi_logfile1/";
3171 const std::string kLogfile2_1 =
3172 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3173 const std::string kLogfile2_2 =
3174 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
3175 const std::string kLogfile3_1 =
3176 aos::testing::TestTmpDir() + "/multi_logfile3/";
3177 util::UnlinkRecursive(kLogfile1_1);
3178 util::UnlinkRecursive(kLogfile2_1);
3179 util::UnlinkRecursive(kLogfile2_2);
3180 util::UnlinkRecursive(kLogfile3_1);
3181 const UUID pi1_boot0 = UUID::Random();
3182 const UUID pi2_boot0 = UUID::Random();
3183 const UUID pi2_boot1 = UUID::Random();
3184 const UUID pi3_boot0 = UUID::Random();
3185 {
3186 CHECK_EQ(pi1_index, 0u);
3187 CHECK_EQ(pi2_index, 1u);
3188 CHECK_EQ(pi3_index, 2u);
3189
3190 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3191 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3192 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3193 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3194
3195 time_converter.AddNextTimestamp(
3196 distributed_clock::epoch(),
3197 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3198 BootTimestamp::epoch()});
3199 const chrono::nanoseconds reboot_time = chrono::milliseconds(20000);
3200 time_converter.AddNextTimestamp(
3201 distributed_clock::epoch() + reboot_time,
3202 {BootTimestamp::epoch() + reboot_time,
3203 BootTimestamp{
3204 .boot = 1,
3205 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)},
3206 BootTimestamp::epoch() + reboot_time});
3207 }
3208
3209 // Make everything perfectly quiet.
3210 event_loop_factory.SkipTimingReport();
3211 event_loop_factory.DisableStatistics();
3212
3213 std::vector<std::string> filenames;
3214 {
3215 LoggerState pi1_logger = MakeLoggerState(
3216 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3217 LoggerState pi3_logger = MakeLoggerState(
3218 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3219 {
3220 // And now start the logger.
3221 LoggerState pi2_logger = MakeLoggerState(
3222 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3223
3224 event_loop_factory.RunFor(chrono::milliseconds(1000));
3225
3226 pi1_logger.StartLogger(kLogfile1_1);
3227 pi3_logger.StartLogger(kLogfile3_1);
3228 pi2_logger.StartLogger(kLogfile2_1);
3229
3230 event_loop_factory.RunFor(chrono::milliseconds(10000));
3231
3232 // Now that we've got a start time in the past, turn on data.
3233 event_loop_factory.EnableStatistics();
3234 std::unique_ptr<aos::EventLoop> ping_event_loop =
3235 pi1->MakeEventLoop("ping");
3236 Ping ping(ping_event_loop.get());
3237
3238 pi2->AlwaysStart<Pong>("pong");
3239
3240 event_loop_factory.RunFor(chrono::milliseconds(3000));
3241
3242 pi2_logger.AppendAllFilenames(&filenames);
3243
3244 // Stop logging on pi2 before rebooting and completely shut off all
3245 // messages on pi2.
3246 pi2->DisableStatistics();
3247 pi1->Disconnect(pi2->node());
3248 pi2->Disconnect(pi1->node());
3249 }
3250 event_loop_factory.RunFor(chrono::milliseconds(7000));
3251 // pi2 now reboots.
3252 {
3253 event_loop_factory.RunFor(chrono::milliseconds(1000));
3254
3255 // Start logging again on pi2 after it is up.
3256 LoggerState pi2_logger = MakeLoggerState(
3257 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3258 pi2_logger.StartLogger(kLogfile2_2);
3259
3260 event_loop_factory.RunFor(chrono::milliseconds(10000));
3261 // And, now that we have a start time in the log, turn data back on.
3262 pi2->EnableStatistics();
3263 pi1->Connect(pi2->node());
3264 pi2->Connect(pi1->node());
3265
3266 pi2->AlwaysStart<Pong>("pong");
3267 std::unique_ptr<aos::EventLoop> ping_event_loop =
3268 pi1->MakeEventLoop("ping");
3269 Ping ping(ping_event_loop.get());
3270
3271 event_loop_factory.RunFor(chrono::milliseconds(3000));
3272
3273 pi2_logger.AppendAllFilenames(&filenames);
3274 }
3275
3276 pi1_logger.AppendAllFilenames(&filenames);
3277 pi3_logger.AppendAllFilenames(&filenames);
3278 }
3279
3280 // Confirm that we can parse the result. LogReader has enough internal CHECKs
3281 // to confirm the right thing happened.
3282 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3283 auto result = ConfirmReadable(filenames);
3284 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
3285 chrono::seconds(1)));
3286 EXPECT_THAT(result[0].second,
3287 ::testing::ElementsAre(realtime_clock::epoch() +
3288 chrono::microseconds(34990350)));
3289
3290 EXPECT_THAT(result[1].first,
3291 ::testing::ElementsAre(
3292 realtime_clock::epoch() + chrono::seconds(1),
3293 realtime_clock::epoch() + chrono::microseconds(3323000)));
3294 EXPECT_THAT(result[1].second,
3295 ::testing::ElementsAre(
3296 realtime_clock::epoch() + chrono::microseconds(13990200),
3297 realtime_clock::epoch() + chrono::microseconds(16313200)));
3298
3299 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch() +
3300 chrono::seconds(1)));
3301 EXPECT_THAT(result[2].second,
3302 ::testing::ElementsAre(realtime_clock::epoch() +
3303 chrono::microseconds(34900150)));
3304}
3305
3306// Tests that local data before remote data after reboot is properly replayed.
3307// We only trigger a reboot in the timestamp interpolation function when solving
3308// the timestamp problem when we actually have a point in the function. This
3309// originally only happened when a point passes the noncausal filter. At the
3310// start of time for the second boot, if we aren't careful, we will have
3311// messages which need to be published at times before the boot. This happens
3312// when a local message is in the log before a forwarded message, so there is no
3313// point in the interpolation function. This delays the reboot. So, we need to
3314// recreate that situation and make sure it doesn't come back.
3315TEST(MultinodeRebootLoggerTest,
3316 LocalMessageBeforeRemoteBeforeStartAfterReboot) {
3317 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3318 aos::configuration::ReadConfig(ArtifactPath(
3319 "aos/events/logging/multinode_pingpong_split3_config.json"));
3320 message_bridge::TestingTimeConverter time_converter(
3321 configuration::NodesCount(&config.message()));
3322 SimulatedEventLoopFactory event_loop_factory(&config.message());
3323 event_loop_factory.SetTimeConverter(&time_converter);
3324 NodeEventLoopFactory *const pi1 =
3325 event_loop_factory.GetNodeEventLoopFactory("pi1");
3326 const size_t pi1_index = configuration::GetNodeIndex(
3327 event_loop_factory.configuration(), pi1->node());
3328 NodeEventLoopFactory *const pi2 =
3329 event_loop_factory.GetNodeEventLoopFactory("pi2");
3330 const size_t pi2_index = configuration::GetNodeIndex(
3331 event_loop_factory.configuration(), pi2->node());
3332 NodeEventLoopFactory *const pi3 =
3333 event_loop_factory.GetNodeEventLoopFactory("pi3");
3334 const size_t pi3_index = configuration::GetNodeIndex(
3335 event_loop_factory.configuration(), pi3->node());
3336
3337 const std::string kLogfile1_1 =
3338 aos::testing::TestTmpDir() + "/multi_logfile1/";
3339 const std::string kLogfile2_1 =
3340 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3341 const std::string kLogfile2_2 =
3342 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
3343 const std::string kLogfile3_1 =
3344 aos::testing::TestTmpDir() + "/multi_logfile3/";
3345 util::UnlinkRecursive(kLogfile1_1);
3346 util::UnlinkRecursive(kLogfile2_1);
3347 util::UnlinkRecursive(kLogfile2_2);
3348 util::UnlinkRecursive(kLogfile3_1);
3349 const UUID pi1_boot0 = UUID::Random();
3350 const UUID pi2_boot0 = UUID::Random();
3351 const UUID pi2_boot1 = UUID::Random();
3352 const UUID pi3_boot0 = UUID::Random();
3353 {
3354 CHECK_EQ(pi1_index, 0u);
3355 CHECK_EQ(pi2_index, 1u);
3356 CHECK_EQ(pi3_index, 2u);
3357
3358 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3359 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3360 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3361 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3362
3363 time_converter.AddNextTimestamp(
3364 distributed_clock::epoch(),
3365 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3366 BootTimestamp::epoch()});
3367 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3368 time_converter.AddNextTimestamp(
3369 distributed_clock::epoch() + reboot_time,
3370 {BootTimestamp::epoch() + reboot_time,
3371 BootTimestamp{.boot = 1,
3372 .time = monotonic_clock::epoch() + reboot_time +
3373 chrono::seconds(100)},
3374 BootTimestamp::epoch() + reboot_time});
3375 }
3376
3377 std::vector<std::string> filenames;
3378 {
3379 LoggerState pi1_logger = MakeLoggerState(
3380 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3381 LoggerState pi3_logger = MakeLoggerState(
3382 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3383 {
3384 // And now start the logger.
3385 LoggerState pi2_logger = MakeLoggerState(
3386 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3387
3388 pi1_logger.StartLogger(kLogfile1_1);
3389 pi3_logger.StartLogger(kLogfile3_1);
3390 pi2_logger.StartLogger(kLogfile2_1);
3391
3392 event_loop_factory.RunFor(chrono::milliseconds(1005));
3393
3394 // Now that we've got a start time in the past, turn on data.
3395 std::unique_ptr<aos::EventLoop> ping_event_loop =
3396 pi1->MakeEventLoop("ping");
3397 Ping ping(ping_event_loop.get());
3398
3399 pi2->AlwaysStart<Pong>("pong");
3400
3401 event_loop_factory.RunFor(chrono::milliseconds(3000));
3402
3403 pi2_logger.AppendAllFilenames(&filenames);
3404
3405 // Disable any remote messages on pi2.
3406 pi1->Disconnect(pi2->node());
3407 pi2->Disconnect(pi1->node());
3408 }
3409 event_loop_factory.RunFor(chrono::milliseconds(995));
3410 // pi2 now reboots at 5 seconds.
3411 {
3412 event_loop_factory.RunFor(chrono::milliseconds(1000));
3413
3414 // Make local stuff happen before we start logging and connect the remote.
3415 pi2->AlwaysStart<Pong>("pong");
3416 std::unique_ptr<aos::EventLoop> ping_event_loop =
3417 pi1->MakeEventLoop("ping");
3418 Ping ping(ping_event_loop.get());
3419 event_loop_factory.RunFor(chrono::milliseconds(1005));
3420
3421 // Start logging again on pi2 after it is up.
3422 LoggerState pi2_logger = MakeLoggerState(
3423 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3424 pi2_logger.StartLogger(kLogfile2_2);
3425
3426 // And allow remote messages now that we have some local ones.
3427 pi1->Connect(pi2->node());
3428 pi2->Connect(pi1->node());
3429
3430 event_loop_factory.RunFor(chrono::milliseconds(1000));
3431
3432 event_loop_factory.RunFor(chrono::milliseconds(3000));
3433
3434 pi2_logger.AppendAllFilenames(&filenames);
3435 }
3436
3437 pi1_logger.AppendAllFilenames(&filenames);
3438 pi3_logger.AppendAllFilenames(&filenames);
3439 }
3440
3441 // Confirm that we can parse the result. LogReader has enough internal CHECKs
3442 // to confirm the right thing happened.
3443 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3444 auto result = ConfirmReadable(filenames);
3445
3446 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3447 EXPECT_THAT(result[0].second,
3448 ::testing::ElementsAre(realtime_clock::epoch() +
3449 chrono::microseconds(11000350)));
3450
3451 EXPECT_THAT(result[1].first,
3452 ::testing::ElementsAre(
3453 realtime_clock::epoch(),
3454 realtime_clock::epoch() + chrono::microseconds(107005000)));
3455 EXPECT_THAT(result[1].second,
3456 ::testing::ElementsAre(
3457 realtime_clock::epoch() + chrono::microseconds(4000150),
3458 realtime_clock::epoch() + chrono::microseconds(111000200)));
3459
3460 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3461 EXPECT_THAT(result[2].second,
3462 ::testing::ElementsAre(realtime_clock::epoch() +
3463 chrono::microseconds(11000150)));
3464
3465 auto start_stop_result = ConfirmReadable(
3466 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3467 realtime_clock::epoch() + chrono::milliseconds(3000));
3468
3469 EXPECT_THAT(
3470 start_stop_result[0].first,
3471 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3472 EXPECT_THAT(
3473 start_stop_result[0].second,
3474 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3475 EXPECT_THAT(
3476 start_stop_result[1].first,
3477 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3478 EXPECT_THAT(
3479 start_stop_result[1].second,
3480 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3481 EXPECT_THAT(
3482 start_stop_result[2].first,
3483 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3484 EXPECT_THAT(
3485 start_stop_result[2].second,
3486 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3487}
3488
3489// Tests that setting the start and stop flags across a reboot works as
3490// expected.
3491TEST(MultinodeRebootLoggerTest, RebootStartStopTimes) {
3492 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3493 aos::configuration::ReadConfig(ArtifactPath(
3494 "aos/events/logging/multinode_pingpong_split3_config.json"));
3495 message_bridge::TestingTimeConverter time_converter(
3496 configuration::NodesCount(&config.message()));
3497 SimulatedEventLoopFactory event_loop_factory(&config.message());
3498 event_loop_factory.SetTimeConverter(&time_converter);
3499 NodeEventLoopFactory *const pi1 =
3500 event_loop_factory.GetNodeEventLoopFactory("pi1");
3501 const size_t pi1_index = configuration::GetNodeIndex(
3502 event_loop_factory.configuration(), pi1->node());
3503 NodeEventLoopFactory *const pi2 =
3504 event_loop_factory.GetNodeEventLoopFactory("pi2");
3505 const size_t pi2_index = configuration::GetNodeIndex(
3506 event_loop_factory.configuration(), pi2->node());
3507 NodeEventLoopFactory *const pi3 =
3508 event_loop_factory.GetNodeEventLoopFactory("pi3");
3509 const size_t pi3_index = configuration::GetNodeIndex(
3510 event_loop_factory.configuration(), pi3->node());
3511
3512 const std::string kLogfile1_1 =
3513 aos::testing::TestTmpDir() + "/multi_logfile1/";
3514 const std::string kLogfile2_1 =
3515 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3516 const std::string kLogfile2_2 =
3517 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
3518 const std::string kLogfile3_1 =
3519 aos::testing::TestTmpDir() + "/multi_logfile3/";
3520 util::UnlinkRecursive(kLogfile1_1);
3521 util::UnlinkRecursive(kLogfile2_1);
3522 util::UnlinkRecursive(kLogfile2_2);
3523 util::UnlinkRecursive(kLogfile3_1);
3524 {
3525 CHECK_EQ(pi1_index, 0u);
3526 CHECK_EQ(pi2_index, 1u);
3527 CHECK_EQ(pi3_index, 2u);
3528
3529 time_converter.AddNextTimestamp(
3530 distributed_clock::epoch(),
3531 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3532 BootTimestamp::epoch()});
3533 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3534 time_converter.AddNextTimestamp(
3535 distributed_clock::epoch() + reboot_time,
3536 {BootTimestamp::epoch() + reboot_time,
3537 BootTimestamp{.boot = 1,
3538 .time = monotonic_clock::epoch() + reboot_time},
3539 BootTimestamp::epoch() + reboot_time});
3540 }
3541
3542 std::vector<std::string> filenames;
3543 {
3544 LoggerState pi1_logger = MakeLoggerState(
3545 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3546 LoggerState pi3_logger = MakeLoggerState(
3547 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3548 {
3549 // And now start the logger.
3550 LoggerState pi2_logger = MakeLoggerState(
3551 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3552
3553 pi1_logger.StartLogger(kLogfile1_1);
3554 pi3_logger.StartLogger(kLogfile3_1);
3555 pi2_logger.StartLogger(kLogfile2_1);
3556
3557 event_loop_factory.RunFor(chrono::milliseconds(1005));
3558
3559 // Now that we've got a start time in the past, turn on data.
3560 std::unique_ptr<aos::EventLoop> ping_event_loop =
3561 pi1->MakeEventLoop("ping");
3562 Ping ping(ping_event_loop.get());
3563
3564 pi2->AlwaysStart<Pong>("pong");
3565
3566 event_loop_factory.RunFor(chrono::milliseconds(3000));
3567
3568 pi2_logger.AppendAllFilenames(&filenames);
3569 }
3570 event_loop_factory.RunFor(chrono::milliseconds(995));
3571 // pi2 now reboots at 5 seconds.
3572 {
3573 event_loop_factory.RunFor(chrono::milliseconds(1000));
3574
3575 // Make local stuff happen before we start logging and connect the remote.
3576 pi2->AlwaysStart<Pong>("pong");
3577 std::unique_ptr<aos::EventLoop> ping_event_loop =
3578 pi1->MakeEventLoop("ping");
3579 Ping ping(ping_event_loop.get());
3580 event_loop_factory.RunFor(chrono::milliseconds(5));
3581
3582 // Start logging again on pi2 after it is up.
3583 LoggerState pi2_logger = MakeLoggerState(
3584 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3585 pi2_logger.StartLogger(kLogfile2_2);
3586
3587 event_loop_factory.RunFor(chrono::milliseconds(5000));
3588
3589 pi2_logger.AppendAllFilenames(&filenames);
3590 }
3591
3592 pi1_logger.AppendAllFilenames(&filenames);
3593 pi3_logger.AppendAllFilenames(&filenames);
3594 }
3595
3596 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3597 auto result = ConfirmReadable(filenames);
3598
3599 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3600 EXPECT_THAT(result[0].second,
3601 ::testing::ElementsAre(realtime_clock::epoch() +
3602 chrono::microseconds(11000350)));
3603
3604 EXPECT_THAT(result[1].first,
3605 ::testing::ElementsAre(
3606 realtime_clock::epoch(),
3607 realtime_clock::epoch() + chrono::microseconds(6005000)));
3608 EXPECT_THAT(result[1].second,
3609 ::testing::ElementsAre(
3610 realtime_clock::epoch() + chrono::microseconds(4900150),
3611 realtime_clock::epoch() + chrono::microseconds(11000200)));
3612
3613 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3614 EXPECT_THAT(result[2].second,
3615 ::testing::ElementsAre(realtime_clock::epoch() +
3616 chrono::microseconds(11000150)));
3617
3618 // Confirm we observed the correct start and stop times. We should see the
3619 // reboot here.
3620 auto start_stop_result = ConfirmReadable(
3621 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3622 realtime_clock::epoch() + chrono::milliseconds(8000));
3623
3624 EXPECT_THAT(
3625 start_stop_result[0].first,
3626 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3627 EXPECT_THAT(
3628 start_stop_result[0].second,
3629 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3630 EXPECT_THAT(start_stop_result[1].first,
3631 ::testing::ElementsAre(
3632 realtime_clock::epoch() + chrono::seconds(2),
3633 realtime_clock::epoch() + chrono::microseconds(6005000)));
3634 EXPECT_THAT(start_stop_result[1].second,
3635 ::testing::ElementsAre(
3636 realtime_clock::epoch() + chrono::microseconds(4900150),
3637 realtime_clock::epoch() + chrono::seconds(8)));
3638 EXPECT_THAT(
3639 start_stop_result[2].first,
3640 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3641 EXPECT_THAT(
3642 start_stop_result[2].second,
3643 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3644}
3645
3646// Tests that we properly handle one direction being down.
3647TEST(MissingDirectionTest, OneDirection) {
3648 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3649 aos::configuration::ReadConfig(ArtifactPath(
3650 "aos/events/logging/multinode_pingpong_split4_config.json"));
3651 message_bridge::TestingTimeConverter time_converter(
3652 configuration::NodesCount(&config.message()));
3653 SimulatedEventLoopFactory event_loop_factory(&config.message());
3654 event_loop_factory.SetTimeConverter(&time_converter);
3655
3656 NodeEventLoopFactory *const pi1 =
3657 event_loop_factory.GetNodeEventLoopFactory("pi1");
3658 const size_t pi1_index = configuration::GetNodeIndex(
3659 event_loop_factory.configuration(), pi1->node());
3660 NodeEventLoopFactory *const pi2 =
3661 event_loop_factory.GetNodeEventLoopFactory("pi2");
3662 const size_t pi2_index = configuration::GetNodeIndex(
3663 event_loop_factory.configuration(), pi2->node());
3664 std::vector<std::string> filenames;
3665
3666 {
3667 CHECK_EQ(pi1_index, 0u);
3668 CHECK_EQ(pi2_index, 1u);
3669
3670 time_converter.AddNextTimestamp(
3671 distributed_clock::epoch(),
3672 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3673
3674 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3675 time_converter.AddNextTimestamp(
3676 distributed_clock::epoch() + reboot_time,
3677 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3678 BootTimestamp::epoch() + reboot_time});
3679 }
3680
3681 const std::string kLogfile2_1 =
3682 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3683 const std::string kLogfile1_1 =
3684 aos::testing::TestTmpDir() + "/multi_logfile1.1/";
3685 util::UnlinkRecursive(kLogfile2_1);
3686 util::UnlinkRecursive(kLogfile1_1);
3687
3688 pi2->Disconnect(pi1->node());
3689
3690 pi1->AlwaysStart<Ping>("ping");
3691 pi2->AlwaysStart<Pong>("pong");
3692
3693 {
3694 LoggerState pi2_logger = MakeLoggerState(
3695 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3696
3697 event_loop_factory.RunFor(chrono::milliseconds(95));
3698
3699 pi2_logger.StartLogger(kLogfile2_1);
3700
3701 event_loop_factory.RunFor(chrono::milliseconds(6000));
3702
3703 pi2->Connect(pi1->node());
3704
3705 LoggerState pi1_logger = MakeLoggerState(
3706 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3707 pi1_logger.StartLogger(kLogfile1_1);
3708
3709 event_loop_factory.RunFor(chrono::milliseconds(5000));
3710 pi1_logger.AppendAllFilenames(&filenames);
3711 pi2_logger.AppendAllFilenames(&filenames);
3712 }
3713
3714 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3715 ConfirmReadable(filenames);
3716}
3717
3718// Tests that we properly handle only one direction ever existing after a
3719// reboot.
3720TEST(MissingDirectionTest, OneDirectionAfterReboot) {
3721 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3722 aos::configuration::ReadConfig(ArtifactPath(
3723 "aos/events/logging/multinode_pingpong_split4_config.json"));
3724 message_bridge::TestingTimeConverter time_converter(
3725 configuration::NodesCount(&config.message()));
3726 SimulatedEventLoopFactory event_loop_factory(&config.message());
3727 event_loop_factory.SetTimeConverter(&time_converter);
3728
3729 NodeEventLoopFactory *const pi1 =
3730 event_loop_factory.GetNodeEventLoopFactory("pi1");
3731 const size_t pi1_index = configuration::GetNodeIndex(
3732 event_loop_factory.configuration(), pi1->node());
3733 NodeEventLoopFactory *const pi2 =
3734 event_loop_factory.GetNodeEventLoopFactory("pi2");
3735 const size_t pi2_index = configuration::GetNodeIndex(
3736 event_loop_factory.configuration(), pi2->node());
3737 std::vector<std::string> filenames;
3738
3739 {
3740 CHECK_EQ(pi1_index, 0u);
3741 CHECK_EQ(pi2_index, 1u);
3742
3743 time_converter.AddNextTimestamp(
3744 distributed_clock::epoch(),
3745 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3746
3747 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3748 time_converter.AddNextTimestamp(
3749 distributed_clock::epoch() + reboot_time,
3750 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3751 BootTimestamp::epoch() + reboot_time});
3752 }
3753
3754 const std::string kLogfile2_1 =
3755 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3756 util::UnlinkRecursive(kLogfile2_1);
3757
3758 pi1->AlwaysStart<Ping>("ping");
3759
3760 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3761 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3762 // second boot.
3763 {
3764 LoggerState pi2_logger = MakeLoggerState(
3765 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3766
3767 event_loop_factory.RunFor(chrono::milliseconds(95));
3768
3769 pi2_logger.StartLogger(kLogfile2_1);
3770
3771 event_loop_factory.RunFor(chrono::milliseconds(4000));
3772
3773 pi2->Disconnect(pi1->node());
3774
3775 event_loop_factory.RunFor(chrono::milliseconds(1000));
3776 pi1->AlwaysStart<Ping>("ping");
3777
3778 event_loop_factory.RunFor(chrono::milliseconds(5000));
3779 pi2_logger.AppendAllFilenames(&filenames);
3780 }
3781
3782 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3783 ConfirmReadable(filenames);
3784}
3785
3786// Tests that we properly handle only one direction ever existing after a reboot
3787// with only reliable data.
3788TEST(MissingDirectionTest, OneDirectionAfterRebootReliable) {
3789 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3790 aos::configuration::ReadConfig(ArtifactPath(
3791 "aos/events/logging/multinode_pingpong_split4_reliable_config.json"));
3792 message_bridge::TestingTimeConverter time_converter(
3793 configuration::NodesCount(&config.message()));
3794 SimulatedEventLoopFactory event_loop_factory(&config.message());
3795 event_loop_factory.SetTimeConverter(&time_converter);
3796
3797 NodeEventLoopFactory *const pi1 =
3798 event_loop_factory.GetNodeEventLoopFactory("pi1");
3799 const size_t pi1_index = configuration::GetNodeIndex(
3800 event_loop_factory.configuration(), pi1->node());
3801 NodeEventLoopFactory *const pi2 =
3802 event_loop_factory.GetNodeEventLoopFactory("pi2");
3803 const size_t pi2_index = configuration::GetNodeIndex(
3804 event_loop_factory.configuration(), pi2->node());
3805 std::vector<std::string> filenames;
3806
3807 {
3808 CHECK_EQ(pi1_index, 0u);
3809 CHECK_EQ(pi2_index, 1u);
3810
3811 time_converter.AddNextTimestamp(
3812 distributed_clock::epoch(),
3813 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3814
3815 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3816 time_converter.AddNextTimestamp(
3817 distributed_clock::epoch() + reboot_time,
3818 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3819 BootTimestamp::epoch() + reboot_time});
3820 }
3821
3822 const std::string kLogfile2_1 =
3823 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3824 util::UnlinkRecursive(kLogfile2_1);
3825
3826 pi1->AlwaysStart<Ping>("ping");
3827
3828 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3829 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3830 // second boot.
3831 {
3832 LoggerState pi2_logger = MakeLoggerState(
3833 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3834
3835 event_loop_factory.RunFor(chrono::milliseconds(95));
3836
3837 pi2_logger.StartLogger(kLogfile2_1);
3838
3839 event_loop_factory.RunFor(chrono::milliseconds(4000));
3840
3841 pi2->Disconnect(pi1->node());
3842
3843 event_loop_factory.RunFor(chrono::milliseconds(1000));
3844 pi1->AlwaysStart<Ping>("ping");
3845
3846 event_loop_factory.RunFor(chrono::milliseconds(5000));
3847 pi2_logger.AppendAllFilenames(&filenames);
3848 }
3849
3850 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3851 ConfirmReadable(filenames);
3852}
3853
// Tests that we properly handle only one direction ever existing after a reboot
3855// with mixed unreliable vs reliable, where reliable has an earlier timestamp
3856// than unreliable.
3857TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase1) {
3858 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3859 aos::configuration::ReadConfig(ArtifactPath(
3860 "aos/events/logging/multinode_pingpong_split4_mixed1_config.json"));
3861 message_bridge::TestingTimeConverter time_converter(
3862 configuration::NodesCount(&config.message()));
3863 SimulatedEventLoopFactory event_loop_factory(&config.message());
3864 event_loop_factory.SetTimeConverter(&time_converter);
3865
3866 NodeEventLoopFactory *const pi1 =
3867 event_loop_factory.GetNodeEventLoopFactory("pi1");
3868 const size_t pi1_index = configuration::GetNodeIndex(
3869 event_loop_factory.configuration(), pi1->node());
3870 NodeEventLoopFactory *const pi2 =
3871 event_loop_factory.GetNodeEventLoopFactory("pi2");
3872 const size_t pi2_index = configuration::GetNodeIndex(
3873 event_loop_factory.configuration(), pi2->node());
3874 std::vector<std::string> filenames;
3875
3876 {
3877 CHECK_EQ(pi1_index, 0u);
3878 CHECK_EQ(pi2_index, 1u);
3879
3880 time_converter.AddNextTimestamp(
3881 distributed_clock::epoch(),
3882 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3883
3884 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3885 time_converter.AddNextTimestamp(
3886 distributed_clock::epoch() + reboot_time,
3887 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3888 BootTimestamp::epoch() + reboot_time});
3889 }
3890
3891 const std::string kLogfile2_1 =
3892 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3893 util::UnlinkRecursive(kLogfile2_1);
3894
3895 // The following sequence using the above reference config creates
3896 // a reliable message timestamp < unreliable message timestamp.
3897 {
3898 pi1->DisableStatistics();
3899 pi2->DisableStatistics();
3900
3901 event_loop_factory.RunFor(chrono::milliseconds(95));
3902
3903 pi1->AlwaysStart<Ping>("ping");
3904
3905 event_loop_factory.RunFor(chrono::milliseconds(5250));
3906
3907 pi1->EnableStatistics();
3908
3909 event_loop_factory.RunFor(chrono::milliseconds(1000));
3910
3911 LoggerState pi2_logger = MakeLoggerState(
3912 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3913
3914 pi2_logger.StartLogger(kLogfile2_1);
3915
3916 event_loop_factory.RunFor(chrono::milliseconds(5000));
3917 pi2_logger.AppendAllFilenames(&filenames);
3918 }
3919
3920 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3921 ConfirmReadable(filenames);
3922}
3923
3924// Tests that we properly handle only one direction ever existing after a reboot
3925// with mixed unreliable vs reliable, where unreliable has an earlier timestamp
3926// than reliable.
3927TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase2) {
3928 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3929 aos::configuration::ReadConfig(ArtifactPath(
3930 "aos/events/logging/multinode_pingpong_split4_mixed2_config.json"));
3931 message_bridge::TestingTimeConverter time_converter(
3932 configuration::NodesCount(&config.message()));
3933 SimulatedEventLoopFactory event_loop_factory(&config.message());
3934 event_loop_factory.SetTimeConverter(&time_converter);
3935
3936 NodeEventLoopFactory *const pi1 =
3937 event_loop_factory.GetNodeEventLoopFactory("pi1");
3938 const size_t pi1_index = configuration::GetNodeIndex(
3939 event_loop_factory.configuration(), pi1->node());
3940 NodeEventLoopFactory *const pi2 =
3941 event_loop_factory.GetNodeEventLoopFactory("pi2");
3942 const size_t pi2_index = configuration::GetNodeIndex(
3943 event_loop_factory.configuration(), pi2->node());
3944 std::vector<std::string> filenames;
3945
3946 {
3947 CHECK_EQ(pi1_index, 0u);
3948 CHECK_EQ(pi2_index, 1u);
3949
3950 time_converter.AddNextTimestamp(
3951 distributed_clock::epoch(),
3952 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3953
3954 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3955 time_converter.AddNextTimestamp(
3956 distributed_clock::epoch() + reboot_time,
3957 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3958 BootTimestamp::epoch() + reboot_time});
3959 }
3960
3961 const std::string kLogfile2_1 =
3962 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3963 util::UnlinkRecursive(kLogfile2_1);
3964
3965 // The following sequence using the above reference config creates
3966 // an unreliable message timestamp < reliable message timestamp.
3967 {
3968 pi1->DisableStatistics();
3969 pi2->DisableStatistics();
3970
3971 event_loop_factory.RunFor(chrono::milliseconds(95));
3972
3973 pi1->AlwaysStart<Ping>("ping");
3974
3975 event_loop_factory.RunFor(chrono::milliseconds(5250));
3976
3977 pi1->EnableStatistics();
3978
3979 event_loop_factory.RunFor(chrono::milliseconds(1000));
3980
3981 LoggerState pi2_logger = MakeLoggerState(
3982 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3983
3984 pi2_logger.StartLogger(kLogfile2_1);
3985
3986 event_loop_factory.RunFor(chrono::milliseconds(5000));
3987 pi2_logger.AppendAllFilenames(&filenames);
3988 }
3989
3990 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3991 ConfirmReadable(filenames);
3992}
3993
// Tests that we properly handle what used to be a time violation in one
3995// direction. This can occur when one direction goes down after sending some
3996// data, but the other keeps working. The down direction ends up resolving to a
3997// straight line in the noncausal filter, where the direction which is still up
3998// can cross that line. Really, time progressed along just fine but we assumed
3999// that the offset was a line when it could have deviated by up to 1ms/second.
4000TEST_P(MultinodeLoggerTest, OneDirectionTimeDrift) {
4001 std::vector<std::string> filenames;
4002
4003 CHECK_EQ(pi1_index_, 0u);
4004 CHECK_EQ(pi2_index_, 1u);
4005
4006 time_converter_.AddNextTimestamp(
4007 distributed_clock::epoch(),
4008 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4009
4010 const chrono::nanoseconds before_disconnect_duration =
4011 time_converter_.AddMonotonic(
4012 {chrono::milliseconds(1000), chrono::milliseconds(1000)});
4013
4014 const chrono::nanoseconds test_duration =
4015 time_converter_.AddMonotonic(
4016 {chrono::milliseconds(1000), chrono::milliseconds(1000)}) +
4017 time_converter_.AddMonotonic(
4018 {chrono::milliseconds(10000),
4019 chrono::milliseconds(10000) - chrono::milliseconds(5)}) +
4020 time_converter_.AddMonotonic(
4021 {chrono::milliseconds(10000),
4022 chrono::milliseconds(10000) + chrono::milliseconds(5)});
4023
4024 const std::string kLogfile =
4025 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
4026 util::UnlinkRecursive(kLogfile);
4027
4028 {
4029 LoggerState pi2_logger = MakeLogger(pi2_);
4030 pi2_logger.StartLogger(kLogfile);
4031 event_loop_factory_.RunFor(before_disconnect_duration);
4032
4033 pi2_->Disconnect(pi1_->node());
4034
4035 event_loop_factory_.RunFor(test_duration);
4036 pi2_->Connect(pi1_->node());
4037
4038 event_loop_factory_.RunFor(chrono::milliseconds(5000));
4039 pi2_logger.AppendAllFilenames(&filenames);
4040 }
4041
4042 const std::vector<LogFile> sorted_parts = SortParts(filenames);
4043 ConfirmReadable(filenames);
4044}
4045
4046// Tests that we can replay a logfile that has timestamps such that at least one
4047// node's epoch is at a positive distributed_clock (and thus will have to be
4048// booted after the other node(s)).
4049TEST_P(MultinodeLoggerTest, StartOneNodeBeforeOther) {
4050 std::vector<std::string> filenames;
4051
4052 CHECK_EQ(pi1_index_, 0u);
4053 CHECK_EQ(pi2_index_, 1u);
4054
4055 time_converter_.AddNextTimestamp(
4056 distributed_clock::epoch(),
4057 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4058
4059 const chrono::nanoseconds before_reboot_duration = chrono::milliseconds(1000);
4060 time_converter_.RebootAt(
4061 0, distributed_clock::time_point(before_reboot_duration));
4062
4063 const chrono::nanoseconds test_duration = time_converter_.AddMonotonic(
4064 {chrono::milliseconds(10000), chrono::milliseconds(10000)});
4065
4066 const std::string kLogfile =
4067 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
4068 util::UnlinkRecursive(kLogfile);
4069
4070 pi2_->Disconnect(pi1_->node());
4071 pi1_->Disconnect(pi2_->node());
4072
4073 {
4074 LoggerState pi2_logger = MakeLogger(pi2_);
4075
4076 pi2_logger.StartLogger(kLogfile);
4077 event_loop_factory_.RunFor(before_reboot_duration);
4078
4079 pi2_->Connect(pi1_->node());
4080 pi1_->Connect(pi2_->node());
4081
4082 event_loop_factory_.RunFor(test_duration);
4083
4084 pi2_logger.AppendAllFilenames(&filenames);
4085 }
4086
4087 const std::vector<LogFile> sorted_parts = SortParts(filenames);
4088 ConfirmReadable(filenames);
4089
4090 {
4091 LogReader reader(sorted_parts);
4092 SimulatedEventLoopFactory replay_factory(reader.configuration());
4093 reader.RegisterWithoutStarting(&replay_factory);
4094
4095 NodeEventLoopFactory *const replay_node =
4096 reader.event_loop_factory()->GetNodeEventLoopFactory("pi1");
4097
4098 std::unique_ptr<EventLoop> test_event_loop =
4099 replay_node->MakeEventLoop("test_reader");
4100 replay_node->OnStartup([replay_node]() {
4101 // Check that we didn't boot until at least t=0.
4102 CHECK_LE(monotonic_clock::epoch(), replay_node->monotonic_now());
4103 });
4104 test_event_loop->OnRun([&test_event_loop]() {
4105 // Check that we didn't boot until at least t=0.
4106 EXPECT_LE(monotonic_clock::epoch(), test_event_loop->monotonic_now());
4107 });
4108 reader.event_loop_factory()->Run();
4109 reader.Deregister();
4110 }
4111}
4112
4113// Tests that when we have a loop without all the logs at all points in time, we
4114// can sort it properly.
4115TEST(MultinodeLoggerLoopTest, Loop) {
4116 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4117 aos::configuration::ReadConfig(ArtifactPath(
4118 "aos/events/logging/multinode_pingpong_triangle_split_config.json"));
4119 message_bridge::TestingTimeConverter time_converter(
4120 configuration::NodesCount(&config.message()));
4121 SimulatedEventLoopFactory event_loop_factory(&config.message());
4122 event_loop_factory.SetTimeConverter(&time_converter);
4123
4124 NodeEventLoopFactory *const pi1 =
4125 event_loop_factory.GetNodeEventLoopFactory("pi1");
4126 NodeEventLoopFactory *const pi2 =
4127 event_loop_factory.GetNodeEventLoopFactory("pi2");
4128 NodeEventLoopFactory *const pi3 =
4129 event_loop_factory.GetNodeEventLoopFactory("pi3");
4130
4131 const std::string kLogfile1_1 =
4132 aos::testing::TestTmpDir() + "/multi_logfile1/";
4133 const std::string kLogfile2_1 =
4134 aos::testing::TestTmpDir() + "/multi_logfile2/";
4135 const std::string kLogfile3_1 =
4136 aos::testing::TestTmpDir() + "/multi_logfile3/";
4137 util::UnlinkRecursive(kLogfile1_1);
4138 util::UnlinkRecursive(kLogfile2_1);
4139 util::UnlinkRecursive(kLogfile3_1);
4140
4141 {
4142 // Make pi1 boot before everything else.
4143 time_converter.AddNextTimestamp(
4144 distributed_clock::epoch(),
4145 {BootTimestamp::epoch(),
4146 BootTimestamp::epoch() - chrono::milliseconds(100),
4147 BootTimestamp::epoch() - chrono::milliseconds(300)});
4148 }
4149
4150 // We want to setup a situation such that 2 of the 3 legs of the loop are very
4151 // confident about time being X, and the third leg is pulling the average off
4152 // to one side.
4153 //
4154 // It's easiest to visualize this in timestamp_plotter.
4155
4156 std::vector<std::string> filenames;
4157 {
4158 // Have pi1 send out a reliable message at startup. This sets up a long
4159 // forwarding time message at the start to bias time.
4160 std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
4161 {
4162 aos::Sender<examples::Ping> ping_sender =
4163 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
4164
4165 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
4166 examples::Ping::Builder ping_builder =
4167 builder.MakeBuilder<examples::Ping>();
4168 CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
4169 }
4170
4171 // Wait a while so there's enough data to let the worst case be rather off.
4172 event_loop_factory.RunFor(chrono::seconds(1000));
4173
4174 // Now start a receiving node first. This sets up 2 tight bounds between 2
4175 // of the nodes.
4176 LoggerState pi2_logger = MakeLoggerState(
4177 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4178 pi2_logger.StartLogger(kLogfile2_1);
4179
4180 event_loop_factory.RunFor(chrono::seconds(100));
4181
4182 // And now start the third leg.
4183 LoggerState pi3_logger = MakeLoggerState(
4184 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4185 pi3_logger.StartLogger(kLogfile3_1);
4186
4187 LoggerState pi1_logger = MakeLoggerState(
4188 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4189 pi1_logger.StartLogger(kLogfile1_1);
4190
4191 event_loop_factory.RunFor(chrono::seconds(100));
4192
4193 pi1_logger.AppendAllFilenames(&filenames);
4194 pi2_logger.AppendAllFilenames(&filenames);
4195 pi3_logger.AppendAllFilenames(&filenames);
4196 }
4197
4198 // Make sure we can read this.
4199 const std::vector<LogFile> sorted_parts = SortParts(filenames);
4200 auto result = ConfirmReadable(filenames);
4201}
4202
Austin Schuh08dba8f2023-05-01 08:29:30 -07004203// Tests that RestartLogging works in the simple case. Unfortunately, the
4204// failure cases involve simulating time elapsing in callbacks, which is really
4205// hard. The best we can reasonably do is make sure 2 back to back logs are
4206// parseable together.
4207TEST_P(MultinodeLoggerTest, RestartLogging) {
4208 time_converter_.AddMonotonic(
4209 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
4210 std::vector<std::string> filenames;
4211 {
4212 LoggerState pi1_logger = MakeLogger(pi1_);
4213
4214 event_loop_factory_.RunFor(chrono::milliseconds(95));
4215
4216 StartLogger(&pi1_logger, logfile_base1_);
4217 aos::monotonic_clock::time_point last_rotation_time =
4218 pi1_logger.event_loop->monotonic_now();
4219 pi1_logger.logger->set_on_logged_period([&] {
4220 const auto now = pi1_logger.event_loop->monotonic_now();
4221 if (now > last_rotation_time + std::chrono::seconds(5)) {
4222 pi1_logger.AppendAllFilenames(&filenames);
4223 std::unique_ptr<MultiNodeFilesLogNamer> namer =
4224 pi1_logger.MakeLogNamer(logfile_base2_);
4225 pi1_logger.log_namer = namer.get();
4226
4227 pi1_logger.logger->RestartLogging(std::move(namer));
4228 last_rotation_time = now;
4229 }
4230 });
4231
4232 event_loop_factory_.RunFor(chrono::milliseconds(7000));
4233
4234 pi1_logger.AppendAllFilenames(&filenames);
4235 }
4236
4237 for (const auto &x : filenames) {
4238 LOG(INFO) << x;
4239 }
4240
4241 EXPECT_GE(filenames.size(), 2u);
4242
4243 ConfirmReadable(filenames);
4244
4245 // TODO(austin): It would be good to confirm that any one time messages end up
4246 // in both logs correctly.
4247}
4248
Naman Guptaa63aa132023-03-22 20:06:34 -07004249} // namespace testing
4250} // namespace logger
4251} // namespace aos