blob: c252616eed994cf8dc543ed17aaca57e223b6b03 [file] [log] [blame]
Brian Smartte67d7112023-03-20 12:06:30 -07001#include <algorithm>
2
Philipp Schrader790cb542023-07-05 21:06:52 -07003#include "gmock/gmock.h"
4#include "gtest/gtest.h"
5
Naman Guptaa63aa132023-03-22 20:06:34 -07006#include "aos/events/logging/log_reader.h"
7#include "aos/events/logging/multinode_logger_test_lib.h"
8#include "aos/events/message_counter.h"
9#include "aos/events/ping_lib.h"
10#include "aos/events/pong_lib.h"
Austin Schuh3c9f92c2024-04-30 17:56:42 -070011#include "aos/flatbuffers/aligned_allocator.h"
Naman Guptaa63aa132023-03-22 20:06:34 -070012#include "aos/network/remote_message_generated.h"
13#include "aos/network/timestamp_generated.h"
14#include "aos/testing/tmpdir.h"
Naman Guptaa63aa132023-03-22 20:06:34 -070015
Stephan Pleinesf63bde82024-01-13 15:59:33 -080016namespace aos::logger::testing {
Naman Guptaa63aa132023-03-22 20:06:34 -070017
18namespace chrono = std::chrono;
19using aos::message_bridge::RemoteMessage;
20using aos::testing::ArtifactPath;
21using aos::testing::MessageCounter;
22
Naman Guptaa63aa132023-03-22 20:06:34 -070023INSTANTIATE_TEST_SUITE_P(
24 All, MultinodeLoggerTest,
25 ::testing::Combine(
26 ::testing::Values(
27 ConfigParams{"multinode_pingpong_combined_config.json", true,
Austin Schuh6ecfe902023-08-04 22:44:37 -070028 kCombinedConfigSha1(), kCombinedConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070029 FileStrategy::kKeepSeparate,
30 ForceTimestampBuffering::kForceBufferTimestamps},
31 ConfigParams{"multinode_pingpong_combined_config.json", true,
32 kCombinedConfigSha1(), kCombinedConfigSha1(),
33 FileStrategy::kKeepSeparate,
34 ForceTimestampBuffering::kAutoBuffer},
Naman Guptaa63aa132023-03-22 20:06:34 -070035 ConfigParams{"multinode_pingpong_split_config.json", false,
Austin Schuh6ecfe902023-08-04 22:44:37 -070036 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070037 FileStrategy::kKeepSeparate,
38 ForceTimestampBuffering::kForceBufferTimestamps},
Austin Schuh6ecfe902023-08-04 22:44:37 -070039 ConfigParams{"multinode_pingpong_split_config.json", false,
40 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070041 FileStrategy::kKeepSeparate,
42 ForceTimestampBuffering::kAutoBuffer},
43 ConfigParams{"multinode_pingpong_split_config.json", false,
44 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
45 FileStrategy::kCombine,
46 ForceTimestampBuffering::kForceBufferTimestamps},
47 ConfigParams{"multinode_pingpong_split_config.json", false,
48 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
49 FileStrategy::kCombine,
50 ForceTimestampBuffering::kAutoBuffer}),
Naman Guptaa63aa132023-03-22 20:06:34 -070051 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
52
53INSTANTIATE_TEST_SUITE_P(
54 All, MultinodeLoggerDeathTest,
55 ::testing::Combine(
56 ::testing::Values(
57 ConfigParams{"multinode_pingpong_combined_config.json", true,
Austin Schuh6ecfe902023-08-04 22:44:37 -070058 kCombinedConfigSha1(), kCombinedConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070059 FileStrategy::kKeepSeparate,
60 ForceTimestampBuffering::kForceBufferTimestamps},
61 ConfigParams{"multinode_pingpong_combined_config.json", true,
62 kCombinedConfigSha1(), kCombinedConfigSha1(),
63 FileStrategy::kKeepSeparate,
64 ForceTimestampBuffering::kAutoBuffer},
Naman Guptaa63aa132023-03-22 20:06:34 -070065 ConfigParams{"multinode_pingpong_split_config.json", false,
Austin Schuh6ecfe902023-08-04 22:44:37 -070066 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070067 FileStrategy::kKeepSeparate,
68 ForceTimestampBuffering::kForceBufferTimestamps},
Austin Schuh6ecfe902023-08-04 22:44:37 -070069 ConfigParams{"multinode_pingpong_split_config.json", false,
70 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
Austin Schuh63097262023-08-16 17:04:29 -070071 FileStrategy::kKeepSeparate,
72 ForceTimestampBuffering::kAutoBuffer},
73 ConfigParams{"multinode_pingpong_split_config.json", false,
74 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
75 FileStrategy::kCombine,
76 ForceTimestampBuffering::kForceBufferTimestamps},
77 ConfigParams{"multinode_pingpong_split_config.json", false,
78 kSplitConfigSha1(), kReloggedSplitConfigSha1(),
79 FileStrategy::kCombine,
80 ForceTimestampBuffering::kAutoBuffer}),
Naman Guptaa63aa132023-03-22 20:06:34 -070081 ::testing::ValuesIn(SupportedCompressionAlgorithms())));
82
Austin Schuh633858f2024-03-22 14:34:19 -070083// Class to spam Pong messages blindly.
84class PongSender {
85 public:
86 PongSender(EventLoop *loop, std::string_view channel_name)
87 : sender_(loop->MakeSender<examples::Pong>(channel_name)) {
88 loop->AddPhasedLoop(
89 [this](int) {
90 aos::Sender<examples::Pong>::Builder builder = sender_.MakeBuilder();
91 examples::Pong::Builder pong_builder =
92 builder.MakeBuilder<examples::Pong>();
93 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
94 },
95 chrono::milliseconds(10));
96 }
97
98 private:
99 aos::Sender<examples::Pong> sender_;
100};
101
102// Class to spam Ping messages blindly.
103class PingSender {
104 public:
105 PingSender(EventLoop *loop, std::string_view channel_name)
106 : sender_(loop->MakeSender<examples::Ping>(channel_name)) {
107 loop->AddPhasedLoop(
108 [this](int) {
109 aos::Sender<examples::Ping>::Builder builder = sender_.MakeBuilder();
110 examples::Ping::Builder ping_builder =
111 builder.MakeBuilder<examples::Ping>();
112 CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
113 },
114 chrono::milliseconds(10));
115 }
116
117 private:
118 aos::Sender<examples::Ping> sender_;
119};
120
Naman Guptaa63aa132023-03-22 20:06:34 -0700121// Tests that we can write and read simple multi-node log files.
122TEST_P(MultinodeLoggerTest, SimpleMultiNode) {
Austin Schuh6ecfe902023-08-04 22:44:37 -0700123 if (file_strategy() == FileStrategy::kCombine) {
124 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
125 }
126
Naman Guptaa63aa132023-03-22 20:06:34 -0700127 std::vector<std::string> actual_filenames;
128 time_converter_.StartEqual();
129
130 {
131 LoggerState pi1_logger = MakeLogger(pi1_);
132 LoggerState pi2_logger = MakeLogger(pi2_);
133
134 event_loop_factory_.RunFor(chrono::milliseconds(95));
135
136 StartLogger(&pi1_logger);
137 StartLogger(&pi2_logger);
138
139 event_loop_factory_.RunFor(chrono::milliseconds(20000));
140 pi1_logger.AppendAllFilenames(&actual_filenames);
141 pi2_logger.AppendAllFilenames(&actual_filenames);
142 }
143
144 ASSERT_THAT(actual_filenames,
145 ::testing::UnorderedElementsAreArray(logfiles_));
146
147 {
148 std::set<std::string> logfile_uuids;
149 std::set<std::string> parts_uuids;
150 // Confirm that we have the expected number of UUIDs for both the logfile
151 // UUIDs and parts UUIDs.
152 std::vector<SizePrefixedFlatbufferVector<LogFileHeader>> log_header;
153 for (std::string_view f : logfiles_) {
154 log_header.emplace_back(ReadHeader(f).value());
155 if (!log_header.back().message().has_configuration()) {
156 logfile_uuids.insert(
157 log_header.back().message().log_event_uuid()->str());
158 parts_uuids.insert(log_header.back().message().parts_uuid()->str());
159 }
160 }
161
162 EXPECT_EQ(logfile_uuids.size(), 2u);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700163 EXPECT_EQ(parts_uuids.size(), 8u);
Naman Guptaa63aa132023-03-22 20:06:34 -0700164
165 // And confirm everything is on the correct node.
166 EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi1");
167 EXPECT_EQ(log_header[3].message().node()->name()->string_view(), "pi1");
168 EXPECT_EQ(log_header[4].message().node()->name()->string_view(), "pi1");
169
170 EXPECT_EQ(log_header[5].message().node()->name()->string_view(), "pi2");
171 EXPECT_EQ(log_header[6].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700172 EXPECT_EQ(log_header[7].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700173
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700174 EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi1");
175 EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi1");
Naman Guptaa63aa132023-03-22 20:06:34 -0700176
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700177 EXPECT_EQ(log_header[10].message().node()->name()->string_view(), "pi2");
178 EXPECT_EQ(log_header[11].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700179
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700180 EXPECT_EQ(log_header[12].message().node()->name()->string_view(), "pi2");
181 EXPECT_EQ(log_header[13].message().node()->name()->string_view(), "pi2");
182 EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
Naman Guptaa63aa132023-03-22 20:06:34 -0700183
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700184 EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi1");
185 EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi1");
Naman Guptaa63aa132023-03-22 20:06:34 -0700186
187 // And the parts index matches.
188 EXPECT_EQ(log_header[2].message().parts_index(), 0);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700189
190 EXPECT_EQ(log_header[3].message().parts_index(), 0);
191 EXPECT_EQ(log_header[4].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700192
193 EXPECT_EQ(log_header[5].message().parts_index(), 0);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700194
195 EXPECT_EQ(log_header[6].message().parts_index(), 0);
196 EXPECT_EQ(log_header[7].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700197
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700198 EXPECT_EQ(log_header[8].message().parts_index(), 0);
199 EXPECT_EQ(log_header[9].message().parts_index(), 1);
Naman Guptaa63aa132023-03-22 20:06:34 -0700200
201 EXPECT_EQ(log_header[10].message().parts_index(), 0);
202 EXPECT_EQ(log_header[11].message().parts_index(), 1);
203
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700204 EXPECT_EQ(log_header[12].message().parts_index(), 0);
205 EXPECT_EQ(log_header[13].message().parts_index(), 1);
206 EXPECT_EQ(log_header[14].message().parts_index(), 2);
Naman Guptaa63aa132023-03-22 20:06:34 -0700207
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700208 EXPECT_EQ(log_header[15].message().parts_index(), 0);
209 EXPECT_EQ(log_header[16].message().parts_index(), 1);
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700210
211 // And that the data_stored field is right.
212 EXPECT_THAT(*log_header[2].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700213 ::testing::ElementsAre(StoredDataType::DATA));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700214 EXPECT_THAT(*log_header[3].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700215 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700216 EXPECT_THAT(*log_header[4].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700217 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700218
219 EXPECT_THAT(*log_header[5].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700220 ::testing::ElementsAre(StoredDataType::DATA));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700221 EXPECT_THAT(*log_header[6].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700222 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700223 EXPECT_THAT(*log_header[7].message().data_stored(),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700224 ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700225
226 EXPECT_THAT(*log_header[8].message().data_stored(),
227 ::testing::ElementsAre(StoredDataType::DATA));
228 EXPECT_THAT(*log_header[9].message().data_stored(),
229 ::testing::ElementsAre(StoredDataType::DATA));
230
231 EXPECT_THAT(*log_header[10].message().data_stored(),
232 ::testing::ElementsAre(StoredDataType::DATA));
233 EXPECT_THAT(*log_header[11].message().data_stored(),
234 ::testing::ElementsAre(StoredDataType::DATA));
235
236 EXPECT_THAT(*log_header[12].message().data_stored(),
237 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
238 EXPECT_THAT(*log_header[13].message().data_stored(),
239 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
240 EXPECT_THAT(*log_header[14].message().data_stored(),
241 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
242
243 EXPECT_THAT(*log_header[15].message().data_stored(),
244 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
245 EXPECT_THAT(*log_header[16].message().data_stored(),
246 ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
Naman Guptaa63aa132023-03-22 20:06:34 -0700247 }
248
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700249 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
250 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -0700251 {
252 using ::testing::UnorderedElementsAre;
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700253 std::shared_ptr<const aos::Configuration> config = sorted_parts[0].config;
Naman Guptaa63aa132023-03-22 20:06:34 -0700254
255 // Timing reports, pings
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700256 if (shared()) {
257 EXPECT_THAT(
258 CountChannelsData(config, logfiles_[2]),
259 UnorderedElementsAre(
260 std::make_tuple("/pi1/aos", "aos.examples.Ping", 2001),
261 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
262 200),
263 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
264 21),
265 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
266 std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
267 std::make_tuple("/test", "aos.examples.Ping", 2001)))
268 << " : " << logfiles_[2];
269 } else {
270 EXPECT_THAT(
271 CountChannelsData(config, logfiles_[2]),
272 UnorderedElementsAre(
273 std::make_tuple("/pi1/aos", "aos.examples.Ping", 2001),
274 std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
275 200),
276 std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
277 21),
278 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
279 std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
280 std::make_tuple("/test", "aos.examples.Ping", 2001),
281 std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
282 "aos-message_bridge-Timestamp",
283 "aos.message_bridge.RemoteMessage", 200)))
284 << " : " << logfiles_[2];
Naman Guptaa63aa132023-03-22 20:06:34 -0700285 }
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700286
287 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
288 ::testing::UnorderedElementsAre())
289 << " : " << logfiles_[3];
290 EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
291 ::testing::UnorderedElementsAre())
292 << " : " << logfiles_[4];
293
Naman Guptaa63aa132023-03-22 20:06:34 -0700294 // Timestamps for pong
295 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
296 UnorderedElementsAre())
297 << " : " << logfiles_[2];
298 EXPECT_THAT(
299 CountChannelsTimestamp(config, logfiles_[3]),
300 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
301 << " : " << logfiles_[3];
302 EXPECT_THAT(
303 CountChannelsTimestamp(config, logfiles_[4]),
304 UnorderedElementsAre(
305 std::make_tuple("/test", "aos.examples.Pong", 2000),
306 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)))
307 << " : " << logfiles_[4];
308
Naman Guptaa63aa132023-03-22 20:06:34 -0700309 // Timing reports and pongs.
Naman Guptaa63aa132023-03-22 20:06:34 -0700310 EXPECT_THAT(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700311 CountChannelsData(config, logfiles_[5]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700312 UnorderedElementsAre(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700313 std::make_tuple("/pi2/aos", "aos.examples.Ping", 2001),
Naman Guptaa63aa132023-03-22 20:06:34 -0700314 std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
315 200),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700316 std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
317 21),
318 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
Sanjay Narayanan5ec00232022-07-08 15:21:30 -0700319 std::make_tuple("/pi2/aos", "aos.timing.Report", 60),
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700320 std::make_tuple("/test", "aos.examples.Pong", 2001)))
321 << " : " << logfiles_[5];
322 EXPECT_THAT(CountChannelsData(config, logfiles_[6]), UnorderedElementsAre())
323 << " : " << logfiles_[6];
324 EXPECT_THAT(CountChannelsData(config, logfiles_[7]), UnorderedElementsAre())
Naman Guptaa63aa132023-03-22 20:06:34 -0700325 << " : " << logfiles_[7];
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700326 // And ping timestamps.
327 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
328 UnorderedElementsAre())
329 << " : " << logfiles_[5];
Naman Guptaa63aa132023-03-22 20:06:34 -0700330 EXPECT_THAT(
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700331 CountChannelsTimestamp(config, logfiles_[6]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700332 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700333 << " : " << logfiles_[6];
Naman Guptaa63aa132023-03-22 20:06:34 -0700334 EXPECT_THAT(
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700335 CountChannelsTimestamp(config, logfiles_[7]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700336 UnorderedElementsAre(
337 std::make_tuple("/test", "aos.examples.Ping", 2000),
338 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700339 << " : " << logfiles_[7];
Naman Guptaa63aa132023-03-22 20:06:34 -0700340
341 // And then test that the remotely logged timestamp data files only have
342 // timestamps in them.
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700343 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[8]),
344 UnorderedElementsAre())
345 << " : " << logfiles_[8];
346 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[9]),
347 UnorderedElementsAre())
348 << " : " << logfiles_[9];
Naman Guptaa63aa132023-03-22 20:06:34 -0700349 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[10]),
350 UnorderedElementsAre())
351 << " : " << logfiles_[10];
352 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[11]),
353 UnorderedElementsAre())
354 << " : " << logfiles_[11];
Naman Guptaa63aa132023-03-22 20:06:34 -0700355
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700356 EXPECT_THAT(CountChannelsData(config, logfiles_[8]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700357 UnorderedElementsAre(std::make_tuple(
358 "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700359 << " : " << logfiles_[8];
360 EXPECT_THAT(CountChannelsData(config, logfiles_[9]),
Naman Guptaa63aa132023-03-22 20:06:34 -0700361 UnorderedElementsAre(std::make_tuple(
362 "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700363 << " : " << logfiles_[9];
Naman Guptaa63aa132023-03-22 20:06:34 -0700364
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700365 // Pong snd timestamp data.
366 EXPECT_THAT(
367 CountChannelsData(config, logfiles_[10]),
368 UnorderedElementsAre(
369 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 9),
370 std::make_tuple("/test", "aos.examples.Pong", 91)))
371 << " : " << logfiles_[10];
372 EXPECT_THAT(
373 CountChannelsData(config, logfiles_[11]),
374 UnorderedElementsAre(
375 std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 191),
376 std::make_tuple("/test", "aos.examples.Pong", 1910)))
377 << " : " << logfiles_[11];
Naman Guptaa63aa132023-03-22 20:06:34 -0700378
379 // Timestamps from pi2 on pi1, and the other way.
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700380 // if (shared()) {
381 EXPECT_THAT(CountChannelsData(config, logfiles_[12]),
382 UnorderedElementsAre())
383 << " : " << logfiles_[12];
384 EXPECT_THAT(CountChannelsData(config, logfiles_[13]),
385 UnorderedElementsAre())
386 << " : " << logfiles_[13];
387 EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
388 UnorderedElementsAre())
389 << " : " << logfiles_[14];
390 EXPECT_THAT(CountChannelsData(config, logfiles_[15]),
391 UnorderedElementsAre())
392 << " : " << logfiles_[15];
393 EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
394 UnorderedElementsAre())
395 << " : " << logfiles_[16];
Naman Guptaa63aa132023-03-22 20:06:34 -0700396
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700397 EXPECT_THAT(
398 CountChannelsTimestamp(config, logfiles_[12]),
399 UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
400 << " : " << logfiles_[12];
401 EXPECT_THAT(
402 CountChannelsTimestamp(config, logfiles_[13]),
403 UnorderedElementsAre(
404 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 9),
405 std::make_tuple("/test", "aos.examples.Ping", 90)))
406 << " : " << logfiles_[13];
407 EXPECT_THAT(
408 CountChannelsTimestamp(config, logfiles_[14]),
409 UnorderedElementsAre(
410 std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 191),
411 std::make_tuple("/test", "aos.examples.Ping", 1910)))
412 << " : " << logfiles_[14];
413 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[15]),
414 UnorderedElementsAre(std::make_tuple(
415 "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
416 << " : " << logfiles_[15];
417 EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[16]),
418 UnorderedElementsAre(std::make_tuple(
419 "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
420 << " : " << logfiles_[16];
Naman Guptaa63aa132023-03-22 20:06:34 -0700421 }
422
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700423 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -0700424
425 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
426 log_reader_factory.set_send_delay(chrono::microseconds(0));
427
428 // This sends out the fetched messages and advances time to the start of the
429 // log file.
430 reader.Register(&log_reader_factory);
431
432 const Node *pi1 =
433 configuration::GetNode(log_reader_factory.configuration(), "pi1");
434 const Node *pi2 =
435 configuration::GetNode(log_reader_factory.configuration(), "pi2");
436
437 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
438 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
439 LOG(INFO) << "now pi1 "
440 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
441 LOG(INFO) << "now pi2 "
442 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
443
444 EXPECT_THAT(reader.LoggedNodes(),
445 ::testing::ElementsAre(
446 configuration::GetNode(reader.logged_configuration(), pi1),
447 configuration::GetNode(reader.logged_configuration(), pi2)));
448
449 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
450
451 std::unique_ptr<EventLoop> pi1_event_loop =
452 log_reader_factory.MakeEventLoop("test", pi1);
453 std::unique_ptr<EventLoop> pi2_event_loop =
454 log_reader_factory.MakeEventLoop("test", pi2);
455
456 int pi1_ping_count = 10;
457 int pi2_ping_count = 10;
458 int pi1_pong_count = 10;
459 int pi2_pong_count = 10;
460
461 // Confirm that the ping value matches.
462 pi1_event_loop->MakeWatcher(
463 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
464 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping) << " at "
465 << pi1_event_loop->context().monotonic_remote_time << " -> "
466 << pi1_event_loop->context().monotonic_event_time;
467 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
468 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
469 pi1_ping_count * chrono::milliseconds(10) +
470 monotonic_clock::epoch());
471 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
472 pi1_ping_count * chrono::milliseconds(10) +
473 realtime_clock::epoch());
474 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
475 pi1_event_loop->context().monotonic_event_time);
476 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
477 pi1_event_loop->context().realtime_event_time);
Austin Schuhb5224ec2024-03-27 15:20:09 -0700478 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_transmit_time,
479 monotonic_clock::min_time);
Naman Guptaa63aa132023-03-22 20:06:34 -0700480
481 ++pi1_ping_count;
482 });
483 pi2_event_loop->MakeWatcher(
484 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
485 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping) << " at "
486 << pi2_event_loop->context().monotonic_remote_time << " -> "
487 << pi2_event_loop->context().monotonic_event_time;
488 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
489
490 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
491 pi2_ping_count * chrono::milliseconds(10) +
492 monotonic_clock::epoch());
493 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
494 pi2_ping_count * chrono::milliseconds(10) +
495 realtime_clock::epoch());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700496 // The message at the start of each second doesn't have wakeup latency
497 // since timing reports and server statistics wake us up already at that
498 // point in time.
499 chrono::nanoseconds offset = chrono::microseconds(150);
500 if (pi2_event_loop->context().monotonic_remote_time.time_since_epoch() %
501 chrono::seconds(1) ==
502 chrono::seconds(0)) {
503 offset = chrono::microseconds(100);
504 }
505 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time + offset,
Naman Guptaa63aa132023-03-22 20:06:34 -0700506 pi2_event_loop->context().monotonic_event_time);
Austin Schuhb5224ec2024-03-27 15:20:09 -0700507 EXPECT_EQ(pi2_event_loop->context().monotonic_event_time -
508 chrono::microseconds(100),
509 pi2_event_loop->context().monotonic_remote_transmit_time);
Austin Schuhac6d89e2024-03-27 14:56:09 -0700510 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time + offset,
Naman Guptaa63aa132023-03-22 20:06:34 -0700511 pi2_event_loop->context().realtime_event_time);
512 ++pi2_ping_count;
513 });
514
515 constexpr ssize_t kQueueIndexOffset = -9;
516 // Confirm that the ping and pong counts both match, and the value also
517 // matches.
Austin Schuhac6d89e2024-03-27 14:56:09 -0700518 pi1_event_loop->MakeWatcher("/test", [&pi1_event_loop, &pi1_ping_count,
519 &pi1_pong_count](
520 const examples::Pong &pong) {
521 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
522 << pi1_event_loop->context().monotonic_remote_time << " -> "
523 << pi1_event_loop->context().monotonic_event_time;
Naman Guptaa63aa132023-03-22 20:06:34 -0700524
Austin Schuhac6d89e2024-03-27 14:56:09 -0700525 EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
526 pi1_pong_count + kQueueIndexOffset);
Naman Guptaa63aa132023-03-22 20:06:34 -0700527
Austin Schuhac6d89e2024-03-27 14:56:09 -0700528 chrono::nanoseconds offset = chrono::microseconds(200);
529 if ((pi1_event_loop->context().monotonic_remote_time.time_since_epoch() -
530 chrono::microseconds(150)) %
531 chrono::seconds(1) ==
532 chrono::seconds(0)) {
533 offset = chrono::microseconds(150);
534 }
Naman Guptaa63aa132023-03-22 20:06:34 -0700535
Austin Schuhac6d89e2024-03-27 14:56:09 -0700536 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
537 offset + pi1_pong_count * chrono::milliseconds(10) +
538 monotonic_clock::epoch());
539 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
540 offset + pi1_pong_count * chrono::milliseconds(10) +
541 realtime_clock::epoch());
Naman Guptaa63aa132023-03-22 20:06:34 -0700542
Austin Schuhac6d89e2024-03-27 14:56:09 -0700543 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
544 chrono::microseconds(150),
545 pi1_event_loop->context().monotonic_event_time);
546 EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
547 chrono::microseconds(150),
548 pi1_event_loop->context().realtime_event_time);
Austin Schuhb5224ec2024-03-27 15:20:09 -0700549 EXPECT_EQ(pi1_event_loop->context().monotonic_remote_transmit_time,
550 pi1_event_loop->context().monotonic_event_time -
551 chrono::microseconds(100));
Naman Guptaa63aa132023-03-22 20:06:34 -0700552
Austin Schuhac6d89e2024-03-27 14:56:09 -0700553 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
554 ++pi1_pong_count;
555 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
556 });
557 pi2_event_loop->MakeWatcher("/test", [&pi2_event_loop, &pi2_ping_count,
558 &pi2_pong_count](
559 const examples::Pong &pong) {
560 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
561 << pi2_event_loop->context().monotonic_remote_time << " -> "
562 << pi2_event_loop->context().monotonic_event_time;
Naman Guptaa63aa132023-03-22 20:06:34 -0700563
Austin Schuhac6d89e2024-03-27 14:56:09 -0700564 EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
565 pi2_pong_count + kQueueIndexOffset);
Naman Guptaa63aa132023-03-22 20:06:34 -0700566
Austin Schuhac6d89e2024-03-27 14:56:09 -0700567 chrono::nanoseconds offset = chrono::microseconds(200);
568 if ((pi2_event_loop->context().monotonic_remote_time.time_since_epoch() -
569 chrono::microseconds(150)) %
570 chrono::seconds(1) ==
571 chrono::seconds(0)) {
572 offset = chrono::microseconds(150);
573 }
574
575 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
576 offset + pi2_pong_count * chrono::milliseconds(10) +
577 monotonic_clock::epoch());
578 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
579 offset + pi2_pong_count * chrono::milliseconds(10) +
580 realtime_clock::epoch());
581
582 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
583 pi2_event_loop->context().monotonic_event_time);
584 EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
585 pi2_event_loop->context().realtime_event_time);
Austin Schuhb5224ec2024-03-27 15:20:09 -0700586 EXPECT_EQ(pi2_event_loop->context().monotonic_remote_transmit_time,
587 monotonic_clock::min_time);
Austin Schuhac6d89e2024-03-27 14:56:09 -0700588
589 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
590 ++pi2_pong_count;
591 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
592 });
Naman Guptaa63aa132023-03-22 20:06:34 -0700593
594 log_reader_factory.Run();
595 EXPECT_EQ(pi1_ping_count, 2010);
596 EXPECT_EQ(pi2_ping_count, 2010);
597 EXPECT_EQ(pi1_pong_count, 2010);
598 EXPECT_EQ(pi2_pong_count, 2010);
599
600 reader.Deregister();
601}
602
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600603// MultinodeLoggerTest that tests the mutate callback works across multiple
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700604// nodes with remapping.
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600605TEST_P(MultinodeLoggerTest, MultiNodeRemapMutateCallback) {
606 time_converter_.StartEqual();
607 std::vector<std::string> actual_filenames;
608
609 {
610 LoggerState pi1_logger = MakeLogger(pi1_);
611 LoggerState pi2_logger = MakeLogger(pi2_);
612
613 event_loop_factory_.RunFor(chrono::milliseconds(95));
614
615 StartLogger(&pi1_logger);
616 StartLogger(&pi2_logger);
617
618 event_loop_factory_.RunFor(chrono::milliseconds(20000));
619 pi1_logger.AppendAllFilenames(&actual_filenames);
620 pi2_logger.AppendAllFilenames(&actual_filenames);
621 }
622
Austin Schuh8fb4b452023-08-04 17:02:27 -0700623 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700624 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600625
626 LogReader reader(sorted_parts, &config_.message());
627 // Remap just on pi1.
628 reader.RemapLoggedChannel<examples::Pong>(
629 "/test", configuration::GetNode(reader.configuration(), "pi1"));
630
631 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
632
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700633 int pong_count = 10;
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600634 // Adds a callback which mutates the value of the pong message before the
635 // message is sent which is the feature we are testing here
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700636 reader.AddBeforeSendCallback<aos::examples::Pong>(
637 "/test",
638 [&pong_count](
639 aos::examples::Pong *pong,
640 const TimestampedMessage &timestamped_message) -> SharedSpan {
641 pong->mutate_value(pong_count + 1);
642 ++pong_count;
643 return *timestamped_message.data;
644 });
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600645
646 // This sends out the fetched messages and advances time to the start of the
647 // log file.
648 reader.Register(&log_reader_factory);
649
650 const Node *pi1 =
651 configuration::GetNode(log_reader_factory.configuration(), "pi1");
652 const Node *pi2 =
653 configuration::GetNode(log_reader_factory.configuration(), "pi2");
654
655 EXPECT_THAT(reader.LoggedNodes(),
656 ::testing::ElementsAre(
657 configuration::GetNode(reader.logged_configuration(), pi1),
658 configuration::GetNode(reader.logged_configuration(), pi2)));
659
660 std::unique_ptr<EventLoop> pi1_event_loop =
661 log_reader_factory.MakeEventLoop("test", pi1);
662 std::unique_ptr<EventLoop> pi2_event_loop =
663 log_reader_factory.MakeEventLoop("test", pi2);
664
665 pi1_event_loop->MakeWatcher("/original/test",
666 [&pong_count](const examples::Pong &pong) {
667 EXPECT_EQ(pong_count, pong.value());
668 });
669
670 pi2_event_loop->MakeWatcher("/test",
671 [&pong_count](const examples::Pong &pong) {
672 EXPECT_EQ(pong_count, pong.value());
673 });
674
675 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
676 reader.Deregister();
677
678 EXPECT_EQ(pong_count, 2011);
679}
680
681// MultinodeLoggerTest that tests the mutate callback works across multiple
682// nodes
683TEST_P(MultinodeLoggerTest, MultiNodeMutateCallback) {
684 time_converter_.StartEqual();
685 std::vector<std::string> actual_filenames;
686
687 {
688 LoggerState pi1_logger = MakeLogger(pi1_);
689 LoggerState pi2_logger = MakeLogger(pi2_);
690
691 event_loop_factory_.RunFor(chrono::milliseconds(95));
692
693 StartLogger(&pi1_logger);
694 StartLogger(&pi2_logger);
695
696 event_loop_factory_.RunFor(chrono::milliseconds(20000));
697 pi1_logger.AppendAllFilenames(&actual_filenames);
698 pi2_logger.AppendAllFilenames(&actual_filenames);
699 }
700
Austin Schuh8fb4b452023-08-04 17:02:27 -0700701 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700702 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600703
704 LogReader reader(sorted_parts, &config_.message());
705
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700706 int pong_count = 10;
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600707 // Adds a callback which mutates the value of the pong message before the
708 // message is sent which is the feature we are testing here
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700709 reader.AddBeforeSendCallback<aos::examples::Pong>(
710 "/test",
711 [&pong_count](
712 aos::examples::Pong *pong,
713 const TimestampedMessage &timestamped_message) -> SharedSpan {
714 pong->mutate_value(pong_count + 1);
715 ++pong_count;
716 return *timestamped_message.data;
717 });
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600718
719 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
720
721 // This sends out the fetched messages and advances time to the start of the
722 // log file.
723 reader.Register(&log_reader_factory);
724
725 const Node *pi1 =
726 configuration::GetNode(log_reader_factory.configuration(), "pi1");
727 const Node *pi2 =
728 configuration::GetNode(log_reader_factory.configuration(), "pi2");
729
730 EXPECT_THAT(reader.LoggedNodes(),
731 ::testing::ElementsAre(
732 configuration::GetNode(reader.logged_configuration(), pi1),
733 configuration::GetNode(reader.logged_configuration(), pi2)));
734
735 std::unique_ptr<EventLoop> pi1_event_loop =
736 log_reader_factory.MakeEventLoop("test", pi1);
737 std::unique_ptr<EventLoop> pi2_event_loop =
738 log_reader_factory.MakeEventLoop("test", pi2);
739
740 pi1_event_loop->MakeWatcher("/test",
741 [&pong_count](const examples::Pong &pong) {
742 EXPECT_EQ(pong_count, pong.value());
743 });
744
745 pi2_event_loop->MakeWatcher("/test",
746 [&pong_count](const examples::Pong &pong) {
747 EXPECT_EQ(pong_count, pong.value());
748 });
749
750 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
751 reader.Deregister();
752
753 EXPECT_EQ(pong_count, 2011);
754}
755
756// Tests that the before send callback is only called from the sender node if it
757// is forwarded
758TEST_P(MultinodeLoggerTest, OnlyDoBeforeSendCallbackOnSenderNode) {
759 time_converter_.StartEqual();
Austin Schuh8fb4b452023-08-04 17:02:27 -0700760
761 std::vector<std::string> filenames;
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600762 {
763 LoggerState pi1_logger = MakeLogger(pi1_);
764 LoggerState pi2_logger = MakeLogger(pi2_);
765
766 event_loop_factory_.RunFor(chrono::milliseconds(95));
767
768 StartLogger(&pi1_logger);
769 StartLogger(&pi2_logger);
770
771 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -0700772
773 pi1_logger.AppendAllFilenames(&filenames);
774 pi2_logger.AppendAllFilenames(&filenames);
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600775 }
776
Austin Schuh8fb4b452023-08-04 17:02:27 -0700777 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700778 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
779 LogReader reader(sorted_parts);
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600780
781 int ping_count = 0;
782 // Adds a callback which mutates the value of the pong message before the
783 // message is sent which is the feature we are testing here
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700784 reader.AddBeforeSendCallback<aos::examples::Ping>(
785 "/test",
786 [&ping_count](
787 aos::examples::Ping *ping,
788 const TimestampedMessage &timestamped_message) -> SharedSpan {
789 ++ping_count;
790 ping->mutate_value(ping_count);
791 return *timestamped_message.data;
792 });
Eric Schmiedebergae00e732023-04-12 15:53:17 -0600793
794 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
795 log_reader_factory.set_send_delay(chrono::microseconds(0));
796
797 reader.Register(&log_reader_factory);
798
799 const Node *pi1 =
800 configuration::GetNode(log_reader_factory.configuration(), "pi1");
801 const Node *pi2 =
802 configuration::GetNode(log_reader_factory.configuration(), "pi2");
803
804 std::unique_ptr<EventLoop> pi1_event_loop =
805 log_reader_factory.MakeEventLoop("test", pi1);
806 pi1_event_loop->SkipTimingReport();
807 std::unique_ptr<EventLoop> pi2_event_loop =
808 log_reader_factory.MakeEventLoop("test", pi2);
809 pi2_event_loop->SkipTimingReport();
810
811 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
812 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
813
814 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
815 pi1_ping_timestamp;
816 if (!shared()) {
817 pi1_ping_timestamp =
818 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
819 pi1_event_loop.get(),
820 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
821 }
822
823 log_reader_factory.Run();
824
825 EXPECT_EQ(pi1_ping.count(), 2000u);
826 EXPECT_EQ(pi2_ping.count(), 2000u);
827 // If the BeforeSendCallback is called on both nodes, then the ping count
828 // would be 4002 instead of 2001
829 EXPECT_EQ(ping_count, 2001u);
830 if (!shared()) {
831 EXPECT_EQ(pi1_ping_timestamp->count(), 2000u);
832 }
833
834 reader.Deregister();
835}
836
Austin Schuh3c9f92c2024-04-30 17:56:42 -0700837// MultinodeLoggerTest that tests the mutate callback can fully replace the
838// message.
839TEST_P(MultinodeLoggerTest, MultiNodeMutateCallbackReplacement) {
840 time_converter_.StartEqual();
841 std::vector<std::string> actual_filenames;
842
843 {
844 LoggerState pi1_logger = MakeLogger(pi1_);
845 LoggerState pi2_logger = MakeLogger(pi2_);
846
847 event_loop_factory_.RunFor(chrono::milliseconds(95));
848
849 StartLogger(&pi1_logger);
850 StartLogger(&pi2_logger);
851
852 event_loop_factory_.RunFor(chrono::milliseconds(20000));
853 pi1_logger.AppendAllFilenames(&actual_filenames);
854 pi2_logger.AppendAllFilenames(&actual_filenames);
855 }
856
857 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
858 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
859
860 LogReader reader(sorted_parts, &config_.message());
861
862 int pong_count = 10;
863 const uint8_t *data_ptr = nullptr;
864 // Adds a callback which replaces the pong message before the message is sent.
865 reader.AddBeforeSendCallback<aos::examples::Pong>(
866 "/test",
867 [&pong_count, &data_ptr](aos::examples::Pong *pong,
868 const TimestampedMessage &) -> SharedSpan {
869 fbs::AlignedVectorAllocator allocator;
870 aos::fbs::Builder<aos::examples::PongStatic> pong_static(&allocator);
871 CHECK(pong_static->FromFlatbuffer(*pong));
872
873 pong_static->set_value(pong_count + 101);
874 ++pong_count;
875
876 SharedSpan result = allocator.Release();
877
878 data_ptr = result->data();
879
880 return result;
881 });
882
883 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
884
885 // This sends out the fetched messages and advances time to the start of the
886 // log file.
887 reader.Register(&log_reader_factory);
888
889 const Node *pi1 =
890 configuration::GetNode(log_reader_factory.configuration(), "pi1");
891 const Node *pi2 =
892 configuration::GetNode(log_reader_factory.configuration(), "pi2");
893
894 EXPECT_THAT(reader.LoggedNodes(),
895 ::testing::ElementsAre(
896 configuration::GetNode(reader.logged_configuration(), pi1),
897 configuration::GetNode(reader.logged_configuration(), pi2)));
898
899 std::unique_ptr<EventLoop> pi1_event_loop =
900 log_reader_factory.MakeEventLoop("test", pi1);
901 std::unique_ptr<EventLoop> pi2_event_loop =
902 log_reader_factory.MakeEventLoop("test", pi2);
903
904 int pi1_pong_count = 10;
905 pi1_event_loop->MakeWatcher(
906 "/test", [&pi1_event_loop, &pong_count, &pi1_pong_count,
907 &data_ptr](const examples::Pong &pong) {
908 ++pi1_pong_count;
909 // Since simulated event loops (especially log replay) refcount the
910 // shared data, we can verify if the right data got published by
911 // verifying that the actual pointer to the flatbuffer matches. This
912 // only is guarenteed to hold during this callback.
913 EXPECT_EQ(pi1_event_loop->context().data, data_ptr);
914 EXPECT_EQ(pong_count + 100, pong.value());
915 EXPECT_EQ(pi1_pong_count + 101, pong.value());
916 });
917
918 pi2_event_loop->MakeWatcher("/test", [&pi2_event_loop, &pong_count,
919 &data_ptr](const examples::Pong &pong) {
920 // Same goes for the forwarded side, that should be the same contents too.
921 EXPECT_EQ(pi2_event_loop->context().data, data_ptr);
922 EXPECT_EQ(pong_count + 100, pong.value());
923 });
924
925 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
926 reader.Deregister();
927
928 EXPECT_EQ(pong_count, 2011);
929}
930
931// MultinodeLoggerTest that tests the mutate callback can delete messages by
932// returning nullptr.
933TEST_P(MultinodeLoggerTest, MultiNodeMutateCallbackDelete) {
934 time_converter_.StartEqual();
935 std::vector<std::string> actual_filenames;
936
937 {
938 LoggerState pi1_logger = MakeLogger(pi1_);
939 LoggerState pi2_logger = MakeLogger(pi2_);
940
941 event_loop_factory_.RunFor(chrono::milliseconds(95));
942
943 StartLogger(&pi1_logger);
944 StartLogger(&pi2_logger);
945
946 event_loop_factory_.RunFor(chrono::milliseconds(20000));
947 pi1_logger.AppendAllFilenames(&actual_filenames);
948 pi2_logger.AppendAllFilenames(&actual_filenames);
949 }
950
951 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
952 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
953
954 LogReader reader(sorted_parts, &config_.message());
955
956 int pong_count = 10;
957 const uint8_t *data_ptr = nullptr;
958 // Adds a callback which mutates the value of the pong message before the
959 // message is sent which is the feature we are testing here
960 reader.AddBeforeSendCallback<aos::examples::Pong>(
961 "/test",
962 [&pong_count, &data_ptr](aos::examples::Pong *pong,
963 const TimestampedMessage &) -> SharedSpan {
964 fbs::AlignedVectorAllocator allocator;
965 aos::fbs::Builder<aos::examples::PongStatic> pong_static(&allocator);
966 CHECK(pong_static->FromFlatbuffer(*pong));
967
968 pong_static->set_value(pong_count + 101);
969 ++pong_count;
970
971 if ((pong_count % 2) == 0) {
972 data_ptr = nullptr;
973 return nullptr;
974 }
975
976 SharedSpan result = allocator.Release();
977
978 data_ptr = result->data();
979
980 return result;
981 });
982
983 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
984
985 // This sends out the fetched messages and advances time to the start of the
986 // log file.
987 reader.Register(&log_reader_factory);
988
989 const Node *pi1 =
990 configuration::GetNode(log_reader_factory.configuration(), "pi1");
991 const Node *pi2 =
992 configuration::GetNode(log_reader_factory.configuration(), "pi2");
993
994 EXPECT_THAT(reader.LoggedNodes(),
995 ::testing::ElementsAre(
996 configuration::GetNode(reader.logged_configuration(), pi1),
997 configuration::GetNode(reader.logged_configuration(), pi2)));
998
999 std::unique_ptr<EventLoop> pi1_event_loop =
1000 log_reader_factory.MakeEventLoop("test", pi1);
1001 std::unique_ptr<EventLoop> pi2_event_loop =
1002 log_reader_factory.MakeEventLoop("test", pi2);
1003
1004 int pi1_pong_count = 10;
1005 pi1_event_loop->MakeWatcher(
1006 "/test", [&pi1_event_loop, &pong_count, &pi1_pong_count,
1007 &data_ptr](const examples::Pong &pong) {
1008 pi1_pong_count += 2;
1009 // Since simulated event loops (especially log replay) refcount the
1010 // shared data, we can verify if the right data got published by
1011 // verifying that the actual pointer to the flatbuffer matches. This
1012 // only is guarenteed to hold during this callback.
1013 EXPECT_EQ(pi1_event_loop->context().data, data_ptr);
1014 EXPECT_EQ(pong_count + 100, pong.value());
1015 EXPECT_EQ(pi1_pong_count + 101, pong.value());
1016 });
1017
1018 pi2_event_loop->MakeWatcher("/test", [&pi2_event_loop, &pong_count,
1019 &data_ptr](const examples::Pong &pong) {
1020 // Same goes for the forwarded side, that should be the same contents too.
1021 EXPECT_EQ(pi2_event_loop->context().data, data_ptr);
1022 EXPECT_EQ(pong_count + 100, pong.value());
1023 });
1024
1025 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
1026 reader.Deregister();
1027
1028 EXPECT_EQ(pong_count, 2011);
1029 // Since we count up by 2 each time we get a message, and the last pong gets
1030 // dropped since it is an odd number we expect the number on pi1 to be 1 less.
1031 EXPECT_EQ(pi1_pong_count, 2010);
1032}
1033
1034// MultinodeLoggerTest that tests that non-forwarded channels are able to be
1035// mutated.
1036TEST_P(MultinodeLoggerTest, MultiNodeMutateCallbackNotForwarded) {
1037 time_converter_.StartEqual();
1038 std::vector<std::string> actual_filenames;
1039
1040 {
1041 LoggerState pi1_logger = MakeLogger(pi1_);
1042 LoggerState pi2_logger = MakeLogger(pi2_);
1043
1044 event_loop_factory_.RunFor(chrono::milliseconds(95));
1045
1046 StartLogger(&pi1_logger);
1047 StartLogger(&pi2_logger);
1048
1049 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1050 pi1_logger.AppendAllFilenames(&actual_filenames);
1051 pi2_logger.AppendAllFilenames(&actual_filenames);
1052 }
1053
1054 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1055 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1056
1057 LogReader reader(sorted_parts, &config_.message());
1058
1059 int ping_count = 10;
1060 const uint8_t *data_ptr = nullptr;
1061 // Adds a callback which mutates the value of the pong message before the
1062 // message is sent which is the feature we are testing here
1063 reader.AddBeforeSendCallback<aos::examples::Ping>(
1064 "/pi1/aos",
1065 [&ping_count, &data_ptr](aos::examples::Ping *ping,
1066 const TimestampedMessage &) -> SharedSpan {
1067 fbs::AlignedVectorAllocator allocator;
1068 aos::fbs::Builder<aos::examples::PingStatic> ping_static(&allocator);
1069 CHECK(ping_static->FromFlatbuffer(*ping));
1070
1071 ping_static->set_value(ping_count + 101);
1072 ++ping_count;
1073
1074 SharedSpan result = allocator.Release();
1075
1076 data_ptr = result->data();
1077
1078 return result;
1079 });
1080
1081 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1082
1083 // This sends out the fetched messages and advances time to the start of the
1084 // log file.
1085 reader.Register(&log_reader_factory);
1086
1087 const Node *pi1 =
1088 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1089 const Node *pi2 =
1090 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1091
1092 EXPECT_THAT(reader.LoggedNodes(),
1093 ::testing::ElementsAre(
1094 configuration::GetNode(reader.logged_configuration(), pi1),
1095 configuration::GetNode(reader.logged_configuration(), pi2)));
1096
1097 std::unique_ptr<EventLoop> pi1_event_loop =
1098 log_reader_factory.MakeEventLoop("test", pi1);
1099 std::unique_ptr<EventLoop> pi2_event_loop =
1100 log_reader_factory.MakeEventLoop("test", pi2);
1101
1102 int pi1_ping_count = 10;
1103 pi1_event_loop->MakeWatcher(
1104 "/aos", [&pi1_event_loop, &ping_count, &pi1_ping_count,
1105 &data_ptr](const examples::Ping &ping) {
1106 ++pi1_ping_count;
1107 // Since simulated event loops (especially log replay) refcount the
1108 // shared data, we can verify if the right data got published by
1109 // verifying that the actual pointer to the flatbuffer matches. This
1110 // only is guarenteed to hold during this callback.
1111 EXPECT_EQ(pi1_event_loop->context().data, data_ptr);
1112 EXPECT_EQ(ping_count + 100, ping.value());
1113 EXPECT_EQ(pi1_ping_count + 101, ping.value());
1114 });
1115
1116 reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
1117 reader.Deregister();
1118
1119 EXPECT_EQ(ping_count, 2011);
1120}
1121
Eric Schmiedebergae00e732023-04-12 15:53:17 -06001122// Tests that we do not allow adding callbacks after Register is called
1123TEST_P(MultinodeLoggerDeathTest, AddCallbackAfterRegister) {
1124 time_converter_.StartEqual();
1125 std::vector<std::string> actual_filenames;
1126
1127 {
1128 LoggerState pi1_logger = MakeLogger(pi1_);
1129 LoggerState pi2_logger = MakeLogger(pi2_);
1130
1131 event_loop_factory_.RunFor(chrono::milliseconds(95));
1132
1133 StartLogger(&pi1_logger);
1134 StartLogger(&pi2_logger);
1135
1136 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1137 pi1_logger.AppendAllFilenames(&actual_filenames);
1138 pi2_logger.AppendAllFilenames(&actual_filenames);
1139 }
1140
Austin Schuh8fb4b452023-08-04 17:02:27 -07001141 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001142 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Eric Schmiedebergae00e732023-04-12 15:53:17 -06001143
1144 LogReader reader(sorted_parts, &config_.message());
1145 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1146 reader.Register(&log_reader_factory);
1147 EXPECT_DEATH(
1148 {
Austin Schuh3c9f92c2024-04-30 17:56:42 -07001149 reader.AddBeforeSendCallback<aos::examples::Pong>(
1150 "/test",
1151 [](aos::examples::Pong *,
1152 const TimestampedMessage &timestamped_message) -> SharedSpan {
1153 LOG(FATAL) << "This should not be called";
1154 return *timestamped_message.data;
1155 });
Eric Schmiedebergae00e732023-04-12 15:53:17 -06001156 },
1157 "Cannot add callbacks after calling Register");
1158 reader.Deregister();
1159}
1160
Naman Guptaa63aa132023-03-22 20:06:34 -07001161// Test that if we feed the replay with a mismatched node list that we die on
1162// the LogReader constructor.
1163TEST_P(MultinodeLoggerDeathTest, MultiNodeBadReplayConfig) {
1164 time_converter_.StartEqual();
Austin Schuh8fb4b452023-08-04 17:02:27 -07001165
1166 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -07001167 {
1168 LoggerState pi1_logger = MakeLogger(pi1_);
1169 LoggerState pi2_logger = MakeLogger(pi2_);
1170
1171 event_loop_factory_.RunFor(chrono::milliseconds(95));
1172
1173 StartLogger(&pi1_logger);
1174 StartLogger(&pi2_logger);
1175
1176 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07001177
1178 pi1_logger.AppendAllFilenames(&filenames);
1179 pi2_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001180 }
1181
1182 // Test that, if we add an additional node to the replay config that the
1183 // logger complains about the mismatch in number of nodes.
1184 FlatbufferDetachedBuffer<Configuration> extra_nodes_config =
1185 configuration::MergeWithConfig(&config_.message(), R"({
1186 "nodes": [
1187 {
1188 "name": "extra-node"
1189 }
1190 ]
1191 }
1192 )");
1193
Austin Schuh8fb4b452023-08-04 17:02:27 -07001194 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001195 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07001196 EXPECT_DEATH(LogReader(sorted_parts, &extra_nodes_config.message()),
1197 "Log file and replay config need to have matching nodes lists.");
1198}
1199
1200// Tests that we can read log files where they don't start at the same monotonic
1201// time.
1202TEST_P(MultinodeLoggerTest, StaggeredStart) {
1203 time_converter_.StartEqual();
1204 std::vector<std::string> actual_filenames;
1205
1206 {
1207 LoggerState pi1_logger = MakeLogger(pi1_);
1208 LoggerState pi2_logger = MakeLogger(pi2_);
1209
1210 event_loop_factory_.RunFor(chrono::milliseconds(95));
1211
1212 StartLogger(&pi1_logger);
1213
1214 event_loop_factory_.RunFor(chrono::milliseconds(200));
1215
1216 StartLogger(&pi2_logger);
1217
1218 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1219 pi1_logger.AppendAllFilenames(&actual_filenames);
1220 pi2_logger.AppendAllFilenames(&actual_filenames);
1221 }
1222
1223 // Since we delay starting pi2, it already knows about all the timestamps so
1224 // we don't end up with extra parts.
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001225 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1226 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1227 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001228
1229 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1230 log_reader_factory.set_send_delay(chrono::microseconds(0));
1231
1232 // This sends out the fetched messages and advances time to the start of the
1233 // log file.
1234 reader.Register(&log_reader_factory);
1235
1236 const Node *pi1 =
1237 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1238 const Node *pi2 =
1239 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1240
1241 EXPECT_THAT(reader.LoggedNodes(),
1242 ::testing::ElementsAre(
1243 configuration::GetNode(reader.logged_configuration(), pi1),
1244 configuration::GetNode(reader.logged_configuration(), pi2)));
1245
1246 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1247
1248 std::unique_ptr<EventLoop> pi1_event_loop =
1249 log_reader_factory.MakeEventLoop("test", pi1);
1250 std::unique_ptr<EventLoop> pi2_event_loop =
1251 log_reader_factory.MakeEventLoop("test", pi2);
1252
1253 int pi1_ping_count = 30;
1254 int pi2_ping_count = 30;
1255 int pi1_pong_count = 30;
1256 int pi2_pong_count = 30;
1257
1258 // Confirm that the ping value matches.
1259 pi1_event_loop->MakeWatcher(
1260 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
1261 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
1262 << pi1_event_loop->context().monotonic_remote_time << " -> "
1263 << pi1_event_loop->context().monotonic_event_time;
1264 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
1265
1266 ++pi1_ping_count;
1267 });
1268 pi2_event_loop->MakeWatcher(
1269 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
1270 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
1271 << pi2_event_loop->context().monotonic_remote_time << " -> "
1272 << pi2_event_loop->context().monotonic_event_time;
1273 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
1274
1275 ++pi2_ping_count;
1276 });
1277
1278 // Confirm that the ping and pong counts both match, and the value also
1279 // matches.
1280 pi1_event_loop->MakeWatcher(
1281 "/test", [&pi1_event_loop, &pi1_ping_count,
1282 &pi1_pong_count](const examples::Pong &pong) {
1283 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
1284 << pi1_event_loop->context().monotonic_remote_time << " -> "
1285 << pi1_event_loop->context().monotonic_event_time;
1286
1287 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
1288 ++pi1_pong_count;
1289 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
1290 });
1291 pi2_event_loop->MakeWatcher(
1292 "/test", [&pi2_event_loop, &pi2_ping_count,
1293 &pi2_pong_count](const examples::Pong &pong) {
1294 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
1295 << pi2_event_loop->context().monotonic_remote_time << " -> "
1296 << pi2_event_loop->context().monotonic_event_time;
1297
1298 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
1299 ++pi2_pong_count;
1300 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
1301 });
1302
1303 log_reader_factory.Run();
1304 EXPECT_EQ(pi1_ping_count, 2030);
1305 EXPECT_EQ(pi2_ping_count, 2030);
1306 EXPECT_EQ(pi1_pong_count, 2030);
1307 EXPECT_EQ(pi2_pong_count, 2030);
1308
1309 reader.Deregister();
1310}
1311
1312// Tests that we can read log files where the monotonic clocks drift and don't
1313// match correctly. While we are here, also test that different ending times
1314// also is readable.
1315TEST_P(MultinodeLoggerTest, MismatchedClocks) {
1316 // TODO(austin): Negate...
1317 const chrono::nanoseconds initial_pi2_offset = chrono::seconds(1000);
1318
1319 time_converter_.AddMonotonic(
1320 {BootTimestamp::epoch(), BootTimestamp::epoch() + initial_pi2_offset});
1321 // Wait for 95 ms, (~0.1 seconds - 1/2 of the ping/pong period), and set the
1322 // skew to be 200 uS/s
1323 const chrono::nanoseconds startup_sleep1 = time_converter_.AddMonotonic(
1324 {chrono::milliseconds(95),
1325 chrono::milliseconds(95) - chrono::nanoseconds(200) * 95});
1326 // Run another 200 ms to have one logger start first.
1327 const chrono::nanoseconds startup_sleep2 = time_converter_.AddMonotonic(
1328 {chrono::milliseconds(200), chrono::milliseconds(200)});
1329 // Slew one way then the other at the same 200 uS/S slew rate. Make sure we
1330 // go far enough to cause problems if this isn't accounted for.
1331 const chrono::nanoseconds logger_run1 = time_converter_.AddMonotonic(
1332 {chrono::milliseconds(20000),
1333 chrono::milliseconds(20000) - chrono::nanoseconds(200) * 20000});
1334 const chrono::nanoseconds logger_run2 = time_converter_.AddMonotonic(
1335 {chrono::milliseconds(40000),
1336 chrono::milliseconds(40000) + chrono::nanoseconds(200) * 40000});
1337 const chrono::nanoseconds logger_run3 = time_converter_.AddMonotonic(
1338 {chrono::milliseconds(400), chrono::milliseconds(400)});
1339
Austin Schuh8fb4b452023-08-04 17:02:27 -07001340 std::vector<std::string> actual_filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -07001341 {
1342 LoggerState pi2_logger = MakeLogger(pi2_);
1343
1344 LOG(INFO) << "pi2 times: " << pi2_->monotonic_now() << " "
1345 << pi2_->realtime_now() << " distributed "
1346 << pi2_->ToDistributedClock(pi2_->monotonic_now());
1347
1348 LOG(INFO) << "pi2_ times: " << pi2_->monotonic_now() << " "
1349 << pi2_->realtime_now() << " distributed "
1350 << pi2_->ToDistributedClock(pi2_->monotonic_now());
1351
1352 event_loop_factory_.RunFor(startup_sleep1);
1353
1354 StartLogger(&pi2_logger);
1355
1356 event_loop_factory_.RunFor(startup_sleep2);
1357
1358 {
1359 // Run pi1's logger for only part of the time.
1360 LoggerState pi1_logger = MakeLogger(pi1_);
1361
1362 StartLogger(&pi1_logger);
1363 event_loop_factory_.RunFor(logger_run1);
1364
1365 // Make sure we slewed time far enough so that the difference is greater
1366 // than the network delay. This confirms that if we sort incorrectly, it
1367 // would show in the results.
1368 EXPECT_LT(
1369 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
1370 -event_loop_factory_.send_delay() -
1371 event_loop_factory_.network_delay());
1372
1373 event_loop_factory_.RunFor(logger_run2);
1374
1375 // And now check that we went far enough the other way to make sure we
1376 // cover both problems.
1377 EXPECT_GT(
1378 (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
1379 event_loop_factory_.send_delay() +
1380 event_loop_factory_.network_delay());
Austin Schuh8fb4b452023-08-04 17:02:27 -07001381
1382 pi1_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001383 }
1384
1385 // And log a bit more on pi2.
1386 event_loop_factory_.RunFor(logger_run3);
Austin Schuh8fb4b452023-08-04 17:02:27 -07001387
1388 pi2_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001389 }
1390
Austin Schuh8fb4b452023-08-04 17:02:27 -07001391 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001392 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1393 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001394
1395 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1396 log_reader_factory.set_send_delay(chrono::microseconds(0));
1397
1398 const Node *pi1 =
1399 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1400 const Node *pi2 =
1401 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1402
1403 // This sends out the fetched messages and advances time to the start of the
1404 // log file.
1405 reader.Register(&log_reader_factory);
1406
1407 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
1408 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
1409 LOG(INFO) << "now pi1 "
1410 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
1411 LOG(INFO) << "now pi2 "
1412 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
1413
1414 LOG(INFO) << "Done registering (pi1) "
1415 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now()
1416 << " "
1417 << log_reader_factory.GetNodeEventLoopFactory(pi1)->realtime_now();
1418 LOG(INFO) << "Done registering (pi2) "
1419 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now()
1420 << " "
1421 << log_reader_factory.GetNodeEventLoopFactory(pi2)->realtime_now();
1422
1423 EXPECT_THAT(reader.LoggedNodes(),
1424 ::testing::ElementsAre(
1425 configuration::GetNode(reader.logged_configuration(), pi1),
1426 configuration::GetNode(reader.logged_configuration(), pi2)));
1427
1428 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
1429
1430 std::unique_ptr<EventLoop> pi1_event_loop =
1431 log_reader_factory.MakeEventLoop("test", pi1);
1432 std::unique_ptr<EventLoop> pi2_event_loop =
1433 log_reader_factory.MakeEventLoop("test", pi2);
1434
1435 int pi1_ping_count = 30;
1436 int pi2_ping_count = 30;
1437 int pi1_pong_count = 30;
1438 int pi2_pong_count = 30;
1439
1440 // Confirm that the ping value matches.
1441 pi1_event_loop->MakeWatcher(
1442 "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
1443 VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
1444 << pi1_event_loop->context().monotonic_remote_time << " -> "
1445 << pi1_event_loop->context().monotonic_event_time;
1446 EXPECT_EQ(ping.value(), pi1_ping_count + 1);
1447
1448 ++pi1_ping_count;
1449 });
1450 pi2_event_loop->MakeWatcher(
1451 "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
1452 VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
1453 << pi2_event_loop->context().monotonic_remote_time << " -> "
1454 << pi2_event_loop->context().monotonic_event_time;
1455 EXPECT_EQ(ping.value(), pi2_ping_count + 1);
1456
1457 ++pi2_ping_count;
1458 });
1459
1460 // Confirm that the ping and pong counts both match, and the value also
1461 // matches.
1462 pi1_event_loop->MakeWatcher(
1463 "/test", [&pi1_event_loop, &pi1_ping_count,
1464 &pi1_pong_count](const examples::Pong &pong) {
1465 VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
1466 << pi1_event_loop->context().monotonic_remote_time << " -> "
1467 << pi1_event_loop->context().monotonic_event_time;
1468
1469 EXPECT_EQ(pong.value(), pi1_pong_count + 1);
1470 ++pi1_pong_count;
1471 EXPECT_EQ(pi1_ping_count, pi1_pong_count);
1472 });
1473 pi2_event_loop->MakeWatcher(
1474 "/test", [&pi2_event_loop, &pi2_ping_count,
1475 &pi2_pong_count](const examples::Pong &pong) {
1476 VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
1477 << pi2_event_loop->context().monotonic_remote_time << " -> "
1478 << pi2_event_loop->context().monotonic_event_time;
1479
1480 EXPECT_EQ(pong.value(), pi2_pong_count + 1);
1481 ++pi2_pong_count;
1482 EXPECT_EQ(pi2_ping_count, pi2_pong_count);
1483 });
1484
1485 log_reader_factory.Run();
1486 EXPECT_EQ(pi1_ping_count, 6030);
1487 EXPECT_EQ(pi2_ping_count, 6030);
1488 EXPECT_EQ(pi1_pong_count, 6030);
1489 EXPECT_EQ(pi2_pong_count, 6030);
1490
1491 reader.Deregister();
1492}
1493
1494// Tests that we can sort a bunch of parts into the pre-determined sorted parts.
1495TEST_P(MultinodeLoggerTest, SortParts) {
1496 time_converter_.StartEqual();
1497 // Make a bunch of parts.
1498 {
1499 LoggerState pi1_logger = MakeLogger(pi1_);
1500 LoggerState pi2_logger = MakeLogger(pi2_);
1501
1502 event_loop_factory_.RunFor(chrono::milliseconds(95));
1503
1504 StartLogger(&pi1_logger);
1505 StartLogger(&pi2_logger);
1506
1507 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1508 }
1509
1510 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1511 VerifyParts(sorted_parts);
1512}
1513
1514// Tests that we can sort a bunch of parts with an empty part. We should ignore
1515// it and remove it from the sorted list.
1516TEST_P(MultinodeLoggerTest, SortEmptyParts) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07001517 std::vector<std::string> actual_filenames;
1518
Naman Guptaa63aa132023-03-22 20:06:34 -07001519 time_converter_.StartEqual();
1520 // Make a bunch of parts.
1521 {
1522 LoggerState pi1_logger = MakeLogger(pi1_);
1523 LoggerState pi2_logger = MakeLogger(pi2_);
1524
1525 event_loop_factory_.RunFor(chrono::milliseconds(95));
1526
1527 StartLogger(&pi1_logger);
1528 StartLogger(&pi2_logger);
1529
1530 event_loop_factory_.RunFor(chrono::milliseconds(2000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07001531 pi1_logger.AppendAllFilenames(&actual_filenames);
1532 pi2_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001533 }
1534
1535 // TODO(austin): Should we flip out if the file can't open?
1536 const std::string kEmptyFile("foobarinvalidfiledoesnotexist" + Extension());
1537
1538 aos::util::WriteStringToFileOrDie(kEmptyFile, "");
Austin Schuh8fb4b452023-08-04 17:02:27 -07001539 actual_filenames.emplace_back(kEmptyFile);
Naman Guptaa63aa132023-03-22 20:06:34 -07001540
Austin Schuh8fb4b452023-08-04 17:02:27 -07001541 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001542 VerifyParts(sorted_parts, {kEmptyFile});
1543}
1544
1545// Tests that we can sort a bunch of parts with the end missing off a
1546// file. We should use the part we can read.
1547TEST_P(MultinodeLoggerTest, SortTruncatedParts) {
Austin Schuh6ecfe902023-08-04 22:44:37 -07001548 if (file_strategy() == FileStrategy::kCombine) {
1549 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
1550 }
1551
Naman Guptaa63aa132023-03-22 20:06:34 -07001552 std::vector<std::string> actual_filenames;
1553 time_converter_.StartEqual();
1554 // Make a bunch of parts.
1555 {
1556 LoggerState pi1_logger = MakeLogger(pi1_);
1557 LoggerState pi2_logger = MakeLogger(pi2_);
1558
1559 event_loop_factory_.RunFor(chrono::milliseconds(95));
1560
1561 StartLogger(&pi1_logger);
1562 StartLogger(&pi2_logger);
1563
1564 event_loop_factory_.RunFor(chrono::milliseconds(2000));
1565
1566 pi1_logger.AppendAllFilenames(&actual_filenames);
1567 pi2_logger.AppendAllFilenames(&actual_filenames);
1568 }
1569
1570 ASSERT_THAT(actual_filenames,
1571 ::testing::UnorderedElementsAreArray(logfiles_));
1572
1573 // Strip off the end of one of the files. Pick one with a lot of data.
1574 // For snappy, needs to have enough data to be >1 chunk of compressed data so
1575 // that we don't corrupt the entire log part.
1576 ::std::string compressed_contents =
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07001577 aos::util::ReadFileToStringOrDie(logfiles_[2]);
Naman Guptaa63aa132023-03-22 20:06:34 -07001578
1579 aos::util::WriteStringToFileOrDie(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07001580 logfiles_[2],
Naman Guptaa63aa132023-03-22 20:06:34 -07001581 compressed_contents.substr(0, compressed_contents.size() - 100));
1582
1583 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1584 VerifyParts(sorted_parts);
1585}
1586
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001587// Tests that if we remap a logged channel, it shows up correctly.
Naman Guptaa63aa132023-03-22 20:06:34 -07001588TEST_P(MultinodeLoggerTest, RemapLoggedChannel) {
1589 time_converter_.StartEqual();
Austin Schuh8fb4b452023-08-04 17:02:27 -07001590
1591 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -07001592 {
1593 LoggerState pi1_logger = MakeLogger(pi1_);
1594 LoggerState pi2_logger = MakeLogger(pi2_);
1595
1596 event_loop_factory_.RunFor(chrono::milliseconds(95));
1597
1598 StartLogger(&pi1_logger);
1599 StartLogger(&pi2_logger);
1600
1601 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07001602
1603 pi1_logger.AppendAllFilenames(&filenames);
1604 pi2_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001605 }
1606
Austin Schuh8fb4b452023-08-04 17:02:27 -07001607 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001608 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1609 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001610
1611 // Remap just on pi1.
1612 reader.RemapLoggedChannel<aos::timing::Report>(
1613 "/aos", configuration::GetNode(reader.configuration(), "pi1"));
1614
1615 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1616 log_reader_factory.set_send_delay(chrono::microseconds(0));
1617
1618 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1619 // Note: An extra channel gets remapped automatically due to a timestamp
1620 // channel being LOCAL_LOGGER'd.
1621 ASSERT_EQ(remapped_channels.size(), std::get<0>(GetParam()).shared ? 1u : 2u);
1622 EXPECT_EQ(remapped_channels[0]->name()->string_view(), "/original/pi1/aos");
1623 EXPECT_EQ(remapped_channels[0]->type()->string_view(), "aos.timing.Report");
1624 if (!std::get<0>(GetParam()).shared) {
1625 EXPECT_EQ(remapped_channels[1]->name()->string_view(),
1626 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1627 "aos-message_bridge-Timestamp");
1628 EXPECT_EQ(remapped_channels[1]->type()->string_view(),
1629 "aos.message_bridge.RemoteMessage");
1630 }
1631
1632 reader.Register(&log_reader_factory);
1633
1634 const Node *pi1 =
1635 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1636 const Node *pi2 =
1637 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1638
1639 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1640 // else should have moved.
1641 std::unique_ptr<EventLoop> pi1_event_loop =
1642 log_reader_factory.MakeEventLoop("test", pi1);
1643 pi1_event_loop->SkipTimingReport();
1644 std::unique_ptr<EventLoop> full_pi1_event_loop =
1645 log_reader_factory.MakeEventLoop("test", pi1);
1646 full_pi1_event_loop->SkipTimingReport();
1647 std::unique_ptr<EventLoop> pi2_event_loop =
1648 log_reader_factory.MakeEventLoop("test", pi2);
1649 pi2_event_loop->SkipTimingReport();
1650
1651 MessageCounter<aos::timing::Report> pi1_timing_report(pi1_event_loop.get(),
1652 "/aos");
1653 MessageCounter<aos::timing::Report> full_pi1_timing_report(
1654 full_pi1_event_loop.get(), "/pi1/aos");
1655 MessageCounter<aos::timing::Report> pi1_original_timing_report(
1656 pi1_event_loop.get(), "/original/aos");
1657 MessageCounter<aos::timing::Report> full_pi1_original_timing_report(
1658 full_pi1_event_loop.get(), "/original/pi1/aos");
1659 MessageCounter<aos::timing::Report> pi2_timing_report(pi2_event_loop.get(),
1660 "/aos");
1661
1662 log_reader_factory.Run();
1663
1664 EXPECT_EQ(pi1_timing_report.count(), 0u);
1665 EXPECT_EQ(full_pi1_timing_report.count(), 0u);
1666 EXPECT_NE(pi1_original_timing_report.count(), 0u);
1667 EXPECT_NE(full_pi1_original_timing_report.count(), 0u);
1668 EXPECT_NE(pi2_timing_report.count(), 0u);
1669
1670 reader.Deregister();
1671}
1672
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001673// Tests that if we rename a logged channel, it shows up correctly.
1674TEST_P(MultinodeLoggerTest, RenameLoggedChannel) {
1675 std::vector<std::string> actual_filenames;
1676 time_converter_.StartEqual();
1677 {
1678 LoggerState pi1_logger = MakeLogger(pi1_);
1679 LoggerState pi2_logger = MakeLogger(pi2_);
1680
1681 event_loop_factory_.RunFor(chrono::milliseconds(95));
1682
1683 StartLogger(&pi1_logger);
1684 StartLogger(&pi2_logger);
1685
1686 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1687
1688 pi1_logger.AppendAllFilenames(&actual_filenames);
1689 pi2_logger.AppendAllFilenames(&actual_filenames);
1690 }
1691
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001692 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1693 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1694 LogReader reader(sorted_parts);
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001695
1696 // Rename just on pi2. Add some global maps just to verify they get added in
1697 // the config and used correctly.
1698 std::vector<MapT> maps;
1699 {
1700 MapT map;
1701 map.match = std::make_unique<ChannelT>();
1702 map.match->name = "/foo*";
1703 map.match->source_node = "pi1";
1704 map.rename = std::make_unique<ChannelT>();
1705 map.rename->name = "/pi1/foo";
1706 maps.emplace_back(std::move(map));
1707 }
1708 {
1709 MapT map;
1710 map.match = std::make_unique<ChannelT>();
1711 map.match->name = "/foo*";
1712 map.match->source_node = "pi2";
1713 map.rename = std::make_unique<ChannelT>();
1714 map.rename->name = "/pi2/foo";
1715 maps.emplace_back(std::move(map));
1716 }
1717 {
1718 MapT map;
1719 map.match = std::make_unique<ChannelT>();
1720 map.match->name = "/foo";
1721 map.match->type = "aos.examples.Ping";
1722 map.rename = std::make_unique<ChannelT>();
1723 map.rename->name = "/foo/renamed";
1724 maps.emplace_back(std::move(map));
1725 }
1726 reader.RenameLoggedChannel<aos::examples::Ping>(
1727 "/aos", configuration::GetNode(reader.configuration(), "pi2"),
1728 "/pi2/foo/renamed", maps);
1729
1730 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1731 log_reader_factory.set_send_delay(chrono::microseconds(0));
1732
1733 std::vector<const Channel *> remapped_channels = reader.RemappedChannels();
1734 // Note: An extra channel gets remapped automatically due to a timestamp
1735 // channel being LOCAL_LOGGER'd.
1736 const bool shared = std::get<0>(GetParam()).shared;
1737 ASSERT_EQ(remapped_channels.size(), shared ? 1u : 2u);
1738 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->name()->string_view(),
1739 "/pi2/foo/renamed");
1740 EXPECT_EQ(remapped_channels[shared ? 0 : 1]->type()->string_view(),
1741 "aos.examples.Ping");
1742 if (!shared) {
1743 EXPECT_EQ(remapped_channels[0]->name()->string_view(),
1744 "/original/pi1/aos/remote_timestamps/pi2/pi1/aos/"
1745 "aos-message_bridge-Timestamp");
1746 EXPECT_EQ(remapped_channels[0]->type()->string_view(),
1747 "aos.message_bridge.RemoteMessage");
1748 }
1749
1750 reader.Register(&log_reader_factory);
1751
1752 const Node *pi1 =
1753 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1754 const Node *pi2 =
1755 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1756
1757 // Confirm we can read the data on the renamed channel, just for pi2. Nothing
1758 // else should have moved.
1759 std::unique_ptr<EventLoop> pi2_event_loop =
1760 log_reader_factory.MakeEventLoop("test", pi2);
1761 pi2_event_loop->SkipTimingReport();
1762 std::unique_ptr<EventLoop> full_pi2_event_loop =
1763 log_reader_factory.MakeEventLoop("test", pi2);
1764 full_pi2_event_loop->SkipTimingReport();
1765 std::unique_ptr<EventLoop> pi1_event_loop =
1766 log_reader_factory.MakeEventLoop("test", pi1);
1767 pi1_event_loop->SkipTimingReport();
1768
1769 MessageCounter<aos::examples::Ping> pi2_ping(pi2_event_loop.get(), "/aos");
1770 MessageCounter<aos::examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1771 "/foo");
1772 MessageCounter<aos::examples::Ping> full_pi2_renamed_ping(
1773 full_pi2_event_loop.get(), "/pi2/foo/renamed");
1774 MessageCounter<aos::examples::Ping> pi1_ping(pi1_event_loop.get(), "/aos");
1775
1776 log_reader_factory.Run();
1777
1778 EXPECT_EQ(pi2_ping.count(), 0u);
1779 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1780 EXPECT_NE(full_pi2_renamed_ping.count(), 0u);
1781 EXPECT_NE(pi1_ping.count(), 0u);
1782
1783 reader.Deregister();
1784}
1785
Naman Guptaa63aa132023-03-22 20:06:34 -07001786// Tests that we can remap a forwarded channel as well.
1787TEST_P(MultinodeLoggerTest, RemapForwardedLoggedChannel) {
1788 time_converter_.StartEqual();
1789 {
1790 LoggerState pi1_logger = MakeLogger(pi1_);
1791 LoggerState pi2_logger = MakeLogger(pi2_);
1792
1793 event_loop_factory_.RunFor(chrono::milliseconds(95));
1794
1795 StartLogger(&pi1_logger);
1796 StartLogger(&pi2_logger);
1797
1798 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1799 }
1800
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001801 const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
1802 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1803 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07001804
1805 reader.RemapLoggedChannel<examples::Ping>("/test");
1806
1807 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1808 log_reader_factory.set_send_delay(chrono::microseconds(0));
1809
1810 reader.Register(&log_reader_factory);
1811
1812 const Node *pi1 =
1813 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1814 const Node *pi2 =
1815 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1816
1817 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
1818 // else should have moved.
1819 std::unique_ptr<EventLoop> pi1_event_loop =
1820 log_reader_factory.MakeEventLoop("test", pi1);
1821 pi1_event_loop->SkipTimingReport();
1822 std::unique_ptr<EventLoop> full_pi1_event_loop =
1823 log_reader_factory.MakeEventLoop("test", pi1);
1824 full_pi1_event_loop->SkipTimingReport();
1825 std::unique_ptr<EventLoop> pi2_event_loop =
1826 log_reader_factory.MakeEventLoop("test", pi2);
1827 pi2_event_loop->SkipTimingReport();
1828
1829 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1830 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1831 MessageCounter<examples::Ping> pi1_original_ping(pi1_event_loop.get(),
1832 "/original/test");
1833 MessageCounter<examples::Ping> pi2_original_ping(pi2_event_loop.get(),
1834 "/original/test");
1835
1836 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1837 pi1_original_ping_timestamp;
1838 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1839 pi1_ping_timestamp;
1840 if (!shared()) {
1841 pi1_original_ping_timestamp =
1842 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1843 pi1_event_loop.get(),
1844 "/pi1/aos/remote_timestamps/pi2/original/test/aos-examples-Ping");
1845 pi1_ping_timestamp =
1846 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1847 pi1_event_loop.get(),
1848 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1849 }
1850
1851 log_reader_factory.Run();
1852
1853 EXPECT_EQ(pi1_ping.count(), 0u);
1854 EXPECT_EQ(pi2_ping.count(), 0u);
1855 EXPECT_NE(pi1_original_ping.count(), 0u);
1856 EXPECT_NE(pi2_original_ping.count(), 0u);
1857 if (!shared()) {
1858 EXPECT_NE(pi1_original_ping_timestamp->count(), 0u);
1859 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1860 }
1861
1862 reader.Deregister();
1863}
1864
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001865// Tests that we can rename a forwarded channel as well.
1866TEST_P(MultinodeLoggerTest, RenameForwardedLoggedChannel) {
1867 std::vector<std::string> actual_filenames;
1868 time_converter_.StartEqual();
1869 {
1870 LoggerState pi1_logger = MakeLogger(pi1_);
1871 LoggerState pi2_logger = MakeLogger(pi2_);
1872
1873 event_loop_factory_.RunFor(chrono::milliseconds(95));
1874
1875 StartLogger(&pi1_logger);
1876 StartLogger(&pi2_logger);
1877
1878 event_loop_factory_.RunFor(chrono::milliseconds(20000));
1879
1880 pi1_logger.AppendAllFilenames(&actual_filenames);
1881 pi2_logger.AppendAllFilenames(&actual_filenames);
1882 }
1883
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001884 const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
1885 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
1886 LogReader reader(sorted_parts);
Sanjay Narayanan5ec00232022-07-08 15:21:30 -07001887
1888 std::vector<MapT> maps;
1889 {
1890 MapT map;
1891 map.match = std::make_unique<ChannelT>();
1892 map.match->name = "/production*";
1893 map.match->source_node = "pi1";
1894 map.rename = std::make_unique<ChannelT>();
1895 map.rename->name = "/pi1/production";
1896 maps.emplace_back(std::move(map));
1897 }
1898 {
1899 MapT map;
1900 map.match = std::make_unique<ChannelT>();
1901 map.match->name = "/production*";
1902 map.match->source_node = "pi2";
1903 map.rename = std::make_unique<ChannelT>();
1904 map.rename->name = "/pi2/production";
1905 maps.emplace_back(std::move(map));
1906 }
1907 reader.RenameLoggedChannel<aos::examples::Ping>(
1908 "/test", configuration::GetNode(reader.configuration(), "pi1"),
1909 "/pi1/production", maps);
1910
1911 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
1912 log_reader_factory.set_send_delay(chrono::microseconds(0));
1913
1914 reader.Register(&log_reader_factory);
1915
1916 const Node *pi1 =
1917 configuration::GetNode(log_reader_factory.configuration(), "pi1");
1918 const Node *pi2 =
1919 configuration::GetNode(log_reader_factory.configuration(), "pi2");
1920
1921 // Confirm we can read the data on the renamed channel, on both the source
1922 // node and the remote node. In case of split timestamp channels, confirm that
1923 // we receive the timestamp messages on the renamed channel as well.
1924 std::unique_ptr<EventLoop> pi1_event_loop =
1925 log_reader_factory.MakeEventLoop("test", pi1);
1926 pi1_event_loop->SkipTimingReport();
1927 std::unique_ptr<EventLoop> full_pi1_event_loop =
1928 log_reader_factory.MakeEventLoop("test", pi1);
1929 full_pi1_event_loop->SkipTimingReport();
1930 std::unique_ptr<EventLoop> pi2_event_loop =
1931 log_reader_factory.MakeEventLoop("test", pi2);
1932 pi2_event_loop->SkipTimingReport();
1933
1934 MessageCounter<examples::Ping> pi1_ping(pi1_event_loop.get(), "/test");
1935 MessageCounter<examples::Ping> pi2_ping(pi2_event_loop.get(), "/test");
1936 MessageCounter<examples::Ping> pi1_renamed_ping(pi1_event_loop.get(),
1937 "/pi1/production");
1938 MessageCounter<examples::Ping> pi2_renamed_ping(pi2_event_loop.get(),
1939 "/pi1/production");
1940
1941 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1942 pi1_renamed_ping_timestamp;
1943 std::unique_ptr<MessageCounter<message_bridge::RemoteMessage>>
1944 pi1_ping_timestamp;
1945 if (!shared()) {
1946 pi1_renamed_ping_timestamp =
1947 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1948 pi1_event_loop.get(),
1949 "/pi1/aos/remote_timestamps/pi2/pi1/production/aos-examples-Ping");
1950 pi1_ping_timestamp =
1951 std::make_unique<MessageCounter<message_bridge::RemoteMessage>>(
1952 pi1_event_loop.get(),
1953 "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
1954 }
1955
1956 log_reader_factory.Run();
1957
1958 EXPECT_EQ(pi1_ping.count(), 0u);
1959 EXPECT_EQ(pi2_ping.count(), 0u);
1960 EXPECT_NE(pi1_renamed_ping.count(), 0u);
1961 EXPECT_NE(pi2_renamed_ping.count(), 0u);
1962 if (!shared()) {
1963 EXPECT_NE(pi1_renamed_ping_timestamp->count(), 0u);
1964 EXPECT_EQ(pi1_ping_timestamp->count(), 0u);
1965 }
1966
1967 reader.Deregister();
1968}
1969
Naman Guptaa63aa132023-03-22 20:06:34 -07001970// Tests that we observe all the same events in log replay (for a given node)
1971// whether we just register an event loop for that node or if we register a full
1972// event loop factory.
1973TEST_P(MultinodeLoggerTest, SingleNodeReplay) {
1974 time_converter_.StartEqual();
1975 constexpr chrono::milliseconds kStartupDelay(95);
Austin Schuh8fb4b452023-08-04 17:02:27 -07001976 std::vector<std::string> filenames;
1977
Naman Guptaa63aa132023-03-22 20:06:34 -07001978 {
1979 LoggerState pi1_logger = MakeLogger(pi1_);
1980 LoggerState pi2_logger = MakeLogger(pi2_);
1981
1982 event_loop_factory_.RunFor(kStartupDelay);
1983
1984 StartLogger(&pi1_logger);
1985 StartLogger(&pi2_logger);
1986
1987 event_loop_factory_.RunFor(chrono::milliseconds(20000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07001988
1989 pi1_logger.AppendAllFilenames(&filenames);
1990 pi2_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07001991 }
1992
Austin Schuh8fb4b452023-08-04 17:02:27 -07001993 LogReader full_reader(SortParts(filenames));
1994 LogReader single_node_reader(SortParts(filenames));
Naman Guptaa63aa132023-03-22 20:06:34 -07001995
1996 SimulatedEventLoopFactory full_factory(full_reader.configuration());
1997 SimulatedEventLoopFactory single_node_factory(
1998 single_node_reader.configuration());
1999 single_node_factory.SkipTimingReport();
2000 single_node_factory.DisableStatistics();
2001 std::unique_ptr<EventLoop> replay_event_loop =
2002 single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
2003 "log_reader");
2004
2005 full_reader.Register(&full_factory);
2006 single_node_reader.Register(replay_event_loop.get());
2007
2008 const Node *full_pi1 =
2009 configuration::GetNode(full_factory.configuration(), "pi1");
2010
2011 // Confirm we can read the data on the remapped channel, just for pi1. Nothing
2012 // else should have moved.
2013 std::unique_ptr<EventLoop> full_event_loop =
2014 full_factory.MakeEventLoop("test", full_pi1);
2015 full_event_loop->SkipTimingReport();
2016 full_event_loop->SkipAosLog();
2017 // maps are indexed on channel index.
2018 // observed_messages: {channel_index: [(message_sent_time, was_fetched),...]}
2019 std::map<size_t, std::vector<std::pair<monotonic_clock::time_point, bool>>>
2020 observed_messages;
2021 std::map<size_t, std::unique_ptr<RawFetcher>> fetchers;
2022 for (size_t ii = 0; ii < full_event_loop->configuration()->channels()->size();
2023 ++ii) {
2024 const Channel *channel =
2025 full_event_loop->configuration()->channels()->Get(ii);
2026 // We currently don't support replaying remote timestamp channels in
2027 // realtime replay (unless the remote timestamp channel was not NOT_LOGGED,
2028 // in which case it gets auto-remapped and replayed on a /original channel).
2029 if (channel->name()->string_view().find("remote_timestamp") !=
2030 std::string_view::npos &&
2031 channel->name()->string_view().find("/original") ==
2032 std::string_view::npos) {
2033 continue;
2034 }
2035 if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
2036 observed_messages[ii] = {};
2037 fetchers[ii] = full_event_loop->MakeRawFetcher(channel);
2038 full_event_loop->OnRun([ii, &observed_messages, &fetchers]() {
2039 if (fetchers[ii]->Fetch()) {
2040 observed_messages[ii].push_back(std::make_pair(
2041 fetchers[ii]->context().monotonic_event_time, true));
2042 }
2043 });
2044 full_event_loop->MakeRawNoArgWatcher(
2045 channel, [ii, &observed_messages](const Context &context) {
2046 observed_messages[ii].push_back(
2047 std::make_pair(context.monotonic_event_time, false));
2048 });
2049 }
2050 }
2051
2052 full_factory.Run();
2053 fetchers.clear();
2054 full_reader.Deregister();
2055
2056 const Node *single_node_pi1 =
2057 configuration::GetNode(single_node_factory.configuration(), "pi1");
2058 std::map<size_t, std::unique_ptr<RawFetcher>> single_node_fetchers;
2059
2060 std::unique_ptr<EventLoop> single_node_event_loop =
2061 single_node_factory.MakeEventLoop("test", single_node_pi1);
2062 single_node_event_loop->SkipTimingReport();
2063 single_node_event_loop->SkipAosLog();
2064 for (size_t ii = 0;
2065 ii < single_node_event_loop->configuration()->channels()->size(); ++ii) {
2066 const Channel *channel =
2067 single_node_event_loop->configuration()->channels()->Get(ii);
2068 single_node_factory.DisableForwarding(channel);
2069 if (configuration::ChannelIsReadableOnNode(channel, single_node_pi1)) {
2070 single_node_fetchers[ii] =
2071 single_node_event_loop->MakeRawFetcher(channel);
2072 single_node_event_loop->OnRun([channel, ii, &single_node_fetchers]() {
2073 EXPECT_FALSE(single_node_fetchers[ii]->Fetch())
2074 << "Single EventLoop replay doesn't support pre-loading fetchers. "
2075 << configuration::StrippedChannelToString(channel);
2076 });
2077 single_node_event_loop->MakeRawNoArgWatcher(
2078 channel, [ii, &observed_messages, channel,
2079 kStartupDelay](const Context &context) {
2080 if (observed_messages[ii].empty()) {
2081 FAIL() << "Observed extra message at "
2082 << context.monotonic_event_time << " on "
2083 << configuration::StrippedChannelToString(channel);
2084 return;
2085 }
2086 const std::pair<monotonic_clock::time_point, bool> &message =
2087 observed_messages[ii].front();
2088 if (message.second) {
2089 EXPECT_LE(message.first,
2090 context.monotonic_event_time + kStartupDelay)
2091 << "Mismatched message times " << context.monotonic_event_time
2092 << " and " << message.first << " on "
2093 << configuration::StrippedChannelToString(channel);
2094 } else {
2095 EXPECT_EQ(message.first,
2096 context.monotonic_event_time + kStartupDelay)
2097 << "Mismatched message times " << context.monotonic_event_time
2098 << " and " << message.first << " on "
2099 << configuration::StrippedChannelToString(channel);
2100 }
2101 observed_messages[ii].erase(observed_messages[ii].begin());
2102 });
2103 }
2104 }
2105
2106 single_node_factory.Run();
2107
2108 single_node_fetchers.clear();
2109
2110 single_node_reader.Deregister();
2111
2112 for (const auto &pair : observed_messages) {
2113 EXPECT_TRUE(pair.second.empty())
2114 << "Missed " << pair.second.size() << " messages on "
2115 << configuration::StrippedChannelToString(
2116 single_node_event_loop->configuration()->channels()->Get(
2117 pair.first));
2118 }
2119}
2120
2121// Tests that we properly recreate forwarded timestamps when replaying a log.
2122// This should be enough that we can then re-run the logger and get a valid log
2123// back.
2124TEST_P(MultinodeLoggerTest, MessageHeader) {
2125 time_converter_.StartEqual();
2126 {
2127 LoggerState pi1_logger = MakeLogger(pi1_);
2128 LoggerState pi2_logger = MakeLogger(pi2_);
2129
2130 event_loop_factory_.RunFor(chrono::milliseconds(95));
2131
2132 StartLogger(&pi1_logger);
2133 StartLogger(&pi2_logger);
2134
2135 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2136 }
2137
2138 LogReader reader(SortParts(logfiles_));
2139
2140 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2141 log_reader_factory.set_send_delay(chrono::microseconds(0));
2142
2143 // This sends out the fetched messages and advances time to the start of the
2144 // log file.
2145 reader.Register(&log_reader_factory);
2146
2147 const Node *pi1 =
2148 configuration::GetNode(log_reader_factory.configuration(), "pi1");
2149 const Node *pi2 =
2150 configuration::GetNode(log_reader_factory.configuration(), "pi2");
2151
2152 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
2153 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
2154 LOG(INFO) << "now pi1 "
2155 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
2156 LOG(INFO) << "now pi2 "
2157 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
2158
2159 EXPECT_THAT(reader.LoggedNodes(),
2160 ::testing::ElementsAre(
2161 configuration::GetNode(reader.logged_configuration(), pi1),
2162 configuration::GetNode(reader.logged_configuration(), pi2)));
2163
2164 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
2165
2166 std::unique_ptr<EventLoop> pi1_event_loop =
2167 log_reader_factory.MakeEventLoop("test", pi1);
2168 std::unique_ptr<EventLoop> pi2_event_loop =
2169 log_reader_factory.MakeEventLoop("test", pi2);
2170
2171 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =
2172 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
2173 aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi2_fetcher =
2174 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
2175
2176 aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
2177 pi1_event_loop->MakeFetcher<examples::Ping>("/test");
2178 aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
2179 pi2_event_loop->MakeFetcher<examples::Ping>("/test");
2180
2181 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi2_fetcher =
2182 pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
2183 aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi1_fetcher =
2184 pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
2185
2186 aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
2187 pi2_event_loop->MakeFetcher<examples::Pong>("/test");
2188 aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
2189 pi1_event_loop->MakeFetcher<examples::Pong>("/test");
2190
2191 const size_t pi1_timestamp_channel = configuration::ChannelIndex(
2192 pi1_event_loop->configuration(), pi1_timestamp_on_pi1_fetcher.channel());
2193 const size_t ping_timestamp_channel = configuration::ChannelIndex(
2194 pi2_event_loop->configuration(), ping_on_pi2_fetcher.channel());
2195
2196 const size_t pi2_timestamp_channel = configuration::ChannelIndex(
2197 pi2_event_loop->configuration(), pi2_timestamp_on_pi2_fetcher.channel());
2198 const size_t pong_timestamp_channel = configuration::ChannelIndex(
2199 pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
2200
2201 const chrono::nanoseconds network_delay = event_loop_factory_.network_delay();
2202 const chrono::nanoseconds send_delay = event_loop_factory_.send_delay();
2203
2204 for (std::pair<int, std::string> channel :
2205 shared()
2206 ? std::vector<
2207 std::pair<int, std::string>>{{-1,
2208 "/aos/remote_timestamps/pi2"}}
2209 : std::vector<std::pair<int, std::string>>{
2210 {pi1_timestamp_channel,
2211 "/aos/remote_timestamps/pi2/pi1/aos/"
2212 "aos-message_bridge-Timestamp"},
2213 {ping_timestamp_channel,
2214 "/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
2215 pi1_event_loop->MakeWatcher(
2216 channel.second,
2217 [&pi1_event_loop, &pi2_event_loop, pi1_timestamp_channel,
2218 ping_timestamp_channel, &pi1_timestamp_on_pi1_fetcher,
2219 &pi1_timestamp_on_pi2_fetcher, &ping_on_pi1_fetcher,
2220 &ping_on_pi2_fetcher, network_delay, send_delay,
2221 channel_index = channel.first](const RemoteMessage &header) {
2222 const aos::monotonic_clock::time_point header_monotonic_sent_time(
2223 chrono::nanoseconds(header.monotonic_sent_time()));
2224 const aos::realtime_clock::time_point header_realtime_sent_time(
2225 chrono::nanoseconds(header.realtime_sent_time()));
2226 const aos::monotonic_clock::time_point header_monotonic_remote_time(
2227 chrono::nanoseconds(header.monotonic_remote_time()));
Austin Schuhb5224ec2024-03-27 15:20:09 -07002228 const aos::monotonic_clock::time_point
2229 header_monotonic_remote_transmit_time(
2230 chrono::nanoseconds(header.monotonic_remote_transmit_time()));
Naman Guptaa63aa132023-03-22 20:06:34 -07002231 const aos::realtime_clock::time_point header_realtime_remote_time(
2232 chrono::nanoseconds(header.realtime_remote_time()));
2233
2234 if (channel_index != -1) {
2235 ASSERT_EQ(channel_index, header.channel_index());
2236 }
2237
2238 const Context *pi1_context = nullptr;
2239 const Context *pi2_context = nullptr;
2240
2241 if (header.channel_index() == pi1_timestamp_channel) {
2242 ASSERT_TRUE(pi1_timestamp_on_pi1_fetcher.FetchNext());
2243 ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
2244 pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
2245 pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
Austin Schuhac6d89e2024-03-27 14:56:09 -07002246 // Timestamps don't have wakeup delay, so they show back up after 2
2247 // times the network delay on the source node. Confirm that matches
2248 // when we are reading the log.
2249 EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
2250 pi1_context->monotonic_event_time + 2 * network_delay);
Naman Guptaa63aa132023-03-22 20:06:34 -07002251 } else if (header.channel_index() == ping_timestamp_channel) {
2252 ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
2253 ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
2254 pi1_context = &ping_on_pi1_fetcher.context();
2255 pi2_context = &ping_on_pi2_fetcher.context();
Austin Schuhac6d89e2024-03-27 14:56:09 -07002256 // Ping messages get picked up faster at the start of each message
2257 // when timers wake up. Verify all that behavior matches exactly as
2258 // expected when reading the log.
2259 EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
2260 pi1_context->monotonic_event_time + 2 * network_delay +
2261 ((pi1_event_loop->context().monotonic_event_time -
2262 2 * network_delay)
2263 .time_since_epoch() %
2264 chrono::nanoseconds(1000000000) ==
2265 chrono::nanoseconds(0)
2266 ? chrono::nanoseconds(0)
2267 : send_delay));
Naman Guptaa63aa132023-03-22 20:06:34 -07002268 } else {
2269 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
2270 << configuration::CleanedChannelToString(
2271 pi1_event_loop->configuration()->channels()->Get(
2272 header.channel_index()));
2273 }
2274
2275 ASSERT_TRUE(header.has_boot_uuid());
2276 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
2277 pi2_event_loop->boot_uuid());
2278
2279 EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
2280 EXPECT_EQ(pi2_context->remote_queue_index,
2281 header.remote_queue_index());
2282 EXPECT_EQ(pi2_context->queue_index, header.queue_index());
2283
2284 EXPECT_EQ(pi2_context->monotonic_event_time,
2285 header_monotonic_sent_time);
2286 EXPECT_EQ(pi2_context->realtime_event_time,
2287 header_realtime_sent_time);
2288 EXPECT_EQ(pi2_context->realtime_remote_time,
2289 header_realtime_remote_time);
2290 EXPECT_EQ(pi2_context->monotonic_remote_time,
2291 header_monotonic_remote_time);
Austin Schuhb5224ec2024-03-27 15:20:09 -07002292 EXPECT_EQ(pi2_context->monotonic_remote_transmit_time,
2293 header_monotonic_remote_transmit_time);
Naman Guptaa63aa132023-03-22 20:06:34 -07002294
2295 EXPECT_EQ(pi1_context->realtime_event_time,
2296 header_realtime_remote_time);
2297 EXPECT_EQ(pi1_context->monotonic_event_time,
2298 header_monotonic_remote_time);
Naman Guptaa63aa132023-03-22 20:06:34 -07002299 });
2300 }
2301 for (std::pair<int, std::string> channel :
2302 shared()
2303 ? std::vector<
2304 std::pair<int, std::string>>{{-1,
2305 "/aos/remote_timestamps/pi1"}}
2306 : std::vector<std::pair<int, std::string>>{
2307 {pi2_timestamp_channel,
2308 "/aos/remote_timestamps/pi1/pi2/aos/"
2309 "aos-message_bridge-Timestamp"}}) {
2310 pi2_event_loop->MakeWatcher(
2311 channel.second,
2312 [&pi2_event_loop, &pi1_event_loop, pi2_timestamp_channel,
2313 pong_timestamp_channel, &pi2_timestamp_on_pi2_fetcher,
2314 &pi2_timestamp_on_pi1_fetcher, &pong_on_pi2_fetcher,
2315 &pong_on_pi1_fetcher, network_delay, send_delay,
2316 channel_index = channel.first](const RemoteMessage &header) {
2317 const aos::monotonic_clock::time_point header_monotonic_sent_time(
2318 chrono::nanoseconds(header.monotonic_sent_time()));
2319 const aos::realtime_clock::time_point header_realtime_sent_time(
2320 chrono::nanoseconds(header.realtime_sent_time()));
2321 const aos::monotonic_clock::time_point header_monotonic_remote_time(
2322 chrono::nanoseconds(header.monotonic_remote_time()));
Austin Schuhb5224ec2024-03-27 15:20:09 -07002323 const aos::monotonic_clock::time_point
2324 header_monotonic_remote_transmit_time(
2325 chrono::nanoseconds(header.monotonic_remote_transmit_time()));
Naman Guptaa63aa132023-03-22 20:06:34 -07002326 const aos::realtime_clock::time_point header_realtime_remote_time(
2327 chrono::nanoseconds(header.realtime_remote_time()));
2328
2329 if (channel_index != -1) {
2330 ASSERT_EQ(channel_index, header.channel_index());
2331 }
2332
2333 const Context *pi2_context = nullptr;
2334 const Context *pi1_context = nullptr;
2335
2336 if (header.channel_index() == pi2_timestamp_channel) {
2337 ASSERT_TRUE(pi2_timestamp_on_pi2_fetcher.FetchNext());
2338 ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
2339 pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
2340 pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
Austin Schuhac6d89e2024-03-27 14:56:09 -07002341 // Again, timestamps don't have wakeup delay, so they show back up
2342 // after 2 times the network delay on the source node.
2343 EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
2344 pi2_context->monotonic_event_time + 2 * network_delay);
Naman Guptaa63aa132023-03-22 20:06:34 -07002345 } else if (header.channel_index() == pong_timestamp_channel) {
2346 ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
2347 ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
2348 pi2_context = &pong_on_pi2_fetcher.context();
2349 pi1_context = &pong_on_pi1_fetcher.context();
Austin Schuhac6d89e2024-03-27 14:56:09 -07002350 // And Pong messages come back repeatably since they aren't at the
2351 // start of a second.
2352 EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
2353 pi2_context->monotonic_event_time + 2 * network_delay +
2354 send_delay);
Naman Guptaa63aa132023-03-22 20:06:34 -07002355 } else {
2356 LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
2357 << configuration::CleanedChannelToString(
2358 pi2_event_loop->configuration()->channels()->Get(
2359 header.channel_index()));
2360 }
2361
2362 ASSERT_TRUE(header.has_boot_uuid());
2363 EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
2364 pi1_event_loop->boot_uuid());
2365
2366 EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());
2367 EXPECT_EQ(pi1_context->remote_queue_index,
2368 header.remote_queue_index());
2369 EXPECT_EQ(pi1_context->queue_index, header.queue_index());
2370
2371 EXPECT_EQ(pi1_context->monotonic_event_time,
2372 header_monotonic_sent_time);
2373 EXPECT_EQ(pi1_context->realtime_event_time,
2374 header_realtime_sent_time);
2375 EXPECT_EQ(pi1_context->realtime_remote_time,
2376 header_realtime_remote_time);
2377 EXPECT_EQ(pi1_context->monotonic_remote_time,
2378 header_monotonic_remote_time);
Austin Schuhb5224ec2024-03-27 15:20:09 -07002379 EXPECT_EQ(pi1_context->monotonic_remote_transmit_time,
2380 header_monotonic_remote_transmit_time);
Naman Guptaa63aa132023-03-22 20:06:34 -07002381
2382 EXPECT_EQ(pi2_context->realtime_event_time,
2383 header_realtime_remote_time);
2384 EXPECT_EQ(pi2_context->monotonic_event_time,
2385 header_monotonic_remote_time);
Naman Guptaa63aa132023-03-22 20:06:34 -07002386 });
2387 }
2388
2389 // And confirm we can re-create a log again, while checking the contents.
2390 {
2391 LoggerState pi1_logger = MakeLogger(
2392 log_reader_factory.GetNodeEventLoopFactory("pi1"), &log_reader_factory);
2393 LoggerState pi2_logger = MakeLogger(
2394 log_reader_factory.GetNodeEventLoopFactory("pi2"), &log_reader_factory);
2395
Austin Schuh8fb4b452023-08-04 17:02:27 -07002396 StartLogger(&pi1_logger, tmp_dir_ + "/logs/relogged1");
2397 StartLogger(&pi2_logger, tmp_dir_ + "/logs/relogged2");
Naman Guptaa63aa132023-03-22 20:06:34 -07002398
2399 log_reader_factory.Run();
2400 }
2401
2402 reader.Deregister();
2403
2404 // And verify that we can run the LogReader over the relogged files without
2405 // hitting any fatal errors.
2406 {
Austin Schuh8fb4b452023-08-04 17:02:27 -07002407 LogReader relogged_reader(SortParts(
2408 MakeLogFiles(tmp_dir_ + "/logs/relogged1", tmp_dir_ + "/logs/relogged2",
2409 1, 1, 2, 2, true)));
Naman Guptaa63aa132023-03-22 20:06:34 -07002410 relogged_reader.Register();
2411
2412 relogged_reader.event_loop_factory()->Run();
2413 }
2414 // And confirm that we can read the logged file using the reader's
2415 // configuration.
2416 {
2417 LogReader relogged_reader(
Austin Schuh8fb4b452023-08-04 17:02:27 -07002418 SortParts(MakeLogFiles(tmp_dir_ + "/logs/relogged1",
2419 tmp_dir_ + "/logs/relogged2", 1, 1, 2, 2, true)),
Naman Guptaa63aa132023-03-22 20:06:34 -07002420 reader.configuration());
2421 relogged_reader.Register();
2422
2423 relogged_reader.event_loop_factory()->Run();
2424 }
2425}
2426
2427// Tests that we properly populate and extract the logger_start time by setting
2428// up a clock difference between 2 nodes and looking at the resulting parts.
2429TEST_P(MultinodeLoggerTest, LoggerStartTime) {
2430 std::vector<std::string> actual_filenames;
2431 time_converter_.AddMonotonic(
2432 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
2433 {
2434 LoggerState pi1_logger = MakeLogger(pi1_);
2435 LoggerState pi2_logger = MakeLogger(pi2_);
2436
2437 StartLogger(&pi1_logger);
2438 StartLogger(&pi2_logger);
2439
2440 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2441
2442 pi1_logger.AppendAllFilenames(&actual_filenames);
2443 pi2_logger.AppendAllFilenames(&actual_filenames);
2444 }
2445
2446 ASSERT_THAT(actual_filenames,
2447 ::testing::UnorderedElementsAreArray(logfiles_));
2448
Austin Schuh8fb4b452023-08-04 17:02:27 -07002449 for (const LogFile &log_file : SortParts(actual_filenames)) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002450 for (const LogParts &log_part : log_file.parts) {
2451 if (log_part.node == log_file.logger_node) {
2452 EXPECT_EQ(log_part.logger_monotonic_start_time,
2453 aos::monotonic_clock::min_time);
2454 EXPECT_EQ(log_part.logger_realtime_start_time,
2455 aos::realtime_clock::min_time);
2456 } else {
2457 const chrono::seconds offset = log_file.logger_node == "pi1"
2458 ? -chrono::seconds(1000)
2459 : chrono::seconds(1000);
2460 EXPECT_EQ(log_part.logger_monotonic_start_time,
2461 log_part.monotonic_start_time + offset);
2462 EXPECT_EQ(log_part.logger_realtime_start_time,
2463 log_file.realtime_start_time +
2464 (log_part.logger_monotonic_start_time -
2465 log_file.monotonic_start_time));
2466 }
2467 }
2468 }
2469}
2470
2471// Test that renaming the base, renames the folder.
2472TEST_P(MultinodeLoggerTest, LoggerRenameFolder) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002473 time_converter_.AddMonotonic(
2474 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07002475 logfile_base1_ = tmp_dir_ + "/logs/renamefolder/multi_logfile1";
2476 logfile_base2_ = tmp_dir_ + "/logs/renamefolder/multi_logfile2";
2477
Naman Guptaa63aa132023-03-22 20:06:34 -07002478 LoggerState pi1_logger = MakeLogger(pi1_);
2479 LoggerState pi2_logger = MakeLogger(pi2_);
2480
2481 StartLogger(&pi1_logger);
2482 StartLogger(&pi2_logger);
2483
2484 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002485 logfile_base1_ = tmp_dir_ + "/logs/new-good/multi_logfile1";
2486 logfile_base2_ = tmp_dir_ + "/logs/new-good/multi_logfile2";
Naman Guptaa63aa132023-03-22 20:06:34 -07002487 logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
Alexei Strotscaf17d32023-04-03 22:31:11 -07002488
2489 // Sequence of set_base_name and Rotate simulates rename operation. Since
2490 // rename is not supported by all namers, RenameLogBase moved from logger to
2491 // the higher level abstraction, yet log_namers support rename, and it is
2492 // legal to test it here.
2493 pi1_logger.log_namer->set_base_name(logfile_base1_);
2494 pi1_logger.logger->Rotate();
2495 pi2_logger.log_namer->set_base_name(logfile_base2_);
2496 pi2_logger.logger->Rotate();
2497
Naman Guptaa63aa132023-03-22 20:06:34 -07002498 for (auto &file : logfiles_) {
2499 struct stat s;
2500 EXPECT_EQ(0, stat(file.c_str(), &s));
2501 }
2502}
2503
2504// Test that renaming the file base dies.
2505TEST_P(MultinodeLoggerDeathTest, LoggerRenameFile) {
2506 time_converter_.AddMonotonic(
2507 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07002508 logfile_base1_ = tmp_dir_ + "/logs/renamefile/multi_logfile1";
2509 logfile_base2_ = tmp_dir_ + "/logs/renamefile/multi_logfile2";
2510
Naman Guptaa63aa132023-03-22 20:06:34 -07002511 LoggerState pi1_logger = MakeLogger(pi1_);
2512 StartLogger(&pi1_logger);
2513 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07002514 logfile_base1_ = tmp_dir_ + "/logs/new-renamefile/new_multi_logfile1";
Alexei Strotscaf17d32023-04-03 22:31:11 -07002515 EXPECT_DEATH({ pi1_logger.log_namer->set_base_name(logfile_base1_); },
Naman Guptaa63aa132023-03-22 20:06:34 -07002516 "Rename of file base from");
2517}
2518
// TODO(austin): We can write a test which recreates a logfile and confirms that
// we get it back. That is the ultimate test.
2521
2522// Tests that we properly recreate forwarded timestamps when replaying a log.
2523// This should be enough that we can then re-run the logger and get a valid log
2524// back.
2525TEST_P(MultinodeLoggerTest, RemoteReboot) {
Austin Schuh6ecfe902023-08-04 22:44:37 -07002526 if (file_strategy() == FileStrategy::kCombine) {
2527 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
2528 }
Naman Guptaa63aa132023-03-22 20:06:34 -07002529 std::vector<std::string> actual_filenames;
2530
2531 const UUID pi1_boot0 = UUID::Random();
2532 const UUID pi2_boot0 = UUID::Random();
2533 const UUID pi2_boot1 = UUID::Random();
2534 {
2535 CHECK_EQ(pi1_index_, 0u);
2536 CHECK_EQ(pi2_index_, 1u);
2537
2538 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2539 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2540 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2541
2542 time_converter_.AddNextTimestamp(
2543 distributed_clock::epoch(),
2544 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2545 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2546 time_converter_.AddNextTimestamp(
2547 distributed_clock::epoch() + reboot_time,
2548 {BootTimestamp::epoch() + reboot_time,
2549 BootTimestamp{
2550 .boot = 1,
2551 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2552 }
2553
2554 {
2555 LoggerState pi1_logger = MakeLogger(pi1_);
2556
2557 event_loop_factory_.RunFor(chrono::milliseconds(95));
2558 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2559 pi1_boot0);
2560 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2561 pi2_boot0);
2562
2563 StartLogger(&pi1_logger);
2564
2565 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2566
2567 VLOG(1) << "Reboot now!";
2568
2569 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2570 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2571 pi1_boot0);
2572 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2573 pi2_boot1);
2574
2575 pi1_logger.AppendAllFilenames(&actual_filenames);
2576 }
2577
2578 std::sort(actual_filenames.begin(), actual_filenames.end());
2579 std::sort(pi1_reboot_logfiles_.begin(), pi1_reboot_logfiles_.end());
2580 ASSERT_THAT(actual_filenames,
2581 ::testing::UnorderedElementsAreArray(pi1_reboot_logfiles_));
2582
2583 // Confirm that our new oldest timestamps properly update as we reboot and
2584 // rotate.
2585 for (const std::string &file : pi1_reboot_logfiles_) {
2586 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2587 ReadHeader(file);
2588 CHECK(log_header);
2589 if (log_header->message().has_configuration()) {
2590 continue;
2591 }
2592
2593 const monotonic_clock::time_point monotonic_start_time =
2594 monotonic_clock::time_point(
2595 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2596 const UUID source_node_boot_uuid = UUID::FromString(
2597 log_header->message().source_node_boot_uuid()->string_view());
2598
2599 if (log_header->message().node()->name()->string_view() != "pi1") {
2600 // The remote message channel should rotate later and have more parts.
2601 // This only is true on the log files with shared remote messages.
2602 //
2603 // TODO(austin): I'm not the most thrilled with this test pattern... It
2604 // feels brittle in a different way.
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002605 if (file.find("timestamps/remote_pi2") == std::string::npos) {
Naman Guptaa63aa132023-03-22 20:06:34 -07002606 switch (log_header->message().parts_index()) {
2607 case 0:
2608 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2609 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2610 break;
2611 case 1:
2612 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2613 ASSERT_EQ(monotonic_start_time,
2614 monotonic_clock::epoch() + chrono::seconds(1));
2615 break;
2616 case 2:
2617 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2618 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2619 break;
2620 case 3:
2621 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2622 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
Austin Schuhac6d89e2024-03-27 14:56:09 -07002623 chrono::nanoseconds(2323000000))
Naman Guptaa63aa132023-03-22 20:06:34 -07002624 << " on " << file;
2625 break;
2626 default:
2627 FAIL();
2628 break;
2629 }
2630 } else {
2631 switch (log_header->message().parts_index()) {
2632 case 0:
2633 case 1:
2634 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2635 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
2636 break;
2637 case 2:
2638 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
2639 ASSERT_EQ(monotonic_start_time,
2640 monotonic_clock::epoch() + chrono::seconds(1));
2641 break;
2642 case 3:
2643 case 4:
2644 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2645 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
2646 break;
2647 case 5:
2648 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
2649 ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
Austin Schuhac6d89e2024-03-27 14:56:09 -07002650 chrono::nanoseconds(2323000000))
Naman Guptaa63aa132023-03-22 20:06:34 -07002651 << " on " << file;
2652 break;
2653 default:
2654 FAIL();
2655 break;
2656 }
2657 }
2658 continue;
2659 }
2660 SCOPED_TRACE(file);
2661 SCOPED_TRACE(aos::FlatbufferToJson(
2662 *log_header, {.multi_line = true, .max_vector_size = 100}));
2663 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2664 ASSERT_EQ(
2665 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2666 EXPECT_EQ(
2667 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
2668 monotonic_clock::max_time.time_since_epoch().count());
2669 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2670 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2671 2u);
2672 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
2673 monotonic_clock::max_time.time_since_epoch().count());
2674 ASSERT_TRUE(log_header->message()
2675 .has_oldest_remote_unreliable_monotonic_timestamps());
2676 ASSERT_EQ(log_header->message()
2677 .oldest_remote_unreliable_monotonic_timestamps()
2678 ->size(),
2679 2u);
2680 EXPECT_EQ(log_header->message()
2681 .oldest_remote_unreliable_monotonic_timestamps()
2682 ->Get(0),
2683 monotonic_clock::max_time.time_since_epoch().count());
2684 ASSERT_TRUE(log_header->message()
2685 .has_oldest_local_unreliable_monotonic_timestamps());
2686 ASSERT_EQ(log_header->message()
2687 .oldest_local_unreliable_monotonic_timestamps()
2688 ->size(),
2689 2u);
2690 EXPECT_EQ(log_header->message()
2691 .oldest_local_unreliable_monotonic_timestamps()
2692 ->Get(0),
2693 monotonic_clock::max_time.time_since_epoch().count());
2694
2695 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
2696 monotonic_clock::time_point(chrono::nanoseconds(
2697 log_header->message().oldest_remote_monotonic_timestamps()->Get(
2698 1)));
2699 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
2700 monotonic_clock::time_point(chrono::nanoseconds(
2701 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
2702 const monotonic_clock::time_point
2703 oldest_remote_unreliable_monotonic_timestamps =
2704 monotonic_clock::time_point(chrono::nanoseconds(
2705 log_header->message()
2706 .oldest_remote_unreliable_monotonic_timestamps()
2707 ->Get(1)));
2708 const monotonic_clock::time_point
2709 oldest_local_unreliable_monotonic_timestamps =
2710 monotonic_clock::time_point(chrono::nanoseconds(
2711 log_header->message()
2712 .oldest_local_unreliable_monotonic_timestamps()
2713 ->Get(1)));
2714 const monotonic_clock::time_point
Austin Schuhb5224ec2024-03-27 15:20:09 -07002715 oldest_remote_reliable_monotonic_transmit_timestamps =
2716 monotonic_clock::time_point(chrono::nanoseconds(
2717 log_header->message()
2718 .oldest_remote_reliable_monotonic_transmit_timestamps()
2719 ->Get(1)));
2720 const monotonic_clock::time_point
2721 oldest_local_reliable_monotonic_transmit_timestamps =
2722 monotonic_clock::time_point(chrono::nanoseconds(
2723 log_header->message()
2724 .oldest_local_reliable_monotonic_transmit_timestamps()
2725 ->Get(1)));
2726 const monotonic_clock::time_point
Naman Guptaa63aa132023-03-22 20:06:34 -07002727 oldest_remote_reliable_monotonic_timestamps =
2728 monotonic_clock::time_point(chrono::nanoseconds(
2729 log_header->message()
2730 .oldest_remote_reliable_monotonic_timestamps()
2731 ->Get(1)));
2732 const monotonic_clock::time_point
2733 oldest_local_reliable_monotonic_timestamps =
2734 monotonic_clock::time_point(chrono::nanoseconds(
2735 log_header->message()
2736 .oldest_local_reliable_monotonic_timestamps()
2737 ->Get(1)));
2738 const monotonic_clock::time_point
2739 oldest_logger_remote_unreliable_monotonic_timestamps =
2740 monotonic_clock::time_point(chrono::nanoseconds(
2741 log_header->message()
2742 .oldest_logger_remote_unreliable_monotonic_timestamps()
2743 ->Get(0)));
2744 const monotonic_clock::time_point
2745 oldest_logger_local_unreliable_monotonic_timestamps =
2746 monotonic_clock::time_point(chrono::nanoseconds(
2747 log_header->message()
2748 .oldest_logger_local_unreliable_monotonic_timestamps()
2749 ->Get(0)));
2750 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
2751 monotonic_clock::max_time);
2752 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
2753 monotonic_clock::max_time);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002754 if (log_header->message().data_stored()->Get(0) == StoredDataType::DATA) {
2755 switch (log_header->message().parts_index()) {
2756 case 0:
2757 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2758 monotonic_clock::max_time);
2759 EXPECT_EQ(oldest_local_monotonic_timestamps,
2760 monotonic_clock::max_time);
2761 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2762 monotonic_clock::max_time);
2763 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2764 monotonic_clock::max_time);
2765 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2766 monotonic_clock::max_time);
2767 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2768 monotonic_clock::max_time);
Austin Schuhb5224ec2024-03-27 15:20:09 -07002769 EXPECT_EQ(oldest_remote_reliable_monotonic_transmit_timestamps,
2770 monotonic_clock::max_time);
2771 EXPECT_EQ(oldest_local_reliable_monotonic_transmit_timestamps,
2772 monotonic_clock::max_time);
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002773 break;
2774 default:
2775 FAIL();
2776 break;
2777 }
2778 } else if (log_header->message().data_stored()->Get(0) ==
2779 StoredDataType::TIMESTAMPS) {
2780 switch (log_header->message().parts_index()) {
2781 case 0:
2782 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2783 monotonic_clock::time_point(chrono::microseconds(90200)));
2784 EXPECT_EQ(oldest_local_monotonic_timestamps,
2785 monotonic_clock::time_point(chrono::microseconds(90350)));
2786 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2787 monotonic_clock::time_point(chrono::microseconds(90200)));
2788 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2789 monotonic_clock::time_point(chrono::microseconds(90350)));
2790 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2791 monotonic_clock::max_time);
2792 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2793 monotonic_clock::max_time);
Austin Schuhb5224ec2024-03-27 15:20:09 -07002794 EXPECT_EQ(oldest_remote_reliable_monotonic_transmit_timestamps,
2795 monotonic_clock::time_point(chrono::microseconds(90250)));
2796 EXPECT_EQ(oldest_local_reliable_monotonic_transmit_timestamps,
2797 monotonic_clock::time_point(chrono::microseconds(90350)));
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002798 break;
2799 case 1:
2800 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2801 monotonic_clock::time_point(chrono::microseconds(90200)))
2802 << file;
2803 EXPECT_EQ(oldest_local_monotonic_timestamps,
2804 monotonic_clock::time_point(chrono::microseconds(90350)))
2805 << file;
2806 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2807 monotonic_clock::time_point(chrono::microseconds(90200)))
2808 << file;
2809 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
2810 monotonic_clock::time_point(chrono::microseconds(90350)))
2811 << file;
2812 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2813 monotonic_clock::time_point(chrono::microseconds(100000)))
2814 << file;
2815 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
Austin Schuhac6d89e2024-03-27 14:56:09 -07002816 monotonic_clock::time_point(chrono::microseconds(100100)))
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002817 << file;
Austin Schuhb5224ec2024-03-27 15:20:09 -07002818 EXPECT_EQ(oldest_remote_reliable_monotonic_transmit_timestamps,
2819 monotonic_clock::time_point(chrono::microseconds(90250)))
2820 << file;
2821 EXPECT_EQ(oldest_local_reliable_monotonic_transmit_timestamps,
2822 monotonic_clock::time_point(chrono::microseconds(90350)))
2823 << file;
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002824 break;
2825 case 2:
2826 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2827 monotonic_clock::time_point(chrono::milliseconds(1323) +
2828 chrono::microseconds(200)));
2829 EXPECT_EQ(
2830 oldest_local_monotonic_timestamps,
2831 monotonic_clock::time_point(chrono::microseconds(10100350)));
2832 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2833 monotonic_clock::time_point(chrono::milliseconds(1323) +
2834 chrono::microseconds(200)));
2835 EXPECT_EQ(
2836 oldest_local_unreliable_monotonic_timestamps,
2837 monotonic_clock::time_point(chrono::microseconds(10100350)));
2838 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2839 monotonic_clock::max_time)
2840 << file;
2841 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
2842 monotonic_clock::max_time)
2843 << file;
Austin Schuhb5224ec2024-03-27 15:20:09 -07002844 EXPECT_EQ(oldest_remote_reliable_monotonic_transmit_timestamps,
2845 monotonic_clock::time_point(chrono::milliseconds(1323) +
2846 chrono::microseconds(250)))
2847 << file;
2848 EXPECT_EQ(oldest_local_reliable_monotonic_transmit_timestamps,
2849 monotonic_clock::time_point(chrono::microseconds(10100350)))
2850 << file;
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002851 break;
2852 case 3:
2853 ASSERT_EQ(oldest_remote_monotonic_timestamps,
2854 monotonic_clock::time_point(chrono::milliseconds(1323) +
2855 chrono::microseconds(200)));
2856 EXPECT_EQ(
2857 oldest_local_monotonic_timestamps,
2858 monotonic_clock::time_point(chrono::microseconds(10100350)));
2859 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
2860 monotonic_clock::time_point(chrono::milliseconds(1323) +
2861 chrono::microseconds(200)));
2862 EXPECT_EQ(
2863 oldest_local_unreliable_monotonic_timestamps,
2864 monotonic_clock::time_point(chrono::microseconds(10100350)));
2865 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
2866 monotonic_clock::time_point(chrono::microseconds(1423000)))
2867 << file;
2868 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
Austin Schuhac6d89e2024-03-27 14:56:09 -07002869 monotonic_clock::time_point(chrono::microseconds(10200100)))
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002870 << file;
Austin Schuhb5224ec2024-03-27 15:20:09 -07002871 EXPECT_EQ(oldest_remote_reliable_monotonic_transmit_timestamps,
2872 monotonic_clock::time_point(chrono::milliseconds(1323) +
2873 chrono::microseconds(250)))
2874 << file;
2875 EXPECT_EQ(oldest_local_reliable_monotonic_transmit_timestamps,
2876 monotonic_clock::time_point(chrono::microseconds(10100350)))
2877 << file;
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07002878 break;
2879 default:
2880 FAIL();
2881 break;
2882 }
Naman Guptaa63aa132023-03-22 20:06:34 -07002883 }
2884 }
2885
2886 // Confirm that we refuse to replay logs with missing boot uuids.
2887 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07002888 auto sorted_parts = SortParts(pi1_reboot_logfiles_);
2889 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
2890 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07002891
2892 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
2893 log_reader_factory.set_send_delay(chrono::microseconds(0));
2894
2895 // This sends out the fetched messages and advances time to the start of
2896 // the log file.
2897 reader.Register(&log_reader_factory);
2898
2899 log_reader_factory.Run();
2900
2901 reader.Deregister();
2902 }
2903}
2904
2905// Tests that we can sort a log which only has timestamps from the remote
2906// because the local message_bridge_client failed to connect.
2907TEST_P(MultinodeLoggerTest, RemoteRebootOnlyTimestamps) {
Austin Schuh6ecfe902023-08-04 22:44:37 -07002908 if (file_strategy() == FileStrategy::kCombine) {
2909 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
2910 }
2911
Naman Guptaa63aa132023-03-22 20:06:34 -07002912 const UUID pi1_boot0 = UUID::Random();
2913 const UUID pi2_boot0 = UUID::Random();
2914 const UUID pi2_boot1 = UUID::Random();
2915 {
2916 CHECK_EQ(pi1_index_, 0u);
2917 CHECK_EQ(pi2_index_, 1u);
2918
2919 time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
2920 time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
2921 time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
2922
2923 time_converter_.AddNextTimestamp(
2924 distributed_clock::epoch(),
2925 {BootTimestamp::epoch(), BootTimestamp::epoch()});
2926 const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
2927 time_converter_.AddNextTimestamp(
2928 distributed_clock::epoch() + reboot_time,
2929 {BootTimestamp::epoch() + reboot_time,
2930 BootTimestamp{
2931 .boot = 1,
2932 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
2933 }
2934 pi2_->Disconnect(pi1_->node());
2935
2936 std::vector<std::string> filenames;
2937 {
2938 LoggerState pi1_logger = MakeLogger(pi1_);
2939
2940 event_loop_factory_.RunFor(chrono::milliseconds(95));
2941 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2942 pi1_boot0);
2943 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2944 pi2_boot0);
2945
2946 StartLogger(&pi1_logger);
2947
2948 event_loop_factory_.RunFor(chrono::milliseconds(10000));
2949
2950 VLOG(1) << "Reboot now!";
2951
2952 event_loop_factory_.RunFor(chrono::milliseconds(20000));
2953 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
2954 pi1_boot0);
2955 EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
2956 pi2_boot1);
2957 pi1_logger.AppendAllFilenames(&filenames);
2958 }
2959
2960 std::sort(filenames.begin(), filenames.end());
2961
2962 // Confirm that our new oldest timestamps properly update as we reboot and
2963 // rotate.
2964 size_t timestamp_file_count = 0;
2965 for (const std::string &file : filenames) {
2966 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
2967 ReadHeader(file);
2968 CHECK(log_header);
2969
2970 if (log_header->message().has_configuration()) {
2971 continue;
2972 }
2973
2974 const monotonic_clock::time_point monotonic_start_time =
2975 monotonic_clock::time_point(
2976 chrono::nanoseconds(log_header->message().monotonic_start_time()));
2977 const UUID source_node_boot_uuid = UUID::FromString(
2978 log_header->message().source_node_boot_uuid()->string_view());
2979
2980 ASSERT_TRUE(log_header->message().has_oldest_remote_monotonic_timestamps());
2981 ASSERT_EQ(
2982 log_header->message().oldest_remote_monotonic_timestamps()->size(), 2u);
2983 ASSERT_TRUE(log_header->message().has_oldest_local_monotonic_timestamps());
2984 ASSERT_EQ(log_header->message().oldest_local_monotonic_timestamps()->size(),
2985 2u);
2986 ASSERT_TRUE(log_header->message()
2987 .has_oldest_remote_unreliable_monotonic_timestamps());
2988 ASSERT_EQ(log_header->message()
2989 .oldest_remote_unreliable_monotonic_timestamps()
2990 ->size(),
2991 2u);
2992 ASSERT_TRUE(log_header->message()
2993 .has_oldest_local_unreliable_monotonic_timestamps());
2994 ASSERT_EQ(log_header->message()
2995 .oldest_local_unreliable_monotonic_timestamps()
2996 ->size(),
2997 2u);
2998 ASSERT_TRUE(log_header->message()
2999 .has_oldest_remote_reliable_monotonic_timestamps());
3000 ASSERT_EQ(log_header->message()
3001 .oldest_remote_reliable_monotonic_timestamps()
3002 ->size(),
3003 2u);
3004 ASSERT_TRUE(
3005 log_header->message().has_oldest_local_reliable_monotonic_timestamps());
3006 ASSERT_EQ(log_header->message()
3007 .oldest_local_reliable_monotonic_timestamps()
3008 ->size(),
3009 2u);
3010
3011 ASSERT_TRUE(
3012 log_header->message()
3013 .has_oldest_logger_remote_unreliable_monotonic_timestamps());
3014 ASSERT_EQ(log_header->message()
3015 .oldest_logger_remote_unreliable_monotonic_timestamps()
3016 ->size(),
3017 2u);
3018 ASSERT_TRUE(log_header->message()
3019 .has_oldest_logger_local_unreliable_monotonic_timestamps());
3020 ASSERT_EQ(log_header->message()
3021 .oldest_logger_local_unreliable_monotonic_timestamps()
3022 ->size(),
3023 2u);
3024
3025 if (log_header->message().node()->name()->string_view() != "pi1") {
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07003026 ASSERT_TRUE(file.find("timestamps/remote_pi2") != std::string::npos);
Naman Guptaa63aa132023-03-22 20:06:34 -07003027
3028 const std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
3029 ReadNthMessage(file, 0);
3030 CHECK(msg);
3031
3032 EXPECT_TRUE(msg->message().has_monotonic_sent_time());
3033 EXPECT_TRUE(msg->message().has_monotonic_remote_time());
3034
3035 const monotonic_clock::time_point
3036 expected_oldest_local_monotonic_timestamps(
3037 chrono::nanoseconds(msg->message().monotonic_sent_time()));
3038 const monotonic_clock::time_point
3039 expected_oldest_remote_monotonic_timestamps(
3040 chrono::nanoseconds(msg->message().monotonic_remote_time()));
3041 const monotonic_clock::time_point
3042 expected_oldest_timestamp_monotonic_timestamps(
3043 chrono::nanoseconds(msg->message().monotonic_timestamp_time()));
3044
3045 EXPECT_NE(expected_oldest_local_monotonic_timestamps,
3046 monotonic_clock::min_time);
3047 EXPECT_NE(expected_oldest_remote_monotonic_timestamps,
3048 monotonic_clock::min_time);
3049 EXPECT_NE(expected_oldest_timestamp_monotonic_timestamps,
3050 monotonic_clock::min_time);
3051
3052 ++timestamp_file_count;
3053 // Since the log file is from the perspective of the other node,
3054 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
3055 monotonic_clock::time_point(chrono::nanoseconds(
3056 log_header->message().oldest_remote_monotonic_timestamps()->Get(
3057 0)));
3058 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
3059 monotonic_clock::time_point(chrono::nanoseconds(
3060 log_header->message().oldest_local_monotonic_timestamps()->Get(
3061 0)));
3062 const monotonic_clock::time_point
3063 oldest_remote_unreliable_monotonic_timestamps =
3064 monotonic_clock::time_point(chrono::nanoseconds(
3065 log_header->message()
3066 .oldest_remote_unreliable_monotonic_timestamps()
3067 ->Get(0)));
3068 const monotonic_clock::time_point
3069 oldest_local_unreliable_monotonic_timestamps =
3070 monotonic_clock::time_point(chrono::nanoseconds(
3071 log_header->message()
3072 .oldest_local_unreliable_monotonic_timestamps()
3073 ->Get(0)));
3074 const monotonic_clock::time_point
3075 oldest_remote_reliable_monotonic_timestamps =
3076 monotonic_clock::time_point(chrono::nanoseconds(
3077 log_header->message()
3078 .oldest_remote_reliable_monotonic_timestamps()
3079 ->Get(0)));
3080 const monotonic_clock::time_point
3081 oldest_local_reliable_monotonic_timestamps =
3082 monotonic_clock::time_point(chrono::nanoseconds(
3083 log_header->message()
3084 .oldest_local_reliable_monotonic_timestamps()
3085 ->Get(0)));
3086 const monotonic_clock::time_point
3087 oldest_logger_remote_unreliable_monotonic_timestamps =
3088 monotonic_clock::time_point(chrono::nanoseconds(
3089 log_header->message()
3090 .oldest_logger_remote_unreliable_monotonic_timestamps()
3091 ->Get(1)));
3092 const monotonic_clock::time_point
3093 oldest_logger_local_unreliable_monotonic_timestamps =
3094 monotonic_clock::time_point(chrono::nanoseconds(
3095 log_header->message()
3096 .oldest_logger_local_unreliable_monotonic_timestamps()
3097 ->Get(1)));
3098
3099 const Channel *channel =
3100 event_loop_factory_.configuration()->channels()->Get(
3101 msg->message().channel_index());
3102 const Connection *connection = configuration::ConnectionToNode(
3103 channel, configuration::GetNode(
3104 event_loop_factory_.configuration(),
3105 log_header->message().node()->name()->string_view()));
3106
3107 const bool reliable = connection->time_to_live() == 0;
3108
3109 SCOPED_TRACE(file);
3110 SCOPED_TRACE(aos::FlatbufferToJson(
3111 *log_header, {.multi_line = true, .max_vector_size = 100}));
3112
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003113 // Confirm that the oldest timestamps match what we expect. Based on
3114 // what we are doing, we know that the oldest time is the first
3115 // message's time.
3116 //
3117 // This makes the test robust to both the split and combined config
3118 // tests.
3119 switch (log_header->message().parts_index()) {
3120 case 0:
3121 EXPECT_EQ(oldest_remote_monotonic_timestamps,
3122 expected_oldest_remote_monotonic_timestamps);
3123 EXPECT_EQ(oldest_local_monotonic_timestamps,
3124 expected_oldest_local_monotonic_timestamps);
3125 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
3126 expected_oldest_local_monotonic_timestamps)
3127 << file;
3128 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
3129 expected_oldest_timestamp_monotonic_timestamps)
3130 << file;
3131
3132 if (reliable) {
3133 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
Naman Guptaa63aa132023-03-22 20:06:34 -07003134 expected_oldest_remote_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003135 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
Naman Guptaa63aa132023-03-22 20:06:34 -07003136 expected_oldest_local_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003137 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
3138 monotonic_clock::max_time);
3139 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
3140 monotonic_clock::max_time);
3141 } else {
3142 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
3143 monotonic_clock::max_time);
3144 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
3145 monotonic_clock::max_time);
Naman Guptaa63aa132023-03-22 20:06:34 -07003146 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
3147 expected_oldest_remote_monotonic_timestamps);
3148 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
3149 expected_oldest_local_monotonic_timestamps);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003150 }
3151 break;
3152 case 1:
3153 EXPECT_EQ(oldest_remote_monotonic_timestamps,
3154 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
3155 EXPECT_EQ(oldest_local_monotonic_timestamps,
3156 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
3157 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
3158 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
3159 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
3160 monotonic_clock::epoch() + chrono::nanoseconds(90250000));
3161 if (reliable) {
3162 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
3163 expected_oldest_remote_monotonic_timestamps);
3164 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
3165 expected_oldest_local_monotonic_timestamps);
3166 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
3167 monotonic_clock::epoch() + chrono::nanoseconds(90000000));
3168 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
3169 monotonic_clock::epoch() + chrono::nanoseconds(90150000));
3170 } else {
3171 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
3172 monotonic_clock::max_time);
3173 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
3174 monotonic_clock::max_time);
3175 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
3176 expected_oldest_remote_monotonic_timestamps);
3177 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
3178 expected_oldest_local_monotonic_timestamps);
3179 }
3180 break;
3181 case 2:
3182 EXPECT_EQ(
3183 oldest_remote_monotonic_timestamps,
3184 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
3185 EXPECT_EQ(oldest_local_monotonic_timestamps,
3186 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
3187 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
3188 expected_oldest_local_monotonic_timestamps)
3189 << file;
3190 EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
3191 expected_oldest_timestamp_monotonic_timestamps)
3192 << file;
3193 if (reliable) {
3194 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
3195 expected_oldest_remote_monotonic_timestamps);
3196 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
3197 expected_oldest_local_monotonic_timestamps);
3198 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
3199 monotonic_clock::max_time);
3200 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
3201 monotonic_clock::max_time);
3202 } else {
3203 EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
3204 monotonic_clock::max_time);
3205 EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
3206 monotonic_clock::max_time);
3207 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
3208 expected_oldest_remote_monotonic_timestamps);
3209 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
3210 expected_oldest_local_monotonic_timestamps);
3211 }
3212 break;
Naman Guptaa63aa132023-03-22 20:06:34 -07003213
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003214 case 3:
3215 EXPECT_EQ(
3216 oldest_remote_monotonic_timestamps,
3217 monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
3218 EXPECT_EQ(oldest_local_monotonic_timestamps,
3219 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
3220 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
3221 expected_oldest_remote_monotonic_timestamps);
3222 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
3223 expected_oldest_local_monotonic_timestamps);
3224 EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
3225 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
3226 EXPECT_EQ(
3227 oldest_logger_local_unreliable_monotonic_timestamps,
3228 monotonic_clock::epoch() + chrono::nanoseconds(10100200000));
3229 break;
3230 default:
3231 FAIL();
3232 break;
Naman Guptaa63aa132023-03-22 20:06:34 -07003233 }
3234
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003235 switch (log_header->message().parts_index()) {
3236 case 0:
3237 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
3238 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
3239 break;
3240 case 1:
3241 EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
3242 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
3243 break;
3244 case 2:
3245 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
3246 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
3247 break;
3248 case 3:
3249 EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
3250 EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
3251 break;
3252 [[fallthrough]];
3253 default:
3254 FAIL();
3255 break;
3256 }
Naman Guptaa63aa132023-03-22 20:06:34 -07003257 continue;
3258 }
3259 EXPECT_EQ(
3260 log_header->message().oldest_remote_monotonic_timestamps()->Get(0),
3261 monotonic_clock::max_time.time_since_epoch().count());
3262 EXPECT_EQ(log_header->message().oldest_local_monotonic_timestamps()->Get(0),
3263 monotonic_clock::max_time.time_since_epoch().count());
3264 EXPECT_EQ(log_header->message()
3265 .oldest_remote_unreliable_monotonic_timestamps()
3266 ->Get(0),
3267 monotonic_clock::max_time.time_since_epoch().count());
3268 EXPECT_EQ(log_header->message()
3269 .oldest_local_unreliable_monotonic_timestamps()
3270 ->Get(0),
3271 monotonic_clock::max_time.time_since_epoch().count());
3272
3273 const monotonic_clock::time_point oldest_remote_monotonic_timestamps =
3274 monotonic_clock::time_point(chrono::nanoseconds(
3275 log_header->message().oldest_remote_monotonic_timestamps()->Get(
3276 1)));
3277 const monotonic_clock::time_point oldest_local_monotonic_timestamps =
3278 monotonic_clock::time_point(chrono::nanoseconds(
3279 log_header->message().oldest_local_monotonic_timestamps()->Get(1)));
3280 const monotonic_clock::time_point
3281 oldest_remote_unreliable_monotonic_timestamps =
3282 monotonic_clock::time_point(chrono::nanoseconds(
3283 log_header->message()
3284 .oldest_remote_unreliable_monotonic_timestamps()
3285 ->Get(1)));
3286 const monotonic_clock::time_point
3287 oldest_local_unreliable_monotonic_timestamps =
3288 monotonic_clock::time_point(chrono::nanoseconds(
3289 log_header->message()
3290 .oldest_local_unreliable_monotonic_timestamps()
3291 ->Get(1)));
3292 switch (log_header->message().parts_index()) {
3293 case 0:
3294 EXPECT_EQ(oldest_remote_monotonic_timestamps,
3295 monotonic_clock::max_time);
3296 EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
3297 EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
3298 monotonic_clock::max_time);
3299 EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
3300 monotonic_clock::max_time);
3301 break;
3302 default:
3303 FAIL();
3304 break;
3305 }
3306 }
3307
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003308 EXPECT_EQ(timestamp_file_count, 4u);
Naman Guptaa63aa132023-03-22 20:06:34 -07003309
3310 // Confirm that we can actually sort the resulting log and read it.
3311 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003312 auto sorted_parts = SortParts(filenames);
3313 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3314 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07003315
3316 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
3317 log_reader_factory.set_send_delay(chrono::microseconds(0));
3318
3319 // This sends out the fetched messages and advances time to the start of
3320 // the log file.
3321 reader.Register(&log_reader_factory);
3322
3323 log_reader_factory.Run();
3324
3325 reader.Deregister();
3326 }
3327}
3328
3329// Tests that we properly handle one direction of message_bridge being
3330// unavailable.
3331TEST_P(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07003332 std::vector<std::string> actual_filenames;
3333
Naman Guptaa63aa132023-03-22 20:06:34 -07003334 pi1_->Disconnect(pi2_->node());
3335 time_converter_.AddMonotonic(
3336 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
3337
3338 time_converter_.AddMonotonic(
3339 {chrono::milliseconds(10000),
3340 chrono::milliseconds(10000) - chrono::milliseconds(1)});
3341 {
3342 LoggerState pi1_logger = MakeLogger(pi1_);
3343
3344 event_loop_factory_.RunFor(chrono::milliseconds(95));
3345
3346 StartLogger(&pi1_logger);
3347
3348 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07003349 pi1_logger.AppendAllFilenames(&actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07003350 }
3351
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003352 // Confirm that we can parse the result. LogReader has enough internal
3353 // CHECKs to confirm the right thing happened.
Austin Schuh8fb4b452023-08-04 17:02:27 -07003354 ConfirmReadable(actual_filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07003355}
3356
3357// Tests that we properly handle one direction of message_bridge being
3358// unavailable.
3359TEST_P(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
3360 pi1_->Disconnect(pi2_->node());
3361 time_converter_.AddMonotonic(
3362 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(500)});
3363
3364 time_converter_.AddMonotonic(
3365 {chrono::milliseconds(10000),
3366 chrono::milliseconds(10000) + chrono::milliseconds(1)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07003367
3368 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -07003369 {
3370 LoggerState pi1_logger = MakeLogger(pi1_);
3371
3372 event_loop_factory_.RunFor(chrono::milliseconds(95));
3373
3374 StartLogger(&pi1_logger);
3375
3376 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07003377 pi1_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07003378 }
3379
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003380 // Confirm that we can parse the result. LogReader has enough internal
3381 // CHECKs to confirm the right thing happened.
Austin Schuh8fb4b452023-08-04 17:02:27 -07003382 ConfirmReadable(filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07003383}
3384
3385// Tests that we explode if someone passes in a part file twice with a better
3386// error than an out of order error.
3387TEST_P(MultinodeLoggerTest, DuplicateLogFiles) {
3388 time_converter_.AddMonotonic(
3389 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
Austin Schuh8fb4b452023-08-04 17:02:27 -07003390
3391 std::vector<std::string> filenames;
Naman Guptaa63aa132023-03-22 20:06:34 -07003392 {
3393 LoggerState pi1_logger = MakeLogger(pi1_);
3394
3395 event_loop_factory_.RunFor(chrono::milliseconds(95));
3396
3397 StartLogger(&pi1_logger);
3398
3399 event_loop_factory_.RunFor(chrono::milliseconds(10000));
Austin Schuh8fb4b452023-08-04 17:02:27 -07003400
3401 pi1_logger.AppendAllFilenames(&filenames);
Naman Guptaa63aa132023-03-22 20:06:34 -07003402 }
3403
3404 std::vector<std::string> duplicates;
Austin Schuh8fb4b452023-08-04 17:02:27 -07003405 for (const std::string &f : filenames) {
Naman Guptaa63aa132023-03-22 20:06:34 -07003406 duplicates.emplace_back(f);
3407 duplicates.emplace_back(f);
3408 }
3409 EXPECT_DEATH({ SortParts(duplicates); }, "Found duplicate parts in");
3410}
3411
3412// Tests that we explode if someone loses a part out of the middle of a log.
3413TEST_P(MultinodeLoggerTest, MissingPartsFromMiddle) {
Austin Schuh6ecfe902023-08-04 22:44:37 -07003414 if (file_strategy() == FileStrategy::kCombine) {
3415 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
3416 }
Naman Guptaa63aa132023-03-22 20:06:34 -07003417 time_converter_.AddMonotonic(
3418 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
3419 {
3420 LoggerState pi1_logger = MakeLogger(pi1_);
3421
3422 event_loop_factory_.RunFor(chrono::milliseconds(95));
3423
3424 StartLogger(&pi1_logger);
3425 aos::monotonic_clock::time_point last_rotation_time =
3426 pi1_logger.event_loop->monotonic_now();
Austin Schuh2f864452023-07-17 14:53:08 -07003427 pi1_logger.logger->set_on_logged_period(
3428 [&](aos::monotonic_clock::time_point) {
3429 const auto now = pi1_logger.event_loop->monotonic_now();
3430 if (now > last_rotation_time + std::chrono::seconds(5)) {
3431 pi1_logger.logger->Rotate();
3432 last_rotation_time = now;
3433 }
3434 });
Naman Guptaa63aa132023-03-22 20:06:34 -07003435
3436 event_loop_factory_.RunFor(chrono::milliseconds(10000));
3437 }
3438
3439 std::vector<std::string> missing_parts;
3440
Mithun Bharadwajc54aa022023-08-02 16:10:41 -07003441 missing_parts.emplace_back(logfile_base1_ + "_pi1_timestamps.part0" +
3442 Extension());
3443 missing_parts.emplace_back(logfile_base1_ + "_pi1_timestamps.part2" +
3444 Extension());
Naman Guptaa63aa132023-03-22 20:06:34 -07003445 missing_parts.emplace_back(absl::StrCat(
3446 logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));
3447
3448 EXPECT_DEATH({ SortParts(missing_parts); },
3449 "Broken log, missing part files between");
3450}
3451
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003452// Tests that we properly handle a dead node. Do this by just disconnecting
3453// it and only using one nodes of logs.
Naman Guptaa63aa132023-03-22 20:06:34 -07003454TEST_P(MultinodeLoggerTest, DeadNode) {
3455 pi1_->Disconnect(pi2_->node());
3456 pi2_->Disconnect(pi1_->node());
3457 time_converter_.AddMonotonic(
3458 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
3459 {
3460 LoggerState pi1_logger = MakeLogger(pi1_);
3461
3462 event_loop_factory_.RunFor(chrono::milliseconds(95));
3463
3464 StartLogger(&pi1_logger);
3465
3466 event_loop_factory_.RunFor(chrono::milliseconds(10000));
3467 }
3468
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003469 // Confirm that we can parse the result. LogReader has enough internal
3470 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07003471 ConfirmReadable(MakePi1DeadNodeLogfiles());
3472}
3473
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003474// Tests that we can relog with a different config. This makes most sense
3475// when you are trying to edit a log and want to use channel renaming + the
3476// original config in the new log.
Naman Guptaa63aa132023-03-22 20:06:34 -07003477TEST_P(MultinodeLoggerTest, LogDifferentConfig) {
3478 time_converter_.StartEqual();
3479 {
3480 LoggerState pi1_logger = MakeLogger(pi1_);
3481 LoggerState pi2_logger = MakeLogger(pi2_);
3482
3483 event_loop_factory_.RunFor(chrono::milliseconds(95));
3484
3485 StartLogger(&pi1_logger);
3486 StartLogger(&pi2_logger);
3487
3488 event_loop_factory_.RunFor(chrono::milliseconds(20000));
3489 }
3490
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003491 auto sorted_parts = SortParts(logfiles_);
3492 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3493 LogReader reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07003494 reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
3495
3496 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
3497 log_reader_factory.set_send_delay(chrono::microseconds(0));
3498
3499 // This sends out the fetched messages and advances time to the start of the
3500 // log file.
3501 reader.Register(&log_reader_factory);
3502
3503 const Node *pi1 =
3504 configuration::GetNode(log_reader_factory.configuration(), "pi1");
3505 const Node *pi2 =
3506 configuration::GetNode(log_reader_factory.configuration(), "pi2");
3507
3508 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
3509 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
3510 LOG(INFO) << "now pi1 "
3511 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
3512 LOG(INFO) << "now pi2 "
3513 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
3514
3515 EXPECT_THAT(reader.LoggedNodes(),
3516 ::testing::ElementsAre(
3517 configuration::GetNode(reader.logged_configuration(), pi1),
3518 configuration::GetNode(reader.logged_configuration(), pi2)));
3519
3520 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
3521
3522 // And confirm we can re-create a log again, while checking the contents.
3523 std::vector<std::string> log_files;
3524 {
3525 LoggerState pi1_logger =
3526 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
3527 &log_reader_factory, reader.logged_configuration());
3528 LoggerState pi2_logger =
3529 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
3530 &log_reader_factory, reader.logged_configuration());
3531
Austin Schuh7e417682023-08-11 17:05:30 -07003532 pi1_logger.StartLogger(tmp_dir_ + "/logs/relogged1");
3533 pi2_logger.StartLogger(tmp_dir_ + "/logs/relogged2");
Naman Guptaa63aa132023-03-22 20:06:34 -07003534
3535 log_reader_factory.Run();
3536
3537 for (auto &x : pi1_logger.log_namer->all_filenames()) {
Austin Schuh7e417682023-08-11 17:05:30 -07003538 log_files.emplace_back(absl::StrCat(tmp_dir_, "/logs/relogged1_", x));
Naman Guptaa63aa132023-03-22 20:06:34 -07003539 }
3540 for (auto &x : pi2_logger.log_namer->all_filenames()) {
Austin Schuh7e417682023-08-11 17:05:30 -07003541 log_files.emplace_back(absl::StrCat(tmp_dir_, "/logs/relogged2_", x));
Naman Guptaa63aa132023-03-22 20:06:34 -07003542 }
3543 }
3544
3545 reader.Deregister();
3546
3547 // And verify that we can run the LogReader over the relogged files without
3548 // hitting any fatal errors.
3549 {
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003550 auto sorted_parts = SortParts(log_files);
3551 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3552 LogReader relogged_reader(sorted_parts);
Naman Guptaa63aa132023-03-22 20:06:34 -07003553 relogged_reader.Register();
3554
3555 relogged_reader.event_loop_factory()->Run();
3556 }
3557}
3558
Maxwell Gumley8c1b87f2024-02-13 17:54:52 -07003559// Tests that we can relog with a subset of the original config. This is useful
3560// for excluding obsolete or deprecated channels, so they don't appear in the
3561// configuration when reading the log.
3562TEST_P(MultinodeLoggerTest, LogPartialConfig) {
3563 time_converter_.StartEqual();
3564 {
3565 LoggerState pi1_logger = MakeLogger(pi1_);
3566 LoggerState pi2_logger = MakeLogger(pi2_);
3567
3568 event_loop_factory_.RunFor(chrono::milliseconds(95));
3569
3570 StartLogger(&pi1_logger);
3571 StartLogger(&pi2_logger);
3572
3573 event_loop_factory_.RunFor(chrono::milliseconds(20000));
3574 }
3575
3576 auto sorted_parts = SortParts(logfiles_);
3577 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3578 LogReader reader(sorted_parts);
3579 reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
3580
3581 SimulatedEventLoopFactory log_reader_factory(reader.configuration());
3582 log_reader_factory.set_send_delay(chrono::microseconds(0));
3583
3584 // This sends out the fetched messages and advances time to the start of the
3585 // log file.
3586 reader.Register(&log_reader_factory);
3587
3588 const Node *pi1 =
3589 configuration::GetNode(log_reader_factory.configuration(), "pi1");
3590 const Node *pi2 =
3591 configuration::GetNode(log_reader_factory.configuration(), "pi2");
3592
3593 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
3594 LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
3595 LOG(INFO) << "now pi1 "
3596 << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
3597 LOG(INFO) << "now pi2 "
3598 << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
3599
3600 EXPECT_THAT(reader.LoggedNodes(),
3601 ::testing::ElementsAre(
3602 configuration::GetNode(reader.logged_configuration(), pi1),
3603 configuration::GetNode(reader.logged_configuration(), pi2)));
3604
3605 reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
3606
3607 const FlatbufferDetachedBuffer<Configuration> partial_configuration_buffer =
3608 configuration::GetPartialConfiguration(
3609 *reader.event_loop_factory()->configuration(),
3610 [](const Channel &channel) {
3611 if (channel.name()->string_view().starts_with("/original/")) {
3612 LOG(INFO) << "Omitting channel from save_log, channel: "
3613 << channel.name()->string_view() << ", "
3614 << channel.type()->string_view();
3615 return false;
3616 }
3617 return true;
3618 });
3619
3620 // And confirm we can re-create a log again, while checking the contents.
3621 std::vector<std::string> log_files;
3622 {
3623 const Configuration *partial_configuration =
3624 &(partial_configuration_buffer.message());
3625
3626 LoggerState pi1_logger =
3627 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
3628 &log_reader_factory, partial_configuration);
3629 LoggerState pi2_logger =
3630 MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
3631 &log_reader_factory, partial_configuration);
3632
3633 pi1_logger.StartLogger(tmp_dir_ + "/logs/relogged1");
3634 pi2_logger.StartLogger(tmp_dir_ + "/logs/relogged2");
3635
3636 log_reader_factory.Run();
3637
3638 for (auto &x : pi1_logger.log_namer->all_filenames()) {
3639 log_files.emplace_back(absl::StrCat(tmp_dir_, "/logs/relogged1_", x));
3640 }
3641 for (auto &x : pi2_logger.log_namer->all_filenames()) {
3642 log_files.emplace_back(absl::StrCat(tmp_dir_, "/logs/relogged2_", x));
3643 }
3644 }
3645
3646 reader.Deregister();
3647
3648 // And verify that we can run the LogReader over the relogged files without
3649 // hitting any fatal errors.
3650 {
3651 auto sorted_parts = SortParts(log_files);
3652 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
3653 LogReader relogged_reader(sorted_parts);
3654 relogged_reader.Register();
3655
3656 relogged_reader.event_loop_factory()->Run();
3657 }
3658}
3659
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003660// Tests that we properly replay a log where the start time for a node is
3661// before any data on the node. This can happen if the logger starts before
3662// data is published. While the scenario below is a bit convoluted, we have
3663// seen logs like this generated out in the wild.
Naman Guptaa63aa132023-03-22 20:06:34 -07003664TEST(MultinodeRebootLoggerTest, StartTimeBeforeData) {
Austin Schuh7e417682023-08-11 17:05:30 -07003665 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3666 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3667
Naman Guptaa63aa132023-03-22 20:06:34 -07003668 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3669 aos::configuration::ReadConfig(ArtifactPath(
3670 "aos/events/logging/multinode_pingpong_split3_config.json"));
3671 message_bridge::TestingTimeConverter time_converter(
3672 configuration::NodesCount(&config.message()));
3673 SimulatedEventLoopFactory event_loop_factory(&config.message());
3674 event_loop_factory.SetTimeConverter(&time_converter);
3675 NodeEventLoopFactory *const pi1 =
3676 event_loop_factory.GetNodeEventLoopFactory("pi1");
3677 const size_t pi1_index = configuration::GetNodeIndex(
3678 event_loop_factory.configuration(), pi1->node());
3679 NodeEventLoopFactory *const pi2 =
3680 event_loop_factory.GetNodeEventLoopFactory("pi2");
3681 const size_t pi2_index = configuration::GetNodeIndex(
3682 event_loop_factory.configuration(), pi2->node());
3683 NodeEventLoopFactory *const pi3 =
3684 event_loop_factory.GetNodeEventLoopFactory("pi3");
3685 const size_t pi3_index = configuration::GetNodeIndex(
3686 event_loop_factory.configuration(), pi3->node());
3687
3688 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003689 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003690 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003691 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003692 const std::string kLogfile2_2 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003693 aos::testing::TestTmpDir() + "/logs/multi_logfile2.2/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003694 const std::string kLogfile3_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003695 aos::testing::TestTmpDir() + "/logs/multi_logfile3/";
3696
Naman Guptaa63aa132023-03-22 20:06:34 -07003697 const UUID pi1_boot0 = UUID::Random();
3698 const UUID pi2_boot0 = UUID::Random();
3699 const UUID pi2_boot1 = UUID::Random();
3700 const UUID pi3_boot0 = UUID::Random();
3701 {
3702 CHECK_EQ(pi1_index, 0u);
3703 CHECK_EQ(pi2_index, 1u);
3704 CHECK_EQ(pi3_index, 2u);
3705
3706 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3707 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3708 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3709 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3710
3711 time_converter.AddNextTimestamp(
3712 distributed_clock::epoch(),
3713 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3714 BootTimestamp::epoch()});
3715 const chrono::nanoseconds reboot_time = chrono::milliseconds(20000);
3716 time_converter.AddNextTimestamp(
3717 distributed_clock::epoch() + reboot_time,
3718 {BootTimestamp::epoch() + reboot_time,
3719 BootTimestamp{
3720 .boot = 1,
3721 .time = monotonic_clock::epoch() + chrono::milliseconds(1323)},
3722 BootTimestamp::epoch() + reboot_time});
3723 }
3724
3725 // Make everything perfectly quiet.
3726 event_loop_factory.SkipTimingReport();
3727 event_loop_factory.DisableStatistics();
3728
3729 std::vector<std::string> filenames;
3730 {
3731 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003732 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3733 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003734 LoggerState pi3_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003735 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3736 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003737 {
3738 // And now start the logger.
3739 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003740 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3741 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003742
3743 event_loop_factory.RunFor(chrono::milliseconds(1000));
3744
3745 pi1_logger.StartLogger(kLogfile1_1);
3746 pi3_logger.StartLogger(kLogfile3_1);
3747 pi2_logger.StartLogger(kLogfile2_1);
3748
3749 event_loop_factory.RunFor(chrono::milliseconds(10000));
3750
3751 // Now that we've got a start time in the past, turn on data.
3752 event_loop_factory.EnableStatistics();
3753 std::unique_ptr<aos::EventLoop> ping_event_loop =
3754 pi1->MakeEventLoop("ping");
3755 Ping ping(ping_event_loop.get());
3756
3757 pi2->AlwaysStart<Pong>("pong");
3758
3759 event_loop_factory.RunFor(chrono::milliseconds(3000));
3760
3761 pi2_logger.AppendAllFilenames(&filenames);
3762
3763 // Stop logging on pi2 before rebooting and completely shut off all
3764 // messages on pi2.
3765 pi2->DisableStatistics();
3766 pi1->Disconnect(pi2->node());
3767 pi2->Disconnect(pi1->node());
3768 }
3769 event_loop_factory.RunFor(chrono::milliseconds(7000));
3770 // pi2 now reboots.
3771 {
3772 event_loop_factory.RunFor(chrono::milliseconds(1000));
3773
3774 // Start logging again on pi2 after it is up.
3775 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003776 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3777 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003778 pi2_logger.StartLogger(kLogfile2_2);
3779
3780 event_loop_factory.RunFor(chrono::milliseconds(10000));
3781 // And, now that we have a start time in the log, turn data back on.
3782 pi2->EnableStatistics();
3783 pi1->Connect(pi2->node());
3784 pi2->Connect(pi1->node());
3785
3786 pi2->AlwaysStart<Pong>("pong");
3787 std::unique_ptr<aos::EventLoop> ping_event_loop =
3788 pi1->MakeEventLoop("ping");
3789 Ping ping(ping_event_loop.get());
3790
3791 event_loop_factory.RunFor(chrono::milliseconds(3000));
3792
3793 pi2_logger.AppendAllFilenames(&filenames);
3794 }
3795
3796 pi1_logger.AppendAllFilenames(&filenames);
3797 pi3_logger.AppendAllFilenames(&filenames);
3798 }
3799
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003800 // Confirm that we can parse the result. LogReader has enough internal
3801 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07003802 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003803 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003804 auto result = ConfirmReadable(filenames);
3805 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
3806 chrono::seconds(1)));
3807 EXPECT_THAT(result[0].second,
3808 ::testing::ElementsAre(realtime_clock::epoch() +
3809 chrono::microseconds(34990350)));
3810
3811 EXPECT_THAT(result[1].first,
3812 ::testing::ElementsAre(
3813 realtime_clock::epoch() + chrono::seconds(1),
3814 realtime_clock::epoch() + chrono::microseconds(3323000)));
3815 EXPECT_THAT(result[1].second,
3816 ::testing::ElementsAre(
3817 realtime_clock::epoch() + chrono::microseconds(13990200),
3818 realtime_clock::epoch() + chrono::microseconds(16313200)));
3819
3820 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch() +
3821 chrono::seconds(1)));
3822 EXPECT_THAT(result[2].second,
3823 ::testing::ElementsAre(realtime_clock::epoch() +
Austin Schuhac6d89e2024-03-27 14:56:09 -07003824 chrono::microseconds(34900100)));
Naman Guptaa63aa132023-03-22 20:06:34 -07003825}
3826
3827// Tests that local data before remote data after reboot is properly replayed.
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003828// We only trigger a reboot in the timestamp interpolation function when
3829// solving the timestamp problem when we actually have a point in the
3830// function. This originally only happened when a point passes the noncausal
3831// filter. At the start of time for the second boot, if we aren't careful, we
3832// will have messages which need to be published at times before the boot.
3833// This happens when a local message is in the log before a forwarded message,
3834// so there is no point in the interpolation function. This delays the
3835// reboot. So, we need to recreate that situation and make sure it doesn't
3836// come back.
Naman Guptaa63aa132023-03-22 20:06:34 -07003837TEST(MultinodeRebootLoggerTest,
3838 LocalMessageBeforeRemoteBeforeStartAfterReboot) {
Austin Schuh7e417682023-08-11 17:05:30 -07003839 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
3840 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
3841
Naman Guptaa63aa132023-03-22 20:06:34 -07003842 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
3843 aos::configuration::ReadConfig(ArtifactPath(
3844 "aos/events/logging/multinode_pingpong_split3_config.json"));
3845 message_bridge::TestingTimeConverter time_converter(
3846 configuration::NodesCount(&config.message()));
3847 SimulatedEventLoopFactory event_loop_factory(&config.message());
3848 event_loop_factory.SetTimeConverter(&time_converter);
3849 NodeEventLoopFactory *const pi1 =
3850 event_loop_factory.GetNodeEventLoopFactory("pi1");
3851 const size_t pi1_index = configuration::GetNodeIndex(
3852 event_loop_factory.configuration(), pi1->node());
3853 NodeEventLoopFactory *const pi2 =
3854 event_loop_factory.GetNodeEventLoopFactory("pi2");
3855 const size_t pi2_index = configuration::GetNodeIndex(
3856 event_loop_factory.configuration(), pi2->node());
3857 NodeEventLoopFactory *const pi3 =
3858 event_loop_factory.GetNodeEventLoopFactory("pi3");
3859 const size_t pi3_index = configuration::GetNodeIndex(
3860 event_loop_factory.configuration(), pi3->node());
3861
3862 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003863 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003864 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003865 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003866 const std::string kLogfile2_2 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003867 aos::testing::TestTmpDir() + "/logs/multi_logfile2.2/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003868 const std::string kLogfile3_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07003869 aos::testing::TestTmpDir() + "/logs/multi_logfile3/";
Naman Guptaa63aa132023-03-22 20:06:34 -07003870 const UUID pi1_boot0 = UUID::Random();
3871 const UUID pi2_boot0 = UUID::Random();
3872 const UUID pi2_boot1 = UUID::Random();
3873 const UUID pi3_boot0 = UUID::Random();
3874 {
3875 CHECK_EQ(pi1_index, 0u);
3876 CHECK_EQ(pi2_index, 1u);
3877 CHECK_EQ(pi3_index, 2u);
3878
3879 time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
3880 time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
3881 time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
3882 time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
3883
3884 time_converter.AddNextTimestamp(
3885 distributed_clock::epoch(),
3886 {BootTimestamp::epoch(), BootTimestamp::epoch(),
3887 BootTimestamp::epoch()});
3888 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
3889 time_converter.AddNextTimestamp(
3890 distributed_clock::epoch() + reboot_time,
3891 {BootTimestamp::epoch() + reboot_time,
3892 BootTimestamp{.boot = 1,
3893 .time = monotonic_clock::epoch() + reboot_time +
3894 chrono::seconds(100)},
3895 BootTimestamp::epoch() + reboot_time});
3896 }
3897
3898 std::vector<std::string> filenames;
3899 {
3900 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003901 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3902 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003903 LoggerState pi3_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003904 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3905 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003906 {
3907 // And now start the logger.
3908 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003909 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3910 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003911
3912 pi1_logger.StartLogger(kLogfile1_1);
3913 pi3_logger.StartLogger(kLogfile3_1);
3914 pi2_logger.StartLogger(kLogfile2_1);
3915
3916 event_loop_factory.RunFor(chrono::milliseconds(1005));
3917
3918 // Now that we've got a start time in the past, turn on data.
3919 std::unique_ptr<aos::EventLoop> ping_event_loop =
3920 pi1->MakeEventLoop("ping");
3921 Ping ping(ping_event_loop.get());
3922
3923 pi2->AlwaysStart<Pong>("pong");
3924
3925 event_loop_factory.RunFor(chrono::milliseconds(3000));
3926
3927 pi2_logger.AppendAllFilenames(&filenames);
3928
3929 // Disable any remote messages on pi2.
3930 pi1->Disconnect(pi2->node());
3931 pi2->Disconnect(pi1->node());
3932 }
3933 event_loop_factory.RunFor(chrono::milliseconds(995));
3934 // pi2 now reboots at 5 seconds.
3935 {
3936 event_loop_factory.RunFor(chrono::milliseconds(1000));
3937
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003938 // Make local stuff happen before we start logging and connect the
3939 // remote.
Naman Guptaa63aa132023-03-22 20:06:34 -07003940 pi2->AlwaysStart<Pong>("pong");
3941 std::unique_ptr<aos::EventLoop> ping_event_loop =
3942 pi1->MakeEventLoop("ping");
3943 Ping ping(ping_event_loop.get());
3944 event_loop_factory.RunFor(chrono::milliseconds(1005));
3945
3946 // Start logging again on pi2 after it is up.
3947 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07003948 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
3949 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07003950 pi2_logger.StartLogger(kLogfile2_2);
3951
3952 // And allow remote messages now that we have some local ones.
3953 pi1->Connect(pi2->node());
3954 pi2->Connect(pi1->node());
3955
3956 event_loop_factory.RunFor(chrono::milliseconds(1000));
3957
3958 event_loop_factory.RunFor(chrono::milliseconds(3000));
3959
3960 pi2_logger.AppendAllFilenames(&filenames);
3961 }
3962
3963 pi1_logger.AppendAllFilenames(&filenames);
3964 pi3_logger.AppendAllFilenames(&filenames);
3965 }
3966
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07003967 // Confirm that we can parse the result. LogReader has enough internal
3968 // CHECKs to confirm the right thing happened.
Naman Guptaa63aa132023-03-22 20:06:34 -07003969 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07003970 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07003971 auto result = ConfirmReadable(filenames);
3972
3973 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
3974 EXPECT_THAT(result[0].second,
3975 ::testing::ElementsAre(realtime_clock::epoch() +
Austin Schuhac6d89e2024-03-27 14:56:09 -07003976 chrono::microseconds(11000300)));
Naman Guptaa63aa132023-03-22 20:06:34 -07003977
3978 EXPECT_THAT(result[1].first,
3979 ::testing::ElementsAre(
3980 realtime_clock::epoch(),
3981 realtime_clock::epoch() + chrono::microseconds(107005000)));
3982 EXPECT_THAT(result[1].second,
3983 ::testing::ElementsAre(
Austin Schuhac6d89e2024-03-27 14:56:09 -07003984 realtime_clock::epoch() + chrono::microseconds(4000100),
3985 realtime_clock::epoch() + chrono::microseconds(111000150)));
Naman Guptaa63aa132023-03-22 20:06:34 -07003986
3987 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
3988 EXPECT_THAT(result[2].second,
3989 ::testing::ElementsAre(realtime_clock::epoch() +
Austin Schuhac6d89e2024-03-27 14:56:09 -07003990 chrono::microseconds(11000100)));
Naman Guptaa63aa132023-03-22 20:06:34 -07003991
3992 auto start_stop_result = ConfirmReadable(
3993 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
3994 realtime_clock::epoch() + chrono::milliseconds(3000));
3995
3996 EXPECT_THAT(
3997 start_stop_result[0].first,
3998 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
3999 EXPECT_THAT(
4000 start_stop_result[0].second,
4001 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
4002 EXPECT_THAT(
4003 start_stop_result[1].first,
4004 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
4005 EXPECT_THAT(
4006 start_stop_result[1].second,
4007 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
4008 EXPECT_THAT(
4009 start_stop_result[2].first,
4010 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
4011 EXPECT_THAT(
4012 start_stop_result[2].second,
4013 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
4014}
4015
4016// Tests that setting the start and stop flags across a reboot works as
4017// expected.
4018TEST(MultinodeRebootLoggerTest, RebootStartStopTimes) {
Austin Schuh7e417682023-08-11 17:05:30 -07004019 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4020 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4021
Naman Guptaa63aa132023-03-22 20:06:34 -07004022 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4023 aos::configuration::ReadConfig(ArtifactPath(
4024 "aos/events/logging/multinode_pingpong_split3_config.json"));
4025 message_bridge::TestingTimeConverter time_converter(
4026 configuration::NodesCount(&config.message()));
4027 SimulatedEventLoopFactory event_loop_factory(&config.message());
4028 event_loop_factory.SetTimeConverter(&time_converter);
4029 NodeEventLoopFactory *const pi1 =
4030 event_loop_factory.GetNodeEventLoopFactory("pi1");
4031 const size_t pi1_index = configuration::GetNodeIndex(
4032 event_loop_factory.configuration(), pi1->node());
4033 NodeEventLoopFactory *const pi2 =
4034 event_loop_factory.GetNodeEventLoopFactory("pi2");
4035 const size_t pi2_index = configuration::GetNodeIndex(
4036 event_loop_factory.configuration(), pi2->node());
4037 NodeEventLoopFactory *const pi3 =
4038 event_loop_factory.GetNodeEventLoopFactory("pi3");
4039 const size_t pi3_index = configuration::GetNodeIndex(
4040 event_loop_factory.configuration(), pi3->node());
4041
4042 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004043 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004044 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004045 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004046 const std::string kLogfile2_2 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004047 aos::testing::TestTmpDir() + "/logs/multi_logfile2.2/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004048 const std::string kLogfile3_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004049 aos::testing::TestTmpDir() + "/logs/multi_logfile3/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004050 {
4051 CHECK_EQ(pi1_index, 0u);
4052 CHECK_EQ(pi2_index, 1u);
4053 CHECK_EQ(pi3_index, 2u);
4054
4055 time_converter.AddNextTimestamp(
4056 distributed_clock::epoch(),
4057 {BootTimestamp::epoch(), BootTimestamp::epoch(),
4058 BootTimestamp::epoch()});
4059 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
4060 time_converter.AddNextTimestamp(
4061 distributed_clock::epoch() + reboot_time,
4062 {BootTimestamp::epoch() + reboot_time,
4063 BootTimestamp{.boot = 1,
4064 .time = monotonic_clock::epoch() + reboot_time},
4065 BootTimestamp::epoch() + reboot_time});
4066 }
4067
4068 std::vector<std::string> filenames;
4069 {
4070 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004071 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4072 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07004073 LoggerState pi3_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004074 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4075 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07004076 {
4077 // And now start the logger.
4078 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004079 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4080 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07004081
4082 pi1_logger.StartLogger(kLogfile1_1);
4083 pi3_logger.StartLogger(kLogfile3_1);
4084 pi2_logger.StartLogger(kLogfile2_1);
4085
4086 event_loop_factory.RunFor(chrono::milliseconds(1005));
4087
4088 // Now that we've got a start time in the past, turn on data.
4089 std::unique_ptr<aos::EventLoop> ping_event_loop =
4090 pi1->MakeEventLoop("ping");
4091 Ping ping(ping_event_loop.get());
4092
4093 pi2->AlwaysStart<Pong>("pong");
4094
4095 event_loop_factory.RunFor(chrono::milliseconds(3000));
4096
4097 pi2_logger.AppendAllFilenames(&filenames);
4098 }
4099 event_loop_factory.RunFor(chrono::milliseconds(995));
4100 // pi2 now reboots at 5 seconds.
4101 {
4102 event_loop_factory.RunFor(chrono::milliseconds(1000));
4103
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004104 // Make local stuff happen before we start logging and connect the
4105 // remote.
Naman Guptaa63aa132023-03-22 20:06:34 -07004106 pi2->AlwaysStart<Pong>("pong");
4107 std::unique_ptr<aos::EventLoop> ping_event_loop =
4108 pi1->MakeEventLoop("ping");
4109 Ping ping(ping_event_loop.get());
4110 event_loop_factory.RunFor(chrono::milliseconds(5));
4111
4112 // Start logging again on pi2 after it is up.
4113 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004114 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4115 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07004116 pi2_logger.StartLogger(kLogfile2_2);
4117
4118 event_loop_factory.RunFor(chrono::milliseconds(5000));
4119
4120 pi2_logger.AppendAllFilenames(&filenames);
4121 }
4122
4123 pi1_logger.AppendAllFilenames(&filenames);
4124 pi3_logger.AppendAllFilenames(&filenames);
4125 }
4126
4127 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004128 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004129 auto result = ConfirmReadable(filenames);
4130
4131 EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
4132 EXPECT_THAT(result[0].second,
4133 ::testing::ElementsAre(realtime_clock::epoch() +
Austin Schuhac6d89e2024-03-27 14:56:09 -07004134 chrono::microseconds(11000300)));
Naman Guptaa63aa132023-03-22 20:06:34 -07004135
4136 EXPECT_THAT(result[1].first,
4137 ::testing::ElementsAre(
4138 realtime_clock::epoch(),
4139 realtime_clock::epoch() + chrono::microseconds(6005000)));
4140 EXPECT_THAT(result[1].second,
4141 ::testing::ElementsAre(
Austin Schuhac6d89e2024-03-27 14:56:09 -07004142 realtime_clock::epoch() + chrono::microseconds(4900100),
4143 realtime_clock::epoch() + chrono::microseconds(11000150)));
Naman Guptaa63aa132023-03-22 20:06:34 -07004144
4145 EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
4146 EXPECT_THAT(result[2].second,
4147 ::testing::ElementsAre(realtime_clock::epoch() +
Austin Schuhac6d89e2024-03-27 14:56:09 -07004148 chrono::microseconds(11000100)));
Naman Guptaa63aa132023-03-22 20:06:34 -07004149
4150 // Confirm we observed the correct start and stop times. We should see the
4151 // reboot here.
4152 auto start_stop_result = ConfirmReadable(
4153 filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
4154 realtime_clock::epoch() + chrono::milliseconds(8000));
4155
4156 EXPECT_THAT(
4157 start_stop_result[0].first,
4158 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
4159 EXPECT_THAT(
4160 start_stop_result[0].second,
4161 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
4162 EXPECT_THAT(start_stop_result[1].first,
4163 ::testing::ElementsAre(
4164 realtime_clock::epoch() + chrono::seconds(2),
4165 realtime_clock::epoch() + chrono::microseconds(6005000)));
4166 EXPECT_THAT(start_stop_result[1].second,
4167 ::testing::ElementsAre(
Austin Schuhac6d89e2024-03-27 14:56:09 -07004168 realtime_clock::epoch() + chrono::microseconds(4900100),
Naman Guptaa63aa132023-03-22 20:06:34 -07004169 realtime_clock::epoch() + chrono::seconds(8)));
4170 EXPECT_THAT(
4171 start_stop_result[2].first,
4172 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
4173 EXPECT_THAT(
4174 start_stop_result[2].second,
4175 ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
4176}
4177
4178// Tests that we properly handle one direction being down.
4179TEST(MissingDirectionTest, OneDirection) {
Austin Schuh7e417682023-08-11 17:05:30 -07004180 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4181 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4182
Naman Guptaa63aa132023-03-22 20:06:34 -07004183 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4184 aos::configuration::ReadConfig(ArtifactPath(
4185 "aos/events/logging/multinode_pingpong_split4_config.json"));
4186 message_bridge::TestingTimeConverter time_converter(
4187 configuration::NodesCount(&config.message()));
4188 SimulatedEventLoopFactory event_loop_factory(&config.message());
4189 event_loop_factory.SetTimeConverter(&time_converter);
4190
4191 NodeEventLoopFactory *const pi1 =
4192 event_loop_factory.GetNodeEventLoopFactory("pi1");
4193 const size_t pi1_index = configuration::GetNodeIndex(
4194 event_loop_factory.configuration(), pi1->node());
4195 NodeEventLoopFactory *const pi2 =
4196 event_loop_factory.GetNodeEventLoopFactory("pi2");
4197 const size_t pi2_index = configuration::GetNodeIndex(
4198 event_loop_factory.configuration(), pi2->node());
4199 std::vector<std::string> filenames;
4200
4201 {
4202 CHECK_EQ(pi1_index, 0u);
4203 CHECK_EQ(pi2_index, 1u);
4204
4205 time_converter.AddNextTimestamp(
4206 distributed_clock::epoch(),
4207 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4208
4209 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
4210 time_converter.AddNextTimestamp(
4211 distributed_clock::epoch() + reboot_time,
4212 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
4213 BootTimestamp::epoch() + reboot_time});
4214 }
4215
4216 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004217 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004218 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004219 aos::testing::TestTmpDir() + "/logs/multi_logfile1.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004220
4221 pi2->Disconnect(pi1->node());
4222
4223 pi1->AlwaysStart<Ping>("ping");
4224 pi2->AlwaysStart<Pong>("pong");
4225
4226 {
4227 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004228 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4229 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07004230
4231 event_loop_factory.RunFor(chrono::milliseconds(95));
4232
4233 pi2_logger.StartLogger(kLogfile2_1);
4234
4235 event_loop_factory.RunFor(chrono::milliseconds(6000));
4236
4237 pi2->Connect(pi1->node());
4238
4239 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004240 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4241 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07004242 pi1_logger.StartLogger(kLogfile1_1);
4243
4244 event_loop_factory.RunFor(chrono::milliseconds(5000));
4245 pi1_logger.AppendAllFilenames(&filenames);
4246 pi2_logger.AppendAllFilenames(&filenames);
4247 }
4248
4249 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004250 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004251 ConfirmReadable(filenames);
4252}
4253
4254// Tests that we properly handle only one direction ever existing after a
4255// reboot.
4256TEST(MissingDirectionTest, OneDirectionAfterReboot) {
Austin Schuh7e417682023-08-11 17:05:30 -07004257 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4258 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4259
Naman Guptaa63aa132023-03-22 20:06:34 -07004260 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4261 aos::configuration::ReadConfig(ArtifactPath(
4262 "aos/events/logging/multinode_pingpong_split4_config.json"));
4263 message_bridge::TestingTimeConverter time_converter(
4264 configuration::NodesCount(&config.message()));
4265 SimulatedEventLoopFactory event_loop_factory(&config.message());
4266 event_loop_factory.SetTimeConverter(&time_converter);
4267
4268 NodeEventLoopFactory *const pi1 =
4269 event_loop_factory.GetNodeEventLoopFactory("pi1");
4270 const size_t pi1_index = configuration::GetNodeIndex(
4271 event_loop_factory.configuration(), pi1->node());
4272 NodeEventLoopFactory *const pi2 =
4273 event_loop_factory.GetNodeEventLoopFactory("pi2");
4274 const size_t pi2_index = configuration::GetNodeIndex(
4275 event_loop_factory.configuration(), pi2->node());
4276 std::vector<std::string> filenames;
4277
4278 {
4279 CHECK_EQ(pi1_index, 0u);
4280 CHECK_EQ(pi2_index, 1u);
4281
4282 time_converter.AddNextTimestamp(
4283 distributed_clock::epoch(),
4284 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4285
4286 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
4287 time_converter.AddNextTimestamp(
4288 distributed_clock::epoch() + reboot_time,
4289 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
4290 BootTimestamp::epoch() + reboot_time});
4291 }
4292
4293 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004294 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004295
4296 pi1->AlwaysStart<Ping>("ping");
4297
4298 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
4299 // makes it such that we will only get timestamps from pi1 -> pi2 on the
4300 // second boot.
4301 {
4302 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004303 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4304 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07004305
4306 event_loop_factory.RunFor(chrono::milliseconds(95));
4307
4308 pi2_logger.StartLogger(kLogfile2_1);
4309
4310 event_loop_factory.RunFor(chrono::milliseconds(4000));
4311
4312 pi2->Disconnect(pi1->node());
4313
4314 event_loop_factory.RunFor(chrono::milliseconds(1000));
4315 pi1->AlwaysStart<Ping>("ping");
4316
4317 event_loop_factory.RunFor(chrono::milliseconds(5000));
4318 pi2_logger.AppendAllFilenames(&filenames);
4319 }
4320
4321 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004322 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004323 ConfirmReadable(filenames);
4324}
4325
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004326// Tests that we properly handle only one direction ever existing after a
4327// reboot with only reliable data.
Naman Guptaa63aa132023-03-22 20:06:34 -07004328TEST(MissingDirectionTest, OneDirectionAfterRebootReliable) {
Austin Schuh7e417682023-08-11 17:05:30 -07004329 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4330 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4331
Naman Guptaa63aa132023-03-22 20:06:34 -07004332 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004333 aos::configuration::ReadConfig(
4334 ArtifactPath("aos/events/logging/"
4335 "multinode_pingpong_split4_reliable_config.json"));
Naman Guptaa63aa132023-03-22 20:06:34 -07004336 message_bridge::TestingTimeConverter time_converter(
4337 configuration::NodesCount(&config.message()));
4338 SimulatedEventLoopFactory event_loop_factory(&config.message());
4339 event_loop_factory.SetTimeConverter(&time_converter);
4340
4341 NodeEventLoopFactory *const pi1 =
4342 event_loop_factory.GetNodeEventLoopFactory("pi1");
4343 const size_t pi1_index = configuration::GetNodeIndex(
4344 event_loop_factory.configuration(), pi1->node());
4345 NodeEventLoopFactory *const pi2 =
4346 event_loop_factory.GetNodeEventLoopFactory("pi2");
4347 const size_t pi2_index = configuration::GetNodeIndex(
4348 event_loop_factory.configuration(), pi2->node());
4349 std::vector<std::string> filenames;
4350
4351 {
4352 CHECK_EQ(pi1_index, 0u);
4353 CHECK_EQ(pi2_index, 1u);
4354
4355 time_converter.AddNextTimestamp(
4356 distributed_clock::epoch(),
4357 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4358
4359 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
4360 time_converter.AddNextTimestamp(
4361 distributed_clock::epoch() + reboot_time,
4362 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
4363 BootTimestamp::epoch() + reboot_time});
4364 }
4365
4366 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004367 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004368
4369 pi1->AlwaysStart<Ping>("ping");
4370
4371 // Pi1 sends to pi2. Reboot pi1, but don't let pi2 connect to pi1. This
4372 // makes it such that we will only get timestamps from pi1 -> pi2 on the
4373 // second boot.
4374 {
4375 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004376 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4377 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07004378
4379 event_loop_factory.RunFor(chrono::milliseconds(95));
4380
4381 pi2_logger.StartLogger(kLogfile2_1);
4382
4383 event_loop_factory.RunFor(chrono::milliseconds(4000));
4384
4385 pi2->Disconnect(pi1->node());
4386
4387 event_loop_factory.RunFor(chrono::milliseconds(1000));
4388 pi1->AlwaysStart<Ping>("ping");
4389
4390 event_loop_factory.RunFor(chrono::milliseconds(5000));
4391 pi2_logger.AppendAllFilenames(&filenames);
4392 }
4393
4394 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004395 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004396 ConfirmReadable(filenames);
4397}
4398
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004399// Tests that we properly handle only one direction ever existing after a
4400// reboot with mixed unreliable vs reliable, where reliable has an earlier
4401// timestamp than unreliable.
Brian Smartte67d7112023-03-20 12:06:30 -07004402TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase1) {
Austin Schuh7e417682023-08-11 17:05:30 -07004403 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4404 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4405
Brian Smartte67d7112023-03-20 12:06:30 -07004406 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4407 aos::configuration::ReadConfig(ArtifactPath(
4408 "aos/events/logging/multinode_pingpong_split4_mixed1_config.json"));
4409 message_bridge::TestingTimeConverter time_converter(
4410 configuration::NodesCount(&config.message()));
4411 SimulatedEventLoopFactory event_loop_factory(&config.message());
4412 event_loop_factory.SetTimeConverter(&time_converter);
4413
4414 NodeEventLoopFactory *const pi1 =
4415 event_loop_factory.GetNodeEventLoopFactory("pi1");
4416 const size_t pi1_index = configuration::GetNodeIndex(
4417 event_loop_factory.configuration(), pi1->node());
4418 NodeEventLoopFactory *const pi2 =
4419 event_loop_factory.GetNodeEventLoopFactory("pi2");
4420 const size_t pi2_index = configuration::GetNodeIndex(
4421 event_loop_factory.configuration(), pi2->node());
4422 std::vector<std::string> filenames;
4423
4424 {
4425 CHECK_EQ(pi1_index, 0u);
4426 CHECK_EQ(pi2_index, 1u);
4427
4428 time_converter.AddNextTimestamp(
4429 distributed_clock::epoch(),
4430 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4431
4432 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
4433 time_converter.AddNextTimestamp(
4434 distributed_clock::epoch() + reboot_time,
4435 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
4436 BootTimestamp::epoch() + reboot_time});
4437 }
4438
4439 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004440 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Brian Smartte67d7112023-03-20 12:06:30 -07004441
4442 // The following sequence using the above reference config creates
4443 // a reliable message timestamp < unreliable message timestamp.
4444 {
4445 pi1->DisableStatistics();
4446 pi2->DisableStatistics();
4447
4448 event_loop_factory.RunFor(chrono::milliseconds(95));
4449
4450 pi1->AlwaysStart<Ping>("ping");
4451
4452 event_loop_factory.RunFor(chrono::milliseconds(5250));
4453
4454 pi1->EnableStatistics();
4455
4456 event_loop_factory.RunFor(chrono::milliseconds(1000));
4457
4458 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004459 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4460 FileStrategy::kKeepSeparate);
Brian Smartte67d7112023-03-20 12:06:30 -07004461
4462 pi2_logger.StartLogger(kLogfile2_1);
4463
4464 event_loop_factory.RunFor(chrono::milliseconds(5000));
4465 pi2_logger.AppendAllFilenames(&filenames);
4466 }
4467
4468 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004469 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Brian Smartte67d7112023-03-20 12:06:30 -07004470 ConfirmReadable(filenames);
4471}
4472
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004473// Tests that we properly handle only one direction ever existing after a
4474// reboot with mixed unreliable vs reliable, where unreliable has an earlier
4475// timestamp than reliable.
Brian Smartte67d7112023-03-20 12:06:30 -07004476TEST(MissingDirectionTest, OneDirectionAfterRebootMixedCase2) {
Austin Schuh7e417682023-08-11 17:05:30 -07004477 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4478 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4479
Brian Smartte67d7112023-03-20 12:06:30 -07004480 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4481 aos::configuration::ReadConfig(ArtifactPath(
4482 "aos/events/logging/multinode_pingpong_split4_mixed2_config.json"));
4483 message_bridge::TestingTimeConverter time_converter(
4484 configuration::NodesCount(&config.message()));
4485 SimulatedEventLoopFactory event_loop_factory(&config.message());
4486 event_loop_factory.SetTimeConverter(&time_converter);
4487
4488 NodeEventLoopFactory *const pi1 =
4489 event_loop_factory.GetNodeEventLoopFactory("pi1");
4490 const size_t pi1_index = configuration::GetNodeIndex(
4491 event_loop_factory.configuration(), pi1->node());
4492 NodeEventLoopFactory *const pi2 =
4493 event_loop_factory.GetNodeEventLoopFactory("pi2");
4494 const size_t pi2_index = configuration::GetNodeIndex(
4495 event_loop_factory.configuration(), pi2->node());
4496 std::vector<std::string> filenames;
4497
4498 {
4499 CHECK_EQ(pi1_index, 0u);
4500 CHECK_EQ(pi2_index, 1u);
4501
4502 time_converter.AddNextTimestamp(
4503 distributed_clock::epoch(),
4504 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4505
4506 const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
4507 time_converter.AddNextTimestamp(
4508 distributed_clock::epoch() + reboot_time,
4509 {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
4510 BootTimestamp::epoch() + reboot_time});
4511 }
4512
4513 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004514 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Brian Smartte67d7112023-03-20 12:06:30 -07004515
4516 // The following sequence using the above reference config creates
4517 // an unreliable message timestamp < reliable message timestamp.
4518 {
4519 pi1->DisableStatistics();
4520 pi2->DisableStatistics();
4521
4522 event_loop_factory.RunFor(chrono::milliseconds(95));
4523
4524 pi1->AlwaysStart<Ping>("ping");
4525
4526 event_loop_factory.RunFor(chrono::milliseconds(5250));
4527
4528 pi1->EnableStatistics();
4529
4530 event_loop_factory.RunFor(chrono::milliseconds(1000));
4531
4532 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004533 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4534 FileStrategy::kKeepSeparate);
Brian Smartte67d7112023-03-20 12:06:30 -07004535
4536 pi2_logger.StartLogger(kLogfile2_1);
4537
4538 event_loop_factory.RunFor(chrono::milliseconds(5000));
4539 pi2_logger.AppendAllFilenames(&filenames);
4540 }
4541
4542 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004543 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Brian Smartte67d7112023-03-20 12:06:30 -07004544 ConfirmReadable(filenames);
4545}
4546
Naman Guptaa63aa132023-03-22 20:06:34 -07004547// Tests that we properly handle what used to be a time violation in one
4548// direction. This can occur when one direction goes down after sending some
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004549// data, but the other keeps working. The down direction ends up resolving to
4550// a straight line in the noncausal filter, where the direction which is still
4551// up can cross that line. Really, time progressed along just fine but we
4552// assumed that the offset was a line when it could have deviated by up to
4553// 1ms/second.
Naman Guptaa63aa132023-03-22 20:06:34 -07004554TEST_P(MultinodeLoggerTest, OneDirectionTimeDrift) {
4555 std::vector<std::string> filenames;
4556
4557 CHECK_EQ(pi1_index_, 0u);
4558 CHECK_EQ(pi2_index_, 1u);
4559
4560 time_converter_.AddNextTimestamp(
4561 distributed_clock::epoch(),
4562 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4563
4564 const chrono::nanoseconds before_disconnect_duration =
4565 time_converter_.AddMonotonic(
4566 {chrono::milliseconds(1000), chrono::milliseconds(1000)});
4567
4568 const chrono::nanoseconds test_duration =
4569 time_converter_.AddMonotonic(
4570 {chrono::milliseconds(1000), chrono::milliseconds(1000)}) +
4571 time_converter_.AddMonotonic(
4572 {chrono::milliseconds(10000),
4573 chrono::milliseconds(10000) - chrono::milliseconds(5)}) +
4574 time_converter_.AddMonotonic(
4575 {chrono::milliseconds(10000),
4576 chrono::milliseconds(10000) + chrono::milliseconds(5)});
4577
4578 const std::string kLogfile =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004579 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004580
4581 {
4582 LoggerState pi2_logger = MakeLogger(pi2_);
4583 pi2_logger.StartLogger(kLogfile);
4584 event_loop_factory_.RunFor(before_disconnect_duration);
4585
4586 pi2_->Disconnect(pi1_->node());
4587
4588 event_loop_factory_.RunFor(test_duration);
4589 pi2_->Connect(pi1_->node());
4590
4591 event_loop_factory_.RunFor(chrono::milliseconds(5000));
4592 pi2_logger.AppendAllFilenames(&filenames);
4593 }
4594
4595 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004596 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004597 ConfirmReadable(filenames);
4598}
4599
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004600// Tests that we can replay a logfile that has timestamps such that at least
4601// one node's epoch is at a positive distributed_clock (and thus will have to
4602// be booted after the other node(s)).
Naman Guptaa63aa132023-03-22 20:06:34 -07004603TEST_P(MultinodeLoggerTest, StartOneNodeBeforeOther) {
4604 std::vector<std::string> filenames;
4605
4606 CHECK_EQ(pi1_index_, 0u);
4607 CHECK_EQ(pi2_index_, 1u);
4608
4609 time_converter_.AddNextTimestamp(
4610 distributed_clock::epoch(),
4611 {BootTimestamp::epoch(), BootTimestamp::epoch()});
4612
4613 const chrono::nanoseconds before_reboot_duration = chrono::milliseconds(1000);
4614 time_converter_.RebootAt(
4615 0, distributed_clock::time_point(before_reboot_duration));
4616
4617 const chrono::nanoseconds test_duration = time_converter_.AddMonotonic(
4618 {chrono::milliseconds(10000), chrono::milliseconds(10000)});
4619
4620 const std::string kLogfile =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004621 aos::testing::TestTmpDir() + "/logs/multi_logfile2.1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004622
4623 pi2_->Disconnect(pi1_->node());
4624 pi1_->Disconnect(pi2_->node());
4625
4626 {
4627 LoggerState pi2_logger = MakeLogger(pi2_);
4628
4629 pi2_logger.StartLogger(kLogfile);
4630 event_loop_factory_.RunFor(before_reboot_duration);
4631
4632 pi2_->Connect(pi1_->node());
4633 pi1_->Connect(pi2_->node());
4634
4635 event_loop_factory_.RunFor(test_duration);
4636
4637 pi2_logger.AppendAllFilenames(&filenames);
4638 }
4639
4640 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004641 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004642 ConfirmReadable(filenames);
4643
4644 {
4645 LogReader reader(sorted_parts);
4646 SimulatedEventLoopFactory replay_factory(reader.configuration());
4647 reader.RegisterWithoutStarting(&replay_factory);
4648
4649 NodeEventLoopFactory *const replay_node =
4650 reader.event_loop_factory()->GetNodeEventLoopFactory("pi1");
4651
4652 std::unique_ptr<EventLoop> test_event_loop =
4653 replay_node->MakeEventLoop("test_reader");
4654 replay_node->OnStartup([replay_node]() {
4655 // Check that we didn't boot until at least t=0.
4656 CHECK_LE(monotonic_clock::epoch(), replay_node->monotonic_now());
4657 });
4658 test_event_loop->OnRun([&test_event_loop]() {
4659 // Check that we didn't boot until at least t=0.
4660 EXPECT_LE(monotonic_clock::epoch(), test_event_loop->monotonic_now());
4661 });
4662 reader.event_loop_factory()->Run();
4663 reader.Deregister();
4664 }
4665}
4666
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004667// Tests that when we have a loop without all the logs at all points in time,
4668// we can sort it properly.
Naman Guptaa63aa132023-03-22 20:06:34 -07004669TEST(MultinodeLoggerLoopTest, Loop) {
Austin Schuh7e417682023-08-11 17:05:30 -07004670 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4671 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4672
Naman Guptaa63aa132023-03-22 20:06:34 -07004673 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004674 aos::configuration::ReadConfig(
4675 ArtifactPath("aos/events/logging/"
4676 "multinode_pingpong_triangle_split_config.json"));
Naman Guptaa63aa132023-03-22 20:06:34 -07004677 message_bridge::TestingTimeConverter time_converter(
4678 configuration::NodesCount(&config.message()));
4679 SimulatedEventLoopFactory event_loop_factory(&config.message());
4680 event_loop_factory.SetTimeConverter(&time_converter);
4681
4682 NodeEventLoopFactory *const pi1 =
4683 event_loop_factory.GetNodeEventLoopFactory("pi1");
4684 NodeEventLoopFactory *const pi2 =
4685 event_loop_factory.GetNodeEventLoopFactory("pi2");
4686 NodeEventLoopFactory *const pi3 =
4687 event_loop_factory.GetNodeEventLoopFactory("pi3");
4688
4689 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004690 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004691 const std::string kLogfile2_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004692 aos::testing::TestTmpDir() + "/logs/multi_logfile2/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004693 const std::string kLogfile3_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004694 aos::testing::TestTmpDir() + "/logs/multi_logfile3/";
Naman Guptaa63aa132023-03-22 20:06:34 -07004695
4696 {
4697 // Make pi1 boot before everything else.
4698 time_converter.AddNextTimestamp(
4699 distributed_clock::epoch(),
4700 {BootTimestamp::epoch(),
4701 BootTimestamp::epoch() - chrono::milliseconds(100),
4702 BootTimestamp::epoch() - chrono::milliseconds(300)});
4703 }
4704
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004705 // We want to setup a situation such that 2 of the 3 legs of the loop are
4706 // very confident about time being X, and the third leg is pulling the
4707 // average off to one side.
Naman Guptaa63aa132023-03-22 20:06:34 -07004708 //
4709 // It's easiest to visualize this in timestamp_plotter.
4710
4711 std::vector<std::string> filenames;
4712 {
4713 // Have pi1 send out a reliable message at startup. This sets up a long
4714 // forwarding time message at the start to bias time.
4715 std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
4716 {
4717 aos::Sender<examples::Ping> ping_sender =
4718 pi1_event_loop->MakeSender<examples::Ping>("/reliable");
4719
4720 aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
4721 examples::Ping::Builder ping_builder =
4722 builder.MakeBuilder<examples::Ping>();
4723 CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
4724 }
4725
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004726 // Wait a while so there's enough data to let the worst case be rather
4727 // off.
Naman Guptaa63aa132023-03-22 20:06:34 -07004728 event_loop_factory.RunFor(chrono::seconds(1000));
4729
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004730 // Now start a receiving node first. This sets up 2 tight bounds between
4731 // 2 of the nodes.
Naman Guptaa63aa132023-03-22 20:06:34 -07004732 LoggerState pi2_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004733 pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4734 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07004735 pi2_logger.StartLogger(kLogfile2_1);
4736
4737 event_loop_factory.RunFor(chrono::seconds(100));
4738
4739 // And now start the third leg.
4740 LoggerState pi3_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004741 pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4742 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07004743 pi3_logger.StartLogger(kLogfile3_1);
4744
4745 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004746 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4747 FileStrategy::kKeepSeparate);
Naman Guptaa63aa132023-03-22 20:06:34 -07004748 pi1_logger.StartLogger(kLogfile1_1);
4749
4750 event_loop_factory.RunFor(chrono::seconds(100));
4751
4752 pi1_logger.AppendAllFilenames(&filenames);
4753 pi2_logger.AppendAllFilenames(&filenames);
4754 pi3_logger.AppendAllFilenames(&filenames);
4755 }
4756
4757 // Make sure we can read this.
4758 const std::vector<LogFile> sorted_parts = SortParts(filenames);
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07004759 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
Naman Guptaa63aa132023-03-22 20:06:34 -07004760 auto result = ConfirmReadable(filenames);
4761}
4762
Austin Schuh08dba8f2023-05-01 08:29:30 -07004763// Tests that RestartLogging works in the simple case. Unfortunately, the
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004764// failure cases involve simulating time elapsing in callbacks, which is
4765// really hard. The best we can reasonably do is make sure 2 back to back
4766// logs are parseable together.
Austin Schuh08dba8f2023-05-01 08:29:30 -07004767TEST_P(MultinodeLoggerTest, RestartLogging) {
4768 time_converter_.AddMonotonic(
4769 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
4770 std::vector<std::string> filenames;
4771 {
4772 LoggerState pi1_logger = MakeLogger(pi1_);
4773
4774 event_loop_factory_.RunFor(chrono::milliseconds(95));
4775
4776 StartLogger(&pi1_logger, logfile_base1_);
4777 aos::monotonic_clock::time_point last_rotation_time =
4778 pi1_logger.event_loop->monotonic_now();
Austin Schuh2f864452023-07-17 14:53:08 -07004779 pi1_logger.logger->set_on_logged_period(
4780 [&](aos::monotonic_clock::time_point) {
4781 const auto now = pi1_logger.event_loop->monotonic_now();
4782 if (now > last_rotation_time + std::chrono::seconds(5)) {
4783 pi1_logger.AppendAllFilenames(&filenames);
4784 std::unique_ptr<MultiNodeFilesLogNamer> namer =
4785 pi1_logger.MakeLogNamer(logfile_base2_);
4786 pi1_logger.log_namer = namer.get();
Austin Schuh08dba8f2023-05-01 08:29:30 -07004787
Austin Schuh2f864452023-07-17 14:53:08 -07004788 pi1_logger.logger->RestartLogging(std::move(namer));
4789 last_rotation_time = now;
4790 }
4791 });
Austin Schuh08dba8f2023-05-01 08:29:30 -07004792
4793 event_loop_factory_.RunFor(chrono::milliseconds(7000));
4794
4795 pi1_logger.AppendAllFilenames(&filenames);
4796 }
4797
4798 for (const auto &x : filenames) {
4799 LOG(INFO) << x;
4800 }
4801
4802 EXPECT_GE(filenames.size(), 2u);
4803
4804 ConfirmReadable(filenames);
4805
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -07004806 // TODO(austin): It would be good to confirm that any one time messages end
4807 // up in both logs correctly.
Austin Schuh08dba8f2023-05-01 08:29:30 -07004808}
4809
Austin Schuh6e93fc22023-08-22 21:27:22 -07004810// Tests that we call OnEnd without --skip_missing_forwarding_entries.
4811TEST_P(MultinodeLoggerTest, SkipMissingForwardingEntries) {
4812 if (file_strategy() == FileStrategy::kCombine) {
4813 GTEST_SKIP() << "We don't need to test the combined file writer this deep.";
4814 }
4815 time_converter_.AddMonotonic(
4816 {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
4817
4818 std::vector<std::string> filenames;
4819 {
4820 LoggerState pi1_logger = MakeLogger(pi1_);
4821
4822 event_loop_factory_.RunFor(chrono::milliseconds(95));
4823
4824 StartLogger(&pi1_logger);
4825 aos::monotonic_clock::time_point last_rotation_time =
4826 pi1_logger.event_loop->monotonic_now();
4827 pi1_logger.logger->set_on_logged_period(
4828 [&](aos::monotonic_clock::time_point) {
4829 const auto now = pi1_logger.event_loop->monotonic_now();
4830 if (now > last_rotation_time + std::chrono::seconds(5)) {
4831 pi1_logger.logger->Rotate();
4832 last_rotation_time = now;
4833 }
4834 });
4835
4836 event_loop_factory_.RunFor(chrono::milliseconds(15000));
4837 pi1_logger.AppendAllFilenames(&filenames);
4838 }
4839
4840 // If we remove the last remote data part, we'll trigger missing data for
4841 // timestamps.
4842 filenames.erase(std::remove_if(filenames.begin(), filenames.end(),
4843 [](const std::string &s) {
4844 return s.find("data/pi2_data.part3.bfbs") !=
4845 std::string::npos;
4846 }),
4847 filenames.end());
4848
4849 auto result = ConfirmReadable(filenames);
4850}
4851
Austin Schuh54ffea42023-08-23 13:27:04 -07004852// Tests that we call OnEnd without --skip_missing_forwarding_entries.
4853TEST(MultinodeLoggerConfigTest, SingleNode) {
4854 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4855 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4856
4857 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4858 aos::configuration::ReadConfig(
4859 ArtifactPath("aos/events/logging/multinode_single_node_config.json"));
4860 message_bridge::TestingTimeConverter time_converter(
4861 configuration::NodesCount(&config.message()));
4862 SimulatedEventLoopFactory event_loop_factory(&config.message());
4863 event_loop_factory.SetTimeConverter(&time_converter);
4864
4865 time_converter.StartEqual();
4866
4867 const std::string kLogfile1_1 =
4868 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
4869
4870 NodeEventLoopFactory *const pi1 =
4871 event_loop_factory.GetNodeEventLoopFactory("pi1");
4872
4873 std::vector<std::string> filenames;
4874
4875 {
4876 // Now start a receiving node first. This sets up 2 tight bounds between
4877 // 2 of the nodes.
4878 LoggerState pi1_logger = MakeLoggerState(
4879 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4880 FileStrategy::kKeepSeparate);
4881 pi1_logger.StartLogger(kLogfile1_1);
4882
4883 event_loop_factory.RunFor(chrono::seconds(10));
4884
4885 pi1_logger.AppendAllFilenames(&filenames);
4886 }
4887
4888 // Make sure we can read this.
4889 const std::vector<LogFile> sorted_parts = SortParts(filenames);
4890 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
4891 auto result = ConfirmReadable(filenames);
4892
4893 // TODO(austin): Probably want to stop caring about ServerStatistics,
4894 // ClientStatistics, and Timestamp since they don't really make sense.
4895}
4896
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004897// Tests that when we have evidence of 2 boots, and then start logging, the
4898// max_out_of_order_duration ends up reasonable on the boot with the start time.
4899TEST(MultinodeLoggerLoopTest, PreviousBootData) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07004900 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
4901 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
4902
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004903 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
4904 aos::configuration::ReadConfig(ArtifactPath(
4905 "aos/events/logging/multinode_pingpong_reboot_ooo_config.json"));
4906 message_bridge::TestingTimeConverter time_converter(
4907 configuration::NodesCount(&config.message()));
4908 SimulatedEventLoopFactory event_loop_factory(&config.message());
4909 event_loop_factory.SetTimeConverter(&time_converter);
4910
4911 const UUID pi1_boot0 = UUID::Random();
4912 const UUID pi2_boot0 = UUID::Random();
4913 const UUID pi2_boot1 = UUID::Random();
4914
4915 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07004916 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004917
4918 {
4919 constexpr size_t kPi1Index = 0;
4920 constexpr size_t kPi2Index = 1;
4921 time_converter.set_boot_uuid(kPi1Index, 0, pi1_boot0);
4922 time_converter.set_boot_uuid(kPi2Index, 0, pi2_boot0);
4923 time_converter.set_boot_uuid(kPi2Index, 1, pi2_boot1);
4924
4925 // Make pi1 boot before everything else.
4926 time_converter.AddNextTimestamp(
4927 distributed_clock::epoch(),
4928 {BootTimestamp::epoch(),
4929 BootTimestamp::epoch() - chrono::milliseconds(100)});
4930
4931 const chrono::nanoseconds reboot_time = chrono::seconds(1005);
4932 time_converter.AddNextTimestamp(
4933 distributed_clock::epoch() + reboot_time,
4934 {BootTimestamp::epoch() + reboot_time,
4935 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()}});
4936 }
4937
4938 NodeEventLoopFactory *const pi1 =
4939 event_loop_factory.GetNodeEventLoopFactory("pi1");
4940 NodeEventLoopFactory *const pi2 =
4941 event_loop_factory.GetNodeEventLoopFactory("pi2");
4942
4943 // What we want is for pi2 to send a message at t=1000 on the first channel
4944 // (/atest1 pong), and t=1 on the second channel (/atest3 pong). That'll make
4945 // the max out of order duration be large.
4946 //
4947 // Then, we reboot, and only send messages on a third channel (/atest2 pong).
4948 // The order is key, they need to sort in this order in the config.
4949
4950 std::vector<std::string> filenames;
4951 {
4952 {
4953 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4954 aos::Sender<examples::Pong> pong_sender =
4955 pi2_event_loop->MakeSender<examples::Pong>("/atest3");
4956
4957 pi2_event_loop->OnRun([&]() {
4958 aos::Sender<examples::Pong>::Builder builder =
4959 pong_sender.MakeBuilder();
4960 examples::Pong::Builder pong_builder =
4961 builder.MakeBuilder<examples::Pong>();
4962 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4963 });
4964
4965 event_loop_factory.RunFor(chrono::seconds(1000));
4966 }
4967
4968 {
4969 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4970 aos::Sender<examples::Pong> pong_sender =
4971 pi2_event_loop->MakeSender<examples::Pong>("/atest1");
4972
4973 aos::Sender<examples::Pong>::Builder builder = pong_sender.MakeBuilder();
4974 examples::Pong::Builder pong_builder =
4975 builder.MakeBuilder<examples::Pong>();
4976 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4977 }
4978
4979 event_loop_factory.RunFor(chrono::seconds(10));
4980
4981 // Now start a receiving node first. This sets up 2 tight bounds between
4982 // 2 of the nodes.
4983 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07004984 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
4985 FileStrategy::kKeepSeparate);
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07004986 pi1_logger.StartLogger(kLogfile1_1);
4987
4988 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
4989 aos::Sender<examples::Pong> pong_sender =
4990 pi2_event_loop->MakeSender<examples::Pong>("/atest2");
4991
4992 pi2_event_loop->AddPhasedLoop(
4993 [&pong_sender](int) {
4994 aos::Sender<examples::Pong>::Builder builder =
4995 pong_sender.MakeBuilder();
4996 examples::Pong::Builder pong_builder =
4997 builder.MakeBuilder<examples::Pong>();
4998 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
4999 },
5000 chrono::milliseconds(10));
5001
5002 event_loop_factory.RunFor(chrono::seconds(100));
5003
5004 pi1_logger.AppendAllFilenames(&filenames);
5005 }
5006
5007 // Make sure we can read this.
5008 const std::vector<LogFile> sorted_parts = SortParts(filenames);
5009 EXPECT_TRUE(AllRebootPartsMatchOutOfOrderDuration(sorted_parts, "pi2"));
5010 auto result = ConfirmReadable(filenames);
5011}
5012
5013// Tests that when we start without a connection, and then start logging, the
5014// max_out_of_order_duration ends up reasonable.
5015TEST(MultinodeLoggerLoopTest, StartDisconnected) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07005016 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
5017 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
5018
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07005019 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
5020 aos::configuration::ReadConfig(ArtifactPath(
5021 "aos/events/logging/multinode_pingpong_reboot_ooo_config.json"));
5022 message_bridge::TestingTimeConverter time_converter(
5023 configuration::NodesCount(&config.message()));
5024 SimulatedEventLoopFactory event_loop_factory(&config.message());
5025 event_loop_factory.SetTimeConverter(&time_converter);
5026
5027 time_converter.StartEqual();
5028
5029 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07005030 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07005031
5032 NodeEventLoopFactory *const pi1 =
5033 event_loop_factory.GetNodeEventLoopFactory("pi1");
5034 NodeEventLoopFactory *const pi2 =
5035 event_loop_factory.GetNodeEventLoopFactory("pi2");
5036
5037 // What we want is for pi2 to send a message at t=1000 on the first channel
5038 // (/atest1 pong), and t=1 on the second channel (/atest3 pong). That'll make
5039 // the max out of order duration be large.
5040 //
5041 // Then, we disconnect, and only send messages on a third channel
5042 // (/atest2 pong). The order is key, they need to sort in this order in the
5043 // config so we observe them in the order which grows the
5044 // max_out_of_order_duration.
5045
5046 std::vector<std::string> filenames;
5047 {
5048 {
5049 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
5050 aos::Sender<examples::Pong> pong_sender =
5051 pi2_event_loop->MakeSender<examples::Pong>("/atest3");
5052
5053 pi2_event_loop->OnRun([&]() {
5054 aos::Sender<examples::Pong>::Builder builder =
5055 pong_sender.MakeBuilder();
5056 examples::Pong::Builder pong_builder =
5057 builder.MakeBuilder<examples::Pong>();
5058 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
5059 });
5060
5061 event_loop_factory.RunFor(chrono::seconds(1000));
5062 }
5063
5064 {
5065 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
5066 aos::Sender<examples::Pong> pong_sender =
5067 pi2_event_loop->MakeSender<examples::Pong>("/atest1");
5068
5069 aos::Sender<examples::Pong>::Builder builder = pong_sender.MakeBuilder();
5070 examples::Pong::Builder pong_builder =
5071 builder.MakeBuilder<examples::Pong>();
5072 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
5073 }
5074
5075 event_loop_factory.RunFor(chrono::seconds(10));
5076
5077 pi1->Disconnect(pi2->node());
5078 pi2->Disconnect(pi1->node());
5079
5080 // Make data flow.
5081 std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
5082 aos::Sender<examples::Pong> pong_sender =
5083 pi2_event_loop->MakeSender<examples::Pong>("/atest2");
5084
5085 pi2_event_loop->AddPhasedLoop(
5086 [&pong_sender](int) {
5087 aos::Sender<examples::Pong>::Builder builder =
5088 pong_sender.MakeBuilder();
5089 examples::Pong::Builder pong_builder =
5090 builder.MakeBuilder<examples::Pong>();
5091 CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
5092 },
5093 chrono::milliseconds(10));
5094
5095 event_loop_factory.RunFor(chrono::seconds(10));
5096
5097 // Now start a receiving node first. This sets up 2 tight bounds between
5098 // 2 of the nodes.
5099 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07005100 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
5101 FileStrategy::kKeepSeparate);
Mithun Bharadwajdec32d92023-08-02 16:10:41 -07005102 pi1_logger.StartLogger(kLogfile1_1);
5103
5104 event_loop_factory.RunFor(chrono::seconds(10));
5105
5106 // Now, reconnect, and everything should recover.
5107 pi1->Connect(pi2->node());
5108 pi2->Connect(pi1->node());
5109
5110 event_loop_factory.RunFor(chrono::seconds(10));
5111
5112 pi1_logger.AppendAllFilenames(&filenames);
5113 }
5114
5115 // Make sure we can read this.
5116 const std::vector<LogFile> sorted_parts = SortParts(filenames);
5117 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
5118 auto result = ConfirmReadable(filenames);
5119}
5120
Austin Schuh633858f2024-03-22 14:34:19 -07005121// Tests that only having a delayed, reliable message from a boot results in a
5122// readable log.
5123//
5124// Note: this is disabled since it doesn't work yet. Un-disable this when the
5125// code is fixed!
Austin Schuhb5224ec2024-03-27 15:20:09 -07005126TEST(MultinodeLoggerLoopTest, ReliableOnlyTimestamps) {
Austin Schuh633858f2024-03-22 14:34:19 -07005127 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
5128 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
5129
5130 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
5131 aos::configuration::ReadConfig(
5132 ArtifactPath("aos/events/logging/"
5133 "multinode_pingpong_reboot_reliable_only_config.json"));
5134 message_bridge::TestingTimeConverter time_converter(
5135 configuration::NodesCount(&config.message()));
5136 SimulatedEventLoopFactory event_loop_factory(&config.message());
5137 event_loop_factory.SetTimeConverter(&time_converter);
5138
5139 constexpr chrono::nanoseconds kRebootTime = chrono::seconds(100);
5140 {
5141 time_converter.AddNextTimestamp(
5142 distributed_clock::epoch(),
5143 {BootTimestamp::epoch(), BootTimestamp::epoch()});
5144 time_converter.AddNextTimestamp(
5145 distributed_clock::epoch() + kRebootTime,
5146 {BootTimestamp::epoch() + kRebootTime,
5147 BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()}});
Austin Schuh1124c512023-08-01 15:20:44 -07005148 }
5149
Austin Schuh633858f2024-03-22 14:34:19 -07005150 const std::string kLogfile1_1 =
5151 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
5152
5153 NodeEventLoopFactory *const pi1 =
5154 event_loop_factory.GetNodeEventLoopFactory("pi1");
5155
5156 // We want unreliable timestamps from one boot, a reliable timestamp from the
5157 // same boot, and then a long delayed reliable timestamp from the second boot.
5158 // This produces conflicting information about when the second boot happened.
5159 std::vector<std::string> filenames;
5160 PingSender *app1 = pi1->AlwaysStart<PingSender>("pingsender", "/atest1");
5161 PingSender *app2 = pi1->AlwaysStart<PingSender>("pingsender", "/atest2");
5162 event_loop_factory.RunFor(chrono::seconds(1));
5163 pi1->Stop(app2);
5164 event_loop_factory.RunFor(kRebootTime - chrono::seconds(2));
5165 pi1->Stop(app1);
5166
5167 event_loop_factory.RunFor(chrono::seconds(1) + kRebootTime * 2);
5168
5169 {
5170 // Collect a small log after reboot.
5171 LoggerState pi1_logger = MakeLoggerState(
5172 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
5173 FileStrategy::kKeepSeparate);
5174 pi1_logger.StartLogger(kLogfile1_1);
5175
5176 event_loop_factory.RunFor(chrono::seconds(1));
5177
5178 pi1_logger.AppendAllFilenames(&filenames);
5179 }
5180
5181 // Make sure we can read this.
5182 const std::vector<LogFile> sorted_parts = SortParts(filenames);
5183 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
5184 auto result = ConfirmReadable(filenames);
5185}
Austin Schuh1124c512023-08-01 15:20:44 -07005186
5187// Tests that we log correctly as nodes connect slowly.
5188TEST(MultinodeLoggerLoopTest, StaggeredConnect) {
Austin Schuh8fb4b452023-08-04 17:02:27 -07005189 util::UnlinkRecursive(aos::testing::TestTmpDir() + "/logs");
5190 std::filesystem::create_directory(aos::testing::TestTmpDir() + "/logs");
5191
Austin Schuh1124c512023-08-01 15:20:44 -07005192 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
5193 aos::configuration::ReadConfig(ArtifactPath(
5194 "aos/events/logging/multinode_pingpong_pi3_pingpong_config.json"));
5195 message_bridge::TestingTimeConverter time_converter(
5196 configuration::NodesCount(&config.message()));
5197 SimulatedEventLoopFactory event_loop_factory(&config.message());
5198 event_loop_factory.SetTimeConverter(&time_converter);
5199
5200 time_converter.StartEqual();
5201
5202 const std::string kLogfile1_1 =
Austin Schuh8fb4b452023-08-04 17:02:27 -07005203 aos::testing::TestTmpDir() + "/logs/multi_logfile1/";
Austin Schuh1124c512023-08-01 15:20:44 -07005204
5205 NodeEventLoopFactory *const pi1 =
5206 event_loop_factory.GetNodeEventLoopFactory("pi1");
5207 NodeEventLoopFactory *const pi2 =
5208 event_loop_factory.GetNodeEventLoopFactory("pi2");
5209 NodeEventLoopFactory *const pi3 =
5210 event_loop_factory.GetNodeEventLoopFactory("pi3");
5211
5212 // What we want is for pi2 to send a message at t=1000 on the first channel
5213 // (/atest1 pong), and t=1 on the second channel (/atest3 pong). That'll make
5214 // the max out of order duration be large.
5215 //
5216 // Then, we disconnect, and only send messages on a third channel
5217 // (/atest2 pong). The order is key, they need to sort in this order in the
5218 // config so we observe them in the order which grows the
5219 // max_out_of_order_duration.
5220
5221 pi1->Disconnect(pi2->node());
5222 pi2->Disconnect(pi1->node());
5223
5224 pi1->Disconnect(pi3->node());
5225 pi3->Disconnect(pi1->node());
5226
5227 std::vector<std::string> filenames;
5228 pi2->AlwaysStart<PongSender>("pongsender", "/test2");
5229 pi3->AlwaysStart<PongSender>("pongsender", "/test3");
5230
5231 event_loop_factory.RunFor(chrono::seconds(10));
5232
5233 {
5234 // Now start a receiving node first. This sets up 2 tight bounds between
5235 // 2 of the nodes.
5236 LoggerState pi1_logger = MakeLoggerState(
Austin Schuh6ecfe902023-08-04 22:44:37 -07005237 pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0],
5238 FileStrategy::kKeepSeparate);
Austin Schuh1124c512023-08-01 15:20:44 -07005239 pi1_logger.StartLogger(kLogfile1_1);
5240
5241 event_loop_factory.RunFor(chrono::seconds(10));
5242
5243 // Now, reconnect, and everything should recover.
5244 pi1->Connect(pi2->node());
5245 pi2->Connect(pi1->node());
5246
5247 event_loop_factory.RunFor(chrono::seconds(10));
5248
5249 pi1->Connect(pi3->node());
5250 pi3->Connect(pi1->node());
5251
5252 event_loop_factory.RunFor(chrono::seconds(10));
5253
5254 pi1_logger.AppendAllFilenames(&filenames);
5255 }
5256
5257 // Make sure we can read this.
5258 const std::vector<LogFile> sorted_parts = SortParts(filenames);
5259 EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
5260 auto result = ConfirmReadable(filenames);
5261}
5262
Stephan Pleinesf63bde82024-01-13 15:59:33 -08005263} // namespace aos::logger::testing