blob: 3c438e35af64072291c44c556ab1ad9935cb7a47 [file] [log] [blame]
Brian Smartte67d7112023-03-20 12:06:30 -07001#include <algorithm>
2
Philipp Schrader790cb542023-07-05 21:06:52 -07003#include "gmock/gmock.h"
4#include "gtest/gtest.h"
5
Naman Guptaa63aa132023-03-22 20:06:34 -07006#include "aos/events/logging/log_reader.h"
7#include "aos/events/logging/multinode_logger_test_lib.h"
8#include "aos/events/message_counter.h"
9#include "aos/events/ping_lib.h"
10#include "aos/events/pong_lib.h"
11#include "aos/network/remote_message_generated.h"
12#include "aos/network/timestamp_generated.h"
13#include "aos/testing/tmpdir.h"
Naman Guptaa63aa132023-03-22 20:06:34 -070014
Stephan Pleinesf63bde82024-01-13 15:59:33 -080015namespace aos::logger::testing {
Naman Guptaa63aa132023-03-22 20:06:34 -070016
17namespace chrono = std::chrono;
18using aos::message_bridge::RemoteMessage;
19using aos::testing::ArtifactPath;
20using aos::testing::MessageCounter;
21
Naman Guptaa63aa132023-03-22 20:06:34 -070022INSTANTIATE_TEST_SUITE_P(
23 All, MultinodeLoggerTest,
24 ::testing::Combine(
25 ::testing::Values(
26 ConfigParams{"multinode_pingpong_combined_config.json", true,
Austin Schuh6ecfe902023-08-04 22:44:37 -070027 kCombinedConfigSha1(), kCombinedConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070028 FileStrategy::kKeepSeparate,
29 ForceTimestampBuffering::kForceBufferTimestamps},
30 ConfigParams{"multinode_pingpong_combined_config.json", true,
31 kCombinedConfigSha1(), kCombinedConfigSha1(),
32 FileStrategy::kKeepSeparate,
33 ForceTimestampBuffering::kAutoBuffer},
Naman Guptaa63aa132023-03-22 20:06:34 -070034 ConfigParams{"multinode_pingpong_split_config.json", false,
Austin Schuh6ecfe902023-08-04 22:44:37 -070035 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070036 FileStrategy::kKeepSeparate,
37 ForceTimestampBuffering::kForceBufferTimestamps},
Austin Schuh6ecfe902023-08-04 22:44:37 -070038 ConfigParams{"multinode_pingpong_split_config.json", false,
39 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070040 FileStrategy::kKeepSeparate,
41 ForceTimestampBuffering::kAutoBuffer},
42 ConfigParams{"multinode_pingpong_split_config.json", false,
43 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
44 FileStrategy::kCombine,
45 ForceTimestampBuffering::kForceBufferTimestamps},
46 ConfigParams{"multinode_pingpong_split_config.json", false,
47 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
48 FileStrategy::kCombine,
49 ForceTimestampBuffering::kAutoBuffer}),
Naman Guptaa63aa132023-03-22 20:06:34 -070050 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
51
52INSTANTIATE_TEST_SUITE_P(
53 All, MultinodeLoggerDeathTest,
54 ::testing::Combine(
55 ::testing::Values(
56 ConfigParams{"multinode_pingpong_combined_config.json", true,
Austin Schuh6ecfe902023-08-04 22:44:37 -070057 kCombinedConfigSha1(), kCombinedConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070058 FileStrategy::kKeepSeparate,
59 ForceTimestampBuffering::kForceBufferTimestamps},
60 ConfigParams{"multinode_pingpong_combined_config.json", true,
61 kCombinedConfigSha1(), kCombinedConfigSha1(),
62 FileStrategy::kKeepSeparate,
63 ForceTimestampBuffering::kAutoBuffer},
Naman Guptaa63aa132023-03-22 20:06:34 -070064 ConfigParams{"multinode_pingpong_split_config.json", false,
Austin Schuh6ecfe902023-08-04 22:44:37 -070065 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070066 FileStrategy::kKeepSeparate,
67 ForceTimestampBuffering::kForceBufferTimestamps},
Austin Schuh6ecfe902023-08-04 22:44:37 -070068 ConfigParams{"multinode_pingpong_split_config.json", false,
69 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070070 FileStrategy::kKeepSeparate,
71 ForceTimestampBuffering::kAutoBuffer},
72 ConfigParams{"multinode_pingpong_split_config.json", false,
73 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
74 FileStrategy::kCombine,
75 ForceTimestampBuffering::kForceBufferTimestamps},
76 ConfigParams{"multinode_pingpong_split_config.json", false,
77 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
78 FileStrategy::kCombine,
79 ForceTimestampBuffering::kAutoBuffer}),
Naman Guptaa63aa132023-03-22 20:06:34 -070080 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
81
82// Tests that we can write and read simple multi-node log files.
83TEST_P(MultinodeLoggerTest, SimpleMultiNode) {
Austin Schuh6ecfe902023-08-04 22:44:37 -070084 if (file_strategy() == FileStrategy::kCombine) {
85 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
86 }
87
Naman Guptaa63aa132023-03-22 20:06:34 -070088 std::vector<std::string> actual_filenames;
89 time_converter_.StartEqual();
90
91 {
92 LoggerState pi1_logger = MakeLogger(pi1_);
93 LoggerState pi2_logger = MakeLogger(pi2_);
94
95 event_loop_factory_.RunFor(chrono::milliseconds(95));
96
97 StartLogger(&pi1_logger);
98 StartLogger(&pi2_logger);
99
100 event_loop_factory_.RunFor(chrono::milliseconds(20000));
101 pi1_logger.AppendAllFilenames(&actual_filenames);
102 pi2_logger.AppendAllFilenames(&actual_filenames);
103 }
104
105 ASSERT_THAT(actual_filenames,
106 ::testing::UnorderedElementsAreArray(logfiles_));
107
108 {
109 std::set<std::string> logfile_uuids;
110 std::set<std::string> parts_uuids;
111 // Confirm that we have the expected number of UUIDs for both the logfile
112 // UUIDs and parts UUIDs.
113 std::vector<SizePrefixedFlatbufferVector<LogFileHeader>> log_header;
114 for (std::string_view f : logfiles_) {
115 log_header.emplace_back(ReadHeader(f).value());
116 if (!log_header.back().message().has_configuration()) {
117 logfile_uuids.insert(
118 log_header.back().message().log_event_uuid()->str());
119 parts_uuids.insert(log_header.back().message().parts_uuid()->str());
120 }
121 }
122
123 EXPECT_EQ(logfile_uuids.size(), 2u);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700124 EXPECT_EQ(parts_uuids.size(), 8u);
Naman Guptaa63aa132023-03-22 20:06:34 -0700125
126 // And confirm everything is on the correct node.
127 EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi1");
128 EXPECT_EQ(log_header[3].message().node()->name()->string_view(), "pi1");
129 EXPECT_EQ(log_header[4].message().node()->name()->string_view(), "pi1");
130
131 EXPECT_EQ(log_header[5].message().node()->name()->string_view(), "pi2");
132 EXPECT_EQ(log_header[6].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700133 EXPECT_EQ(log_header[7].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700134
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700135 EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi1");
136 EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi1");
Naman Guptaa63aa132023-03-22 20:06:34 -0700137
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700138 EXPECT_EQ(log_header[10].message().node()->name()->string_view(), "pi2");
139 EXPECT_EQ(log_header[11].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700140
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700141 EXPECT_EQ(log_header[12].message().node()->name()->string_view(), "pi2");
142 EXPECT_EQ(log_header[13].message().node()->name()->string_view(), "pi2");
143 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700144
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700145 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi1");
146 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi1");
Naman Guptaa63aa132023-03-22 20:06:34 -0700147
148 // And the parts index matches.
149 EXPECT_EQ(log_header[2].message().parts_index(), 0);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700150
151 EXPECT_EQ(log_header[3].message().parts_index(), 0);
152 EXPECT_EQ(log_header[4].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700153
154 EXPECT_EQ(log_header[5].message().parts_index(), 0);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700155
156 EXPECT_EQ(log_header[6].message().parts_index(), 0);
157 EXPECT_EQ(log_header[7].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700158
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700159 EXPECT_EQ(log_header[8].message().parts_index(), 0);
160 EXPECT_EQ(log_header[9].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700161
162 EXPECT_EQ(log_header[10].message().parts_index(), 0);
163 EXPECT_EQ(log_header[11].message().parts_index(), 1);
164
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700165 EXPECT_EQ(log_header[12].message().parts_index(), 0);
166 EXPECT_EQ(log_header[13].message().parts_index(), 1);
167 EXPECT_EQ(log_header[14].message().parts_index(), 2);
Naman Guptaa63aa132023-03-22 20:06:34 -0700168
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700169 EXPECT_EQ(log_header[15].message().parts_index(), 0);
170 EXPECT_EQ(log_header[16].message().parts_index(), 1);
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700171
172 // And that the data_stored field is right.
173 EXPECT_THAT(*log_header[2].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700174 ::testing::ElementsAre(StoredDataType::DATA));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700175 EXPECT_THAT(*log_header[3].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700176 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700177 EXPECT_THAT(*log_header[4].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700178 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700179
180 EXPECT_THAT(*log_header[5].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700181 ::testing::ElementsAre(StoredDataType::DATA));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700182 EXPECT_THAT(*log_header[6].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700183 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700184 EXPECT_THAT(*log_header[7].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700185 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700186
187 EXPECT_THAT(*log_header[8].message().data_stored(),
188 ::testing::ElementsAre(StoredDataType::DATA));
189 EXPECT_THAT(*log_header[9].message().data_stored(),
190 ::testing::ElementsAre(StoredDataType::DATA));
191
192 EXPECT_THAT(*log_header[10].message().data_stored(),
193 ::testing::ElementsAre(StoredDataType::DATA));
194 EXPECT_THAT(*log_header[11].message().data_stored(),
195 ::testing::ElementsAre(StoredDataType::DATA));
196
197 EXPECT_THAT(*log_header[12].message().data_stored(),
198 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
199 EXPECT_THAT(*log_header[13].message().data_stored(),
200 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
201 EXPECT_THAT(*log_header[14].message().data_stored(),
202 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
203
204 EXPECT_THAT(*log_header[15].message().data_stored(),
205 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
206 EXPECT_THAT(*log_header[16].message().data_stored(),
207 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
Naman Guptaa63aa132023-03-22 20:06:34 -0700208 }
209
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700210 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
211 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -0700212 {
213 using ::testing::UnorderedElementsAre;
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700214 std::shared_ptr<const aos::Configuration> config = sorted_parts[0].config;
Naman Guptaa63aa132023-03-22 20:06:34 -0700215
216 // Timing reports, pings
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700217 if (shared()) {
218 EXPECT_THAT(
219 CountChannelsData(config, logfiles_[2]),
220 UnorderedElementsAre(
221 std::make_tuple("/pi1/aos", "aos.examples.Ping", 2001),
222 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
223 200),
224 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
225 21),
226 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
227 std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
228 std::make_tuple("/test", "aos.examples.Ping", 2001)))
229 << " : " << logfiles_[2];
230 } else {
231 EXPECT_THAT(
232 CountChannelsData(config, logfiles_[2]),
233 UnorderedElementsAre(
234 std::make_tuple("/pi1/aos", "aos.examples.Ping", 2001),
235 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
236 200),
237 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
238 21),
239 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
240 std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
241 std::make_tuple("/test", "aos.examples.Ping", 2001),
242 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
243 "aos-message_bridge-Timestamp",
244 "aos.message_bridge.RemoteMessage", 200)))
245 << " : " << logfiles_[2];
Naman Guptaa63aa132023-03-22 20:06:34 -0700246 }
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700247
248 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
249 ::testing::UnorderedElementsAre())
250 << " : " << logfiles_[3];
251 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
252 ::testing::UnorderedElementsAre())
253 << " : " << logfiles_[4];
254
Naman Guptaa63aa132023-03-22 20:06:34 -0700255 // Timestamps for pong
256 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
257 UnorderedElementsAre())
258 << " : " << logfiles_[2];
259 EXPECT_THAT(
260 CountChannelsTimestamp(config, logfiles_[3]),
261 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
262 << " : " << logfiles_[3];
263 EXPECT_THAT(
264 CountChannelsTimestamp(config, logfiles_[4]),
265 UnorderedElementsAre(
266 std::make_tuple("/test", "aos.examples.Pong", 2000),
267 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)))
268 << " : " << logfiles_[4];
269
Naman Guptaa63aa132023-03-22 20:06:34 -0700270 // Timing reports and pongs.
Naman Guptaa63aa132023-03-22 20:06:34 -0700271 EXPECT_THAT(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700272 CountChannelsData(config, logfiles_[5]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700273 UnorderedElementsAre(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700274 std::make_tuple("/pi2/aos", "aos.examples.Ping", 2001),
Naman Guptaa63aa132023-03-22 20:06:34 -0700275 std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
276 200),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700277 std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
278 21),
279 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700280 std::make_tuple("/pi2/aos", "aos.timing.Report", 60),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700281 std::make_tuple("/test", "aos.examples.Pong", 2001)))
282 << " : " << logfiles_[5];
283 EXPECT_THAT(CountChannelsData(config, logfiles_[6]), UnorderedElementsAre())
284 << " : " << logfiles_[6];
285 EXPECT_THAT(CountChannelsData(config, logfiles_[7]), UnorderedElementsAre())
Naman Guptaa63aa132023-03-22 20:06:34 -0700286 << " : " << logfiles_[7];
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700287 // And ping timestamps.
288 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
289 UnorderedElementsAre())
290 << " : " << logfiles_[5];
Naman Guptaa63aa132023-03-22 20:06:34 -0700291 EXPECT_THAT(
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700292 CountChannelsTimestamp(config, logfiles_[6]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700293 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700294 << " : " << logfiles_[6];
Naman Guptaa63aa132023-03-22 20:06:34 -0700295 EXPECT_THAT(
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700296 CountChannelsTimestamp(config, logfiles_[7]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700297 UnorderedElementsAre(
298 std::make_tuple("/test", "aos.examples.Ping", 2000),
299 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700300 << " : " << logfiles_[7];
Naman Guptaa63aa132023-03-22 20:06:34 -0700301
302 // And then test that the remotely logged timestamp data files only have
303 // timestamps in them.
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700304 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[8]),
305 UnorderedElementsAre())
306 << " : " << logfiles_[8];
307 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[9]),
308 UnorderedElementsAre())
309 << " : " << logfiles_[9];
Naman Guptaa63aa132023-03-22 20:06:34 -0700310 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[10]),
311 UnorderedElementsAre())
312 << " : " << logfiles_[10];
313 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[11]),
314 UnorderedElementsAre())
315 << " : " << logfiles_[11];
Naman Guptaa63aa132023-03-22 20:06:34 -0700316
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700317 EXPECT_THAT(CountChannelsData(config, logfiles_[8]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700318 UnorderedElementsAre(std::make_tuple(
319 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700320 << " : " << logfiles_[8];
321 EXPECT_THAT(CountChannelsData(config, logfiles_[9]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700322 UnorderedElementsAre(std::make_tuple(
323 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700324 << " : " << logfiles_[9];
Naman Guptaa63aa132023-03-22 20:06:34 -0700325
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700326 // Pong snd timestamp data.
327 EXPECT_THAT(
328 CountChannelsData(config, logfiles_[10]),
329 UnorderedElementsAre(
330 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 9),
331 std::make_tuple("/test", "aos.examples.Pong", 91)))
332 << " : " << logfiles_[10];
333 EXPECT_THAT(
334 CountChannelsData(config, logfiles_[11]),
335 UnorderedElementsAre(
336 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 191),
337 std::make_tuple("/test", "aos.examples.Pong", 1910)))
338 << " : " << logfiles_[11];
Naman Guptaa63aa132023-03-22 20:06:34 -0700339
340 // Timestamps from pi2 on pi1, and the other way.
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700341 // if (shared()) {
342 EXPECT_THAT(CountChannelsData(config, logfiles_[12]),
343 UnorderedElementsAre())
344 << " : " << logfiles_[12];
345 EXPECT_THAT(CountChannelsData(config, logfiles_[13]),
346 UnorderedElementsAre())
347 << " : " << logfiles_[13];
348 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
349 UnorderedElementsAre())
350 << " : " << logfiles_[14];
351 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
352 UnorderedElementsAre())
353 << " : " << logfiles_[15];
354 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
355 UnorderedElementsAre())
356 << " : " << logfiles_[16];
Naman Guptaa63aa132023-03-22 20:06:34 -0700357
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700358 EXPECT_THAT(
359 CountChannelsTimestamp(config, logfiles_[12]),
360 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
361 << " : " << logfiles_[12];
362 EXPECT_THAT(
363 CountChannelsTimestamp(config, logfiles_[13]),
364 UnorderedElementsAre(
365 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 9),
366 std::make_tuple("/test", "aos.examples.Ping", 90)))
367 << " : " << logfiles_[13];
368 EXPECT_THAT(
369 CountChannelsTimestamp(config, logfiles_[14]),
370 UnorderedElementsAre(
371 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 191),
372 std::make_tuple("/test", "aos.examples.Ping", 1910)))
373 << " : " << logfiles_[14];
374 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[15]),
375 UnorderedElementsAre(std::make_tuple(
376 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
377 << " : " << logfiles_[15];
378 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[16]),
379 UnorderedElementsAre(std::make_tuple(
380 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
381 << " : " << logfiles_[16];
Naman Guptaa63aa132023-03-22 20:06:34 -0700382 }
383
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700384 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -0700385
386 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
387 log_reader_factory.set_send_delay(chrono::microseconds(0));
388
389 // This sends out the fetched messages and advances time to the start of the
390 // log file.
391 reader.Register(&log_reader_factory);
392
393 const Node *pi1 =
394 configuration::GetNode(log_reader_factory.configuration(), "pi1");
395 const Node *pi2 =
396 configuration::GetNode(log_reader_factory.configuration(), "pi2");
397
398 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
399 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
400 LOG(INFO) << "now pi1 "
401 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
402 LOG(INFO) << "now pi2 "
403 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
404
405 EXPECT_THAT(reader.LoggedNodes(),
406 ::testing::ElementsAre(
407 configuration::GetNode(reader.logged_configuration(), pi1),
408 configuration::GetNode(reader.logged_configuration(), pi2)));
409
410 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
411
412 std::unique_ptr<EventLoop> pi1_event_loop =
413 log_reader_factory.MakeEventLoop("test", pi1);
414 std::unique_ptr<EventLoop> pi2_event_loop =
415 log_reader_factory.MakeEventLoop("test", pi2);
416
417 int pi1_ping_count = 10;
418 int pi2_ping_count = 10;
419 int pi1_pong_count = 10;
420 int pi2_pong_count = 10;
421
422 // Confirm that the ping value matches.
423 pi1_event_loop->MakeWatcher(
424 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
425 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping) << " at "
426 << pi1_event_loop->context().monotonic_remote_time << " -> "
427 << pi1_event_loop->context().monotonic_event_time;
428 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
429 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
430 pi1_ping_count * chrono::milliseconds(10) +
431 monotonic_clock::epoch());
432 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
433 pi1_ping_count * chrono::milliseconds(10) +
434 realtime_clock::epoch());
435 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
436 pi1_event_loop->context().monotonic_event_time);
437 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
438 pi1_event_loop->context().realtime_event_time);
439
440 ++pi1_ping_count;
441 });
442 pi2_event_loop->MakeWatcher(
443 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
444 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping) << " at "
445 << pi2_event_loop->context().monotonic_remote_time << " -> "
446 << pi2_event_loop->context().monotonic_event_time;
447 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
448
449 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
450 pi2_ping_count * chrono::milliseconds(10) +
451 monotonic_clock::epoch());
452 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
453 pi2_ping_count * chrono::milliseconds(10) +
454 realtime_clock::epoch());
455 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time +
456 chrono::microseconds(150),
457 pi2_event_loop->context().monotonic_event_time);
458 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time +
459 chrono::microseconds(150),
460 pi2_event_loop->context().realtime_event_time);
461 ++pi2_ping_count;
462 });
463
464 constexpr ssize_t kQueueIndexOffset = -9;
465 // Confirm that the ping and pong counts both match, and the value also
466 // matches.
467 pi1_event_loop->MakeWatcher(
468 "/test", [&pi1_event_loop, &pi1_ping_count,
469 &pi1_pong_count](const examples::Pong &pong) {
470 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
471 << pi1_event_loop->context().monotonic_remote_time << " -> "
472 << pi1_event_loop->context().monotonic_event_time;
473
474 EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
475 pi1_pong_count + kQueueIndexOffset);
476 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
477 chrono::microseconds(200) +
478 pi1_pong_count * chrono::milliseconds(10) +
479 monotonic_clock::epoch());
480 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
481 chrono::microseconds(200) +
482 pi1_pong_count * chrono::milliseconds(10) +
483 realtime_clock::epoch());
484
485 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
486 chrono::microseconds(150),
487 pi1_event_loop->context().monotonic_event_time);
488 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
489 chrono::microseconds(150),
490 pi1_event_loop->context().realtime_event_time);
491
492 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
493 ++pi1_pong_count;
494 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
495 });
496 pi2_event_loop->MakeWatcher(
497 "/test", [&pi2_event_loop, &pi2_ping_count,
498 &pi2_pong_count](const examples::Pong &pong) {
499 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
500 << pi2_event_loop->context().monotonic_remote_time << " -> "
501 << pi2_event_loop->context().monotonic_event_time;
502
503 EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
504 pi2_pong_count + kQueueIndexOffset);
505
506 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
507 chrono::microseconds(200) +
508 pi2_pong_count * chrono::milliseconds(10) +
509 monotonic_clock::epoch());
510 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
511 chrono::microseconds(200) +
512 pi2_pong_count * chrono::milliseconds(10) +
513 realtime_clock::epoch());
514
515 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
516 pi2_event_loop->context().monotonic_event_time);
517 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
518 pi2_event_loop->context().realtime_event_time);
519
520 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
521 ++pi2_pong_count;
522 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
523 });
524
525 log_reader_factory.Run();
526 EXPECT_EQ(pi1_ping_count, 2010);
527 EXPECT_EQ(pi2_ping_count, 2010);
528 EXPECT_EQ(pi1_pong_count, 2010);
529 EXPECT_EQ(pi2_pong_count, 2010);
530
531 reader.Deregister();
532}
533
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600534// MultinodeLoggerTest that tests the mutate callback works across multiple
535// nodes with remapping
536TEST_P(MultinodeLoggerTest, MultiNodeRemapMutateCallback) {
537 time_converter_.StartEqual();
538 std::vector<std::string> actual_filenames;
539
540 {
541 LoggerState pi1_logger = MakeLogger(pi1_);
542 LoggerState pi2_logger = MakeLogger(pi2_);
543
544 event_loop_factory_.RunFor(chrono::milliseconds(95));
545
546 StartLogger(&pi1_logger);
547 StartLogger(&pi2_logger);
548
549 event_loop_factory_.RunFor(chrono::milliseconds(20000));
550 pi1_logger.AppendAllFilenames(&actual_filenames);
551 pi2_logger.AppendAllFilenames(&actual_filenames);
552 }
553
Austin Schuh8fb4b452023-08-04 17:02:27 -0700554 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700555 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600556
557 LogReader reader(sorted_parts, &config_.message());
558 // Remap just on pi1.
559 reader.RemapLoggedChannel<examples::Pong>(
560 "/test", configuration::GetNode(reader.configuration(), "pi1"));
561
562 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
563
564 int pong_count = 0;
565 // Adds a callback which mutates the value of the pong message before the
566 // message is sent which is the feature we are testing here
567 reader.AddBeforeSendCallback("/test",
568 [&pong_count](aos::examples::Pong *pong) {
569 pong->mutate_value(pong->value() + 1);
570 pong_count = pong->value();
571 });
572
573 // This sends out the fetched messages and advances time to the start of the
574 // log file.
575 reader.Register(&log_reader_factory);
576
577 const Node *pi1 =
578 configuration::GetNode(log_reader_factory.configuration(), "pi1");
579 const Node *pi2 =
580 configuration::GetNode(log_reader_factory.configuration(), "pi2");
581
582 EXPECT_THAT(reader.LoggedNodes(),
583 ::testing::ElementsAre(
584 configuration::GetNode(reader.logged_configuration(), pi1),
585 configuration::GetNode(reader.logged_configuration(), pi2)));
586
587 std::unique_ptr<EventLoop> pi1_event_loop =
588 log_reader_factory.MakeEventLoop("test", pi1);
589 std::unique_ptr<EventLoop> pi2_event_loop =
590 log_reader_factory.MakeEventLoop("test", pi2);
591
592 pi1_event_loop->MakeWatcher("/original/test",
593 [&pong_count](const examples::Pong &pong) {
594 EXPECT_EQ(pong_count, pong.value());
595 });
596
597 pi2_event_loop->MakeWatcher("/test",
598 [&pong_count](const examples::Pong &pong) {
599 EXPECT_EQ(pong_count, pong.value());
600 });
601
602 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
603 reader.Deregister();
604
605 EXPECT_EQ(pong_count, 2011);
606}
607
608// MultinodeLoggerTest that tests the mutate callback works across multiple
609// nodes
610TEST_P(MultinodeLoggerTest, MultiNodeMutateCallback) {
611 time_converter_.StartEqual();
612 std::vector<std::string> actual_filenames;
613
614 {
615 LoggerState pi1_logger = MakeLogger(pi1_);
616 LoggerState pi2_logger = MakeLogger(pi2_);
617
618 event_loop_factory_.RunFor(chrono::milliseconds(95));
619
620 StartLogger(&pi1_logger);
621 StartLogger(&pi2_logger);
622
623 event_loop_factory_.RunFor(chrono::milliseconds(20000));
624 pi1_logger.AppendAllFilenames(&actual_filenames);
625 pi2_logger.AppendAllFilenames(&actual_filenames);
626 }
627
Austin Schuh8fb4b452023-08-04 17:02:27 -0700628 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700629 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600630
631 LogReader reader(sorted_parts, &config_.message());
632
633 int pong_count = 0;
634 // Adds a callback which mutates the value of the pong message before the
635 // message is sent which is the feature we are testing here
636 reader.AddBeforeSendCallback("/test",
637 [&pong_count](aos::examples::Pong *pong) {
638 pong->mutate_value(pong->value() + 1);
639 pong_count = pong->value();
640 });
641
642 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
643
644 // This sends out the fetched messages and advances time to the start of the
645 // log file.
646 reader.Register(&log_reader_factory);
647
648 const Node *pi1 =
649 configuration::GetNode(log_reader_factory.configuration(), "pi1");
650 const Node *pi2 =
651 configuration::GetNode(log_reader_factory.configuration(), "pi2");
652
653 EXPECT_THAT(reader.LoggedNodes(),
654 ::testing::ElementsAre(
655 configuration::GetNode(reader.logged_configuration(), pi1),
656 configuration::GetNode(reader.logged_configuration(), pi2)));
657
658 std::unique_ptr<EventLoop> pi1_event_loop =
659 log_reader_factory.MakeEventLoop("test", pi1);
660 std::unique_ptr<EventLoop> pi2_event_loop =
661 log_reader_factory.MakeEventLoop("test", pi2);
662
663 pi1_event_loop->MakeWatcher("/test",
664 [&pong_count](const examples::Pong &pong) {
665 EXPECT_EQ(pong_count, pong.value());
666 });
667
668 pi2_event_loop->MakeWatcher("/test",
669 [&pong_count](const examples::Pong &pong) {
670 EXPECT_EQ(pong_count, pong.value());
671 });
672
673 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
674 reader.Deregister();
675
676 EXPECT_EQ(pong_count, 2011);
677}
678
679// Tests that the before send callback is only called from the sender node if it
680// is forwarded
681TEST_P(MultinodeLoggerTest, OnlyDoBeforeSendCallbackOnSenderNode) {
682 time_converter_.StartEqual();
Austin Schuh8fb4b452023-08-04 17:02:27 -0700683
684 std::vector<std::string> filenames;
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600685 {
686 LoggerState pi1_logger = MakeLogger(pi1_);
687 LoggerState pi2_logger = MakeLogger(pi2_);
688
689 event_loop_factory_.RunFor(chrono::milliseconds(95));
690
691 StartLogger(&pi1_logger);
692 StartLogger(&pi2_logger);
693
694 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -0700695
696 pi1_logger.AppendAllFilenames(&filenames);
697 pi2_logger.AppendAllFilenames(&filenames);
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600698 }
699
Austin Schuh8fb4b452023-08-04 17:02:27 -0700700 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700701 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
702 LogReader reader(sorted_parts);
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600703
704 int ping_count = 0;
705 // Adds a callback which mutates the value of the pong message before the
706 // message is sent which is the feature we are testing here
707 reader.AddBeforeSendCallback("/test",
708 [&ping_count](aos::examples::Ping *ping) {
709 ++ping_count;
710 ping->mutate_value(ping_count);
711 });
712
713 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
714 log_reader_factory.set_send_delay(chrono::microseconds(0));
715
716 reader.Register(&log_reader_factory);
717
718 const Node *pi1 =
719 configuration::GetNode(log_reader_factory.configuration(), "pi1");
720 const Node *pi2 =
721 configuration::GetNode(log_reader_factory.configuration(), "pi2");
722
723 std::unique_ptr<EventLoop> pi1_event_loop =
724 log_reader_factory.MakeEventLoop("test", pi1);
725 pi1_event_loop->SkipTimingReport();
726 std::unique_ptr<EventLoop> pi2_event_loop =
727 log_reader_factory.MakeEventLoop("test", pi2);
728 pi2_event_loop->SkipTimingReport();
729
730 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
731 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
732
733 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
734 pi1_ping_timestamp;
735 if (!shared()) {
736 pi1_ping_timestamp =
737 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
738 pi1_event_loop.get(),
739 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
740 }
741
742 log_reader_factory.Run();
743
744 EXPECT_EQ(pi1_ping.count(), 2000u);
745 EXPECT_EQ(pi2_ping.count(), 2000u);
746 // If the BeforeSendCallback is called on both nodes, then the ping count
747 // would be 4002 instead of 2001
748 EXPECT_EQ(ping_count, 2001u);
749 if (!shared()) {
750 EXPECT_EQ(pi1_ping_timestamp->count(), 2000u);
751 }
752
753 reader.Deregister();
754}
755
756// Tests that we do not allow adding callbacks after Register is called
757TEST_P(MultinodeLoggerDeathTest, AddCallbackAfterRegister) {
758 time_converter_.StartEqual();
759 std::vector<std::string> actual_filenames;
760
761 {
762 LoggerState pi1_logger = MakeLogger(pi1_);
763 LoggerState pi2_logger = MakeLogger(pi2_);
764
765 event_loop_factory_.RunFor(chrono::milliseconds(95));
766
767 StartLogger(&pi1_logger);
768 StartLogger(&pi2_logger);
769
770 event_loop_factory_.RunFor(chrono::milliseconds(20000));
771 pi1_logger.AppendAllFilenames(&actual_filenames);
772 pi2_logger.AppendAllFilenames(&actual_filenames);
773 }
774
Austin Schuh8fb4b452023-08-04 17:02:27 -0700775 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700776 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600777
778 LogReader reader(sorted_parts, &config_.message());
779 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
780 reader.Register(&log_reader_factory);
781 EXPECT_DEATH(
782 {
783 reader.AddBeforeSendCallback("/test", [](aos::examples::Pong *) {
784 LOG(FATAL) << "This should not be called";
785 });
786 },
787 "Cannot add callbacks after calling Register");
788 reader.Deregister();
789}
790
Naman Guptaa63aa132023-03-22 20:06:34 -0700791// Test that if we feed the replay with a mismatched node list that we die on
792// the LogReader constructor.
793TEST_P(MultinodeLoggerDeathTest, MultiNodeBadReplayConfig) {
794 time_converter_.StartEqual();
Austin Schuh8fb4b452023-08-04 17:02:27 -0700795
796 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -0700797 {
798 LoggerState pi1_logger = MakeLogger(pi1_);
799 LoggerState pi2_logger = MakeLogger(pi2_);
800
801 event_loop_factory_.RunFor(chrono::milliseconds(95));
802
803 StartLogger(&pi1_logger);
804 StartLogger(&pi2_logger);
805
806 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -0700807
808 pi1_logger.AppendAllFilenames(&filenames);
809 pi2_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -0700810 }
811
812 // Test that, if we add an additional node to the replay config that the
813 // logger complains about the mismatch in number of nodes.
814 FlatbufferDetachedBuffer<Configuration> extra_nodes_config =
815 configuration::MergeWithConfig(&config_.message(), R"({
816 "nodes": [
817 {
818 "name": "extra-node"
819 }
820 ]
821 }
822 )");
823
Austin Schuh8fb4b452023-08-04 17:02:27 -0700824 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700825 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -0700826 EXPECT_DEATH(LogReader(sorted_parts, &extra_nodes_config.message()),
827 "Log file and replay config need to have matching nodes lists.");
828}
829
830// Tests that we can read log files where they don't start at the same monotonic
831// time.
832TEST_P(MultinodeLoggerTest, StaggeredStart) {
833 time_converter_.StartEqual();
834 std::vector<std::string> actual_filenames;
835
836 {
837 LoggerState pi1_logger = MakeLogger(pi1_);
838 LoggerState pi2_logger = MakeLogger(pi2_);
839
840 event_loop_factory_.RunFor(chrono::milliseconds(95));
841
842 StartLogger(&pi1_logger);
843
844 event_loop_factory_.RunFor(chrono::milliseconds(200));
845
846 StartLogger(&pi2_logger);
847
848 event_loop_factory_.RunFor(chrono::milliseconds(20000));
849 pi1_logger.AppendAllFilenames(&actual_filenames);
850 pi2_logger.AppendAllFilenames(&actual_filenames);
851 }
852
853 // Since we delay starting pi2, it already knows about all the timestamps so
854 // we don't end up with extra parts.
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700855 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
856 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
857 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -0700858
859 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
860 log_reader_factory.set_send_delay(chrono::microseconds(0));
861
862 // This sends out the fetched messages and advances time to the start of the
863 // log file.
864 reader.Register(&log_reader_factory);
865
866 const Node *pi1 =
867 configuration::GetNode(log_reader_factory.configuration(), "pi1");
868 const Node *pi2 =
869 configuration::GetNode(log_reader_factory.configuration(), "pi2");
870
871 EXPECT_THAT(reader.LoggedNodes(),
872 ::testing::ElementsAre(
873 configuration::GetNode(reader.logged_configuration(), pi1),
874 configuration::GetNode(reader.logged_configuration(), pi2)));
875
876 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
877
878 std::unique_ptr<EventLoop> pi1_event_loop =
879 log_reader_factory.MakeEventLoop("test", pi1);
880 std::unique_ptr<EventLoop> pi2_event_loop =
881 log_reader_factory.MakeEventLoop("test", pi2);
882
883 int pi1_ping_count = 30;
884 int pi2_ping_count = 30;
885 int pi1_pong_count = 30;
886 int pi2_pong_count = 30;
887
888 // Confirm that the ping value matches.
889 pi1_event_loop->MakeWatcher(
890 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
891 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
892 << pi1_event_loop->context().monotonic_remote_time << " -> "
893 << pi1_event_loop->context().monotonic_event_time;
894 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
895
896 ++pi1_ping_count;
897 });
898 pi2_event_loop->MakeWatcher(
899 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
900 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
901 << pi2_event_loop->context().monotonic_remote_time << " -> "
902 << pi2_event_loop->context().monotonic_event_time;
903 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
904
905 ++pi2_ping_count;
906 });
907
908 // Confirm that the ping and pong counts both match, and the value also
909 // matches.
910 pi1_event_loop->MakeWatcher(
911 "/test", [&pi1_event_loop, &pi1_ping_count,
912 &pi1_pong_count](const examples::Pong &pong) {
913 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
914 << pi1_event_loop->context().monotonic_remote_time << " -> "
915 << pi1_event_loop->context().monotonic_event_time;
916
917 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
918 ++pi1_pong_count;
919 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
920 });
921 pi2_event_loop->MakeWatcher(
922 "/test", [&pi2_event_loop, &pi2_ping_count,
923 &pi2_pong_count](const examples::Pong &pong) {
924 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
925 << pi2_event_loop->context().monotonic_remote_time << " -> "
926 << pi2_event_loop->context().monotonic_event_time;
927
928 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
929 ++pi2_pong_count;
930 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
931 });
932
933 log_reader_factory.Run();
934 EXPECT_EQ(pi1_ping_count, 2030);
935 EXPECT_EQ(pi2_ping_count, 2030);
936 EXPECT_EQ(pi1_pong_count, 2030);
937 EXPECT_EQ(pi2_pong_count, 2030);
938
939 reader.Deregister();
940}
941
942// Tests that we can read log files where the monotonic clocks drift and don't
943// match correctly. While we are here, also test that different ending times
944// also is readable.
945TEST_P(MultinodeLoggerTest, MismatchedClocks) {
946 // TODO(austin): Negate...
947 const chrono::nanoseconds initial_pi2_offset = chrono::seconds(1000);
948
949 time_converter_.AddMonotonic(
950 {BootTimestamp::epoch(), BootTimestamp::epoch() + initial_pi2_offset});
951 // Wait for 95 ms, (~0.1 seconds - 1/2 of the ping/pong period), and set the
952 // skew to be 200 uS/s
953 const chrono::nanoseconds startup_sleep1 = time_converter_.AddMonotonic(
954 {chrono::milliseconds(95),
955 chrono::milliseconds(95) - chrono::nanoseconds(200) * 95});
956 // Run another 200 ms to have one logger start first.
957 const chrono::nanoseconds startup_sleep2 = time_converter_.AddMonotonic(
958 {chrono::milliseconds(200), chrono::milliseconds(200)});
959 // Slew one way then the other at the same 200 uS/S slew rate. Make sure we
960 // go far enough to cause problems if this isn't accounted for.
961 const chrono::nanoseconds logger_run1 = time_converter_.AddMonotonic(
962 {chrono::milliseconds(20000),
963 chrono::milliseconds(20000) - chrono::nanoseconds(200) * 20000});
964 const chrono::nanoseconds logger_run2 = time_converter_.AddMonotonic(
965 {chrono::milliseconds(40000),
966 chrono::milliseconds(40000) + chrono::nanoseconds(200) * 40000});
967 const chrono::nanoseconds logger_run3 = time_converter_.AddMonotonic(
968 {chrono::milliseconds(400), chrono::milliseconds(400)});
969
Austin Schuh8fb4b452023-08-04 17:02:27 -0700970 std::vector<std::string> actual_filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -0700971 {
972 LoggerState pi2_logger = MakeLogger(pi2_);
973
974 LOG(INFO) << "pi2 times: " << pi2_->monotonic_now() << " "
975 << pi2_->realtime_now() << " distributed "
976 << pi2_->ToDistributedClock(pi2_->monotonic_now());
977
978 LOG(INFO) << "pi2_ times: " << pi2_->monotonic_now() << " "
979 << pi2_->realtime_now() << " distributed "
980 << pi2_->ToDistributedClock(pi2_->monotonic_now());
981
982 event_loop_factory_.RunFor(startup_sleep1);
983
984 StartLogger(&pi2_logger);
985
986 event_loop_factory_.RunFor(startup_sleep2);
987
988 {
989 // Run pi1's logger for only part of the time.
990 LoggerState pi1_logger = MakeLogger(pi1_);
991
992 StartLogger(&pi1_logger);
993 event_loop_factory_.RunFor(logger_run1);
994
995 // Make sure we slewed time far enough so that the difference is greater
996 // than the network delay. This confirms that if we sort incorrectly, it
997 // would show in the results.
998 EXPECT_LT(
999 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
1000 -event_loop_factory_.send_delay() -
1001 event_loop_factory_.network_delay());
1002
1003 event_loop_factory_.RunFor(logger_run2);
1004
1005 // And now check that we went far enough the other way to make sure we
1006 // cover both problems.
1007 EXPECT_GT(
1008 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
1009 event_loop_factory_.send_delay() +
1010 event_loop_factory_.network_delay());
Austin Schuh8fb4b452023-08-04 17:02:27 -07001011
1012 pi1_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001013 }
1014
1015 // And log a bit more on pi2.
1016 event_loop_factory_.RunFor(logger_run3);
Austin Schuh8fb4b452023-08-04 17:02:27 -07001017
1018 pi2_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001019 }
1020
Austin Schuh8fb4b452023-08-04 17:02:27 -07001021 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001022 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1023 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001024
1025 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1026 log_reader_factory.set_send_delay(chrono::microseconds(0));
1027
1028 const Node *pi1 =
1029 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1030 const Node *pi2 =
1031 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1032
1033 // This sends out the fetched messages and advances time to the start of the
1034 // log file.
1035 reader.Register(&log_reader_factory);
1036
1037 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
1038 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
1039 LOG(INFO) << "now pi1 "
1040 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1041 LOG(INFO) << "now pi2 "
1042 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1043
1044 LOG(INFO) << "Done registering (pi1) "
1045 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now()
1046 << " "
1047 << log_reader_factory.GetNodeEventLoopFactory(pi1)->realtime_now();
1048 LOG(INFO) << "Done registering (pi2) "
1049 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now()
1050 << " "
1051 << log_reader_factory.GetNodeEventLoopFactory(pi2)->realtime_now();
1052
1053 EXPECT_THAT(reader.LoggedNodes(),
1054 ::testing::ElementsAre(
1055 configuration::GetNode(reader.logged_configuration(), pi1),
1056 configuration::GetNode(reader.logged_configuration(), pi2)));
1057
1058 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1059
1060 std::unique_ptr<EventLoop> pi1_event_loop =
1061 log_reader_factory.MakeEventLoop("test", pi1);
1062 std::unique_ptr<EventLoop> pi2_event_loop =
1063 log_reader_factory.MakeEventLoop("test", pi2);
1064
1065 int pi1_ping_count = 30;
1066 int pi2_ping_count = 30;
1067 int pi1_pong_count = 30;
1068 int pi2_pong_count = 30;
1069
1070 // Confirm that the ping value matches.
1071 pi1_event_loop->MakeWatcher(
1072 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
1073 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
1074 << pi1_event_loop->context().monotonic_remote_time << " -> "
1075 << pi1_event_loop->context().monotonic_event_time;
1076 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
1077
1078 ++pi1_ping_count;
1079 });
1080 pi2_event_loop->MakeWatcher(
1081 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
1082 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
1083 << pi2_event_loop->context().monotonic_remote_time << " -> "
1084 << pi2_event_loop->context().monotonic_event_time;
1085 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
1086
1087 ++pi2_ping_count;
1088 });
1089
1090 // Confirm that the ping and pong counts both match, and the value also
1091 // matches.
1092 pi1_event_loop->MakeWatcher(
1093 "/test", [&pi1_event_loop, &pi1_ping_count,
1094 &pi1_pong_count](const examples::Pong &pong) {
1095 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
1096 << pi1_event_loop->context().monotonic_remote_time << " -> "
1097 << pi1_event_loop->context().monotonic_event_time;
1098
1099 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
1100 ++pi1_pong_count;
1101 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
1102 });
1103 pi2_event_loop->MakeWatcher(
1104 "/test", [&pi2_event_loop, &pi2_ping_count,
1105 &pi2_pong_count](const examples::Pong &pong) {
1106 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
1107 << pi2_event_loop->context().monotonic_remote_time << " -> "
1108 << pi2_event_loop->context().monotonic_event_time;
1109
1110 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
1111 ++pi2_pong_count;
1112 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
1113 });
1114
1115 log_reader_factory.Run();
1116 EXPECT_EQ(pi1_ping_count, 6030);
1117 EXPECT_EQ(pi2_ping_count, 6030);
1118 EXPECT_EQ(pi1_pong_count, 6030);
1119 EXPECT_EQ(pi2_pong_count, 6030);
1120
1121 reader.Deregister();
1122}
1123
1124// Tests that we can sort a bunch of parts into the pre-determined sorted parts.
1125TEST_P(MultinodeLoggerTest, SortParts) {
1126 time_converter_.StartEqual();
1127 // Make a bunch of parts.
1128 {
1129 LoggerState pi1_logger = MakeLogger(pi1_);
1130 LoggerState pi2_logger = MakeLogger(pi2_);
1131
1132 event_loop_factory_.RunFor(chrono::milliseconds(95));
1133
1134 StartLogger(&pi1_logger);
1135 StartLogger(&pi2_logger);
1136
1137 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1138 }
1139
1140 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1141 VerifyParts(sorted_parts);
1142}
1143
1144// Tests that we can sort a bunch of parts with an empty part. We should ignore
1145// it and remove it from the sorted list.
1146TEST_P(MultinodeLoggerTest, SortEmptyParts) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07001147 std::vector<std::string> actual_filenames;
1148
Naman Guptaa63aa132023-03-22 20:06:34 -07001149 time_converter_.StartEqual();
1150 // Make a bunch of parts.
1151 {
1152 LoggerState pi1_logger = MakeLogger(pi1_);
1153 LoggerState pi2_logger = MakeLogger(pi2_);
1154
1155 event_loop_factory_.RunFor(chrono::milliseconds(95));
1156
1157 StartLogger(&pi1_logger);
1158 StartLogger(&pi2_logger);
1159
1160 event_loop_factory_.RunFor(chrono::milliseconds(2000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07001161 pi1_logger.AppendAllFilenames(&actual_filenames);
1162 pi2_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001163 }
1164
1165 // TODO(austin): Should we flip out if the file can't open?
1166 const std::string kEmptyFile("foobarinvalidfiledoesnotexist" + Extension());
1167
1168 aos::util::WriteStringToFileOrDie(kEmptyFile, "");
Austin Schuh8fb4b452023-08-04 17:02:27 -07001169 actual_filenames.emplace_back(kEmptyFile);
Naman Guptaa63aa132023-03-22 20:06:34 -07001170
Austin Schuh8fb4b452023-08-04 17:02:27 -07001171 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001172 VerifyParts(sorted_parts, {kEmptyFile});
1173}
1174
1175// Tests that we can sort a bunch of parts with the end missing off a
1176// file. We should use the part we can read.
1177TEST_P(MultinodeLoggerTest, SortTruncatedParts) {
Austin Schuh6ecfe902023-08-04 22:44:37 -07001178 if (file_strategy() == FileStrategy::kCombine) {
1179 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
1180 }
1181
Naman Guptaa63aa132023-03-22 20:06:34 -07001182 std::vector<std::string> actual_filenames;
1183 time_converter_.StartEqual();
1184 // Make a bunch of parts.
1185 {
1186 LoggerState pi1_logger = MakeLogger(pi1_);
1187 LoggerState pi2_logger = MakeLogger(pi2_);
1188
1189 event_loop_factory_.RunFor(chrono::milliseconds(95));
1190
1191 StartLogger(&pi1_logger);
1192 StartLogger(&pi2_logger);
1193
1194 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1195
1196 pi1_logger.AppendAllFilenames(&actual_filenames);
1197 pi2_logger.AppendAllFilenames(&actual_filenames);
1198 }
1199
1200 ASSERT_THAT(actual_filenames,
1201 ::testing::UnorderedElementsAreArray(logfiles_));
1202
1203 // Strip off the end of one of the files. Pick one with a lot of data.
1204 // For snappy, needs to have enough data to be >1 chunk of compressed data so
1205 // that we don't corrupt the entire log part.
1206 ::std::string compressed_contents =
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07001207 aos::util::ReadFileToStringOrDie(logfiles_[2]);
Naman Guptaa63aa132023-03-22 20:06:34 -07001208
1209 aos::util::WriteStringToFileOrDie(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07001210 logfiles_[2],
Naman Guptaa63aa132023-03-22 20:06:34 -07001211 compressed_contents.substr(0, compressed_contents.size() - 100));
1212
1213 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1214 VerifyParts(sorted_parts);
1215}
1216
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001217// Tests that if we remap a logged channel, it shows up correctly.
Naman Guptaa63aa132023-03-22 20:06:34 -07001218TEST_P(MultinodeLoggerTest, RemapLoggedChannel) {
1219 time_converter_.StartEqual();
Austin Schuh8fb4b452023-08-04 17:02:27 -07001220
1221 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -07001222 {
1223 LoggerState pi1_logger = MakeLogger(pi1_);
1224 LoggerState pi2_logger = MakeLogger(pi2_);
1225
1226 event_loop_factory_.RunFor(chrono::milliseconds(95));
1227
1228 StartLogger(&pi1_logger);
1229 StartLogger(&pi2_logger);
1230
1231 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07001232
1233 pi1_logger.AppendAllFilenames(&filenames);
1234 pi2_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001235 }
1236
Austin Schuh8fb4b452023-08-04 17:02:27 -07001237 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001238 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1239 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001240
1241 // Remap just on pi1.
1242 reader.RemapLoggedChannel<aos::timing::Report>(
1243 "/aos", configuration::GetNode(reader.configuration(), "pi1"));
1244
1245 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1246 log_reader_factory.set_send_delay(chrono::microseconds(0));
1247
1248 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1249 // Note: An extra channel gets remapped automatically due to a timestamp
1250 // channel being LOCAL_LOGGER'd.
1251 ASSERT_EQ(remapped_channels.size(), std::get<0>(GetParam()).shared ? 1u : 2u);
1252 EXPECT_EQ(remapped_channels[0]->name()->string_view(), "/original/pi1/aos");
1253 EXPECT_EQ(remapped_channels[0]->type()->string_view(), "aos.timing.Report");
1254 if (!std::get<0>(GetParam()).shared) {
1255 EXPECT_EQ(remapped_channels[1]->name()->string_view(),
1256 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1257 "aos-message_bridge-Timestamp");
1258 EXPECT_EQ(remapped_channels[1]->type()->string_view(),
1259 "aos.message_bridge.RemoteMessage");
1260 }
1261
1262 reader.Register(&log_reader_factory);
1263
1264 const Node *pi1 =
1265 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1266 const Node *pi2 =
1267 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1268
1269 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1270 // else should have moved.
1271 std::unique_ptr<EventLoop> pi1_event_loop =
1272 log_reader_factory.MakeEventLoop("test", pi1);
1273 pi1_event_loop->SkipTimingReport();
1274 std::unique_ptr<EventLoop> full_pi1_event_loop =
1275 log_reader_factory.MakeEventLoop("test", pi1);
1276 full_pi1_event_loop->SkipTimingReport();
1277 std::unique_ptr<EventLoop> pi2_event_loop =
1278 log_reader_factory.MakeEventLoop("test", pi2);
1279 pi2_event_loop->SkipTimingReport();
1280
1281 MessageCounter<aos::timing::Report> pi1_timing_report(pi1_event_loop.get(),
1282 "/aos");
1283 MessageCounter<aos::timing::Report> full_pi1_timing_report(
1284 full_pi1_event_loop.get(), "/pi1/aos");
1285 MessageCounter<aos::timing::Report> pi1_original_timing_report(
1286 pi1_event_loop.get(), "/original/aos");
1287 MessageCounter<aos::timing::Report> full_pi1_original_timing_report(
1288 full_pi1_event_loop.get(), "/original/pi1/aos");
1289 MessageCounter<aos::timing::Report> pi2_timing_report(pi2_event_loop.get(),
1290 "/aos");
1291
1292 log_reader_factory.Run();
1293
1294 EXPECT_EQ(pi1_timing_report.count(), 0u);
1295 EXPECT_EQ(full_pi1_timing_report.count(), 0u);
1296 EXPECT_NE(pi1_original_timing_report.count(), 0u);
1297 EXPECT_NE(full_pi1_original_timing_report.count(), 0u);
1298 EXPECT_NE(pi2_timing_report.count(), 0u);
1299
1300 reader.Deregister();
1301}
1302
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001303// Tests that if we rename a logged channel, it shows up correctly.
1304TEST_P(MultinodeLoggerTest, RenameLoggedChannel) {
1305 std::vector<std::string> actual_filenames;
1306 time_converter_.StartEqual();
1307 {
1308 LoggerState pi1_logger = MakeLogger(pi1_);
1309 LoggerState pi2_logger = MakeLogger(pi2_);
1310
1311 event_loop_factory_.RunFor(chrono::milliseconds(95));
1312
1313 StartLogger(&pi1_logger);
1314 StartLogger(&pi2_logger);
1315
1316 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1317
1318 pi1_logger.AppendAllFilenames(&actual_filenames);
1319 pi2_logger.AppendAllFilenames(&actual_filenames);
1320 }
1321
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001322 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1323 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1324 LogReader reader(sorted_parts);
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001325
1326 // Rename just on pi2. Add some global maps just to verify they get added in
1327 // the config and used correctly.
1328 std::vector<MapT> maps;
1329 {
1330 MapT map;
1331 map.match = std::make_unique<ChannelT>();
1332 map.match->name = "/foo*";
1333 map.match->source_node = "pi1";
1334 map.rename = std::make_unique<ChannelT>();
1335 map.rename->name = "/pi1/foo";
1336 maps.emplace_back(std::move(map));
1337 }
1338 {
1339 MapT map;
1340 map.match = std::make_unique<ChannelT>();
1341 map.match->name = "/foo*";
1342 map.match->source_node = "pi2";
1343 map.rename = std::make_unique<ChannelT>();
1344 map.rename->name = "/pi2/foo";
1345 maps.emplace_back(std::move(map));
1346 }
1347 {
1348 MapT map;
1349 map.match = std::make_unique<ChannelT>();
1350 map.match->name = "/foo";
1351 map.match->type = "aos.examples.Ping";
1352 map.rename = std::make_unique<ChannelT>();
1353 map.rename->name = "/foo/renamed";
1354 maps.emplace_back(std::move(map));
1355 }
1356 reader.RenameLoggedChannel<aos::examples::Ping>(
1357 "/aos", configuration::GetNode(reader.configuration(), "pi2"),
1358 "/pi2/foo/renamed", maps);
1359
1360 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1361 log_reader_factory.set_send_delay(chrono::microseconds(0));
1362
1363 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1364 // Note: An extra channel gets remapped automatically due to a timestamp
1365 // channel being LOCAL_LOGGER'd.
1366 const bool shared = std::get<0>(GetParam()).shared;
1367 ASSERT_EQ(remapped_channels.size(), shared ? 1u : 2u);
1368 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->name()->string_view(),
1369 "/pi2/foo/renamed");
1370 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->type()->string_view(),
1371 "aos.examples.Ping");
1372 if (!shared) {
1373 EXPECT_EQ(remapped_channels[0]->name()->string_view(),
1374 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1375 "aos-message_bridge-Timestamp");
1376 EXPECT_EQ(remapped_channels[0]->type()->string_view(),
1377 "aos.message_bridge.RemoteMessage");
1378 }
1379
1380 reader.Register(&log_reader_factory);
1381
1382 const Node *pi1 =
1383 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1384 const Node *pi2 =
1385 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1386
1387 // Confirm we can read the data on the renamed channel, just for pi2. Nothing
1388 // else should have moved.
1389 std::unique_ptr<EventLoop> pi2_event_loop =
1390 log_reader_factory.MakeEventLoop("test", pi2);
1391 pi2_event_loop->SkipTimingReport();
1392 std::unique_ptr<EventLoop> full_pi2_event_loop =
1393 log_reader_factory.MakeEventLoop("test", pi2);
1394 full_pi2_event_loop->SkipTimingReport();
1395 std::unique_ptr<EventLoop> pi1_event_loop =
1396 log_reader_factory.MakeEventLoop("test", pi1);
1397 pi1_event_loop->SkipTimingReport();
1398
1399 MessageCounter<aos::examples::Ping> pi2_ping(pi2_event_loop.get(), "/aos");
1400 MessageCounter<aos::examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1401 "/foo");
1402 MessageCounter<aos::examples::Ping> full_pi2_renamed_ping(
1403 full_pi2_event_loop.get(), "/pi2/foo/renamed");
1404 MessageCounter<aos::examples::Ping> pi1_ping(pi1_event_loop.get(), "/aos");
1405
1406 log_reader_factory.Run();
1407
1408 EXPECT_EQ(pi2_ping.count(), 0u);
1409 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1410 EXPECT_NE(full_pi2_renamed_ping.count(), 0u);
1411 EXPECT_NE(pi1_ping.count(), 0u);
1412
1413 reader.Deregister();
1414}
1415
Naman Guptaa63aa132023-03-22 20:06:34 -07001416// Tests that we can remap a forwarded channel as well.
1417TEST_P(MultinodeLoggerTest, RemapForwardedLoggedChannel) {
1418 time_converter_.StartEqual();
1419 {
1420 LoggerState pi1_logger = MakeLogger(pi1_);
1421 LoggerState pi2_logger = MakeLogger(pi2_);
1422
1423 event_loop_factory_.RunFor(chrono::milliseconds(95));
1424
1425 StartLogger(&pi1_logger);
1426 StartLogger(&pi2_logger);
1427
1428 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1429 }
1430
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001431 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1432 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1433 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001434
1435 reader.RemapLoggedChannel<examples::Ping>("/test");
1436
1437 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1438 log_reader_factory.set_send_delay(chrono::microseconds(0));
1439
1440 reader.Register(&log_reader_factory);
1441
1442 const Node *pi1 =
1443 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1444 const Node *pi2 =
1445 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1446
1447 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1448 // else should have moved.
1449 std::unique_ptr<EventLoop> pi1_event_loop =
1450 log_reader_factory.MakeEventLoop("test", pi1);
1451 pi1_event_loop->SkipTimingReport();
1452 std::unique_ptr<EventLoop> full_pi1_event_loop =
1453 log_reader_factory.MakeEventLoop("test", pi1);
1454 full_pi1_event_loop->SkipTimingReport();
1455 std::unique_ptr<EventLoop> pi2_event_loop =
1456 log_reader_factory.MakeEventLoop("test", pi2);
1457 pi2_event_loop->SkipTimingReport();
1458
1459 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1460 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1461 MessageCounter<examples::Ping> pi1_original_ping(pi1_event_loop.get(),
1462 "/original/test");
1463 MessageCounter<examples::Ping> pi2_original_ping(pi2_event_loop.get(),
1464 "/original/test");
1465
1466 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1467 pi1_original_ping_timestamp;
1468 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1469 pi1_ping_timestamp;
1470 if (!shared()) {
1471 pi1_original_ping_timestamp =
1472 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1473 pi1_event_loop.get(),
1474 "/pi1/aos/remote_timestamps/pi2/original/test/aos-examples-Ping");
1475 pi1_ping_timestamp =
1476 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1477 pi1_event_loop.get(),
1478 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1479 }
1480
1481 log_reader_factory.Run();
1482
1483 EXPECT_EQ(pi1_ping.count(), 0u);
1484 EXPECT_EQ(pi2_ping.count(), 0u);
1485 EXPECT_NE(pi1_original_ping.count(), 0u);
1486 EXPECT_NE(pi2_original_ping.count(), 0u);
1487 if (!shared()) {
1488 EXPECT_NE(pi1_original_ping_timestamp->count(), 0u);
1489 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1490 }
1491
1492 reader.Deregister();
1493}
1494
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001495// Tests that we can rename a forwarded channel as well.
1496TEST_P(MultinodeLoggerTest, RenameForwardedLoggedChannel) {
1497 std::vector<std::string> actual_filenames;
1498 time_converter_.StartEqual();
1499 {
1500 LoggerState pi1_logger = MakeLogger(pi1_);
1501 LoggerState pi2_logger = MakeLogger(pi2_);
1502
1503 event_loop_factory_.RunFor(chrono::milliseconds(95));
1504
1505 StartLogger(&pi1_logger);
1506 StartLogger(&pi2_logger);
1507
1508 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1509
1510 pi1_logger.AppendAllFilenames(&actual_filenames);
1511 pi2_logger.AppendAllFilenames(&actual_filenames);
1512 }
1513
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001514 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1515 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1516 LogReader reader(sorted_parts);
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001517
1518 std::vector<MapT> maps;
1519 {
1520 MapT map;
1521 map.match = std::make_unique<ChannelT>();
1522 map.match->name = "/production*";
1523 map.match->source_node = "pi1";
1524 map.rename = std::make_unique<ChannelT>();
1525 map.rename->name = "/pi1/production";
1526 maps.emplace_back(std::move(map));
1527 }
1528 {
1529 MapT map;
1530 map.match = std::make_unique<ChannelT>();
1531 map.match->name = "/production*";
1532 map.match->source_node = "pi2";
1533 map.rename = std::make_unique<ChannelT>();
1534 map.rename->name = "/pi2/production";
1535 maps.emplace_back(std::move(map));
1536 }
1537 reader.RenameLoggedChannel<aos::examples::Ping>(
1538 "/test", configuration::GetNode(reader.configuration(), "pi1"),
1539 "/pi1/production", maps);
1540
1541 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1542 log_reader_factory.set_send_delay(chrono::microseconds(0));
1543
1544 reader.Register(&log_reader_factory);
1545
1546 const Node *pi1 =
1547 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1548 const Node *pi2 =
1549 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1550
1551 // Confirm we can read the data on the renamed channel, on both the source
1552 // node and the remote node. In case of split timestamp channels, confirm that
1553 // we receive the timestamp messages on the renamed channel as well.
1554 std::unique_ptr<EventLoop> pi1_event_loop =
1555 log_reader_factory.MakeEventLoop("test", pi1);
1556 pi1_event_loop->SkipTimingReport();
1557 std::unique_ptr<EventLoop> full_pi1_event_loop =
1558 log_reader_factory.MakeEventLoop("test", pi1);
1559 full_pi1_event_loop->SkipTimingReport();
1560 std::unique_ptr<EventLoop> pi2_event_loop =
1561 log_reader_factory.MakeEventLoop("test", pi2);
1562 pi2_event_loop->SkipTimingReport();
1563
1564 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1565 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1566 MessageCounter<examples::Ping> pi1_renamed_ping(pi1_event_loop.get(),
1567 "/pi1/production");
1568 MessageCounter<examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1569 "/pi1/production");
1570
1571 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1572 pi1_renamed_ping_timestamp;
1573 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1574 pi1_ping_timestamp;
1575 if (!shared()) {
1576 pi1_renamed_ping_timestamp =
1577 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1578 pi1_event_loop.get(),
1579 "/pi1/aos/remote_timestamps/pi2/pi1/production/aos-examples-Ping");
1580 pi1_ping_timestamp =
1581 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1582 pi1_event_loop.get(),
1583 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1584 }
1585
1586 log_reader_factory.Run();
1587
1588 EXPECT_EQ(pi1_ping.count(), 0u);
1589 EXPECT_EQ(pi2_ping.count(), 0u);
1590 EXPECT_NE(pi1_renamed_ping.count(), 0u);
1591 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1592 if (!shared()) {
1593 EXPECT_NE(pi1_renamed_ping_timestamp->count(), 0u);
1594 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1595 }
1596
1597 reader.Deregister();
1598}
1599
Naman Guptaa63aa132023-03-22 20:06:34 -07001600// Tests that we observe all the same events in log replay (for a given node)
1601// whether we just register an event loop for that node or if we register a full
1602// event loop factory.
1603TEST_P(MultinodeLoggerTest, SingleNodeReplay) {
1604 time_converter_.StartEqual();
1605 constexpr chrono::milliseconds kStartupDelay(95);
Austin Schuh8fb4b452023-08-04 17:02:27 -07001606 std::vector<std::string> filenames;
1607
Naman Guptaa63aa132023-03-22 20:06:34 -07001608 {
1609 LoggerState pi1_logger = MakeLogger(pi1_);
1610 LoggerState pi2_logger = MakeLogger(pi2_);
1611
1612 event_loop_factory_.RunFor(kStartupDelay);
1613
1614 StartLogger(&pi1_logger);
1615 StartLogger(&pi2_logger);
1616
1617 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07001618
1619 pi1_logger.AppendAllFilenames(&filenames);
1620 pi2_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001621 }
1622
Austin Schuh8fb4b452023-08-04 17:02:27 -07001623 LogReader full_reader(SortParts(filenames));
1624 LogReader single_node_reader(SortParts(filenames));
Naman Guptaa63aa132023-03-22 20:06:34 -07001625
1626 SimulatedEventLoopFactory full_factory(full_reader.configuration());
1627 SimulatedEventLoopFactory single_node_factory(
1628 single_node_reader.configuration());
1629 single_node_factory.SkipTimingReport();
1630 single_node_factory.DisableStatistics();
1631 std::unique_ptr<EventLoop> replay_event_loop =
1632 single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
1633 "log_reader");
1634
1635 full_reader.Register(&full_factory);
1636 single_node_reader.Register(replay_event_loop.get());
1637
1638 const Node *full_pi1 =
1639 configuration::GetNode(full_factory.configuration(), "pi1");
1640
1641 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1642 // else should have moved.
1643 std::unique_ptr<EventLoop> full_event_loop =
1644 full_factory.MakeEventLoop("test", full_pi1);
1645 full_event_loop->SkipTimingReport();
1646 full_event_loop->SkipAosLog();
1647 // maps are indexed on channel index.
1648 // observed_messages: {channel_index: [(message_sent_time, was_fetched),...]}
1649 std::map<size_t, std::vector<std::pair<monotonic_clock::time_point, bool>>>
1650 observed_messages;
1651 std::map<size_t, std::unique_ptr<RawFetcher>> fetchers;
1652 for (size_t ii = 0; ii < full_event_loop->configuration()->channels()->size();
1653 ++ii) {
1654 const Channel *channel =
1655 full_event_loop->configuration()->channels()->Get(ii);
1656 // We currently don't support replaying remote timestamp channels in
1657 // realtime replay (unless the remote timestamp channel was not NOT_LOGGED,
1658 // in which case it gets auto-remapped and replayed on a /original channel).
1659 if (channel->name()->string_view().find("remote_timestamp") !=
1660 std::string_view::npos &&
1661 channel->name()->string_view().find("/original") ==
1662 std::string_view::npos) {
1663 continue;
1664 }
1665 if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
1666 observed_messages[ii] = {};
1667 fetchers[ii] = full_event_loop->MakeRawFetcher(channel);
1668 full_event_loop->OnRun([ii, &observed_messages, &fetchers]() {
1669 if (fetchers[ii]->Fetch()) {
1670 observed_messages[ii].push_back(std::make_pair(
1671 fetchers[ii]->context().monotonic_event_time, true));
1672 }
1673 });
1674 full_event_loop->MakeRawNoArgWatcher(
1675 channel, [ii, &observed_messages](const Context &context) {
1676 observed_messages[ii].push_back(
1677 std::make_pair(context.monotonic_event_time, false));
1678 });
1679 }
1680 }
1681
1682 full_factory.Run();
1683 fetchers.clear();
1684 full_reader.Deregister();
1685
1686 const Node *single_node_pi1 =
1687 configuration::GetNode(single_node_factory.configuration(), "pi1");
1688 std::map<size_t, std::unique_ptr<RawFetcher>> single_node_fetchers;
1689
1690 std::unique_ptr<EventLoop> single_node_event_loop =
1691 single_node_factory.MakeEventLoop("test", single_node_pi1);
1692 single_node_event_loop->SkipTimingReport();
1693 single_node_event_loop->SkipAosLog();
1694 for (size_t ii = 0;
1695 ii < single_node_event_loop->configuration()->channels()->size(); ++ii) {
1696 const Channel *channel =
1697 single_node_event_loop->configuration()->channels()->Get(ii);
1698 single_node_factory.DisableForwarding(channel);
1699 if (configuration::ChannelIsReadableOnNode(channel, single_node_pi1)) {
1700 single_node_fetchers[ii] =
1701 single_node_event_loop->MakeRawFetcher(channel);
1702 single_node_event_loop->OnRun([channel, ii, &single_node_fetchers]() {
1703 EXPECT_FALSE(single_node_fetchers[ii]->Fetch())
1704 << "Single EventLoop replay doesn't support pre-loading fetchers. "
1705 << configuration::StrippedChannelToString(channel);
1706 });
1707 single_node_event_loop->MakeRawNoArgWatcher(
1708 channel, [ii, &observed_messages, channel,
1709 kStartupDelay](const Context &context) {
1710 if (observed_messages[ii].empty()) {
1711 FAIL() << "Observed extra message at "
1712 << context.monotonic_event_time << " on "
1713 << configuration::StrippedChannelToString(channel);
1714 return;
1715 }
1716 const std::pair<monotonic_clock::time_point, bool> &message =
1717 observed_messages[ii].front();
1718 if (message.second) {
1719 EXPECT_LE(message.first,
1720 context.monotonic_event_time + kStartupDelay)
1721 << "Mismatched message times " << context.monotonic_event_time
1722 << " and " << message.first << " on "
1723 << configuration::StrippedChannelToString(channel);
1724 } else {
1725 EXPECT_EQ(message.first,
1726 context.monotonic_event_time + kStartupDelay)
1727 << "Mismatched message times " << context.monotonic_event_time
1728 << " and " << message.first << " on "
1729 << configuration::StrippedChannelToString(channel);
1730 }
1731 observed_messages[ii].erase(observed_messages[ii].begin());
1732 });
1733 }
1734 }
1735
1736 single_node_factory.Run();
1737
1738 single_node_fetchers.clear();
1739
1740 single_node_reader.Deregister();
1741
1742 for (const auto &pair : observed_messages) {
1743 EXPECT_TRUE(pair.second.empty())
1744 << "Missed " << pair.second.size() << " messages on "
1745 << configuration::StrippedChannelToString(
1746 single_node_event_loop->configuration()->channels()->Get(
1747 pair.first));
1748 }
1749}
1750
1751// Tests that we properly recreate forwarded timestamps when replaying a log.
1752// This should be enough that we can then re-run the logger and get a valid log
1753// back.
1754TEST_P(MultinodeLoggerTest, MessageHeader) {
1755 time_converter_.StartEqual();
1756 {
1757 LoggerState pi1_logger = MakeLogger(pi1_);
1758 LoggerState pi2_logger = MakeLogger(pi2_);
1759
1760 event_loop_factory_.RunFor(chrono::milliseconds(95));
1761
1762 StartLogger(&pi1_logger);
1763 StartLogger(&pi2_logger);
1764
1765 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1766 }
1767
1768 LogReader reader(SortParts(logfiles_));
1769
1770 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1771 log_reader_factory.set_send_delay(chrono::microseconds(0));
1772
1773 // This sends out the fetched messages and advances time to the start of the
1774 // log file.
1775 reader.Register(&log_reader_factory);
1776
1777 const Node *pi1 =
1778 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1779 const Node *pi2 =
1780 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1781
1782 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
1783 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
1784 LOG(INFO) << "now pi1 "
1785 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1786 LOG(INFO) << "now pi2 "
1787 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1788
1789 EXPECT_THAT(reader.LoggedNodes(),
1790 ::testing::ElementsAre(
1791 configuration::GetNode(reader.logged_configuration(), pi1),
1792 configuration::GetNode(reader.logged_configuration(), pi2)));
1793
1794 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1795
1796 std::unique_ptr<EventLoop> pi1_event_loop =
1797 log_reader_factory.MakeEventLoop("test", pi1);
1798 std::unique_ptr<EventLoop> pi2_event_loop =
1799 log_reader_factory.MakeEventLoop("test", pi2);
1800
1801 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =
1802 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1803 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi2_fetcher =
1804 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1805
1806 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1807 pi1_event_loop->MakeFetcher<examples::Ping>("/test");
1808 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1809 pi2_event_loop->MakeFetcher<examples::Ping>("/test");
1810
1811 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi2_fetcher =
1812 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1813 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi1_fetcher =
1814 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1815
1816 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1817 pi2_event_loop->MakeFetcher<examples::Pong>("/test");
1818 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1819 pi1_event_loop->MakeFetcher<examples::Pong>("/test");
1820
1821 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
1822 pi1_event_loop->configuration(), pi1_timestamp_on_pi1_fetcher.channel());
1823 const size_t ping_timestamp_channel = configuration::ChannelIndex(
1824 pi2_event_loop->configuration(), ping_on_pi2_fetcher.channel());
1825
1826 const size_t pi2_timestamp_channel = configuration::ChannelIndex(
1827 pi2_event_loop->configuration(), pi2_timestamp_on_pi2_fetcher.channel());
1828 const size_t pong_timestamp_channel = configuration::ChannelIndex(
1829 pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
1830
1831 const chrono::nanoseconds network_delay = event_loop_factory_.network_delay();
1832 const chrono::nanoseconds send_delay = event_loop_factory_.send_delay();
1833
1834 for (std::pair<int, std::string> channel :
1835 shared()
1836 ? std::vector<
1837 std::pair<int, std::string>>{{-1,
1838 "/aos/remote_timestamps/pi2"}}
1839 : std::vector<std::pair<int, std::string>>{
1840 {pi1_timestamp_channel,
1841 "/aos/remote_timestamps/pi2/pi1/aos/"
1842 "aos-message_bridge-Timestamp"},
1843 {ping_timestamp_channel,
1844 "/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
1845 pi1_event_loop->MakeWatcher(
1846 channel.second,
1847 [&pi1_event_loop, &pi2_event_loop, pi1_timestamp_channel,
1848 ping_timestamp_channel, &pi1_timestamp_on_pi1_fetcher,
1849 &pi1_timestamp_on_pi2_fetcher, &ping_on_pi1_fetcher,
1850 &ping_on_pi2_fetcher, network_delay, send_delay,
1851 channel_index = channel.first](const RemoteMessage &header) {
1852 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1853 chrono::nanoseconds(header.monotonic_sent_time()));
1854 const aos::realtime_clock::time_point header_realtime_sent_time(
1855 chrono::nanoseconds(header.realtime_sent_time()));
1856 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1857 chrono::nanoseconds(header.monotonic_remote_time()));
1858 const aos::realtime_clock::time_point header_realtime_remote_time(
1859 chrono::nanoseconds(header.realtime_remote_time()));
1860
1861 if (channel_index != -1) {
1862 ASSERT_EQ(channel_index, header.channel_index());
1863 }
1864
1865 const Context *pi1_context = nullptr;
1866 const Context *pi2_context = nullptr;
1867
1868 if (header.channel_index() == pi1_timestamp_channel) {
1869 ASSERT_TRUE(pi1_timestamp_on_pi1_fetcher.FetchNext());
1870 ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
1871 pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
1872 pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
1873 } else if (header.channel_index() == ping_timestamp_channel) {
1874 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
1875 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
1876 pi1_context = &ping_on_pi1_fetcher.context();
1877 pi2_context = &ping_on_pi2_fetcher.context();
1878 } else {
1879 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1880 << configuration::CleanedChannelToString(
1881 pi1_event_loop->configuration()->channels()->Get(
1882 header.channel_index()));
1883 }
1884
1885 ASSERT_TRUE(header.has_boot_uuid());
1886 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1887 pi2_event_loop->boot_uuid());
1888
1889 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
1890 EXPECT_EQ(pi2_context->remote_queue_index,
1891 header.remote_queue_index());
1892 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
1893
1894 EXPECT_EQ(pi2_context->monotonic_event_time,
1895 header_monotonic_sent_time);
1896 EXPECT_EQ(pi2_context->realtime_event_time,
1897 header_realtime_sent_time);
1898 EXPECT_EQ(pi2_context->realtime_remote_time,
1899 header_realtime_remote_time);
1900 EXPECT_EQ(pi2_context->monotonic_remote_time,
1901 header_monotonic_remote_time);
1902
1903 EXPECT_EQ(pi1_context->realtime_event_time,
1904 header_realtime_remote_time);
1905 EXPECT_EQ(pi1_context->monotonic_event_time,
1906 header_monotonic_remote_time);
1907
1908 // Time estimation isn't perfect, but we know the clocks were
1909 // identical when logged, so we know when this should have come back.
1910 // Confirm we got it when we expected.
1911 EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
1912 pi1_context->monotonic_event_time + 2 * network_delay +
1913 send_delay);
1914 });
1915 }
1916 for (std::pair<int, std::string> channel :
1917 shared()
1918 ? std::vector<
1919 std::pair<int, std::string>>{{-1,
1920 "/aos/remote_timestamps/pi1"}}
1921 : std::vector<std::pair<int, std::string>>{
1922 {pi2_timestamp_channel,
1923 "/aos/remote_timestamps/pi1/pi2/aos/"
1924 "aos-message_bridge-Timestamp"}}) {
1925 pi2_event_loop->MakeWatcher(
1926 channel.second,
1927 [&pi2_event_loop, &pi1_event_loop, pi2_timestamp_channel,
1928 pong_timestamp_channel, &pi2_timestamp_on_pi2_fetcher,
1929 &pi2_timestamp_on_pi1_fetcher, &pong_on_pi2_fetcher,
1930 &pong_on_pi1_fetcher, network_delay, send_delay,
1931 channel_index = channel.first](const RemoteMessage &header) {
1932 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1933 chrono::nanoseconds(header.monotonic_sent_time()));
1934 const aos::realtime_clock::time_point header_realtime_sent_time(
1935 chrono::nanoseconds(header.realtime_sent_time()));
1936 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1937 chrono::nanoseconds(header.monotonic_remote_time()));
1938 const aos::realtime_clock::time_point header_realtime_remote_time(
1939 chrono::nanoseconds(header.realtime_remote_time()));
1940
1941 if (channel_index != -1) {
1942 ASSERT_EQ(channel_index, header.channel_index());
1943 }
1944
1945 const Context *pi2_context = nullptr;
1946 const Context *pi1_context = nullptr;
1947
1948 if (header.channel_index() == pi2_timestamp_channel) {
1949 ASSERT_TRUE(pi2_timestamp_on_pi2_fetcher.FetchNext());
1950 ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
1951 pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
1952 pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
1953 } else if (header.channel_index() == pong_timestamp_channel) {
1954 ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
1955 ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
1956 pi2_context = &pong_on_pi2_fetcher.context();
1957 pi1_context = &pong_on_pi1_fetcher.context();
1958 } else {
1959 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1960 << configuration::CleanedChannelToString(
1961 pi2_event_loop->configuration()->channels()->Get(
1962 header.channel_index()));
1963 }
1964
1965 ASSERT_TRUE(header.has_boot_uuid());
1966 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1967 pi1_event_loop->boot_uuid());
1968
1969 EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());
1970 EXPECT_EQ(pi1_context->remote_queue_index,
1971 header.remote_queue_index());
1972 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
1973
1974 EXPECT_EQ(pi1_context->monotonic_event_time,
1975 header_monotonic_sent_time);
1976 EXPECT_EQ(pi1_context->realtime_event_time,
1977 header_realtime_sent_time);
1978 EXPECT_EQ(pi1_context->realtime_remote_time,
1979 header_realtime_remote_time);
1980 EXPECT_EQ(pi1_context->monotonic_remote_time,
1981 header_monotonic_remote_time);
1982
1983 EXPECT_EQ(pi2_context->realtime_event_time,
1984 header_realtime_remote_time);
1985 EXPECT_EQ(pi2_context->monotonic_event_time,
1986 header_monotonic_remote_time);
1987
1988 // Time estimation isn't perfect, but we know the clocks were
1989 // identical when logged, so we know when this should have come back.
1990 // Confirm we got it when we expected.
1991 EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
1992 pi2_context->monotonic_event_time + 2 * network_delay +
1993 send_delay);
1994 });
1995 }
1996
1997 // And confirm we can re-create a log again, while checking the contents.
1998 {
1999 LoggerState pi1_logger = MakeLogger(
2000 log_reader_factory.GetNodeEventLoopFactory("pi1"), &log_reader_factory);
2001 LoggerState pi2_logger = MakeLogger(
2002 log_reader_factory.GetNodeEventLoopFactory("pi2"), &log_reader_factory);
2003
Austin Schuh8fb4b452023-08-04 17:02:27 -07002004 StartLogger(&pi1_logger, tmp_dir_ + "/logs/relogged1");
2005 StartLogger(&pi2_logger, tmp_dir_ + "/logs/relogged2");
Naman Guptaa63aa132023-03-22 20:06:34 -07002006
2007 log_reader_factory.Run();
2008 }
2009
2010 reader.Deregister();
2011
2012 // And verify that we can run the LogReader over the relogged files without
2013 // hitting any fatal errors.
2014 {
Austin Schuh8fb4b452023-08-04 17:02:27 -07002015 LogReader relogged_reader(SortParts(
2016 MakeLogFiles(tmp_dir_ + "/logs/relogged1", tmp_dir_ + "/logs/relogged2",
2017 1, 1, 2, 2, true)));
Naman Guptaa63aa132023-03-22 20:06:34 -07002018 relogged_reader.Register();
2019
2020 relogged_reader.event_loop_factory()->Run();
2021 }
2022 // And confirm that we can read the logged file using the reader's
2023 // configuration.
2024 {
2025 LogReader relogged_reader(
Austin Schuh8fb4b452023-08-04 17:02:27 -07002026 SortParts(MakeLogFiles(tmp_dir_ + "/logs/relogged1",
2027 tmp_dir_ + "/logs/relogged2", 1, 1, 2, 2, true)),
Naman Guptaa63aa132023-03-22 20:06:34 -07002028 reader.configuration());
2029 relogged_reader.Register();
2030
2031 relogged_reader.event_loop_factory()->Run();
2032 }
2033}
2034
2035// Tests that we properly populate and extract the logger_start time by setting
2036// up a clock difference between 2 nodes and looking at the resulting parts.
2037TEST_P(MultinodeLoggerTest, LoggerStartTime) {
2038 std::vector<std::string> actual_filenames;
2039 time_converter_.AddMonotonic(
2040 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2041 {
2042 LoggerState pi1_logger = MakeLogger(pi1_);
2043 LoggerState pi2_logger = MakeLogger(pi2_);
2044
2045 StartLogger(&pi1_logger);
2046 StartLogger(&pi2_logger);
2047
2048 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2049
2050 pi1_logger.AppendAllFilenames(&actual_filenames);
2051 pi2_logger.AppendAllFilenames(&actual_filenames);
2052 }
2053
2054 ASSERT_THAT(actual_filenames,
2055 ::testing::UnorderedElementsAreArray(logfiles_));
2056
Austin Schuh8fb4b452023-08-04 17:02:27 -07002057 for (const LogFile &log_file : SortParts(actual_filenames)) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002058 for (const LogParts &log_part : log_file.parts) {
2059 if (log_part.node == log_file.logger_node) {
2060 EXPECT_EQ(log_part.logger_monotonic_start_time,
2061 aos::monotonic_clock::min_time);
2062 EXPECT_EQ(log_part.logger_realtime_start_time,
2063 aos::realtime_clock::min_time);
2064 } else {
2065 const chrono::seconds offset = log_file.logger_node == "pi1"
2066 ? -chrono::seconds(1000)
2067 : chrono::seconds(1000);
2068 EXPECT_EQ(log_part.logger_monotonic_start_time,
2069 log_part.monotonic_start_time + offset);
2070 EXPECT_EQ(log_part.logger_realtime_start_time,
2071 log_file.realtime_start_time +
2072 (log_part.logger_monotonic_start_time -
2073 log_file.monotonic_start_time));
2074 }
2075 }
2076 }
2077}
2078
2079// Test that renaming the base, renames the folder.
2080TEST_P(MultinodeLoggerTest, LoggerRenameFolder) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002081 time_converter_.AddMonotonic(
2082 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07002083 logfile_base1_ = tmp_dir_ + "/logs/renamefolder/multi_logfile1";
2084 logfile_base2_ = tmp_dir_ + "/logs/renamefolder/multi_logfile2";
2085
Naman Guptaa63aa132023-03-22 20:06:34 -07002086 LoggerState pi1_logger = MakeLogger(pi1_);
2087 LoggerState pi2_logger = MakeLogger(pi2_);
2088
2089 StartLogger(&pi1_logger);
2090 StartLogger(&pi2_logger);
2091
2092 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002093 logfile_base1_ = tmp_dir_ + "/logs/new-good/multi_logfile1";
2094 logfile_base2_ = tmp_dir_ + "/logs/new-good/multi_logfile2";
Naman Guptaa63aa132023-03-22 20:06:34 -07002095 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
Alexei Strotscaf17d32023-04-03 22:31:11 -07002096
2097 // Sequence of set_base_name and Rotate simulates rename operation. Since
2098 // rename is not supported by all namers, RenameLogBase moved from logger to
2099 // the higher level abstraction, yet log_namers support rename, and it is
2100 // legal to test it here.
2101 pi1_logger.log_namer->set_base_name(logfile_base1_);
2102 pi1_logger.logger->Rotate();
2103 pi2_logger.log_namer->set_base_name(logfile_base2_);
2104 pi2_logger.logger->Rotate();
2105
Naman Guptaa63aa132023-03-22 20:06:34 -07002106 for (auto &file : logfiles_) {
2107 struct stat s;
2108 EXPECT_EQ(0, stat(file.c_str(), &s));
2109 }
2110}
2111
2112// Test that renaming the file base dies.
2113TEST_P(MultinodeLoggerDeathTest, LoggerRenameFile) {
2114 time_converter_.AddMonotonic(
2115 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07002116 logfile_base1_ = tmp_dir_ + "/logs/renamefile/multi_logfile1";
2117 logfile_base2_ = tmp_dir_ + "/logs/renamefile/multi_logfile2";
2118
Naman Guptaa63aa132023-03-22 20:06:34 -07002119 LoggerState pi1_logger = MakeLogger(pi1_);
2120 StartLogger(&pi1_logger);
2121 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002122 logfile_base1_ = tmp_dir_ + "/logs/new-renamefile/new_multi_logfile1";
Alexei Strotscaf17d32023-04-03 22:31:11 -07002123 EXPECT_DEATH({ pi1_logger.log_namer->set_base_name(logfile_base1_); },
Naman Guptaa63aa132023-03-22 20:06:34 -07002124 "Rename of file base from");
2125}
2126
2127// TODO(austin): We can write a test which recreates a logfile and confirms that
2128// we get it back. That is the ultimate test.
2129
2130// Tests that we properly recreate forwarded timestamps when replaying a log.
2131// This should be enough that we can then re-run the logger and get a valid log
2132// back.
2133TEST_P(MultinodeLoggerTest, RemoteReboot) {
Austin Schuh6ecfe902023-08-04 22:44:37 -07002134 if (file_strategy() == FileStrategy::kCombine) {
2135 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
2136 }
Naman Guptaa63aa132023-03-22 20:06:34 -07002137 std::vector<std::string> actual_filenames;
2138
2139 const UUID pi1_boot0 = UUID::Random();
2140 const UUID pi2_boot0 = UUID::Random();
2141 const UUID pi2_boot1 = UUID::Random();
2142 {
2143 CHECK_EQ(pi1_index_, 0u);
2144 CHECK_EQ(pi2_index_, 1u);
2145
2146 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2147 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2148 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2149
2150 time_converter_.AddNextTimestamp(
2151 distributed_clock::epoch(),
2152 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2153 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2154 time_converter_.AddNextTimestamp(
2155 distributed_clock::epoch() + reboot_time,
2156 {BootTimestamp::epoch() + reboot_time,
2157 BootTimestamp{
2158 .boot = 1,
2159 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2160 }
2161
2162 {
2163 LoggerState pi1_logger = MakeLogger(pi1_);
2164
2165 event_loop_factory_.RunFor(chrono::milliseconds(95));
2166 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2167 pi1_boot0);
2168 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2169 pi2_boot0);
2170
2171 StartLogger(&pi1_logger);
2172
2173 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2174
2175 VLOG(1) << "Reboot now!";
2176
2177 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2178 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2179 pi1_boot0);
2180 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2181 pi2_boot1);
2182
2183 pi1_logger.AppendAllFilenames(&actual_filenames);
2184 }
2185
2186 std::sort(actual_filenames.begin(), actual_filenames.end());
2187 std::sort(pi1_reboot_logfiles_.begin(), pi1_reboot_logfiles_.end());
2188 ASSERT_THAT(actual_filenames,
2189 ::testing::UnorderedElementsAreArray(pi1_reboot_logfiles_));
2190
2191 // Confirm that our new oldest timestamps properly update as we reboot and
2192 // rotate.
2193 for (const std::string &file : pi1_reboot_logfiles_) {
2194 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2195 ReadHeader(file);
2196 CHECK(log_header);
2197 if (log_header->message().has_configuration()) {
2198 continue;
2199 }
2200
2201 const monotonic_clock::time_point monotonic_start_time =
2202 monotonic_clock::time_point(
2203 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2204 const UUID source_node_boot_uuid = UUID::FromString(
2205 log_header->message().source_node_boot_uuid()->string_view());
2206
2207 if (log_header->message().node()->name()->string_view() != "pi1") {
2208 // The remote message channel should rotate later and have more parts.
2209 // This only is true on the log files with shared remote messages.
2210 //
2211 // TODO(austin): I'm not the most thrilled with this test pattern... It
2212 // feels brittle in a different way.
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002213 if (file.find("timestamps/remote_pi2") == std::string::npos) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002214 switch (log_header->message().parts_index()) {
2215 case 0:
2216 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2217 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2218 break;
2219 case 1:
2220 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2221 ASSERT_EQ(monotonic_start_time,
2222 monotonic_clock::epoch() + chrono::seconds(1));
2223 break;
2224 case 2:
2225 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2226 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2227 break;
2228 case 3:
2229 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2230 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
2231 chrono::nanoseconds(2322999462))
2232 << " on " << file;
2233 break;
2234 default:
2235 FAIL();
2236 break;
2237 }
2238 } else {
2239 switch (log_header->message().parts_index()) {
2240 case 0:
2241 case 1:
2242 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2243 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2244 break;
2245 case 2:
2246 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2247 ASSERT_EQ(monotonic_start_time,
2248 monotonic_clock::epoch() + chrono::seconds(1));
2249 break;
2250 case 3:
2251 case 4:
2252 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2253 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2254 break;
2255 case 5:
2256 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2257 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
2258 chrono::nanoseconds(2322999462))
2259 << " on " << file;
2260 break;
2261 default:
2262 FAIL();
2263 break;
2264 }
2265 }
2266 continue;
2267 }
2268 SCOPED_TRACE(file);
2269 SCOPED_TRACE(aos::FlatbufferToJson(
2270 *log_header, {.multi_line = true, .max_vector_size = 100}));
2271 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2272 ASSERT_EQ(
2273 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2274 EXPECT_EQ(
2275 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2276 monotonic_clock::max_time.time_since_epoch().count());
2277 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2278 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2279 2u);
2280 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2281 monotonic_clock::max_time.time_since_epoch().count());
2282 ASSERT_TRUE(log_header->message()
2283 .has_oldest_remote_unreliable_monotonic_timestamps());
2284 ASSERT_EQ(log_header->message()
2285 .oldest_remote_unreliable_monotonic_timestamps()
2286 ->size(),
2287 2u);
2288 EXPECT_EQ(log_header->message()
2289 .oldest_remote_unreliable_monotonic_timestamps()
2290 ->Get(0),
2291 monotonic_clock::max_time.time_since_epoch().count());
2292 ASSERT_TRUE(log_header->message()
2293 .has_oldest_local_unreliable_monotonic_timestamps());
2294 ASSERT_EQ(log_header->message()
2295 .oldest_local_unreliable_monotonic_timestamps()
2296 ->size(),
2297 2u);
2298 EXPECT_EQ(log_header->message()
2299 .oldest_local_unreliable_monotonic_timestamps()
2300 ->Get(0),
2301 monotonic_clock::max_time.time_since_epoch().count());
2302
2303 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2304 monotonic_clock::time_point(chrono::nanoseconds(
2305 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2306 1)));
2307 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2308 monotonic_clock::time_point(chrono::nanoseconds(
2309 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2310 const monotonic_clock::time_point
2311 oldest_remote_unreliable_monotonic_timestamps =
2312 monotonic_clock::time_point(chrono::nanoseconds(
2313 log_header->message()
2314 .oldest_remote_unreliable_monotonic_timestamps()
2315 ->Get(1)));
2316 const monotonic_clock::time_point
2317 oldest_local_unreliable_monotonic_timestamps =
2318 monotonic_clock::time_point(chrono::nanoseconds(
2319 log_header->message()
2320 .oldest_local_unreliable_monotonic_timestamps()
2321 ->Get(1)));
2322 const monotonic_clock::time_point
2323 oldest_remote_reliable_monotonic_timestamps =
2324 monotonic_clock::time_point(chrono::nanoseconds(
2325 log_header->message()
2326 .oldest_remote_reliable_monotonic_timestamps()
2327 ->Get(1)));
2328 const monotonic_clock::time_point
2329 oldest_local_reliable_monotonic_timestamps =
2330 monotonic_clock::time_point(chrono::nanoseconds(
2331 log_header->message()
2332 .oldest_local_reliable_monotonic_timestamps()
2333 ->Get(1)));
2334 const monotonic_clock::time_point
2335 oldest_logger_remote_unreliable_monotonic_timestamps =
2336 monotonic_clock::time_point(chrono::nanoseconds(
2337 log_header->message()
2338 .oldest_logger_remote_unreliable_monotonic_timestamps()
2339 ->Get(0)));
2340 const monotonic_clock::time_point
2341 oldest_logger_local_unreliable_monotonic_timestamps =
2342 monotonic_clock::time_point(chrono::nanoseconds(
2343 log_header->message()
2344 .oldest_logger_local_unreliable_monotonic_timestamps()
2345 ->Get(0)));
2346 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2347 monotonic_clock::max_time);
2348 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2349 monotonic_clock::max_time);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002350 if (log_header->message().data_stored()->Get(0) == StoredDataType::DATA) {
2351 switch (log_header->message().parts_index()) {
2352 case 0:
2353 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2354 monotonic_clock::max_time);
2355 EXPECT_EQ(oldest_local_monotonic_timestamps,
2356 monotonic_clock::max_time);
2357 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2358 monotonic_clock::max_time);
2359 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2360 monotonic_clock::max_time);
2361 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2362 monotonic_clock::max_time);
2363 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2364 monotonic_clock::max_time);
2365 break;
2366 default:
2367 FAIL();
2368 break;
2369 }
2370 } else if (log_header->message().data_stored()->Get(0) ==
2371 StoredDataType::TIMESTAMPS) {
2372 switch (log_header->message().parts_index()) {
2373 case 0:
2374 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2375 monotonic_clock::time_point(chrono::microseconds(90200)));
2376 EXPECT_EQ(oldest_local_monotonic_timestamps,
2377 monotonic_clock::time_point(chrono::microseconds(90350)));
2378 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2379 monotonic_clock::time_point(chrono::microseconds(90200)));
2380 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2381 monotonic_clock::time_point(chrono::microseconds(90350)));
2382 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2383 monotonic_clock::max_time);
2384 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2385 monotonic_clock::max_time);
2386 break;
2387 case 1:
2388 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2389 monotonic_clock::time_point(chrono::microseconds(90200)))
2390 << file;
2391 EXPECT_EQ(oldest_local_monotonic_timestamps,
2392 monotonic_clock::time_point(chrono::microseconds(90350)))
2393 << file;
2394 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2395 monotonic_clock::time_point(chrono::microseconds(90200)))
2396 << file;
2397 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2398 monotonic_clock::time_point(chrono::microseconds(90350)))
2399 << file;
2400 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2401 monotonic_clock::time_point(chrono::microseconds(100000)))
2402 << file;
2403 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2404 monotonic_clock::time_point(chrono::microseconds(100150)))
2405 << file;
2406 break;
2407 case 2:
2408 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2409 monotonic_clock::time_point(chrono::milliseconds(1323) +
2410 chrono::microseconds(200)));
2411 EXPECT_EQ(
2412 oldest_local_monotonic_timestamps,
2413 monotonic_clock::time_point(chrono::microseconds(10100350)));
2414 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2415 monotonic_clock::time_point(chrono::milliseconds(1323) +
2416 chrono::microseconds(200)));
2417 EXPECT_EQ(
2418 oldest_local_unreliable_monotonic_timestamps,
2419 monotonic_clock::time_point(chrono::microseconds(10100350)));
2420 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2421 monotonic_clock::max_time)
2422 << file;
2423 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2424 monotonic_clock::max_time)
2425 << file;
2426 break;
2427 case 3:
2428 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2429 monotonic_clock::time_point(chrono::milliseconds(1323) +
2430 chrono::microseconds(200)));
2431 EXPECT_EQ(
2432 oldest_local_monotonic_timestamps,
2433 monotonic_clock::time_point(chrono::microseconds(10100350)));
2434 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2435 monotonic_clock::time_point(chrono::milliseconds(1323) +
2436 chrono::microseconds(200)));
2437 EXPECT_EQ(
2438 oldest_local_unreliable_monotonic_timestamps,
2439 monotonic_clock::time_point(chrono::microseconds(10100350)));
2440 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2441 monotonic_clock::time_point(chrono::microseconds(1423000)))
2442 << file;
2443 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2444 monotonic_clock::time_point(chrono::microseconds(10200150)))
2445 << file;
2446 break;
2447 default:
2448 FAIL();
2449 break;
2450 }
Naman Guptaa63aa132023-03-22 20:06:34 -07002451 }
2452 }
2453
2454 // Confirm that we refuse to replay logs with missing boot uuids.
2455 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07002456 auto sorted_parts = SortParts(pi1_reboot_logfiles_);
2457 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
2458 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07002459
2460 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2461 log_reader_factory.set_send_delay(chrono::microseconds(0));
2462
2463 // This sends out the fetched messages and advances time to the start of
2464 // the log file.
2465 reader.Register(&log_reader_factory);
2466
2467 log_reader_factory.Run();
2468
2469 reader.Deregister();
2470 }
2471}
2472
2473// Tests that we can sort a log which only has timestamps from the remote
2474// because the local message_bridge_client failed to connect.
2475TEST_P(MultinodeLoggerTest, RemoteRebootOnlyTimestamps) {
Austin Schuh6ecfe902023-08-04 22:44:37 -07002476 if (file_strategy() == FileStrategy::kCombine) {
2477 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
2478 }
2479
Naman Guptaa63aa132023-03-22 20:06:34 -07002480 const UUID pi1_boot0 = UUID::Random();
2481 const UUID pi2_boot0 = UUID::Random();
2482 const UUID pi2_boot1 = UUID::Random();
2483 {
2484 CHECK_EQ(pi1_index_, 0u);
2485 CHECK_EQ(pi2_index_, 1u);
2486
2487 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2488 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2489 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2490
2491 time_converter_.AddNextTimestamp(
2492 distributed_clock::epoch(),
2493 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2494 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2495 time_converter_.AddNextTimestamp(
2496 distributed_clock::epoch() + reboot_time,
2497 {BootTimestamp::epoch() + reboot_time,
2498 BootTimestamp{
2499 .boot = 1,
2500 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2501 }
2502 pi2_->Disconnect(pi1_->node());
2503
2504 std::vector<std::string> filenames;
2505 {
2506 LoggerState pi1_logger = MakeLogger(pi1_);
2507
2508 event_loop_factory_.RunFor(chrono::milliseconds(95));
2509 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2510 pi1_boot0);
2511 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2512 pi2_boot0);
2513
2514 StartLogger(&pi1_logger);
2515
2516 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2517
2518 VLOG(1) << "Reboot now!";
2519
2520 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2521 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2522 pi1_boot0);
2523 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2524 pi2_boot1);
2525 pi1_logger.AppendAllFilenames(&filenames);
2526 }
2527
2528 std::sort(filenames.begin(), filenames.end());
2529
2530 // Confirm that our new oldest timestamps properly update as we reboot and
2531 // rotate.
2532 size_t timestamp_file_count = 0;
2533 for (const std::string &file : filenames) {
2534 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2535 ReadHeader(file);
2536 CHECK(log_header);
2537
2538 if (log_header->message().has_configuration()) {
2539 continue;
2540 }
2541
2542 const monotonic_clock::time_point monotonic_start_time =
2543 monotonic_clock::time_point(
2544 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2545 const UUID source_node_boot_uuid = UUID::FromString(
2546 log_header->message().source_node_boot_uuid()->string_view());
2547
2548 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2549 ASSERT_EQ(
2550 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2551 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2552 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2553 2u);
2554 ASSERT_TRUE(log_header->message()
2555 .has_oldest_remote_unreliable_monotonic_timestamps());
2556 ASSERT_EQ(log_header->message()
2557 .oldest_remote_unreliable_monotonic_timestamps()
2558 ->size(),
2559 2u);
2560 ASSERT_TRUE(log_header->message()
2561 .has_oldest_local_unreliable_monotonic_timestamps());
2562 ASSERT_EQ(log_header->message()
2563 .oldest_local_unreliable_monotonic_timestamps()
2564 ->size(),
2565 2u);
2566 ASSERT_TRUE(log_header->message()
2567 .has_oldest_remote_reliable_monotonic_timestamps());
2568 ASSERT_EQ(log_header->message()
2569 .oldest_remote_reliable_monotonic_timestamps()
2570 ->size(),
2571 2u);
2572 ASSERT_TRUE(
2573 log_header->message().has_oldest_local_reliable_monotonic_timestamps());
2574 ASSERT_EQ(log_header->message()
2575 .oldest_local_reliable_monotonic_timestamps()
2576 ->size(),
2577 2u);
2578
2579 ASSERT_TRUE(
2580 log_header->message()
2581 .has_oldest_logger_remote_unreliable_monotonic_timestamps());
2582 ASSERT_EQ(log_header->message()
2583 .oldest_logger_remote_unreliable_monotonic_timestamps()
2584 ->size(),
2585 2u);
2586 ASSERT_TRUE(log_header->message()
2587 .has_oldest_logger_local_unreliable_monotonic_timestamps());
2588 ASSERT_EQ(log_header->message()
2589 .oldest_logger_local_unreliable_monotonic_timestamps()
2590 ->size(),
2591 2u);
2592
2593 if (log_header->message().node()->name()->string_view() != "pi1") {
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002594 ASSERT_TRUE(file.find("timestamps/remote_pi2") != std::string::npos);
Naman Guptaa63aa132023-03-22 20:06:34 -07002595
2596 const std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
2597 ReadNthMessage(file, 0);
2598 CHECK(msg);
2599
2600 EXPECT_TRUE(msg->message().has_monotonic_sent_time());
2601 EXPECT_TRUE(msg->message().has_monotonic_remote_time());
2602
2603 const monotonic_clock::time_point
2604 expected_oldest_local_monotonic_timestamps(
2605 chrono::nanoseconds(msg->message().monotonic_sent_time()));
2606 const monotonic_clock::time_point
2607 expected_oldest_remote_monotonic_timestamps(
2608 chrono::nanoseconds(msg->message().monotonic_remote_time()));
2609 const monotonic_clock::time_point
2610 expected_oldest_timestamp_monotonic_timestamps(
2611 chrono::nanoseconds(msg->message().monotonic_timestamp_time()));
2612
2613 EXPECT_NE(expected_oldest_local_monotonic_timestamps,
2614 monotonic_clock::min_time);
2615 EXPECT_NE(expected_oldest_remote_monotonic_timestamps,
2616 monotonic_clock::min_time);
2617 EXPECT_NE(expected_oldest_timestamp_monotonic_timestamps,
2618 monotonic_clock::min_time);
2619
2620 ++timestamp_file_count;
2621 // Since the log file is from the perspective of the other node,
2622 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2623 monotonic_clock::time_point(chrono::nanoseconds(
2624 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2625 0)));
2626 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2627 monotonic_clock::time_point(chrono::nanoseconds(
2628 log_header->message().oldest_local_monotonic_timestamps()->Get(
2629 0)));
2630 const monotonic_clock::time_point
2631 oldest_remote_unreliable_monotonic_timestamps =
2632 monotonic_clock::time_point(chrono::nanoseconds(
2633 log_header->message()
2634 .oldest_remote_unreliable_monotonic_timestamps()
2635 ->Get(0)));
2636 const monotonic_clock::time_point
2637 oldest_local_unreliable_monotonic_timestamps =
2638 monotonic_clock::time_point(chrono::nanoseconds(
2639 log_header->message()
2640 .oldest_local_unreliable_monotonic_timestamps()
2641 ->Get(0)));
2642 const monotonic_clock::time_point
2643 oldest_remote_reliable_monotonic_timestamps =
2644 monotonic_clock::time_point(chrono::nanoseconds(
2645 log_header->message()
2646 .oldest_remote_reliable_monotonic_timestamps()
2647 ->Get(0)));
2648 const monotonic_clock::time_point
2649 oldest_local_reliable_monotonic_timestamps =
2650 monotonic_clock::time_point(chrono::nanoseconds(
2651 log_header->message()
2652 .oldest_local_reliable_monotonic_timestamps()
2653 ->Get(0)));
2654 const monotonic_clock::time_point
2655 oldest_logger_remote_unreliable_monotonic_timestamps =
2656 monotonic_clock::time_point(chrono::nanoseconds(
2657 log_header->message()
2658 .oldest_logger_remote_unreliable_monotonic_timestamps()
2659 ->Get(1)));
2660 const monotonic_clock::time_point
2661 oldest_logger_local_unreliable_monotonic_timestamps =
2662 monotonic_clock::time_point(chrono::nanoseconds(
2663 log_header->message()
2664 .oldest_logger_local_unreliable_monotonic_timestamps()
2665 ->Get(1)));
2666
2667 const Channel *channel =
2668 event_loop_factory_.configuration()->channels()->Get(
2669 msg->message().channel_index());
2670 const Connection *connection = configuration::ConnectionToNode(
2671 channel, configuration::GetNode(
2672 event_loop_factory_.configuration(),
2673 log_header->message().node()->name()->string_view()));
2674
2675 const bool reliable = connection->time_to_live() == 0;
2676
2677 SCOPED_TRACE(file);
2678 SCOPED_TRACE(aos::FlatbufferToJson(
2679 *log_header, {.multi_line = true, .max_vector_size = 100}));
2680
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002681 // Confirm that the oldest timestamps match what we expect. Based on
2682 // what we are doing, we know that the oldest time is the first
2683 // message's time.
2684 //
2685 // This makes the test robust to both the split and combined config
2686 // tests.
2687 switch (log_header->message().parts_index()) {
2688 case 0:
2689 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2690 expected_oldest_remote_monotonic_timestamps);
2691 EXPECT_EQ(oldest_local_monotonic_timestamps,
2692 expected_oldest_local_monotonic_timestamps);
2693 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2694 expected_oldest_local_monotonic_timestamps)
2695 << file;
2696 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2697 expected_oldest_timestamp_monotonic_timestamps)
2698 << file;
2699
2700 if (reliable) {
2701 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
Naman Guptaa63aa132023-03-22 20:06:34 -07002702 expected_oldest_remote_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002703 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
Naman Guptaa63aa132023-03-22 20:06:34 -07002704 expected_oldest_local_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002705 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2706 monotonic_clock::max_time);
2707 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2708 monotonic_clock::max_time);
2709 } else {
2710 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2711 monotonic_clock::max_time);
2712 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2713 monotonic_clock::max_time);
Naman Guptaa63aa132023-03-22 20:06:34 -07002714 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2715 expected_oldest_remote_monotonic_timestamps);
2716 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2717 expected_oldest_local_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002718 }
2719 break;
2720 case 1:
2721 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2722 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2723 EXPECT_EQ(oldest_local_monotonic_timestamps,
2724 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2725 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2726 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2727 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2728 monotonic_clock::epoch() + chrono::nanoseconds(90250000));
2729 if (reliable) {
2730 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2731 expected_oldest_remote_monotonic_timestamps);
2732 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2733 expected_oldest_local_monotonic_timestamps);
2734 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2735 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2736 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2737 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2738 } else {
2739 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2740 monotonic_clock::max_time);
2741 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2742 monotonic_clock::max_time);
2743 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2744 expected_oldest_remote_monotonic_timestamps);
2745 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2746 expected_oldest_local_monotonic_timestamps);
2747 }
2748 break;
2749 case 2:
2750 EXPECT_EQ(
2751 oldest_remote_monotonic_timestamps,
2752 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2753 EXPECT_EQ(oldest_local_monotonic_timestamps,
2754 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2755 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2756 expected_oldest_local_monotonic_timestamps)
2757 << file;
2758 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2759 expected_oldest_timestamp_monotonic_timestamps)
2760 << file;
2761 if (reliable) {
2762 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2763 expected_oldest_remote_monotonic_timestamps);
2764 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2765 expected_oldest_local_monotonic_timestamps);
2766 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2767 monotonic_clock::max_time);
2768 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2769 monotonic_clock::max_time);
2770 } else {
2771 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2772 monotonic_clock::max_time);
2773 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2774 monotonic_clock::max_time);
2775 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2776 expected_oldest_remote_monotonic_timestamps);
2777 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2778 expected_oldest_local_monotonic_timestamps);
2779 }
2780 break;
Naman Guptaa63aa132023-03-22 20:06:34 -07002781
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002782 case 3:
2783 EXPECT_EQ(
2784 oldest_remote_monotonic_timestamps,
2785 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2786 EXPECT_EQ(oldest_local_monotonic_timestamps,
2787 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2788 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2789 expected_oldest_remote_monotonic_timestamps);
2790 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2791 expected_oldest_local_monotonic_timestamps);
2792 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2793 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2794 EXPECT_EQ(
2795 oldest_logger_local_unreliable_monotonic_timestamps,
2796 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2797 break;
2798 default:
2799 FAIL();
2800 break;
Naman Guptaa63aa132023-03-22 20:06:34 -07002801 }
2802
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002803 switch (log_header->message().parts_index()) {
2804 case 0:
2805 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2806 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2807 break;
2808 case 1:
2809 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2810 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2811 break;
2812 case 2:
2813 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2814 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2815 break;
2816 case 3:
2817 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2818 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2819 break;
2820 [[fallthrough]];
2821 default:
2822 FAIL();
2823 break;
2824 }
Naman Guptaa63aa132023-03-22 20:06:34 -07002825 continue;
2826 }
2827 EXPECT_EQ(
2828 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2829 monotonic_clock::max_time.time_since_epoch().count());
2830 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2831 monotonic_clock::max_time.time_since_epoch().count());
2832 EXPECT_EQ(log_header->message()
2833 .oldest_remote_unreliable_monotonic_timestamps()
2834 ->Get(0),
2835 monotonic_clock::max_time.time_since_epoch().count());
2836 EXPECT_EQ(log_header->message()
2837 .oldest_local_unreliable_monotonic_timestamps()
2838 ->Get(0),
2839 monotonic_clock::max_time.time_since_epoch().count());
2840
2841 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2842 monotonic_clock::time_point(chrono::nanoseconds(
2843 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2844 1)));
2845 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2846 monotonic_clock::time_point(chrono::nanoseconds(
2847 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2848 const monotonic_clock::time_point
2849 oldest_remote_unreliable_monotonic_timestamps =
2850 monotonic_clock::time_point(chrono::nanoseconds(
2851 log_header->message()
2852 .oldest_remote_unreliable_monotonic_timestamps()
2853 ->Get(1)));
2854 const monotonic_clock::time_point
2855 oldest_local_unreliable_monotonic_timestamps =
2856 monotonic_clock::time_point(chrono::nanoseconds(
2857 log_header->message()
2858 .oldest_local_unreliable_monotonic_timestamps()
2859 ->Get(1)));
2860 switch (log_header->message().parts_index()) {
2861 case 0:
2862 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2863 monotonic_clock::max_time);
2864 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2865 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2866 monotonic_clock::max_time);
2867 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2868 monotonic_clock::max_time);
2869 break;
2870 default:
2871 FAIL();
2872 break;
2873 }
2874 }
2875
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002876 EXPECT_EQ(timestamp_file_count, 4u);
Naman Guptaa63aa132023-03-22 20:06:34 -07002877
2878 // Confirm that we can actually sort the resulting log and read it.
2879 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07002880 auto sorted_parts = SortParts(filenames);
2881 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
2882 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07002883
2884 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2885 log_reader_factory.set_send_delay(chrono::microseconds(0));
2886
2887 // This sends out the fetched messages and advances time to the start of
2888 // the log file.
2889 reader.Register(&log_reader_factory);
2890
2891 log_reader_factory.Run();
2892
2893 reader.Deregister();
2894 }
2895}
2896
2897// Tests that we properly handle one direction of message_bridge being
2898// unavailable.
2899TEST_P(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07002900 std::vector<std::string> actual_filenames;
2901
Naman Guptaa63aa132023-03-22 20:06:34 -07002902 pi1_->Disconnect(pi2_->node());
2903 time_converter_.AddMonotonic(
2904 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2905
2906 time_converter_.AddMonotonic(
2907 {chrono::milliseconds(10000),
2908 chrono::milliseconds(10000) - chrono::milliseconds(1)});
2909 {
2910 LoggerState pi1_logger = MakeLogger(pi1_);
2911
2912 event_loop_factory_.RunFor(chrono::milliseconds(95));
2913
2914 StartLogger(&pi1_logger);
2915
2916 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002917 pi1_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07002918 }
2919
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002920 // Confirm that we can parse the result. LogReader has enough internal
2921 // CHECKs to confirm the right thing happened.
Austin Schuh8fb4b452023-08-04 17:02:27 -07002922 ConfirmReadable(actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07002923}
2924
2925// Tests that we properly handle one direction of message_bridge being
2926// unavailable.
2927TEST_P(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
2928 pi1_->Disconnect(pi2_->node());
2929 time_converter_.AddMonotonic(
2930 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(500)});
2931
2932 time_converter_.AddMonotonic(
2933 {chrono::milliseconds(10000),
2934 chrono::milliseconds(10000) + chrono::milliseconds(1)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07002935
2936 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -07002937 {
2938 LoggerState pi1_logger = MakeLogger(pi1_);
2939
2940 event_loop_factory_.RunFor(chrono::milliseconds(95));
2941
2942 StartLogger(&pi1_logger);
2943
2944 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002945 pi1_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07002946 }
2947
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002948 // Confirm that we can parse the result. LogReader has enough internal
2949 // CHECKs to confirm the right thing happened.
Austin Schuh8fb4b452023-08-04 17:02:27 -07002950 ConfirmReadable(filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07002951}
2952
2953// Tests that we explode if someone passes in a part file twice with a better
2954// error than an out of order error.
2955TEST_P(MultinodeLoggerTest, DuplicateLogFiles) {
2956 time_converter_.AddMonotonic(
2957 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07002958
2959 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -07002960 {
2961 LoggerState pi1_logger = MakeLogger(pi1_);
2962
2963 event_loop_factory_.RunFor(chrono::milliseconds(95));
2964
2965 StartLogger(&pi1_logger);
2966
2967 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002968
2969 pi1_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07002970 }
2971
2972 std::vector<std::string> duplicates;
Austin Schuh8fb4b452023-08-04 17:02:27 -07002973 for (const std::string &f : filenames) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002974 duplicates.emplace_back(f);
2975 duplicates.emplace_back(f);
2976 }
2977 EXPECT_DEATH({ SortParts(duplicates); }, "Found duplicate parts in");
2978}
2979
2980// Tests that we explode if someone loses a part out of the middle of a log.
2981TEST_P(MultinodeLoggerTest, MissingPartsFromMiddle) {
Austin Schuh6ecfe902023-08-04 22:44:37 -07002982 if (file_strategy() == FileStrategy::kCombine) {
2983 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
2984 }
Naman Guptaa63aa132023-03-22 20:06:34 -07002985 time_converter_.AddMonotonic(
2986 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2987 {
2988 LoggerState pi1_logger = MakeLogger(pi1_);
2989
2990 event_loop_factory_.RunFor(chrono::milliseconds(95));
2991
2992 StartLogger(&pi1_logger);
2993 aos::monotonic_clock::time_point last_rotation_time =
2994 pi1_logger.event_loop->monotonic_now();
Austin Schuh2f864452023-07-17 14:53:08 -07002995 pi1_logger.logger->set_on_logged_period(
2996 [&](aos::monotonic_clock::time_point) {
2997 const auto now = pi1_logger.event_loop->monotonic_now();
2998 if (now > last_rotation_time + std::chrono::seconds(5)) {
2999 pi1_logger.logger->Rotate();
3000 last_rotation_time = now;
3001 }
3002 });
Naman Guptaa63aa132023-03-22 20:06:34 -07003003
3004 event_loop_factory_.RunFor(chrono::milliseconds(10000));
3005 }
3006
3007 std::vector<std::string> missing_parts;
3008
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07003009 missing_parts.emplace_back(logfile_base1_ + "_pi1_timestamps.part0" +
3010 Extension());
3011 missing_parts.emplace_back(logfile_base1_ + "_pi1_timestamps.part2" +
3012 Extension());
Naman Guptaa63aa132023-03-22 20:06:34 -07003013 missing_parts.emplace_back(absl::StrCat(
3014 logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
3015
3016 EXPECT_DEATH({ SortParts(missing_parts); },
3017 "Broken log, missing part files between");
3018}
3019
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003020// Tests that we properly handle a dead node. Do this by just disconnecting
3021// it and only using one nodes of logs.
Naman Guptaa63aa132023-03-22 20:06:34 -07003022TEST_P(MultinodeLoggerTest, DeadNode) {
3023 pi1_->Disconnect(pi2_->node());
3024 pi2_->Disconnect(pi1_->node());
3025 time_converter_.AddMonotonic(
3026 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
3027 {
3028 LoggerState pi1_logger = MakeLogger(pi1_);
3029
3030 event_loop_factory_.RunFor(chrono::milliseconds(95));
3031
3032 StartLogger(&pi1_logger);
3033
3034 event_loop_factory_.RunFor(chrono::milliseconds(10000));
3035 }
3036
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003037 // Confirm that we can parse the result. LogReader has enough internal
3038 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07003039 ConfirmReadable(MakePi1DeadNodeLogfiles());
3040}
3041
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003042// Tests that we can relog with a different config. This makes most sense
3043// when you are trying to edit a log and want to use channel renaming + the
3044// original config in the new log.
Naman Guptaa63aa132023-03-22 20:06:34 -07003045TEST_P(MultinodeLoggerTest, LogDifferentConfig) {
3046 time_converter_.StartEqual();
3047 {
3048 LoggerState pi1_logger = MakeLogger(pi1_);
3049 LoggerState pi2_logger = MakeLogger(pi2_);
3050
3051 event_loop_factory_.RunFor(chrono::milliseconds(95));
3052
3053 StartLogger(&pi1_logger);
3054 StartLogger(&pi2_logger);
3055
3056 event_loop_factory_.RunFor(chrono::milliseconds(20000));
3057 }
3058
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003059 auto sorted_parts = SortParts(logfiles_);
3060 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3061 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07003062 reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
3063
3064 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
3065 log_reader_factory.set_send_delay(chrono::microseconds(0));
3066
3067 // This sends out the fetched messages and advances time to the start of the
3068 // log file.
3069 reader.Register(&log_reader_factory);
3070
3071 const Node *pi1 =
3072 configuration::GetNode(log_reader_factory.configuration(), "pi1");
3073 const Node *pi2 =
3074 configuration::GetNode(log_reader_factory.configuration(), "pi2");
3075
3076 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
3077 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
3078 LOG(INFO) << "now pi1 "
3079 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
3080 LOG(INFO) << "now pi2 "
3081 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
3082
3083 EXPECT_THAT(reader.LoggedNodes(),
3084 ::testing::ElementsAre(
3085 configuration::GetNode(reader.logged_configuration(), pi1),
3086 configuration::GetNode(reader.logged_configuration(), pi2)));
3087
3088 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
3089
3090 // And confirm we can re-create a log again, while checking the contents.
3091 std::vector<std::string> log_files;
3092 {
3093 LoggerState pi1_logger =
3094 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
3095 &log_reader_factory, reader.logged_configuration());
3096 LoggerState pi2_logger =
3097 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
3098 &log_reader_factory, reader.logged_configuration());
3099
Austin Schuh7e417682023-08-11 17:05:30 -07003100 pi1_logger.StartLogger(tmp_dir_ + "/logs/relogged1");
3101 pi2_logger.StartLogger(tmp_dir_ + "/logs/relogged2");
Naman Guptaa63aa132023-03-22 20:06:34 -07003102
3103 log_reader_factory.Run();
3104
3105 for (auto &x : pi1_logger.log_namer->all_filenames()) {
Austin Schuh7e417682023-08-11 17:05:30 -07003106 log_files.emplace_back(absl::StrCat(tmp_dir_, "/logs/relogged1_", x));
Naman Guptaa63aa132023-03-22 20:06:34 -07003107 }
3108 for (auto &x : pi2_logger.log_namer->all_filenames()) {
Austin Schuh7e417682023-08-11 17:05:30 -07003109 log_files.emplace_back(absl::StrCat(tmp_dir_, "/logs/relogged2_", x));
Naman Guptaa63aa132023-03-22 20:06:34 -07003110 }
3111 }
3112
3113 reader.Deregister();
3114
3115 // And verify that we can run the LogReader over the relogged files without
3116 // hitting any fatal errors.
3117 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003118 auto sorted_parts = SortParts(log_files);
3119 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3120 LogReader relogged_reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07003121 relogged_reader.Register();
3122
3123 relogged_reader.event_loop_factory()->Run();
3124 }
3125}
3126
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003127// Tests that we properly replay a log where the start time for a node is
3128// before any data on the node. This can happen if the logger starts before
3129// data is published. While the scenario below is a bit convoluted, we have
3130// seen logs like this generated out in the wild.
Naman Guptaa63aa132023-03-22 20:06:34 -07003131TEST(MultinodeRebootLoggerTest, StartTimeBeforeData) {
Austin Schuh7e417682023-08-11 17:05:30 -07003132 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3133 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3134
Naman Guptaa63aa132023-03-22 20:06:34 -07003135 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3136 aos::configuration::ReadConfig(ArtifactPath(
3137 "aos/events/logging/multinode_pingpong_split3_config.json"));
3138 message_bridge::TestingTimeConverter time_converter(
3139 configuration::NodesCount(&config.message()));
3140 SimulatedEventLoopFactory event_loop_factory(&config.message());
3141 event_loop_factory.SetTimeConverter(&time_converter);
3142 NodeEventLoopFactory *const pi1 =
3143 event_loop_factory.GetNodeEventLoopFactory("pi1");
3144 const size_t pi1_index = configuration::GetNodeIndex(
3145 event_loop_factory.configuration(), pi1->node());
3146 NodeEventLoopFactory *const pi2 =
3147 event_loop_factory.GetNodeEventLoopFactory("pi2");
3148 const size_t pi2_index = configuration::GetNodeIndex(
3149 event_loop_factory.configuration(), pi2->node());
3150 NodeEventLoopFactory *const pi3 =
3151 event_loop_factory.GetNodeEventLoopFactory("pi3");
3152 const size_t pi3_index = configuration::GetNodeIndex(
3153 event_loop_factory.configuration(), pi3->node());
3154
3155 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003156 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003157 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003158 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003159 const std::string kLogfile2_2 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003160 aos::testing::TestTmpDir() + "/logs/multi_logfile2.2/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003161 const std::string kLogfile3_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003162 aos::testing::TestTmpDir() + "/logs/multi_logfile3/";
3163
Naman Guptaa63aa132023-03-22 20:06:34 -07003164 const UUID pi1_boot0 = UUID::Random();
3165 const UUID pi2_boot0 = UUID::Random();
3166 const UUID pi2_boot1 = UUID::Random();
3167 const UUID pi3_boot0 = UUID::Random();
3168 {
3169 CHECK_EQ(pi1_index, 0u);
3170 CHECK_EQ(pi2_index, 1u);
3171 CHECK_EQ(pi3_index, 2u);
3172
3173 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3174 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3175 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3176 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3177
3178 time_converter.AddNextTimestamp(
3179 distributed_clock::epoch(),
3180 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3181 BootTimestamp::epoch()});
3182 const chrono::nanoseconds reboot_time = chrono::milliseconds(20000);
3183 time_converter.AddNextTimestamp(
3184 distributed_clock::epoch() + reboot_time,
3185 {BootTimestamp::epoch() + reboot_time,
3186 BootTimestamp{
3187 .boot = 1,
3188 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)},
3189 BootTimestamp::epoch() + reboot_time});
3190 }
3191
3192 // Make everything perfectly quiet.
3193 event_loop_factory.SkipTimingReport();
3194 event_loop_factory.DisableStatistics();
3195
3196 std::vector<std::string> filenames;
3197 {
3198 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003199 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3200 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003201 LoggerState pi3_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003202 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3203 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003204 {
3205 // And now start the logger.
3206 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003207 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3208 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003209
3210 event_loop_factory.RunFor(chrono::milliseconds(1000));
3211
3212 pi1_logger.StartLogger(kLogfile1_1);
3213 pi3_logger.StartLogger(kLogfile3_1);
3214 pi2_logger.StartLogger(kLogfile2_1);
3215
3216 event_loop_factory.RunFor(chrono::milliseconds(10000));
3217
3218 // Now that we've got a start time in the past, turn on data.
3219 event_loop_factory.EnableStatistics();
3220 std::unique_ptr<aos::EventLoop> ping_event_loop =
3221 pi1->MakeEventLoop("ping");
3222 Ping ping(ping_event_loop.get());
3223
3224 pi2->AlwaysStart<Pong>("pong");
3225
3226 event_loop_factory.RunFor(chrono::milliseconds(3000));
3227
3228 pi2_logger.AppendAllFilenames(&filenames);
3229
3230 // Stop logging on pi2 before rebooting and completely shut off all
3231 // messages on pi2.
3232 pi2->DisableStatistics();
3233 pi1->Disconnect(pi2->node());
3234 pi2->Disconnect(pi1->node());
3235 }
3236 event_loop_factory.RunFor(chrono::milliseconds(7000));
3237 // pi2 now reboots.
3238 {
3239 event_loop_factory.RunFor(chrono::milliseconds(1000));
3240
3241 // Start logging again on pi2 after it is up.
3242 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003243 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3244 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003245 pi2_logger.StartLogger(kLogfile2_2);
3246
3247 event_loop_factory.RunFor(chrono::milliseconds(10000));
3248 // And, now that we have a start time in the log, turn data back on.
3249 pi2->EnableStatistics();
3250 pi1->Connect(pi2->node());
3251 pi2->Connect(pi1->node());
3252
3253 pi2->AlwaysStart<Pong>("pong");
3254 std::unique_ptr<aos::EventLoop> ping_event_loop =
3255 pi1->MakeEventLoop("ping");
3256 Ping ping(ping_event_loop.get());
3257
3258 event_loop_factory.RunFor(chrono::milliseconds(3000));
3259
3260 pi2_logger.AppendAllFilenames(&filenames);
3261 }
3262
3263 pi1_logger.AppendAllFilenames(&filenames);
3264 pi3_logger.AppendAllFilenames(&filenames);
3265 }
3266
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003267 // Confirm that we can parse the result. LogReader has enough internal
3268 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07003269 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003270 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003271 auto result = ConfirmReadable(filenames);
3272 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
3273 chrono::seconds(1)));
3274 EXPECT_THAT(result[0].second,
3275 ::testing::ElementsAre(realtime_clock::epoch() +
3276 chrono::microseconds(34990350)));
3277
3278 EXPECT_THAT(result[1].first,
3279 ::testing::ElementsAre(
3280 realtime_clock::epoch() + chrono::seconds(1),
3281 realtime_clock::epoch() + chrono::microseconds(3323000)));
3282 EXPECT_THAT(result[1].second,
3283 ::testing::ElementsAre(
3284 realtime_clock::epoch() + chrono::microseconds(13990200),
3285 realtime_clock::epoch() + chrono::microseconds(16313200)));
3286
3287 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch() +
3288 chrono::seconds(1)));
3289 EXPECT_THAT(result[2].second,
3290 ::testing::ElementsAre(realtime_clock::epoch() +
3291 chrono::microseconds(34900150)));
3292}
3293
3294// Tests that local data before remote data after reboot is properly replayed.
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003295// We only trigger a reboot in the timestamp interpolation function when
3296// solving the timestamp problem when we actually have a point in the
3297// function. This originally only happened when a point passes the noncausal
3298// filter. At the start of time for the second boot, if we aren't careful, we
3299// will have messages which need to be published at times before the boot.
3300// This happens when a local message is in the log before a forwarded message,
3301// so there is no point in the interpolation function. This delays the
3302// reboot. So, we need to recreate that situation and make sure it doesn't
3303// come back.
Naman Guptaa63aa132023-03-22 20:06:34 -07003304TEST(MultinodeRebootLoggerTest,
3305 LocalMessageBeforeRemoteBeforeStartAfterReboot) {
Austin Schuh7e417682023-08-11 17:05:30 -07003306 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3307 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3308
Naman Guptaa63aa132023-03-22 20:06:34 -07003309 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3310 aos::configuration::ReadConfig(ArtifactPath(
3311 "aos/events/logging/multinode_pingpong_split3_config.json"));
3312 message_bridge::TestingTimeConverter time_converter(
3313 configuration::NodesCount(&config.message()));
3314 SimulatedEventLoopFactory event_loop_factory(&config.message());
3315 event_loop_factory.SetTimeConverter(&time_converter);
3316 NodeEventLoopFactory *const pi1 =
3317 event_loop_factory.GetNodeEventLoopFactory("pi1");
3318 const size_t pi1_index = configuration::GetNodeIndex(
3319 event_loop_factory.configuration(), pi1->node());
3320 NodeEventLoopFactory *const pi2 =
3321 event_loop_factory.GetNodeEventLoopFactory("pi2");
3322 const size_t pi2_index = configuration::GetNodeIndex(
3323 event_loop_factory.configuration(), pi2->node());
3324 NodeEventLoopFactory *const pi3 =
3325 event_loop_factory.GetNodeEventLoopFactory("pi3");
3326 const size_t pi3_index = configuration::GetNodeIndex(
3327 event_loop_factory.configuration(), pi3->node());
3328
3329 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003330 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003331 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003332 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003333 const std::string kLogfile2_2 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003334 aos::testing::TestTmpDir() + "/logs/multi_logfile2.2/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003335 const std::string kLogfile3_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003336 aos::testing::TestTmpDir() + "/logs/multi_logfile3/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003337 const UUID pi1_boot0 = UUID::Random();
3338 const UUID pi2_boot0 = UUID::Random();
3339 const UUID pi2_boot1 = UUID::Random();
3340 const UUID pi3_boot0 = UUID::Random();
3341 {
3342 CHECK_EQ(pi1_index, 0u);
3343 CHECK_EQ(pi2_index, 1u);
3344 CHECK_EQ(pi3_index, 2u);
3345
3346 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3347 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3348 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3349 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3350
3351 time_converter.AddNextTimestamp(
3352 distributed_clock::epoch(),
3353 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3354 BootTimestamp::epoch()});
3355 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3356 time_converter.AddNextTimestamp(
3357 distributed_clock::epoch() + reboot_time,
3358 {BootTimestamp::epoch() + reboot_time,
3359 BootTimestamp{.boot = 1,
3360 .time = monotonic_clock::epoch() + reboot_time +
3361 chrono::seconds(100)},
3362 BootTimestamp::epoch() + reboot_time});
3363 }
3364
3365 std::vector<std::string> filenames;
3366 {
3367 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003368 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3369 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003370 LoggerState pi3_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003371 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3372 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003373 {
3374 // And now start the logger.
3375 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003376 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3377 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003378
3379 pi1_logger.StartLogger(kLogfile1_1);
3380 pi3_logger.StartLogger(kLogfile3_1);
3381 pi2_logger.StartLogger(kLogfile2_1);
3382
3383 event_loop_factory.RunFor(chrono::milliseconds(1005));
3384
3385 // Now that we've got a start time in the past, turn on data.
3386 std::unique_ptr<aos::EventLoop> ping_event_loop =
3387 pi1->MakeEventLoop("ping");
3388 Ping ping(ping_event_loop.get());
3389
3390 pi2->AlwaysStart<Pong>("pong");
3391
3392 event_loop_factory.RunFor(chrono::milliseconds(3000));
3393
3394 pi2_logger.AppendAllFilenames(&filenames);
3395
3396 // Disable any remote messages on pi2.
3397 pi1->Disconnect(pi2->node());
3398 pi2->Disconnect(pi1->node());
3399 }
3400 event_loop_factory.RunFor(chrono::milliseconds(995));
3401 // pi2 now reboots at 5 seconds.
3402 {
3403 event_loop_factory.RunFor(chrono::milliseconds(1000));
3404
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003405 // Make local stuff happen before we start logging and connect the
3406 // remote.
Naman Guptaa63aa132023-03-22 20:06:34 -07003407 pi2->AlwaysStart<Pong>("pong");
3408 std::unique_ptr<aos::EventLoop> ping_event_loop =
3409 pi1->MakeEventLoop("ping");
3410 Ping ping(ping_event_loop.get());
3411 event_loop_factory.RunFor(chrono::milliseconds(1005));
3412
3413 // Start logging again on pi2 after it is up.
3414 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003415 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3416 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003417 pi2_logger.StartLogger(kLogfile2_2);
3418
3419 // And allow remote messages now that we have some local ones.
3420 pi1->Connect(pi2->node());
3421 pi2->Connect(pi1->node());
3422
3423 event_loop_factory.RunFor(chrono::milliseconds(1000));
3424
3425 event_loop_factory.RunFor(chrono::milliseconds(3000));
3426
3427 pi2_logger.AppendAllFilenames(&filenames);
3428 }
3429
3430 pi1_logger.AppendAllFilenames(&filenames);
3431 pi3_logger.AppendAllFilenames(&filenames);
3432 }
3433
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003434 // Confirm that we can parse the result. LogReader has enough internal
3435 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07003436 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003437 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003438 auto result = ConfirmReadable(filenames);
3439
3440 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3441 EXPECT_THAT(result[0].second,
3442 ::testing::ElementsAre(realtime_clock::epoch() +
3443 chrono::microseconds(11000350)));
3444
3445 EXPECT_THAT(result[1].first,
3446 ::testing::ElementsAre(
3447 realtime_clock::epoch(),
3448 realtime_clock::epoch() + chrono::microseconds(107005000)));
3449 EXPECT_THAT(result[1].second,
3450 ::testing::ElementsAre(
3451 realtime_clock::epoch() + chrono::microseconds(4000150),
3452 realtime_clock::epoch() + chrono::microseconds(111000200)));
3453
3454 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3455 EXPECT_THAT(result[2].second,
3456 ::testing::ElementsAre(realtime_clock::epoch() +
3457 chrono::microseconds(11000150)));
3458
3459 auto start_stop_result = ConfirmReadable(
3460 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3461 realtime_clock::epoch() + chrono::milliseconds(3000));
3462
3463 EXPECT_THAT(
3464 start_stop_result[0].first,
3465 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3466 EXPECT_THAT(
3467 start_stop_result[0].second,
3468 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3469 EXPECT_THAT(
3470 start_stop_result[1].first,
3471 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3472 EXPECT_THAT(
3473 start_stop_result[1].second,
3474 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3475 EXPECT_THAT(
3476 start_stop_result[2].first,
3477 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3478 EXPECT_THAT(
3479 start_stop_result[2].second,
3480 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3481}
3482
3483// Tests that setting the start and stop flags across a reboot works as
3484// expected.
3485TEST(MultinodeRebootLoggerTest, RebootStartStopTimes) {
Austin Schuh7e417682023-08-11 17:05:30 -07003486 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3487 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3488
Naman Guptaa63aa132023-03-22 20:06:34 -07003489 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3490 aos::configuration::ReadConfig(ArtifactPath(
3491 "aos/events/logging/multinode_pingpong_split3_config.json"));
3492 message_bridge::TestingTimeConverter time_converter(
3493 configuration::NodesCount(&config.message()));
3494 SimulatedEventLoopFactory event_loop_factory(&config.message());
3495 event_loop_factory.SetTimeConverter(&time_converter);
3496 NodeEventLoopFactory *const pi1 =
3497 event_loop_factory.GetNodeEventLoopFactory("pi1");
3498 const size_t pi1_index = configuration::GetNodeIndex(
3499 event_loop_factory.configuration(), pi1->node());
3500 NodeEventLoopFactory *const pi2 =
3501 event_loop_factory.GetNodeEventLoopFactory("pi2");
3502 const size_t pi2_index = configuration::GetNodeIndex(
3503 event_loop_factory.configuration(), pi2->node());
3504 NodeEventLoopFactory *const pi3 =
3505 event_loop_factory.GetNodeEventLoopFactory("pi3");
3506 const size_t pi3_index = configuration::GetNodeIndex(
3507 event_loop_factory.configuration(), pi3->node());
3508
3509 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003510 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003511 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003512 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003513 const std::string kLogfile2_2 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003514 aos::testing::TestTmpDir() + "/logs/multi_logfile2.2/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003515 const std::string kLogfile3_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003516 aos::testing::TestTmpDir() + "/logs/multi_logfile3/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003517 {
3518 CHECK_EQ(pi1_index, 0u);
3519 CHECK_EQ(pi2_index, 1u);
3520 CHECK_EQ(pi3_index, 2u);
3521
3522 time_converter.AddNextTimestamp(
3523 distributed_clock::epoch(),
3524 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3525 BootTimestamp::epoch()});
3526 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3527 time_converter.AddNextTimestamp(
3528 distributed_clock::epoch() + reboot_time,
3529 {BootTimestamp::epoch() + reboot_time,
3530 BootTimestamp{.boot = 1,
3531 .time = monotonic_clock::epoch() + reboot_time},
3532 BootTimestamp::epoch() + reboot_time});
3533 }
3534
3535 std::vector<std::string> filenames;
3536 {
3537 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003538 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3539 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003540 LoggerState pi3_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003541 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3542 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003543 {
3544 // And now start the logger.
3545 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003546 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3547 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003548
3549 pi1_logger.StartLogger(kLogfile1_1);
3550 pi3_logger.StartLogger(kLogfile3_1);
3551 pi2_logger.StartLogger(kLogfile2_1);
3552
3553 event_loop_factory.RunFor(chrono::milliseconds(1005));
3554
3555 // Now that we've got a start time in the past, turn on data.
3556 std::unique_ptr<aos::EventLoop> ping_event_loop =
3557 pi1->MakeEventLoop("ping");
3558 Ping ping(ping_event_loop.get());
3559
3560 pi2->AlwaysStart<Pong>("pong");
3561
3562 event_loop_factory.RunFor(chrono::milliseconds(3000));
3563
3564 pi2_logger.AppendAllFilenames(&filenames);
3565 }
3566 event_loop_factory.RunFor(chrono::milliseconds(995));
3567 // pi2 now reboots at 5 seconds.
3568 {
3569 event_loop_factory.RunFor(chrono::milliseconds(1000));
3570
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003571 // Make local stuff happen before we start logging and connect the
3572 // remote.
Naman Guptaa63aa132023-03-22 20:06:34 -07003573 pi2->AlwaysStart<Pong>("pong");
3574 std::unique_ptr<aos::EventLoop> ping_event_loop =
3575 pi1->MakeEventLoop("ping");
3576 Ping ping(ping_event_loop.get());
3577 event_loop_factory.RunFor(chrono::milliseconds(5));
3578
3579 // Start logging again on pi2 after it is up.
3580 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003581 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3582 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003583 pi2_logger.StartLogger(kLogfile2_2);
3584
3585 event_loop_factory.RunFor(chrono::milliseconds(5000));
3586
3587 pi2_logger.AppendAllFilenames(&filenames);
3588 }
3589
3590 pi1_logger.AppendAllFilenames(&filenames);
3591 pi3_logger.AppendAllFilenames(&filenames);
3592 }
3593
3594 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003595 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003596 auto result = ConfirmReadable(filenames);
3597
3598 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3599 EXPECT_THAT(result[0].second,
3600 ::testing::ElementsAre(realtime_clock::epoch() +
3601 chrono::microseconds(11000350)));
3602
3603 EXPECT_THAT(result[1].first,
3604 ::testing::ElementsAre(
3605 realtime_clock::epoch(),
3606 realtime_clock::epoch() + chrono::microseconds(6005000)));
3607 EXPECT_THAT(result[1].second,
3608 ::testing::ElementsAre(
3609 realtime_clock::epoch() + chrono::microseconds(4900150),
3610 realtime_clock::epoch() + chrono::microseconds(11000200)));
3611
3612 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3613 EXPECT_THAT(result[2].second,
3614 ::testing::ElementsAre(realtime_clock::epoch() +
3615 chrono::microseconds(11000150)));
3616
3617 // Confirm we observed the correct start and stop times. We should see the
3618 // reboot here.
3619 auto start_stop_result = ConfirmReadable(
3620 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3621 realtime_clock::epoch() + chrono::milliseconds(8000));
3622
3623 EXPECT_THAT(
3624 start_stop_result[0].first,
3625 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3626 EXPECT_THAT(
3627 start_stop_result[0].second,
3628 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3629 EXPECT_THAT(start_stop_result[1].first,
3630 ::testing::ElementsAre(
3631 realtime_clock::epoch() + chrono::seconds(2),
3632 realtime_clock::epoch() + chrono::microseconds(6005000)));
3633 EXPECT_THAT(start_stop_result[1].second,
3634 ::testing::ElementsAre(
3635 realtime_clock::epoch() + chrono::microseconds(4900150),
3636 realtime_clock::epoch() + chrono::seconds(8)));
3637 EXPECT_THAT(
3638 start_stop_result[2].first,
3639 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3640 EXPECT_THAT(
3641 start_stop_result[2].second,
3642 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3643}
3644
3645// Tests that we properly handle one direction being down.
3646TEST(MissingDirectionTest, OneDirection) {
Austin Schuh7e417682023-08-11 17:05:30 -07003647 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3648 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3649
Naman Guptaa63aa132023-03-22 20:06:34 -07003650 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3651 aos::configuration::ReadConfig(ArtifactPath(
3652 "aos/events/logging/multinode_pingpong_split4_config.json"));
3653 message_bridge::TestingTimeConverter time_converter(
3654 configuration::NodesCount(&config.message()));
3655 SimulatedEventLoopFactory event_loop_factory(&config.message());
3656 event_loop_factory.SetTimeConverter(&time_converter);
3657
3658 NodeEventLoopFactory *const pi1 =
3659 event_loop_factory.GetNodeEventLoopFactory("pi1");
3660 const size_t pi1_index = configuration::GetNodeIndex(
3661 event_loop_factory.configuration(), pi1->node());
3662 NodeEventLoopFactory *const pi2 =
3663 event_loop_factory.GetNodeEventLoopFactory("pi2");
3664 const size_t pi2_index = configuration::GetNodeIndex(
3665 event_loop_factory.configuration(), pi2->node());
3666 std::vector<std::string> filenames;
3667
3668 {
3669 CHECK_EQ(pi1_index, 0u);
3670 CHECK_EQ(pi2_index, 1u);
3671
3672 time_converter.AddNextTimestamp(
3673 distributed_clock::epoch(),
3674 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3675
3676 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3677 time_converter.AddNextTimestamp(
3678 distributed_clock::epoch() + reboot_time,
3679 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3680 BootTimestamp::epoch() + reboot_time});
3681 }
3682
3683 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003684 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003685 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003686 aos::testing::TestTmpDir() + "/logs/multi_logfile1.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003687
3688 pi2->Disconnect(pi1->node());
3689
3690 pi1->AlwaysStart<Ping>("ping");
3691 pi2->AlwaysStart<Pong>("pong");
3692
3693 {
3694 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003695 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3696 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003697
3698 event_loop_factory.RunFor(chrono::milliseconds(95));
3699
3700 pi2_logger.StartLogger(kLogfile2_1);
3701
3702 event_loop_factory.RunFor(chrono::milliseconds(6000));
3703
3704 pi2->Connect(pi1->node());
3705
3706 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003707 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3708 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003709 pi1_logger.StartLogger(kLogfile1_1);
3710
3711 event_loop_factory.RunFor(chrono::milliseconds(5000));
3712 pi1_logger.AppendAllFilenames(&filenames);
3713 pi2_logger.AppendAllFilenames(&filenames);
3714 }
3715
3716 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003717 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003718 ConfirmReadable(filenames);
3719}
3720
3721// Tests that we properly handle only one direction ever existing after a
3722// reboot.
3723TEST(MissingDirectionTest, OneDirectionAfterReboot) {
Austin Schuh7e417682023-08-11 17:05:30 -07003724 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3725 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3726
Naman Guptaa63aa132023-03-22 20:06:34 -07003727 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3728 aos::configuration::ReadConfig(ArtifactPath(
3729 "aos/events/logging/multinode_pingpong_split4_config.json"));
3730 message_bridge::TestingTimeConverter time_converter(
3731 configuration::NodesCount(&config.message()));
3732 SimulatedEventLoopFactory event_loop_factory(&config.message());
3733 event_loop_factory.SetTimeConverter(&time_converter);
3734
3735 NodeEventLoopFactory *const pi1 =
3736 event_loop_factory.GetNodeEventLoopFactory("pi1");
3737 const size_t pi1_index = configuration::GetNodeIndex(
3738 event_loop_factory.configuration(), pi1->node());
3739 NodeEventLoopFactory *const pi2 =
3740 event_loop_factory.GetNodeEventLoopFactory("pi2");
3741 const size_t pi2_index = configuration::GetNodeIndex(
3742 event_loop_factory.configuration(), pi2->node());
3743 std::vector<std::string> filenames;
3744
3745 {
3746 CHECK_EQ(pi1_index, 0u);
3747 CHECK_EQ(pi2_index, 1u);
3748
3749 time_converter.AddNextTimestamp(
3750 distributed_clock::epoch(),
3751 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3752
3753 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3754 time_converter.AddNextTimestamp(
3755 distributed_clock::epoch() + reboot_time,
3756 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3757 BootTimestamp::epoch() + reboot_time});
3758 }
3759
3760 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003761 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003762
3763 pi1->AlwaysStart<Ping>("ping");
3764
3765 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3766 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3767 // second boot.
3768 {
3769 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003770 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3771 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003772
3773 event_loop_factory.RunFor(chrono::milliseconds(95));
3774
3775 pi2_logger.StartLogger(kLogfile2_1);
3776
3777 event_loop_factory.RunFor(chrono::milliseconds(4000));
3778
3779 pi2->Disconnect(pi1->node());
3780
3781 event_loop_factory.RunFor(chrono::milliseconds(1000));
3782 pi1->AlwaysStart<Ping>("ping");
3783
3784 event_loop_factory.RunFor(chrono::milliseconds(5000));
3785 pi2_logger.AppendAllFilenames(&filenames);
3786 }
3787
3788 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003789 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003790 ConfirmReadable(filenames);
3791}
3792
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003793// Tests that we properly handle only one direction ever existing after a
3794// reboot with only reliable data.
Naman Guptaa63aa132023-03-22 20:06:34 -07003795TEST(MissingDirectionTest, OneDirectionAfterRebootReliable) {
Austin Schuh7e417682023-08-11 17:05:30 -07003796 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3797 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3798
Naman Guptaa63aa132023-03-22 20:06:34 -07003799 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003800 aos::configuration::ReadConfig(
3801 ArtifactPath("aos/events/logging/"
3802 "multinode_pingpong_split4_reliable_config.json"));
Naman Guptaa63aa132023-03-22 20:06:34 -07003803 message_bridge::TestingTimeConverter time_converter(
3804 configuration::NodesCount(&config.message()));
3805 SimulatedEventLoopFactory event_loop_factory(&config.message());
3806 event_loop_factory.SetTimeConverter(&time_converter);
3807
3808 NodeEventLoopFactory *const pi1 =
3809 event_loop_factory.GetNodeEventLoopFactory("pi1");
3810 const size_t pi1_index = configuration::GetNodeIndex(
3811 event_loop_factory.configuration(), pi1->node());
3812 NodeEventLoopFactory *const pi2 =
3813 event_loop_factory.GetNodeEventLoopFactory("pi2");
3814 const size_t pi2_index = configuration::GetNodeIndex(
3815 event_loop_factory.configuration(), pi2->node());
3816 std::vector<std::string> filenames;
3817
3818 {
3819 CHECK_EQ(pi1_index, 0u);
3820 CHECK_EQ(pi2_index, 1u);
3821
3822 time_converter.AddNextTimestamp(
3823 distributed_clock::epoch(),
3824 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3825
3826 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3827 time_converter.AddNextTimestamp(
3828 distributed_clock::epoch() + reboot_time,
3829 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3830 BootTimestamp::epoch() + reboot_time});
3831 }
3832
3833 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003834 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003835
3836 pi1->AlwaysStart<Ping>("ping");
3837
3838 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3839 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3840 // second boot.
3841 {
3842 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003843 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3844 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003845
3846 event_loop_factory.RunFor(chrono::milliseconds(95));
3847
3848 pi2_logger.StartLogger(kLogfile2_1);
3849
3850 event_loop_factory.RunFor(chrono::milliseconds(4000));
3851
3852 pi2->Disconnect(pi1->node());
3853
3854 event_loop_factory.RunFor(chrono::milliseconds(1000));
3855 pi1->AlwaysStart<Ping>("ping");
3856
3857 event_loop_factory.RunFor(chrono::milliseconds(5000));
3858 pi2_logger.AppendAllFilenames(&filenames);
3859 }
3860
3861 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003862 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003863 ConfirmReadable(filenames);
3864}
3865
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003866// Tests that we properly handle only one direction ever existing after a
3867// reboot with mixed unreliable vs reliable, where reliable has an earlier
3868// timestamp than unreliable.
Brian Smartte67d7112023-03-20 12:06:30 -07003869TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase1) {
Austin Schuh7e417682023-08-11 17:05:30 -07003870 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3871 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3872
Brian Smartte67d7112023-03-20 12:06:30 -07003873 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3874 aos::configuration::ReadConfig(ArtifactPath(
3875 "aos/events/logging/multinode_pingpong_split4_mixed1_config.json"));
3876 message_bridge::TestingTimeConverter time_converter(
3877 configuration::NodesCount(&config.message()));
3878 SimulatedEventLoopFactory event_loop_factory(&config.message());
3879 event_loop_factory.SetTimeConverter(&time_converter);
3880
3881 NodeEventLoopFactory *const pi1 =
3882 event_loop_factory.GetNodeEventLoopFactory("pi1");
3883 const size_t pi1_index = configuration::GetNodeIndex(
3884 event_loop_factory.configuration(), pi1->node());
3885 NodeEventLoopFactory *const pi2 =
3886 event_loop_factory.GetNodeEventLoopFactory("pi2");
3887 const size_t pi2_index = configuration::GetNodeIndex(
3888 event_loop_factory.configuration(), pi2->node());
3889 std::vector<std::string> filenames;
3890
3891 {
3892 CHECK_EQ(pi1_index, 0u);
3893 CHECK_EQ(pi2_index, 1u);
3894
3895 time_converter.AddNextTimestamp(
3896 distributed_clock::epoch(),
3897 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3898
3899 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3900 time_converter.AddNextTimestamp(
3901 distributed_clock::epoch() + reboot_time,
3902 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3903 BootTimestamp::epoch() + reboot_time});
3904 }
3905
3906 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003907 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Brian Smartte67d7112023-03-20 12:06:30 -07003908
3909 // The following sequence using the above reference config creates
3910 // a reliable message timestamp < unreliable message timestamp.
3911 {
3912 pi1->DisableStatistics();
3913 pi2->DisableStatistics();
3914
3915 event_loop_factory.RunFor(chrono::milliseconds(95));
3916
3917 pi1->AlwaysStart<Ping>("ping");
3918
3919 event_loop_factory.RunFor(chrono::milliseconds(5250));
3920
3921 pi1->EnableStatistics();
3922
3923 event_loop_factory.RunFor(chrono::milliseconds(1000));
3924
3925 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003926 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3927 FileStrategy::kKeepSeparate);
Brian Smartte67d7112023-03-20 12:06:30 -07003928
3929 pi2_logger.StartLogger(kLogfile2_1);
3930
3931 event_loop_factory.RunFor(chrono::milliseconds(5000));
3932 pi2_logger.AppendAllFilenames(&filenames);
3933 }
3934
3935 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003936 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Brian Smartte67d7112023-03-20 12:06:30 -07003937 ConfirmReadable(filenames);
3938}
3939
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003940// Tests that we properly handle only one direction ever existing after a
3941// reboot with mixed unreliable vs reliable, where unreliable has an earlier
3942// timestamp than reliable.
Brian Smartte67d7112023-03-20 12:06:30 -07003943TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase2) {
Austin Schuh7e417682023-08-11 17:05:30 -07003944 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3945 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3946
Brian Smartte67d7112023-03-20 12:06:30 -07003947 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3948 aos::configuration::ReadConfig(ArtifactPath(
3949 "aos/events/logging/multinode_pingpong_split4_mixed2_config.json"));
3950 message_bridge::TestingTimeConverter time_converter(
3951 configuration::NodesCount(&config.message()));
3952 SimulatedEventLoopFactory event_loop_factory(&config.message());
3953 event_loop_factory.SetTimeConverter(&time_converter);
3954
3955 NodeEventLoopFactory *const pi1 =
3956 event_loop_factory.GetNodeEventLoopFactory("pi1");
3957 const size_t pi1_index = configuration::GetNodeIndex(
3958 event_loop_factory.configuration(), pi1->node());
3959 NodeEventLoopFactory *const pi2 =
3960 event_loop_factory.GetNodeEventLoopFactory("pi2");
3961 const size_t pi2_index = configuration::GetNodeIndex(
3962 event_loop_factory.configuration(), pi2->node());
3963 std::vector<std::string> filenames;
3964
3965 {
3966 CHECK_EQ(pi1_index, 0u);
3967 CHECK_EQ(pi2_index, 1u);
3968
3969 time_converter.AddNextTimestamp(
3970 distributed_clock::epoch(),
3971 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3972
3973 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3974 time_converter.AddNextTimestamp(
3975 distributed_clock::epoch() + reboot_time,
3976 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3977 BootTimestamp::epoch() + reboot_time});
3978 }
3979
3980 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003981 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Brian Smartte67d7112023-03-20 12:06:30 -07003982
3983 // The following sequence using the above reference config creates
3984 // an unreliable message timestamp < reliable message timestamp.
3985 {
3986 pi1->DisableStatistics();
3987 pi2->DisableStatistics();
3988
3989 event_loop_factory.RunFor(chrono::milliseconds(95));
3990
3991 pi1->AlwaysStart<Ping>("ping");
3992
3993 event_loop_factory.RunFor(chrono::milliseconds(5250));
3994
3995 pi1->EnableStatistics();
3996
3997 event_loop_factory.RunFor(chrono::milliseconds(1000));
3998
3999 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004000 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4001 FileStrategy::kKeepSeparate);
Brian Smartte67d7112023-03-20 12:06:30 -07004002
4003 pi2_logger.StartLogger(kLogfile2_1);
4004
4005 event_loop_factory.RunFor(chrono::milliseconds(5000));
4006 pi2_logger.AppendAllFilenames(&filenames);
4007 }
4008
4009 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004010 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Brian Smartte67d7112023-03-20 12:06:30 -07004011 ConfirmReadable(filenames);
4012}
4013
Naman Guptaa63aa132023-03-22 20:06:34 -07004014// Tests that we properly handle what used to be a time violation in one
4015// direction. This can occur when one direction goes down after sending some
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004016// data, but the other keeps working. The down direction ends up resolving to
4017// a straight line in the noncausal filter, where the direction which is still
4018// up can cross that line. Really, time progressed along just fine but we
4019// assumed that the offset was a line when it could have deviated by up to
4020// 1ms/second.
Naman Guptaa63aa132023-03-22 20:06:34 -07004021TEST_P(MultinodeLoggerTest, OneDirectionTimeDrift) {
4022 std::vector<std::string> filenames;
4023
4024 CHECK_EQ(pi1_index_, 0u);
4025 CHECK_EQ(pi2_index_, 1u);
4026
4027 time_converter_.AddNextTimestamp(
4028 distributed_clock::epoch(),
4029 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4030
4031 const chrono::nanoseconds before_disconnect_duration =
4032 time_converter_.AddMonotonic(
4033 {chrono::milliseconds(1000), chrono::milliseconds(1000)});
4034
4035 const chrono::nanoseconds test_duration =
4036 time_converter_.AddMonotonic(
4037 {chrono::milliseconds(1000), chrono::milliseconds(1000)}) +
4038 time_converter_.AddMonotonic(
4039 {chrono::milliseconds(10000),
4040 chrono::milliseconds(10000) - chrono::milliseconds(5)}) +
4041 time_converter_.AddMonotonic(
4042 {chrono::milliseconds(10000),
4043 chrono::milliseconds(10000) + chrono::milliseconds(5)});
4044
4045 const std::string kLogfile =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004046 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004047
4048 {
4049 LoggerState pi2_logger = MakeLogger(pi2_);
4050 pi2_logger.StartLogger(kLogfile);
4051 event_loop_factory_.RunFor(before_disconnect_duration);
4052
4053 pi2_->Disconnect(pi1_->node());
4054
4055 event_loop_factory_.RunFor(test_duration);
4056 pi2_->Connect(pi1_->node());
4057
4058 event_loop_factory_.RunFor(chrono::milliseconds(5000));
4059 pi2_logger.AppendAllFilenames(&filenames);
4060 }
4061
4062 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004063 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004064 ConfirmReadable(filenames);
4065}
4066
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004067// Tests that we can replay a logfile that has timestamps such that at least
4068// one node's epoch is at a positive distributed_clock (and thus will have to
4069// be booted after the other node(s)).
Naman Guptaa63aa132023-03-22 20:06:34 -07004070TEST_P(MultinodeLoggerTest, StartOneNodeBeforeOther) {
4071 std::vector<std::string> filenames;
4072
4073 CHECK_EQ(pi1_index_, 0u);
4074 CHECK_EQ(pi2_index_, 1u);
4075
4076 time_converter_.AddNextTimestamp(
4077 distributed_clock::epoch(),
4078 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4079
4080 const chrono::nanoseconds before_reboot_duration = chrono::milliseconds(1000);
4081 time_converter_.RebootAt(
4082 0, distributed_clock::time_point(before_reboot_duration));
4083
4084 const chrono::nanoseconds test_duration = time_converter_.AddMonotonic(
4085 {chrono::milliseconds(10000), chrono::milliseconds(10000)});
4086
4087 const std::string kLogfile =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004088 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004089
4090 pi2_->Disconnect(pi1_->node());
4091 pi1_->Disconnect(pi2_->node());
4092
4093 {
4094 LoggerState pi2_logger = MakeLogger(pi2_);
4095
4096 pi2_logger.StartLogger(kLogfile);
4097 event_loop_factory_.RunFor(before_reboot_duration);
4098
4099 pi2_->Connect(pi1_->node());
4100 pi1_->Connect(pi2_->node());
4101
4102 event_loop_factory_.RunFor(test_duration);
4103
4104 pi2_logger.AppendAllFilenames(&filenames);
4105 }
4106
4107 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004108 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004109 ConfirmReadable(filenames);
4110
4111 {
4112 LogReader reader(sorted_parts);
4113 SimulatedEventLoopFactory replay_factory(reader.configuration());
4114 reader.RegisterWithoutStarting(&replay_factory);
4115
4116 NodeEventLoopFactory *const replay_node =
4117 reader.event_loop_factory()->GetNodeEventLoopFactory("pi1");
4118
4119 std::unique_ptr<EventLoop> test_event_loop =
4120 replay_node->MakeEventLoop("test_reader");
4121 replay_node->OnStartup([replay_node]() {
4122 // Check that we didn't boot until at least t=0.
4123 CHECK_LE(monotonic_clock::epoch(), replay_node->monotonic_now());
4124 });
4125 test_event_loop->OnRun([&test_event_loop]() {
4126 // Check that we didn't boot until at least t=0.
4127 EXPECT_LE(monotonic_clock::epoch(), test_event_loop->monotonic_now());
4128 });
4129 reader.event_loop_factory()->Run();
4130 reader.Deregister();
4131 }
4132}
4133
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004134// Tests that when we have a loop without all the logs at all points in time,
4135// we can sort it properly.
Naman Guptaa63aa132023-03-22 20:06:34 -07004136TEST(MultinodeLoggerLoopTest, Loop) {
Austin Schuh7e417682023-08-11 17:05:30 -07004137 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4138 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4139
Naman Guptaa63aa132023-03-22 20:06:34 -07004140 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004141 aos::configuration::ReadConfig(
4142 ArtifactPath("aos/events/logging/"
4143 "multinode_pingpong_triangle_split_config.json"));
Naman Guptaa63aa132023-03-22 20:06:34 -07004144 message_bridge::TestingTimeConverter time_converter(
4145 configuration::NodesCount(&config.message()));
4146 SimulatedEventLoopFactory event_loop_factory(&config.message());
4147 event_loop_factory.SetTimeConverter(&time_converter);
4148
4149 NodeEventLoopFactory *const pi1 =
4150 event_loop_factory.GetNodeEventLoopFactory("pi1");
4151 NodeEventLoopFactory *const pi2 =
4152 event_loop_factory.GetNodeEventLoopFactory("pi2");
4153 NodeEventLoopFactory *const pi3 =
4154 event_loop_factory.GetNodeEventLoopFactory("pi3");
4155
4156 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004157 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004158 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004159 aos::testing::TestTmpDir() + "/logs/multi_logfile2/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004160 const std::string kLogfile3_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004161 aos::testing::TestTmpDir() + "/logs/multi_logfile3/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004162
4163 {
4164 // Make pi1 boot before everything else.
4165 time_converter.AddNextTimestamp(
4166 distributed_clock::epoch(),
4167 {BootTimestamp::epoch(),
4168 BootTimestamp::epoch() - chrono::milliseconds(100),
4169 BootTimestamp::epoch() - chrono::milliseconds(300)});
4170 }
4171
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004172 // We want to setup a situation such that 2 of the 3 legs of the loop are
4173 // very confident about time being X, and the third leg is pulling the
4174 // average off to one side.
Naman Guptaa63aa132023-03-22 20:06:34 -07004175 //
4176 // It's easiest to visualize this in timestamp_plotter.
4177
4178 std::vector<std::string> filenames;
4179 {
4180 // Have pi1 send out a reliable message at startup. This sets up a long
4181 // forwarding time message at the start to bias time.
4182 std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
4183 {
4184 aos::Sender<examples::Ping> ping_sender =
4185 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
4186
4187 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
4188 examples::Ping::Builder ping_builder =
4189 builder.MakeBuilder<examples::Ping>();
4190 CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
4191 }
4192
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004193 // Wait a while so there's enough data to let the worst case be rather
4194 // off.
Naman Guptaa63aa132023-03-22 20:06:34 -07004195 event_loop_factory.RunFor(chrono::seconds(1000));
4196
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004197 // Now start a receiving node first. This sets up 2 tight bounds between
4198 // 2 of the nodes.
Naman Guptaa63aa132023-03-22 20:06:34 -07004199 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004200 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4201 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07004202 pi2_logger.StartLogger(kLogfile2_1);
4203
4204 event_loop_factory.RunFor(chrono::seconds(100));
4205
4206 // And now start the third leg.
4207 LoggerState pi3_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004208 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4209 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07004210 pi3_logger.StartLogger(kLogfile3_1);
4211
4212 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004213 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4214 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07004215 pi1_logger.StartLogger(kLogfile1_1);
4216
4217 event_loop_factory.RunFor(chrono::seconds(100));
4218
4219 pi1_logger.AppendAllFilenames(&filenames);
4220 pi2_logger.AppendAllFilenames(&filenames);
4221 pi3_logger.AppendAllFilenames(&filenames);
4222 }
4223
4224 // Make sure we can read this.
4225 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004226 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004227 auto result = ConfirmReadable(filenames);
4228}
4229
Austin Schuh08dba8f2023-05-01 08:29:30 -07004230// Tests that RestartLogging works in the simple case. Unfortunately, the
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004231// failure cases involve simulating time elapsing in callbacks, which is
4232// really hard. The best we can reasonably do is make sure 2 back to back
4233// logs are parseable together.
Austin Schuh08dba8f2023-05-01 08:29:30 -07004234TEST_P(MultinodeLoggerTest, RestartLogging) {
4235 time_converter_.AddMonotonic(
4236 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
4237 std::vector<std::string> filenames;
4238 {
4239 LoggerState pi1_logger = MakeLogger(pi1_);
4240
4241 event_loop_factory_.RunFor(chrono::milliseconds(95));
4242
4243 StartLogger(&pi1_logger, logfile_base1_);
4244 aos::monotonic_clock::time_point last_rotation_time =
4245 pi1_logger.event_loop->monotonic_now();
Austin Schuh2f864452023-07-17 14:53:08 -07004246 pi1_logger.logger->set_on_logged_period(
4247 [&](aos::monotonic_clock::time_point) {
4248 const auto now = pi1_logger.event_loop->monotonic_now();
4249 if (now > last_rotation_time + std::chrono::seconds(5)) {
4250 pi1_logger.AppendAllFilenames(&filenames);
4251 std::unique_ptr<MultiNodeFilesLogNamer> namer =
4252 pi1_logger.MakeLogNamer(logfile_base2_);
4253 pi1_logger.log_namer = namer.get();
Austin Schuh08dba8f2023-05-01 08:29:30 -07004254
Austin Schuh2f864452023-07-17 14:53:08 -07004255 pi1_logger.logger->RestartLogging(std::move(namer));
4256 last_rotation_time = now;
4257 }
4258 });
Austin Schuh08dba8f2023-05-01 08:29:30 -07004259
4260 event_loop_factory_.RunFor(chrono::milliseconds(7000));
4261
4262 pi1_logger.AppendAllFilenames(&filenames);
4263 }
4264
4265 for (const auto &x : filenames) {
4266 LOG(INFO) << x;
4267 }
4268
4269 EXPECT_GE(filenames.size(), 2u);
4270
4271 ConfirmReadable(filenames);
4272
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004273 // TODO(austin): It would be good to confirm that any one time messages end
4274 // up in both logs correctly.
Austin Schuh08dba8f2023-05-01 08:29:30 -07004275}
4276
Austin Schuh6e93fc22023-08-22 21:27:22 -07004277// Tests that we call OnEnd without --skip_missing_forwarding_entries.
4278TEST_P(MultinodeLoggerTest, SkipMissingForwardingEntries) {
4279 if (file_strategy() == FileStrategy::kCombine) {
4280 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
4281 }
4282 time_converter_.AddMonotonic(
4283 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
4284
4285 std::vector<std::string> filenames;
4286 {
4287 LoggerState pi1_logger = MakeLogger(pi1_);
4288
4289 event_loop_factory_.RunFor(chrono::milliseconds(95));
4290
4291 StartLogger(&pi1_logger);
4292 aos::monotonic_clock::time_point last_rotation_time =
4293 pi1_logger.event_loop->monotonic_now();
4294 pi1_logger.logger->set_on_logged_period(
4295 [&](aos::monotonic_clock::time_point) {
4296 const auto now = pi1_logger.event_loop->monotonic_now();
4297 if (now > last_rotation_time + std::chrono::seconds(5)) {
4298 pi1_logger.logger->Rotate();
4299 last_rotation_time = now;
4300 }
4301 });
4302
4303 event_loop_factory_.RunFor(chrono::milliseconds(15000));
4304 pi1_logger.AppendAllFilenames(&filenames);
4305 }
4306
4307 // If we remove the last remote data part, we'll trigger missing data for
4308 // timestamps.
4309 filenames.erase(std::remove_if(filenames.begin(), filenames.end(),
4310 [](const std::string &s) {
4311 return s.find("data/pi2_data.part3.bfbs") !=
4312 std::string::npos;
4313 }),
4314 filenames.end());
4315
4316 auto result = ConfirmReadable(filenames);
4317}
4318
Austin Schuh54ffea42023-08-23 13:27:04 -07004319// Tests that we call OnEnd without --skip_missing_forwarding_entries.
4320TEST(MultinodeLoggerConfigTest, SingleNode) {
4321 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4322 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4323
4324 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4325 aos::configuration::ReadConfig(
4326 ArtifactPath("aos/events/logging/multinode_single_node_config.json"));
4327 message_bridge::TestingTimeConverter time_converter(
4328 configuration::NodesCount(&config.message()));
4329 SimulatedEventLoopFactory event_loop_factory(&config.message());
4330 event_loop_factory.SetTimeConverter(&time_converter);
4331
4332 time_converter.StartEqual();
4333
4334 const std::string kLogfile1_1 =
4335 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
4336
4337 NodeEventLoopFactory *const pi1 =
4338 event_loop_factory.GetNodeEventLoopFactory("pi1");
4339
4340 std::vector<std::string> filenames;
4341
4342 {
4343 // Now start a receiving node first. This sets up 2 tight bounds between
4344 // 2 of the nodes.
4345 LoggerState pi1_logger = MakeLoggerState(
4346 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4347 FileStrategy::kKeepSeparate);
4348 pi1_logger.StartLogger(kLogfile1_1);
4349
4350 event_loop_factory.RunFor(chrono::seconds(10));
4351
4352 pi1_logger.AppendAllFilenames(&filenames);
4353 }
4354
4355 // Make sure we can read this.
4356 const std::vector<LogFile> sorted_parts = SortParts(filenames);
4357 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
4358 auto result = ConfirmReadable(filenames);
4359
4360 // TODO(austin): Probably want to stop caring about ServerStatistics,
4361 // ClientStatistics, and Timestamp since they don't really make sense.
4362}
4363
// Tests that when we have evidence of 2 boots, and then start logging, the
// max_out_of_order_duration ends up reasonable on the boot with the start time.
//
// Timeline (distributed clock): pi2 sends one message on /atest3 when its
// first event loop starts running and one on /atest1 at t=1000s, pi2 reboots
// at t=1005s, and pi1 only starts logging at t=1010s while pi2 (now on boot 1)
// sends on /atest2 every 10ms.
TEST(MultinodeLoggerLoopTest, PreviousBootData) {
  // Start from an empty log directory.
  util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
  std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");

  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(ArtifactPath(
          "aos/events/logging/multinode_pingpong_reboot_ooo_config.json"));
  message_bridge::TestingTimeConverter time_converter(
      configuration::NodesCount(&config.message()));
  SimulatedEventLoopFactory event_loop_factory(&config.message());
  event_loop_factory.SetTimeConverter(&time_converter);

  // pi1 has a single boot; pi2 has two (boot 0 and boot 1).
  const UUID pi1_boot0 = UUID::Random();
  const UUID pi2_boot0 = UUID::Random();
  const UUID pi2_boot1 = UUID::Random();

  const std::string kLogfile1_1 =
      aos::testing::TestTmpDir() + "/logs/multi_logfile1/";

  {
    constexpr size_t kPi1Index = 0;
    constexpr size_t kPi2Index = 1;
    time_converter.set_boot_uuid(kPi1Index, 0, pi1_boot0);
    time_converter.set_boot_uuid(kPi2Index, 0, pi2_boot0);
    time_converter.set_boot_uuid(kPi2Index, 1, pi2_boot1);

    // Make pi1 boot before everything else: at the distributed epoch pi2's
    // monotonic clock still reads -100ms.
    time_converter.AddNextTimestamp(
        distributed_clock::epoch(),
        {BootTimestamp::epoch(),
         BootTimestamp::epoch() - chrono::milliseconds(100)});

    // Reboot pi2 at t=1005s: from here on pi2 is on boot 1, with its
    // monotonic clock restarting at epoch.
    const chrono::nanoseconds reboot_time = chrono::seconds(1005);
    time_converter.AddNextTimestamp(
        distributed_clock::epoch() + reboot_time,
        {BootTimestamp::epoch() + reboot_time,
         BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()}});
  }

  NodeEventLoopFactory *const pi1 =
      event_loop_factory.GetNodeEventLoopFactory("pi1");
  NodeEventLoopFactory *const pi2 =
      event_loop_factory.GetNodeEventLoopFactory("pi2");

  // What we want is for pi2 to send one message on /atest3 as soon as its
  // event loop starts running, and one message on /atest1 at t=1000s. That'll
  // make the max out of order duration be large.
  //
  // Then, we reboot, and only send messages on a third channel (/atest2 pong).
  // The order is key, they need to sort in this order in the config.

  std::vector<std::string> filenames;
  {
    {
      std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
      aos::Sender<examples::Pong> pong_sender =
          pi2_event_loop->MakeSender<examples::Pong>("/atest3");

      // Send exactly one Pong on /atest3 when this event loop starts running.
      pi2_event_loop->OnRun([&]() {
        aos::Sender<examples::Pong>::Builder builder =
            pong_sender.MakeBuilder();
        examples::Pong::Builder pong_builder =
            builder.MakeBuilder<examples::Pong>();
        CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
      });

      event_loop_factory.RunFor(chrono::seconds(1000));
    }

    {
      // Send exactly one Pong on /atest1 right now (t=1000s), outside of any
      // event loop callback.
      std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
      aos::Sender<examples::Pong> pong_sender =
          pi2_event_loop->MakeSender<examples::Pong>("/atest1");

      aos::Sender<examples::Pong>::Builder builder = pong_sender.MakeBuilder();
      examples::Pong::Builder pong_builder =
          builder.MakeBuilder<examples::Pong>();
      CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
    }

    // Run past pi2's reboot at t=1005s.
    event_loop_factory.RunFor(chrono::seconds(10));

    // Start logging on pi1 only now (t=1010s), after pi2 has rebooted, so the
    // /atest3 and /atest1 messages above predate both the log and pi2's
    // current boot.
    LoggerState pi1_logger = MakeLoggerState(
        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
        FileStrategy::kKeepSeparate);
    pi1_logger.StartLogger(kLogfile1_1);

    // On pi2's new boot, send a Pong on /atest2 every 10ms until the end of
    // this scope.
    std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
    aos::Sender<examples::Pong> pong_sender =
        pi2_event_loop->MakeSender<examples::Pong>("/atest2");

    pi2_event_loop->AddPhasedLoop(
        [&pong_sender](int) {
          aos::Sender<examples::Pong>::Builder builder =
              pong_sender.MakeBuilder();
          examples::Pong::Builder pong_builder =
              builder.MakeBuilder<examples::Pong>();
          CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
        },
        chrono::milliseconds(10));

    event_loop_factory.RunFor(chrono::seconds(100));

    pi1_logger.AppendAllFilenames(&filenames);
  }

  // Make sure we can read this. The out-of-order check is done per reboot of
  // pi2.
  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  EXPECT_TRUE(AllRebootPartsMatchOutOfOrderDuration(sorted_parts, "pi2"));
  auto result = ConfirmReadable(filenames);
}
4479
// Tests that when we start without a connection, and then start logging, the
// max_out_of_order_duration ends up reasonable.
//
// Timeline: pi2 sends one message on /atest3 when its first event loop starts
// running and one on /atest1 at t=1000s. pi1 and pi2 are then disconnected,
// pi2 starts sending on /atest2 every 10ms, pi1 starts logging while still
// disconnected, and finally the two nodes are reconnected.
TEST(MultinodeLoggerLoopTest, StartDisconnected) {
  // Start from an empty log directory.
  util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
  std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");

  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(ArtifactPath(
          "aos/events/logging/multinode_pingpong_reboot_ooo_config.json"));
  message_bridge::TestingTimeConverter time_converter(
      configuration::NodesCount(&config.message()));
  SimulatedEventLoopFactory event_loop_factory(&config.message());
  event_loop_factory.SetTimeConverter(&time_converter);

  time_converter.StartEqual();

  const std::string kLogfile1_1 =
      aos::testing::TestTmpDir() + "/logs/multi_logfile1/";

  NodeEventLoopFactory *const pi1 =
      event_loop_factory.GetNodeEventLoopFactory("pi1");
  NodeEventLoopFactory *const pi2 =
      event_loop_factory.GetNodeEventLoopFactory("pi2");

  // What we want is for pi2 to send one message on /atest3 as soon as its
  // event loop starts running, and one message on /atest1 at t=1000s. That'll
  // make the max out of order duration be large.
  //
  // Then, we disconnect, and only send messages on a third channel
  // (/atest2 pong). The order is key, they need to sort in this order in the
  // config so we observe them in the order which grows the
  // max_out_of_order_duration.

  std::vector<std::string> filenames;
  {
    {
      std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
      aos::Sender<examples::Pong> pong_sender =
          pi2_event_loop->MakeSender<examples::Pong>("/atest3");

      // Send exactly one Pong on /atest3 when this event loop starts running.
      pi2_event_loop->OnRun([&]() {
        aos::Sender<examples::Pong>::Builder builder =
            pong_sender.MakeBuilder();
        examples::Pong::Builder pong_builder =
            builder.MakeBuilder<examples::Pong>();
        CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
      });

      event_loop_factory.RunFor(chrono::seconds(1000));
    }

    {
      // Send exactly one Pong on /atest1 right now (t=1000s), outside of any
      // event loop callback.
      std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
      aos::Sender<examples::Pong> pong_sender =
          pi2_event_loop->MakeSender<examples::Pong>("/atest1");

      aos::Sender<examples::Pong>::Builder builder = pong_sender.MakeBuilder();
      examples::Pong::Builder pong_builder =
          builder.MakeBuilder<examples::Pong>();
      CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
    }

    event_loop_factory.RunFor(chrono::seconds(10));

    // Cut the link between pi1 and pi2 in both directions.
    pi1->Disconnect(pi2->node());
    pi2->Disconnect(pi1->node());

    // Make data flow: pi2 sends a Pong on /atest2 every 10ms until the end of
    // this scope, while disconnected from pi1.
    std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
    aos::Sender<examples::Pong> pong_sender =
        pi2_event_loop->MakeSender<examples::Pong>("/atest2");

    pi2_event_loop->AddPhasedLoop(
        [&pong_sender](int) {
          aos::Sender<examples::Pong>::Builder builder =
              pong_sender.MakeBuilder();
          examples::Pong::Builder pong_builder =
              builder.MakeBuilder<examples::Pong>();
          CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
        },
        chrono::milliseconds(10));

    event_loop_factory.RunFor(chrono::seconds(10));

    // Start logging on pi1 while it is still disconnected from pi2.
    LoggerState pi1_logger = MakeLoggerState(
        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
        FileStrategy::kKeepSeparate);
    pi1_logger.StartLogger(kLogfile1_1);

    event_loop_factory.RunFor(chrono::seconds(10));

    // Now, reconnect, and everything should recover.
    pi1->Connect(pi2->node());
    pi2->Connect(pi1->node());

    event_loop_factory.RunFor(chrono::seconds(10));

    pi1_logger.AppendAllFilenames(&filenames);
  }

  // Make sure we can read this.
  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
  auto result = ConfirmReadable(filenames);
}
4587
Austin Schuh1124c512023-08-01 15:20:44 -07004588// Class to spam Pong messages blindly.
4589class PongSender {
4590 public:
4591 PongSender(EventLoop *loop, std::string_view channel_name)
4592 : sender_(loop->MakeSender<examples::Pong>(channel_name)) {
4593 loop->AddPhasedLoop(
4594 [this](int) {
4595 aos::Sender<examples::Pong>::Builder builder = sender_.MakeBuilder();
4596 examples::Pong::Builder pong_builder =
4597 builder.MakeBuilder<examples::Pong>();
4598 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4599 },
4600 chrono::milliseconds(10));
4601 }
4602
4603 private:
4604 aos::Sender<examples::Pong> sender_;
4605};
4606
// Tests that we log correctly as nodes connect slowly.
//
// pi1 starts logging while disconnected from both pi2 and pi3. It then
// connects to pi2, and 10s later to pi3, while pi2 and pi3 keep sending.
TEST(MultinodeLoggerLoopTest, StaggeredConnect) {
  // Start from an empty log directory.
  util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
  std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");

  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(ArtifactPath(
          "aos/events/logging/multinode_pingpong_pi3_pingpong_config.json"));
  message_bridge::TestingTimeConverter time_converter(
      configuration::NodesCount(&config.message()));
  SimulatedEventLoopFactory event_loop_factory(&config.message());
  event_loop_factory.SetTimeConverter(&time_converter);

  time_converter.StartEqual();

  const std::string kLogfile1_1 =
      aos::testing::TestTmpDir() + "/logs/multi_logfile1/";

  NodeEventLoopFactory *const pi1 =
      event_loop_factory.GetNodeEventLoopFactory("pi1");
  NodeEventLoopFactory *const pi2 =
      event_loop_factory.GetNodeEventLoopFactory("pi2");
  NodeEventLoopFactory *const pi3 =
      event_loop_factory.GetNodeEventLoopFactory("pi3");

  // pi2 and pi3 start out disconnected from pi1 (in both directions) and
  // blindly send Pong messages every 10ms, pi2 on /test2 and pi3 on /test3.
  // pi1 starts logging while still disconnected from both, connects to pi2
  // after 10s, and to pi3 10s after that. The resulting log is then checked
  // with AllPartsMatchOutOfOrderDuration and ConfirmReadable.

  pi1->Disconnect(pi2->node());
  pi2->Disconnect(pi1->node());

  pi1->Disconnect(pi3->node());
  pi3->Disconnect(pi1->node());

  std::vector<std::string> filenames;
  pi2->AlwaysStart<PongSender>("pongsender", "/test2");
  pi3->AlwaysStart<PongSender>("pongsender", "/test3");

  event_loop_factory.RunFor(chrono::seconds(10));

  {
    // Start logging on pi1 while it is disconnected from both pi2 and pi3.
    LoggerState pi1_logger = MakeLoggerState(
        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
        FileStrategy::kKeepSeparate);
    pi1_logger.StartLogger(kLogfile1_1);

    event_loop_factory.RunFor(chrono::seconds(10));

    // Now, reconnect, and everything should recover. pi2 comes back first...
    pi1->Connect(pi2->node());
    pi2->Connect(pi1->node());

    event_loop_factory.RunFor(chrono::seconds(10));

    // ...and pi3 only 10s later.
    pi1->Connect(pi3->node());
    pi3->Connect(pi1->node());

    event_loop_factory.RunFor(chrono::seconds(10));

    pi1_logger.AppendAllFilenames(&filenames);
  }

  // Make sure we can read this.
  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
  auto result = ConfirmReadable(filenames);
}
4682
Stephan Pleinesf63bde82024-01-13 15:59:33 -08004683} // namespace aos::logger::testing