Support nodes rebooting

We have log files which span a reboot.  We want to be able to replay the
timeline across that reboot so we can run simulations and everything
else interesting.

This requires a bunch of stuff, unfortunately.

The most visible one is that we need to be able to "reboot" a node.
This means we need a way of starting it up and stopping it.  There are
now OnStartup and OnShutdown handlers in NodeEventLoopFactory to serve
this purpose, and better application context tracking to make it easier
to start and stop applications through a virtual starter.

This requires LogReader and the simulated network bridge to be
refactored to support nodes coming and going while the main application
continues to run.

From there, everything else is just a massive amount of plumbing of the
BootTimestamp through everything just short of the user.  Boot UUIDs
were put in TimeConverter so everything related to rebooting is all
nicely together.

Change-Id: I2cfb659c5764c1dd80dc66f33cfab3937159e324
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/configuration.cc b/aos/configuration.cc
index 24e15d9..2b9ac6d 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -1122,6 +1122,13 @@
   LOG(FATAL) << "Unknown logger config " << static_cast<int>(channel->logger());
 }
 
+size_t ConnectionCount(const Channel *channel) {
+  if (!channel->has_destination_nodes()) {
+    return 0;
+  }
+  return channel->destination_nodes()->size();
+}
+
 const Connection *ConnectionToNode(const Channel *channel, const Node *node) {
   if (!channel->has_destination_nodes()) {
     return nullptr;
diff --git a/aos/configuration.h b/aos/configuration.h
index 8dd4851..ac5c90c 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -134,6 +134,9 @@
 bool ChannelMessageIsLoggedOnNode(const Channel *channel,
                                   std::string_view node_name);
 
+// Returns the number of connections.
+size_t ConnectionCount(const Channel *channel);
+
 const Connection *ConnectionToNode(const Channel *channel, const Node *node);
 // Returns true if the delivery timestamps are supposed to be logged on this
 // node.
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 79d09e4..e27e775 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -375,6 +375,7 @@
         ":simple_channel",
         "//aos:init",
         "//aos:realtime",
+        "//aos/events/logging:boot_timestamp",
         "//aos/events/logging:logger_fbs",
         "//aos/ipc_lib:index",
         "//aos/network:message_bridge_client_status",
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index b0487d5..8728111 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -84,6 +84,13 @@
       configuration_(configuration) {}
 
 EventLoop::~EventLoop() {
+  if(!senders_.empty()) {
+    for (const RawSender *sender : senders_) {
+      LOG(ERROR) << "  Sender "
+                 << configuration::StrippedChannelToString(sender->channel())
+                 << " still open";
+    }
+  }
   CHECK_EQ(senders_.size(), 0u) << ": Not all senders destroyed";
   CHECK_EQ(events_.size(), 0u) << ": Not all events unregistered";
 }
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index 668f2b6..fdeaf1e 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -41,11 +41,23 @@
   return events_list_.begin()->first;
 }
 
+void EventScheduler::Shutdown() {
+  on_shutdown_();
+}
+
+void EventScheduler::Startup() {
+  ++boot_count_;
+  RunOnStartup();
+}
+
 void EventScheduler::CallOldestEvent() {
   CHECK_GT(events_list_.size(), 0u);
   auto iter = events_list_.begin();
-  CHECK_EQ(monotonic_now(), iter->first)
-      << ": Time is wrong on node " << node_index_;
+  const logger::BootTimestamp t =
+      FromDistributedClock(scheduler_scheduler_->distributed_now());
+  VLOG(1) << "Got time back " << t;
+  CHECK_EQ(t.boot, boot_count_);
+  CHECK_EQ(t.time, iter->first) << ": Time is wrong on node " << node_index_;
 
   ::std::function<void()> callback = ::std::move(iter->second);
   events_list_.erase(iter);
@@ -68,6 +80,8 @@
   on_startup_.clear();
 }
 
+void EventScheduler::RunStarted() { started_(); }
+
 std::ostream &operator<<(std::ostream &stream,
                          const aos::distributed_clock::time_point &now) {
   // Print it the same way we print a monotonic time.  Literally.
@@ -79,11 +93,55 @@
   CHECK(std::find(schedulers_.begin(), schedulers_.end(), scheduler) ==
         schedulers_.end());
   CHECK(scheduler->scheduler_scheduler_ == nullptr);
+  CHECK_EQ(scheduler->node_index(), schedulers_.size());
 
   schedulers_.emplace_back(scheduler);
   scheduler->scheduler_scheduler_ = this;
 }
 
+void EventSchedulerScheduler::Reboot() {
+  const std::vector<logger::BootTimestamp> &times =
+      std::get<1>(reboots_.front());
+  CHECK_EQ(times.size(), schedulers_.size());
+
+  VLOG(1) << "Rebooting at " << now_;
+  for (const auto &time : times) {
+    VLOG(1) << "  " << time;
+  }
+
+  is_running_ = false;
+
+  // Shut everything down.
+  std::vector<size_t> rebooted;
+  for (size_t node_index = 0; node_index < schedulers_.size(); ++node_index) {
+    if (schedulers_[node_index]->boot_count() == times[node_index].boot) {
+      continue;
+    } else {
+      rebooted.emplace_back(node_index);
+      CHECK_EQ(schedulers_[node_index]->boot_count() + 1,
+               times[node_index].boot);
+      schedulers_[node_index]->Shutdown();
+    }
+  }
+
+  // And start it back up again to reboot.  When something starts back up
+  // (especially message_bridge), it could try to send stuff out.  We want
+  // to move everything over to the new boot before doing that.
+  for (const size_t node_index : rebooted) {
+    CHECK_EQ(schedulers_[node_index]->boot_count() + 1, times[node_index].boot);
+    schedulers_[node_index]->Startup();
+  }
+
+  for (const size_t node_index : rebooted) {
+    schedulers_[node_index]->RunStarted();
+  }
+
+  for (const size_t node_index : rebooted) {
+    schedulers_[node_index]->RunOnRun();
+  }
+  is_running_ = true;
+}
+
 void EventSchedulerScheduler::RunFor(distributed_clock::duration duration) {
   distributed_clock::time_point end_time = now_ + duration;
   logging::ScopedLogRestorer prev_logger;
@@ -93,6 +151,25 @@
   while (is_running_) {
     std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
         OldestEvent();
+    if (!reboots_.empty() &&
+        std::get<0>(reboots_.front()) <= std::get<0>(oldest_event)) {
+      // Reboot is next.
+      if (std::get<0>(reboots_.front()) > end_time) {
+        // Reboot is after our end time, give up.
+        is_running_ = false;
+        break;
+      }
+
+      CHECK_LE(now_,
+               std::get<0>(reboots_.front()) + std::chrono::nanoseconds(1))
+          << ": Simulated time went backwards by too much.  Please "
+             "investigate.";
+      now_ = std::get<0>(reboots_.front());
+      Reboot();
+      reboots_.erase(reboots_.begin());
+      continue;
+    }
+
     // No events left, bail.
     if (std::get<0>(oldest_event) == distributed_clock::max_time ||
         std::get<0>(oldest_event) > end_time) {
@@ -124,6 +201,18 @@
   while (is_running_) {
     std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
         OldestEvent();
+    if (!reboots_.empty() &&
+        std::get<0>(reboots_.front()) <= std::get<0>(oldest_event)) {
+      // Reboot is next.
+      CHECK_LE(now_,
+               std::get<0>(reboots_.front()) + std::chrono::nanoseconds(1))
+          << ": Simulated time went backwards by too much.  Please "
+             "investigate.";
+      now_ = std::get<0>(reboots_.front());
+      Reboot();
+      reboots_.erase(reboots_.begin());
+      continue;
+    }
     // No events left, bail.
     if (std::get<0>(oldest_event) == distributed_clock::max_time) {
       break;
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index d6ab858..cc70757 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -9,6 +9,7 @@
 #include <vector>
 
 #include "aos/events/event_loop.h"
+#include "aos/events/logging/boot_timestamp.h"
 #include "aos/logging/implementations.h"
 #include "aos/time/time.h"
 #include "glog/logging.h"
@@ -49,18 +50,35 @@
  public:
   virtual ~TimeConverter() {}
 
+  // Returns the boot UUID for a node and boot.  Note: the boot UUID for
+  // subsequent calls needs to be the same each time.
+  virtual UUID boot_uuid(size_t node_index, size_t boot_count) = 0;
+
+  void set_reboot_found(
+      std::function<void(distributed_clock::time_point,
+                         const std::vector<logger::BootTimestamp> &)>
+          fn) {
+    reboot_found_ = fn;
+  }
+
   // Converts a time to the distributed clock for scheduling and cross-node
   // time measurement.
   virtual distributed_clock::time_point ToDistributedClock(
-      size_t node_index, monotonic_clock::time_point time) = 0;
+      size_t node_index, logger::BootTimestamp time) = 0;
 
   // Takes the distributed time and converts it to the monotonic clock for this
   // node.
-  virtual monotonic_clock::time_point FromDistributedClock(
-      size_t node_index, distributed_clock::time_point time) = 0;
+  virtual logger::BootTimestamp FromDistributedClock(
+      size_t node_index, distributed_clock::time_point time,
+      size_t boot_count) = 0;
 
   // Called whenever time passes this point and we can forget about it.
   virtual void ObserveTimePassed(distributed_clock::time_point time) = 0;
+
+ protected:
+  std::function<void(distributed_clock::time_point,
+                     const std::vector<logger::BootTimestamp> &)>
+      reboot_found_;
 };
 
 class EventSchedulerScheduler;
@@ -70,14 +88,19 @@
   using ChannelType =
       std::multimap<monotonic_clock::time_point, std::function<void()>>;
   using Token = ChannelType::iterator;
+  EventScheduler(size_t node_index) : node_index_(node_index) {}
 
   // Sets the time converter in use for this scheduler (and the corresponding
   // node index)
   void SetTimeConverter(size_t node_index, TimeConverter *converter) {
-    node_index_ = node_index;
+    CHECK_EQ(node_index_, node_index);
     converter_ = converter;
   }
 
+  UUID boot_uuid() {
+    return converter_->boot_uuid(node_index_, boot_count_);
+  }
+
   // Schedule an event with a callback function
   // Returns an iterator to the event
   Token Schedule(monotonic_clock::time_point time,
@@ -93,6 +116,17 @@
     on_startup_.emplace_back(std::move(callback));
   }
 
+  void set_on_shutdown(std::function<void()> callback) {
+    on_shutdown_ = std::move(callback);
+  }
+
+  void set_started(std::function<void()> callback) {
+    started_ = std::move(callback);
+  }
+
+  std::function<void()> started_;
+  std::function<void()> on_shutdown_;
+
   Token InvalidToken() { return events_list_.end(); }
 
   // Deschedule an event by its iterator
@@ -104,11 +138,14 @@
   // Runs the OnStartup callbacks.
   void RunOnStartup();
 
+  // Runs the Started callback.
+  void RunStarted();
+
   // Returns true if events are being handled.
   inline bool is_running() const;
 
   // Returns the timestamp of the next event to trigger.
-  aos::monotonic_clock::time_point OldestEvent();
+  monotonic_clock::time_point OldestEvent();
   // Handles the next event.
   void CallOldestEvent();
 
@@ -116,25 +153,41 @@
   // measurement.
   distributed_clock::time_point ToDistributedClock(
       monotonic_clock::time_point time) const {
-    return converter_->ToDistributedClock(node_index_, time);
+    return converter_->ToDistributedClock(node_index_,
+                                          {.boot = boot_count_, .time = time});
   }
 
   // Takes the distributed time and converts it to the monotonic clock for this
   // node.
-  monotonic_clock::time_point FromDistributedClock(
+  logger::BootTimestamp FromDistributedClock(
       distributed_clock::time_point time) const {
-    return converter_->FromDistributedClock(node_index_, time);
+    return converter_->FromDistributedClock(node_index_, time, boot_count_);
   }
 
   // Returns the current monotonic time on this node calculated from the
   // distributed clock.
   inline monotonic_clock::time_point monotonic_now() const;
 
+  // Returns the current monotonic time on this node calculated from the
+  // distributed clock.
+  inline distributed_clock::time_point distributed_now() const;
+
+  size_t boot_count() const { return boot_count_; }
+
+  size_t node_index() const { return node_index_; }
+
+  // For implementing reboots.
+  void Shutdown();
+  void Startup();
+
  private:
   friend class EventSchedulerScheduler;
+
   // Current execution time.
   monotonic_clock::time_point monotonic_now_ = monotonic_clock::epoch();
 
+  size_t boot_count_ = 0;
+
   // List of functions to run (once) when running.
   std::vector<std::function<void()>> on_run_;
   std::vector<std::function<void()>> on_startup_;
@@ -155,16 +208,29 @@
   class UnityConverter final : public TimeConverter {
    public:
     distributed_clock::time_point ToDistributedClock(
-        size_t /*node_index*/, monotonic_clock::time_point time) override {
-      return distributed_clock::epoch() + time.time_since_epoch();
+        size_t /*node_index*/, logger::BootTimestamp time) override {
+      CHECK_EQ(time.boot, 0u) << ": Reboots unsupported by default.";
+      return distributed_clock::epoch() + time.time.time_since_epoch();
     }
 
-    monotonic_clock::time_point FromDistributedClock(
-        size_t /*node_index*/, distributed_clock::time_point time) override {
-      return monotonic_clock::epoch() + time.time_since_epoch();
+    logger::BootTimestamp FromDistributedClock(
+        size_t /*node_index*/, distributed_clock::time_point time,
+        size_t boot_count) override {
+      CHECK_EQ(boot_count, 0u);
+      return logger::BootTimestamp{
+          .boot = boot_count,
+          .time = monotonic_clock::epoch() + time.time_since_epoch()};
     }
 
     void ObserveTimePassed(distributed_clock::time_point /*time*/) override {}
+
+    UUID boot_uuid(size_t /*node_index*/, size_t boot_count) override {
+      CHECK_EQ(boot_count, 0u);
+      return uuid_;
+    }
+
+   private:
+    const UUID uuid_ = UUID::Random();
   };
 
   UnityConverter unity_converter_;
@@ -210,6 +276,20 @@
     for (EventScheduler *scheduler : schedulers_) {
       scheduler->RunOnStartup();
     }
+    for (EventScheduler *scheduler : schedulers_) {
+      scheduler->RunStarted();
+    }
+  }
+
+  void SetTimeConverter(TimeConverter *time_converter) {
+    time_converter->set_reboot_found(
+        [this](distributed_clock::time_point reboot_time,
+               const std::vector<logger::BootTimestamp> &node_times) {
+          if (!reboots_.empty()) {
+            CHECK_GT(reboot_time, std::get<0>(reboots_.back()));
+          }
+          reboots_.emplace_back(reboot_time, node_times);
+        });
   }
 
  private:
@@ -222,6 +302,8 @@
     }
   }
 
+  void Reboot();
+
   // Returns the next event time and scheduler on which to run it.
   std::tuple<distributed_clock::time_point, EventScheduler *> OldestEvent();
 
@@ -231,10 +313,22 @@
   distributed_clock::time_point now_ = distributed_clock::epoch();
   // List of schedulers to run in sync.
   std::vector<EventScheduler *> schedulers_;
+
+  // List of when to reboot each node.
+  std::vector<std::tuple<distributed_clock::time_point,
+                         std::vector<logger::BootTimestamp>>>
+      reboots_;
 };
 
+inline distributed_clock::time_point EventScheduler::distributed_now() const {
+  return scheduler_scheduler_->distributed_now();
+}
 inline monotonic_clock::time_point EventScheduler::monotonic_now() const {
-  return FromDistributedClock(scheduler_scheduler_->distributed_now());
+  const logger::BootTimestamp t =
+      FromDistributedClock(scheduler_scheduler_->distributed_now());
+  CHECK_EQ(t.boot, boot_count_) << ": " << " " << t << " d "
+                                << scheduler_scheduler_->distributed_now();
+  return t.time;
 }
 
 inline bool EventScheduler::is_running() const {
diff --git a/aos/events/event_scheduler_test.cc b/aos/events/event_scheduler_test.cc
index 3bcf6ca..54fb91a 100644
--- a/aos/events/event_scheduler_test.cc
+++ b/aos/events/event_scheduler_test.cc
@@ -7,6 +7,7 @@
 namespace aos {
 
 namespace chrono = std::chrono;
+using aos::logger::BootTimestamp;
 
 // Legacy time converter for keeping old tests working.  Has numerical precision
 // problems.
@@ -14,7 +15,12 @@
  public:
   SlopeOffsetTimeConverter(size_t nodes_count)
       : distributed_offset_(nodes_count, std::chrono::seconds(0)),
-        distributed_slope_(nodes_count, 1.0) {}
+        distributed_slope_(nodes_count, 1.0) {
+    uuids_.reserve(nodes_count);
+    while (uuids_.size() < nodes_count) {
+      uuids_.emplace_back(UUID::Random());
+    }
+  }
 
   // Sets the offset between the distributed and monotonic clock.
   //   monotonic = distributed * slope + offset;
@@ -26,19 +32,29 @@
   }
 
   distributed_clock::time_point ToDistributedClock(
-      size_t node_index, monotonic_clock::time_point time) override {
+      size_t node_index, BootTimestamp time) override {
+    CHECK_EQ(time.boot, 0u);
     return distributed_clock::epoch() +
            std::chrono::duration_cast<std::chrono::nanoseconds>(
                (time.time_since_epoch() - distributed_offset_[node_index]) /
                distributed_slope_[node_index]);
   }
 
-  monotonic_clock::time_point FromDistributedClock(
-      size_t node_index, distributed_clock::time_point time) override {
-    return monotonic_clock::epoch() +
-           std::chrono::duration_cast<std::chrono::nanoseconds>(
-               time.time_since_epoch() * distributed_slope_[node_index]) +
-           distributed_offset_[node_index];
+  BootTimestamp FromDistributedClock(size_t node_index,
+                                     distributed_clock::time_point time,
+                                     size_t boot_index) override {
+    CHECK_EQ(boot_index, 0u);
+    return {
+        .boot = 0u,
+        .time = monotonic_clock::epoch() +
+                std::chrono::duration_cast<std::chrono::nanoseconds>(
+                    time.time_since_epoch() * distributed_slope_[node_index]) +
+                distributed_offset_[node_index]};
+  }
+
+  UUID boot_uuid(size_t node_index, size_t boot_count) override {
+    CHECK_EQ(boot_count, 0u);
+    return uuids_[node_index];
   }
 
   void ObserveTimePassed(distributed_clock::time_point /*time*/) override {}
@@ -48,20 +64,21 @@
   //   distributed = monotonic + offset;
   std::vector<std::chrono::nanoseconds> distributed_offset_;
   std::vector<double> distributed_slope_;
+  std::vector<UUID> uuids_;
 };
 
 // Tests that the default parameters (slope of 1, offest of 0) behave as
 // an identity.
 TEST(EventSchedulerTest, IdentityTimeConversion) {
   SlopeOffsetTimeConverter time(1);
-  EventScheduler s;
+  EventScheduler s(0);
   s.SetTimeConverter(0u, &time);
   EXPECT_EQ(s.FromDistributedClock(distributed_clock::epoch()),
-            monotonic_clock::epoch());
+            BootTimestamp::epoch());
 
   EXPECT_EQ(
       s.FromDistributedClock(distributed_clock::epoch() + chrono::seconds(1)),
-      monotonic_clock::epoch() + chrono::seconds(1));
+      BootTimestamp::epoch() + chrono::seconds(1));
 
   EXPECT_EQ(s.ToDistributedClock(monotonic_clock::epoch()),
             distributed_clock::epoch());
@@ -73,16 +90,16 @@
 // Tests that a non-unity slope is computed correctly.
 TEST(EventSchedulerTest, DoubleTimeConversion) {
   SlopeOffsetTimeConverter time(1);
-  EventScheduler s;
+  EventScheduler s(0);
   s.SetTimeConverter(0u, &time);
   time.SetDistributedOffset(0u, std::chrono::seconds(7), 2.0);
 
   EXPECT_EQ(s.FromDistributedClock(distributed_clock::epoch()),
-            monotonic_clock::epoch() + chrono::seconds(7));
+            BootTimestamp::epoch() + chrono::seconds(7));
 
   EXPECT_EQ(
       s.FromDistributedClock(distributed_clock::epoch() + chrono::seconds(1)),
-      monotonic_clock::epoch() + chrono::seconds(9));
+      BootTimestamp::epoch() + chrono::seconds(9));
 
   EXPECT_EQ(s.ToDistributedClock(monotonic_clock::epoch() + chrono::seconds(7)),
             distributed_clock::epoch());
diff --git a/aos/events/logging/boot_timestamp.cc b/aos/events/logging/boot_timestamp.cc
index cdd33b6..b3511f4 100644
--- a/aos/events/logging/boot_timestamp.cc
+++ b/aos/events/logging/boot_timestamp.cc
@@ -17,4 +17,10 @@
             << ", .duration=" << duration.duration.count() << "ns}";
 }
 
+std::ostream &operator<<(std::ostream &os,
+                         const struct BootQueueIndex &queue_index) {
+  return os << "{.boot=" << queue_index.boot
+            << ", .index=" << queue_index.index << "}";
+}
+
 }  // namespace aos::logger
diff --git a/aos/events/logging/boot_timestamp.h b/aos/events/logging/boot_timestamp.h
index 64c824a..dac6533 100644
--- a/aos/events/logging/boot_timestamp.h
+++ b/aos/events/logging/boot_timestamp.h
@@ -70,9 +70,47 @@
   }
 };
 
+// Structure to hold both a boot and queue index.  Queue indices reset after
+// reboot, so we need to track them.
+struct BootQueueIndex {
+  // Boot number for this queue index.
+  size_t boot = std::numeric_limits<size_t>::max();
+  // Queue index.
+  uint32_t index = std::numeric_limits<uint32_t>::max();
+
+  // Returns a QueueIndex representing an invalid index.  Since
+  // std::numeric_limits<uint32_t>::max() is never used in the QueueIndex code
+  // and is reserved as an Invalid value, this will never collide.
+  static BootQueueIndex Invalid() {
+    return {.boot = std::numeric_limits<size_t>::max(),
+            .index = std::numeric_limits<uint32_t>::max()};
+  }
+
+  bool operator==(const BootQueueIndex &b2) const {
+    return index == b2.index && boot == b2.boot;
+  }
+  bool operator!=(const BootQueueIndex &b2) const {
+    return index != b2.index || boot != b2.boot;
+  }
+  bool operator<(const BootQueueIndex &b2) const {
+    if (boot == b2.boot) {
+      return index < b2.index;
+    }
+    return boot < b2.boot;
+  }
+  bool operator>(const BootQueueIndex &b2) const {
+    if (boot == b2.boot) {
+      return index > b2.index;
+    }
+    return boot > b2.boot;
+  }
+};
+
 std::ostream &operator<<(std::ostream &os,
                          const struct BootTimestamp &timestamp);
 std::ostream &operator<<(std::ostream &os, const struct BootDuration &duration);
+std::ostream &operator<<(std::ostream &os,
+                         const struct BootQueueIndex &queue_index);
 
 inline bool BootTimestamp::operator<(const BootTimestamp &m2) const {
   if (boot != m2.boot) {
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index 5079606..5a9040e 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -79,6 +79,223 @@
   }
 }
 
+// Prints out raw log parts to stdout.
+int PrintRaw(int argc, char **argv) {
+  if (argc != 2) {
+    LOG(FATAL) << "Expected 1 logfile as an argument.";
+  }
+  aos::logger::SpanReader reader(argv[1]);
+  absl::Span<const uint8_t> raw_log_file_header_span = reader.ReadMessage();
+
+  if (raw_log_file_header_span == absl::Span<const uint8_t>()) {
+    LOG(WARNING) << "Empty log file on " << reader.filename();
+    return 0;
+  }
+
+  // Now, reproduce the log file header deduplication logic inline so we can
+  // print out all the headers we find.
+  aos::SizePrefixedFlatbufferVector<aos::logger::LogFileHeader> log_file_header(
+      raw_log_file_header_span);
+  if (!log_file_header.Verify()) {
+    LOG(ERROR) << "Header corrupted on " << reader.filename();
+    return 1;
+  }
+  while (true) {
+    absl::Span<const uint8_t> maybe_header_data = reader.PeekMessage();
+    if (maybe_header_data == absl::Span<const uint8_t>()) {
+      break;
+    }
+
+    aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
+        maybe_header_data);
+    if (maybe_header.Verify()) {
+      std::cout << aos::FlatbufferToJson(
+                       log_file_header, {.multi_line = FLAGS_pretty,
+                                         .max_vector_size = static_cast<size_t>(
+                                             FLAGS_max_vector_size)})
+                << std::endl;
+      LOG(WARNING) << "Found duplicate LogFileHeader in " << reader.filename();
+      log_file_header =
+          aos::SizePrefixedFlatbufferVector<aos::logger::LogFileHeader>(
+              maybe_header_data);
+
+      reader.ConsumeMessage();
+    } else {
+      break;
+    }
+  }
+
+  // And now use the final sha256 to match the raw_header.
+  std::optional<aos::logger::MessageReader> raw_header_reader;
+  const aos::logger::LogFileHeader *full_header = &log_file_header.message();
+  if (!FLAGS_raw_header.empty()) {
+    raw_header_reader.emplace(FLAGS_raw_header);
+    std::cout << aos::FlatbufferToJson(full_header,
+                                       {.multi_line = FLAGS_pretty,
+                                        .max_vector_size = static_cast<size_t>(
+                                            FLAGS_max_vector_size)})
+              << std::endl;
+    CHECK_EQ(
+        full_header->configuration_sha256()->string_view(),
+        aos::logger::Sha256(raw_header_reader->raw_log_file_header().span()));
+    full_header = raw_header_reader->log_file_header();
+  }
+
+  if (!FLAGS_print) {
+    return 0;
+  }
+
+  std::cout << aos::FlatbufferToJson(full_header,
+                                     {.multi_line = FLAGS_pretty,
+                                      .max_vector_size = static_cast<size_t>(
+                                          FLAGS_max_vector_size)})
+            << std::endl;
+  CHECK(full_header->has_configuration())
+      << ": Missing configuration! You may want to provide the path to the "
+         "logged configuration file using the --raw_header flag.";
+
+  while (true) {
+    const aos::SizePrefixedFlatbufferSpan<aos::logger::MessageHeader> message(
+        reader.ReadMessage());
+    if (message.span() == absl::Span<const uint8_t>()) {
+      break;
+    }
+    CHECK(message.Verify());
+
+    const auto *const channels = full_header->configuration()->channels();
+    const size_t channel_index = message.message().channel_index();
+    CHECK_LT(channel_index, channels->size());
+    const aos::Channel *const channel = channels->Get(channel_index);
+
+    CHECK(message.Verify()) << absl::BytesToHexString(
+        std::string_view(reinterpret_cast<const char *>(message.span().data()),
+                         message.span().size()));
+
+    if (message.message().data() != nullptr) {
+      CHECK(channel->has_schema());
+
+      CHECK(flatbuffers::Verify(
+          *channel->schema(), *channel->schema()->root_table(),
+          message.message().data()->data(), message.message().data()->size()))
+          << ": Corrupted flatbuffer on " << channel->name()->c_str() << " "
+          << channel->type()->c_str();
+    }
+
+    if (FLAGS_format_raw && message.message().data() != nullptr) {
+      std::cout << aos::configuration::StrippedChannelToString(channel) << " "
+                << aos::FlatbufferToJson(message, {.multi_line = FLAGS_pretty,
+                                                   .max_vector_size = 4})
+                << ": "
+                << aos::FlatbufferToJson(
+                       channel->schema(), message.message().data()->data(),
+                       {FLAGS_pretty,
+                        static_cast<size_t>(FLAGS_max_vector_size)})
+                << std::endl;
+    } else {
+      std::cout << aos::configuration::StrippedChannelToString(channel) << " "
+                << aos::FlatbufferToJson(
+                       message, {FLAGS_pretty,
+                                 static_cast<size_t>(FLAGS_max_vector_size)})
+                << std::endl;
+    }
+  }
+  return 0;
+}
+
+// This class prints out all data from a node on a boot.
+class NodePrinter {
+ public:
+  NodePrinter(aos::EventLoop *event_loop, uint64_t *message_print_counter,
+              aos::SimulatedEventLoopFactory *factory,
+              aos::FastStringBuilder *builder)
+      : factory_(factory),
+        event_loop_(event_loop),
+        message_print_counter_(message_print_counter),
+        node_name_(
+            event_loop_->node() == nullptr
+                ? ""
+                : std::string(event_loop->node()->name()->string_view()) + " "),
+        builder_(builder) {
+    event_loop_->SkipTimingReport();
+    event_loop_->SkipAosLog();
+
+    const flatbuffers::Vector<flatbuffers::Offset<aos::Channel>> *channels =
+        event_loop_->configuration()->channels();
+
+    for (flatbuffers::uoffset_t i = 0; i < channels->size(); i++) {
+      const aos::Channel *channel = channels->Get(i);
+      const flatbuffers::string_view name = channel->name()->string_view();
+      const flatbuffers::string_view type = channel->type()->string_view();
+      if (name.find(FLAGS_name) != std::string::npos &&
+          type.find(FLAGS_type) != std::string::npos) {
+        if (!aos::configuration::ChannelIsReadableOnNode(channel,
+                                                         event_loop_->node())) {
+          continue;
+        }
+        VLOG(1) << "Listening on " << name << " " << type;
+
+        CHECK_NOTNULL(channel->schema());
+        event_loop_->MakeRawWatcher(
+            channel, [this, channel](const aos::Context &context,
+                                     const void * /*message*/) {
+              if (!FLAGS_print) {
+                return;
+              }
+
+              if (!FLAGS_fetch && !started_) {
+                return;
+              }
+
+              PrintMessage(node_name_, channel, context, builder_);
+              ++(*message_print_counter_);
+              if (FLAGS_count > 0 && *message_print_counter_ >= FLAGS_count) {
+                factory_->Exit();
+              }
+            });
+      }
+    }
+  }
+
+  void SetStarted(bool started, aos::monotonic_clock::time_point monotonic_now,
+                  aos::realtime_clock::time_point realtime_now) {
+    started_ = started;
+    if (started_) {
+      std::cout << std::endl;
+      std::cout << (event_loop_->node() != nullptr
+                        ? (event_loop_->node()->name()->str() + " ")
+                        : "")
+                << "Log starting at " << realtime_now << " (" << monotonic_now
+                << ")";
+      std::cout << std::endl << std::endl;
+    } else {
+      std::cout << std::endl;
+      std::cout << (event_loop_->node() != nullptr
+                        ? (event_loop_->node()->name()->str() + " ")
+                        : "")
+                << "Log shutting down at " << realtime_now << " ("
+                << monotonic_now << ")";
+      std::cout << std::endl << std::endl;
+    }
+  }
+
+ private:
+  struct MessageInfo {
+    std::string node_name;
+    std::unique_ptr<aos::RawFetcher> fetcher;
+  };
+
+  aos::SimulatedEventLoopFactory *factory_;
+  aos::EventLoop *event_loop_;
+
+  uint64_t *message_print_counter_ = nullptr;
+
+  std::string node_name_;
+
+  bool started_ = false;
+
+  aos::FastStringBuilder *builder_;
+};
+
 int main(int argc, char **argv) {
   gflags::SetUsageMessage(
       "Usage:\n"
@@ -95,138 +312,15 @@
   aos::InitGoogle(&argc, &argv);
 
   if (FLAGS_raw) {
-    if (argc != 2) {
-      LOG(FATAL) << "Expected 1 logfile as an argument.";
-    }
-    aos::logger::SpanReader reader(argv[1]);
-    absl::Span<const uint8_t> raw_log_file_header_span = reader.ReadMessage();
-
-    if (raw_log_file_header_span == absl::Span<const uint8_t>()) {
-      LOG(WARNING) << "Empty log file on " << reader.filename();
-      return 0;
-    }
-
-    // Now, reproduce the log file header deduplication logic inline so we can
-    // print out all the headers we find.
-    aos::SizePrefixedFlatbufferVector<aos::logger::LogFileHeader>
-        log_file_header(raw_log_file_header_span);
-    if (!log_file_header.Verify()) {
-      LOG(ERROR) << "Header corrupted on " << reader.filename();
-      return 1;
-    }
-    while (true) {
-      absl::Span<const uint8_t> maybe_header_data = reader.PeekMessage();
-      if (maybe_header_data == absl::Span<const uint8_t>()) {
-        break;
-      }
-
-      aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
-          maybe_header_data);
-      if (maybe_header.Verify()) {
-        std::cout << aos::FlatbufferToJson(
-                         log_file_header,
-                         {.multi_line = FLAGS_pretty,
-                          .max_vector_size =
-                              static_cast<size_t>(FLAGS_max_vector_size)})
-                  << std::endl;
-        LOG(WARNING) << "Found duplicate LogFileHeader in "
-                     << reader.filename();
-        log_file_header =
-            aos::SizePrefixedFlatbufferVector<aos::logger::LogFileHeader>(
-                maybe_header_data);
-
-        reader.ConsumeMessage();
-      } else {
-        break;
-      }
-    }
-
-    // And now use the final sha256 to match the raw_header.
-    std::optional<aos::logger::MessageReader> raw_header_reader;
-    const aos::logger::LogFileHeader *full_header = &log_file_header.message();
-    if (!FLAGS_raw_header.empty()) {
-      raw_header_reader.emplace(FLAGS_raw_header);
-      std::cout << aos::FlatbufferToJson(
-                       full_header, {.multi_line = FLAGS_pretty,
-                                     .max_vector_size = static_cast<size_t>(
-                                         FLAGS_max_vector_size)})
-                << std::endl;
-      CHECK_EQ(
-          full_header->configuration_sha256()->string_view(),
-          aos::logger::Sha256(raw_header_reader->raw_log_file_header().span()));
-      full_header = raw_header_reader->log_file_header();
-    }
-
-    if (!FLAGS_print) {
-      return 0;
-    }
-
-    std::cout << aos::FlatbufferToJson(full_header,
-                                       {.multi_line = FLAGS_pretty,
-                                        .max_vector_size = static_cast<size_t>(
-                                            FLAGS_max_vector_size)})
-              << std::endl;
-    CHECK(full_header->has_configuration())
-        << ": Missing configuration! You may want to provide the path to the "
-           "logged configuration file using the --raw_header flag.";
-
-    while (true) {
-      const aos::SizePrefixedFlatbufferSpan<aos::logger::MessageHeader> message(
-          reader.ReadMessage());
-      if (message.span() == absl::Span<const uint8_t>()) {
-        break;
-      }
-      CHECK(message.Verify());
-
-      const auto *const channels = full_header->configuration()->channels();
-      const size_t channel_index = message.message().channel_index();
-      CHECK_LT(channel_index, channels->size());
-      const aos::Channel *const channel = channels->Get(channel_index);
-
-      CHECK(message.Verify()) << absl::BytesToHexString(std::string_view(
-          reinterpret_cast<const char *>(message.span().data()),
-          message.span().size()));
-
-      if (message.message().data() != nullptr) {
-        CHECK(channel->has_schema());
-
-        CHECK(flatbuffers::Verify(
-            *channel->schema(), *channel->schema()->root_table(),
-            message.message().data()->data(), message.message().data()->size()))
-            << ": Corrupted flatbuffer on " << channel->name()->c_str() << " "
-            << channel->type()->c_str();
-      }
-
-      if (FLAGS_format_raw && message.message().data() != nullptr) {
-        std::cout << aos::configuration::StrippedChannelToString(channel) << " "
-                  << aos::FlatbufferToJson(message, {.multi_line = FLAGS_pretty,
-                                                     .max_vector_size = 4})
-                  << ": "
-                  << aos::FlatbufferToJson(
-                         channel->schema(), message.message().data()->data(),
-                         {FLAGS_pretty,
-                          static_cast<size_t>(FLAGS_max_vector_size)})
-                  << std::endl;
-      } else {
-        std::cout << aos::configuration::StrippedChannelToString(channel) << " "
-                  << aos::FlatbufferToJson(
-                         message, {FLAGS_pretty,
-                                   static_cast<size_t>(FLAGS_max_vector_size)})
-                  << std::endl;
-      }
-    }
-    return 0;
+    return PrintRaw(argc, argv);
   }
 
   if (argc < 2) {
     LOG(FATAL) << "Expected at least 1 logfile as an argument.";
   }
 
-  const std::vector<std::string> unsorted_logfiles =
-      aos::logger::FindLogs(argc, argv);
-
   const std::vector<aos::logger::LogFile> logfiles =
-      aos::logger::SortParts(unsorted_logfiles);
+      aos::logger::SortParts(aos::logger::FindLogs(argc, argv));
 
   for (auto &it : logfiles) {
     VLOG(1) << it;
@@ -249,32 +343,10 @@
     return 0;
   }
 
-  aos::FastStringBuilder builder;
-
-  aos::SimulatedEventLoopFactory event_loop_factory(reader.configuration());
-  reader.Register(&event_loop_factory);
-
-  std::vector<std::unique_ptr<aos::EventLoop>> printer_event_loops;
-
-  bool found_channel = false;
-
-  uint64_t message_print_counter = 0;
-
-  for (const aos::Node *node :
-       aos::configuration::GetNodes(event_loop_factory.configuration())) {
-    std::unique_ptr<aos::EventLoop> printer_event_loop =
-        event_loop_factory.MakeEventLoop("printer", node);
-    printer_event_loop->SkipTimingReport();
-    printer_event_loop->SkipAosLog();
-
-    struct MessageInfo {
-      std::string node_name;
-      std::unique_ptr<aos::RawFetcher> fetcher;
-    };
-    std::vector<MessageInfo> messages_before_start;
-
+  {
+    bool found_channel = false;
     const flatbuffers::Vector<flatbuffers::Offset<aos::Channel>> *channels =
-        printer_event_loop->configuration()->channels();
+        reader.configuration()->channels();
 
     for (flatbuffers::uoffset_t i = 0; i < channels->size(); i++) {
       const aos::Channel *channel = channels->Get(i);
@@ -282,91 +354,54 @@
       const flatbuffers::string_view type = channel->type()->string_view();
       if (name.find(FLAGS_name) != std::string::npos &&
           type.find(FLAGS_type) != std::string::npos) {
-        if (!aos::configuration::ChannelIsReadableOnNode(
-                channel, printer_event_loop->node())) {
-          continue;
-        }
-        VLOG(1) << "Listening on " << name << " " << type;
-
-        std::string node_name =
-            node == nullptr ? ""
-                            : std::string(node->name()->string_view()) + " ";
-
-        CHECK_NOTNULL(channel->schema());
-
-        // Fetch the last message on this channel from before the log start
-        // time.
-        if (FLAGS_fetch) {
-          std::unique_ptr<aos::RawFetcher> fetcher =
-              printer_event_loop->MakeRawFetcher(channel);
-          if (fetcher->Fetch()) {
-            MessageInfo message{.node_name = node_name,
-                                .fetcher = std::move(fetcher)};
-            // Insert it sorted into the vector so we can print in time order
-            // instead of channel order at the start.
-            auto it = std::lower_bound(
-                messages_before_start.begin(), messages_before_start.end(),
-                message, [](const MessageInfo &lhs, const MessageInfo &rhs) {
-                  if (lhs.fetcher->context().monotonic_event_time <
-                      rhs.fetcher->context().monotonic_event_time) {
-                    return true;
-                  }
-                  if (lhs.fetcher->context().monotonic_event_time >
-                      rhs.fetcher->context().monotonic_event_time) {
-                    return false;
-                  }
-                  return lhs.fetcher->channel() < rhs.fetcher->channel();
-                });
-            messages_before_start.insert(it, std::move(message));
-          }
-        }
-
-        printer_event_loop->MakeRawWatcher(
-            channel, [channel, node_name, &builder, &event_loop_factory,
-                      &message_print_counter](const aos::Context &context,
-                                               const void * /*message*/) {
-              if (FLAGS_print) {
-                PrintMessage(node_name, channel, context, &builder);
-                ++message_print_counter;
-                if (FLAGS_count > 0 && message_print_counter >= FLAGS_count) {
-                  event_loop_factory.Exit();
-                }
-              }
-            });
         found_channel = true;
       }
     }
-
-    // Print the messages from before the log start time.
-    // TODO(austin): Sort between nodes too when it becomes annoying enough.
-    for (const MessageInfo &message : messages_before_start) {
-      if (FLAGS_print) {
-        PrintMessage(message.node_name, message.fetcher->channel(),
-                     message.fetcher->context(), &builder);
-        ++message_print_counter;
-        if (FLAGS_count > 0 && message_print_counter >= FLAGS_count) {
-          // We are done.  Clean up and exit.
-          reader.Deregister();
-          return 0;
-        }
-      }
+    if (!found_channel) {
+      LOG(FATAL) << "Could not find any channels";
     }
-    printer_event_loops.emplace_back(std::move(printer_event_loop));
-
-    std::cout << std::endl;
-    std::cout << (node != nullptr ? (node->name()->str() + " ") : "")
-              << "Log starting at " << reader.realtime_start_time(node) << " ("
-              << reader.monotonic_start_time(node) << ")";
-    std::cout << std::endl << std::endl;
   }
 
-  if (!found_channel) {
-    LOG(FATAL) << "Could not find any channels";
-  }
+  aos::FastStringBuilder builder;
 
-  if (FLAGS_fetch) {
-    // New line to separate fetched messages from non-fetched messages.
-    std::cout << std::endl;
+  aos::SimulatedEventLoopFactory event_loop_factory(reader.configuration());
+
+  reader.RegisterWithoutStarting(&event_loop_factory);
+
+  uint64_t message_print_counter = 0;
+
+  std::vector<NodePrinter *> printers;
+  printers.resize(
+      aos::configuration::NodesCount(event_loop_factory.configuration()),
+      nullptr);
+
+  for (const aos::Node *node :
+       aos::configuration::GetNodes(event_loop_factory.configuration())) {
+    size_t node_index = aos::configuration::GetNodeIndex(
+        event_loop_factory.configuration(), node);
+    // Spin up the printer, and hook up the SetStarted method so that it gets
+    // notified when the log starts and stops.
+    aos::NodeEventLoopFactory *node_factory =
+        event_loop_factory.GetNodeEventLoopFactory(node);
+    node_factory->OnStartup([&event_loop_factory, node_factory,
+                             &message_print_counter, &builder, &printers,
+                             node_index]() {
+      printers[node_index] = node_factory->AlwaysStart<NodePrinter>(
+          "printer", &message_print_counter, &event_loop_factory, &builder);
+    });
+    node_factory->OnShutdown(
+        [&printers, node_index]() { printers[node_index] = nullptr; });
+
+    reader.OnStart(node, [&printers, node_index, node_factory]() {
+      CHECK(printers[node_index]);
+      printers[node_index]->SetStarted(true, node_factory->monotonic_now(),
+                                       node_factory->realtime_now());
+    });
+    reader.OnEnd(node, [&printers, node_index, node_factory]() {
+      CHECK(printers[node_index]);
+      printers[node_index]->SetStarted(false, node_factory->monotonic_now(),
+                                       node_factory->realtime_now());
+    });
   }
 
   event_loop_factory.Run();
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index a880f35..f13b215 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -38,6 +38,7 @@
 void NewDataWriter::Rotate() {
   // No need to rotate if nothing has been written.
   if (header_written_) {
+    VLOG(1) << "Rotated " << filename();
     ++parts_index_;
     reopen_(this);
     header_written_ = false;
@@ -57,9 +58,12 @@
     const monotonic_clock::time_point monotonic_remote_time,
     const monotonic_clock::time_point monotonic_event_time,
     const bool reliable) {
+  // Trigger rotation if anything in the header changes.
   bool rotate = false;
   CHECK_LT(remote_node_index, state_.size());
   State &state = state_[remote_node_index];
+
+  // Did the remote boot UUID change?
   if (state.boot_uuid != remote_node_boot_uuid) {
     VLOG(1) << filename() << " Remote " << remote_node_index << " updated to "
             << remote_node_boot_uuid << " from " << state.boot_uuid;
@@ -73,9 +77,15 @@
     rotate = true;
   }
 
+
+  // Did the unreliable timestamps change?
   if (!reliable) {
     if (state.oldest_remote_unreliable_monotonic_timestamp >
         monotonic_remote_time) {
+      VLOG(1) << filename() << " Remote " << remote_node_index
+              << " oldest_remote_unreliable_monotonic_timestamp updated from "
+              << state.oldest_remote_unreliable_monotonic_timestamp << " to "
+              << monotonic_remote_time;
       state.oldest_remote_unreliable_monotonic_timestamp =
           monotonic_remote_time;
       state.oldest_local_unreliable_monotonic_timestamp = monotonic_event_time;
@@ -83,7 +93,12 @@
     }
   }
 
+  // Did any of the timestamps change?
   if (state.oldest_remote_monotonic_timestamp > monotonic_remote_time) {
+    VLOG(1) << filename() << " Remote " << remote_node_index
+            << " oldest_remote_monotonic_timestamp updated from "
+            << state.oldest_remote_monotonic_timestamp << " to "
+            << monotonic_remote_time;
     state.oldest_remote_monotonic_timestamp = monotonic_remote_time;
     state.oldest_local_monotonic_timestamp = monotonic_event_time;
     rotate = true;
@@ -97,7 +112,7 @@
 void NewDataWriter::QueueMessage(flatbuffers::FlatBufferBuilder *fbb,
                                  const UUID &source_node_boot_uuid,
                                  aos::monotonic_clock::time_point now) {
-  // TODO(austin): Handle remote nodes changing too, not just the source node.
+  // Trigger a reboot if we detect the boot UUID change.
   if (state_[node_index_].boot_uuid != source_node_boot_uuid) {
     state_[node_index_].boot_uuid = source_node_boot_uuid;
     if (header_written_) {
@@ -106,6 +121,16 @@
 
     QueueHeader(MakeHeader());
   }
+
+  // If the start time has changed for this node, trigger a rotation.
+  if (log_namer_->monotonic_start_time(node_index_, source_node_boot_uuid) !=
+           monotonic_start_time_) {
+    CHECK(header_written_);
+    Rotate();
+  }
+
+  CHECK_EQ(log_namer_->monotonic_start_time(node_index_, source_node_boot_uuid),
+           monotonic_start_time_);
   CHECK_EQ(state_[node_index_].boot_uuid, source_node_boot_uuid);
   CHECK(header_written_) << ": Attempting to write message before header to "
                          << writer->filename();
@@ -139,11 +164,17 @@
     reopen_(this);
   }
 
+  VLOG(1) << "Writing to " << filename() << " "
+          << aos::FlatbufferToJson(
+                 header, {.multi_line = false, .max_vector_size = 100});
+
   // TODO(austin): This triggers a dummy allocation that we don't need as part
   // of releasing.  Can we skip it?
   CHECK(writer);
   writer->QueueSizedFlatbuffer(header.Release());
   header_written_ = true;
+  monotonic_start_time_ = log_namer_->monotonic_start_time(
+      node_index_, state_[node_index_].boot_uuid);
 }
 
 void NewDataWriter::Close() {
@@ -153,9 +184,20 @@
   header_written_ = false;
 }
 
+LogNamer::NodeState *LogNamer::GetNodeState(size_t node_index,
+                                            const UUID &boot_uuid) {
+  auto it = node_states_.find(std::make_pair(node_index, boot_uuid));
+  if (it == node_states_.end()) {
+    it =
+        node_states_.emplace(std::make_pair(node_index, boot_uuid), NodeState())
+            .first;
+  }
+  return &it->second;
+}
+
 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> LogNamer::MakeHeader(
     size_t node_index, const std::vector<NewDataWriter::State> &state,
-    const UUID &parts_uuid, int parts_index) const {
+    const UUID &parts_uuid, int parts_index) {
   const UUID &source_node_boot_uuid = state[node_index].boot_uuid;
   const Node *const source_node =
       configuration::GetNode(configuration_, node_index);
@@ -281,27 +323,26 @@
   log_file_header_builder.add_max_out_of_order_duration(
       header_.message().max_out_of_order_duration());
 
+  NodeState *node_state = GetNodeState(node_index, source_node_boot_uuid);
   log_file_header_builder.add_monotonic_start_time(
       std::chrono::duration_cast<std::chrono::nanoseconds>(
-          node_states_[node_index].monotonic_start_time.time_since_epoch())
+          node_state->monotonic_start_time.time_since_epoch())
           .count());
   if (source_node == node_) {
     log_file_header_builder.add_realtime_start_time(
         std::chrono::duration_cast<std::chrono::nanoseconds>(
-            node_states_[node_index].realtime_start_time.time_since_epoch())
+            node_state->realtime_start_time.time_since_epoch())
             .count());
   } else {
     // Fill out the legacy start times.  Since these were implemented to never
     // change on reboot, they aren't very helpful in tracking what happened.
     log_file_header_builder.add_logger_monotonic_start_time(
         std::chrono::duration_cast<std::chrono::nanoseconds>(
-            node_states_[node_index]
-                .logger_monotonic_start_time.time_since_epoch())
+            node_state->logger_monotonic_start_time.time_since_epoch())
             .count());
     log_file_header_builder.add_logger_realtime_start_time(
         std::chrono::duration_cast<std::chrono::nanoseconds>(
-            node_states_[node_index]
-                .logger_realtime_start_time.time_since_epoch())
+            node_state->logger_realtime_start_time.time_since_epoch())
             .count());
   }
 
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index c2c8dd1..f1f1829 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -7,6 +7,7 @@
 #include <string_view>
 #include <vector>
 
+#include "absl/container/btree_map.h"
 #include "aos/events/logging/logfile_utils.h"
 #include "aos/events/logging/logger_generated.h"
 #include "aos/uuid.h"
@@ -22,6 +23,11 @@
 //
 // Class to manage writing data to log files.  This lets us track which boot the
 // written header has in it, and if the header has been written or not.
+//
+// The design of this class is that instead of being notified when any of the
+// header data changes, it polls and owns that decision.  This makes it much
+// harder to write corrupted data.  If that becomes a performance problem, we
+// can DCHECK and take it out of production binaries.
 class NewDataWriter {
  public:
   // Constructs a NewDataWriter.
@@ -54,7 +60,9 @@
                     aos::monotonic_clock::time_point now);
 
   // Returns the filename of the writer.
-  std::string_view filename() const { return writer->filename(); }
+  std::string_view filename() const {
+    return writer ? writer->filename() : "(closed)";
+  }
 
   void Close();
 
@@ -96,6 +104,8 @@
 
   aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader();
 
+  monotonic_clock::time_point monotonic_start_time_ = monotonic_clock::min_time;
+
   const Node *const node_ = nullptr;
   const size_t node_index_ = 0;
   LogNamer *log_namer_;
@@ -121,7 +131,6 @@
         node_(node),
         logger_node_index_(configuration::GetNodeIndex(configuration_, node_)) {
     nodes_.emplace_back(node_);
-    node_states_.resize(configuration::NodesCount(configuration_));
   }
   virtual ~LogNamer() {}
 
@@ -180,45 +189,35 @@
         UUID::FromString(header_.message().logger_node_boot_uuid());
   }
 
-  void SetStartTimes(size_t node_index,
+  void ClearStartTimes() {
+    node_states_.clear();
+  }
+
+  void SetStartTimes(size_t node_index, const UUID &boot_uuid,
                      monotonic_clock::time_point monotonic_start_time,
                      realtime_clock::time_point realtime_start_time,
                      monotonic_clock::time_point logger_monotonic_start_time,
                      realtime_clock::time_point logger_realtime_start_time) {
-    node_states_[node_index].monotonic_start_time = monotonic_start_time;
-    node_states_[node_index].realtime_start_time = realtime_start_time;
-    node_states_[node_index].logger_monotonic_start_time =
-        logger_monotonic_start_time;
-    node_states_[node_index].logger_realtime_start_time =
-        logger_realtime_start_time;
-
-    // TODO(austin): Track that the header has changed and needs to be
-    // rewritten down here rather than up in log_writer.
+    VLOG(1) << "Setting node " << node_index << " to start time "
+            << monotonic_start_time << " rt " << realtime_start_time << " UUID "
+            << boot_uuid;
+    NodeState *node_state = GetNodeState(node_index, boot_uuid);
+    node_state->monotonic_start_time = monotonic_start_time;
+    node_state->realtime_start_time = realtime_start_time;
+    node_state->logger_monotonic_start_time = logger_monotonic_start_time;
+    node_state->logger_realtime_start_time = logger_realtime_start_time;
   }
 
-  monotonic_clock::time_point monotonic_start_time(size_t node_index) const {
-    return node_states_[node_index].monotonic_start_time;
+  monotonic_clock::time_point monotonic_start_time(size_t node_index,
+                                                   const UUID &boot_uuid) {
+    DCHECK_NE(boot_uuid, UUID::Zero());
+
+    NodeState *node_state = GetNodeState(node_index, boot_uuid);
+    return node_state->monotonic_start_time;
   }
 
  protected:
-  // Creates a new header by copying fields out of the template and combining
-  // them with the arguments provided.
-  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
-      size_t node_index, const std::vector<NewDataWriter::State> &state,
-      const UUID &parts_uuid, int parts_index) const;
-
-  EventLoop *event_loop_;
-  const Configuration *const configuration_;
-  const Node *const node_;
-  const size_t logger_node_index_;
-  UUID logger_node_boot_uuid_;
-  std::vector<const Node *> nodes_;
-
-  friend NewDataWriter;
-
   // Structure with state per node about times and such.
-  // TODO(austin): some of this lives better in NewDataWriter once we move
-  // ownership of deciding when to write headers into LogNamer.
   struct NodeState {
     // Time when this node started logging.
     monotonic_clock::time_point monotonic_start_time =
@@ -231,7 +230,28 @@
     realtime_clock::time_point logger_realtime_start_time =
         realtime_clock::min_time;
   };
-  std::vector<NodeState> node_states_;
+
+  // Creates a new header by copying fields out of the template and combining
+  // them with the arguments provided.
+  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
+      size_t node_index, const std::vector<NewDataWriter::State> &state,
+      const UUID &parts_uuid, int parts_index);
+
+  EventLoop *event_loop_;
+  const Configuration *const configuration_;
+  const Node *const node_;
+  const size_t logger_node_index_;
+  UUID logger_node_boot_uuid_;
+  std::vector<const Node *> nodes_;
+
+  friend NewDataWriter;
+
+  // Returns the start/stop time state structure for a node and boot.  We can
+  // have data from multiple boots, and it makes sense to reuse the start/stop
+  // times if we get data from the same boot again.
+  NodeState *GetNodeState(size_t node_index, const UUID &boot_uuid);
+
+  absl::btree_map<std::pair<size_t, UUID>, NodeState> node_states_;
 
   aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> header_ =
       aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>::Empty();
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index b320474..a1269e1 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -49,14 +49,6 @@
 namespace logger {
 namespace {
 
-std::string LogFileVectorToString(std::vector<LogFile> log_files) {
-  std::stringstream ss;
-  for (const auto &f : log_files) {
-    ss << f << "\n";
-  }
-  return ss.str();
-}
-
 // Copies the channel, removing the schema as we go.  If new_name is provided,
 // it is used instead of the name inside the channel.  If new_type is provided,
 // it is used instead of the type in the channel.
@@ -230,7 +222,8 @@
 
   if (!configuration::MultiNode(configuration())) {
     states_.emplace_back(std::make_unique<State>(
-        std::make_unique<TimestampMapper>(FilterPartsForNode(log_files_, ""))));
+        std::make_unique<TimestampMapper>(FilterPartsForNode(log_files_, "")),
+        nullptr));
   } else {
     if (replay_configuration) {
       CHECK_EQ(logged_configuration()->nodes()->size(),
@@ -295,19 +288,82 @@
   return state->realtime_start_time(0);
 }
 
+void LogReader::OnStart(std::function<void()> fn) {
+  CHECK(!configuration::MultiNode(configuration()));
+  OnStart(nullptr, std::move(fn));
+}
+
+void LogReader::OnStart(const Node *node, std::function<void()> fn) {
+  const int node_index = configuration::GetNodeIndex(configuration(), node);
+  CHECK_GE(node_index, 0);
+  CHECK_LT(node_index, static_cast<int>(states_.size()));
+  State *state = states_[node_index].get();
+  CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
+
+  state->OnStart(std::move(fn));
+}
+
+void LogReader::State::OnStart(std::function<void()> fn) {
+  on_starts_.emplace_back(std::move(fn));
+}
+
+void LogReader::State::RunOnStart() {
+  SetRealtimeOffset(monotonic_start_time(boot_count()),
+                    realtime_start_time(boot_count()));
+
+  VLOG(1) << "Starting " << MaybeNodeName(node()) << "at time "
+          << monotonic_start_time(boot_count());
+  for (size_t i = 0; i < on_starts_.size(); ++i) {
+    on_starts_[i]();
+  }
+  stopped_ = false;
+  started_ = true;
+}
+
+void LogReader::OnEnd(std::function<void()> fn) {
+  CHECK(!configuration::MultiNode(configuration()));
+  OnEnd(nullptr, std::move(fn));
+}
+
+void LogReader::OnEnd(const Node *node, std::function<void()> fn) {
+  const int node_index = configuration::GetNodeIndex(configuration(), node);
+  CHECK_GE(node_index, 0);
+  CHECK_LT(node_index, static_cast<int>(states_.size()));
+  State *state = states_[node_index].get();
+  CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
+
+  state->OnEnd(std::move(fn));
+}
+
+void LogReader::State::OnEnd(std::function<void()> fn) {
+  on_ends_.emplace_back(std::move(fn));
+}
+
+void LogReader::State::RunOnEnd() {
+  VLOG(1) << "Ending " << MaybeNodeName(node()) << "at time "
+          << monotonic_start_time(boot_count());
+  for (size_t i = 0; i < on_ends_.size(); ++i) {
+    on_ends_[i]();
+  }
+
+  stopped_ = true;
+  started_ = false;
+}
+
 void LogReader::Register() {
   event_loop_factory_unique_ptr_ =
       std::make_unique<SimulatedEventLoopFactory>(configuration());
   Register(event_loop_factory_unique_ptr_.get());
 }
 
-void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
+void LogReader::RegisterWithoutStarting(
+    SimulatedEventLoopFactory *event_loop_factory) {
   event_loop_factory_ = event_loop_factory;
   remapped_configuration_ = event_loop_factory_->configuration();
   filters_ =
       std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
           event_loop_factory_->configuration(), logged_configuration(),
-          FLAGS_skip_order_validation,
+          log_files_[0].boots, FLAGS_skip_order_validation,
           chrono::duration_cast<chrono::nanoseconds>(
               chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
 
@@ -318,29 +374,14 @@
     std::vector<LogParts> filtered_parts = FilterPartsForNode(
         log_files_, node != nullptr ? node->name()->string_view() : "");
 
-    // Confirm that all the parts are from the same boot if there are enough
-    // parts to not be from the same boot.
-    if (filtered_parts.size() > 1u) {
-      for (size_t i = 1; i < filtered_parts.size(); ++i) {
-        CHECK_EQ(filtered_parts[i].source_boot_uuid,
-                 filtered_parts[0].source_boot_uuid)
-            << ": Found parts from different boots for node "
-            << node->name()->string_view() << " "
-            << LogFileVectorToString(log_files_);
-      }
-      if (!filtered_parts[0].source_boot_uuid.empty()) {
-        event_loop_factory_->GetNodeEventLoopFactory(node)->set_boot_uuid(
-            filtered_parts[0].source_boot_uuid);
-      }
-    }
-
     states_[node_index] = std::make_unique<State>(
         filtered_parts.size() == 0u
             ? nullptr
-            : std::make_unique<TimestampMapper>(std::move(filtered_parts)));
+            : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
+        node);
     State *state = states_[node_index].get();
-    state->set_event_loop(state->SetNodeEventLoopFactory(
-        event_loop_factory_->GetNodeEventLoopFactory(node)));
+    state->SetNodeEventLoopFactory(
+        event_loop_factory_->GetNodeEventLoopFactory(node));
 
     state->SetChannelCount(logged_configuration()->channels()->size());
     timestamp_mappers.emplace_back(state->timestamp_mapper());
@@ -372,7 +413,22 @@
         configuration::GetNodeIndex(configuration(), node);
     State *state = states_[node_index].get();
 
-    Register(state->event_loop());
+    // If we didn't find any log files with data in them, we won't ever get a
+    // callback or be live.  So skip the rest of the setup.
+    if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+      continue;
+    }
+    ++live_nodes_;
+
+    NodeEventLoopFactory *node_factory =
+        event_loop_factory_->GetNodeEventLoopFactory(node);
+    node_factory->OnStartup([this, state, node]() {
+      RegisterDuringStartup(state->MakeEventLoop(), node);
+    });
+    node_factory->OnShutdown([this, state, node]() {
+      RegisterDuringStartup(nullptr, node);
+      state->DestroyEventLoop();
+    });
   }
 
   if (live_nodes_ == 0) {
@@ -387,34 +443,6 @@
     state->SeedSortedMessages();
   }
 
-  // We want to start the log file at the last start time of the log files
-  // from all the nodes.  Compute how long each node's simulation needs to run
-  // to move time to this point.
-  distributed_clock::time_point start_time = distributed_clock::min_time;
-
-  // TODO(austin): We want an "OnStart" callback for each node rather than
-  // running until the last node.
-
-  for (std::unique_ptr<State> &state : states_) {
-    VLOG(1) << "Start time is " << state->monotonic_start_time(0)
-            << " for node " << MaybeNodeName(state->event_loop()->node())
-            << "now " << state->monotonic_now();
-    if (state->monotonic_start_time(0) == monotonic_clock::min_time) {
-      continue;
-    }
-    // And start computing the start time on the distributed clock now that
-    // that works.
-    start_time = std::max(
-        start_time, state->ToDistributedClock(state->monotonic_start_time(0)));
-  }
-
-  // TODO(austin): If a node doesn't have a start time, we might not queue
-  // enough.  If this happens, we'll explode with a frozen error eventually.
-
-  CHECK_GE(start_time, distributed_clock::epoch())
-      << ": Hmm, we have a node starting before the start of time.  Offset "
-         "everything.";
-
   // Forwarding is tracked per channel.  If it is enabled, we want to turn it
   // off.  Otherwise messages replayed will get forwarded across to the other
   // nodes, and also replayed on the other nodes.  This may not satisfy all
@@ -429,7 +457,7 @@
           states_[configuration::GetNodeIndex(configuration(), node)].get();
 
       const Channel *remapped_channel =
-          RemapChannel(state->event_loop(), channel);
+          RemapChannel(state->event_loop(), node, channel);
 
       event_loop_factory_->DisableForwarding(remapped_channel);
     }
@@ -438,6 +466,37 @@
     // from both the real message bridge and simulated message bridge.
     event_loop_factory_->DisableStatistics();
   }
+}
+
+void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
+  RegisterWithoutStarting(event_loop_factory);
+  // We want to start the log file at the last start time of the log files
+  // from all the nodes.  Compute how long each node's simulation needs to run
+  // to move time to this point.
+  distributed_clock::time_point start_time = distributed_clock::min_time;
+
+  // TODO(austin): We want an "OnStart" callback for each node rather than
+  // running until the last node.
+
+  for (std::unique_ptr<State> &state : states_) {
+    VLOG(1) << "Start time is " << state->monotonic_start_time(0)
+            << " for node " << MaybeNodeName(state->node()) << "now "
+            << state->monotonic_now();
+    if (state->monotonic_start_time(0) == monotonic_clock::min_time) {
+      continue;
+    }
+    // And start computing the start time on the distributed clock now that
+    // that works.
+    start_time = std::max(
+        start_time, state->ToDistributedClock(state->monotonic_start_time(0)));
+  }
+
+  // TODO(austin): If a node doesn't have a start time, we might not queue
+  // enough.  If this happens, we'll explode with a frozen error eventually.
+
+  CHECK_GE(start_time, distributed_clock::epoch())
+      << ": Hmm, we have a node starting before the start of time.  Offset "
+         "everything.";
 
   // While we are starting the system up, we might be relying on matching data
   // to timestamps on log files where the timestamp log file starts before the
@@ -478,23 +537,53 @@
 }
 
 void LogReader::Register(EventLoop *event_loop) {
+  Register(event_loop, event_loop->node());
+}
+
+void LogReader::Register(EventLoop *event_loop, const Node *node) {
   State *state =
-      states_[configuration::GetNodeIndex(configuration(), event_loop->node())]
-          .get();
+      states_[configuration::GetNodeIndex(configuration(), node)].get();
+
+  // If we didn't find any log files with data in them, we won't ever get a
+  // callback or be live.  So skip the rest of the setup.
+  if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+    return;
+  }
+  ++live_nodes_;
+
+  if (event_loop_factory_ != nullptr) {
+    event_loop_factory_->GetNodeEventLoopFactory(node)->OnStartup(
+        [this, event_loop, node]() {
+          RegisterDuringStartup(event_loop, node);
+        });
+  } else {
+    RegisterDuringStartup(event_loop, node);
+  }
+}
+
+void LogReader::RegisterDuringStartup(EventLoop *event_loop, const Node *node) {
+  if (event_loop) {
+    CHECK(event_loop->configuration() == configuration());
+  }
+
+  State *state =
+      states_[configuration::GetNodeIndex(configuration(), node)].get();
 
   state->set_event_loop(event_loop);
 
   // We don't run timing reports when trying to print out logged data, because
   // otherwise we would end up printing out the timing reports themselves...
   // This is only really relevant when we are replaying into a simulation.
-  event_loop->SkipTimingReport();
-  event_loop->SkipAosLog();
+  if (event_loop) {
+    event_loop->SkipTimingReport();
+    event_loop->SkipAosLog();
+  }
 
   for (size_t logged_channel_index = 0;
        logged_channel_index < logged_configuration()->channels()->size();
        ++logged_channel_index) {
     const Channel *channel = RemapChannel(
-        event_loop,
+        event_loop, node,
         logged_configuration()->channels()->Get(logged_channel_index));
 
     if (channel->logger() == LoggerConfig::NOT_LOGGED) {
@@ -502,45 +591,58 @@
     }
 
     message_bridge::NoncausalOffsetEstimator *filter = nullptr;
-    RemoteMessageSender *remote_timestamp_sender = nullptr;
 
     State *source_state = nullptr;
-
-    if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
-        configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
-      // We've got a message which is being forwarded to this node.
+    if (!configuration::ChannelIsSendableOnNode(channel, node) &&
+        configuration::ChannelIsReadableOnNode(channel, node)) {
       const Node *source_node = configuration::GetNode(
-          event_loop->configuration(), channel->source_node()->string_view());
-      filter = GetFilter(event_loop->node(), source_node);
+          configuration(), channel->source_node()->string_view());
 
-      // Delivery timestamps are supposed to be logged back on the source node.
-      // Configure remote timestamps to be sent.
-      const Connection *connection =
-          configuration::ConnectionToNode(channel, event_loop->node());
-      const bool delivery_time_is_logged =
-          configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
-                                                              source_node);
+      // We've got a message which is being forwarded to this node.
+      filter = GetFilter(node, source_node);
 
       source_state =
           states_[configuration::GetNodeIndex(configuration(), source_node)]
               .get();
-
-      if (delivery_time_is_logged) {
-        remote_timestamp_sender =
-            source_state->RemoteTimestampSender(channel, connection);
-      }
     }
 
-    state->SetChannel(
-        logged_channel_index,
-        configuration::ChannelIndex(event_loop->configuration(), channel),
-        event_loop->MakeRawSender(channel), filter, remote_timestamp_sender,
-        source_state);
+    // We are the source, and it is forwarded.
+    const bool is_forwarded =
+        configuration::ChannelIsSendableOnNode(channel, node) &&
+        configuration::ConnectionCount(channel);
+
+    state->SetChannel(logged_channel_index,
+                      configuration::ChannelIndex(configuration(), channel),
+                      event_loop ? event_loop->MakeRawSender(channel) : nullptr,
+                      filter, is_forwarded, source_state);
+
+    if (is_forwarded) {
+      const Node *source_node = configuration::GetNode(
+          configuration(), channel->source_node()->string_view());
+
+      for (const Connection *connection : *channel->destination_nodes()) {
+        const bool delivery_time_is_logged =
+            configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
+                                                                source_node);
+
+        if (delivery_time_is_logged) {
+          State *destination_state =
+              states_[configuration::GetNodeIndex(
+                          configuration(), connection->name()->string_view())]
+                  .get();
+          destination_state->SetRemoteTimestampSender(
+              logged_channel_index,
+              event_loop ? state->RemoteTimestampSender(channel, connection)
+                         : nullptr);
+        }
+      }
+    }
   }
 
-  // If we didn't find any log files with data in them, we won't ever get a
-  // callback or be live.  So skip the rest of the setup.
-  if (state->OldestMessageTime() == monotonic_clock::max_time) {
+  if (!event_loop) {
+    state->ClearRemoteTimestampSenders();
+    state->set_timer_handler(nullptr);
+    state->set_startup_timer(nullptr);
     return;
   }
 
@@ -548,7 +650,7 @@
     VLOG(1) << "Starting sending " << MaybeNodeName(state->event_loop()->node())
             << "at " << state->event_loop()->context().monotonic_event_time
             << " now " << state->monotonic_now();
-    if (state->OldestMessageTime() == monotonic_clock::max_time) {
+    if (state->OldestMessageTime() == BootTimestamp::max_time()) {
       --live_nodes_;
       VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
       if (exit_on_finish_ && live_nodes_ == 0) {
@@ -558,9 +660,9 @@
     }
 
     TimestampedMessage timestamped_message = state->PopOldest();
-    CHECK_EQ(timestamped_message.monotonic_event_time.boot, 0u);
-    CHECK_EQ(timestamped_message.monotonic_remote_time.boot, 0u);
-    CHECK_EQ(timestamped_message.monotonic_timestamp_time.boot, 0u);
+
+    CHECK_EQ(timestamped_message.monotonic_event_time.boot,
+             state->boot_count());
 
     const monotonic_clock::time_point monotonic_now =
         state->event_loop()->context().monotonic_event_time;
@@ -570,7 +672,8 @@
           << monotonic_now << " trying to send "
           << timestamped_message.monotonic_event_time << " failure "
           << state->DebugString();
-    } else if (BootTimestamp{.boot = 0u, .time = monotonic_now} !=
+    } else if (BootTimestamp{.boot = state->boot_count(),
+                             .time = monotonic_now} !=
                timestamped_message.monotonic_event_time) {
       LOG(WARNING) << "Check failed: monotonic_now == "
                       "timestamped_message.monotonic_event_time) ("
@@ -597,10 +700,13 @@
           // happen after the effect even though we know they are at the same
           // time.  I doubt anyone will notice for a bit, but we should really
           // fix that.
+          BootTimestamp monotonic_remote_now =
+              state->monotonic_remote_now(timestamped_message.channel_index);
           if (!FLAGS_skip_order_validation) {
-            CHECK_LE(
-                timestamped_message.monotonic_remote_time.time,
-                state->monotonic_remote_now(timestamped_message.channel_index))
+            CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
+                     monotonic_remote_now.boot);
+            CHECK_LE(timestamped_message.monotonic_remote_time,
+                     monotonic_remote_now)
                 << state->event_loop()->node()->name()->string_view() << " to "
                 << state->remote_node(timestamped_message.channel_index)
                        ->name()
@@ -610,9 +716,13 @@
                        logged_configuration()->channels()->Get(
                            timestamped_message.channel_index))
                 << " " << state->DebugString();
-          } else if (timestamped_message.monotonic_remote_time.time >
-                     state->monotonic_remote_now(
-                         timestamped_message.channel_index)) {
+          } else if (monotonic_remote_now.boot !=
+                     timestamped_message.monotonic_remote_time.boot) {
+            LOG(WARNING) << "Missmatched boots, " << monotonic_remote_now.boot
+                         << " vs "
+                         << timestamped_message.monotonic_remote_time.boot;
+          } else if (timestamped_message.monotonic_remote_time >
+                     monotonic_remote_now) {
             LOG(WARNING)
                 << "Check failed: timestamped_message.monotonic_remote_time < "
                    "state->monotonic_remote_now(timestamped_message.channel_"
@@ -665,7 +775,8 @@
                 << timestamped_message.channel_index << ", "
                 << configuration::CleanedChannelToString(
                        logged_configuration()->channels()->Get(
-                           timestamped_message.channel_index));
+                           timestamped_message.channel_index))
+                << " " << timestamped_message;
 
         // The user might be working with log files from 1 node but forgot to
         // configure the infrastructure to log data for a remote channel on that
@@ -713,7 +824,7 @@
         // rest.  It is confusing when part of your data gets replayed but not
         // all.  Read the rest of the messages and drop them on the floor while
         // doing some basic validation.
-        while (state->OldestMessageTime() != monotonic_clock::max_time) {
+        while (state->OldestMessageTime() != BootTimestamp::max_time()) {
           TimestampedMessage next = state->PopOldest();
           // Make sure that once we have seen the last message on a channel,
           // data doesn't start back up again.  If the user wants to play
@@ -727,8 +838,9 @@
               LOG(FATAL)
                   << "Found missing data in the middle of the log file on "
                      "channel "
-                  << next.channel_index << " Last "
-                  << last_message[next.channel_index] << state->DebugString();
+                  << next.channel_index << " " << next << " Last "
+                  << last_message[next.channel_index] << " "
+                  << state->DebugString();
             }
           }
         }
@@ -745,16 +857,26 @@
                           {.multi_line = false, .max_vector_size = 100});
     }
 
-    const monotonic_clock::time_point next_time = state->OldestMessageTime();
-    if (next_time != monotonic_clock::max_time) {
+    const BootTimestamp next_time = state->OldestMessageTime();
+    if (next_time != BootTimestamp::max_time()) {
+      if (next_time.boot != state->boot_count()) {
+        VLOG(1) << "Next message for "
+                << MaybeNodeName(state->event_loop()->node())
+                << "is on the next boot, " << next_time << " now is "
+                << state->monotonic_now();
+        CHECK(event_loop_factory_);
+        state->RunOnEnd();
+        return;
+      }
       VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
-              << "wakeup for " << next_time << "("
-              << state->ToDistributedClock(next_time)
+              << "wakeup for " << next_time.time << "("
+              << state->ToDistributedClock(next_time.time)
               << " distributed), now is " << state->monotonic_now();
-      state->Setup(next_time);
+      state->Setup(next_time.time);
     } else {
       VLOG(1) << MaybeNodeName(state->event_loop()->node())
               << "No next message, scheduling shutdown";
+      state->RunOnEnd();
       // Set a timer up immediately after now to die. If we don't do this,
       // then the senders waiting on the message we just read will never get
       // called.
@@ -769,10 +891,15 @@
             << state->monotonic_now();
   }));
 
-  ++live_nodes_;
-
-  if (state->OldestMessageTime() != monotonic_clock::max_time) {
-    event_loop->OnRun([state]() { state->Setup(state->OldestMessageTime()); });
+  if (state->OldestMessageTime() != BootTimestamp::max_time()) {
+    state->set_startup_timer(
+        event_loop->AddTimer([state]() { state->RunOnStart(); }));
+    event_loop->OnRun([state]() {
+      BootTimestamp next_time = state->OldestMessageTime();
+      CHECK_EQ(next_time.boot, state->boot_count());
+      state->Setup(next_time.time);
+      state->SetupStartupTimer();
+    });
   }
 }
 
@@ -1101,6 +1228,7 @@
 }
 
 const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
+                                       const Node *node,
                                        const Channel *channel) {
   std::string_view channel_name = channel->name()->string_view();
   std::string_view channel_type = channel->type()->string_view();
@@ -1115,8 +1243,8 @@
 
   VLOG(2) << "Going to remap channel " << channel_name << " " << channel_type;
   const Channel *remapped_channel = configuration::GetChannel(
-      event_loop->configuration(), channel_name, channel_type,
-      event_loop->name(), event_loop->node());
+      configuration(), channel_name, channel_type,
+      event_loop ? event_loop->name() : "log_reader", node);
 
   CHECK(remapped_channel != nullptr)
       << ": Unable to send {\"name\": \"" << channel_name << "\", \"type\": \""
@@ -1125,8 +1253,9 @@
   return remapped_channel;
 }
 
-LogReader::State::State(std::unique_ptr<TimestampMapper> timestamp_mapper)
-    : timestamp_mapper_(std::move(timestamp_mapper)) {}
+LogReader::State::State(std::unique_ptr<TimestampMapper> timestamp_mapper,
+                        const Node *node)
+    : timestamp_mapper_(std::move(timestamp_mapper)), node_(node) {}
 
 void LogReader::State::AddPeer(State *peer) {
   if (timestamp_mapper_ && peer->timestamp_mapper_) {
@@ -1134,12 +1263,9 @@
   }
 }
 
-EventLoop *LogReader::State::SetNodeEventLoopFactory(
+void LogReader::State::SetNodeEventLoopFactory(
     NodeEventLoopFactory *node_event_loop_factory) {
   node_event_loop_factory_ = node_event_loop_factory;
-  event_loop_unique_ptr_ =
-      node_event_loop_factory_->MakeEventLoop("log_reader");
-  return event_loop_unique_ptr_.get();
 }
 
 void LogReader::State::SetChannelCount(size_t count) {
@@ -1151,22 +1277,23 @@
   queue_index_map_.resize(count);
 }
 
+void LogReader::State::SetRemoteTimestampSender(
+    size_t logged_channel_index, RemoteMessageSender *remote_timestamp_sender) {
+  remote_timestamp_senders_[logged_channel_index] = remote_timestamp_sender;
+}
+
 void LogReader::State::SetChannel(
     size_t logged_channel_index, size_t factory_channel_index,
     std::unique_ptr<RawSender> sender,
-    message_bridge::NoncausalOffsetEstimator *filter,
-    RemoteMessageSender *remote_timestamp_sender, State *source_state) {
+    message_bridge::NoncausalOffsetEstimator *filter, bool is_forwarded,
+    State *source_state) {
   channels_[logged_channel_index] = std::move(sender);
   filters_[logged_channel_index] = filter;
-  remote_timestamp_senders_[logged_channel_index] = remote_timestamp_sender;
+  channel_source_state_[logged_channel_index] = source_state;
 
-  if (source_state) {
-    channel_source_state_[logged_channel_index] = source_state;
-
-    if (remote_timestamp_sender != nullptr) {
-      source_state->queue_index_map_[logged_channel_index] =
-          std::make_unique<std::vector<State::ContiguousSentTimestamp>>();
-    }
+  if (is_forwarded) {
+    queue_index_map_[logged_channel_index] =
+        std::make_unique<std::vector<State::ContiguousSentTimestamp>>();
   }
 
   factory_channel_index_[logged_channel_index] = factory_channel_index;
@@ -1174,12 +1301,14 @@
 
 bool LogReader::State::Send(const TimestampedMessage &timestamped_message) {
   aos::RawSender *sender = channels_[timestamped_message.channel_index].get();
+  CHECK(sender);
   uint32_t remote_queue_index = 0xffffffff;
 
   if (remote_timestamp_senders_[timestamped_message.channel_index] != nullptr) {
+    State *source_state =
+        CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index]);
     std::vector<ContiguousSentTimestamp> *queue_index_map = CHECK_NOTNULL(
-        CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index])
-            ->queue_index_map_[timestamped_message.channel_index]
+        source_state->queue_index_map_[timestamped_message.channel_index]
             .get());
 
     struct SentTimestamp {
@@ -1187,10 +1316,11 @@
       uint32_t queue_index;
     } search;
 
-    CHECK_EQ(timestamped_message.monotonic_remote_time.boot, 0u);
+    CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
+             source_state->boot_count());
     search.monotonic_event_time =
         timestamped_message.monotonic_remote_time.time;
-    search.queue_index = timestamped_message.remote_queue_index;
+    search.queue_index = timestamped_message.remote_queue_index.index;
 
     // Find the sent time if available.
     auto element = std::lower_bound(
@@ -1220,28 +1350,30 @@
     // other node isn't done yet.  So there is no send time, but there is a
     // receive time.
     if (element != queue_index_map->end()) {
-      CHECK_EQ(timestamped_message.monotonic_remote_time.boot, 0u);
+      CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
+               source_state->boot_count());
 
       CHECK_GE(timestamped_message.monotonic_remote_time.time,
                element->starting_monotonic_event_time);
       CHECK_LE(timestamped_message.monotonic_remote_time.time,
                element->ending_monotonic_event_time);
-      CHECK_GE(timestamped_message.remote_queue_index,
+      CHECK_GE(timestamped_message.remote_queue_index.index,
                element->starting_queue_index);
-      CHECK_LE(timestamped_message.remote_queue_index,
+      CHECK_LE(timestamped_message.remote_queue_index.index,
                element->ending_queue_index);
 
-      remote_queue_index = timestamped_message.remote_queue_index +
+      remote_queue_index = timestamped_message.remote_queue_index.index +
                            element->actual_queue_index -
                            element->starting_queue_index;
     } else {
       VLOG(1) << "No timestamp match in the map.";
     }
+    CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
+             source_state->boot_count());
   }
 
   // Send!  Use the replayed queue index here instead of the logged queue index
   // for the remote queue index.  This makes re-logging work.
-  CHECK_EQ(timestamped_message.monotonic_remote_time.boot, 0u);
   const bool sent = sender->Send(
       timestamped_message.data.message().data()->Data(),
       timestamped_message.data.message().data()->size(),
@@ -1255,15 +1387,15 @@
   if (!sent) return false;
 
   if (queue_index_map_[timestamped_message.channel_index]) {
+    CHECK_EQ(timestamped_message.monotonic_event_time.boot, boot_count());
     if (queue_index_map_[timestamped_message.channel_index]->empty()) {
       // Nothing here, start a range with 0 length.
       ContiguousSentTimestamp timestamp;
-      CHECK_EQ(timestamped_message.monotonic_event_time.boot, 0u);
       timestamp.starting_monotonic_event_time =
           timestamp.ending_monotonic_event_time =
               timestamped_message.monotonic_event_time.time;
       timestamp.starting_queue_index = timestamp.ending_queue_index =
-          timestamped_message.queue_index;
+          timestamped_message.queue_index.index;
       timestamp.actual_queue_index = sender->sent_queue_index();
       queue_index_map_[timestamped_message.channel_index]->emplace_back(
           timestamp);
@@ -1273,20 +1405,18 @@
       ContiguousSentTimestamp *back =
           &queue_index_map_[timestamped_message.channel_index]->back();
       if ((back->starting_queue_index - back->actual_queue_index) ==
-          (timestamped_message.queue_index - sender->sent_queue_index())) {
-        back->ending_queue_index = timestamped_message.queue_index;
-        CHECK_EQ(timestamped_message.monotonic_event_time.boot, 0u);
+          (timestamped_message.queue_index.index - sender->sent_queue_index())) {
+        back->ending_queue_index = timestamped_message.queue_index.index;
         back->ending_monotonic_event_time =
             timestamped_message.monotonic_event_time.time;
       } else {
         // Otherwise, make a new one.
-        CHECK_EQ(timestamped_message.monotonic_event_time.boot, 0u);
         ContiguousSentTimestamp timestamp;
         timestamp.starting_monotonic_event_time =
             timestamp.ending_monotonic_event_time =
                 timestamped_message.monotonic_event_time.time;
         timestamp.starting_queue_index = timestamp.ending_queue_index =
-            timestamped_message.queue_index;
+            timestamped_message.queue_index.index;
         timestamp.actual_queue_index = sender->sent_queue_index();
         queue_index_map_[timestamped_message.channel_index]->emplace_back(
             timestamp);
@@ -1298,6 +1428,9 @@
     // map.
   } else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
              nullptr) {
+    State *source_state =
+        CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index]);
+
     flatbuffers::FlatBufferBuilder fbb;
     fbb.ForceDefaults(true);
     flatbuffers::Offset<flatbuffers::Vector<uint8_t>> boot_uuid_offset =
@@ -1316,7 +1449,8 @@
         sender->realtime_sent_time().time_since_epoch().count());
     message_header_builder.add_queue_index(sender->sent_queue_index());
 
-    CHECK_EQ(timestamped_message.monotonic_remote_time.boot, 0u);
+    CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
+             source_state->boot_count());
     message_header_builder.add_monotonic_remote_time(
         timestamped_message.monotonic_remote_time.time.time_since_epoch()
             .count());
@@ -1328,10 +1462,10 @@
 
     fbb.Finish(message_header_builder.Finish());
 
-    CHECK_EQ(timestamped_message.monotonic_timestamp_time.boot, 0u);
     remote_timestamp_senders_[timestamped_message.channel_index]->Send(
         FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
-        timestamped_message.monotonic_timestamp_time.time);
+        timestamped_message.monotonic_timestamp_time,
+        source_state->boot_count());
   }
 
   return true;
@@ -1362,7 +1496,7 @@
 
 void LogReader::RemoteMessageSender::Send(
     FlatbufferDetachedBuffer<RemoteMessage> remote_message,
-    monotonic_clock::time_point monotonic_timestamp_time) {
+    BootTimestamp monotonic_timestamp_time, size_t source_boot_count) {
   // There are 2 variants of logs.
   //   1) Logs without monotonic_timestamp_time
   //   2) Logs with monotonic_timestamp_time
@@ -1386,20 +1520,22 @@
   // timestamp to distinguish 2 and 3, and ignore 1.  If we don't have a
   // monotonic_timestamp_time, this means the message was logged locally and
   // remote timestamps can be ignored.
-  if (monotonic_timestamp_time == monotonic_clock::min_time) {
+  if (monotonic_timestamp_time == BootTimestamp::min_time()) {
     return;
   }
 
+  CHECK_EQ(monotonic_timestamp_time.boot, source_boot_count);
+
   remote_timestamps_.emplace(
       std::upper_bound(
           remote_timestamps_.begin(), remote_timestamps_.end(),
-          monotonic_timestamp_time,
+          monotonic_timestamp_time.time,
           [](const aos::monotonic_clock::time_point monotonic_timestamp_time,
              const Timestamp &timestamp) {
             return monotonic_timestamp_time <
                    timestamp.monotonic_timestamp_time;
           }),
-      std::move(remote_message), monotonic_timestamp_time);
+      std::move(remote_message), monotonic_timestamp_time.time);
   ScheduleTimestamp();
 }
 
@@ -1475,18 +1611,8 @@
   timestamp_mapper_->PopFront();
   SeedSortedMessages();
 
-  CHECK_EQ(result.monotonic_remote_time.boot, 0u);
+  CHECK_EQ(result.monotonic_event_time.boot, boot_count());
 
-  if (result.monotonic_remote_time.time != monotonic_clock::min_time) {
-    message_bridge::NoncausalOffsetEstimator *filter =
-        filters_[result.channel_index];
-    CHECK(filter != nullptr);
-
-    // TODO(austin): We probably want to push this down into the timestamp
-    // mapper directly.
-    // TODO(austin): This hard-codes the boot to 0.  We need to fix that.
-    filter->Pop(event_loop_->node(), {0, event_loop_->monotonic_now()});
-  }
   VLOG(1) << "Popped " << result
           << configuration::CleanedChannelToString(
                  event_loop_->configuration()->channels()->Get(
@@ -1494,18 +1620,17 @@
   return result;
 }
 
-monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
+BootTimestamp LogReader::State::OldestMessageTime() const {
   if (timestamp_mapper_ == nullptr) {
-    return monotonic_clock::max_time;
+    return BootTimestamp::max_time();
   }
   TimestampedMessage *result_ptr = timestamp_mapper_->Front();
   if (result_ptr == nullptr) {
-    return monotonic_clock::max_time;
+    return BootTimestamp::max_time();
   }
-  CHECK_EQ(result_ptr->monotonic_event_time.boot, 0u);
   VLOG(2) << MaybeNodeName(event_loop_->node()) << "oldest message at "
           << result_ptr->monotonic_event_time.time;
-  return result_ptr->monotonic_event_time.time;
+  return result_ptr->monotonic_event_time;
 }
 
 void LogReader::State::SeedSortedMessages() {
@@ -1516,6 +1641,9 @@
 }
 
 void LogReader::State::Deregister() {
+  if (started_ && !stopped_) {
+    RunOnEnd();
+  }
   for (size_t i = 0; i < channels_.size(); ++i) {
     channels_[i].reset();
   }
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 5cfb5df..8293956 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -73,6 +73,10 @@
   // below, but can be anything as long as the locations needed to send
   // everything are available.
   void Register(SimulatedEventLoopFactory *event_loop_factory);
+  // Registers all the callbacks to send the log file data out to an event loop
+  // factory.  This does not start replaying or change the current distributed
+  // time of the factory.  It does change the monotonic clocks to be right.
+  void RegisterWithoutStarting(SimulatedEventLoopFactory *event_loop_factory);
   // Creates an SimulatedEventLoopFactory accessible via event_loop_factory(),
   // and then calls Register.
   void Register();
@@ -80,6 +84,13 @@
   // only useful when replaying live.
   void Register(EventLoop *event_loop);
 
+  // Called whenever a log file starts for a node.
+  void OnStart(std::function<void()> fn);
+  void OnStart(const Node *node, std::function<void()> fn);
+  // Called whenever a log file ends for a node.
+  void OnEnd(std::function<void()> fn);
+  void OnEnd(const Node *node, std::function<void()> fn);
+
   // Unregisters the senders. You only need to call this if you separately
   // supplied an event loop or event loop factory and the lifetimes are such
   // that they need to be explicitly destroyed before the LogReader destructor
@@ -175,7 +186,11 @@
   }
 
  private:
-  const Channel *RemapChannel(const EventLoop *event_loop,
+  void Register(EventLoop *event_loop, const Node *node);
+
+  void RegisterDuringStartup(EventLoop *event_loop, const Node *node);
+
+  const Channel *RemapChannel(const EventLoop *event_loop, const Node *node,
                               const Channel *channel);
 
   // Queues at least max_out_of_order_duration_ messages into channels_.
@@ -206,7 +221,7 @@
     // send it immediately.
     void Send(
         FlatbufferDetachedBuffer<message_bridge::RemoteMessage> remote_message,
-        monotonic_clock::time_point monotonic_timestamp_time);
+        BootTimestamp monotonic_timestamp_time, size_t source_boot_count);
 
    private:
     // Handles actually sending the timestamp if we were delayed.
@@ -239,7 +254,7 @@
   // State per node.
   class State {
    public:
-    State(std::unique_ptr<TimestampMapper> timestamp_mapper);
+    State(std::unique_ptr<TimestampMapper> timestamp_mapper, const Node *node);
 
     // Connects up the timestamp mappers.
     void AddPeer(State *peer);
@@ -251,12 +266,44 @@
     TimestampedMessage PopOldest();
 
     // Returns the monotonic time of the oldest message.
-    monotonic_clock::time_point OldestMessageTime() const;
+    BootTimestamp OldestMessageTime() const;
+
+    size_t boot_count() const {
+      // If we are replaying directly into an event loop, we can't reboot.  So
+      // we will stay stuck on the 0th boot.
+      if (!node_event_loop_factory_) return 0u;
+      return node_event_loop_factory_->boot_count();
+    }
 
     // Primes the queues inside State.  Should be called before calling
     // OldestMessageTime.
     void SeedSortedMessages();
 
+    void SetupStartupTimer() {
+      const monotonic_clock::time_point start_time =
+          monotonic_start_time(boot_count());
+      if (start_time == monotonic_clock::min_time) {
+        LOG(ERROR)
+            << "No start time, skipping, please figure out when this happens";
+        RunOnStart();
+        return;
+      }
+      CHECK_GT(start_time, event_loop_->monotonic_now());
+      startup_timer_->Setup(start_time);
+    }
+
+    void set_startup_timer(TimerHandler *timer_handler) {
+      startup_timer_ = timer_handler;
+      if (startup_timer_) {
+        if (event_loop_->node() != nullptr) {
+          startup_timer_->set_name(absl::StrCat(
+              event_loop_->node()->name()->string_view(), "_startup"));
+        } else {
+          startup_timer_->set_name("startup");
+        }
+      }
+    }
+
     // Returns the starting time for this node.
     monotonic_clock::time_point monotonic_start_time(size_t boot_count) const {
       return timestamp_mapper_
@@ -271,13 +318,19 @@
 
     // Sets the node event loop factory for replaying into a
     // SimulatedEventLoopFactory.  Returns the EventLoop to use.
-    EventLoop *SetNodeEventLoopFactory(
-        NodeEventLoopFactory *node_event_loop_factory);
+    void SetNodeEventLoopFactory(NodeEventLoopFactory *node_event_loop_factory);
 
     // Sets and gets the event loop to use.
     void set_event_loop(EventLoop *event_loop) { event_loop_ = event_loop; }
     EventLoop *event_loop() { return event_loop_; }
 
+    const Node *node() const { return node_; }
+
+    void Register(EventLoop *event_loop);
+
+    void OnStart(std::function<void()> fn);
+    void OnEnd(std::function<void()> fn);
+
     // Sets the current realtime offset from the monotonic clock for this node
     // (if we are on a simulated event loop).
     void SetRealtimeOffset(monotonic_clock::time_point monotonic_time,
@@ -302,19 +355,29 @@
 
     // Returns the current time on the remote node which sends messages on
     // channel_index.
-    monotonic_clock::time_point monotonic_remote_now(size_t channel_index) {
-      return channel_source_state_[channel_index]
-          ->node_event_loop_factory_->monotonic_now();
+    BootTimestamp monotonic_remote_now(size_t channel_index) {
+      State *s = channel_source_state_[channel_index];
+      return BootTimestamp{
+          .boot = s->boot_count(),
+          .time = s->node_event_loop_factory_->monotonic_now()};
     }
 
     // Returns the start time of the remote for the provided channel.
     monotonic_clock::time_point monotonic_remote_start_time(
-        size_t boot_count,
-        size_t channel_index) {
+        size_t boot_count, size_t channel_index) {
       return channel_source_state_[channel_index]->monotonic_start_time(
           boot_count);
     }
 
+    void DestroyEventLoop() { event_loop_unique_ptr_.reset(); }
+
+    EventLoop *MakeEventLoop() {
+      CHECK(!event_loop_unique_ptr_);
+      event_loop_unique_ptr_ =
+          node_event_loop_factory_->MakeEventLoop("log_reader");
+      return event_loop_unique_ptr_.get();
+    }
+
     distributed_clock::time_point RemoteToDistributedClock(
         size_t channel_index, monotonic_clock::time_point time) {
       return channel_source_state_[channel_index]
@@ -337,15 +400,30 @@
     void SetChannel(size_t logged_channel_index, size_t factory_channel_index,
                     std::unique_ptr<RawSender> sender,
                     message_bridge::NoncausalOffsetEstimator *filter,
-                    RemoteMessageSender *remote_timestamp_sender,
-                    State *source_state);
+                    bool is_forwarded, State *source_state);
+
+    void SetRemoteTimestampSender(size_t logged_channel_index,
+                                  RemoteMessageSender *remote_timestamp_sender);
+
+    void RunOnStart();
+    void RunOnEnd();
 
     // Unregisters everything so we can destory the event loop.
+    // TODO(austin): Is this needed?  OnShutdown should be able to serve this
+    // need.
     void Deregister();
 
     // Sets the current TimerHandle for the replay callback.
     void set_timer_handler(TimerHandler *timer_handler) {
       timer_handler_ = timer_handler;
+      if (timer_handler_) {
+        if (event_loop_->node() != nullptr) {
+          timer_handler_->set_name(absl::StrCat(
+              event_loop_->node()->name()->string_view(), "_main"));
+        } else {
+          timer_handler_->set_name("main");
+        }
+      }
     }
 
     // Sets the next wakeup time on the replay callback.
@@ -364,6 +442,11 @@
       return timestamp_mapper_->DebugString();
     }
 
+    void ClearRemoteTimestampSenders() {
+      channel_timestamp_loggers_.clear();
+      timestamp_loggers_.clear();
+    }
+
    private:
     // Log file.
     std::unique_ptr<TimestampMapper> timestamp_mapper_;
@@ -408,9 +491,11 @@
     NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
     std::unique_ptr<EventLoop> event_loop_unique_ptr_;
     // Event loop.
+    const Node *node_ = nullptr;
     EventLoop *event_loop_ = nullptr;
     // And timer used to send messages.
-    TimerHandler *timer_handler_;
+    TimerHandler *timer_handler_ = nullptr;
+    TimerHandler *startup_timer_ = nullptr;
 
     // Filters (or nullptr if it isn't a forwarded channel) for each channel.
     // This corresponds to the object which is shared among all the channels
@@ -432,6 +517,12 @@
     // is the channel that timestamps are published to.
     absl::btree_map<const Channel *, std::shared_ptr<RemoteMessageSender>>
         timestamp_loggers_;
+
+    std::vector<std::function<void()>> on_starts_;
+    std::vector<std::function<void()>> on_ends_;
+
+    bool stopped_ = false;
+    bool started_ = false;
   };
 
   // Node index -> State.
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index fffc886..4e34b61 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -33,6 +33,7 @@
               ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
                     "/aos")
               : aos::Fetcher<message_bridge::ServerStatistics>()) {
+  timer_handler_->set_name("channel_poll");
   VLOG(1) << "Creating logger for " << FlatbufferToJson(node_);
 
   std::map<const Channel *, const Node *> timestamp_logger_channels;
@@ -289,9 +290,7 @@
 
   // Clear out any old timestamps in case we are re-starting logging.
   for (size_t i = 0; i < configuration::NodesCount(configuration_); ++i) {
-    log_namer_->SetStartTimes(
-        i, monotonic_clock::min_time, realtime_clock::min_time,
-        monotonic_clock::min_time, realtime_clock::min_time);
+    log_namer_->ClearStartTimes();
   }
 
   const aos::monotonic_clock::time_point fetch_time =
@@ -387,8 +386,7 @@
             node, node_index,
             server_statistics_fetcher_.context().monotonic_event_time,
             server_statistics_fetcher_.context().realtime_event_time)) {
-      VLOG(1) << "Rotating because timestamps changed";
-      log_namer_->Rotate(node);
+      VLOG(1) << "Timestamps changed on " << aos::FlatbufferToJson(node);
     }
   }
 }
@@ -398,16 +396,16 @@
     aos::monotonic_clock::time_point monotonic_start_time,
     aos::realtime_clock::time_point realtime_start_time) {
   // Bail early if the start times are already set.
-  if (log_namer_->monotonic_start_time(node_index) !=
-      monotonic_clock::min_time) {
-    return false;
-  }
-  if (node_ == node ||
-      !configuration::MultiNode(configuration_)) {
+  if (node_ == node || !configuration::MultiNode(configuration_)) {
+    if (log_namer_->monotonic_start_time(node_index,
+                                         event_loop_->boot_uuid()) !=
+        monotonic_clock::min_time) {
+      return false;
+    }
     // There are no offsets to compute for ourself, so always succeed.
-    log_namer_->SetStartTimes(node_index, monotonic_start_time,
-                              realtime_start_time, monotonic_start_time,
-                              realtime_start_time);
+    log_namer_->SetStartTimes(node_index, event_loop_->boot_uuid(),
+                              monotonic_start_time, realtime_start_time,
+                              monotonic_start_time, realtime_start_time);
     return true;
   } else if (server_statistics_fetcher_.get() != nullptr) {
     // We must be a remote node now.  Look for the connection and see if it is
@@ -432,9 +430,20 @@
         break;
       }
 
+      const UUID boot_uuid =
+          UUID::FromString(connection->boot_uuid()->string_view());
+
+      if (log_namer_->monotonic_start_time(node_index, boot_uuid) !=
+          monotonic_clock::min_time) {
+        break;
+      }
+
+      VLOG(1) << "Updating start time for "
+              << aos::FlatbufferToJson(connection);
+
       // Found it and it is connected.  Compensate and go.
       log_namer_->SetStartTimes(
-          node_index,
+          node_index, boot_uuid,
           monotonic_start_time +
               std::chrono::nanoseconds(connection->monotonic_offset()),
           realtime_start_time, monotonic_start_time, realtime_start_time);
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index e27cecb..8e536c8 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -785,6 +785,7 @@
     new_file.realtime_start_time = logs.second.realtime_start_time;
     new_file.name = logs.second.name;
     new_file.corrupted = corrupted;
+    new_file.boots = boot_counts;
     bool seen_part = false;
     std::string config_sha256;
     for (std::pair<const std::pair<std::string, std::string>, UnsortedLogParts>
diff --git a/aos/events/logging/logfile_sorting.h b/aos/events/logging/logfile_sorting.h
index 1d05dc7..857f351 100644
--- a/aos/events/logging/logfile_sorting.h
+++ b/aos/events/logging/logfile_sorting.h
@@ -102,6 +102,7 @@
   // object for log files with the same config.
   std::string config_sha256;
   std::shared_ptr<const aos::Configuration> config;
+  std::shared_ptr<const Boots> boots;
 };
 
 std::ostream &operator<<(std::ostream &stream, const LogFile &file);
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 20e38d2..9522351 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -57,8 +57,9 @@
     if (fd_ == -1 && errno == ENOSPC) {
       ran_out_of_space_ = true;
     } else {
-      PCHECK(fd_ != -1) << ": Failed to open " << filename << " for writing";
-      VLOG(1) << "Opened " << filename << " for writing";
+      PCHECK(fd_ != -1) << ": Failed to open " << this->filename()
+                        << " for writing";
+      VLOG(1) << "Opened " << this->filename() << " for writing";
     }
   }
 }
@@ -147,7 +148,7 @@
     }
   }
   fd_ = -1;
-  VLOG(1) << "Closed " << filename_;
+  VLOG(1) << "Closed " << filename();
 }
 
 void DetachedBufferWriter::Flush() {
@@ -651,7 +652,7 @@
      << ", .queue_index=" << m.queue_index
      << ", .monotonic_event_time=" << m.monotonic_event_time
      << ", .realtime_event_time=" << m.realtime_event_time;
-  if (m.remote_queue_index != 0xffffffff) {
+  if (m.remote_queue_index != BootQueueIndex::Invalid()) {
     os << ", .remote_queue_index=" << m.remote_queue_index;
   }
   if (m.monotonic_remote_time != BootTimestamp::min_time()) {
@@ -713,7 +714,9 @@
 
       messages_.insert(Message{
           .channel_index = m.value().message().channel_index(),
-          .queue_index = m.value().message().queue_index(),
+          .queue_index =
+              BootQueueIndex{.boot = parts().boot_count,
+                             .index = m.value().message().queue_index()},
           .timestamp =
               BootTimestamp{
                   .boot = parts().boot_count,
@@ -986,7 +989,7 @@
       .monotonic_event_time = m->timestamp,
       .realtime_event_time = aos::realtime_clock::time_point(
           std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
-      .remote_queue_index = 0xffffffff,
+      .remote_queue_index = BootQueueIndex::Invalid(),
       .monotonic_remote_time = BootTimestamp::min_time(),
       .realtime_remote_time = realtime_clock::min_time,
       .monotonic_timestamp_time = BootTimestamp::min_time(),
@@ -1070,7 +1073,9 @@
         .monotonic_event_time = m->timestamp,
         .realtime_event_time = aos::realtime_clock::time_point(
             std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
-        .remote_queue_index = m->data.message().remote_queue_index(),
+        .remote_queue_index =
+            BootQueueIndex{.boot = m->monotonic_remote_boot,
+                           .index = m->data.message().remote_queue_index()},
         .monotonic_remote_time =
             {m->monotonic_remote_boot,
              monotonic_clock::time_point(std::chrono::nanoseconds(
@@ -1139,8 +1144,9 @@
 Message TimestampMapper::MatchingMessageFor(const Message &message) {
   // Figure out what queue index we are looking for.
   CHECK(message.data.message().has_remote_queue_index());
-  const uint32_t remote_queue_index =
-      message.data.message().remote_queue_index();
+  const BootQueueIndex remote_queue_index =
+      BootQueueIndex{.boot = message.monotonic_remote_boot,
+                    .index = message.data.message().remote_queue_index()};
 
   CHECK(message.data.message().has_monotonic_remote_time());
   CHECK(message.data.message().has_realtime_remote_time());
@@ -1199,11 +1205,17 @@
   // The algorithm below is constant time with some assumptions.  We need there
   // to be no missing messages in the data stream.  This also assumes a queue
   // hasn't wrapped.  That is conservative, but should let us get started.
-  if (data_queue->back().queue_index - data_queue->front().queue_index + 1u ==
-      data_queue->size()) {
+  if (data_queue->back().queue_index.boot ==
+          data_queue->front().queue_index.boot &&
+      (data_queue->back().queue_index.index -
+           data_queue->front().queue_index.index + 1u ==
+       data_queue->size())) {
+    CHECK_EQ(remote_queue_index.boot, data_queue->front().queue_index.boot);
     // Pull the data out and confirm that the timestamps match as expected.
-    Message result = std::move(
-        (*data_queue)[remote_queue_index - data_queue->front().queue_index]);
+    //
+    // TODO(austin): Move if not reliable.
+    Message result = (*data_queue)[remote_queue_index.index -
+                                   data_queue->front().queue_index.index];
 
     CHECK_EQ(result.timestamp, monotonic_remote_time)
         << ": Queue index matches, but timestamp doesn't.  Please investigate!";
@@ -1213,15 +1225,20 @@
         << ": Queue index matches, but timestamp doesn't.  Please investigate!";
     // Now drop the data off the front.  We have deduplicated timestamps, so we
     // are done.  And all the data is in order.
-    data_queue->erase(data_queue->begin(),
-                      data_queue->begin() + (1 + remote_queue_index -
-                                             data_queue->front().queue_index));
+    data_queue->erase(
+        data_queue->begin(),
+        data_queue->begin() +
+            (remote_queue_index.index - data_queue->front().queue_index.index));
     return result;
   } else {
-    auto it = std::find_if(data_queue->begin(), data_queue->end(),
-                           [remote_queue_index](const Message &m) {
-                             return m.queue_index == remote_queue_index;
-                           });
+    // TODO(austin): Binary search.
+    auto it = std::find_if(
+        data_queue->begin(), data_queue->end(),
+        [remote_queue_index,
+         remote_boot = monotonic_remote_time.boot](const Message &m) {
+          return m.queue_index == remote_queue_index &&
+                 m.timestamp.boot == remote_boot;
+        });
     if (it == data_queue->end()) {
       return Message{
           .channel_index = message.channel_index,
@@ -1241,6 +1258,8 @@
              realtime_remote_time)
         << ": Queue index matches, but timestamp doesn't.  Please investigate!";
 
+    // TODO(austin): We still go in order, so we can erase from the beginning to
+    // our iterator minus 1.  That'll keep 1 in the queue.
     data_queue->erase(it);
 
     return result;
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 56b582f..26fc74d 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -373,7 +373,10 @@
   // The channel.
   uint32_t channel_index = 0xffffffff;
   // The local queue index.
-  uint32_t queue_index = 0xffffffff;
+  // TODO(austin): Technically the boot inside queue_index is redundant with
+  // timestamp.  In practice, it is less error-prone to duplicate it.  Maybe a
+  // function to return the combined struct?
+  BootQueueIndex queue_index;
   // The local timestamp.
   BootTimestamp timestamp;
 
@@ -398,11 +401,11 @@
 struct TimestampedMessage {
   uint32_t channel_index = 0xffffffff;
 
-  uint32_t queue_index = 0xffffffff;
+  BootQueueIndex queue_index;
   BootTimestamp monotonic_event_time;
   realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
 
-  uint32_t remote_queue_index = 0xffffffff;
+  BootQueueIndex remote_queue_index;
   BootTimestamp monotonic_remote_time;
   realtime_clock::time_point realtime_remote_time = realtime_clock::min_time;
 
@@ -598,8 +601,6 @@
   size_t node() const { return boot_merger_.node(); }
 
   // The start time of this log.
-  // TODO(austin): This concept is probably wrong...  We have start times per
-  // boot, and an order of them.
   monotonic_clock::time_point monotonic_start_time(size_t boot) const {
     return boot_merger_.monotonic_start_time(boot);
   }
@@ -695,6 +696,15 @@
   // Queues m into matched_messages_.
   void QueueMessage(Message *m);
 
+  // Returns the name of the node this class is sorting for.
+  std::string_view node_name() const {
+    return configuration_->has_nodes() ? configuration_->nodes()
+                                             ->Get(boot_merger_.node())
+                                             ->name()
+                                             ->string_view()
+                                       : "(single node)";
+  }
+
   // The node merger to source messages from.
   BootMerger boot_merger_;
 
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 763bb7f..1c67312 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -225,14 +225,14 @@
   const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
 
   Message m1{.channel_index = 0,
-             .queue_index = 0,
+             .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
              .timestamp =
                  BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
              .monotonic_remote_boot = 0xffffff,
              .monotonic_timestamp_boot = 0xffffff,
              .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
   Message m2{.channel_index = 0,
-             .queue_index = 0,
+             .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
              .timestamp =
                  BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
              .monotonic_remote_boot = 0xffffff,
@@ -253,8 +253,8 @@
 
   m1.channel_index = 0;
   m2.channel_index = 0;
-  m1.queue_index = 0;
-  m2.queue_index = 1;
+  m1.queue_index.index = 0u;
+  m2.queue_index.index = 1u;
 
   EXPECT_LT(m1, m2);
   EXPECT_GE(m2, m1);
@@ -695,7 +695,8 @@
   ASSERT_TRUE(parts_sorter.Front() != nullptr);
   parts_sorter.PopFront();
 
-  EXPECT_DEATH({ parts_sorter.Front(); }, "Max out of order of 100000000ns exceeded.");
+  EXPECT_DEATH({ parts_sorter.Front(); },
+               "Max out of order of 100000000ns exceeded.");
 }
 
 // Tests that we can merge data from 2 separate files, including duplicate data.
@@ -2093,7 +2094,6 @@
   const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
 };
 
-
 // Tests that we can match timestamps on delivered messages in the presence of
 // reboots on the node receiving timestamps.
 TEST_F(RebootTimestampMapperTest, ReadNode0First) {
@@ -2114,6 +2114,10 @@
         e + chrono::milliseconds(1000), 0, chrono::seconds(100),
         e + chrono::milliseconds(1001)));
 
+    writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1000), 0, chrono::seconds(21),
+        e + chrono::milliseconds(2001)));
+
     writer0b.QueueSizedFlatbuffer(
         MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
     writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
@@ -2127,7 +2131,8 @@
         e + chrono::milliseconds(3001)));
   }
 
-  const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
+  const std::vector<LogFile> parts =
+      SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
 
   for (const auto &x : parts) {
     LOG(INFO) << x;
@@ -2185,6 +2190,8 @@
     EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output0[0].monotonic_event_time.time,
               e + chrono::milliseconds(1000));
+    EXPECT_EQ(output0[0].queue_index,
+              (BootQueueIndex{.boot = 0u, .index = 0u}));
     EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
     EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
     EXPECT_TRUE(output0[0].data.Verify());
@@ -2192,6 +2199,8 @@
     EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output0[1].monotonic_event_time.time,
               e + chrono::milliseconds(2000));
+    EXPECT_EQ(output0[1].queue_index,
+              (BootQueueIndex{.boot = 0u, .index = 1u}));
     EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
     EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
     EXPECT_TRUE(output0[1].data.Verify());
@@ -2199,6 +2208,8 @@
     EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output0[2].monotonic_event_time.time,
               e + chrono::milliseconds(3000));
+    EXPECT_EQ(output0[2].queue_index,
+              (BootQueueIndex{.boot = 0u, .index = 2u}));
     EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
     EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
     EXPECT_TRUE(output0[2].data.Verify());
@@ -2232,13 +2243,18 @@
     mapper1.PopFront();
     EXPECT_TRUE(mapper1.started());
 
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_TRUE(mapper1.started());
+
     EXPECT_EQ(mapper0_count, 3u);
-    EXPECT_EQ(mapper1_count, 3u);
+    EXPECT_EQ(mapper1_count, 4u);
 
     ASSERT_TRUE(mapper1.Front() == nullptr);
 
     EXPECT_EQ(mapper0_count, 3u);
-    EXPECT_EQ(mapper1_count, 3u);
+    EXPECT_EQ(mapper1_count, 4u);
 
     EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[0].monotonic_event_time.time,
@@ -2246,6 +2262,8 @@
     EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
     EXPECT_EQ(output1[0].monotonic_remote_time.time,
               e + chrono::milliseconds(1000));
+    EXPECT_EQ(output1[0].remote_queue_index,
+              (BootQueueIndex{.boot = 0u, .index = 0u}));
     EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
     EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
               e + chrono::milliseconds(1001));
@@ -2254,9 +2272,11 @@
     EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
     EXPECT_EQ(output1[1].monotonic_event_time.time,
               e + chrono::seconds(20) + chrono::milliseconds(2000));
+    EXPECT_EQ(output1[1].remote_queue_index,
+              (BootQueueIndex{.boot = 0u, .index = 0u}));
     EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
     EXPECT_EQ(output1[1].monotonic_remote_time.time,
-              e + chrono::milliseconds(2000));
+              e + chrono::milliseconds(1000));
     EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
     EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
               e + chrono::milliseconds(2001));
@@ -2264,18 +2284,34 @@
 
     EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
     EXPECT_EQ(output1[2].monotonic_event_time.time,
-              e + chrono::seconds(20) + chrono::milliseconds(3000));
+              e + chrono::seconds(20) + chrono::milliseconds(2000));
     EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
     EXPECT_EQ(output1[2].monotonic_remote_time.time,
-              e + chrono::milliseconds(3000));
+              e + chrono::milliseconds(2000));
+    EXPECT_EQ(output1[2].remote_queue_index,
+              (BootQueueIndex{.boot = 0u, .index = 1u}));
     EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
     EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
-              e + chrono::milliseconds(3001));
+              e + chrono::milliseconds(2001));
     EXPECT_TRUE(output1[2].data.Verify());
 
+    EXPECT_EQ(output1[3].monotonic_event_time.boot, 1u);
+    EXPECT_EQ(output1[3].monotonic_event_time.time,
+              e + chrono::seconds(20) + chrono::milliseconds(3000));
+    EXPECT_EQ(output1[3].monotonic_remote_time.boot, 0u);
+    EXPECT_EQ(output1[3].monotonic_remote_time.time,
+              e + chrono::milliseconds(3000));
+    EXPECT_EQ(output1[3].remote_queue_index,
+              (BootQueueIndex{.boot = 0u, .index = 2u}));
+    EXPECT_EQ(output1[3].monotonic_timestamp_time.boot, 0u);
+    EXPECT_EQ(output1[3].monotonic_timestamp_time.time,
+              e + chrono::milliseconds(3001));
+    EXPECT_TRUE(output1[3].data.Verify());
+
     LOG(INFO) << output1[0];
     LOG(INFO) << output1[1];
     LOG(INFO) << output1[2];
+    LOG(INFO) << output1[3];
   }
 }
 
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 195d442..226c084 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -435,25 +435,19 @@
             absl::StrCat("aos/events/logging/", GetParam().config)))),
         time_converter_(configuration::NodesCount(&config_.message())),
         event_loop_factory_(&config_.message()),
-        pi1_(
-            configuration::GetNode(event_loop_factory_.configuration(), "pi1")),
+        pi1_(event_loop_factory_.GetNodeEventLoopFactory("pi1")),
         pi1_index_(configuration::GetNodeIndex(
-            event_loop_factory_.configuration(), pi1_)),
-        pi2_(
-            configuration::GetNode(event_loop_factory_.configuration(), "pi2")),
+            event_loop_factory_.configuration(), pi1_->node())),
+        pi2_(event_loop_factory_.GetNodeEventLoopFactory("pi2")),
         pi2_index_(configuration::GetNodeIndex(
-            event_loop_factory_.configuration(), pi2_)),
+            event_loop_factory_.configuration(), pi2_->node())),
         tmp_dir_(aos::testing::TestTmpDir()),
         logfile_base1_(tmp_dir_ + "/multi_logfile1"),
         logfile_base2_(tmp_dir_ + "/multi_logfile2"),
         pi1_reboot_logfiles_(MakePi1RebootLogfiles()),
         logfiles_(MakeLogFiles(logfile_base1_, logfile_base2_)),
         pi1_single_direction_logfiles_(MakePi1SingleDirectionLogfiles()),
-        structured_logfiles_(StructureLogFiles()),
-        ping_event_loop_(event_loop_factory_.MakeEventLoop("ping", pi1_)),
-        ping_(ping_event_loop_.get()),
-        pong_event_loop_(event_loop_factory_.MakeEventLoop("pong", pi2_)),
-        pong_(pong_event_loop_.get()) {
+        structured_logfiles_(StructureLogFiles()) {
     LOG(INFO) << "Config " << GetParam().config;
     event_loop_factory_.SetTimeConverter(&time_converter_);
 
@@ -474,6 +468,9 @@
 
     LOG(INFO) << "Logging data to " << logfiles_[0] << ", " << logfiles_[1]
               << " and " << logfiles_[2];
+
+    pi1_->OnStartup([this]() { pi1_->AlwaysStart<Ping>("ping"); });
+    pi2_->OnStartup([this]() { pi2_->AlwaysStart<Pong>("pong"); });
   }
 
   bool shared() const { return GetParam().shared; }
@@ -559,13 +556,14 @@
     result.emplace_back(logfile_base1_ + "_pi1_data.part0.bfbs");
     result.emplace_back(logfile_base1_ + "_pi1_data.part1.bfbs");
     result.emplace_back(logfile_base1_ + "_pi1_data.part2.bfbs");
-    result.emplace_back(logfile_base1_ + "_pi1_data.part3.bfbs");
     result.emplace_back(logfile_base1_ +
                         "_pi2_data/test/aos.examples.Pong.part0.bfbs");
     result.emplace_back(logfile_base1_ +
                         "_pi2_data/test/aos.examples.Pong.part1.bfbs");
     result.emplace_back(logfile_base1_ +
                         "_pi2_data/test/aos.examples.Pong.part2.bfbs");
+    result.emplace_back(logfile_base1_ +
+                        "_pi2_data/test/aos.examples.Pong.part3.bfbs");
     result.emplace_back(
         logfile_base1_ +
         "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part0.bfbs");
@@ -576,6 +574,9 @@
         logfile_base1_ +
         "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part2.bfbs");
     result.emplace_back(
+        logfile_base1_ +
+        "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part3.bfbs");
+    result.emplace_back(
         absl::StrCat(logfile_base1_, "_", GetParam().sha256, ".bfbs"));
     if (shared()) {
       result.emplace_back(logfile_base1_ +
@@ -587,6 +588,9 @@
       result.emplace_back(logfile_base1_ +
                           "_timestamps/pi1/aos/remote_timestamps/pi2/"
                           "aos.message_bridge.RemoteMessage.part2.bfbs");
+      result.emplace_back(logfile_base1_ +
+                          "_timestamps/pi1/aos/remote_timestamps/pi2/"
+                          "aos.message_bridge.RemoteMessage.part3.bfbs");
     } else {
       result.emplace_back(logfile_base1_ +
                           "_timestamps/pi1/aos/remote_timestamps/pi2/pi1/aos/"
@@ -600,6 +604,10 @@
                           "_timestamps/pi1/aos/remote_timestamps/pi2/pi1/aos/"
                           "aos-message_bridge-Timestamp/"
                           "aos.message_bridge.RemoteMessage.part2.bfbs");
+      result.emplace_back(logfile_base1_ +
+                          "_timestamps/pi1/aos/remote_timestamps/pi2/pi1/aos/"
+                          "aos-message_bridge-Timestamp/"
+                          "aos.message_bridge.RemoteMessage.part3.bfbs");
 
       result.emplace_back(logfile_base1_ +
                           "_timestamps/pi1/aos/remote_timestamps/pi2/test/"
@@ -613,6 +621,10 @@
                           "_timestamps/pi1/aos/remote_timestamps/pi2/test/"
                           "aos-examples-Ping/"
                           "aos.message_bridge.RemoteMessage.part2.bfbs");
+      result.emplace_back(logfile_base1_ +
+                          "_timestamps/pi1/aos/remote_timestamps/pi2/test/"
+                          "aos-examples-Ping/"
+                          "aos.message_bridge.RemoteMessage.part3.bfbs");
     }
     return result;
   }
@@ -663,7 +675,7 @@
     MultiNodeLogNamer *log_namer;
   };
 
-  LoggerState MakeLogger(const Node *node,
+  LoggerState MakeLogger(NodeEventLoopFactory *node,
                          SimulatedEventLoopFactory *factory = nullptr,
                          const Configuration *configuration = nullptr) {
     if (factory == nullptr) {
@@ -672,13 +684,11 @@
     if (configuration == nullptr) {
       configuration = factory->configuration();
     }
-    return {
-        factory->MakeEventLoop(
-            "logger", configuration::GetNode(factory->configuration(), node)),
-        {},
-        configuration,
-        node,
-        nullptr};
+    return {node->MakeEventLoop("logger"),
+            {},
+            configuration,
+            configuration::GetNode(configuration, node->node()),
+            nullptr};
   }
 
   void StartLogger(LoggerState *logger, std::string logfile_base = "",
@@ -820,9 +830,9 @@
   message_bridge::TestingTimeConverter time_converter_;
   SimulatedEventLoopFactory event_loop_factory_;
 
-  const Node *const pi1_;
+  NodeEventLoopFactory *const pi1_;
   const size_t pi1_index_;
-  const Node *const pi2_;
+  NodeEventLoopFactory *const pi2_;
   const size_t pi2_index_;
 
   std::string tmp_dir_;
@@ -833,11 +843,6 @@
   std::vector<std::string> pi1_single_direction_logfiles_;
 
   std::vector<std::vector<std::string>> structured_logfiles_;
-
-  std::unique_ptr<EventLoop> ping_event_loop_;
-  Ping ping_;
-  std::unique_ptr<EventLoop> pong_event_loop_;
-  Pong pong_;
 };
 
 // Counts the number of messages on a channel.  Returns (channel name, channel
@@ -1490,17 +1495,13 @@
   {
     LoggerState pi2_logger = MakeLogger(pi2_);
 
-    NodeEventLoopFactory *pi1 =
-        event_loop_factory_.GetNodeEventLoopFactory(pi1_);
-    NodeEventLoopFactory *pi2 =
-        event_loop_factory_.GetNodeEventLoopFactory(pi2_);
-    LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
-              << pi2->realtime_now() << " distributed "
-              << pi2->ToDistributedClock(pi2->monotonic_now());
+    LOG(INFO) << "pi2 times: " << pi2_->monotonic_now() << " "
+              << pi2_->realtime_now() << " distributed "
+              << pi2_->ToDistributedClock(pi2_->monotonic_now());
 
-    LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
-              << pi2->realtime_now() << " distributed "
-              << pi2->ToDistributedClock(pi2->monotonic_now());
+    LOG(INFO) << "pi2_ times: " << pi2_->monotonic_now() << " "
+              << pi2_->realtime_now() << " distributed "
+              << pi2_->ToDistributedClock(pi2_->monotonic_now());
 
     event_loop_factory_.RunFor(startup_sleep1);
 
@@ -1519,7 +1520,7 @@
       // than the network delay.  This confirms that if we sort incorrectly, it
       // would show in the results.
       EXPECT_LT(
-          (pi2->monotonic_now() - pi1->monotonic_now()) - initial_pi2_offset,
+          (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
           -event_loop_factory_.send_delay() -
               event_loop_factory_.network_delay());
 
@@ -1528,7 +1529,7 @@
       // And now check that we went far enough the other way to make sure we
       // cover both problems.
       EXPECT_GT(
-          (pi2->monotonic_now() - pi1->monotonic_now()) - initial_pi2_offset,
+          (pi2_->monotonic_now() - pi1_->monotonic_now()) - initial_pi2_offset,
           event_loop_factory_.send_delay() +
               event_loop_factory_.network_delay());
     }
@@ -2140,11 +2141,9 @@
   // And confirm we can re-create a log again, while checking the contents.
   {
     LoggerState pi1_logger = MakeLogger(
-        configuration::GetNode(log_reader_factory.configuration(), pi1_),
-        &log_reader_factory);
+        log_reader_factory.GetNodeEventLoopFactory("pi1"), &log_reader_factory);
     LoggerState pi2_logger = MakeLogger(
-        configuration::GetNode(log_reader_factory.configuration(), pi2_),
-        &log_reader_factory);
+        log_reader_factory.GetNodeEventLoopFactory("pi2"), &log_reader_factory);
 
     StartLogger(&pi1_logger, tmp_dir_ + "/relogged1");
     StartLogger(&pi2_logger, tmp_dir_ + "/relogged2");
@@ -2234,8 +2233,7 @@
 // Test that renaming the file base dies.
 TEST_P(MultinodeLoggerDeathTest, LoggerRenameFile) {
   time_converter_.AddMonotonic(
-      {BootTimestamp::epoch(),
-       BootTimestamp::epoch() + chrono::seconds(1000)});
+      {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
   util::UnlinkRecursive(tmp_dir_ + "/renamefile");
   logfile_base1_ = tmp_dir_ + "/renamefile/multi_logfile1";
   logfile_base2_ = tmp_dir_ + "/renamefile/multi_logfile2";
@@ -2254,29 +2252,50 @@
 // Tests that we properly recreate forwarded timestamps when replaying a log.
 // This should be enough that we can then re-run the logger and get a valid log
 // back.
-TEST_P(MultinodeLoggerDeathTest, RemoteReboot) {
-  time_converter_.StartEqual();
-  std::string pi2_boot1;
-  std::string pi2_boot2;
+TEST_P(MultinodeLoggerTest, RemoteReboot) {
+  const UUID pi1_boot0 = UUID::Random();
+  const UUID pi2_boot0 = UUID::Random();
+  const UUID pi2_boot1 = UUID::Random();
   {
-    pi2_boot1 = event_loop_factory_.GetNodeEventLoopFactory(pi2_)
-                    ->boot_uuid()
-                    .ToString();
+    CHECK_EQ(pi1_index_, 0u);
+    CHECK_EQ(pi2_index_, 1u);
+
+    time_converter_.set_boot_uuid(pi1_index_, 0, pi1_boot0);
+    time_converter_.set_boot_uuid(pi2_index_, 0, pi2_boot0);
+    time_converter_.set_boot_uuid(pi2_index_, 1, pi2_boot1);
+
+    time_converter_.AddNextTimestamp(
+        distributed_clock::epoch(),
+        {BootTimestamp::epoch(), BootTimestamp::epoch()});
+    const chrono::nanoseconds reboot_time = chrono::milliseconds(10100);
+    time_converter_.AddNextTimestamp(
+        distributed_clock::epoch() + reboot_time,
+        {BootTimestamp::epoch() + reboot_time,
+         BootTimestamp{
+             .boot = 1,
+             .time = monotonic_clock::epoch() + chrono::milliseconds(1323)}});
+  }
+
+  {
     LoggerState pi1_logger = MakeLogger(pi1_);
 
     event_loop_factory_.RunFor(chrono::milliseconds(95));
+    EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
+              pi1_boot0);
+    EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
+              pi2_boot0);
 
     StartLogger(&pi1_logger);
 
     event_loop_factory_.RunFor(chrono::milliseconds(10000));
 
-    event_loop_factory_.GetNodeEventLoopFactory(pi2_)->Reboot();
-
-    pi2_boot2 = event_loop_factory_.GetNodeEventLoopFactory(pi2_)
-                    ->boot_uuid()
-                    .ToString();
+    VLOG(1) << "Reboot now!";
 
     event_loop_factory_.RunFor(chrono::milliseconds(20000));
+    EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi1")->boot_uuid(),
+              pi1_boot0);
+    EXPECT_EQ(event_loop_factory_.GetNodeEventLoopFactory("pi2")->boot_uuid(),
+              pi2_boot1);
   }
 
   // Confirm that our new oldest timestamps properly update as we reboot and
@@ -2289,7 +2308,36 @@
       continue;
     }
 
+    const monotonic_clock::time_point monotonic_start_time =
+        monotonic_clock::time_point(
+            chrono::nanoseconds(log_header->message().monotonic_start_time()));
+    const UUID source_node_boot_uuid = UUID::FromString(
+        log_header->message().source_node_boot_uuid()->string_view());
+
     if (log_header->message().node()->name()->string_view() != "pi1") {
+      switch (log_header->message().parts_index()) {
+        case 0:
+          EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
+          EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
+          break;
+        case 1:
+          EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
+          ASSERT_EQ(monotonic_start_time,
+                    monotonic_clock::epoch() + chrono::seconds(1));
+          break;
+        case 2:
+          EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
+          EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
+          break;
+        case 3:
+          EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
+          ASSERT_EQ(monotonic_start_time,
+                    monotonic_clock::epoch() + chrono::nanoseconds(2322999462));
+          break;
+        default:
+          FAIL();
+          break;
+      }
       continue;
     }
     SCOPED_TRACE(file);
@@ -2368,21 +2416,13 @@
         break;
       case 2:
         EXPECT_EQ(oldest_remote_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(10100000)));
+                  monotonic_clock::time_point(chrono::milliseconds(1323) +
+                                              chrono::microseconds(200)));
         EXPECT_EQ(oldest_local_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(10100150)));
+                  monotonic_clock::time_point(chrono::microseconds(10100350)));
         EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
-                  monotonic_clock::max_time);
-        EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
-                  monotonic_clock::max_time);
-        break;
-      case 3:
-        EXPECT_EQ(oldest_remote_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(10100000)));
-        EXPECT_EQ(oldest_local_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(10100150)));
-        EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(10100200)));
+                  monotonic_clock::time_point(chrono::milliseconds(1323) +
+                                              chrono::microseconds(200)));
         EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
                   monotonic_clock::time_point(chrono::microseconds(10100350)));
         break;
@@ -2393,25 +2433,26 @@
   }
 
   // Confirm that we refuse to replay logs with missing boot uuids.
-  EXPECT_DEATH(
-      {
-        LogReader reader(SortParts(pi1_reboot_logfiles_));
+  {
+    LogReader reader(SortParts(pi1_reboot_logfiles_));
 
-        SimulatedEventLoopFactory log_reader_factory(reader.configuration());
-        log_reader_factory.set_send_delay(chrono::microseconds(0));
+    SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+    log_reader_factory.set_send_delay(chrono::microseconds(0));
 
-        // This sends out the fetched messages and advances time to the start of
-        // the log file.
-        reader.Register(&log_reader_factory);
-      },
-      absl::StrFormat("(%s|%s).*(%s|%s).*Found parts from different boots",
-                      pi2_boot1, pi2_boot2, pi2_boot2, pi2_boot1));
+    // This sends out the fetched messages and advances time to the start of
+    // the log file.
+    reader.Register(&log_reader_factory);
+
+    log_reader_factory.Run();
+
+    reader.Deregister();
+  }
 }
 
 // Tests that we properly handle one direction of message_bridge being
 // unavailable.
 TEST_P(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
-  event_loop_factory_.GetNodeEventLoopFactory(pi1_)->Disconnect(pi2_);
+  pi1_->Disconnect(pi2_->node());
   time_converter_.AddMonotonic(
       {BootTimestamp::epoch(),
        BootTimestamp::epoch() + chrono::seconds(1000)});
@@ -2437,7 +2478,7 @@
 // Tests that we properly handle one direction of message_bridge being
 // unavailable.
 TEST_P(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
-  event_loop_factory_.GetNodeEventLoopFactory(pi1_)->Disconnect(pi2_);
+  pi1_->Disconnect(pi2_->node());
   time_converter_.AddMonotonic(
       {BootTimestamp::epoch(),
        BootTimestamp::epoch() + chrono::seconds(500)});
@@ -2463,8 +2504,8 @@
 // Tests that we properly handle a dead node.  Do this by just disconnecting it
 // and only using one nodes of logs.
 TEST_P(MultinodeLoggerTest, DeadNode) {
-  event_loop_factory_.GetNodeEventLoopFactory(pi1_)->Disconnect(pi2_);
-  event_loop_factory_.GetNodeEventLoopFactory(pi2_)->Disconnect(pi1_);
+  pi1_->Disconnect(pi2_->node());
+  pi2_->Disconnect(pi1_->node());
   time_converter_.AddMonotonic(
       {BootTimestamp::epoch(),
        BootTimestamp::epoch() + chrono::seconds(1000)});
@@ -2552,10 +2593,10 @@
   std::vector<std::string> log_files;
   {
     LoggerState pi1_logger =
-        MakeLogger(configuration::GetNode(reader.logged_configuration(), pi1_),
+        MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi1"),
                    &log_reader_factory, reader.logged_configuration());
     LoggerState pi2_logger =
-        MakeLogger(configuration::GetNode(reader.logged_configuration(), pi2_),
+        MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
                    &log_reader_factory, reader.logged_configuration());
 
     StartLogger(&pi1_logger, tmp_dir_ + "/relogged1");
diff --git a/aos/events/logging/timestamp_extractor.cc b/aos/events/logging/timestamp_extractor.cc
index 6a2c95a..0cfc19b 100644
--- a/aos/events/logging/timestamp_extractor.cc
+++ b/aos/events/logging/timestamp_extractor.cc
@@ -63,7 +63,20 @@
 
   // Now, build up the estimator used to solve for time.
   message_bridge::MultiNodeNoncausalOffsetEstimator multinode_estimator(
-      config, config, FLAGS_skip_order_validation, chrono::seconds(0));
+      config, config, log_files[0].boots, FLAGS_skip_order_validation,
+      chrono::seconds(0));
+  multinode_estimator.set_reboot_found(
+      [config](distributed_clock::time_point reboot_time,
+               const std::vector<logger::BootTimestamp> &node_times) {
+        LOG(INFO) << "Rebooted at distributed " << reboot_time;
+        size_t node_index = 0;
+        for (const logger::BootTimestamp &time : node_times) {
+          LOG(INFO) << "  "
+                    << config->nodes()->Get(node_index)->name()->string_view()
+                    << " " << time;
+          ++node_index;
+        }
+      });
 
   {
     std::vector<TimestampMapper *> timestamp_mappers;
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index afa1f1a..15375e8 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -299,15 +299,14 @@
 
   size_t size() override { return simulated_channel_->max_size(); }
 
-  bool DoSend(size_t length,
-              aos::monotonic_clock::time_point monotonic_remote_time,
-              aos::realtime_clock::time_point realtime_remote_time,
+  bool DoSend(size_t length, monotonic_clock::time_point monotonic_remote_time,
+              realtime_clock::time_point realtime_remote_time,
               uint32_t remote_queue_index,
               const UUID &source_boot_uuid) override;
 
   bool DoSend(const void *msg, size_t size,
-              aos::monotonic_clock::time_point monotonic_remote_time,
-              aos::realtime_clock::time_point realtime_remote_time,
+              monotonic_clock::time_point monotonic_remote_time,
+              realtime_clock::time_point realtime_remote_time,
               uint32_t remote_queue_index,
               const UUID &source_boot_uuid) override;
 
@@ -319,7 +318,7 @@
 
  private:
   SimulatedChannel *simulated_channel_;
-  SimulatedEventLoop *event_loop_;
+  SimulatedEventLoop *simulated_event_loop_;
 
   std::shared_ptr<SimulatedMessage> message_;
 };
@@ -386,10 +385,10 @@
     if (context_.remote_queue_index == 0xffffffffu) {
       context_.remote_queue_index = context_.queue_index;
     }
-    if (context_.monotonic_remote_time == aos::monotonic_clock::min_time) {
+    if (context_.monotonic_remote_time == monotonic_clock::min_time) {
       context_.monotonic_remote_time = context_.monotonic_event_time;
     }
-    if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
+    if (context_.realtime_remote_time == realtime_clock::min_time) {
       context_.realtime_remote_time = context_.realtime_event_time;
     }
   }
@@ -477,14 +476,19 @@
         channels_(channels),
         event_loops_(event_loops_),
         node_(node),
-        tid_(tid) {
-    scheduler_->ScheduleOnStartup([this]() {
-      Setup();
-      has_setup_ = true;
+        tid_(tid),
+        startup_tracker_(std::make_shared<StartupTracker>()) {
+    startup_tracker_->loop = this;
+    scheduler_->ScheduleOnStartup([startup_tracker = startup_tracker_]() {
+      if (startup_tracker->loop) {
+        startup_tracker->loop->Setup();
+        startup_tracker->has_setup = true;
+      }
     });
 
     event_loops_->push_back(this);
   }
+
   ~SimulatedEventLoop() override {
     // Trigger any remaining senders or fetchers to be cleared before destroying
     // the event loop so the book keeping matches.
@@ -495,20 +499,27 @@
     phased_loops_.clear();
     watchers_.clear();
 
-    for (auto it = event_loops_->begin(); it != event_loops_->end();
-         ++it) {
+    for (auto it = event_loops_->begin(); it != event_loops_->end(); ++it) {
       if (*it == this) {
         event_loops_->erase(it);
         break;
       }
     }
+    VLOG(1) << scheduler_->distributed_now() << " " << NodeName(node())
+            << monotonic_now() << " ~SimulatedEventLoop(\"" << name_ << "\")";
+    startup_tracker_->loop = nullptr;
   }
 
   void SetIsRunning(bool running) {
-    CHECK(has_setup_);
+    VLOG(1) << scheduler_->distributed_now() << " " << NodeName(node())
+            << monotonic_now() << " " << name_ << " set_is_running(" << running
+            << ")";
+    CHECK(startup_tracker_->has_setup);
 
     set_is_running(running);
-    has_run_ = true;
+    if (running) {
+      has_run_ = true;
+    }
   }
 
   bool has_run() const { return has_run_; }
@@ -518,17 +529,21 @@
     send_delay_ = send_delay;
   }
 
-  ::aos::monotonic_clock::time_point monotonic_now() override {
+  monotonic_clock::time_point monotonic_now() override {
     return node_event_loop_factory_->monotonic_now();
   }
 
-  ::aos::realtime_clock::time_point realtime_now() override {
+  realtime_clock::time_point realtime_now() override {
     return node_event_loop_factory_->realtime_now();
   }
 
-  ::std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) override;
+  distributed_clock::time_point distributed_now() {
+    return scheduler_->distributed_now();
+  }
 
-  ::std::unique_ptr<RawFetcher> MakeRawFetcher(const Channel *channel) override;
+  std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) override;
+
+  std::unique_ptr<RawFetcher> MakeRawFetcher(const Channel *channel) override;
 
   void MakeRawWatcher(
       const Channel *channel,
@@ -598,6 +613,16 @@
   friend class SimulatedPhasedLoopHandler;
   friend class SimulatedWatcher;
 
+  // We have a condition where we register a startup handler, but then get shut
+  // down before it runs.  This results in a segfault if we are lucky, and
+  // corruption otherwise.  To handle that, allocate a small object which points
+  // back to us and can be freed when the function is freed.  That object can
+  // then be updated when we get destroyed so setup is not called.
+  struct StartupTracker {
+    SimulatedEventLoop *loop = nullptr;
+    bool has_setup = false;
+  };
+
   void HandleEvent() {
     while (true) {
       if (EventCount() == 0 || PeekEvent()->event_time() > monotonic_now()) {
@@ -620,8 +645,6 @@
 
   int priority_ = 0;
 
-  bool has_setup_ = false;
-
   std::chrono::nanoseconds send_delay_;
 
   const Node *const node_;
@@ -631,12 +654,14 @@
   std::shared_ptr<logging::LogImplementation> log_impl_ = nullptr;
 
   bool has_run_ = false;
+
+  std::shared_ptr<StartupTracker> startup_tracker_;
 };
 
 void SimulatedEventLoopFactory::set_send_delay(
     std::chrono::nanoseconds send_delay) {
   send_delay_ = send_delay;
-  for (std::unique_ptr<NodeEventLoopFactory> & node : node_factories_) {
+  for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
     if (node) {
       for (SimulatedEventLoop *loop : node->event_loops_) {
         loop->set_send_delay(send_delay_);
@@ -657,9 +682,9 @@
   GetSimulatedChannel(channel)->MakeRawWatcher(shm_watcher.get());
 
   NewWatcher(std::move(shm_watcher));
-  VLOG(1) << monotonic_now() << " " << NodeName(node()) << name()
-          << " MakeRawWatcher "
-          << configuration::StrippedChannelToString(channel);
+  VLOG(1) << distributed_now() << " " << NodeName(node()) << monotonic_now()
+          << " " << name() << " MakeRawWatcher(\""
+          << configuration::StrippedChannelToString(channel) << "\")";
 
   // Order of operations gets kinda wonky if we let people make watchers after
   // running once.  If someone has a valid use case, we can reconsider.
@@ -670,9 +695,9 @@
     const Channel *channel) {
   TakeSender(channel);
 
-  VLOG(1) << monotonic_now() << " " << NodeName(node()) << name()
-          << " MakeRawSender "
-          << configuration::StrippedChannelToString(channel);
+  VLOG(1) << distributed_now() << " " << NodeName(node()) << monotonic_now()
+          << " " << name() << " MakeRawSender(\""
+          << configuration::StrippedChannelToString(channel) << "\")";
   return GetSimulatedChannel(channel)->MakeRawSender(this);
 }
 
@@ -687,9 +712,9 @@
                   "configuration.";
   }
 
-  VLOG(1) << monotonic_now() << " " << NodeName(node()) << name()
-          << " MakeRawFetcher "
-          << configuration::StrippedChannelToString(channel);
+  VLOG(1) << distributed_now() << " " << NodeName(node()) << monotonic_now()
+          << " " << name() << " MakeRawFetcher(\""
+          << configuration::StrippedChannelToString(channel) << "\")";
   return GetSimulatedChannel(channel)->MakeRawFetcher(this);
 }
 
@@ -722,12 +747,19 @@
       channel_(channel),
       scheduler_(scheduler),
       event_(this),
-      token_(scheduler_->InvalidToken()) {}
+      token_(scheduler_->InvalidToken()) {
+  VLOG(1) << simulated_event_loop_->distributed_now() << " "
+          << NodeName(simulated_event_loop_->node())
+          << simulated_event_loop_->monotonic_now() << " "
+          << simulated_event_loop_->name() << " Watching "
+          << configuration::StrippedChannelToString(channel_);
+}
 
 SimulatedWatcher::~SimulatedWatcher() {
-  VLOG(1) << simulated_event_loop_->monotonic_now() << " "
+  VLOG(1) << simulated_event_loop_->distributed_now() << " "
           << NodeName(simulated_event_loop_->node())
-          << simulated_event_loop_->name() << " Stopped Watching "
+          << simulated_event_loop_->monotonic_now() << " "
+          << simulated_event_loop_->name() << " ~Watching "
           << configuration::StrippedChannelToString(channel_);
   simulated_event_loop_->RemoveEvent(&event_);
   if (token_ != scheduler_->InvalidToken()) {
@@ -759,8 +791,10 @@
 void SimulatedWatcher::HandleEvent() {
   const monotonic_clock::time_point monotonic_now =
       simulated_event_loop_->monotonic_now();
-  VLOG(1) << monotonic_now << " " << NodeName(simulated_event_loop_->node())
-          << "Watcher " << simulated_event_loop_->name() << ", "
+  VLOG(1) << simulated_event_loop_->distributed_now() << " "
+          << NodeName(simulated_event_loop_->node())
+          << simulated_event_loop_->monotonic_now() << " "
+          << simulated_event_loop_->name() << " Watcher "
           << configuration::StrippedChannelToString(channel_);
   CHECK_NE(msgs_.size(), 0u) << ": No events to handle.";
 
@@ -776,10 +810,10 @@
   if (context.remote_queue_index == 0xffffffffu) {
     context.remote_queue_index = context.queue_index;
   }
-  if (context.monotonic_remote_time == aos::monotonic_clock::min_time) {
+  if (context.monotonic_remote_time == monotonic_clock::min_time) {
     context.monotonic_remote_time = context.monotonic_event_time;
   }
-  if (context.realtime_remote_time == aos::realtime_clock::min_time) {
+  if (context.realtime_remote_time == realtime_clock::min_time) {
     context.realtime_remote_time = context.realtime_event_time;
   }
 
@@ -871,7 +905,7 @@
                                  SimulatedEventLoop *event_loop)
     : RawSender(event_loop, simulated_channel->channel()),
       simulated_channel_(simulated_channel),
-      event_loop_(event_loop) {
+      simulated_event_loop_(event_loop) {
   simulated_channel_->CountSenderCreated();
 }
 
@@ -884,17 +918,21 @@
                              realtime_clock::time_point realtime_remote_time,
                              uint32_t remote_queue_index,
                              const UUID &source_boot_uuid) {
-  VLOG(1) << event_loop_->monotonic_now() << " "
-          << NodeName(event_loop_->node()) << event_loop_->name()
-          << " Send " << configuration::StrippedChannelToString(channel());
+  VLOG(1) << simulated_event_loop_->distributed_now() << " "
+          << NodeName(simulated_event_loop_->node())
+          << simulated_event_loop_->monotonic_now() << " "
+          << simulated_event_loop_->name() << " Send "
+          << configuration::StrippedChannelToString(channel());
+
   // The allocations in here are due to infrastructure and don't count in the
   // no mallocs in RT code.
   ScopedNotRealtime nrt;
   CHECK_LE(length, size()) << ": Attempting to send too big a message.";
-  message_->context.monotonic_event_time = event_loop_->monotonic_now();
+  message_->context.monotonic_event_time =
+      simulated_event_loop_->monotonic_now();
   message_->context.monotonic_remote_time = monotonic_remote_time;
   message_->context.remote_queue_index = remote_queue_index;
-  message_->context.realtime_event_time = event_loop_->realtime_now();
+  message_->context.realtime_event_time = simulated_event_loop_->realtime_now();
   message_->context.realtime_remote_time = realtime_remote_time;
   message_->context.source_boot_uuid = source_boot_uuid;
   CHECK_LE(length, message_->context.size);
@@ -902,8 +940,8 @@
 
   // TODO(austin): Track sending too fast.
   sent_queue_index_ = simulated_channel_->Send(message_);
-  monotonic_sent_time_ = event_loop_->monotonic_now();
-  realtime_sent_time_ = event_loop_->realtime_now();
+  monotonic_sent_time_ = simulated_event_loop_->monotonic_now();
+  realtime_sent_time_ = simulated_event_loop_->realtime_now();
 
   // Drop the reference to the message so that we allocate a new message for
   // next time.  Otherwise we will continue to reuse the same memory for all
@@ -951,7 +989,7 @@
   // mallocs in RT code.
   ScopedNotRealtime nrt;
   Disable();
-  const ::aos::monotonic_clock::time_point monotonic_now =
+  const monotonic_clock::time_point monotonic_now =
       simulated_event_loop_->monotonic_now();
   base_ = base;
   repeat_offset_ = repeat_offset;
@@ -965,11 +1003,11 @@
 }
 
 void SimulatedTimerHandler::HandleEvent() {
-  const ::aos::monotonic_clock::time_point monotonic_now =
+  const monotonic_clock::time_point monotonic_now =
       simulated_event_loop_->monotonic_now();
-  VLOG(1) << monotonic_now << " " << NodeName(simulated_event_loop_->node())
-          << "Timer '" << simulated_event_loop_->name() << "', '" << name()
-          << "'";
+  VLOG(1) << simulated_event_loop_->distributed_now() << " "
+          << NodeName(simulated_event_loop_->node()) << monotonic_now << " "
+          << simulated_event_loop_->name() << " Timer '" << name() << "'";
   logging::ScopedLogRestorer prev_logger;
   if (simulated_event_loop_->log_impl_) {
     prev_logger.Swap(simulated_event_loop_->log_impl_);
@@ -978,7 +1016,7 @@
     scheduler_->Deschedule(token_);
     token_ = scheduler_->InvalidToken();
   }
-  if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
+  if (repeat_offset_ != monotonic_clock::zero()) {
     // Reschedule.
     while (base_ <= monotonic_now) base_ += repeat_offset_;
     token_ = scheduler_->Schedule(base_, [this]() {
@@ -1065,8 +1103,8 @@
       nodes_(configuration::GetNodes(configuration_)) {
   CHECK(IsInitialized()) << ": Need to initialize AOS first.";
   for (const Node *node : nodes_) {
-    node_factories_.emplace_back(new NodeEventLoopFactory(
-        &scheduler_scheduler_, this, node));
+    node_factories_.emplace_back(
+        new NodeEventLoopFactory(&scheduler_scheduler_, this, node));
   }
 
   if (configuration::MultiNode(configuration)) {
@@ -1100,6 +1138,7 @@
   for (std::unique_ptr<NodeEventLoopFactory> &factory : node_factories_) {
     factory->SetTimeConverter(time_converter);
   }
+  scheduler_scheduler_.SetTimeConverter(time_converter);
 }
 
 ::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop(
@@ -1117,46 +1156,113 @@
 NodeEventLoopFactory::NodeEventLoopFactory(
     EventSchedulerScheduler *scheduler_scheduler,
     SimulatedEventLoopFactory *factory, const Node *node)
-    : factory_(factory), node_(node) {
+    : scheduler_(configuration::GetNodeIndex(factory->configuration(), node)),
+      factory_(factory),
+      node_(node) {
   scheduler_scheduler->AddEventScheduler(&scheduler_);
+  scheduler_.set_started([this]() {
+    started_ = true;
+    for (SimulatedEventLoop *event_loop : event_loops_) {
+      event_loop->SetIsRunning(true);
+    }
+  });
+  scheduler_.set_on_shutdown([this]() {
+    VLOG(1) << scheduler_.distributed_now() << " " << NodeName(this->node())
+            << monotonic_now() << " Shutting down node.";
+    Shutdown();
+    ScheduleStartup();
+  });
+  ScheduleStartup();
 }
 
 NodeEventLoopFactory::~NodeEventLoopFactory() {
+  if (started_) {
+    for (std::function<void()> &fn : on_shutdown_) {
+      fn();
+    }
+
+    VLOG(1) << scheduler_.distributed_now() << " " << NodeName(node())
+            << monotonic_now() << " Shutting down applications.";
+    applications_.clear();
+    started_ = false;
+  }
+
+  if (event_loops_.size() != 0u) {
+    for (SimulatedEventLoop *event_loop : event_loops_) {
+      LOG(ERROR) << scheduler_.distributed_now() << " " << NodeName(node())
+                 << monotonic_now() << " Event loop '" << event_loop->name()
+                 << "' failed to shut down";
+    }
+  }
   CHECK_EQ(event_loops_.size(), 0u) << "Event loop didn't exit";
 }
 
-::std::unique_ptr<EventLoop> NodeEventLoopFactory::MakeEventLoop(
-    std::string_view name) {
+void NodeEventLoopFactory::OnStartup(std::function<void()> &&fn) {
   CHECK(!scheduler_.is_running())
-      << ": Can't create an event loop while running";
-
-  pid_t tid = tid_;
-  ++tid_;
-  ::std::unique_ptr<SimulatedEventLoop> result(new SimulatedEventLoop(
-      &scheduler_, this, &channels_, factory_->configuration(), &event_loops_,
-      node_, tid));
-  result->set_name(name);
-  result->set_send_delay(factory_->send_delay());
-  return std::move(result);
+      << ": Can only register OnStartup handlers when not running.";
+  on_startup_.emplace_back(std::move(fn));
+  if (started_) {
+    size_t on_startup_index = on_startup_.size() - 1;
+    scheduler_.ScheduleOnStartup(
+        [this, on_startup_index]() { on_startup_[on_startup_index](); });
+  }
 }
 
-void NodeEventLoopFactory::Disconnect(const Node *other) {
-  factory_->bridge_->Disconnect(node_, other);
+void NodeEventLoopFactory::OnShutdown(std::function<void()> &&fn) {
+  on_shutdown_.emplace_back(std::move(fn));
 }
 
-void NodeEventLoopFactory::Connect(const Node *other) {
-  factory_->bridge_->Connect(node_, other);
+void NodeEventLoopFactory::ScheduleStartup() {
+  scheduler_.ScheduleOnStartup([this]() {
+    UUID next_uuid = scheduler_.boot_uuid();
+    if (boot_uuid_ != next_uuid) {
+      CHECK_EQ(boot_uuid_, UUID::Zero());
+      boot_uuid_ = next_uuid;
+    }
+    VLOG(1) << scheduler_.distributed_now() << " " << NodeName(this->node())
+            << monotonic_now() << " Starting up node on boot " << boot_uuid_;
+    Startup();
+  });
+}
+
+void NodeEventLoopFactory::Startup() {
+  CHECK(!started_);
+  for (size_t i = 0; i < on_startup_.size(); ++i) {
+    on_startup_[i]();
+  }
+}
+
+void NodeEventLoopFactory::Shutdown() {
+  for (SimulatedEventLoop *event_loop : event_loops_) {
+    event_loop->SetIsRunning(false);
+  }
+
+  CHECK(started_);
+  started_ = false;
+  for (std::function<void()> &fn : on_shutdown_) {
+    fn();
+  }
+
+  VLOG(1) << scheduler_.distributed_now() << " " << NodeName(node())
+          << monotonic_now() << " Shutting down applications.";
+  applications_.clear();
+
+  if (event_loops_.size() != 0u) {
+    for (SimulatedEventLoop *event_loop : event_loops_) {
+      LOG(ERROR) << scheduler_.distributed_now() << " " << NodeName(node())
+                 << monotonic_now() << " Event loop '" << event_loop->name()
+                 << "' failed to shut down";
+    }
+  }
+  CHECK_EQ(event_loops_.size(), 0u) << "Not all event loops shut down";
+  boot_uuid_ = UUID::Zero();
+
+  channels_.clear();
 }
 
 void SimulatedEventLoopFactory::RunFor(monotonic_clock::duration duration) {
+  // This sets running to true too.
   scheduler_scheduler_.RunOnStartup();
-  for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
-    if (node) {
-      for (SimulatedEventLoop *loop : node->event_loops_) {
-        loop->SetIsRunning(true);
-      }
-    }
-  }
   scheduler_scheduler_.RunFor(duration);
   for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
     if (node) {
@@ -1168,14 +1274,8 @@
 }
 
 void SimulatedEventLoopFactory::Run() {
+  // This sets running to true too.
   scheduler_scheduler_.RunOnStartup();
-  for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
-    if (node) {
-      for (SimulatedEventLoop *loop : node->event_loops_) {
-        loop->SetIsRunning(true);
-      }
-    }
-  }
   scheduler_scheduler_.Run();
   for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
     if (node) {
@@ -1203,4 +1303,30 @@
   bridge_->SkipTimingReport();
 }
 
+::std::unique_ptr<EventLoop> NodeEventLoopFactory::MakeEventLoop(
+    std::string_view name) {
+  CHECK(!scheduler_.is_running() || !started_)
+      << ": Can't create an event loop while running";
+
+  pid_t tid = tid_;
+  ++tid_;
+  ::std::unique_ptr<SimulatedEventLoop> result(new SimulatedEventLoop(
+      &scheduler_, this, &channels_, factory_->configuration(), &event_loops_,
+      node_, tid));
+  result->set_name(name);
+  result->set_send_delay(factory_->send_delay());
+
+  VLOG(1) << scheduler_.distributed_now() << " " << NodeName(node())
+          << monotonic_now() << " MakeEventLoop(\"" << result->name() << "\")";
+  return std::move(result);
+}
+
+void NodeEventLoopFactory::Disconnect(const Node *other) {
+  factory_->bridge_->Disconnect(node_, other);
+}
+
+void NodeEventLoopFactory::Connect(const Node *other) {
+  factory_->bridge_->Connect(node_, other);
+}
+
 }  // namespace aos
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index cc016a8..b25f260 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -64,6 +64,12 @@
   SimulatedEventLoopFactory(const Configuration *configuration);
   ~SimulatedEventLoopFactory();
 
+  SimulatedEventLoopFactory(const SimulatedEventLoopFactory &) = delete;
+  SimulatedEventLoopFactory &operator=(const SimulatedEventLoopFactory &) =
+      delete;
+  SimulatedEventLoopFactory(SimulatedEventLoopFactory &&) = delete;
+  SimulatedEventLoopFactory &operator=(SimulatedEventLoopFactory &&) = delete;
+
   // Creates an event loop.  If running in a multi-node environment, node needs
   // to point to the node to create this event loop on.
   ::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name,
@@ -78,7 +84,8 @@
   // Sets the time converter for all nodes.
   void SetTimeConverter(TimeConverter *time_converter);
 
-  // Starts executing the event loops unconditionally.
+  // Starts executing the event loops unconditionally until Exit is called or
+  // all the nodes have shut down.
   void Run();
   // Executes the event loops for a duration.
   void RunFor(distributed_clock::duration duration);
@@ -128,11 +135,11 @@
   std::chrono::nanoseconds send_delay_ = std::chrono::microseconds(50);
   std::chrono::nanoseconds network_delay_ = std::chrono::microseconds(100);
 
+  std::unique_ptr<message_bridge::SimulatedMessageBridge> bridge_;
+
   std::vector<std::unique_ptr<NodeEventLoopFactory>> node_factories_;
 
   std::vector<const Node *> nodes_;
-
-  std::unique_ptr<message_bridge::SimulatedMessageBridge> bridge_;
 };
 
 // This class holds all the state required to be a single node.
@@ -156,11 +163,37 @@
   // Returns the current time on both clocks.
   inline monotonic_clock::time_point monotonic_now() const;
   inline realtime_clock::time_point realtime_now() const;
+  inline distributed_clock::time_point distributed_now() const;
 
   const Configuration *configuration() const {
     return factory_->configuration();
   }
 
+  // Starts the node up by calling the OnStartup handlers.  These get called
+  // every time a node is started.
+
+  // Called when a node has started.  This is typically when a log file starts
+  // for a node.
+  void OnStartup(std::function<void()> &&fn);
+
+  // Called when a node shuts down.  These get called every time a node is shut
+  // down.  All applications are destroyed right after the last OnShutdown
+  // callback is called.
+  void OnShutdown(std::function<void()> &&fn);
+
+  // Starts an application if the configuration says it should be started on
+  // this node.  name is the name of the application.  args are the constructor
+  // args for the Main class.  Returns a pointer to the class that was started
+  // if it was started, or nullptr.
+  template <class Main, class... Args>
+  Main *MaybeStart(std::string_view name, Args &&... args);
+
+  // Starts an application regardless of if the config says to or not.  name is
+  // the name of the application, and args are the constructor args for the
+  // application.  Returns a pointer to the class that was started.
+  template <class Main, class... Args>
+  Main *AlwaysStart(std::string_view name, Args &&... args);
+
   // Returns the simulated network delay for messages forwarded between nodes.
   std::chrono::nanoseconds network_delay() const {
     return factory_->network_delay();
@@ -169,6 +202,8 @@
   // node.
   std::chrono::nanoseconds send_delay() const { return factory_->send_delay(); }
 
+  size_t boot_count() const { return scheduler_.boot_count(); }
+
   // TODO(austin): Private for the following?
 
   // Converts a time to the distributed clock for scheduling and cross-node time
@@ -177,7 +212,7 @@
   // replaying logs.  Only convert times in the present or near past.
   inline distributed_clock::time_point ToDistributedClock(
       monotonic_clock::time_point time) const;
-  inline monotonic_clock::time_point FromDistributedClock(
+  inline logger::BootTimestamp FromDistributedClock(
       distributed_clock::time_point time) const;
 
   // Sets the class used to convert time.  This pointer must out-live the
@@ -188,19 +223,13 @@
         time_converter);
   }
 
-  // Sets the boot UUID for this node.  This typically should only be used by
-  // the log reader.
-  void set_boot_uuid(std::string_view uuid) {
-    boot_uuid_ = UUID::FromString(uuid);
-  }
   // Returns the boot UUID for this node.
-  const UUID &boot_uuid() const { return boot_uuid_; }
-
-  // Reboots the node.  This just resets the boot_uuid_, nothing else.
-  // TODO(austin): This is here for a test case or two, not for general
-  // consumption.  The interactions with the rest of the system need to be
-  // worked out better.  Don't use this for anything real yet.
-  void Reboot() { boot_uuid_ = UUID::Random(); }
+  const UUID &boot_uuid() {
+    if (boot_uuid_ == UUID::Zero()) {
+      boot_uuid_ = scheduler_.boot_uuid();
+    }
+    return boot_uuid_;
+  }
 
   // Stops forwarding messages to the other node, and reports disconnected in
   // the ServerStatistics message for this node, and the ClientStatistics for
@@ -214,10 +243,15 @@
   NodeEventLoopFactory(EventSchedulerScheduler *scheduler_scheduler,
                        SimulatedEventLoopFactory *factory, const Node *node);
 
+  // Helpers to restart.
+  void ScheduleStartup();
+  void Startup();
+  void Shutdown();
+
   EventScheduler scheduler_;
   SimulatedEventLoopFactory *const factory_;
 
-  UUID boot_uuid_ = UUID::Random();
+  UUID boot_uuid_ = UUID::Zero();
 
   const Node *const node_;
 
@@ -230,8 +264,72 @@
 
   // pid so we get unique timing reports.
   pid_t tid_ = 0;
+
+  // True if we are started.
+  bool started_ = false;
+
+  std::vector<std::function<void()>> pending_on_startup_;
+  std::vector<std::function<void()>> on_startup_;
+  std::vector<std::function<void()>> on_shutdown_;
+
+  // Base class for an application to start.  This shouldn't be used directly.
+  struct Application {
+    Application(NodeEventLoopFactory *node_factory, std::string_view name)
+        : event_loop(node_factory->MakeEventLoop(name)) {}
+    virtual ~Application() {}
+
+    std::unique_ptr<EventLoop> event_loop;
+  };
+
+  // Subclass to do type erasure for the base class.  Holds an instance of a
+  // specific class.  Use SimulationStarter instead.
+  template <typename Main>
+  struct TypedApplication : public Application {
+    // Constructs an Application by delegating the arguments used to construct
+    // the event loop to Application and the rest of the args to the actual
+    // application.
+    template <class... Args>
+    TypedApplication(NodeEventLoopFactory *node_factory, std::string_view name,
+                     Args &&... args)
+        : Application(node_factory, name),
+          main(event_loop.get(), std::forward<Args>(args)...) {
+      VLOG(1) << node_factory->scheduler_.distributed_now() << " "
+              << (node_factory->node() == nullptr
+                      ? ""
+                      : node_factory->node()->name()->str() + " ")
+              << node_factory->monotonic_now() << " Starting Application \""
+              << name << "\"";
+    }
+    ~TypedApplication() override {}
+
+    Main main;
+  };
+
+  std::vector<std::unique_ptr<Application>> applications_;
 };
 
+template <class Main, class... Args>
+Main *NodeEventLoopFactory::MaybeStart(std::string_view name, Args &&... args) {
+  const aos::Application *application =
+      configuration::GetApplication(configuration(), node(), name);
+
+  if (application != nullptr) {
+    return AlwaysStart<Main>(name, std::forward<Args>(args)...);
+  }
+  return nullptr;
+}
+
+template <class Main, class... Args>
+Main *NodeEventLoopFactory::AlwaysStart(std::string_view name,
+                                        Args &&... args) {
+  std::unique_ptr<TypedApplication<Main>> app =
+      std::make_unique<TypedApplication<Main>>(this, name,
+                                               std::forward<Args>(args)...);
+  Main *main_ptr = &app->main;
+  applications_.emplace_back(std::move(app));
+  return main_ptr;
+}
+
 inline monotonic_clock::time_point NodeEventLoopFactory::monotonic_now() const {
   // TODO(austin): Confirm that time never goes backwards?
   return scheduler_.monotonic_now();
@@ -242,7 +340,12 @@
                                     realtime_offset_);
 }
 
-inline monotonic_clock::time_point NodeEventLoopFactory::FromDistributedClock(
+inline distributed_clock::time_point NodeEventLoopFactory::distributed_now()
+    const {
+  return scheduler_.distributed_now();
+}
+
+inline logger::BootTimestamp NodeEventLoopFactory::FromDistributedClock(
     distributed_clock::time_point time) const {
   return scheduler_.FromDistributedClock(time);
 }
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 83568da..43c0c97 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -22,6 +22,7 @@
 
 using aos::testing::ArtifactPath;
 
+using logger::BootTimestamp;
 using message_bridge::RemoteMessage;
 namespace chrono = ::std::chrono;
 
@@ -140,7 +141,7 @@
 TEST(EventSchedulerTest, ScheduleEvent) {
   int counter = 0;
   EventSchedulerScheduler scheduler_scheduler;
-  EventScheduler scheduler;
+  EventScheduler scheduler(0);
   scheduler_scheduler.AddEventScheduler(&scheduler);
 
   scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1),
@@ -158,7 +159,7 @@
 TEST(EventSchedulerTest, DescheduleEvent) {
   int counter = 0;
   EventSchedulerScheduler scheduler_scheduler;
-  EventScheduler scheduler;
+  EventScheduler scheduler(0);
   scheduler_scheduler.AddEventScheduler(&scheduler);
 
   auto token = scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1),
@@ -778,15 +779,13 @@
   message_bridge::TestingTimeConverter time(
       configuration::NodesCount(&config.message()));
   SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
-  NodeEventLoopFactory *pi2_factory =
-      simulated_event_loop_factory.GetNodeEventLoopFactory(pi2);
-  pi2_factory->SetTimeConverter(&time);
+  simulated_event_loop_factory.SetTimeConverter(&time);
 
   constexpr chrono::milliseconds kOffset{1501};
   time.AddNextTimestamp(
       distributed_clock::epoch(),
-      {logger::BootTimestamp::epoch(), logger::BootTimestamp::epoch() + kOffset,
-       logger::BootTimestamp::epoch()});
+      {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
+       BootTimestamp::epoch()});
 
   std::unique_ptr<EventLoop> ping_event_loop =
       simulated_event_loop_factory.MakeEventLoop("ping", pi1);
@@ -871,7 +870,7 @@
                                       chrono::milliseconds(5));
 
   EXPECT_EQ(pi1_server_statistics_count, 10);
-  EXPECT_EQ(pi2_server_statistics_count, 9);
+  EXPECT_EQ(pi2_server_statistics_count, 10);
   EXPECT_EQ(pi3_server_statistics_count, 10);
 }
 
@@ -1034,31 +1033,32 @@
 
 // Test that disconnecting nodes actually disconnects them.
 TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
-  const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
-  const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
-  const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
-
   SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
 
-  std::unique_ptr<EventLoop> ping_event_loop =
-      simulated_event_loop_factory.MakeEventLoop("ping", pi1);
+  NodeEventLoopFactory *pi1 =
+      simulated_event_loop_factory.GetNodeEventLoopFactory("pi1");
+  NodeEventLoopFactory *pi2 =
+      simulated_event_loop_factory.GetNodeEventLoopFactory("pi2");
+  NodeEventLoopFactory *pi3 =
+      simulated_event_loop_factory.GetNodeEventLoopFactory("pi3");
+
+  std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
   Ping ping(ping_event_loop.get());
 
-  std::unique_ptr<EventLoop> pong_event_loop =
-      simulated_event_loop_factory.MakeEventLoop("pong", pi2);
+  std::unique_ptr<EventLoop> pong_event_loop = pi2->MakeEventLoop("pong");
   Pong pong(pong_event_loop.get());
 
   std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
-      simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
+      pi2->MakeEventLoop("pi2_pong_counter");
 
   MessageCounter<examples::Pong> pi2_pong_counter(
       pi2_pong_counter_event_loop.get(), "/test");
 
   std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
-      simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
+      pi3->MakeEventLoop("pi3_pong_counter");
 
   std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
-      simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
+      pi1->MakeEventLoop("pi1_pong_counter");
 
   MessageCounter<examples::Pong> pi1_pong_counter(
       pi1_pong_counter_event_loop.get(), "/test");
@@ -1088,8 +1088,13 @@
           MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
 
   MessageCounter<message_bridge::ServerStatistics>
-      pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
-                                    "/pi1/aos");
+      *pi1_server_statistics_counter;
+  pi1->OnStartup([pi1, &pi1_server_statistics_counter]() {
+    pi1_server_statistics_counter =
+        pi1->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
+            "pi1_server_statistics_counter", "/pi1/aos");
+  });
+
   aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
       pi1_pong_counter_event_loop
           ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
@@ -1098,8 +1103,12 @@
           ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
 
   MessageCounter<message_bridge::ServerStatistics>
-      pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
-                                    "/pi2/aos");
+      *pi2_server_statistics_counter;
+  pi2->OnStartup([pi2, &pi2_server_statistics_counter]() {
+    pi2_server_statistics_counter =
+        pi2->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
+            "pi2_server_statistics_counter", "/pi2/aos");
+  });
   aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
       pi2_pong_counter_event_loop
           ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
@@ -1108,8 +1117,12 @@
           ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
 
   MessageCounter<message_bridge::ServerStatistics>
-      pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
-                                    "/pi3/aos");
+      *pi3_server_statistics_counter;
+  pi3->OnStartup([pi3, &pi3_server_statistics_counter]() {
+    pi3_server_statistics_counter =
+        pi3->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
+            "pi3_server_statistics_counter", "/pi3/aos");
+  });
   aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
       pi3_pong_counter_event_loop
           ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
@@ -1141,9 +1154,9 @@
   EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
   EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
 
-  EXPECT_EQ(pi1_server_statistics_counter.count(), 2u);
-  EXPECT_EQ(pi2_server_statistics_counter.count(), 2u);
-  EXPECT_EQ(pi3_server_statistics_counter.count(), 2u);
+  EXPECT_EQ(pi1_server_statistics_counter->count(), 2u);
+  EXPECT_EQ(pi2_server_statistics_counter->count(), 2u);
+  EXPECT_EQ(pi3_server_statistics_counter->count(), 2u);
 
   EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
   EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
@@ -1172,7 +1185,7 @@
   EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
       << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
 
-  simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)->Disconnect(pi3);
+  pi1->Disconnect(pi3->node());
 
   simulated_event_loop_factory.RunFor(chrono::seconds(2));
 
@@ -1187,9 +1200,9 @@
   EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
   EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
 
-  EXPECT_EQ(pi1_server_statistics_counter.count(), 4u);
-  EXPECT_EQ(pi2_server_statistics_counter.count(), 4u);
-  EXPECT_EQ(pi3_server_statistics_counter.count(), 4u);
+  EXPECT_EQ(pi1_server_statistics_counter->count(), 4u);
+  EXPECT_EQ(pi2_server_statistics_counter->count(), 4u);
+  EXPECT_EQ(pi3_server_statistics_counter->count(), 4u);
 
   EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
   EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
@@ -1218,7 +1231,7 @@
   EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
       << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
 
-  simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)->Connect(pi3);
+  pi1->Connect(pi3->node());
 
   simulated_event_loop_factory.RunFor(chrono::seconds(2));
 
@@ -1233,9 +1246,9 @@
   EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
   EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
 
-  EXPECT_EQ(pi1_server_statistics_counter.count(), 6u);
-  EXPECT_EQ(pi2_server_statistics_counter.count(), 6u);
-  EXPECT_EQ(pi3_server_statistics_counter.count(), 6u);
+  EXPECT_EQ(pi1_server_statistics_counter->count(), 6u);
+  EXPECT_EQ(pi2_server_statistics_counter->count(), 6u);
+  EXPECT_EQ(pi3_server_statistics_counter->count(), 6u);
 
   EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
   EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
@@ -1286,20 +1299,17 @@
   message_bridge::TestingTimeConverter time(
       configuration::NodesCount(&config.message()));
   SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
-  NodeEventLoopFactory *pi2_factory =
-      simulated_event_loop_factory.GetNodeEventLoopFactory(pi2);
-  pi2_factory->SetTimeConverter(&time);
+  simulated_event_loop_factory.SetTimeConverter(&time);
 
   constexpr chrono::milliseconds kOffset{150100};
   time.AddNextTimestamp(
       distributed_clock::epoch(),
-      {logger::BootTimestamp::epoch(), logger::BootTimestamp::epoch() + kOffset,
-       logger::BootTimestamp::epoch()});
-  time.AddNextTimestamp(
-      distributed_clock::epoch() + chrono::seconds(10),
-      {logger::BootTimestamp::epoch() + chrono::milliseconds(9999),
-       logger::BootTimestamp::epoch() + kOffset + chrono::seconds(10),
-       logger::BootTimestamp::epoch() + chrono::milliseconds(9999)});
+      {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
+       BootTimestamp::epoch()});
+  time.AddNextTimestamp(distributed_clock::epoch() + chrono::seconds(10),
+                        {BootTimestamp::epoch() + chrono::milliseconds(9999),
+                         BootTimestamp::epoch() + kOffset + chrono::seconds(10),
+                         BootTimestamp::epoch() + chrono::milliseconds(9999)});
 
   std::unique_ptr<EventLoop> ping_event_loop =
       simulated_event_loop_factory.MakeEventLoop("ping", pi1);
@@ -1470,23 +1480,51 @@
 // Tests that rebooting a node changes the ServerStatistics message and the
 // RemoteTimestamp message.
 TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
-  const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
-  const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
 
-  SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
+  message_bridge::TestingTimeConverter time(
+      configuration::NodesCount(&config.message()));
+  SimulatedEventLoopFactory factory(&config.message());
+  factory.SetTimeConverter(&time);
 
-  std::unique_ptr<EventLoop> ping_event_loop =
-      simulated_event_loop_factory.MakeEventLoop("ping", pi1);
-  Ping ping(ping_event_loop.get());
+  const size_t pi1_index =
+      configuration::GetNodeIndex(&config.message(), "pi1");
+  const size_t pi2_index =
+      configuration::GetNodeIndex(&config.message(), "pi2");
+  const size_t pi3_index =
+      configuration::GetNodeIndex(&config.message(), "pi3");
 
-  std::unique_ptr<EventLoop> pong_event_loop =
-      simulated_event_loop_factory.MakeEventLoop("pong", pi2);
-  Pong pong(pong_event_loop.get());
+  const UUID pi1_boot0 = UUID::Random();
+  const UUID pi2_boot0 = UUID::Random();
+  const UUID pi2_boot1 = UUID::Random();
+  const UUID pi3_boot0 = UUID::Random();
+  {
+    time.AddNextTimestamp(distributed_clock::epoch(),
+                          {BootTimestamp::epoch(), BootTimestamp::epoch(),
+                           BootTimestamp::epoch()});
+
+    const chrono::nanoseconds dt = chrono::milliseconds(2001);
+
+    time.AddNextTimestamp(
+        distributed_clock::epoch() + dt,
+        {BootTimestamp::epoch() + dt,
+         BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
+         BootTimestamp::epoch() + dt});
+
+    time.set_boot_uuid(pi1_index, 0, pi1_boot0);
+    time.set_boot_uuid(pi2_index, 0, pi2_boot0);
+    time.set_boot_uuid(pi2_index, 1, pi2_boot1);
+    time.set_boot_uuid(pi3_index, 0, pi3_boot0);
+  }
+
+  NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+  NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+  pi1->OnStartup([pi1]() { pi1->AlwaysStart<Ping>("ping"); });
+  pi2->OnStartup([pi2]() { pi2->AlwaysStart<Pong>("pong"); });
 
   std::unique_ptr<EventLoop> pi1_remote_timestamp =
-      simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
-  UUID expected_boot_uuid =
-      simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->boot_uuid();
+      pi1->MakeEventLoop("pi1_remote_timestamp");
+  UUID expected_boot_uuid = pi2_boot0;
 
   int timestamp_count = 0;
   pi1_remote_timestamp->MakeWatcher(
@@ -1512,41 +1550,65 @@
       });
 
   int pi1_server_statistics_count = 0;
+  bool first_pi1_server_statistics = true;
   pi1_remote_timestamp->MakeWatcher(
-      "/pi1/aos", [&pi1_server_statistics_count, &expected_boot_uuid](
+      "/pi1/aos", [&pi1_server_statistics_count, &expected_boot_uuid,
+                   &first_pi1_server_statistics](
                       const message_bridge::ServerStatistics &stats) {
         VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
         for (const message_bridge::ServerConnection *connection :
              *stats.connections()) {
-          EXPECT_TRUE(connection->has_boot_uuid());
+          if (connection->state() == message_bridge::State::CONNECTED) {
+            ASSERT_TRUE(connection->has_boot_uuid());
+          }
+          if (!first_pi1_server_statistics) {
+            EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+          }
           if (connection->node()->name()->string_view() == "pi2") {
+            EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+            ASSERT_TRUE(connection->has_boot_uuid());
             EXPECT_EQ(expected_boot_uuid,
                       UUID::FromString(connection->boot_uuid()))
                 << " : Got " << aos::FlatbufferToJson(&stats);
             ++pi1_server_statistics_count;
           }
         }
+        first_pi1_server_statistics = false;
       });
 
+  int pi1_client_statistics_count = 0;
+  pi1_remote_timestamp->MakeWatcher(
+      "/pi1/aos", [&pi1_client_statistics_count](
+                      const message_bridge::ClientStatistics &stats) {
+        VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
+        for (const message_bridge::ClientConnection *connection :
+             *stats.connections()) {
+          EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+          if (connection->node()->name()->string_view() == "pi2") {
+            ++pi1_client_statistics_count;
+          }
+        }
+      });
+
+  // Confirm that reboot changes the UUID.
+  pi2->OnShutdown([&expected_boot_uuid, pi2, pi2_boot1]() {
+    expected_boot_uuid = pi2_boot1;
+    LOG(INFO) << "OnShutdown triggered for pi2";
+    pi2->OnStartup([&expected_boot_uuid, pi2]() {
+      EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
+    });
+  });
+
   // Let a couple of ServerStatistics messages show up before rebooting.
-  simulated_event_loop_factory.RunFor(chrono::milliseconds(2001));
+  factory.RunFor(chrono::milliseconds(2002));
 
   EXPECT_GT(timestamp_count, 100);
   EXPECT_GE(pi1_server_statistics_count, 1u);
 
-  // Confirm that reboot changes the UUID.
-  simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->Reboot();
-
-  EXPECT_NE(
-      expected_boot_uuid,
-      simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->boot_uuid());
-
-  expected_boot_uuid =
-      simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->boot_uuid();
   timestamp_count = 0;
   pi1_server_statistics_count = 0;
 
-  simulated_event_loop_factory.RunFor(chrono::milliseconds(2000));
+  factory.RunFor(chrono::milliseconds(2000));
   EXPECT_GT(timestamp_count, 100);
   EXPECT_GE(pi1_server_statistics_count, 1u);
 }
@@ -1557,5 +1619,321 @@
         Param{"multinode_pingpong_test_combined_config.json", true},
         Param{"multinode_pingpong_test_split_config.json", false}));
 
+// Tests that Startup and Shutdown do reasonable things.
+TEST(SimulatedEventLoopTest, MultinodePingPongStartup) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+  message_bridge::TestingTimeConverter time(
+      configuration::NodesCount(&config.message()));
+  SimulatedEventLoopFactory factory(&config.message());
+  factory.SetTimeConverter(&time);
+  time.AddNextTimestamp(
+      distributed_clock::epoch(),
+      {BootTimestamp::epoch(), BootTimestamp::epoch(), BootTimestamp::epoch()});
+
+  const chrono::nanoseconds dt = chrono::seconds(10) + chrono::milliseconds(6);
+
+  time.AddNextTimestamp(
+      distributed_clock::epoch() + dt,
+      {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
+       BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
+       BootTimestamp::epoch() + dt});
+
+  NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+  NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+  // Configure startup to start Ping and Pong, and count.
+  size_t pi1_startup_counter = 0;
+  size_t pi2_startup_counter = 0;
+  pi1->OnStartup([pi1]() {
+    LOG(INFO) << "Made ping";
+    pi1->AlwaysStart<Ping>("ping");
+  });
+  pi1->OnStartup([&pi1_startup_counter]() { ++pi1_startup_counter; });
+  pi2->OnStartup([pi2]() {
+    LOG(INFO) << "Made pong";
+    pi2->AlwaysStart<Pong>("pong");
+  });
+  pi2->OnStartup([&pi2_startup_counter]() { ++pi2_startup_counter; });
+
+  // Shutdown just counts.
+  size_t pi1_shutdown_counter = 0;
+  size_t pi2_shutdown_counter = 0;
+  pi1->OnShutdown([&pi1_shutdown_counter]() { ++pi1_shutdown_counter; });
+  pi2->OnShutdown([&pi2_shutdown_counter]() { ++pi2_shutdown_counter; });
+
+  MessageCounter<examples::Pong> *pi1_pong_counter = nullptr;
+  MessageCounter<examples::Ping> *pi2_ping_counter = nullptr;
+
+  // Automatically make counters on startup.
+  pi1->OnStartup([&pi1_pong_counter, pi1]() {
+    pi1_pong_counter = pi1->AlwaysStart<MessageCounter<examples::Pong>>(
+        "pi1_pong_counter", "/test");
+  });
+  pi1->OnShutdown([&pi1_pong_counter]() { pi1_pong_counter = nullptr; });
+  pi2->OnStartup([&pi2_ping_counter, pi2]() {
+    pi2_ping_counter = pi2->AlwaysStart<MessageCounter<examples::Ping>>(
+        "pi2_ping_counter", "/test");
+  });
+  pi2->OnShutdown([&pi2_ping_counter]() { pi2_ping_counter = nullptr; });
+
+  EXPECT_EQ(pi2_ping_counter, nullptr);
+  EXPECT_EQ(pi1_pong_counter, nullptr);
+
+  EXPECT_EQ(pi1_startup_counter, 0u);
+  EXPECT_EQ(pi2_startup_counter, 0u);
+  EXPECT_EQ(pi1_shutdown_counter, 0u);
+  EXPECT_EQ(pi2_shutdown_counter, 0u);
+
+  factory.RunFor(chrono::seconds(10) + chrono::milliseconds(5));
+  EXPECT_EQ(pi1_startup_counter, 1u);
+  EXPECT_EQ(pi2_startup_counter, 1u);
+  EXPECT_EQ(pi1_shutdown_counter, 0u);
+  EXPECT_EQ(pi2_shutdown_counter, 0u);
+  EXPECT_EQ(pi2_ping_counter->count(), 1001);
+  EXPECT_EQ(pi1_pong_counter->count(), 1001);
+
+  LOG(INFO) << pi1->monotonic_now();
+  LOG(INFO) << pi2->monotonic_now();
+
+  factory.RunFor(chrono::seconds(5) + chrono::milliseconds(5));
+
+  EXPECT_EQ(pi1_startup_counter, 2u);
+  EXPECT_EQ(pi2_startup_counter, 2u);
+  EXPECT_EQ(pi1_shutdown_counter, 1u);
+  EXPECT_EQ(pi2_shutdown_counter, 1u);
+  EXPECT_EQ(pi2_ping_counter->count(), 501);
+  EXPECT_EQ(pi1_pong_counter->count(), 501);
+}
+
+// Tests that OnStartup handlers can be added after running and get called, and
+// can't be called when running.
+TEST(SimulatedEventLoopDeathTest, OnStartupWhileRunning) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+  // Test that we can add startup handlers as long as we aren't running, and
+  // they get run when Run gets called again.
+  // Test that adding a startup handler when running fails.
+  //
+  // Test shutdown handlers get called on destruction.
+  SimulatedEventLoopFactory factory(&config.message());
+
+  NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+
+  int startup_count0 = 0;
+  int startup_count1 = 0;
+
+  pi1->OnStartup([&]() { ++startup_count0; });
+  EXPECT_EQ(startup_count0, 0);
+  EXPECT_EQ(startup_count1, 0);
+
+  factory.RunFor(chrono::nanoseconds(1));
+  EXPECT_EQ(startup_count0, 1);
+  EXPECT_EQ(startup_count1, 0);
+
+  pi1->OnStartup([&]() { ++startup_count1; });
+  EXPECT_EQ(startup_count0, 1);
+  EXPECT_EQ(startup_count1, 0);
+
+  factory.RunFor(chrono::nanoseconds(1));
+  EXPECT_EQ(startup_count0, 1);
+  EXPECT_EQ(startup_count1, 1);
+
+  std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
+  loop->OnRun([&]() { pi1->OnStartup([]() {}); });
+
+  EXPECT_DEATH({ factory.RunFor(chrono::nanoseconds(1)); },
+               "Can only register OnStartup handlers when not running.");
+}
+
+// Tests that OnStartup handlers can be added after running and get called, and
+// all the handlers get called on reboot.  Shutdown handlers are tested the same
+// way.
+TEST(SimulatedEventLoopTest, OnStartupShutdownAllRestarts) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+  message_bridge::TestingTimeConverter time(
+      configuration::NodesCount(&config.message()));
+  SimulatedEventLoopFactory factory(&config.message());
+  factory.SetTimeConverter(&time);
+  time.StartEqual();
+
+  const chrono::nanoseconds dt = chrono::seconds(10);
+  time.RebootAt(0, distributed_clock::epoch() + dt);
+  time.RebootAt(0, distributed_clock::epoch() + 2 * dt);
+
+  NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+
+  int startup_count0 = 0;
+  int shutdown_count0 = 0;
+  int startup_count1 = 0;
+  int shutdown_count1 = 0;
+
+  pi1->OnStartup([&]() { ++startup_count0; });
+  pi1->OnShutdown([&]() { ++shutdown_count0; });
+  EXPECT_EQ(startup_count0, 0);
+  EXPECT_EQ(startup_count1, 0);
+  EXPECT_EQ(shutdown_count0, 0);
+  EXPECT_EQ(shutdown_count1, 0);
+
+  factory.RunFor(chrono::nanoseconds(1));
+  EXPECT_EQ(startup_count0, 1);
+  EXPECT_EQ(startup_count1, 0);
+  EXPECT_EQ(shutdown_count0, 0);
+  EXPECT_EQ(shutdown_count1, 0);
+
+  pi1->OnStartup([&]() { ++startup_count1; });
+  EXPECT_EQ(startup_count0, 1);
+  EXPECT_EQ(startup_count1, 0);
+  EXPECT_EQ(shutdown_count0, 0);
+  EXPECT_EQ(shutdown_count1, 0);
+
+  factory.RunFor(chrono::nanoseconds(1));
+  EXPECT_EQ(startup_count0, 1);
+  EXPECT_EQ(startup_count1, 1);
+  EXPECT_EQ(shutdown_count0, 0);
+  EXPECT_EQ(shutdown_count1, 0);
+
+  factory.RunFor(chrono::seconds(15));
+
+  EXPECT_EQ(startup_count0, 2);
+  EXPECT_EQ(startup_count1, 2);
+  EXPECT_EQ(shutdown_count0, 1);
+  EXPECT_EQ(shutdown_count1, 0);
+
+  pi1->OnShutdown([&]() { ++shutdown_count1; });
+  factory.RunFor(chrono::seconds(10));
+
+  EXPECT_EQ(startup_count0, 3);
+  EXPECT_EQ(startup_count1, 3);
+  EXPECT_EQ(shutdown_count0, 2);
+  EXPECT_EQ(shutdown_count1, 1);
+}
+
+// Tests that event loops which outlive shutdown crash.
+TEST(SimulatedEventLoopDeathTest, EventLoopOutlivesReboot) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+  message_bridge::TestingTimeConverter time(
+      configuration::NodesCount(&config.message()));
+  SimulatedEventLoopFactory factory(&config.message());
+  factory.SetTimeConverter(&time);
+  time.StartEqual();
+
+  const chrono::nanoseconds dt = chrono::seconds(10);
+  time.RebootAt(0, distributed_clock::epoch() + dt);
+
+  NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+
+  std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
+
+  EXPECT_DEATH({ factory.RunFor(dt * 2); }, "Event loop");
+}
+
+// Tests that messages don't survive a reboot of a node.
+TEST(SimulatedEventLoopTest, ChannelClearedOnReboot) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+  message_bridge::TestingTimeConverter time(
+      configuration::NodesCount(&config.message()));
+  SimulatedEventLoopFactory factory(&config.message());
+  factory.SetTimeConverter(&time);
+  time.StartEqual();
+
+  const chrono::nanoseconds dt = chrono::seconds(10);
+  time.RebootAt(0, distributed_clock::epoch() + dt);
+
+  NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+
+  const UUID boot_uuid = pi1->boot_uuid();
+  EXPECT_NE(boot_uuid, UUID::Zero());
+
+  {
+    ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
+    aos::Sender<examples::Ping> test_message_sender =
+        ping_event_loop->MakeSender<examples::Ping>("/reliable");
+    SendPing(&test_message_sender, 1);
+  }
+
+  factory.RunFor(chrono::seconds(5));
+
+  {
+    ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
+    aos::Fetcher<examples::Ping> fetcher =
+        ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
+    EXPECT_TRUE(fetcher.Fetch());
+  }
+
+  factory.RunFor(chrono::seconds(10));
+
+  {
+    ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
+    aos::Fetcher<examples::Ping> fetcher =
+        ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
+    EXPECT_FALSE(fetcher.Fetch());
+  }
+  EXPECT_NE(boot_uuid, pi1->boot_uuid());
+}
+
+// Tests that reliable messages get resent on reboot.
+TEST(SimulatedEventLoopTest, ReliableMessageResentOnReboot) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+  message_bridge::TestingTimeConverter time(
+      configuration::NodesCount(&config.message()));
+  SimulatedEventLoopFactory factory(&config.message());
+  factory.SetTimeConverter(&time);
+  time.StartEqual();
+
+  const chrono::nanoseconds dt = chrono::seconds(1);
+  time.RebootAt(1, distributed_clock::epoch() + dt);
+
+  NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+  NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+  const UUID pi1_boot_uuid = pi1->boot_uuid();
+  const UUID pi2_boot_uuid = pi2->boot_uuid();
+  EXPECT_NE(pi1_boot_uuid, UUID::Zero());
+  EXPECT_NE(pi2_boot_uuid, UUID::Zero());
+
+  {
+    ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
+    aos::Sender<examples::Ping> test_message_sender =
+        ping_event_loop->MakeSender<examples::Ping>("/reliable");
+    SendPing(&test_message_sender, 1);
+  }
+
+  factory.RunFor(chrono::milliseconds(500));
+
+  {
+    ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
+    aos::Fetcher<examples::Ping> fetcher =
+        ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
+    EXPECT_TRUE(fetcher.Fetch());
+  }
+
+  factory.RunFor(chrono::seconds(1));
+
+  {
+    ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
+    aos::Fetcher<examples::Ping> fetcher =
+        ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
+    EXPECT_TRUE(fetcher.Fetch());
+  }
+  EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
+}
+
 }  // namespace testing
 }  // namespace aos
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 11e8c56..f4a4188 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -16,55 +16,192 @@
 // fetcher to manage the queue of data, and a timer to schedule the sends.
 class RawMessageDelayer {
  public:
-  RawMessageDelayer(aos::NodeEventLoopFactory *fetch_node_factory,
+  RawMessageDelayer(const Channel *channel, const Connection *connection,
+                    aos::NodeEventLoopFactory *fetch_node_factory,
                     aos::NodeEventLoopFactory *send_node_factory,
-                    aos::EventLoop *fetch_event_loop,
-                    aos::EventLoop *send_event_loop,
-                    std::unique_ptr<aos::RawFetcher> fetcher,
-                    std::unique_ptr<aos::RawSender> sender,
-                    MessageBridgeServerStatus *server_status,
-                    size_t destination_node_index,
-                    ServerConnection *server_connection, int client_index,
-                    MessageBridgeClientStatus *client_status,
-                    size_t channel_index,
-                    aos::Sender<RemoteMessage> *timestamp_logger)
-      : fetch_node_factory_(fetch_node_factory),
+                    size_t destination_node_index, bool delivery_time_is_logged)
+      : channel_(channel),
+        connection_(connection),
+        fetch_node_factory_(fetch_node_factory),
         send_node_factory_(send_node_factory),
-        fetch_event_loop_(fetch_event_loop),
-        send_event_loop_(send_event_loop),
-        fetcher_(std::move(fetcher)),
-        sender_(std::move(sender)),
-        server_status_(server_status),
         destination_node_index_(destination_node_index),
-        server_connection_(server_connection),
-        client_status_(client_status),
-        client_index_(client_index),
-        client_connection_(client_status_->GetClientConnection(client_index)),
-        channel_index_(channel_index),
-        timestamp_logger_(timestamp_logger) {
-    timer_ = send_event_loop_->AddTimer([this]() { Send(); });
-    std::string timer_name =
-        absl::StrCat(send_event_loop_->node()->name()->string_view(), " ",
-                     fetcher_->channel()->name()->string_view(), " ",
-                     fetcher_->channel()->type()->string_view());
-    timer_->set_name(timer_name);
-    timestamp_timer_ =
-        fetch_event_loop_->AddTimer([this]() { SendTimestamp(); });
-    timestamp_timer_->set_name(absl::StrCat(timer_name, " timestamps"));
+        channel_index_(configuration::ChannelIndex(
+            fetch_node_factory_->configuration(), channel_)),
+        delivery_time_is_logged_(delivery_time_is_logged) {}
 
-    Schedule();
+  bool forwarding_disabled() const { return forwarding_disabled_; }
+  void set_forwarding_disabled(bool forwarding_disabled) {
+    forwarding_disabled_ = forwarding_disabled;
   }
 
-  const Channel *channel() const { return fetcher_->channel(); }
+  void SetFetchEventLoop(aos::EventLoop *fetch_event_loop,
+                         MessageBridgeServerStatus *server_status,
+                         ChannelTimestampSender *timestamp_loggers) {
+    sent_ = false;
+    fetch_event_loop_ = fetch_event_loop;
+    if (fetch_event_loop_) {
+      fetcher_ = fetch_event_loop_->MakeRawFetcher(channel_);
+    } else {
+      fetcher_ = nullptr;
+    }
+
+    server_status_ = server_status;
+    if (server_status) {
+      server_connection_ =
+          server_status_->FindServerConnection(send_node_factory_->node());
+    }
+    if (delivery_time_is_logged_ && timestamp_loggers != nullptr) {
+      timestamp_logger_ =
+          timestamp_loggers->SenderForChannel(channel_, connection_);
+    } else {
+      timestamp_logger_ = nullptr;
+    }
+
+    if (fetch_event_loop_) {
+      timestamp_timer_ =
+          fetch_event_loop_->AddTimer([this]() { SendTimestamp(); });
+      if (send_event_loop_) {
+        std::string timer_name = absl::StrCat(
+            send_event_loop_->node()->name()->string_view(), " ",
+            fetcher_->channel()->name()->string_view(), " ",
+            fetcher_->channel()->type()->string_view());
+        if (timer_) {
+          timer_->set_name(timer_name);
+        }
+        timestamp_timer_->set_name(absl::StrCat(timer_name, " timestamps"));
+      }
+    } else {
+      timestamp_timer_ = nullptr;
+    }
+  }
+
+  void SetSendEventLoop(aos::EventLoop *send_event_loop,
+                        MessageBridgeClientStatus *client_status) {
+    sent_ = false;
+    send_event_loop_ = send_event_loop;
+    if (send_event_loop_) {
+      sender_ = send_event_loop_->MakeRawSender(channel_);
+    } else {
+      sender_ = nullptr;
+    }
+
+    client_status_ = client_status;
+    if (client_status_) {
+      client_index_ = client_status_->FindClientIndex(
+          channel_->source_node()->string_view());
+      client_connection_ = client_status_->GetClientConnection(client_index_);
+    } else {
+      client_index_ = -1;
+      client_connection_ = nullptr;
+    }
+
+    if (send_event_loop_) {
+      timer_ = send_event_loop_->AddTimer([this]() { Send(); });
+      if (fetcher_) {
+        std::string timer_name =
+            absl::StrCat(send_event_loop_->node()->name()->string_view(), " ",
+                         fetcher_->channel()->name()->string_view(), " ",
+                         fetcher_->channel()->type()->string_view());
+        timer_->set_name(timer_name);
+        if (timestamp_timer_) {
+          timestamp_timer_->set_name(absl::StrCat(timer_name, " timestamps"));
+        }
+      }
+    } else {
+      timer_ = nullptr;
+    }
+  }
+
+  const Channel *channel() const { return channel_; }
 
   uint32_t time_to_live() {
-    return configuration::ConnectionToNode(sender_->channel(),
-                                           send_node_factory_->node())
+    return configuration::ConnectionToNode(channel_, send_node_factory_->node())
         ->time_to_live();
   }
 
+  void ScheduleReliable() {
+    if (forwarding_disabled()) return;
+
+    if (!fetcher_) {
+      return;
+    }
+    if (fetcher_->context().data == nullptr || sent_) {
+      sent_ = !fetcher_->Fetch();
+    }
+
+    FetchNext();
+    if (fetcher_->context().data == nullptr || sent_) {
+      return;
+    }
+    CHECK(!timer_scheduled_);
+
+    // Send at startup.  It is the best we can do.
+    const monotonic_clock::time_point monotonic_delivered_time =
+        send_node_factory_->monotonic_now() +
+        send_node_factory_->network_delay();
+
+    CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
+        << ": Trying to deliver message in the past on channel "
+        << configuration::StrippedChannelToString(fetcher_->channel())
+        << " to node " << send_event_loop_->node()->name()->string_view()
+        << " sent from " << fetcher_->channel()->source_node()->string_view()
+        << " at " << fetch_node_factory_->monotonic_now();
+
+    if (timer_) {
+      server_connection_->mutate_sent_packets(
+          server_connection_->sent_packets() + 1);
+      timer_->Setup(monotonic_delivered_time);
+      timer_scheduled_ = true;
+    } else {
+      server_connection_->mutate_dropped_packets(
+          server_connection_->dropped_packets() + 1);
+      sent_ = true;
+    }
+  }
+
+  bool timer_scheduled_ = false;
+
   // Kicks us to re-fetch and schedule the timer.
   void Schedule() {
+    CHECK(!forwarding_disabled());
+    if (!fetcher_) {
+      return;
+    }
+    if (timer_scheduled_) {
+      return;
+    }
+    FetchNext();
+    if (fetcher_->context().data == nullptr || sent_) {
+      return;
+    }
+
+    // Compute the time to publish this message.
+    const monotonic_clock::time_point monotonic_delivered_time =
+        DeliveredTime(fetcher_->context());
+
+    CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
+        << ": Trying to deliver message in the past on channel "
+        << configuration::StrippedChannelToString(fetcher_->channel())
+        << " to node " << send_event_loop_->node()->name()->string_view()
+        << " sent from " << fetcher_->channel()->source_node()->string_view()
+        << " at " << fetch_node_factory_->monotonic_now();
+
+    if (timer_) {
+      server_connection_->mutate_sent_packets(
+          server_connection_->sent_packets() + 1);
+      timer_->Setup(monotonic_delivered_time);
+      timer_scheduled_ = true;
+    } else {
+      server_connection_->mutate_dropped_packets(
+          server_connection_->dropped_packets() + 1);
+      sent_ = true;
+      Schedule();
+    }
+  }
+
+ private:
+  void FetchNext() {
+    CHECK(server_connection_);
     // Keep pulling messages out of the fetcher until we find one in the future.
     while (true) {
       if (fetcher_->context().data == nullptr || sent_) {
@@ -82,9 +219,10 @@
       }
 
       if (fetcher_->context().monotonic_event_time +
-              send_node_factory_->network_delay() +
-              send_node_factory_->send_delay() >
-          fetch_node_factory_->monotonic_now()) {
+                  send_node_factory_->network_delay() +
+                  send_node_factory_->send_delay() >
+              fetch_node_factory_->monotonic_now() ||
+          time_to_live() == 0) {
         break;
       }
 
@@ -99,34 +237,13 @@
       server_connection_->mutate_dropped_packets(
           server_connection_->dropped_packets() + 1);
     }
-
-    if (fetcher_->context().data == nullptr) {
-      return;
-    }
-
-    if (sent_) {
-      return;
-    }
-
-    // Compute the time to publish this message.
-    const monotonic_clock::time_point monotonic_delivered_time =
-        DeliveredTime(fetcher_->context());
-
-    CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
-        << ": Trying to deliver message in the past on channel "
-        << configuration::StrippedChannelToString(fetcher_->channel())
-        << " to node " << send_event_loop_->node()->name()->string_view()
-        << " sent from " << fetcher_->channel()->source_node()->string_view()
-        << " at " << fetch_node_factory_->monotonic_now();
-
-    server_connection_->mutate_sent_packets(server_connection_->sent_packets() +
-                                            1);
-    timer_->Setup(monotonic_delivered_time);
   }
 
- private:
-  // Acutally sends the message, and reschedules.
+  // Actually sends the message, and reschedules.
   void Send() {
+    timer_scheduled_ = false;
+    CHECK(sender_);
+    CHECK(client_status_);
     if (server_connection_->state() != State::CONNECTED) {
       sent_ = true;
       Schedule();
@@ -238,23 +355,28 @@
     const distributed_clock::time_point distributed_sent_time =
         fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
 
-    return send_node_factory_->FromDistributedClock(
+    const logger::BootTimestamp t = send_node_factory_->FromDistributedClock(
         distributed_sent_time + send_node_factory_->network_delay() +
         send_node_factory_->send_delay());
+    CHECK_EQ(t.boot, send_node_factory_->boot_count());
+    return t.time;
   }
 
+  const Channel *channel_;
+  const Connection *connection_;
+
   // Factories used for time conversion.
   aos::NodeEventLoopFactory *fetch_node_factory_;
   aos::NodeEventLoopFactory *send_node_factory_;
 
   // Event loop which fetching and sending timestamps are scheduled on.
-  aos::EventLoop *fetch_event_loop_;
+  aos::EventLoop *fetch_event_loop_ = nullptr;
   // Event loop which sending is scheduled on.
-  aos::EventLoop *send_event_loop_;
+  aos::EventLoop *send_event_loop_ = nullptr;
   // Timer used to send.
-  aos::TimerHandler *timer_;
+  aos::TimerHandler *timer_ = nullptr;
   // Timer used to send timestamps out.
-  aos::TimerHandler *timestamp_timer_;
+  aos::TimerHandler *timestamp_timer_ = nullptr;
   // Time that the timer is scheduled for.  Used to track if it needs to be
   // rescheduled.
   monotonic_clock::time_point scheduled_time_ = monotonic_clock::min_time;
@@ -264,14 +386,14 @@
   // Sender to send them back out.
   std::unique_ptr<aos::RawSender> sender_;
 
-  MessageBridgeServerStatus *server_status_;
+  MessageBridgeServerStatus *server_status_ = nullptr;
   const size_t destination_node_index_;
   // True if we have sent the message in the fetcher.
   bool sent_ = false;
 
   ServerConnection *server_connection_ = nullptr;
   MessageBridgeClientStatus *client_status_ = nullptr;
-  int client_index_;
+  int client_index_ = -1;
   ClientConnection *client_connection_ = nullptr;
 
   size_t channel_index_;
@@ -287,6 +409,10 @@
   };
 
   std::deque<Timestamp> remote_timestamps_;
+
+  bool delivery_time_is_logged_;
+
+  bool forwarding_disabled_ = false;
 };
 
 SimulatedMessageBridge::SimulatedMessageBridge(
@@ -296,56 +422,43 @@
 
   // Pre-build up event loops for every node.  They are pretty cheap anyways.
   for (const Node *node : simulated_event_loop_factory->nodes()) {
-    auto it = event_loop_map_.emplace(std::make_pair(
-        node,
-        simulated_event_loop_factory->MakeEventLoop("message_bridge", node)));
-
+    NodeEventLoopFactory *node_factory =
+        simulated_event_loop_factory->GetNodeEventLoopFactory(node);
+    auto it = event_loop_map_.emplace(node, node_factory);
     CHECK(it.second);
 
-    it.first->second.event_loop->SkipTimingReport();
-    it.first->second.event_loop->SkipAosLog();
+    node_factory->OnStartup(
+        [this, simulated_event_loop_factory, node_state = &it.first->second]() {
+          node_state->MakeEventLoop();
+          const size_t my_node_index = configuration::GetNodeIndex(
+              simulated_event_loop_factory->configuration(),
+              node_state->event_loop->node());
 
-    for (ServerConnection *connection :
-         it.first->second.server_status.server_connection()) {
-      if (connection == nullptr) continue;
+          size_t node_index = 0;
+          for (ServerConnection *connection :
+               node_state->server_status->server_connection()) {
+            if (connection != nullptr) {
+              node_state->server_status->ResetFilter(node_index);
+            }
+            ++node_index;
+          }
 
-      connection->mutate_state(message_bridge::State::CONNECTED);
-    }
+          for (const ClientConnection *client_connections :
+               *node_state->client_status->mutable_client_statistics()
+                    ->connections()) {
+            const Node *client_node = configuration::GetNode(
+                simulated_event_loop_factory->configuration(),
+                client_connections->node()->name()->string_view());
 
-    for (size_t i = 0;
-         i < it.first->second.client_status.mutable_client_statistics()
-                 ->mutable_connections()
-                 ->size();
-         ++i) {
-      ClientConnection *connection =
-          it.first->second.client_status.mutable_client_statistics()
-              ->mutable_connections()
-              ->GetMutableObject(i);
-      if (connection == nullptr) continue;
+            auto client_event_loop = event_loop_map_.find(client_node);
+            client_event_loop->second.SetBootUUID(
+                my_node_index, node_state->event_loop->boot_uuid());
+          }
+        });
 
-      connection->mutate_state(message_bridge::State::CONNECTED);
-    }
-  }
-
-  for (const Node *node : simulated_event_loop_factory->nodes()) {
-    auto it = event_loop_map_.find(node);
-
-    CHECK(it != event_loop_map_.end());
-
-    size_t node_index = 0;
-    for (ServerConnection *connection :
-         it->second.server_status.server_connection()) {
-      if (connection != nullptr) {
-        const Node *client_node =
-            simulated_event_loop_factory->configuration()->nodes()->Get(
-                node_index);
-        auto client_event_loop = event_loop_map_.find(client_node);
-        it->second.server_status.ResetFilter(node_index);
-        it->second.server_status.SetBootUUID(
-            node_index, client_event_loop->second.event_loop->boot_uuid());
-      }
-      ++node_index;
-    }
+    node_factory->OnShutdown([node_state = &it.first->second]() {
+      node_state->SetEventLoop(nullptr);
+    });
   }
 
   for (const Channel *channel :
@@ -355,10 +468,10 @@
     }
 
     // Find the sending node.
-    const Node *node =
+    const Node *source_node =
         configuration::GetNode(simulated_event_loop_factory->configuration(),
                                channel->source_node()->string_view());
-    auto source_event_loop = event_loop_map_.find(node);
+    auto source_event_loop = event_loop_map_.find(source_node);
     CHECK(source_event_loop != event_loop_map_.end());
 
     std::unique_ptr<DelayersVector> delayers =
@@ -372,72 +485,39 @@
       auto destination_event_loop = event_loop_map_.find(destination_node);
       CHECK(destination_event_loop != event_loop_map_.end());
 
-      ServerConnection *server_connection =
-          source_event_loop->second.server_status.FindServerConnection(
-              connection->name()->string_view());
-
-      int client_index =
-          destination_event_loop->second.client_status.FindClientIndex(
-              channel->source_node()->string_view());
-
       const size_t destination_node_index = configuration::GetNodeIndex(
           simulated_event_loop_factory->configuration(), destination_node);
 
       const bool delivery_time_is_logged =
-          configuration::ConnectionDeliveryTimeIsLoggedOnNode(
-              connection, source_event_loop->second.event_loop->node());
+          configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
+                                                              source_node);
 
-      delayers->emplace_back(std::make_unique<RawMessageDelayer>(
-          simulated_event_loop_factory->GetNodeEventLoopFactory(node),
+      delayers->v.emplace_back(std::make_unique<RawMessageDelayer>(
+          channel, connection,
+          simulated_event_loop_factory->GetNodeEventLoopFactory(source_node),
           simulated_event_loop_factory->GetNodeEventLoopFactory(
               destination_node),
-          source_event_loop->second.event_loop.get(),
-          destination_event_loop->second.event_loop.get(),
-          source_event_loop->second.event_loop->MakeRawFetcher(channel),
-          destination_event_loop->second.event_loop->MakeRawSender(channel),
-          &source_event_loop->second.server_status, destination_node_index,
-          server_connection, client_index,
-          &destination_event_loop->second.client_status,
-          configuration::ChannelIndex(
-              source_event_loop->second.event_loop->configuration(), channel),
-          delivery_time_is_logged
-              ? source_event_loop->second.timestamp_loggers.SenderForChannel(
-                    channel, connection)
-              : nullptr));
+          destination_node_index, delivery_time_is_logged));
+
+      source_event_loop->second.AddSourceDelayer(delayers->v.back().get());
+      destination_event_loop->second.AddDestinationDelayer(
+          delayers->v.back().get());
     }
 
     const Channel *const timestamp_channel = configuration::GetChannel(
         simulated_event_loop_factory->configuration(), "/aos",
-        Timestamp::GetFullyQualifiedName(),
-        source_event_loop->second.event_loop->name(), node);
+        Timestamp::GetFullyQualifiedName(), "message_bridge", source_node);
 
     if (channel == timestamp_channel) {
-      source_event_loop->second.server_status.set_send_data(
+      source_event_loop->second.SetSendData(
           [captured_delayers = delayers.get()](const Context &) {
             for (std::unique_ptr<RawMessageDelayer> &delayer :
-                 *captured_delayers) {
+                 captured_delayers->v) {
               delayer->Schedule();
             }
           });
     } else {
-      // And register every delayer to be poked when a new message shows up.
-
-      source_event_loop->second.event_loop->OnRun([captured_delayers =
-                                                       delayers.get()]() {
-        // Poke all the reliable delayers so they send any queued messages.
-        for (std::unique_ptr<RawMessageDelayer> &delayer : *captured_delayers) {
-          if (delayer->time_to_live() == 0) {
-            delayer->Schedule();
-          }
-        }
-      });
-      source_event_loop->second.event_loop->MakeRawNoArgWatcher(
-          channel, [captured_delayers = delayers.get()](const Context &) {
-            for (std::unique_ptr<RawMessageDelayer> &delayer :
-                 *captured_delayers) {
-              delayer->Schedule();
-            }
-          });
+      source_event_loop->second.AddDelayerWatcher(channel, delayers.get());
     }
     delayers_list_.emplace_back(std::move(delayers));
   }
@@ -446,17 +526,13 @@
 SimulatedMessageBridge::~SimulatedMessageBridge() {}
 
 void SimulatedMessageBridge::DisableForwarding(const Channel *channel) {
-  for (std::unique_ptr<std::vector<std::unique_ptr<RawMessageDelayer>>>
-           &delayers : delayers_list_) {
-    if (delayers->size() > 0) {
-      if ((*delayers)[0]->channel() == channel) {
-        for (std::unique_ptr<RawMessageDelayer> &delayer : *delayers) {
-          CHECK(delayer->channel() == channel);
+  for (std::unique_ptr<DelayersVector> &delayers : delayers_list_) {
+    if (delayers->v.size() > 0) {
+      if (delayers->v[0]->channel() == channel) {
+        delayers->disable_forwarding = true;
+        for (std::unique_ptr<RawMessageDelayer> &delayer : delayers->v) {
+          delayer->set_forwarding_disabled(true);
         }
-
-        // If we clear the delayers list, nothing will be scheduled.  Which is a
-        // success!
-        delayers->clear();
       }
     }
   }
@@ -476,45 +552,115 @@
                                       message_bridge::State state) {
   auto source_state = event_loop_map_.find(source);
   CHECK(source_state != event_loop_map_.end());
-
-  ServerConnection *server_connection =
-      source_state->second.server_status.FindServerConnection(destination);
-  if (!server_connection) {
-    return;
-  }
-  server_connection->mutate_state(state);
+  source_state->second.SetServerState(destination, state);
 
   auto destination_state = event_loop_map_.find(destination);
   CHECK(destination_state != event_loop_map_.end());
-  ClientConnection *client_connection =
-      destination_state->second.client_status.GetClientConnection(source);
-  if (!client_connection) {
-    return;
-  }
-  client_connection->mutate_state(state);
+  destination_state->second.SetClientState(source, state);
 }
 
 void SimulatedMessageBridge::DisableStatistics() {
   for (std::pair<const Node *const, State> &state : event_loop_map_) {
-    state.second.server_status.DisableStatistics();
-    state.second.client_status.DisableStatistics();
+    state.second.DisableStatistics();
   }
 }
 
 void SimulatedMessageBridge::SkipTimingReport() {
+  // TODO(austin): I think this can be deleted...
   for (std::pair<const Node *const, State> &state : event_loop_map_) {
     state.second.event_loop->SkipTimingReport();
   }
 }
 
-SimulatedMessageBridge::State::State(
-    std::unique_ptr<aos::EventLoop> &&new_event_loop)
-    : event_loop(std::move(new_event_loop)),
-      timestamp_loggers(event_loop.get()),
-      server_status(event_loop.get()),
-      client_status(event_loop.get()) {
+void SimulatedMessageBridge::State::SetEventLoop(
+    std::unique_ptr<aos::EventLoop> loop) {
+  if (!loop) {
+    timestamp_loggers = ChannelTimestampSender(nullptr);
+    server_status.reset();
+    client_status.reset();
+    for (RawMessageDelayer *source_delayer : source_delayers_) {
+      source_delayer->SetFetchEventLoop(nullptr, nullptr, nullptr);
+    }
+    for (RawMessageDelayer *destination_delayer : destination_delayers_) {
+      destination_delayer->SetSendEventLoop(nullptr, nullptr);
+    }
+    event_loop = std::move(loop);
+    return;
+  } else {
+    CHECK(!event_loop);
+  }
+  event_loop = std::move(loop);
 
-  // Find all nodes which log timestamps back to us (from us).
+  event_loop->SkipTimingReport();
+  event_loop->SkipAosLog();
+
+  for (std::pair<const Channel *, DelayersVector *> &watcher :
+       delayer_watchers_) {
+    // Don't register watchers if we know we aren't forwarding.
+    if (watcher.second->disable_forwarding) continue;
+    event_loop->MakeRawNoArgWatcher(
+        watcher.first, [captured_delayers = watcher.second](const Context &) {
+          // We might get told after registering, so don't forward at that point
+          // too.
+          for (std::unique_ptr<RawMessageDelayer> &delayer :
+               captured_delayers->v) {
+            delayer->Schedule();
+          }
+        });
+  }
+
+  timestamp_loggers = ChannelTimestampSender(event_loop.get());
+  server_status = std::make_unique<MessageBridgeServerStatus>(event_loop.get());
+
+  {
+    size_t node_index = 0;
+    for (ServerConnection *connection : server_status->server_connection()) {
+      if (connection) {
+        if (boot_uuids_[node_index] != UUID::Zero()) {
+          connection->mutate_state(server_state_[node_index]);
+        } else {
+          connection->mutate_state(message_bridge::State::DISCONNECTED);
+        }
+      }
+      ++node_index;
+    }
+  }
+
+  for (size_t i = 0; i < boot_uuids_.size(); ++i) {
+    if (boot_uuids_[i] != UUID::Zero()) {
+      server_status->SetBootUUID(i, boot_uuids_[i]);
+    }
+  }
+  if (disable_statistics_) {
+    server_status->DisableStatistics();
+  }
+  if (fn_) {
+    server_status->set_send_data(fn_);
+  }
+  client_status = std::make_unique<MessageBridgeClientStatus>(event_loop.get());
+  if (disable_statistics_) {
+    client_status->DisableStatistics();
+  }
+
+  for (size_t i = 0;
+       i < client_status->mutable_client_statistics()->connections()->size();
+       ++i) {
+    ClientConnection *client_connection =
+        client_status->mutable_client_statistics()
+            ->mutable_connections()
+            ->GetMutableObject(i);
+    const Node *client_node = configuration::GetNode(
+        node_factory_->configuration(),
+        client_connection->node()->name()->string_view());
+    const size_t client_node_index = configuration::GetNodeIndex(
+        node_factory_->configuration(), client_node);
+    if (boot_uuids_[client_node_index] != UUID::Zero()) {
+      client_connection->mutate_state(client_state_[client_node_index]);
+    } else {
+      client_connection->mutate_state(message_bridge::State::DISCONNECTED);
+    }
+  }
+
   for (const Channel *channel : *event_loop->configuration()->channels()) {
     CHECK(channel->has_source_node());
 
@@ -535,6 +681,22 @@
       }
     }
   }
+
+  for (RawMessageDelayer *source_delayer : source_delayers_) {
+    source_delayer->SetFetchEventLoop(event_loop.get(), server_status.get(),
+                                      &timestamp_loggers);
+  }
+  for (RawMessageDelayer *destination_delayer : destination_delayers_) {
+    destination_delayer->SetSendEventLoop(event_loop.get(),
+                                          client_status.get());
+  }
+  event_loop->OnRun([this]() {
+    for (RawMessageDelayer *destination_delayer : destination_delayers_) {
+      if (destination_delayer->time_to_live() == 0) {
+        destination_delayer->ScheduleReliable();
+      }
+    }
+  });
 }
 
 }  // namespace message_bridge
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
index c1ff698..d245fb3 100644
--- a/aos/events/simulated_network_bridge.h
+++ b/aos/events/simulated_network_bridge.h
@@ -42,22 +42,135 @@
   void SkipTimingReport();
 
  private:
-  struct State {
-    State(std::unique_ptr<aos::EventLoop> &&new_event_loop);
+  struct DelayersVector {
+    std::vector<std::unique_ptr<RawMessageDelayer>> v;
 
+    bool disable_forwarding = false;
+  };
+  struct State {
+    State(NodeEventLoopFactory *node_factory) : node_factory_(node_factory) {
+      const size_t num_nodes = node_factory->configuration()->nodes()->size();
+      boot_uuids_.resize(num_nodes, UUID::Zero());
+      client_state_.resize(num_nodes, message_bridge::State::CONNECTED);
+      server_state_.resize(num_nodes, message_bridge::State::CONNECTED);
+    }
     State(const State &state) = delete;
 
+    void DisableStatistics() {
+      disable_statistics_ = true;
+      if (server_status) {
+        server_status->DisableStatistics();
+      }
+      if (client_status) {
+        client_status->DisableStatistics();
+      }
+    }
+
+    void AddSourceDelayer(RawMessageDelayer *delayer) {
+      source_delayers_.emplace_back(delayer);
+    }
+    void AddDestinationDelayer(RawMessageDelayer *delayer) {
+      destination_delayers_.emplace_back(delayer);
+    }
+
+    void MakeEventLoop() {
+      SetEventLoop(node_factory_->MakeEventLoop("message_bridge"));
+    }
+
+    void ClearEventLoop() { SetEventLoop(nullptr); }
+    void SetEventLoop(std::unique_ptr<aos::EventLoop> loop);
+
+    void SetSendData(std::function<void(const Context &)> fn) {
+      CHECK(!fn_);
+      fn_ = std::move(fn);
+      if (server_status) {
+        server_status->set_send_data(fn_);
+      }
+    }
+
+    void AddDelayerWatcher(const Channel *channel, DelayersVector *v) {
+      delayer_watchers_.emplace_back(channel, v);
+    }
+
+    void SetBootUUID(size_t node_index, const UUID &boot_uuid) {
+      boot_uuids_[node_index] = boot_uuid;
+      const Node *node =
+          node_factory_->configuration()->nodes()->Get(node_index);
+      if (server_status) {
+        ServerConnection *connection =
+            server_status->FindServerConnection(node);
+        if (connection) {
+          connection->mutate_state(server_state_[node_index]);
+          server_status->ResetFilter(node_index);
+          server_status->SetBootUUID(node_index, boot_uuid);
+        }
+      }
+      if (client_status) {
+        const int client_index =
+            client_status->FindClientIndex(node->name()->string_view());
+        ClientConnection *client_connection =
+            client_status->GetClientConnection(client_index);
+        if (client_connection) {
+          client_status->SampleReset(client_index);
+          client_connection->mutate_state(client_state_[node_index]);
+        }
+      }
+    }
+
+    void SetServerState(const Node *destination, message_bridge::State state) {
+      const size_t node_index = configuration::GetNodeIndex(
+          node_factory_->configuration(), destination);
+      server_state_[node_index] = state;
+      if (server_status) {
+        ServerConnection *connection =
+            server_status->FindServerConnection(destination);
+        if (connection == nullptr) return;
+
+        connection->mutate_state(state);
+      }
+    }
+
+    void SetClientState(const Node *source, message_bridge::State state) {
+      const size_t node_index =
+          configuration::GetNodeIndex(node_factory_->configuration(), source);
+      client_state_[node_index] = state;
+      if (client_status) {
+        ClientConnection *connection =
+            client_status->GetClientConnection(source);
+
+        if (connection == nullptr) return;
+
+        connection->mutate_state(state);
+      }
+    }
+
+    std::vector<UUID> boot_uuids_;
+    std::vector<message_bridge::State> client_state_;
+    std::vector<message_bridge::State> server_state_;
+
+    std::vector<std::pair<const Channel *, DelayersVector *>> delayer_watchers_;
+
+    std::function<void(const Context &)> fn_;
+
+    NodeEventLoopFactory *node_factory_;
     std::unique_ptr<aos::EventLoop> event_loop;
     ChannelTimestampSender timestamp_loggers;
-    MessageBridgeServerStatus server_status;
-    MessageBridgeClientStatus client_status;
+    std::unique_ptr<MessageBridgeServerStatus> server_status;
+    std::unique_ptr<MessageBridgeClientStatus> client_status;
+
+    // List of delayers to update whenever this node starts or stops.
+    // Source delayers (which are the ones fetching).
+    std::vector<RawMessageDelayer *> source_delayers_;
+    // Destination delayers (which are the ones sending on the receiving nodes).
+    std::vector<RawMessageDelayer *> destination_delayers_;
+
+    bool disable_statistics_ = false;
   };
   // Map of nodes to event loops.  This is a member variable so that the
   // lifetime of the event loops matches the lifetime of the bridge.
   std::map<const Node *, State> event_loop_map_;
 
   // List of delayers used to resend the messages.
-  using DelayersVector = std::vector<std::unique_ptr<RawMessageDelayer>>;
   std::vector<std::unique_ptr<DelayersVector>> delayers_list_;
 };
 
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 48a71c6..0ed756c 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -520,6 +520,7 @@
         ":timestamp_filter",
         "//aos:configuration",
         "//aos/events:simulated_event_loop",
+        "//aos/events/logging:boot_timestamp",
         "//aos/events/logging:logfile_utils",
         "//aos/time",
         "@org_tuxfamily_eigen//:eigen",
@@ -547,6 +548,7 @@
     deps = [
         ":multinode_timestamp_filter",
         "//aos/events:simulated_event_loop",
+        "//aos/events/logging:boot_timestamp",
         "//aos/time",
     ],
 )
diff --git a/aos/network/message_bridge_client_status.cc b/aos/network/message_bridge_client_status.cc
index 69f474a..5236f82 100644
--- a/aos/network/message_bridge_client_status.cc
+++ b/aos/network/message_bridge_client_status.cc
@@ -127,6 +127,7 @@
 }
 
 int MessageBridgeClientStatus::FindClientIndex(std::string_view node_name) {
+  CHECK(statistics_.message().has_connections());
   for (size_t i = 0; i < statistics_.message().connections()->size(); ++i) {
     const ClientConnection *client_connection =
         statistics_.message().connections()->Get(i);
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index 3b5b997..65691cb 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -347,8 +347,26 @@
     VLOG(1) << "  " << t;
   }
 
-  // TODO(austin): Figure out how to communicate the reboot up to the factory.
   CHECK_EQ(node_count_, std::get<1>(*next_time).size());
+
+  if (times_.empty()) {
+    for (BootTimestamp t : std::get<1>(*next_time)) {
+      CHECK_EQ(t.boot, 0u);
+    }
+  } else {
+    bool rebooted = false;
+    for (size_t i = 0; i < node_count_; ++i) {
+      if (std::get<1>(times_.back())[i].boot !=
+          std::get<1>(*next_time)[i].boot) {
+        rebooted = true;
+        break;
+      }
+    }
+    if (rebooted) {
+      CHECK(reboot_found_);
+      reboot_found_(std::get<0>(*next_time), std::get<1>(*next_time));
+    }
+  }
   times_.emplace_back(std::move(*next_time));
   return &times_.back();
 }
@@ -413,7 +431,7 @@
 }
 
 distributed_clock::time_point InterpolatedTimeConverter::ToDistributedClock(
-    size_t node_index, monotonic_clock::time_point time) {
+    size_t node_index, BootTimestamp time) {
   CHECK_LT(node_index, node_count_);
   // If there is only one node, time estimation makes no sense.  Just return
   // unity time.
@@ -425,19 +443,21 @@
   QueueUntil(
       [time, node_index](const std::tuple<distributed_clock::time_point,
                                           std::vector<BootTimestamp>> &t) {
-        return std::get<1>(t)[node_index].time < time;
+        return std::get<1>(t)[node_index] < time;
       });
 
   // Before the beginning needs to have 0 slope otherwise time jumps when
   // timestamp 2 happens.
-  if (times_.size() == 1u || time < std::get<1>(times_[0])[node_index].time) {
-    if (time < std::get<1>(times_[0])[node_index].time) {
+  if (times_.size() == 1u || time < std::get<1>(times_[0])[node_index]) {
+    if (time < std::get<1>(times_[0])[node_index]) {
       CHECK(!have_popped_)
           << ": Trying to interpolate time " << time
           << " but we have forgotten the relevant points already.";
     }
+    CHECK_EQ(time.boot, std::get<1>(times_[0])[node_index].boot);
     const distributed_clock::time_point result =
-        time - std::get<1>(times_[0])[node_index].time + std::get<0>(times_[0]);
+        time.time - std::get<1>(times_[0])[node_index].time +
+        std::get<0>(times_[0]);
     VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
             << result;
     return result;
@@ -448,7 +468,9 @@
   size_t index = times_.size() - 2u;
   while (index > 0u) {
     // TODO(austin): Binary search.
-    if (std::get<1>(times_[index])[node_index].time <= time) {
+    //LOG(INFO) << std::get<1>(times_[index])[node_index] << " <= " << time
+              //<< "?";
+    if (std::get<1>(times_[index])[node_index] <= time) {
       break;
     }
     --index;
@@ -457,38 +479,54 @@
   // Interpolate with the two of these.
   const distributed_clock::time_point d0 = std::get<0>(times_[index]);
   const distributed_clock::time_point d1 = std::get<0>(times_[index + 1]);
+  const BootTimestamp t0 = std::get<1>(times_[index])[node_index];
+  const BootTimestamp t1 = std::get<1>(times_[index + 1])[node_index];
 
-  // TODO(austin): We should extrapolate if the boot changes.
-  CHECK_EQ(std::get<1>(times_[index])[node_index].boot,
-           std::get<1>(times_[index + 1])[node_index].boot);
-  const monotonic_clock::time_point t0 =
-      std::get<1>(times_[index])[node_index].time;
-  const monotonic_clock::time_point t1 =
-      std::get<1>(times_[index + 1])[node_index].time;
+  if (time > t1) {
+    const distributed_clock::time_point result = (time.time - t1.time) + d1;
+    VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
+            << result;
+    return result;
+  }
+
+  if (t0.boot != t1.boot) {
+    if (t0.boot == time.boot) {
+      const distributed_clock::time_point result = (time.time - t0.time) + d0;
+      VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
+              << result;
+      return result;
+    } else if (t1.boot == time.boot) {
+      const distributed_clock::time_point result = (time.time - t1.time) + d1;
+      VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
+              << result;
+      return result;
+    } else {
+      LOG(FATAL) << t0 << " <= " << time << " <= " << t1;
+    }
+  }
 
   const distributed_clock::time_point result =
-      message_bridge::ToDistributedClock(d0, d1, t0, t1, time);
+      message_bridge::ToDistributedClock(d0, d1, t0.time, t1.time, time.time);
 
   VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
           << result;
   return result;
 }
 
-monotonic_clock::time_point InterpolatedTimeConverter::FromDistributedClock(
-    size_t node_index, distributed_clock::time_point time) {
+BootTimestamp InterpolatedTimeConverter::FromDistributedClock(
+    size_t node_index, distributed_clock::time_point time, size_t boot_count) {
   CHECK_LT(node_index, node_count_);
   // If there is only one node, time estimation makes no sense.  Just return
   // unity time.
   if (node_count_ == 1u) {
-    return monotonic_clock::epoch() + time.time_since_epoch();
+    return BootTimestamp::epoch() + time.time_since_epoch();
   }
 
   // Make sure there are enough timestamps in the queue.
-  QueueUntil(
-      [time](const std::tuple<distributed_clock::time_point,
-                              std::vector<BootTimestamp>> &t) {
-        return std::get<0>(t) < time;
-      });
+  QueueUntil([time](const std::tuple<distributed_clock::time_point,
+                                     std::vector<BootTimestamp>> &t) {
+    return std::get<0>(t) < time;
+  });
 
   if (times_.size() == 1u || time < std::get<0>(times_[0])) {
     if (time < std::get<0>(times_[0])) {
@@ -498,16 +536,21 @@
     }
     monotonic_clock::time_point result =
         time - std::get<0>(times_[0]) + std::get<1>(times_[0])[node_index].time;
-    VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ") -> "
-            << result;
-    return result;
+    VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+            << boot_count << ") -> " << result;
+    return {.boot = std::get<1>(times_[0])[node_index].boot, .time = result};
   }
 
   // Now, find the corresponding timestamps.  Search from the back since that's
   // where most of the times we care about will be.
   size_t index = times_.size() - 2u;
   while (index > 0u) {
-    if (std::get<0>(times_[index]) <= time) {
+    //LOG(INFO) << "Considering " << std::get<0>(times_[index + 1]) << " index "
+              //<< index << " vs " << time;
+    // If we are searching across a reboot, we want both the before and after
+    // time.  We will be asked to solve for the after, so make sure when a time
+    // matches exactly, we pick the time before, not the time after.
+    if (std::get<0>(times_[index]) < time) {
       break;
     }
     --index;
@@ -516,13 +559,39 @@
   // Interpolate with the two of these.
   const distributed_clock::time_point d0 = std::get<0>(times_[index]);
   const distributed_clock::time_point d1 = std::get<0>(times_[index + 1]);
+  const BootTimestamp t0 = std::get<1>(times_[index])[node_index];
+  const BootTimestamp t1 = std::get<1>(times_[index + 1])[node_index];
 
-  CHECK_EQ(std::get<1>(times_[index])[node_index].boot,
-           std::get<1>(times_[index + 1])[node_index].boot);
-  const monotonic_clock::time_point t0 =
-      std::get<1>(times_[index])[node_index].time;
-  const monotonic_clock::time_point t1 =
-      std::get<1>(times_[index + 1])[node_index].time;
+  if (time == d1) {
+    if (boot_count == t1.boot) {
+      const BootTimestamp result = t1 + (time - d1);
+      VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+              << boot_count << ") -> " << result;
+      return result;
+    } else {
+      CHECK_EQ(boot_count, t0.boot);
+      const BootTimestamp result = t0 + (time - d0);
+      VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+              << boot_count << ") -> " << result;
+      return result;
+    }
+  }
+
+  //LOG(INFO) << "d0 " << d0 << " time " << time << " d1 " << d1 << " t0 " << t0
+            //<< " t1 " << t1;
+  if (time > d1) {
+    const BootTimestamp result = t1 + (time - d1);
+    VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+            << boot_count << ") -> " << result;
+    return result;
+  }
+
+  if (t0.boot != t1.boot) {
+    const BootTimestamp result = t0 + (time - d0);
+    VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+            << boot_count << ") -> " << result;
+    return result;
+  }
 
   const chrono::nanoseconds dd = d1 - d0;
 
@@ -538,21 +607,23 @@
   // and subtract when < 0 so we round correctly.  Multiply before dividing so
   // we don't round early, and use 128 bit arithmetic to guarantee that 64 bit
   // multiplication fits.
-  absl::int128 numerator =
-      absl::int128((time - d0).count()) * absl::int128((t1 - t0).count());
+  absl::int128 numerator = absl::int128((time - d0).count()) *
+                           absl::int128((t1.time - t0.time).count());
   numerator += numerator > 0 ? absl::int128(dd.count() / 2)
                              : -absl::int128(dd.count() / 2);
 
   const monotonic_clock::time_point result =
-      t0 + std::chrono::nanoseconds(
-               static_cast<int64_t>(numerator / absl::int128(dd.count())));
-  VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ") -> "
-          << result;
-  return result;
+      t0.time + std::chrono::nanoseconds(
+                    static_cast<int64_t>(numerator / absl::int128(dd.count())));
+  VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ", "
+          << boot_count << ") -> " << result;
+  return {.boot = t0.boot, .time = result};
 }
+
 MultiNodeNoncausalOffsetEstimator::MultiNodeNoncausalOffsetEstimator(
     const Configuration *configuration,
-    const Configuration *logged_configuration, bool skip_order_validation,
+    const Configuration *logged_configuration,
+    std::shared_ptr<const logger::Boots> boots, bool skip_order_validation,
     chrono::nanoseconds time_estimation_buffer_seconds)
     : InterpolatedTimeConverter(!configuration::MultiNode(logged_configuration)
                                     ? 1u
@@ -560,11 +631,26 @@
                                 time_estimation_buffer_seconds),
       configuration_(configuration),
       logged_configuration_(logged_configuration),
+      boots_(boots),
       skip_order_validation_(skip_order_validation) {
+  const bool multi_node = configuration::MultiNode(logged_configuration);
+  if (!boots_ && !multi_node) {
+    // This is a super old log.  Fake out boots by making them up.
+    LOG(WARNING) << "Old single node log without boot UUIDs, generating a "
+                    "random boot UUID.";
+    std::shared_ptr<logger::Boots> boots = std::make_shared<logger::Boots>();
+    const UUID random_boot_uuid = UUID::Random();
+    boots->boot_count_map.emplace(random_boot_uuid.ToString(), 0);
+    boots->boots =
+        std::vector<std::vector<std::string>>{{random_boot_uuid.ToString()}};
+    boots_ = boots;
+  }
+
+  CHECK(boots_) << ": Missing boots for " << NodesCount();
+  CHECK_EQ(boots_->boots.size(), NodesCount());
   filters_per_node_.resize(NodesCount());
   last_monotonics_.resize(NodesCount(), BootTimestamp::epoch());
-  if (FLAGS_timestamps_to_csv &&
-      configuration::MultiNode(logged_configuration)) {
+  if (FLAGS_timestamps_to_csv && multi_node) {
     fp_ = fopen("/tmp/timestamp_noncausal_offsets.csv", "w");
     fprintf(fp_, "# distributed");
     for (const Node *node : configuration::GetNodes(logged_configuration)) {
@@ -637,12 +723,23 @@
   if (!node_samples_.empty()) {
     for (NodeSamples &node : node_samples_) {
       for (SingleNodeSamples &timestamps : node.nodes) {
-        CHECK (timestamps.messages.empty());
+        CHECK(timestamps.messages.empty());
       }
     }
   }
 }
 
+UUID MultiNodeNoncausalOffsetEstimator::boot_uuid(size_t node_index,
+                                                  size_t boot_count) {
+  CHECK(boots_);
+  CHECK_LT(node_index, boots_->boots.size());
+  if (boot_count < boots_->boots[node_index].size()) {
+    return UUID::FromString(boots_->boots[node_index][boot_count]);
+  } else {
+    return UUID::Random();
+  }
+}
+
 void MultiNodeNoncausalOffsetEstimator::Start(
     SimulatedEventLoopFactory *factory) {
   std::vector<monotonic_clock::time_point> times;
@@ -1290,7 +1387,12 @@
                       << "ns";
             }
             VLOG(1) << "Ignoring because it is close enough.";
-            next_node_filter->Consume();
+            std::optional<
+                std::tuple<logger::BootTimestamp, logger::BootDuration>>
+                result = next_node_filter->Consume();
+            CHECK(result);
+            next_node_filter->Pop(std::get<0>(*result) -
+                                  time_estimation_buffer_seconds_);
             break;
           }
           // Somehow the new solution is better *and* worse than the old
@@ -1307,7 +1409,12 @@
           }
 
           if (skip_order_validation_) {
-            next_node_filter->Consume();
+            std::optional<
+                std::tuple<logger::BootTimestamp, logger::BootDuration>>
+                result = next_node_filter->Consume();
+            CHECK(result);
+            next_node_filter->Pop(std::get<0>(*result) -
+                                  time_estimation_buffer_seconds_);
             LOG(ERROR) << "Skipping because --skip_order_validation";
             break;
           } else {
@@ -1396,8 +1503,10 @@
       }
     }
     sample = *next_filter->Consume();
+    next_filter->Pop(std::get<0>(sample) - time_estimation_buffer_seconds_);
   } else {
     sample = *next_filter->Consume();
+    next_filter->Pop(std::get<0>(sample) - time_estimation_buffer_seconds_);
     // We found a good sample, so consume it.  If it is a duplicate, we still
     // want to consume it.  But, if this is the first time around, we want to
     // re-solve by recursing (once) to pickup the better base.
diff --git a/aos/network/multinode_timestamp_filter.h b/aos/network/multinode_timestamp_filter.h
index a52db7d..8d776b8 100644
--- a/aos/network/multinode_timestamp_filter.h
+++ b/aos/network/multinode_timestamp_filter.h
@@ -8,6 +8,7 @@
 #include "Eigen/Dense"
 #include "absl/container/btree_set.h"
 #include "aos/configuration.h"
+#include "aos/events/logging/boot_timestamp.h"
 #include "aos/events/logging/logfile_utils.h"
 #include "aos/events/simulated_event_loop.h"
 #include "aos/network/timestamp_filter.h"
@@ -166,16 +167,17 @@
 
   // Converts a time to the distributed clock for scheduling and cross-node
   // time measurement.
-  // TODO(austin): Need to pass in boot.
   distributed_clock::time_point ToDistributedClock(
-      size_t node_index, monotonic_clock::time_point time) override;
+      size_t node_index, logger::BootTimestamp time) override;
 
   // Takes the distributed time and converts it to the monotonic clock for this
   // node.
-  monotonic_clock::time_point FromDistributedClock(
-      size_t node_index, distributed_clock::time_point time) override;
+  logger::BootTimestamp FromDistributedClock(size_t node_index,
+                                             distributed_clock::time_point time,
+                                             size_t boot_count) override;
 
   // Called whenever time passes this point and we can forget about it.
+  // TODO(austin): Pop here instead of in log reader.
   void ObserveTimePassed(distributed_clock::time_point time) override;
 
   // Queues 1 more timestammp in the interpolation list.  This is public for
@@ -283,7 +285,8 @@
  public:
   MultiNodeNoncausalOffsetEstimator(
       const Configuration *configuration,
-      const Configuration *logged_configuration, bool skip_order_validation,
+      const Configuration *logged_configuration,
+      std::shared_ptr<const logger::Boots> boots, bool skip_order_validation,
       std::chrono::nanoseconds time_estimation_buffer_seconds);
 
   ~MultiNodeNoncausalOffsetEstimator() override;
@@ -297,6 +300,8 @@
                            std::vector<logger::BootTimestamp>>>
   NextTimestamp() override;
 
+  UUID boot_uuid(size_t node_index, size_t boot_count) override;
+
   // Checks that all the nodes in the graph are connected.  Needs all filters to
   // be constructed first.
   void CheckGraph();
@@ -337,6 +342,8 @@
   const Configuration *configuration_;
   const Configuration *logged_configuration_;
 
+  std::shared_ptr<const logger::Boots> boots_;
+
   // If true, skip any validation which would trigger if we see evidance that
   // time estimation between nodes was incorrect.
   const bool skip_order_validation_;
diff --git a/aos/network/multinode_timestamp_filter_test.cc b/aos/network/multinode_timestamp_filter_test.cc
index f67d8de..c1fb456 100644
--- a/aos/network/multinode_timestamp_filter_test.cc
+++ b/aos/network/multinode_timestamp_filter_test.cc
@@ -82,20 +82,19 @@
 // results.  1 second should be 1 second everywhere.
 TEST(InterpolatedTimeConverterTest, OneTime) {
   const distributed_clock::time_point de = distributed_clock::epoch();
-  const monotonic_clock::time_point me = monotonic_clock::epoch();
+  const BootTimestamp me = BootTimestamp::epoch();
 
   TestingTimeConverter time_converter(3u);
   time_converter.AddNextTimestamp(
       de + chrono::seconds(0),
-      {{.boot = 0, .time = me + chrono::seconds(1)},
-       {.boot = 0, .time = me + chrono::seconds(10)},
-       {.boot = 0, .time = me + chrono::seconds(1000)}});
+      {me + chrono::seconds(1), me + chrono::seconds(10),
+       me + chrono::seconds(1000)});
 
-  EXPECT_EQ(time_converter.FromDistributedClock(0, de - chrono::seconds(1)),
+  EXPECT_EQ(time_converter.FromDistributedClock(0, de - chrono::seconds(1), 0),
             me + chrono::seconds(0));
-  EXPECT_EQ(time_converter.FromDistributedClock(1, de - chrono::seconds(1)),
+  EXPECT_EQ(time_converter.FromDistributedClock(1, de - chrono::seconds(1), 0),
             me + chrono::seconds(9));
-  EXPECT_EQ(time_converter.FromDistributedClock(2, de - chrono::seconds(1)),
+  EXPECT_EQ(time_converter.FromDistributedClock(2, de - chrono::seconds(1), 0),
             me + chrono::seconds(999));
   EXPECT_EQ(time_converter.ToDistributedClock(0, me + chrono::seconds(0)),
             de - chrono::seconds(1));
@@ -104,11 +103,11 @@
   EXPECT_EQ(time_converter.ToDistributedClock(2, me + chrono::seconds(999)),
             de - chrono::seconds(1));
 
-  EXPECT_EQ(time_converter.FromDistributedClock(0, de),
+  EXPECT_EQ(time_converter.FromDistributedClock(0, de, 0),
             me + chrono::seconds(1));
-  EXPECT_EQ(time_converter.FromDistributedClock(1, de),
+  EXPECT_EQ(time_converter.FromDistributedClock(1, de, 0),
             me + chrono::seconds(10));
-  EXPECT_EQ(time_converter.FromDistributedClock(2, de),
+  EXPECT_EQ(time_converter.FromDistributedClock(2, de, 0),
             me + chrono::seconds(1000));
   EXPECT_EQ(time_converter.ToDistributedClock(0, me + chrono::seconds(1)), de);
   EXPECT_EQ(time_converter.ToDistributedClock(1, me + chrono::seconds(10)), de);
@@ -119,29 +118,27 @@
 // Tests that actual interpolation works as expected for multiple timestamps.
 TEST(InterpolatedTimeConverterTest, Interpolation) {
   const distributed_clock::time_point de = distributed_clock::epoch();
-  const monotonic_clock::time_point me = monotonic_clock::epoch();
+  const BootTimestamp me = BootTimestamp::epoch();
 
   TestingTimeConverter time_converter(3u);
   // Test that 2 timestamps interpolate correctly.
   time_converter.AddNextTimestamp(
       de + chrono::seconds(0),
-      {{.boot = 0, .time = me + chrono::seconds(1)},
-       {.boot = 0, .time = me + chrono::seconds(10)},
-       {.boot = 0, .time = me + chrono::seconds(1000)}});
+      {me + chrono::seconds(1), me + chrono::seconds(10),
+       me + chrono::seconds(1000)});
   time_converter.AddNextTimestamp(
       de + chrono::seconds(1),
-      {{.boot = 0, .time = me + chrono::seconds(2)},
-       {.boot = 0, .time = me + chrono::seconds(11)},
-       {.boot = 0, .time = me + chrono::seconds(1001)}});
+      {me + chrono::seconds(2), me + chrono::seconds(11),
+       me + chrono::seconds(1001)});
 
   EXPECT_EQ(
-      time_converter.FromDistributedClock(0, de + chrono::milliseconds(500)),
+      time_converter.FromDistributedClock(0, de + chrono::milliseconds(500), 0),
       me + chrono::milliseconds(1500));
   EXPECT_EQ(
-      time_converter.FromDistributedClock(1, de + chrono::milliseconds(500)),
+      time_converter.FromDistributedClock(1, de + chrono::milliseconds(500), 0),
       me + chrono::milliseconds(10500));
   EXPECT_EQ(
-      time_converter.FromDistributedClock(2, de + chrono::milliseconds(500)),
+      time_converter.FromDistributedClock(2, de + chrono::milliseconds(500), 0),
       me + chrono::milliseconds(1000500));
   EXPECT_EQ(
       time_converter.ToDistributedClock(0, me + chrono::milliseconds(1500)),
@@ -156,26 +153,25 @@
   // And that we can interpolate between points not at the start.
   time_converter.AddNextTimestamp(
       de + chrono::seconds(2),
-      {{.boot = 0, .time = me + chrono::seconds(3) - chrono::milliseconds(2)},
-       {.boot = 0, .time = me + chrono::seconds(12) - chrono::milliseconds(2)},
-       {.boot = 0, .time = me + chrono::seconds(1002)}});
+      {me + chrono::seconds(3) - chrono::milliseconds(2),
+       me + chrono::seconds(12) - chrono::milliseconds(2),
+       me + chrono::seconds(1002)});
 
   time_converter.AddNextTimestamp(
       de + chrono::seconds(3),
-      {{.boot = 0, .time = me + chrono::seconds(4) - chrono::milliseconds(4)},
-       {.boot = 0, .time = me + chrono::seconds(13) - chrono::milliseconds(2)},
-       {.boot = 0,
-        .time = me + chrono::seconds(1003) - chrono::milliseconds(2)}});
+      {me + chrono::seconds(4) - chrono::milliseconds(4),
+       me + chrono::seconds(13) - chrono::milliseconds(2),
+       me + chrono::seconds(1003) - chrono::milliseconds(2)});
 
-  EXPECT_EQ(
-      time_converter.FromDistributedClock(0, de + chrono::milliseconds(2500)),
-      me + chrono::milliseconds(3497));
-  EXPECT_EQ(
-      time_converter.FromDistributedClock(1, de + chrono::milliseconds(2500)),
-      me + chrono::milliseconds(12498));
-  EXPECT_EQ(
-      time_converter.FromDistributedClock(2, de + chrono::milliseconds(2500)),
-      me + chrono::milliseconds(1002499));
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                0, de + chrono::milliseconds(2500), 0),
+            me + chrono::milliseconds(3497));
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                1, de + chrono::milliseconds(2500), 0),
+            me + chrono::milliseconds(12498));
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                2, de + chrono::milliseconds(2500), 0),
+            me + chrono::milliseconds(1002499));
   EXPECT_EQ(
       time_converter.ToDistributedClock(0, me + chrono::milliseconds(3497)),
       de + chrono::milliseconds(2500));
@@ -187,11 +183,87 @@
       de + chrono::milliseconds(2500));
 }
 
+// Tests that interpolation works across reboots.
+TEST(InterpolatedTimeConverterTest, RebootInterpolation) {
+  const distributed_clock::time_point de = distributed_clock::epoch();
+  const BootTimestamp me = BootTimestamp::epoch();
+  const BootTimestamp me2{.boot = 1u, .time = monotonic_clock::epoch()};
+
+  //LOG(FATAL) << "TODO(austin): Test ToDistributedClock too";
+
+  TestingTimeConverter time_converter(3u);
+  size_t reboot_counter = 0;
+  time_converter.set_reboot_found(
+      [&](distributed_clock::time_point,
+          const std::vector<logger::BootTimestamp> &) { ++reboot_counter; });
+  // Test that 2 timestamps interpolate correctly.
+  time_converter.AddNextTimestamp(
+      de + chrono::seconds(0),
+      {me + chrono::seconds(1), me + chrono::seconds(10),
+       me + chrono::seconds(1000)});
+  time_converter.AddNextTimestamp(
+      de + chrono::seconds(1),
+      {me + chrono::seconds(2), me + chrono::seconds(11),
+       me + chrono::seconds(1001)});
+  time_converter.AddNextTimestamp(
+      de + chrono::seconds(2),
+      {me + chrono::seconds(3), me + chrono::seconds(12),
+       me + chrono::seconds(1002)});
+
+  time_converter.AddNextTimestamp(
+      de + chrono::seconds(3),
+      {me2 + chrono::seconds(4), me + chrono::seconds(13),
+       me + chrono::seconds(1003)});
+
+  time_converter.AddNextTimestamp(
+      de + chrono::seconds(4),
+      {me2 + chrono::seconds(5), me + chrono::seconds(14),
+       me + chrono::seconds(1004)});
+
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                0, de + chrono::milliseconds(2400), 0),
+            me + chrono::milliseconds(3400));
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                1, de + chrono::milliseconds(2400), 0),
+            me + chrono::milliseconds(12400));
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                2, de + chrono::milliseconds(2400), 0),
+            me + chrono::milliseconds(1002400));
+
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                0, de + chrono::milliseconds(2900), 0),
+            me + chrono::milliseconds(3900));
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                1, de + chrono::milliseconds(2900), 0),
+            me + chrono::milliseconds(12900));
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                2, de + chrono::milliseconds(2900), 0),
+            me + chrono::milliseconds(1002900));
+
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                0, de + chrono::milliseconds(3000), 0),
+            me + chrono::seconds(4));
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                0, de + chrono::milliseconds(3000), 1),
+            me2 + chrono::seconds(4));
+
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                0, de + chrono::milliseconds(3900), 1),
+            me2 + chrono::milliseconds(4900));
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                1, de + chrono::milliseconds(3900), 0),
+            me + chrono::milliseconds(13900));
+  EXPECT_EQ(time_converter.FromDistributedClock(
+                2, de + chrono::milliseconds(3900), 0),
+            me + chrono::milliseconds(1003900));
+  EXPECT_EQ(reboot_counter, 1u);
+}
+
 // Tests that reading times before the start of our interpolation points
 // explodes.
 TEST(InterpolatedTimeConverterDeathTest, ReadLostTime) {
   const distributed_clock::time_point de = distributed_clock::epoch();
-  const monotonic_clock::time_point me = monotonic_clock::epoch();
+  const BootTimestamp me = BootTimestamp::epoch();
 
   constexpr auto kDefaultHistoryDuration =
       InterpolatedTimeConverter::kDefaultHistoryDuration;
@@ -211,22 +283,22 @@
   EXPECT_EQ(
       de + kDefaultHistoryDuration / 2,
       time_converter.ToDistributedClock(0, me + kDefaultHistoryDuration / 2));
-  EXPECT_EQ(
-      me + kDefaultHistoryDuration / 2,
-      time_converter.FromDistributedClock(0, de + kDefaultHistoryDuration / 2));
+  EXPECT_EQ(me + kDefaultHistoryDuration / 2,
+            time_converter.FromDistributedClock(
+                0, de + kDefaultHistoryDuration / 2, 0));
 
   // Double check we can read things from before the start
   EXPECT_EQ(de - kDt, time_converter.ToDistributedClock(0, me - kDt));
-  EXPECT_EQ(me - kDt, time_converter.FromDistributedClock(0, de - kDt));
+  EXPECT_EQ(me - kDt, time_converter.FromDistributedClock(0, de - kDt, 0));
 
   // And at and after the origin.
   EXPECT_EQ(de, time_converter.ToDistributedClock(0, me));
-  EXPECT_EQ(me, time_converter.FromDistributedClock(0, de));
+  EXPECT_EQ(me, time_converter.FromDistributedClock(0, de, 0));
 
   EXPECT_EQ(de + chrono::milliseconds(10),
             time_converter.ToDistributedClock(0, me + kDt));
   EXPECT_EQ(me + chrono::milliseconds(10),
-            time_converter.FromDistributedClock(0, de + kDt));
+            time_converter.FromDistributedClock(0, de + kDt, 0));
 
   // Now force ourselves to forget.
   time_converter.ObserveTimePassed(de + kDefaultHistoryDuration + kDt * 3 / 2);
@@ -234,26 +306,27 @@
   // Yup, can't read the origin anymore.
   EXPECT_DEATH({ LOG(INFO) << time_converter.ToDistributedClock(0, me); },
                "forgotten");
-  EXPECT_DEATH({ LOG(INFO) << time_converter.FromDistributedClock(0, de); },
+  EXPECT_DEATH({ LOG(INFO) << time_converter.FromDistributedClock(0, de, 0); },
                "forgotten");
 
   // But can still read the next point.
   EXPECT_EQ(de + kDt, time_converter.ToDistributedClock(0, me + kDt));
-  EXPECT_EQ(me + kDt, time_converter.FromDistributedClock(0, de + kDt));
+  EXPECT_EQ(me + kDt, time_converter.FromDistributedClock(0, de + kDt, 0));
 }
 
 // Tests unity time with 1 node.
 TEST(InterpolatedTimeConverterTest, SingleNodeTime) {
   const distributed_clock::time_point de = distributed_clock::epoch();
-  const monotonic_clock::time_point me = monotonic_clock::epoch();
+  const BootTimestamp me = BootTimestamp::epoch();
 
   TestingTimeConverter time_converter(1u);
-  time_converter.AddNextTimestamp(
-      de + chrono::seconds(0), {{.boot = 0, .time = me + chrono::seconds(1)}});
+  time_converter.AddNextTimestamp(de + chrono::seconds(0),
+                                  {me + chrono::seconds(1)});
 
-  EXPECT_EQ(time_converter.FromDistributedClock(0, de), me);
-  EXPECT_EQ(time_converter.FromDistributedClock(0, de + chrono::seconds(100)),
-            me + chrono::seconds(100));
+  EXPECT_EQ(time_converter.FromDistributedClock(0, de, 0), me);
+  EXPECT_EQ(
+      time_converter.FromDistributedClock(0, de + chrono::seconds(100), 0),
+      me + chrono::seconds(100));
 
   EXPECT_TRUE(time_converter.NextTimestamp());
 }
diff --git a/aos/network/testing_time_converter.cc b/aos/network/testing_time_converter.cc
index 9558033..c175fe6 100644
--- a/aos/network/testing_time_converter.cc
+++ b/aos/network/testing_time_converter.cc
@@ -71,6 +71,22 @@
   return dt;
 }
 
+void TestingTimeConverter::RebootAt(size_t node_index,
+                                    distributed_clock::time_point t) {
+  CHECK(!first_);
+  const chrono::nanoseconds dt = t - last_distributed_;
+
+  for (size_t i = 0; i < last_monotonic_.size(); ++i) {
+    last_monotonic_[i].time += dt;
+  }
+
+  ++last_monotonic_[node_index].boot;
+  last_monotonic_[node_index].time = monotonic_clock::epoch();
+
+  last_distributed_ = t;
+  ts_.emplace_back(std::make_tuple(last_distributed_, last_monotonic_));
+}
+
 void TestingTimeConverter::AddNextTimestamp(
     distributed_clock::time_point time,
     std::vector<logger::BootTimestamp> times) {
diff --git a/aos/network/testing_time_converter.h b/aos/network/testing_time_converter.h
index 5ffdc01..20b8e16 100644
--- a/aos/network/testing_time_converter.h
+++ b/aos/network/testing_time_converter.h
@@ -6,6 +6,7 @@
 #include <tuple>
 
 #include "aos/events/event_scheduler.h"
+#include "aos/events/logging/boot_timestamp.h"
 #include "aos/network/multinode_timestamp_filter.h"
 #include "aos/time/time.h"
 
@@ -34,6 +35,8 @@
   std::chrono::nanoseconds AddMonotonic(
       std::vector<logger::BootTimestamp> times);
 
+  void RebootAt(size_t node_index, distributed_clock::time_point t);
+
   // Adds a distributed to monotonic clock mapping to the queue.
   void AddNextTimestamp(distributed_clock::time_point time,
                         std::vector<logger::BootTimestamp> times);
@@ -42,6 +45,23 @@
                            std::vector<logger::BootTimestamp>>>
   NextTimestamp() override;
 
+  void set_boot_uuid(size_t node_index, size_t boot_count, UUID uuid) {
+    CHECK(boot_uuids_
+              .emplace(std::make_pair(node_index, boot_count), std ::move(uuid))
+              .second)
+        << ": Duplicate boot";
+  }
+
+  UUID boot_uuid(size_t node_index, size_t boot_count) override {
+    auto it = boot_uuids_.find(std::make_pair(node_index, boot_count));
+    if (it != boot_uuids_.end()) return it->second;
+
+    auto new_it = boot_uuids_.emplace(std::make_pair(node_index, boot_count),
+                                      UUID::Random());
+    CHECK(new_it.second);
+    return new_it.first->second;
+  }
+
  private:
   // List of timestamps.
   std::deque<std::tuple<distributed_clock::time_point,
@@ -53,6 +73,8 @@
   // The last times returned on all clocks.
   distributed_clock::time_point last_distributed_ = distributed_clock::epoch();
   std::vector<logger::BootTimestamp> last_monotonic_;
+
+  std::map<std::pair<size_t, size_t>, UUID> boot_uuids_;
 };
 
 }  // namespace message_bridge
diff --git a/aos/network/timestamp_channel.cc b/aos/network/timestamp_channel.cc
index bbd62c4..88b6ed0 100644
--- a/aos/network/timestamp_channel.cc
+++ b/aos/network/timestamp_channel.cc
@@ -65,11 +65,15 @@
 
 ChannelTimestampSender::ChannelTimestampSender(aos::EventLoop *event_loop)
     : event_loop_(event_loop) {
-  CHECK(configuration::MultiNode(event_loop_->configuration()));
+  if (event_loop_) {
+    CHECK(configuration::MultiNode(event_loop_->configuration()));
+  }
 }
 
 aos::Sender<RemoteMessage> *ChannelTimestampSender::SenderForChannel(
     const Channel *channel, const Connection *connection) {
+  CHECK(event_loop_);
+
   ChannelTimestampFinder finder(event_loop_);
   // Look at any pre-created channel/connection pairs.
   {
diff --git a/aos/network/timestamp_channel.h b/aos/network/timestamp_channel.h
index 738ca10..b78c83a 100644
--- a/aos/network/timestamp_channel.h
+++ b/aos/network/timestamp_channel.h
@@ -43,6 +43,11 @@
 class ChannelTimestampSender {
  public:
   ChannelTimestampSender(aos::EventLoop *event_loop);
+  ChannelTimestampSender() : event_loop_(nullptr) {}
+
+  ChannelTimestampSender(ChannelTimestampSender &&other) noexcept = default;
+  ChannelTimestampSender &operator=(ChannelTimestampSender &&other) noexcept =
+      default;
 
   aos::Sender<RemoteMessage> *SenderForChannel(const Channel *channel,
                                                const Connection *connection);
diff --git a/aos/network/timestamp_filter.cc b/aos/network/timestamp_filter.cc
index 59af3a0..c86dc8e 100644
--- a/aos/network/timestamp_filter.cc
+++ b/aos/network/timestamp_filter.cc
@@ -14,6 +14,8 @@
 namespace message_bridge {
 namespace {
 namespace chrono = std::chrono;
+using logger::BootDuration;
+using logger::BootTimestamp;
 
 std::string TimeString(const aos::monotonic_clock::time_point t,
                        std::chrono::nanoseconds o) {
@@ -55,7 +57,7 @@
   CHECK_GE(*ta, 0.0);
   CHECK_LT(*ta, 1.0);
 }
-void NormalizeTimestamps(logger::BootTimestamp *ta_base, double *ta) {
+void NormalizeTimestamps(BootTimestamp *ta_base, double *ta) {
   NormalizeTimestamps(&ta_base->time, ta);
 }
 
@@ -473,10 +475,10 @@
   return std::make_tuple(std::get<0>(t), std::get<1>(t));
 }
 
-std::pair<std::tuple<logger::BootTimestamp, logger::BootDuration>,
-          std::tuple<logger::BootTimestamp, logger::BootDuration>>
-NoncausalTimestampFilter::FindTimestamps(logger::BootTimestamp ta_base,
-                                         double ta, size_t sample_boot) const {
+std::pair<std::tuple<BootTimestamp, BootDuration>,
+          std::tuple<BootTimestamp, BootDuration>>
+NoncausalTimestampFilter::FindTimestamps(BootTimestamp ta_base, double ta,
+                                         size_t sample_boot) const {
   CHECK_GE(ta, 0.0);
   CHECK_LT(ta, 1.0);
 
@@ -731,9 +733,11 @@
          ((tb - ta) - offset.second);
 }
 
-std::string NoncausalTimestampFilter::DebugOffsetError(
-    logger::BootTimestamp ta_base, double ta, logger::BootTimestamp tb_base,
-    double tb, size_t node_a, size_t node_b) const {
+std::string NoncausalTimestampFilter::DebugOffsetError(BootTimestamp ta_base,
+                                                       double ta,
+                                                       BootTimestamp tb_base,
+                                                       double tb, size_t node_a,
+                                                       size_t node_b) const {
   NormalizeTimestamps(&ta_base, &ta);
   NormalizeTimestamps(&tb_base, &tb);
 
@@ -830,8 +834,8 @@
   return true;
 }
 
-void NoncausalTimestampFilter::Sample(logger::BootTimestamp monotonic_now_all,
-                                      logger::BootDuration sample_ns) {
+void NoncausalTimestampFilter::Sample(BootTimestamp monotonic_now_all,
+                                      BootDuration sample_ns) {
   filter(monotonic_now_all.boot, sample_ns.boot)
       ->Sample(monotonic_now_all.time, sample_ns.duration);
 }
@@ -1114,20 +1118,47 @@
   }
 }
 
-bool NoncausalTimestampFilter::Pop(logger::BootTimestamp time) {
-  // TODO(austin): Auto compute the second boot.
-  CHECK_LE(filters_.size(), 1u);
-  SingleFilter *f = filter(time.boot, 0);
+bool NoncausalTimestampFilter::Pop(BootTimestamp time) {
+  CHECK_GE(filters_.size(), 1u);
+
   VLOG(1) << NodeNames() << " Pop(" << time << ")";
   bool removed = false;
-  // When the timestamp which is the end of the line is popped, we want to
-  // drop it off the list.  Hence the >=
-  while (f->timestamps_size() >= 2 &&
-         time.time >= std::get<0>(f->timestamp(1))) {
-    f->PopFront();
-    removed = true;
+  while (true) {
+    DCHECK_LT(pop_filter_, filters_.size());
+    BootFilter *boot_filter = &filters_[pop_filter_];
+    CHECK(boot_filter != nullptr);
+    size_t timestamps_size = 0;
+    while ((timestamps_size = boot_filter->filter.timestamps_size()) > 2) {
+      // When the timestamp which is the end of the line is popped, we want to
+      // drop it off the list.  Hence the <
+      if (time < BootTimestamp{
+                     .boot = static_cast<size_t>(boot_filter->boot.first),
+                     .time = std::get<0>(boot_filter->filter.timestamp(1))}) {
+        return removed;
+      }
+      boot_filter->filter.PopFront();
+      removed = true;
+    }
+
+    if (timestamps_size == 2) {
+      if (pop_filter_ + 1u >= filters_.size()) {
+        return removed;
+      }
+
+      // There is 1 more filter, see if there is enough data in it to switch
+      // over to it.
+      if (filters_[pop_filter_ + 1].filter.timestamps_size() < 2u) {
+        return removed;
+      }
+      if (time <
+          BootTimestamp{.boot = static_cast<size_t>(boot_filter->boot.first),
+                        .time = std::get<0>(
+                            filters_[pop_filter_ + 1].filter.timestamp(1))}) {
+        return removed;
+      }
+    }
+    ++pop_filter_;
   }
-  return removed;
 }
 
 void NoncausalTimestampFilter::SingleFilter::Debug() const {
@@ -1247,9 +1278,9 @@
   }
 }
 
-void NoncausalOffsetEstimator::Sample(
-    const Node *node, logger::BootTimestamp node_delivered_time,
-    logger::BootTimestamp other_node_sent_time) {
+void NoncausalOffsetEstimator::Sample(const Node *node,
+                                      BootTimestamp node_delivered_time,
+                                      BootTimestamp other_node_sent_time) {
   VLOG(1) << "Sample delivered         " << node_delivered_time << " sent "
           << other_node_sent_time << " " << node->name()->string_view()
           << " -> "
@@ -1268,8 +1299,8 @@
 }
 
 void NoncausalOffsetEstimator::ReverseSample(
-    const Node *node, logger::BootTimestamp node_sent_time,
-    logger::BootTimestamp other_node_delivered_time) {
+    const Node *node, BootTimestamp node_sent_time,
+    BootTimestamp other_node_delivered_time) {
   VLOG(1) << "Reverse sample delivered " << other_node_delivered_time
           << " sent " << node_sent_time << " "
           << ((node == node_a_) ? node_b_ : node_a_)->name()->string_view()
@@ -1287,27 +1318,5 @@
   }
 }
 
-bool NoncausalOffsetEstimator::Pop(const Node *node,
-                                   logger::BootTimestamp node_monotonic_now) {
-  if (node == node_a_) {
-    if (a_.Pop(node_monotonic_now)) {
-      VLOG(1) << "Popping forward sample to " << node_a_->name()->string_view()
-              << " from " << node_b_->name()->string_view() << " at "
-              << node_monotonic_now;
-      return true;
-    }
-  } else if (node == node_b_) {
-    if (b_.Pop(node_monotonic_now)) {
-      VLOG(1) << "Popping reverse sample to " << node_b_->name()->string_view()
-              << " from " << node_a_->name()->string_view() << " at "
-              << node_monotonic_now;
-      return true;
-    }
-  } else {
-    LOG(FATAL) << "Unknown node " << node->name()->string_view();
-  }
-  return false;
-}
-
 }  // namespace message_bridge
 }  // namespace aos
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index 166e811..0645139 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -554,6 +554,7 @@
                                  adjusted_initial_time);
         }
       }
+      CHECK_LT(i, timestamps_.size());
       return std::make_tuple(std::get<0>(timestamps_[i]),
                              std::get<1>(timestamps_[i]));
     }
@@ -673,6 +674,9 @@
   std::vector<BootFilter> filters_;
 
   size_t current_filter_ = 0;
+
+  // The filter to resume popping from.
+  size_t pop_filter_ = 0;
 };
 
 // This class holds 2 NoncausalTimestampFilter's and handles averaging the
@@ -712,10 +716,6 @@
   void ReverseSample(const Node *node, logger::BootTimestamp node_sent_time,
                      logger::BootTimestamp other_node_delivered_time);
 
-  // Removes old data points from a node before the provided time.
-  // Returns true if any points were popped.
-  bool Pop(const Node *node, logger::BootTimestamp node_monotonic_now);
-
  private:
   NoncausalTimestampFilter a_;
   NoncausalTimestampFilter b_;
diff --git a/aos/network/timestamp_filter_test.cc b/aos/network/timestamp_filter_test.cc
index 5544711..37d02ac 100644
--- a/aos/network/timestamp_filter_test.cc
+++ b/aos/network/timestamp_filter_test.cc
@@ -1098,16 +1098,16 @@
   EXPECT_EQ(estimator.GetFilter(node_a)->timestamps_size(), 3u);
   EXPECT_EQ(estimator.GetFilter(node_b)->timestamps_size(), 3u);
 
-  estimator.Pop(node_a, ta2);
-  estimator.Pop(node_b, tb2);
+  estimator.GetFilter(node_a)->Pop(ta2);
+  estimator.GetFilter(node_b)->Pop(tb2);
   EXPECT_EQ(estimator.GetFilter(node_a)->timestamps_size(), 2u);
   EXPECT_EQ(estimator.GetFilter(node_b)->timestamps_size(), 2u);
 
-  // And dropping down to 1 point means 0 slope.
-  estimator.Pop(node_a, ta3);
-  estimator.Pop(node_b, tb3);
-  EXPECT_EQ(estimator.GetFilter(node_a)->timestamps_size(), 1u);
-  EXPECT_EQ(estimator.GetFilter(node_b)->timestamps_size(), 1u);
+  // And confirm we won't drop down to 1 point.
+  estimator.GetFilter(node_a)->Pop(ta3);
+  estimator.GetFilter(node_b)->Pop(tb3);
+  EXPECT_EQ(estimator.GetFilter(node_a)->timestamps_size(), 2u);
+  EXPECT_EQ(estimator.GetFilter(node_b)->timestamps_size(), 2u);
 }
 
 }  // namespace testing
diff --git a/aos/uuid.h b/aos/uuid.h
index d12bedf..3f7fc50 100644
--- a/aos/uuid.h
+++ b/aos/uuid.h
@@ -58,6 +58,7 @@
   }
 
   bool operator==(const UUID &other) const { return other.span() == span(); }
+  bool operator<(const UUID &other) const { return other.span() < span(); }
   bool operator!=(const UUID &other) const { return other.span() != span(); }
 
  private: