Support nodes rebooting
We have log files which span a reboot. We want to be able to replay the
timeline across that reboot so we can run simulations and do everything
else we normally do with a log.
Unfortunately, this requires changes in a lot of places.
The most visible change is that we need to be able to "reboot" a node.
This means we need a way to start it up and shut it down. NodeEventLoopFactory
now has OnStartup and OnShutdown handlers for this purpose, along with better
application context tracking, which makes it easier to start and stop
applications through a virtual starter.
This requires LogReader and the simulated network bridge to be
refactored to support nodes coming and going while the main application
continues to run.
Beyond that, the rest is a large amount of plumbing to carry BootTimestamp
through everything, stopping just short of the user. Boot UUIDs were put in
TimeConverter so that everything related to rebooting lives in one place.
Change-Id: I2cfb659c5764c1dd80dc66f33cfab3937159e324
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 195d442..226c084 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -435,25 +435,19 @@
absl::StrCat("aos/events/logging/", GetParam().config)))),
time_converter_(configuration::NodesCount(&config_.message())),
event_loop_factory_(&config_.message()),
- pi1_(
- configuration::GetNode(event_loop_factory_.configuration(), "pi1")),
+ pi1_(event_loop_factory_.GetNodeEventLoopFactory("pi1")),
pi1_index_(configuration::GetNodeIndex(
- event_loop_factory_.configuration(), pi1_)),
- pi2_(
- configuration::GetNode(event_loop_factory_.configuration(), "pi2")),
+ event_loop_factory_.configuration(), pi1_->node())),
+ pi2_(event_loop_factory_.GetNodeEventLoopFactory("pi2")),
pi2_index_(configuration::GetNodeIndex(
- event_loop_factory_.configuration(), pi2_)),
+ event_loop_factory_.configuration(), pi2_->node())),
tmp_dir_(aos::testing::TestTmpDir()),
logfile_base1_(tmp_dir_ + "/multi_logfile1"),
logfile_base2_(tmp_dir_ + "/multi_logfile2"),
pi1_reboot_logfiles_(MakePi1RebootLogfiles()),
logfiles_(MakeLogFiles(logfile_base1_, logfile_base2_)),
pi1_single_direction_logfiles_(MakePi1SingleDirectionLogfiles()),
- structured_logfiles_(StructureLogFiles()),
- ping_event_loop_(event_loop_factory_.MakeEventLoop("ping", pi1_)),
- ping_(ping_event_loop_.get()),
- pong_event_loop_(event_loop_factory_.MakeEventLoop("pong", pi2_)),
- pong_(pong_event_loop_.get()) {
+ structured_logfiles_(StructureLogFiles()) {
LOG(INFO) << "Config " << GetParam().config;
event_loop_factory_.SetTimeConverter(&time_converter_);
@@ -474,6 +468,9 @@
LOG(INFO) << "Logging data to " << logfiles_[0] << ", " << logfiles_[1]
<< " and " << logfiles_[2];
+
+ pi1_->OnStartup([this]() { pi1_->AlwaysStart<Ping>("ping"); });
+ pi2_->OnStartup([this]() { pi2_->AlwaysStart<Pong>("pong"); });
}
bool shared() const { return GetParam().shared; }
@@ -559,13 +556,14 @@
result.emplace_back(logfile_base1_ + "_pi1_data.part0.bfbs");
result.emplace_back(logfile_base1_ + "_pi1_data.part1.bfbs");
result.emplace_back(logfile_base1_ + "_pi1_data.part2.bfbs");
- result.emplace_back(logfile_base1_ + "_pi1_data.part3.bfbs");
result.emplace_back(logfile_base1_ +
"_pi2_data/test/aos.examples.Pong.part0.bfbs");
result.emplace_back(logfile_base1_ +
"_pi2_data/test/aos.examples.Pong.part1.bfbs");
result.emplace_back(logfile_base1_ +
"_pi2_data/test/aos.examples.Pong.part2.bfbs");
+ result.emplace_back(logfile_base1_ +
+ "_pi2_data/test/aos.examples.Pong.part3.bfbs");
result.emplace_back(
logfile_base1_ +
"_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part0.bfbs");
@@ -576,6 +574,9 @@
logfile_base1_ +
"_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part2.bfbs");
result.emplace_back(
+ logfile_base1_ +
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part3.bfbs");
+ result.emplace_back(
absl::StrCat(logfile_base1_, "_", GetParam().sha256, ".bfbs"));
if (shared()) {
result.emplace_back(logfile_base1_ +
@@ -587,6 +588,9 @@
result.emplace_back(logfile_base1_ +
"_timestamps/pi1/aos/remote_timestamps/pi2/"
"aos.message_bridge.RemoteMessage.part2.bfbs");
+ result.emplace_back(logfile_base1_ +
+ "_timestamps/pi1/aos/remote_timestamps/pi2/"
+ "aos.message_bridge.RemoteMessage.part3.bfbs");
} else {
result.emplace_back(logfile_base1_ +
"_timestamps/pi1/aos/remote_timestamps/pi2/pi1/aos/"
@@ -600,6 +604,10 @@
"_timestamps/pi1/aos/remote_timestamps/pi2/pi1/aos/"
"aos-message_bridge-Timestamp/"
"aos.message_bridge.RemoteMessage.part2.bfbs");
+ result.emplace_back(logfile_base1_ +
+ "_timestamps/pi1/aos/remote_timestamps/pi2/pi1/aos/"
+ "aos-message_bridge-Timestamp/"
+ "aos.message_bridge.RemoteMessage.part3.bfbs");
result.emplace_back(logfile_base1_ +
"_timestamps/pi1/aos/remote_timestamps/pi2/test/"
@@ -613,6 +621,10 @@
"_timestamps/pi1/aos/remote_timestamps/pi2/test/"
"aos-examples-Ping/"
"aos.message_bridge.RemoteMessage.part2.bfbs");
+ result.emplace_back(logfile_base1_ +
+ "_timestamps/pi1/aos/remote_timestamps/pi2/test/"
+ "aos-examples-Ping/"
+ "aos.message_bridge.RemoteMessage.part3.bfbs");
}
return result;
}
@@ -663,7 +675,7 @@
MultiNodeLogNamer *log_namer;
};
- LoggerState MakeLogger(const Node *node,
+ LoggerState MakeLogger(NodeEventLoopFactory *node,
SimulatedEventLoopFactory *factory = nullptr,
const Configuration *configuration = nullptr) {
if (factory == nullptr) {
@@ -672,13 +684,11 @@
if (configuration == nullptr) {
configuration = factory->configuration();
}
- return {
- factory->MakeEventLoop(
- "logger", configuration::GetNode(factory->configuration(), node)),
- {},
- configuration,
- node,
- nullptr};
+ return {node->MakeEventLoop("logger"),
+ {},
+ configuration,
+ configuration::GetNode(configuration, node->node()),
+ nullptr};
}
void StartLogger(LoggerState *logger, std::string logfile_base = "",
@@ -820,9 +830,9 @@
message_bridge::TestingTimeConverter time_converter_;
SimulatedEventLoopFactory event_loop_factory_;
- const Node *const pi1_;
+ NodeEventLoopFactory *const pi1_;
const size_t pi1_index_;
- const Node *const pi2_;
+ NodeEventLoopFactory *const pi2_;
const size_t pi2_index_;
std::string tmp_dir_;
@@ -833,11 +843,6 @@
std::vector<std::string> pi1_single_direction_logfiles_;
std::vector<std::vector<std::string>> structured_logfiles_;
-
- std::unique_ptr<EventLoop> ping_event_loop_;
- Ping ping_;
- std::unique_ptr<EventLoop> pong_event_loop_;
- Pong pong_;
};
// Counts the number of messages on a channel. Returns (channel name, channel
@@ -1490,17 +1495,13 @@
{
LoggerState pi2_logger = MakeLogger(pi2_);
- NodeEventLoopFactory *pi1 =
- event_loop_factory_.GetNodeEventLoopFactory(pi1_);
- NodeEventLoopFactory *pi2 =
- event_loop_factory_.GetNodeEventLoopFactory(pi2_);
- LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
- << pi2->realtime_now() << " distributed "
- << pi2->ToDistributedClock(pi2->monotonic_now());
+ LOG(INFO) << "pi2 times: " << pi2_->monotonic_now() << " "
+ << pi2_->realtime_now() << " distributed "
+ << pi2_->ToDistributedClock(pi2_->monotonic_now());
- LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
- << pi2->realtime_now() << " distributed "
- << pi2->ToDistributedClock(pi2->monotonic_now());
+ LOG(INFO) << "pi2_ times: " << pi2_->monotonic_now() << " "
+ << pi2_->realtime_now() << " distributed "
+ << pi2_->ToDistributedClock(pi2_->monotonic_now());
event_loop_factory_.RunFor(startup_sleep1);
@@ -1519,7 +1520,7 @@
// than the network delay. This confirms that if we sort incorrectly, it
// would show in the results.
EXPECT_LT(
- (pi2->monotonic_now() - pi1->monotonic_now()) - initial_pi2_offset,
+ (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
-event_loop_factory_.send_delay() -
event_loop_factory_.network_delay());
@@ -1528,7 +1529,7 @@
// And now check that we went far enough the other way to make sure we
// cover both problems.
EXPECT_GT(
- (pi2->monotonic_now() - pi1->monotonic_now()) - initial_pi2_offset,
+ (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
event_loop_factory_.send_delay() +
event_loop_factory_.network_delay());
}
@@ -2140,11 +2141,9 @@
// And confirm we can re-create a log again, while checking the contents.
{
LoggerState pi1_logger = MakeLogger(
- configuration::GetNode(log_reader_factory.configuration(), pi1_),
- &log_reader_factory);
+ log_reader_factory.GetNodeEventLoopFactory("pi1"), &log_reader_factory);
LoggerState pi2_logger = MakeLogger(
- configuration::GetNode(log_reader_factory.configuration(), pi2_),
- &log_reader_factory);
+ log_reader_factory.GetNodeEventLoopFactory("pi2"), &log_reader_factory);
StartLogger(&pi1_logger, tmp_dir_ + "/relogged1");
StartLogger(&pi2_logger, tmp_dir_ + "/relogged2");
@@ -2234,8 +2233,7 @@
// Test that renaming the file base dies.
TEST_P(MultinodeLoggerDeathTest, LoggerRenameFile) {
time_converter_.AddMonotonic(
- {BootTimestamp::epoch(),
- BootTimestamp::epoch() + chrono::seconds(1000)});
+ {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
util::UnlinkRecursive(tmp_dir_ + "/renamefile");
logfile_base1_ = tmp_dir_ + "/renamefile/multi_logfile1";
logfile_base2_ = tmp_dir_ + "/renamefile/multi_logfile2";
@@ -2254,29 +2252,50 @@
// Tests that we properly recreate forwarded timestamps when replaying a log.
// This should be enough that we can then re-run the logger and get a valid log
// back.
-TEST_P(MultinodeLoggerDeathTest, RemoteReboot) {
- time_converter_.StartEqual();
- std::string pi2_boot1;
- std::string pi2_boot2;
+TEST_P(MultinodeLoggerTest, RemoteReboot) {
+ const UUID pi1_boot0 = UUID::Random();
+ const UUID pi2_boot0 = UUID::Random();
+ const UUID pi2_boot1 = UUID::Random();
{
- pi2_boot1 = event_loop_factory_.GetNodeEventLoopFactory(pi2_)
- ->boot_uuid()
- .ToString();
+ CHECK_EQ(pi1_index_, 0u);
+ CHECK_EQ(pi2_index_, 1u);
+
+ time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
+ time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
+ time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
+
+ time_converter_.AddNextTimestamp(
+ distributed_clock::epoch(),
+ {BootTimestamp::epoch(), BootTimestamp::epoch()});
+ const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
+ time_converter_.AddNextTimestamp(
+ distributed_clock::epoch() + reboot_time,
+ {BootTimestamp::epoch() + reboot_time,
+ BootTimestamp{
+ .boot = 1,
+ .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
+ }
+
+ {
LoggerState pi1_logger = MakeLogger(pi1_);
event_loop_factory_.RunFor(chrono::milliseconds(95));
+ EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
+ pi1_boot0);
+ EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
+ pi2_boot0);
StartLogger(&pi1_logger);
event_loop_factory_.RunFor(chrono::milliseconds(10000));
- event_loop_factory_.GetNodeEventLoopFactory(pi2_)->Reboot();
-
- pi2_boot2 = event_loop_factory_.GetNodeEventLoopFactory(pi2_)
- ->boot_uuid()
- .ToString();
+ VLOG(1) << "Reboot now!";
event_loop_factory_.RunFor(chrono::milliseconds(20000));
+ EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
+ pi1_boot0);
+ EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
+ pi2_boot1);
}
// Confirm that our new oldest timestamps properly update as we reboot and
@@ -2289,7 +2308,36 @@
continue;
}
+ const monotonic_clock::time_point monotonic_start_time =
+ monotonic_clock::time_point(
+ chrono::nanoseconds(log_header->message().monotonic_start_time()));
+ const UUID source_node_boot_uuid = UUID::FromString(
+ log_header->message().source_node_boot_uuid()->string_view());
+
if (log_header->message().node()->name()->string_view() != "pi1") {
+ switch (log_header->message().parts_index()) {
+ case 0:
+ EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
+ EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
+ break;
+ case 1:
+ EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
+ ASSERT_EQ(monotonic_start_time,
+ monotonic_clock::epoch() + chrono::seconds(1));
+ break;
+ case 2:
+ EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
+ EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
+ break;
+ case 3:
+ EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
+ ASSERT_EQ(monotonic_start_time,
+ monotonic_clock::epoch() + chrono::nanoseconds(2322999462));
+ break;
+ default:
+ FAIL();
+ break;
+ }
continue;
}
SCOPED_TRACE(file);
@@ -2368,21 +2416,13 @@
break;
case 2:
EXPECT_EQ(oldest_remote_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(10100000)));
+ monotonic_clock::time_point(chrono::milliseconds(1323) +
+ chrono::microseconds(200)));
EXPECT_EQ(oldest_local_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(10100150)));
+ monotonic_clock::time_point(chrono::microseconds(10100350)));
EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
- monotonic_clock::max_time);
- EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
- monotonic_clock::max_time);
- break;
- case 3:
- EXPECT_EQ(oldest_remote_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(10100000)));
- EXPECT_EQ(oldest_local_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(10100150)));
- EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(10100200)));
+ monotonic_clock::time_point(chrono::milliseconds(1323) +
+ chrono::microseconds(200)));
EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
monotonic_clock::time_point(chrono::microseconds(10100350)));
break;
@@ -2393,25 +2433,26 @@
}
// Confirm that we refuse to replay logs with missing boot uuids.
- EXPECT_DEATH(
- {
- LogReader reader(SortParts(pi1_reboot_logfiles_));
+ {
+ LogReader reader(SortParts(pi1_reboot_logfiles_));
- SimulatedEventLoopFactory log_reader_factory(reader.configuration());
- log_reader_factory.set_send_delay(chrono::microseconds(0));
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+ log_reader_factory.set_send_delay(chrono::microseconds(0));
- // This sends out the fetched messages and advances time to the start of
- // the log file.
- reader.Register(&log_reader_factory);
- },
- absl::StrFormat("(%s|%s).*(%s|%s).*Found parts from different boots",
- pi2_boot1, pi2_boot2, pi2_boot2, pi2_boot1));
+ // This sends out the fetched messages and advances time to the start of
+ // the log file.
+ reader.Register(&log_reader_factory);
+
+ log_reader_factory.Run();
+
+ reader.Deregister();
+ }
}
// Tests that we properly handle one direction of message_bridge being
// unavailable.
TEST_P(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
- event_loop_factory_.GetNodeEventLoopFactory(pi1_)->Disconnect(pi2_);
+ pi1_->Disconnect(pi2_->node());
time_converter_.AddMonotonic(
{BootTimestamp::epoch(),
BootTimestamp::epoch() + chrono::seconds(1000)});
@@ -2437,7 +2478,7 @@
// Tests that we properly handle one direction of message_bridge being
// unavailable.
TEST_P(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
- event_loop_factory_.GetNodeEventLoopFactory(pi1_)->Disconnect(pi2_);
+ pi1_->Disconnect(pi2_->node());
time_converter_.AddMonotonic(
{BootTimestamp::epoch(),
BootTimestamp::epoch() + chrono::seconds(500)});
@@ -2463,8 +2504,8 @@
// Tests that we properly handle a dead node. Do this by just disconnecting it
// and only using one nodes of logs.
TEST_P(MultinodeLoggerTest, DeadNode) {
- event_loop_factory_.GetNodeEventLoopFactory(pi1_)->Disconnect(pi2_);
- event_loop_factory_.GetNodeEventLoopFactory(pi2_)->Disconnect(pi1_);
+ pi1_->Disconnect(pi2_->node());
+ pi2_->Disconnect(pi1_->node());
time_converter_.AddMonotonic(
{BootTimestamp::epoch(),
BootTimestamp::epoch() + chrono::seconds(1000)});
@@ -2552,10 +2593,10 @@
std::vector<std::string> log_files;
{
LoggerState pi1_logger =
- MakeLogger(configuration::GetNode(reader.logged_configuration(), pi1_),
+ MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
&log_reader_factory, reader.logged_configuration());
LoggerState pi2_logger =
- MakeLogger(configuration::GetNode(reader.logged_configuration(), pi2_),
+ MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
&log_reader_factory, reader.logged_configuration());
StartLogger(&pi1_logger, tmp_dir_ + "/relogged1");