blob: 427347d0042c3f51f8452043102e8dd6643b128f [file] [log] [blame]
Brian Smartte67d7112023-03-20 12:06:30 -07001#include <algorithm>
2
Philipp Schrader790cb542023-07-05 21:06:52 -07003#include "gmock/gmock.h"
4#include "gtest/gtest.h"
5
Naman Guptaa63aa132023-03-22 20:06:34 -07006#include "aos/events/logging/log_reader.h"
7#include "aos/events/logging/multinode_logger_test_lib.h"
8#include "aos/events/message_counter.h"
9#include "aos/events/ping_lib.h"
10#include "aos/events/pong_lib.h"
11#include "aos/network/remote_message_generated.h"
12#include "aos/network/timestamp_generated.h"
13#include "aos/testing/tmpdir.h"
Naman Guptaa63aa132023-03-22 20:06:34 -070014
Stephan Pleinesf63bde82024-01-13 15:59:33 -080015namespace aos::logger::testing {
Naman Guptaa63aa132023-03-22 20:06:34 -070016
17namespace chrono = std::chrono;
18using aos::message_bridge::RemoteMessage;
19using aos::testing::ArtifactPath;
20using aos::testing::MessageCounter;
21
Naman Guptaa63aa132023-03-22 20:06:34 -070022INSTANTIATE_TEST_SUITE_P(
23 All, MultinodeLoggerTest,
24 ::testing::Combine(
25 ::testing::Values(
26 ConfigParams{"multinode_pingpong_combined_config.json", true,
Austin Schuh6ecfe902023-08-04 22:44:37 -070027 kCombinedConfigSha1(), kCombinedConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070028 FileStrategy::kKeepSeparate,
29 ForceTimestampBuffering::kForceBufferTimestamps},
30 ConfigParams{"multinode_pingpong_combined_config.json", true,
31 kCombinedConfigSha1(), kCombinedConfigSha1(),
32 FileStrategy::kKeepSeparate,
33 ForceTimestampBuffering::kAutoBuffer},
Naman Guptaa63aa132023-03-22 20:06:34 -070034 ConfigParams{"multinode_pingpong_split_config.json", false,
Austin Schuh6ecfe902023-08-04 22:44:37 -070035 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070036 FileStrategy::kKeepSeparate,
37 ForceTimestampBuffering::kForceBufferTimestamps},
Austin Schuh6ecfe902023-08-04 22:44:37 -070038 ConfigParams{"multinode_pingpong_split_config.json", false,
39 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070040 FileStrategy::kKeepSeparate,
41 ForceTimestampBuffering::kAutoBuffer},
42 ConfigParams{"multinode_pingpong_split_config.json", false,
43 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
44 FileStrategy::kCombine,
45 ForceTimestampBuffering::kForceBufferTimestamps},
46 ConfigParams{"multinode_pingpong_split_config.json", false,
47 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
48 FileStrategy::kCombine,
49 ForceTimestampBuffering::kAutoBuffer}),
Naman Guptaa63aa132023-03-22 20:06:34 -070050 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
51
52INSTANTIATE_TEST_SUITE_P(
53 All, MultinodeLoggerDeathTest,
54 ::testing::Combine(
55 ::testing::Values(
56 ConfigParams{"multinode_pingpong_combined_config.json", true,
Austin Schuh6ecfe902023-08-04 22:44:37 -070057 kCombinedConfigSha1(), kCombinedConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070058 FileStrategy::kKeepSeparate,
59 ForceTimestampBuffering::kForceBufferTimestamps},
60 ConfigParams{"multinode_pingpong_combined_config.json", true,
61 kCombinedConfigSha1(), kCombinedConfigSha1(),
62 FileStrategy::kKeepSeparate,
63 ForceTimestampBuffering::kAutoBuffer},
Naman Guptaa63aa132023-03-22 20:06:34 -070064 ConfigParams{"multinode_pingpong_split_config.json", false,
Austin Schuh6ecfe902023-08-04 22:44:37 -070065 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070066 FileStrategy::kKeepSeparate,
67 ForceTimestampBuffering::kForceBufferTimestamps},
Austin Schuh6ecfe902023-08-04 22:44:37 -070068 ConfigParams{"multinode_pingpong_split_config.json", false,
69 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070070 FileStrategy::kKeepSeparate,
71 ForceTimestampBuffering::kAutoBuffer},
72 ConfigParams{"multinode_pingpong_split_config.json", false,
73 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
74 FileStrategy::kCombine,
75 ForceTimestampBuffering::kForceBufferTimestamps},
76 ConfigParams{"multinode_pingpong_split_config.json", false,
77 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
78 FileStrategy::kCombine,
79 ForceTimestampBuffering::kAutoBuffer}),
Naman Guptaa63aa132023-03-22 20:06:34 -070080 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
81
82// Tests that we can write and read simple multi-node log files.
83TEST_P(MultinodeLoggerTest, SimpleMultiNode) {
Austin Schuh6ecfe902023-08-04 22:44:37 -070084 if (file_strategy() == FileStrategy::kCombine) {
85 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
86 }
87
Naman Guptaa63aa132023-03-22 20:06:34 -070088 std::vector<std::string> actual_filenames;
89 time_converter_.StartEqual();
90
91 {
92 LoggerState pi1_logger = MakeLogger(pi1_);
93 LoggerState pi2_logger = MakeLogger(pi2_);
94
95 event_loop_factory_.RunFor(chrono::milliseconds(95));
96
97 StartLogger(&pi1_logger);
98 StartLogger(&pi2_logger);
99
100 event_loop_factory_.RunFor(chrono::milliseconds(20000));
101 pi1_logger.AppendAllFilenames(&actual_filenames);
102 pi2_logger.AppendAllFilenames(&actual_filenames);
103 }
104
105 ASSERT_THAT(actual_filenames,
106 ::testing::UnorderedElementsAreArray(logfiles_));
107
108 {
109 std::set<std::string> logfile_uuids;
110 std::set<std::string> parts_uuids;
111 // Confirm that we have the expected number of UUIDs for both the logfile
112 // UUIDs and parts UUIDs.
113 std::vector<SizePrefixedFlatbufferVector<LogFileHeader>> log_header;
114 for (std::string_view f : logfiles_) {
115 log_header.emplace_back(ReadHeader(f).value());
116 if (!log_header.back().message().has_configuration()) {
117 logfile_uuids.insert(
118 log_header.back().message().log_event_uuid()->str());
119 parts_uuids.insert(log_header.back().message().parts_uuid()->str());
120 }
121 }
122
123 EXPECT_EQ(logfile_uuids.size(), 2u);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700124 EXPECT_EQ(parts_uuids.size(), 8u);
Naman Guptaa63aa132023-03-22 20:06:34 -0700125
126 // And confirm everything is on the correct node.
127 EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi1");
128 EXPECT_EQ(log_header[3].message().node()->name()->string_view(), "pi1");
129 EXPECT_EQ(log_header[4].message().node()->name()->string_view(), "pi1");
130
131 EXPECT_EQ(log_header[5].message().node()->name()->string_view(), "pi2");
132 EXPECT_EQ(log_header[6].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700133 EXPECT_EQ(log_header[7].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700134
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700135 EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi1");
136 EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi1");
Naman Guptaa63aa132023-03-22 20:06:34 -0700137
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700138 EXPECT_EQ(log_header[10].message().node()->name()->string_view(), "pi2");
139 EXPECT_EQ(log_header[11].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700140
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700141 EXPECT_EQ(log_header[12].message().node()->name()->string_view(), "pi2");
142 EXPECT_EQ(log_header[13].message().node()->name()->string_view(), "pi2");
143 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700144
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700145 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi1");
146 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi1");
Naman Guptaa63aa132023-03-22 20:06:34 -0700147
148 // And the parts index matches.
149 EXPECT_EQ(log_header[2].message().parts_index(), 0);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700150
151 EXPECT_EQ(log_header[3].message().parts_index(), 0);
152 EXPECT_EQ(log_header[4].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700153
154 EXPECT_EQ(log_header[5].message().parts_index(), 0);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700155
156 EXPECT_EQ(log_header[6].message().parts_index(), 0);
157 EXPECT_EQ(log_header[7].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700158
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700159 EXPECT_EQ(log_header[8].message().parts_index(), 0);
160 EXPECT_EQ(log_header[9].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700161
162 EXPECT_EQ(log_header[10].message().parts_index(), 0);
163 EXPECT_EQ(log_header[11].message().parts_index(), 1);
164
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700165 EXPECT_EQ(log_header[12].message().parts_index(), 0);
166 EXPECT_EQ(log_header[13].message().parts_index(), 1);
167 EXPECT_EQ(log_header[14].message().parts_index(), 2);
Naman Guptaa63aa132023-03-22 20:06:34 -0700168
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700169 EXPECT_EQ(log_header[15].message().parts_index(), 0);
170 EXPECT_EQ(log_header[16].message().parts_index(), 1);
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700171
172 // And that the data_stored field is right.
173 EXPECT_THAT(*log_header[2].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700174 ::testing::ElementsAre(StoredDataType::DATA));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700175 EXPECT_THAT(*log_header[3].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700176 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700177 EXPECT_THAT(*log_header[4].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700178 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700179
180 EXPECT_THAT(*log_header[5].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700181 ::testing::ElementsAre(StoredDataType::DATA));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700182 EXPECT_THAT(*log_header[6].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700183 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700184 EXPECT_THAT(*log_header[7].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700185 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700186
187 EXPECT_THAT(*log_header[8].message().data_stored(),
188 ::testing::ElementsAre(StoredDataType::DATA));
189 EXPECT_THAT(*log_header[9].message().data_stored(),
190 ::testing::ElementsAre(StoredDataType::DATA));
191
192 EXPECT_THAT(*log_header[10].message().data_stored(),
193 ::testing::ElementsAre(StoredDataType::DATA));
194 EXPECT_THAT(*log_header[11].message().data_stored(),
195 ::testing::ElementsAre(StoredDataType::DATA));
196
197 EXPECT_THAT(*log_header[12].message().data_stored(),
198 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
199 EXPECT_THAT(*log_header[13].message().data_stored(),
200 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
201 EXPECT_THAT(*log_header[14].message().data_stored(),
202 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
203
204 EXPECT_THAT(*log_header[15].message().data_stored(),
205 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
206 EXPECT_THAT(*log_header[16].message().data_stored(),
207 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
Naman Guptaa63aa132023-03-22 20:06:34 -0700208 }
209
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700210 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
211 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -0700212 {
213 using ::testing::UnorderedElementsAre;
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700214 std::shared_ptr<const aos::Configuration> config = sorted_parts[0].config;
Naman Guptaa63aa132023-03-22 20:06:34 -0700215
216 // Timing reports, pings
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700217 if (shared()) {
218 EXPECT_THAT(
219 CountChannelsData(config, logfiles_[2]),
220 UnorderedElementsAre(
221 std::make_tuple("/pi1/aos", "aos.examples.Ping", 2001),
222 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
223 200),
224 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
225 21),
226 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
227 std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
228 std::make_tuple("/test", "aos.examples.Ping", 2001)))
229 << " : " << logfiles_[2];
230 } else {
231 EXPECT_THAT(
232 CountChannelsData(config, logfiles_[2]),
233 UnorderedElementsAre(
234 std::make_tuple("/pi1/aos", "aos.examples.Ping", 2001),
235 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
236 200),
237 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
238 21),
239 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
240 std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
241 std::make_tuple("/test", "aos.examples.Ping", 2001),
242 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
243 "aos-message_bridge-Timestamp",
244 "aos.message_bridge.RemoteMessage", 200)))
245 << " : " << logfiles_[2];
Naman Guptaa63aa132023-03-22 20:06:34 -0700246 }
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700247
248 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
249 ::testing::UnorderedElementsAre())
250 << " : " << logfiles_[3];
251 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
252 ::testing::UnorderedElementsAre())
253 << " : " << logfiles_[4];
254
Naman Guptaa63aa132023-03-22 20:06:34 -0700255 // Timestamps for pong
256 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
257 UnorderedElementsAre())
258 << " : " << logfiles_[2];
259 EXPECT_THAT(
260 CountChannelsTimestamp(config, logfiles_[3]),
261 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
262 << " : " << logfiles_[3];
263 EXPECT_THAT(
264 CountChannelsTimestamp(config, logfiles_[4]),
265 UnorderedElementsAre(
266 std::make_tuple("/test", "aos.examples.Pong", 2000),
267 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)))
268 << " : " << logfiles_[4];
269
Naman Guptaa63aa132023-03-22 20:06:34 -0700270 // Timing reports and pongs.
Naman Guptaa63aa132023-03-22 20:06:34 -0700271 EXPECT_THAT(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700272 CountChannelsData(config, logfiles_[5]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700273 UnorderedElementsAre(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700274 std::make_tuple("/pi2/aos", "aos.examples.Ping", 2001),
Naman Guptaa63aa132023-03-22 20:06:34 -0700275 std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
276 200),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700277 std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
278 21),
279 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700280 std::make_tuple("/pi2/aos", "aos.timing.Report", 60),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700281 std::make_tuple("/test", "aos.examples.Pong", 2001)))
282 << " : " << logfiles_[5];
283 EXPECT_THAT(CountChannelsData(config, logfiles_[6]), UnorderedElementsAre())
284 << " : " << logfiles_[6];
285 EXPECT_THAT(CountChannelsData(config, logfiles_[7]), UnorderedElementsAre())
Naman Guptaa63aa132023-03-22 20:06:34 -0700286 << " : " << logfiles_[7];
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700287 // And ping timestamps.
288 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
289 UnorderedElementsAre())
290 << " : " << logfiles_[5];
Naman Guptaa63aa132023-03-22 20:06:34 -0700291 EXPECT_THAT(
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700292 CountChannelsTimestamp(config, logfiles_[6]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700293 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700294 << " : " << logfiles_[6];
Naman Guptaa63aa132023-03-22 20:06:34 -0700295 EXPECT_THAT(
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700296 CountChannelsTimestamp(config, logfiles_[7]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700297 UnorderedElementsAre(
298 std::make_tuple("/test", "aos.examples.Ping", 2000),
299 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700300 << " : " << logfiles_[7];
Naman Guptaa63aa132023-03-22 20:06:34 -0700301
302 // And then test that the remotely logged timestamp data files only have
303 // timestamps in them.
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700304 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[8]),
305 UnorderedElementsAre())
306 << " : " << logfiles_[8];
307 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[9]),
308 UnorderedElementsAre())
309 << " : " << logfiles_[9];
Naman Guptaa63aa132023-03-22 20:06:34 -0700310 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[10]),
311 UnorderedElementsAre())
312 << " : " << logfiles_[10];
313 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[11]),
314 UnorderedElementsAre())
315 << " : " << logfiles_[11];
Naman Guptaa63aa132023-03-22 20:06:34 -0700316
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700317 EXPECT_THAT(CountChannelsData(config, logfiles_[8]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700318 UnorderedElementsAre(std::make_tuple(
319 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700320 << " : " << logfiles_[8];
321 EXPECT_THAT(CountChannelsData(config, logfiles_[9]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700322 UnorderedElementsAre(std::make_tuple(
323 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700324 << " : " << logfiles_[9];
Naman Guptaa63aa132023-03-22 20:06:34 -0700325
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700326 // Pong snd timestamp data.
327 EXPECT_THAT(
328 CountChannelsData(config, logfiles_[10]),
329 UnorderedElementsAre(
330 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 9),
331 std::make_tuple("/test", "aos.examples.Pong", 91)))
332 << " : " << logfiles_[10];
333 EXPECT_THAT(
334 CountChannelsData(config, logfiles_[11]),
335 UnorderedElementsAre(
336 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 191),
337 std::make_tuple("/test", "aos.examples.Pong", 1910)))
338 << " : " << logfiles_[11];
Naman Guptaa63aa132023-03-22 20:06:34 -0700339
340 // Timestamps from pi2 on pi1, and the other way.
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700341 // if (shared()) {
342 EXPECT_THAT(CountChannelsData(config, logfiles_[12]),
343 UnorderedElementsAre())
344 << " : " << logfiles_[12];
345 EXPECT_THAT(CountChannelsData(config, logfiles_[13]),
346 UnorderedElementsAre())
347 << " : " << logfiles_[13];
348 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
349 UnorderedElementsAre())
350 << " : " << logfiles_[14];
351 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
352 UnorderedElementsAre())
353 << " : " << logfiles_[15];
354 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
355 UnorderedElementsAre())
356 << " : " << logfiles_[16];
Naman Guptaa63aa132023-03-22 20:06:34 -0700357
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700358 EXPECT_THAT(
359 CountChannelsTimestamp(config, logfiles_[12]),
360 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
361 << " : " << logfiles_[12];
362 EXPECT_THAT(
363 CountChannelsTimestamp(config, logfiles_[13]),
364 UnorderedElementsAre(
365 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 9),
366 std::make_tuple("/test", "aos.examples.Ping", 90)))
367 << " : " << logfiles_[13];
368 EXPECT_THAT(
369 CountChannelsTimestamp(config, logfiles_[14]),
370 UnorderedElementsAre(
371 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 191),
372 std::make_tuple("/test", "aos.examples.Ping", 1910)))
373 << " : " << logfiles_[14];
374 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[15]),
375 UnorderedElementsAre(std::make_tuple(
376 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
377 << " : " << logfiles_[15];
378 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[16]),
379 UnorderedElementsAre(std::make_tuple(
380 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
381 << " : " << logfiles_[16];
Naman Guptaa63aa132023-03-22 20:06:34 -0700382 }
383
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700384 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -0700385
386 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
387 log_reader_factory.set_send_delay(chrono::microseconds(0));
388
389 // This sends out the fetched messages and advances time to the start of the
390 // log file.
391 reader.Register(&log_reader_factory);
392
393 const Node *pi1 =
394 configuration::GetNode(log_reader_factory.configuration(), "pi1");
395 const Node *pi2 =
396 configuration::GetNode(log_reader_factory.configuration(), "pi2");
397
398 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
399 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
400 LOG(INFO) << "now pi1 "
401 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
402 LOG(INFO) << "now pi2 "
403 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
404
405 EXPECT_THAT(reader.LoggedNodes(),
406 ::testing::ElementsAre(
407 configuration::GetNode(reader.logged_configuration(), pi1),
408 configuration::GetNode(reader.logged_configuration(), pi2)));
409
410 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
411
412 std::unique_ptr<EventLoop> pi1_event_loop =
413 log_reader_factory.MakeEventLoop("test", pi1);
414 std::unique_ptr<EventLoop> pi2_event_loop =
415 log_reader_factory.MakeEventLoop("test", pi2);
416
417 int pi1_ping_count = 10;
418 int pi2_ping_count = 10;
419 int pi1_pong_count = 10;
420 int pi2_pong_count = 10;
421
422 // Confirm that the ping value matches.
423 pi1_event_loop->MakeWatcher(
424 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
425 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping) << " at "
426 << pi1_event_loop->context().monotonic_remote_time << " -> "
427 << pi1_event_loop->context().monotonic_event_time;
428 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
429 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
430 pi1_ping_count * chrono::milliseconds(10) +
431 monotonic_clock::epoch());
432 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
433 pi1_ping_count * chrono::milliseconds(10) +
434 realtime_clock::epoch());
435 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
436 pi1_event_loop->context().monotonic_event_time);
437 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
438 pi1_event_loop->context().realtime_event_time);
439
440 ++pi1_ping_count;
441 });
442 pi2_event_loop->MakeWatcher(
443 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
444 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping) << " at "
445 << pi2_event_loop->context().monotonic_remote_time << " -> "
446 << pi2_event_loop->context().monotonic_event_time;
447 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
448
449 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
450 pi2_ping_count * chrono::milliseconds(10) +
451 monotonic_clock::epoch());
452 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
453 pi2_ping_count * chrono::milliseconds(10) +
454 realtime_clock::epoch());
455 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time +
456 chrono::microseconds(150),
457 pi2_event_loop->context().monotonic_event_time);
458 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time +
459 chrono::microseconds(150),
460 pi2_event_loop->context().realtime_event_time);
461 ++pi2_ping_count;
462 });
463
464 constexpr ssize_t kQueueIndexOffset = -9;
465 // Confirm that the ping and pong counts both match, and the value also
466 // matches.
467 pi1_event_loop->MakeWatcher(
468 "/test", [&pi1_event_loop, &pi1_ping_count,
469 &pi1_pong_count](const examples::Pong &pong) {
470 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
471 << pi1_event_loop->context().monotonic_remote_time << " -> "
472 << pi1_event_loop->context().monotonic_event_time;
473
474 EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
475 pi1_pong_count + kQueueIndexOffset);
476 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
477 chrono::microseconds(200) +
478 pi1_pong_count * chrono::milliseconds(10) +
479 monotonic_clock::epoch());
480 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
481 chrono::microseconds(200) +
482 pi1_pong_count * chrono::milliseconds(10) +
483 realtime_clock::epoch());
484
485 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
486 chrono::microseconds(150),
487 pi1_event_loop->context().monotonic_event_time);
488 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
489 chrono::microseconds(150),
490 pi1_event_loop->context().realtime_event_time);
491
492 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
493 ++pi1_pong_count;
494 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
495 });
496 pi2_event_loop->MakeWatcher(
497 "/test", [&pi2_event_loop, &pi2_ping_count,
498 &pi2_pong_count](const examples::Pong &pong) {
499 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
500 << pi2_event_loop->context().monotonic_remote_time << " -> "
501 << pi2_event_loop->context().monotonic_event_time;
502
503 EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
504 pi2_pong_count + kQueueIndexOffset);
505
506 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
507 chrono::microseconds(200) +
508 pi2_pong_count * chrono::milliseconds(10) +
509 monotonic_clock::epoch());
510 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
511 chrono::microseconds(200) +
512 pi2_pong_count * chrono::milliseconds(10) +
513 realtime_clock::epoch());
514
515 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
516 pi2_event_loop->context().monotonic_event_time);
517 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
518 pi2_event_loop->context().realtime_event_time);
519
520 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
521 ++pi2_pong_count;
522 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
523 });
524
525 log_reader_factory.Run();
526 EXPECT_EQ(pi1_ping_count, 2010);
527 EXPECT_EQ(pi2_ping_count, 2010);
528 EXPECT_EQ(pi1_pong_count, 2010);
529 EXPECT_EQ(pi2_pong_count, 2010);
530
531 reader.Deregister();
532}
533
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600534// MultinodeLoggerTest that tests the mutate callback works across multiple
535// nodes with remapping
536TEST_P(MultinodeLoggerTest, MultiNodeRemapMutateCallback) {
537 time_converter_.StartEqual();
538 std::vector<std::string> actual_filenames;
539
540 {
541 LoggerState pi1_logger = MakeLogger(pi1_);
542 LoggerState pi2_logger = MakeLogger(pi2_);
543
544 event_loop_factory_.RunFor(chrono::milliseconds(95));
545
546 StartLogger(&pi1_logger);
547 StartLogger(&pi2_logger);
548
549 event_loop_factory_.RunFor(chrono::milliseconds(20000));
550 pi1_logger.AppendAllFilenames(&actual_filenames);
551 pi2_logger.AppendAllFilenames(&actual_filenames);
552 }
553
Austin Schuh8fb4b452023-08-04 17:02:27 -0700554 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700555 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600556
557 LogReader reader(sorted_parts, &config_.message());
558 // Remap just on pi1.
559 reader.RemapLoggedChannel<examples::Pong>(
560 "/test", configuration::GetNode(reader.configuration(), "pi1"));
561
562 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
563
564 int pong_count = 0;
565 // Adds a callback which mutates the value of the pong message before the
566 // message is sent which is the feature we are testing here
567 reader.AddBeforeSendCallback("/test",
568 [&pong_count](aos::examples::Pong *pong) {
569 pong->mutate_value(pong->value() + 1);
570 pong_count = pong->value();
571 });
572
573 // This sends out the fetched messages and advances time to the start of the
574 // log file.
575 reader.Register(&log_reader_factory);
576
577 const Node *pi1 =
578 configuration::GetNode(log_reader_factory.configuration(), "pi1");
579 const Node *pi2 =
580 configuration::GetNode(log_reader_factory.configuration(), "pi2");
581
582 EXPECT_THAT(reader.LoggedNodes(),
583 ::testing::ElementsAre(
584 configuration::GetNode(reader.logged_configuration(), pi1),
585 configuration::GetNode(reader.logged_configuration(), pi2)));
586
587 std::unique_ptr<EventLoop> pi1_event_loop =
588 log_reader_factory.MakeEventLoop("test", pi1);
589 std::unique_ptr<EventLoop> pi2_event_loop =
590 log_reader_factory.MakeEventLoop("test", pi2);
591
592 pi1_event_loop->MakeWatcher("/original/test",
593 [&pong_count](const examples::Pong &pong) {
594 EXPECT_EQ(pong_count, pong.value());
595 });
596
597 pi2_event_loop->MakeWatcher("/test",
598 [&pong_count](const examples::Pong &pong) {
599 EXPECT_EQ(pong_count, pong.value());
600 });
601
602 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
603 reader.Deregister();
604
605 EXPECT_EQ(pong_count, 2011);
606}
607
608// MultinodeLoggerTest that tests the mutate callback works across multiple
609// nodes
610TEST_P(MultinodeLoggerTest, MultiNodeMutateCallback) {
611 time_converter_.StartEqual();
612 std::vector<std::string> actual_filenames;
613
614 {
615 LoggerState pi1_logger = MakeLogger(pi1_);
616 LoggerState pi2_logger = MakeLogger(pi2_);
617
618 event_loop_factory_.RunFor(chrono::milliseconds(95));
619
620 StartLogger(&pi1_logger);
621 StartLogger(&pi2_logger);
622
623 event_loop_factory_.RunFor(chrono::milliseconds(20000));
624 pi1_logger.AppendAllFilenames(&actual_filenames);
625 pi2_logger.AppendAllFilenames(&actual_filenames);
626 }
627
Austin Schuh8fb4b452023-08-04 17:02:27 -0700628 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700629 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600630
631 LogReader reader(sorted_parts, &config_.message());
632
633 int pong_count = 0;
634 // Adds a callback which mutates the value of the pong message before the
635 // message is sent which is the feature we are testing here
636 reader.AddBeforeSendCallback("/test",
637 [&pong_count](aos::examples::Pong *pong) {
638 pong->mutate_value(pong->value() + 1);
639 pong_count = pong->value();
640 });
641
642 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
643
644 // This sends out the fetched messages and advances time to the start of the
645 // log file.
646 reader.Register(&log_reader_factory);
647
648 const Node *pi1 =
649 configuration::GetNode(log_reader_factory.configuration(), "pi1");
650 const Node *pi2 =
651 configuration::GetNode(log_reader_factory.configuration(), "pi2");
652
653 EXPECT_THAT(reader.LoggedNodes(),
654 ::testing::ElementsAre(
655 configuration::GetNode(reader.logged_configuration(), pi1),
656 configuration::GetNode(reader.logged_configuration(), pi2)));
657
658 std::unique_ptr<EventLoop> pi1_event_loop =
659 log_reader_factory.MakeEventLoop("test", pi1);
660 std::unique_ptr<EventLoop> pi2_event_loop =
661 log_reader_factory.MakeEventLoop("test", pi2);
662
663 pi1_event_loop->MakeWatcher("/test",
664 [&pong_count](const examples::Pong &pong) {
665 EXPECT_EQ(pong_count, pong.value());
666 });
667
668 pi2_event_loop->MakeWatcher("/test",
669 [&pong_count](const examples::Pong &pong) {
670 EXPECT_EQ(pong_count, pong.value());
671 });
672
673 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
674 reader.Deregister();
675
676 EXPECT_EQ(pong_count, 2011);
677}
678
679// Tests that the before send callback is only called from the sender node if it
680// is forwarded
681TEST_P(MultinodeLoggerTest, OnlyDoBeforeSendCallbackOnSenderNode) {
682 time_converter_.StartEqual();
Austin Schuh8fb4b452023-08-04 17:02:27 -0700683
684 std::vector<std::string> filenames;
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600685 {
686 LoggerState pi1_logger = MakeLogger(pi1_);
687 LoggerState pi2_logger = MakeLogger(pi2_);
688
689 event_loop_factory_.RunFor(chrono::milliseconds(95));
690
691 StartLogger(&pi1_logger);
692 StartLogger(&pi2_logger);
693
694 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -0700695
696 pi1_logger.AppendAllFilenames(&filenames);
697 pi2_logger.AppendAllFilenames(&filenames);
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600698 }
699
Austin Schuh8fb4b452023-08-04 17:02:27 -0700700 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700701 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
702 LogReader reader(sorted_parts);
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600703
704 int ping_count = 0;
705 // Adds a callback which mutates the value of the pong message before the
706 // message is sent which is the feature we are testing here
707 reader.AddBeforeSendCallback("/test",
708 [&ping_count](aos::examples::Ping *ping) {
709 ++ping_count;
710 ping->mutate_value(ping_count);
711 });
712
713 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
714 log_reader_factory.set_send_delay(chrono::microseconds(0));
715
716 reader.Register(&log_reader_factory);
717
718 const Node *pi1 =
719 configuration::GetNode(log_reader_factory.configuration(), "pi1");
720 const Node *pi2 =
721 configuration::GetNode(log_reader_factory.configuration(), "pi2");
722
723 std::unique_ptr<EventLoop> pi1_event_loop =
724 log_reader_factory.MakeEventLoop("test", pi1);
725 pi1_event_loop->SkipTimingReport();
726 std::unique_ptr<EventLoop> pi2_event_loop =
727 log_reader_factory.MakeEventLoop("test", pi2);
728 pi2_event_loop->SkipTimingReport();
729
730 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
731 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
732
733 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
734 pi1_ping_timestamp;
735 if (!shared()) {
736 pi1_ping_timestamp =
737 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
738 pi1_event_loop.get(),
739 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
740 }
741
742 log_reader_factory.Run();
743
744 EXPECT_EQ(pi1_ping.count(), 2000u);
745 EXPECT_EQ(pi2_ping.count(), 2000u);
746 // If the BeforeSendCallback is called on both nodes, then the ping count
747 // would be 4002 instead of 2001
748 EXPECT_EQ(ping_count, 2001u);
749 if (!shared()) {
750 EXPECT_EQ(pi1_ping_timestamp->count(), 2000u);
751 }
752
753 reader.Deregister();
754}
755
756// Tests that we do not allow adding callbacks after Register is called
757TEST_P(MultinodeLoggerDeathTest, AddCallbackAfterRegister) {
758 time_converter_.StartEqual();
759 std::vector<std::string> actual_filenames;
760
761 {
762 LoggerState pi1_logger = MakeLogger(pi1_);
763 LoggerState pi2_logger = MakeLogger(pi2_);
764
765 event_loop_factory_.RunFor(chrono::milliseconds(95));
766
767 StartLogger(&pi1_logger);
768 StartLogger(&pi2_logger);
769
770 event_loop_factory_.RunFor(chrono::milliseconds(20000));
771 pi1_logger.AppendAllFilenames(&actual_filenames);
772 pi2_logger.AppendAllFilenames(&actual_filenames);
773 }
774
Austin Schuh8fb4b452023-08-04 17:02:27 -0700775 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700776 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600777
778 LogReader reader(sorted_parts, &config_.message());
779 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
780 reader.Register(&log_reader_factory);
781 EXPECT_DEATH(
782 {
783 reader.AddBeforeSendCallback("/test", [](aos::examples::Pong *) {
784 LOG(FATAL) << "This should not be called";
785 });
786 },
787 "Cannot add callbacks after calling Register");
788 reader.Deregister();
789}
790
Naman Guptaa63aa132023-03-22 20:06:34 -0700791// Test that if we feed the replay with a mismatched node list that we die on
792// the LogReader constructor.
793TEST_P(MultinodeLoggerDeathTest, MultiNodeBadReplayConfig) {
794 time_converter_.StartEqual();
Austin Schuh8fb4b452023-08-04 17:02:27 -0700795
796 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -0700797 {
798 LoggerState pi1_logger = MakeLogger(pi1_);
799 LoggerState pi2_logger = MakeLogger(pi2_);
800
801 event_loop_factory_.RunFor(chrono::milliseconds(95));
802
803 StartLogger(&pi1_logger);
804 StartLogger(&pi2_logger);
805
806 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -0700807
808 pi1_logger.AppendAllFilenames(&filenames);
809 pi2_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -0700810 }
811
812 // Test that, if we add an additional node to the replay config that the
813 // logger complains about the mismatch in number of nodes.
814 FlatbufferDetachedBuffer<Configuration> extra_nodes_config =
815 configuration::MergeWithConfig(&config_.message(), R"({
816 "nodes": [
817 {
818 "name": "extra-node"
819 }
820 ]
821 }
822 )");
823
Austin Schuh8fb4b452023-08-04 17:02:27 -0700824 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700825 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -0700826 EXPECT_DEATH(LogReader(sorted_parts, &extra_nodes_config.message()),
827 "Log file and replay config need to have matching nodes lists.");
828}
829
830// Tests that we can read log files where they don't start at the same monotonic
831// time.
832TEST_P(MultinodeLoggerTest, StaggeredStart) {
833 time_converter_.StartEqual();
834 std::vector<std::string> actual_filenames;
835
836 {
837 LoggerState pi1_logger = MakeLogger(pi1_);
838 LoggerState pi2_logger = MakeLogger(pi2_);
839
840 event_loop_factory_.RunFor(chrono::milliseconds(95));
841
842 StartLogger(&pi1_logger);
843
844 event_loop_factory_.RunFor(chrono::milliseconds(200));
845
846 StartLogger(&pi2_logger);
847
848 event_loop_factory_.RunFor(chrono::milliseconds(20000));
849 pi1_logger.AppendAllFilenames(&actual_filenames);
850 pi2_logger.AppendAllFilenames(&actual_filenames);
851 }
852
853 // Since we delay starting pi2, it already knows about all the timestamps so
854 // we don't end up with extra parts.
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700855 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
856 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
857 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -0700858
859 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
860 log_reader_factory.set_send_delay(chrono::microseconds(0));
861
862 // This sends out the fetched messages and advances time to the start of the
863 // log file.
864 reader.Register(&log_reader_factory);
865
866 const Node *pi1 =
867 configuration::GetNode(log_reader_factory.configuration(), "pi1");
868 const Node *pi2 =
869 configuration::GetNode(log_reader_factory.configuration(), "pi2");
870
871 EXPECT_THAT(reader.LoggedNodes(),
872 ::testing::ElementsAre(
873 configuration::GetNode(reader.logged_configuration(), pi1),
874 configuration::GetNode(reader.logged_configuration(), pi2)));
875
876 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
877
878 std::unique_ptr<EventLoop> pi1_event_loop =
879 log_reader_factory.MakeEventLoop("test", pi1);
880 std::unique_ptr<EventLoop> pi2_event_loop =
881 log_reader_factory.MakeEventLoop("test", pi2);
882
883 int pi1_ping_count = 30;
884 int pi2_ping_count = 30;
885 int pi1_pong_count = 30;
886 int pi2_pong_count = 30;
887
888 // Confirm that the ping value matches.
889 pi1_event_loop->MakeWatcher(
890 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
891 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
892 << pi1_event_loop->context().monotonic_remote_time << " -> "
893 << pi1_event_loop->context().monotonic_event_time;
894 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
895
896 ++pi1_ping_count;
897 });
898 pi2_event_loop->MakeWatcher(
899 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
900 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
901 << pi2_event_loop->context().monotonic_remote_time << " -> "
902 << pi2_event_loop->context().monotonic_event_time;
903 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
904
905 ++pi2_ping_count;
906 });
907
908 // Confirm that the ping and pong counts both match, and the value also
909 // matches.
910 pi1_event_loop->MakeWatcher(
911 "/test", [&pi1_event_loop, &pi1_ping_count,
912 &pi1_pong_count](const examples::Pong &pong) {
913 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
914 << pi1_event_loop->context().monotonic_remote_time << " -> "
915 << pi1_event_loop->context().monotonic_event_time;
916
917 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
918 ++pi1_pong_count;
919 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
920 });
921 pi2_event_loop->MakeWatcher(
922 "/test", [&pi2_event_loop, &pi2_ping_count,
923 &pi2_pong_count](const examples::Pong &pong) {
924 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
925 << pi2_event_loop->context().monotonic_remote_time << " -> "
926 << pi2_event_loop->context().monotonic_event_time;
927
928 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
929 ++pi2_pong_count;
930 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
931 });
932
933 log_reader_factory.Run();
934 EXPECT_EQ(pi1_ping_count, 2030);
935 EXPECT_EQ(pi2_ping_count, 2030);
936 EXPECT_EQ(pi1_pong_count, 2030);
937 EXPECT_EQ(pi2_pong_count, 2030);
938
939 reader.Deregister();
940}
941
942// Tests that we can read log files where the monotonic clocks drift and don't
943// match correctly. While we are here, also test that different ending times
944// also is readable.
945TEST_P(MultinodeLoggerTest, MismatchedClocks) {
946 // TODO(austin): Negate...
947 const chrono::nanoseconds initial_pi2_offset = chrono::seconds(1000);
948
949 time_converter_.AddMonotonic(
950 {BootTimestamp::epoch(), BootTimestamp::epoch() + initial_pi2_offset});
951 // Wait for 95 ms, (~0.1 seconds - 1/2 of the ping/pong period), and set the
952 // skew to be 200 uS/s
953 const chrono::nanoseconds startup_sleep1 = time_converter_.AddMonotonic(
954 {chrono::milliseconds(95),
955 chrono::milliseconds(95) - chrono::nanoseconds(200) * 95});
956 // Run another 200 ms to have one logger start first.
957 const chrono::nanoseconds startup_sleep2 = time_converter_.AddMonotonic(
958 {chrono::milliseconds(200), chrono::milliseconds(200)});
959 // Slew one way then the other at the same 200 uS/S slew rate. Make sure we
960 // go far enough to cause problems if this isn't accounted for.
961 const chrono::nanoseconds logger_run1 = time_converter_.AddMonotonic(
962 {chrono::milliseconds(20000),
963 chrono::milliseconds(20000) - chrono::nanoseconds(200) * 20000});
964 const chrono::nanoseconds logger_run2 = time_converter_.AddMonotonic(
965 {chrono::milliseconds(40000),
966 chrono::milliseconds(40000) + chrono::nanoseconds(200) * 40000});
967 const chrono::nanoseconds logger_run3 = time_converter_.AddMonotonic(
968 {chrono::milliseconds(400), chrono::milliseconds(400)});
969
Austin Schuh8fb4b452023-08-04 17:02:27 -0700970 std::vector<std::string> actual_filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -0700971 {
972 LoggerState pi2_logger = MakeLogger(pi2_);
973
974 LOG(INFO) << "pi2 times: " << pi2_->monotonic_now() << " "
975 << pi2_->realtime_now() << " distributed "
976 << pi2_->ToDistributedClock(pi2_->monotonic_now());
977
978 LOG(INFO) << "pi2_ times: " << pi2_->monotonic_now() << " "
979 << pi2_->realtime_now() << " distributed "
980 << pi2_->ToDistributedClock(pi2_->monotonic_now());
981
982 event_loop_factory_.RunFor(startup_sleep1);
983
984 StartLogger(&pi2_logger);
985
986 event_loop_factory_.RunFor(startup_sleep2);
987
988 {
989 // Run pi1's logger for only part of the time.
990 LoggerState pi1_logger = MakeLogger(pi1_);
991
992 StartLogger(&pi1_logger);
993 event_loop_factory_.RunFor(logger_run1);
994
995 // Make sure we slewed time far enough so that the difference is greater
996 // than the network delay. This confirms that if we sort incorrectly, it
997 // would show in the results.
998 EXPECT_LT(
999 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
1000 -event_loop_factory_.send_delay() -
1001 event_loop_factory_.network_delay());
1002
1003 event_loop_factory_.RunFor(logger_run2);
1004
1005 // And now check that we went far enough the other way to make sure we
1006 // cover both problems.
1007 EXPECT_GT(
1008 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
1009 event_loop_factory_.send_delay() +
1010 event_loop_factory_.network_delay());
Austin Schuh8fb4b452023-08-04 17:02:27 -07001011
1012 pi1_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001013 }
1014
1015 // And log a bit more on pi2.
1016 event_loop_factory_.RunFor(logger_run3);
Austin Schuh8fb4b452023-08-04 17:02:27 -07001017
1018 pi2_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001019 }
1020
Austin Schuh8fb4b452023-08-04 17:02:27 -07001021 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001022 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1023 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001024
1025 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1026 log_reader_factory.set_send_delay(chrono::microseconds(0));
1027
1028 const Node *pi1 =
1029 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1030 const Node *pi2 =
1031 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1032
1033 // This sends out the fetched messages and advances time to the start of the
1034 // log file.
1035 reader.Register(&log_reader_factory);
1036
1037 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
1038 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
1039 LOG(INFO) << "now pi1 "
1040 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1041 LOG(INFO) << "now pi2 "
1042 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1043
1044 LOG(INFO) << "Done registering (pi1) "
1045 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now()
1046 << " "
1047 << log_reader_factory.GetNodeEventLoopFactory(pi1)->realtime_now();
1048 LOG(INFO) << "Done registering (pi2) "
1049 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now()
1050 << " "
1051 << log_reader_factory.GetNodeEventLoopFactory(pi2)->realtime_now();
1052
1053 EXPECT_THAT(reader.LoggedNodes(),
1054 ::testing::ElementsAre(
1055 configuration::GetNode(reader.logged_configuration(), pi1),
1056 configuration::GetNode(reader.logged_configuration(), pi2)));
1057
1058 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1059
1060 std::unique_ptr<EventLoop> pi1_event_loop =
1061 log_reader_factory.MakeEventLoop("test", pi1);
1062 std::unique_ptr<EventLoop> pi2_event_loop =
1063 log_reader_factory.MakeEventLoop("test", pi2);
1064
1065 int pi1_ping_count = 30;
1066 int pi2_ping_count = 30;
1067 int pi1_pong_count = 30;
1068 int pi2_pong_count = 30;
1069
1070 // Confirm that the ping value matches.
1071 pi1_event_loop->MakeWatcher(
1072 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
1073 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
1074 << pi1_event_loop->context().monotonic_remote_time << " -> "
1075 << pi1_event_loop->context().monotonic_event_time;
1076 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
1077
1078 ++pi1_ping_count;
1079 });
1080 pi2_event_loop->MakeWatcher(
1081 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
1082 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
1083 << pi2_event_loop->context().monotonic_remote_time << " -> "
1084 << pi2_event_loop->context().monotonic_event_time;
1085 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
1086
1087 ++pi2_ping_count;
1088 });
1089
1090 // Confirm that the ping and pong counts both match, and the value also
1091 // matches.
1092 pi1_event_loop->MakeWatcher(
1093 "/test", [&pi1_event_loop, &pi1_ping_count,
1094 &pi1_pong_count](const examples::Pong &pong) {
1095 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
1096 << pi1_event_loop->context().monotonic_remote_time << " -> "
1097 << pi1_event_loop->context().monotonic_event_time;
1098
1099 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
1100 ++pi1_pong_count;
1101 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
1102 });
1103 pi2_event_loop->MakeWatcher(
1104 "/test", [&pi2_event_loop, &pi2_ping_count,
1105 &pi2_pong_count](const examples::Pong &pong) {
1106 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
1107 << pi2_event_loop->context().monotonic_remote_time << " -> "
1108 << pi2_event_loop->context().monotonic_event_time;
1109
1110 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
1111 ++pi2_pong_count;
1112 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
1113 });
1114
1115 log_reader_factory.Run();
1116 EXPECT_EQ(pi1_ping_count, 6030);
1117 EXPECT_EQ(pi2_ping_count, 6030);
1118 EXPECT_EQ(pi1_pong_count, 6030);
1119 EXPECT_EQ(pi2_pong_count, 6030);
1120
1121 reader.Deregister();
1122}
1123
1124// Tests that we can sort a bunch of parts into the pre-determined sorted parts.
1125TEST_P(MultinodeLoggerTest, SortParts) {
1126 time_converter_.StartEqual();
1127 // Make a bunch of parts.
1128 {
1129 LoggerState pi1_logger = MakeLogger(pi1_);
1130 LoggerState pi2_logger = MakeLogger(pi2_);
1131
1132 event_loop_factory_.RunFor(chrono::milliseconds(95));
1133
1134 StartLogger(&pi1_logger);
1135 StartLogger(&pi2_logger);
1136
1137 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1138 }
1139
1140 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1141 VerifyParts(sorted_parts);
1142}
1143
1144// Tests that we can sort a bunch of parts with an empty part. We should ignore
1145// it and remove it from the sorted list.
1146TEST_P(MultinodeLoggerTest, SortEmptyParts) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07001147 std::vector<std::string> actual_filenames;
1148
Naman Guptaa63aa132023-03-22 20:06:34 -07001149 time_converter_.StartEqual();
1150 // Make a bunch of parts.
1151 {
1152 LoggerState pi1_logger = MakeLogger(pi1_);
1153 LoggerState pi2_logger = MakeLogger(pi2_);
1154
1155 event_loop_factory_.RunFor(chrono::milliseconds(95));
1156
1157 StartLogger(&pi1_logger);
1158 StartLogger(&pi2_logger);
1159
1160 event_loop_factory_.RunFor(chrono::milliseconds(2000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07001161 pi1_logger.AppendAllFilenames(&actual_filenames);
1162 pi2_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001163 }
1164
1165 // TODO(austin): Should we flip out if the file can't open?
1166 const std::string kEmptyFile("foobarinvalidfiledoesnotexist" + Extension());
1167
1168 aos::util::WriteStringToFileOrDie(kEmptyFile, "");
Austin Schuh8fb4b452023-08-04 17:02:27 -07001169 actual_filenames.emplace_back(kEmptyFile);
Naman Guptaa63aa132023-03-22 20:06:34 -07001170
Austin Schuh8fb4b452023-08-04 17:02:27 -07001171 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001172 VerifyParts(sorted_parts, {kEmptyFile});
1173}
1174
1175// Tests that we can sort a bunch of parts with the end missing off a
1176// file. We should use the part we can read.
1177TEST_P(MultinodeLoggerTest, SortTruncatedParts) {
Austin Schuh6ecfe902023-08-04 22:44:37 -07001178 if (file_strategy() == FileStrategy::kCombine) {
1179 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
1180 }
1181
Naman Guptaa63aa132023-03-22 20:06:34 -07001182 std::vector<std::string> actual_filenames;
1183 time_converter_.StartEqual();
1184 // Make a bunch of parts.
1185 {
1186 LoggerState pi1_logger = MakeLogger(pi1_);
1187 LoggerState pi2_logger = MakeLogger(pi2_);
1188
1189 event_loop_factory_.RunFor(chrono::milliseconds(95));
1190
1191 StartLogger(&pi1_logger);
1192 StartLogger(&pi2_logger);
1193
1194 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1195
1196 pi1_logger.AppendAllFilenames(&actual_filenames);
1197 pi2_logger.AppendAllFilenames(&actual_filenames);
1198 }
1199
1200 ASSERT_THAT(actual_filenames,
1201 ::testing::UnorderedElementsAreArray(logfiles_));
1202
1203 // Strip off the end of one of the files. Pick one with a lot of data.
1204 // For snappy, needs to have enough data to be >1 chunk of compressed data so
1205 // that we don't corrupt the entire log part.
1206 ::std::string compressed_contents =
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07001207 aos::util::ReadFileToStringOrDie(logfiles_[2]);
Naman Guptaa63aa132023-03-22 20:06:34 -07001208
1209 aos::util::WriteStringToFileOrDie(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07001210 logfiles_[2],
Naman Guptaa63aa132023-03-22 20:06:34 -07001211 compressed_contents.substr(0, compressed_contents.size() - 100));
1212
1213 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1214 VerifyParts(sorted_parts);
1215}
1216
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001217// Tests that if we remap a logged channel, it shows up correctly.
Naman Guptaa63aa132023-03-22 20:06:34 -07001218TEST_P(MultinodeLoggerTest, RemapLoggedChannel) {
1219 time_converter_.StartEqual();
Austin Schuh8fb4b452023-08-04 17:02:27 -07001220
1221 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -07001222 {
1223 LoggerState pi1_logger = MakeLogger(pi1_);
1224 LoggerState pi2_logger = MakeLogger(pi2_);
1225
1226 event_loop_factory_.RunFor(chrono::milliseconds(95));
1227
1228 StartLogger(&pi1_logger);
1229 StartLogger(&pi2_logger);
1230
1231 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07001232
1233 pi1_logger.AppendAllFilenames(&filenames);
1234 pi2_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001235 }
1236
Austin Schuh8fb4b452023-08-04 17:02:27 -07001237 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001238 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1239 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001240
1241 // Remap just on pi1.
1242 reader.RemapLoggedChannel<aos::timing::Report>(
1243 "/aos", configuration::GetNode(reader.configuration(), "pi1"));
1244
1245 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1246 log_reader_factory.set_send_delay(chrono::microseconds(0));
1247
1248 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1249 // Note: An extra channel gets remapped automatically due to a timestamp
1250 // channel being LOCAL_LOGGER'd.
1251 ASSERT_EQ(remapped_channels.size(), std::get<0>(GetParam()).shared ? 1u : 2u);
1252 EXPECT_EQ(remapped_channels[0]->name()->string_view(), "/original/pi1/aos");
1253 EXPECT_EQ(remapped_channels[0]->type()->string_view(), "aos.timing.Report");
1254 if (!std::get<0>(GetParam()).shared) {
1255 EXPECT_EQ(remapped_channels[1]->name()->string_view(),
1256 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1257 "aos-message_bridge-Timestamp");
1258 EXPECT_EQ(remapped_channels[1]->type()->string_view(),
1259 "aos.message_bridge.RemoteMessage");
1260 }
1261
1262 reader.Register(&log_reader_factory);
1263
1264 const Node *pi1 =
1265 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1266 const Node *pi2 =
1267 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1268
1269 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1270 // else should have moved.
1271 std::unique_ptr<EventLoop> pi1_event_loop =
1272 log_reader_factory.MakeEventLoop("test", pi1);
1273 pi1_event_loop->SkipTimingReport();
1274 std::unique_ptr<EventLoop> full_pi1_event_loop =
1275 log_reader_factory.MakeEventLoop("test", pi1);
1276 full_pi1_event_loop->SkipTimingReport();
1277 std::unique_ptr<EventLoop> pi2_event_loop =
1278 log_reader_factory.MakeEventLoop("test", pi2);
1279 pi2_event_loop->SkipTimingReport();
1280
1281 MessageCounter<aos::timing::Report> pi1_timing_report(pi1_event_loop.get(),
1282 "/aos");
1283 MessageCounter<aos::timing::Report> full_pi1_timing_report(
1284 full_pi1_event_loop.get(), "/pi1/aos");
1285 MessageCounter<aos::timing::Report> pi1_original_timing_report(
1286 pi1_event_loop.get(), "/original/aos");
1287 MessageCounter<aos::timing::Report> full_pi1_original_timing_report(
1288 full_pi1_event_loop.get(), "/original/pi1/aos");
1289 MessageCounter<aos::timing::Report> pi2_timing_report(pi2_event_loop.get(),
1290 "/aos");
1291
1292 log_reader_factory.Run();
1293
1294 EXPECT_EQ(pi1_timing_report.count(), 0u);
1295 EXPECT_EQ(full_pi1_timing_report.count(), 0u);
1296 EXPECT_NE(pi1_original_timing_report.count(), 0u);
1297 EXPECT_NE(full_pi1_original_timing_report.count(), 0u);
1298 EXPECT_NE(pi2_timing_report.count(), 0u);
1299
1300 reader.Deregister();
1301}
1302
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001303// Tests that if we rename a logged channel, it shows up correctly.
1304TEST_P(MultinodeLoggerTest, RenameLoggedChannel) {
1305 std::vector<std::string> actual_filenames;
1306 time_converter_.StartEqual();
1307 {
1308 LoggerState pi1_logger = MakeLogger(pi1_);
1309 LoggerState pi2_logger = MakeLogger(pi2_);
1310
1311 event_loop_factory_.RunFor(chrono::milliseconds(95));
1312
1313 StartLogger(&pi1_logger);
1314 StartLogger(&pi2_logger);
1315
1316 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1317
1318 pi1_logger.AppendAllFilenames(&actual_filenames);
1319 pi2_logger.AppendAllFilenames(&actual_filenames);
1320 }
1321
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001322 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1323 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1324 LogReader reader(sorted_parts);
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001325
1326 // Rename just on pi2. Add some global maps just to verify they get added in
1327 // the config and used correctly.
1328 std::vector<MapT> maps;
1329 {
1330 MapT map;
1331 map.match = std::make_unique<ChannelT>();
1332 map.match->name = "/foo*";
1333 map.match->source_node = "pi1";
1334 map.rename = std::make_unique<ChannelT>();
1335 map.rename->name = "/pi1/foo";
1336 maps.emplace_back(std::move(map));
1337 }
1338 {
1339 MapT map;
1340 map.match = std::make_unique<ChannelT>();
1341 map.match->name = "/foo*";
1342 map.match->source_node = "pi2";
1343 map.rename = std::make_unique<ChannelT>();
1344 map.rename->name = "/pi2/foo";
1345 maps.emplace_back(std::move(map));
1346 }
1347 {
1348 MapT map;
1349 map.match = std::make_unique<ChannelT>();
1350 map.match->name = "/foo";
1351 map.match->type = "aos.examples.Ping";
1352 map.rename = std::make_unique<ChannelT>();
1353 map.rename->name = "/foo/renamed";
1354 maps.emplace_back(std::move(map));
1355 }
1356 reader.RenameLoggedChannel<aos::examples::Ping>(
1357 "/aos", configuration::GetNode(reader.configuration(), "pi2"),
1358 "/pi2/foo/renamed", maps);
1359
1360 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1361 log_reader_factory.set_send_delay(chrono::microseconds(0));
1362
1363 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1364 // Note: An extra channel gets remapped automatically due to a timestamp
1365 // channel being LOCAL_LOGGER'd.
1366 const bool shared = std::get<0>(GetParam()).shared;
1367 ASSERT_EQ(remapped_channels.size(), shared ? 1u : 2u);
1368 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->name()->string_view(),
1369 "/pi2/foo/renamed");
1370 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->type()->string_view(),
1371 "aos.examples.Ping");
1372 if (!shared) {
1373 EXPECT_EQ(remapped_channels[0]->name()->string_view(),
1374 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1375 "aos-message_bridge-Timestamp");
1376 EXPECT_EQ(remapped_channels[0]->type()->string_view(),
1377 "aos.message_bridge.RemoteMessage");
1378 }
1379
1380 reader.Register(&log_reader_factory);
1381
1382 const Node *pi1 =
1383 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1384 const Node *pi2 =
1385 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1386
1387 // Confirm we can read the data on the renamed channel, just for pi2. Nothing
1388 // else should have moved.
1389 std::unique_ptr<EventLoop> pi2_event_loop =
1390 log_reader_factory.MakeEventLoop("test", pi2);
1391 pi2_event_loop->SkipTimingReport();
1392 std::unique_ptr<EventLoop> full_pi2_event_loop =
1393 log_reader_factory.MakeEventLoop("test", pi2);
1394 full_pi2_event_loop->SkipTimingReport();
1395 std::unique_ptr<EventLoop> pi1_event_loop =
1396 log_reader_factory.MakeEventLoop("test", pi1);
1397 pi1_event_loop->SkipTimingReport();
1398
1399 MessageCounter<aos::examples::Ping> pi2_ping(pi2_event_loop.get(), "/aos");
1400 MessageCounter<aos::examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1401 "/foo");
1402 MessageCounter<aos::examples::Ping> full_pi2_renamed_ping(
1403 full_pi2_event_loop.get(), "/pi2/foo/renamed");
1404 MessageCounter<aos::examples::Ping> pi1_ping(pi1_event_loop.get(), "/aos");
1405
1406 log_reader_factory.Run();
1407
1408 EXPECT_EQ(pi2_ping.count(), 0u);
1409 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1410 EXPECT_NE(full_pi2_renamed_ping.count(), 0u);
1411 EXPECT_NE(pi1_ping.count(), 0u);
1412
1413 reader.Deregister();
1414}
1415
Naman Guptaa63aa132023-03-22 20:06:34 -07001416// Tests that we can remap a forwarded channel as well.
1417TEST_P(MultinodeLoggerTest, RemapForwardedLoggedChannel) {
1418 time_converter_.StartEqual();
1419 {
1420 LoggerState pi1_logger = MakeLogger(pi1_);
1421 LoggerState pi2_logger = MakeLogger(pi2_);
1422
1423 event_loop_factory_.RunFor(chrono::milliseconds(95));
1424
1425 StartLogger(&pi1_logger);
1426 StartLogger(&pi2_logger);
1427
1428 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1429 }
1430
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001431 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1432 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1433 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001434
1435 reader.RemapLoggedChannel<examples::Ping>("/test");
1436
1437 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1438 log_reader_factory.set_send_delay(chrono::microseconds(0));
1439
1440 reader.Register(&log_reader_factory);
1441
1442 const Node *pi1 =
1443 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1444 const Node *pi2 =
1445 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1446
1447 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1448 // else should have moved.
1449 std::unique_ptr<EventLoop> pi1_event_loop =
1450 log_reader_factory.MakeEventLoop("test", pi1);
1451 pi1_event_loop->SkipTimingReport();
1452 std::unique_ptr<EventLoop> full_pi1_event_loop =
1453 log_reader_factory.MakeEventLoop("test", pi1);
1454 full_pi1_event_loop->SkipTimingReport();
1455 std::unique_ptr<EventLoop> pi2_event_loop =
1456 log_reader_factory.MakeEventLoop("test", pi2);
1457 pi2_event_loop->SkipTimingReport();
1458
1459 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1460 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1461 MessageCounter<examples::Ping> pi1_original_ping(pi1_event_loop.get(),
1462 "/original/test");
1463 MessageCounter<examples::Ping> pi2_original_ping(pi2_event_loop.get(),
1464 "/original/test");
1465
1466 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1467 pi1_original_ping_timestamp;
1468 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1469 pi1_ping_timestamp;
1470 if (!shared()) {
1471 pi1_original_ping_timestamp =
1472 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1473 pi1_event_loop.get(),
1474 "/pi1/aos/remote_timestamps/pi2/original/test/aos-examples-Ping");
1475 pi1_ping_timestamp =
1476 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1477 pi1_event_loop.get(),
1478 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1479 }
1480
1481 log_reader_factory.Run();
1482
1483 EXPECT_EQ(pi1_ping.count(), 0u);
1484 EXPECT_EQ(pi2_ping.count(), 0u);
1485 EXPECT_NE(pi1_original_ping.count(), 0u);
1486 EXPECT_NE(pi2_original_ping.count(), 0u);
1487 if (!shared()) {
1488 EXPECT_NE(pi1_original_ping_timestamp->count(), 0u);
1489 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1490 }
1491
1492 reader.Deregister();
1493}
1494
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001495// Tests that we can rename a forwarded channel as well.
1496TEST_P(MultinodeLoggerTest, RenameForwardedLoggedChannel) {
1497 std::vector<std::string> actual_filenames;
1498 time_converter_.StartEqual();
1499 {
1500 LoggerState pi1_logger = MakeLogger(pi1_);
1501 LoggerState pi2_logger = MakeLogger(pi2_);
1502
1503 event_loop_factory_.RunFor(chrono::milliseconds(95));
1504
1505 StartLogger(&pi1_logger);
1506 StartLogger(&pi2_logger);
1507
1508 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1509
1510 pi1_logger.AppendAllFilenames(&actual_filenames);
1511 pi2_logger.AppendAllFilenames(&actual_filenames);
1512 }
1513
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001514 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1515 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1516 LogReader reader(sorted_parts);
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001517
1518 std::vector<MapT> maps;
1519 {
1520 MapT map;
1521 map.match = std::make_unique<ChannelT>();
1522 map.match->name = "/production*";
1523 map.match->source_node = "pi1";
1524 map.rename = std::make_unique<ChannelT>();
1525 map.rename->name = "/pi1/production";
1526 maps.emplace_back(std::move(map));
1527 }
1528 {
1529 MapT map;
1530 map.match = std::make_unique<ChannelT>();
1531 map.match->name = "/production*";
1532 map.match->source_node = "pi2";
1533 map.rename = std::make_unique<ChannelT>();
1534 map.rename->name = "/pi2/production";
1535 maps.emplace_back(std::move(map));
1536 }
1537 reader.RenameLoggedChannel<aos::examples::Ping>(
1538 "/test", configuration::GetNode(reader.configuration(), "pi1"),
1539 "/pi1/production", maps);
1540
1541 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1542 log_reader_factory.set_send_delay(chrono::microseconds(0));
1543
1544 reader.Register(&log_reader_factory);
1545
1546 const Node *pi1 =
1547 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1548 const Node *pi2 =
1549 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1550
1551 // Confirm we can read the data on the renamed channel, on both the source
1552 // node and the remote node. In case of split timestamp channels, confirm that
1553 // we receive the timestamp messages on the renamed channel as well.
1554 std::unique_ptr<EventLoop> pi1_event_loop =
1555 log_reader_factory.MakeEventLoop("test", pi1);
1556 pi1_event_loop->SkipTimingReport();
1557 std::unique_ptr<EventLoop> full_pi1_event_loop =
1558 log_reader_factory.MakeEventLoop("test", pi1);
1559 full_pi1_event_loop->SkipTimingReport();
1560 std::unique_ptr<EventLoop> pi2_event_loop =
1561 log_reader_factory.MakeEventLoop("test", pi2);
1562 pi2_event_loop->SkipTimingReport();
1563
1564 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1565 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1566 MessageCounter<examples::Ping> pi1_renamed_ping(pi1_event_loop.get(),
1567 "/pi1/production");
1568 MessageCounter<examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1569 "/pi1/production");
1570
1571 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1572 pi1_renamed_ping_timestamp;
1573 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1574 pi1_ping_timestamp;
1575 if (!shared()) {
1576 pi1_renamed_ping_timestamp =
1577 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1578 pi1_event_loop.get(),
1579 "/pi1/aos/remote_timestamps/pi2/pi1/production/aos-examples-Ping");
1580 pi1_ping_timestamp =
1581 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1582 pi1_event_loop.get(),
1583 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1584 }
1585
1586 log_reader_factory.Run();
1587
1588 EXPECT_EQ(pi1_ping.count(), 0u);
1589 EXPECT_EQ(pi2_ping.count(), 0u);
1590 EXPECT_NE(pi1_renamed_ping.count(), 0u);
1591 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1592 if (!shared()) {
1593 EXPECT_NE(pi1_renamed_ping_timestamp->count(), 0u);
1594 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1595 }
1596
1597 reader.Deregister();
1598}
1599
Naman Guptaa63aa132023-03-22 20:06:34 -07001600// Tests that we observe all the same events in log replay (for a given node)
1601// whether we just register an event loop for that node or if we register a full
1602// event loop factory.
1603TEST_P(MultinodeLoggerTest, SingleNodeReplay) {
1604 time_converter_.StartEqual();
1605 constexpr chrono::milliseconds kStartupDelay(95);
Austin Schuh8fb4b452023-08-04 17:02:27 -07001606 std::vector<std::string> filenames;
1607
Naman Guptaa63aa132023-03-22 20:06:34 -07001608 {
1609 LoggerState pi1_logger = MakeLogger(pi1_);
1610 LoggerState pi2_logger = MakeLogger(pi2_);
1611
1612 event_loop_factory_.RunFor(kStartupDelay);
1613
1614 StartLogger(&pi1_logger);
1615 StartLogger(&pi2_logger);
1616
1617 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07001618
1619 pi1_logger.AppendAllFilenames(&filenames);
1620 pi2_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001621 }
1622
Austin Schuh8fb4b452023-08-04 17:02:27 -07001623 LogReader full_reader(SortParts(filenames));
1624 LogReader single_node_reader(SortParts(filenames));
Naman Guptaa63aa132023-03-22 20:06:34 -07001625
1626 SimulatedEventLoopFactory full_factory(full_reader.configuration());
1627 SimulatedEventLoopFactory single_node_factory(
1628 single_node_reader.configuration());
1629 single_node_factory.SkipTimingReport();
1630 single_node_factory.DisableStatistics();
1631 std::unique_ptr<EventLoop> replay_event_loop =
1632 single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
1633 "log_reader");
1634
1635 full_reader.Register(&full_factory);
1636 single_node_reader.Register(replay_event_loop.get());
1637
1638 const Node *full_pi1 =
1639 configuration::GetNode(full_factory.configuration(), "pi1");
1640
1641 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1642 // else should have moved.
1643 std::unique_ptr<EventLoop> full_event_loop =
1644 full_factory.MakeEventLoop("test", full_pi1);
1645 full_event_loop->SkipTimingReport();
1646 full_event_loop->SkipAosLog();
1647 // maps are indexed on channel index.
1648 // observed_messages: {channel_index: [(message_sent_time, was_fetched),...]}
1649 std::map<size_t, std::vector<std::pair<monotonic_clock::time_point, bool>>>
1650 observed_messages;
1651 std::map<size_t, std::unique_ptr<RawFetcher>> fetchers;
1652 for (size_t ii = 0; ii < full_event_loop->configuration()->channels()->size();
1653 ++ii) {
1654 const Channel *channel =
1655 full_event_loop->configuration()->channels()->Get(ii);
1656 // We currently don't support replaying remote timestamp channels in
1657 // realtime replay (unless the remote timestamp channel was not NOT_LOGGED,
1658 // in which case it gets auto-remapped and replayed on a /original channel).
1659 if (channel->name()->string_view().find("remote_timestamp") !=
1660 std::string_view::npos &&
1661 channel->name()->string_view().find("/original") ==
1662 std::string_view::npos) {
1663 continue;
1664 }
1665 if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
1666 observed_messages[ii] = {};
1667 fetchers[ii] = full_event_loop->MakeRawFetcher(channel);
1668 full_event_loop->OnRun([ii, &observed_messages, &fetchers]() {
1669 if (fetchers[ii]->Fetch()) {
1670 observed_messages[ii].push_back(std::make_pair(
1671 fetchers[ii]->context().monotonic_event_time, true));
1672 }
1673 });
1674 full_event_loop->MakeRawNoArgWatcher(
1675 channel, [ii, &observed_messages](const Context &context) {
1676 observed_messages[ii].push_back(
1677 std::make_pair(context.monotonic_event_time, false));
1678 });
1679 }
1680 }
1681
1682 full_factory.Run();
1683 fetchers.clear();
1684 full_reader.Deregister();
1685
1686 const Node *single_node_pi1 =
1687 configuration::GetNode(single_node_factory.configuration(), "pi1");
1688 std::map<size_t, std::unique_ptr<RawFetcher>> single_node_fetchers;
1689
1690 std::unique_ptr<EventLoop> single_node_event_loop =
1691 single_node_factory.MakeEventLoop("test", single_node_pi1);
1692 single_node_event_loop->SkipTimingReport();
1693 single_node_event_loop->SkipAosLog();
1694 for (size_t ii = 0;
1695 ii < single_node_event_loop->configuration()->channels()->size(); ++ii) {
1696 const Channel *channel =
1697 single_node_event_loop->configuration()->channels()->Get(ii);
1698 single_node_factory.DisableForwarding(channel);
1699 if (configuration::ChannelIsReadableOnNode(channel, single_node_pi1)) {
1700 single_node_fetchers[ii] =
1701 single_node_event_loop->MakeRawFetcher(channel);
1702 single_node_event_loop->OnRun([channel, ii, &single_node_fetchers]() {
1703 EXPECT_FALSE(single_node_fetchers[ii]->Fetch())
1704 << "Single EventLoop replay doesn't support pre-loading fetchers. "
1705 << configuration::StrippedChannelToString(channel);
1706 });
1707 single_node_event_loop->MakeRawNoArgWatcher(
1708 channel, [ii, &observed_messages, channel,
1709 kStartupDelay](const Context &context) {
1710 if (observed_messages[ii].empty()) {
1711 FAIL() << "Observed extra message at "
1712 << context.monotonic_event_time << " on "
1713 << configuration::StrippedChannelToString(channel);
1714 return;
1715 }
1716 const std::pair<monotonic_clock::time_point, bool> &message =
1717 observed_messages[ii].front();
1718 if (message.second) {
1719 EXPECT_LE(message.first,
1720 context.monotonic_event_time + kStartupDelay)
1721 << "Mismatched message times " << context.monotonic_event_time
1722 << " and " << message.first << " on "
1723 << configuration::StrippedChannelToString(channel);
1724 } else {
1725 EXPECT_EQ(message.first,
1726 context.monotonic_event_time + kStartupDelay)
1727 << "Mismatched message times " << context.monotonic_event_time
1728 << " and " << message.first << " on "
1729 << configuration::StrippedChannelToString(channel);
1730 }
1731 observed_messages[ii].erase(observed_messages[ii].begin());
1732 });
1733 }
1734 }
1735
1736 single_node_factory.Run();
1737
1738 single_node_fetchers.clear();
1739
1740 single_node_reader.Deregister();
1741
1742 for (const auto &pair : observed_messages) {
1743 EXPECT_TRUE(pair.second.empty())
1744 << "Missed " << pair.second.size() << " messages on "
1745 << configuration::StrippedChannelToString(
1746 single_node_event_loop->configuration()->channels()->Get(
1747 pair.first));
1748 }
1749}
1750
1751// Tests that we properly recreate forwarded timestamps when replaying a log.
1752// This should be enough that we can then re-run the logger and get a valid log
1753// back.
1754TEST_P(MultinodeLoggerTest, MessageHeader) {
1755 time_converter_.StartEqual();
1756 {
1757 LoggerState pi1_logger = MakeLogger(pi1_);
1758 LoggerState pi2_logger = MakeLogger(pi2_);
1759
1760 event_loop_factory_.RunFor(chrono::milliseconds(95));
1761
1762 StartLogger(&pi1_logger);
1763 StartLogger(&pi2_logger);
1764
1765 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1766 }
1767
1768 LogReader reader(SortParts(logfiles_));
1769
1770 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1771 log_reader_factory.set_send_delay(chrono::microseconds(0));
1772
1773 // This sends out the fetched messages and advances time to the start of the
1774 // log file.
1775 reader.Register(&log_reader_factory);
1776
1777 const Node *pi1 =
1778 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1779 const Node *pi2 =
1780 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1781
1782 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
1783 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
1784 LOG(INFO) << "now pi1 "
1785 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1786 LOG(INFO) << "now pi2 "
1787 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1788
1789 EXPECT_THAT(reader.LoggedNodes(),
1790 ::testing::ElementsAre(
1791 configuration::GetNode(reader.logged_configuration(), pi1),
1792 configuration::GetNode(reader.logged_configuration(), pi2)));
1793
1794 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1795
1796 std::unique_ptr<EventLoop> pi1_event_loop =
1797 log_reader_factory.MakeEventLoop("test", pi1);
1798 std::unique_ptr<EventLoop> pi2_event_loop =
1799 log_reader_factory.MakeEventLoop("test", pi2);
1800
1801 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =
1802 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1803 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi2_fetcher =
1804 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1805
1806 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1807 pi1_event_loop->MakeFetcher<examples::Ping>("/test");
1808 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1809 pi2_event_loop->MakeFetcher<examples::Ping>("/test");
1810
1811 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi2_fetcher =
1812 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1813 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi1_fetcher =
1814 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1815
1816 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1817 pi2_event_loop->MakeFetcher<examples::Pong>("/test");
1818 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1819 pi1_event_loop->MakeFetcher<examples::Pong>("/test");
1820
1821 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
1822 pi1_event_loop->configuration(), pi1_timestamp_on_pi1_fetcher.channel());
1823 const size_t ping_timestamp_channel = configuration::ChannelIndex(
1824 pi2_event_loop->configuration(), ping_on_pi2_fetcher.channel());
1825
1826 const size_t pi2_timestamp_channel = configuration::ChannelIndex(
1827 pi2_event_loop->configuration(), pi2_timestamp_on_pi2_fetcher.channel());
1828 const size_t pong_timestamp_channel = configuration::ChannelIndex(
1829 pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
1830
1831 const chrono::nanoseconds network_delay = event_loop_factory_.network_delay();
1832 const chrono::nanoseconds send_delay = event_loop_factory_.send_delay();
1833
1834 for (std::pair<int, std::string> channel :
1835 shared()
1836 ? std::vector<
1837 std::pair<int, std::string>>{{-1,
1838 "/aos/remote_timestamps/pi2"}}
1839 : std::vector<std::pair<int, std::string>>{
1840 {pi1_timestamp_channel,
1841 "/aos/remote_timestamps/pi2/pi1/aos/"
1842 "aos-message_bridge-Timestamp"},
1843 {ping_timestamp_channel,
1844 "/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
1845 pi1_event_loop->MakeWatcher(
1846 channel.second,
1847 [&pi1_event_loop, &pi2_event_loop, pi1_timestamp_channel,
1848 ping_timestamp_channel, &pi1_timestamp_on_pi1_fetcher,
1849 &pi1_timestamp_on_pi2_fetcher, &ping_on_pi1_fetcher,
1850 &ping_on_pi2_fetcher, network_delay, send_delay,
1851 channel_index = channel.first](const RemoteMessage &header) {
1852 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1853 chrono::nanoseconds(header.monotonic_sent_time()));
1854 const aos::realtime_clock::time_point header_realtime_sent_time(
1855 chrono::nanoseconds(header.realtime_sent_time()));
1856 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1857 chrono::nanoseconds(header.monotonic_remote_time()));
1858 const aos::realtime_clock::time_point header_realtime_remote_time(
1859 chrono::nanoseconds(header.realtime_remote_time()));
1860
1861 if (channel_index != -1) {
1862 ASSERT_EQ(channel_index, header.channel_index());
1863 }
1864
1865 const Context *pi1_context = nullptr;
1866 const Context *pi2_context = nullptr;
1867
1868 if (header.channel_index() == pi1_timestamp_channel) {
1869 ASSERT_TRUE(pi1_timestamp_on_pi1_fetcher.FetchNext());
1870 ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
1871 pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
1872 pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
1873 } else if (header.channel_index() == ping_timestamp_channel) {
1874 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
1875 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
1876 pi1_context = &ping_on_pi1_fetcher.context();
1877 pi2_context = &ping_on_pi2_fetcher.context();
1878 } else {
1879 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1880 << configuration::CleanedChannelToString(
1881 pi1_event_loop->configuration()->channels()->Get(
1882 header.channel_index()));
1883 }
1884
1885 ASSERT_TRUE(header.has_boot_uuid());
1886 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1887 pi2_event_loop->boot_uuid());
1888
1889 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
1890 EXPECT_EQ(pi2_context->remote_queue_index,
1891 header.remote_queue_index());
1892 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
1893
1894 EXPECT_EQ(pi2_context->monotonic_event_time,
1895 header_monotonic_sent_time);
1896 EXPECT_EQ(pi2_context->realtime_event_time,
1897 header_realtime_sent_time);
1898 EXPECT_EQ(pi2_context->realtime_remote_time,
1899 header_realtime_remote_time);
1900 EXPECT_EQ(pi2_context->monotonic_remote_time,
1901 header_monotonic_remote_time);
1902
1903 EXPECT_EQ(pi1_context->realtime_event_time,
1904 header_realtime_remote_time);
1905 EXPECT_EQ(pi1_context->monotonic_event_time,
1906 header_monotonic_remote_time);
1907
1908 // Time estimation isn't perfect, but we know the clocks were
1909 // identical when logged, so we know when this should have come back.
1910 // Confirm we got it when we expected.
1911 EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
1912 pi1_context->monotonic_event_time + 2 * network_delay +
1913 send_delay);
1914 });
1915 }
1916 for (std::pair<int, std::string> channel :
1917 shared()
1918 ? std::vector<
1919 std::pair<int, std::string>>{{-1,
1920 "/aos/remote_timestamps/pi1"}}
1921 : std::vector<std::pair<int, std::string>>{
1922 {pi2_timestamp_channel,
1923 "/aos/remote_timestamps/pi1/pi2/aos/"
1924 "aos-message_bridge-Timestamp"}}) {
1925 pi2_event_loop->MakeWatcher(
1926 channel.second,
1927 [&pi2_event_loop, &pi1_event_loop, pi2_timestamp_channel,
1928 pong_timestamp_channel, &pi2_timestamp_on_pi2_fetcher,
1929 &pi2_timestamp_on_pi1_fetcher, &pong_on_pi2_fetcher,
1930 &pong_on_pi1_fetcher, network_delay, send_delay,
1931 channel_index = channel.first](const RemoteMessage &header) {
1932 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1933 chrono::nanoseconds(header.monotonic_sent_time()));
1934 const aos::realtime_clock::time_point header_realtime_sent_time(
1935 chrono::nanoseconds(header.realtime_sent_time()));
1936 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1937 chrono::nanoseconds(header.monotonic_remote_time()));
1938 const aos::realtime_clock::time_point header_realtime_remote_time(
1939 chrono::nanoseconds(header.realtime_remote_time()));
1940
1941 if (channel_index != -1) {
1942 ASSERT_EQ(channel_index, header.channel_index());
1943 }
1944
1945 const Context *pi2_context = nullptr;
1946 const Context *pi1_context = nullptr;
1947
1948 if (header.channel_index() == pi2_timestamp_channel) {
1949 ASSERT_TRUE(pi2_timestamp_on_pi2_fetcher.FetchNext());
1950 ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
1951 pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
1952 pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
1953 } else if (header.channel_index() == pong_timestamp_channel) {
1954 ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
1955 ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
1956 pi2_context = &pong_on_pi2_fetcher.context();
1957 pi1_context = &pong_on_pi1_fetcher.context();
1958 } else {
1959 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1960 << configuration::CleanedChannelToString(
1961 pi2_event_loop->configuration()->channels()->Get(
1962 header.channel_index()));
1963 }
1964
1965 ASSERT_TRUE(header.has_boot_uuid());
1966 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1967 pi1_event_loop->boot_uuid());
1968
1969 EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());
1970 EXPECT_EQ(pi1_context->remote_queue_index,
1971 header.remote_queue_index());
1972 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
1973
1974 EXPECT_EQ(pi1_context->monotonic_event_time,
1975 header_monotonic_sent_time);
1976 EXPECT_EQ(pi1_context->realtime_event_time,
1977 header_realtime_sent_time);
1978 EXPECT_EQ(pi1_context->realtime_remote_time,
1979 header_realtime_remote_time);
1980 EXPECT_EQ(pi1_context->monotonic_remote_time,
1981 header_monotonic_remote_time);
1982
1983 EXPECT_EQ(pi2_context->realtime_event_time,
1984 header_realtime_remote_time);
1985 EXPECT_EQ(pi2_context->monotonic_event_time,
1986 header_monotonic_remote_time);
1987
1988 // Time estimation isn't perfect, but we know the clocks were
1989 // identical when logged, so we know when this should have come back.
1990 // Confirm we got it when we expected.
1991 EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
1992 pi2_context->monotonic_event_time + 2 * network_delay +
1993 send_delay);
1994 });
1995 }
1996
1997 // And confirm we can re-create a log again, while checking the contents.
1998 {
1999 LoggerState pi1_logger = MakeLogger(
2000 log_reader_factory.GetNodeEventLoopFactory("pi1"), &log_reader_factory);
2001 LoggerState pi2_logger = MakeLogger(
2002 log_reader_factory.GetNodeEventLoopFactory("pi2"), &log_reader_factory);
2003
Austin Schuh8fb4b452023-08-04 17:02:27 -07002004 StartLogger(&pi1_logger, tmp_dir_ + "/logs/relogged1");
2005 StartLogger(&pi2_logger, tmp_dir_ + "/logs/relogged2");
Naman Guptaa63aa132023-03-22 20:06:34 -07002006
2007 log_reader_factory.Run();
2008 }
2009
2010 reader.Deregister();
2011
2012 // And verify that we can run the LogReader over the relogged files without
2013 // hitting any fatal errors.
2014 {
Austin Schuh8fb4b452023-08-04 17:02:27 -07002015 LogReader relogged_reader(SortParts(
2016 MakeLogFiles(tmp_dir_ + "/logs/relogged1", tmp_dir_ + "/logs/relogged2",
2017 1, 1, 2, 2, true)));
Naman Guptaa63aa132023-03-22 20:06:34 -07002018 relogged_reader.Register();
2019
2020 relogged_reader.event_loop_factory()->Run();
2021 }
2022 // And confirm that we can read the logged file using the reader's
2023 // configuration.
2024 {
2025 LogReader relogged_reader(
Austin Schuh8fb4b452023-08-04 17:02:27 -07002026 SortParts(MakeLogFiles(tmp_dir_ + "/logs/relogged1",
2027 tmp_dir_ + "/logs/relogged2", 1, 1, 2, 2, true)),
Naman Guptaa63aa132023-03-22 20:06:34 -07002028 reader.configuration());
2029 relogged_reader.Register();
2030
2031 relogged_reader.event_loop_factory()->Run();
2032 }
2033}
2034
2035// Tests that we properly populate and extract the logger_start time by setting
2036// up a clock difference between 2 nodes and looking at the resulting parts.
2037TEST_P(MultinodeLoggerTest, LoggerStartTime) {
2038 std::vector<std::string> actual_filenames;
2039 time_converter_.AddMonotonic(
2040 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2041 {
2042 LoggerState pi1_logger = MakeLogger(pi1_);
2043 LoggerState pi2_logger = MakeLogger(pi2_);
2044
2045 StartLogger(&pi1_logger);
2046 StartLogger(&pi2_logger);
2047
2048 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2049
2050 pi1_logger.AppendAllFilenames(&actual_filenames);
2051 pi2_logger.AppendAllFilenames(&actual_filenames);
2052 }
2053
2054 ASSERT_THAT(actual_filenames,
2055 ::testing::UnorderedElementsAreArray(logfiles_));
2056
Austin Schuh8fb4b452023-08-04 17:02:27 -07002057 for (const LogFile &log_file : SortParts(actual_filenames)) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002058 for (const LogParts &log_part : log_file.parts) {
2059 if (log_part.node == log_file.logger_node) {
2060 EXPECT_EQ(log_part.logger_monotonic_start_time,
2061 aos::monotonic_clock::min_time);
2062 EXPECT_EQ(log_part.logger_realtime_start_time,
2063 aos::realtime_clock::min_time);
2064 } else {
2065 const chrono::seconds offset = log_file.logger_node == "pi1"
2066 ? -chrono::seconds(1000)
2067 : chrono::seconds(1000);
2068 EXPECT_EQ(log_part.logger_monotonic_start_time,
2069 log_part.monotonic_start_time + offset);
2070 EXPECT_EQ(log_part.logger_realtime_start_time,
2071 log_file.realtime_start_time +
2072 (log_part.logger_monotonic_start_time -
2073 log_file.monotonic_start_time));
2074 }
2075 }
2076 }
2077}
2078
2079// Test that renaming the base, renames the folder.
2080TEST_P(MultinodeLoggerTest, LoggerRenameFolder) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002081 time_converter_.AddMonotonic(
2082 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07002083 logfile_base1_ = tmp_dir_ + "/logs/renamefolder/multi_logfile1";
2084 logfile_base2_ = tmp_dir_ + "/logs/renamefolder/multi_logfile2";
2085
Naman Guptaa63aa132023-03-22 20:06:34 -07002086 LoggerState pi1_logger = MakeLogger(pi1_);
2087 LoggerState pi2_logger = MakeLogger(pi2_);
2088
2089 StartLogger(&pi1_logger);
2090 StartLogger(&pi2_logger);
2091
2092 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002093 logfile_base1_ = tmp_dir_ + "/logs/new-good/multi_logfile1";
2094 logfile_base2_ = tmp_dir_ + "/logs/new-good/multi_logfile2";
Naman Guptaa63aa132023-03-22 20:06:34 -07002095 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
Alexei Strotscaf17d32023-04-03 22:31:11 -07002096
2097 // Sequence of set_base_name and Rotate simulates rename operation. Since
2098 // rename is not supported by all namers, RenameLogBase moved from logger to
2099 // the higher level abstraction, yet log_namers support rename, and it is
2100 // legal to test it here.
2101 pi1_logger.log_namer->set_base_name(logfile_base1_);
2102 pi1_logger.logger->Rotate();
2103 pi2_logger.log_namer->set_base_name(logfile_base2_);
2104 pi2_logger.logger->Rotate();
2105
Naman Guptaa63aa132023-03-22 20:06:34 -07002106 for (auto &file : logfiles_) {
2107 struct stat s;
2108 EXPECT_EQ(0, stat(file.c_str(), &s));
2109 }
2110}
2111
2112// Test that renaming the file base dies.
2113TEST_P(MultinodeLoggerDeathTest, LoggerRenameFile) {
2114 time_converter_.AddMonotonic(
2115 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07002116 logfile_base1_ = tmp_dir_ + "/logs/renamefile/multi_logfile1";
2117 logfile_base2_ = tmp_dir_ + "/logs/renamefile/multi_logfile2";
2118
Naman Guptaa63aa132023-03-22 20:06:34 -07002119 LoggerState pi1_logger = MakeLogger(pi1_);
2120 StartLogger(&pi1_logger);
2121 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002122 logfile_base1_ = tmp_dir_ + "/logs/new-renamefile/new_multi_logfile1";
Alexei Strotscaf17d32023-04-03 22:31:11 -07002123 EXPECT_DEATH({ pi1_logger.log_namer->set_base_name(logfile_base1_); },
Naman Guptaa63aa132023-03-22 20:06:34 -07002124 "Rename of file base from");
2125}
2126
2127// TODO(austin): We can write a test which recreates a logfile and confirms that
2128// we get it back. That is the ultimate test.
2129
2130// Tests that we properly recreate forwarded timestamps when replaying a log.
2131// This should be enough that we can then re-run the logger and get a valid log
2132// back.
2133TEST_P(MultinodeLoggerTest, RemoteReboot) {
Austin Schuh6ecfe902023-08-04 22:44:37 -07002134 if (file_strategy() == FileStrategy::kCombine) {
2135 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
2136 }
Naman Guptaa63aa132023-03-22 20:06:34 -07002137 std::vector<std::string> actual_filenames;
2138
2139 const UUID pi1_boot0 = UUID::Random();
2140 const UUID pi2_boot0 = UUID::Random();
2141 const UUID pi2_boot1 = UUID::Random();
2142 {
2143 CHECK_EQ(pi1_index_, 0u);
2144 CHECK_EQ(pi2_index_, 1u);
2145
2146 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2147 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2148 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2149
2150 time_converter_.AddNextTimestamp(
2151 distributed_clock::epoch(),
2152 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2153 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2154 time_converter_.AddNextTimestamp(
2155 distributed_clock::epoch() + reboot_time,
2156 {BootTimestamp::epoch() + reboot_time,
2157 BootTimestamp{
2158 .boot = 1,
2159 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2160 }
2161
2162 {
2163 LoggerState pi1_logger = MakeLogger(pi1_);
2164
2165 event_loop_factory_.RunFor(chrono::milliseconds(95));
2166 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2167 pi1_boot0);
2168 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2169 pi2_boot0);
2170
2171 StartLogger(&pi1_logger);
2172
2173 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2174
2175 VLOG(1) << "Reboot now!";
2176
2177 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2178 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2179 pi1_boot0);
2180 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2181 pi2_boot1);
2182
2183 pi1_logger.AppendAllFilenames(&actual_filenames);
2184 }
2185
2186 std::sort(actual_filenames.begin(), actual_filenames.end());
2187 std::sort(pi1_reboot_logfiles_.begin(), pi1_reboot_logfiles_.end());
2188 ASSERT_THAT(actual_filenames,
2189 ::testing::UnorderedElementsAreArray(pi1_reboot_logfiles_));
2190
2191 // Confirm that our new oldest timestamps properly update as we reboot and
2192 // rotate.
2193 for (const std::string &file : pi1_reboot_logfiles_) {
2194 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2195 ReadHeader(file);
2196 CHECK(log_header);
2197 if (log_header->message().has_configuration()) {
2198 continue;
2199 }
2200
2201 const monotonic_clock::time_point monotonic_start_time =
2202 monotonic_clock::time_point(
2203 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2204 const UUID source_node_boot_uuid = UUID::FromString(
2205 log_header->message().source_node_boot_uuid()->string_view());
2206
2207 if (log_header->message().node()->name()->string_view() != "pi1") {
2208 // The remote message channel should rotate later and have more parts.
2209 // This only is true on the log files with shared remote messages.
2210 //
2211 // TODO(austin): I'm not the most thrilled with this test pattern... It
2212 // feels brittle in a different way.
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002213 if (file.find("timestamps/remote_pi2") == std::string::npos) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002214 switch (log_header->message().parts_index()) {
2215 case 0:
2216 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2217 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2218 break;
2219 case 1:
2220 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2221 ASSERT_EQ(monotonic_start_time,
2222 monotonic_clock::epoch() + chrono::seconds(1));
2223 break;
2224 case 2:
2225 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2226 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2227 break;
2228 case 3:
2229 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2230 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
2231 chrono::nanoseconds(2322999462))
2232 << " on " << file;
2233 break;
2234 default:
2235 FAIL();
2236 break;
2237 }
2238 } else {
2239 switch (log_header->message().parts_index()) {
2240 case 0:
2241 case 1:
2242 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2243 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2244 break;
2245 case 2:
2246 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2247 ASSERT_EQ(monotonic_start_time,
2248 monotonic_clock::epoch() + chrono::seconds(1));
2249 break;
2250 case 3:
2251 case 4:
2252 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2253 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2254 break;
2255 case 5:
2256 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2257 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
2258 chrono::nanoseconds(2322999462))
2259 << " on " << file;
2260 break;
2261 default:
2262 FAIL();
2263 break;
2264 }
2265 }
2266 continue;
2267 }
2268 SCOPED_TRACE(file);
2269 SCOPED_TRACE(aos::FlatbufferToJson(
2270 *log_header, {.multi_line = true, .max_vector_size = 100}));
2271 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2272 ASSERT_EQ(
2273 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2274 EXPECT_EQ(
2275 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2276 monotonic_clock::max_time.time_since_epoch().count());
2277 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2278 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2279 2u);
2280 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2281 monotonic_clock::max_time.time_since_epoch().count());
2282 ASSERT_TRUE(log_header->message()
2283 .has_oldest_remote_unreliable_monotonic_timestamps());
2284 ASSERT_EQ(log_header->message()
2285 .oldest_remote_unreliable_monotonic_timestamps()
2286 ->size(),
2287 2u);
2288 EXPECT_EQ(log_header->message()
2289 .oldest_remote_unreliable_monotonic_timestamps()
2290 ->Get(0),
2291 monotonic_clock::max_time.time_since_epoch().count());
2292 ASSERT_TRUE(log_header->message()
2293 .has_oldest_local_unreliable_monotonic_timestamps());
2294 ASSERT_EQ(log_header->message()
2295 .oldest_local_unreliable_monotonic_timestamps()
2296 ->size(),
2297 2u);
2298 EXPECT_EQ(log_header->message()
2299 .oldest_local_unreliable_monotonic_timestamps()
2300 ->Get(0),
2301 monotonic_clock::max_time.time_since_epoch().count());
2302
2303 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2304 monotonic_clock::time_point(chrono::nanoseconds(
2305 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2306 1)));
2307 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2308 monotonic_clock::time_point(chrono::nanoseconds(
2309 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2310 const monotonic_clock::time_point
2311 oldest_remote_unreliable_monotonic_timestamps =
2312 monotonic_clock::time_point(chrono::nanoseconds(
2313 log_header->message()
2314 .oldest_remote_unreliable_monotonic_timestamps()
2315 ->Get(1)));
2316 const monotonic_clock::time_point
2317 oldest_local_unreliable_monotonic_timestamps =
2318 monotonic_clock::time_point(chrono::nanoseconds(
2319 log_header->message()
2320 .oldest_local_unreliable_monotonic_timestamps()
2321 ->Get(1)));
2322 const monotonic_clock::time_point
2323 oldest_remote_reliable_monotonic_timestamps =
2324 monotonic_clock::time_point(chrono::nanoseconds(
2325 log_header->message()
2326 .oldest_remote_reliable_monotonic_timestamps()
2327 ->Get(1)));
2328 const monotonic_clock::time_point
2329 oldest_local_reliable_monotonic_timestamps =
2330 monotonic_clock::time_point(chrono::nanoseconds(
2331 log_header->message()
2332 .oldest_local_reliable_monotonic_timestamps()
2333 ->Get(1)));
2334 const monotonic_clock::time_point
2335 oldest_logger_remote_unreliable_monotonic_timestamps =
2336 monotonic_clock::time_point(chrono::nanoseconds(
2337 log_header->message()
2338 .oldest_logger_remote_unreliable_monotonic_timestamps()
2339 ->Get(0)));
2340 const monotonic_clock::time_point
2341 oldest_logger_local_unreliable_monotonic_timestamps =
2342 monotonic_clock::time_point(chrono::nanoseconds(
2343 log_header->message()
2344 .oldest_logger_local_unreliable_monotonic_timestamps()
2345 ->Get(0)));
2346 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2347 monotonic_clock::max_time);
2348 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2349 monotonic_clock::max_time);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002350 if (log_header->message().data_stored()->Get(0) == StoredDataType::DATA) {
2351 switch (log_header->message().parts_index()) {
2352 case 0:
2353 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2354 monotonic_clock::max_time);
2355 EXPECT_EQ(oldest_local_monotonic_timestamps,
2356 monotonic_clock::max_time);
2357 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2358 monotonic_clock::max_time);
2359 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2360 monotonic_clock::max_time);
2361 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2362 monotonic_clock::max_time);
2363 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2364 monotonic_clock::max_time);
2365 break;
2366 default:
2367 FAIL();
2368 break;
2369 }
2370 } else if (log_header->message().data_stored()->Get(0) ==
2371 StoredDataType::TIMESTAMPS) {
2372 switch (log_header->message().parts_index()) {
2373 case 0:
2374 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2375 monotonic_clock::time_point(chrono::microseconds(90200)));
2376 EXPECT_EQ(oldest_local_monotonic_timestamps,
2377 monotonic_clock::time_point(chrono::microseconds(90350)));
2378 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2379 monotonic_clock::time_point(chrono::microseconds(90200)));
2380 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2381 monotonic_clock::time_point(chrono::microseconds(90350)));
2382 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2383 monotonic_clock::max_time);
2384 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2385 monotonic_clock::max_time);
2386 break;
2387 case 1:
2388 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2389 monotonic_clock::time_point(chrono::microseconds(90200)))
2390 << file;
2391 EXPECT_EQ(oldest_local_monotonic_timestamps,
2392 monotonic_clock::time_point(chrono::microseconds(90350)))
2393 << file;
2394 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2395 monotonic_clock::time_point(chrono::microseconds(90200)))
2396 << file;
2397 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2398 monotonic_clock::time_point(chrono::microseconds(90350)))
2399 << file;
2400 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2401 monotonic_clock::time_point(chrono::microseconds(100000)))
2402 << file;
2403 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2404 monotonic_clock::time_point(chrono::microseconds(100150)))
2405 << file;
2406 break;
2407 case 2:
2408 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2409 monotonic_clock::time_point(chrono::milliseconds(1323) +
2410 chrono::microseconds(200)));
2411 EXPECT_EQ(
2412 oldest_local_monotonic_timestamps,
2413 monotonic_clock::time_point(chrono::microseconds(10100350)));
2414 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2415 monotonic_clock::time_point(chrono::milliseconds(1323) +
2416 chrono::microseconds(200)));
2417 EXPECT_EQ(
2418 oldest_local_unreliable_monotonic_timestamps,
2419 monotonic_clock::time_point(chrono::microseconds(10100350)));
2420 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2421 monotonic_clock::max_time)
2422 << file;
2423 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2424 monotonic_clock::max_time)
2425 << file;
2426 break;
2427 case 3:
2428 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2429 monotonic_clock::time_point(chrono::milliseconds(1323) +
2430 chrono::microseconds(200)));
2431 EXPECT_EQ(
2432 oldest_local_monotonic_timestamps,
2433 monotonic_clock::time_point(chrono::microseconds(10100350)));
2434 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2435 monotonic_clock::time_point(chrono::milliseconds(1323) +
2436 chrono::microseconds(200)));
2437 EXPECT_EQ(
2438 oldest_local_unreliable_monotonic_timestamps,
2439 monotonic_clock::time_point(chrono::microseconds(10100350)));
2440 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2441 monotonic_clock::time_point(chrono::microseconds(1423000)))
2442 << file;
2443 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2444 monotonic_clock::time_point(chrono::microseconds(10200150)))
2445 << file;
2446 break;
2447 default:
2448 FAIL();
2449 break;
2450 }
Naman Guptaa63aa132023-03-22 20:06:34 -07002451 }
2452 }
2453
2454 // Confirm that we refuse to replay logs with missing boot uuids.
2455 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07002456 auto sorted_parts = SortParts(pi1_reboot_logfiles_);
2457 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
2458 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07002459
2460 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2461 log_reader_factory.set_send_delay(chrono::microseconds(0));
2462
2463 // This sends out the fetched messages and advances time to the start of
2464 // the log file.
2465 reader.Register(&log_reader_factory);
2466
2467 log_reader_factory.Run();
2468
2469 reader.Deregister();
2470 }
2471}
2472
2473// Tests that we can sort a log which only has timestamps from the remote
2474// because the local message_bridge_client failed to connect.
2475TEST_P(MultinodeLoggerTest, RemoteRebootOnlyTimestamps) {
Austin Schuh6ecfe902023-08-04 22:44:37 -07002476 if (file_strategy() == FileStrategy::kCombine) {
2477 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
2478 }
2479
Naman Guptaa63aa132023-03-22 20:06:34 -07002480 const UUID pi1_boot0 = UUID::Random();
2481 const UUID pi2_boot0 = UUID::Random();
2482 const UUID pi2_boot1 = UUID::Random();
2483 {
2484 CHECK_EQ(pi1_index_, 0u);
2485 CHECK_EQ(pi2_index_, 1u);
2486
2487 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2488 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2489 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2490
2491 time_converter_.AddNextTimestamp(
2492 distributed_clock::epoch(),
2493 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2494 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2495 time_converter_.AddNextTimestamp(
2496 distributed_clock::epoch() + reboot_time,
2497 {BootTimestamp::epoch() + reboot_time,
2498 BootTimestamp{
2499 .boot = 1,
2500 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2501 }
2502 pi2_->Disconnect(pi1_->node());
2503
2504 std::vector<std::string> filenames;
2505 {
2506 LoggerState pi1_logger = MakeLogger(pi1_);
2507
2508 event_loop_factory_.RunFor(chrono::milliseconds(95));
2509 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2510 pi1_boot0);
2511 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2512 pi2_boot0);
2513
2514 StartLogger(&pi1_logger);
2515
2516 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2517
2518 VLOG(1) << "Reboot now!";
2519
2520 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2521 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2522 pi1_boot0);
2523 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2524 pi2_boot1);
2525 pi1_logger.AppendAllFilenames(&filenames);
2526 }
2527
2528 std::sort(filenames.begin(), filenames.end());
2529
2530 // Confirm that our new oldest timestamps properly update as we reboot and
2531 // rotate.
2532 size_t timestamp_file_count = 0;
2533 for (const std::string &file : filenames) {
2534 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2535 ReadHeader(file);
2536 CHECK(log_header);
2537
2538 if (log_header->message().has_configuration()) {
2539 continue;
2540 }
2541
2542 const monotonic_clock::time_point monotonic_start_time =
2543 monotonic_clock::time_point(
2544 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2545 const UUID source_node_boot_uuid = UUID::FromString(
2546 log_header->message().source_node_boot_uuid()->string_view());
2547
2548 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2549 ASSERT_EQ(
2550 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2551 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2552 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2553 2u);
2554 ASSERT_TRUE(log_header->message()
2555 .has_oldest_remote_unreliable_monotonic_timestamps());
2556 ASSERT_EQ(log_header->message()
2557 .oldest_remote_unreliable_monotonic_timestamps()
2558 ->size(),
2559 2u);
2560 ASSERT_TRUE(log_header->message()
2561 .has_oldest_local_unreliable_monotonic_timestamps());
2562 ASSERT_EQ(log_header->message()
2563 .oldest_local_unreliable_monotonic_timestamps()
2564 ->size(),
2565 2u);
2566 ASSERT_TRUE(log_header->message()
2567 .has_oldest_remote_reliable_monotonic_timestamps());
2568 ASSERT_EQ(log_header->message()
2569 .oldest_remote_reliable_monotonic_timestamps()
2570 ->size(),
2571 2u);
2572 ASSERT_TRUE(
2573 log_header->message().has_oldest_local_reliable_monotonic_timestamps());
2574 ASSERT_EQ(log_header->message()
2575 .oldest_local_reliable_monotonic_timestamps()
2576 ->size(),
2577 2u);
2578
2579 ASSERT_TRUE(
2580 log_header->message()
2581 .has_oldest_logger_remote_unreliable_monotonic_timestamps());
2582 ASSERT_EQ(log_header->message()
2583 .oldest_logger_remote_unreliable_monotonic_timestamps()
2584 ->size(),
2585 2u);
2586 ASSERT_TRUE(log_header->message()
2587 .has_oldest_logger_local_unreliable_monotonic_timestamps());
2588 ASSERT_EQ(log_header->message()
2589 .oldest_logger_local_unreliable_monotonic_timestamps()
2590 ->size(),
2591 2u);
2592
2593 if (log_header->message().node()->name()->string_view() != "pi1") {
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002594 ASSERT_TRUE(file.find("timestamps/remote_pi2") != std::string::npos);
Naman Guptaa63aa132023-03-22 20:06:34 -07002595
2596 const std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
2597 ReadNthMessage(file, 0);
2598 CHECK(msg);
2599
2600 EXPECT_TRUE(msg->message().has_monotonic_sent_time());
2601 EXPECT_TRUE(msg->message().has_monotonic_remote_time());
2602
2603 const monotonic_clock::time_point
2604 expected_oldest_local_monotonic_timestamps(
2605 chrono::nanoseconds(msg->message().monotonic_sent_time()));
2606 const monotonic_clock::time_point
2607 expected_oldest_remote_monotonic_timestamps(
2608 chrono::nanoseconds(msg->message().monotonic_remote_time()));
2609 const monotonic_clock::time_point
2610 expected_oldest_timestamp_monotonic_timestamps(
2611 chrono::nanoseconds(msg->message().monotonic_timestamp_time()));
2612
2613 EXPECT_NE(expected_oldest_local_monotonic_timestamps,
2614 monotonic_clock::min_time);
2615 EXPECT_NE(expected_oldest_remote_monotonic_timestamps,
2616 monotonic_clock::min_time);
2617 EXPECT_NE(expected_oldest_timestamp_monotonic_timestamps,
2618 monotonic_clock::min_time);
2619
2620 ++timestamp_file_count;
2621 // Since the log file is from the perspective of the other node,
2622 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2623 monotonic_clock::time_point(chrono::nanoseconds(
2624 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2625 0)));
2626 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2627 monotonic_clock::time_point(chrono::nanoseconds(
2628 log_header->message().oldest_local_monotonic_timestamps()->Get(
2629 0)));
2630 const monotonic_clock::time_point
2631 oldest_remote_unreliable_monotonic_timestamps =
2632 monotonic_clock::time_point(chrono::nanoseconds(
2633 log_header->message()
2634 .oldest_remote_unreliable_monotonic_timestamps()
2635 ->Get(0)));
2636 const monotonic_clock::time_point
2637 oldest_local_unreliable_monotonic_timestamps =
2638 monotonic_clock::time_point(chrono::nanoseconds(
2639 log_header->message()
2640 .oldest_local_unreliable_monotonic_timestamps()
2641 ->Get(0)));
2642 const monotonic_clock::time_point
2643 oldest_remote_reliable_monotonic_timestamps =
2644 monotonic_clock::time_point(chrono::nanoseconds(
2645 log_header->message()
2646 .oldest_remote_reliable_monotonic_timestamps()
2647 ->Get(0)));
2648 const monotonic_clock::time_point
2649 oldest_local_reliable_monotonic_timestamps =
2650 monotonic_clock::time_point(chrono::nanoseconds(
2651 log_header->message()
2652 .oldest_local_reliable_monotonic_timestamps()
2653 ->Get(0)));
2654 const monotonic_clock::time_point
2655 oldest_logger_remote_unreliable_monotonic_timestamps =
2656 monotonic_clock::time_point(chrono::nanoseconds(
2657 log_header->message()
2658 .oldest_logger_remote_unreliable_monotonic_timestamps()
2659 ->Get(1)));
2660 const monotonic_clock::time_point
2661 oldest_logger_local_unreliable_monotonic_timestamps =
2662 monotonic_clock::time_point(chrono::nanoseconds(
2663 log_header->message()
2664 .oldest_logger_local_unreliable_monotonic_timestamps()
2665 ->Get(1)));
2666
2667 const Channel *channel =
2668 event_loop_factory_.configuration()->channels()->Get(
2669 msg->message().channel_index());
2670 const Connection *connection = configuration::ConnectionToNode(
2671 channel, configuration::GetNode(
2672 event_loop_factory_.configuration(),
2673 log_header->message().node()->name()->string_view()));
2674
2675 const bool reliable = connection->time_to_live() == 0;
2676
2677 SCOPED_TRACE(file);
2678 SCOPED_TRACE(aos::FlatbufferToJson(
2679 *log_header, {.multi_line = true, .max_vector_size = 100}));
2680
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002681 // Confirm that the oldest timestamps match what we expect. Based on
2682 // what we are doing, we know that the oldest time is the first
2683 // message's time.
2684 //
2685 // This makes the test robust to both the split and combined config
2686 // tests.
2687 switch (log_header->message().parts_index()) {
2688 case 0:
2689 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2690 expected_oldest_remote_monotonic_timestamps);
2691 EXPECT_EQ(oldest_local_monotonic_timestamps,
2692 expected_oldest_local_monotonic_timestamps);
2693 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2694 expected_oldest_local_monotonic_timestamps)
2695 << file;
2696 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2697 expected_oldest_timestamp_monotonic_timestamps)
2698 << file;
2699
2700 if (reliable) {
2701 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
Naman Guptaa63aa132023-03-22 20:06:34 -07002702 expected_oldest_remote_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002703 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
Naman Guptaa63aa132023-03-22 20:06:34 -07002704 expected_oldest_local_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002705 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2706 monotonic_clock::max_time);
2707 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2708 monotonic_clock::max_time);
2709 } else {
2710 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2711 monotonic_clock::max_time);
2712 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2713 monotonic_clock::max_time);
Naman Guptaa63aa132023-03-22 20:06:34 -07002714 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2715 expected_oldest_remote_monotonic_timestamps);
2716 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2717 expected_oldest_local_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002718 }
2719 break;
2720 case 1:
2721 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2722 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2723 EXPECT_EQ(oldest_local_monotonic_timestamps,
2724 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2725 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2726 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2727 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2728 monotonic_clock::epoch() + chrono::nanoseconds(90250000));
2729 if (reliable) {
2730 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2731 expected_oldest_remote_monotonic_timestamps);
2732 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2733 expected_oldest_local_monotonic_timestamps);
2734 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2735 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2736 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2737 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2738 } else {
2739 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2740 monotonic_clock::max_time);
2741 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2742 monotonic_clock::max_time);
2743 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2744 expected_oldest_remote_monotonic_timestamps);
2745 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2746 expected_oldest_local_monotonic_timestamps);
2747 }
2748 break;
2749 case 2:
2750 EXPECT_EQ(
2751 oldest_remote_monotonic_timestamps,
2752 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2753 EXPECT_EQ(oldest_local_monotonic_timestamps,
2754 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2755 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2756 expected_oldest_local_monotonic_timestamps)
2757 << file;
2758 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2759 expected_oldest_timestamp_monotonic_timestamps)
2760 << file;
2761 if (reliable) {
2762 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2763 expected_oldest_remote_monotonic_timestamps);
2764 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2765 expected_oldest_local_monotonic_timestamps);
2766 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2767 monotonic_clock::max_time);
2768 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2769 monotonic_clock::max_time);
2770 } else {
2771 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2772 monotonic_clock::max_time);
2773 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2774 monotonic_clock::max_time);
2775 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2776 expected_oldest_remote_monotonic_timestamps);
2777 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2778 expected_oldest_local_monotonic_timestamps);
2779 }
2780 break;
Naman Guptaa63aa132023-03-22 20:06:34 -07002781
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002782 case 3:
2783 EXPECT_EQ(
2784 oldest_remote_monotonic_timestamps,
2785 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2786 EXPECT_EQ(oldest_local_monotonic_timestamps,
2787 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2788 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2789 expected_oldest_remote_monotonic_timestamps);
2790 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2791 expected_oldest_local_monotonic_timestamps);
2792 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2793 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2794 EXPECT_EQ(
2795 oldest_logger_local_unreliable_monotonic_timestamps,
2796 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2797 break;
2798 default:
2799 FAIL();
2800 break;
Naman Guptaa63aa132023-03-22 20:06:34 -07002801 }
2802
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002803 switch (log_header->message().parts_index()) {
2804 case 0:
2805 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2806 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2807 break;
2808 case 1:
2809 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2810 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2811 break;
2812 case 2:
2813 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2814 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2815 break;
2816 case 3:
2817 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2818 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2819 break;
2820 [[fallthrough]];
2821 default:
2822 FAIL();
2823 break;
2824 }
Naman Guptaa63aa132023-03-22 20:06:34 -07002825 continue;
2826 }
2827 EXPECT_EQ(
2828 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2829 monotonic_clock::max_time.time_since_epoch().count());
2830 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2831 monotonic_clock::max_time.time_since_epoch().count());
2832 EXPECT_EQ(log_header->message()
2833 .oldest_remote_unreliable_monotonic_timestamps()
2834 ->Get(0),
2835 monotonic_clock::max_time.time_since_epoch().count());
2836 EXPECT_EQ(log_header->message()
2837 .oldest_local_unreliable_monotonic_timestamps()
2838 ->Get(0),
2839 monotonic_clock::max_time.time_since_epoch().count());
2840
2841 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2842 monotonic_clock::time_point(chrono::nanoseconds(
2843 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2844 1)));
2845 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2846 monotonic_clock::time_point(chrono::nanoseconds(
2847 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2848 const monotonic_clock::time_point
2849 oldest_remote_unreliable_monotonic_timestamps =
2850 monotonic_clock::time_point(chrono::nanoseconds(
2851 log_header->message()
2852 .oldest_remote_unreliable_monotonic_timestamps()
2853 ->Get(1)));
2854 const monotonic_clock::time_point
2855 oldest_local_unreliable_monotonic_timestamps =
2856 monotonic_clock::time_point(chrono::nanoseconds(
2857 log_header->message()
2858 .oldest_local_unreliable_monotonic_timestamps()
2859 ->Get(1)));
2860 switch (log_header->message().parts_index()) {
2861 case 0:
2862 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2863 monotonic_clock::max_time);
2864 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2865 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2866 monotonic_clock::max_time);
2867 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2868 monotonic_clock::max_time);
2869 break;
2870 default:
2871 FAIL();
2872 break;
2873 }
2874 }
2875
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002876 EXPECT_EQ(timestamp_file_count, 4u);
Naman Guptaa63aa132023-03-22 20:06:34 -07002877
2878 // Confirm that we can actually sort the resulting log and read it.
2879 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07002880 auto sorted_parts = SortParts(filenames);
2881 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
2882 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07002883
2884 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2885 log_reader_factory.set_send_delay(chrono::microseconds(0));
2886
2887 // This sends out the fetched messages and advances time to the start of
2888 // the log file.
2889 reader.Register(&log_reader_factory);
2890
2891 log_reader_factory.Run();
2892
2893 reader.Deregister();
2894 }
2895}
2896
2897// Tests that we properly handle one direction of message_bridge being
2898// unavailable.
2899TEST_P(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07002900 std::vector<std::string> actual_filenames;
2901
Naman Guptaa63aa132023-03-22 20:06:34 -07002902 pi1_->Disconnect(pi2_->node());
2903 time_converter_.AddMonotonic(
2904 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2905
2906 time_converter_.AddMonotonic(
2907 {chrono::milliseconds(10000),
2908 chrono::milliseconds(10000) - chrono::milliseconds(1)});
2909 {
2910 LoggerState pi1_logger = MakeLogger(pi1_);
2911
2912 event_loop_factory_.RunFor(chrono::milliseconds(95));
2913
2914 StartLogger(&pi1_logger);
2915
2916 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002917 pi1_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07002918 }
2919
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002920 // Confirm that we can parse the result. LogReader has enough internal
2921 // CHECKs to confirm the right thing happened.
Austin Schuh8fb4b452023-08-04 17:02:27 -07002922 ConfirmReadable(actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07002923}
2924
2925// Tests that we properly handle one direction of message_bridge being
2926// unavailable.
2927TEST_P(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
2928 pi1_->Disconnect(pi2_->node());
2929 time_converter_.AddMonotonic(
2930 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(500)});
2931
2932 time_converter_.AddMonotonic(
2933 {chrono::milliseconds(10000),
2934 chrono::milliseconds(10000) + chrono::milliseconds(1)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07002935
2936 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -07002937 {
2938 LoggerState pi1_logger = MakeLogger(pi1_);
2939
2940 event_loop_factory_.RunFor(chrono::milliseconds(95));
2941
2942 StartLogger(&pi1_logger);
2943
2944 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002945 pi1_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07002946 }
2947
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002948 // Confirm that we can parse the result. LogReader has enough internal
2949 // CHECKs to confirm the right thing happened.
Austin Schuh8fb4b452023-08-04 17:02:27 -07002950 ConfirmReadable(filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07002951}
2952
2953// Tests that we explode if someone passes in a part file twice with a better
2954// error than an out of order error.
2955TEST_P(MultinodeLoggerTest, DuplicateLogFiles) {
2956 time_converter_.AddMonotonic(
2957 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07002958
2959 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -07002960 {
2961 LoggerState pi1_logger = MakeLogger(pi1_);
2962
2963 event_loop_factory_.RunFor(chrono::milliseconds(95));
2964
2965 StartLogger(&pi1_logger);
2966
2967 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002968
2969 pi1_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07002970 }
2971
2972 std::vector<std::string> duplicates;
Austin Schuh8fb4b452023-08-04 17:02:27 -07002973 for (const std::string &f : filenames) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002974 duplicates.emplace_back(f);
2975 duplicates.emplace_back(f);
2976 }
2977 EXPECT_DEATH({ SortParts(duplicates); }, "Found duplicate parts in");
2978}
2979
2980// Tests that we explode if someone loses a part out of the middle of a log.
2981TEST_P(MultinodeLoggerTest, MissingPartsFromMiddle) {
Austin Schuh6ecfe902023-08-04 22:44:37 -07002982 if (file_strategy() == FileStrategy::kCombine) {
2983 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
2984 }
Naman Guptaa63aa132023-03-22 20:06:34 -07002985 time_converter_.AddMonotonic(
2986 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2987 {
2988 LoggerState pi1_logger = MakeLogger(pi1_);
2989
2990 event_loop_factory_.RunFor(chrono::milliseconds(95));
2991
2992 StartLogger(&pi1_logger);
2993 aos::monotonic_clock::time_point last_rotation_time =
2994 pi1_logger.event_loop->monotonic_now();
Austin Schuh2f864452023-07-17 14:53:08 -07002995 pi1_logger.logger->set_on_logged_period(
2996 [&](aos::monotonic_clock::time_point) {
2997 const auto now = pi1_logger.event_loop->monotonic_now();
2998 if (now > last_rotation_time + std::chrono::seconds(5)) {
2999 pi1_logger.logger->Rotate();
3000 last_rotation_time = now;
3001 }
3002 });
Naman Guptaa63aa132023-03-22 20:06:34 -07003003
3004 event_loop_factory_.RunFor(chrono::milliseconds(10000));
3005 }
3006
3007 std::vector<std::string> missing_parts;
3008
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07003009 missing_parts.emplace_back(logfile_base1_ + "_pi1_timestamps.part0" +
3010 Extension());
3011 missing_parts.emplace_back(logfile_base1_ + "_pi1_timestamps.part2" +
3012 Extension());
Naman Guptaa63aa132023-03-22 20:06:34 -07003013 missing_parts.emplace_back(absl::StrCat(
3014 logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
3015
3016 EXPECT_DEATH({ SortParts(missing_parts); },
3017 "Broken log, missing part files between");
3018}
3019
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003020// Tests that we properly handle a dead node. Do this by just disconnecting
3021// it and only using one nodes of logs.
Naman Guptaa63aa132023-03-22 20:06:34 -07003022TEST_P(MultinodeLoggerTest, DeadNode) {
3023 pi1_->Disconnect(pi2_->node());
3024 pi2_->Disconnect(pi1_->node());
3025 time_converter_.AddMonotonic(
3026 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
3027 {
3028 LoggerState pi1_logger = MakeLogger(pi1_);
3029
3030 event_loop_factory_.RunFor(chrono::milliseconds(95));
3031
3032 StartLogger(&pi1_logger);
3033
3034 event_loop_factory_.RunFor(chrono::milliseconds(10000));
3035 }
3036
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003037 // Confirm that we can parse the result. LogReader has enough internal
3038 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07003039 ConfirmReadable(MakePi1DeadNodeLogfiles());
3040}
3041
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003042// Tests that we can relog with a different config. This makes most sense
3043// when you are trying to edit a log and want to use channel renaming + the
3044// original config in the new log.
Naman Guptaa63aa132023-03-22 20:06:34 -07003045TEST_P(MultinodeLoggerTest, LogDifferentConfig) {
3046 time_converter_.StartEqual();
3047 {
3048 LoggerState pi1_logger = MakeLogger(pi1_);
3049 LoggerState pi2_logger = MakeLogger(pi2_);
3050
3051 event_loop_factory_.RunFor(chrono::milliseconds(95));
3052
3053 StartLogger(&pi1_logger);
3054 StartLogger(&pi2_logger);
3055
3056 event_loop_factory_.RunFor(chrono::milliseconds(20000));
3057 }
3058
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003059 auto sorted_parts = SortParts(logfiles_);
3060 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3061 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07003062 reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
3063
3064 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
3065 log_reader_factory.set_send_delay(chrono::microseconds(0));
3066
3067 // This sends out the fetched messages and advances time to the start of the
3068 // log file.
3069 reader.Register(&log_reader_factory);
3070
3071 const Node *pi1 =
3072 configuration::GetNode(log_reader_factory.configuration(), "pi1");
3073 const Node *pi2 =
3074 configuration::GetNode(log_reader_factory.configuration(), "pi2");
3075
3076 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
3077 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
3078 LOG(INFO) << "now pi1 "
3079 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
3080 LOG(INFO) << "now pi2 "
3081 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
3082
3083 EXPECT_THAT(reader.LoggedNodes(),
3084 ::testing::ElementsAre(
3085 configuration::GetNode(reader.logged_configuration(), pi1),
3086 configuration::GetNode(reader.logged_configuration(), pi2)));
3087
3088 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
3089
3090 // And confirm we can re-create a log again, while checking the contents.
3091 std::vector<std::string> log_files;
3092 {
3093 LoggerState pi1_logger =
3094 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
3095 &log_reader_factory, reader.logged_configuration());
3096 LoggerState pi2_logger =
3097 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
3098 &log_reader_factory, reader.logged_configuration());
3099
Austin Schuh7e417682023-08-11 17:05:30 -07003100 pi1_logger.StartLogger(tmp_dir_ + "/logs/relogged1");
3101 pi2_logger.StartLogger(tmp_dir_ + "/logs/relogged2");
Naman Guptaa63aa132023-03-22 20:06:34 -07003102
3103 log_reader_factory.Run();
3104
3105 for (auto &x : pi1_logger.log_namer->all_filenames()) {
Austin Schuh7e417682023-08-11 17:05:30 -07003106 log_files.emplace_back(absl::StrCat(tmp_dir_, "/logs/relogged1_", x));
Naman Guptaa63aa132023-03-22 20:06:34 -07003107 }
3108 for (auto &x : pi2_logger.log_namer->all_filenames()) {
Austin Schuh7e417682023-08-11 17:05:30 -07003109 log_files.emplace_back(absl::StrCat(tmp_dir_, "/logs/relogged2_", x));
Naman Guptaa63aa132023-03-22 20:06:34 -07003110 }
3111 }
3112
3113 reader.Deregister();
3114
3115 // And verify that we can run the LogReader over the relogged files without
3116 // hitting any fatal errors.
3117 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003118 auto sorted_parts = SortParts(log_files);
3119 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3120 LogReader relogged_reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07003121 relogged_reader.Register();
3122
3123 relogged_reader.event_loop_factory()->Run();
3124 }
3125}
3126
Maxwell Gumley8c1b87f2024-02-13 17:54:52 -07003127// Tests that we can relog with a subset of the original config. This is useful
3128// for excluding obsolete or deprecated channels, so they don't appear in the
3129// configuration when reading the log.
3130TEST_P(MultinodeLoggerTest, LogPartialConfig) {
3131 time_converter_.StartEqual();
3132 {
3133 LoggerState pi1_logger = MakeLogger(pi1_);
3134 LoggerState pi2_logger = MakeLogger(pi2_);
3135
3136 event_loop_factory_.RunFor(chrono::milliseconds(95));
3137
3138 StartLogger(&pi1_logger);
3139 StartLogger(&pi2_logger);
3140
3141 event_loop_factory_.RunFor(chrono::milliseconds(20000));
3142 }
3143
3144 auto sorted_parts = SortParts(logfiles_);
3145 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3146 LogReader reader(sorted_parts);
3147 reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
3148
3149 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
3150 log_reader_factory.set_send_delay(chrono::microseconds(0));
3151
3152 // This sends out the fetched messages and advances time to the start of the
3153 // log file.
3154 reader.Register(&log_reader_factory);
3155
3156 const Node *pi1 =
3157 configuration::GetNode(log_reader_factory.configuration(), "pi1");
3158 const Node *pi2 =
3159 configuration::GetNode(log_reader_factory.configuration(), "pi2");
3160
3161 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
3162 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
3163 LOG(INFO) << "now pi1 "
3164 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
3165 LOG(INFO) << "now pi2 "
3166 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
3167
3168 EXPECT_THAT(reader.LoggedNodes(),
3169 ::testing::ElementsAre(
3170 configuration::GetNode(reader.logged_configuration(), pi1),
3171 configuration::GetNode(reader.logged_configuration(), pi2)));
3172
3173 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
3174
3175 const FlatbufferDetachedBuffer<Configuration> partial_configuration_buffer =
3176 configuration::GetPartialConfiguration(
3177 *reader.event_loop_factory()->configuration(),
3178 [](const Channel &channel) {
3179 if (channel.name()->string_view().starts_with("/original/")) {
3180 LOG(INFO) << "Omitting channel from save_log, channel: "
3181 << channel.name()->string_view() << ", "
3182 << channel.type()->string_view();
3183 return false;
3184 }
3185 return true;
3186 });
3187
3188 // And confirm we can re-create a log again, while checking the contents.
3189 std::vector<std::string> log_files;
3190 {
3191 const Configuration *partial_configuration =
3192 &(partial_configuration_buffer.message());
3193
3194 LoggerState pi1_logger =
3195 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
3196 &log_reader_factory, partial_configuration);
3197 LoggerState pi2_logger =
3198 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
3199 &log_reader_factory, partial_configuration);
3200
3201 pi1_logger.StartLogger(tmp_dir_ + "/logs/relogged1");
3202 pi2_logger.StartLogger(tmp_dir_ + "/logs/relogged2");
3203
3204 log_reader_factory.Run();
3205
3206 for (auto &x : pi1_logger.log_namer->all_filenames()) {
3207 log_files.emplace_back(absl::StrCat(tmp_dir_, "/logs/relogged1_", x));
3208 }
3209 for (auto &x : pi2_logger.log_namer->all_filenames()) {
3210 log_files.emplace_back(absl::StrCat(tmp_dir_, "/logs/relogged2_", x));
3211 }
3212 }
3213
3214 reader.Deregister();
3215
3216 // And verify that we can run the LogReader over the relogged files without
3217 // hitting any fatal errors.
3218 {
3219 auto sorted_parts = SortParts(log_files);
3220 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3221 LogReader relogged_reader(sorted_parts);
3222 relogged_reader.Register();
3223
3224 relogged_reader.event_loop_factory()->Run();
3225 }
3226}
3227
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003228// Tests that we properly replay a log where the start time for a node is
3229// before any data on the node. This can happen if the logger starts before
3230// data is published. While the scenario below is a bit convoluted, we have
3231// seen logs like this generated out in the wild.
Naman Guptaa63aa132023-03-22 20:06:34 -07003232TEST(MultinodeRebootLoggerTest, StartTimeBeforeData) {
Austin Schuh7e417682023-08-11 17:05:30 -07003233 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3234 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3235
Naman Guptaa63aa132023-03-22 20:06:34 -07003236 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3237 aos::configuration::ReadConfig(ArtifactPath(
3238 "aos/events/logging/multinode_pingpong_split3_config.json"));
3239 message_bridge::TestingTimeConverter time_converter(
3240 configuration::NodesCount(&config.message()));
3241 SimulatedEventLoopFactory event_loop_factory(&config.message());
3242 event_loop_factory.SetTimeConverter(&time_converter);
3243 NodeEventLoopFactory *const pi1 =
3244 event_loop_factory.GetNodeEventLoopFactory("pi1");
3245 const size_t pi1_index = configuration::GetNodeIndex(
3246 event_loop_factory.configuration(), pi1->node());
3247 NodeEventLoopFactory *const pi2 =
3248 event_loop_factory.GetNodeEventLoopFactory("pi2");
3249 const size_t pi2_index = configuration::GetNodeIndex(
3250 event_loop_factory.configuration(), pi2->node());
3251 NodeEventLoopFactory *const pi3 =
3252 event_loop_factory.GetNodeEventLoopFactory("pi3");
3253 const size_t pi3_index = configuration::GetNodeIndex(
3254 event_loop_factory.configuration(), pi3->node());
3255
3256 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003257 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003258 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003259 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003260 const std::string kLogfile2_2 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003261 aos::testing::TestTmpDir() + "/logs/multi_logfile2.2/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003262 const std::string kLogfile3_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003263 aos::testing::TestTmpDir() + "/logs/multi_logfile3/";
3264
Naman Guptaa63aa132023-03-22 20:06:34 -07003265 const UUID pi1_boot0 = UUID::Random();
3266 const UUID pi2_boot0 = UUID::Random();
3267 const UUID pi2_boot1 = UUID::Random();
3268 const UUID pi3_boot0 = UUID::Random();
3269 {
3270 CHECK_EQ(pi1_index, 0u);
3271 CHECK_EQ(pi2_index, 1u);
3272 CHECK_EQ(pi3_index, 2u);
3273
3274 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3275 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3276 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3277 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3278
3279 time_converter.AddNextTimestamp(
3280 distributed_clock::epoch(),
3281 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3282 BootTimestamp::epoch()});
3283 const chrono::nanoseconds reboot_time = chrono::milliseconds(20000);
3284 time_converter.AddNextTimestamp(
3285 distributed_clock::epoch() + reboot_time,
3286 {BootTimestamp::epoch() + reboot_time,
3287 BootTimestamp{
3288 .boot = 1,
3289 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)},
3290 BootTimestamp::epoch() + reboot_time});
3291 }
3292
3293 // Make everything perfectly quiet.
3294 event_loop_factory.SkipTimingReport();
3295 event_loop_factory.DisableStatistics();
3296
3297 std::vector<std::string> filenames;
3298 {
3299 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003300 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3301 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003302 LoggerState pi3_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003303 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3304 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003305 {
3306 // And now start the logger.
3307 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003308 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3309 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003310
3311 event_loop_factory.RunFor(chrono::milliseconds(1000));
3312
3313 pi1_logger.StartLogger(kLogfile1_1);
3314 pi3_logger.StartLogger(kLogfile3_1);
3315 pi2_logger.StartLogger(kLogfile2_1);
3316
3317 event_loop_factory.RunFor(chrono::milliseconds(10000));
3318
3319 // Now that we've got a start time in the past, turn on data.
3320 event_loop_factory.EnableStatistics();
3321 std::unique_ptr<aos::EventLoop> ping_event_loop =
3322 pi1->MakeEventLoop("ping");
3323 Ping ping(ping_event_loop.get());
3324
3325 pi2->AlwaysStart<Pong>("pong");
3326
3327 event_loop_factory.RunFor(chrono::milliseconds(3000));
3328
3329 pi2_logger.AppendAllFilenames(&filenames);
3330
3331 // Stop logging on pi2 before rebooting and completely shut off all
3332 // messages on pi2.
3333 pi2->DisableStatistics();
3334 pi1->Disconnect(pi2->node());
3335 pi2->Disconnect(pi1->node());
3336 }
3337 event_loop_factory.RunFor(chrono::milliseconds(7000));
3338 // pi2 now reboots.
3339 {
3340 event_loop_factory.RunFor(chrono::milliseconds(1000));
3341
3342 // Start logging again on pi2 after it is up.
3343 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003344 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3345 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003346 pi2_logger.StartLogger(kLogfile2_2);
3347
3348 event_loop_factory.RunFor(chrono::milliseconds(10000));
3349 // And, now that we have a start time in the log, turn data back on.
3350 pi2->EnableStatistics();
3351 pi1->Connect(pi2->node());
3352 pi2->Connect(pi1->node());
3353
3354 pi2->AlwaysStart<Pong>("pong");
3355 std::unique_ptr<aos::EventLoop> ping_event_loop =
3356 pi1->MakeEventLoop("ping");
3357 Ping ping(ping_event_loop.get());
3358
3359 event_loop_factory.RunFor(chrono::milliseconds(3000));
3360
3361 pi2_logger.AppendAllFilenames(&filenames);
3362 }
3363
3364 pi1_logger.AppendAllFilenames(&filenames);
3365 pi3_logger.AppendAllFilenames(&filenames);
3366 }
3367
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003368 // Confirm that we can parse the result. LogReader has enough internal
3369 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07003370 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003371 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003372 auto result = ConfirmReadable(filenames);
3373 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
3374 chrono::seconds(1)));
3375 EXPECT_THAT(result[0].second,
3376 ::testing::ElementsAre(realtime_clock::epoch() +
3377 chrono::microseconds(34990350)));
3378
3379 EXPECT_THAT(result[1].first,
3380 ::testing::ElementsAre(
3381 realtime_clock::epoch() + chrono::seconds(1),
3382 realtime_clock::epoch() + chrono::microseconds(3323000)));
3383 EXPECT_THAT(result[1].second,
3384 ::testing::ElementsAre(
3385 realtime_clock::epoch() + chrono::microseconds(13990200),
3386 realtime_clock::epoch() + chrono::microseconds(16313200)));
3387
3388 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch() +
3389 chrono::seconds(1)));
3390 EXPECT_THAT(result[2].second,
3391 ::testing::ElementsAre(realtime_clock::epoch() +
3392 chrono::microseconds(34900150)));
3393}
3394
3395// Tests that local data before remote data after reboot is properly replayed.
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003396// We only trigger a reboot in the timestamp interpolation function when
3397// solving the timestamp problem when we actually have a point in the
3398// function. This originally only happened when a point passes the noncausal
3399// filter. At the start of time for the second boot, if we aren't careful, we
3400// will have messages which need to be published at times before the boot.
3401// This happens when a local message is in the log before a forwarded message,
3402// so there is no point in the interpolation function. This delays the
3403// reboot. So, we need to recreate that situation and make sure it doesn't
3404// come back.
Naman Guptaa63aa132023-03-22 20:06:34 -07003405TEST(MultinodeRebootLoggerTest,
3406 LocalMessageBeforeRemoteBeforeStartAfterReboot) {
Austin Schuh7e417682023-08-11 17:05:30 -07003407 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3408 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3409
Naman Guptaa63aa132023-03-22 20:06:34 -07003410 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3411 aos::configuration::ReadConfig(ArtifactPath(
3412 "aos/events/logging/multinode_pingpong_split3_config.json"));
3413 message_bridge::TestingTimeConverter time_converter(
3414 configuration::NodesCount(&config.message()));
3415 SimulatedEventLoopFactory event_loop_factory(&config.message());
3416 event_loop_factory.SetTimeConverter(&time_converter);
3417 NodeEventLoopFactory *const pi1 =
3418 event_loop_factory.GetNodeEventLoopFactory("pi1");
3419 const size_t pi1_index = configuration::GetNodeIndex(
3420 event_loop_factory.configuration(), pi1->node());
3421 NodeEventLoopFactory *const pi2 =
3422 event_loop_factory.GetNodeEventLoopFactory("pi2");
3423 const size_t pi2_index = configuration::GetNodeIndex(
3424 event_loop_factory.configuration(), pi2->node());
3425 NodeEventLoopFactory *const pi3 =
3426 event_loop_factory.GetNodeEventLoopFactory("pi3");
3427 const size_t pi3_index = configuration::GetNodeIndex(
3428 event_loop_factory.configuration(), pi3->node());
3429
3430 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003431 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003432 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003433 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003434 const std::string kLogfile2_2 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003435 aos::testing::TestTmpDir() + "/logs/multi_logfile2.2/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003436 const std::string kLogfile3_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003437 aos::testing::TestTmpDir() + "/logs/multi_logfile3/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003438 const UUID pi1_boot0 = UUID::Random();
3439 const UUID pi2_boot0 = UUID::Random();
3440 const UUID pi2_boot1 = UUID::Random();
3441 const UUID pi3_boot0 = UUID::Random();
3442 {
3443 CHECK_EQ(pi1_index, 0u);
3444 CHECK_EQ(pi2_index, 1u);
3445 CHECK_EQ(pi3_index, 2u);
3446
3447 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3448 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3449 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3450 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3451
3452 time_converter.AddNextTimestamp(
3453 distributed_clock::epoch(),
3454 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3455 BootTimestamp::epoch()});
3456 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3457 time_converter.AddNextTimestamp(
3458 distributed_clock::epoch() + reboot_time,
3459 {BootTimestamp::epoch() + reboot_time,
3460 BootTimestamp{.boot = 1,
3461 .time = monotonic_clock::epoch() + reboot_time +
3462 chrono::seconds(100)},
3463 BootTimestamp::epoch() + reboot_time});
3464 }
3465
3466 std::vector<std::string> filenames;
3467 {
3468 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003469 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3470 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003471 LoggerState pi3_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003472 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3473 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003474 {
3475 // And now start the logger.
3476 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003477 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3478 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003479
3480 pi1_logger.StartLogger(kLogfile1_1);
3481 pi3_logger.StartLogger(kLogfile3_1);
3482 pi2_logger.StartLogger(kLogfile2_1);
3483
3484 event_loop_factory.RunFor(chrono::milliseconds(1005));
3485
3486 // Now that we've got a start time in the past, turn on data.
3487 std::unique_ptr<aos::EventLoop> ping_event_loop =
3488 pi1->MakeEventLoop("ping");
3489 Ping ping(ping_event_loop.get());
3490
3491 pi2->AlwaysStart<Pong>("pong");
3492
3493 event_loop_factory.RunFor(chrono::milliseconds(3000));
3494
3495 pi2_logger.AppendAllFilenames(&filenames);
3496
3497 // Disable any remote messages on pi2.
3498 pi1->Disconnect(pi2->node());
3499 pi2->Disconnect(pi1->node());
3500 }
3501 event_loop_factory.RunFor(chrono::milliseconds(995));
3502 // pi2 now reboots at 5 seconds.
3503 {
3504 event_loop_factory.RunFor(chrono::milliseconds(1000));
3505
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003506 // Make local stuff happen before we start logging and connect the
3507 // remote.
Naman Guptaa63aa132023-03-22 20:06:34 -07003508 pi2->AlwaysStart<Pong>("pong");
3509 std::unique_ptr<aos::EventLoop> ping_event_loop =
3510 pi1->MakeEventLoop("ping");
3511 Ping ping(ping_event_loop.get());
3512 event_loop_factory.RunFor(chrono::milliseconds(1005));
3513
3514 // Start logging again on pi2 after it is up.
3515 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003516 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3517 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003518 pi2_logger.StartLogger(kLogfile2_2);
3519
3520 // And allow remote messages now that we have some local ones.
3521 pi1->Connect(pi2->node());
3522 pi2->Connect(pi1->node());
3523
3524 event_loop_factory.RunFor(chrono::milliseconds(1000));
3525
3526 event_loop_factory.RunFor(chrono::milliseconds(3000));
3527
3528 pi2_logger.AppendAllFilenames(&filenames);
3529 }
3530
3531 pi1_logger.AppendAllFilenames(&filenames);
3532 pi3_logger.AppendAllFilenames(&filenames);
3533 }
3534
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003535 // Confirm that we can parse the result. LogReader has enough internal
3536 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07003537 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003538 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003539 auto result = ConfirmReadable(filenames);
3540
3541 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3542 EXPECT_THAT(result[0].second,
3543 ::testing::ElementsAre(realtime_clock::epoch() +
3544 chrono::microseconds(11000350)));
3545
3546 EXPECT_THAT(result[1].first,
3547 ::testing::ElementsAre(
3548 realtime_clock::epoch(),
3549 realtime_clock::epoch() + chrono::microseconds(107005000)));
3550 EXPECT_THAT(result[1].second,
3551 ::testing::ElementsAre(
3552 realtime_clock::epoch() + chrono::microseconds(4000150),
3553 realtime_clock::epoch() + chrono::microseconds(111000200)));
3554
3555 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3556 EXPECT_THAT(result[2].second,
3557 ::testing::ElementsAre(realtime_clock::epoch() +
3558 chrono::microseconds(11000150)));
3559
3560 auto start_stop_result = ConfirmReadable(
3561 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3562 realtime_clock::epoch() + chrono::milliseconds(3000));
3563
3564 EXPECT_THAT(
3565 start_stop_result[0].first,
3566 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3567 EXPECT_THAT(
3568 start_stop_result[0].second,
3569 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3570 EXPECT_THAT(
3571 start_stop_result[1].first,
3572 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3573 EXPECT_THAT(
3574 start_stop_result[1].second,
3575 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3576 EXPECT_THAT(
3577 start_stop_result[2].first,
3578 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3579 EXPECT_THAT(
3580 start_stop_result[2].second,
3581 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3582}
3583
3584// Tests that setting the start and stop flags across a reboot works as
3585// expected.
3586TEST(MultinodeRebootLoggerTest, RebootStartStopTimes) {
Austin Schuh7e417682023-08-11 17:05:30 -07003587 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3588 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3589
Naman Guptaa63aa132023-03-22 20:06:34 -07003590 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3591 aos::configuration::ReadConfig(ArtifactPath(
3592 "aos/events/logging/multinode_pingpong_split3_config.json"));
3593 message_bridge::TestingTimeConverter time_converter(
3594 configuration::NodesCount(&config.message()));
3595 SimulatedEventLoopFactory event_loop_factory(&config.message());
3596 event_loop_factory.SetTimeConverter(&time_converter);
3597 NodeEventLoopFactory *const pi1 =
3598 event_loop_factory.GetNodeEventLoopFactory("pi1");
3599 const size_t pi1_index = configuration::GetNodeIndex(
3600 event_loop_factory.configuration(), pi1->node());
3601 NodeEventLoopFactory *const pi2 =
3602 event_loop_factory.GetNodeEventLoopFactory("pi2");
3603 const size_t pi2_index = configuration::GetNodeIndex(
3604 event_loop_factory.configuration(), pi2->node());
3605 NodeEventLoopFactory *const pi3 =
3606 event_loop_factory.GetNodeEventLoopFactory("pi3");
3607 const size_t pi3_index = configuration::GetNodeIndex(
3608 event_loop_factory.configuration(), pi3->node());
3609
3610 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003611 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003612 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003613 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003614 const std::string kLogfile2_2 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003615 aos::testing::TestTmpDir() + "/logs/multi_logfile2.2/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003616 const std::string kLogfile3_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003617 aos::testing::TestTmpDir() + "/logs/multi_logfile3/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003618 {
3619 CHECK_EQ(pi1_index, 0u);
3620 CHECK_EQ(pi2_index, 1u);
3621 CHECK_EQ(pi3_index, 2u);
3622
3623 time_converter.AddNextTimestamp(
3624 distributed_clock::epoch(),
3625 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3626 BootTimestamp::epoch()});
3627 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3628 time_converter.AddNextTimestamp(
3629 distributed_clock::epoch() + reboot_time,
3630 {BootTimestamp::epoch() + reboot_time,
3631 BootTimestamp{.boot = 1,
3632 .time = monotonic_clock::epoch() + reboot_time},
3633 BootTimestamp::epoch() + reboot_time});
3634 }
3635
3636 std::vector<std::string> filenames;
3637 {
3638 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003639 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3640 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003641 LoggerState pi3_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003642 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3643 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003644 {
3645 // And now start the logger.
3646 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003647 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3648 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003649
3650 pi1_logger.StartLogger(kLogfile1_1);
3651 pi3_logger.StartLogger(kLogfile3_1);
3652 pi2_logger.StartLogger(kLogfile2_1);
3653
3654 event_loop_factory.RunFor(chrono::milliseconds(1005));
3655
3656 // Now that we've got a start time in the past, turn on data.
3657 std::unique_ptr<aos::EventLoop> ping_event_loop =
3658 pi1->MakeEventLoop("ping");
3659 Ping ping(ping_event_loop.get());
3660
3661 pi2->AlwaysStart<Pong>("pong");
3662
3663 event_loop_factory.RunFor(chrono::milliseconds(3000));
3664
3665 pi2_logger.AppendAllFilenames(&filenames);
3666 }
3667 event_loop_factory.RunFor(chrono::milliseconds(995));
3668 // pi2 now reboots at 5 seconds.
3669 {
3670 event_loop_factory.RunFor(chrono::milliseconds(1000));
3671
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003672 // Make local stuff happen before we start logging and connect the
3673 // remote.
Naman Guptaa63aa132023-03-22 20:06:34 -07003674 pi2->AlwaysStart<Pong>("pong");
3675 std::unique_ptr<aos::EventLoop> ping_event_loop =
3676 pi1->MakeEventLoop("ping");
3677 Ping ping(ping_event_loop.get());
3678 event_loop_factory.RunFor(chrono::milliseconds(5));
3679
3680 // Start logging again on pi2 after it is up.
3681 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003682 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3683 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003684 pi2_logger.StartLogger(kLogfile2_2);
3685
3686 event_loop_factory.RunFor(chrono::milliseconds(5000));
3687
3688 pi2_logger.AppendAllFilenames(&filenames);
3689 }
3690
3691 pi1_logger.AppendAllFilenames(&filenames);
3692 pi3_logger.AppendAllFilenames(&filenames);
3693 }
3694
3695 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003696 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003697 auto result = ConfirmReadable(filenames);
3698
3699 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3700 EXPECT_THAT(result[0].second,
3701 ::testing::ElementsAre(realtime_clock::epoch() +
3702 chrono::microseconds(11000350)));
3703
3704 EXPECT_THAT(result[1].first,
3705 ::testing::ElementsAre(
3706 realtime_clock::epoch(),
3707 realtime_clock::epoch() + chrono::microseconds(6005000)));
3708 EXPECT_THAT(result[1].second,
3709 ::testing::ElementsAre(
3710 realtime_clock::epoch() + chrono::microseconds(4900150),
3711 realtime_clock::epoch() + chrono::microseconds(11000200)));
3712
3713 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3714 EXPECT_THAT(result[2].second,
3715 ::testing::ElementsAre(realtime_clock::epoch() +
3716 chrono::microseconds(11000150)));
3717
3718 // Confirm we observed the correct start and stop times. We should see the
3719 // reboot here.
3720 auto start_stop_result = ConfirmReadable(
3721 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3722 realtime_clock::epoch() + chrono::milliseconds(8000));
3723
3724 EXPECT_THAT(
3725 start_stop_result[0].first,
3726 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3727 EXPECT_THAT(
3728 start_stop_result[0].second,
3729 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3730 EXPECT_THAT(start_stop_result[1].first,
3731 ::testing::ElementsAre(
3732 realtime_clock::epoch() + chrono::seconds(2),
3733 realtime_clock::epoch() + chrono::microseconds(6005000)));
3734 EXPECT_THAT(start_stop_result[1].second,
3735 ::testing::ElementsAre(
3736 realtime_clock::epoch() + chrono::microseconds(4900150),
3737 realtime_clock::epoch() + chrono::seconds(8)));
3738 EXPECT_THAT(
3739 start_stop_result[2].first,
3740 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3741 EXPECT_THAT(
3742 start_stop_result[2].second,
3743 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3744}
3745
3746// Tests that we properly handle one direction being down.
3747TEST(MissingDirectionTest, OneDirection) {
Austin Schuh7e417682023-08-11 17:05:30 -07003748 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3749 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3750
Naman Guptaa63aa132023-03-22 20:06:34 -07003751 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3752 aos::configuration::ReadConfig(ArtifactPath(
3753 "aos/events/logging/multinode_pingpong_split4_config.json"));
3754 message_bridge::TestingTimeConverter time_converter(
3755 configuration::NodesCount(&config.message()));
3756 SimulatedEventLoopFactory event_loop_factory(&config.message());
3757 event_loop_factory.SetTimeConverter(&time_converter);
3758
3759 NodeEventLoopFactory *const pi1 =
3760 event_loop_factory.GetNodeEventLoopFactory("pi1");
3761 const size_t pi1_index = configuration::GetNodeIndex(
3762 event_loop_factory.configuration(), pi1->node());
3763 NodeEventLoopFactory *const pi2 =
3764 event_loop_factory.GetNodeEventLoopFactory("pi2");
3765 const size_t pi2_index = configuration::GetNodeIndex(
3766 event_loop_factory.configuration(), pi2->node());
3767 std::vector<std::string> filenames;
3768
3769 {
3770 CHECK_EQ(pi1_index, 0u);
3771 CHECK_EQ(pi2_index, 1u);
3772
3773 time_converter.AddNextTimestamp(
3774 distributed_clock::epoch(),
3775 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3776
3777 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3778 time_converter.AddNextTimestamp(
3779 distributed_clock::epoch() + reboot_time,
3780 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3781 BootTimestamp::epoch() + reboot_time});
3782 }
3783
3784 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003785 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003786 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003787 aos::testing::TestTmpDir() + "/logs/multi_logfile1.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003788
3789 pi2->Disconnect(pi1->node());
3790
3791 pi1->AlwaysStart<Ping>("ping");
3792 pi2->AlwaysStart<Pong>("pong");
3793
3794 {
3795 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003796 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3797 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003798
3799 event_loop_factory.RunFor(chrono::milliseconds(95));
3800
3801 pi2_logger.StartLogger(kLogfile2_1);
3802
3803 event_loop_factory.RunFor(chrono::milliseconds(6000));
3804
3805 pi2->Connect(pi1->node());
3806
3807 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003808 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3809 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003810 pi1_logger.StartLogger(kLogfile1_1);
3811
3812 event_loop_factory.RunFor(chrono::milliseconds(5000));
3813 pi1_logger.AppendAllFilenames(&filenames);
3814 pi2_logger.AppendAllFilenames(&filenames);
3815 }
3816
3817 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003818 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003819 ConfirmReadable(filenames);
3820}
3821
3822// Tests that we properly handle only one direction ever existing after a
3823// reboot.
3824TEST(MissingDirectionTest, OneDirectionAfterReboot) {
Austin Schuh7e417682023-08-11 17:05:30 -07003825 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3826 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3827
Naman Guptaa63aa132023-03-22 20:06:34 -07003828 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3829 aos::configuration::ReadConfig(ArtifactPath(
3830 "aos/events/logging/multinode_pingpong_split4_config.json"));
3831 message_bridge::TestingTimeConverter time_converter(
3832 configuration::NodesCount(&config.message()));
3833 SimulatedEventLoopFactory event_loop_factory(&config.message());
3834 event_loop_factory.SetTimeConverter(&time_converter);
3835
3836 NodeEventLoopFactory *const pi1 =
3837 event_loop_factory.GetNodeEventLoopFactory("pi1");
3838 const size_t pi1_index = configuration::GetNodeIndex(
3839 event_loop_factory.configuration(), pi1->node());
3840 NodeEventLoopFactory *const pi2 =
3841 event_loop_factory.GetNodeEventLoopFactory("pi2");
3842 const size_t pi2_index = configuration::GetNodeIndex(
3843 event_loop_factory.configuration(), pi2->node());
3844 std::vector<std::string> filenames;
3845
3846 {
3847 CHECK_EQ(pi1_index, 0u);
3848 CHECK_EQ(pi2_index, 1u);
3849
3850 time_converter.AddNextTimestamp(
3851 distributed_clock::epoch(),
3852 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3853
3854 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3855 time_converter.AddNextTimestamp(
3856 distributed_clock::epoch() + reboot_time,
3857 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3858 BootTimestamp::epoch() + reboot_time});
3859 }
3860
3861 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003862 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003863
3864 pi1->AlwaysStart<Ping>("ping");
3865
3866 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3867 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3868 // second boot.
3869 {
3870 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003871 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3872 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003873
3874 event_loop_factory.RunFor(chrono::milliseconds(95));
3875
3876 pi2_logger.StartLogger(kLogfile2_1);
3877
3878 event_loop_factory.RunFor(chrono::milliseconds(4000));
3879
3880 pi2->Disconnect(pi1->node());
3881
3882 event_loop_factory.RunFor(chrono::milliseconds(1000));
3883 pi1->AlwaysStart<Ping>("ping");
3884
3885 event_loop_factory.RunFor(chrono::milliseconds(5000));
3886 pi2_logger.AppendAllFilenames(&filenames);
3887 }
3888
3889 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003890 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003891 ConfirmReadable(filenames);
3892}
3893
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003894// Tests that we properly handle only one direction ever existing after a
3895// reboot with only reliable data.
Naman Guptaa63aa132023-03-22 20:06:34 -07003896TEST(MissingDirectionTest, OneDirectionAfterRebootReliable) {
Austin Schuh7e417682023-08-11 17:05:30 -07003897 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3898 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3899
Naman Guptaa63aa132023-03-22 20:06:34 -07003900 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003901 aos::configuration::ReadConfig(
3902 ArtifactPath("aos/events/logging/"
3903 "multinode_pingpong_split4_reliable_config.json"));
Naman Guptaa63aa132023-03-22 20:06:34 -07003904 message_bridge::TestingTimeConverter time_converter(
3905 configuration::NodesCount(&config.message()));
3906 SimulatedEventLoopFactory event_loop_factory(&config.message());
3907 event_loop_factory.SetTimeConverter(&time_converter);
3908
3909 NodeEventLoopFactory *const pi1 =
3910 event_loop_factory.GetNodeEventLoopFactory("pi1");
3911 const size_t pi1_index = configuration::GetNodeIndex(
3912 event_loop_factory.configuration(), pi1->node());
3913 NodeEventLoopFactory *const pi2 =
3914 event_loop_factory.GetNodeEventLoopFactory("pi2");
3915 const size_t pi2_index = configuration::GetNodeIndex(
3916 event_loop_factory.configuration(), pi2->node());
3917 std::vector<std::string> filenames;
3918
3919 {
3920 CHECK_EQ(pi1_index, 0u);
3921 CHECK_EQ(pi2_index, 1u);
3922
3923 time_converter.AddNextTimestamp(
3924 distributed_clock::epoch(),
3925 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3926
3927 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3928 time_converter.AddNextTimestamp(
3929 distributed_clock::epoch() + reboot_time,
3930 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3931 BootTimestamp::epoch() + reboot_time});
3932 }
3933
3934 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003935 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003936
3937 pi1->AlwaysStart<Ping>("ping");
3938
3939 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3940 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3941 // second boot.
3942 {
3943 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003944 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3945 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003946
3947 event_loop_factory.RunFor(chrono::milliseconds(95));
3948
3949 pi2_logger.StartLogger(kLogfile2_1);
3950
3951 event_loop_factory.RunFor(chrono::milliseconds(4000));
3952
3953 pi2->Disconnect(pi1->node());
3954
3955 event_loop_factory.RunFor(chrono::milliseconds(1000));
3956 pi1->AlwaysStart<Ping>("ping");
3957
3958 event_loop_factory.RunFor(chrono::milliseconds(5000));
3959 pi2_logger.AppendAllFilenames(&filenames);
3960 }
3961
3962 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003963 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003964 ConfirmReadable(filenames);
3965}
3966
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003967// Tests that we properly handle only one direction ever existing after a
3968// reboot with mixed unreliable vs reliable, where reliable has an earlier
3969// timestamp than unreliable.
Brian Smartte67d7112023-03-20 12:06:30 -07003970TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase1) {
Austin Schuh7e417682023-08-11 17:05:30 -07003971 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3972 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3973
Brian Smartte67d7112023-03-20 12:06:30 -07003974 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3975 aos::configuration::ReadConfig(ArtifactPath(
3976 "aos/events/logging/multinode_pingpong_split4_mixed1_config.json"));
3977 message_bridge::TestingTimeConverter time_converter(
3978 configuration::NodesCount(&config.message()));
3979 SimulatedEventLoopFactory event_loop_factory(&config.message());
3980 event_loop_factory.SetTimeConverter(&time_converter);
3981
3982 NodeEventLoopFactory *const pi1 =
3983 event_loop_factory.GetNodeEventLoopFactory("pi1");
3984 const size_t pi1_index = configuration::GetNodeIndex(
3985 event_loop_factory.configuration(), pi1->node());
3986 NodeEventLoopFactory *const pi2 =
3987 event_loop_factory.GetNodeEventLoopFactory("pi2");
3988 const size_t pi2_index = configuration::GetNodeIndex(
3989 event_loop_factory.configuration(), pi2->node());
3990 std::vector<std::string> filenames;
3991
3992 {
3993 CHECK_EQ(pi1_index, 0u);
3994 CHECK_EQ(pi2_index, 1u);
3995
3996 time_converter.AddNextTimestamp(
3997 distributed_clock::epoch(),
3998 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3999
4000 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
4001 time_converter.AddNextTimestamp(
4002 distributed_clock::epoch() + reboot_time,
4003 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
4004 BootTimestamp::epoch() + reboot_time});
4005 }
4006
4007 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004008 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Brian Smartte67d7112023-03-20 12:06:30 -07004009
4010 // The following sequence using the above reference config creates
4011 // a reliable message timestamp < unreliable message timestamp.
4012 {
4013 pi1->DisableStatistics();
4014 pi2->DisableStatistics();
4015
4016 event_loop_factory.RunFor(chrono::milliseconds(95));
4017
4018 pi1->AlwaysStart<Ping>("ping");
4019
4020 event_loop_factory.RunFor(chrono::milliseconds(5250));
4021
4022 pi1->EnableStatistics();
4023
4024 event_loop_factory.RunFor(chrono::milliseconds(1000));
4025
4026 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004027 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4028 FileStrategy::kKeepSeparate);
Brian Smartte67d7112023-03-20 12:06:30 -07004029
4030 pi2_logger.StartLogger(kLogfile2_1);
4031
4032 event_loop_factory.RunFor(chrono::milliseconds(5000));
4033 pi2_logger.AppendAllFilenames(&filenames);
4034 }
4035
4036 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004037 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Brian Smartte67d7112023-03-20 12:06:30 -07004038 ConfirmReadable(filenames);
4039}
4040
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004041// Tests that we properly handle only one direction ever existing after a
4042// reboot with mixed unreliable vs reliable, where unreliable has an earlier
4043// timestamp than reliable.
Brian Smartte67d7112023-03-20 12:06:30 -07004044TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase2) {
Austin Schuh7e417682023-08-11 17:05:30 -07004045 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4046 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4047
Brian Smartte67d7112023-03-20 12:06:30 -07004048 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4049 aos::configuration::ReadConfig(ArtifactPath(
4050 "aos/events/logging/multinode_pingpong_split4_mixed2_config.json"));
4051 message_bridge::TestingTimeConverter time_converter(
4052 configuration::NodesCount(&config.message()));
4053 SimulatedEventLoopFactory event_loop_factory(&config.message());
4054 event_loop_factory.SetTimeConverter(&time_converter);
4055
4056 NodeEventLoopFactory *const pi1 =
4057 event_loop_factory.GetNodeEventLoopFactory("pi1");
4058 const size_t pi1_index = configuration::GetNodeIndex(
4059 event_loop_factory.configuration(), pi1->node());
4060 NodeEventLoopFactory *const pi2 =
4061 event_loop_factory.GetNodeEventLoopFactory("pi2");
4062 const size_t pi2_index = configuration::GetNodeIndex(
4063 event_loop_factory.configuration(), pi2->node());
4064 std::vector<std::string> filenames;
4065
4066 {
4067 CHECK_EQ(pi1_index, 0u);
4068 CHECK_EQ(pi2_index, 1u);
4069
4070 time_converter.AddNextTimestamp(
4071 distributed_clock::epoch(),
4072 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4073
4074 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
4075 time_converter.AddNextTimestamp(
4076 distributed_clock::epoch() + reboot_time,
4077 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
4078 BootTimestamp::epoch() + reboot_time});
4079 }
4080
4081 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004082 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Brian Smartte67d7112023-03-20 12:06:30 -07004083
4084 // The following sequence using the above reference config creates
4085 // an unreliable message timestamp < reliable message timestamp.
4086 {
4087 pi1->DisableStatistics();
4088 pi2->DisableStatistics();
4089
4090 event_loop_factory.RunFor(chrono::milliseconds(95));
4091
4092 pi1->AlwaysStart<Ping>("ping");
4093
4094 event_loop_factory.RunFor(chrono::milliseconds(5250));
4095
4096 pi1->EnableStatistics();
4097
4098 event_loop_factory.RunFor(chrono::milliseconds(1000));
4099
4100 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004101 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4102 FileStrategy::kKeepSeparate);
Brian Smartte67d7112023-03-20 12:06:30 -07004103
4104 pi2_logger.StartLogger(kLogfile2_1);
4105
4106 event_loop_factory.RunFor(chrono::milliseconds(5000));
4107 pi2_logger.AppendAllFilenames(&filenames);
4108 }
4109
4110 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004111 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Brian Smartte67d7112023-03-20 12:06:30 -07004112 ConfirmReadable(filenames);
4113}
4114
Naman Guptaa63aa132023-03-22 20:06:34 -07004115// Tests that we properly handle what used to be a time violation in one
4116// direction. This can occur when one direction goes down after sending some
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004117// data, but the other keeps working. The down direction ends up resolving to
4118// a straight line in the noncausal filter, where the direction which is still
4119// up can cross that line. Really, time progressed along just fine but we
4120// assumed that the offset was a line when it could have deviated by up to
4121// 1ms/second.
Naman Guptaa63aa132023-03-22 20:06:34 -07004122TEST_P(MultinodeLoggerTest, OneDirectionTimeDrift) {
4123 std::vector<std::string> filenames;
4124
4125 CHECK_EQ(pi1_index_, 0u);
4126 CHECK_EQ(pi2_index_, 1u);
4127
4128 time_converter_.AddNextTimestamp(
4129 distributed_clock::epoch(),
4130 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4131
4132 const chrono::nanoseconds before_disconnect_duration =
4133 time_converter_.AddMonotonic(
4134 {chrono::milliseconds(1000), chrono::milliseconds(1000)});
4135
4136 const chrono::nanoseconds test_duration =
4137 time_converter_.AddMonotonic(
4138 {chrono::milliseconds(1000), chrono::milliseconds(1000)}) +
4139 time_converter_.AddMonotonic(
4140 {chrono::milliseconds(10000),
4141 chrono::milliseconds(10000) - chrono::milliseconds(5)}) +
4142 time_converter_.AddMonotonic(
4143 {chrono::milliseconds(10000),
4144 chrono::milliseconds(10000) + chrono::milliseconds(5)});
4145
4146 const std::string kLogfile =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004147 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004148
4149 {
4150 LoggerState pi2_logger = MakeLogger(pi2_);
4151 pi2_logger.StartLogger(kLogfile);
4152 event_loop_factory_.RunFor(before_disconnect_duration);
4153
4154 pi2_->Disconnect(pi1_->node());
4155
4156 event_loop_factory_.RunFor(test_duration);
4157 pi2_->Connect(pi1_->node());
4158
4159 event_loop_factory_.RunFor(chrono::milliseconds(5000));
4160 pi2_logger.AppendAllFilenames(&filenames);
4161 }
4162
4163 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004164 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004165 ConfirmReadable(filenames);
4166}
4167
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004168// Tests that we can replay a logfile that has timestamps such that at least
4169// one node's epoch is at a positive distributed_clock (and thus will have to
4170// be booted after the other node(s)).
Naman Guptaa63aa132023-03-22 20:06:34 -07004171TEST_P(MultinodeLoggerTest, StartOneNodeBeforeOther) {
4172 std::vector<std::string> filenames;
4173
4174 CHECK_EQ(pi1_index_, 0u);
4175 CHECK_EQ(pi2_index_, 1u);
4176
4177 time_converter_.AddNextTimestamp(
4178 distributed_clock::epoch(),
4179 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4180
4181 const chrono::nanoseconds before_reboot_duration = chrono::milliseconds(1000);
4182 time_converter_.RebootAt(
4183 0, distributed_clock::time_point(before_reboot_duration));
4184
4185 const chrono::nanoseconds test_duration = time_converter_.AddMonotonic(
4186 {chrono::milliseconds(10000), chrono::milliseconds(10000)});
4187
4188 const std::string kLogfile =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004189 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004190
4191 pi2_->Disconnect(pi1_->node());
4192 pi1_->Disconnect(pi2_->node());
4193
4194 {
4195 LoggerState pi2_logger = MakeLogger(pi2_);
4196
4197 pi2_logger.StartLogger(kLogfile);
4198 event_loop_factory_.RunFor(before_reboot_duration);
4199
4200 pi2_->Connect(pi1_->node());
4201 pi1_->Connect(pi2_->node());
4202
4203 event_loop_factory_.RunFor(test_duration);
4204
4205 pi2_logger.AppendAllFilenames(&filenames);
4206 }
4207
4208 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004209 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004210 ConfirmReadable(filenames);
4211
4212 {
4213 LogReader reader(sorted_parts);
4214 SimulatedEventLoopFactory replay_factory(reader.configuration());
4215 reader.RegisterWithoutStarting(&replay_factory);
4216
4217 NodeEventLoopFactory *const replay_node =
4218 reader.event_loop_factory()->GetNodeEventLoopFactory("pi1");
4219
4220 std::unique_ptr<EventLoop> test_event_loop =
4221 replay_node->MakeEventLoop("test_reader");
4222 replay_node->OnStartup([replay_node]() {
4223 // Check that we didn't boot until at least t=0.
4224 CHECK_LE(monotonic_clock::epoch(), replay_node->monotonic_now());
4225 });
4226 test_event_loop->OnRun([&test_event_loop]() {
4227 // Check that we didn't boot until at least t=0.
4228 EXPECT_LE(monotonic_clock::epoch(), test_event_loop->monotonic_now());
4229 });
4230 reader.event_loop_factory()->Run();
4231 reader.Deregister();
4232 }
4233}
4234
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004235// Tests that when we have a loop without all the logs at all points in time,
4236// we can sort it properly.
Naman Guptaa63aa132023-03-22 20:06:34 -07004237TEST(MultinodeLoggerLoopTest, Loop) {
Austin Schuh7e417682023-08-11 17:05:30 -07004238 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4239 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4240
Naman Guptaa63aa132023-03-22 20:06:34 -07004241 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004242 aos::configuration::ReadConfig(
4243 ArtifactPath("aos/events/logging/"
4244 "multinode_pingpong_triangle_split_config.json"));
Naman Guptaa63aa132023-03-22 20:06:34 -07004245 message_bridge::TestingTimeConverter time_converter(
4246 configuration::NodesCount(&config.message()));
4247 SimulatedEventLoopFactory event_loop_factory(&config.message());
4248 event_loop_factory.SetTimeConverter(&time_converter);
4249
4250 NodeEventLoopFactory *const pi1 =
4251 event_loop_factory.GetNodeEventLoopFactory("pi1");
4252 NodeEventLoopFactory *const pi2 =
4253 event_loop_factory.GetNodeEventLoopFactory("pi2");
4254 NodeEventLoopFactory *const pi3 =
4255 event_loop_factory.GetNodeEventLoopFactory("pi3");
4256
4257 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004258 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004259 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004260 aos::testing::TestTmpDir() + "/logs/multi_logfile2/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004261 const std::string kLogfile3_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004262 aos::testing::TestTmpDir() + "/logs/multi_logfile3/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004263
4264 {
4265 // Make pi1 boot before everything else.
4266 time_converter.AddNextTimestamp(
4267 distributed_clock::epoch(),
4268 {BootTimestamp::epoch(),
4269 BootTimestamp::epoch() - chrono::milliseconds(100),
4270 BootTimestamp::epoch() - chrono::milliseconds(300)});
4271 }
4272
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004273 // We want to setup a situation such that 2 of the 3 legs of the loop are
4274 // very confident about time being X, and the third leg is pulling the
4275 // average off to one side.
Naman Guptaa63aa132023-03-22 20:06:34 -07004276 //
4277 // It's easiest to visualize this in timestamp_plotter.
4278
4279 std::vector<std::string> filenames;
4280 {
4281 // Have pi1 send out a reliable message at startup. This sets up a long
4282 // forwarding time message at the start to bias time.
4283 std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
4284 {
4285 aos::Sender<examples::Ping> ping_sender =
4286 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
4287
4288 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
4289 examples::Ping::Builder ping_builder =
4290 builder.MakeBuilder<examples::Ping>();
4291 CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
4292 }
4293
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004294 // Wait a while so there's enough data to let the worst case be rather
4295 // off.
Naman Guptaa63aa132023-03-22 20:06:34 -07004296 event_loop_factory.RunFor(chrono::seconds(1000));
4297
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004298 // Now start a receiving node first. This sets up 2 tight bounds between
4299 // 2 of the nodes.
Naman Guptaa63aa132023-03-22 20:06:34 -07004300 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004301 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4302 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07004303 pi2_logger.StartLogger(kLogfile2_1);
4304
4305 event_loop_factory.RunFor(chrono::seconds(100));
4306
4307 // And now start the third leg.
4308 LoggerState pi3_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004309 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4310 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07004311 pi3_logger.StartLogger(kLogfile3_1);
4312
4313 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004314 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4315 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07004316 pi1_logger.StartLogger(kLogfile1_1);
4317
4318 event_loop_factory.RunFor(chrono::seconds(100));
4319
4320 pi1_logger.AppendAllFilenames(&filenames);
4321 pi2_logger.AppendAllFilenames(&filenames);
4322 pi3_logger.AppendAllFilenames(&filenames);
4323 }
4324
4325 // Make sure we can read this.
4326 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004327 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004328 auto result = ConfirmReadable(filenames);
4329}
4330
Austin Schuh08dba8f2023-05-01 08:29:30 -07004331// Tests that RestartLogging works in the simple case. Unfortunately, the
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004332// failure cases involve simulating time elapsing in callbacks, which is
4333// really hard. The best we can reasonably do is make sure 2 back to back
4334// logs are parseable together.
Austin Schuh08dba8f2023-05-01 08:29:30 -07004335TEST_P(MultinodeLoggerTest, RestartLogging) {
4336 time_converter_.AddMonotonic(
4337 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
4338 std::vector<std::string> filenames;
4339 {
4340 LoggerState pi1_logger = MakeLogger(pi1_);
4341
4342 event_loop_factory_.RunFor(chrono::milliseconds(95));
4343
4344 StartLogger(&pi1_logger, logfile_base1_);
4345 aos::monotonic_clock::time_point last_rotation_time =
4346 pi1_logger.event_loop->monotonic_now();
Austin Schuh2f864452023-07-17 14:53:08 -07004347 pi1_logger.logger->set_on_logged_period(
4348 [&](aos::monotonic_clock::time_point) {
4349 const auto now = pi1_logger.event_loop->monotonic_now();
4350 if (now > last_rotation_time + std::chrono::seconds(5)) {
4351 pi1_logger.AppendAllFilenames(&filenames);
4352 std::unique_ptr<MultiNodeFilesLogNamer> namer =
4353 pi1_logger.MakeLogNamer(logfile_base2_);
4354 pi1_logger.log_namer = namer.get();
Austin Schuh08dba8f2023-05-01 08:29:30 -07004355
Austin Schuh2f864452023-07-17 14:53:08 -07004356 pi1_logger.logger->RestartLogging(std::move(namer));
4357 last_rotation_time = now;
4358 }
4359 });
Austin Schuh08dba8f2023-05-01 08:29:30 -07004360
4361 event_loop_factory_.RunFor(chrono::milliseconds(7000));
4362
4363 pi1_logger.AppendAllFilenames(&filenames);
4364 }
4365
4366 for (const auto &x : filenames) {
4367 LOG(INFO) << x;
4368 }
4369
4370 EXPECT_GE(filenames.size(), 2u);
4371
4372 ConfirmReadable(filenames);
4373
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004374 // TODO(austin): It would be good to confirm that any one time messages end
4375 // up in both logs correctly.
Austin Schuh08dba8f2023-05-01 08:29:30 -07004376}
4377
Austin Schuh6e93fc22023-08-22 21:27:22 -07004378// Tests that we call OnEnd without --skip_missing_forwarding_entries.
4379TEST_P(MultinodeLoggerTest, SkipMissingForwardingEntries) {
4380 if (file_strategy() == FileStrategy::kCombine) {
4381 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
4382 }
4383 time_converter_.AddMonotonic(
4384 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
4385
4386 std::vector<std::string> filenames;
4387 {
4388 LoggerState pi1_logger = MakeLogger(pi1_);
4389
4390 event_loop_factory_.RunFor(chrono::milliseconds(95));
4391
4392 StartLogger(&pi1_logger);
4393 aos::monotonic_clock::time_point last_rotation_time =
4394 pi1_logger.event_loop->monotonic_now();
4395 pi1_logger.logger->set_on_logged_period(
4396 [&](aos::monotonic_clock::time_point) {
4397 const auto now = pi1_logger.event_loop->monotonic_now();
4398 if (now > last_rotation_time + std::chrono::seconds(5)) {
4399 pi1_logger.logger->Rotate();
4400 last_rotation_time = now;
4401 }
4402 });
4403
4404 event_loop_factory_.RunFor(chrono::milliseconds(15000));
4405 pi1_logger.AppendAllFilenames(&filenames);
4406 }
4407
4408 // If we remove the last remote data part, we'll trigger missing data for
4409 // timestamps.
4410 filenames.erase(std::remove_if(filenames.begin(), filenames.end(),
4411 [](const std::string &s) {
4412 return s.find("data/pi2_data.part3.bfbs") !=
4413 std::string::npos;
4414 }),
4415 filenames.end());
4416
4417 auto result = ConfirmReadable(filenames);
4418}
4419
Austin Schuh54ffea42023-08-23 13:27:04 -07004420// Tests that we call OnEnd without --skip_missing_forwarding_entries.
4421TEST(MultinodeLoggerConfigTest, SingleNode) {
4422 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4423 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4424
4425 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4426 aos::configuration::ReadConfig(
4427 ArtifactPath("aos/events/logging/multinode_single_node_config.json"));
4428 message_bridge::TestingTimeConverter time_converter(
4429 configuration::NodesCount(&config.message()));
4430 SimulatedEventLoopFactory event_loop_factory(&config.message());
4431 event_loop_factory.SetTimeConverter(&time_converter);
4432
4433 time_converter.StartEqual();
4434
4435 const std::string kLogfile1_1 =
4436 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
4437
4438 NodeEventLoopFactory *const pi1 =
4439 event_loop_factory.GetNodeEventLoopFactory("pi1");
4440
4441 std::vector<std::string> filenames;
4442
4443 {
4444 // Now start a receiving node first. This sets up 2 tight bounds between
4445 // 2 of the nodes.
4446 LoggerState pi1_logger = MakeLoggerState(
4447 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4448 FileStrategy::kKeepSeparate);
4449 pi1_logger.StartLogger(kLogfile1_1);
4450
4451 event_loop_factory.RunFor(chrono::seconds(10));
4452
4453 pi1_logger.AppendAllFilenames(&filenames);
4454 }
4455
4456 // Make sure we can read this.
4457 const std::vector<LogFile> sorted_parts = SortParts(filenames);
4458 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
4459 auto result = ConfirmReadable(filenames);
4460
4461 // TODO(austin): Probably want to stop caring about ServerStatistics,
4462 // ClientStatistics, and Timestamp since they don't really make sense.
4463}
4464
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004465// Tests that when we have evidence of 2 boots, and then start logging, the
4466// max_out_of_order_duration ends up reasonable on the boot with the start time.
4467TEST(MultinodeLoggerLoopTest, PreviousBootData) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07004468 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4469 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4470
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004471 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4472 aos::configuration::ReadConfig(ArtifactPath(
4473 "aos/events/logging/multinode_pingpong_reboot_ooo_config.json"));
4474 message_bridge::TestingTimeConverter time_converter(
4475 configuration::NodesCount(&config.message()));
4476 SimulatedEventLoopFactory event_loop_factory(&config.message());
4477 event_loop_factory.SetTimeConverter(&time_converter);
4478
4479 const UUID pi1_boot0 = UUID::Random();
4480 const UUID pi2_boot0 = UUID::Random();
4481 const UUID pi2_boot1 = UUID::Random();
4482
4483 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004484 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004485
4486 {
4487 constexpr size_t kPi1Index = 0;
4488 constexpr size_t kPi2Index = 1;
4489 time_converter.set_boot_uuid(kPi1Index, 0, pi1_boot0);
4490 time_converter.set_boot_uuid(kPi2Index, 0, pi2_boot0);
4491 time_converter.set_boot_uuid(kPi2Index, 1, pi2_boot1);
4492
4493 // Make pi1 boot before everything else.
4494 time_converter.AddNextTimestamp(
4495 distributed_clock::epoch(),
4496 {BootTimestamp::epoch(),
4497 BootTimestamp::epoch() - chrono::milliseconds(100)});
4498
4499 const chrono::nanoseconds reboot_time = chrono::seconds(1005);
4500 time_converter.AddNextTimestamp(
4501 distributed_clock::epoch() + reboot_time,
4502 {BootTimestamp::epoch() + reboot_time,
4503 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()}});
4504 }
4505
4506 NodeEventLoopFactory *const pi1 =
4507 event_loop_factory.GetNodeEventLoopFactory("pi1");
4508 NodeEventLoopFactory *const pi2 =
4509 event_loop_factory.GetNodeEventLoopFactory("pi2");
4510
4511 // What we want is for pi2 to send a message at t=1000 on the first channel
4512 // (/atest1 pong), and t=1 on the second channel (/atest3 pong). That'll make
4513 // the max out of order duration be large.
4514 //
4515 // Then, we reboot, and only send messages on a third channel (/atest2 pong).
4516 // The order is key, they need to sort in this order in the config.
4517
4518 std::vector<std::string> filenames;
4519 {
4520 {
4521 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4522 aos::Sender<examples::Pong> pong_sender =
4523 pi2_event_loop->MakeSender<examples::Pong>("/atest3");
4524
4525 pi2_event_loop->OnRun([&]() {
4526 aos::Sender<examples::Pong>::Builder builder =
4527 pong_sender.MakeBuilder();
4528 examples::Pong::Builder pong_builder =
4529 builder.MakeBuilder<examples::Pong>();
4530 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4531 });
4532
4533 event_loop_factory.RunFor(chrono::seconds(1000));
4534 }
4535
4536 {
4537 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4538 aos::Sender<examples::Pong> pong_sender =
4539 pi2_event_loop->MakeSender<examples::Pong>("/atest1");
4540
4541 aos::Sender<examples::Pong>::Builder builder = pong_sender.MakeBuilder();
4542 examples::Pong::Builder pong_builder =
4543 builder.MakeBuilder<examples::Pong>();
4544 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4545 }
4546
4547 event_loop_factory.RunFor(chrono::seconds(10));
4548
4549 // Now start a receiving node first. This sets up 2 tight bounds between
4550 // 2 of the nodes.
4551 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004552 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4553 FileStrategy::kKeepSeparate);
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004554 pi1_logger.StartLogger(kLogfile1_1);
4555
4556 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4557 aos::Sender<examples::Pong> pong_sender =
4558 pi2_event_loop->MakeSender<examples::Pong>("/atest2");
4559
4560 pi2_event_loop->AddPhasedLoop(
4561 [&pong_sender](int) {
4562 aos::Sender<examples::Pong>::Builder builder =
4563 pong_sender.MakeBuilder();
4564 examples::Pong::Builder pong_builder =
4565 builder.MakeBuilder<examples::Pong>();
4566 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4567 },
4568 chrono::milliseconds(10));
4569
4570 event_loop_factory.RunFor(chrono::seconds(100));
4571
4572 pi1_logger.AppendAllFilenames(&filenames);
4573 }
4574
4575 // Make sure we can read this.
4576 const std::vector<LogFile> sorted_parts = SortParts(filenames);
4577 EXPECT_TRUE(AllRebootPartsMatchOutOfOrderDuration(sorted_parts, "pi2"));
4578 auto result = ConfirmReadable(filenames);
4579}
4580
4581// Tests that when we start without a connection, and then start logging, the
4582// max_out_of_order_duration ends up reasonable.
4583TEST(MultinodeLoggerLoopTest, StartDisconnected) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07004584 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4585 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4586
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004587 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4588 aos::configuration::ReadConfig(ArtifactPath(
4589 "aos/events/logging/multinode_pingpong_reboot_ooo_config.json"));
4590 message_bridge::TestingTimeConverter time_converter(
4591 configuration::NodesCount(&config.message()));
4592 SimulatedEventLoopFactory event_loop_factory(&config.message());
4593 event_loop_factory.SetTimeConverter(&time_converter);
4594
4595 time_converter.StartEqual();
4596
4597 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004598 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004599
4600 NodeEventLoopFactory *const pi1 =
4601 event_loop_factory.GetNodeEventLoopFactory("pi1");
4602 NodeEventLoopFactory *const pi2 =
4603 event_loop_factory.GetNodeEventLoopFactory("pi2");
4604
4605 // What we want is for pi2 to send a message at t=1000 on the first channel
4606 // (/atest1 pong), and t=1 on the second channel (/atest3 pong). That'll make
4607 // the max out of order duration be large.
4608 //
4609 // Then, we disconnect, and only send messages on a third channel
4610 // (/atest2 pong). The order is key, they need to sort in this order in the
4611 // config so we observe them in the order which grows the
4612 // max_out_of_order_duration.
4613
4614 std::vector<std::string> filenames;
4615 {
4616 {
4617 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4618 aos::Sender<examples::Pong> pong_sender =
4619 pi2_event_loop->MakeSender<examples::Pong>("/atest3");
4620
4621 pi2_event_loop->OnRun([&]() {
4622 aos::Sender<examples::Pong>::Builder builder =
4623 pong_sender.MakeBuilder();
4624 examples::Pong::Builder pong_builder =
4625 builder.MakeBuilder<examples::Pong>();
4626 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4627 });
4628
4629 event_loop_factory.RunFor(chrono::seconds(1000));
4630 }
4631
4632 {
4633 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4634 aos::Sender<examples::Pong> pong_sender =
4635 pi2_event_loop->MakeSender<examples::Pong>("/atest1");
4636
4637 aos::Sender<examples::Pong>::Builder builder = pong_sender.MakeBuilder();
4638 examples::Pong::Builder pong_builder =
4639 builder.MakeBuilder<examples::Pong>();
4640 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4641 }
4642
4643 event_loop_factory.RunFor(chrono::seconds(10));
4644
4645 pi1->Disconnect(pi2->node());
4646 pi2->Disconnect(pi1->node());
4647
4648 // Make data flow.
4649 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4650 aos::Sender<examples::Pong> pong_sender =
4651 pi2_event_loop->MakeSender<examples::Pong>("/atest2");
4652
4653 pi2_event_loop->AddPhasedLoop(
4654 [&pong_sender](int) {
4655 aos::Sender<examples::Pong>::Builder builder =
4656 pong_sender.MakeBuilder();
4657 examples::Pong::Builder pong_builder =
4658 builder.MakeBuilder<examples::Pong>();
4659 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4660 },
4661 chrono::milliseconds(10));
4662
4663 event_loop_factory.RunFor(chrono::seconds(10));
4664
4665 // Now start a receiving node first. This sets up 2 tight bounds between
4666 // 2 of the nodes.
4667 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004668 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4669 FileStrategy::kKeepSeparate);
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004670 pi1_logger.StartLogger(kLogfile1_1);
4671
4672 event_loop_factory.RunFor(chrono::seconds(10));
4673
4674 // Now, reconnect, and everything should recover.
4675 pi1->Connect(pi2->node());
4676 pi2->Connect(pi1->node());
4677
4678 event_loop_factory.RunFor(chrono::seconds(10));
4679
4680 pi1_logger.AppendAllFilenames(&filenames);
4681 }
4682
4683 // Make sure we can read this.
4684 const std::vector<LogFile> sorted_parts = SortParts(filenames);
4685 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
4686 auto result = ConfirmReadable(filenames);
4687}
4688
Austin Schuh1124c512023-08-01 15:20:44 -07004689// Class to spam Pong messages blindly.
4690class PongSender {
4691 public:
4692 PongSender(EventLoop *loop, std::string_view channel_name)
4693 : sender_(loop->MakeSender<examples::Pong>(channel_name)) {
4694 loop->AddPhasedLoop(
4695 [this](int) {
4696 aos::Sender<examples::Pong>::Builder builder = sender_.MakeBuilder();
4697 examples::Pong::Builder pong_builder =
4698 builder.MakeBuilder<examples::Pong>();
4699 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4700 },
4701 chrono::milliseconds(10));
4702 }
4703
4704 private:
4705 aos::Sender<examples::Pong> sender_;
4706};
4707
4708// Tests that we log correctly as nodes connect slowly.
4709TEST(MultinodeLoggerLoopTest, StaggeredConnect) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07004710 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4711 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4712
Austin Schuh1124c512023-08-01 15:20:44 -07004713 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4714 aos::configuration::ReadConfig(ArtifactPath(
4715 "aos/events/logging/multinode_pingpong_pi3_pingpong_config.json"));
4716 message_bridge::TestingTimeConverter time_converter(
4717 configuration::NodesCount(&config.message()));
4718 SimulatedEventLoopFactory event_loop_factory(&config.message());
4719 event_loop_factory.SetTimeConverter(&time_converter);
4720
4721 time_converter.StartEqual();
4722
4723 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004724 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Austin Schuh1124c512023-08-01 15:20:44 -07004725
4726 NodeEventLoopFactory *const pi1 =
4727 event_loop_factory.GetNodeEventLoopFactory("pi1");
4728 NodeEventLoopFactory *const pi2 =
4729 event_loop_factory.GetNodeEventLoopFactory("pi2");
4730 NodeEventLoopFactory *const pi3 =
4731 event_loop_factory.GetNodeEventLoopFactory("pi3");
4732
4733 // What we want is for pi2 to send a message at t=1000 on the first channel
4734 // (/atest1 pong), and t=1 on the second channel (/atest3 pong). That'll make
4735 // the max out of order duration be large.
4736 //
4737 // Then, we disconnect, and only send messages on a third channel
4738 // (/atest2 pong). The order is key, they need to sort in this order in the
4739 // config so we observe them in the order which grows the
4740 // max_out_of_order_duration.
4741
4742 pi1->Disconnect(pi2->node());
4743 pi2->Disconnect(pi1->node());
4744
4745 pi1->Disconnect(pi3->node());
4746 pi3->Disconnect(pi1->node());
4747
4748 std::vector<std::string> filenames;
4749 pi2->AlwaysStart<PongSender>("pongsender", "/test2");
4750 pi3->AlwaysStart<PongSender>("pongsender", "/test3");
4751
4752 event_loop_factory.RunFor(chrono::seconds(10));
4753
4754 {
4755 // Now start a receiving node first. This sets up 2 tight bounds between
4756 // 2 of the nodes.
4757 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004758 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4759 FileStrategy::kKeepSeparate);
Austin Schuh1124c512023-08-01 15:20:44 -07004760 pi1_logger.StartLogger(kLogfile1_1);
4761
4762 event_loop_factory.RunFor(chrono::seconds(10));
4763
4764 // Now, reconnect, and everything should recover.
4765 pi1->Connect(pi2->node());
4766 pi2->Connect(pi1->node());
4767
4768 event_loop_factory.RunFor(chrono::seconds(10));
4769
4770 pi1->Connect(pi3->node());
4771 pi3->Connect(pi1->node());
4772
4773 event_loop_factory.RunFor(chrono::seconds(10));
4774
4775 pi1_logger.AppendAllFilenames(&filenames);
4776 }
4777
4778 // Make sure we can read this.
4779 const std::vector<LogFile> sorted_parts = SortParts(filenames);
4780 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
4781 auto result = ConfirmReadable(filenames);
4782}
4783
Stephan Pleinesf63bde82024-01-13 15:59:33 -08004784} // namespace aos::logger::testing