Fix AOS support for realtime replay

This patch fixes the use-case where you provide a single event loop and
just want to replay logged events from the perspective of that event
loop's node.

Includes a test running a ShmEventLoop against a single-node logfile,
and running a multi-node replay into a single EventLoop in
simulation (the choice of single vs multi node here is arbitrary--it
should work with both single and multi node configs in both
simulation and shm).

This has a few caveats:
* Doesn't replay remote timestamps currently.
* Doesn't correct for implied changes in node<->node offsets due to
  changes in the clock.
* Had to add a flag to choose how to manage fetcher behavior for
  messages from before the start of the log.

Change-Id: I8f101e8774e0923bacc4f7e1bf58c9da02fd0d3f
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index c1333c6..2c31e0f 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -11,6 +11,7 @@
 #include "aos/events/logging/logfile_sorting.h"
 #include "aos/events/logging/logfile_utils.h"
 #include "aos/events/logging/logger_generated.h"
+#include "aos/events/shm_event_loop.h"
 #include "aos/events/simulated_event_loop.h"
 #include "aos/network/message_bridge_server_generated.h"
 #include "aos/network/multinode_timestamp_filter.h"
@@ -92,6 +93,7 @@
   // Creates an SimulatedEventLoopFactory accessible via event_loop_factory(),
   // and then calls Register.
   void Register();
+
   // Registers callbacks for all the events after the log file starts.  This is
   // only useful when replaying live.
   void Register(EventLoop *event_loop);
@@ -285,7 +287,9 @@
   // State per node.
   class State {
    public:
-    State(std::unique_ptr<TimestampMapper> timestamp_mapper, const Node *node);
+    State(std::unique_ptr<TimestampMapper> timestamp_mapper,
+          message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
+          const Node *node);
 
     // Connects up the timestamp mappers.
     void AddPeer(State *peer);
@@ -302,7 +306,17 @@
     size_t boot_count() const {
       // If we are replaying directly into an event loop, we can't reboot.  So
       // we will stay stuck on the 0th boot.
-      if (!node_event_loop_factory_) return 0u;
+      if (!node_event_loop_factory_) {
+        if (event_loop_ == nullptr) {
+          // If boot_count is being checked after startup for any of the
+          // non-primary nodes, then returning 0 may not be accurate (since
+          // remote nodes *can* reboot even if the EventLoop being played to
+          // can't).
+          CHECK(!started_);
+          CHECK(!stopped_);
+        }
+        return 0u;
+      }
       return node_event_loop_factory_->boot_count();
     }
 
@@ -319,8 +333,10 @@
         NotifyLogfileStart();
         return;
       }
-      CHECK_GE(start_time, event_loop_->monotonic_now());
-      startup_timer_->Setup(start_time);
+      if (node_event_loop_factory_) {
+        CHECK_GE(start_time + clock_offset(), event_loop_->monotonic_now());
+      }
+      startup_timer_->Setup(start_time + clock_offset());
     }
 
     void set_startup_timer(TimerHandler *timer_handler) {
@@ -382,6 +398,7 @@
     // distributed clock.
     distributed_clock::time_point ToDistributedClock(
         monotonic_clock::time_point time) {
+      CHECK(node_event_loop_factory_);
       return node_event_loop_factory_->ToDistributedClock(time);
     }
 
@@ -415,6 +432,7 @@
 
     distributed_clock::time_point RemoteToDistributedClock(
         size_t channel_index, monotonic_clock::time_point time) {
+      CHECK(node_event_loop_factory_);
       return channel_source_state_[channel_index]
           ->node_event_loop_factory_->ToDistributedClock(time);
     }
@@ -425,7 +443,7 @@
     }
 
     monotonic_clock::time_point monotonic_now() const {
-      return node_event_loop_factory_->monotonic_now();
+      return event_loop_->monotonic_now();
     }
 
     // Sets the number of channels.
@@ -487,12 +505,15 @@
 
     // Sets the next wakeup time on the replay callback.
     void Setup(monotonic_clock::time_point next_time) {
-      timer_handler_->Setup(next_time);
+      timer_handler_->Setup(next_time + clock_offset());
     }
 
     // Sends a buffer on the provided channel index.
     bool Send(const TimestampedMessage &timestamped_message);
 
+    void SetClockOffset();
+    std::chrono::nanoseconds clock_offset() const { return clock_offset_; }
+
     // Returns a debug string for the channel merger.
     std::string DebugString() const {
       if (!timestamp_mapper_) {
@@ -582,6 +603,7 @@
     // going between 2 nodes.  The second element in the tuple indicates if this
     // is the primary direction or not.
     std::vector<message_bridge::NoncausalOffsetEstimator *> filters_;
+    message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters_;
 
     // List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
     // channel) which correspond to the originating node.
@@ -598,6 +620,11 @@
     absl::btree_map<const Channel *, std::shared_ptr<RemoteMessageSender>>
         timestamp_loggers_;
 
+    // Time offset between the log's monotonic clock and the current event
+    // loop's monotonic clock.  Useful when replaying logs with non-simulated
+    // event loops.
+    std::chrono::nanoseconds clock_offset_{0};
+
     std::vector<std::function<void()>> on_starts_;
     std::vector<std::function<void()>> on_ends_;