blob: 2073a5aba8205c9830b5b640e91dfcf63119ffad [file] [log] [blame]
Brian Smartte67d7112023-03-20 12:06:30 -07001#include <algorithm>
2
Philipp Schrader790cb542023-07-05 21:06:52 -07003#include "gmock/gmock.h"
4#include "gtest/gtest.h"
5
Naman Guptaa63aa132023-03-22 20:06:34 -07006#include "aos/events/logging/log_reader.h"
7#include "aos/events/logging/multinode_logger_test_lib.h"
8#include "aos/events/message_counter.h"
9#include "aos/events/ping_lib.h"
10#include "aos/events/pong_lib.h"
11#include "aos/network/remote_message_generated.h"
12#include "aos/network/timestamp_generated.h"
13#include "aos/testing/tmpdir.h"
Naman Guptaa63aa132023-03-22 20:06:34 -070014
15namespace aos {
16namespace logger {
17namespace testing {
18
19namespace chrono = std::chrono;
20using aos::message_bridge::RemoteMessage;
21using aos::testing::ArtifactPath;
22using aos::testing::MessageCounter;
23
Naman Guptaa63aa132023-03-22 20:06:34 -070024INSTANTIATE_TEST_SUITE_P(
25 All, MultinodeLoggerTest,
26 ::testing::Combine(
27 ::testing::Values(
28 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080029 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070030 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080031 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070032 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
33
34INSTANTIATE_TEST_SUITE_P(
35 All, MultinodeLoggerDeathTest,
36 ::testing::Combine(
37 ::testing::Values(
38 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080039 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070040 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080041 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070042 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
43
44// Tests that we can write and read simple multi-node log files.
45TEST_P(MultinodeLoggerTest, SimpleMultiNode) {
46 std::vector<std::string> actual_filenames;
47 time_converter_.StartEqual();
48
49 {
50 LoggerState pi1_logger = MakeLogger(pi1_);
51 LoggerState pi2_logger = MakeLogger(pi2_);
52
53 event_loop_factory_.RunFor(chrono::milliseconds(95));
54
55 StartLogger(&pi1_logger);
56 StartLogger(&pi2_logger);
57
58 event_loop_factory_.RunFor(chrono::milliseconds(20000));
59 pi1_logger.AppendAllFilenames(&actual_filenames);
60 pi2_logger.AppendAllFilenames(&actual_filenames);
61 }
62
63 ASSERT_THAT(actual_filenames,
64 ::testing::UnorderedElementsAreArray(logfiles_));
65
66 {
67 std::set<std::string> logfile_uuids;
68 std::set<std::string> parts_uuids;
69 // Confirm that we have the expected number of UUIDs for both the logfile
70 // UUIDs and parts UUIDs.
71 std::vector<SizePrefixedFlatbufferVector<LogFileHeader>> log_header;
72 for (std::string_view f : logfiles_) {
73 log_header.emplace_back(ReadHeader(f).value());
74 if (!log_header.back().message().has_configuration()) {
75 logfile_uuids.insert(
76 log_header.back().message().log_event_uuid()->str());
77 parts_uuids.insert(log_header.back().message().parts_uuid()->str());
78 }
79 }
80
81 EXPECT_EQ(logfile_uuids.size(), 2u);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -070082 EXPECT_EQ(parts_uuids.size(), 6u);
Naman Guptaa63aa132023-03-22 20:06:34 -070083
84 // And confirm everything is on the correct node.
85 EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi1");
86 EXPECT_EQ(log_header[3].message().node()->name()->string_view(), "pi1");
87 EXPECT_EQ(log_header[4].message().node()->name()->string_view(), "pi1");
88
89 EXPECT_EQ(log_header[5].message().node()->name()->string_view(), "pi2");
90 EXPECT_EQ(log_header[6].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -070091 EXPECT_EQ(log_header[7].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -070092
Mithun Bharadwaj0c629932023-08-02 16:10:40 -070093 EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi1");
94 EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi1");
Naman Guptaa63aa132023-03-22 20:06:34 -070095
Mithun Bharadwaj0c629932023-08-02 16:10:40 -070096 EXPECT_EQ(log_header[10].message().node()->name()->string_view(), "pi2");
97 EXPECT_EQ(log_header[11].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -070098
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -070099 EXPECT_EQ(log_header[12].message().node()->name()->string_view(), "pi2");
100 EXPECT_EQ(log_header[13].message().node()->name()->string_view(), "pi2");
101 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700102
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700103 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi1");
104 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi1");
Naman Guptaa63aa132023-03-22 20:06:34 -0700105
106 // And the parts index matches.
107 EXPECT_EQ(log_header[2].message().parts_index(), 0);
108 EXPECT_EQ(log_header[3].message().parts_index(), 1);
109 EXPECT_EQ(log_header[4].message().parts_index(), 2);
110
111 EXPECT_EQ(log_header[5].message().parts_index(), 0);
112 EXPECT_EQ(log_header[6].message().parts_index(), 1);
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700113 EXPECT_EQ(log_header[7].message().parts_index(), 2);
Naman Guptaa63aa132023-03-22 20:06:34 -0700114
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700115 EXPECT_EQ(log_header[8].message().parts_index(), 0);
116 EXPECT_EQ(log_header[9].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700117
118 EXPECT_EQ(log_header[10].message().parts_index(), 0);
119 EXPECT_EQ(log_header[11].message().parts_index(), 1);
120
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700121 EXPECT_EQ(log_header[12].message().parts_index(), 0);
122 EXPECT_EQ(log_header[13].message().parts_index(), 1);
123 EXPECT_EQ(log_header[14].message().parts_index(), 2);
Naman Guptaa63aa132023-03-22 20:06:34 -0700124
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700125 EXPECT_EQ(log_header[15].message().parts_index(), 0);
126 EXPECT_EQ(log_header[16].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700127 }
128
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700129 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
130 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -0700131 {
132 using ::testing::UnorderedElementsAre;
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700133 std::shared_ptr<const aos::Configuration> config = sorted_parts[0].config;
Naman Guptaa63aa132023-03-22 20:06:34 -0700134
135 // Timing reports, pings
136 EXPECT_THAT(CountChannelsData(config, logfiles_[2]),
137 UnorderedElementsAre(
138 std::make_tuple("/pi1/aos",
139 "aos.message_bridge.ServerStatistics", 1),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700140 std::make_tuple("/test", "aos.examples.Ping", 1),
141 std::make_tuple("/pi1/aos", "aos.examples.Ping", 1)))
Naman Guptaa63aa132023-03-22 20:06:34 -0700142 << " : " << logfiles_[2];
143 {
144 std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700145 std::make_tuple("/pi1/aos", "aos.examples.Ping", 10),
Naman Guptaa63aa132023-03-22 20:06:34 -0700146 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 1),
147 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
148 1)};
149 if (!std::get<0>(GetParam()).shared) {
150 channel_counts.push_back(
151 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
152 "aos-message_bridge-Timestamp",
153 "aos.message_bridge.RemoteMessage", 1));
154 }
155 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
156 ::testing::UnorderedElementsAreArray(channel_counts))
157 << " : " << logfiles_[3];
158 }
159 {
160 std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700161 std::make_tuple("/pi1/aos", "aos.examples.Ping", 1990),
Naman Guptaa63aa132023-03-22 20:06:34 -0700162 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 199),
163 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
164 20),
165 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
166 199),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700167 std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
Naman Guptaa63aa132023-03-22 20:06:34 -0700168 std::make_tuple("/test", "aos.examples.Ping", 2000)};
169 if (!std::get<0>(GetParam()).shared) {
170 channel_counts.push_back(
171 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
172 "aos-message_bridge-Timestamp",
173 "aos.message_bridge.RemoteMessage", 199));
174 }
175 EXPECT_THAT(CountChannelsData(config, logfiles_[4]),
176 ::testing::UnorderedElementsAreArray(channel_counts))
177 << " : " << logfiles_[4];
178 }
179 // Timestamps for pong
180 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
181 UnorderedElementsAre())
182 << " : " << logfiles_[2];
183 EXPECT_THAT(
184 CountChannelsTimestamp(config, logfiles_[3]),
185 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
186 << " : " << logfiles_[3];
187 EXPECT_THAT(
188 CountChannelsTimestamp(config, logfiles_[4]),
189 UnorderedElementsAre(
190 std::make_tuple("/test", "aos.examples.Pong", 2000),
191 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)))
192 << " : " << logfiles_[4];
193
Naman Guptaa63aa132023-03-22 20:06:34 -0700194 // Timing reports and pongs.
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700195 EXPECT_THAT(CountChannelsData(config, logfiles_[5]),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700196 UnorderedElementsAre(
197 std::make_tuple("/pi2/aos", "aos.examples.Ping", 1),
198 std::make_tuple("/pi2/aos",
199 "aos.message_bridge.ServerStatistics", 1)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700200 << " : " << logfiles_[5];
Naman Guptaa63aa132023-03-22 20:06:34 -0700201 EXPECT_THAT(
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700202 CountChannelsData(config, logfiles_[6]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700203 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700204 << " : " << logfiles_[6];
Naman Guptaa63aa132023-03-22 20:06:34 -0700205 EXPECT_THAT(
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700206 CountChannelsData(config, logfiles_[7]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700207 UnorderedElementsAre(
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700208 std::make_tuple("/pi2/aos", "aos.examples.Ping", 2000),
Naman Guptaa63aa132023-03-22 20:06:34 -0700209 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
210 std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
211 20),
212 std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
213 200),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700214 std::make_tuple("/pi2/aos", "aos.timing.Report", 60),
Naman Guptaa63aa132023-03-22 20:06:34 -0700215 std::make_tuple("/test", "aos.examples.Pong", 2000)))
Naman Guptaa63aa132023-03-22 20:06:34 -0700216 << " : " << logfiles_[7];
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700217 // And ping timestamps.
218 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
219 UnorderedElementsAre())
220 << " : " << logfiles_[5];
Naman Guptaa63aa132023-03-22 20:06:34 -0700221 EXPECT_THAT(
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700222 CountChannelsTimestamp(config, logfiles_[6]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700223 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700224 << " : " << logfiles_[6];
Naman Guptaa63aa132023-03-22 20:06:34 -0700225 EXPECT_THAT(
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700226 CountChannelsTimestamp(config, logfiles_[7]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700227 UnorderedElementsAre(
228 std::make_tuple("/test", "aos.examples.Ping", 2000),
229 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700230 << " : " << logfiles_[7];
Naman Guptaa63aa132023-03-22 20:06:34 -0700231
232 // And then test that the remotely logged timestamp data files only have
233 // timestamps in them.
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700234 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[8]),
235 UnorderedElementsAre())
236 << " : " << logfiles_[8];
237 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[9]),
238 UnorderedElementsAre())
239 << " : " << logfiles_[9];
Naman Guptaa63aa132023-03-22 20:06:34 -0700240 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[10]),
241 UnorderedElementsAre())
242 << " : " << logfiles_[10];
243 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[11]),
244 UnorderedElementsAre())
245 << " : " << logfiles_[11];
Naman Guptaa63aa132023-03-22 20:06:34 -0700246
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700247 EXPECT_THAT(CountChannelsData(config, logfiles_[8]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700248 UnorderedElementsAre(std::make_tuple(
249 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700250 << " : " << logfiles_[8];
251 EXPECT_THAT(CountChannelsData(config, logfiles_[9]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700252 UnorderedElementsAre(std::make_tuple(
253 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700254 << " : " << logfiles_[9];
Naman Guptaa63aa132023-03-22 20:06:34 -0700255
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700256 // Pong snd timestamp data.
257 EXPECT_THAT(
258 CountChannelsData(config, logfiles_[10]),
259 UnorderedElementsAre(
260 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 9),
261 std::make_tuple("/test", "aos.examples.Pong", 91)))
262 << " : " << logfiles_[10];
263 EXPECT_THAT(
264 CountChannelsData(config, logfiles_[11]),
265 UnorderedElementsAre(
266 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 191),
267 std::make_tuple("/test", "aos.examples.Pong", 1910)))
268 << " : " << logfiles_[11];
Naman Guptaa63aa132023-03-22 20:06:34 -0700269
270 // Timestamps from pi2 on pi1, and the other way.
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700271 // if (shared()) {
272 EXPECT_THAT(CountChannelsData(config, logfiles_[12]),
273 UnorderedElementsAre())
274 << " : " << logfiles_[12];
275 EXPECT_THAT(CountChannelsData(config, logfiles_[13]),
276 UnorderedElementsAre())
277 << " : " << logfiles_[13];
278 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
279 UnorderedElementsAre())
280 << " : " << logfiles_[14];
281 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
282 UnorderedElementsAre())
283 << " : " << logfiles_[15];
284 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
285 UnorderedElementsAre())
286 << " : " << logfiles_[16];
Naman Guptaa63aa132023-03-22 20:06:34 -0700287
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700288 EXPECT_THAT(
289 CountChannelsTimestamp(config, logfiles_[12]),
290 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
291 << " : " << logfiles_[12];
292 EXPECT_THAT(
293 CountChannelsTimestamp(config, logfiles_[13]),
294 UnorderedElementsAre(
295 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 9),
296 std::make_tuple("/test", "aos.examples.Ping", 90)))
297 << " : " << logfiles_[13];
298 EXPECT_THAT(
299 CountChannelsTimestamp(config, logfiles_[14]),
300 UnorderedElementsAre(
301 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 191),
302 std::make_tuple("/test", "aos.examples.Ping", 1910)))
303 << " : " << logfiles_[14];
304 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[15]),
305 UnorderedElementsAre(std::make_tuple(
306 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
307 << " : " << logfiles_[15];
308 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[16]),
309 UnorderedElementsAre(std::make_tuple(
310 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
311 << " : " << logfiles_[16];
Naman Guptaa63aa132023-03-22 20:06:34 -0700312 }
313
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700314 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -0700315
316 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
317 log_reader_factory.set_send_delay(chrono::microseconds(0));
318
319 // This sends out the fetched messages and advances time to the start of the
320 // log file.
321 reader.Register(&log_reader_factory);
322
323 const Node *pi1 =
324 configuration::GetNode(log_reader_factory.configuration(), "pi1");
325 const Node *pi2 =
326 configuration::GetNode(log_reader_factory.configuration(), "pi2");
327
328 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
329 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
330 LOG(INFO) << "now pi1 "
331 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
332 LOG(INFO) << "now pi2 "
333 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
334
335 EXPECT_THAT(reader.LoggedNodes(),
336 ::testing::ElementsAre(
337 configuration::GetNode(reader.logged_configuration(), pi1),
338 configuration::GetNode(reader.logged_configuration(), pi2)));
339
340 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
341
342 std::unique_ptr<EventLoop> pi1_event_loop =
343 log_reader_factory.MakeEventLoop("test", pi1);
344 std::unique_ptr<EventLoop> pi2_event_loop =
345 log_reader_factory.MakeEventLoop("test", pi2);
346
347 int pi1_ping_count = 10;
348 int pi2_ping_count = 10;
349 int pi1_pong_count = 10;
350 int pi2_pong_count = 10;
351
352 // Confirm that the ping value matches.
353 pi1_event_loop->MakeWatcher(
354 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
355 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping) << " at "
356 << pi1_event_loop->context().monotonic_remote_time << " -> "
357 << pi1_event_loop->context().monotonic_event_time;
358 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
359 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
360 pi1_ping_count * chrono::milliseconds(10) +
361 monotonic_clock::epoch());
362 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
363 pi1_ping_count * chrono::milliseconds(10) +
364 realtime_clock::epoch());
365 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
366 pi1_event_loop->context().monotonic_event_time);
367 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
368 pi1_event_loop->context().realtime_event_time);
369
370 ++pi1_ping_count;
371 });
372 pi2_event_loop->MakeWatcher(
373 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
374 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping) << " at "
375 << pi2_event_loop->context().monotonic_remote_time << " -> "
376 << pi2_event_loop->context().monotonic_event_time;
377 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
378
379 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
380 pi2_ping_count * chrono::milliseconds(10) +
381 monotonic_clock::epoch());
382 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
383 pi2_ping_count * chrono::milliseconds(10) +
384 realtime_clock::epoch());
385 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time +
386 chrono::microseconds(150),
387 pi2_event_loop->context().monotonic_event_time);
388 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time +
389 chrono::microseconds(150),
390 pi2_event_loop->context().realtime_event_time);
391 ++pi2_ping_count;
392 });
393
394 constexpr ssize_t kQueueIndexOffset = -9;
395 // Confirm that the ping and pong counts both match, and the value also
396 // matches.
397 pi1_event_loop->MakeWatcher(
398 "/test", [&pi1_event_loop, &pi1_ping_count,
399 &pi1_pong_count](const examples::Pong &pong) {
400 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
401 << pi1_event_loop->context().monotonic_remote_time << " -> "
402 << pi1_event_loop->context().monotonic_event_time;
403
404 EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
405 pi1_pong_count + kQueueIndexOffset);
406 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
407 chrono::microseconds(200) +
408 pi1_pong_count * chrono::milliseconds(10) +
409 monotonic_clock::epoch());
410 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
411 chrono::microseconds(200) +
412 pi1_pong_count * chrono::milliseconds(10) +
413 realtime_clock::epoch());
414
415 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
416 chrono::microseconds(150),
417 pi1_event_loop->context().monotonic_event_time);
418 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
419 chrono::microseconds(150),
420 pi1_event_loop->context().realtime_event_time);
421
422 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
423 ++pi1_pong_count;
424 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
425 });
426 pi2_event_loop->MakeWatcher(
427 "/test", [&pi2_event_loop, &pi2_ping_count,
428 &pi2_pong_count](const examples::Pong &pong) {
429 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
430 << pi2_event_loop->context().monotonic_remote_time << " -> "
431 << pi2_event_loop->context().monotonic_event_time;
432
433 EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
434 pi2_pong_count + kQueueIndexOffset);
435
436 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
437 chrono::microseconds(200) +
438 pi2_pong_count * chrono::milliseconds(10) +
439 monotonic_clock::epoch());
440 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
441 chrono::microseconds(200) +
442 pi2_pong_count * chrono::milliseconds(10) +
443 realtime_clock::epoch());
444
445 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
446 pi2_event_loop->context().monotonic_event_time);
447 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
448 pi2_event_loop->context().realtime_event_time);
449
450 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
451 ++pi2_pong_count;
452 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
453 });
454
455 log_reader_factory.Run();
456 EXPECT_EQ(pi1_ping_count, 2010);
457 EXPECT_EQ(pi2_ping_count, 2010);
458 EXPECT_EQ(pi1_pong_count, 2010);
459 EXPECT_EQ(pi2_pong_count, 2010);
460
461 reader.Deregister();
462}
463
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600464// MultinodeLoggerTest that tests the mutate callback works across multiple
465// nodes with remapping
466TEST_P(MultinodeLoggerTest, MultiNodeRemapMutateCallback) {
467 time_converter_.StartEqual();
468 std::vector<std::string> actual_filenames;
469
470 {
471 LoggerState pi1_logger = MakeLogger(pi1_);
472 LoggerState pi2_logger = MakeLogger(pi2_);
473
474 event_loop_factory_.RunFor(chrono::milliseconds(95));
475
476 StartLogger(&pi1_logger);
477 StartLogger(&pi2_logger);
478
479 event_loop_factory_.RunFor(chrono::milliseconds(20000));
480 pi1_logger.AppendAllFilenames(&actual_filenames);
481 pi2_logger.AppendAllFilenames(&actual_filenames);
482 }
483
484 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700485 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600486
487 LogReader reader(sorted_parts, &config_.message());
488 // Remap just on pi1.
489 reader.RemapLoggedChannel<examples::Pong>(
490 "/test", configuration::GetNode(reader.configuration(), "pi1"));
491
492 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
493
494 int pong_count = 0;
495 // Adds a callback which mutates the value of the pong message before the
496 // message is sent which is the feature we are testing here
497 reader.AddBeforeSendCallback("/test",
498 [&pong_count](aos::examples::Pong *pong) {
499 pong->mutate_value(pong->value() + 1);
500 pong_count = pong->value();
501 });
502
503 // This sends out the fetched messages and advances time to the start of the
504 // log file.
505 reader.Register(&log_reader_factory);
506
507 const Node *pi1 =
508 configuration::GetNode(log_reader_factory.configuration(), "pi1");
509 const Node *pi2 =
510 configuration::GetNode(log_reader_factory.configuration(), "pi2");
511
512 EXPECT_THAT(reader.LoggedNodes(),
513 ::testing::ElementsAre(
514 configuration::GetNode(reader.logged_configuration(), pi1),
515 configuration::GetNode(reader.logged_configuration(), pi2)));
516
517 std::unique_ptr<EventLoop> pi1_event_loop =
518 log_reader_factory.MakeEventLoop("test", pi1);
519 std::unique_ptr<EventLoop> pi2_event_loop =
520 log_reader_factory.MakeEventLoop("test", pi2);
521
522 pi1_event_loop->MakeWatcher("/original/test",
523 [&pong_count](const examples::Pong &pong) {
524 EXPECT_EQ(pong_count, pong.value());
525 });
526
527 pi2_event_loop->MakeWatcher("/test",
528 [&pong_count](const examples::Pong &pong) {
529 EXPECT_EQ(pong_count, pong.value());
530 });
531
532 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
533 reader.Deregister();
534
535 EXPECT_EQ(pong_count, 2011);
536}
537
538// MultinodeLoggerTest that tests the mutate callback works across multiple
539// nodes
540TEST_P(MultinodeLoggerTest, MultiNodeMutateCallback) {
541 time_converter_.StartEqual();
542 std::vector<std::string> actual_filenames;
543
544 {
545 LoggerState pi1_logger = MakeLogger(pi1_);
546 LoggerState pi2_logger = MakeLogger(pi2_);
547
548 event_loop_factory_.RunFor(chrono::milliseconds(95));
549
550 StartLogger(&pi1_logger);
551 StartLogger(&pi2_logger);
552
553 event_loop_factory_.RunFor(chrono::milliseconds(20000));
554 pi1_logger.AppendAllFilenames(&actual_filenames);
555 pi2_logger.AppendAllFilenames(&actual_filenames);
556 }
557
558 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700559 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600560
561 LogReader reader(sorted_parts, &config_.message());
562
563 int pong_count = 0;
564 // Adds a callback which mutates the value of the pong message before the
565 // message is sent which is the feature we are testing here
566 reader.AddBeforeSendCallback("/test",
567 [&pong_count](aos::examples::Pong *pong) {
568 pong->mutate_value(pong->value() + 1);
569 pong_count = pong->value();
570 });
571
572 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
573
574 // This sends out the fetched messages and advances time to the start of the
575 // log file.
576 reader.Register(&log_reader_factory);
577
578 const Node *pi1 =
579 configuration::GetNode(log_reader_factory.configuration(), "pi1");
580 const Node *pi2 =
581 configuration::GetNode(log_reader_factory.configuration(), "pi2");
582
583 EXPECT_THAT(reader.LoggedNodes(),
584 ::testing::ElementsAre(
585 configuration::GetNode(reader.logged_configuration(), pi1),
586 configuration::GetNode(reader.logged_configuration(), pi2)));
587
588 std::unique_ptr<EventLoop> pi1_event_loop =
589 log_reader_factory.MakeEventLoop("test", pi1);
590 std::unique_ptr<EventLoop> pi2_event_loop =
591 log_reader_factory.MakeEventLoop("test", pi2);
592
593 pi1_event_loop->MakeWatcher("/test",
594 [&pong_count](const examples::Pong &pong) {
595 EXPECT_EQ(pong_count, pong.value());
596 });
597
598 pi2_event_loop->MakeWatcher("/test",
599 [&pong_count](const examples::Pong &pong) {
600 EXPECT_EQ(pong_count, pong.value());
601 });
602
603 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
604 reader.Deregister();
605
606 EXPECT_EQ(pong_count, 2011);
607}
608
609// Tests that the before send callback is only called from the sender node if it
610// is forwarded
611TEST_P(MultinodeLoggerTest, OnlyDoBeforeSendCallbackOnSenderNode) {
612 time_converter_.StartEqual();
613 {
614 LoggerState pi1_logger = MakeLogger(pi1_);
615 LoggerState pi2_logger = MakeLogger(pi2_);
616
617 event_loop_factory_.RunFor(chrono::milliseconds(95));
618
619 StartLogger(&pi1_logger);
620 StartLogger(&pi2_logger);
621
622 event_loop_factory_.RunFor(chrono::milliseconds(20000));
623 }
624
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700625 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
626 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
627 LogReader reader(sorted_parts);
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600628
629 int ping_count = 0;
630 // Adds a callback which mutates the value of the pong message before the
631 // message is sent which is the feature we are testing here
632 reader.AddBeforeSendCallback("/test",
633 [&ping_count](aos::examples::Ping *ping) {
634 ++ping_count;
635 ping->mutate_value(ping_count);
636 });
637
638 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
639 log_reader_factory.set_send_delay(chrono::microseconds(0));
640
641 reader.Register(&log_reader_factory);
642
643 const Node *pi1 =
644 configuration::GetNode(log_reader_factory.configuration(), "pi1");
645 const Node *pi2 =
646 configuration::GetNode(log_reader_factory.configuration(), "pi2");
647
648 std::unique_ptr<EventLoop> pi1_event_loop =
649 log_reader_factory.MakeEventLoop("test", pi1);
650 pi1_event_loop->SkipTimingReport();
651 std::unique_ptr<EventLoop> pi2_event_loop =
652 log_reader_factory.MakeEventLoop("test", pi2);
653 pi2_event_loop->SkipTimingReport();
654
655 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
656 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
657
658 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
659 pi1_ping_timestamp;
660 if (!shared()) {
661 pi1_ping_timestamp =
662 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
663 pi1_event_loop.get(),
664 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
665 }
666
667 log_reader_factory.Run();
668
669 EXPECT_EQ(pi1_ping.count(), 2000u);
670 EXPECT_EQ(pi2_ping.count(), 2000u);
671 // If the BeforeSendCallback is called on both nodes, then the ping count
672 // would be 4002 instead of 2001
673 EXPECT_EQ(ping_count, 2001u);
674 if (!shared()) {
675 EXPECT_EQ(pi1_ping_timestamp->count(), 2000u);
676 }
677
678 reader.Deregister();
679}
680
681// Tests that we do not allow adding callbacks after Register is called
682TEST_P(MultinodeLoggerDeathTest, AddCallbackAfterRegister) {
683 time_converter_.StartEqual();
684 std::vector<std::string> actual_filenames;
685
686 {
687 LoggerState pi1_logger = MakeLogger(pi1_);
688 LoggerState pi2_logger = MakeLogger(pi2_);
689
690 event_loop_factory_.RunFor(chrono::milliseconds(95));
691
692 StartLogger(&pi1_logger);
693 StartLogger(&pi2_logger);
694
695 event_loop_factory_.RunFor(chrono::milliseconds(20000));
696 pi1_logger.AppendAllFilenames(&actual_filenames);
697 pi2_logger.AppendAllFilenames(&actual_filenames);
698 }
699
700 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700701 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600702
703 LogReader reader(sorted_parts, &config_.message());
704 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
705 reader.Register(&log_reader_factory);
706 EXPECT_DEATH(
707 {
708 reader.AddBeforeSendCallback("/test", [](aos::examples::Pong *) {
709 LOG(FATAL) << "This should not be called";
710 });
711 },
712 "Cannot add callbacks after calling Register");
713 reader.Deregister();
714}
715
Naman Guptaa63aa132023-03-22 20:06:34 -0700716// Test that if we feed the replay with a mismatched node list that we die on
717// the LogReader constructor.
718TEST_P(MultinodeLoggerDeathTest, MultiNodeBadReplayConfig) {
719 time_converter_.StartEqual();
720 {
721 LoggerState pi1_logger = MakeLogger(pi1_);
722 LoggerState pi2_logger = MakeLogger(pi2_);
723
724 event_loop_factory_.RunFor(chrono::milliseconds(95));
725
726 StartLogger(&pi1_logger);
727 StartLogger(&pi2_logger);
728
729 event_loop_factory_.RunFor(chrono::milliseconds(20000));
730 }
731
732 // Test that, if we add an additional node to the replay config that the
733 // logger complains about the mismatch in number of nodes.
734 FlatbufferDetachedBuffer<Configuration> extra_nodes_config =
735 configuration::MergeWithConfig(&config_.message(), R"({
736 "nodes": [
737 {
738 "name": "extra-node"
739 }
740 ]
741 }
742 )");
743
744 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700745 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -0700746 EXPECT_DEATH(LogReader(sorted_parts, &extra_nodes_config.message()),
747 "Log file and replay config need to have matching nodes lists.");
748}
749
750// Tests that we can read log files where they don't start at the same monotonic
751// time.
752TEST_P(MultinodeLoggerTest, StaggeredStart) {
753 time_converter_.StartEqual();
754 std::vector<std::string> actual_filenames;
755
756 {
757 LoggerState pi1_logger = MakeLogger(pi1_);
758 LoggerState pi2_logger = MakeLogger(pi2_);
759
760 event_loop_factory_.RunFor(chrono::milliseconds(95));
761
762 StartLogger(&pi1_logger);
763
764 event_loop_factory_.RunFor(chrono::milliseconds(200));
765
766 StartLogger(&pi2_logger);
767
768 event_loop_factory_.RunFor(chrono::milliseconds(20000));
769 pi1_logger.AppendAllFilenames(&actual_filenames);
770 pi2_logger.AppendAllFilenames(&actual_filenames);
771 }
772
773 // Since we delay starting pi2, it already knows about all the timestamps so
774 // we don't end up with extra parts.
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700775 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
776 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
777 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -0700778
779 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
780 log_reader_factory.set_send_delay(chrono::microseconds(0));
781
782 // This sends out the fetched messages and advances time to the start of the
783 // log file.
784 reader.Register(&log_reader_factory);
785
786 const Node *pi1 =
787 configuration::GetNode(log_reader_factory.configuration(), "pi1");
788 const Node *pi2 =
789 configuration::GetNode(log_reader_factory.configuration(), "pi2");
790
791 EXPECT_THAT(reader.LoggedNodes(),
792 ::testing::ElementsAre(
793 configuration::GetNode(reader.logged_configuration(), pi1),
794 configuration::GetNode(reader.logged_configuration(), pi2)));
795
796 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
797
798 std::unique_ptr<EventLoop> pi1_event_loop =
799 log_reader_factory.MakeEventLoop("test", pi1);
800 std::unique_ptr<EventLoop> pi2_event_loop =
801 log_reader_factory.MakeEventLoop("test", pi2);
802
803 int pi1_ping_count = 30;
804 int pi2_ping_count = 30;
805 int pi1_pong_count = 30;
806 int pi2_pong_count = 30;
807
808 // Confirm that the ping value matches.
809 pi1_event_loop->MakeWatcher(
810 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
811 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
812 << pi1_event_loop->context().monotonic_remote_time << " -> "
813 << pi1_event_loop->context().monotonic_event_time;
814 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
815
816 ++pi1_ping_count;
817 });
818 pi2_event_loop->MakeWatcher(
819 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
820 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
821 << pi2_event_loop->context().monotonic_remote_time << " -> "
822 << pi2_event_loop->context().monotonic_event_time;
823 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
824
825 ++pi2_ping_count;
826 });
827
828 // Confirm that the ping and pong counts both match, and the value also
829 // matches.
830 pi1_event_loop->MakeWatcher(
831 "/test", [&pi1_event_loop, &pi1_ping_count,
832 &pi1_pong_count](const examples::Pong &pong) {
833 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
834 << pi1_event_loop->context().monotonic_remote_time << " -> "
835 << pi1_event_loop->context().monotonic_event_time;
836
837 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
838 ++pi1_pong_count;
839 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
840 });
841 pi2_event_loop->MakeWatcher(
842 "/test", [&pi2_event_loop, &pi2_ping_count,
843 &pi2_pong_count](const examples::Pong &pong) {
844 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
845 << pi2_event_loop->context().monotonic_remote_time << " -> "
846 << pi2_event_loop->context().monotonic_event_time;
847
848 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
849 ++pi2_pong_count;
850 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
851 });
852
853 log_reader_factory.Run();
854 EXPECT_EQ(pi1_ping_count, 2030);
855 EXPECT_EQ(pi2_ping_count, 2030);
856 EXPECT_EQ(pi1_pong_count, 2030);
857 EXPECT_EQ(pi2_pong_count, 2030);
858
859 reader.Deregister();
860}
861
862// Tests that we can read log files where the monotonic clocks drift and don't
863// match correctly. While we are here, also test that different ending times
864// also is readable.
865TEST_P(MultinodeLoggerTest, MismatchedClocks) {
866 // TODO(austin): Negate...
867 const chrono::nanoseconds initial_pi2_offset = chrono::seconds(1000);
868
869 time_converter_.AddMonotonic(
870 {BootTimestamp::epoch(), BootTimestamp::epoch() + initial_pi2_offset});
871 // Wait for 95 ms, (~0.1 seconds - 1/2 of the ping/pong period), and set the
872 // skew to be 200 uS/s
873 const chrono::nanoseconds startup_sleep1 = time_converter_.AddMonotonic(
874 {chrono::milliseconds(95),
875 chrono::milliseconds(95) - chrono::nanoseconds(200) * 95});
876 // Run another 200 ms to have one logger start first.
877 const chrono::nanoseconds startup_sleep2 = time_converter_.AddMonotonic(
878 {chrono::milliseconds(200), chrono::milliseconds(200)});
879 // Slew one way then the other at the same 200 uS/S slew rate. Make sure we
880 // go far enough to cause problems if this isn't accounted for.
881 const chrono::nanoseconds logger_run1 = time_converter_.AddMonotonic(
882 {chrono::milliseconds(20000),
883 chrono::milliseconds(20000) - chrono::nanoseconds(200) * 20000});
884 const chrono::nanoseconds logger_run2 = time_converter_.AddMonotonic(
885 {chrono::milliseconds(40000),
886 chrono::milliseconds(40000) + chrono::nanoseconds(200) * 40000});
887 const chrono::nanoseconds logger_run3 = time_converter_.AddMonotonic(
888 {chrono::milliseconds(400), chrono::milliseconds(400)});
889
890 {
891 LoggerState pi2_logger = MakeLogger(pi2_);
892
893 LOG(INFO) << "pi2 times: " << pi2_->monotonic_now() << " "
894 << pi2_->realtime_now() << " distributed "
895 << pi2_->ToDistributedClock(pi2_->monotonic_now());
896
897 LOG(INFO) << "pi2_ times: " << pi2_->monotonic_now() << " "
898 << pi2_->realtime_now() << " distributed "
899 << pi2_->ToDistributedClock(pi2_->monotonic_now());
900
901 event_loop_factory_.RunFor(startup_sleep1);
902
903 StartLogger(&pi2_logger);
904
905 event_loop_factory_.RunFor(startup_sleep2);
906
907 {
908 // Run pi1's logger for only part of the time.
909 LoggerState pi1_logger = MakeLogger(pi1_);
910
911 StartLogger(&pi1_logger);
912 event_loop_factory_.RunFor(logger_run1);
913
914 // Make sure we slewed time far enough so that the difference is greater
915 // than the network delay. This confirms that if we sort incorrectly, it
916 // would show in the results.
917 EXPECT_LT(
918 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
919 -event_loop_factory_.send_delay() -
920 event_loop_factory_.network_delay());
921
922 event_loop_factory_.RunFor(logger_run2);
923
924 // And now check that we went far enough the other way to make sure we
925 // cover both problems.
926 EXPECT_GT(
927 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
928 event_loop_factory_.send_delay() +
929 event_loop_factory_.network_delay());
930 }
931
932 // And log a bit more on pi2.
933 event_loop_factory_.RunFor(logger_run3);
934 }
935
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700936 const std::vector<LogFile> sorted_parts =
937 SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 3, 3));
938 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
939 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -0700940
941 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
942 log_reader_factory.set_send_delay(chrono::microseconds(0));
943
944 const Node *pi1 =
945 configuration::GetNode(log_reader_factory.configuration(), "pi1");
946 const Node *pi2 =
947 configuration::GetNode(log_reader_factory.configuration(), "pi2");
948
949 // This sends out the fetched messages and advances time to the start of the
950 // log file.
951 reader.Register(&log_reader_factory);
952
953 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
954 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
955 LOG(INFO) << "now pi1 "
956 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
957 LOG(INFO) << "now pi2 "
958 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
959
960 LOG(INFO) << "Done registering (pi1) "
961 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now()
962 << " "
963 << log_reader_factory.GetNodeEventLoopFactory(pi1)->realtime_now();
964 LOG(INFO) << "Done registering (pi2) "
965 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now()
966 << " "
967 << log_reader_factory.GetNodeEventLoopFactory(pi2)->realtime_now();
968
969 EXPECT_THAT(reader.LoggedNodes(),
970 ::testing::ElementsAre(
971 configuration::GetNode(reader.logged_configuration(), pi1),
972 configuration::GetNode(reader.logged_configuration(), pi2)));
973
974 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
975
976 std::unique_ptr<EventLoop> pi1_event_loop =
977 log_reader_factory.MakeEventLoop("test", pi1);
978 std::unique_ptr<EventLoop> pi2_event_loop =
979 log_reader_factory.MakeEventLoop("test", pi2);
980
981 int pi1_ping_count = 30;
982 int pi2_ping_count = 30;
983 int pi1_pong_count = 30;
984 int pi2_pong_count = 30;
985
986 // Confirm that the ping value matches.
987 pi1_event_loop->MakeWatcher(
988 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
989 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
990 << pi1_event_loop->context().monotonic_remote_time << " -> "
991 << pi1_event_loop->context().monotonic_event_time;
992 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
993
994 ++pi1_ping_count;
995 });
996 pi2_event_loop->MakeWatcher(
997 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
998 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
999 << pi2_event_loop->context().monotonic_remote_time << " -> "
1000 << pi2_event_loop->context().monotonic_event_time;
1001 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
1002
1003 ++pi2_ping_count;
1004 });
1005
1006 // Confirm that the ping and pong counts both match, and the value also
1007 // matches.
1008 pi1_event_loop->MakeWatcher(
1009 "/test", [&pi1_event_loop, &pi1_ping_count,
1010 &pi1_pong_count](const examples::Pong &pong) {
1011 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
1012 << pi1_event_loop->context().monotonic_remote_time << " -> "
1013 << pi1_event_loop->context().monotonic_event_time;
1014
1015 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
1016 ++pi1_pong_count;
1017 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
1018 });
1019 pi2_event_loop->MakeWatcher(
1020 "/test", [&pi2_event_loop, &pi2_ping_count,
1021 &pi2_pong_count](const examples::Pong &pong) {
1022 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
1023 << pi2_event_loop->context().monotonic_remote_time << " -> "
1024 << pi2_event_loop->context().monotonic_event_time;
1025
1026 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
1027 ++pi2_pong_count;
1028 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
1029 });
1030
1031 log_reader_factory.Run();
1032 EXPECT_EQ(pi1_ping_count, 6030);
1033 EXPECT_EQ(pi2_ping_count, 6030);
1034 EXPECT_EQ(pi1_pong_count, 6030);
1035 EXPECT_EQ(pi2_pong_count, 6030);
1036
1037 reader.Deregister();
1038}
1039
1040// Tests that we can sort a bunch of parts into the pre-determined sorted parts.
1041TEST_P(MultinodeLoggerTest, SortParts) {
1042 time_converter_.StartEqual();
1043 // Make a bunch of parts.
1044 {
1045 LoggerState pi1_logger = MakeLogger(pi1_);
1046 LoggerState pi2_logger = MakeLogger(pi2_);
1047
1048 event_loop_factory_.RunFor(chrono::milliseconds(95));
1049
1050 StartLogger(&pi1_logger);
1051 StartLogger(&pi2_logger);
1052
1053 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1054 }
1055
1056 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1057 VerifyParts(sorted_parts);
1058}
1059
1060// Tests that we can sort a bunch of parts with an empty part. We should ignore
1061// it and remove it from the sorted list.
1062TEST_P(MultinodeLoggerTest, SortEmptyParts) {
1063 time_converter_.StartEqual();
1064 // Make a bunch of parts.
1065 {
1066 LoggerState pi1_logger = MakeLogger(pi1_);
1067 LoggerState pi2_logger = MakeLogger(pi2_);
1068
1069 event_loop_factory_.RunFor(chrono::milliseconds(95));
1070
1071 StartLogger(&pi1_logger);
1072 StartLogger(&pi2_logger);
1073
1074 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1075 }
1076
1077 // TODO(austin): Should we flip out if the file can't open?
1078 const std::string kEmptyFile("foobarinvalidfiledoesnotexist" + Extension());
1079
1080 aos::util::WriteStringToFileOrDie(kEmptyFile, "");
1081 logfiles_.emplace_back(kEmptyFile);
1082
1083 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1084 VerifyParts(sorted_parts, {kEmptyFile});
1085}
1086
1087// Tests that we can sort a bunch of parts with the end missing off a
1088// file. We should use the part we can read.
1089TEST_P(MultinodeLoggerTest, SortTruncatedParts) {
1090 std::vector<std::string> actual_filenames;
1091 time_converter_.StartEqual();
1092 // Make a bunch of parts.
1093 {
1094 LoggerState pi1_logger = MakeLogger(pi1_);
1095 LoggerState pi2_logger = MakeLogger(pi2_);
1096
1097 event_loop_factory_.RunFor(chrono::milliseconds(95));
1098
1099 StartLogger(&pi1_logger);
1100 StartLogger(&pi2_logger);
1101
1102 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1103
1104 pi1_logger.AppendAllFilenames(&actual_filenames);
1105 pi2_logger.AppendAllFilenames(&actual_filenames);
1106 }
1107
1108 ASSERT_THAT(actual_filenames,
1109 ::testing::UnorderedElementsAreArray(logfiles_));
1110
1111 // Strip off the end of one of the files. Pick one with a lot of data.
1112 // For snappy, needs to have enough data to be >1 chunk of compressed data so
1113 // that we don't corrupt the entire log part.
1114 ::std::string compressed_contents =
1115 aos::util::ReadFileToStringOrDie(logfiles_[4]);
1116
1117 aos::util::WriteStringToFileOrDie(
1118 logfiles_[4],
1119 compressed_contents.substr(0, compressed_contents.size() - 100));
1120
1121 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1122 VerifyParts(sorted_parts);
1123}
1124
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001125// Tests that if we remap a logged channel, it shows up correctly.
Naman Guptaa63aa132023-03-22 20:06:34 -07001126TEST_P(MultinodeLoggerTest, RemapLoggedChannel) {
1127 time_converter_.StartEqual();
1128 {
1129 LoggerState pi1_logger = MakeLogger(pi1_);
1130 LoggerState pi2_logger = MakeLogger(pi2_);
1131
1132 event_loop_factory_.RunFor(chrono::milliseconds(95));
1133
1134 StartLogger(&pi1_logger);
1135 StartLogger(&pi2_logger);
1136
1137 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1138 }
1139
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001140 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1141 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1142 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001143
1144 // Remap just on pi1.
1145 reader.RemapLoggedChannel<aos::timing::Report>(
1146 "/aos", configuration::GetNode(reader.configuration(), "pi1"));
1147
1148 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1149 log_reader_factory.set_send_delay(chrono::microseconds(0));
1150
1151 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1152 // Note: An extra channel gets remapped automatically due to a timestamp
1153 // channel being LOCAL_LOGGER'd.
1154 ASSERT_EQ(remapped_channels.size(), std::get<0>(GetParam()).shared ? 1u : 2u);
1155 EXPECT_EQ(remapped_channels[0]->name()->string_view(), "/original/pi1/aos");
1156 EXPECT_EQ(remapped_channels[0]->type()->string_view(), "aos.timing.Report");
1157 if (!std::get<0>(GetParam()).shared) {
1158 EXPECT_EQ(remapped_channels[1]->name()->string_view(),
1159 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1160 "aos-message_bridge-Timestamp");
1161 EXPECT_EQ(remapped_channels[1]->type()->string_view(),
1162 "aos.message_bridge.RemoteMessage");
1163 }
1164
1165 reader.Register(&log_reader_factory);
1166
1167 const Node *pi1 =
1168 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1169 const Node *pi2 =
1170 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1171
1172 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1173 // else should have moved.
1174 std::unique_ptr<EventLoop> pi1_event_loop =
1175 log_reader_factory.MakeEventLoop("test", pi1);
1176 pi1_event_loop->SkipTimingReport();
1177 std::unique_ptr<EventLoop> full_pi1_event_loop =
1178 log_reader_factory.MakeEventLoop("test", pi1);
1179 full_pi1_event_loop->SkipTimingReport();
1180 std::unique_ptr<EventLoop> pi2_event_loop =
1181 log_reader_factory.MakeEventLoop("test", pi2);
1182 pi2_event_loop->SkipTimingReport();
1183
1184 MessageCounter<aos::timing::Report> pi1_timing_report(pi1_event_loop.get(),
1185 "/aos");
1186 MessageCounter<aos::timing::Report> full_pi1_timing_report(
1187 full_pi1_event_loop.get(), "/pi1/aos");
1188 MessageCounter<aos::timing::Report> pi1_original_timing_report(
1189 pi1_event_loop.get(), "/original/aos");
1190 MessageCounter<aos::timing::Report> full_pi1_original_timing_report(
1191 full_pi1_event_loop.get(), "/original/pi1/aos");
1192 MessageCounter<aos::timing::Report> pi2_timing_report(pi2_event_loop.get(),
1193 "/aos");
1194
1195 log_reader_factory.Run();
1196
1197 EXPECT_EQ(pi1_timing_report.count(), 0u);
1198 EXPECT_EQ(full_pi1_timing_report.count(), 0u);
1199 EXPECT_NE(pi1_original_timing_report.count(), 0u);
1200 EXPECT_NE(full_pi1_original_timing_report.count(), 0u);
1201 EXPECT_NE(pi2_timing_report.count(), 0u);
1202
1203 reader.Deregister();
1204}
1205
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001206// Tests that if we rename a logged channel, it shows up correctly.
1207TEST_P(MultinodeLoggerTest, RenameLoggedChannel) {
1208 std::vector<std::string> actual_filenames;
1209 time_converter_.StartEqual();
1210 {
1211 LoggerState pi1_logger = MakeLogger(pi1_);
1212 LoggerState pi2_logger = MakeLogger(pi2_);
1213
1214 event_loop_factory_.RunFor(chrono::milliseconds(95));
1215
1216 StartLogger(&pi1_logger);
1217 StartLogger(&pi2_logger);
1218
1219 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1220
1221 pi1_logger.AppendAllFilenames(&actual_filenames);
1222 pi2_logger.AppendAllFilenames(&actual_filenames);
1223 }
1224
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001225 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1226 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1227 LogReader reader(sorted_parts);
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001228
1229 // Rename just on pi2. Add some global maps just to verify they get added in
1230 // the config and used correctly.
1231 std::vector<MapT> maps;
1232 {
1233 MapT map;
1234 map.match = std::make_unique<ChannelT>();
1235 map.match->name = "/foo*";
1236 map.match->source_node = "pi1";
1237 map.rename = std::make_unique<ChannelT>();
1238 map.rename->name = "/pi1/foo";
1239 maps.emplace_back(std::move(map));
1240 }
1241 {
1242 MapT map;
1243 map.match = std::make_unique<ChannelT>();
1244 map.match->name = "/foo*";
1245 map.match->source_node = "pi2";
1246 map.rename = std::make_unique<ChannelT>();
1247 map.rename->name = "/pi2/foo";
1248 maps.emplace_back(std::move(map));
1249 }
1250 {
1251 MapT map;
1252 map.match = std::make_unique<ChannelT>();
1253 map.match->name = "/foo";
1254 map.match->type = "aos.examples.Ping";
1255 map.rename = std::make_unique<ChannelT>();
1256 map.rename->name = "/foo/renamed";
1257 maps.emplace_back(std::move(map));
1258 }
1259 reader.RenameLoggedChannel<aos::examples::Ping>(
1260 "/aos", configuration::GetNode(reader.configuration(), "pi2"),
1261 "/pi2/foo/renamed", maps);
1262
1263 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1264 log_reader_factory.set_send_delay(chrono::microseconds(0));
1265
1266 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1267 // Note: An extra channel gets remapped automatically due to a timestamp
1268 // channel being LOCAL_LOGGER'd.
1269 const bool shared = std::get<0>(GetParam()).shared;
1270 ASSERT_EQ(remapped_channels.size(), shared ? 1u : 2u);
1271 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->name()->string_view(),
1272 "/pi2/foo/renamed");
1273 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->type()->string_view(),
1274 "aos.examples.Ping");
1275 if (!shared) {
1276 EXPECT_EQ(remapped_channels[0]->name()->string_view(),
1277 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1278 "aos-message_bridge-Timestamp");
1279 EXPECT_EQ(remapped_channels[0]->type()->string_view(),
1280 "aos.message_bridge.RemoteMessage");
1281 }
1282
1283 reader.Register(&log_reader_factory);
1284
1285 const Node *pi1 =
1286 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1287 const Node *pi2 =
1288 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1289
1290 // Confirm we can read the data on the renamed channel, just for pi2. Nothing
1291 // else should have moved.
1292 std::unique_ptr<EventLoop> pi2_event_loop =
1293 log_reader_factory.MakeEventLoop("test", pi2);
1294 pi2_event_loop->SkipTimingReport();
1295 std::unique_ptr<EventLoop> full_pi2_event_loop =
1296 log_reader_factory.MakeEventLoop("test", pi2);
1297 full_pi2_event_loop->SkipTimingReport();
1298 std::unique_ptr<EventLoop> pi1_event_loop =
1299 log_reader_factory.MakeEventLoop("test", pi1);
1300 pi1_event_loop->SkipTimingReport();
1301
1302 MessageCounter<aos::examples::Ping> pi2_ping(pi2_event_loop.get(), "/aos");
1303 MessageCounter<aos::examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1304 "/foo");
1305 MessageCounter<aos::examples::Ping> full_pi2_renamed_ping(
1306 full_pi2_event_loop.get(), "/pi2/foo/renamed");
1307 MessageCounter<aos::examples::Ping> pi1_ping(pi1_event_loop.get(), "/aos");
1308
1309 log_reader_factory.Run();
1310
1311 EXPECT_EQ(pi2_ping.count(), 0u);
1312 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1313 EXPECT_NE(full_pi2_renamed_ping.count(), 0u);
1314 EXPECT_NE(pi1_ping.count(), 0u);
1315
1316 reader.Deregister();
1317}
1318
Naman Guptaa63aa132023-03-22 20:06:34 -07001319// Tests that we can remap a forwarded channel as well.
1320TEST_P(MultinodeLoggerTest, RemapForwardedLoggedChannel) {
1321 time_converter_.StartEqual();
1322 {
1323 LoggerState pi1_logger = MakeLogger(pi1_);
1324 LoggerState pi2_logger = MakeLogger(pi2_);
1325
1326 event_loop_factory_.RunFor(chrono::milliseconds(95));
1327
1328 StartLogger(&pi1_logger);
1329 StartLogger(&pi2_logger);
1330
1331 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1332 }
1333
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001334 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1335 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1336 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001337
1338 reader.RemapLoggedChannel<examples::Ping>("/test");
1339
1340 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1341 log_reader_factory.set_send_delay(chrono::microseconds(0));
1342
1343 reader.Register(&log_reader_factory);
1344
1345 const Node *pi1 =
1346 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1347 const Node *pi2 =
1348 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1349
1350 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1351 // else should have moved.
1352 std::unique_ptr<EventLoop> pi1_event_loop =
1353 log_reader_factory.MakeEventLoop("test", pi1);
1354 pi1_event_loop->SkipTimingReport();
1355 std::unique_ptr<EventLoop> full_pi1_event_loop =
1356 log_reader_factory.MakeEventLoop("test", pi1);
1357 full_pi1_event_loop->SkipTimingReport();
1358 std::unique_ptr<EventLoop> pi2_event_loop =
1359 log_reader_factory.MakeEventLoop("test", pi2);
1360 pi2_event_loop->SkipTimingReport();
1361
1362 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1363 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1364 MessageCounter<examples::Ping> pi1_original_ping(pi1_event_loop.get(),
1365 "/original/test");
1366 MessageCounter<examples::Ping> pi2_original_ping(pi2_event_loop.get(),
1367 "/original/test");
1368
1369 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1370 pi1_original_ping_timestamp;
1371 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1372 pi1_ping_timestamp;
1373 if (!shared()) {
1374 pi1_original_ping_timestamp =
1375 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1376 pi1_event_loop.get(),
1377 "/pi1/aos/remote_timestamps/pi2/original/test/aos-examples-Ping");
1378 pi1_ping_timestamp =
1379 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1380 pi1_event_loop.get(),
1381 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1382 }
1383
1384 log_reader_factory.Run();
1385
1386 EXPECT_EQ(pi1_ping.count(), 0u);
1387 EXPECT_EQ(pi2_ping.count(), 0u);
1388 EXPECT_NE(pi1_original_ping.count(), 0u);
1389 EXPECT_NE(pi2_original_ping.count(), 0u);
1390 if (!shared()) {
1391 EXPECT_NE(pi1_original_ping_timestamp->count(), 0u);
1392 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1393 }
1394
1395 reader.Deregister();
1396}
1397
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001398// Tests that we can rename a forwarded channel as well.
1399TEST_P(MultinodeLoggerTest, RenameForwardedLoggedChannel) {
1400 std::vector<std::string> actual_filenames;
1401 time_converter_.StartEqual();
1402 {
1403 LoggerState pi1_logger = MakeLogger(pi1_);
1404 LoggerState pi2_logger = MakeLogger(pi2_);
1405
1406 event_loop_factory_.RunFor(chrono::milliseconds(95));
1407
1408 StartLogger(&pi1_logger);
1409 StartLogger(&pi2_logger);
1410
1411 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1412
1413 pi1_logger.AppendAllFilenames(&actual_filenames);
1414 pi2_logger.AppendAllFilenames(&actual_filenames);
1415 }
1416
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001417 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1418 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1419 LogReader reader(sorted_parts);
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001420
1421 std::vector<MapT> maps;
1422 {
1423 MapT map;
1424 map.match = std::make_unique<ChannelT>();
1425 map.match->name = "/production*";
1426 map.match->source_node = "pi1";
1427 map.rename = std::make_unique<ChannelT>();
1428 map.rename->name = "/pi1/production";
1429 maps.emplace_back(std::move(map));
1430 }
1431 {
1432 MapT map;
1433 map.match = std::make_unique<ChannelT>();
1434 map.match->name = "/production*";
1435 map.match->source_node = "pi2";
1436 map.rename = std::make_unique<ChannelT>();
1437 map.rename->name = "/pi2/production";
1438 maps.emplace_back(std::move(map));
1439 }
1440 reader.RenameLoggedChannel<aos::examples::Ping>(
1441 "/test", configuration::GetNode(reader.configuration(), "pi1"),
1442 "/pi1/production", maps);
1443
1444 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1445 log_reader_factory.set_send_delay(chrono::microseconds(0));
1446
1447 reader.Register(&log_reader_factory);
1448
1449 const Node *pi1 =
1450 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1451 const Node *pi2 =
1452 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1453
1454 // Confirm we can read the data on the renamed channel, on both the source
1455 // node and the remote node. In case of split timestamp channels, confirm that
1456 // we receive the timestamp messages on the renamed channel as well.
1457 std::unique_ptr<EventLoop> pi1_event_loop =
1458 log_reader_factory.MakeEventLoop("test", pi1);
1459 pi1_event_loop->SkipTimingReport();
1460 std::unique_ptr<EventLoop> full_pi1_event_loop =
1461 log_reader_factory.MakeEventLoop("test", pi1);
1462 full_pi1_event_loop->SkipTimingReport();
1463 std::unique_ptr<EventLoop> pi2_event_loop =
1464 log_reader_factory.MakeEventLoop("test", pi2);
1465 pi2_event_loop->SkipTimingReport();
1466
1467 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1468 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1469 MessageCounter<examples::Ping> pi1_renamed_ping(pi1_event_loop.get(),
1470 "/pi1/production");
1471 MessageCounter<examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1472 "/pi1/production");
1473
1474 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1475 pi1_renamed_ping_timestamp;
1476 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1477 pi1_ping_timestamp;
1478 if (!shared()) {
1479 pi1_renamed_ping_timestamp =
1480 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1481 pi1_event_loop.get(),
1482 "/pi1/aos/remote_timestamps/pi2/pi1/production/aos-examples-Ping");
1483 pi1_ping_timestamp =
1484 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1485 pi1_event_loop.get(),
1486 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1487 }
1488
1489 log_reader_factory.Run();
1490
1491 EXPECT_EQ(pi1_ping.count(), 0u);
1492 EXPECT_EQ(pi2_ping.count(), 0u);
1493 EXPECT_NE(pi1_renamed_ping.count(), 0u);
1494 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1495 if (!shared()) {
1496 EXPECT_NE(pi1_renamed_ping_timestamp->count(), 0u);
1497 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1498 }
1499
1500 reader.Deregister();
1501}
1502
Naman Guptaa63aa132023-03-22 20:06:34 -07001503// Tests that we observe all the same events in log replay (for a given node)
1504// whether we just register an event loop for that node or if we register a full
1505// event loop factory.
1506TEST_P(MultinodeLoggerTest, SingleNodeReplay) {
1507 time_converter_.StartEqual();
1508 constexpr chrono::milliseconds kStartupDelay(95);
1509 {
1510 LoggerState pi1_logger = MakeLogger(pi1_);
1511 LoggerState pi2_logger = MakeLogger(pi2_);
1512
1513 event_loop_factory_.RunFor(kStartupDelay);
1514
1515 StartLogger(&pi1_logger);
1516 StartLogger(&pi2_logger);
1517
1518 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1519 }
1520
1521 LogReader full_reader(SortParts(logfiles_));
1522 LogReader single_node_reader(SortParts(logfiles_));
1523
1524 SimulatedEventLoopFactory full_factory(full_reader.configuration());
1525 SimulatedEventLoopFactory single_node_factory(
1526 single_node_reader.configuration());
1527 single_node_factory.SkipTimingReport();
1528 single_node_factory.DisableStatistics();
1529 std::unique_ptr<EventLoop> replay_event_loop =
1530 single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
1531 "log_reader");
1532
1533 full_reader.Register(&full_factory);
1534 single_node_reader.Register(replay_event_loop.get());
1535
1536 const Node *full_pi1 =
1537 configuration::GetNode(full_factory.configuration(), "pi1");
1538
1539 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1540 // else should have moved.
1541 std::unique_ptr<EventLoop> full_event_loop =
1542 full_factory.MakeEventLoop("test", full_pi1);
1543 full_event_loop->SkipTimingReport();
1544 full_event_loop->SkipAosLog();
1545 // maps are indexed on channel index.
1546 // observed_messages: {channel_index: [(message_sent_time, was_fetched),...]}
1547 std::map<size_t, std::vector<std::pair<monotonic_clock::time_point, bool>>>
1548 observed_messages;
1549 std::map<size_t, std::unique_ptr<RawFetcher>> fetchers;
1550 for (size_t ii = 0; ii < full_event_loop->configuration()->channels()->size();
1551 ++ii) {
1552 const Channel *channel =
1553 full_event_loop->configuration()->channels()->Get(ii);
1554 // We currently don't support replaying remote timestamp channels in
1555 // realtime replay (unless the remote timestamp channel was not NOT_LOGGED,
1556 // in which case it gets auto-remapped and replayed on a /original channel).
1557 if (channel->name()->string_view().find("remote_timestamp") !=
1558 std::string_view::npos &&
1559 channel->name()->string_view().find("/original") ==
1560 std::string_view::npos) {
1561 continue;
1562 }
1563 if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
1564 observed_messages[ii] = {};
1565 fetchers[ii] = full_event_loop->MakeRawFetcher(channel);
1566 full_event_loop->OnRun([ii, &observed_messages, &fetchers]() {
1567 if (fetchers[ii]->Fetch()) {
1568 observed_messages[ii].push_back(std::make_pair(
1569 fetchers[ii]->context().monotonic_event_time, true));
1570 }
1571 });
1572 full_event_loop->MakeRawNoArgWatcher(
1573 channel, [ii, &observed_messages](const Context &context) {
1574 observed_messages[ii].push_back(
1575 std::make_pair(context.monotonic_event_time, false));
1576 });
1577 }
1578 }
1579
1580 full_factory.Run();
1581 fetchers.clear();
1582 full_reader.Deregister();
1583
1584 const Node *single_node_pi1 =
1585 configuration::GetNode(single_node_factory.configuration(), "pi1");
1586 std::map<size_t, std::unique_ptr<RawFetcher>> single_node_fetchers;
1587
1588 std::unique_ptr<EventLoop> single_node_event_loop =
1589 single_node_factory.MakeEventLoop("test", single_node_pi1);
1590 single_node_event_loop->SkipTimingReport();
1591 single_node_event_loop->SkipAosLog();
1592 for (size_t ii = 0;
1593 ii < single_node_event_loop->configuration()->channels()->size(); ++ii) {
1594 const Channel *channel =
1595 single_node_event_loop->configuration()->channels()->Get(ii);
1596 single_node_factory.DisableForwarding(channel);
1597 if (configuration::ChannelIsReadableOnNode(channel, single_node_pi1)) {
1598 single_node_fetchers[ii] =
1599 single_node_event_loop->MakeRawFetcher(channel);
1600 single_node_event_loop->OnRun([channel, ii, &single_node_fetchers]() {
1601 EXPECT_FALSE(single_node_fetchers[ii]->Fetch())
1602 << "Single EventLoop replay doesn't support pre-loading fetchers. "
1603 << configuration::StrippedChannelToString(channel);
1604 });
1605 single_node_event_loop->MakeRawNoArgWatcher(
1606 channel, [ii, &observed_messages, channel,
1607 kStartupDelay](const Context &context) {
1608 if (observed_messages[ii].empty()) {
1609 FAIL() << "Observed extra message at "
1610 << context.monotonic_event_time << " on "
1611 << configuration::StrippedChannelToString(channel);
1612 return;
1613 }
1614 const std::pair<monotonic_clock::time_point, bool> &message =
1615 observed_messages[ii].front();
1616 if (message.second) {
1617 EXPECT_LE(message.first,
1618 context.monotonic_event_time + kStartupDelay)
1619 << "Mismatched message times " << context.monotonic_event_time
1620 << " and " << message.first << " on "
1621 << configuration::StrippedChannelToString(channel);
1622 } else {
1623 EXPECT_EQ(message.first,
1624 context.monotonic_event_time + kStartupDelay)
1625 << "Mismatched message times " << context.monotonic_event_time
1626 << " and " << message.first << " on "
1627 << configuration::StrippedChannelToString(channel);
1628 }
1629 observed_messages[ii].erase(observed_messages[ii].begin());
1630 });
1631 }
1632 }
1633
1634 single_node_factory.Run();
1635
1636 single_node_fetchers.clear();
1637
1638 single_node_reader.Deregister();
1639
1640 for (const auto &pair : observed_messages) {
1641 EXPECT_TRUE(pair.second.empty())
1642 << "Missed " << pair.second.size() << " messages on "
1643 << configuration::StrippedChannelToString(
1644 single_node_event_loop->configuration()->channels()->Get(
1645 pair.first));
1646 }
1647}
1648
1649// Tests that we properly recreate forwarded timestamps when replaying a log.
1650// This should be enough that we can then re-run the logger and get a valid log
1651// back.
1652TEST_P(MultinodeLoggerTest, MessageHeader) {
1653 time_converter_.StartEqual();
1654 {
1655 LoggerState pi1_logger = MakeLogger(pi1_);
1656 LoggerState pi2_logger = MakeLogger(pi2_);
1657
1658 event_loop_factory_.RunFor(chrono::milliseconds(95));
1659
1660 StartLogger(&pi1_logger);
1661 StartLogger(&pi2_logger);
1662
1663 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1664 }
1665
1666 LogReader reader(SortParts(logfiles_));
1667
1668 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1669 log_reader_factory.set_send_delay(chrono::microseconds(0));
1670
1671 // This sends out the fetched messages and advances time to the start of the
1672 // log file.
1673 reader.Register(&log_reader_factory);
1674
1675 const Node *pi1 =
1676 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1677 const Node *pi2 =
1678 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1679
1680 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
1681 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
1682 LOG(INFO) << "now pi1 "
1683 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1684 LOG(INFO) << "now pi2 "
1685 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1686
1687 EXPECT_THAT(reader.LoggedNodes(),
1688 ::testing::ElementsAre(
1689 configuration::GetNode(reader.logged_configuration(), pi1),
1690 configuration::GetNode(reader.logged_configuration(), pi2)));
1691
1692 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1693
1694 std::unique_ptr<EventLoop> pi1_event_loop =
1695 log_reader_factory.MakeEventLoop("test", pi1);
1696 std::unique_ptr<EventLoop> pi2_event_loop =
1697 log_reader_factory.MakeEventLoop("test", pi2);
1698
1699 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =
1700 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1701 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi2_fetcher =
1702 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1703
1704 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1705 pi1_event_loop->MakeFetcher<examples::Ping>("/test");
1706 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1707 pi2_event_loop->MakeFetcher<examples::Ping>("/test");
1708
1709 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi2_fetcher =
1710 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1711 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi1_fetcher =
1712 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1713
1714 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1715 pi2_event_loop->MakeFetcher<examples::Pong>("/test");
1716 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1717 pi1_event_loop->MakeFetcher<examples::Pong>("/test");
1718
1719 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
1720 pi1_event_loop->configuration(), pi1_timestamp_on_pi1_fetcher.channel());
1721 const size_t ping_timestamp_channel = configuration::ChannelIndex(
1722 pi2_event_loop->configuration(), ping_on_pi2_fetcher.channel());
1723
1724 const size_t pi2_timestamp_channel = configuration::ChannelIndex(
1725 pi2_event_loop->configuration(), pi2_timestamp_on_pi2_fetcher.channel());
1726 const size_t pong_timestamp_channel = configuration::ChannelIndex(
1727 pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
1728
1729 const chrono::nanoseconds network_delay = event_loop_factory_.network_delay();
1730 const chrono::nanoseconds send_delay = event_loop_factory_.send_delay();
1731
1732 for (std::pair<int, std::string> channel :
1733 shared()
1734 ? std::vector<
1735 std::pair<int, std::string>>{{-1,
1736 "/aos/remote_timestamps/pi2"}}
1737 : std::vector<std::pair<int, std::string>>{
1738 {pi1_timestamp_channel,
1739 "/aos/remote_timestamps/pi2/pi1/aos/"
1740 "aos-message_bridge-Timestamp"},
1741 {ping_timestamp_channel,
1742 "/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
1743 pi1_event_loop->MakeWatcher(
1744 channel.second,
1745 [&pi1_event_loop, &pi2_event_loop, pi1_timestamp_channel,
1746 ping_timestamp_channel, &pi1_timestamp_on_pi1_fetcher,
1747 &pi1_timestamp_on_pi2_fetcher, &ping_on_pi1_fetcher,
1748 &ping_on_pi2_fetcher, network_delay, send_delay,
1749 channel_index = channel.first](const RemoteMessage &header) {
1750 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1751 chrono::nanoseconds(header.monotonic_sent_time()));
1752 const aos::realtime_clock::time_point header_realtime_sent_time(
1753 chrono::nanoseconds(header.realtime_sent_time()));
1754 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1755 chrono::nanoseconds(header.monotonic_remote_time()));
1756 const aos::realtime_clock::time_point header_realtime_remote_time(
1757 chrono::nanoseconds(header.realtime_remote_time()));
1758
1759 if (channel_index != -1) {
1760 ASSERT_EQ(channel_index, header.channel_index());
1761 }
1762
1763 const Context *pi1_context = nullptr;
1764 const Context *pi2_context = nullptr;
1765
1766 if (header.channel_index() == pi1_timestamp_channel) {
1767 ASSERT_TRUE(pi1_timestamp_on_pi1_fetcher.FetchNext());
1768 ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
1769 pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
1770 pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
1771 } else if (header.channel_index() == ping_timestamp_channel) {
1772 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
1773 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
1774 pi1_context = &ping_on_pi1_fetcher.context();
1775 pi2_context = &ping_on_pi2_fetcher.context();
1776 } else {
1777 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1778 << configuration::CleanedChannelToString(
1779 pi1_event_loop->configuration()->channels()->Get(
1780 header.channel_index()));
1781 }
1782
1783 ASSERT_TRUE(header.has_boot_uuid());
1784 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1785 pi2_event_loop->boot_uuid());
1786
1787 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
1788 EXPECT_EQ(pi2_context->remote_queue_index,
1789 header.remote_queue_index());
1790 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
1791
1792 EXPECT_EQ(pi2_context->monotonic_event_time,
1793 header_monotonic_sent_time);
1794 EXPECT_EQ(pi2_context->realtime_event_time,
1795 header_realtime_sent_time);
1796 EXPECT_EQ(pi2_context->realtime_remote_time,
1797 header_realtime_remote_time);
1798 EXPECT_EQ(pi2_context->monotonic_remote_time,
1799 header_monotonic_remote_time);
1800
1801 EXPECT_EQ(pi1_context->realtime_event_time,
1802 header_realtime_remote_time);
1803 EXPECT_EQ(pi1_context->monotonic_event_time,
1804 header_monotonic_remote_time);
1805
1806 // Time estimation isn't perfect, but we know the clocks were
1807 // identical when logged, so we know when this should have come back.
1808 // Confirm we got it when we expected.
1809 EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
1810 pi1_context->monotonic_event_time + 2 * network_delay +
1811 send_delay);
1812 });
1813 }
1814 for (std::pair<int, std::string> channel :
1815 shared()
1816 ? std::vector<
1817 std::pair<int, std::string>>{{-1,
1818 "/aos/remote_timestamps/pi1"}}
1819 : std::vector<std::pair<int, std::string>>{
1820 {pi2_timestamp_channel,
1821 "/aos/remote_timestamps/pi1/pi2/aos/"
1822 "aos-message_bridge-Timestamp"}}) {
1823 pi2_event_loop->MakeWatcher(
1824 channel.second,
1825 [&pi2_event_loop, &pi1_event_loop, pi2_timestamp_channel,
1826 pong_timestamp_channel, &pi2_timestamp_on_pi2_fetcher,
1827 &pi2_timestamp_on_pi1_fetcher, &pong_on_pi2_fetcher,
1828 &pong_on_pi1_fetcher, network_delay, send_delay,
1829 channel_index = channel.first](const RemoteMessage &header) {
1830 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1831 chrono::nanoseconds(header.monotonic_sent_time()));
1832 const aos::realtime_clock::time_point header_realtime_sent_time(
1833 chrono::nanoseconds(header.realtime_sent_time()));
1834 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1835 chrono::nanoseconds(header.monotonic_remote_time()));
1836 const aos::realtime_clock::time_point header_realtime_remote_time(
1837 chrono::nanoseconds(header.realtime_remote_time()));
1838
1839 if (channel_index != -1) {
1840 ASSERT_EQ(channel_index, header.channel_index());
1841 }
1842
1843 const Context *pi2_context = nullptr;
1844 const Context *pi1_context = nullptr;
1845
1846 if (header.channel_index() == pi2_timestamp_channel) {
1847 ASSERT_TRUE(pi2_timestamp_on_pi2_fetcher.FetchNext());
1848 ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
1849 pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
1850 pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
1851 } else if (header.channel_index() == pong_timestamp_channel) {
1852 ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
1853 ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
1854 pi2_context = &pong_on_pi2_fetcher.context();
1855 pi1_context = &pong_on_pi1_fetcher.context();
1856 } else {
1857 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1858 << configuration::CleanedChannelToString(
1859 pi2_event_loop->configuration()->channels()->Get(
1860 header.channel_index()));
1861 }
1862
1863 ASSERT_TRUE(header.has_boot_uuid());
1864 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1865 pi1_event_loop->boot_uuid());
1866
1867 EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());
1868 EXPECT_EQ(pi1_context->remote_queue_index,
1869 header.remote_queue_index());
1870 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
1871
1872 EXPECT_EQ(pi1_context->monotonic_event_time,
1873 header_monotonic_sent_time);
1874 EXPECT_EQ(pi1_context->realtime_event_time,
1875 header_realtime_sent_time);
1876 EXPECT_EQ(pi1_context->realtime_remote_time,
1877 header_realtime_remote_time);
1878 EXPECT_EQ(pi1_context->monotonic_remote_time,
1879 header_monotonic_remote_time);
1880
1881 EXPECT_EQ(pi2_context->realtime_event_time,
1882 header_realtime_remote_time);
1883 EXPECT_EQ(pi2_context->monotonic_event_time,
1884 header_monotonic_remote_time);
1885
1886 // Time estimation isn't perfect, but we know the clocks were
1887 // identical when logged, so we know when this should have come back.
1888 // Confirm we got it when we expected.
1889 EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
1890 pi2_context->monotonic_event_time + 2 * network_delay +
1891 send_delay);
1892 });
1893 }
1894
1895 // And confirm we can re-create a log again, while checking the contents.
1896 {
1897 LoggerState pi1_logger = MakeLogger(
1898 log_reader_factory.GetNodeEventLoopFactory("pi1"), &log_reader_factory);
1899 LoggerState pi2_logger = MakeLogger(
1900 log_reader_factory.GetNodeEventLoopFactory("pi2"), &log_reader_factory);
1901
1902 StartLogger(&pi1_logger, tmp_dir_ + "/relogged1");
1903 StartLogger(&pi2_logger, tmp_dir_ + "/relogged2");
1904
1905 log_reader_factory.Run();
1906 }
1907
1908 reader.Deregister();
1909
1910 // And verify that we can run the LogReader over the relogged files without
1911 // hitting any fatal errors.
1912 {
1913 LogReader relogged_reader(SortParts(MakeLogFiles(
1914 tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 3, 3, true)));
1915 relogged_reader.Register();
1916
1917 relogged_reader.event_loop_factory()->Run();
1918 }
1919 // And confirm that we can read the logged file using the reader's
1920 // configuration.
1921 {
1922 LogReader relogged_reader(
1923 SortParts(MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2",
1924 3, 3, true)),
1925 reader.configuration());
1926 relogged_reader.Register();
1927
1928 relogged_reader.event_loop_factory()->Run();
1929 }
1930}
1931
1932// Tests that we properly populate and extract the logger_start time by setting
1933// up a clock difference between 2 nodes and looking at the resulting parts.
1934TEST_P(MultinodeLoggerTest, LoggerStartTime) {
1935 std::vector<std::string> actual_filenames;
1936 time_converter_.AddMonotonic(
1937 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
1938 {
1939 LoggerState pi1_logger = MakeLogger(pi1_);
1940 LoggerState pi2_logger = MakeLogger(pi2_);
1941
1942 StartLogger(&pi1_logger);
1943 StartLogger(&pi2_logger);
1944
1945 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1946
1947 pi1_logger.AppendAllFilenames(&actual_filenames);
1948 pi2_logger.AppendAllFilenames(&actual_filenames);
1949 }
1950
1951 ASSERT_THAT(actual_filenames,
1952 ::testing::UnorderedElementsAreArray(logfiles_));
1953
1954 for (const LogFile &log_file : SortParts(logfiles_)) {
1955 for (const LogParts &log_part : log_file.parts) {
1956 if (log_part.node == log_file.logger_node) {
1957 EXPECT_EQ(log_part.logger_monotonic_start_time,
1958 aos::monotonic_clock::min_time);
1959 EXPECT_EQ(log_part.logger_realtime_start_time,
1960 aos::realtime_clock::min_time);
1961 } else {
1962 const chrono::seconds offset = log_file.logger_node == "pi1"
1963 ? -chrono::seconds(1000)
1964 : chrono::seconds(1000);
1965 EXPECT_EQ(log_part.logger_monotonic_start_time,
1966 log_part.monotonic_start_time + offset);
1967 EXPECT_EQ(log_part.logger_realtime_start_time,
1968 log_file.realtime_start_time +
1969 (log_part.logger_monotonic_start_time -
1970 log_file.monotonic_start_time));
1971 }
1972 }
1973 }
1974}
1975
1976// Test that renaming the base, renames the folder.
1977TEST_P(MultinodeLoggerTest, LoggerRenameFolder) {
1978 util::UnlinkRecursive(tmp_dir_ + "/renamefolder");
1979 util::UnlinkRecursive(tmp_dir_ + "/new-good");
1980 time_converter_.AddMonotonic(
1981 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
1982 logfile_base1_ = tmp_dir_ + "/renamefolder/multi_logfile1";
1983 logfile_base2_ = tmp_dir_ + "/renamefolder/multi_logfile2";
1984 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
1985 LoggerState pi1_logger = MakeLogger(pi1_);
1986 LoggerState pi2_logger = MakeLogger(pi2_);
1987
1988 StartLogger(&pi1_logger);
1989 StartLogger(&pi2_logger);
1990
1991 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1992 logfile_base1_ = tmp_dir_ + "/new-good/multi_logfile1";
1993 logfile_base2_ = tmp_dir_ + "/new-good/multi_logfile2";
1994 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
Alexei Strotscaf17d32023-04-03 22:31:11 -07001995
1996 // Sequence of set_base_name and Rotate simulates rename operation. Since
1997 // rename is not supported by all namers, RenameLogBase moved from logger to
1998 // the higher level abstraction, yet log_namers support rename, and it is
1999 // legal to test it here.
2000 pi1_logger.log_namer->set_base_name(logfile_base1_);
2001 pi1_logger.logger->Rotate();
2002 pi2_logger.log_namer->set_base_name(logfile_base2_);
2003 pi2_logger.logger->Rotate();
2004
Naman Guptaa63aa132023-03-22 20:06:34 -07002005 for (auto &file : logfiles_) {
2006 struct stat s;
2007 EXPECT_EQ(0, stat(file.c_str(), &s));
2008 }
2009}
2010
2011// Test that renaming the file base dies.
2012TEST_P(MultinodeLoggerDeathTest, LoggerRenameFile) {
2013 time_converter_.AddMonotonic(
2014 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2015 util::UnlinkRecursive(tmp_dir_ + "/renamefile");
2016 logfile_base1_ = tmp_dir_ + "/renamefile/multi_logfile1";
2017 logfile_base2_ = tmp_dir_ + "/renamefile/multi_logfile2";
2018 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
2019 LoggerState pi1_logger = MakeLogger(pi1_);
2020 StartLogger(&pi1_logger);
2021 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2022 logfile_base1_ = tmp_dir_ + "/new-renamefile/new_multi_logfile1";
Alexei Strotscaf17d32023-04-03 22:31:11 -07002023 EXPECT_DEATH({ pi1_logger.log_namer->set_base_name(logfile_base1_); },
Naman Guptaa63aa132023-03-22 20:06:34 -07002024 "Rename of file base from");
2025}
2026
// TODO(austin): We can write a test which recreates a logfile and confirms that
// we get it back.  That is the ultimate test.
2029
2030// Tests that we properly recreate forwarded timestamps when replaying a log.
2031// This should be enough that we can then re-run the logger and get a valid log
2032// back.
2033TEST_P(MultinodeLoggerTest, RemoteReboot) {
2034 std::vector<std::string> actual_filenames;
2035
2036 const UUID pi1_boot0 = UUID::Random();
2037 const UUID pi2_boot0 = UUID::Random();
2038 const UUID pi2_boot1 = UUID::Random();
2039 {
2040 CHECK_EQ(pi1_index_, 0u);
2041 CHECK_EQ(pi2_index_, 1u);
2042
2043 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2044 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2045 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2046
2047 time_converter_.AddNextTimestamp(
2048 distributed_clock::epoch(),
2049 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2050 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2051 time_converter_.AddNextTimestamp(
2052 distributed_clock::epoch() + reboot_time,
2053 {BootTimestamp::epoch() + reboot_time,
2054 BootTimestamp{
2055 .boot = 1,
2056 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2057 }
2058
2059 {
2060 LoggerState pi1_logger = MakeLogger(pi1_);
2061
2062 event_loop_factory_.RunFor(chrono::milliseconds(95));
2063 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2064 pi1_boot0);
2065 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2066 pi2_boot0);
2067
2068 StartLogger(&pi1_logger);
2069
2070 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2071
2072 VLOG(1) << "Reboot now!";
2073
2074 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2075 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2076 pi1_boot0);
2077 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2078 pi2_boot1);
2079
2080 pi1_logger.AppendAllFilenames(&actual_filenames);
2081 }
2082
2083 std::sort(actual_filenames.begin(), actual_filenames.end());
2084 std::sort(pi1_reboot_logfiles_.begin(), pi1_reboot_logfiles_.end());
2085 ASSERT_THAT(actual_filenames,
2086 ::testing::UnorderedElementsAreArray(pi1_reboot_logfiles_));
2087
2088 // Confirm that our new oldest timestamps properly update as we reboot and
2089 // rotate.
2090 for (const std::string &file : pi1_reboot_logfiles_) {
2091 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2092 ReadHeader(file);
2093 CHECK(log_header);
2094 if (log_header->message().has_configuration()) {
2095 continue;
2096 }
2097
2098 const monotonic_clock::time_point monotonic_start_time =
2099 monotonic_clock::time_point(
2100 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2101 const UUID source_node_boot_uuid = UUID::FromString(
2102 log_header->message().source_node_boot_uuid()->string_view());
2103
2104 if (log_header->message().node()->name()->string_view() != "pi1") {
2105 // The remote message channel should rotate later and have more parts.
2106 // This only is true on the log files with shared remote messages.
2107 //
2108 // TODO(austin): I'm not the most thrilled with this test pattern... It
2109 // feels brittle in a different way.
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002110 if (file.find("source_pi1_timestamp_pi2") == std::string::npos) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002111 switch (log_header->message().parts_index()) {
2112 case 0:
2113 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2114 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2115 break;
2116 case 1:
2117 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2118 ASSERT_EQ(monotonic_start_time,
2119 monotonic_clock::epoch() + chrono::seconds(1));
2120 break;
2121 case 2:
2122 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2123 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2124 break;
2125 case 3:
2126 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2127 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
2128 chrono::nanoseconds(2322999462))
2129 << " on " << file;
2130 break;
2131 default:
2132 FAIL();
2133 break;
2134 }
2135 } else {
2136 switch (log_header->message().parts_index()) {
2137 case 0:
2138 case 1:
2139 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2140 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2141 break;
2142 case 2:
2143 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2144 ASSERT_EQ(monotonic_start_time,
2145 monotonic_clock::epoch() + chrono::seconds(1));
2146 break;
2147 case 3:
2148 case 4:
2149 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2150 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2151 break;
2152 case 5:
2153 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2154 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
2155 chrono::nanoseconds(2322999462))
2156 << " on " << file;
2157 break;
2158 default:
2159 FAIL();
2160 break;
2161 }
2162 }
2163 continue;
2164 }
2165 SCOPED_TRACE(file);
2166 SCOPED_TRACE(aos::FlatbufferToJson(
2167 *log_header, {.multi_line = true, .max_vector_size = 100}));
2168 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2169 ASSERT_EQ(
2170 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2171 EXPECT_EQ(
2172 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2173 monotonic_clock::max_time.time_since_epoch().count());
2174 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2175 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2176 2u);
2177 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2178 monotonic_clock::max_time.time_since_epoch().count());
2179 ASSERT_TRUE(log_header->message()
2180 .has_oldest_remote_unreliable_monotonic_timestamps());
2181 ASSERT_EQ(log_header->message()
2182 .oldest_remote_unreliable_monotonic_timestamps()
2183 ->size(),
2184 2u);
2185 EXPECT_EQ(log_header->message()
2186 .oldest_remote_unreliable_monotonic_timestamps()
2187 ->Get(0),
2188 monotonic_clock::max_time.time_since_epoch().count());
2189 ASSERT_TRUE(log_header->message()
2190 .has_oldest_local_unreliable_monotonic_timestamps());
2191 ASSERT_EQ(log_header->message()
2192 .oldest_local_unreliable_monotonic_timestamps()
2193 ->size(),
2194 2u);
2195 EXPECT_EQ(log_header->message()
2196 .oldest_local_unreliable_monotonic_timestamps()
2197 ->Get(0),
2198 monotonic_clock::max_time.time_since_epoch().count());
2199
2200 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2201 monotonic_clock::time_point(chrono::nanoseconds(
2202 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2203 1)));
2204 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2205 monotonic_clock::time_point(chrono::nanoseconds(
2206 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2207 const monotonic_clock::time_point
2208 oldest_remote_unreliable_monotonic_timestamps =
2209 monotonic_clock::time_point(chrono::nanoseconds(
2210 log_header->message()
2211 .oldest_remote_unreliable_monotonic_timestamps()
2212 ->Get(1)));
2213 const monotonic_clock::time_point
2214 oldest_local_unreliable_monotonic_timestamps =
2215 monotonic_clock::time_point(chrono::nanoseconds(
2216 log_header->message()
2217 .oldest_local_unreliable_monotonic_timestamps()
2218 ->Get(1)));
2219 const monotonic_clock::time_point
2220 oldest_remote_reliable_monotonic_timestamps =
2221 monotonic_clock::time_point(chrono::nanoseconds(
2222 log_header->message()
2223 .oldest_remote_reliable_monotonic_timestamps()
2224 ->Get(1)));
2225 const monotonic_clock::time_point
2226 oldest_local_reliable_monotonic_timestamps =
2227 monotonic_clock::time_point(chrono::nanoseconds(
2228 log_header->message()
2229 .oldest_local_reliable_monotonic_timestamps()
2230 ->Get(1)));
2231 const monotonic_clock::time_point
2232 oldest_logger_remote_unreliable_monotonic_timestamps =
2233 monotonic_clock::time_point(chrono::nanoseconds(
2234 log_header->message()
2235 .oldest_logger_remote_unreliable_monotonic_timestamps()
2236 ->Get(0)));
2237 const monotonic_clock::time_point
2238 oldest_logger_local_unreliable_monotonic_timestamps =
2239 monotonic_clock::time_point(chrono::nanoseconds(
2240 log_header->message()
2241 .oldest_logger_local_unreliable_monotonic_timestamps()
2242 ->Get(0)));
2243 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2244 monotonic_clock::max_time);
2245 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2246 monotonic_clock::max_time);
2247 switch (log_header->message().parts_index()) {
2248 case 0:
2249 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2250 monotonic_clock::max_time);
2251 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2252 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2253 monotonic_clock::max_time);
2254 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2255 monotonic_clock::max_time);
2256 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2257 monotonic_clock::max_time);
2258 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2259 monotonic_clock::max_time);
2260 break;
2261 case 1:
2262 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2263 monotonic_clock::time_point(chrono::microseconds(90200)));
2264 EXPECT_EQ(oldest_local_monotonic_timestamps,
2265 monotonic_clock::time_point(chrono::microseconds(90350)));
2266 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2267 monotonic_clock::time_point(chrono::microseconds(90200)));
2268 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2269 monotonic_clock::time_point(chrono::microseconds(90350)));
2270 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2271 monotonic_clock::max_time);
2272 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2273 monotonic_clock::max_time);
2274 break;
2275 case 2:
2276 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2277 monotonic_clock::time_point(chrono::microseconds(90200)))
2278 << file;
2279 EXPECT_EQ(oldest_local_monotonic_timestamps,
2280 monotonic_clock::time_point(chrono::microseconds(90350)))
2281 << file;
2282 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2283 monotonic_clock::time_point(chrono::microseconds(90200)))
2284 << file;
2285 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2286 monotonic_clock::time_point(chrono::microseconds(90350)))
2287 << file;
2288 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2289 monotonic_clock::time_point(chrono::microseconds(100000)))
2290 << file;
2291 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2292 monotonic_clock::time_point(chrono::microseconds(100150)))
2293 << file;
2294 break;
2295 case 3:
2296 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2297 monotonic_clock::time_point(chrono::milliseconds(1323) +
2298 chrono::microseconds(200)));
2299 EXPECT_EQ(oldest_local_monotonic_timestamps,
2300 monotonic_clock::time_point(chrono::microseconds(10100350)));
2301 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2302 monotonic_clock::time_point(chrono::milliseconds(1323) +
2303 chrono::microseconds(200)));
2304 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2305 monotonic_clock::time_point(chrono::microseconds(10100350)));
2306 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2307 monotonic_clock::max_time)
2308 << file;
2309 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2310 monotonic_clock::max_time)
2311 << file;
2312 break;
2313 case 4:
2314 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2315 monotonic_clock::time_point(chrono::milliseconds(1323) +
2316 chrono::microseconds(200)));
2317 EXPECT_EQ(oldest_local_monotonic_timestamps,
2318 monotonic_clock::time_point(chrono::microseconds(10100350)));
2319 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2320 monotonic_clock::time_point(chrono::milliseconds(1323) +
2321 chrono::microseconds(200)));
2322 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2323 monotonic_clock::time_point(chrono::microseconds(10100350)));
2324 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2325 monotonic_clock::time_point(chrono::microseconds(1423000)))
2326 << file;
2327 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2328 monotonic_clock::time_point(chrono::microseconds(10200150)))
2329 << file;
2330 break;
2331 default:
2332 FAIL();
2333 break;
2334 }
2335 }
2336
2337 // Confirm that we refuse to replay logs with missing boot uuids.
2338 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07002339 auto sorted_parts = SortParts(pi1_reboot_logfiles_);
2340 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
2341 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07002342
2343 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2344 log_reader_factory.set_send_delay(chrono::microseconds(0));
2345
2346 // This sends out the fetched messages and advances time to the start of
2347 // the log file.
2348 reader.Register(&log_reader_factory);
2349
2350 log_reader_factory.Run();
2351
2352 reader.Deregister();
2353 }
2354}
2355
2356// Tests that we can sort a log which only has timestamps from the remote
2357// because the local message_bridge_client failed to connect.
2358TEST_P(MultinodeLoggerTest, RemoteRebootOnlyTimestamps) {
2359 const UUID pi1_boot0 = UUID::Random();
2360 const UUID pi2_boot0 = UUID::Random();
2361 const UUID pi2_boot1 = UUID::Random();
2362 {
2363 CHECK_EQ(pi1_index_, 0u);
2364 CHECK_EQ(pi2_index_, 1u);
2365
2366 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2367 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2368 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2369
2370 time_converter_.AddNextTimestamp(
2371 distributed_clock::epoch(),
2372 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2373 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2374 time_converter_.AddNextTimestamp(
2375 distributed_clock::epoch() + reboot_time,
2376 {BootTimestamp::epoch() + reboot_time,
2377 BootTimestamp{
2378 .boot = 1,
2379 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2380 }
2381 pi2_->Disconnect(pi1_->node());
2382
2383 std::vector<std::string> filenames;
2384 {
2385 LoggerState pi1_logger = MakeLogger(pi1_);
2386
2387 event_loop_factory_.RunFor(chrono::milliseconds(95));
2388 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2389 pi1_boot0);
2390 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2391 pi2_boot0);
2392
2393 StartLogger(&pi1_logger);
2394
2395 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2396
2397 VLOG(1) << "Reboot now!";
2398
2399 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2400 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2401 pi1_boot0);
2402 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2403 pi2_boot1);
2404 pi1_logger.AppendAllFilenames(&filenames);
2405 }
2406
2407 std::sort(filenames.begin(), filenames.end());
2408
2409 // Confirm that our new oldest timestamps properly update as we reboot and
2410 // rotate.
2411 size_t timestamp_file_count = 0;
2412 for (const std::string &file : filenames) {
2413 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2414 ReadHeader(file);
2415 CHECK(log_header);
2416
2417 if (log_header->message().has_configuration()) {
2418 continue;
2419 }
2420
2421 const monotonic_clock::time_point monotonic_start_time =
2422 monotonic_clock::time_point(
2423 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2424 const UUID source_node_boot_uuid = UUID::FromString(
2425 log_header->message().source_node_boot_uuid()->string_view());
2426
2427 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2428 ASSERT_EQ(
2429 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2430 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2431 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2432 2u);
2433 ASSERT_TRUE(log_header->message()
2434 .has_oldest_remote_unreliable_monotonic_timestamps());
2435 ASSERT_EQ(log_header->message()
2436 .oldest_remote_unreliable_monotonic_timestamps()
2437 ->size(),
2438 2u);
2439 ASSERT_TRUE(log_header->message()
2440 .has_oldest_local_unreliable_monotonic_timestamps());
2441 ASSERT_EQ(log_header->message()
2442 .oldest_local_unreliable_monotonic_timestamps()
2443 ->size(),
2444 2u);
2445 ASSERT_TRUE(log_header->message()
2446 .has_oldest_remote_reliable_monotonic_timestamps());
2447 ASSERT_EQ(log_header->message()
2448 .oldest_remote_reliable_monotonic_timestamps()
2449 ->size(),
2450 2u);
2451 ASSERT_TRUE(
2452 log_header->message().has_oldest_local_reliable_monotonic_timestamps());
2453 ASSERT_EQ(log_header->message()
2454 .oldest_local_reliable_monotonic_timestamps()
2455 ->size(),
2456 2u);
2457
2458 ASSERT_TRUE(
2459 log_header->message()
2460 .has_oldest_logger_remote_unreliable_monotonic_timestamps());
2461 ASSERT_EQ(log_header->message()
2462 .oldest_logger_remote_unreliable_monotonic_timestamps()
2463 ->size(),
2464 2u);
2465 ASSERT_TRUE(log_header->message()
2466 .has_oldest_logger_local_unreliable_monotonic_timestamps());
2467 ASSERT_EQ(log_header->message()
2468 .oldest_logger_local_unreliable_monotonic_timestamps()
2469 ->size(),
2470 2u);
2471
2472 if (log_header->message().node()->name()->string_view() != "pi1") {
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002473 ASSERT_TRUE(file.find("source_pi1_timestamp_pi2") != std::string::npos);
Naman Guptaa63aa132023-03-22 20:06:34 -07002474
2475 const std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
2476 ReadNthMessage(file, 0);
2477 CHECK(msg);
2478
2479 EXPECT_TRUE(msg->message().has_monotonic_sent_time());
2480 EXPECT_TRUE(msg->message().has_monotonic_remote_time());
2481
2482 const monotonic_clock::time_point
2483 expected_oldest_local_monotonic_timestamps(
2484 chrono::nanoseconds(msg->message().monotonic_sent_time()));
2485 const monotonic_clock::time_point
2486 expected_oldest_remote_monotonic_timestamps(
2487 chrono::nanoseconds(msg->message().monotonic_remote_time()));
2488 const monotonic_clock::time_point
2489 expected_oldest_timestamp_monotonic_timestamps(
2490 chrono::nanoseconds(msg->message().monotonic_timestamp_time()));
2491
2492 EXPECT_NE(expected_oldest_local_monotonic_timestamps,
2493 monotonic_clock::min_time);
2494 EXPECT_NE(expected_oldest_remote_monotonic_timestamps,
2495 monotonic_clock::min_time);
2496 EXPECT_NE(expected_oldest_timestamp_monotonic_timestamps,
2497 monotonic_clock::min_time);
2498
2499 ++timestamp_file_count;
2500 // Since the log file is from the perspective of the other node,
2501 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2502 monotonic_clock::time_point(chrono::nanoseconds(
2503 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2504 0)));
2505 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2506 monotonic_clock::time_point(chrono::nanoseconds(
2507 log_header->message().oldest_local_monotonic_timestamps()->Get(
2508 0)));
2509 const monotonic_clock::time_point
2510 oldest_remote_unreliable_monotonic_timestamps =
2511 monotonic_clock::time_point(chrono::nanoseconds(
2512 log_header->message()
2513 .oldest_remote_unreliable_monotonic_timestamps()
2514 ->Get(0)));
2515 const monotonic_clock::time_point
2516 oldest_local_unreliable_monotonic_timestamps =
2517 monotonic_clock::time_point(chrono::nanoseconds(
2518 log_header->message()
2519 .oldest_local_unreliable_monotonic_timestamps()
2520 ->Get(0)));
2521 const monotonic_clock::time_point
2522 oldest_remote_reliable_monotonic_timestamps =
2523 monotonic_clock::time_point(chrono::nanoseconds(
2524 log_header->message()
2525 .oldest_remote_reliable_monotonic_timestamps()
2526 ->Get(0)));
2527 const monotonic_clock::time_point
2528 oldest_local_reliable_monotonic_timestamps =
2529 monotonic_clock::time_point(chrono::nanoseconds(
2530 log_header->message()
2531 .oldest_local_reliable_monotonic_timestamps()
2532 ->Get(0)));
2533 const monotonic_clock::time_point
2534 oldest_logger_remote_unreliable_monotonic_timestamps =
2535 monotonic_clock::time_point(chrono::nanoseconds(
2536 log_header->message()
2537 .oldest_logger_remote_unreliable_monotonic_timestamps()
2538 ->Get(1)));
2539 const monotonic_clock::time_point
2540 oldest_logger_local_unreliable_monotonic_timestamps =
2541 monotonic_clock::time_point(chrono::nanoseconds(
2542 log_header->message()
2543 .oldest_logger_local_unreliable_monotonic_timestamps()
2544 ->Get(1)));
2545
2546 const Channel *channel =
2547 event_loop_factory_.configuration()->channels()->Get(
2548 msg->message().channel_index());
2549 const Connection *connection = configuration::ConnectionToNode(
2550 channel, configuration::GetNode(
2551 event_loop_factory_.configuration(),
2552 log_header->message().node()->name()->string_view()));
2553
2554 const bool reliable = connection->time_to_live() == 0;
2555
2556 SCOPED_TRACE(file);
2557 SCOPED_TRACE(aos::FlatbufferToJson(
2558 *log_header, {.multi_line = true, .max_vector_size = 100}));
2559
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002560 // Confirm that the oldest timestamps match what we expect. Based on
2561 // what we are doing, we know that the oldest time is the first
2562 // message's time.
2563 //
2564 // This makes the test robust to both the split and combined config
2565 // tests.
2566 switch (log_header->message().parts_index()) {
2567 case 0:
2568 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2569 expected_oldest_remote_monotonic_timestamps);
2570 EXPECT_EQ(oldest_local_monotonic_timestamps,
2571 expected_oldest_local_monotonic_timestamps);
2572 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2573 expected_oldest_local_monotonic_timestamps)
2574 << file;
2575 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2576 expected_oldest_timestamp_monotonic_timestamps)
2577 << file;
2578
2579 if (reliable) {
2580 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
Naman Guptaa63aa132023-03-22 20:06:34 -07002581 expected_oldest_remote_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002582 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
Naman Guptaa63aa132023-03-22 20:06:34 -07002583 expected_oldest_local_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002584 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2585 monotonic_clock::max_time);
2586 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2587 monotonic_clock::max_time);
2588 } else {
2589 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2590 monotonic_clock::max_time);
2591 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2592 monotonic_clock::max_time);
Naman Guptaa63aa132023-03-22 20:06:34 -07002593 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2594 expected_oldest_remote_monotonic_timestamps);
2595 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2596 expected_oldest_local_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002597 }
2598 break;
2599 case 1:
2600 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2601 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2602 EXPECT_EQ(oldest_local_monotonic_timestamps,
2603 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2604 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2605 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2606 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2607 monotonic_clock::epoch() + chrono::nanoseconds(90250000));
2608 if (reliable) {
2609 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2610 expected_oldest_remote_monotonic_timestamps);
2611 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2612 expected_oldest_local_monotonic_timestamps);
2613 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2614 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2615 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2616 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2617 } else {
2618 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2619 monotonic_clock::max_time);
2620 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2621 monotonic_clock::max_time);
2622 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2623 expected_oldest_remote_monotonic_timestamps);
2624 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2625 expected_oldest_local_monotonic_timestamps);
2626 }
2627 break;
2628 case 2:
2629 EXPECT_EQ(
2630 oldest_remote_monotonic_timestamps,
2631 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2632 EXPECT_EQ(oldest_local_monotonic_timestamps,
2633 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2634 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2635 expected_oldest_local_monotonic_timestamps)
2636 << file;
2637 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2638 expected_oldest_timestamp_monotonic_timestamps)
2639 << file;
2640 if (reliable) {
2641 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2642 expected_oldest_remote_monotonic_timestamps);
2643 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2644 expected_oldest_local_monotonic_timestamps);
2645 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2646 monotonic_clock::max_time);
2647 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2648 monotonic_clock::max_time);
2649 } else {
2650 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2651 monotonic_clock::max_time);
2652 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2653 monotonic_clock::max_time);
2654 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2655 expected_oldest_remote_monotonic_timestamps);
2656 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2657 expected_oldest_local_monotonic_timestamps);
2658 }
2659 break;
Naman Guptaa63aa132023-03-22 20:06:34 -07002660
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002661 case 3:
2662 EXPECT_EQ(
2663 oldest_remote_monotonic_timestamps,
2664 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2665 EXPECT_EQ(oldest_local_monotonic_timestamps,
2666 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2667 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2668 expected_oldest_remote_monotonic_timestamps);
2669 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2670 expected_oldest_local_monotonic_timestamps);
2671 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2672 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2673 EXPECT_EQ(
2674 oldest_logger_local_unreliable_monotonic_timestamps,
2675 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2676 break;
2677 default:
2678 FAIL();
2679 break;
Naman Guptaa63aa132023-03-22 20:06:34 -07002680 }
2681
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002682 switch (log_header->message().parts_index()) {
2683 case 0:
2684 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2685 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2686 break;
2687 case 1:
2688 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2689 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2690 break;
2691 case 2:
2692 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2693 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2694 break;
2695 case 3:
2696 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2697 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2698 break;
2699 [[fallthrough]];
2700 default:
2701 FAIL();
2702 break;
2703 }
Naman Guptaa63aa132023-03-22 20:06:34 -07002704 continue;
2705 }
2706 EXPECT_EQ(
2707 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2708 monotonic_clock::max_time.time_since_epoch().count());
2709 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2710 monotonic_clock::max_time.time_since_epoch().count());
2711 EXPECT_EQ(log_header->message()
2712 .oldest_remote_unreliable_monotonic_timestamps()
2713 ->Get(0),
2714 monotonic_clock::max_time.time_since_epoch().count());
2715 EXPECT_EQ(log_header->message()
2716 .oldest_local_unreliable_monotonic_timestamps()
2717 ->Get(0),
2718 monotonic_clock::max_time.time_since_epoch().count());
2719
2720 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2721 monotonic_clock::time_point(chrono::nanoseconds(
2722 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2723 1)));
2724 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2725 monotonic_clock::time_point(chrono::nanoseconds(
2726 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2727 const monotonic_clock::time_point
2728 oldest_remote_unreliable_monotonic_timestamps =
2729 monotonic_clock::time_point(chrono::nanoseconds(
2730 log_header->message()
2731 .oldest_remote_unreliable_monotonic_timestamps()
2732 ->Get(1)));
2733 const monotonic_clock::time_point
2734 oldest_local_unreliable_monotonic_timestamps =
2735 monotonic_clock::time_point(chrono::nanoseconds(
2736 log_header->message()
2737 .oldest_local_unreliable_monotonic_timestamps()
2738 ->Get(1)));
2739 switch (log_header->message().parts_index()) {
2740 case 0:
2741 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2742 monotonic_clock::max_time);
2743 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2744 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2745 monotonic_clock::max_time);
2746 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2747 monotonic_clock::max_time);
2748 break;
2749 default:
2750 FAIL();
2751 break;
2752 }
2753 }
2754
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002755 EXPECT_EQ(timestamp_file_count, 4u);
Naman Guptaa63aa132023-03-22 20:06:34 -07002756
2757 // Confirm that we can actually sort the resulting log and read it.
2758 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07002759 auto sorted_parts = SortParts(filenames);
2760 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
2761 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07002762
2763 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2764 log_reader_factory.set_send_delay(chrono::microseconds(0));
2765
2766 // This sends out the fetched messages and advances time to the start of
2767 // the log file.
2768 reader.Register(&log_reader_factory);
2769
2770 log_reader_factory.Run();
2771
2772 reader.Deregister();
2773 }
2774}
2775
2776// Tests that we properly handle one direction of message_bridge being
2777// unavailable.
2778TEST_P(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
2779 pi1_->Disconnect(pi2_->node());
2780 time_converter_.AddMonotonic(
2781 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2782
2783 time_converter_.AddMonotonic(
2784 {chrono::milliseconds(10000),
2785 chrono::milliseconds(10000) - chrono::milliseconds(1)});
2786 {
2787 LoggerState pi1_logger = MakeLogger(pi1_);
2788
2789 event_loop_factory_.RunFor(chrono::milliseconds(95));
2790
2791 StartLogger(&pi1_logger);
2792
2793 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2794 }
2795
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002796 // Confirm that we can parse the result. LogReader has enough internal
2797 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07002798 ConfirmReadable(pi1_single_direction_logfiles_);
2799}
2800
2801// Tests that we properly handle one direction of message_bridge being
2802// unavailable.
2803TEST_P(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
2804 pi1_->Disconnect(pi2_->node());
2805 time_converter_.AddMonotonic(
2806 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(500)});
2807
2808 time_converter_.AddMonotonic(
2809 {chrono::milliseconds(10000),
2810 chrono::milliseconds(10000) + chrono::milliseconds(1)});
2811 {
2812 LoggerState pi1_logger = MakeLogger(pi1_);
2813
2814 event_loop_factory_.RunFor(chrono::milliseconds(95));
2815
2816 StartLogger(&pi1_logger);
2817
2818 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2819 }
2820
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002821 // Confirm that we can parse the result. LogReader has enough internal
2822 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07002823 ConfirmReadable(pi1_single_direction_logfiles_);
2824}
2825
2826// Tests that we explode if someone passes in a part file twice with a better
2827// error than an out of order error.
2828TEST_P(MultinodeLoggerTest, DuplicateLogFiles) {
2829 time_converter_.AddMonotonic(
2830 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2831 {
2832 LoggerState pi1_logger = MakeLogger(pi1_);
2833
2834 event_loop_factory_.RunFor(chrono::milliseconds(95));
2835
2836 StartLogger(&pi1_logger);
2837
2838 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2839 }
2840
2841 std::vector<std::string> duplicates;
2842 for (const std::string &f : pi1_single_direction_logfiles_) {
2843 duplicates.emplace_back(f);
2844 duplicates.emplace_back(f);
2845 }
2846 EXPECT_DEATH({ SortParts(duplicates); }, "Found duplicate parts in");
2847}
2848
2849// Tests that we explode if someone loses a part out of the middle of a log.
2850TEST_P(MultinodeLoggerTest, MissingPartsFromMiddle) {
2851 time_converter_.AddMonotonic(
2852 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2853 {
2854 LoggerState pi1_logger = MakeLogger(pi1_);
2855
2856 event_loop_factory_.RunFor(chrono::milliseconds(95));
2857
2858 StartLogger(&pi1_logger);
2859 aos::monotonic_clock::time_point last_rotation_time =
2860 pi1_logger.event_loop->monotonic_now();
Austin Schuh2f864452023-07-17 14:53:08 -07002861 pi1_logger.logger->set_on_logged_period(
2862 [&](aos::monotonic_clock::time_point) {
2863 const auto now = pi1_logger.event_loop->monotonic_now();
2864 if (now > last_rotation_time + std::chrono::seconds(5)) {
2865 pi1_logger.logger->Rotate();
2866 last_rotation_time = now;
2867 }
2868 });
Naman Guptaa63aa132023-03-22 20:06:34 -07002869
2870 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2871 }
2872
2873 std::vector<std::string> missing_parts;
2874
2875 missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part0" + Extension());
2876 missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part2" + Extension());
2877 missing_parts.emplace_back(absl::StrCat(
2878 logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
2879
2880 EXPECT_DEATH({ SortParts(missing_parts); },
2881 "Broken log, missing part files between");
2882}
2883
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002884// Tests that we properly handle a dead node. Do this by just disconnecting
2885// it and only using one nodes of logs.
Naman Guptaa63aa132023-03-22 20:06:34 -07002886TEST_P(MultinodeLoggerTest, DeadNode) {
2887 pi1_->Disconnect(pi2_->node());
2888 pi2_->Disconnect(pi1_->node());
2889 time_converter_.AddMonotonic(
2890 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2891 {
2892 LoggerState pi1_logger = MakeLogger(pi1_);
2893
2894 event_loop_factory_.RunFor(chrono::milliseconds(95));
2895
2896 StartLogger(&pi1_logger);
2897
2898 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2899 }
2900
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002901 // Confirm that we can parse the result. LogReader has enough internal
2902 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07002903 ConfirmReadable(MakePi1DeadNodeLogfiles());
2904}
2905
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002906// Tests that we can relog with a different config. This makes most sense
2907// when you are trying to edit a log and want to use channel renaming + the
2908// original config in the new log.
Naman Guptaa63aa132023-03-22 20:06:34 -07002909TEST_P(MultinodeLoggerTest, LogDifferentConfig) {
2910 time_converter_.StartEqual();
2911 {
2912 LoggerState pi1_logger = MakeLogger(pi1_);
2913 LoggerState pi2_logger = MakeLogger(pi2_);
2914
2915 event_loop_factory_.RunFor(chrono::milliseconds(95));
2916
2917 StartLogger(&pi1_logger);
2918 StartLogger(&pi2_logger);
2919
2920 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2921 }
2922
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07002923 auto sorted_parts = SortParts(logfiles_);
2924 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
2925 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07002926 reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
2927
2928 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2929 log_reader_factory.set_send_delay(chrono::microseconds(0));
2930
2931 // This sends out the fetched messages and advances time to the start of the
2932 // log file.
2933 reader.Register(&log_reader_factory);
2934
2935 const Node *pi1 =
2936 configuration::GetNode(log_reader_factory.configuration(), "pi1");
2937 const Node *pi2 =
2938 configuration::GetNode(log_reader_factory.configuration(), "pi2");
2939
2940 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
2941 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
2942 LOG(INFO) << "now pi1 "
2943 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
2944 LOG(INFO) << "now pi2 "
2945 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
2946
2947 EXPECT_THAT(reader.LoggedNodes(),
2948 ::testing::ElementsAre(
2949 configuration::GetNode(reader.logged_configuration(), pi1),
2950 configuration::GetNode(reader.logged_configuration(), pi2)));
2951
2952 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
2953
2954 // And confirm we can re-create a log again, while checking the contents.
2955 std::vector<std::string> log_files;
2956 {
2957 LoggerState pi1_logger =
2958 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
2959 &log_reader_factory, reader.logged_configuration());
2960 LoggerState pi2_logger =
2961 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
2962 &log_reader_factory, reader.logged_configuration());
2963
2964 pi1_logger.StartLogger(tmp_dir_ + "/relogged1");
2965 pi2_logger.StartLogger(tmp_dir_ + "/relogged2");
2966
2967 log_reader_factory.Run();
2968
2969 for (auto &x : pi1_logger.log_namer->all_filenames()) {
2970 log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged1_", x));
2971 }
2972 for (auto &x : pi2_logger.log_namer->all_filenames()) {
2973 log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged2_", x));
2974 }
2975 }
2976
2977 reader.Deregister();
2978
2979 // And verify that we can run the LogReader over the relogged files without
2980 // hitting any fatal errors.
2981 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07002982 auto sorted_parts = SortParts(log_files);
2983 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
2984 LogReader relogged_reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07002985 relogged_reader.Register();
2986
2987 relogged_reader.event_loop_factory()->Run();
2988 }
2989}
2990
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002991// Tests that we properly replay a log where the start time for a node is
2992// before any data on the node. This can happen if the logger starts before
2993// data is published. While the scenario below is a bit convoluted, we have
2994// seen logs like this generated out in the wild.
Naman Guptaa63aa132023-03-22 20:06:34 -07002995TEST(MultinodeRebootLoggerTest, StartTimeBeforeData) {
2996 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
2997 aos::configuration::ReadConfig(ArtifactPath(
2998 "aos/events/logging/multinode_pingpong_split3_config.json"));
2999 message_bridge::TestingTimeConverter time_converter(
3000 configuration::NodesCount(&config.message()));
3001 SimulatedEventLoopFactory event_loop_factory(&config.message());
3002 event_loop_factory.SetTimeConverter(&time_converter);
3003 NodeEventLoopFactory *const pi1 =
3004 event_loop_factory.GetNodeEventLoopFactory("pi1");
3005 const size_t pi1_index = configuration::GetNodeIndex(
3006 event_loop_factory.configuration(), pi1->node());
3007 NodeEventLoopFactory *const pi2 =
3008 event_loop_factory.GetNodeEventLoopFactory("pi2");
3009 const size_t pi2_index = configuration::GetNodeIndex(
3010 event_loop_factory.configuration(), pi2->node());
3011 NodeEventLoopFactory *const pi3 =
3012 event_loop_factory.GetNodeEventLoopFactory("pi3");
3013 const size_t pi3_index = configuration::GetNodeIndex(
3014 event_loop_factory.configuration(), pi3->node());
3015
3016 const std::string kLogfile1_1 =
3017 aos::testing::TestTmpDir() + "/multi_logfile1/";
3018 const std::string kLogfile2_1 =
3019 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3020 const std::string kLogfile2_2 =
3021 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
3022 const std::string kLogfile3_1 =
3023 aos::testing::TestTmpDir() + "/multi_logfile3/";
3024 util::UnlinkRecursive(kLogfile1_1);
3025 util::UnlinkRecursive(kLogfile2_1);
3026 util::UnlinkRecursive(kLogfile2_2);
3027 util::UnlinkRecursive(kLogfile3_1);
3028 const UUID pi1_boot0 = UUID::Random();
3029 const UUID pi2_boot0 = UUID::Random();
3030 const UUID pi2_boot1 = UUID::Random();
3031 const UUID pi3_boot0 = UUID::Random();
3032 {
3033 CHECK_EQ(pi1_index, 0u);
3034 CHECK_EQ(pi2_index, 1u);
3035 CHECK_EQ(pi3_index, 2u);
3036
3037 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3038 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3039 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3040 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3041
3042 time_converter.AddNextTimestamp(
3043 distributed_clock::epoch(),
3044 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3045 BootTimestamp::epoch()});
3046 const chrono::nanoseconds reboot_time = chrono::milliseconds(20000);
3047 time_converter.AddNextTimestamp(
3048 distributed_clock::epoch() + reboot_time,
3049 {BootTimestamp::epoch() + reboot_time,
3050 BootTimestamp{
3051 .boot = 1,
3052 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)},
3053 BootTimestamp::epoch() + reboot_time});
3054 }
3055
3056 // Make everything perfectly quiet.
3057 event_loop_factory.SkipTimingReport();
3058 event_loop_factory.DisableStatistics();
3059
3060 std::vector<std::string> filenames;
3061 {
3062 LoggerState pi1_logger = MakeLoggerState(
3063 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3064 LoggerState pi3_logger = MakeLoggerState(
3065 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3066 {
3067 // And now start the logger.
3068 LoggerState pi2_logger = MakeLoggerState(
3069 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3070
3071 event_loop_factory.RunFor(chrono::milliseconds(1000));
3072
3073 pi1_logger.StartLogger(kLogfile1_1);
3074 pi3_logger.StartLogger(kLogfile3_1);
3075 pi2_logger.StartLogger(kLogfile2_1);
3076
3077 event_loop_factory.RunFor(chrono::milliseconds(10000));
3078
3079 // Now that we've got a start time in the past, turn on data.
3080 event_loop_factory.EnableStatistics();
3081 std::unique_ptr<aos::EventLoop> ping_event_loop =
3082 pi1->MakeEventLoop("ping");
3083 Ping ping(ping_event_loop.get());
3084
3085 pi2->AlwaysStart<Pong>("pong");
3086
3087 event_loop_factory.RunFor(chrono::milliseconds(3000));
3088
3089 pi2_logger.AppendAllFilenames(&filenames);
3090
3091 // Stop logging on pi2 before rebooting and completely shut off all
3092 // messages on pi2.
3093 pi2->DisableStatistics();
3094 pi1->Disconnect(pi2->node());
3095 pi2->Disconnect(pi1->node());
3096 }
3097 event_loop_factory.RunFor(chrono::milliseconds(7000));
3098 // pi2 now reboots.
3099 {
3100 event_loop_factory.RunFor(chrono::milliseconds(1000));
3101
3102 // Start logging again on pi2 after it is up.
3103 LoggerState pi2_logger = MakeLoggerState(
3104 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3105 pi2_logger.StartLogger(kLogfile2_2);
3106
3107 event_loop_factory.RunFor(chrono::milliseconds(10000));
3108 // And, now that we have a start time in the log, turn data back on.
3109 pi2->EnableStatistics();
3110 pi1->Connect(pi2->node());
3111 pi2->Connect(pi1->node());
3112
3113 pi2->AlwaysStart<Pong>("pong");
3114 std::unique_ptr<aos::EventLoop> ping_event_loop =
3115 pi1->MakeEventLoop("ping");
3116 Ping ping(ping_event_loop.get());
3117
3118 event_loop_factory.RunFor(chrono::milliseconds(3000));
3119
3120 pi2_logger.AppendAllFilenames(&filenames);
3121 }
3122
3123 pi1_logger.AppendAllFilenames(&filenames);
3124 pi3_logger.AppendAllFilenames(&filenames);
3125 }
3126
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003127 // Confirm that we can parse the result. LogReader has enough internal
3128 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07003129 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003130 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003131 auto result = ConfirmReadable(filenames);
3132 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
3133 chrono::seconds(1)));
3134 EXPECT_THAT(result[0].second,
3135 ::testing::ElementsAre(realtime_clock::epoch() +
3136 chrono::microseconds(34990350)));
3137
3138 EXPECT_THAT(result[1].first,
3139 ::testing::ElementsAre(
3140 realtime_clock::epoch() + chrono::seconds(1),
3141 realtime_clock::epoch() + chrono::microseconds(3323000)));
3142 EXPECT_THAT(result[1].second,
3143 ::testing::ElementsAre(
3144 realtime_clock::epoch() + chrono::microseconds(13990200),
3145 realtime_clock::epoch() + chrono::microseconds(16313200)));
3146
3147 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch() +
3148 chrono::seconds(1)));
3149 EXPECT_THAT(result[2].second,
3150 ::testing::ElementsAre(realtime_clock::epoch() +
3151 chrono::microseconds(34900150)));
3152}
3153
3154// Tests that local data before remote data after reboot is properly replayed.
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003155// We only trigger a reboot in the timestamp interpolation function when
3156// solving the timestamp problem when we actually have a point in the
3157// function. This originally only happened when a point passes the noncausal
3158// filter. At the start of time for the second boot, if we aren't careful, we
3159// will have messages which need to be published at times before the boot.
3160// This happens when a local message is in the log before a forwarded message,
3161// so there is no point in the interpolation function. This delays the
3162// reboot. So, we need to recreate that situation and make sure it doesn't
3163// come back.
Naman Guptaa63aa132023-03-22 20:06:34 -07003164TEST(MultinodeRebootLoggerTest,
3165 LocalMessageBeforeRemoteBeforeStartAfterReboot) {
3166 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3167 aos::configuration::ReadConfig(ArtifactPath(
3168 "aos/events/logging/multinode_pingpong_split3_config.json"));
3169 message_bridge::TestingTimeConverter time_converter(
3170 configuration::NodesCount(&config.message()));
3171 SimulatedEventLoopFactory event_loop_factory(&config.message());
3172 event_loop_factory.SetTimeConverter(&time_converter);
3173 NodeEventLoopFactory *const pi1 =
3174 event_loop_factory.GetNodeEventLoopFactory("pi1");
3175 const size_t pi1_index = configuration::GetNodeIndex(
3176 event_loop_factory.configuration(), pi1->node());
3177 NodeEventLoopFactory *const pi2 =
3178 event_loop_factory.GetNodeEventLoopFactory("pi2");
3179 const size_t pi2_index = configuration::GetNodeIndex(
3180 event_loop_factory.configuration(), pi2->node());
3181 NodeEventLoopFactory *const pi3 =
3182 event_loop_factory.GetNodeEventLoopFactory("pi3");
3183 const size_t pi3_index = configuration::GetNodeIndex(
3184 event_loop_factory.configuration(), pi3->node());
3185
3186 const std::string kLogfile1_1 =
3187 aos::testing::TestTmpDir() + "/multi_logfile1/";
3188 const std::string kLogfile2_1 =
3189 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3190 const std::string kLogfile2_2 =
3191 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
3192 const std::string kLogfile3_1 =
3193 aos::testing::TestTmpDir() + "/multi_logfile3/";
3194 util::UnlinkRecursive(kLogfile1_1);
3195 util::UnlinkRecursive(kLogfile2_1);
3196 util::UnlinkRecursive(kLogfile2_2);
3197 util::UnlinkRecursive(kLogfile3_1);
3198 const UUID pi1_boot0 = UUID::Random();
3199 const UUID pi2_boot0 = UUID::Random();
3200 const UUID pi2_boot1 = UUID::Random();
3201 const UUID pi3_boot0 = UUID::Random();
3202 {
3203 CHECK_EQ(pi1_index, 0u);
3204 CHECK_EQ(pi2_index, 1u);
3205 CHECK_EQ(pi3_index, 2u);
3206
3207 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3208 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3209 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3210 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3211
3212 time_converter.AddNextTimestamp(
3213 distributed_clock::epoch(),
3214 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3215 BootTimestamp::epoch()});
3216 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3217 time_converter.AddNextTimestamp(
3218 distributed_clock::epoch() + reboot_time,
3219 {BootTimestamp::epoch() + reboot_time,
3220 BootTimestamp{.boot = 1,
3221 .time = monotonic_clock::epoch() + reboot_time +
3222 chrono::seconds(100)},
3223 BootTimestamp::epoch() + reboot_time});
3224 }
3225
3226 std::vector<std::string> filenames;
3227 {
3228 LoggerState pi1_logger = MakeLoggerState(
3229 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3230 LoggerState pi3_logger = MakeLoggerState(
3231 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3232 {
3233 // And now start the logger.
3234 LoggerState pi2_logger = MakeLoggerState(
3235 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3236
3237 pi1_logger.StartLogger(kLogfile1_1);
3238 pi3_logger.StartLogger(kLogfile3_1);
3239 pi2_logger.StartLogger(kLogfile2_1);
3240
3241 event_loop_factory.RunFor(chrono::milliseconds(1005));
3242
3243 // Now that we've got a start time in the past, turn on data.
3244 std::unique_ptr<aos::EventLoop> ping_event_loop =
3245 pi1->MakeEventLoop("ping");
3246 Ping ping(ping_event_loop.get());
3247
3248 pi2->AlwaysStart<Pong>("pong");
3249
3250 event_loop_factory.RunFor(chrono::milliseconds(3000));
3251
3252 pi2_logger.AppendAllFilenames(&filenames);
3253
3254 // Disable any remote messages on pi2.
3255 pi1->Disconnect(pi2->node());
3256 pi2->Disconnect(pi1->node());
3257 }
3258 event_loop_factory.RunFor(chrono::milliseconds(995));
3259 // pi2 now reboots at 5 seconds.
3260 {
3261 event_loop_factory.RunFor(chrono::milliseconds(1000));
3262
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003263 // Make local stuff happen before we start logging and connect the
3264 // remote.
Naman Guptaa63aa132023-03-22 20:06:34 -07003265 pi2->AlwaysStart<Pong>("pong");
3266 std::unique_ptr<aos::EventLoop> ping_event_loop =
3267 pi1->MakeEventLoop("ping");
3268 Ping ping(ping_event_loop.get());
3269 event_loop_factory.RunFor(chrono::milliseconds(1005));
3270
3271 // Start logging again on pi2 after it is up.
3272 LoggerState pi2_logger = MakeLoggerState(
3273 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3274 pi2_logger.StartLogger(kLogfile2_2);
3275
3276 // And allow remote messages now that we have some local ones.
3277 pi1->Connect(pi2->node());
3278 pi2->Connect(pi1->node());
3279
3280 event_loop_factory.RunFor(chrono::milliseconds(1000));
3281
3282 event_loop_factory.RunFor(chrono::milliseconds(3000));
3283
3284 pi2_logger.AppendAllFilenames(&filenames);
3285 }
3286
3287 pi1_logger.AppendAllFilenames(&filenames);
3288 pi3_logger.AppendAllFilenames(&filenames);
3289 }
3290
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003291 // Confirm that we can parse the result. LogReader has enough internal
3292 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07003293 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003294 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003295 auto result = ConfirmReadable(filenames);
3296
3297 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3298 EXPECT_THAT(result[0].second,
3299 ::testing::ElementsAre(realtime_clock::epoch() +
3300 chrono::microseconds(11000350)));
3301
3302 EXPECT_THAT(result[1].first,
3303 ::testing::ElementsAre(
3304 realtime_clock::epoch(),
3305 realtime_clock::epoch() + chrono::microseconds(107005000)));
3306 EXPECT_THAT(result[1].second,
3307 ::testing::ElementsAre(
3308 realtime_clock::epoch() + chrono::microseconds(4000150),
3309 realtime_clock::epoch() + chrono::microseconds(111000200)));
3310
3311 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3312 EXPECT_THAT(result[2].second,
3313 ::testing::ElementsAre(realtime_clock::epoch() +
3314 chrono::microseconds(11000150)));
3315
3316 auto start_stop_result = ConfirmReadable(
3317 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3318 realtime_clock::epoch() + chrono::milliseconds(3000));
3319
3320 EXPECT_THAT(
3321 start_stop_result[0].first,
3322 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3323 EXPECT_THAT(
3324 start_stop_result[0].second,
3325 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3326 EXPECT_THAT(
3327 start_stop_result[1].first,
3328 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3329 EXPECT_THAT(
3330 start_stop_result[1].second,
3331 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3332 EXPECT_THAT(
3333 start_stop_result[2].first,
3334 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3335 EXPECT_THAT(
3336 start_stop_result[2].second,
3337 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3338}
3339
3340// Tests that setting the start and stop flags across a reboot works as
3341// expected.
3342TEST(MultinodeRebootLoggerTest, RebootStartStopTimes) {
3343 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3344 aos::configuration::ReadConfig(ArtifactPath(
3345 "aos/events/logging/multinode_pingpong_split3_config.json"));
3346 message_bridge::TestingTimeConverter time_converter(
3347 configuration::NodesCount(&config.message()));
3348 SimulatedEventLoopFactory event_loop_factory(&config.message());
3349 event_loop_factory.SetTimeConverter(&time_converter);
3350 NodeEventLoopFactory *const pi1 =
3351 event_loop_factory.GetNodeEventLoopFactory("pi1");
3352 const size_t pi1_index = configuration::GetNodeIndex(
3353 event_loop_factory.configuration(), pi1->node());
3354 NodeEventLoopFactory *const pi2 =
3355 event_loop_factory.GetNodeEventLoopFactory("pi2");
3356 const size_t pi2_index = configuration::GetNodeIndex(
3357 event_loop_factory.configuration(), pi2->node());
3358 NodeEventLoopFactory *const pi3 =
3359 event_loop_factory.GetNodeEventLoopFactory("pi3");
3360 const size_t pi3_index = configuration::GetNodeIndex(
3361 event_loop_factory.configuration(), pi3->node());
3362
3363 const std::string kLogfile1_1 =
3364 aos::testing::TestTmpDir() + "/multi_logfile1/";
3365 const std::string kLogfile2_1 =
3366 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3367 const std::string kLogfile2_2 =
3368 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
3369 const std::string kLogfile3_1 =
3370 aos::testing::TestTmpDir() + "/multi_logfile3/";
3371 util::UnlinkRecursive(kLogfile1_1);
3372 util::UnlinkRecursive(kLogfile2_1);
3373 util::UnlinkRecursive(kLogfile2_2);
3374 util::UnlinkRecursive(kLogfile3_1);
3375 {
3376 CHECK_EQ(pi1_index, 0u);
3377 CHECK_EQ(pi2_index, 1u);
3378 CHECK_EQ(pi3_index, 2u);
3379
3380 time_converter.AddNextTimestamp(
3381 distributed_clock::epoch(),
3382 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3383 BootTimestamp::epoch()});
3384 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3385 time_converter.AddNextTimestamp(
3386 distributed_clock::epoch() + reboot_time,
3387 {BootTimestamp::epoch() + reboot_time,
3388 BootTimestamp{.boot = 1,
3389 .time = monotonic_clock::epoch() + reboot_time},
3390 BootTimestamp::epoch() + reboot_time});
3391 }
3392
3393 std::vector<std::string> filenames;
3394 {
3395 LoggerState pi1_logger = MakeLoggerState(
3396 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3397 LoggerState pi3_logger = MakeLoggerState(
3398 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3399 {
3400 // And now start the logger.
3401 LoggerState pi2_logger = MakeLoggerState(
3402 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3403
3404 pi1_logger.StartLogger(kLogfile1_1);
3405 pi3_logger.StartLogger(kLogfile3_1);
3406 pi2_logger.StartLogger(kLogfile2_1);
3407
3408 event_loop_factory.RunFor(chrono::milliseconds(1005));
3409
3410 // Now that we've got a start time in the past, turn on data.
3411 std::unique_ptr<aos::EventLoop> ping_event_loop =
3412 pi1->MakeEventLoop("ping");
3413 Ping ping(ping_event_loop.get());
3414
3415 pi2->AlwaysStart<Pong>("pong");
3416
3417 event_loop_factory.RunFor(chrono::milliseconds(3000));
3418
3419 pi2_logger.AppendAllFilenames(&filenames);
3420 }
3421 event_loop_factory.RunFor(chrono::milliseconds(995));
3422 // pi2 now reboots at 5 seconds.
3423 {
3424 event_loop_factory.RunFor(chrono::milliseconds(1000));
3425
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003426 // Make local stuff happen before we start logging and connect the
3427 // remote.
Naman Guptaa63aa132023-03-22 20:06:34 -07003428 pi2->AlwaysStart<Pong>("pong");
3429 std::unique_ptr<aos::EventLoop> ping_event_loop =
3430 pi1->MakeEventLoop("ping");
3431 Ping ping(ping_event_loop.get());
3432 event_loop_factory.RunFor(chrono::milliseconds(5));
3433
3434 // Start logging again on pi2 after it is up.
3435 LoggerState pi2_logger = MakeLoggerState(
3436 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3437 pi2_logger.StartLogger(kLogfile2_2);
3438
3439 event_loop_factory.RunFor(chrono::milliseconds(5000));
3440
3441 pi2_logger.AppendAllFilenames(&filenames);
3442 }
3443
3444 pi1_logger.AppendAllFilenames(&filenames);
3445 pi3_logger.AppendAllFilenames(&filenames);
3446 }
3447
3448 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003449 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003450 auto result = ConfirmReadable(filenames);
3451
3452 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3453 EXPECT_THAT(result[0].second,
3454 ::testing::ElementsAre(realtime_clock::epoch() +
3455 chrono::microseconds(11000350)));
3456
3457 EXPECT_THAT(result[1].first,
3458 ::testing::ElementsAre(
3459 realtime_clock::epoch(),
3460 realtime_clock::epoch() + chrono::microseconds(6005000)));
3461 EXPECT_THAT(result[1].second,
3462 ::testing::ElementsAre(
3463 realtime_clock::epoch() + chrono::microseconds(4900150),
3464 realtime_clock::epoch() + chrono::microseconds(11000200)));
3465
3466 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3467 EXPECT_THAT(result[2].second,
3468 ::testing::ElementsAre(realtime_clock::epoch() +
3469 chrono::microseconds(11000150)));
3470
3471 // Confirm we observed the correct start and stop times. We should see the
3472 // reboot here.
3473 auto start_stop_result = ConfirmReadable(
3474 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3475 realtime_clock::epoch() + chrono::milliseconds(8000));
3476
3477 EXPECT_THAT(
3478 start_stop_result[0].first,
3479 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3480 EXPECT_THAT(
3481 start_stop_result[0].second,
3482 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3483 EXPECT_THAT(start_stop_result[1].first,
3484 ::testing::ElementsAre(
3485 realtime_clock::epoch() + chrono::seconds(2),
3486 realtime_clock::epoch() + chrono::microseconds(6005000)));
3487 EXPECT_THAT(start_stop_result[1].second,
3488 ::testing::ElementsAre(
3489 realtime_clock::epoch() + chrono::microseconds(4900150),
3490 realtime_clock::epoch() + chrono::seconds(8)));
3491 EXPECT_THAT(
3492 start_stop_result[2].first,
3493 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3494 EXPECT_THAT(
3495 start_stop_result[2].second,
3496 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3497}
3498
3499// Tests that we properly handle one direction being down.
3500TEST(MissingDirectionTest, OneDirection) {
3501 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3502 aos::configuration::ReadConfig(ArtifactPath(
3503 "aos/events/logging/multinode_pingpong_split4_config.json"));
3504 message_bridge::TestingTimeConverter time_converter(
3505 configuration::NodesCount(&config.message()));
3506 SimulatedEventLoopFactory event_loop_factory(&config.message());
3507 event_loop_factory.SetTimeConverter(&time_converter);
3508
3509 NodeEventLoopFactory *const pi1 =
3510 event_loop_factory.GetNodeEventLoopFactory("pi1");
3511 const size_t pi1_index = configuration::GetNodeIndex(
3512 event_loop_factory.configuration(), pi1->node());
3513 NodeEventLoopFactory *const pi2 =
3514 event_loop_factory.GetNodeEventLoopFactory("pi2");
3515 const size_t pi2_index = configuration::GetNodeIndex(
3516 event_loop_factory.configuration(), pi2->node());
3517 std::vector<std::string> filenames;
3518
3519 {
3520 CHECK_EQ(pi1_index, 0u);
3521 CHECK_EQ(pi2_index, 1u);
3522
3523 time_converter.AddNextTimestamp(
3524 distributed_clock::epoch(),
3525 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3526
3527 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3528 time_converter.AddNextTimestamp(
3529 distributed_clock::epoch() + reboot_time,
3530 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3531 BootTimestamp::epoch() + reboot_time});
3532 }
3533
3534 const std::string kLogfile2_1 =
3535 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3536 const std::string kLogfile1_1 =
3537 aos::testing::TestTmpDir() + "/multi_logfile1.1/";
3538 util::UnlinkRecursive(kLogfile2_1);
3539 util::UnlinkRecursive(kLogfile1_1);
3540
3541 pi2->Disconnect(pi1->node());
3542
3543 pi1->AlwaysStart<Ping>("ping");
3544 pi2->AlwaysStart<Pong>("pong");
3545
3546 {
3547 LoggerState pi2_logger = MakeLoggerState(
3548 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3549
3550 event_loop_factory.RunFor(chrono::milliseconds(95));
3551
3552 pi2_logger.StartLogger(kLogfile2_1);
3553
3554 event_loop_factory.RunFor(chrono::milliseconds(6000));
3555
3556 pi2->Connect(pi1->node());
3557
3558 LoggerState pi1_logger = MakeLoggerState(
3559 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3560 pi1_logger.StartLogger(kLogfile1_1);
3561
3562 event_loop_factory.RunFor(chrono::milliseconds(5000));
3563 pi1_logger.AppendAllFilenames(&filenames);
3564 pi2_logger.AppendAllFilenames(&filenames);
3565 }
3566
3567 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003568 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003569 ConfirmReadable(filenames);
3570}
3571
3572// Tests that we properly handle only one direction ever existing after a
3573// reboot.
3574TEST(MissingDirectionTest, OneDirectionAfterReboot) {
3575 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3576 aos::configuration::ReadConfig(ArtifactPath(
3577 "aos/events/logging/multinode_pingpong_split4_config.json"));
3578 message_bridge::TestingTimeConverter time_converter(
3579 configuration::NodesCount(&config.message()));
3580 SimulatedEventLoopFactory event_loop_factory(&config.message());
3581 event_loop_factory.SetTimeConverter(&time_converter);
3582
3583 NodeEventLoopFactory *const pi1 =
3584 event_loop_factory.GetNodeEventLoopFactory("pi1");
3585 const size_t pi1_index = configuration::GetNodeIndex(
3586 event_loop_factory.configuration(), pi1->node());
3587 NodeEventLoopFactory *const pi2 =
3588 event_loop_factory.GetNodeEventLoopFactory("pi2");
3589 const size_t pi2_index = configuration::GetNodeIndex(
3590 event_loop_factory.configuration(), pi2->node());
3591 std::vector<std::string> filenames;
3592
3593 {
3594 CHECK_EQ(pi1_index, 0u);
3595 CHECK_EQ(pi2_index, 1u);
3596
3597 time_converter.AddNextTimestamp(
3598 distributed_clock::epoch(),
3599 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3600
3601 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3602 time_converter.AddNextTimestamp(
3603 distributed_clock::epoch() + reboot_time,
3604 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3605 BootTimestamp::epoch() + reboot_time});
3606 }
3607
3608 const std::string kLogfile2_1 =
3609 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3610 util::UnlinkRecursive(kLogfile2_1);
3611
3612 pi1->AlwaysStart<Ping>("ping");
3613
3614 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3615 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3616 // second boot.
3617 {
3618 LoggerState pi2_logger = MakeLoggerState(
3619 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3620
3621 event_loop_factory.RunFor(chrono::milliseconds(95));
3622
3623 pi2_logger.StartLogger(kLogfile2_1);
3624
3625 event_loop_factory.RunFor(chrono::milliseconds(4000));
3626
3627 pi2->Disconnect(pi1->node());
3628
3629 event_loop_factory.RunFor(chrono::milliseconds(1000));
3630 pi1->AlwaysStart<Ping>("ping");
3631
3632 event_loop_factory.RunFor(chrono::milliseconds(5000));
3633 pi2_logger.AppendAllFilenames(&filenames);
3634 }
3635
3636 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003637 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003638 ConfirmReadable(filenames);
3639}
3640
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003641// Tests that we properly handle only one direction ever existing after a
3642// reboot with only reliable data.
Naman Guptaa63aa132023-03-22 20:06:34 -07003643TEST(MissingDirectionTest, OneDirectionAfterRebootReliable) {
3644 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003645 aos::configuration::ReadConfig(
3646 ArtifactPath("aos/events/logging/"
3647 "multinode_pingpong_split4_reliable_config.json"));
Naman Guptaa63aa132023-03-22 20:06:34 -07003648 message_bridge::TestingTimeConverter time_converter(
3649 configuration::NodesCount(&config.message()));
3650 SimulatedEventLoopFactory event_loop_factory(&config.message());
3651 event_loop_factory.SetTimeConverter(&time_converter);
3652
3653 NodeEventLoopFactory *const pi1 =
3654 event_loop_factory.GetNodeEventLoopFactory("pi1");
3655 const size_t pi1_index = configuration::GetNodeIndex(
3656 event_loop_factory.configuration(), pi1->node());
3657 NodeEventLoopFactory *const pi2 =
3658 event_loop_factory.GetNodeEventLoopFactory("pi2");
3659 const size_t pi2_index = configuration::GetNodeIndex(
3660 event_loop_factory.configuration(), pi2->node());
3661 std::vector<std::string> filenames;
3662
3663 {
3664 CHECK_EQ(pi1_index, 0u);
3665 CHECK_EQ(pi2_index, 1u);
3666
3667 time_converter.AddNextTimestamp(
3668 distributed_clock::epoch(),
3669 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3670
3671 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3672 time_converter.AddNextTimestamp(
3673 distributed_clock::epoch() + reboot_time,
3674 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3675 BootTimestamp::epoch() + reboot_time});
3676 }
3677
3678 const std::string kLogfile2_1 =
3679 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3680 util::UnlinkRecursive(kLogfile2_1);
3681
3682 pi1->AlwaysStart<Ping>("ping");
3683
3684 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3685 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3686 // second boot.
3687 {
3688 LoggerState pi2_logger = MakeLoggerState(
3689 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3690
3691 event_loop_factory.RunFor(chrono::milliseconds(95));
3692
3693 pi2_logger.StartLogger(kLogfile2_1);
3694
3695 event_loop_factory.RunFor(chrono::milliseconds(4000));
3696
3697 pi2->Disconnect(pi1->node());
3698
3699 event_loop_factory.RunFor(chrono::milliseconds(1000));
3700 pi1->AlwaysStart<Ping>("ping");
3701
3702 event_loop_factory.RunFor(chrono::milliseconds(5000));
3703 pi2_logger.AppendAllFilenames(&filenames);
3704 }
3705
3706 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003707 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003708 ConfirmReadable(filenames);
3709}
3710
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003711// Tests that we properly handle only one direction ever existing after a
3712// reboot with mixed unreliable vs reliable, where reliable has an earlier
3713// timestamp than unreliable.
Brian Smartte67d7112023-03-20 12:06:30 -07003714TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase1) {
3715 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3716 aos::configuration::ReadConfig(ArtifactPath(
3717 "aos/events/logging/multinode_pingpong_split4_mixed1_config.json"));
3718 message_bridge::TestingTimeConverter time_converter(
3719 configuration::NodesCount(&config.message()));
3720 SimulatedEventLoopFactory event_loop_factory(&config.message());
3721 event_loop_factory.SetTimeConverter(&time_converter);
3722
3723 NodeEventLoopFactory *const pi1 =
3724 event_loop_factory.GetNodeEventLoopFactory("pi1");
3725 const size_t pi1_index = configuration::GetNodeIndex(
3726 event_loop_factory.configuration(), pi1->node());
3727 NodeEventLoopFactory *const pi2 =
3728 event_loop_factory.GetNodeEventLoopFactory("pi2");
3729 const size_t pi2_index = configuration::GetNodeIndex(
3730 event_loop_factory.configuration(), pi2->node());
3731 std::vector<std::string> filenames;
3732
3733 {
3734 CHECK_EQ(pi1_index, 0u);
3735 CHECK_EQ(pi2_index, 1u);
3736
3737 time_converter.AddNextTimestamp(
3738 distributed_clock::epoch(),
3739 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3740
3741 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3742 time_converter.AddNextTimestamp(
3743 distributed_clock::epoch() + reboot_time,
3744 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3745 BootTimestamp::epoch() + reboot_time});
3746 }
3747
3748 const std::string kLogfile2_1 =
3749 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3750 util::UnlinkRecursive(kLogfile2_1);
3751
3752 // The following sequence using the above reference config creates
3753 // a reliable message timestamp < unreliable message timestamp.
3754 {
3755 pi1->DisableStatistics();
3756 pi2->DisableStatistics();
3757
3758 event_loop_factory.RunFor(chrono::milliseconds(95));
3759
3760 pi1->AlwaysStart<Ping>("ping");
3761
3762 event_loop_factory.RunFor(chrono::milliseconds(5250));
3763
3764 pi1->EnableStatistics();
3765
3766 event_loop_factory.RunFor(chrono::milliseconds(1000));
3767
3768 LoggerState pi2_logger = MakeLoggerState(
3769 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3770
3771 pi2_logger.StartLogger(kLogfile2_1);
3772
3773 event_loop_factory.RunFor(chrono::milliseconds(5000));
3774 pi2_logger.AppendAllFilenames(&filenames);
3775 }
3776
3777 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003778 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Brian Smartte67d7112023-03-20 12:06:30 -07003779 ConfirmReadable(filenames);
3780}
3781
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003782// Tests that we properly handle only one direction ever existing after a
3783// reboot with mixed unreliable vs reliable, where unreliable has an earlier
3784// timestamp than reliable.
Brian Smartte67d7112023-03-20 12:06:30 -07003785TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase2) {
3786 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3787 aos::configuration::ReadConfig(ArtifactPath(
3788 "aos/events/logging/multinode_pingpong_split4_mixed2_config.json"));
3789 message_bridge::TestingTimeConverter time_converter(
3790 configuration::NodesCount(&config.message()));
3791 SimulatedEventLoopFactory event_loop_factory(&config.message());
3792 event_loop_factory.SetTimeConverter(&time_converter);
3793
3794 NodeEventLoopFactory *const pi1 =
3795 event_loop_factory.GetNodeEventLoopFactory("pi1");
3796 const size_t pi1_index = configuration::GetNodeIndex(
3797 event_loop_factory.configuration(), pi1->node());
3798 NodeEventLoopFactory *const pi2 =
3799 event_loop_factory.GetNodeEventLoopFactory("pi2");
3800 const size_t pi2_index = configuration::GetNodeIndex(
3801 event_loop_factory.configuration(), pi2->node());
3802 std::vector<std::string> filenames;
3803
3804 {
3805 CHECK_EQ(pi1_index, 0u);
3806 CHECK_EQ(pi2_index, 1u);
3807
3808 time_converter.AddNextTimestamp(
3809 distributed_clock::epoch(),
3810 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3811
3812 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3813 time_converter.AddNextTimestamp(
3814 distributed_clock::epoch() + reboot_time,
3815 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3816 BootTimestamp::epoch() + reboot_time});
3817 }
3818
3819 const std::string kLogfile2_1 =
3820 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3821 util::UnlinkRecursive(kLogfile2_1);
3822
3823 // The following sequence using the above reference config creates
3824 // an unreliable message timestamp < reliable message timestamp.
3825 {
3826 pi1->DisableStatistics();
3827 pi2->DisableStatistics();
3828
3829 event_loop_factory.RunFor(chrono::milliseconds(95));
3830
3831 pi1->AlwaysStart<Ping>("ping");
3832
3833 event_loop_factory.RunFor(chrono::milliseconds(5250));
3834
3835 pi1->EnableStatistics();
3836
3837 event_loop_factory.RunFor(chrono::milliseconds(1000));
3838
3839 LoggerState pi2_logger = MakeLoggerState(
3840 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3841
3842 pi2_logger.StartLogger(kLogfile2_1);
3843
3844 event_loop_factory.RunFor(chrono::milliseconds(5000));
3845 pi2_logger.AppendAllFilenames(&filenames);
3846 }
3847
3848 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003849 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Brian Smartte67d7112023-03-20 12:06:30 -07003850 ConfirmReadable(filenames);
3851}
3852
Naman Guptaa63aa132023-03-22 20:06:34 -07003853// Tests that we properly handle what used to be a time violation in one
3854// direction. This can occur when one direction goes down after sending some
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003855// data, but the other keeps working. The down direction ends up resolving to
3856// a straight line in the noncausal filter, where the direction which is still
3857// up can cross that line. Really, time progressed along just fine but we
3858// assumed that the offset was a line when it could have deviated by up to
3859// 1ms/second.
Naman Guptaa63aa132023-03-22 20:06:34 -07003860TEST_P(MultinodeLoggerTest, OneDirectionTimeDrift) {
3861 std::vector<std::string> filenames;
3862
3863 CHECK_EQ(pi1_index_, 0u);
3864 CHECK_EQ(pi2_index_, 1u);
3865
3866 time_converter_.AddNextTimestamp(
3867 distributed_clock::epoch(),
3868 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3869
3870 const chrono::nanoseconds before_disconnect_duration =
3871 time_converter_.AddMonotonic(
3872 {chrono::milliseconds(1000), chrono::milliseconds(1000)});
3873
3874 const chrono::nanoseconds test_duration =
3875 time_converter_.AddMonotonic(
3876 {chrono::milliseconds(1000), chrono::milliseconds(1000)}) +
3877 time_converter_.AddMonotonic(
3878 {chrono::milliseconds(10000),
3879 chrono::milliseconds(10000) - chrono::milliseconds(5)}) +
3880 time_converter_.AddMonotonic(
3881 {chrono::milliseconds(10000),
3882 chrono::milliseconds(10000) + chrono::milliseconds(5)});
3883
3884 const std::string kLogfile =
3885 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3886 util::UnlinkRecursive(kLogfile);
3887
3888 {
3889 LoggerState pi2_logger = MakeLogger(pi2_);
3890 pi2_logger.StartLogger(kLogfile);
3891 event_loop_factory_.RunFor(before_disconnect_duration);
3892
3893 pi2_->Disconnect(pi1_->node());
3894
3895 event_loop_factory_.RunFor(test_duration);
3896 pi2_->Connect(pi1_->node());
3897
3898 event_loop_factory_.RunFor(chrono::milliseconds(5000));
3899 pi2_logger.AppendAllFilenames(&filenames);
3900 }
3901
3902 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003903 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003904 ConfirmReadable(filenames);
3905}
3906
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003907// Tests that we can replay a logfile that has timestamps such that at least
3908// one node's epoch is at a positive distributed_clock (and thus will have to
3909// be booted after the other node(s)).
Naman Guptaa63aa132023-03-22 20:06:34 -07003910TEST_P(MultinodeLoggerTest, StartOneNodeBeforeOther) {
3911 std::vector<std::string> filenames;
3912
3913 CHECK_EQ(pi1_index_, 0u);
3914 CHECK_EQ(pi2_index_, 1u);
3915
3916 time_converter_.AddNextTimestamp(
3917 distributed_clock::epoch(),
3918 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3919
3920 const chrono::nanoseconds before_reboot_duration = chrono::milliseconds(1000);
3921 time_converter_.RebootAt(
3922 0, distributed_clock::time_point(before_reboot_duration));
3923
3924 const chrono::nanoseconds test_duration = time_converter_.AddMonotonic(
3925 {chrono::milliseconds(10000), chrono::milliseconds(10000)});
3926
3927 const std::string kLogfile =
3928 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3929 util::UnlinkRecursive(kLogfile);
3930
3931 pi2_->Disconnect(pi1_->node());
3932 pi1_->Disconnect(pi2_->node());
3933
3934 {
3935 LoggerState pi2_logger = MakeLogger(pi2_);
3936
3937 pi2_logger.StartLogger(kLogfile);
3938 event_loop_factory_.RunFor(before_reboot_duration);
3939
3940 pi2_->Connect(pi1_->node());
3941 pi1_->Connect(pi2_->node());
3942
3943 event_loop_factory_.RunFor(test_duration);
3944
3945 pi2_logger.AppendAllFilenames(&filenames);
3946 }
3947
3948 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003949 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003950 ConfirmReadable(filenames);
3951
3952 {
3953 LogReader reader(sorted_parts);
3954 SimulatedEventLoopFactory replay_factory(reader.configuration());
3955 reader.RegisterWithoutStarting(&replay_factory);
3956
3957 NodeEventLoopFactory *const replay_node =
3958 reader.event_loop_factory()->GetNodeEventLoopFactory("pi1");
3959
3960 std::unique_ptr<EventLoop> test_event_loop =
3961 replay_node->MakeEventLoop("test_reader");
3962 replay_node->OnStartup([replay_node]() {
3963 // Check that we didn't boot until at least t=0.
3964 CHECK_LE(monotonic_clock::epoch(), replay_node->monotonic_now());
3965 });
3966 test_event_loop->OnRun([&test_event_loop]() {
3967 // Check that we didn't boot until at least t=0.
3968 EXPECT_LE(monotonic_clock::epoch(), test_event_loop->monotonic_now());
3969 });
3970 reader.event_loop_factory()->Run();
3971 reader.Deregister();
3972 }
3973}
3974
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003975// Tests that when we have a loop without all the logs at all points in time,
3976// we can sort it properly.
Naman Guptaa63aa132023-03-22 20:06:34 -07003977TEST(MultinodeLoggerLoopTest, Loop) {
3978 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003979 aos::configuration::ReadConfig(
3980 ArtifactPath("aos/events/logging/"
3981 "multinode_pingpong_triangle_split_config.json"));
Naman Guptaa63aa132023-03-22 20:06:34 -07003982 message_bridge::TestingTimeConverter time_converter(
3983 configuration::NodesCount(&config.message()));
3984 SimulatedEventLoopFactory event_loop_factory(&config.message());
3985 event_loop_factory.SetTimeConverter(&time_converter);
3986
3987 NodeEventLoopFactory *const pi1 =
3988 event_loop_factory.GetNodeEventLoopFactory("pi1");
3989 NodeEventLoopFactory *const pi2 =
3990 event_loop_factory.GetNodeEventLoopFactory("pi2");
3991 NodeEventLoopFactory *const pi3 =
3992 event_loop_factory.GetNodeEventLoopFactory("pi3");
3993
3994 const std::string kLogfile1_1 =
3995 aos::testing::TestTmpDir() + "/multi_logfile1/";
3996 const std::string kLogfile2_1 =
3997 aos::testing::TestTmpDir() + "/multi_logfile2/";
3998 const std::string kLogfile3_1 =
3999 aos::testing::TestTmpDir() + "/multi_logfile3/";
4000 util::UnlinkRecursive(kLogfile1_1);
4001 util::UnlinkRecursive(kLogfile2_1);
4002 util::UnlinkRecursive(kLogfile3_1);
4003
4004 {
4005 // Make pi1 boot before everything else.
4006 time_converter.AddNextTimestamp(
4007 distributed_clock::epoch(),
4008 {BootTimestamp::epoch(),
4009 BootTimestamp::epoch() - chrono::milliseconds(100),
4010 BootTimestamp::epoch() - chrono::milliseconds(300)});
4011 }
4012
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004013 // We want to setup a situation such that 2 of the 3 legs of the loop are
4014 // very confident about time being X, and the third leg is pulling the
4015 // average off to one side.
Naman Guptaa63aa132023-03-22 20:06:34 -07004016 //
4017 // It's easiest to visualize this in timestamp_plotter.
4018
4019 std::vector<std::string> filenames;
4020 {
4021 // Have pi1 send out a reliable message at startup. This sets up a long
4022 // forwarding time message at the start to bias time.
4023 std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
4024 {
4025 aos::Sender<examples::Ping> ping_sender =
4026 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
4027
4028 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
4029 examples::Ping::Builder ping_builder =
4030 builder.MakeBuilder<examples::Ping>();
4031 CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
4032 }
4033
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004034 // Wait a while so there's enough data to let the worst case be rather
4035 // off.
Naman Guptaa63aa132023-03-22 20:06:34 -07004036 event_loop_factory.RunFor(chrono::seconds(1000));
4037
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004038 // Now start a receiving node first. This sets up 2 tight bounds between
4039 // 2 of the nodes.
Naman Guptaa63aa132023-03-22 20:06:34 -07004040 LoggerState pi2_logger = MakeLoggerState(
4041 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4042 pi2_logger.StartLogger(kLogfile2_1);
4043
4044 event_loop_factory.RunFor(chrono::seconds(100));
4045
4046 // And now start the third leg.
4047 LoggerState pi3_logger = MakeLoggerState(
4048 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4049 pi3_logger.StartLogger(kLogfile3_1);
4050
4051 LoggerState pi1_logger = MakeLoggerState(
4052 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4053 pi1_logger.StartLogger(kLogfile1_1);
4054
4055 event_loop_factory.RunFor(chrono::seconds(100));
4056
4057 pi1_logger.AppendAllFilenames(&filenames);
4058 pi2_logger.AppendAllFilenames(&filenames);
4059 pi3_logger.AppendAllFilenames(&filenames);
4060 }
4061
4062 // Make sure we can read this.
4063 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004064 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004065 auto result = ConfirmReadable(filenames);
4066}
4067
Austin Schuh08dba8f2023-05-01 08:29:30 -07004068// Tests that RestartLogging works in the simple case. Unfortunately, the
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004069// failure cases involve simulating time elapsing in callbacks, which is
4070// really hard. The best we can reasonably do is make sure 2 back to back
4071// logs are parseable together.
Austin Schuh08dba8f2023-05-01 08:29:30 -07004072TEST_P(MultinodeLoggerTest, RestartLogging) {
4073 time_converter_.AddMonotonic(
4074 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
4075 std::vector<std::string> filenames;
4076 {
4077 LoggerState pi1_logger = MakeLogger(pi1_);
4078
4079 event_loop_factory_.RunFor(chrono::milliseconds(95));
4080
4081 StartLogger(&pi1_logger, logfile_base1_);
4082 aos::monotonic_clock::time_point last_rotation_time =
4083 pi1_logger.event_loop->monotonic_now();
Austin Schuh2f864452023-07-17 14:53:08 -07004084 pi1_logger.logger->set_on_logged_period(
4085 [&](aos::monotonic_clock::time_point) {
4086 const auto now = pi1_logger.event_loop->monotonic_now();
4087 if (now > last_rotation_time + std::chrono::seconds(5)) {
4088 pi1_logger.AppendAllFilenames(&filenames);
4089 std::unique_ptr<MultiNodeFilesLogNamer> namer =
4090 pi1_logger.MakeLogNamer(logfile_base2_);
4091 pi1_logger.log_namer = namer.get();
Austin Schuh08dba8f2023-05-01 08:29:30 -07004092
Austin Schuh2f864452023-07-17 14:53:08 -07004093 pi1_logger.logger->RestartLogging(std::move(namer));
4094 last_rotation_time = now;
4095 }
4096 });
Austin Schuh08dba8f2023-05-01 08:29:30 -07004097
4098 event_loop_factory_.RunFor(chrono::milliseconds(7000));
4099
4100 pi1_logger.AppendAllFilenames(&filenames);
4101 }
4102
4103 for (const auto &x : filenames) {
4104 LOG(INFO) << x;
4105 }
4106
4107 EXPECT_GE(filenames.size(), 2u);
4108
4109 ConfirmReadable(filenames);
4110
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004111 // TODO(austin): It would be good to confirm that any one time messages end
4112 // up in both logs correctly.
Austin Schuh08dba8f2023-05-01 08:29:30 -07004113}
4114
Naman Guptaa63aa132023-03-22 20:06:34 -07004115} // namespace testing
4116} // namespace logger
4117} // namespace aos