Make LogReader::OnStart able to start applications at times
Two main features are missing from OnStart that would make it truly
useful at scale:
1) We need to be able to start applications...
2) Replaying only a time range (typically on the realtime clock) is
very helpful.
Add support for (1).
(2) is trickier. I considered putting the logic outside LogReader, but
in the end, the bookkeeping for starting and stopping is involved enough
that it really helps to have LogReader manage and synchronize it. Maybe
we can refactor it back out later.
Change-Id: I4ddf6e18819d3aadd02f38776bbc7aef843a96b0
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 1732549..3b0b672 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -523,7 +523,12 @@
}
};
-void ConfirmReadable(const std::vector<std::string> &files) {
+std::vector<std::pair<std::vector<realtime_clock::time_point>,
+ std::vector<realtime_clock::time_point>>>
+ConfirmReadable(
+ const std::vector<std::string> &files,
+ realtime_clock::time_point start_time = realtime_clock::min_time,
+ realtime_clock::time_point end_time = realtime_clock::max_time) {
{
LogReader reader(SortParts(files));
@@ -535,22 +540,65 @@
reader.Deregister();
}
{
+ std::vector<std::pair<std::vector<realtime_clock::time_point>,
+ std::vector<realtime_clock::time_point>>>
+ result;
LogReader reader(SortParts(files));
+ reader.SetStartTime(start_time);
+ reader.SetEndTime(end_time);
+
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
reader.RegisterWithoutStarting(&log_reader_factory);
+ result.resize(
+ configuration::NodesCount(log_reader_factory.configuration()));
if (configuration::MultiNode(log_reader_factory.configuration())) {
+ size_t i = 0;
for (const aos::Node *node :
*log_reader_factory.configuration()->nodes()) {
- reader.OnStart(node, [node]() {
+ LOG(INFO) << "Registering start";
+ reader.OnStart(node, [node, &log_reader_factory, &result,
+ node_index = i]() {
LOG(INFO) << "Starting " << node->name()->string_view();
+ result[node_index].first.push_back(
+ log_reader_factory.GetNodeEventLoopFactory(node)->realtime_now());
});
+ reader.OnEnd(node, [node, &log_reader_factory, &result,
+ node_index = i]() {
+ LOG(INFO) << "Ending " << node->name()->string_view();
+ result[node_index].second.push_back(
+ log_reader_factory.GetNodeEventLoopFactory(node)->realtime_now());
+ });
+ ++i;
}
+ } else {
+ reader.OnStart([&log_reader_factory, &result]() {
+ LOG(INFO) << "Starting";
+ result[0].first.push_back(
+ log_reader_factory.GetNodeEventLoopFactory(nullptr)
+ ->realtime_now());
+ });
+ reader.OnEnd([&log_reader_factory, &result]() {
+ LOG(INFO) << "Ending";
+ result[0].second.push_back(
+ log_reader_factory.GetNodeEventLoopFactory(nullptr)
+ ->realtime_now());
+ });
}
log_reader_factory.Run();
reader.Deregister();
+
+ for (auto x : result) {
+ for (auto y : x.first) {
+ VLOG(1) << "Start " << y;
+ }
+ for (auto y : x.second) {
+ VLOG(1) << "End " << y;
+ }
+ }
+ return result;
}
}
@@ -3589,7 +3637,27 @@
// Confirm that we can parse the result. LogReader has enough internal CHECKs
// to confirm the right thing happened.
const std::vector<LogFile> sorted_parts = SortParts(filenames);
- ConfirmReadable(filenames);
+ auto result = ConfirmReadable(filenames);
+ EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
+ chrono::seconds(1)));
+ EXPECT_THAT(result[0].second,
+ ::testing::ElementsAre(realtime_clock::epoch() +
+ chrono::microseconds(34990350)));
+
+ EXPECT_THAT(result[1].first,
+ ::testing::ElementsAre(
+ realtime_clock::epoch() + chrono::seconds(1),
+ realtime_clock::epoch() + chrono::microseconds(3323000)));
+ EXPECT_THAT(result[1].second,
+ ::testing::ElementsAre(
+ realtime_clock::epoch() + chrono::microseconds(13990200),
+ realtime_clock::epoch() + chrono::microseconds(16313200)));
+
+ EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch() +
+ chrono::seconds(1)));
+ EXPECT_THAT(result[2].second,
+ ::testing::ElementsAre(realtime_clock::epoch() +
+ chrono::microseconds(34900150)));
}
// Tests that local data before remote data after reboot is properly replayed.
@@ -3730,9 +3798,201 @@
// Confirm that we can parse the result. LogReader has enough internal CHECKs
// to confirm the right thing happened.
const std::vector<LogFile> sorted_parts = SortParts(filenames);
- ConfirmReadable(filenames);
+ auto result = ConfirmReadable(filenames);
+
+ EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
+ EXPECT_THAT(result[0].second,
+ ::testing::ElementsAre(realtime_clock::epoch() +
+ chrono::microseconds(11000350)));
+
+ EXPECT_THAT(result[1].first,
+ ::testing::ElementsAre(
+ realtime_clock::epoch(),
+ realtime_clock::epoch() + chrono::microseconds(107005000)));
+ EXPECT_THAT(result[1].second,
+ ::testing::ElementsAre(
+ realtime_clock::epoch() + chrono::microseconds(4000150),
+ realtime_clock::epoch() + chrono::microseconds(111000200)));
+
+ EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
+ EXPECT_THAT(result[2].second,
+ ::testing::ElementsAre(realtime_clock::epoch() +
+ chrono::microseconds(11000150)));
+
+ auto start_stop_result = ConfirmReadable(
+ filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
+ realtime_clock::epoch() + chrono::milliseconds(3000));
+
+ EXPECT_THAT(start_stop_result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
+ chrono::seconds(2)));
+ EXPECT_THAT(start_stop_result[0].second, ::testing::ElementsAre(realtime_clock::epoch() +
+ chrono::seconds(3)));
+ EXPECT_THAT(start_stop_result[1].first, ::testing::ElementsAre(realtime_clock::epoch() +
+ chrono::seconds(2)));
+ EXPECT_THAT(start_stop_result[1].second, ::testing::ElementsAre(realtime_clock::epoch() +
+ chrono::seconds(3)));
+ EXPECT_THAT(start_stop_result[2].first, ::testing::ElementsAre(realtime_clock::epoch() +
+ chrono::seconds(2)));
+ EXPECT_THAT(start_stop_result[2].second, ::testing::ElementsAre(realtime_clock::epoch() +
+ chrono::seconds(3)));
}
+// Tests that replaying with a start and end time which span a reboot reports
+// the expected per-node OnStart/OnEnd times.
+TEST(MultinodeRebootLoggerTest, RebootStartStopTimes) {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(ArtifactPath(
+ "aos/events/logging/multinode_pingpong_split3_config.json"));
+ message_bridge::TestingTimeConverter time_converter(
+ configuration::NodesCount(&config.message()));
+ SimulatedEventLoopFactory event_loop_factory(&config.message());
+ event_loop_factory.SetTimeConverter(&time_converter);
+ NodeEventLoopFactory *const pi1 =
+ event_loop_factory.GetNodeEventLoopFactory("pi1");
+ const size_t pi1_index = configuration::GetNodeIndex(
+ event_loop_factory.configuration(), pi1->node());
+ NodeEventLoopFactory *const pi2 =
+ event_loop_factory.GetNodeEventLoopFactory("pi2");
+ const size_t pi2_index = configuration::GetNodeIndex(
+ event_loop_factory.configuration(), pi2->node());
+ NodeEventLoopFactory *const pi3 =
+ event_loop_factory.GetNodeEventLoopFactory("pi3");
+ const size_t pi3_index = configuration::GetNodeIndex(
+ event_loop_factory.configuration(), pi3->node());
+
+ const std::string kLogfile1_1 =
+ aos::testing::TestTmpDir() + "/multi_logfile1/";
+ const std::string kLogfile2_1 =
+ aos::testing::TestTmpDir() + "/multi_logfile2.1/";
+ const std::string kLogfile2_2 =
+ aos::testing::TestTmpDir() + "/multi_logfile2.2/";
+ const std::string kLogfile3_1 =
+ aos::testing::TestTmpDir() + "/multi_logfile3/";
+ util::UnlinkRecursive(kLogfile1_1);
+ util::UnlinkRecursive(kLogfile2_1);
+ util::UnlinkRecursive(kLogfile2_2);
+ util::UnlinkRecursive(kLogfile3_1);
+ {
+ CHECK_EQ(pi1_index, 0u);
+ CHECK_EQ(pi2_index, 1u);
+ CHECK_EQ(pi3_index, 2u);
+
+ time_converter.AddNextTimestamp(
+ distributed_clock::epoch(),
+ {BootTimestamp::epoch(), BootTimestamp::epoch(),
+ BootTimestamp::epoch()});
+ const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
+ time_converter.AddNextTimestamp(
+ distributed_clock::epoch() + reboot_time,
+ {BootTimestamp::epoch() + reboot_time,
+ BootTimestamp{.boot = 1,
+ .time = monotonic_clock::epoch() + reboot_time},
+ BootTimestamp::epoch() + reboot_time});
+ }
+
+ std::vector<std::string> filenames;
+ {
+ LoggerState pi1_logger = LoggerState::MakeLogger(
+ pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+ LoggerState pi3_logger = LoggerState::MakeLogger(
+ pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+ {
+ // Create the pi2 logger, then start logging on all three nodes.
+ LoggerState pi2_logger = LoggerState::MakeLogger(
+ pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+
+ pi1_logger.StartLogger(kLogfile1_1);
+ pi3_logger.StartLogger(kLogfile3_1);
+ pi2_logger.StartLogger(kLogfile2_1);
+
+ event_loop_factory.RunFor(chrono::milliseconds(1005));
+
+ // Now that logging has a start time in the past, start ping and pong.
+ std::unique_ptr<aos::EventLoop> ping_event_loop =
+ pi1->MakeEventLoop("ping");
+ Ping ping(ping_event_loop.get());
+
+ pi2->AlwaysStart<Pong>("pong");
+
+ event_loop_factory.RunFor(chrono::milliseconds(3000));
+
+ pi2_logger.AppendAllFilenames(&filenames);
+ }
+ event_loop_factory.RunFor(chrono::milliseconds(995));
+ // pi2 reboots here, 5 seconds in (1005 + 3000 + 995 ms), per reboot_time.
+ {
+ event_loop_factory.RunFor(chrono::milliseconds(1000));
+
+ // Make local stuff happen before we start logging and connect the remote.
+ pi2->AlwaysStart<Pong>("pong");
+ std::unique_ptr<aos::EventLoop> ping_event_loop =
+ pi1->MakeEventLoop("ping");
+ Ping ping(ping_event_loop.get());
+ event_loop_factory.RunFor(chrono::milliseconds(5));
+
+ // Start logging again on pi2 after it is up.
+ LoggerState pi2_logger = LoggerState::MakeLogger(
+ pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+ pi2_logger.StartLogger(kLogfile2_2);
+
+ event_loop_factory.RunFor(chrono::milliseconds(5000));
+
+ pi2_logger.AppendAllFilenames(&filenames);
+ }
+
+ pi1_logger.AppendAllFilenames(&filenames);
+ pi3_logger.AppendAllFilenames(&filenames);
+ }
+
+ const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ auto result = ConfirmReadable(filenames);
+
+ EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
+ EXPECT_THAT(result[0].second,
+ ::testing::ElementsAre(realtime_clock::epoch() +
+ chrono::microseconds(11000350)));
+
+ EXPECT_THAT(result[1].first,
+ ::testing::ElementsAre(
+ realtime_clock::epoch(),
+ realtime_clock::epoch() + chrono::microseconds(6005000)));
+ EXPECT_THAT(result[1].second,
+ ::testing::ElementsAre(
+ realtime_clock::epoch() + chrono::microseconds(4900150),
+ realtime_clock::epoch() + chrono::microseconds(11000200)));
+
+ EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
+ EXPECT_THAT(result[2].second,
+ ::testing::ElementsAre(realtime_clock::epoch() +
+ chrono::microseconds(11000150)));
+
+ // Now replay only realtime 2 s to 8 s. pi2 reboots inside that window, so
+ // it should report a start/end pair for each of its two boots.
+ auto start_stop_result = ConfirmReadable(
+ filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
+ realtime_clock::epoch() + chrono::milliseconds(8000));
+
+ EXPECT_THAT(
+ start_stop_result[0].first,
+ ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
+ EXPECT_THAT(
+ start_stop_result[0].second,
+ ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
+ EXPECT_THAT(start_stop_result[1].first,
+ ::testing::ElementsAre(
+ realtime_clock::epoch() + chrono::seconds(2),
+ realtime_clock::epoch() + chrono::microseconds(6005000)));
+ EXPECT_THAT(start_stop_result[1].second,
+ ::testing::ElementsAre(
+ realtime_clock::epoch() + chrono::microseconds(4900150),
+ realtime_clock::epoch() + chrono::seconds(8)));
+ EXPECT_THAT(
+ start_stop_result[2].first,
+ ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
+ EXPECT_THAT(
+ start_stop_result[2].second,
+ ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
+}
} // namespace testing
} // namespace logger