Merge changes Id0f3fe5a,I1657b15c,I80e69a93

* changes:
  Create an ExitHandle interface
  autocxx: Support multiple AUTOCXX_RS_JSON_ARCHIVE entries
  autocxx: Generate impls for abstract types too
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 278934b..deca15b 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -373,6 +373,7 @@
     visibility = ["//visibility:public"],
     deps = [
         ":aos_logging",
+        ":epoll",
         ":event_loop",
         ":simple_channel",
         "//aos:init",
@@ -395,6 +396,7 @@
     target_compatible_with = ["@platforms//os:linux"],
     deps = [
         ":simulated_event_loop",
+        "//aos/network:testing_time_converter",
         "//aos/testing:googletest",
         "@com_github_google_glog//:glog",
     ],
diff --git a/aos/events/epoll.cc b/aos/events/epoll.cc
index 8cb553b..71e3c9b 100644
--- a/aos/events/epoll.cc
+++ b/aos/events/epoll.cc
@@ -24,6 +24,7 @@
 
 void TimerFd::SetTime(monotonic_clock::time_point start,
                       monotonic_clock::duration interval) {
+  CHECK_GE(start, monotonic_clock::epoch());
   struct itimerspec new_value;
   new_value.it_interval = ::aos::time::to_timespec(interval);
   new_value.it_value = ::aos::time::to_timespec(start);
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 3ceb240..2bcde2f 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -462,6 +462,7 @@
 
   // Timer should sleep until base, base + offset, base + offset * 2, ...
   // If repeat_offset isn't set, the timer only expires once.
+  // base must be greater than or equal to monotonic_clock::epoch().
   virtual void Setup(monotonic_clock::time_point base,
                      monotonic_clock::duration repeat_offset =
                          ::aos::monotonic_clock::zero()) = 0;
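For reference, a minimal usage sketch (not part of the patch) of the precondition documented above: the first expiration passed to TimerHandler::Setup() is derived from the event loop's current monotonic time, which is never before monotonic_clock::epoch(), so the new CHECK in TimerFd::SetTime() is satisfied. The function name SetupPeriodicTimer and the 100ms period are illustrative assumptions only.

    // Sketch only: schedules a periodic timer with a base that satisfies the
    // new >= monotonic_clock::epoch() requirement.
    #include <chrono>

    #include "aos/events/event_loop.h"

    void SetupPeriodicTimer(aos::EventLoop *event_loop) {
      aos::TimerHandler *timer =
          event_loop->AddTimer([]() { /* periodic work goes here */ });
      event_loop->OnRun([timer, event_loop]() {
        // monotonic_now() is always at or after the epoch, so this base is valid.
        timer->Setup(
            event_loop->monotonic_now() + std::chrono::milliseconds(100),
            std::chrono::milliseconds(100));
      });
    }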
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 2d7decc..0eeecf8 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -752,6 +752,14 @@
       "/test/invalid");
 }
 
+// Verify that setting up a timer before monotonic_clock::epoch() fails.
+TEST_P(AbstractEventLoopDeathTest, NegativeTimeTimer) {
+  auto loop = Make();
+  TimerHandler *time = loop->AddTimer([]() {});
+  EXPECT_DEATH(time->Setup(monotonic_clock::epoch() - std::chrono::seconds(1)),
+               "-1.000");
+}
+
 // Verify that registering a watcher twice for "/test" fails.
 TEST_P(AbstractEventLoopDeathTest, TwoWatcher) {
   auto loop = Make();
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index cb0c629..97c1e83 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -10,6 +10,7 @@
 
 EventScheduler::Token EventScheduler::Schedule(monotonic_clock::time_point time,
                                                Event *callback) {
+  CHECK_LE(monotonic_clock::epoch(), time);
   return events_list_.emplace(time, callback);
 }
 
@@ -35,6 +36,12 @@
 }
 
 aos::monotonic_clock::time_point EventScheduler::OldestEvent() {
+  // If we haven't started yet, report the epoch as our oldest event time so
+  // that we get a chance to boot.
+  if (!called_started_) {
+    return aos::monotonic_clock::epoch();
+  }
+
   if (events_list_.empty()) {
     return monotonic_clock::max_time;
   }
@@ -42,14 +49,27 @@
   return events_list_.begin()->first;
 }
 
-void EventScheduler::Shutdown() { on_shutdown_(); }
+void EventScheduler::Shutdown() {
+  CHECK(!is_running_);
+  on_shutdown_();
+}
 
 void EventScheduler::Startup() {
   ++boot_count_;
-  RunOnStartup();
+  CHECK(!is_running_);
+  MaybeRunOnStartup();
+  CHECK(called_started_);
 }
 
 void EventScheduler::CallOldestEvent() {
+  if (!called_started_) {
+    // If we haven't started, start.
+    MaybeRunOnStartup();
+    MaybeRunOnRun();
+    CHECK(called_started_);
+    return;
+  }
+  CHECK(is_running_);
   CHECK_GT(events_list_.size(), 0u);
   auto iter = events_list_.begin();
   const logger::BootTimestamp t =
@@ -66,6 +86,7 @@
 }
 
 void EventScheduler::RunOnRun() {
+  CHECK(is_running_);
   while (!on_run_.empty()) {
     std::function<void()> fn = std::move(*on_run_.begin());
     on_run_.erase(on_run_.begin());
@@ -75,6 +96,7 @@
 
 void EventScheduler::RunOnStartup() noexcept {
   while (!on_startup_.empty()) {
+    CHECK(!is_running_);
     std::function<void()> fn = std::move(*on_startup_.begin());
     on_startup_.erase(on_startup_.begin());
     fn();
@@ -82,14 +104,39 @@
 }
 
 void EventScheduler::RunStarted() {
+  CHECK(!is_running_);
   if (started_) {
     started_();
   }
+  is_running_ = true;
 }
 
-void EventScheduler::RunStopped() {
-  if (stopped_) {
-    stopped_();
+void EventScheduler::MaybeRunStopped() {
+  CHECK(is_running_);
+  is_running_ = false;
+  if (called_started_) {
+    called_started_ = false;
+    if (stopped_) {
+      stopped_();
+    }
+  }
+}
+
+void EventScheduler::MaybeRunOnStartup() {
+  CHECK(!called_started_);
+  CHECK(!is_running_);
+  const logger::BootTimestamp t =
+      FromDistributedClock(scheduler_scheduler_->distributed_now());
+  if (t.boot == boot_count_ && t.time >= monotonic_clock::epoch()) {
+    called_started_ = true;
+    RunOnStartup();
+  }
+}
+
+void EventScheduler::MaybeRunOnRun() {
+  if (called_started_) {
+    RunStarted();
+    RunOnRun();
   }
 }
 
@@ -110,6 +157,88 @@
   scheduler->scheduler_scheduler_ = this;
 }
 
+void EventSchedulerScheduler::MaybeRunStopped() {
+  CHECK(!is_running_);
+  for (EventScheduler *scheduler : schedulers_) {
+    if (scheduler->is_running()) {
+      scheduler->MaybeRunStopped();
+    }
+  }
+}
+
+bool EventSchedulerScheduler::RunUntil(
+    realtime_clock::time_point end_time, EventScheduler *scheduler,
+    std::function<std::chrono::nanoseconds()> fn_realtime_offset) {
+  logging::ScopedLogRestorer prev_logger;
+  MaybeRunOnStartup();
+
+  bool reached_end_time = false;
+
+  RunMaybeRealtimeLoop([this, scheduler, end_time, fn_realtime_offset,
+                        &reached_end_time]() {
+    std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
+        OldestEvent();
+    aos::distributed_clock::time_point oldest_event_time_distributed =
+        std::get<0>(oldest_event);
+    logger::BootTimestamp test_time_monotonic =
+        scheduler->FromDistributedClock(oldest_event_time_distributed);
+    realtime_clock::time_point oldest_event_realtime(
+        test_time_monotonic.time_since_epoch() + fn_realtime_offset());
+
+    if ((std::get<0>(oldest_event) == distributed_clock::max_time) ||
+        (oldest_event_realtime > end_time &&
+         (reboots_.empty() ||
+          std::get<0>(reboots_.front()) > oldest_event_time_distributed))) {
+      is_running_ = false;
+      reached_end_time = true;
+
+      // We have to nudge our time back to the distributed time
+      // corresponding to our desired realtime time.
+      const aos::monotonic_clock::time_point end_monotonic =
+          aos::monotonic_clock::epoch() + end_time.time_since_epoch() -
+          fn_realtime_offset();
+      const aos::distributed_clock::time_point end_time_distributed =
+          scheduler->ToDistributedClock(end_monotonic);
+
+      now_ = end_time_distributed;
+
+      return;
+    }
+
+    if (!reboots_.empty() &&
+        std::get<0>(reboots_.front()) <= std::get<0>(oldest_event)) {
+      // Reboot is next.
+      CHECK_LE(now_,
+               std::get<0>(reboots_.front()) + std::chrono::nanoseconds(1))
+          << ": Simulated time went backwards by too much.  Please "
+             "investigate.";
+      now_ = std::get<0>(reboots_.front());
+      Reboot();
+      reboots_.erase(reboots_.begin());
+      return;
+    }
+
+    // We get to pick our tradeoffs here.  Either we assume that there are
+    // no backward step changes in our time function for each node, or we
+    // have to let time go backwards.  We currently only really see this
+    // happen when 2 events are scheduled for "now", time changes, and
+    // there is a nanosecond or two of rounding due to integer math.
+    //
+    // //aos/events/logging:logger_test triggers this.
+    CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
+        << ": Simulated time went backwards by too much.  Please "
+           "investigate.";
+
+    now_ = std::get<0>(oldest_event);
+
+    std::get<1>(oldest_event)->CallOldestEvent();
+  });
+
+  MaybeRunStopped();
+
+  return reached_end_time;
+}
+
 void EventSchedulerScheduler::Reboot() {
   const std::vector<logger::BootTimestamp> &times =
       std::get<1>(reboots_.front());
@@ -131,7 +260,7 @@
       rebooted.emplace_back(node_index);
       CHECK_EQ(schedulers_[node_index]->boot_count() + 1,
                times[node_index].boot);
-      schedulers_[node_index]->RunStopped();
+      schedulers_[node_index]->MaybeRunStopped();
       schedulers_[node_index]->Shutdown();
     }
   }
@@ -140,16 +269,10 @@
   // (especially message_bridge), it could try to send stuff out.  We want
   // to move everything over to the new boot before doing that.
   for (const size_t node_index : rebooted) {
-    CHECK_EQ(schedulers_[node_index]->boot_count() + 1, times[node_index].boot);
     schedulers_[node_index]->Startup();
   }
-
   for (const size_t node_index : rebooted) {
-    schedulers_[node_index]->RunStarted();
-  }
-
-  for (const size_t node_index : rebooted) {
-    schedulers_[node_index]->RunOnRun();
+    schedulers_[node_index]->MaybeRunOnRun();
   }
   is_running_ = true;
 }
@@ -157,11 +280,10 @@
 void EventSchedulerScheduler::RunFor(distributed_clock::duration duration) {
   distributed_clock::time_point end_time = now_ + duration;
   logging::ScopedLogRestorer prev_logger;
-  RunOnStartup();
-  RunOnRun();
+  MaybeRunOnStartup();
 
   // Run all the sub-event-schedulers.
-  while (is_running_) {
+  RunMaybeRealtimeLoop([this, end_time]() {
     std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
         OldestEvent();
     if (!reboots_.empty() &&
@@ -170,7 +292,7 @@
       if (std::get<0>(reboots_.front()) > end_time) {
         // Reboot is after our end time, give up.
         is_running_ = false;
-        break;
+        return;
       }
 
       CHECK_LE(now_,
@@ -180,14 +302,14 @@
       now_ = std::get<0>(reboots_.front());
       Reboot();
       reboots_.erase(reboots_.begin());
-      continue;
+      return;
     }
 
     // No events left, bail.
     if (std::get<0>(oldest_event) == distributed_clock::max_time ||
         std::get<0>(oldest_event) > end_time) {
       is_running_ = false;
-      break;
+      return;
     }
 
     // We get to pick our tradeoffs here.  Either we assume that there are no
@@ -199,22 +321,23 @@
     // //aos/events/logging:logger_test triggers this.
     CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
         << ": Simulated time went backwards by too much.  Please investigate.";
+    // Push time forwards.
     now_ = std::get<0>(oldest_event);
 
     std::get<1>(oldest_event)->CallOldestEvent();
-  }
+  });
 
   now_ = end_time;
 
-  RunStopped();
+  MaybeRunStopped();
 }
 
 void EventSchedulerScheduler::Run() {
   logging::ScopedLogRestorer prev_logger;
-  RunOnStartup();
-  RunOnRun();
+  MaybeRunOnStartup();
+
   // Run all the sub-event-schedulers.
-  while (is_running_) {
+  RunMaybeRealtimeLoop([this]() {
     std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
         OldestEvent();
     if (!reboots_.empty() &&
@@ -227,11 +350,12 @@
       now_ = std::get<0>(reboots_.front());
       Reboot();
       reboots_.erase(reboots_.begin());
-      continue;
+      return;
     }
     // No events left, bail.
     if (std::get<0>(oldest_event) == distributed_clock::max_time) {
-      break;
+      is_running_ = false;
+      return;
     }
 
     // We get to pick our tradeoffs here.  Either we assume that there are no
@@ -246,11 +370,55 @@
     now_ = std::get<0>(oldest_event);
 
     std::get<1>(oldest_event)->CallOldestEvent();
-  }
+  });
 
-  is_running_ = false;
+  MaybeRunStopped();
+}
 
-  RunStopped();
+template <typename F>
+void EventSchedulerScheduler::RunMaybeRealtimeLoop(F loop_body) {
+  internal::TimerFd timerfd;
+  CHECK_LT(0.0, replay_rate_) << "Replay rate must be positive.";
+  distributed_clock::time_point last_distributed_clock =
+      std::get<0>(OldestEvent());
+  monotonic_clock::time_point last_monotonic_clock = monotonic_clock::now();
+  timerfd.SetTime(last_monotonic_clock, std::chrono::seconds(0));
+  epoll_.OnReadable(
+      timerfd.fd(), [this, &last_distributed_clock, &last_monotonic_clock,
+                     &timerfd, loop_body]() {
+        const uint64_t read_result = timerfd.Read();
+        if (!is_running_) {
+          epoll_.Quit();
+          return;
+        }
+        CHECK_EQ(read_result, 1u);
+        // Call loop_body() at least once; if we are in infinite-speed replay,
+        // we don't actually want/need the context switches from the epoll
+        // setup, so just loop.
+        // Note: The performance impacts of this code have not been carefully
+        // inspected (e.g., how much does avoiding the context-switch help; does
+        // the timerfd_settime call matter).
+        // This is deliberately written to support the user changing replay
+        // rates dynamically.
+        do {
+          loop_body();
+          if (is_running_) {
+            const monotonic_clock::time_point next_trigger =
+                last_monotonic_clock +
+                std::chrono::duration_cast<std::chrono::nanoseconds>(
+                    (now_ - last_distributed_clock) / replay_rate_);
+            timerfd.SetTime(next_trigger, std::chrono::seconds(0));
+            last_monotonic_clock = next_trigger;
+            last_distributed_clock = now_;
+          } else {
+            epoll_.Quit();
+          }
+        } while (replay_rate_ == std::numeric_limits<double>::infinity() &&
+                 is_running_);
+      });
+
+  epoll_.Run();
+  epoll_.DeleteFd(timerfd.fd());
 }
 
 std::tuple<distributed_clock::time_point, EventScheduler *>
@@ -284,12 +452,23 @@
   const bool was_running = is_running_;
   if (is_running_) {
     is_running_ = false;
-    RunStopped();
+    MaybeRunStopped();
   }
   fn();
   if (was_running) {
-    RunOnStartup();
-    RunOnRun();
+    MaybeRunOnStartup();
+  }
+}
+
+void EventSchedulerScheduler::MaybeRunOnStartup() {
+  is_running_ = true;
+  for (EventScheduler *scheduler : schedulers_) {
+    scheduler->MaybeRunOnStartup();
+  }
+  // We must trigger all the OnRun callbacks *after* all the OnStartup
+  // callbacks have been triggered, because that is the documented contract.
+  for (EventScheduler *scheduler : schedulers_) {
+    scheduler->MaybeRunOnRun();
   }
 }
 
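A standalone sketch (not part of the patch) of the pacing arithmetic used by RunMaybeRealtimeLoop() above: the wall-clock delay until the next timerfd wakeup is the simulated (distributed-clock) step divided by the replay rate, so a rate of 0.5 turns 10ms of simulated time into roughly 20ms of wall-clock time, while an infinite rate skips the sleep entirely. The small program below only illustrates the arithmetic.

    #include <chrono>
    #include <iostream>

    int main() {
      using std::chrono::duration_cast;
      using std::chrono::milliseconds;
      using std::chrono::nanoseconds;
      // Corresponds to (now_ - last_distributed_clock) in RunMaybeRealtimeLoop().
      const nanoseconds simulated_step = milliseconds(10);
      const double replay_rate = 0.5;  // Half speed.
      const nanoseconds wall_clock_delay =
          duration_cast<nanoseconds>(simulated_step / replay_rate);
      // Prints 20000000 (20ms of wall-clock time per 10ms of simulated time).
      std::cout << wall_clock_delay.count() << "\n";
      return 0;
    }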
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index d91a75e..237a240 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -8,6 +8,7 @@
 #include <utility>
 #include <vector>
 
+#include "aos/events/epoll.h"
 #include "aos/events/event_loop.h"
 #include "aos/events/logging/boot_timestamp.h"
 #include "aos/logging/implementations.h"
@@ -108,47 +109,50 @@
   // Returns an iterator to the event
   Token Schedule(monotonic_clock::time_point time, Event *callback);
 
-  // Schedules a callback when the event scheduler starts.
+  // Schedules a callback whenever the event scheduler starts, after we have
+  // entered the running state. Callbacks are cleared after being called once.
+  // Will not get called until a node starts (a node does not start until its
+  // monotonic clock has reached at least monotonic_clock::epoch()).
   void ScheduleOnRun(std::function<void()> callback) {
     on_run_.emplace_back(std::move(callback));
   }
 
-  // Schedules a callback when the event scheduler starts.
+  // Schedules a callback whenever the event scheduler starts, before we have
+  // entered the running state. Callbacks are cleared after being called once.
+  // Will not get called until a node starts (a node does not start until its
+  // monotonic clock has reached at least monotonic_clock::epoch()).
   void ScheduleOnStartup(std::function<void()> callback) {
     on_startup_.emplace_back(std::move(callback));
   }
 
+  // Schedules a callback for whenever a node reboots, after we have exited the
+  // running state. Does not get called when the event scheduler stops (unless
+  // it is stopping to execute the reboot).
   void set_on_shutdown(std::function<void()> callback) {
     on_shutdown_ = std::move(callback);
   }
 
+  // Identical to ScheduleOnStartup, except that only one callback may get set
+  // and it will not be cleared after being called.
   void set_started(std::function<void()> callback) {
     started_ = std::move(callback);
   }
 
+  // Schedules a callback for whenever the scheduler exits the running state
+  // (running will be false during the callback). This includes both node
+  // reboots and the end of regular execution. Will not be called if the node
+  // never started.
   void set_stopped(std::function<void()> callback) {
     stopped_ = std::move(callback);
   }
 
-  std::function<void()> started_;
-  std::function<void()> stopped_;
-  std::function<void()> on_shutdown_;
-
   Token InvalidToken() { return events_list_.end(); }
 
   // Deschedule an event by its iterator
   void Deschedule(Token token);
 
-  // Runs the OnRun callbacks.
-  void RunOnRun();
-
-  // Runs the OnStartup callbacks.
-  void RunOnStartup() noexcept;
-
   // Runs the Started callback.
-  void RunStarted();
-  // Runs the Started callback.
-  void RunStopped();
+  void MaybeRunStopped();
 
   // Returns true if events are being handled.
   inline bool is_running() const;
@@ -185,12 +189,24 @@
 
   size_t node_index() const { return node_index_; }
 
+ private:
+  friend class EventSchedulerScheduler;
+
+  // Runs the OnRun callbacks.
+  void RunOnRun();
+
+  // Runs the OnStartup callbacks.
+  void RunOnStartup() noexcept;
+
+  // Runs the Started callback.
+  void RunStarted();
+
   // For implementing reboots.
   void Shutdown();
   void Startup();
 
- private:
-  friend class EventSchedulerScheduler;
+  void MaybeRunOnStartup();
+  void MaybeRunOnRun();
 
   // Current execution time.
   monotonic_clock::time_point monotonic_now_ = monotonic_clock::epoch();
@@ -213,6 +229,15 @@
   // distinguish which one.
   size_t node_index_ = 0;
 
+  // Whether this individual scheduler is currently running.
+  bool is_running_ = false;
+  // Whether we have called all the startup handlers during this boot.
+  bool called_started_ = false;
+
+  std::function<void()> started_;
+  std::function<void()> stopped_;
+  std::function<void()> on_shutdown_;
+
   // Converts time by doing nothing to it.
   class UnityConverter final : public TimeConverter {
    public:
@@ -270,33 +295,28 @@
   // Stops running.
   void Exit() { is_running_ = false; }
 
-  bool is_running() const { return is_running_; }
-
   // Runs for a duration on the distributed clock.  Time on the distributed
   // clock should be very representative of time on each node, but won't be
   // exactly the same.
   void RunFor(distributed_clock::duration duration);
 
+  // Sets the realtime replay rate. A value of 1.0 will cause the scheduler to
+  // try to play events in realtime. 0.5 will run at half speed. Use infinity
+  // (the default) to run as fast as possible. This can be changed during
+  // run-time.
+  void SetReplayRate(double replay_rate) { replay_rate_ = replay_rate; }
+  internal::EPoll *epoll() { return &epoll_; }
+
+  // Runs until end_time on the realtime clock.  fn_realtime_offset is a
+  // function that returns the realtime clock's offset from the monotonic
+  // clock.  Returns true if it ran all the way to end_time (i.e., Exit() was
+  // not called before end_time was reached).
+  bool RunUntil(realtime_clock::time_point end_time, EventScheduler *scheduler,
+                std::function<std::chrono::nanoseconds()> fn_realtime_offset);
+
   // Returns the current distributed time.
   distributed_clock::time_point distributed_now() const { return now_; }
 
-  void RunOnStartup() {
-    CHECK(!is_running_);
-    for (EventScheduler *scheduler : schedulers_) {
-      scheduler->RunOnStartup();
-    }
-    for (EventScheduler *scheduler : schedulers_) {
-      scheduler->RunStarted();
-    }
-  }
-
-  void RunStopped() {
-    CHECK(!is_running_);
-    for (EventScheduler *scheduler : schedulers_) {
-      scheduler->RunStopped();
-    }
-  }
-
   void SetTimeConverter(TimeConverter *time_converter) {
     time_converter->set_reboot_found(
         [this](distributed_clock::time_point reboot_time,
@@ -314,20 +334,20 @@
   void TemporarilyStopAndRun(std::function<void()> fn);
 
  private:
-  // Handles running the OnRun functions.
-  void RunOnRun() {
-    CHECK(!is_running_);
-    is_running_ = true;
-    for (EventScheduler *scheduler : schedulers_) {
-      scheduler->RunOnRun();
-    }
-  }
-
   void Reboot();
 
+  void MaybeRunStopped();
+  void MaybeRunOnStartup();
+
   // Returns the next event time and scheduler on which to run it.
   std::tuple<distributed_clock::time_point, EventScheduler *> OldestEvent();
 
+  // Handles running loop_body repeatedly until complete, pacing the calls
+  // against the realtime clock when a finite replay rate is set. loop_body
+  // should set is_running_ to false once we should stop.
+  template <typename F>
+  void RunMaybeRealtimeLoop(F loop_body);
+
   // True if we are running.
   bool is_running_ = false;
   // The current time.
@@ -339,6 +359,9 @@
   std::vector<std::tuple<distributed_clock::time_point,
                          std::vector<logger::BootTimestamp>>>
       reboots_;
+
+  double replay_rate_ = std::numeric_limits<double>::infinity();
+  internal::EPoll epoll_;
 };
 
 inline distributed_clock::time_point EventScheduler::distributed_now() const {
@@ -353,9 +376,7 @@
   return t.time;
 }
 
-inline bool EventScheduler::is_running() const {
-  return scheduler_scheduler_->is_running();
-}
+inline bool EventScheduler::is_running() const { return is_running_; }
 
 }  // namespace aos
 
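As a minimal usage sketch for the new SetReplayRate() knob (not part of the patch; the function name RunAtHalfSpeed is an illustrative assumption, and event scheduling is omitted but would follow the FunctionEvent pattern used in the tests below):

    #include <chrono>

    #include "aos/events/event_scheduler.h"

    void RunAtHalfSpeed() {
      aos::EventSchedulerScheduler scheduler_scheduler;
      aos::EventScheduler scheduler(/*node_index=*/0);
      scheduler_scheduler.AddEventScheduler(&scheduler);
      // 1.0 tracks realtime; the default (infinity) runs as fast as possible.
      // The rate may also be changed while the scheduler is running.
      scheduler_scheduler.SetReplayRate(0.5);
      // Any events scheduled on `scheduler` (omitted here) are now paced at
      // roughly half of realtime while running.
      scheduler_scheduler.RunFor(std::chrono::seconds(2));
    }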
diff --git a/aos/events/event_scheduler_test.cc b/aos/events/event_scheduler_test.cc
index 54fb91a..c32399c 100644
--- a/aos/events/event_scheduler_test.cc
+++ b/aos/events/event_scheduler_test.cc
@@ -2,6 +2,7 @@
 
 #include <chrono>
 
+#include "aos/network/testing_time_converter.h"
 #include "gtest/gtest.h"
 
 namespace aos {
@@ -67,6 +68,16 @@
   std::vector<UUID> uuids_;
 };
 
+class FunctionEvent : public EventScheduler::Event {
+ public:
+  FunctionEvent(std::function<void()> fn) : fn_(fn) {}
+
+  void Handle() noexcept override { fn_(); }
+
+ private:
+  std::function<void()> fn_;
+};
+
 // Tests that the default parameters (slope of 1, offest of 0) behave as
 // an identity.
 TEST(EventSchedulerTest, IdentityTimeConversion) {
@@ -108,4 +119,408 @@
             distributed_clock::epoch() + chrono::seconds(1));
 }
 
+// Test that RunUntil() stops at the appointed time and returns correctly.
+TEST(EventSchedulerTest, RunUntil) {
+  int counter = 0;
+  EventSchedulerScheduler scheduler_scheduler;
+  EventScheduler scheduler(0);
+  scheduler_scheduler.AddEventScheduler(&scheduler);
+
+  FunctionEvent e([&counter]() { counter += 1; });
+  FunctionEvent quitter(
+      [&scheduler_scheduler]() { scheduler_scheduler.Exit(); });
+  scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
+  scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(3), &quitter);
+  scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(5), &e);
+  ASSERT_TRUE(scheduler_scheduler.RunUntil(
+      realtime_clock::epoch() + std::chrono::seconds(2), &scheduler,
+      []() { return std::chrono::nanoseconds{0}; }));
+  EXPECT_EQ(counter, 1);
+  ASSERT_FALSE(scheduler_scheduler.RunUntil(
+      realtime_clock::epoch() + std::chrono::seconds(4), &scheduler,
+      []() { return std::chrono::nanoseconds{0}; }));
+  EXPECT_EQ(counter, 1);
+  ASSERT_TRUE(scheduler_scheduler.RunUntil(
+      realtime_clock::epoch() + std::chrono::seconds(6), &scheduler,
+      []() { return std::chrono::nanoseconds{0}; }));
+  EXPECT_EQ(counter, 2);
+}
+
+enum class RunMode {
+  kRun,
+  kRunUntil,
+  kRunFor,
+};
+
+// Sets up a parameterized test case that will exercise all three of the Run(),
+// RunFor(), and RunUntil() methods of the EventSchedulerScheduler. This exposes
+// a ParamRunFor() to the test case that will nominally run for the specified
+// time (except for when in kRun mode, where it will just call Run()).
+class EventSchedulerParamTest : public testing::TestWithParam<RunMode> {
+ public:
+  EventSchedulerParamTest() {
+    schedulers_.reserve(kNumNodes);
+    for (size_t ii = 0; ii < kNumNodes; ++ii) {
+      schedulers_.emplace_back(ii);
+      schedulers_.back().SetTimeConverter(ii, &time_);
+      scheduler_scheduler_.AddEventScheduler(&schedulers_.back());
+    }
+    scheduler_scheduler_.SetTimeConverter(&time_);
+  }
+
+  void StartClocksAtEpoch() {
+    time_.AddMonotonic({BootTimestamp::epoch(), BootTimestamp::epoch()});
+  }
+
+ protected:
+  static constexpr size_t kNumNodes = 2;
+
+  void CheckSchedulersRunning(bool running) {
+    for (EventScheduler &scheduler : schedulers_) {
+      EXPECT_EQ(running, scheduler.is_running());
+    }
+  }
+
+  void ParamRunFor(std::chrono::nanoseconds t) {
+    switch (GetParam()) {
+      case RunMode::kRun:
+        scheduler_scheduler_.Run();
+        break;
+      case RunMode::kRunUntil:
+        scheduler_scheduler_.RunUntil(
+            realtime_clock::time_point(
+                schedulers_.at(0).monotonic_now().time_since_epoch() + t),
+            &schedulers_.at(0), []() { return std::chrono::nanoseconds(0); });
+        break;
+      case RunMode::kRunFor:
+        scheduler_scheduler_.RunFor(t);
+        break;
+    }
+  }
+
+  message_bridge::TestingTimeConverter time_{kNumNodes};
+  std::vector<EventScheduler> schedulers_;
+  EventSchedulerScheduler scheduler_scheduler_;
+};
+
+// Tests that we correctly handle exiting during startup.
+TEST_P(EventSchedulerParamTest, ExitOnStartup) {
+  StartClocksAtEpoch();
+  bool observed_handler = false;
+  schedulers_.at(0).ScheduleOnStartup([this, &observed_handler]() {
+    EXPECT_FALSE(schedulers_.at(0).is_running());
+    observed_handler = true;
+    scheduler_scheduler_.Exit();
+  });
+  ParamRunFor(std::chrono::seconds(1));
+  EXPECT_TRUE(observed_handler);
+}
+
+// Test that creating an event and running the scheduler runs the event.
+TEST_P(EventSchedulerParamTest, ScheduleEvent) {
+  StartClocksAtEpoch();
+  int counter = 0;
+
+  FunctionEvent e([&counter]() { counter += 1; });
+  schedulers_.at(0).Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
+  ParamRunFor(std::chrono::seconds(1));
+  EXPECT_EQ(counter, 1);
+  auto token = schedulers_.at(0).Schedule(
+      monotonic_clock::epoch() + chrono::seconds(2), &e);
+  schedulers_.at(0).Deschedule(token);
+  ParamRunFor(std::chrono::seconds(2));
+  EXPECT_EQ(counter, 1);
+}
+
+// Tests that a node that would have a negative monotonic time at boot does not
+// get started until later.
+TEST_P(EventSchedulerParamTest, NodeWaitsTillEpochToBoot) {
+  time_.AddNextTimestamp(
+      distributed_clock::epoch(),
+      {BootTimestamp{0, monotonic_clock::epoch()},
+       BootTimestamp{0, monotonic_clock::epoch() - chrono::seconds(1)}});
+  bool observed_startup_0 = false;
+  bool observed_startup_1 = false;
+  bool observed_on_run_1 = false;
+  schedulers_.at(0).ScheduleOnStartup([this, &observed_startup_0]() {
+    observed_startup_0 = true;
+    EXPECT_FALSE(schedulers_.at(0).is_running());
+    EXPECT_FALSE(schedulers_.at(1).is_running());
+    EXPECT_EQ(distributed_clock::epoch(),
+              scheduler_scheduler_.distributed_now());
+    EXPECT_EQ(monotonic_clock::epoch(), schedulers_.at(0).monotonic_now());
+    EXPECT_EQ(monotonic_clock::epoch() - chrono::seconds(1),
+              schedulers_.at(1).monotonic_now());
+  });
+  schedulers_.at(1).ScheduleOnStartup([this, &observed_startup_1]() {
+    observed_startup_1 = true;
+    // Note that we do not *stop* execution on node zero just to get 1 started.
+    EXPECT_TRUE(schedulers_.at(0).is_running());
+    EXPECT_FALSE(schedulers_.at(1).is_running());
+    EXPECT_EQ(distributed_clock::epoch() + chrono::seconds(1),
+              scheduler_scheduler_.distributed_now());
+    EXPECT_EQ(monotonic_clock::epoch() + chrono::seconds(1),
+              schedulers_.at(0).monotonic_now());
+    EXPECT_EQ(monotonic_clock::epoch(), schedulers_.at(1).monotonic_now());
+  });
+  schedulers_.at(1).ScheduleOnRun([this, &observed_on_run_1]() {
+    observed_on_run_1 = true;
+    // Note that we do not *stop* execution on node zero just to get 1 started.
+    EXPECT_TRUE(schedulers_.at(0).is_running());
+    EXPECT_TRUE(schedulers_.at(1).is_running());
+    EXPECT_EQ(distributed_clock::epoch() + chrono::seconds(1),
+              scheduler_scheduler_.distributed_now());
+    EXPECT_EQ(monotonic_clock::epoch() + chrono::seconds(1),
+              schedulers_.at(0).monotonic_now());
+    EXPECT_EQ(monotonic_clock::epoch(), schedulers_.at(1).monotonic_now());
+  });
+
+  FunctionEvent e([]() {});
+  schedulers_.at(0).Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
+  ParamRunFor(chrono::seconds(1));
+  EXPECT_TRUE(observed_startup_0);
+  EXPECT_TRUE(observed_startup_1);
+  EXPECT_TRUE(observed_on_run_1);
+}
+
+// Tests that a node that never boots does not get any of its handlers run.
+TEST_P(EventSchedulerParamTest, NodeNeverBootsIfAlwaysNegative) {
+  time_.AddNextTimestamp(
+      distributed_clock::epoch(),
+      {BootTimestamp{0, monotonic_clock::epoch()},
+       BootTimestamp{0, monotonic_clock::epoch() - chrono::seconds(10)}});
+  bool observed_startup_0 = false;
+  schedulers_.at(0).ScheduleOnStartup([this, &observed_startup_0]() {
+    observed_startup_0 = true;
+    EXPECT_FALSE(schedulers_.at(0).is_running());
+    EXPECT_FALSE(schedulers_.at(1).is_running());
+    EXPECT_EQ(distributed_clock::epoch(),
+              scheduler_scheduler_.distributed_now());
+    EXPECT_EQ(monotonic_clock::epoch(), schedulers_.at(0).monotonic_now());
+    EXPECT_EQ(monotonic_clock::epoch() - chrono::seconds(10),
+              schedulers_.at(1).monotonic_now());
+  });
+  schedulers_.at(1).ScheduleOnStartup(
+      []() { FAIL() << "Should never have hit startup handlers for node 1."; });
+  schedulers_.at(1).ScheduleOnRun(
+      []() { FAIL() << "Should never have hit OnRun handlers for node 1."; });
+  schedulers_.at(1).set_stopped(
+      []() { FAIL() << "Should never have hit stopped handlers for node 1."; });
+
+  FunctionEvent e([this]() { scheduler_scheduler_.Exit(); });
+  schedulers_.at(0).Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
+  ParamRunFor(chrono::seconds(1));
+  EXPECT_TRUE(observed_startup_0);
+}
+
+// Checks for regressions in how the startup/shutdown handlers behave.
+TEST_P(EventSchedulerParamTest, StartupShutdownHandlers) {
+  StartClocksAtEpoch();
+  time_.AddNextTimestamp(
+      distributed_clock::epoch() + chrono::seconds(3),
+      {BootTimestamp{0, monotonic_clock::epoch() + chrono::seconds(3)},
+       BootTimestamp{0, monotonic_clock::epoch() + chrono::seconds(3)}});
+  time_.RebootAt(0, distributed_clock::epoch() + chrono::seconds(4));
+  // Expected behavior:
+  // If all handlers get called during a reboot, they should sequence as:
+  // * is_running_ = false
+  // * stopped()
+  // * on_shutdown()
+  // * on_startup()
+  // * started()
+  // * is_running_ = true
+  // * OnRun()
+  //
+  // on_shutdown handlers should not get called at end of execution (e.g., when
+  // TemporarilyStopAndRun is called)--only when a node reboots.
+  //
+  // startup and OnRun handlers get cleared after being called once; these are
+  // also the only handlers that can have more than one handler registered.
+  //
+  // Create counters for all the handlers on node 0. Create separate a/b
+  // counters for the handlers that can/should get cleared.
+  int shutdown_counter = 0;
+  int stopped_counter = 0;
+  int startup_counter_a = 0;
+  int startup_counter_b = 0;
+  int started_counter = 0;
+  int on_run_counter_a = 0;
+  int on_run_counter_b = 0;
+
+  schedulers_.at(1).set_on_shutdown([]() {
+    FAIL() << "Should never reach the node 1 shutdown handler, since it never "
+              "reboots.";
+  });
+
+  auto startup_handler_a = [this, &startup_counter_a]() {
+    EXPECT_FALSE(schedulers_.at(0).is_running());
+    ++startup_counter_a;
+  };
+
+  auto startup_handler_b = [this, &startup_counter_b]() {
+    EXPECT_FALSE(schedulers_.at(0).is_running());
+    ++startup_counter_b;
+  };
+
+  auto on_run_handler_a = [this, &on_run_counter_a]() {
+    EXPECT_TRUE(schedulers_.at(0).is_running());
+    ++on_run_counter_a;
+  };
+
+  auto on_run_handler_b = [this, &on_run_counter_b]() {
+    EXPECT_TRUE(schedulers_.at(0).is_running());
+    ++on_run_counter_b;
+  };
+
+  schedulers_.at(0).set_stopped([this, &stopped_counter]() {
+    EXPECT_FALSE(schedulers_.at(0).is_running());
+    ++stopped_counter;
+  });
+  schedulers_.at(0).set_on_shutdown(
+      [this, &shutdown_counter, startup_handler_a, on_run_handler_a]() {
+        EXPECT_FALSE(schedulers_.at(0).is_running());
+        schedulers_.at(0).ScheduleOnStartup(startup_handler_a);
+        schedulers_.at(0).ScheduleOnRun(on_run_handler_a);
+        ++shutdown_counter;
+      });
+  schedulers_.at(0).ScheduleOnStartup(startup_handler_a);
+  schedulers_.at(0).set_started([this, &started_counter]() {
+    EXPECT_FALSE(schedulers_.at(0).is_running());
+    ++started_counter;
+  });
+  schedulers_.at(0).ScheduleOnRun(on_run_handler_a);
+
+  FunctionEvent e([]() {});
+  schedulers_.at(0).Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
+  ParamRunFor(std::chrono::seconds(1));
+  EXPECT_EQ(shutdown_counter, 0);
+  EXPECT_EQ(stopped_counter, 1);
+  EXPECT_EQ(started_counter, 1);
+  EXPECT_EQ(startup_counter_a, 1);
+  EXPECT_EQ(on_run_counter_a, 1);
+  EXPECT_EQ(startup_counter_b, 0);
+  EXPECT_EQ(on_run_counter_b, 0);
+
+  // In the middle, execute a TemporarilyStopAndRun. Use it to re-register the
+  // startup handlers.
+  schedulers_.at(0).ScheduleOnStartup(startup_handler_b);
+  schedulers_.at(0).ScheduleOnRun(on_run_handler_b);
+  FunctionEvent stop_and_run([this, startup_handler_a, on_run_handler_a]() {
+    scheduler_scheduler_.TemporarilyStopAndRun(
+        [this, startup_handler_a, on_run_handler_a]() {
+          schedulers_.at(0).ScheduleOnStartup(startup_handler_a);
+          schedulers_.at(0).ScheduleOnRun(on_run_handler_a);
+        });
+  });
+  schedulers_.at(1).Schedule(monotonic_clock::epoch() + chrono::seconds(2),
+                             &stop_and_run);
+  ParamRunFor(std::chrono::seconds(1));
+  EXPECT_EQ(shutdown_counter, 0);
+  EXPECT_EQ(stopped_counter, 3);
+  EXPECT_EQ(started_counter, 3);
+  EXPECT_EQ(startup_counter_a, 2);
+  EXPECT_EQ(on_run_counter_a, 2);
+  EXPECT_EQ(startup_counter_b, 1);
+  EXPECT_EQ(on_run_counter_b, 1);
+
+  // Next, execute a reboot in the middle of running and confirm that things
+  // tally correctly. We do not re-register the startup/on_run handlers before
+  // starting here, but do in the shutdown handler, so we should see the A
+  // handlers increment.
+  // We need to schedule at least one event so that the reboot is actually
+  // observable (otherwise Run() will just terminate immediately, since there
+  // are no scheduled events that could possibly observe the reboot anyway).
+  schedulers_.at(1).Schedule(monotonic_clock::epoch() + chrono::seconds(5), &e);
+  ParamRunFor(std::chrono::seconds(5));
+  EXPECT_EQ(shutdown_counter, 1);
+  EXPECT_EQ(stopped_counter, 5);
+  EXPECT_EQ(started_counter, 5);
+  EXPECT_EQ(startup_counter_a, 3);
+  EXPECT_EQ(on_run_counter_a, 3);
+  EXPECT_EQ(startup_counter_b, 1);
+  EXPECT_EQ(on_run_counter_b, 1);
+}
+
+// Test that descheduling an already scheduled event doesn't run the event.
+TEST_P(EventSchedulerParamTest, DescheduleEvent) {
+  StartClocksAtEpoch();
+  int counter = 0;
+  FunctionEvent e([&counter]() { counter += 1; });
+  auto token = schedulers_.at(0).Schedule(
+      monotonic_clock::epoch() + chrono::seconds(1), &e);
+  schedulers_.at(0).Deschedule(token);
+  ParamRunFor(std::chrono::seconds(2));
+  EXPECT_EQ(counter, 0);
+}
+
+// Test that TemporarilyStopAndRun respects and preserves running.
+TEST_P(EventSchedulerParamTest, TemporarilyStopAndRun) {
+  StartClocksAtEpoch();
+  int counter = 0;
+
+  scheduler_scheduler_.TemporarilyStopAndRun([this]() {
+    SCOPED_TRACE("StopAndRun while stopped.");
+    CheckSchedulersRunning(false);
+  });
+  {
+    SCOPED_TRACE("After StopAndRun while stopped.");
+    CheckSchedulersRunning(false);
+  }
+
+  FunctionEvent e([&]() {
+    counter += 1;
+    {
+      SCOPED_TRACE("Before StopAndRun while running.");
+      CheckSchedulersRunning(true);
+    }
+    scheduler_scheduler_.TemporarilyStopAndRun([&]() {
+      SCOPED_TRACE("StopAndRun while running.");
+      CheckSchedulersRunning(false);
+    });
+    {
+      SCOPED_TRACE("After StopAndRun while running.");
+      CheckSchedulersRunning(true);
+    }
+  });
+  schedulers_.at(0).Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
+  ParamRunFor(std::chrono::seconds(1));
+  EXPECT_EQ(counter, 1);
+}
+
+// Test that TemporarilyStopAndRun leaves stopped nodes stopped.
+TEST_P(EventSchedulerParamTest, TemporarilyStopAndRunStaggeredStart) {
+  time_.AddNextTimestamp(
+      distributed_clock::epoch(),
+      {BootTimestamp{0, monotonic_clock::epoch()},
+       BootTimestamp{0, monotonic_clock::epoch() - chrono::seconds(10)}});
+  int counter = 0;
+
+  schedulers_[1].ScheduleOnRun([]() { FAIL(); });
+  schedulers_[1].ScheduleOnStartup([]() { FAIL(); });
+  schedulers_[1].set_on_shutdown([]() { FAIL(); });
+  schedulers_[1].set_started([]() { FAIL(); });
+  schedulers_[1].set_stopped([]() { FAIL(); });
+
+  FunctionEvent e([this, &counter]() {
+    counter += 1;
+    EXPECT_TRUE(schedulers_[0].is_running());
+    EXPECT_FALSE(schedulers_[1].is_running());
+    scheduler_scheduler_.TemporarilyStopAndRun([&]() {
+      SCOPED_TRACE("StopAndRun while running.");
+      CheckSchedulersRunning(false);
+    });
+    EXPECT_TRUE(schedulers_[0].is_running());
+    EXPECT_FALSE(schedulers_[1].is_running());
+  });
+  FunctionEvent exiter([this]() { scheduler_scheduler_.Exit(); });
+  schedulers_.at(0).Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
+  schedulers_.at(0).Schedule(monotonic_clock::epoch() + chrono::seconds(2),
+                             &exiter);
+  ParamRunFor(std::chrono::seconds(1));
+  EXPECT_EQ(counter, 1);
+}
+
+INSTANTIATE_TEST_SUITE_P(EventSchedulerParamTest, EventSchedulerParamTest,
+                         testing::Values(RunMode::kRun, RunMode::kRunFor));
+
 }  // namespace aos
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 9f6234a..0afdda0 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -1,10 +1,11 @@
 load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
 load("//aos:config.bzl", "aos_config")
+load("//aos:flatbuffers.bzl", "cc_static_flatbuffer")
 
 flatbuffer_cc_library(
     name = "logger_fbs",
     srcs = ["logger.fbs"],
-    gen_reflections = 1,
+    gen_reflections = True,
     includes = [
         "//aos:configuration_fbs_includes",
     ],
@@ -12,6 +13,20 @@
     visibility = ["//visibility:public"],
 )
 
+flatbuffer_cc_library(
+    name = "replay_timing_fbs",
+    srcs = ["replay_timing.fbs"],
+    gen_reflections = True,
+    target_compatible_with = ["@platforms//os:linux"],
+)
+
+cc_static_flatbuffer(
+    name = "replay_timing_schema",
+    function = "aos::timing::ReplayTimingSchema",
+    target = ":replay_timing_fbs_reflection_out",
+    visibility = ["//visibility:public"],
+)
+
 cc_library(
     name = "boot_timestamp",
     srcs = ["boot_timestamp.cc"],
@@ -265,9 +280,13 @@
         ":log_writer",
         ":logfile_utils",
         ":logger_fbs",
+        ":replay_timing_fbs",
+        "//aos:condition",
         "//aos:uuid",
         "//aos/events:event_loop",
+        "//aos/events:shm_event_loop",
         "//aos/events:simulated_event_loop",
+        "//aos/mutex",
         "//aos/network:message_bridge_server_fbs",
         "//aos/network:multinode_timestamp_filter",
         "//aos/network:remote_message_fbs",
@@ -276,6 +295,7 @@
         "//aos/network:timestamp_filter",
         "//aos/time",
         "//aos/util:file",
+        "//aos/util:threaded_queue",
         "@com_github_google_flatbuffers//:flatbuffers",
         "@com_google_absl//absl/strings",
     ],
@@ -446,6 +466,41 @@
     deps = ["//aos/events:aos_config"],
 )
 
+aos_config(
+    name = "multinode_pingpong_triangle_split_config",
+    src = "multinode_pingpong_triangle_split.json",
+    flatbuffers = [
+        "//aos/events:ping_fbs",
+        "//aos/events:pong_fbs",
+        "//aos/network:message_bridge_client_fbs",
+        "//aos/network:message_bridge_server_fbs",
+        "//aos/network:remote_message_fbs",
+        "//aos/network:timestamp_fbs",
+    ],
+    target_compatible_with = ["@platforms//os:linux"],
+    deps = ["//aos/events:aos_config"],
+)
+
+cc_test(
+    name = "realtime_replay_test",
+    srcs = ["realtime_replay_test.cc"],
+    data = [
+        "//aos/events:pingpong_config",
+    ],
+    target_compatible_with = ["@platforms//os:linux"],
+    deps = [
+        ":log_reader",
+        ":log_writer",
+        "//aos/events:ping_lib",
+        "//aos/events:pong_lib",
+        "//aos/events:shm_event_loop",
+        "//aos/events:simulated_event_loop",
+        "//aos/testing:googletest",
+        "//aos/testing:path",
+        "//aos/testing:tmpdir",
+    ],
+)
+
 cc_test(
     name = "logger_test",
     srcs = ["logger_test.cc"],
@@ -460,6 +515,7 @@
         ":multinode_pingpong_split4_config",
         ":multinode_pingpong_split4_reliable_config",
         ":multinode_pingpong_split_config",
+        ":multinode_pingpong_triangle_split_config",
         "//aos/events:pingpong_config",
     ],
     shard_count = 10,
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 1c3a349..87ad72c 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -15,6 +15,7 @@
 #include "aos/events/logging/logfile_sorting.h"
 #include "aos/events/logging/logger_generated.h"
 #include "aos/flatbuffer_merge.h"
+#include "aos/json_to_flatbuffer.h"
 #include "aos/network/multinode_timestamp_filter.h"
 #include "aos/network/remote_message_generated.h"
 #include "aos/network/remote_message_schema.h"
@@ -46,6 +47,19 @@
     end_time, "",
     "If set, end at this point in time in the log on the realtime clock.");
 
+DEFINE_bool(drop_realtime_messages_before_start, false,
+            "If set, will drop any messages sent before the start of the "
+            "logfile in realtime replay. Setting this guarantees consistency "
+            "in timing with the original logfile, but means that you lose "
+            "access to fetched low-frequency messages.");
+
+DEFINE_double(
+    threaded_look_ahead_seconds, 2.0,
+    "Time, in seconds, to add to look-ahead when using multi-threaded replay. "
+    "Can validly be zero, but higher values are encouraged for realtime replay "
+    "in order to prevent the replay from ever having to block on waiting for "
+    "the reader to find the next message.");
+
 namespace aos {
 namespace configuration {
 // We don't really want to expose this publicly, but log reader doesn't really
@@ -162,6 +176,11 @@
 
   ~EventNotifier() { event_timer_->Disable(); }
 
+  // Sets the clock offset for realtime playback.
+  void SetClockOffset(std::chrono::nanoseconds clock_offset) {
+    clock_offset_ = clock_offset;
+  }
+
   // Returns the event trigger time.
   realtime_clock::time_point realtime_event_time() const {
     return realtime_event_time_;
@@ -189,7 +208,7 @@
       // Whops, time went backwards.  Just do it now.
       HandleTime();
     } else {
-      event_timer_->Setup(candidate_monotonic);
+      event_timer_->Setup(candidate_monotonic + clock_offset_);
     }
   }
 
@@ -208,6 +227,8 @@
   const realtime_clock::time_point realtime_event_time_ =
       realtime_clock::min_time;
 
+  std::chrono::nanoseconds clock_offset_{0};
+
   bool called_ = false;
 };
 
@@ -325,9 +346,7 @@
   }
 
   if (!configuration::MultiNode(configuration())) {
-    states_.emplace_back(std::make_unique<State>(
-        std::make_unique<TimestampMapper>(FilterPartsForNode(log_files_, "")),
-        nullptr));
+    states_.resize(1);
   } else {
     if (replay_configuration) {
       CHECK_EQ(logged_configuration()->nodes()->size(),
@@ -405,6 +424,46 @@
   state->OnStart(std::move(fn));
 }
 
+void LogReader::State::QueueThreadUntil(BootTimestamp time) {
+  if (threading_ == ThreadedBuffering::kYes) {
+    CHECK(!message_queuer_.has_value()) << "Can't start thread twice.";
+    message_queuer_.emplace(
+        [this](const BootTimestamp queue_until) {
+          // This will be called whenever anything prompts us for any state
+          // change; there may be wakeups that result in us not having any new
+          // data to push (even if we aren't done), in which case we will return
+          // nullopt but not done().
+          if (last_queued_message_.has_value() &&
+              queue_until < last_queued_message_) {
+            return util::ThreadedQueue<TimestampedMessage,
+                                       BootTimestamp>::PushResult{
+                std::nullopt, false,
+                last_queued_message_ == BootTimestamp::max_time()};
+          }
+          TimestampedMessage *message = timestamp_mapper_->Front();
+          // Upon reaching the end of the log, exit.
+          if (message == nullptr) {
+            last_queued_message_ = BootTimestamp::max_time();
+            return util::ThreadedQueue<TimestampedMessage,
+                                       BootTimestamp>::PushResult{std::nullopt,
+                                                                  false, true};
+          }
+          last_queued_message_ = message->monotonic_event_time;
+          const util::ThreadedQueue<TimestampedMessage,
+                                    BootTimestamp>::PushResult result{
+              *message, queue_until >= last_queued_message_, false};
+          timestamp_mapper_->PopFront();
+          SeedSortedMessages();
+          return result;
+        },
+        time);
+    // Spin until the first few seconds of messages are queued up so that we
+    // don't end up with delays/inconsistent timing during the first few seconds
+    // of replay.
+    message_queuer_->WaitForNoMoreWork();
+  }
+}
+
 void LogReader::State::OnStart(std::function<void()> fn) {
   on_starts_.emplace_back(std::move(fn));
 }
@@ -464,6 +523,51 @@
 
   stopped_ = true;
   started_ = true;
+  if (message_queuer_.has_value()) {
+    message_queuer_->StopPushing();
+  }
+}
+
+std::vector<
+    std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
+LogReader::State::NonExclusiveChannels() {
+  CHECK_NOTNULL(node_event_loop_factory_);
+  const aos::Configuration *config = node_event_loop_factory_->configuration();
+  std::vector<
+      std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
+      result{// Timing reports can be sent by logged and replayed applications.
+             {aos::configuration::GetChannel(config, "/aos",
+                                             "aos.timing.Report", "", node_),
+              NodeEventLoopFactory::ExclusiveSenders::kNo},
+             // AOS_LOG may be used in the log and in replay.
+             {aos::configuration::GetChannel(
+                  config, "/aos", "aos.logging.LogMessageFbs", "", node_),
+              NodeEventLoopFactory::ExclusiveSenders::kNo}};
+  for (const Node *const node : configuration::GetNodes(config)) {
+    if (node == nullptr) {
+      break;
+    }
+    const Channel *const old_timestamp_channel = aos::configuration::GetChannel(
+        config,
+        absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
+        "aos.message_bridge.RemoteMessage", "", node_, /*quiet=*/true);
+    // The old-style remote timestamp channel can be populated from any
+    // channel, simulated or replayed.
+    if (old_timestamp_channel != nullptr) {
+      result.push_back(std::make_pair(
+          old_timestamp_channel, NodeEventLoopFactory::ExclusiveSenders::kNo));
+    }
+  }
+  // Remove any channels that weren't found due to not existing in the
+  // config.
+  for (size_t ii = 0; ii < result.size();) {
+    if (result[ii].first == nullptr) {
+      result.erase(result.begin() + ii);
+    } else {
+      ++ii;
+    }
+  }
+  return result;
 }
 
 void LogReader::Register() {
@@ -490,11 +594,15 @@
     std::vector<LogParts> filtered_parts = FilterPartsForNode(
         log_files_, node != nullptr ? node->name()->string_view() : "");
 
+    // We don't run with threading on the buffering for simulated event loops
+    // because we haven't attempted to validate how the interactions between
+    // the buffering and the timestamp mapper work when running multiple nodes
+    // concurrently.
     states_[node_index] = std::make_unique<State>(
         filtered_parts.size() == 0u
             ? nullptr
             : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
-        node);
+        filters_.get(), node, State::ThreadedBuffering::kNo);
     State *state = states_[node_index].get();
     state->SetNodeEventLoopFactory(
         event_loop_factory_->GetNodeEventLoopFactory(node),
@@ -532,7 +640,7 @@
 
     // If we didn't find any log files with data in them, we won't ever get a
     // callback or be live.  So skip the rest of the setup.
-    if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+    if (state->SingleThreadedOldestMessageTime() == BootTimestamp::max_time()) {
       continue;
     }
     ++live_nodes_;
@@ -581,7 +689,7 @@
 
     // If we are replaying a log, we don't want a bunch of redundant messages
     // from both the real message bridge and simulated message bridge.
-    event_loop_factory_->DisableStatistics();
+    event_loop_factory_->PermanentlyDisableStatistics();
   }
 
   // Write pseudo start times out to file now that we are all setup.
@@ -661,8 +769,58 @@
   return nullptr;
 }
 
+// TODO(jkuszmaul): Make in-line modifications to
+// ServerStatistics/ClientStatistics messages for ShmEventLoop-based replay to
+// avoid messing up anything that depends on them having valid offsets.
 void LogReader::Register(EventLoop *event_loop) {
-  Register(event_loop, event_loop->node());
+  filters_ =
+      std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
+          event_loop->configuration(), logged_configuration(),
+          log_files_[0].boots, FLAGS_skip_order_validation,
+          chrono::duration_cast<chrono::nanoseconds>(
+              chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
+
+  std::vector<TimestampMapper *> timestamp_mappers;
+  for (const Node *node : configuration::GetNodes(configuration())) {
+    const size_t node_index =
+        configuration::GetNodeIndex(configuration(), node);
+    std::vector<LogParts> filtered_parts = FilterPartsForNode(
+        log_files_, node != nullptr ? node->name()->string_view() : "");
+
+    states_[node_index] = std::make_unique<State>(
+        filtered_parts.size() == 0u
+            ? nullptr
+            : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
+        filters_.get(), node, State::ThreadedBuffering::kYes);
+    State *state = states_[node_index].get();
+
+    state->SetChannelCount(logged_configuration()->channels()->size());
+    timestamp_mappers.emplace_back(state->timestamp_mapper());
+  }
+
+  filters_->SetTimestampMappers(std::move(timestamp_mappers));
+
+  for (const Node *node : configuration::GetNodes(configuration())) {
+    const size_t node_index =
+        configuration::GetNodeIndex(configuration(), node);
+    State *state = states_[node_index].get();
+    for (const Node *other_node : configuration::GetNodes(configuration())) {
+      const size_t other_node_index =
+          configuration::GetNodeIndex(configuration(), other_node);
+      State *other_state = states_[other_node_index].get();
+      if (other_state != state) {
+        state->AddPeer(other_state);
+      }
+    }
+  }
+  for (const Node *node : configuration::GetNodes(configuration())) {
+    if (node == nullptr || node->name()->string_view() ==
+                               event_loop->node()->name()->string_view()) {
+      Register(event_loop, event_loop->node());
+    } else {
+      Register(nullptr, node);
+    }
+  }
 }
 
 void LogReader::Register(EventLoop *event_loop, const Node *node) {
@@ -671,10 +829,13 @@
 
   // If we didn't find any log files with data in them, we won't ever get a
   // callback or be live.  So skip the rest of the setup.
-  if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+  if (state->SingleThreadedOldestMessageTime() == BootTimestamp::max_time()) {
     return;
   }
-  ++live_nodes_;
+
+  if (event_loop != nullptr) {
+    ++live_nodes_;
+  }
 
   if (event_loop_factory_ != nullptr) {
     event_loop_factory_->GetNodeEventLoopFactory(node)->OnStartup(
@@ -687,14 +848,14 @@
 }
 
 void LogReader::RegisterDuringStartup(EventLoop *event_loop, const Node *node) {
-  if (event_loop) {
+  if (event_loop != nullptr) {
     CHECK(event_loop->configuration() == configuration());
   }
 
   State *state =
       states_[configuration::GetNodeIndex(configuration(), node)].get();
 
-  if (!event_loop) {
+  if (event_loop == nullptr) {
     state->ClearTimeFlags();
   }
 
@@ -703,7 +864,7 @@
   // We don't run timing reports when trying to print out logged data, because
   // otherwise we would end up printing out the timing reports themselves...
   // This is only really relevant when we are replaying into a simulation.
-  if (event_loop) {
+  if (event_loop != nullptr) {
     event_loop->SkipTimingReport();
     event_loop->SkipAosLog();
   }
@@ -716,10 +877,10 @@
         logged_configuration()->channels()->Get(logged_channel_index));
 
     const bool logged = channel->logger() != LoggerConfig::NOT_LOGGED;
-
     message_bridge::NoncausalOffsetEstimator *filter = nullptr;
 
     State *source_state = nullptr;
+
     if (!configuration::ChannelIsSendableOnNode(channel, node) &&
         configuration::ChannelIsReadableOnNode(channel, node)) {
       const Node *source_node = configuration::GetNode(
@@ -741,7 +902,10 @@
     state->SetChannel(
         logged_channel_index,
         configuration::ChannelIndex(configuration(), channel),
-        event_loop && logged ? event_loop->MakeRawSender(channel) : nullptr,
+        event_loop && logged &&
+                configuration::ChannelIsReadableOnNode(channel, node)
+            ? event_loop->MakeRawSender(channel)
+            : nullptr,
         filter, is_forwarded, source_state);
 
     if (is_forwarded && logged) {
@@ -758,10 +922,12 @@
               states_[configuration::GetNodeIndex(
                           configuration(), connection->name()->string_view())]
                   .get();
-          destination_state->SetRemoteTimestampSender(
-              logged_channel_index,
-              event_loop ? state->RemoteTimestampSender(channel, connection)
-                         : nullptr);
+          if (destination_state) {
+            destination_state->SetRemoteTimestampSender(
+                logged_channel_index,
+                event_loop ? state->RemoteTimestampSender(channel, connection)
+                           : nullptr);
+          }
         }
       }
     }
@@ -775,14 +941,12 @@
   }
 
   state->set_timer_handler(event_loop->AddTimer([this, state]() {
-    VLOG(1) << "Starting sending " << MaybeNodeName(state->event_loop()->node())
-            << "at " << state->event_loop()->context().monotonic_event_time
-            << " now " << state->monotonic_now();
-    if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+    if (state->MultiThreadedOldestMessageTime() == BootTimestamp::max_time()) {
       --live_nodes_;
       VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
-      if (exit_on_finish_ && live_nodes_ == 0) {
-        event_loop_factory_->Exit();
+      if (exit_on_finish_ && live_nodes_ == 0 &&
+          event_loop_factory_ != nullptr) {
+        CHECK_NOTNULL(event_loop_factory_)->Exit();
       }
       return;
     }
@@ -794,32 +958,37 @@
 
     const monotonic_clock::time_point monotonic_now =
         state->event_loop()->context().monotonic_event_time;
-    if (!FLAGS_skip_order_validation) {
-      CHECK(monotonic_now == timestamped_message.monotonic_event_time.time)
-          << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
-          << monotonic_now << " trying to send "
-          << timestamped_message.monotonic_event_time << " failure "
-          << state->DebugString();
-    } else if (BootTimestamp{.boot = state->boot_count(),
-                             .time = monotonic_now} !=
-               timestamped_message.monotonic_event_time) {
-      LOG(WARNING) << "Check failed: monotonic_now == "
-                      "timestamped_message.monotonic_event_time) ("
-                   << monotonic_now << " vs. "
-                   << timestamped_message.monotonic_event_time
-                   << "): " << FlatbufferToJson(state->event_loop()->node())
-                   << " Now " << monotonic_now << " trying to send "
-                   << timestamped_message.monotonic_event_time << " failure "
-                   << state->DebugString();
+    if (event_loop_factory_ != nullptr) {
+      // Only enforce exact timing in simulation.
+      if (!FLAGS_skip_order_validation) {
+        CHECK(monotonic_now == timestamped_message.monotonic_event_time.time)
+            << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
+            << monotonic_now << " trying to send "
+            << timestamped_message.monotonic_event_time << " failure "
+            << state->DebugString();
+      } else if (BootTimestamp{.boot = state->boot_count(),
+                               .time = monotonic_now} !=
+                 timestamped_message.monotonic_event_time) {
+        LOG(WARNING) << "Check failed: monotonic_now == "
+                        "timestamped_message.monotonic_event_time) ("
+                     << monotonic_now << " vs. "
+                     << timestamped_message.monotonic_event_time
+                     << "): " << FlatbufferToJson(state->event_loop()->node())
+                     << " Now " << monotonic_now << " trying to send "
+                     << timestamped_message.monotonic_event_time << " failure "
+                     << state->DebugString();
+      }
     }
 
     if (timestamped_message.monotonic_event_time.time >
             state->monotonic_start_time(
                 timestamped_message.monotonic_event_time.boot) ||
-        event_loop_factory_ != nullptr) {
+        event_loop_factory_ != nullptr ||
+        !FLAGS_drop_realtime_messages_before_start) {
       if (timestamped_message.data != nullptr && !state->found_last_message()) {
         if (timestamped_message.monotonic_remote_time !=
-            BootTimestamp::min_time()) {
+                BootTimestamp::min_time() &&
+            !FLAGS_skip_order_validation && event_loop_factory_ != nullptr) {
           // Confirm that the message was sent on the sending node before the
           // destination node (this node).  As a proxy, do this by making sure
           // that time on the source node is past when the message was sent.
@@ -890,7 +1059,8 @@
                                  timestamped_message.realtime_event_time);
 
         VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Sending "
-                << timestamped_message.monotonic_event_time;
+                << timestamped_message.monotonic_event_time << " "
+                << state->DebugString();
         // TODO(austin): std::move channel_data in and make that efficient in
         // simulation.
         state->Send(std::move(timestamped_message));
@@ -982,16 +1152,16 @@
         }
       }
     } else {
-      LOG(WARNING) << "Not sending data from before the start of the log file. "
-                   << timestamped_message.monotonic_event_time.time
-                          .time_since_epoch()
-                          .count()
-                   << " start "
-                   << monotonic_start_time().time_since_epoch().count() << " "
-                   << *timestamped_message.data;
+      LOG(WARNING)
+          << "Not sending data from before the start of the log file. "
+          << timestamped_message.monotonic_event_time.time.time_since_epoch()
+                 .count()
+          << " start "
+          << monotonic_start_time(state->node()).time_since_epoch().count()
+          << " timestamped_message.data is null";
     }
 
-    const BootTimestamp next_time = state->OldestMessageTime();
+    const BootTimestamp next_time = state->MultiThreadedOldestMessageTime();
     if (next_time != BootTimestamp::max_time()) {
       if (next_time.boot != state->boot_count()) {
         VLOG(1) << "Next message for "
@@ -1002,18 +1172,28 @@
         state->NotifyLogfileEnd();
         return;
       }
-      VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
-              << "wakeup for " << next_time.time << "("
-              << state->ToDistributedClock(next_time.time)
-              << " distributed), now is " << state->monotonic_now();
+      if (event_loop_factory_ != nullptr) {
+        VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
+                << "wakeup for " << next_time.time << "("
+                << state->ToDistributedClock(next_time.time)
+                << " distributed), now is " << state->monotonic_now();
+      } else {
+        VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
+                << "wakeup for " << next_time.time << ", now is "
+                << state->monotonic_now();
+      }
+      // TODO(james): This can result in negative times getting passed-through
+      // in realtime replay.
       state->Setup(next_time.time);
     } else {
       VLOG(1) << MaybeNodeName(state->event_loop()->node())
               << "No next message, scheduling shutdown";
       state->NotifyLogfileEnd();
       // Set a timer up immediately after now to die. If we don't do this,
-      // then the senders waiting on the message we just read will never get
+      // then the watchers waiting on the message we just read will never get
       // called.
+      // Doesn't apply to single-EventLoop replay since the watchers in question
+      // are not under our control.
       if (event_loop_factory_ != nullptr) {
         state->Setup(monotonic_now + event_loop_factory_->send_delay() +
                      std::chrono::nanoseconds(1));
@@ -1025,7 +1205,9 @@
             << state->monotonic_now();
   }));
 
-  if (state->OldestMessageTime() != BootTimestamp::max_time()) {
+  state->SeedSortedMessages();
+
+  if (state->SingleThreadedOldestMessageTime() != BootTimestamp::max_time()) {
     state->set_startup_timer(
         event_loop->AddTimer([state]() { state->NotifyLogfileStart(); }));
     if (start_time_ != realtime_clock::min_time) {
@@ -1035,8 +1217,16 @@
       state->SetEndTimeFlag(end_time_);
     }
     event_loop->OnRun([state]() {
-      BootTimestamp next_time = state->OldestMessageTime();
+      BootTimestamp next_time = state->SingleThreadedOldestMessageTime();
       CHECK_EQ(next_time.boot, state->boot_count());
+      // Queue up messages and then set clock offsets (we don't want to set
+      // clock offsets before we've done the work of getting the first messages
+      // primed).
+      state->QueueThreadUntil(
+          next_time + std::chrono::duration_cast<std::chrono::nanoseconds>(
+                          std::chrono::duration<double>(
+                              FLAGS_threaded_look_ahead_seconds)));
+      state->MaybeSetClockOffset();
       state->Setup(next_time.time);
       state->SetupStartupTimer();
     });
@@ -1451,9 +1641,14 @@
   return remapped_channel;
 }
 
-LogReader::State::State(std::unique_ptr<TimestampMapper> timestamp_mapper,
-                        const Node *node)
-    : timestamp_mapper_(std::move(timestamp_mapper)), node_(node) {}
+LogReader::State::State(
+    std::unique_ptr<TimestampMapper> timestamp_mapper,
+    message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
+    const Node *node, LogReader::State::ThreadedBuffering threading)
+    : timestamp_mapper_(std::move(timestamp_mapper)),
+      node_(node),
+      multinode_filters_(multinode_filters),
+      threading_(threading) {}
 
 void LogReader::State::AddPeer(State *peer) {
   if (timestamp_mapper_ && peer->timestamp_mapper_) {
@@ -1499,6 +1694,52 @@
   factory_channel_index_[logged_channel_index] = factory_channel_index;
 }
 
+void LogReader::State::TrackMessageSendTiming(
+    const RawSender &sender, monotonic_clock::time_point expected_send_time) {
+  if (event_loop_ == nullptr || !timing_statistics_sender_.valid()) {
+    return;
+  }
+
+  timing::MessageTimingT sample;
+  sample.channel = configuration::ChannelIndex(event_loop_->configuration(),
+                                               sender.channel());
+  sample.expected_send_time = expected_send_time.time_since_epoch().count();
+  sample.actual_send_time =
+      sender.monotonic_sent_time().time_since_epoch().count();
+  sample.send_time_error = aos::time::DurationInSeconds(
+      expected_send_time - sender.monotonic_sent_time());
+  send_timings_.push_back(sample);
+
+  // Somewhat arbitrarily send out timing information in batches of 100. No need
+  // to create excessive overhead in regenerated logfiles.
+  // TODO(james): The overhead may be fine.
+  constexpr size_t kMaxTimesPerStatisticsMessage = 100;
+  CHECK(timing_statistics_sender_.valid());
+  if (send_timings_.size() == kMaxTimesPerStatisticsMessage) {
+    SendMessageTimings();
+  }
+}
+
+void LogReader::State::SendMessageTimings() {
+  if (send_timings_.empty() || !timing_statistics_sender_.valid()) {
+    return;
+  }
+  auto builder = timing_statistics_sender_.MakeBuilder();
+  std::vector<flatbuffers::Offset<timing::MessageTiming>> timing_offsets;
+  for (const auto &timing : send_timings_) {
+    timing_offsets.push_back(
+        timing::MessageTiming::Pack(*builder.fbb(), &timing));
+  }
+  send_timings_.clear();
+  flatbuffers::Offset<
+      flatbuffers::Vector<flatbuffers::Offset<timing::MessageTiming>>>
+      timings_offset = builder.fbb()->CreateVector(timing_offsets);
+  timing::ReplayTiming::Builder timing_builder =
+      builder.MakeBuilder<timing::ReplayTiming>();
+  timing_builder.add_messages(timings_offset);
+  timing_statistics_sender_.CheckOk(builder.Send(timing_builder.Finish()));
+}
+
 bool LogReader::State::Send(const TimestampedMessage &timestamped_message) {
   aos::RawSender *sender = channels_[timestamped_message.channel_index].get();
   CHECK(sender);
@@ -1572,6 +1813,23 @@
              source_state->boot_count());
   }
 
+  if (event_loop_factory_ != nullptr &&
+      channel_source_state_[timestamped_message.channel_index] != nullptr &&
+      multinode_filters_ != nullptr) {
+    // Sanity check that we are using consistent boot uuids.
+    State *source_state =
+        channel_source_state_[timestamped_message.channel_index];
+    CHECK_EQ(multinode_filters_->boot_uuid(
+                 configuration::GetNodeIndex(event_loop_->configuration(),
+                                             source_state->node()),
+                 timestamped_message.monotonic_remote_time.boot),
+             CHECK_NOTNULL(
+                 CHECK_NOTNULL(
+                     channel_source_state_[timestamped_message.channel_index])
+                     ->event_loop_)
+                 ->boot_uuid());
+  }
+
   // Send!  Use the replayed queue index here instead of the logged queue index
   // for the remote queue index.  This makes re-logging work.
   const auto err = sender->Send(
@@ -1580,11 +1838,22 @@
       timestamped_message.monotonic_remote_time.time,
       timestamped_message.realtime_remote_time, remote_queue_index,
       (channel_source_state_[timestamped_message.channel_index] != nullptr
-           ? CHECK_NOTNULL(
-                 channel_source_state_[timestamped_message.channel_index])
-                 ->event_loop_->boot_uuid()
+           ? CHECK_NOTNULL(multinode_filters_)
+                 ->boot_uuid(configuration::GetNodeIndex(
+                                 event_loop_->configuration(),
+                                 channel_source_state_[timestamped_message
+                                                           .channel_index]
+                                     ->node()),
+                             timestamped_message.monotonic_remote_time.boot)
            : event_loop_->boot_uuid()));
   if (err != RawSender::Error::kOk) return false;
+  if (monotonic_start_time(timestamped_message.monotonic_event_time.boot) <=
+      timestamped_message.monotonic_event_time.time) {
+    // Only track errors for non-fetched messages.
+    TrackMessageSendTiming(
+        *sender,
+        timestamped_message.monotonic_event_time.time + clock_offset());
+  }
 
   if (queue_index_map_[timestamped_message.channel_index]) {
     CHECK_EQ(timestamped_message.monotonic_event_time.boot, boot_count());
@@ -1629,6 +1898,11 @@
     // map.
   } else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
              nullptr) {
+    // TODO(james): Currently, if running replay against a single event loop,
+    // remote timestamps will not get replayed because this code-path only
+    // gets triggered on the event loop that receives the forwarded message
+    // that the timestamps correspond to. This code, as written, also doesn't
+    // correctly handle a non-zero clock_offset for the *_remote_time fields.
     State *source_state =
         CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index]);
 
@@ -1802,27 +2076,55 @@
 }
 
 TimestampedMessage LogReader::State::PopOldest() {
-  CHECK(timestamp_mapper_ != nullptr);
-  TimestampedMessage *result_ptr = timestamp_mapper_->Front();
-  CHECK(result_ptr != nullptr);
+  if (message_queuer_.has_value()) {
+    std::optional<TimestampedMessage> message = message_queuer_->Pop();
+    CHECK(message.has_value()) << ": Unexpectedly ran out of messages.";
+    message_queuer_->SetState(
+        message.value().monotonic_event_time +
+        std::chrono::duration_cast<std::chrono::nanoseconds>(
+            std::chrono::duration<double>(FLAGS_threaded_look_ahead_seconds)));
+    return message.value();
+  } else {
+    CHECK(timestamp_mapper_ != nullptr);
+    TimestampedMessage *result_ptr = timestamp_mapper_->Front();
+    CHECK(result_ptr != nullptr);
 
-  TimestampedMessage result = std::move(*result_ptr);
+    TimestampedMessage result = std::move(*result_ptr);
 
-  VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
-          << result.monotonic_event_time;
-  timestamp_mapper_->PopFront();
-  SeedSortedMessages();
+    VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
+            << result.monotonic_event_time;
+    timestamp_mapper_->PopFront();
+    SeedSortedMessages();
 
-  CHECK_EQ(result.monotonic_event_time.boot, boot_count());
+    CHECK_EQ(result.monotonic_event_time.boot, boot_count());
 
-  VLOG(1) << "Popped " << result
-          << configuration::CleanedChannelToString(
-                 event_loop_->configuration()->channels()->Get(
-                     factory_channel_index_[result.channel_index]));
-  return result;
+    VLOG(1) << "Popped " << result
+            << configuration::CleanedChannelToString(
+                   event_loop_->configuration()->channels()->Get(
+                       factory_channel_index_[result.channel_index]));
+    return result;
+  }
 }
 
-BootTimestamp LogReader::State::OldestMessageTime() {
+BootTimestamp LogReader::State::MultiThreadedOldestMessageTime() {
+  if (!message_queuer_.has_value()) {
+    return SingleThreadedOldestMessageTime();
+  }
+  std::optional<TimestampedMessage> message = message_queuer_->Peek();
+  if (!message.has_value()) {
+    return BootTimestamp::max_time();
+  }
+  if (message.value().monotonic_event_time.boot == boot_count()) {
+    ObserveNextMessage(message.value().monotonic_event_time.time,
+                       message.value().realtime_event_time);
+  }
+  return message.value().monotonic_event_time;
+}
+
+BootTimestamp LogReader::State::SingleThreadedOldestMessageTime() {
+  CHECK(!message_queuer_.has_value())
+      << "Cannot use SingleThreadedOldestMessageTime() once the queuer thread "
+         "is created.";
   if (timestamp_mapper_ == nullptr) {
     return BootTimestamp::max_time();
   }
@@ -1832,12 +2134,10 @@
   }
   VLOG(2) << MaybeNodeName(node()) << "oldest message at "
           << result_ptr->monotonic_event_time.time;
-
   if (result_ptr->monotonic_event_time.boot == boot_count()) {
     ObserveNextMessage(result_ptr->monotonic_event_time.time,
                        result_ptr->realtime_event_time);
   }
-
   return result_ptr->monotonic_event_time;
 }
 
@@ -1862,6 +2162,7 @@
   event_loop_ = nullptr;
   timer_handler_ = nullptr;
   node_event_loop_factory_ = nullptr;
+  timing_statistics_sender_ = Sender<timing::ReplayTiming>();
 }
 
 void LogReader::State::SetStartTimeFlag(realtime_clock::time_point start_time) {
@@ -1934,5 +2235,27 @@
   }
 }
 
+void LogReader::State::MaybeSetClockOffset() {
+  if (node_event_loop_factory_ == nullptr) {
+    // If not running with simulated event loop, set the monotonic clock
+    // offset.
+    clock_offset_ = event_loop()->monotonic_now() - monotonic_start_time(0);
+
+    if (start_event_notifier_) {
+      start_event_notifier_->SetClockOffset(clock_offset_);
+    }
+    if (end_event_notifier_) {
+      end_event_notifier_->SetClockOffset(clock_offset_);
+    }
+  }
+}
+
+void LogReader::SetRealtimeReplayRate(double replay_rate) {
+  CHECK(event_loop_factory_ != nullptr)
+      << ": Can't set replay rate without an event loop factory (have you "
+         "called Register()?).";
+  event_loop_factory_->SetRealtimeReplayRate(replay_rate);
+}
+
 }  // namespace logger
 }  // namespace aos
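Note: a minimal sketch of the clock-offset handling added above for
single-EventLoop (non-simulated) replay.  The names below are illustrative
only, not part of this change; the real logic lives in
LogReader::State::MaybeSetClockOffset() and LogReader::State::Setup().

#include <algorithm>
#include <chrono>

#include "aos/time/time.h"

// The offset between the replay event loop's clock and the log's start time
// is computed once at startup; every scheduled wakeup is then shifted by it
// and clamped so it never lands in the past.
struct ReplayClock {
  std::chrono::nanoseconds clock_offset{0};

  void SetOffset(aos::monotonic_clock::time_point now,
                 aos::monotonic_clock::time_point log_start_time) {
    clock_offset = now - log_start_time;
  }

  aos::monotonic_clock::time_point WakeupFor(
      aos::monotonic_clock::time_point log_time,
      aos::monotonic_clock::time_point now) const {
    return std::max(now, log_time + clock_offset);
  }
};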
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index c1333c6..9e44996 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -3,20 +3,26 @@
 
 #include <chrono>
 #include <deque>
+#include <queue>
 #include <string_view>
 #include <tuple>
 #include <vector>
 
+#include "aos/condition.h"
 #include "aos/events/event_loop.h"
 #include "aos/events/logging/logfile_sorting.h"
 #include "aos/events/logging/logfile_utils.h"
 #include "aos/events/logging/logger_generated.h"
+#include "aos/events/logging/replay_timing_generated.h"
+#include "aos/events/shm_event_loop.h"
 #include "aos/events/simulated_event_loop.h"
+#include "aos/mutex/mutex.h"
 #include "aos/network/message_bridge_server_generated.h"
 #include "aos/network/multinode_timestamp_filter.h"
 #include "aos/network/remote_message_generated.h"
 #include "aos/network/timestamp_filter.h"
 #include "aos/time/time.h"
+#include "aos/util/threaded_queue.h"
 #include "aos/uuid.h"
 #include "flatbuffers/flatbuffers.h"
 
@@ -92,10 +98,19 @@
   // Creates an SimulatedEventLoopFactory accessible via event_loop_factory(),
   // and then calls Register.
   void Register();
+
   // Registers callbacks for all the events after the log file starts.  This is
   // only useful when replaying live.
   void Register(EventLoop *event_loop);
 
+  // Sets a sender that should be used for tracking timing statistics. If not
+  // set, no statistics will be recorded.
+  void set_timing_accuracy_sender(
+      const Node *node, aos::Sender<timing::ReplayTiming> timing_sender) {
+    states_[configuration::GetNodeIndex(configuration(), node)]
+        ->set_timing_accuracy_sender(std::move(timing_sender));
+  }
+
   // Called whenever a log file starts for a node.
   void OnStart(std::function<void()> fn);
   void OnStart(const Node *node, std::function<void()> fn);
@@ -216,6 +231,13 @@
     exit_on_finish_ = exit_on_finish;
   }
 
+  // Sets the realtime replay rate. A value of 1.0 will cause the scheduler to
+  // try to play events in realtime. 0.5 will run at half speed. Use infinity
+  // (the default) to run as fast as possible. This can be changed during
+  // run-time.
+  // Only applies when running against a SimulatedEventLoopFactory.
+  void SetRealtimeReplayRate(double replay_rate);
+
  private:
   void Register(EventLoop *event_loop, const Node *node);
 
@@ -285,7 +307,13 @@
   // State per node.
   class State {
    public:
-    State(std::unique_ptr<TimestampMapper> timestamp_mapper, const Node *node);
+    // Whether we should spin up a separate thread for buffering up messages.
+    // Only allowed in realtime replay--see comments on threading_ member for
+    // details.
+    enum class ThreadedBuffering { kYes, kNo };
+    State(std::unique_ptr<TimestampMapper> timestamp_mapper,
+          message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
+          const Node *node, ThreadedBuffering threading);
 
     // Connects up the timestamp mappers.
     void AddPeer(State *peer);
@@ -297,12 +325,25 @@
     TimestampedMessage PopOldest();
 
     // Returns the monotonic time of the oldest message.
-    BootTimestamp OldestMessageTime();
+    BootTimestamp SingleThreadedOldestMessageTime();
+    // Returns the monotonic time of the oldest message, querying the separate
+    // buffering thread if ThreadedBuffering was set.
+    BootTimestamp MultiThreadedOldestMessageTime();
 
     size_t boot_count() const {
       // If we are replaying directly into an event loop, we can't reboot.  So
       // we will stay stuck on the 0th boot.
-      if (!node_event_loop_factory_) return 0u;
+      if (!node_event_loop_factory_) {
+        if (event_loop_ == nullptr) {
+          // If boot_count is being checked after startup for any of the
+          // non-primary nodes, then returning 0 may not be accurate (since
+          // remote nodes *can* reboot even if the EventLoop being played to
+          // can't).
+          CHECK(!started_);
+          CHECK(!stopped_);
+        }
+        return 0u;
+      }
       return node_event_loop_factory_->boot_count();
     }
 
@@ -319,8 +360,10 @@
         NotifyLogfileStart();
         return;
       }
-      CHECK_GE(start_time, event_loop_->monotonic_now());
-      startup_timer_->Setup(start_time);
+      if (node_event_loop_factory_) {
+        CHECK_GE(start_time + clock_offset(), event_loop_->monotonic_now());
+      }
+      startup_timer_->Setup(start_time + clock_offset());
     }
 
     void set_startup_timer(TimerHandler *timer_handler) {
@@ -382,6 +425,7 @@
     // distributed clock.
     distributed_clock::time_point ToDistributedClock(
         monotonic_clock::time_point time) {
+      CHECK(node_event_loop_factory_);
       return node_event_loop_factory_->ToDistributedClock(time);
     }
 
@@ -409,12 +453,14 @@
       // ensure we are remapping channels correctly.
       event_loop_unique_ptr_ = node_event_loop_factory_->MakeEventLoop(
           "log_reader", {NodeEventLoopFactory::CheckSentTooFast::kNo,
-                         NodeEventLoopFactory::ExclusiveSenders::kNo});
+                         NodeEventLoopFactory::ExclusiveSenders::kYes,
+                         NonExclusiveChannels()});
       return event_loop_unique_ptr_.get();
     }
 
     distributed_clock::time_point RemoteToDistributedClock(
         size_t channel_index, monotonic_clock::time_point time) {
+      CHECK(node_event_loop_factory_);
       return channel_source_state_[channel_index]
           ->node_event_loop_factory_->ToDistributedClock(time);
     }
@@ -425,7 +471,7 @@
     }
 
     monotonic_clock::time_point monotonic_now() const {
-      return node_event_loop_factory_->monotonic_now();
+      return event_loop_->monotonic_now();
     }
 
     // Sets the number of channels.
@@ -487,12 +533,16 @@
 
     // Sets the next wakeup time on the replay callback.
     void Setup(monotonic_clock::time_point next_time) {
-      timer_handler_->Setup(next_time);
+      timer_handler_->Setup(
+          std::max(monotonic_now(), next_time + clock_offset()));
     }
 
     // Sends a buffer on the provided channel index.
     bool Send(const TimestampedMessage &timestamped_message);
 
+    void MaybeSetClockOffset();
+    std::chrono::nanoseconds clock_offset() const { return clock_offset_; }
+
     // Returns a debug string for the channel merger.
     std::string DebugString() const {
       if (!timestamp_mapper_) {
@@ -522,7 +572,21 @@
       return last_message_[channel_index];
     }
 
+    void set_timing_accuracy_sender(
+        aos::Sender<timing::ReplayTiming> timing_sender) {
+      timing_statistics_sender_ = std::move(timing_sender);
+      OnEnd([this]() { SendMessageTimings(); });
+    }
+
+    // If running with ThreadedBuffering::kYes, will start the processing thread
+    // and queue up messages until the specified time. No-op if
+    // ThreadedBuffering::kNo is set. Should only be called once.
+    void QueueThreadUntil(BootTimestamp time);
+
    private:
+    void TrackMessageSendTiming(const RawSender &sender,
+                                monotonic_clock::time_point expected_send_time);
+    void SendMessageTimings();
     // Log file.
     std::unique_ptr<TimestampMapper> timestamp_mapper_;
 
@@ -556,6 +620,12 @@
       uint32_t actual_queue_index = 0xffffffff;
     };
 
+    // Returns a list of channels which LogReader will send on but which may
+    // *also* get sent on by other applications in replay.
+    std::vector<
+        std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
+    NonExclusiveChannels();
+
     // Stores all the timestamps that have been sent on this channel.  This is
     // only done for channels which are forwarded and on the node which
     // initially sends the message.  Compress using ranges and offsets.
@@ -582,6 +652,7 @@
     // going between 2 nodes.  The second element in the tuple indicates if this
     // is the primary direction or not.
     std::vector<message_bridge::NoncausalOffsetEstimator *> filters_;
+    message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters_;
 
     // List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
     // channel) which correspond to the originating node.
@@ -598,14 +669,46 @@
     absl::btree_map<const Channel *, std::shared_ptr<RemoteMessageSender>>
         timestamp_loggers_;
 
+    // Time offset between the log's monotonic clock and the current event
+    // loop's monotonic clock.  Useful when replaying logs with non-simulated
+    // event loops.
+    std::chrono::nanoseconds clock_offset_{0};
+
     std::vector<std::function<void()>> on_starts_;
     std::vector<std::function<void()>> on_ends_;
 
-    bool stopped_ = false;
-    bool started_ = false;
+    std::atomic<bool> stopped_ = false;
+    std::atomic<bool> started_ = false;
 
     bool found_last_message_ = false;
     std::vector<bool> last_message_;
+
+    std::vector<timing::MessageTimingT> send_timings_;
+    aos::Sender<timing::ReplayTiming> timing_statistics_sender_;
+
+    // Protects access to any internal state after Run() is called. Designed
+    // assuming that only one node is actually executing in replay.
+    // Threading design:
+    // * The worker passed to message_queuer_ has full ownership over all
+    //   the log-reading code, timestamp filters, last_queued_message_, etc.
+    // * The main thread should only have exclusive access to the replay
+    //   event loop and associated features (mainly senders).
+    //   It will pop an item out of the queue (the queue does maintain a
+    //   shared_ptr reference that may also be in use by the message_queuer_
+    //   thread, but accessing the same memory through shared_ptrs from
+    //   separate threads is permissible).
+    // Enabling this in simulation is currently infeasible due to a lack of
+    // synchronization in the MultiNodeNoncausalOffsetEstimator. Essentially,
+    // when the message_queuer_ thread attempts to read/pop messages from the
+    // timestamp_mapper_, it will end up calling callbacks that update the
+    // internal state of the MultiNodeNoncausalOffsetEstimator. Simultaneously,
+    // the event scheduler that is running in the main thread to orchestrate the
+    // simulation will be querying the estimator to know what the clocks on the
+    // various nodes are at, leading to potential issues.
+    ThreadedBuffering threading_;
+    std::optional<BootTimestamp> last_queued_message_;
+    std::optional<util::ThreadedQueue<TimestampedMessage, BootTimestamp>>
+        message_queuer_;
   };
 
   // Node index -> State.
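Note: the threading comment above describes the producer/consumer split behind
message_queuer_.  Below is a simplified, self-contained sketch of that
look-ahead pattern; it is not the real util::ThreadedQueue API (only Pop(),
Peek(), and SetState() appear in this change), just an illustration of the
design under those assumptions.

#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// The consumer (the replay event loop) pops messages and tells the producer
// how far ahead in log-time to keep reading; the producer (the log-reading
// thread) blocks until more buffering is requested.
template <typename Message, typename Timestamp>
class LookAheadBuffer {
 public:
  // Consumer side: block until a message is available and return it.
  Message Pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    nonempty_.wait(lock, [this]() { return !messages_.empty(); });
    Message result = std::move(messages_.front());
    messages_.pop_front();
    return result;
  }

  // Consumer side: advance the target time the producer should buffer up to.
  void SetState(Timestamp queue_until) {
    std::unique_lock<std::mutex> lock(mutex_);
    queue_until_ = queue_until;
    state_changed_.notify_all();
  }

  // Producer side: wait until the consumer wants more than has been queued,
  // then return the new target.
  Timestamp WaitForWork(Timestamp already_queued) {
    std::unique_lock<std::mutex> lock(mutex_);
    state_changed_.wait(lock,
                        [&]() { return already_queued < queue_until_; });
    return queue_until_;
  }

  // Producer side: hand a message to the consumer.
  void Push(Message message) {
    std::unique_lock<std::mutex> lock(mutex_);
    messages_.push_back(std::move(message));
    nonempty_.notify_all();
  }

 private:
  std::mutex mutex_;
  std::condition_variable nonempty_;
  std::condition_variable state_changed_;
  std::deque<Message> messages_;
  Timestamp queue_until_{};
};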
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 5f265e2..a463f4f 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -181,6 +181,46 @@
   }
 }
 
+// Tests that we die if another application attempts to make a sender on a
+// channel that the replayer holds an exclusive sender on.
+TEST_F(LoggerDeathTest, DieOnDuplicateReplayChannels) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          ArtifactPath("aos/events/pingpong_config.json"));
+  SimulatedEventLoopFactory event_loop_factory(&config.message());
+  const ::std::string tmpdir = aos::testing::TestTmpDir();
+  const ::std::string base_name = tmpdir + "/logfile";
+  const ::std::string config_file =
+      absl::StrCat(base_name, kSingleConfigSha256, ".bfbs");
+  const ::std::string logfile = base_name + ".part0.bfbs";
+  // Remove the log file.
+  unlink(config_file.c_str());
+  unlink(logfile.c_str());
+
+  LOG(INFO) << "Logging data to " << logfile;
+
+  {
+    std::unique_ptr<EventLoop> logger_event_loop =
+        event_loop_factory.MakeEventLoop("logger");
+
+    Logger logger(logger_event_loop.get());
+    logger.set_separate_config(false);
+    logger.set_polling_period(std::chrono::milliseconds(100));
+    logger.StartLoggingLocalNamerOnRun(base_name);
+
+    event_loop_factory.RunFor(chrono::seconds(2));
+  }
+
+  LogReader reader(logfile);
+
+  reader.Register();
+
+  std::unique_ptr<EventLoop> test_event_loop =
+      reader.event_loop_factory()->MakeEventLoop("log_reader");
+
+  EXPECT_DEATH(test_event_loop->MakeSender<examples::Ping>("/test"),
+               "exclusive channel.*examples.Ping");
+}
+
 // Tests calling StopLogging twice.
 TEST_F(LoggerDeathTest, ExtraStop) {
   const ::std::string tmpdir = aos::testing::TestTmpDir();
@@ -442,7 +482,8 @@
     std::unique_ptr<EventLoop> ping_spammer_event_loop =
         event_loop_factory.GetNodeEventLoopFactory(nullptr)->MakeEventLoop(
             "ping_spammer", {NodeEventLoopFactory::CheckSentTooFast::kNo,
-                             NodeEventLoopFactory::ExclusiveSenders::kNo});
+                             NodeEventLoopFactory::ExclusiveSenders::kNo,
+                             {}});
     aos::Sender<examples::Ping> ping_sender =
         ping_spammer_event_loop->MakeSender<examples::Ping>("/test");
 
@@ -2192,6 +2233,149 @@
   reader.Deregister();
 }
 
+// Tests that we observe all the same events in log replay (for a given node)
+// whether we just register an event loop for that node or if we register a full
+// event loop factory.
+TEST_P(MultinodeLoggerTest, SingleNodeReplay) {
+  time_converter_.StartEqual();
+  constexpr chrono::milliseconds kStartupDelay(95);
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    event_loop_factory_.RunFor(kStartupDelay);
+
+    StartLogger(&pi1_logger);
+    StartLogger(&pi2_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+  }
+
+  LogReader full_reader(SortParts(logfiles_));
+  LogReader single_node_reader(SortParts(logfiles_));
+
+  SimulatedEventLoopFactory full_factory(full_reader.configuration());
+  SimulatedEventLoopFactory single_node_factory(
+      single_node_reader.configuration());
+  single_node_factory.SkipTimingReport();
+  single_node_factory.DisableStatistics();
+  std::unique_ptr<EventLoop> replay_event_loop =
+      single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
+          "log_reader");
+
+  full_reader.Register(&full_factory);
+  single_node_reader.Register(replay_event_loop.get());
+
+  const Node *full_pi1 =
+      configuration::GetNode(full_factory.configuration(), "pi1");
+
+  // Confirm we can read the data on the remapped channel, just for pi1. Nothing
+  // else should have moved.
+  std::unique_ptr<EventLoop> full_event_loop =
+      full_factory.MakeEventLoop("test", full_pi1);
+  full_event_loop->SkipTimingReport();
+  full_event_loop->SkipAosLog();
+  // maps are indexed on channel index.
+  // observed_messages: {channel_index: [(message_sent_time, was_fetched),...]}
+  std::map<size_t, std::vector<std::pair<monotonic_clock::time_point, bool>>>
+      observed_messages;
+  std::map<size_t, std::unique_ptr<RawFetcher>> fetchers;
+  for (size_t ii = 0; ii < full_event_loop->configuration()->channels()->size();
+       ++ii) {
+    const Channel *channel =
+        full_event_loop->configuration()->channels()->Get(ii);
+    // We currently don't support replaying remote timestamp channels in
+    // realtime replay.
+    if (channel->name()->string_view().find("remote_timestamp") !=
+        std::string_view::npos) {
+      continue;
+    }
+    if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
+      observed_messages[ii] = {};
+      fetchers[ii] = full_event_loop->MakeRawFetcher(channel);
+      full_event_loop->OnRun([ii, &observed_messages, &fetchers]() {
+        if (fetchers[ii]->Fetch()) {
+          observed_messages[ii].push_back(std::make_pair(
+              fetchers[ii]->context().monotonic_event_time, true));
+        }
+      });
+      full_event_loop->MakeRawNoArgWatcher(
+          channel, [ii, &observed_messages](const Context &context) {
+            observed_messages[ii].push_back(
+                std::make_pair(context.monotonic_event_time, false));
+          });
+    }
+  }
+
+  full_factory.Run();
+  fetchers.clear();
+  full_reader.Deregister();
+
+  const Node *single_node_pi1 =
+      configuration::GetNode(single_node_factory.configuration(), "pi1");
+  std::map<size_t, std::unique_ptr<RawFetcher>> single_node_fetchers;
+
+  std::unique_ptr<EventLoop> single_node_event_loop =
+      single_node_factory.MakeEventLoop("test", single_node_pi1);
+  single_node_event_loop->SkipTimingReport();
+  single_node_event_loop->SkipAosLog();
+  for (size_t ii = 0;
+       ii < single_node_event_loop->configuration()->channels()->size(); ++ii) {
+    const Channel *channel =
+        single_node_event_loop->configuration()->channels()->Get(ii);
+    single_node_factory.DisableForwarding(channel);
+    if (configuration::ChannelIsReadableOnNode(channel, single_node_pi1)) {
+      single_node_fetchers[ii] =
+          single_node_event_loop->MakeRawFetcher(channel);
+      single_node_event_loop->OnRun([channel, ii, &single_node_fetchers]() {
+        EXPECT_FALSE(single_node_fetchers[ii]->Fetch())
+            << "Single EventLoop replay doesn't support pre-loading fetchers. "
+            << configuration::StrippedChannelToString(channel);
+      });
+      single_node_event_loop->MakeRawNoArgWatcher(
+          channel, [ii, &observed_messages, channel,
+                    kStartupDelay](const Context &context) {
+            if (observed_messages[ii].empty()) {
+              FAIL() << "Observed extra message at "
+                     << context.monotonic_event_time << " on "
+                     << configuration::StrippedChannelToString(channel);
+              return;
+            }
+            const std::pair<monotonic_clock::time_point, bool> &message =
+                observed_messages[ii].front();
+            if (message.second) {
+              EXPECT_LE(message.first,
+                        context.monotonic_event_time + kStartupDelay)
+                  << "Mismatched message times " << context.monotonic_event_time
+                  << " and " << message.first << " on "
+                  << configuration::StrippedChannelToString(channel);
+            } else {
+              EXPECT_EQ(message.first,
+                        context.monotonic_event_time + kStartupDelay)
+                  << "Mismatched message times " << context.monotonic_event_time
+                  << " and " << message.first << " on "
+                  << configuration::StrippedChannelToString(channel);
+            }
+            observed_messages[ii].erase(observed_messages[ii].begin());
+          });
+    }
+  }
+
+  single_node_factory.Run();
+
+  single_node_fetchers.clear();
+
+  single_node_reader.Deregister();
+
+  for (const auto &pair : observed_messages) {
+    EXPECT_TRUE(pair.second.empty())
+        << "Missed " << pair.second.size() << " messages on "
+        << configuration::StrippedChannelToString(
+               single_node_event_loop->configuration()->channels()->Get(
+                   pair.first));
+  }
+}
+
 // Tests that we properly recreate forwarded timestamps when replaying a log.
 // This should be enough that we can then re-run the logger and get a valid log
 // back.
@@ -4349,6 +4533,163 @@
   ConfirmReadable(filenames);
 }
 
+// Tests that we can replay a logfile that has timestamps such that at least one
+// node's epoch is at a positive distributed_clock (and thus will have to be
+// booted after the other node(s)).
+TEST_P(MultinodeLoggerTest, StartOneNodeBeforeOther) {
+  std::vector<std::string> filenames;
+
+  CHECK_EQ(pi1_index_, 0u);
+  CHECK_EQ(pi2_index_, 1u);
+
+  time_converter_.AddNextTimestamp(
+      distributed_clock::epoch(),
+      {BootTimestamp::epoch(), BootTimestamp::epoch()});
+
+  const chrono::nanoseconds before_reboot_duration = chrono::milliseconds(1000);
+  time_converter_.RebootAt(
+      0, distributed_clock::time_point(before_reboot_duration));
+
+  const chrono::nanoseconds test_duration = time_converter_.AddMonotonic(
+      {chrono::milliseconds(10000), chrono::milliseconds(10000)});
+
+  const std::string kLogfile =
+      aos::testing::TestTmpDir() + "/multi_logfile2.1/";
+  util::UnlinkRecursive(kLogfile);
+
+  pi2_->Disconnect(pi1_->node());
+  pi1_->Disconnect(pi2_->node());
+
+  {
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    pi2_logger.StartLogger(kLogfile);
+    event_loop_factory_.RunFor(before_reboot_duration);
+
+    pi2_->Connect(pi1_->node());
+    pi1_->Connect(pi2_->node());
+
+    event_loop_factory_.RunFor(test_duration);
+
+    pi2_logger.AppendAllFilenames(&filenames);
+  }
+
+  const std::vector<LogFile> sorted_parts = SortParts(filenames);
+  ConfirmReadable(filenames);
+
+  {
+    LogReader reader(sorted_parts);
+    SimulatedEventLoopFactory replay_factory(reader.configuration());
+    reader.RegisterWithoutStarting(&replay_factory);
+
+    NodeEventLoopFactory *const replay_node =
+        reader.event_loop_factory()->GetNodeEventLoopFactory("pi1");
+
+    std::unique_ptr<EventLoop> test_event_loop =
+        replay_node->MakeEventLoop("test_reader");
+    replay_node->OnStartup([replay_node]() {
+      // Check that we didn't boot until at least t=0.
+      CHECK_LE(monotonic_clock::epoch(), replay_node->monotonic_now());
+    });
+    test_event_loop->OnRun([&test_event_loop]() {
+      // Check that we didn't boot until at least t=0.
+      EXPECT_LE(monotonic_clock::epoch(), test_event_loop->monotonic_now());
+    });
+    reader.event_loop_factory()->Run();
+    reader.Deregister();
+  }
+}
+
+// Tests that when we have a loop without all the logs at all points in time, we
+// can sort it properly.
+TEST(MultinodeLoggerLoopTest, DISABLED_Loop) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(ArtifactPath(
+          "aos/events/logging/multinode_pingpong_triangle_split_config.json"));
+  message_bridge::TestingTimeConverter time_converter(
+      configuration::NodesCount(&config.message()));
+  SimulatedEventLoopFactory event_loop_factory(&config.message());
+  event_loop_factory.SetTimeConverter(&time_converter);
+
+  NodeEventLoopFactory *const pi1 =
+      event_loop_factory.GetNodeEventLoopFactory("pi1");
+  NodeEventLoopFactory *const pi2 =
+      event_loop_factory.GetNodeEventLoopFactory("pi2");
+  NodeEventLoopFactory *const pi3 =
+      event_loop_factory.GetNodeEventLoopFactory("pi3");
+
+  const std::string kLogfile1_1 =
+      aos::testing::TestTmpDir() + "/multi_logfile1/";
+  const std::string kLogfile2_1 =
+      aos::testing::TestTmpDir() + "/multi_logfile2/";
+  const std::string kLogfile3_1 =
+      aos::testing::TestTmpDir() + "/multi_logfile3/";
+  util::UnlinkRecursive(kLogfile1_1);
+  util::UnlinkRecursive(kLogfile2_1);
+  util::UnlinkRecursive(kLogfile3_1);
+
+  {
+    // Make pi1 boot before everything else.
+    time_converter.AddNextTimestamp(
+        distributed_clock::epoch(),
+        {BootTimestamp::epoch(),
+         BootTimestamp::epoch() - chrono::milliseconds(100),
+         BootTimestamp::epoch() - chrono::milliseconds(300)});
+  }
+
+  // We want to set up a situation such that 2 of the 3 legs of the loop are very
+  // confident about time being X, and the third leg is pulling the average off
+  // to one side.
+  //
+  // It's easiest to visualize this in timestamp_plotter.
+
+  std::vector<std::string> filenames;
+  {
+    // Have pi1 send out a reliable message at startup.  This sets up a message
+    // with a long forwarding time at the start to bias time.
+    std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
+    {
+      aos::Sender<examples::Ping> ping_sender =
+          pi1_event_loop->MakeSender<examples::Ping>("/reliable");
+
+      aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
+      examples::Ping::Builder ping_builder =
+          builder.MakeBuilder<examples::Ping>();
+      CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
+    }
+
+    // Wait a while so there's enough data to let the worst case be rather off.
+    event_loop_factory.RunFor(chrono::seconds(1000));
+
+    // Now start a receiving node first.  This sets up 2 tight bounds between 2
+    // of the nodes.
+    LoggerState pi2_logger = LoggerState::MakeLogger(
+        pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+    pi2_logger.StartLogger(kLogfile2_1);
+
+    event_loop_factory.RunFor(chrono::seconds(100));
+
+    // And now start the third leg.
+    LoggerState pi3_logger = LoggerState::MakeLogger(
+        pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+    pi3_logger.StartLogger(kLogfile3_1);
+
+    LoggerState pi1_logger = LoggerState::MakeLogger(
+        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+    pi1_logger.StartLogger(kLogfile1_1);
+
+    event_loop_factory.RunFor(chrono::seconds(100));
+
+    pi1_logger.AppendAllFilenames(&filenames);
+    pi2_logger.AppendAllFilenames(&filenames);
+    pi3_logger.AppendAllFilenames(&filenames);
+  }
+
+  // Make sure we can read this.
+  const std::vector<LogFile> sorted_parts = SortParts(filenames);
+  auto result = ConfirmReadable(filenames);
+}
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/logging/multinode_pingpong_triangle_split.json b/aos/events/logging/multinode_pingpong_triangle_split.json
new file mode 100644
index 0000000..226718e
--- /dev/null
+++ b/aos/events/logging/multinode_pingpong_triangle_split.json
@@ -0,0 +1,343 @@
+{
+  "channels": [
+    {
+      "name": "/pi1/aos",
+      "type": "aos.logging.LogMessageFbs",
+      "source_node": "pi1",
+      "frequency": 200,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    {
+      "name": "/pi2/aos",
+      "type": "aos.logging.LogMessageFbs",
+      "source_node": "pi2",
+      "frequency": 200,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    {
+      "name": "/pi3/aos",
+      "type": "aos.logging.LogMessageFbs",
+      "source_node": "pi3",
+      "frequency": 200,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    /* Logged on pi1 locally */
+    {
+      "name": "/pi1/aos",
+      "type": "aos.timing.Report",
+      "source_node": "pi1",
+      "frequency": 50,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    {
+      "name": "/pi2/aos",
+      "type": "aos.timing.Report",
+      "source_node": "pi2",
+      "frequency": 50,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    {
+      "name": "/pi3/aos",
+      "type": "aos.timing.Report",
+      "source_node": "pi3",
+      "frequency": 50,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    {
+      "name": "/pi1/aos",
+      "type": "aos.message_bridge.ServerStatistics",
+      "logger": "LOCAL_LOGGER",
+      "source_node": "pi1"
+    },
+    {
+      "name": "/pi2/aos",
+      "type": "aos.message_bridge.ServerStatistics",
+      "logger": "LOCAL_LOGGER",
+      "source_node": "pi2"
+    },
+    {
+      "name": "/pi3/aos",
+      "type": "aos.message_bridge.ServerStatistics",
+      "logger": "LOCAL_LOGGER",
+      "source_node": "pi3"
+    },
+    {
+      "name": "/pi1/aos",
+      "type": "aos.logging.DynamicLogCommand",
+      "logger": "LOCAL_LOGGER",
+      "source_node": "pi1"
+    },
+    {
+      "name": "/pi2/aos",
+      "type": "aos.logging.DynamicLogCommand",
+      "logger": "LOCAL_LOGGER",
+      "source_node": "pi2"
+    },
+    {
+      "name": "/pi3/aos",
+      "type": "aos.logging.DynamicLogCommand",
+      "logger": "LOCAL_LOGGER",
+      "source_node": "pi3"
+    },
+    {
+      "name": "/pi1/aos",
+      "type": "aos.message_bridge.ClientStatistics",
+      "logger": "LOCAL_LOGGER",
+      "source_node": "pi1"
+    },
+    {
+      "name": "/pi2/aos",
+      "type": "aos.message_bridge.ClientStatistics",
+      "logger": "LOCAL_LOGGER",
+      "source_node": "pi2"
+    },
+    {
+      "name": "/pi3/aos",
+      "type": "aos.message_bridge.ClientStatistics",
+      "logger": "LOCAL_LOGGER",
+      "source_node": "pi3"
+    },
+    {
+      "name": "/pi1/aos",
+      "type": "aos.message_bridge.Timestamp",
+      "logger": "LOCAL_AND_REMOTE_LOGGER",
+      "logger_nodes": ["pi2", "pi3"],
+      "source_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi2",
+          "time_to_live": 5000000,
+          "timestamp_logger": "LOCAL_LOGGER"
+        },
+        {
+          "name": "pi3",
+          "time_to_live": 5000000,
+          "timestamp_logger": "LOCAL_LOGGER"
+        }
+      ]
+    },
+    {
+      "name": "/pi2/aos",
+      "type": "aos.message_bridge.Timestamp",
+      "logger": "LOCAL_AND_REMOTE_LOGGER",
+      "logger_nodes": ["pi1", "pi3"],
+      "source_node": "pi2",
+      "destination_nodes": [
+        {
+          "name": "pi1",
+          "time_to_live": 5000000,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["pi2"]
+        },
+        {
+          "name": "pi3",
+          "time_to_live": 5000000,
+          "timestamp_logger": "LOCAL_LOGGER"
+        }
+      ]
+    },
+    {
+      "name": "/pi3/aos",
+      "type": "aos.message_bridge.Timestamp",
+      "logger": "LOCAL_AND_REMOTE_LOGGER",
+      "logger_nodes": ["pi1", "pi2"],
+      "source_node": "pi3",
+      "destination_nodes": [
+        {
+          "name": "pi1",
+          "time_to_live": 5000000,
+          "timestamp_logger": "LOCAL_LOGGER"
+        },
+        {
+          "name": "pi2",
+          "time_to_live": 5000000,
+          "timestamp_logger": "LOCAL_LOGGER"
+        }
+      ]
+    },
+    {
+      "name": "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
+      "type": "aos.message_bridge.RemoteMessage",
+      "logger": "NOT_LOGGED",
+      "num_senders": 2,
+      "source_node": "pi1"
+    },
+    {
+      "name": "/pi1/aos/remote_timestamps/pi3/pi1/aos/aos-message_bridge-Timestamp",
+      "type": "aos.message_bridge.RemoteMessage",
+      "logger": "NOT_LOGGED",
+      "num_senders": 2,
+      "source_node": "pi1"
+    },
+    {
+      "name": "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
+      "type": "aos.message_bridge.RemoteMessage",
+      "logger": "NOT_LOGGED",
+      "num_senders": 2,
+      "source_node": "pi2"
+    },
+    {
+      "name": "/pi2/aos/remote_timestamps/pi3/pi2/aos/aos-message_bridge-Timestamp",
+      "type": "aos.message_bridge.RemoteMessage",
+      "logger": "NOT_LOGGED",
+      "num_senders": 2,
+      "source_node": "pi2"
+    },
+    {
+      "name": "/pi3/aos/remote_timestamps/pi1/pi3/aos/aos-message_bridge-Timestamp",
+      "type": "aos.message_bridge.RemoteMessage",
+      "logger": "NOT_LOGGED",
+      "num_senders": 2,
+      "source_node": "pi3"
+    },
+    {
+      "name": "/pi3/aos/remote_timestamps/pi2/pi3/aos/aos-message_bridge-Timestamp",
+      "type": "aos.message_bridge.RemoteMessage",
+      "logger": "NOT_LOGGED",
+      "num_senders": 2,
+      "source_node": "pi3"
+    },
+    /* Reliable channel */
+    {
+      "name": "/reliable",
+      "type": "aos.examples.Ping",
+      "source_node": "pi1",
+      "logger": "LOCAL_AND_REMOTE_LOGGER",
+      "logger_nodes": ["pi2", "pi3"],
+      "destination_nodes": [
+        {
+          "name": "pi2",
+          "priority": 1,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": [
+            "pi1"
+          ]
+        },
+        {
+          "name": "pi3",
+          "priority": 1,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": [
+            "pi1"
+          ]
+        }
+      ]
+    },
+    {
+      "name": "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
+      "type": "aos.message_bridge.RemoteMessage",
+      "logger": "NOT_LOGGED",
+      "num_senders": 2,
+      "source_node": "pi1",
+      "frequency": 150
+    },
+    {
+      "name": "/pi1/aos/remote_timestamps/pi3/reliable/aos-examples-Ping",
+      "type": "aos.message_bridge.RemoteMessage",
+      "logger": "NOT_LOGGED",
+      "num_senders": 2,
+      "source_node": "pi1",
+      "frequency": 150
+    },
+    /* Forwarded to pi2 */
+    {
+      "name": "/test",
+      "type": "aos.examples.Ping",
+      "source_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi2",
+          "priority": 1,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": [
+            "pi1"
+          ],
+          "time_to_live": 5000000
+        }
+      ],
+      "frequency": 150
+    },
+    {
+      "name": "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
+      "type": "aos.message_bridge.RemoteMessage",
+      "logger": "NOT_LOGGED",
+      "num_senders": 2,
+      "source_node": "pi1",
+      "frequency": 150
+    },
+    /* Forwarded back to pi1.
+     * The message is logged both on the sending node and the receiving node
+     * (to make it easier to look at the results for now).
+     *
+     * The timestamps are logged on the receiving node.
+     */
+    {
+      "name": "/test",
+      "type": "aos.examples.Pong",
+      "source_node": "pi2",
+      "logger": "LOCAL_AND_REMOTE_LOGGER",
+      "logger_nodes": ["pi1"],
+      "destination_nodes": [
+        {
+          "name": "pi1",
+          "priority": 1,
+          "timestamp_logger": "LOCAL_LOGGER",
+          "time_to_live": 5000000
+        }
+      ],
+      "frequency": 150
+    }
+  ],
+  "maps": [
+    {
+      "match": {
+        "name": "/aos*",
+        "source_node": "pi1"
+      },
+      "rename": {
+        "name": "/pi1/aos"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos*",
+        "source_node": "pi2"
+      },
+      "rename": {
+        "name": "/pi2/aos"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos*",
+        "source_node": "pi3"
+      },
+      "rename": {
+        "name": "/pi3/aos"
+      }
+    }
+  ],
+  "nodes": [
+    {
+      "name": "pi1",
+      "hostname": "raspberrypi",
+      "port": 9971
+    },
+    {
+      "name": "pi2",
+      "hostname": "raspberrypi2",
+      "port": 9971
+    },
+    {
+      "name": "pi3",
+      "hostname": "raspberrypi3",
+      "port": 9971
+    }
+  ]
+}
diff --git a/aos/events/logging/realtime_replay_test.cc b/aos/events/logging/realtime_replay_test.cc
new file mode 100644
index 0000000..c7744d4
--- /dev/null
+++ b/aos/events/logging/realtime_replay_test.cc
@@ -0,0 +1,76 @@
+#include "aos/events/logging/log_reader.h"
+#include "aos/events/logging/log_writer.h"
+#include "aos/events/ping_lib.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/json_to_flatbuffer.h"
+#include "aos/testing/path.h"
+#include "aos/testing/tmpdir.h"
+#include "gtest/gtest.h"
+
+namespace aos::logger::testing {
+
+class RealtimeLoggerTest : public ::testing::Test {
+ protected:
+  RealtimeLoggerTest()
+      : shm_dir_(aos::testing::TestTmpDir() + "/aos"),
+        config_file_(
+            aos::testing::ArtifactPath("aos/events/pingpong_config.json")),
+        config_(aos::configuration::ReadConfig(config_file_)),
+        event_loop_factory_(&config_.message()),
+        ping_event_loop_(event_loop_factory_.MakeEventLoop("ping")),
+        ping_(ping_event_loop_.get()) {
+    FLAGS_shm_base = shm_dir_;
+
+    // Nuke the shm dir to ensure we aren't affected by state left over from
+    // previous tests.
+    aos::util::UnlinkRecursive(shm_dir_);
+  }
+
+  gflags::FlagSaver flag_saver_;
+  std::string shm_dir_;
+
+  const std::string config_file_;
+  const aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
+
+  // Factory and Ping class to generate a test logfile.
+  SimulatedEventLoopFactory event_loop_factory_;
+  std::unique_ptr<EventLoop> ping_event_loop_;
+  Ping ping_;
+};
+
+TEST_F(RealtimeLoggerTest, RealtimeReplay) {
+  const std::string tmpdir = aos::testing::TestTmpDir();
+  const std::string base_name = tmpdir + "/logfile/";
+  aos::util::UnlinkRecursive(base_name);
+  {
+    std::unique_ptr<EventLoop> logger_event_loop =
+        event_loop_factory_.MakeEventLoop("logger");
+
+    event_loop_factory_.RunFor(std::chrono::milliseconds(95));
+
+    Logger logger(logger_event_loop.get());
+    logger.set_separate_config(false);
+    logger.set_polling_period(std::chrono::milliseconds(100));
+    logger.StartLoggingLocalNamerOnRun(base_name);
+    event_loop_factory_.RunFor(std::chrono::milliseconds(2000));
+  }
+
+  LogReader reader(logger::SortParts(logger::FindLogs(base_name)));
+  ShmEventLoop shm_event_loop(reader.configuration());
+  reader.Register(&shm_event_loop);
+  reader.OnEnd(shm_event_loop.node(),
+               [&shm_event_loop]() { shm_event_loop.Exit(); });
+
+  Fetcher<examples::Ping> ping_fetcher =
+      shm_event_loop.MakeFetcher<examples::Ping>("/test");
+
+  shm_event_loop.AddTimer([]() { LOG(INFO) << "Hello, World!"; })
+      ->Setup(shm_event_loop.monotonic_now(), std::chrono::seconds(1));
+
+  shm_event_loop.Run();
+  reader.Deregister();
+
+  ASSERT_TRUE(ping_fetcher.Fetch());
+  ASSERT_EQ(ping_fetcher->value(), 210);
+}
+}  // namespace aos::logger::testing
diff --git a/aos/events/logging/replay_timing.fbs b/aos/events/logging/replay_timing.fbs
new file mode 100644
index 0000000..dbe7159
--- /dev/null
+++ b/aos/events/logging/replay_timing.fbs
@@ -0,0 +1,16 @@
+namespace aos.timing;
+
+table MessageTiming {
+  channel:uint (id: 0);
+  // Expected and actual monotonic send times, in nanoseconds.
+  expected_send_time:int64 (id: 1);
+  actual_send_time:int64 (id: 2);
+  // expected - actual, in seconds (provides no additional information).
+  send_time_error:double (id: 3);
+}
+
+table ReplayTiming {
+  messages:[MessageTiming] (id: 0);
+}
+
+root_type ReplayTiming;
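Note: a hedged sketch of consuming the ReplayTiming statistics defined above.
The "/timing" channel name is an assumption for illustration; any channel
carrying aos.timing.ReplayTiming in the replay configuration would work.

#include <cmath>

#include "aos/events/event_loop.h"
#include "aos/events/logging/replay_timing_generated.h"
#include "glog/logging.h"

// Warn whenever a replayed message was sent more than 1ms away from the time
// the log says it should have been sent.
void WatchReplayTiming(aos::EventLoop *event_loop) {
  event_loop->MakeWatcher(
      "/timing", [](const aos::timing::ReplayTiming &timing) {
        if (timing.messages() == nullptr) {
          return;
        }
        for (const aos::timing::MessageTiming *message : *timing.messages()) {
          // send_time_error is expected - actual, in seconds.
          if (std::abs(message->send_time_error()) > 1e-3) {
            LOG(WARNING) << "Channel index " << message->channel()
                         << " was replayed " << message->send_time_error()
                         << " seconds away from its expected send time.";
          }
        }
      });
}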
diff --git a/aos/events/logging/timestamp_plot.cc b/aos/events/logging/timestamp_plot.cc
index 9c5f2dd..1239eb2 100644
--- a/aos/events/logging/timestamp_plot.cc
+++ b/aos/events/logging/timestamp_plot.cc
@@ -64,7 +64,9 @@
     }
 
     std::vector<std::string_view> l = absl::StrSplit(n, ", ");
-    CHECK_EQ(l.size(), 4u);
+    if (l.size() != 4u) {
+      continue;
+    }
     double t;
     double o;
     CHECK(absl::SimpleAtod(l[0], &t));
@@ -166,7 +168,9 @@
     }
 
     std::vector<std::string_view> l = absl::StrSplit(n, ", ");
-    CHECK_EQ(l.size(), 3u);
+    if (l.size() != 3u) {
+      continue;
+    }
     double t;
     double o;
     CHECK(absl::SimpleAtod(l[0], &t));
diff --git a/aos/events/multinode_pingpong_test_combined.json b/aos/events/multinode_pingpong_test_combined.json
index 2d58dd0..66ef8df 100644
--- a/aos/events/multinode_pingpong_test_combined.json
+++ b/aos/events/multinode_pingpong_test_combined.json
@@ -250,6 +250,20 @@
           "time_to_live": 5000000
         }
       ]
+    },
+    {
+      "name": "/reliable2",
+      "type": "aos.examples.Ping",
+      "source_node": "pi2",
+      "destination_nodes": [
+        {
+          "name": "pi1",
+          "priority": 1,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["pi2"],
+          "time_to_live": 0
+        }
+      ]
     }
   ],
   "maps": [
diff --git a/aos/events/multinode_pingpong_test_split.json b/aos/events/multinode_pingpong_test_split.json
index b160c42..049c407 100644
--- a/aos/events/multinode_pingpong_test_split.json
+++ b/aos/events/multinode_pingpong_test_split.json
@@ -163,6 +163,12 @@
       "source_node": "pi1"
     },
     {
+      "name": "/pi2/aos/remote_timestamps/pi1/reliable2/aos-examples-Ping",
+      "type": "aos.message_bridge.RemoteMessage",
+      "logger": "NOT_LOGGED",
+      "source_node": "pi2"
+    },
+    {
       "name": "/pi1/aos",
       "type": "aos.timing.Report",
       "source_node": "pi1",
@@ -266,6 +272,20 @@
           "time_to_live": 5000000
         }
       ]
+    },
+    {
+      "name": "/reliable2",
+      "type": "aos.examples.Ping",
+      "source_node": "pi2",
+      "destination_nodes": [
+        {
+          "name": "pi1",
+          "priority": 1,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["pi2"],
+          "time_to_live": 0
+        }
+      ]
     }
   ],
   "maps": [
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 88504a5..f570296 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -773,6 +773,10 @@
   }
 }
 
+void SimulatedEventLoopFactory::SetRealtimeReplayRate(double replay_rate) {
+  scheduler_scheduler_.SetReplayRate(replay_rate);
+}
+
 void SimulatedEventLoop::MakeRawWatcher(
     const Channel *channel,
     std::function<void(const Context &channel, const void *message)> watcher) {
@@ -962,7 +966,27 @@
   CHECK(allow_new_senders_)
       << ": Attempted to create a new sender on exclusive channel "
       << configuration::StrippedChannelToString(channel_);
-  if (event_loop->options().exclusive_senders == ExclusiveSenders::kYes) {
+  std::optional<ExclusiveSenders> per_channel_option;
+  for (const std::pair<const aos::Channel *, ExclusiveSenders> &per_channel :
+       event_loop->options().per_channel_exclusivity) {
+    if (per_channel.first->name()->string_view() ==
+            channel_->name()->string_view() &&
+        per_channel.first->type()->string_view() ==
+            channel_->type()->string_view()) {
+      CHECK(!per_channel_option.has_value())
+          << ": Channel " << configuration::StrippedChannelToString(channel_)
+          << " listed twice in per-channel list.";
+      per_channel_option = per_channel.second;
+    }
+  }
+  if (!per_channel_option.has_value()) {
+    // This could just as easily be implemented by setting
+    // per_channel_option to the global setting when we initialize it, but
+    // then we'd lose track of whether a given channel appears twice in
+    // the list.
+    per_channel_option = event_loop->options().exclusive_senders;
+  }
+  if (per_channel_option.value() == ExclusiveSenders::kYes) {
     CHECK_EQ(0, sender_count_)
         << ": Attempted to add an exclusive sender on a channel with existing "
            "senders: "
@@ -1157,6 +1181,7 @@
 
 void SimulatedTimerHandler::Setup(monotonic_clock::time_point base,
                                   monotonic_clock::duration repeat_offset) {
+  CHECK_GE(base, monotonic_clock::epoch());
   // The allocations in here are due to infrastructure and don't count in the no
   // mallocs in RT code.
   ScopedNotRealtime nrt;
@@ -1486,7 +1511,14 @@
 
 void SimulatedEventLoopFactory::DisableStatistics() {
   CHECK(bridge_) << ": Can't disable statistics without a message bridge.";
-  bridge_->DisableStatistics();
+  bridge_->DisableStatistics(
+      message_bridge::SimulatedMessageBridge::DestroySenders::kNo);
+}
+
+void SimulatedEventLoopFactory::PermanentlyDisableStatistics() {
+  CHECK(bridge_) << ": Can't disable statistics without a message bridge.";
+  bridge_->DisableStatistics(
+      message_bridge::SimulatedMessageBridge::DestroySenders::kYes);
 }
 
 void SimulatedEventLoopFactory::EnableStatistics() {
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 7b6eaa7..22eedc1 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -123,6 +123,9 @@
 
   // Disables the messages sent by the simulated message gateway.
   void DisableStatistics();
+  // Disables statistics sent by the simulated message gateway, and prevents
+  // EnableStatistics from ever being called again (used by LogReader).
+  void PermanentlyDisableStatistics();
   // Enables the messages sent by the simulated message gateway.
   void EnableStatistics();
 
@@ -136,6 +139,15 @@
   // starts up without stopping execution.
   void AllowApplicationCreationDuring(std::function<void()> fn);
 
+  // Sets the realtime replay rate. A value of 1.0 will cause the scheduler to
+  // try to play events in realtime. 0.5 will run at half speed. Use infinity
+  // (the default) to run as fast as possible. This can be changed during
+  // run-time.
+  void SetRealtimeReplayRate(double replay_rate);
+
+  // Access to the internal scheduler's epoll object for realtime replay.
+  internal::EPoll *scheduler_epoll() { return scheduler_scheduler_.epoll(); }
+
  private:
   friend class NodeEventLoopFactory;
   friend class SimulatedFactoryExitHandle;
@@ -172,14 +184,18 @@
   struct EventLoopOptions {
     CheckSentTooFast check_sent_too_fast;
     ExclusiveSenders exclusive_senders;
+    // per_channel_exclusivity is used to list any exceptions to the overall
+    // exclusive_senders policy for this event loop.
+    std::vector<std::pair<const aos::Channel *, ExclusiveSenders>>
+        per_channel_exclusivity;
   };
 
   // Takes the name for the event loop and a struct of options for selecting
   // what checks to run for the event loop in question.
   std::unique_ptr<EventLoop> MakeEventLoop(
       std::string_view name,
-      EventLoopOptions options = EventLoopOptions{CheckSentTooFast::kYes,
-                                                  ExclusiveSenders::kNo});
+      EventLoopOptions options = EventLoopOptions{
+          CheckSentTooFast::kYes, ExclusiveSenders::kNo, {}});
 
   // Returns the node that this factory is running as, or nullptr if this is a
   // single node setup.
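
The new per_channel_exclusivity field above lets a mostly-exclusive event loop exempt individual channels, as exercised in the test changes below. A hedged sketch of how a caller might build such an event loop (the channel name, type, and function name here are placeholders, mirroring the test):

    #include <memory>

    #include "aos/configuration.h"
    #include "aos/events/simulated_event_loop.h"

    // Sketch only: senders are exclusive by default, but "/test1" is exempted
    // via the per-channel override list.
    std::unique_ptr<aos::EventLoop> MakeMostlyExclusiveLoop(
        aos::NodeEventLoopFactory *node_factory,
        const aos::Configuration *config) {
      const aos::Channel *exempt_channel = aos::configuration::GetChannel(
          config, "/test1", "aos.TestMessage", "", nullptr);
      return node_factory->MakeEventLoop(
          "mostly_exclusive_sender",
          {aos::NodeEventLoopFactory::CheckSentTooFast::kYes,
           aos::NodeEventLoopFactory::ExclusiveSenders::kYes,
           // The override wins over the loop-wide ExclusiveSenders setting.
           {{exempt_channel, aos::NodeEventLoopFactory::ExclusiveSenders::kNo}}});
    }
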
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index eddacb1..2922460 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -139,72 +139,6 @@
   aos::FlatbufferDetachedBuffer<aos::Configuration> config;
 };
 
-class FunctionEvent : public EventScheduler::Event {
- public:
-  FunctionEvent(std::function<void()> fn) : fn_(fn) {}
-
-  void Handle() noexcept override { fn_(); }
-
- private:
-  std::function<void()> fn_;
-};
-
-// Test that creating an event and running the scheduler runs the event.
-TEST(EventSchedulerTest, ScheduleEvent) {
-  int counter = 0;
-  EventSchedulerScheduler scheduler_scheduler;
-  EventScheduler scheduler(0);
-  scheduler_scheduler.AddEventScheduler(&scheduler);
-
-  FunctionEvent e([&counter]() { counter += 1; });
-  scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
-  scheduler_scheduler.Run();
-  EXPECT_EQ(counter, 1);
-  auto token =
-      scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(2), &e);
-  scheduler.Deschedule(token);
-  scheduler_scheduler.Run();
-  EXPECT_EQ(counter, 1);
-}
-
-// Test that descheduling an already scheduled event doesn't run the event.
-TEST(EventSchedulerTest, DescheduleEvent) {
-  int counter = 0;
-  EventSchedulerScheduler scheduler_scheduler;
-  EventScheduler scheduler(0);
-  scheduler_scheduler.AddEventScheduler(&scheduler);
-
-  FunctionEvent e([&counter]() { counter += 1; });
-  auto token =
-      scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
-  scheduler.Deschedule(token);
-  scheduler_scheduler.Run();
-  EXPECT_EQ(counter, 0);
-}
-
-// Test that TemporarilyStopAndRun respects and preserves running.
-TEST(EventSchedulerTest, TemporarilyStopAndRun) {
-  int counter = 0;
-  EventSchedulerScheduler scheduler_scheduler;
-  EventScheduler scheduler(0);
-  scheduler_scheduler.AddEventScheduler(&scheduler);
-
-  scheduler_scheduler.TemporarilyStopAndRun(
-      [&]() { CHECK(!scheduler_scheduler.is_running()); });
-  ASSERT_FALSE(scheduler_scheduler.is_running());
-
-  FunctionEvent e([&]() {
-    counter += 1;
-    CHECK(scheduler_scheduler.is_running());
-    scheduler_scheduler.TemporarilyStopAndRun(
-        [&]() { CHECK(!scheduler_scheduler.is_running()); });
-    CHECK(scheduler_scheduler.is_running());
-  });
-  scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
-  scheduler_scheduler.Run();
-  EXPECT_EQ(counter, 1);
-}
-
 // Test that sending a message after running gets properly notified.
 TEST(SimulatedEventLoopTest, SendAfterRunFor) {
   SimulatedEventLoopTestFactory factory;
@@ -262,7 +196,8 @@
       simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
           ->MakeEventLoop("too_fast_sender",
                           {NodeEventLoopFactory::CheckSentTooFast::kNo,
-                           NodeEventLoopFactory::ExclusiveSenders::kNo});
+                           NodeEventLoopFactory::ExclusiveSenders::kNo,
+                           {}});
   aos::Sender<TestMessage> too_fast_message_sender =
       too_fast_event_loop->MakeSender<TestMessage>("/test");
 
@@ -292,9 +227,13 @@
 
   ::std::unique_ptr<EventLoop> exclusive_event_loop =
       simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
-          ->MakeEventLoop("too_fast_sender",
-                          {NodeEventLoopFactory::CheckSentTooFast::kYes,
-                           NodeEventLoopFactory::ExclusiveSenders::kYes});
+          ->MakeEventLoop(
+              "too_fast_sender",
+              {NodeEventLoopFactory::CheckSentTooFast::kYes,
+               NodeEventLoopFactory::ExclusiveSenders::kYes,
+               {{configuration::GetChannel(factory.configuration(), "/test1",
+                                           "aos.TestMessage", "", nullptr),
+                 NodeEventLoopFactory::ExclusiveSenders::kNo}}});
   exclusive_event_loop->SkipAosLog();
   exclusive_event_loop->SkipTimingReport();
   ::std::unique_ptr<EventLoop> normal_event_loop =
@@ -313,6 +252,12 @@
       normal_event_loop->MakeSender<TestMessage>("/test");
   EXPECT_DEATH(exclusive_event_loop->MakeSender<TestMessage>("/test"),
                "TestMessage");
+
+  // And check an explicitly exempted channel:
+  aos::Sender<TestMessage> non_exclusive_sender =
+      exclusive_event_loop->MakeSender<TestMessage>("/test1");
+  aos::Sender<TestMessage> non_exclusive_sender_regular_event_loop =
+      normal_event_loop->MakeSender<TestMessage>("/test1");
 }
 
 void TestSentTooFastCheckEdgeCase(
@@ -1336,9 +1281,34 @@
       pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
                                     "/pi3/aos");
 
+  std::vector<std::unique_ptr<aos::EventLoop>> statistics_watcher_loops;
+  statistics_watcher_loops.emplace_back(pi1->MakeEventLoop("test"));
+  statistics_watcher_loops.emplace_back(pi2->MakeEventLoop("test"));
+  statistics_watcher_loops.emplace_back(pi3->MakeEventLoop("test"));
+  // The current contract is that, if all nodes boot simultaneously in
+  // simulation, they should all act as if they are already connected,
+  // without ever observing the transition from disconnected to connected (note
+  // that on a real system the ServerStatistics message will get resent for each
+  // and every new connection, even if the new connections happen
+  // "simultaneously"--in simulation, we are essentially acting as if we are
+  // starting execution in an already running system, rather than observing the
+  // boot process).
+  for (auto &event_loop : statistics_watcher_loops) {
+    event_loop->MakeWatcher(
+        "/aos", [](const message_bridge::ServerStatistics &msg) {
+          for (const message_bridge::ServerConnection *connection :
+               *msg.connections()) {
+            EXPECT_EQ(message_bridge::State::CONNECTED, connection->state())
+                << connection->node()->name()->string_view();
+          }
+        });
+  }
+
   simulated_event_loop_factory.RunFor(chrono::seconds(2) +
                                       chrono::milliseconds(5));
 
+  statistics_watcher_loops.clear();
+
   EXPECT_EQ(pi1_pong_counter.count(), 201u);
   EXPECT_EQ(pi2_pong_counter.count(), 201u);
 
@@ -1631,8 +1601,13 @@
 
   std::unique_ptr<EventLoop> pi2_pong_event_loop =
       simulated_event_loop_factory.MakeEventLoop("pong", pi2);
+  aos::Sender<examples::Ping> pi2_reliable_sender =
+      pi2_pong_event_loop->MakeSender<examples::Ping>("/reliable2");
+  SendPing(&pi2_reliable_sender, 1);
   MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
                                                       "/reliable");
+  MessageCounter<examples::Ping> pi1_reliable_counter(ping_event_loop.get(),
+                                                      "/reliable2");
   MessageCounter<examples::Ping> pi2_unreliable_counter(
       pi2_pong_event_loop.get(), "/unreliable");
   aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
@@ -1688,6 +1663,7 @@
   SendPing(&pi1_unreliable_sender, 2);
   simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
   EXPECT_EQ(pi2_reliable_counter.count(), 2u);
+  EXPECT_EQ(pi1_reliable_counter.count(), 1u);
   EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
 
   EXPECT_EQ(reliable_timestamp_count, 2u);
@@ -2186,6 +2162,71 @@
   EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
 }
 
+TEST(SimulatedEventLoopTest, ReliableMessageSentOnStaggeredBoot) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+  message_bridge::TestingTimeConverter time(
+      configuration::NodesCount(&config.message()));
+  time.AddNextTimestamp(
+      distributed_clock::epoch(),
+      {BootTimestamp{0, monotonic_clock::epoch()},
+       BootTimestamp{0, monotonic_clock::epoch() - chrono::seconds(1)},
+       BootTimestamp{0, monotonic_clock::epoch()}});
+  SimulatedEventLoopFactory factory(&config.message());
+  factory.SetTimeConverter(&time);
+
+  NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+  NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+  const UUID pi1_boot_uuid = pi1->boot_uuid();
+  const UUID pi2_boot_uuid = pi2->boot_uuid();
+  EXPECT_NE(pi1_boot_uuid, UUID::Zero());
+  EXPECT_NE(pi2_boot_uuid, UUID::Zero());
+
+  {
+    ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
+    aos::Sender<examples::Ping> pi1_sender =
+        pi1_event_loop->MakeSender<examples::Ping>("/reliable");
+    SendPing(&pi1_sender, 1);
+  }
+  ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("ping");
+  aos::Sender<examples::Ping> pi2_sender =
+      pi2_event_loop->MakeSender<examples::Ping>("/reliable2");
+  SendPing(&pi2_sender, 1);
+  // Verify that we staggered the OnRun callback correctly.
+  pi2_event_loop->OnRun([pi1, pi2]() {
+    EXPECT_EQ(pi1->monotonic_now(),
+              monotonic_clock::epoch() + std::chrono::seconds(1));
+    EXPECT_EQ(pi2->monotonic_now(), monotonic_clock::epoch());
+  });
+
+  factory.RunFor(chrono::seconds(2));
+
+  {
+    ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
+    aos::Fetcher<examples::Ping> fetcher =
+        pi2_event_loop->MakeFetcher<examples::Ping>("/reliable");
+    ASSERT_TRUE(fetcher.Fetch());
+    EXPECT_EQ(fetcher.context().monotonic_event_time,
+              monotonic_clock::epoch() + factory.network_delay());
+    EXPECT_EQ(fetcher.context().monotonic_remote_time,
+              monotonic_clock::epoch());
+  }
+  {
+    ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("pong");
+    aos::Fetcher<examples::Ping> fetcher =
+        pi1_event_loop->MakeFetcher<examples::Ping>("/reliable2");
+    ASSERT_TRUE(fetcher.Fetch());
+    EXPECT_EQ(fetcher.context().monotonic_event_time,
+              monotonic_clock::epoch() + std::chrono::seconds(1) +
+                  factory.network_delay());
+    EXPECT_EQ(fetcher.context().monotonic_remote_time,
+              monotonic_clock::epoch() - std::chrono::seconds(1));
+  }
+}
+
 class SimulatedEventLoopDisconnectTest : public ::testing::Test {
  public:
   SimulatedEventLoopDisconnectTest()
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 19ca658..31f3f0e 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -32,6 +32,10 @@
   bool forwarding_disabled() const { return forwarding_disabled_; }
   void set_forwarding_disabled(bool forwarding_disabled) {
     forwarding_disabled_ = forwarding_disabled;
+    if (!forwarding_disabled_) {
+      CHECK(timestamp_logger_ == nullptr);
+      CHECK(sender_ == nullptr);
+    }
   }
 
   void SetFetchEventLoop(aos::EventLoop *fetch_event_loop,
@@ -50,7 +54,8 @@
       server_connection_ =
           server_status_->FindServerConnection(send_node_factory_->node());
     }
-    if (delivery_time_is_logged_ && timestamp_loggers != nullptr) {
+    if (delivery_time_is_logged_ && timestamp_loggers != nullptr &&
+        !forwarding_disabled_) {
       timestamp_logger_ =
           timestamp_loggers->SenderForChannel(channel_, connection_);
     } else {
@@ -79,7 +84,7 @@
                         MessageBridgeClientStatus *client_status) {
     sent_ = false;
     send_event_loop_ = send_event_loop;
-    if (send_event_loop_) {
+    if (send_event_loop_ && !forwarding_disabled_) {
       sender_ = send_event_loop_->MakeRawSender(channel_);
     } else {
       sender_ = nullptr;
@@ -133,7 +138,6 @@
     if (fetcher_->context().data == nullptr || sent_) {
       return;
     }
-    CHECK(!timer_scheduled_);
 
     // Send at startup.  It is the best we can do.
     const monotonic_clock::time_point monotonic_delivered_time =
@@ -561,16 +565,17 @@
   destination_state->second.SetClientState(source, state);
 }
 
-void SimulatedMessageBridge::DisableStatistics() {
+void SimulatedMessageBridge::DisableStatistics(DestroySenders destroy_senders) {
   for (std::pair<const Node *const, State> &state : event_loop_map_) {
-    state.second.DisableStatistics();
+    state.second.DisableStatistics(destroy_senders);
   }
 }
 
-void SimulatedMessageBridge::DisableStatistics(const Node *node) {
+void SimulatedMessageBridge::DisableStatistics(const Node *node,
+                                               DestroySenders destroy_senders) {
   auto it = event_loop_map_.find(node);
   CHECK(it != event_loop_map_.end());
-  it->second.DisableStatistics();
+  it->second.DisableStatistics(destroy_senders);
 }
 
 void SimulatedMessageBridge::EnableStatistics() {
@@ -625,7 +630,7 @@
   timestamp_loggers = ChannelTimestampSender(event_loop.get());
   server_status = std::make_unique<MessageBridgeServerStatus>(event_loop.get());
   if (disable_statistics_) {
-    server_status->DisableStatistics();
+    server_status->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
   }
 
   {
@@ -659,7 +664,7 @@
   }
   client_status = std::make_unique<MessageBridgeClientStatus>(event_loop.get());
   if (disable_statistics_) {
-    client_status->DisableStatistics();
+    client_status->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
   }
 
   for (size_t i = 0;
@@ -701,8 +706,16 @@
             configuration::ConnectionDeliveryTimeIsLoggedOnNode(
                 connection, event_loop->node());
 
+        const RawMessageDelayer *delayer = nullptr;
+        for (const RawMessageDelayer *candidate : source_delayers_) {
+          if (candidate->channel() == channel) {
+            delayer = candidate;
+          }
+        }
+
         // And the timestamps are then logged back by us again.
-        if (!delivery_time_is_logged) {
+        if (!delivery_time_is_logged ||
+            CHECK_NOTNULL(delayer)->forwarding_disabled()) {
           continue;
         }
 
@@ -725,6 +738,25 @@
         destination_delayer->ScheduleReliable();
       }
     }
+    // Note: This exists to work around the fact that some users like to be able
+    // to send reliable messages while execution is stopped, creating a
+    // situation where the following sequencing can occur:
+    // 1) <While stopped> Send a reliable message on Node A (to be forwarded to
+    //    Node B).
+    // 2) Node B starts up.
+    // 3) Anywhere from 0 to N seconds later, Node A starts up.
+    //
+    // In this case, we need the reliable message to make it to Node B, but it
+    // also shouldn't make it to Node B until Node A has started up.
+    //
+    // Ideally, if the user were to wait for the Node B OnRun callbacks to send
+    // the message, then that would trigger the watchers in the delayers.
+    // However, we have so far continued to support sending while stopped...
+    for (RawMessageDelayer *source_delayer : source_delayers_) {
+      if (source_delayer->time_to_live() == 0) {
+        source_delayer->ScheduleReliable();
+      }
+    }
   });
 }
 
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
index f5b3eae..9565029 100644
--- a/aos/events/simulated_network_bridge.h
+++ b/aos/events/simulated_network_bridge.h
@@ -35,8 +35,10 @@
   // Disables generating and sending the messages which message_gateway sends.
   // The messages are the ClientStatistics, ServerStatistics and Timestamp
   // messages.
-  void DisableStatistics();
-  void DisableStatistics(const Node *node);
+  enum class DestroySenders { kNo, kYes };
+  void DisableStatistics(DestroySenders destroy_senders = DestroySenders::kNo);
+  void DisableStatistics(const Node *node,
+                         DestroySenders destroy_senders = DestroySenders::kNo);
   void EnableStatistics();
   void EnableStatistics(const Node *node);
 
@@ -55,13 +57,16 @@
     }
     State(const State &state) = delete;
 
-    void DisableStatistics() {
+    void DisableStatistics(DestroySenders destroy_senders) {
       disable_statistics_ = true;
+      destroy_senders_ = destroy_senders;
       if (server_status) {
-        server_status->DisableStatistics();
+        server_status->DisableStatistics(destroy_senders ==
+                                         DestroySenders::kYes);
       }
       if (client_status) {
-        client_status->DisableStatistics();
+        client_status->DisableStatistics(destroy_senders ==
+                                         DestroySenders::kYes);
       }
     }
 
@@ -88,7 +93,8 @@
       // logfiles.
       SetEventLoop(node_factory_->MakeEventLoop(
           "message_bridge", {NodeEventLoopFactory::CheckSentTooFast::kNo,
-                             NodeEventLoopFactory::ExclusiveSenders::kNo}));
+                             NodeEventLoopFactory::ExclusiveSenders::kNo,
+                             {}}));
     }
 
     void SetEventLoop(std::unique_ptr<aos::EventLoop> loop);
@@ -218,7 +224,9 @@
     std::vector<RawMessageDelayer *> destination_delayers_;
 
     bool disable_statistics_ = false;
+    DestroySenders destroy_senders_ = DestroySenders::kNo;
   };
+
   // Map of nodes to event loops.  This is a member variable so that the
   // lifetime of the event loops matches the lifetime of the bridge.
   std::map<const Node *, State> event_loop_map_;
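
The DestroySenders parameter added above controls whether DisableStatistics tears down the statistics Senders for good. A hedged sketch of the two flavors (the helper function is an assumption for illustration; the enum and method come from the header above):

    #include "aos/events/simulated_network_bridge.h"

    // Sketch only: DestroySenders::kYes destroys the statistics Senders so
    // EnableStatistics can never be called again (this is what
    // SimulatedEventLoopFactory::PermanentlyDisableStatistics relies on for
    // LogReader); the default kNo leaves them intact so statistics can be
    // re-enabled later.
    void QuietBridge(aos::message_bridge::SimulatedMessageBridge *bridge,
                     bool permanently) {
      using DestroySenders =
          aos::message_bridge::SimulatedMessageBridge::DestroySenders;
      bridge->DisableStatistics(permanently ? DestroySenders::kYes
                                            : DestroySenders::kNo);
    }
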
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index 57dd94b..b708ebc 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -211,8 +211,8 @@
   Event ready2;
 
   // Start the thread.
-  ::std::thread t1([this, &ready1]() { RunUntilWakeup(&ready1, 5); });
-  ::std::thread t2([this, &ready2]() { RunUntilWakeup(&ready2, 4); });
+  ::std::thread t1([this, &ready1]() { RunUntilWakeup(&ready1, 2); });
+  ::std::thread t2([this, &ready2]() { RunUntilWakeup(&ready2, 1); });
 
   ready1.Wait();
   ready2.Wait();
@@ -330,20 +330,27 @@
 
 namespace {
 
-// Temporarily pins the current thread to CPUs 0 and 1.
+// Temporarily pins the current thread to the first 2 available CPUs.
 // This speeds up the test on some machines a lot (~4x). It also preserves
 // opportunities for the 2 threads to race each other.
 class PinForTest {
  public:
   PinForTest() {
-    PCHECK(sched_getaffinity(0, sizeof(old_), &old_) == 0);
-    cpu_set_t new_set;
-    CPU_ZERO(&new_set);
-    CPU_SET(0, &new_set);
-    CPU_SET(1, &new_set);
-    PCHECK(sched_setaffinity(0, sizeof(new_set), &new_set) == 0);
+    cpu_set_t cpus = GetCurrentThreadAffinity();
+    old_ = cpus;
+    int number_found = 0;
+    for (int i = 0; i < CPU_SETSIZE; ++i) {
+      if (CPU_ISSET(i, &cpus)) {
+        if (number_found < 2) {
+          ++number_found;
+        } else {
+          CPU_CLR(i, &cpus);
+        }
+      }
+    }
+    SetCurrentThreadAffinity(cpus);
   }
-  ~PinForTest() { PCHECK(sched_setaffinity(0, sizeof(old_), &old_) == 0); }
+  ~PinForTest() { SetCurrentThreadAffinity(old_); }
 
  private:
   cpu_set_t old_;
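
The rework above keeps the original intent (speed the test up while still letting the two threads race) but derives the pin set from the thread's existing affinity instead of hard-coding CPUs 0 and 1. A standalone sketch of the same "keep the first two available CPUs" logic using the raw sched_* calls the old code used (the AOS GetCurrentThreadAffinity/SetCurrentThreadAffinity helpers presumably wrap something equivalent; that is an assumption):

    #define _GNU_SOURCE  // For CPU_* macros and sched_*affinity on glibc.
    #include <sched.h>

    #include <cstdio>

    int main() {
      cpu_set_t cpus;
      if (sched_getaffinity(0, sizeof(cpus), &cpus) != 0) {
        std::perror("sched_getaffinity");
        return 1;
      }
      int number_found = 0;
      for (int i = 0; i < CPU_SETSIZE; ++i) {
        if (CPU_ISSET(i, &cpus)) {
          if (number_found < 2) {
            ++number_found;  // Keep the first two CPUs we are allowed to use.
          } else {
            CPU_CLR(i, &cpus);  // Drop everything else from the mask.
          }
        }
      }
      if (sched_setaffinity(0, sizeof(cpus), &cpus) != 0) {
        std::perror("sched_setaffinity");
        return 1;
      }
      std::printf("pinned to %d CPU(s)\n", number_found);
      return 0;
    }
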
diff --git a/aos/json_to_flatbuffer.h b/aos/json_to_flatbuffer.h
index 7168e4c..45920a8 100644
--- a/aos/json_to_flatbuffer.h
+++ b/aos/json_to_flatbuffer.h
@@ -102,7 +102,8 @@
             std::ostreambuf_iterator<char>(file));
 }
 
-// Parses a file as JSON and returns the corresponding Flatbuffer, or dies.
+// Parses a file as JSON and returns the corresponding Flatbuffer. Dies if
+// reading the file fails; returns an empty buffer if the contents are invalid.
 template <typename T>
 inline FlatbufferDetachedBuffer<T> JsonFileToFlatbuffer(
     const std::string_view path) {
diff --git a/aos/network/log_web_proxy_main.cc b/aos/network/log_web_proxy_main.cc
index 1cc8e17..34e40a2 100644
--- a/aos/network/log_web_proxy_main.cc
+++ b/aos/network/log_web_proxy_main.cc
@@ -17,6 +17,9 @@
 DEFINE_int32(buffer_size, -1, "-1 if infinite, in # of messages / channel.");
 DEFINE_double(monotonic_start_time, -1.0, "Start time (sec)");
 DEFINE_double(monotonic_end_time, -1.0, "End time (sec)");
+DEFINE_double(
+    replay_rate, -1,
+    "-1 to replay as fast as possible; 1.0 = realtime, 0.5 = half speed.");
 
 int main(int argc, char **argv) {
   aos::InitGoogle(&argc, &argv);
@@ -31,6 +34,12 @@
 
   reader.Register();
 
+  // If going for "as fast as possible", don't actually use infinity, because we
+  // don't want the log reading to block our use of the epoll handlers.
+  reader.SetRealtimeReplayRate(FLAGS_replay_rate == -1.0
+                                   ? std::numeric_limits<double>::max()
+                                   : FLAGS_replay_rate);
+
   std::unique_ptr<aos::EventLoop> event_loop;
 
   if (FLAGS_node.empty()) {
@@ -55,13 +64,12 @@
   }
 
   aos::web_proxy::WebProxy web_proxy(
-      event_loop.get(), aos::web_proxy::StoreHistory::kYes, FLAGS_buffer_size);
+      event_loop.get(),
+      reader.event_loop_factory()->scheduler_epoll(),
+      aos::web_proxy::StoreHistory::kYes, FLAGS_buffer_size);
 
   web_proxy.SetDataPath(FLAGS_data_dir.c_str());
 
-  // Keep the web proxy alive past when we finish reading the logfile.
-  reader.set_exit_on_finish(false);
-
   if (FLAGS_monotonic_end_time > 0) {
     event_loop->AddTimer([&web_proxy]() { web_proxy.StopRecording(); })
         ->Setup(aos::monotonic_clock::time_point(
@@ -70,4 +78,11 @@
   }
 
   reader.event_loop_factory()->Run();
+
+  // Keep the web proxy alive past when we finish reading the logfile, but crank
+  // down the replay rate so that we don't peg our entire CPU just trying to
+  // service timers in the web proxy code.
+  reader.set_exit_on_finish(false);
+  reader.SetRealtimeReplayRate(1.0);
+  reader.event_loop_factory()->Run();
 }
diff --git a/aos/network/message_bridge_client_status.cc b/aos/network/message_bridge_client_status.cc
index aa2484d..f3511a0 100644
--- a/aos/network/message_bridge_client_status.cc
+++ b/aos/network/message_bridge_client_status.cc
@@ -219,12 +219,16 @@
   filter->Sample(monotonic_delivered_time, offset);
 }
 
-void MessageBridgeClientStatus::DisableStatistics() {
+void MessageBridgeClientStatus::DisableStatistics(bool destroy_sender) {
   statistics_timer_->Disable();
   send_ = false;
+  if (destroy_sender) {
+    sender_ = aos::Sender<ClientStatistics>();
+  }
 }
 
 void MessageBridgeClientStatus::EnableStatistics() {
+  CHECK(sender_.valid());
   send_ = true;
   statistics_timer_->Setup(event_loop_->monotonic_now() + kStatisticsPeriod,
                            kStatisticsPeriod);
diff --git a/aos/network/message_bridge_client_status.h b/aos/network/message_bridge_client_status.h
index 9c21169..033af95 100644
--- a/aos/network/message_bridge_client_status.h
+++ b/aos/network/message_bridge_client_status.h
@@ -54,7 +54,10 @@
   void Connect(int client_index);
 
   // Disables sending out any statistics messages.
-  void DisableStatistics();
+  // If destroy_sender is set, the ClientStatistics Sender is cleared and
+  // EnableStatistics cannot be called again.  This is used by the LogReader to
+  // enforce one-sender-per-channel checks.
+  void DisableStatistics(bool destroy_sender);
   // Enables sending out any statistics messages.
   void EnableStatistics();
 
diff --git a/aos/network/message_bridge_server_status.cc b/aos/network/message_bridge_server_status.cc
index c82158b..4f6abff 100644
--- a/aos/network/message_bridge_server_status.cc
+++ b/aos/network/message_bridge_server_status.cc
@@ -411,13 +411,19 @@
   }
 }
 
-void MessageBridgeServerStatus::DisableStatistics() {
+void MessageBridgeServerStatus::DisableStatistics(bool destroy_senders) {
   send_ = false;
   statistics_timer_->Disable();
+  if (destroy_senders) {
+    sender_ = aos::Sender<ServerStatistics>();
+    timestamp_sender_ = aos::Sender<Timestamp>();
+  }
 }
 
 void MessageBridgeServerStatus::EnableStatistics() {
   send_ = true;
+  CHECK(sender_.valid());
+  CHECK(timestamp_sender_.valid());
   statistics_timer_->Setup(event_loop_->monotonic_now() + kPingPeriod,
                            kPingPeriod);
 }
diff --git a/aos/network/message_bridge_server_status.h b/aos/network/message_bridge_server_status.h
index feb1b05..327930c 100644
--- a/aos/network/message_bridge_server_status.h
+++ b/aos/network/message_bridge_server_status.h
@@ -73,7 +73,7 @@
   }
 
   // Disables sending out any statistics messages.
-  void DisableStatistics();
+  void DisableStatistics(bool destroy_senders);
   // Enables sending out any statistics messages.
   void EnableStatistics();
 
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index 81fc03f..5b63d18 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -182,7 +182,7 @@
       constexpr double kMinNetworkDelay = 2.0;
 
       const std::pair<NoncausalTimestampFilter::Pointer,
-                      std::pair<chrono::nanoseconds, double>>
+                      std::tuple<chrono::nanoseconds, double, double>>
           offset_error =
               FLAGS_bounds_offset_error
                   ? filter.filter->BoundsOffsetError(
@@ -198,8 +198,8 @@
       filter.pointer = offset_error.first;
 
       const std::pair<chrono::nanoseconds, double> error =
-          std::make_pair(offset_error.second.first,
-                         offset_error.second.second - kMinNetworkDelay);
+          std::make_pair(std::get<0>(offset_error.second),
+                         std::get<1>(offset_error.second) - kMinNetworkDelay);
 
       std::pair<chrono::nanoseconds, double> grad;
       double hess;
@@ -1856,6 +1856,7 @@
           std::optional<std::tuple<logger::BootTimestamp, logger::BootDuration>>
               result = next_node_filter->Consume();
           CHECK(result);
+          WriteFilter(next_node_filter, *result);
           next_node_filter->Pop(std::get<0>(*result) -
                                 time_estimation_buffer_seconds_);
         }
@@ -2061,17 +2062,22 @@
                     << (last_monotonics_[i].time - result_times[i].time).count()
                     << "ns";
         }
+        UpdateSolution(std::move(result_times));
+        WriteFilter(next_filter, sample);
+        FlushAndClose(false);
         LOG(FATAL)
             << "Found a solution before the last returned solution on node "
             << solution_node_index;
         break;
       case TimeComparison::kEq:
+        WriteFilter(next_filter, sample);
         return NextTimestamp();
       case TimeComparison::kInvalid: {
         const chrono::nanoseconds invalid_distance =
             InvalidDistance(last_monotonics_, result_times);
         if (invalid_distance <=
             chrono::nanoseconds(FLAGS_max_invalid_distance_ns)) {
+          WriteFilter(next_filter, sample);
           return NextTimestamp();
         }
         LOG(INFO) << "Times can't be compared by " << invalid_distance.count()
@@ -2083,6 +2089,9 @@
                     << (last_monotonics_[i].time - result_times[i].time).count()
                     << "ns";
         }
+        UpdateSolution(std::move(result_times));
+        WriteFilter(next_filter, sample);
+        FlushAndClose(false);
         LOG(FATAL) << "Please investigate.  Use --max_invalid_distance_ns="
                    << invalid_distance.count() << " to ignore this.";
       } break;
diff --git a/aos/network/multinode_timestamp_filter.h b/aos/network/multinode_timestamp_filter.h
index 0954984..253bb82 100644
--- a/aos/network/multinode_timestamp_filter.h
+++ b/aos/network/multinode_timestamp_filter.h
@@ -192,7 +192,7 @@
   // TODO(austin): Pop here instead of in log reader.
   void ObserveTimePassed(distributed_clock::time_point time) override;
 
-  // Queues 1 more timestammp in the interpolation list.  This is public for
+  // Queues 1 more timestamp in the interpolation list.  This is public for
   // timestamp_extractor so it can hammer on the log until everything is queued.
   std::optional<const std::tuple<distributed_clock::time_point,
                                  std::vector<logger::BootTimestamp>> *>
diff --git a/aos/network/multinode_timestamp_filter_test.cc b/aos/network/multinode_timestamp_filter_test.cc
index dab8e06..c078e28 100644
--- a/aos/network/multinode_timestamp_filter_test.cc
+++ b/aos/network/multinode_timestamp_filter_test.cc
@@ -393,16 +393,17 @@
 
   // Confirm that the error is almost equal for both directions.  The solution
   // is an integer solution, so there will be a little bit of error left over.
-  std::pair<chrono::nanoseconds, double> a_error =
+  std::tuple<chrono::nanoseconds, double, double> a_error =
       a.OffsetError(nullptr, NoncausalTimestampFilter::Pointer(),
                     std::get<0>(result1)[0], 0.0, std::get<0>(result1)[1], 0.0)
           .second;
-  std::pair<chrono::nanoseconds, double> b_error =
+  std::tuple<chrono::nanoseconds, double, double> b_error =
       b.OffsetError(nullptr, NoncausalTimestampFilter::Pointer(),
                     std::get<0>(result1)[1], 0.0, std::get<0>(result1)[0], 0.0)
           .second;
-  EXPECT_NEAR(static_cast<double>((a_error.first - b_error.first).count()) +
-                  (a_error.second - b_error.second),
+  EXPECT_NEAR(static_cast<double>(
+                  (std::get<0>(a_error) - std::get<0>(b_error)).count()) +
+                  (std::get<1>(a_error) - std::get<1>(b_error)),
               0.0, 0.5);
 }
 
diff --git a/aos/network/timestamp_channel.h b/aos/network/timestamp_channel.h
index e35a170..d95edda 100644
--- a/aos/network/timestamp_channel.h
+++ b/aos/network/timestamp_channel.h
@@ -59,6 +59,8 @@
 
   aos::Sender<RemoteMessage> *SenderForChannel(const Channel *channel,
                                                const Connection *connection);
+  void ClearSenderForChannel(const Channel *channel,
+                             const Connection *connection);
 
  private:
   aos::EventLoop *event_loop_;
diff --git a/aos/network/timestamp_filter.cc b/aos/network/timestamp_filter.cc
index 8d1c9fe..99dd671 100644
--- a/aos/network/timestamp_filter.cc
+++ b/aos/network/timestamp_filter.cc
@@ -773,17 +773,17 @@
 chrono::nanoseconds NoncausalTimestampFilter::ExtrapolateOffset(
     std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds> p0,
     monotonic_clock::time_point ta) {
-  return ExtrapolateOffset(p0, ta, 0.0).first;
+  return std::get<0>(ExtrapolateOffset(p0, ta, 0.0));
 }
 
 chrono::nanoseconds NoncausalTimestampFilter::InterpolateOffset(
     std::tuple<monotonic_clock::time_point, chrono::nanoseconds> p0,
     std::tuple<monotonic_clock::time_point, chrono::nanoseconds> p1,
     monotonic_clock::time_point ta) {
-  return InterpolateOffset(p0, p1, ta, 0.0).first;
+  return std::get<0>(InterpolateOffset(p0, p1, ta, 0.0));
 }
 
-std::pair<chrono::nanoseconds, double>
+std::tuple<chrono::nanoseconds, double, double>
 NoncausalTimestampFilter::InterpolateOffset(
     std::tuple<monotonic_clock::time_point, chrono::nanoseconds> p0,
     std::tuple<monotonic_clock::time_point, chrono::nanoseconds> p1,
@@ -821,12 +821,12 @@
   //
   // We have good tests which confirm for small offsets this matches nicely. For
   // large offsets, the 128 bit math will take care of us.
+  const double slope =
+      static_cast<double>(doffset.count()) / static_cast<double>(dt.count());
   const double remainder =
       static_cast<double>(numerator % absl::int128(dt.count())) / dt.count() +
-      (numerator > 0 ? -0.5 : 0.5) +
-      ta * static_cast<double>(doffset.count()) /
-          static_cast<double>(dt.count());
-  return std::make_pair(integer, remainder);
+      (numerator > 0 ? -0.5 : 0.5) + ta * slope;
+  return std::make_tuple(integer, remainder, slope);
 }
 
 chrono::nanoseconds NoncausalTimestampFilter::BoundOffset(
@@ -841,21 +841,22 @@
                   NoncausalTimestampFilter::ExtrapolateOffset(p1, ta));
 }
 
-std::pair<chrono::nanoseconds, double> NoncausalTimestampFilter::BoundOffset(
+std::tuple<chrono::nanoseconds, double, double>
+NoncausalTimestampFilter::BoundOffset(
     std::tuple<monotonic_clock::time_point, chrono::nanoseconds> p0,
     std::tuple<monotonic_clock::time_point, chrono::nanoseconds> p1,
     monotonic_clock::time_point ta_base, double ta) {
   DCHECK_GE(ta, 0.0);
   DCHECK_LT(ta, 1.0);
 
-  const std::pair<chrono::nanoseconds, double> o0 =
+  const std::tuple<chrono::nanoseconds, double, double> o0 =
       NoncausalTimestampFilter::ExtrapolateOffset(p0, ta_base, ta);
-  const std::pair<chrono::nanoseconds, double> o1 =
+  const std::tuple<chrono::nanoseconds, double, double> o1 =
       NoncausalTimestampFilter::ExtrapolateOffset(p1, ta_base, ta);
 
   // Want to calculate max(o0 + o0r, o1 + o1r) without precision problems.
-  if (static_cast<double>((o0.first - o1.first).count()) >
-      o1.second - o0.second) {
+  if (static_cast<double>((std::get<0>(o0) - std::get<0>(o1)).count()) >
+      std::get<1>(o1) - std::get<1>(o0)) {
     // Ok, o0 is now > o1.  We want the max, so return o0.
     return o0;
   } else {
@@ -863,7 +864,7 @@
   }
 }
 
-std::pair<chrono::nanoseconds, double>
+std::tuple<chrono::nanoseconds, double, double>
 NoncausalTimestampFilter::ExtrapolateOffset(
     std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds> p0,
     monotonic_clock::time_point ta_base, double ta) {
@@ -892,12 +893,13 @@
     const absl::int128 numerator =
         (absl::int128(dt.count() - MaxVelocityRatio::den / 2) *
          absl::int128(MaxVelocityRatio::num));
-    return std::make_pair(
+    return std::make_tuple(
         std::get<1>(p0) + chrono::nanoseconds(static_cast<int64_t>(
                               numerator / absl::int128(MaxVelocityRatio::den))),
         static_cast<double>(numerator % absl::int128(MaxVelocityRatio::den)) /
                 static_cast<double>(MaxVelocityRatio::den) +
-            0.5 + ta * kMaxVelocity());
+            0.5 + ta * kMaxVelocity(),
+        kMaxVelocity());
   } else {
     // Extrapolate forwards, using the (negative) MaxVelocity slope
     // Same concept, except going forward past our last (most recent) sample:
@@ -913,12 +915,13 @@
     const absl::int128 numerator =
         absl::int128(dt.count() + MaxVelocityRatio::den / 2) *
         absl::int128(MaxVelocityRatio::num);
-    return std::make_pair(
+    return std::make_tuple(
         std::get<1>(p0) - chrono::nanoseconds(static_cast<int64_t>(
                               numerator / absl::int128(MaxVelocityRatio::den))),
         -static_cast<double>(numerator % absl::int128(MaxVelocityRatio::den)) /
                 static_cast<double>(MaxVelocityRatio::den) +
-            0.5 - ta * kMaxVelocity());
+            0.5 - ta * kMaxVelocity(),
+        -kMaxVelocity());
   }
 }
 
@@ -946,7 +949,7 @@
                             points.second.first, points.second.second, ta));
 }
 
-std::pair<Pointer, std::pair<chrono::nanoseconds, double>>
+std::pair<Pointer, std::tuple<chrono::nanoseconds, double, double>>
 NoncausalTimestampFilter::SingleFilter::Offset(
     const SingleFilter *other, Pointer pointer,
     monotonic_clock::time_point ta_base, double ta) const {
@@ -976,7 +979,7 @@
           points.second.first, points.second.second, ta_base, ta));
 }
 
-std::pair<Pointer, std::pair<chrono::nanoseconds, double>>
+std::pair<Pointer, std::tuple<chrono::nanoseconds, double, double>>
 NoncausalTimestampFilter::SingleFilter::BoundsOffset(
     const SingleFilter *other, Pointer pointer,
     monotonic_clock::time_point ta_base, double ta) const {
@@ -1005,7 +1008,7 @@
                                           points.second.second, ta_base, ta));
 }
 
-std::pair<Pointer, std::pair<chrono::nanoseconds, double>>
+std::pair<Pointer, std::tuple<chrono::nanoseconds, double, double>>
 NoncausalTimestampFilter::SingleFilter::OffsetError(
     const SingleFilter *other, Pointer pointer,
     aos::monotonic_clock::time_point ta_base, double ta,
@@ -1013,17 +1016,19 @@
   NormalizeTimestamps(&ta_base, &ta);
   NormalizeTimestamps(&tb_base, &tb);
 
-  const std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>> offset =
-      Offset(other, pointer, ta_base, ta);
+  const std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
+      offset = Offset(other, pointer, ta_base, ta);
 
   // Compute the integer portion first, and the double portion second.  Subtract
   // the results of each.  This handles large offsets without losing precision.
   return std::make_pair(
-      offset.first, std::make_pair(((tb_base - ta_base) - offset.second.first),
-                                   (tb - ta) - offset.second.second));
+      offset.first,
+      std::make_tuple(((tb_base - ta_base) - std::get<0>(offset.second)),
+                      (tb - ta) - std::get<1>(offset.second),
+                      std::get<2>(offset.second)));
 }
 
-std::pair<Pointer, std::pair<chrono::nanoseconds, double>>
+std::pair<Pointer, std::tuple<chrono::nanoseconds, double, double>>
 NoncausalTimestampFilter::SingleFilter::BoundsOffsetError(
     const SingleFilter *other, Pointer pointer,
     aos::monotonic_clock::time_point ta_base, double ta,
@@ -1031,14 +1036,16 @@
   NormalizeTimestamps(&ta_base, &ta);
   NormalizeTimestamps(&tb_base, &tb);
 
-  const std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>> offset =
-      BoundsOffset(other, pointer, ta_base, ta);
+  const std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
+      offset = BoundsOffset(other, pointer, ta_base, ta);
 
   // Compute the integer portion first, and the double portion second.  Subtract
   // the results of each.  This handles large offsets without losing precision.
   return std::make_pair(
-      offset.first, std::make_pair((tb_base - ta_base) - offset.second.first,
-                                   (tb - ta) - offset.second.second));
+      offset.first,
+      std::make_tuple((tb_base - ta_base) - std::get<0>(offset.second),
+                      (tb - ta) - std::get<1>(offset.second),
+                      std::get<2>(offset.second)));
 }
 
 std::string NoncausalTimestampFilter::DebugOffsetError(
@@ -1128,22 +1135,24 @@
     auto reference_timestamp = GetReferenceTimestamp(ta_base, ta);
 
     // Special case size = 1 or ta before first timestamp, so we extrapolate
-    const std::pair<chrono::nanoseconds, double> offset =
+    const std::tuple<chrono::nanoseconds, double, double> offset =
         NoncausalTimestampFilter::ExtrapolateOffset(reference_timestamp.second,
                                                     ta_base, ta);
 
     // We want to do offset + ta > tb, but we need to do it with minimal
     // numerical precision problems.
     // See below for why this is a >=
-    if (static_cast<double>((offset.first + ta_base - tb_base).count()) >=
-        tb - ta - offset.second) {
+    if (static_cast<double>(
+            (std::get<0>(offset) + ta_base - tb_base).count()) >=
+        tb - ta - std::get<1>(offset)) {
       LOG(ERROR) << node_names_ << " "
-                 << TimeString(ta_base, ta, offset.first, offset.second)
+                 << TimeString(ta_base, ta, std::get<0>(offset),
+                               std::get<1>(offset))
                  << " > solution time "
                  << tb_base + chrono::nanoseconds(
                                   static_cast<int64_t>(std::round(tb)))
                  << ", " << tb - std::round(tb) << " foo";
-      LOG(INFO) << "Remainder " << offset.second;
+      LOG(INFO) << "Remainder " << std::get<1>(offset);
       return false;
     }
     return true;
@@ -1159,14 +1168,15 @@
       std::pair<std::tuple<monotonic_clock::time_point, chrono::nanoseconds>,
                 std::tuple<monotonic_clock::time_point, chrono::nanoseconds>>>
       points = FindTimestamps(other, false, pointer, ta_base, ta);
-  const std::pair<chrono::nanoseconds, double> offset =
+  const std::tuple<chrono::nanoseconds, double, double> offset =
       NoncausalTimestampFilter::BoundOffset(points.second.first,
                                             points.second.second, ta_base, ta);
   // See below for why this is a >=
-  if (static_cast<double>((offset.first + ta_base - tb_base).count()) >=
-      tb - offset.second - ta) {
+  if (static_cast<double>((std::get<0>(offset) + ta_base - tb_base).count()) >=
+      tb - std::get<1>(offset) - ta) {
     LOG(ERROR) << node_names_ << " "
-               << TimeString(ta_base, ta, offset.first, offset.second)
+               << TimeString(ta_base, ta, std::get<0>(offset),
+                             std::get<1>(offset))
                << " > solution time " << tb_base << ", " << tb;
     LOG(ERROR) << "Bracketing times are " << TimeString(points.second.first)
                << " and " << TimeString(points.second.second);
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index 2b5ff47..5aaa7fd 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -311,29 +311,31 @@
         other_points_;
   };
 
-  // Returns the error between the offset in the provided timestamps, and the
-  // offset at ta.  Also returns a pointer to the timestamps used for the
-  // lookup to be passed back in again for a more efficient second lookup.
-  std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>> OffsetError(
-      const NoncausalTimestampFilter *other, Pointer pointer,
-      logger::BootTimestamp ta_base, double ta, logger::BootTimestamp tb_base,
-      double tb) const {
+  // Returns the error between the offset in the provided timestamps and the
+  // offset at ta, plus d error/dta.  Also returns a pointer to the timestamps
+  // used for the lookup to be passed back in again for a more efficient second
+  // lookup.
+  std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
+  OffsetError(const NoncausalTimestampFilter *other, Pointer pointer,
+              logger::BootTimestamp ta_base, double ta,
+              logger::BootTimestamp tb_base, double tb) const {
     const BootFilter *boot_filter = filter(pointer, ta_base.boot, tb_base.boot);
     const SingleFilter *other_filter =
         other == nullptr
             ? nullptr
             : other->maybe_single_filter(tb_base.boot, ta_base.boot);
-    std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>> result =
-        boot_filter->filter.OffsetError(other_filter, pointer, ta_base.time, ta,
-                                        tb_base.time, tb);
+    std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
+        result = boot_filter->filter.OffsetError(
+            other_filter, pointer, ta_base.time, ta, tb_base.time, tb);
     result.first.boot_filter_ = boot_filter;
     return result;
   }
 
-  // Returns the error between the offset in the provided timestamps, and the
-  // bounds offset at ta.  Also returns a pointer to the timestamps used for the
-  // lookup to be passed back in again for a more efficient second lookup.
-  std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>>
+  // Returns the error between the offset in the provided timestamps and the
+  // bounds offset at ta, plus d error/dta.  Also returns a pointer to the
+  // timestamps used for the lookup to be passed back in again for a more
+  // efficient second lookup.
+  std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
   BoundsOffsetError(const NoncausalTimestampFilter *other, Pointer pointer,
                     logger::BootTimestamp ta_base, double ta,
                     logger::BootTimestamp tb_base, double tb) const {
@@ -342,8 +344,8 @@
         other == nullptr
             ? nullptr
             : other->maybe_single_filter(tb_base.boot, ta_base.boot);
-    std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>> result =
-        boot_filter->filter.BoundsOffsetError(
+    std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
+        result = boot_filter->filter.BoundsOffsetError(
             other_filter, pointer, ta_base.time, ta, tb_base.time, tb);
     result.first.boot_filter_ = boot_filter;
     return result;
@@ -573,17 +575,17 @@
       std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds> p0,
       monotonic_clock::time_point ta);
 
-  static std::pair<std::chrono::nanoseconds, double> InterpolateOffset(
+  static std::tuple<std::chrono::nanoseconds, double, double> InterpolateOffset(
       std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds> p0,
       std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds> p1,
       monotonic_clock::time_point ta_base, double ta);
 
-  static std::pair<std::chrono::nanoseconds, double> BoundOffset(
+  static std::tuple<std::chrono::nanoseconds, double, double> BoundOffset(
       std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds> p0,
       std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds> p1,
       monotonic_clock::time_point ta_base, double ta);
 
-  static std::pair<std::chrono::nanoseconds, double> ExtrapolateOffset(
+  static std::tuple<std::chrono::nanoseconds, double, double> ExtrapolateOffset(
       std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds> p0,
       monotonic_clock::time_point ta_base, double ta);
 
@@ -634,20 +636,20 @@
     std::pair<Pointer, std::chrono::nanoseconds> Offset(
         const SingleFilter *other, Pointer pointer,
         monotonic_clock::time_point ta) const;
-    std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>> Offset(
-        const SingleFilter *other, Pointer pointer,
-        monotonic_clock::time_point ta_base, double ta) const;
+    std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
+    Offset(const SingleFilter *other, Pointer pointer,
+           monotonic_clock::time_point ta_base, double ta) const;
 
-    std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>>
+    std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
     BoundsOffset(const SingleFilter *other, Pointer pointer,
                  monotonic_clock::time_point ta_base, double ta) const;
 
-    std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>> OffsetError(
-        const SingleFilter *other, Pointer pointer,
-        aos::monotonic_clock::time_point ta_base, double ta,
-        aos::monotonic_clock::time_point tb_base, double tb) const;
+    std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
+    OffsetError(const SingleFilter *other, Pointer pointer,
+                aos::monotonic_clock::time_point ta_base, double ta,
+                aos::monotonic_clock::time_point tb_base, double tb) const;
 
-    std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>>
+    std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
     BoundsOffsetError(const SingleFilter *other, Pointer pointer,
                       aos::monotonic_clock::time_point ta_base, double ta,
                       aos::monotonic_clock::time_point tb_base,
diff --git a/aos/network/timestamp_filter_test.cc b/aos/network/timestamp_filter_test.cc
index 5d8eeeb..adb6172 100644
--- a/aos/network/timestamp_filter_test.cc
+++ b/aos/network/timestamp_filter_test.cc
@@ -48,34 +48,36 @@
                 .second};
   }
 
-  std::pair<logger::BootDuration, double> Offset(
+  std::tuple<logger::BootDuration, double, double> Offset(
       const TestingNoncausalTimestampFilter *other, Pointer pointer,
       logger::BootTimestamp ta_base, double ta, size_t sample_boot) const {
-    std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>> result =
-        filter(ta_base.boot, sample_boot)
-            ->filter.Offset(
-                other == nullptr
-                    ? nullptr
-                    : &other->filter(sample_boot, ta_base.boot)->filter,
-                pointer, ta_base.time, ta);
-    return std::make_pair(
-        logger::BootDuration{sample_boot, result.second.first},
-        result.second.second);
+    std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
+        result =
+            filter(ta_base.boot, sample_boot)
+                ->filter.Offset(
+                    other == nullptr
+                        ? nullptr
+                        : &other->filter(sample_boot, ta_base.boot)->filter,
+                    pointer, ta_base.time, ta);
+    return std::make_tuple(
+        logger::BootDuration{sample_boot, std::get<0>(result.second)},
+        std::get<1>(result.second), std::get<2>(result.second));
   }
 
-  std::pair<logger::BootDuration, double> BoundsOffset(
+  std::tuple<logger::BootDuration, double, double> BoundsOffset(
       const TestingNoncausalTimestampFilter *other, Pointer pointer,
       logger::BootTimestamp ta_base, double ta, size_t sample_boot) const {
-    std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>> result =
-        filter(ta_base.boot, sample_boot)
-            ->filter.BoundsOffset(
-                other == nullptr
-                    ? nullptr
-                    : &other->filter(sample_boot, ta_base.boot)->filter,
-                pointer, ta_base.time, ta);
-    return std::make_pair(
-        logger::BootDuration{sample_boot, result.second.first},
-        result.second.second);
+    std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
+        result =
+            filter(ta_base.boot, sample_boot)
+                ->filter.BoundsOffset(
+                    other == nullptr
+                        ? nullptr
+                        : &other->filter(sample_boot, ta_base.boot)->filter,
+                    pointer, ta_base.time, ta);
+    return std::make_tuple(
+        logger::BootDuration{sample_boot, std::get<0>(result.second)},
+        std::get<1>(result.second), std::get<2>(result.second));
   }
 };
 
@@ -866,19 +868,21 @@
   const monotonic_clock::time_point t2 = t1 + chrono::nanoseconds(1000);
   const chrono::nanoseconds o2 = chrono::nanoseconds(150);
 
+  const double slope = 50.0 / 1000.0;
+
   EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
                 std::make_tuple(t1, o1), std::make_tuple(t2, o2), t1),
             o1);
   EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
                 std::make_tuple(t1, o1), std::make_tuple(t2, o2), t1, 0.0),
-            std::make_pair(o1, 0.0));
+            std::make_tuple(o1, 0.0, slope));
 
   EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
                 std::make_tuple(t1, o1), std::make_tuple(t2, o2), t2),
             o2);
   EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
                 std::make_tuple(t1, o1), std::make_tuple(t2, o2), t2, 0.0),
-            std::make_pair(o2, 0.0));
+            std::make_tuple(o2, 0.0, slope));
 
   EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
                 std::make_tuple(t1, o1), std::make_tuple(t2, o2),
@@ -887,7 +891,7 @@
   EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
                 std::make_tuple(t1, o1), std::make_tuple(t2, o2),
                 t1 + chrono::nanoseconds(500), 0.0),
-            std::make_pair(o1 + chrono::nanoseconds(25), 0.0));
+            std::make_tuple(o1 + chrono::nanoseconds(25), 0.0, slope));
 
   EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
                 std::make_tuple(t1, o1), std::make_tuple(t2, o2),
@@ -896,7 +900,7 @@
   EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
                 std::make_tuple(t1, o1), std::make_tuple(t2, o2),
                 t1 - chrono::nanoseconds(200), 0.0),
-            std::make_pair(o1 - chrono::nanoseconds(10), 0.0));
+            std::make_tuple(o1 - chrono::nanoseconds(10), 0.0, slope));
 
   EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
                 std::make_tuple(t1, o1), std::make_tuple(t2, o2),
@@ -905,7 +909,7 @@
   EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
                 std::make_tuple(t1, o1), std::make_tuple(t2, o2),
                 t1 + chrono::nanoseconds(200), 0.0),
-            std::make_pair(o1 + chrono::nanoseconds(10), 0.0));
+            std::make_tuple(o1 + chrono::nanoseconds(10), 0.0, slope));
 
   EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
                 std::make_tuple(t1, o1), std::make_tuple(t2, o2),
@@ -914,7 +918,7 @@
   EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
                 std::make_tuple(t1, o1), std::make_tuple(t2, o2),
                 t1 + chrono::nanoseconds(800), 0.0),
-            std::make_pair(o1 + chrono::nanoseconds(40), 0.0));
+            std::make_tuple(o1 + chrono::nanoseconds(40), 0.0, slope));
 
   EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
                 std::make_tuple(t1, o1), std::make_tuple(t2, o2),
@@ -923,7 +927,7 @@
   EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
                 std::make_tuple(t1, o1), std::make_tuple(t2, o2),
                 t1 + chrono::nanoseconds(1200), 0.0),
-            std::make_pair(o1 + chrono::nanoseconds(60), 0.0));
+            std::make_tuple(o1 + chrono::nanoseconds(60), 0.0, slope));
 
   for (int i = -MaxVelocityRatio::den * MaxVelocityRatio::num * 6;
        i <
@@ -941,18 +945,19 @@
         NoncausalTimestampFilter::InterpolateOffset(
             std::make_tuple(t1, o1), std::make_tuple(t2, o2), ta_base);
 
-    const std::pair<chrono::nanoseconds, double> offset =
+    const std::tuple<chrono::nanoseconds, double, double> offset =
         NoncausalTimestampFilter::InterpolateOffset(
             std::make_tuple(t1, o1), std::make_tuple(t2, o2), ta_base, ta);
-    EXPECT_EQ(expected_offset, offset.first);
+    EXPECT_EQ(expected_offset, std::get<0>(offset));
 
     const double expected_double_offset =
         static_cast<double>(o1.count()) +
         static_cast<double>(ta_orig) / static_cast<double>((t2 - t1).count()) *
             (o2 - o1).count();
 
-    EXPECT_NEAR(static_cast<double>(offset.first.count()) + offset.second,
-                expected_double_offset, 1e-9)
+    EXPECT_NEAR(
+        static_cast<double>(std::get<0>(offset).count()) + std::get<1>(offset),
+        expected_double_offset, 1e-9)
         << ": i " << i << " t " << ta_base << " " << ta << " t1 " << t1
         << " o1 " << o1.count() << "ns t2 " << t2 << " o2 " << o2.count()
         << "ns Non-rounded: " << expected_offset.count() << "ns";
@@ -999,14 +1004,14 @@
   // Test base + double version
   EXPECT_EQ(NoncausalTimestampFilter::ExtrapolateOffset(std::make_tuple(t1, o1),
                                                         e, 0.),
-            std::make_pair(chrono::nanoseconds(90), 0.0));
+            std::make_tuple(chrono::nanoseconds(90), 0.0, kMaxVelocity()));
   EXPECT_EQ(NoncausalTimestampFilter::ExtrapolateOffset(std::make_tuple(t1, o1),
                                                         t1, 0.),
-            std::make_pair(o1, 0.0));
+            std::make_tuple(o1, 0.0, -kMaxVelocity()));
 
   EXPECT_EQ(NoncausalTimestampFilter::ExtrapolateOffset(std::make_tuple(t1, o1),
                                                         t1, 0.5),
-            std::make_pair(o1, -0.5 * kMaxVelocity()));
+            std::make_tuple(o1, -0.5 * kMaxVelocity(), -kMaxVelocity()));
 
   EXPECT_EQ(
       NoncausalTimestampFilter::ExtrapolateOffset(std::make_tuple(t2, o2), t2),
@@ -1014,7 +1019,7 @@
 
   EXPECT_EQ(NoncausalTimestampFilter::ExtrapolateOffset(std::make_tuple(t2, o2),
                                                         t2, 0.0),
-            std::make_pair(o2, 0.0));
+            std::make_tuple(o2, 0.0, -kMaxVelocity()));
 
   // Test points past our last sample
   EXPECT_EQ(NoncausalTimestampFilter::ExtrapolateOffset(
@@ -1022,10 +1027,10 @@
             chrono::nanoseconds(
                 static_cast<int64_t>(o2.count() - 10000. * kMaxVelocity())));
 
-  EXPECT_EQ(
-      NoncausalTimestampFilter::ExtrapolateOffset(
-          std::make_tuple(t2, o2), t2 + chrono::nanoseconds(10000), 0.5),
-      std::make_pair(o2 - chrono::nanoseconds(10), -0.5 * kMaxVelocity()));
+  EXPECT_EQ(NoncausalTimestampFilter::ExtrapolateOffset(
+                std::make_tuple(t2, o2), t2 + chrono::nanoseconds(10000), 0.5),
+            std::make_tuple(o2 - chrono::nanoseconds(10), -0.5 * kMaxVelocity(),
+                            -kMaxVelocity()));
 
   // Now, test that offset + remainder functions add up to the right answer for
   // a lot of cases.  This is enough to catch all the various rounding cases.
@@ -1043,13 +1048,13 @@
         NoncausalTimestampFilter::ExtrapolateOffset(std::make_tuple(t1, o1),
                                                     ta_base);
 
-    std::pair<chrono::nanoseconds, double> offset =
+    std::tuple<chrono::nanoseconds, double, double> offset =
         NoncausalTimestampFilter::ExtrapolateOffset(std::make_tuple(t1, o1),
                                                     ta_base, ta);
 
-    EXPECT_EQ(expected_offset, offset.first);
+    EXPECT_EQ(expected_offset, std::get<0>(offset));
     EXPECT_NEAR(
-        static_cast<double>(offset.first.count()) + offset.second,
+        static_cast<double>(std::get<0>(offset).count()) + std::get<1>(offset),
         static_cast<double>(o1.count()) - std::abs(ta_orig) * kMaxVelocity(),
         1e-9)
         << ": i " << i << " t " << ta_base << " " << ta
@@ -1072,14 +1077,14 @@
             o1);
   EXPECT_EQ(NoncausalTimestampFilter::BoundOffset(
                 std::make_tuple(t1, o1), std::make_tuple(t2, o2), t1, 0.0),
-            std::pair(o1, 0.0));
+            std::tuple(o1, 0.0, -kMaxVelocity()));
 
   EXPECT_EQ(NoncausalTimestampFilter::BoundOffset(std::make_tuple(t1, o1),
                                                   std::make_tuple(t2, o2), t2),
             o2);
   EXPECT_EQ(NoncausalTimestampFilter::BoundOffset(
                 std::make_tuple(t1, o1), std::make_tuple(t2, o2), t2, 0.0),
-            std::pair(o2, 0.0));
+            std::tuple(o2, 0.0, -kMaxVelocity()));
 
   // Iterate from before t1 to after t2 and confirm that the solution is right.
   // We must always be >= than interpolation, and must also be equal to the max
@@ -1114,19 +1119,21 @@
     //  /  \/  \                                                              |
     // /        \                                                             |
 
-    const std::pair<chrono::nanoseconds, double> offset =
+    const std::tuple<chrono::nanoseconds, double, double> offset =
         NoncausalTimestampFilter::BoundOffset(
             std::make_tuple(t1, o1), std::make_tuple(t2, o2), ta_base, ta);
 
-    EXPECT_EQ(std::max(expected_offset_1, expected_offset_2), offset.first);
+    EXPECT_EQ(std::max(expected_offset_1, expected_offset_2),
+              std::get<0>(offset));
 
     const double expected_double_offset = std::max(
         static_cast<double>(o1.count()) - std::abs(ta_orig) * kMaxVelocity(),
         static_cast<double>(o2.count()) -
             std::abs(ta_orig - (t2 - t1).count()) * kMaxVelocity());
 
-    EXPECT_NEAR(static_cast<double>(offset.first.count()) + offset.second,
-                expected_double_offset, 1e-9)
+    EXPECT_NEAR(
+        static_cast<double>(std::get<0>(offset).count()) + std::get<1>(offset),
+        expected_double_offset, 1e-9)
         << ": i " << i << " t " << ta_base << " " << ta << " t1 " << t1
         << " o1 " << o1.count() << "ns t2 " << t2 << " o2 " << o2.count()
         << "ns Non-rounded: "
@@ -1468,9 +1475,15 @@
   const BootTimestamp t2 = e + chrono::microseconds(2000);
   const BootDuration o2{0, chrono::nanoseconds(150)};
 
+  const double slope12 = static_cast<double>((o2 - o1).duration.count()) /
+                         static_cast<double>((t2 - t1).duration.count());
+
   const BootTimestamp t3 = e + chrono::microseconds(3000);
   const BootDuration o3{0, chrono::nanoseconds(50)};
 
+  const double slope23 = static_cast<double>((o3 - o2).duration.count()) /
+                         static_cast<double>((t3 - t2).duration.count());
+
   const BootTimestamp t4 = e + chrono::microseconds(4000);
 
   TestingNoncausalTimestampFilter filter(node_a, node_b);
@@ -1480,87 +1493,88 @@
   // 1 point is handled properly.
   EXPECT_EQ(filter.Offset(nullptr, Pointer(), t1, 0), o1);
   EXPECT_EQ(filter.Offset(nullptr, Pointer(), t1, 0.0, 0),
-            std::make_pair(o1, 0.0));
+            std::make_tuple(o1, 0.0, -kMaxVelocity()));
   EXPECT_EQ(filter.BoundsOffset(nullptr, Pointer(), t1, 0.0, 0),
-            std::make_pair(o1, 0.0));
+            std::make_tuple(o1, 0.0, -kMaxVelocity()));
   // Check if we ask for something away from point that we get an offset
   // based on the MaxVelocity allowed
   const double offset_pre = -(t1.time - e.time).count() * kMaxVelocity();
   EXPECT_EQ(filter.Offset(nullptr, Pointer(), e, 0),
             o1 + chrono::nanoseconds(static_cast<int64_t>(offset_pre)));
-  EXPECT_EQ(
-      filter.Offset(nullptr, Pointer(), e, 0.0, 0),
-      std::make_pair(o1 + chrono::nanoseconds(static_cast<int64_t>(offset_pre)),
-                     0.0));
-  EXPECT_EQ(
-      filter.BoundsOffset(nullptr, Pointer(), e, 0.0, 0),
-      std::make_pair(o1 + chrono::nanoseconds(static_cast<int64_t>(offset_pre)),
-                     0.0));
+  EXPECT_EQ(filter.Offset(nullptr, Pointer(), e, 0.0, 0),
+            std::make_tuple(
+                o1 + chrono::nanoseconds(static_cast<int64_t>(offset_pre)), 0.0,
+                kMaxVelocity()));
+  EXPECT_EQ(filter.BoundsOffset(nullptr, Pointer(), e, 0.0, 0),
+            std::make_tuple(
+                o1 + chrono::nanoseconds(static_cast<int64_t>(offset_pre)), 0.0,
+                kMaxVelocity()));
 
   double offset_post = -(t2.time - t1.time).count() * kMaxVelocity();
   EXPECT_EQ(filter.Offset(nullptr, Pointer(), t2, 0),
             o1 + chrono::nanoseconds(static_cast<int64_t>(offset_post)));
-  EXPECT_EQ(
-      filter.Offset(nullptr, Pointer(), t2, 0.0, 0),
-      std::make_pair(
-          o1 + chrono::nanoseconds(static_cast<int64_t>(offset_post)), 0.0));
-  EXPECT_EQ(
-      filter.BoundsOffset(nullptr, Pointer(), t2, 0.0, 0),
-      std::make_pair(
-          o1 + chrono::nanoseconds(static_cast<int64_t>(offset_post)), 0.0));
+  EXPECT_EQ(filter.Offset(nullptr, Pointer(), t2, 0.0, 0),
+            std::make_tuple(
+                o1 + chrono::nanoseconds(static_cast<int64_t>(offset_post)),
+                0.0, -kMaxVelocity()));
+  EXPECT_EQ(filter.BoundsOffset(nullptr, Pointer(), t2, 0.0, 0),
+            std::make_tuple(
+                o1 + chrono::nanoseconds(static_cast<int64_t>(offset_post)),
+                0.0, -kMaxVelocity()));
 
   filter.Sample(t2, o2);
   filter.Sample(t3, o3);
 
   EXPECT_EQ(filter.Offset(nullptr, Pointer(), t1, 0), o1);
   EXPECT_EQ(filter.BoundsOffset(nullptr, Pointer(), t1, 0.0, 0),
-            std::make_pair(o1, 0.0));
+            std::make_tuple(o1, 0.0, -kMaxVelocity()));
   EXPECT_EQ(filter.Offset(nullptr, Pointer(), t2, 0), o2);
   EXPECT_EQ(filter.BoundsOffset(nullptr, Pointer(), t2, 0.0, 0),
-            std::make_pair(o2, 0.0));
+            std::make_tuple(o2, 0.0, -kMaxVelocity()));
   EXPECT_EQ(filter.Offset(nullptr, Pointer(), t3, 0), o3);
   EXPECT_EQ(filter.BoundsOffset(nullptr, Pointer(), t3, 0.0, 0),
-            std::make_pair(o3, 0.0));
+            std::make_tuple(o3, 0.0, -kMaxVelocity()));
 
   EXPECT_EQ(filter.Offset(nullptr, Pointer(), t1, 0.0, 0),
-            std::make_pair(o1, 0.0));
+            std::make_tuple(o1, 0.0, slope12));
 
   EXPECT_EQ(
       filter.Offset(nullptr, Pointer(),
                     e + (t2.time_since_epoch() + t1.time_since_epoch()) / 2,
                     0.0, 0),
-      std::make_pair(o1 + (o2 - o1) / 2, 0.0));
+      std::make_tuple(o1 + (o2 - o1) / 2, 0.0, slope12));
 
   EXPECT_EQ(filter.Offset(nullptr, Pointer(), t2, 0.0, 0),
-            std::make_pair(o2, 0.0));
+            std::make_tuple(o2, 0.0, slope23));
 
   EXPECT_EQ(
       filter.Offset(nullptr, Pointer(),
                     e + (t2.time_since_epoch() + t3.time_since_epoch()) / 2,
                     0.0, 0),
-      std::make_pair((o2 + o3) / 2, 0.0));
+      std::make_tuple((o2 + o3) / 2, 0.0, slope23));
 
   EXPECT_EQ(filter.Offset(nullptr, Pointer(), t3, 0.0, 0),
-            std::make_pair(o3, 0.0));
+            std::make_tuple(o3, 0.0, -kMaxVelocity()));
 
   // Check that we still get same answer for times before our sample data...
   EXPECT_EQ(filter.Offset(nullptr, Pointer(), e, 0),
             o1 + chrono::nanoseconds(static_cast<int64_t>(offset_pre)));
-  EXPECT_EQ(
-      filter.Offset(nullptr, Pointer(), e, 0.0, 0),
-      std::make_pair(o1 + chrono::nanoseconds(static_cast<int64_t>(offset_pre)),
-                     0.0));
+  EXPECT_EQ(filter.Offset(nullptr, Pointer(), e, 0.0, 0),
+            std::make_tuple(
+                o1 + chrono::nanoseconds(static_cast<int64_t>(offset_pre)), 0.0,
+                kMaxVelocity()));
   // ... and after
   offset_post = -(t4.time - t3.time).count() * kMaxVelocity();
   EXPECT_EQ(filter.Offset(nullptr, Pointer(), t4, 0),
             (o3 + chrono::nanoseconds(static_cast<int64_t>(offset_post))));
-  EXPECT_EQ(
-      filter.Offset(nullptr, Pointer(), t4, 0.0, 0),
-      std::make_pair(
-          o3 + chrono::nanoseconds(static_cast<int64_t>(offset_post)), 0.0));
+  EXPECT_EQ(filter.Offset(nullptr, Pointer(), t4, 0.0, 0),
+            std::make_tuple(
+                o3 + chrono::nanoseconds(static_cast<int64_t>(offset_post)),
+                0.0, -kMaxVelocity()));
 
-  EXPECT_EQ(filter.BoundsOffset(nullptr, Pointer(), t2 + (t3 - t2) / 2, 0.0, 0),
-            std::make_pair(o2 - chrono::nanoseconds(500), 0.0));
+  EXPECT_EQ(
+      filter.BoundsOffset(nullptr, Pointer(), t2 + (t3 - t2) / 2, 0.0, 0),
+      std::make_tuple(o2 - chrono::nanoseconds(500), 0.0, -kMaxVelocity()));
 }
 
 // Tests that adding duplicates gets correctly deduplicated.
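
For readers following the API change in this file: Offset(), BoundsOffset(), InterpolateOffset(), and ExtrapolateOffset() now return a three-element tuple (offset, remainder, slope) instead of an (offset, remainder) pair, where the added slope is the velocity of the line segment being evaluated. A minimal sketch of unpacking the new return value, assuming the t1/o1/t2/o2 values from the test above:

    // Sketch only; t1, o1, t2, o2 are assumed to come from the test fixture.
    const auto [offset, remainder, slope] =
        NoncausalTimestampFilter::InterpolateOffset(
            std::make_tuple(t1, o1), std::make_tuple(t2, o2), t1, 0.0);
    // offset + remainder reconstructs the double-precision offset at t1, and
    // slope matches (o2 - o1) / (t2 - t1), the `slope` constant in the test.
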
diff --git a/aos/network/web_proxy.h b/aos/network/web_proxy.h
index 0c1d1dc..2b57c05 100644
--- a/aos/network/web_proxy.h
+++ b/aos/network/web_proxy.h
@@ -77,6 +77,8 @@
            int per_channel_buffer_size_bytes);
   WebProxy(aos::ShmEventLoop *event_loop, StoreHistory store_history,
            int per_channel_buffer_size_bytes);
+  WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
+           StoreHistory store_history, int per_channel_buffer_size_bytes);
   ~WebProxy();
 
   void SetDataPath(const char *path) { server_.setStaticPath(path); }
@@ -85,9 +87,6 @@
   void StopRecording();
 
  private:
-  WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
-           StoreHistory store_history, int per_channel_buffer_size_bytes);
-
   aos::internal::EPoll internal_epoll_;
   aos::internal::EPoll *const epoll_;
   ::seasocks::Server server_;
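
The EventLoop + EPoll overload is now public, so callers that are not running on a ShmEventLoop (for example, code under a simulated event loop) can supply their own epoll instance. A hedged sketch of calling it, mirroring the in_process_plotter.cc change further down; the factory and event loop setup are assumed:

    // Assumed setup: a SimulatedEventLoopFactory `factory` and an EventLoop
    // `event_loop` created from it.
    aos::web_proxy::WebProxy web_proxy(
        event_loop.get(), factory.scheduler_epoll(),
        aos::web_proxy::StoreHistory::kYes,
        /*per_channel_buffer_size_bytes=*/-1);
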
diff --git a/aos/util/BUILD b/aos/util/BUILD
index faa3dc1..bc62f87 100644
--- a/aos/util/BUILD
+++ b/aos/util/BUILD
@@ -385,3 +385,24 @@
     visibility = ["//visibility:public"],
     deps = ["//aos:python_init"],
 )
+
+cc_library(
+    name = "threaded_queue",
+    hdrs = [
+        "threaded_queue.h",
+        "threaded_queue_tmpl.h",
+    ],
+    deps = [
+        "//aos:condition",
+        "//aos/mutex",
+    ],
+)
+
+cc_test(
+    name = "threaded_queue_test",
+    srcs = ["threaded_queue_test.cc"],
+    deps = [
+        ":threaded_queue",
+        "//aos/testing:googletest",
+    ],
+)
diff --git a/aos/util/bitpacking_test.cc b/aos/util/bitpacking_test.cc
index 80d1977..087c719 100644
--- a/aos/util/bitpacking_test.cc
+++ b/aos/util/bitpacking_test.cc
@@ -1,5 +1,6 @@
 #include "aos/util/bitpacking.h"
 
+#include <array>
 #include <cstdint>
 
 #include "gtest/gtest.h"
diff --git a/aos/util/threaded_queue.h b/aos/util/threaded_queue.h
new file mode 100644
index 0000000..0b06632
--- /dev/null
+++ b/aos/util/threaded_queue.h
@@ -0,0 +1,99 @@
+#ifndef AOS_UTIL_THREADED_QUEUE_H_
+#define AOS_UTIL_THREADED_QUEUE_H_
+#include <functional>
+#include <optional>
+#include <queue>
+#include <thread>
+
+#include "aos/condition.h"
+#include "aos/mutex/mutex.h"
+
+namespace aos::util {
+// This class implements a queue of objects of type T in which a worker thread
+// pushes and the calling thread pops from the queue.
+//
+// This is set up so that the user passes a worker function that gets called
+// in a separate thread whenever we think there might be more work for it to
+// do. All the methods on this class are intended
+// to be called from the main thread (not from within the worker function).
+// Communication between the main thread and the worker can be achieved either
+// by manually handling your own state, or preferably by using the SharedState
+// object, which will get passed into the worker. The worker gets called every
+// time that SetState() is called.
+template <typename T, typename SharedState>
+class ThreadedQueue {
+ public:
+  // PushResult is returned from the worker to indicate its current state.
+  struct PushResult {
+    // The new item to push, if any. If set to nullopt, nothing gets pushed.
+    std::optional<T> item = std::nullopt;
+    // Set to true if the worker knows that there is more work that it has to do
+    // and so should be immediately called again. If set to false, then the
+    // worker will not get called again until one of the following events
+    // occurs:
+    // 1) The queue is successfully popped from.
+    // 2) SetState() is called.
+    bool more_to_push = false;
+    // Set to true if there is no more work to be done. The worker should not
+    // get called after setting done to true.
+    bool done = false;
+  };
+  ThreadedQueue(
+      std::function<PushResult(SharedState)> push_request_handler,
+      SharedState initial_state);
+  ~ThreadedQueue();
+  // Sets state. Triggers a new call to push_request_handler.
+  void SetState(const SharedState &state);
+  // Returns the current front of the queue, blocking until a new item is
+  // available. Will only return nullopt if done is true and there will be no
+  // more items in the queue.
+  std::optional<T> Peek();
+  // Identical to Peek(), except that it also removes the item from the queue.
+  std::optional<T> Pop();
+  // Waits until the push_request_handler has returned more_to_push = false, and
+  // so is just spinning. Useful if you want to ensure that the worker has
+  // done all the work it can before going to the next step.
+  void WaitForNoMoreWork();
+  // Stops any further calls to push_request_handler. Used to terminate the
+  // queue from the calling thread.
+  void StopPushing();
+
+ private:
+  // Safely grabs the current state and returns a copy.
+  SharedState State();
+  // Implements the Peek()/Pop() methods, blocking until we are either done or
+  // the next item is available in the queue.
+  std::optional<T> PeekOrPop(bool pop);
+
+  // Mutex controlling access to all shared state (in this case, all the members
+  // of this class).
+  aos::Mutex mutex_;
+  // Condition variable used to indicate when anything has happened that should
+  // cause us to check for whether the worker can add anything new to the queue.
+  // Called "popped_" in reference to that it may be called when an item is
+  // popped from the queue.
+  aos::Condition popped_;
+  // Condition variable to indicate when an item has either been pushed to the
+  // queue or there is some other state change that consumers may care about
+  // (e.g., being done).
+  aos::Condition pushed_;
+  // TODO(jkuszmaul): Evaluate impact of dynamic memory allocation in
+  // std::queue; consider using aos::RingBuffer or a similar statically
+  // allocated buffer.
+  std::queue<T> queue_;
+  // Whether we are done processing entirely.
+  bool done_{false};
+  // Set while the pusher thread is waiting on popped_.
+  // Used to notice when the push handler is out of work to do.
+  bool pusher_waiting_{false};
+  // Set when SetState() is called, cleared when State() is read. Used to track
+  // whether the push handler has been called with the most recent state.
+  bool state_updated_{false};
+  SharedState state_;
+  std::thread pusher_thread_;
+};
+
+}  // namespace aos::util
+
+#include "aos/util/threaded_queue_tmpl.h"
+#endif  // AOS_UTIL_THREADED_QUEUE_H_
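
To make the header's contract concrete, here is a minimal usage sketch (not part of the change; int payloads and int state are assumed, matching the tests that follow). The worker pushes its current state back as an item, reports no pending work, and never declares itself done:

    aos::util::ThreadedQueue<int, int> queue(
        [](int state) {
          // One item per wakeup; no backlog, never done.
          return aos::util::ThreadedQueue<int, int>::PushResult{state, false,
                                                                false};
        },
        /*initial_state=*/0);
    queue.SetState(1);                      // Wakes the worker with new state.
    std::optional<int> item = queue.Pop();  // Blocks until an item is pushed.
    queue.StopPushing();                    // The destructor also does this.

The aggregate initialization of PushResult relies on the member order declared above: {item, more_to_push, done}.
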
diff --git a/aos/util/threaded_queue_test.cc b/aos/util/threaded_queue_test.cc
new file mode 100644
index 0000000..4f27c57
--- /dev/null
+++ b/aos/util/threaded_queue_test.cc
@@ -0,0 +1,105 @@
+#include "aos/util/threaded_queue.h"
+
+#include "gtest/gtest.h"
+
+namespace aos::util {
+
+TEST(ThreadedQueueTest, BasicFunction) {
+  std::atomic<int> counter{10000};
+  int state = 0;
+  std::atomic<int> observed_state{0};
+  ThreadedQueue<int, int> queue(
+      [&counter, &observed_state](const int state) {
+        // Because this handler always returns more_to_push = false, it will
+        // only get called when the queue is popped from.
+        observed_state = state;
+        int count = --counter;
+        return ThreadedQueue<int, int>::PushResult{count, false, count == 0};
+      },
+      state);
+  while (true) {
+    std::optional<int> peek_result = queue.Peek();
+    std::optional<int> pop_result = queue.Pop();
+    ASSERT_EQ(peek_result.has_value(), pop_result.has_value());
+    if (peek_result.has_value()) {
+      ASSERT_EQ(peek_result.value(), pop_result.value());
+    } else {
+      break;
+    }
+    state++;
+    queue.SetState(state);
+  }
+  ASSERT_EQ(counter, 0);
+  // Our API doesn't make any guarantee about the push/pop cycle being kept in
+  // lock-step, so just check that the observed state got incremented at all.
+  ASSERT_LT(1, observed_state);
+  ASSERT_EQ(state, 10000);
+}
+
+// Test running a queue where the consumer wants to have X entries pre-loaded
+// and so communicates its current state back to the pusher.
+TEST(ThreadedQueueTest, StatefulQueue) {
+  std::atomic<int> counter{10000};
+  int state = counter;
+  constexpr int kMaxLookahead = 10;
+  std::atomic<int> observed_state{0};
+  ThreadedQueue<int, int> queue(
+      [&counter, &observed_state](const int state) {
+        observed_state = state;
+        if (counter + kMaxLookahead < state) {
+          return ThreadedQueue<int, int>::PushResult{std::nullopt, false,
+                                                     counter == 0};
+        } else {
+          int count = --counter;
+          return ThreadedQueue<int, int>::PushResult{count, true, count == 0};
+        }
+      },
+      state);
+  while (true) {
+    std::optional<int> peek_result = queue.Peek();
+    std::optional<int> pop_result = queue.Pop();
+    ASSERT_EQ(peek_result.has_value(), pop_result.has_value());
+    if (peek_result.has_value()) {
+      ASSERT_EQ(peek_result.value(), pop_result.value());
+    } else {
+      break;
+    }
+    for (int ii = 0; ii < 2 * kMaxLookahead; ++ii) {
+      // Trigger the internal condition variable a bunch of times to cause
+      // trouble.
+      queue.Peek();
+    }
+    // The pusher should never be more than the permissible distance ahead.
+    ASSERT_GE(counter + kMaxLookahead + 1, state);
+    ASSERT_GE(observed_state, state);
+    state--;
+    queue.SetState(state);
+    // Periodically pause, ensure that the pusher has enough time to catch up,
+    // and check that it has indeed pre-queued kMaxLookahead items.
+    if (state % 1000 == 0 && state > 0) {
+      queue.WaitForNoMoreWork();
+      ASSERT_EQ(observed_state, state);
+      ASSERT_EQ(counter + kMaxLookahead + 1, state);
+    }
+  }
+  ASSERT_EQ(counter, 0);
+  ASSERT_EQ(state, 0);
+}
+
+
+// Tests that we can exit early without any issues.
+TEST(ThreadedQueueTest, ExitEarly) {
+  // There used to be a deadlock here where StopPushing() synchronized
+  // improperly with the pusher thread, but it required very (un)lucky timing
+  // to hit.
+  for (int ii = 0; ii < 10000; ++ii) {
+    ThreadedQueue<int, int> queue(
+        [](int) {
+          return ThreadedQueue<int, int>::PushResult{971, false, false};
+        },
+        0);
+    queue.StopPushing();
+  }
+}
+
+}  // namespace aos::util
diff --git a/aos/util/threaded_queue_tmpl.h b/aos/util/threaded_queue_tmpl.h
new file mode 100644
index 0000000..cb4cdfc
--- /dev/null
+++ b/aos/util/threaded_queue_tmpl.h
@@ -0,0 +1,99 @@
+namespace aos::util {
+template <typename T, typename SharedState>
+ThreadedQueue<T, SharedState>::ThreadedQueue(
+    std::function<PushResult(SharedState)> push_request_handler,
+    SharedState initial_state)
+    : popped_(&mutex_),
+      pushed_(&mutex_),
+      state_(initial_state),
+      pusher_thread_([this, push_request_handler]() {
+        while (true) {
+          PushResult result = push_request_handler(State());
+          {
+            MutexLocker locker(&mutex_);
+            done_ = done_ || result.done;
+            if (result.item.has_value()) {
+              queue_.push(std::move(result.item.value()));
+            }
+            pushed_.Broadcast();
+            if (done_) {
+              return;
+            }
+            if (result.more_to_push || state_updated_) {
+              continue;
+            } else {
+              pusher_waiting_ = true;
+              CHECK(!popped_.Wait());
+              pusher_waiting_ = false;
+            }
+          }
+        }
+      }) {}
+
+template <typename T, typename SharedState>
+ThreadedQueue<T, SharedState>::~ThreadedQueue() {
+  StopPushing();
+  pusher_thread_.join();
+}
+
+template <typename T, typename SharedState>
+void ThreadedQueue<T, SharedState>::WaitForNoMoreWork() {
+  MutexLocker locker(&mutex_);
+  while (state_updated_ || (!pusher_waiting_ && !done_)) {
+    CHECK(!pushed_.Wait());
+  }
+}
+
+template <typename T, typename SharedState>
+SharedState ThreadedQueue<T, SharedState>::State() {
+  MutexLocker locker(&mutex_);
+  state_updated_ = false;
+  return state_;
+}
+
+template <typename T, typename SharedState>
+void ThreadedQueue<T, SharedState>::SetState(const SharedState &state) {
+  MutexLocker locker(&mutex_);
+  state_ = state;
+  state_updated_ = true;
+  popped_.Broadcast();
+}
+
+template <typename T, typename SharedState>
+void ThreadedQueue<T, SharedState>::StopPushing() {
+  // Ensure that the mutex is locked before doing anything, to make sure that
+  // the pushing thread actually observes the change.
+  MutexLocker locker(&mutex_);
+  done_ = true;
+  popped_.Broadcast();
+}
+
+template <typename T, typename SharedState>
+std::optional<T> ThreadedQueue<T, SharedState>::Peek() {
+  return PeekOrPop(false);
+}
+
+template <typename T, typename SharedState>
+std::optional<T> ThreadedQueue<T, SharedState>::Pop() {
+  return PeekOrPop(true);
+}
+
+template <typename T, typename SharedState>
+std::optional<T> ThreadedQueue<T, SharedState>::PeekOrPop(bool pop) {
+  MutexLocker locker(&mutex_);
+  while (!done_ && queue_.empty()) {
+    CHECK(!pushed_.Wait());
+  }
+  if (queue_.empty()) {
+    return std::nullopt;
+  }
+  if (pop) {
+    T result = std::move(queue_.front());
+    queue_.pop();
+    popped_.Broadcast();
+    return result;
+  } else {
+    return queue_.front();
+  }
+}
+}  // namespace aos::util
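
One interaction in the implementation above is worth calling out: SetState() sets state_updated_ and broadcasts popped_, while WaitForNoMoreWork() keeps waiting as long as state_updated_ is set or the pusher is not idle. Together they let the caller guarantee that the worker has observed the latest state before proceeding. A sketch, assuming a queue like the one shown earlier and a hypothetical new_state value:

    queue.SetState(new_state);  // Marks state_updated_ and wakes the worker.
    queue.WaitForNoMoreWork();  // Returns only once the worker has re-read the
                                // state and reported more_to_push = false.
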
diff --git a/frc971/analysis/in_process_plotter.cc b/frc971/analysis/in_process_plotter.cc
index 545740d..33d999e 100644
--- a/frc971/analysis/in_process_plotter.cc
+++ b/frc971/analysis/in_process_plotter.cc
@@ -15,7 +15,8 @@
       event_loop_factory_(&config_.message()),
       event_loop_(event_loop_factory_.MakeEventLoop("plotter")),
       plot_sender_(event_loop_->MakeSender<Plot>("/analysis")),
-      web_proxy_(event_loop_.get(), aos::web_proxy::StoreHistory::kYes, -1),
+      web_proxy_(event_loop_.get(), event_loop_factory_.scheduler_epoll(),
+                 aos::web_proxy::StoreHistory::kYes, -1),
       builder_(plot_sender_.MakeBuilder()) {
   web_proxy_.SetDataPath(kDataPath);
   event_loop_->SkipTimingReport();
@@ -41,7 +42,11 @@
       ColorWheelColor{.name = "white", .color = {1, 1, 1}});
 }
 
-void Plotter::Spin() { event_loop_factory_.Run(); }
+void Plotter::Spin() {
+  // Set a non-infinite replay rate to avoid pegging a full CPU.
+  event_loop_factory_.SetRealtimeReplayRate(1.0);
+  event_loop_factory_.Run();
+}
 
 void Plotter::Title(std::string_view title) {
   title_ = builder_.fbb()->CreateString(title);
diff --git a/y2020/control_loops/drivetrain/localizer_test.cc b/y2020/control_loops/drivetrain/localizer_test.cc
index f0bcd88..91b5505 100644
--- a/y2020/control_loops/drivetrain/localizer_test.cc
+++ b/y2020/control_loops/drivetrain/localizer_test.cc
@@ -91,7 +91,7 @@
   return locations;
 }
 
-constexpr std::chrono::seconds kPiTimeOffset(-10);
+constexpr std::chrono::seconds kPiTimeOffset(10);
 }  // namespace
 
 namespace chrono = std::chrono;