blob: 01f4d2f66aeb29c26addf48fe5e68823b3a160e4 [file] [log] [blame]
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <memory>
#include <set>
#include <string>
#include <string_view>
#include <tuple>
#include <vector>

#include "gmock/gmock.h"
#include "gtest/gtest.h"

#include "aos/events/logging/log_reader.h"
#include "aos/events/logging/multinode_logger_test_lib.h"
#include "aos/events/message_counter.h"
#include "aos/events/ping_lib.h"
#include "aos/events/pong_lib.h"
#include "aos/network/remote_message_generated.h"
#include "aos/network/timestamp_generated.h"
#include "aos/testing/tmpdir.h"

15namespace aos {
16namespace logger {
17namespace testing {
18
19namespace chrono = std::chrono;
20using aos::message_bridge::RemoteMessage;
21using aos::testing::ArtifactPath;
22using aos::testing::MessageCounter;
23
Naman Guptaa63aa132023-03-22 20:06:34 -070024INSTANTIATE_TEST_SUITE_P(
25 All, MultinodeLoggerTest,
26 ::testing::Combine(
27 ::testing::Values(
28 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080029 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070030 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080031 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070032 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
33
34INSTANTIATE_TEST_SUITE_P(
35 All, MultinodeLoggerDeathTest,
36 ::testing::Combine(
37 ::testing::Values(
38 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080039 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070040 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080041 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070042 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
43
44// Tests that we can write and read simple multi-node log files.
45TEST_P(MultinodeLoggerTest, SimpleMultiNode) {
46 std::vector<std::string> actual_filenames;
47 time_converter_.StartEqual();
48
49 {
50 LoggerState pi1_logger = MakeLogger(pi1_);
51 LoggerState pi2_logger = MakeLogger(pi2_);
52
53 event_loop_factory_.RunFor(chrono::milliseconds(95));
54
55 StartLogger(&pi1_logger);
56 StartLogger(&pi2_logger);
57
58 event_loop_factory_.RunFor(chrono::milliseconds(20000));
59 pi1_logger.AppendAllFilenames(&actual_filenames);
60 pi2_logger.AppendAllFilenames(&actual_filenames);
61 }
62
63 ASSERT_THAT(actual_filenames,
64 ::testing::UnorderedElementsAreArray(logfiles_));
65
66 {
67 std::set<std::string> logfile_uuids;
68 std::set<std::string> parts_uuids;
69 // Confirm that we have the expected number of UUIDs for both the logfile
70 // UUIDs and parts UUIDs.
71 std::vector<SizePrefixedFlatbufferVector<LogFileHeader>> log_header;
72 for (std::string_view f : logfiles_) {
73 log_header.emplace_back(ReadHeader(f).value());
74 if (!log_header.back().message().has_configuration()) {
75 logfile_uuids.insert(
76 log_header.back().message().log_event_uuid()->str());
77 parts_uuids.insert(log_header.back().message().parts_uuid()->str());
78 }
79 }
80
81 EXPECT_EQ(logfile_uuids.size(), 2u);
82 if (shared()) {
83 EXPECT_EQ(parts_uuids.size(), 7u);
84 } else {
85 EXPECT_EQ(parts_uuids.size(), 8u);
86 }
87
88 // And confirm everything is on the correct node.
89 EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi1");
90 EXPECT_EQ(log_header[3].message().node()->name()->string_view(), "pi1");
91 EXPECT_EQ(log_header[4].message().node()->name()->string_view(), "pi1");
92
93 EXPECT_EQ(log_header[5].message().node()->name()->string_view(), "pi2");
94 EXPECT_EQ(log_header[6].message().node()->name()->string_view(), "pi2");
95
96 EXPECT_EQ(log_header[7].message().node()->name()->string_view(), "pi2");
97 EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi2");
98 EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi2");
99
100 EXPECT_EQ(log_header[10].message().node()->name()->string_view(), "pi1");
101 EXPECT_EQ(log_header[11].message().node()->name()->string_view(), "pi1");
102
103 EXPECT_EQ(log_header[12].message().node()->name()->string_view(), "pi2");
104 EXPECT_EQ(log_header[13].message().node()->name()->string_view(), "pi2");
105
106 if (shared()) {
107 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
108 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi2");
109 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi2");
110
111 EXPECT_EQ(log_header[17].message().node()->name()->string_view(), "pi1");
112 EXPECT_EQ(log_header[18].message().node()->name()->string_view(), "pi1");
113 } else {
114 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
115 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi2");
116
117 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi1");
118 EXPECT_EQ(log_header[17].message().node()->name()->string_view(), "pi1");
119
120 EXPECT_EQ(log_header[18].message().node()->name()->string_view(), "pi2");
121 EXPECT_EQ(log_header[19].message().node()->name()->string_view(), "pi2");
122 }
123
124 // And the parts index matches.
125 EXPECT_EQ(log_header[2].message().parts_index(), 0);
126 EXPECT_EQ(log_header[3].message().parts_index(), 1);
127 EXPECT_EQ(log_header[4].message().parts_index(), 2);
128
129 EXPECT_EQ(log_header[5].message().parts_index(), 0);
130 EXPECT_EQ(log_header[6].message().parts_index(), 1);
131
132 EXPECT_EQ(log_header[7].message().parts_index(), 0);
133 EXPECT_EQ(log_header[8].message().parts_index(), 1);
134 EXPECT_EQ(log_header[9].message().parts_index(), 2);
135
136 EXPECT_EQ(log_header[10].message().parts_index(), 0);
137 EXPECT_EQ(log_header[11].message().parts_index(), 1);
138
139 EXPECT_EQ(log_header[12].message().parts_index(), 0);
140 EXPECT_EQ(log_header[13].message().parts_index(), 1);
141
142 if (shared()) {
143 EXPECT_EQ(log_header[14].message().parts_index(), 0);
144 EXPECT_EQ(log_header[15].message().parts_index(), 1);
145 EXPECT_EQ(log_header[16].message().parts_index(), 2);
146
147 EXPECT_EQ(log_header[17].message().parts_index(), 0);
148 EXPECT_EQ(log_header[18].message().parts_index(), 1);
149 } else {
150 EXPECT_EQ(log_header[14].message().parts_index(), 0);
151 EXPECT_EQ(log_header[15].message().parts_index(), 1);
152
153 EXPECT_EQ(log_header[16].message().parts_index(), 0);
154 EXPECT_EQ(log_header[17].message().parts_index(), 1);
155
156 EXPECT_EQ(log_header[18].message().parts_index(), 0);
157 EXPECT_EQ(log_header[19].message().parts_index(), 1);
158 }
159 }
160
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700161 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
162 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -0700163 {
164 using ::testing::UnorderedElementsAre;
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700165 std::shared_ptr<const aos::Configuration> config = sorted_parts[0].config;
Naman Guptaa63aa132023-03-22 20:06:34 -0700166
167 // Timing reports, pings
168 EXPECT_THAT(CountChannelsData(config, logfiles_[2]),
169 UnorderedElementsAre(
170 std::make_tuple("/pi1/aos",
171 "aos.message_bridge.ServerStatistics", 1),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700172 std::make_tuple("/test", "aos.examples.Ping", 1),
173 std::make_tuple("/pi1/aos", "aos.examples.Ping", 1)))
Naman Guptaa63aa132023-03-22 20:06:34 -0700174 << " : " << logfiles_[2];
175 {
176 std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700177 std::make_tuple("/pi1/aos", "aos.examples.Ping", 10),
Naman Guptaa63aa132023-03-22 20:06:34 -0700178 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 1),
179 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
180 1)};
181 if (!std::get<0>(GetParam()).shared) {
182 channel_counts.push_back(
183 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
184 "aos-message_bridge-Timestamp",
185 "aos.message_bridge.RemoteMessage", 1));
186 }
187 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
188 ::testing::UnorderedElementsAreArray(channel_counts))
189 << " : " << logfiles_[3];
190 }
191 {
192 std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700193 std::make_tuple("/pi1/aos", "aos.examples.Ping", 1990),
Naman Guptaa63aa132023-03-22 20:06:34 -0700194 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 199),
195 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
196 20),
197 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
198 199),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700199 std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
Naman Guptaa63aa132023-03-22 20:06:34 -0700200 std::make_tuple("/test", "aos.examples.Ping", 2000)};
201 if (!std::get<0>(GetParam()).shared) {
202 channel_counts.push_back(
203 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
204 "aos-message_bridge-Timestamp",
205 "aos.message_bridge.RemoteMessage", 199));
206 }
207 EXPECT_THAT(CountChannelsData(config, logfiles_[4]),
208 ::testing::UnorderedElementsAreArray(channel_counts))
209 << " : " << logfiles_[4];
210 }
211 // Timestamps for pong
212 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
213 UnorderedElementsAre())
214 << " : " << logfiles_[2];
215 EXPECT_THAT(
216 CountChannelsTimestamp(config, logfiles_[3]),
217 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
218 << " : " << logfiles_[3];
219 EXPECT_THAT(
220 CountChannelsTimestamp(config, logfiles_[4]),
221 UnorderedElementsAre(
222 std::make_tuple("/test", "aos.examples.Pong", 2000),
223 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)))
224 << " : " << logfiles_[4];
225
226 // Pong data.
227 EXPECT_THAT(
228 CountChannelsData(config, logfiles_[5]),
229 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 91)))
230 << " : " << logfiles_[5];
231 EXPECT_THAT(CountChannelsData(config, logfiles_[6]),
232 UnorderedElementsAre(
233 std::make_tuple("/test", "aos.examples.Pong", 1910)))
234 << " : " << logfiles_[6];
235
236 // No timestamps
237 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
238 UnorderedElementsAre())
239 << " : " << logfiles_[5];
240 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[6]),
241 UnorderedElementsAre())
242 << " : " << logfiles_[6];
243
244 // Timing reports and pongs.
245 EXPECT_THAT(CountChannelsData(config, logfiles_[7]),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700246 UnorderedElementsAre(
247 std::make_tuple("/pi2/aos", "aos.examples.Ping", 1),
248 std::make_tuple("/pi2/aos",
249 "aos.message_bridge.ServerStatistics", 1)))
Naman Guptaa63aa132023-03-22 20:06:34 -0700250 << " : " << logfiles_[7];
251 EXPECT_THAT(
252 CountChannelsData(config, logfiles_[8]),
253 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
254 << " : " << logfiles_[8];
255 EXPECT_THAT(
256 CountChannelsData(config, logfiles_[9]),
257 UnorderedElementsAre(
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700258 std::make_tuple("/pi2/aos", "aos.examples.Ping", 2000),
Naman Guptaa63aa132023-03-22 20:06:34 -0700259 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
260 std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
261 20),
262 std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
263 200),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700264 std::make_tuple("/pi2/aos", "aos.timing.Report", 60),
Naman Guptaa63aa132023-03-22 20:06:34 -0700265 std::make_tuple("/test", "aos.examples.Pong", 2000)))
266 << " : " << logfiles_[9];
267 // And ping timestamps.
268 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[7]),
269 UnorderedElementsAre())
270 << " : " << logfiles_[7];
271 EXPECT_THAT(
272 CountChannelsTimestamp(config, logfiles_[8]),
273 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
274 << " : " << logfiles_[8];
275 EXPECT_THAT(
276 CountChannelsTimestamp(config, logfiles_[9]),
277 UnorderedElementsAre(
278 std::make_tuple("/test", "aos.examples.Ping", 2000),
279 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)))
280 << " : " << logfiles_[9];
281
282 // And then test that the remotely logged timestamp data files only have
283 // timestamps in them.
284 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[10]),
285 UnorderedElementsAre())
286 << " : " << logfiles_[10];
287 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[11]),
288 UnorderedElementsAre())
289 << " : " << logfiles_[11];
290 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[12]),
291 UnorderedElementsAre())
292 << " : " << logfiles_[12];
293 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[13]),
294 UnorderedElementsAre())
295 << " : " << logfiles_[13];
296
297 EXPECT_THAT(CountChannelsData(config, logfiles_[10]),
298 UnorderedElementsAre(std::make_tuple(
299 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
300 << " : " << logfiles_[10];
301 EXPECT_THAT(CountChannelsData(config, logfiles_[11]),
302 UnorderedElementsAre(std::make_tuple(
303 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
304 << " : " << logfiles_[11];
305
306 EXPECT_THAT(CountChannelsData(config, logfiles_[12]),
307 UnorderedElementsAre(std::make_tuple(
308 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
309 << " : " << logfiles_[12];
310 EXPECT_THAT(CountChannelsData(config, logfiles_[13]),
311 UnorderedElementsAre(std::make_tuple(
312 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
313 << " : " << logfiles_[13];
314
315 // Timestamps from pi2 on pi1, and the other way.
316 if (shared()) {
317 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
318 UnorderedElementsAre())
319 << " : " << logfiles_[14];
320 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
321 UnorderedElementsAre())
322 << " : " << logfiles_[15];
323 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
324 UnorderedElementsAre())
325 << " : " << logfiles_[16];
326 EXPECT_THAT(CountChannelsData(config, logfiles_[17]),
327 UnorderedElementsAre())
328 << " : " << logfiles_[17];
329 EXPECT_THAT(CountChannelsData(config, logfiles_[18]),
330 UnorderedElementsAre())
331 << " : " << logfiles_[18];
332
333 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[14]),
334 UnorderedElementsAre(
335 std::make_tuple("/test", "aos.examples.Ping", 1)))
336 << " : " << logfiles_[14];
337 EXPECT_THAT(
338 CountChannelsTimestamp(config, logfiles_[15]),
339 UnorderedElementsAre(
340 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 9),
341 std::make_tuple("/test", "aos.examples.Ping", 90)))
342 << " : " << logfiles_[15];
343 EXPECT_THAT(
344 CountChannelsTimestamp(config, logfiles_[16]),
345 UnorderedElementsAre(
346 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 191),
347 std::make_tuple("/test", "aos.examples.Ping", 1910)))
348 << " : " << logfiles_[16];
349 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[17]),
350 UnorderedElementsAre(std::make_tuple(
351 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
352 << " : " << logfiles_[17];
353 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[18]),
354 UnorderedElementsAre(std::make_tuple(
355 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
356 << " : " << logfiles_[18];
357 } else {
358 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
359 UnorderedElementsAre())
360 << " : " << logfiles_[14];
361 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
362 UnorderedElementsAre())
363 << " : " << logfiles_[15];
364 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
365 UnorderedElementsAre())
366 << " : " << logfiles_[16];
367 EXPECT_THAT(CountChannelsData(config, logfiles_[17]),
368 UnorderedElementsAre())
369 << " : " << logfiles_[17];
370 EXPECT_THAT(CountChannelsData(config, logfiles_[18]),
371 UnorderedElementsAre())
372 << " : " << logfiles_[18];
373 EXPECT_THAT(CountChannelsData(config, logfiles_[19]),
374 UnorderedElementsAre())
375 << " : " << logfiles_[19];
376
377 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[14]),
378 UnorderedElementsAre(std::make_tuple(
379 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
380 << " : " << logfiles_[14];
381 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[15]),
382 UnorderedElementsAre(std::make_tuple(
383 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
384 << " : " << logfiles_[15];
385 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[16]),
386 UnorderedElementsAre(std::make_tuple(
387 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
388 << " : " << logfiles_[16];
389 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[17]),
390 UnorderedElementsAre(std::make_tuple(
391 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
392 << " : " << logfiles_[17];
393 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[18]),
394 UnorderedElementsAre(
395 std::make_tuple("/test", "aos.examples.Ping", 91)))
396 << " : " << logfiles_[18];
397 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[19]),
398 UnorderedElementsAre(
399 std::make_tuple("/test", "aos.examples.Ping", 1910)))
400 << " : " << logfiles_[19];
401 }
402 }
403
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700404 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -0700405
406 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
407 log_reader_factory.set_send_delay(chrono::microseconds(0));
408
409 // This sends out the fetched messages and advances time to the start of the
410 // log file.
411 reader.Register(&log_reader_factory);
412
413 const Node *pi1 =
414 configuration::GetNode(log_reader_factory.configuration(), "pi1");
415 const Node *pi2 =
416 configuration::GetNode(log_reader_factory.configuration(), "pi2");
417
418 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
419 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
420 LOG(INFO) << "now pi1 "
421 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
422 LOG(INFO) << "now pi2 "
423 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
424
425 EXPECT_THAT(reader.LoggedNodes(),
426 ::testing::ElementsAre(
427 configuration::GetNode(reader.logged_configuration(), pi1),
428 configuration::GetNode(reader.logged_configuration(), pi2)));
429
430 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
431
432 std::unique_ptr<EventLoop> pi1_event_loop =
433 log_reader_factory.MakeEventLoop("test", pi1);
434 std::unique_ptr<EventLoop> pi2_event_loop =
435 log_reader_factory.MakeEventLoop("test", pi2);
436
437 int pi1_ping_count = 10;
438 int pi2_ping_count = 10;
439 int pi1_pong_count = 10;
440 int pi2_pong_count = 10;
441
442 // Confirm that the ping value matches.
443 pi1_event_loop->MakeWatcher(
444 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
445 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping) << " at "
446 << pi1_event_loop->context().monotonic_remote_time << " -> "
447 << pi1_event_loop->context().monotonic_event_time;
448 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
449 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
450 pi1_ping_count * chrono::milliseconds(10) +
451 monotonic_clock::epoch());
452 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
453 pi1_ping_count * chrono::milliseconds(10) +
454 realtime_clock::epoch());
455 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
456 pi1_event_loop->context().monotonic_event_time);
457 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
458 pi1_event_loop->context().realtime_event_time);
459
460 ++pi1_ping_count;
461 });
462 pi2_event_loop->MakeWatcher(
463 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
464 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping) << " at "
465 << pi2_event_loop->context().monotonic_remote_time << " -> "
466 << pi2_event_loop->context().monotonic_event_time;
467 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
468
469 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
470 pi2_ping_count * chrono::milliseconds(10) +
471 monotonic_clock::epoch());
472 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
473 pi2_ping_count * chrono::milliseconds(10) +
474 realtime_clock::epoch());
475 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time +
476 chrono::microseconds(150),
477 pi2_event_loop->context().monotonic_event_time);
478 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time +
479 chrono::microseconds(150),
480 pi2_event_loop->context().realtime_event_time);
481 ++pi2_ping_count;
482 });
483
484 constexpr ssize_t kQueueIndexOffset = -9;
485 // Confirm that the ping and pong counts both match, and the value also
486 // matches.
487 pi1_event_loop->MakeWatcher(
488 "/test", [&pi1_event_loop, &pi1_ping_count,
489 &pi1_pong_count](const examples::Pong &pong) {
490 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
491 << pi1_event_loop->context().monotonic_remote_time << " -> "
492 << pi1_event_loop->context().monotonic_event_time;
493
494 EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
495 pi1_pong_count + kQueueIndexOffset);
496 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
497 chrono::microseconds(200) +
498 pi1_pong_count * chrono::milliseconds(10) +
499 monotonic_clock::epoch());
500 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
501 chrono::microseconds(200) +
502 pi1_pong_count * chrono::milliseconds(10) +
503 realtime_clock::epoch());
504
505 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
506 chrono::microseconds(150),
507 pi1_event_loop->context().monotonic_event_time);
508 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
509 chrono::microseconds(150),
510 pi1_event_loop->context().realtime_event_time);
511
512 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
513 ++pi1_pong_count;
514 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
515 });
516 pi2_event_loop->MakeWatcher(
517 "/test", [&pi2_event_loop, &pi2_ping_count,
518 &pi2_pong_count](const examples::Pong &pong) {
519 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
520 << pi2_event_loop->context().monotonic_remote_time << " -> "
521 << pi2_event_loop->context().monotonic_event_time;
522
523 EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
524 pi2_pong_count + kQueueIndexOffset);
525
526 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
527 chrono::microseconds(200) +
528 pi2_pong_count * chrono::milliseconds(10) +
529 monotonic_clock::epoch());
530 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
531 chrono::microseconds(200) +
532 pi2_pong_count * chrono::milliseconds(10) +
533 realtime_clock::epoch());
534
535 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
536 pi2_event_loop->context().monotonic_event_time);
537 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
538 pi2_event_loop->context().realtime_event_time);
539
540 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
541 ++pi2_pong_count;
542 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
543 });
544
545 log_reader_factory.Run();
546 EXPECT_EQ(pi1_ping_count, 2010);
547 EXPECT_EQ(pi2_ping_count, 2010);
548 EXPECT_EQ(pi1_pong_count, 2010);
549 EXPECT_EQ(pi2_pong_count, 2010);
550
551 reader.Deregister();
552}
553
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600554// MultinodeLoggerTest that tests the mutate callback works across multiple
555// nodes with remapping
556TEST_P(MultinodeLoggerTest, MultiNodeRemapMutateCallback) {
557 time_converter_.StartEqual();
558 std::vector<std::string> actual_filenames;
559
560 {
561 LoggerState pi1_logger = MakeLogger(pi1_);
562 LoggerState pi2_logger = MakeLogger(pi2_);
563
564 event_loop_factory_.RunFor(chrono::milliseconds(95));
565
566 StartLogger(&pi1_logger);
567 StartLogger(&pi2_logger);
568
569 event_loop_factory_.RunFor(chrono::milliseconds(20000));
570 pi1_logger.AppendAllFilenames(&actual_filenames);
571 pi2_logger.AppendAllFilenames(&actual_filenames);
572 }
573
574 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700575 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600576
577 LogReader reader(sorted_parts, &config_.message());
578 // Remap just on pi1.
579 reader.RemapLoggedChannel<examples::Pong>(
580 "/test", configuration::GetNode(reader.configuration(), "pi1"));
581
582 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
583
584 int pong_count = 0;
585 // Adds a callback which mutates the value of the pong message before the
586 // message is sent which is the feature we are testing here
587 reader.AddBeforeSendCallback("/test",
588 [&pong_count](aos::examples::Pong *pong) {
589 pong->mutate_value(pong->value() + 1);
590 pong_count = pong->value();
591 });
592
593 // This sends out the fetched messages and advances time to the start of the
594 // log file.
595 reader.Register(&log_reader_factory);
596
597 const Node *pi1 =
598 configuration::GetNode(log_reader_factory.configuration(), "pi1");
599 const Node *pi2 =
600 configuration::GetNode(log_reader_factory.configuration(), "pi2");
601
602 EXPECT_THAT(reader.LoggedNodes(),
603 ::testing::ElementsAre(
604 configuration::GetNode(reader.logged_configuration(), pi1),
605 configuration::GetNode(reader.logged_configuration(), pi2)));
606
607 std::unique_ptr<EventLoop> pi1_event_loop =
608 log_reader_factory.MakeEventLoop("test", pi1);
609 std::unique_ptr<EventLoop> pi2_event_loop =
610 log_reader_factory.MakeEventLoop("test", pi2);
611
612 pi1_event_loop->MakeWatcher("/original/test",
613 [&pong_count](const examples::Pong &pong) {
614 EXPECT_EQ(pong_count, pong.value());
615 });
616
617 pi2_event_loop->MakeWatcher("/test",
618 [&pong_count](const examples::Pong &pong) {
619 EXPECT_EQ(pong_count, pong.value());
620 });
621
622 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
623 reader.Deregister();
624
625 EXPECT_EQ(pong_count, 2011);
626}
627
628// MultinodeLoggerTest that tests the mutate callback works across multiple
629// nodes
630TEST_P(MultinodeLoggerTest, MultiNodeMutateCallback) {
631 time_converter_.StartEqual();
632 std::vector<std::string> actual_filenames;
633
634 {
635 LoggerState pi1_logger = MakeLogger(pi1_);
636 LoggerState pi2_logger = MakeLogger(pi2_);
637
638 event_loop_factory_.RunFor(chrono::milliseconds(95));
639
640 StartLogger(&pi1_logger);
641 StartLogger(&pi2_logger);
642
643 event_loop_factory_.RunFor(chrono::milliseconds(20000));
644 pi1_logger.AppendAllFilenames(&actual_filenames);
645 pi2_logger.AppendAllFilenames(&actual_filenames);
646 }
647
648 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700649 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600650
651 LogReader reader(sorted_parts, &config_.message());
652
653 int pong_count = 0;
654 // Adds a callback which mutates the value of the pong message before the
655 // message is sent which is the feature we are testing here
656 reader.AddBeforeSendCallback("/test",
657 [&pong_count](aos::examples::Pong *pong) {
658 pong->mutate_value(pong->value() + 1);
659 pong_count = pong->value();
660 });
661
662 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
663
664 // This sends out the fetched messages and advances time to the start of the
665 // log file.
666 reader.Register(&log_reader_factory);
667
668 const Node *pi1 =
669 configuration::GetNode(log_reader_factory.configuration(), "pi1");
670 const Node *pi2 =
671 configuration::GetNode(log_reader_factory.configuration(), "pi2");
672
673 EXPECT_THAT(reader.LoggedNodes(),
674 ::testing::ElementsAre(
675 configuration::GetNode(reader.logged_configuration(), pi1),
676 configuration::GetNode(reader.logged_configuration(), pi2)));
677
678 std::unique_ptr<EventLoop> pi1_event_loop =
679 log_reader_factory.MakeEventLoop("test", pi1);
680 std::unique_ptr<EventLoop> pi2_event_loop =
681 log_reader_factory.MakeEventLoop("test", pi2);
682
683 pi1_event_loop->MakeWatcher("/test",
684 [&pong_count](const examples::Pong &pong) {
685 EXPECT_EQ(pong_count, pong.value());
686 });
687
688 pi2_event_loop->MakeWatcher("/test",
689 [&pong_count](const examples::Pong &pong) {
690 EXPECT_EQ(pong_count, pong.value());
691 });
692
693 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
694 reader.Deregister();
695
696 EXPECT_EQ(pong_count, 2011);
697}
698
699// Tests that the before send callback is only called from the sender node if it
700// is forwarded
701TEST_P(MultinodeLoggerTest, OnlyDoBeforeSendCallbackOnSenderNode) {
702 time_converter_.StartEqual();
703 {
704 LoggerState pi1_logger = MakeLogger(pi1_);
705 LoggerState pi2_logger = MakeLogger(pi2_);
706
707 event_loop_factory_.RunFor(chrono::milliseconds(95));
708
709 StartLogger(&pi1_logger);
710 StartLogger(&pi2_logger);
711
712 event_loop_factory_.RunFor(chrono::milliseconds(20000));
713 }
714
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700715 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
716 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
717 LogReader reader(sorted_parts);
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600718
719 int ping_count = 0;
720 // Adds a callback which mutates the value of the pong message before the
721 // message is sent which is the feature we are testing here
722 reader.AddBeforeSendCallback("/test",
723 [&ping_count](aos::examples::Ping *ping) {
724 ++ping_count;
725 ping->mutate_value(ping_count);
726 });
727
728 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
729 log_reader_factory.set_send_delay(chrono::microseconds(0));
730
731 reader.Register(&log_reader_factory);
732
733 const Node *pi1 =
734 configuration::GetNode(log_reader_factory.configuration(), "pi1");
735 const Node *pi2 =
736 configuration::GetNode(log_reader_factory.configuration(), "pi2");
737
738 std::unique_ptr<EventLoop> pi1_event_loop =
739 log_reader_factory.MakeEventLoop("test", pi1);
740 pi1_event_loop->SkipTimingReport();
741 std::unique_ptr<EventLoop> pi2_event_loop =
742 log_reader_factory.MakeEventLoop("test", pi2);
743 pi2_event_loop->SkipTimingReport();
744
745 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
746 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
747
748 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
749 pi1_ping_timestamp;
750 if (!shared()) {
751 pi1_ping_timestamp =
752 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
753 pi1_event_loop.get(),
754 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
755 }
756
757 log_reader_factory.Run();
758
759 EXPECT_EQ(pi1_ping.count(), 2000u);
760 EXPECT_EQ(pi2_ping.count(), 2000u);
761 // If the BeforeSendCallback is called on both nodes, then the ping count
762 // would be 4002 instead of 2001
763 EXPECT_EQ(ping_count, 2001u);
764 if (!shared()) {
765 EXPECT_EQ(pi1_ping_timestamp->count(), 2000u);
766 }
767
768 reader.Deregister();
769}
770
771// Tests that we do not allow adding callbacks after Register is called
772TEST_P(MultinodeLoggerDeathTest, AddCallbackAfterRegister) {
773 time_converter_.StartEqual();
774 std::vector<std::string> actual_filenames;
775
776 {
777 LoggerState pi1_logger = MakeLogger(pi1_);
778 LoggerState pi2_logger = MakeLogger(pi2_);
779
780 event_loop_factory_.RunFor(chrono::milliseconds(95));
781
782 StartLogger(&pi1_logger);
783 StartLogger(&pi2_logger);
784
785 event_loop_factory_.RunFor(chrono::milliseconds(20000));
786 pi1_logger.AppendAllFilenames(&actual_filenames);
787 pi2_logger.AppendAllFilenames(&actual_filenames);
788 }
789
790 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700791 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600792
793 LogReader reader(sorted_parts, &config_.message());
794 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
795 reader.Register(&log_reader_factory);
796 EXPECT_DEATH(
797 {
798 reader.AddBeforeSendCallback("/test", [](aos::examples::Pong *) {
799 LOG(FATAL) << "This should not be called";
800 });
801 },
802 "Cannot add callbacks after calling Register");
803 reader.Deregister();
804}
805
Naman Guptaa63aa132023-03-22 20:06:34 -0700806// Test that if we feed the replay with a mismatched node list that we die on
807// the LogReader constructor.
808TEST_P(MultinodeLoggerDeathTest, MultiNodeBadReplayConfig) {
809 time_converter_.StartEqual();
810 {
811 LoggerState pi1_logger = MakeLogger(pi1_);
812 LoggerState pi2_logger = MakeLogger(pi2_);
813
814 event_loop_factory_.RunFor(chrono::milliseconds(95));
815
816 StartLogger(&pi1_logger);
817 StartLogger(&pi2_logger);
818
819 event_loop_factory_.RunFor(chrono::milliseconds(20000));
820 }
821
822 // Test that, if we add an additional node to the replay config that the
823 // logger complains about the mismatch in number of nodes.
824 FlatbufferDetachedBuffer<Configuration> extra_nodes_config =
825 configuration::MergeWithConfig(&config_.message(), R"({
826 "nodes": [
827 {
828 "name": "extra-node"
829 }
830 ]
831 }
832 )");
833
834 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700835 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -0700836 EXPECT_DEATH(LogReader(sorted_parts, &extra_nodes_config.message()),
837 "Log file and replay config need to have matching nodes lists.");
838}
839
840// Tests that we can read log files where they don't start at the same monotonic
841// time.
842TEST_P(MultinodeLoggerTest, StaggeredStart) {
843 time_converter_.StartEqual();
844 std::vector<std::string> actual_filenames;
845
846 {
847 LoggerState pi1_logger = MakeLogger(pi1_);
848 LoggerState pi2_logger = MakeLogger(pi2_);
849
850 event_loop_factory_.RunFor(chrono::milliseconds(95));
851
852 StartLogger(&pi1_logger);
853
854 event_loop_factory_.RunFor(chrono::milliseconds(200));
855
856 StartLogger(&pi2_logger);
857
858 event_loop_factory_.RunFor(chrono::milliseconds(20000));
859 pi1_logger.AppendAllFilenames(&actual_filenames);
860 pi2_logger.AppendAllFilenames(&actual_filenames);
861 }
862
863 // Since we delay starting pi2, it already knows about all the timestamps so
864 // we don't end up with extra parts.
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700865 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
866 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
867 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -0700868
869 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
870 log_reader_factory.set_send_delay(chrono::microseconds(0));
871
872 // This sends out the fetched messages and advances time to the start of the
873 // log file.
874 reader.Register(&log_reader_factory);
875
876 const Node *pi1 =
877 configuration::GetNode(log_reader_factory.configuration(), "pi1");
878 const Node *pi2 =
879 configuration::GetNode(log_reader_factory.configuration(), "pi2");
880
881 EXPECT_THAT(reader.LoggedNodes(),
882 ::testing::ElementsAre(
883 configuration::GetNode(reader.logged_configuration(), pi1),
884 configuration::GetNode(reader.logged_configuration(), pi2)));
885
886 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
887
888 std::unique_ptr<EventLoop> pi1_event_loop =
889 log_reader_factory.MakeEventLoop("test", pi1);
890 std::unique_ptr<EventLoop> pi2_event_loop =
891 log_reader_factory.MakeEventLoop("test", pi2);
892
893 int pi1_ping_count = 30;
894 int pi2_ping_count = 30;
895 int pi1_pong_count = 30;
896 int pi2_pong_count = 30;
897
898 // Confirm that the ping value matches.
899 pi1_event_loop->MakeWatcher(
900 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
901 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
902 << pi1_event_loop->context().monotonic_remote_time << " -> "
903 << pi1_event_loop->context().monotonic_event_time;
904 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
905
906 ++pi1_ping_count;
907 });
908 pi2_event_loop->MakeWatcher(
909 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
910 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
911 << pi2_event_loop->context().monotonic_remote_time << " -> "
912 << pi2_event_loop->context().monotonic_event_time;
913 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
914
915 ++pi2_ping_count;
916 });
917
918 // Confirm that the ping and pong counts both match, and the value also
919 // matches.
920 pi1_event_loop->MakeWatcher(
921 "/test", [&pi1_event_loop, &pi1_ping_count,
922 &pi1_pong_count](const examples::Pong &pong) {
923 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
924 << pi1_event_loop->context().monotonic_remote_time << " -> "
925 << pi1_event_loop->context().monotonic_event_time;
926
927 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
928 ++pi1_pong_count;
929 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
930 });
931 pi2_event_loop->MakeWatcher(
932 "/test", [&pi2_event_loop, &pi2_ping_count,
933 &pi2_pong_count](const examples::Pong &pong) {
934 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
935 << pi2_event_loop->context().monotonic_remote_time << " -> "
936 << pi2_event_loop->context().monotonic_event_time;
937
938 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
939 ++pi2_pong_count;
940 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
941 });
942
943 log_reader_factory.Run();
944 EXPECT_EQ(pi1_ping_count, 2030);
945 EXPECT_EQ(pi2_ping_count, 2030);
946 EXPECT_EQ(pi1_pong_count, 2030);
947 EXPECT_EQ(pi2_pong_count, 2030);
948
949 reader.Deregister();
950}
951
952// Tests that we can read log files where the monotonic clocks drift and don't
953// match correctly. While we are here, also test that different ending times
954// also is readable.
955TEST_P(MultinodeLoggerTest, MismatchedClocks) {
956 // TODO(austin): Negate...
957 const chrono::nanoseconds initial_pi2_offset = chrono::seconds(1000);
958
959 time_converter_.AddMonotonic(
960 {BootTimestamp::epoch(), BootTimestamp::epoch() + initial_pi2_offset});
961 // Wait for 95 ms, (~0.1 seconds - 1/2 of the ping/pong period), and set the
962 // skew to be 200 uS/s
963 const chrono::nanoseconds startup_sleep1 = time_converter_.AddMonotonic(
964 {chrono::milliseconds(95),
965 chrono::milliseconds(95) - chrono::nanoseconds(200) * 95});
966 // Run another 200 ms to have one logger start first.
967 const chrono::nanoseconds startup_sleep2 = time_converter_.AddMonotonic(
968 {chrono::milliseconds(200), chrono::milliseconds(200)});
969 // Slew one way then the other at the same 200 uS/S slew rate. Make sure we
970 // go far enough to cause problems if this isn't accounted for.
971 const chrono::nanoseconds logger_run1 = time_converter_.AddMonotonic(
972 {chrono::milliseconds(20000),
973 chrono::milliseconds(20000) - chrono::nanoseconds(200) * 20000});
974 const chrono::nanoseconds logger_run2 = time_converter_.AddMonotonic(
975 {chrono::milliseconds(40000),
976 chrono::milliseconds(40000) + chrono::nanoseconds(200) * 40000});
977 const chrono::nanoseconds logger_run3 = time_converter_.AddMonotonic(
978 {chrono::milliseconds(400), chrono::milliseconds(400)});
979
980 {
981 LoggerState pi2_logger = MakeLogger(pi2_);
982
983 LOG(INFO) << "pi2 times: " << pi2_->monotonic_now() << " "
984 << pi2_->realtime_now() << " distributed "
985 << pi2_->ToDistributedClock(pi2_->monotonic_now());
986
987 LOG(INFO) << "pi2_ times: " << pi2_->monotonic_now() << " "
988 << pi2_->realtime_now() << " distributed "
989 << pi2_->ToDistributedClock(pi2_->monotonic_now());
990
991 event_loop_factory_.RunFor(startup_sleep1);
992
993 StartLogger(&pi2_logger);
994
995 event_loop_factory_.RunFor(startup_sleep2);
996
997 {
998 // Run pi1's logger for only part of the time.
999 LoggerState pi1_logger = MakeLogger(pi1_);
1000
1001 StartLogger(&pi1_logger);
1002 event_loop_factory_.RunFor(logger_run1);
1003
1004 // Make sure we slewed time far enough so that the difference is greater
1005 // than the network delay. This confirms that if we sort incorrectly, it
1006 // would show in the results.
1007 EXPECT_LT(
1008 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
1009 -event_loop_factory_.send_delay() -
1010 event_loop_factory_.network_delay());
1011
1012 event_loop_factory_.RunFor(logger_run2);
1013
1014 // And now check that we went far enough the other way to make sure we
1015 // cover both problems.
1016 EXPECT_GT(
1017 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
1018 event_loop_factory_.send_delay() +
1019 event_loop_factory_.network_delay());
1020 }
1021
1022 // And log a bit more on pi2.
1023 event_loop_factory_.RunFor(logger_run3);
1024 }
1025
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001026 const std::vector<LogFile> sorted_parts =
1027 SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 3, 3));
1028 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1029 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001030
1031 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1032 log_reader_factory.set_send_delay(chrono::microseconds(0));
1033
1034 const Node *pi1 =
1035 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1036 const Node *pi2 =
1037 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1038
1039 // This sends out the fetched messages and advances time to the start of the
1040 // log file.
1041 reader.Register(&log_reader_factory);
1042
1043 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
1044 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
1045 LOG(INFO) << "now pi1 "
1046 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1047 LOG(INFO) << "now pi2 "
1048 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1049
1050 LOG(INFO) << "Done registering (pi1) "
1051 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now()
1052 << " "
1053 << log_reader_factory.GetNodeEventLoopFactory(pi1)->realtime_now();
1054 LOG(INFO) << "Done registering (pi2) "
1055 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now()
1056 << " "
1057 << log_reader_factory.GetNodeEventLoopFactory(pi2)->realtime_now();
1058
1059 EXPECT_THAT(reader.LoggedNodes(),
1060 ::testing::ElementsAre(
1061 configuration::GetNode(reader.logged_configuration(), pi1),
1062 configuration::GetNode(reader.logged_configuration(), pi2)));
1063
1064 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1065
1066 std::unique_ptr<EventLoop> pi1_event_loop =
1067 log_reader_factory.MakeEventLoop("test", pi1);
1068 std::unique_ptr<EventLoop> pi2_event_loop =
1069 log_reader_factory.MakeEventLoop("test", pi2);
1070
1071 int pi1_ping_count = 30;
1072 int pi2_ping_count = 30;
1073 int pi1_pong_count = 30;
1074 int pi2_pong_count = 30;
1075
1076 // Confirm that the ping value matches.
1077 pi1_event_loop->MakeWatcher(
1078 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
1079 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
1080 << pi1_event_loop->context().monotonic_remote_time << " -> "
1081 << pi1_event_loop->context().monotonic_event_time;
1082 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
1083
1084 ++pi1_ping_count;
1085 });
1086 pi2_event_loop->MakeWatcher(
1087 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
1088 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
1089 << pi2_event_loop->context().monotonic_remote_time << " -> "
1090 << pi2_event_loop->context().monotonic_event_time;
1091 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
1092
1093 ++pi2_ping_count;
1094 });
1095
1096 // Confirm that the ping and pong counts both match, and the value also
1097 // matches.
1098 pi1_event_loop->MakeWatcher(
1099 "/test", [&pi1_event_loop, &pi1_ping_count,
1100 &pi1_pong_count](const examples::Pong &pong) {
1101 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
1102 << pi1_event_loop->context().monotonic_remote_time << " -> "
1103 << pi1_event_loop->context().monotonic_event_time;
1104
1105 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
1106 ++pi1_pong_count;
1107 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
1108 });
1109 pi2_event_loop->MakeWatcher(
1110 "/test", [&pi2_event_loop, &pi2_ping_count,
1111 &pi2_pong_count](const examples::Pong &pong) {
1112 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
1113 << pi2_event_loop->context().monotonic_remote_time << " -> "
1114 << pi2_event_loop->context().monotonic_event_time;
1115
1116 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
1117 ++pi2_pong_count;
1118 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
1119 });
1120
1121 log_reader_factory.Run();
1122 EXPECT_EQ(pi1_ping_count, 6030);
1123 EXPECT_EQ(pi2_ping_count, 6030);
1124 EXPECT_EQ(pi1_pong_count, 6030);
1125 EXPECT_EQ(pi2_pong_count, 6030);
1126
1127 reader.Deregister();
1128}
1129
1130// Tests that we can sort a bunch of parts into the pre-determined sorted parts.
1131TEST_P(MultinodeLoggerTest, SortParts) {
1132 time_converter_.StartEqual();
1133 // Make a bunch of parts.
1134 {
1135 LoggerState pi1_logger = MakeLogger(pi1_);
1136 LoggerState pi2_logger = MakeLogger(pi2_);
1137
1138 event_loop_factory_.RunFor(chrono::milliseconds(95));
1139
1140 StartLogger(&pi1_logger);
1141 StartLogger(&pi2_logger);
1142
1143 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1144 }
1145
1146 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1147 VerifyParts(sorted_parts);
1148}
1149
1150// Tests that we can sort a bunch of parts with an empty part. We should ignore
1151// it and remove it from the sorted list.
1152TEST_P(MultinodeLoggerTest, SortEmptyParts) {
1153 time_converter_.StartEqual();
1154 // Make a bunch of parts.
1155 {
1156 LoggerState pi1_logger = MakeLogger(pi1_);
1157 LoggerState pi2_logger = MakeLogger(pi2_);
1158
1159 event_loop_factory_.RunFor(chrono::milliseconds(95));
1160
1161 StartLogger(&pi1_logger);
1162 StartLogger(&pi2_logger);
1163
1164 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1165 }
1166
1167 // TODO(austin): Should we flip out if the file can't open?
1168 const std::string kEmptyFile("foobarinvalidfiledoesnotexist" + Extension());
1169
1170 aos::util::WriteStringToFileOrDie(kEmptyFile, "");
1171 logfiles_.emplace_back(kEmptyFile);
1172
1173 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1174 VerifyParts(sorted_parts, {kEmptyFile});
1175}
1176
1177// Tests that we can sort a bunch of parts with the end missing off a
1178// file. We should use the part we can read.
1179TEST_P(MultinodeLoggerTest, SortTruncatedParts) {
1180 std::vector<std::string> actual_filenames;
1181 time_converter_.StartEqual();
1182 // Make a bunch of parts.
1183 {
1184 LoggerState pi1_logger = MakeLogger(pi1_);
1185 LoggerState pi2_logger = MakeLogger(pi2_);
1186
1187 event_loop_factory_.RunFor(chrono::milliseconds(95));
1188
1189 StartLogger(&pi1_logger);
1190 StartLogger(&pi2_logger);
1191
1192 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1193
1194 pi1_logger.AppendAllFilenames(&actual_filenames);
1195 pi2_logger.AppendAllFilenames(&actual_filenames);
1196 }
1197
1198 ASSERT_THAT(actual_filenames,
1199 ::testing::UnorderedElementsAreArray(logfiles_));
1200
1201 // Strip off the end of one of the files. Pick one with a lot of data.
1202 // For snappy, needs to have enough data to be >1 chunk of compressed data so
1203 // that we don't corrupt the entire log part.
1204 ::std::string compressed_contents =
1205 aos::util::ReadFileToStringOrDie(logfiles_[4]);
1206
1207 aos::util::WriteStringToFileOrDie(
1208 logfiles_[4],
1209 compressed_contents.substr(0, compressed_contents.size() - 100));
1210
1211 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1212 VerifyParts(sorted_parts);
1213}
1214
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001215// Tests that if we remap a logged channel, it shows up correctly.
Naman Guptaa63aa132023-03-22 20:06:34 -07001216TEST_P(MultinodeLoggerTest, RemapLoggedChannel) {
1217 time_converter_.StartEqual();
1218 {
1219 LoggerState pi1_logger = MakeLogger(pi1_);
1220 LoggerState pi2_logger = MakeLogger(pi2_);
1221
1222 event_loop_factory_.RunFor(chrono::milliseconds(95));
1223
1224 StartLogger(&pi1_logger);
1225 StartLogger(&pi2_logger);
1226
1227 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1228 }
1229
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001230 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1231 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1232 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001233
1234 // Remap just on pi1.
1235 reader.RemapLoggedChannel<aos::timing::Report>(
1236 "/aos", configuration::GetNode(reader.configuration(), "pi1"));
1237
1238 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1239 log_reader_factory.set_send_delay(chrono::microseconds(0));
1240
1241 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1242 // Note: An extra channel gets remapped automatically due to a timestamp
1243 // channel being LOCAL_LOGGER'd.
1244 ASSERT_EQ(remapped_channels.size(), std::get<0>(GetParam()).shared ? 1u : 2u);
1245 EXPECT_EQ(remapped_channels[0]->name()->string_view(), "/original/pi1/aos");
1246 EXPECT_EQ(remapped_channels[0]->type()->string_view(), "aos.timing.Report");
1247 if (!std::get<0>(GetParam()).shared) {
1248 EXPECT_EQ(remapped_channels[1]->name()->string_view(),
1249 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1250 "aos-message_bridge-Timestamp");
1251 EXPECT_EQ(remapped_channels[1]->type()->string_view(),
1252 "aos.message_bridge.RemoteMessage");
1253 }
1254
1255 reader.Register(&log_reader_factory);
1256
1257 const Node *pi1 =
1258 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1259 const Node *pi2 =
1260 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1261
1262 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1263 // else should have moved.
1264 std::unique_ptr<EventLoop> pi1_event_loop =
1265 log_reader_factory.MakeEventLoop("test", pi1);
1266 pi1_event_loop->SkipTimingReport();
1267 std::unique_ptr<EventLoop> full_pi1_event_loop =
1268 log_reader_factory.MakeEventLoop("test", pi1);
1269 full_pi1_event_loop->SkipTimingReport();
1270 std::unique_ptr<EventLoop> pi2_event_loop =
1271 log_reader_factory.MakeEventLoop("test", pi2);
1272 pi2_event_loop->SkipTimingReport();
1273
1274 MessageCounter<aos::timing::Report> pi1_timing_report(pi1_event_loop.get(),
1275 "/aos");
1276 MessageCounter<aos::timing::Report> full_pi1_timing_report(
1277 full_pi1_event_loop.get(), "/pi1/aos");
1278 MessageCounter<aos::timing::Report> pi1_original_timing_report(
1279 pi1_event_loop.get(), "/original/aos");
1280 MessageCounter<aos::timing::Report> full_pi1_original_timing_report(
1281 full_pi1_event_loop.get(), "/original/pi1/aos");
1282 MessageCounter<aos::timing::Report> pi2_timing_report(pi2_event_loop.get(),
1283 "/aos");
1284
1285 log_reader_factory.Run();
1286
1287 EXPECT_EQ(pi1_timing_report.count(), 0u);
1288 EXPECT_EQ(full_pi1_timing_report.count(), 0u);
1289 EXPECT_NE(pi1_original_timing_report.count(), 0u);
1290 EXPECT_NE(full_pi1_original_timing_report.count(), 0u);
1291 EXPECT_NE(pi2_timing_report.count(), 0u);
1292
1293 reader.Deregister();
1294}
1295
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001296// Tests that if we rename a logged channel, it shows up correctly.
1297TEST_P(MultinodeLoggerTest, RenameLoggedChannel) {
1298 std::vector<std::string> actual_filenames;
1299 time_converter_.StartEqual();
1300 {
1301 LoggerState pi1_logger = MakeLogger(pi1_);
1302 LoggerState pi2_logger = MakeLogger(pi2_);
1303
1304 event_loop_factory_.RunFor(chrono::milliseconds(95));
1305
1306 StartLogger(&pi1_logger);
1307 StartLogger(&pi2_logger);
1308
1309 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1310
1311 pi1_logger.AppendAllFilenames(&actual_filenames);
1312 pi2_logger.AppendAllFilenames(&actual_filenames);
1313 }
1314
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001315 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1316 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1317 LogReader reader(sorted_parts);
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001318
1319 // Rename just on pi2. Add some global maps just to verify they get added in
1320 // the config and used correctly.
1321 std::vector<MapT> maps;
1322 {
1323 MapT map;
1324 map.match = std::make_unique<ChannelT>();
1325 map.match->name = "/foo*";
1326 map.match->source_node = "pi1";
1327 map.rename = std::make_unique<ChannelT>();
1328 map.rename->name = "/pi1/foo";
1329 maps.emplace_back(std::move(map));
1330 }
1331 {
1332 MapT map;
1333 map.match = std::make_unique<ChannelT>();
1334 map.match->name = "/foo*";
1335 map.match->source_node = "pi2";
1336 map.rename = std::make_unique<ChannelT>();
1337 map.rename->name = "/pi2/foo";
1338 maps.emplace_back(std::move(map));
1339 }
1340 {
1341 MapT map;
1342 map.match = std::make_unique<ChannelT>();
1343 map.match->name = "/foo";
1344 map.match->type = "aos.examples.Ping";
1345 map.rename = std::make_unique<ChannelT>();
1346 map.rename->name = "/foo/renamed";
1347 maps.emplace_back(std::move(map));
1348 }
1349 reader.RenameLoggedChannel<aos::examples::Ping>(
1350 "/aos", configuration::GetNode(reader.configuration(), "pi2"),
1351 "/pi2/foo/renamed", maps);
1352
1353 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1354 log_reader_factory.set_send_delay(chrono::microseconds(0));
1355
1356 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1357 // Note: An extra channel gets remapped automatically due to a timestamp
1358 // channel being LOCAL_LOGGER'd.
1359 const bool shared = std::get<0>(GetParam()).shared;
1360 ASSERT_EQ(remapped_channels.size(), shared ? 1u : 2u);
1361 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->name()->string_view(),
1362 "/pi2/foo/renamed");
1363 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->type()->string_view(),
1364 "aos.examples.Ping");
1365 if (!shared) {
1366 EXPECT_EQ(remapped_channels[0]->name()->string_view(),
1367 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1368 "aos-message_bridge-Timestamp");
1369 EXPECT_EQ(remapped_channels[0]->type()->string_view(),
1370 "aos.message_bridge.RemoteMessage");
1371 }
1372
1373 reader.Register(&log_reader_factory);
1374
1375 const Node *pi1 =
1376 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1377 const Node *pi2 =
1378 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1379
1380 // Confirm we can read the data on the renamed channel, just for pi2. Nothing
1381 // else should have moved.
1382 std::unique_ptr<EventLoop> pi2_event_loop =
1383 log_reader_factory.MakeEventLoop("test", pi2);
1384 pi2_event_loop->SkipTimingReport();
1385 std::unique_ptr<EventLoop> full_pi2_event_loop =
1386 log_reader_factory.MakeEventLoop("test", pi2);
1387 full_pi2_event_loop->SkipTimingReport();
1388 std::unique_ptr<EventLoop> pi1_event_loop =
1389 log_reader_factory.MakeEventLoop("test", pi1);
1390 pi1_event_loop->SkipTimingReport();
1391
1392 MessageCounter<aos::examples::Ping> pi2_ping(pi2_event_loop.get(), "/aos");
1393 MessageCounter<aos::examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1394 "/foo");
1395 MessageCounter<aos::examples::Ping> full_pi2_renamed_ping(
1396 full_pi2_event_loop.get(), "/pi2/foo/renamed");
1397 MessageCounter<aos::examples::Ping> pi1_ping(pi1_event_loop.get(), "/aos");
1398
1399 log_reader_factory.Run();
1400
1401 EXPECT_EQ(pi2_ping.count(), 0u);
1402 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1403 EXPECT_NE(full_pi2_renamed_ping.count(), 0u);
1404 EXPECT_NE(pi1_ping.count(), 0u);
1405
1406 reader.Deregister();
1407}
1408
Naman Guptaa63aa132023-03-22 20:06:34 -07001409// Tests that we can remap a forwarded channel as well.
1410TEST_P(MultinodeLoggerTest, RemapForwardedLoggedChannel) {
1411 time_converter_.StartEqual();
1412 {
1413 LoggerState pi1_logger = MakeLogger(pi1_);
1414 LoggerState pi2_logger = MakeLogger(pi2_);
1415
1416 event_loop_factory_.RunFor(chrono::milliseconds(95));
1417
1418 StartLogger(&pi1_logger);
1419 StartLogger(&pi2_logger);
1420
1421 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1422 }
1423
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001424 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1425 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1426 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001427
1428 reader.RemapLoggedChannel<examples::Ping>("/test");
1429
1430 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1431 log_reader_factory.set_send_delay(chrono::microseconds(0));
1432
1433 reader.Register(&log_reader_factory);
1434
1435 const Node *pi1 =
1436 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1437 const Node *pi2 =
1438 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1439
1440 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1441 // else should have moved.
1442 std::unique_ptr<EventLoop> pi1_event_loop =
1443 log_reader_factory.MakeEventLoop("test", pi1);
1444 pi1_event_loop->SkipTimingReport();
1445 std::unique_ptr<EventLoop> full_pi1_event_loop =
1446 log_reader_factory.MakeEventLoop("test", pi1);
1447 full_pi1_event_loop->SkipTimingReport();
1448 std::unique_ptr<EventLoop> pi2_event_loop =
1449 log_reader_factory.MakeEventLoop("test", pi2);
1450 pi2_event_loop->SkipTimingReport();
1451
1452 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1453 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1454 MessageCounter<examples::Ping> pi1_original_ping(pi1_event_loop.get(),
1455 "/original/test");
1456 MessageCounter<examples::Ping> pi2_original_ping(pi2_event_loop.get(),
1457 "/original/test");
1458
1459 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1460 pi1_original_ping_timestamp;
1461 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1462 pi1_ping_timestamp;
1463 if (!shared()) {
1464 pi1_original_ping_timestamp =
1465 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1466 pi1_event_loop.get(),
1467 "/pi1/aos/remote_timestamps/pi2/original/test/aos-examples-Ping");
1468 pi1_ping_timestamp =
1469 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1470 pi1_event_loop.get(),
1471 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1472 }
1473
1474 log_reader_factory.Run();
1475
1476 EXPECT_EQ(pi1_ping.count(), 0u);
1477 EXPECT_EQ(pi2_ping.count(), 0u);
1478 EXPECT_NE(pi1_original_ping.count(), 0u);
1479 EXPECT_NE(pi2_original_ping.count(), 0u);
1480 if (!shared()) {
1481 EXPECT_NE(pi1_original_ping_timestamp->count(), 0u);
1482 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1483 }
1484
1485 reader.Deregister();
1486}
1487
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001488// Tests that we can rename a forwarded channel as well.
1489TEST_P(MultinodeLoggerTest, RenameForwardedLoggedChannel) {
1490 std::vector<std::string> actual_filenames;
1491 time_converter_.StartEqual();
1492 {
1493 LoggerState pi1_logger = MakeLogger(pi1_);
1494 LoggerState pi2_logger = MakeLogger(pi2_);
1495
1496 event_loop_factory_.RunFor(chrono::milliseconds(95));
1497
1498 StartLogger(&pi1_logger);
1499 StartLogger(&pi2_logger);
1500
1501 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1502
1503 pi1_logger.AppendAllFilenames(&actual_filenames);
1504 pi2_logger.AppendAllFilenames(&actual_filenames);
1505 }
1506
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001507 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1508 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1509 LogReader reader(sorted_parts);
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001510
1511 std::vector<MapT> maps;
1512 {
1513 MapT map;
1514 map.match = std::make_unique<ChannelT>();
1515 map.match->name = "/production*";
1516 map.match->source_node = "pi1";
1517 map.rename = std::make_unique<ChannelT>();
1518 map.rename->name = "/pi1/production";
1519 maps.emplace_back(std::move(map));
1520 }
1521 {
1522 MapT map;
1523 map.match = std::make_unique<ChannelT>();
1524 map.match->name = "/production*";
1525 map.match->source_node = "pi2";
1526 map.rename = std::make_unique<ChannelT>();
1527 map.rename->name = "/pi2/production";
1528 maps.emplace_back(std::move(map));
1529 }
1530 reader.RenameLoggedChannel<aos::examples::Ping>(
1531 "/test", configuration::GetNode(reader.configuration(), "pi1"),
1532 "/pi1/production", maps);
1533
1534 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1535 log_reader_factory.set_send_delay(chrono::microseconds(0));
1536
1537 reader.Register(&log_reader_factory);
1538
1539 const Node *pi1 =
1540 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1541 const Node *pi2 =
1542 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1543
1544 // Confirm we can read the data on the renamed channel, on both the source
1545 // node and the remote node. In case of split timestamp channels, confirm that
1546 // we receive the timestamp messages on the renamed channel as well.
1547 std::unique_ptr<EventLoop> pi1_event_loop =
1548 log_reader_factory.MakeEventLoop("test", pi1);
1549 pi1_event_loop->SkipTimingReport();
1550 std::unique_ptr<EventLoop> full_pi1_event_loop =
1551 log_reader_factory.MakeEventLoop("test", pi1);
1552 full_pi1_event_loop->SkipTimingReport();
1553 std::unique_ptr<EventLoop> pi2_event_loop =
1554 log_reader_factory.MakeEventLoop("test", pi2);
1555 pi2_event_loop->SkipTimingReport();
1556
1557 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1558 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1559 MessageCounter<examples::Ping> pi1_renamed_ping(pi1_event_loop.get(),
1560 "/pi1/production");
1561 MessageCounter<examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1562 "/pi1/production");
1563
1564 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1565 pi1_renamed_ping_timestamp;
1566 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1567 pi1_ping_timestamp;
1568 if (!shared()) {
1569 pi1_renamed_ping_timestamp =
1570 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1571 pi1_event_loop.get(),
1572 "/pi1/aos/remote_timestamps/pi2/pi1/production/aos-examples-Ping");
1573 pi1_ping_timestamp =
1574 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1575 pi1_event_loop.get(),
1576 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1577 }
1578
1579 log_reader_factory.Run();
1580
1581 EXPECT_EQ(pi1_ping.count(), 0u);
1582 EXPECT_EQ(pi2_ping.count(), 0u);
1583 EXPECT_NE(pi1_renamed_ping.count(), 0u);
1584 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1585 if (!shared()) {
1586 EXPECT_NE(pi1_renamed_ping_timestamp->count(), 0u);
1587 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1588 }
1589
1590 reader.Deregister();
1591}
1592
Naman Guptaa63aa132023-03-22 20:06:34 -07001593// Tests that we observe all the same events in log replay (for a given node)
1594// whether we just register an event loop for that node or if we register a full
1595// event loop factory.
1596TEST_P(MultinodeLoggerTest, SingleNodeReplay) {
1597 time_converter_.StartEqual();
1598 constexpr chrono::milliseconds kStartupDelay(95);
1599 {
1600 LoggerState pi1_logger = MakeLogger(pi1_);
1601 LoggerState pi2_logger = MakeLogger(pi2_);
1602
1603 event_loop_factory_.RunFor(kStartupDelay);
1604
1605 StartLogger(&pi1_logger);
1606 StartLogger(&pi2_logger);
1607
1608 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1609 }
1610
1611 LogReader full_reader(SortParts(logfiles_));
1612 LogReader single_node_reader(SortParts(logfiles_));
1613
1614 SimulatedEventLoopFactory full_factory(full_reader.configuration());
1615 SimulatedEventLoopFactory single_node_factory(
1616 single_node_reader.configuration());
1617 single_node_factory.SkipTimingReport();
1618 single_node_factory.DisableStatistics();
1619 std::unique_ptr<EventLoop> replay_event_loop =
1620 single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
1621 "log_reader");
1622
1623 full_reader.Register(&full_factory);
1624 single_node_reader.Register(replay_event_loop.get());
1625
1626 const Node *full_pi1 =
1627 configuration::GetNode(full_factory.configuration(), "pi1");
1628
1629 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1630 // else should have moved.
1631 std::unique_ptr<EventLoop> full_event_loop =
1632 full_factory.MakeEventLoop("test", full_pi1);
1633 full_event_loop->SkipTimingReport();
1634 full_event_loop->SkipAosLog();
1635 // maps are indexed on channel index.
1636 // observed_messages: {channel_index: [(message_sent_time, was_fetched),...]}
1637 std::map<size_t, std::vector<std::pair<monotonic_clock::time_point, bool>>>
1638 observed_messages;
1639 std::map<size_t, std::unique_ptr<RawFetcher>> fetchers;
1640 for (size_t ii = 0; ii < full_event_loop->configuration()->channels()->size();
1641 ++ii) {
1642 const Channel *channel =
1643 full_event_loop->configuration()->channels()->Get(ii);
1644 // We currently don't support replaying remote timestamp channels in
1645 // realtime replay (unless the remote timestamp channel was not NOT_LOGGED,
1646 // in which case it gets auto-remapped and replayed on a /original channel).
1647 if (channel->name()->string_view().find("remote_timestamp") !=
1648 std::string_view::npos &&
1649 channel->name()->string_view().find("/original") ==
1650 std::string_view::npos) {
1651 continue;
1652 }
1653 if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
1654 observed_messages[ii] = {};
1655 fetchers[ii] = full_event_loop->MakeRawFetcher(channel);
1656 full_event_loop->OnRun([ii, &observed_messages, &fetchers]() {
1657 if (fetchers[ii]->Fetch()) {
1658 observed_messages[ii].push_back(std::make_pair(
1659 fetchers[ii]->context().monotonic_event_time, true));
1660 }
1661 });
1662 full_event_loop->MakeRawNoArgWatcher(
1663 channel, [ii, &observed_messages](const Context &context) {
1664 observed_messages[ii].push_back(
1665 std::make_pair(context.monotonic_event_time, false));
1666 });
1667 }
1668 }
1669
1670 full_factory.Run();
1671 fetchers.clear();
1672 full_reader.Deregister();
1673
1674 const Node *single_node_pi1 =
1675 configuration::GetNode(single_node_factory.configuration(), "pi1");
1676 std::map<size_t, std::unique_ptr<RawFetcher>> single_node_fetchers;
1677
1678 std::unique_ptr<EventLoop> single_node_event_loop =
1679 single_node_factory.MakeEventLoop("test", single_node_pi1);
1680 single_node_event_loop->SkipTimingReport();
1681 single_node_event_loop->SkipAosLog();
1682 for (size_t ii = 0;
1683 ii < single_node_event_loop->configuration()->channels()->size(); ++ii) {
1684 const Channel *channel =
1685 single_node_event_loop->configuration()->channels()->Get(ii);
1686 single_node_factory.DisableForwarding(channel);
1687 if (configuration::ChannelIsReadableOnNode(channel, single_node_pi1)) {
1688 single_node_fetchers[ii] =
1689 single_node_event_loop->MakeRawFetcher(channel);
1690 single_node_event_loop->OnRun([channel, ii, &single_node_fetchers]() {
1691 EXPECT_FALSE(single_node_fetchers[ii]->Fetch())
1692 << "Single EventLoop replay doesn't support pre-loading fetchers. "
1693 << configuration::StrippedChannelToString(channel);
1694 });
1695 single_node_event_loop->MakeRawNoArgWatcher(
1696 channel, [ii, &observed_messages, channel,
1697 kStartupDelay](const Context &context) {
1698 if (observed_messages[ii].empty()) {
1699 FAIL() << "Observed extra message at "
1700 << context.monotonic_event_time << " on "
1701 << configuration::StrippedChannelToString(channel);
1702 return;
1703 }
1704 const std::pair<monotonic_clock::time_point, bool> &message =
1705 observed_messages[ii].front();
1706 if (message.second) {
1707 EXPECT_LE(message.first,
1708 context.monotonic_event_time + kStartupDelay)
1709 << "Mismatched message times " << context.monotonic_event_time
1710 << " and " << message.first << " on "
1711 << configuration::StrippedChannelToString(channel);
1712 } else {
1713 EXPECT_EQ(message.first,
1714 context.monotonic_event_time + kStartupDelay)
1715 << "Mismatched message times " << context.monotonic_event_time
1716 << " and " << message.first << " on "
1717 << configuration::StrippedChannelToString(channel);
1718 }
1719 observed_messages[ii].erase(observed_messages[ii].begin());
1720 });
1721 }
1722 }
1723
1724 single_node_factory.Run();
1725
1726 single_node_fetchers.clear();
1727
1728 single_node_reader.Deregister();
1729
1730 for (const auto &pair : observed_messages) {
1731 EXPECT_TRUE(pair.second.empty())
1732 << "Missed " << pair.second.size() << " messages on "
1733 << configuration::StrippedChannelToString(
1734 single_node_event_loop->configuration()->channels()->Get(
1735 pair.first));
1736 }
1737}
1738
1739// Tests that we properly recreate forwarded timestamps when replaying a log.
1740// This should be enough that we can then re-run the logger and get a valid log
1741// back.
1742TEST_P(MultinodeLoggerTest, MessageHeader) {
1743 time_converter_.StartEqual();
1744 {
1745 LoggerState pi1_logger = MakeLogger(pi1_);
1746 LoggerState pi2_logger = MakeLogger(pi2_);
1747
1748 event_loop_factory_.RunFor(chrono::milliseconds(95));
1749
1750 StartLogger(&pi1_logger);
1751 StartLogger(&pi2_logger);
1752
1753 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1754 }
1755
1756 LogReader reader(SortParts(logfiles_));
1757
1758 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1759 log_reader_factory.set_send_delay(chrono::microseconds(0));
1760
1761 // This sends out the fetched messages and advances time to the start of the
1762 // log file.
1763 reader.Register(&log_reader_factory);
1764
1765 const Node *pi1 =
1766 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1767 const Node *pi2 =
1768 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1769
1770 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
1771 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
1772 LOG(INFO) << "now pi1 "
1773 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1774 LOG(INFO) << "now pi2 "
1775 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1776
1777 EXPECT_THAT(reader.LoggedNodes(),
1778 ::testing::ElementsAre(
1779 configuration::GetNode(reader.logged_configuration(), pi1),
1780 configuration::GetNode(reader.logged_configuration(), pi2)));
1781
1782 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1783
1784 std::unique_ptr<EventLoop> pi1_event_loop =
1785 log_reader_factory.MakeEventLoop("test", pi1);
1786 std::unique_ptr<EventLoop> pi2_event_loop =
1787 log_reader_factory.MakeEventLoop("test", pi2);
1788
1789 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =
1790 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1791 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi2_fetcher =
1792 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1793
1794 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1795 pi1_event_loop->MakeFetcher<examples::Ping>("/test");
1796 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1797 pi2_event_loop->MakeFetcher<examples::Ping>("/test");
1798
1799 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi2_fetcher =
1800 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1801 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi1_fetcher =
1802 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1803
1804 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1805 pi2_event_loop->MakeFetcher<examples::Pong>("/test");
1806 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1807 pi1_event_loop->MakeFetcher<examples::Pong>("/test");
1808
1809 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
1810 pi1_event_loop->configuration(), pi1_timestamp_on_pi1_fetcher.channel());
1811 const size_t ping_timestamp_channel = configuration::ChannelIndex(
1812 pi2_event_loop->configuration(), ping_on_pi2_fetcher.channel());
1813
1814 const size_t pi2_timestamp_channel = configuration::ChannelIndex(
1815 pi2_event_loop->configuration(), pi2_timestamp_on_pi2_fetcher.channel());
1816 const size_t pong_timestamp_channel = configuration::ChannelIndex(
1817 pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
1818
1819 const chrono::nanoseconds network_delay = event_loop_factory_.network_delay();
1820 const chrono::nanoseconds send_delay = event_loop_factory_.send_delay();
1821
1822 for (std::pair<int, std::string> channel :
1823 shared()
1824 ? std::vector<
1825 std::pair<int, std::string>>{{-1,
1826 "/aos/remote_timestamps/pi2"}}
1827 : std::vector<std::pair<int, std::string>>{
1828 {pi1_timestamp_channel,
1829 "/aos/remote_timestamps/pi2/pi1/aos/"
1830 "aos-message_bridge-Timestamp"},
1831 {ping_timestamp_channel,
1832 "/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
1833 pi1_event_loop->MakeWatcher(
1834 channel.second,
1835 [&pi1_event_loop, &pi2_event_loop, pi1_timestamp_channel,
1836 ping_timestamp_channel, &pi1_timestamp_on_pi1_fetcher,
1837 &pi1_timestamp_on_pi2_fetcher, &ping_on_pi1_fetcher,
1838 &ping_on_pi2_fetcher, network_delay, send_delay,
1839 channel_index = channel.first](const RemoteMessage &header) {
1840 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1841 chrono::nanoseconds(header.monotonic_sent_time()));
1842 const aos::realtime_clock::time_point header_realtime_sent_time(
1843 chrono::nanoseconds(header.realtime_sent_time()));
1844 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1845 chrono::nanoseconds(header.monotonic_remote_time()));
1846 const aos::realtime_clock::time_point header_realtime_remote_time(
1847 chrono::nanoseconds(header.realtime_remote_time()));
1848
1849 if (channel_index != -1) {
1850 ASSERT_EQ(channel_index, header.channel_index());
1851 }
1852
1853 const Context *pi1_context = nullptr;
1854 const Context *pi2_context = nullptr;
1855
1856 if (header.channel_index() == pi1_timestamp_channel) {
1857 ASSERT_TRUE(pi1_timestamp_on_pi1_fetcher.FetchNext());
1858 ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
1859 pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
1860 pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
1861 } else if (header.channel_index() == ping_timestamp_channel) {
1862 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
1863 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
1864 pi1_context = &ping_on_pi1_fetcher.context();
1865 pi2_context = &ping_on_pi2_fetcher.context();
1866 } else {
1867 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1868 << configuration::CleanedChannelToString(
1869 pi1_event_loop->configuration()->channels()->Get(
1870 header.channel_index()));
1871 }
1872
1873 ASSERT_TRUE(header.has_boot_uuid());
1874 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1875 pi2_event_loop->boot_uuid());
1876
1877 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
1878 EXPECT_EQ(pi2_context->remote_queue_index,
1879 header.remote_queue_index());
1880 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
1881
1882 EXPECT_EQ(pi2_context->monotonic_event_time,
1883 header_monotonic_sent_time);
1884 EXPECT_EQ(pi2_context->realtime_event_time,
1885 header_realtime_sent_time);
1886 EXPECT_EQ(pi2_context->realtime_remote_time,
1887 header_realtime_remote_time);
1888 EXPECT_EQ(pi2_context->monotonic_remote_time,
1889 header_monotonic_remote_time);
1890
1891 EXPECT_EQ(pi1_context->realtime_event_time,
1892 header_realtime_remote_time);
1893 EXPECT_EQ(pi1_context->monotonic_event_time,
1894 header_monotonic_remote_time);
1895
1896 // Time estimation isn't perfect, but we know the clocks were
1897 // identical when logged, so we know when this should have come back.
1898 // Confirm we got it when we expected.
1899 EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
1900 pi1_context->monotonic_event_time + 2 * network_delay +
1901 send_delay);
1902 });
1903 }
1904 for (std::pair<int, std::string> channel :
1905 shared()
1906 ? std::vector<
1907 std::pair<int, std::string>>{{-1,
1908 "/aos/remote_timestamps/pi1"}}
1909 : std::vector<std::pair<int, std::string>>{
1910 {pi2_timestamp_channel,
1911 "/aos/remote_timestamps/pi1/pi2/aos/"
1912 "aos-message_bridge-Timestamp"}}) {
1913 pi2_event_loop->MakeWatcher(
1914 channel.second,
1915 [&pi2_event_loop, &pi1_event_loop, pi2_timestamp_channel,
1916 pong_timestamp_channel, &pi2_timestamp_on_pi2_fetcher,
1917 &pi2_timestamp_on_pi1_fetcher, &pong_on_pi2_fetcher,
1918 &pong_on_pi1_fetcher, network_delay, send_delay,
1919 channel_index = channel.first](const RemoteMessage &header) {
1920 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1921 chrono::nanoseconds(header.monotonic_sent_time()));
1922 const aos::realtime_clock::time_point header_realtime_sent_time(
1923 chrono::nanoseconds(header.realtime_sent_time()));
1924 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1925 chrono::nanoseconds(header.monotonic_remote_time()));
1926 const aos::realtime_clock::time_point header_realtime_remote_time(
1927 chrono::nanoseconds(header.realtime_remote_time()));
1928
1929 if (channel_index != -1) {
1930 ASSERT_EQ(channel_index, header.channel_index());
1931 }
1932
1933 const Context *pi2_context = nullptr;
1934 const Context *pi1_context = nullptr;
1935
1936 if (header.channel_index() == pi2_timestamp_channel) {
1937 ASSERT_TRUE(pi2_timestamp_on_pi2_fetcher.FetchNext());
1938 ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
1939 pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
1940 pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
1941 } else if (header.channel_index() == pong_timestamp_channel) {
1942 ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
1943 ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
1944 pi2_context = &pong_on_pi2_fetcher.context();
1945 pi1_context = &pong_on_pi1_fetcher.context();
1946 } else {
1947 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1948 << configuration::CleanedChannelToString(
1949 pi2_event_loop->configuration()->channels()->Get(
1950 header.channel_index()));
1951 }
1952
1953 ASSERT_TRUE(header.has_boot_uuid());
1954 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1955 pi1_event_loop->boot_uuid());
1956
1957 EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());
1958 EXPECT_EQ(pi1_context->remote_queue_index,
1959 header.remote_queue_index());
1960 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
1961
1962 EXPECT_EQ(pi1_context->monotonic_event_time,
1963 header_monotonic_sent_time);
1964 EXPECT_EQ(pi1_context->realtime_event_time,
1965 header_realtime_sent_time);
1966 EXPECT_EQ(pi1_context->realtime_remote_time,
1967 header_realtime_remote_time);
1968 EXPECT_EQ(pi1_context->monotonic_remote_time,
1969 header_monotonic_remote_time);
1970
1971 EXPECT_EQ(pi2_context->realtime_event_time,
1972 header_realtime_remote_time);
1973 EXPECT_EQ(pi2_context->monotonic_event_time,
1974 header_monotonic_remote_time);
1975
1976 // Time estimation isn't perfect, but we know the clocks were
1977 // identical when logged, so we know when this should have come back.
1978 // Confirm we got it when we expected.
1979 EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
1980 pi2_context->monotonic_event_time + 2 * network_delay +
1981 send_delay);
1982 });
1983 }
1984
1985 // And confirm we can re-create a log again, while checking the contents.
1986 {
1987 LoggerState pi1_logger = MakeLogger(
1988 log_reader_factory.GetNodeEventLoopFactory("pi1"), &log_reader_factory);
1989 LoggerState pi2_logger = MakeLogger(
1990 log_reader_factory.GetNodeEventLoopFactory("pi2"), &log_reader_factory);
1991
1992 StartLogger(&pi1_logger, tmp_dir_ + "/relogged1");
1993 StartLogger(&pi2_logger, tmp_dir_ + "/relogged2");
1994
1995 log_reader_factory.Run();
1996 }
1997
1998 reader.Deregister();
1999
2000 // And verify that we can run the LogReader over the relogged files without
2001 // hitting any fatal errors.
2002 {
2003 LogReader relogged_reader(SortParts(MakeLogFiles(
2004 tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 3, 3, true)));
2005 relogged_reader.Register();
2006
2007 relogged_reader.event_loop_factory()->Run();
2008 }
2009 // And confirm that we can read the logged file using the reader's
2010 // configuration.
2011 {
2012 LogReader relogged_reader(
2013 SortParts(MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2",
2014 3, 3, true)),
2015 reader.configuration());
2016 relogged_reader.Register();
2017
2018 relogged_reader.event_loop_factory()->Run();
2019 }
2020}
2021
2022// Tests that we properly populate and extract the logger_start time by setting
2023// up a clock difference between 2 nodes and looking at the resulting parts.
2024TEST_P(MultinodeLoggerTest, LoggerStartTime) {
2025 std::vector<std::string> actual_filenames;
2026 time_converter_.AddMonotonic(
2027 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2028 {
2029 LoggerState pi1_logger = MakeLogger(pi1_);
2030 LoggerState pi2_logger = MakeLogger(pi2_);
2031
2032 StartLogger(&pi1_logger);
2033 StartLogger(&pi2_logger);
2034
2035 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2036
2037 pi1_logger.AppendAllFilenames(&actual_filenames);
2038 pi2_logger.AppendAllFilenames(&actual_filenames);
2039 }
2040
2041 ASSERT_THAT(actual_filenames,
2042 ::testing::UnorderedElementsAreArray(logfiles_));
2043
2044 for (const LogFile &log_file : SortParts(logfiles_)) {
2045 for (const LogParts &log_part : log_file.parts) {
2046 if (log_part.node == log_file.logger_node) {
2047 EXPECT_EQ(log_part.logger_monotonic_start_time,
2048 aos::monotonic_clock::min_time);
2049 EXPECT_EQ(log_part.logger_realtime_start_time,
2050 aos::realtime_clock::min_time);
2051 } else {
2052 const chrono::seconds offset = log_file.logger_node == "pi1"
2053 ? -chrono::seconds(1000)
2054 : chrono::seconds(1000);
2055 EXPECT_EQ(log_part.logger_monotonic_start_time,
2056 log_part.monotonic_start_time + offset);
2057 EXPECT_EQ(log_part.logger_realtime_start_time,
2058 log_file.realtime_start_time +
2059 (log_part.logger_monotonic_start_time -
2060 log_file.monotonic_start_time));
2061 }
2062 }
2063 }
2064}
2065
2066// Test that renaming the base, renames the folder.
2067TEST_P(MultinodeLoggerTest, LoggerRenameFolder) {
2068 util::UnlinkRecursive(tmp_dir_ + "/renamefolder");
2069 util::UnlinkRecursive(tmp_dir_ + "/new-good");
2070 time_converter_.AddMonotonic(
2071 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2072 logfile_base1_ = tmp_dir_ + "/renamefolder/multi_logfile1";
2073 logfile_base2_ = tmp_dir_ + "/renamefolder/multi_logfile2";
2074 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
2075 LoggerState pi1_logger = MakeLogger(pi1_);
2076 LoggerState pi2_logger = MakeLogger(pi2_);
2077
2078 StartLogger(&pi1_logger);
2079 StartLogger(&pi2_logger);
2080
2081 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2082 logfile_base1_ = tmp_dir_ + "/new-good/multi_logfile1";
2083 logfile_base2_ = tmp_dir_ + "/new-good/multi_logfile2";
2084 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
Alexei Strotscaf17d32023-04-03 22:31:11 -07002085
2086 // Sequence of set_base_name and Rotate simulates rename operation. Since
2087 // rename is not supported by all namers, RenameLogBase moved from logger to
2088 // the higher level abstraction, yet log_namers support rename, and it is
2089 // legal to test it here.
2090 pi1_logger.log_namer->set_base_name(logfile_base1_);
2091 pi1_logger.logger->Rotate();
2092 pi2_logger.log_namer->set_base_name(logfile_base2_);
2093 pi2_logger.logger->Rotate();
2094
Naman Guptaa63aa132023-03-22 20:06:34 -07002095 for (auto &file : logfiles_) {
2096 struct stat s;
2097 EXPECT_EQ(0, stat(file.c_str(), &s));
2098 }
2099}
2100
2101// Test that renaming the file base dies.
2102TEST_P(MultinodeLoggerDeathTest, LoggerRenameFile) {
2103 time_converter_.AddMonotonic(
2104 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2105 util::UnlinkRecursive(tmp_dir_ + "/renamefile");
2106 logfile_base1_ = tmp_dir_ + "/renamefile/multi_logfile1";
2107 logfile_base2_ = tmp_dir_ + "/renamefile/multi_logfile2";
2108 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
2109 LoggerState pi1_logger = MakeLogger(pi1_);
2110 StartLogger(&pi1_logger);
2111 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2112 logfile_base1_ = tmp_dir_ + "/new-renamefile/new_multi_logfile1";
Alexei Strotscaf17d32023-04-03 22:31:11 -07002113 EXPECT_DEATH({ pi1_logger.log_namer->set_base_name(logfile_base1_); },
Naman Guptaa63aa132023-03-22 20:06:34 -07002114 "Rename of file base from");
2115}
2116
2117// TODO(austin): We can write a test which recreates a logfile and confirms that
2118// we get it back. That is the ultimate test.
2119
2120// Tests that we properly recreate forwarded timestamps when replaying a log.
2121// This should be enough that we can then re-run the logger and get a valid log
2122// back.
2123TEST_P(MultinodeLoggerTest, RemoteReboot) {
2124 std::vector<std::string> actual_filenames;
2125
2126 const UUID pi1_boot0 = UUID::Random();
2127 const UUID pi2_boot0 = UUID::Random();
2128 const UUID pi2_boot1 = UUID::Random();
2129 {
2130 CHECK_EQ(pi1_index_, 0u);
2131 CHECK_EQ(pi2_index_, 1u);
2132
2133 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2134 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2135 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2136
2137 time_converter_.AddNextTimestamp(
2138 distributed_clock::epoch(),
2139 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2140 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2141 time_converter_.AddNextTimestamp(
2142 distributed_clock::epoch() + reboot_time,
2143 {BootTimestamp::epoch() + reboot_time,
2144 BootTimestamp{
2145 .boot = 1,
2146 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2147 }
2148
2149 {
2150 LoggerState pi1_logger = MakeLogger(pi1_);
2151
2152 event_loop_factory_.RunFor(chrono::milliseconds(95));
2153 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2154 pi1_boot0);
2155 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2156 pi2_boot0);
2157
2158 StartLogger(&pi1_logger);
2159
2160 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2161
2162 VLOG(1) << "Reboot now!";
2163
2164 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2165 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2166 pi1_boot0);
2167 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2168 pi2_boot1);
2169
2170 pi1_logger.AppendAllFilenames(&actual_filenames);
2171 }
2172
2173 std::sort(actual_filenames.begin(), actual_filenames.end());
2174 std::sort(pi1_reboot_logfiles_.begin(), pi1_reboot_logfiles_.end());
2175 ASSERT_THAT(actual_filenames,
2176 ::testing::UnorderedElementsAreArray(pi1_reboot_logfiles_));
2177
2178 // Confirm that our new oldest timestamps properly update as we reboot and
2179 // rotate.
2180 for (const std::string &file : pi1_reboot_logfiles_) {
2181 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2182 ReadHeader(file);
2183 CHECK(log_header);
2184 if (log_header->message().has_configuration()) {
2185 continue;
2186 }
2187
2188 const monotonic_clock::time_point monotonic_start_time =
2189 monotonic_clock::time_point(
2190 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2191 const UUID source_node_boot_uuid = UUID::FromString(
2192 log_header->message().source_node_boot_uuid()->string_view());
2193
2194 if (log_header->message().node()->name()->string_view() != "pi1") {
2195 // The remote message channel should rotate later and have more parts.
2196 // This only is true on the log files with shared remote messages.
2197 //
2198 // TODO(austin): I'm not the most thrilled with this test pattern... It
2199 // feels brittle in a different way.
2200 if (file.find("aos.message_bridge.RemoteMessage") == std::string::npos ||
2201 !shared()) {
2202 switch (log_header->message().parts_index()) {
2203 case 0:
2204 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2205 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2206 break;
2207 case 1:
2208 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2209 ASSERT_EQ(monotonic_start_time,
2210 monotonic_clock::epoch() + chrono::seconds(1));
2211 break;
2212 case 2:
2213 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2214 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2215 break;
2216 case 3:
2217 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2218 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
2219 chrono::nanoseconds(2322999462))
2220 << " on " << file;
2221 break;
2222 default:
2223 FAIL();
2224 break;
2225 }
2226 } else {
2227 switch (log_header->message().parts_index()) {
2228 case 0:
2229 case 1:
2230 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2231 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2232 break;
2233 case 2:
2234 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2235 ASSERT_EQ(monotonic_start_time,
2236 monotonic_clock::epoch() + chrono::seconds(1));
2237 break;
2238 case 3:
2239 case 4:
2240 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2241 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2242 break;
2243 case 5:
2244 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2245 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
2246 chrono::nanoseconds(2322999462))
2247 << " on " << file;
2248 break;
2249 default:
2250 FAIL();
2251 break;
2252 }
2253 }
2254 continue;
2255 }
2256 SCOPED_TRACE(file);
2257 SCOPED_TRACE(aos::FlatbufferToJson(
2258 *log_header, {.multi_line = true, .max_vector_size = 100}));
2259 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2260 ASSERT_EQ(
2261 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2262 EXPECT_EQ(
2263 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2264 monotonic_clock::max_time.time_since_epoch().count());
2265 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2266 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2267 2u);
2268 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2269 monotonic_clock::max_time.time_since_epoch().count());
2270 ASSERT_TRUE(log_header->message()
2271 .has_oldest_remote_unreliable_monotonic_timestamps());
2272 ASSERT_EQ(log_header->message()
2273 .oldest_remote_unreliable_monotonic_timestamps()
2274 ->size(),
2275 2u);
2276 EXPECT_EQ(log_header->message()
2277 .oldest_remote_unreliable_monotonic_timestamps()
2278 ->Get(0),
2279 monotonic_clock::max_time.time_since_epoch().count());
2280 ASSERT_TRUE(log_header->message()
2281 .has_oldest_local_unreliable_monotonic_timestamps());
2282 ASSERT_EQ(log_header->message()
2283 .oldest_local_unreliable_monotonic_timestamps()
2284 ->size(),
2285 2u);
2286 EXPECT_EQ(log_header->message()
2287 .oldest_local_unreliable_monotonic_timestamps()
2288 ->Get(0),
2289 monotonic_clock::max_time.time_since_epoch().count());
2290
2291 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2292 monotonic_clock::time_point(chrono::nanoseconds(
2293 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2294 1)));
2295 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2296 monotonic_clock::time_point(chrono::nanoseconds(
2297 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2298 const monotonic_clock::time_point
2299 oldest_remote_unreliable_monotonic_timestamps =
2300 monotonic_clock::time_point(chrono::nanoseconds(
2301 log_header->message()
2302 .oldest_remote_unreliable_monotonic_timestamps()
2303 ->Get(1)));
2304 const monotonic_clock::time_point
2305 oldest_local_unreliable_monotonic_timestamps =
2306 monotonic_clock::time_point(chrono::nanoseconds(
2307 log_header->message()
2308 .oldest_local_unreliable_monotonic_timestamps()
2309 ->Get(1)));
2310 const monotonic_clock::time_point
2311 oldest_remote_reliable_monotonic_timestamps =
2312 monotonic_clock::time_point(chrono::nanoseconds(
2313 log_header->message()
2314 .oldest_remote_reliable_monotonic_timestamps()
2315 ->Get(1)));
2316 const monotonic_clock::time_point
2317 oldest_local_reliable_monotonic_timestamps =
2318 monotonic_clock::time_point(chrono::nanoseconds(
2319 log_header->message()
2320 .oldest_local_reliable_monotonic_timestamps()
2321 ->Get(1)));
2322 const monotonic_clock::time_point
2323 oldest_logger_remote_unreliable_monotonic_timestamps =
2324 monotonic_clock::time_point(chrono::nanoseconds(
2325 log_header->message()
2326 .oldest_logger_remote_unreliable_monotonic_timestamps()
2327 ->Get(0)));
2328 const monotonic_clock::time_point
2329 oldest_logger_local_unreliable_monotonic_timestamps =
2330 monotonic_clock::time_point(chrono::nanoseconds(
2331 log_header->message()
2332 .oldest_logger_local_unreliable_monotonic_timestamps()
2333 ->Get(0)));
2334 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2335 monotonic_clock::max_time);
2336 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2337 monotonic_clock::max_time);
2338 switch (log_header->message().parts_index()) {
2339 case 0:
2340 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2341 monotonic_clock::max_time);
2342 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2343 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2344 monotonic_clock::max_time);
2345 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2346 monotonic_clock::max_time);
2347 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2348 monotonic_clock::max_time);
2349 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2350 monotonic_clock::max_time);
2351 break;
2352 case 1:
2353 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2354 monotonic_clock::time_point(chrono::microseconds(90200)));
2355 EXPECT_EQ(oldest_local_monotonic_timestamps,
2356 monotonic_clock::time_point(chrono::microseconds(90350)));
2357 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2358 monotonic_clock::time_point(chrono::microseconds(90200)));
2359 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2360 monotonic_clock::time_point(chrono::microseconds(90350)));
2361 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2362 monotonic_clock::max_time);
2363 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2364 monotonic_clock::max_time);
2365 break;
2366 case 2:
2367 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2368 monotonic_clock::time_point(chrono::microseconds(90200)))
2369 << file;
2370 EXPECT_EQ(oldest_local_monotonic_timestamps,
2371 monotonic_clock::time_point(chrono::microseconds(90350)))
2372 << file;
2373 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2374 monotonic_clock::time_point(chrono::microseconds(90200)))
2375 << file;
2376 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2377 monotonic_clock::time_point(chrono::microseconds(90350)))
2378 << file;
2379 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2380 monotonic_clock::time_point(chrono::microseconds(100000)))
2381 << file;
2382 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2383 monotonic_clock::time_point(chrono::microseconds(100150)))
2384 << file;
2385 break;
2386 case 3:
2387 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2388 monotonic_clock::time_point(chrono::milliseconds(1323) +
2389 chrono::microseconds(200)));
2390 EXPECT_EQ(oldest_local_monotonic_timestamps,
2391 monotonic_clock::time_point(chrono::microseconds(10100350)));
2392 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2393 monotonic_clock::time_point(chrono::milliseconds(1323) +
2394 chrono::microseconds(200)));
2395 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2396 monotonic_clock::time_point(chrono::microseconds(10100350)));
2397 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2398 monotonic_clock::max_time)
2399 << file;
2400 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2401 monotonic_clock::max_time)
2402 << file;
2403 break;
2404 case 4:
2405 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2406 monotonic_clock::time_point(chrono::milliseconds(1323) +
2407 chrono::microseconds(200)));
2408 EXPECT_EQ(oldest_local_monotonic_timestamps,
2409 monotonic_clock::time_point(chrono::microseconds(10100350)));
2410 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2411 monotonic_clock::time_point(chrono::milliseconds(1323) +
2412 chrono::microseconds(200)));
2413 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2414 monotonic_clock::time_point(chrono::microseconds(10100350)));
2415 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2416 monotonic_clock::time_point(chrono::microseconds(1423000)))
2417 << file;
2418 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2419 monotonic_clock::time_point(chrono::microseconds(10200150)))
2420 << file;
2421 break;
2422 default:
2423 FAIL();
2424 break;
2425 }
2426 }
2427
2428 // Confirm that we refuse to replay logs with missing boot uuids.
2429 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07002430 auto sorted_parts = SortParts(pi1_reboot_logfiles_);
2431 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
2432 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07002433
2434 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2435 log_reader_factory.set_send_delay(chrono::microseconds(0));
2436
2437 // This sends out the fetched messages and advances time to the start of
2438 // the log file.
2439 reader.Register(&log_reader_factory);
2440
2441 log_reader_factory.Run();
2442
2443 reader.Deregister();
2444 }
2445}
2446
2447// Tests that we can sort a log which only has timestamps from the remote
2448// because the local message_bridge_client failed to connect.
2449TEST_P(MultinodeLoggerTest, RemoteRebootOnlyTimestamps) {
2450 const UUID pi1_boot0 = UUID::Random();
2451 const UUID pi2_boot0 = UUID::Random();
2452 const UUID pi2_boot1 = UUID::Random();
2453 {
2454 CHECK_EQ(pi1_index_, 0u);
2455 CHECK_EQ(pi2_index_, 1u);
2456
2457 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2458 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2459 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2460
2461 time_converter_.AddNextTimestamp(
2462 distributed_clock::epoch(),
2463 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2464 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2465 time_converter_.AddNextTimestamp(
2466 distributed_clock::epoch() + reboot_time,
2467 {BootTimestamp::epoch() + reboot_time,
2468 BootTimestamp{
2469 .boot = 1,
2470 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2471 }
2472 pi2_->Disconnect(pi1_->node());
2473
2474 std::vector<std::string> filenames;
2475 {
2476 LoggerState pi1_logger = MakeLogger(pi1_);
2477
2478 event_loop_factory_.RunFor(chrono::milliseconds(95));
2479 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2480 pi1_boot0);
2481 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2482 pi2_boot0);
2483
2484 StartLogger(&pi1_logger);
2485
2486 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2487
2488 VLOG(1) << "Reboot now!";
2489
2490 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2491 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2492 pi1_boot0);
2493 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2494 pi2_boot1);
2495 pi1_logger.AppendAllFilenames(&filenames);
2496 }
2497
2498 std::sort(filenames.begin(), filenames.end());
2499
2500 // Confirm that our new oldest timestamps properly update as we reboot and
2501 // rotate.
2502 size_t timestamp_file_count = 0;
2503 for (const std::string &file : filenames) {
2504 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2505 ReadHeader(file);
2506 CHECK(log_header);
2507
2508 if (log_header->message().has_configuration()) {
2509 continue;
2510 }
2511
2512 const monotonic_clock::time_point monotonic_start_time =
2513 monotonic_clock::time_point(
2514 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2515 const UUID source_node_boot_uuid = UUID::FromString(
2516 log_header->message().source_node_boot_uuid()->string_view());
2517
2518 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2519 ASSERT_EQ(
2520 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2521 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2522 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2523 2u);
2524 ASSERT_TRUE(log_header->message()
2525 .has_oldest_remote_unreliable_monotonic_timestamps());
2526 ASSERT_EQ(log_header->message()
2527 .oldest_remote_unreliable_monotonic_timestamps()
2528 ->size(),
2529 2u);
2530 ASSERT_TRUE(log_header->message()
2531 .has_oldest_local_unreliable_monotonic_timestamps());
2532 ASSERT_EQ(log_header->message()
2533 .oldest_local_unreliable_monotonic_timestamps()
2534 ->size(),
2535 2u);
2536 ASSERT_TRUE(log_header->message()
2537 .has_oldest_remote_reliable_monotonic_timestamps());
2538 ASSERT_EQ(log_header->message()
2539 .oldest_remote_reliable_monotonic_timestamps()
2540 ->size(),
2541 2u);
2542 ASSERT_TRUE(
2543 log_header->message().has_oldest_local_reliable_monotonic_timestamps());
2544 ASSERT_EQ(log_header->message()
2545 .oldest_local_reliable_monotonic_timestamps()
2546 ->size(),
2547 2u);
2548
2549 ASSERT_TRUE(
2550 log_header->message()
2551 .has_oldest_logger_remote_unreliable_monotonic_timestamps());
2552 ASSERT_EQ(log_header->message()
2553 .oldest_logger_remote_unreliable_monotonic_timestamps()
2554 ->size(),
2555 2u);
2556 ASSERT_TRUE(log_header->message()
2557 .has_oldest_logger_local_unreliable_monotonic_timestamps());
2558 ASSERT_EQ(log_header->message()
2559 .oldest_logger_local_unreliable_monotonic_timestamps()
2560 ->size(),
2561 2u);
2562
2563 if (log_header->message().node()->name()->string_view() != "pi1") {
2564 ASSERT_TRUE(file.find("aos.message_bridge.RemoteMessage") !=
2565 std::string::npos);
2566
2567 const std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
2568 ReadNthMessage(file, 0);
2569 CHECK(msg);
2570
2571 EXPECT_TRUE(msg->message().has_monotonic_sent_time());
2572 EXPECT_TRUE(msg->message().has_monotonic_remote_time());
2573
2574 const monotonic_clock::time_point
2575 expected_oldest_local_monotonic_timestamps(
2576 chrono::nanoseconds(msg->message().monotonic_sent_time()));
2577 const monotonic_clock::time_point
2578 expected_oldest_remote_monotonic_timestamps(
2579 chrono::nanoseconds(msg->message().monotonic_remote_time()));
2580 const monotonic_clock::time_point
2581 expected_oldest_timestamp_monotonic_timestamps(
2582 chrono::nanoseconds(msg->message().monotonic_timestamp_time()));
2583
2584 EXPECT_NE(expected_oldest_local_monotonic_timestamps,
2585 monotonic_clock::min_time);
2586 EXPECT_NE(expected_oldest_remote_monotonic_timestamps,
2587 monotonic_clock::min_time);
2588 EXPECT_NE(expected_oldest_timestamp_monotonic_timestamps,
2589 monotonic_clock::min_time);
2590
2591 ++timestamp_file_count;
2592 // Since the log file is from the perspective of the other node,
2593 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2594 monotonic_clock::time_point(chrono::nanoseconds(
2595 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2596 0)));
2597 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2598 monotonic_clock::time_point(chrono::nanoseconds(
2599 log_header->message().oldest_local_monotonic_timestamps()->Get(
2600 0)));
2601 const monotonic_clock::time_point
2602 oldest_remote_unreliable_monotonic_timestamps =
2603 monotonic_clock::time_point(chrono::nanoseconds(
2604 log_header->message()
2605 .oldest_remote_unreliable_monotonic_timestamps()
2606 ->Get(0)));
2607 const monotonic_clock::time_point
2608 oldest_local_unreliable_monotonic_timestamps =
2609 monotonic_clock::time_point(chrono::nanoseconds(
2610 log_header->message()
2611 .oldest_local_unreliable_monotonic_timestamps()
2612 ->Get(0)));
2613 const monotonic_clock::time_point
2614 oldest_remote_reliable_monotonic_timestamps =
2615 monotonic_clock::time_point(chrono::nanoseconds(
2616 log_header->message()
2617 .oldest_remote_reliable_monotonic_timestamps()
2618 ->Get(0)));
2619 const monotonic_clock::time_point
2620 oldest_local_reliable_monotonic_timestamps =
2621 monotonic_clock::time_point(chrono::nanoseconds(
2622 log_header->message()
2623 .oldest_local_reliable_monotonic_timestamps()
2624 ->Get(0)));
2625 const monotonic_clock::time_point
2626 oldest_logger_remote_unreliable_monotonic_timestamps =
2627 monotonic_clock::time_point(chrono::nanoseconds(
2628 log_header->message()
2629 .oldest_logger_remote_unreliable_monotonic_timestamps()
2630 ->Get(1)));
2631 const monotonic_clock::time_point
2632 oldest_logger_local_unreliable_monotonic_timestamps =
2633 monotonic_clock::time_point(chrono::nanoseconds(
2634 log_header->message()
2635 .oldest_logger_local_unreliable_monotonic_timestamps()
2636 ->Get(1)));
2637
2638 const Channel *channel =
2639 event_loop_factory_.configuration()->channels()->Get(
2640 msg->message().channel_index());
2641 const Connection *connection = configuration::ConnectionToNode(
2642 channel, configuration::GetNode(
2643 event_loop_factory_.configuration(),
2644 log_header->message().node()->name()->string_view()));
2645
2646 const bool reliable = connection->time_to_live() == 0;
2647
2648 SCOPED_TRACE(file);
2649 SCOPED_TRACE(aos::FlatbufferToJson(
2650 *log_header, {.multi_line = true, .max_vector_size = 100}));
2651
2652 if (shared()) {
2653 // Confirm that the oldest timestamps match what we expect. Based on
2654 // what we are doing, we know that the oldest time is the first
2655 // message's time.
2656 //
2657 // This makes the test robust to both the split and combined config
2658 // tests.
2659 switch (log_header->message().parts_index()) {
2660 case 0:
2661 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2662 expected_oldest_remote_monotonic_timestamps);
2663 EXPECT_EQ(oldest_local_monotonic_timestamps,
2664 expected_oldest_local_monotonic_timestamps);
2665 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2666 expected_oldest_local_monotonic_timestamps)
2667 << file;
2668 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2669 expected_oldest_timestamp_monotonic_timestamps)
2670 << file;
2671
2672 if (reliable) {
2673 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2674 expected_oldest_remote_monotonic_timestamps);
2675 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2676 expected_oldest_local_monotonic_timestamps);
2677 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2678 monotonic_clock::max_time);
2679 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2680 monotonic_clock::max_time);
2681 } else {
2682 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2683 monotonic_clock::max_time);
2684 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2685 monotonic_clock::max_time);
2686 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2687 expected_oldest_remote_monotonic_timestamps);
2688 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2689 expected_oldest_local_monotonic_timestamps);
2690 }
2691 break;
2692 case 1:
2693 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2694 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2695 EXPECT_EQ(oldest_local_monotonic_timestamps,
2696 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2697 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2698 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2699 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2700 monotonic_clock::epoch() + chrono::nanoseconds(90250000));
2701 if (reliable) {
2702 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2703 expected_oldest_remote_monotonic_timestamps);
2704 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2705 expected_oldest_local_monotonic_timestamps);
2706 EXPECT_EQ(
2707 oldest_remote_unreliable_monotonic_timestamps,
2708 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2709 EXPECT_EQ(
2710 oldest_local_unreliable_monotonic_timestamps,
2711 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2712 } else {
2713 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2714 monotonic_clock::max_time);
2715 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2716 monotonic_clock::max_time);
2717 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2718 expected_oldest_remote_monotonic_timestamps);
2719 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2720 expected_oldest_local_monotonic_timestamps);
2721 }
2722 break;
2723 case 2:
2724 EXPECT_EQ(
2725 oldest_remote_monotonic_timestamps,
2726 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2727 EXPECT_EQ(
2728 oldest_local_monotonic_timestamps,
2729 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2730 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2731 expected_oldest_local_monotonic_timestamps)
2732 << file;
2733 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2734 expected_oldest_timestamp_monotonic_timestamps)
2735 << file;
2736 if (reliable) {
2737 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2738 expected_oldest_remote_monotonic_timestamps);
2739 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2740 expected_oldest_local_monotonic_timestamps);
2741 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2742 monotonic_clock::max_time);
2743 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2744 monotonic_clock::max_time);
2745 } else {
2746 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2747 monotonic_clock::max_time);
2748 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2749 monotonic_clock::max_time);
2750 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2751 expected_oldest_remote_monotonic_timestamps);
2752 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2753 expected_oldest_local_monotonic_timestamps);
2754 }
2755 break;
2756
2757 case 3:
2758 EXPECT_EQ(
2759 oldest_remote_monotonic_timestamps,
2760 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2761 EXPECT_EQ(
2762 oldest_local_monotonic_timestamps,
2763 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2764 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2765 expected_oldest_remote_monotonic_timestamps);
2766 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2767 expected_oldest_local_monotonic_timestamps);
2768 EXPECT_EQ(
2769 oldest_logger_remote_unreliable_monotonic_timestamps,
2770 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2771 EXPECT_EQ(
2772 oldest_logger_local_unreliable_monotonic_timestamps,
2773 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2774 break;
2775 default:
2776 FAIL();
2777 break;
2778 }
2779
2780 switch (log_header->message().parts_index()) {
2781 case 0:
2782 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2783 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2784 break;
2785 case 1:
2786 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2787 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2788 break;
2789 case 2:
2790 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2791 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2792 break;
2793 case 3:
2794 if (shared()) {
2795 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2796 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2797 break;
2798 }
2799 [[fallthrough]];
2800 default:
2801 FAIL();
2802 break;
2803 }
2804 } else {
2805 switch (log_header->message().parts_index()) {
2806 case 0:
2807 if (reliable) {
2808 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2809 monotonic_clock::max_time);
2810 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2811 monotonic_clock::max_time);
2812 EXPECT_EQ(
2813 oldest_logger_remote_unreliable_monotonic_timestamps,
2814 monotonic_clock::epoch() + chrono::nanoseconds(100150000))
2815 << file;
2816 EXPECT_EQ(
2817 oldest_logger_local_unreliable_monotonic_timestamps,
2818 monotonic_clock::epoch() + chrono::nanoseconds(100250000))
2819 << file;
2820 } else {
2821 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2822 expected_oldest_remote_monotonic_timestamps);
2823 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2824 expected_oldest_local_monotonic_timestamps);
2825 EXPECT_EQ(
2826 oldest_logger_remote_unreliable_monotonic_timestamps,
2827 monotonic_clock::epoch() + chrono::nanoseconds(90150000))
2828 << file;
2829 EXPECT_EQ(
2830 oldest_logger_local_unreliable_monotonic_timestamps,
2831 monotonic_clock::epoch() + chrono::nanoseconds(90250000))
2832 << file;
2833 }
2834 break;
2835 case 1:
2836 if (reliable) {
2837 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2838 monotonic_clock::max_time);
2839 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2840 monotonic_clock::max_time);
2841 EXPECT_EQ(
2842 oldest_logger_remote_unreliable_monotonic_timestamps,
2843 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2844 EXPECT_EQ(
2845 oldest_logger_local_unreliable_monotonic_timestamps,
2846 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2847 } else {
2848 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2849 expected_oldest_remote_monotonic_timestamps);
2850 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2851 expected_oldest_local_monotonic_timestamps);
2852 EXPECT_EQ(
2853 oldest_logger_remote_unreliable_monotonic_timestamps,
2854 monotonic_clock::epoch() + chrono::nanoseconds(1323150000));
2855 EXPECT_EQ(
2856 oldest_logger_local_unreliable_monotonic_timestamps,
2857 monotonic_clock::epoch() + chrono::nanoseconds(10100250000));
2858 }
2859 break;
2860 default:
2861 FAIL();
2862 break;
2863 }
2864
2865 switch (log_header->message().parts_index()) {
2866 case 0:
2867 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2868 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2869 break;
2870 case 1:
2871 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2872 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2873 break;
2874 default:
2875 FAIL();
2876 break;
2877 }
2878 }
2879
2880 continue;
2881 }
2882 EXPECT_EQ(
2883 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2884 monotonic_clock::max_time.time_since_epoch().count());
2885 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2886 monotonic_clock::max_time.time_since_epoch().count());
2887 EXPECT_EQ(log_header->message()
2888 .oldest_remote_unreliable_monotonic_timestamps()
2889 ->Get(0),
2890 monotonic_clock::max_time.time_since_epoch().count());
2891 EXPECT_EQ(log_header->message()
2892 .oldest_local_unreliable_monotonic_timestamps()
2893 ->Get(0),
2894 monotonic_clock::max_time.time_since_epoch().count());
2895
2896 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2897 monotonic_clock::time_point(chrono::nanoseconds(
2898 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2899 1)));
2900 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2901 monotonic_clock::time_point(chrono::nanoseconds(
2902 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2903 const monotonic_clock::time_point
2904 oldest_remote_unreliable_monotonic_timestamps =
2905 monotonic_clock::time_point(chrono::nanoseconds(
2906 log_header->message()
2907 .oldest_remote_unreliable_monotonic_timestamps()
2908 ->Get(1)));
2909 const monotonic_clock::time_point
2910 oldest_local_unreliable_monotonic_timestamps =
2911 monotonic_clock::time_point(chrono::nanoseconds(
2912 log_header->message()
2913 .oldest_local_unreliable_monotonic_timestamps()
2914 ->Get(1)));
2915 switch (log_header->message().parts_index()) {
2916 case 0:
2917 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2918 monotonic_clock::max_time);
2919 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2920 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2921 monotonic_clock::max_time);
2922 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2923 monotonic_clock::max_time);
2924 break;
2925 default:
2926 FAIL();
2927 break;
2928 }
2929 }
2930
2931 if (shared()) {
2932 EXPECT_EQ(timestamp_file_count, 4u);
2933 } else {
2934 EXPECT_EQ(timestamp_file_count, 4u);
2935 }
2936
2937 // Confirm that we can actually sort the resulting log and read it.
2938 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07002939 auto sorted_parts = SortParts(filenames);
2940 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
2941 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07002942
2943 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2944 log_reader_factory.set_send_delay(chrono::microseconds(0));
2945
2946 // This sends out the fetched messages and advances time to the start of
2947 // the log file.
2948 reader.Register(&log_reader_factory);
2949
2950 log_reader_factory.Run();
2951
2952 reader.Deregister();
2953 }
2954}
2955
2956// Tests that we properly handle one direction of message_bridge being
2957// unavailable.
2958TEST_P(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
2959 pi1_->Disconnect(pi2_->node());
2960 time_converter_.AddMonotonic(
2961 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2962
2963 time_converter_.AddMonotonic(
2964 {chrono::milliseconds(10000),
2965 chrono::milliseconds(10000) - chrono::milliseconds(1)});
2966 {
2967 LoggerState pi1_logger = MakeLogger(pi1_);
2968
2969 event_loop_factory_.RunFor(chrono::milliseconds(95));
2970
2971 StartLogger(&pi1_logger);
2972
2973 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2974 }
2975
2976 // Confirm that we can parse the result. LogReader has enough internal CHECKs
2977 // to confirm the right thing happened.
2978 ConfirmReadable(pi1_single_direction_logfiles_);
2979}
2980
2981// Tests that we properly handle one direction of message_bridge being
2982// unavailable.
2983TEST_P(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
2984 pi1_->Disconnect(pi2_->node());
2985 time_converter_.AddMonotonic(
2986 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(500)});
2987
2988 time_converter_.AddMonotonic(
2989 {chrono::milliseconds(10000),
2990 chrono::milliseconds(10000) + chrono::milliseconds(1)});
2991 {
2992 LoggerState pi1_logger = MakeLogger(pi1_);
2993
2994 event_loop_factory_.RunFor(chrono::milliseconds(95));
2995
2996 StartLogger(&pi1_logger);
2997
2998 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2999 }
3000
3001 // Confirm that we can parse the result. LogReader has enough internal CHECKs
3002 // to confirm the right thing happened.
3003 ConfirmReadable(pi1_single_direction_logfiles_);
3004}
3005
3006// Tests that we explode if someone passes in a part file twice with a better
3007// error than an out of order error.
3008TEST_P(MultinodeLoggerTest, DuplicateLogFiles) {
3009 time_converter_.AddMonotonic(
3010 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
3011 {
3012 LoggerState pi1_logger = MakeLogger(pi1_);
3013
3014 event_loop_factory_.RunFor(chrono::milliseconds(95));
3015
3016 StartLogger(&pi1_logger);
3017
3018 event_loop_factory_.RunFor(chrono::milliseconds(10000));
3019 }
3020
3021 std::vector<std::string> duplicates;
3022 for (const std::string &f : pi1_single_direction_logfiles_) {
3023 duplicates.emplace_back(f);
3024 duplicates.emplace_back(f);
3025 }
3026 EXPECT_DEATH({ SortParts(duplicates); }, "Found duplicate parts in");
3027}
3028
3029// Tests that we explode if someone loses a part out of the middle of a log.
3030TEST_P(MultinodeLoggerTest, MissingPartsFromMiddle) {
3031 time_converter_.AddMonotonic(
3032 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
3033 {
3034 LoggerState pi1_logger = MakeLogger(pi1_);
3035
3036 event_loop_factory_.RunFor(chrono::milliseconds(95));
3037
3038 StartLogger(&pi1_logger);
3039 aos::monotonic_clock::time_point last_rotation_time =
3040 pi1_logger.event_loop->monotonic_now();
Austin Schuh2f864452023-07-17 14:53:08 -07003041 pi1_logger.logger->set_on_logged_period(
3042 [&](aos::monotonic_clock::time_point) {
3043 const auto now = pi1_logger.event_loop->monotonic_now();
3044 if (now > last_rotation_time + std::chrono::seconds(5)) {
3045 pi1_logger.logger->Rotate();
3046 last_rotation_time = now;
3047 }
3048 });
Naman Guptaa63aa132023-03-22 20:06:34 -07003049
3050 event_loop_factory_.RunFor(chrono::milliseconds(10000));
3051 }
3052
3053 std::vector<std::string> missing_parts;
3054
3055 missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part0" + Extension());
3056 missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part2" + Extension());
3057 missing_parts.emplace_back(absl::StrCat(
3058 logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
3059
3060 EXPECT_DEATH({ SortParts(missing_parts); },
3061 "Broken log, missing part files between");
3062}
3063
3064// Tests that we properly handle a dead node. Do this by just disconnecting it
3065// and only using one nodes of logs.
3066TEST_P(MultinodeLoggerTest, DeadNode) {
3067 pi1_->Disconnect(pi2_->node());
3068 pi2_->Disconnect(pi1_->node());
3069 time_converter_.AddMonotonic(
3070 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
3071 {
3072 LoggerState pi1_logger = MakeLogger(pi1_);
3073
3074 event_loop_factory_.RunFor(chrono::milliseconds(95));
3075
3076 StartLogger(&pi1_logger);
3077
3078 event_loop_factory_.RunFor(chrono::milliseconds(10000));
3079 }
3080
3081 // Confirm that we can parse the result. LogReader has enough internal CHECKs
3082 // to confirm the right thing happened.
3083 ConfirmReadable(MakePi1DeadNodeLogfiles());
3084}
3085
3086// Tests that we can relog with a different config. This makes most sense when
3087// you are trying to edit a log and want to use channel renaming + the original
3088// config in the new log.
3089TEST_P(MultinodeLoggerTest, LogDifferentConfig) {
3090 time_converter_.StartEqual();
3091 {
3092 LoggerState pi1_logger = MakeLogger(pi1_);
3093 LoggerState pi2_logger = MakeLogger(pi2_);
3094
3095 event_loop_factory_.RunFor(chrono::milliseconds(95));
3096
3097 StartLogger(&pi1_logger);
3098 StartLogger(&pi2_logger);
3099
3100 event_loop_factory_.RunFor(chrono::milliseconds(20000));
3101 }
3102
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003103 auto sorted_parts = SortParts(logfiles_);
3104 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3105 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07003106 reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
3107
3108 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
3109 log_reader_factory.set_send_delay(chrono::microseconds(0));
3110
3111 // This sends out the fetched messages and advances time to the start of the
3112 // log file.
3113 reader.Register(&log_reader_factory);
3114
3115 const Node *pi1 =
3116 configuration::GetNode(log_reader_factory.configuration(), "pi1");
3117 const Node *pi2 =
3118 configuration::GetNode(log_reader_factory.configuration(), "pi2");
3119
3120 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
3121 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
3122 LOG(INFO) << "now pi1 "
3123 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
3124 LOG(INFO) << "now pi2 "
3125 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
3126
3127 EXPECT_THAT(reader.LoggedNodes(),
3128 ::testing::ElementsAre(
3129 configuration::GetNode(reader.logged_configuration(), pi1),
3130 configuration::GetNode(reader.logged_configuration(), pi2)));
3131
3132 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
3133
3134 // And confirm we can re-create a log again, while checking the contents.
3135 std::vector<std::string> log_files;
3136 {
3137 LoggerState pi1_logger =
3138 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
3139 &log_reader_factory, reader.logged_configuration());
3140 LoggerState pi2_logger =
3141 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
3142 &log_reader_factory, reader.logged_configuration());
3143
3144 pi1_logger.StartLogger(tmp_dir_ + "/relogged1");
3145 pi2_logger.StartLogger(tmp_dir_ + "/relogged2");
3146
3147 log_reader_factory.Run();
3148
3149 for (auto &x : pi1_logger.log_namer->all_filenames()) {
3150 log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged1_", x));
3151 }
3152 for (auto &x : pi2_logger.log_namer->all_filenames()) {
3153 log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged2_", x));
3154 }
3155 }
3156
3157 reader.Deregister();
3158
3159 // And verify that we can run the LogReader over the relogged files without
3160 // hitting any fatal errors.
3161 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003162 auto sorted_parts = SortParts(log_files);
3163 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3164 LogReader relogged_reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07003165 relogged_reader.Register();
3166
3167 relogged_reader.event_loop_factory()->Run();
3168 }
3169}
3170
3171// Tests that we properly replay a log where the start time for a node is before
3172// any data on the node. This can happen if the logger starts before data is
3173// published. While the scenario below is a bit convoluted, we have seen logs
3174// like this generated out in the wild.
3175TEST(MultinodeRebootLoggerTest, StartTimeBeforeData) {
3176 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3177 aos::configuration::ReadConfig(ArtifactPath(
3178 "aos/events/logging/multinode_pingpong_split3_config.json"));
3179 message_bridge::TestingTimeConverter time_converter(
3180 configuration::NodesCount(&config.message()));
3181 SimulatedEventLoopFactory event_loop_factory(&config.message());
3182 event_loop_factory.SetTimeConverter(&time_converter);
3183 NodeEventLoopFactory *const pi1 =
3184 event_loop_factory.GetNodeEventLoopFactory("pi1");
3185 const size_t pi1_index = configuration::GetNodeIndex(
3186 event_loop_factory.configuration(), pi1->node());
3187 NodeEventLoopFactory *const pi2 =
3188 event_loop_factory.GetNodeEventLoopFactory("pi2");
3189 const size_t pi2_index = configuration::GetNodeIndex(
3190 event_loop_factory.configuration(), pi2->node());
3191 NodeEventLoopFactory *const pi3 =
3192 event_loop_factory.GetNodeEventLoopFactory("pi3");
3193 const size_t pi3_index = configuration::GetNodeIndex(
3194 event_loop_factory.configuration(), pi3->node());
3195
3196 const std::string kLogfile1_1 =
3197 aos::testing::TestTmpDir() + "/multi_logfile1/";
3198 const std::string kLogfile2_1 =
3199 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3200 const std::string kLogfile2_2 =
3201 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
3202 const std::string kLogfile3_1 =
3203 aos::testing::TestTmpDir() + "/multi_logfile3/";
3204 util::UnlinkRecursive(kLogfile1_1);
3205 util::UnlinkRecursive(kLogfile2_1);
3206 util::UnlinkRecursive(kLogfile2_2);
3207 util::UnlinkRecursive(kLogfile3_1);
3208 const UUID pi1_boot0 = UUID::Random();
3209 const UUID pi2_boot0 = UUID::Random();
3210 const UUID pi2_boot1 = UUID::Random();
3211 const UUID pi3_boot0 = UUID::Random();
3212 {
3213 CHECK_EQ(pi1_index, 0u);
3214 CHECK_EQ(pi2_index, 1u);
3215 CHECK_EQ(pi3_index, 2u);
3216
3217 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3218 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3219 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3220 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3221
3222 time_converter.AddNextTimestamp(
3223 distributed_clock::epoch(),
3224 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3225 BootTimestamp::epoch()});
3226 const chrono::nanoseconds reboot_time = chrono::milliseconds(20000);
3227 time_converter.AddNextTimestamp(
3228 distributed_clock::epoch() + reboot_time,
3229 {BootTimestamp::epoch() + reboot_time,
3230 BootTimestamp{
3231 .boot = 1,
3232 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)},
3233 BootTimestamp::epoch() + reboot_time});
3234 }
3235
3236 // Make everything perfectly quiet.
3237 event_loop_factory.SkipTimingReport();
3238 event_loop_factory.DisableStatistics();
3239
3240 std::vector<std::string> filenames;
3241 {
3242 LoggerState pi1_logger = MakeLoggerState(
3243 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3244 LoggerState pi3_logger = MakeLoggerState(
3245 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3246 {
3247 // And now start the logger.
3248 LoggerState pi2_logger = MakeLoggerState(
3249 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3250
3251 event_loop_factory.RunFor(chrono::milliseconds(1000));
3252
3253 pi1_logger.StartLogger(kLogfile1_1);
3254 pi3_logger.StartLogger(kLogfile3_1);
3255 pi2_logger.StartLogger(kLogfile2_1);
3256
3257 event_loop_factory.RunFor(chrono::milliseconds(10000));
3258
3259 // Now that we've got a start time in the past, turn on data.
3260 event_loop_factory.EnableStatistics();
3261 std::unique_ptr<aos::EventLoop> ping_event_loop =
3262 pi1->MakeEventLoop("ping");
3263 Ping ping(ping_event_loop.get());
3264
3265 pi2->AlwaysStart<Pong>("pong");
3266
3267 event_loop_factory.RunFor(chrono::milliseconds(3000));
3268
3269 pi2_logger.AppendAllFilenames(&filenames);
3270
3271 // Stop logging on pi2 before rebooting and completely shut off all
3272 // messages on pi2.
3273 pi2->DisableStatistics();
3274 pi1->Disconnect(pi2->node());
3275 pi2->Disconnect(pi1->node());
3276 }
3277 event_loop_factory.RunFor(chrono::milliseconds(7000));
3278 // pi2 now reboots.
3279 {
3280 event_loop_factory.RunFor(chrono::milliseconds(1000));
3281
3282 // Start logging again on pi2 after it is up.
3283 LoggerState pi2_logger = MakeLoggerState(
3284 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3285 pi2_logger.StartLogger(kLogfile2_2);
3286
3287 event_loop_factory.RunFor(chrono::milliseconds(10000));
3288 // And, now that we have a start time in the log, turn data back on.
3289 pi2->EnableStatistics();
3290 pi1->Connect(pi2->node());
3291 pi2->Connect(pi1->node());
3292
3293 pi2->AlwaysStart<Pong>("pong");
3294 std::unique_ptr<aos::EventLoop> ping_event_loop =
3295 pi1->MakeEventLoop("ping");
3296 Ping ping(ping_event_loop.get());
3297
3298 event_loop_factory.RunFor(chrono::milliseconds(3000));
3299
3300 pi2_logger.AppendAllFilenames(&filenames);
3301 }
3302
3303 pi1_logger.AppendAllFilenames(&filenames);
3304 pi3_logger.AppendAllFilenames(&filenames);
3305 }
3306
3307 // Confirm that we can parse the result. LogReader has enough internal CHECKs
3308 // to confirm the right thing happened.
3309 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003310 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003311 auto result = ConfirmReadable(filenames);
3312 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
3313 chrono::seconds(1)));
3314 EXPECT_THAT(result[0].second,
3315 ::testing::ElementsAre(realtime_clock::epoch() +
3316 chrono::microseconds(34990350)));
3317
3318 EXPECT_THAT(result[1].first,
3319 ::testing::ElementsAre(
3320 realtime_clock::epoch() + chrono::seconds(1),
3321 realtime_clock::epoch() + chrono::microseconds(3323000)));
3322 EXPECT_THAT(result[1].second,
3323 ::testing::ElementsAre(
3324 realtime_clock::epoch() + chrono::microseconds(13990200),
3325 realtime_clock::epoch() + chrono::microseconds(16313200)));
3326
3327 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch() +
3328 chrono::seconds(1)));
3329 EXPECT_THAT(result[2].second,
3330 ::testing::ElementsAre(realtime_clock::epoch() +
3331 chrono::microseconds(34900150)));
3332}
3333
3334// Tests that local data before remote data after reboot is properly replayed.
3335// We only trigger a reboot in the timestamp interpolation function when solving
3336// the timestamp problem when we actually have a point in the function. This
3337// originally only happened when a point passes the noncausal filter. At the
3338// start of time for the second boot, if we aren't careful, we will have
3339// messages which need to be published at times before the boot. This happens
3340// when a local message is in the log before a forwarded message, so there is no
3341// point in the interpolation function. This delays the reboot. So, we need to
3342// recreate that situation and make sure it doesn't come back.
3343TEST(MultinodeRebootLoggerTest,
3344 LocalMessageBeforeRemoteBeforeStartAfterReboot) {
3345 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3346 aos::configuration::ReadConfig(ArtifactPath(
3347 "aos/events/logging/multinode_pingpong_split3_config.json"));
3348 message_bridge::TestingTimeConverter time_converter(
3349 configuration::NodesCount(&config.message()));
3350 SimulatedEventLoopFactory event_loop_factory(&config.message());
3351 event_loop_factory.SetTimeConverter(&time_converter);
3352 NodeEventLoopFactory *const pi1 =
3353 event_loop_factory.GetNodeEventLoopFactory("pi1");
3354 const size_t pi1_index = configuration::GetNodeIndex(
3355 event_loop_factory.configuration(), pi1->node());
3356 NodeEventLoopFactory *const pi2 =
3357 event_loop_factory.GetNodeEventLoopFactory("pi2");
3358 const size_t pi2_index = configuration::GetNodeIndex(
3359 event_loop_factory.configuration(), pi2->node());
3360 NodeEventLoopFactory *const pi3 =
3361 event_loop_factory.GetNodeEventLoopFactory("pi3");
3362 const size_t pi3_index = configuration::GetNodeIndex(
3363 event_loop_factory.configuration(), pi3->node());
3364
3365 const std::string kLogfile1_1 =
3366 aos::testing::TestTmpDir() + "/multi_logfile1/";
3367 const std::string kLogfile2_1 =
3368 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3369 const std::string kLogfile2_2 =
3370 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
3371 const std::string kLogfile3_1 =
3372 aos::testing::TestTmpDir() + "/multi_logfile3/";
3373 util::UnlinkRecursive(kLogfile1_1);
3374 util::UnlinkRecursive(kLogfile2_1);
3375 util::UnlinkRecursive(kLogfile2_2);
3376 util::UnlinkRecursive(kLogfile3_1);
3377 const UUID pi1_boot0 = UUID::Random();
3378 const UUID pi2_boot0 = UUID::Random();
3379 const UUID pi2_boot1 = UUID::Random();
3380 const UUID pi3_boot0 = UUID::Random();
3381 {
3382 CHECK_EQ(pi1_index, 0u);
3383 CHECK_EQ(pi2_index, 1u);
3384 CHECK_EQ(pi3_index, 2u);
3385
3386 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3387 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3388 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3389 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3390
3391 time_converter.AddNextTimestamp(
3392 distributed_clock::epoch(),
3393 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3394 BootTimestamp::epoch()});
3395 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3396 time_converter.AddNextTimestamp(
3397 distributed_clock::epoch() + reboot_time,
3398 {BootTimestamp::epoch() + reboot_time,
3399 BootTimestamp{.boot = 1,
3400 .time = monotonic_clock::epoch() + reboot_time +
3401 chrono::seconds(100)},
3402 BootTimestamp::epoch() + reboot_time});
3403 }
3404
3405 std::vector<std::string> filenames;
3406 {
3407 LoggerState pi1_logger = MakeLoggerState(
3408 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3409 LoggerState pi3_logger = MakeLoggerState(
3410 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3411 {
3412 // And now start the logger.
3413 LoggerState pi2_logger = MakeLoggerState(
3414 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3415
3416 pi1_logger.StartLogger(kLogfile1_1);
3417 pi3_logger.StartLogger(kLogfile3_1);
3418 pi2_logger.StartLogger(kLogfile2_1);
3419
3420 event_loop_factory.RunFor(chrono::milliseconds(1005));
3421
3422 // Now that we've got a start time in the past, turn on data.
3423 std::unique_ptr<aos::EventLoop> ping_event_loop =
3424 pi1->MakeEventLoop("ping");
3425 Ping ping(ping_event_loop.get());
3426
3427 pi2->AlwaysStart<Pong>("pong");
3428
3429 event_loop_factory.RunFor(chrono::milliseconds(3000));
3430
3431 pi2_logger.AppendAllFilenames(&filenames);
3432
3433 // Disable any remote messages on pi2.
3434 pi1->Disconnect(pi2->node());
3435 pi2->Disconnect(pi1->node());
3436 }
3437 event_loop_factory.RunFor(chrono::milliseconds(995));
3438 // pi2 now reboots at 5 seconds.
3439 {
3440 event_loop_factory.RunFor(chrono::milliseconds(1000));
3441
3442 // Make local stuff happen before we start logging and connect the remote.
3443 pi2->AlwaysStart<Pong>("pong");
3444 std::unique_ptr<aos::EventLoop> ping_event_loop =
3445 pi1->MakeEventLoop("ping");
3446 Ping ping(ping_event_loop.get());
3447 event_loop_factory.RunFor(chrono::milliseconds(1005));
3448
3449 // Start logging again on pi2 after it is up.
3450 LoggerState pi2_logger = MakeLoggerState(
3451 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3452 pi2_logger.StartLogger(kLogfile2_2);
3453
3454 // And allow remote messages now that we have some local ones.
3455 pi1->Connect(pi2->node());
3456 pi2->Connect(pi1->node());
3457
3458 event_loop_factory.RunFor(chrono::milliseconds(1000));
3459
3460 event_loop_factory.RunFor(chrono::milliseconds(3000));
3461
3462 pi2_logger.AppendAllFilenames(&filenames);
3463 }
3464
3465 pi1_logger.AppendAllFilenames(&filenames);
3466 pi3_logger.AppendAllFilenames(&filenames);
3467 }
3468
3469 // Confirm that we can parse the result. LogReader has enough internal CHECKs
3470 // to confirm the right thing happened.
3471 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003472 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003473 auto result = ConfirmReadable(filenames);
3474
3475 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3476 EXPECT_THAT(result[0].second,
3477 ::testing::ElementsAre(realtime_clock::epoch() +
3478 chrono::microseconds(11000350)));
3479
3480 EXPECT_THAT(result[1].first,
3481 ::testing::ElementsAre(
3482 realtime_clock::epoch(),
3483 realtime_clock::epoch() + chrono::microseconds(107005000)));
3484 EXPECT_THAT(result[1].second,
3485 ::testing::ElementsAre(
3486 realtime_clock::epoch() + chrono::microseconds(4000150),
3487 realtime_clock::epoch() + chrono::microseconds(111000200)));
3488
3489 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3490 EXPECT_THAT(result[2].second,
3491 ::testing::ElementsAre(realtime_clock::epoch() +
3492 chrono::microseconds(11000150)));
3493
3494 auto start_stop_result = ConfirmReadable(
3495 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3496 realtime_clock::epoch() + chrono::milliseconds(3000));
3497
3498 EXPECT_THAT(
3499 start_stop_result[0].first,
3500 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3501 EXPECT_THAT(
3502 start_stop_result[0].second,
3503 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3504 EXPECT_THAT(
3505 start_stop_result[1].first,
3506 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3507 EXPECT_THAT(
3508 start_stop_result[1].second,
3509 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3510 EXPECT_THAT(
3511 start_stop_result[2].first,
3512 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3513 EXPECT_THAT(
3514 start_stop_result[2].second,
3515 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3516}
3517
3518// Tests that setting the start and stop flags across a reboot works as
3519// expected.
3520TEST(MultinodeRebootLoggerTest, RebootStartStopTimes) {
3521 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3522 aos::configuration::ReadConfig(ArtifactPath(
3523 "aos/events/logging/multinode_pingpong_split3_config.json"));
3524 message_bridge::TestingTimeConverter time_converter(
3525 configuration::NodesCount(&config.message()));
3526 SimulatedEventLoopFactory event_loop_factory(&config.message());
3527 event_loop_factory.SetTimeConverter(&time_converter);
3528 NodeEventLoopFactory *const pi1 =
3529 event_loop_factory.GetNodeEventLoopFactory("pi1");
3530 const size_t pi1_index = configuration::GetNodeIndex(
3531 event_loop_factory.configuration(), pi1->node());
3532 NodeEventLoopFactory *const pi2 =
3533 event_loop_factory.GetNodeEventLoopFactory("pi2");
3534 const size_t pi2_index = configuration::GetNodeIndex(
3535 event_loop_factory.configuration(), pi2->node());
3536 NodeEventLoopFactory *const pi3 =
3537 event_loop_factory.GetNodeEventLoopFactory("pi3");
3538 const size_t pi3_index = configuration::GetNodeIndex(
3539 event_loop_factory.configuration(), pi3->node());
3540
3541 const std::string kLogfile1_1 =
3542 aos::testing::TestTmpDir() + "/multi_logfile1/";
3543 const std::string kLogfile2_1 =
3544 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3545 const std::string kLogfile2_2 =
3546 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
3547 const std::string kLogfile3_1 =
3548 aos::testing::TestTmpDir() + "/multi_logfile3/";
3549 util::UnlinkRecursive(kLogfile1_1);
3550 util::UnlinkRecursive(kLogfile2_1);
3551 util::UnlinkRecursive(kLogfile2_2);
3552 util::UnlinkRecursive(kLogfile3_1);
3553 {
3554 CHECK_EQ(pi1_index, 0u);
3555 CHECK_EQ(pi2_index, 1u);
3556 CHECK_EQ(pi3_index, 2u);
3557
3558 time_converter.AddNextTimestamp(
3559 distributed_clock::epoch(),
3560 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3561 BootTimestamp::epoch()});
3562 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3563 time_converter.AddNextTimestamp(
3564 distributed_clock::epoch() + reboot_time,
3565 {BootTimestamp::epoch() + reboot_time,
3566 BootTimestamp{.boot = 1,
3567 .time = monotonic_clock::epoch() + reboot_time},
3568 BootTimestamp::epoch() + reboot_time});
3569 }
3570
3571 std::vector<std::string> filenames;
3572 {
3573 LoggerState pi1_logger = MakeLoggerState(
3574 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3575 LoggerState pi3_logger = MakeLoggerState(
3576 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3577 {
3578 // And now start the logger.
3579 LoggerState pi2_logger = MakeLoggerState(
3580 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3581
3582 pi1_logger.StartLogger(kLogfile1_1);
3583 pi3_logger.StartLogger(kLogfile3_1);
3584 pi2_logger.StartLogger(kLogfile2_1);
3585
3586 event_loop_factory.RunFor(chrono::milliseconds(1005));
3587
3588 // Now that we've got a start time in the past, turn on data.
3589 std::unique_ptr<aos::EventLoop> ping_event_loop =
3590 pi1->MakeEventLoop("ping");
3591 Ping ping(ping_event_loop.get());
3592
3593 pi2->AlwaysStart<Pong>("pong");
3594
3595 event_loop_factory.RunFor(chrono::milliseconds(3000));
3596
3597 pi2_logger.AppendAllFilenames(&filenames);
3598 }
3599 event_loop_factory.RunFor(chrono::milliseconds(995));
3600 // pi2 now reboots at 5 seconds.
3601 {
3602 event_loop_factory.RunFor(chrono::milliseconds(1000));
3603
3604 // Make local stuff happen before we start logging and connect the remote.
3605 pi2->AlwaysStart<Pong>("pong");
3606 std::unique_ptr<aos::EventLoop> ping_event_loop =
3607 pi1->MakeEventLoop("ping");
3608 Ping ping(ping_event_loop.get());
3609 event_loop_factory.RunFor(chrono::milliseconds(5));
3610
3611 // Start logging again on pi2 after it is up.
3612 LoggerState pi2_logger = MakeLoggerState(
3613 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3614 pi2_logger.StartLogger(kLogfile2_2);
3615
3616 event_loop_factory.RunFor(chrono::milliseconds(5000));
3617
3618 pi2_logger.AppendAllFilenames(&filenames);
3619 }
3620
3621 pi1_logger.AppendAllFilenames(&filenames);
3622 pi3_logger.AppendAllFilenames(&filenames);
3623 }
3624
3625 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003626 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003627 auto result = ConfirmReadable(filenames);
3628
3629 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3630 EXPECT_THAT(result[0].second,
3631 ::testing::ElementsAre(realtime_clock::epoch() +
3632 chrono::microseconds(11000350)));
3633
3634 EXPECT_THAT(result[1].first,
3635 ::testing::ElementsAre(
3636 realtime_clock::epoch(),
3637 realtime_clock::epoch() + chrono::microseconds(6005000)));
3638 EXPECT_THAT(result[1].second,
3639 ::testing::ElementsAre(
3640 realtime_clock::epoch() + chrono::microseconds(4900150),
3641 realtime_clock::epoch() + chrono::microseconds(11000200)));
3642
3643 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3644 EXPECT_THAT(result[2].second,
3645 ::testing::ElementsAre(realtime_clock::epoch() +
3646 chrono::microseconds(11000150)));
3647
3648 // Confirm we observed the correct start and stop times. We should see the
3649 // reboot here.
3650 auto start_stop_result = ConfirmReadable(
3651 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3652 realtime_clock::epoch() + chrono::milliseconds(8000));
3653
3654 EXPECT_THAT(
3655 start_stop_result[0].first,
3656 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3657 EXPECT_THAT(
3658 start_stop_result[0].second,
3659 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3660 EXPECT_THAT(start_stop_result[1].first,
3661 ::testing::ElementsAre(
3662 realtime_clock::epoch() + chrono::seconds(2),
3663 realtime_clock::epoch() + chrono::microseconds(6005000)));
3664 EXPECT_THAT(start_stop_result[1].second,
3665 ::testing::ElementsAre(
3666 realtime_clock::epoch() + chrono::microseconds(4900150),
3667 realtime_clock::epoch() + chrono::seconds(8)));
3668 EXPECT_THAT(
3669 start_stop_result[2].first,
3670 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3671 EXPECT_THAT(
3672 start_stop_result[2].second,
3673 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3674}
3675
3676// Tests that we properly handle one direction being down.
3677TEST(MissingDirectionTest, OneDirection) {
3678 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3679 aos::configuration::ReadConfig(ArtifactPath(
3680 "aos/events/logging/multinode_pingpong_split4_config.json"));
3681 message_bridge::TestingTimeConverter time_converter(
3682 configuration::NodesCount(&config.message()));
3683 SimulatedEventLoopFactory event_loop_factory(&config.message());
3684 event_loop_factory.SetTimeConverter(&time_converter);
3685
3686 NodeEventLoopFactory *const pi1 =
3687 event_loop_factory.GetNodeEventLoopFactory("pi1");
3688 const size_t pi1_index = configuration::GetNodeIndex(
3689 event_loop_factory.configuration(), pi1->node());
3690 NodeEventLoopFactory *const pi2 =
3691 event_loop_factory.GetNodeEventLoopFactory("pi2");
3692 const size_t pi2_index = configuration::GetNodeIndex(
3693 event_loop_factory.configuration(), pi2->node());
3694 std::vector<std::string> filenames;
3695
3696 {
3697 CHECK_EQ(pi1_index, 0u);
3698 CHECK_EQ(pi2_index, 1u);
3699
3700 time_converter.AddNextTimestamp(
3701 distributed_clock::epoch(),
3702 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3703
3704 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3705 time_converter.AddNextTimestamp(
3706 distributed_clock::epoch() + reboot_time,
3707 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3708 BootTimestamp::epoch() + reboot_time});
3709 }
3710
3711 const std::string kLogfile2_1 =
3712 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3713 const std::string kLogfile1_1 =
3714 aos::testing::TestTmpDir() + "/multi_logfile1.1/";
3715 util::UnlinkRecursive(kLogfile2_1);
3716 util::UnlinkRecursive(kLogfile1_1);
3717
3718 pi2->Disconnect(pi1->node());
3719
3720 pi1->AlwaysStart<Ping>("ping");
3721 pi2->AlwaysStart<Pong>("pong");
3722
3723 {
3724 LoggerState pi2_logger = MakeLoggerState(
3725 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3726
3727 event_loop_factory.RunFor(chrono::milliseconds(95));
3728
3729 pi2_logger.StartLogger(kLogfile2_1);
3730
3731 event_loop_factory.RunFor(chrono::milliseconds(6000));
3732
3733 pi2->Connect(pi1->node());
3734
3735 LoggerState pi1_logger = MakeLoggerState(
3736 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3737 pi1_logger.StartLogger(kLogfile1_1);
3738
3739 event_loop_factory.RunFor(chrono::milliseconds(5000));
3740 pi1_logger.AppendAllFilenames(&filenames);
3741 pi2_logger.AppendAllFilenames(&filenames);
3742 }
3743
3744 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003745 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003746 ConfirmReadable(filenames);
3747}
3748
3749// Tests that we properly handle only one direction ever existing after a
3750// reboot.
3751TEST(MissingDirectionTest, OneDirectionAfterReboot) {
3752 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3753 aos::configuration::ReadConfig(ArtifactPath(
3754 "aos/events/logging/multinode_pingpong_split4_config.json"));
3755 message_bridge::TestingTimeConverter time_converter(
3756 configuration::NodesCount(&config.message()));
3757 SimulatedEventLoopFactory event_loop_factory(&config.message());
3758 event_loop_factory.SetTimeConverter(&time_converter);
3759
3760 NodeEventLoopFactory *const pi1 =
3761 event_loop_factory.GetNodeEventLoopFactory("pi1");
3762 const size_t pi1_index = configuration::GetNodeIndex(
3763 event_loop_factory.configuration(), pi1->node());
3764 NodeEventLoopFactory *const pi2 =
3765 event_loop_factory.GetNodeEventLoopFactory("pi2");
3766 const size_t pi2_index = configuration::GetNodeIndex(
3767 event_loop_factory.configuration(), pi2->node());
3768 std::vector<std::string> filenames;
3769
3770 {
3771 CHECK_EQ(pi1_index, 0u);
3772 CHECK_EQ(pi2_index, 1u);
3773
3774 time_converter.AddNextTimestamp(
3775 distributed_clock::epoch(),
3776 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3777
3778 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3779 time_converter.AddNextTimestamp(
3780 distributed_clock::epoch() + reboot_time,
3781 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3782 BootTimestamp::epoch() + reboot_time});
3783 }
3784
3785 const std::string kLogfile2_1 =
3786 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3787 util::UnlinkRecursive(kLogfile2_1);
3788
3789 pi1->AlwaysStart<Ping>("ping");
3790
3791 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3792 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3793 // second boot.
3794 {
3795 LoggerState pi2_logger = MakeLoggerState(
3796 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3797
3798 event_loop_factory.RunFor(chrono::milliseconds(95));
3799
3800 pi2_logger.StartLogger(kLogfile2_1);
3801
3802 event_loop_factory.RunFor(chrono::milliseconds(4000));
3803
3804 pi2->Disconnect(pi1->node());
3805
3806 event_loop_factory.RunFor(chrono::milliseconds(1000));
3807 pi1->AlwaysStart<Ping>("ping");
3808
3809 event_loop_factory.RunFor(chrono::milliseconds(5000));
3810 pi2_logger.AppendAllFilenames(&filenames);
3811 }
3812
3813 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003814 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003815 ConfirmReadable(filenames);
3816}
3817
3818// Tests that we properly handle only one direction ever existing after a reboot
3819// with only reliable data.
3820TEST(MissingDirectionTest, OneDirectionAfterRebootReliable) {
3821 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3822 aos::configuration::ReadConfig(ArtifactPath(
3823 "aos/events/logging/multinode_pingpong_split4_reliable_config.json"));
3824 message_bridge::TestingTimeConverter time_converter(
3825 configuration::NodesCount(&config.message()));
3826 SimulatedEventLoopFactory event_loop_factory(&config.message());
3827 event_loop_factory.SetTimeConverter(&time_converter);
3828
3829 NodeEventLoopFactory *const pi1 =
3830 event_loop_factory.GetNodeEventLoopFactory("pi1");
3831 const size_t pi1_index = configuration::GetNodeIndex(
3832 event_loop_factory.configuration(), pi1->node());
3833 NodeEventLoopFactory *const pi2 =
3834 event_loop_factory.GetNodeEventLoopFactory("pi2");
3835 const size_t pi2_index = configuration::GetNodeIndex(
3836 event_loop_factory.configuration(), pi2->node());
3837 std::vector<std::string> filenames;
3838
3839 {
3840 CHECK_EQ(pi1_index, 0u);
3841 CHECK_EQ(pi2_index, 1u);
3842
3843 time_converter.AddNextTimestamp(
3844 distributed_clock::epoch(),
3845 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3846
3847 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3848 time_converter.AddNextTimestamp(
3849 distributed_clock::epoch() + reboot_time,
3850 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3851 BootTimestamp::epoch() + reboot_time});
3852 }
3853
3854 const std::string kLogfile2_1 =
3855 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3856 util::UnlinkRecursive(kLogfile2_1);
3857
3858 pi1->AlwaysStart<Ping>("ping");
3859
3860 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3861 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3862 // second boot.
3863 {
3864 LoggerState pi2_logger = MakeLoggerState(
3865 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3866
3867 event_loop_factory.RunFor(chrono::milliseconds(95));
3868
3869 pi2_logger.StartLogger(kLogfile2_1);
3870
3871 event_loop_factory.RunFor(chrono::milliseconds(4000));
3872
3873 pi2->Disconnect(pi1->node());
3874
3875 event_loop_factory.RunFor(chrono::milliseconds(1000));
3876 pi1->AlwaysStart<Ping>("ping");
3877
3878 event_loop_factory.RunFor(chrono::milliseconds(5000));
3879 pi2_logger.AppendAllFilenames(&filenames);
3880 }
3881
3882 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003883 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003884 ConfirmReadable(filenames);
3885}
3886
Brian Smartte67d7112023-03-20 12:06:30 -07003887// Tests that we properly handle only one direction ever existing after a reboot
3888// with mixed unreliable vs reliable, where reliable has an earlier timestamp
3889// than unreliable.
3890TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase1) {
3891 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3892 aos::configuration::ReadConfig(ArtifactPath(
3893 "aos/events/logging/multinode_pingpong_split4_mixed1_config.json"));
3894 message_bridge::TestingTimeConverter time_converter(
3895 configuration::NodesCount(&config.message()));
3896 SimulatedEventLoopFactory event_loop_factory(&config.message());
3897 event_loop_factory.SetTimeConverter(&time_converter);
3898
3899 NodeEventLoopFactory *const pi1 =
3900 event_loop_factory.GetNodeEventLoopFactory("pi1");
3901 const size_t pi1_index = configuration::GetNodeIndex(
3902 event_loop_factory.configuration(), pi1->node());
3903 NodeEventLoopFactory *const pi2 =
3904 event_loop_factory.GetNodeEventLoopFactory("pi2");
3905 const size_t pi2_index = configuration::GetNodeIndex(
3906 event_loop_factory.configuration(), pi2->node());
3907 std::vector<std::string> filenames;
3908
3909 {
3910 CHECK_EQ(pi1_index, 0u);
3911 CHECK_EQ(pi2_index, 1u);
3912
3913 time_converter.AddNextTimestamp(
3914 distributed_clock::epoch(),
3915 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3916
3917 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3918 time_converter.AddNextTimestamp(
3919 distributed_clock::epoch() + reboot_time,
3920 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3921 BootTimestamp::epoch() + reboot_time});
3922 }
3923
3924 const std::string kLogfile2_1 =
3925 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3926 util::UnlinkRecursive(kLogfile2_1);
3927
3928 // The following sequence using the above reference config creates
3929 // a reliable message timestamp < unreliable message timestamp.
3930 {
3931 pi1->DisableStatistics();
3932 pi2->DisableStatistics();
3933
3934 event_loop_factory.RunFor(chrono::milliseconds(95));
3935
3936 pi1->AlwaysStart<Ping>("ping");
3937
3938 event_loop_factory.RunFor(chrono::milliseconds(5250));
3939
3940 pi1->EnableStatistics();
3941
3942 event_loop_factory.RunFor(chrono::milliseconds(1000));
3943
3944 LoggerState pi2_logger = MakeLoggerState(
3945 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3946
3947 pi2_logger.StartLogger(kLogfile2_1);
3948
3949 event_loop_factory.RunFor(chrono::milliseconds(5000));
3950 pi2_logger.AppendAllFilenames(&filenames);
3951 }
3952
3953 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003954 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Brian Smartte67d7112023-03-20 12:06:30 -07003955 ConfirmReadable(filenames);
3956}
3957
3958// Tests that we properly handle only one direction ever existing after a reboot
3959// with mixed unreliable vs reliable, where unreliable has an earlier timestamp
3960// than reliable.
3961TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase2) {
3962 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3963 aos::configuration::ReadConfig(ArtifactPath(
3964 "aos/events/logging/multinode_pingpong_split4_mixed2_config.json"));
3965 message_bridge::TestingTimeConverter time_converter(
3966 configuration::NodesCount(&config.message()));
3967 SimulatedEventLoopFactory event_loop_factory(&config.message());
3968 event_loop_factory.SetTimeConverter(&time_converter);
3969
3970 NodeEventLoopFactory *const pi1 =
3971 event_loop_factory.GetNodeEventLoopFactory("pi1");
3972 const size_t pi1_index = configuration::GetNodeIndex(
3973 event_loop_factory.configuration(), pi1->node());
3974 NodeEventLoopFactory *const pi2 =
3975 event_loop_factory.GetNodeEventLoopFactory("pi2");
3976 const size_t pi2_index = configuration::GetNodeIndex(
3977 event_loop_factory.configuration(), pi2->node());
3978 std::vector<std::string> filenames;
3979
3980 {
3981 CHECK_EQ(pi1_index, 0u);
3982 CHECK_EQ(pi2_index, 1u);
3983
3984 time_converter.AddNextTimestamp(
3985 distributed_clock::epoch(),
3986 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3987
3988 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3989 time_converter.AddNextTimestamp(
3990 distributed_clock::epoch() + reboot_time,
3991 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3992 BootTimestamp::epoch() + reboot_time});
3993 }
3994
3995 const std::string kLogfile2_1 =
3996 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3997 util::UnlinkRecursive(kLogfile2_1);
3998
3999 // The following sequence using the above reference config creates
4000 // an unreliable message timestamp < reliable message timestamp.
4001 {
4002 pi1->DisableStatistics();
4003 pi2->DisableStatistics();
4004
4005 event_loop_factory.RunFor(chrono::milliseconds(95));
4006
4007 pi1->AlwaysStart<Ping>("ping");
4008
4009 event_loop_factory.RunFor(chrono::milliseconds(5250));
4010
4011 pi1->EnableStatistics();
4012
4013 event_loop_factory.RunFor(chrono::milliseconds(1000));
4014
4015 LoggerState pi2_logger = MakeLoggerState(
4016 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4017
4018 pi2_logger.StartLogger(kLogfile2_1);
4019
4020 event_loop_factory.RunFor(chrono::milliseconds(5000));
4021 pi2_logger.AppendAllFilenames(&filenames);
4022 }
4023
4024 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004025 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Brian Smartte67d7112023-03-20 12:06:30 -07004026 ConfirmReadable(filenames);
4027}
4028
Naman Guptaa63aa132023-03-22 20:06:34 -07004029// Tests that we properly handle what used to be a time violation in one
4030// direction. This can occur when one direction goes down after sending some
4031// data, but the other keeps working. The down direction ends up resolving to a
4032// straight line in the noncausal filter, where the direction which is still up
4033// can cross that line. Really, time progressed along just fine but we assumed
4034// that the offset was a line when it could have deviated by up to 1ms/second.
4035TEST_P(MultinodeLoggerTest, OneDirectionTimeDrift) {
4036 std::vector<std::string> filenames;
4037
4038 CHECK_EQ(pi1_index_, 0u);
4039 CHECK_EQ(pi2_index_, 1u);
4040
4041 time_converter_.AddNextTimestamp(
4042 distributed_clock::epoch(),
4043 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4044
4045 const chrono::nanoseconds before_disconnect_duration =
4046 time_converter_.AddMonotonic(
4047 {chrono::milliseconds(1000), chrono::milliseconds(1000)});
4048
4049 const chrono::nanoseconds test_duration =
4050 time_converter_.AddMonotonic(
4051 {chrono::milliseconds(1000), chrono::milliseconds(1000)}) +
4052 time_converter_.AddMonotonic(
4053 {chrono::milliseconds(10000),
4054 chrono::milliseconds(10000) - chrono::milliseconds(5)}) +
4055 time_converter_.AddMonotonic(
4056 {chrono::milliseconds(10000),
4057 chrono::milliseconds(10000) + chrono::milliseconds(5)});
4058
4059 const std::string kLogfile =
4060 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
4061 util::UnlinkRecursive(kLogfile);
4062
4063 {
4064 LoggerState pi2_logger = MakeLogger(pi2_);
4065 pi2_logger.StartLogger(kLogfile);
4066 event_loop_factory_.RunFor(before_disconnect_duration);
4067
4068 pi2_->Disconnect(pi1_->node());
4069
4070 event_loop_factory_.RunFor(test_duration);
4071 pi2_->Connect(pi1_->node());
4072
4073 event_loop_factory_.RunFor(chrono::milliseconds(5000));
4074 pi2_logger.AppendAllFilenames(&filenames);
4075 }
4076
4077 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004078 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004079 ConfirmReadable(filenames);
4080}
4081
4082// Tests that we can replay a logfile that has timestamps such that at least one
4083// node's epoch is at a positive distributed_clock (and thus will have to be
4084// booted after the other node(s)).
4085TEST_P(MultinodeLoggerTest, StartOneNodeBeforeOther) {
4086 std::vector<std::string> filenames;
4087
4088 CHECK_EQ(pi1_index_, 0u);
4089 CHECK_EQ(pi2_index_, 1u);
4090
4091 time_converter_.AddNextTimestamp(
4092 distributed_clock::epoch(),
4093 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4094
4095 const chrono::nanoseconds before_reboot_duration = chrono::milliseconds(1000);
4096 time_converter_.RebootAt(
4097 0, distributed_clock::time_point(before_reboot_duration));
4098
4099 const chrono::nanoseconds test_duration = time_converter_.AddMonotonic(
4100 {chrono::milliseconds(10000), chrono::milliseconds(10000)});
4101
4102 const std::string kLogfile =
4103 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
4104 util::UnlinkRecursive(kLogfile);
4105
4106 pi2_->Disconnect(pi1_->node());
4107 pi1_->Disconnect(pi2_->node());
4108
4109 {
4110 LoggerState pi2_logger = MakeLogger(pi2_);
4111
4112 pi2_logger.StartLogger(kLogfile);
4113 event_loop_factory_.RunFor(before_reboot_duration);
4114
4115 pi2_->Connect(pi1_->node());
4116 pi1_->Connect(pi2_->node());
4117
4118 event_loop_factory_.RunFor(test_duration);
4119
4120 pi2_logger.AppendAllFilenames(&filenames);
4121 }
4122
4123 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004124 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004125 ConfirmReadable(filenames);
4126
4127 {
4128 LogReader reader(sorted_parts);
4129 SimulatedEventLoopFactory replay_factory(reader.configuration());
4130 reader.RegisterWithoutStarting(&replay_factory);
4131
4132 NodeEventLoopFactory *const replay_node =
4133 reader.event_loop_factory()->GetNodeEventLoopFactory("pi1");
4134
4135 std::unique_ptr<EventLoop> test_event_loop =
4136 replay_node->MakeEventLoop("test_reader");
4137 replay_node->OnStartup([replay_node]() {
4138 // Check that we didn't boot until at least t=0.
4139 CHECK_LE(monotonic_clock::epoch(), replay_node->monotonic_now());
4140 });
4141 test_event_loop->OnRun([&test_event_loop]() {
4142 // Check that we didn't boot until at least t=0.
4143 EXPECT_LE(monotonic_clock::epoch(), test_event_loop->monotonic_now());
4144 });
4145 reader.event_loop_factory()->Run();
4146 reader.Deregister();
4147 }
4148}
4149
4150// Tests that when we have a loop without all the logs at all points in time, we
4151// can sort it properly.
4152TEST(MultinodeLoggerLoopTest, Loop) {
4153 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4154 aos::configuration::ReadConfig(ArtifactPath(
4155 "aos/events/logging/multinode_pingpong_triangle_split_config.json"));
4156 message_bridge::TestingTimeConverter time_converter(
4157 configuration::NodesCount(&config.message()));
4158 SimulatedEventLoopFactory event_loop_factory(&config.message());
4159 event_loop_factory.SetTimeConverter(&time_converter);
4160
4161 NodeEventLoopFactory *const pi1 =
4162 event_loop_factory.GetNodeEventLoopFactory("pi1");
4163 NodeEventLoopFactory *const pi2 =
4164 event_loop_factory.GetNodeEventLoopFactory("pi2");
4165 NodeEventLoopFactory *const pi3 =
4166 event_loop_factory.GetNodeEventLoopFactory("pi3");
4167
4168 const std::string kLogfile1_1 =
4169 aos::testing::TestTmpDir() + "/multi_logfile1/";
4170 const std::string kLogfile2_1 =
4171 aos::testing::TestTmpDir() + "/multi_logfile2/";
4172 const std::string kLogfile3_1 =
4173 aos::testing::TestTmpDir() + "/multi_logfile3/";
4174 util::UnlinkRecursive(kLogfile1_1);
4175 util::UnlinkRecursive(kLogfile2_1);
4176 util::UnlinkRecursive(kLogfile3_1);
4177
4178 {
4179 // Make pi1 boot before everything else.
4180 time_converter.AddNextTimestamp(
4181 distributed_clock::epoch(),
4182 {BootTimestamp::epoch(),
4183 BootTimestamp::epoch() - chrono::milliseconds(100),
4184 BootTimestamp::epoch() - chrono::milliseconds(300)});
4185 }
4186
4187 // We want to setup a situation such that 2 of the 3 legs of the loop are very
4188 // confident about time being X, and the third leg is pulling the average off
4189 // to one side.
4190 //
4191 // It's easiest to visualize this in timestamp_plotter.
4192
4193 std::vector<std::string> filenames;
4194 {
4195 // Have pi1 send out a reliable message at startup. This sets up a long
4196 // forwarding time message at the start to bias time.
4197 std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
4198 {
4199 aos::Sender<examples::Ping> ping_sender =
4200 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
4201
4202 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
4203 examples::Ping::Builder ping_builder =
4204 builder.MakeBuilder<examples::Ping>();
4205 CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
4206 }
4207
4208 // Wait a while so there's enough data to let the worst case be rather off.
4209 event_loop_factory.RunFor(chrono::seconds(1000));
4210
4211 // Now start a receiving node first. This sets up 2 tight bounds between 2
4212 // of the nodes.
4213 LoggerState pi2_logger = MakeLoggerState(
4214 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4215 pi2_logger.StartLogger(kLogfile2_1);
4216
4217 event_loop_factory.RunFor(chrono::seconds(100));
4218
4219 // And now start the third leg.
4220 LoggerState pi3_logger = MakeLoggerState(
4221 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4222 pi3_logger.StartLogger(kLogfile3_1);
4223
4224 LoggerState pi1_logger = MakeLoggerState(
4225 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4226 pi1_logger.StartLogger(kLogfile1_1);
4227
4228 event_loop_factory.RunFor(chrono::seconds(100));
4229
4230 pi1_logger.AppendAllFilenames(&filenames);
4231 pi2_logger.AppendAllFilenames(&filenames);
4232 pi3_logger.AppendAllFilenames(&filenames);
4233 }
4234
4235 // Make sure we can read this.
4236 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004237 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004238 auto result = ConfirmReadable(filenames);
4239}
4240
Austin Schuh08dba8f2023-05-01 08:29:30 -07004241// Tests that RestartLogging works in the simple case. Unfortunately, the
4242// failure cases involve simulating time elapsing in callbacks, which is really
4243// hard. The best we can reasonably do is make sure 2 back to back logs are
4244// parseable together.
4245TEST_P(MultinodeLoggerTest, RestartLogging) {
4246 time_converter_.AddMonotonic(
4247 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
4248 std::vector<std::string> filenames;
4249 {
4250 LoggerState pi1_logger = MakeLogger(pi1_);
4251
4252 event_loop_factory_.RunFor(chrono::milliseconds(95));
4253
4254 StartLogger(&pi1_logger, logfile_base1_);
4255 aos::monotonic_clock::time_point last_rotation_time =
4256 pi1_logger.event_loop->monotonic_now();
Austin Schuh2f864452023-07-17 14:53:08 -07004257 pi1_logger.logger->set_on_logged_period(
4258 [&](aos::monotonic_clock::time_point) {
4259 const auto now = pi1_logger.event_loop->monotonic_now();
4260 if (now > last_rotation_time + std::chrono::seconds(5)) {
4261 pi1_logger.AppendAllFilenames(&filenames);
4262 std::unique_ptr<MultiNodeFilesLogNamer> namer =
4263 pi1_logger.MakeLogNamer(logfile_base2_);
4264 pi1_logger.log_namer = namer.get();
Austin Schuh08dba8f2023-05-01 08:29:30 -07004265
Austin Schuh2f864452023-07-17 14:53:08 -07004266 pi1_logger.logger->RestartLogging(std::move(namer));
4267 last_rotation_time = now;
4268 }
4269 });
Austin Schuh08dba8f2023-05-01 08:29:30 -07004270
4271 event_loop_factory_.RunFor(chrono::milliseconds(7000));
4272
4273 pi1_logger.AppendAllFilenames(&filenames);
4274 }
4275
4276 for (const auto &x : filenames) {
4277 LOG(INFO) << x;
4278 }
4279
4280 EXPECT_GE(filenames.size(), 2u);
4281
4282 ConfirmReadable(filenames);
4283
4284 // TODO(austin): It would be good to confirm that any one time messages end up
4285 // in both logs correctly.
4286}
4287
Naman Guptaa63aa132023-03-22 20:06:34 -07004288} // namespace testing
4289} // namespace logger
4290} // namespace aos