Fix AOS support for realtime replay

This patch fixes the use-case where you provide a single event loop and
just want to replay logged events from the perspective of that event
loop's node.

Includes a test running a ShmEventLoop against a single-node logfile,
and running a multi-node replay into a single EventLoop in
simulation (the choice of single vs multi node here is arbitrary--it
should work with both single and multi node configs in both
simulation and shm).

This has a few caveats:
* Doesn't replay remote timestamps currently.
* Doesn't correct for implied changes in node<->node offsets due to
  changes in the clock.
* Had to add a flag to choose how to manage fetcher behavior for
  messages from before the start of the log.

Change-Id: I8f101e8774e0923bacc4f7e1bf58c9da02fd0d3f
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 9f6234a..f7a5551 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -267,6 +267,7 @@
         ":logger_fbs",
         "//aos:uuid",
         "//aos/events:event_loop",
+        "//aos/events:shm_event_loop",
         "//aos/events:simulated_event_loop",
         "//aos/network:message_bridge_server_fbs",
         "//aos/network:multinode_timestamp_filter",
@@ -447,6 +448,26 @@
 )
 
 cc_test(
+    name = "realtime_replay_test",
+    srcs = ["realtime_replay_test.cc"],
+    data = [
+        "//aos/events:pingpong_config",
+    ],
+    target_compatible_with = ["@platforms//os:linux"],
+    deps = [
+        ":log_reader",
+        ":log_writer",
+        "//aos/events:ping_lib",
+        "//aos/events:pong_lib",
+        "//aos/events:shm_event_loop",
+        "//aos/events:simulated_event_loop",
+        "//aos/testing:googletest",
+        "//aos/testing:path",
+        "//aos/testing:tmpdir",
+    ],
+)
+
+cc_test(
     name = "logger_test",
     srcs = ["logger_test.cc"],
     copts = select({
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 1c3a349..a4d6cd0 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -15,6 +15,7 @@
 #include "aos/events/logging/logfile_sorting.h"
 #include "aos/events/logging/logger_generated.h"
 #include "aos/flatbuffer_merge.h"
+#include "aos/json_to_flatbuffer.h"
 #include "aos/network/multinode_timestamp_filter.h"
 #include "aos/network/remote_message_generated.h"
 #include "aos/network/remote_message_schema.h"
@@ -46,6 +47,12 @@
     end_time, "",
     "If set, end at this point in time in the log on the realtime clock.");
 
+DEFINE_bool(drop_realtime_messages_before_start, false,
+            "If set, will drop any messages sent before the start of the "
+            "logfile in realtime replay. Setting this guarantees consistency "
+            "in timing with the original logfile, but means that you lose "
+            "access to fetched low-frequency messages.");
+
 namespace aos {
 namespace configuration {
 // We don't really want to expose this publicly, but log reader doesn't really
@@ -162,6 +169,11 @@
 
   ~EventNotifier() { event_timer_->Disable(); }
 
+  // Sets the clock offset for realtime playback.
+  void SetClockOffset(std::chrono::nanoseconds clock_offset) {
+    clock_offset_ = clock_offset;
+  }
+
   // Returns the event trigger time.
   realtime_clock::time_point realtime_event_time() const {
     return realtime_event_time_;
@@ -189,7 +201,7 @@
       // Whops, time went backwards.  Just do it now.
       HandleTime();
     } else {
-      event_timer_->Setup(candidate_monotonic);
+      event_timer_->Setup(candidate_monotonic + clock_offset_);
     }
   }
 
@@ -208,6 +220,8 @@
   const realtime_clock::time_point realtime_event_time_ =
       realtime_clock::min_time;
 
+  std::chrono::nanoseconds clock_offset_{0};
+
   bool called_ = false;
 };
 
@@ -325,9 +339,7 @@
   }
 
   if (!configuration::MultiNode(configuration())) {
-    states_.emplace_back(std::make_unique<State>(
-        std::make_unique<TimestampMapper>(FilterPartsForNode(log_files_, "")),
-        nullptr));
+    states_.resize(1);
   } else {
     if (replay_configuration) {
       CHECK_EQ(logged_configuration()->nodes()->size(),
@@ -494,7 +506,7 @@
         filtered_parts.size() == 0u
             ? nullptr
             : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
-        node);
+        filters_.get(), node);
     State *state = states_[node_index].get();
     state->SetNodeEventLoopFactory(
         event_loop_factory_->GetNodeEventLoopFactory(node),
@@ -661,8 +673,58 @@
   return nullptr;
 }
 
+// TODO(jkuszmaul): Make in-line modifications to
+// ServerStatistics/ClientStatistics messages for ShmEventLoop-based replay to
+// avoid messing up anything that depends on them having valid offsets.
 void LogReader::Register(EventLoop *event_loop) {
-  Register(event_loop, event_loop->node());
+  filters_ =
+      std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
+          event_loop->configuration(), logged_configuration(),
+          log_files_[0].boots, FLAGS_skip_order_validation,
+          chrono::duration_cast<chrono::nanoseconds>(
+              chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
+
+  std::vector<TimestampMapper *> timestamp_mappers;
+  for (const Node *node : configuration::GetNodes(configuration())) {
+    const size_t node_index =
+        configuration::GetNodeIndex(configuration(), node);
+    std::vector<LogParts> filtered_parts = FilterPartsForNode(
+        log_files_, node != nullptr ? node->name()->string_view() : "");
+
+    states_[node_index] = std::make_unique<State>(
+        filtered_parts.size() == 0u
+            ? nullptr
+            : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
+        filters_.get(), node);
+    State *state = states_[node_index].get();
+
+    state->SetChannelCount(logged_configuration()->channels()->size());
+    timestamp_mappers.emplace_back(state->timestamp_mapper());
+  }
+
+  filters_->SetTimestampMappers(std::move(timestamp_mappers));
+
+  for (const Node *node : configuration::GetNodes(configuration())) {
+    const size_t node_index =
+        configuration::GetNodeIndex(configuration(), node);
+    State *state = states_[node_index].get();
+    for (const Node *other_node : configuration::GetNodes(configuration())) {
+      const size_t other_node_index =
+          configuration::GetNodeIndex(configuration(), other_node);
+      State *other_state = states_[other_node_index].get();
+      if (other_state != state) {
+        state->AddPeer(other_state);
+      }
+    }
+  }
+  for (const Node *node : configuration::GetNodes(configuration())) {
+    if (node == nullptr || node->name()->string_view() ==
+                               event_loop->node()->name()->string_view()) {
+      Register(event_loop, event_loop->node());
+    } else {
+      Register(nullptr, node);
+    }
+  }
 }
 
 void LogReader::Register(EventLoop *event_loop, const Node *node) {
@@ -674,7 +736,10 @@
   if (state->OldestMessageTime() == BootTimestamp::max_time()) {
     return;
   }
-  ++live_nodes_;
+
+  if (event_loop != nullptr) {
+    ++live_nodes_;
+  }
 
   if (event_loop_factory_ != nullptr) {
     event_loop_factory_->GetNodeEventLoopFactory(node)->OnStartup(
@@ -687,14 +752,14 @@
 }
 
 void LogReader::RegisterDuringStartup(EventLoop *event_loop, const Node *node) {
-  if (event_loop) {
+  if (event_loop != nullptr) {
     CHECK(event_loop->configuration() == configuration());
   }
 
   State *state =
       states_[configuration::GetNodeIndex(configuration(), node)].get();
 
-  if (!event_loop) {
+  if (event_loop == nullptr) {
     state->ClearTimeFlags();
   }
 
@@ -703,7 +768,7 @@
   // We don't run timing reports when trying to print out logged data, because
   // otherwise we would end up printing out the timing reports themselves...
   // This is only really relevant when we are replaying into a simulation.
-  if (event_loop) {
+  if (event_loop != nullptr) {
     event_loop->SkipTimingReport();
     event_loop->SkipAosLog();
   }
@@ -716,10 +781,10 @@
         logged_configuration()->channels()->Get(logged_channel_index));
 
     const bool logged = channel->logger() != LoggerConfig::NOT_LOGGED;
-
     message_bridge::NoncausalOffsetEstimator *filter = nullptr;
 
     State *source_state = nullptr;
+
     if (!configuration::ChannelIsSendableOnNode(channel, node) &&
         configuration::ChannelIsReadableOnNode(channel, node)) {
       const Node *source_node = configuration::GetNode(
@@ -741,7 +806,10 @@
     state->SetChannel(
         logged_channel_index,
         configuration::ChannelIndex(configuration(), channel),
-        event_loop && logged ? event_loop->MakeRawSender(channel) : nullptr,
+        event_loop && logged &&
+                configuration::ChannelIsReadableOnNode(channel, node)
+            ? event_loop->MakeRawSender(channel)
+            : nullptr,
         filter, is_forwarded, source_state);
 
     if (is_forwarded && logged) {
@@ -758,10 +826,12 @@
               states_[configuration::GetNodeIndex(
                           configuration(), connection->name()->string_view())]
                   .get();
-          destination_state->SetRemoteTimestampSender(
-              logged_channel_index,
-              event_loop ? state->RemoteTimestampSender(channel, connection)
-                         : nullptr);
+          if (destination_state) {
+            destination_state->SetRemoteTimestampSender(
+                logged_channel_index,
+                event_loop ? state->RemoteTimestampSender(channel, connection)
+                           : nullptr);
+          }
         }
       }
     }
@@ -775,14 +845,11 @@
   }
 
   state->set_timer_handler(event_loop->AddTimer([this, state]() {
-    VLOG(1) << "Starting sending " << MaybeNodeName(state->event_loop()->node())
-            << "at " << state->event_loop()->context().monotonic_event_time
-            << " now " << state->monotonic_now();
     if (state->OldestMessageTime() == BootTimestamp::max_time()) {
       --live_nodes_;
       VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
       if (exit_on_finish_ && live_nodes_ == 0) {
-        event_loop_factory_->Exit();
+        CHECK_NOTNULL(event_loop_factory_)->Exit();
       }
       return;
     }
@@ -794,32 +861,37 @@
 
     const monotonic_clock::time_point monotonic_now =
         state->event_loop()->context().monotonic_event_time;
-    if (!FLAGS_skip_order_validation) {
-      CHECK(monotonic_now == timestamped_message.monotonic_event_time.time)
-          << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
-          << monotonic_now << " trying to send "
-          << timestamped_message.monotonic_event_time << " failure "
-          << state->DebugString();
-    } else if (BootTimestamp{.boot = state->boot_count(),
-                             .time = monotonic_now} !=
-               timestamped_message.monotonic_event_time) {
-      LOG(WARNING) << "Check failed: monotonic_now == "
-                      "timestamped_message.monotonic_event_time) ("
-                   << monotonic_now << " vs. "
-                   << timestamped_message.monotonic_event_time
-                   << "): " << FlatbufferToJson(state->event_loop()->node())
-                   << " Now " << monotonic_now << " trying to send "
-                   << timestamped_message.monotonic_event_time << " failure "
-                   << state->DebugString();
+    if (event_loop_factory_ != nullptr) {
+      // Only enforce exact timing in simulation.
+      if (!FLAGS_skip_order_validation) {
+        CHECK(monotonic_now == timestamped_message.monotonic_event_time.time)
+            << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
+            << monotonic_now << " trying to send "
+            << timestamped_message.monotonic_event_time << " failure "
+            << state->DebugString();
+      } else if (BootTimestamp{.boot = state->boot_count(),
+                               .time = monotonic_now} !=
+                 timestamped_message.monotonic_event_time) {
+        LOG(WARNING) << "Check failed: monotonic_now == "
+                        "timestamped_message.monotonic_event_time) ("
+                     << monotonic_now << " vs. "
+                     << timestamped_message.monotonic_event_time
+                     << "): " << FlatbufferToJson(state->event_loop()->node())
+                     << " Now " << monotonic_now << " trying to send "
+                     << timestamped_message.monotonic_event_time << " failure "
+                     << state->DebugString();
+      }
     }
 
     if (timestamped_message.monotonic_event_time.time >
             state->monotonic_start_time(
                 timestamped_message.monotonic_event_time.boot) ||
-        event_loop_factory_ != nullptr) {
+        event_loop_factory_ != nullptr ||
+        !FLAGS_drop_realtime_messages_before_start) {
       if (timestamped_message.data != nullptr && !state->found_last_message()) {
         if (timestamped_message.monotonic_remote_time !=
-            BootTimestamp::min_time()) {
+                BootTimestamp::min_time() &&
+            !FLAGS_skip_order_validation && event_loop_factory_ != nullptr) {
           // Confirm that the message was sent on the sending node before the
           // destination node (this node).  As a proxy, do this by making sure
           // that time on the source node is past when the message was sent.
@@ -890,7 +962,8 @@
                                  timestamped_message.realtime_event_time);
 
         VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Sending "
-                << timestamped_message.monotonic_event_time;
+                << timestamped_message.monotonic_event_time << " "
+                << state->DebugString();
         // TODO(austin): std::move channel_data in and make that efficient in
         // simulation.
         state->Send(std::move(timestamped_message));
@@ -982,13 +1055,13 @@
         }
       }
     } else {
-      LOG(WARNING) << "Not sending data from before the start of the log file. "
-                   << timestamped_message.monotonic_event_time.time
-                          .time_since_epoch()
-                          .count()
-                   << " start "
-                   << monotonic_start_time().time_since_epoch().count() << " "
-                   << *timestamped_message.data;
+      LOG(WARNING)
+          << "Not sending data from before the start of the log file. "
+          << timestamped_message.monotonic_event_time.time.time_since_epoch()
+                 .count()
+          << " start "
+          << monotonic_start_time(state->node()).time_since_epoch().count()
+          << " timestamped_message.data is null";
     }
 
     const BootTimestamp next_time = state->OldestMessageTime();
@@ -1002,18 +1075,26 @@
         state->NotifyLogfileEnd();
         return;
       }
-      VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
-              << "wakeup for " << next_time.time << "("
-              << state->ToDistributedClock(next_time.time)
-              << " distributed), now is " << state->monotonic_now();
+      if (event_loop_factory_ != nullptr) {
+        VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
+                << "wakeup for " << next_time.time << "("
+                << state->ToDistributedClock(next_time.time)
+                << " distributed), now is " << state->monotonic_now();
+      } else {
+        VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
+                << "wakeup for " << next_time.time << ", now is "
+                << state->monotonic_now();
+      }
       state->Setup(next_time.time);
     } else {
       VLOG(1) << MaybeNodeName(state->event_loop()->node())
               << "No next message, scheduling shutdown";
       state->NotifyLogfileEnd();
       // Set a timer up immediately after now to die. If we don't do this,
-      // then the senders waiting on the message we just read will never get
+      // then the watchers waiting on the message we just read will never get
       // called.
+      // Doesn't apply to single-EventLoop replay since the watchers in question
+      // are not under our control.
       if (event_loop_factory_ != nullptr) {
         state->Setup(monotonic_now + event_loop_factory_->send_delay() +
                      std::chrono::nanoseconds(1));
@@ -1037,6 +1118,7 @@
     event_loop->OnRun([state]() {
       BootTimestamp next_time = state->OldestMessageTime();
       CHECK_EQ(next_time.boot, state->boot_count());
+      state->SetClockOffset();
       state->Setup(next_time.time);
       state->SetupStartupTimer();
     });
@@ -1451,9 +1533,13 @@
   return remapped_channel;
 }
 
-LogReader::State::State(std::unique_ptr<TimestampMapper> timestamp_mapper,
-                        const Node *node)
-    : timestamp_mapper_(std::move(timestamp_mapper)), node_(node) {}
+LogReader::State::State(
+    std::unique_ptr<TimestampMapper> timestamp_mapper,
+    message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
+    const Node *node)
+    : timestamp_mapper_(std::move(timestamp_mapper)),
+      node_(node),
+      multinode_filters_(multinode_filters) {}
 
 void LogReader::State::AddPeer(State *peer) {
   if (timestamp_mapper_ && peer->timestamp_mapper_) {
@@ -1572,6 +1658,23 @@
              source_state->boot_count());
   }
 
+  if (event_loop_factory_ != nullptr &&
+      channel_source_state_[timestamped_message.channel_index] != nullptr &&
+      multinode_filters_ != nullptr) {
+    // Sanity check that we are using consistent boot uuids.
+    State *source_state =
+        channel_source_state_[timestamped_message.channel_index];
+    CHECK_EQ(multinode_filters_->boot_uuid(
+                 configuration::GetNodeIndex(event_loop_->configuration(),
+                                             source_state->node()),
+                 timestamped_message.monotonic_remote_time.boot),
+             CHECK_NOTNULL(
+                 CHECK_NOTNULL(
+                     channel_source_state_[timestamped_message.channel_index])
+                     ->event_loop_)
+                 ->boot_uuid());
+  }
+
   // Send!  Use the replayed queue index here instead of the logged queue index
   // for the remote queue index.  This makes re-logging work.
   const auto err = sender->Send(
@@ -1580,9 +1683,13 @@
       timestamped_message.monotonic_remote_time.time,
       timestamped_message.realtime_remote_time, remote_queue_index,
       (channel_source_state_[timestamped_message.channel_index] != nullptr
-           ? CHECK_NOTNULL(
-                 channel_source_state_[timestamped_message.channel_index])
-                 ->event_loop_->boot_uuid()
+           ? CHECK_NOTNULL(multinode_filters_)
+                 ->boot_uuid(configuration::GetNodeIndex(
+                                 event_loop_->configuration(),
+                                 channel_source_state_[timestamped_message
+                                                           .channel_index]
+                                     ->node()),
+                             timestamped_message.monotonic_remote_time.boot)
            : event_loop_->boot_uuid()));
   if (err != RawSender::Error::kOk) return false;
 
@@ -1629,6 +1736,11 @@
     // map.
   } else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
              nullptr) {
+    // TODO(james): Currently, If running replay against a single event loop,
+    // remote timestamps will not get replayed because this code-path only
+    // gets triggered on the event loop that receives the forwarded message
+    // that the timestamps correspond to. This code, as written, also doesn't
+    // correctly handle a non-zero clock_offset for the *_remote_time fields.
     State *source_state =
         CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index]);
 
@@ -1934,5 +2046,20 @@
   }
 }
 
+void LogReader::State::SetClockOffset() {
+  if (node_event_loop_factory_ == nullptr) {
+    // If not running with simulated event loop, set the monotonic clock
+    // offset.
+    clock_offset_ = event_loop()->monotonic_now() - monotonic_start_time(0);
+
+    if (start_event_notifier_) {
+      start_event_notifier_->SetClockOffset(clock_offset_);
+    }
+    if (end_event_notifier_) {
+      end_event_notifier_->SetClockOffset(clock_offset_);
+    }
+  }
+}
+
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index c1333c6..2c31e0f 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -11,6 +11,7 @@
 #include "aos/events/logging/logfile_sorting.h"
 #include "aos/events/logging/logfile_utils.h"
 #include "aos/events/logging/logger_generated.h"
+#include "aos/events/shm_event_loop.h"
 #include "aos/events/simulated_event_loop.h"
 #include "aos/network/message_bridge_server_generated.h"
 #include "aos/network/multinode_timestamp_filter.h"
@@ -92,6 +93,7 @@
   // Creates an SimulatedEventLoopFactory accessible via event_loop_factory(),
   // and then calls Register.
   void Register();
+
   // Registers callbacks for all the events after the log file starts.  This is
   // only useful when replaying live.
   void Register(EventLoop *event_loop);
@@ -285,7 +287,9 @@
   // State per node.
   class State {
    public:
-    State(std::unique_ptr<TimestampMapper> timestamp_mapper, const Node *node);
+    State(std::unique_ptr<TimestampMapper> timestamp_mapper,
+          message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
+          const Node *node);
 
     // Connects up the timestamp mappers.
     void AddPeer(State *peer);
@@ -302,7 +306,17 @@
     size_t boot_count() const {
       // If we are replaying directly into an event loop, we can't reboot.  So
       // we will stay stuck on the 0th boot.
-      if (!node_event_loop_factory_) return 0u;
+      if (!node_event_loop_factory_) {
+        if (event_loop_ == nullptr) {
+          // If boot_count is being checked after startup for any of the
+          // non-primary nodes, then returning 0 may not be accurate (since
+          // remote nodes *can* reboot even if the EventLoop being played to
+          // can't).
+          CHECK(!started_);
+          CHECK(!stopped_);
+        }
+        return 0u;
+      }
       return node_event_loop_factory_->boot_count();
     }
 
@@ -319,8 +333,10 @@
         NotifyLogfileStart();
         return;
       }
-      CHECK_GE(start_time, event_loop_->monotonic_now());
-      startup_timer_->Setup(start_time);
+      if (node_event_loop_factory_) {
+        CHECK_GE(start_time + clock_offset(), event_loop_->monotonic_now());
+      }
+      startup_timer_->Setup(start_time + clock_offset());
     }
 
     void set_startup_timer(TimerHandler *timer_handler) {
@@ -382,6 +398,7 @@
     // distributed clock.
     distributed_clock::time_point ToDistributedClock(
         monotonic_clock::time_point time) {
+      CHECK(node_event_loop_factory_);
       return node_event_loop_factory_->ToDistributedClock(time);
     }
 
@@ -415,6 +432,7 @@
 
     distributed_clock::time_point RemoteToDistributedClock(
         size_t channel_index, monotonic_clock::time_point time) {
+      CHECK(node_event_loop_factory_);
       return channel_source_state_[channel_index]
           ->node_event_loop_factory_->ToDistributedClock(time);
     }
@@ -425,7 +443,7 @@
     }
 
     monotonic_clock::time_point monotonic_now() const {
-      return node_event_loop_factory_->monotonic_now();
+      return event_loop_->monotonic_now();
     }
 
     // Sets the number of channels.
@@ -487,12 +505,15 @@
 
     // Sets the next wakeup time on the replay callback.
     void Setup(monotonic_clock::time_point next_time) {
-      timer_handler_->Setup(next_time);
+      timer_handler_->Setup(next_time + clock_offset());
     }
 
     // Sends a buffer on the provided channel index.
     bool Send(const TimestampedMessage &timestamped_message);
 
+    void SetClockOffset();
+    std::chrono::nanoseconds clock_offset() const { return clock_offset_; }
+
     // Returns a debug string for the channel merger.
     std::string DebugString() const {
       if (!timestamp_mapper_) {
@@ -582,6 +603,7 @@
     // going between 2 nodes.  The second element in the tuple indicates if this
     // is the primary direction or not.
     std::vector<message_bridge::NoncausalOffsetEstimator *> filters_;
+    message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters_;
 
     // List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
     // channel) which correspond to the originating node.
@@ -598,6 +620,11 @@
     absl::btree_map<const Channel *, std::shared_ptr<RemoteMessageSender>>
         timestamp_loggers_;
 
+    // Time offset between the log's monotonic clock and the current event
+    // loop's monotonic clock.  Useful when replaying logs with non-simulated
+    // event loops.
+    std::chrono::nanoseconds clock_offset_{0};
+
     std::vector<std::function<void()>> on_starts_;
     std::vector<std::function<void()>> on_ends_;
 
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 5f265e2..02081fd 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -2192,6 +2192,149 @@
   reader.Deregister();
 }
 
+// Tests that we observe all the same events in log replay (for a given node)
+// whether we just register an event loop for that node or if we register a full
+// event loop factory.
+TEST_P(MultinodeLoggerTest, SingleNodeReplay) {
+  time_converter_.StartEqual();
+  constexpr chrono::milliseconds kStartupDelay(95);
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    event_loop_factory_.RunFor(kStartupDelay);
+
+    StartLogger(&pi1_logger);
+    StartLogger(&pi2_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+  }
+
+  LogReader full_reader(SortParts(logfiles_));
+  LogReader single_node_reader(SortParts(logfiles_));
+
+  SimulatedEventLoopFactory full_factory(full_reader.configuration());
+  SimulatedEventLoopFactory single_node_factory(
+      single_node_reader.configuration());
+  std::unique_ptr<EventLoop> replay_event_loop =
+      single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
+          "log_reader");
+
+  full_reader.Register(&full_factory);
+  single_node_reader.Register(replay_event_loop.get());
+  single_node_factory.SkipTimingReport();
+  single_node_factory.DisableStatistics();
+
+  const Node *full_pi1 =
+      configuration::GetNode(full_factory.configuration(), "pi1");
+
+  // Confirm we can read the data on the remapped channel, just for pi1. Nothing
+  // else should have moved.
+  std::unique_ptr<EventLoop> full_event_loop =
+      full_factory.MakeEventLoop("test", full_pi1);
+  full_event_loop->SkipTimingReport();
+  full_event_loop->SkipAosLog();
+  // maps are indexed on channel index.
+  // observed_messages: {channel_index: [(message_sent_time, was_fetched),...]}
+  std::map<size_t, std::vector<std::pair<monotonic_clock::time_point, bool>>>
+      observed_messages;
+  std::map<size_t, std::unique_ptr<RawFetcher>> fetchers;
+  for (size_t ii = 0; ii < full_event_loop->configuration()->channels()->size();
+       ++ii) {
+    const Channel *channel =
+        full_event_loop->configuration()->channels()->Get(ii);
+    // We currently don't support replaying remote timestamp channels in
+    // realtime replay.
+    if (channel->name()->string_view().find("remote_timestamp") !=
+        std::string_view::npos) {
+      continue;
+    }
+    if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
+      observed_messages[ii] = {};
+      fetchers[ii] = full_event_loop->MakeRawFetcher(channel);
+      full_event_loop->OnRun([ii, &observed_messages, &fetchers]() {
+        if (fetchers[ii]->Fetch()) {
+          observed_messages[ii].push_back(std::make_pair(
+              fetchers[ii]->context().monotonic_event_time, true));
+        }
+      });
+      full_event_loop->MakeRawNoArgWatcher(
+          channel, [ii, &observed_messages](const Context &context) {
+            observed_messages[ii].push_back(
+                std::make_pair(context.monotonic_event_time, false));
+          });
+    }
+  }
+
+  full_factory.Run();
+  fetchers.clear();
+  full_reader.Deregister();
+
+  const Node *single_node_pi1 =
+      configuration::GetNode(single_node_factory.configuration(), "pi1");
+  std::map<size_t, std::unique_ptr<RawFetcher>> single_node_fetchers;
+
+  std::unique_ptr<EventLoop> single_node_event_loop =
+      single_node_factory.MakeEventLoop("test", single_node_pi1);
+  single_node_event_loop->SkipTimingReport();
+  single_node_event_loop->SkipAosLog();
+  for (size_t ii = 0;
+       ii < single_node_event_loop->configuration()->channels()->size(); ++ii) {
+    const Channel *channel =
+        single_node_event_loop->configuration()->channels()->Get(ii);
+    single_node_factory.DisableForwarding(channel);
+    if (configuration::ChannelIsReadableOnNode(channel, single_node_pi1)) {
+      single_node_fetchers[ii] =
+          single_node_event_loop->MakeRawFetcher(channel);
+      single_node_event_loop->OnRun([channel, ii, &single_node_fetchers]() {
+        EXPECT_FALSE(single_node_fetchers[ii]->Fetch())
+            << "Single EventLoop replay doesn't support pre-loading fetchers. "
+            << configuration::StrippedChannelToString(channel);
+      });
+      single_node_event_loop->MakeRawNoArgWatcher(
+          channel, [ii, &observed_messages, channel,
+                    kStartupDelay](const Context &context) {
+            if (observed_messages[ii].empty()) {
+              FAIL() << "Observed extra message at "
+                     << context.monotonic_event_time << " on "
+                     << configuration::StrippedChannelToString(channel);
+              return;
+            }
+            const std::pair<monotonic_clock::time_point, bool> &message =
+                observed_messages[ii].front();
+            if (message.second) {
+              EXPECT_LE(message.first,
+                        context.monotonic_event_time + kStartupDelay)
+                  << "Mismatched message times " << context.monotonic_event_time
+                  << " and " << message.first << " on "
+                  << configuration::StrippedChannelToString(channel);
+            } else {
+              EXPECT_EQ(message.first,
+                        context.monotonic_event_time + kStartupDelay)
+                  << "Mismatched message times " << context.monotonic_event_time
+                  << " and " << message.first << " on "
+                  << configuration::StrippedChannelToString(channel);
+            }
+            observed_messages[ii].erase(observed_messages[ii].begin());
+          });
+    }
+  }
+
+  single_node_factory.Run();
+
+  single_node_fetchers.clear();
+
+  single_node_reader.Deregister();
+
+  for (const auto &pair : observed_messages) {
+    EXPECT_TRUE(pair.second.empty())
+        << "Missed " << pair.second.size() << " messages on "
+        << configuration::StrippedChannelToString(
+               single_node_event_loop->configuration()->channels()->Get(
+                   pair.first));
+  }
+}
+
 // Tests that we properly recreate forwarded timestamps when replaying a log.
 // This should be enough that we can then re-run the logger and get a valid log
 // back.
diff --git a/aos/events/logging/realtime_replay_test.cc b/aos/events/logging/realtime_replay_test.cc
new file mode 100644
index 0000000..c7744d4
--- /dev/null
+++ b/aos/events/logging/realtime_replay_test.cc
@@ -0,0 +1,76 @@
+#include "aos/events/logging/log_reader.h"
+#include "aos/events/logging/log_writer.h"
+#include "aos/events/ping_lib.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/json_to_flatbuffer.h"
+#include "aos/testing/path.h"
+#include "aos/testing/tmpdir.h"
+#include "gtest/gtest.h"
+
+namespace aos::logger::testing {
+
+class RealtimeLoggerTest : public ::testing::Test {
+ protected:
+  RealtimeLoggerTest()
+      : shm_dir_(aos::testing::TestTmpDir() + "/aos"),
+        config_file_(
+            aos::testing::ArtifactPath("aos/events/pingpong_config.json")),
+        config_(aos::configuration::ReadConfig(config_file_)),
+        event_loop_factory_(&config_.message()),
+        ping_event_loop_(event_loop_factory_.MakeEventLoop("ping")),
+        ping_(ping_event_loop_.get()) {
+    FLAGS_shm_base = shm_dir_;
+
+    // Nuke the shm dir, to ensure we aren't being affected by any preexisting
+    // tests.
+    aos::util::UnlinkRecursive(shm_dir_);
+  }
+
+  gflags::FlagSaver flag_saver_;
+  std::string shm_dir_;
+
+  const std::string config_file_;
+  const aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
+
+  // Factory and Ping class to generate a test logfile.
+  SimulatedEventLoopFactory event_loop_factory_;
+  std::unique_ptr<EventLoop> ping_event_loop_;
+  Ping ping_;
+};
+
+TEST_F(RealtimeLoggerTest, RealtimeReplay) {
+  const std::string tmpdir = aos::testing::TestTmpDir();
+  const std::string base_name = tmpdir + "/logfile/";
+  aos::util::UnlinkRecursive(base_name);
+  {
+    std::unique_ptr<EventLoop> logger_event_loop =
+        event_loop_factory_.MakeEventLoop("logger");
+
+    event_loop_factory_.RunFor(std::chrono::milliseconds(95));
+
+    Logger logger(logger_event_loop.get());
+    logger.set_separate_config(false);
+    logger.set_polling_period(std::chrono::milliseconds(100));
+    logger.StartLoggingLocalNamerOnRun(base_name);
+    event_loop_factory_.RunFor(std::chrono::milliseconds(2000));
+  }
+
+  LogReader reader(logger::SortParts(logger::FindLogs(base_name)));
+  ShmEventLoop shm_event_loop(reader.configuration());
+  reader.Register(&shm_event_loop);
+  reader.OnEnd(shm_event_loop.node(),
+               [&shm_event_loop]() { shm_event_loop.Exit(); });
+
+  Fetcher<examples::Ping> ping_fetcher =
+      shm_event_loop.MakeFetcher<examples::Ping>("/test");
+
+  shm_event_loop.AddTimer([]() { LOG(INFO) << "Hello, World!"; })
+      ->Setup(shm_event_loop.monotonic_now(), std::chrono::seconds(1));
+
+  shm_event_loop.Run();
+  reader.Deregister();
+
+  ASSERT_TRUE(ping_fetcher.Fetch());
+  ASSERT_EQ(ping_fetcher->value(), 210);
+}
+}  // namespace aos::logger::testing