Support nodes rebooting
We have log files that span a reboot. We want to be able to replay the
timeline across that reboot so that we can run simulations and do
everything else we normally do with a log.
Unfortunately, this requires a lot of changes.
The most visible change is that we need to be able to "reboot" a node.
This means we need a way to start it up and shut it down. There are
now OnStartup and OnShutdown handlers in NodeEventLoopFactory to serve
this purpose, and better application context tracking to make it easier
to start and stop applications through a virtual starter.
This requires LogReader and the simulated network bridge to be
refactored to support nodes coming and going while the main application
continues to run.
Beyond that, the rest of the change is a large amount of plumbing to
carry BootTimestamp through everything, stopping just short of the
user-facing APIs. Boot UUIDs were placed in TimeConverter so that
everything related to rebooting lives in one place.
Change-Id: I2cfb659c5764c1dd80dc66f33cfab3937159e324
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 5cfb5df..8293956 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -73,6 +73,10 @@
// below, but can be anything as long as the locations needed to send
// everything are available.
void Register(SimulatedEventLoopFactory *event_loop_factory);
+ // Registers all the callbacks to send the log file data out to an event loop
+ // factory. This does not start replaying or change the current distributed
+ // time of the factory. It does change the monotonic clocks to be right.
+ void RegisterWithoutStarting(SimulatedEventLoopFactory *event_loop_factory);
// Creates an SimulatedEventLoopFactory accessible via event_loop_factory(),
// and then calls Register.
void Register();
@@ -80,6 +84,13 @@
// only useful when replaying live.
void Register(EventLoop *event_loop);
+ // Called whenever a log file starts for a node.
+ void OnStart(std::function<void()> fn);
+ void OnStart(const Node *node, std::function<void()> fn);
+ // Called whenever a log file ends for a node.
+ void OnEnd(std::function<void()> fn);
+ void OnEnd(const Node *node, std::function<void()> fn);
+
// Unregisters the senders. You only need to call this if you separately
// supplied an event loop or event loop factory and the lifetimes are such
// that they need to be explicitly destroyed before the LogReader destructor
@@ -175,7 +186,11 @@
}
private:
- const Channel *RemapChannel(const EventLoop *event_loop,
+ void Register(EventLoop *event_loop, const Node *node);
+
+ void RegisterDuringStartup(EventLoop *event_loop, const Node *node);
+
+ const Channel *RemapChannel(const EventLoop *event_loop, const Node *node,
const Channel *channel);
// Queues at least max_out_of_order_duration_ messages into channels_.
@@ -206,7 +221,7 @@
// send it immediately.
void Send(
FlatbufferDetachedBuffer<message_bridge::RemoteMessage> remote_message,
- monotonic_clock::time_point monotonic_timestamp_time);
+ BootTimestamp monotonic_timestamp_time, size_t source_boot_count);
private:
// Handles actually sending the timestamp if we were delayed.
@@ -239,7 +254,7 @@
// State per node.
class State {
public:
- State(std::unique_ptr<TimestampMapper> timestamp_mapper);
+ State(std::unique_ptr<TimestampMapper> timestamp_mapper, const Node *node);
// Connects up the timestamp mappers.
void AddPeer(State *peer);
@@ -251,12 +266,44 @@
TimestampedMessage PopOldest();
// Returns the monotonic time of the oldest message.
- monotonic_clock::time_point OldestMessageTime() const;
+ BootTimestamp OldestMessageTime() const;
+
+ size_t boot_count() const {
+ // If we are replaying directly into an event loop, we can't reboot. So
+ // we will stay stuck on the 0th boot.
+ if (!node_event_loop_factory_) return 0u;
+ return node_event_loop_factory_->boot_count();
+ }
// Primes the queues inside State. Should be called before calling
// OldestMessageTime.
void SeedSortedMessages();
+ void SetupStartupTimer() {
+ const monotonic_clock::time_point start_time =
+ monotonic_start_time(boot_count());
+ if (start_time == monotonic_clock::min_time) {
+ LOG(ERROR)
+ << "No start time, skipping, please figure out when this happens";
+ RunOnStart();
+ return;
+ }
+ CHECK_GT(start_time, event_loop_->monotonic_now());
+ startup_timer_->Setup(start_time);
+ }
+
+ void set_startup_timer(TimerHandler *timer_handler) {
+ startup_timer_ = timer_handler;
+ if (startup_timer_) {
+ if (event_loop_->node() != nullptr) {
+ startup_timer_->set_name(absl::StrCat(
+ event_loop_->node()->name()->string_view(), "_startup"));
+ } else {
+ startup_timer_->set_name("startup");
+ }
+ }
+ }
+
// Returns the starting time for this node.
monotonic_clock::time_point monotonic_start_time(size_t boot_count) const {
return timestamp_mapper_
@@ -271,13 +318,19 @@
// Sets the node event loop factory for replaying into a
// SimulatedEventLoopFactory. Returns the EventLoop to use.
- EventLoop *SetNodeEventLoopFactory(
- NodeEventLoopFactory *node_event_loop_factory);
+ void SetNodeEventLoopFactory(NodeEventLoopFactory *node_event_loop_factory);
// Sets and gets the event loop to use.
void set_event_loop(EventLoop *event_loop) { event_loop_ = event_loop; }
EventLoop *event_loop() { return event_loop_; }
+ const Node *node() const { return node_; }
+
+ void Register(EventLoop *event_loop);
+
+ void OnStart(std::function<void()> fn);
+ void OnEnd(std::function<void()> fn);
+
// Sets the current realtime offset from the monotonic clock for this node
// (if we are on a simulated event loop).
void SetRealtimeOffset(monotonic_clock::time_point monotonic_time,
@@ -302,19 +355,29 @@
// Returns the current time on the remote node which sends messages on
// channel_index.
- monotonic_clock::time_point monotonic_remote_now(size_t channel_index) {
- return channel_source_state_[channel_index]
- ->node_event_loop_factory_->monotonic_now();
+ BootTimestamp monotonic_remote_now(size_t channel_index) {
+ State *s = channel_source_state_[channel_index];
+ return BootTimestamp{
+ .boot = s->boot_count(),
+ .time = s->node_event_loop_factory_->monotonic_now()};
}
// Returns the start time of the remote for the provided channel.
monotonic_clock::time_point monotonic_remote_start_time(
- size_t boot_count,
- size_t channel_index) {
+ size_t boot_count, size_t channel_index) {
return channel_source_state_[channel_index]->monotonic_start_time(
boot_count);
}
+ void DestroyEventLoop() { event_loop_unique_ptr_.reset(); }
+
+ EventLoop *MakeEventLoop() {
+ CHECK(!event_loop_unique_ptr_);
+ event_loop_unique_ptr_ =
+ node_event_loop_factory_->MakeEventLoop("log_reader");
+ return event_loop_unique_ptr_.get();
+ }
+
distributed_clock::time_point RemoteToDistributedClock(
size_t channel_index, monotonic_clock::time_point time) {
return channel_source_state_[channel_index]
@@ -337,15 +400,30 @@
void SetChannel(size_t logged_channel_index, size_t factory_channel_index,
std::unique_ptr<RawSender> sender,
message_bridge::NoncausalOffsetEstimator *filter,
- RemoteMessageSender *remote_timestamp_sender,
- State *source_state);
+ bool is_forwarded, State *source_state);
+
+ void SetRemoteTimestampSender(size_t logged_channel_index,
+ RemoteMessageSender *remote_timestamp_sender);
+
+ void RunOnStart();
+ void RunOnEnd();
// Unregisters everything so we can destory the event loop.
+ // TODO(austin): Is this needed? OnShutdown should be able to serve this
+ // need.
void Deregister();
// Sets the current TimerHandle for the replay callback.
void set_timer_handler(TimerHandler *timer_handler) {
timer_handler_ = timer_handler;
+ if (timer_handler_) {
+ if (event_loop_->node() != nullptr) {
+ timer_handler_->set_name(absl::StrCat(
+ event_loop_->node()->name()->string_view(), "_main"));
+ } else {
+ timer_handler_->set_name("main");
+ }
+ }
}
// Sets the next wakeup time on the replay callback.
@@ -364,6 +442,11 @@
return timestamp_mapper_->DebugString();
}
+ void ClearRemoteTimestampSenders() {
+ channel_timestamp_loggers_.clear();
+ timestamp_loggers_.clear();
+ }
+
private:
// Log file.
std::unique_ptr<TimestampMapper> timestamp_mapper_;
@@ -408,9 +491,11 @@
NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
std::unique_ptr<EventLoop> event_loop_unique_ptr_;
// Event loop.
+ const Node *node_ = nullptr;
EventLoop *event_loop_ = nullptr;
// And timer used to send messages.
- TimerHandler *timer_handler_;
+ TimerHandler *timer_handler_ = nullptr;
+ TimerHandler *startup_timer_ = nullptr;
// Filters (or nullptr if it isn't a forwarded channel) for each channel.
// This corresponds to the object which is shared among all the channels
@@ -432,6 +517,12 @@
// is the channel that timestamps are published to.
absl::btree_map<const Channel *, std::shared_ptr<RemoteMessageSender>>
timestamp_loggers_;
+
+ std::vector<std::function<void()>> on_starts_;
+ std::vector<std::function<void()>> on_ends_;
+
+ bool stopped_ = false;
+ bool started_ = false;
};
// Node index -> State.