blob: f6e54477f11a1bb977693e38c00eba3fa1317d8a [file] [log] [blame]
Brian Smartte67d7112023-03-20 12:06:30 -07001#include <algorithm>
2
Philipp Schrader790cb542023-07-05 21:06:52 -07003#include "gmock/gmock.h"
4#include "gtest/gtest.h"
5
Naman Guptaa63aa132023-03-22 20:06:34 -07006#include "aos/events/logging/log_reader.h"
7#include "aos/events/logging/multinode_logger_test_lib.h"
8#include "aos/events/message_counter.h"
9#include "aos/events/ping_lib.h"
10#include "aos/events/pong_lib.h"
11#include "aos/network/remote_message_generated.h"
12#include "aos/network/timestamp_generated.h"
13#include "aos/testing/tmpdir.h"
Naman Guptaa63aa132023-03-22 20:06:34 -070014
15namespace aos {
16namespace logger {
17namespace testing {
18
19namespace chrono = std::chrono;
20using aos::message_bridge::RemoteMessage;
21using aos::testing::ArtifactPath;
22using aos::testing::MessageCounter;
23
Naman Guptaa63aa132023-03-22 20:06:34 -070024INSTANTIATE_TEST_SUITE_P(
25 All, MultinodeLoggerTest,
26 ::testing::Combine(
27 ::testing::Values(
28 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080029 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070030 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080031 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070032 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
33
34INSTANTIATE_TEST_SUITE_P(
35 All, MultinodeLoggerDeathTest,
36 ::testing::Combine(
37 ::testing::Values(
38 ConfigParams{"multinode_pingpong_combined_config.json", true,
Naman Guptac1069282023-03-06 11:01:53 -080039 kCombinedConfigSha1(), kCombinedConfigSha1()},
Naman Guptaa63aa132023-03-22 20:06:34 -070040 ConfigParams{"multinode_pingpong_split_config.json", false,
Naman Guptac1069282023-03-06 11:01:53 -080041 kSplitConfigSha1(), kReloggedSplitConfigSha1()}),
Naman Guptaa63aa132023-03-22 20:06:34 -070042 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
43
44// Tests that we can write and read simple multi-node log files.
45TEST_P(MultinodeLoggerTest, SimpleMultiNode) {
46 std::vector<std::string> actual_filenames;
47 time_converter_.StartEqual();
48
49 {
50 LoggerState pi1_logger = MakeLogger(pi1_);
51 LoggerState pi2_logger = MakeLogger(pi2_);
52
53 event_loop_factory_.RunFor(chrono::milliseconds(95));
54
55 StartLogger(&pi1_logger);
56 StartLogger(&pi2_logger);
57
58 event_loop_factory_.RunFor(chrono::milliseconds(20000));
59 pi1_logger.AppendAllFilenames(&actual_filenames);
60 pi2_logger.AppendAllFilenames(&actual_filenames);
61 }
62
63 ASSERT_THAT(actual_filenames,
64 ::testing::UnorderedElementsAreArray(logfiles_));
65
66 {
67 std::set<std::string> logfile_uuids;
68 std::set<std::string> parts_uuids;
69 // Confirm that we have the expected number of UUIDs for both the logfile
70 // UUIDs and parts UUIDs.
71 std::vector<SizePrefixedFlatbufferVector<LogFileHeader>> log_header;
72 for (std::string_view f : logfiles_) {
73 log_header.emplace_back(ReadHeader(f).value());
74 if (!log_header.back().message().has_configuration()) {
75 logfile_uuids.insert(
76 log_header.back().message().log_event_uuid()->str());
77 parts_uuids.insert(log_header.back().message().parts_uuid()->str());
78 }
79 }
80
81 EXPECT_EQ(logfile_uuids.size(), 2u);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -070082 EXPECT_EQ(parts_uuids.size(), 6u);
Naman Guptaa63aa132023-03-22 20:06:34 -070083
84 // And confirm everything is on the correct node.
85 EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi1");
86 EXPECT_EQ(log_header[3].message().node()->name()->string_view(), "pi1");
87 EXPECT_EQ(log_header[4].message().node()->name()->string_view(), "pi1");
88
89 EXPECT_EQ(log_header[5].message().node()->name()->string_view(), "pi2");
90 EXPECT_EQ(log_header[6].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -070091 EXPECT_EQ(log_header[7].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -070092
Mithun Bharadwaj0c629932023-08-02 16:10:40 -070093 EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi1");
94 EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi1");
Naman Guptaa63aa132023-03-22 20:06:34 -070095
Mithun Bharadwaj0c629932023-08-02 16:10:40 -070096 EXPECT_EQ(log_header[10].message().node()->name()->string_view(), "pi2");
97 EXPECT_EQ(log_header[11].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -070098
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -070099 EXPECT_EQ(log_header[12].message().node()->name()->string_view(), "pi2");
100 EXPECT_EQ(log_header[13].message().node()->name()->string_view(), "pi2");
101 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700102
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700103 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi1");
104 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi1");
Naman Guptaa63aa132023-03-22 20:06:34 -0700105
106 // And the parts index matches.
107 EXPECT_EQ(log_header[2].message().parts_index(), 0);
108 EXPECT_EQ(log_header[3].message().parts_index(), 1);
109 EXPECT_EQ(log_header[4].message().parts_index(), 2);
110
111 EXPECT_EQ(log_header[5].message().parts_index(), 0);
112 EXPECT_EQ(log_header[6].message().parts_index(), 1);
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700113 EXPECT_EQ(log_header[7].message().parts_index(), 2);
Naman Guptaa63aa132023-03-22 20:06:34 -0700114
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700115 EXPECT_EQ(log_header[8].message().parts_index(), 0);
116 EXPECT_EQ(log_header[9].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700117
118 EXPECT_EQ(log_header[10].message().parts_index(), 0);
119 EXPECT_EQ(log_header[11].message().parts_index(), 1);
120
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700121 EXPECT_EQ(log_header[12].message().parts_index(), 0);
122 EXPECT_EQ(log_header[13].message().parts_index(), 1);
123 EXPECT_EQ(log_header[14].message().parts_index(), 2);
Naman Guptaa63aa132023-03-22 20:06:34 -0700124
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700125 EXPECT_EQ(log_header[15].message().parts_index(), 0);
126 EXPECT_EQ(log_header[16].message().parts_index(), 1);
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700127
128 // And that the data_stored field is right.
129 EXPECT_THAT(*log_header[2].message().data_stored(),
130 ::testing::ElementsAre(StoredDataType::DATA,
131 StoredDataType::TIMESTAMPS));
132 EXPECT_THAT(*log_header[3].message().data_stored(),
133 ::testing::ElementsAre(StoredDataType::DATA,
134 StoredDataType::TIMESTAMPS));
135 EXPECT_THAT(*log_header[4].message().data_stored(),
136 ::testing::ElementsAre(StoredDataType::DATA,
137 StoredDataType::TIMESTAMPS));
138
139 EXPECT_THAT(*log_header[5].message().data_stored(),
140 ::testing::ElementsAre(StoredDataType::DATA,
141 StoredDataType::TIMESTAMPS));
142 EXPECT_THAT(*log_header[6].message().data_stored(),
143 ::testing::ElementsAre(StoredDataType::DATA,
144 StoredDataType::TIMESTAMPS));
145 EXPECT_THAT(*log_header[7].message().data_stored(),
146 ::testing::ElementsAre(StoredDataType::DATA,
147 StoredDataType::TIMESTAMPS));
148
149 EXPECT_THAT(*log_header[8].message().data_stored(),
150 ::testing::ElementsAre(StoredDataType::DATA));
151 EXPECT_THAT(*log_header[9].message().data_stored(),
152 ::testing::ElementsAre(StoredDataType::DATA));
153
154 EXPECT_THAT(*log_header[10].message().data_stored(),
155 ::testing::ElementsAre(StoredDataType::DATA));
156 EXPECT_THAT(*log_header[11].message().data_stored(),
157 ::testing::ElementsAre(StoredDataType::DATA));
158
159 EXPECT_THAT(*log_header[12].message().data_stored(),
160 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
161 EXPECT_THAT(*log_header[13].message().data_stored(),
162 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
163 EXPECT_THAT(*log_header[14].message().data_stored(),
164 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
165
166 EXPECT_THAT(*log_header[15].message().data_stored(),
167 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
168 EXPECT_THAT(*log_header[16].message().data_stored(),
169 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
Naman Guptaa63aa132023-03-22 20:06:34 -0700170 }
171
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700172 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
173 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -0700174 {
175 using ::testing::UnorderedElementsAre;
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700176 std::shared_ptr<const aos::Configuration> config = sorted_parts[0].config;
Naman Guptaa63aa132023-03-22 20:06:34 -0700177
178 // Timing reports, pings
179 EXPECT_THAT(CountChannelsData(config, logfiles_[2]),
180 UnorderedElementsAre(
181 std::make_tuple("/pi1/aos",
182 "aos.message_bridge.ServerStatistics", 1),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700183 std::make_tuple("/test", "aos.examples.Ping", 1),
184 std::make_tuple("/pi1/aos", "aos.examples.Ping", 1)))
Naman Guptaa63aa132023-03-22 20:06:34 -0700185 << " : " << logfiles_[2];
186 {
187 std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700188 std::make_tuple("/pi1/aos", "aos.examples.Ping", 10),
Naman Guptaa63aa132023-03-22 20:06:34 -0700189 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 1),
190 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
191 1)};
192 if (!std::get<0>(GetParam()).shared) {
193 channel_counts.push_back(
194 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
195 "aos-message_bridge-Timestamp",
196 "aos.message_bridge.RemoteMessage", 1));
197 }
198 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
199 ::testing::UnorderedElementsAreArray(channel_counts))
200 << " : " << logfiles_[3];
201 }
202 {
203 std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700204 std::make_tuple("/pi1/aos", "aos.examples.Ping", 1990),
Naman Guptaa63aa132023-03-22 20:06:34 -0700205 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 199),
206 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
207 20),
208 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
209 199),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700210 std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
Naman Guptaa63aa132023-03-22 20:06:34 -0700211 std::make_tuple("/test", "aos.examples.Ping", 2000)};
212 if (!std::get<0>(GetParam()).shared) {
213 channel_counts.push_back(
214 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
215 "aos-message_bridge-Timestamp",
216 "aos.message_bridge.RemoteMessage", 199));
217 }
218 EXPECT_THAT(CountChannelsData(config, logfiles_[4]),
219 ::testing::UnorderedElementsAreArray(channel_counts))
220 << " : " << logfiles_[4];
221 }
222 // Timestamps for pong
223 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
224 UnorderedElementsAre())
225 << " : " << logfiles_[2];
226 EXPECT_THAT(
227 CountChannelsTimestamp(config, logfiles_[3]),
228 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
229 << " : " << logfiles_[3];
230 EXPECT_THAT(
231 CountChannelsTimestamp(config, logfiles_[4]),
232 UnorderedElementsAre(
233 std::make_tuple("/test", "aos.examples.Pong", 2000),
234 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)))
235 << " : " << logfiles_[4];
236
Naman Guptaa63aa132023-03-22 20:06:34 -0700237 // Timing reports and pongs.
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700238 EXPECT_THAT(CountChannelsData(config, logfiles_[5]),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700239 UnorderedElementsAre(
240 std::make_tuple("/pi2/aos", "aos.examples.Ping", 1),
241 std::make_tuple("/pi2/aos",
242 "aos.message_bridge.ServerStatistics", 1)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700243 << " : " << logfiles_[5];
Naman Guptaa63aa132023-03-22 20:06:34 -0700244 EXPECT_THAT(
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700245 CountChannelsData(config, logfiles_[6]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700246 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700247 << " : " << logfiles_[6];
Naman Guptaa63aa132023-03-22 20:06:34 -0700248 EXPECT_THAT(
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700249 CountChannelsData(config, logfiles_[7]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700250 UnorderedElementsAre(
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700251 std::make_tuple("/pi2/aos", "aos.examples.Ping", 2000),
Naman Guptaa63aa132023-03-22 20:06:34 -0700252 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
253 std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
254 20),
255 std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
256 200),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700257 std::make_tuple("/pi2/aos", "aos.timing.Report", 60),
Naman Guptaa63aa132023-03-22 20:06:34 -0700258 std::make_tuple("/test", "aos.examples.Pong", 2000)))
Naman Guptaa63aa132023-03-22 20:06:34 -0700259 << " : " << logfiles_[7];
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700260 // And ping timestamps.
261 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
262 UnorderedElementsAre())
263 << " : " << logfiles_[5];
Naman Guptaa63aa132023-03-22 20:06:34 -0700264 EXPECT_THAT(
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700265 CountChannelsTimestamp(config, logfiles_[6]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700266 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700267 << " : " << logfiles_[6];
Naman Guptaa63aa132023-03-22 20:06:34 -0700268 EXPECT_THAT(
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700269 CountChannelsTimestamp(config, logfiles_[7]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700270 UnorderedElementsAre(
271 std::make_tuple("/test", "aos.examples.Ping", 2000),
272 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700273 << " : " << logfiles_[7];
Naman Guptaa63aa132023-03-22 20:06:34 -0700274
275 // And then test that the remotely logged timestamp data files only have
276 // timestamps in them.
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700277 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[8]),
278 UnorderedElementsAre())
279 << " : " << logfiles_[8];
280 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[9]),
281 UnorderedElementsAre())
282 << " : " << logfiles_[9];
Naman Guptaa63aa132023-03-22 20:06:34 -0700283 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[10]),
284 UnorderedElementsAre())
285 << " : " << logfiles_[10];
286 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[11]),
287 UnorderedElementsAre())
288 << " : " << logfiles_[11];
Naman Guptaa63aa132023-03-22 20:06:34 -0700289
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700290 EXPECT_THAT(CountChannelsData(config, logfiles_[8]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700291 UnorderedElementsAre(std::make_tuple(
292 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700293 << " : " << logfiles_[8];
294 EXPECT_THAT(CountChannelsData(config, logfiles_[9]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700295 UnorderedElementsAre(std::make_tuple(
296 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700297 << " : " << logfiles_[9];
Naman Guptaa63aa132023-03-22 20:06:34 -0700298
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700299 // Pong snd timestamp data.
300 EXPECT_THAT(
301 CountChannelsData(config, logfiles_[10]),
302 UnorderedElementsAre(
303 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 9),
304 std::make_tuple("/test", "aos.examples.Pong", 91)))
305 << " : " << logfiles_[10];
306 EXPECT_THAT(
307 CountChannelsData(config, logfiles_[11]),
308 UnorderedElementsAre(
309 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 191),
310 std::make_tuple("/test", "aos.examples.Pong", 1910)))
311 << " : " << logfiles_[11];
Naman Guptaa63aa132023-03-22 20:06:34 -0700312
313 // Timestamps from pi2 on pi1, and the other way.
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700314 // if (shared()) {
315 EXPECT_THAT(CountChannelsData(config, logfiles_[12]),
316 UnorderedElementsAre())
317 << " : " << logfiles_[12];
318 EXPECT_THAT(CountChannelsData(config, logfiles_[13]),
319 UnorderedElementsAre())
320 << " : " << logfiles_[13];
321 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
322 UnorderedElementsAre())
323 << " : " << logfiles_[14];
324 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
325 UnorderedElementsAre())
326 << " : " << logfiles_[15];
327 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
328 UnorderedElementsAre())
329 << " : " << logfiles_[16];
Naman Guptaa63aa132023-03-22 20:06:34 -0700330
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700331 EXPECT_THAT(
332 CountChannelsTimestamp(config, logfiles_[12]),
333 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
334 << " : " << logfiles_[12];
335 EXPECT_THAT(
336 CountChannelsTimestamp(config, logfiles_[13]),
337 UnorderedElementsAre(
338 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 9),
339 std::make_tuple("/test", "aos.examples.Ping", 90)))
340 << " : " << logfiles_[13];
341 EXPECT_THAT(
342 CountChannelsTimestamp(config, logfiles_[14]),
343 UnorderedElementsAre(
344 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 191),
345 std::make_tuple("/test", "aos.examples.Ping", 1910)))
346 << " : " << logfiles_[14];
347 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[15]),
348 UnorderedElementsAre(std::make_tuple(
349 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
350 << " : " << logfiles_[15];
351 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[16]),
352 UnorderedElementsAre(std::make_tuple(
353 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
354 << " : " << logfiles_[16];
Naman Guptaa63aa132023-03-22 20:06:34 -0700355 }
356
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700357 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -0700358
359 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
360 log_reader_factory.set_send_delay(chrono::microseconds(0));
361
362 // This sends out the fetched messages and advances time to the start of the
363 // log file.
364 reader.Register(&log_reader_factory);
365
366 const Node *pi1 =
367 configuration::GetNode(log_reader_factory.configuration(), "pi1");
368 const Node *pi2 =
369 configuration::GetNode(log_reader_factory.configuration(), "pi2");
370
371 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
372 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
373 LOG(INFO) << "now pi1 "
374 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
375 LOG(INFO) << "now pi2 "
376 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
377
378 EXPECT_THAT(reader.LoggedNodes(),
379 ::testing::ElementsAre(
380 configuration::GetNode(reader.logged_configuration(), pi1),
381 configuration::GetNode(reader.logged_configuration(), pi2)));
382
383 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
384
385 std::unique_ptr<EventLoop> pi1_event_loop =
386 log_reader_factory.MakeEventLoop("test", pi1);
387 std::unique_ptr<EventLoop> pi2_event_loop =
388 log_reader_factory.MakeEventLoop("test", pi2);
389
390 int pi1_ping_count = 10;
391 int pi2_ping_count = 10;
392 int pi1_pong_count = 10;
393 int pi2_pong_count = 10;
394
395 // Confirm that the ping value matches.
396 pi1_event_loop->MakeWatcher(
397 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
398 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping) << " at "
399 << pi1_event_loop->context().monotonic_remote_time << " -> "
400 << pi1_event_loop->context().monotonic_event_time;
401 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
402 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
403 pi1_ping_count * chrono::milliseconds(10) +
404 monotonic_clock::epoch());
405 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
406 pi1_ping_count * chrono::milliseconds(10) +
407 realtime_clock::epoch());
408 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
409 pi1_event_loop->context().monotonic_event_time);
410 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
411 pi1_event_loop->context().realtime_event_time);
412
413 ++pi1_ping_count;
414 });
415 pi2_event_loop->MakeWatcher(
416 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
417 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping) << " at "
418 << pi2_event_loop->context().monotonic_remote_time << " -> "
419 << pi2_event_loop->context().monotonic_event_time;
420 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
421
422 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
423 pi2_ping_count * chrono::milliseconds(10) +
424 monotonic_clock::epoch());
425 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
426 pi2_ping_count * chrono::milliseconds(10) +
427 realtime_clock::epoch());
428 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time +
429 chrono::microseconds(150),
430 pi2_event_loop->context().monotonic_event_time);
431 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time +
432 chrono::microseconds(150),
433 pi2_event_loop->context().realtime_event_time);
434 ++pi2_ping_count;
435 });
436
437 constexpr ssize_t kQueueIndexOffset = -9;
438 // Confirm that the ping and pong counts both match, and the value also
439 // matches.
440 pi1_event_loop->MakeWatcher(
441 "/test", [&pi1_event_loop, &pi1_ping_count,
442 &pi1_pong_count](const examples::Pong &pong) {
443 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
444 << pi1_event_loop->context().monotonic_remote_time << " -> "
445 << pi1_event_loop->context().monotonic_event_time;
446
447 EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
448 pi1_pong_count + kQueueIndexOffset);
449 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
450 chrono::microseconds(200) +
451 pi1_pong_count * chrono::milliseconds(10) +
452 monotonic_clock::epoch());
453 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
454 chrono::microseconds(200) +
455 pi1_pong_count * chrono::milliseconds(10) +
456 realtime_clock::epoch());
457
458 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
459 chrono::microseconds(150),
460 pi1_event_loop->context().monotonic_event_time);
461 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
462 chrono::microseconds(150),
463 pi1_event_loop->context().realtime_event_time);
464
465 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
466 ++pi1_pong_count;
467 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
468 });
469 pi2_event_loop->MakeWatcher(
470 "/test", [&pi2_event_loop, &pi2_ping_count,
471 &pi2_pong_count](const examples::Pong &pong) {
472 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
473 << pi2_event_loop->context().monotonic_remote_time << " -> "
474 << pi2_event_loop->context().monotonic_event_time;
475
476 EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
477 pi2_pong_count + kQueueIndexOffset);
478
479 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
480 chrono::microseconds(200) +
481 pi2_pong_count * chrono::milliseconds(10) +
482 monotonic_clock::epoch());
483 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
484 chrono::microseconds(200) +
485 pi2_pong_count * chrono::milliseconds(10) +
486 realtime_clock::epoch());
487
488 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
489 pi2_event_loop->context().monotonic_event_time);
490 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
491 pi2_event_loop->context().realtime_event_time);
492
493 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
494 ++pi2_pong_count;
495 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
496 });
497
498 log_reader_factory.Run();
499 EXPECT_EQ(pi1_ping_count, 2010);
500 EXPECT_EQ(pi2_ping_count, 2010);
501 EXPECT_EQ(pi1_pong_count, 2010);
502 EXPECT_EQ(pi2_pong_count, 2010);
503
504 reader.Deregister();
505}
506
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600507// MultinodeLoggerTest that tests the mutate callback works across multiple
508// nodes with remapping
509TEST_P(MultinodeLoggerTest, MultiNodeRemapMutateCallback) {
510 time_converter_.StartEqual();
511 std::vector<std::string> actual_filenames;
512
513 {
514 LoggerState pi1_logger = MakeLogger(pi1_);
515 LoggerState pi2_logger = MakeLogger(pi2_);
516
517 event_loop_factory_.RunFor(chrono::milliseconds(95));
518
519 StartLogger(&pi1_logger);
520 StartLogger(&pi2_logger);
521
522 event_loop_factory_.RunFor(chrono::milliseconds(20000));
523 pi1_logger.AppendAllFilenames(&actual_filenames);
524 pi2_logger.AppendAllFilenames(&actual_filenames);
525 }
526
527 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700528 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600529
530 LogReader reader(sorted_parts, &config_.message());
531 // Remap just on pi1.
532 reader.RemapLoggedChannel<examples::Pong>(
533 "/test", configuration::GetNode(reader.configuration(), "pi1"));
534
535 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
536
537 int pong_count = 0;
538 // Adds a callback which mutates the value of the pong message before the
539 // message is sent which is the feature we are testing here
540 reader.AddBeforeSendCallback("/test",
541 [&pong_count](aos::examples::Pong *pong) {
542 pong->mutate_value(pong->value() + 1);
543 pong_count = pong->value();
544 });
545
546 // This sends out the fetched messages and advances time to the start of the
547 // log file.
548 reader.Register(&log_reader_factory);
549
550 const Node *pi1 =
551 configuration::GetNode(log_reader_factory.configuration(), "pi1");
552 const Node *pi2 =
553 configuration::GetNode(log_reader_factory.configuration(), "pi2");
554
555 EXPECT_THAT(reader.LoggedNodes(),
556 ::testing::ElementsAre(
557 configuration::GetNode(reader.logged_configuration(), pi1),
558 configuration::GetNode(reader.logged_configuration(), pi2)));
559
560 std::unique_ptr<EventLoop> pi1_event_loop =
561 log_reader_factory.MakeEventLoop("test", pi1);
562 std::unique_ptr<EventLoop> pi2_event_loop =
563 log_reader_factory.MakeEventLoop("test", pi2);
564
565 pi1_event_loop->MakeWatcher("/original/test",
566 [&pong_count](const examples::Pong &pong) {
567 EXPECT_EQ(pong_count, pong.value());
568 });
569
570 pi2_event_loop->MakeWatcher("/test",
571 [&pong_count](const examples::Pong &pong) {
572 EXPECT_EQ(pong_count, pong.value());
573 });
574
575 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
576 reader.Deregister();
577
578 EXPECT_EQ(pong_count, 2011);
579}
580
581// MultinodeLoggerTest that tests the mutate callback works across multiple
582// nodes
583TEST_P(MultinodeLoggerTest, MultiNodeMutateCallback) {
584 time_converter_.StartEqual();
585 std::vector<std::string> actual_filenames;
586
587 {
588 LoggerState pi1_logger = MakeLogger(pi1_);
589 LoggerState pi2_logger = MakeLogger(pi2_);
590
591 event_loop_factory_.RunFor(chrono::milliseconds(95));
592
593 StartLogger(&pi1_logger);
594 StartLogger(&pi2_logger);
595
596 event_loop_factory_.RunFor(chrono::milliseconds(20000));
597 pi1_logger.AppendAllFilenames(&actual_filenames);
598 pi2_logger.AppendAllFilenames(&actual_filenames);
599 }
600
601 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700602 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600603
604 LogReader reader(sorted_parts, &config_.message());
605
606 int pong_count = 0;
607 // Adds a callback which mutates the value of the pong message before the
608 // message is sent which is the feature we are testing here
609 reader.AddBeforeSendCallback("/test",
610 [&pong_count](aos::examples::Pong *pong) {
611 pong->mutate_value(pong->value() + 1);
612 pong_count = pong->value();
613 });
614
615 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
616
617 // This sends out the fetched messages and advances time to the start of the
618 // log file.
619 reader.Register(&log_reader_factory);
620
621 const Node *pi1 =
622 configuration::GetNode(log_reader_factory.configuration(), "pi1");
623 const Node *pi2 =
624 configuration::GetNode(log_reader_factory.configuration(), "pi2");
625
626 EXPECT_THAT(reader.LoggedNodes(),
627 ::testing::ElementsAre(
628 configuration::GetNode(reader.logged_configuration(), pi1),
629 configuration::GetNode(reader.logged_configuration(), pi2)));
630
631 std::unique_ptr<EventLoop> pi1_event_loop =
632 log_reader_factory.MakeEventLoop("test", pi1);
633 std::unique_ptr<EventLoop> pi2_event_loop =
634 log_reader_factory.MakeEventLoop("test", pi2);
635
636 pi1_event_loop->MakeWatcher("/test",
637 [&pong_count](const examples::Pong &pong) {
638 EXPECT_EQ(pong_count, pong.value());
639 });
640
641 pi2_event_loop->MakeWatcher("/test",
642 [&pong_count](const examples::Pong &pong) {
643 EXPECT_EQ(pong_count, pong.value());
644 });
645
646 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
647 reader.Deregister();
648
649 EXPECT_EQ(pong_count, 2011);
650}
651
652// Tests that the before send callback is only called from the sender node if it
653// is forwarded
654TEST_P(MultinodeLoggerTest, OnlyDoBeforeSendCallbackOnSenderNode) {
655 time_converter_.StartEqual();
656 {
657 LoggerState pi1_logger = MakeLogger(pi1_);
658 LoggerState pi2_logger = MakeLogger(pi2_);
659
660 event_loop_factory_.RunFor(chrono::milliseconds(95));
661
662 StartLogger(&pi1_logger);
663 StartLogger(&pi2_logger);
664
665 event_loop_factory_.RunFor(chrono::milliseconds(20000));
666 }
667
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700668 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
669 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
670 LogReader reader(sorted_parts);
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600671
672 int ping_count = 0;
673 // Adds a callback which mutates the value of the pong message before the
674 // message is sent which is the feature we are testing here
675 reader.AddBeforeSendCallback("/test",
676 [&ping_count](aos::examples::Ping *ping) {
677 ++ping_count;
678 ping->mutate_value(ping_count);
679 });
680
681 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
682 log_reader_factory.set_send_delay(chrono::microseconds(0));
683
684 reader.Register(&log_reader_factory);
685
686 const Node *pi1 =
687 configuration::GetNode(log_reader_factory.configuration(), "pi1");
688 const Node *pi2 =
689 configuration::GetNode(log_reader_factory.configuration(), "pi2");
690
691 std::unique_ptr<EventLoop> pi1_event_loop =
692 log_reader_factory.MakeEventLoop("test", pi1);
693 pi1_event_loop->SkipTimingReport();
694 std::unique_ptr<EventLoop> pi2_event_loop =
695 log_reader_factory.MakeEventLoop("test", pi2);
696 pi2_event_loop->SkipTimingReport();
697
698 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
699 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
700
701 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
702 pi1_ping_timestamp;
703 if (!shared()) {
704 pi1_ping_timestamp =
705 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
706 pi1_event_loop.get(),
707 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
708 }
709
710 log_reader_factory.Run();
711
712 EXPECT_EQ(pi1_ping.count(), 2000u);
713 EXPECT_EQ(pi2_ping.count(), 2000u);
714 // If the BeforeSendCallback is called on both nodes, then the ping count
715 // would be 4002 instead of 2001
716 EXPECT_EQ(ping_count, 2001u);
717 if (!shared()) {
718 EXPECT_EQ(pi1_ping_timestamp->count(), 2000u);
719 }
720
721 reader.Deregister();
722}
723
724// Tests that we do not allow adding callbacks after Register is called
725TEST_P(MultinodeLoggerDeathTest, AddCallbackAfterRegister) {
726 time_converter_.StartEqual();
727 std::vector<std::string> actual_filenames;
728
729 {
730 LoggerState pi1_logger = MakeLogger(pi1_);
731 LoggerState pi2_logger = MakeLogger(pi2_);
732
733 event_loop_factory_.RunFor(chrono::milliseconds(95));
734
735 StartLogger(&pi1_logger);
736 StartLogger(&pi2_logger);
737
738 event_loop_factory_.RunFor(chrono::milliseconds(20000));
739 pi1_logger.AppendAllFilenames(&actual_filenames);
740 pi2_logger.AppendAllFilenames(&actual_filenames);
741 }
742
743 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700744 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600745
746 LogReader reader(sorted_parts, &config_.message());
747 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
748 reader.Register(&log_reader_factory);
749 EXPECT_DEATH(
750 {
751 reader.AddBeforeSendCallback("/test", [](aos::examples::Pong *) {
752 LOG(FATAL) << "This should not be called";
753 });
754 },
755 "Cannot add callbacks after calling Register");
756 reader.Deregister();
757}
758
Naman Guptaa63aa132023-03-22 20:06:34 -0700759// Test that if we feed the replay with a mismatched node list that we die on
760// the LogReader constructor.
761TEST_P(MultinodeLoggerDeathTest, MultiNodeBadReplayConfig) {
762 time_converter_.StartEqual();
763 {
764 LoggerState pi1_logger = MakeLogger(pi1_);
765 LoggerState pi2_logger = MakeLogger(pi2_);
766
767 event_loop_factory_.RunFor(chrono::milliseconds(95));
768
769 StartLogger(&pi1_logger);
770 StartLogger(&pi2_logger);
771
772 event_loop_factory_.RunFor(chrono::milliseconds(20000));
773 }
774
775 // Test that, if we add an additional node to the replay config that the
776 // logger complains about the mismatch in number of nodes.
777 FlatbufferDetachedBuffer<Configuration> extra_nodes_config =
778 configuration::MergeWithConfig(&config_.message(), R"({
779 "nodes": [
780 {
781 "name": "extra-node"
782 }
783 ]
784 }
785 )");
786
787 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700788 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -0700789 EXPECT_DEATH(LogReader(sorted_parts, &extra_nodes_config.message()),
790 "Log file and replay config need to have matching nodes lists.");
791}
792
793// Tests that we can read log files where they don't start at the same monotonic
794// time.
795TEST_P(MultinodeLoggerTest, StaggeredStart) {
796 time_converter_.StartEqual();
797 std::vector<std::string> actual_filenames;
798
799 {
800 LoggerState pi1_logger = MakeLogger(pi1_);
801 LoggerState pi2_logger = MakeLogger(pi2_);
802
803 event_loop_factory_.RunFor(chrono::milliseconds(95));
804
805 StartLogger(&pi1_logger);
806
807 event_loop_factory_.RunFor(chrono::milliseconds(200));
808
809 StartLogger(&pi2_logger);
810
811 event_loop_factory_.RunFor(chrono::milliseconds(20000));
812 pi1_logger.AppendAllFilenames(&actual_filenames);
813 pi2_logger.AppendAllFilenames(&actual_filenames);
814 }
815
816 // Since we delay starting pi2, it already knows about all the timestamps so
817 // we don't end up with extra parts.
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700818 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
819 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
820 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -0700821
822 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
823 log_reader_factory.set_send_delay(chrono::microseconds(0));
824
825 // This sends out the fetched messages and advances time to the start of the
826 // log file.
827 reader.Register(&log_reader_factory);
828
829 const Node *pi1 =
830 configuration::GetNode(log_reader_factory.configuration(), "pi1");
831 const Node *pi2 =
832 configuration::GetNode(log_reader_factory.configuration(), "pi2");
833
834 EXPECT_THAT(reader.LoggedNodes(),
835 ::testing::ElementsAre(
836 configuration::GetNode(reader.logged_configuration(), pi1),
837 configuration::GetNode(reader.logged_configuration(), pi2)));
838
839 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
840
841 std::unique_ptr<EventLoop> pi1_event_loop =
842 log_reader_factory.MakeEventLoop("test", pi1);
843 std::unique_ptr<EventLoop> pi2_event_loop =
844 log_reader_factory.MakeEventLoop("test", pi2);
845
846 int pi1_ping_count = 30;
847 int pi2_ping_count = 30;
848 int pi1_pong_count = 30;
849 int pi2_pong_count = 30;
850
851 // Confirm that the ping value matches.
852 pi1_event_loop->MakeWatcher(
853 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
854 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
855 << pi1_event_loop->context().monotonic_remote_time << " -> "
856 << pi1_event_loop->context().monotonic_event_time;
857 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
858
859 ++pi1_ping_count;
860 });
861 pi2_event_loop->MakeWatcher(
862 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
863 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
864 << pi2_event_loop->context().monotonic_remote_time << " -> "
865 << pi2_event_loop->context().monotonic_event_time;
866 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
867
868 ++pi2_ping_count;
869 });
870
871 // Confirm that the ping and pong counts both match, and the value also
872 // matches.
873 pi1_event_loop->MakeWatcher(
874 "/test", [&pi1_event_loop, &pi1_ping_count,
875 &pi1_pong_count](const examples::Pong &pong) {
876 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
877 << pi1_event_loop->context().monotonic_remote_time << " -> "
878 << pi1_event_loop->context().monotonic_event_time;
879
880 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
881 ++pi1_pong_count;
882 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
883 });
884 pi2_event_loop->MakeWatcher(
885 "/test", [&pi2_event_loop, &pi2_ping_count,
886 &pi2_pong_count](const examples::Pong &pong) {
887 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
888 << pi2_event_loop->context().monotonic_remote_time << " -> "
889 << pi2_event_loop->context().monotonic_event_time;
890
891 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
892 ++pi2_pong_count;
893 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
894 });
895
896 log_reader_factory.Run();
897 EXPECT_EQ(pi1_ping_count, 2030);
898 EXPECT_EQ(pi2_ping_count, 2030);
899 EXPECT_EQ(pi1_pong_count, 2030);
900 EXPECT_EQ(pi2_pong_count, 2030);
901
902 reader.Deregister();
903}
904
905// Tests that we can read log files where the monotonic clocks drift and don't
906// match correctly. While we are here, also test that different ending times
907// also is readable.
908TEST_P(MultinodeLoggerTest, MismatchedClocks) {
909 // TODO(austin): Negate...
910 const chrono::nanoseconds initial_pi2_offset = chrono::seconds(1000);
911
912 time_converter_.AddMonotonic(
913 {BootTimestamp::epoch(), BootTimestamp::epoch() + initial_pi2_offset});
914 // Wait for 95 ms, (~0.1 seconds - 1/2 of the ping/pong period), and set the
915 // skew to be 200 uS/s
916 const chrono::nanoseconds startup_sleep1 = time_converter_.AddMonotonic(
917 {chrono::milliseconds(95),
918 chrono::milliseconds(95) - chrono::nanoseconds(200) * 95});
919 // Run another 200 ms to have one logger start first.
920 const chrono::nanoseconds startup_sleep2 = time_converter_.AddMonotonic(
921 {chrono::milliseconds(200), chrono::milliseconds(200)});
922 // Slew one way then the other at the same 200 uS/S slew rate. Make sure we
923 // go far enough to cause problems if this isn't accounted for.
924 const chrono::nanoseconds logger_run1 = time_converter_.AddMonotonic(
925 {chrono::milliseconds(20000),
926 chrono::milliseconds(20000) - chrono::nanoseconds(200) * 20000});
927 const chrono::nanoseconds logger_run2 = time_converter_.AddMonotonic(
928 {chrono::milliseconds(40000),
929 chrono::milliseconds(40000) + chrono::nanoseconds(200) * 40000});
930 const chrono::nanoseconds logger_run3 = time_converter_.AddMonotonic(
931 {chrono::milliseconds(400), chrono::milliseconds(400)});
932
933 {
934 LoggerState pi2_logger = MakeLogger(pi2_);
935
936 LOG(INFO) << "pi2 times: " << pi2_->monotonic_now() << " "
937 << pi2_->realtime_now() << " distributed "
938 << pi2_->ToDistributedClock(pi2_->monotonic_now());
939
940 LOG(INFO) << "pi2_ times: " << pi2_->monotonic_now() << " "
941 << pi2_->realtime_now() << " distributed "
942 << pi2_->ToDistributedClock(pi2_->monotonic_now());
943
944 event_loop_factory_.RunFor(startup_sleep1);
945
946 StartLogger(&pi2_logger);
947
948 event_loop_factory_.RunFor(startup_sleep2);
949
950 {
951 // Run pi1's logger for only part of the time.
952 LoggerState pi1_logger = MakeLogger(pi1_);
953
954 StartLogger(&pi1_logger);
955 event_loop_factory_.RunFor(logger_run1);
956
957 // Make sure we slewed time far enough so that the difference is greater
958 // than the network delay. This confirms that if we sort incorrectly, it
959 // would show in the results.
960 EXPECT_LT(
961 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
962 -event_loop_factory_.send_delay() -
963 event_loop_factory_.network_delay());
964
965 event_loop_factory_.RunFor(logger_run2);
966
967 // And now check that we went far enough the other way to make sure we
968 // cover both problems.
969 EXPECT_GT(
970 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
971 event_loop_factory_.send_delay() +
972 event_loop_factory_.network_delay());
973 }
974
975 // And log a bit more on pi2.
976 event_loop_factory_.RunFor(logger_run3);
977 }
978
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700979 const std::vector<LogFile> sorted_parts =
980 SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 3, 3));
981 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
982 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -0700983
984 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
985 log_reader_factory.set_send_delay(chrono::microseconds(0));
986
987 const Node *pi1 =
988 configuration::GetNode(log_reader_factory.configuration(), "pi1");
989 const Node *pi2 =
990 configuration::GetNode(log_reader_factory.configuration(), "pi2");
991
992 // This sends out the fetched messages and advances time to the start of the
993 // log file.
994 reader.Register(&log_reader_factory);
995
996 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
997 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
998 LOG(INFO) << "now pi1 "
999 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1000 LOG(INFO) << "now pi2 "
1001 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1002
1003 LOG(INFO) << "Done registering (pi1) "
1004 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now()
1005 << " "
1006 << log_reader_factory.GetNodeEventLoopFactory(pi1)->realtime_now();
1007 LOG(INFO) << "Done registering (pi2) "
1008 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now()
1009 << " "
1010 << log_reader_factory.GetNodeEventLoopFactory(pi2)->realtime_now();
1011
1012 EXPECT_THAT(reader.LoggedNodes(),
1013 ::testing::ElementsAre(
1014 configuration::GetNode(reader.logged_configuration(), pi1),
1015 configuration::GetNode(reader.logged_configuration(), pi2)));
1016
1017 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1018
1019 std::unique_ptr<EventLoop> pi1_event_loop =
1020 log_reader_factory.MakeEventLoop("test", pi1);
1021 std::unique_ptr<EventLoop> pi2_event_loop =
1022 log_reader_factory.MakeEventLoop("test", pi2);
1023
1024 int pi1_ping_count = 30;
1025 int pi2_ping_count = 30;
1026 int pi1_pong_count = 30;
1027 int pi2_pong_count = 30;
1028
1029 // Confirm that the ping value matches.
1030 pi1_event_loop->MakeWatcher(
1031 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
1032 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
1033 << pi1_event_loop->context().monotonic_remote_time << " -> "
1034 << pi1_event_loop->context().monotonic_event_time;
1035 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
1036
1037 ++pi1_ping_count;
1038 });
1039 pi2_event_loop->MakeWatcher(
1040 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
1041 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
1042 << pi2_event_loop->context().monotonic_remote_time << " -> "
1043 << pi2_event_loop->context().monotonic_event_time;
1044 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
1045
1046 ++pi2_ping_count;
1047 });
1048
1049 // Confirm that the ping and pong counts both match, and the value also
1050 // matches.
1051 pi1_event_loop->MakeWatcher(
1052 "/test", [&pi1_event_loop, &pi1_ping_count,
1053 &pi1_pong_count](const examples::Pong &pong) {
1054 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
1055 << pi1_event_loop->context().monotonic_remote_time << " -> "
1056 << pi1_event_loop->context().monotonic_event_time;
1057
1058 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
1059 ++pi1_pong_count;
1060 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
1061 });
1062 pi2_event_loop->MakeWatcher(
1063 "/test", [&pi2_event_loop, &pi2_ping_count,
1064 &pi2_pong_count](const examples::Pong &pong) {
1065 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
1066 << pi2_event_loop->context().monotonic_remote_time << " -> "
1067 << pi2_event_loop->context().monotonic_event_time;
1068
1069 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
1070 ++pi2_pong_count;
1071 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
1072 });
1073
1074 log_reader_factory.Run();
1075 EXPECT_EQ(pi1_ping_count, 6030);
1076 EXPECT_EQ(pi2_ping_count, 6030);
1077 EXPECT_EQ(pi1_pong_count, 6030);
1078 EXPECT_EQ(pi2_pong_count, 6030);
1079
1080 reader.Deregister();
1081}
1082
1083// Tests that we can sort a bunch of parts into the pre-determined sorted parts.
1084TEST_P(MultinodeLoggerTest, SortParts) {
1085 time_converter_.StartEqual();
1086 // Make a bunch of parts.
1087 {
1088 LoggerState pi1_logger = MakeLogger(pi1_);
1089 LoggerState pi2_logger = MakeLogger(pi2_);
1090
1091 event_loop_factory_.RunFor(chrono::milliseconds(95));
1092
1093 StartLogger(&pi1_logger);
1094 StartLogger(&pi2_logger);
1095
1096 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1097 }
1098
1099 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1100 VerifyParts(sorted_parts);
1101}
1102
1103// Tests that we can sort a bunch of parts with an empty part. We should ignore
1104// it and remove it from the sorted list.
1105TEST_P(MultinodeLoggerTest, SortEmptyParts) {
1106 time_converter_.StartEqual();
1107 // Make a bunch of parts.
1108 {
1109 LoggerState pi1_logger = MakeLogger(pi1_);
1110 LoggerState pi2_logger = MakeLogger(pi2_);
1111
1112 event_loop_factory_.RunFor(chrono::milliseconds(95));
1113
1114 StartLogger(&pi1_logger);
1115 StartLogger(&pi2_logger);
1116
1117 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1118 }
1119
1120 // TODO(austin): Should we flip out if the file can't open?
1121 const std::string kEmptyFile("foobarinvalidfiledoesnotexist" + Extension());
1122
1123 aos::util::WriteStringToFileOrDie(kEmptyFile, "");
1124 logfiles_.emplace_back(kEmptyFile);
1125
1126 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1127 VerifyParts(sorted_parts, {kEmptyFile});
1128}
1129
1130// Tests that we can sort a bunch of parts with the end missing off a
1131// file. We should use the part we can read.
1132TEST_P(MultinodeLoggerTest, SortTruncatedParts) {
1133 std::vector<std::string> actual_filenames;
1134 time_converter_.StartEqual();
1135 // Make a bunch of parts.
1136 {
1137 LoggerState pi1_logger = MakeLogger(pi1_);
1138 LoggerState pi2_logger = MakeLogger(pi2_);
1139
1140 event_loop_factory_.RunFor(chrono::milliseconds(95));
1141
1142 StartLogger(&pi1_logger);
1143 StartLogger(&pi2_logger);
1144
1145 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1146
1147 pi1_logger.AppendAllFilenames(&actual_filenames);
1148 pi2_logger.AppendAllFilenames(&actual_filenames);
1149 }
1150
1151 ASSERT_THAT(actual_filenames,
1152 ::testing::UnorderedElementsAreArray(logfiles_));
1153
1154 // Strip off the end of one of the files. Pick one with a lot of data.
1155 // For snappy, needs to have enough data to be >1 chunk of compressed data so
1156 // that we don't corrupt the entire log part.
1157 ::std::string compressed_contents =
1158 aos::util::ReadFileToStringOrDie(logfiles_[4]);
1159
1160 aos::util::WriteStringToFileOrDie(
1161 logfiles_[4],
1162 compressed_contents.substr(0, compressed_contents.size() - 100));
1163
1164 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1165 VerifyParts(sorted_parts);
1166}
1167
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001168// Tests that if we remap a logged channel, it shows up correctly.
Naman Guptaa63aa132023-03-22 20:06:34 -07001169TEST_P(MultinodeLoggerTest, RemapLoggedChannel) {
1170 time_converter_.StartEqual();
1171 {
1172 LoggerState pi1_logger = MakeLogger(pi1_);
1173 LoggerState pi2_logger = MakeLogger(pi2_);
1174
1175 event_loop_factory_.RunFor(chrono::milliseconds(95));
1176
1177 StartLogger(&pi1_logger);
1178 StartLogger(&pi2_logger);
1179
1180 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1181 }
1182
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001183 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1184 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1185 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001186
1187 // Remap just on pi1.
1188 reader.RemapLoggedChannel<aos::timing::Report>(
1189 "/aos", configuration::GetNode(reader.configuration(), "pi1"));
1190
1191 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1192 log_reader_factory.set_send_delay(chrono::microseconds(0));
1193
1194 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1195 // Note: An extra channel gets remapped automatically due to a timestamp
1196 // channel being LOCAL_LOGGER'd.
1197 ASSERT_EQ(remapped_channels.size(), std::get<0>(GetParam()).shared ? 1u : 2u);
1198 EXPECT_EQ(remapped_channels[0]->name()->string_view(), "/original/pi1/aos");
1199 EXPECT_EQ(remapped_channels[0]->type()->string_view(), "aos.timing.Report");
1200 if (!std::get<0>(GetParam()).shared) {
1201 EXPECT_EQ(remapped_channels[1]->name()->string_view(),
1202 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1203 "aos-message_bridge-Timestamp");
1204 EXPECT_EQ(remapped_channels[1]->type()->string_view(),
1205 "aos.message_bridge.RemoteMessage");
1206 }
1207
1208 reader.Register(&log_reader_factory);
1209
1210 const Node *pi1 =
1211 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1212 const Node *pi2 =
1213 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1214
1215 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1216 // else should have moved.
1217 std::unique_ptr<EventLoop> pi1_event_loop =
1218 log_reader_factory.MakeEventLoop("test", pi1);
1219 pi1_event_loop->SkipTimingReport();
1220 std::unique_ptr<EventLoop> full_pi1_event_loop =
1221 log_reader_factory.MakeEventLoop("test", pi1);
1222 full_pi1_event_loop->SkipTimingReport();
1223 std::unique_ptr<EventLoop> pi2_event_loop =
1224 log_reader_factory.MakeEventLoop("test", pi2);
1225 pi2_event_loop->SkipTimingReport();
1226
1227 MessageCounter<aos::timing::Report> pi1_timing_report(pi1_event_loop.get(),
1228 "/aos");
1229 MessageCounter<aos::timing::Report> full_pi1_timing_report(
1230 full_pi1_event_loop.get(), "/pi1/aos");
1231 MessageCounter<aos::timing::Report> pi1_original_timing_report(
1232 pi1_event_loop.get(), "/original/aos");
1233 MessageCounter<aos::timing::Report> full_pi1_original_timing_report(
1234 full_pi1_event_loop.get(), "/original/pi1/aos");
1235 MessageCounter<aos::timing::Report> pi2_timing_report(pi2_event_loop.get(),
1236 "/aos");
1237
1238 log_reader_factory.Run();
1239
1240 EXPECT_EQ(pi1_timing_report.count(), 0u);
1241 EXPECT_EQ(full_pi1_timing_report.count(), 0u);
1242 EXPECT_NE(pi1_original_timing_report.count(), 0u);
1243 EXPECT_NE(full_pi1_original_timing_report.count(), 0u);
1244 EXPECT_NE(pi2_timing_report.count(), 0u);
1245
1246 reader.Deregister();
1247}
1248
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001249// Tests that if we rename a logged channel, it shows up correctly.
1250TEST_P(MultinodeLoggerTest, RenameLoggedChannel) {
1251 std::vector<std::string> actual_filenames;
1252 time_converter_.StartEqual();
1253 {
1254 LoggerState pi1_logger = MakeLogger(pi1_);
1255 LoggerState pi2_logger = MakeLogger(pi2_);
1256
1257 event_loop_factory_.RunFor(chrono::milliseconds(95));
1258
1259 StartLogger(&pi1_logger);
1260 StartLogger(&pi2_logger);
1261
1262 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1263
1264 pi1_logger.AppendAllFilenames(&actual_filenames);
1265 pi2_logger.AppendAllFilenames(&actual_filenames);
1266 }
1267
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001268 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1269 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1270 LogReader reader(sorted_parts);
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001271
1272 // Rename just on pi2. Add some global maps just to verify they get added in
1273 // the config and used correctly.
1274 std::vector<MapT> maps;
1275 {
1276 MapT map;
1277 map.match = std::make_unique<ChannelT>();
1278 map.match->name = "/foo*";
1279 map.match->source_node = "pi1";
1280 map.rename = std::make_unique<ChannelT>();
1281 map.rename->name = "/pi1/foo";
1282 maps.emplace_back(std::move(map));
1283 }
1284 {
1285 MapT map;
1286 map.match = std::make_unique<ChannelT>();
1287 map.match->name = "/foo*";
1288 map.match->source_node = "pi2";
1289 map.rename = std::make_unique<ChannelT>();
1290 map.rename->name = "/pi2/foo";
1291 maps.emplace_back(std::move(map));
1292 }
1293 {
1294 MapT map;
1295 map.match = std::make_unique<ChannelT>();
1296 map.match->name = "/foo";
1297 map.match->type = "aos.examples.Ping";
1298 map.rename = std::make_unique<ChannelT>();
1299 map.rename->name = "/foo/renamed";
1300 maps.emplace_back(std::move(map));
1301 }
1302 reader.RenameLoggedChannel<aos::examples::Ping>(
1303 "/aos", configuration::GetNode(reader.configuration(), "pi2"),
1304 "/pi2/foo/renamed", maps);
1305
1306 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1307 log_reader_factory.set_send_delay(chrono::microseconds(0));
1308
1309 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1310 // Note: An extra channel gets remapped automatically due to a timestamp
1311 // channel being LOCAL_LOGGER'd.
1312 const bool shared = std::get<0>(GetParam()).shared;
1313 ASSERT_EQ(remapped_channels.size(), shared ? 1u : 2u);
1314 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->name()->string_view(),
1315 "/pi2/foo/renamed");
1316 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->type()->string_view(),
1317 "aos.examples.Ping");
1318 if (!shared) {
1319 EXPECT_EQ(remapped_channels[0]->name()->string_view(),
1320 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1321 "aos-message_bridge-Timestamp");
1322 EXPECT_EQ(remapped_channels[0]->type()->string_view(),
1323 "aos.message_bridge.RemoteMessage");
1324 }
1325
1326 reader.Register(&log_reader_factory);
1327
1328 const Node *pi1 =
1329 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1330 const Node *pi2 =
1331 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1332
1333 // Confirm we can read the data on the renamed channel, just for pi2. Nothing
1334 // else should have moved.
1335 std::unique_ptr<EventLoop> pi2_event_loop =
1336 log_reader_factory.MakeEventLoop("test", pi2);
1337 pi2_event_loop->SkipTimingReport();
1338 std::unique_ptr<EventLoop> full_pi2_event_loop =
1339 log_reader_factory.MakeEventLoop("test", pi2);
1340 full_pi2_event_loop->SkipTimingReport();
1341 std::unique_ptr<EventLoop> pi1_event_loop =
1342 log_reader_factory.MakeEventLoop("test", pi1);
1343 pi1_event_loop->SkipTimingReport();
1344
1345 MessageCounter<aos::examples::Ping> pi2_ping(pi2_event_loop.get(), "/aos");
1346 MessageCounter<aos::examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1347 "/foo");
1348 MessageCounter<aos::examples::Ping> full_pi2_renamed_ping(
1349 full_pi2_event_loop.get(), "/pi2/foo/renamed");
1350 MessageCounter<aos::examples::Ping> pi1_ping(pi1_event_loop.get(), "/aos");
1351
1352 log_reader_factory.Run();
1353
1354 EXPECT_EQ(pi2_ping.count(), 0u);
1355 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1356 EXPECT_NE(full_pi2_renamed_ping.count(), 0u);
1357 EXPECT_NE(pi1_ping.count(), 0u);
1358
1359 reader.Deregister();
1360}
1361
Naman Guptaa63aa132023-03-22 20:06:34 -07001362// Tests that we can remap a forwarded channel as well.
1363TEST_P(MultinodeLoggerTest, RemapForwardedLoggedChannel) {
1364 time_converter_.StartEqual();
1365 {
1366 LoggerState pi1_logger = MakeLogger(pi1_);
1367 LoggerState pi2_logger = MakeLogger(pi2_);
1368
1369 event_loop_factory_.RunFor(chrono::milliseconds(95));
1370
1371 StartLogger(&pi1_logger);
1372 StartLogger(&pi2_logger);
1373
1374 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1375 }
1376
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001377 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1378 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1379 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001380
1381 reader.RemapLoggedChannel<examples::Ping>("/test");
1382
1383 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1384 log_reader_factory.set_send_delay(chrono::microseconds(0));
1385
1386 reader.Register(&log_reader_factory);
1387
1388 const Node *pi1 =
1389 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1390 const Node *pi2 =
1391 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1392
1393 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1394 // else should have moved.
1395 std::unique_ptr<EventLoop> pi1_event_loop =
1396 log_reader_factory.MakeEventLoop("test", pi1);
1397 pi1_event_loop->SkipTimingReport();
1398 std::unique_ptr<EventLoop> full_pi1_event_loop =
1399 log_reader_factory.MakeEventLoop("test", pi1);
1400 full_pi1_event_loop->SkipTimingReport();
1401 std::unique_ptr<EventLoop> pi2_event_loop =
1402 log_reader_factory.MakeEventLoop("test", pi2);
1403 pi2_event_loop->SkipTimingReport();
1404
1405 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1406 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1407 MessageCounter<examples::Ping> pi1_original_ping(pi1_event_loop.get(),
1408 "/original/test");
1409 MessageCounter<examples::Ping> pi2_original_ping(pi2_event_loop.get(),
1410 "/original/test");
1411
1412 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1413 pi1_original_ping_timestamp;
1414 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1415 pi1_ping_timestamp;
1416 if (!shared()) {
1417 pi1_original_ping_timestamp =
1418 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1419 pi1_event_loop.get(),
1420 "/pi1/aos/remote_timestamps/pi2/original/test/aos-examples-Ping");
1421 pi1_ping_timestamp =
1422 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1423 pi1_event_loop.get(),
1424 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1425 }
1426
1427 log_reader_factory.Run();
1428
1429 EXPECT_EQ(pi1_ping.count(), 0u);
1430 EXPECT_EQ(pi2_ping.count(), 0u);
1431 EXPECT_NE(pi1_original_ping.count(), 0u);
1432 EXPECT_NE(pi2_original_ping.count(), 0u);
1433 if (!shared()) {
1434 EXPECT_NE(pi1_original_ping_timestamp->count(), 0u);
1435 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1436 }
1437
1438 reader.Deregister();
1439}
1440
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001441// Tests that we can rename a forwarded channel as well.
1442TEST_P(MultinodeLoggerTest, RenameForwardedLoggedChannel) {
1443 std::vector<std::string> actual_filenames;
1444 time_converter_.StartEqual();
1445 {
1446 LoggerState pi1_logger = MakeLogger(pi1_);
1447 LoggerState pi2_logger = MakeLogger(pi2_);
1448
1449 event_loop_factory_.RunFor(chrono::milliseconds(95));
1450
1451 StartLogger(&pi1_logger);
1452 StartLogger(&pi2_logger);
1453
1454 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1455
1456 pi1_logger.AppendAllFilenames(&actual_filenames);
1457 pi2_logger.AppendAllFilenames(&actual_filenames);
1458 }
1459
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001460 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1461 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1462 LogReader reader(sorted_parts);
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001463
1464 std::vector<MapT> maps;
1465 {
1466 MapT map;
1467 map.match = std::make_unique<ChannelT>();
1468 map.match->name = "/production*";
1469 map.match->source_node = "pi1";
1470 map.rename = std::make_unique<ChannelT>();
1471 map.rename->name = "/pi1/production";
1472 maps.emplace_back(std::move(map));
1473 }
1474 {
1475 MapT map;
1476 map.match = std::make_unique<ChannelT>();
1477 map.match->name = "/production*";
1478 map.match->source_node = "pi2";
1479 map.rename = std::make_unique<ChannelT>();
1480 map.rename->name = "/pi2/production";
1481 maps.emplace_back(std::move(map));
1482 }
1483 reader.RenameLoggedChannel<aos::examples::Ping>(
1484 "/test", configuration::GetNode(reader.configuration(), "pi1"),
1485 "/pi1/production", maps);
1486
1487 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1488 log_reader_factory.set_send_delay(chrono::microseconds(0));
1489
1490 reader.Register(&log_reader_factory);
1491
1492 const Node *pi1 =
1493 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1494 const Node *pi2 =
1495 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1496
1497 // Confirm we can read the data on the renamed channel, on both the source
1498 // node and the remote node. In case of split timestamp channels, confirm that
1499 // we receive the timestamp messages on the renamed channel as well.
1500 std::unique_ptr<EventLoop> pi1_event_loop =
1501 log_reader_factory.MakeEventLoop("test", pi1);
1502 pi1_event_loop->SkipTimingReport();
1503 std::unique_ptr<EventLoop> full_pi1_event_loop =
1504 log_reader_factory.MakeEventLoop("test", pi1);
1505 full_pi1_event_loop->SkipTimingReport();
1506 std::unique_ptr<EventLoop> pi2_event_loop =
1507 log_reader_factory.MakeEventLoop("test", pi2);
1508 pi2_event_loop->SkipTimingReport();
1509
1510 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1511 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1512 MessageCounter<examples::Ping> pi1_renamed_ping(pi1_event_loop.get(),
1513 "/pi1/production");
1514 MessageCounter<examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1515 "/pi1/production");
1516
1517 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1518 pi1_renamed_ping_timestamp;
1519 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1520 pi1_ping_timestamp;
1521 if (!shared()) {
1522 pi1_renamed_ping_timestamp =
1523 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1524 pi1_event_loop.get(),
1525 "/pi1/aos/remote_timestamps/pi2/pi1/production/aos-examples-Ping");
1526 pi1_ping_timestamp =
1527 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1528 pi1_event_loop.get(),
1529 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1530 }
1531
1532 log_reader_factory.Run();
1533
1534 EXPECT_EQ(pi1_ping.count(), 0u);
1535 EXPECT_EQ(pi2_ping.count(), 0u);
1536 EXPECT_NE(pi1_renamed_ping.count(), 0u);
1537 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1538 if (!shared()) {
1539 EXPECT_NE(pi1_renamed_ping_timestamp->count(), 0u);
1540 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1541 }
1542
1543 reader.Deregister();
1544}
1545
Naman Guptaa63aa132023-03-22 20:06:34 -07001546// Tests that we observe all the same events in log replay (for a given node)
1547// whether we just register an event loop for that node or if we register a full
1548// event loop factory.
1549TEST_P(MultinodeLoggerTest, SingleNodeReplay) {
1550 time_converter_.StartEqual();
1551 constexpr chrono::milliseconds kStartupDelay(95);
1552 {
1553 LoggerState pi1_logger = MakeLogger(pi1_);
1554 LoggerState pi2_logger = MakeLogger(pi2_);
1555
1556 event_loop_factory_.RunFor(kStartupDelay);
1557
1558 StartLogger(&pi1_logger);
1559 StartLogger(&pi2_logger);
1560
1561 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1562 }
1563
1564 LogReader full_reader(SortParts(logfiles_));
1565 LogReader single_node_reader(SortParts(logfiles_));
1566
1567 SimulatedEventLoopFactory full_factory(full_reader.configuration());
1568 SimulatedEventLoopFactory single_node_factory(
1569 single_node_reader.configuration());
1570 single_node_factory.SkipTimingReport();
1571 single_node_factory.DisableStatistics();
1572 std::unique_ptr<EventLoop> replay_event_loop =
1573 single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
1574 "log_reader");
1575
1576 full_reader.Register(&full_factory);
1577 single_node_reader.Register(replay_event_loop.get());
1578
1579 const Node *full_pi1 =
1580 configuration::GetNode(full_factory.configuration(), "pi1");
1581
1582 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1583 // else should have moved.
1584 std::unique_ptr<EventLoop> full_event_loop =
1585 full_factory.MakeEventLoop("test", full_pi1);
1586 full_event_loop->SkipTimingReport();
1587 full_event_loop->SkipAosLog();
1588 // maps are indexed on channel index.
1589 // observed_messages: {channel_index: [(message_sent_time, was_fetched),...]}
1590 std::map<size_t, std::vector<std::pair<monotonic_clock::time_point, bool>>>
1591 observed_messages;
1592 std::map<size_t, std::unique_ptr<RawFetcher>> fetchers;
1593 for (size_t ii = 0; ii < full_event_loop->configuration()->channels()->size();
1594 ++ii) {
1595 const Channel *channel =
1596 full_event_loop->configuration()->channels()->Get(ii);
1597 // We currently don't support replaying remote timestamp channels in
1598 // realtime replay (unless the remote timestamp channel was not NOT_LOGGED,
1599 // in which case it gets auto-remapped and replayed on a /original channel).
1600 if (channel->name()->string_view().find("remote_timestamp") !=
1601 std::string_view::npos &&
1602 channel->name()->string_view().find("/original") ==
1603 std::string_view::npos) {
1604 continue;
1605 }
1606 if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
1607 observed_messages[ii] = {};
1608 fetchers[ii] = full_event_loop->MakeRawFetcher(channel);
1609 full_event_loop->OnRun([ii, &observed_messages, &fetchers]() {
1610 if (fetchers[ii]->Fetch()) {
1611 observed_messages[ii].push_back(std::make_pair(
1612 fetchers[ii]->context().monotonic_event_time, true));
1613 }
1614 });
1615 full_event_loop->MakeRawNoArgWatcher(
1616 channel, [ii, &observed_messages](const Context &context) {
1617 observed_messages[ii].push_back(
1618 std::make_pair(context.monotonic_event_time, false));
1619 });
1620 }
1621 }
1622
1623 full_factory.Run();
1624 fetchers.clear();
1625 full_reader.Deregister();
1626
1627 const Node *single_node_pi1 =
1628 configuration::GetNode(single_node_factory.configuration(), "pi1");
1629 std::map<size_t, std::unique_ptr<RawFetcher>> single_node_fetchers;
1630
1631 std::unique_ptr<EventLoop> single_node_event_loop =
1632 single_node_factory.MakeEventLoop("test", single_node_pi1);
1633 single_node_event_loop->SkipTimingReport();
1634 single_node_event_loop->SkipAosLog();
1635 for (size_t ii = 0;
1636 ii < single_node_event_loop->configuration()->channels()->size(); ++ii) {
1637 const Channel *channel =
1638 single_node_event_loop->configuration()->channels()->Get(ii);
1639 single_node_factory.DisableForwarding(channel);
1640 if (configuration::ChannelIsReadableOnNode(channel, single_node_pi1)) {
1641 single_node_fetchers[ii] =
1642 single_node_event_loop->MakeRawFetcher(channel);
1643 single_node_event_loop->OnRun([channel, ii, &single_node_fetchers]() {
1644 EXPECT_FALSE(single_node_fetchers[ii]->Fetch())
1645 << "Single EventLoop replay doesn't support pre-loading fetchers. "
1646 << configuration::StrippedChannelToString(channel);
1647 });
1648 single_node_event_loop->MakeRawNoArgWatcher(
1649 channel, [ii, &observed_messages, channel,
1650 kStartupDelay](const Context &context) {
1651 if (observed_messages[ii].empty()) {
1652 FAIL() << "Observed extra message at "
1653 << context.monotonic_event_time << " on "
1654 << configuration::StrippedChannelToString(channel);
1655 return;
1656 }
1657 const std::pair<monotonic_clock::time_point, bool> &message =
1658 observed_messages[ii].front();
1659 if (message.second) {
1660 EXPECT_LE(message.first,
1661 context.monotonic_event_time + kStartupDelay)
1662 << "Mismatched message times " << context.monotonic_event_time
1663 << " and " << message.first << " on "
1664 << configuration::StrippedChannelToString(channel);
1665 } else {
1666 EXPECT_EQ(message.first,
1667 context.monotonic_event_time + kStartupDelay)
1668 << "Mismatched message times " << context.monotonic_event_time
1669 << " and " << message.first << " on "
1670 << configuration::StrippedChannelToString(channel);
1671 }
1672 observed_messages[ii].erase(observed_messages[ii].begin());
1673 });
1674 }
1675 }
1676
1677 single_node_factory.Run();
1678
1679 single_node_fetchers.clear();
1680
1681 single_node_reader.Deregister();
1682
1683 for (const auto &pair : observed_messages) {
1684 EXPECT_TRUE(pair.second.empty())
1685 << "Missed " << pair.second.size() << " messages on "
1686 << configuration::StrippedChannelToString(
1687 single_node_event_loop->configuration()->channels()->Get(
1688 pair.first));
1689 }
1690}
1691
1692// Tests that we properly recreate forwarded timestamps when replaying a log.
1693// This should be enough that we can then re-run the logger and get a valid log
1694// back.
1695TEST_P(MultinodeLoggerTest, MessageHeader) {
1696 time_converter_.StartEqual();
1697 {
1698 LoggerState pi1_logger = MakeLogger(pi1_);
1699 LoggerState pi2_logger = MakeLogger(pi2_);
1700
1701 event_loop_factory_.RunFor(chrono::milliseconds(95));
1702
1703 StartLogger(&pi1_logger);
1704 StartLogger(&pi2_logger);
1705
1706 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1707 }
1708
1709 LogReader reader(SortParts(logfiles_));
1710
1711 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1712 log_reader_factory.set_send_delay(chrono::microseconds(0));
1713
1714 // This sends out the fetched messages and advances time to the start of the
1715 // log file.
1716 reader.Register(&log_reader_factory);
1717
1718 const Node *pi1 =
1719 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1720 const Node *pi2 =
1721 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1722
1723 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
1724 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
1725 LOG(INFO) << "now pi1 "
1726 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1727 LOG(INFO) << "now pi2 "
1728 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1729
1730 EXPECT_THAT(reader.LoggedNodes(),
1731 ::testing::ElementsAre(
1732 configuration::GetNode(reader.logged_configuration(), pi1),
1733 configuration::GetNode(reader.logged_configuration(), pi2)));
1734
1735 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1736
1737 std::unique_ptr<EventLoop> pi1_event_loop =
1738 log_reader_factory.MakeEventLoop("test", pi1);
1739 std::unique_ptr<EventLoop> pi2_event_loop =
1740 log_reader_factory.MakeEventLoop("test", pi2);
1741
1742 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =
1743 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1744 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi2_fetcher =
1745 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
1746
1747 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
1748 pi1_event_loop->MakeFetcher<examples::Ping>("/test");
1749 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
1750 pi2_event_loop->MakeFetcher<examples::Ping>("/test");
1751
1752 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi2_fetcher =
1753 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1754 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi1_fetcher =
1755 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
1756
1757 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
1758 pi2_event_loop->MakeFetcher<examples::Pong>("/test");
1759 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
1760 pi1_event_loop->MakeFetcher<examples::Pong>("/test");
1761
1762 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
1763 pi1_event_loop->configuration(), pi1_timestamp_on_pi1_fetcher.channel());
1764 const size_t ping_timestamp_channel = configuration::ChannelIndex(
1765 pi2_event_loop->configuration(), ping_on_pi2_fetcher.channel());
1766
1767 const size_t pi2_timestamp_channel = configuration::ChannelIndex(
1768 pi2_event_loop->configuration(), pi2_timestamp_on_pi2_fetcher.channel());
1769 const size_t pong_timestamp_channel = configuration::ChannelIndex(
1770 pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
1771
1772 const chrono::nanoseconds network_delay = event_loop_factory_.network_delay();
1773 const chrono::nanoseconds send_delay = event_loop_factory_.send_delay();
1774
1775 for (std::pair<int, std::string> channel :
1776 shared()
1777 ? std::vector<
1778 std::pair<int, std::string>>{{-1,
1779 "/aos/remote_timestamps/pi2"}}
1780 : std::vector<std::pair<int, std::string>>{
1781 {pi1_timestamp_channel,
1782 "/aos/remote_timestamps/pi2/pi1/aos/"
1783 "aos-message_bridge-Timestamp"},
1784 {ping_timestamp_channel,
1785 "/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
1786 pi1_event_loop->MakeWatcher(
1787 channel.second,
1788 [&pi1_event_loop, &pi2_event_loop, pi1_timestamp_channel,
1789 ping_timestamp_channel, &pi1_timestamp_on_pi1_fetcher,
1790 &pi1_timestamp_on_pi2_fetcher, &ping_on_pi1_fetcher,
1791 &ping_on_pi2_fetcher, network_delay, send_delay,
1792 channel_index = channel.first](const RemoteMessage &header) {
1793 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1794 chrono::nanoseconds(header.monotonic_sent_time()));
1795 const aos::realtime_clock::time_point header_realtime_sent_time(
1796 chrono::nanoseconds(header.realtime_sent_time()));
1797 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1798 chrono::nanoseconds(header.monotonic_remote_time()));
1799 const aos::realtime_clock::time_point header_realtime_remote_time(
1800 chrono::nanoseconds(header.realtime_remote_time()));
1801
1802 if (channel_index != -1) {
1803 ASSERT_EQ(channel_index, header.channel_index());
1804 }
1805
1806 const Context *pi1_context = nullptr;
1807 const Context *pi2_context = nullptr;
1808
1809 if (header.channel_index() == pi1_timestamp_channel) {
1810 ASSERT_TRUE(pi1_timestamp_on_pi1_fetcher.FetchNext());
1811 ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
1812 pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
1813 pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
1814 } else if (header.channel_index() == ping_timestamp_channel) {
1815 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
1816 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
1817 pi1_context = &ping_on_pi1_fetcher.context();
1818 pi2_context = &ping_on_pi2_fetcher.context();
1819 } else {
1820 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1821 << configuration::CleanedChannelToString(
1822 pi1_event_loop->configuration()->channels()->Get(
1823 header.channel_index()));
1824 }
1825
1826 ASSERT_TRUE(header.has_boot_uuid());
1827 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1828 pi2_event_loop->boot_uuid());
1829
1830 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
1831 EXPECT_EQ(pi2_context->remote_queue_index,
1832 header.remote_queue_index());
1833 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
1834
1835 EXPECT_EQ(pi2_context->monotonic_event_time,
1836 header_monotonic_sent_time);
1837 EXPECT_EQ(pi2_context->realtime_event_time,
1838 header_realtime_sent_time);
1839 EXPECT_EQ(pi2_context->realtime_remote_time,
1840 header_realtime_remote_time);
1841 EXPECT_EQ(pi2_context->monotonic_remote_time,
1842 header_monotonic_remote_time);
1843
1844 EXPECT_EQ(pi1_context->realtime_event_time,
1845 header_realtime_remote_time);
1846 EXPECT_EQ(pi1_context->monotonic_event_time,
1847 header_monotonic_remote_time);
1848
1849 // Time estimation isn't perfect, but we know the clocks were
1850 // identical when logged, so we know when this should have come back.
1851 // Confirm we got it when we expected.
1852 EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
1853 pi1_context->monotonic_event_time + 2 * network_delay +
1854 send_delay);
1855 });
1856 }
1857 for (std::pair<int, std::string> channel :
1858 shared()
1859 ? std::vector<
1860 std::pair<int, std::string>>{{-1,
1861 "/aos/remote_timestamps/pi1"}}
1862 : std::vector<std::pair<int, std::string>>{
1863 {pi2_timestamp_channel,
1864 "/aos/remote_timestamps/pi1/pi2/aos/"
1865 "aos-message_bridge-Timestamp"}}) {
1866 pi2_event_loop->MakeWatcher(
1867 channel.second,
1868 [&pi2_event_loop, &pi1_event_loop, pi2_timestamp_channel,
1869 pong_timestamp_channel, &pi2_timestamp_on_pi2_fetcher,
1870 &pi2_timestamp_on_pi1_fetcher, &pong_on_pi2_fetcher,
1871 &pong_on_pi1_fetcher, network_delay, send_delay,
1872 channel_index = channel.first](const RemoteMessage &header) {
1873 const aos::monotonic_clock::time_point header_monotonic_sent_time(
1874 chrono::nanoseconds(header.monotonic_sent_time()));
1875 const aos::realtime_clock::time_point header_realtime_sent_time(
1876 chrono::nanoseconds(header.realtime_sent_time()));
1877 const aos::monotonic_clock::time_point header_monotonic_remote_time(
1878 chrono::nanoseconds(header.monotonic_remote_time()));
1879 const aos::realtime_clock::time_point header_realtime_remote_time(
1880 chrono::nanoseconds(header.realtime_remote_time()));
1881
1882 if (channel_index != -1) {
1883 ASSERT_EQ(channel_index, header.channel_index());
1884 }
1885
1886 const Context *pi2_context = nullptr;
1887 const Context *pi1_context = nullptr;
1888
1889 if (header.channel_index() == pi2_timestamp_channel) {
1890 ASSERT_TRUE(pi2_timestamp_on_pi2_fetcher.FetchNext());
1891 ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
1892 pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
1893 pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
1894 } else if (header.channel_index() == pong_timestamp_channel) {
1895 ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
1896 ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
1897 pi2_context = &pong_on_pi2_fetcher.context();
1898 pi1_context = &pong_on_pi1_fetcher.context();
1899 } else {
1900 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
1901 << configuration::CleanedChannelToString(
1902 pi2_event_loop->configuration()->channels()->Get(
1903 header.channel_index()));
1904 }
1905
1906 ASSERT_TRUE(header.has_boot_uuid());
1907 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
1908 pi1_event_loop->boot_uuid());
1909
1910 EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());
1911 EXPECT_EQ(pi1_context->remote_queue_index,
1912 header.remote_queue_index());
1913 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
1914
1915 EXPECT_EQ(pi1_context->monotonic_event_time,
1916 header_monotonic_sent_time);
1917 EXPECT_EQ(pi1_context->realtime_event_time,
1918 header_realtime_sent_time);
1919 EXPECT_EQ(pi1_context->realtime_remote_time,
1920 header_realtime_remote_time);
1921 EXPECT_EQ(pi1_context->monotonic_remote_time,
1922 header_monotonic_remote_time);
1923
1924 EXPECT_EQ(pi2_context->realtime_event_time,
1925 header_realtime_remote_time);
1926 EXPECT_EQ(pi2_context->monotonic_event_time,
1927 header_monotonic_remote_time);
1928
1929 // Time estimation isn't perfect, but we know the clocks were
1930 // identical when logged, so we know when this should have come back.
1931 // Confirm we got it when we expected.
1932 EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
1933 pi2_context->monotonic_event_time + 2 * network_delay +
1934 send_delay);
1935 });
1936 }
1937
1938 // And confirm we can re-create a log again, while checking the contents.
1939 {
1940 LoggerState pi1_logger = MakeLogger(
1941 log_reader_factory.GetNodeEventLoopFactory("pi1"), &log_reader_factory);
1942 LoggerState pi2_logger = MakeLogger(
1943 log_reader_factory.GetNodeEventLoopFactory("pi2"), &log_reader_factory);
1944
1945 StartLogger(&pi1_logger, tmp_dir_ + "/relogged1");
1946 StartLogger(&pi2_logger, tmp_dir_ + "/relogged2");
1947
1948 log_reader_factory.Run();
1949 }
1950
1951 reader.Deregister();
1952
1953 // And verify that we can run the LogReader over the relogged files without
1954 // hitting any fatal errors.
1955 {
1956 LogReader relogged_reader(SortParts(MakeLogFiles(
1957 tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 3, 3, true)));
1958 relogged_reader.Register();
1959
1960 relogged_reader.event_loop_factory()->Run();
1961 }
1962 // And confirm that we can read the logged file using the reader's
1963 // configuration.
1964 {
1965 LogReader relogged_reader(
1966 SortParts(MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2",
1967 3, 3, true)),
1968 reader.configuration());
1969 relogged_reader.Register();
1970
1971 relogged_reader.event_loop_factory()->Run();
1972 }
1973}
1974
1975// Tests that we properly populate and extract the logger_start time by setting
1976// up a clock difference between 2 nodes and looking at the resulting parts.
1977TEST_P(MultinodeLoggerTest, LoggerStartTime) {
1978 std::vector<std::string> actual_filenames;
1979 time_converter_.AddMonotonic(
1980 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
1981 {
1982 LoggerState pi1_logger = MakeLogger(pi1_);
1983 LoggerState pi2_logger = MakeLogger(pi2_);
1984
1985 StartLogger(&pi1_logger);
1986 StartLogger(&pi2_logger);
1987
1988 event_loop_factory_.RunFor(chrono::milliseconds(10000));
1989
1990 pi1_logger.AppendAllFilenames(&actual_filenames);
1991 pi2_logger.AppendAllFilenames(&actual_filenames);
1992 }
1993
1994 ASSERT_THAT(actual_filenames,
1995 ::testing::UnorderedElementsAreArray(logfiles_));
1996
1997 for (const LogFile &log_file : SortParts(logfiles_)) {
1998 for (const LogParts &log_part : log_file.parts) {
1999 if (log_part.node == log_file.logger_node) {
2000 EXPECT_EQ(log_part.logger_monotonic_start_time,
2001 aos::monotonic_clock::min_time);
2002 EXPECT_EQ(log_part.logger_realtime_start_time,
2003 aos::realtime_clock::min_time);
2004 } else {
2005 const chrono::seconds offset = log_file.logger_node == "pi1"
2006 ? -chrono::seconds(1000)
2007 : chrono::seconds(1000);
2008 EXPECT_EQ(log_part.logger_monotonic_start_time,
2009 log_part.monotonic_start_time + offset);
2010 EXPECT_EQ(log_part.logger_realtime_start_time,
2011 log_file.realtime_start_time +
2012 (log_part.logger_monotonic_start_time -
2013 log_file.monotonic_start_time));
2014 }
2015 }
2016 }
2017}
2018
2019// Test that renaming the base, renames the folder.
2020TEST_P(MultinodeLoggerTest, LoggerRenameFolder) {
2021 util::UnlinkRecursive(tmp_dir_ + "/renamefolder");
2022 util::UnlinkRecursive(tmp_dir_ + "/new-good");
2023 time_converter_.AddMonotonic(
2024 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2025 logfile_base1_ = tmp_dir_ + "/renamefolder/multi_logfile1";
2026 logfile_base2_ = tmp_dir_ + "/renamefolder/multi_logfile2";
2027 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
2028 LoggerState pi1_logger = MakeLogger(pi1_);
2029 LoggerState pi2_logger = MakeLogger(pi2_);
2030
2031 StartLogger(&pi1_logger);
2032 StartLogger(&pi2_logger);
2033
2034 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2035 logfile_base1_ = tmp_dir_ + "/new-good/multi_logfile1";
2036 logfile_base2_ = tmp_dir_ + "/new-good/multi_logfile2";
2037 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
Alexei Strotscaf17d32023-04-03 22:31:11 -07002038
2039 // Sequence of set_base_name and Rotate simulates rename operation. Since
2040 // rename is not supported by all namers, RenameLogBase moved from logger to
2041 // the higher level abstraction, yet log_namers support rename, and it is
2042 // legal to test it here.
2043 pi1_logger.log_namer->set_base_name(logfile_base1_);
2044 pi1_logger.logger->Rotate();
2045 pi2_logger.log_namer->set_base_name(logfile_base2_);
2046 pi2_logger.logger->Rotate();
2047
Naman Guptaa63aa132023-03-22 20:06:34 -07002048 for (auto &file : logfiles_) {
2049 struct stat s;
2050 EXPECT_EQ(0, stat(file.c_str(), &s));
2051 }
2052}
2053
2054// Test that renaming the file base dies.
2055TEST_P(MultinodeLoggerDeathTest, LoggerRenameFile) {
2056 time_converter_.AddMonotonic(
2057 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2058 util::UnlinkRecursive(tmp_dir_ + "/renamefile");
2059 logfile_base1_ = tmp_dir_ + "/renamefile/multi_logfile1";
2060 logfile_base2_ = tmp_dir_ + "/renamefile/multi_logfile2";
2061 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
2062 LoggerState pi1_logger = MakeLogger(pi1_);
2063 StartLogger(&pi1_logger);
2064 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2065 logfile_base1_ = tmp_dir_ + "/new-renamefile/new_multi_logfile1";
Alexei Strotscaf17d32023-04-03 22:31:11 -07002066 EXPECT_DEATH({ pi1_logger.log_namer->set_base_name(logfile_base1_); },
Naman Guptaa63aa132023-03-22 20:06:34 -07002067 "Rename of file base from");
2068}
2069
2070// TODO(austin): We can write a test which recreates a logfile and confirms that
2071// we get it back. That is the ultimate test.
2072
// Tests that pi1 keeps logging correctly while the remote node (pi2) reboots,
// and that the headers of the resulting log parts carry the expected boot
// UUIDs, start times and oldest timestamps for each boot.
2076TEST_P(MultinodeLoggerTest, RemoteReboot) {
2077 std::vector<std::string> actual_filenames;
2078
2079 const UUID pi1_boot0 = UUID::Random();
2080 const UUID pi2_boot0 = UUID::Random();
2081 const UUID pi2_boot1 = UUID::Random();
2082 {
2083 CHECK_EQ(pi1_index_, 0u);
2084 CHECK_EQ(pi2_index_, 1u);
2085
2086 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2087 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2088 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2089
2090 time_converter_.AddNextTimestamp(
2091 distributed_clock::epoch(),
2092 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2093 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2094 time_converter_.AddNextTimestamp(
2095 distributed_clock::epoch() + reboot_time,
2096 {BootTimestamp::epoch() + reboot_time,
2097 BootTimestamp{
2098 .boot = 1,
2099 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2100 }
2101
2102 {
2103 LoggerState pi1_logger = MakeLogger(pi1_);
2104
2105 event_loop_factory_.RunFor(chrono::milliseconds(95));
2106 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2107 pi1_boot0);
2108 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2109 pi2_boot0);
2110
2111 StartLogger(&pi1_logger);
2112
2113 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2114
2115 VLOG(1) << "Reboot now!";
2116
2117 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2118 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2119 pi1_boot0);
2120 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2121 pi2_boot1);
2122
2123 pi1_logger.AppendAllFilenames(&actual_filenames);
2124 }
2125
2126 std::sort(actual_filenames.begin(), actual_filenames.end());
2127 std::sort(pi1_reboot_logfiles_.begin(), pi1_reboot_logfiles_.end());
2128 ASSERT_THAT(actual_filenames,
2129 ::testing::UnorderedElementsAreArray(pi1_reboot_logfiles_));
2130
2131 // Confirm that our new oldest timestamps properly update as we reboot and
2132 // rotate.
2133 for (const std::string &file : pi1_reboot_logfiles_) {
2134 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2135 ReadHeader(file);
2136 CHECK(log_header);
2137 if (log_header->message().has_configuration()) {
2138 continue;
2139 }
2140
2141 const monotonic_clock::time_point monotonic_start_time =
2142 monotonic_clock::time_point(
2143 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2144 const UUID source_node_boot_uuid = UUID::FromString(
2145 log_header->message().source_node_boot_uuid()->string_view());
2146
2147 if (log_header->message().node()->name()->string_view() != "pi1") {
2148 // The remote message channel should rotate later and have more parts.
2149 // This only is true on the log files with shared remote messages.
2150 //
2151 // TODO(austin): I'm not the most thrilled with this test pattern... It
2152 // feels brittle in a different way.
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002153 if (file.find("source_pi1_timestamp_pi2") == std::string::npos) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002154 switch (log_header->message().parts_index()) {
2155 case 0:
2156 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2157 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2158 break;
2159 case 1:
2160 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2161 ASSERT_EQ(monotonic_start_time,
2162 monotonic_clock::epoch() + chrono::seconds(1));
2163 break;
2164 case 2:
2165 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2166 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2167 break;
2168 case 3:
2169 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2170 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
2171 chrono::nanoseconds(2322999462))
2172 << " on " << file;
2173 break;
2174 default:
2175 FAIL();
2176 break;
2177 }
2178 } else {
2179 switch (log_header->message().parts_index()) {
2180 case 0:
2181 case 1:
2182 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2183 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2184 break;
2185 case 2:
2186 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2187 ASSERT_EQ(monotonic_start_time,
2188 monotonic_clock::epoch() + chrono::seconds(1));
2189 break;
2190 case 3:
2191 case 4:
2192 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2193 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2194 break;
2195 case 5:
2196 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2197 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
2198 chrono::nanoseconds(2322999462))
2199 << " on " << file;
2200 break;
2201 default:
2202 FAIL();
2203 break;
2204 }
2205 }
2206 continue;
2207 }
2208 SCOPED_TRACE(file);
2209 SCOPED_TRACE(aos::FlatbufferToJson(
2210 *log_header, {.multi_line = true, .max_vector_size = 100}));
2211 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2212 ASSERT_EQ(
2213 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2214 EXPECT_EQ(
2215 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2216 monotonic_clock::max_time.time_since_epoch().count());
2217 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2218 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2219 2u);
2220 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2221 monotonic_clock::max_time.time_since_epoch().count());
2222 ASSERT_TRUE(log_header->message()
2223 .has_oldest_remote_unreliable_monotonic_timestamps());
2224 ASSERT_EQ(log_header->message()
2225 .oldest_remote_unreliable_monotonic_timestamps()
2226 ->size(),
2227 2u);
2228 EXPECT_EQ(log_header->message()
2229 .oldest_remote_unreliable_monotonic_timestamps()
2230 ->Get(0),
2231 monotonic_clock::max_time.time_since_epoch().count());
2232 ASSERT_TRUE(log_header->message()
2233 .has_oldest_local_unreliable_monotonic_timestamps());
2234 ASSERT_EQ(log_header->message()
2235 .oldest_local_unreliable_monotonic_timestamps()
2236 ->size(),
2237 2u);
2238 EXPECT_EQ(log_header->message()
2239 .oldest_local_unreliable_monotonic_timestamps()
2240 ->Get(0),
2241 monotonic_clock::max_time.time_since_epoch().count());
2242
2243 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2244 monotonic_clock::time_point(chrono::nanoseconds(
2245 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2246 1)));
2247 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2248 monotonic_clock::time_point(chrono::nanoseconds(
2249 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2250 const monotonic_clock::time_point
2251 oldest_remote_unreliable_monotonic_timestamps =
2252 monotonic_clock::time_point(chrono::nanoseconds(
2253 log_header->message()
2254 .oldest_remote_unreliable_monotonic_timestamps()
2255 ->Get(1)));
2256 const monotonic_clock::time_point
2257 oldest_local_unreliable_monotonic_timestamps =
2258 monotonic_clock::time_point(chrono::nanoseconds(
2259 log_header->message()
2260 .oldest_local_unreliable_monotonic_timestamps()
2261 ->Get(1)));
2262 const monotonic_clock::time_point
2263 oldest_remote_reliable_monotonic_timestamps =
2264 monotonic_clock::time_point(chrono::nanoseconds(
2265 log_header->message()
2266 .oldest_remote_reliable_monotonic_timestamps()
2267 ->Get(1)));
2268 const monotonic_clock::time_point
2269 oldest_local_reliable_monotonic_timestamps =
2270 monotonic_clock::time_point(chrono::nanoseconds(
2271 log_header->message()
2272 .oldest_local_reliable_monotonic_timestamps()
2273 ->Get(1)));
2274 const monotonic_clock::time_point
2275 oldest_logger_remote_unreliable_monotonic_timestamps =
2276 monotonic_clock::time_point(chrono::nanoseconds(
2277 log_header->message()
2278 .oldest_logger_remote_unreliable_monotonic_timestamps()
2279 ->Get(0)));
2280 const monotonic_clock::time_point
2281 oldest_logger_local_unreliable_monotonic_timestamps =
2282 monotonic_clock::time_point(chrono::nanoseconds(
2283 log_header->message()
2284 .oldest_logger_local_unreliable_monotonic_timestamps()
2285 ->Get(0)));
2286 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2287 monotonic_clock::max_time);
2288 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2289 monotonic_clock::max_time);
2290 switch (log_header->message().parts_index()) {
2291 case 0:
2292 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2293 monotonic_clock::max_time);
2294 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2295 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2296 monotonic_clock::max_time);
2297 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2298 monotonic_clock::max_time);
2299 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2300 monotonic_clock::max_time);
2301 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2302 monotonic_clock::max_time);
2303 break;
2304 case 1:
2305 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2306 monotonic_clock::time_point(chrono::microseconds(90200)));
2307 EXPECT_EQ(oldest_local_monotonic_timestamps,
2308 monotonic_clock::time_point(chrono::microseconds(90350)));
2309 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2310 monotonic_clock::time_point(chrono::microseconds(90200)));
2311 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2312 monotonic_clock::time_point(chrono::microseconds(90350)));
2313 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2314 monotonic_clock::max_time);
2315 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2316 monotonic_clock::max_time);
2317 break;
2318 case 2:
2319 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2320 monotonic_clock::time_point(chrono::microseconds(90200)))
2321 << file;
2322 EXPECT_EQ(oldest_local_monotonic_timestamps,
2323 monotonic_clock::time_point(chrono::microseconds(90350)))
2324 << file;
2325 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2326 monotonic_clock::time_point(chrono::microseconds(90200)))
2327 << file;
2328 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2329 monotonic_clock::time_point(chrono::microseconds(90350)))
2330 << file;
2331 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2332 monotonic_clock::time_point(chrono::microseconds(100000)))
2333 << file;
2334 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2335 monotonic_clock::time_point(chrono::microseconds(100150)))
2336 << file;
2337 break;
2338 case 3:
2339 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2340 monotonic_clock::time_point(chrono::milliseconds(1323) +
2341 chrono::microseconds(200)));
2342 EXPECT_EQ(oldest_local_monotonic_timestamps,
2343 monotonic_clock::time_point(chrono::microseconds(10100350)));
2344 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2345 monotonic_clock::time_point(chrono::milliseconds(1323) +
2346 chrono::microseconds(200)));
2347 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2348 monotonic_clock::time_point(chrono::microseconds(10100350)));
2349 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2350 monotonic_clock::max_time)
2351 << file;
2352 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2353 monotonic_clock::max_time)
2354 << file;
2355 break;
2356 case 4:
2357 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2358 monotonic_clock::time_point(chrono::milliseconds(1323) +
2359 chrono::microseconds(200)));
2360 EXPECT_EQ(oldest_local_monotonic_timestamps,
2361 monotonic_clock::time_point(chrono::microseconds(10100350)));
2362 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2363 monotonic_clock::time_point(chrono::milliseconds(1323) +
2364 chrono::microseconds(200)));
2365 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2366 monotonic_clock::time_point(chrono::microseconds(10100350)));
2367 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2368 monotonic_clock::time_point(chrono::microseconds(1423000)))
2369 << file;
2370 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2371 monotonic_clock::time_point(chrono::microseconds(10200150)))
2372 << file;
2373 break;
2374 default:
2375 FAIL();
2376 break;
2377 }
2378 }
2379
2380 // Confirm that we refuse to replay logs with missing boot uuids.
2381 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07002382 auto sorted_parts = SortParts(pi1_reboot_logfiles_);
2383 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
2384 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07002385
2386 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2387 log_reader_factory.set_send_delay(chrono::microseconds(0));
2388
2389 // This sends out the fetched messages and advances time to the start of
2390 // the log file.
2391 reader.Register(&log_reader_factory);
2392
2393 log_reader_factory.Run();
2394
2395 reader.Deregister();
2396 }
2397}
2398
2399// Tests that we can sort a log which only has timestamps from the remote
2400// because the local message_bridge_client failed to connect.
2401TEST_P(MultinodeLoggerTest, RemoteRebootOnlyTimestamps) {
2402 const UUID pi1_boot0 = UUID::Random();
2403 const UUID pi2_boot0 = UUID::Random();
2404 const UUID pi2_boot1 = UUID::Random();
2405 {
2406 CHECK_EQ(pi1_index_, 0u);
2407 CHECK_EQ(pi2_index_, 1u);
2408
2409 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2410 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2411 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2412
2413 time_converter_.AddNextTimestamp(
2414 distributed_clock::epoch(),
2415 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2416 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2417 time_converter_.AddNextTimestamp(
2418 distributed_clock::epoch() + reboot_time,
2419 {BootTimestamp::epoch() + reboot_time,
2420 BootTimestamp{
2421 .boot = 1,
2422 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2423 }
2424 pi2_->Disconnect(pi1_->node());
2425
2426 std::vector<std::string> filenames;
2427 {
2428 LoggerState pi1_logger = MakeLogger(pi1_);
2429
2430 event_loop_factory_.RunFor(chrono::milliseconds(95));
2431 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2432 pi1_boot0);
2433 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2434 pi2_boot0);
2435
2436 StartLogger(&pi1_logger);
2437
2438 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2439
2440 VLOG(1) << "Reboot now!";
2441
2442 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2443 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2444 pi1_boot0);
2445 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2446 pi2_boot1);
2447 pi1_logger.AppendAllFilenames(&filenames);
2448 }
2449
2450 std::sort(filenames.begin(), filenames.end());
2451
2452 // Confirm that our new oldest timestamps properly update as we reboot and
2453 // rotate.
2454 size_t timestamp_file_count = 0;
2455 for (const std::string &file : filenames) {
2456 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2457 ReadHeader(file);
2458 CHECK(log_header);
2459
2460 if (log_header->message().has_configuration()) {
2461 continue;
2462 }
2463
2464 const monotonic_clock::time_point monotonic_start_time =
2465 monotonic_clock::time_point(
2466 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2467 const UUID source_node_boot_uuid = UUID::FromString(
2468 log_header->message().source_node_boot_uuid()->string_view());
2469
2470 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2471 ASSERT_EQ(
2472 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2473 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2474 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2475 2u);
2476 ASSERT_TRUE(log_header->message()
2477 .has_oldest_remote_unreliable_monotonic_timestamps());
2478 ASSERT_EQ(log_header->message()
2479 .oldest_remote_unreliable_monotonic_timestamps()
2480 ->size(),
2481 2u);
2482 ASSERT_TRUE(log_header->message()
2483 .has_oldest_local_unreliable_monotonic_timestamps());
2484 ASSERT_EQ(log_header->message()
2485 .oldest_local_unreliable_monotonic_timestamps()
2486 ->size(),
2487 2u);
2488 ASSERT_TRUE(log_header->message()
2489 .has_oldest_remote_reliable_monotonic_timestamps());
2490 ASSERT_EQ(log_header->message()
2491 .oldest_remote_reliable_monotonic_timestamps()
2492 ->size(),
2493 2u);
2494 ASSERT_TRUE(
2495 log_header->message().has_oldest_local_reliable_monotonic_timestamps());
2496 ASSERT_EQ(log_header->message()
2497 .oldest_local_reliable_monotonic_timestamps()
2498 ->size(),
2499 2u);
2500
2501 ASSERT_TRUE(
2502 log_header->message()
2503 .has_oldest_logger_remote_unreliable_monotonic_timestamps());
2504 ASSERT_EQ(log_header->message()
2505 .oldest_logger_remote_unreliable_monotonic_timestamps()
2506 ->size(),
2507 2u);
2508 ASSERT_TRUE(log_header->message()
2509 .has_oldest_logger_local_unreliable_monotonic_timestamps());
2510 ASSERT_EQ(log_header->message()
2511 .oldest_logger_local_unreliable_monotonic_timestamps()
2512 ->size(),
2513 2u);
2514
2515 if (log_header->message().node()->name()->string_view() != "pi1") {
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002516 ASSERT_TRUE(file.find("source_pi1_timestamp_pi2") != std::string::npos);
Naman Guptaa63aa132023-03-22 20:06:34 -07002517
2518 const std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
2519 ReadNthMessage(file, 0);
2520 CHECK(msg);
2521
2522 EXPECT_TRUE(msg->message().has_monotonic_sent_time());
2523 EXPECT_TRUE(msg->message().has_monotonic_remote_time());
2524
2525 const monotonic_clock::time_point
2526 expected_oldest_local_monotonic_timestamps(
2527 chrono::nanoseconds(msg->message().monotonic_sent_time()));
2528 const monotonic_clock::time_point
2529 expected_oldest_remote_monotonic_timestamps(
2530 chrono::nanoseconds(msg->message().monotonic_remote_time()));
2531 const monotonic_clock::time_point
2532 expected_oldest_timestamp_monotonic_timestamps(
2533 chrono::nanoseconds(msg->message().monotonic_timestamp_time()));
2534
2535 EXPECT_NE(expected_oldest_local_monotonic_timestamps,
2536 monotonic_clock::min_time);
2537 EXPECT_NE(expected_oldest_remote_monotonic_timestamps,
2538 monotonic_clock::min_time);
2539 EXPECT_NE(expected_oldest_timestamp_monotonic_timestamps,
2540 monotonic_clock::min_time);
2541
2542 ++timestamp_file_count;
2543 // Since the log file is from the perspective of the other node,
2544 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2545 monotonic_clock::time_point(chrono::nanoseconds(
2546 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2547 0)));
2548 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2549 monotonic_clock::time_point(chrono::nanoseconds(
2550 log_header->message().oldest_local_monotonic_timestamps()->Get(
2551 0)));
2552 const monotonic_clock::time_point
2553 oldest_remote_unreliable_monotonic_timestamps =
2554 monotonic_clock::time_point(chrono::nanoseconds(
2555 log_header->message()
2556 .oldest_remote_unreliable_monotonic_timestamps()
2557 ->Get(0)));
2558 const monotonic_clock::time_point
2559 oldest_local_unreliable_monotonic_timestamps =
2560 monotonic_clock::time_point(chrono::nanoseconds(
2561 log_header->message()
2562 .oldest_local_unreliable_monotonic_timestamps()
2563 ->Get(0)));
2564 const monotonic_clock::time_point
2565 oldest_remote_reliable_monotonic_timestamps =
2566 monotonic_clock::time_point(chrono::nanoseconds(
2567 log_header->message()
2568 .oldest_remote_reliable_monotonic_timestamps()
2569 ->Get(0)));
2570 const monotonic_clock::time_point
2571 oldest_local_reliable_monotonic_timestamps =
2572 monotonic_clock::time_point(chrono::nanoseconds(
2573 log_header->message()
2574 .oldest_local_reliable_monotonic_timestamps()
2575 ->Get(0)));
2576 const monotonic_clock::time_point
2577 oldest_logger_remote_unreliable_monotonic_timestamps =
2578 monotonic_clock::time_point(chrono::nanoseconds(
2579 log_header->message()
2580 .oldest_logger_remote_unreliable_monotonic_timestamps()
2581 ->Get(1)));
2582 const monotonic_clock::time_point
2583 oldest_logger_local_unreliable_monotonic_timestamps =
2584 monotonic_clock::time_point(chrono::nanoseconds(
2585 log_header->message()
2586 .oldest_logger_local_unreliable_monotonic_timestamps()
2587 ->Get(1)));
2588
2589 const Channel *channel =
2590 event_loop_factory_.configuration()->channels()->Get(
2591 msg->message().channel_index());
2592 const Connection *connection = configuration::ConnectionToNode(
2593 channel, configuration::GetNode(
2594 event_loop_factory_.configuration(),
2595 log_header->message().node()->name()->string_view()));
2596
2597 const bool reliable = connection->time_to_live() == 0;
2598
2599 SCOPED_TRACE(file);
2600 SCOPED_TRACE(aos::FlatbufferToJson(
2601 *log_header, {.multi_line = true, .max_vector_size = 100}));
2602
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002603 // Confirm that the oldest timestamps match what we expect. Based on
2604 // what we are doing, we know that the oldest time is the first
2605 // message's time.
2606 //
2607 // This makes the test robust to both the split and combined config
2608 // tests.
2609 switch (log_header->message().parts_index()) {
2610 case 0:
2611 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2612 expected_oldest_remote_monotonic_timestamps);
2613 EXPECT_EQ(oldest_local_monotonic_timestamps,
2614 expected_oldest_local_monotonic_timestamps);
2615 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2616 expected_oldest_local_monotonic_timestamps)
2617 << file;
2618 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2619 expected_oldest_timestamp_monotonic_timestamps)
2620 << file;
2621
2622 if (reliable) {
2623 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
Naman Guptaa63aa132023-03-22 20:06:34 -07002624 expected_oldest_remote_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002625 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
Naman Guptaa63aa132023-03-22 20:06:34 -07002626 expected_oldest_local_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002627 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2628 monotonic_clock::max_time);
2629 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2630 monotonic_clock::max_time);
2631 } else {
2632 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2633 monotonic_clock::max_time);
2634 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2635 monotonic_clock::max_time);
Naman Guptaa63aa132023-03-22 20:06:34 -07002636 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2637 expected_oldest_remote_monotonic_timestamps);
2638 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2639 expected_oldest_local_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002640 }
2641 break;
2642 case 1:
2643 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2644 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2645 EXPECT_EQ(oldest_local_monotonic_timestamps,
2646 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2647 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2648 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2649 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2650 monotonic_clock::epoch() + chrono::nanoseconds(90250000));
2651 if (reliable) {
2652 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2653 expected_oldest_remote_monotonic_timestamps);
2654 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2655 expected_oldest_local_monotonic_timestamps);
2656 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2657 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
2658 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2659 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
2660 } else {
2661 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2662 monotonic_clock::max_time);
2663 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2664 monotonic_clock::max_time);
2665 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2666 expected_oldest_remote_monotonic_timestamps);
2667 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2668 expected_oldest_local_monotonic_timestamps);
2669 }
2670 break;
2671 case 2:
2672 EXPECT_EQ(
2673 oldest_remote_monotonic_timestamps,
2674 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2675 EXPECT_EQ(oldest_local_monotonic_timestamps,
2676 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2677 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2678 expected_oldest_local_monotonic_timestamps)
2679 << file;
2680 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2681 expected_oldest_timestamp_monotonic_timestamps)
2682 << file;
2683 if (reliable) {
2684 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2685 expected_oldest_remote_monotonic_timestamps);
2686 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2687 expected_oldest_local_monotonic_timestamps);
2688 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2689 monotonic_clock::max_time);
2690 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2691 monotonic_clock::max_time);
2692 } else {
2693 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2694 monotonic_clock::max_time);
2695 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2696 monotonic_clock::max_time);
2697 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2698 expected_oldest_remote_monotonic_timestamps);
2699 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2700 expected_oldest_local_monotonic_timestamps);
2701 }
2702 break;
Naman Guptaa63aa132023-03-22 20:06:34 -07002703
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002704 case 3:
2705 EXPECT_EQ(
2706 oldest_remote_monotonic_timestamps,
2707 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
2708 EXPECT_EQ(oldest_local_monotonic_timestamps,
2709 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2710 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2711 expected_oldest_remote_monotonic_timestamps);
2712 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2713 expected_oldest_local_monotonic_timestamps);
2714 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2715 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
2716 EXPECT_EQ(
2717 oldest_logger_local_unreliable_monotonic_timestamps,
2718 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
2719 break;
2720 default:
2721 FAIL();
2722 break;
Naman Guptaa63aa132023-03-22 20:06:34 -07002723 }
2724
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002725 switch (log_header->message().parts_index()) {
2726 case 0:
2727 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2728 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2729 break;
2730 case 1:
2731 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2732 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2733 break;
2734 case 2:
2735 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2736 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2737 break;
2738 case 3:
2739 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2740 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2741 break;
2742 [[fallthrough]];
2743 default:
2744 FAIL();
2745 break;
2746 }
Naman Guptaa63aa132023-03-22 20:06:34 -07002747 continue;
2748 }
2749 EXPECT_EQ(
2750 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2751 monotonic_clock::max_time.time_since_epoch().count());
2752 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2753 monotonic_clock::max_time.time_since_epoch().count());
2754 EXPECT_EQ(log_header->message()
2755 .oldest_remote_unreliable_monotonic_timestamps()
2756 ->Get(0),
2757 monotonic_clock::max_time.time_since_epoch().count());
2758 EXPECT_EQ(log_header->message()
2759 .oldest_local_unreliable_monotonic_timestamps()
2760 ->Get(0),
2761 monotonic_clock::max_time.time_since_epoch().count());
2762
2763 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2764 monotonic_clock::time_point(chrono::nanoseconds(
2765 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2766 1)));
2767 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2768 monotonic_clock::time_point(chrono::nanoseconds(
2769 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2770 const monotonic_clock::time_point
2771 oldest_remote_unreliable_monotonic_timestamps =
2772 monotonic_clock::time_point(chrono::nanoseconds(
2773 log_header->message()
2774 .oldest_remote_unreliable_monotonic_timestamps()
2775 ->Get(1)));
2776 const monotonic_clock::time_point
2777 oldest_local_unreliable_monotonic_timestamps =
2778 monotonic_clock::time_point(chrono::nanoseconds(
2779 log_header->message()
2780 .oldest_local_unreliable_monotonic_timestamps()
2781 ->Get(1)));
2782 switch (log_header->message().parts_index()) {
2783 case 0:
2784 EXPECT_EQ(oldest_remote_monotonic_timestamps,
2785 monotonic_clock::max_time);
2786 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
2787 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2788 monotonic_clock::max_time);
2789 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2790 monotonic_clock::max_time);
2791 break;
2792 default:
2793 FAIL();
2794 break;
2795 }
2796 }
2797
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002798 EXPECT_EQ(timestamp_file_count, 4u);
Naman Guptaa63aa132023-03-22 20:06:34 -07002799
2800 // Confirm that we can actually sort the resulting log and read it.
2801 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07002802 auto sorted_parts = SortParts(filenames);
2803 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
2804 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07002805
2806 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2807 log_reader_factory.set_send_delay(chrono::microseconds(0));
2808
2809 // This sends out the fetched messages and advances time to the start of
2810 // the log file.
2811 reader.Register(&log_reader_factory);
2812
2813 log_reader_factory.Run();
2814
2815 reader.Deregister();
2816 }
2817}
2818
2819// Tests that we properly handle one direction of message_bridge being
2820// unavailable.
2821TEST_P(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
2822 pi1_->Disconnect(pi2_->node());
2823 time_converter_.AddMonotonic(
2824 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2825
2826 time_converter_.AddMonotonic(
2827 {chrono::milliseconds(10000),
2828 chrono::milliseconds(10000) - chrono::milliseconds(1)});
2829 {
2830 LoggerState pi1_logger = MakeLogger(pi1_);
2831
2832 event_loop_factory_.RunFor(chrono::milliseconds(95));
2833
2834 StartLogger(&pi1_logger);
2835
2836 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2837 }
2838
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002839 // Confirm that we can parse the result. LogReader has enough internal
2840 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07002841 ConfirmReadable(pi1_single_direction_logfiles_);
2842}
2843
2844// Tests that we properly handle one direction of message_bridge being
2845// unavailable.
2846TEST_P(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
2847 pi1_->Disconnect(pi2_->node());
2848 time_converter_.AddMonotonic(
2849 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(500)});
2850
2851 time_converter_.AddMonotonic(
2852 {chrono::milliseconds(10000),
2853 chrono::milliseconds(10000) + chrono::milliseconds(1)});
2854 {
2855 LoggerState pi1_logger = MakeLogger(pi1_);
2856
2857 event_loop_factory_.RunFor(chrono::milliseconds(95));
2858
2859 StartLogger(&pi1_logger);
2860
2861 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2862 }
2863
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002864 // Confirm that we can parse the result. LogReader has enough internal
2865 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07002866 ConfirmReadable(pi1_single_direction_logfiles_);
2867}
2868
2869// Tests that we explode if someone passes in a part file twice with a better
2870// error than an out of order error.
2871TEST_P(MultinodeLoggerTest, DuplicateLogFiles) {
2872 time_converter_.AddMonotonic(
2873 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2874 {
2875 LoggerState pi1_logger = MakeLogger(pi1_);
2876
2877 event_loop_factory_.RunFor(chrono::milliseconds(95));
2878
2879 StartLogger(&pi1_logger);
2880
2881 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2882 }
2883
2884 std::vector<std::string> duplicates;
2885 for (const std::string &f : pi1_single_direction_logfiles_) {
2886 duplicates.emplace_back(f);
2887 duplicates.emplace_back(f);
2888 }
2889 EXPECT_DEATH({ SortParts(duplicates); }, "Found duplicate parts in");
2890}
2891
2892// Tests that we explode if someone loses a part out of the middle of a log.
2893TEST_P(MultinodeLoggerTest, MissingPartsFromMiddle) {
2894 time_converter_.AddMonotonic(
2895 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2896 {
2897 LoggerState pi1_logger = MakeLogger(pi1_);
2898
2899 event_loop_factory_.RunFor(chrono::milliseconds(95));
2900
2901 StartLogger(&pi1_logger);
2902 aos::monotonic_clock::time_point last_rotation_time =
2903 pi1_logger.event_loop->monotonic_now();
Austin Schuh2f864452023-07-17 14:53:08 -07002904 pi1_logger.logger->set_on_logged_period(
2905 [&](aos::monotonic_clock::time_point) {
2906 const auto now = pi1_logger.event_loop->monotonic_now();
2907 if (now > last_rotation_time + std::chrono::seconds(5)) {
2908 pi1_logger.logger->Rotate();
2909 last_rotation_time = now;
2910 }
2911 });
Naman Guptaa63aa132023-03-22 20:06:34 -07002912
2913 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2914 }
2915
2916 std::vector<std::string> missing_parts;
2917
2918 missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part0" + Extension());
2919 missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part2" + Extension());
2920 missing_parts.emplace_back(absl::StrCat(
2921 logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
2922
2923 EXPECT_DEATH({ SortParts(missing_parts); },
2924 "Broken log, missing part files between");
2925}
2926
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002927// Tests that we properly handle a dead node. Do this by just disconnecting
2928// it and only using one nodes of logs.
Naman Guptaa63aa132023-03-22 20:06:34 -07002929TEST_P(MultinodeLoggerTest, DeadNode) {
2930 pi1_->Disconnect(pi2_->node());
2931 pi2_->Disconnect(pi1_->node());
2932 time_converter_.AddMonotonic(
2933 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2934 {
2935 LoggerState pi1_logger = MakeLogger(pi1_);
2936
2937 event_loop_factory_.RunFor(chrono::milliseconds(95));
2938
2939 StartLogger(&pi1_logger);
2940
2941 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2942 }
2943
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002944 // Confirm that we can parse the result. LogReader has enough internal
2945 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07002946 ConfirmReadable(MakePi1DeadNodeLogfiles());
2947}
2948
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07002949// Tests that we can relog with a different config. This makes most sense
2950// when you are trying to edit a log and want to use channel renaming + the
2951// original config in the new log.
Naman Guptaa63aa132023-03-22 20:06:34 -07002952TEST_P(MultinodeLoggerTest, LogDifferentConfig) {
2953 time_converter_.StartEqual();
2954 {
2955 LoggerState pi1_logger = MakeLogger(pi1_);
2956 LoggerState pi2_logger = MakeLogger(pi2_);
2957
2958 event_loop_factory_.RunFor(chrono::milliseconds(95));
2959
2960 StartLogger(&pi1_logger);
2961 StartLogger(&pi2_logger);
2962
2963 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2964 }
2965
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07002966 auto sorted_parts = SortParts(logfiles_);
2967 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
2968 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07002969 reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
2970
2971 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2972 log_reader_factory.set_send_delay(chrono::microseconds(0));
2973
2974 // This sends out the fetched messages and advances time to the start of the
2975 // log file.
2976 reader.Register(&log_reader_factory);
2977
2978 const Node *pi1 =
2979 configuration::GetNode(log_reader_factory.configuration(), "pi1");
2980 const Node *pi2 =
2981 configuration::GetNode(log_reader_factory.configuration(), "pi2");
2982
2983 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
2984 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
2985 LOG(INFO) << "now pi1 "
2986 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
2987 LOG(INFO) << "now pi2 "
2988 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
2989
2990 EXPECT_THAT(reader.LoggedNodes(),
2991 ::testing::ElementsAre(
2992 configuration::GetNode(reader.logged_configuration(), pi1),
2993 configuration::GetNode(reader.logged_configuration(), pi2)));
2994
2995 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
2996
2997 // And confirm we can re-create a log again, while checking the contents.
2998 std::vector<std::string> log_files;
2999 {
3000 LoggerState pi1_logger =
3001 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
3002 &log_reader_factory, reader.logged_configuration());
3003 LoggerState pi2_logger =
3004 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
3005 &log_reader_factory, reader.logged_configuration());
3006
3007 pi1_logger.StartLogger(tmp_dir_ + "/relogged1");
3008 pi2_logger.StartLogger(tmp_dir_ + "/relogged2");
3009
3010 log_reader_factory.Run();
3011
3012 for (auto &x : pi1_logger.log_namer->all_filenames()) {
3013 log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged1_", x));
3014 }
3015 for (auto &x : pi2_logger.log_namer->all_filenames()) {
3016 log_files.emplace_back(absl::StrCat(tmp_dir_, "/relogged2_", x));
3017 }
3018 }
3019
3020 reader.Deregister();
3021
3022 // And verify that we can run the LogReader over the relogged files without
3023 // hitting any fatal errors.
3024 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003025 auto sorted_parts = SortParts(log_files);
3026 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3027 LogReader relogged_reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07003028 relogged_reader.Register();
3029
3030 relogged_reader.event_loop_factory()->Run();
3031 }
3032}
3033
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003034// Tests that we properly replay a log where the start time for a node is
3035// before any data on the node. This can happen if the logger starts before
3036// data is published. While the scenario below is a bit convoluted, we have
3037// seen logs like this generated out in the wild.
Naman Guptaa63aa132023-03-22 20:06:34 -07003038TEST(MultinodeRebootLoggerTest, StartTimeBeforeData) {
3039 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3040 aos::configuration::ReadConfig(ArtifactPath(
3041 "aos/events/logging/multinode_pingpong_split3_config.json"));
3042 message_bridge::TestingTimeConverter time_converter(
3043 configuration::NodesCount(&config.message()));
3044 SimulatedEventLoopFactory event_loop_factory(&config.message());
3045 event_loop_factory.SetTimeConverter(&time_converter);
3046 NodeEventLoopFactory *const pi1 =
3047 event_loop_factory.GetNodeEventLoopFactory("pi1");
3048 const size_t pi1_index = configuration::GetNodeIndex(
3049 event_loop_factory.configuration(), pi1->node());
3050 NodeEventLoopFactory *const pi2 =
3051 event_loop_factory.GetNodeEventLoopFactory("pi2");
3052 const size_t pi2_index = configuration::GetNodeIndex(
3053 event_loop_factory.configuration(), pi2->node());
3054 NodeEventLoopFactory *const pi3 =
3055 event_loop_factory.GetNodeEventLoopFactory("pi3");
3056 const size_t pi3_index = configuration::GetNodeIndex(
3057 event_loop_factory.configuration(), pi3->node());
3058
3059 const std::string kLogfile1_1 =
3060 aos::testing::TestTmpDir() + "/multi_logfile1/";
3061 const std::string kLogfile2_1 =
3062 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3063 const std::string kLogfile2_2 =
3064 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
3065 const std::string kLogfile3_1 =
3066 aos::testing::TestTmpDir() + "/multi_logfile3/";
3067 util::UnlinkRecursive(kLogfile1_1);
3068 util::UnlinkRecursive(kLogfile2_1);
3069 util::UnlinkRecursive(kLogfile2_2);
3070 util::UnlinkRecursive(kLogfile3_1);
3071 const UUID pi1_boot0 = UUID::Random();
3072 const UUID pi2_boot0 = UUID::Random();
3073 const UUID pi2_boot1 = UUID::Random();
3074 const UUID pi3_boot0 = UUID::Random();
3075 {
3076 CHECK_EQ(pi1_index, 0u);
3077 CHECK_EQ(pi2_index, 1u);
3078 CHECK_EQ(pi3_index, 2u);
3079
3080 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3081 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3082 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3083 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3084
3085 time_converter.AddNextTimestamp(
3086 distributed_clock::epoch(),
3087 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3088 BootTimestamp::epoch()});
3089 const chrono::nanoseconds reboot_time = chrono::milliseconds(20000);
3090 time_converter.AddNextTimestamp(
3091 distributed_clock::epoch() + reboot_time,
3092 {BootTimestamp::epoch() + reboot_time,
3093 BootTimestamp{
3094 .boot = 1,
3095 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)},
3096 BootTimestamp::epoch() + reboot_time});
3097 }
3098
3099 // Make everything perfectly quiet.
3100 event_loop_factory.SkipTimingReport();
3101 event_loop_factory.DisableStatistics();
3102
3103 std::vector<std::string> filenames;
3104 {
3105 LoggerState pi1_logger = MakeLoggerState(
3106 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3107 LoggerState pi3_logger = MakeLoggerState(
3108 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3109 {
3110 // And now start the logger.
3111 LoggerState pi2_logger = MakeLoggerState(
3112 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3113
3114 event_loop_factory.RunFor(chrono::milliseconds(1000));
3115
3116 pi1_logger.StartLogger(kLogfile1_1);
3117 pi3_logger.StartLogger(kLogfile3_1);
3118 pi2_logger.StartLogger(kLogfile2_1);
3119
3120 event_loop_factory.RunFor(chrono::milliseconds(10000));
3121
3122 // Now that we've got a start time in the past, turn on data.
3123 event_loop_factory.EnableStatistics();
3124 std::unique_ptr<aos::EventLoop> ping_event_loop =
3125 pi1->MakeEventLoop("ping");
3126 Ping ping(ping_event_loop.get());
3127
3128 pi2->AlwaysStart<Pong>("pong");
3129
3130 event_loop_factory.RunFor(chrono::milliseconds(3000));
3131
3132 pi2_logger.AppendAllFilenames(&filenames);
3133
3134 // Stop logging on pi2 before rebooting and completely shut off all
3135 // messages on pi2.
3136 pi2->DisableStatistics();
3137 pi1->Disconnect(pi2->node());
3138 pi2->Disconnect(pi1->node());
3139 }
3140 event_loop_factory.RunFor(chrono::milliseconds(7000));
3141 // pi2 now reboots.
3142 {
3143 event_loop_factory.RunFor(chrono::milliseconds(1000));
3144
3145 // Start logging again on pi2 after it is up.
3146 LoggerState pi2_logger = MakeLoggerState(
3147 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3148 pi2_logger.StartLogger(kLogfile2_2);
3149
3150 event_loop_factory.RunFor(chrono::milliseconds(10000));
3151 // And, now that we have a start time in the log, turn data back on.
3152 pi2->EnableStatistics();
3153 pi1->Connect(pi2->node());
3154 pi2->Connect(pi1->node());
3155
3156 pi2->AlwaysStart<Pong>("pong");
3157 std::unique_ptr<aos::EventLoop> ping_event_loop =
3158 pi1->MakeEventLoop("ping");
3159 Ping ping(ping_event_loop.get());
3160
3161 event_loop_factory.RunFor(chrono::milliseconds(3000));
3162
3163 pi2_logger.AppendAllFilenames(&filenames);
3164 }
3165
3166 pi1_logger.AppendAllFilenames(&filenames);
3167 pi3_logger.AppendAllFilenames(&filenames);
3168 }
3169
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003170 // Confirm that we can parse the result. LogReader has enough internal
3171 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07003172 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003173 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003174 auto result = ConfirmReadable(filenames);
3175 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
3176 chrono::seconds(1)));
3177 EXPECT_THAT(result[0].second,
3178 ::testing::ElementsAre(realtime_clock::epoch() +
3179 chrono::microseconds(34990350)));
3180
3181 EXPECT_THAT(result[1].first,
3182 ::testing::ElementsAre(
3183 realtime_clock::epoch() + chrono::seconds(1),
3184 realtime_clock::epoch() + chrono::microseconds(3323000)));
3185 EXPECT_THAT(result[1].second,
3186 ::testing::ElementsAre(
3187 realtime_clock::epoch() + chrono::microseconds(13990200),
3188 realtime_clock::epoch() + chrono::microseconds(16313200)));
3189
3190 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch() +
3191 chrono::seconds(1)));
3192 EXPECT_THAT(result[2].second,
3193 ::testing::ElementsAre(realtime_clock::epoch() +
3194 chrono::microseconds(34900150)));
3195}
3196
3197// Tests that local data before remote data after reboot is properly replayed.
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003198// We only trigger a reboot in the timestamp interpolation function when
3199// solving the timestamp problem when we actually have a point in the
3200// function. This originally only happened when a point passes the noncausal
3201// filter. At the start of time for the second boot, if we aren't careful, we
3202// will have messages which need to be published at times before the boot.
3203// This happens when a local message is in the log before a forwarded message,
3204// so there is no point in the interpolation function. This delays the
3205// reboot. So, we need to recreate that situation and make sure it doesn't
3206// come back.
Naman Guptaa63aa132023-03-22 20:06:34 -07003207TEST(MultinodeRebootLoggerTest,
3208 LocalMessageBeforeRemoteBeforeStartAfterReboot) {
3209 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3210 aos::configuration::ReadConfig(ArtifactPath(
3211 "aos/events/logging/multinode_pingpong_split3_config.json"));
3212 message_bridge::TestingTimeConverter time_converter(
3213 configuration::NodesCount(&config.message()));
3214 SimulatedEventLoopFactory event_loop_factory(&config.message());
3215 event_loop_factory.SetTimeConverter(&time_converter);
3216 NodeEventLoopFactory *const pi1 =
3217 event_loop_factory.GetNodeEventLoopFactory("pi1");
3218 const size_t pi1_index = configuration::GetNodeIndex(
3219 event_loop_factory.configuration(), pi1->node());
3220 NodeEventLoopFactory *const pi2 =
3221 event_loop_factory.GetNodeEventLoopFactory("pi2");
3222 const size_t pi2_index = configuration::GetNodeIndex(
3223 event_loop_factory.configuration(), pi2->node());
3224 NodeEventLoopFactory *const pi3 =
3225 event_loop_factory.GetNodeEventLoopFactory("pi3");
3226 const size_t pi3_index = configuration::GetNodeIndex(
3227 event_loop_factory.configuration(), pi3->node());
3228
3229 const std::string kLogfile1_1 =
3230 aos::testing::TestTmpDir() + "/multi_logfile1/";
3231 const std::string kLogfile2_1 =
3232 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3233 const std::string kLogfile2_2 =
3234 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
3235 const std::string kLogfile3_1 =
3236 aos::testing::TestTmpDir() + "/multi_logfile3/";
3237 util::UnlinkRecursive(kLogfile1_1);
3238 util::UnlinkRecursive(kLogfile2_1);
3239 util::UnlinkRecursive(kLogfile2_2);
3240 util::UnlinkRecursive(kLogfile3_1);
3241 const UUID pi1_boot0 = UUID::Random();
3242 const UUID pi2_boot0 = UUID::Random();
3243 const UUID pi2_boot1 = UUID::Random();
3244 const UUID pi3_boot0 = UUID::Random();
3245 {
3246 CHECK_EQ(pi1_index, 0u);
3247 CHECK_EQ(pi2_index, 1u);
3248 CHECK_EQ(pi3_index, 2u);
3249
3250 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3251 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3252 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3253 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3254
3255 time_converter.AddNextTimestamp(
3256 distributed_clock::epoch(),
3257 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3258 BootTimestamp::epoch()});
3259 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3260 time_converter.AddNextTimestamp(
3261 distributed_clock::epoch() + reboot_time,
3262 {BootTimestamp::epoch() + reboot_time,
3263 BootTimestamp{.boot = 1,
3264 .time = monotonic_clock::epoch() + reboot_time +
3265 chrono::seconds(100)},
3266 BootTimestamp::epoch() + reboot_time});
3267 }
3268
3269 std::vector<std::string> filenames;
3270 {
3271 LoggerState pi1_logger = MakeLoggerState(
3272 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3273 LoggerState pi3_logger = MakeLoggerState(
3274 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3275 {
3276 // And now start the logger.
3277 LoggerState pi2_logger = MakeLoggerState(
3278 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3279
3280 pi1_logger.StartLogger(kLogfile1_1);
3281 pi3_logger.StartLogger(kLogfile3_1);
3282 pi2_logger.StartLogger(kLogfile2_1);
3283
3284 event_loop_factory.RunFor(chrono::milliseconds(1005));
3285
3286 // Now that we've got a start time in the past, turn on data.
3287 std::unique_ptr<aos::EventLoop> ping_event_loop =
3288 pi1->MakeEventLoop("ping");
3289 Ping ping(ping_event_loop.get());
3290
3291 pi2->AlwaysStart<Pong>("pong");
3292
3293 event_loop_factory.RunFor(chrono::milliseconds(3000));
3294
3295 pi2_logger.AppendAllFilenames(&filenames);
3296
3297 // Disable any remote messages on pi2.
3298 pi1->Disconnect(pi2->node());
3299 pi2->Disconnect(pi1->node());
3300 }
3301 event_loop_factory.RunFor(chrono::milliseconds(995));
3302 // pi2 now reboots at 5 seconds.
3303 {
3304 event_loop_factory.RunFor(chrono::milliseconds(1000));
3305
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003306 // Make local stuff happen before we start logging and connect the
3307 // remote.
Naman Guptaa63aa132023-03-22 20:06:34 -07003308 pi2->AlwaysStart<Pong>("pong");
3309 std::unique_ptr<aos::EventLoop> ping_event_loop =
3310 pi1->MakeEventLoop("ping");
3311 Ping ping(ping_event_loop.get());
3312 event_loop_factory.RunFor(chrono::milliseconds(1005));
3313
3314 // Start logging again on pi2 after it is up.
3315 LoggerState pi2_logger = MakeLoggerState(
3316 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3317 pi2_logger.StartLogger(kLogfile2_2);
3318
3319 // And allow remote messages now that we have some local ones.
3320 pi1->Connect(pi2->node());
3321 pi2->Connect(pi1->node());
3322
3323 event_loop_factory.RunFor(chrono::milliseconds(1000));
3324
3325 event_loop_factory.RunFor(chrono::milliseconds(3000));
3326
3327 pi2_logger.AppendAllFilenames(&filenames);
3328 }
3329
3330 pi1_logger.AppendAllFilenames(&filenames);
3331 pi3_logger.AppendAllFilenames(&filenames);
3332 }
3333
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003334 // Confirm that we can parse the result. LogReader has enough internal
3335 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07003336 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003337 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003338 auto result = ConfirmReadable(filenames);
3339
3340 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3341 EXPECT_THAT(result[0].second,
3342 ::testing::ElementsAre(realtime_clock::epoch() +
3343 chrono::microseconds(11000350)));
3344
3345 EXPECT_THAT(result[1].first,
3346 ::testing::ElementsAre(
3347 realtime_clock::epoch(),
3348 realtime_clock::epoch() + chrono::microseconds(107005000)));
3349 EXPECT_THAT(result[1].second,
3350 ::testing::ElementsAre(
3351 realtime_clock::epoch() + chrono::microseconds(4000150),
3352 realtime_clock::epoch() + chrono::microseconds(111000200)));
3353
3354 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3355 EXPECT_THAT(result[2].second,
3356 ::testing::ElementsAre(realtime_clock::epoch() +
3357 chrono::microseconds(11000150)));
3358
3359 auto start_stop_result = ConfirmReadable(
3360 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3361 realtime_clock::epoch() + chrono::milliseconds(3000));
3362
3363 EXPECT_THAT(
3364 start_stop_result[0].first,
3365 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3366 EXPECT_THAT(
3367 start_stop_result[0].second,
3368 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3369 EXPECT_THAT(
3370 start_stop_result[1].first,
3371 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3372 EXPECT_THAT(
3373 start_stop_result[1].second,
3374 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3375 EXPECT_THAT(
3376 start_stop_result[2].first,
3377 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3378 EXPECT_THAT(
3379 start_stop_result[2].second,
3380 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
3381}
3382
3383// Tests that setting the start and stop flags across a reboot works as
3384// expected.
3385TEST(MultinodeRebootLoggerTest, RebootStartStopTimes) {
3386 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3387 aos::configuration::ReadConfig(ArtifactPath(
3388 "aos/events/logging/multinode_pingpong_split3_config.json"));
3389 message_bridge::TestingTimeConverter time_converter(
3390 configuration::NodesCount(&config.message()));
3391 SimulatedEventLoopFactory event_loop_factory(&config.message());
3392 event_loop_factory.SetTimeConverter(&time_converter);
3393 NodeEventLoopFactory *const pi1 =
3394 event_loop_factory.GetNodeEventLoopFactory("pi1");
3395 const size_t pi1_index = configuration::GetNodeIndex(
3396 event_loop_factory.configuration(), pi1->node());
3397 NodeEventLoopFactory *const pi2 =
3398 event_loop_factory.GetNodeEventLoopFactory("pi2");
3399 const size_t pi2_index = configuration::GetNodeIndex(
3400 event_loop_factory.configuration(), pi2->node());
3401 NodeEventLoopFactory *const pi3 =
3402 event_loop_factory.GetNodeEventLoopFactory("pi3");
3403 const size_t pi3_index = configuration::GetNodeIndex(
3404 event_loop_factory.configuration(), pi3->node());
3405
3406 const std::string kLogfile1_1 =
3407 aos::testing::TestTmpDir() + "/multi_logfile1/";
3408 const std::string kLogfile2_1 =
3409 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3410 const std::string kLogfile2_2 =
3411 aos::testing::TestTmpDir() + "/multi_logfile2.2/";
3412 const std::string kLogfile3_1 =
3413 aos::testing::TestTmpDir() + "/multi_logfile3/";
3414 util::UnlinkRecursive(kLogfile1_1);
3415 util::UnlinkRecursive(kLogfile2_1);
3416 util::UnlinkRecursive(kLogfile2_2);
3417 util::UnlinkRecursive(kLogfile3_1);
3418 {
3419 CHECK_EQ(pi1_index, 0u);
3420 CHECK_EQ(pi2_index, 1u);
3421 CHECK_EQ(pi3_index, 2u);
3422
3423 time_converter.AddNextTimestamp(
3424 distributed_clock::epoch(),
3425 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3426 BootTimestamp::epoch()});
3427 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3428 time_converter.AddNextTimestamp(
3429 distributed_clock::epoch() + reboot_time,
3430 {BootTimestamp::epoch() + reboot_time,
3431 BootTimestamp{.boot = 1,
3432 .time = monotonic_clock::epoch() + reboot_time},
3433 BootTimestamp::epoch() + reboot_time});
3434 }
3435
3436 std::vector<std::string> filenames;
3437 {
3438 LoggerState pi1_logger = MakeLoggerState(
3439 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3440 LoggerState pi3_logger = MakeLoggerState(
3441 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3442 {
3443 // And now start the logger.
3444 LoggerState pi2_logger = MakeLoggerState(
3445 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3446
3447 pi1_logger.StartLogger(kLogfile1_1);
3448 pi3_logger.StartLogger(kLogfile3_1);
3449 pi2_logger.StartLogger(kLogfile2_1);
3450
3451 event_loop_factory.RunFor(chrono::milliseconds(1005));
3452
3453 // Now that we've got a start time in the past, turn on data.
3454 std::unique_ptr<aos::EventLoop> ping_event_loop =
3455 pi1->MakeEventLoop("ping");
3456 Ping ping(ping_event_loop.get());
3457
3458 pi2->AlwaysStart<Pong>("pong");
3459
3460 event_loop_factory.RunFor(chrono::milliseconds(3000));
3461
3462 pi2_logger.AppendAllFilenames(&filenames);
3463 }
3464 event_loop_factory.RunFor(chrono::milliseconds(995));
3465 // pi2 now reboots at 5 seconds.
3466 {
3467 event_loop_factory.RunFor(chrono::milliseconds(1000));
3468
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003469 // Make local stuff happen before we start logging and connect the
3470 // remote.
Naman Guptaa63aa132023-03-22 20:06:34 -07003471 pi2->AlwaysStart<Pong>("pong");
3472 std::unique_ptr<aos::EventLoop> ping_event_loop =
3473 pi1->MakeEventLoop("ping");
3474 Ping ping(ping_event_loop.get());
3475 event_loop_factory.RunFor(chrono::milliseconds(5));
3476
3477 // Start logging again on pi2 after it is up.
3478 LoggerState pi2_logger = MakeLoggerState(
3479 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3480 pi2_logger.StartLogger(kLogfile2_2);
3481
3482 event_loop_factory.RunFor(chrono::milliseconds(5000));
3483
3484 pi2_logger.AppendAllFilenames(&filenames);
3485 }
3486
3487 pi1_logger.AppendAllFilenames(&filenames);
3488 pi3_logger.AppendAllFilenames(&filenames);
3489 }
3490
3491 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003492 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003493 auto result = ConfirmReadable(filenames);
3494
3495 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3496 EXPECT_THAT(result[0].second,
3497 ::testing::ElementsAre(realtime_clock::epoch() +
3498 chrono::microseconds(11000350)));
3499
3500 EXPECT_THAT(result[1].first,
3501 ::testing::ElementsAre(
3502 realtime_clock::epoch(),
3503 realtime_clock::epoch() + chrono::microseconds(6005000)));
3504 EXPECT_THAT(result[1].second,
3505 ::testing::ElementsAre(
3506 realtime_clock::epoch() + chrono::microseconds(4900150),
3507 realtime_clock::epoch() + chrono::microseconds(11000200)));
3508
3509 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3510 EXPECT_THAT(result[2].second,
3511 ::testing::ElementsAre(realtime_clock::epoch() +
3512 chrono::microseconds(11000150)));
3513
3514 // Confirm we observed the correct start and stop times. We should see the
3515 // reboot here.
3516 auto start_stop_result = ConfirmReadable(
3517 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3518 realtime_clock::epoch() + chrono::milliseconds(8000));
3519
3520 EXPECT_THAT(
3521 start_stop_result[0].first,
3522 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3523 EXPECT_THAT(
3524 start_stop_result[0].second,
3525 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3526 EXPECT_THAT(start_stop_result[1].first,
3527 ::testing::ElementsAre(
3528 realtime_clock::epoch() + chrono::seconds(2),
3529 realtime_clock::epoch() + chrono::microseconds(6005000)));
3530 EXPECT_THAT(start_stop_result[1].second,
3531 ::testing::ElementsAre(
3532 realtime_clock::epoch() + chrono::microseconds(4900150),
3533 realtime_clock::epoch() + chrono::seconds(8)));
3534 EXPECT_THAT(
3535 start_stop_result[2].first,
3536 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3537 EXPECT_THAT(
3538 start_stop_result[2].second,
3539 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
3540}
3541
3542// Tests that we properly handle one direction being down.
3543TEST(MissingDirectionTest, OneDirection) {
3544 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3545 aos::configuration::ReadConfig(ArtifactPath(
3546 "aos/events/logging/multinode_pingpong_split4_config.json"));
3547 message_bridge::TestingTimeConverter time_converter(
3548 configuration::NodesCount(&config.message()));
3549 SimulatedEventLoopFactory event_loop_factory(&config.message());
3550 event_loop_factory.SetTimeConverter(&time_converter);
3551
3552 NodeEventLoopFactory *const pi1 =
3553 event_loop_factory.GetNodeEventLoopFactory("pi1");
3554 const size_t pi1_index = configuration::GetNodeIndex(
3555 event_loop_factory.configuration(), pi1->node());
3556 NodeEventLoopFactory *const pi2 =
3557 event_loop_factory.GetNodeEventLoopFactory("pi2");
3558 const size_t pi2_index = configuration::GetNodeIndex(
3559 event_loop_factory.configuration(), pi2->node());
3560 std::vector<std::string> filenames;
3561
3562 {
3563 CHECK_EQ(pi1_index, 0u);
3564 CHECK_EQ(pi2_index, 1u);
3565
3566 time_converter.AddNextTimestamp(
3567 distributed_clock::epoch(),
3568 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3569
3570 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3571 time_converter.AddNextTimestamp(
3572 distributed_clock::epoch() + reboot_time,
3573 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3574 BootTimestamp::epoch() + reboot_time});
3575 }
3576
3577 const std::string kLogfile2_1 =
3578 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3579 const std::string kLogfile1_1 =
3580 aos::testing::TestTmpDir() + "/multi_logfile1.1/";
3581 util::UnlinkRecursive(kLogfile2_1);
3582 util::UnlinkRecursive(kLogfile1_1);
3583
3584 pi2->Disconnect(pi1->node());
3585
3586 pi1->AlwaysStart<Ping>("ping");
3587 pi2->AlwaysStart<Pong>("pong");
3588
3589 {
3590 LoggerState pi2_logger = MakeLoggerState(
3591 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3592
3593 event_loop_factory.RunFor(chrono::milliseconds(95));
3594
3595 pi2_logger.StartLogger(kLogfile2_1);
3596
3597 event_loop_factory.RunFor(chrono::milliseconds(6000));
3598
3599 pi2->Connect(pi1->node());
3600
3601 LoggerState pi1_logger = MakeLoggerState(
3602 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3603 pi1_logger.StartLogger(kLogfile1_1);
3604
3605 event_loop_factory.RunFor(chrono::milliseconds(5000));
3606 pi1_logger.AppendAllFilenames(&filenames);
3607 pi2_logger.AppendAllFilenames(&filenames);
3608 }
3609
3610 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003611 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003612 ConfirmReadable(filenames);
3613}
3614
3615// Tests that we properly handle only one direction ever existing after a
3616// reboot.
3617TEST(MissingDirectionTest, OneDirectionAfterReboot) {
3618 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3619 aos::configuration::ReadConfig(ArtifactPath(
3620 "aos/events/logging/multinode_pingpong_split4_config.json"));
3621 message_bridge::TestingTimeConverter time_converter(
3622 configuration::NodesCount(&config.message()));
3623 SimulatedEventLoopFactory event_loop_factory(&config.message());
3624 event_loop_factory.SetTimeConverter(&time_converter);
3625
3626 NodeEventLoopFactory *const pi1 =
3627 event_loop_factory.GetNodeEventLoopFactory("pi1");
3628 const size_t pi1_index = configuration::GetNodeIndex(
3629 event_loop_factory.configuration(), pi1->node());
3630 NodeEventLoopFactory *const pi2 =
3631 event_loop_factory.GetNodeEventLoopFactory("pi2");
3632 const size_t pi2_index = configuration::GetNodeIndex(
3633 event_loop_factory.configuration(), pi2->node());
3634 std::vector<std::string> filenames;
3635
3636 {
3637 CHECK_EQ(pi1_index, 0u);
3638 CHECK_EQ(pi2_index, 1u);
3639
3640 time_converter.AddNextTimestamp(
3641 distributed_clock::epoch(),
3642 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3643
3644 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3645 time_converter.AddNextTimestamp(
3646 distributed_clock::epoch() + reboot_time,
3647 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3648 BootTimestamp::epoch() + reboot_time});
3649 }
3650
3651 const std::string kLogfile2_1 =
3652 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3653 util::UnlinkRecursive(kLogfile2_1);
3654
3655 pi1->AlwaysStart<Ping>("ping");
3656
3657 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3658 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3659 // second boot.
3660 {
3661 LoggerState pi2_logger = MakeLoggerState(
3662 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3663
3664 event_loop_factory.RunFor(chrono::milliseconds(95));
3665
3666 pi2_logger.StartLogger(kLogfile2_1);
3667
3668 event_loop_factory.RunFor(chrono::milliseconds(4000));
3669
3670 pi2->Disconnect(pi1->node());
3671
3672 event_loop_factory.RunFor(chrono::milliseconds(1000));
3673 pi1->AlwaysStart<Ping>("ping");
3674
3675 event_loop_factory.RunFor(chrono::milliseconds(5000));
3676 pi2_logger.AppendAllFilenames(&filenames);
3677 }
3678
3679 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003680 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003681 ConfirmReadable(filenames);
3682}
3683
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003684// Tests that we properly handle only one direction ever existing after a
3685// reboot with only reliable data.
Naman Guptaa63aa132023-03-22 20:06:34 -07003686TEST(MissingDirectionTest, OneDirectionAfterRebootReliable) {
3687 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003688 aos::configuration::ReadConfig(
3689 ArtifactPath("aos/events/logging/"
3690 "multinode_pingpong_split4_reliable_config.json"));
Naman Guptaa63aa132023-03-22 20:06:34 -07003691 message_bridge::TestingTimeConverter time_converter(
3692 configuration::NodesCount(&config.message()));
3693 SimulatedEventLoopFactory event_loop_factory(&config.message());
3694 event_loop_factory.SetTimeConverter(&time_converter);
3695
3696 NodeEventLoopFactory *const pi1 =
3697 event_loop_factory.GetNodeEventLoopFactory("pi1");
3698 const size_t pi1_index = configuration::GetNodeIndex(
3699 event_loop_factory.configuration(), pi1->node());
3700 NodeEventLoopFactory *const pi2 =
3701 event_loop_factory.GetNodeEventLoopFactory("pi2");
3702 const size_t pi2_index = configuration::GetNodeIndex(
3703 event_loop_factory.configuration(), pi2->node());
3704 std::vector<std::string> filenames;
3705
3706 {
3707 CHECK_EQ(pi1_index, 0u);
3708 CHECK_EQ(pi2_index, 1u);
3709
3710 time_converter.AddNextTimestamp(
3711 distributed_clock::epoch(),
3712 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3713
3714 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3715 time_converter.AddNextTimestamp(
3716 distributed_clock::epoch() + reboot_time,
3717 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3718 BootTimestamp::epoch() + reboot_time});
3719 }
3720
3721 const std::string kLogfile2_1 =
3722 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3723 util::UnlinkRecursive(kLogfile2_1);
3724
3725 pi1->AlwaysStart<Ping>("ping");
3726
3727 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
3728 // makes it such that we will only get timestamps from pi1 -> pi2 on the
3729 // second boot.
3730 {
3731 LoggerState pi2_logger = MakeLoggerState(
3732 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3733
3734 event_loop_factory.RunFor(chrono::milliseconds(95));
3735
3736 pi2_logger.StartLogger(kLogfile2_1);
3737
3738 event_loop_factory.RunFor(chrono::milliseconds(4000));
3739
3740 pi2->Disconnect(pi1->node());
3741
3742 event_loop_factory.RunFor(chrono::milliseconds(1000));
3743 pi1->AlwaysStart<Ping>("ping");
3744
3745 event_loop_factory.RunFor(chrono::milliseconds(5000));
3746 pi2_logger.AppendAllFilenames(&filenames);
3747 }
3748
3749 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003750 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003751 ConfirmReadable(filenames);
3752}
3753
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003754// Tests that we properly handle only one direction ever existing after a
3755// reboot with mixed unreliable vs reliable, where reliable has an earlier
3756// timestamp than unreliable.
Brian Smartte67d7112023-03-20 12:06:30 -07003757TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase1) {
3758 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3759 aos::configuration::ReadConfig(ArtifactPath(
3760 "aos/events/logging/multinode_pingpong_split4_mixed1_config.json"));
3761 message_bridge::TestingTimeConverter time_converter(
3762 configuration::NodesCount(&config.message()));
3763 SimulatedEventLoopFactory event_loop_factory(&config.message());
3764 event_loop_factory.SetTimeConverter(&time_converter);
3765
3766 NodeEventLoopFactory *const pi1 =
3767 event_loop_factory.GetNodeEventLoopFactory("pi1");
3768 const size_t pi1_index = configuration::GetNodeIndex(
3769 event_loop_factory.configuration(), pi1->node());
3770 NodeEventLoopFactory *const pi2 =
3771 event_loop_factory.GetNodeEventLoopFactory("pi2");
3772 const size_t pi2_index = configuration::GetNodeIndex(
3773 event_loop_factory.configuration(), pi2->node());
3774 std::vector<std::string> filenames;
3775
3776 {
3777 CHECK_EQ(pi1_index, 0u);
3778 CHECK_EQ(pi2_index, 1u);
3779
3780 time_converter.AddNextTimestamp(
3781 distributed_clock::epoch(),
3782 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3783
3784 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3785 time_converter.AddNextTimestamp(
3786 distributed_clock::epoch() + reboot_time,
3787 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3788 BootTimestamp::epoch() + reboot_time});
3789 }
3790
3791 const std::string kLogfile2_1 =
3792 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3793 util::UnlinkRecursive(kLogfile2_1);
3794
3795 // The following sequence using the above reference config creates
3796 // a reliable message timestamp < unreliable message timestamp.
3797 {
3798 pi1->DisableStatistics();
3799 pi2->DisableStatistics();
3800
3801 event_loop_factory.RunFor(chrono::milliseconds(95));
3802
3803 pi1->AlwaysStart<Ping>("ping");
3804
3805 event_loop_factory.RunFor(chrono::milliseconds(5250));
3806
3807 pi1->EnableStatistics();
3808
3809 event_loop_factory.RunFor(chrono::milliseconds(1000));
3810
3811 LoggerState pi2_logger = MakeLoggerState(
3812 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3813
3814 pi2_logger.StartLogger(kLogfile2_1);
3815
3816 event_loop_factory.RunFor(chrono::milliseconds(5000));
3817 pi2_logger.AppendAllFilenames(&filenames);
3818 }
3819
3820 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003821 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Brian Smartte67d7112023-03-20 12:06:30 -07003822 ConfirmReadable(filenames);
3823}
3824
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003825// Tests that we properly handle only one direction ever existing after a
3826// reboot with mixed unreliable vs reliable, where unreliable has an earlier
3827// timestamp than reliable.
Brian Smartte67d7112023-03-20 12:06:30 -07003828TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase2) {
3829 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3830 aos::configuration::ReadConfig(ArtifactPath(
3831 "aos/events/logging/multinode_pingpong_split4_mixed2_config.json"));
3832 message_bridge::TestingTimeConverter time_converter(
3833 configuration::NodesCount(&config.message()));
3834 SimulatedEventLoopFactory event_loop_factory(&config.message());
3835 event_loop_factory.SetTimeConverter(&time_converter);
3836
3837 NodeEventLoopFactory *const pi1 =
3838 event_loop_factory.GetNodeEventLoopFactory("pi1");
3839 const size_t pi1_index = configuration::GetNodeIndex(
3840 event_loop_factory.configuration(), pi1->node());
3841 NodeEventLoopFactory *const pi2 =
3842 event_loop_factory.GetNodeEventLoopFactory("pi2");
3843 const size_t pi2_index = configuration::GetNodeIndex(
3844 event_loop_factory.configuration(), pi2->node());
3845 std::vector<std::string> filenames;
3846
3847 {
3848 CHECK_EQ(pi1_index, 0u);
3849 CHECK_EQ(pi2_index, 1u);
3850
3851 time_converter.AddNextTimestamp(
3852 distributed_clock::epoch(),
3853 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3854
3855 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3856 time_converter.AddNextTimestamp(
3857 distributed_clock::epoch() + reboot_time,
3858 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
3859 BootTimestamp::epoch() + reboot_time});
3860 }
3861
3862 const std::string kLogfile2_1 =
3863 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3864 util::UnlinkRecursive(kLogfile2_1);
3865
3866 // The following sequence using the above reference config creates
3867 // an unreliable message timestamp < reliable message timestamp.
3868 {
3869 pi1->DisableStatistics();
3870 pi2->DisableStatistics();
3871
3872 event_loop_factory.RunFor(chrono::milliseconds(95));
3873
3874 pi1->AlwaysStart<Ping>("ping");
3875
3876 event_loop_factory.RunFor(chrono::milliseconds(5250));
3877
3878 pi1->EnableStatistics();
3879
3880 event_loop_factory.RunFor(chrono::milliseconds(1000));
3881
3882 LoggerState pi2_logger = MakeLoggerState(
3883 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
3884
3885 pi2_logger.StartLogger(kLogfile2_1);
3886
3887 event_loop_factory.RunFor(chrono::milliseconds(5000));
3888 pi2_logger.AppendAllFilenames(&filenames);
3889 }
3890
3891 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003892 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Brian Smartte67d7112023-03-20 12:06:30 -07003893 ConfirmReadable(filenames);
3894}
3895
Naman Guptaa63aa132023-03-22 20:06:34 -07003896// Tests that we properly handle what used to be a time violation in one
3897// direction. This can occur when one direction goes down after sending some
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003898// data, but the other keeps working. The down direction ends up resolving to
3899// a straight line in the noncausal filter, where the direction which is still
3900// up can cross that line. Really, time progressed along just fine but we
3901// assumed that the offset was a line when it could have deviated by up to
3902// 1ms/second.
Naman Guptaa63aa132023-03-22 20:06:34 -07003903TEST_P(MultinodeLoggerTest, OneDirectionTimeDrift) {
3904 std::vector<std::string> filenames;
3905
3906 CHECK_EQ(pi1_index_, 0u);
3907 CHECK_EQ(pi2_index_, 1u);
3908
3909 time_converter_.AddNextTimestamp(
3910 distributed_clock::epoch(),
3911 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3912
3913 const chrono::nanoseconds before_disconnect_duration =
3914 time_converter_.AddMonotonic(
3915 {chrono::milliseconds(1000), chrono::milliseconds(1000)});
3916
3917 const chrono::nanoseconds test_duration =
3918 time_converter_.AddMonotonic(
3919 {chrono::milliseconds(1000), chrono::milliseconds(1000)}) +
3920 time_converter_.AddMonotonic(
3921 {chrono::milliseconds(10000),
3922 chrono::milliseconds(10000) - chrono::milliseconds(5)}) +
3923 time_converter_.AddMonotonic(
3924 {chrono::milliseconds(10000),
3925 chrono::milliseconds(10000) + chrono::milliseconds(5)});
3926
3927 const std::string kLogfile =
3928 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3929 util::UnlinkRecursive(kLogfile);
3930
3931 {
3932 LoggerState pi2_logger = MakeLogger(pi2_);
3933 pi2_logger.StartLogger(kLogfile);
3934 event_loop_factory_.RunFor(before_disconnect_duration);
3935
3936 pi2_->Disconnect(pi1_->node());
3937
3938 event_loop_factory_.RunFor(test_duration);
3939 pi2_->Connect(pi1_->node());
3940
3941 event_loop_factory_.RunFor(chrono::milliseconds(5000));
3942 pi2_logger.AppendAllFilenames(&filenames);
3943 }
3944
3945 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003946 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003947 ConfirmReadable(filenames);
3948}
3949
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003950// Tests that we can replay a logfile that has timestamps such that at least
3951// one node's epoch is at a positive distributed_clock (and thus will have to
3952// be booted after the other node(s)).
Naman Guptaa63aa132023-03-22 20:06:34 -07003953TEST_P(MultinodeLoggerTest, StartOneNodeBeforeOther) {
3954 std::vector<std::string> filenames;
3955
3956 CHECK_EQ(pi1_index_, 0u);
3957 CHECK_EQ(pi2_index_, 1u);
3958
3959 time_converter_.AddNextTimestamp(
3960 distributed_clock::epoch(),
3961 {BootTimestamp::epoch(), BootTimestamp::epoch()});
3962
3963 const chrono::nanoseconds before_reboot_duration = chrono::milliseconds(1000);
3964 time_converter_.RebootAt(
3965 0, distributed_clock::time_point(before_reboot_duration));
3966
3967 const chrono::nanoseconds test_duration = time_converter_.AddMonotonic(
3968 {chrono::milliseconds(10000), chrono::milliseconds(10000)});
3969
3970 const std::string kLogfile =
3971 aos::testing::TestTmpDir() + "/multi_logfile2.1/";
3972 util::UnlinkRecursive(kLogfile);
3973
3974 pi2_->Disconnect(pi1_->node());
3975 pi1_->Disconnect(pi2_->node());
3976
3977 {
3978 LoggerState pi2_logger = MakeLogger(pi2_);
3979
3980 pi2_logger.StartLogger(kLogfile);
3981 event_loop_factory_.RunFor(before_reboot_duration);
3982
3983 pi2_->Connect(pi1_->node());
3984 pi1_->Connect(pi2_->node());
3985
3986 event_loop_factory_.RunFor(test_duration);
3987
3988 pi2_logger.AppendAllFilenames(&filenames);
3989 }
3990
3991 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003992 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003993 ConfirmReadable(filenames);
3994
3995 {
3996 LogReader reader(sorted_parts);
3997 SimulatedEventLoopFactory replay_factory(reader.configuration());
3998 reader.RegisterWithoutStarting(&replay_factory);
3999
4000 NodeEventLoopFactory *const replay_node =
4001 reader.event_loop_factory()->GetNodeEventLoopFactory("pi1");
4002
4003 std::unique_ptr<EventLoop> test_event_loop =
4004 replay_node->MakeEventLoop("test_reader");
4005 replay_node->OnStartup([replay_node]() {
4006 // Check that we didn't boot until at least t=0.
4007 CHECK_LE(monotonic_clock::epoch(), replay_node->monotonic_now());
4008 });
4009 test_event_loop->OnRun([&test_event_loop]() {
4010 // Check that we didn't boot until at least t=0.
4011 EXPECT_LE(monotonic_clock::epoch(), test_event_loop->monotonic_now());
4012 });
4013 reader.event_loop_factory()->Run();
4014 reader.Deregister();
4015 }
4016}
4017
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004018// Tests that when we have a loop without all the logs at all points in time,
4019// we can sort it properly.
Naman Guptaa63aa132023-03-22 20:06:34 -07004020TEST(MultinodeLoggerLoopTest, Loop) {
4021 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004022 aos::configuration::ReadConfig(
4023 ArtifactPath("aos/events/logging/"
4024 "multinode_pingpong_triangle_split_config.json"));
Naman Guptaa63aa132023-03-22 20:06:34 -07004025 message_bridge::TestingTimeConverter time_converter(
4026 configuration::NodesCount(&config.message()));
4027 SimulatedEventLoopFactory event_loop_factory(&config.message());
4028 event_loop_factory.SetTimeConverter(&time_converter);
4029
4030 NodeEventLoopFactory *const pi1 =
4031 event_loop_factory.GetNodeEventLoopFactory("pi1");
4032 NodeEventLoopFactory *const pi2 =
4033 event_loop_factory.GetNodeEventLoopFactory("pi2");
4034 NodeEventLoopFactory *const pi3 =
4035 event_loop_factory.GetNodeEventLoopFactory("pi3");
4036
4037 const std::string kLogfile1_1 =
4038 aos::testing::TestTmpDir() + "/multi_logfile1/";
4039 const std::string kLogfile2_1 =
4040 aos::testing::TestTmpDir() + "/multi_logfile2/";
4041 const std::string kLogfile3_1 =
4042 aos::testing::TestTmpDir() + "/multi_logfile3/";
4043 util::UnlinkRecursive(kLogfile1_1);
4044 util::UnlinkRecursive(kLogfile2_1);
4045 util::UnlinkRecursive(kLogfile3_1);
4046
4047 {
4048 // Make pi1 boot before everything else.
4049 time_converter.AddNextTimestamp(
4050 distributed_clock::epoch(),
4051 {BootTimestamp::epoch(),
4052 BootTimestamp::epoch() - chrono::milliseconds(100),
4053 BootTimestamp::epoch() - chrono::milliseconds(300)});
4054 }
4055
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004056 // We want to setup a situation such that 2 of the 3 legs of the loop are
4057 // very confident about time being X, and the third leg is pulling the
4058 // average off to one side.
Naman Guptaa63aa132023-03-22 20:06:34 -07004059 //
4060 // It's easiest to visualize this in timestamp_plotter.
4061
4062 std::vector<std::string> filenames;
4063 {
4064 // Have pi1 send out a reliable message at startup. This sets up a long
4065 // forwarding time message at the start to bias time.
4066 std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
4067 {
4068 aos::Sender<examples::Ping> ping_sender =
4069 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
4070
4071 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
4072 examples::Ping::Builder ping_builder =
4073 builder.MakeBuilder<examples::Ping>();
4074 CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
4075 }
4076
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004077 // Wait a while so there's enough data to let the worst case be rather
4078 // off.
Naman Guptaa63aa132023-03-22 20:06:34 -07004079 event_loop_factory.RunFor(chrono::seconds(1000));
4080
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004081 // Now start a receiving node first. This sets up 2 tight bounds between
4082 // 2 of the nodes.
Naman Guptaa63aa132023-03-22 20:06:34 -07004083 LoggerState pi2_logger = MakeLoggerState(
4084 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4085 pi2_logger.StartLogger(kLogfile2_1);
4086
4087 event_loop_factory.RunFor(chrono::seconds(100));
4088
4089 // And now start the third leg.
4090 LoggerState pi3_logger = MakeLoggerState(
4091 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4092 pi3_logger.StartLogger(kLogfile3_1);
4093
4094 LoggerState pi1_logger = MakeLoggerState(
4095 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
4096 pi1_logger.StartLogger(kLogfile1_1);
4097
4098 event_loop_factory.RunFor(chrono::seconds(100));
4099
4100 pi1_logger.AppendAllFilenames(&filenames);
4101 pi2_logger.AppendAllFilenames(&filenames);
4102 pi3_logger.AppendAllFilenames(&filenames);
4103 }
4104
4105 // Make sure we can read this.
4106 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004107 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004108 auto result = ConfirmReadable(filenames);
4109}
4110
Austin Schuh08dba8f2023-05-01 08:29:30 -07004111// Tests that RestartLogging works in the simple case. Unfortunately, the
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004112// failure cases involve simulating time elapsing in callbacks, which is
4113// really hard. The best we can reasonably do is make sure 2 back to back
4114// logs are parseable together.
Austin Schuh08dba8f2023-05-01 08:29:30 -07004115TEST_P(MultinodeLoggerTest, RestartLogging) {
4116 time_converter_.AddMonotonic(
4117 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
4118 std::vector<std::string> filenames;
4119 {
4120 LoggerState pi1_logger = MakeLogger(pi1_);
4121
4122 event_loop_factory_.RunFor(chrono::milliseconds(95));
4123
4124 StartLogger(&pi1_logger, logfile_base1_);
4125 aos::monotonic_clock::time_point last_rotation_time =
4126 pi1_logger.event_loop->monotonic_now();
Austin Schuh2f864452023-07-17 14:53:08 -07004127 pi1_logger.logger->set_on_logged_period(
4128 [&](aos::monotonic_clock::time_point) {
4129 const auto now = pi1_logger.event_loop->monotonic_now();
4130 if (now > last_rotation_time + std::chrono::seconds(5)) {
4131 pi1_logger.AppendAllFilenames(&filenames);
4132 std::unique_ptr<MultiNodeFilesLogNamer> namer =
4133 pi1_logger.MakeLogNamer(logfile_base2_);
4134 pi1_logger.log_namer = namer.get();
Austin Schuh08dba8f2023-05-01 08:29:30 -07004135
Austin Schuh2f864452023-07-17 14:53:08 -07004136 pi1_logger.logger->RestartLogging(std::move(namer));
4137 last_rotation_time = now;
4138 }
4139 });
Austin Schuh08dba8f2023-05-01 08:29:30 -07004140
4141 event_loop_factory_.RunFor(chrono::milliseconds(7000));
4142
4143 pi1_logger.AppendAllFilenames(&filenames);
4144 }
4145
4146 for (const auto &x : filenames) {
4147 LOG(INFO) << x;
4148 }
4149
4150 EXPECT_GE(filenames.size(), 2u);
4151
4152 ConfirmReadable(filenames);
4153
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004154 // TODO(austin): It would be good to confirm that any one time messages end
4155 // up in both logs correctly.
Austin Schuh08dba8f2023-05-01 08:29:30 -07004156}
4157
Naman Guptaa63aa132023-03-22 20:06:34 -07004158} // namespace testing
4159} // namespace logger
4160} // namespace aos