blob: 1e26c3cfce1b44d8e0b09d819e82e59004054e5b [file] [log] [blame]
Brian Smartte67d7112023-03-20 12:06:30 -07001#include <algorithm>
2
Philipp Schrader790cb542023-07-05 21:06:52 -07003#include "gmock/gmock.h"
4#include "gtest/gtest.h"
5
Naman Guptaa63aa132023-03-22 20:06:34 -07006#include "aos/events/logging/log_reader.h"
7#include "aos/events/logging/multinode_logger_test_lib.h"
8#include "aos/events/message_counter.h"
9#include "aos/events/ping_lib.h"
10#include "aos/events/pong_lib.h"
11#include "aos/network/remote_message_generated.h"
12#include "aos/network/timestamp_generated.h"
13#include "aos/testing/tmpdir.h"
Naman Guptaa63aa132023-03-22 20:06:34 -070014
Stephan Pleinesf63bde82024-01-13 15:59:33 -080015namespace aos::logger::testing {
Naman Guptaa63aa132023-03-22 20:06:34 -070016
17namespace chrono = std::chrono;
18using aos::message_bridge::RemoteMessage;
19using aos::testing::ArtifactPath;
20using aos::testing::MessageCounter;
21
Naman Guptaa63aa132023-03-22 20:06:34 -070022INSTANTIATE_TEST_SUITE_P(
23 All, MultinodeLoggerTest,
24 ::testing::Combine(
25 ::testing::Values(
26 ConfigParams{"multinode_pingpong_combined_config.json", true,
Austin Schuh6ecfe902023-08-04 22:44:37 -070027 kCombinedConfigSha1(), kCombinedConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070028 FileStrategy::kKeepSeparate,
29 ForceTimestampBuffering::kForceBufferTimestamps},
30 ConfigParams{"multinode_pingpong_combined_config.json", true,
31 kCombinedConfigSha1(), kCombinedConfigSha1(),
32 FileStrategy::kKeepSeparate,
33 ForceTimestampBuffering::kAutoBuffer},
Naman Guptaa63aa132023-03-22 20:06:34 -070034 ConfigParams{"multinode_pingpong_split_config.json", false,
Austin Schuh6ecfe902023-08-04 22:44:37 -070035 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070036 FileStrategy::kKeepSeparate,
37 ForceTimestampBuffering::kForceBufferTimestamps},
Austin Schuh6ecfe902023-08-04 22:44:37 -070038 ConfigParams{"multinode_pingpong_split_config.json", false,
39 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070040 FileStrategy::kKeepSeparate,
41 ForceTimestampBuffering::kAutoBuffer},
42 ConfigParams{"multinode_pingpong_split_config.json", false,
43 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
44 FileStrategy::kCombine,
45 ForceTimestampBuffering::kForceBufferTimestamps},
46 ConfigParams{"multinode_pingpong_split_config.json", false,
47 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
48 FileStrategy::kCombine,
49 ForceTimestampBuffering::kAutoBuffer}),
Naman Guptaa63aa132023-03-22 20:06:34 -070050 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
51
52INSTANTIATE_TEST_SUITE_P(
53 All, MultinodeLoggerDeathTest,
54 ::testing::Combine(
55 ::testing::Values(
56 ConfigParams{"multinode_pingpong_combined_config.json", true,
Austin Schuh6ecfe902023-08-04 22:44:37 -070057 kCombinedConfigSha1(), kCombinedConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070058 FileStrategy::kKeepSeparate,
59 ForceTimestampBuffering::kForceBufferTimestamps},
60 ConfigParams{"multinode_pingpong_combined_config.json", true,
61 kCombinedConfigSha1(), kCombinedConfigSha1(),
62 FileStrategy::kKeepSeparate,
63 ForceTimestampBuffering::kAutoBuffer},
Naman Guptaa63aa132023-03-22 20:06:34 -070064 ConfigParams{"multinode_pingpong_split_config.json", false,
Austin Schuh6ecfe902023-08-04 22:44:37 -070065 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070066 FileStrategy::kKeepSeparate,
67 ForceTimestampBuffering::kForceBufferTimestamps},
Austin Schuh6ecfe902023-08-04 22:44:37 -070068 ConfigParams{"multinode_pingpong_split_config.json", false,
69 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070070 FileStrategy::kKeepSeparate,
71 ForceTimestampBuffering::kAutoBuffer},
72 ConfigParams{"multinode_pingpong_split_config.json", false,
73 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
74 FileStrategy::kCombine,
75 ForceTimestampBuffering::kForceBufferTimestamps},
76 ConfigParams{"multinode_pingpong_split_config.json", false,
77 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
78 FileStrategy::kCombine,
79 ForceTimestampBuffering::kAutoBuffer}),
Naman Guptaa63aa132023-03-22 20:06:34 -070080 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
81
Austin Schuh633858f2024-03-22 14:34:19 -070082// Class to spam Pong messages blindly.
83class PongSender {
84 public:
85 PongSender(EventLoop *loop, std::string_view channel_name)
86 : sender_(loop->MakeSender<examples::Pong>(channel_name)) {
87 loop->AddPhasedLoop(
88 [this](int) {
89 aos::Sender<examples::Pong>::Builder builder = sender_.MakeBuilder();
90 examples::Pong::Builder pong_builder =
91 builder.MakeBuilder<examples::Pong>();
92 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
93 },
94 chrono::milliseconds(10));
95 }
96
97 private:
98 aos::Sender<examples::Pong> sender_;
99};
100
101// Class to spam Ping messages blindly.
102class PingSender {
103 public:
104 PingSender(EventLoop *loop, std::string_view channel_name)
105 : sender_(loop->MakeSender<examples::Ping>(channel_name)) {
106 loop->AddPhasedLoop(
107 [this](int) {
108 aos::Sender<examples::Ping>::Builder builder = sender_.MakeBuilder();
109 examples::Ping::Builder ping_builder =
110 builder.MakeBuilder<examples::Ping>();
111 CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
112 },
113 chrono::milliseconds(10));
114 }
115
116 private:
117 aos::Sender<examples::Ping> sender_;
118};
119
Naman Guptaa63aa132023-03-22 20:06:34 -0700120// Tests that we can write and read simple multi-node log files.
121TEST_P(MultinodeLoggerTest, SimpleMultiNode) {
Austin Schuh6ecfe902023-08-04 22:44:37 -0700122 if (file_strategy() == FileStrategy::kCombine) {
123 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
124 }
125
Naman Guptaa63aa132023-03-22 20:06:34 -0700126 std::vector<std::string> actual_filenames;
127 time_converter_.StartEqual();
128
129 {
130 LoggerState pi1_logger = MakeLogger(pi1_);
131 LoggerState pi2_logger = MakeLogger(pi2_);
132
133 event_loop_factory_.RunFor(chrono::milliseconds(95));
134
135 StartLogger(&pi1_logger);
136 StartLogger(&pi2_logger);
137
138 event_loop_factory_.RunFor(chrono::milliseconds(20000));
139 pi1_logger.AppendAllFilenames(&actual_filenames);
140 pi2_logger.AppendAllFilenames(&actual_filenames);
141 }
142
143 ASSERT_THAT(actual_filenames,
144 ::testing::UnorderedElementsAreArray(logfiles_));
145
146 {
147 std::set<std::string> logfile_uuids;
148 std::set<std::string> parts_uuids;
149 // Confirm that we have the expected number of UUIDs for both the logfile
150 // UUIDs and parts UUIDs.
151 std::vector<SizePrefixedFlatbufferVector<LogFileHeader>> log_header;
152 for (std::string_view f : logfiles_) {
153 log_header.emplace_back(ReadHeader(f).value());
154 if (!log_header.back().message().has_configuration()) {
155 logfile_uuids.insert(
156 log_header.back().message().log_event_uuid()->str());
157 parts_uuids.insert(log_header.back().message().parts_uuid()->str());
158 }
159 }
160
161 EXPECT_EQ(logfile_uuids.size(), 2u);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700162 EXPECT_EQ(parts_uuids.size(), 8u);
Naman Guptaa63aa132023-03-22 20:06:34 -0700163
164 // And confirm everything is on the correct node.
165 EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi1");
166 EXPECT_EQ(log_header[3].message().node()->name()->string_view(), "pi1");
167 EXPECT_EQ(log_header[4].message().node()->name()->string_view(), "pi1");
168
169 EXPECT_EQ(log_header[5].message().node()->name()->string_view(), "pi2");
170 EXPECT_EQ(log_header[6].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700171 EXPECT_EQ(log_header[7].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700172
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700173 EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi1");
174 EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi1");
Naman Guptaa63aa132023-03-22 20:06:34 -0700175
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700176 EXPECT_EQ(log_header[10].message().node()->name()->string_view(), "pi2");
177 EXPECT_EQ(log_header[11].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700178
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700179 EXPECT_EQ(log_header[12].message().node()->name()->string_view(), "pi2");
180 EXPECT_EQ(log_header[13].message().node()->name()->string_view(), "pi2");
181 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700182
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700183 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi1");
184 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi1");
Naman Guptaa63aa132023-03-22 20:06:34 -0700185
186 // And the parts index matches.
187 EXPECT_EQ(log_header[2].message().parts_index(), 0);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700188
189 EXPECT_EQ(log_header[3].message().parts_index(), 0);
190 EXPECT_EQ(log_header[4].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700191
192 EXPECT_EQ(log_header[5].message().parts_index(), 0);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700193
194 EXPECT_EQ(log_header[6].message().parts_index(), 0);
195 EXPECT_EQ(log_header[7].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700196
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700197 EXPECT_EQ(log_header[8].message().parts_index(), 0);
198 EXPECT_EQ(log_header[9].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700199
200 EXPECT_EQ(log_header[10].message().parts_index(), 0);
201 EXPECT_EQ(log_header[11].message().parts_index(), 1);
202
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700203 EXPECT_EQ(log_header[12].message().parts_index(), 0);
204 EXPECT_EQ(log_header[13].message().parts_index(), 1);
205 EXPECT_EQ(log_header[14].message().parts_index(), 2);
Naman Guptaa63aa132023-03-22 20:06:34 -0700206
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700207 EXPECT_EQ(log_header[15].message().parts_index(), 0);
208 EXPECT_EQ(log_header[16].message().parts_index(), 1);
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700209
210 // And that the data_stored field is right.
211 EXPECT_THAT(*log_header[2].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700212 ::testing::ElementsAre(StoredDataType::DATA));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700213 EXPECT_THAT(*log_header[3].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700214 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700215 EXPECT_THAT(*log_header[4].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700216 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700217
218 EXPECT_THAT(*log_header[5].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700219 ::testing::ElementsAre(StoredDataType::DATA));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700220 EXPECT_THAT(*log_header[6].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700221 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700222 EXPECT_THAT(*log_header[7].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700223 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700224
225 EXPECT_THAT(*log_header[8].message().data_stored(),
226 ::testing::ElementsAre(StoredDataType::DATA));
227 EXPECT_THAT(*log_header[9].message().data_stored(),
228 ::testing::ElementsAre(StoredDataType::DATA));
229
230 EXPECT_THAT(*log_header[10].message().data_stored(),
231 ::testing::ElementsAre(StoredDataType::DATA));
232 EXPECT_THAT(*log_header[11].message().data_stored(),
233 ::testing::ElementsAre(StoredDataType::DATA));
234
235 EXPECT_THAT(*log_header[12].message().data_stored(),
236 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
237 EXPECT_THAT(*log_header[13].message().data_stored(),
238 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
239 EXPECT_THAT(*log_header[14].message().data_stored(),
240 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
241
242 EXPECT_THAT(*log_header[15].message().data_stored(),
243 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
244 EXPECT_THAT(*log_header[16].message().data_stored(),
245 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
Naman Guptaa63aa132023-03-22 20:06:34 -0700246 }
247
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700248 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
249 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -0700250 {
251 using ::testing::UnorderedElementsAre;
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700252 std::shared_ptr<const aos::Configuration> config = sorted_parts[0].config;
Naman Guptaa63aa132023-03-22 20:06:34 -0700253
254 // Timing reports, pings
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700255 if (shared()) {
256 EXPECT_THAT(
257 CountChannelsData(config, logfiles_[2]),
258 UnorderedElementsAre(
259 std::make_tuple("/pi1/aos", "aos.examples.Ping", 2001),
260 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
261 200),
262 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
263 21),
264 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
265 std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
266 std::make_tuple("/test", "aos.examples.Ping", 2001)))
267 << " : " << logfiles_[2];
268 } else {
269 EXPECT_THAT(
270 CountChannelsData(config, logfiles_[2]),
271 UnorderedElementsAre(
272 std::make_tuple("/pi1/aos", "aos.examples.Ping", 2001),
273 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
274 200),
275 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
276 21),
277 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
278 std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
279 std::make_tuple("/test", "aos.examples.Ping", 2001),
280 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
281 "aos-message_bridge-Timestamp",
282 "aos.message_bridge.RemoteMessage", 200)))
283 << " : " << logfiles_[2];
Naman Guptaa63aa132023-03-22 20:06:34 -0700284 }
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700285
286 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
287 ::testing::UnorderedElementsAre())
288 << " : " << logfiles_[3];
289 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
290 ::testing::UnorderedElementsAre())
291 << " : " << logfiles_[4];
292
Naman Guptaa63aa132023-03-22 20:06:34 -0700293 // Timestamps for pong
294 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
295 UnorderedElementsAre())
296 << " : " << logfiles_[2];
297 EXPECT_THAT(
298 CountChannelsTimestamp(config, logfiles_[3]),
299 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
300 << " : " << logfiles_[3];
301 EXPECT_THAT(
302 CountChannelsTimestamp(config, logfiles_[4]),
303 UnorderedElementsAre(
304 std::make_tuple("/test", "aos.examples.Pong", 2000),
305 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)))
306 << " : " << logfiles_[4];
307
Naman Guptaa63aa132023-03-22 20:06:34 -0700308 // Timing reports and pongs.
Naman Guptaa63aa132023-03-22 20:06:34 -0700309 EXPECT_THAT(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700310 CountChannelsData(config, logfiles_[5]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700311 UnorderedElementsAre(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700312 std::make_tuple("/pi2/aos", "aos.examples.Ping", 2001),
Naman Guptaa63aa132023-03-22 20:06:34 -0700313 std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
314 200),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700315 std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
316 21),
317 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700318 std::make_tuple("/pi2/aos", "aos.timing.Report", 60),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700319 std::make_tuple("/test", "aos.examples.Pong", 2001)))
320 << " : " << logfiles_[5];
321 EXPECT_THAT(CountChannelsData(config, logfiles_[6]), UnorderedElementsAre())
322 << " : " << logfiles_[6];
323 EXPECT_THAT(CountChannelsData(config, logfiles_[7]), UnorderedElementsAre())
Naman Guptaa63aa132023-03-22 20:06:34 -0700324 << " : " << logfiles_[7];
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700325 // And ping timestamps.
326 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
327 UnorderedElementsAre())
328 << " : " << logfiles_[5];
Naman Guptaa63aa132023-03-22 20:06:34 -0700329 EXPECT_THAT(
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700330 CountChannelsTimestamp(config, logfiles_[6]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700331 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700332 << " : " << logfiles_[6];
Naman Guptaa63aa132023-03-22 20:06:34 -0700333 EXPECT_THAT(
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700334 CountChannelsTimestamp(config, logfiles_[7]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700335 UnorderedElementsAre(
336 std::make_tuple("/test", "aos.examples.Ping", 2000),
337 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700338 << " : " << logfiles_[7];
Naman Guptaa63aa132023-03-22 20:06:34 -0700339
340 // And then test that the remotely logged timestamp data files only have
341 // timestamps in them.
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700342 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[8]),
343 UnorderedElementsAre())
344 << " : " << logfiles_[8];
345 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[9]),
346 UnorderedElementsAre())
347 << " : " << logfiles_[9];
Naman Guptaa63aa132023-03-22 20:06:34 -0700348 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[10]),
349 UnorderedElementsAre())
350 << " : " << logfiles_[10];
351 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[11]),
352 UnorderedElementsAre())
353 << " : " << logfiles_[11];
Naman Guptaa63aa132023-03-22 20:06:34 -0700354
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700355 EXPECT_THAT(CountChannelsData(config, logfiles_[8]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700356 UnorderedElementsAre(std::make_tuple(
357 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700358 << " : " << logfiles_[8];
359 EXPECT_THAT(CountChannelsData(config, logfiles_[9]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700360 UnorderedElementsAre(std::make_tuple(
361 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700362 << " : " << logfiles_[9];
Naman Guptaa63aa132023-03-22 20:06:34 -0700363
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700364 // Pong snd timestamp data.
365 EXPECT_THAT(
366 CountChannelsData(config, logfiles_[10]),
367 UnorderedElementsAre(
368 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 9),
369 std::make_tuple("/test", "aos.examples.Pong", 91)))
370 << " : " << logfiles_[10];
371 EXPECT_THAT(
372 CountChannelsData(config, logfiles_[11]),
373 UnorderedElementsAre(
374 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 191),
375 std::make_tuple("/test", "aos.examples.Pong", 1910)))
376 << " : " << logfiles_[11];
Naman Guptaa63aa132023-03-22 20:06:34 -0700377
378 // Timestamps from pi2 on pi1, and the other way.
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700379 // if (shared()) {
380 EXPECT_THAT(CountChannelsData(config, logfiles_[12]),
381 UnorderedElementsAre())
382 << " : " << logfiles_[12];
383 EXPECT_THAT(CountChannelsData(config, logfiles_[13]),
384 UnorderedElementsAre())
385 << " : " << logfiles_[13];
386 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
387 UnorderedElementsAre())
388 << " : " << logfiles_[14];
389 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
390 UnorderedElementsAre())
391 << " : " << logfiles_[15];
392 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
393 UnorderedElementsAre())
394 << " : " << logfiles_[16];
Naman Guptaa63aa132023-03-22 20:06:34 -0700395
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700396 EXPECT_THAT(
397 CountChannelsTimestamp(config, logfiles_[12]),
398 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
399 << " : " << logfiles_[12];
400 EXPECT_THAT(
401 CountChannelsTimestamp(config, logfiles_[13]),
402 UnorderedElementsAre(
403 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 9),
404 std::make_tuple("/test", "aos.examples.Ping", 90)))
405 << " : " << logfiles_[13];
406 EXPECT_THAT(
407 CountChannelsTimestamp(config, logfiles_[14]),
408 UnorderedElementsAre(
409 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 191),
410 std::make_tuple("/test", "aos.examples.Ping", 1910)))
411 << " : " << logfiles_[14];
412 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[15]),
413 UnorderedElementsAre(std::make_tuple(
414 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
415 << " : " << logfiles_[15];
416 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[16]),
417 UnorderedElementsAre(std::make_tuple(
418 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
419 << " : " << logfiles_[16];
Naman Guptaa63aa132023-03-22 20:06:34 -0700420 }
421
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700422 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -0700423
424 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
425 log_reader_factory.set_send_delay(chrono::microseconds(0));
426
427 // This sends out the fetched messages and advances time to the start of the
428 // log file.
429 reader.Register(&log_reader_factory);
430
431 const Node *pi1 =
432 configuration::GetNode(log_reader_factory.configuration(), "pi1");
433 const Node *pi2 =
434 configuration::GetNode(log_reader_factory.configuration(), "pi2");
435
436 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
437 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
438 LOG(INFO) << "now pi1 "
439 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
440 LOG(INFO) << "now pi2 "
441 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
442
443 EXPECT_THAT(reader.LoggedNodes(),
444 ::testing::ElementsAre(
445 configuration::GetNode(reader.logged_configuration(), pi1),
446 configuration::GetNode(reader.logged_configuration(), pi2)));
447
448 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
449
450 std::unique_ptr<EventLoop> pi1_event_loop =
451 log_reader_factory.MakeEventLoop("test", pi1);
452 std::unique_ptr<EventLoop> pi2_event_loop =
453 log_reader_factory.MakeEventLoop("test", pi2);
454
455 int pi1_ping_count = 10;
456 int pi2_ping_count = 10;
457 int pi1_pong_count = 10;
458 int pi2_pong_count = 10;
459
460 // Confirm that the ping value matches.
461 pi1_event_loop->MakeWatcher(
462 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
463 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping) << " at "
464 << pi1_event_loop->context().monotonic_remote_time << " -> "
465 << pi1_event_loop->context().monotonic_event_time;
466 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
467 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
468 pi1_ping_count * chrono::milliseconds(10) +
469 monotonic_clock::epoch());
470 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
471 pi1_ping_count * chrono::milliseconds(10) +
472 realtime_clock::epoch());
473 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
474 pi1_event_loop->context().monotonic_event_time);
475 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
476 pi1_event_loop->context().realtime_event_time);
Austin Schuhb5224ec2024-03-27 15:20:09 -0700477 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_transmit_time,
478 monotonic_clock::min_time);
Naman Guptaa63aa132023-03-22 20:06:34 -0700479
480 ++pi1_ping_count;
481 });
482 pi2_event_loop->MakeWatcher(
483 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
484 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping) << " at "
485 << pi2_event_loop->context().monotonic_remote_time << " -> "
486 << pi2_event_loop->context().monotonic_event_time;
487 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
488
489 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
490 pi2_ping_count * chrono::milliseconds(10) +
491 monotonic_clock::epoch());
492 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
493 pi2_ping_count * chrono::milliseconds(10) +
494 realtime_clock::epoch());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700495 // The message at the start of each second doesn't have wakeup latency
496 // since timing reports and server statistics wake us up already at that
497 // point in time.
498 chrono::nanoseconds offset = chrono::microseconds(150);
499 if (pi2_event_loop->context().monotonic_remote_time.time_since_epoch() %
500 chrono::seconds(1) ==
501 chrono::seconds(0)) {
502 offset = chrono::microseconds(100);
503 }
504 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time + offset,
Naman Guptaa63aa132023-03-22 20:06:34 -0700505 pi2_event_loop->context().monotonic_event_time);
Austin Schuhb5224ec2024-03-27 15:20:09 -0700506 EXPECT_EQ(pi2_event_loop->context().monotonic_event_time -
507 chrono::microseconds(100),
508 pi2_event_loop->context().monotonic_remote_transmit_time);
Austin Schuhac6d89e2024-03-27 14:56:09 -0700509 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time + offset,
Naman Guptaa63aa132023-03-22 20:06:34 -0700510 pi2_event_loop->context().realtime_event_time);
511 ++pi2_ping_count;
512 });
513
514 constexpr ssize_t kQueueIndexOffset = -9;
515 // Confirm that the ping and pong counts both match, and the value also
516 // matches.
Austin Schuhac6d89e2024-03-27 14:56:09 -0700517 pi1_event_loop->MakeWatcher("/test", [&pi1_event_loop, &pi1_ping_count,
518 &pi1_pong_count](
519 const examples::Pong &pong) {
520 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
521 << pi1_event_loop->context().monotonic_remote_time << " -> "
522 << pi1_event_loop->context().monotonic_event_time;
Naman Guptaa63aa132023-03-22 20:06:34 -0700523
Austin Schuhac6d89e2024-03-27 14:56:09 -0700524 EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
525 pi1_pong_count + kQueueIndexOffset);
Naman Guptaa63aa132023-03-22 20:06:34 -0700526
Austin Schuhac6d89e2024-03-27 14:56:09 -0700527 chrono::nanoseconds offset = chrono::microseconds(200);
528 if ((pi1_event_loop->context().monotonic_remote_time.time_since_epoch() -
529 chrono::microseconds(150)) %
530 chrono::seconds(1) ==
531 chrono::seconds(0)) {
532 offset = chrono::microseconds(150);
533 }
Naman Guptaa63aa132023-03-22 20:06:34 -0700534
Austin Schuhac6d89e2024-03-27 14:56:09 -0700535 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
536 offset + pi1_pong_count * chrono::milliseconds(10) +
537 monotonic_clock::epoch());
538 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
539 offset + pi1_pong_count * chrono::milliseconds(10) +
540 realtime_clock::epoch());
Naman Guptaa63aa132023-03-22 20:06:34 -0700541
Austin Schuhac6d89e2024-03-27 14:56:09 -0700542 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
543 chrono::microseconds(150),
544 pi1_event_loop->context().monotonic_event_time);
545 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
546 chrono::microseconds(150),
547 pi1_event_loop->context().realtime_event_time);
Austin Schuhb5224ec2024-03-27 15:20:09 -0700548 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_transmit_time,
549 pi1_event_loop->context().monotonic_event_time -
550 chrono::microseconds(100));
Naman Guptaa63aa132023-03-22 20:06:34 -0700551
Austin Schuhac6d89e2024-03-27 14:56:09 -0700552 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
553 ++pi1_pong_count;
554 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
555 });
556 pi2_event_loop->MakeWatcher("/test", [&pi2_event_loop, &pi2_ping_count,
557 &pi2_pong_count](
558 const examples::Pong &pong) {
559 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
560 << pi2_event_loop->context().monotonic_remote_time << " -> "
561 << pi2_event_loop->context().monotonic_event_time;
Naman Guptaa63aa132023-03-22 20:06:34 -0700562
Austin Schuhac6d89e2024-03-27 14:56:09 -0700563 EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
564 pi2_pong_count + kQueueIndexOffset);
Naman Guptaa63aa132023-03-22 20:06:34 -0700565
Austin Schuhac6d89e2024-03-27 14:56:09 -0700566 chrono::nanoseconds offset = chrono::microseconds(200);
567 if ((pi2_event_loop->context().monotonic_remote_time.time_since_epoch() -
568 chrono::microseconds(150)) %
569 chrono::seconds(1) ==
570 chrono::seconds(0)) {
571 offset = chrono::microseconds(150);
572 }
573
574 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
575 offset + pi2_pong_count * chrono::milliseconds(10) +
576 monotonic_clock::epoch());
577 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
578 offset + pi2_pong_count * chrono::milliseconds(10) +
579 realtime_clock::epoch());
580
581 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
582 pi2_event_loop->context().monotonic_event_time);
583 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
584 pi2_event_loop->context().realtime_event_time);
Austin Schuhb5224ec2024-03-27 15:20:09 -0700585 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_transmit_time,
586 monotonic_clock::min_time);
Austin Schuhac6d89e2024-03-27 14:56:09 -0700587
588 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
589 ++pi2_pong_count;
590 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
591 });
Naman Guptaa63aa132023-03-22 20:06:34 -0700592
593 log_reader_factory.Run();
594 EXPECT_EQ(pi1_ping_count, 2010);
595 EXPECT_EQ(pi2_ping_count, 2010);
596 EXPECT_EQ(pi1_pong_count, 2010);
597 EXPECT_EQ(pi2_pong_count, 2010);
598
599 reader.Deregister();
600}
601
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600602// MultinodeLoggerTest that tests the mutate callback works across multiple
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700603// nodes with remapping.
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600604TEST_P(MultinodeLoggerTest, MultiNodeRemapMutateCallback) {
605 time_converter_.StartEqual();
606 std::vector<std::string> actual_filenames;
607
608 {
609 LoggerState pi1_logger = MakeLogger(pi1_);
610 LoggerState pi2_logger = MakeLogger(pi2_);
611
612 event_loop_factory_.RunFor(chrono::milliseconds(95));
613
614 StartLogger(&pi1_logger);
615 StartLogger(&pi2_logger);
616
617 event_loop_factory_.RunFor(chrono::milliseconds(20000));
618 pi1_logger.AppendAllFilenames(&actual_filenames);
619 pi2_logger.AppendAllFilenames(&actual_filenames);
620 }
621
Austin Schuh8fb4b452023-08-04 17:02:27 -0700622 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700623 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600624
625 LogReader reader(sorted_parts, &config_.message());
626 // Remap just on pi1.
627 reader.RemapLoggedChannel<examples::Pong>(
628 "/test", configuration::GetNode(reader.configuration(), "pi1"));
629
630 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
631
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700632 int pong_count = 10;
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600633 // Adds a callback which mutates the value of the pong message before the
634 // message is sent which is the feature we are testing here
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700635 reader.AddBeforeSendCallback<aos::examples::Pong>(
636 "/test",
637 [&pong_count](
638 aos::examples::Pong *pong,
639 const TimestampedMessage &timestamped_message) -> SharedSpan {
640 pong->mutate_value(pong_count + 1);
641 ++pong_count;
642 return *timestamped_message.data;
643 });
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600644
645 // This sends out the fetched messages and advances time to the start of the
646 // log file.
647 reader.Register(&log_reader_factory);
648
649 const Node *pi1 =
650 configuration::GetNode(log_reader_factory.configuration(), "pi1");
651 const Node *pi2 =
652 configuration::GetNode(log_reader_factory.configuration(), "pi2");
653
654 EXPECT_THAT(reader.LoggedNodes(),
655 ::testing::ElementsAre(
656 configuration::GetNode(reader.logged_configuration(), pi1),
657 configuration::GetNode(reader.logged_configuration(), pi2)));
658
659 std::unique_ptr<EventLoop> pi1_event_loop =
660 log_reader_factory.MakeEventLoop("test", pi1);
661 std::unique_ptr<EventLoop> pi2_event_loop =
662 log_reader_factory.MakeEventLoop("test", pi2);
663
664 pi1_event_loop->MakeWatcher("/original/test",
665 [&pong_count](const examples::Pong &pong) {
666 EXPECT_EQ(pong_count, pong.value());
667 });
668
669 pi2_event_loop->MakeWatcher("/test",
670 [&pong_count](const examples::Pong &pong) {
671 EXPECT_EQ(pong_count, pong.value());
672 });
673
674 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
675 reader.Deregister();
676
677 EXPECT_EQ(pong_count, 2011);
678}
679
680// MultinodeLoggerTest that tests the mutate callback works across multiple
681// nodes
682TEST_P(MultinodeLoggerTest, MultiNodeMutateCallback) {
683 time_converter_.StartEqual();
684 std::vector<std::string> actual_filenames;
685
686 {
687 LoggerState pi1_logger = MakeLogger(pi1_);
688 LoggerState pi2_logger = MakeLogger(pi2_);
689
690 event_loop_factory_.RunFor(chrono::milliseconds(95));
691
692 StartLogger(&pi1_logger);
693 StartLogger(&pi2_logger);
694
695 event_loop_factory_.RunFor(chrono::milliseconds(20000));
696 pi1_logger.AppendAllFilenames(&actual_filenames);
697 pi2_logger.AppendAllFilenames(&actual_filenames);
698 }
699
Austin Schuh8fb4b452023-08-04 17:02:27 -0700700 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700701 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600702
703 LogReader reader(sorted_parts, &config_.message());
704
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700705 int pong_count = 10;
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600706 // Adds a callback which mutates the value of the pong message before the
707 // message is sent which is the feature we are testing here
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700708 reader.AddBeforeSendCallback<aos::examples::Pong>(
709 "/test",
710 [&pong_count](
711 aos::examples::Pong *pong,
712 const TimestampedMessage &timestamped_message) -> SharedSpan {
713 pong->mutate_value(pong_count + 1);
714 ++pong_count;
715 return *timestamped_message.data;
716 });
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600717
718 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
719
720 // This sends out the fetched messages and advances time to the start of the
721 // log file.
722 reader.Register(&log_reader_factory);
723
724 const Node *pi1 =
725 configuration::GetNode(log_reader_factory.configuration(), "pi1");
726 const Node *pi2 =
727 configuration::GetNode(log_reader_factory.configuration(), "pi2");
728
729 EXPECT_THAT(reader.LoggedNodes(),
730 ::testing::ElementsAre(
731 configuration::GetNode(reader.logged_configuration(), pi1),
732 configuration::GetNode(reader.logged_configuration(), pi2)));
733
734 std::unique_ptr<EventLoop> pi1_event_loop =
735 log_reader_factory.MakeEventLoop("test", pi1);
736 std::unique_ptr<EventLoop> pi2_event_loop =
737 log_reader_factory.MakeEventLoop("test", pi2);
738
739 pi1_event_loop->MakeWatcher("/test",
740 [&pong_count](const examples::Pong &pong) {
741 EXPECT_EQ(pong_count, pong.value());
742 });
743
744 pi2_event_loop->MakeWatcher("/test",
745 [&pong_count](const examples::Pong &pong) {
746 EXPECT_EQ(pong_count, pong.value());
747 });
748
749 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
750 reader.Deregister();
751
752 EXPECT_EQ(pong_count, 2011);
753}
754
755// Tests that the before send callback is only called from the sender node if it
756// is forwarded
757TEST_P(MultinodeLoggerTest, OnlyDoBeforeSendCallbackOnSenderNode) {
758 time_converter_.StartEqual();
Austin Schuh8fb4b452023-08-04 17:02:27 -0700759
760 std::vector<std::string> filenames;
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600761 {
762 LoggerState pi1_logger = MakeLogger(pi1_);
763 LoggerState pi2_logger = MakeLogger(pi2_);
764
765 event_loop_factory_.RunFor(chrono::milliseconds(95));
766
767 StartLogger(&pi1_logger);
768 StartLogger(&pi2_logger);
769
770 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -0700771
772 pi1_logger.AppendAllFilenames(&filenames);
773 pi2_logger.AppendAllFilenames(&filenames);
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600774 }
775
Austin Schuh8fb4b452023-08-04 17:02:27 -0700776 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700777 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
778 LogReader reader(sorted_parts);
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600779
780 int ping_count = 0;
781 // Adds a callback which mutates the value of the pong message before the
782 // message is sent which is the feature we are testing here
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700783 reader.AddBeforeSendCallback<aos::examples::Ping>(
784 "/test",
785 [&ping_count](
786 aos::examples::Ping *ping,
787 const TimestampedMessage &timestamped_message) -> SharedSpan {
788 ++ping_count;
789 ping->mutate_value(ping_count);
790 return *timestamped_message.data;
791 });
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600792
793 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
794 log_reader_factory.set_send_delay(chrono::microseconds(0));
795
796 reader.Register(&log_reader_factory);
797
798 const Node *pi1 =
799 configuration::GetNode(log_reader_factory.configuration(), "pi1");
800 const Node *pi2 =
801 configuration::GetNode(log_reader_factory.configuration(), "pi2");
802
803 std::unique_ptr<EventLoop> pi1_event_loop =
804 log_reader_factory.MakeEventLoop("test", pi1);
805 pi1_event_loop->SkipTimingReport();
806 std::unique_ptr<EventLoop> pi2_event_loop =
807 log_reader_factory.MakeEventLoop("test", pi2);
808 pi2_event_loop->SkipTimingReport();
809
810 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
811 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
812
813 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
814 pi1_ping_timestamp;
815 if (!shared()) {
816 pi1_ping_timestamp =
817 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
818 pi1_event_loop.get(),
819 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
820 }
821
822 log_reader_factory.Run();
823
824 EXPECT_EQ(pi1_ping.count(), 2000u);
825 EXPECT_EQ(pi2_ping.count(), 2000u);
826 // If the BeforeSendCallback is called on both nodes, then the ping count
827 // would be 4002 instead of 2001
828 EXPECT_EQ(ping_count, 2001u);
829 if (!shared()) {
830 EXPECT_EQ(pi1_ping_timestamp->count(), 2000u);
831 }
832
833 reader.Deregister();
834}
835
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700836// MultinodeLoggerTest that tests the mutate callback can fully replace the
837// message.
838TEST_P(MultinodeLoggerTest, MultiNodeMutateCallbackReplacement) {
839 time_converter_.StartEqual();
840 std::vector<std::string> actual_filenames;
841
842 {
843 LoggerState pi1_logger = MakeLogger(pi1_);
844 LoggerState pi2_logger = MakeLogger(pi2_);
845
846 event_loop_factory_.RunFor(chrono::milliseconds(95));
847
848 StartLogger(&pi1_logger);
849 StartLogger(&pi2_logger);
850
851 event_loop_factory_.RunFor(chrono::milliseconds(20000));
852 pi1_logger.AppendAllFilenames(&actual_filenames);
853 pi2_logger.AppendAllFilenames(&actual_filenames);
854 }
855
856 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
857 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
858
859 LogReader reader(sorted_parts, &config_.message());
860
861 int pong_count = 10;
862 const uint8_t *data_ptr = nullptr;
863 // Adds a callback which replaces the pong message before the message is sent.
864 reader.AddBeforeSendCallback<aos::examples::Pong>(
865 "/test",
866 [&pong_count, &data_ptr](aos::examples::Pong *pong,
867 const TimestampedMessage &) -> SharedSpan {
868 fbs::AlignedVectorAllocator allocator;
869 aos::fbs::Builder<aos::examples::PongStatic> pong_static(&allocator);
870 CHECK(pong_static->FromFlatbuffer(*pong));
871
872 pong_static->set_value(pong_count + 101);
873 ++pong_count;
874
875 SharedSpan result = allocator.Release();
876
877 data_ptr = result->data();
878
879 return result;
880 });
881
882 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
883
884 // This sends out the fetched messages and advances time to the start of the
885 // log file.
886 reader.Register(&log_reader_factory);
887
888 const Node *pi1 =
889 configuration::GetNode(log_reader_factory.configuration(), "pi1");
890 const Node *pi2 =
891 configuration::GetNode(log_reader_factory.configuration(), "pi2");
892
893 EXPECT_THAT(reader.LoggedNodes(),
894 ::testing::ElementsAre(
895 configuration::GetNode(reader.logged_configuration(), pi1),
896 configuration::GetNode(reader.logged_configuration(), pi2)));
897
898 std::unique_ptr<EventLoop> pi1_event_loop =
899 log_reader_factory.MakeEventLoop("test", pi1);
900 std::unique_ptr<EventLoop> pi2_event_loop =
901 log_reader_factory.MakeEventLoop("test", pi2);
902
903 int pi1_pong_count = 10;
904 pi1_event_loop->MakeWatcher(
905 "/test", [&pi1_event_loop, &pong_count, &pi1_pong_count,
906 &data_ptr](const examples::Pong &pong) {
907 ++pi1_pong_count;
908 // Since simulated event loops (especially log replay) refcount the
909 // shared data, we can verify if the right data got published by
910 // verifying that the actual pointer to the flatbuffer matches. This
911 // only is guarenteed to hold during this callback.
912 EXPECT_EQ(pi1_event_loop->context().data, data_ptr);
913 EXPECT_EQ(pong_count + 100, pong.value());
914 EXPECT_EQ(pi1_pong_count + 101, pong.value());
915 });
916
917 pi2_event_loop->MakeWatcher("/test", [&pi2_event_loop, &pong_count,
918 &data_ptr](const examples::Pong &pong) {
919 // Same goes for the forwarded side, that should be the same contents too.
920 EXPECT_EQ(pi2_event_loop->context().data, data_ptr);
921 EXPECT_EQ(pong_count + 100, pong.value());
922 });
923
924 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
925 reader.Deregister();
926
927 EXPECT_EQ(pong_count, 2011);
928}
929
930// MultinodeLoggerTest that tests the mutate callback can delete messages by
931// returning nullptr.
932TEST_P(MultinodeLoggerTest, MultiNodeMutateCallbackDelete) {
933 time_converter_.StartEqual();
934 std::vector<std::string> actual_filenames;
935
936 {
937 LoggerState pi1_logger = MakeLogger(pi1_);
938 LoggerState pi2_logger = MakeLogger(pi2_);
939
940 event_loop_factory_.RunFor(chrono::milliseconds(95));
941
942 StartLogger(&pi1_logger);
943 StartLogger(&pi2_logger);
944
945 event_loop_factory_.RunFor(chrono::milliseconds(20000));
946 pi1_logger.AppendAllFilenames(&actual_filenames);
947 pi2_logger.AppendAllFilenames(&actual_filenames);
948 }
949
950 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
951 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
952
953 LogReader reader(sorted_parts, &config_.message());
954
955 int pong_count = 10;
956 const uint8_t *data_ptr = nullptr;
957 // Adds a callback which mutates the value of the pong message before the
958 // message is sent which is the feature we are testing here
959 reader.AddBeforeSendCallback<aos::examples::Pong>(
960 "/test",
961 [&pong_count, &data_ptr](aos::examples::Pong *pong,
962 const TimestampedMessage &) -> SharedSpan {
963 fbs::AlignedVectorAllocator allocator;
964 aos::fbs::Builder<aos::examples::PongStatic> pong_static(&allocator);
965 CHECK(pong_static->FromFlatbuffer(*pong));
966
967 pong_static->set_value(pong_count + 101);
968 ++pong_count;
969
970 if ((pong_count % 2) == 0) {
971 data_ptr = nullptr;
972 return nullptr;
973 }
974
975 SharedSpan result = allocator.Release();
976
977 data_ptr = result->data();
978
979 return result;
980 });
981
982 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
983
984 // This sends out the fetched messages and advances time to the start of the
985 // log file.
986 reader.Register(&log_reader_factory);
987
988 const Node *pi1 =
989 configuration::GetNode(log_reader_factory.configuration(), "pi1");
990 const Node *pi2 =
991 configuration::GetNode(log_reader_factory.configuration(), "pi2");
992
993 EXPECT_THAT(reader.LoggedNodes(),
994 ::testing::ElementsAre(
995 configuration::GetNode(reader.logged_configuration(), pi1),
996 configuration::GetNode(reader.logged_configuration(), pi2)));
997
998 std::unique_ptr<EventLoop> pi1_event_loop =
999 log_reader_factory.MakeEventLoop("test", pi1);
1000 std::unique_ptr<EventLoop> pi2_event_loop =
1001 log_reader_factory.MakeEventLoop("test", pi2);
1002
1003 int pi1_pong_count = 10;
1004 pi1_event_loop->MakeWatcher(
1005 "/test", [&pi1_event_loop, &pong_count, &pi1_pong_count,
1006 &data_ptr](const examples::Pong &pong) {
1007 pi1_pong_count += 2;
1008 // Since simulated event loops (especially log replay) refcount the
1009 // shared data, we can verify if the right data got published by
1010 // verifying that the actual pointer to the flatbuffer matches. This
1011 // only is guarenteed to hold during this callback.
1012 EXPECT_EQ(pi1_event_loop->context().data, data_ptr);
1013 EXPECT_EQ(pong_count + 100, pong.value());
1014 EXPECT_EQ(pi1_pong_count + 101, pong.value());
1015 });
1016
1017 pi2_event_loop->MakeWatcher("/test", [&pi2_event_loop, &pong_count,
1018 &data_ptr](const examples::Pong &pong) {
1019 // Same goes for the forwarded side, that should be the same contents too.
1020 EXPECT_EQ(pi2_event_loop->context().data, data_ptr);
1021 EXPECT_EQ(pong_count + 100, pong.value());
1022 });
1023
1024 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
1025 reader.Deregister();
1026
1027 EXPECT_EQ(pong_count, 2011);
1028 // Since we count up by 2 each time we get a message, and the last pong gets
1029 // dropped since it is an odd number we expect the number on pi1 to be 1 less.
1030 EXPECT_EQ(pi1_pong_count, 2010);
1031}
1032
1033// MultinodeLoggerTest that tests that non-forwarded channels are able to be
1034// mutated.
1035TEST_P(MultinodeLoggerTest, MultiNodeMutateCallbackNotForwarded) {
1036 time_converter_.StartEqual();
1037 std::vector<std::string> actual_filenames;
1038
1039 {
1040 LoggerState pi1_logger = MakeLogger(pi1_);
1041 LoggerState pi2_logger = MakeLogger(pi2_);
1042
1043 event_loop_factory_.RunFor(chrono::milliseconds(95));
1044
1045 StartLogger(&pi1_logger);
1046 StartLogger(&pi2_logger);
1047
1048 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1049 pi1_logger.AppendAllFilenames(&actual_filenames);
1050 pi2_logger.AppendAllFilenames(&actual_filenames);
1051 }
1052
1053 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1054 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1055
1056 LogReader reader(sorted_parts, &config_.message());
1057
1058 int ping_count = 10;
1059 const uint8_t *data_ptr = nullptr;
1060 // Adds a callback which mutates the value of the pong message before the
1061 // message is sent which is the feature we are testing here
1062 reader.AddBeforeSendCallback<aos::examples::Ping>(
1063 "/pi1/aos",
1064 [&ping_count, &data_ptr](aos::examples::Ping *ping,
1065 const TimestampedMessage &) -> SharedSpan {
1066 fbs::AlignedVectorAllocator allocator;
1067 aos::fbs::Builder<aos::examples::PingStatic> ping_static(&allocator);
1068 CHECK(ping_static->FromFlatbuffer(*ping));
1069
1070 ping_static->set_value(ping_count + 101);
1071 ++ping_count;
1072
1073 SharedSpan result = allocator.Release();
1074
1075 data_ptr = result->data();
1076
1077 return result;
1078 });
1079
1080 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1081
1082 // This sends out the fetched messages and advances time to the start of the
1083 // log file.
1084 reader.Register(&log_reader_factory);
1085
1086 const Node *pi1 =
1087 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1088 const Node *pi2 =
1089 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1090
1091 EXPECT_THAT(reader.LoggedNodes(),
1092 ::testing::ElementsAre(
1093 configuration::GetNode(reader.logged_configuration(), pi1),
1094 configuration::GetNode(reader.logged_configuration(), pi2)));
1095
1096 std::unique_ptr<EventLoop> pi1_event_loop =
1097 log_reader_factory.MakeEventLoop("test", pi1);
1098 std::unique_ptr<EventLoop> pi2_event_loop =
1099 log_reader_factory.MakeEventLoop("test", pi2);
1100
1101 int pi1_ping_count = 10;
1102 pi1_event_loop->MakeWatcher(
1103 "/aos", [&pi1_event_loop, &ping_count, &pi1_ping_count,
1104 &data_ptr](const examples::Ping &ping) {
1105 ++pi1_ping_count;
1106 // Since simulated event loops (especially log replay) refcount the
1107 // shared data, we can verify if the right data got published by
1108 // verifying that the actual pointer to the flatbuffer matches. This
1109 // only is guarenteed to hold during this callback.
1110 EXPECT_EQ(pi1_event_loop->context().data, data_ptr);
1111 EXPECT_EQ(ping_count + 100, ping.value());
1112 EXPECT_EQ(pi1_ping_count + 101, ping.value());
1113 });
1114
1115 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
1116 reader.Deregister();
1117
1118 EXPECT_EQ(ping_count, 2011);
1119}
1120
Eric Schmiedebergae00e732023-04-12 15:53:17 -06001121// Tests that we do not allow adding callbacks after Register is called
1122TEST_P(MultinodeLoggerDeathTest, AddCallbackAfterRegister) {
1123 time_converter_.StartEqual();
1124 std::vector<std::string> actual_filenames;
1125
1126 {
1127 LoggerState pi1_logger = MakeLogger(pi1_);
1128 LoggerState pi2_logger = MakeLogger(pi2_);
1129
1130 event_loop_factory_.RunFor(chrono::milliseconds(95));
1131
1132 StartLogger(&pi1_logger);
1133 StartLogger(&pi2_logger);
1134
1135 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1136 pi1_logger.AppendAllFilenames(&actual_filenames);
1137 pi2_logger.AppendAllFilenames(&actual_filenames);
1138 }
1139
Austin Schuh8fb4b452023-08-04 17:02:27 -07001140 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001141 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -06001142
1143 LogReader reader(sorted_parts, &config_.message());
1144 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1145 reader.Register(&log_reader_factory);
1146 EXPECT_DEATH(
1147 {
Austin Schuh3c9f92c2024-04-30 17:56:42 -07001148 reader.AddBeforeSendCallback<aos::examples::Pong>(
1149 "/test",
1150 [](aos::examples::Pong *,
1151 const TimestampedMessage &timestamped_message) -> SharedSpan {
1152 LOG(FATAL) << "This should not be called";
1153 return *timestamped_message.data;
1154 });
Eric Schmiedebergae00e732023-04-12 15:53:17 -06001155 },
1156 "Cannot add callbacks after calling Register");
1157 reader.Deregister();
1158}
1159
Naman Guptaa63aa132023-03-22 20:06:34 -07001160// Test that if we feed the replay with a mismatched node list that we die on
1161// the LogReader constructor.
1162TEST_P(MultinodeLoggerDeathTest, MultiNodeBadReplayConfig) {
1163 time_converter_.StartEqual();
Austin Schuh8fb4b452023-08-04 17:02:27 -07001164
1165 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -07001166 {
1167 LoggerState pi1_logger = MakeLogger(pi1_);
1168 LoggerState pi2_logger = MakeLogger(pi2_);
1169
1170 event_loop_factory_.RunFor(chrono::milliseconds(95));
1171
1172 StartLogger(&pi1_logger);
1173 StartLogger(&pi2_logger);
1174
1175 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07001176
1177 pi1_logger.AppendAllFilenames(&filenames);
1178 pi2_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001179 }
1180
1181 // Test that, if we add an additional node to the replay config that the
1182 // logger complains about the mismatch in number of nodes.
1183 FlatbufferDetachedBuffer<Configuration> extra_nodes_config =
1184 configuration::MergeWithConfig(&config_.message(), R"({
1185 "nodes": [
1186 {
1187 "name": "extra-node"
1188 }
1189 ]
1190 }
1191 )");
1192
Austin Schuh8fb4b452023-08-04 17:02:27 -07001193 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001194 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07001195 EXPECT_DEATH(LogReader(sorted_parts, &extra_nodes_config.message()),
1196 "Log file and replay config need to have matching nodes lists.");
1197}
1198
1199// Tests that we can read log files where they don't start at the same monotonic
1200// time.
1201TEST_P(MultinodeLoggerTest, StaggeredStart) {
1202 time_converter_.StartEqual();
1203 std::vector<std::string> actual_filenames;
1204
1205 {
1206 LoggerState pi1_logger = MakeLogger(pi1_);
1207 LoggerState pi2_logger = MakeLogger(pi2_);
1208
1209 event_loop_factory_.RunFor(chrono::milliseconds(95));
1210
1211 StartLogger(&pi1_logger);
1212
1213 event_loop_factory_.RunFor(chrono::milliseconds(200));
1214
1215 StartLogger(&pi2_logger);
1216
1217 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1218 pi1_logger.AppendAllFilenames(&actual_filenames);
1219 pi2_logger.AppendAllFilenames(&actual_filenames);
1220 }
1221
1222 // Since we delay starting pi2, it already knows about all the timestamps so
1223 // we don't end up with extra parts.
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001224 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1225 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1226 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001227
1228 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1229 log_reader_factory.set_send_delay(chrono::microseconds(0));
1230
1231 // This sends out the fetched messages and advances time to the start of the
1232 // log file.
1233 reader.Register(&log_reader_factory);
1234
1235 const Node *pi1 =
1236 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1237 const Node *pi2 =
1238 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1239
1240 EXPECT_THAT(reader.LoggedNodes(),
1241 ::testing::ElementsAre(
1242 configuration::GetNode(reader.logged_configuration(), pi1),
1243 configuration::GetNode(reader.logged_configuration(), pi2)));
1244
1245 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1246
1247 std::unique_ptr<EventLoop> pi1_event_loop =
1248 log_reader_factory.MakeEventLoop("test", pi1);
1249 std::unique_ptr<EventLoop> pi2_event_loop =
1250 log_reader_factory.MakeEventLoop("test", pi2);
1251
1252 int pi1_ping_count = 30;
1253 int pi2_ping_count = 30;
1254 int pi1_pong_count = 30;
1255 int pi2_pong_count = 30;
1256
1257 // Confirm that the ping value matches.
1258 pi1_event_loop->MakeWatcher(
1259 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
1260 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
1261 << pi1_event_loop->context().monotonic_remote_time << " -> "
1262 << pi1_event_loop->context().monotonic_event_time;
1263 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
1264
1265 ++pi1_ping_count;
1266 });
1267 pi2_event_loop->MakeWatcher(
1268 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
1269 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
1270 << pi2_event_loop->context().monotonic_remote_time << " -> "
1271 << pi2_event_loop->context().monotonic_event_time;
1272 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
1273
1274 ++pi2_ping_count;
1275 });
1276
1277 // Confirm that the ping and pong counts both match, and the value also
1278 // matches.
1279 pi1_event_loop->MakeWatcher(
1280 "/test", [&pi1_event_loop, &pi1_ping_count,
1281 &pi1_pong_count](const examples::Pong &pong) {
1282 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
1283 << pi1_event_loop->context().monotonic_remote_time << " -> "
1284 << pi1_event_loop->context().monotonic_event_time;
1285
1286 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
1287 ++pi1_pong_count;
1288 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
1289 });
1290 pi2_event_loop->MakeWatcher(
1291 "/test", [&pi2_event_loop, &pi2_ping_count,
1292 &pi2_pong_count](const examples::Pong &pong) {
1293 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
1294 << pi2_event_loop->context().monotonic_remote_time << " -> "
1295 << pi2_event_loop->context().monotonic_event_time;
1296
1297 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
1298 ++pi2_pong_count;
1299 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
1300 });
1301
1302 log_reader_factory.Run();
1303 EXPECT_EQ(pi1_ping_count, 2030);
1304 EXPECT_EQ(pi2_ping_count, 2030);
1305 EXPECT_EQ(pi1_pong_count, 2030);
1306 EXPECT_EQ(pi2_pong_count, 2030);
1307
1308 reader.Deregister();
1309}
1310
1311// Tests that we can read log files where the monotonic clocks drift and don't
1312// match correctly. While we are here, also test that different ending times
1313// also is readable.
1314TEST_P(MultinodeLoggerTest, MismatchedClocks) {
1315 // TODO(austin): Negate...
1316 const chrono::nanoseconds initial_pi2_offset = chrono::seconds(1000);
1317
1318 time_converter_.AddMonotonic(
1319 {BootTimestamp::epoch(), BootTimestamp::epoch() + initial_pi2_offset});
1320 // Wait for 95 ms, (~0.1 seconds - 1/2 of the ping/pong period), and set the
1321 // skew to be 200 uS/s
1322 const chrono::nanoseconds startup_sleep1 = time_converter_.AddMonotonic(
1323 {chrono::milliseconds(95),
1324 chrono::milliseconds(95) - chrono::nanoseconds(200) * 95});
1325 // Run another 200 ms to have one logger start first.
1326 const chrono::nanoseconds startup_sleep2 = time_converter_.AddMonotonic(
1327 {chrono::milliseconds(200), chrono::milliseconds(200)});
1328 // Slew one way then the other at the same 200 uS/S slew rate. Make sure we
1329 // go far enough to cause problems if this isn't accounted for.
1330 const chrono::nanoseconds logger_run1 = time_converter_.AddMonotonic(
1331 {chrono::milliseconds(20000),
1332 chrono::milliseconds(20000) - chrono::nanoseconds(200) * 20000});
1333 const chrono::nanoseconds logger_run2 = time_converter_.AddMonotonic(
1334 {chrono::milliseconds(40000),
1335 chrono::milliseconds(40000) + chrono::nanoseconds(200) * 40000});
1336 const chrono::nanoseconds logger_run3 = time_converter_.AddMonotonic(
1337 {chrono::milliseconds(400), chrono::milliseconds(400)});
1338
Austin Schuh8fb4b452023-08-04 17:02:27 -07001339 std::vector<std::string> actual_filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -07001340 {
1341 LoggerState pi2_logger = MakeLogger(pi2_);
1342
1343 LOG(INFO) << "pi2 times: " << pi2_->monotonic_now() << " "
1344 << pi2_->realtime_now() << " distributed "
1345 << pi2_->ToDistributedClock(pi2_->monotonic_now());
1346
1347 LOG(INFO) << "pi2_ times: " << pi2_->monotonic_now() << " "
1348 << pi2_->realtime_now() << " distributed "
1349 << pi2_->ToDistributedClock(pi2_->monotonic_now());
1350
1351 event_loop_factory_.RunFor(startup_sleep1);
1352
1353 StartLogger(&pi2_logger);
1354
1355 event_loop_factory_.RunFor(startup_sleep2);
1356
1357 {
1358 // Run pi1's logger for only part of the time.
1359 LoggerState pi1_logger = MakeLogger(pi1_);
1360
1361 StartLogger(&pi1_logger);
1362 event_loop_factory_.RunFor(logger_run1);
1363
1364 // Make sure we slewed time far enough so that the difference is greater
1365 // than the network delay. This confirms that if we sort incorrectly, it
1366 // would show in the results.
1367 EXPECT_LT(
1368 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
1369 -event_loop_factory_.send_delay() -
1370 event_loop_factory_.network_delay());
1371
1372 event_loop_factory_.RunFor(logger_run2);
1373
1374 // And now check that we went far enough the other way to make sure we
1375 // cover both problems.
1376 EXPECT_GT(
1377 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
1378 event_loop_factory_.send_delay() +
1379 event_loop_factory_.network_delay());
Austin Schuh8fb4b452023-08-04 17:02:27 -07001380
1381 pi1_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001382 }
1383
1384 // And log a bit more on pi2.
1385 event_loop_factory_.RunFor(logger_run3);
Austin Schuh8fb4b452023-08-04 17:02:27 -07001386
1387 pi2_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001388 }
1389
Austin Schuh8fb4b452023-08-04 17:02:27 -07001390 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001391 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1392 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001393
1394 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1395 log_reader_factory.set_send_delay(chrono::microseconds(0));
1396
1397 const Node *pi1 =
1398 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1399 const Node *pi2 =
1400 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1401
1402 // This sends out the fetched messages and advances time to the start of the
1403 // log file.
1404 reader.Register(&log_reader_factory);
1405
1406 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
1407 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
1408 LOG(INFO) << "now pi1 "
1409 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1410 LOG(INFO) << "now pi2 "
1411 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1412
1413 LOG(INFO) << "Done registering (pi1) "
1414 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now()
1415 << " "
1416 << log_reader_factory.GetNodeEventLoopFactory(pi1)->realtime_now();
1417 LOG(INFO) << "Done registering (pi2) "
1418 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now()
1419 << " "
1420 << log_reader_factory.GetNodeEventLoopFactory(pi2)->realtime_now();
1421
1422 EXPECT_THAT(reader.LoggedNodes(),
1423 ::testing::ElementsAre(
1424 configuration::GetNode(reader.logged_configuration(), pi1),
1425 configuration::GetNode(reader.logged_configuration(), pi2)));
1426
1427 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1428
1429 std::unique_ptr<EventLoop> pi1_event_loop =
1430 log_reader_factory.MakeEventLoop("test", pi1);
1431 std::unique_ptr<EventLoop> pi2_event_loop =
1432 log_reader_factory.MakeEventLoop("test", pi2);
1433
1434 int pi1_ping_count = 30;
1435 int pi2_ping_count = 30;
1436 int pi1_pong_count = 30;
1437 int pi2_pong_count = 30;
1438
1439 // Confirm that the ping value matches.
1440 pi1_event_loop->MakeWatcher(
1441 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
1442 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
1443 << pi1_event_loop->context().monotonic_remote_time << " -> "
1444 << pi1_event_loop->context().monotonic_event_time;
1445 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
1446
1447 ++pi1_ping_count;
1448 });
1449 pi2_event_loop->MakeWatcher(
1450 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
1451 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
1452 << pi2_event_loop->context().monotonic_remote_time << " -> "
1453 << pi2_event_loop->context().monotonic_event_time;
1454 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
1455
1456 ++pi2_ping_count;
1457 });
1458
1459 // Confirm that the ping and pong counts both match, and the value also
1460 // matches.
1461 pi1_event_loop->MakeWatcher(
1462 "/test", [&pi1_event_loop, &pi1_ping_count,
1463 &pi1_pong_count](const examples::Pong &pong) {
1464 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
1465 << pi1_event_loop->context().monotonic_remote_time << " -> "
1466 << pi1_event_loop->context().monotonic_event_time;
1467
1468 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
1469 ++pi1_pong_count;
1470 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
1471 });
1472 pi2_event_loop->MakeWatcher(
1473 "/test", [&pi2_event_loop, &pi2_ping_count,
1474 &pi2_pong_count](const examples::Pong &pong) {
1475 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
1476 << pi2_event_loop->context().monotonic_remote_time << " -> "
1477 << pi2_event_loop->context().monotonic_event_time;
1478
1479 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
1480 ++pi2_pong_count;
1481 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
1482 });
1483
1484 log_reader_factory.Run();
1485 EXPECT_EQ(pi1_ping_count, 6030);
1486 EXPECT_EQ(pi2_ping_count, 6030);
1487 EXPECT_EQ(pi1_pong_count, 6030);
1488 EXPECT_EQ(pi2_pong_count, 6030);
1489
1490 reader.Deregister();
1491}
1492
1493// Tests that we can sort a bunch of parts into the pre-determined sorted parts.
1494TEST_P(MultinodeLoggerTest, SortParts) {
1495 time_converter_.StartEqual();
1496 // Make a bunch of parts.
1497 {
1498 LoggerState pi1_logger = MakeLogger(pi1_);
1499 LoggerState pi2_logger = MakeLogger(pi2_);
1500
1501 event_loop_factory_.RunFor(chrono::milliseconds(95));
1502
1503 StartLogger(&pi1_logger);
1504 StartLogger(&pi2_logger);
1505
1506 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1507 }
1508
1509 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1510 VerifyParts(sorted_parts);
1511}
1512
1513// Tests that we can sort a bunch of parts with an empty part. We should ignore
1514// it and remove it from the sorted list.
1515TEST_P(MultinodeLoggerTest, SortEmptyParts) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07001516 std::vector<std::string> actual_filenames;
1517
Naman Guptaa63aa132023-03-22 20:06:34 -07001518 time_converter_.StartEqual();
1519 // Make a bunch of parts.
1520 {
1521 LoggerState pi1_logger = MakeLogger(pi1_);
1522 LoggerState pi2_logger = MakeLogger(pi2_);
1523
1524 event_loop_factory_.RunFor(chrono::milliseconds(95));
1525
1526 StartLogger(&pi1_logger);
1527 StartLogger(&pi2_logger);
1528
1529 event_loop_factory_.RunFor(chrono::milliseconds(2000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07001530 pi1_logger.AppendAllFilenames(&actual_filenames);
1531 pi2_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001532 }
1533
1534 // TODO(austin): Should we flip out if the file can't open?
1535 const std::string kEmptyFile("foobarinvalidfiledoesnotexist" + Extension());
1536
1537 aos::util::WriteStringToFileOrDie(kEmptyFile, "");
Austin Schuh8fb4b452023-08-04 17:02:27 -07001538 actual_filenames.emplace_back(kEmptyFile);
Naman Guptaa63aa132023-03-22 20:06:34 -07001539
Austin Schuh8fb4b452023-08-04 17:02:27 -07001540 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001541 VerifyParts(sorted_parts, {kEmptyFile});
1542}
1543
1544// Tests that we can sort a bunch of parts with the end missing off a
1545// file. We should use the part we can read.
1546TEST_P(MultinodeLoggerTest, SortTruncatedParts) {
Austin Schuh6ecfe902023-08-04 22:44:37 -07001547 if (file_strategy() == FileStrategy::kCombine) {
1548 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
1549 }
1550
Naman Guptaa63aa132023-03-22 20:06:34 -07001551 std::vector<std::string> actual_filenames;
1552 time_converter_.StartEqual();
1553 // Make a bunch of parts.
1554 {
1555 LoggerState pi1_logger = MakeLogger(pi1_);
1556 LoggerState pi2_logger = MakeLogger(pi2_);
1557
1558 event_loop_factory_.RunFor(chrono::milliseconds(95));
1559
1560 StartLogger(&pi1_logger);
1561 StartLogger(&pi2_logger);
1562
1563 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1564
1565 pi1_logger.AppendAllFilenames(&actual_filenames);
1566 pi2_logger.AppendAllFilenames(&actual_filenames);
1567 }
1568
1569 ASSERT_THAT(actual_filenames,
1570 ::testing::UnorderedElementsAreArray(logfiles_));
1571
1572 // Strip off the end of one of the files. Pick one with a lot of data.
1573 // For snappy, needs to have enough data to be >1 chunk of compressed data so
1574 // that we don't corrupt the entire log part.
1575 ::std::string compressed_contents =
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07001576 aos::util::ReadFileToStringOrDie(logfiles_[2]);
Naman Guptaa63aa132023-03-22 20:06:34 -07001577
1578 aos::util::WriteStringToFileOrDie(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07001579 logfiles_[2],
Naman Guptaa63aa132023-03-22 20:06:34 -07001580 compressed_contents.substr(0, compressed_contents.size() - 100));
1581
1582 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1583 VerifyParts(sorted_parts);
1584}
1585
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001586// Tests that if we remap a logged channel, it shows up correctly.
Naman Guptaa63aa132023-03-22 20:06:34 -07001587TEST_P(MultinodeLoggerTest, RemapLoggedChannel) {
1588 time_converter_.StartEqual();
Austin Schuh8fb4b452023-08-04 17:02:27 -07001589
1590 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -07001591 {
1592 LoggerState pi1_logger = MakeLogger(pi1_);
1593 LoggerState pi2_logger = MakeLogger(pi2_);
1594
1595 event_loop_factory_.RunFor(chrono::milliseconds(95));
1596
1597 StartLogger(&pi1_logger);
1598 StartLogger(&pi2_logger);
1599
1600 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07001601
1602 pi1_logger.AppendAllFilenames(&filenames);
1603 pi2_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001604 }
1605
Austin Schuh8fb4b452023-08-04 17:02:27 -07001606 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001607 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1608 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001609
1610 // Remap just on pi1.
1611 reader.RemapLoggedChannel<aos::timing::Report>(
1612 "/aos", configuration::GetNode(reader.configuration(), "pi1"));
1613
1614 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1615 log_reader_factory.set_send_delay(chrono::microseconds(0));
1616
1617 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1618 // Note: An extra channel gets remapped automatically due to a timestamp
1619 // channel being LOCAL_LOGGER'd.
1620 ASSERT_EQ(remapped_channels.size(), std::get<0>(GetParam()).shared ? 1u : 2u);
1621 EXPECT_EQ(remapped_channels[0]->name()->string_view(), "/original/pi1/aos");
1622 EXPECT_EQ(remapped_channels[0]->type()->string_view(), "aos.timing.Report");
1623 if (!std::get<0>(GetParam()).shared) {
1624 EXPECT_EQ(remapped_channels[1]->name()->string_view(),
1625 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1626 "aos-message_bridge-Timestamp");
1627 EXPECT_EQ(remapped_channels[1]->type()->string_view(),
1628 "aos.message_bridge.RemoteMessage");
1629 }
1630
1631 reader.Register(&log_reader_factory);
1632
1633 const Node *pi1 =
1634 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1635 const Node *pi2 =
1636 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1637
1638 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1639 // else should have moved.
1640 std::unique_ptr<EventLoop> pi1_event_loop =
1641 log_reader_factory.MakeEventLoop("test", pi1);
1642 pi1_event_loop->SkipTimingReport();
1643 std::unique_ptr<EventLoop> full_pi1_event_loop =
1644 log_reader_factory.MakeEventLoop("test", pi1);
1645 full_pi1_event_loop->SkipTimingReport();
1646 std::unique_ptr<EventLoop> pi2_event_loop =
1647 log_reader_factory.MakeEventLoop("test", pi2);
1648 pi2_event_loop->SkipTimingReport();
1649
1650 MessageCounter<aos::timing::Report> pi1_timing_report(pi1_event_loop.get(),
1651 "/aos");
1652 MessageCounter<aos::timing::Report> full_pi1_timing_report(
1653 full_pi1_event_loop.get(), "/pi1/aos");
1654 MessageCounter<aos::timing::Report> pi1_original_timing_report(
1655 pi1_event_loop.get(), "/original/aos");
1656 MessageCounter<aos::timing::Report> full_pi1_original_timing_report(
1657 full_pi1_event_loop.get(), "/original/pi1/aos");
1658 MessageCounter<aos::timing::Report> pi2_timing_report(pi2_event_loop.get(),
1659 "/aos");
1660
1661 log_reader_factory.Run();
1662
1663 EXPECT_EQ(pi1_timing_report.count(), 0u);
1664 EXPECT_EQ(full_pi1_timing_report.count(), 0u);
1665 EXPECT_NE(pi1_original_timing_report.count(), 0u);
1666 EXPECT_NE(full_pi1_original_timing_report.count(), 0u);
1667 EXPECT_NE(pi2_timing_report.count(), 0u);
1668
1669 reader.Deregister();
1670}
1671
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001672// Tests that if we rename a logged channel, it shows up correctly.
1673TEST_P(MultinodeLoggerTest, RenameLoggedChannel) {
1674 std::vector<std::string> actual_filenames;
1675 time_converter_.StartEqual();
1676 {
1677 LoggerState pi1_logger = MakeLogger(pi1_);
1678 LoggerState pi2_logger = MakeLogger(pi2_);
1679
1680 event_loop_factory_.RunFor(chrono::milliseconds(95));
1681
1682 StartLogger(&pi1_logger);
1683 StartLogger(&pi2_logger);
1684
1685 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1686
1687 pi1_logger.AppendAllFilenames(&actual_filenames);
1688 pi2_logger.AppendAllFilenames(&actual_filenames);
1689 }
1690
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001691 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1692 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1693 LogReader reader(sorted_parts);
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001694
1695 // Rename just on pi2. Add some global maps just to verify they get added in
1696 // the config and used correctly.
1697 std::vector<MapT> maps;
1698 {
1699 MapT map;
1700 map.match = std::make_unique<ChannelT>();
1701 map.match->name = "/foo*";
1702 map.match->source_node = "pi1";
1703 map.rename = std::make_unique<ChannelT>();
1704 map.rename->name = "/pi1/foo";
1705 maps.emplace_back(std::move(map));
1706 }
1707 {
1708 MapT map;
1709 map.match = std::make_unique<ChannelT>();
1710 map.match->name = "/foo*";
1711 map.match->source_node = "pi2";
1712 map.rename = std::make_unique<ChannelT>();
1713 map.rename->name = "/pi2/foo";
1714 maps.emplace_back(std::move(map));
1715 }
1716 {
1717 MapT map;
1718 map.match = std::make_unique<ChannelT>();
1719 map.match->name = "/foo";
1720 map.match->type = "aos.examples.Ping";
1721 map.rename = std::make_unique<ChannelT>();
1722 map.rename->name = "/foo/renamed";
1723 maps.emplace_back(std::move(map));
1724 }
1725 reader.RenameLoggedChannel<aos::examples::Ping>(
1726 "/aos", configuration::GetNode(reader.configuration(), "pi2"),
1727 "/pi2/foo/renamed", maps);
1728
1729 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1730 log_reader_factory.set_send_delay(chrono::microseconds(0));
1731
1732 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1733 // Note: An extra channel gets remapped automatically due to a timestamp
1734 // channel being LOCAL_LOGGER'd.
1735 const bool shared = std::get<0>(GetParam()).shared;
1736 ASSERT_EQ(remapped_channels.size(), shared ? 1u : 2u);
1737 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->name()->string_view(),
1738 "/pi2/foo/renamed");
1739 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->type()->string_view(),
1740 "aos.examples.Ping");
1741 if (!shared) {
1742 EXPECT_EQ(remapped_channels[0]->name()->string_view(),
1743 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1744 "aos-message_bridge-Timestamp");
1745 EXPECT_EQ(remapped_channels[0]->type()->string_view(),
1746 "aos.message_bridge.RemoteMessage");
1747 }
1748
1749 reader.Register(&log_reader_factory);
1750
1751 const Node *pi1 =
1752 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1753 const Node *pi2 =
1754 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1755
1756 // Confirm we can read the data on the renamed channel, just for pi2. Nothing
1757 // else should have moved.
1758 std::unique_ptr<EventLoop> pi2_event_loop =
1759 log_reader_factory.MakeEventLoop("test", pi2);
1760 pi2_event_loop->SkipTimingReport();
1761 std::unique_ptr<EventLoop> full_pi2_event_loop =
1762 log_reader_factory.MakeEventLoop("test", pi2);
1763 full_pi2_event_loop->SkipTimingReport();
1764 std::unique_ptr<EventLoop> pi1_event_loop =
1765 log_reader_factory.MakeEventLoop("test", pi1);
1766 pi1_event_loop->SkipTimingReport();
1767
1768 MessageCounter<aos::examples::Ping> pi2_ping(pi2_event_loop.get(), "/aos");
1769 MessageCounter<aos::examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1770 "/foo");
1771 MessageCounter<aos::examples::Ping> full_pi2_renamed_ping(
1772 full_pi2_event_loop.get(), "/pi2/foo/renamed");
1773 MessageCounter<aos::examples::Ping> pi1_ping(pi1_event_loop.get(), "/aos");
1774
1775 log_reader_factory.Run();
1776
1777 EXPECT_EQ(pi2_ping.count(), 0u);
1778 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1779 EXPECT_NE(full_pi2_renamed_ping.count(), 0u);
1780 EXPECT_NE(pi1_ping.count(), 0u);
1781
1782 reader.Deregister();
1783}
1784
Naman Guptaa63aa132023-03-22 20:06:34 -07001785// Tests that we can remap a forwarded channel as well.
1786TEST_P(MultinodeLoggerTest, RemapForwardedLoggedChannel) {
1787 time_converter_.StartEqual();
1788 {
1789 LoggerState pi1_logger = MakeLogger(pi1_);
1790 LoggerState pi2_logger = MakeLogger(pi2_);
1791
1792 event_loop_factory_.RunFor(chrono::milliseconds(95));
1793
1794 StartLogger(&pi1_logger);
1795 StartLogger(&pi2_logger);
1796
1797 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1798 }
1799
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001800 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1801 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1802 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001803
1804 reader.RemapLoggedChannel<examples::Ping>("/test");
1805
1806 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1807 log_reader_factory.set_send_delay(chrono::microseconds(0));
1808
1809 reader.Register(&log_reader_factory);
1810
1811 const Node *pi1 =
1812 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1813 const Node *pi2 =
1814 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1815
1816 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1817 // else should have moved.
1818 std::unique_ptr<EventLoop> pi1_event_loop =
1819 log_reader_factory.MakeEventLoop("test", pi1);
1820 pi1_event_loop->SkipTimingReport();
1821 std::unique_ptr<EventLoop> full_pi1_event_loop =
1822 log_reader_factory.MakeEventLoop("test", pi1);
1823 full_pi1_event_loop->SkipTimingReport();
1824 std::unique_ptr<EventLoop> pi2_event_loop =
1825 log_reader_factory.MakeEventLoop("test", pi2);
1826 pi2_event_loop->SkipTimingReport();
1827
1828 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1829 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1830 MessageCounter<examples::Ping> pi1_original_ping(pi1_event_loop.get(),
1831 "/original/test");
1832 MessageCounter<examples::Ping> pi2_original_ping(pi2_event_loop.get(),
1833 "/original/test");
1834
1835 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1836 pi1_original_ping_timestamp;
1837 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1838 pi1_ping_timestamp;
1839 if (!shared()) {
1840 pi1_original_ping_timestamp =
1841 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1842 pi1_event_loop.get(),
1843 "/pi1/aos/remote_timestamps/pi2/original/test/aos-examples-Ping");
1844 pi1_ping_timestamp =
1845 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1846 pi1_event_loop.get(),
1847 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1848 }
1849
1850 log_reader_factory.Run();
1851
1852 EXPECT_EQ(pi1_ping.count(), 0u);
1853 EXPECT_EQ(pi2_ping.count(), 0u);
1854 EXPECT_NE(pi1_original_ping.count(), 0u);
1855 EXPECT_NE(pi2_original_ping.count(), 0u);
1856 if (!shared()) {
1857 EXPECT_NE(pi1_original_ping_timestamp->count(), 0u);
1858 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1859 }
1860
1861 reader.Deregister();
1862}
1863
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001864// Tests that we can rename a forwarded channel as well.
1865TEST_P(MultinodeLoggerTest, RenameForwardedLoggedChannel) {
1866 std::vector<std::string> actual_filenames;
1867 time_converter_.StartEqual();
1868 {
1869 LoggerState pi1_logger = MakeLogger(pi1_);
1870 LoggerState pi2_logger = MakeLogger(pi2_);
1871
1872 event_loop_factory_.RunFor(chrono::milliseconds(95));
1873
1874 StartLogger(&pi1_logger);
1875 StartLogger(&pi2_logger);
1876
1877 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1878
1879 pi1_logger.AppendAllFilenames(&actual_filenames);
1880 pi2_logger.AppendAllFilenames(&actual_filenames);
1881 }
1882
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001883 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1884 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1885 LogReader reader(sorted_parts);
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001886
1887 std::vector<MapT> maps;
1888 {
1889 MapT map;
1890 map.match = std::make_unique<ChannelT>();
1891 map.match->name = "/production*";
1892 map.match->source_node = "pi1";
1893 map.rename = std::make_unique<ChannelT>();
1894 map.rename->name = "/pi1/production";
1895 maps.emplace_back(std::move(map));
1896 }
1897 {
1898 MapT map;
1899 map.match = std::make_unique<ChannelT>();
1900 map.match->name = "/production*";
1901 map.match->source_node = "pi2";
1902 map.rename = std::make_unique<ChannelT>();
1903 map.rename->name = "/pi2/production";
1904 maps.emplace_back(std::move(map));
1905 }
1906 reader.RenameLoggedChannel<aos::examples::Ping>(
1907 "/test", configuration::GetNode(reader.configuration(), "pi1"),
1908 "/pi1/production", maps);
1909
1910 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1911 log_reader_factory.set_send_delay(chrono::microseconds(0));
1912
1913 reader.Register(&log_reader_factory);
1914
1915 const Node *pi1 =
1916 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1917 const Node *pi2 =
1918 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1919
1920 // Confirm we can read the data on the renamed channel, on both the source
1921 // node and the remote node. In case of split timestamp channels, confirm that
1922 // we receive the timestamp messages on the renamed channel as well.
1923 std::unique_ptr<EventLoop> pi1_event_loop =
1924 log_reader_factory.MakeEventLoop("test", pi1);
1925 pi1_event_loop->SkipTimingReport();
1926 std::unique_ptr<EventLoop> full_pi1_event_loop =
1927 log_reader_factory.MakeEventLoop("test", pi1);
1928 full_pi1_event_loop->SkipTimingReport();
1929 std::unique_ptr<EventLoop> pi2_event_loop =
1930 log_reader_factory.MakeEventLoop("test", pi2);
1931 pi2_event_loop->SkipTimingReport();
1932
1933 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1934 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1935 MessageCounter<examples::Ping> pi1_renamed_ping(pi1_event_loop.get(),
1936 "/pi1/production");
1937 MessageCounter<examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1938 "/pi1/production");
1939
1940 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1941 pi1_renamed_ping_timestamp;
1942 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1943 pi1_ping_timestamp;
1944 if (!shared()) {
1945 pi1_renamed_ping_timestamp =
1946 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1947 pi1_event_loop.get(),
1948 "/pi1/aos/remote_timestamps/pi2/pi1/production/aos-examples-Ping");
1949 pi1_ping_timestamp =
1950 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1951 pi1_event_loop.get(),
1952 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1953 }
1954
1955 log_reader_factory.Run();
1956
1957 EXPECT_EQ(pi1_ping.count(), 0u);
1958 EXPECT_EQ(pi2_ping.count(), 0u);
1959 EXPECT_NE(pi1_renamed_ping.count(), 0u);
1960 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1961 if (!shared()) {
1962 EXPECT_NE(pi1_renamed_ping_timestamp->count(), 0u);
1963 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1964 }
1965
1966 reader.Deregister();
1967}
1968
Naman Guptaa63aa132023-03-22 20:06:34 -07001969// Tests that we observe all the same events in log replay (for a given node)
1970// whether we just register an event loop for that node or if we register a full
1971// event loop factory.
1972TEST_P(MultinodeLoggerTest, SingleNodeReplay) {
1973 time_converter_.StartEqual();
1974 constexpr chrono::milliseconds kStartupDelay(95);
Austin Schuh8fb4b452023-08-04 17:02:27 -07001975 std::vector<std::string> filenames;
1976
Naman Guptaa63aa132023-03-22 20:06:34 -07001977 {
1978 LoggerState pi1_logger = MakeLogger(pi1_);
1979 LoggerState pi2_logger = MakeLogger(pi2_);
1980
1981 event_loop_factory_.RunFor(kStartupDelay);
1982
1983 StartLogger(&pi1_logger);
1984 StartLogger(&pi2_logger);
1985
1986 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07001987
1988 pi1_logger.AppendAllFilenames(&filenames);
1989 pi2_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001990 }
1991
Austin Schuh8fb4b452023-08-04 17:02:27 -07001992 LogReader full_reader(SortParts(filenames));
1993 LogReader single_node_reader(SortParts(filenames));
Naman Guptaa63aa132023-03-22 20:06:34 -07001994
1995 SimulatedEventLoopFactory full_factory(full_reader.configuration());
1996 SimulatedEventLoopFactory single_node_factory(
1997 single_node_reader.configuration());
1998 single_node_factory.SkipTimingReport();
1999 single_node_factory.DisableStatistics();
2000 std::unique_ptr<EventLoop> replay_event_loop =
2001 single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
2002 "log_reader");
2003
2004 full_reader.Register(&full_factory);
2005 single_node_reader.Register(replay_event_loop.get());
2006
2007 const Node *full_pi1 =
2008 configuration::GetNode(full_factory.configuration(), "pi1");
2009
2010 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
2011 // else should have moved.
2012 std::unique_ptr<EventLoop> full_event_loop =
2013 full_factory.MakeEventLoop("test", full_pi1);
2014 full_event_loop->SkipTimingReport();
2015 full_event_loop->SkipAosLog();
2016 // maps are indexed on channel index.
2017 // observed_messages: {channel_index: [(message_sent_time, was_fetched),...]}
2018 std::map<size_t, std::vector<std::pair<monotonic_clock::time_point, bool>>>
2019 observed_messages;
2020 std::map<size_t, std::unique_ptr<RawFetcher>> fetchers;
2021 for (size_t ii = 0; ii < full_event_loop->configuration()->channels()->size();
2022 ++ii) {
2023 const Channel *channel =
2024 full_event_loop->configuration()->channels()->Get(ii);
2025 // We currently don't support replaying remote timestamp channels in
2026 // realtime replay (unless the remote timestamp channel was not NOT_LOGGED,
2027 // in which case it gets auto-remapped and replayed on a /original channel).
2028 if (channel->name()->string_view().find("remote_timestamp") !=
2029 std::string_view::npos &&
2030 channel->name()->string_view().find("/original") ==
2031 std::string_view::npos) {
2032 continue;
2033 }
2034 if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
2035 observed_messages[ii] = {};
2036 fetchers[ii] = full_event_loop->MakeRawFetcher(channel);
2037 full_event_loop->OnRun([ii, &observed_messages, &fetchers]() {
2038 if (fetchers[ii]->Fetch()) {
2039 observed_messages[ii].push_back(std::make_pair(
2040 fetchers[ii]->context().monotonic_event_time, true));
2041 }
2042 });
2043 full_event_loop->MakeRawNoArgWatcher(
2044 channel, [ii, &observed_messages](const Context &context) {
2045 observed_messages[ii].push_back(
2046 std::make_pair(context.monotonic_event_time, false));
2047 });
2048 }
2049 }
2050
2051 full_factory.Run();
2052 fetchers.clear();
2053 full_reader.Deregister();
2054
2055 const Node *single_node_pi1 =
2056 configuration::GetNode(single_node_factory.configuration(), "pi1");
2057 std::map<size_t, std::unique_ptr<RawFetcher>> single_node_fetchers;
2058
2059 std::unique_ptr<EventLoop> single_node_event_loop =
2060 single_node_factory.MakeEventLoop("test", single_node_pi1);
2061 single_node_event_loop->SkipTimingReport();
2062 single_node_event_loop->SkipAosLog();
2063 for (size_t ii = 0;
2064 ii < single_node_event_loop->configuration()->channels()->size(); ++ii) {
2065 const Channel *channel =
2066 single_node_event_loop->configuration()->channels()->Get(ii);
2067 single_node_factory.DisableForwarding(channel);
2068 if (configuration::ChannelIsReadableOnNode(channel, single_node_pi1)) {
2069 single_node_fetchers[ii] =
2070 single_node_event_loop->MakeRawFetcher(channel);
2071 single_node_event_loop->OnRun([channel, ii, &single_node_fetchers]() {
2072 EXPECT_FALSE(single_node_fetchers[ii]->Fetch())
2073 << "Single EventLoop replay doesn't support pre-loading fetchers. "
2074 << configuration::StrippedChannelToString(channel);
2075 });
2076 single_node_event_loop->MakeRawNoArgWatcher(
2077 channel, [ii, &observed_messages, channel,
2078 kStartupDelay](const Context &context) {
2079 if (observed_messages[ii].empty()) {
2080 FAIL() << "Observed extra message at "
2081 << context.monotonic_event_time << " on "
2082 << configuration::StrippedChannelToString(channel);
2083 return;
2084 }
2085 const std::pair<monotonic_clock::time_point, bool> &message =
2086 observed_messages[ii].front();
2087 if (message.second) {
2088 EXPECT_LE(message.first,
2089 context.monotonic_event_time + kStartupDelay)
2090 << "Mismatched message times " << context.monotonic_event_time
2091 << " and " << message.first << " on "
2092 << configuration::StrippedChannelToString(channel);
2093 } else {
2094 EXPECT_EQ(message.first,
2095 context.monotonic_event_time + kStartupDelay)
2096 << "Mismatched message times " << context.monotonic_event_time
2097 << " and " << message.first << " on "
2098 << configuration::StrippedChannelToString(channel);
2099 }
2100 observed_messages[ii].erase(observed_messages[ii].begin());
2101 });
2102 }
2103 }
2104
2105 single_node_factory.Run();
2106
2107 single_node_fetchers.clear();
2108
2109 single_node_reader.Deregister();
2110
2111 for (const auto &pair : observed_messages) {
2112 EXPECT_TRUE(pair.second.empty())
2113 << "Missed " << pair.second.size() << " messages on "
2114 << configuration::StrippedChannelToString(
2115 single_node_event_loop->configuration()->channels()->Get(
2116 pair.first));
2117 }
2118}
2119
2120// Tests that we properly recreate forwarded timestamps when replaying a log.
2121// This should be enough that we can then re-run the logger and get a valid log
2122// back.
2123TEST_P(MultinodeLoggerTest, MessageHeader) {
2124 time_converter_.StartEqual();
2125 {
2126 LoggerState pi1_logger = MakeLogger(pi1_);
2127 LoggerState pi2_logger = MakeLogger(pi2_);
2128
2129 event_loop_factory_.RunFor(chrono::milliseconds(95));
2130
2131 StartLogger(&pi1_logger);
2132 StartLogger(&pi2_logger);
2133
2134 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2135 }
2136
2137 LogReader reader(SortParts(logfiles_));
2138
2139 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2140 log_reader_factory.set_send_delay(chrono::microseconds(0));
2141
2142 // This sends out the fetched messages and advances time to the start of the
2143 // log file.
2144 reader.Register(&log_reader_factory);
2145
2146 const Node *pi1 =
2147 configuration::GetNode(log_reader_factory.configuration(), "pi1");
2148 const Node *pi2 =
2149 configuration::GetNode(log_reader_factory.configuration(), "pi2");
2150
2151 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
2152 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
2153 LOG(INFO) << "now pi1 "
2154 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
2155 LOG(INFO) << "now pi2 "
2156 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
2157
2158 EXPECT_THAT(reader.LoggedNodes(),
2159 ::testing::ElementsAre(
2160 configuration::GetNode(reader.logged_configuration(), pi1),
2161 configuration::GetNode(reader.logged_configuration(), pi2)));
2162
2163 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
2164
2165 std::unique_ptr<EventLoop> pi1_event_loop =
2166 log_reader_factory.MakeEventLoop("test", pi1);
2167 std::unique_ptr<EventLoop> pi2_event_loop =
2168 log_reader_factory.MakeEventLoop("test", pi2);
2169
2170 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =
2171 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
2172 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi2_fetcher =
2173 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
2174
2175 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
2176 pi1_event_loop->MakeFetcher<examples::Ping>("/test");
2177 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
2178 pi2_event_loop->MakeFetcher<examples::Ping>("/test");
2179
2180 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi2_fetcher =
2181 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
2182 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi1_fetcher =
2183 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
2184
2185 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
2186 pi2_event_loop->MakeFetcher<examples::Pong>("/test");
2187 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
2188 pi1_event_loop->MakeFetcher<examples::Pong>("/test");
2189
2190 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
2191 pi1_event_loop->configuration(), pi1_timestamp_on_pi1_fetcher.channel());
2192 const size_t ping_timestamp_channel = configuration::ChannelIndex(
2193 pi2_event_loop->configuration(), ping_on_pi2_fetcher.channel());
2194
2195 const size_t pi2_timestamp_channel = configuration::ChannelIndex(
2196 pi2_event_loop->configuration(), pi2_timestamp_on_pi2_fetcher.channel());
2197 const size_t pong_timestamp_channel = configuration::ChannelIndex(
2198 pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
2199
2200 const chrono::nanoseconds network_delay = event_loop_factory_.network_delay();
2201 const chrono::nanoseconds send_delay = event_loop_factory_.send_delay();
2202
2203 for (std::pair<int, std::string> channel :
2204 shared()
2205 ? std::vector<
2206 std::pair<int, std::string>>{{-1,
2207 "/aos/remote_timestamps/pi2"}}
2208 : std::vector<std::pair<int, std::string>>{
2209 {pi1_timestamp_channel,
2210 "/aos/remote_timestamps/pi2/pi1/aos/"
2211 "aos-message_bridge-Timestamp"},
2212 {ping_timestamp_channel,
2213 "/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
2214 pi1_event_loop->MakeWatcher(
2215 channel.second,
2216 [&pi1_event_loop, &pi2_event_loop, pi1_timestamp_channel,
2217 ping_timestamp_channel, &pi1_timestamp_on_pi1_fetcher,
2218 &pi1_timestamp_on_pi2_fetcher, &ping_on_pi1_fetcher,
2219 &ping_on_pi2_fetcher, network_delay, send_delay,
2220 channel_index = channel.first](const RemoteMessage &header) {
2221 const aos::monotonic_clock::time_point header_monotonic_sent_time(
2222 chrono::nanoseconds(header.monotonic_sent_time()));
2223 const aos::realtime_clock::time_point header_realtime_sent_time(
2224 chrono::nanoseconds(header.realtime_sent_time()));
2225 const aos::monotonic_clock::time_point header_monotonic_remote_time(
2226 chrono::nanoseconds(header.monotonic_remote_time()));
Austin Schuhb5224ec2024-03-27 15:20:09 -07002227 const aos::monotonic_clock::time_point
2228 header_monotonic_remote_transmit_time(
2229 chrono::nanoseconds(header.monotonic_remote_transmit_time()));
Naman Guptaa63aa132023-03-22 20:06:34 -07002230 const aos::realtime_clock::time_point header_realtime_remote_time(
2231 chrono::nanoseconds(header.realtime_remote_time()));
2232
2233 if (channel_index != -1) {
2234 ASSERT_EQ(channel_index, header.channel_index());
2235 }
2236
2237 const Context *pi1_context = nullptr;
2238 const Context *pi2_context = nullptr;
2239
2240 if (header.channel_index() == pi1_timestamp_channel) {
2241 ASSERT_TRUE(pi1_timestamp_on_pi1_fetcher.FetchNext());
2242 ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
2243 pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
2244 pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
Austin Schuhac6d89e2024-03-27 14:56:09 -07002245 // Timestamps don't have wakeup delay, so they show back up after 2
2246 // times the network delay on the source node. Confirm that matches
2247 // when we are reading the log.
2248 EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
2249 pi1_context->monotonic_event_time + 2 * network_delay);
Naman Guptaa63aa132023-03-22 20:06:34 -07002250 } else if (header.channel_index() == ping_timestamp_channel) {
2251 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
2252 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
2253 pi1_context = &ping_on_pi1_fetcher.context();
2254 pi2_context = &ping_on_pi2_fetcher.context();
Austin Schuhac6d89e2024-03-27 14:56:09 -07002255 // Ping messages get picked up faster at the start of each message
2256 // when timers wake up. Verify all that behavior matches exactly as
2257 // expected when reading the log.
2258 EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
2259 pi1_context->monotonic_event_time + 2 * network_delay +
2260 ((pi1_event_loop->context().monotonic_event_time -
2261 2 * network_delay)
2262 .time_since_epoch() %
2263 chrono::nanoseconds(1000000000) ==
2264 chrono::nanoseconds(0)
2265 ? chrono::nanoseconds(0)
2266 : send_delay));
Naman Guptaa63aa132023-03-22 20:06:34 -07002267 } else {
2268 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
2269 << configuration::CleanedChannelToString(
2270 pi1_event_loop->configuration()->channels()->Get(
2271 header.channel_index()));
2272 }
2273
2274 ASSERT_TRUE(header.has_boot_uuid());
2275 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
2276 pi2_event_loop->boot_uuid());
2277
2278 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
2279 EXPECT_EQ(pi2_context->remote_queue_index,
2280 header.remote_queue_index());
2281 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
2282
2283 EXPECT_EQ(pi2_context->monotonic_event_time,
2284 header_monotonic_sent_time);
2285 EXPECT_EQ(pi2_context->realtime_event_time,
2286 header_realtime_sent_time);
2287 EXPECT_EQ(pi2_context->realtime_remote_time,
2288 header_realtime_remote_time);
2289 EXPECT_EQ(pi2_context->monotonic_remote_time,
2290 header_monotonic_remote_time);
Austin Schuhb5224ec2024-03-27 15:20:09 -07002291 EXPECT_EQ(pi2_context->monotonic_remote_transmit_time,
2292 header_monotonic_remote_transmit_time);
Naman Guptaa63aa132023-03-22 20:06:34 -07002293
2294 EXPECT_EQ(pi1_context->realtime_event_time,
2295 header_realtime_remote_time);
2296 EXPECT_EQ(pi1_context->monotonic_event_time,
2297 header_monotonic_remote_time);
Naman Guptaa63aa132023-03-22 20:06:34 -07002298 });
2299 }
2300 for (std::pair<int, std::string> channel :
2301 shared()
2302 ? std::vector<
2303 std::pair<int, std::string>>{{-1,
2304 "/aos/remote_timestamps/pi1"}}
2305 : std::vector<std::pair<int, std::string>>{
2306 {pi2_timestamp_channel,
2307 "/aos/remote_timestamps/pi1/pi2/aos/"
2308 "aos-message_bridge-Timestamp"}}) {
2309 pi2_event_loop->MakeWatcher(
2310 channel.second,
2311 [&pi2_event_loop, &pi1_event_loop, pi2_timestamp_channel,
2312 pong_timestamp_channel, &pi2_timestamp_on_pi2_fetcher,
2313 &pi2_timestamp_on_pi1_fetcher, &pong_on_pi2_fetcher,
2314 &pong_on_pi1_fetcher, network_delay, send_delay,
2315 channel_index = channel.first](const RemoteMessage &header) {
2316 const aos::monotonic_clock::time_point header_monotonic_sent_time(
2317 chrono::nanoseconds(header.monotonic_sent_time()));
2318 const aos::realtime_clock::time_point header_realtime_sent_time(
2319 chrono::nanoseconds(header.realtime_sent_time()));
2320 const aos::monotonic_clock::time_point header_monotonic_remote_time(
2321 chrono::nanoseconds(header.monotonic_remote_time()));
Austin Schuhb5224ec2024-03-27 15:20:09 -07002322 const aos::monotonic_clock::time_point
2323 header_monotonic_remote_transmit_time(
2324 chrono::nanoseconds(header.monotonic_remote_transmit_time()));
Naman Guptaa63aa132023-03-22 20:06:34 -07002325 const aos::realtime_clock::time_point header_realtime_remote_time(
2326 chrono::nanoseconds(header.realtime_remote_time()));
2327
2328 if (channel_index != -1) {
2329 ASSERT_EQ(channel_index, header.channel_index());
2330 }
2331
2332 const Context *pi2_context = nullptr;
2333 const Context *pi1_context = nullptr;
2334
2335 if (header.channel_index() == pi2_timestamp_channel) {
2336 ASSERT_TRUE(pi2_timestamp_on_pi2_fetcher.FetchNext());
2337 ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
2338 pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
2339 pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
Austin Schuhac6d89e2024-03-27 14:56:09 -07002340 // Again, timestamps don't have wakeup delay, so they show back up
2341 // after 2 times the network delay on the source node.
2342 EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
2343 pi2_context->monotonic_event_time + 2 * network_delay);
Naman Guptaa63aa132023-03-22 20:06:34 -07002344 } else if (header.channel_index() == pong_timestamp_channel) {
2345 ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
2346 ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
2347 pi2_context = &pong_on_pi2_fetcher.context();
2348 pi1_context = &pong_on_pi1_fetcher.context();
Austin Schuhac6d89e2024-03-27 14:56:09 -07002349 // And Pong messages come back repeatably since they aren't at the
2350 // start of a second.
2351 EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
2352 pi2_context->monotonic_event_time + 2 * network_delay +
2353 send_delay);
Naman Guptaa63aa132023-03-22 20:06:34 -07002354 } else {
2355 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
2356 << configuration::CleanedChannelToString(
2357 pi2_event_loop->configuration()->channels()->Get(
2358 header.channel_index()));
2359 }
2360
2361 ASSERT_TRUE(header.has_boot_uuid());
2362 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
2363 pi1_event_loop->boot_uuid());
2364
2365 EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());
2366 EXPECT_EQ(pi1_context->remote_queue_index,
2367 header.remote_queue_index());
2368 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
2369
2370 EXPECT_EQ(pi1_context->monotonic_event_time,
2371 header_monotonic_sent_time);
2372 EXPECT_EQ(pi1_context->realtime_event_time,
2373 header_realtime_sent_time);
2374 EXPECT_EQ(pi1_context->realtime_remote_time,
2375 header_realtime_remote_time);
2376 EXPECT_EQ(pi1_context->monotonic_remote_time,
2377 header_monotonic_remote_time);
Austin Schuhb5224ec2024-03-27 15:20:09 -07002378 EXPECT_EQ(pi1_context->monotonic_remote_transmit_time,
2379 header_monotonic_remote_transmit_time);
Naman Guptaa63aa132023-03-22 20:06:34 -07002380
2381 EXPECT_EQ(pi2_context->realtime_event_time,
2382 header_realtime_remote_time);
2383 EXPECT_EQ(pi2_context->monotonic_event_time,
2384 header_monotonic_remote_time);
Naman Guptaa63aa132023-03-22 20:06:34 -07002385 });
2386 }
2387
2388 // And confirm we can re-create a log again, while checking the contents.
2389 {
2390 LoggerState pi1_logger = MakeLogger(
2391 log_reader_factory.GetNodeEventLoopFactory("pi1"), &log_reader_factory);
2392 LoggerState pi2_logger = MakeLogger(
2393 log_reader_factory.GetNodeEventLoopFactory("pi2"), &log_reader_factory);
2394
Austin Schuh8fb4b452023-08-04 17:02:27 -07002395 StartLogger(&pi1_logger, tmp_dir_ + "/logs/relogged1");
2396 StartLogger(&pi2_logger, tmp_dir_ + "/logs/relogged2");
Naman Guptaa63aa132023-03-22 20:06:34 -07002397
2398 log_reader_factory.Run();
2399 }
2400
2401 reader.Deregister();
2402
2403 // And verify that we can run the LogReader over the relogged files without
2404 // hitting any fatal errors.
2405 {
Austin Schuh8fb4b452023-08-04 17:02:27 -07002406 LogReader relogged_reader(SortParts(
2407 MakeLogFiles(tmp_dir_ + "/logs/relogged1", tmp_dir_ + "/logs/relogged2",
2408 1, 1, 2, 2, true)));
Naman Guptaa63aa132023-03-22 20:06:34 -07002409 relogged_reader.Register();
2410
2411 relogged_reader.event_loop_factory()->Run();
2412 }
2413 // And confirm that we can read the logged file using the reader's
2414 // configuration.
2415 {
2416 LogReader relogged_reader(
Austin Schuh8fb4b452023-08-04 17:02:27 -07002417 SortParts(MakeLogFiles(tmp_dir_ + "/logs/relogged1",
2418 tmp_dir_ + "/logs/relogged2", 1, 1, 2, 2, true)),
Naman Guptaa63aa132023-03-22 20:06:34 -07002419 reader.configuration());
2420 relogged_reader.Register();
2421
2422 relogged_reader.event_loop_factory()->Run();
2423 }
2424}
2425
2426// Tests that we properly populate and extract the logger_start time by setting
2427// up a clock difference between 2 nodes and looking at the resulting parts.
2428TEST_P(MultinodeLoggerTest, LoggerStartTime) {
2429 std::vector<std::string> actual_filenames;
2430 time_converter_.AddMonotonic(
2431 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2432 {
2433 LoggerState pi1_logger = MakeLogger(pi1_);
2434 LoggerState pi2_logger = MakeLogger(pi2_);
2435
2436 StartLogger(&pi1_logger);
2437 StartLogger(&pi2_logger);
2438
2439 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2440
2441 pi1_logger.AppendAllFilenames(&actual_filenames);
2442 pi2_logger.AppendAllFilenames(&actual_filenames);
2443 }
2444
2445 ASSERT_THAT(actual_filenames,
2446 ::testing::UnorderedElementsAreArray(logfiles_));
2447
Austin Schuh8fb4b452023-08-04 17:02:27 -07002448 for (const LogFile &log_file : SortParts(actual_filenames)) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002449 for (const LogParts &log_part : log_file.parts) {
2450 if (log_part.node == log_file.logger_node) {
2451 EXPECT_EQ(log_part.logger_monotonic_start_time,
2452 aos::monotonic_clock::min_time);
2453 EXPECT_EQ(log_part.logger_realtime_start_time,
2454 aos::realtime_clock::min_time);
2455 } else {
2456 const chrono::seconds offset = log_file.logger_node == "pi1"
2457 ? -chrono::seconds(1000)
2458 : chrono::seconds(1000);
2459 EXPECT_EQ(log_part.logger_monotonic_start_time,
2460 log_part.monotonic_start_time + offset);
2461 EXPECT_EQ(log_part.logger_realtime_start_time,
2462 log_file.realtime_start_time +
2463 (log_part.logger_monotonic_start_time -
2464 log_file.monotonic_start_time));
2465 }
2466 }
2467 }
2468}
2469
2470// Test that renaming the base, renames the folder.
2471TEST_P(MultinodeLoggerTest, LoggerRenameFolder) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002472 time_converter_.AddMonotonic(
2473 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07002474 logfile_base1_ = tmp_dir_ + "/logs/renamefolder/multi_logfile1";
2475 logfile_base2_ = tmp_dir_ + "/logs/renamefolder/multi_logfile2";
2476
Naman Guptaa63aa132023-03-22 20:06:34 -07002477 LoggerState pi1_logger = MakeLogger(pi1_);
2478 LoggerState pi2_logger = MakeLogger(pi2_);
2479
2480 StartLogger(&pi1_logger);
2481 StartLogger(&pi2_logger);
2482
2483 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002484 logfile_base1_ = tmp_dir_ + "/logs/new-good/multi_logfile1";
2485 logfile_base2_ = tmp_dir_ + "/logs/new-good/multi_logfile2";
Naman Guptaa63aa132023-03-22 20:06:34 -07002486 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
Alexei Strotscaf17d32023-04-03 22:31:11 -07002487
2488 // Sequence of set_base_name and Rotate simulates rename operation. Since
2489 // rename is not supported by all namers, RenameLogBase moved from logger to
2490 // the higher level abstraction, yet log_namers support rename, and it is
2491 // legal to test it here.
2492 pi1_logger.log_namer->set_base_name(logfile_base1_);
2493 pi1_logger.logger->Rotate();
2494 pi2_logger.log_namer->set_base_name(logfile_base2_);
2495 pi2_logger.logger->Rotate();
2496
Naman Guptaa63aa132023-03-22 20:06:34 -07002497 for (auto &file : logfiles_) {
2498 struct stat s;
2499 EXPECT_EQ(0, stat(file.c_str(), &s));
2500 }
2501}
2502
2503// Test that renaming the file base dies.
2504TEST_P(MultinodeLoggerDeathTest, LoggerRenameFile) {
2505 time_converter_.AddMonotonic(
2506 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07002507 logfile_base1_ = tmp_dir_ + "/logs/renamefile/multi_logfile1";
2508 logfile_base2_ = tmp_dir_ + "/logs/renamefile/multi_logfile2";
2509
Naman Guptaa63aa132023-03-22 20:06:34 -07002510 LoggerState pi1_logger = MakeLogger(pi1_);
2511 StartLogger(&pi1_logger);
2512 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002513 logfile_base1_ = tmp_dir_ + "/logs/new-renamefile/new_multi_logfile1";
Alexei Strotscaf17d32023-04-03 22:31:11 -07002514 EXPECT_DEATH({ pi1_logger.log_namer->set_base_name(logfile_base1_); },
Naman Guptaa63aa132023-03-22 20:06:34 -07002515 "Rename of file base from");
2516}
2517
2518// TODO(austin): We can write a test which recreates a logfile and confirms that
2519// we get it back. That is the ultimate test.
2520
2521// Tests that we properly recreate forwarded timestamps when replaying a log.
2522// This should be enough that we can then re-run the logger and get a valid log
2523// back.
2524TEST_P(MultinodeLoggerTest, RemoteReboot) {
Austin Schuh6ecfe902023-08-04 22:44:37 -07002525 if (file_strategy() == FileStrategy::kCombine) {
2526 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
2527 }
Naman Guptaa63aa132023-03-22 20:06:34 -07002528 std::vector<std::string> actual_filenames;
2529
2530 const UUID pi1_boot0 = UUID::Random();
2531 const UUID pi2_boot0 = UUID::Random();
2532 const UUID pi2_boot1 = UUID::Random();
2533 {
2534 CHECK_EQ(pi1_index_, 0u);
2535 CHECK_EQ(pi2_index_, 1u);
2536
2537 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2538 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2539 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2540
2541 time_converter_.AddNextTimestamp(
2542 distributed_clock::epoch(),
2543 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2544 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2545 time_converter_.AddNextTimestamp(
2546 distributed_clock::epoch() + reboot_time,
2547 {BootTimestamp::epoch() + reboot_time,
2548 BootTimestamp{
2549 .boot = 1,
2550 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2551 }
2552
2553 {
2554 LoggerState pi1_logger = MakeLogger(pi1_);
2555
2556 event_loop_factory_.RunFor(chrono::milliseconds(95));
2557 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2558 pi1_boot0);
2559 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2560 pi2_boot0);
2561
2562 StartLogger(&pi1_logger);
2563
2564 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2565
2566 VLOG(1) << "Reboot now!";
2567
2568 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2569 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2570 pi1_boot0);
2571 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2572 pi2_boot1);
2573
2574 pi1_logger.AppendAllFilenames(&actual_filenames);
2575 }
2576
2577 std::sort(actual_filenames.begin(), actual_filenames.end());
2578 std::sort(pi1_reboot_logfiles_.begin(), pi1_reboot_logfiles_.end());
2579 ASSERT_THAT(actual_filenames,
2580 ::testing::UnorderedElementsAreArray(pi1_reboot_logfiles_));
2581
2582 // Confirm that our new oldest timestamps properly update as we reboot and
2583 // rotate.
2584 for (const std::string &file : pi1_reboot_logfiles_) {
2585 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2586 ReadHeader(file);
2587 CHECK(log_header);
2588 if (log_header->message().has_configuration()) {
2589 continue;
2590 }
2591
2592 const monotonic_clock::time_point monotonic_start_time =
2593 monotonic_clock::time_point(
2594 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2595 const UUID source_node_boot_uuid = UUID::FromString(
2596 log_header->message().source_node_boot_uuid()->string_view());
2597
2598 if (log_header->message().node()->name()->string_view() != "pi1") {
2599 // The remote message channel should rotate later and have more parts.
2600 // This only is true on the log files with shared remote messages.
2601 //
2602 // TODO(austin): I'm not the most thrilled with this test pattern... It
2603 // feels brittle in a different way.
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002604 if (file.find("timestamps/remote_pi2") == std::string::npos) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002605 switch (log_header->message().parts_index()) {
2606 case 0:
2607 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2608 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2609 break;
2610 case 1:
2611 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2612 ASSERT_EQ(monotonic_start_time,
2613 monotonic_clock::epoch() + chrono::seconds(1));
2614 break;
2615 case 2:
2616 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2617 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2618 break;
2619 case 3:
2620 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2621 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
Austin Schuhac6d89e2024-03-27 14:56:09 -07002622 chrono::nanoseconds(2323000000))
Naman Guptaa63aa132023-03-22 20:06:34 -07002623 << " on " << file;
2624 break;
2625 default:
2626 FAIL();
2627 break;
2628 }
2629 } else {
2630 switch (log_header->message().parts_index()) {
2631 case 0:
2632 case 1:
2633 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2634 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2635 break;
2636 case 2:
2637 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2638 ASSERT_EQ(monotonic_start_time,
2639 monotonic_clock::epoch() + chrono::seconds(1));
2640 break;
2641 case 3:
2642 case 4:
2643 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2644 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2645 break;
2646 case 5:
2647 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2648 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
Austin Schuhac6d89e2024-03-27 14:56:09 -07002649 chrono::nanoseconds(2323000000))
Naman Guptaa63aa132023-03-22 20:06:34 -07002650 << " on " << file;
2651 break;
2652 default:
2653 FAIL();
2654 break;
2655 }
2656 }
2657 continue;
2658 }
2659 SCOPED_TRACE(file);
2660 SCOPED_TRACE(aos::FlatbufferToJson(
2661 *log_header, {.multi_line = true, .max_vector_size = 100}));
2662 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2663 ASSERT_EQ(
2664 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2665 EXPECT_EQ(
2666 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2667 monotonic_clock::max_time.time_since_epoch().count());
2668 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2669 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2670 2u);
2671 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2672 monotonic_clock::max_time.time_since_epoch().count());
2673 ASSERT_TRUE(log_header->message()
2674 .has_oldest_remote_unreliable_monotonic_timestamps());
2675 ASSERT_EQ(log_header->message()
2676 .oldest_remote_unreliable_monotonic_timestamps()
2677 ->size(),
2678 2u);
2679 EXPECT_EQ(log_header->message()
2680 .oldest_remote_unreliable_monotonic_timestamps()
2681 ->Get(0),
2682 monotonic_clock::max_time.time_since_epoch().count());
2683 ASSERT_TRUE(log_header->message()
2684 .has_oldest_local_unreliable_monotonic_timestamps());
2685 ASSERT_EQ(log_header->message()
2686 .oldest_local_unreliable_monotonic_timestamps()
2687 ->size(),
2688 2u);
2689 EXPECT_EQ(log_header->message()
2690 .oldest_local_unreliable_monotonic_timestamps()
2691 ->Get(0),
2692 monotonic_clock::max_time.time_since_epoch().count());
2693
2694 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2695 monotonic_clock::time_point(chrono::nanoseconds(
2696 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2697 1)));
2698 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2699 monotonic_clock::time_point(chrono::nanoseconds(
2700 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2701 const monotonic_clock::time_point
2702 oldest_remote_unreliable_monotonic_timestamps =
2703 monotonic_clock::time_point(chrono::nanoseconds(
2704 log_header->message()
2705 .oldest_remote_unreliable_monotonic_timestamps()
2706 ->Get(1)));
2707 const monotonic_clock::time_point
2708 oldest_local_unreliable_monotonic_timestamps =
2709 monotonic_clock::time_point(chrono::nanoseconds(
2710 log_header->message()
2711 .oldest_local_unreliable_monotonic_timestamps()
2712 ->Get(1)));
2713 const monotonic_clock::time_point
Austin Schuhb5224ec2024-03-27 15:20:09 -07002714 oldest_remote_reliable_monotonic_transmit_timestamps =
2715 monotonic_clock::time_point(chrono::nanoseconds(
2716 log_header->message()
2717 .oldest_remote_reliable_monotonic_transmit_timestamps()
2718 ->Get(1)));
2719 const monotonic_clock::time_point
2720 oldest_local_reliable_monotonic_transmit_timestamps =
2721 monotonic_clock::time_point(chrono::nanoseconds(
2722 log_header->message()
2723 .oldest_local_reliable_monotonic_transmit_timestamps()
2724 ->Get(1)));
2725 const monotonic_clock::time_point
Naman Guptaa63aa132023-03-22 20:06:34 -07002726 oldest_remote_reliable_monotonic_timestamps =
2727 monotonic_clock::time_point(chrono::nanoseconds(
2728 log_header->message()
2729 .oldest_remote_reliable_monotonic_timestamps()
2730 ->Get(1)));
2731 const monotonic_clock::time_point
2732 oldest_local_reliable_monotonic_timestamps =
2733 monotonic_clock::time_point(chrono::nanoseconds(
2734 log_header->message()
2735 .oldest_local_reliable_monotonic_timestamps()
2736 ->Get(1)));
2737 const monotonic_clock::time_point
2738 oldest_logger_remote_unreliable_monotonic_timestamps =
2739 monotonic_clock::time_point(chrono::nanoseconds(
2740 log_header->message()
2741 .oldest_logger_remote_unreliable_monotonic_timestamps()
2742 ->Get(0)));
2743 const monotonic_clock::time_point
2744 oldest_logger_local_unreliable_monotonic_timestamps =
2745 monotonic_clock::time_point(chrono::nanoseconds(
2746 log_header->message()
2747 .oldest_logger_local_unreliable_monotonic_timestamps()
2748 ->Get(0)));
2749 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2750 monotonic_clock::max_time);
2751 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2752 monotonic_clock::max_time);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002753 if (log_header->message().data_stored()->Get(0) == StoredDataType::DATA) {
2754 switch (log_header->message().parts_index()) {
2755 case 0:
2756 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2757 monotonic_clock::max_time);
2758 EXPECT_EQ(oldest_local_monotonic_timestamps,
2759 monotonic_clock::max_time);
2760 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2761 monotonic_clock::max_time);
2762 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2763 monotonic_clock::max_time);
2764 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2765 monotonic_clock::max_time);
2766 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2767 monotonic_clock::max_time);
Austin Schuhb5224ec2024-03-27 15:20:09 -07002768 EXPECT_EQ(oldest_remote_reliable_monotonic_transmit_timestamps,
2769 monotonic_clock::max_time);
2770 EXPECT_EQ(oldest_local_reliable_monotonic_transmit_timestamps,
2771 monotonic_clock::max_time);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002772 break;
2773 default:
2774 FAIL();
2775 break;
2776 }
2777 } else if (log_header->message().data_stored()->Get(0) ==
2778 StoredDataType::TIMESTAMPS) {
2779 switch (log_header->message().parts_index()) {
2780 case 0:
2781 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2782 monotonic_clock::time_point(chrono::microseconds(90200)));
2783 EXPECT_EQ(oldest_local_monotonic_timestamps,
2784 monotonic_clock::time_point(chrono::microseconds(90350)));
2785 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2786 monotonic_clock::time_point(chrono::microseconds(90200)));
2787 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2788 monotonic_clock::time_point(chrono::microseconds(90350)));
2789 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2790 monotonic_clock::max_time);
2791 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2792 monotonic_clock::max_time);
Austin Schuhb5224ec2024-03-27 15:20:09 -07002793 EXPECT_EQ(oldest_remote_reliable_monotonic_transmit_timestamps,
2794 monotonic_clock::time_point(chrono::microseconds(90250)));
2795 EXPECT_EQ(oldest_local_reliable_monotonic_transmit_timestamps,
2796 monotonic_clock::time_point(chrono::microseconds(90350)));
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002797 break;
2798 case 1:
2799 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2800 monotonic_clock::time_point(chrono::microseconds(90200)))
2801 << file;
2802 EXPECT_EQ(oldest_local_monotonic_timestamps,
2803 monotonic_clock::time_point(chrono::microseconds(90350)))
2804 << file;
2805 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2806 monotonic_clock::time_point(chrono::microseconds(90200)))
2807 << file;
2808 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2809 monotonic_clock::time_point(chrono::microseconds(90350)))
2810 << file;
2811 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2812 monotonic_clock::time_point(chrono::microseconds(100000)))
2813 << file;
2814 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
Austin Schuhac6d89e2024-03-27 14:56:09 -07002815 monotonic_clock::time_point(chrono::microseconds(100100)))
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002816 << file;
Austin Schuhb5224ec2024-03-27 15:20:09 -07002817 EXPECT_EQ(oldest_remote_reliable_monotonic_transmit_timestamps,
2818 monotonic_clock::time_point(chrono::microseconds(90250)))
2819 << file;
2820 EXPECT_EQ(oldest_local_reliable_monotonic_transmit_timestamps,
2821 monotonic_clock::time_point(chrono::microseconds(90350)))
2822 << file;
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002823 break;
2824 case 2:
2825 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2826 monotonic_clock::time_point(chrono::milliseconds(1323) +
2827 chrono::microseconds(200)));
2828 EXPECT_EQ(
2829 oldest_local_monotonic_timestamps,
2830 monotonic_clock::time_point(chrono::microseconds(10100350)));
2831 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2832 monotonic_clock::time_point(chrono::milliseconds(1323) +
2833 chrono::microseconds(200)));
2834 EXPECT_EQ(
2835 oldest_local_unreliable_monotonic_timestamps,
2836 monotonic_clock::time_point(chrono::microseconds(10100350)));
2837 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2838 monotonic_clock::max_time)
2839 << file;
2840 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2841 monotonic_clock::max_time)
2842 << file;
Austin Schuhb5224ec2024-03-27 15:20:09 -07002843 EXPECT_EQ(oldest_remote_reliable_monotonic_transmit_timestamps,
2844 monotonic_clock::time_point(chrono::milliseconds(1323) +
2845 chrono::microseconds(250)))
2846 << file;
2847 EXPECT_EQ(oldest_local_reliable_monotonic_transmit_timestamps,
2848 monotonic_clock::time_point(chrono::microseconds(10100350)))
2849 << file;
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002850 break;
2851 case 3:
2852 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2853 monotonic_clock::time_point(chrono::milliseconds(1323) +
2854 chrono::microseconds(200)));
2855 EXPECT_EQ(
2856 oldest_local_monotonic_timestamps,
2857 monotonic_clock::time_point(chrono::microseconds(10100350)));
2858 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2859 monotonic_clock::time_point(chrono::milliseconds(1323) +
2860 chrono::microseconds(200)));
2861 EXPECT_EQ(
2862 oldest_local_unreliable_monotonic_timestamps,
2863 monotonic_clock::time_point(chrono::microseconds(10100350)));
2864 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2865 monotonic_clock::time_point(chrono::microseconds(1423000)))
2866 << file;
2867 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
Austin Schuhac6d89e2024-03-27 14:56:09 -07002868 monotonic_clock::time_point(chrono::microseconds(10200100)))
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002869 << file;
Austin Schuhb5224ec2024-03-27 15:20:09 -07002870 EXPECT_EQ(oldest_remote_reliable_monotonic_transmit_timestamps,
2871 monotonic_clock::time_point(chrono::milliseconds(1323) +
2872 chrono::microseconds(250)))
2873 << file;
2874 EXPECT_EQ(oldest_local_reliable_monotonic_transmit_timestamps,
2875 monotonic_clock::time_point(chrono::microseconds(10100350)))
2876 << file;
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002877 break;
2878 default:
2879 FAIL();
2880 break;
2881 }
Naman Guptaa63aa132023-03-22 20:06:34 -07002882 }
2883 }
2884
2885 // Confirm that we refuse to replay logs with missing boot uuids.
2886 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07002887 auto sorted_parts = SortParts(pi1_reboot_logfiles_);
2888 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
2889 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07002890
2891 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2892 log_reader_factory.set_send_delay(chrono::microseconds(0));
2893
2894 // This sends out the fetched messages and advances time to the start of
2895 // the log file.
2896 reader.Register(&log_reader_factory);
2897
2898 log_reader_factory.Run();
2899
2900 reader.Deregister();
2901 }
2902}
2903
2904// Tests that we can sort a log which only has timestamps from the remote
2905// because the local message_bridge_client failed to connect.
2906TEST_P(MultinodeLoggerTest, RemoteRebootOnlyTimestamps) {
Austin Schuh6ecfe902023-08-04 22:44:37 -07002907 if (file_strategy() == FileStrategy::kCombine) {
2908 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
2909 }
2910
Naman Guptaa63aa132023-03-22 20:06:34 -07002911 const UUID pi1_boot0 = UUID::Random();
2912 const UUID pi2_boot0 = UUID::Random();
2913 const UUID pi2_boot1 = UUID::Random();
2914 {
2915 CHECK_EQ(pi1_index_, 0u);
2916 CHECK_EQ(pi2_index_, 1u);
2917
2918 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2919 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2920 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2921
2922 time_converter_.AddNextTimestamp(
2923 distributed_clock::epoch(),
2924 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2925 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2926 time_converter_.AddNextTimestamp(
2927 distributed_clock::epoch() + reboot_time,
2928 {BootTimestamp::epoch() + reboot_time,
2929 BootTimestamp{
2930 .boot = 1,
2931 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2932 }
2933 pi2_->Disconnect(pi1_->node());
2934
2935 std::vector<std::string> filenames;
2936 {
2937 LoggerState pi1_logger = MakeLogger(pi1_);
2938
2939 event_loop_factory_.RunFor(chrono::milliseconds(95));
2940 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2941 pi1_boot0);
2942 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2943 pi2_boot0);
2944
2945 StartLogger(&pi1_logger);
2946
2947 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2948
2949 VLOG(1) << "Reboot now!";
2950
2951 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2952 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2953 pi1_boot0);
2954 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2955 pi2_boot1);
2956 pi1_logger.AppendAllFilenames(&filenames);
2957 }
2958
2959 std::sort(filenames.begin(), filenames.end());
2960
2961 // Confirm that our new oldest timestamps properly update as we reboot and
2962 // rotate.
2963 size_t timestamp_file_count = 0;
2964 for (const std::string &file : filenames) {
2965 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2966 ReadHeader(file);
2967 CHECK(log_header);
2968
2969 if (log_header->message().has_configuration()) {
2970 continue;
2971 }
2972
2973 const monotonic_clock::time_point monotonic_start_time =
2974 monotonic_clock::time_point(
2975 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2976 const UUID source_node_boot_uuid = UUID::FromString(
2977 log_header->message().source_node_boot_uuid()->string_view());
2978
2979 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2980 ASSERT_EQ(
2981 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2982 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2983 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2984 2u);
2985 ASSERT_TRUE(log_header->message()
2986 .has_oldest_remote_unreliable_monotonic_timestamps());
2987 ASSERT_EQ(log_header->message()
2988 .oldest_remote_unreliable_monotonic_timestamps()
2989 ->size(),
2990 2u);
2991 ASSERT_TRUE(log_header->message()
2992 .has_oldest_local_unreliable_monotonic_timestamps());
2993 ASSERT_EQ(log_header->message()
2994 .oldest_local_unreliable_monotonic_timestamps()
2995 ->size(),
2996 2u);
2997 ASSERT_TRUE(log_header->message()
2998 .has_oldest_remote_reliable_monotonic_timestamps());
2999 ASSERT_EQ(log_header->message()
3000 .oldest_remote_reliable_monotonic_timestamps()
3001 ->size(),
3002 2u);
3003 ASSERT_TRUE(
3004 log_header->message().has_oldest_local_reliable_monotonic_timestamps());
3005 ASSERT_EQ(log_header->message()
3006 .oldest_local_reliable_monotonic_timestamps()
3007 ->size(),
3008 2u);
3009
3010 ASSERT_TRUE(
3011 log_header->message()
3012 .has_oldest_logger_remote_unreliable_monotonic_timestamps());
3013 ASSERT_EQ(log_header->message()
3014 .oldest_logger_remote_unreliable_monotonic_timestamps()
3015 ->size(),
3016 2u);
3017 ASSERT_TRUE(log_header->message()
3018 .has_oldest_logger_local_unreliable_monotonic_timestamps());
3019 ASSERT_EQ(log_header->message()
3020 .oldest_logger_local_unreliable_monotonic_timestamps()
3021 ->size(),
3022 2u);
3023
3024 if (log_header->message().node()->name()->string_view() != "pi1") {
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07003025 ASSERT_TRUE(file.find("timestamps/remote_pi2") != std::string::npos);
Naman Guptaa63aa132023-03-22 20:06:34 -07003026
3027 const std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
3028 ReadNthMessage(file, 0);
3029 CHECK(msg);
3030
3031 EXPECT_TRUE(msg->message().has_monotonic_sent_time());
3032 EXPECT_TRUE(msg->message().has_monotonic_remote_time());
3033
3034 const monotonic_clock::time_point
3035 expected_oldest_local_monotonic_timestamps(
3036 chrono::nanoseconds(msg->message().monotonic_sent_time()));
3037 const monotonic_clock::time_point
3038 expected_oldest_remote_monotonic_timestamps(
3039 chrono::nanoseconds(msg->message().monotonic_remote_time()));
3040 const monotonic_clock::time_point
3041 expected_oldest_timestamp_monotonic_timestamps(
3042 chrono::nanoseconds(msg->message().monotonic_timestamp_time()));
3043
3044 EXPECT_NE(expected_oldest_local_monotonic_timestamps,
3045 monotonic_clock::min_time);
3046 EXPECT_NE(expected_oldest_remote_monotonic_timestamps,
3047 monotonic_clock::min_time);
3048 EXPECT_NE(expected_oldest_timestamp_monotonic_timestamps,
3049 monotonic_clock::min_time);
3050
3051 ++timestamp_file_count;
3052 // Since the log file is from the perspective of the other node,
3053 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
3054 monotonic_clock::time_point(chrono::nanoseconds(
3055 log_header->message().oldest_remote_monotonic_timestamps()->Get(
3056 0)));
3057 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
3058 monotonic_clock::time_point(chrono::nanoseconds(
3059 log_header->message().oldest_local_monotonic_timestamps()->Get(
3060 0)));
3061 const monotonic_clock::time_point
3062 oldest_remote_unreliable_monotonic_timestamps =
3063 monotonic_clock::time_point(chrono::nanoseconds(
3064 log_header->message()
3065 .oldest_remote_unreliable_monotonic_timestamps()
3066 ->Get(0)));
3067 const monotonic_clock::time_point
3068 oldest_local_unreliable_monotonic_timestamps =
3069 monotonic_clock::time_point(chrono::nanoseconds(
3070 log_header->message()
3071 .oldest_local_unreliable_monotonic_timestamps()
3072 ->Get(0)));
3073 const monotonic_clock::time_point
3074 oldest_remote_reliable_monotonic_timestamps =
3075 monotonic_clock::time_point(chrono::nanoseconds(
3076 log_header->message()
3077 .oldest_remote_reliable_monotonic_timestamps()
3078 ->Get(0)));
3079 const monotonic_clock::time_point
3080 oldest_local_reliable_monotonic_timestamps =
3081 monotonic_clock::time_point(chrono::nanoseconds(
3082 log_header->message()
3083 .oldest_local_reliable_monotonic_timestamps()
3084 ->Get(0)));
3085 const monotonic_clock::time_point
3086 oldest_logger_remote_unreliable_monotonic_timestamps =
3087 monotonic_clock::time_point(chrono::nanoseconds(
3088 log_header->message()
3089 .oldest_logger_remote_unreliable_monotonic_timestamps()
3090 ->Get(1)));
3091 const monotonic_clock::time_point
3092 oldest_logger_local_unreliable_monotonic_timestamps =
3093 monotonic_clock::time_point(chrono::nanoseconds(
3094 log_header->message()
3095 .oldest_logger_local_unreliable_monotonic_timestamps()
3096 ->Get(1)));
3097
3098 const Channel *channel =
3099 event_loop_factory_.configuration()->channels()->Get(
3100 msg->message().channel_index());
3101 const Connection *connection = configuration::ConnectionToNode(
3102 channel, configuration::GetNode(
3103 event_loop_factory_.configuration(),
3104 log_header->message().node()->name()->string_view()));
3105
3106 const bool reliable = connection->time_to_live() == 0;
3107
3108 SCOPED_TRACE(file);
3109 SCOPED_TRACE(aos::FlatbufferToJson(
3110 *log_header, {.multi_line = true, .max_vector_size = 100}));
3111
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003112 // Confirm that the oldest timestamps match what we expect. Based on
3113 // what we are doing, we know that the oldest time is the first
3114 // message's time.
3115 //
3116 // This makes the test robust to both the split and combined config
3117 // tests.
3118 switch (log_header->message().parts_index()) {
3119 case 0:
3120 EXPECT_EQ(oldest_remote_monotonic_timestamps,
3121 expected_oldest_remote_monotonic_timestamps);
3122 EXPECT_EQ(oldest_local_monotonic_timestamps,
3123 expected_oldest_local_monotonic_timestamps);
3124 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
3125 expected_oldest_local_monotonic_timestamps)
3126 << file;
3127 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
3128 expected_oldest_timestamp_monotonic_timestamps)
3129 << file;
3130
3131 if (reliable) {
3132 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
Naman Guptaa63aa132023-03-22 20:06:34 -07003133 expected_oldest_remote_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003134 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
Naman Guptaa63aa132023-03-22 20:06:34 -07003135 expected_oldest_local_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003136 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
3137 monotonic_clock::max_time);
3138 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
3139 monotonic_clock::max_time);
3140 } else {
3141 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
3142 monotonic_clock::max_time);
3143 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
3144 monotonic_clock::max_time);
Naman Guptaa63aa132023-03-22 20:06:34 -07003145 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
3146 expected_oldest_remote_monotonic_timestamps);
3147 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
3148 expected_oldest_local_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003149 }
3150 break;
3151 case 1:
3152 EXPECT_EQ(oldest_remote_monotonic_timestamps,
3153 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
3154 EXPECT_EQ(oldest_local_monotonic_timestamps,
3155 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
3156 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
3157 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
3158 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
3159 monotonic_clock::epoch() + chrono::nanoseconds(90250000));
3160 if (reliable) {
3161 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
3162 expected_oldest_remote_monotonic_timestamps);
3163 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
3164 expected_oldest_local_monotonic_timestamps);
3165 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
3166 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
3167 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
3168 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
3169 } else {
3170 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
3171 monotonic_clock::max_time);
3172 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
3173 monotonic_clock::max_time);
3174 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
3175 expected_oldest_remote_monotonic_timestamps);
3176 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
3177 expected_oldest_local_monotonic_timestamps);
3178 }
3179 break;
3180 case 2:
3181 EXPECT_EQ(
3182 oldest_remote_monotonic_timestamps,
3183 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
3184 EXPECT_EQ(oldest_local_monotonic_timestamps,
3185 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
3186 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
3187 expected_oldest_local_monotonic_timestamps)
3188 << file;
3189 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
3190 expected_oldest_timestamp_monotonic_timestamps)
3191 << file;
3192 if (reliable) {
3193 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
3194 expected_oldest_remote_monotonic_timestamps);
3195 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
3196 expected_oldest_local_monotonic_timestamps);
3197 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
3198 monotonic_clock::max_time);
3199 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
3200 monotonic_clock::max_time);
3201 } else {
3202 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
3203 monotonic_clock::max_time);
3204 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
3205 monotonic_clock::max_time);
3206 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
3207 expected_oldest_remote_monotonic_timestamps);
3208 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
3209 expected_oldest_local_monotonic_timestamps);
3210 }
3211 break;
Naman Guptaa63aa132023-03-22 20:06:34 -07003212
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003213 case 3:
3214 EXPECT_EQ(
3215 oldest_remote_monotonic_timestamps,
3216 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
3217 EXPECT_EQ(oldest_local_monotonic_timestamps,
3218 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
3219 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
3220 expected_oldest_remote_monotonic_timestamps);
3221 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
3222 expected_oldest_local_monotonic_timestamps);
3223 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
3224 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
3225 EXPECT_EQ(
3226 oldest_logger_local_unreliable_monotonic_timestamps,
3227 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
3228 break;
3229 default:
3230 FAIL();
3231 break;
Naman Guptaa63aa132023-03-22 20:06:34 -07003232 }
3233
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003234 switch (log_header->message().parts_index()) {
3235 case 0:
3236 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
3237 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
3238 break;
3239 case 1:
3240 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
3241 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
3242 break;
3243 case 2:
3244 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
3245 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
3246 break;
3247 case 3:
3248 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
3249 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
3250 break;
3251 [[fallthrough]];
3252 default:
3253 FAIL();
3254 break;
3255 }
Naman Guptaa63aa132023-03-22 20:06:34 -07003256 continue;
3257 }
3258 EXPECT_EQ(
3259 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
3260 monotonic_clock::max_time.time_since_epoch().count());
3261 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
3262 monotonic_clock::max_time.time_since_epoch().count());
3263 EXPECT_EQ(log_header->message()
3264 .oldest_remote_unreliable_monotonic_timestamps()
3265 ->Get(0),
3266 monotonic_clock::max_time.time_since_epoch().count());
3267 EXPECT_EQ(log_header->message()
3268 .oldest_local_unreliable_monotonic_timestamps()
3269 ->Get(0),
3270 monotonic_clock::max_time.time_since_epoch().count());
3271
3272 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
3273 monotonic_clock::time_point(chrono::nanoseconds(
3274 log_header->message().oldest_remote_monotonic_timestamps()->Get(
3275 1)));
3276 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
3277 monotonic_clock::time_point(chrono::nanoseconds(
3278 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
3279 const monotonic_clock::time_point
3280 oldest_remote_unreliable_monotonic_timestamps =
3281 monotonic_clock::time_point(chrono::nanoseconds(
3282 log_header->message()
3283 .oldest_remote_unreliable_monotonic_timestamps()
3284 ->Get(1)));
3285 const monotonic_clock::time_point
3286 oldest_local_unreliable_monotonic_timestamps =
3287 monotonic_clock::time_point(chrono::nanoseconds(
3288 log_header->message()
3289 .oldest_local_unreliable_monotonic_timestamps()
3290 ->Get(1)));
3291 switch (log_header->message().parts_index()) {
3292 case 0:
3293 EXPECT_EQ(oldest_remote_monotonic_timestamps,
3294 monotonic_clock::max_time);
3295 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
3296 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
3297 monotonic_clock::max_time);
3298 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
3299 monotonic_clock::max_time);
3300 break;
3301 default:
3302 FAIL();
3303 break;
3304 }
3305 }
3306
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003307 EXPECT_EQ(timestamp_file_count, 4u);
Naman Guptaa63aa132023-03-22 20:06:34 -07003308
3309 // Confirm that we can actually sort the resulting log and read it.
3310 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003311 auto sorted_parts = SortParts(filenames);
3312 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3313 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07003314
3315 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
3316 log_reader_factory.set_send_delay(chrono::microseconds(0));
3317
3318 // This sends out the fetched messages and advances time to the start of
3319 // the log file.
3320 reader.Register(&log_reader_factory);
3321
3322 log_reader_factory.Run();
3323
3324 reader.Deregister();
3325 }
3326}
3327
3328// Tests that we properly handle one direction of message_bridge being
3329// unavailable.
3330TEST_P(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07003331 std::vector<std::string> actual_filenames;
3332
Naman Guptaa63aa132023-03-22 20:06:34 -07003333 pi1_->Disconnect(pi2_->node());
3334 time_converter_.AddMonotonic(
3335 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
3336
3337 time_converter_.AddMonotonic(
3338 {chrono::milliseconds(10000),
3339 chrono::milliseconds(10000) - chrono::milliseconds(1)});
3340 {
3341 LoggerState pi1_logger = MakeLogger(pi1_);
3342
3343 event_loop_factory_.RunFor(chrono::milliseconds(95));
3344
3345 StartLogger(&pi1_logger);
3346
3347 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07003348 pi1_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07003349 }
3350
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003351 // Confirm that we can parse the result. LogReader has enough internal
3352 // CHECKs to confirm the right thing happened.
Austin Schuh8fb4b452023-08-04 17:02:27 -07003353 ConfirmReadable(actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07003354}
3355
3356// Tests that we properly handle one direction of message_bridge being
3357// unavailable.
3358TEST_P(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
3359 pi1_->Disconnect(pi2_->node());
3360 time_converter_.AddMonotonic(
3361 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(500)});
3362
3363 time_converter_.AddMonotonic(
3364 {chrono::milliseconds(10000),
3365 chrono::milliseconds(10000) + chrono::milliseconds(1)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07003366
3367 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -07003368 {
3369 LoggerState pi1_logger = MakeLogger(pi1_);
3370
3371 event_loop_factory_.RunFor(chrono::milliseconds(95));
3372
3373 StartLogger(&pi1_logger);
3374
3375 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07003376 pi1_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07003377 }
3378
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003379 // Confirm that we can parse the result. LogReader has enough internal
3380 // CHECKs to confirm the right thing happened.
Austin Schuh8fb4b452023-08-04 17:02:27 -07003381 ConfirmReadable(filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07003382}
3383
3384// Tests that we explode if someone passes in a part file twice with a better
3385// error than an out of order error.
3386TEST_P(MultinodeLoggerTest, DuplicateLogFiles) {
3387 time_converter_.AddMonotonic(
3388 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07003389
3390 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -07003391 {
3392 LoggerState pi1_logger = MakeLogger(pi1_);
3393
3394 event_loop_factory_.RunFor(chrono::milliseconds(95));
3395
3396 StartLogger(&pi1_logger);
3397
3398 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07003399
3400 pi1_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07003401 }
3402
3403 std::vector<std::string> duplicates;
Austin Schuh8fb4b452023-08-04 17:02:27 -07003404 for (const std::string &f : filenames) {
Naman Guptaa63aa132023-03-22 20:06:34 -07003405 duplicates.emplace_back(f);
3406 duplicates.emplace_back(f);
3407 }
3408 EXPECT_DEATH({ SortParts(duplicates); }, "Found duplicate parts in");
3409}
3410
3411// Tests that we explode if someone loses a part out of the middle of a log.
3412TEST_P(MultinodeLoggerTest, MissingPartsFromMiddle) {
Austin Schuh6ecfe902023-08-04 22:44:37 -07003413 if (file_strategy() == FileStrategy::kCombine) {
3414 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
3415 }
Naman Guptaa63aa132023-03-22 20:06:34 -07003416 time_converter_.AddMonotonic(
3417 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
3418 {
3419 LoggerState pi1_logger = MakeLogger(pi1_);
3420
3421 event_loop_factory_.RunFor(chrono::milliseconds(95));
3422
3423 StartLogger(&pi1_logger);
3424 aos::monotonic_clock::time_point last_rotation_time =
3425 pi1_logger.event_loop->monotonic_now();
Austin Schuh2f864452023-07-17 14:53:08 -07003426 pi1_logger.logger->set_on_logged_period(
3427 [&](aos::monotonic_clock::time_point) {
3428 const auto now = pi1_logger.event_loop->monotonic_now();
3429 if (now > last_rotation_time + std::chrono::seconds(5)) {
3430 pi1_logger.logger->Rotate();
3431 last_rotation_time = now;
3432 }
3433 });
Naman Guptaa63aa132023-03-22 20:06:34 -07003434
3435 event_loop_factory_.RunFor(chrono::milliseconds(10000));
3436 }
3437
3438 std::vector<std::string> missing_parts;
3439
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07003440 missing_parts.emplace_back(logfile_base1_ + "_pi1_timestamps.part0" +
3441 Extension());
3442 missing_parts.emplace_back(logfile_base1_ + "_pi1_timestamps.part2" +
3443 Extension());
Naman Guptaa63aa132023-03-22 20:06:34 -07003444 missing_parts.emplace_back(absl::StrCat(
3445 logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
3446
3447 EXPECT_DEATH({ SortParts(missing_parts); },
3448 "Broken log, missing part files between");
3449}
3450
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003451// Tests that we properly handle a dead node. Do this by just disconnecting
3452// it and only using one nodes of logs.
Naman Guptaa63aa132023-03-22 20:06:34 -07003453TEST_P(MultinodeLoggerTest, DeadNode) {
3454 pi1_->Disconnect(pi2_->node());
3455 pi2_->Disconnect(pi1_->node());
3456 time_converter_.AddMonotonic(
3457 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
3458 {
3459 LoggerState pi1_logger = MakeLogger(pi1_);
3460
3461 event_loop_factory_.RunFor(chrono::milliseconds(95));
3462
3463 StartLogger(&pi1_logger);
3464
3465 event_loop_factory_.RunFor(chrono::milliseconds(10000));
3466 }
3467
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003468 // Confirm that we can parse the result. LogReader has enough internal
3469 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07003470 ConfirmReadable(MakePi1DeadNodeLogfiles());
3471}
3472
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003473// Tests that we can relog with a different config. This makes most sense
3474// when you are trying to edit a log and want to use channel renaming + the
3475// original config in the new log.
Naman Guptaa63aa132023-03-22 20:06:34 -07003476TEST_P(MultinodeLoggerTest, LogDifferentConfig) {
3477 time_converter_.StartEqual();
3478 {
3479 LoggerState pi1_logger = MakeLogger(pi1_);
3480 LoggerState pi2_logger = MakeLogger(pi2_);
3481
3482 event_loop_factory_.RunFor(chrono::milliseconds(95));
3483
3484 StartLogger(&pi1_logger);
3485 StartLogger(&pi2_logger);
3486
3487 event_loop_factory_.RunFor(chrono::milliseconds(20000));
3488 }
3489
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003490 auto sorted_parts = SortParts(logfiles_);
3491 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3492 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07003493 reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
3494
3495 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
3496 log_reader_factory.set_send_delay(chrono::microseconds(0));
3497
3498 // This sends out the fetched messages and advances time to the start of the
3499 // log file.
3500 reader.Register(&log_reader_factory);
3501
3502 const Node *pi1 =
3503 configuration::GetNode(log_reader_factory.configuration(), "pi1");
3504 const Node *pi2 =
3505 configuration::GetNode(log_reader_factory.configuration(), "pi2");
3506
3507 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
3508 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
3509 LOG(INFO) << "now pi1 "
3510 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
3511 LOG(INFO) << "now pi2 "
3512 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
3513
3514 EXPECT_THAT(reader.LoggedNodes(),
3515 ::testing::ElementsAre(
3516 configuration::GetNode(reader.logged_configuration(), pi1),
3517 configuration::GetNode(reader.logged_configuration(), pi2)));
3518
3519 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
3520
3521 // And confirm we can re-create a log again, while checking the contents.
3522 std::vector<std::string> log_files;
3523 {
3524 LoggerState pi1_logger =
3525 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
3526 &log_reader_factory, reader.logged_configuration());
3527 LoggerState pi2_logger =
3528 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
3529 &log_reader_factory, reader.logged_configuration());
3530
Austin Schuh7e417682023-08-11 17:05:30 -07003531 pi1_logger.StartLogger(tmp_dir_ + "/logs/relogged1");
3532 pi2_logger.StartLogger(tmp_dir_ + "/logs/relogged2");
Naman Guptaa63aa132023-03-22 20:06:34 -07003533
3534 log_reader_factory.Run();
3535
3536 for (auto &x : pi1_logger.log_namer->all_filenames()) {
Austin Schuh7e417682023-08-11 17:05:30 -07003537 log_files.emplace_back(absl::StrCat(tmp_dir_, "/logs/relogged1_", x));
Naman Guptaa63aa132023-03-22 20:06:34 -07003538 }
3539 for (auto &x : pi2_logger.log_namer->all_filenames()) {
Austin Schuh7e417682023-08-11 17:05:30 -07003540 log_files.emplace_back(absl::StrCat(tmp_dir_, "/logs/relogged2_", x));
Naman Guptaa63aa132023-03-22 20:06:34 -07003541 }
3542 }
3543
3544 reader.Deregister();
3545
3546 // And verify that we can run the LogReader over the relogged files without
3547 // hitting any fatal errors.
3548 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003549 auto sorted_parts = SortParts(log_files);
3550 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3551 LogReader relogged_reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07003552 relogged_reader.Register();
3553
3554 relogged_reader.event_loop_factory()->Run();
3555 }
3556}
3557
Maxwell Gumley8c1b87f2024-02-13 17:54:52 -07003558// Tests that we can relog with a subset of the original config. This is useful
3559// for excluding obsolete or deprecated channels, so they don't appear in the
3560// configuration when reading the log.
3561TEST_P(MultinodeLoggerTest, LogPartialConfig) {
3562 time_converter_.StartEqual();
3563 {
3564 LoggerState pi1_logger = MakeLogger(pi1_);
3565 LoggerState pi2_logger = MakeLogger(pi2_);
3566
3567 event_loop_factory_.RunFor(chrono::milliseconds(95));
3568
3569 StartLogger(&pi1_logger);
3570 StartLogger(&pi2_logger);
3571
3572 event_loop_factory_.RunFor(chrono::milliseconds(20000));
3573 }
3574
3575 auto sorted_parts = SortParts(logfiles_);
3576 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3577 LogReader reader(sorted_parts);
3578 reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
3579
3580 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
3581 log_reader_factory.set_send_delay(chrono::microseconds(0));
3582
3583 // This sends out the fetched messages and advances time to the start of the
3584 // log file.
3585 reader.Register(&log_reader_factory);
3586
3587 const Node *pi1 =
3588 configuration::GetNode(log_reader_factory.configuration(), "pi1");
3589 const Node *pi2 =
3590 configuration::GetNode(log_reader_factory.configuration(), "pi2");
3591
3592 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
3593 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
3594 LOG(INFO) << "now pi1 "
3595 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
3596 LOG(INFO) << "now pi2 "
3597 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
3598
3599 EXPECT_THAT(reader.LoggedNodes(),
3600 ::testing::ElementsAre(
3601 configuration::GetNode(reader.logged_configuration(), pi1),
3602 configuration::GetNode(reader.logged_configuration(), pi2)));
3603
3604 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
3605
3606 const FlatbufferDetachedBuffer<Configuration> partial_configuration_buffer =
3607 configuration::GetPartialConfiguration(
3608 *reader.event_loop_factory()->configuration(),
3609 [](const Channel &channel) {
3610 if (channel.name()->string_view().starts_with("/original/")) {
3611 LOG(INFO) << "Omitting channel from save_log, channel: "
3612 << channel.name()->string_view() << ", "
3613 << channel.type()->string_view();
3614 return false;
3615 }
3616 return true;
3617 });
3618
3619 // And confirm we can re-create a log again, while checking the contents.
3620 std::vector<std::string> log_files;
3621 {
3622 const Configuration *partial_configuration =
3623 &(partial_configuration_buffer.message());
3624
3625 LoggerState pi1_logger =
3626 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
3627 &log_reader_factory, partial_configuration);
3628 LoggerState pi2_logger =
3629 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
3630 &log_reader_factory, partial_configuration);
3631
3632 pi1_logger.StartLogger(tmp_dir_ + "/logs/relogged1");
3633 pi2_logger.StartLogger(tmp_dir_ + "/logs/relogged2");
3634
3635 log_reader_factory.Run();
3636
3637 for (auto &x : pi1_logger.log_namer->all_filenames()) {
3638 log_files.emplace_back(absl::StrCat(tmp_dir_, "/logs/relogged1_", x));
3639 }
3640 for (auto &x : pi2_logger.log_namer->all_filenames()) {
3641 log_files.emplace_back(absl::StrCat(tmp_dir_, "/logs/relogged2_", x));
3642 }
3643 }
3644
3645 reader.Deregister();
3646
3647 // And verify that we can run the LogReader over the relogged files without
3648 // hitting any fatal errors.
3649 {
3650 auto sorted_parts = SortParts(log_files);
3651 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3652 LogReader relogged_reader(sorted_parts);
3653 relogged_reader.Register();
3654
3655 relogged_reader.event_loop_factory()->Run();
3656 }
3657}
3658
// Tests that we properly replay a log where the start time for a node is
// before any data on the node.  This can happen if the logger starts before
// data is published.  While the scenario below is a bit convoluted, we have
// seen logs like this generated out in the wild.
//
// Timeline (t = simulated time elapsed, summed from the RunFor() calls):
//   * t = 1 s: all three loggers start while everything is quiet (statistics
//     disabled, timing reports skipped, no ping/pong yet).
//   * t = 11 s: statistics, ping (pi1) and pong (pi2) are turned on.
//   * t = 14 s: pi2 is silenced and disconnected from pi1, and its first-boot
//     logger goes out of scope.
//   * t = 20 s: pi2 reboots; boot 1 starts at 1.323 s on pi2's clock.
//   * t = 22 s: pi2 starts logging its second boot, again with no data.
//   * t = 32 s: pi2 statistics, the pi1 <-> pi2 link, pong and ping return.
//   * t = 35 s: everything is torn down and the logs are read back.
TEST(MultinodeRebootLoggerTest, StartTimeBeforeData) {
  util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
  std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");

  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(ArtifactPath(
          "aos/events/logging/multinode_pingpong_split3_config.json"));
  message_bridge::TestingTimeConverter time_converter(
      configuration::NodesCount(&config.message()));
  SimulatedEventLoopFactory event_loop_factory(&config.message());
  event_loop_factory.SetTimeConverter(&time_converter);
  NodeEventLoopFactory *const pi1 =
      event_loop_factory.GetNodeEventLoopFactory("pi1");
  const size_t pi1_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi1->node());
  NodeEventLoopFactory *const pi2 =
      event_loop_factory.GetNodeEventLoopFactory("pi2");
  const size_t pi2_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi2->node());
  NodeEventLoopFactory *const pi3 =
      event_loop_factory.GetNodeEventLoopFactory("pi3");
  const size_t pi3_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi3->node());

  // One log directory per node, plus a second one for pi2's second boot.
  const std::string kLogfile1_1 =
      aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
  const std::string kLogfile2_1 =
      aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
  const std::string kLogfile2_2 =
      aos::testing::TestTmpDir() + "/logs/multi_logfile2.2/";
  const std::string kLogfile3_1 =
      aos::testing::TestTmpDir() + "/logs/multi_logfile3/";

  const UUID pi1_boot0 = UUID::Random();
  const UUID pi2_boot0 = UUID::Random();
  const UUID pi2_boot1 = UUID::Random();
  const UUID pi3_boot0 = UUID::Random();
  {
    // The boot UUIDs and timestamp vectors below are indexed by node, so pin
    // the node ordering first.
    CHECK_EQ(pi1_index, 0u);
    CHECK_EQ(pi2_index, 1u);
    CHECK_EQ(pi3_index, 2u);

    time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
    time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
    time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
    time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);

    time_converter.AddNextTimestamp(
        distributed_clock::epoch(),
        {BootTimestamp::epoch(), BootTimestamp::epoch(),
         BootTimestamp::epoch()});
    // At distributed time 20 s, pi2 (index 1) moves to boot 1, whose clock
    // starts at 1.323 s.  pi1 and pi3 keep running on boot 0.
    const chrono::nanoseconds reboot_time = chrono::milliseconds(20000);
    time_converter.AddNextTimestamp(
        distributed_clock::epoch() + reboot_time,
        {BootTimestamp::epoch() + reboot_time,
         BootTimestamp{
             .boot = 1,
             .time = monotonic_clock::epoch() + chrono::milliseconds(1323)},
         BootTimestamp::epoch() + reboot_time});
  }

  // Make everything perfectly quiet.
  event_loop_factory.SkipTimingReport();
  event_loop_factory.DisableStatistics();

  std::vector<std::string> filenames;
  {
    LoggerState pi1_logger = MakeLoggerState(
        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
        FileStrategy::kKeepSeparate);
    LoggerState pi3_logger = MakeLoggerState(
        pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0],
        FileStrategy::kKeepSeparate);
    {
      // Create pi2's first-boot logger.  All three loggers are started below,
      // after 1 s of quiet.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
          FileStrategy::kKeepSeparate);

      event_loop_factory.RunFor(chrono::milliseconds(1000));

      pi1_logger.StartLogger(kLogfile1_1);
      pi3_logger.StartLogger(kLogfile3_1);
      pi2_logger.StartLogger(kLogfile2_1);

      event_loop_factory.RunFor(chrono::milliseconds(10000));

      // Now that we've got a start time in the past, turn on data.
      event_loop_factory.EnableStatistics();
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());

      pi2->AlwaysStart<Pong>("pong");

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      pi2_logger.AppendAllFilenames(&filenames);

      // Stop logging on pi2 before rebooting and completely shut off all
      // messages on pi2.  The logger itself stops when it goes out of scope
      // at the end of this block.
      pi2->DisableStatistics();
      pi1->Disconnect(pi2->node());
      pi2->Disconnect(pi1->node());
    }
    event_loop_factory.RunFor(chrono::milliseconds(7000));
    // pi2 rebooted at t = 20 s, during the 7 s run above.
    {
      event_loop_factory.RunFor(chrono::milliseconds(1000));

      // Start logging again on pi2 after it is up.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
          FileStrategy::kKeepSeparate);
      pi2_logger.StartLogger(kLogfile2_2);

      event_loop_factory.RunFor(chrono::milliseconds(10000));
      // And, now that we have a start time in the log, turn data back on.
      pi2->EnableStatistics();
      pi1->Connect(pi2->node());
      pi2->Connect(pi1->node());

      // NOTE(review): pong was already registered via AlwaysStart on pi2's
      // first boot.  If AlwaysStart relaunches its application on every boot,
      // this call adds a second pong instance.  Removing it may change the
      // expected times below, so confirm before touching it.
      pi2->AlwaysStart<Pong>("pong");
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      pi2_logger.AppendAllFilenames(&filenames);
    }

    pi1_logger.AppendAllFilenames(&filenames);
    pi3_logger.AppendAllFilenames(&filenames);
  }

  // Confirm that we can parse the result.  LogReader has enough internal CHECKs
  // to confirm the right thing happened.
  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
  // NOTE(review): result[i] presumably holds (start times, end times) for node
  // i, with one entry per logged boot, which is why pi2 (index 1) has two of
  // each.  pi2's second start time, 3.323 s, equals boot 1's starting clock
  // (1.323 s) plus the 2 s that ran before the second logger started.
  auto result = ConfirmReadable(filenames);
  EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
                                                       chrono::seconds(1)));
  EXPECT_THAT(result[0].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(34990350)));

  EXPECT_THAT(result[1].first,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::seconds(1),
                  realtime_clock::epoch() + chrono::microseconds(3323000)));
  EXPECT_THAT(result[1].second,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::microseconds(13990200),
                  realtime_clock::epoch() + chrono::microseconds(16313200)));

  EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch() +
                                                       chrono::seconds(1)));
  EXPECT_THAT(result[2].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(34900100)));
}
3825
// Tests that local data before remote data after reboot is properly replayed.
// We only trigger a reboot in the timestamp interpolation function when solving
// the timestamp problem when we actually have a point in the function.  This
// originally only happened when a point passes the noncausal filter.  At the
// start of time for the second boot, if we aren't careful, we will have
// messages which need to be published at times before the boot.  This happens
// when a local message is in the log before a forwarded message, so there is no
// point in the interpolation function.  This delays the reboot.  So, we need to
// recreate that situation and make sure it doesn't come back.
//
// Timeline (t = simulated time elapsed, summed from the RunFor() calls):
//   * t = 0: all three loggers start, before any ping/pong data.
//   * t = 1.005 s: ping (pi1) and pong (pi2) start.
//   * t = 4.005 s: pi1 and pi2 are disconnected, and pi2's first-boot logger
//     goes out of scope.
//   * t = 5 s: pi2 reboots; boot 1 starts at 105 s on pi2's clock.
//   * t = 6 s: pong and ping start again while still disconnected, so pi2 has
//     local messages before any forwarded ones.
//   * t = 7.005 s: pi2 starts logging its second boot and the link comes back.
//   * t = 11.005 s: everything is torn down and the logs are read back.
TEST(MultinodeRebootLoggerTest,
     LocalMessageBeforeRemoteBeforeStartAfterReboot) {
  util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
  std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");

  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(ArtifactPath(
          "aos/events/logging/multinode_pingpong_split3_config.json"));
  message_bridge::TestingTimeConverter time_converter(
      configuration::NodesCount(&config.message()));
  SimulatedEventLoopFactory event_loop_factory(&config.message());
  event_loop_factory.SetTimeConverter(&time_converter);
  NodeEventLoopFactory *const pi1 =
      event_loop_factory.GetNodeEventLoopFactory("pi1");
  const size_t pi1_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi1->node());
  NodeEventLoopFactory *const pi2 =
      event_loop_factory.GetNodeEventLoopFactory("pi2");
  const size_t pi2_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi2->node());
  NodeEventLoopFactory *const pi3 =
      event_loop_factory.GetNodeEventLoopFactory("pi3");
  const size_t pi3_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi3->node());

  // One log directory per node, plus a second one for pi2's second boot.
  const std::string kLogfile1_1 =
      aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
  const std::string kLogfile2_1 =
      aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
  const std::string kLogfile2_2 =
      aos::testing::TestTmpDir() + "/logs/multi_logfile2.2/";
  const std::string kLogfile3_1 =
      aos::testing::TestTmpDir() + "/logs/multi_logfile3/";
  const UUID pi1_boot0 = UUID::Random();
  const UUID pi2_boot0 = UUID::Random();
  const UUID pi2_boot1 = UUID::Random();
  const UUID pi3_boot0 = UUID::Random();
  {
    // The boot UUIDs and timestamp vectors below are indexed by node, so pin
    // the node ordering first.
    CHECK_EQ(pi1_index, 0u);
    CHECK_EQ(pi2_index, 1u);
    CHECK_EQ(pi3_index, 2u);

    time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
    time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
    time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
    time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);

    time_converter.AddNextTimestamp(
        distributed_clock::epoch(),
        {BootTimestamp::epoch(), BootTimestamp::epoch(),
         BootTimestamp::epoch()});
    // At distributed time 5 s, pi2 (index 1) moves to boot 1, whose clock
    // starts at 5 s + 100 s = 105 s.  pi1 and pi3 keep running on boot 0.
    const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
    time_converter.AddNextTimestamp(
        distributed_clock::epoch() + reboot_time,
        {BootTimestamp::epoch() + reboot_time,
         BootTimestamp{.boot = 1,
                       .time = monotonic_clock::epoch() + reboot_time +
                               chrono::seconds(100)},
         BootTimestamp::epoch() + reboot_time});
  }

  std::vector<std::string> filenames;
  {
    LoggerState pi1_logger = MakeLoggerState(
        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
        FileStrategy::kKeepSeparate);
    LoggerState pi3_logger = MakeLoggerState(
        pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0],
        FileStrategy::kKeepSeparate);
    {
      // Create pi2's first-boot logger, then start all three loggers at t = 0.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
          FileStrategy::kKeepSeparate);

      pi1_logger.StartLogger(kLogfile1_1);
      pi3_logger.StartLogger(kLogfile3_1);
      pi2_logger.StartLogger(kLogfile2_1);

      event_loop_factory.RunFor(chrono::milliseconds(1005));

      // Now that we've got a start time in the past, turn on data.
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());

      pi2->AlwaysStart<Pong>("pong");

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      pi2_logger.AppendAllFilenames(&filenames);

      // Disable any remote messages on pi2.
      pi1->Disconnect(pi2->node());
      pi2->Disconnect(pi1->node());
    }
    event_loop_factory.RunFor(chrono::milliseconds(995));
    // pi2 now reboots at 5 seconds, which is where the simulation is now.
    {
      event_loop_factory.RunFor(chrono::milliseconds(1000));

      // Make local stuff happen before we start logging and connect the remote.
      // NOTE(review): pong was already registered via AlwaysStart on pi2's
      // first boot.  If AlwaysStart relaunches its application on every boot,
      // this call adds a second pong instance.  Confirm before removing it,
      // since the expected times below depend on this sequence.
      pi2->AlwaysStart<Pong>("pong");
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());
      event_loop_factory.RunFor(chrono::milliseconds(1005));

      // Start logging again on pi2 after it is up.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
          FileStrategy::kKeepSeparate);
      pi2_logger.StartLogger(kLogfile2_2);

      // And allow remote messages now that we have some local ones.
      pi1->Connect(pi2->node());
      pi2->Connect(pi1->node());

      event_loop_factory.RunFor(chrono::milliseconds(1000));

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      pi2_logger.AppendAllFilenames(&filenames);
    }

    pi1_logger.AppendAllFilenames(&filenames);
    pi3_logger.AppendAllFilenames(&filenames);
  }

  // Confirm that we can parse the result.  LogReader has enough internal CHECKs
  // to confirm the right thing happened.
  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
  // NOTE(review): result[i] presumably holds (start times, end times) for node
  // i, with one entry per logged boot, which is why pi2 (index 1) has two of
  // each.  pi2's second start time, 107.005 s, equals boot 1's starting clock
  // (105 s) plus the 2.005 s that ran before the second logger started.
  auto result = ConfirmReadable(filenames);

  EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
  EXPECT_THAT(result[0].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(11000300)));

  EXPECT_THAT(result[1].first,
              ::testing::ElementsAre(
                  realtime_clock::epoch(),
                  realtime_clock::epoch() + chrono::microseconds(107005000)));
  EXPECT_THAT(result[1].second,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::microseconds(4000100),
                  realtime_clock::epoch() + chrono::microseconds(111000150)));

  EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
  EXPECT_THAT(result[2].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(11000100)));

  // Replay again with realtime start/stop times of 2 s and 3 s.  Every node is
  // expected to report exactly that window, with a single entry each
  // (presumably because pi2's reboot falls outside the window).
  auto start_stop_result = ConfirmReadable(
      filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
      realtime_clock::epoch() + chrono::milliseconds(3000));

  EXPECT_THAT(
      start_stop_result[0].first,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
  EXPECT_THAT(
      start_stop_result[0].second,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
  EXPECT_THAT(
      start_stop_result[1].first,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
  EXPECT_THAT(
      start_stop_result[1].second,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
  EXPECT_THAT(
      start_stop_result[2].first,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
  EXPECT_THAT(
      start_stop_result[2].second,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
}
4014
// Tests that setting the start and stop flags across a reboot works as
// expected.
//
// pi2 reboots at t = 5 s (boot 1 starts at 5 s on pi2's clock) and is logged
// on both boots.  The pi1 <-> pi2 link is never disconnected in this test.  The
// log is then replayed with start/stop times of 2 s and 8 s, a window that
// spans the reboot.
//
// Timeline (t = simulated time elapsed, summed from the RunFor() calls):
//   * t = 0: all three loggers start.
//   * t = 1.005 s: ping (pi1) and pong (pi2) start.
//   * t = 4.005 s: pi2's first-boot logger goes out of scope.
//   * t = 5 s: pi2 reboots.
//   * t = 6 s: pong and ping start on pi2's new boot.
//   * t = 6.005 s: pi2 starts logging its second boot.
//   * t = 11.005 s: everything is torn down and the logs are read back.
TEST(MultinodeRebootLoggerTest, RebootStartStopTimes) {
  util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
  std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");

  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(ArtifactPath(
          "aos/events/logging/multinode_pingpong_split3_config.json"));
  message_bridge::TestingTimeConverter time_converter(
      configuration::NodesCount(&config.message()));
  SimulatedEventLoopFactory event_loop_factory(&config.message());
  event_loop_factory.SetTimeConverter(&time_converter);
  NodeEventLoopFactory *const pi1 =
      event_loop_factory.GetNodeEventLoopFactory("pi1");
  const size_t pi1_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi1->node());
  NodeEventLoopFactory *const pi2 =
      event_loop_factory.GetNodeEventLoopFactory("pi2");
  const size_t pi2_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi2->node());
  NodeEventLoopFactory *const pi3 =
      event_loop_factory.GetNodeEventLoopFactory("pi3");
  const size_t pi3_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi3->node());

  // One log directory per node, plus a second one for pi2's second boot.
  const std::string kLogfile1_1 =
      aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
  const std::string kLogfile2_1 =
      aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
  const std::string kLogfile2_2 =
      aos::testing::TestTmpDir() + "/logs/multi_logfile2.2/";
  const std::string kLogfile3_1 =
      aos::testing::TestTmpDir() + "/logs/multi_logfile3/";
  {
    // The timestamp vectors below are indexed by node, so pin the node
    // ordering first.
    CHECK_EQ(pi1_index, 0u);
    CHECK_EQ(pi2_index, 1u);
    CHECK_EQ(pi3_index, 2u);

    time_converter.AddNextTimestamp(
        distributed_clock::epoch(),
        {BootTimestamp::epoch(), BootTimestamp::epoch(),
         BootTimestamp::epoch()});
    // At distributed time 5 s, pi2 (index 1) moves to boot 1, whose clock
    // starts at 5 s.  pi1 and pi3 keep running on boot 0.
    const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
    time_converter.AddNextTimestamp(
        distributed_clock::epoch() + reboot_time,
        {BootTimestamp::epoch() + reboot_time,
         BootTimestamp{.boot = 1,
                       .time = monotonic_clock::epoch() + reboot_time},
         BootTimestamp::epoch() + reboot_time});
  }

  std::vector<std::string> filenames;
  {
    LoggerState pi1_logger = MakeLoggerState(
        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
        FileStrategy::kKeepSeparate);
    LoggerState pi3_logger = MakeLoggerState(
        pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0],
        FileStrategy::kKeepSeparate);
    {
      // Create pi2's first-boot logger, then start all three loggers at t = 0.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
          FileStrategy::kKeepSeparate);

      pi1_logger.StartLogger(kLogfile1_1);
      pi3_logger.StartLogger(kLogfile3_1);
      pi2_logger.StartLogger(kLogfile2_1);

      event_loop_factory.RunFor(chrono::milliseconds(1005));

      // Now that we've got a start time in the past, turn on data.
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());

      pi2->AlwaysStart<Pong>("pong");

      event_loop_factory.RunFor(chrono::milliseconds(3000));

      pi2_logger.AppendAllFilenames(&filenames);
    }
    event_loop_factory.RunFor(chrono::milliseconds(995));
    // pi2 now reboots at 5 seconds, which is where the simulation is now.
    {
      event_loop_factory.RunFor(chrono::milliseconds(1000));

      // Start pong and ping on pi2's new boot 5 ms before its second logger
      // starts.  The link was never disconnected, so there is nothing to
      // reconnect here.
      // NOTE(review): pong was already registered via AlwaysStart on pi2's
      // first boot.  If AlwaysStart relaunches its application on every boot,
      // this call adds a second pong instance.  Confirm before removing it,
      // since the expected times below depend on this sequence.
      pi2->AlwaysStart<Pong>("pong");
      std::unique_ptr<aos::EventLoop> ping_event_loop =
          pi1->MakeEventLoop("ping");
      Ping ping(ping_event_loop.get());
      event_loop_factory.RunFor(chrono::milliseconds(5));

      // Start logging again on pi2 after it is up.
      LoggerState pi2_logger = MakeLoggerState(
          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
          FileStrategy::kKeepSeparate);
      pi2_logger.StartLogger(kLogfile2_2);

      event_loop_factory.RunFor(chrono::milliseconds(5000));

      pi2_logger.AppendAllFilenames(&filenames);
    }

    pi1_logger.AppendAllFilenames(&filenames);
    pi3_logger.AppendAllFilenames(&filenames);
  }

  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
  // NOTE(review): result[i] presumably holds (start times, end times) for node
  // i, with one entry per logged boot, which is why pi2 (index 1) has two of
  // each.  pi2's second start time, 6.005 s, equals boot 1's starting clock
  // (5 s) plus the 1.005 s that ran before the second logger started.
  auto result = ConfirmReadable(filenames);

  EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
  EXPECT_THAT(result[0].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(11000300)));

  EXPECT_THAT(result[1].first,
              ::testing::ElementsAre(
                  realtime_clock::epoch(),
                  realtime_clock::epoch() + chrono::microseconds(6005000)));
  EXPECT_THAT(result[1].second,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::microseconds(4900100),
                  realtime_clock::epoch() + chrono::microseconds(11000150)));

  EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
  EXPECT_THAT(result[2].second,
              ::testing::ElementsAre(realtime_clock::epoch() +
                                     chrono::microseconds(11000100)));

  // Confirm we observed the correct start and stop times.  We should see the
  // reboot here: pi2 reports two entries inside the [2 s, 8 s] window, the
  // second starting at 6.005 s, while pi1 and pi3 report just the window.
  auto start_stop_result = ConfirmReadable(
      filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
      realtime_clock::epoch() + chrono::milliseconds(8000));

  EXPECT_THAT(
      start_stop_result[0].first,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
  EXPECT_THAT(
      start_stop_result[0].second,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
  EXPECT_THAT(start_stop_result[1].first,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::seconds(2),
                  realtime_clock::epoch() + chrono::microseconds(6005000)));
  EXPECT_THAT(start_stop_result[1].second,
              ::testing::ElementsAre(
                  realtime_clock::epoch() + chrono::microseconds(4900100),
                  realtime_clock::epoch() + chrono::seconds(8)));
  EXPECT_THAT(
      start_stop_result[2].first,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
  EXPECT_THAT(
      start_stop_result[2].second,
      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
}
4176
// Tests that we properly handle one direction being down.
//
// pi2 starts out disconnected from pi1, so only one direction of the link is
// up, and pi1 reboots at t = 5 s (boot 1 starts at 0 on pi1's clock).  pi2 logs
// from t = 0.095 s.  At t = 6.095 s the direction is reconnected and pi1 starts
// logging its second boot.  The resulting logs must sort and replay cleanly.
// (t = simulated time elapsed, summed from the RunFor() calls.)
TEST(MissingDirectionTest, OneDirection) {
  util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
  std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");

  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(ArtifactPath(
          "aos/events/logging/multinode_pingpong_split4_config.json"));
  message_bridge::TestingTimeConverter time_converter(
      configuration::NodesCount(&config.message()));
  SimulatedEventLoopFactory event_loop_factory(&config.message());
  event_loop_factory.SetTimeConverter(&time_converter);

  NodeEventLoopFactory *const pi1 =
      event_loop_factory.GetNodeEventLoopFactory("pi1");
  const size_t pi1_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi1->node());
  NodeEventLoopFactory *const pi2 =
      event_loop_factory.GetNodeEventLoopFactory("pi2");
  const size_t pi2_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi2->node());
  std::vector<std::string> filenames;

  {
    // The timestamp vectors below are indexed by node, so pin the node
    // ordering first.
    CHECK_EQ(pi1_index, 0u);
    CHECK_EQ(pi2_index, 1u);

    time_converter.AddNextTimestamp(
        distributed_clock::epoch(),
        {BootTimestamp::epoch(), BootTimestamp::epoch()});

    // At distributed time 5 s, pi1 (index 0) moves to boot 1, whose clock
    // starts back at 0.  pi2 keeps running on boot 0.
    const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
    time_converter.AddNextTimestamp(
        distributed_clock::epoch() + reboot_time,
        {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
         BootTimestamp::epoch() + reboot_time});
  }

  const std::string kLogfile2_1 =
      aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
  const std::string kLogfile1_1 =
      aos::testing::TestTmpDir() + "/logs/multi_logfile1.1/";

  // Take down one direction of the link: pi2 does not connect to pi1.
  pi2->Disconnect(pi1->node());

  pi1->AlwaysStart<Ping>("ping");
  pi2->AlwaysStart<Pong>("pong");

  {
    LoggerState pi2_logger = MakeLoggerState(
        pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
        FileStrategy::kKeepSeparate);

    event_loop_factory.RunFor(chrono::milliseconds(95));

    pi2_logger.StartLogger(kLogfile2_1);

    // pi1 reboots at t = 5 s, during this run.
    event_loop_factory.RunFor(chrono::milliseconds(6000));

    // Restore the missing direction, then log pi1's second boot.
    pi2->Connect(pi1->node());

    LoggerState pi1_logger = MakeLoggerState(
        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
        FileStrategy::kKeepSeparate);
    pi1_logger.StartLogger(kLogfile1_1);

    event_loop_factory.RunFor(chrono::milliseconds(5000));
    pi1_logger.AppendAllFilenames(&filenames);
    pi2_logger.AppendAllFilenames(&filenames);
  }

  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
  ConfirmReadable(filenames);
}
4252
// Tests that we properly handle only one direction ever existing after a
// reboot.
//
// Only pi2 is logged, from t = 0.095 s to t = 10.095 s.  pi2 is disconnected
// from pi1 at t = 4.095 s, and pi1 reboots at t = 5 s (boot 1 starts at 0 on
// pi1's clock).  pi1's second boot therefore only ever has the pi1 -> pi2
// direction.  (t = simulated time elapsed, summed from the RunFor() calls.)
TEST(MissingDirectionTest, OneDirectionAfterReboot) {
  util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
  std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");

  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(ArtifactPath(
          "aos/events/logging/multinode_pingpong_split4_config.json"));
  message_bridge::TestingTimeConverter time_converter(
      configuration::NodesCount(&config.message()));
  SimulatedEventLoopFactory event_loop_factory(&config.message());
  event_loop_factory.SetTimeConverter(&time_converter);

  NodeEventLoopFactory *const pi1 =
      event_loop_factory.GetNodeEventLoopFactory("pi1");
  const size_t pi1_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi1->node());
  NodeEventLoopFactory *const pi2 =
      event_loop_factory.GetNodeEventLoopFactory("pi2");
  const size_t pi2_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi2->node());
  std::vector<std::string> filenames;

  {
    // The timestamp vectors below are indexed by node, so pin the node
    // ordering first.
    CHECK_EQ(pi1_index, 0u);
    CHECK_EQ(pi2_index, 1u);

    time_converter.AddNextTimestamp(
        distributed_clock::epoch(),
        {BootTimestamp::epoch(), BootTimestamp::epoch()});

    // At distributed time 5 s, pi1 (index 0) moves to boot 1, whose clock
    // starts back at 0.  pi2 keeps running on boot 0.
    const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
    time_converter.AddNextTimestamp(
        distributed_clock::epoch() + reboot_time,
        {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
         BootTimestamp::epoch() + reboot_time});
  }

  const std::string kLogfile2_1 =
      aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";

  pi1->AlwaysStart<Ping>("ping");

  // Pi1 sends to pi2.  Reboot pi1, but don't let pi2 connect to pi1.  This
  // makes it such that we will only get timestamps from pi1 -> pi2 on the
  // second boot.
  {
    LoggerState pi2_logger = MakeLoggerState(
        pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
        FileStrategy::kKeepSeparate);

    event_loop_factory.RunFor(chrono::milliseconds(95));

    pi2_logger.StartLogger(kLogfile2_1);

    event_loop_factory.RunFor(chrono::milliseconds(4000));

    // Cut the pi2 -> pi1 direction before pi1's reboot at t = 5 s.
    pi2->Disconnect(pi1->node());

    event_loop_factory.RunFor(chrono::milliseconds(1000));
    // NOTE(review): ping was already registered via AlwaysStart above, and
    // pi1 rebooted during the preceding run.  If AlwaysStart relaunches its
    // application on every boot, this adds a second ping instance on pi1's
    // new boot.  Confirm that this is intended.
    pi1->AlwaysStart<Ping>("ping");

    event_loop_factory.RunFor(chrono::milliseconds(5000));
    pi2_logger.AppendAllFilenames(&filenames);
  }

  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
  ConfirmReadable(filenames);
}
4324
// Tests that we properly handle only one direction ever existing after a
// reboot with only reliable data.
//
// Same scenario as MissingDirectionTest.OneDirectionAfterReboot, but using
// multinode_pingpong_split4_reliable_config.json.  Only pi2 is logged, from
// t = 0.095 s to t = 10.095 s.  pi2 is disconnected from pi1 at t = 4.095 s,
// and pi1 reboots at t = 5 s (boot 1 starts at 0 on pi1's clock).  (t =
// simulated time elapsed, summed from the RunFor() calls.)
TEST(MissingDirectionTest, OneDirectionAfterRebootReliable) {
  util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
  std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");

  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(
          ArtifactPath("aos/events/logging/"
                       "multinode_pingpong_split4_reliable_config.json"));
  message_bridge::TestingTimeConverter time_converter(
      configuration::NodesCount(&config.message()));
  SimulatedEventLoopFactory event_loop_factory(&config.message());
  event_loop_factory.SetTimeConverter(&time_converter);

  NodeEventLoopFactory *const pi1 =
      event_loop_factory.GetNodeEventLoopFactory("pi1");
  const size_t pi1_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi1->node());
  NodeEventLoopFactory *const pi2 =
      event_loop_factory.GetNodeEventLoopFactory("pi2");
  const size_t pi2_index = configuration::GetNodeIndex(
      event_loop_factory.configuration(), pi2->node());
  std::vector<std::string> filenames;

  {
    // The timestamp vectors below are indexed by node, so pin the node
    // ordering first.
    CHECK_EQ(pi1_index, 0u);
    CHECK_EQ(pi2_index, 1u);

    time_converter.AddNextTimestamp(
        distributed_clock::epoch(),
        {BootTimestamp::epoch(), BootTimestamp::epoch()});

    // At distributed time 5 s, pi1 (index 0) moves to boot 1, whose clock
    // starts back at 0.  pi2 keeps running on boot 0.
    const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
    time_converter.AddNextTimestamp(
        distributed_clock::epoch() + reboot_time,
        {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
         BootTimestamp::epoch() + reboot_time});
  }

  const std::string kLogfile2_1 =
      aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";

  pi1->AlwaysStart<Ping>("ping");

  // Pi1 sends to pi2.  Reboot pi1, but don't let pi2 connect to pi1.  This
  // makes it such that we will only get timestamps from pi1 -> pi2 on the
  // second boot.
  {
    LoggerState pi2_logger = MakeLoggerState(
        pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
        FileStrategy::kKeepSeparate);

    event_loop_factory.RunFor(chrono::milliseconds(95));

    pi2_logger.StartLogger(kLogfile2_1);

    event_loop_factory.RunFor(chrono::milliseconds(4000));

    // Cut the pi2 -> pi1 direction before pi1's reboot at t = 5 s.
    pi2->Disconnect(pi1->node());

    event_loop_factory.RunFor(chrono::milliseconds(1000));
    // NOTE(review): ping was already registered via AlwaysStart above, and
    // pi1 rebooted during the preceding run.  If AlwaysStart relaunches its
    // application on every boot, this adds a second ping instance on pi1's
    // new boot.  Confirm that this is intended.
    pi1->AlwaysStart<Ping>("ping");

    event_loop_factory.RunFor(chrono::milliseconds(5000));
    pi2_logger.AppendAllFilenames(&filenames);
  }

  const std::vector<LogFile> sorted_parts = SortParts(filenames);
  EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
  ConfirmReadable(filenames);
}
4397
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004398// Tests that we properly handle only one direction ever existing after a
4399// reboot with mixed unreliable vs reliable, where reliable has an earlier
4400// timestamp than unreliable.
Brian Smartte67d7112023-03-20 12:06:30 -07004401TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase1) {
Austin Schuh7e417682023-08-11 17:05:30 -07004402 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4403 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4404
Brian Smartte67d7112023-03-20 12:06:30 -07004405 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4406 aos::configuration::ReadConfig(ArtifactPath(
4407 "aos/events/logging/multinode_pingpong_split4_mixed1_config.json"));
4408 message_bridge::TestingTimeConverter time_converter(
4409 configuration::NodesCount(&config.message()));
4410 SimulatedEventLoopFactory event_loop_factory(&config.message());
4411 event_loop_factory.SetTimeConverter(&time_converter);
4412
4413 NodeEventLoopFactory *const pi1 =
4414 event_loop_factory.GetNodeEventLoopFactory("pi1");
4415 const size_t pi1_index = configuration::GetNodeIndex(
4416 event_loop_factory.configuration(), pi1->node());
4417 NodeEventLoopFactory *const pi2 =
4418 event_loop_factory.GetNodeEventLoopFactory("pi2");
4419 const size_t pi2_index = configuration::GetNodeIndex(
4420 event_loop_factory.configuration(), pi2->node());
4421 std::vector<std::string> filenames;
4422
4423 {
4424 CHECK_EQ(pi1_index, 0u);
4425 CHECK_EQ(pi2_index, 1u);
4426
4427 time_converter.AddNextTimestamp(
4428 distributed_clock::epoch(),
4429 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4430
4431 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
4432 time_converter.AddNextTimestamp(
4433 distributed_clock::epoch() + reboot_time,
4434 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
4435 BootTimestamp::epoch() + reboot_time});
4436 }
4437
4438 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004439 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Brian Smartte67d7112023-03-20 12:06:30 -07004440
4441 // The following sequence using the above reference config creates
4442 // a reliable message timestamp < unreliable message timestamp.
4443 {
4444 pi1->DisableStatistics();
4445 pi2->DisableStatistics();
4446
4447 event_loop_factory.RunFor(chrono::milliseconds(95));
4448
4449 pi1->AlwaysStart<Ping>("ping");
4450
4451 event_loop_factory.RunFor(chrono::milliseconds(5250));
4452
4453 pi1->EnableStatistics();
4454
4455 event_loop_factory.RunFor(chrono::milliseconds(1000));
4456
4457 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004458 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4459 FileStrategy::kKeepSeparate);
Brian Smartte67d7112023-03-20 12:06:30 -07004460
4461 pi2_logger.StartLogger(kLogfile2_1);
4462
4463 event_loop_factory.RunFor(chrono::milliseconds(5000));
4464 pi2_logger.AppendAllFilenames(&filenames);
4465 }
4466
4467 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004468 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Brian Smartte67d7112023-03-20 12:06:30 -07004469 ConfirmReadable(filenames);
4470}
4471
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004472// Tests that we properly handle only one direction ever existing after a
4473// reboot with mixed unreliable vs reliable, where unreliable has an earlier
4474// timestamp than reliable.
Brian Smartte67d7112023-03-20 12:06:30 -07004475TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase2) {
Austin Schuh7e417682023-08-11 17:05:30 -07004476 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4477 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4478
Brian Smartte67d7112023-03-20 12:06:30 -07004479 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4480 aos::configuration::ReadConfig(ArtifactPath(
4481 "aos/events/logging/multinode_pingpong_split4_mixed2_config.json"));
4482 message_bridge::TestingTimeConverter time_converter(
4483 configuration::NodesCount(&config.message()));
4484 SimulatedEventLoopFactory event_loop_factory(&config.message());
4485 event_loop_factory.SetTimeConverter(&time_converter);
4486
4487 NodeEventLoopFactory *const pi1 =
4488 event_loop_factory.GetNodeEventLoopFactory("pi1");
4489 const size_t pi1_index = configuration::GetNodeIndex(
4490 event_loop_factory.configuration(), pi1->node());
4491 NodeEventLoopFactory *const pi2 =
4492 event_loop_factory.GetNodeEventLoopFactory("pi2");
4493 const size_t pi2_index = configuration::GetNodeIndex(
4494 event_loop_factory.configuration(), pi2->node());
4495 std::vector<std::string> filenames;
4496
4497 {
4498 CHECK_EQ(pi1_index, 0u);
4499 CHECK_EQ(pi2_index, 1u);
4500
4501 time_converter.AddNextTimestamp(
4502 distributed_clock::epoch(),
4503 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4504
4505 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
4506 time_converter.AddNextTimestamp(
4507 distributed_clock::epoch() + reboot_time,
4508 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
4509 BootTimestamp::epoch() + reboot_time});
4510 }
4511
4512 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004513 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Brian Smartte67d7112023-03-20 12:06:30 -07004514
4515 // The following sequence using the above reference config creates
4516 // an unreliable message timestamp < reliable message timestamp.
4517 {
4518 pi1->DisableStatistics();
4519 pi2->DisableStatistics();
4520
4521 event_loop_factory.RunFor(chrono::milliseconds(95));
4522
4523 pi1->AlwaysStart<Ping>("ping");
4524
4525 event_loop_factory.RunFor(chrono::milliseconds(5250));
4526
4527 pi1->EnableStatistics();
4528
4529 event_loop_factory.RunFor(chrono::milliseconds(1000));
4530
4531 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004532 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4533 FileStrategy::kKeepSeparate);
Brian Smartte67d7112023-03-20 12:06:30 -07004534
4535 pi2_logger.StartLogger(kLogfile2_1);
4536
4537 event_loop_factory.RunFor(chrono::milliseconds(5000));
4538 pi2_logger.AppendAllFilenames(&filenames);
4539 }
4540
4541 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004542 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Brian Smartte67d7112023-03-20 12:06:30 -07004543 ConfirmReadable(filenames);
4544}
4545
Naman Guptaa63aa132023-03-22 20:06:34 -07004546// Tests that we properly handle what used to be a time violation in one
4547// direction. This can occur when one direction goes down after sending some
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004548// data, but the other keeps working. The down direction ends up resolving to
4549// a straight line in the noncausal filter, where the direction which is still
4550// up can cross that line. Really, time progressed along just fine but we
4551// assumed that the offset was a line when it could have deviated by up to
4552// 1ms/second.
Naman Guptaa63aa132023-03-22 20:06:34 -07004553TEST_P(MultinodeLoggerTest, OneDirectionTimeDrift) {
4554 std::vector<std::string> filenames;
4555
4556 CHECK_EQ(pi1_index_, 0u);
4557 CHECK_EQ(pi2_index_, 1u);
4558
4559 time_converter_.AddNextTimestamp(
4560 distributed_clock::epoch(),
4561 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4562
4563 const chrono::nanoseconds before_disconnect_duration =
4564 time_converter_.AddMonotonic(
4565 {chrono::milliseconds(1000), chrono::milliseconds(1000)});
4566
4567 const chrono::nanoseconds test_duration =
4568 time_converter_.AddMonotonic(
4569 {chrono::milliseconds(1000), chrono::milliseconds(1000)}) +
4570 time_converter_.AddMonotonic(
4571 {chrono::milliseconds(10000),
4572 chrono::milliseconds(10000) - chrono::milliseconds(5)}) +
4573 time_converter_.AddMonotonic(
4574 {chrono::milliseconds(10000),
4575 chrono::milliseconds(10000) + chrono::milliseconds(5)});
4576
4577 const std::string kLogfile =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004578 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004579
4580 {
4581 LoggerState pi2_logger = MakeLogger(pi2_);
4582 pi2_logger.StartLogger(kLogfile);
4583 event_loop_factory_.RunFor(before_disconnect_duration);
4584
4585 pi2_->Disconnect(pi1_->node());
4586
4587 event_loop_factory_.RunFor(test_duration);
4588 pi2_->Connect(pi1_->node());
4589
4590 event_loop_factory_.RunFor(chrono::milliseconds(5000));
4591 pi2_logger.AppendAllFilenames(&filenames);
4592 }
4593
4594 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004595 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004596 ConfirmReadable(filenames);
4597}
4598
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004599// Tests that we can replay a logfile that has timestamps such that at least
4600// one node's epoch is at a positive distributed_clock (and thus will have to
4601// be booted after the other node(s)).
Naman Guptaa63aa132023-03-22 20:06:34 -07004602TEST_P(MultinodeLoggerTest, StartOneNodeBeforeOther) {
4603 std::vector<std::string> filenames;
4604
4605 CHECK_EQ(pi1_index_, 0u);
4606 CHECK_EQ(pi2_index_, 1u);
4607
4608 time_converter_.AddNextTimestamp(
4609 distributed_clock::epoch(),
4610 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4611
4612 const chrono::nanoseconds before_reboot_duration = chrono::milliseconds(1000);
4613 time_converter_.RebootAt(
4614 0, distributed_clock::time_point(before_reboot_duration));
4615
4616 const chrono::nanoseconds test_duration = time_converter_.AddMonotonic(
4617 {chrono::milliseconds(10000), chrono::milliseconds(10000)});
4618
4619 const std::string kLogfile =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004620 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004621
4622 pi2_->Disconnect(pi1_->node());
4623 pi1_->Disconnect(pi2_->node());
4624
4625 {
4626 LoggerState pi2_logger = MakeLogger(pi2_);
4627
4628 pi2_logger.StartLogger(kLogfile);
4629 event_loop_factory_.RunFor(before_reboot_duration);
4630
4631 pi2_->Connect(pi1_->node());
4632 pi1_->Connect(pi2_->node());
4633
4634 event_loop_factory_.RunFor(test_duration);
4635
4636 pi2_logger.AppendAllFilenames(&filenames);
4637 }
4638
4639 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004640 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004641 ConfirmReadable(filenames);
4642
4643 {
4644 LogReader reader(sorted_parts);
4645 SimulatedEventLoopFactory replay_factory(reader.configuration());
4646 reader.RegisterWithoutStarting(&replay_factory);
4647
4648 NodeEventLoopFactory *const replay_node =
4649 reader.event_loop_factory()->GetNodeEventLoopFactory("pi1");
4650
4651 std::unique_ptr<EventLoop> test_event_loop =
4652 replay_node->MakeEventLoop("test_reader");
4653 replay_node->OnStartup([replay_node]() {
4654 // Check that we didn't boot until at least t=0.
4655 CHECK_LE(monotonic_clock::epoch(), replay_node->monotonic_now());
4656 });
4657 test_event_loop->OnRun([&test_event_loop]() {
4658 // Check that we didn't boot until at least t=0.
4659 EXPECT_LE(monotonic_clock::epoch(), test_event_loop->monotonic_now());
4660 });
4661 reader.event_loop_factory()->Run();
4662 reader.Deregister();
4663 }
4664}
4665
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004666// Tests that when we have a loop without all the logs at all points in time,
4667// we can sort it properly.
Naman Guptaa63aa132023-03-22 20:06:34 -07004668TEST(MultinodeLoggerLoopTest, Loop) {
Austin Schuh7e417682023-08-11 17:05:30 -07004669 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4670 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4671
Naman Guptaa63aa132023-03-22 20:06:34 -07004672 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004673 aos::configuration::ReadConfig(
4674 ArtifactPath("aos/events/logging/"
4675 "multinode_pingpong_triangle_split_config.json"));
Naman Guptaa63aa132023-03-22 20:06:34 -07004676 message_bridge::TestingTimeConverter time_converter(
4677 configuration::NodesCount(&config.message()));
4678 SimulatedEventLoopFactory event_loop_factory(&config.message());
4679 event_loop_factory.SetTimeConverter(&time_converter);
4680
4681 NodeEventLoopFactory *const pi1 =
4682 event_loop_factory.GetNodeEventLoopFactory("pi1");
4683 NodeEventLoopFactory *const pi2 =
4684 event_loop_factory.GetNodeEventLoopFactory("pi2");
4685 NodeEventLoopFactory *const pi3 =
4686 event_loop_factory.GetNodeEventLoopFactory("pi3");
4687
4688 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004689 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004690 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004691 aos::testing::TestTmpDir() + "/logs/multi_logfile2/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004692 const std::string kLogfile3_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004693 aos::testing::TestTmpDir() + "/logs/multi_logfile3/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004694
4695 {
4696 // Make pi1 boot before everything else.
4697 time_converter.AddNextTimestamp(
4698 distributed_clock::epoch(),
4699 {BootTimestamp::epoch(),
4700 BootTimestamp::epoch() - chrono::milliseconds(100),
4701 BootTimestamp::epoch() - chrono::milliseconds(300)});
4702 }
4703
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004704 // We want to setup a situation such that 2 of the 3 legs of the loop are
4705 // very confident about time being X, and the third leg is pulling the
4706 // average off to one side.
Naman Guptaa63aa132023-03-22 20:06:34 -07004707 //
4708 // It's easiest to visualize this in timestamp_plotter.
4709
4710 std::vector<std::string> filenames;
4711 {
4712 // Have pi1 send out a reliable message at startup. This sets up a long
4713 // forwarding time message at the start to bias time.
4714 std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
4715 {
4716 aos::Sender<examples::Ping> ping_sender =
4717 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
4718
4719 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
4720 examples::Ping::Builder ping_builder =
4721 builder.MakeBuilder<examples::Ping>();
4722 CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
4723 }
4724
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004725 // Wait a while so there's enough data to let the worst case be rather
4726 // off.
Naman Guptaa63aa132023-03-22 20:06:34 -07004727 event_loop_factory.RunFor(chrono::seconds(1000));
4728
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004729 // Now start a receiving node first. This sets up 2 tight bounds between
4730 // 2 of the nodes.
Naman Guptaa63aa132023-03-22 20:06:34 -07004731 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004732 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4733 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07004734 pi2_logger.StartLogger(kLogfile2_1);
4735
4736 event_loop_factory.RunFor(chrono::seconds(100));
4737
4738 // And now start the third leg.
4739 LoggerState pi3_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004740 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4741 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07004742 pi3_logger.StartLogger(kLogfile3_1);
4743
4744 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004745 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4746 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07004747 pi1_logger.StartLogger(kLogfile1_1);
4748
4749 event_loop_factory.RunFor(chrono::seconds(100));
4750
4751 pi1_logger.AppendAllFilenames(&filenames);
4752 pi2_logger.AppendAllFilenames(&filenames);
4753 pi3_logger.AppendAllFilenames(&filenames);
4754 }
4755
4756 // Make sure we can read this.
4757 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004758 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004759 auto result = ConfirmReadable(filenames);
4760}
4761
Austin Schuh08dba8f2023-05-01 08:29:30 -07004762// Tests that RestartLogging works in the simple case. Unfortunately, the
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004763// failure cases involve simulating time elapsing in callbacks, which is
4764// really hard. The best we can reasonably do is make sure 2 back to back
4765// logs are parseable together.
Austin Schuh08dba8f2023-05-01 08:29:30 -07004766TEST_P(MultinodeLoggerTest, RestartLogging) {
4767 time_converter_.AddMonotonic(
4768 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
4769 std::vector<std::string> filenames;
4770 {
4771 LoggerState pi1_logger = MakeLogger(pi1_);
4772
4773 event_loop_factory_.RunFor(chrono::milliseconds(95));
4774
4775 StartLogger(&pi1_logger, logfile_base1_);
4776 aos::monotonic_clock::time_point last_rotation_time =
4777 pi1_logger.event_loop->monotonic_now();
Austin Schuh2f864452023-07-17 14:53:08 -07004778 pi1_logger.logger->set_on_logged_period(
4779 [&](aos::monotonic_clock::time_point) {
4780 const auto now = pi1_logger.event_loop->monotonic_now();
4781 if (now > last_rotation_time + std::chrono::seconds(5)) {
4782 pi1_logger.AppendAllFilenames(&filenames);
4783 std::unique_ptr<MultiNodeFilesLogNamer> namer =
4784 pi1_logger.MakeLogNamer(logfile_base2_);
4785 pi1_logger.log_namer = namer.get();
Austin Schuh08dba8f2023-05-01 08:29:30 -07004786
Austin Schuh2f864452023-07-17 14:53:08 -07004787 pi1_logger.logger->RestartLogging(std::move(namer));
4788 last_rotation_time = now;
4789 }
4790 });
Austin Schuh08dba8f2023-05-01 08:29:30 -07004791
4792 event_loop_factory_.RunFor(chrono::milliseconds(7000));
4793
4794 pi1_logger.AppendAllFilenames(&filenames);
4795 }
4796
4797 for (const auto &x : filenames) {
4798 LOG(INFO) << x;
4799 }
4800
4801 EXPECT_GE(filenames.size(), 2u);
4802
4803 ConfirmReadable(filenames);
4804
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004805 // TODO(austin): It would be good to confirm that any one time messages end
4806 // up in both logs correctly.
Austin Schuh08dba8f2023-05-01 08:29:30 -07004807}
4808
Austin Schuh6e93fc22023-08-22 21:27:22 -07004809// Tests that we call OnEnd without --skip_missing_forwarding_entries.
4810TEST_P(MultinodeLoggerTest, SkipMissingForwardingEntries) {
4811 if (file_strategy() == FileStrategy::kCombine) {
4812 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
4813 }
4814 time_converter_.AddMonotonic(
4815 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
4816
4817 std::vector<std::string> filenames;
4818 {
4819 LoggerState pi1_logger = MakeLogger(pi1_);
4820
4821 event_loop_factory_.RunFor(chrono::milliseconds(95));
4822
4823 StartLogger(&pi1_logger);
4824 aos::monotonic_clock::time_point last_rotation_time =
4825 pi1_logger.event_loop->monotonic_now();
4826 pi1_logger.logger->set_on_logged_period(
4827 [&](aos::monotonic_clock::time_point) {
4828 const auto now = pi1_logger.event_loop->monotonic_now();
4829 if (now > last_rotation_time + std::chrono::seconds(5)) {
4830 pi1_logger.logger->Rotate();
4831 last_rotation_time = now;
4832 }
4833 });
4834
4835 event_loop_factory_.RunFor(chrono::milliseconds(15000));
4836 pi1_logger.AppendAllFilenames(&filenames);
4837 }
4838
4839 // If we remove the last remote data part, we'll trigger missing data for
4840 // timestamps.
4841 filenames.erase(std::remove_if(filenames.begin(), filenames.end(),
4842 [](const std::string &s) {
4843 return s.find("data/pi2_data.part3.bfbs") !=
4844 std::string::npos;
4845 }),
4846 filenames.end());
4847
4848 auto result = ConfirmReadable(filenames);
4849}
4850
Austin Schuh54ffea42023-08-23 13:27:04 -07004851// Tests that we call OnEnd without --skip_missing_forwarding_entries.
4852TEST(MultinodeLoggerConfigTest, SingleNode) {
4853 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4854 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4855
4856 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4857 aos::configuration::ReadConfig(
4858 ArtifactPath("aos/events/logging/multinode_single_node_config.json"));
4859 message_bridge::TestingTimeConverter time_converter(
4860 configuration::NodesCount(&config.message()));
4861 SimulatedEventLoopFactory event_loop_factory(&config.message());
4862 event_loop_factory.SetTimeConverter(&time_converter);
4863
4864 time_converter.StartEqual();
4865
4866 const std::string kLogfile1_1 =
4867 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
4868
4869 NodeEventLoopFactory *const pi1 =
4870 event_loop_factory.GetNodeEventLoopFactory("pi1");
4871
4872 std::vector<std::string> filenames;
4873
4874 {
4875 // Now start a receiving node first. This sets up 2 tight bounds between
4876 // 2 of the nodes.
4877 LoggerState pi1_logger = MakeLoggerState(
4878 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4879 FileStrategy::kKeepSeparate);
4880 pi1_logger.StartLogger(kLogfile1_1);
4881
4882 event_loop_factory.RunFor(chrono::seconds(10));
4883
4884 pi1_logger.AppendAllFilenames(&filenames);
4885 }
4886
4887 // Make sure we can read this.
4888 const std::vector<LogFile> sorted_parts = SortParts(filenames);
4889 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
4890 auto result = ConfirmReadable(filenames);
4891
4892 // TODO(austin): Probably want to stop caring about ServerStatistics,
4893 // ClientStatistics, and Timestamp since they don't really make sense.
4894}
4895
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004896// Tests that when we have evidence of 2 boots, and then start logging, the
4897// max_out_of_order_duration ends up reasonable on the boot with the start time.
4898TEST(MultinodeLoggerLoopTest, PreviousBootData) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07004899 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4900 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4901
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004902 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4903 aos::configuration::ReadConfig(ArtifactPath(
4904 "aos/events/logging/multinode_pingpong_reboot_ooo_config.json"));
4905 message_bridge::TestingTimeConverter time_converter(
4906 configuration::NodesCount(&config.message()));
4907 SimulatedEventLoopFactory event_loop_factory(&config.message());
4908 event_loop_factory.SetTimeConverter(&time_converter);
4909
4910 const UUID pi1_boot0 = UUID::Random();
4911 const UUID pi2_boot0 = UUID::Random();
4912 const UUID pi2_boot1 = UUID::Random();
4913
4914 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004915 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004916
4917 {
4918 constexpr size_t kPi1Index = 0;
4919 constexpr size_t kPi2Index = 1;
4920 time_converter.set_boot_uuid(kPi1Index, 0, pi1_boot0);
4921 time_converter.set_boot_uuid(kPi2Index, 0, pi2_boot0);
4922 time_converter.set_boot_uuid(kPi2Index, 1, pi2_boot1);
4923
4924 // Make pi1 boot before everything else.
4925 time_converter.AddNextTimestamp(
4926 distributed_clock::epoch(),
4927 {BootTimestamp::epoch(),
4928 BootTimestamp::epoch() - chrono::milliseconds(100)});
4929
4930 const chrono::nanoseconds reboot_time = chrono::seconds(1005);
4931 time_converter.AddNextTimestamp(
4932 distributed_clock::epoch() + reboot_time,
4933 {BootTimestamp::epoch() + reboot_time,
4934 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()}});
4935 }
4936
4937 NodeEventLoopFactory *const pi1 =
4938 event_loop_factory.GetNodeEventLoopFactory("pi1");
4939 NodeEventLoopFactory *const pi2 =
4940 event_loop_factory.GetNodeEventLoopFactory("pi2");
4941
4942 // What we want is for pi2 to send a message at t=1000 on the first channel
4943 // (/atest1 pong), and t=1 on the second channel (/atest3 pong). That'll make
4944 // the max out of order duration be large.
4945 //
4946 // Then, we reboot, and only send messages on a third channel (/atest2 pong).
4947 // The order is key, they need to sort in this order in the config.
4948
4949 std::vector<std::string> filenames;
4950 {
4951 {
4952 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4953 aos::Sender<examples::Pong> pong_sender =
4954 pi2_event_loop->MakeSender<examples::Pong>("/atest3");
4955
4956 pi2_event_loop->OnRun([&]() {
4957 aos::Sender<examples::Pong>::Builder builder =
4958 pong_sender.MakeBuilder();
4959 examples::Pong::Builder pong_builder =
4960 builder.MakeBuilder<examples::Pong>();
4961 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4962 });
4963
4964 event_loop_factory.RunFor(chrono::seconds(1000));
4965 }
4966
4967 {
4968 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4969 aos::Sender<examples::Pong> pong_sender =
4970 pi2_event_loop->MakeSender<examples::Pong>("/atest1");
4971
4972 aos::Sender<examples::Pong>::Builder builder = pong_sender.MakeBuilder();
4973 examples::Pong::Builder pong_builder =
4974 builder.MakeBuilder<examples::Pong>();
4975 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4976 }
4977
4978 event_loop_factory.RunFor(chrono::seconds(10));
4979
4980 // Now start a receiving node first. This sets up 2 tight bounds between
4981 // 2 of the nodes.
4982 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004983 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4984 FileStrategy::kKeepSeparate);
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004985 pi1_logger.StartLogger(kLogfile1_1);
4986
4987 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4988 aos::Sender<examples::Pong> pong_sender =
4989 pi2_event_loop->MakeSender<examples::Pong>("/atest2");
4990
4991 pi2_event_loop->AddPhasedLoop(
4992 [&pong_sender](int) {
4993 aos::Sender<examples::Pong>::Builder builder =
4994 pong_sender.MakeBuilder();
4995 examples::Pong::Builder pong_builder =
4996 builder.MakeBuilder<examples::Pong>();
4997 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4998 },
4999 chrono::milliseconds(10));
5000
5001 event_loop_factory.RunFor(chrono::seconds(100));
5002
5003 pi1_logger.AppendAllFilenames(&filenames);
5004 }
5005
5006 // Make sure we can read this.
5007 const std::vector<LogFile> sorted_parts = SortParts(filenames);
5008 EXPECT_TRUE(AllRebootPartsMatchOutOfOrderDuration(sorted_parts, "pi2"));
5009 auto result = ConfirmReadable(filenames);
5010}
5011
5012// Tests that when we start without a connection, and then start logging, the
5013// max_out_of_order_duration ends up reasonable.
5014TEST(MultinodeLoggerLoopTest, StartDisconnected) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07005015 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
5016 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
5017
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07005018 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
5019 aos::configuration::ReadConfig(ArtifactPath(
5020 "aos/events/logging/multinode_pingpong_reboot_ooo_config.json"));
5021 message_bridge::TestingTimeConverter time_converter(
5022 configuration::NodesCount(&config.message()));
5023 SimulatedEventLoopFactory event_loop_factory(&config.message());
5024 event_loop_factory.SetTimeConverter(&time_converter);
5025
5026 time_converter.StartEqual();
5027
5028 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07005029 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07005030
5031 NodeEventLoopFactory *const pi1 =
5032 event_loop_factory.GetNodeEventLoopFactory("pi1");
5033 NodeEventLoopFactory *const pi2 =
5034 event_loop_factory.GetNodeEventLoopFactory("pi2");
5035
5036 // What we want is for pi2 to send a message at t=1000 on the first channel
5037 // (/atest1 pong), and t=1 on the second channel (/atest3 pong). That'll make
5038 // the max out of order duration be large.
5039 //
5040 // Then, we disconnect, and only send messages on a third channel
5041 // (/atest2 pong). The order is key, they need to sort in this order in the
5042 // config so we observe them in the order which grows the
5043 // max_out_of_order_duration.
5044
5045 std::vector<std::string> filenames;
5046 {
5047 {
5048 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
5049 aos::Sender<examples::Pong> pong_sender =
5050 pi2_event_loop->MakeSender<examples::Pong>("/atest3");
5051
5052 pi2_event_loop->OnRun([&]() {
5053 aos::Sender<examples::Pong>::Builder builder =
5054 pong_sender.MakeBuilder();
5055 examples::Pong::Builder pong_builder =
5056 builder.MakeBuilder<examples::Pong>();
5057 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
5058 });
5059
5060 event_loop_factory.RunFor(chrono::seconds(1000));
5061 }
5062
5063 {
5064 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
5065 aos::Sender<examples::Pong> pong_sender =
5066 pi2_event_loop->MakeSender<examples::Pong>("/atest1");
5067
5068 aos::Sender<examples::Pong>::Builder builder = pong_sender.MakeBuilder();
5069 examples::Pong::Builder pong_builder =
5070 builder.MakeBuilder<examples::Pong>();
5071 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
5072 }
5073
5074 event_loop_factory.RunFor(chrono::seconds(10));
5075
5076 pi1->Disconnect(pi2->node());
5077 pi2->Disconnect(pi1->node());
5078
5079 // Make data flow.
5080 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
5081 aos::Sender<examples::Pong> pong_sender =
5082 pi2_event_loop->MakeSender<examples::Pong>("/atest2");
5083
5084 pi2_event_loop->AddPhasedLoop(
5085 [&pong_sender](int) {
5086 aos::Sender<examples::Pong>::Builder builder =
5087 pong_sender.MakeBuilder();
5088 examples::Pong::Builder pong_builder =
5089 builder.MakeBuilder<examples::Pong>();
5090 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
5091 },
5092 chrono::milliseconds(10));
5093
5094 event_loop_factory.RunFor(chrono::seconds(10));
5095
5096 // Now start a receiving node first. This sets up 2 tight bounds between
5097 // 2 of the nodes.
5098 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07005099 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
5100 FileStrategy::kKeepSeparate);
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07005101 pi1_logger.StartLogger(kLogfile1_1);
5102
5103 event_loop_factory.RunFor(chrono::seconds(10));
5104
5105 // Now, reconnect, and everything should recover.
5106 pi1->Connect(pi2->node());
5107 pi2->Connect(pi1->node());
5108
5109 event_loop_factory.RunFor(chrono::seconds(10));
5110
5111 pi1_logger.AppendAllFilenames(&filenames);
5112 }
5113
5114 // Make sure we can read this.
5115 const std::vector<LogFile> sorted_parts = SortParts(filenames);
5116 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
5117 auto result = ConfirmReadable(filenames);
5118}
5119
Austin Schuh633858f2024-03-22 14:34:19 -07005120// Tests that only having a delayed, reliable message from a boot results in a
5121// readable log.
5122//
5123// Note: this is disabled since it doesn't work yet. Un-disable this when the
5124// code is fixed!
Austin Schuhb5224ec2024-03-27 15:20:09 -07005125TEST(MultinodeLoggerLoopTest, ReliableOnlyTimestamps) {
Austin Schuh633858f2024-03-22 14:34:19 -07005126 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
5127 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
5128
5129 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
5130 aos::configuration::ReadConfig(
5131 ArtifactPath("aos/events/logging/"
5132 "multinode_pingpong_reboot_reliable_only_config.json"));
5133 message_bridge::TestingTimeConverter time_converter(
5134 configuration::NodesCount(&config.message()));
5135 SimulatedEventLoopFactory event_loop_factory(&config.message());
5136 event_loop_factory.SetTimeConverter(&time_converter);
5137
5138 constexpr chrono::nanoseconds kRebootTime = chrono::seconds(100);
5139 {
5140 time_converter.AddNextTimestamp(
5141 distributed_clock::epoch(),
5142 {BootTimestamp::epoch(), BootTimestamp::epoch()});
5143 time_converter.AddNextTimestamp(
5144 distributed_clock::epoch() + kRebootTime,
5145 {BootTimestamp::epoch() + kRebootTime,
5146 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()}});
Austin Schuh1124c512023-08-01 15:20:44 -07005147 }
5148
Austin Schuh633858f2024-03-22 14:34:19 -07005149 const std::string kLogfile1_1 =
5150 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
5151
5152 NodeEventLoopFactory *const pi1 =
5153 event_loop_factory.GetNodeEventLoopFactory("pi1");
5154
5155 // We want unreliable timestamps from one boot, a reliable timestamp from the
5156 // same boot, and then a long delayed reliable timestamp from the second boot.
5157 // This produces conflicting information about when the second boot happened.
5158 std::vector<std::string> filenames;
5159 PingSender *app1 = pi1->AlwaysStart<PingSender>("pingsender", "/atest1");
5160 PingSender *app2 = pi1->AlwaysStart<PingSender>("pingsender", "/atest2");
5161 event_loop_factory.RunFor(chrono::seconds(1));
5162 pi1->Stop(app2);
5163 event_loop_factory.RunFor(kRebootTime - chrono::seconds(2));
5164 pi1->Stop(app1);
5165
5166 event_loop_factory.RunFor(chrono::seconds(1) + kRebootTime * 2);
5167
5168 {
5169 // Collect a small log after reboot.
5170 LoggerState pi1_logger = MakeLoggerState(
5171 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
5172 FileStrategy::kKeepSeparate);
5173 pi1_logger.StartLogger(kLogfile1_1);
5174
5175 event_loop_factory.RunFor(chrono::seconds(1));
5176
5177 pi1_logger.AppendAllFilenames(&filenames);
5178 }
5179
5180 // Make sure we can read this.
5181 const std::vector<LogFile> sorted_parts = SortParts(filenames);
5182 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
5183 auto result = ConfirmReadable(filenames);
5184}
Austin Schuh1124c512023-08-01 15:20:44 -07005185
5186// Tests that we log correctly as nodes connect slowly.
5187TEST(MultinodeLoggerLoopTest, StaggeredConnect) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07005188 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
5189 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
5190
Austin Schuh1124c512023-08-01 15:20:44 -07005191 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
5192 aos::configuration::ReadConfig(ArtifactPath(
5193 "aos/events/logging/multinode_pingpong_pi3_pingpong_config.json"));
5194 message_bridge::TestingTimeConverter time_converter(
5195 configuration::NodesCount(&config.message()));
5196 SimulatedEventLoopFactory event_loop_factory(&config.message());
5197 event_loop_factory.SetTimeConverter(&time_converter);
5198
5199 time_converter.StartEqual();
5200
5201 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07005202 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Austin Schuh1124c512023-08-01 15:20:44 -07005203
5204 NodeEventLoopFactory *const pi1 =
5205 event_loop_factory.GetNodeEventLoopFactory("pi1");
5206 NodeEventLoopFactory *const pi2 =
5207 event_loop_factory.GetNodeEventLoopFactory("pi2");
5208 NodeEventLoopFactory *const pi3 =
5209 event_loop_factory.GetNodeEventLoopFactory("pi3");
5210
5211 // What we want is for pi2 to send a message at t=1000 on the first channel
5212 // (/atest1 pong), and t=1 on the second channel (/atest3 pong). That'll make
5213 // the max out of order duration be large.
5214 //
5215 // Then, we disconnect, and only send messages on a third channel
5216 // (/atest2 pong). The order is key, they need to sort in this order in the
5217 // config so we observe them in the order which grows the
5218 // max_out_of_order_duration.
5219
5220 pi1->Disconnect(pi2->node());
5221 pi2->Disconnect(pi1->node());
5222
5223 pi1->Disconnect(pi3->node());
5224 pi3->Disconnect(pi1->node());
5225
5226 std::vector<std::string> filenames;
5227 pi2->AlwaysStart<PongSender>("pongsender", "/test2");
5228 pi3->AlwaysStart<PongSender>("pongsender", "/test3");
5229
5230 event_loop_factory.RunFor(chrono::seconds(10));
5231
5232 {
5233 // Now start a receiving node first. This sets up 2 tight bounds between
5234 // 2 of the nodes.
5235 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07005236 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
5237 FileStrategy::kKeepSeparate);
Austin Schuh1124c512023-08-01 15:20:44 -07005238 pi1_logger.StartLogger(kLogfile1_1);
5239
5240 event_loop_factory.RunFor(chrono::seconds(10));
5241
5242 // Now, reconnect, and everything should recover.
5243 pi1->Connect(pi2->node());
5244 pi2->Connect(pi1->node());
5245
5246 event_loop_factory.RunFor(chrono::seconds(10));
5247
5248 pi1->Connect(pi3->node());
5249 pi3->Connect(pi1->node());
5250
5251 event_loop_factory.RunFor(chrono::seconds(10));
5252
5253 pi1_logger.AppendAllFilenames(&filenames);
5254 }
5255
5256 // Make sure we can read this.
5257 const std::vector<LogFile> sorted_parts = SortParts(filenames);
5258 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
5259 auto result = ConfirmReadable(filenames);
5260}
5261
Stephan Pleinesf63bde82024-01-13 15:59:33 -08005262} // namespace aos::logger::testing