Allow simulated nodes to startup after global startup

This lets us not boot nodes until their monotonic clock has reached
zero.

This also changes the semantics of OnStartup slightly--even if all the
nodes startup at the start of the simulation, they will each complete
their own startup sequence before going to the next node. This doesn't
appear to have had any negative consequences (and is similar
to if the nodes had tiny monotonic clock offsets that forced several of
the nodes to start late), but is a change.

Change-Id: I25d343b9509a3cdae6db9747f60a212f1cb21187
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/BUILD b/aos/events/BUILD
index be65594..deca15b 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -396,6 +396,7 @@
     target_compatible_with = ["@platforms//os:linux"],
     deps = [
         ":simulated_event_loop",
+        "//aos/network:testing_time_converter",
         "//aos/testing:googletest",
         "@com_github_google_glog//:glog",
     ],
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index e8ad5af..0eeecf8 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -753,8 +753,7 @@
 }
 
 // Verify that setting up a timer before monotonic_clock::epoch() fails.
-// TODO(james): Re-enable when LogReader handles startup correctly.
-TEST_P(AbstractEventLoopDeathTest, DISABLED_NegativeTimeTimer) {
+TEST_P(AbstractEventLoopDeathTest, NegativeTimeTimer) {
   auto loop = Make();
   TimerHandler *time = loop->AddTimer([]() {});
   EXPECT_DEATH(time->Setup(monotonic_clock::epoch() - std::chrono::seconds(1)),
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index e7b9641..97c1e83 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -10,6 +10,7 @@
 
 EventScheduler::Token EventScheduler::Schedule(monotonic_clock::time_point time,
                                                Event *callback) {
+  CHECK_LE(monotonic_clock::epoch(), time);
   return events_list_.emplace(time, callback);
 }
 
@@ -35,6 +36,12 @@
 }
 
 aos::monotonic_clock::time_point EventScheduler::OldestEvent() {
+  // If we haven't started yet, schedule a special event for the epoch to allow
+  // ourselves to boot.
+  if (!called_started_) {
+    return aos::monotonic_clock::epoch();
+  }
+
   if (events_list_.empty()) {
     return monotonic_clock::max_time;
   }
@@ -42,14 +49,27 @@
   return events_list_.begin()->first;
 }
 
-void EventScheduler::Shutdown() { on_shutdown_(); }
+void EventScheduler::Shutdown() {
+  CHECK(!is_running_);
+  on_shutdown_();
+}
 
 void EventScheduler::Startup() {
   ++boot_count_;
-  RunOnStartup();
+  CHECK(!is_running_);
+  MaybeRunOnStartup();
+  CHECK(called_started_);
 }
 
 void EventScheduler::CallOldestEvent() {
+  if (!called_started_) {
+    // If we haven't started, start.
+    MaybeRunOnStartup();
+    MaybeRunOnRun();
+    CHECK(called_started_);
+    return;
+  }
+  CHECK(is_running_);
   CHECK_GT(events_list_.size(), 0u);
   auto iter = events_list_.begin();
   const logger::BootTimestamp t =
@@ -66,6 +86,7 @@
 }
 
 void EventScheduler::RunOnRun() {
+  CHECK(is_running_);
   while (!on_run_.empty()) {
     std::function<void()> fn = std::move(*on_run_.begin());
     on_run_.erase(on_run_.begin());
@@ -75,6 +96,7 @@
 
 void EventScheduler::RunOnStartup() noexcept {
   while (!on_startup_.empty()) {
+    CHECK(!is_running_);
     std::function<void()> fn = std::move(*on_startup_.begin());
     on_startup_.erase(on_startup_.begin());
     fn();
@@ -82,14 +104,39 @@
 }
 
 void EventScheduler::RunStarted() {
+  CHECK(!is_running_);
   if (started_) {
     started_();
   }
+  is_running_ = true;
 }
 
-void EventScheduler::RunStopped() {
-  if (stopped_) {
-    stopped_();
+void EventScheduler::MaybeRunStopped() {
+  CHECK(is_running_);
+  is_running_ = false;
+  if (called_started_) {
+    called_started_ = false;
+    if (stopped_) {
+      stopped_();
+    }
+  }
+}
+
+void EventScheduler::MaybeRunOnStartup() {
+  CHECK(!called_started_);
+  CHECK(!is_running_);
+  const logger::BootTimestamp t =
+      FromDistributedClock(scheduler_scheduler_->distributed_now());
+  if (t.boot == boot_count_ && t.time >= monotonic_clock::epoch()) {
+    called_started_ = true;
+    RunOnStartup();
+  }
+}
+
+void EventScheduler::MaybeRunOnRun() {
+  if (called_started_) {
+    RunStarted();
+    RunOnRun();
   }
 }
 
@@ -110,6 +157,88 @@
   scheduler->scheduler_scheduler_ = this;
 }
 
+void EventSchedulerScheduler::MaybeRunStopped() {
+  CHECK(!is_running_);
+  for (EventScheduler *scheduler : schedulers_) {
+    if (scheduler->is_running()) {
+      scheduler->MaybeRunStopped();
+    }
+  }
+}
+
+bool EventSchedulerScheduler::RunUntil(
+    realtime_clock::time_point end_time, EventScheduler *scheduler,
+    std::function<std::chrono::nanoseconds()> fn_realtime_offset) {
+  logging::ScopedLogRestorer prev_logger;
+  MaybeRunOnStartup();
+
+  bool reached_end_time = false;
+
+  RunMaybeRealtimeLoop([this, scheduler, end_time, fn_realtime_offset,
+                        &reached_end_time]() {
+    std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
+        OldestEvent();
+    aos::distributed_clock::time_point oldest_event_time_distributed =
+        std::get<0>(oldest_event);
+    logger::BootTimestamp test_time_monotonic =
+        scheduler->FromDistributedClock(oldest_event_time_distributed);
+    realtime_clock::time_point oldest_event_realtime(
+        test_time_monotonic.time_since_epoch() + fn_realtime_offset());
+
+    if ((std::get<0>(oldest_event) == distributed_clock::max_time) ||
+        (oldest_event_realtime > end_time &&
+         (reboots_.empty() ||
+          std::get<0>(reboots_.front()) > oldest_event_time_distributed))) {
+      is_running_ = false;
+      reached_end_time = true;
+
+      // We have to nudge our time back to the distributed time
+      // corresponding to our desired realtime time.
+      const aos::monotonic_clock::time_point end_monotonic =
+          aos::monotonic_clock::epoch() + end_time.time_since_epoch() -
+          fn_realtime_offset();
+      const aos::distributed_clock::time_point end_time_distributed =
+          scheduler->ToDistributedClock(end_monotonic);
+
+      now_ = end_time_distributed;
+
+      return;
+    }
+
+    if (!reboots_.empty() &&
+        std::get<0>(reboots_.front()) <= std::get<0>(oldest_event)) {
+      // Reboot is next.
+      CHECK_LE(now_,
+               std::get<0>(reboots_.front()) + std::chrono::nanoseconds(1))
+          << ": Simulated time went backwards by too much.  Please "
+             "investigate.";
+      now_ = std::get<0>(reboots_.front());
+      Reboot();
+      reboots_.erase(reboots_.begin());
+      return;
+    }
+
+    // We get to pick our tradeoffs here.  Either we assume that there are
+    // no backward step changes in our time function for each node, or we
+    // have to let time go backwards.  We currently only really see this
+    // happen when 2 events are scheduled for "now", time changes, and
+    // there is a nanosecond or two of rounding due to integer math.
+    //
+    // //aos/events/logging:logger_test triggers this.
+    CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
+        << ": Simulated time went backwards by too much.  Please "
+           "investigate.";
+
+    now_ = std::get<0>(oldest_event);
+
+    std::get<1>(oldest_event)->CallOldestEvent();
+  });
+
+  MaybeRunStopped();
+
+  return reached_end_time;
+}
+
 void EventSchedulerScheduler::Reboot() {
   const std::vector<logger::BootTimestamp> &times =
       std::get<1>(reboots_.front());
@@ -131,7 +260,7 @@
       rebooted.emplace_back(node_index);
       CHECK_EQ(schedulers_[node_index]->boot_count() + 1,
                times[node_index].boot);
-      schedulers_[node_index]->RunStopped();
+      schedulers_[node_index]->MaybeRunStopped();
       schedulers_[node_index]->Shutdown();
     }
   }
@@ -140,16 +269,10 @@
   // (especially message_bridge), it could try to send stuff out.  We want
   // to move everything over to the new boot before doing that.
   for (const size_t node_index : rebooted) {
-    CHECK_EQ(schedulers_[node_index]->boot_count() + 1, times[node_index].boot);
     schedulers_[node_index]->Startup();
   }
-
   for (const size_t node_index : rebooted) {
-    schedulers_[node_index]->RunStarted();
-  }
-
-  for (const size_t node_index : rebooted) {
-    schedulers_[node_index]->RunOnRun();
+    schedulers_[node_index]->MaybeRunOnRun();
   }
   is_running_ = true;
 }
@@ -157,8 +280,7 @@
 void EventSchedulerScheduler::RunFor(distributed_clock::duration duration) {
   distributed_clock::time_point end_time = now_ + duration;
   logging::ScopedLogRestorer prev_logger;
-  RunOnStartup();
-  RunOnRun();
+  MaybeRunOnStartup();
 
   // Run all the sub-event-schedulers.
   RunMaybeRealtimeLoop([this, end_time]() {
@@ -199,6 +321,7 @@
     // //aos/events/logging:logger_test triggers this.
     CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
         << ": Simulated time went backwards by too much.  Please investigate.";
+    // push time forwards
     now_ = std::get<0>(oldest_event);
 
     std::get<1>(oldest_event)->CallOldestEvent();
@@ -206,15 +329,15 @@
 
   now_ = end_time;
 
-  RunStopped();
+  MaybeRunStopped();
 }
 
 void EventSchedulerScheduler::Run() {
   logging::ScopedLogRestorer prev_logger;
-  RunOnStartup();
-  RunOnRun();
+  MaybeRunOnStartup();
+
+  // Run all the sub-event-schedulers.
   RunMaybeRealtimeLoop([this]() {
-    // Run all the sub-event-schedulers.
     std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
         OldestEvent();
     if (!reboots_.empty() &&
@@ -249,7 +372,7 @@
     std::get<1>(oldest_event)->CallOldestEvent();
   });
 
-  RunStopped();
+  MaybeRunStopped();
 }
 
 template <typename F>
@@ -329,12 +452,23 @@
   const bool was_running = is_running_;
   if (is_running_) {
     is_running_ = false;
-    RunStopped();
+    MaybeRunStopped();
   }
   fn();
   if (was_running) {
-    RunOnStartup();
-    RunOnRun();
+    MaybeRunOnStartup();
+  }
+}
+
+void EventSchedulerScheduler::MaybeRunOnStartup() {
+  is_running_ = true;
+  for (EventScheduler *scheduler : schedulers_) {
+    scheduler->MaybeRunOnStartup();
+  }
+  // We must trigger all the OnRun's *after* all the OnStartup callbacks are
+  // triggered because that is the contract that we have stated.
+  for (EventScheduler *scheduler : schedulers_) {
+    scheduler->MaybeRunOnRun();
   }
 }
 
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index b14d0f8..237a240 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -109,47 +109,50 @@
   // Returns an iterator to the event
   Token Schedule(monotonic_clock::time_point time, Event *callback);
 
-  // Schedules a callback when the event scheduler starts.
+  // Schedules a callback whenever the event scheduler starts, after we have
+  // entered the running state. Callbacks are cleared after being called once.
+  // Will not get called until a node starts (a node does not start until its
+  // monotonic clock has reached at least monotonic_clock::epoch()).
   void ScheduleOnRun(std::function<void()> callback) {
     on_run_.emplace_back(std::move(callback));
   }
 
-  // Schedules a callback when the event scheduler starts.
+  // Schedules a callback whenever the event scheduler starts, before we have
+  // entered the running state. Callbacks are cleared after being called once.
+  // Will not get called until a node starts (a node does not start until its
+  // monotonic clock has reached at least monotonic_clock::epoch()).
   void ScheduleOnStartup(std::function<void()> callback) {
     on_startup_.emplace_back(std::move(callback));
   }
 
+  // Schedules a callback for whenever a node reboots, after we have exited the
+  // running state. Does not get called when the event scheduler stops (unless
+  // it is stopping to execute the reboot).
   void set_on_shutdown(std::function<void()> callback) {
     on_shutdown_ = std::move(callback);
   }
 
+  // Identical to ScheduleOnStartup, except that only one callback may get set
+  // and it will not be cleared after being called.
   void set_started(std::function<void()> callback) {
     started_ = std::move(callback);
   }
 
+  // Schedules a callback for whenever the scheduler exits the running state
+  // (running will be false during the callback). This includes both node
+  // reboots and the end of regular execution. Will not be called if the node
+  // never started.
   void set_stopped(std::function<void()> callback) {
     stopped_ = std::move(callback);
   }
 
-  std::function<void()> started_;
-  std::function<void()> stopped_;
-  std::function<void()> on_shutdown_;
-
   Token InvalidToken() { return events_list_.end(); }
 
   // Deschedule an event by its iterator
   void Deschedule(Token token);
 
-  // Runs the OnRun callbacks.
-  void RunOnRun();
-
-  // Runs the OnStartup callbacks.
-  void RunOnStartup() noexcept;
-
   // Runs the Started callback.
-  void RunStarted();
-  // Runs the Started callback.
-  void RunStopped();
+  void MaybeRunStopped();
 
   // Returns true if events are being handled.
   inline bool is_running() const;
@@ -186,12 +189,24 @@
 
   size_t node_index() const { return node_index_; }
 
+ private:
+  friend class EventSchedulerScheduler;
+
+  // Runs the OnRun callbacks.
+  void RunOnRun();
+
+  // Runs the OnStartup callbacks.
+  void RunOnStartup() noexcept;
+
+  // Runs the Started callback.
+  void RunStarted();
+
   // For implementing reboots.
   void Shutdown();
   void Startup();
 
- private:
-  friend class EventSchedulerScheduler;
+  void MaybeRunOnStartup();
+  void MaybeRunOnRun();
 
   // Current execution time.
   monotonic_clock::time_point monotonic_now_ = monotonic_clock::epoch();
@@ -214,6 +229,15 @@
   // distinguish which one.
   size_t node_index_ = 0;
 
+  // Whether this individual scheduler is currently running.
+  bool is_running_ = false;
+  // Whether we have called all the startup handlers during this boot.
+  bool called_started_ = false;
+
+  std::function<void()> started_;
+  std::function<void()> stopped_;
+  std::function<void()> on_shutdown_;
+
   // Converts time by doing nothing to it.
   class UnityConverter final : public TimeConverter {
    public:
@@ -271,8 +295,6 @@
   // Stops running.
   void Exit() { is_running_ = false; }
 
-  bool is_running() const { return is_running_; }
-
   // Runs for a duration on the distributed clock.  Time on the distributed
   // clock should be very representative of time on each node, but won't be
   // exactly the same.
@@ -285,26 +307,16 @@
   void SetReplayRate(double replay_rate) { replay_rate_ = replay_rate; }
   internal::EPoll *epoll() { return &epoll_; }
 
+  // Run until time.  fn_realtime_offset is a function that returns the
+  // realtime offset.
+  // Returns true if it ran until time (i.e., Exit() was not called before
+  // end_time).
+  bool RunUntil(realtime_clock::time_point end_time, EventScheduler *scheduler,
+                std::function<std::chrono::nanoseconds()> fn_realtime_offset);
+
   // Returns the current distributed time.
   distributed_clock::time_point distributed_now() const { return now_; }
 
-  void RunOnStartup() {
-    CHECK(!is_running_);
-    for (EventScheduler *scheduler : schedulers_) {
-      scheduler->RunOnStartup();
-    }
-    for (EventScheduler *scheduler : schedulers_) {
-      scheduler->RunStarted();
-    }
-  }
-
-  void RunStopped() {
-    CHECK(!is_running_);
-    for (EventScheduler *scheduler : schedulers_) {
-      scheduler->RunStopped();
-    }
-  }
-
   void SetTimeConverter(TimeConverter *time_converter) {
     time_converter->set_reboot_found(
         [this](distributed_clock::time_point reboot_time,
@@ -322,17 +334,11 @@
   void TemporarilyStopAndRun(std::function<void()> fn);
 
  private:
-  // Handles running the OnRun functions.
-  void RunOnRun() {
-    CHECK(!is_running_);
-    is_running_ = true;
-    for (EventScheduler *scheduler : schedulers_) {
-      scheduler->RunOnRun();
-    }
-  }
-
   void Reboot();
 
+  void MaybeRunStopped();
+  void MaybeRunOnStartup();
+
   // Returns the next event time and scheduler on which to run it.
   std::tuple<distributed_clock::time_point, EventScheduler *> OldestEvent();
 
@@ -370,9 +376,7 @@
   return t.time;
 }
 
-inline bool EventScheduler::is_running() const {
-  return scheduler_scheduler_->is_running();
-}
+inline bool EventScheduler::is_running() const { return is_running_; }
 
 }  // namespace aos
 
diff --git a/aos/events/event_scheduler_test.cc b/aos/events/event_scheduler_test.cc
index 54fb91a..c32399c 100644
--- a/aos/events/event_scheduler_test.cc
+++ b/aos/events/event_scheduler_test.cc
@@ -2,6 +2,7 @@
 
 #include <chrono>
 
+#include "aos/network/testing_time_converter.h"
 #include "gtest/gtest.h"
 
 namespace aos {
@@ -67,6 +68,16 @@
   std::vector<UUID> uuids_;
 };
 
+class FunctionEvent : public EventScheduler::Event {
+ public:
+  FunctionEvent(std::function<void()> fn) : fn_(fn) {}
+
+  void Handle() noexcept override { fn_(); }
+
+ private:
+  std::function<void()> fn_;
+};
+
 // Tests that the default parameters (slope of 1, offest of 0) behave as
 // an identity.
 TEST(EventSchedulerTest, IdentityTimeConversion) {
@@ -108,4 +119,408 @@
             distributed_clock::epoch() + chrono::seconds(1));
 }
 
+// Test that RunUntil() stops at the appointed time and returns correctly.
+TEST(EventSchedulerTest, RunUntil) {
+  int counter = 0;
+  EventSchedulerScheduler scheduler_scheduler;
+  EventScheduler scheduler(0);
+  scheduler_scheduler.AddEventScheduler(&scheduler);
+
+  FunctionEvent e([&counter]() { counter += 1; });
+  FunctionEvent quitter(
+      [&scheduler_scheduler]() { scheduler_scheduler.Exit(); });
+  scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
+  scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(3), &quitter);
+  scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(5), &e);
+  ASSERT_TRUE(scheduler_scheduler.RunUntil(
+      realtime_clock::epoch() + std::chrono::seconds(2), &scheduler,
+      []() { return std::chrono::nanoseconds{0}; }));
+  EXPECT_EQ(counter, 1);
+  ASSERT_FALSE(scheduler_scheduler.RunUntil(
+      realtime_clock::epoch() + std::chrono::seconds(4), &scheduler,
+      []() { return std::chrono::nanoseconds{0}; }));
+  EXPECT_EQ(counter, 1);
+  ASSERT_TRUE(scheduler_scheduler.RunUntil(
+      realtime_clock::epoch() + std::chrono::seconds(6), &scheduler,
+      []() { return std::chrono::nanoseconds{0}; }));
+  EXPECT_EQ(counter, 2);
+}
+
+enum class RunMode {
+  kRun,
+  kRunUntil,
+  kRunFor,
+};
+
+// Sets up a parameterized test case that will excercise all three of the Run(),
+// RunFor(), and RunUntil() methods of the EventSchedulerScheduler. This exposes
+// a ParamRunFor() to the test case that will nominally run for the specified
+// time (except for when in kRun mode, where it will just call Run()).
+class EventSchedulerParamTest : public testing::TestWithParam<RunMode> {
+ public:
+  EventSchedulerParamTest() {
+    schedulers_.reserve(kNumNodes);
+    for (size_t ii = 0; ii < kNumNodes; ++ii) {
+      schedulers_.emplace_back(ii);
+      schedulers_.back().SetTimeConverter(ii, &time_);
+      scheduler_scheduler_.AddEventScheduler(&schedulers_.back());
+    }
+    scheduler_scheduler_.SetTimeConverter(&time_);
+  }
+
+  void StartClocksAtEpoch() {
+    time_.AddMonotonic({BootTimestamp::epoch(), BootTimestamp::epoch()});
+  }
+
+ protected:
+  static constexpr size_t kNumNodes = 2;
+
+  void CheckSchedulersRunning(bool running) {
+    for (EventScheduler &scheduler : schedulers_) {
+      EXPECT_EQ(running, scheduler.is_running());
+    }
+  }
+
+  void ParamRunFor(std::chrono::nanoseconds t) {
+    switch (GetParam()) {
+      case RunMode::kRun:
+        scheduler_scheduler_.Run();
+        break;
+      case RunMode::kRunUntil:
+        scheduler_scheduler_.RunUntil(
+            realtime_clock::time_point(
+                schedulers_.at(0).monotonic_now().time_since_epoch() + t),
+            &schedulers_.at(0), []() { return std::chrono::nanoseconds(0); });
+        break;
+      case RunMode::kRunFor:
+        scheduler_scheduler_.RunFor(t);
+        break;
+    }
+  }
+
+  message_bridge::TestingTimeConverter time_{kNumNodes};
+  std::vector<EventScheduler> schedulers_;
+  EventSchedulerScheduler scheduler_scheduler_;
+};
+
+// Tests that we correctly handle exiting during startup.
+TEST_P(EventSchedulerParamTest, ExitOnStartup) {
+  StartClocksAtEpoch();
+  bool observed_handler = false;
+  schedulers_.at(0).ScheduleOnStartup([this, &observed_handler]() {
+    EXPECT_FALSE(schedulers_.at(0).is_running());
+    observed_handler = true;
+    scheduler_scheduler_.Exit();
+  });
+  ParamRunFor(std::chrono::seconds(1));
+  EXPECT_TRUE(observed_handler);
+}
+
+// Test that creating an event and running the scheduler runs the event.
+TEST_P(EventSchedulerParamTest, ScheduleEvent) {
+  StartClocksAtEpoch();
+  int counter = 0;
+
+  FunctionEvent e([&counter]() { counter += 1; });
+  schedulers_.at(0).Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
+  ParamRunFor(std::chrono::seconds(1));
+  EXPECT_EQ(counter, 1);
+  auto token = schedulers_.at(0).Schedule(
+      monotonic_clock::epoch() + chrono::seconds(2), &e);
+  schedulers_.at(0).Deschedule(token);
+  ParamRunFor(std::chrono::seconds(2));
+  EXPECT_EQ(counter, 1);
+}
+
+// Tests that a node that would have a negative monotonic time at boot does not
+// get started until later.
+TEST_P(EventSchedulerParamTest, NodeWaitsTillEpochToBoot) {
+  time_.AddNextTimestamp(
+      distributed_clock::epoch(),
+      {BootTimestamp{0, monotonic_clock::epoch()},
+       BootTimestamp{0, monotonic_clock::epoch() - chrono::seconds(1)}});
+  bool observed_startup_0 = false;
+  bool observed_startup_1 = false;
+  bool observed_on_run_1 = false;
+  schedulers_.at(0).ScheduleOnStartup([this, &observed_startup_0]() {
+    observed_startup_0 = true;
+    EXPECT_FALSE(schedulers_.at(0).is_running());
+    EXPECT_FALSE(schedulers_.at(1).is_running());
+    EXPECT_EQ(distributed_clock::epoch(),
+              scheduler_scheduler_.distributed_now());
+    EXPECT_EQ(monotonic_clock::epoch(), schedulers_.at(0).monotonic_now());
+    EXPECT_EQ(monotonic_clock::epoch() - chrono::seconds(1),
+              schedulers_.at(1).monotonic_now());
+  });
+  schedulers_.at(1).ScheduleOnStartup([this, &observed_startup_1]() {
+    observed_startup_1 = true;
+    // Note that we do not *stop* execution on node zero just to get 1 started.
+    EXPECT_TRUE(schedulers_.at(0).is_running());
+    EXPECT_FALSE(schedulers_.at(1).is_running());
+    EXPECT_EQ(distributed_clock::epoch() + chrono::seconds(1),
+              scheduler_scheduler_.distributed_now());
+    EXPECT_EQ(monotonic_clock::epoch() + chrono::seconds(1),
+              schedulers_.at(0).monotonic_now());
+    EXPECT_EQ(monotonic_clock::epoch(), schedulers_.at(1).monotonic_now());
+  });
+  schedulers_.at(1).ScheduleOnRun([this, &observed_on_run_1]() {
+    observed_on_run_1 = true;
+    // Note that we do not *stop* execution on node zero just to get 1 started.
+    EXPECT_TRUE(schedulers_.at(0).is_running());
+    EXPECT_TRUE(schedulers_.at(1).is_running());
+    EXPECT_EQ(distributed_clock::epoch() + chrono::seconds(1),
+              scheduler_scheduler_.distributed_now());
+    EXPECT_EQ(monotonic_clock::epoch() + chrono::seconds(1),
+              schedulers_.at(0).monotonic_now());
+    EXPECT_EQ(monotonic_clock::epoch(), schedulers_.at(1).monotonic_now());
+  });
+
+  FunctionEvent e([]() {});
+  schedulers_.at(0).Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
+  ParamRunFor(chrono::seconds(1));
+  EXPECT_TRUE(observed_startup_0);
+  EXPECT_TRUE(observed_startup_1);
+  EXPECT_TRUE(observed_on_run_1);
+}
+
+// Tests that a node that never boots does not get any of its handlers run.
+TEST_P(EventSchedulerParamTest, NodeNeverBootsIfAlwaysNegative) {
+  time_.AddNextTimestamp(
+      distributed_clock::epoch(),
+      {BootTimestamp{0, monotonic_clock::epoch()},
+       BootTimestamp{0, monotonic_clock::epoch() - chrono::seconds(10)}});
+  bool observed_startup_0 = false;
+  schedulers_.at(0).ScheduleOnStartup([this, &observed_startup_0]() {
+    observed_startup_0 = true;
+    EXPECT_FALSE(schedulers_.at(0).is_running());
+    EXPECT_FALSE(schedulers_.at(1).is_running());
+    EXPECT_EQ(distributed_clock::epoch(),
+              scheduler_scheduler_.distributed_now());
+    EXPECT_EQ(monotonic_clock::epoch(), schedulers_.at(0).monotonic_now());
+    EXPECT_EQ(monotonic_clock::epoch() - chrono::seconds(10),
+              schedulers_.at(1).monotonic_now());
+  });
+  schedulers_.at(1).ScheduleOnStartup(
+      []() { FAIL() << "Should never have hit startup handlers for node 1."; });
+  schedulers_.at(1).ScheduleOnRun(
+      []() { FAIL() << "Should never have hit OnRun handlers for node 1."; });
+  schedulers_.at(1).set_stopped(
+      []() { FAIL() << "Should never have hit stopped handlers for node 1."; });
+
+  FunctionEvent e([this]() { scheduler_scheduler_.Exit(); });
+  schedulers_.at(0).Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
+  ParamRunFor(chrono::seconds(1));
+  EXPECT_TRUE(observed_startup_0);
+}
+
+// Checks for regressions in how the startup/shutdown handlers behave.
+TEST_P(EventSchedulerParamTest, StartupShutdownHandlers) {
+  StartClocksAtEpoch();
+  time_.AddNextTimestamp(
+      distributed_clock::epoch() + chrono::seconds(3),
+      {BootTimestamp{0, monotonic_clock::epoch() + chrono::seconds(3)},
+       BootTimestamp{0, monotonic_clock::epoch() + chrono::seconds(3)}});
+  time_.RebootAt(0, distributed_clock::epoch() + chrono::seconds(4));
+  // Expected behavior:
+  // If all handlers get called during a reboot, they should sequence as:
+  // * is_running_ = false
+  // * stopped()
+  // * on_shutdown()
+  // * on_startup()
+  // * started()
+  // * is_running_ = true
+  // * OnRun()
+  //
+  // on_shutdown handlers should not get called at end of execution (e.g., when
+  // TemporarilyStopAndRun is called)--only when a node reboots.
+  //
+  // startup and OnRun handlers get cleared after being called once; these are
+  // also the only handlers that can have more than one handler registered.
+  //
+  // Create counters for all the handlers on the 0 node. Create separate a/b
+  // counters for the handlers that can/should get cleared.
+  int shutdown_counter = 0;
+  int stopped_counter = 0;
+  int startup_counter_a = 0;
+  int startup_counter_b = 0;
+  int started_counter = 0;
+  int on_run_counter_a = 0;
+  int on_run_counter_b = 0;
+
+  schedulers_.at(1).set_on_shutdown([]() {
+    FAIL() << "Should never reach the node 1 shutdown handler, since it never "
+              "reboots.";
+  });
+
+  auto startup_handler_a = [this, &startup_counter_a]() {
+    EXPECT_FALSE(schedulers_.at(0).is_running());
+    ++startup_counter_a;
+  };
+
+  auto startup_handler_b = [this, &startup_counter_b]() {
+    EXPECT_FALSE(schedulers_.at(0).is_running());
+    ++startup_counter_b;
+  };
+
+  auto on_run_handler_a = [this, &on_run_counter_a]() {
+    EXPECT_TRUE(schedulers_.at(0).is_running());
+    ++on_run_counter_a;
+  };
+
+  auto on_run_handler_b = [this, &on_run_counter_b]() {
+    EXPECT_TRUE(schedulers_.at(0).is_running());
+    ++on_run_counter_b;
+  };
+
+  schedulers_.at(0).set_stopped([this, &stopped_counter]() {
+    EXPECT_FALSE(schedulers_.at(0).is_running());
+    ++stopped_counter;
+  });
+  schedulers_.at(0).set_on_shutdown(
+      [this, &shutdown_counter, startup_handler_a, on_run_handler_a]() {
+        EXPECT_FALSE(schedulers_.at(0).is_running());
+        schedulers_.at(0).ScheduleOnStartup(startup_handler_a);
+        schedulers_.at(0).ScheduleOnRun(on_run_handler_a);
+        ++shutdown_counter;
+      });
+  schedulers_.at(0).ScheduleOnStartup(startup_handler_a);
+  schedulers_.at(0).set_started([this, &started_counter]() {
+    EXPECT_FALSE(schedulers_.at(0).is_running());
+    ++started_counter;
+  });
+  schedulers_.at(0).ScheduleOnRun(on_run_handler_a);
+
+  FunctionEvent e([]() {});
+  schedulers_.at(0).Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
+  ParamRunFor(std::chrono::seconds(1));
+  EXPECT_EQ(shutdown_counter, 0);
+  EXPECT_EQ(stopped_counter, 1);
+  EXPECT_EQ(started_counter, 1);
+  EXPECT_EQ(startup_counter_a, 1);
+  EXPECT_EQ(on_run_counter_a, 1);
+  EXPECT_EQ(startup_counter_b, 0);
+  EXPECT_EQ(on_run_counter_b, 0);
+
+  // In the middle, execute a TemporarilyStopAndRun. Use it to re-register the
+  // startup handlers.
+  schedulers_.at(0).ScheduleOnStartup(startup_handler_b);
+  schedulers_.at(0).ScheduleOnRun(on_run_handler_b);
+  FunctionEvent stop_and_run([this, startup_handler_a, on_run_handler_a]() {
+    scheduler_scheduler_.TemporarilyStopAndRun(
+        [this, startup_handler_a, on_run_handler_a]() {
+          schedulers_.at(0).ScheduleOnStartup(startup_handler_a);
+          schedulers_.at(0).ScheduleOnRun(on_run_handler_a);
+        });
+  });
+  schedulers_.at(1).Schedule(monotonic_clock::epoch() + chrono::seconds(2),
+                             &stop_and_run);
+  ParamRunFor(std::chrono::seconds(1));
+  EXPECT_EQ(shutdown_counter, 0);
+  EXPECT_EQ(stopped_counter, 3);
+  EXPECT_EQ(started_counter, 3);
+  EXPECT_EQ(startup_counter_a, 2);
+  EXPECT_EQ(on_run_counter_a, 2);
+  EXPECT_EQ(startup_counter_b, 1);
+  EXPECT_EQ(on_run_counter_b, 1);
+
+  // Next, execute a reboot in the middle of running and confirm that things
+  // tally correctly. We do not re-register the startup/on_run handlers before
+  // starting here, but do in the shutdown handler, so should see the A handlers
+  // increment.
+  // We need to schedule at least one event so that the reboot is actually
+  // observable (otherwise Run() will just terminate immediately, since there
+  // are no scheduled events that could possibly observe the reboot anyways).
+  schedulers_.at(1).Schedule(monotonic_clock::epoch() + chrono::seconds(5), &e);
+  ParamRunFor(std::chrono::seconds(5));
+  EXPECT_EQ(shutdown_counter, 1);
+  EXPECT_EQ(stopped_counter, 5);
+  EXPECT_EQ(started_counter, 5);
+  EXPECT_EQ(startup_counter_a, 3);
+  EXPECT_EQ(on_run_counter_a, 3);
+  EXPECT_EQ(startup_counter_b, 1);
+  EXPECT_EQ(on_run_counter_b, 1);
+}
+
+// Test that descheduling an already scheduled event doesn't run the event.
+TEST_P(EventSchedulerParamTest, DescheduleEvent) {
+  StartClocksAtEpoch();
+  int counter = 0;
+  FunctionEvent e([&counter]() { counter += 1; });
+  auto token = schedulers_.at(0).Schedule(
+      monotonic_clock::epoch() + chrono::seconds(1), &e);
+  schedulers_.at(0).Deschedule(token);
+  ParamRunFor(std::chrono::seconds(2));
+  EXPECT_EQ(counter, 0);
+}
+
+// Test that TemporarilyStopAndRun respects and preserves running.
+TEST_P(EventSchedulerParamTest, TemporarilyStopAndRun) {
+  StartClocksAtEpoch();
+  int counter = 0;
+
+  scheduler_scheduler_.TemporarilyStopAndRun([this]() {
+    SCOPED_TRACE("StopAndRun while stopped.");
+    CheckSchedulersRunning(false);
+  });
+  {
+    SCOPED_TRACE("After StopAndRun while stopped.");
+    CheckSchedulersRunning(false);
+  }
+
+  FunctionEvent e([&]() {
+    counter += 1;
+    {
+      SCOPED_TRACE("Before StopAndRun while running.");
+      CheckSchedulersRunning(true);
+    }
+    scheduler_scheduler_.TemporarilyStopAndRun([&]() {
+      SCOPED_TRACE("StopAndRun while running.");
+      CheckSchedulersRunning(false);
+    });
+    {
+      SCOPED_TRACE("After StopAndRun while running.");
+      CheckSchedulersRunning(true);
+    }
+  });
+  schedulers_.at(0).Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
+  ParamRunFor(std::chrono::seconds(1));
+  EXPECT_EQ(counter, 1);
+}
+
+// Test that TemporarilyStopAndRun leaves stopped nodes stopped.
+TEST_P(EventSchedulerParamTest, TemporarilyStopAndRunStaggeredStart) {
+  time_.AddNextTimestamp(
+      distributed_clock::epoch(),
+      {BootTimestamp{0, monotonic_clock::epoch()},
+       BootTimestamp{0, monotonic_clock::epoch() - chrono::seconds(10)}});
+  int counter = 0;
+
+  schedulers_[1].ScheduleOnRun([]() { FAIL(); });
+  schedulers_[1].ScheduleOnStartup([]() { FAIL(); });
+  schedulers_[1].set_on_shutdown([]() { FAIL(); });
+  schedulers_[1].set_started([]() { FAIL(); });
+  schedulers_[1].set_stopped([]() { FAIL(); });
+
+  FunctionEvent e([this, &counter]() {
+    counter += 1;
+    EXPECT_TRUE(schedulers_[0].is_running());
+    EXPECT_FALSE(schedulers_[1].is_running());
+    scheduler_scheduler_.TemporarilyStopAndRun([&]() {
+      SCOPED_TRACE("StopAndRun while running.");
+      CheckSchedulersRunning(false);
+    });
+    EXPECT_TRUE(schedulers_[0].is_running());
+    EXPECT_FALSE(schedulers_[1].is_running());
+  });
+  FunctionEvent exiter([this]() { scheduler_scheduler_.Exit(); });
+  schedulers_.at(0).Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
+  schedulers_.at(0).Schedule(monotonic_clock::epoch() + chrono::seconds(2),
+                             &exiter);
+  ParamRunFor(std::chrono::seconds(1));
+  EXPECT_EQ(counter, 1);
+}
+
+INSTANTIATE_TEST_SUITE_P(EventSchedulerParamTest, EventSchedulerParamTest,
+                         testing::Values(RunMode::kRun, RunMode::kRunFor));
+
 }  // namespace aos
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 35474c7..bf643e0 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -4533,6 +4533,73 @@
   ConfirmReadable(filenames);
 }
 
+// Tests that we can replay a logfile that has timestamps such that at least one
+// node's epoch is at a positive distributed_clock (and thus will have to be
+// booted after the other node(s)).
+TEST_P(MultinodeLoggerTest, StartOneNodeBeforeOther) {
+  std::vector<std::string> filenames;
+
+  CHECK_EQ(pi1_index_, 0u);
+  CHECK_EQ(pi2_index_, 1u);
+
+  time_converter_.AddNextTimestamp(
+      distributed_clock::epoch(),
+      {BootTimestamp::epoch(), BootTimestamp::epoch()});
+
+  const chrono::nanoseconds before_reboot_duration = chrono::milliseconds(1000);
+  time_converter_.RebootAt(
+      0, distributed_clock::time_point(before_reboot_duration));
+
+  const chrono::nanoseconds test_duration = time_converter_.AddMonotonic(
+      {chrono::milliseconds(10000), chrono::milliseconds(10000)});
+
+  const std::string kLogfile =
+      aos::testing::TestTmpDir() + "/multi_logfile2.1/";
+  util::UnlinkRecursive(kLogfile);
+
+  pi2_->Disconnect(pi1_->node());
+  pi1_->Disconnect(pi2_->node());
+
+  {
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    pi2_logger.StartLogger(kLogfile);
+    event_loop_factory_.RunFor(before_reboot_duration);
+
+    pi2_->Connect(pi1_->node());
+    pi1_->Connect(pi2_->node());
+
+    event_loop_factory_.RunFor(test_duration);
+
+    pi2_logger.AppendAllFilenames(&filenames);
+  }
+
+  const std::vector<LogFile> sorted_parts = SortParts(filenames);
+  ConfirmReadable(filenames);
+
+  {
+    LogReader reader(sorted_parts);
+    SimulatedEventLoopFactory replay_factory(reader.configuration());
+    reader.RegisterWithoutStarting(&replay_factory);
+
+    NodeEventLoopFactory *const replay_node =
+        reader.event_loop_factory()->GetNodeEventLoopFactory("pi1");
+
+    std::unique_ptr<EventLoop> test_event_loop =
+        replay_node->MakeEventLoop("test_reader");
+    replay_node->OnStartup([replay_node]() {
+      // Check that we didn't boot until at least t=0.
+      CHECK_LE(monotonic_clock::epoch(), replay_node->monotonic_now());
+    });
+    test_event_loop->OnRun([&test_event_loop]() {
+      // Check that we didn't boot until at least t=0.
+      EXPECT_LE(monotonic_clock::epoch(), test_event_loop->monotonic_now());
+    });
+    reader.event_loop_factory()->Run();
+    reader.Deregister();
+  }
+}
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/multinode_pingpong_test_combined.json b/aos/events/multinode_pingpong_test_combined.json
index 2d58dd0..66ef8df 100644
--- a/aos/events/multinode_pingpong_test_combined.json
+++ b/aos/events/multinode_pingpong_test_combined.json
@@ -250,6 +250,20 @@
           "time_to_live": 5000000
         }
       ]
+    },
+    {
+      "name": "/reliable2",
+      "type": "aos.examples.Ping",
+      "source_node": "pi2",
+      "destination_nodes": [
+        {
+          "name": "pi1",
+          "priority": 1,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["pi2"],
+          "time_to_live": 0
+        }
+      ]
     }
   ],
   "maps": [
diff --git a/aos/events/multinode_pingpong_test_split.json b/aos/events/multinode_pingpong_test_split.json
index b160c42..049c407 100644
--- a/aos/events/multinode_pingpong_test_split.json
+++ b/aos/events/multinode_pingpong_test_split.json
@@ -163,6 +163,12 @@
       "source_node": "pi1"
     },
     {
+      "name": "/pi2/aos/remote_timestamps/pi1/reliable2/aos-examples-Ping",
+      "type": "aos.message_bridge.RemoteMessage",
+      "logger": "NOT_LOGGED",
+      "source_node": "pi2"
+    },
+    {
       "name": "/pi1/aos",
       "type": "aos.timing.Report",
       "source_node": "pi1",
@@ -266,6 +272,20 @@
           "time_to_live": 5000000
         }
       ]
+    },
+    {
+      "name": "/reliable2",
+      "type": "aos.examples.Ping",
+      "source_node": "pi2",
+      "destination_nodes": [
+        {
+          "name": "pi1",
+          "priority": 1,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["pi2"],
+          "time_to_live": 0
+        }
+      ]
     }
   ],
   "maps": [
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index c78405d..0bdd711 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -1164,6 +1164,7 @@
 
 void SimulatedTimerHandler::Setup(monotonic_clock::time_point base,
                                   monotonic_clock::duration repeat_offset) {
+  CHECK_GE(base, monotonic_clock::epoch());
   // The allocations in here are due to infrastructure and don't count in the no
   // mallocs in RT code.
   ScopedNotRealtime nrt;
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index a46cc1c..2d59c5d 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -139,72 +139,6 @@
   aos::FlatbufferDetachedBuffer<aos::Configuration> config;
 };
 
-class FunctionEvent : public EventScheduler::Event {
- public:
-  FunctionEvent(std::function<void()> fn) : fn_(fn) {}
-
-  void Handle() noexcept override { fn_(); }
-
- private:
-  std::function<void()> fn_;
-};
-
-// Test that creating an event and running the scheduler runs the event.
-TEST(EventSchedulerTest, ScheduleEvent) {
-  int counter = 0;
-  EventSchedulerScheduler scheduler_scheduler;
-  EventScheduler scheduler(0);
-  scheduler_scheduler.AddEventScheduler(&scheduler);
-
-  FunctionEvent e([&counter]() { counter += 1; });
-  scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
-  scheduler_scheduler.Run();
-  EXPECT_EQ(counter, 1);
-  auto token =
-      scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(2), &e);
-  scheduler.Deschedule(token);
-  scheduler_scheduler.Run();
-  EXPECT_EQ(counter, 1);
-}
-
-// Test that descheduling an already scheduled event doesn't run the event.
-TEST(EventSchedulerTest, DescheduleEvent) {
-  int counter = 0;
-  EventSchedulerScheduler scheduler_scheduler;
-  EventScheduler scheduler(0);
-  scheduler_scheduler.AddEventScheduler(&scheduler);
-
-  FunctionEvent e([&counter]() { counter += 1; });
-  auto token =
-      scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
-  scheduler.Deschedule(token);
-  scheduler_scheduler.Run();
-  EXPECT_EQ(counter, 0);
-}
-
-// Test that TemporarilyStopAndRun respects and preserves running.
-TEST(EventSchedulerTest, TemporarilyStopAndRun) {
-  int counter = 0;
-  EventSchedulerScheduler scheduler_scheduler;
-  EventScheduler scheduler(0);
-  scheduler_scheduler.AddEventScheduler(&scheduler);
-
-  scheduler_scheduler.TemporarilyStopAndRun(
-      [&]() { CHECK(!scheduler_scheduler.is_running()); });
-  ASSERT_FALSE(scheduler_scheduler.is_running());
-
-  FunctionEvent e([&]() {
-    counter += 1;
-    CHECK(scheduler_scheduler.is_running());
-    scheduler_scheduler.TemporarilyStopAndRun(
-        [&]() { CHECK(!scheduler_scheduler.is_running()); });
-    CHECK(scheduler_scheduler.is_running());
-  });
-  scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
-  scheduler_scheduler.Run();
-  EXPECT_EQ(counter, 1);
-}
-
 // Test that sending a message after running gets properly notified.
 TEST(SimulatedEventLoopTest, SendAfterRunFor) {
   SimulatedEventLoopTestFactory factory;
@@ -1347,9 +1281,34 @@
       pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
                                     "/pi3/aos");
 
+  std::vector<std::unique_ptr<aos::EventLoop>> statistics_watcher_loops;
+  statistics_watcher_loops.emplace_back(pi1->MakeEventLoop("test"));
+  statistics_watcher_loops.emplace_back(pi2->MakeEventLoop("test"));
+  statistics_watcher_loops.emplace_back(pi3->MakeEventLoop("test"));
+  // The currenct contract is that, if all nodes boot simultaneously in
+  // simulation, that they should all act as if they area already connected,
+  // without ever observing the transition from disconnected to connected (note
+  // that on a real system the ServerStatistics message will get resent for each
+  // and every new connection, even if the new connections happen
+  // "simultaneously"--in simulation, we are essentially acting as if we are
+  // starting execution in an already running system, rather than observing the
+  // boot process).
+  for (auto &event_loop : statistics_watcher_loops) {
+    event_loop->MakeWatcher(
+        "/aos", [](const message_bridge::ServerStatistics &msg) {
+          for (const message_bridge::ServerConnection *connection :
+               *msg.connections()) {
+            EXPECT_EQ(message_bridge::State::CONNECTED, connection->state())
+                << connection->node()->name()->string_view();
+          }
+        });
+  }
+
   simulated_event_loop_factory.RunFor(chrono::seconds(2) +
                                       chrono::milliseconds(5));
 
+  statistics_watcher_loops.clear();
+
   EXPECT_EQ(pi1_pong_counter.count(), 201u);
   EXPECT_EQ(pi2_pong_counter.count(), 201u);
 
@@ -1642,8 +1601,13 @@
 
   std::unique_ptr<EventLoop> pi2_pong_event_loop =
       simulated_event_loop_factory.MakeEventLoop("pong", pi2);
+  aos::Sender<examples::Ping> pi2_reliable_sender =
+      pi2_pong_event_loop->MakeSender<examples::Ping>("/reliable2");
+  SendPing(&pi2_reliable_sender, 1);
   MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
                                                       "/reliable");
+  MessageCounter<examples::Ping> pi1_reliable_counter(ping_event_loop.get(),
+                                                      "/reliable2");
   MessageCounter<examples::Ping> pi2_unreliable_counter(
       pi2_pong_event_loop.get(), "/unreliable");
   aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
@@ -1699,6 +1663,7 @@
   SendPing(&pi1_unreliable_sender, 2);
   simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
   EXPECT_EQ(pi2_reliable_counter.count(), 2u);
+  EXPECT_EQ(pi1_reliable_counter.count(), 1u);
   EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
 
   EXPECT_EQ(reliable_timestamp_count, 2u);
@@ -2184,6 +2149,71 @@
   EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
 }
 
+TEST(SimulatedEventLoopTest, ReliableMessageSentOnStaggeredBoot) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+  message_bridge::TestingTimeConverter time(
+      configuration::NodesCount(&config.message()));
+  time.AddNextTimestamp(
+      distributed_clock::epoch(),
+      {BootTimestamp{0, monotonic_clock::epoch()},
+       BootTimestamp{0, monotonic_clock::epoch() - chrono::seconds(1)},
+       BootTimestamp{0, monotonic_clock::epoch()}});
+  SimulatedEventLoopFactory factory(&config.message());
+  factory.SetTimeConverter(&time);
+
+  NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+  NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+  const UUID pi1_boot_uuid = pi1->boot_uuid();
+  const UUID pi2_boot_uuid = pi2->boot_uuid();
+  EXPECT_NE(pi1_boot_uuid, UUID::Zero());
+  EXPECT_NE(pi2_boot_uuid, UUID::Zero());
+
+  {
+    ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
+    aos::Sender<examples::Ping> pi1_sender =
+        pi1_event_loop->MakeSender<examples::Ping>("/reliable");
+    SendPing(&pi1_sender, 1);
+  }
+  ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("ping");
+  aos::Sender<examples::Ping> pi2_sender =
+      pi2_event_loop->MakeSender<examples::Ping>("/reliable2");
+  SendPing(&pi2_sender, 1);
+  // Verify that we staggered the OnRun callback correctly.
+  pi2_event_loop->OnRun([pi1, pi2]() {
+    EXPECT_EQ(pi1->monotonic_now(),
+              monotonic_clock::epoch() + std::chrono::seconds(1));
+    EXPECT_EQ(pi2->monotonic_now(), monotonic_clock::epoch());
+  });
+
+  factory.RunFor(chrono::seconds(2));
+
+  {
+    ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
+    aos::Fetcher<examples::Ping> fetcher =
+        pi2_event_loop->MakeFetcher<examples::Ping>("/reliable");
+    ASSERT_TRUE(fetcher.Fetch());
+    EXPECT_EQ(fetcher.context().monotonic_event_time,
+              monotonic_clock::epoch() + factory.network_delay());
+    EXPECT_EQ(fetcher.context().monotonic_remote_time,
+              monotonic_clock::epoch());
+  }
+  {
+    ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("pong");
+    aos::Fetcher<examples::Ping> fetcher =
+        pi1_event_loop->MakeFetcher<examples::Ping>("/reliable2");
+    ASSERT_TRUE(fetcher.Fetch());
+    EXPECT_EQ(fetcher.context().monotonic_event_time,
+              monotonic_clock::epoch() + std::chrono::seconds(1) +
+                  factory.network_delay());
+    EXPECT_EQ(fetcher.context().monotonic_remote_time,
+              monotonic_clock::epoch() - std::chrono::seconds(1));
+  }
+}
+
 class SimulatedEventLoopDisconnectTest : public ::testing::Test {
  public:
   SimulatedEventLoopDisconnectTest()
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 108176e..31f3f0e 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -138,7 +138,6 @@
     if (fetcher_->context().data == nullptr || sent_) {
       return;
     }
-    CHECK(!timer_scheduled_);
 
     // Send at startup.  It is the best we can do.
     const monotonic_clock::time_point monotonic_delivered_time =
@@ -739,6 +738,25 @@
         destination_delayer->ScheduleReliable();
       }
     }
+    // Note: This exists to work around the fact that some users like to be able
+    // to send reliable messages while execution is stopped, creating a
+    // situation where the following sequencing can occur:
+    // 1) <While stopped> Send a reliable message on Node A (to be forwarded to
+    //    Node B).
+    // 2) Node B starts up.
+    // 3) Anywhere from 0 to N seconds later, Node A starts up.
+    //
+    // In this case, we need the reliable message to make it to Node B, but it
+    // also shouldn't make it to Node B until Node A has started up.
+    //
+    // Ideally, if the user were to wait for the Node B OnRun callbacks to send
+    // the message, then that would trigger the watchers in the delayers.
+    // However, we so far have continued to support Sending while stopped....
+    for (RawMessageDelayer *source_delayer : source_delayers_) {
+      if (source_delayer->time_to_live() == 0) {
+        source_delayer->ScheduleReliable();
+      }
+    }
   });
 }