blob: 290b3125fec470b7277d9eaf000c7d802ea54d04 [file] [log] [blame]
Brian Smartte67d7112023-03-20 12:06:30 -07001#include <algorithm>
2
Philipp Schrader790cb542023-07-05 21:06:52 -07003#include "gmock/gmock.h"
4#include "gtest/gtest.h"
5
Naman Guptaa63aa132023-03-22 20:06:34 -07006#include "aos/events/logging/log_reader.h"
7#include "aos/events/logging/multinode_logger_test_lib.h"
8#include "aos/events/message_counter.h"
9#include "aos/events/ping_lib.h"
10#include "aos/events/pong_lib.h"
11#include "aos/network/remote_message_generated.h"
12#include "aos/network/timestamp_generated.h"
13#include "aos/testing/tmpdir.h"
Naman Guptaa63aa132023-03-22 20:06:34 -070014
15namespace aos {
16namespace logger {
17namespace testing {
18
19namespace chrono = std::chrono;
20using aos::message_bridge::RemoteMessage;
21using aos::testing::ArtifactPath;
22using aos::testing::MessageCounter;
23
Naman Guptaa63aa132023-03-22 20:06:34 -070024INSTANTIATE_TEST_SUITE_P(
25 All, MultinodeLoggerTest,
26 ::testing::Combine(
27 ::testing::Values(
28 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080029 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070030 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080031 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070032 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
33
34INSTANTIATE_TEST_SUITE_P(
35 All, MultinodeLoggerDeathTest,
36 ::testing::Combine(
37 ::testing::Values(
38 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080039 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070040 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080041 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070042 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
43
44// Tests that we can write and read simple multi-node log files.
45TEST_P(MultinodeLoggerTest, SimpleMultiNode) {
46 std::vector<std::string> actual_filenames;
47 time_converter_.StartEqual();
48
49 {
50 LoggerState pi1_logger = MakeLogger(pi1_);
51 LoggerState pi2_logger = MakeLogger(pi2_);
52
53 event_loop_factory_.RunFor(chrono::milliseconds(95));
54
55 StartLogger(&pi1_logger);
56 StartLogger(&pi2_logger);
57
58 event_loop_factory_.RunFor(chrono::milliseconds(20000));
59 pi1_logger.AppendAllFilenames(&actual_filenames);
60 pi2_logger.AppendAllFilenames(&actual_filenames);
61 }
62
63 ASSERT_THAT(actual_filenames,
64 ::testing::UnorderedElementsAreArray(logfiles_));
65
66 {
67 std::set<std::string> logfile_uuids;
68 std::set<std::string> parts_uuids;
69 // Confirm that we have the expected number of UUIDs for both the logfile
70 // UUIDs and parts UUIDs.
71 std::vector<SizePrefixedFlatbufferVector<LogFileHeader>> log_header;
72 for (std::string_view f : logfiles_) {
73 log_header.emplace_back(ReadHeader(f).value());
74 if (!log_header.back().message().has_configuration()) {
75 logfile_uuids.insert(
76 log_header.back().message().log_event_uuid()->str());
77 parts_uuids.insert(log_header.back().message().parts_uuid()->str());
78 }
79 }
80
81 EXPECT_EQ(logfile_uuids.size(), 2u);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -070082 EXPECT_EQ(parts_uuids.size(), 8u);
Naman Guptaa63aa132023-03-22 20:06:34 -070083
84 // And confirm everything is on the correct node.
85 EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi1");
86 EXPECT_EQ(log_header[3].message().node()->name()->string_view(), "pi1");
87 EXPECT_EQ(log_header[4].message().node()->name()->string_view(), "pi1");
88
89 EXPECT_EQ(log_header[5].message().node()->name()->string_view(), "pi2");
90 EXPECT_EQ(log_header[6].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -070091 EXPECT_EQ(log_header[7].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -070092
Mithun Bharadwaj0c629932023-08-02 16:10:40 -070093 EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi1");
94 EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi1");
Naman Guptaa63aa132023-03-22 20:06:34 -070095
Mithun Bharadwaj0c629932023-08-02 16:10:40 -070096 EXPECT_EQ(log_header[10].message().node()->name()->string_view(), "pi2");
97 EXPECT_EQ(log_header[11].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -070098
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -070099 EXPECT_EQ(log_header[12].message().node()->name()->string_view(), "pi2");
100 EXPECT_EQ(log_header[13].message().node()->name()->string_view(), "pi2");
101 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700102
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700103 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi1");
104 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi1");
Naman Guptaa63aa132023-03-22 20:06:34 -0700105
106 // And the parts index matches.
107 EXPECT_EQ(log_header[2].message().parts_index(), 0);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700108
109 EXPECT_EQ(log_header[3].message().parts_index(), 0);
110 EXPECT_EQ(log_header[4].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700111
112 EXPECT_EQ(log_header[5].message().parts_index(), 0);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700113
114 EXPECT_EQ(log_header[6].message().parts_index(), 0);
115 EXPECT_EQ(log_header[7].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700116
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700117 EXPECT_EQ(log_header[8].message().parts_index(), 0);
118 EXPECT_EQ(log_header[9].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700119
120 EXPECT_EQ(log_header[10].message().parts_index(), 0);
121 EXPECT_EQ(log_header[11].message().parts_index(), 1);
122
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700123 EXPECT_EQ(log_header[12].message().parts_index(), 0);
124 EXPECT_EQ(log_header[13].message().parts_index(), 1);
125 EXPECT_EQ(log_header[14].message().parts_index(), 2);
Naman Guptaa63aa132023-03-22 20:06:34 -0700126
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700127 EXPECT_EQ(log_header[15].message().parts_index(), 0);
128 EXPECT_EQ(log_header[16].message().parts_index(), 1);
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700129
130 // And that the data_stored field is right.
131 EXPECT_THAT(*log_header[2].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700132 ::testing::ElementsAre(StoredDataType::DATA));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700133 EXPECT_THAT(*log_header[3].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700134 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700135 EXPECT_THAT(*log_header[4].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700136 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700137
138 EXPECT_THAT(*log_header[5].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700139 ::testing::ElementsAre(StoredDataType::DATA));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700140 EXPECT_THAT(*log_header[6].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700141 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700142 EXPECT_THAT(*log_header[7].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700143 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700144
145 EXPECT_THAT(*log_header[8].message().data_stored(),
146 ::testing::ElementsAre(StoredDataType::DATA));
147 EXPECT_THAT(*log_header[9].message().data_stored(),
148 ::testing::ElementsAre(StoredDataType::DATA));
149
150 EXPECT_THAT(*log_header[10].message().data_stored(),
151 ::testing::ElementsAre(StoredDataType::DATA));
152 EXPECT_THAT(*log_header[11].message().data_stored(),
153 ::testing::ElementsAre(StoredDataType::DATA));
154
155 EXPECT_THAT(*log_header[12].message().data_stored(),
156 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
157 EXPECT_THAT(*log_header[13].message().data_stored(),
158 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
159 EXPECT_THAT(*log_header[14].message().data_stored(),
160 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
161
162 EXPECT_THAT(*log_header[15].message().data_stored(),
163 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
164 EXPECT_THAT(*log_header[16].message().data_stored(),
165 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
Naman Guptaa63aa132023-03-22 20:06:34 -0700166 }
167
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700168 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
169 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -0700170 {
171 using ::testing::UnorderedElementsAre;
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700172 std::shared_ptr<const aos::Configuration> config = sorted_parts[0].config;
Naman Guptaa63aa132023-03-22 20:06:34 -0700173
174 // Timing reports, pings
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700175 if (shared()) {
176 EXPECT_THAT(
177 CountChannelsData(config, logfiles_[2]),
178 UnorderedElementsAre(
179 std::make_tuple("/pi1/aos", "aos.examples.Ping", 2001),
180 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
181 200),
182 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
183 21),
184 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
185 std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
186 std::make_tuple("/test", "aos.examples.Ping", 2001)))
187 << " : " << logfiles_[2];
188 } else {
189 EXPECT_THAT(
190 CountChannelsData(config, logfiles_[2]),
191 UnorderedElementsAre(
192 std::make_tuple("/pi1/aos", "aos.examples.Ping", 2001),
193 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
194 200),
195 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
196 21),
197 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
198 std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
199 std::make_tuple("/test", "aos.examples.Ping", 2001),
200 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
201 "aos-message_bridge-Timestamp",
202 "aos.message_bridge.RemoteMessage", 200)))
203 << " : " << logfiles_[2];
Naman Guptaa63aa132023-03-22 20:06:34 -0700204 }
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700205
206 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
207 ::testing::UnorderedElementsAre())
208 << " : " << logfiles_[3];
209 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
210 ::testing::UnorderedElementsAre())
211 << " : " << logfiles_[4];
212
Naman Guptaa63aa132023-03-22 20:06:34 -0700213 // Timestamps for pong
214 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
215 UnorderedElementsAre())
216 << " : " << logfiles_[2];
217 EXPECT_THAT(
218 CountChannelsTimestamp(config, logfiles_[3]),
219 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
220 << " : " << logfiles_[3];
221 EXPECT_THAT(
222 CountChannelsTimestamp(config, logfiles_[4]),
223 UnorderedElementsAre(
224 std::make_tuple("/test", "aos.examples.Pong", 2000),
225 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)))
226 << " : " << logfiles_[4];
227
Naman Guptaa63aa132023-03-22 20:06:34 -0700228 // Timing reports and pongs.
Naman Guptaa63aa132023-03-22 20:06:34 -0700229 EXPECT_THAT(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700230 CountChannelsData(config, logfiles_[5]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700231 UnorderedElementsAre(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700232 std::make_tuple("/pi2/aos", "aos.examples.Ping", 2001),
Naman Guptaa63aa132023-03-22 20:06:34 -0700233 std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
234 200),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700235 std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
236 21),
237 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700238 std::make_tuple("/pi2/aos", "aos.timing.Report", 60),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700239 std::make_tuple("/test", "aos.examples.Pong", 2001)))
240 << " : " << logfiles_[5];
241 EXPECT_THAT(CountChannelsData(config, logfiles_[6]), UnorderedElementsAre())
242 << " : " << logfiles_[6];
243 EXPECT_THAT(CountChannelsData(config, logfiles_[7]), UnorderedElementsAre())
Naman Guptaa63aa132023-03-22 20:06:34 -0700244 << " : " << logfiles_[7];
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700245 // And ping timestamps.
246 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
247 UnorderedElementsAre())
248 << " : " << logfiles_[5];
Naman Guptaa63aa132023-03-22 20:06:34 -0700249 EXPECT_THAT(
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700250 CountChannelsTimestamp(config, logfiles_[6]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700251 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700252 << " : " << logfiles_[6];
Naman Guptaa63aa132023-03-22 20:06:34 -0700253 EXPECT_THAT(
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700254 CountChannelsTimestamp(config, logfiles_[7]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700255 UnorderedElementsAre(
256 std::make_tuple("/test", "aos.examples.Ping", 2000),
257 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700258 << " : " << logfiles_[7];
Naman Guptaa63aa132023-03-22 20:06:34 -0700259
260 // And then test that the remotely logged timestamp data files only have
261 // timestamps in them.
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700262 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[8]),
263 UnorderedElementsAre())
264 << " : " << logfiles_[8];
265 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[9]),
266 UnorderedElementsAre())
267 << " : " << logfiles_[9];
Naman Guptaa63aa132023-03-22 20:06:34 -0700268 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[10]),
269 UnorderedElementsAre())
270 << " : " << logfiles_[10];
271 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[11]),
272 UnorderedElementsAre())
273 << " : " << logfiles_[11];
Naman Guptaa63aa132023-03-22 20:06:34 -0700274
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700275 EXPECT_THAT(CountChannelsData(config, logfiles_[8]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700276 UnorderedElementsAre(std::make_tuple(
277 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700278 << " : " << logfiles_[8];
279 EXPECT_THAT(CountChannelsData(config, logfiles_[9]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700280 UnorderedElementsAre(std::make_tuple(
281 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700282 << " : " << logfiles_[9];
Naman Guptaa63aa132023-03-22 20:06:34 -0700283
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700284 // Pong snd timestamp data.
285 EXPECT_THAT(
286 CountChannelsData(config, logfiles_[10]),
287 UnorderedElementsAre(
288 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 9),
289 std::make_tuple("/test", "aos.examples.Pong", 91)))
290 << " : " << logfiles_[10];
291 EXPECT_THAT(
292 CountChannelsData(config, logfiles_[11]),
293 UnorderedElementsAre(
294 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 191),
295 std::make_tuple("/test", "aos.examples.Pong", 1910)))
296 << " : " << logfiles_[11];
Naman Guptaa63aa132023-03-22 20:06:34 -0700297
298 // Timestamps from pi2 on pi1, and the other way.
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700299 // if (shared()) {
300 EXPECT_THAT(CountChannelsData(config, logfiles_[12]),
301 UnorderedElementsAre())
302 << " : " << logfiles_[12];
303 EXPECT_THAT(CountChannelsData(config, logfiles_[13]),
304 UnorderedElementsAre())
305 << " : " << logfiles_[13];
306 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
307 UnorderedElementsAre())
308 << " : " << logfiles_[14];
309 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
310 UnorderedElementsAre())
311 << " : " << logfiles_[15];
312 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
313 UnorderedElementsAre())
314 << " : " << logfiles_[16];
Naman Guptaa63aa132023-03-22 20:06:34 -0700315
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700316 EXPECT_THAT(
317 CountChannelsTimestamp(config, logfiles_[12]),
318 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
319 << " : " << logfiles_[12];
320 EXPECT_THAT(
321 CountChannelsTimestamp(config, logfiles_[13]),
322 UnorderedElementsAre(
323 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 9),
324 std::make_tuple("/test", "aos.examples.Ping", 90)))
325 << " : " << logfiles_[13];
326 EXPECT_THAT(
327 CountChannelsTimestamp(config, logfiles_[14]),
328 UnorderedElementsAre(
329 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 191),
330 std::make_tuple("/test", "aos.examples.Ping", 1910)))
331 << " : " << logfiles_[14];
332 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[15]),
333 UnorderedElementsAre(std::make_tuple(
334 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
335 << " : " << logfiles_[15];
336 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[16]),
337 UnorderedElementsAre(std::make_tuple(
338 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
339 << " : " << logfiles_[16];
Naman Guptaa63aa132023-03-22 20:06:34 -0700340 }
341
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700342 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -0700343
344 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
345 log_reader_factory.set_send_delay(chrono::microseconds(0));
346
347 // This sends out the fetched messages and advances time to the start of the
348 // log file.
349 reader.Register(&log_reader_factory);
350
351 const Node *pi1 =
352 configuration::GetNode(log_reader_factory.configuration(), "pi1");
353 const Node *pi2 =
354 configuration::GetNode(log_reader_factory.configuration(), "pi2");
355
356 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
357 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
358 LOG(INFO) << "now pi1 "
359 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
360 LOG(INFO) << "now pi2 "
361 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
362
363 EXPECT_THAT(reader.LoggedNodes(),
364 ::testing::ElementsAre(
365 configuration::GetNode(reader.logged_configuration(), pi1),
366 configuration::GetNode(reader.logged_configuration(), pi2)));
367
368 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
369
370 std::unique_ptr<EventLoop> pi1_event_loop =
371 log_reader_factory.MakeEventLoop("test", pi1);
372 std::unique_ptr<EventLoop> pi2_event_loop =
373 log_reader_factory.MakeEventLoop("test", pi2);
374
375 int pi1_ping_count = 10;
376 int pi2_ping_count = 10;
377 int pi1_pong_count = 10;
378 int pi2_pong_count = 10;
379
380 // Confirm that the ping value matches.
381 pi1_event_loop->MakeWatcher(
382 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
383 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping) << " at "
384 << pi1_event_loop->context().monotonic_remote_time << " -> "
385 << pi1_event_loop->context().monotonic_event_time;
386 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
387 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
388 pi1_ping_count * chrono::milliseconds(10) +
389 monotonic_clock::epoch());
390 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
391 pi1_ping_count * chrono::milliseconds(10) +
392 realtime_clock::epoch());
393 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
394 pi1_event_loop->context().monotonic_event_time);
395 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
396 pi1_event_loop->context().realtime_event_time);
397
398 ++pi1_ping_count;
399 });
400 pi2_event_loop->MakeWatcher(
401 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
402 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping) << " at "
403 << pi2_event_loop->context().monotonic_remote_time << " -> "
404 << pi2_event_loop->context().monotonic_event_time;
405 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
406
407 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
408 pi2_ping_count * chrono::milliseconds(10) +
409 monotonic_clock::epoch());
410 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
411 pi2_ping_count * chrono::milliseconds(10) +
412 realtime_clock::epoch());
413 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time +
414 chrono::microseconds(150),
415 pi2_event_loop->context().monotonic_event_time);
416 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time +
417 chrono::microseconds(150),
418 pi2_event_loop->context().realtime_event_time);
419 ++pi2_ping_count;
420 });
421
422 constexpr ssize_t kQueueIndexOffset = -9;
423 // Confirm that the ping and pong counts both match, and the value also
424 // matches.
425 pi1_event_loop->MakeWatcher(
426 "/test", [&pi1_event_loop, &pi1_ping_count,
427 &pi1_pong_count](const examples::Pong &pong) {
428 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
429 << pi1_event_loop->context().monotonic_remote_time << " -> "
430 << pi1_event_loop->context().monotonic_event_time;
431
432 EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
433 pi1_pong_count + kQueueIndexOffset);
434 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
435 chrono::microseconds(200) +
436 pi1_pong_count * chrono::milliseconds(10) +
437 monotonic_clock::epoch());
438 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
439 chrono::microseconds(200) +
440 pi1_pong_count * chrono::milliseconds(10) +
441 realtime_clock::epoch());
442
443 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
444 chrono::microseconds(150),
445 pi1_event_loop->context().monotonic_event_time);
446 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
447 chrono::microseconds(150),
448 pi1_event_loop->context().realtime_event_time);
449
450 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
451 ++pi1_pong_count;
452 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
453 });
454 pi2_event_loop->MakeWatcher(
455 "/test", [&pi2_event_loop, &pi2_ping_count,
456 &pi2_pong_count](const examples::Pong &pong) {
457 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
458 << pi2_event_loop->context().monotonic_remote_time << " -> "
459 << pi2_event_loop->context().monotonic_event_time;
460
461 EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
462 pi2_pong_count + kQueueIndexOffset);
463
464 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
465 chrono::microseconds(200) +
466 pi2_pong_count * chrono::milliseconds(10) +
467 monotonic_clock::epoch());
468 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
469 chrono::microseconds(200) +
470 pi2_pong_count * chrono::milliseconds(10) +
471 realtime_clock::epoch());
472
473 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
474 pi2_event_loop->context().monotonic_event_time);
475 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
476 pi2_event_loop->context().realtime_event_time);
477
478 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
479 ++pi2_pong_count;
480 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
481 });
482
483 log_reader_factory.Run();
484 EXPECT_EQ(pi1_ping_count, 2010);
485 EXPECT_EQ(pi2_ping_count, 2010);
486 EXPECT_EQ(pi1_pong_count, 2010);
487 EXPECT_EQ(pi2_pong_count, 2010);
488
489 reader.Deregister();
490}
491
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600492// MultinodeLoggerTest that tests the mutate callback works across multiple
493// nodes with remapping
494TEST_P(MultinodeLoggerTest, MultiNodeRemapMutateCallback) {
495 time_converter_.StartEqual();
496 std::vector<std::string> actual_filenames;
497
498 {
499 LoggerState pi1_logger = MakeLogger(pi1_);
500 LoggerState pi2_logger = MakeLogger(pi2_);
501
502 event_loop_factory_.RunFor(chrono::milliseconds(95));
503
504 StartLogger(&pi1_logger);
505 StartLogger(&pi2_logger);
506
507 event_loop_factory_.RunFor(chrono::milliseconds(20000));
508 pi1_logger.AppendAllFilenames(&actual_filenames);
509 pi2_logger.AppendAllFilenames(&actual_filenames);
510 }
511
Austin Schuh8fb4b452023-08-04 17:02:27 -0700512 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700513 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600514
515 LogReader reader(sorted_parts, &config_.message());
516 // Remap just on pi1.
517 reader.RemapLoggedChannel<examples::Pong>(
518 "/test", configuration::GetNode(reader.configuration(), "pi1"));
519
520 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
521
522 int pong_count = 0;
523 // Adds a callback which mutates the value of the pong message before the
524 // message is sent which is the feature we are testing here
525 reader.AddBeforeSendCallback("/test",
526 [&pong_count](aos::examples::Pong *pong) {
527 pong->mutate_value(pong->value() + 1);
528 pong_count = pong->value();
529 });
530
531 // This sends out the fetched messages and advances time to the start of the
532 // log file.
533 reader.Register(&log_reader_factory);
534
535 const Node *pi1 =
536 configuration::GetNode(log_reader_factory.configuration(), "pi1");
537 const Node *pi2 =
538 configuration::GetNode(log_reader_factory.configuration(), "pi2");
539
540 EXPECT_THAT(reader.LoggedNodes(),
541 ::testing::ElementsAre(
542 configuration::GetNode(reader.logged_configuration(), pi1),
543 configuration::GetNode(reader.logged_configuration(), pi2)));
544
545 std::unique_ptr<EventLoop> pi1_event_loop =
546 log_reader_factory.MakeEventLoop("test", pi1);
547 std::unique_ptr<EventLoop> pi2_event_loop =
548 log_reader_factory.MakeEventLoop("test", pi2);
549
550 pi1_event_loop->MakeWatcher("/original/test",
551 [&pong_count](const examples::Pong &pong) {
552 EXPECT_EQ(pong_count, pong.value());
553 });
554
555 pi2_event_loop->MakeWatcher("/test",
556 [&pong_count](const examples::Pong &pong) {
557 EXPECT_EQ(pong_count, pong.value());
558 });
559
560 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
561 reader.Deregister();
562
563 EXPECT_EQ(pong_count, 2011);
564}
565
566// MultinodeLoggerTest that tests the mutate callback works across multiple
567// nodes
568TEST_P(MultinodeLoggerTest, MultiNodeMutateCallback) {
569 time_converter_.StartEqual();
570 std::vector<std::string> actual_filenames;
571
572 {
573 LoggerState pi1_logger = MakeLogger(pi1_);
574 LoggerState pi2_logger = MakeLogger(pi2_);
575
576 event_loop_factory_.RunFor(chrono::milliseconds(95));
577
578 StartLogger(&pi1_logger);
579 StartLogger(&pi2_logger);
580
581 event_loop_factory_.RunFor(chrono::milliseconds(20000));
582 pi1_logger.AppendAllFilenames(&actual_filenames);
583 pi2_logger.AppendAllFilenames(&actual_filenames);
584 }
585
Austin Schuh8fb4b452023-08-04 17:02:27 -0700586 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700587 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600588
589 LogReader reader(sorted_parts, &config_.message());
590
591 int pong_count = 0;
592 // Adds a callback which mutates the value of the pong message before the
593 // message is sent which is the feature we are testing here
594 reader.AddBeforeSendCallback("/test",
595 [&pong_count](aos::examples::Pong *pong) {
596 pong->mutate_value(pong->value() + 1);
597 pong_count = pong->value();
598 });
599
600 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
601
602 // This sends out the fetched messages and advances time to the start of the
603 // log file.
604 reader.Register(&log_reader_factory);
605
606 const Node *pi1 =
607 configuration::GetNode(log_reader_factory.configuration(), "pi1");
608 const Node *pi2 =
609 configuration::GetNode(log_reader_factory.configuration(), "pi2");
610
611 EXPECT_THAT(reader.LoggedNodes(),
612 ::testing::ElementsAre(
613 configuration::GetNode(reader.logged_configuration(), pi1),
614 configuration::GetNode(reader.logged_configuration(), pi2)));
615
616 std::unique_ptr<EventLoop> pi1_event_loop =
617 log_reader_factory.MakeEventLoop("test", pi1);
618 std::unique_ptr<EventLoop> pi2_event_loop =
619 log_reader_factory.MakeEventLoop("test", pi2);
620
621 pi1_event_loop->MakeWatcher("/test",
622 [&pong_count](const examples::Pong &pong) {
623 EXPECT_EQ(pong_count, pong.value());
624 });
625
626 pi2_event_loop->MakeWatcher("/test",
627 [&pong_count](const examples::Pong &pong) {
628 EXPECT_EQ(pong_count, pong.value());
629 });
630
631 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
632 reader.Deregister();
633
634 EXPECT_EQ(pong_count, 2011);
635}
636
637// Tests that the before send callback is only called from the sender node if it
638// is forwarded
639TEST_P(MultinodeLoggerTest, OnlyDoBeforeSendCallbackOnSenderNode) {
640 time_converter_.StartEqual();
Austin Schuh8fb4b452023-08-04 17:02:27 -0700641
642 std::vector<std::string> filenames;
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600643 {
644 LoggerState pi1_logger = MakeLogger(pi1_);
645 LoggerState pi2_logger = MakeLogger(pi2_);
646
647 event_loop_factory_.RunFor(chrono::milliseconds(95));
648
649 StartLogger(&pi1_logger);
650 StartLogger(&pi2_logger);
651
652 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -0700653
654 pi1_logger.AppendAllFilenames(&filenames);
655 pi2_logger.AppendAllFilenames(&filenames);
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600656 }
657
Austin Schuh8fb4b452023-08-04 17:02:27 -0700658 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700659 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
660 LogReader reader(sorted_parts);
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600661
662 int ping_count = 0;
663 // Adds a callback which mutates the value of the pong message before the
664 // message is sent which is the feature we are testing here
665 reader.AddBeforeSendCallback("/test",
666 [&ping_count](aos::examples::Ping *ping) {
667 ++ping_count;
668 ping->mutate_value(ping_count);
669 });
670
671 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
672 log_reader_factory.set_send_delay(chrono::microseconds(0));
673
674 reader.Register(&log_reader_factory);
675
676 const Node *pi1 =
677 configuration::GetNode(log_reader_factory.configuration(), "pi1");
678 const Node *pi2 =
679 configuration::GetNode(log_reader_factory.configuration(), "pi2");
680
681 std::unique_ptr<EventLoop> pi1_event_loop =
682 log_reader_factory.MakeEventLoop("test", pi1);
683 pi1_event_loop->SkipTimingReport();
684 std::unique_ptr<EventLoop> pi2_event_loop =
685 log_reader_factory.MakeEventLoop("test", pi2);
686 pi2_event_loop->SkipTimingReport();
687
688 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
689 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
690
691 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
692 pi1_ping_timestamp;
693 if (!shared()) {
694 pi1_ping_timestamp =
695 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
696 pi1_event_loop.get(),
697 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
698 }
699
700 log_reader_factory.Run();
701
702 EXPECT_EQ(pi1_ping.count(), 2000u);
703 EXPECT_EQ(pi2_ping.count(), 2000u);
704 // If the BeforeSendCallback is called on both nodes, then the ping count
705 // would be 4002 instead of 2001
706 EXPECT_EQ(ping_count, 2001u);
707 if (!shared()) {
708 EXPECT_EQ(pi1_ping_timestamp->count(), 2000u);
709 }
710
711 reader.Deregister();
712}
713
714// Tests that we do not allow adding callbacks after Register is called
715TEST_P(MultinodeLoggerDeathTest, AddCallbackAfterRegister) {
716 time_converter_.StartEqual();
717 std::vector<std::string> actual_filenames;
718
719 {
720 LoggerState pi1_logger = MakeLogger(pi1_);
721 LoggerState pi2_logger = MakeLogger(pi2_);
722
723 event_loop_factory_.RunFor(chrono::milliseconds(95));
724
725 StartLogger(&pi1_logger);
726 StartLogger(&pi2_logger);
727
728 event_loop_factory_.RunFor(chrono::milliseconds(20000));
729 pi1_logger.AppendAllFilenames(&actual_filenames);
730 pi2_logger.AppendAllFilenames(&actual_filenames);
731 }
732
Austin Schuh8fb4b452023-08-04 17:02:27 -0700733 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700734 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600735
736 LogReader reader(sorted_parts, &config_.message());
737 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
738 reader.Register(&log_reader_factory);
739 EXPECT_DEATH(
740 {
741 reader.AddBeforeSendCallback("/test", [](aos::examples::Pong *) {
742 LOG(FATAL) << "This should not be called";
743 });
744 },
745 "Cannot add callbacks after calling Register");
746 reader.Deregister();
747}
748
Naman Guptaa63aa132023-03-22 20:06:34 -0700749// Test that if we feed the replay with a mismatched node list that we die on
750// the LogReader constructor.
751TEST_P(MultinodeLoggerDeathTest, MultiNodeBadReplayConfig) {
752 time_converter_.StartEqual();
Austin Schuh8fb4b452023-08-04 17:02:27 -0700753
754 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -0700755 {
756 LoggerState pi1_logger = MakeLogger(pi1_);
757 LoggerState pi2_logger = MakeLogger(pi2_);
758
759 event_loop_factory_.RunFor(chrono::milliseconds(95));
760
761 StartLogger(&pi1_logger);
762 StartLogger(&pi2_logger);
763
764 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -0700765
766 pi1_logger.AppendAllFilenames(&filenames);
767 pi2_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -0700768 }
769
770 // Test that, if we add an additional node to the replay config that the
771 // logger complains about the mismatch in number of nodes.
772 FlatbufferDetachedBuffer<Configuration> extra_nodes_config =
773 configuration::MergeWithConfig(&config_.message(), R"({
774 "nodes": [
775 {
776 "name": "extra-node"
777 }
778 ]
779 }
780 )");
781
Austin Schuh8fb4b452023-08-04 17:02:27 -0700782 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700783 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -0700784 EXPECT_DEATH(LogReader(sorted_parts, &extra_nodes_config.message()),
785 "Log file and replay config need to have matching nodes lists.");
786}
787
788// Tests that we can read log files where they don't start at the same monotonic
789// time.
790TEST_P(MultinodeLoggerTest, StaggeredStart) {
791 time_converter_.StartEqual();
792 std::vector<std::string> actual_filenames;
793
794 {
795 LoggerState pi1_logger = MakeLogger(pi1_);
796 LoggerState pi2_logger = MakeLogger(pi2_);
797
798 event_loop_factory_.RunFor(chrono::milliseconds(95));
799
800 StartLogger(&pi1_logger);
801
802 event_loop_factory_.RunFor(chrono::milliseconds(200));
803
804 StartLogger(&pi2_logger);
805
806 event_loop_factory_.RunFor(chrono::milliseconds(20000));
807 pi1_logger.AppendAllFilenames(&actual_filenames);
808 pi2_logger.AppendAllFilenames(&actual_filenames);
809 }
810
811 // Since we delay starting pi2, it already knows about all the timestamps so
812 // we don't end up with extra parts.
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700813 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
814 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
815 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -0700816
817 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
818 log_reader_factory.set_send_delay(chrono::microseconds(0));
819
820 // This sends out the fetched messages and advances time to the start of the
821 // log file.
822 reader.Register(&log_reader_factory);
823
824 const Node *pi1 =
825 configuration::GetNode(log_reader_factory.configuration(), "pi1");
826 const Node *pi2 =
827 configuration::GetNode(log_reader_factory.configuration(), "pi2");
828
829 EXPECT_THAT(reader.LoggedNodes(),
830 ::testing::ElementsAre(
831 configuration::GetNode(reader.logged_configuration(), pi1),
832 configuration::GetNode(reader.logged_configuration(), pi2)));
833
834 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
835
836 std::unique_ptr<EventLoop> pi1_event_loop =
837 log_reader_factory.MakeEventLoop("test", pi1);
838 std::unique_ptr<EventLoop> pi2_event_loop =
839 log_reader_factory.MakeEventLoop("test", pi2);
840
841 int pi1_ping_count = 30;
842 int pi2_ping_count = 30;
843 int pi1_pong_count = 30;
844 int pi2_pong_count = 30;
845
846 // Confirm that the ping value matches.
847 pi1_event_loop->MakeWatcher(
848 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
849 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
850 << pi1_event_loop->context().monotonic_remote_time << " -> "
851 << pi1_event_loop->context().monotonic_event_time;
852 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
853
854 ++pi1_ping_count;
855 });
856 pi2_event_loop->MakeWatcher(
857 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
858 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
859 << pi2_event_loop->context().monotonic_remote_time << " -> "
860 << pi2_event_loop->context().monotonic_event_time;
861 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
862
863 ++pi2_ping_count;
864 });
865
866 // Confirm that the ping and pong counts both match, and the value also
867 // matches.
868 pi1_event_loop->MakeWatcher(
869 "/test", [&pi1_event_loop, &pi1_ping_count,
870 &pi1_pong_count](const examples::Pong &pong) {
871 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
872 << pi1_event_loop->context().monotonic_remote_time << " -> "
873 << pi1_event_loop->context().monotonic_event_time;
874
875 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
876 ++pi1_pong_count;
877 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
878 });
879 pi2_event_loop->MakeWatcher(
880 "/test", [&pi2_event_loop, &pi2_ping_count,
881 &pi2_pong_count](const examples::Pong &pong) {
882 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
883 << pi2_event_loop->context().monotonic_remote_time << " -> "
884 << pi2_event_loop->context().monotonic_event_time;
885
886 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
887 ++pi2_pong_count;
888 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
889 });
890
891 log_reader_factory.Run();
892 EXPECT_EQ(pi1_ping_count, 2030);
893 EXPECT_EQ(pi2_ping_count, 2030);
894 EXPECT_EQ(pi1_pong_count, 2030);
895 EXPECT_EQ(pi2_pong_count, 2030);
896
897 reader.Deregister();
898}
899
900// Tests that we can read log files where the monotonic clocks drift and don't
901// match correctly. While we are here, also test that different ending times
902// also is readable.
903TEST_P(MultinodeLoggerTest, MismatchedClocks) {
904 // TODO(austin): Negate...
905 const chrono::nanoseconds initial_pi2_offset = chrono::seconds(1000);
906
907 time_converter_.AddMonotonic(
908 {BootTimestamp::epoch(), BootTimestamp::epoch() + initial_pi2_offset});
909 // Wait for 95 ms, (~0.1 seconds - 1/2 of the ping/pong period), and set the
910 // skew to be 200 uS/s
911 const chrono::nanoseconds startup_sleep1 = time_converter_.AddMonotonic(
912 {chrono::milliseconds(95),
913 chrono::milliseconds(95) - chrono::nanoseconds(200) * 95});
914 // Run another 200 ms to have one logger start first.
915 const chrono::nanoseconds startup_sleep2 = time_converter_.AddMonotonic(
916 {chrono::milliseconds(200), chrono::milliseconds(200)});
917 // Slew one way then the other at the same 200 uS/S slew rate. Make sure we
918 // go far enough to cause problems if this isn't accounted for.
919 const chrono::nanoseconds logger_run1 = time_converter_.AddMonotonic(
920 {chrono::milliseconds(20000),
921 chrono::milliseconds(20000) - chrono::nanoseconds(200) * 20000});
922 const chrono::nanoseconds logger_run2 = time_converter_.AddMonotonic(
923 {chrono::milliseconds(40000),
924 chrono::milliseconds(40000) + chrono::nanoseconds(200) * 40000});
925 const chrono::nanoseconds logger_run3 = time_converter_.AddMonotonic(
926 {chrono::milliseconds(400), chrono::milliseconds(400)});
927
Austin Schuh8fb4b452023-08-04 17:02:27 -0700928 std::vector<std::string> actual_filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -0700929 {
930 LoggerState pi2_logger = MakeLogger(pi2_);
931
932 LOG(INFO) << "pi2 times: " << pi2_->monotonic_now() << " "
933 << pi2_->realtime_now() << " distributed "
934 << pi2_->ToDistributedClock(pi2_->monotonic_now());
935
936 LOG(INFO) << "pi2_ times: " << pi2_->monotonic_now() << " "
937 << pi2_->realtime_now() << " distributed "
938 << pi2_->ToDistributedClock(pi2_->monotonic_now());
939
940 event_loop_factory_.RunFor(startup_sleep1);
941
942 StartLogger(&pi2_logger);
943
944 event_loop_factory_.RunFor(startup_sleep2);
945
946 {
947 // Run pi1's logger for only part of the time.
948 LoggerState pi1_logger = MakeLogger(pi1_);
949
950 StartLogger(&pi1_logger);
951 event_loop_factory_.RunFor(logger_run1);
952
953 // Make sure we slewed time far enough so that the difference is greater
954 // than the network delay. This confirms that if we sort incorrectly, it
955 // would show in the results.
956 EXPECT_LT(
957 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
958 -event_loop_factory_.send_delay() -
959 event_loop_factory_.network_delay());
960
961 event_loop_factory_.RunFor(logger_run2);
962
963 // And now check that we went far enough the other way to make sure we
964 // cover both problems.
965 EXPECT_GT(
966 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
967 event_loop_factory_.send_delay() +
968 event_loop_factory_.network_delay());
Austin Schuh8fb4b452023-08-04 17:02:27 -0700969
970 pi1_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -0700971 }
972
973 // And log a bit more on pi2.
974 event_loop_factory_.RunFor(logger_run3);
Austin Schuh8fb4b452023-08-04 17:02:27 -0700975
976 pi2_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -0700977 }
978
Austin Schuh8fb4b452023-08-04 17:02:27 -0700979 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700980 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
981 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -0700982
983 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
984 log_reader_factory.set_send_delay(chrono::microseconds(0));
985
986 const Node *pi1 =
987 configuration::GetNode(log_reader_factory.configuration(), "pi1");
988 const Node *pi2 =
989 configuration::GetNode(log_reader_factory.configuration(), "pi2");
990
991 // This sends out the fetched messages and advances time to the start of the
992 // log file.
993 reader.Register(&log_reader_factory);
994
995 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
996 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
997 LOG(INFO) << "now pi1 "
998 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
999 LOG(INFO) << "now pi2 "
1000 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1001
1002 LOG(INFO) << "Done registering (pi1) "
1003 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now()
1004 << " "
1005 << log_reader_factory.GetNodeEventLoopFactory(pi1)->realtime_now();
1006 LOG(INFO) << "Done registering (pi2) "
1007 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now()
1008 << " "
1009 << log_reader_factory.GetNodeEventLoopFactory(pi2)->realtime_now();
1010
1011 EXPECT_THAT(reader.LoggedNodes(),
1012 ::testing::ElementsAre(
1013 configuration::GetNode(reader.logged_configuration(), pi1),
1014 configuration::GetNode(reader.logged_configuration(), pi2)));
1015
1016 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1017
1018 std::unique_ptr<EventLoop> pi1_event_loop =
1019 log_reader_factory.MakeEventLoop("test", pi1);
1020 std::unique_ptr<EventLoop> pi2_event_loop =
1021 log_reader_factory.MakeEventLoop("test", pi2);
1022
1023 int pi1_ping_count = 30;
1024 int pi2_ping_count = 30;
1025 int pi1_pong_count = 30;
1026 int pi2_pong_count = 30;
1027
1028 // Confirm that the ping value matches.
1029 pi1_event_loop->MakeWatcher(
1030 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
1031 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
1032 << pi1_event_loop->context().monotonic_remote_time << " -> "
1033 << pi1_event_loop->context().monotonic_event_time;
1034 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
1035
1036 ++pi1_ping_count;
1037 });
1038 pi2_event_loop->MakeWatcher(
1039 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
1040 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
1041 << pi2_event_loop->context().monotonic_remote_time << " -> "
1042 << pi2_event_loop->context().monotonic_event_time;
1043 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
1044
1045 ++pi2_ping_count;
1046 });
1047
1048 // Confirm that the ping and pong counts both match, and the value also
1049 // matches.
1050 pi1_event_loop->MakeWatcher(
1051 "/test", [&pi1_event_loop, &pi1_ping_count,
1052 &pi1_pong_count](const examples::Pong &pong) {
1053 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
1054 << pi1_event_loop->context().monotonic_remote_time << " -> "
1055 << pi1_event_loop->context().monotonic_event_time;
1056
1057 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
1058 ++pi1_pong_count;
1059 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
1060 });
1061 pi2_event_loop->MakeWatcher(
1062 "/test", [&pi2_event_loop, &pi2_ping_count,
1063 &pi2_pong_count](const examples::Pong &pong) {
1064 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
1065 << pi2_event_loop->context().monotonic_remote_time << " -> "
1066 << pi2_event_loop->context().monotonic_event_time;
1067
1068 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
1069 ++pi2_pong_count;
1070 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
1071 });
1072
1073 log_reader_factory.Run();
1074 EXPECT_EQ(pi1_ping_count, 6030);
1075 EXPECT_EQ(pi2_ping_count, 6030);
1076 EXPECT_EQ(pi1_pong_count, 6030);
1077 EXPECT_EQ(pi2_pong_count, 6030);
1078
1079 reader.Deregister();
1080}
1081
1082// Tests that we can sort a bunch of parts into the pre-determined sorted parts.
1083TEST_P(MultinodeLoggerTest, SortParts) {
1084 time_converter_.StartEqual();
1085 // Make a bunch of parts.
1086 {
1087 LoggerState pi1_logger = MakeLogger(pi1_);
1088 LoggerState pi2_logger = MakeLogger(pi2_);
1089
1090 event_loop_factory_.RunFor(chrono::milliseconds(95));
1091
1092 StartLogger(&pi1_logger);
1093 StartLogger(&pi2_logger);
1094
1095 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1096 }
1097
1098 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1099 VerifyParts(sorted_parts);
1100}
1101
1102// Tests that we can sort a bunch of parts with an empty part. We should ignore
1103// it and remove it from the sorted list.
1104TEST_P(MultinodeLoggerTest, SortEmptyParts) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07001105 std::vector<std::string> actual_filenames;
1106
Naman Guptaa63aa132023-03-22 20:06:34 -07001107 time_converter_.StartEqual();
1108 // Make a bunch of parts.
1109 {
1110 LoggerState pi1_logger = MakeLogger(pi1_);
1111 LoggerState pi2_logger = MakeLogger(pi2_);
1112
1113 event_loop_factory_.RunFor(chrono::milliseconds(95));
1114
1115 StartLogger(&pi1_logger);
1116 StartLogger(&pi2_logger);
1117
1118 event_loop_factory_.RunFor(chrono::milliseconds(2000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07001119 pi1_logger.AppendAllFilenames(&actual_filenames);
1120 pi2_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001121 }
1122
1123 // TODO(austin): Should we flip out if the file can't open?
1124 const std::string kEmptyFile("foobarinvalidfiledoesnotexist" + Extension());
1125
1126 aos::util::WriteStringToFileOrDie(kEmptyFile, "");
Austin Schuh8fb4b452023-08-04 17:02:27 -07001127 actual_filenames.emplace_back(kEmptyFile);
Naman Guptaa63aa132023-03-22 20:06:34 -07001128
Austin Schuh8fb4b452023-08-04 17:02:27 -07001129 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001130 VerifyParts(sorted_parts, {kEmptyFile});
1131}
1132
1133// Tests that we can sort a bunch of parts with the end missing off a
1134// file. We should use the part we can read.
1135TEST_P(MultinodeLoggerTest, SortTruncatedParts) {
1136 std::vector<std::string> actual_filenames;
1137 time_converter_.StartEqual();
1138 // Make a bunch of parts.
1139 {
1140 LoggerState pi1_logger = MakeLogger(pi1_);
1141 LoggerState pi2_logger = MakeLogger(pi2_);
1142
1143 event_loop_factory_.RunFor(chrono::milliseconds(95));
1144
1145 StartLogger(&pi1_logger);
1146 StartLogger(&pi2_logger);
1147
1148 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1149
1150 pi1_logger.AppendAllFilenames(&actual_filenames);
1151 pi2_logger.AppendAllFilenames(&actual_filenames);
1152 }
1153
1154 ASSERT_THAT(actual_filenames,
1155 ::testing::UnorderedElementsAreArray(logfiles_));
1156
1157 // Strip off the end of one of the files. Pick one with a lot of data.
1158 // For snappy, needs to have enough data to be >1 chunk of compressed data so
1159 // that we don't corrupt the entire log part.
1160 ::std::string compressed_contents =
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07001161 aos::util::ReadFileToStringOrDie(logfiles_[2]);
Naman Guptaa63aa132023-03-22 20:06:34 -07001162
1163 aos::util::WriteStringToFileOrDie(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07001164 logfiles_[2],
Naman Guptaa63aa132023-03-22 20:06:34 -07001165 compressed_contents.substr(0, compressed_contents.size() - 100));
1166
1167 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1168 VerifyParts(sorted_parts);
1169}
1170
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001171// Tests that if we remap a logged channel, it shows up correctly.
Naman Guptaa63aa132023-03-22 20:06:34 -07001172TEST_P(MultinodeLoggerTest, RemapLoggedChannel) {
1173 time_converter_.StartEqual();
Austin Schuh8fb4b452023-08-04 17:02:27 -07001174
1175 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -07001176 {
1177 LoggerState pi1_logger = MakeLogger(pi1_);
1178 LoggerState pi2_logger = MakeLogger(pi2_);
1179
1180 event_loop_factory_.RunFor(chrono::milliseconds(95));
1181
1182 StartLogger(&pi1_logger);
1183 StartLogger(&pi2_logger);
1184
1185 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07001186
1187 pi1_logger.AppendAllFilenames(&filenames);
1188 pi2_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001189 }
1190
Austin Schuh8fb4b452023-08-04 17:02:27 -07001191 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001192 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1193 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001194
1195 // Remap just on pi1.
1196 reader.RemapLoggedChannel<aos::timing::Report>(
1197 "/aos", configuration::GetNode(reader.configuration(), "pi1"));
1198
1199 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1200 log_reader_factory.set_send_delay(chrono::microseconds(0));
1201
1202 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1203 // Note: An extra channel gets remapped automatically due to a timestamp
1204 // channel being LOCAL_LOGGER'd.
1205 ASSERT_EQ(remapped_channels.size(), std::get<0>(GetParam()).shared ? 1u : 2u);
1206 EXPECT_EQ(remapped_channels[0]->name()->string_view(), "/original/pi1/aos");
1207 EXPECT_EQ(remapped_channels[0]->type()->string_view(), "aos.timing.Report");
1208 if (!std::get<0>(GetParam()).shared) {
1209 EXPECT_EQ(remapped_channels[1]->name()->string_view(),
1210 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1211 "aos-message_bridge-Timestamp");
1212 EXPECT_EQ(remapped_channels[1]->type()->string_view(),
1213 "aos.message_bridge.RemoteMessage");
1214 }
1215
1216 reader.Register(&log_reader_factory);
1217
1218 const Node *pi1 =
1219 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1220 const Node *pi2 =
1221 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1222
1223 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1224 // else should have moved.
1225 std::unique_ptr<EventLoop> pi1_event_loop =
1226 log_reader_factory.MakeEventLoop("test", pi1);
1227 pi1_event_loop->SkipTimingReport();
1228 std::unique_ptr<EventLoop> full_pi1_event_loop =
1229 log_reader_factory.MakeEventLoop("test", pi1);
1230 full_pi1_event_loop->SkipTimingReport();
1231 std::unique_ptr<EventLoop> pi2_event_loop =
1232 log_reader_factory.MakeEventLoop("test", pi2);
1233 pi2_event_loop->SkipTimingReport();
1234
1235 MessageCounter<aos::timing::Report> pi1_timing_report(pi1_event_loop.get(),
1236 "/aos");
1237 MessageCounter<aos::timing::Report> full_pi1_timing_report(
1238 full_pi1_event_loop.get(), "/pi1/aos");
1239 MessageCounter<aos::timing::Report> pi1_original_timing_report(
1240 pi1_event_loop.get(), "/original/aos");
1241 MessageCounter<aos::timing::Report> full_pi1_original_timing_report(
1242 full_pi1_event_loop.get(), "/original/pi1/aos");
1243 MessageCounter<aos::timing::Report> pi2_timing_report(pi2_event_loop.get(),
1244 "/aos");
1245
1246 log_reader_factory.Run();
1247
1248 EXPECT_EQ(pi1_timing_report.count(), 0u);
1249 EXPECT_EQ(full_pi1_timing_report.count(), 0u);
1250 EXPECT_NE(pi1_original_timing_report.count(), 0u);
1251 EXPECT_NE(full_pi1_original_timing_report.count(), 0u);
1252 EXPECT_NE(pi2_timing_report.count(), 0u);
1253
1254 reader.Deregister();
1255}
1256
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001257// Tests that if we rename a logged channel, it shows up correctly.
1258TEST_P(MultinodeLoggerTest, RenameLoggedChannel) {
1259 std::vector<std::string> actual_filenames;
1260 time_converter_.StartEqual();
1261 {
1262 LoggerState pi1_logger = MakeLogger(pi1_);
1263 LoggerState pi2_logger = MakeLogger(pi2_);
1264
1265 event_loop_factory_.RunFor(chrono::milliseconds(95));
1266
1267 StartLogger(&pi1_logger);
1268 StartLogger(&pi2_logger);
1269
1270 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1271
1272 pi1_logger.AppendAllFilenames(&actual_filenames);
1273 pi2_logger.AppendAllFilenames(&actual_filenames);
1274 }
1275
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001276 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1277 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1278 LogReader reader(sorted_parts);
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001279
1280 // Rename just on pi2. Add some global maps just to verify they get added in
1281 // the config and used correctly.
1282 std::vector<MapT> maps;
1283 {
1284 MapT map;
1285 map.match = std::make_unique<ChannelT>();
1286 map.match->name = "/foo*";
1287 map.match->source_node = "pi1";
1288 map.rename = std::make_unique<ChannelT>();
1289 map.rename->name = "/pi1/foo";
1290 maps.emplace_back(std::move(map));
1291 }
1292 {
1293 MapT map;
1294 map.match = std::make_unique<ChannelT>();
1295 map.match->name = "/foo*";
1296 map.match->source_node = "pi2";
1297 map.rename = std::make_unique<ChannelT>();
1298 map.rename->name = "/pi2/foo";
1299 maps.emplace_back(std::move(map));
1300 }
1301 {
1302 MapT map;
1303 map.match = std::make_unique<ChannelT>();
1304 map.match->name = "/foo";
1305 map.match->type = "aos.examples.Ping";
1306 map.rename = std::make_unique<ChannelT>();
1307 map.rename->name = "/foo/renamed";
1308 maps.emplace_back(std::move(map));
1309 }
1310 reader.RenameLoggedChannel<aos::examples::Ping>(
1311 "/aos", configuration::GetNode(reader.configuration(), "pi2"),
1312 "/pi2/foo/renamed", maps);
1313
1314 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1315 log_reader_factory.set_send_delay(chrono::microseconds(0));
1316
1317 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1318 // Note: An extra channel gets remapped automatically due to a timestamp
1319 // channel being LOCAL_LOGGER'd.
1320 const bool shared = std::get<0>(GetParam()).shared;
1321 ASSERT_EQ(remapped_channels.size(), shared ? 1u : 2u);
1322 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->name()->string_view(),
1323 "/pi2/foo/renamed");
1324 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->type()->string_view(),
1325 "aos.examples.Ping");
1326 if (!shared) {
1327 EXPECT_EQ(remapped_channels[0]->name()->string_view(),
1328 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1329 "aos-message_bridge-Timestamp");
1330 EXPECT_EQ(remapped_channels[0]->type()->string_view(),
1331 "aos.message_bridge.RemoteMessage");
1332 }
1333
1334 reader.Register(&log_reader_factory);
1335
1336 const Node *pi1 =
1337 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1338 const Node *pi2 =
1339 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1340
1341 // Confirm we can read the data on the renamed channel, just for pi2. Nothing
1342 // else should have moved.
1343 std::unique_ptr<EventLoop> pi2_event_loop =
1344 log_reader_factory.MakeEventLoop("test", pi2);
1345 pi2_event_loop->SkipTimingReport();
1346 std::unique_ptr<EventLoop> full_pi2_event_loop =
1347 log_reader_factory.MakeEventLoop("test", pi2);
1348 full_pi2_event_loop->SkipTimingReport();
1349 std::unique_ptr<EventLoop> pi1_event_loop =
1350 log_reader_factory.MakeEventLoop("test", pi1);
1351 pi1_event_loop->SkipTimingReport();
1352
1353 MessageCounter<aos::examples::Ping> pi2_ping(pi2_event_loop.get(), "/aos");
1354 MessageCounter<aos::examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1355 "/foo");
1356 MessageCounter<aos::examples::Ping> full_pi2_renamed_ping(
1357 full_pi2_event_loop.get(), "/pi2/foo/renamed");
1358 MessageCounter<aos::examples::Ping> pi1_ping(pi1_event_loop.get(), "/aos");
1359
1360 log_reader_factory.Run();
1361
1362 EXPECT_EQ(pi2_ping.count(), 0u);
1363 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1364 EXPECT_NE(full_pi2_renamed_ping.count(), 0u);
1365 EXPECT_NE(pi1_ping.count(), 0u);
1366
1367 reader.Deregister();
1368}
1369
Naman Guptaa63aa132023-03-22 20:06:34 -07001370// Tests that we can remap a forwarded channel as well.
1371TEST_P(MultinodeLoggerTest, RemapForwardedLoggedChannel) {
1372 time_converter_.StartEqual();
1373 {
1374 LoggerState pi1_logger = MakeLogger(pi1_);
1375 LoggerState pi2_logger = MakeLogger(pi2_);
1376
1377 event_loop_factory_.RunFor(chrono::milliseconds(95));
1378
1379 StartLogger(&pi1_logger);
1380 StartLogger(&pi2_logger);
1381
1382 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1383 }
1384
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001385 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1386 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1387 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001388
1389 reader.RemapLoggedChannel<examples::Ping>("/test");
1390
1391 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1392 log_reader_factory.set_send_delay(chrono::microseconds(0));
1393
1394 reader.Register(&log_reader_factory);
1395
1396 const Node *pi1 =
1397 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1398 const Node *pi2 =
1399 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1400
1401 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1402 // else should have moved.
1403 std::unique_ptr<EventLoop> pi1_event_loop =
1404 log_reader_factory.MakeEventLoop("test", pi1);
1405 pi1_event_loop->SkipTimingReport();
1406 std::unique_ptr<EventLoop> full_pi1_event_loop =
1407 log_reader_factory.MakeEventLoop("test", pi1);
1408 full_pi1_event_loop->SkipTimingReport();
1409 std::unique_ptr<EventLoop> pi2_event_loop =
1410 log_reader_factory.MakeEventLoop("test", pi2);
1411 pi2_event_loop->SkipTimingReport();
1412
1413 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1414 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1415 MessageCounter<examples::Ping> pi1_original_ping(pi1_event_loop.get(),
1416 "/original/test");
1417 MessageCounter<examples::Ping> pi2_original_ping(pi2_event_loop.get(),
1418 "/original/test");
1419
1420 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1421 pi1_original_ping_timestamp;
1422 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1423 pi1_ping_timestamp;
1424 if (!shared()) {
1425 pi1_original_ping_timestamp =
1426 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1427 pi1_event_loop.get(),
1428 "/pi1/aos/remote_timestamps/pi2/original/test/aos-examples-Ping");
1429 pi1_ping_timestamp =
1430 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1431 pi1_event_loop.get(),
1432 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1433 }
1434
1435 log_reader_factory.Run();
1436
1437 EXPECT_EQ(pi1_ping.count(), 0u);
1438 EXPECT_EQ(pi2_ping.count(), 0u);
1439 EXPECT_NE(pi1_original_ping.count(), 0u);
1440 EXPECT_NE(pi2_original_ping.count(), 0u);
1441 if (!shared()) {
1442 EXPECT_NE(pi1_original_ping_timestamp->count(), 0u);
1443 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1444 }
1445
1446 reader.Deregister();
1447}
1448
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001449// Tests that we can rename a forwarded channel as well.
1450TEST_P(MultinodeLoggerTest, RenameForwardedLoggedChannel) {
1451 std::vector<std::string> actual_filenames;
1452 time_converter_.StartEqual();
1453 {
1454 LoggerState pi1_logger = MakeLogger(pi1_);
1455 LoggerState pi2_logger = MakeLogger(pi2_);
1456
1457 event_loop_factory_.RunFor(chrono::milliseconds(95));
1458
1459 StartLogger(&pi1_logger);
1460 StartLogger(&pi2_logger);
1461
1462 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1463
1464 pi1_logger.AppendAllFilenames(&actual_filenames);
1465 pi2_logger.AppendAllFilenames(&actual_filenames);
1466 }
1467
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001468 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1469 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1470 LogReader reader(sorted_parts);
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001471
1472 std::vector<MapT> maps;
1473 {
1474 MapT map;
1475 map.match = std::make_unique<ChannelT>();
1476 map.match->name = "/production*";
1477 map.match->source_node = "pi1";
1478 map.rename = std::make_unique<ChannelT>();
1479 map.rename->name = "/pi1/production";
1480 maps.emplace_back(std::move(map));
1481 }
1482 {
1483 MapT map;
1484 map.match = std::make_unique<ChannelT>();
1485 map.match->name = "/production*";
1486 map.match->source_node = "pi2";
1487 map.rename = std::make_unique<ChannelT>();
1488 map.rename->name = "/pi2/production";
1489 maps.emplace_back(std::move(map));
1490 }
1491 reader.RenameLoggedChannel<aos::examples::Ping>(
1492 "/test", configuration::GetNode(reader.configuration(), "pi1"),
1493 "/pi1/production", maps);
1494
1495 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1496 log_reader_factory.set_send_delay(chrono::microseconds(0));
1497
1498 reader.Register(&log_reader_factory);
1499
1500 const Node *pi1 =
1501 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1502 const Node *pi2 =
1503 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1504
1505 // Confirm we can read the data on the renamed channel, on both the source
1506 // node and the remote node. In case of split timestamp channels, confirm that
1507 // we receive the timestamp messages on the renamed channel as well.
1508 std::unique_ptr<EventLoop> pi1_event_loop =
1509 log_reader_factory.MakeEventLoop("test", pi1);
1510 pi1_event_loop->SkipTimingReport();
1511 std::unique_ptr<EventLoop> full_pi1_event_loop =
1512 log_reader_factory.MakeEventLoop("test", pi1);
1513 full_pi1_event_loop->SkipTimingReport();
1514 std::unique_ptr<EventLoop> pi2_event_loop =
1515 log_reader_factory.MakeEventLoop("test", pi2);
1516 pi2_event_loop->SkipTimingReport();
1517
1518 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1519 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1520 MessageCounter<examples::Ping> pi1_renamed_ping(pi1_event_loop.get(),
1521 "/pi1/production");
1522 MessageCounter<examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1523 "/pi1/production");
1524
1525 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1526 pi1_renamed_ping_timestamp;
1527 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1528 pi1_ping_timestamp;
1529 if (!shared()) {
1530 pi1_renamed_ping_timestamp =
1531 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1532 pi1_event_loop.get(),
1533 "/pi1/aos/remote_timestamps/pi2/pi1/production/aos-examples-Ping");
1534 pi1_ping_timestamp =
1535 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1536 pi1_event_loop.get(),
1537 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1538 }
1539
1540 log_reader_factory.Run();
1541
1542 EXPECT_EQ(pi1_ping.count(), 0u);
1543 EXPECT_EQ(pi2_ping.count(), 0u);
1544 EXPECT_NE(pi1_renamed_ping.count(), 0u);
1545 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1546 if (!shared()) {
1547 EXPECT_NE(pi1_renamed_ping_timestamp->count(), 0u);
1548 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1549 }
1550
1551 reader.Deregister();
1552}
1553
Naman Guptaa63aa132023-03-22 20:06:34 -07001554// Tests that we observe all the same events in log replay (for a given node)
1555// whether we just register an event loop for that node or if we register a full
1556// event loop factory.
1557TEST_P(MultinodeLoggerTest, SingleNodeReplay) {
1558 time_converter_.StartEqual();
1559 constexpr chrono::milliseconds kStartupDelay(95);
Austin Schuh8fb4b452023-08-04 17:02:27 -07001560 std::vector<std::string> filenames;
1561
Naman Guptaa63aa132023-03-22 20:06:34 -07001562 {
1563 LoggerState pi1_logger = MakeLogger(pi1_);
1564 LoggerState pi2_logger = MakeLogger(pi2_);
1565
1566 event_loop_factory_.RunFor(kStartupDelay);
1567
1568 StartLogger(&pi1_logger);
1569 StartLogger(&pi2_logger);
1570
1571 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07001572
1573 pi1_logger.AppendAllFilenames(&filenames);
1574 pi2_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001575 }
1576
Austin Schuh8fb4b452023-08-04 17:02:27 -07001577 LogReader full_reader(SortParts(filenames));
1578 LogReader single_node_reader(SortParts(filenames));
Naman Guptaa63aa132023-03-22 20:06:34 -07001579
1580 SimulatedEventLoopFactory full_factory(full_reader.configuration());
1581 SimulatedEventLoopFactory single_node_factory(
1582 single_node_reader.configuration());
1583 single_node_factory.SkipTimingReport();
1584 single_node_factory.DisableStatistics();
1585 std::unique_ptr<EventLoop> replay_event_loop =
1586 single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
1587 "log_reader");
1588
1589 full_reader.Register(&full_factory);
1590 single_node_reader.Register(replay_event_loop.get());
1591
1592 const Node *full_pi1 =
1593 configuration::GetNode(full_factory.configuration(), "pi1");
1594
1595 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1596 // else should have moved.
1597 std::unique_ptr<EventLoop> full_event_loop =
1598 full_factory.MakeEventLoop("test", full_pi1);
1599 full_event_loop->SkipTimingReport();
1600 full_event_loop->SkipAosLog();
1601 // maps are indexed on channel index.
1602 // observed_messages: {channel_index: [(message_sent_time, was_fetched),...]}
1603 std::map<size_t, std::vector<std::pair<monotonic_clock::time_point, bool>>>
1604 observed_messages;
1605 std::map<size_t, std::unique_ptr<RawFetcher>> fetchers;
1606 for (size_t ii = 0; ii < full_event_loop->configuration()->channels()->size();
1607 ++ii) {
1608 const Channel *channel =
1609 full_event_loop->configuration()->channels()->Get(ii);
1610 // We currently don't support replaying remote timestamp channels in
1611 // realtime replay (unless the remote timestamp channel was not NOT_LOGGED,
1612 // in which case it gets auto-remapped and replayed on a /original channel).
1613 if (channel->name()->string_view().find("remote_timestamp") !=
1614 std::string_view::npos &&
1615 channel->name()->string_view().find("/original") ==
1616 std::string_view::npos) {
1617 continue;
1618 }
1619 if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
1620 observed_messages[ii] = {};
1621 fetchers[ii] = full_event_loop->MakeRawFetcher(channel);
1622 full_event_loop->OnRun([ii, &observed_messages, &fetchers]() {
1623 if (fetchers[ii]->Fetch()) {
1624 observed_messages[ii].push_back(std::make_pair(
1625 fetchers[ii]->context().monotonic_event_time, true));
1626 }
1627 });
1628 full_event_loop->MakeRawNoArgWatcher(
1629 channel, [ii, &observed_messages](const Context &context) {
1630 observed_messages[ii].push_back(
1631 std::make_pair(context.monotonic_event_time, false));
1632 });
1633 }
1634 }
1635
1636 full_factory.Run();
1637 fetchers.clear();
1638 full_reader.Deregister();
1639
1640 const Node *single_node_pi1 =
1641 configuration::GetNode(single_node_factory.configuration(), "pi1");
1642 std::map<size_t, std::unique_ptr<RawFetcher>> single_node_fetchers;
1643
1644 std::unique_ptr<EventLoop> single_node_event_loop =
1645 single_node_factory.MakeEventLoop("test", single_node_pi1);
1646 single_node_event_loop->SkipTimingReport();
1647 single_node_event_loop->SkipAosLog();
1648 for (size_t ii = 0;
1649 ii < single_node_event_loop->configuration()->channels()->size(); ++ii) {
1650 const Channel *channel =
1651 single_node_event_loop->configuration()->channels()->Get(ii);
1652 single_node_factory.DisableForwarding(channel);
1653 if (configuration::ChannelIsReadableOnNode(channel, single_node_pi1)) {
1654 single_node_fetchers[ii] =
1655 single_node_event_loop->MakeRawFetcher(channel);
1656 single_node_event_loop->OnRun([channel, ii, &single_node_fetchers]() {
1657 EXPECT_FALSE(single_node_fetchers[ii]->Fetch())
1658 << "Single EventLoop replay doesn't support pre-loading fetchers. "
1659 << configuration::StrippedChannelToString(channel);
1660 });
1661 single_node_event_loop->MakeRawNoArgWatcher(
1662 channel, [ii, &observed_messages, channel,
1663 kStartupDelay](const Context &context) {
1664 if (observed_messages[ii].empty()) {
1665 FAIL() << "Observed extra message at "
1666 << context.monotonic_event_time << " on "
1667 << configuration::StrippedChannelToString(channel);
1668 return;
1669 }
1670 const std::pair<monotonic_clock::time_point, bool> &message =
1671 observed_messages[ii].front();
1672 if (message.second) {
1673 EXPECT_LE(message.first,
1674 context.monotonic_event_time + kStartupDelay)
1675 << "Mismatched message times " << context.monotonic_event_time
1676 << " and " << message.first << " on "
1677 << configuration::StrippedChannelToString(channel);
1678 } else {
1679 EXPECT_EQ(message.first,
1680 context.monotonic_event_time + kStartupDelay)
1681 << "Mismatched message times " << context.monotonic_event_time
1682 << " and " << message.first << " on "
1683 << configuration::StrippedChannelToString(channel);
1684 }
1685 observed_messages[ii].erase(observed_messages[ii].begin());
1686 });
1687 }
1688 }
1689
1690 single_node_factory.Run();
1691
1692 single_node_fetchers.clear();
1693
1694 single_node_reader.Deregister();
1695
1696 for (const auto &pair : observed_messages) {
1697 EXPECT_TRUE(pair.second.empty())
1698 << "Missed " << pair.second.size() << " messages on "
1699 << configuration::StrippedChannelToString(
1700 single_node_event_loop->configuration()->channels()->Get(
1701 pair.first));
1702 }
1703}
1704
1705// Tests that we properly recreate forwarded timestamps when replaying a log.
1706// This should be enough that we can then re-run the logger and get a valid log
1707// back.
1708TEST_P(MultinodeLoggerTest, MessageHeader) {
1709 time_converter_.StartEqual();
1710 {
1711 LoggerState pi1_logger = MakeLogger(pi1_);
1712 LoggerState pi2_logger = MakeLogger(pi2_);
1713
1714 event_loop_factory_.RunFor(chrono::milliseconds(95));
1715
1716 StartLogger(&pi1_logger);
1717 StartLogger(&pi2_logger);
1718
1719 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1720 }
1721
1722 LogReader reader(SortParts(logfiles_));
1723
1724 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1725 log_reader_factory.set_send_delay(chrono::microseconds(0));
1726
1727 // This sends out the fetched messages and advances time to the start of the
1728 // log file.
1729 reader.Register(&log_reader_factory);
1730
1731 const Node *pi1 =
1732 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1733 const Node *pi2 =
1734 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1735
1736 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
1737 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
1738 LOG(INFO) << "now pi1 "
1739 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1740 LOG(INFO) << "now pi2 "
1741 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1742
1743 EXPECT_THAT(reader.LoggedNodes(),
1744 ::testing::ElementsAre(
1745 configuration::GetNode(reader.logged_configuration(), pi1),
1746 configuration::GetNode(reader.logged_configuration(), pi2)));
1747
1748 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1749
1750 std::unique_ptr<EventLoop> pi1_event_loop =
1751 log_reader_factory.MakeEventLoop("test", pi1);
1752 std::unique_ptr<EventLoop> pi2_event_loop =
1753 log_reader_factory.MakeEventLoop("test", pi2);
1754
1755 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =
1756 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1757 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi2_fetcher =
1758 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1759
1760 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1761 pi1_event_loop->MakeFetcher<examples::Ping>("/test");
1762 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1763 pi2_event_loop->MakeFetcher<examples::Ping>("/test");
1764
1765 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi2_fetcher =
1766 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1767 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi1_fetcher =
1768 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1769
1770 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1771 pi2_event_loop->MakeFetcher<examples::Pong>("/test");
1772 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1773 pi1_event_loop->MakeFetcher<examples::Pong>("/test");
1774
1775 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
1776 pi1_event_loop->configuration(), pi1_timestamp_on_pi1_fetcher.channel());
1777 const size_t ping_timestamp_channel = configuration::ChannelIndex(
1778 pi2_event_loop->configuration(), ping_on_pi2_fetcher.channel());
1779
1780 const size_t pi2_timestamp_channel = configuration::ChannelIndex(
1781 pi2_event_loop->configuration(), pi2_timestamp_on_pi2_fetcher.channel());
1782 const size_t pong_timestamp_channel = configuration::ChannelIndex(
1783 pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
1784
1785 const chrono::nanoseconds network_delay = event_loop_factory_.network_delay();
1786 const chrono::nanoseconds send_delay = event_loop_factory_.send_delay();
1787
1788 for (std::pair<int, std::string> channel :
1789 shared()
1790 ? std::vector<
1791 std::pair<int, std::string>>{{-1,
1792 "/aos/remote_timestamps/pi2"}}
1793 : std::vector<std::pair<int, std::string>>{
1794 {pi1_timestamp_channel,
1795 "/aos/remote_timestamps/pi2/pi1/aos/"
1796 "aos-message_bridge-Timestamp"},
1797 {ping_timestamp_channel,
1798 "/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
1799 pi1_event_loop->MakeWatcher(
1800 channel.second,
1801 [&pi1_event_loop, &pi2_event_loop, pi1_timestamp_channel,
1802 ping_timestamp_channel, &pi1_timestamp_on_pi1_fetcher,
1803 &pi1_timestamp_on_pi2_fetcher, &ping_on_pi1_fetcher,
1804 &ping_on_pi2_fetcher, network_delay, send_delay,
1805 channel_index = channel.first](const RemoteMessage &header) {
1806 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1807 chrono::nanoseconds(header.monotonic_sent_time()));
1808 const aos::realtime_clock::time_point header_realtime_sent_time(
1809 chrono::nanoseconds(header.realtime_sent_time()));
1810 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1811 chrono::nanoseconds(header.monotonic_remote_time()));
1812 const aos::realtime_clock::time_point header_realtime_remote_time(
1813 chrono::nanoseconds(header.realtime_remote_time()));
1814
1815 if (channel_index != -1) {
1816 ASSERT_EQ(channel_index, header.channel_index());
1817 }
1818
1819 const Context *pi1_context = nullptr;
1820 const Context *pi2_context = nullptr;
1821
1822 if (header.channel_index() == pi1_timestamp_channel) {
1823 ASSERT_TRUE(pi1_timestamp_on_pi1_fetcher.FetchNext());
1824 ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
1825 pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
1826 pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
1827 } else if (header.channel_index() == ping_timestamp_channel) {
1828 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
1829 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
1830 pi1_context = &ping_on_pi1_fetcher.context();
1831 pi2_context = &ping_on_pi2_fetcher.context();
1832 } else {
1833 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1834 << configuration::CleanedChannelToString(
1835 pi1_event_loop->configuration()->channels()->Get(
1836 header.channel_index()));
1837 }
1838
1839 ASSERT_TRUE(header.has_boot_uuid());
1840 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1841 pi2_event_loop->boot_uuid());
1842
1843 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
1844 EXPECT_EQ(pi2_context->remote_queue_index,
1845 header.remote_queue_index());
1846 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
1847
1848 EXPECT_EQ(pi2_context->monotonic_event_time,
1849 header_monotonic_sent_time);
1850 EXPECT_EQ(pi2_context->realtime_event_time,
1851 header_realtime_sent_time);
1852 EXPECT_EQ(pi2_context->realtime_remote_time,
1853 header_realtime_remote_time);
1854 EXPECT_EQ(pi2_context->monotonic_remote_time,
1855 header_monotonic_remote_time);
1856
1857 EXPECT_EQ(pi1_context->realtime_event_time,
1858 header_realtime_remote_time);
1859 EXPECT_EQ(pi1_context->monotonic_event_time,
1860 header_monotonic_remote_time);
1861
1862 // Time estimation isn't perfect, but we know the clocks were
1863 // identical when logged, so we know when this should have come back.
1864 // Confirm we got it when we expected.
1865 EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
1866 pi1_context->monotonic_event_time + 2 * network_delay +
1867 send_delay);
1868 });
1869 }
1870 for (std::pair<int, std::string> channel :
1871 shared()
1872 ? std::vector<
1873 std::pair<int, std::string>>{{-1,
1874 "/aos/remote_timestamps/pi1"}}
1875 : std::vector<std::pair<int, std::string>>{
1876 {pi2_timestamp_channel,
1877 "/aos/remote_timestamps/pi1/pi2/aos/"
1878 "aos-message_bridge-Timestamp"}}) {
1879 pi2_event_loop->MakeWatcher(
1880 channel.second,
1881 [&pi2_event_loop, &pi1_event_loop, pi2_timestamp_channel,
1882 pong_timestamp_channel, &pi2_timestamp_on_pi2_fetcher,
1883 &pi2_timestamp_on_pi1_fetcher, &pong_on_pi2_fetcher,
1884 &pong_on_pi1_fetcher, network_delay, send_delay,
1885 channel_index = channel.first](const RemoteMessage &header) {
1886 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1887 chrono::nanoseconds(header.monotonic_sent_time()));
1888 const aos::realtime_clock::time_point header_realtime_sent_time(
1889 chrono::nanoseconds(header.realtime_sent_time()));
1890 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1891 chrono::nanoseconds(header.monotonic_remote_time()));
1892 const aos::realtime_clock::time_point header_realtime_remote_time(
1893 chrono::nanoseconds(header.realtime_remote_time()));
1894
1895 if (channel_index != -1) {
1896 ASSERT_EQ(channel_index, header.channel_index());
1897 }
1898
1899 const Context *pi2_context = nullptr;
1900 const Context *pi1_context = nullptr;
1901
1902 if (header.channel_index() == pi2_timestamp_channel) {
1903 ASSERT_TRUE(pi2_timestamp_on_pi2_fetcher.FetchNext());
1904 ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
1905 pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
1906 pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
1907 } else if (header.channel_index() == pong_timestamp_channel) {
1908 ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
1909 ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
1910 pi2_context = &pong_on_pi2_fetcher.context();
1911 pi1_context = &pong_on_pi1_fetcher.context();
1912 } else {
1913 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1914 << configuration::CleanedChannelToString(
1915 pi2_event_loop->configuration()->channels()->Get(
1916 header.channel_index()));
1917 }
1918
1919 ASSERT_TRUE(header.has_boot_uuid());
1920 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1921 pi1_event_loop->boot_uuid());
1922
1923 EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());
1924 EXPECT_EQ(pi1_context->remote_queue_index,
1925 header.remote_queue_index());
1926 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
1927
1928 EXPECT_EQ(pi1_context->monotonic_event_time,
1929 header_monotonic_sent_time);
1930 EXPECT_EQ(pi1_context->realtime_event_time,
1931 header_realtime_sent_time);
1932 EXPECT_EQ(pi1_context->realtime_remote_time,
1933 header_realtime_remote_time);
1934 EXPECT_EQ(pi1_context->monotonic_remote_time,
1935 header_monotonic_remote_time);
1936
1937 EXPECT_EQ(pi2_context->realtime_event_time,
1938 header_realtime_remote_time);
1939 EXPECT_EQ(pi2_context->monotonic_event_time,
1940 header_monotonic_remote_time);
1941
1942 // Time estimation isn't perfect, but we know the clocks were
1943 // identical when logged, so we know when this should have come back.
1944 // Confirm we got it when we expected.
1945 EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
1946 pi2_context->monotonic_event_time + 2 * network_delay +
1947 send_delay);
1948 });
1949 }
1950
1951 // And confirm we can re-create a log again, while checking the contents.
1952 {
1953 LoggerState pi1_logger = MakeLogger(
1954 log_reader_factory.GetNodeEventLoopFactory("pi1"), &log_reader_factory);
1955 LoggerState pi2_logger = MakeLogger(
1956 log_reader_factory.GetNodeEventLoopFactory("pi2"), &log_reader_factory);
1957
Austin Schuh8fb4b452023-08-04 17:02:27 -07001958 StartLogger(&pi1_logger, tmp_dir_ + "/logs/relogged1");
1959 StartLogger(&pi2_logger, tmp_dir_ + "/logs/relogged2");
Naman Guptaa63aa132023-03-22 20:06:34 -07001960
1961 log_reader_factory.Run();
1962 }
1963
1964 reader.Deregister();
1965
1966 // And verify that we can run the LogReader over the relogged files without
1967 // hitting any fatal errors.
1968 {
Austin Schuh8fb4b452023-08-04 17:02:27 -07001969 LogReader relogged_reader(SortParts(
1970 MakeLogFiles(tmp_dir_ + "/logs/relogged1", tmp_dir_ + "/logs/relogged2",
1971 1, 1, 2, 2, true)));
Naman Guptaa63aa132023-03-22 20:06:34 -07001972 relogged_reader.Register();
1973
1974 relogged_reader.event_loop_factory()->Run();
1975 }
1976 // And confirm that we can read the logged file using the reader's
1977 // configuration.
1978 {
1979 LogReader relogged_reader(
Austin Schuh8fb4b452023-08-04 17:02:27 -07001980 SortParts(MakeLogFiles(tmp_dir_ + "/logs/relogged1",
1981 tmp_dir_ + "/logs/relogged2", 1, 1, 2, 2, true)),
Naman Guptaa63aa132023-03-22 20:06:34 -07001982 reader.configuration());
1983 relogged_reader.Register();
1984
1985 relogged_reader.event_loop_factory()->Run();
1986 }
1987}
1988
1989// Tests that we properly populate and extract the logger_start time by setting
1990// up a clock difference between 2 nodes and looking at the resulting parts.
1991TEST_P(MultinodeLoggerTest, LoggerStartTime) {
1992 std::vector<std::string> actual_filenames;
1993 time_converter_.AddMonotonic(
1994 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
1995 {
1996 LoggerState pi1_logger = MakeLogger(pi1_);
1997 LoggerState pi2_logger = MakeLogger(pi2_);
1998
1999 StartLogger(&pi1_logger);
2000 StartLogger(&pi2_logger);
2001
2002 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2003
2004 pi1_logger.AppendAllFilenames(&actual_filenames);
2005 pi2_logger.AppendAllFilenames(&actual_filenames);
2006 }
2007
2008 ASSERT_THAT(actual_filenames,
2009 ::testing::UnorderedElementsAreArray(logfiles_));
2010
Austin Schuh8fb4b452023-08-04 17:02:27 -07002011 for (const LogFile &log_file : SortParts(actual_filenames)) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002012 for (const LogParts &log_part : log_file.parts) {
2013 if (log_part.node == log_file.logger_node) {
2014 EXPECT_EQ(log_part.logger_monotonic_start_time,
2015 aos::monotonic_clock::min_time);
2016 EXPECT_EQ(log_part.logger_realtime_start_time,
2017 aos::realtime_clock::min_time);
2018 } else {
2019 const chrono::seconds offset = log_file.logger_node == "pi1"
2020 ? -chrono::seconds(1000)
2021 : chrono::seconds(1000);
2022 EXPECT_EQ(log_part.logger_monotonic_start_time,
2023 log_part.monotonic_start_time + offset);
2024 EXPECT_EQ(log_part.logger_realtime_start_time,
2025 log_file.realtime_start_time +
2026 (log_part.logger_monotonic_start_time -
2027 log_file.monotonic_start_time));
2028 }
2029 }
2030 }
2031}
2032
2033// Test that renaming the base, renames the folder.
2034TEST_P(MultinodeLoggerTest, LoggerRenameFolder) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002035 time_converter_.AddMonotonic(
2036 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07002037 logfile_base1_ = tmp_dir_ + "/logs/renamefolder/multi_logfile1";
2038 logfile_base2_ = tmp_dir_ + "/logs/renamefolder/multi_logfile2";
2039
Naman Guptaa63aa132023-03-22 20:06:34 -07002040 LoggerState pi1_logger = MakeLogger(pi1_);
2041 LoggerState pi2_logger = MakeLogger(pi2_);
2042
2043 StartLogger(&pi1_logger);
2044 StartLogger(&pi2_logger);
2045
2046 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002047 logfile_base1_ = tmp_dir_ + "/logs/new-good/multi_logfile1";
2048 logfile_base2_ = tmp_dir_ + "/logs/new-good/multi_logfile2";
Naman Guptaa63aa132023-03-22 20:06:34 -07002049 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
Alexei Strotscaf17d32023-04-03 22:31:11 -07002050
2051 // Sequence of set_base_name and Rotate simulates rename operation. Since
2052 // rename is not supported by all namers, RenameLogBase moved from logger to
2053 // the higher level abstraction, yet log_namers support rename, and it is
2054 // legal to test it here.
2055 pi1_logger.log_namer->set_base_name(logfile_base1_);
2056 pi1_logger.logger->Rotate();
2057 pi2_logger.log_namer->set_base_name(logfile_base2_);
2058 pi2_logger.logger->Rotate();
2059
Naman Guptaa63aa132023-03-22 20:06:34 -07002060 for (auto &file : logfiles_) {
2061 struct stat s;
2062 EXPECT_EQ(0, stat(file.c_str(), &s));
2063 }
2064}
2065
2066// Test that renaming the file base dies.
2067TEST_P(MultinodeLoggerDeathTest, LoggerRenameFile) {
2068 time_converter_.AddMonotonic(
2069 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07002070 logfile_base1_ = tmp_dir_ + "/logs/renamefile/multi_logfile1";
2071 logfile_base2_ = tmp_dir_ + "/logs/renamefile/multi_logfile2";
2072
Naman Guptaa63aa132023-03-22 20:06:34 -07002073 LoggerState pi1_logger = MakeLogger(pi1_);
2074 StartLogger(&pi1_logger);
2075 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002076 logfile_base1_ = tmp_dir_ + "/logs/new-renamefile/new_multi_logfile1";
Alexei Strotscaf17d32023-04-03 22:31:11 -07002077 EXPECT_DEATH({ pi1_logger.log_namer->set_base_name(logfile_base1_); },
Naman Guptaa63aa132023-03-22 20:06:34 -07002078 "Rename of file base from");
2079}
2080
2081// TODO(austin): We can write a test which recreates a logfile and confirms that
2082// we get it back. That is the ultimate test.
2083
// Tests that pi1 can log across a reboot of pi2 (the remote node), and that
// the headers of the resulting log parts record the expected boot UUIDs,
// start times, and oldest timestamps.
2087TEST_P(MultinodeLoggerTest, RemoteReboot) {
2088 std::vector<std::string> actual_filenames;
2089
2090 const UUID pi1_boot0 = UUID::Random();
2091 const UUID pi2_boot0 = UUID::Random();
2092 const UUID pi2_boot1 = UUID::Random();
2093 {
2094 CHECK_EQ(pi1_index_, 0u);
2095 CHECK_EQ(pi2_index_, 1u);
2096
2097 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2098 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2099 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2100
2101 time_converter_.AddNextTimestamp(
2102 distributed_clock::epoch(),
2103 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2104 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2105 time_converter_.AddNextTimestamp(
2106 distributed_clock::epoch() + reboot_time,
2107 {BootTimestamp::epoch() + reboot_time,
2108 BootTimestamp{
2109 .boot = 1,
2110 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2111 }
2112
2113 {
2114 LoggerState pi1_logger = MakeLogger(pi1_);
2115
2116 event_loop_factory_.RunFor(chrono::milliseconds(95));
2117 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2118 pi1_boot0);
2119 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2120 pi2_boot0);
2121
2122 StartLogger(&pi1_logger);
2123
2124 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2125
2126 VLOG(1) << "Reboot now!";
2127
2128 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2129 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2130 pi1_boot0);
2131 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2132 pi2_boot1);
2133
2134 pi1_logger.AppendAllFilenames(&actual_filenames);
2135 }
2136
2137 std::sort(actual_filenames.begin(), actual_filenames.end());
2138 std::sort(pi1_reboot_logfiles_.begin(), pi1_reboot_logfiles_.end());
2139 ASSERT_THAT(actual_filenames,
2140 ::testing::UnorderedElementsAreArray(pi1_reboot_logfiles_));
2141
2142 // Confirm that our new oldest timestamps properly update as we reboot and
2143 // rotate.
2144 for (const std::string &file : pi1_reboot_logfiles_) {
2145 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2146 ReadHeader(file);
2147 CHECK(log_header);
2148 if (log_header->message().has_configuration()) {
2149 continue;
2150 }
2151
2152 const monotonic_clock::time_point monotonic_start_time =
2153 monotonic_clock::time_point(
2154 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2155 const UUID source_node_boot_uuid = UUID::FromString(
2156 log_header->message().source_node_boot_uuid()->string_view());
2157
2158 if (log_header->message().node()->name()->string_view() != "pi1") {
2159 // The remote message channel should rotate later and have more parts.
2160 // This only is true on the log files with shared remote messages.
2161 //
2162 // TODO(austin): I'm not the most thrilled with this test pattern... It
2163 // feels brittle in a different way.
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002164 if (file.find("timestamps/remote_pi2") == std::string::npos) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002165 switch (log_header->message().parts_index()) {
2166 case 0:
2167 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2168 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2169 break;
2170 case 1:
2171 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2172 ASSERT_EQ(monotonic_start_time,
2173 monotonic_clock::epoch() + chrono::seconds(1));
2174 break;
2175 case 2:
2176 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2177 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2178 break;
2179 case 3:
2180 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2181 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
2182 chrono::nanoseconds(2322999462))
2183 << " on " << file;
2184 break;
2185 default:
2186 FAIL();
2187 break;
2188 }
2189 } else {
2190 switch (log_header->message().parts_index()) {
2191 case 0:
2192 case 1:
2193 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2194 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2195 break;
2196 case 2:
2197 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2198 ASSERT_EQ(monotonic_start_time,
2199 monotonic_clock::epoch() + chrono::seconds(1));
2200 break;
2201 case 3:
2202 case 4:
2203 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2204 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2205 break;
2206 case 5:
2207 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2208 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
2209 chrono::nanoseconds(2322999462))
2210 << " on " << file;
2211 break;
2212 default:
2213 FAIL();
2214 break;
2215 }
2216 }
2217 continue;
2218 }
2219 SCOPED_TRACE(file);
2220 SCOPED_TRACE(aos::FlatbufferToJson(
2221 *log_header, {.multi_line = true, .max_vector_size = 100}));
2222 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2223 ASSERT_EQ(
2224 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2225 EXPECT_EQ(
2226 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2227 monotonic_clock::max_time.time_since_epoch().count());
2228 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2229 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2230 2u);
2231 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2232 monotonic_clock::max_time.time_since_epoch().count());
2233 ASSERT_TRUE(log_header->message()
2234 .has_oldest_remote_unreliable_monotonic_timestamps());
2235 ASSERT_EQ(log_header->message()
2236 .oldest_remote_unreliable_monotonic_timestamps()
2237 ->size(),
2238 2u);
2239 EXPECT_EQ(log_header->message()
2240 .oldest_remote_unreliable_monotonic_timestamps()
2241 ->Get(0),
2242 monotonic_clock::max_time.time_since_epoch().count());
2243 ASSERT_TRUE(log_header->message()
2244 .has_oldest_local_unreliable_monotonic_timestamps());
2245 ASSERT_EQ(log_header->message()
2246 .oldest_local_unreliable_monotonic_timestamps()
2247 ->size(),
2248 2u);
2249 EXPECT_EQ(log_header->message()
2250 .oldest_local_unreliable_monotonic_timestamps()
2251 ->Get(0),
2252 monotonic_clock::max_time.time_since_epoch().count());
2253
2254 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2255 monotonic_clock::time_point(chrono::nanoseconds(
2256 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2257 1)));
2258 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2259 monotonic_clock::time_point(chrono::nanoseconds(
2260 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2261 const monotonic_clock::time_point
2262 oldest_remote_unreliable_monotonic_timestamps =
2263 monotonic_clock::time_point(chrono::nanoseconds(
2264 log_header->message()
2265 .oldest_remote_unreliable_monotonic_timestamps()
2266 ->Get(1)));
2267 const monotonic_clock::time_point
2268 oldest_local_unreliable_monotonic_timestamps =
2269 monotonic_clock::time_point(chrono::nanoseconds(
2270 log_header->message()
2271 .oldest_local_unreliable_monotonic_timestamps()
2272 ->Get(1)));
2273 const monotonic_clock::time_point
2274 oldest_remote_reliable_monotonic_timestamps =
2275 monotonic_clock::time_point(chrono::nanoseconds(
2276 log_header->message()
2277 .oldest_remote_reliable_monotonic_timestamps()
2278 ->Get(1)));
2279 const monotonic_clock::time_point
2280 oldest_local_reliable_monotonic_timestamps =
2281 monotonic_clock::time_point(chrono::nanoseconds(
2282 log_header->message()
2283 .oldest_local_reliable_monotonic_timestamps()
2284 ->Get(1)));
2285 const monotonic_clock::time_point
2286 oldest_logger_remote_unreliable_monotonic_timestamps =
2287 monotonic_clock::time_point(chrono::nanoseconds(
2288 log_header->message()
2289 .oldest_logger_remote_unreliable_monotonic_timestamps()
2290 ->Get(0)));
2291 const monotonic_clock::time_point
2292 oldest_logger_local_unreliable_monotonic_timestamps =
2293 monotonic_clock::time_point(chrono::nanoseconds(
2294 log_header->message()
2295 .oldest_logger_local_unreliable_monotonic_timestamps()
2296 ->Get(0)));
2297 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2298 monotonic_clock::max_time);
2299 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2300 monotonic_clock::max_time);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002301 if (log_header->message().data_stored()->Get(0) == StoredDataType::DATA) {
2302 switch (log_header->message().parts_index()) {
2303 case 0:
2304 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2305 monotonic_clock::max_time);
2306 EXPECT_EQ(oldest_local_monotonic_timestamps,
2307 monotonic_clock::max_time);
2308 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2309 monotonic_clock::max_time);
2310 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2311 monotonic_clock::max_time);
2312 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2313 monotonic_clock::max_time);
2314 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2315 monotonic_clock::max_time);
2316 break;
2317 default:
2318 FAIL();
2319 break;
2320 }
2321 } else if (log_header->message().data_stored()->Get(0) ==
2322 StoredDataType::TIMESTAMPS) {
2323 switch (log_header->message().parts_index()) {
2324 case 0:
2325 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2326 monotonic_clock::time_point(chrono::microseconds(90200)));
2327 EXPECT_EQ(oldest_local_monotonic_timestamps,
2328 monotonic_clock::time_point(chrono::microseconds(90350)));
2329 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2330 monotonic_clock::time_point(chrono::microseconds(90200)));
2331 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2332 monotonic_clock::time_point(chrono::microseconds(90350)));
2333 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2334 monotonic_clock::max_time);
2335 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2336 monotonic_clock::max_time);
2337 break;
2338 case 1:
2339 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2340 monotonic_clock::time_point(chrono::microseconds(90200)))
2341 << file;
2342 EXPECT_EQ(oldest_local_monotonic_timestamps,
2343 monotonic_clock::time_point(chrono::microseconds(90350)))
2344 << file;
2345 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2346 monotonic_clock::time_point(chrono::microseconds(90200)))
2347 << file;
2348 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2349 monotonic_clock::time_point(chrono::microseconds(90350)))
2350 << file;
2351 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2352 monotonic_clock::time_point(chrono::microseconds(100000)))
2353 << file;
2354 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2355 monotonic_clock::time_point(chrono::microseconds(100150)))
2356 << file;
2357 break;
2358 case 2:
2359 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2360 monotonic_clock::time_point(chrono::milliseconds(1323) +
2361 chrono::microseconds(200)));
2362 EXPECT_EQ(
2363 oldest_local_monotonic_timestamps,
2364 monotonic_clock::time_point(chrono::microseconds(10100350)));
2365 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2366 monotonic_clock::time_point(chrono::milliseconds(1323) +
2367 chrono::microseconds(200)));
2368 EXPECT_EQ(
2369 oldest_local_unreliable_monotonic_timestamps,
2370 monotonic_clock::time_point(chrono::microseconds(10100350)));
2371 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2372 monotonic_clock::max_time)
2373 << file;
2374 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2375 monotonic_clock::max_time)
2376 << file;
2377 break;
2378 case 3:
2379 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2380 monotonic_clock::time_point(chrono::milliseconds(1323) +
2381 chrono::microseconds(200)));
2382 EXPECT_EQ(
2383 oldest_local_monotonic_timestamps,
2384 monotonic_clock::time_point(chrono::microseconds(10100350)));
2385 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2386 monotonic_clock::time_point(chrono::milliseconds(1323) +
2387 chrono::microseconds(200)));
2388 EXPECT_EQ(
2389 oldest_local_unreliable_monotonic_timestamps,
2390 monotonic_clock::time_point(chrono::microseconds(10100350)));
2391 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2392 monotonic_clock::time_point(chrono::microseconds(1423000)))
2393 << file;
2394 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2395 monotonic_clock::time_point(chrono::microseconds(10200150)))
2396 << file;
2397 break;
2398 default:
2399 FAIL();
2400 break;
2401 }
Naman Guptaa63aa132023-03-22 20:06:34 -07002402 }
2403 }
2404
2405 // Confirm that we refuse to replay logs with missing boot uuids.
2406 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07002407 auto sorted_parts = SortParts(pi1_reboot_logfiles_);
2408 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
2409 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07002410
2411 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2412 log_reader_factory.set_send_delay(chrono::microseconds(0));
2413
2414 // This sends out the fetched messages and advances time to the start of
2415 // the log file.
2416 reader.Register(&log_reader_factory);
2417
2418 log_reader_factory.Run();
2419
2420 reader.Deregister();
2421 }
2422}
2423
2424// Tests that we can sort a log which only has timestamps from the remote
2425// because the local message_bridge_client failed to connect.
2426TEST_P(MultinodeLoggerTest, RemoteRebootOnlyTimestamps) {
2427 const UUID pi1_boot0 = UUID::Random();
2428 const UUID pi2_boot0 = UUID::Random();
2429 const UUID pi2_boot1 = UUID::Random();
2430 {
2431 CHECK_EQ(pi1_index_, 0u);
2432 CHECK_EQ(pi2_index_, 1u);
2433
2434 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2435 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2436 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2437
2438 time_converter_.AddNextTimestamp(
2439 distributed_clock::epoch(),
2440 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2441 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2442 time_converter_.AddNextTimestamp(
2443 distributed_clock::epoch() + reboot_time,
2444 {BootTimestamp::epoch() + reboot_time,
2445 BootTimestamp{
2446 .boot = 1,
2447 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2448 }
2449 pi2_->Disconnect(pi1_->node());
2450
2451 std::vector<std::string> filenames;
2452 {
2453 LoggerState pi1_logger = MakeLogger(pi1_);
2454
2455 event_loop_factory_.RunFor(chrono::milliseconds(95));
2456 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2457 pi1_boot0);
2458 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2459 pi2_boot0);
2460
2461 StartLogger(&pi1_logger);
2462
2463 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2464
2465 VLOG(1) << "Reboot now!";
2466
2467 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2468 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2469 pi1_boot0);
2470 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2471 pi2_boot1);
2472 pi1_logger.AppendAllFilenames(&filenames);
2473 }
2474
2475 std::sort(filenames.begin(), filenames.end());
2476
2477 // Confirm that our new oldest timestamps properly update as we reboot and
2478 // rotate.
2479 size_t timestamp_file_count = 0;
2480 for (const std::string &file : filenames) {
2481 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2482 ReadHeader(file);
2483 CHECK(log_header);
2484
2485 if (log_header->message().has_configuration()) {
2486 continue;
2487 }
2488
2489 const monotonic_clock::time_point monotonic_start_time =
2490 monotonic_clock::time_point(
2491 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2492 const UUID source_node_boot_uuid = UUID::FromString(
2493 log_header->message().source_node_boot_uuid()->string_view());
2494
2495 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2496 ASSERT_EQ(
2497 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2498 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2499 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2500 2u);
2501 ASSERT_TRUE(log_header->message()
2502 .has_oldest_remote_unreliable_monotonic_timestamps());
2503 ASSERT_EQ(log_header->message()
2504 .oldest_remote_unreliable_monotonic_timestamps()
2505 ->size(),
2506 2u);
2507 ASSERT_TRUE(log_header->message()
2508 .has_oldest_local_unreliable_monotonic_timestamps());
2509 ASSERT_EQ(log_header->message()
2510 .oldest_local_unreliable_monotonic_timestamps()
2511 ->size(),
2512 2u);
2513 ASSERT_TRUE(log_header->message()
2514 .has_oldest_remote_reliable_monotonic_timestamps());
2515 ASSERT_EQ(log_header->message()
2516 .oldest_remote_reliable_monotonic_timestamps()
2517 ->size(),
2518 2u);
2519 ASSERT_TRUE(
2520 log_header->message().has_oldest_local_reliable_monotonic_timestamps());
2521 ASSERT_EQ(log_header->message()
2522 .oldest_local_reliable_monotonic_timestamps()
2523 ->size(),
2524 2u);
2525
2526 ASSERT_TRUE(
2527 log_header->message()
2528 .has_oldest_logger_remote_unreliable_monotonic_timestamps());
2529 ASSERT_EQ(log_header->message()
2530 .oldest_logger_remote_unreliable_monotonic_timestamps()
2531 ->size(),
2532 2u);
2533 ASSERT_TRUE(log_header->message()
2534 .has_oldest_logger_local_unreliable_monotonic_timestamps());
2535 ASSERT_EQ(log_header->message()
2536 .oldest_logger_local_unreliable_monotonic_timestamps()
2537 ->size(),
2538 2u);
2539
2540 if (log_header->message().node()->name()->string_view() != "pi1") {
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002541 ASSERT_TRUE(file.find("timestamps/remote_pi2") != std::string::npos);
Naman Guptaa63aa132023-03-22 20:06:34 -07002542
2543 const std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
2544 ReadNthMessage(file, 0);
2545 CHECK(msg);
2546
2547 EXPECT_TRUE(msg->message().has_monotonic_sent_time());
2548 EXPECT_TRUE(msg->message().has_monotonic_remote_time());
2549
2550 const monotonic_clock::time_point
2551 expected_oldest_local_monotonic_timestamps(
2552 chrono::nanoseconds(msg->message().monotonic_sent_time()));
2553 const monotonic_clock::time_point
2554 expected_oldest_remote_monotonic_timestamps(
2555 chrono::nanoseconds(msg->message().monotonic_remote_time()));
2556 const monotonic_clock::time_point
2557 expected_oldest_timestamp_monotonic_timestamps(
2558 chrono::nanoseconds(msg->message().monotonic_timestamp_time()));
2559
2560 EXPECT_NE(expected_oldest_local_monotonic_timestamps,
2561 monotonic_clock::min_time);
2562 EXPECT_NE(expected_oldest_remote_monotonic_timestamps,
2563 monotonic_clock::min_time);
2564 EXPECT_NE(expected_oldest_timestamp_monotonic_timestamps,
2565 monotonic_clock::min_time);
2566
2567 ++timestamp_file_count;
2568 // Since the log file is from the perspective of the other node,
2569 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2570 monotonic_clock::time_point(chrono::nanoseconds(
2571 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2572 0)));
2573 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2574 monotonic_clock::time_point(chrono::nanoseconds(
2575 log_header->message().oldest_local_monotonic_timestamps()->Get(
2576 0)));
2577 const monotonic_clock::time_point
2578 oldest_remote_unreliable_monotonic_timestamps =
2579 monotonic_clock::time_point(chrono::nanoseconds(
2580 log_header->message()
2581 .oldest_remote_unreliable_monotonic_timestamps()
2582 ->Get(0)));
2583 const monotonic_clock::time_point
2584 oldest_local_unreliable_monotonic_timestamps =
2585 monotonic_clock::time_point(chrono::nanoseconds(
2586 log_header->message()
2587 .oldest_local_unreliable_monotonic_timestamps()
2588 ->Get(0)));
2589 const monotonic_clock::time_point
2590 oldest_remote_reliable_monotonic_timestamps =
2591 monotonic_clock::time_point(chrono::nanoseconds(
2592 log_header->message()
2593 .oldest_remote_reliable_monotonic_timestamps()
2594 ->Get(0)));
2595 const monotonic_clock::time_point
2596 oldest_local_reliable_monotonic_timestamps =
2597 monotonic_clock::time_point(chrono::nanoseconds(
2598 log_header->message()
2599 .oldest_local_reliable_monotonic_timestamps()
2600 ->Get(0)));
2601 const monotonic_clock::time_point
2602 oldest_logger_remote_unreliable_monotonic_timestamps =
2603 monotonic_clock::time_point(chrono::nanoseconds(
2604 log_header->message()
2605 .oldest_logger_remote_unreliable_monotonic_timestamps()
2606 ->Get(1)));
2607 const monotonic_clock::time_point
2608 oldest_logger_local_unreliable_monotonic_timestamps =
2609 monotonic_clock::time_point(chrono::nanoseconds(
2610 log_header->message()
2611 .oldest_logger_local_unreliable_monotonic_timestamps()
2612 ->Get(1)));
2613
2614 const Channel *channel =
2615 event_loop_factory_.configuration()->channels()->Get(
2616 msg->message().channel_index());
2617 const Connection *connection = configuration::ConnectionToNode(
2618 channel, configuration::GetNode(
2619 event_loop_factory_.configuration(),
2620 log_header->message().node()->name()->string_view()));
2621
2622 const bool reliable = connection->time_to_live() == 0;
2623
2624 SCOPED_TRACE(file);
2625 SCOPED_TRACE(aos::FlatbufferToJson(
2626 *log_header, {.multi_line = true, .max_vector_size = 100}));
2627
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002628 // Confirm that the oldest timestamps match what we expect. Based on
2629 // what we are doing, we know that the oldest time is the first
2630 // message's time.
2631 //
2632 // This makes the test robust to both the split and combined config
2633 // tests.
2634 switch (log_header->message().parts_index()) {
2635 case 0:
2636 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2637 expected_oldest_remote_monotonic_timestamps);
2638 EXPECT_EQ(oldest_local_monotonic_timestamps,
2639 expected_oldest_local_monotonic_timestamps);
2640 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2641 expected_oldest_local_monotonic_timestamps)
2642 << file;
2643 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2644 expected_oldest_timestamp_monotonic_timestamps)
2645 << file;
2646
2647 if (reliable) {
2648 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
Naman Guptaa63aa132023-03-22 20:06:34 -07002649 expected_oldest_remote_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002650 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
Naman Guptaa63aa132023-03-22 20:06:34 -07002651 expected_oldest_local_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002652 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2653 monotonic_clock::max_time);
2654 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2655 monotonic_clock::max_time);
2656 } else {
2657 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2658 monotonic_clock::max_time);
2659 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2660 monotonic_clock::max_time);
Naman Guptaa63aa132023-03-22 20:06:34 -07002661 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2662 expected_oldest_remote_monotonic_timestamps);
2663 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2664 expected_oldest_local_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002665 }
2666 break;
2667 case 1:
2668 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2669 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2670 EXPECT_EQ(oldest_local_monotonic_timestamps,
2671 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2672 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2673 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2674 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2675 monotonic_clock::epoch() + chrono::nanoseconds(90250000));
2676 if (reliable) {
2677 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2678 expected_oldest_remote_monotonic_timestamps);
2679 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2680 expected_oldest_local_monotonic_timestamps);
2681 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2682 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2683 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2684 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2685 } else {
2686 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2687 monotonic_clock::max_time);
2688 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2689 monotonic_clock::max_time);
2690 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2691 expected_oldest_remote_monotonic_timestamps);
2692 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2693 expected_oldest_local_monotonic_timestamps);
2694 }
2695 break;
2696 case 2:
2697 EXPECT_EQ(
2698 oldest_remote_monotonic_timestamps,
2699 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2700 EXPECT_EQ(oldest_local_monotonic_timestamps,
2701 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2702 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2703 expected_oldest_local_monotonic_timestamps)
2704 << file;
2705 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2706 expected_oldest_timestamp_monotonic_timestamps)
2707 << file;
2708 if (reliable) {
2709 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2710 expected_oldest_remote_monotonic_timestamps);
2711 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2712 expected_oldest_local_monotonic_timestamps);
2713 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2714 monotonic_clock::max_time);
2715 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2716 monotonic_clock::max_time);
2717 } else {
2718 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2719 monotonic_clock::max_time);
2720 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2721 monotonic_clock::max_time);
2722 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2723 expected_oldest_remote_monotonic_timestamps);
2724 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2725 expected_oldest_local_monotonic_timestamps);
2726 }
2727 break;
Naman Guptaa63aa132023-03-22 20:06:34 -07002728
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002729 case 3:
2730 EXPECT_EQ(
2731 oldest_remote_monotonic_timestamps,
2732 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2733 EXPECT_EQ(oldest_local_monotonic_timestamps,
2734 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2735 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2736 expected_oldest_remote_monotonic_timestamps);
2737 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2738 expected_oldest_local_monotonic_timestamps);
2739 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2740 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2741 EXPECT_EQ(
2742 oldest_logger_local_unreliable_monotonic_timestamps,
2743 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2744 break;
2745 default:
2746 FAIL();
2747 break;
Naman Guptaa63aa132023-03-22 20:06:34 -07002748 }
2749
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002750 switch (log_header->message().parts_index()) {
2751 case 0:
2752 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2753 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2754 break;
2755 case 1:
2756 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2757 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2758 break;
2759 case 2:
2760 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2761 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2762 break;
2763 case 3:
2764 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2765 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2766 break;
2767 [[fallthrough]];
2768 default:
2769 FAIL();
2770 break;
2771 }
Naman Guptaa63aa132023-03-22 20:06:34 -07002772 continue;
2773 }
2774 EXPECT_EQ(
2775 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2776 monotonic_clock::max_time.time_since_epoch().count());
2777 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2778 monotonic_clock::max_time.time_since_epoch().count());
2779 EXPECT_EQ(log_header->message()
2780 .oldest_remote_unreliable_monotonic_timestamps()
2781 ->Get(0),
2782 monotonic_clock::max_time.time_since_epoch().count());
2783 EXPECT_EQ(log_header->message()
2784 .oldest_local_unreliable_monotonic_timestamps()
2785 ->Get(0),
2786 monotonic_clock::max_time.time_since_epoch().count());
2787
2788 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2789 monotonic_clock::time_point(chrono::nanoseconds(
2790 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2791 1)));
2792 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2793 monotonic_clock::time_point(chrono::nanoseconds(
2794 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2795 const monotonic_clock::time_point
2796 oldest_remote_unreliable_monotonic_timestamps =
2797 monotonic_clock::time_point(chrono::nanoseconds(
2798 log_header->message()
2799 .oldest_remote_unreliable_monotonic_timestamps()
2800 ->Get(1)));
2801 const monotonic_clock::time_point
2802 oldest_local_unreliable_monotonic_timestamps =
2803 monotonic_clock::time_point(chrono::nanoseconds(
2804 log_header->message()
2805 .oldest_local_unreliable_monotonic_timestamps()
2806 ->Get(1)));
2807 switch (log_header->message().parts_index()) {
2808 case 0:
2809 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2810 monotonic_clock::max_time);
2811 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2812 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2813 monotonic_clock::max_time);
2814 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2815 monotonic_clock::max_time);
2816 break;
2817 default:
2818 FAIL();
2819 break;
2820 }
2821 }
2822
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002823 EXPECT_EQ(timestamp_file_count, 4u);
Naman Guptaa63aa132023-03-22 20:06:34 -07002824
2825 // Confirm that we can actually sort the resulting log and read it.
2826 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07002827 auto sorted_parts = SortParts(filenames);
2828 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
2829 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07002830
2831 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2832 log_reader_factory.set_send_delay(chrono::microseconds(0));
2833
2834 // This sends out the fetched messages and advances time to the start of
2835 // the log file.
2836 reader.Register(&log_reader_factory);
2837
2838 log_reader_factory.Run();
2839
2840 reader.Deregister();
2841 }
2842}
2843
2844// Tests that we properly handle one direction of message_bridge being
2845// unavailable.
2846TEST_P(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07002847 std::vector<std::string> actual_filenames;
2848
Naman Guptaa63aa132023-03-22 20:06:34 -07002849 pi1_->Disconnect(pi2_->node());
2850 time_converter_.AddMonotonic(
2851 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2852
2853 time_converter_.AddMonotonic(
2854 {chrono::milliseconds(10000),
2855 chrono::milliseconds(10000) - chrono::milliseconds(1)});
2856 {
2857 LoggerState pi1_logger = MakeLogger(pi1_);
2858
2859 event_loop_factory_.RunFor(chrono::milliseconds(95));
2860
2861 StartLogger(&pi1_logger);
2862
2863 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002864 pi1_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07002865 }
2866
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002867 // Confirm that we can parse the result. LogReader has enough internal
2868 // CHECKs to confirm the right thing happened.
Austin Schuh8fb4b452023-08-04 17:02:27 -07002869 ConfirmReadable(actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07002870}
2871
2872// Tests that we properly handle one direction of message_bridge being
2873// unavailable.
2874TEST_P(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
2875 pi1_->Disconnect(pi2_->node());
2876 time_converter_.AddMonotonic(
2877 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(500)});
2878
2879 time_converter_.AddMonotonic(
2880 {chrono::milliseconds(10000),
2881 chrono::milliseconds(10000) + chrono::milliseconds(1)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07002882
2883 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -07002884 {
2885 LoggerState pi1_logger = MakeLogger(pi1_);
2886
2887 event_loop_factory_.RunFor(chrono::milliseconds(95));
2888
2889 StartLogger(&pi1_logger);
2890
2891 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002892 pi1_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07002893 }
2894
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002895 // Confirm that we can parse the result. LogReader has enough internal
2896 // CHECKs to confirm the right thing happened.
Austin Schuh8fb4b452023-08-04 17:02:27 -07002897 ConfirmReadable(filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07002898}
2899
2900// Tests that we explode if someone passes in a part file twice with a better
2901// error than an out of order error.
2902TEST_P(MultinodeLoggerTest, DuplicateLogFiles) {
2903 time_converter_.AddMonotonic(
2904 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07002905
2906 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -07002907 {
2908 LoggerState pi1_logger = MakeLogger(pi1_);
2909
2910 event_loop_factory_.RunFor(chrono::milliseconds(95));
2911
2912 StartLogger(&pi1_logger);
2913
2914 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002915
2916 pi1_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07002917 }
2918
2919 std::vector<std::string> duplicates;
Austin Schuh8fb4b452023-08-04 17:02:27 -07002920 for (const std::string &f : filenames) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002921 duplicates.emplace_back(f);
2922 duplicates.emplace_back(f);
2923 }
2924 EXPECT_DEATH({ SortParts(duplicates); }, "Found duplicate parts in");
2925}
2926
2927// Tests that we explode if someone loses a part out of the middle of a log.
2928TEST_P(MultinodeLoggerTest, MissingPartsFromMiddle) {
2929 time_converter_.AddMonotonic(
2930 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2931 {
2932 LoggerState pi1_logger = MakeLogger(pi1_);
2933
2934 event_loop_factory_.RunFor(chrono::milliseconds(95));
2935
2936 StartLogger(&pi1_logger);
2937 aos::monotonic_clock::time_point last_rotation_time =
2938 pi1_logger.event_loop->monotonic_now();
Austin Schuh2f864452023-07-17 14:53:08 -07002939 pi1_logger.logger->set_on_logged_period(
2940 [&](aos::monotonic_clock::time_point) {
2941 const auto now = pi1_logger.event_loop->monotonic_now();
2942 if (now > last_rotation_time + std::chrono::seconds(5)) {
2943 pi1_logger.logger->Rotate();
2944 last_rotation_time = now;
2945 }
2946 });
Naman Guptaa63aa132023-03-22 20:06:34 -07002947
2948 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2949 }
2950
2951 std::vector<std::string> missing_parts;
2952
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002953 missing_parts.emplace_back(logfile_base1_ + "_pi1_timestamps.part0" +
2954 Extension());
2955 missing_parts.emplace_back(logfile_base1_ + "_pi1_timestamps.part2" +
2956 Extension());
Naman Guptaa63aa132023-03-22 20:06:34 -07002957 missing_parts.emplace_back(absl::StrCat(
2958 logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
2959
2960 EXPECT_DEATH({ SortParts(missing_parts); },
2961 "Broken log, missing part files between");
2962}
2963
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002964// Tests that we properly handle a dead node. Do this by just disconnecting
2965// it and only using one nodes of logs.
Naman Guptaa63aa132023-03-22 20:06:34 -07002966TEST_P(MultinodeLoggerTest, DeadNode) {
2967 pi1_->Disconnect(pi2_->node());
2968 pi2_->Disconnect(pi1_->node());
2969 time_converter_.AddMonotonic(
2970 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2971 {
2972 LoggerState pi1_logger = MakeLogger(pi1_);
2973
2974 event_loop_factory_.RunFor(chrono::milliseconds(95));
2975
2976 StartLogger(&pi1_logger);
2977
2978 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2979 }
2980
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002981 // Confirm that we can parse the result. LogReader has enough internal
2982 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07002983 ConfirmReadable(MakePi1DeadNodeLogfiles());
2984}
2985
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002986// Tests that we can relog with a different config. This makes most sense
2987// when you are trying to edit a log and want to use channel renaming + the
2988// original config in the new log.
Naman Guptaa63aa132023-03-22 20:06:34 -07002989TEST_P(MultinodeLoggerTest, LogDifferentConfig) {
2990 time_converter_.StartEqual();
2991 {
2992 LoggerState pi1_logger = MakeLogger(pi1_);
2993 LoggerState pi2_logger = MakeLogger(pi2_);
2994
2995 event_loop_factory_.RunFor(chrono::milliseconds(95));
2996
2997 StartLogger(&pi1_logger);
2998 StartLogger(&pi2_logger);
2999
3000 event_loop_factory_.RunFor(chrono::milliseconds(20000));
3001 }
3002
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003003 auto sorted_parts = SortParts(logfiles_);
3004 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3005 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07003006 reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
3007
3008 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
3009 log_reader_factory.set_send_delay(chrono::microseconds(0));
3010
3011 // This sends out the fetched messages and advances time to the start of the
3012 // log file.
3013 reader.Register(&log_reader_factory);
3014
3015 const Node *pi1 =
3016 configuration::GetNode(log_reader_factory.configuration(), "pi1");
3017 const Node *pi2 =
3018 configuration::GetNode(log_reader_factory.configuration(), "pi2");
3019
3020 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
3021 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
3022 LOG(INFO) << "now pi1 "
3023 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
3024 LOG(INFO) << "now pi2 "
3025 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
3026
3027 EXPECT_THAT(reader.LoggedNodes(),
3028 ::testing::ElementsAre(
3029 configuration::GetNode(reader.logged_configuration(), pi1),
3030 configuration::GetNode(reader.logged_configuration(), pi2)));
3031
3032 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
3033
3034 // And confirm we can re-create a log again, while checking the contents.
3035 std::vector<std::string> log_files;
3036 {
3037 LoggerState pi1_logger =
3038 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
3039 &log_reader_factory, reader.logged_configuration());
3040 LoggerState pi2_logger =
3041 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
3042 &log_reader_factory, reader.logged_configuration());
3043
3044 pi1_logger.StartLogger(tmp_dir_ + "/relogged1");
3045 pi2_logger.StartLogger(tmp_dir_ + "/relogged2");
3046
3047 log_reader_factory.Run();
3048
3049 for (auto &x : pi1_logger.log_namer->all_filenames()) {
3050 log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged1_", x));
3051 }
3052 for (auto &x : pi2_logger.log_namer->all_filenames()) {
3053 log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged2_", x));
3054 }
3055 }
3056
3057 reader.Deregister();
3058
3059 // And verify that we can run the LogReader over the relogged files without
3060 // hitting any fatal errors.
3061 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003062 auto sorted_parts = SortParts(log_files);
3063 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3064 LogReader relogged_reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07003065 relogged_reader.Register();
3066
3067 relogged_reader.event_loop_factory()->Run();
3068 }
3069}
3070
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003071// Tests that we properly replay a log where the start time for a node is
3072// before any data on the node. This can happen if the logger starts before
3073// data is published. While the scenario below is a bit convoluted, we have
3074// seen logs like this generated out in the wild.
Naman Guptaa63aa132023-03-22 20:06:34 -07003075TEST(MultinodeRebootLoggerTest, StartTimeBeforeData) {
3076 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3077 aos::configuration::ReadConfig(ArtifactPath(
3078 "aos/events/logging/multinode_pingpong_split3_config.json"));
3079 message_bridge::TestingTimeConverter time_converter(
3080 configuration::NodesCount(&config.message()));
3081 SimulatedEventLoopFactory event_loop_factory(&config.message());
3082 event_loop_factory.SetTimeConverter(&time_converter);
3083 NodeEventLoopFactory *const pi1 =
3084 event_loop_factory.GetNodeEventLoopFactory("pi1");
3085 const size_t pi1_index = configuration::GetNodeIndex(
3086 event_loop_factory.configuration(), pi1->node());
3087 NodeEventLoopFactory *const pi2 =
3088 event_loop_factory.GetNodeEventLoopFactory("pi2");
3089 const size_t pi2_index = configuration::GetNodeIndex(
3090 event_loop_factory.configuration(), pi2->node());
3091 NodeEventLoopFactory *const pi3 =
3092 event_loop_factory.GetNodeEventLoopFactory("pi3");
3093 const size_t pi3_index = configuration::GetNodeIndex(
3094 event_loop_factory.configuration(), pi3->node());
3095
3096 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003097 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003098 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003099 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003100 const std::string kLogfile2_2 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003101 aos::testing::TestTmpDir() + "/logs/multi_logfile2.2/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003102 const std::string kLogfile3_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003103 aos::testing::TestTmpDir() + "/logs/multi_logfile3/";
3104
Naman Guptaa63aa132023-03-22 20:06:34 -07003105 const UUID pi1_boot0 = UUID::Random();
3106 const UUID pi2_boot0 = UUID::Random();
3107 const UUID pi2_boot1 = UUID::Random();
3108 const UUID pi3_boot0 = UUID::Random();
3109 {
3110 CHECK_EQ(pi1_index, 0u);
3111 CHECK_EQ(pi2_index, 1u);
3112 CHECK_EQ(pi3_index, 2u);
3113
3114 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3115 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3116 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3117 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3118
3119 time_converter.AddNextTimestamp(
3120 distributed_clock::epoch(),
3121 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3122 BootTimestamp::epoch()});
3123 const chrono::nanoseconds reboot_time = chrono::milliseconds(20000);
3124 time_converter.AddNextTimestamp(
3125 distributed_clock::epoch() + reboot_time,
3126 {BootTimestamp::epoch() + reboot_time,
3127 BootTimestamp{
3128 .boot = 1,
3129 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)},
3130 BootTimestamp::epoch() + reboot_time});
3131 }
3132
3133 // Make everything perfectly quiet.
3134 event_loop_factory.SkipTimingReport();
3135 event_loop_factory.DisableStatistics();
3136
3137 std::vector<std::string> filenames;
3138 {
3139 LoggerState pi1_logger = MakeLoggerState(
3140 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3141 LoggerState pi3_logger = MakeLoggerState(
3142 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3143 {
3144 // And now start the logger.
3145 LoggerState pi2_logger = MakeLoggerState(
3146 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3147
3148 event_loop_factory.RunFor(chrono::milliseconds(1000));
3149
3150 pi1_logger.StartLogger(kLogfile1_1);
3151 pi3_logger.StartLogger(kLogfile3_1);
3152 pi2_logger.StartLogger(kLogfile2_1);
3153
3154 event_loop_factory.RunFor(chrono::milliseconds(10000));
3155
3156 // Now that we've got a start time in the past, turn on data.
3157 event_loop_factory.EnableStatistics();
3158 std::unique_ptr<aos::EventLoop> ping_event_loop =
3159 pi1->MakeEventLoop("ping");
3160 Ping ping(ping_event_loop.get());
3161
3162 pi2->AlwaysStart<Pong>("pong");
3163
3164 event_loop_factory.RunFor(chrono::milliseconds(3000));
3165
3166 pi2_logger.AppendAllFilenames(&filenames);
3167
3168 // Stop logging on pi2 before rebooting and completely shut off all
3169 // messages on pi2.
3170 pi2->DisableStatistics();
3171 pi1->Disconnect(pi2->node());
3172 pi2->Disconnect(pi1->node());
3173 }
3174 event_loop_factory.RunFor(chrono::milliseconds(7000));
3175 // pi2 now reboots.
3176 {
3177 event_loop_factory.RunFor(chrono::milliseconds(1000));
3178
3179 // Start logging again on pi2 after it is up.
3180 LoggerState pi2_logger = MakeLoggerState(
3181 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3182 pi2_logger.StartLogger(kLogfile2_2);
3183
3184 event_loop_factory.RunFor(chrono::milliseconds(10000));
3185 // And, now that we have a start time in the log, turn data back on.
3186 pi2->EnableStatistics();
3187 pi1->Connect(pi2->node());
3188 pi2->Connect(pi1->node());
3189
3190 pi2->AlwaysStart<Pong>("pong");
3191 std::unique_ptr<aos::EventLoop> ping_event_loop =
3192 pi1->MakeEventLoop("ping");
3193 Ping ping(ping_event_loop.get());
3194
3195 event_loop_factory.RunFor(chrono::milliseconds(3000));
3196
3197 pi2_logger.AppendAllFilenames(&filenames);
3198 }
3199
3200 pi1_logger.AppendAllFilenames(&filenames);
3201 pi3_logger.AppendAllFilenames(&filenames);
3202 }
3203
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003204 // Confirm that we can parse the result. LogReader has enough internal
3205 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07003206 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003207 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003208 auto result = ConfirmReadable(filenames);
3209 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
3210 chrono::seconds(1)));
3211 EXPECT_THAT(result[0].second,
3212 ::testing::ElementsAre(realtime_clock::epoch() +
3213 chrono::microseconds(34990350)));
3214
3215 EXPECT_THAT(result[1].first,
3216 ::testing::ElementsAre(
3217 realtime_clock::epoch() + chrono::seconds(1),
3218 realtime_clock::epoch() + chrono::microseconds(3323000)));
3219 EXPECT_THAT(result[1].second,
3220 ::testing::ElementsAre(
3221 realtime_clock::epoch() + chrono::microseconds(13990200),
3222 realtime_clock::epoch() + chrono::microseconds(16313200)));
3223
3224 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch() +
3225 chrono::seconds(1)));
3226 EXPECT_THAT(result[2].second,
3227 ::testing::ElementsAre(realtime_clock::epoch() +
3228 chrono::microseconds(34900150)));
3229}
3230
3231// Tests that local data before remote data after reboot is properly replayed.
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003232// We only trigger a reboot in the timestamp interpolation function when
3233// solving the timestamp problem when we actually have a point in the
3234// function. This originally only happened when a point passes the noncausal
3235// filter. At the start of time for the second boot, if we aren't careful, we
3236// will have messages which need to be published at times before the boot.
3237// This happens when a local message is in the log before a forwarded message,
3238// so there is no point in the interpolation function. This delays the
3239// reboot. So, we need to recreate that situation and make sure it doesn't
3240// come back.
Naman Guptaa63aa132023-03-22 20:06:34 -07003241TEST(MultinodeRebootLoggerTest,
3242 LocalMessageBeforeRemoteBeforeStartAfterReboot) {
3243 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3244 aos::configuration::ReadConfig(ArtifactPath(
3245 "aos/events/logging/multinode_pingpong_split3_config.json"));
3246 message_bridge::TestingTimeConverter time_converter(
3247 configuration::NodesCount(&config.message()));
3248 SimulatedEventLoopFactory event_loop_factory(&config.message());
3249 event_loop_factory.SetTimeConverter(&time_converter);
3250 NodeEventLoopFactory *const pi1 =
3251 event_loop_factory.GetNodeEventLoopFactory("pi1");
3252 const size_t pi1_index = configuration::GetNodeIndex(
3253 event_loop_factory.configuration(), pi1->node());
3254 NodeEventLoopFactory *const pi2 =
3255 event_loop_factory.GetNodeEventLoopFactory("pi2");
3256 const size_t pi2_index = configuration::GetNodeIndex(
3257 event_loop_factory.configuration(), pi2->node());
3258 NodeEventLoopFactory *const pi3 =
3259 event_loop_factory.GetNodeEventLoopFactory("pi3");
3260 const size_t pi3_index = configuration::GetNodeIndex(
3261 event_loop_factory.configuration(), pi3->node());
3262
3263 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003264 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003265 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003266 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003267 const std::string kLogfile2_2 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003268 aos::testing::TestTmpDir() + "/logs/multi_logfile2.2/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003269 const std::string kLogfile3_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003270 aos::testing::TestTmpDir() + "/logs/multi_logfile3/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003271 const UUID pi1_boot0 = UUID::Random();
3272 const UUID pi2_boot0 = UUID::Random();
3273 const UUID pi2_boot1 = UUID::Random();
3274 const UUID pi3_boot0 = UUID::Random();
3275 {
3276 CHECK_EQ(pi1_index, 0u);
3277 CHECK_EQ(pi2_index, 1u);
3278 CHECK_EQ(pi3_index, 2u);
3279
3280 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3281 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3282 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3283 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3284
3285 time_converter.AddNextTimestamp(
3286 distributed_clock::epoch(),
3287 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3288 BootTimestamp::epoch()});
3289 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3290 time_converter.AddNextTimestamp(
3291 distributed_clock::epoch() + reboot_time,
3292 {BootTimestamp::epoch() + reboot_time,
3293 BootTimestamp{.boot = 1,
3294 .time = monotonic_clock::epoch() + reboot_time +
3295 chrono::seconds(100)},
3296 BootTimestamp::epoch() + reboot_time});
3297 }
3298
3299 std::vector<std::string> filenames;
3300 {
3301 LoggerState pi1_logger = MakeLoggerState(
3302 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3303 LoggerState pi3_logger = MakeLoggerState(
3304 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3305 {
3306 // And now start the logger.
3307 LoggerState pi2_logger = MakeLoggerState(
3308 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3309
3310 pi1_logger.StartLogger(kLogfile1_1);
3311 pi3_logger.StartLogger(kLogfile3_1);
3312 pi2_logger.StartLogger(kLogfile2_1);
3313
3314 event_loop_factory.RunFor(chrono::milliseconds(1005));
3315
3316 // Now that we've got a start time in the past, turn on data.
3317 std::unique_ptr<aos::EventLoop> ping_event_loop =
3318 pi1->MakeEventLoop("ping");
3319 Ping ping(ping_event_loop.get());
3320
3321 pi2->AlwaysStart<Pong>("pong");
3322
3323 event_loop_factory.RunFor(chrono::milliseconds(3000));
3324
3325 pi2_logger.AppendAllFilenames(&filenames);
3326
3327 // Disable any remote messages on pi2.
3328 pi1->Disconnect(pi2->node());
3329 pi2->Disconnect(pi1->node());
3330 }
3331 event_loop_factory.RunFor(chrono::milliseconds(995));
3332 // pi2 now reboots at 5 seconds.
3333 {
3334 event_loop_factory.RunFor(chrono::milliseconds(1000));
3335
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003336 // Make local stuff happen before we start logging and connect the
3337 // remote.
Naman Guptaa63aa132023-03-22 20:06:34 -07003338 pi2->AlwaysStart<Pong>("pong");
3339 std::unique_ptr<aos::EventLoop> ping_event_loop =
3340 pi1->MakeEventLoop("ping");
3341 Ping ping(ping_event_loop.get());
3342 event_loop_factory.RunFor(chrono::milliseconds(1005));
3343
3344 // Start logging again on pi2 after it is up.
3345 LoggerState pi2_logger = MakeLoggerState(
3346 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3347 pi2_logger.StartLogger(kLogfile2_2);
3348
3349 // And allow remote messages now that we have some local ones.
3350 pi1->Connect(pi2->node());
3351 pi2->Connect(pi1->node());
3352
3353 event_loop_factory.RunFor(chrono::milliseconds(1000));
3354
3355 event_loop_factory.RunFor(chrono::milliseconds(3000));
3356
3357 pi2_logger.AppendAllFilenames(&filenames);
3358 }
3359
3360 pi1_logger.AppendAllFilenames(&filenames);
3361 pi3_logger.AppendAllFilenames(&filenames);
3362 }
3363
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003364 // Confirm that we can parse the result. LogReader has enough internal
3365 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07003366 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003367 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003368 auto result = ConfirmReadable(filenames);
3369
3370 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3371 EXPECT_THAT(result[0].second,
3372 ::testing::ElementsAre(realtime_clock::epoch() +
3373 chrono::microseconds(11000350)));
3374
3375 EXPECT_THAT(result[1].first,
3376 ::testing::ElementsAre(
3377 realtime_clock::epoch(),
3378 realtime_clock::epoch() + chrono::microseconds(107005000)));
3379 EXPECT_THAT(result[1].second,
3380 ::testing::ElementsAre(
3381 realtime_clock::epoch() + chrono::microseconds(4000150),
3382 realtime_clock::epoch() + chrono::microseconds(111000200)));
3383
3384 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3385 EXPECT_THAT(result[2].second,
3386 ::testing::ElementsAre(realtime_clock::epoch() +
3387 chrono::microseconds(11000150)));
3388
3389 auto start_stop_result = ConfirmReadable(
3390 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3391 realtime_clock::epoch() + chrono::milliseconds(3000));
3392
3393 EXPECT_THAT(
3394 start_stop_result[0].first,
3395 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3396 EXPECT_THAT(
3397 start_stop_result[0].second,
3398 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3399 EXPECT_THAT(
3400 start_stop_result[1].first,
3401 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3402 EXPECT_THAT(
3403 start_stop_result[1].second,
3404 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3405 EXPECT_THAT(
3406 start_stop_result[2].first,
3407 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3408 EXPECT_THAT(
3409 start_stop_result[2].second,
3410 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3411}
3412
3413// Tests that setting the start and stop flags across a reboot works as
3414// expected.
3415TEST(MultinodeRebootLoggerTest, RebootStartStopTimes) {
3416 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3417 aos::configuration::ReadConfig(ArtifactPath(
3418 "aos/events/logging/multinode_pingpong_split3_config.json"));
3419 message_bridge::TestingTimeConverter time_converter(
3420 configuration::NodesCount(&config.message()));
3421 SimulatedEventLoopFactory event_loop_factory(&config.message());
3422 event_loop_factory.SetTimeConverter(&time_converter);
3423 NodeEventLoopFactory *const pi1 =
3424 event_loop_factory.GetNodeEventLoopFactory("pi1");
3425 const size_t pi1_index = configuration::GetNodeIndex(
3426 event_loop_factory.configuration(), pi1->node());
3427 NodeEventLoopFactory *const pi2 =
3428 event_loop_factory.GetNodeEventLoopFactory("pi2");
3429 const size_t pi2_index = configuration::GetNodeIndex(
3430 event_loop_factory.configuration(), pi2->node());
3431 NodeEventLoopFactory *const pi3 =
3432 event_loop_factory.GetNodeEventLoopFactory("pi3");
3433 const size_t pi3_index = configuration::GetNodeIndex(
3434 event_loop_factory.configuration(), pi3->node());
3435
3436 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003437 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003438 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003439 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003440 const std::string kLogfile2_2 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003441 aos::testing::TestTmpDir() + "/logs/multi_logfile2.2/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003442 const std::string kLogfile3_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003443 aos::testing::TestTmpDir() + "/logs/multi_logfile3/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003444 {
3445 CHECK_EQ(pi1_index, 0u);
3446 CHECK_EQ(pi2_index, 1u);
3447 CHECK_EQ(pi3_index, 2u);
3448
3449 time_converter.AddNextTimestamp(
3450 distributed_clock::epoch(),
3451 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3452 BootTimestamp::epoch()});
3453 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3454 time_converter.AddNextTimestamp(
3455 distributed_clock::epoch() + reboot_time,
3456 {BootTimestamp::epoch() + reboot_time,
3457 BootTimestamp{.boot = 1,
3458 .time = monotonic_clock::epoch() + reboot_time},
3459 BootTimestamp::epoch() + reboot_time});
3460 }
3461
3462 std::vector<std::string> filenames;
3463 {
3464 LoggerState pi1_logger = MakeLoggerState(
3465 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3466 LoggerState pi3_logger = MakeLoggerState(
3467 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3468 {
3469 // And now start the logger.
3470 LoggerState pi2_logger = MakeLoggerState(
3471 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3472
3473 pi1_logger.StartLogger(kLogfile1_1);
3474 pi3_logger.StartLogger(kLogfile3_1);
3475 pi2_logger.StartLogger(kLogfile2_1);
3476
3477 event_loop_factory.RunFor(chrono::milliseconds(1005));
3478
3479 // Now that we've got a start time in the past, turn on data.
3480 std::unique_ptr<aos::EventLoop> ping_event_loop =
3481 pi1->MakeEventLoop("ping");
3482 Ping ping(ping_event_loop.get());
3483
3484 pi2->AlwaysStart<Pong>("pong");
3485
3486 event_loop_factory.RunFor(chrono::milliseconds(3000));
3487
3488 pi2_logger.AppendAllFilenames(&filenames);
3489 }
3490 event_loop_factory.RunFor(chrono::milliseconds(995));
3491 // pi2 now reboots at 5 seconds.
3492 {
3493 event_loop_factory.RunFor(chrono::milliseconds(1000));
3494
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003495 // Make local stuff happen before we start logging and connect the
3496 // remote.
Naman Guptaa63aa132023-03-22 20:06:34 -07003497 pi2->AlwaysStart<Pong>("pong");
3498 std::unique_ptr<aos::EventLoop> ping_event_loop =
3499 pi1->MakeEventLoop("ping");
3500 Ping ping(ping_event_loop.get());
3501 event_loop_factory.RunFor(chrono::milliseconds(5));
3502
3503 // Start logging again on pi2 after it is up.
3504 LoggerState pi2_logger = MakeLoggerState(
3505 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3506 pi2_logger.StartLogger(kLogfile2_2);
3507
3508 event_loop_factory.RunFor(chrono::milliseconds(5000));
3509
3510 pi2_logger.AppendAllFilenames(&filenames);
3511 }
3512
3513 pi1_logger.AppendAllFilenames(&filenames);
3514 pi3_logger.AppendAllFilenames(&filenames);
3515 }
3516
3517 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003518 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003519 auto result = ConfirmReadable(filenames);
3520
3521 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3522 EXPECT_THAT(result[0].second,
3523 ::testing::ElementsAre(realtime_clock::epoch() +
3524 chrono::microseconds(11000350)));
3525
3526 EXPECT_THAT(result[1].first,
3527 ::testing::ElementsAre(
3528 realtime_clock::epoch(),
3529 realtime_clock::epoch() + chrono::microseconds(6005000)));
3530 EXPECT_THAT(result[1].second,
3531 ::testing::ElementsAre(
3532 realtime_clock::epoch() + chrono::microseconds(4900150),
3533 realtime_clock::epoch() + chrono::microseconds(11000200)));
3534
3535 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3536 EXPECT_THAT(result[2].second,
3537 ::testing::ElementsAre(realtime_clock::epoch() +
3538 chrono::microseconds(11000150)));
3539
3540 // Confirm we observed the correct start and stop times. We should see the
3541 // reboot here.
3542 auto start_stop_result = ConfirmReadable(
3543 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3544 realtime_clock::epoch() + chrono::milliseconds(8000));
3545
3546 EXPECT_THAT(
3547 start_stop_result[0].first,
3548 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3549 EXPECT_THAT(
3550 start_stop_result[0].second,
3551 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3552 EXPECT_THAT(start_stop_result[1].first,
3553 ::testing::ElementsAre(
3554 realtime_clock::epoch() + chrono::seconds(2),
3555 realtime_clock::epoch() + chrono::microseconds(6005000)));
3556 EXPECT_THAT(start_stop_result[1].second,
3557 ::testing::ElementsAre(
3558 realtime_clock::epoch() + chrono::microseconds(4900150),
3559 realtime_clock::epoch() + chrono::seconds(8)));
3560 EXPECT_THAT(
3561 start_stop_result[2].first,
3562 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3563 EXPECT_THAT(
3564 start_stop_result[2].second,
3565 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3566}
3567
3568// Tests that we properly handle one direction being down.
3569TEST(MissingDirectionTest, OneDirection) {
3570 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3571 aos::configuration::ReadConfig(ArtifactPath(
3572 "aos/events/logging/multinode_pingpong_split4_config.json"));
3573 message_bridge::TestingTimeConverter time_converter(
3574 configuration::NodesCount(&config.message()));
3575 SimulatedEventLoopFactory event_loop_factory(&config.message());
3576 event_loop_factory.SetTimeConverter(&time_converter);
3577
3578 NodeEventLoopFactory *const pi1 =
3579 event_loop_factory.GetNodeEventLoopFactory("pi1");
3580 const size_t pi1_index = configuration::GetNodeIndex(
3581 event_loop_factory.configuration(), pi1->node());
3582 NodeEventLoopFactory *const pi2 =
3583 event_loop_factory.GetNodeEventLoopFactory("pi2");
3584 const size_t pi2_index = configuration::GetNodeIndex(
3585 event_loop_factory.configuration(), pi2->node());
3586 std::vector<std::string> filenames;
3587
3588 {
3589 CHECK_EQ(pi1_index, 0u);
3590 CHECK_EQ(pi2_index, 1u);
3591
3592 time_converter.AddNextTimestamp(
3593 distributed_clock::epoch(),
3594 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3595
3596 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3597 time_converter.AddNextTimestamp(
3598 distributed_clock::epoch() + reboot_time,
3599 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3600 BootTimestamp::epoch() + reboot_time});
3601 }
3602
3603 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003604 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003605 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003606 aos::testing::TestTmpDir() + "/logs/multi_logfile1.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003607
3608 pi2->Disconnect(pi1->node());
3609
3610 pi1->AlwaysStart<Ping>("ping");
3611 pi2->AlwaysStart<Pong>("pong");
3612
3613 {
3614 LoggerState pi2_logger = MakeLoggerState(
3615 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3616
3617 event_loop_factory.RunFor(chrono::milliseconds(95));
3618
3619 pi2_logger.StartLogger(kLogfile2_1);
3620
3621 event_loop_factory.RunFor(chrono::milliseconds(6000));
3622
3623 pi2->Connect(pi1->node());
3624
3625 LoggerState pi1_logger = MakeLoggerState(
3626 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3627 pi1_logger.StartLogger(kLogfile1_1);
3628
3629 event_loop_factory.RunFor(chrono::milliseconds(5000));
3630 pi1_logger.AppendAllFilenames(&filenames);
3631 pi2_logger.AppendAllFilenames(&filenames);
3632 }
3633
3634 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003635 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003636 ConfirmReadable(filenames);
3637}
3638
3639// Tests that we properly handle only one direction ever existing after a
3640// reboot.
3641TEST(MissingDirectionTest, OneDirectionAfterReboot) {
3642 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3643 aos::configuration::ReadConfig(ArtifactPath(
3644 "aos/events/logging/multinode_pingpong_split4_config.json"));
3645 message_bridge::TestingTimeConverter time_converter(
3646 configuration::NodesCount(&config.message()));
3647 SimulatedEventLoopFactory event_loop_factory(&config.message());
3648 event_loop_factory.SetTimeConverter(&time_converter);
3649
3650 NodeEventLoopFactory *const pi1 =
3651 event_loop_factory.GetNodeEventLoopFactory("pi1");
3652 const size_t pi1_index = configuration::GetNodeIndex(
3653 event_loop_factory.configuration(), pi1->node());
3654 NodeEventLoopFactory *const pi2 =
3655 event_loop_factory.GetNodeEventLoopFactory("pi2");
3656 const size_t pi2_index = configuration::GetNodeIndex(
3657 event_loop_factory.configuration(), pi2->node());
3658 std::vector<std::string> filenames;
3659
3660 {
3661 CHECK_EQ(pi1_index, 0u);
3662 CHECK_EQ(pi2_index, 1u);
3663
3664 time_converter.AddNextTimestamp(
3665 distributed_clock::epoch(),
3666 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3667
3668 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3669 time_converter.AddNextTimestamp(
3670 distributed_clock::epoch() + reboot_time,
3671 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3672 BootTimestamp::epoch() + reboot_time});
3673 }
3674
3675 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003676 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003677
3678 pi1->AlwaysStart<Ping>("ping");
3679
3680 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3681 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3682 // second boot.
3683 {
3684 LoggerState pi2_logger = MakeLoggerState(
3685 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3686
3687 event_loop_factory.RunFor(chrono::milliseconds(95));
3688
3689 pi2_logger.StartLogger(kLogfile2_1);
3690
3691 event_loop_factory.RunFor(chrono::milliseconds(4000));
3692
3693 pi2->Disconnect(pi1->node());
3694
3695 event_loop_factory.RunFor(chrono::milliseconds(1000));
3696 pi1->AlwaysStart<Ping>("ping");
3697
3698 event_loop_factory.RunFor(chrono::milliseconds(5000));
3699 pi2_logger.AppendAllFilenames(&filenames);
3700 }
3701
3702 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003703 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003704 ConfirmReadable(filenames);
3705}
3706
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003707// Tests that we properly handle only one direction ever existing after a
3708// reboot with only reliable data.
Naman Guptaa63aa132023-03-22 20:06:34 -07003709TEST(MissingDirectionTest, OneDirectionAfterRebootReliable) {
3710 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003711 aos::configuration::ReadConfig(
3712 ArtifactPath("aos/events/logging/"
3713 "multinode_pingpong_split4_reliable_config.json"));
Naman Guptaa63aa132023-03-22 20:06:34 -07003714 message_bridge::TestingTimeConverter time_converter(
3715 configuration::NodesCount(&config.message()));
3716 SimulatedEventLoopFactory event_loop_factory(&config.message());
3717 event_loop_factory.SetTimeConverter(&time_converter);
3718
3719 NodeEventLoopFactory *const pi1 =
3720 event_loop_factory.GetNodeEventLoopFactory("pi1");
3721 const size_t pi1_index = configuration::GetNodeIndex(
3722 event_loop_factory.configuration(), pi1->node());
3723 NodeEventLoopFactory *const pi2 =
3724 event_loop_factory.GetNodeEventLoopFactory("pi2");
3725 const size_t pi2_index = configuration::GetNodeIndex(
3726 event_loop_factory.configuration(), pi2->node());
3727 std::vector<std::string> filenames;
3728
3729 {
3730 CHECK_EQ(pi1_index, 0u);
3731 CHECK_EQ(pi2_index, 1u);
3732
3733 time_converter.AddNextTimestamp(
3734 distributed_clock::epoch(),
3735 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3736
3737 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3738 time_converter.AddNextTimestamp(
3739 distributed_clock::epoch() + reboot_time,
3740 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3741 BootTimestamp::epoch() + reboot_time});
3742 }
3743
3744 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003745 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003746
3747 pi1->AlwaysStart<Ping>("ping");
3748
3749 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3750 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3751 // second boot.
3752 {
3753 LoggerState pi2_logger = MakeLoggerState(
3754 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3755
3756 event_loop_factory.RunFor(chrono::milliseconds(95));
3757
3758 pi2_logger.StartLogger(kLogfile2_1);
3759
3760 event_loop_factory.RunFor(chrono::milliseconds(4000));
3761
3762 pi2->Disconnect(pi1->node());
3763
3764 event_loop_factory.RunFor(chrono::milliseconds(1000));
3765 pi1->AlwaysStart<Ping>("ping");
3766
3767 event_loop_factory.RunFor(chrono::milliseconds(5000));
3768 pi2_logger.AppendAllFilenames(&filenames);
3769 }
3770
3771 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003772 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003773 ConfirmReadable(filenames);
3774}
3775
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003776// Tests that we properly handle only one direction ever existing after a
3777// reboot with mixed unreliable vs reliable, where reliable has an earlier
3778// timestamp than unreliable.
Brian Smartte67d7112023-03-20 12:06:30 -07003779TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase1) {
3780 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3781 aos::configuration::ReadConfig(ArtifactPath(
3782 "aos/events/logging/multinode_pingpong_split4_mixed1_config.json"));
3783 message_bridge::TestingTimeConverter time_converter(
3784 configuration::NodesCount(&config.message()));
3785 SimulatedEventLoopFactory event_loop_factory(&config.message());
3786 event_loop_factory.SetTimeConverter(&time_converter);
3787
3788 NodeEventLoopFactory *const pi1 =
3789 event_loop_factory.GetNodeEventLoopFactory("pi1");
3790 const size_t pi1_index = configuration::GetNodeIndex(
3791 event_loop_factory.configuration(), pi1->node());
3792 NodeEventLoopFactory *const pi2 =
3793 event_loop_factory.GetNodeEventLoopFactory("pi2");
3794 const size_t pi2_index = configuration::GetNodeIndex(
3795 event_loop_factory.configuration(), pi2->node());
3796 std::vector<std::string> filenames;
3797
3798 {
3799 CHECK_EQ(pi1_index, 0u);
3800 CHECK_EQ(pi2_index, 1u);
3801
3802 time_converter.AddNextTimestamp(
3803 distributed_clock::epoch(),
3804 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3805
3806 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3807 time_converter.AddNextTimestamp(
3808 distributed_clock::epoch() + reboot_time,
3809 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3810 BootTimestamp::epoch() + reboot_time});
3811 }
3812
3813 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003814 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Brian Smartte67d7112023-03-20 12:06:30 -07003815
3816 // The following sequence using the above reference config creates
3817 // a reliable message timestamp < unreliable message timestamp.
3818 {
3819 pi1->DisableStatistics();
3820 pi2->DisableStatistics();
3821
3822 event_loop_factory.RunFor(chrono::milliseconds(95));
3823
3824 pi1->AlwaysStart<Ping>("ping");
3825
3826 event_loop_factory.RunFor(chrono::milliseconds(5250));
3827
3828 pi1->EnableStatistics();
3829
3830 event_loop_factory.RunFor(chrono::milliseconds(1000));
3831
3832 LoggerState pi2_logger = MakeLoggerState(
3833 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3834
3835 pi2_logger.StartLogger(kLogfile2_1);
3836
3837 event_loop_factory.RunFor(chrono::milliseconds(5000));
3838 pi2_logger.AppendAllFilenames(&filenames);
3839 }
3840
3841 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003842 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Brian Smartte67d7112023-03-20 12:06:30 -07003843 ConfirmReadable(filenames);
3844}
3845
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003846// Tests that we properly handle only one direction ever existing after a
3847// reboot with mixed unreliable vs reliable, where unreliable has an earlier
3848// timestamp than reliable.
Brian Smartte67d7112023-03-20 12:06:30 -07003849TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase2) {
3850 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3851 aos::configuration::ReadConfig(ArtifactPath(
3852 "aos/events/logging/multinode_pingpong_split4_mixed2_config.json"));
3853 message_bridge::TestingTimeConverter time_converter(
3854 configuration::NodesCount(&config.message()));
3855 SimulatedEventLoopFactory event_loop_factory(&config.message());
3856 event_loop_factory.SetTimeConverter(&time_converter);
3857
3858 NodeEventLoopFactory *const pi1 =
3859 event_loop_factory.GetNodeEventLoopFactory("pi1");
3860 const size_t pi1_index = configuration::GetNodeIndex(
3861 event_loop_factory.configuration(), pi1->node());
3862 NodeEventLoopFactory *const pi2 =
3863 event_loop_factory.GetNodeEventLoopFactory("pi2");
3864 const size_t pi2_index = configuration::GetNodeIndex(
3865 event_loop_factory.configuration(), pi2->node());
3866 std::vector<std::string> filenames;
3867
3868 {
3869 CHECK_EQ(pi1_index, 0u);
3870 CHECK_EQ(pi2_index, 1u);
3871
3872 time_converter.AddNextTimestamp(
3873 distributed_clock::epoch(),
3874 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3875
3876 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3877 time_converter.AddNextTimestamp(
3878 distributed_clock::epoch() + reboot_time,
3879 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3880 BootTimestamp::epoch() + reboot_time});
3881 }
3882
3883 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003884 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Brian Smartte67d7112023-03-20 12:06:30 -07003885
3886 // The following sequence using the above reference config creates
3887 // an unreliable message timestamp < reliable message timestamp.
3888 {
3889 pi1->DisableStatistics();
3890 pi2->DisableStatistics();
3891
3892 event_loop_factory.RunFor(chrono::milliseconds(95));
3893
3894 pi1->AlwaysStart<Ping>("ping");
3895
3896 event_loop_factory.RunFor(chrono::milliseconds(5250));
3897
3898 pi1->EnableStatistics();
3899
3900 event_loop_factory.RunFor(chrono::milliseconds(1000));
3901
3902 LoggerState pi2_logger = MakeLoggerState(
3903 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3904
3905 pi2_logger.StartLogger(kLogfile2_1);
3906
3907 event_loop_factory.RunFor(chrono::milliseconds(5000));
3908 pi2_logger.AppendAllFilenames(&filenames);
3909 }
3910
3911 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003912 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Brian Smartte67d7112023-03-20 12:06:30 -07003913 ConfirmReadable(filenames);
3914}
3915
Naman Guptaa63aa132023-03-22 20:06:34 -07003916// Tests that we properly handle what used to be a time violation in one
3917// direction. This can occur when one direction goes down after sending some
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003918// data, but the other keeps working. The down direction ends up resolving to
3919// a straight line in the noncausal filter, where the direction which is still
3920// up can cross that line. Really, time progressed along just fine but we
3921// assumed that the offset was a line when it could have deviated by up to
3922// 1ms/second.
Naman Guptaa63aa132023-03-22 20:06:34 -07003923TEST_P(MultinodeLoggerTest, OneDirectionTimeDrift) {
3924 std::vector<std::string> filenames;
3925
3926 CHECK_EQ(pi1_index_, 0u);
3927 CHECK_EQ(pi2_index_, 1u);
3928
3929 time_converter_.AddNextTimestamp(
3930 distributed_clock::epoch(),
3931 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3932
3933 const chrono::nanoseconds before_disconnect_duration =
3934 time_converter_.AddMonotonic(
3935 {chrono::milliseconds(1000), chrono::milliseconds(1000)});
3936
3937 const chrono::nanoseconds test_duration =
3938 time_converter_.AddMonotonic(
3939 {chrono::milliseconds(1000), chrono::milliseconds(1000)}) +
3940 time_converter_.AddMonotonic(
3941 {chrono::milliseconds(10000),
3942 chrono::milliseconds(10000) - chrono::milliseconds(5)}) +
3943 time_converter_.AddMonotonic(
3944 {chrono::milliseconds(10000),
3945 chrono::milliseconds(10000) + chrono::milliseconds(5)});
3946
3947 const std::string kLogfile =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003948 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003949
3950 {
3951 LoggerState pi2_logger = MakeLogger(pi2_);
3952 pi2_logger.StartLogger(kLogfile);
3953 event_loop_factory_.RunFor(before_disconnect_duration);
3954
3955 pi2_->Disconnect(pi1_->node());
3956
3957 event_loop_factory_.RunFor(test_duration);
3958 pi2_->Connect(pi1_->node());
3959
3960 event_loop_factory_.RunFor(chrono::milliseconds(5000));
3961 pi2_logger.AppendAllFilenames(&filenames);
3962 }
3963
3964 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003965 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003966 ConfirmReadable(filenames);
3967}
3968
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003969// Tests that we can replay a logfile that has timestamps such that at least
3970// one node's epoch is at a positive distributed_clock (and thus will have to
3971// be booted after the other node(s)).
Naman Guptaa63aa132023-03-22 20:06:34 -07003972TEST_P(MultinodeLoggerTest, StartOneNodeBeforeOther) {
3973 std::vector<std::string> filenames;
3974
3975 CHECK_EQ(pi1_index_, 0u);
3976 CHECK_EQ(pi2_index_, 1u);
3977
3978 time_converter_.AddNextTimestamp(
3979 distributed_clock::epoch(),
3980 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3981
3982 const chrono::nanoseconds before_reboot_duration = chrono::milliseconds(1000);
3983 time_converter_.RebootAt(
3984 0, distributed_clock::time_point(before_reboot_duration));
3985
3986 const chrono::nanoseconds test_duration = time_converter_.AddMonotonic(
3987 {chrono::milliseconds(10000), chrono::milliseconds(10000)});
3988
3989 const std::string kLogfile =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003990 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003991 util::UnlinkRecursive(kLogfile);
3992
3993 pi2_->Disconnect(pi1_->node());
3994 pi1_->Disconnect(pi2_->node());
3995
3996 {
3997 LoggerState pi2_logger = MakeLogger(pi2_);
3998
3999 pi2_logger.StartLogger(kLogfile);
4000 event_loop_factory_.RunFor(before_reboot_duration);
4001
4002 pi2_->Connect(pi1_->node());
4003 pi1_->Connect(pi2_->node());
4004
4005 event_loop_factory_.RunFor(test_duration);
4006
4007 pi2_logger.AppendAllFilenames(&filenames);
4008 }
4009
4010 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004011 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004012 ConfirmReadable(filenames);
4013
4014 {
4015 LogReader reader(sorted_parts);
4016 SimulatedEventLoopFactory replay_factory(reader.configuration());
4017 reader.RegisterWithoutStarting(&replay_factory);
4018
4019 NodeEventLoopFactory *const replay_node =
4020 reader.event_loop_factory()->GetNodeEventLoopFactory("pi1");
4021
4022 std::unique_ptr<EventLoop> test_event_loop =
4023 replay_node->MakeEventLoop("test_reader");
4024 replay_node->OnStartup([replay_node]() {
4025 // Check that we didn't boot until at least t=0.
4026 CHECK_LE(monotonic_clock::epoch(), replay_node->monotonic_now());
4027 });
4028 test_event_loop->OnRun([&test_event_loop]() {
4029 // Check that we didn't boot until at least t=0.
4030 EXPECT_LE(monotonic_clock::epoch(), test_event_loop->monotonic_now());
4031 });
4032 reader.event_loop_factory()->Run();
4033 reader.Deregister();
4034 }
4035}
4036
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004037// Tests that when we have a loop without all the logs at all points in time,
4038// we can sort it properly.
Naman Guptaa63aa132023-03-22 20:06:34 -07004039TEST(MultinodeLoggerLoopTest, Loop) {
4040 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004041 aos::configuration::ReadConfig(
4042 ArtifactPath("aos/events/logging/"
4043 "multinode_pingpong_triangle_split_config.json"));
Naman Guptaa63aa132023-03-22 20:06:34 -07004044 message_bridge::TestingTimeConverter time_converter(
4045 configuration::NodesCount(&config.message()));
4046 SimulatedEventLoopFactory event_loop_factory(&config.message());
4047 event_loop_factory.SetTimeConverter(&time_converter);
4048
4049 NodeEventLoopFactory *const pi1 =
4050 event_loop_factory.GetNodeEventLoopFactory("pi1");
4051 NodeEventLoopFactory *const pi2 =
4052 event_loop_factory.GetNodeEventLoopFactory("pi2");
4053 NodeEventLoopFactory *const pi3 =
4054 event_loop_factory.GetNodeEventLoopFactory("pi3");
4055
4056 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004057 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004058 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004059 aos::testing::TestTmpDir() + "/logs/multi_logfile2/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004060 const std::string kLogfile3_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004061 aos::testing::TestTmpDir() + "/logs/multi_logfile3/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004062
4063 {
4064 // Make pi1 boot before everything else.
4065 time_converter.AddNextTimestamp(
4066 distributed_clock::epoch(),
4067 {BootTimestamp::epoch(),
4068 BootTimestamp::epoch() - chrono::milliseconds(100),
4069 BootTimestamp::epoch() - chrono::milliseconds(300)});
4070 }
4071
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004072 // We want to setup a situation such that 2 of the 3 legs of the loop are
4073 // very confident about time being X, and the third leg is pulling the
4074 // average off to one side.
Naman Guptaa63aa132023-03-22 20:06:34 -07004075 //
4076 // It's easiest to visualize this in timestamp_plotter.
4077
4078 std::vector<std::string> filenames;
4079 {
4080 // Have pi1 send out a reliable message at startup. This sets up a long
4081 // forwarding time message at the start to bias time.
4082 std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
4083 {
4084 aos::Sender<examples::Ping> ping_sender =
4085 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
4086
4087 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
4088 examples::Ping::Builder ping_builder =
4089 builder.MakeBuilder<examples::Ping>();
4090 CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
4091 }
4092
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004093 // Wait a while so there's enough data to let the worst case be rather
4094 // off.
Naman Guptaa63aa132023-03-22 20:06:34 -07004095 event_loop_factory.RunFor(chrono::seconds(1000));
4096
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004097 // Now start a receiving node first. This sets up 2 tight bounds between
4098 // 2 of the nodes.
Naman Guptaa63aa132023-03-22 20:06:34 -07004099 LoggerState pi2_logger = MakeLoggerState(
4100 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4101 pi2_logger.StartLogger(kLogfile2_1);
4102
4103 event_loop_factory.RunFor(chrono::seconds(100));
4104
4105 // And now start the third leg.
4106 LoggerState pi3_logger = MakeLoggerState(
4107 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4108 pi3_logger.StartLogger(kLogfile3_1);
4109
4110 LoggerState pi1_logger = MakeLoggerState(
4111 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4112 pi1_logger.StartLogger(kLogfile1_1);
4113
4114 event_loop_factory.RunFor(chrono::seconds(100));
4115
4116 pi1_logger.AppendAllFilenames(&filenames);
4117 pi2_logger.AppendAllFilenames(&filenames);
4118 pi3_logger.AppendAllFilenames(&filenames);
4119 }
4120
4121 // Make sure we can read this.
4122 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004123 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004124 auto result = ConfirmReadable(filenames);
4125}
4126
Austin Schuh08dba8f2023-05-01 08:29:30 -07004127// Tests that RestartLogging works in the simple case. Unfortunately, the
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004128// failure cases involve simulating time elapsing in callbacks, which is
4129// really hard. The best we can reasonably do is make sure 2 back to back
4130// logs are parseable together.
Austin Schuh08dba8f2023-05-01 08:29:30 -07004131TEST_P(MultinodeLoggerTest, RestartLogging) {
4132 time_converter_.AddMonotonic(
4133 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
4134 std::vector<std::string> filenames;
4135 {
4136 LoggerState pi1_logger = MakeLogger(pi1_);
4137
4138 event_loop_factory_.RunFor(chrono::milliseconds(95));
4139
4140 StartLogger(&pi1_logger, logfile_base1_);
4141 aos::monotonic_clock::time_point last_rotation_time =
4142 pi1_logger.event_loop->monotonic_now();
Austin Schuh2f864452023-07-17 14:53:08 -07004143 pi1_logger.logger->set_on_logged_period(
4144 [&](aos::monotonic_clock::time_point) {
4145 const auto now = pi1_logger.event_loop->monotonic_now();
4146 if (now > last_rotation_time + std::chrono::seconds(5)) {
4147 pi1_logger.AppendAllFilenames(&filenames);
4148 std::unique_ptr<MultiNodeFilesLogNamer> namer =
4149 pi1_logger.MakeLogNamer(logfile_base2_);
4150 pi1_logger.log_namer = namer.get();
Austin Schuh08dba8f2023-05-01 08:29:30 -07004151
Austin Schuh2f864452023-07-17 14:53:08 -07004152 pi1_logger.logger->RestartLogging(std::move(namer));
4153 last_rotation_time = now;
4154 }
4155 });
Austin Schuh08dba8f2023-05-01 08:29:30 -07004156
4157 event_loop_factory_.RunFor(chrono::milliseconds(7000));
4158
4159 pi1_logger.AppendAllFilenames(&filenames);
4160 }
4161
4162 for (const auto &x : filenames) {
4163 LOG(INFO) << x;
4164 }
4165
4166 EXPECT_GE(filenames.size(), 2u);
4167
4168 ConfirmReadable(filenames);
4169
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004170 // TODO(austin): It would be good to confirm that any one time messages end
4171 // up in both logs correctly.
Austin Schuh08dba8f2023-05-01 08:29:30 -07004172}
4173
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004174// Tests that when we have evidence of 2 boots, and then start logging, the
4175// max_out_of_order_duration ends up reasonable on the boot with the start time.
4176TEST(MultinodeLoggerLoopTest, PreviousBootData) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07004177 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4178 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4179
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004180 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4181 aos::configuration::ReadConfig(ArtifactPath(
4182 "aos/events/logging/multinode_pingpong_reboot_ooo_config.json"));
4183 message_bridge::TestingTimeConverter time_converter(
4184 configuration::NodesCount(&config.message()));
4185 SimulatedEventLoopFactory event_loop_factory(&config.message());
4186 event_loop_factory.SetTimeConverter(&time_converter);
4187
4188 const UUID pi1_boot0 = UUID::Random();
4189 const UUID pi2_boot0 = UUID::Random();
4190 const UUID pi2_boot1 = UUID::Random();
4191
4192 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004193 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004194
4195 {
4196 constexpr size_t kPi1Index = 0;
4197 constexpr size_t kPi2Index = 1;
4198 time_converter.set_boot_uuid(kPi1Index, 0, pi1_boot0);
4199 time_converter.set_boot_uuid(kPi2Index, 0, pi2_boot0);
4200 time_converter.set_boot_uuid(kPi2Index, 1, pi2_boot1);
4201
4202 // Make pi1 boot before everything else.
4203 time_converter.AddNextTimestamp(
4204 distributed_clock::epoch(),
4205 {BootTimestamp::epoch(),
4206 BootTimestamp::epoch() - chrono::milliseconds(100)});
4207
4208 const chrono::nanoseconds reboot_time = chrono::seconds(1005);
4209 time_converter.AddNextTimestamp(
4210 distributed_clock::epoch() + reboot_time,
4211 {BootTimestamp::epoch() + reboot_time,
4212 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()}});
4213 }
4214
4215 NodeEventLoopFactory *const pi1 =
4216 event_loop_factory.GetNodeEventLoopFactory("pi1");
4217 NodeEventLoopFactory *const pi2 =
4218 event_loop_factory.GetNodeEventLoopFactory("pi2");
4219
4220 // What we want is for pi2 to send a message at t=1000 on the first channel
4221 // (/atest1 pong), and t=1 on the second channel (/atest3 pong). That'll make
4222 // the max out of order duration be large.
4223 //
4224 // Then, we reboot, and only send messages on a third channel (/atest2 pong).
4225 // The order is key, they need to sort in this order in the config.
4226
4227 std::vector<std::string> filenames;
4228 {
4229 {
4230 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4231 aos::Sender<examples::Pong> pong_sender =
4232 pi2_event_loop->MakeSender<examples::Pong>("/atest3");
4233
4234 pi2_event_loop->OnRun([&]() {
4235 aos::Sender<examples::Pong>::Builder builder =
4236 pong_sender.MakeBuilder();
4237 examples::Pong::Builder pong_builder =
4238 builder.MakeBuilder<examples::Pong>();
4239 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4240 });
4241
4242 event_loop_factory.RunFor(chrono::seconds(1000));
4243 }
4244
4245 {
4246 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4247 aos::Sender<examples::Pong> pong_sender =
4248 pi2_event_loop->MakeSender<examples::Pong>("/atest1");
4249
4250 aos::Sender<examples::Pong>::Builder builder = pong_sender.MakeBuilder();
4251 examples::Pong::Builder pong_builder =
4252 builder.MakeBuilder<examples::Pong>();
4253 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4254 }
4255
4256 event_loop_factory.RunFor(chrono::seconds(10));
4257
4258 // Now start a receiving node first. This sets up 2 tight bounds between
4259 // 2 of the nodes.
4260 LoggerState pi1_logger = MakeLoggerState(
4261 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4262 pi1_logger.StartLogger(kLogfile1_1);
4263
4264 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4265 aos::Sender<examples::Pong> pong_sender =
4266 pi2_event_loop->MakeSender<examples::Pong>("/atest2");
4267
4268 pi2_event_loop->AddPhasedLoop(
4269 [&pong_sender](int) {
4270 aos::Sender<examples::Pong>::Builder builder =
4271 pong_sender.MakeBuilder();
4272 examples::Pong::Builder pong_builder =
4273 builder.MakeBuilder<examples::Pong>();
4274 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4275 },
4276 chrono::milliseconds(10));
4277
4278 event_loop_factory.RunFor(chrono::seconds(100));
4279
4280 pi1_logger.AppendAllFilenames(&filenames);
4281 }
4282
4283 // Make sure we can read this.
4284 const std::vector<LogFile> sorted_parts = SortParts(filenames);
4285 EXPECT_TRUE(AllRebootPartsMatchOutOfOrderDuration(sorted_parts, "pi2"));
4286 auto result = ConfirmReadable(filenames);
4287}
4288
4289// Tests that when we start without a connection, and then start logging, the
4290// max_out_of_order_duration ends up reasonable.
4291TEST(MultinodeLoggerLoopTest, StartDisconnected) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07004292 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4293 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4294
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004295 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4296 aos::configuration::ReadConfig(ArtifactPath(
4297 "aos/events/logging/multinode_pingpong_reboot_ooo_config.json"));
4298 message_bridge::TestingTimeConverter time_converter(
4299 configuration::NodesCount(&config.message()));
4300 SimulatedEventLoopFactory event_loop_factory(&config.message());
4301 event_loop_factory.SetTimeConverter(&time_converter);
4302
4303 time_converter.StartEqual();
4304
4305 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004306 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004307
4308 NodeEventLoopFactory *const pi1 =
4309 event_loop_factory.GetNodeEventLoopFactory("pi1");
4310 NodeEventLoopFactory *const pi2 =
4311 event_loop_factory.GetNodeEventLoopFactory("pi2");
4312
4313 // What we want is for pi2 to send a message at t=1000 on the first channel
4314 // (/atest1 pong), and t=1 on the second channel (/atest3 pong). That'll make
4315 // the max out of order duration be large.
4316 //
4317 // Then, we disconnect, and only send messages on a third channel
4318 // (/atest2 pong). The order is key, they need to sort in this order in the
4319 // config so we observe them in the order which grows the
4320 // max_out_of_order_duration.
4321
4322 std::vector<std::string> filenames;
4323 {
4324 {
4325 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4326 aos::Sender<examples::Pong> pong_sender =
4327 pi2_event_loop->MakeSender<examples::Pong>("/atest3");
4328
4329 pi2_event_loop->OnRun([&]() {
4330 aos::Sender<examples::Pong>::Builder builder =
4331 pong_sender.MakeBuilder();
4332 examples::Pong::Builder pong_builder =
4333 builder.MakeBuilder<examples::Pong>();
4334 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4335 });
4336
4337 event_loop_factory.RunFor(chrono::seconds(1000));
4338 }
4339
4340 {
4341 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4342 aos::Sender<examples::Pong> pong_sender =
4343 pi2_event_loop->MakeSender<examples::Pong>("/atest1");
4344
4345 aos::Sender<examples::Pong>::Builder builder = pong_sender.MakeBuilder();
4346 examples::Pong::Builder pong_builder =
4347 builder.MakeBuilder<examples::Pong>();
4348 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4349 }
4350
4351 event_loop_factory.RunFor(chrono::seconds(10));
4352
4353 pi1->Disconnect(pi2->node());
4354 pi2->Disconnect(pi1->node());
4355
4356 // Make data flow.
4357 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4358 aos::Sender<examples::Pong> pong_sender =
4359 pi2_event_loop->MakeSender<examples::Pong>("/atest2");
4360
4361 pi2_event_loop->AddPhasedLoop(
4362 [&pong_sender](int) {
4363 aos::Sender<examples::Pong>::Builder builder =
4364 pong_sender.MakeBuilder();
4365 examples::Pong::Builder pong_builder =
4366 builder.MakeBuilder<examples::Pong>();
4367 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4368 },
4369 chrono::milliseconds(10));
4370
4371 event_loop_factory.RunFor(chrono::seconds(10));
4372
4373 // Now start a receiving node first. This sets up 2 tight bounds between
4374 // 2 of the nodes.
4375 LoggerState pi1_logger = MakeLoggerState(
4376 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4377 pi1_logger.StartLogger(kLogfile1_1);
4378
4379 event_loop_factory.RunFor(chrono::seconds(10));
4380
4381 // Now, reconnect, and everything should recover.
4382 pi1->Connect(pi2->node());
4383 pi2->Connect(pi1->node());
4384
4385 event_loop_factory.RunFor(chrono::seconds(10));
4386
4387 pi1_logger.AppendAllFilenames(&filenames);
4388 }
4389
4390 // Make sure we can read this.
4391 const std::vector<LogFile> sorted_parts = SortParts(filenames);
4392 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
4393 auto result = ConfirmReadable(filenames);
4394}
4395
Austin Schuh1124c512023-08-01 15:20:44 -07004396// Class to spam Pong messages blindly.
4397class PongSender {
4398 public:
4399 PongSender(EventLoop *loop, std::string_view channel_name)
4400 : sender_(loop->MakeSender<examples::Pong>(channel_name)) {
4401 loop->AddPhasedLoop(
4402 [this](int) {
4403 aos::Sender<examples::Pong>::Builder builder = sender_.MakeBuilder();
4404 examples::Pong::Builder pong_builder =
4405 builder.MakeBuilder<examples::Pong>();
4406 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4407 },
4408 chrono::milliseconds(10));
4409 }
4410
4411 private:
4412 aos::Sender<examples::Pong> sender_;
4413};
4414
4415// Tests that we log correctly as nodes connect slowly.
4416TEST(MultinodeLoggerLoopTest, StaggeredConnect) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07004417 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4418 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4419
Austin Schuh1124c512023-08-01 15:20:44 -07004420 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4421 aos::configuration::ReadConfig(ArtifactPath(
4422 "aos/events/logging/multinode_pingpong_pi3_pingpong_config.json"));
4423 message_bridge::TestingTimeConverter time_converter(
4424 configuration::NodesCount(&config.message()));
4425 SimulatedEventLoopFactory event_loop_factory(&config.message());
4426 event_loop_factory.SetTimeConverter(&time_converter);
4427
4428 time_converter.StartEqual();
4429
4430 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004431 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Austin Schuh1124c512023-08-01 15:20:44 -07004432
4433 NodeEventLoopFactory *const pi1 =
4434 event_loop_factory.GetNodeEventLoopFactory("pi1");
4435 NodeEventLoopFactory *const pi2 =
4436 event_loop_factory.GetNodeEventLoopFactory("pi2");
4437 NodeEventLoopFactory *const pi3 =
4438 event_loop_factory.GetNodeEventLoopFactory("pi3");
4439
4440 // What we want is for pi2 to send a message at t=1000 on the first channel
4441 // (/atest1 pong), and t=1 on the second channel (/atest3 pong). That'll make
4442 // the max out of order duration be large.
4443 //
4444 // Then, we disconnect, and only send messages on a third channel
4445 // (/atest2 pong). The order is key, they need to sort in this order in the
4446 // config so we observe them in the order which grows the
4447 // max_out_of_order_duration.
4448
4449 pi1->Disconnect(pi2->node());
4450 pi2->Disconnect(pi1->node());
4451
4452 pi1->Disconnect(pi3->node());
4453 pi3->Disconnect(pi1->node());
4454
4455 std::vector<std::string> filenames;
4456 pi2->AlwaysStart<PongSender>("pongsender", "/test2");
4457 pi3->AlwaysStart<PongSender>("pongsender", "/test3");
4458
4459 event_loop_factory.RunFor(chrono::seconds(10));
4460
4461 {
4462 // Now start a receiving node first. This sets up 2 tight bounds between
4463 // 2 of the nodes.
4464 LoggerState pi1_logger = MakeLoggerState(
4465 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4466 pi1_logger.StartLogger(kLogfile1_1);
4467
4468 event_loop_factory.RunFor(chrono::seconds(10));
4469
4470 // Now, reconnect, and everything should recover.
4471 pi1->Connect(pi2->node());
4472 pi2->Connect(pi1->node());
4473
4474 event_loop_factory.RunFor(chrono::seconds(10));
4475
4476 pi1->Connect(pi3->node());
4477 pi3->Connect(pi1->node());
4478
4479 event_loop_factory.RunFor(chrono::seconds(10));
4480
4481 pi1_logger.AppendAllFilenames(&filenames);
4482 }
4483
4484 // Make sure we can read this.
4485 const std::vector<LogFile> sorted_parts = SortParts(filenames);
4486 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
4487 auto result = ConfirmReadable(filenames);
4488}
4489
Naman Guptaa63aa132023-03-22 20:06:34 -07004490} // namespace testing
4491} // namespace logger
4492} // namespace aos