blob: f0104667da3cf1b8dd1307f912c4099a7a563bb3 [file] [log] [blame]
Brian Smartte67d7112023-03-20 12:06:30 -07001#include <algorithm>
2
Philipp Schrader790cb542023-07-05 21:06:52 -07003#include "gmock/gmock.h"
4#include "gtest/gtest.h"
5
Naman Guptaa63aa132023-03-22 20:06:34 -07006#include "aos/events/logging/log_reader.h"
7#include "aos/events/logging/multinode_logger_test_lib.h"
8#include "aos/events/message_counter.h"
9#include "aos/events/ping_lib.h"
10#include "aos/events/pong_lib.h"
11#include "aos/network/remote_message_generated.h"
12#include "aos/network/timestamp_generated.h"
13#include "aos/testing/tmpdir.h"
Naman Guptaa63aa132023-03-22 20:06:34 -070014
15namespace aos {
16namespace logger {
17namespace testing {
18
19namespace chrono = std::chrono;
20using aos::message_bridge::RemoteMessage;
21using aos::testing::ArtifactPath;
22using aos::testing::MessageCounter;
23
Naman Guptaa63aa132023-03-22 20:06:34 -070024INSTANTIATE_TEST_SUITE_P(
25 All, MultinodeLoggerTest,
26 ::testing::Combine(
27 ::testing::Values(
28 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080029 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070030 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080031 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070032 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
33
34INSTANTIATE_TEST_SUITE_P(
35 All, MultinodeLoggerDeathTest,
36 ::testing::Combine(
37 ::testing::Values(
38 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080039 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070040 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080041 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070042 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
43
44// Tests that we can write and read simple multi-node log files.
45TEST_P(MultinodeLoggerTest, SimpleMultiNode) {
46 std::vector<std::string> actual_filenames;
47 time_converter_.StartEqual();
48
49 {
50 LoggerState pi1_logger = MakeLogger(pi1_);
51 LoggerState pi2_logger = MakeLogger(pi2_);
52
53 event_loop_factory_.RunFor(chrono::milliseconds(95));
54
55 StartLogger(&pi1_logger);
56 StartLogger(&pi2_logger);
57
58 event_loop_factory_.RunFor(chrono::milliseconds(20000));
59 pi1_logger.AppendAllFilenames(&actual_filenames);
60 pi2_logger.AppendAllFilenames(&actual_filenames);
61 }
62
63 ASSERT_THAT(actual_filenames,
64 ::testing::UnorderedElementsAreArray(logfiles_));
65
66 {
67 std::set<std::string> logfile_uuids;
68 std::set<std::string> parts_uuids;
69 // Confirm that we have the expected number of UUIDs for both the logfile
70 // UUIDs and parts UUIDs.
71 std::vector<SizePrefixedFlatbufferVector<LogFileHeader>> log_header;
72 for (std::string_view f : logfiles_) {
73 log_header.emplace_back(ReadHeader(f).value());
74 if (!log_header.back().message().has_configuration()) {
75 logfile_uuids.insert(
76 log_header.back().message().log_event_uuid()->str());
77 parts_uuids.insert(log_header.back().message().parts_uuid()->str());
78 }
79 }
80
81 EXPECT_EQ(logfile_uuids.size(), 2u);
82 if (shared()) {
83 EXPECT_EQ(parts_uuids.size(), 7u);
84 } else {
85 EXPECT_EQ(parts_uuids.size(), 8u);
86 }
87
88 // And confirm everything is on the correct node.
89 EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi1");
90 EXPECT_EQ(log_header[3].message().node()->name()->string_view(), "pi1");
91 EXPECT_EQ(log_header[4].message().node()->name()->string_view(), "pi1");
92
93 EXPECT_EQ(log_header[5].message().node()->name()->string_view(), "pi2");
94 EXPECT_EQ(log_header[6].message().node()->name()->string_view(), "pi2");
95
96 EXPECT_EQ(log_header[7].message().node()->name()->string_view(), "pi2");
97 EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi2");
98 EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi2");
99
100 EXPECT_EQ(log_header[10].message().node()->name()->string_view(), "pi1");
101 EXPECT_EQ(log_header[11].message().node()->name()->string_view(), "pi1");
102
103 EXPECT_EQ(log_header[12].message().node()->name()->string_view(), "pi2");
104 EXPECT_EQ(log_header[13].message().node()->name()->string_view(), "pi2");
105
106 if (shared()) {
107 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
108 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi2");
109 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi2");
110
111 EXPECT_EQ(log_header[17].message().node()->name()->string_view(), "pi1");
112 EXPECT_EQ(log_header[18].message().node()->name()->string_view(), "pi1");
113 } else {
114 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
115 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi2");
116
117 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi1");
118 EXPECT_EQ(log_header[17].message().node()->name()->string_view(), "pi1");
119
120 EXPECT_EQ(log_header[18].message().node()->name()->string_view(), "pi2");
121 EXPECT_EQ(log_header[19].message().node()->name()->string_view(), "pi2");
122 }
123
124 // And the parts index matches.
125 EXPECT_EQ(log_header[2].message().parts_index(), 0);
126 EXPECT_EQ(log_header[3].message().parts_index(), 1);
127 EXPECT_EQ(log_header[4].message().parts_index(), 2);
128
129 EXPECT_EQ(log_header[5].message().parts_index(), 0);
130 EXPECT_EQ(log_header[6].message().parts_index(), 1);
131
132 EXPECT_EQ(log_header[7].message().parts_index(), 0);
133 EXPECT_EQ(log_header[8].message().parts_index(), 1);
134 EXPECT_EQ(log_header[9].message().parts_index(), 2);
135
136 EXPECT_EQ(log_header[10].message().parts_index(), 0);
137 EXPECT_EQ(log_header[11].message().parts_index(), 1);
138
139 EXPECT_EQ(log_header[12].message().parts_index(), 0);
140 EXPECT_EQ(log_header[13].message().parts_index(), 1);
141
142 if (shared()) {
143 EXPECT_EQ(log_header[14].message().parts_index(), 0);
144 EXPECT_EQ(log_header[15].message().parts_index(), 1);
145 EXPECT_EQ(log_header[16].message().parts_index(), 2);
146
147 EXPECT_EQ(log_header[17].message().parts_index(), 0);
148 EXPECT_EQ(log_header[18].message().parts_index(), 1);
149 } else {
150 EXPECT_EQ(log_header[14].message().parts_index(), 0);
151 EXPECT_EQ(log_header[15].message().parts_index(), 1);
152
153 EXPECT_EQ(log_header[16].message().parts_index(), 0);
154 EXPECT_EQ(log_header[17].message().parts_index(), 1);
155
156 EXPECT_EQ(log_header[18].message().parts_index(), 0);
157 EXPECT_EQ(log_header[19].message().parts_index(), 1);
158 }
159 }
160
161 const std::vector<LogFile> sorted_log_files = SortParts(logfiles_);
162 {
163 using ::testing::UnorderedElementsAre;
164 std::shared_ptr<const aos::Configuration> config =
165 sorted_log_files[0].config;
166
167 // Timing reports, pings
168 EXPECT_THAT(CountChannelsData(config, logfiles_[2]),
169 UnorderedElementsAre(
170 std::make_tuple("/pi1/aos",
171 "aos.message_bridge.ServerStatistics", 1),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700172 std::make_tuple("/test", "aos.examples.Ping", 1),
173 std::make_tuple("/pi1/aos", "aos.examples.Ping", 1)))
Naman Guptaa63aa132023-03-22 20:06:34 -0700174 << " : " << logfiles_[2];
175 {
176 std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700177 std::make_tuple("/pi1/aos", "aos.examples.Ping", 10),
Naman Guptaa63aa132023-03-22 20:06:34 -0700178 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 1),
179 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
180 1)};
181 if (!std::get<0>(GetParam()).shared) {
182 channel_counts.push_back(
183 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
184 "aos-message_bridge-Timestamp",
185 "aos.message_bridge.RemoteMessage", 1));
186 }
187 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
188 ::testing::UnorderedElementsAreArray(channel_counts))
189 << " : " << logfiles_[3];
190 }
191 {
192 std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700193 std::make_tuple("/pi1/aos", "aos.examples.Ping", 1990),
Naman Guptaa63aa132023-03-22 20:06:34 -0700194 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 199),
195 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
196 20),
197 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
198 199),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700199 std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
Naman Guptaa63aa132023-03-22 20:06:34 -0700200 std::make_tuple("/test", "aos.examples.Ping", 2000)};
201 if (!std::get<0>(GetParam()).shared) {
202 channel_counts.push_back(
203 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
204 "aos-message_bridge-Timestamp",
205 "aos.message_bridge.RemoteMessage", 199));
206 }
207 EXPECT_THAT(CountChannelsData(config, logfiles_[4]),
208 ::testing::UnorderedElementsAreArray(channel_counts))
209 << " : " << logfiles_[4];
210 }
211 // Timestamps for pong
212 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
213 UnorderedElementsAre())
214 << " : " << logfiles_[2];
215 EXPECT_THAT(
216 CountChannelsTimestamp(config, logfiles_[3]),
217 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
218 << " : " << logfiles_[3];
219 EXPECT_THAT(
220 CountChannelsTimestamp(config, logfiles_[4]),
221 UnorderedElementsAre(
222 std::make_tuple("/test", "aos.examples.Pong", 2000),
223 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)))
224 << " : " << logfiles_[4];
225
226 // Pong data.
227 EXPECT_THAT(
228 CountChannelsData(config, logfiles_[5]),
229 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 91)))
230 << " : " << logfiles_[5];
231 EXPECT_THAT(CountChannelsData(config, logfiles_[6]),
232 UnorderedElementsAre(
233 std::make_tuple("/test", "aos.examples.Pong", 1910)))
234 << " : " << logfiles_[6];
235
236 // No timestamps
237 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
238 UnorderedElementsAre())
239 << " : " << logfiles_[5];
240 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[6]),
241 UnorderedElementsAre())
242 << " : " << logfiles_[6];
243
244 // Timing reports and pongs.
245 EXPECT_THAT(CountChannelsData(config, logfiles_[7]),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700246 UnorderedElementsAre(
247 std::make_tuple("/pi2/aos", "aos.examples.Ping", 1),
248 std::make_tuple("/pi2/aos",
249 "aos.message_bridge.ServerStatistics", 1)))
Naman Guptaa63aa132023-03-22 20:06:34 -0700250 << " : " << logfiles_[7];
251 EXPECT_THAT(
252 CountChannelsData(config, logfiles_[8]),
253 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
254 << " : " << logfiles_[8];
255 EXPECT_THAT(
256 CountChannelsData(config, logfiles_[9]),
257 UnorderedElementsAre(
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700258 std::make_tuple("/pi2/aos", "aos.examples.Ping", 2000),
Naman Guptaa63aa132023-03-22 20:06:34 -0700259 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
260 std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
261 20),
262 std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
263 200),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700264 std::make_tuple("/pi2/aos", "aos.timing.Report", 60),
Naman Guptaa63aa132023-03-22 20:06:34 -0700265 std::make_tuple("/test", "aos.examples.Pong", 2000)))
266 << " : " << logfiles_[9];
267 // And ping timestamps.
268 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[7]),
269 UnorderedElementsAre())
270 << " : " << logfiles_[7];
271 EXPECT_THAT(
272 CountChannelsTimestamp(config, logfiles_[8]),
273 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
274 << " : " << logfiles_[8];
275 EXPECT_THAT(
276 CountChannelsTimestamp(config, logfiles_[9]),
277 UnorderedElementsAre(
278 std::make_tuple("/test", "aos.examples.Ping", 2000),
279 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)))
280 << " : " << logfiles_[9];
281
282 // And then test that the remotely logged timestamp data files only have
283 // timestamps in them.
284 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[10]),
285 UnorderedElementsAre())
286 << " : " << logfiles_[10];
287 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[11]),
288 UnorderedElementsAre())
289 << " : " << logfiles_[11];
290 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[12]),
291 UnorderedElementsAre())
292 << " : " << logfiles_[12];
293 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[13]),
294 UnorderedElementsAre())
295 << " : " << logfiles_[13];
296
297 EXPECT_THAT(CountChannelsData(config, logfiles_[10]),
298 UnorderedElementsAre(std::make_tuple(
299 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
300 << " : " << logfiles_[10];
301 EXPECT_THAT(CountChannelsData(config, logfiles_[11]),
302 UnorderedElementsAre(std::make_tuple(
303 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
304 << " : " << logfiles_[11];
305
306 EXPECT_THAT(CountChannelsData(config, logfiles_[12]),
307 UnorderedElementsAre(std::make_tuple(
308 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
309 << " : " << logfiles_[12];
310 EXPECT_THAT(CountChannelsData(config, logfiles_[13]),
311 UnorderedElementsAre(std::make_tuple(
312 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
313 << " : " << logfiles_[13];
314
315 // Timestamps from pi2 on pi1, and the other way.
316 if (shared()) {
317 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
318 UnorderedElementsAre())
319 << " : " << logfiles_[14];
320 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
321 UnorderedElementsAre())
322 << " : " << logfiles_[15];
323 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
324 UnorderedElementsAre())
325 << " : " << logfiles_[16];
326 EXPECT_THAT(CountChannelsData(config, logfiles_[17]),
327 UnorderedElementsAre())
328 << " : " << logfiles_[17];
329 EXPECT_THAT(CountChannelsData(config, logfiles_[18]),
330 UnorderedElementsAre())
331 << " : " << logfiles_[18];
332
333 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[14]),
334 UnorderedElementsAre(
335 std::make_tuple("/test", "aos.examples.Ping", 1)))
336 << " : " << logfiles_[14];
337 EXPECT_THAT(
338 CountChannelsTimestamp(config, logfiles_[15]),
339 UnorderedElementsAre(
340 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 9),
341 std::make_tuple("/test", "aos.examples.Ping", 90)))
342 << " : " << logfiles_[15];
343 EXPECT_THAT(
344 CountChannelsTimestamp(config, logfiles_[16]),
345 UnorderedElementsAre(
346 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 191),
347 std::make_tuple("/test", "aos.examples.Ping", 1910)))
348 << " : " << logfiles_[16];
349 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[17]),
350 UnorderedElementsAre(std::make_tuple(
351 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
352 << " : " << logfiles_[17];
353 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[18]),
354 UnorderedElementsAre(std::make_tuple(
355 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
356 << " : " << logfiles_[18];
357 } else {
358 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
359 UnorderedElementsAre())
360 << " : " << logfiles_[14];
361 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
362 UnorderedElementsAre())
363 << " : " << logfiles_[15];
364 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
365 UnorderedElementsAre())
366 << " : " << logfiles_[16];
367 EXPECT_THAT(CountChannelsData(config, logfiles_[17]),
368 UnorderedElementsAre())
369 << " : " << logfiles_[17];
370 EXPECT_THAT(CountChannelsData(config, logfiles_[18]),
371 UnorderedElementsAre())
372 << " : " << logfiles_[18];
373 EXPECT_THAT(CountChannelsData(config, logfiles_[19]),
374 UnorderedElementsAre())
375 << " : " << logfiles_[19];
376
377 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[14]),
378 UnorderedElementsAre(std::make_tuple(
379 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
380 << " : " << logfiles_[14];
381 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[15]),
382 UnorderedElementsAre(std::make_tuple(
383 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
384 << " : " << logfiles_[15];
385 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[16]),
386 UnorderedElementsAre(std::make_tuple(
387 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
388 << " : " << logfiles_[16];
389 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[17]),
390 UnorderedElementsAre(std::make_tuple(
391 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
392 << " : " << logfiles_[17];
393 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[18]),
394 UnorderedElementsAre(
395 std::make_tuple("/test", "aos.examples.Ping", 91)))
396 << " : " << logfiles_[18];
397 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[19]),
398 UnorderedElementsAre(
399 std::make_tuple("/test", "aos.examples.Ping", 1910)))
400 << " : " << logfiles_[19];
401 }
402 }
403
404 LogReader reader(sorted_log_files);
405
406 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
407 log_reader_factory.set_send_delay(chrono::microseconds(0));
408
409 // This sends out the fetched messages and advances time to the start of the
410 // log file.
411 reader.Register(&log_reader_factory);
412
413 const Node *pi1 =
414 configuration::GetNode(log_reader_factory.configuration(), "pi1");
415 const Node *pi2 =
416 configuration::GetNode(log_reader_factory.configuration(), "pi2");
417
418 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
419 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
420 LOG(INFO) << "now pi1 "
421 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
422 LOG(INFO) << "now pi2 "
423 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
424
425 EXPECT_THAT(reader.LoggedNodes(),
426 ::testing::ElementsAre(
427 configuration::GetNode(reader.logged_configuration(), pi1),
428 configuration::GetNode(reader.logged_configuration(), pi2)));
429
430 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
431
432 std::unique_ptr<EventLoop> pi1_event_loop =
433 log_reader_factory.MakeEventLoop("test", pi1);
434 std::unique_ptr<EventLoop> pi2_event_loop =
435 log_reader_factory.MakeEventLoop("test", pi2);
436
437 int pi1_ping_count = 10;
438 int pi2_ping_count = 10;
439 int pi1_pong_count = 10;
440 int pi2_pong_count = 10;
441
442 // Confirm that the ping value matches.
443 pi1_event_loop->MakeWatcher(
444 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
445 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping) << " at "
446 << pi1_event_loop->context().monotonic_remote_time << " -> "
447 << pi1_event_loop->context().monotonic_event_time;
448 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
449 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
450 pi1_ping_count * chrono::milliseconds(10) +
451 monotonic_clock::epoch());
452 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
453 pi1_ping_count * chrono::milliseconds(10) +
454 realtime_clock::epoch());
455 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
456 pi1_event_loop->context().monotonic_event_time);
457 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
458 pi1_event_loop->context().realtime_event_time);
459
460 ++pi1_ping_count;
461 });
462 pi2_event_loop->MakeWatcher(
463 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
464 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping) << " at "
465 << pi2_event_loop->context().monotonic_remote_time << " -> "
466 << pi2_event_loop->context().monotonic_event_time;
467 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
468
469 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
470 pi2_ping_count * chrono::milliseconds(10) +
471 monotonic_clock::epoch());
472 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
473 pi2_ping_count * chrono::milliseconds(10) +
474 realtime_clock::epoch());
475 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time +
476 chrono::microseconds(150),
477 pi2_event_loop->context().monotonic_event_time);
478 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time +
479 chrono::microseconds(150),
480 pi2_event_loop->context().realtime_event_time);
481 ++pi2_ping_count;
482 });
483
484 constexpr ssize_t kQueueIndexOffset = -9;
485 // Confirm that the ping and pong counts both match, and the value also
486 // matches.
487 pi1_event_loop->MakeWatcher(
488 "/test", [&pi1_event_loop, &pi1_ping_count,
489 &pi1_pong_count](const examples::Pong &pong) {
490 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
491 << pi1_event_loop->context().monotonic_remote_time << " -> "
492 << pi1_event_loop->context().monotonic_event_time;
493
494 EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
495 pi1_pong_count + kQueueIndexOffset);
496 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
497 chrono::microseconds(200) +
498 pi1_pong_count * chrono::milliseconds(10) +
499 monotonic_clock::epoch());
500 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
501 chrono::microseconds(200) +
502 pi1_pong_count * chrono::milliseconds(10) +
503 realtime_clock::epoch());
504
505 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
506 chrono::microseconds(150),
507 pi1_event_loop->context().monotonic_event_time);
508 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
509 chrono::microseconds(150),
510 pi1_event_loop->context().realtime_event_time);
511
512 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
513 ++pi1_pong_count;
514 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
515 });
516 pi2_event_loop->MakeWatcher(
517 "/test", [&pi2_event_loop, &pi2_ping_count,
518 &pi2_pong_count](const examples::Pong &pong) {
519 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
520 << pi2_event_loop->context().monotonic_remote_time << " -> "
521 << pi2_event_loop->context().monotonic_event_time;
522
523 EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
524 pi2_pong_count + kQueueIndexOffset);
525
526 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
527 chrono::microseconds(200) +
528 pi2_pong_count * chrono::milliseconds(10) +
529 monotonic_clock::epoch());
530 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
531 chrono::microseconds(200) +
532 pi2_pong_count * chrono::milliseconds(10) +
533 realtime_clock::epoch());
534
535 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
536 pi2_event_loop->context().monotonic_event_time);
537 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
538 pi2_event_loop->context().realtime_event_time);
539
540 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
541 ++pi2_pong_count;
542 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
543 });
544
545 log_reader_factory.Run();
546 EXPECT_EQ(pi1_ping_count, 2010);
547 EXPECT_EQ(pi2_ping_count, 2010);
548 EXPECT_EQ(pi1_pong_count, 2010);
549 EXPECT_EQ(pi2_pong_count, 2010);
550
551 reader.Deregister();
552}
553
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600554// MultinodeLoggerTest that tests the mutate callback works across multiple
555// nodes with remapping
556TEST_P(MultinodeLoggerTest, MultiNodeRemapMutateCallback) {
557 time_converter_.StartEqual();
558 std::vector<std::string> actual_filenames;
559
560 {
561 LoggerState pi1_logger = MakeLogger(pi1_);
562 LoggerState pi2_logger = MakeLogger(pi2_);
563
564 event_loop_factory_.RunFor(chrono::milliseconds(95));
565
566 StartLogger(&pi1_logger);
567 StartLogger(&pi2_logger);
568
569 event_loop_factory_.RunFor(chrono::milliseconds(20000));
570 pi1_logger.AppendAllFilenames(&actual_filenames);
571 pi2_logger.AppendAllFilenames(&actual_filenames);
572 }
573
574 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
575
576 LogReader reader(sorted_parts, &config_.message());
577 // Remap just on pi1.
578 reader.RemapLoggedChannel<examples::Pong>(
579 "/test", configuration::GetNode(reader.configuration(), "pi1"));
580
581 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
582
583 int pong_count = 0;
584 // Adds a callback which mutates the value of the pong message before the
585 // message is sent which is the feature we are testing here
586 reader.AddBeforeSendCallback("/test",
587 [&pong_count](aos::examples::Pong *pong) {
588 pong->mutate_value(pong->value() + 1);
589 pong_count = pong->value();
590 });
591
592 // This sends out the fetched messages and advances time to the start of the
593 // log file.
594 reader.Register(&log_reader_factory);
595
596 const Node *pi1 =
597 configuration::GetNode(log_reader_factory.configuration(), "pi1");
598 const Node *pi2 =
599 configuration::GetNode(log_reader_factory.configuration(), "pi2");
600
601 EXPECT_THAT(reader.LoggedNodes(),
602 ::testing::ElementsAre(
603 configuration::GetNode(reader.logged_configuration(), pi1),
604 configuration::GetNode(reader.logged_configuration(), pi2)));
605
606 std::unique_ptr<EventLoop> pi1_event_loop =
607 log_reader_factory.MakeEventLoop("test", pi1);
608 std::unique_ptr<EventLoop> pi2_event_loop =
609 log_reader_factory.MakeEventLoop("test", pi2);
610
611 pi1_event_loop->MakeWatcher("/original/test",
612 [&pong_count](const examples::Pong &pong) {
613 EXPECT_EQ(pong_count, pong.value());
614 });
615
616 pi2_event_loop->MakeWatcher("/test",
617 [&pong_count](const examples::Pong &pong) {
618 EXPECT_EQ(pong_count, pong.value());
619 });
620
621 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
622 reader.Deregister();
623
624 EXPECT_EQ(pong_count, 2011);
625}
626
627// MultinodeLoggerTest that tests the mutate callback works across multiple
628// nodes
629TEST_P(MultinodeLoggerTest, MultiNodeMutateCallback) {
630 time_converter_.StartEqual();
631 std::vector<std::string> actual_filenames;
632
633 {
634 LoggerState pi1_logger = MakeLogger(pi1_);
635 LoggerState pi2_logger = MakeLogger(pi2_);
636
637 event_loop_factory_.RunFor(chrono::milliseconds(95));
638
639 StartLogger(&pi1_logger);
640 StartLogger(&pi2_logger);
641
642 event_loop_factory_.RunFor(chrono::milliseconds(20000));
643 pi1_logger.AppendAllFilenames(&actual_filenames);
644 pi2_logger.AppendAllFilenames(&actual_filenames);
645 }
646
647 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
648
649 LogReader reader(sorted_parts, &config_.message());
650
651 int pong_count = 0;
652 // Adds a callback which mutates the value of the pong message before the
653 // message is sent which is the feature we are testing here
654 reader.AddBeforeSendCallback("/test",
655 [&pong_count](aos::examples::Pong *pong) {
656 pong->mutate_value(pong->value() + 1);
657 pong_count = pong->value();
658 });
659
660 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
661
662 // This sends out the fetched messages and advances time to the start of the
663 // log file.
664 reader.Register(&log_reader_factory);
665
666 const Node *pi1 =
667 configuration::GetNode(log_reader_factory.configuration(), "pi1");
668 const Node *pi2 =
669 configuration::GetNode(log_reader_factory.configuration(), "pi2");
670
671 EXPECT_THAT(reader.LoggedNodes(),
672 ::testing::ElementsAre(
673 configuration::GetNode(reader.logged_configuration(), pi1),
674 configuration::GetNode(reader.logged_configuration(), pi2)));
675
676 std::unique_ptr<EventLoop> pi1_event_loop =
677 log_reader_factory.MakeEventLoop("test", pi1);
678 std::unique_ptr<EventLoop> pi2_event_loop =
679 log_reader_factory.MakeEventLoop("test", pi2);
680
681 pi1_event_loop->MakeWatcher("/test",
682 [&pong_count](const examples::Pong &pong) {
683 EXPECT_EQ(pong_count, pong.value());
684 });
685
686 pi2_event_loop->MakeWatcher("/test",
687 [&pong_count](const examples::Pong &pong) {
688 EXPECT_EQ(pong_count, pong.value());
689 });
690
691 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
692 reader.Deregister();
693
694 EXPECT_EQ(pong_count, 2011);
695}
696
697// Tests that the before send callback is only called from the sender node if it
698// is forwarded
699TEST_P(MultinodeLoggerTest, OnlyDoBeforeSendCallbackOnSenderNode) {
700 time_converter_.StartEqual();
701 {
702 LoggerState pi1_logger = MakeLogger(pi1_);
703 LoggerState pi2_logger = MakeLogger(pi2_);
704
705 event_loop_factory_.RunFor(chrono::milliseconds(95));
706
707 StartLogger(&pi1_logger);
708 StartLogger(&pi2_logger);
709
710 event_loop_factory_.RunFor(chrono::milliseconds(20000));
711 }
712
713 LogReader reader(SortParts(logfiles_));
714
715 int ping_count = 0;
716 // Adds a callback which mutates the value of the pong message before the
717 // message is sent which is the feature we are testing here
718 reader.AddBeforeSendCallback("/test",
719 [&ping_count](aos::examples::Ping *ping) {
720 ++ping_count;
721 ping->mutate_value(ping_count);
722 });
723
724 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
725 log_reader_factory.set_send_delay(chrono::microseconds(0));
726
727 reader.Register(&log_reader_factory);
728
729 const Node *pi1 =
730 configuration::GetNode(log_reader_factory.configuration(), "pi1");
731 const Node *pi2 =
732 configuration::GetNode(log_reader_factory.configuration(), "pi2");
733
734 std::unique_ptr<EventLoop> pi1_event_loop =
735 log_reader_factory.MakeEventLoop("test", pi1);
736 pi1_event_loop->SkipTimingReport();
737 std::unique_ptr<EventLoop> pi2_event_loop =
738 log_reader_factory.MakeEventLoop("test", pi2);
739 pi2_event_loop->SkipTimingReport();
740
741 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
742 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
743
744 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
745 pi1_ping_timestamp;
746 if (!shared()) {
747 pi1_ping_timestamp =
748 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
749 pi1_event_loop.get(),
750 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
751 }
752
753 log_reader_factory.Run();
754
755 EXPECT_EQ(pi1_ping.count(), 2000u);
756 EXPECT_EQ(pi2_ping.count(), 2000u);
757 // If the BeforeSendCallback is called on both nodes, then the ping count
758 // would be 4002 instead of 2001
759 EXPECT_EQ(ping_count, 2001u);
760 if (!shared()) {
761 EXPECT_EQ(pi1_ping_timestamp->count(), 2000u);
762 }
763
764 reader.Deregister();
765}
766
767// Tests that we do not allow adding callbacks after Register is called
768TEST_P(MultinodeLoggerDeathTest, AddCallbackAfterRegister) {
769 time_converter_.StartEqual();
770 std::vector<std::string> actual_filenames;
771
772 {
773 LoggerState pi1_logger = MakeLogger(pi1_);
774 LoggerState pi2_logger = MakeLogger(pi2_);
775
776 event_loop_factory_.RunFor(chrono::milliseconds(95));
777
778 StartLogger(&pi1_logger);
779 StartLogger(&pi2_logger);
780
781 event_loop_factory_.RunFor(chrono::milliseconds(20000));
782 pi1_logger.AppendAllFilenames(&actual_filenames);
783 pi2_logger.AppendAllFilenames(&actual_filenames);
784 }
785
786 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
787
788 LogReader reader(sorted_parts, &config_.message());
789 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
790 reader.Register(&log_reader_factory);
791 EXPECT_DEATH(
792 {
793 reader.AddBeforeSendCallback("/test", [](aos::examples::Pong *) {
794 LOG(FATAL) << "This should not be called";
795 });
796 },
797 "Cannot add callbacks after calling Register");
798 reader.Deregister();
799}
800
Naman Guptaa63aa132023-03-22 20:06:34 -0700801// Test that if we feed the replay with a mismatched node list that we die on
802// the LogReader constructor.
803TEST_P(MultinodeLoggerDeathTest, MultiNodeBadReplayConfig) {
804 time_converter_.StartEqual();
805 {
806 LoggerState pi1_logger = MakeLogger(pi1_);
807 LoggerState pi2_logger = MakeLogger(pi2_);
808
809 event_loop_factory_.RunFor(chrono::milliseconds(95));
810
811 StartLogger(&pi1_logger);
812 StartLogger(&pi2_logger);
813
814 event_loop_factory_.RunFor(chrono::milliseconds(20000));
815 }
816
817 // Test that, if we add an additional node to the replay config that the
818 // logger complains about the mismatch in number of nodes.
819 FlatbufferDetachedBuffer<Configuration> extra_nodes_config =
820 configuration::MergeWithConfig(&config_.message(), R"({
821 "nodes": [
822 {
823 "name": "extra-node"
824 }
825 ]
826 }
827 )");
828
829 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
830 EXPECT_DEATH(LogReader(sorted_parts, &extra_nodes_config.message()),
831 "Log file and replay config need to have matching nodes lists.");
832}
833
834// Tests that we can read log files where they don't start at the same monotonic
835// time.
836TEST_P(MultinodeLoggerTest, StaggeredStart) {
837 time_converter_.StartEqual();
838 std::vector<std::string> actual_filenames;
839
840 {
841 LoggerState pi1_logger = MakeLogger(pi1_);
842 LoggerState pi2_logger = MakeLogger(pi2_);
843
844 event_loop_factory_.RunFor(chrono::milliseconds(95));
845
846 StartLogger(&pi1_logger);
847
848 event_loop_factory_.RunFor(chrono::milliseconds(200));
849
850 StartLogger(&pi2_logger);
851
852 event_loop_factory_.RunFor(chrono::milliseconds(20000));
853 pi1_logger.AppendAllFilenames(&actual_filenames);
854 pi2_logger.AppendAllFilenames(&actual_filenames);
855 }
856
857 // Since we delay starting pi2, it already knows about all the timestamps so
858 // we don't end up with extra parts.
859 LogReader reader(SortParts(actual_filenames));
860
861 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
862 log_reader_factory.set_send_delay(chrono::microseconds(0));
863
864 // This sends out the fetched messages and advances time to the start of the
865 // log file.
866 reader.Register(&log_reader_factory);
867
868 const Node *pi1 =
869 configuration::GetNode(log_reader_factory.configuration(), "pi1");
870 const Node *pi2 =
871 configuration::GetNode(log_reader_factory.configuration(), "pi2");
872
873 EXPECT_THAT(reader.LoggedNodes(),
874 ::testing::ElementsAre(
875 configuration::GetNode(reader.logged_configuration(), pi1),
876 configuration::GetNode(reader.logged_configuration(), pi2)));
877
878 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
879
880 std::unique_ptr<EventLoop> pi1_event_loop =
881 log_reader_factory.MakeEventLoop("test", pi1);
882 std::unique_ptr<EventLoop> pi2_event_loop =
883 log_reader_factory.MakeEventLoop("test", pi2);
884
885 int pi1_ping_count = 30;
886 int pi2_ping_count = 30;
887 int pi1_pong_count = 30;
888 int pi2_pong_count = 30;
889
890 // Confirm that the ping value matches.
891 pi1_event_loop->MakeWatcher(
892 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
893 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
894 << pi1_event_loop->context().monotonic_remote_time << " -> "
895 << pi1_event_loop->context().monotonic_event_time;
896 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
897
898 ++pi1_ping_count;
899 });
900 pi2_event_loop->MakeWatcher(
901 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
902 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
903 << pi2_event_loop->context().monotonic_remote_time << " -> "
904 << pi2_event_loop->context().monotonic_event_time;
905 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
906
907 ++pi2_ping_count;
908 });
909
910 // Confirm that the ping and pong counts both match, and the value also
911 // matches.
912 pi1_event_loop->MakeWatcher(
913 "/test", [&pi1_event_loop, &pi1_ping_count,
914 &pi1_pong_count](const examples::Pong &pong) {
915 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
916 << pi1_event_loop->context().monotonic_remote_time << " -> "
917 << pi1_event_loop->context().monotonic_event_time;
918
919 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
920 ++pi1_pong_count;
921 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
922 });
923 pi2_event_loop->MakeWatcher(
924 "/test", [&pi2_event_loop, &pi2_ping_count,
925 &pi2_pong_count](const examples::Pong &pong) {
926 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
927 << pi2_event_loop->context().monotonic_remote_time << " -> "
928 << pi2_event_loop->context().monotonic_event_time;
929
930 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
931 ++pi2_pong_count;
932 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
933 });
934
935 log_reader_factory.Run();
936 EXPECT_EQ(pi1_ping_count, 2030);
937 EXPECT_EQ(pi2_ping_count, 2030);
938 EXPECT_EQ(pi1_pong_count, 2030);
939 EXPECT_EQ(pi2_pong_count, 2030);
940
941 reader.Deregister();
942}
943
944// Tests that we can read log files where the monotonic clocks drift and don't
945// match correctly. While we are here, also test that different ending times
946// also is readable.
947TEST_P(MultinodeLoggerTest, MismatchedClocks) {
948 // TODO(austin): Negate...
949 const chrono::nanoseconds initial_pi2_offset = chrono::seconds(1000);
950
951 time_converter_.AddMonotonic(
952 {BootTimestamp::epoch(), BootTimestamp::epoch() + initial_pi2_offset});
953 // Wait for 95 ms, (~0.1 seconds - 1/2 of the ping/pong period), and set the
954 // skew to be 200 uS/s
955 const chrono::nanoseconds startup_sleep1 = time_converter_.AddMonotonic(
956 {chrono::milliseconds(95),
957 chrono::milliseconds(95) - chrono::nanoseconds(200) * 95});
958 // Run another 200 ms to have one logger start first.
959 const chrono::nanoseconds startup_sleep2 = time_converter_.AddMonotonic(
960 {chrono::milliseconds(200), chrono::milliseconds(200)});
961 // Slew one way then the other at the same 200 uS/S slew rate. Make sure we
962 // go far enough to cause problems if this isn't accounted for.
963 const chrono::nanoseconds logger_run1 = time_converter_.AddMonotonic(
964 {chrono::milliseconds(20000),
965 chrono::milliseconds(20000) - chrono::nanoseconds(200) * 20000});
966 const chrono::nanoseconds logger_run2 = time_converter_.AddMonotonic(
967 {chrono::milliseconds(40000),
968 chrono::milliseconds(40000) + chrono::nanoseconds(200) * 40000});
969 const chrono::nanoseconds logger_run3 = time_converter_.AddMonotonic(
970 {chrono::milliseconds(400), chrono::milliseconds(400)});
971
972 {
973 LoggerState pi2_logger = MakeLogger(pi2_);
974
975 LOG(INFO) << "pi2 times: " << pi2_->monotonic_now() << " "
976 << pi2_->realtime_now() << " distributed "
977 << pi2_->ToDistributedClock(pi2_->monotonic_now());
978
979 LOG(INFO) << "pi2_ times: " << pi2_->monotonic_now() << " "
980 << pi2_->realtime_now() << " distributed "
981 << pi2_->ToDistributedClock(pi2_->monotonic_now());
982
983 event_loop_factory_.RunFor(startup_sleep1);
984
985 StartLogger(&pi2_logger);
986
987 event_loop_factory_.RunFor(startup_sleep2);
988
989 {
990 // Run pi1's logger for only part of the time.
991 LoggerState pi1_logger = MakeLogger(pi1_);
992
993 StartLogger(&pi1_logger);
994 event_loop_factory_.RunFor(logger_run1);
995
996 // Make sure we slewed time far enough so that the difference is greater
997 // than the network delay. This confirms that if we sort incorrectly, it
998 // would show in the results.
999 EXPECT_LT(
1000 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
1001 -event_loop_factory_.send_delay() -
1002 event_loop_factory_.network_delay());
1003
1004 event_loop_factory_.RunFor(logger_run2);
1005
1006 // And now check that we went far enough the other way to make sure we
1007 // cover both problems.
1008 EXPECT_GT(
1009 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
1010 event_loop_factory_.send_delay() +
1011 event_loop_factory_.network_delay());
1012 }
1013
1014 // And log a bit more on pi2.
1015 event_loop_factory_.RunFor(logger_run3);
1016 }
1017
1018 LogReader reader(
James Kuszmaul298b4a22023-06-28 20:01:03 -07001019 SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 3, 3)));
Naman Guptaa63aa132023-03-22 20:06:34 -07001020
1021 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1022 log_reader_factory.set_send_delay(chrono::microseconds(0));
1023
1024 const Node *pi1 =
1025 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1026 const Node *pi2 =
1027 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1028
1029 // This sends out the fetched messages and advances time to the start of the
1030 // log file.
1031 reader.Register(&log_reader_factory);
1032
1033 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
1034 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
1035 LOG(INFO) << "now pi1 "
1036 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1037 LOG(INFO) << "now pi2 "
1038 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1039
1040 LOG(INFO) << "Done registering (pi1) "
1041 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now()
1042 << " "
1043 << log_reader_factory.GetNodeEventLoopFactory(pi1)->realtime_now();
1044 LOG(INFO) << "Done registering (pi2) "
1045 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now()
1046 << " "
1047 << log_reader_factory.GetNodeEventLoopFactory(pi2)->realtime_now();
1048
1049 EXPECT_THAT(reader.LoggedNodes(),
1050 ::testing::ElementsAre(
1051 configuration::GetNode(reader.logged_configuration(), pi1),
1052 configuration::GetNode(reader.logged_configuration(), pi2)));
1053
1054 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1055
1056 std::unique_ptr<EventLoop> pi1_event_loop =
1057 log_reader_factory.MakeEventLoop("test", pi1);
1058 std::unique_ptr<EventLoop> pi2_event_loop =
1059 log_reader_factory.MakeEventLoop("test", pi2);
1060
1061 int pi1_ping_count = 30;
1062 int pi2_ping_count = 30;
1063 int pi1_pong_count = 30;
1064 int pi2_pong_count = 30;
1065
1066 // Confirm that the ping value matches.
1067 pi1_event_loop->MakeWatcher(
1068 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
1069 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
1070 << pi1_event_loop->context().monotonic_remote_time << " -> "
1071 << pi1_event_loop->context().monotonic_event_time;
1072 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
1073
1074 ++pi1_ping_count;
1075 });
1076 pi2_event_loop->MakeWatcher(
1077 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
1078 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
1079 << pi2_event_loop->context().monotonic_remote_time << " -> "
1080 << pi2_event_loop->context().monotonic_event_time;
1081 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
1082
1083 ++pi2_ping_count;
1084 });
1085
1086 // Confirm that the ping and pong counts both match, and the value also
1087 // matches.
1088 pi1_event_loop->MakeWatcher(
1089 "/test", [&pi1_event_loop, &pi1_ping_count,
1090 &pi1_pong_count](const examples::Pong &pong) {
1091 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
1092 << pi1_event_loop->context().monotonic_remote_time << " -> "
1093 << pi1_event_loop->context().monotonic_event_time;
1094
1095 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
1096 ++pi1_pong_count;
1097 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
1098 });
1099 pi2_event_loop->MakeWatcher(
1100 "/test", [&pi2_event_loop, &pi2_ping_count,
1101 &pi2_pong_count](const examples::Pong &pong) {
1102 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
1103 << pi2_event_loop->context().monotonic_remote_time << " -> "
1104 << pi2_event_loop->context().monotonic_event_time;
1105
1106 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
1107 ++pi2_pong_count;
1108 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
1109 });
1110
1111 log_reader_factory.Run();
1112 EXPECT_EQ(pi1_ping_count, 6030);
1113 EXPECT_EQ(pi2_ping_count, 6030);
1114 EXPECT_EQ(pi1_pong_count, 6030);
1115 EXPECT_EQ(pi2_pong_count, 6030);
1116
1117 reader.Deregister();
1118}
1119
1120// Tests that we can sort a bunch of parts into the pre-determined sorted parts.
1121TEST_P(MultinodeLoggerTest, SortParts) {
1122 time_converter_.StartEqual();
1123 // Make a bunch of parts.
1124 {
1125 LoggerState pi1_logger = MakeLogger(pi1_);
1126 LoggerState pi2_logger = MakeLogger(pi2_);
1127
1128 event_loop_factory_.RunFor(chrono::milliseconds(95));
1129
1130 StartLogger(&pi1_logger);
1131 StartLogger(&pi2_logger);
1132
1133 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1134 }
1135
1136 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1137 VerifyParts(sorted_parts);
1138}
1139
1140// Tests that we can sort a bunch of parts with an empty part. We should ignore
1141// it and remove it from the sorted list.
1142TEST_P(MultinodeLoggerTest, SortEmptyParts) {
1143 time_converter_.StartEqual();
1144 // Make a bunch of parts.
1145 {
1146 LoggerState pi1_logger = MakeLogger(pi1_);
1147 LoggerState pi2_logger = MakeLogger(pi2_);
1148
1149 event_loop_factory_.RunFor(chrono::milliseconds(95));
1150
1151 StartLogger(&pi1_logger);
1152 StartLogger(&pi2_logger);
1153
1154 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1155 }
1156
1157 // TODO(austin): Should we flip out if the file can't open?
1158 const std::string kEmptyFile("foobarinvalidfiledoesnotexist" + Extension());
1159
1160 aos::util::WriteStringToFileOrDie(kEmptyFile, "");
1161 logfiles_.emplace_back(kEmptyFile);
1162
1163 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1164 VerifyParts(sorted_parts, {kEmptyFile});
1165}
1166
1167// Tests that we can sort a bunch of parts with the end missing off a
1168// file. We should use the part we can read.
1169TEST_P(MultinodeLoggerTest, SortTruncatedParts) {
1170 std::vector<std::string> actual_filenames;
1171 time_converter_.StartEqual();
1172 // Make a bunch of parts.
1173 {
1174 LoggerState pi1_logger = MakeLogger(pi1_);
1175 LoggerState pi2_logger = MakeLogger(pi2_);
1176
1177 event_loop_factory_.RunFor(chrono::milliseconds(95));
1178
1179 StartLogger(&pi1_logger);
1180 StartLogger(&pi2_logger);
1181
1182 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1183
1184 pi1_logger.AppendAllFilenames(&actual_filenames);
1185 pi2_logger.AppendAllFilenames(&actual_filenames);
1186 }
1187
1188 ASSERT_THAT(actual_filenames,
1189 ::testing::UnorderedElementsAreArray(logfiles_));
1190
1191 // Strip off the end of one of the files. Pick one with a lot of data.
1192 // For snappy, needs to have enough data to be >1 chunk of compressed data so
1193 // that we don't corrupt the entire log part.
1194 ::std::string compressed_contents =
1195 aos::util::ReadFileToStringOrDie(logfiles_[4]);
1196
1197 aos::util::WriteStringToFileOrDie(
1198 logfiles_[4],
1199 compressed_contents.substr(0, compressed_contents.size() - 100));
1200
1201 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1202 VerifyParts(sorted_parts);
1203}
1204
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001205// Tests that if we remap a logged channel, it shows up correctly.
Naman Guptaa63aa132023-03-22 20:06:34 -07001206TEST_P(MultinodeLoggerTest, RemapLoggedChannel) {
1207 time_converter_.StartEqual();
1208 {
1209 LoggerState pi1_logger = MakeLogger(pi1_);
1210 LoggerState pi2_logger = MakeLogger(pi2_);
1211
1212 event_loop_factory_.RunFor(chrono::milliseconds(95));
1213
1214 StartLogger(&pi1_logger);
1215 StartLogger(&pi2_logger);
1216
1217 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1218 }
1219
1220 LogReader reader(SortParts(logfiles_));
1221
1222 // Remap just on pi1.
1223 reader.RemapLoggedChannel<aos::timing::Report>(
1224 "/aos", configuration::GetNode(reader.configuration(), "pi1"));
1225
1226 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1227 log_reader_factory.set_send_delay(chrono::microseconds(0));
1228
1229 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1230 // Note: An extra channel gets remapped automatically due to a timestamp
1231 // channel being LOCAL_LOGGER'd.
1232 ASSERT_EQ(remapped_channels.size(), std::get<0>(GetParam()).shared ? 1u : 2u);
1233 EXPECT_EQ(remapped_channels[0]->name()->string_view(), "/original/pi1/aos");
1234 EXPECT_EQ(remapped_channels[0]->type()->string_view(), "aos.timing.Report");
1235 if (!std::get<0>(GetParam()).shared) {
1236 EXPECT_EQ(remapped_channels[1]->name()->string_view(),
1237 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1238 "aos-message_bridge-Timestamp");
1239 EXPECT_EQ(remapped_channels[1]->type()->string_view(),
1240 "aos.message_bridge.RemoteMessage");
1241 }
1242
1243 reader.Register(&log_reader_factory);
1244
1245 const Node *pi1 =
1246 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1247 const Node *pi2 =
1248 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1249
1250 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1251 // else should have moved.
1252 std::unique_ptr<EventLoop> pi1_event_loop =
1253 log_reader_factory.MakeEventLoop("test", pi1);
1254 pi1_event_loop->SkipTimingReport();
1255 std::unique_ptr<EventLoop> full_pi1_event_loop =
1256 log_reader_factory.MakeEventLoop("test", pi1);
1257 full_pi1_event_loop->SkipTimingReport();
1258 std::unique_ptr<EventLoop> pi2_event_loop =
1259 log_reader_factory.MakeEventLoop("test", pi2);
1260 pi2_event_loop->SkipTimingReport();
1261
1262 MessageCounter<aos::timing::Report> pi1_timing_report(pi1_event_loop.get(),
1263 "/aos");
1264 MessageCounter<aos::timing::Report> full_pi1_timing_report(
1265 full_pi1_event_loop.get(), "/pi1/aos");
1266 MessageCounter<aos::timing::Report> pi1_original_timing_report(
1267 pi1_event_loop.get(), "/original/aos");
1268 MessageCounter<aos::timing::Report> full_pi1_original_timing_report(
1269 full_pi1_event_loop.get(), "/original/pi1/aos");
1270 MessageCounter<aos::timing::Report> pi2_timing_report(pi2_event_loop.get(),
1271 "/aos");
1272
1273 log_reader_factory.Run();
1274
1275 EXPECT_EQ(pi1_timing_report.count(), 0u);
1276 EXPECT_EQ(full_pi1_timing_report.count(), 0u);
1277 EXPECT_NE(pi1_original_timing_report.count(), 0u);
1278 EXPECT_NE(full_pi1_original_timing_report.count(), 0u);
1279 EXPECT_NE(pi2_timing_report.count(), 0u);
1280
1281 reader.Deregister();
1282}
1283
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001284// Tests that if we rename a logged channel, it shows up correctly.
1285TEST_P(MultinodeLoggerTest, RenameLoggedChannel) {
1286 std::vector<std::string> actual_filenames;
1287 time_converter_.StartEqual();
1288 {
1289 LoggerState pi1_logger = MakeLogger(pi1_);
1290 LoggerState pi2_logger = MakeLogger(pi2_);
1291
1292 event_loop_factory_.RunFor(chrono::milliseconds(95));
1293
1294 StartLogger(&pi1_logger);
1295 StartLogger(&pi2_logger);
1296
1297 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1298
1299 pi1_logger.AppendAllFilenames(&actual_filenames);
1300 pi2_logger.AppendAllFilenames(&actual_filenames);
1301 }
1302
1303 LogReader reader(SortParts(actual_filenames));
1304
1305 // Rename just on pi2. Add some global maps just to verify they get added in
1306 // the config and used correctly.
1307 std::vector<MapT> maps;
1308 {
1309 MapT map;
1310 map.match = std::make_unique<ChannelT>();
1311 map.match->name = "/foo*";
1312 map.match->source_node = "pi1";
1313 map.rename = std::make_unique<ChannelT>();
1314 map.rename->name = "/pi1/foo";
1315 maps.emplace_back(std::move(map));
1316 }
1317 {
1318 MapT map;
1319 map.match = std::make_unique<ChannelT>();
1320 map.match->name = "/foo*";
1321 map.match->source_node = "pi2";
1322 map.rename = std::make_unique<ChannelT>();
1323 map.rename->name = "/pi2/foo";
1324 maps.emplace_back(std::move(map));
1325 }
1326 {
1327 MapT map;
1328 map.match = std::make_unique<ChannelT>();
1329 map.match->name = "/foo";
1330 map.match->type = "aos.examples.Ping";
1331 map.rename = std::make_unique<ChannelT>();
1332 map.rename->name = "/foo/renamed";
1333 maps.emplace_back(std::move(map));
1334 }
1335 reader.RenameLoggedChannel<aos::examples::Ping>(
1336 "/aos", configuration::GetNode(reader.configuration(), "pi2"),
1337 "/pi2/foo/renamed", maps);
1338
1339 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1340 log_reader_factory.set_send_delay(chrono::microseconds(0));
1341
1342 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1343 // Note: An extra channel gets remapped automatically due to a timestamp
1344 // channel being LOCAL_LOGGER'd.
1345 const bool shared = std::get<0>(GetParam()).shared;
1346 ASSERT_EQ(remapped_channels.size(), shared ? 1u : 2u);
1347 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->name()->string_view(),
1348 "/pi2/foo/renamed");
1349 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->type()->string_view(),
1350 "aos.examples.Ping");
1351 if (!shared) {
1352 EXPECT_EQ(remapped_channels[0]->name()->string_view(),
1353 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1354 "aos-message_bridge-Timestamp");
1355 EXPECT_EQ(remapped_channels[0]->type()->string_view(),
1356 "aos.message_bridge.RemoteMessage");
1357 }
1358
1359 reader.Register(&log_reader_factory);
1360
1361 const Node *pi1 =
1362 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1363 const Node *pi2 =
1364 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1365
1366 // Confirm we can read the data on the renamed channel, just for pi2. Nothing
1367 // else should have moved.
1368 std::unique_ptr<EventLoop> pi2_event_loop =
1369 log_reader_factory.MakeEventLoop("test", pi2);
1370 pi2_event_loop->SkipTimingReport();
1371 std::unique_ptr<EventLoop> full_pi2_event_loop =
1372 log_reader_factory.MakeEventLoop("test", pi2);
1373 full_pi2_event_loop->SkipTimingReport();
1374 std::unique_ptr<EventLoop> pi1_event_loop =
1375 log_reader_factory.MakeEventLoop("test", pi1);
1376 pi1_event_loop->SkipTimingReport();
1377
1378 MessageCounter<aos::examples::Ping> pi2_ping(pi2_event_loop.get(), "/aos");
1379 MessageCounter<aos::examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1380 "/foo");
1381 MessageCounter<aos::examples::Ping> full_pi2_renamed_ping(
1382 full_pi2_event_loop.get(), "/pi2/foo/renamed");
1383 MessageCounter<aos::examples::Ping> pi1_ping(pi1_event_loop.get(), "/aos");
1384
1385 log_reader_factory.Run();
1386
1387 EXPECT_EQ(pi2_ping.count(), 0u);
1388 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1389 EXPECT_NE(full_pi2_renamed_ping.count(), 0u);
1390 EXPECT_NE(pi1_ping.count(), 0u);
1391
1392 reader.Deregister();
1393}
1394
Naman Guptaa63aa132023-03-22 20:06:34 -07001395// Tests that we can remap a forwarded channel as well.
1396TEST_P(MultinodeLoggerTest, RemapForwardedLoggedChannel) {
1397 time_converter_.StartEqual();
1398 {
1399 LoggerState pi1_logger = MakeLogger(pi1_);
1400 LoggerState pi2_logger = MakeLogger(pi2_);
1401
1402 event_loop_factory_.RunFor(chrono::milliseconds(95));
1403
1404 StartLogger(&pi1_logger);
1405 StartLogger(&pi2_logger);
1406
1407 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1408 }
1409
1410 LogReader reader(SortParts(logfiles_));
1411
1412 reader.RemapLoggedChannel<examples::Ping>("/test");
1413
1414 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1415 log_reader_factory.set_send_delay(chrono::microseconds(0));
1416
1417 reader.Register(&log_reader_factory);
1418
1419 const Node *pi1 =
1420 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1421 const Node *pi2 =
1422 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1423
1424 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1425 // else should have moved.
1426 std::unique_ptr<EventLoop> pi1_event_loop =
1427 log_reader_factory.MakeEventLoop("test", pi1);
1428 pi1_event_loop->SkipTimingReport();
1429 std::unique_ptr<EventLoop> full_pi1_event_loop =
1430 log_reader_factory.MakeEventLoop("test", pi1);
1431 full_pi1_event_loop->SkipTimingReport();
1432 std::unique_ptr<EventLoop> pi2_event_loop =
1433 log_reader_factory.MakeEventLoop("test", pi2);
1434 pi2_event_loop->SkipTimingReport();
1435
1436 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1437 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1438 MessageCounter<examples::Ping> pi1_original_ping(pi1_event_loop.get(),
1439 "/original/test");
1440 MessageCounter<examples::Ping> pi2_original_ping(pi2_event_loop.get(),
1441 "/original/test");
1442
1443 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1444 pi1_original_ping_timestamp;
1445 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1446 pi1_ping_timestamp;
1447 if (!shared()) {
1448 pi1_original_ping_timestamp =
1449 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1450 pi1_event_loop.get(),
1451 "/pi1/aos/remote_timestamps/pi2/original/test/aos-examples-Ping");
1452 pi1_ping_timestamp =
1453 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1454 pi1_event_loop.get(),
1455 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1456 }
1457
1458 log_reader_factory.Run();
1459
1460 EXPECT_EQ(pi1_ping.count(), 0u);
1461 EXPECT_EQ(pi2_ping.count(), 0u);
1462 EXPECT_NE(pi1_original_ping.count(), 0u);
1463 EXPECT_NE(pi2_original_ping.count(), 0u);
1464 if (!shared()) {
1465 EXPECT_NE(pi1_original_ping_timestamp->count(), 0u);
1466 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1467 }
1468
1469 reader.Deregister();
1470}
1471
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001472// Tests that we can rename a forwarded channel as well.
1473TEST_P(MultinodeLoggerTest, RenameForwardedLoggedChannel) {
1474 std::vector<std::string> actual_filenames;
1475 time_converter_.StartEqual();
1476 {
1477 LoggerState pi1_logger = MakeLogger(pi1_);
1478 LoggerState pi2_logger = MakeLogger(pi2_);
1479
1480 event_loop_factory_.RunFor(chrono::milliseconds(95));
1481
1482 StartLogger(&pi1_logger);
1483 StartLogger(&pi2_logger);
1484
1485 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1486
1487 pi1_logger.AppendAllFilenames(&actual_filenames);
1488 pi2_logger.AppendAllFilenames(&actual_filenames);
1489 }
1490
1491 LogReader reader(SortParts(actual_filenames));
1492
1493 std::vector<MapT> maps;
1494 {
1495 MapT map;
1496 map.match = std::make_unique<ChannelT>();
1497 map.match->name = "/production*";
1498 map.match->source_node = "pi1";
1499 map.rename = std::make_unique<ChannelT>();
1500 map.rename->name = "/pi1/production";
1501 maps.emplace_back(std::move(map));
1502 }
1503 {
1504 MapT map;
1505 map.match = std::make_unique<ChannelT>();
1506 map.match->name = "/production*";
1507 map.match->source_node = "pi2";
1508 map.rename = std::make_unique<ChannelT>();
1509 map.rename->name = "/pi2/production";
1510 maps.emplace_back(std::move(map));
1511 }
1512 reader.RenameLoggedChannel<aos::examples::Ping>(
1513 "/test", configuration::GetNode(reader.configuration(), "pi1"),
1514 "/pi1/production", maps);
1515
1516 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1517 log_reader_factory.set_send_delay(chrono::microseconds(0));
1518
1519 reader.Register(&log_reader_factory);
1520
1521 const Node *pi1 =
1522 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1523 const Node *pi2 =
1524 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1525
1526 // Confirm we can read the data on the renamed channel, on both the source
1527 // node and the remote node. In case of split timestamp channels, confirm that
1528 // we receive the timestamp messages on the renamed channel as well.
1529 std::unique_ptr<EventLoop> pi1_event_loop =
1530 log_reader_factory.MakeEventLoop("test", pi1);
1531 pi1_event_loop->SkipTimingReport();
1532 std::unique_ptr<EventLoop> full_pi1_event_loop =
1533 log_reader_factory.MakeEventLoop("test", pi1);
1534 full_pi1_event_loop->SkipTimingReport();
1535 std::unique_ptr<EventLoop> pi2_event_loop =
1536 log_reader_factory.MakeEventLoop("test", pi2);
1537 pi2_event_loop->SkipTimingReport();
1538
1539 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1540 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1541 MessageCounter<examples::Ping> pi1_renamed_ping(pi1_event_loop.get(),
1542 "/pi1/production");
1543 MessageCounter<examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1544 "/pi1/production");
1545
1546 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1547 pi1_renamed_ping_timestamp;
1548 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1549 pi1_ping_timestamp;
1550 if (!shared()) {
1551 pi1_renamed_ping_timestamp =
1552 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1553 pi1_event_loop.get(),
1554 "/pi1/aos/remote_timestamps/pi2/pi1/production/aos-examples-Ping");
1555 pi1_ping_timestamp =
1556 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1557 pi1_event_loop.get(),
1558 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1559 }
1560
1561 log_reader_factory.Run();
1562
1563 EXPECT_EQ(pi1_ping.count(), 0u);
1564 EXPECT_EQ(pi2_ping.count(), 0u);
1565 EXPECT_NE(pi1_renamed_ping.count(), 0u);
1566 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1567 if (!shared()) {
1568 EXPECT_NE(pi1_renamed_ping_timestamp->count(), 0u);
1569 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1570 }
1571
1572 reader.Deregister();
1573}
1574
Naman Guptaa63aa132023-03-22 20:06:34 -07001575// Tests that we observe all the same events in log replay (for a given node)
1576// whether we just register an event loop for that node or if we register a full
1577// event loop factory.
1578TEST_P(MultinodeLoggerTest, SingleNodeReplay) {
1579 time_converter_.StartEqual();
1580 constexpr chrono::milliseconds kStartupDelay(95);
1581 {
1582 LoggerState pi1_logger = MakeLogger(pi1_);
1583 LoggerState pi2_logger = MakeLogger(pi2_);
1584
1585 event_loop_factory_.RunFor(kStartupDelay);
1586
1587 StartLogger(&pi1_logger);
1588 StartLogger(&pi2_logger);
1589
1590 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1591 }
1592
1593 LogReader full_reader(SortParts(logfiles_));
1594 LogReader single_node_reader(SortParts(logfiles_));
1595
1596 SimulatedEventLoopFactory full_factory(full_reader.configuration());
1597 SimulatedEventLoopFactory single_node_factory(
1598 single_node_reader.configuration());
1599 single_node_factory.SkipTimingReport();
1600 single_node_factory.DisableStatistics();
1601 std::unique_ptr<EventLoop> replay_event_loop =
1602 single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
1603 "log_reader");
1604
1605 full_reader.Register(&full_factory);
1606 single_node_reader.Register(replay_event_loop.get());
1607
1608 const Node *full_pi1 =
1609 configuration::GetNode(full_factory.configuration(), "pi1");
1610
1611 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1612 // else should have moved.
1613 std::unique_ptr<EventLoop> full_event_loop =
1614 full_factory.MakeEventLoop("test", full_pi1);
1615 full_event_loop->SkipTimingReport();
1616 full_event_loop->SkipAosLog();
1617 // maps are indexed on channel index.
1618 // observed_messages: {channel_index: [(message_sent_time, was_fetched),...]}
1619 std::map<size_t, std::vector<std::pair<monotonic_clock::time_point, bool>>>
1620 observed_messages;
1621 std::map<size_t, std::unique_ptr<RawFetcher>> fetchers;
1622 for (size_t ii = 0; ii < full_event_loop->configuration()->channels()->size();
1623 ++ii) {
1624 const Channel *channel =
1625 full_event_loop->configuration()->channels()->Get(ii);
1626 // We currently don't support replaying remote timestamp channels in
1627 // realtime replay (unless the remote timestamp channel was not NOT_LOGGED,
1628 // in which case it gets auto-remapped and replayed on a /original channel).
1629 if (channel->name()->string_view().find("remote_timestamp") !=
1630 std::string_view::npos &&
1631 channel->name()->string_view().find("/original") ==
1632 std::string_view::npos) {
1633 continue;
1634 }
1635 if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
1636 observed_messages[ii] = {};
1637 fetchers[ii] = full_event_loop->MakeRawFetcher(channel);
1638 full_event_loop->OnRun([ii, &observed_messages, &fetchers]() {
1639 if (fetchers[ii]->Fetch()) {
1640 observed_messages[ii].push_back(std::make_pair(
1641 fetchers[ii]->context().monotonic_event_time, true));
1642 }
1643 });
1644 full_event_loop->MakeRawNoArgWatcher(
1645 channel, [ii, &observed_messages](const Context &context) {
1646 observed_messages[ii].push_back(
1647 std::make_pair(context.monotonic_event_time, false));
1648 });
1649 }
1650 }
1651
1652 full_factory.Run();
1653 fetchers.clear();
1654 full_reader.Deregister();
1655
1656 const Node *single_node_pi1 =
1657 configuration::GetNode(single_node_factory.configuration(), "pi1");
1658 std::map<size_t, std::unique_ptr<RawFetcher>> single_node_fetchers;
1659
1660 std::unique_ptr<EventLoop> single_node_event_loop =
1661 single_node_factory.MakeEventLoop("test", single_node_pi1);
1662 single_node_event_loop->SkipTimingReport();
1663 single_node_event_loop->SkipAosLog();
1664 for (size_t ii = 0;
1665 ii < single_node_event_loop->configuration()->channels()->size(); ++ii) {
1666 const Channel *channel =
1667 single_node_event_loop->configuration()->channels()->Get(ii);
1668 single_node_factory.DisableForwarding(channel);
1669 if (configuration::ChannelIsReadableOnNode(channel, single_node_pi1)) {
1670 single_node_fetchers[ii] =
1671 single_node_event_loop->MakeRawFetcher(channel);
1672 single_node_event_loop->OnRun([channel, ii, &single_node_fetchers]() {
1673 EXPECT_FALSE(single_node_fetchers[ii]->Fetch())
1674 << "Single EventLoop replay doesn't support pre-loading fetchers. "
1675 << configuration::StrippedChannelToString(channel);
1676 });
1677 single_node_event_loop->MakeRawNoArgWatcher(
1678 channel, [ii, &observed_messages, channel,
1679 kStartupDelay](const Context &context) {
1680 if (observed_messages[ii].empty()) {
1681 FAIL() << "Observed extra message at "
1682 << context.monotonic_event_time << " on "
1683 << configuration::StrippedChannelToString(channel);
1684 return;
1685 }
1686 const std::pair<monotonic_clock::time_point, bool> &message =
1687 observed_messages[ii].front();
1688 if (message.second) {
1689 EXPECT_LE(message.first,
1690 context.monotonic_event_time + kStartupDelay)
1691 << "Mismatched message times " << context.monotonic_event_time
1692 << " and " << message.first << " on "
1693 << configuration::StrippedChannelToString(channel);
1694 } else {
1695 EXPECT_EQ(message.first,
1696 context.monotonic_event_time + kStartupDelay)
1697 << "Mismatched message times " << context.monotonic_event_time
1698 << " and " << message.first << " on "
1699 << configuration::StrippedChannelToString(channel);
1700 }
1701 observed_messages[ii].erase(observed_messages[ii].begin());
1702 });
1703 }
1704 }
1705
1706 single_node_factory.Run();
1707
1708 single_node_fetchers.clear();
1709
1710 single_node_reader.Deregister();
1711
1712 for (const auto &pair : observed_messages) {
1713 EXPECT_TRUE(pair.second.empty())
1714 << "Missed " << pair.second.size() << " messages on "
1715 << configuration::StrippedChannelToString(
1716 single_node_event_loop->configuration()->channels()->Get(
1717 pair.first));
1718 }
1719}
1720
1721// Tests that we properly recreate forwarded timestamps when replaying a log.
1722// This should be enough that we can then re-run the logger and get a valid log
1723// back.
1724TEST_P(MultinodeLoggerTest, MessageHeader) {
1725 time_converter_.StartEqual();
1726 {
1727 LoggerState pi1_logger = MakeLogger(pi1_);
1728 LoggerState pi2_logger = MakeLogger(pi2_);
1729
1730 event_loop_factory_.RunFor(chrono::milliseconds(95));
1731
1732 StartLogger(&pi1_logger);
1733 StartLogger(&pi2_logger);
1734
1735 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1736 }
1737
1738 LogReader reader(SortParts(logfiles_));
1739
1740 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1741 log_reader_factory.set_send_delay(chrono::microseconds(0));
1742
1743 // This sends out the fetched messages and advances time to the start of the
1744 // log file.
1745 reader.Register(&log_reader_factory);
1746
1747 const Node *pi1 =
1748 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1749 const Node *pi2 =
1750 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1751
1752 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
1753 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
1754 LOG(INFO) << "now pi1 "
1755 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1756 LOG(INFO) << "now pi2 "
1757 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1758
1759 EXPECT_THAT(reader.LoggedNodes(),
1760 ::testing::ElementsAre(
1761 configuration::GetNode(reader.logged_configuration(), pi1),
1762 configuration::GetNode(reader.logged_configuration(), pi2)));
1763
1764 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1765
1766 std::unique_ptr<EventLoop> pi1_event_loop =
1767 log_reader_factory.MakeEventLoop("test", pi1);
1768 std::unique_ptr<EventLoop> pi2_event_loop =
1769 log_reader_factory.MakeEventLoop("test", pi2);
1770
1771 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =
1772 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1773 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi2_fetcher =
1774 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1775
1776 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1777 pi1_event_loop->MakeFetcher<examples::Ping>("/test");
1778 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1779 pi2_event_loop->MakeFetcher<examples::Ping>("/test");
1780
1781 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi2_fetcher =
1782 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1783 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi1_fetcher =
1784 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1785
1786 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1787 pi2_event_loop->MakeFetcher<examples::Pong>("/test");
1788 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1789 pi1_event_loop->MakeFetcher<examples::Pong>("/test");
1790
1791 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
1792 pi1_event_loop->configuration(), pi1_timestamp_on_pi1_fetcher.channel());
1793 const size_t ping_timestamp_channel = configuration::ChannelIndex(
1794 pi2_event_loop->configuration(), ping_on_pi2_fetcher.channel());
1795
1796 const size_t pi2_timestamp_channel = configuration::ChannelIndex(
1797 pi2_event_loop->configuration(), pi2_timestamp_on_pi2_fetcher.channel());
1798 const size_t pong_timestamp_channel = configuration::ChannelIndex(
1799 pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
1800
1801 const chrono::nanoseconds network_delay = event_loop_factory_.network_delay();
1802 const chrono::nanoseconds send_delay = event_loop_factory_.send_delay();
1803
1804 for (std::pair<int, std::string> channel :
1805 shared()
1806 ? std::vector<
1807 std::pair<int, std::string>>{{-1,
1808 "/aos/remote_timestamps/pi2"}}
1809 : std::vector<std::pair<int, std::string>>{
1810 {pi1_timestamp_channel,
1811 "/aos/remote_timestamps/pi2/pi1/aos/"
1812 "aos-message_bridge-Timestamp"},
1813 {ping_timestamp_channel,
1814 "/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
1815 pi1_event_loop->MakeWatcher(
1816 channel.second,
1817 [&pi1_event_loop, &pi2_event_loop, pi1_timestamp_channel,
1818 ping_timestamp_channel, &pi1_timestamp_on_pi1_fetcher,
1819 &pi1_timestamp_on_pi2_fetcher, &ping_on_pi1_fetcher,
1820 &ping_on_pi2_fetcher, network_delay, send_delay,
1821 channel_index = channel.first](const RemoteMessage &header) {
1822 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1823 chrono::nanoseconds(header.monotonic_sent_time()));
1824 const aos::realtime_clock::time_point header_realtime_sent_time(
1825 chrono::nanoseconds(header.realtime_sent_time()));
1826 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1827 chrono::nanoseconds(header.monotonic_remote_time()));
1828 const aos::realtime_clock::time_point header_realtime_remote_time(
1829 chrono::nanoseconds(header.realtime_remote_time()));
1830
1831 if (channel_index != -1) {
1832 ASSERT_EQ(channel_index, header.channel_index());
1833 }
1834
1835 const Context *pi1_context = nullptr;
1836 const Context *pi2_context = nullptr;
1837
1838 if (header.channel_index() == pi1_timestamp_channel) {
1839 ASSERT_TRUE(pi1_timestamp_on_pi1_fetcher.FetchNext());
1840 ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
1841 pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
1842 pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
1843 } else if (header.channel_index() == ping_timestamp_channel) {
1844 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
1845 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
1846 pi1_context = &ping_on_pi1_fetcher.context();
1847 pi2_context = &ping_on_pi2_fetcher.context();
1848 } else {
1849 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1850 << configuration::CleanedChannelToString(
1851 pi1_event_loop->configuration()->channels()->Get(
1852 header.channel_index()));
1853 }
1854
1855 ASSERT_TRUE(header.has_boot_uuid());
1856 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1857 pi2_event_loop->boot_uuid());
1858
1859 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
1860 EXPECT_EQ(pi2_context->remote_queue_index,
1861 header.remote_queue_index());
1862 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
1863
1864 EXPECT_EQ(pi2_context->monotonic_event_time,
1865 header_monotonic_sent_time);
1866 EXPECT_EQ(pi2_context->realtime_event_time,
1867 header_realtime_sent_time);
1868 EXPECT_EQ(pi2_context->realtime_remote_time,
1869 header_realtime_remote_time);
1870 EXPECT_EQ(pi2_context->monotonic_remote_time,
1871 header_monotonic_remote_time);
1872
1873 EXPECT_EQ(pi1_context->realtime_event_time,
1874 header_realtime_remote_time);
1875 EXPECT_EQ(pi1_context->monotonic_event_time,
1876 header_monotonic_remote_time);
1877
1878 // Time estimation isn't perfect, but we know the clocks were
1879 // identical when logged, so we know when this should have come back.
1880 // Confirm we got it when we expected.
1881 EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
1882 pi1_context->monotonic_event_time + 2 * network_delay +
1883 send_delay);
1884 });
1885 }
1886 for (std::pair<int, std::string> channel :
1887 shared()
1888 ? std::vector<
1889 std::pair<int, std::string>>{{-1,
1890 "/aos/remote_timestamps/pi1"}}
1891 : std::vector<std::pair<int, std::string>>{
1892 {pi2_timestamp_channel,
1893 "/aos/remote_timestamps/pi1/pi2/aos/"
1894 "aos-message_bridge-Timestamp"}}) {
1895 pi2_event_loop->MakeWatcher(
1896 channel.second,
1897 [&pi2_event_loop, &pi1_event_loop, pi2_timestamp_channel,
1898 pong_timestamp_channel, &pi2_timestamp_on_pi2_fetcher,
1899 &pi2_timestamp_on_pi1_fetcher, &pong_on_pi2_fetcher,
1900 &pong_on_pi1_fetcher, network_delay, send_delay,
1901 channel_index = channel.first](const RemoteMessage &header) {
1902 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1903 chrono::nanoseconds(header.monotonic_sent_time()));
1904 const aos::realtime_clock::time_point header_realtime_sent_time(
1905 chrono::nanoseconds(header.realtime_sent_time()));
1906 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1907 chrono::nanoseconds(header.monotonic_remote_time()));
1908 const aos::realtime_clock::time_point header_realtime_remote_time(
1909 chrono::nanoseconds(header.realtime_remote_time()));
1910
1911 if (channel_index != -1) {
1912 ASSERT_EQ(channel_index, header.channel_index());
1913 }
1914
1915 const Context *pi2_context = nullptr;
1916 const Context *pi1_context = nullptr;
1917
1918 if (header.channel_index() == pi2_timestamp_channel) {
1919 ASSERT_TRUE(pi2_timestamp_on_pi2_fetcher.FetchNext());
1920 ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
1921 pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
1922 pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
1923 } else if (header.channel_index() == pong_timestamp_channel) {
1924 ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
1925 ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
1926 pi2_context = &pong_on_pi2_fetcher.context();
1927 pi1_context = &pong_on_pi1_fetcher.context();
1928 } else {
1929 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1930 << configuration::CleanedChannelToString(
1931 pi2_event_loop->configuration()->channels()->Get(
1932 header.channel_index()));
1933 }
1934
1935 ASSERT_TRUE(header.has_boot_uuid());
1936 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1937 pi1_event_loop->boot_uuid());
1938
1939 EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());
1940 EXPECT_EQ(pi1_context->remote_queue_index,
1941 header.remote_queue_index());
1942 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
1943
1944 EXPECT_EQ(pi1_context->monotonic_event_time,
1945 header_monotonic_sent_time);
1946 EXPECT_EQ(pi1_context->realtime_event_time,
1947 header_realtime_sent_time);
1948 EXPECT_EQ(pi1_context->realtime_remote_time,
1949 header_realtime_remote_time);
1950 EXPECT_EQ(pi1_context->monotonic_remote_time,
1951 header_monotonic_remote_time);
1952
1953 EXPECT_EQ(pi2_context->realtime_event_time,
1954 header_realtime_remote_time);
1955 EXPECT_EQ(pi2_context->monotonic_event_time,
1956 header_monotonic_remote_time);
1957
1958 // Time estimation isn't perfect, but we know the clocks were
1959 // identical when logged, so we know when this should have come back.
1960 // Confirm we got it when we expected.
1961 EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
1962 pi2_context->monotonic_event_time + 2 * network_delay +
1963 send_delay);
1964 });
1965 }
1966
1967 // And confirm we can re-create a log again, while checking the contents.
1968 {
1969 LoggerState pi1_logger = MakeLogger(
1970 log_reader_factory.GetNodeEventLoopFactory("pi1"), &log_reader_factory);
1971 LoggerState pi2_logger = MakeLogger(
1972 log_reader_factory.GetNodeEventLoopFactory("pi2"), &log_reader_factory);
1973
1974 StartLogger(&pi1_logger, tmp_dir_ + "/relogged1");
1975 StartLogger(&pi2_logger, tmp_dir_ + "/relogged2");
1976
1977 log_reader_factory.Run();
1978 }
1979
1980 reader.Deregister();
1981
1982 // And verify that we can run the LogReader over the relogged files without
1983 // hitting any fatal errors.
1984 {
1985 LogReader relogged_reader(SortParts(MakeLogFiles(
1986 tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 3, 3, true)));
1987 relogged_reader.Register();
1988
1989 relogged_reader.event_loop_factory()->Run();
1990 }
1991 // And confirm that we can read the logged file using the reader's
1992 // configuration.
1993 {
1994 LogReader relogged_reader(
1995 SortParts(MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2",
1996 3, 3, true)),
1997 reader.configuration());
1998 relogged_reader.Register();
1999
2000 relogged_reader.event_loop_factory()->Run();
2001 }
2002}
2003
2004// Tests that we properly populate and extract the logger_start time by setting
2005// up a clock difference between 2 nodes and looking at the resulting parts.
2006TEST_P(MultinodeLoggerTest, LoggerStartTime) {
2007 std::vector<std::string> actual_filenames;
2008 time_converter_.AddMonotonic(
2009 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2010 {
2011 LoggerState pi1_logger = MakeLogger(pi1_);
2012 LoggerState pi2_logger = MakeLogger(pi2_);
2013
2014 StartLogger(&pi1_logger);
2015 StartLogger(&pi2_logger);
2016
2017 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2018
2019 pi1_logger.AppendAllFilenames(&actual_filenames);
2020 pi2_logger.AppendAllFilenames(&actual_filenames);
2021 }
2022
2023 ASSERT_THAT(actual_filenames,
2024 ::testing::UnorderedElementsAreArray(logfiles_));
2025
2026 for (const LogFile &log_file : SortParts(logfiles_)) {
2027 for (const LogParts &log_part : log_file.parts) {
2028 if (log_part.node == log_file.logger_node) {
2029 EXPECT_EQ(log_part.logger_monotonic_start_time,
2030 aos::monotonic_clock::min_time);
2031 EXPECT_EQ(log_part.logger_realtime_start_time,
2032 aos::realtime_clock::min_time);
2033 } else {
2034 const chrono::seconds offset = log_file.logger_node == "pi1"
2035 ? -chrono::seconds(1000)
2036 : chrono::seconds(1000);
2037 EXPECT_EQ(log_part.logger_monotonic_start_time,
2038 log_part.monotonic_start_time + offset);
2039 EXPECT_EQ(log_part.logger_realtime_start_time,
2040 log_file.realtime_start_time +
2041 (log_part.logger_monotonic_start_time -
2042 log_file.monotonic_start_time));
2043 }
2044 }
2045 }
2046}
2047
2048// Test that renaming the base, renames the folder.
2049TEST_P(MultinodeLoggerTest, LoggerRenameFolder) {
2050 util::UnlinkRecursive(tmp_dir_ + "/renamefolder");
2051 util::UnlinkRecursive(tmp_dir_ + "/new-good");
2052 time_converter_.AddMonotonic(
2053 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2054 logfile_base1_ = tmp_dir_ + "/renamefolder/multi_logfile1";
2055 logfile_base2_ = tmp_dir_ + "/renamefolder/multi_logfile2";
2056 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
2057 LoggerState pi1_logger = MakeLogger(pi1_);
2058 LoggerState pi2_logger = MakeLogger(pi2_);
2059
2060 StartLogger(&pi1_logger);
2061 StartLogger(&pi2_logger);
2062
2063 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2064 logfile_base1_ = tmp_dir_ + "/new-good/multi_logfile1";
2065 logfile_base2_ = tmp_dir_ + "/new-good/multi_logfile2";
2066 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
Alexei Strotscaf17d32023-04-03 22:31:11 -07002067
2068 // Sequence of set_base_name and Rotate simulates rename operation. Since
2069 // rename is not supported by all namers, RenameLogBase moved from logger to
2070 // the higher level abstraction, yet log_namers support rename, and it is
2071 // legal to test it here.
2072 pi1_logger.log_namer->set_base_name(logfile_base1_);
2073 pi1_logger.logger->Rotate();
2074 pi2_logger.log_namer->set_base_name(logfile_base2_);
2075 pi2_logger.logger->Rotate();
2076
Naman Guptaa63aa132023-03-22 20:06:34 -07002077 for (auto &file : logfiles_) {
2078 struct stat s;
2079 EXPECT_EQ(0, stat(file.c_str(), &s));
2080 }
2081}
2082
2083// Test that renaming the file base dies.
2084TEST_P(MultinodeLoggerDeathTest, LoggerRenameFile) {
2085 time_converter_.AddMonotonic(
2086 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2087 util::UnlinkRecursive(tmp_dir_ + "/renamefile");
2088 logfile_base1_ = tmp_dir_ + "/renamefile/multi_logfile1";
2089 logfile_base2_ = tmp_dir_ + "/renamefile/multi_logfile2";
2090 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
2091 LoggerState pi1_logger = MakeLogger(pi1_);
2092 StartLogger(&pi1_logger);
2093 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2094 logfile_base1_ = tmp_dir_ + "/new-renamefile/new_multi_logfile1";
Alexei Strotscaf17d32023-04-03 22:31:11 -07002095 EXPECT_DEATH({ pi1_logger.log_namer->set_base_name(logfile_base1_); },
Naman Guptaa63aa132023-03-22 20:06:34 -07002096 "Rename of file base from");
2097}
2098
2099// TODO(austin): We can write a test which recreates a logfile and confirms that
2100// we get it back. That is the ultimate test.
2101
2102// Tests that we properly recreate forwarded timestamps when replaying a log.
2103// This should be enough that we can then re-run the logger and get a valid log
2104// back.
2105TEST_P(MultinodeLoggerTest, RemoteReboot) {
2106 std::vector<std::string> actual_filenames;
2107
2108 const UUID pi1_boot0 = UUID::Random();
2109 const UUID pi2_boot0 = UUID::Random();
2110 const UUID pi2_boot1 = UUID::Random();
2111 {
2112 CHECK_EQ(pi1_index_, 0u);
2113 CHECK_EQ(pi2_index_, 1u);
2114
2115 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2116 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2117 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2118
2119 time_converter_.AddNextTimestamp(
2120 distributed_clock::epoch(),
2121 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2122 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2123 time_converter_.AddNextTimestamp(
2124 distributed_clock::epoch() + reboot_time,
2125 {BootTimestamp::epoch() + reboot_time,
2126 BootTimestamp{
2127 .boot = 1,
2128 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2129 }
2130
2131 {
2132 LoggerState pi1_logger = MakeLogger(pi1_);
2133
2134 event_loop_factory_.RunFor(chrono::milliseconds(95));
2135 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2136 pi1_boot0);
2137 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2138 pi2_boot0);
2139
2140 StartLogger(&pi1_logger);
2141
2142 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2143
2144 VLOG(1) << "Reboot now!";
2145
2146 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2147 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2148 pi1_boot0);
2149 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2150 pi2_boot1);
2151
2152 pi1_logger.AppendAllFilenames(&actual_filenames);
2153 }
2154
2155 std::sort(actual_filenames.begin(), actual_filenames.end());
2156 std::sort(pi1_reboot_logfiles_.begin(), pi1_reboot_logfiles_.end());
2157 ASSERT_THAT(actual_filenames,
2158 ::testing::UnorderedElementsAreArray(pi1_reboot_logfiles_));
2159
2160 // Confirm that our new oldest timestamps properly update as we reboot and
2161 // rotate.
2162 for (const std::string &file : pi1_reboot_logfiles_) {
2163 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2164 ReadHeader(file);
2165 CHECK(log_header);
2166 if (log_header->message().has_configuration()) {
2167 continue;
2168 }
2169
2170 const monotonic_clock::time_point monotonic_start_time =
2171 monotonic_clock::time_point(
2172 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2173 const UUID source_node_boot_uuid = UUID::FromString(
2174 log_header->message().source_node_boot_uuid()->string_view());
2175
2176 if (log_header->message().node()->name()->string_view() != "pi1") {
2177 // The remote message channel should rotate later and have more parts.
2178 // This only is true on the log files with shared remote messages.
2179 //
2180 // TODO(austin): I'm not the most thrilled with this test pattern... It
2181 // feels brittle in a different way.
2182 if (file.find("aos.message_bridge.RemoteMessage") == std::string::npos ||
2183 !shared()) {
2184 switch (log_header->message().parts_index()) {
2185 case 0:
2186 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2187 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2188 break;
2189 case 1:
2190 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2191 ASSERT_EQ(monotonic_start_time,
2192 monotonic_clock::epoch() + chrono::seconds(1));
2193 break;
2194 case 2:
2195 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2196 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2197 break;
2198 case 3:
2199 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2200 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
2201 chrono::nanoseconds(2322999462))
2202 << " on " << file;
2203 break;
2204 default:
2205 FAIL();
2206 break;
2207 }
2208 } else {
2209 switch (log_header->message().parts_index()) {
2210 case 0:
2211 case 1:
2212 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2213 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2214 break;
2215 case 2:
2216 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2217 ASSERT_EQ(monotonic_start_time,
2218 monotonic_clock::epoch() + chrono::seconds(1));
2219 break;
2220 case 3:
2221 case 4:
2222 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2223 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2224 break;
2225 case 5:
2226 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2227 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
2228 chrono::nanoseconds(2322999462))
2229 << " on " << file;
2230 break;
2231 default:
2232 FAIL();
2233 break;
2234 }
2235 }
2236 continue;
2237 }
2238 SCOPED_TRACE(file);
2239 SCOPED_TRACE(aos::FlatbufferToJson(
2240 *log_header, {.multi_line = true, .max_vector_size = 100}));
2241 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2242 ASSERT_EQ(
2243 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2244 EXPECT_EQ(
2245 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2246 monotonic_clock::max_time.time_since_epoch().count());
2247 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2248 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2249 2u);
2250 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2251 monotonic_clock::max_time.time_since_epoch().count());
2252 ASSERT_TRUE(log_header->message()
2253 .has_oldest_remote_unreliable_monotonic_timestamps());
2254 ASSERT_EQ(log_header->message()
2255 .oldest_remote_unreliable_monotonic_timestamps()
2256 ->size(),
2257 2u);
2258 EXPECT_EQ(log_header->message()
2259 .oldest_remote_unreliable_monotonic_timestamps()
2260 ->Get(0),
2261 monotonic_clock::max_time.time_since_epoch().count());
2262 ASSERT_TRUE(log_header->message()
2263 .has_oldest_local_unreliable_monotonic_timestamps());
2264 ASSERT_EQ(log_header->message()
2265 .oldest_local_unreliable_monotonic_timestamps()
2266 ->size(),
2267 2u);
2268 EXPECT_EQ(log_header->message()
2269 .oldest_local_unreliable_monotonic_timestamps()
2270 ->Get(0),
2271 monotonic_clock::max_time.time_since_epoch().count());
2272
2273 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2274 monotonic_clock::time_point(chrono::nanoseconds(
2275 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2276 1)));
2277 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2278 monotonic_clock::time_point(chrono::nanoseconds(
2279 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2280 const monotonic_clock::time_point
2281 oldest_remote_unreliable_monotonic_timestamps =
2282 monotonic_clock::time_point(chrono::nanoseconds(
2283 log_header->message()
2284 .oldest_remote_unreliable_monotonic_timestamps()
2285 ->Get(1)));
2286 const monotonic_clock::time_point
2287 oldest_local_unreliable_monotonic_timestamps =
2288 monotonic_clock::time_point(chrono::nanoseconds(
2289 log_header->message()
2290 .oldest_local_unreliable_monotonic_timestamps()
2291 ->Get(1)));
2292 const monotonic_clock::time_point
2293 oldest_remote_reliable_monotonic_timestamps =
2294 monotonic_clock::time_point(chrono::nanoseconds(
2295 log_header->message()
2296 .oldest_remote_reliable_monotonic_timestamps()
2297 ->Get(1)));
2298 const monotonic_clock::time_point
2299 oldest_local_reliable_monotonic_timestamps =
2300 monotonic_clock::time_point(chrono::nanoseconds(
2301 log_header->message()
2302 .oldest_local_reliable_monotonic_timestamps()
2303 ->Get(1)));
2304 const monotonic_clock::time_point
2305 oldest_logger_remote_unreliable_monotonic_timestamps =
2306 monotonic_clock::time_point(chrono::nanoseconds(
2307 log_header->message()
2308 .oldest_logger_remote_unreliable_monotonic_timestamps()
2309 ->Get(0)));
2310 const monotonic_clock::time_point
2311 oldest_logger_local_unreliable_monotonic_timestamps =
2312 monotonic_clock::time_point(chrono::nanoseconds(
2313 log_header->message()
2314 .oldest_logger_local_unreliable_monotonic_timestamps()
2315 ->Get(0)));
2316 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2317 monotonic_clock::max_time);
2318 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2319 monotonic_clock::max_time);
2320 switch (log_header->message().parts_index()) {
2321 case 0:
2322 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2323 monotonic_clock::max_time);
2324 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2325 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2326 monotonic_clock::max_time);
2327 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2328 monotonic_clock::max_time);
2329 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2330 monotonic_clock::max_time);
2331 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2332 monotonic_clock::max_time);
2333 break;
2334 case 1:
2335 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2336 monotonic_clock::time_point(chrono::microseconds(90200)));
2337 EXPECT_EQ(oldest_local_monotonic_timestamps,
2338 monotonic_clock::time_point(chrono::microseconds(90350)));
2339 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2340 monotonic_clock::time_point(chrono::microseconds(90200)));
2341 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2342 monotonic_clock::time_point(chrono::microseconds(90350)));
2343 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2344 monotonic_clock::max_time);
2345 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2346 monotonic_clock::max_time);
2347 break;
2348 case 2:
2349 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2350 monotonic_clock::time_point(chrono::microseconds(90200)))
2351 << file;
2352 EXPECT_EQ(oldest_local_monotonic_timestamps,
2353 monotonic_clock::time_point(chrono::microseconds(90350)))
2354 << file;
2355 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2356 monotonic_clock::time_point(chrono::microseconds(90200)))
2357 << file;
2358 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2359 monotonic_clock::time_point(chrono::microseconds(90350)))
2360 << file;
2361 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2362 monotonic_clock::time_point(chrono::microseconds(100000)))
2363 << file;
2364 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2365 monotonic_clock::time_point(chrono::microseconds(100150)))
2366 << file;
2367 break;
2368 case 3:
2369 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2370 monotonic_clock::time_point(chrono::milliseconds(1323) +
2371 chrono::microseconds(200)));
2372 EXPECT_EQ(oldest_local_monotonic_timestamps,
2373 monotonic_clock::time_point(chrono::microseconds(10100350)));
2374 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2375 monotonic_clock::time_point(chrono::milliseconds(1323) +
2376 chrono::microseconds(200)));
2377 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2378 monotonic_clock::time_point(chrono::microseconds(10100350)));
2379 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2380 monotonic_clock::max_time)
2381 << file;
2382 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2383 monotonic_clock::max_time)
2384 << file;
2385 break;
2386 case 4:
2387 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2388 monotonic_clock::time_point(chrono::milliseconds(1323) +
2389 chrono::microseconds(200)));
2390 EXPECT_EQ(oldest_local_monotonic_timestamps,
2391 monotonic_clock::time_point(chrono::microseconds(10100350)));
2392 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2393 monotonic_clock::time_point(chrono::milliseconds(1323) +
2394 chrono::microseconds(200)));
2395 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2396 monotonic_clock::time_point(chrono::microseconds(10100350)));
2397 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2398 monotonic_clock::time_point(chrono::microseconds(1423000)))
2399 << file;
2400 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2401 monotonic_clock::time_point(chrono::microseconds(10200150)))
2402 << file;
2403 break;
2404 default:
2405 FAIL();
2406 break;
2407 }
2408 }
2409
2410 // Confirm that we refuse to replay logs with missing boot uuids.
2411 {
2412 LogReader reader(SortParts(pi1_reboot_logfiles_));
2413
2414 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2415 log_reader_factory.set_send_delay(chrono::microseconds(0));
2416
2417 // This sends out the fetched messages and advances time to the start of
2418 // the log file.
2419 reader.Register(&log_reader_factory);
2420
2421 log_reader_factory.Run();
2422
2423 reader.Deregister();
2424 }
2425}
2426
2427// Tests that we can sort a log which only has timestamps from the remote
2428// because the local message_bridge_client failed to connect.
2429TEST_P(MultinodeLoggerTest, RemoteRebootOnlyTimestamps) {
2430 const UUID pi1_boot0 = UUID::Random();
2431 const UUID pi2_boot0 = UUID::Random();
2432 const UUID pi2_boot1 = UUID::Random();
2433 {
2434 CHECK_EQ(pi1_index_, 0u);
2435 CHECK_EQ(pi2_index_, 1u);
2436
2437 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2438 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2439 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2440
2441 time_converter_.AddNextTimestamp(
2442 distributed_clock::epoch(),
2443 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2444 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2445 time_converter_.AddNextTimestamp(
2446 distributed_clock::epoch() + reboot_time,
2447 {BootTimestamp::epoch() + reboot_time,
2448 BootTimestamp{
2449 .boot = 1,
2450 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2451 }
2452 pi2_->Disconnect(pi1_->node());
2453
2454 std::vector<std::string> filenames;
2455 {
2456 LoggerState pi1_logger = MakeLogger(pi1_);
2457
2458 event_loop_factory_.RunFor(chrono::milliseconds(95));
2459 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2460 pi1_boot0);
2461 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2462 pi2_boot0);
2463
2464 StartLogger(&pi1_logger);
2465
2466 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2467
2468 VLOG(1) << "Reboot now!";
2469
2470 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2471 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2472 pi1_boot0);
2473 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2474 pi2_boot1);
2475 pi1_logger.AppendAllFilenames(&filenames);
2476 }
2477
2478 std::sort(filenames.begin(), filenames.end());
2479
2480 // Confirm that our new oldest timestamps properly update as we reboot and
2481 // rotate.
2482 size_t timestamp_file_count = 0;
2483 for (const std::string &file : filenames) {
2484 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2485 ReadHeader(file);
2486 CHECK(log_header);
2487
2488 if (log_header->message().has_configuration()) {
2489 continue;
2490 }
2491
2492 const monotonic_clock::time_point monotonic_start_time =
2493 monotonic_clock::time_point(
2494 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2495 const UUID source_node_boot_uuid = UUID::FromString(
2496 log_header->message().source_node_boot_uuid()->string_view());
2497
2498 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2499 ASSERT_EQ(
2500 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2501 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2502 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2503 2u);
2504 ASSERT_TRUE(log_header->message()
2505 .has_oldest_remote_unreliable_monotonic_timestamps());
2506 ASSERT_EQ(log_header->message()
2507 .oldest_remote_unreliable_monotonic_timestamps()
2508 ->size(),
2509 2u);
2510 ASSERT_TRUE(log_header->message()
2511 .has_oldest_local_unreliable_monotonic_timestamps());
2512 ASSERT_EQ(log_header->message()
2513 .oldest_local_unreliable_monotonic_timestamps()
2514 ->size(),
2515 2u);
2516 ASSERT_TRUE(log_header->message()
2517 .has_oldest_remote_reliable_monotonic_timestamps());
2518 ASSERT_EQ(log_header->message()
2519 .oldest_remote_reliable_monotonic_timestamps()
2520 ->size(),
2521 2u);
2522 ASSERT_TRUE(
2523 log_header->message().has_oldest_local_reliable_monotonic_timestamps());
2524 ASSERT_EQ(log_header->message()
2525 .oldest_local_reliable_monotonic_timestamps()
2526 ->size(),
2527 2u);
2528
2529 ASSERT_TRUE(
2530 log_header->message()
2531 .has_oldest_logger_remote_unreliable_monotonic_timestamps());
2532 ASSERT_EQ(log_header->message()
2533 .oldest_logger_remote_unreliable_monotonic_timestamps()
2534 ->size(),
2535 2u);
2536 ASSERT_TRUE(log_header->message()
2537 .has_oldest_logger_local_unreliable_monotonic_timestamps());
2538 ASSERT_EQ(log_header->message()
2539 .oldest_logger_local_unreliable_monotonic_timestamps()
2540 ->size(),
2541 2u);
2542
2543 if (log_header->message().node()->name()->string_view() != "pi1") {
2544 ASSERT_TRUE(file.find("aos.message_bridge.RemoteMessage") !=
2545 std::string::npos);
2546
2547 const std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
2548 ReadNthMessage(file, 0);
2549 CHECK(msg);
2550
2551 EXPECT_TRUE(msg->message().has_monotonic_sent_time());
2552 EXPECT_TRUE(msg->message().has_monotonic_remote_time());
2553
2554 const monotonic_clock::time_point
2555 expected_oldest_local_monotonic_timestamps(
2556 chrono::nanoseconds(msg->message().monotonic_sent_time()));
2557 const monotonic_clock::time_point
2558 expected_oldest_remote_monotonic_timestamps(
2559 chrono::nanoseconds(msg->message().monotonic_remote_time()));
2560 const monotonic_clock::time_point
2561 expected_oldest_timestamp_monotonic_timestamps(
2562 chrono::nanoseconds(msg->message().monotonic_timestamp_time()));
2563
2564 EXPECT_NE(expected_oldest_local_monotonic_timestamps,
2565 monotonic_clock::min_time);
2566 EXPECT_NE(expected_oldest_remote_monotonic_timestamps,
2567 monotonic_clock::min_time);
2568 EXPECT_NE(expected_oldest_timestamp_monotonic_timestamps,
2569 monotonic_clock::min_time);
2570
2571 ++timestamp_file_count;
2572 // Since the log file is from the perspective of the other node,
2573 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2574 monotonic_clock::time_point(chrono::nanoseconds(
2575 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2576 0)));
2577 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2578 monotonic_clock::time_point(chrono::nanoseconds(
2579 log_header->message().oldest_local_monotonic_timestamps()->Get(
2580 0)));
2581 const monotonic_clock::time_point
2582 oldest_remote_unreliable_monotonic_timestamps =
2583 monotonic_clock::time_point(chrono::nanoseconds(
2584 log_header->message()
2585 .oldest_remote_unreliable_monotonic_timestamps()
2586 ->Get(0)));
2587 const monotonic_clock::time_point
2588 oldest_local_unreliable_monotonic_timestamps =
2589 monotonic_clock::time_point(chrono::nanoseconds(
2590 log_header->message()
2591 .oldest_local_unreliable_monotonic_timestamps()
2592 ->Get(0)));
2593 const monotonic_clock::time_point
2594 oldest_remote_reliable_monotonic_timestamps =
2595 monotonic_clock::time_point(chrono::nanoseconds(
2596 log_header->message()
2597 .oldest_remote_reliable_monotonic_timestamps()
2598 ->Get(0)));
2599 const monotonic_clock::time_point
2600 oldest_local_reliable_monotonic_timestamps =
2601 monotonic_clock::time_point(chrono::nanoseconds(
2602 log_header->message()
2603 .oldest_local_reliable_monotonic_timestamps()
2604 ->Get(0)));
2605 const monotonic_clock::time_point
2606 oldest_logger_remote_unreliable_monotonic_timestamps =
2607 monotonic_clock::time_point(chrono::nanoseconds(
2608 log_header->message()
2609 .oldest_logger_remote_unreliable_monotonic_timestamps()
2610 ->Get(1)));
2611 const monotonic_clock::time_point
2612 oldest_logger_local_unreliable_monotonic_timestamps =
2613 monotonic_clock::time_point(chrono::nanoseconds(
2614 log_header->message()
2615 .oldest_logger_local_unreliable_monotonic_timestamps()
2616 ->Get(1)));
2617
2618 const Channel *channel =
2619 event_loop_factory_.configuration()->channels()->Get(
2620 msg->message().channel_index());
2621 const Connection *connection = configuration::ConnectionToNode(
2622 channel, configuration::GetNode(
2623 event_loop_factory_.configuration(),
2624 log_header->message().node()->name()->string_view()));
2625
2626 const bool reliable = connection->time_to_live() == 0;
2627
2628 SCOPED_TRACE(file);
2629 SCOPED_TRACE(aos::FlatbufferToJson(
2630 *log_header, {.multi_line = true, .max_vector_size = 100}));
2631
2632 if (shared()) {
2633 // Confirm that the oldest timestamps match what we expect. Based on
2634 // what we are doing, we know that the oldest time is the first
2635 // message's time.
2636 //
2637 // This makes the test robust to both the split and combined config
2638 // tests.
2639 switch (log_header->message().parts_index()) {
2640 case 0:
2641 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2642 expected_oldest_remote_monotonic_timestamps);
2643 EXPECT_EQ(oldest_local_monotonic_timestamps,
2644 expected_oldest_local_monotonic_timestamps);
2645 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2646 expected_oldest_local_monotonic_timestamps)
2647 << file;
2648 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2649 expected_oldest_timestamp_monotonic_timestamps)
2650 << file;
2651
2652 if (reliable) {
2653 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2654 expected_oldest_remote_monotonic_timestamps);
2655 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2656 expected_oldest_local_monotonic_timestamps);
2657 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2658 monotonic_clock::max_time);
2659 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2660 monotonic_clock::max_time);
2661 } else {
2662 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2663 monotonic_clock::max_time);
2664 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2665 monotonic_clock::max_time);
2666 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2667 expected_oldest_remote_monotonic_timestamps);
2668 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2669 expected_oldest_local_monotonic_timestamps);
2670 }
2671 break;
2672 case 1:
2673 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2674 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2675 EXPECT_EQ(oldest_local_monotonic_timestamps,
2676 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2677 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2678 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2679 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2680 monotonic_clock::epoch() + chrono::nanoseconds(90250000));
2681 if (reliable) {
2682 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2683 expected_oldest_remote_monotonic_timestamps);
2684 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2685 expected_oldest_local_monotonic_timestamps);
2686 EXPECT_EQ(
2687 oldest_remote_unreliable_monotonic_timestamps,
2688 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2689 EXPECT_EQ(
2690 oldest_local_unreliable_monotonic_timestamps,
2691 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2692 } else {
2693 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2694 monotonic_clock::max_time);
2695 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2696 monotonic_clock::max_time);
2697 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2698 expected_oldest_remote_monotonic_timestamps);
2699 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2700 expected_oldest_local_monotonic_timestamps);
2701 }
2702 break;
2703 case 2:
2704 EXPECT_EQ(
2705 oldest_remote_monotonic_timestamps,
2706 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2707 EXPECT_EQ(
2708 oldest_local_monotonic_timestamps,
2709 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2710 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2711 expected_oldest_local_monotonic_timestamps)
2712 << file;
2713 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2714 expected_oldest_timestamp_monotonic_timestamps)
2715 << file;
2716 if (reliable) {
2717 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2718 expected_oldest_remote_monotonic_timestamps);
2719 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2720 expected_oldest_local_monotonic_timestamps);
2721 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2722 monotonic_clock::max_time);
2723 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2724 monotonic_clock::max_time);
2725 } else {
2726 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2727 monotonic_clock::max_time);
2728 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2729 monotonic_clock::max_time);
2730 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2731 expected_oldest_remote_monotonic_timestamps);
2732 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2733 expected_oldest_local_monotonic_timestamps);
2734 }
2735 break;
2736
2737 case 3:
2738 EXPECT_EQ(
2739 oldest_remote_monotonic_timestamps,
2740 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2741 EXPECT_EQ(
2742 oldest_local_monotonic_timestamps,
2743 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2744 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2745 expected_oldest_remote_monotonic_timestamps);
2746 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2747 expected_oldest_local_monotonic_timestamps);
2748 EXPECT_EQ(
2749 oldest_logger_remote_unreliable_monotonic_timestamps,
2750 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2751 EXPECT_EQ(
2752 oldest_logger_local_unreliable_monotonic_timestamps,
2753 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2754 break;
2755 default:
2756 FAIL();
2757 break;
2758 }
2759
2760 switch (log_header->message().parts_index()) {
2761 case 0:
2762 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2763 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2764 break;
2765 case 1:
2766 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2767 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2768 break;
2769 case 2:
2770 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2771 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2772 break;
2773 case 3:
2774 if (shared()) {
2775 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2776 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2777 break;
2778 }
2779 [[fallthrough]];
2780 default:
2781 FAIL();
2782 break;
2783 }
2784 } else {
2785 switch (log_header->message().parts_index()) {
2786 case 0:
2787 if (reliable) {
2788 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2789 monotonic_clock::max_time);
2790 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2791 monotonic_clock::max_time);
2792 EXPECT_EQ(
2793 oldest_logger_remote_unreliable_monotonic_timestamps,
2794 monotonic_clock::epoch() + chrono::nanoseconds(100150000))
2795 << file;
2796 EXPECT_EQ(
2797 oldest_logger_local_unreliable_monotonic_timestamps,
2798 monotonic_clock::epoch() + chrono::nanoseconds(100250000))
2799 << file;
2800 } else {
2801 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2802 expected_oldest_remote_monotonic_timestamps);
2803 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2804 expected_oldest_local_monotonic_timestamps);
2805 EXPECT_EQ(
2806 oldest_logger_remote_unreliable_monotonic_timestamps,
2807 monotonic_clock::epoch() + chrono::nanoseconds(90150000))
2808 << file;
2809 EXPECT_EQ(
2810 oldest_logger_local_unreliable_monotonic_timestamps,
2811 monotonic_clock::epoch() + chrono::nanoseconds(90250000))
2812 << file;
2813 }
2814 break;
2815 case 1:
2816 if (reliable) {
2817 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2818 monotonic_clock::max_time);
2819 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2820 monotonic_clock::max_time);
2821 EXPECT_EQ(
2822 oldest_logger_remote_unreliable_monotonic_timestamps,
2823 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2824 EXPECT_EQ(
2825 oldest_logger_local_unreliable_monotonic_timestamps,
2826 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2827 } else {
2828 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2829 expected_oldest_remote_monotonic_timestamps);
2830 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2831 expected_oldest_local_monotonic_timestamps);
2832 EXPECT_EQ(
2833 oldest_logger_remote_unreliable_monotonic_timestamps,
2834 monotonic_clock::epoch() + chrono::nanoseconds(1323150000));
2835 EXPECT_EQ(
2836 oldest_logger_local_unreliable_monotonic_timestamps,
2837 monotonic_clock::epoch() + chrono::nanoseconds(10100250000));
2838 }
2839 break;
2840 default:
2841 FAIL();
2842 break;
2843 }
2844
2845 switch (log_header->message().parts_index()) {
2846 case 0:
2847 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2848 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2849 break;
2850 case 1:
2851 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2852 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2853 break;
2854 default:
2855 FAIL();
2856 break;
2857 }
2858 }
2859
2860 continue;
2861 }
2862 EXPECT_EQ(
2863 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2864 monotonic_clock::max_time.time_since_epoch().count());
2865 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2866 monotonic_clock::max_time.time_since_epoch().count());
2867 EXPECT_EQ(log_header->message()
2868 .oldest_remote_unreliable_monotonic_timestamps()
2869 ->Get(0),
2870 monotonic_clock::max_time.time_since_epoch().count());
2871 EXPECT_EQ(log_header->message()
2872 .oldest_local_unreliable_monotonic_timestamps()
2873 ->Get(0),
2874 monotonic_clock::max_time.time_since_epoch().count());
2875
2876 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2877 monotonic_clock::time_point(chrono::nanoseconds(
2878 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2879 1)));
2880 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2881 monotonic_clock::time_point(chrono::nanoseconds(
2882 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2883 const monotonic_clock::time_point
2884 oldest_remote_unreliable_monotonic_timestamps =
2885 monotonic_clock::time_point(chrono::nanoseconds(
2886 log_header->message()
2887 .oldest_remote_unreliable_monotonic_timestamps()
2888 ->Get(1)));
2889 const monotonic_clock::time_point
2890 oldest_local_unreliable_monotonic_timestamps =
2891 monotonic_clock::time_point(chrono::nanoseconds(
2892 log_header->message()
2893 .oldest_local_unreliable_monotonic_timestamps()
2894 ->Get(1)));
2895 switch (log_header->message().parts_index()) {
2896 case 0:
2897 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2898 monotonic_clock::max_time);
2899 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2900 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2901 monotonic_clock::max_time);
2902 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2903 monotonic_clock::max_time);
2904 break;
2905 default:
2906 FAIL();
2907 break;
2908 }
2909 }
2910
2911 if (shared()) {
2912 EXPECT_EQ(timestamp_file_count, 4u);
2913 } else {
2914 EXPECT_EQ(timestamp_file_count, 4u);
2915 }
2916
2917 // Confirm that we can actually sort the resulting log and read it.
2918 {
2919 LogReader reader(SortParts(filenames));
2920
2921 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2922 log_reader_factory.set_send_delay(chrono::microseconds(0));
2923
2924 // This sends out the fetched messages and advances time to the start of
2925 // the log file.
2926 reader.Register(&log_reader_factory);
2927
2928 log_reader_factory.Run();
2929
2930 reader.Deregister();
2931 }
2932}
2933
2934// Tests that we properly handle one direction of message_bridge being
2935// unavailable.
2936TEST_P(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
2937 pi1_->Disconnect(pi2_->node());
2938 time_converter_.AddMonotonic(
2939 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2940
2941 time_converter_.AddMonotonic(
2942 {chrono::milliseconds(10000),
2943 chrono::milliseconds(10000) - chrono::milliseconds(1)});
2944 {
2945 LoggerState pi1_logger = MakeLogger(pi1_);
2946
2947 event_loop_factory_.RunFor(chrono::milliseconds(95));
2948
2949 StartLogger(&pi1_logger);
2950
2951 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2952 }
2953
2954 // Confirm that we can parse the result. LogReader has enough internal CHECKs
2955 // to confirm the right thing happened.
2956 ConfirmReadable(pi1_single_direction_logfiles_);
2957}
2958
2959// Tests that we properly handle one direction of message_bridge being
2960// unavailable.
2961TEST_P(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
2962 pi1_->Disconnect(pi2_->node());
2963 time_converter_.AddMonotonic(
2964 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(500)});
2965
2966 time_converter_.AddMonotonic(
2967 {chrono::milliseconds(10000),
2968 chrono::milliseconds(10000) + chrono::milliseconds(1)});
2969 {
2970 LoggerState pi1_logger = MakeLogger(pi1_);
2971
2972 event_loop_factory_.RunFor(chrono::milliseconds(95));
2973
2974 StartLogger(&pi1_logger);
2975
2976 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2977 }
2978
2979 // Confirm that we can parse the result. LogReader has enough internal CHECKs
2980 // to confirm the right thing happened.
2981 ConfirmReadable(pi1_single_direction_logfiles_);
2982}
2983
2984// Tests that we explode if someone passes in a part file twice with a better
2985// error than an out of order error.
2986TEST_P(MultinodeLoggerTest, DuplicateLogFiles) {
2987 time_converter_.AddMonotonic(
2988 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2989 {
2990 LoggerState pi1_logger = MakeLogger(pi1_);
2991
2992 event_loop_factory_.RunFor(chrono::milliseconds(95));
2993
2994 StartLogger(&pi1_logger);
2995
2996 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2997 }
2998
2999 std::vector<std::string> duplicates;
3000 for (const std::string &f : pi1_single_direction_logfiles_) {
3001 duplicates.emplace_back(f);
3002 duplicates.emplace_back(f);
3003 }
3004 EXPECT_DEATH({ SortParts(duplicates); }, "Found duplicate parts in");
3005}
3006
3007// Tests that we explode if someone loses a part out of the middle of a log.
3008TEST_P(MultinodeLoggerTest, MissingPartsFromMiddle) {
3009 time_converter_.AddMonotonic(
3010 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
3011 {
3012 LoggerState pi1_logger = MakeLogger(pi1_);
3013
3014 event_loop_factory_.RunFor(chrono::milliseconds(95));
3015
3016 StartLogger(&pi1_logger);
3017 aos::monotonic_clock::time_point last_rotation_time =
3018 pi1_logger.event_loop->monotonic_now();
Austin Schuh2f864452023-07-17 14:53:08 -07003019 pi1_logger.logger->set_on_logged_period(
3020 [&](aos::monotonic_clock::time_point) {
3021 const auto now = pi1_logger.event_loop->monotonic_now();
3022 if (now > last_rotation_time + std::chrono::seconds(5)) {
3023 pi1_logger.logger->Rotate();
3024 last_rotation_time = now;
3025 }
3026 });
Naman Guptaa63aa132023-03-22 20:06:34 -07003027
3028 event_loop_factory_.RunFor(chrono::milliseconds(10000));
3029 }
3030
3031 std::vector<std::string> missing_parts;
3032
3033 missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part0" + Extension());
3034 missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part2" + Extension());
3035 missing_parts.emplace_back(absl::StrCat(
3036 logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
3037
3038 EXPECT_DEATH({ SortParts(missing_parts); },
3039 "Broken log, missing part files between");
3040}
3041
3042// Tests that we properly handle a dead node. Do this by just disconnecting it
3043// and only using one nodes of logs.
3044TEST_P(MultinodeLoggerTest, DeadNode) {
3045 pi1_->Disconnect(pi2_->node());
3046 pi2_->Disconnect(pi1_->node());
3047 time_converter_.AddMonotonic(
3048 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
3049 {
3050 LoggerState pi1_logger = MakeLogger(pi1_);
3051
3052 event_loop_factory_.RunFor(chrono::milliseconds(95));
3053
3054 StartLogger(&pi1_logger);
3055
3056 event_loop_factory_.RunFor(chrono::milliseconds(10000));
3057 }
3058
3059 // Confirm that we can parse the result. LogReader has enough internal CHECKs
3060 // to confirm the right thing happened.
3061 ConfirmReadable(MakePi1DeadNodeLogfiles());
3062}
3063
3064// Tests that we can relog with a different config. This makes most sense when
3065// you are trying to edit a log and want to use channel renaming + the original
3066// config in the new log.
3067TEST_P(MultinodeLoggerTest, LogDifferentConfig) {
3068 time_converter_.StartEqual();
3069 {
3070 LoggerState pi1_logger = MakeLogger(pi1_);
3071 LoggerState pi2_logger = MakeLogger(pi2_);
3072
3073 event_loop_factory_.RunFor(chrono::milliseconds(95));
3074
3075 StartLogger(&pi1_logger);
3076 StartLogger(&pi2_logger);
3077
3078 event_loop_factory_.RunFor(chrono::milliseconds(20000));
3079 }
3080
3081 LogReader reader(SortParts(logfiles_));
3082 reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
3083
3084 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
3085 log_reader_factory.set_send_delay(chrono::microseconds(0));
3086
3087 // This sends out the fetched messages and advances time to the start of the
3088 // log file.
3089 reader.Register(&log_reader_factory);
3090
3091 const Node *pi1 =
3092 configuration::GetNode(log_reader_factory.configuration(), "pi1");
3093 const Node *pi2 =
3094 configuration::GetNode(log_reader_factory.configuration(), "pi2");
3095
3096 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
3097 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
3098 LOG(INFO) << "now pi1 "
3099 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
3100 LOG(INFO) << "now pi2 "
3101 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
3102
3103 EXPECT_THAT(reader.LoggedNodes(),
3104 ::testing::ElementsAre(
3105 configuration::GetNode(reader.logged_configuration(), pi1),
3106 configuration::GetNode(reader.logged_configuration(), pi2)));
3107
3108 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
3109
3110 // And confirm we can re-create a log again, while checking the contents.
3111 std::vector<std::string> log_files;
3112 {
3113 LoggerState pi1_logger =
3114 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
3115 &log_reader_factory, reader.logged_configuration());
3116 LoggerState pi2_logger =
3117 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
3118 &log_reader_factory, reader.logged_configuration());
3119
3120 pi1_logger.StartLogger(tmp_dir_ + "/relogged1");
3121 pi2_logger.StartLogger(tmp_dir_ + "/relogged2");
3122
3123 log_reader_factory.Run();
3124
3125 for (auto &x : pi1_logger.log_namer->all_filenames()) {
3126 log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged1_", x));
3127 }
3128 for (auto &x : pi2_logger.log_namer->all_filenames()) {
3129 log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged2_", x));
3130 }
3131 }
3132
3133 reader.Deregister();
3134
3135 // And verify that we can run the LogReader over the relogged files without
3136 // hitting any fatal errors.
3137 {
3138 LogReader relogged_reader(SortParts(log_files));
3139 relogged_reader.Register();
3140
3141 relogged_reader.event_loop_factory()->Run();
3142 }
3143}
3144
3145// Tests that we properly replay a log where the start time for a node is before
3146// any data on the node. This can happen if the logger starts before data is
3147// published. While the scenario below is a bit convoluted, we have seen logs
3148// like this generated out in the wild.
3149TEST(MultinodeRebootLoggerTest, StartTimeBeforeData) {
3150 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3151 aos::configuration::ReadConfig(ArtifactPath(
3152 "aos/events/logging/multinode_pingpong_split3_config.json"));
3153 message_bridge::TestingTimeConverter time_converter(
3154 configuration::NodesCount(&config.message()));
3155 SimulatedEventLoopFactory event_loop_factory(&config.message());
3156 event_loop_factory.SetTimeConverter(&time_converter);
3157 NodeEventLoopFactory *const pi1 =
3158 event_loop_factory.GetNodeEventLoopFactory("pi1");
3159 const size_t pi1_index = configuration::GetNodeIndex(
3160 event_loop_factory.configuration(), pi1->node());
3161 NodeEventLoopFactory *const pi2 =
3162 event_loop_factory.GetNodeEventLoopFactory("pi2");
3163 const size_t pi2_index = configuration::GetNodeIndex(
3164 event_loop_factory.configuration(), pi2->node());
3165 NodeEventLoopFactory *const pi3 =
3166 event_loop_factory.GetNodeEventLoopFactory("pi3");
3167 const size_t pi3_index = configuration::GetNodeIndex(
3168 event_loop_factory.configuration(), pi3->node());
3169
3170 const std::string kLogfile1_1 =
3171 aos::testing::TestTmpDir() + "/multi_logfile1/";
3172 const std::string kLogfile2_1 =
3173 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3174 const std::string kLogfile2_2 =
3175 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
3176 const std::string kLogfile3_1 =
3177 aos::testing::TestTmpDir() + "/multi_logfile3/";
3178 util::UnlinkRecursive(kLogfile1_1);
3179 util::UnlinkRecursive(kLogfile2_1);
3180 util::UnlinkRecursive(kLogfile2_2);
3181 util::UnlinkRecursive(kLogfile3_1);
3182 const UUID pi1_boot0 = UUID::Random();
3183 const UUID pi2_boot0 = UUID::Random();
3184 const UUID pi2_boot1 = UUID::Random();
3185 const UUID pi3_boot0 = UUID::Random();
3186 {
3187 CHECK_EQ(pi1_index, 0u);
3188 CHECK_EQ(pi2_index, 1u);
3189 CHECK_EQ(pi3_index, 2u);
3190
3191 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3192 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3193 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3194 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3195
3196 time_converter.AddNextTimestamp(
3197 distributed_clock::epoch(),
3198 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3199 BootTimestamp::epoch()});
3200 const chrono::nanoseconds reboot_time = chrono::milliseconds(20000);
3201 time_converter.AddNextTimestamp(
3202 distributed_clock::epoch() + reboot_time,
3203 {BootTimestamp::epoch() + reboot_time,
3204 BootTimestamp{
3205 .boot = 1,
3206 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)},
3207 BootTimestamp::epoch() + reboot_time});
3208 }
3209
3210 // Make everything perfectly quiet.
3211 event_loop_factory.SkipTimingReport();
3212 event_loop_factory.DisableStatistics();
3213
3214 std::vector<std::string> filenames;
3215 {
3216 LoggerState pi1_logger = MakeLoggerState(
3217 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3218 LoggerState pi3_logger = MakeLoggerState(
3219 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3220 {
3221 // And now start the logger.
3222 LoggerState pi2_logger = MakeLoggerState(
3223 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3224
3225 event_loop_factory.RunFor(chrono::milliseconds(1000));
3226
3227 pi1_logger.StartLogger(kLogfile1_1);
3228 pi3_logger.StartLogger(kLogfile3_1);
3229 pi2_logger.StartLogger(kLogfile2_1);
3230
3231 event_loop_factory.RunFor(chrono::milliseconds(10000));
3232
3233 // Now that we've got a start time in the past, turn on data.
3234 event_loop_factory.EnableStatistics();
3235 std::unique_ptr<aos::EventLoop> ping_event_loop =
3236 pi1->MakeEventLoop("ping");
3237 Ping ping(ping_event_loop.get());
3238
3239 pi2->AlwaysStart<Pong>("pong");
3240
3241 event_loop_factory.RunFor(chrono::milliseconds(3000));
3242
3243 pi2_logger.AppendAllFilenames(&filenames);
3244
3245 // Stop logging on pi2 before rebooting and completely shut off all
3246 // messages on pi2.
3247 pi2->DisableStatistics();
3248 pi1->Disconnect(pi2->node());
3249 pi2->Disconnect(pi1->node());
3250 }
3251 event_loop_factory.RunFor(chrono::milliseconds(7000));
3252 // pi2 now reboots.
3253 {
3254 event_loop_factory.RunFor(chrono::milliseconds(1000));
3255
3256 // Start logging again on pi2 after it is up.
3257 LoggerState pi2_logger = MakeLoggerState(
3258 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3259 pi2_logger.StartLogger(kLogfile2_2);
3260
3261 event_loop_factory.RunFor(chrono::milliseconds(10000));
3262 // And, now that we have a start time in the log, turn data back on.
3263 pi2->EnableStatistics();
3264 pi1->Connect(pi2->node());
3265 pi2->Connect(pi1->node());
3266
3267 pi2->AlwaysStart<Pong>("pong");
3268 std::unique_ptr<aos::EventLoop> ping_event_loop =
3269 pi1->MakeEventLoop("ping");
3270 Ping ping(ping_event_loop.get());
3271
3272 event_loop_factory.RunFor(chrono::milliseconds(3000));
3273
3274 pi2_logger.AppendAllFilenames(&filenames);
3275 }
3276
3277 pi1_logger.AppendAllFilenames(&filenames);
3278 pi3_logger.AppendAllFilenames(&filenames);
3279 }
3280
3281 // Confirm that we can parse the result. LogReader has enough internal CHECKs
3282 // to confirm the right thing happened.
3283 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3284 auto result = ConfirmReadable(filenames);
3285 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
3286 chrono::seconds(1)));
3287 EXPECT_THAT(result[0].second,
3288 ::testing::ElementsAre(realtime_clock::epoch() +
3289 chrono::microseconds(34990350)));
3290
3291 EXPECT_THAT(result[1].first,
3292 ::testing::ElementsAre(
3293 realtime_clock::epoch() + chrono::seconds(1),
3294 realtime_clock::epoch() + chrono::microseconds(3323000)));
3295 EXPECT_THAT(result[1].second,
3296 ::testing::ElementsAre(
3297 realtime_clock::epoch() + chrono::microseconds(13990200),
3298 realtime_clock::epoch() + chrono::microseconds(16313200)));
3299
3300 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch() +
3301 chrono::seconds(1)));
3302 EXPECT_THAT(result[2].second,
3303 ::testing::ElementsAre(realtime_clock::epoch() +
3304 chrono::microseconds(34900150)));
3305}
3306
3307// Tests that local data before remote data after reboot is properly replayed.
3308// We only trigger a reboot in the timestamp interpolation function when solving
3309// the timestamp problem when we actually have a point in the function. This
3310// originally only happened when a point passes the noncausal filter. At the
3311// start of time for the second boot, if we aren't careful, we will have
3312// messages which need to be published at times before the boot. This happens
3313// when a local message is in the log before a forwarded message, so there is no
3314// point in the interpolation function. This delays the reboot. So, we need to
3315// recreate that situation and make sure it doesn't come back.
3316TEST(MultinodeRebootLoggerTest,
3317 LocalMessageBeforeRemoteBeforeStartAfterReboot) {
3318 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3319 aos::configuration::ReadConfig(ArtifactPath(
3320 "aos/events/logging/multinode_pingpong_split3_config.json"));
3321 message_bridge::TestingTimeConverter time_converter(
3322 configuration::NodesCount(&config.message()));
3323 SimulatedEventLoopFactory event_loop_factory(&config.message());
3324 event_loop_factory.SetTimeConverter(&time_converter);
3325 NodeEventLoopFactory *const pi1 =
3326 event_loop_factory.GetNodeEventLoopFactory("pi1");
3327 const size_t pi1_index = configuration::GetNodeIndex(
3328 event_loop_factory.configuration(), pi1->node());
3329 NodeEventLoopFactory *const pi2 =
3330 event_loop_factory.GetNodeEventLoopFactory("pi2");
3331 const size_t pi2_index = configuration::GetNodeIndex(
3332 event_loop_factory.configuration(), pi2->node());
3333 NodeEventLoopFactory *const pi3 =
3334 event_loop_factory.GetNodeEventLoopFactory("pi3");
3335 const size_t pi3_index = configuration::GetNodeIndex(
3336 event_loop_factory.configuration(), pi3->node());
3337
3338 const std::string kLogfile1_1 =
3339 aos::testing::TestTmpDir() + "/multi_logfile1/";
3340 const std::string kLogfile2_1 =
3341 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3342 const std::string kLogfile2_2 =
3343 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
3344 const std::string kLogfile3_1 =
3345 aos::testing::TestTmpDir() + "/multi_logfile3/";
3346 util::UnlinkRecursive(kLogfile1_1);
3347 util::UnlinkRecursive(kLogfile2_1);
3348 util::UnlinkRecursive(kLogfile2_2);
3349 util::UnlinkRecursive(kLogfile3_1);
3350 const UUID pi1_boot0 = UUID::Random();
3351 const UUID pi2_boot0 = UUID::Random();
3352 const UUID pi2_boot1 = UUID::Random();
3353 const UUID pi3_boot0 = UUID::Random();
3354 {
3355 CHECK_EQ(pi1_index, 0u);
3356 CHECK_EQ(pi2_index, 1u);
3357 CHECK_EQ(pi3_index, 2u);
3358
3359 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3360 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3361 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3362 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3363
3364 time_converter.AddNextTimestamp(
3365 distributed_clock::epoch(),
3366 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3367 BootTimestamp::epoch()});
3368 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3369 time_converter.AddNextTimestamp(
3370 distributed_clock::epoch() + reboot_time,
3371 {BootTimestamp::epoch() + reboot_time,
3372 BootTimestamp{.boot = 1,
3373 .time = monotonic_clock::epoch() + reboot_time +
3374 chrono::seconds(100)},
3375 BootTimestamp::epoch() + reboot_time});
3376 }
3377
3378 std::vector<std::string> filenames;
3379 {
3380 LoggerState pi1_logger = MakeLoggerState(
3381 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3382 LoggerState pi3_logger = MakeLoggerState(
3383 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3384 {
3385 // And now start the logger.
3386 LoggerState pi2_logger = MakeLoggerState(
3387 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3388
3389 pi1_logger.StartLogger(kLogfile1_1);
3390 pi3_logger.StartLogger(kLogfile3_1);
3391 pi2_logger.StartLogger(kLogfile2_1);
3392
3393 event_loop_factory.RunFor(chrono::milliseconds(1005));
3394
3395 // Now that we've got a start time in the past, turn on data.
3396 std::unique_ptr<aos::EventLoop> ping_event_loop =
3397 pi1->MakeEventLoop("ping");
3398 Ping ping(ping_event_loop.get());
3399
3400 pi2->AlwaysStart<Pong>("pong");
3401
3402 event_loop_factory.RunFor(chrono::milliseconds(3000));
3403
3404 pi2_logger.AppendAllFilenames(&filenames);
3405
3406 // Disable any remote messages on pi2.
3407 pi1->Disconnect(pi2->node());
3408 pi2->Disconnect(pi1->node());
3409 }
3410 event_loop_factory.RunFor(chrono::milliseconds(995));
3411 // pi2 now reboots at 5 seconds.
3412 {
3413 event_loop_factory.RunFor(chrono::milliseconds(1000));
3414
3415 // Make local stuff happen before we start logging and connect the remote.
3416 pi2->AlwaysStart<Pong>("pong");
3417 std::unique_ptr<aos::EventLoop> ping_event_loop =
3418 pi1->MakeEventLoop("ping");
3419 Ping ping(ping_event_loop.get());
3420 event_loop_factory.RunFor(chrono::milliseconds(1005));
3421
3422 // Start logging again on pi2 after it is up.
3423 LoggerState pi2_logger = MakeLoggerState(
3424 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3425 pi2_logger.StartLogger(kLogfile2_2);
3426
3427 // And allow remote messages now that we have some local ones.
3428 pi1->Connect(pi2->node());
3429 pi2->Connect(pi1->node());
3430
3431 event_loop_factory.RunFor(chrono::milliseconds(1000));
3432
3433 event_loop_factory.RunFor(chrono::milliseconds(3000));
3434
3435 pi2_logger.AppendAllFilenames(&filenames);
3436 }
3437
3438 pi1_logger.AppendAllFilenames(&filenames);
3439 pi3_logger.AppendAllFilenames(&filenames);
3440 }
3441
3442 // Confirm that we can parse the result. LogReader has enough internal CHECKs
3443 // to confirm the right thing happened.
3444 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3445 auto result = ConfirmReadable(filenames);
3446
3447 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3448 EXPECT_THAT(result[0].second,
3449 ::testing::ElementsAre(realtime_clock::epoch() +
3450 chrono::microseconds(11000350)));
3451
3452 EXPECT_THAT(result[1].first,
3453 ::testing::ElementsAre(
3454 realtime_clock::epoch(),
3455 realtime_clock::epoch() + chrono::microseconds(107005000)));
3456 EXPECT_THAT(result[1].second,
3457 ::testing::ElementsAre(
3458 realtime_clock::epoch() + chrono::microseconds(4000150),
3459 realtime_clock::epoch() + chrono::microseconds(111000200)));
3460
3461 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3462 EXPECT_THAT(result[2].second,
3463 ::testing::ElementsAre(realtime_clock::epoch() +
3464 chrono::microseconds(11000150)));
3465
3466 auto start_stop_result = ConfirmReadable(
3467 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3468 realtime_clock::epoch() + chrono::milliseconds(3000));
3469
3470 EXPECT_THAT(
3471 start_stop_result[0].first,
3472 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3473 EXPECT_THAT(
3474 start_stop_result[0].second,
3475 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3476 EXPECT_THAT(
3477 start_stop_result[1].first,
3478 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3479 EXPECT_THAT(
3480 start_stop_result[1].second,
3481 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3482 EXPECT_THAT(
3483 start_stop_result[2].first,
3484 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3485 EXPECT_THAT(
3486 start_stop_result[2].second,
3487 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3488}
3489
3490// Tests that setting the start and stop flags across a reboot works as
3491// expected.
3492TEST(MultinodeRebootLoggerTest, RebootStartStopTimes) {
3493 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3494 aos::configuration::ReadConfig(ArtifactPath(
3495 "aos/events/logging/multinode_pingpong_split3_config.json"));
3496 message_bridge::TestingTimeConverter time_converter(
3497 configuration::NodesCount(&config.message()));
3498 SimulatedEventLoopFactory event_loop_factory(&config.message());
3499 event_loop_factory.SetTimeConverter(&time_converter);
3500 NodeEventLoopFactory *const pi1 =
3501 event_loop_factory.GetNodeEventLoopFactory("pi1");
3502 const size_t pi1_index = configuration::GetNodeIndex(
3503 event_loop_factory.configuration(), pi1->node());
3504 NodeEventLoopFactory *const pi2 =
3505 event_loop_factory.GetNodeEventLoopFactory("pi2");
3506 const size_t pi2_index = configuration::GetNodeIndex(
3507 event_loop_factory.configuration(), pi2->node());
3508 NodeEventLoopFactory *const pi3 =
3509 event_loop_factory.GetNodeEventLoopFactory("pi3");
3510 const size_t pi3_index = configuration::GetNodeIndex(
3511 event_loop_factory.configuration(), pi3->node());
3512
3513 const std::string kLogfile1_1 =
3514 aos::testing::TestTmpDir() + "/multi_logfile1/";
3515 const std::string kLogfile2_1 =
3516 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3517 const std::string kLogfile2_2 =
3518 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
3519 const std::string kLogfile3_1 =
3520 aos::testing::TestTmpDir() + "/multi_logfile3/";
3521 util::UnlinkRecursive(kLogfile1_1);
3522 util::UnlinkRecursive(kLogfile2_1);
3523 util::UnlinkRecursive(kLogfile2_2);
3524 util::UnlinkRecursive(kLogfile3_1);
3525 {
3526 CHECK_EQ(pi1_index, 0u);
3527 CHECK_EQ(pi2_index, 1u);
3528 CHECK_EQ(pi3_index, 2u);
3529
3530 time_converter.AddNextTimestamp(
3531 distributed_clock::epoch(),
3532 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3533 BootTimestamp::epoch()});
3534 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3535 time_converter.AddNextTimestamp(
3536 distributed_clock::epoch() + reboot_time,
3537 {BootTimestamp::epoch() + reboot_time,
3538 BootTimestamp{.boot = 1,
3539 .time = monotonic_clock::epoch() + reboot_time},
3540 BootTimestamp::epoch() + reboot_time});
3541 }
3542
3543 std::vector<std::string> filenames;
3544 {
3545 LoggerState pi1_logger = MakeLoggerState(
3546 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3547 LoggerState pi3_logger = MakeLoggerState(
3548 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3549 {
3550 // And now start the logger.
3551 LoggerState pi2_logger = MakeLoggerState(
3552 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3553
3554 pi1_logger.StartLogger(kLogfile1_1);
3555 pi3_logger.StartLogger(kLogfile3_1);
3556 pi2_logger.StartLogger(kLogfile2_1);
3557
3558 event_loop_factory.RunFor(chrono::milliseconds(1005));
3559
3560 // Now that we've got a start time in the past, turn on data.
3561 std::unique_ptr<aos::EventLoop> ping_event_loop =
3562 pi1->MakeEventLoop("ping");
3563 Ping ping(ping_event_loop.get());
3564
3565 pi2->AlwaysStart<Pong>("pong");
3566
3567 event_loop_factory.RunFor(chrono::milliseconds(3000));
3568
3569 pi2_logger.AppendAllFilenames(&filenames);
3570 }
3571 event_loop_factory.RunFor(chrono::milliseconds(995));
3572 // pi2 now reboots at 5 seconds.
3573 {
3574 event_loop_factory.RunFor(chrono::milliseconds(1000));
3575
3576 // Make local stuff happen before we start logging and connect the remote.
3577 pi2->AlwaysStart<Pong>("pong");
3578 std::unique_ptr<aos::EventLoop> ping_event_loop =
3579 pi1->MakeEventLoop("ping");
3580 Ping ping(ping_event_loop.get());
3581 event_loop_factory.RunFor(chrono::milliseconds(5));
3582
3583 // Start logging again on pi2 after it is up.
3584 LoggerState pi2_logger = MakeLoggerState(
3585 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3586 pi2_logger.StartLogger(kLogfile2_2);
3587
3588 event_loop_factory.RunFor(chrono::milliseconds(5000));
3589
3590 pi2_logger.AppendAllFilenames(&filenames);
3591 }
3592
3593 pi1_logger.AppendAllFilenames(&filenames);
3594 pi3_logger.AppendAllFilenames(&filenames);
3595 }
3596
3597 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3598 auto result = ConfirmReadable(filenames);
3599
3600 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3601 EXPECT_THAT(result[0].second,
3602 ::testing::ElementsAre(realtime_clock::epoch() +
3603 chrono::microseconds(11000350)));
3604
3605 EXPECT_THAT(result[1].first,
3606 ::testing::ElementsAre(
3607 realtime_clock::epoch(),
3608 realtime_clock::epoch() + chrono::microseconds(6005000)));
3609 EXPECT_THAT(result[1].second,
3610 ::testing::ElementsAre(
3611 realtime_clock::epoch() + chrono::microseconds(4900150),
3612 realtime_clock::epoch() + chrono::microseconds(11000200)));
3613
3614 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3615 EXPECT_THAT(result[2].second,
3616 ::testing::ElementsAre(realtime_clock::epoch() +
3617 chrono::microseconds(11000150)));
3618
3619 // Confirm we observed the correct start and stop times. We should see the
3620 // reboot here.
3621 auto start_stop_result = ConfirmReadable(
3622 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3623 realtime_clock::epoch() + chrono::milliseconds(8000));
3624
3625 EXPECT_THAT(
3626 start_stop_result[0].first,
3627 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3628 EXPECT_THAT(
3629 start_stop_result[0].second,
3630 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3631 EXPECT_THAT(start_stop_result[1].first,
3632 ::testing::ElementsAre(
3633 realtime_clock::epoch() + chrono::seconds(2),
3634 realtime_clock::epoch() + chrono::microseconds(6005000)));
3635 EXPECT_THAT(start_stop_result[1].second,
3636 ::testing::ElementsAre(
3637 realtime_clock::epoch() + chrono::microseconds(4900150),
3638 realtime_clock::epoch() + chrono::seconds(8)));
3639 EXPECT_THAT(
3640 start_stop_result[2].first,
3641 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3642 EXPECT_THAT(
3643 start_stop_result[2].second,
3644 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3645}
3646
3647// Tests that we properly handle one direction being down.
3648TEST(MissingDirectionTest, OneDirection) {
3649 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3650 aos::configuration::ReadConfig(ArtifactPath(
3651 "aos/events/logging/multinode_pingpong_split4_config.json"));
3652 message_bridge::TestingTimeConverter time_converter(
3653 configuration::NodesCount(&config.message()));
3654 SimulatedEventLoopFactory event_loop_factory(&config.message());
3655 event_loop_factory.SetTimeConverter(&time_converter);
3656
3657 NodeEventLoopFactory *const pi1 =
3658 event_loop_factory.GetNodeEventLoopFactory("pi1");
3659 const size_t pi1_index = configuration::GetNodeIndex(
3660 event_loop_factory.configuration(), pi1->node());
3661 NodeEventLoopFactory *const pi2 =
3662 event_loop_factory.GetNodeEventLoopFactory("pi2");
3663 const size_t pi2_index = configuration::GetNodeIndex(
3664 event_loop_factory.configuration(), pi2->node());
3665 std::vector<std::string> filenames;
3666
3667 {
3668 CHECK_EQ(pi1_index, 0u);
3669 CHECK_EQ(pi2_index, 1u);
3670
3671 time_converter.AddNextTimestamp(
3672 distributed_clock::epoch(),
3673 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3674
3675 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3676 time_converter.AddNextTimestamp(
3677 distributed_clock::epoch() + reboot_time,
3678 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3679 BootTimestamp::epoch() + reboot_time});
3680 }
3681
3682 const std::string kLogfile2_1 =
3683 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3684 const std::string kLogfile1_1 =
3685 aos::testing::TestTmpDir() + "/multi_logfile1.1/";
3686 util::UnlinkRecursive(kLogfile2_1);
3687 util::UnlinkRecursive(kLogfile1_1);
3688
3689 pi2->Disconnect(pi1->node());
3690
3691 pi1->AlwaysStart<Ping>("ping");
3692 pi2->AlwaysStart<Pong>("pong");
3693
3694 {
3695 LoggerState pi2_logger = MakeLoggerState(
3696 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3697
3698 event_loop_factory.RunFor(chrono::milliseconds(95));
3699
3700 pi2_logger.StartLogger(kLogfile2_1);
3701
3702 event_loop_factory.RunFor(chrono::milliseconds(6000));
3703
3704 pi2->Connect(pi1->node());
3705
3706 LoggerState pi1_logger = MakeLoggerState(
3707 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3708 pi1_logger.StartLogger(kLogfile1_1);
3709
3710 event_loop_factory.RunFor(chrono::milliseconds(5000));
3711 pi1_logger.AppendAllFilenames(&filenames);
3712 pi2_logger.AppendAllFilenames(&filenames);
3713 }
3714
3715 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3716 ConfirmReadable(filenames);
3717}
3718
3719// Tests that we properly handle only one direction ever existing after a
3720// reboot.
3721TEST(MissingDirectionTest, OneDirectionAfterReboot) {
3722 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3723 aos::configuration::ReadConfig(ArtifactPath(
3724 "aos/events/logging/multinode_pingpong_split4_config.json"));
3725 message_bridge::TestingTimeConverter time_converter(
3726 configuration::NodesCount(&config.message()));
3727 SimulatedEventLoopFactory event_loop_factory(&config.message());
3728 event_loop_factory.SetTimeConverter(&time_converter);
3729
3730 NodeEventLoopFactory *const pi1 =
3731 event_loop_factory.GetNodeEventLoopFactory("pi1");
3732 const size_t pi1_index = configuration::GetNodeIndex(
3733 event_loop_factory.configuration(), pi1->node());
3734 NodeEventLoopFactory *const pi2 =
3735 event_loop_factory.GetNodeEventLoopFactory("pi2");
3736 const size_t pi2_index = configuration::GetNodeIndex(
3737 event_loop_factory.configuration(), pi2->node());
3738 std::vector<std::string> filenames;
3739
3740 {
3741 CHECK_EQ(pi1_index, 0u);
3742 CHECK_EQ(pi2_index, 1u);
3743
3744 time_converter.AddNextTimestamp(
3745 distributed_clock::epoch(),
3746 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3747
3748 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3749 time_converter.AddNextTimestamp(
3750 distributed_clock::epoch() + reboot_time,
3751 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3752 BootTimestamp::epoch() + reboot_time});
3753 }
3754
3755 const std::string kLogfile2_1 =
3756 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3757 util::UnlinkRecursive(kLogfile2_1);
3758
3759 pi1->AlwaysStart<Ping>("ping");
3760
3761 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3762 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3763 // second boot.
3764 {
3765 LoggerState pi2_logger = MakeLoggerState(
3766 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3767
3768 event_loop_factory.RunFor(chrono::milliseconds(95));
3769
3770 pi2_logger.StartLogger(kLogfile2_1);
3771
3772 event_loop_factory.RunFor(chrono::milliseconds(4000));
3773
3774 pi2->Disconnect(pi1->node());
3775
3776 event_loop_factory.RunFor(chrono::milliseconds(1000));
3777 pi1->AlwaysStart<Ping>("ping");
3778
3779 event_loop_factory.RunFor(chrono::milliseconds(5000));
3780 pi2_logger.AppendAllFilenames(&filenames);
3781 }
3782
3783 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3784 ConfirmReadable(filenames);
3785}
3786
3787// Tests that we properly handle only one direction ever existing after a reboot
3788// with only reliable data.
3789TEST(MissingDirectionTest, OneDirectionAfterRebootReliable) {
3790 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3791 aos::configuration::ReadConfig(ArtifactPath(
3792 "aos/events/logging/multinode_pingpong_split4_reliable_config.json"));
3793 message_bridge::TestingTimeConverter time_converter(
3794 configuration::NodesCount(&config.message()));
3795 SimulatedEventLoopFactory event_loop_factory(&config.message());
3796 event_loop_factory.SetTimeConverter(&time_converter);
3797
3798 NodeEventLoopFactory *const pi1 =
3799 event_loop_factory.GetNodeEventLoopFactory("pi1");
3800 const size_t pi1_index = configuration::GetNodeIndex(
3801 event_loop_factory.configuration(), pi1->node());
3802 NodeEventLoopFactory *const pi2 =
3803 event_loop_factory.GetNodeEventLoopFactory("pi2");
3804 const size_t pi2_index = configuration::GetNodeIndex(
3805 event_loop_factory.configuration(), pi2->node());
3806 std::vector<std::string> filenames;
3807
3808 {
3809 CHECK_EQ(pi1_index, 0u);
3810 CHECK_EQ(pi2_index, 1u);
3811
3812 time_converter.AddNextTimestamp(
3813 distributed_clock::epoch(),
3814 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3815
3816 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3817 time_converter.AddNextTimestamp(
3818 distributed_clock::epoch() + reboot_time,
3819 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3820 BootTimestamp::epoch() + reboot_time});
3821 }
3822
3823 const std::string kLogfile2_1 =
3824 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3825 util::UnlinkRecursive(kLogfile2_1);
3826
3827 pi1->AlwaysStart<Ping>("ping");
3828
3829 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3830 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3831 // second boot.
3832 {
3833 LoggerState pi2_logger = MakeLoggerState(
3834 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3835
3836 event_loop_factory.RunFor(chrono::milliseconds(95));
3837
3838 pi2_logger.StartLogger(kLogfile2_1);
3839
3840 event_loop_factory.RunFor(chrono::milliseconds(4000));
3841
3842 pi2->Disconnect(pi1->node());
3843
3844 event_loop_factory.RunFor(chrono::milliseconds(1000));
3845 pi1->AlwaysStart<Ping>("ping");
3846
3847 event_loop_factory.RunFor(chrono::milliseconds(5000));
3848 pi2_logger.AppendAllFilenames(&filenames);
3849 }
3850
3851 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3852 ConfirmReadable(filenames);
3853}
3854
Brian Smartte67d7112023-03-20 12:06:30 -07003855// Tests that we properly handle only one direction ever existing after a reboot
3856// with mixed unreliable vs reliable, where reliable has an earlier timestamp
3857// than unreliable.
3858TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase1) {
3859 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3860 aos::configuration::ReadConfig(ArtifactPath(
3861 "aos/events/logging/multinode_pingpong_split4_mixed1_config.json"));
3862 message_bridge::TestingTimeConverter time_converter(
3863 configuration::NodesCount(&config.message()));
3864 SimulatedEventLoopFactory event_loop_factory(&config.message());
3865 event_loop_factory.SetTimeConverter(&time_converter);
3866
3867 NodeEventLoopFactory *const pi1 =
3868 event_loop_factory.GetNodeEventLoopFactory("pi1");
3869 const size_t pi1_index = configuration::GetNodeIndex(
3870 event_loop_factory.configuration(), pi1->node());
3871 NodeEventLoopFactory *const pi2 =
3872 event_loop_factory.GetNodeEventLoopFactory("pi2");
3873 const size_t pi2_index = configuration::GetNodeIndex(
3874 event_loop_factory.configuration(), pi2->node());
3875 std::vector<std::string> filenames;
3876
3877 {
3878 CHECK_EQ(pi1_index, 0u);
3879 CHECK_EQ(pi2_index, 1u);
3880
3881 time_converter.AddNextTimestamp(
3882 distributed_clock::epoch(),
3883 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3884
3885 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3886 time_converter.AddNextTimestamp(
3887 distributed_clock::epoch() + reboot_time,
3888 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3889 BootTimestamp::epoch() + reboot_time});
3890 }
3891
3892 const std::string kLogfile2_1 =
3893 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3894 util::UnlinkRecursive(kLogfile2_1);
3895
3896 // The following sequence using the above reference config creates
3897 // a reliable message timestamp < unreliable message timestamp.
3898 {
3899 pi1->DisableStatistics();
3900 pi2->DisableStatistics();
3901
3902 event_loop_factory.RunFor(chrono::milliseconds(95));
3903
3904 pi1->AlwaysStart<Ping>("ping");
3905
3906 event_loop_factory.RunFor(chrono::milliseconds(5250));
3907
3908 pi1->EnableStatistics();
3909
3910 event_loop_factory.RunFor(chrono::milliseconds(1000));
3911
3912 LoggerState pi2_logger = MakeLoggerState(
3913 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3914
3915 pi2_logger.StartLogger(kLogfile2_1);
3916
3917 event_loop_factory.RunFor(chrono::milliseconds(5000));
3918 pi2_logger.AppendAllFilenames(&filenames);
3919 }
3920
3921 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3922 ConfirmReadable(filenames);
3923}
3924
3925// Tests that we properly handle only one direction ever existing after a reboot
3926// with mixed unreliable vs reliable, where unreliable has an earlier timestamp
3927// than reliable.
3928TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase2) {
3929 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3930 aos::configuration::ReadConfig(ArtifactPath(
3931 "aos/events/logging/multinode_pingpong_split4_mixed2_config.json"));
3932 message_bridge::TestingTimeConverter time_converter(
3933 configuration::NodesCount(&config.message()));
3934 SimulatedEventLoopFactory event_loop_factory(&config.message());
3935 event_loop_factory.SetTimeConverter(&time_converter);
3936
3937 NodeEventLoopFactory *const pi1 =
3938 event_loop_factory.GetNodeEventLoopFactory("pi1");
3939 const size_t pi1_index = configuration::GetNodeIndex(
3940 event_loop_factory.configuration(), pi1->node());
3941 NodeEventLoopFactory *const pi2 =
3942 event_loop_factory.GetNodeEventLoopFactory("pi2");
3943 const size_t pi2_index = configuration::GetNodeIndex(
3944 event_loop_factory.configuration(), pi2->node());
3945 std::vector<std::string> filenames;
3946
3947 {
3948 CHECK_EQ(pi1_index, 0u);
3949 CHECK_EQ(pi2_index, 1u);
3950
3951 time_converter.AddNextTimestamp(
3952 distributed_clock::epoch(),
3953 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3954
3955 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3956 time_converter.AddNextTimestamp(
3957 distributed_clock::epoch() + reboot_time,
3958 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3959 BootTimestamp::epoch() + reboot_time});
3960 }
3961
3962 const std::string kLogfile2_1 =
3963 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3964 util::UnlinkRecursive(kLogfile2_1);
3965
3966 // The following sequence using the above reference config creates
3967 // an unreliable message timestamp < reliable message timestamp.
3968 {
3969 pi1->DisableStatistics();
3970 pi2->DisableStatistics();
3971
3972 event_loop_factory.RunFor(chrono::milliseconds(95));
3973
3974 pi1->AlwaysStart<Ping>("ping");
3975
3976 event_loop_factory.RunFor(chrono::milliseconds(5250));
3977
3978 pi1->EnableStatistics();
3979
3980 event_loop_factory.RunFor(chrono::milliseconds(1000));
3981
3982 LoggerState pi2_logger = MakeLoggerState(
3983 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3984
3985 pi2_logger.StartLogger(kLogfile2_1);
3986
3987 event_loop_factory.RunFor(chrono::milliseconds(5000));
3988 pi2_logger.AppendAllFilenames(&filenames);
3989 }
3990
3991 const std::vector<LogFile> sorted_parts = SortParts(filenames);
3992 ConfirmReadable(filenames);
3993}
3994
Naman Guptaa63aa132023-03-22 20:06:34 -07003995// Tests that we properly handle what used to be a time violation in one
3996// direction. This can occur when one direction goes down after sending some
3997// data, but the other keeps working. The down direction ends up resolving to a
3998// straight line in the noncausal filter, where the direction which is still up
3999// can cross that line. Really, time progressed along just fine but we assumed
4000// that the offset was a line when it could have deviated by up to 1ms/second.
4001TEST_P(MultinodeLoggerTest, OneDirectionTimeDrift) {
4002 std::vector<std::string> filenames;
4003
4004 CHECK_EQ(pi1_index_, 0u);
4005 CHECK_EQ(pi2_index_, 1u);
4006
4007 time_converter_.AddNextTimestamp(
4008 distributed_clock::epoch(),
4009 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4010
4011 const chrono::nanoseconds before_disconnect_duration =
4012 time_converter_.AddMonotonic(
4013 {chrono::milliseconds(1000), chrono::milliseconds(1000)});
4014
4015 const chrono::nanoseconds test_duration =
4016 time_converter_.AddMonotonic(
4017 {chrono::milliseconds(1000), chrono::milliseconds(1000)}) +
4018 time_converter_.AddMonotonic(
4019 {chrono::milliseconds(10000),
4020 chrono::milliseconds(10000) - chrono::milliseconds(5)}) +
4021 time_converter_.AddMonotonic(
4022 {chrono::milliseconds(10000),
4023 chrono::milliseconds(10000) + chrono::milliseconds(5)});
4024
4025 const std::string kLogfile =
4026 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
4027 util::UnlinkRecursive(kLogfile);
4028
4029 {
4030 LoggerState pi2_logger = MakeLogger(pi2_);
4031 pi2_logger.StartLogger(kLogfile);
4032 event_loop_factory_.RunFor(before_disconnect_duration);
4033
4034 pi2_->Disconnect(pi1_->node());
4035
4036 event_loop_factory_.RunFor(test_duration);
4037 pi2_->Connect(pi1_->node());
4038
4039 event_loop_factory_.RunFor(chrono::milliseconds(5000));
4040 pi2_logger.AppendAllFilenames(&filenames);
4041 }
4042
4043 const std::vector<LogFile> sorted_parts = SortParts(filenames);
4044 ConfirmReadable(filenames);
4045}
4046
4047// Tests that we can replay a logfile that has timestamps such that at least one
4048// node's epoch is at a positive distributed_clock (and thus will have to be
4049// booted after the other node(s)).
4050TEST_P(MultinodeLoggerTest, StartOneNodeBeforeOther) {
4051 std::vector<std::string> filenames;
4052
4053 CHECK_EQ(pi1_index_, 0u);
4054 CHECK_EQ(pi2_index_, 1u);
4055
4056 time_converter_.AddNextTimestamp(
4057 distributed_clock::epoch(),
4058 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4059
4060 const chrono::nanoseconds before_reboot_duration = chrono::milliseconds(1000);
4061 time_converter_.RebootAt(
4062 0, distributed_clock::time_point(before_reboot_duration));
4063
4064 const chrono::nanoseconds test_duration = time_converter_.AddMonotonic(
4065 {chrono::milliseconds(10000), chrono::milliseconds(10000)});
4066
4067 const std::string kLogfile =
4068 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
4069 util::UnlinkRecursive(kLogfile);
4070
4071 pi2_->Disconnect(pi1_->node());
4072 pi1_->Disconnect(pi2_->node());
4073
4074 {
4075 LoggerState pi2_logger = MakeLogger(pi2_);
4076
4077 pi2_logger.StartLogger(kLogfile);
4078 event_loop_factory_.RunFor(before_reboot_duration);
4079
4080 pi2_->Connect(pi1_->node());
4081 pi1_->Connect(pi2_->node());
4082
4083 event_loop_factory_.RunFor(test_duration);
4084
4085 pi2_logger.AppendAllFilenames(&filenames);
4086 }
4087
4088 const std::vector<LogFile> sorted_parts = SortParts(filenames);
4089 ConfirmReadable(filenames);
4090
4091 {
4092 LogReader reader(sorted_parts);
4093 SimulatedEventLoopFactory replay_factory(reader.configuration());
4094 reader.RegisterWithoutStarting(&replay_factory);
4095
4096 NodeEventLoopFactory *const replay_node =
4097 reader.event_loop_factory()->GetNodeEventLoopFactory("pi1");
4098
4099 std::unique_ptr<EventLoop> test_event_loop =
4100 replay_node->MakeEventLoop("test_reader");
4101 replay_node->OnStartup([replay_node]() {
4102 // Check that we didn't boot until at least t=0.
4103 CHECK_LE(monotonic_clock::epoch(), replay_node->monotonic_now());
4104 });
4105 test_event_loop->OnRun([&test_event_loop]() {
4106 // Check that we didn't boot until at least t=0.
4107 EXPECT_LE(monotonic_clock::epoch(), test_event_loop->monotonic_now());
4108 });
4109 reader.event_loop_factory()->Run();
4110 reader.Deregister();
4111 }
4112}
4113
4114// Tests that when we have a loop without all the logs at all points in time, we
4115// can sort it properly.
4116TEST(MultinodeLoggerLoopTest, Loop) {
4117 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4118 aos::configuration::ReadConfig(ArtifactPath(
4119 "aos/events/logging/multinode_pingpong_triangle_split_config.json"));
4120 message_bridge::TestingTimeConverter time_converter(
4121 configuration::NodesCount(&config.message()));
4122 SimulatedEventLoopFactory event_loop_factory(&config.message());
4123 event_loop_factory.SetTimeConverter(&time_converter);
4124
4125 NodeEventLoopFactory *const pi1 =
4126 event_loop_factory.GetNodeEventLoopFactory("pi1");
4127 NodeEventLoopFactory *const pi2 =
4128 event_loop_factory.GetNodeEventLoopFactory("pi2");
4129 NodeEventLoopFactory *const pi3 =
4130 event_loop_factory.GetNodeEventLoopFactory("pi3");
4131
4132 const std::string kLogfile1_1 =
4133 aos::testing::TestTmpDir() + "/multi_logfile1/";
4134 const std::string kLogfile2_1 =
4135 aos::testing::TestTmpDir() + "/multi_logfile2/";
4136 const std::string kLogfile3_1 =
4137 aos::testing::TestTmpDir() + "/multi_logfile3/";
4138 util::UnlinkRecursive(kLogfile1_1);
4139 util::UnlinkRecursive(kLogfile2_1);
4140 util::UnlinkRecursive(kLogfile3_1);
4141
4142 {
4143 // Make pi1 boot before everything else.
4144 time_converter.AddNextTimestamp(
4145 distributed_clock::epoch(),
4146 {BootTimestamp::epoch(),
4147 BootTimestamp::epoch() - chrono::milliseconds(100),
4148 BootTimestamp::epoch() - chrono::milliseconds(300)});
4149 }
4150
4151 // We want to setup a situation such that 2 of the 3 legs of the loop are very
4152 // confident about time being X, and the third leg is pulling the average off
4153 // to one side.
4154 //
4155 // It's easiest to visualize this in timestamp_plotter.
4156
4157 std::vector<std::string> filenames;
4158 {
4159 // Have pi1 send out a reliable message at startup. This sets up a long
4160 // forwarding time message at the start to bias time.
4161 std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
4162 {
4163 aos::Sender<examples::Ping> ping_sender =
4164 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
4165
4166 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
4167 examples::Ping::Builder ping_builder =
4168 builder.MakeBuilder<examples::Ping>();
4169 CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
4170 }
4171
4172 // Wait a while so there's enough data to let the worst case be rather off.
4173 event_loop_factory.RunFor(chrono::seconds(1000));
4174
4175 // Now start a receiving node first. This sets up 2 tight bounds between 2
4176 // of the nodes.
4177 LoggerState pi2_logger = MakeLoggerState(
4178 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4179 pi2_logger.StartLogger(kLogfile2_1);
4180
4181 event_loop_factory.RunFor(chrono::seconds(100));
4182
4183 // And now start the third leg.
4184 LoggerState pi3_logger = MakeLoggerState(
4185 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4186 pi3_logger.StartLogger(kLogfile3_1);
4187
4188 LoggerState pi1_logger = MakeLoggerState(
4189 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4190 pi1_logger.StartLogger(kLogfile1_1);
4191
4192 event_loop_factory.RunFor(chrono::seconds(100));
4193
4194 pi1_logger.AppendAllFilenames(&filenames);
4195 pi2_logger.AppendAllFilenames(&filenames);
4196 pi3_logger.AppendAllFilenames(&filenames);
4197 }
4198
4199 // Make sure we can read this.
4200 const std::vector<LogFile> sorted_parts = SortParts(filenames);
4201 auto result = ConfirmReadable(filenames);
4202}
4203
Austin Schuh08dba8f2023-05-01 08:29:30 -07004204// Tests that RestartLogging works in the simple case. Unfortunately, the
4205// failure cases involve simulating time elapsing in callbacks, which is really
4206// hard. The best we can reasonably do is make sure 2 back to back logs are
4207// parseable together.
4208TEST_P(MultinodeLoggerTest, RestartLogging) {
4209 time_converter_.AddMonotonic(
4210 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
4211 std::vector<std::string> filenames;
4212 {
4213 LoggerState pi1_logger = MakeLogger(pi1_);
4214
4215 event_loop_factory_.RunFor(chrono::milliseconds(95));
4216
4217 StartLogger(&pi1_logger, logfile_base1_);
4218 aos::monotonic_clock::time_point last_rotation_time =
4219 pi1_logger.event_loop->monotonic_now();
Austin Schuh2f864452023-07-17 14:53:08 -07004220 pi1_logger.logger->set_on_logged_period(
4221 [&](aos::monotonic_clock::time_point) {
4222 const auto now = pi1_logger.event_loop->monotonic_now();
4223 if (now > last_rotation_time + std::chrono::seconds(5)) {
4224 pi1_logger.AppendAllFilenames(&filenames);
4225 std::unique_ptr<MultiNodeFilesLogNamer> namer =
4226 pi1_logger.MakeLogNamer(logfile_base2_);
4227 pi1_logger.log_namer = namer.get();
Austin Schuh08dba8f2023-05-01 08:29:30 -07004228
Austin Schuh2f864452023-07-17 14:53:08 -07004229 pi1_logger.logger->RestartLogging(std::move(namer));
4230 last_rotation_time = now;
4231 }
4232 });
Austin Schuh08dba8f2023-05-01 08:29:30 -07004233
4234 event_loop_factory_.RunFor(chrono::milliseconds(7000));
4235
4236 pi1_logger.AppendAllFilenames(&filenames);
4237 }
4238
4239 for (const auto &x : filenames) {
4240 LOG(INFO) << x;
4241 }
4242
4243 EXPECT_GE(filenames.size(), 2u);
4244
4245 ConfirmReadable(filenames);
4246
4247 // TODO(austin): It would be good to confirm that any one time messages end up
4248 // in both logs correctly.
4249}
4250
Naman Guptaa63aa132023-03-22 20:06:34 -07004251} // namespace testing
4252} // namespace logger
4253} // namespace aos