blob: 64abfe75d3e400d3dbc240dc36f302a14879016e [file] [log] [blame]
Brian Smartte67d7112023-03-20 12:06:30 -07001#include <algorithm>
2
Philipp Schrader790cb542023-07-05 21:06:52 -07003#include "gmock/gmock.h"
4#include "gtest/gtest.h"
5
Naman Guptaa63aa132023-03-22 20:06:34 -07006#include "aos/events/logging/log_reader.h"
7#include "aos/events/logging/multinode_logger_test_lib.h"
8#include "aos/events/message_counter.h"
9#include "aos/events/ping_lib.h"
10#include "aos/events/pong_lib.h"
11#include "aos/network/remote_message_generated.h"
12#include "aos/network/timestamp_generated.h"
13#include "aos/testing/tmpdir.h"
Naman Guptaa63aa132023-03-22 20:06:34 -070014
15namespace aos {
16namespace logger {
17namespace testing {
18
19namespace chrono = std::chrono;
20using aos::message_bridge::RemoteMessage;
21using aos::testing::ArtifactPath;
22using aos::testing::MessageCounter;
23
Naman Guptaa63aa132023-03-22 20:06:34 -070024INSTANTIATE_TEST_SUITE_P(
25 All, MultinodeLoggerTest,
26 ::testing::Combine(
27 ::testing::Values(
28 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080029 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070030 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080031 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070032 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
33
34INSTANTIATE_TEST_SUITE_P(
35 All, MultinodeLoggerDeathTest,
36 ::testing::Combine(
37 ::testing::Values(
38 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080039 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070040 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080041 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070042 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
43
44// Tests that we can write and read simple multi-node log files.
45TEST_P(MultinodeLoggerTest, SimpleMultiNode) {
46 std::vector<std::string> actual_filenames;
47 time_converter_.StartEqual();
48
49 {
50 LoggerState pi1_logger = MakeLogger(pi1_);
51 LoggerState pi2_logger = MakeLogger(pi2_);
52
53 event_loop_factory_.RunFor(chrono::milliseconds(95));
54
55 StartLogger(&pi1_logger);
56 StartLogger(&pi2_logger);
57
58 event_loop_factory_.RunFor(chrono::milliseconds(20000));
59 pi1_logger.AppendAllFilenames(&actual_filenames);
60 pi2_logger.AppendAllFilenames(&actual_filenames);
61 }
62
63 ASSERT_THAT(actual_filenames,
64 ::testing::UnorderedElementsAreArray(logfiles_));
65
66 {
67 std::set<std::string> logfile_uuids;
68 std::set<std::string> parts_uuids;
69 // Confirm that we have the expected number of UUIDs for both the logfile
70 // UUIDs and parts UUIDs.
71 std::vector<SizePrefixedFlatbufferVector<LogFileHeader>> log_header;
72 for (std::string_view f : logfiles_) {
73 log_header.emplace_back(ReadHeader(f).value());
74 if (!log_header.back().message().has_configuration()) {
75 logfile_uuids.insert(
76 log_header.back().message().log_event_uuid()->str());
77 parts_uuids.insert(log_header.back().message().parts_uuid()->str());
78 }
79 }
80
81 EXPECT_EQ(logfile_uuids.size(), 2u);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -070082 EXPECT_EQ(parts_uuids.size(), 8u);
Naman Guptaa63aa132023-03-22 20:06:34 -070083
84 // And confirm everything is on the correct node.
85 EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi1");
86 EXPECT_EQ(log_header[3].message().node()->name()->string_view(), "pi1");
87 EXPECT_EQ(log_header[4].message().node()->name()->string_view(), "pi1");
88
89 EXPECT_EQ(log_header[5].message().node()->name()->string_view(), "pi2");
90 EXPECT_EQ(log_header[6].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -070091 EXPECT_EQ(log_header[7].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -070092
Mithun Bharadwaj0c629932023-08-02 16:10:40 -070093 EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi1");
94 EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi1");
Naman Guptaa63aa132023-03-22 20:06:34 -070095
Mithun Bharadwaj0c629932023-08-02 16:10:40 -070096 EXPECT_EQ(log_header[10].message().node()->name()->string_view(), "pi2");
97 EXPECT_EQ(log_header[11].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -070098
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -070099 EXPECT_EQ(log_header[12].message().node()->name()->string_view(), "pi2");
100 EXPECT_EQ(log_header[13].message().node()->name()->string_view(), "pi2");
101 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700102
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700103 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi1");
104 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi1");
Naman Guptaa63aa132023-03-22 20:06:34 -0700105
106 // And the parts index matches.
107 EXPECT_EQ(log_header[2].message().parts_index(), 0);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700108
109 EXPECT_EQ(log_header[3].message().parts_index(), 0);
110 EXPECT_EQ(log_header[4].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700111
112 EXPECT_EQ(log_header[5].message().parts_index(), 0);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700113
114 EXPECT_EQ(log_header[6].message().parts_index(), 0);
115 EXPECT_EQ(log_header[7].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700116
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700117 EXPECT_EQ(log_header[8].message().parts_index(), 0);
118 EXPECT_EQ(log_header[9].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700119
120 EXPECT_EQ(log_header[10].message().parts_index(), 0);
121 EXPECT_EQ(log_header[11].message().parts_index(), 1);
122
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700123 EXPECT_EQ(log_header[12].message().parts_index(), 0);
124 EXPECT_EQ(log_header[13].message().parts_index(), 1);
125 EXPECT_EQ(log_header[14].message().parts_index(), 2);
Naman Guptaa63aa132023-03-22 20:06:34 -0700126
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700127 EXPECT_EQ(log_header[15].message().parts_index(), 0);
128 EXPECT_EQ(log_header[16].message().parts_index(), 1);
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700129
130 // And that the data_stored field is right.
131 EXPECT_THAT(*log_header[2].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700132 ::testing::ElementsAre(StoredDataType::DATA));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700133 EXPECT_THAT(*log_header[3].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700134 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700135 EXPECT_THAT(*log_header[4].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700136 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700137
138 EXPECT_THAT(*log_header[5].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700139 ::testing::ElementsAre(StoredDataType::DATA));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700140 EXPECT_THAT(*log_header[6].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700141 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700142 EXPECT_THAT(*log_header[7].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700143 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700144
145 EXPECT_THAT(*log_header[8].message().data_stored(),
146 ::testing::ElementsAre(StoredDataType::DATA));
147 EXPECT_THAT(*log_header[9].message().data_stored(),
148 ::testing::ElementsAre(StoredDataType::DATA));
149
150 EXPECT_THAT(*log_header[10].message().data_stored(),
151 ::testing::ElementsAre(StoredDataType::DATA));
152 EXPECT_THAT(*log_header[11].message().data_stored(),
153 ::testing::ElementsAre(StoredDataType::DATA));
154
155 EXPECT_THAT(*log_header[12].message().data_stored(),
156 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
157 EXPECT_THAT(*log_header[13].message().data_stored(),
158 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
159 EXPECT_THAT(*log_header[14].message().data_stored(),
160 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
161
162 EXPECT_THAT(*log_header[15].message().data_stored(),
163 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
164 EXPECT_THAT(*log_header[16].message().data_stored(),
165 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
Naman Guptaa63aa132023-03-22 20:06:34 -0700166 }
167
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700168 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
169 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -0700170 {
171 using ::testing::UnorderedElementsAre;
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700172 std::shared_ptr<const aos::Configuration> config = sorted_parts[0].config;
Naman Guptaa63aa132023-03-22 20:06:34 -0700173
174 // Timing reports, pings
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700175 if (shared()) {
176 EXPECT_THAT(
177 CountChannelsData(config, logfiles_[2]),
178 UnorderedElementsAre(
179 std::make_tuple("/pi1/aos", "aos.examples.Ping", 2001),
180 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
181 200),
182 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
183 21),
184 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
185 std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
186 std::make_tuple("/test", "aos.examples.Ping", 2001)))
187 << " : " << logfiles_[2];
188 } else {
189 EXPECT_THAT(
190 CountChannelsData(config, logfiles_[2]),
191 UnorderedElementsAre(
192 std::make_tuple("/pi1/aos", "aos.examples.Ping", 2001),
193 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
194 200),
195 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
196 21),
197 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
198 std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
199 std::make_tuple("/test", "aos.examples.Ping", 2001),
200 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
201 "aos-message_bridge-Timestamp",
202 "aos.message_bridge.RemoteMessage", 200)))
203 << " : " << logfiles_[2];
Naman Guptaa63aa132023-03-22 20:06:34 -0700204 }
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700205
206 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
207 ::testing::UnorderedElementsAre())
208 << " : " << logfiles_[3];
209 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
210 ::testing::UnorderedElementsAre())
211 << " : " << logfiles_[4];
212
Naman Guptaa63aa132023-03-22 20:06:34 -0700213 // Timestamps for pong
214 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
215 UnorderedElementsAre())
216 << " : " << logfiles_[2];
217 EXPECT_THAT(
218 CountChannelsTimestamp(config, logfiles_[3]),
219 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
220 << " : " << logfiles_[3];
221 EXPECT_THAT(
222 CountChannelsTimestamp(config, logfiles_[4]),
223 UnorderedElementsAre(
224 std::make_tuple("/test", "aos.examples.Pong", 2000),
225 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)))
226 << " : " << logfiles_[4];
227
Naman Guptaa63aa132023-03-22 20:06:34 -0700228 // Timing reports and pongs.
Naman Guptaa63aa132023-03-22 20:06:34 -0700229 EXPECT_THAT(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700230 CountChannelsData(config, logfiles_[5]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700231 UnorderedElementsAre(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700232 std::make_tuple("/pi2/aos", "aos.examples.Ping", 2001),
Naman Guptaa63aa132023-03-22 20:06:34 -0700233 std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
234 200),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700235 std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
236 21),
237 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700238 std::make_tuple("/pi2/aos", "aos.timing.Report", 60),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700239 std::make_tuple("/test", "aos.examples.Pong", 2001)))
240 << " : " << logfiles_[5];
241 EXPECT_THAT(CountChannelsData(config, logfiles_[6]), UnorderedElementsAre())
242 << " : " << logfiles_[6];
243 EXPECT_THAT(CountChannelsData(config, logfiles_[7]), UnorderedElementsAre())
Naman Guptaa63aa132023-03-22 20:06:34 -0700244 << " : " << logfiles_[7];
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700245 // And ping timestamps.
246 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
247 UnorderedElementsAre())
248 << " : " << logfiles_[5];
Naman Guptaa63aa132023-03-22 20:06:34 -0700249 EXPECT_THAT(
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700250 CountChannelsTimestamp(config, logfiles_[6]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700251 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700252 << " : " << logfiles_[6];
Naman Guptaa63aa132023-03-22 20:06:34 -0700253 EXPECT_THAT(
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700254 CountChannelsTimestamp(config, logfiles_[7]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700255 UnorderedElementsAre(
256 std::make_tuple("/test", "aos.examples.Ping", 2000),
257 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700258 << " : " << logfiles_[7];
Naman Guptaa63aa132023-03-22 20:06:34 -0700259
260 // And then test that the remotely logged timestamp data files only have
261 // timestamps in them.
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700262 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[8]),
263 UnorderedElementsAre())
264 << " : " << logfiles_[8];
265 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[9]),
266 UnorderedElementsAre())
267 << " : " << logfiles_[9];
Naman Guptaa63aa132023-03-22 20:06:34 -0700268 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[10]),
269 UnorderedElementsAre())
270 << " : " << logfiles_[10];
271 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[11]),
272 UnorderedElementsAre())
273 << " : " << logfiles_[11];
Naman Guptaa63aa132023-03-22 20:06:34 -0700274
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700275 EXPECT_THAT(CountChannelsData(config, logfiles_[8]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700276 UnorderedElementsAre(std::make_tuple(
277 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700278 << " : " << logfiles_[8];
279 EXPECT_THAT(CountChannelsData(config, logfiles_[9]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700280 UnorderedElementsAre(std::make_tuple(
281 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700282 << " : " << logfiles_[9];
Naman Guptaa63aa132023-03-22 20:06:34 -0700283
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700284 // Pong snd timestamp data.
285 EXPECT_THAT(
286 CountChannelsData(config, logfiles_[10]),
287 UnorderedElementsAre(
288 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 9),
289 std::make_tuple("/test", "aos.examples.Pong", 91)))
290 << " : " << logfiles_[10];
291 EXPECT_THAT(
292 CountChannelsData(config, logfiles_[11]),
293 UnorderedElementsAre(
294 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 191),
295 std::make_tuple("/test", "aos.examples.Pong", 1910)))
296 << " : " << logfiles_[11];
Naman Guptaa63aa132023-03-22 20:06:34 -0700297
298 // Timestamps from pi2 on pi1, and the other way.
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700299 // if (shared()) {
300 EXPECT_THAT(CountChannelsData(config, logfiles_[12]),
301 UnorderedElementsAre())
302 << " : " << logfiles_[12];
303 EXPECT_THAT(CountChannelsData(config, logfiles_[13]),
304 UnorderedElementsAre())
305 << " : " << logfiles_[13];
306 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
307 UnorderedElementsAre())
308 << " : " << logfiles_[14];
309 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
310 UnorderedElementsAre())
311 << " : " << logfiles_[15];
312 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
313 UnorderedElementsAre())
314 << " : " << logfiles_[16];
Naman Guptaa63aa132023-03-22 20:06:34 -0700315
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700316 EXPECT_THAT(
317 CountChannelsTimestamp(config, logfiles_[12]),
318 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
319 << " : " << logfiles_[12];
320 EXPECT_THAT(
321 CountChannelsTimestamp(config, logfiles_[13]),
322 UnorderedElementsAre(
323 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 9),
324 std::make_tuple("/test", "aos.examples.Ping", 90)))
325 << " : " << logfiles_[13];
326 EXPECT_THAT(
327 CountChannelsTimestamp(config, logfiles_[14]),
328 UnorderedElementsAre(
329 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 191),
330 std::make_tuple("/test", "aos.examples.Ping", 1910)))
331 << " : " << logfiles_[14];
332 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[15]),
333 UnorderedElementsAre(std::make_tuple(
334 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
335 << " : " << logfiles_[15];
336 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[16]),
337 UnorderedElementsAre(std::make_tuple(
338 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
339 << " : " << logfiles_[16];
Naman Guptaa63aa132023-03-22 20:06:34 -0700340 }
341
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700342 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -0700343
344 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
345 log_reader_factory.set_send_delay(chrono::microseconds(0));
346
347 // This sends out the fetched messages and advances time to the start of the
348 // log file.
349 reader.Register(&log_reader_factory);
350
351 const Node *pi1 =
352 configuration::GetNode(log_reader_factory.configuration(), "pi1");
353 const Node *pi2 =
354 configuration::GetNode(log_reader_factory.configuration(), "pi2");
355
356 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
357 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
358 LOG(INFO) << "now pi1 "
359 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
360 LOG(INFO) << "now pi2 "
361 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
362
363 EXPECT_THAT(reader.LoggedNodes(),
364 ::testing::ElementsAre(
365 configuration::GetNode(reader.logged_configuration(), pi1),
366 configuration::GetNode(reader.logged_configuration(), pi2)));
367
368 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
369
370 std::unique_ptr<EventLoop> pi1_event_loop =
371 log_reader_factory.MakeEventLoop("test", pi1);
372 std::unique_ptr<EventLoop> pi2_event_loop =
373 log_reader_factory.MakeEventLoop("test", pi2);
374
375 int pi1_ping_count = 10;
376 int pi2_ping_count = 10;
377 int pi1_pong_count = 10;
378 int pi2_pong_count = 10;
379
380 // Confirm that the ping value matches.
381 pi1_event_loop->MakeWatcher(
382 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
383 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping) << " at "
384 << pi1_event_loop->context().monotonic_remote_time << " -> "
385 << pi1_event_loop->context().monotonic_event_time;
386 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
387 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
388 pi1_ping_count * chrono::milliseconds(10) +
389 monotonic_clock::epoch());
390 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
391 pi1_ping_count * chrono::milliseconds(10) +
392 realtime_clock::epoch());
393 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
394 pi1_event_loop->context().monotonic_event_time);
395 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
396 pi1_event_loop->context().realtime_event_time);
397
398 ++pi1_ping_count;
399 });
400 pi2_event_loop->MakeWatcher(
401 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
402 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping) << " at "
403 << pi2_event_loop->context().monotonic_remote_time << " -> "
404 << pi2_event_loop->context().monotonic_event_time;
405 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
406
407 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
408 pi2_ping_count * chrono::milliseconds(10) +
409 monotonic_clock::epoch());
410 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
411 pi2_ping_count * chrono::milliseconds(10) +
412 realtime_clock::epoch());
413 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time +
414 chrono::microseconds(150),
415 pi2_event_loop->context().monotonic_event_time);
416 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time +
417 chrono::microseconds(150),
418 pi2_event_loop->context().realtime_event_time);
419 ++pi2_ping_count;
420 });
421
422 constexpr ssize_t kQueueIndexOffset = -9;
423 // Confirm that the ping and pong counts both match, and the value also
424 // matches.
425 pi1_event_loop->MakeWatcher(
426 "/test", [&pi1_event_loop, &pi1_ping_count,
427 &pi1_pong_count](const examples::Pong &pong) {
428 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
429 << pi1_event_loop->context().monotonic_remote_time << " -> "
430 << pi1_event_loop->context().monotonic_event_time;
431
432 EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
433 pi1_pong_count + kQueueIndexOffset);
434 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
435 chrono::microseconds(200) +
436 pi1_pong_count * chrono::milliseconds(10) +
437 monotonic_clock::epoch());
438 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
439 chrono::microseconds(200) +
440 pi1_pong_count * chrono::milliseconds(10) +
441 realtime_clock::epoch());
442
443 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
444 chrono::microseconds(150),
445 pi1_event_loop->context().monotonic_event_time);
446 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
447 chrono::microseconds(150),
448 pi1_event_loop->context().realtime_event_time);
449
450 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
451 ++pi1_pong_count;
452 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
453 });
454 pi2_event_loop->MakeWatcher(
455 "/test", [&pi2_event_loop, &pi2_ping_count,
456 &pi2_pong_count](const examples::Pong &pong) {
457 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
458 << pi2_event_loop->context().monotonic_remote_time << " -> "
459 << pi2_event_loop->context().monotonic_event_time;
460
461 EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
462 pi2_pong_count + kQueueIndexOffset);
463
464 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
465 chrono::microseconds(200) +
466 pi2_pong_count * chrono::milliseconds(10) +
467 monotonic_clock::epoch());
468 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
469 chrono::microseconds(200) +
470 pi2_pong_count * chrono::milliseconds(10) +
471 realtime_clock::epoch());
472
473 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
474 pi2_event_loop->context().monotonic_event_time);
475 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
476 pi2_event_loop->context().realtime_event_time);
477
478 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
479 ++pi2_pong_count;
480 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
481 });
482
483 log_reader_factory.Run();
484 EXPECT_EQ(pi1_ping_count, 2010);
485 EXPECT_EQ(pi2_ping_count, 2010);
486 EXPECT_EQ(pi1_pong_count, 2010);
487 EXPECT_EQ(pi2_pong_count, 2010);
488
489 reader.Deregister();
490}
491
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600492// MultinodeLoggerTest that tests the mutate callback works across multiple
493// nodes with remapping
494TEST_P(MultinodeLoggerTest, MultiNodeRemapMutateCallback) {
495 time_converter_.StartEqual();
496 std::vector<std::string> actual_filenames;
497
498 {
499 LoggerState pi1_logger = MakeLogger(pi1_);
500 LoggerState pi2_logger = MakeLogger(pi2_);
501
502 event_loop_factory_.RunFor(chrono::milliseconds(95));
503
504 StartLogger(&pi1_logger);
505 StartLogger(&pi2_logger);
506
507 event_loop_factory_.RunFor(chrono::milliseconds(20000));
508 pi1_logger.AppendAllFilenames(&actual_filenames);
509 pi2_logger.AppendAllFilenames(&actual_filenames);
510 }
511
512 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700513 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600514
515 LogReader reader(sorted_parts, &config_.message());
516 // Remap just on pi1.
517 reader.RemapLoggedChannel<examples::Pong>(
518 "/test", configuration::GetNode(reader.configuration(), "pi1"));
519
520 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
521
522 int pong_count = 0;
523 // Adds a callback which mutates the value of the pong message before the
524 // message is sent which is the feature we are testing here
525 reader.AddBeforeSendCallback("/test",
526 [&pong_count](aos::examples::Pong *pong) {
527 pong->mutate_value(pong->value() + 1);
528 pong_count = pong->value();
529 });
530
531 // This sends out the fetched messages and advances time to the start of the
532 // log file.
533 reader.Register(&log_reader_factory);
534
535 const Node *pi1 =
536 configuration::GetNode(log_reader_factory.configuration(), "pi1");
537 const Node *pi2 =
538 configuration::GetNode(log_reader_factory.configuration(), "pi2");
539
540 EXPECT_THAT(reader.LoggedNodes(),
541 ::testing::ElementsAre(
542 configuration::GetNode(reader.logged_configuration(), pi1),
543 configuration::GetNode(reader.logged_configuration(), pi2)));
544
545 std::unique_ptr<EventLoop> pi1_event_loop =
546 log_reader_factory.MakeEventLoop("test", pi1);
547 std::unique_ptr<EventLoop> pi2_event_loop =
548 log_reader_factory.MakeEventLoop("test", pi2);
549
550 pi1_event_loop->MakeWatcher("/original/test",
551 [&pong_count](const examples::Pong &pong) {
552 EXPECT_EQ(pong_count, pong.value());
553 });
554
555 pi2_event_loop->MakeWatcher("/test",
556 [&pong_count](const examples::Pong &pong) {
557 EXPECT_EQ(pong_count, pong.value());
558 });
559
560 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
561 reader.Deregister();
562
563 EXPECT_EQ(pong_count, 2011);
564}
565
566// MultinodeLoggerTest that tests the mutate callback works across multiple
567// nodes
568TEST_P(MultinodeLoggerTest, MultiNodeMutateCallback) {
569 time_converter_.StartEqual();
570 std::vector<std::string> actual_filenames;
571
572 {
573 LoggerState pi1_logger = MakeLogger(pi1_);
574 LoggerState pi2_logger = MakeLogger(pi2_);
575
576 event_loop_factory_.RunFor(chrono::milliseconds(95));
577
578 StartLogger(&pi1_logger);
579 StartLogger(&pi2_logger);
580
581 event_loop_factory_.RunFor(chrono::milliseconds(20000));
582 pi1_logger.AppendAllFilenames(&actual_filenames);
583 pi2_logger.AppendAllFilenames(&actual_filenames);
584 }
585
586 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700587 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600588
589 LogReader reader(sorted_parts, &config_.message());
590
591 int pong_count = 0;
592 // Adds a callback which mutates the value of the pong message before the
593 // message is sent which is the feature we are testing here
594 reader.AddBeforeSendCallback("/test",
595 [&pong_count](aos::examples::Pong *pong) {
596 pong->mutate_value(pong->value() + 1);
597 pong_count = pong->value();
598 });
599
600 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
601
602 // This sends out the fetched messages and advances time to the start of the
603 // log file.
604 reader.Register(&log_reader_factory);
605
606 const Node *pi1 =
607 configuration::GetNode(log_reader_factory.configuration(), "pi1");
608 const Node *pi2 =
609 configuration::GetNode(log_reader_factory.configuration(), "pi2");
610
611 EXPECT_THAT(reader.LoggedNodes(),
612 ::testing::ElementsAre(
613 configuration::GetNode(reader.logged_configuration(), pi1),
614 configuration::GetNode(reader.logged_configuration(), pi2)));
615
616 std::unique_ptr<EventLoop> pi1_event_loop =
617 log_reader_factory.MakeEventLoop("test", pi1);
618 std::unique_ptr<EventLoop> pi2_event_loop =
619 log_reader_factory.MakeEventLoop("test", pi2);
620
621 pi1_event_loop->MakeWatcher("/test",
622 [&pong_count](const examples::Pong &pong) {
623 EXPECT_EQ(pong_count, pong.value());
624 });
625
626 pi2_event_loop->MakeWatcher("/test",
627 [&pong_count](const examples::Pong &pong) {
628 EXPECT_EQ(pong_count, pong.value());
629 });
630
631 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
632 reader.Deregister();
633
634 EXPECT_EQ(pong_count, 2011);
635}
636
637// Tests that the before send callback is only called from the sender node if it
638// is forwarded
639TEST_P(MultinodeLoggerTest, OnlyDoBeforeSendCallbackOnSenderNode) {
640 time_converter_.StartEqual();
641 {
642 LoggerState pi1_logger = MakeLogger(pi1_);
643 LoggerState pi2_logger = MakeLogger(pi2_);
644
645 event_loop_factory_.RunFor(chrono::milliseconds(95));
646
647 StartLogger(&pi1_logger);
648 StartLogger(&pi2_logger);
649
650 event_loop_factory_.RunFor(chrono::milliseconds(20000));
651 }
652
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700653 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
654 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
655 LogReader reader(sorted_parts);
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600656
657 int ping_count = 0;
658 // Adds a callback which mutates the value of the pong message before the
659 // message is sent which is the feature we are testing here
660 reader.AddBeforeSendCallback("/test",
661 [&ping_count](aos::examples::Ping *ping) {
662 ++ping_count;
663 ping->mutate_value(ping_count);
664 });
665
666 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
667 log_reader_factory.set_send_delay(chrono::microseconds(0));
668
669 reader.Register(&log_reader_factory);
670
671 const Node *pi1 =
672 configuration::GetNode(log_reader_factory.configuration(), "pi1");
673 const Node *pi2 =
674 configuration::GetNode(log_reader_factory.configuration(), "pi2");
675
676 std::unique_ptr<EventLoop> pi1_event_loop =
677 log_reader_factory.MakeEventLoop("test", pi1);
678 pi1_event_loop->SkipTimingReport();
679 std::unique_ptr<EventLoop> pi2_event_loop =
680 log_reader_factory.MakeEventLoop("test", pi2);
681 pi2_event_loop->SkipTimingReport();
682
683 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
684 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
685
686 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
687 pi1_ping_timestamp;
688 if (!shared()) {
689 pi1_ping_timestamp =
690 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
691 pi1_event_loop.get(),
692 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
693 }
694
695 log_reader_factory.Run();
696
697 EXPECT_EQ(pi1_ping.count(), 2000u);
698 EXPECT_EQ(pi2_ping.count(), 2000u);
699 // If the BeforeSendCallback is called on both nodes, then the ping count
700 // would be 4002 instead of 2001
701 EXPECT_EQ(ping_count, 2001u);
702 if (!shared()) {
703 EXPECT_EQ(pi1_ping_timestamp->count(), 2000u);
704 }
705
706 reader.Deregister();
707}
708
709// Tests that we do not allow adding callbacks after Register is called
710TEST_P(MultinodeLoggerDeathTest, AddCallbackAfterRegister) {
711 time_converter_.StartEqual();
712 std::vector<std::string> actual_filenames;
713
714 {
715 LoggerState pi1_logger = MakeLogger(pi1_);
716 LoggerState pi2_logger = MakeLogger(pi2_);
717
718 event_loop_factory_.RunFor(chrono::milliseconds(95));
719
720 StartLogger(&pi1_logger);
721 StartLogger(&pi2_logger);
722
723 event_loop_factory_.RunFor(chrono::milliseconds(20000));
724 pi1_logger.AppendAllFilenames(&actual_filenames);
725 pi2_logger.AppendAllFilenames(&actual_filenames);
726 }
727
728 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700729 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600730
731 LogReader reader(sorted_parts, &config_.message());
732 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
733 reader.Register(&log_reader_factory);
734 EXPECT_DEATH(
735 {
736 reader.AddBeforeSendCallback("/test", [](aos::examples::Pong *) {
737 LOG(FATAL) << "This should not be called";
738 });
739 },
740 "Cannot add callbacks after calling Register");
741 reader.Deregister();
742}
743
Naman Guptaa63aa132023-03-22 20:06:34 -0700744// Test that if we feed the replay with a mismatched node list that we die on
745// the LogReader constructor.
746TEST_P(MultinodeLoggerDeathTest, MultiNodeBadReplayConfig) {
747 time_converter_.StartEqual();
748 {
749 LoggerState pi1_logger = MakeLogger(pi1_);
750 LoggerState pi2_logger = MakeLogger(pi2_);
751
752 event_loop_factory_.RunFor(chrono::milliseconds(95));
753
754 StartLogger(&pi1_logger);
755 StartLogger(&pi2_logger);
756
757 event_loop_factory_.RunFor(chrono::milliseconds(20000));
758 }
759
760 // Test that, if we add an additional node to the replay config that the
761 // logger complains about the mismatch in number of nodes.
762 FlatbufferDetachedBuffer<Configuration> extra_nodes_config =
763 configuration::MergeWithConfig(&config_.message(), R"({
764 "nodes": [
765 {
766 "name": "extra-node"
767 }
768 ]
769 }
770 )");
771
772 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700773 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -0700774 EXPECT_DEATH(LogReader(sorted_parts, &extra_nodes_config.message()),
775 "Log file and replay config need to have matching nodes lists.");
776}
777
778// Tests that we can read log files where they don't start at the same monotonic
779// time.
780TEST_P(MultinodeLoggerTest, StaggeredStart) {
781 time_converter_.StartEqual();
782 std::vector<std::string> actual_filenames;
783
784 {
785 LoggerState pi1_logger = MakeLogger(pi1_);
786 LoggerState pi2_logger = MakeLogger(pi2_);
787
788 event_loop_factory_.RunFor(chrono::milliseconds(95));
789
790 StartLogger(&pi1_logger);
791
792 event_loop_factory_.RunFor(chrono::milliseconds(200));
793
794 StartLogger(&pi2_logger);
795
796 event_loop_factory_.RunFor(chrono::milliseconds(20000));
797 pi1_logger.AppendAllFilenames(&actual_filenames);
798 pi2_logger.AppendAllFilenames(&actual_filenames);
799 }
800
801 // Since we delay starting pi2, it already knows about all the timestamps so
802 // we don't end up with extra parts.
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700803 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
804 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
805 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -0700806
807 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
808 log_reader_factory.set_send_delay(chrono::microseconds(0));
809
810 // This sends out the fetched messages and advances time to the start of the
811 // log file.
812 reader.Register(&log_reader_factory);
813
814 const Node *pi1 =
815 configuration::GetNode(log_reader_factory.configuration(), "pi1");
816 const Node *pi2 =
817 configuration::GetNode(log_reader_factory.configuration(), "pi2");
818
819 EXPECT_THAT(reader.LoggedNodes(),
820 ::testing::ElementsAre(
821 configuration::GetNode(reader.logged_configuration(), pi1),
822 configuration::GetNode(reader.logged_configuration(), pi2)));
823
824 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
825
826 std::unique_ptr<EventLoop> pi1_event_loop =
827 log_reader_factory.MakeEventLoop("test", pi1);
828 std::unique_ptr<EventLoop> pi2_event_loop =
829 log_reader_factory.MakeEventLoop("test", pi2);
830
831 int pi1_ping_count = 30;
832 int pi2_ping_count = 30;
833 int pi1_pong_count = 30;
834 int pi2_pong_count = 30;
835
836 // Confirm that the ping value matches.
837 pi1_event_loop->MakeWatcher(
838 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
839 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
840 << pi1_event_loop->context().monotonic_remote_time << " -> "
841 << pi1_event_loop->context().monotonic_event_time;
842 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
843
844 ++pi1_ping_count;
845 });
846 pi2_event_loop->MakeWatcher(
847 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
848 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
849 << pi2_event_loop->context().monotonic_remote_time << " -> "
850 << pi2_event_loop->context().monotonic_event_time;
851 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
852
853 ++pi2_ping_count;
854 });
855
856 // Confirm that the ping and pong counts both match, and the value also
857 // matches.
858 pi1_event_loop->MakeWatcher(
859 "/test", [&pi1_event_loop, &pi1_ping_count,
860 &pi1_pong_count](const examples::Pong &pong) {
861 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
862 << pi1_event_loop->context().monotonic_remote_time << " -> "
863 << pi1_event_loop->context().monotonic_event_time;
864
865 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
866 ++pi1_pong_count;
867 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
868 });
869 pi2_event_loop->MakeWatcher(
870 "/test", [&pi2_event_loop, &pi2_ping_count,
871 &pi2_pong_count](const examples::Pong &pong) {
872 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
873 << pi2_event_loop->context().monotonic_remote_time << " -> "
874 << pi2_event_loop->context().monotonic_event_time;
875
876 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
877 ++pi2_pong_count;
878 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
879 });
880
881 log_reader_factory.Run();
882 EXPECT_EQ(pi1_ping_count, 2030);
883 EXPECT_EQ(pi2_ping_count, 2030);
884 EXPECT_EQ(pi1_pong_count, 2030);
885 EXPECT_EQ(pi2_pong_count, 2030);
886
887 reader.Deregister();
888}
889
890// Tests that we can read log files where the monotonic clocks drift and don't
891// match correctly. While we are here, also test that different ending times
892// also is readable.
893TEST_P(MultinodeLoggerTest, MismatchedClocks) {
894 // TODO(austin): Negate...
895 const chrono::nanoseconds initial_pi2_offset = chrono::seconds(1000);
896
897 time_converter_.AddMonotonic(
898 {BootTimestamp::epoch(), BootTimestamp::epoch() + initial_pi2_offset});
899 // Wait for 95 ms, (~0.1 seconds - 1/2 of the ping/pong period), and set the
900 // skew to be 200 uS/s
901 const chrono::nanoseconds startup_sleep1 = time_converter_.AddMonotonic(
902 {chrono::milliseconds(95),
903 chrono::milliseconds(95) - chrono::nanoseconds(200) * 95});
904 // Run another 200 ms to have one logger start first.
905 const chrono::nanoseconds startup_sleep2 = time_converter_.AddMonotonic(
906 {chrono::milliseconds(200), chrono::milliseconds(200)});
907 // Slew one way then the other at the same 200 uS/S slew rate. Make sure we
908 // go far enough to cause problems if this isn't accounted for.
909 const chrono::nanoseconds logger_run1 = time_converter_.AddMonotonic(
910 {chrono::milliseconds(20000),
911 chrono::milliseconds(20000) - chrono::nanoseconds(200) * 20000});
912 const chrono::nanoseconds logger_run2 = time_converter_.AddMonotonic(
913 {chrono::milliseconds(40000),
914 chrono::milliseconds(40000) + chrono::nanoseconds(200) * 40000});
915 const chrono::nanoseconds logger_run3 = time_converter_.AddMonotonic(
916 {chrono::milliseconds(400), chrono::milliseconds(400)});
917
918 {
919 LoggerState pi2_logger = MakeLogger(pi2_);
920
921 LOG(INFO) << "pi2 times: " << pi2_->monotonic_now() << " "
922 << pi2_->realtime_now() << " distributed "
923 << pi2_->ToDistributedClock(pi2_->monotonic_now());
924
925 LOG(INFO) << "pi2_ times: " << pi2_->monotonic_now() << " "
926 << pi2_->realtime_now() << " distributed "
927 << pi2_->ToDistributedClock(pi2_->monotonic_now());
928
929 event_loop_factory_.RunFor(startup_sleep1);
930
931 StartLogger(&pi2_logger);
932
933 event_loop_factory_.RunFor(startup_sleep2);
934
935 {
936 // Run pi1's logger for only part of the time.
937 LoggerState pi1_logger = MakeLogger(pi1_);
938
939 StartLogger(&pi1_logger);
940 event_loop_factory_.RunFor(logger_run1);
941
942 // Make sure we slewed time far enough so that the difference is greater
943 // than the network delay. This confirms that if we sort incorrectly, it
944 // would show in the results.
945 EXPECT_LT(
946 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
947 -event_loop_factory_.send_delay() -
948 event_loop_factory_.network_delay());
949
950 event_loop_factory_.RunFor(logger_run2);
951
952 // And now check that we went far enough the other way to make sure we
953 // cover both problems.
954 EXPECT_GT(
955 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
956 event_loop_factory_.send_delay() +
957 event_loop_factory_.network_delay());
958 }
959
960 // And log a bit more on pi2.
961 event_loop_factory_.RunFor(logger_run3);
962 }
963
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700964 const std::vector<LogFile> sorted_parts =
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700965 SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 1, 1, 2, 2));
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700966 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
967 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -0700968
969 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
970 log_reader_factory.set_send_delay(chrono::microseconds(0));
971
972 const Node *pi1 =
973 configuration::GetNode(log_reader_factory.configuration(), "pi1");
974 const Node *pi2 =
975 configuration::GetNode(log_reader_factory.configuration(), "pi2");
976
977 // This sends out the fetched messages and advances time to the start of the
978 // log file.
979 reader.Register(&log_reader_factory);
980
981 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
982 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
983 LOG(INFO) << "now pi1 "
984 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
985 LOG(INFO) << "now pi2 "
986 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
987
988 LOG(INFO) << "Done registering (pi1) "
989 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now()
990 << " "
991 << log_reader_factory.GetNodeEventLoopFactory(pi1)->realtime_now();
992 LOG(INFO) << "Done registering (pi2) "
993 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now()
994 << " "
995 << log_reader_factory.GetNodeEventLoopFactory(pi2)->realtime_now();
996
997 EXPECT_THAT(reader.LoggedNodes(),
998 ::testing::ElementsAre(
999 configuration::GetNode(reader.logged_configuration(), pi1),
1000 configuration::GetNode(reader.logged_configuration(), pi2)));
1001
1002 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1003
1004 std::unique_ptr<EventLoop> pi1_event_loop =
1005 log_reader_factory.MakeEventLoop("test", pi1);
1006 std::unique_ptr<EventLoop> pi2_event_loop =
1007 log_reader_factory.MakeEventLoop("test", pi2);
1008
1009 int pi1_ping_count = 30;
1010 int pi2_ping_count = 30;
1011 int pi1_pong_count = 30;
1012 int pi2_pong_count = 30;
1013
1014 // Confirm that the ping value matches.
1015 pi1_event_loop->MakeWatcher(
1016 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
1017 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
1018 << pi1_event_loop->context().monotonic_remote_time << " -> "
1019 << pi1_event_loop->context().monotonic_event_time;
1020 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
1021
1022 ++pi1_ping_count;
1023 });
1024 pi2_event_loop->MakeWatcher(
1025 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
1026 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
1027 << pi2_event_loop->context().monotonic_remote_time << " -> "
1028 << pi2_event_loop->context().monotonic_event_time;
1029 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
1030
1031 ++pi2_ping_count;
1032 });
1033
1034 // Confirm that the ping and pong counts both match, and the value also
1035 // matches.
1036 pi1_event_loop->MakeWatcher(
1037 "/test", [&pi1_event_loop, &pi1_ping_count,
1038 &pi1_pong_count](const examples::Pong &pong) {
1039 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
1040 << pi1_event_loop->context().monotonic_remote_time << " -> "
1041 << pi1_event_loop->context().monotonic_event_time;
1042
1043 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
1044 ++pi1_pong_count;
1045 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
1046 });
1047 pi2_event_loop->MakeWatcher(
1048 "/test", [&pi2_event_loop, &pi2_ping_count,
1049 &pi2_pong_count](const examples::Pong &pong) {
1050 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
1051 << pi2_event_loop->context().monotonic_remote_time << " -> "
1052 << pi2_event_loop->context().monotonic_event_time;
1053
1054 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
1055 ++pi2_pong_count;
1056 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
1057 });
1058
1059 log_reader_factory.Run();
1060 EXPECT_EQ(pi1_ping_count, 6030);
1061 EXPECT_EQ(pi2_ping_count, 6030);
1062 EXPECT_EQ(pi1_pong_count, 6030);
1063 EXPECT_EQ(pi2_pong_count, 6030);
1064
1065 reader.Deregister();
1066}
1067
1068// Tests that we can sort a bunch of parts into the pre-determined sorted parts.
1069TEST_P(MultinodeLoggerTest, SortParts) {
1070 time_converter_.StartEqual();
1071 // Make a bunch of parts.
1072 {
1073 LoggerState pi1_logger = MakeLogger(pi1_);
1074 LoggerState pi2_logger = MakeLogger(pi2_);
1075
1076 event_loop_factory_.RunFor(chrono::milliseconds(95));
1077
1078 StartLogger(&pi1_logger);
1079 StartLogger(&pi2_logger);
1080
1081 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1082 }
1083
1084 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1085 VerifyParts(sorted_parts);
1086}
1087
1088// Tests that we can sort a bunch of parts with an empty part. We should ignore
1089// it and remove it from the sorted list.
1090TEST_P(MultinodeLoggerTest, SortEmptyParts) {
1091 time_converter_.StartEqual();
1092 // Make a bunch of parts.
1093 {
1094 LoggerState pi1_logger = MakeLogger(pi1_);
1095 LoggerState pi2_logger = MakeLogger(pi2_);
1096
1097 event_loop_factory_.RunFor(chrono::milliseconds(95));
1098
1099 StartLogger(&pi1_logger);
1100 StartLogger(&pi2_logger);
1101
1102 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1103 }
1104
1105 // TODO(austin): Should we flip out if the file can't open?
1106 const std::string kEmptyFile("foobarinvalidfiledoesnotexist" + Extension());
1107
1108 aos::util::WriteStringToFileOrDie(kEmptyFile, "");
1109 logfiles_.emplace_back(kEmptyFile);
1110
1111 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1112 VerifyParts(sorted_parts, {kEmptyFile});
1113}
1114
1115// Tests that we can sort a bunch of parts with the end missing off a
1116// file. We should use the part we can read.
1117TEST_P(MultinodeLoggerTest, SortTruncatedParts) {
1118 std::vector<std::string> actual_filenames;
1119 time_converter_.StartEqual();
1120 // Make a bunch of parts.
1121 {
1122 LoggerState pi1_logger = MakeLogger(pi1_);
1123 LoggerState pi2_logger = MakeLogger(pi2_);
1124
1125 event_loop_factory_.RunFor(chrono::milliseconds(95));
1126
1127 StartLogger(&pi1_logger);
1128 StartLogger(&pi2_logger);
1129
1130 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1131
1132 pi1_logger.AppendAllFilenames(&actual_filenames);
1133 pi2_logger.AppendAllFilenames(&actual_filenames);
1134 }
1135
1136 ASSERT_THAT(actual_filenames,
1137 ::testing::UnorderedElementsAreArray(logfiles_));
1138
1139 // Strip off the end of one of the files. Pick one with a lot of data.
1140 // For snappy, needs to have enough data to be >1 chunk of compressed data so
1141 // that we don't corrupt the entire log part.
1142 ::std::string compressed_contents =
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07001143 aos::util::ReadFileToStringOrDie(logfiles_[2]);
Naman Guptaa63aa132023-03-22 20:06:34 -07001144
1145 aos::util::WriteStringToFileOrDie(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07001146 logfiles_[2],
Naman Guptaa63aa132023-03-22 20:06:34 -07001147 compressed_contents.substr(0, compressed_contents.size() - 100));
1148
1149 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1150 VerifyParts(sorted_parts);
1151}
1152
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001153// Tests that if we remap a logged channel, it shows up correctly.
Naman Guptaa63aa132023-03-22 20:06:34 -07001154TEST_P(MultinodeLoggerTest, RemapLoggedChannel) {
1155 time_converter_.StartEqual();
1156 {
1157 LoggerState pi1_logger = MakeLogger(pi1_);
1158 LoggerState pi2_logger = MakeLogger(pi2_);
1159
1160 event_loop_factory_.RunFor(chrono::milliseconds(95));
1161
1162 StartLogger(&pi1_logger);
1163 StartLogger(&pi2_logger);
1164
1165 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1166 }
1167
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001168 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1169 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1170 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001171
1172 // Remap just on pi1.
1173 reader.RemapLoggedChannel<aos::timing::Report>(
1174 "/aos", configuration::GetNode(reader.configuration(), "pi1"));
1175
1176 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1177 log_reader_factory.set_send_delay(chrono::microseconds(0));
1178
1179 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1180 // Note: An extra channel gets remapped automatically due to a timestamp
1181 // channel being LOCAL_LOGGER'd.
1182 ASSERT_EQ(remapped_channels.size(), std::get<0>(GetParam()).shared ? 1u : 2u);
1183 EXPECT_EQ(remapped_channels[0]->name()->string_view(), "/original/pi1/aos");
1184 EXPECT_EQ(remapped_channels[0]->type()->string_view(), "aos.timing.Report");
1185 if (!std::get<0>(GetParam()).shared) {
1186 EXPECT_EQ(remapped_channels[1]->name()->string_view(),
1187 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1188 "aos-message_bridge-Timestamp");
1189 EXPECT_EQ(remapped_channels[1]->type()->string_view(),
1190 "aos.message_bridge.RemoteMessage");
1191 }
1192
1193 reader.Register(&log_reader_factory);
1194
1195 const Node *pi1 =
1196 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1197 const Node *pi2 =
1198 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1199
1200 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1201 // else should have moved.
1202 std::unique_ptr<EventLoop> pi1_event_loop =
1203 log_reader_factory.MakeEventLoop("test", pi1);
1204 pi1_event_loop->SkipTimingReport();
1205 std::unique_ptr<EventLoop> full_pi1_event_loop =
1206 log_reader_factory.MakeEventLoop("test", pi1);
1207 full_pi1_event_loop->SkipTimingReport();
1208 std::unique_ptr<EventLoop> pi2_event_loop =
1209 log_reader_factory.MakeEventLoop("test", pi2);
1210 pi2_event_loop->SkipTimingReport();
1211
1212 MessageCounter<aos::timing::Report> pi1_timing_report(pi1_event_loop.get(),
1213 "/aos");
1214 MessageCounter<aos::timing::Report> full_pi1_timing_report(
1215 full_pi1_event_loop.get(), "/pi1/aos");
1216 MessageCounter<aos::timing::Report> pi1_original_timing_report(
1217 pi1_event_loop.get(), "/original/aos");
1218 MessageCounter<aos::timing::Report> full_pi1_original_timing_report(
1219 full_pi1_event_loop.get(), "/original/pi1/aos");
1220 MessageCounter<aos::timing::Report> pi2_timing_report(pi2_event_loop.get(),
1221 "/aos");
1222
1223 log_reader_factory.Run();
1224
1225 EXPECT_EQ(pi1_timing_report.count(), 0u);
1226 EXPECT_EQ(full_pi1_timing_report.count(), 0u);
1227 EXPECT_NE(pi1_original_timing_report.count(), 0u);
1228 EXPECT_NE(full_pi1_original_timing_report.count(), 0u);
1229 EXPECT_NE(pi2_timing_report.count(), 0u);
1230
1231 reader.Deregister();
1232}
1233
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001234// Tests that if we rename a logged channel, it shows up correctly.
1235TEST_P(MultinodeLoggerTest, RenameLoggedChannel) {
1236 std::vector<std::string> actual_filenames;
1237 time_converter_.StartEqual();
1238 {
1239 LoggerState pi1_logger = MakeLogger(pi1_);
1240 LoggerState pi2_logger = MakeLogger(pi2_);
1241
1242 event_loop_factory_.RunFor(chrono::milliseconds(95));
1243
1244 StartLogger(&pi1_logger);
1245 StartLogger(&pi2_logger);
1246
1247 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1248
1249 pi1_logger.AppendAllFilenames(&actual_filenames);
1250 pi2_logger.AppendAllFilenames(&actual_filenames);
1251 }
1252
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001253 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1254 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1255 LogReader reader(sorted_parts);
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001256
1257 // Rename just on pi2. Add some global maps just to verify they get added in
1258 // the config and used correctly.
1259 std::vector<MapT> maps;
1260 {
1261 MapT map;
1262 map.match = std::make_unique<ChannelT>();
1263 map.match->name = "/foo*";
1264 map.match->source_node = "pi1";
1265 map.rename = std::make_unique<ChannelT>();
1266 map.rename->name = "/pi1/foo";
1267 maps.emplace_back(std::move(map));
1268 }
1269 {
1270 MapT map;
1271 map.match = std::make_unique<ChannelT>();
1272 map.match->name = "/foo*";
1273 map.match->source_node = "pi2";
1274 map.rename = std::make_unique<ChannelT>();
1275 map.rename->name = "/pi2/foo";
1276 maps.emplace_back(std::move(map));
1277 }
1278 {
1279 MapT map;
1280 map.match = std::make_unique<ChannelT>();
1281 map.match->name = "/foo";
1282 map.match->type = "aos.examples.Ping";
1283 map.rename = std::make_unique<ChannelT>();
1284 map.rename->name = "/foo/renamed";
1285 maps.emplace_back(std::move(map));
1286 }
1287 reader.RenameLoggedChannel<aos::examples::Ping>(
1288 "/aos", configuration::GetNode(reader.configuration(), "pi2"),
1289 "/pi2/foo/renamed", maps);
1290
1291 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1292 log_reader_factory.set_send_delay(chrono::microseconds(0));
1293
1294 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1295 // Note: An extra channel gets remapped automatically due to a timestamp
1296 // channel being LOCAL_LOGGER'd.
1297 const bool shared = std::get<0>(GetParam()).shared;
1298 ASSERT_EQ(remapped_channels.size(), shared ? 1u : 2u);
1299 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->name()->string_view(),
1300 "/pi2/foo/renamed");
1301 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->type()->string_view(),
1302 "aos.examples.Ping");
1303 if (!shared) {
1304 EXPECT_EQ(remapped_channels[0]->name()->string_view(),
1305 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1306 "aos-message_bridge-Timestamp");
1307 EXPECT_EQ(remapped_channels[0]->type()->string_view(),
1308 "aos.message_bridge.RemoteMessage");
1309 }
1310
1311 reader.Register(&log_reader_factory);
1312
1313 const Node *pi1 =
1314 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1315 const Node *pi2 =
1316 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1317
1318 // Confirm we can read the data on the renamed channel, just for pi2. Nothing
1319 // else should have moved.
1320 std::unique_ptr<EventLoop> pi2_event_loop =
1321 log_reader_factory.MakeEventLoop("test", pi2);
1322 pi2_event_loop->SkipTimingReport();
1323 std::unique_ptr<EventLoop> full_pi2_event_loop =
1324 log_reader_factory.MakeEventLoop("test", pi2);
1325 full_pi2_event_loop->SkipTimingReport();
1326 std::unique_ptr<EventLoop> pi1_event_loop =
1327 log_reader_factory.MakeEventLoop("test", pi1);
1328 pi1_event_loop->SkipTimingReport();
1329
1330 MessageCounter<aos::examples::Ping> pi2_ping(pi2_event_loop.get(), "/aos");
1331 MessageCounter<aos::examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1332 "/foo");
1333 MessageCounter<aos::examples::Ping> full_pi2_renamed_ping(
1334 full_pi2_event_loop.get(), "/pi2/foo/renamed");
1335 MessageCounter<aos::examples::Ping> pi1_ping(pi1_event_loop.get(), "/aos");
1336
1337 log_reader_factory.Run();
1338
1339 EXPECT_EQ(pi2_ping.count(), 0u);
1340 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1341 EXPECT_NE(full_pi2_renamed_ping.count(), 0u);
1342 EXPECT_NE(pi1_ping.count(), 0u);
1343
1344 reader.Deregister();
1345}
1346
Naman Guptaa63aa132023-03-22 20:06:34 -07001347// Tests that we can remap a forwarded channel as well.
1348TEST_P(MultinodeLoggerTest, RemapForwardedLoggedChannel) {
1349 time_converter_.StartEqual();
1350 {
1351 LoggerState pi1_logger = MakeLogger(pi1_);
1352 LoggerState pi2_logger = MakeLogger(pi2_);
1353
1354 event_loop_factory_.RunFor(chrono::milliseconds(95));
1355
1356 StartLogger(&pi1_logger);
1357 StartLogger(&pi2_logger);
1358
1359 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1360 }
1361
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001362 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1363 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1364 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001365
1366 reader.RemapLoggedChannel<examples::Ping>("/test");
1367
1368 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1369 log_reader_factory.set_send_delay(chrono::microseconds(0));
1370
1371 reader.Register(&log_reader_factory);
1372
1373 const Node *pi1 =
1374 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1375 const Node *pi2 =
1376 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1377
1378 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1379 // else should have moved.
1380 std::unique_ptr<EventLoop> pi1_event_loop =
1381 log_reader_factory.MakeEventLoop("test", pi1);
1382 pi1_event_loop->SkipTimingReport();
1383 std::unique_ptr<EventLoop> full_pi1_event_loop =
1384 log_reader_factory.MakeEventLoop("test", pi1);
1385 full_pi1_event_loop->SkipTimingReport();
1386 std::unique_ptr<EventLoop> pi2_event_loop =
1387 log_reader_factory.MakeEventLoop("test", pi2);
1388 pi2_event_loop->SkipTimingReport();
1389
1390 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1391 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1392 MessageCounter<examples::Ping> pi1_original_ping(pi1_event_loop.get(),
1393 "/original/test");
1394 MessageCounter<examples::Ping> pi2_original_ping(pi2_event_loop.get(),
1395 "/original/test");
1396
1397 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1398 pi1_original_ping_timestamp;
1399 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1400 pi1_ping_timestamp;
1401 if (!shared()) {
1402 pi1_original_ping_timestamp =
1403 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1404 pi1_event_loop.get(),
1405 "/pi1/aos/remote_timestamps/pi2/original/test/aos-examples-Ping");
1406 pi1_ping_timestamp =
1407 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1408 pi1_event_loop.get(),
1409 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1410 }
1411
1412 log_reader_factory.Run();
1413
1414 EXPECT_EQ(pi1_ping.count(), 0u);
1415 EXPECT_EQ(pi2_ping.count(), 0u);
1416 EXPECT_NE(pi1_original_ping.count(), 0u);
1417 EXPECT_NE(pi2_original_ping.count(), 0u);
1418 if (!shared()) {
1419 EXPECT_NE(pi1_original_ping_timestamp->count(), 0u);
1420 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1421 }
1422
1423 reader.Deregister();
1424}
1425
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001426// Tests that we can rename a forwarded channel as well.
1427TEST_P(MultinodeLoggerTest, RenameForwardedLoggedChannel) {
1428 std::vector<std::string> actual_filenames;
1429 time_converter_.StartEqual();
1430 {
1431 LoggerState pi1_logger = MakeLogger(pi1_);
1432 LoggerState pi2_logger = MakeLogger(pi2_);
1433
1434 event_loop_factory_.RunFor(chrono::milliseconds(95));
1435
1436 StartLogger(&pi1_logger);
1437 StartLogger(&pi2_logger);
1438
1439 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1440
1441 pi1_logger.AppendAllFilenames(&actual_filenames);
1442 pi2_logger.AppendAllFilenames(&actual_filenames);
1443 }
1444
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001445 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1446 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1447 LogReader reader(sorted_parts);
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001448
1449 std::vector<MapT> maps;
1450 {
1451 MapT map;
1452 map.match = std::make_unique<ChannelT>();
1453 map.match->name = "/production*";
1454 map.match->source_node = "pi1";
1455 map.rename = std::make_unique<ChannelT>();
1456 map.rename->name = "/pi1/production";
1457 maps.emplace_back(std::move(map));
1458 }
1459 {
1460 MapT map;
1461 map.match = std::make_unique<ChannelT>();
1462 map.match->name = "/production*";
1463 map.match->source_node = "pi2";
1464 map.rename = std::make_unique<ChannelT>();
1465 map.rename->name = "/pi2/production";
1466 maps.emplace_back(std::move(map));
1467 }
1468 reader.RenameLoggedChannel<aos::examples::Ping>(
1469 "/test", configuration::GetNode(reader.configuration(), "pi1"),
1470 "/pi1/production", maps);
1471
1472 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1473 log_reader_factory.set_send_delay(chrono::microseconds(0));
1474
1475 reader.Register(&log_reader_factory);
1476
1477 const Node *pi1 =
1478 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1479 const Node *pi2 =
1480 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1481
1482 // Confirm we can read the data on the renamed channel, on both the source
1483 // node and the remote node. In case of split timestamp channels, confirm that
1484 // we receive the timestamp messages on the renamed channel as well.
1485 std::unique_ptr<EventLoop> pi1_event_loop =
1486 log_reader_factory.MakeEventLoop("test", pi1);
1487 pi1_event_loop->SkipTimingReport();
1488 std::unique_ptr<EventLoop> full_pi1_event_loop =
1489 log_reader_factory.MakeEventLoop("test", pi1);
1490 full_pi1_event_loop->SkipTimingReport();
1491 std::unique_ptr<EventLoop> pi2_event_loop =
1492 log_reader_factory.MakeEventLoop("test", pi2);
1493 pi2_event_loop->SkipTimingReport();
1494
1495 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1496 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1497 MessageCounter<examples::Ping> pi1_renamed_ping(pi1_event_loop.get(),
1498 "/pi1/production");
1499 MessageCounter<examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1500 "/pi1/production");
1501
1502 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1503 pi1_renamed_ping_timestamp;
1504 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1505 pi1_ping_timestamp;
1506 if (!shared()) {
1507 pi1_renamed_ping_timestamp =
1508 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1509 pi1_event_loop.get(),
1510 "/pi1/aos/remote_timestamps/pi2/pi1/production/aos-examples-Ping");
1511 pi1_ping_timestamp =
1512 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1513 pi1_event_loop.get(),
1514 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1515 }
1516
1517 log_reader_factory.Run();
1518
1519 EXPECT_EQ(pi1_ping.count(), 0u);
1520 EXPECT_EQ(pi2_ping.count(), 0u);
1521 EXPECT_NE(pi1_renamed_ping.count(), 0u);
1522 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1523 if (!shared()) {
1524 EXPECT_NE(pi1_renamed_ping_timestamp->count(), 0u);
1525 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1526 }
1527
1528 reader.Deregister();
1529}
1530
Naman Guptaa63aa132023-03-22 20:06:34 -07001531// Tests that we observe all the same events in log replay (for a given node)
1532// whether we just register an event loop for that node or if we register a full
1533// event loop factory.
1534TEST_P(MultinodeLoggerTest, SingleNodeReplay) {
1535 time_converter_.StartEqual();
1536 constexpr chrono::milliseconds kStartupDelay(95);
1537 {
1538 LoggerState pi1_logger = MakeLogger(pi1_);
1539 LoggerState pi2_logger = MakeLogger(pi2_);
1540
1541 event_loop_factory_.RunFor(kStartupDelay);
1542
1543 StartLogger(&pi1_logger);
1544 StartLogger(&pi2_logger);
1545
1546 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1547 }
1548
1549 LogReader full_reader(SortParts(logfiles_));
1550 LogReader single_node_reader(SortParts(logfiles_));
1551
1552 SimulatedEventLoopFactory full_factory(full_reader.configuration());
1553 SimulatedEventLoopFactory single_node_factory(
1554 single_node_reader.configuration());
1555 single_node_factory.SkipTimingReport();
1556 single_node_factory.DisableStatistics();
1557 std::unique_ptr<EventLoop> replay_event_loop =
1558 single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
1559 "log_reader");
1560
1561 full_reader.Register(&full_factory);
1562 single_node_reader.Register(replay_event_loop.get());
1563
1564 const Node *full_pi1 =
1565 configuration::GetNode(full_factory.configuration(), "pi1");
1566
1567 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1568 // else should have moved.
1569 std::unique_ptr<EventLoop> full_event_loop =
1570 full_factory.MakeEventLoop("test", full_pi1);
1571 full_event_loop->SkipTimingReport();
1572 full_event_loop->SkipAosLog();
1573 // maps are indexed on channel index.
1574 // observed_messages: {channel_index: [(message_sent_time, was_fetched),...]}
1575 std::map<size_t, std::vector<std::pair<monotonic_clock::time_point, bool>>>
1576 observed_messages;
1577 std::map<size_t, std::unique_ptr<RawFetcher>> fetchers;
1578 for (size_t ii = 0; ii < full_event_loop->configuration()->channels()->size();
1579 ++ii) {
1580 const Channel *channel =
1581 full_event_loop->configuration()->channels()->Get(ii);
1582 // We currently don't support replaying remote timestamp channels in
1583 // realtime replay (unless the remote timestamp channel was not NOT_LOGGED,
1584 // in which case it gets auto-remapped and replayed on a /original channel).
1585 if (channel->name()->string_view().find("remote_timestamp") !=
1586 std::string_view::npos &&
1587 channel->name()->string_view().find("/original") ==
1588 std::string_view::npos) {
1589 continue;
1590 }
1591 if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
1592 observed_messages[ii] = {};
1593 fetchers[ii] = full_event_loop->MakeRawFetcher(channel);
1594 full_event_loop->OnRun([ii, &observed_messages, &fetchers]() {
1595 if (fetchers[ii]->Fetch()) {
1596 observed_messages[ii].push_back(std::make_pair(
1597 fetchers[ii]->context().monotonic_event_time, true));
1598 }
1599 });
1600 full_event_loop->MakeRawNoArgWatcher(
1601 channel, [ii, &observed_messages](const Context &context) {
1602 observed_messages[ii].push_back(
1603 std::make_pair(context.monotonic_event_time, false));
1604 });
1605 }
1606 }
1607
1608 full_factory.Run();
1609 fetchers.clear();
1610 full_reader.Deregister();
1611
1612 const Node *single_node_pi1 =
1613 configuration::GetNode(single_node_factory.configuration(), "pi1");
1614 std::map<size_t, std::unique_ptr<RawFetcher>> single_node_fetchers;
1615
1616 std::unique_ptr<EventLoop> single_node_event_loop =
1617 single_node_factory.MakeEventLoop("test", single_node_pi1);
1618 single_node_event_loop->SkipTimingReport();
1619 single_node_event_loop->SkipAosLog();
1620 for (size_t ii = 0;
1621 ii < single_node_event_loop->configuration()->channels()->size(); ++ii) {
1622 const Channel *channel =
1623 single_node_event_loop->configuration()->channels()->Get(ii);
1624 single_node_factory.DisableForwarding(channel);
1625 if (configuration::ChannelIsReadableOnNode(channel, single_node_pi1)) {
1626 single_node_fetchers[ii] =
1627 single_node_event_loop->MakeRawFetcher(channel);
1628 single_node_event_loop->OnRun([channel, ii, &single_node_fetchers]() {
1629 EXPECT_FALSE(single_node_fetchers[ii]->Fetch())
1630 << "Single EventLoop replay doesn't support pre-loading fetchers. "
1631 << configuration::StrippedChannelToString(channel);
1632 });
1633 single_node_event_loop->MakeRawNoArgWatcher(
1634 channel, [ii, &observed_messages, channel,
1635 kStartupDelay](const Context &context) {
1636 if (observed_messages[ii].empty()) {
1637 FAIL() << "Observed extra message at "
1638 << context.monotonic_event_time << " on "
1639 << configuration::StrippedChannelToString(channel);
1640 return;
1641 }
1642 const std::pair<monotonic_clock::time_point, bool> &message =
1643 observed_messages[ii].front();
1644 if (message.second) {
1645 EXPECT_LE(message.first,
1646 context.monotonic_event_time + kStartupDelay)
1647 << "Mismatched message times " << context.monotonic_event_time
1648 << " and " << message.first << " on "
1649 << configuration::StrippedChannelToString(channel);
1650 } else {
1651 EXPECT_EQ(message.first,
1652 context.monotonic_event_time + kStartupDelay)
1653 << "Mismatched message times " << context.monotonic_event_time
1654 << " and " << message.first << " on "
1655 << configuration::StrippedChannelToString(channel);
1656 }
1657 observed_messages[ii].erase(observed_messages[ii].begin());
1658 });
1659 }
1660 }
1661
1662 single_node_factory.Run();
1663
1664 single_node_fetchers.clear();
1665
1666 single_node_reader.Deregister();
1667
1668 for (const auto &pair : observed_messages) {
1669 EXPECT_TRUE(pair.second.empty())
1670 << "Missed " << pair.second.size() << " messages on "
1671 << configuration::StrippedChannelToString(
1672 single_node_event_loop->configuration()->channels()->Get(
1673 pair.first));
1674 }
1675}
1676
1677// Tests that we properly recreate forwarded timestamps when replaying a log.
1678// This should be enough that we can then re-run the logger and get a valid log
1679// back.
1680TEST_P(MultinodeLoggerTest, MessageHeader) {
1681 time_converter_.StartEqual();
1682 {
1683 LoggerState pi1_logger = MakeLogger(pi1_);
1684 LoggerState pi2_logger = MakeLogger(pi2_);
1685
1686 event_loop_factory_.RunFor(chrono::milliseconds(95));
1687
1688 StartLogger(&pi1_logger);
1689 StartLogger(&pi2_logger);
1690
1691 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1692 }
1693
1694 LogReader reader(SortParts(logfiles_));
1695
1696 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1697 log_reader_factory.set_send_delay(chrono::microseconds(0));
1698
1699 // This sends out the fetched messages and advances time to the start of the
1700 // log file.
1701 reader.Register(&log_reader_factory);
1702
1703 const Node *pi1 =
1704 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1705 const Node *pi2 =
1706 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1707
1708 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
1709 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
1710 LOG(INFO) << "now pi1 "
1711 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1712 LOG(INFO) << "now pi2 "
1713 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1714
1715 EXPECT_THAT(reader.LoggedNodes(),
1716 ::testing::ElementsAre(
1717 configuration::GetNode(reader.logged_configuration(), pi1),
1718 configuration::GetNode(reader.logged_configuration(), pi2)));
1719
1720 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1721
1722 std::unique_ptr<EventLoop> pi1_event_loop =
1723 log_reader_factory.MakeEventLoop("test", pi1);
1724 std::unique_ptr<EventLoop> pi2_event_loop =
1725 log_reader_factory.MakeEventLoop("test", pi2);
1726
1727 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =
1728 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1729 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi2_fetcher =
1730 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1731
1732 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1733 pi1_event_loop->MakeFetcher<examples::Ping>("/test");
1734 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1735 pi2_event_loop->MakeFetcher<examples::Ping>("/test");
1736
1737 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi2_fetcher =
1738 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1739 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi1_fetcher =
1740 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1741
1742 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1743 pi2_event_loop->MakeFetcher<examples::Pong>("/test");
1744 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1745 pi1_event_loop->MakeFetcher<examples::Pong>("/test");
1746
1747 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
1748 pi1_event_loop->configuration(), pi1_timestamp_on_pi1_fetcher.channel());
1749 const size_t ping_timestamp_channel = configuration::ChannelIndex(
1750 pi2_event_loop->configuration(), ping_on_pi2_fetcher.channel());
1751
1752 const size_t pi2_timestamp_channel = configuration::ChannelIndex(
1753 pi2_event_loop->configuration(), pi2_timestamp_on_pi2_fetcher.channel());
1754 const size_t pong_timestamp_channel = configuration::ChannelIndex(
1755 pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
1756
1757 const chrono::nanoseconds network_delay = event_loop_factory_.network_delay();
1758 const chrono::nanoseconds send_delay = event_loop_factory_.send_delay();
1759
1760 for (std::pair<int, std::string> channel :
1761 shared()
1762 ? std::vector<
1763 std::pair<int, std::string>>{{-1,
1764 "/aos/remote_timestamps/pi2"}}
1765 : std::vector<std::pair<int, std::string>>{
1766 {pi1_timestamp_channel,
1767 "/aos/remote_timestamps/pi2/pi1/aos/"
1768 "aos-message_bridge-Timestamp"},
1769 {ping_timestamp_channel,
1770 "/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
1771 pi1_event_loop->MakeWatcher(
1772 channel.second,
1773 [&pi1_event_loop, &pi2_event_loop, pi1_timestamp_channel,
1774 ping_timestamp_channel, &pi1_timestamp_on_pi1_fetcher,
1775 &pi1_timestamp_on_pi2_fetcher, &ping_on_pi1_fetcher,
1776 &ping_on_pi2_fetcher, network_delay, send_delay,
1777 channel_index = channel.first](const RemoteMessage &header) {
1778 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1779 chrono::nanoseconds(header.monotonic_sent_time()));
1780 const aos::realtime_clock::time_point header_realtime_sent_time(
1781 chrono::nanoseconds(header.realtime_sent_time()));
1782 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1783 chrono::nanoseconds(header.monotonic_remote_time()));
1784 const aos::realtime_clock::time_point header_realtime_remote_time(
1785 chrono::nanoseconds(header.realtime_remote_time()));
1786
1787 if (channel_index != -1) {
1788 ASSERT_EQ(channel_index, header.channel_index());
1789 }
1790
1791 const Context *pi1_context = nullptr;
1792 const Context *pi2_context = nullptr;
1793
1794 if (header.channel_index() == pi1_timestamp_channel) {
1795 ASSERT_TRUE(pi1_timestamp_on_pi1_fetcher.FetchNext());
1796 ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
1797 pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
1798 pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
1799 } else if (header.channel_index() == ping_timestamp_channel) {
1800 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
1801 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
1802 pi1_context = &ping_on_pi1_fetcher.context();
1803 pi2_context = &ping_on_pi2_fetcher.context();
1804 } else {
1805 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1806 << configuration::CleanedChannelToString(
1807 pi1_event_loop->configuration()->channels()->Get(
1808 header.channel_index()));
1809 }
1810
1811 ASSERT_TRUE(header.has_boot_uuid());
1812 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1813 pi2_event_loop->boot_uuid());
1814
1815 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
1816 EXPECT_EQ(pi2_context->remote_queue_index,
1817 header.remote_queue_index());
1818 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
1819
1820 EXPECT_EQ(pi2_context->monotonic_event_time,
1821 header_monotonic_sent_time);
1822 EXPECT_EQ(pi2_context->realtime_event_time,
1823 header_realtime_sent_time);
1824 EXPECT_EQ(pi2_context->realtime_remote_time,
1825 header_realtime_remote_time);
1826 EXPECT_EQ(pi2_context->monotonic_remote_time,
1827 header_monotonic_remote_time);
1828
1829 EXPECT_EQ(pi1_context->realtime_event_time,
1830 header_realtime_remote_time);
1831 EXPECT_EQ(pi1_context->monotonic_event_time,
1832 header_monotonic_remote_time);
1833
1834 // Time estimation isn't perfect, but we know the clocks were
1835 // identical when logged, so we know when this should have come back.
1836 // Confirm we got it when we expected.
1837 EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
1838 pi1_context->monotonic_event_time + 2 * network_delay +
1839 send_delay);
1840 });
1841 }
1842 for (std::pair<int, std::string> channel :
1843 shared()
1844 ? std::vector<
1845 std::pair<int, std::string>>{{-1,
1846 "/aos/remote_timestamps/pi1"}}
1847 : std::vector<std::pair<int, std::string>>{
1848 {pi2_timestamp_channel,
1849 "/aos/remote_timestamps/pi1/pi2/aos/"
1850 "aos-message_bridge-Timestamp"}}) {
1851 pi2_event_loop->MakeWatcher(
1852 channel.second,
1853 [&pi2_event_loop, &pi1_event_loop, pi2_timestamp_channel,
1854 pong_timestamp_channel, &pi2_timestamp_on_pi2_fetcher,
1855 &pi2_timestamp_on_pi1_fetcher, &pong_on_pi2_fetcher,
1856 &pong_on_pi1_fetcher, network_delay, send_delay,
1857 channel_index = channel.first](const RemoteMessage &header) {
1858 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1859 chrono::nanoseconds(header.monotonic_sent_time()));
1860 const aos::realtime_clock::time_point header_realtime_sent_time(
1861 chrono::nanoseconds(header.realtime_sent_time()));
1862 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1863 chrono::nanoseconds(header.monotonic_remote_time()));
1864 const aos::realtime_clock::time_point header_realtime_remote_time(
1865 chrono::nanoseconds(header.realtime_remote_time()));
1866
1867 if (channel_index != -1) {
1868 ASSERT_EQ(channel_index, header.channel_index());
1869 }
1870
1871 const Context *pi2_context = nullptr;
1872 const Context *pi1_context = nullptr;
1873
1874 if (header.channel_index() == pi2_timestamp_channel) {
1875 ASSERT_TRUE(pi2_timestamp_on_pi2_fetcher.FetchNext());
1876 ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
1877 pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
1878 pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
1879 } else if (header.channel_index() == pong_timestamp_channel) {
1880 ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
1881 ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
1882 pi2_context = &pong_on_pi2_fetcher.context();
1883 pi1_context = &pong_on_pi1_fetcher.context();
1884 } else {
1885 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1886 << configuration::CleanedChannelToString(
1887 pi2_event_loop->configuration()->channels()->Get(
1888 header.channel_index()));
1889 }
1890
1891 ASSERT_TRUE(header.has_boot_uuid());
1892 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1893 pi1_event_loop->boot_uuid());
1894
1895 EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());
1896 EXPECT_EQ(pi1_context->remote_queue_index,
1897 header.remote_queue_index());
1898 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
1899
1900 EXPECT_EQ(pi1_context->monotonic_event_time,
1901 header_monotonic_sent_time);
1902 EXPECT_EQ(pi1_context->realtime_event_time,
1903 header_realtime_sent_time);
1904 EXPECT_EQ(pi1_context->realtime_remote_time,
1905 header_realtime_remote_time);
1906 EXPECT_EQ(pi1_context->monotonic_remote_time,
1907 header_monotonic_remote_time);
1908
1909 EXPECT_EQ(pi2_context->realtime_event_time,
1910 header_realtime_remote_time);
1911 EXPECT_EQ(pi2_context->monotonic_event_time,
1912 header_monotonic_remote_time);
1913
1914 // Time estimation isn't perfect, but we know the clocks were
1915 // identical when logged, so we know when this should have come back.
1916 // Confirm we got it when we expected.
1917 EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
1918 pi2_context->monotonic_event_time + 2 * network_delay +
1919 send_delay);
1920 });
1921 }
1922
1923 // And confirm we can re-create a log again, while checking the contents.
1924 {
1925 LoggerState pi1_logger = MakeLogger(
1926 log_reader_factory.GetNodeEventLoopFactory("pi1"), &log_reader_factory);
1927 LoggerState pi2_logger = MakeLogger(
1928 log_reader_factory.GetNodeEventLoopFactory("pi2"), &log_reader_factory);
1929
1930 StartLogger(&pi1_logger, tmp_dir_ + "/relogged1");
1931 StartLogger(&pi2_logger, tmp_dir_ + "/relogged2");
1932
1933 log_reader_factory.Run();
1934 }
1935
1936 reader.Deregister();
1937
1938 // And verify that we can run the LogReader over the relogged files without
1939 // hitting any fatal errors.
1940 {
1941 LogReader relogged_reader(SortParts(MakeLogFiles(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07001942 tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 1, 1, 2, 2, true)));
Naman Guptaa63aa132023-03-22 20:06:34 -07001943 relogged_reader.Register();
1944
1945 relogged_reader.event_loop_factory()->Run();
1946 }
1947 // And confirm that we can read the logged file using the reader's
1948 // configuration.
1949 {
1950 LogReader relogged_reader(
1951 SortParts(MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2",
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07001952 1, 1, 2, 2, true)),
Naman Guptaa63aa132023-03-22 20:06:34 -07001953 reader.configuration());
1954 relogged_reader.Register();
1955
1956 relogged_reader.event_loop_factory()->Run();
1957 }
1958}
1959
1960// Tests that we properly populate and extract the logger_start time by setting
1961// up a clock difference between 2 nodes and looking at the resulting parts.
1962TEST_P(MultinodeLoggerTest, LoggerStartTime) {
1963 std::vector<std::string> actual_filenames;
1964 time_converter_.AddMonotonic(
1965 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
1966 {
1967 LoggerState pi1_logger = MakeLogger(pi1_);
1968 LoggerState pi2_logger = MakeLogger(pi2_);
1969
1970 StartLogger(&pi1_logger);
1971 StartLogger(&pi2_logger);
1972
1973 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1974
1975 pi1_logger.AppendAllFilenames(&actual_filenames);
1976 pi2_logger.AppendAllFilenames(&actual_filenames);
1977 }
1978
1979 ASSERT_THAT(actual_filenames,
1980 ::testing::UnorderedElementsAreArray(logfiles_));
1981
1982 for (const LogFile &log_file : SortParts(logfiles_)) {
1983 for (const LogParts &log_part : log_file.parts) {
1984 if (log_part.node == log_file.logger_node) {
1985 EXPECT_EQ(log_part.logger_monotonic_start_time,
1986 aos::monotonic_clock::min_time);
1987 EXPECT_EQ(log_part.logger_realtime_start_time,
1988 aos::realtime_clock::min_time);
1989 } else {
1990 const chrono::seconds offset = log_file.logger_node == "pi1"
1991 ? -chrono::seconds(1000)
1992 : chrono::seconds(1000);
1993 EXPECT_EQ(log_part.logger_monotonic_start_time,
1994 log_part.monotonic_start_time + offset);
1995 EXPECT_EQ(log_part.logger_realtime_start_time,
1996 log_file.realtime_start_time +
1997 (log_part.logger_monotonic_start_time -
1998 log_file.monotonic_start_time));
1999 }
2000 }
2001 }
2002}
2003
2004// Test that renaming the base, renames the folder.
2005TEST_P(MultinodeLoggerTest, LoggerRenameFolder) {
2006 util::UnlinkRecursive(tmp_dir_ + "/renamefolder");
2007 util::UnlinkRecursive(tmp_dir_ + "/new-good");
2008 time_converter_.AddMonotonic(
2009 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2010 logfile_base1_ = tmp_dir_ + "/renamefolder/multi_logfile1";
2011 logfile_base2_ = tmp_dir_ + "/renamefolder/multi_logfile2";
2012 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
2013 LoggerState pi1_logger = MakeLogger(pi1_);
2014 LoggerState pi2_logger = MakeLogger(pi2_);
2015
2016 StartLogger(&pi1_logger);
2017 StartLogger(&pi2_logger);
2018
2019 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2020 logfile_base1_ = tmp_dir_ + "/new-good/multi_logfile1";
2021 logfile_base2_ = tmp_dir_ + "/new-good/multi_logfile2";
2022 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
Alexei Strotscaf17d32023-04-03 22:31:11 -07002023
2024 // Sequence of set_base_name and Rotate simulates rename operation. Since
2025 // rename is not supported by all namers, RenameLogBase moved from logger to
2026 // the higher level abstraction, yet log_namers support rename, and it is
2027 // legal to test it here.
2028 pi1_logger.log_namer->set_base_name(logfile_base1_);
2029 pi1_logger.logger->Rotate();
2030 pi2_logger.log_namer->set_base_name(logfile_base2_);
2031 pi2_logger.logger->Rotate();
2032
Naman Guptaa63aa132023-03-22 20:06:34 -07002033 for (auto &file : logfiles_) {
2034 struct stat s;
2035 EXPECT_EQ(0, stat(file.c_str(), &s));
2036 }
2037}
2038
2039// Test that renaming the file base dies.
2040TEST_P(MultinodeLoggerDeathTest, LoggerRenameFile) {
2041 time_converter_.AddMonotonic(
2042 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2043 util::UnlinkRecursive(tmp_dir_ + "/renamefile");
2044 logfile_base1_ = tmp_dir_ + "/renamefile/multi_logfile1";
2045 logfile_base2_ = tmp_dir_ + "/renamefile/multi_logfile2";
2046 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
2047 LoggerState pi1_logger = MakeLogger(pi1_);
2048 StartLogger(&pi1_logger);
2049 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2050 logfile_base1_ = tmp_dir_ + "/new-renamefile/new_multi_logfile1";
Alexei Strotscaf17d32023-04-03 22:31:11 -07002051 EXPECT_DEATH({ pi1_logger.log_namer->set_base_name(logfile_base1_); },
Naman Guptaa63aa132023-03-22 20:06:34 -07002052 "Rename of file base from");
2053}
2054
2055// TODO(austin): We can write a test which recreates a logfile and confirms that
2056// we get it back. That is the ultimate test.
2057
// Tests that a log recorded on pi1 stays consistent when pi2 reboots mid-log.
// The log parts rotate on the reboot, and each part header records the matching
// pi2 boot UUID and the expected oldest remote/local timestamps.
2061TEST_P(MultinodeLoggerTest, RemoteReboot) {
2062 std::vector<std::string> actual_filenames;
2063
2064 const UUID pi1_boot0 = UUID::Random();
2065 const UUID pi2_boot0 = UUID::Random();
2066 const UUID pi2_boot1 = UUID::Random();
2067 {
2068 CHECK_EQ(pi1_index_, 0u);
2069 CHECK_EQ(pi2_index_, 1u);
2070
2071 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2072 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2073 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2074
2075 time_converter_.AddNextTimestamp(
2076 distributed_clock::epoch(),
2077 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2078 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2079 time_converter_.AddNextTimestamp(
2080 distributed_clock::epoch() + reboot_time,
2081 {BootTimestamp::epoch() + reboot_time,
2082 BootTimestamp{
2083 .boot = 1,
2084 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2085 }
2086
2087 {
2088 LoggerState pi1_logger = MakeLogger(pi1_);
2089
2090 event_loop_factory_.RunFor(chrono::milliseconds(95));
2091 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2092 pi1_boot0);
2093 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2094 pi2_boot0);
2095
2096 StartLogger(&pi1_logger);
2097
2098 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2099
2100 VLOG(1) << "Reboot now!";
2101
2102 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2103 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2104 pi1_boot0);
2105 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2106 pi2_boot1);
2107
2108 pi1_logger.AppendAllFilenames(&actual_filenames);
2109 }
2110
2111 std::sort(actual_filenames.begin(), actual_filenames.end());
2112 std::sort(pi1_reboot_logfiles_.begin(), pi1_reboot_logfiles_.end());
2113 ASSERT_THAT(actual_filenames,
2114 ::testing::UnorderedElementsAreArray(pi1_reboot_logfiles_));
2115
2116 // Confirm that our new oldest timestamps properly update as we reboot and
2117 // rotate.
2118 for (const std::string &file : pi1_reboot_logfiles_) {
2119 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2120 ReadHeader(file);
2121 CHECK(log_header);
2122 if (log_header->message().has_configuration()) {
2123 continue;
2124 }
2125
2126 const monotonic_clock::time_point monotonic_start_time =
2127 monotonic_clock::time_point(
2128 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2129 const UUID source_node_boot_uuid = UUID::FromString(
2130 log_header->message().source_node_boot_uuid()->string_view());
2131
2132 if (log_header->message().node()->name()->string_view() != "pi1") {
2133 // The remote message channel should rotate later and have more parts.
2134 // This only is true on the log files with shared remote messages.
2135 //
2136 // TODO(austin): I'm not the most thrilled with this test pattern... It
2137 // feels brittle in a different way.
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002138 if (file.find("timestamps/remote_pi2") == std::string::npos) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002139 switch (log_header->message().parts_index()) {
2140 case 0:
2141 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2142 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2143 break;
2144 case 1:
2145 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2146 ASSERT_EQ(monotonic_start_time,
2147 monotonic_clock::epoch() + chrono::seconds(1));
2148 break;
2149 case 2:
2150 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2151 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2152 break;
2153 case 3:
2154 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2155 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
2156 chrono::nanoseconds(2322999462))
2157 << " on " << file;
2158 break;
2159 default:
2160 FAIL();
2161 break;
2162 }
2163 } else {
2164 switch (log_header->message().parts_index()) {
2165 case 0:
2166 case 1:
2167 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2168 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2169 break;
2170 case 2:
2171 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2172 ASSERT_EQ(monotonic_start_time,
2173 monotonic_clock::epoch() + chrono::seconds(1));
2174 break;
2175 case 3:
2176 case 4:
2177 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2178 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2179 break;
2180 case 5:
2181 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2182 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
2183 chrono::nanoseconds(2322999462))
2184 << " on " << file;
2185 break;
2186 default:
2187 FAIL();
2188 break;
2189 }
2190 }
2191 continue;
2192 }
2193 SCOPED_TRACE(file);
2194 SCOPED_TRACE(aos::FlatbufferToJson(
2195 *log_header, {.multi_line = true, .max_vector_size = 100}));
2196 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2197 ASSERT_EQ(
2198 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2199 EXPECT_EQ(
2200 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2201 monotonic_clock::max_time.time_since_epoch().count());
2202 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2203 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2204 2u);
2205 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2206 monotonic_clock::max_time.time_since_epoch().count());
2207 ASSERT_TRUE(log_header->message()
2208 .has_oldest_remote_unreliable_monotonic_timestamps());
2209 ASSERT_EQ(log_header->message()
2210 .oldest_remote_unreliable_monotonic_timestamps()
2211 ->size(),
2212 2u);
2213 EXPECT_EQ(log_header->message()
2214 .oldest_remote_unreliable_monotonic_timestamps()
2215 ->Get(0),
2216 monotonic_clock::max_time.time_since_epoch().count());
2217 ASSERT_TRUE(log_header->message()
2218 .has_oldest_local_unreliable_monotonic_timestamps());
2219 ASSERT_EQ(log_header->message()
2220 .oldest_local_unreliable_monotonic_timestamps()
2221 ->size(),
2222 2u);
2223 EXPECT_EQ(log_header->message()
2224 .oldest_local_unreliable_monotonic_timestamps()
2225 ->Get(0),
2226 monotonic_clock::max_time.time_since_epoch().count());
2227
2228 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2229 monotonic_clock::time_point(chrono::nanoseconds(
2230 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2231 1)));
2232 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2233 monotonic_clock::time_point(chrono::nanoseconds(
2234 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2235 const monotonic_clock::time_point
2236 oldest_remote_unreliable_monotonic_timestamps =
2237 monotonic_clock::time_point(chrono::nanoseconds(
2238 log_header->message()
2239 .oldest_remote_unreliable_monotonic_timestamps()
2240 ->Get(1)));
2241 const monotonic_clock::time_point
2242 oldest_local_unreliable_monotonic_timestamps =
2243 monotonic_clock::time_point(chrono::nanoseconds(
2244 log_header->message()
2245 .oldest_local_unreliable_monotonic_timestamps()
2246 ->Get(1)));
2247 const monotonic_clock::time_point
2248 oldest_remote_reliable_monotonic_timestamps =
2249 monotonic_clock::time_point(chrono::nanoseconds(
2250 log_header->message()
2251 .oldest_remote_reliable_monotonic_timestamps()
2252 ->Get(1)));
2253 const monotonic_clock::time_point
2254 oldest_local_reliable_monotonic_timestamps =
2255 monotonic_clock::time_point(chrono::nanoseconds(
2256 log_header->message()
2257 .oldest_local_reliable_monotonic_timestamps()
2258 ->Get(1)));
2259 const monotonic_clock::time_point
2260 oldest_logger_remote_unreliable_monotonic_timestamps =
2261 monotonic_clock::time_point(chrono::nanoseconds(
2262 log_header->message()
2263 .oldest_logger_remote_unreliable_monotonic_timestamps()
2264 ->Get(0)));
2265 const monotonic_clock::time_point
2266 oldest_logger_local_unreliable_monotonic_timestamps =
2267 monotonic_clock::time_point(chrono::nanoseconds(
2268 log_header->message()
2269 .oldest_logger_local_unreliable_monotonic_timestamps()
2270 ->Get(0)));
2271 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2272 monotonic_clock::max_time);
2273 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2274 monotonic_clock::max_time);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002275 if (log_header->message().data_stored()->Get(0) == StoredDataType::DATA) {
2276 switch (log_header->message().parts_index()) {
2277 case 0:
2278 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2279 monotonic_clock::max_time);
2280 EXPECT_EQ(oldest_local_monotonic_timestamps,
2281 monotonic_clock::max_time);
2282 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2283 monotonic_clock::max_time);
2284 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2285 monotonic_clock::max_time);
2286 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2287 monotonic_clock::max_time);
2288 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2289 monotonic_clock::max_time);
2290 break;
2291 default:
2292 FAIL();
2293 break;
2294 }
2295 } else if (log_header->message().data_stored()->Get(0) ==
2296 StoredDataType::TIMESTAMPS) {
2297 switch (log_header->message().parts_index()) {
2298 case 0:
2299 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2300 monotonic_clock::time_point(chrono::microseconds(90200)));
2301 EXPECT_EQ(oldest_local_monotonic_timestamps,
2302 monotonic_clock::time_point(chrono::microseconds(90350)));
2303 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2304 monotonic_clock::time_point(chrono::microseconds(90200)));
2305 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2306 monotonic_clock::time_point(chrono::microseconds(90350)));
2307 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2308 monotonic_clock::max_time);
2309 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2310 monotonic_clock::max_time);
2311 break;
2312 case 1:
2313 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2314 monotonic_clock::time_point(chrono::microseconds(90200)))
2315 << file;
2316 EXPECT_EQ(oldest_local_monotonic_timestamps,
2317 monotonic_clock::time_point(chrono::microseconds(90350)))
2318 << file;
2319 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2320 monotonic_clock::time_point(chrono::microseconds(90200)))
2321 << file;
2322 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2323 monotonic_clock::time_point(chrono::microseconds(90350)))
2324 << file;
2325 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2326 monotonic_clock::time_point(chrono::microseconds(100000)))
2327 << file;
2328 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2329 monotonic_clock::time_point(chrono::microseconds(100150)))
2330 << file;
2331 break;
2332 case 2:
2333 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2334 monotonic_clock::time_point(chrono::milliseconds(1323) +
2335 chrono::microseconds(200)));
2336 EXPECT_EQ(
2337 oldest_local_monotonic_timestamps,
2338 monotonic_clock::time_point(chrono::microseconds(10100350)));
2339 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2340 monotonic_clock::time_point(chrono::milliseconds(1323) +
2341 chrono::microseconds(200)));
2342 EXPECT_EQ(
2343 oldest_local_unreliable_monotonic_timestamps,
2344 monotonic_clock::time_point(chrono::microseconds(10100350)));
2345 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2346 monotonic_clock::max_time)
2347 << file;
2348 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2349 monotonic_clock::max_time)
2350 << file;
2351 break;
2352 case 3:
2353 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2354 monotonic_clock::time_point(chrono::milliseconds(1323) +
2355 chrono::microseconds(200)));
2356 EXPECT_EQ(
2357 oldest_local_monotonic_timestamps,
2358 monotonic_clock::time_point(chrono::microseconds(10100350)));
2359 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2360 monotonic_clock::time_point(chrono::milliseconds(1323) +
2361 chrono::microseconds(200)));
2362 EXPECT_EQ(
2363 oldest_local_unreliable_monotonic_timestamps,
2364 monotonic_clock::time_point(chrono::microseconds(10100350)));
2365 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2366 monotonic_clock::time_point(chrono::microseconds(1423000)))
2367 << file;
2368 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2369 monotonic_clock::time_point(chrono::microseconds(10200150)))
2370 << file;
2371 break;
2372 default:
2373 FAIL();
2374 break;
2375 }
Naman Guptaa63aa132023-03-22 20:06:34 -07002376 }
2377 }
2378
2379 // Confirm that we refuse to replay logs with missing boot uuids.
2380 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07002381 auto sorted_parts = SortParts(pi1_reboot_logfiles_);
2382 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
2383 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07002384
2385 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2386 log_reader_factory.set_send_delay(chrono::microseconds(0));
2387
2388 // This sends out the fetched messages and advances time to the start of
2389 // the log file.
2390 reader.Register(&log_reader_factory);
2391
2392 log_reader_factory.Run();
2393
2394 reader.Deregister();
2395 }
2396}
2397
2398// Tests that we can sort a log which only has timestamps from the remote
2399// because the local message_bridge_client failed to connect.
2400TEST_P(MultinodeLoggerTest, RemoteRebootOnlyTimestamps) {
2401 const UUID pi1_boot0 = UUID::Random();
2402 const UUID pi2_boot0 = UUID::Random();
2403 const UUID pi2_boot1 = UUID::Random();
2404 {
2405 CHECK_EQ(pi1_index_, 0u);
2406 CHECK_EQ(pi2_index_, 1u);
2407
2408 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2409 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2410 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2411
2412 time_converter_.AddNextTimestamp(
2413 distributed_clock::epoch(),
2414 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2415 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2416 time_converter_.AddNextTimestamp(
2417 distributed_clock::epoch() + reboot_time,
2418 {BootTimestamp::epoch() + reboot_time,
2419 BootTimestamp{
2420 .boot = 1,
2421 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2422 }
2423 pi2_->Disconnect(pi1_->node());
2424
2425 std::vector<std::string> filenames;
2426 {
2427 LoggerState pi1_logger = MakeLogger(pi1_);
2428
2429 event_loop_factory_.RunFor(chrono::milliseconds(95));
2430 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2431 pi1_boot0);
2432 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2433 pi2_boot0);
2434
2435 StartLogger(&pi1_logger);
2436
2437 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2438
2439 VLOG(1) << "Reboot now!";
2440
2441 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2442 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2443 pi1_boot0);
2444 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2445 pi2_boot1);
2446 pi1_logger.AppendAllFilenames(&filenames);
2447 }
2448
2449 std::sort(filenames.begin(), filenames.end());
2450
2451 // Confirm that our new oldest timestamps properly update as we reboot and
2452 // rotate.
2453 size_t timestamp_file_count = 0;
2454 for (const std::string &file : filenames) {
2455 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2456 ReadHeader(file);
2457 CHECK(log_header);
2458
2459 if (log_header->message().has_configuration()) {
2460 continue;
2461 }
2462
2463 const monotonic_clock::time_point monotonic_start_time =
2464 monotonic_clock::time_point(
2465 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2466 const UUID source_node_boot_uuid = UUID::FromString(
2467 log_header->message().source_node_boot_uuid()->string_view());
2468
2469 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2470 ASSERT_EQ(
2471 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2472 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2473 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2474 2u);
2475 ASSERT_TRUE(log_header->message()
2476 .has_oldest_remote_unreliable_monotonic_timestamps());
2477 ASSERT_EQ(log_header->message()
2478 .oldest_remote_unreliable_monotonic_timestamps()
2479 ->size(),
2480 2u);
2481 ASSERT_TRUE(log_header->message()
2482 .has_oldest_local_unreliable_monotonic_timestamps());
2483 ASSERT_EQ(log_header->message()
2484 .oldest_local_unreliable_monotonic_timestamps()
2485 ->size(),
2486 2u);
2487 ASSERT_TRUE(log_header->message()
2488 .has_oldest_remote_reliable_monotonic_timestamps());
2489 ASSERT_EQ(log_header->message()
2490 .oldest_remote_reliable_monotonic_timestamps()
2491 ->size(),
2492 2u);
2493 ASSERT_TRUE(
2494 log_header->message().has_oldest_local_reliable_monotonic_timestamps());
2495 ASSERT_EQ(log_header->message()
2496 .oldest_local_reliable_monotonic_timestamps()
2497 ->size(),
2498 2u);
2499
2500 ASSERT_TRUE(
2501 log_header->message()
2502 .has_oldest_logger_remote_unreliable_monotonic_timestamps());
2503 ASSERT_EQ(log_header->message()
2504 .oldest_logger_remote_unreliable_monotonic_timestamps()
2505 ->size(),
2506 2u);
2507 ASSERT_TRUE(log_header->message()
2508 .has_oldest_logger_local_unreliable_monotonic_timestamps());
2509 ASSERT_EQ(log_header->message()
2510 .oldest_logger_local_unreliable_monotonic_timestamps()
2511 ->size(),
2512 2u);
2513
2514 if (log_header->message().node()->name()->string_view() != "pi1") {
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002515 ASSERT_TRUE(file.find("timestamps/remote_pi2") != std::string::npos);
Naman Guptaa63aa132023-03-22 20:06:34 -07002516
2517 const std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
2518 ReadNthMessage(file, 0);
2519 CHECK(msg);
2520
2521 EXPECT_TRUE(msg->message().has_monotonic_sent_time());
2522 EXPECT_TRUE(msg->message().has_monotonic_remote_time());
2523
2524 const monotonic_clock::time_point
2525 expected_oldest_local_monotonic_timestamps(
2526 chrono::nanoseconds(msg->message().monotonic_sent_time()));
2527 const monotonic_clock::time_point
2528 expected_oldest_remote_monotonic_timestamps(
2529 chrono::nanoseconds(msg->message().monotonic_remote_time()));
2530 const monotonic_clock::time_point
2531 expected_oldest_timestamp_monotonic_timestamps(
2532 chrono::nanoseconds(msg->message().monotonic_timestamp_time()));
2533
2534 EXPECT_NE(expected_oldest_local_monotonic_timestamps,
2535 monotonic_clock::min_time);
2536 EXPECT_NE(expected_oldest_remote_monotonic_timestamps,
2537 monotonic_clock::min_time);
2538 EXPECT_NE(expected_oldest_timestamp_monotonic_timestamps,
2539 monotonic_clock::min_time);
2540
2541 ++timestamp_file_count;
2542 // Since the log file is from the perspective of the other node,
2543 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2544 monotonic_clock::time_point(chrono::nanoseconds(
2545 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2546 0)));
2547 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2548 monotonic_clock::time_point(chrono::nanoseconds(
2549 log_header->message().oldest_local_monotonic_timestamps()->Get(
2550 0)));
2551 const monotonic_clock::time_point
2552 oldest_remote_unreliable_monotonic_timestamps =
2553 monotonic_clock::time_point(chrono::nanoseconds(
2554 log_header->message()
2555 .oldest_remote_unreliable_monotonic_timestamps()
2556 ->Get(0)));
2557 const monotonic_clock::time_point
2558 oldest_local_unreliable_monotonic_timestamps =
2559 monotonic_clock::time_point(chrono::nanoseconds(
2560 log_header->message()
2561 .oldest_local_unreliable_monotonic_timestamps()
2562 ->Get(0)));
2563 const monotonic_clock::time_point
2564 oldest_remote_reliable_monotonic_timestamps =
2565 monotonic_clock::time_point(chrono::nanoseconds(
2566 log_header->message()
2567 .oldest_remote_reliable_monotonic_timestamps()
2568 ->Get(0)));
2569 const monotonic_clock::time_point
2570 oldest_local_reliable_monotonic_timestamps =
2571 monotonic_clock::time_point(chrono::nanoseconds(
2572 log_header->message()
2573 .oldest_local_reliable_monotonic_timestamps()
2574 ->Get(0)));
2575 const monotonic_clock::time_point
2576 oldest_logger_remote_unreliable_monotonic_timestamps =
2577 monotonic_clock::time_point(chrono::nanoseconds(
2578 log_header->message()
2579 .oldest_logger_remote_unreliable_monotonic_timestamps()
2580 ->Get(1)));
2581 const monotonic_clock::time_point
2582 oldest_logger_local_unreliable_monotonic_timestamps =
2583 monotonic_clock::time_point(chrono::nanoseconds(
2584 log_header->message()
2585 .oldest_logger_local_unreliable_monotonic_timestamps()
2586 ->Get(1)));
2587
2588 const Channel *channel =
2589 event_loop_factory_.configuration()->channels()->Get(
2590 msg->message().channel_index());
2591 const Connection *connection = configuration::ConnectionToNode(
2592 channel, configuration::GetNode(
2593 event_loop_factory_.configuration(),
2594 log_header->message().node()->name()->string_view()));
2595
2596 const bool reliable = connection->time_to_live() == 0;
2597
2598 SCOPED_TRACE(file);
2599 SCOPED_TRACE(aos::FlatbufferToJson(
2600 *log_header, {.multi_line = true, .max_vector_size = 100}));
2601
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002602 // Confirm that the oldest timestamps match what we expect. Based on
2603 // what we are doing, we know that the oldest time is the first
2604 // message's time.
2605 //
2606 // This makes the test robust to both the split and combined config
2607 // tests.
2608 switch (log_header->message().parts_index()) {
2609 case 0:
2610 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2611 expected_oldest_remote_monotonic_timestamps);
2612 EXPECT_EQ(oldest_local_monotonic_timestamps,
2613 expected_oldest_local_monotonic_timestamps);
2614 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2615 expected_oldest_local_monotonic_timestamps)
2616 << file;
2617 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2618 expected_oldest_timestamp_monotonic_timestamps)
2619 << file;
2620
2621 if (reliable) {
2622 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
Naman Guptaa63aa132023-03-22 20:06:34 -07002623 expected_oldest_remote_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002624 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
Naman Guptaa63aa132023-03-22 20:06:34 -07002625 expected_oldest_local_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002626 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2627 monotonic_clock::max_time);
2628 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2629 monotonic_clock::max_time);
2630 } else {
2631 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2632 monotonic_clock::max_time);
2633 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2634 monotonic_clock::max_time);
Naman Guptaa63aa132023-03-22 20:06:34 -07002635 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2636 expected_oldest_remote_monotonic_timestamps);
2637 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2638 expected_oldest_local_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002639 }
2640 break;
2641 case 1:
2642 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2643 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2644 EXPECT_EQ(oldest_local_monotonic_timestamps,
2645 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2646 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2647 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2648 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2649 monotonic_clock::epoch() + chrono::nanoseconds(90250000));
2650 if (reliable) {
2651 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2652 expected_oldest_remote_monotonic_timestamps);
2653 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2654 expected_oldest_local_monotonic_timestamps);
2655 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2656 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2657 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2658 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2659 } else {
2660 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2661 monotonic_clock::max_time);
2662 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2663 monotonic_clock::max_time);
2664 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2665 expected_oldest_remote_monotonic_timestamps);
2666 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2667 expected_oldest_local_monotonic_timestamps);
2668 }
2669 break;
2670 case 2:
2671 EXPECT_EQ(
2672 oldest_remote_monotonic_timestamps,
2673 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2674 EXPECT_EQ(oldest_local_monotonic_timestamps,
2675 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2676 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2677 expected_oldest_local_monotonic_timestamps)
2678 << file;
2679 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2680 expected_oldest_timestamp_monotonic_timestamps)
2681 << file;
2682 if (reliable) {
2683 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2684 expected_oldest_remote_monotonic_timestamps);
2685 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2686 expected_oldest_local_monotonic_timestamps);
2687 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2688 monotonic_clock::max_time);
2689 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2690 monotonic_clock::max_time);
2691 } else {
2692 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2693 monotonic_clock::max_time);
2694 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2695 monotonic_clock::max_time);
2696 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2697 expected_oldest_remote_monotonic_timestamps);
2698 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2699 expected_oldest_local_monotonic_timestamps);
2700 }
2701 break;
Naman Guptaa63aa132023-03-22 20:06:34 -07002702
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002703 case 3:
2704 EXPECT_EQ(
2705 oldest_remote_monotonic_timestamps,
2706 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2707 EXPECT_EQ(oldest_local_monotonic_timestamps,
2708 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2709 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2710 expected_oldest_remote_monotonic_timestamps);
2711 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2712 expected_oldest_local_monotonic_timestamps);
2713 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2714 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2715 EXPECT_EQ(
2716 oldest_logger_local_unreliable_monotonic_timestamps,
2717 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2718 break;
2719 default:
2720 FAIL();
2721 break;
Naman Guptaa63aa132023-03-22 20:06:34 -07002722 }
2723
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002724 switch (log_header->message().parts_index()) {
2725 case 0:
2726 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2727 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2728 break;
2729 case 1:
2730 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2731 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2732 break;
2733 case 2:
2734 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2735 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2736 break;
2737 case 3:
2738 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2739 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2740 break;
2741 [[fallthrough]];
2742 default:
2743 FAIL();
2744 break;
2745 }
Naman Guptaa63aa132023-03-22 20:06:34 -07002746 continue;
2747 }
2748 EXPECT_EQ(
2749 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2750 monotonic_clock::max_time.time_since_epoch().count());
2751 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2752 monotonic_clock::max_time.time_since_epoch().count());
2753 EXPECT_EQ(log_header->message()
2754 .oldest_remote_unreliable_monotonic_timestamps()
2755 ->Get(0),
2756 monotonic_clock::max_time.time_since_epoch().count());
2757 EXPECT_EQ(log_header->message()
2758 .oldest_local_unreliable_monotonic_timestamps()
2759 ->Get(0),
2760 monotonic_clock::max_time.time_since_epoch().count());
2761
2762 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2763 monotonic_clock::time_point(chrono::nanoseconds(
2764 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2765 1)));
2766 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2767 monotonic_clock::time_point(chrono::nanoseconds(
2768 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2769 const monotonic_clock::time_point
2770 oldest_remote_unreliable_monotonic_timestamps =
2771 monotonic_clock::time_point(chrono::nanoseconds(
2772 log_header->message()
2773 .oldest_remote_unreliable_monotonic_timestamps()
2774 ->Get(1)));
2775 const monotonic_clock::time_point
2776 oldest_local_unreliable_monotonic_timestamps =
2777 monotonic_clock::time_point(chrono::nanoseconds(
2778 log_header->message()
2779 .oldest_local_unreliable_monotonic_timestamps()
2780 ->Get(1)));
2781 switch (log_header->message().parts_index()) {
2782 case 0:
2783 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2784 monotonic_clock::max_time);
2785 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2786 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2787 monotonic_clock::max_time);
2788 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2789 monotonic_clock::max_time);
2790 break;
2791 default:
2792 FAIL();
2793 break;
2794 }
2795 }
2796
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002797 EXPECT_EQ(timestamp_file_count, 4u);
Naman Guptaa63aa132023-03-22 20:06:34 -07002798
2799 // Confirm that we can actually sort the resulting log and read it.
2800 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07002801 auto sorted_parts = SortParts(filenames);
2802 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
2803 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07002804
2805 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2806 log_reader_factory.set_send_delay(chrono::microseconds(0));
2807
2808 // This sends out the fetched messages and advances time to the start of
2809 // the log file.
2810 reader.Register(&log_reader_factory);
2811
2812 log_reader_factory.Run();
2813
2814 reader.Deregister();
2815 }
2816}
2817
2818// Tests that we properly handle one direction of message_bridge being
2819// unavailable.
2820TEST_P(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
2821 pi1_->Disconnect(pi2_->node());
2822 time_converter_.AddMonotonic(
2823 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2824
2825 time_converter_.AddMonotonic(
2826 {chrono::milliseconds(10000),
2827 chrono::milliseconds(10000) - chrono::milliseconds(1)});
2828 {
2829 LoggerState pi1_logger = MakeLogger(pi1_);
2830
2831 event_loop_factory_.RunFor(chrono::milliseconds(95));
2832
2833 StartLogger(&pi1_logger);
2834
2835 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2836 }
2837
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002838 // Confirm that we can parse the result. LogReader has enough internal
2839 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07002840 ConfirmReadable(pi1_single_direction_logfiles_);
2841}
2842
2843// Tests that we properly handle one direction of message_bridge being
2844// unavailable.
2845TEST_P(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
2846 pi1_->Disconnect(pi2_->node());
2847 time_converter_.AddMonotonic(
2848 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(500)});
2849
2850 time_converter_.AddMonotonic(
2851 {chrono::milliseconds(10000),
2852 chrono::milliseconds(10000) + chrono::milliseconds(1)});
2853 {
2854 LoggerState pi1_logger = MakeLogger(pi1_);
2855
2856 event_loop_factory_.RunFor(chrono::milliseconds(95));
2857
2858 StartLogger(&pi1_logger);
2859
2860 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2861 }
2862
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002863 // Confirm that we can parse the result. LogReader has enough internal
2864 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07002865 ConfirmReadable(pi1_single_direction_logfiles_);
2866}
2867
2868// Tests that we explode if someone passes in a part file twice with a better
2869// error than an out of order error.
2870TEST_P(MultinodeLoggerTest, DuplicateLogFiles) {
2871 time_converter_.AddMonotonic(
2872 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2873 {
2874 LoggerState pi1_logger = MakeLogger(pi1_);
2875
2876 event_loop_factory_.RunFor(chrono::milliseconds(95));
2877
2878 StartLogger(&pi1_logger);
2879
2880 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2881 }
2882
2883 std::vector<std::string> duplicates;
2884 for (const std::string &f : pi1_single_direction_logfiles_) {
2885 duplicates.emplace_back(f);
2886 duplicates.emplace_back(f);
2887 }
2888 EXPECT_DEATH({ SortParts(duplicates); }, "Found duplicate parts in");
2889}
2890
2891// Tests that we explode if someone loses a part out of the middle of a log.
2892TEST_P(MultinodeLoggerTest, MissingPartsFromMiddle) {
2893 time_converter_.AddMonotonic(
2894 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2895 {
2896 LoggerState pi1_logger = MakeLogger(pi1_);
2897
2898 event_loop_factory_.RunFor(chrono::milliseconds(95));
2899
2900 StartLogger(&pi1_logger);
2901 aos::monotonic_clock::time_point last_rotation_time =
2902 pi1_logger.event_loop->monotonic_now();
Austin Schuh2f864452023-07-17 14:53:08 -07002903 pi1_logger.logger->set_on_logged_period(
2904 [&](aos::monotonic_clock::time_point) {
2905 const auto now = pi1_logger.event_loop->monotonic_now();
2906 if (now > last_rotation_time + std::chrono::seconds(5)) {
2907 pi1_logger.logger->Rotate();
2908 last_rotation_time = now;
2909 }
2910 });
Naman Guptaa63aa132023-03-22 20:06:34 -07002911
2912 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2913 }
2914
2915 std::vector<std::string> missing_parts;
2916
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002917 missing_parts.emplace_back(logfile_base1_ + "_pi1_timestamps.part0" +
2918 Extension());
2919 missing_parts.emplace_back(logfile_base1_ + "_pi1_timestamps.part2" +
2920 Extension());
Naman Guptaa63aa132023-03-22 20:06:34 -07002921 missing_parts.emplace_back(absl::StrCat(
2922 logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
2923
2924 EXPECT_DEATH({ SortParts(missing_parts); },
2925 "Broken log, missing part files between");
2926}
2927
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002928// Tests that we properly handle a dead node. Do this by just disconnecting
2929// it and only using one nodes of logs.
Naman Guptaa63aa132023-03-22 20:06:34 -07002930TEST_P(MultinodeLoggerTest, DeadNode) {
2931 pi1_->Disconnect(pi2_->node());
2932 pi2_->Disconnect(pi1_->node());
2933 time_converter_.AddMonotonic(
2934 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2935 {
2936 LoggerState pi1_logger = MakeLogger(pi1_);
2937
2938 event_loop_factory_.RunFor(chrono::milliseconds(95));
2939
2940 StartLogger(&pi1_logger);
2941
2942 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2943 }
2944
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002945 // Confirm that we can parse the result. LogReader has enough internal
2946 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07002947 ConfirmReadable(MakePi1DeadNodeLogfiles());
2948}
2949
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002950// Tests that we can relog with a different config. This makes most sense
2951// when you are trying to edit a log and want to use channel renaming + the
2952// original config in the new log.
Naman Guptaa63aa132023-03-22 20:06:34 -07002953TEST_P(MultinodeLoggerTest, LogDifferentConfig) {
2954 time_converter_.StartEqual();
2955 {
2956 LoggerState pi1_logger = MakeLogger(pi1_);
2957 LoggerState pi2_logger = MakeLogger(pi2_);
2958
2959 event_loop_factory_.RunFor(chrono::milliseconds(95));
2960
2961 StartLogger(&pi1_logger);
2962 StartLogger(&pi2_logger);
2963
2964 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2965 }
2966
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07002967 auto sorted_parts = SortParts(logfiles_);
2968 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
2969 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07002970 reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
2971
2972 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2973 log_reader_factory.set_send_delay(chrono::microseconds(0));
2974
2975 // This sends out the fetched messages and advances time to the start of the
2976 // log file.
2977 reader.Register(&log_reader_factory);
2978
2979 const Node *pi1 =
2980 configuration::GetNode(log_reader_factory.configuration(), "pi1");
2981 const Node *pi2 =
2982 configuration::GetNode(log_reader_factory.configuration(), "pi2");
2983
2984 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
2985 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
2986 LOG(INFO) << "now pi1 "
2987 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
2988 LOG(INFO) << "now pi2 "
2989 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
2990
2991 EXPECT_THAT(reader.LoggedNodes(),
2992 ::testing::ElementsAre(
2993 configuration::GetNode(reader.logged_configuration(), pi1),
2994 configuration::GetNode(reader.logged_configuration(), pi2)));
2995
2996 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
2997
2998 // And confirm we can re-create a log again, while checking the contents.
2999 std::vector<std::string> log_files;
3000 {
3001 LoggerState pi1_logger =
3002 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
3003 &log_reader_factory, reader.logged_configuration());
3004 LoggerState pi2_logger =
3005 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
3006 &log_reader_factory, reader.logged_configuration());
3007
3008 pi1_logger.StartLogger(tmp_dir_ + "/relogged1");
3009 pi2_logger.StartLogger(tmp_dir_ + "/relogged2");
3010
3011 log_reader_factory.Run();
3012
3013 for (auto &x : pi1_logger.log_namer->all_filenames()) {
3014 log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged1_", x));
3015 }
3016 for (auto &x : pi2_logger.log_namer->all_filenames()) {
3017 log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged2_", x));
3018 }
3019 }
3020
3021 reader.Deregister();
3022
3023 // And verify that we can run the LogReader over the relogged files without
3024 // hitting any fatal errors.
3025 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003026 auto sorted_parts = SortParts(log_files);
3027 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3028 LogReader relogged_reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07003029 relogged_reader.Register();
3030
3031 relogged_reader.event_loop_factory()->Run();
3032 }
3033}
3034
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003035// Tests that we properly replay a log where the start time for a node is
3036// before any data on the node. This can happen if the logger starts before
3037// data is published. While the scenario below is a bit convoluted, we have
3038// seen logs like this generated out in the wild.
Naman Guptaa63aa132023-03-22 20:06:34 -07003039TEST(MultinodeRebootLoggerTest, StartTimeBeforeData) {
3040 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3041 aos::configuration::ReadConfig(ArtifactPath(
3042 "aos/events/logging/multinode_pingpong_split3_config.json"));
3043 message_bridge::TestingTimeConverter time_converter(
3044 configuration::NodesCount(&config.message()));
3045 SimulatedEventLoopFactory event_loop_factory(&config.message());
3046 event_loop_factory.SetTimeConverter(&time_converter);
3047 NodeEventLoopFactory *const pi1 =
3048 event_loop_factory.GetNodeEventLoopFactory("pi1");
3049 const size_t pi1_index = configuration::GetNodeIndex(
3050 event_loop_factory.configuration(), pi1->node());
3051 NodeEventLoopFactory *const pi2 =
3052 event_loop_factory.GetNodeEventLoopFactory("pi2");
3053 const size_t pi2_index = configuration::GetNodeIndex(
3054 event_loop_factory.configuration(), pi2->node());
3055 NodeEventLoopFactory *const pi3 =
3056 event_loop_factory.GetNodeEventLoopFactory("pi3");
3057 const size_t pi3_index = configuration::GetNodeIndex(
3058 event_loop_factory.configuration(), pi3->node());
3059
3060 const std::string kLogfile1_1 =
3061 aos::testing::TestTmpDir() + "/multi_logfile1/";
3062 const std::string kLogfile2_1 =
3063 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3064 const std::string kLogfile2_2 =
3065 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
3066 const std::string kLogfile3_1 =
3067 aos::testing::TestTmpDir() + "/multi_logfile3/";
3068 util::UnlinkRecursive(kLogfile1_1);
3069 util::UnlinkRecursive(kLogfile2_1);
3070 util::UnlinkRecursive(kLogfile2_2);
3071 util::UnlinkRecursive(kLogfile3_1);
3072 const UUID pi1_boot0 = UUID::Random();
3073 const UUID pi2_boot0 = UUID::Random();
3074 const UUID pi2_boot1 = UUID::Random();
3075 const UUID pi3_boot0 = UUID::Random();
3076 {
3077 CHECK_EQ(pi1_index, 0u);
3078 CHECK_EQ(pi2_index, 1u);
3079 CHECK_EQ(pi3_index, 2u);
3080
3081 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3082 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3083 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3084 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3085
3086 time_converter.AddNextTimestamp(
3087 distributed_clock::epoch(),
3088 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3089 BootTimestamp::epoch()});
3090 const chrono::nanoseconds reboot_time = chrono::milliseconds(20000);
3091 time_converter.AddNextTimestamp(
3092 distributed_clock::epoch() + reboot_time,
3093 {BootTimestamp::epoch() + reboot_time,
3094 BootTimestamp{
3095 .boot = 1,
3096 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)},
3097 BootTimestamp::epoch() + reboot_time});
3098 }
3099
3100 // Make everything perfectly quiet.
3101 event_loop_factory.SkipTimingReport();
3102 event_loop_factory.DisableStatistics();
3103
3104 std::vector<std::string> filenames;
3105 {
3106 LoggerState pi1_logger = MakeLoggerState(
3107 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3108 LoggerState pi3_logger = MakeLoggerState(
3109 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3110 {
3111 // And now start the logger.
3112 LoggerState pi2_logger = MakeLoggerState(
3113 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3114
3115 event_loop_factory.RunFor(chrono::milliseconds(1000));
3116
3117 pi1_logger.StartLogger(kLogfile1_1);
3118 pi3_logger.StartLogger(kLogfile3_1);
3119 pi2_logger.StartLogger(kLogfile2_1);
3120
3121 event_loop_factory.RunFor(chrono::milliseconds(10000));
3122
3123 // Now that we've got a start time in the past, turn on data.
3124 event_loop_factory.EnableStatistics();
3125 std::unique_ptr<aos::EventLoop> ping_event_loop =
3126 pi1->MakeEventLoop("ping");
3127 Ping ping(ping_event_loop.get());
3128
3129 pi2->AlwaysStart<Pong>("pong");
3130
3131 event_loop_factory.RunFor(chrono::milliseconds(3000));
3132
3133 pi2_logger.AppendAllFilenames(&filenames);
3134
3135 // Stop logging on pi2 before rebooting and completely shut off all
3136 // messages on pi2.
3137 pi2->DisableStatistics();
3138 pi1->Disconnect(pi2->node());
3139 pi2->Disconnect(pi1->node());
3140 }
3141 event_loop_factory.RunFor(chrono::milliseconds(7000));
3142 // pi2 now reboots.
3143 {
3144 event_loop_factory.RunFor(chrono::milliseconds(1000));
3145
3146 // Start logging again on pi2 after it is up.
3147 LoggerState pi2_logger = MakeLoggerState(
3148 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3149 pi2_logger.StartLogger(kLogfile2_2);
3150
3151 event_loop_factory.RunFor(chrono::milliseconds(10000));
3152 // And, now that we have a start time in the log, turn data back on.
3153 pi2->EnableStatistics();
3154 pi1->Connect(pi2->node());
3155 pi2->Connect(pi1->node());
3156
3157 pi2->AlwaysStart<Pong>("pong");
3158 std::unique_ptr<aos::EventLoop> ping_event_loop =
3159 pi1->MakeEventLoop("ping");
3160 Ping ping(ping_event_loop.get());
3161
3162 event_loop_factory.RunFor(chrono::milliseconds(3000));
3163
3164 pi2_logger.AppendAllFilenames(&filenames);
3165 }
3166
3167 pi1_logger.AppendAllFilenames(&filenames);
3168 pi3_logger.AppendAllFilenames(&filenames);
3169 }
3170
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003171 // Confirm that we can parse the result. LogReader has enough internal
3172 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07003173 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003174 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003175 auto result = ConfirmReadable(filenames);
3176 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
3177 chrono::seconds(1)));
3178 EXPECT_THAT(result[0].second,
3179 ::testing::ElementsAre(realtime_clock::epoch() +
3180 chrono::microseconds(34990350)));
3181
3182 EXPECT_THAT(result[1].first,
3183 ::testing::ElementsAre(
3184 realtime_clock::epoch() + chrono::seconds(1),
3185 realtime_clock::epoch() + chrono::microseconds(3323000)));
3186 EXPECT_THAT(result[1].second,
3187 ::testing::ElementsAre(
3188 realtime_clock::epoch() + chrono::microseconds(13990200),
3189 realtime_clock::epoch() + chrono::microseconds(16313200)));
3190
3191 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch() +
3192 chrono::seconds(1)));
3193 EXPECT_THAT(result[2].second,
3194 ::testing::ElementsAre(realtime_clock::epoch() +
3195 chrono::microseconds(34900150)));
3196}
3197
3198// Tests that local data before remote data after reboot is properly replayed.
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003199// We only trigger a reboot in the timestamp interpolation function when
3200// solving the timestamp problem when we actually have a point in the
3201// function. This originally only happened when a point passes the noncausal
3202// filter. At the start of time for the second boot, if we aren't careful, we
3203// will have messages which need to be published at times before the boot.
3204// This happens when a local message is in the log before a forwarded message,
3205// so there is no point in the interpolation function. This delays the
3206// reboot. So, we need to recreate that situation and make sure it doesn't
3207// come back.
Naman Guptaa63aa132023-03-22 20:06:34 -07003208TEST(MultinodeRebootLoggerTest,
3209 LocalMessageBeforeRemoteBeforeStartAfterReboot) {
3210 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3211 aos::configuration::ReadConfig(ArtifactPath(
3212 "aos/events/logging/multinode_pingpong_split3_config.json"));
3213 message_bridge::TestingTimeConverter time_converter(
3214 configuration::NodesCount(&config.message()));
3215 SimulatedEventLoopFactory event_loop_factory(&config.message());
3216 event_loop_factory.SetTimeConverter(&time_converter);
3217 NodeEventLoopFactory *const pi1 =
3218 event_loop_factory.GetNodeEventLoopFactory("pi1");
3219 const size_t pi1_index = configuration::GetNodeIndex(
3220 event_loop_factory.configuration(), pi1->node());
3221 NodeEventLoopFactory *const pi2 =
3222 event_loop_factory.GetNodeEventLoopFactory("pi2");
3223 const size_t pi2_index = configuration::GetNodeIndex(
3224 event_loop_factory.configuration(), pi2->node());
3225 NodeEventLoopFactory *const pi3 =
3226 event_loop_factory.GetNodeEventLoopFactory("pi3");
3227 const size_t pi3_index = configuration::GetNodeIndex(
3228 event_loop_factory.configuration(), pi3->node());
3229
3230 const std::string kLogfile1_1 =
3231 aos::testing::TestTmpDir() + "/multi_logfile1/";
3232 const std::string kLogfile2_1 =
3233 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3234 const std::string kLogfile2_2 =
3235 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
3236 const std::string kLogfile3_1 =
3237 aos::testing::TestTmpDir() + "/multi_logfile3/";
3238 util::UnlinkRecursive(kLogfile1_1);
3239 util::UnlinkRecursive(kLogfile2_1);
3240 util::UnlinkRecursive(kLogfile2_2);
3241 util::UnlinkRecursive(kLogfile3_1);
3242 const UUID pi1_boot0 = UUID::Random();
3243 const UUID pi2_boot0 = UUID::Random();
3244 const UUID pi2_boot1 = UUID::Random();
3245 const UUID pi3_boot0 = UUID::Random();
3246 {
3247 CHECK_EQ(pi1_index, 0u);
3248 CHECK_EQ(pi2_index, 1u);
3249 CHECK_EQ(pi3_index, 2u);
3250
3251 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3252 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3253 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3254 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3255
3256 time_converter.AddNextTimestamp(
3257 distributed_clock::epoch(),
3258 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3259 BootTimestamp::epoch()});
3260 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3261 time_converter.AddNextTimestamp(
3262 distributed_clock::epoch() + reboot_time,
3263 {BootTimestamp::epoch() + reboot_time,
3264 BootTimestamp{.boot = 1,
3265 .time = monotonic_clock::epoch() + reboot_time +
3266 chrono::seconds(100)},
3267 BootTimestamp::epoch() + reboot_time});
3268 }
3269
3270 std::vector<std::string> filenames;
3271 {
3272 LoggerState pi1_logger = MakeLoggerState(
3273 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3274 LoggerState pi3_logger = MakeLoggerState(
3275 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3276 {
3277 // And now start the logger.
3278 LoggerState pi2_logger = MakeLoggerState(
3279 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3280
3281 pi1_logger.StartLogger(kLogfile1_1);
3282 pi3_logger.StartLogger(kLogfile3_1);
3283 pi2_logger.StartLogger(kLogfile2_1);
3284
3285 event_loop_factory.RunFor(chrono::milliseconds(1005));
3286
3287 // Now that we've got a start time in the past, turn on data.
3288 std::unique_ptr<aos::EventLoop> ping_event_loop =
3289 pi1->MakeEventLoop("ping");
3290 Ping ping(ping_event_loop.get());
3291
3292 pi2->AlwaysStart<Pong>("pong");
3293
3294 event_loop_factory.RunFor(chrono::milliseconds(3000));
3295
3296 pi2_logger.AppendAllFilenames(&filenames);
3297
3298 // Disable any remote messages on pi2.
3299 pi1->Disconnect(pi2->node());
3300 pi2->Disconnect(pi1->node());
3301 }
3302 event_loop_factory.RunFor(chrono::milliseconds(995));
3303 // pi2 now reboots at 5 seconds.
3304 {
3305 event_loop_factory.RunFor(chrono::milliseconds(1000));
3306
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003307 // Make local stuff happen before we start logging and connect the
3308 // remote.
Naman Guptaa63aa132023-03-22 20:06:34 -07003309 pi2->AlwaysStart<Pong>("pong");
3310 std::unique_ptr<aos::EventLoop> ping_event_loop =
3311 pi1->MakeEventLoop("ping");
3312 Ping ping(ping_event_loop.get());
3313 event_loop_factory.RunFor(chrono::milliseconds(1005));
3314
3315 // Start logging again on pi2 after it is up.
3316 LoggerState pi2_logger = MakeLoggerState(
3317 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3318 pi2_logger.StartLogger(kLogfile2_2);
3319
3320 // And allow remote messages now that we have some local ones.
3321 pi1->Connect(pi2->node());
3322 pi2->Connect(pi1->node());
3323
3324 event_loop_factory.RunFor(chrono::milliseconds(1000));
3325
3326 event_loop_factory.RunFor(chrono::milliseconds(3000));
3327
3328 pi2_logger.AppendAllFilenames(&filenames);
3329 }
3330
3331 pi1_logger.AppendAllFilenames(&filenames);
3332 pi3_logger.AppendAllFilenames(&filenames);
3333 }
3334
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003335 // Confirm that we can parse the result. LogReader has enough internal
3336 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07003337 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003338 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003339 auto result = ConfirmReadable(filenames);
3340
3341 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3342 EXPECT_THAT(result[0].second,
3343 ::testing::ElementsAre(realtime_clock::epoch() +
3344 chrono::microseconds(11000350)));
3345
3346 EXPECT_THAT(result[1].first,
3347 ::testing::ElementsAre(
3348 realtime_clock::epoch(),
3349 realtime_clock::epoch() + chrono::microseconds(107005000)));
3350 EXPECT_THAT(result[1].second,
3351 ::testing::ElementsAre(
3352 realtime_clock::epoch() + chrono::microseconds(4000150),
3353 realtime_clock::epoch() + chrono::microseconds(111000200)));
3354
3355 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3356 EXPECT_THAT(result[2].second,
3357 ::testing::ElementsAre(realtime_clock::epoch() +
3358 chrono::microseconds(11000150)));
3359
3360 auto start_stop_result = ConfirmReadable(
3361 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3362 realtime_clock::epoch() + chrono::milliseconds(3000));
3363
3364 EXPECT_THAT(
3365 start_stop_result[0].first,
3366 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3367 EXPECT_THAT(
3368 start_stop_result[0].second,
3369 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3370 EXPECT_THAT(
3371 start_stop_result[1].first,
3372 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3373 EXPECT_THAT(
3374 start_stop_result[1].second,
3375 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3376 EXPECT_THAT(
3377 start_stop_result[2].first,
3378 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3379 EXPECT_THAT(
3380 start_stop_result[2].second,
3381 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3382}
3383
3384// Tests that setting the start and stop flags across a reboot works as
3385// expected.
3386TEST(MultinodeRebootLoggerTest, RebootStartStopTimes) {
3387 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3388 aos::configuration::ReadConfig(ArtifactPath(
3389 "aos/events/logging/multinode_pingpong_split3_config.json"));
3390 message_bridge::TestingTimeConverter time_converter(
3391 configuration::NodesCount(&config.message()));
3392 SimulatedEventLoopFactory event_loop_factory(&config.message());
3393 event_loop_factory.SetTimeConverter(&time_converter);
3394 NodeEventLoopFactory *const pi1 =
3395 event_loop_factory.GetNodeEventLoopFactory("pi1");
3396 const size_t pi1_index = configuration::GetNodeIndex(
3397 event_loop_factory.configuration(), pi1->node());
3398 NodeEventLoopFactory *const pi2 =
3399 event_loop_factory.GetNodeEventLoopFactory("pi2");
3400 const size_t pi2_index = configuration::GetNodeIndex(
3401 event_loop_factory.configuration(), pi2->node());
3402 NodeEventLoopFactory *const pi3 =
3403 event_loop_factory.GetNodeEventLoopFactory("pi3");
3404 const size_t pi3_index = configuration::GetNodeIndex(
3405 event_loop_factory.configuration(), pi3->node());
3406
3407 const std::string kLogfile1_1 =
3408 aos::testing::TestTmpDir() + "/multi_logfile1/";
3409 const std::string kLogfile2_1 =
3410 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3411 const std::string kLogfile2_2 =
3412 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
3413 const std::string kLogfile3_1 =
3414 aos::testing::TestTmpDir() + "/multi_logfile3/";
3415 util::UnlinkRecursive(kLogfile1_1);
3416 util::UnlinkRecursive(kLogfile2_1);
3417 util::UnlinkRecursive(kLogfile2_2);
3418 util::UnlinkRecursive(kLogfile3_1);
3419 {
3420 CHECK_EQ(pi1_index, 0u);
3421 CHECK_EQ(pi2_index, 1u);
3422 CHECK_EQ(pi3_index, 2u);
3423
3424 time_converter.AddNextTimestamp(
3425 distributed_clock::epoch(),
3426 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3427 BootTimestamp::epoch()});
3428 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3429 time_converter.AddNextTimestamp(
3430 distributed_clock::epoch() + reboot_time,
3431 {BootTimestamp::epoch() + reboot_time,
3432 BootTimestamp{.boot = 1,
3433 .time = monotonic_clock::epoch() + reboot_time},
3434 BootTimestamp::epoch() + reboot_time});
3435 }
3436
3437 std::vector<std::string> filenames;
3438 {
3439 LoggerState pi1_logger = MakeLoggerState(
3440 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3441 LoggerState pi3_logger = MakeLoggerState(
3442 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3443 {
3444 // And now start the logger.
3445 LoggerState pi2_logger = MakeLoggerState(
3446 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3447
3448 pi1_logger.StartLogger(kLogfile1_1);
3449 pi3_logger.StartLogger(kLogfile3_1);
3450 pi2_logger.StartLogger(kLogfile2_1);
3451
3452 event_loop_factory.RunFor(chrono::milliseconds(1005));
3453
3454 // Now that we've got a start time in the past, turn on data.
3455 std::unique_ptr<aos::EventLoop> ping_event_loop =
3456 pi1->MakeEventLoop("ping");
3457 Ping ping(ping_event_loop.get());
3458
3459 pi2->AlwaysStart<Pong>("pong");
3460
3461 event_loop_factory.RunFor(chrono::milliseconds(3000));
3462
3463 pi2_logger.AppendAllFilenames(&filenames);
3464 }
3465 event_loop_factory.RunFor(chrono::milliseconds(995));
3466 // pi2 now reboots at 5 seconds.
3467 {
3468 event_loop_factory.RunFor(chrono::milliseconds(1000));
3469
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003470 // Make local stuff happen before we start logging and connect the
3471 // remote.
Naman Guptaa63aa132023-03-22 20:06:34 -07003472 pi2->AlwaysStart<Pong>("pong");
3473 std::unique_ptr<aos::EventLoop> ping_event_loop =
3474 pi1->MakeEventLoop("ping");
3475 Ping ping(ping_event_loop.get());
3476 event_loop_factory.RunFor(chrono::milliseconds(5));
3477
3478 // Start logging again on pi2 after it is up.
3479 LoggerState pi2_logger = MakeLoggerState(
3480 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3481 pi2_logger.StartLogger(kLogfile2_2);
3482
3483 event_loop_factory.RunFor(chrono::milliseconds(5000));
3484
3485 pi2_logger.AppendAllFilenames(&filenames);
3486 }
3487
3488 pi1_logger.AppendAllFilenames(&filenames);
3489 pi3_logger.AppendAllFilenames(&filenames);
3490 }
3491
3492 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003493 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003494 auto result = ConfirmReadable(filenames);
3495
3496 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3497 EXPECT_THAT(result[0].second,
3498 ::testing::ElementsAre(realtime_clock::epoch() +
3499 chrono::microseconds(11000350)));
3500
3501 EXPECT_THAT(result[1].first,
3502 ::testing::ElementsAre(
3503 realtime_clock::epoch(),
3504 realtime_clock::epoch() + chrono::microseconds(6005000)));
3505 EXPECT_THAT(result[1].second,
3506 ::testing::ElementsAre(
3507 realtime_clock::epoch() + chrono::microseconds(4900150),
3508 realtime_clock::epoch() + chrono::microseconds(11000200)));
3509
3510 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3511 EXPECT_THAT(result[2].second,
3512 ::testing::ElementsAre(realtime_clock::epoch() +
3513 chrono::microseconds(11000150)));
3514
3515 // Confirm we observed the correct start and stop times. We should see the
3516 // reboot here.
3517 auto start_stop_result = ConfirmReadable(
3518 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3519 realtime_clock::epoch() + chrono::milliseconds(8000));
3520
3521 EXPECT_THAT(
3522 start_stop_result[0].first,
3523 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3524 EXPECT_THAT(
3525 start_stop_result[0].second,
3526 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3527 EXPECT_THAT(start_stop_result[1].first,
3528 ::testing::ElementsAre(
3529 realtime_clock::epoch() + chrono::seconds(2),
3530 realtime_clock::epoch() + chrono::microseconds(6005000)));
3531 EXPECT_THAT(start_stop_result[1].second,
3532 ::testing::ElementsAre(
3533 realtime_clock::epoch() + chrono::microseconds(4900150),
3534 realtime_clock::epoch() + chrono::seconds(8)));
3535 EXPECT_THAT(
3536 start_stop_result[2].first,
3537 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3538 EXPECT_THAT(
3539 start_stop_result[2].second,
3540 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3541}
3542
3543// Tests that we properly handle one direction being down.
3544TEST(MissingDirectionTest, OneDirection) {
3545 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3546 aos::configuration::ReadConfig(ArtifactPath(
3547 "aos/events/logging/multinode_pingpong_split4_config.json"));
3548 message_bridge::TestingTimeConverter time_converter(
3549 configuration::NodesCount(&config.message()));
3550 SimulatedEventLoopFactory event_loop_factory(&config.message());
3551 event_loop_factory.SetTimeConverter(&time_converter);
3552
3553 NodeEventLoopFactory *const pi1 =
3554 event_loop_factory.GetNodeEventLoopFactory("pi1");
3555 const size_t pi1_index = configuration::GetNodeIndex(
3556 event_loop_factory.configuration(), pi1->node());
3557 NodeEventLoopFactory *const pi2 =
3558 event_loop_factory.GetNodeEventLoopFactory("pi2");
3559 const size_t pi2_index = configuration::GetNodeIndex(
3560 event_loop_factory.configuration(), pi2->node());
3561 std::vector<std::string> filenames;
3562
3563 {
3564 CHECK_EQ(pi1_index, 0u);
3565 CHECK_EQ(pi2_index, 1u);
3566
3567 time_converter.AddNextTimestamp(
3568 distributed_clock::epoch(),
3569 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3570
3571 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3572 time_converter.AddNextTimestamp(
3573 distributed_clock::epoch() + reboot_time,
3574 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3575 BootTimestamp::epoch() + reboot_time});
3576 }
3577
3578 const std::string kLogfile2_1 =
3579 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3580 const std::string kLogfile1_1 =
3581 aos::testing::TestTmpDir() + "/multi_logfile1.1/";
3582 util::UnlinkRecursive(kLogfile2_1);
3583 util::UnlinkRecursive(kLogfile1_1);
3584
3585 pi2->Disconnect(pi1->node());
3586
3587 pi1->AlwaysStart<Ping>("ping");
3588 pi2->AlwaysStart<Pong>("pong");
3589
3590 {
3591 LoggerState pi2_logger = MakeLoggerState(
3592 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3593
3594 event_loop_factory.RunFor(chrono::milliseconds(95));
3595
3596 pi2_logger.StartLogger(kLogfile2_1);
3597
3598 event_loop_factory.RunFor(chrono::milliseconds(6000));
3599
3600 pi2->Connect(pi1->node());
3601
3602 LoggerState pi1_logger = MakeLoggerState(
3603 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3604 pi1_logger.StartLogger(kLogfile1_1);
3605
3606 event_loop_factory.RunFor(chrono::milliseconds(5000));
3607 pi1_logger.AppendAllFilenames(&filenames);
3608 pi2_logger.AppendAllFilenames(&filenames);
3609 }
3610
3611 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003612 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003613 ConfirmReadable(filenames);
3614}
3615
3616// Tests that we properly handle only one direction ever existing after a
3617// reboot.
3618TEST(MissingDirectionTest, OneDirectionAfterReboot) {
3619 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3620 aos::configuration::ReadConfig(ArtifactPath(
3621 "aos/events/logging/multinode_pingpong_split4_config.json"));
3622 message_bridge::TestingTimeConverter time_converter(
3623 configuration::NodesCount(&config.message()));
3624 SimulatedEventLoopFactory event_loop_factory(&config.message());
3625 event_loop_factory.SetTimeConverter(&time_converter);
3626
3627 NodeEventLoopFactory *const pi1 =
3628 event_loop_factory.GetNodeEventLoopFactory("pi1");
3629 const size_t pi1_index = configuration::GetNodeIndex(
3630 event_loop_factory.configuration(), pi1->node());
3631 NodeEventLoopFactory *const pi2 =
3632 event_loop_factory.GetNodeEventLoopFactory("pi2");
3633 const size_t pi2_index = configuration::GetNodeIndex(
3634 event_loop_factory.configuration(), pi2->node());
3635 std::vector<std::string> filenames;
3636
3637 {
3638 CHECK_EQ(pi1_index, 0u);
3639 CHECK_EQ(pi2_index, 1u);
3640
3641 time_converter.AddNextTimestamp(
3642 distributed_clock::epoch(),
3643 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3644
3645 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3646 time_converter.AddNextTimestamp(
3647 distributed_clock::epoch() + reboot_time,
3648 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3649 BootTimestamp::epoch() + reboot_time});
3650 }
3651
3652 const std::string kLogfile2_1 =
3653 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3654 util::UnlinkRecursive(kLogfile2_1);
3655
3656 pi1->AlwaysStart<Ping>("ping");
3657
3658 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3659 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3660 // second boot.
3661 {
3662 LoggerState pi2_logger = MakeLoggerState(
3663 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3664
3665 event_loop_factory.RunFor(chrono::milliseconds(95));
3666
3667 pi2_logger.StartLogger(kLogfile2_1);
3668
3669 event_loop_factory.RunFor(chrono::milliseconds(4000));
3670
3671 pi2->Disconnect(pi1->node());
3672
3673 event_loop_factory.RunFor(chrono::milliseconds(1000));
3674 pi1->AlwaysStart<Ping>("ping");
3675
3676 event_loop_factory.RunFor(chrono::milliseconds(5000));
3677 pi2_logger.AppendAllFilenames(&filenames);
3678 }
3679
3680 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003681 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003682 ConfirmReadable(filenames);
3683}
3684
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003685// Tests that we properly handle only one direction ever existing after a
3686// reboot with only reliable data.
Naman Guptaa63aa132023-03-22 20:06:34 -07003687TEST(MissingDirectionTest, OneDirectionAfterRebootReliable) {
3688 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003689 aos::configuration::ReadConfig(
3690 ArtifactPath("aos/events/logging/"
3691 "multinode_pingpong_split4_reliable_config.json"));
Naman Guptaa63aa132023-03-22 20:06:34 -07003692 message_bridge::TestingTimeConverter time_converter(
3693 configuration::NodesCount(&config.message()));
3694 SimulatedEventLoopFactory event_loop_factory(&config.message());
3695 event_loop_factory.SetTimeConverter(&time_converter);
3696
3697 NodeEventLoopFactory *const pi1 =
3698 event_loop_factory.GetNodeEventLoopFactory("pi1");
3699 const size_t pi1_index = configuration::GetNodeIndex(
3700 event_loop_factory.configuration(), pi1->node());
3701 NodeEventLoopFactory *const pi2 =
3702 event_loop_factory.GetNodeEventLoopFactory("pi2");
3703 const size_t pi2_index = configuration::GetNodeIndex(
3704 event_loop_factory.configuration(), pi2->node());
3705 std::vector<std::string> filenames;
3706
3707 {
3708 CHECK_EQ(pi1_index, 0u);
3709 CHECK_EQ(pi2_index, 1u);
3710
3711 time_converter.AddNextTimestamp(
3712 distributed_clock::epoch(),
3713 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3714
3715 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3716 time_converter.AddNextTimestamp(
3717 distributed_clock::epoch() + reboot_time,
3718 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3719 BootTimestamp::epoch() + reboot_time});
3720 }
3721
3722 const std::string kLogfile2_1 =
3723 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3724 util::UnlinkRecursive(kLogfile2_1);
3725
3726 pi1->AlwaysStart<Ping>("ping");
3727
3728 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3729 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3730 // second boot.
3731 {
3732 LoggerState pi2_logger = MakeLoggerState(
3733 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3734
3735 event_loop_factory.RunFor(chrono::milliseconds(95));
3736
3737 pi2_logger.StartLogger(kLogfile2_1);
3738
3739 event_loop_factory.RunFor(chrono::milliseconds(4000));
3740
3741 pi2->Disconnect(pi1->node());
3742
3743 event_loop_factory.RunFor(chrono::milliseconds(1000));
3744 pi1->AlwaysStart<Ping>("ping");
3745
3746 event_loop_factory.RunFor(chrono::milliseconds(5000));
3747 pi2_logger.AppendAllFilenames(&filenames);
3748 }
3749
3750 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003751 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003752 ConfirmReadable(filenames);
3753}
3754
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003755// Tests that we properly handle only one direction ever existing after a
3756// reboot with mixed unreliable vs reliable, where reliable has an earlier
3757// timestamp than unreliable.
Brian Smartte67d7112023-03-20 12:06:30 -07003758TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase1) {
3759 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3760 aos::configuration::ReadConfig(ArtifactPath(
3761 "aos/events/logging/multinode_pingpong_split4_mixed1_config.json"));
3762 message_bridge::TestingTimeConverter time_converter(
3763 configuration::NodesCount(&config.message()));
3764 SimulatedEventLoopFactory event_loop_factory(&config.message());
3765 event_loop_factory.SetTimeConverter(&time_converter);
3766
3767 NodeEventLoopFactory *const pi1 =
3768 event_loop_factory.GetNodeEventLoopFactory("pi1");
3769 const size_t pi1_index = configuration::GetNodeIndex(
3770 event_loop_factory.configuration(), pi1->node());
3771 NodeEventLoopFactory *const pi2 =
3772 event_loop_factory.GetNodeEventLoopFactory("pi2");
3773 const size_t pi2_index = configuration::GetNodeIndex(
3774 event_loop_factory.configuration(), pi2->node());
3775 std::vector<std::string> filenames;
3776
3777 {
3778 CHECK_EQ(pi1_index, 0u);
3779 CHECK_EQ(pi2_index, 1u);
3780
3781 time_converter.AddNextTimestamp(
3782 distributed_clock::epoch(),
3783 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3784
3785 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3786 time_converter.AddNextTimestamp(
3787 distributed_clock::epoch() + reboot_time,
3788 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3789 BootTimestamp::epoch() + reboot_time});
3790 }
3791
3792 const std::string kLogfile2_1 =
3793 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3794 util::UnlinkRecursive(kLogfile2_1);
3795
3796 // The following sequence using the above reference config creates
3797 // a reliable message timestamp < unreliable message timestamp.
3798 {
3799 pi1->DisableStatistics();
3800 pi2->DisableStatistics();
3801
3802 event_loop_factory.RunFor(chrono::milliseconds(95));
3803
3804 pi1->AlwaysStart<Ping>("ping");
3805
3806 event_loop_factory.RunFor(chrono::milliseconds(5250));
3807
3808 pi1->EnableStatistics();
3809
3810 event_loop_factory.RunFor(chrono::milliseconds(1000));
3811
3812 LoggerState pi2_logger = MakeLoggerState(
3813 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3814
3815 pi2_logger.StartLogger(kLogfile2_1);
3816
3817 event_loop_factory.RunFor(chrono::milliseconds(5000));
3818 pi2_logger.AppendAllFilenames(&filenames);
3819 }
3820
3821 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003822 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Brian Smartte67d7112023-03-20 12:06:30 -07003823 ConfirmReadable(filenames);
3824}
3825
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003826// Tests that we properly handle only one direction ever existing after a
3827// reboot with mixed unreliable vs reliable, where unreliable has an earlier
3828// timestamp than reliable.
Brian Smartte67d7112023-03-20 12:06:30 -07003829TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase2) {
3830 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3831 aos::configuration::ReadConfig(ArtifactPath(
3832 "aos/events/logging/multinode_pingpong_split4_mixed2_config.json"));
3833 message_bridge::TestingTimeConverter time_converter(
3834 configuration::NodesCount(&config.message()));
3835 SimulatedEventLoopFactory event_loop_factory(&config.message());
3836 event_loop_factory.SetTimeConverter(&time_converter);
3837
3838 NodeEventLoopFactory *const pi1 =
3839 event_loop_factory.GetNodeEventLoopFactory("pi1");
3840 const size_t pi1_index = configuration::GetNodeIndex(
3841 event_loop_factory.configuration(), pi1->node());
3842 NodeEventLoopFactory *const pi2 =
3843 event_loop_factory.GetNodeEventLoopFactory("pi2");
3844 const size_t pi2_index = configuration::GetNodeIndex(
3845 event_loop_factory.configuration(), pi2->node());
3846 std::vector<std::string> filenames;
3847
3848 {
3849 CHECK_EQ(pi1_index, 0u);
3850 CHECK_EQ(pi2_index, 1u);
3851
3852 time_converter.AddNextTimestamp(
3853 distributed_clock::epoch(),
3854 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3855
3856 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3857 time_converter.AddNextTimestamp(
3858 distributed_clock::epoch() + reboot_time,
3859 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3860 BootTimestamp::epoch() + reboot_time});
3861 }
3862
3863 const std::string kLogfile2_1 =
3864 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3865 util::UnlinkRecursive(kLogfile2_1);
3866
3867 // The following sequence using the above reference config creates
3868 // an unreliable message timestamp < reliable message timestamp.
3869 {
3870 pi1->DisableStatistics();
3871 pi2->DisableStatistics();
3872
3873 event_loop_factory.RunFor(chrono::milliseconds(95));
3874
3875 pi1->AlwaysStart<Ping>("ping");
3876
3877 event_loop_factory.RunFor(chrono::milliseconds(5250));
3878
3879 pi1->EnableStatistics();
3880
3881 event_loop_factory.RunFor(chrono::milliseconds(1000));
3882
3883 LoggerState pi2_logger = MakeLoggerState(
3884 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3885
3886 pi2_logger.StartLogger(kLogfile2_1);
3887
3888 event_loop_factory.RunFor(chrono::milliseconds(5000));
3889 pi2_logger.AppendAllFilenames(&filenames);
3890 }
3891
3892 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003893 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Brian Smartte67d7112023-03-20 12:06:30 -07003894 ConfirmReadable(filenames);
3895}
3896
Naman Guptaa63aa132023-03-22 20:06:34 -07003897// Tests that we properly handle what used to be a time violation in one
3898// direction. This can occur when one direction goes down after sending some
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003899// data, but the other keeps working. The down direction ends up resolving to
3900// a straight line in the noncausal filter, where the direction which is still
3901// up can cross that line. Really, time progressed along just fine but we
3902// assumed that the offset was a line when it could have deviated by up to
3903// 1ms/second.
Naman Guptaa63aa132023-03-22 20:06:34 -07003904TEST_P(MultinodeLoggerTest, OneDirectionTimeDrift) {
3905 std::vector<std::string> filenames;
3906
3907 CHECK_EQ(pi1_index_, 0u);
3908 CHECK_EQ(pi2_index_, 1u);
3909
3910 time_converter_.AddNextTimestamp(
3911 distributed_clock::epoch(),
3912 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3913
3914 const chrono::nanoseconds before_disconnect_duration =
3915 time_converter_.AddMonotonic(
3916 {chrono::milliseconds(1000), chrono::milliseconds(1000)});
3917
3918 const chrono::nanoseconds test_duration =
3919 time_converter_.AddMonotonic(
3920 {chrono::milliseconds(1000), chrono::milliseconds(1000)}) +
3921 time_converter_.AddMonotonic(
3922 {chrono::milliseconds(10000),
3923 chrono::milliseconds(10000) - chrono::milliseconds(5)}) +
3924 time_converter_.AddMonotonic(
3925 {chrono::milliseconds(10000),
3926 chrono::milliseconds(10000) + chrono::milliseconds(5)});
3927
3928 const std::string kLogfile =
3929 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3930 util::UnlinkRecursive(kLogfile);
3931
3932 {
3933 LoggerState pi2_logger = MakeLogger(pi2_);
3934 pi2_logger.StartLogger(kLogfile);
3935 event_loop_factory_.RunFor(before_disconnect_duration);
3936
3937 pi2_->Disconnect(pi1_->node());
3938
3939 event_loop_factory_.RunFor(test_duration);
3940 pi2_->Connect(pi1_->node());
3941
3942 event_loop_factory_.RunFor(chrono::milliseconds(5000));
3943 pi2_logger.AppendAllFilenames(&filenames);
3944 }
3945
3946 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003947 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003948 ConfirmReadable(filenames);
3949}
3950
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003951// Tests that we can replay a logfile that has timestamps such that at least
3952// one node's epoch is at a positive distributed_clock (and thus will have to
3953// be booted after the other node(s)).
Naman Guptaa63aa132023-03-22 20:06:34 -07003954TEST_P(MultinodeLoggerTest, StartOneNodeBeforeOther) {
3955 std::vector<std::string> filenames;
3956
3957 CHECK_EQ(pi1_index_, 0u);
3958 CHECK_EQ(pi2_index_, 1u);
3959
3960 time_converter_.AddNextTimestamp(
3961 distributed_clock::epoch(),
3962 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3963
3964 const chrono::nanoseconds before_reboot_duration = chrono::milliseconds(1000);
3965 time_converter_.RebootAt(
3966 0, distributed_clock::time_point(before_reboot_duration));
3967
3968 const chrono::nanoseconds test_duration = time_converter_.AddMonotonic(
3969 {chrono::milliseconds(10000), chrono::milliseconds(10000)});
3970
3971 const std::string kLogfile =
3972 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3973 util::UnlinkRecursive(kLogfile);
3974
3975 pi2_->Disconnect(pi1_->node());
3976 pi1_->Disconnect(pi2_->node());
3977
3978 {
3979 LoggerState pi2_logger = MakeLogger(pi2_);
3980
3981 pi2_logger.StartLogger(kLogfile);
3982 event_loop_factory_.RunFor(before_reboot_duration);
3983
3984 pi2_->Connect(pi1_->node());
3985 pi1_->Connect(pi2_->node());
3986
3987 event_loop_factory_.RunFor(test_duration);
3988
3989 pi2_logger.AppendAllFilenames(&filenames);
3990 }
3991
3992 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003993 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003994 ConfirmReadable(filenames);
3995
3996 {
3997 LogReader reader(sorted_parts);
3998 SimulatedEventLoopFactory replay_factory(reader.configuration());
3999 reader.RegisterWithoutStarting(&replay_factory);
4000
4001 NodeEventLoopFactory *const replay_node =
4002 reader.event_loop_factory()->GetNodeEventLoopFactory("pi1");
4003
4004 std::unique_ptr<EventLoop> test_event_loop =
4005 replay_node->MakeEventLoop("test_reader");
4006 replay_node->OnStartup([replay_node]() {
4007 // Check that we didn't boot until at least t=0.
4008 CHECK_LE(monotonic_clock::epoch(), replay_node->monotonic_now());
4009 });
4010 test_event_loop->OnRun([&test_event_loop]() {
4011 // Check that we didn't boot until at least t=0.
4012 EXPECT_LE(monotonic_clock::epoch(), test_event_loop->monotonic_now());
4013 });
4014 reader.event_loop_factory()->Run();
4015 reader.Deregister();
4016 }
4017}
4018
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004019// Tests that when we have a loop without all the logs at all points in time,
4020// we can sort it properly.
Naman Guptaa63aa132023-03-22 20:06:34 -07004021TEST(MultinodeLoggerLoopTest, Loop) {
4022 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004023 aos::configuration::ReadConfig(
4024 ArtifactPath("aos/events/logging/"
4025 "multinode_pingpong_triangle_split_config.json"));
Naman Guptaa63aa132023-03-22 20:06:34 -07004026 message_bridge::TestingTimeConverter time_converter(
4027 configuration::NodesCount(&config.message()));
4028 SimulatedEventLoopFactory event_loop_factory(&config.message());
4029 event_loop_factory.SetTimeConverter(&time_converter);
4030
4031 NodeEventLoopFactory *const pi1 =
4032 event_loop_factory.GetNodeEventLoopFactory("pi1");
4033 NodeEventLoopFactory *const pi2 =
4034 event_loop_factory.GetNodeEventLoopFactory("pi2");
4035 NodeEventLoopFactory *const pi3 =
4036 event_loop_factory.GetNodeEventLoopFactory("pi3");
4037
4038 const std::string kLogfile1_1 =
4039 aos::testing::TestTmpDir() + "/multi_logfile1/";
4040 const std::string kLogfile2_1 =
4041 aos::testing::TestTmpDir() + "/multi_logfile2/";
4042 const std::string kLogfile3_1 =
4043 aos::testing::TestTmpDir() + "/multi_logfile3/";
4044 util::UnlinkRecursive(kLogfile1_1);
4045 util::UnlinkRecursive(kLogfile2_1);
4046 util::UnlinkRecursive(kLogfile3_1);
4047
4048 {
4049 // Make pi1 boot before everything else.
4050 time_converter.AddNextTimestamp(
4051 distributed_clock::epoch(),
4052 {BootTimestamp::epoch(),
4053 BootTimestamp::epoch() - chrono::milliseconds(100),
4054 BootTimestamp::epoch() - chrono::milliseconds(300)});
4055 }
4056
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004057 // We want to setup a situation such that 2 of the 3 legs of the loop are
4058 // very confident about time being X, and the third leg is pulling the
4059 // average off to one side.
Naman Guptaa63aa132023-03-22 20:06:34 -07004060 //
4061 // It's easiest to visualize this in timestamp_plotter.
4062
4063 std::vector<std::string> filenames;
4064 {
4065 // Have pi1 send out a reliable message at startup. This sets up a long
4066 // forwarding time message at the start to bias time.
4067 std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
4068 {
4069 aos::Sender<examples::Ping> ping_sender =
4070 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
4071
4072 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
4073 examples::Ping::Builder ping_builder =
4074 builder.MakeBuilder<examples::Ping>();
4075 CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
4076 }
4077
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004078 // Wait a while so there's enough data to let the worst case be rather
4079 // off.
Naman Guptaa63aa132023-03-22 20:06:34 -07004080 event_loop_factory.RunFor(chrono::seconds(1000));
4081
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004082 // Now start a receiving node first. This sets up 2 tight bounds between
4083 // 2 of the nodes.
Naman Guptaa63aa132023-03-22 20:06:34 -07004084 LoggerState pi2_logger = MakeLoggerState(
4085 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4086 pi2_logger.StartLogger(kLogfile2_1);
4087
4088 event_loop_factory.RunFor(chrono::seconds(100));
4089
4090 // And now start the third leg.
4091 LoggerState pi3_logger = MakeLoggerState(
4092 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4093 pi3_logger.StartLogger(kLogfile3_1);
4094
4095 LoggerState pi1_logger = MakeLoggerState(
4096 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4097 pi1_logger.StartLogger(kLogfile1_1);
4098
4099 event_loop_factory.RunFor(chrono::seconds(100));
4100
4101 pi1_logger.AppendAllFilenames(&filenames);
4102 pi2_logger.AppendAllFilenames(&filenames);
4103 pi3_logger.AppendAllFilenames(&filenames);
4104 }
4105
4106 // Make sure we can read this.
4107 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004108 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004109 auto result = ConfirmReadable(filenames);
4110}
4111
Austin Schuh08dba8f2023-05-01 08:29:30 -07004112// Tests that RestartLogging works in the simple case. Unfortunately, the
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004113// failure cases involve simulating time elapsing in callbacks, which is
4114// really hard. The best we can reasonably do is make sure 2 back to back
4115// logs are parseable together.
Austin Schuh08dba8f2023-05-01 08:29:30 -07004116TEST_P(MultinodeLoggerTest, RestartLogging) {
4117 time_converter_.AddMonotonic(
4118 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
4119 std::vector<std::string> filenames;
4120 {
4121 LoggerState pi1_logger = MakeLogger(pi1_);
4122
4123 event_loop_factory_.RunFor(chrono::milliseconds(95));
4124
4125 StartLogger(&pi1_logger, logfile_base1_);
4126 aos::monotonic_clock::time_point last_rotation_time =
4127 pi1_logger.event_loop->monotonic_now();
Austin Schuh2f864452023-07-17 14:53:08 -07004128 pi1_logger.logger->set_on_logged_period(
4129 [&](aos::monotonic_clock::time_point) {
4130 const auto now = pi1_logger.event_loop->monotonic_now();
4131 if (now > last_rotation_time + std::chrono::seconds(5)) {
4132 pi1_logger.AppendAllFilenames(&filenames);
4133 std::unique_ptr<MultiNodeFilesLogNamer> namer =
4134 pi1_logger.MakeLogNamer(logfile_base2_);
4135 pi1_logger.log_namer = namer.get();
Austin Schuh08dba8f2023-05-01 08:29:30 -07004136
Austin Schuh2f864452023-07-17 14:53:08 -07004137 pi1_logger.logger->RestartLogging(std::move(namer));
4138 last_rotation_time = now;
4139 }
4140 });
Austin Schuh08dba8f2023-05-01 08:29:30 -07004141
4142 event_loop_factory_.RunFor(chrono::milliseconds(7000));
4143
4144 pi1_logger.AppendAllFilenames(&filenames);
4145 }
4146
4147 for (const auto &x : filenames) {
4148 LOG(INFO) << x;
4149 }
4150
4151 EXPECT_GE(filenames.size(), 2u);
4152
4153 ConfirmReadable(filenames);
4154
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004155 // TODO(austin): It would be good to confirm that any one time messages end
4156 // up in both logs correctly.
Austin Schuh08dba8f2023-05-01 08:29:30 -07004157}
4158
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004159// Tests that when we have evidence of 2 boots, and then start logging, the
4160// max_out_of_order_duration ends up reasonable on the boot with the start time.
4161TEST(MultinodeLoggerLoopTest, PreviousBootData) {
4162 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4163 aos::configuration::ReadConfig(ArtifactPath(
4164 "aos/events/logging/multinode_pingpong_reboot_ooo_config.json"));
4165 message_bridge::TestingTimeConverter time_converter(
4166 configuration::NodesCount(&config.message()));
4167 SimulatedEventLoopFactory event_loop_factory(&config.message());
4168 event_loop_factory.SetTimeConverter(&time_converter);
4169
4170 const UUID pi1_boot0 = UUID::Random();
4171 const UUID pi2_boot0 = UUID::Random();
4172 const UUID pi2_boot1 = UUID::Random();
4173
4174 const std::string kLogfile1_1 =
4175 aos::testing::TestTmpDir() + "/multi_logfile1/";
4176 util::UnlinkRecursive(kLogfile1_1);
4177
4178 {
4179 constexpr size_t kPi1Index = 0;
4180 constexpr size_t kPi2Index = 1;
4181 time_converter.set_boot_uuid(kPi1Index, 0, pi1_boot0);
4182 time_converter.set_boot_uuid(kPi2Index, 0, pi2_boot0);
4183 time_converter.set_boot_uuid(kPi2Index, 1, pi2_boot1);
4184
4185 // Make pi1 boot before everything else.
4186 time_converter.AddNextTimestamp(
4187 distributed_clock::epoch(),
4188 {BootTimestamp::epoch(),
4189 BootTimestamp::epoch() - chrono::milliseconds(100)});
4190
4191 const chrono::nanoseconds reboot_time = chrono::seconds(1005);
4192 time_converter.AddNextTimestamp(
4193 distributed_clock::epoch() + reboot_time,
4194 {BootTimestamp::epoch() + reboot_time,
4195 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()}});
4196 }
4197
4198 NodeEventLoopFactory *const pi1 =
4199 event_loop_factory.GetNodeEventLoopFactory("pi1");
4200 NodeEventLoopFactory *const pi2 =
4201 event_loop_factory.GetNodeEventLoopFactory("pi2");
4202
4203 // What we want is for pi2 to send a message at t=1000 on the first channel
4204 // (/atest1 pong), and t=1 on the second channel (/atest3 pong). That'll make
4205 // the max out of order duration be large.
4206 //
4207 // Then, we reboot, and only send messages on a third channel (/atest2 pong).
4208 // The order is key, they need to sort in this order in the config.
4209
4210 std::vector<std::string> filenames;
4211 {
4212 {
4213 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4214 aos::Sender<examples::Pong> pong_sender =
4215 pi2_event_loop->MakeSender<examples::Pong>("/atest3");
4216
4217 pi2_event_loop->OnRun([&]() {
4218 aos::Sender<examples::Pong>::Builder builder =
4219 pong_sender.MakeBuilder();
4220 examples::Pong::Builder pong_builder =
4221 builder.MakeBuilder<examples::Pong>();
4222 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4223 });
4224
4225 event_loop_factory.RunFor(chrono::seconds(1000));
4226 }
4227
4228 {
4229 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4230 aos::Sender<examples::Pong> pong_sender =
4231 pi2_event_loop->MakeSender<examples::Pong>("/atest1");
4232
4233 aos::Sender<examples::Pong>::Builder builder = pong_sender.MakeBuilder();
4234 examples::Pong::Builder pong_builder =
4235 builder.MakeBuilder<examples::Pong>();
4236 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4237 }
4238
4239 event_loop_factory.RunFor(chrono::seconds(10));
4240
4241 // Now start a receiving node first. This sets up 2 tight bounds between
4242 // 2 of the nodes.
4243 LoggerState pi1_logger = MakeLoggerState(
4244 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4245 pi1_logger.StartLogger(kLogfile1_1);
4246
4247 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4248 aos::Sender<examples::Pong> pong_sender =
4249 pi2_event_loop->MakeSender<examples::Pong>("/atest2");
4250
4251 pi2_event_loop->AddPhasedLoop(
4252 [&pong_sender](int) {
4253 aos::Sender<examples::Pong>::Builder builder =
4254 pong_sender.MakeBuilder();
4255 examples::Pong::Builder pong_builder =
4256 builder.MakeBuilder<examples::Pong>();
4257 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4258 },
4259 chrono::milliseconds(10));
4260
4261 event_loop_factory.RunFor(chrono::seconds(100));
4262
4263 pi1_logger.AppendAllFilenames(&filenames);
4264 }
4265
4266 // Make sure we can read this.
4267 const std::vector<LogFile> sorted_parts = SortParts(filenames);
4268 EXPECT_TRUE(AllRebootPartsMatchOutOfOrderDuration(sorted_parts, "pi2"));
4269 auto result = ConfirmReadable(filenames);
4270}
4271
4272// Tests that when we start without a connection, and then start logging, the
4273// max_out_of_order_duration ends up reasonable.
4274TEST(MultinodeLoggerLoopTest, StartDisconnected) {
4275 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4276 aos::configuration::ReadConfig(ArtifactPath(
4277 "aos/events/logging/multinode_pingpong_reboot_ooo_config.json"));
4278 message_bridge::TestingTimeConverter time_converter(
4279 configuration::NodesCount(&config.message()));
4280 SimulatedEventLoopFactory event_loop_factory(&config.message());
4281 event_loop_factory.SetTimeConverter(&time_converter);
4282
4283 time_converter.StartEqual();
4284
4285 const std::string kLogfile1_1 =
4286 aos::testing::TestTmpDir() + "/multi_logfile1/";
4287 util::UnlinkRecursive(kLogfile1_1);
4288
4289 NodeEventLoopFactory *const pi1 =
4290 event_loop_factory.GetNodeEventLoopFactory("pi1");
4291 NodeEventLoopFactory *const pi2 =
4292 event_loop_factory.GetNodeEventLoopFactory("pi2");
4293
4294 // What we want is for pi2 to send a message at t=1000 on the first channel
4295 // (/atest1 pong), and t=1 on the second channel (/atest3 pong). That'll make
4296 // the max out of order duration be large.
4297 //
4298 // Then, we disconnect, and only send messages on a third channel
4299 // (/atest2 pong). The order is key, they need to sort in this order in the
4300 // config so we observe them in the order which grows the
4301 // max_out_of_order_duration.
4302
4303 std::vector<std::string> filenames;
4304 {
4305 {
4306 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4307 aos::Sender<examples::Pong> pong_sender =
4308 pi2_event_loop->MakeSender<examples::Pong>("/atest3");
4309
4310 pi2_event_loop->OnRun([&]() {
4311 aos::Sender<examples::Pong>::Builder builder =
4312 pong_sender.MakeBuilder();
4313 examples::Pong::Builder pong_builder =
4314 builder.MakeBuilder<examples::Pong>();
4315 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4316 });
4317
4318 event_loop_factory.RunFor(chrono::seconds(1000));
4319 }
4320
4321 {
4322 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4323 aos::Sender<examples::Pong> pong_sender =
4324 pi2_event_loop->MakeSender<examples::Pong>("/atest1");
4325
4326 aos::Sender<examples::Pong>::Builder builder = pong_sender.MakeBuilder();
4327 examples::Pong::Builder pong_builder =
4328 builder.MakeBuilder<examples::Pong>();
4329 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4330 }
4331
4332 event_loop_factory.RunFor(chrono::seconds(10));
4333
4334 pi1->Disconnect(pi2->node());
4335 pi2->Disconnect(pi1->node());
4336
4337 // Make data flow.
4338 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4339 aos::Sender<examples::Pong> pong_sender =
4340 pi2_event_loop->MakeSender<examples::Pong>("/atest2");
4341
4342 pi2_event_loop->AddPhasedLoop(
4343 [&pong_sender](int) {
4344 aos::Sender<examples::Pong>::Builder builder =
4345 pong_sender.MakeBuilder();
4346 examples::Pong::Builder pong_builder =
4347 builder.MakeBuilder<examples::Pong>();
4348 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4349 },
4350 chrono::milliseconds(10));
4351
4352 event_loop_factory.RunFor(chrono::seconds(10));
4353
4354 // Now start a receiving node first. This sets up 2 tight bounds between
4355 // 2 of the nodes.
4356 LoggerState pi1_logger = MakeLoggerState(
4357 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4358 pi1_logger.StartLogger(kLogfile1_1);
4359
4360 event_loop_factory.RunFor(chrono::seconds(10));
4361
4362 // Now, reconnect, and everything should recover.
4363 pi1->Connect(pi2->node());
4364 pi2->Connect(pi1->node());
4365
4366 event_loop_factory.RunFor(chrono::seconds(10));
4367
4368 pi1_logger.AppendAllFilenames(&filenames);
4369 }
4370
4371 // Make sure we can read this.
4372 const std::vector<LogFile> sorted_parts = SortParts(filenames);
4373 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
4374 auto result = ConfirmReadable(filenames);
4375}
4376
Austin Schuh1124c512023-08-01 15:20:44 -07004377// Class to spam Pong messages blindly.
4378class PongSender {
4379 public:
4380 PongSender(EventLoop *loop, std::string_view channel_name)
4381 : sender_(loop->MakeSender<examples::Pong>(channel_name)) {
4382 loop->AddPhasedLoop(
4383 [this](int) {
4384 aos::Sender<examples::Pong>::Builder builder = sender_.MakeBuilder();
4385 examples::Pong::Builder pong_builder =
4386 builder.MakeBuilder<examples::Pong>();
4387 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4388 },
4389 chrono::milliseconds(10));
4390 }
4391
4392 private:
4393 aos::Sender<examples::Pong> sender_;
4394};
4395
4396// Tests that we log correctly as nodes connect slowly.
4397TEST(MultinodeLoggerLoopTest, StaggeredConnect) {
4398 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4399 aos::configuration::ReadConfig(ArtifactPath(
4400 "aos/events/logging/multinode_pingpong_pi3_pingpong_config.json"));
4401 message_bridge::TestingTimeConverter time_converter(
4402 configuration::NodesCount(&config.message()));
4403 SimulatedEventLoopFactory event_loop_factory(&config.message());
4404 event_loop_factory.SetTimeConverter(&time_converter);
4405
4406 time_converter.StartEqual();
4407
4408 const std::string kLogfile1_1 =
4409 aos::testing::TestTmpDir() + "/multi_logfile1/";
4410 util::UnlinkRecursive(kLogfile1_1);
4411
4412 NodeEventLoopFactory *const pi1 =
4413 event_loop_factory.GetNodeEventLoopFactory("pi1");
4414 NodeEventLoopFactory *const pi2 =
4415 event_loop_factory.GetNodeEventLoopFactory("pi2");
4416 NodeEventLoopFactory *const pi3 =
4417 event_loop_factory.GetNodeEventLoopFactory("pi3");
4418
4419 // What we want is for pi2 to send a message at t=1000 on the first channel
4420 // (/atest1 pong), and t=1 on the second channel (/atest3 pong). That'll make
4421 // the max out of order duration be large.
4422 //
4423 // Then, we disconnect, and only send messages on a third channel
4424 // (/atest2 pong). The order is key, they need to sort in this order in the
4425 // config so we observe them in the order which grows the
4426 // max_out_of_order_duration.
4427
4428 pi1->Disconnect(pi2->node());
4429 pi2->Disconnect(pi1->node());
4430
4431 pi1->Disconnect(pi3->node());
4432 pi3->Disconnect(pi1->node());
4433
4434 std::vector<std::string> filenames;
4435 pi2->AlwaysStart<PongSender>("pongsender", "/test2");
4436 pi3->AlwaysStart<PongSender>("pongsender", "/test3");
4437
4438 event_loop_factory.RunFor(chrono::seconds(10));
4439
4440 {
4441 // Now start a receiving node first. This sets up 2 tight bounds between
4442 // 2 of the nodes.
4443 LoggerState pi1_logger = MakeLoggerState(
4444 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4445 pi1_logger.StartLogger(kLogfile1_1);
4446
4447 event_loop_factory.RunFor(chrono::seconds(10));
4448
4449 // Now, reconnect, and everything should recover.
4450 pi1->Connect(pi2->node());
4451 pi2->Connect(pi1->node());
4452
4453 event_loop_factory.RunFor(chrono::seconds(10));
4454
4455 pi1->Connect(pi3->node());
4456 pi3->Connect(pi1->node());
4457
4458 event_loop_factory.RunFor(chrono::seconds(10));
4459
4460 pi1_logger.AppendAllFilenames(&filenames);
4461 }
4462
4463 // Make sure we can read this.
4464 const std::vector<LogFile> sorted_parts = SortParts(filenames);
4465 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
4466 auto result = ConfirmReadable(filenames);
4467}
4468
Naman Guptaa63aa132023-03-22 20:06:34 -07004469} // namespace testing
4470} // namespace logger
4471} // namespace aos