Fix AOS support for realtime replay
This patch fixes the use case where you provide a single event loop and
just want to replay logged events from the perspective of that event
loop's node.
Includes tests that run a ShmEventLoop against a single-node logfile
and that replay a multi-node log into a single EventLoop in
simulation. (The single- vs. multi-node choice in each test is
arbitrary; replay should work with both single- and multi-node
configs in both simulation and shm.)
This has a few caveats:
* Does not currently replay remote timestamps.
* Does not correct for the implied changes in node<->node offsets
caused by changes in the clock.
* Adds a flag to choose how fetchers behave for messages from
before the start of the log.
Change-Id: I8f101e8774e0923bacc4f7e1bf58c9da02fd0d3f
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index c1333c6..2c31e0f 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -11,6 +11,7 @@
#include "aos/events/logging/logfile_sorting.h"
#include "aos/events/logging/logfile_utils.h"
#include "aos/events/logging/logger_generated.h"
+#include "aos/events/shm_event_loop.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/multinode_timestamp_filter.h"
@@ -92,6 +93,7 @@
// Creates an SimulatedEventLoopFactory accessible via event_loop_factory(),
// and then calls Register.
void Register();
+
// Registers callbacks for all the events after the log file starts. This is
// only useful when replaying live.
void Register(EventLoop *event_loop);
@@ -285,7 +287,9 @@
// State per node.
class State {
public:
- State(std::unique_ptr<TimestampMapper> timestamp_mapper, const Node *node);
+ State(std::unique_ptr<TimestampMapper> timestamp_mapper,
+ message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
+ const Node *node);
// Connects up the timestamp mappers.
void AddPeer(State *peer);
@@ -302,7 +306,17 @@
size_t boot_count() const {
// If we are replaying directly into an event loop, we can't reboot. So
// we will stay stuck on the 0th boot.
- if (!node_event_loop_factory_) return 0u;
+ if (!node_event_loop_factory_) {
+ if (event_loop_ == nullptr) {
+ // No EventLoop here means a node other than the one replayed into.
+ // Such remote nodes *can* reboot even though that EventLoop can't,
+ // so 0 is only trustworthy before replay begins; the CHECKs below
+ // abort if this is queried after start or stop.
+ CHECK(!started_);
+ CHECK(!stopped_);
+ }
+ return 0u;
+ }
return node_event_loop_factory_->boot_count();
}
@@ -319,8 +333,10 @@
NotifyLogfileStart();
return;
}
- CHECK_GE(start_time, event_loop_->monotonic_now());
- startup_timer_->Setup(start_time);
+ if (node_event_loop_factory_) {
+ CHECK_GE(start_time + clock_offset(), event_loop_->monotonic_now());
+ }
+ startup_timer_->Setup(start_time + clock_offset());
}
void set_startup_timer(TimerHandler *timer_handler) {
@@ -382,6 +398,7 @@
// distributed clock.
distributed_clock::time_point ToDistributedClock(
monotonic_clock::time_point time) {
+ CHECK(node_event_loop_factory_);
return node_event_loop_factory_->ToDistributedClock(time);
}
@@ -415,6 +432,7 @@
distributed_clock::time_point RemoteToDistributedClock(
size_t channel_index, monotonic_clock::time_point time) {
+ CHECK(node_event_loop_factory_ && channel_source_state_[channel_index]->node_event_loop_factory_);
return channel_source_state_[channel_index]
->node_event_loop_factory_->ToDistributedClock(time);
}
@@ -425,7 +443,7 @@
}
monotonic_clock::time_point monotonic_now() const {
- return node_event_loop_factory_->monotonic_now();
+ return event_loop_->monotonic_now();
}
// Sets the number of channels.
@@ -487,12 +505,15 @@
// Sets the next wakeup time on the replay callback.
void Setup(monotonic_clock::time_point next_time) {
- timer_handler_->Setup(next_time);
+ timer_handler_->Setup(next_time + clock_offset());
}
// Sends a buffer on the provided channel index.
bool Send(const TimestampedMessage &timestamped_message);
+ void SetClockOffset();
+ std::chrono::nanoseconds clock_offset() const { return clock_offset_; }
+
// Returns a debug string for the channel merger.
std::string DebugString() const {
if (!timestamp_mapper_) {
@@ -582,6 +603,7 @@
// going between 2 nodes. The second element in the tuple indicates if this
// is the primary direction or not.
std::vector<message_bridge::NoncausalOffsetEstimator *> filters_;
+ message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters_;
// List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
// channel) which correspond to the originating node.
@@ -598,6 +620,11 @@
absl::btree_map<const Channel *, std::shared_ptr<RemoteMessageSender>>
timestamp_loggers_;
+ // Time offset between the log's monotonic clock and the current event
+ // loop's monotonic clock. Useful when replaying logs with non-simulated
+ // event loops.
+ std::chrono::nanoseconds clock_offset_{0};
+
std::vector<std::function<void()>> on_starts_;
std::vector<std::function<void()>> on_ends_;