Make LogReader::OnStart able to start applications at times

There are 2 main features missing from OnStart to make it truly useful
at scale.
1) We need to be able to start applications...
2) Only replaying a time range (typically on the RT clock) is very
   helpful.

Add support for 1.

2 is more tricky.  I considered putting the logic outside LogReader, but
in the end, the bookkeeping for starting and stopping is bad enough that
it really helps to have LogReader manage and synchronize it.  Maybe we
can refactor it back out later.

Change-Id: I4ddf6e18819d3aadd02f38776bbc7aef843a96b0
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 11368ae..e810e3f 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -407,7 +407,9 @@
   // Returns the name of the underlying queue.
   const Channel *channel() const { return sender_->channel(); }
 
+  // TODO(austin): Deprecate the operator bool.
   operator bool() const { return sender_ ? true : false; }
+  bool valid() const { return static_cast<bool>(sender_); }
 
   // Returns the time_points that the last message was sent at.
   aos::monotonic_clock::time_point monotonic_sent_time() const {
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index c06638c..97e0946 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -66,20 +66,32 @@
 }
 
 void EventScheduler::RunOnRun() {
-  for (std::function<void()> &on_run : on_run_) {
-    on_run();
+  while (!on_run_.empty()) {
+    std::function<void()> fn = std::move(*on_run_.begin());
+    on_run_.erase(on_run_.begin());
+    fn();
   }
-  on_run_.clear();
 }
 
-void EventScheduler::RunOnStartup() {
-  for (size_t i = 0; i < on_startup_.size(); ++i) {
-    on_startup_[i]();
+void EventScheduler::RunOnStartup() noexcept {
+  while (!on_startup_.empty()) {
+    std::function<void()> fn = std::move(*on_startup_.begin());
+    on_startup_.erase(on_startup_.begin());
+    fn();
   }
-  on_startup_.clear();
 }
 
-void EventScheduler::RunStarted() { started_(); }
+void EventScheduler::RunStarted() {
+  if (started_) {
+    started_();
+  }
+}
+
+void EventScheduler::RunStopped() {
+  if (stopped_) {
+    stopped_();
+  }
+}
 
 std::ostream &operator<<(std::ostream &stream,
                          const aos::distributed_clock::time_point &now) {
@@ -119,6 +131,7 @@
       rebooted.emplace_back(node_index);
       CHECK_EQ(schedulers_[node_index]->boot_count() + 1,
                times[node_index].boot);
+      schedulers_[node_index]->RunStopped();
       schedulers_[node_index]->Shutdown();
     }
   }
@@ -144,6 +157,7 @@
 void EventSchedulerScheduler::RunFor(distributed_clock::duration duration) {
   distributed_clock::time_point end_time = now_ + duration;
   logging::ScopedLogRestorer prev_logger;
+  RunOnStartup();
   RunOnRun();
 
   // Run all the sub-event-schedulers.
@@ -191,10 +205,13 @@
   }
 
   now_ = end_time;
+
+  RunStopped();
 }
 
 void EventSchedulerScheduler::Run() {
   logging::ScopedLogRestorer prev_logger;
+  RunOnStartup();
   RunOnRun();
   // Run all the sub-event-schedulers.
   while (is_running_) {
@@ -232,6 +249,8 @@
   }
 
   is_running_ = false;
+
+  RunStopped();
 }
 
 std::tuple<distributed_clock::time_point, EventScheduler *>
@@ -261,4 +280,17 @@
   return std::make_tuple(min_event_time, min_scheduler);
 }
 
+void EventSchedulerScheduler::TemporarilyStopAndRun(std::function<void()> fn) {
+  const bool was_running = is_running_;
+  if (is_running_) {
+    is_running_ = false;
+    RunStopped();
+  }
+  fn();
+  if (was_running) {
+    RunOnStartup();
+    RunOnRun();
+  }
+}
+
 }  // namespace aos
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index 55c1cf8..b64728c 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -126,7 +126,12 @@
     started_ = std::move(callback);
   }
 
+  void set_stopped(std::function<void()> callback) {
+    stopped_ = std::move(callback);
+  }
+
   std::function<void()> started_;
+  std::function<void()> stopped_;
   std::function<void()> on_shutdown_;
 
   Token InvalidToken() { return events_list_.end(); }
@@ -138,10 +143,12 @@
   void RunOnRun();
 
   // Runs the OnStartup callbacks.
-  void RunOnStartup();
+  void RunOnStartup() noexcept;
 
   // Runs the Started callback.
   void RunStarted();
+  // Runs the Started callback.
+  void RunStopped();
 
   // Returns true if events are being handled.
   inline bool is_running() const;
@@ -283,6 +290,13 @@
     }
   }
 
+  void RunStopped() {
+    CHECK(!is_running_);
+    for (EventScheduler *scheduler : schedulers_) {
+      scheduler->RunStopped();
+    }
+  }
+
   void SetTimeConverter(TimeConverter *time_converter) {
     time_converter->set_reboot_found(
         [this](distributed_clock::time_point reboot_time,
@@ -294,6 +308,11 @@
         });
   }
 
+  // Runs the provided callback now.  Stops everything, runs the callback, then
+  // starts it all up again.  This lets us do operations like starting and
+  // stopping applications while running.
+  void TemporarilyStopAndRun(std::function<void()> fn);
+
  private:
   // Handles running the OnRun functions.
   void RunOnRun() {
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index e58ccd2..4e89a0e 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -39,6 +39,13 @@
     time_estimation_buffer_seconds, 2.0,
     "The time to buffer ahead in the log file to accurately reconstruct time.");
 
+DEFINE_string(
+    start_time, "",
+    "If set, start at this point in time in the log on the realtime clock.");
+DEFINE_string(
+    end_time, "",
+    "If set, end at this point in time in the log on the realtime clock.");
+
 namespace aos {
 namespace configuration {
 // We don't really want to expose this publicly, but log reader doesn't really
@@ -131,6 +138,79 @@
 using message_bridge::RemoteMessage;
 }  // namespace
 
+// Class to manage triggering events on the RT clock while replaying logs. Since
+// the RT clock can only change when we get a message, we only need to update
+// our timers when new messages are read.
+class EventNotifier {
+ public:
+  EventNotifier(EventLoop *event_loop, std::function<void()> fn,
+                std::string_view name,
+                realtime_clock::time_point realtime_event_time)
+      : event_loop_(event_loop),
+        fn_(std::move(fn)),
+        realtime_event_time_(realtime_event_time) {
+    CHECK(event_loop_);
+    event_timer_ = event_loop->AddTimer([this]() { HandleTime(); });
+
+    if (event_loop_->node() != nullptr) {
+      event_timer_->set_name(
+          absl::StrCat(event_loop_->node()->name()->string_view(), "_", name));
+    } else {
+      event_timer_->set_name(name);
+    }
+  }
+
+  ~EventNotifier() { event_timer_->Disable(); }
+
+  // Returns the event trigger time.
+  realtime_clock::time_point realtime_event_time() const {
+    return realtime_event_time_;
+  }
+
+  // Observes the next message and potentially calls the callback or updates the
+  // timer.
+  void ObserveNextMessage(monotonic_clock::time_point monotonic_message_time,
+                          realtime_clock::time_point realtime_message_time) {
+    if (realtime_message_time < realtime_event_time_) {
+      return;
+    }
+    if (called_) {
+      return;
+    }
+
+    // Move the callback wakeup time to the correct time (or make it now if
+    // there's a gap in time) now that we know it is before the next
+    // message.
+    const monotonic_clock::time_point candidate_monotonic =
+        (realtime_event_time_ - realtime_message_time) + monotonic_message_time;
+    const monotonic_clock::time_point monotonic_now =
+        event_loop_->monotonic_now();
+    if (candidate_monotonic < monotonic_now) {
+      // Whops, time went backwards.  Just do it now.
+      HandleTime();
+    } else {
+      event_timer_->Setup(candidate_monotonic);
+    }
+  }
+
+ private:
+  void HandleTime() {
+    if (!called_) {
+      called_ = true;
+      fn_();
+    }
+  }
+
+  EventLoop *event_loop_ = nullptr;
+  TimerHandler *event_timer_ = nullptr;
+  std::function<void()> fn_;
+
+  const realtime_clock::time_point realtime_event_time_ =
+      realtime_clock::min_time;
+
+  bool called_ = false;
+};
+
 LogReader::LogReader(std::string_view filename,
                      const Configuration *replay_configuration)
     : LogReader(SortParts({std::string(filename)}), replay_configuration) {}
@@ -139,6 +219,9 @@
                      const Configuration *replay_configuration)
     : log_files_(std::move(log_files)),
       replay_configuration_(replay_configuration) {
+  SetStartTime(FLAGS_start_time);
+  SetEndTime(FLAGS_end_time);
+
   CHECK_GT(log_files_.size(), 0u);
   {
     // Validate that we have the same config everwhere.  This will be true if
@@ -329,8 +412,15 @@
 
   VLOG(1) << "Starting " << MaybeNodeName(node()) << "at time "
           << monotonic_start_time(boot_count());
-  for (size_t i = 0; i < on_starts_.size(); ++i) {
-    on_starts_[i]();
+  auto fn = [this]() {
+    for (size_t i = 0; i < on_starts_.size(); ++i) {
+      on_starts_[i]();
+    }
+  };
+  if (event_loop_factory_) {
+    event_loop_factory_->AllowApplicationCreationDuring(std::move(fn));
+  } else {
+    fn();
   }
   stopped_ = false;
   started_ = true;
@@ -358,12 +448,19 @@
 void LogReader::State::RunOnEnd() {
   VLOG(1) << "Ending " << MaybeNodeName(node()) << "at time "
           << monotonic_start_time(boot_count());
-  for (size_t i = 0; i < on_ends_.size(); ++i) {
-    on_ends_[i]();
+  auto fn = [this]() {
+    for (size_t i = 0; i < on_ends_.size(); ++i) {
+      on_ends_[i]();
+    }
+  };
+  if (event_loop_factory_) {
+    event_loop_factory_->AllowApplicationCreationDuring(std::move(fn));
+  } else {
+    fn();
   }
 
   stopped_ = true;
-  started_ = false;
+  started_ = true;
 }
 
 void LogReader::Register() {
@@ -397,7 +494,8 @@
         node);
     State *state = states_[node_index].get();
     state->SetNodeEventLoopFactory(
-        event_loop_factory_->GetNodeEventLoopFactory(node));
+        event_loop_factory_->GetNodeEventLoopFactory(node),
+        event_loop_factory_);
 
     state->SetChannelCount(logged_configuration()->channels()->size());
     timestamp_mappers.emplace_back(state->timestamp_mapper());
@@ -489,6 +587,11 @@
 
 void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
   RegisterWithoutStarting(event_loop_factory);
+  StartAfterRegister(event_loop_factory);
+}
+
+void LogReader::StartAfterRegister(
+    SimulatedEventLoopFactory *event_loop_factory) {
   // We want to start the log file at the last start time of the log files
   // from all the nodes.  Compute how long each node's simulation needs to run
   // to move time to this point.
@@ -588,6 +691,10 @@
   State *state =
       states_[configuration::GetNodeIndex(configuration(), node)].get();
 
+  if (!event_loop) {
+    state->ClearTimeFlags();
+  }
+
   state->set_event_loop(event_loop);
 
   // We don't run timing reports when trying to print out logged data, because
@@ -889,7 +996,7 @@
                 << "is on the next boot, " << next_time << " now is "
                 << state->monotonic_now();
         CHECK(event_loop_factory_);
-        state->RunOnEnd();
+        state->NotifyLogfileEnd();
         return;
       }
       VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
@@ -900,7 +1007,7 @@
     } else {
       VLOG(1) << MaybeNodeName(state->event_loop()->node())
               << "No next message, scheduling shutdown";
-      state->RunOnEnd();
+      state->NotifyLogfileEnd();
       // Set a timer up immediately after now to die. If we don't do this,
       // then the senders waiting on the message we just read will never get
       // called.
@@ -917,7 +1024,13 @@
 
   if (state->OldestMessageTime() != BootTimestamp::max_time()) {
     state->set_startup_timer(
-        event_loop->AddTimer([state]() { state->RunOnStart(); }));
+        event_loop->AddTimer([state]() { state->NotifyLogfileStart(); }));
+    if (start_time_ != realtime_clock::min_time) {
+      state->SetStartTimeFlag(start_time_);
+    }
+    if (end_time_ != realtime_clock::max_time) {
+      state->SetEndTimeFlag(end_time_);
+    }
     event_loop->OnRun([state]() {
       BootTimestamp next_time = state->OldestMessageTime();
       CHECK_EQ(next_time.boot, state->boot_count());
@@ -927,6 +1040,40 @@
   }
 }
 
+void LogReader::SetEndTime(std::string end_time) {
+  if (end_time.empty()) {
+    SetEndTime(realtime_clock::max_time);
+  } else {
+    std::optional<aos::realtime_clock::time_point> parsed_end_time =
+        aos::realtime_clock::FromString(end_time);
+    CHECK(parsed_end_time) << ": Failed to parse end time '" << end_time
+                           << "'.  Expected a date in the format of "
+                              "2021-01-15_15-30-35.000000000.";
+    SetEndTime(*parsed_end_time);
+  }
+}
+
+void LogReader::SetEndTime(realtime_clock::time_point end_time) {
+  end_time_ = end_time;
+}
+
+void LogReader::SetStartTime(std::string start_time) {
+  if (start_time.empty()) {
+    SetStartTime(realtime_clock::min_time);
+  } else {
+    std::optional<aos::realtime_clock::time_point> parsed_start_time =
+        aos::realtime_clock::FromString(start_time);
+    CHECK(parsed_start_time) << ": Failed to parse start time '" << start_time
+                             << "'.  Expected a date in the format of "
+                                "2021-01-15_15-30-35.000000000.";
+    SetStartTime(*parsed_start_time);
+  }
+}
+
+void LogReader::SetStartTime(realtime_clock::time_point start_time) {
+  start_time_ = start_time;
+}
+
 void LogReader::Deregister() {
   // Make sure that things get destroyed in the correct order, rather than
   // relying on getting the order correct in the class definition.
@@ -1312,8 +1459,10 @@
 }
 
 void LogReader::State::SetNodeEventLoopFactory(
-    NodeEventLoopFactory *node_event_loop_factory) {
+    NodeEventLoopFactory *node_event_loop_factory,
+    SimulatedEventLoopFactory *event_loop_factory) {
   node_event_loop_factory_ = node_event_loop_factory;
+  event_loop_factory_ = event_loop_factory;
 }
 
 void LogReader::State::SetChannelCount(size_t count) {
@@ -1670,7 +1819,7 @@
   return result;
 }
 
-BootTimestamp LogReader::State::OldestMessageTime() const {
+BootTimestamp LogReader::State::OldestMessageTime() {
   if (timestamp_mapper_ == nullptr) {
     return BootTimestamp::max_time();
   }
@@ -1680,6 +1829,12 @@
   }
   VLOG(2) << MaybeNodeName(node()) << "oldest message at "
           << result_ptr->monotonic_event_time.time;
+
+  if (result_ptr->monotonic_event_time.boot == boot_count()) {
+    ObserveNextMessage(result_ptr->monotonic_event_time.time,
+                       result_ptr->realtime_event_time);
+  }
+
   return result_ptr->monotonic_event_time;
 }
 
@@ -1692,11 +1847,12 @@
 
 void LogReader::State::Deregister() {
   if (started_ && !stopped_) {
-    RunOnEnd();
+    NotifyLogfileEnd();
   }
   for (size_t i = 0; i < channels_.size(); ++i) {
     channels_[i].reset();
   }
+  ClearTimeFlags();
   channel_timestamp_loggers_.clear();
   timestamp_loggers_.clear();
   event_loop_unique_ptr_.reset();
@@ -1705,5 +1861,75 @@
   node_event_loop_factory_ = nullptr;
 }
 
+void LogReader::State::SetStartTimeFlag(realtime_clock::time_point start_time) {
+  if (start_time != realtime_clock::min_time) {
+    start_event_notifier_ = std::make_unique<EventNotifier>(
+        event_loop_, [this]() { NotifyFlagStart(); }, "flag_start", start_time);
+  }
+}
+
+void LogReader::State::SetEndTimeFlag(realtime_clock::time_point end_time) {
+  if (end_time != realtime_clock::max_time) {
+    end_event_notifier_ = std::make_unique<EventNotifier>(
+        event_loop_, [this]() { NotifyFlagEnd(); }, "flag_end", end_time);
+  }
+}
+
+void LogReader::State::ObserveNextMessage(
+    monotonic_clock::time_point monotonic_event,
+    realtime_clock::time_point realtime_event) {
+  if (start_event_notifier_) {
+    start_event_notifier_->ObserveNextMessage(monotonic_event, realtime_event);
+  }
+  if (end_event_notifier_) {
+    end_event_notifier_->ObserveNextMessage(monotonic_event, realtime_event);
+  }
+}
+
+void LogReader::State::ClearTimeFlags() {
+  start_event_notifier_.reset();
+  end_event_notifier_.reset();
+}
+
+void LogReader::State::NotifyLogfileStart() {
+  if (start_event_notifier_) {
+    if (start_event_notifier_->realtime_event_time() >
+        realtime_start_time(boot_count())) {
+      VLOG(1) << "Skipping, " << start_event_notifier_->realtime_event_time()
+              << " > " << realtime_start_time(boot_count());
+      return;
+    }
+  }
+  if (found_last_message_) {
+    VLOG(1) << "Last message already found, bailing";
+    return;
+  }
+  RunOnStart();
+}
+
+void LogReader::State::NotifyFlagStart() {
+  if (start_event_notifier_->realtime_event_time() >=
+      realtime_start_time(boot_count())) {
+    RunOnStart();
+  }
+}
+
+void LogReader::State::NotifyLogfileEnd() {
+  if (found_last_message_) {
+    return;
+  }
+
+  if (!stopped_ && started_) {
+    RunOnEnd();
+  }
+}
+
+void LogReader::State::NotifyFlagEnd() {
+  if (!stopped_ && started_) {
+    RunOnEnd();
+    SetFoundLastMessage(true);
+  }
+}
+
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 3eebba7..9bfc90a 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -23,6 +23,8 @@
 namespace aos {
 namespace logger {
 
+class EventNotifier;
+
 // We end up with one of the following 3 log file types.
 //
 // Single node logged as the source node.
@@ -73,10 +75,20 @@
   // below, but can be anything as long as the locations needed to send
   // everything are available.
   void Register(SimulatedEventLoopFactory *event_loop_factory);
+
   // Registers all the callbacks to send the log file data out to an event loop
   // factory.  This does not start replaying or change the current distributed
   // time of the factory.  It does change the monotonic clocks to be right.
   void RegisterWithoutStarting(SimulatedEventLoopFactory *event_loop_factory);
+  // Runs the log until the last start time.  Register above is defined as:
+  // Register(...) {
+  //   RegisterWithoutStarting
+  //   StartAfterRegister
+  // }
+  // This should generally be considered as a stepping stone to convert from
+  // Register() to RegisterWithoutStarting() incrementally.
+  void StartAfterRegister(SimulatedEventLoopFactory *event_loop_factory);
+
   // Creates an SimulatedEventLoopFactory accessible via event_loop_factory(),
   // and then calls Register.
   void Register();
@@ -121,6 +133,14 @@
   realtime_clock::time_point realtime_start_time(
       const Node *node = nullptr) const;
 
+  // Sets the start and end times to replay data until for all nodes.  This
+  // overrides the --start_time and --end_time flags.  The default is to replay
+  // all data.
+  void SetStartTime(std::string start_time);
+  void SetStartTime(realtime_clock::time_point start_time);
+  void SetEndTime(std::string end_time);
+  void SetEndTime(realtime_clock::time_point end_time);
+
   // Causes the logger to publish the provided channel on a different name so
   // that replayed applications can publish on the proper channel name without
   // interference. This operates on raw channel names, without any node or
@@ -277,7 +297,7 @@
     TimestampedMessage PopOldest();
 
     // Returns the monotonic time of the oldest message.
-    BootTimestamp OldestMessageTime() const;
+    BootTimestamp OldestMessageTime();
 
     size_t boot_count() const {
       // If we are replaying directly into an event loop, we can't reboot.  So
@@ -296,7 +316,7 @@
       if (start_time == monotonic_clock::min_time) {
         LOG(ERROR)
             << "No start time, skipping, please figure out when this happens";
-        RunOnStart();
+        NotifyLogfileStart();
         return;
       }
       CHECK_GE(start_time, event_loop_->monotonic_now());
@@ -329,7 +349,9 @@
 
     // Sets the node event loop factory for replaying into a
     // SimulatedEventLoopFactory.  Returns the EventLoop to use.
-    void SetNodeEventLoopFactory(NodeEventLoopFactory *node_event_loop_factory);
+    void SetNodeEventLoopFactory(
+        NodeEventLoopFactory *node_event_loop_factory,
+        SimulatedEventLoopFactory *event_loop_factory);
 
     // Sets and gets the event loop to use.
     void set_event_loop(EventLoop *event_loop) { event_loop_ = event_loop; }
@@ -419,6 +441,18 @@
     void RunOnStart();
     void RunOnEnd();
 
+    // Handles a logfile start event to potentially call the OnStart callbacks.
+    void NotifyLogfileStart();
+    // Handles a start time flag start event to potentially call the OnStart
+    // callbacks.
+    void NotifyFlagStart();
+
+    // Handles a logfile end event to potentially call the OnEnd callbacks.
+    void NotifyLogfileEnd();
+    // Handles a end time flag start event to potentially call the OnEnd
+    // callbacks.
+    void NotifyFlagEnd();
+
     // Unregisters everything so we can destory the event loop.
     // TODO(austin): Is this needed?  OnShutdown should be able to serve this
     // need.
@@ -437,6 +471,18 @@
       }
     }
 
+    // Creates and registers the --start_time and --end_time event callbacks.
+    void SetStartTimeFlag(realtime_clock::time_point start_time);
+    void SetEndTimeFlag(realtime_clock::time_point end_time);
+
+    // Notices the next message to update the start/end time callbacks.
+    void ObserveNextMessage(monotonic_clock::time_point monotonic_event,
+                            realtime_clock::time_point realtime_event);
+
+    // Clears the start and end time flag handlers so we can delete the event
+    // loop.
+    void ClearTimeFlags();
+
     // Sets the next wakeup time on the replay callback.
     void Setup(monotonic_clock::time_point next_time) {
       timer_handler_->Setup(next_time);
@@ -516,6 +562,8 @@
 
     // Factory (if we are in sim) that this loop was created on.
     NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
+    SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
+
     std::unique_ptr<EventLoop> event_loop_unique_ptr_;
     // Event loop.
     const Node *node_ = nullptr;
@@ -524,6 +572,9 @@
     TimerHandler *timer_handler_ = nullptr;
     TimerHandler *startup_timer_ = nullptr;
 
+    std::unique_ptr<EventNotifier> start_event_notifier_;
+    std::unique_ptr<EventNotifier> end_event_notifier_;
+
     // Filters (or nullptr if it isn't a forwarded channel) for each channel.
     // This corresponds to the object which is shared among all the channels
     // going between 2 nodes.  The second element in the tuple indicates if this
@@ -598,6 +649,9 @@
 
   // Whether to exit the SimulatedEventLoop when we finish reading the logs.
   bool exit_on_finish_ = true;
+
+  realtime_clock::time_point start_time_ = realtime_clock::min_time;
+  realtime_clock::time_point end_time_ = realtime_clock::max_time;
 };
 
 }  // namespace logger
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 1732549..3b0b672 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -523,7 +523,12 @@
   }
 };
 
-void ConfirmReadable(const std::vector<std::string> &files) {
+std::vector<std::pair<std::vector<realtime_clock::time_point>,
+                      std::vector<realtime_clock::time_point>>>
+ConfirmReadable(
+    const std::vector<std::string> &files,
+    realtime_clock::time_point start_time = realtime_clock::min_time,
+    realtime_clock::time_point end_time = realtime_clock::max_time) {
   {
     LogReader reader(SortParts(files));
 
@@ -535,22 +540,65 @@
     reader.Deregister();
   }
   {
+    std::vector<std::pair<std::vector<realtime_clock::time_point>,
+                          std::vector<realtime_clock::time_point>>>
+        result;
     LogReader reader(SortParts(files));
 
+    reader.SetStartTime(start_time);
+    reader.SetEndTime(end_time);
+
     SimulatedEventLoopFactory log_reader_factory(reader.configuration());
     reader.RegisterWithoutStarting(&log_reader_factory);
+    result.resize(
+        configuration::NodesCount(log_reader_factory.configuration()));
     if (configuration::MultiNode(log_reader_factory.configuration())) {
+      size_t i = 0;
       for (const aos::Node *node :
            *log_reader_factory.configuration()->nodes()) {
-        reader.OnStart(node, [node]() {
+        LOG(INFO) << "Registering start";
+        reader.OnStart(node, [node, &log_reader_factory, &result,
+                              node_index = i]() {
           LOG(INFO) << "Starting " << node->name()->string_view();
+          result[node_index].first.push_back(
+              log_reader_factory.GetNodeEventLoopFactory(node)->realtime_now());
         });
+        reader.OnEnd(node, [node, &log_reader_factory, &result,
+                            node_index = i]() {
+          LOG(INFO) << "Ending " << node->name()->string_view();
+          result[node_index].second.push_back(
+              log_reader_factory.GetNodeEventLoopFactory(node)->realtime_now());
+        });
+        ++i;
       }
+    } else {
+      reader.OnStart([&log_reader_factory, &result]() {
+        LOG(INFO) << "Starting";
+        result[0].first.push_back(
+            log_reader_factory.GetNodeEventLoopFactory(nullptr)
+                ->realtime_now());
+      });
+      reader.OnEnd([&log_reader_factory, &result]() {
+        LOG(INFO) << "Ending";
+        result[0].second.push_back(
+            log_reader_factory.GetNodeEventLoopFactory(nullptr)
+                ->realtime_now());
+      });
     }
 
     log_reader_factory.Run();
 
     reader.Deregister();
+
+    for (auto x : result) {
+      for (auto y : x.first) {
+        VLOG(1) << "Start " << y;
+      }
+      for (auto y : x.second) {
+        VLOG(1) << "End " << y;
+      }
+    }
+    return result;
   }
 }
 
@@ -3589,7 +3637,27 @@
   // Confirm that we can parse the result.  LogReader has enough internal CHECKs
   // to confirm the right thing happened.
   const std::vector<LogFile> sorted_parts = SortParts(filenames);
-  ConfirmReadable(filenames);
+  auto result = ConfirmReadable(filenames);
+  EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
+                                                      chrono::seconds(1)));
+  EXPECT_THAT(result[0].second,
+              ::testing::ElementsAre(realtime_clock::epoch() +
+                                     chrono::microseconds(34990350)));
+
+  EXPECT_THAT(result[1].first,
+              ::testing::ElementsAre(
+                  realtime_clock::epoch() + chrono::seconds(1),
+                  realtime_clock::epoch() + chrono::microseconds(3323000)));
+  EXPECT_THAT(result[1].second,
+              ::testing::ElementsAre(
+                  realtime_clock::epoch() + chrono::microseconds(13990200),
+                  realtime_clock::epoch() + chrono::microseconds(16313200)));
+
+  EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch() +
+                                                      chrono::seconds(1)));
+  EXPECT_THAT(result[2].second,
+              ::testing::ElementsAre(realtime_clock::epoch() +
+                                     chrono::microseconds(34900150)));
 }
 
 // Tests that local data before remote data after reboot is properly replayed.
@@ -3730,9 +3798,201 @@
   // Confirm that we can parse the result.  LogReader has enough internal CHECKs
   // to confirm the right thing happened.
   const std::vector<LogFile> sorted_parts = SortParts(filenames);
-  ConfirmReadable(filenames);
+  auto result = ConfirmReadable(filenames);
+
+  EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
+  EXPECT_THAT(result[0].second,
+              ::testing::ElementsAre(realtime_clock::epoch() +
+                                     chrono::microseconds(11000350)));
+
+  EXPECT_THAT(result[1].first,
+              ::testing::ElementsAre(
+                  realtime_clock::epoch(),
+                  realtime_clock::epoch() + chrono::microseconds(107005000)));
+  EXPECT_THAT(result[1].second,
+              ::testing::ElementsAre(
+                  realtime_clock::epoch() + chrono::microseconds(4000150),
+                  realtime_clock::epoch() + chrono::microseconds(111000200)));
+
+  EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
+  EXPECT_THAT(result[2].second,
+              ::testing::ElementsAre(realtime_clock::epoch() +
+                                     chrono::microseconds(11000150)));
+
+  auto start_stop_result = ConfirmReadable(
+      filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
+      realtime_clock::epoch() + chrono::milliseconds(3000));
+
+  EXPECT_THAT(start_stop_result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
+                                                      chrono::seconds(2)));
+  EXPECT_THAT(start_stop_result[0].second, ::testing::ElementsAre(realtime_clock::epoch() +
+                                                       chrono::seconds(3)));
+  EXPECT_THAT(start_stop_result[1].first, ::testing::ElementsAre(realtime_clock::epoch() +
+                                                      chrono::seconds(2)));
+  EXPECT_THAT(start_stop_result[1].second, ::testing::ElementsAre(realtime_clock::epoch() +
+                                                       chrono::seconds(3)));
+  EXPECT_THAT(start_stop_result[2].first, ::testing::ElementsAre(realtime_clock::epoch() +
+                                                      chrono::seconds(2)));
+  EXPECT_THAT(start_stop_result[2].second, ::testing::ElementsAre(realtime_clock::epoch() +
+                                                       chrono::seconds(3)));
 }
 
+// Tests that setting the start and stop flags across a reboot works as
+// expected.
+TEST(MultinodeRebootLoggerTest, RebootStartStopTimes) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(ArtifactPath(
+          "aos/events/logging/multinode_pingpong_split3_config.json"));
+  message_bridge::TestingTimeConverter time_converter(
+      configuration::NodesCount(&config.message()));
+  SimulatedEventLoopFactory event_loop_factory(&config.message());
+  event_loop_factory.SetTimeConverter(&time_converter);
+  NodeEventLoopFactory *const pi1 =
+      event_loop_factory.GetNodeEventLoopFactory("pi1");
+  const size_t pi1_index = configuration::GetNodeIndex(
+      event_loop_factory.configuration(), pi1->node());
+  NodeEventLoopFactory *const pi2 =
+      event_loop_factory.GetNodeEventLoopFactory("pi2");
+  const size_t pi2_index = configuration::GetNodeIndex(
+      event_loop_factory.configuration(), pi2->node());
+  NodeEventLoopFactory *const pi3 =
+      event_loop_factory.GetNodeEventLoopFactory("pi3");
+  const size_t pi3_index = configuration::GetNodeIndex(
+      event_loop_factory.configuration(), pi3->node());
+
+  const std::string kLogfile1_1 =
+      aos::testing::TestTmpDir() + "/multi_logfile1/";
+  const std::string kLogfile2_1 =
+      aos::testing::TestTmpDir() + "/multi_logfile2.1/";
+  const std::string kLogfile2_2 =
+      aos::testing::TestTmpDir() + "/multi_logfile2.2/";
+  const std::string kLogfile3_1 =
+      aos::testing::TestTmpDir() + "/multi_logfile3/";
+  util::UnlinkRecursive(kLogfile1_1);
+  util::UnlinkRecursive(kLogfile2_1);
+  util::UnlinkRecursive(kLogfile2_2);
+  util::UnlinkRecursive(kLogfile3_1);
+  {
+    CHECK_EQ(pi1_index, 0u);
+    CHECK_EQ(pi2_index, 1u);
+    CHECK_EQ(pi3_index, 2u);
+
+    time_converter.AddNextTimestamp(
+        distributed_clock::epoch(),
+        {BootTimestamp::epoch(), BootTimestamp::epoch(),
+         BootTimestamp::epoch()});
+    const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
+    time_converter.AddNextTimestamp(
+        distributed_clock::epoch() + reboot_time,
+        {BootTimestamp::epoch() + reboot_time,
+         BootTimestamp{.boot = 1,
+                       .time = monotonic_clock::epoch() + reboot_time},
+         BootTimestamp::epoch() + reboot_time});
+  }
+
+  std::vector<std::string> filenames;
+  {
+    LoggerState pi1_logger = LoggerState::MakeLogger(
+        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+    LoggerState pi3_logger = LoggerState::MakeLogger(
+        pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+    {
+      // And now start the logger.
+      LoggerState pi2_logger = LoggerState::MakeLogger(
+          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+
+      pi1_logger.StartLogger(kLogfile1_1);
+      pi3_logger.StartLogger(kLogfile3_1);
+      pi2_logger.StartLogger(kLogfile2_1);
+
+      event_loop_factory.RunFor(chrono::milliseconds(1005));
+
+      // Now that we've got a start time in the past, turn on data.
+      std::unique_ptr<aos::EventLoop> ping_event_loop =
+          pi1->MakeEventLoop("ping");
+      Ping ping(ping_event_loop.get());
+
+      pi2->AlwaysStart<Pong>("pong");
+
+      event_loop_factory.RunFor(chrono::milliseconds(3000));
+
+      pi2_logger.AppendAllFilenames(&filenames);
+    }
+    event_loop_factory.RunFor(chrono::milliseconds(995));
+    // pi2 now reboots at 5 seconds.
+    {
+      event_loop_factory.RunFor(chrono::milliseconds(1000));
+
+      // Make local stuff happen before we start logging and connect the remote.
+      pi2->AlwaysStart<Pong>("pong");
+      std::unique_ptr<aos::EventLoop> ping_event_loop =
+          pi1->MakeEventLoop("ping");
+      Ping ping(ping_event_loop.get());
+      event_loop_factory.RunFor(chrono::milliseconds(5));
+
+      // Start logging again on pi2 after it is up.
+      LoggerState pi2_logger = LoggerState::MakeLogger(
+          pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+      pi2_logger.StartLogger(kLogfile2_2);
+
+      event_loop_factory.RunFor(chrono::milliseconds(5000));
+
+      pi2_logger.AppendAllFilenames(&filenames);
+    }
+
+    pi1_logger.AppendAllFilenames(&filenames);
+    pi3_logger.AppendAllFilenames(&filenames);
+  }
+
+  const std::vector<LogFile> sorted_parts = SortParts(filenames);
+  auto result = ConfirmReadable(filenames);
+
+  EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
+  EXPECT_THAT(result[0].second,
+              ::testing::ElementsAre(realtime_clock::epoch() +
+                                     chrono::microseconds(11000350)));
+
+  EXPECT_THAT(result[1].first,
+              ::testing::ElementsAre(
+                  realtime_clock::epoch(),
+                  realtime_clock::epoch() + chrono::microseconds(6005000)));
+  EXPECT_THAT(result[1].second,
+              ::testing::ElementsAre(
+                  realtime_clock::epoch() + chrono::microseconds(4900150),
+                  realtime_clock::epoch() + chrono::microseconds(11000200)));
+
+  EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
+  EXPECT_THAT(result[2].second,
+              ::testing::ElementsAre(realtime_clock::epoch() +
+                                     chrono::microseconds(11000150)));
+
+  // Confirm we observed the correct start and stop times.  We should see the
+  // reboot here.
+  auto start_stop_result = ConfirmReadable(
+      filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
+      realtime_clock::epoch() + chrono::milliseconds(8000));
+
+  EXPECT_THAT(
+      start_stop_result[0].first,
+      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
+  EXPECT_THAT(
+      start_stop_result[0].second,
+      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
+  EXPECT_THAT(start_stop_result[1].first,
+              ::testing::ElementsAre(
+                  realtime_clock::epoch() + chrono::seconds(2),
+                  realtime_clock::epoch() + chrono::microseconds(6005000)));
+  EXPECT_THAT(start_stop_result[1].second,
+              ::testing::ElementsAre(
+                  realtime_clock::epoch() + chrono::microseconds(4900150),
+                  realtime_clock::epoch() + chrono::seconds(8)));
+  EXPECT_THAT(
+      start_stop_result[2].first,
+      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
+  EXPECT_THAT(
+      start_stop_result[2].second,
+      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(8)));
+}
 
 }  // namespace testing
 }  // namespace logger
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index a2428a4..a1ef95d 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -1268,6 +1268,11 @@
       event_loop->SetIsRunning(true);
     }
   });
+  scheduler_.set_stopped([this]() {
+    for (SimulatedEventLoop *event_loop : event_loops_) {
+      event_loop->SetIsRunning(false);
+    }
+  });
   scheduler_.set_on_shutdown([this]() {
     VLOG(1) << scheduler_.distributed_now() << " " << NodeName(this->node())
             << monotonic_now() << " Shutting down node.";
@@ -1339,7 +1344,7 @@
 
 void NodeEventLoopFactory::Shutdown() {
   for (SimulatedEventLoop *event_loop : event_loops_) {
-    event_loop->SetIsRunning(false);
+    CHECK(!event_loop->is_running());
   }
 
   CHECK(started_);
@@ -1367,12 +1372,11 @@
 
 void SimulatedEventLoopFactory::RunFor(monotonic_clock::duration duration) {
   // This sets running to true too.
-  scheduler_scheduler_.RunOnStartup();
   scheduler_scheduler_.RunFor(duration);
   for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
     if (node) {
       for (SimulatedEventLoop *loop : node->event_loops_) {
-        loop->SetIsRunning(false);
+        CHECK(!loop->is_running());
       }
     }
   }
@@ -1380,12 +1384,11 @@
 
 void SimulatedEventLoopFactory::Run() {
   // This sets running to true too.
-  scheduler_scheduler_.RunOnStartup();
   scheduler_scheduler_.Run();
   for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
     if (node) {
       for (SimulatedEventLoop *loop : node->event_loops_) {
-        loop->SetIsRunning(false);
+        CHECK(!loop->is_running());
       }
     }
   }
@@ -1458,6 +1461,11 @@
   return std::move(result);
 }
 
+void SimulatedEventLoopFactory::AllowApplicationCreationDuring(
+    std::function<void()> fn) {
+  scheduler_scheduler_.TemporarilyStopAndRun(std::move(fn));
+}
+
 void NodeEventLoopFactory::Disconnect(const Node *other) {
   factory_->bridge_->Disconnect(node_, other);
 }
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index bd589d6..9a447e9 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -128,6 +128,11 @@
   // tests.
   void SkipTimingReport();
 
+  // Re-enables application creation for the duration of fn.  This is mostly to
+  // allow use cases like log reading to create applications after the node
+  // starts up without stopping execution.
+  void AllowApplicationCreationDuring(std::function<void()> fn);
+
  private:
   friend class NodeEventLoopFactory;
 
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index a94b895..09202ff 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -181,6 +181,29 @@
   EXPECT_EQ(counter, 0);
 }
 
+// Test that TemporarilyStopAndRun respects and preserves running.
+TEST(EventSchedulerTest, TemporarilyStopAndRun) {
+  int counter = 0;
+  EventSchedulerScheduler scheduler_scheduler;
+  EventScheduler scheduler(0);
+  scheduler_scheduler.AddEventScheduler(&scheduler);
+
+  scheduler_scheduler.TemporarilyStopAndRun(
+      [&]() { CHECK(!scheduler_scheduler.is_running()); });
+  ASSERT_FALSE(scheduler_scheduler.is_running());
+
+  FunctionEvent e([&]() {
+    counter += 1;
+    CHECK(scheduler_scheduler.is_running());
+    scheduler_scheduler.TemporarilyStopAndRun(
+        [&]() { CHECK(!scheduler_scheduler.is_running()); });
+    CHECK(scheduler_scheduler.is_running());
+  });
+  scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
+  scheduler_scheduler.Run();
+  EXPECT_EQ(counter, 1);
+}
+
 void SendTestMessage(aos::Sender<TestMessage> *sender, int value) {
   aos::Sender<TestMessage>::Builder builder = sender->MakeBuilder();
   TestMessage::Builder test_message_builder =