blob: b843e3e8ea0210fb0c089ac8f7515178404c8376 [file] [log] [blame]
Brian Smartte67d7112023-03-20 12:06:30 -07001#include <algorithm>
2
Philipp Schrader790cb542023-07-05 21:06:52 -07003#include "gmock/gmock.h"
4#include "gtest/gtest.h"
5
Naman Guptaa63aa132023-03-22 20:06:34 -07006#include "aos/events/logging/log_reader.h"
7#include "aos/events/logging/multinode_logger_test_lib.h"
8#include "aos/events/message_counter.h"
9#include "aos/events/ping_lib.h"
10#include "aos/events/pong_lib.h"
11#include "aos/network/remote_message_generated.h"
12#include "aos/network/timestamp_generated.h"
13#include "aos/testing/tmpdir.h"
Naman Guptaa63aa132023-03-22 20:06:34 -070014
15namespace aos {
16namespace logger {
17namespace testing {
18
19namespace chrono = std::chrono;
20using aos::message_bridge::RemoteMessage;
21using aos::testing::ArtifactPath;
22using aos::testing::MessageCounter;
23
Naman Guptaa63aa132023-03-22 20:06:34 -070024INSTANTIATE_TEST_SUITE_P(
25 All, MultinodeLoggerTest,
26 ::testing::Combine(
27 ::testing::Values(
28 ConfigParams{"multinode_pingpong_combined_config.json", true,
Austin Schuh6ecfe902023-08-04 22:44:37 -070029 kCombinedConfigSha1(), kCombinedConfigSha1(),
30 FileStrategy::kKeepSeparate},
Naman Guptaa63aa132023-03-22 20:06:34 -070031 ConfigParams{"multinode_pingpong_split_config.json", false,
Austin Schuh6ecfe902023-08-04 22:44:37 -070032 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
33 FileStrategy::kKeepSeparate},
34 ConfigParams{"multinode_pingpong_split_config.json", false,
35 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
36 FileStrategy::kCombine}),
Naman Guptaa63aa132023-03-22 20:06:34 -070037 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
38
39INSTANTIATE_TEST_SUITE_P(
40 All, MultinodeLoggerDeathTest,
41 ::testing::Combine(
42 ::testing::Values(
43 ConfigParams{"multinode_pingpong_combined_config.json", true,
Austin Schuh6ecfe902023-08-04 22:44:37 -070044 kCombinedConfigSha1(), kCombinedConfigSha1(),
45 FileStrategy::kKeepSeparate},
Naman Guptaa63aa132023-03-22 20:06:34 -070046 ConfigParams{"multinode_pingpong_split_config.json", false,
Austin Schuh6ecfe902023-08-04 22:44:37 -070047 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
48 FileStrategy::kKeepSeparate},
49 ConfigParams{"multinode_pingpong_split_config.json", false,
50 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
51 FileStrategy::kCombine}),
Naman Guptaa63aa132023-03-22 20:06:34 -070052 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
53
54// Tests that we can write and read simple multi-node log files.
55TEST_P(MultinodeLoggerTest, SimpleMultiNode) {
Austin Schuh6ecfe902023-08-04 22:44:37 -070056 if (file_strategy() == FileStrategy::kCombine) {
57 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
58 }
59
Naman Guptaa63aa132023-03-22 20:06:34 -070060 std::vector<std::string> actual_filenames;
61 time_converter_.StartEqual();
62
63 {
64 LoggerState pi1_logger = MakeLogger(pi1_);
65 LoggerState pi2_logger = MakeLogger(pi2_);
66
67 event_loop_factory_.RunFor(chrono::milliseconds(95));
68
69 StartLogger(&pi1_logger);
70 StartLogger(&pi2_logger);
71
72 event_loop_factory_.RunFor(chrono::milliseconds(20000));
73 pi1_logger.AppendAllFilenames(&actual_filenames);
74 pi2_logger.AppendAllFilenames(&actual_filenames);
75 }
76
77 ASSERT_THAT(actual_filenames,
78 ::testing::UnorderedElementsAreArray(logfiles_));
79
80 {
81 std::set<std::string> logfile_uuids;
82 std::set<std::string> parts_uuids;
83 // Confirm that we have the expected number of UUIDs for both the logfile
84 // UUIDs and parts UUIDs.
85 std::vector<SizePrefixedFlatbufferVector<LogFileHeader>> log_header;
86 for (std::string_view f : logfiles_) {
87 log_header.emplace_back(ReadHeader(f).value());
88 if (!log_header.back().message().has_configuration()) {
89 logfile_uuids.insert(
90 log_header.back().message().log_event_uuid()->str());
91 parts_uuids.insert(log_header.back().message().parts_uuid()->str());
92 }
93 }
94
95 EXPECT_EQ(logfile_uuids.size(), 2u);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -070096 EXPECT_EQ(parts_uuids.size(), 8u);
Naman Guptaa63aa132023-03-22 20:06:34 -070097
98 // And confirm everything is on the correct node.
99 EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi1");
100 EXPECT_EQ(log_header[3].message().node()->name()->string_view(), "pi1");
101 EXPECT_EQ(log_header[4].message().node()->name()->string_view(), "pi1");
102
103 EXPECT_EQ(log_header[5].message().node()->name()->string_view(), "pi2");
104 EXPECT_EQ(log_header[6].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700105 EXPECT_EQ(log_header[7].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700106
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700107 EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi1");
108 EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi1");
Naman Guptaa63aa132023-03-22 20:06:34 -0700109
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700110 EXPECT_EQ(log_header[10].message().node()->name()->string_view(), "pi2");
111 EXPECT_EQ(log_header[11].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700112
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700113 EXPECT_EQ(log_header[12].message().node()->name()->string_view(), "pi2");
114 EXPECT_EQ(log_header[13].message().node()->name()->string_view(), "pi2");
115 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700116
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700117 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi1");
118 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi1");
Naman Guptaa63aa132023-03-22 20:06:34 -0700119
120 // And the parts index matches.
121 EXPECT_EQ(log_header[2].message().parts_index(), 0);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700122
123 EXPECT_EQ(log_header[3].message().parts_index(), 0);
124 EXPECT_EQ(log_header[4].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700125
126 EXPECT_EQ(log_header[5].message().parts_index(), 0);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700127
128 EXPECT_EQ(log_header[6].message().parts_index(), 0);
129 EXPECT_EQ(log_header[7].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700130
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700131 EXPECT_EQ(log_header[8].message().parts_index(), 0);
132 EXPECT_EQ(log_header[9].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700133
134 EXPECT_EQ(log_header[10].message().parts_index(), 0);
135 EXPECT_EQ(log_header[11].message().parts_index(), 1);
136
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700137 EXPECT_EQ(log_header[12].message().parts_index(), 0);
138 EXPECT_EQ(log_header[13].message().parts_index(), 1);
139 EXPECT_EQ(log_header[14].message().parts_index(), 2);
Naman Guptaa63aa132023-03-22 20:06:34 -0700140
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700141 EXPECT_EQ(log_header[15].message().parts_index(), 0);
142 EXPECT_EQ(log_header[16].message().parts_index(), 1);
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700143
144 // And that the data_stored field is right.
145 EXPECT_THAT(*log_header[2].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700146 ::testing::ElementsAre(StoredDataType::DATA));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700147 EXPECT_THAT(*log_header[3].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700148 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700149 EXPECT_THAT(*log_header[4].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700150 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700151
152 EXPECT_THAT(*log_header[5].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700153 ::testing::ElementsAre(StoredDataType::DATA));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700154 EXPECT_THAT(*log_header[6].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700155 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700156 EXPECT_THAT(*log_header[7].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700157 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700158
159 EXPECT_THAT(*log_header[8].message().data_stored(),
160 ::testing::ElementsAre(StoredDataType::DATA));
161 EXPECT_THAT(*log_header[9].message().data_stored(),
162 ::testing::ElementsAre(StoredDataType::DATA));
163
164 EXPECT_THAT(*log_header[10].message().data_stored(),
165 ::testing::ElementsAre(StoredDataType::DATA));
166 EXPECT_THAT(*log_header[11].message().data_stored(),
167 ::testing::ElementsAre(StoredDataType::DATA));
168
169 EXPECT_THAT(*log_header[12].message().data_stored(),
170 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
171 EXPECT_THAT(*log_header[13].message().data_stored(),
172 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
173 EXPECT_THAT(*log_header[14].message().data_stored(),
174 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
175
176 EXPECT_THAT(*log_header[15].message().data_stored(),
177 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
178 EXPECT_THAT(*log_header[16].message().data_stored(),
179 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
Naman Guptaa63aa132023-03-22 20:06:34 -0700180 }
181
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700182 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
183 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -0700184 {
185 using ::testing::UnorderedElementsAre;
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700186 std::shared_ptr<const aos::Configuration> config = sorted_parts[0].config;
Naman Guptaa63aa132023-03-22 20:06:34 -0700187
188 // Timing reports, pings
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700189 if (shared()) {
190 EXPECT_THAT(
191 CountChannelsData(config, logfiles_[2]),
192 UnorderedElementsAre(
193 std::make_tuple("/pi1/aos", "aos.examples.Ping", 2001),
194 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
195 200),
196 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
197 21),
198 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
199 std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
200 std::make_tuple("/test", "aos.examples.Ping", 2001)))
201 << " : " << logfiles_[2];
202 } else {
203 EXPECT_THAT(
204 CountChannelsData(config, logfiles_[2]),
205 UnorderedElementsAre(
206 std::make_tuple("/pi1/aos", "aos.examples.Ping", 2001),
207 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
208 200),
209 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
210 21),
211 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
212 std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
213 std::make_tuple("/test", "aos.examples.Ping", 2001),
214 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
215 "aos-message_bridge-Timestamp",
216 "aos.message_bridge.RemoteMessage", 200)))
217 << " : " << logfiles_[2];
Naman Guptaa63aa132023-03-22 20:06:34 -0700218 }
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700219
220 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
221 ::testing::UnorderedElementsAre())
222 << " : " << logfiles_[3];
223 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
224 ::testing::UnorderedElementsAre())
225 << " : " << logfiles_[4];
226
Naman Guptaa63aa132023-03-22 20:06:34 -0700227 // Timestamps for pong
228 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
229 UnorderedElementsAre())
230 << " : " << logfiles_[2];
231 EXPECT_THAT(
232 CountChannelsTimestamp(config, logfiles_[3]),
233 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
234 << " : " << logfiles_[3];
235 EXPECT_THAT(
236 CountChannelsTimestamp(config, logfiles_[4]),
237 UnorderedElementsAre(
238 std::make_tuple("/test", "aos.examples.Pong", 2000),
239 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)))
240 << " : " << logfiles_[4];
241
Naman Guptaa63aa132023-03-22 20:06:34 -0700242 // Timing reports and pongs.
Naman Guptaa63aa132023-03-22 20:06:34 -0700243 EXPECT_THAT(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700244 CountChannelsData(config, logfiles_[5]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700245 UnorderedElementsAre(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700246 std::make_tuple("/pi2/aos", "aos.examples.Ping", 2001),
Naman Guptaa63aa132023-03-22 20:06:34 -0700247 std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
248 200),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700249 std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
250 21),
251 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700252 std::make_tuple("/pi2/aos", "aos.timing.Report", 60),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700253 std::make_tuple("/test", "aos.examples.Pong", 2001)))
254 << " : " << logfiles_[5];
255 EXPECT_THAT(CountChannelsData(config, logfiles_[6]), UnorderedElementsAre())
256 << " : " << logfiles_[6];
257 EXPECT_THAT(CountChannelsData(config, logfiles_[7]), UnorderedElementsAre())
Naman Guptaa63aa132023-03-22 20:06:34 -0700258 << " : " << logfiles_[7];
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700259 // And ping timestamps.
260 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
261 UnorderedElementsAre())
262 << " : " << logfiles_[5];
Naman Guptaa63aa132023-03-22 20:06:34 -0700263 EXPECT_THAT(
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700264 CountChannelsTimestamp(config, logfiles_[6]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700265 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700266 << " : " << logfiles_[6];
Naman Guptaa63aa132023-03-22 20:06:34 -0700267 EXPECT_THAT(
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700268 CountChannelsTimestamp(config, logfiles_[7]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700269 UnorderedElementsAre(
270 std::make_tuple("/test", "aos.examples.Ping", 2000),
271 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700272 << " : " << logfiles_[7];
Naman Guptaa63aa132023-03-22 20:06:34 -0700273
274 // And then test that the remotely logged timestamp data files only have
275 // timestamps in them.
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700276 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[8]),
277 UnorderedElementsAre())
278 << " : " << logfiles_[8];
279 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[9]),
280 UnorderedElementsAre())
281 << " : " << logfiles_[9];
Naman Guptaa63aa132023-03-22 20:06:34 -0700282 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[10]),
283 UnorderedElementsAre())
284 << " : " << logfiles_[10];
285 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[11]),
286 UnorderedElementsAre())
287 << " : " << logfiles_[11];
Naman Guptaa63aa132023-03-22 20:06:34 -0700288
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700289 EXPECT_THAT(CountChannelsData(config, logfiles_[8]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700290 UnorderedElementsAre(std::make_tuple(
291 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700292 << " : " << logfiles_[8];
293 EXPECT_THAT(CountChannelsData(config, logfiles_[9]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700294 UnorderedElementsAre(std::make_tuple(
295 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700296 << " : " << logfiles_[9];
Naman Guptaa63aa132023-03-22 20:06:34 -0700297
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700298 // Pong snd timestamp data.
299 EXPECT_THAT(
300 CountChannelsData(config, logfiles_[10]),
301 UnorderedElementsAre(
302 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 9),
303 std::make_tuple("/test", "aos.examples.Pong", 91)))
304 << " : " << logfiles_[10];
305 EXPECT_THAT(
306 CountChannelsData(config, logfiles_[11]),
307 UnorderedElementsAre(
308 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 191),
309 std::make_tuple("/test", "aos.examples.Pong", 1910)))
310 << " : " << logfiles_[11];
Naman Guptaa63aa132023-03-22 20:06:34 -0700311
312 // Timestamps from pi2 on pi1, and the other way.
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700313 // if (shared()) {
314 EXPECT_THAT(CountChannelsData(config, logfiles_[12]),
315 UnorderedElementsAre())
316 << " : " << logfiles_[12];
317 EXPECT_THAT(CountChannelsData(config, logfiles_[13]),
318 UnorderedElementsAre())
319 << " : " << logfiles_[13];
320 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
321 UnorderedElementsAre())
322 << " : " << logfiles_[14];
323 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
324 UnorderedElementsAre())
325 << " : " << logfiles_[15];
326 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
327 UnorderedElementsAre())
328 << " : " << logfiles_[16];
Naman Guptaa63aa132023-03-22 20:06:34 -0700329
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700330 EXPECT_THAT(
331 CountChannelsTimestamp(config, logfiles_[12]),
332 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
333 << " : " << logfiles_[12];
334 EXPECT_THAT(
335 CountChannelsTimestamp(config, logfiles_[13]),
336 UnorderedElementsAre(
337 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 9),
338 std::make_tuple("/test", "aos.examples.Ping", 90)))
339 << " : " << logfiles_[13];
340 EXPECT_THAT(
341 CountChannelsTimestamp(config, logfiles_[14]),
342 UnorderedElementsAre(
343 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 191),
344 std::make_tuple("/test", "aos.examples.Ping", 1910)))
345 << " : " << logfiles_[14];
346 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[15]),
347 UnorderedElementsAre(std::make_tuple(
348 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
349 << " : " << logfiles_[15];
350 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[16]),
351 UnorderedElementsAre(std::make_tuple(
352 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
353 << " : " << logfiles_[16];
Naman Guptaa63aa132023-03-22 20:06:34 -0700354 }
355
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700356 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -0700357
358 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
359 log_reader_factory.set_send_delay(chrono::microseconds(0));
360
361 // This sends out the fetched messages and advances time to the start of the
362 // log file.
363 reader.Register(&log_reader_factory);
364
365 const Node *pi1 =
366 configuration::GetNode(log_reader_factory.configuration(), "pi1");
367 const Node *pi2 =
368 configuration::GetNode(log_reader_factory.configuration(), "pi2");
369
370 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
371 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
372 LOG(INFO) << "now pi1 "
373 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
374 LOG(INFO) << "now pi2 "
375 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
376
377 EXPECT_THAT(reader.LoggedNodes(),
378 ::testing::ElementsAre(
379 configuration::GetNode(reader.logged_configuration(), pi1),
380 configuration::GetNode(reader.logged_configuration(), pi2)));
381
382 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
383
384 std::unique_ptr<EventLoop> pi1_event_loop =
385 log_reader_factory.MakeEventLoop("test", pi1);
386 std::unique_ptr<EventLoop> pi2_event_loop =
387 log_reader_factory.MakeEventLoop("test", pi2);
388
389 int pi1_ping_count = 10;
390 int pi2_ping_count = 10;
391 int pi1_pong_count = 10;
392 int pi2_pong_count = 10;
393
394 // Confirm that the ping value matches.
395 pi1_event_loop->MakeWatcher(
396 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
397 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping) << " at "
398 << pi1_event_loop->context().monotonic_remote_time << " -> "
399 << pi1_event_loop->context().monotonic_event_time;
400 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
401 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
402 pi1_ping_count * chrono::milliseconds(10) +
403 monotonic_clock::epoch());
404 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
405 pi1_ping_count * chrono::milliseconds(10) +
406 realtime_clock::epoch());
407 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
408 pi1_event_loop->context().monotonic_event_time);
409 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
410 pi1_event_loop->context().realtime_event_time);
411
412 ++pi1_ping_count;
413 });
414 pi2_event_loop->MakeWatcher(
415 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
416 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping) << " at "
417 << pi2_event_loop->context().monotonic_remote_time << " -> "
418 << pi2_event_loop->context().monotonic_event_time;
419 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
420
421 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
422 pi2_ping_count * chrono::milliseconds(10) +
423 monotonic_clock::epoch());
424 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
425 pi2_ping_count * chrono::milliseconds(10) +
426 realtime_clock::epoch());
427 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time +
428 chrono::microseconds(150),
429 pi2_event_loop->context().monotonic_event_time);
430 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time +
431 chrono::microseconds(150),
432 pi2_event_loop->context().realtime_event_time);
433 ++pi2_ping_count;
434 });
435
436 constexpr ssize_t kQueueIndexOffset = -9;
437 // Confirm that the ping and pong counts both match, and the value also
438 // matches.
439 pi1_event_loop->MakeWatcher(
440 "/test", [&pi1_event_loop, &pi1_ping_count,
441 &pi1_pong_count](const examples::Pong &pong) {
442 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
443 << pi1_event_loop->context().monotonic_remote_time << " -> "
444 << pi1_event_loop->context().monotonic_event_time;
445
446 EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
447 pi1_pong_count + kQueueIndexOffset);
448 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
449 chrono::microseconds(200) +
450 pi1_pong_count * chrono::milliseconds(10) +
451 monotonic_clock::epoch());
452 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
453 chrono::microseconds(200) +
454 pi1_pong_count * chrono::milliseconds(10) +
455 realtime_clock::epoch());
456
457 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
458 chrono::microseconds(150),
459 pi1_event_loop->context().monotonic_event_time);
460 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
461 chrono::microseconds(150),
462 pi1_event_loop->context().realtime_event_time);
463
464 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
465 ++pi1_pong_count;
466 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
467 });
468 pi2_event_loop->MakeWatcher(
469 "/test", [&pi2_event_loop, &pi2_ping_count,
470 &pi2_pong_count](const examples::Pong &pong) {
471 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
472 << pi2_event_loop->context().monotonic_remote_time << " -> "
473 << pi2_event_loop->context().monotonic_event_time;
474
475 EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
476 pi2_pong_count + kQueueIndexOffset);
477
478 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
479 chrono::microseconds(200) +
480 pi2_pong_count * chrono::milliseconds(10) +
481 monotonic_clock::epoch());
482 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
483 chrono::microseconds(200) +
484 pi2_pong_count * chrono::milliseconds(10) +
485 realtime_clock::epoch());
486
487 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
488 pi2_event_loop->context().monotonic_event_time);
489 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
490 pi2_event_loop->context().realtime_event_time);
491
492 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
493 ++pi2_pong_count;
494 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
495 });
496
497 log_reader_factory.Run();
498 EXPECT_EQ(pi1_ping_count, 2010);
499 EXPECT_EQ(pi2_ping_count, 2010);
500 EXPECT_EQ(pi1_pong_count, 2010);
501 EXPECT_EQ(pi2_pong_count, 2010);
502
503 reader.Deregister();
504}
505
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600506// MultinodeLoggerTest that tests the mutate callback works across multiple
507// nodes with remapping
508TEST_P(MultinodeLoggerTest, MultiNodeRemapMutateCallback) {
509 time_converter_.StartEqual();
510 std::vector<std::string> actual_filenames;
511
512 {
513 LoggerState pi1_logger = MakeLogger(pi1_);
514 LoggerState pi2_logger = MakeLogger(pi2_);
515
516 event_loop_factory_.RunFor(chrono::milliseconds(95));
517
518 StartLogger(&pi1_logger);
519 StartLogger(&pi2_logger);
520
521 event_loop_factory_.RunFor(chrono::milliseconds(20000));
522 pi1_logger.AppendAllFilenames(&actual_filenames);
523 pi2_logger.AppendAllFilenames(&actual_filenames);
524 }
525
Austin Schuh8fb4b452023-08-04 17:02:27 -0700526 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700527 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600528
529 LogReader reader(sorted_parts, &config_.message());
530 // Remap just on pi1.
531 reader.RemapLoggedChannel<examples::Pong>(
532 "/test", configuration::GetNode(reader.configuration(), "pi1"));
533
534 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
535
536 int pong_count = 0;
537 // Adds a callback which mutates the value of the pong message before the
538 // message is sent which is the feature we are testing here
539 reader.AddBeforeSendCallback("/test",
540 [&pong_count](aos::examples::Pong *pong) {
541 pong->mutate_value(pong->value() + 1);
542 pong_count = pong->value();
543 });
544
545 // This sends out the fetched messages and advances time to the start of the
546 // log file.
547 reader.Register(&log_reader_factory);
548
549 const Node *pi1 =
550 configuration::GetNode(log_reader_factory.configuration(), "pi1");
551 const Node *pi2 =
552 configuration::GetNode(log_reader_factory.configuration(), "pi2");
553
554 EXPECT_THAT(reader.LoggedNodes(),
555 ::testing::ElementsAre(
556 configuration::GetNode(reader.logged_configuration(), pi1),
557 configuration::GetNode(reader.logged_configuration(), pi2)));
558
559 std::unique_ptr<EventLoop> pi1_event_loop =
560 log_reader_factory.MakeEventLoop("test", pi1);
561 std::unique_ptr<EventLoop> pi2_event_loop =
562 log_reader_factory.MakeEventLoop("test", pi2);
563
564 pi1_event_loop->MakeWatcher("/original/test",
565 [&pong_count](const examples::Pong &pong) {
566 EXPECT_EQ(pong_count, pong.value());
567 });
568
569 pi2_event_loop->MakeWatcher("/test",
570 [&pong_count](const examples::Pong &pong) {
571 EXPECT_EQ(pong_count, pong.value());
572 });
573
574 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
575 reader.Deregister();
576
577 EXPECT_EQ(pong_count, 2011);
578}
579
580// MultinodeLoggerTest that tests the mutate callback works across multiple
581// nodes
582TEST_P(MultinodeLoggerTest, MultiNodeMutateCallback) {
583 time_converter_.StartEqual();
584 std::vector<std::string> actual_filenames;
585
586 {
587 LoggerState pi1_logger = MakeLogger(pi1_);
588 LoggerState pi2_logger = MakeLogger(pi2_);
589
590 event_loop_factory_.RunFor(chrono::milliseconds(95));
591
592 StartLogger(&pi1_logger);
593 StartLogger(&pi2_logger);
594
595 event_loop_factory_.RunFor(chrono::milliseconds(20000));
596 pi1_logger.AppendAllFilenames(&actual_filenames);
597 pi2_logger.AppendAllFilenames(&actual_filenames);
598 }
599
Austin Schuh8fb4b452023-08-04 17:02:27 -0700600 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700601 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600602
603 LogReader reader(sorted_parts, &config_.message());
604
605 int pong_count = 0;
606 // Adds a callback which mutates the value of the pong message before the
607 // message is sent which is the feature we are testing here
608 reader.AddBeforeSendCallback("/test",
609 [&pong_count](aos::examples::Pong *pong) {
610 pong->mutate_value(pong->value() + 1);
611 pong_count = pong->value();
612 });
613
614 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
615
616 // This sends out the fetched messages and advances time to the start of the
617 // log file.
618 reader.Register(&log_reader_factory);
619
620 const Node *pi1 =
621 configuration::GetNode(log_reader_factory.configuration(), "pi1");
622 const Node *pi2 =
623 configuration::GetNode(log_reader_factory.configuration(), "pi2");
624
625 EXPECT_THAT(reader.LoggedNodes(),
626 ::testing::ElementsAre(
627 configuration::GetNode(reader.logged_configuration(), pi1),
628 configuration::GetNode(reader.logged_configuration(), pi2)));
629
630 std::unique_ptr<EventLoop> pi1_event_loop =
631 log_reader_factory.MakeEventLoop("test", pi1);
632 std::unique_ptr<EventLoop> pi2_event_loop =
633 log_reader_factory.MakeEventLoop("test", pi2);
634
635 pi1_event_loop->MakeWatcher("/test",
636 [&pong_count](const examples::Pong &pong) {
637 EXPECT_EQ(pong_count, pong.value());
638 });
639
640 pi2_event_loop->MakeWatcher("/test",
641 [&pong_count](const examples::Pong &pong) {
642 EXPECT_EQ(pong_count, pong.value());
643 });
644
645 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
646 reader.Deregister();
647
648 EXPECT_EQ(pong_count, 2011);
649}
650
651// Tests that the before send callback is only called from the sender node if it
652// is forwarded
653TEST_P(MultinodeLoggerTest, OnlyDoBeforeSendCallbackOnSenderNode) {
654 time_converter_.StartEqual();
Austin Schuh8fb4b452023-08-04 17:02:27 -0700655
656 std::vector<std::string> filenames;
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600657 {
658 LoggerState pi1_logger = MakeLogger(pi1_);
659 LoggerState pi2_logger = MakeLogger(pi2_);
660
661 event_loop_factory_.RunFor(chrono::milliseconds(95));
662
663 StartLogger(&pi1_logger);
664 StartLogger(&pi2_logger);
665
666 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -0700667
668 pi1_logger.AppendAllFilenames(&filenames);
669 pi2_logger.AppendAllFilenames(&filenames);
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600670 }
671
Austin Schuh8fb4b452023-08-04 17:02:27 -0700672 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700673 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
674 LogReader reader(sorted_parts);
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600675
676 int ping_count = 0;
677 // Adds a callback which mutates the value of the pong message before the
678 // message is sent which is the feature we are testing here
679 reader.AddBeforeSendCallback("/test",
680 [&ping_count](aos::examples::Ping *ping) {
681 ++ping_count;
682 ping->mutate_value(ping_count);
683 });
684
685 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
686 log_reader_factory.set_send_delay(chrono::microseconds(0));
687
688 reader.Register(&log_reader_factory);
689
690 const Node *pi1 =
691 configuration::GetNode(log_reader_factory.configuration(), "pi1");
692 const Node *pi2 =
693 configuration::GetNode(log_reader_factory.configuration(), "pi2");
694
695 std::unique_ptr<EventLoop> pi1_event_loop =
696 log_reader_factory.MakeEventLoop("test", pi1);
697 pi1_event_loop->SkipTimingReport();
698 std::unique_ptr<EventLoop> pi2_event_loop =
699 log_reader_factory.MakeEventLoop("test", pi2);
700 pi2_event_loop->SkipTimingReport();
701
702 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
703 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
704
705 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
706 pi1_ping_timestamp;
707 if (!shared()) {
708 pi1_ping_timestamp =
709 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
710 pi1_event_loop.get(),
711 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
712 }
713
714 log_reader_factory.Run();
715
716 EXPECT_EQ(pi1_ping.count(), 2000u);
717 EXPECT_EQ(pi2_ping.count(), 2000u);
718 // If the BeforeSendCallback is called on both nodes, then the ping count
719 // would be 4002 instead of 2001
720 EXPECT_EQ(ping_count, 2001u);
721 if (!shared()) {
722 EXPECT_EQ(pi1_ping_timestamp->count(), 2000u);
723 }
724
725 reader.Deregister();
726}
727
728// Tests that we do not allow adding callbacks after Register is called
729TEST_P(MultinodeLoggerDeathTest, AddCallbackAfterRegister) {
730 time_converter_.StartEqual();
731 std::vector<std::string> actual_filenames;
732
733 {
734 LoggerState pi1_logger = MakeLogger(pi1_);
735 LoggerState pi2_logger = MakeLogger(pi2_);
736
737 event_loop_factory_.RunFor(chrono::milliseconds(95));
738
739 StartLogger(&pi1_logger);
740 StartLogger(&pi2_logger);
741
742 event_loop_factory_.RunFor(chrono::milliseconds(20000));
743 pi1_logger.AppendAllFilenames(&actual_filenames);
744 pi2_logger.AppendAllFilenames(&actual_filenames);
745 }
746
Austin Schuh8fb4b452023-08-04 17:02:27 -0700747 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700748 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600749
750 LogReader reader(sorted_parts, &config_.message());
751 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
752 reader.Register(&log_reader_factory);
753 EXPECT_DEATH(
754 {
755 reader.AddBeforeSendCallback("/test", [](aos::examples::Pong *) {
756 LOG(FATAL) << "This should not be called";
757 });
758 },
759 "Cannot add callbacks after calling Register");
760 reader.Deregister();
761}
762
Naman Guptaa63aa132023-03-22 20:06:34 -0700763// Test that if we feed the replay with a mismatched node list that we die on
764// the LogReader constructor.
765TEST_P(MultinodeLoggerDeathTest, MultiNodeBadReplayConfig) {
766 time_converter_.StartEqual();
Austin Schuh8fb4b452023-08-04 17:02:27 -0700767
768 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -0700769 {
770 LoggerState pi1_logger = MakeLogger(pi1_);
771 LoggerState pi2_logger = MakeLogger(pi2_);
772
773 event_loop_factory_.RunFor(chrono::milliseconds(95));
774
775 StartLogger(&pi1_logger);
776 StartLogger(&pi2_logger);
777
778 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -0700779
780 pi1_logger.AppendAllFilenames(&filenames);
781 pi2_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -0700782 }
783
784 // Test that, if we add an additional node to the replay config that the
785 // logger complains about the mismatch in number of nodes.
786 FlatbufferDetachedBuffer<Configuration> extra_nodes_config =
787 configuration::MergeWithConfig(&config_.message(), R"({
788 "nodes": [
789 {
790 "name": "extra-node"
791 }
792 ]
793 }
794 )");
795
Austin Schuh8fb4b452023-08-04 17:02:27 -0700796 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700797 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -0700798 EXPECT_DEATH(LogReader(sorted_parts, &extra_nodes_config.message()),
799 "Log file and replay config need to have matching nodes lists.");
800}
801
802// Tests that we can read log files where they don't start at the same monotonic
803// time.
804TEST_P(MultinodeLoggerTest, StaggeredStart) {
805 time_converter_.StartEqual();
806 std::vector<std::string> actual_filenames;
807
808 {
809 LoggerState pi1_logger = MakeLogger(pi1_);
810 LoggerState pi2_logger = MakeLogger(pi2_);
811
812 event_loop_factory_.RunFor(chrono::milliseconds(95));
813
814 StartLogger(&pi1_logger);
815
816 event_loop_factory_.RunFor(chrono::milliseconds(200));
817
818 StartLogger(&pi2_logger);
819
820 event_loop_factory_.RunFor(chrono::milliseconds(20000));
821 pi1_logger.AppendAllFilenames(&actual_filenames);
822 pi2_logger.AppendAllFilenames(&actual_filenames);
823 }
824
825 // Since we delay starting pi2, it already knows about all the timestamps so
826 // we don't end up with extra parts.
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700827 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
828 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
829 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -0700830
831 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
832 log_reader_factory.set_send_delay(chrono::microseconds(0));
833
834 // This sends out the fetched messages and advances time to the start of the
835 // log file.
836 reader.Register(&log_reader_factory);
837
838 const Node *pi1 =
839 configuration::GetNode(log_reader_factory.configuration(), "pi1");
840 const Node *pi2 =
841 configuration::GetNode(log_reader_factory.configuration(), "pi2");
842
843 EXPECT_THAT(reader.LoggedNodes(),
844 ::testing::ElementsAre(
845 configuration::GetNode(reader.logged_configuration(), pi1),
846 configuration::GetNode(reader.logged_configuration(), pi2)));
847
848 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
849
850 std::unique_ptr<EventLoop> pi1_event_loop =
851 log_reader_factory.MakeEventLoop("test", pi1);
852 std::unique_ptr<EventLoop> pi2_event_loop =
853 log_reader_factory.MakeEventLoop("test", pi2);
854
855 int pi1_ping_count = 30;
856 int pi2_ping_count = 30;
857 int pi1_pong_count = 30;
858 int pi2_pong_count = 30;
859
860 // Confirm that the ping value matches.
861 pi1_event_loop->MakeWatcher(
862 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
863 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
864 << pi1_event_loop->context().monotonic_remote_time << " -> "
865 << pi1_event_loop->context().monotonic_event_time;
866 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
867
868 ++pi1_ping_count;
869 });
870 pi2_event_loop->MakeWatcher(
871 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
872 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
873 << pi2_event_loop->context().monotonic_remote_time << " -> "
874 << pi2_event_loop->context().monotonic_event_time;
875 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
876
877 ++pi2_ping_count;
878 });
879
880 // Confirm that the ping and pong counts both match, and the value also
881 // matches.
882 pi1_event_loop->MakeWatcher(
883 "/test", [&pi1_event_loop, &pi1_ping_count,
884 &pi1_pong_count](const examples::Pong &pong) {
885 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
886 << pi1_event_loop->context().monotonic_remote_time << " -> "
887 << pi1_event_loop->context().monotonic_event_time;
888
889 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
890 ++pi1_pong_count;
891 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
892 });
893 pi2_event_loop->MakeWatcher(
894 "/test", [&pi2_event_loop, &pi2_ping_count,
895 &pi2_pong_count](const examples::Pong &pong) {
896 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
897 << pi2_event_loop->context().monotonic_remote_time << " -> "
898 << pi2_event_loop->context().monotonic_event_time;
899
900 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
901 ++pi2_pong_count;
902 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
903 });
904
905 log_reader_factory.Run();
906 EXPECT_EQ(pi1_ping_count, 2030);
907 EXPECT_EQ(pi2_ping_count, 2030);
908 EXPECT_EQ(pi1_pong_count, 2030);
909 EXPECT_EQ(pi2_pong_count, 2030);
910
911 reader.Deregister();
912}
913
914// Tests that we can read log files where the monotonic clocks drift and don't
915// match correctly. While we are here, also test that different ending times
916// also is readable.
917TEST_P(MultinodeLoggerTest, MismatchedClocks) {
918 // TODO(austin): Negate...
919 const chrono::nanoseconds initial_pi2_offset = chrono::seconds(1000);
920
921 time_converter_.AddMonotonic(
922 {BootTimestamp::epoch(), BootTimestamp::epoch() + initial_pi2_offset});
923 // Wait for 95 ms, (~0.1 seconds - 1/2 of the ping/pong period), and set the
924 // skew to be 200 uS/s
925 const chrono::nanoseconds startup_sleep1 = time_converter_.AddMonotonic(
926 {chrono::milliseconds(95),
927 chrono::milliseconds(95) - chrono::nanoseconds(200) * 95});
928 // Run another 200 ms to have one logger start first.
929 const chrono::nanoseconds startup_sleep2 = time_converter_.AddMonotonic(
930 {chrono::milliseconds(200), chrono::milliseconds(200)});
931 // Slew one way then the other at the same 200 uS/S slew rate. Make sure we
932 // go far enough to cause problems if this isn't accounted for.
933 const chrono::nanoseconds logger_run1 = time_converter_.AddMonotonic(
934 {chrono::milliseconds(20000),
935 chrono::milliseconds(20000) - chrono::nanoseconds(200) * 20000});
936 const chrono::nanoseconds logger_run2 = time_converter_.AddMonotonic(
937 {chrono::milliseconds(40000),
938 chrono::milliseconds(40000) + chrono::nanoseconds(200) * 40000});
939 const chrono::nanoseconds logger_run3 = time_converter_.AddMonotonic(
940 {chrono::milliseconds(400), chrono::milliseconds(400)});
941
Austin Schuh8fb4b452023-08-04 17:02:27 -0700942 std::vector<std::string> actual_filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -0700943 {
944 LoggerState pi2_logger = MakeLogger(pi2_);
945
946 LOG(INFO) << "pi2 times: " << pi2_->monotonic_now() << " "
947 << pi2_->realtime_now() << " distributed "
948 << pi2_->ToDistributedClock(pi2_->monotonic_now());
949
950 LOG(INFO) << "pi2_ times: " << pi2_->monotonic_now() << " "
951 << pi2_->realtime_now() << " distributed "
952 << pi2_->ToDistributedClock(pi2_->monotonic_now());
953
954 event_loop_factory_.RunFor(startup_sleep1);
955
956 StartLogger(&pi2_logger);
957
958 event_loop_factory_.RunFor(startup_sleep2);
959
960 {
961 // Run pi1's logger for only part of the time.
962 LoggerState pi1_logger = MakeLogger(pi1_);
963
964 StartLogger(&pi1_logger);
965 event_loop_factory_.RunFor(logger_run1);
966
967 // Make sure we slewed time far enough so that the difference is greater
968 // than the network delay. This confirms that if we sort incorrectly, it
969 // would show in the results.
970 EXPECT_LT(
971 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
972 -event_loop_factory_.send_delay() -
973 event_loop_factory_.network_delay());
974
975 event_loop_factory_.RunFor(logger_run2);
976
977 // And now check that we went far enough the other way to make sure we
978 // cover both problems.
979 EXPECT_GT(
980 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
981 event_loop_factory_.send_delay() +
982 event_loop_factory_.network_delay());
Austin Schuh8fb4b452023-08-04 17:02:27 -0700983
984 pi1_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -0700985 }
986
987 // And log a bit more on pi2.
988 event_loop_factory_.RunFor(logger_run3);
Austin Schuh8fb4b452023-08-04 17:02:27 -0700989
990 pi2_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -0700991 }
992
Austin Schuh8fb4b452023-08-04 17:02:27 -0700993 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700994 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
995 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -0700996
997 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
998 log_reader_factory.set_send_delay(chrono::microseconds(0));
999
1000 const Node *pi1 =
1001 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1002 const Node *pi2 =
1003 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1004
1005 // This sends out the fetched messages and advances time to the start of the
1006 // log file.
1007 reader.Register(&log_reader_factory);
1008
1009 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
1010 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
1011 LOG(INFO) << "now pi1 "
1012 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1013 LOG(INFO) << "now pi2 "
1014 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1015
1016 LOG(INFO) << "Done registering (pi1) "
1017 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now()
1018 << " "
1019 << log_reader_factory.GetNodeEventLoopFactory(pi1)->realtime_now();
1020 LOG(INFO) << "Done registering (pi2) "
1021 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now()
1022 << " "
1023 << log_reader_factory.GetNodeEventLoopFactory(pi2)->realtime_now();
1024
1025 EXPECT_THAT(reader.LoggedNodes(),
1026 ::testing::ElementsAre(
1027 configuration::GetNode(reader.logged_configuration(), pi1),
1028 configuration::GetNode(reader.logged_configuration(), pi2)));
1029
1030 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1031
1032 std::unique_ptr<EventLoop> pi1_event_loop =
1033 log_reader_factory.MakeEventLoop("test", pi1);
1034 std::unique_ptr<EventLoop> pi2_event_loop =
1035 log_reader_factory.MakeEventLoop("test", pi2);
1036
1037 int pi1_ping_count = 30;
1038 int pi2_ping_count = 30;
1039 int pi1_pong_count = 30;
1040 int pi2_pong_count = 30;
1041
1042 // Confirm that the ping value matches.
1043 pi1_event_loop->MakeWatcher(
1044 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
1045 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
1046 << pi1_event_loop->context().monotonic_remote_time << " -> "
1047 << pi1_event_loop->context().monotonic_event_time;
1048 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
1049
1050 ++pi1_ping_count;
1051 });
1052 pi2_event_loop->MakeWatcher(
1053 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
1054 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
1055 << pi2_event_loop->context().monotonic_remote_time << " -> "
1056 << pi2_event_loop->context().monotonic_event_time;
1057 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
1058
1059 ++pi2_ping_count;
1060 });
1061
1062 // Confirm that the ping and pong counts both match, and the value also
1063 // matches.
1064 pi1_event_loop->MakeWatcher(
1065 "/test", [&pi1_event_loop, &pi1_ping_count,
1066 &pi1_pong_count](const examples::Pong &pong) {
1067 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
1068 << pi1_event_loop->context().monotonic_remote_time << " -> "
1069 << pi1_event_loop->context().monotonic_event_time;
1070
1071 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
1072 ++pi1_pong_count;
1073 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
1074 });
1075 pi2_event_loop->MakeWatcher(
1076 "/test", [&pi2_event_loop, &pi2_ping_count,
1077 &pi2_pong_count](const examples::Pong &pong) {
1078 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
1079 << pi2_event_loop->context().monotonic_remote_time << " -> "
1080 << pi2_event_loop->context().monotonic_event_time;
1081
1082 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
1083 ++pi2_pong_count;
1084 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
1085 });
1086
1087 log_reader_factory.Run();
1088 EXPECT_EQ(pi1_ping_count, 6030);
1089 EXPECT_EQ(pi2_ping_count, 6030);
1090 EXPECT_EQ(pi1_pong_count, 6030);
1091 EXPECT_EQ(pi2_pong_count, 6030);
1092
1093 reader.Deregister();
1094}
1095
1096// Tests that we can sort a bunch of parts into the pre-determined sorted parts.
1097TEST_P(MultinodeLoggerTest, SortParts) {
1098 time_converter_.StartEqual();
1099 // Make a bunch of parts.
1100 {
1101 LoggerState pi1_logger = MakeLogger(pi1_);
1102 LoggerState pi2_logger = MakeLogger(pi2_);
1103
1104 event_loop_factory_.RunFor(chrono::milliseconds(95));
1105
1106 StartLogger(&pi1_logger);
1107 StartLogger(&pi2_logger);
1108
1109 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1110 }
1111
1112 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1113 VerifyParts(sorted_parts);
1114}
1115
1116// Tests that we can sort a bunch of parts with an empty part. We should ignore
1117// it and remove it from the sorted list.
1118TEST_P(MultinodeLoggerTest, SortEmptyParts) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07001119 std::vector<std::string> actual_filenames;
1120
Naman Guptaa63aa132023-03-22 20:06:34 -07001121 time_converter_.StartEqual();
1122 // Make a bunch of parts.
1123 {
1124 LoggerState pi1_logger = MakeLogger(pi1_);
1125 LoggerState pi2_logger = MakeLogger(pi2_);
1126
1127 event_loop_factory_.RunFor(chrono::milliseconds(95));
1128
1129 StartLogger(&pi1_logger);
1130 StartLogger(&pi2_logger);
1131
1132 event_loop_factory_.RunFor(chrono::milliseconds(2000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07001133 pi1_logger.AppendAllFilenames(&actual_filenames);
1134 pi2_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001135 }
1136
1137 // TODO(austin): Should we flip out if the file can't open?
1138 const std::string kEmptyFile("foobarinvalidfiledoesnotexist" + Extension());
1139
1140 aos::util::WriteStringToFileOrDie(kEmptyFile, "");
Austin Schuh8fb4b452023-08-04 17:02:27 -07001141 actual_filenames.emplace_back(kEmptyFile);
Naman Guptaa63aa132023-03-22 20:06:34 -07001142
Austin Schuh8fb4b452023-08-04 17:02:27 -07001143 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001144 VerifyParts(sorted_parts, {kEmptyFile});
1145}
1146
1147// Tests that we can sort a bunch of parts with the end missing off a
1148// file. We should use the part we can read.
1149TEST_P(MultinodeLoggerTest, SortTruncatedParts) {
Austin Schuh6ecfe902023-08-04 22:44:37 -07001150 if (file_strategy() == FileStrategy::kCombine) {
1151 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
1152 }
1153
Naman Guptaa63aa132023-03-22 20:06:34 -07001154 std::vector<std::string> actual_filenames;
1155 time_converter_.StartEqual();
1156 // Make a bunch of parts.
1157 {
1158 LoggerState pi1_logger = MakeLogger(pi1_);
1159 LoggerState pi2_logger = MakeLogger(pi2_);
1160
1161 event_loop_factory_.RunFor(chrono::milliseconds(95));
1162
1163 StartLogger(&pi1_logger);
1164 StartLogger(&pi2_logger);
1165
1166 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1167
1168 pi1_logger.AppendAllFilenames(&actual_filenames);
1169 pi2_logger.AppendAllFilenames(&actual_filenames);
1170 }
1171
1172 ASSERT_THAT(actual_filenames,
1173 ::testing::UnorderedElementsAreArray(logfiles_));
1174
1175 // Strip off the end of one of the files. Pick one with a lot of data.
1176 // For snappy, needs to have enough data to be >1 chunk of compressed data so
1177 // that we don't corrupt the entire log part.
1178 ::std::string compressed_contents =
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07001179 aos::util::ReadFileToStringOrDie(logfiles_[2]);
Naman Guptaa63aa132023-03-22 20:06:34 -07001180
1181 aos::util::WriteStringToFileOrDie(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07001182 logfiles_[2],
Naman Guptaa63aa132023-03-22 20:06:34 -07001183 compressed_contents.substr(0, compressed_contents.size() - 100));
1184
1185 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1186 VerifyParts(sorted_parts);
1187}
1188
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001189// Tests that if we remap a logged channel, it shows up correctly.
Naman Guptaa63aa132023-03-22 20:06:34 -07001190TEST_P(MultinodeLoggerTest, RemapLoggedChannel) {
1191 time_converter_.StartEqual();
Austin Schuh8fb4b452023-08-04 17:02:27 -07001192
1193 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -07001194 {
1195 LoggerState pi1_logger = MakeLogger(pi1_);
1196 LoggerState pi2_logger = MakeLogger(pi2_);
1197
1198 event_loop_factory_.RunFor(chrono::milliseconds(95));
1199
1200 StartLogger(&pi1_logger);
1201 StartLogger(&pi2_logger);
1202
1203 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07001204
1205 pi1_logger.AppendAllFilenames(&filenames);
1206 pi2_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001207 }
1208
Austin Schuh8fb4b452023-08-04 17:02:27 -07001209 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001210 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1211 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001212
1213 // Remap just on pi1.
1214 reader.RemapLoggedChannel<aos::timing::Report>(
1215 "/aos", configuration::GetNode(reader.configuration(), "pi1"));
1216
1217 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1218 log_reader_factory.set_send_delay(chrono::microseconds(0));
1219
1220 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1221 // Note: An extra channel gets remapped automatically due to a timestamp
1222 // channel being LOCAL_LOGGER'd.
1223 ASSERT_EQ(remapped_channels.size(), std::get<0>(GetParam()).shared ? 1u : 2u);
1224 EXPECT_EQ(remapped_channels[0]->name()->string_view(), "/original/pi1/aos");
1225 EXPECT_EQ(remapped_channels[0]->type()->string_view(), "aos.timing.Report");
1226 if (!std::get<0>(GetParam()).shared) {
1227 EXPECT_EQ(remapped_channels[1]->name()->string_view(),
1228 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1229 "aos-message_bridge-Timestamp");
1230 EXPECT_EQ(remapped_channels[1]->type()->string_view(),
1231 "aos.message_bridge.RemoteMessage");
1232 }
1233
1234 reader.Register(&log_reader_factory);
1235
1236 const Node *pi1 =
1237 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1238 const Node *pi2 =
1239 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1240
1241 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1242 // else should have moved.
1243 std::unique_ptr<EventLoop> pi1_event_loop =
1244 log_reader_factory.MakeEventLoop("test", pi1);
1245 pi1_event_loop->SkipTimingReport();
1246 std::unique_ptr<EventLoop> full_pi1_event_loop =
1247 log_reader_factory.MakeEventLoop("test", pi1);
1248 full_pi1_event_loop->SkipTimingReport();
1249 std::unique_ptr<EventLoop> pi2_event_loop =
1250 log_reader_factory.MakeEventLoop("test", pi2);
1251 pi2_event_loop->SkipTimingReport();
1252
1253 MessageCounter<aos::timing::Report> pi1_timing_report(pi1_event_loop.get(),
1254 "/aos");
1255 MessageCounter<aos::timing::Report> full_pi1_timing_report(
1256 full_pi1_event_loop.get(), "/pi1/aos");
1257 MessageCounter<aos::timing::Report> pi1_original_timing_report(
1258 pi1_event_loop.get(), "/original/aos");
1259 MessageCounter<aos::timing::Report> full_pi1_original_timing_report(
1260 full_pi1_event_loop.get(), "/original/pi1/aos");
1261 MessageCounter<aos::timing::Report> pi2_timing_report(pi2_event_loop.get(),
1262 "/aos");
1263
1264 log_reader_factory.Run();
1265
1266 EXPECT_EQ(pi1_timing_report.count(), 0u);
1267 EXPECT_EQ(full_pi1_timing_report.count(), 0u);
1268 EXPECT_NE(pi1_original_timing_report.count(), 0u);
1269 EXPECT_NE(full_pi1_original_timing_report.count(), 0u);
1270 EXPECT_NE(pi2_timing_report.count(), 0u);
1271
1272 reader.Deregister();
1273}
1274
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001275// Tests that if we rename a logged channel, it shows up correctly.
1276TEST_P(MultinodeLoggerTest, RenameLoggedChannel) {
1277 std::vector<std::string> actual_filenames;
1278 time_converter_.StartEqual();
1279 {
1280 LoggerState pi1_logger = MakeLogger(pi1_);
1281 LoggerState pi2_logger = MakeLogger(pi2_);
1282
1283 event_loop_factory_.RunFor(chrono::milliseconds(95));
1284
1285 StartLogger(&pi1_logger);
1286 StartLogger(&pi2_logger);
1287
1288 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1289
1290 pi1_logger.AppendAllFilenames(&actual_filenames);
1291 pi2_logger.AppendAllFilenames(&actual_filenames);
1292 }
1293
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001294 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1295 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1296 LogReader reader(sorted_parts);
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001297
1298 // Rename just on pi2. Add some global maps just to verify they get added in
1299 // the config and used correctly.
1300 std::vector<MapT> maps;
1301 {
1302 MapT map;
1303 map.match = std::make_unique<ChannelT>();
1304 map.match->name = "/foo*";
1305 map.match->source_node = "pi1";
1306 map.rename = std::make_unique<ChannelT>();
1307 map.rename->name = "/pi1/foo";
1308 maps.emplace_back(std::move(map));
1309 }
1310 {
1311 MapT map;
1312 map.match = std::make_unique<ChannelT>();
1313 map.match->name = "/foo*";
1314 map.match->source_node = "pi2";
1315 map.rename = std::make_unique<ChannelT>();
1316 map.rename->name = "/pi2/foo";
1317 maps.emplace_back(std::move(map));
1318 }
1319 {
1320 MapT map;
1321 map.match = std::make_unique<ChannelT>();
1322 map.match->name = "/foo";
1323 map.match->type = "aos.examples.Ping";
1324 map.rename = std::make_unique<ChannelT>();
1325 map.rename->name = "/foo/renamed";
1326 maps.emplace_back(std::move(map));
1327 }
1328 reader.RenameLoggedChannel<aos::examples::Ping>(
1329 "/aos", configuration::GetNode(reader.configuration(), "pi2"),
1330 "/pi2/foo/renamed", maps);
1331
1332 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1333 log_reader_factory.set_send_delay(chrono::microseconds(0));
1334
1335 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1336 // Note: An extra channel gets remapped automatically due to a timestamp
1337 // channel being LOCAL_LOGGER'd.
1338 const bool shared = std::get<0>(GetParam()).shared;
1339 ASSERT_EQ(remapped_channels.size(), shared ? 1u : 2u);
1340 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->name()->string_view(),
1341 "/pi2/foo/renamed");
1342 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->type()->string_view(),
1343 "aos.examples.Ping");
1344 if (!shared) {
1345 EXPECT_EQ(remapped_channels[0]->name()->string_view(),
1346 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1347 "aos-message_bridge-Timestamp");
1348 EXPECT_EQ(remapped_channels[0]->type()->string_view(),
1349 "aos.message_bridge.RemoteMessage");
1350 }
1351
1352 reader.Register(&log_reader_factory);
1353
1354 const Node *pi1 =
1355 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1356 const Node *pi2 =
1357 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1358
1359 // Confirm we can read the data on the renamed channel, just for pi2. Nothing
1360 // else should have moved.
1361 std::unique_ptr<EventLoop> pi2_event_loop =
1362 log_reader_factory.MakeEventLoop("test", pi2);
1363 pi2_event_loop->SkipTimingReport();
1364 std::unique_ptr<EventLoop> full_pi2_event_loop =
1365 log_reader_factory.MakeEventLoop("test", pi2);
1366 full_pi2_event_loop->SkipTimingReport();
1367 std::unique_ptr<EventLoop> pi1_event_loop =
1368 log_reader_factory.MakeEventLoop("test", pi1);
1369 pi1_event_loop->SkipTimingReport();
1370
1371 MessageCounter<aos::examples::Ping> pi2_ping(pi2_event_loop.get(), "/aos");
1372 MessageCounter<aos::examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1373 "/foo");
1374 MessageCounter<aos::examples::Ping> full_pi2_renamed_ping(
1375 full_pi2_event_loop.get(), "/pi2/foo/renamed");
1376 MessageCounter<aos::examples::Ping> pi1_ping(pi1_event_loop.get(), "/aos");
1377
1378 log_reader_factory.Run();
1379
1380 EXPECT_EQ(pi2_ping.count(), 0u);
1381 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1382 EXPECT_NE(full_pi2_renamed_ping.count(), 0u);
1383 EXPECT_NE(pi1_ping.count(), 0u);
1384
1385 reader.Deregister();
1386}
1387
Naman Guptaa63aa132023-03-22 20:06:34 -07001388// Tests that we can remap a forwarded channel as well.
1389TEST_P(MultinodeLoggerTest, RemapForwardedLoggedChannel) {
1390 time_converter_.StartEqual();
1391 {
1392 LoggerState pi1_logger = MakeLogger(pi1_);
1393 LoggerState pi2_logger = MakeLogger(pi2_);
1394
1395 event_loop_factory_.RunFor(chrono::milliseconds(95));
1396
1397 StartLogger(&pi1_logger);
1398 StartLogger(&pi2_logger);
1399
1400 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1401 }
1402
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001403 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1404 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1405 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001406
1407 reader.RemapLoggedChannel<examples::Ping>("/test");
1408
1409 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1410 log_reader_factory.set_send_delay(chrono::microseconds(0));
1411
1412 reader.Register(&log_reader_factory);
1413
1414 const Node *pi1 =
1415 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1416 const Node *pi2 =
1417 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1418
1419 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1420 // else should have moved.
1421 std::unique_ptr<EventLoop> pi1_event_loop =
1422 log_reader_factory.MakeEventLoop("test", pi1);
1423 pi1_event_loop->SkipTimingReport();
1424 std::unique_ptr<EventLoop> full_pi1_event_loop =
1425 log_reader_factory.MakeEventLoop("test", pi1);
1426 full_pi1_event_loop->SkipTimingReport();
1427 std::unique_ptr<EventLoop> pi2_event_loop =
1428 log_reader_factory.MakeEventLoop("test", pi2);
1429 pi2_event_loop->SkipTimingReport();
1430
1431 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1432 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1433 MessageCounter<examples::Ping> pi1_original_ping(pi1_event_loop.get(),
1434 "/original/test");
1435 MessageCounter<examples::Ping> pi2_original_ping(pi2_event_loop.get(),
1436 "/original/test");
1437
1438 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1439 pi1_original_ping_timestamp;
1440 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1441 pi1_ping_timestamp;
1442 if (!shared()) {
1443 pi1_original_ping_timestamp =
1444 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1445 pi1_event_loop.get(),
1446 "/pi1/aos/remote_timestamps/pi2/original/test/aos-examples-Ping");
1447 pi1_ping_timestamp =
1448 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1449 pi1_event_loop.get(),
1450 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1451 }
1452
1453 log_reader_factory.Run();
1454
1455 EXPECT_EQ(pi1_ping.count(), 0u);
1456 EXPECT_EQ(pi2_ping.count(), 0u);
1457 EXPECT_NE(pi1_original_ping.count(), 0u);
1458 EXPECT_NE(pi2_original_ping.count(), 0u);
1459 if (!shared()) {
1460 EXPECT_NE(pi1_original_ping_timestamp->count(), 0u);
1461 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1462 }
1463
1464 reader.Deregister();
1465}
1466
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001467// Tests that we can rename a forwarded channel as well.
1468TEST_P(MultinodeLoggerTest, RenameForwardedLoggedChannel) {
1469 std::vector<std::string> actual_filenames;
1470 time_converter_.StartEqual();
1471 {
1472 LoggerState pi1_logger = MakeLogger(pi1_);
1473 LoggerState pi2_logger = MakeLogger(pi2_);
1474
1475 event_loop_factory_.RunFor(chrono::milliseconds(95));
1476
1477 StartLogger(&pi1_logger);
1478 StartLogger(&pi2_logger);
1479
1480 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1481
1482 pi1_logger.AppendAllFilenames(&actual_filenames);
1483 pi2_logger.AppendAllFilenames(&actual_filenames);
1484 }
1485
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001486 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1487 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1488 LogReader reader(sorted_parts);
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001489
1490 std::vector<MapT> maps;
1491 {
1492 MapT map;
1493 map.match = std::make_unique<ChannelT>();
1494 map.match->name = "/production*";
1495 map.match->source_node = "pi1";
1496 map.rename = std::make_unique<ChannelT>();
1497 map.rename->name = "/pi1/production";
1498 maps.emplace_back(std::move(map));
1499 }
1500 {
1501 MapT map;
1502 map.match = std::make_unique<ChannelT>();
1503 map.match->name = "/production*";
1504 map.match->source_node = "pi2";
1505 map.rename = std::make_unique<ChannelT>();
1506 map.rename->name = "/pi2/production";
1507 maps.emplace_back(std::move(map));
1508 }
1509 reader.RenameLoggedChannel<aos::examples::Ping>(
1510 "/test", configuration::GetNode(reader.configuration(), "pi1"),
1511 "/pi1/production", maps);
1512
1513 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1514 log_reader_factory.set_send_delay(chrono::microseconds(0));
1515
1516 reader.Register(&log_reader_factory);
1517
1518 const Node *pi1 =
1519 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1520 const Node *pi2 =
1521 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1522
1523 // Confirm we can read the data on the renamed channel, on both the source
1524 // node and the remote node. In case of split timestamp channels, confirm that
1525 // we receive the timestamp messages on the renamed channel as well.
1526 std::unique_ptr<EventLoop> pi1_event_loop =
1527 log_reader_factory.MakeEventLoop("test", pi1);
1528 pi1_event_loop->SkipTimingReport();
1529 std::unique_ptr<EventLoop> full_pi1_event_loop =
1530 log_reader_factory.MakeEventLoop("test", pi1);
1531 full_pi1_event_loop->SkipTimingReport();
1532 std::unique_ptr<EventLoop> pi2_event_loop =
1533 log_reader_factory.MakeEventLoop("test", pi2);
1534 pi2_event_loop->SkipTimingReport();
1535
1536 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1537 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1538 MessageCounter<examples::Ping> pi1_renamed_ping(pi1_event_loop.get(),
1539 "/pi1/production");
1540 MessageCounter<examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1541 "/pi1/production");
1542
1543 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1544 pi1_renamed_ping_timestamp;
1545 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1546 pi1_ping_timestamp;
1547 if (!shared()) {
1548 pi1_renamed_ping_timestamp =
1549 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1550 pi1_event_loop.get(),
1551 "/pi1/aos/remote_timestamps/pi2/pi1/production/aos-examples-Ping");
1552 pi1_ping_timestamp =
1553 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1554 pi1_event_loop.get(),
1555 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1556 }
1557
1558 log_reader_factory.Run();
1559
1560 EXPECT_EQ(pi1_ping.count(), 0u);
1561 EXPECT_EQ(pi2_ping.count(), 0u);
1562 EXPECT_NE(pi1_renamed_ping.count(), 0u);
1563 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1564 if (!shared()) {
1565 EXPECT_NE(pi1_renamed_ping_timestamp->count(), 0u);
1566 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1567 }
1568
1569 reader.Deregister();
1570}
1571
Naman Guptaa63aa132023-03-22 20:06:34 -07001572// Tests that we observe all the same events in log replay (for a given node)
1573// whether we just register an event loop for that node or if we register a full
1574// event loop factory.
1575TEST_P(MultinodeLoggerTest, SingleNodeReplay) {
1576 time_converter_.StartEqual();
1577 constexpr chrono::milliseconds kStartupDelay(95);
Austin Schuh8fb4b452023-08-04 17:02:27 -07001578 std::vector<std::string> filenames;
1579
Naman Guptaa63aa132023-03-22 20:06:34 -07001580 {
1581 LoggerState pi1_logger = MakeLogger(pi1_);
1582 LoggerState pi2_logger = MakeLogger(pi2_);
1583
1584 event_loop_factory_.RunFor(kStartupDelay);
1585
1586 StartLogger(&pi1_logger);
1587 StartLogger(&pi2_logger);
1588
1589 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07001590
1591 pi1_logger.AppendAllFilenames(&filenames);
1592 pi2_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001593 }
1594
Austin Schuh8fb4b452023-08-04 17:02:27 -07001595 LogReader full_reader(SortParts(filenames));
1596 LogReader single_node_reader(SortParts(filenames));
Naman Guptaa63aa132023-03-22 20:06:34 -07001597
1598 SimulatedEventLoopFactory full_factory(full_reader.configuration());
1599 SimulatedEventLoopFactory single_node_factory(
1600 single_node_reader.configuration());
1601 single_node_factory.SkipTimingReport();
1602 single_node_factory.DisableStatistics();
1603 std::unique_ptr<EventLoop> replay_event_loop =
1604 single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
1605 "log_reader");
1606
1607 full_reader.Register(&full_factory);
1608 single_node_reader.Register(replay_event_loop.get());
1609
1610 const Node *full_pi1 =
1611 configuration::GetNode(full_factory.configuration(), "pi1");
1612
1613 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1614 // else should have moved.
1615 std::unique_ptr<EventLoop> full_event_loop =
1616 full_factory.MakeEventLoop("test", full_pi1);
1617 full_event_loop->SkipTimingReport();
1618 full_event_loop->SkipAosLog();
1619 // maps are indexed on channel index.
1620 // observed_messages: {channel_index: [(message_sent_time, was_fetched),...]}
1621 std::map<size_t, std::vector<std::pair<monotonic_clock::time_point, bool>>>
1622 observed_messages;
1623 std::map<size_t, std::unique_ptr<RawFetcher>> fetchers;
1624 for (size_t ii = 0; ii < full_event_loop->configuration()->channels()->size();
1625 ++ii) {
1626 const Channel *channel =
1627 full_event_loop->configuration()->channels()->Get(ii);
1628 // We currently don't support replaying remote timestamp channels in
1629 // realtime replay (unless the remote timestamp channel was not NOT_LOGGED,
1630 // in which case it gets auto-remapped and replayed on a /original channel).
1631 if (channel->name()->string_view().find("remote_timestamp") !=
1632 std::string_view::npos &&
1633 channel->name()->string_view().find("/original") ==
1634 std::string_view::npos) {
1635 continue;
1636 }
1637 if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
1638 observed_messages[ii] = {};
1639 fetchers[ii] = full_event_loop->MakeRawFetcher(channel);
1640 full_event_loop->OnRun([ii, &observed_messages, &fetchers]() {
1641 if (fetchers[ii]->Fetch()) {
1642 observed_messages[ii].push_back(std::make_pair(
1643 fetchers[ii]->context().monotonic_event_time, true));
1644 }
1645 });
1646 full_event_loop->MakeRawNoArgWatcher(
1647 channel, [ii, &observed_messages](const Context &context) {
1648 observed_messages[ii].push_back(
1649 std::make_pair(context.monotonic_event_time, false));
1650 });
1651 }
1652 }
1653
1654 full_factory.Run();
1655 fetchers.clear();
1656 full_reader.Deregister();
1657
1658 const Node *single_node_pi1 =
1659 configuration::GetNode(single_node_factory.configuration(), "pi1");
1660 std::map<size_t, std::unique_ptr<RawFetcher>> single_node_fetchers;
1661
1662 std::unique_ptr<EventLoop> single_node_event_loop =
1663 single_node_factory.MakeEventLoop("test", single_node_pi1);
1664 single_node_event_loop->SkipTimingReport();
1665 single_node_event_loop->SkipAosLog();
1666 for (size_t ii = 0;
1667 ii < single_node_event_loop->configuration()->channels()->size(); ++ii) {
1668 const Channel *channel =
1669 single_node_event_loop->configuration()->channels()->Get(ii);
1670 single_node_factory.DisableForwarding(channel);
1671 if (configuration::ChannelIsReadableOnNode(channel, single_node_pi1)) {
1672 single_node_fetchers[ii] =
1673 single_node_event_loop->MakeRawFetcher(channel);
1674 single_node_event_loop->OnRun([channel, ii, &single_node_fetchers]() {
1675 EXPECT_FALSE(single_node_fetchers[ii]->Fetch())
1676 << "Single EventLoop replay doesn't support pre-loading fetchers. "
1677 << configuration::StrippedChannelToString(channel);
1678 });
1679 single_node_event_loop->MakeRawNoArgWatcher(
1680 channel, [ii, &observed_messages, channel,
1681 kStartupDelay](const Context &context) {
1682 if (observed_messages[ii].empty()) {
1683 FAIL() << "Observed extra message at "
1684 << context.monotonic_event_time << " on "
1685 << configuration::StrippedChannelToString(channel);
1686 return;
1687 }
1688 const std::pair<monotonic_clock::time_point, bool> &message =
1689 observed_messages[ii].front();
1690 if (message.second) {
1691 EXPECT_LE(message.first,
1692 context.monotonic_event_time + kStartupDelay)
1693 << "Mismatched message times " << context.monotonic_event_time
1694 << " and " << message.first << " on "
1695 << configuration::StrippedChannelToString(channel);
1696 } else {
1697 EXPECT_EQ(message.first,
1698 context.monotonic_event_time + kStartupDelay)
1699 << "Mismatched message times " << context.monotonic_event_time
1700 << " and " << message.first << " on "
1701 << configuration::StrippedChannelToString(channel);
1702 }
1703 observed_messages[ii].erase(observed_messages[ii].begin());
1704 });
1705 }
1706 }
1707
1708 single_node_factory.Run();
1709
1710 single_node_fetchers.clear();
1711
1712 single_node_reader.Deregister();
1713
1714 for (const auto &pair : observed_messages) {
1715 EXPECT_TRUE(pair.second.empty())
1716 << "Missed " << pair.second.size() << " messages on "
1717 << configuration::StrippedChannelToString(
1718 single_node_event_loop->configuration()->channels()->Get(
1719 pair.first));
1720 }
1721}
1722
1723// Tests that we properly recreate forwarded timestamps when replaying a log.
1724// This should be enough that we can then re-run the logger and get a valid log
1725// back.
1726TEST_P(MultinodeLoggerTest, MessageHeader) {
1727 time_converter_.StartEqual();
1728 {
1729 LoggerState pi1_logger = MakeLogger(pi1_);
1730 LoggerState pi2_logger = MakeLogger(pi2_);
1731
1732 event_loop_factory_.RunFor(chrono::milliseconds(95));
1733
1734 StartLogger(&pi1_logger);
1735 StartLogger(&pi2_logger);
1736
1737 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1738 }
1739
1740 LogReader reader(SortParts(logfiles_));
1741
1742 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1743 log_reader_factory.set_send_delay(chrono::microseconds(0));
1744
1745 // This sends out the fetched messages and advances time to the start of the
1746 // log file.
1747 reader.Register(&log_reader_factory);
1748
1749 const Node *pi1 =
1750 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1751 const Node *pi2 =
1752 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1753
1754 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
1755 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
1756 LOG(INFO) << "now pi1 "
1757 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1758 LOG(INFO) << "now pi2 "
1759 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1760
1761 EXPECT_THAT(reader.LoggedNodes(),
1762 ::testing::ElementsAre(
1763 configuration::GetNode(reader.logged_configuration(), pi1),
1764 configuration::GetNode(reader.logged_configuration(), pi2)));
1765
1766 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1767
1768 std::unique_ptr<EventLoop> pi1_event_loop =
1769 log_reader_factory.MakeEventLoop("test", pi1);
1770 std::unique_ptr<EventLoop> pi2_event_loop =
1771 log_reader_factory.MakeEventLoop("test", pi2);
1772
1773 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =
1774 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1775 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi2_fetcher =
1776 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1777
1778 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1779 pi1_event_loop->MakeFetcher<examples::Ping>("/test");
1780 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1781 pi2_event_loop->MakeFetcher<examples::Ping>("/test");
1782
1783 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi2_fetcher =
1784 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1785 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi1_fetcher =
1786 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1787
1788 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1789 pi2_event_loop->MakeFetcher<examples::Pong>("/test");
1790 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1791 pi1_event_loop->MakeFetcher<examples::Pong>("/test");
1792
1793 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
1794 pi1_event_loop->configuration(), pi1_timestamp_on_pi1_fetcher.channel());
1795 const size_t ping_timestamp_channel = configuration::ChannelIndex(
1796 pi2_event_loop->configuration(), ping_on_pi2_fetcher.channel());
1797
1798 const size_t pi2_timestamp_channel = configuration::ChannelIndex(
1799 pi2_event_loop->configuration(), pi2_timestamp_on_pi2_fetcher.channel());
1800 const size_t pong_timestamp_channel = configuration::ChannelIndex(
1801 pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
1802
1803 const chrono::nanoseconds network_delay = event_loop_factory_.network_delay();
1804 const chrono::nanoseconds send_delay = event_loop_factory_.send_delay();
1805
1806 for (std::pair<int, std::string> channel :
1807 shared()
1808 ? std::vector<
1809 std::pair<int, std::string>>{{-1,
1810 "/aos/remote_timestamps/pi2"}}
1811 : std::vector<std::pair<int, std::string>>{
1812 {pi1_timestamp_channel,
1813 "/aos/remote_timestamps/pi2/pi1/aos/"
1814 "aos-message_bridge-Timestamp"},
1815 {ping_timestamp_channel,
1816 "/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
1817 pi1_event_loop->MakeWatcher(
1818 channel.second,
1819 [&pi1_event_loop, &pi2_event_loop, pi1_timestamp_channel,
1820 ping_timestamp_channel, &pi1_timestamp_on_pi1_fetcher,
1821 &pi1_timestamp_on_pi2_fetcher, &ping_on_pi1_fetcher,
1822 &ping_on_pi2_fetcher, network_delay, send_delay,
1823 channel_index = channel.first](const RemoteMessage &header) {
1824 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1825 chrono::nanoseconds(header.monotonic_sent_time()));
1826 const aos::realtime_clock::time_point header_realtime_sent_time(
1827 chrono::nanoseconds(header.realtime_sent_time()));
1828 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1829 chrono::nanoseconds(header.monotonic_remote_time()));
1830 const aos::realtime_clock::time_point header_realtime_remote_time(
1831 chrono::nanoseconds(header.realtime_remote_time()));
1832
1833 if (channel_index != -1) {
1834 ASSERT_EQ(channel_index, header.channel_index());
1835 }
1836
1837 const Context *pi1_context = nullptr;
1838 const Context *pi2_context = nullptr;
1839
1840 if (header.channel_index() == pi1_timestamp_channel) {
1841 ASSERT_TRUE(pi1_timestamp_on_pi1_fetcher.FetchNext());
1842 ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
1843 pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
1844 pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
1845 } else if (header.channel_index() == ping_timestamp_channel) {
1846 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
1847 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
1848 pi1_context = &ping_on_pi1_fetcher.context();
1849 pi2_context = &ping_on_pi2_fetcher.context();
1850 } else {
1851 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1852 << configuration::CleanedChannelToString(
1853 pi1_event_loop->configuration()->channels()->Get(
1854 header.channel_index()));
1855 }
1856
1857 ASSERT_TRUE(header.has_boot_uuid());
1858 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1859 pi2_event_loop->boot_uuid());
1860
1861 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
1862 EXPECT_EQ(pi2_context->remote_queue_index,
1863 header.remote_queue_index());
1864 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
1865
1866 EXPECT_EQ(pi2_context->monotonic_event_time,
1867 header_monotonic_sent_time);
1868 EXPECT_EQ(pi2_context->realtime_event_time,
1869 header_realtime_sent_time);
1870 EXPECT_EQ(pi2_context->realtime_remote_time,
1871 header_realtime_remote_time);
1872 EXPECT_EQ(pi2_context->monotonic_remote_time,
1873 header_monotonic_remote_time);
1874
1875 EXPECT_EQ(pi1_context->realtime_event_time,
1876 header_realtime_remote_time);
1877 EXPECT_EQ(pi1_context->monotonic_event_time,
1878 header_monotonic_remote_time);
1879
1880 // Time estimation isn't perfect, but we know the clocks were
1881 // identical when logged, so we know when this should have come back.
1882 // Confirm we got it when we expected.
1883 EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
1884 pi1_context->monotonic_event_time + 2 * network_delay +
1885 send_delay);
1886 });
1887 }
1888 for (std::pair<int, std::string> channel :
1889 shared()
1890 ? std::vector<
1891 std::pair<int, std::string>>{{-1,
1892 "/aos/remote_timestamps/pi1"}}
1893 : std::vector<std::pair<int, std::string>>{
1894 {pi2_timestamp_channel,
1895 "/aos/remote_timestamps/pi1/pi2/aos/"
1896 "aos-message_bridge-Timestamp"}}) {
1897 pi2_event_loop->MakeWatcher(
1898 channel.second,
1899 [&pi2_event_loop, &pi1_event_loop, pi2_timestamp_channel,
1900 pong_timestamp_channel, &pi2_timestamp_on_pi2_fetcher,
1901 &pi2_timestamp_on_pi1_fetcher, &pong_on_pi2_fetcher,
1902 &pong_on_pi1_fetcher, network_delay, send_delay,
1903 channel_index = channel.first](const RemoteMessage &header) {
1904 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1905 chrono::nanoseconds(header.monotonic_sent_time()));
1906 const aos::realtime_clock::time_point header_realtime_sent_time(
1907 chrono::nanoseconds(header.realtime_sent_time()));
1908 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1909 chrono::nanoseconds(header.monotonic_remote_time()));
1910 const aos::realtime_clock::time_point header_realtime_remote_time(
1911 chrono::nanoseconds(header.realtime_remote_time()));
1912
1913 if (channel_index != -1) {
1914 ASSERT_EQ(channel_index, header.channel_index());
1915 }
1916
1917 const Context *pi2_context = nullptr;
1918 const Context *pi1_context = nullptr;
1919
1920 if (header.channel_index() == pi2_timestamp_channel) {
1921 ASSERT_TRUE(pi2_timestamp_on_pi2_fetcher.FetchNext());
1922 ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
1923 pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
1924 pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
1925 } else if (header.channel_index() == pong_timestamp_channel) {
1926 ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
1927 ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
1928 pi2_context = &pong_on_pi2_fetcher.context();
1929 pi1_context = &pong_on_pi1_fetcher.context();
1930 } else {
1931 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1932 << configuration::CleanedChannelToString(
1933 pi2_event_loop->configuration()->channels()->Get(
1934 header.channel_index()));
1935 }
1936
1937 ASSERT_TRUE(header.has_boot_uuid());
1938 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1939 pi1_event_loop->boot_uuid());
1940
1941 EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());
1942 EXPECT_EQ(pi1_context->remote_queue_index,
1943 header.remote_queue_index());
1944 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
1945
1946 EXPECT_EQ(pi1_context->monotonic_event_time,
1947 header_monotonic_sent_time);
1948 EXPECT_EQ(pi1_context->realtime_event_time,
1949 header_realtime_sent_time);
1950 EXPECT_EQ(pi1_context->realtime_remote_time,
1951 header_realtime_remote_time);
1952 EXPECT_EQ(pi1_context->monotonic_remote_time,
1953 header_monotonic_remote_time);
1954
1955 EXPECT_EQ(pi2_context->realtime_event_time,
1956 header_realtime_remote_time);
1957 EXPECT_EQ(pi2_context->monotonic_event_time,
1958 header_monotonic_remote_time);
1959
1960 // Time estimation isn't perfect, but we know the clocks were
1961 // identical when logged, so we know when this should have come back.
1962 // Confirm we got it when we expected.
1963 EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
1964 pi2_context->monotonic_event_time + 2 * network_delay +
1965 send_delay);
1966 });
1967 }
1968
1969 // And confirm we can re-create a log again, while checking the contents.
1970 {
1971 LoggerState pi1_logger = MakeLogger(
1972 log_reader_factory.GetNodeEventLoopFactory("pi1"), &log_reader_factory);
1973 LoggerState pi2_logger = MakeLogger(
1974 log_reader_factory.GetNodeEventLoopFactory("pi2"), &log_reader_factory);
1975
Austin Schuh8fb4b452023-08-04 17:02:27 -07001976 StartLogger(&pi1_logger, tmp_dir_ + "/logs/relogged1");
1977 StartLogger(&pi2_logger, tmp_dir_ + "/logs/relogged2");
Naman Guptaa63aa132023-03-22 20:06:34 -07001978
1979 log_reader_factory.Run();
1980 }
1981
1982 reader.Deregister();
1983
1984 // And verify that we can run the LogReader over the relogged files without
1985 // hitting any fatal errors.
1986 {
Austin Schuh8fb4b452023-08-04 17:02:27 -07001987 LogReader relogged_reader(SortParts(
1988 MakeLogFiles(tmp_dir_ + "/logs/relogged1", tmp_dir_ + "/logs/relogged2",
1989 1, 1, 2, 2, true)));
Naman Guptaa63aa132023-03-22 20:06:34 -07001990 relogged_reader.Register();
1991
1992 relogged_reader.event_loop_factory()->Run();
1993 }
1994 // And confirm that we can read the logged file using the reader's
1995 // configuration.
1996 {
1997 LogReader relogged_reader(
Austin Schuh8fb4b452023-08-04 17:02:27 -07001998 SortParts(MakeLogFiles(tmp_dir_ + "/logs/relogged1",
1999 tmp_dir_ + "/logs/relogged2", 1, 1, 2, 2, true)),
Naman Guptaa63aa132023-03-22 20:06:34 -07002000 reader.configuration());
2001 relogged_reader.Register();
2002
2003 relogged_reader.event_loop_factory()->Run();
2004 }
2005}
2006
2007// Tests that we properly populate and extract the logger_start time by setting
2008// up a clock difference between 2 nodes and looking at the resulting parts.
2009TEST_P(MultinodeLoggerTest, LoggerStartTime) {
2010 std::vector<std::string> actual_filenames;
2011 time_converter_.AddMonotonic(
2012 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2013 {
2014 LoggerState pi1_logger = MakeLogger(pi1_);
2015 LoggerState pi2_logger = MakeLogger(pi2_);
2016
2017 StartLogger(&pi1_logger);
2018 StartLogger(&pi2_logger);
2019
2020 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2021
2022 pi1_logger.AppendAllFilenames(&actual_filenames);
2023 pi2_logger.AppendAllFilenames(&actual_filenames);
2024 }
2025
2026 ASSERT_THAT(actual_filenames,
2027 ::testing::UnorderedElementsAreArray(logfiles_));
2028
Austin Schuh8fb4b452023-08-04 17:02:27 -07002029 for (const LogFile &log_file : SortParts(actual_filenames)) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002030 for (const LogParts &log_part : log_file.parts) {
2031 if (log_part.node == log_file.logger_node) {
2032 EXPECT_EQ(log_part.logger_monotonic_start_time,
2033 aos::monotonic_clock::min_time);
2034 EXPECT_EQ(log_part.logger_realtime_start_time,
2035 aos::realtime_clock::min_time);
2036 } else {
2037 const chrono::seconds offset = log_file.logger_node == "pi1"
2038 ? -chrono::seconds(1000)
2039 : chrono::seconds(1000);
2040 EXPECT_EQ(log_part.logger_monotonic_start_time,
2041 log_part.monotonic_start_time + offset);
2042 EXPECT_EQ(log_part.logger_realtime_start_time,
2043 log_file.realtime_start_time +
2044 (log_part.logger_monotonic_start_time -
2045 log_file.monotonic_start_time));
2046 }
2047 }
2048 }
2049}
2050
2051// Test that renaming the base, renames the folder.
2052TEST_P(MultinodeLoggerTest, LoggerRenameFolder) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002053 time_converter_.AddMonotonic(
2054 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07002055 logfile_base1_ = tmp_dir_ + "/logs/renamefolder/multi_logfile1";
2056 logfile_base2_ = tmp_dir_ + "/logs/renamefolder/multi_logfile2";
2057
Naman Guptaa63aa132023-03-22 20:06:34 -07002058 LoggerState pi1_logger = MakeLogger(pi1_);
2059 LoggerState pi2_logger = MakeLogger(pi2_);
2060
2061 StartLogger(&pi1_logger);
2062 StartLogger(&pi2_logger);
2063
2064 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002065 logfile_base1_ = tmp_dir_ + "/logs/new-good/multi_logfile1";
2066 logfile_base2_ = tmp_dir_ + "/logs/new-good/multi_logfile2";
Naman Guptaa63aa132023-03-22 20:06:34 -07002067 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
Alexei Strotscaf17d32023-04-03 22:31:11 -07002068
2069 // Sequence of set_base_name and Rotate simulates rename operation. Since
2070 // rename is not supported by all namers, RenameLogBase moved from logger to
2071 // the higher level abstraction, yet log_namers support rename, and it is
2072 // legal to test it here.
2073 pi1_logger.log_namer->set_base_name(logfile_base1_);
2074 pi1_logger.logger->Rotate();
2075 pi2_logger.log_namer->set_base_name(logfile_base2_);
2076 pi2_logger.logger->Rotate();
2077
Naman Guptaa63aa132023-03-22 20:06:34 -07002078 for (auto &file : logfiles_) {
2079 struct stat s;
2080 EXPECT_EQ(0, stat(file.c_str(), &s));
2081 }
2082}
2083
2084// Test that renaming the file base dies.
2085TEST_P(MultinodeLoggerDeathTest, LoggerRenameFile) {
2086 time_converter_.AddMonotonic(
2087 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07002088 logfile_base1_ = tmp_dir_ + "/logs/renamefile/multi_logfile1";
2089 logfile_base2_ = tmp_dir_ + "/logs/renamefile/multi_logfile2";
2090
Naman Guptaa63aa132023-03-22 20:06:34 -07002091 LoggerState pi1_logger = MakeLogger(pi1_);
2092 StartLogger(&pi1_logger);
2093 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002094 logfile_base1_ = tmp_dir_ + "/logs/new-renamefile/new_multi_logfile1";
Alexei Strotscaf17d32023-04-03 22:31:11 -07002095 EXPECT_DEATH({ pi1_logger.log_namer->set_base_name(logfile_base1_); },
Naman Guptaa63aa132023-03-22 20:06:34 -07002096 "Rename of file base from");
2097}
2098
2099// TODO(austin): We can write a test which recreates a logfile and confirms that
2100// we get it back. That is the ultimate test.
2101
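// Tests logging on pi1 while the remote node (pi2) reboots partway through.
// Checks that the logger produces the expected set of files, and that each
// part's header records the correct source node boot UUID, monotonic start
// time, and oldest timestamps as the log rotates across the reboot.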
2105TEST_P(MultinodeLoggerTest, RemoteReboot) {
Austin Schuh6ecfe902023-08-04 22:44:37 -07002106 if (file_strategy() == FileStrategy::kCombine) {
2107 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
2108 }
Naman Guptaa63aa132023-03-22 20:06:34 -07002109 std::vector<std::string> actual_filenames;
2110
2111 const UUID pi1_boot0 = UUID::Random();
2112 const UUID pi2_boot0 = UUID::Random();
2113 const UUID pi2_boot1 = UUID::Random();
2114 {
2115 CHECK_EQ(pi1_index_, 0u);
2116 CHECK_EQ(pi2_index_, 1u);
2117
2118 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2119 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2120 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2121
2122 time_converter_.AddNextTimestamp(
2123 distributed_clock::epoch(),
2124 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2125 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2126 time_converter_.AddNextTimestamp(
2127 distributed_clock::epoch() + reboot_time,
2128 {BootTimestamp::epoch() + reboot_time,
2129 BootTimestamp{
2130 .boot = 1,
2131 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2132 }
2133
2134 {
2135 LoggerState pi1_logger = MakeLogger(pi1_);
2136
2137 event_loop_factory_.RunFor(chrono::milliseconds(95));
2138 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2139 pi1_boot0);
2140 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2141 pi2_boot0);
2142
2143 StartLogger(&pi1_logger);
2144
2145 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2146
2147 VLOG(1) << "Reboot now!";
2148
2149 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2150 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2151 pi1_boot0);
2152 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2153 pi2_boot1);
2154
2155 pi1_logger.AppendAllFilenames(&actual_filenames);
2156 }
2157
2158 std::sort(actual_filenames.begin(), actual_filenames.end());
2159 std::sort(pi1_reboot_logfiles_.begin(), pi1_reboot_logfiles_.end());
2160 ASSERT_THAT(actual_filenames,
2161 ::testing::UnorderedElementsAreArray(pi1_reboot_logfiles_));
2162
2163 // Confirm that our new oldest timestamps properly update as we reboot and
2164 // rotate.
2165 for (const std::string &file : pi1_reboot_logfiles_) {
2166 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2167 ReadHeader(file);
2168 CHECK(log_header);
2169 if (log_header->message().has_configuration()) {
2170 continue;
2171 }
2172
2173 const monotonic_clock::time_point monotonic_start_time =
2174 monotonic_clock::time_point(
2175 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2176 const UUID source_node_boot_uuid = UUID::FromString(
2177 log_header->message().source_node_boot_uuid()->string_view());
2178
2179 if (log_header->message().node()->name()->string_view() != "pi1") {
2180 // The remote message channel should rotate later and have more parts.
2181 // This only is true on the log files with shared remote messages.
2182 //
2183 // TODO(austin): I'm not the most thrilled with this test pattern... It
2184 // feels brittle in a different way.
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002185 if (file.find("timestamps/remote_pi2") == std::string::npos) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002186 switch (log_header->message().parts_index()) {
2187 case 0:
2188 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2189 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2190 break;
2191 case 1:
2192 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2193 ASSERT_EQ(monotonic_start_time,
2194 monotonic_clock::epoch() + chrono::seconds(1));
2195 break;
2196 case 2:
2197 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2198 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2199 break;
2200 case 3:
2201 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2202 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
2203 chrono::nanoseconds(2322999462))
2204 << " on " << file;
2205 break;
2206 default:
2207 FAIL();
2208 break;
2209 }
2210 } else {
2211 switch (log_header->message().parts_index()) {
2212 case 0:
2213 case 1:
2214 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2215 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2216 break;
2217 case 2:
2218 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2219 ASSERT_EQ(monotonic_start_time,
2220 monotonic_clock::epoch() + chrono::seconds(1));
2221 break;
2222 case 3:
2223 case 4:
2224 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2225 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2226 break;
2227 case 5:
2228 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2229 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
2230 chrono::nanoseconds(2322999462))
2231 << " on " << file;
2232 break;
2233 default:
2234 FAIL();
2235 break;
2236 }
2237 }
2238 continue;
2239 }
2240 SCOPED_TRACE(file);
2241 SCOPED_TRACE(aos::FlatbufferToJson(
2242 *log_header, {.multi_line = true, .max_vector_size = 100}));
2243 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2244 ASSERT_EQ(
2245 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2246 EXPECT_EQ(
2247 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2248 monotonic_clock::max_time.time_since_epoch().count());
2249 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2250 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2251 2u);
2252 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2253 monotonic_clock::max_time.time_since_epoch().count());
2254 ASSERT_TRUE(log_header->message()
2255 .has_oldest_remote_unreliable_monotonic_timestamps());
2256 ASSERT_EQ(log_header->message()
2257 .oldest_remote_unreliable_monotonic_timestamps()
2258 ->size(),
2259 2u);
2260 EXPECT_EQ(log_header->message()
2261 .oldest_remote_unreliable_monotonic_timestamps()
2262 ->Get(0),
2263 monotonic_clock::max_time.time_since_epoch().count());
2264 ASSERT_TRUE(log_header->message()
2265 .has_oldest_local_unreliable_monotonic_timestamps());
2266 ASSERT_EQ(log_header->message()
2267 .oldest_local_unreliable_monotonic_timestamps()
2268 ->size(),
2269 2u);
2270 EXPECT_EQ(log_header->message()
2271 .oldest_local_unreliable_monotonic_timestamps()
2272 ->Get(0),
2273 monotonic_clock::max_time.time_since_epoch().count());
2274
2275 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2276 monotonic_clock::time_point(chrono::nanoseconds(
2277 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2278 1)));
2279 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2280 monotonic_clock::time_point(chrono::nanoseconds(
2281 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2282 const monotonic_clock::time_point
2283 oldest_remote_unreliable_monotonic_timestamps =
2284 monotonic_clock::time_point(chrono::nanoseconds(
2285 log_header->message()
2286 .oldest_remote_unreliable_monotonic_timestamps()
2287 ->Get(1)));
2288 const monotonic_clock::time_point
2289 oldest_local_unreliable_monotonic_timestamps =
2290 monotonic_clock::time_point(chrono::nanoseconds(
2291 log_header->message()
2292 .oldest_local_unreliable_monotonic_timestamps()
2293 ->Get(1)));
2294 const monotonic_clock::time_point
2295 oldest_remote_reliable_monotonic_timestamps =
2296 monotonic_clock::time_point(chrono::nanoseconds(
2297 log_header->message()
2298 .oldest_remote_reliable_monotonic_timestamps()
2299 ->Get(1)));
2300 const monotonic_clock::time_point
2301 oldest_local_reliable_monotonic_timestamps =
2302 monotonic_clock::time_point(chrono::nanoseconds(
2303 log_header->message()
2304 .oldest_local_reliable_monotonic_timestamps()
2305 ->Get(1)));
2306 const monotonic_clock::time_point
2307 oldest_logger_remote_unreliable_monotonic_timestamps =
2308 monotonic_clock::time_point(chrono::nanoseconds(
2309 log_header->message()
2310 .oldest_logger_remote_unreliable_monotonic_timestamps()
2311 ->Get(0)));
2312 const monotonic_clock::time_point
2313 oldest_logger_local_unreliable_monotonic_timestamps =
2314 monotonic_clock::time_point(chrono::nanoseconds(
2315 log_header->message()
2316 .oldest_logger_local_unreliable_monotonic_timestamps()
2317 ->Get(0)));
2318 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2319 monotonic_clock::max_time);
2320 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2321 monotonic_clock::max_time);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002322 if (log_header->message().data_stored()->Get(0) == StoredDataType::DATA) {
2323 switch (log_header->message().parts_index()) {
2324 case 0:
2325 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2326 monotonic_clock::max_time);
2327 EXPECT_EQ(oldest_local_monotonic_timestamps,
2328 monotonic_clock::max_time);
2329 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2330 monotonic_clock::max_time);
2331 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2332 monotonic_clock::max_time);
2333 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2334 monotonic_clock::max_time);
2335 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2336 monotonic_clock::max_time);
2337 break;
2338 default:
2339 FAIL();
2340 break;
2341 }
2342 } else if (log_header->message().data_stored()->Get(0) ==
2343 StoredDataType::TIMESTAMPS) {
2344 switch (log_header->message().parts_index()) {
2345 case 0:
2346 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2347 monotonic_clock::time_point(chrono::microseconds(90200)));
2348 EXPECT_EQ(oldest_local_monotonic_timestamps,
2349 monotonic_clock::time_point(chrono::microseconds(90350)));
2350 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2351 monotonic_clock::time_point(chrono::microseconds(90200)));
2352 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2353 monotonic_clock::time_point(chrono::microseconds(90350)));
2354 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2355 monotonic_clock::max_time);
2356 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2357 monotonic_clock::max_time);
2358 break;
2359 case 1:
2360 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2361 monotonic_clock::time_point(chrono::microseconds(90200)))
2362 << file;
2363 EXPECT_EQ(oldest_local_monotonic_timestamps,
2364 monotonic_clock::time_point(chrono::microseconds(90350)))
2365 << file;
2366 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2367 monotonic_clock::time_point(chrono::microseconds(90200)))
2368 << file;
2369 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2370 monotonic_clock::time_point(chrono::microseconds(90350)))
2371 << file;
2372 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2373 monotonic_clock::time_point(chrono::microseconds(100000)))
2374 << file;
2375 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2376 monotonic_clock::time_point(chrono::microseconds(100150)))
2377 << file;
2378 break;
2379 case 2:
2380 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2381 monotonic_clock::time_point(chrono::milliseconds(1323) +
2382 chrono::microseconds(200)));
2383 EXPECT_EQ(
2384 oldest_local_monotonic_timestamps,
2385 monotonic_clock::time_point(chrono::microseconds(10100350)));
2386 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2387 monotonic_clock::time_point(chrono::milliseconds(1323) +
2388 chrono::microseconds(200)));
2389 EXPECT_EQ(
2390 oldest_local_unreliable_monotonic_timestamps,
2391 monotonic_clock::time_point(chrono::microseconds(10100350)));
2392 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2393 monotonic_clock::max_time)
2394 << file;
2395 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2396 monotonic_clock::max_time)
2397 << file;
2398 break;
2399 case 3:
2400 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2401 monotonic_clock::time_point(chrono::milliseconds(1323) +
2402 chrono::microseconds(200)));
2403 EXPECT_EQ(
2404 oldest_local_monotonic_timestamps,
2405 monotonic_clock::time_point(chrono::microseconds(10100350)));
2406 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2407 monotonic_clock::time_point(chrono::milliseconds(1323) +
2408 chrono::microseconds(200)));
2409 EXPECT_EQ(
2410 oldest_local_unreliable_monotonic_timestamps,
2411 monotonic_clock::time_point(chrono::microseconds(10100350)));
2412 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2413 monotonic_clock::time_point(chrono::microseconds(1423000)))
2414 << file;
2415 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2416 monotonic_clock::time_point(chrono::microseconds(10200150)))
2417 << file;
2418 break;
2419 default:
2420 FAIL();
2421 break;
2422 }
Naman Guptaa63aa132023-03-22 20:06:34 -07002423 }
2424 }
2425
2426 // Confirm that we refuse to replay logs with missing boot uuids.
2427 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07002428 auto sorted_parts = SortParts(pi1_reboot_logfiles_);
2429 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
2430 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07002431
2432 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2433 log_reader_factory.set_send_delay(chrono::microseconds(0));
2434
2435 // This sends out the fetched messages and advances time to the start of
2436 // the log file.
2437 reader.Register(&log_reader_factory);
2438
2439 log_reader_factory.Run();
2440
2441 reader.Deregister();
2442 }
2443}
2444
2445// Tests that we can sort a log which only has timestamps from the remote
2446// because the local message_bridge_client failed to connect.
2447TEST_P(MultinodeLoggerTest, RemoteRebootOnlyTimestamps) {
Austin Schuh6ecfe902023-08-04 22:44:37 -07002448 if (file_strategy() == FileStrategy::kCombine) {
2449 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
2450 }
2451
Naman Guptaa63aa132023-03-22 20:06:34 -07002452 const UUID pi1_boot0 = UUID::Random();
2453 const UUID pi2_boot0 = UUID::Random();
2454 const UUID pi2_boot1 = UUID::Random();
2455 {
2456 CHECK_EQ(pi1_index_, 0u);
2457 CHECK_EQ(pi2_index_, 1u);
2458
2459 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2460 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2461 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2462
2463 time_converter_.AddNextTimestamp(
2464 distributed_clock::epoch(),
2465 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2466 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2467 time_converter_.AddNextTimestamp(
2468 distributed_clock::epoch() + reboot_time,
2469 {BootTimestamp::epoch() + reboot_time,
2470 BootTimestamp{
2471 .boot = 1,
2472 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2473 }
2474 pi2_->Disconnect(pi1_->node());
2475
2476 std::vector<std::string> filenames;
2477 {
2478 LoggerState pi1_logger = MakeLogger(pi1_);
2479
2480 event_loop_factory_.RunFor(chrono::milliseconds(95));
2481 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2482 pi1_boot0);
2483 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2484 pi2_boot0);
2485
2486 StartLogger(&pi1_logger);
2487
2488 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2489
2490 VLOG(1) << "Reboot now!";
2491
2492 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2493 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2494 pi1_boot0);
2495 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2496 pi2_boot1);
2497 pi1_logger.AppendAllFilenames(&filenames);
2498 }
2499
2500 std::sort(filenames.begin(), filenames.end());
2501
2502 // Confirm that our new oldest timestamps properly update as we reboot and
2503 // rotate.
2504 size_t timestamp_file_count = 0;
2505 for (const std::string &file : filenames) {
2506 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2507 ReadHeader(file);
2508 CHECK(log_header);
2509
2510 if (log_header->message().has_configuration()) {
2511 continue;
2512 }
2513
2514 const monotonic_clock::time_point monotonic_start_time =
2515 monotonic_clock::time_point(
2516 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2517 const UUID source_node_boot_uuid = UUID::FromString(
2518 log_header->message().source_node_boot_uuid()->string_view());
2519
2520 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2521 ASSERT_EQ(
2522 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2523 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2524 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2525 2u);
2526 ASSERT_TRUE(log_header->message()
2527 .has_oldest_remote_unreliable_monotonic_timestamps());
2528 ASSERT_EQ(log_header->message()
2529 .oldest_remote_unreliable_monotonic_timestamps()
2530 ->size(),
2531 2u);
2532 ASSERT_TRUE(log_header->message()
2533 .has_oldest_local_unreliable_monotonic_timestamps());
2534 ASSERT_EQ(log_header->message()
2535 .oldest_local_unreliable_monotonic_timestamps()
2536 ->size(),
2537 2u);
2538 ASSERT_TRUE(log_header->message()
2539 .has_oldest_remote_reliable_monotonic_timestamps());
2540 ASSERT_EQ(log_header->message()
2541 .oldest_remote_reliable_monotonic_timestamps()
2542 ->size(),
2543 2u);
2544 ASSERT_TRUE(
2545 log_header->message().has_oldest_local_reliable_monotonic_timestamps());
2546 ASSERT_EQ(log_header->message()
2547 .oldest_local_reliable_monotonic_timestamps()
2548 ->size(),
2549 2u);
2550
2551 ASSERT_TRUE(
2552 log_header->message()
2553 .has_oldest_logger_remote_unreliable_monotonic_timestamps());
2554 ASSERT_EQ(log_header->message()
2555 .oldest_logger_remote_unreliable_monotonic_timestamps()
2556 ->size(),
2557 2u);
2558 ASSERT_TRUE(log_header->message()
2559 .has_oldest_logger_local_unreliable_monotonic_timestamps());
2560 ASSERT_EQ(log_header->message()
2561 .oldest_logger_local_unreliable_monotonic_timestamps()
2562 ->size(),
2563 2u);
2564
2565 if (log_header->message().node()->name()->string_view() != "pi1") {
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002566 ASSERT_TRUE(file.find("timestamps/remote_pi2") != std::string::npos);
Naman Guptaa63aa132023-03-22 20:06:34 -07002567
2568 const std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
2569 ReadNthMessage(file, 0);
2570 CHECK(msg);
2571
2572 EXPECT_TRUE(msg->message().has_monotonic_sent_time());
2573 EXPECT_TRUE(msg->message().has_monotonic_remote_time());
2574
2575 const monotonic_clock::time_point
2576 expected_oldest_local_monotonic_timestamps(
2577 chrono::nanoseconds(msg->message().monotonic_sent_time()));
2578 const monotonic_clock::time_point
2579 expected_oldest_remote_monotonic_timestamps(
2580 chrono::nanoseconds(msg->message().monotonic_remote_time()));
2581 const monotonic_clock::time_point
2582 expected_oldest_timestamp_monotonic_timestamps(
2583 chrono::nanoseconds(msg->message().monotonic_timestamp_time()));
2584
2585 EXPECT_NE(expected_oldest_local_monotonic_timestamps,
2586 monotonic_clock::min_time);
2587 EXPECT_NE(expected_oldest_remote_monotonic_timestamps,
2588 monotonic_clock::min_time);
2589 EXPECT_NE(expected_oldest_timestamp_monotonic_timestamps,
2590 monotonic_clock::min_time);
2591
2592 ++timestamp_file_count;
2593 // Since the log file is from the perspective of the other node,
2594 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2595 monotonic_clock::time_point(chrono::nanoseconds(
2596 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2597 0)));
2598 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2599 monotonic_clock::time_point(chrono::nanoseconds(
2600 log_header->message().oldest_local_monotonic_timestamps()->Get(
2601 0)));
2602 const monotonic_clock::time_point
2603 oldest_remote_unreliable_monotonic_timestamps =
2604 monotonic_clock::time_point(chrono::nanoseconds(
2605 log_header->message()
2606 .oldest_remote_unreliable_monotonic_timestamps()
2607 ->Get(0)));
2608 const monotonic_clock::time_point
2609 oldest_local_unreliable_monotonic_timestamps =
2610 monotonic_clock::time_point(chrono::nanoseconds(
2611 log_header->message()
2612 .oldest_local_unreliable_monotonic_timestamps()
2613 ->Get(0)));
2614 const monotonic_clock::time_point
2615 oldest_remote_reliable_monotonic_timestamps =
2616 monotonic_clock::time_point(chrono::nanoseconds(
2617 log_header->message()
2618 .oldest_remote_reliable_monotonic_timestamps()
2619 ->Get(0)));
2620 const monotonic_clock::time_point
2621 oldest_local_reliable_monotonic_timestamps =
2622 monotonic_clock::time_point(chrono::nanoseconds(
2623 log_header->message()
2624 .oldest_local_reliable_monotonic_timestamps()
2625 ->Get(0)));
2626 const monotonic_clock::time_point
2627 oldest_logger_remote_unreliable_monotonic_timestamps =
2628 monotonic_clock::time_point(chrono::nanoseconds(
2629 log_header->message()
2630 .oldest_logger_remote_unreliable_monotonic_timestamps()
2631 ->Get(1)));
2632 const monotonic_clock::time_point
2633 oldest_logger_local_unreliable_monotonic_timestamps =
2634 monotonic_clock::time_point(chrono::nanoseconds(
2635 log_header->message()
2636 .oldest_logger_local_unreliable_monotonic_timestamps()
2637 ->Get(1)));
2638
2639 const Channel *channel =
2640 event_loop_factory_.configuration()->channels()->Get(
2641 msg->message().channel_index());
2642 const Connection *connection = configuration::ConnectionToNode(
2643 channel, configuration::GetNode(
2644 event_loop_factory_.configuration(),
2645 log_header->message().node()->name()->string_view()));
2646
2647 const bool reliable = connection->time_to_live() == 0;
2648
2649 SCOPED_TRACE(file);
2650 SCOPED_TRACE(aos::FlatbufferToJson(
2651 *log_header, {.multi_line = true, .max_vector_size = 100}));
2652
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002653 // Confirm that the oldest timestamps match what we expect. Based on
2654 // what we are doing, we know that the oldest time is the first
2655 // message's time.
2656 //
2657 // This makes the test robust to both the split and combined config
2658 // tests.
2659 switch (log_header->message().parts_index()) {
2660 case 0:
2661 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2662 expected_oldest_remote_monotonic_timestamps);
2663 EXPECT_EQ(oldest_local_monotonic_timestamps,
2664 expected_oldest_local_monotonic_timestamps);
2665 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2666 expected_oldest_local_monotonic_timestamps)
2667 << file;
2668 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2669 expected_oldest_timestamp_monotonic_timestamps)
2670 << file;
2671
2672 if (reliable) {
2673 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
Naman Guptaa63aa132023-03-22 20:06:34 -07002674 expected_oldest_remote_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002675 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
Naman Guptaa63aa132023-03-22 20:06:34 -07002676 expected_oldest_local_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002677 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2678 monotonic_clock::max_time);
2679 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2680 monotonic_clock::max_time);
2681 } else {
2682 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2683 monotonic_clock::max_time);
2684 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2685 monotonic_clock::max_time);
Naman Guptaa63aa132023-03-22 20:06:34 -07002686 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2687 expected_oldest_remote_monotonic_timestamps);
2688 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2689 expected_oldest_local_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002690 }
2691 break;
2692 case 1:
2693 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2694 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2695 EXPECT_EQ(oldest_local_monotonic_timestamps,
2696 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2697 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2698 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2699 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2700 monotonic_clock::epoch() + chrono::nanoseconds(90250000));
2701 if (reliable) {
2702 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2703 expected_oldest_remote_monotonic_timestamps);
2704 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2705 expected_oldest_local_monotonic_timestamps);
2706 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2707 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2708 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2709 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2710 } else {
2711 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2712 monotonic_clock::max_time);
2713 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2714 monotonic_clock::max_time);
2715 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2716 expected_oldest_remote_monotonic_timestamps);
2717 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2718 expected_oldest_local_monotonic_timestamps);
2719 }
2720 break;
2721 case 2:
2722 EXPECT_EQ(
2723 oldest_remote_monotonic_timestamps,
2724 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2725 EXPECT_EQ(oldest_local_monotonic_timestamps,
2726 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2727 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2728 expected_oldest_local_monotonic_timestamps)
2729 << file;
2730 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2731 expected_oldest_timestamp_monotonic_timestamps)
2732 << file;
2733 if (reliable) {
2734 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2735 expected_oldest_remote_monotonic_timestamps);
2736 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2737 expected_oldest_local_monotonic_timestamps);
2738 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2739 monotonic_clock::max_time);
2740 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2741 monotonic_clock::max_time);
2742 } else {
2743 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2744 monotonic_clock::max_time);
2745 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2746 monotonic_clock::max_time);
2747 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2748 expected_oldest_remote_monotonic_timestamps);
2749 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2750 expected_oldest_local_monotonic_timestamps);
2751 }
2752 break;
Naman Guptaa63aa132023-03-22 20:06:34 -07002753
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002754 case 3:
2755 EXPECT_EQ(
2756 oldest_remote_monotonic_timestamps,
2757 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2758 EXPECT_EQ(oldest_local_monotonic_timestamps,
2759 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2760 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2761 expected_oldest_remote_monotonic_timestamps);
2762 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2763 expected_oldest_local_monotonic_timestamps);
2764 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2765 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2766 EXPECT_EQ(
2767 oldest_logger_local_unreliable_monotonic_timestamps,
2768 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2769 break;
2770 default:
2771 FAIL();
2772 break;
Naman Guptaa63aa132023-03-22 20:06:34 -07002773 }
2774
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002775 switch (log_header->message().parts_index()) {
2776 case 0:
2777 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2778 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2779 break;
2780 case 1:
2781 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2782 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2783 break;
2784 case 2:
2785 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2786 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2787 break;
2788 case 3:
2789 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2790 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2791 break;
2792 [[fallthrough]];
2793 default:
2794 FAIL();
2795 break;
2796 }
Naman Guptaa63aa132023-03-22 20:06:34 -07002797 continue;
2798 }
2799 EXPECT_EQ(
2800 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2801 monotonic_clock::max_time.time_since_epoch().count());
2802 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2803 monotonic_clock::max_time.time_since_epoch().count());
2804 EXPECT_EQ(log_header->message()
2805 .oldest_remote_unreliable_monotonic_timestamps()
2806 ->Get(0),
2807 monotonic_clock::max_time.time_since_epoch().count());
2808 EXPECT_EQ(log_header->message()
2809 .oldest_local_unreliable_monotonic_timestamps()
2810 ->Get(0),
2811 monotonic_clock::max_time.time_since_epoch().count());
2812
2813 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2814 monotonic_clock::time_point(chrono::nanoseconds(
2815 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2816 1)));
2817 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2818 monotonic_clock::time_point(chrono::nanoseconds(
2819 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2820 const monotonic_clock::time_point
2821 oldest_remote_unreliable_monotonic_timestamps =
2822 monotonic_clock::time_point(chrono::nanoseconds(
2823 log_header->message()
2824 .oldest_remote_unreliable_monotonic_timestamps()
2825 ->Get(1)));
2826 const monotonic_clock::time_point
2827 oldest_local_unreliable_monotonic_timestamps =
2828 monotonic_clock::time_point(chrono::nanoseconds(
2829 log_header->message()
2830 .oldest_local_unreliable_monotonic_timestamps()
2831 ->Get(1)));
2832 switch (log_header->message().parts_index()) {
2833 case 0:
2834 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2835 monotonic_clock::max_time);
2836 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2837 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2838 monotonic_clock::max_time);
2839 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2840 monotonic_clock::max_time);
2841 break;
2842 default:
2843 FAIL();
2844 break;
2845 }
2846 }
2847
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002848 EXPECT_EQ(timestamp_file_count, 4u);
Naman Guptaa63aa132023-03-22 20:06:34 -07002849
2850 // Confirm that we can actually sort the resulting log and read it.
2851 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07002852 auto sorted_parts = SortParts(filenames);
2853 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
2854 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07002855
2856 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2857 log_reader_factory.set_send_delay(chrono::microseconds(0));
2858
2859 // This sends out the fetched messages and advances time to the start of
2860 // the log file.
2861 reader.Register(&log_reader_factory);
2862
2863 log_reader_factory.Run();
2864
2865 reader.Deregister();
2866 }
2867}
2868
2869// Tests that we properly handle one direction of message_bridge being
2870// unavailable.
2871TEST_P(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07002872 std::vector<std::string> actual_filenames;
2873
Naman Guptaa63aa132023-03-22 20:06:34 -07002874 pi1_->Disconnect(pi2_->node());
2875 time_converter_.AddMonotonic(
2876 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2877
2878 time_converter_.AddMonotonic(
2879 {chrono::milliseconds(10000),
2880 chrono::milliseconds(10000) - chrono::milliseconds(1)});
2881 {
2882 LoggerState pi1_logger = MakeLogger(pi1_);
2883
2884 event_loop_factory_.RunFor(chrono::milliseconds(95));
2885
2886 StartLogger(&pi1_logger);
2887
2888 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002889 pi1_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07002890 }
2891
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002892 // Confirm that we can parse the result. LogReader has enough internal
2893 // CHECKs to confirm the right thing happened.
Austin Schuh8fb4b452023-08-04 17:02:27 -07002894 ConfirmReadable(actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07002895}
2896
2897// Tests that we properly handle one direction of message_bridge being
2898// unavailable.
2899TEST_P(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
2900 pi1_->Disconnect(pi2_->node());
2901 time_converter_.AddMonotonic(
2902 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(500)});
2903
2904 time_converter_.AddMonotonic(
2905 {chrono::milliseconds(10000),
2906 chrono::milliseconds(10000) + chrono::milliseconds(1)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07002907
2908 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -07002909 {
2910 LoggerState pi1_logger = MakeLogger(pi1_);
2911
2912 event_loop_factory_.RunFor(chrono::milliseconds(95));
2913
2914 StartLogger(&pi1_logger);
2915
2916 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002917 pi1_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07002918 }
2919
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002920 // Confirm that we can parse the result. LogReader has enough internal
2921 // CHECKs to confirm the right thing happened.
Austin Schuh8fb4b452023-08-04 17:02:27 -07002922 ConfirmReadable(filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07002923}
2924
2925// Tests that we explode if someone passes in a part file twice with a better
2926// error than an out of order error.
2927TEST_P(MultinodeLoggerTest, DuplicateLogFiles) {
2928 time_converter_.AddMonotonic(
2929 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07002930
2931 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -07002932 {
2933 LoggerState pi1_logger = MakeLogger(pi1_);
2934
2935 event_loop_factory_.RunFor(chrono::milliseconds(95));
2936
2937 StartLogger(&pi1_logger);
2938
2939 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002940
2941 pi1_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07002942 }
2943
2944 std::vector<std::string> duplicates;
Austin Schuh8fb4b452023-08-04 17:02:27 -07002945 for (const std::string &f : filenames) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002946 duplicates.emplace_back(f);
2947 duplicates.emplace_back(f);
2948 }
2949 EXPECT_DEATH({ SortParts(duplicates); }, "Found duplicate parts in");
2950}
2951
2952// Tests that we explode if someone loses a part out of the middle of a log.
2953TEST_P(MultinodeLoggerTest, MissingPartsFromMiddle) {
Austin Schuh6ecfe902023-08-04 22:44:37 -07002954 if (file_strategy() == FileStrategy::kCombine) {
2955 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
2956 }
Naman Guptaa63aa132023-03-22 20:06:34 -07002957 time_converter_.AddMonotonic(
2958 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2959 {
2960 LoggerState pi1_logger = MakeLogger(pi1_);
2961
2962 event_loop_factory_.RunFor(chrono::milliseconds(95));
2963
2964 StartLogger(&pi1_logger);
2965 aos::monotonic_clock::time_point last_rotation_time =
2966 pi1_logger.event_loop->monotonic_now();
Austin Schuh2f864452023-07-17 14:53:08 -07002967 pi1_logger.logger->set_on_logged_period(
2968 [&](aos::monotonic_clock::time_point) {
2969 const auto now = pi1_logger.event_loop->monotonic_now();
2970 if (now > last_rotation_time + std::chrono::seconds(5)) {
2971 pi1_logger.logger->Rotate();
2972 last_rotation_time = now;
2973 }
2974 });
Naman Guptaa63aa132023-03-22 20:06:34 -07002975
2976 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2977 }
2978
2979 std::vector<std::string> missing_parts;
2980
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002981 missing_parts.emplace_back(logfile_base1_ + "_pi1_timestamps.part0" +
2982 Extension());
2983 missing_parts.emplace_back(logfile_base1_ + "_pi1_timestamps.part2" +
2984 Extension());
Naman Guptaa63aa132023-03-22 20:06:34 -07002985 missing_parts.emplace_back(absl::StrCat(
2986 logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
2987
2988 EXPECT_DEATH({ SortParts(missing_parts); },
2989 "Broken log, missing part files between");
2990}
2991
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002992// Tests that we properly handle a dead node. Do this by just disconnecting
2993// it and only using one nodes of logs.
Naman Guptaa63aa132023-03-22 20:06:34 -07002994TEST_P(MultinodeLoggerTest, DeadNode) {
2995 pi1_->Disconnect(pi2_->node());
2996 pi2_->Disconnect(pi1_->node());
2997 time_converter_.AddMonotonic(
2998 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2999 {
3000 LoggerState pi1_logger = MakeLogger(pi1_);
3001
3002 event_loop_factory_.RunFor(chrono::milliseconds(95));
3003
3004 StartLogger(&pi1_logger);
3005
3006 event_loop_factory_.RunFor(chrono::milliseconds(10000));
3007 }
3008
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003009 // Confirm that we can parse the result. LogReader has enough internal
3010 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07003011 ConfirmReadable(MakePi1DeadNodeLogfiles());
3012}
3013
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003014// Tests that we can relog with a different config. This makes most sense
3015// when you are trying to edit a log and want to use channel renaming + the
3016// original config in the new log.
Naman Guptaa63aa132023-03-22 20:06:34 -07003017TEST_P(MultinodeLoggerTest, LogDifferentConfig) {
3018 time_converter_.StartEqual();
3019 {
3020 LoggerState pi1_logger = MakeLogger(pi1_);
3021 LoggerState pi2_logger = MakeLogger(pi2_);
3022
3023 event_loop_factory_.RunFor(chrono::milliseconds(95));
3024
3025 StartLogger(&pi1_logger);
3026 StartLogger(&pi2_logger);
3027
3028 event_loop_factory_.RunFor(chrono::milliseconds(20000));
3029 }
3030
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003031 auto sorted_parts = SortParts(logfiles_);
3032 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3033 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07003034 reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
3035
3036 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
3037 log_reader_factory.set_send_delay(chrono::microseconds(0));
3038
3039 // This sends out the fetched messages and advances time to the start of the
3040 // log file.
3041 reader.Register(&log_reader_factory);
3042
3043 const Node *pi1 =
3044 configuration::GetNode(log_reader_factory.configuration(), "pi1");
3045 const Node *pi2 =
3046 configuration::GetNode(log_reader_factory.configuration(), "pi2");
3047
3048 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
3049 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
3050 LOG(INFO) << "now pi1 "
3051 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
3052 LOG(INFO) << "now pi2 "
3053 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
3054
3055 EXPECT_THAT(reader.LoggedNodes(),
3056 ::testing::ElementsAre(
3057 configuration::GetNode(reader.logged_configuration(), pi1),
3058 configuration::GetNode(reader.logged_configuration(), pi2)));
3059
3060 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
3061
3062 // And confirm we can re-create a log again, while checking the contents.
3063 std::vector<std::string> log_files;
3064 {
3065 LoggerState pi1_logger =
3066 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
3067 &log_reader_factory, reader.logged_configuration());
3068 LoggerState pi2_logger =
3069 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
3070 &log_reader_factory, reader.logged_configuration());
3071
Austin Schuh7e417682023-08-11 17:05:30 -07003072 pi1_logger.StartLogger(tmp_dir_ + "/logs/relogged1");
3073 pi2_logger.StartLogger(tmp_dir_ + "/logs/relogged2");
Naman Guptaa63aa132023-03-22 20:06:34 -07003074
3075 log_reader_factory.Run();
3076
3077 for (auto &x : pi1_logger.log_namer->all_filenames()) {
Austin Schuh7e417682023-08-11 17:05:30 -07003078 log_files.emplace_back(absl::StrCat(tmp_dir_, "/logs/relogged1_", x));
Naman Guptaa63aa132023-03-22 20:06:34 -07003079 }
3080 for (auto &x : pi2_logger.log_namer->all_filenames()) {
Austin Schuh7e417682023-08-11 17:05:30 -07003081 log_files.emplace_back(absl::StrCat(tmp_dir_, "/logs/relogged2_", x));
Naman Guptaa63aa132023-03-22 20:06:34 -07003082 }
3083 }
3084
3085 reader.Deregister();
3086
3087 // And verify that we can run the LogReader over the relogged files without
3088 // hitting any fatal errors.
3089 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003090 auto sorted_parts = SortParts(log_files);
3091 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3092 LogReader relogged_reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07003093 relogged_reader.Register();
3094
3095 relogged_reader.event_loop_factory()->Run();
3096 }
3097}
3098
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003099// Tests that we properly replay a log where the start time for a node is
3100// before any data on the node. This can happen if the logger starts before
3101// data is published. While the scenario below is a bit convoluted, we have
3102// seen logs like this generated out in the wild.
Naman Guptaa63aa132023-03-22 20:06:34 -07003103TEST(MultinodeRebootLoggerTest, StartTimeBeforeData) {
Austin Schuh7e417682023-08-11 17:05:30 -07003104 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3105 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3106
Naman Guptaa63aa132023-03-22 20:06:34 -07003107 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3108 aos::configuration::ReadConfig(ArtifactPath(
3109 "aos/events/logging/multinode_pingpong_split3_config.json"));
3110 message_bridge::TestingTimeConverter time_converter(
3111 configuration::NodesCount(&config.message()));
3112 SimulatedEventLoopFactory event_loop_factory(&config.message());
3113 event_loop_factory.SetTimeConverter(&time_converter);
3114 NodeEventLoopFactory *const pi1 =
3115 event_loop_factory.GetNodeEventLoopFactory("pi1");
3116 const size_t pi1_index = configuration::GetNodeIndex(
3117 event_loop_factory.configuration(), pi1->node());
3118 NodeEventLoopFactory *const pi2 =
3119 event_loop_factory.GetNodeEventLoopFactory("pi2");
3120 const size_t pi2_index = configuration::GetNodeIndex(
3121 event_loop_factory.configuration(), pi2->node());
3122 NodeEventLoopFactory *const pi3 =
3123 event_loop_factory.GetNodeEventLoopFactory("pi3");
3124 const size_t pi3_index = configuration::GetNodeIndex(
3125 event_loop_factory.configuration(), pi3->node());
3126
3127 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003128 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003129 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003130 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003131 const std::string kLogfile2_2 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003132 aos::testing::TestTmpDir() + "/logs/multi_logfile2.2/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003133 const std::string kLogfile3_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003134 aos::testing::TestTmpDir() + "/logs/multi_logfile3/";
3135
Naman Guptaa63aa132023-03-22 20:06:34 -07003136 const UUID pi1_boot0 = UUID::Random();
3137 const UUID pi2_boot0 = UUID::Random();
3138 const UUID pi2_boot1 = UUID::Random();
3139 const UUID pi3_boot0 = UUID::Random();
3140 {
3141 CHECK_EQ(pi1_index, 0u);
3142 CHECK_EQ(pi2_index, 1u);
3143 CHECK_EQ(pi3_index, 2u);
3144
3145 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3146 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3147 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3148 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3149
3150 time_converter.AddNextTimestamp(
3151 distributed_clock::epoch(),
3152 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3153 BootTimestamp::epoch()});
3154 const chrono::nanoseconds reboot_time = chrono::milliseconds(20000);
3155 time_converter.AddNextTimestamp(
3156 distributed_clock::epoch() + reboot_time,
3157 {BootTimestamp::epoch() + reboot_time,
3158 BootTimestamp{
3159 .boot = 1,
3160 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)},
3161 BootTimestamp::epoch() + reboot_time});
3162 }
3163
3164 // Make everything perfectly quiet.
3165 event_loop_factory.SkipTimingReport();
3166 event_loop_factory.DisableStatistics();
3167
3168 std::vector<std::string> filenames;
3169 {
3170 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003171 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3172 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003173 LoggerState pi3_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003174 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3175 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003176 {
3177 // And now start the logger.
3178 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003179 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3180 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003181
3182 event_loop_factory.RunFor(chrono::milliseconds(1000));
3183
3184 pi1_logger.StartLogger(kLogfile1_1);
3185 pi3_logger.StartLogger(kLogfile3_1);
3186 pi2_logger.StartLogger(kLogfile2_1);
3187
3188 event_loop_factory.RunFor(chrono::milliseconds(10000));
3189
3190 // Now that we've got a start time in the past, turn on data.
3191 event_loop_factory.EnableStatistics();
3192 std::unique_ptr<aos::EventLoop> ping_event_loop =
3193 pi1->MakeEventLoop("ping");
3194 Ping ping(ping_event_loop.get());
3195
3196 pi2->AlwaysStart<Pong>("pong");
3197
3198 event_loop_factory.RunFor(chrono::milliseconds(3000));
3199
3200 pi2_logger.AppendAllFilenames(&filenames);
3201
3202 // Stop logging on pi2 before rebooting and completely shut off all
3203 // messages on pi2.
3204 pi2->DisableStatistics();
3205 pi1->Disconnect(pi2->node());
3206 pi2->Disconnect(pi1->node());
3207 }
3208 event_loop_factory.RunFor(chrono::milliseconds(7000));
3209 // pi2 now reboots.
3210 {
3211 event_loop_factory.RunFor(chrono::milliseconds(1000));
3212
3213 // Start logging again on pi2 after it is up.
3214 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003215 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3216 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003217 pi2_logger.StartLogger(kLogfile2_2);
3218
3219 event_loop_factory.RunFor(chrono::milliseconds(10000));
3220 // And, now that we have a start time in the log, turn data back on.
3221 pi2->EnableStatistics();
3222 pi1->Connect(pi2->node());
3223 pi2->Connect(pi1->node());
3224
3225 pi2->AlwaysStart<Pong>("pong");
3226 std::unique_ptr<aos::EventLoop> ping_event_loop =
3227 pi1->MakeEventLoop("ping");
3228 Ping ping(ping_event_loop.get());
3229
3230 event_loop_factory.RunFor(chrono::milliseconds(3000));
3231
3232 pi2_logger.AppendAllFilenames(&filenames);
3233 }
3234
3235 pi1_logger.AppendAllFilenames(&filenames);
3236 pi3_logger.AppendAllFilenames(&filenames);
3237 }
3238
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003239 // Confirm that we can parse the result. LogReader has enough internal
3240 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07003241 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003242 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003243 auto result = ConfirmReadable(filenames);
3244 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
3245 chrono::seconds(1)));
3246 EXPECT_THAT(result[0].second,
3247 ::testing::ElementsAre(realtime_clock::epoch() +
3248 chrono::microseconds(34990350)));
3249
3250 EXPECT_THAT(result[1].first,
3251 ::testing::ElementsAre(
3252 realtime_clock::epoch() + chrono::seconds(1),
3253 realtime_clock::epoch() + chrono::microseconds(3323000)));
3254 EXPECT_THAT(result[1].second,
3255 ::testing::ElementsAre(
3256 realtime_clock::epoch() + chrono::microseconds(13990200),
3257 realtime_clock::epoch() + chrono::microseconds(16313200)));
3258
3259 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch() +
3260 chrono::seconds(1)));
3261 EXPECT_THAT(result[2].second,
3262 ::testing::ElementsAre(realtime_clock::epoch() +
3263 chrono::microseconds(34900150)));
3264}
3265
3266// Tests that local data before remote data after reboot is properly replayed.
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003267// We only trigger a reboot in the timestamp interpolation function when
3268// solving the timestamp problem when we actually have a point in the
3269// function. This originally only happened when a point passes the noncausal
3270// filter. At the start of time for the second boot, if we aren't careful, we
3271// will have messages which need to be published at times before the boot.
3272// This happens when a local message is in the log before a forwarded message,
3273// so there is no point in the interpolation function. This delays the
3274// reboot. So, we need to recreate that situation and make sure it doesn't
3275// come back.
Naman Guptaa63aa132023-03-22 20:06:34 -07003276TEST(MultinodeRebootLoggerTest,
3277 LocalMessageBeforeRemoteBeforeStartAfterReboot) {
Austin Schuh7e417682023-08-11 17:05:30 -07003278 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3279 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3280
Naman Guptaa63aa132023-03-22 20:06:34 -07003281 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3282 aos::configuration::ReadConfig(ArtifactPath(
3283 "aos/events/logging/multinode_pingpong_split3_config.json"));
3284 message_bridge::TestingTimeConverter time_converter(
3285 configuration::NodesCount(&config.message()));
3286 SimulatedEventLoopFactory event_loop_factory(&config.message());
3287 event_loop_factory.SetTimeConverter(&time_converter);
3288 NodeEventLoopFactory *const pi1 =
3289 event_loop_factory.GetNodeEventLoopFactory("pi1");
3290 const size_t pi1_index = configuration::GetNodeIndex(
3291 event_loop_factory.configuration(), pi1->node());
3292 NodeEventLoopFactory *const pi2 =
3293 event_loop_factory.GetNodeEventLoopFactory("pi2");
3294 const size_t pi2_index = configuration::GetNodeIndex(
3295 event_loop_factory.configuration(), pi2->node());
3296 NodeEventLoopFactory *const pi3 =
3297 event_loop_factory.GetNodeEventLoopFactory("pi3");
3298 const size_t pi3_index = configuration::GetNodeIndex(
3299 event_loop_factory.configuration(), pi3->node());
3300
3301 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003302 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003303 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003304 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003305 const std::string kLogfile2_2 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003306 aos::testing::TestTmpDir() + "/logs/multi_logfile2.2/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003307 const std::string kLogfile3_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003308 aos::testing::TestTmpDir() + "/logs/multi_logfile3/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003309 const UUID pi1_boot0 = UUID::Random();
3310 const UUID pi2_boot0 = UUID::Random();
3311 const UUID pi2_boot1 = UUID::Random();
3312 const UUID pi3_boot0 = UUID::Random();
3313 {
3314 CHECK_EQ(pi1_index, 0u);
3315 CHECK_EQ(pi2_index, 1u);
3316 CHECK_EQ(pi3_index, 2u);
3317
3318 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3319 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3320 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3321 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3322
3323 time_converter.AddNextTimestamp(
3324 distributed_clock::epoch(),
3325 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3326 BootTimestamp::epoch()});
3327 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3328 time_converter.AddNextTimestamp(
3329 distributed_clock::epoch() + reboot_time,
3330 {BootTimestamp::epoch() + reboot_time,
3331 BootTimestamp{.boot = 1,
3332 .time = monotonic_clock::epoch() + reboot_time +
3333 chrono::seconds(100)},
3334 BootTimestamp::epoch() + reboot_time});
3335 }
3336
3337 std::vector<std::string> filenames;
3338 {
3339 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003340 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3341 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003342 LoggerState pi3_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003343 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3344 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003345 {
3346 // And now start the logger.
3347 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003348 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3349 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003350
3351 pi1_logger.StartLogger(kLogfile1_1);
3352 pi3_logger.StartLogger(kLogfile3_1);
3353 pi2_logger.StartLogger(kLogfile2_1);
3354
3355 event_loop_factory.RunFor(chrono::milliseconds(1005));
3356
3357 // Now that we've got a start time in the past, turn on data.
3358 std::unique_ptr<aos::EventLoop> ping_event_loop =
3359 pi1->MakeEventLoop("ping");
3360 Ping ping(ping_event_loop.get());
3361
3362 pi2->AlwaysStart<Pong>("pong");
3363
3364 event_loop_factory.RunFor(chrono::milliseconds(3000));
3365
3366 pi2_logger.AppendAllFilenames(&filenames);
3367
3368 // Disable any remote messages on pi2.
3369 pi1->Disconnect(pi2->node());
3370 pi2->Disconnect(pi1->node());
3371 }
3372 event_loop_factory.RunFor(chrono::milliseconds(995));
3373 // pi2 now reboots at 5 seconds.
3374 {
3375 event_loop_factory.RunFor(chrono::milliseconds(1000));
3376
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003377 // Make local stuff happen before we start logging and connect the
3378 // remote.
Naman Guptaa63aa132023-03-22 20:06:34 -07003379 pi2->AlwaysStart<Pong>("pong");
3380 std::unique_ptr<aos::EventLoop> ping_event_loop =
3381 pi1->MakeEventLoop("ping");
3382 Ping ping(ping_event_loop.get());
3383 event_loop_factory.RunFor(chrono::milliseconds(1005));
3384
3385 // Start logging again on pi2 after it is up.
3386 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003387 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3388 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003389 pi2_logger.StartLogger(kLogfile2_2);
3390
3391 // And allow remote messages now that we have some local ones.
3392 pi1->Connect(pi2->node());
3393 pi2->Connect(pi1->node());
3394
3395 event_loop_factory.RunFor(chrono::milliseconds(1000));
3396
3397 event_loop_factory.RunFor(chrono::milliseconds(3000));
3398
3399 pi2_logger.AppendAllFilenames(&filenames);
3400 }
3401
3402 pi1_logger.AppendAllFilenames(&filenames);
3403 pi3_logger.AppendAllFilenames(&filenames);
3404 }
3405
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003406 // Confirm that we can parse the result. LogReader has enough internal
3407 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07003408 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003409 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003410 auto result = ConfirmReadable(filenames);
3411
3412 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3413 EXPECT_THAT(result[0].second,
3414 ::testing::ElementsAre(realtime_clock::epoch() +
3415 chrono::microseconds(11000350)));
3416
3417 EXPECT_THAT(result[1].first,
3418 ::testing::ElementsAre(
3419 realtime_clock::epoch(),
3420 realtime_clock::epoch() + chrono::microseconds(107005000)));
3421 EXPECT_THAT(result[1].second,
3422 ::testing::ElementsAre(
3423 realtime_clock::epoch() + chrono::microseconds(4000150),
3424 realtime_clock::epoch() + chrono::microseconds(111000200)));
3425
3426 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3427 EXPECT_THAT(result[2].second,
3428 ::testing::ElementsAre(realtime_clock::epoch() +
3429 chrono::microseconds(11000150)));
3430
3431 auto start_stop_result = ConfirmReadable(
3432 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3433 realtime_clock::epoch() + chrono::milliseconds(3000));
3434
3435 EXPECT_THAT(
3436 start_stop_result[0].first,
3437 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3438 EXPECT_THAT(
3439 start_stop_result[0].second,
3440 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3441 EXPECT_THAT(
3442 start_stop_result[1].first,
3443 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3444 EXPECT_THAT(
3445 start_stop_result[1].second,
3446 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3447 EXPECT_THAT(
3448 start_stop_result[2].first,
3449 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3450 EXPECT_THAT(
3451 start_stop_result[2].second,
3452 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3453}
3454
3455// Tests that setting the start and stop flags across a reboot works as
3456// expected.
3457TEST(MultinodeRebootLoggerTest, RebootStartStopTimes) {
Austin Schuh7e417682023-08-11 17:05:30 -07003458 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3459 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3460
Naman Guptaa63aa132023-03-22 20:06:34 -07003461 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3462 aos::configuration::ReadConfig(ArtifactPath(
3463 "aos/events/logging/multinode_pingpong_split3_config.json"));
3464 message_bridge::TestingTimeConverter time_converter(
3465 configuration::NodesCount(&config.message()));
3466 SimulatedEventLoopFactory event_loop_factory(&config.message());
3467 event_loop_factory.SetTimeConverter(&time_converter);
3468 NodeEventLoopFactory *const pi1 =
3469 event_loop_factory.GetNodeEventLoopFactory("pi1");
3470 const size_t pi1_index = configuration::GetNodeIndex(
3471 event_loop_factory.configuration(), pi1->node());
3472 NodeEventLoopFactory *const pi2 =
3473 event_loop_factory.GetNodeEventLoopFactory("pi2");
3474 const size_t pi2_index = configuration::GetNodeIndex(
3475 event_loop_factory.configuration(), pi2->node());
3476 NodeEventLoopFactory *const pi3 =
3477 event_loop_factory.GetNodeEventLoopFactory("pi3");
3478 const size_t pi3_index = configuration::GetNodeIndex(
3479 event_loop_factory.configuration(), pi3->node());
3480
3481 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003482 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003483 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003484 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003485 const std::string kLogfile2_2 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003486 aos::testing::TestTmpDir() + "/logs/multi_logfile2.2/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003487 const std::string kLogfile3_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003488 aos::testing::TestTmpDir() + "/logs/multi_logfile3/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003489 {
3490 CHECK_EQ(pi1_index, 0u);
3491 CHECK_EQ(pi2_index, 1u);
3492 CHECK_EQ(pi3_index, 2u);
3493
3494 time_converter.AddNextTimestamp(
3495 distributed_clock::epoch(),
3496 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3497 BootTimestamp::epoch()});
3498 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3499 time_converter.AddNextTimestamp(
3500 distributed_clock::epoch() + reboot_time,
3501 {BootTimestamp::epoch() + reboot_time,
3502 BootTimestamp{.boot = 1,
3503 .time = monotonic_clock::epoch() + reboot_time},
3504 BootTimestamp::epoch() + reboot_time});
3505 }
3506
3507 std::vector<std::string> filenames;
3508 {
3509 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003510 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3511 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003512 LoggerState pi3_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003513 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3514 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003515 {
3516 // And now start the logger.
3517 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003518 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3519 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003520
3521 pi1_logger.StartLogger(kLogfile1_1);
3522 pi3_logger.StartLogger(kLogfile3_1);
3523 pi2_logger.StartLogger(kLogfile2_1);
3524
3525 event_loop_factory.RunFor(chrono::milliseconds(1005));
3526
3527 // Now that we've got a start time in the past, turn on data.
3528 std::unique_ptr<aos::EventLoop> ping_event_loop =
3529 pi1->MakeEventLoop("ping");
3530 Ping ping(ping_event_loop.get());
3531
3532 pi2->AlwaysStart<Pong>("pong");
3533
3534 event_loop_factory.RunFor(chrono::milliseconds(3000));
3535
3536 pi2_logger.AppendAllFilenames(&filenames);
3537 }
3538 event_loop_factory.RunFor(chrono::milliseconds(995));
3539 // pi2 now reboots at 5 seconds.
3540 {
3541 event_loop_factory.RunFor(chrono::milliseconds(1000));
3542
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003543 // Make local stuff happen before we start logging and connect the
3544 // remote.
Naman Guptaa63aa132023-03-22 20:06:34 -07003545 pi2->AlwaysStart<Pong>("pong");
3546 std::unique_ptr<aos::EventLoop> ping_event_loop =
3547 pi1->MakeEventLoop("ping");
3548 Ping ping(ping_event_loop.get());
3549 event_loop_factory.RunFor(chrono::milliseconds(5));
3550
3551 // Start logging again on pi2 after it is up.
3552 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003553 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3554 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003555 pi2_logger.StartLogger(kLogfile2_2);
3556
3557 event_loop_factory.RunFor(chrono::milliseconds(5000));
3558
3559 pi2_logger.AppendAllFilenames(&filenames);
3560 }
3561
3562 pi1_logger.AppendAllFilenames(&filenames);
3563 pi3_logger.AppendAllFilenames(&filenames);
3564 }
3565
3566 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003567 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003568 auto result = ConfirmReadable(filenames);
3569
3570 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3571 EXPECT_THAT(result[0].second,
3572 ::testing::ElementsAre(realtime_clock::epoch() +
3573 chrono::microseconds(11000350)));
3574
3575 EXPECT_THAT(result[1].first,
3576 ::testing::ElementsAre(
3577 realtime_clock::epoch(),
3578 realtime_clock::epoch() + chrono::microseconds(6005000)));
3579 EXPECT_THAT(result[1].second,
3580 ::testing::ElementsAre(
3581 realtime_clock::epoch() + chrono::microseconds(4900150),
3582 realtime_clock::epoch() + chrono::microseconds(11000200)));
3583
3584 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3585 EXPECT_THAT(result[2].second,
3586 ::testing::ElementsAre(realtime_clock::epoch() +
3587 chrono::microseconds(11000150)));
3588
3589 // Confirm we observed the correct start and stop times. We should see the
3590 // reboot here.
3591 auto start_stop_result = ConfirmReadable(
3592 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3593 realtime_clock::epoch() + chrono::milliseconds(8000));
3594
3595 EXPECT_THAT(
3596 start_stop_result[0].first,
3597 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3598 EXPECT_THAT(
3599 start_stop_result[0].second,
3600 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3601 EXPECT_THAT(start_stop_result[1].first,
3602 ::testing::ElementsAre(
3603 realtime_clock::epoch() + chrono::seconds(2),
3604 realtime_clock::epoch() + chrono::microseconds(6005000)));
3605 EXPECT_THAT(start_stop_result[1].second,
3606 ::testing::ElementsAre(
3607 realtime_clock::epoch() + chrono::microseconds(4900150),
3608 realtime_clock::epoch() + chrono::seconds(8)));
3609 EXPECT_THAT(
3610 start_stop_result[2].first,
3611 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3612 EXPECT_THAT(
3613 start_stop_result[2].second,
3614 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3615}
3616
3617// Tests that we properly handle one direction being down.
3618TEST(MissingDirectionTest, OneDirection) {
Austin Schuh7e417682023-08-11 17:05:30 -07003619 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3620 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3621
Naman Guptaa63aa132023-03-22 20:06:34 -07003622 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3623 aos::configuration::ReadConfig(ArtifactPath(
3624 "aos/events/logging/multinode_pingpong_split4_config.json"));
3625 message_bridge::TestingTimeConverter time_converter(
3626 configuration::NodesCount(&config.message()));
3627 SimulatedEventLoopFactory event_loop_factory(&config.message());
3628 event_loop_factory.SetTimeConverter(&time_converter);
3629
3630 NodeEventLoopFactory *const pi1 =
3631 event_loop_factory.GetNodeEventLoopFactory("pi1");
3632 const size_t pi1_index = configuration::GetNodeIndex(
3633 event_loop_factory.configuration(), pi1->node());
3634 NodeEventLoopFactory *const pi2 =
3635 event_loop_factory.GetNodeEventLoopFactory("pi2");
3636 const size_t pi2_index = configuration::GetNodeIndex(
3637 event_loop_factory.configuration(), pi2->node());
3638 std::vector<std::string> filenames;
3639
3640 {
3641 CHECK_EQ(pi1_index, 0u);
3642 CHECK_EQ(pi2_index, 1u);
3643
3644 time_converter.AddNextTimestamp(
3645 distributed_clock::epoch(),
3646 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3647
3648 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3649 time_converter.AddNextTimestamp(
3650 distributed_clock::epoch() + reboot_time,
3651 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3652 BootTimestamp::epoch() + reboot_time});
3653 }
3654
3655 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003656 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003657 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003658 aos::testing::TestTmpDir() + "/logs/multi_logfile1.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003659
3660 pi2->Disconnect(pi1->node());
3661
3662 pi1->AlwaysStart<Ping>("ping");
3663 pi2->AlwaysStart<Pong>("pong");
3664
3665 {
3666 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003667 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3668 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003669
3670 event_loop_factory.RunFor(chrono::milliseconds(95));
3671
3672 pi2_logger.StartLogger(kLogfile2_1);
3673
3674 event_loop_factory.RunFor(chrono::milliseconds(6000));
3675
3676 pi2->Connect(pi1->node());
3677
3678 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003679 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3680 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003681 pi1_logger.StartLogger(kLogfile1_1);
3682
3683 event_loop_factory.RunFor(chrono::milliseconds(5000));
3684 pi1_logger.AppendAllFilenames(&filenames);
3685 pi2_logger.AppendAllFilenames(&filenames);
3686 }
3687
3688 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003689 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003690 ConfirmReadable(filenames);
3691}
3692
3693// Tests that we properly handle only one direction ever existing after a
3694// reboot.
3695TEST(MissingDirectionTest, OneDirectionAfterReboot) {
Austin Schuh7e417682023-08-11 17:05:30 -07003696 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3697 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3698
Naman Guptaa63aa132023-03-22 20:06:34 -07003699 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3700 aos::configuration::ReadConfig(ArtifactPath(
3701 "aos/events/logging/multinode_pingpong_split4_config.json"));
3702 message_bridge::TestingTimeConverter time_converter(
3703 configuration::NodesCount(&config.message()));
3704 SimulatedEventLoopFactory event_loop_factory(&config.message());
3705 event_loop_factory.SetTimeConverter(&time_converter);
3706
3707 NodeEventLoopFactory *const pi1 =
3708 event_loop_factory.GetNodeEventLoopFactory("pi1");
3709 const size_t pi1_index = configuration::GetNodeIndex(
3710 event_loop_factory.configuration(), pi1->node());
3711 NodeEventLoopFactory *const pi2 =
3712 event_loop_factory.GetNodeEventLoopFactory("pi2");
3713 const size_t pi2_index = configuration::GetNodeIndex(
3714 event_loop_factory.configuration(), pi2->node());
3715 std::vector<std::string> filenames;
3716
3717 {
3718 CHECK_EQ(pi1_index, 0u);
3719 CHECK_EQ(pi2_index, 1u);
3720
3721 time_converter.AddNextTimestamp(
3722 distributed_clock::epoch(),
3723 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3724
3725 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3726 time_converter.AddNextTimestamp(
3727 distributed_clock::epoch() + reboot_time,
3728 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3729 BootTimestamp::epoch() + reboot_time});
3730 }
3731
3732 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003733 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003734
3735 pi1->AlwaysStart<Ping>("ping");
3736
3737 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3738 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3739 // second boot.
3740 {
3741 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003742 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3743 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003744
3745 event_loop_factory.RunFor(chrono::milliseconds(95));
3746
3747 pi2_logger.StartLogger(kLogfile2_1);
3748
3749 event_loop_factory.RunFor(chrono::milliseconds(4000));
3750
3751 pi2->Disconnect(pi1->node());
3752
3753 event_loop_factory.RunFor(chrono::milliseconds(1000));
3754 pi1->AlwaysStart<Ping>("ping");
3755
3756 event_loop_factory.RunFor(chrono::milliseconds(5000));
3757 pi2_logger.AppendAllFilenames(&filenames);
3758 }
3759
3760 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003761 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003762 ConfirmReadable(filenames);
3763}
3764
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003765// Tests that we properly handle only one direction ever existing after a
3766// reboot with only reliable data.
Naman Guptaa63aa132023-03-22 20:06:34 -07003767TEST(MissingDirectionTest, OneDirectionAfterRebootReliable) {
Austin Schuh7e417682023-08-11 17:05:30 -07003768 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3769 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3770
Naman Guptaa63aa132023-03-22 20:06:34 -07003771 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003772 aos::configuration::ReadConfig(
3773 ArtifactPath("aos/events/logging/"
3774 "multinode_pingpong_split4_reliable_config.json"));
Naman Guptaa63aa132023-03-22 20:06:34 -07003775 message_bridge::TestingTimeConverter time_converter(
3776 configuration::NodesCount(&config.message()));
3777 SimulatedEventLoopFactory event_loop_factory(&config.message());
3778 event_loop_factory.SetTimeConverter(&time_converter);
3779
3780 NodeEventLoopFactory *const pi1 =
3781 event_loop_factory.GetNodeEventLoopFactory("pi1");
3782 const size_t pi1_index = configuration::GetNodeIndex(
3783 event_loop_factory.configuration(), pi1->node());
3784 NodeEventLoopFactory *const pi2 =
3785 event_loop_factory.GetNodeEventLoopFactory("pi2");
3786 const size_t pi2_index = configuration::GetNodeIndex(
3787 event_loop_factory.configuration(), pi2->node());
3788 std::vector<std::string> filenames;
3789
3790 {
3791 CHECK_EQ(pi1_index, 0u);
3792 CHECK_EQ(pi2_index, 1u);
3793
3794 time_converter.AddNextTimestamp(
3795 distributed_clock::epoch(),
3796 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3797
3798 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3799 time_converter.AddNextTimestamp(
3800 distributed_clock::epoch() + reboot_time,
3801 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3802 BootTimestamp::epoch() + reboot_time});
3803 }
3804
3805 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003806 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003807
3808 pi1->AlwaysStart<Ping>("ping");
3809
3810 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3811 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3812 // second boot.
3813 {
3814 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003815 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3816 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003817
3818 event_loop_factory.RunFor(chrono::milliseconds(95));
3819
3820 pi2_logger.StartLogger(kLogfile2_1);
3821
3822 event_loop_factory.RunFor(chrono::milliseconds(4000));
3823
3824 pi2->Disconnect(pi1->node());
3825
3826 event_loop_factory.RunFor(chrono::milliseconds(1000));
3827 pi1->AlwaysStart<Ping>("ping");
3828
3829 event_loop_factory.RunFor(chrono::milliseconds(5000));
3830 pi2_logger.AppendAllFilenames(&filenames);
3831 }
3832
3833 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003834 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003835 ConfirmReadable(filenames);
3836}
3837
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003838// Tests that we properly handle only one direction ever existing after a
3839// reboot with mixed unreliable vs reliable, where reliable has an earlier
3840// timestamp than unreliable.
Brian Smartte67d7112023-03-20 12:06:30 -07003841TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase1) {
Austin Schuh7e417682023-08-11 17:05:30 -07003842 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3843 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3844
Brian Smartte67d7112023-03-20 12:06:30 -07003845 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3846 aos::configuration::ReadConfig(ArtifactPath(
3847 "aos/events/logging/multinode_pingpong_split4_mixed1_config.json"));
3848 message_bridge::TestingTimeConverter time_converter(
3849 configuration::NodesCount(&config.message()));
3850 SimulatedEventLoopFactory event_loop_factory(&config.message());
3851 event_loop_factory.SetTimeConverter(&time_converter);
3852
3853 NodeEventLoopFactory *const pi1 =
3854 event_loop_factory.GetNodeEventLoopFactory("pi1");
3855 const size_t pi1_index = configuration::GetNodeIndex(
3856 event_loop_factory.configuration(), pi1->node());
3857 NodeEventLoopFactory *const pi2 =
3858 event_loop_factory.GetNodeEventLoopFactory("pi2");
3859 const size_t pi2_index = configuration::GetNodeIndex(
3860 event_loop_factory.configuration(), pi2->node());
3861 std::vector<std::string> filenames;
3862
3863 {
3864 CHECK_EQ(pi1_index, 0u);
3865 CHECK_EQ(pi2_index, 1u);
3866
3867 time_converter.AddNextTimestamp(
3868 distributed_clock::epoch(),
3869 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3870
3871 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3872 time_converter.AddNextTimestamp(
3873 distributed_clock::epoch() + reboot_time,
3874 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3875 BootTimestamp::epoch() + reboot_time});
3876 }
3877
3878 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003879 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Brian Smartte67d7112023-03-20 12:06:30 -07003880
3881 // The following sequence using the above reference config creates
3882 // a reliable message timestamp < unreliable message timestamp.
3883 {
3884 pi1->DisableStatistics();
3885 pi2->DisableStatistics();
3886
3887 event_loop_factory.RunFor(chrono::milliseconds(95));
3888
3889 pi1->AlwaysStart<Ping>("ping");
3890
3891 event_loop_factory.RunFor(chrono::milliseconds(5250));
3892
3893 pi1->EnableStatistics();
3894
3895 event_loop_factory.RunFor(chrono::milliseconds(1000));
3896
3897 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003898 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3899 FileStrategy::kKeepSeparate);
Brian Smartte67d7112023-03-20 12:06:30 -07003900
3901 pi2_logger.StartLogger(kLogfile2_1);
3902
3903 event_loop_factory.RunFor(chrono::milliseconds(5000));
3904 pi2_logger.AppendAllFilenames(&filenames);
3905 }
3906
3907 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003908 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Brian Smartte67d7112023-03-20 12:06:30 -07003909 ConfirmReadable(filenames);
3910}
3911
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003912// Tests that we properly handle only one direction ever existing after a
3913// reboot with mixed unreliable vs reliable, where unreliable has an earlier
3914// timestamp than reliable.
Brian Smartte67d7112023-03-20 12:06:30 -07003915TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase2) {
Austin Schuh7e417682023-08-11 17:05:30 -07003916 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3917 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3918
Brian Smartte67d7112023-03-20 12:06:30 -07003919 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3920 aos::configuration::ReadConfig(ArtifactPath(
3921 "aos/events/logging/multinode_pingpong_split4_mixed2_config.json"));
3922 message_bridge::TestingTimeConverter time_converter(
3923 configuration::NodesCount(&config.message()));
3924 SimulatedEventLoopFactory event_loop_factory(&config.message());
3925 event_loop_factory.SetTimeConverter(&time_converter);
3926
3927 NodeEventLoopFactory *const pi1 =
3928 event_loop_factory.GetNodeEventLoopFactory("pi1");
3929 const size_t pi1_index = configuration::GetNodeIndex(
3930 event_loop_factory.configuration(), pi1->node());
3931 NodeEventLoopFactory *const pi2 =
3932 event_loop_factory.GetNodeEventLoopFactory("pi2");
3933 const size_t pi2_index = configuration::GetNodeIndex(
3934 event_loop_factory.configuration(), pi2->node());
3935 std::vector<std::string> filenames;
3936
3937 {
3938 CHECK_EQ(pi1_index, 0u);
3939 CHECK_EQ(pi2_index, 1u);
3940
3941 time_converter.AddNextTimestamp(
3942 distributed_clock::epoch(),
3943 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3944
3945 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3946 time_converter.AddNextTimestamp(
3947 distributed_clock::epoch() + reboot_time,
3948 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3949 BootTimestamp::epoch() + reboot_time});
3950 }
3951
3952 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003953 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Brian Smartte67d7112023-03-20 12:06:30 -07003954
3955 // The following sequence using the above reference config creates
3956 // an unreliable message timestamp < reliable message timestamp.
3957 {
3958 pi1->DisableStatistics();
3959 pi2->DisableStatistics();
3960
3961 event_loop_factory.RunFor(chrono::milliseconds(95));
3962
3963 pi1->AlwaysStart<Ping>("ping");
3964
3965 event_loop_factory.RunFor(chrono::milliseconds(5250));
3966
3967 pi1->EnableStatistics();
3968
3969 event_loop_factory.RunFor(chrono::milliseconds(1000));
3970
3971 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003972 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3973 FileStrategy::kKeepSeparate);
Brian Smartte67d7112023-03-20 12:06:30 -07003974
3975 pi2_logger.StartLogger(kLogfile2_1);
3976
3977 event_loop_factory.RunFor(chrono::milliseconds(5000));
3978 pi2_logger.AppendAllFilenames(&filenames);
3979 }
3980
3981 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003982 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Brian Smartte67d7112023-03-20 12:06:30 -07003983 ConfirmReadable(filenames);
3984}
3985
Naman Guptaa63aa132023-03-22 20:06:34 -07003986// Tests that we properly handle what used to be a time violation in one
3987// direction. This can occur when one direction goes down after sending some
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003988// data, but the other keeps working. The down direction ends up resolving to
3989// a straight line in the noncausal filter, where the direction which is still
3990// up can cross that line. Really, time progressed along just fine but we
3991// assumed that the offset was a line when it could have deviated by up to
3992// 1ms/second.
Naman Guptaa63aa132023-03-22 20:06:34 -07003993TEST_P(MultinodeLoggerTest, OneDirectionTimeDrift) {
3994 std::vector<std::string> filenames;
3995
3996 CHECK_EQ(pi1_index_, 0u);
3997 CHECK_EQ(pi2_index_, 1u);
3998
3999 time_converter_.AddNextTimestamp(
4000 distributed_clock::epoch(),
4001 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4002
4003 const chrono::nanoseconds before_disconnect_duration =
4004 time_converter_.AddMonotonic(
4005 {chrono::milliseconds(1000), chrono::milliseconds(1000)});
4006
4007 const chrono::nanoseconds test_duration =
4008 time_converter_.AddMonotonic(
4009 {chrono::milliseconds(1000), chrono::milliseconds(1000)}) +
4010 time_converter_.AddMonotonic(
4011 {chrono::milliseconds(10000),
4012 chrono::milliseconds(10000) - chrono::milliseconds(5)}) +
4013 time_converter_.AddMonotonic(
4014 {chrono::milliseconds(10000),
4015 chrono::milliseconds(10000) + chrono::milliseconds(5)});
4016
4017 const std::string kLogfile =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004018 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004019
4020 {
4021 LoggerState pi2_logger = MakeLogger(pi2_);
4022 pi2_logger.StartLogger(kLogfile);
4023 event_loop_factory_.RunFor(before_disconnect_duration);
4024
4025 pi2_->Disconnect(pi1_->node());
4026
4027 event_loop_factory_.RunFor(test_duration);
4028 pi2_->Connect(pi1_->node());
4029
4030 event_loop_factory_.RunFor(chrono::milliseconds(5000));
4031 pi2_logger.AppendAllFilenames(&filenames);
4032 }
4033
4034 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004035 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004036 ConfirmReadable(filenames);
4037}
4038
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004039// Tests that we can replay a logfile that has timestamps such that at least
4040// one node's epoch is at a positive distributed_clock (and thus will have to
4041// be booted after the other node(s)).
Naman Guptaa63aa132023-03-22 20:06:34 -07004042TEST_P(MultinodeLoggerTest, StartOneNodeBeforeOther) {
4043 std::vector<std::string> filenames;
4044
4045 CHECK_EQ(pi1_index_, 0u);
4046 CHECK_EQ(pi2_index_, 1u);
4047
4048 time_converter_.AddNextTimestamp(
4049 distributed_clock::epoch(),
4050 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4051
4052 const chrono::nanoseconds before_reboot_duration = chrono::milliseconds(1000);
4053 time_converter_.RebootAt(
4054 0, distributed_clock::time_point(before_reboot_duration));
4055
4056 const chrono::nanoseconds test_duration = time_converter_.AddMonotonic(
4057 {chrono::milliseconds(10000), chrono::milliseconds(10000)});
4058
4059 const std::string kLogfile =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004060 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004061
4062 pi2_->Disconnect(pi1_->node());
4063 pi1_->Disconnect(pi2_->node());
4064
4065 {
4066 LoggerState pi2_logger = MakeLogger(pi2_);
4067
4068 pi2_logger.StartLogger(kLogfile);
4069 event_loop_factory_.RunFor(before_reboot_duration);
4070
4071 pi2_->Connect(pi1_->node());
4072 pi1_->Connect(pi2_->node());
4073
4074 event_loop_factory_.RunFor(test_duration);
4075
4076 pi2_logger.AppendAllFilenames(&filenames);
4077 }
4078
4079 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004080 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004081 ConfirmReadable(filenames);
4082
4083 {
4084 LogReader reader(sorted_parts);
4085 SimulatedEventLoopFactory replay_factory(reader.configuration());
4086 reader.RegisterWithoutStarting(&replay_factory);
4087
4088 NodeEventLoopFactory *const replay_node =
4089 reader.event_loop_factory()->GetNodeEventLoopFactory("pi1");
4090
4091 std::unique_ptr<EventLoop> test_event_loop =
4092 replay_node->MakeEventLoop("test_reader");
4093 replay_node->OnStartup([replay_node]() {
4094 // Check that we didn't boot until at least t=0.
4095 CHECK_LE(monotonic_clock::epoch(), replay_node->monotonic_now());
4096 });
4097 test_event_loop->OnRun([&test_event_loop]() {
4098 // Check that we didn't boot until at least t=0.
4099 EXPECT_LE(monotonic_clock::epoch(), test_event_loop->monotonic_now());
4100 });
4101 reader.event_loop_factory()->Run();
4102 reader.Deregister();
4103 }
4104}
4105
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004106// Tests that when we have a loop without all the logs at all points in time,
4107// we can sort it properly.
Naman Guptaa63aa132023-03-22 20:06:34 -07004108TEST(MultinodeLoggerLoopTest, Loop) {
Austin Schuh7e417682023-08-11 17:05:30 -07004109 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4110 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4111
Naman Guptaa63aa132023-03-22 20:06:34 -07004112 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004113 aos::configuration::ReadConfig(
4114 ArtifactPath("aos/events/logging/"
4115 "multinode_pingpong_triangle_split_config.json"));
Naman Guptaa63aa132023-03-22 20:06:34 -07004116 message_bridge::TestingTimeConverter time_converter(
4117 configuration::NodesCount(&config.message()));
4118 SimulatedEventLoopFactory event_loop_factory(&config.message());
4119 event_loop_factory.SetTimeConverter(&time_converter);
4120
4121 NodeEventLoopFactory *const pi1 =
4122 event_loop_factory.GetNodeEventLoopFactory("pi1");
4123 NodeEventLoopFactory *const pi2 =
4124 event_loop_factory.GetNodeEventLoopFactory("pi2");
4125 NodeEventLoopFactory *const pi3 =
4126 event_loop_factory.GetNodeEventLoopFactory("pi3");
4127
4128 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004129 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004130 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004131 aos::testing::TestTmpDir() + "/logs/multi_logfile2/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004132 const std::string kLogfile3_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004133 aos::testing::TestTmpDir() + "/logs/multi_logfile3/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004134
4135 {
4136 // Make pi1 boot before everything else.
4137 time_converter.AddNextTimestamp(
4138 distributed_clock::epoch(),
4139 {BootTimestamp::epoch(),
4140 BootTimestamp::epoch() - chrono::milliseconds(100),
4141 BootTimestamp::epoch() - chrono::milliseconds(300)});
4142 }
4143
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004144 // We want to setup a situation such that 2 of the 3 legs of the loop are
4145 // very confident about time being X, and the third leg is pulling the
4146 // average off to one side.
Naman Guptaa63aa132023-03-22 20:06:34 -07004147 //
4148 // It's easiest to visualize this in timestamp_plotter.
4149
4150 std::vector<std::string> filenames;
4151 {
4152 // Have pi1 send out a reliable message at startup. This sets up a long
4153 // forwarding time message at the start to bias time.
4154 std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
4155 {
4156 aos::Sender<examples::Ping> ping_sender =
4157 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
4158
4159 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
4160 examples::Ping::Builder ping_builder =
4161 builder.MakeBuilder<examples::Ping>();
4162 CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
4163 }
4164
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004165 // Wait a while so there's enough data to let the worst case be rather
4166 // off.
Naman Guptaa63aa132023-03-22 20:06:34 -07004167 event_loop_factory.RunFor(chrono::seconds(1000));
4168
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004169 // Now start a receiving node first. This sets up 2 tight bounds between
4170 // 2 of the nodes.
Naman Guptaa63aa132023-03-22 20:06:34 -07004171 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004172 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4173 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07004174 pi2_logger.StartLogger(kLogfile2_1);
4175
4176 event_loop_factory.RunFor(chrono::seconds(100));
4177
4178 // And now start the third leg.
4179 LoggerState pi3_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004180 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4181 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07004182 pi3_logger.StartLogger(kLogfile3_1);
4183
4184 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004185 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4186 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07004187 pi1_logger.StartLogger(kLogfile1_1);
4188
4189 event_loop_factory.RunFor(chrono::seconds(100));
4190
4191 pi1_logger.AppendAllFilenames(&filenames);
4192 pi2_logger.AppendAllFilenames(&filenames);
4193 pi3_logger.AppendAllFilenames(&filenames);
4194 }
4195
4196 // Make sure we can read this.
4197 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004198 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004199 auto result = ConfirmReadable(filenames);
4200}
4201
Austin Schuh08dba8f2023-05-01 08:29:30 -07004202// Tests that RestartLogging works in the simple case. Unfortunately, the
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004203// failure cases involve simulating time elapsing in callbacks, which is
4204// really hard. The best we can reasonably do is make sure 2 back to back
4205// logs are parseable together.
Austin Schuh08dba8f2023-05-01 08:29:30 -07004206TEST_P(MultinodeLoggerTest, RestartLogging) {
4207 time_converter_.AddMonotonic(
4208 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
4209 std::vector<std::string> filenames;
4210 {
4211 LoggerState pi1_logger = MakeLogger(pi1_);
4212
4213 event_loop_factory_.RunFor(chrono::milliseconds(95));
4214
4215 StartLogger(&pi1_logger, logfile_base1_);
4216 aos::monotonic_clock::time_point last_rotation_time =
4217 pi1_logger.event_loop->monotonic_now();
Austin Schuh2f864452023-07-17 14:53:08 -07004218 pi1_logger.logger->set_on_logged_period(
4219 [&](aos::monotonic_clock::time_point) {
4220 const auto now = pi1_logger.event_loop->monotonic_now();
4221 if (now > last_rotation_time + std::chrono::seconds(5)) {
4222 pi1_logger.AppendAllFilenames(&filenames);
4223 std::unique_ptr<MultiNodeFilesLogNamer> namer =
4224 pi1_logger.MakeLogNamer(logfile_base2_);
4225 pi1_logger.log_namer = namer.get();
Austin Schuh08dba8f2023-05-01 08:29:30 -07004226
Austin Schuh2f864452023-07-17 14:53:08 -07004227 pi1_logger.logger->RestartLogging(std::move(namer));
4228 last_rotation_time = now;
4229 }
4230 });
Austin Schuh08dba8f2023-05-01 08:29:30 -07004231
4232 event_loop_factory_.RunFor(chrono::milliseconds(7000));
4233
4234 pi1_logger.AppendAllFilenames(&filenames);
4235 }
4236
4237 for (const auto &x : filenames) {
4238 LOG(INFO) << x;
4239 }
4240
4241 EXPECT_GE(filenames.size(), 2u);
4242
4243 ConfirmReadable(filenames);
4244
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004245 // TODO(austin): It would be good to confirm that any one time messages end
4246 // up in both logs correctly.
Austin Schuh08dba8f2023-05-01 08:29:30 -07004247}
4248
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004249// Tests that when we have evidence of 2 boots, and then start logging, the
4250// max_out_of_order_duration ends up reasonable on the boot with the start time.
4251TEST(MultinodeLoggerLoopTest, PreviousBootData) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07004252 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4253 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4254
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004255 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4256 aos::configuration::ReadConfig(ArtifactPath(
4257 "aos/events/logging/multinode_pingpong_reboot_ooo_config.json"));
4258 message_bridge::TestingTimeConverter time_converter(
4259 configuration::NodesCount(&config.message()));
4260 SimulatedEventLoopFactory event_loop_factory(&config.message());
4261 event_loop_factory.SetTimeConverter(&time_converter);
4262
4263 const UUID pi1_boot0 = UUID::Random();
4264 const UUID pi2_boot0 = UUID::Random();
4265 const UUID pi2_boot1 = UUID::Random();
4266
4267 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004268 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004269
4270 {
4271 constexpr size_t kPi1Index = 0;
4272 constexpr size_t kPi2Index = 1;
4273 time_converter.set_boot_uuid(kPi1Index, 0, pi1_boot0);
4274 time_converter.set_boot_uuid(kPi2Index, 0, pi2_boot0);
4275 time_converter.set_boot_uuid(kPi2Index, 1, pi2_boot1);
4276
4277 // Make pi1 boot before everything else.
4278 time_converter.AddNextTimestamp(
4279 distributed_clock::epoch(),
4280 {BootTimestamp::epoch(),
4281 BootTimestamp::epoch() - chrono::milliseconds(100)});
4282
4283 const chrono::nanoseconds reboot_time = chrono::seconds(1005);
4284 time_converter.AddNextTimestamp(
4285 distributed_clock::epoch() + reboot_time,
4286 {BootTimestamp::epoch() + reboot_time,
4287 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()}});
4288 }
4289
4290 NodeEventLoopFactory *const pi1 =
4291 event_loop_factory.GetNodeEventLoopFactory("pi1");
4292 NodeEventLoopFactory *const pi2 =
4293 event_loop_factory.GetNodeEventLoopFactory("pi2");
4294
4295 // What we want is for pi2 to send a message at t=1000 on the first channel
4296 // (/atest1 pong), and t=1 on the second channel (/atest3 pong). That'll make
4297 // the max out of order duration be large.
4298 //
4299 // Then, we reboot, and only send messages on a third channel (/atest2 pong).
4300 // The order is key, they need to sort in this order in the config.
4301
4302 std::vector<std::string> filenames;
4303 {
4304 {
4305 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4306 aos::Sender<examples::Pong> pong_sender =
4307 pi2_event_loop->MakeSender<examples::Pong>("/atest3");
4308
4309 pi2_event_loop->OnRun([&]() {
4310 aos::Sender<examples::Pong>::Builder builder =
4311 pong_sender.MakeBuilder();
4312 examples::Pong::Builder pong_builder =
4313 builder.MakeBuilder<examples::Pong>();
4314 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4315 });
4316
4317 event_loop_factory.RunFor(chrono::seconds(1000));
4318 }
4319
4320 {
4321 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4322 aos::Sender<examples::Pong> pong_sender =
4323 pi2_event_loop->MakeSender<examples::Pong>("/atest1");
4324
4325 aos::Sender<examples::Pong>::Builder builder = pong_sender.MakeBuilder();
4326 examples::Pong::Builder pong_builder =
4327 builder.MakeBuilder<examples::Pong>();
4328 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4329 }
4330
4331 event_loop_factory.RunFor(chrono::seconds(10));
4332
4333 // Now start a receiving node first. This sets up 2 tight bounds between
4334 // 2 of the nodes.
4335 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004336 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4337 FileStrategy::kKeepSeparate);
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004338 pi1_logger.StartLogger(kLogfile1_1);
4339
4340 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4341 aos::Sender<examples::Pong> pong_sender =
4342 pi2_event_loop->MakeSender<examples::Pong>("/atest2");
4343
4344 pi2_event_loop->AddPhasedLoop(
4345 [&pong_sender](int) {
4346 aos::Sender<examples::Pong>::Builder builder =
4347 pong_sender.MakeBuilder();
4348 examples::Pong::Builder pong_builder =
4349 builder.MakeBuilder<examples::Pong>();
4350 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4351 },
4352 chrono::milliseconds(10));
4353
4354 event_loop_factory.RunFor(chrono::seconds(100));
4355
4356 pi1_logger.AppendAllFilenames(&filenames);
4357 }
4358
4359 // Make sure we can read this.
4360 const std::vector<LogFile> sorted_parts = SortParts(filenames);
4361 EXPECT_TRUE(AllRebootPartsMatchOutOfOrderDuration(sorted_parts, "pi2"));
4362 auto result = ConfirmReadable(filenames);
4363}
4364
4365// Tests that when we start without a connection, and then start logging, the
4366// max_out_of_order_duration ends up reasonable.
4367TEST(MultinodeLoggerLoopTest, StartDisconnected) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07004368 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4369 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4370
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004371 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4372 aos::configuration::ReadConfig(ArtifactPath(
4373 "aos/events/logging/multinode_pingpong_reboot_ooo_config.json"));
4374 message_bridge::TestingTimeConverter time_converter(
4375 configuration::NodesCount(&config.message()));
4376 SimulatedEventLoopFactory event_loop_factory(&config.message());
4377 event_loop_factory.SetTimeConverter(&time_converter);
4378
4379 time_converter.StartEqual();
4380
4381 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004382 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004383
4384 NodeEventLoopFactory *const pi1 =
4385 event_loop_factory.GetNodeEventLoopFactory("pi1");
4386 NodeEventLoopFactory *const pi2 =
4387 event_loop_factory.GetNodeEventLoopFactory("pi2");
4388
4389 // What we want is for pi2 to send a message at t=1000 on the first channel
4390 // (/atest1 pong), and t=1 on the second channel (/atest3 pong). That'll make
4391 // the max out of order duration be large.
4392 //
4393 // Then, we disconnect, and only send messages on a third channel
4394 // (/atest2 pong). The order is key, they need to sort in this order in the
4395 // config so we observe them in the order which grows the
4396 // max_out_of_order_duration.
4397
4398 std::vector<std::string> filenames;
4399 {
4400 {
4401 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4402 aos::Sender<examples::Pong> pong_sender =
4403 pi2_event_loop->MakeSender<examples::Pong>("/atest3");
4404
4405 pi2_event_loop->OnRun([&]() {
4406 aos::Sender<examples::Pong>::Builder builder =
4407 pong_sender.MakeBuilder();
4408 examples::Pong::Builder pong_builder =
4409 builder.MakeBuilder<examples::Pong>();
4410 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4411 });
4412
4413 event_loop_factory.RunFor(chrono::seconds(1000));
4414 }
4415
4416 {
4417 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4418 aos::Sender<examples::Pong> pong_sender =
4419 pi2_event_loop->MakeSender<examples::Pong>("/atest1");
4420
4421 aos::Sender<examples::Pong>::Builder builder = pong_sender.MakeBuilder();
4422 examples::Pong::Builder pong_builder =
4423 builder.MakeBuilder<examples::Pong>();
4424 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4425 }
4426
4427 event_loop_factory.RunFor(chrono::seconds(10));
4428
4429 pi1->Disconnect(pi2->node());
4430 pi2->Disconnect(pi1->node());
4431
4432 // Make data flow.
4433 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4434 aos::Sender<examples::Pong> pong_sender =
4435 pi2_event_loop->MakeSender<examples::Pong>("/atest2");
4436
4437 pi2_event_loop->AddPhasedLoop(
4438 [&pong_sender](int) {
4439 aos::Sender<examples::Pong>::Builder builder =
4440 pong_sender.MakeBuilder();
4441 examples::Pong::Builder pong_builder =
4442 builder.MakeBuilder<examples::Pong>();
4443 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4444 },
4445 chrono::milliseconds(10));
4446
4447 event_loop_factory.RunFor(chrono::seconds(10));
4448
4449 // Now start a receiving node first. This sets up 2 tight bounds between
4450 // 2 of the nodes.
4451 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004452 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4453 FileStrategy::kKeepSeparate);
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004454 pi1_logger.StartLogger(kLogfile1_1);
4455
4456 event_loop_factory.RunFor(chrono::seconds(10));
4457
4458 // Now, reconnect, and everything should recover.
4459 pi1->Connect(pi2->node());
4460 pi2->Connect(pi1->node());
4461
4462 event_loop_factory.RunFor(chrono::seconds(10));
4463
4464 pi1_logger.AppendAllFilenames(&filenames);
4465 }
4466
4467 // Make sure we can read this.
4468 const std::vector<LogFile> sorted_parts = SortParts(filenames);
4469 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
4470 auto result = ConfirmReadable(filenames);
4471}
4472
Austin Schuh1124c512023-08-01 15:20:44 -07004473// Class to spam Pong messages blindly.
4474class PongSender {
4475 public:
4476 PongSender(EventLoop *loop, std::string_view channel_name)
4477 : sender_(loop->MakeSender<examples::Pong>(channel_name)) {
4478 loop->AddPhasedLoop(
4479 [this](int) {
4480 aos::Sender<examples::Pong>::Builder builder = sender_.MakeBuilder();
4481 examples::Pong::Builder pong_builder =
4482 builder.MakeBuilder<examples::Pong>();
4483 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4484 },
4485 chrono::milliseconds(10));
4486 }
4487
4488 private:
4489 aos::Sender<examples::Pong> sender_;
4490};
4491
4492// Tests that we log correctly as nodes connect slowly.
4493TEST(MultinodeLoggerLoopTest, StaggeredConnect) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07004494 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4495 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4496
Austin Schuh1124c512023-08-01 15:20:44 -07004497 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4498 aos::configuration::ReadConfig(ArtifactPath(
4499 "aos/events/logging/multinode_pingpong_pi3_pingpong_config.json"));
4500 message_bridge::TestingTimeConverter time_converter(
4501 configuration::NodesCount(&config.message()));
4502 SimulatedEventLoopFactory event_loop_factory(&config.message());
4503 event_loop_factory.SetTimeConverter(&time_converter);
4504
4505 time_converter.StartEqual();
4506
4507 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004508 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Austin Schuh1124c512023-08-01 15:20:44 -07004509
4510 NodeEventLoopFactory *const pi1 =
4511 event_loop_factory.GetNodeEventLoopFactory("pi1");
4512 NodeEventLoopFactory *const pi2 =
4513 event_loop_factory.GetNodeEventLoopFactory("pi2");
4514 NodeEventLoopFactory *const pi3 =
4515 event_loop_factory.GetNodeEventLoopFactory("pi3");
4516
4517 // What we want is for pi2 to send a message at t=1000 on the first channel
4518 // (/atest1 pong), and t=1 on the second channel (/atest3 pong). That'll make
4519 // the max out of order duration be large.
4520 //
4521 // Then, we disconnect, and only send messages on a third channel
4522 // (/atest2 pong). The order is key, they need to sort in this order in the
4523 // config so we observe them in the order which grows the
4524 // max_out_of_order_duration.
4525
4526 pi1->Disconnect(pi2->node());
4527 pi2->Disconnect(pi1->node());
4528
4529 pi1->Disconnect(pi3->node());
4530 pi3->Disconnect(pi1->node());
4531
4532 std::vector<std::string> filenames;
4533 pi2->AlwaysStart<PongSender>("pongsender", "/test2");
4534 pi3->AlwaysStart<PongSender>("pongsender", "/test3");
4535
4536 event_loop_factory.RunFor(chrono::seconds(10));
4537
4538 {
4539 // Now start a receiving node first. This sets up 2 tight bounds between
4540 // 2 of the nodes.
4541 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004542 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4543 FileStrategy::kKeepSeparate);
Austin Schuh1124c512023-08-01 15:20:44 -07004544 pi1_logger.StartLogger(kLogfile1_1);
4545
4546 event_loop_factory.RunFor(chrono::seconds(10));
4547
4548 // Now, reconnect, and everything should recover.
4549 pi1->Connect(pi2->node());
4550 pi2->Connect(pi1->node());
4551
4552 event_loop_factory.RunFor(chrono::seconds(10));
4553
4554 pi1->Connect(pi3->node());
4555 pi3->Connect(pi1->node());
4556
4557 event_loop_factory.RunFor(chrono::seconds(10));
4558
4559 pi1_logger.AppendAllFilenames(&filenames);
4560 }
4561
4562 // Make sure we can read this.
4563 const std::vector<LogFile> sorted_parts = SortParts(filenames);
4564 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
4565 auto result = ConfirmReadable(filenames);
4566}
4567
Naman Guptaa63aa132023-03-22 20:06:34 -07004568} // namespace testing
4569} // namespace logger
4570} // namespace aos