Merge "Add node_boots parsing from log file headers"
diff --git a/README.md b/README.md
index bcfe5c6..839f9c8 100644
--- a/README.md
+++ b/README.md
@@ -77,7 +77,7 @@
         1. Step 1: Add Bazel distribution URI as a package source
            ```
            sudo apt install curl
-           curl -fsSL https://bazel.build/bazel-release.pub.gpg | apt-key add -
+           curl -fsSL https://bazel.build/bazel-release.pub.gpg | sudo apt-key add -
            echo "deb [arch=amd64] https://storage.googleapis.com/bazel-apt stable jdk1.8" | sudo tee /etc/apt/sources.list.d/bazel.list
            ```
         2. Step 2: Install Bazel
diff --git a/aos/configuration.cc b/aos/configuration.cc
index d794a37..76704a4 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -10,10 +10,14 @@
 
 #include <map>
 #include <set>
+#include <string>
 #include <string_view>
+#include <vector>
 
 #include "absl/container/btree_set.h"
 #include "absl/strings/str_cat.h"
+#include "absl/strings/str_join.h"
+#include "absl/strings/str_split.h"
 #include "aos/configuration_generated.h"
 #include "aos/flatbuffer_merge.h"
 #include "aos/json_to_flatbuffer.h"
@@ -131,10 +135,33 @@
   return buffer;
 }
 
+// Collapses ".", "..", and empty components out of a '/'-separated path.
+// A leading '/' is dropped as an empty component, so the result is relative.
+// CHECK-fails if a ".." has no earlier component left to cancel.
+std::string RemoveDotDots(const std::string_view filename) {
+  // Components kept so far; ".." pops the most recently kept one.
+  std::vector<std::string_view> kept;
+  for (const std::string_view part : absl::StrSplit(filename, '/')) {
+    if (part.empty() || part == ".") {
+      // "//", leading/trailing '/', and "." add nothing to the path.
+      continue;
+    }
+    if (part == "..") {
+      CHECK(!kept.empty())
+          << ": Import path may not start with ..: " << filename;
+      kept.pop_back();
+      continue;
+    }
+    kept.push_back(part);
+  }
+  return absl::StrJoin(kept, "/");
+}
+
 FlatbufferDetachedBuffer<Configuration> ReadConfig(
     const std::string_view path, absl::btree_set<std::string> *visited_paths,
     const std::vector<std::string_view> &extra_import_paths) {
   std::string binary_path = MaybeReplaceExtension(path, ".json", ".bfbs");
+  VLOG(1) << "Looking up: " << path << ", starting with: " << binary_path;
   bool binary_path_exists = util::PathExists(binary_path);
   std::string raw_path(path);
   // For each .json file, look and see if we can find a .bfbs file next to it
@@ -151,13 +178,15 @@
 
     bool found_path = false;
     for (const auto &import_path : extra_import_paths) {
-      raw_path = std::string(import_path) + "/" + std::string(path);
+      raw_path = std::string(import_path) + "/" + RemoveDotDots(path);
       binary_path = MaybeReplaceExtension(raw_path, ".json", ".bfbs");
+      VLOG(1) << "Checking: " << binary_path;
       binary_path_exists = util::PathExists(binary_path);
       if (binary_path_exists) {
         found_path = true;
         break;
       }
+      VLOG(1) << "Checking: " << raw_path;
       if (util::PathExists(raw_path)) {
         found_path = true;
         break;
@@ -560,11 +589,11 @@
 
 FlatbufferDetachedBuffer<Configuration> ReadConfig(
     const std::string_view path,
-    const std::vector<std::string_view> &import_paths) {
+    const std::vector<std::string_view> &extra_import_paths) {
   // We only want to read a file once.  So track the visited files in a set.
   absl::btree_set<std::string> visited_paths;
   FlatbufferDetachedBuffer<Configuration> read_config =
-      ReadConfig(path, &visited_paths, import_paths);
+      ReadConfig(path, &visited_paths, extra_import_paths);
 
   // If we only read one file, and it had a .bfbs extension, it has to be a
   // fully formatted config.  Do a quick verification and return it.
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 4194514..79d09e4 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -225,7 +225,10 @@
 cc_test(
     name = "pingpong_test",
     srcs = ["pingpong_test.cc"],
-    data = [":pingpong_config"],
+    data = [
+        ":multinode_pingpong_test_split_config",
+        ":pingpong_config",
+    ],
     target_compatible_with = ["@platforms//os:linux"],
     deps = [
         ":ping_lib",
diff --git a/aos/events/channel_preallocated_allocator.h b/aos/events/channel_preallocated_allocator.h
index c4f0eca..76fb694 100644
--- a/aos/events/channel_preallocated_allocator.h
+++ b/aos/events/channel_preallocated_allocator.h
@@ -62,11 +62,12 @@
   uint8_t *reallocate_downward(uint8_t * /*old_p*/, size_t /*old_size*/,
                                size_t new_size, size_t /*in_use_back*/,
                                size_t /*in_use_front*/) override {
-    LOG(FATAL) << "Requested " << new_size << " bytes, max size "
-               << channel_->max_size() << " for channel "
-               << configuration::CleanedChannelToString(channel_)
-               << ".  Increase the memory reserved to at least " << new_size
-               << ".";
+    LOG(FATAL)
+        << "Requested " << new_size
+        << " bytes (includes extra for room to grow even more), max size "
+        << channel_->max_size() << " for channel "
+        << configuration::CleanedChannelToString(channel_)
+        << ".  Increase the memory reserved to at least " << new_size << ".";
     return nullptr;
   }
 
diff --git a/aos/events/epoll.cc b/aos/events/epoll.cc
index 1c4427b..8cb553b 100644
--- a/aos/events/epoll.cc
+++ b/aos/events/epoll.cc
@@ -4,6 +4,7 @@
 #include <sys/epoll.h>
 #include <sys/timerfd.h>
 #include <unistd.h>
+
 #include <atomic>
 #include <vector>
 
@@ -109,62 +110,66 @@
   }
 
   EventData *const event_data = static_cast<struct EventData *>(event.data.ptr);
-  if (event.events & kInEvents) {
-    CHECK(event_data->in_fn)
-        << ": No handler registered for input events on " << event_data->fd;
-    event_data->in_fn();
-  }
-  if (event.events & kOutEvents) {
-    CHECK(event_data->out_fn)
-        << ": No handler registered for output events on " << event_data->fd;
-    event_data->out_fn();
-  }
-  if (event.events & kErrorEvents) {
-    CHECK(event_data->err_fn)
-        << ": No handler registered for error events on " << event_data->fd;
-    event_data->err_fn();
-  }
+  event_data->DoCallbacks(event.events);
   return true;
 }
 
-void EPoll::Quit() { PCHECK(write(quit_signal_fd_, "q", 1) == 1); }
+void EPoll::Quit() {
+  // Shortcut to break us out of infinite loops. We might write more than once
+  // to the pipe, but we'll stop once the first is read on the other end.
+  if (!run_) {
+    return;
+  }
+  PCHECK(write(quit_signal_fd_, "q", 1) == 1);
+}
 
 void EPoll::OnReadable(int fd, ::std::function<void()> function) {
   EventData *event_data = GetEventData(fd);
   if (event_data == nullptr) {
-    fns_.emplace_back(std::make_unique<EventData>(fd));
+    fns_.emplace_back(std::make_unique<InOutEventData>(fd));
     event_data = fns_.back().get();
   } else {
-    CHECK(!event_data->in_fn) << ": Duplicate in functions for " << fd;
+    CHECK(!static_cast<InOutEventData *>(event_data)->in_fn)
+        << ": Duplicate in functions for " << fd;
   }
-  event_data->in_fn = ::std::move(function);
+  static_cast<InOutEventData *>(event_data)->in_fn = ::std::move(function);
   DoEpollCtl(event_data, event_data->events | kInEvents);
 }
 
 void EPoll::OnError(int fd, ::std::function<void()> function) {
   EventData *event_data = GetEventData(fd);
   if (event_data == nullptr) {
-    fns_.emplace_back(std::make_unique<EventData>(fd));
+    fns_.emplace_back(std::make_unique<InOutEventData>(fd));
     event_data = fns_.back().get();
   } else {
-    CHECK(!event_data->err_fn) << ": Duplicate in functions for " << fd;
+    CHECK(!static_cast<InOutEventData *>(event_data)->err_fn)
+        << ": Duplicate error functions for " << fd;
   }
-  event_data->err_fn = ::std::move(function);
+  static_cast<InOutEventData *>(event_data)->err_fn = ::std::move(function);
   DoEpollCtl(event_data, event_data->events | kErrorEvents);
 }
 
 void EPoll::OnWriteable(int fd, ::std::function<void()> function) {
   EventData *event_data = GetEventData(fd);
   if (event_data == nullptr) {
-    fns_.emplace_back(std::make_unique<EventData>(fd));
+    fns_.emplace_back(std::make_unique<InOutEventData>(fd));
     event_data = fns_.back().get();
   } else {
-    CHECK(!event_data->out_fn) << ": Duplicate out functions for " << fd;
+    CHECK(!static_cast<InOutEventData *>(event_data)->out_fn)
+        << ": Duplicate out functions for " << fd;
   }
-  event_data->out_fn = ::std::move(function);
+  static_cast<InOutEventData *>(event_data)->out_fn = ::std::move(function);
   DoEpollCtl(event_data, event_data->events | kOutEvents);
 }
 
+void EPoll::OnEvents(int fd, ::std::function<void(uint32_t)> function) {
+  // OnEvents is exclusive with OnReadable/OnWriteable/OnError for a given fd.
+  CHECK(GetEventData(fd) == nullptr) << ": May not replace OnEvents handlers";
+  auto event_data = std::make_unique<SingleEventData>(fd);
+  event_data->fn = std::move(function);
+  fns_.emplace_back(std::move(event_data));
+}
+
 void EPoll::ForgetClosedFd(int fd) {
   auto element = fns_.begin();
   while (fns_.end() != element) {
@@ -177,6 +182,10 @@
   LOG(FATAL) << "fd " << fd << " not found";
 }
 
+void EPoll::SetEvents(int fd, uint32_t events) {
+  DoEpollCtl(CHECK_NOTNULL(GetEventData(fd)), events);
+}
+
 // Removes fd from the event loop.
 void EPoll::DeleteFd(int fd) {
   auto element = fns_.begin();
@@ -192,6 +201,21 @@
   LOG(FATAL) << "fd " << fd << " not found";
 }
 
+void EPoll::InOutEventData::DoCallbacks(uint32_t events) {
+  if (events & kInEvents) {
+    CHECK(in_fn) << ": No handler registered for input events on " << fd;
+    in_fn();
+  }
+  if (events & kOutEvents) {
+    CHECK(out_fn) << ": No handler registered for output events on " << fd;
+    out_fn();
+  }
+  if (events & kErrorEvents) {
+    CHECK(err_fn) << ": No handler registered for error events on " << fd;
+    err_fn();
+  }
+}
+
 void EPoll::EnableEvents(int fd, uint32_t events) {
   EventData *const event_data = CHECK_NOTNULL(GetEventData(fd));
   DoEpollCtl(event_data, event_data->events | events);
@@ -214,12 +238,14 @@
 
 void EPoll::DoEpollCtl(EventData *event_data, const uint32_t new_events) {
   const uint32_t old_events = event_data->events;
+  if (old_events == new_events) {
+    // Shortcut without calling into the kernel. This happens often with
+    // external event loop integrations that are emulating poll, so make it
+    // fast.
+    return;
+  }
   event_data->events = new_events;
   if (new_events == 0) {
-    if (old_events == 0) {
-      // Not added, and doesn't need to be. Nothing to do here.
-      return;
-    }
     // It was added, but should now be removed.
     PCHECK(epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, event_data->fd, nullptr) == 0);
     return;
diff --git a/aos/events/epoll.h b/aos/events/epoll.h
index 4bcedf1..2b7eb76 100644
--- a/aos/events/epoll.h
+++ b/aos/events/epoll.h
@@ -5,6 +5,7 @@
 #include <sys/epoll.h>
 #include <sys/timerfd.h>
 #include <unistd.h>
+
 #include <atomic>
 #include <functional>
 #include <vector>
@@ -68,21 +69,34 @@
   // Quits.  Async safe.
   void Quit();
 
-  // Called before waiting on the epoll file descriptor.
+  // Adds a function which will be called before waiting on the epoll file
+  // descriptor.
   void BeforeWait(std::function<void()> function);
 
   // Registers a function to be called if the fd becomes readable.
   // Only one function may be registered for readability on each fd.
+  // A fd may be registered exclusively with OnReadable/OnWriteable/OnError OR
+  // OnEvents.
   void OnReadable(int fd, ::std::function<void()> function);
 
   // Registers a function to be called if the fd reports an error.
   // Only one function may be registered for errors on each fd.
+  // A fd may be registered exclusively with OnReadable/OnWriteable/OnError OR
+  // OnEvents.
   void OnError(int fd, ::std::function<void()> function);
 
   // Registers a function to be called if the fd becomes writeable.
   // Only one function may be registered for writability on each fd.
+  // A fd may be registered exclusively with OnReadable/OnWriteable/OnError OR
+  // OnEvents.
   void OnWriteable(int fd, ::std::function<void()> function);
 
+  // Registers a function to be called when the configured events occur on fd.
+  // Which events occur will be passed to the function.
+  // A fd may be registered exclusively with OnReadable/OnWriteable/OnError OR
+  // OnEvents.
+  void OnEvents(int fd, ::std::function<void(uint32_t)> function);
+
   // Removes fd from the event loop.
   // All Fds must be cleaned up before this class is destroyed.
   void DeleteFd(int fd);
@@ -100,19 +114,50 @@
   // writeable.
   void DisableWriteable(int fd) { DisableEvents(fd, kOutEvents); }
 
+  // Sets the epoll events for the given fd. Be careful using this with
+  // OnReadable/OnWriteable/OnError: enabled events which fire with no handler
+  // registered will result in a crash.
+  void SetEvents(int fd, uint32_t events);
+
+  // Returns whether we're currently running. This changes to false when we
+  // start draining events to finish.
+  bool should_run() const { return run_; }
+
  private:
   // Structure whose pointer should be returned by epoll.  Makes looking up the
   // function fast and easy.
   struct EventData {
     EventData(int fd_in) : fd(fd_in) {}
+    virtual ~EventData() = default;
+
     // We use pointers to these objects as persistent identifiers, so they can't
     // be moved.
     EventData(const EventData &) = delete;
     EventData &operator=(const EventData &) = delete;
 
+    // Calls the appropriate callbacks when events are returned from the kernel.
+    virtual void DoCallbacks(uint32_t events) = 0;
+
     const int fd;
     uint32_t events = 0;
+  };
+
+  struct InOutEventData : public EventData {
+    InOutEventData(int fd) : EventData(fd) {}
+    ~InOutEventData() override = default;
+
     std::function<void()> in_fn, out_fn, err_fn;
+
+    void DoCallbacks(uint32_t events) override;
+  };
+
+  struct SingleEventData : public EventData {
+    SingleEventData(int fd) : EventData(fd) {}
+    ~SingleEventData() override = default;
+
+    std::function<void(uint32_t)> fn;
+
+    void DoCallbacks(uint32_t events) override { fn(events); }
   };
 
   void EnableEvents(int fd, uint32_t events);
diff --git a/aos/events/epoll_test.cc b/aos/events/epoll_test.cc
index 66460c2..053a09b 100644
--- a/aos/events/epoll_test.cc
+++ b/aos/events/epoll_test.cc
@@ -3,8 +3,8 @@
 #include <fcntl.h>
 #include <unistd.h>
 
-#include "gtest/gtest.h"
 #include "glog/logging.h"
+#include "gtest/gtest.h"
 
 namespace aos {
 namespace internal {
@@ -48,22 +48,22 @@
 };
 
 class EPollTest : public ::testing::Test {
-  public:
-   void RunFor(std::chrono::nanoseconds duration) {
-     TimerFd timerfd;
-     bool did_quit = false;
-     epoll_.OnReadable(timerfd.fd(), [this, &timerfd, &did_quit]() {
-       CHECK(!did_quit);
-       epoll_.Quit();
-       did_quit = true;
-       timerfd.Read();
-     });
-     timerfd.SetTime(monotonic_clock::now() + duration,
-                     monotonic_clock::duration::zero());
-     epoll_.Run();
-     CHECK(did_quit);
-     epoll_.DeleteFd(timerfd.fd());
-   }
+ public:
+  void RunFor(std::chrono::nanoseconds duration) {
+    TimerFd timerfd;
+    bool did_quit = false;
+    epoll_.OnReadable(timerfd.fd(), [this, &timerfd, &did_quit]() {
+      CHECK(!did_quit);
+      epoll_.Quit();
+      did_quit = true;
+      timerfd.Read();
+    });
+    timerfd.SetTime(monotonic_clock::now() + duration,
+                    monotonic_clock::duration::zero());
+    epoll_.Run();
+    CHECK(did_quit);
+    epoll_.DeleteFd(timerfd.fd());
+  }
 
   // Tests should avoid relying on ordering for events closer in time than this,
   // or waiting for longer than this to ensure events happen in order.
@@ -71,7 +71,7 @@
     return std::chrono::milliseconds(50);
   }
 
-   EPoll epoll_;
+  EPoll epoll_;
 };
 
 // Test that the basics of OnReadable work.
@@ -201,6 +201,11 @@
   epoll_.DeleteFd(pipe.write_fd());
 }
 
+TEST_F(EPollTest, QuitInBeforeWait) {
+  epoll_.BeforeWait([this]() { epoll_.Quit(); });
+  epoll_.Run();
+}
+
 }  // namespace testing
 }  // namespace internal
 }  // namespace aos
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index 29e4c22..b0487d5 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -493,6 +493,19 @@
   return result;
 }
 
+void EventLoop::SetTimerContext(
+    monotonic_clock::time_point monotonic_event_time) {
+  context_.monotonic_event_time = monotonic_event_time;
+  context_.monotonic_remote_time = monotonic_clock::min_time;
+  context_.realtime_event_time = realtime_clock::min_time;
+  context_.realtime_remote_time = realtime_clock::min_time;
+  context_.queue_index = 0xffffffffu;
+  context_.size = 0u;
+  context_.data = nullptr;
+  context_.buffer_index = -1;
+  context_.source_boot_uuid = boot_uuid();
+}
+
 void WatcherState::set_timing_report(timing::Watcher *watcher) {
   CHECK_NOTNULL(watcher);
   watcher_ = watcher;
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index d23314e..8ef8d57 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -7,6 +7,7 @@
 #include <string>
 #include <string_view>
 
+#include "absl/container/btree_set.h"
 #include "aos/configuration.h"
 #include "aos/configuration_generated.h"
 #include "aos/events/channel_preallocated_allocator.h"
@@ -20,8 +21,6 @@
 #include "aos/time/time.h"
 #include "aos/util/phased_loop.h"
 #include "aos/uuid.h"
-
-#include "absl/container/btree_set.h"
 #include "flatbuffers/flatbuffers.h"
 #include "glog/logging.h"
 
@@ -77,7 +76,7 @@
 
   // UUID of the remote node which sent this message, or this node in the case
   // of events which are local to this node.
-  UUID remote_boot_uuid = UUID::Zero();
+  UUID source_boot_uuid = UUID::Zero();
 
   // Efficiently copies the flatbuffer into a FlatbufferVector, allocating
   // memory in the process.  It is vital that T matches the type of the
@@ -115,6 +114,7 @@
 
  protected:
   EventLoop *event_loop() { return event_loop_; }
+  const EventLoop *event_loop() const { return event_loop_; }
 
   Context context_;
 
@@ -154,7 +154,7 @@
   bool Send(size_t size);
   bool Send(size_t size, monotonic_clock::time_point monotonic_remote_time,
             realtime_clock::time_point realtime_remote_time,
-            uint32_t remote_queue_index, const UUID &remote_boot_uuid);
+            uint32_t remote_queue_index, const UUID &source_boot_uuid);
 
   // Sends a single block of data by copying it.
   // The remote arguments have the same meaning as in Send above.
@@ -162,7 +162,7 @@
   bool Send(const void *data, size_t size,
             monotonic_clock::time_point monotonic_remote_time,
             realtime_clock::time_point realtime_remote_time,
-            uint32_t remote_queue_index, const UUID &remote_boot_uuid);
+            uint32_t remote_queue_index, const UUID &source_boot_uuid);
 
   const Channel *channel() const { return channel_; }
 
@@ -190,6 +190,7 @@
 
  protected:
   EventLoop *event_loop() { return event_loop_; }
+  const EventLoop *event_loop() const { return event_loop_; }
 
   monotonic_clock::time_point monotonic_sent_time_ = monotonic_clock::min_time;
   realtime_clock::time_point realtime_sent_time_ = realtime_clock::min_time;
@@ -202,12 +203,12 @@
                       monotonic_clock::time_point monotonic_remote_time,
                       realtime_clock::time_point realtime_remote_time,
                       uint32_t remote_queue_index,
-                      const UUID &remote_boot_uuid) = 0;
+                      const UUID &source_boot_uuid) = 0;
   virtual bool DoSend(size_t size,
                       monotonic_clock::time_point monotonic_remote_time,
                       realtime_clock::time_point realtime_remote_time,
                       uint32_t remote_queue_index,
-                      const UUID &remote_boot_uuid) = 0;
+                      const UUID &source_boot_uuid) = 0;
 
   EventLoop *const event_loop_;
   const Channel *const channel_;
@@ -473,8 +474,13 @@
   Ftrace ftrace_;
 };
 
+// Note, it is supported to create only:
+//   multiple fetchers, and (one sender or one watcher) per <name, type>
+//   tuple.
 class EventLoop {
  public:
+  // Holds configuration by reference for the lifetime of this object. It may
+  // never be mutated externally in any way.
   EventLoop(const Configuration *configuration);
 
   virtual ~EventLoop();
@@ -495,10 +501,6 @@
     return GetChannel<T>(channel_name) != nullptr;
   }
 
-  // Note, it is supported to create:
-  //   multiple fetchers, and (one sender or one watcher) per <name, type>
-  //   tuple.
-
   // Makes a class that will always fetch the most recent value
   // sent to the provided channel.
   template <typename T>
@@ -596,7 +598,7 @@
 
   // TODO(austin): OnExit for cleanup.
 
-  // Threadsafe.
+  // May be safely called from any thread.
   bool is_running() const { return is_running_.load(); }
 
   // Sets the scheduler priority to run the event loop at.  This may not be
@@ -737,6 +739,10 @@
   // If true, don't send AOS_LOG to /aos
   bool skip_logger_ = false;
 
+  // Sets context_ for a timed event which is supposed to happen at the provided
+  // time.
+  void SetTimerContext(monotonic_clock::time_point monotonic_event_time);
+
  private:
   virtual pid_t GetTid() = 0;
 
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 2ffcb58..1460663 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -274,7 +274,7 @@
   EXPECT_EQ(fetcher.context().monotonic_remote_time, monotonic_clock::min_time);
   EXPECT_EQ(fetcher.context().realtime_event_time, realtime_clock::min_time);
   EXPECT_EQ(fetcher.context().realtime_remote_time, realtime_clock::min_time);
-  EXPECT_EQ(fetcher.context().remote_boot_uuid, UUID::Zero());
+  EXPECT_EQ(fetcher.context().source_boot_uuid, UUID::Zero());
   EXPECT_EQ(fetcher.context().queue_index, 0xffffffffu);
   EXPECT_EQ(fetcher.context().size, 0u);
   EXPECT_EQ(fetcher.context().data, nullptr);
@@ -302,7 +302,7 @@
   EXPECT_LE(fetcher.context().monotonic_event_time, monotonic_now + kEpsilon);
   EXPECT_GE(fetcher.context().realtime_event_time, realtime_now - kEpsilon);
   EXPECT_LE(fetcher.context().realtime_event_time, realtime_now + kEpsilon);
-  EXPECT_EQ(fetcher.context().remote_boot_uuid, loop2->boot_uuid());
+  EXPECT_EQ(fetcher.context().source_boot_uuid, loop2->boot_uuid());
   EXPECT_EQ(fetcher.context().queue_index, 0x0u);
   EXPECT_EQ(fetcher.context().size, 20u);
   EXPECT_NE(fetcher.context().data, nullptr);
@@ -957,7 +957,7 @@
     EXPECT_EQ(loop->context().monotonic_remote_time, monotonic_clock::min_time);
     EXPECT_EQ(loop->context().realtime_event_time, realtime_clock::min_time);
     EXPECT_EQ(loop->context().realtime_remote_time, realtime_clock::min_time);
-    EXPECT_EQ(loop->context().remote_boot_uuid, loop->boot_uuid());
+    EXPECT_EQ(loop->context().source_boot_uuid, loop->boot_uuid());
     EXPECT_EQ(loop->context().queue_index, 0xffffffffu);
     EXPECT_EQ(loop->context().size, 0u);
     EXPECT_EQ(loop->context().data, nullptr);
@@ -1252,7 +1252,7 @@
               loop1->context().monotonic_event_time);
     EXPECT_EQ(loop1->context().realtime_remote_time,
               loop1->context().realtime_event_time);
-    EXPECT_EQ(loop1->context().remote_boot_uuid, loop1->boot_uuid());
+    EXPECT_EQ(loop1->context().source_boot_uuid, loop1->boot_uuid());
 
     const aos::monotonic_clock::time_point monotonic_now =
         loop1->monotonic_now();
@@ -1300,7 +1300,7 @@
             fetcher.context().realtime_remote_time);
   EXPECT_EQ(fetcher.context().monotonic_event_time,
             fetcher.context().monotonic_remote_time);
-  EXPECT_EQ(fetcher.context().remote_boot_uuid, loop1->boot_uuid());
+  EXPECT_EQ(fetcher.context().source_boot_uuid, loop1->boot_uuid());
 
   EXPECT_TRUE(monotonic_time_offset > ::std::chrono::milliseconds(-500))
       << ": Got "
@@ -1353,7 +1353,7 @@
               loop1->context().monotonic_event_time);
     EXPECT_EQ(loop1->context().realtime_remote_time,
               loop1->context().realtime_event_time);
-    EXPECT_EQ(loop1->context().remote_boot_uuid, loop1->boot_uuid());
+    EXPECT_EQ(loop1->context().source_boot_uuid, loop1->boot_uuid());
 
     const aos::monotonic_clock::time_point monotonic_now =
         loop1->monotonic_now();
@@ -1387,7 +1387,7 @@
             fetcher.context().realtime_remote_time);
   EXPECT_EQ(fetcher.context().monotonic_event_time,
             fetcher.context().monotonic_remote_time);
-  EXPECT_EQ(fetcher.context().remote_boot_uuid, loop1->boot_uuid());
+  EXPECT_EQ(fetcher.context().source_boot_uuid, loop1->boot_uuid());
 
   EXPECT_TRUE(monotonic_time_offset > ::std::chrono::milliseconds(-500))
       << ": Got "
@@ -1445,7 +1445,7 @@
 
             EXPECT_EQ(loop1->context().monotonic_remote_time,
                       monotonic_clock::min_time);
-            EXPECT_EQ(loop1->context().remote_boot_uuid, loop1->boot_uuid());
+            EXPECT_EQ(loop1->context().source_boot_uuid, loop1->boot_uuid());
             EXPECT_EQ(loop1->context().realtime_event_time,
                       realtime_clock::min_time);
             EXPECT_EQ(loop1->context().realtime_remote_time,
@@ -1956,7 +1956,7 @@
   const aos::realtime_clock::time_point realtime_remote_time =
       aos::realtime_clock::time_point(chrono::seconds(3132));
   const uint32_t remote_queue_index = 0x254971;
-  const UUID remote_boot_uuid = UUID::Random();
+  const UUID source_boot_uuid = UUID::Random();
 
   std::unique_ptr<aos::RawSender> sender =
       loop1->MakeRawSender(configuration::GetChannel(
@@ -1969,20 +1969,20 @@
   loop2->OnRun([&]() {
     EXPECT_TRUE(sender->Send(kMessage.span().data(), kMessage.span().size(),
                              monotonic_remote_time, realtime_remote_time,
-                             remote_queue_index, remote_boot_uuid));
+                             remote_queue_index, source_boot_uuid));
   });
 
   bool happened = false;
   loop2->MakeRawWatcher(
       configuration::GetChannel(loop2->configuration(), "/test",
                                 "aos.TestMessage", "", nullptr),
-      [this, monotonic_remote_time, realtime_remote_time, remote_boot_uuid,
+      [this, monotonic_remote_time, realtime_remote_time, source_boot_uuid,
        remote_queue_index, &fetcher,
        &happened](const Context &context, const void * /*message*/) {
         happened = true;
         EXPECT_EQ(monotonic_remote_time, context.monotonic_remote_time);
         EXPECT_EQ(realtime_remote_time, context.realtime_remote_time);
-        EXPECT_EQ(remote_boot_uuid, context.remote_boot_uuid);
+        EXPECT_EQ(source_boot_uuid, context.source_boot_uuid);
         EXPECT_EQ(remote_queue_index, context.remote_queue_index);
 
         ASSERT_TRUE(fetcher->Fetch());
@@ -2212,6 +2212,35 @@
   Run();
 }
 
+// Tests that the event loop's context is populated during OnRun callbacks.
+TEST_P(AbstractEventLoopTest, SetContextOnRun) {
+  auto loop = MakePrimary();
+
+  // The OnRun event time must be <= monotonic_now() inside the callback and
+  // >= the time sampled just before Run() is called.
+  aos::monotonic_clock::time_point monotonic_event_time_on_run;
+
+  loop->OnRun([&]() {
+    monotonic_event_time_on_run = loop->context().monotonic_event_time;
+    EXPECT_LE(monotonic_event_time_on_run, loop->monotonic_now());
+    EXPECT_EQ(loop->context().monotonic_remote_time, monotonic_clock::min_time);
+    EXPECT_EQ(loop->context().realtime_event_time, realtime_clock::min_time);
+    EXPECT_EQ(loop->context().realtime_remote_time, realtime_clock::min_time);
+    EXPECT_EQ(loop->context().source_boot_uuid, loop->boot_uuid());
+    EXPECT_EQ(loop->context().queue_index, 0xffffffffu);
+    EXPECT_EQ(loop->context().size, 0u);
+    EXPECT_EQ(loop->context().data, nullptr);
+    EXPECT_EQ(loop->context().buffer_index, -1);
+  });
+
+  EndEventLoop(loop.get(), ::std::chrono::milliseconds(200));
+
+  const aos::monotonic_clock::time_point before_run_time =
+      loop->monotonic_now();
+  Run();
+  EXPECT_GE(monotonic_event_time_on_run, before_run_time);
+}
+
 // Tests that watchers fail when created on the wrong node.
 TEST_P(AbstractEventLoopDeathTest, NodeWatcher) {
   EnableNodes("them");
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index e4e879f..c509170 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -182,15 +182,7 @@
   CHECK_NOTNULL(timing_.timer);
   const monotonic_clock::time_point monotonic_start_time = get_time();
 
-  event_loop_->context_.monotonic_event_time = event_time;
-  event_loop_->context_.monotonic_remote_time = monotonic_clock::min_time;
-  event_loop_->context_.realtime_remote_time =
-      event_loop_->context_.realtime_event_time = realtime_clock::min_time;
-  event_loop_->context_.queue_index = 0xffffffffu;
-  event_loop_->context_.size = 0;
-  event_loop_->context_.data = nullptr;
-  event_loop_->context_.buffer_index = -1;
-  event_loop_->context_.remote_boot_uuid = event_loop_->boot_uuid();
+  event_loop_->SetTimerContext(event_time);
 
   ftrace_.FormatMessage(
       "timer: %.*s: start now=%" PRId64 " event=%" PRId64,
@@ -228,15 +220,7 @@
   const monotonic_clock::time_point monotonic_start_time = get_time();
 
   // Update the context to hold the desired wakeup time.
-  event_loop_->context_.monotonic_event_time = phased_loop_.sleep_time();
-  event_loop_->context_.monotonic_remote_time = monotonic_clock::min_time;
-  event_loop_->context_.realtime_remote_time =
-      event_loop_->context_.realtime_event_time = realtime_clock::min_time;
-  event_loop_->context_.queue_index = 0xffffffffu;
-  event_loop_->context_.size = 0;
-  event_loop_->context_.data = nullptr;
-  event_loop_->context_.buffer_index = -1;
-  event_loop_->context_.remote_boot_uuid = event_loop_->boot_uuid();
+  event_loop_->SetTimerContext(phased_loop_.sleep_time());
 
   // Compute how many cycles elapsed
   cycles_elapsed_ += phased_loop_.Iterate(monotonic_start_time);
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index 49b170d..3739f49 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -50,6 +50,8 @@
   ::std::function<void()> callback = ::std::move(iter->second);
   events_list_.erase(iter);
   callback();
+
+  converter_->ObserveTimePassed(scheduler_scheduler_->distributed_now());
 }
 
 void EventScheduler::RunOnRun() {
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index 397b5f0..f981ef2 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -58,6 +58,9 @@
   // node.
   virtual monotonic_clock::time_point FromDistributedClock(
       size_t node_index, distributed_clock::time_point time) = 0;
+
+  // Called whenever time passes this point and we can forget about it.
+  virtual void ObserveTimePassed(distributed_clock::time_point time) = 0;
 };
 
 class EventSchedulerScheduler;
@@ -151,6 +154,8 @@
         size_t /*node_index*/, distributed_clock::time_point time) override {
       return monotonic_clock::epoch() + time.time_since_epoch();
     }
+
+    void ObserveTimePassed(distributed_clock::time_point /*time*/) override {}
   };
 
   UnityConverter unity_converter_;
diff --git a/aos/events/event_scheduler_test.cc b/aos/events/event_scheduler_test.cc
index 1ad0a84..3bcf6ca 100644
--- a/aos/events/event_scheduler_test.cc
+++ b/aos/events/event_scheduler_test.cc
@@ -41,6 +41,8 @@
            distributed_offset_[node_index];
   }
 
+  void ObserveTimePassed(distributed_clock::time_point /*time*/) override {}
+
  private:
   // Offset to the distributed clock.
   //   distributed = monotonic + offset;
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index 5c69f07..342a491 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -39,6 +39,8 @@
             "confirming they can be parsed.");
 DEFINE_bool(print_parts_only, false,
             "If true, only print out the results of logfile sorting.");
+DEFINE_bool(channels, false,
+            "If true, print out all the configured channels for this log.");
 
 // Print the flatbuffer out to stdout, both to remove the unnecessary cruft from
 // glog and to allow the user to readily redirect just the logged output
@@ -227,6 +229,15 @@
 
   aos::logger::LogReader reader(logfiles);
 
+  if (FLAGS_channels) {
+    const aos::Configuration *config = reader.configuration();
+    for (const aos::Channel *channel : *config->channels()) {
+      std::cout << channel->name()->c_str() << " " << channel->type()->c_str()
+                << '\n';
+    }
+    return 0;
+  }
+
   aos::FastStringBuilder builder;
 
   aos::SimulatedEventLoopFactory event_loop_factory(reader.configuration());
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index 6630441..abae9a1 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -713,7 +713,7 @@
         if (our_node_index != f.data_node_index) {
           // And update our boot UUID if the UUID has changed.
           if (node_state_[f.data_node_index].SetBootUUID(
-                  f.fetcher->context().remote_boot_uuid)) {
+                  f.fetcher->context().source_boot_uuid)) {
             MaybeWriteHeader(f.data_node_index);
           }
         }
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 89be828..cd2ceb8 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -320,7 +320,7 @@
   static const std::string_view kXz = ".xz";
   if (filename.substr(filename.size() - kXz.size()) == kXz) {
 #if ENABLE_LZMA
-    decoder_ = std::make_unique<LzmaDecoder>(filename);
+    decoder_ = std::make_unique<ThreadedLzmaDecoder>(filename);
 #else
     LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
 #endif
@@ -972,6 +972,8 @@
     CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
 
     timestamp_mapper->nodes_data_[node()].peer = this;
+
+    node_data->save_for_peer = true;
   }
 }
 
@@ -1271,6 +1273,7 @@
   }
   for (NodeData &node_data : nodes_data_) {
     if (!node_data.any_delivered) continue;
+    if (!node_data.save_for_peer) continue;
     if (node_data.channels[m->channel_index].delivered) {
       // TODO(austin): This copies the data...  Probably not worth stressing
       // about yet.
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index f31f4df..56b582f 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -654,6 +654,9 @@
     // bools in delivered below are true.
     bool any_delivered = false;
 
+    // True if we have a peer and therefore should be saving data for it.
+    bool save_for_peer = false;
+
     // Peer pointer.  This node is only to be considered if a peer is set.
     TimestampMapper *peer = nullptr;
 
diff --git a/aos/events/logging/lzma_encoder.cc b/aos/events/logging/lzma_encoder.cc
index 54dc6f7..60a203e 100644
--- a/aos/events/logging/lzma_encoder.cc
+++ b/aos/events/logging/lzma_encoder.cc
@@ -205,4 +205,102 @@
   return end - begin;
 }
 
+ThreadedLzmaDecoder::ThreadedLzmaDecoder(std::string_view filename)
+    : decoder_(filename), decode_thread_([this] {
+        std::unique_lock lock(decode_mutex_);  // Guards queue and finished_.
+        while (true) {
+          // Sleep until the queue has room (< kQueueSize) or we are finished.
+          continue_decoding_.wait(lock, [this] {
+            return decoded_queue_.size() < kQueueSize || finished_;
+          });
+
+          if (finished_) {
+            return;  // Stream ended, or the destructor asked us to stop.
+          }
+
+          while (true) {
+            CHECK(!finished_);
+            // Release our lock on the queue before doing decompression work.
+            lock.unlock();
+
+            ResizeableBuffer buffer;
+            buffer.resize(kBufSize);  // Decode at most kBufSize bytes per pass.
+
+            const size_t bytes_read =
+                decoder_.Read(buffer.begin(), buffer.end());
+            buffer.resize(bytes_read);  // Shrink to what was actually decoded.
+
+            // Relock the queue and move the new buffer to the end. This should
+            // be fast. We also need to stay locked when we wait().
+            lock.lock();
+            if (bytes_read > 0) {
+              decoded_queue_.emplace_back(std::move(buffer));
+            } else {
+              finished_ = true;  // A zero-byte read is treated as end of data.
+            }
+
+            // Stop decoding once the queue is full or finished_ has been set.
+            if (decoded_queue_.size() >= kQueueSize || finished_) {
+              break;
+            }
+          }
+
+          // Wake a thread blocked in Read() now that this batch has been
+          // queued (or the stream has finished).
+          queue_filled_.notify_one();
+        }
+      }) {}
+
+ThreadedLzmaDecoder::~ThreadedLzmaDecoder() {
+  // Set finished_ under the lock, then wake the decode thread so it returns.
+  {
+    std::scoped_lock lock(decode_mutex_);
+    finished_ = true;
+  }
+  continue_decoding_.notify_one();
+  decode_thread_.join();  // Block until the decode thread has exited.
+}
+
+size_t ThreadedLzmaDecoder::Read(uint8_t *begin, uint8_t *end) {
+  std::unique_lock lock(decode_mutex_);  // Guards decoded_queue_ and finished_.
+
+  // Strip any empty buffers (presumably ones an earlier call fully drained).
+  for (auto iter = decoded_queue_.begin(); iter != decoded_queue_.end();) {
+    if (iter->size() == 0) {
+      iter = decoded_queue_.erase(iter);
+    } else {
+      ++iter;
+    }
+  }
+
+  // If the queue is empty, nudge the decode thread and sleep until it has
+  // queued data or marked the stream finished.
+  if (decoded_queue_.empty()) {
+    continue_decoding_.notify_one();
+    queue_filled_.wait(lock,
+                       [this] { return finished_ || !decoded_queue_.empty(); });
+    if (finished_ && decoded_queue_.empty()) {
+      return 0;  // Finished with nothing left to hand out.
+    }
+  }
+  // Sanity check: every path that reaches this point should have data queued.
+  CHECK(!decoded_queue_.empty()) << "Decoded queue unexpectedly empty";
+
+  ResizeableBuffer &front_buffer = decoded_queue_.front();
+
+  // Copy at most (end - begin) bytes from the front buffer to the destination.
+  const std::size_t bytes_requested = end - begin;
+  const std::size_t bytes_to_copy =
+      std::min(bytes_requested, front_buffer.size());
+  memcpy(begin, front_buffer.data(), bytes_to_copy);
+  front_buffer.erase_front(bytes_to_copy);
+
+  // Ensure the decoding thread wakes up if the queue isn't full.
+  if (!finished_ && decoded_queue_.size() < kQueueSize) {
+    continue_decoding_.notify_one();
+  }
+
+  return bytes_to_copy;  // May be fewer bytes than were requested.
+}
+
 }  // namespace aos::logger
diff --git a/aos/events/logging/lzma_encoder.h b/aos/events/logging/lzma_encoder.h
index 919f5fa..972ed6c 100644
--- a/aos/events/logging/lzma_encoder.h
+++ b/aos/events/logging/lzma_encoder.h
@@ -1,13 +1,16 @@
 #ifndef AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
 #define AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
 
-#include "absl/types/span.h"
-#include "flatbuffers/flatbuffers.h"
-#include "lzma.h"
+#include <condition_variable>
+#include <mutex>
+#include <thread>
 
+#include "absl/types/span.h"
 #include "aos/containers/resizeable_buffer.h"
 #include "aos/events/logging/buffer_encoder.h"
 #include "aos/events/logging/logger_generated.h"
+#include "flatbuffers/flatbuffers.h"
+#include "lzma.h"
 
 namespace aos::logger {
 
@@ -78,6 +81,38 @@
   std::string filename_;
 };
 
+// Decompresses data with liblzma in a new thread, up to a maximum queue
+// size. Calls to Read() will return data from the queue if available,
+// or block until more data is queued or the stream finishes.
+class ThreadedLzmaDecoder : public DataDecoder {
+ public:
+  explicit ThreadedLzmaDecoder(std::string_view filename);
+  ThreadedLzmaDecoder(const ThreadedLzmaDecoder &) = delete;
+  ThreadedLzmaDecoder &operator=(const ThreadedLzmaDecoder &) = delete;
+
+  ~ThreadedLzmaDecoder();
+
+  size_t Read(uint8_t *begin, uint8_t *end) final;
+
+ private:
+  static constexpr size_t kBufSize{256 * 1024};  // Bytes per decoded buffer.
+  static constexpr size_t kQueueSize{8};  // Max buffers queued before pausing.
+
+  LzmaDecoder decoder_;  // Underlying decoder, read by decode_thread_.
+
+  // Queue of decompressed data to return on calls to Read
+  std::vector<ResizeableBuffer> decoded_queue_;
+
+  // Mutex guarding decoded_queue_ and finished_.
+  std::mutex decode_mutex_;
+  std::condition_variable continue_decoding_;  // Wakes the decode thread.
+  std::condition_variable queue_filled_;  // Wakes a caller blocked in Read().
+
+  bool finished_ = false;  // Set at end of stream or by the destructor.
+
+  std::thread decode_thread_;  // Declared last: starts after the state it uses.
+};
+
 }  // namespace aos::logger
 
 #endif  // AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
diff --git a/aos/events/logging/lzma_encoder_test.cc b/aos/events/logging/lzma_encoder_test.cc
index 63ed6c2..bbd0c60 100644
--- a/aos/events/logging/lzma_encoder_test.cc
+++ b/aos/events/logging/lzma_encoder_test.cc
@@ -17,6 +17,16 @@
                        }),
                        ::testing::Range(0, 100)));
 
+INSTANTIATE_TEST_SUITE_P(
+    LzmaThreaded, BufferEncoderTest,
+    ::testing::Combine(::testing::Values([]() {
+                         return std::make_unique<LzmaEncoder>(2);
+                       }),
+                       ::testing::Values([](std::string_view filename) {
+                         return std::make_unique<ThreadedLzmaDecoder>(filename);
+                       }),
+                       ::testing::Range(0, 100)));
+
 // Tests that we return as much of the file as we can read if the end is
 // corrupted.
 TEST_F(BufferEncoderBaseTest, CorruptedBuffer) {
diff --git a/aos/events/pingpong_test.cc b/aos/events/pingpong_test.cc
index 07340f7..9c874ef 100644
--- a/aos/events/pingpong_test.cc
+++ b/aos/events/pingpong_test.cc
@@ -79,5 +79,93 @@
   EXPECT_EQ(ping_count, 1001);
 }
 
+// Multi-node ping pong test.  This test carefully mirrors the structure of the
+// single node test above to help highlight the similarities and differences.
+class MultiNodePingPongTest : public ::testing::Test {
+ public:
+  MultiNodePingPongTest()
+      : config_(aos::configuration::ReadConfig(ArtifactPath(
+            "aos/events/multinode_pingpong_test_split_config.json"))),
+        event_loop_factory_(&config_.message()),
+        pi1_(
+            configuration::GetNode(event_loop_factory_.configuration(), "pi1")),
+        pi2_(
+            configuration::GetNode(event_loop_factory_.configuration(), "pi2")),
+        ping_event_loop_(event_loop_factory_.MakeEventLoop("ping", pi1_)),
+        ping_(ping_event_loop_.get()),
+        pong_event_loop_(event_loop_factory_.MakeEventLoop("pong", pi2_)),
+        pong_(pong_event_loop_.get()) {}
+
+  // Config and factory.
+  // The factory is a factory for connected event loops.  Each application needs
+  // a separate event loop (because you can't send a message to yourself in a
+  // single event loop).  The created event loops can then send messages to each
+  // other and trigger callbacks to be called, or fetchers to receive data.
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
+  SimulatedEventLoopFactory event_loop_factory_;
+
+  // Convenience pointers for the "pi1" and "pi2" nodes looked up in the config.
+  const Node *pi1_;
+  const Node *pi2_;
+
+  // Event loop and app for Ping, created on pi1.
+  std::unique_ptr<EventLoop> ping_event_loop_;
+  Ping ping_;
+
+  // Event loop and app for Pong, created on pi2.
+  std::unique_ptr<EventLoop> pong_event_loop_;
+  Pong pong_;
+};
+
+// Tests that the number of pong messages matches the number of ping messages
+// on both nodes, and that the message values count up by one each time.
+TEST_F(MultiNodePingPongTest, AlwaysReplies) {
+  // For grins, test that ping and pong appear on both nodes and match.
+  std::unique_ptr<EventLoop> pi1_test_event_loop =
+      event_loop_factory_.MakeEventLoop("test", pi1_);
+  std::unique_ptr<EventLoop> pi2_test_event_loop =
+      event_loop_factory_.MakeEventLoop("test", pi2_);
+
+  int pi1_ping_count = 0;
+  int pi2_ping_count = 0;
+  int pi1_pong_count = 0;
+  int pi2_pong_count = 0;
+
+  // Confirm that ping values arrive in order (1, 2, 3, ...) on both nodes.
+  pi1_test_event_loop->MakeWatcher(
+      "/test", [&pi1_ping_count](const examples::Ping &ping) {
+        EXPECT_EQ(ping.value(), pi1_ping_count + 1);
+        ++pi1_ping_count;
+      });
+  pi2_test_event_loop->MakeWatcher(
+      "/test", [&pi2_ping_count](const examples::Ping &ping) {
+        EXPECT_EQ(ping.value(), pi2_ping_count + 1);
+        ++pi2_ping_count;
+      });
+
+  // Confirm that the ping and pong counts both match, and the value also
+  // matches.
+  pi2_test_event_loop->MakeWatcher(
+      "/test", [&pi2_pong_count, &pi2_ping_count](const examples::Pong &pong) {
+        EXPECT_EQ(pong.value(), pi2_pong_count + 1);
+        ++pi2_pong_count;
+        EXPECT_EQ(pi2_ping_count, pi2_pong_count);
+      });
+  pi1_test_event_loop->MakeWatcher(
+      "/test", [&pi1_pong_count, &pi1_ping_count](const examples::Pong &pong) {
+        EXPECT_EQ(pong.value(), pi1_pong_count + 1);
+        ++pi1_pong_count;
+        EXPECT_EQ(pi1_ping_count, pi1_pong_count);
+      });
+
+  // Since forwarding takes "time", we need to run a bit longer to let pong
+  // finish the last cycle.
+  event_loop_factory_.RunFor(chrono::seconds(10) + chrono::milliseconds(5));
+
+  // We run at t=0 and t=10 seconds, which means we run 1 extra time.
+  EXPECT_EQ(pi1_ping_count, 1001);
+  EXPECT_EQ(pi2_ping_count, 1001);  // NOTE(review): pong totals not asserted.
+}
+
 }  // namespace testing
 }  // namespace aos
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 63e1cb9..c70fef6 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -372,7 +372,7 @@
         queue_index.index(), &context_.monotonic_event_time,
         &context_.realtime_event_time, &context_.monotonic_remote_time,
         &context_.realtime_remote_time, &context_.remote_queue_index,
-        &context_.remote_boot_uuid, &context_.size, copy_buffer);
+        &context_.source_boot_uuid, &context_.size, copy_buffer);
 
     if (read_result == ipc_lib::LocklessQueueReader::Result::GOOD) {
       if (pin_data()) {
@@ -477,9 +477,13 @@
     simple_shm_fetcher_.RetrieveData();
   }
 
-  ~ShmFetcher() { context_.data = nullptr; }
+  ~ShmFetcher() override {
+    shm_event_loop()->CheckCurrentThread();
+    context_.data = nullptr;
+  }
 
   std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
+    shm_event_loop()->CheckCurrentThread();
     if (simple_shm_fetcher_.FetchNext()) {
       context_ = simple_shm_fetcher_.context();
       return std::make_pair(true, monotonic_clock::now());
@@ -488,6 +492,7 @@
   }
 
   std::pair<bool, monotonic_clock::time_point> DoFetch() override {
+    shm_event_loop()->CheckCurrentThread();
     if (simple_shm_fetcher_.Fetch()) {
       context_ = simple_shm_fetcher_.context();
       return std::make_pair(true, monotonic_clock::now());
@@ -500,6 +505,10 @@
   }
 
  private:
+  const ShmEventLoop *shm_event_loop() const {  // Unchecked downcast helper.
+    return static_cast<const ShmEventLoop *>(event_loop());
+  }
+
   SimpleShmFetcher simple_shm_fetcher_;
 };
 
@@ -517,7 +526,7 @@
             channel)),
         wake_upper_(lockless_queue_memory_.queue()) {}
 
-  ~ShmSender() override {}
+  ~ShmSender() override { shm_event_loop()->CheckCurrentThread(); }
 
   static ipc_lib::LocklessQueueSender VerifySender(
       std::optional<ipc_lib::LocklessQueueSender> sender,
@@ -530,24 +539,32 @@
                << ", too many senders.";
   }
 
-  void *data() override { return lockless_queue_sender_.Data(); }
-  size_t size() override { return lockless_queue_sender_.size(); }
+  void *data() override {
+    shm_event_loop()->CheckCurrentThread();
+    return lockless_queue_sender_.Data();
+  }
+  size_t size() override {
+    shm_event_loop()->CheckCurrentThread();
+    return lockless_queue_sender_.size();
+  }
   bool DoSend(size_t length,
               aos::monotonic_clock::time_point monotonic_remote_time,
               aos::realtime_clock::time_point realtime_remote_time,
               uint32_t remote_queue_index,
-              const UUID &remote_boot_uuid) override {
+              const UUID &source_boot_uuid) override {
+    shm_event_loop()->CheckCurrentThread();
     CHECK_LE(length, static_cast<size_t>(channel()->max_size()))
         << ": Sent too big a message on "
         << configuration::CleanedChannelToString(channel());
     CHECK(lockless_queue_sender_.Send(length, monotonic_remote_time,
                                       realtime_remote_time, remote_queue_index,
-                                      remote_boot_uuid, &monotonic_sent_time_,
+                                      source_boot_uuid, &monotonic_sent_time_,
                                       &realtime_sent_time_, &sent_queue_index_))
         << ": Somebody wrote outside the buffer of their message on channel "
         << configuration::CleanedChannelToString(channel());
 
-    wake_upper_.Wakeup(event_loop()->priority());
+    wake_upper_.Wakeup(event_loop()->is_running() ? event_loop()->priority()
+                                                  : 0);
     return true;
   }
 
@@ -555,17 +572,19 @@
               aos::monotonic_clock::time_point monotonic_remote_time,
               aos::realtime_clock::time_point realtime_remote_time,
               uint32_t remote_queue_index,
-              const UUID &remote_boot_uuid) override {
+              const UUID &source_boot_uuid) override {
+    shm_event_loop()->CheckCurrentThread();
     CHECK_LE(length, static_cast<size_t>(channel()->max_size()))
         << ": Sent too big a message on "
         << configuration::CleanedChannelToString(channel());
     CHECK(lockless_queue_sender_.Send(
         reinterpret_cast<const char *>(msg), length, monotonic_remote_time,
-        realtime_remote_time, remote_queue_index, remote_boot_uuid,
+        realtime_remote_time, remote_queue_index, source_boot_uuid,
         &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_))
         << ": Somebody wrote outside the buffer of their message on channel "
         << configuration::CleanedChannelToString(channel());
-    wake_upper_.Wakeup(event_loop()->priority());
+    wake_upper_.Wakeup(event_loop()->is_running() ? event_loop()->priority()
+                                                  : 0);
     // TODO(austin): Return an error if we send too fast.
     return true;
   }
@@ -574,9 +593,16 @@
     return lockless_queue_memory_.GetMutableSharedMemory();
   }
 
-  int buffer_index() override { return lockless_queue_sender_.buffer_index(); }
+  int buffer_index() override {
+    shm_event_loop()->CheckCurrentThread();
+    return lockless_queue_sender_.buffer_index();
+  }
 
  private:
+  const ShmEventLoop *shm_event_loop() const {  // Unchecked downcast helper.
+    return static_cast<const ShmEventLoop *>(event_loop());
+  }
+
   MMappedQueue lockless_queue_memory_;
   ipc_lib::LocklessQueueSender lockless_queue_sender_;
   ipc_lib::LocklessQueueWakeUpper wake_upper_;
@@ -599,9 +625,13 @@
     }
   }
 
-  ~ShmWatcherState() override { event_loop_->RemoveEvent(&event_); }
+  ~ShmWatcherState() override {
+    event_loop_->CheckCurrentThread();
+    event_loop_->RemoveEvent(&event_);
+  }
 
   void Startup(EventLoop *event_loop) override {
+    event_loop_->CheckCurrentThread();
     simple_shm_fetcher_.PointAtNextQueueIndex();
     CHECK(RegisterWakeup(event_loop->priority()));
   }
@@ -666,6 +696,7 @@
   }
 
   ~ShmTimerHandler() {
+    shm_event_loop_->CheckCurrentThread();
     Disable();
     shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
   }
@@ -705,6 +736,7 @@
 
   void Setup(monotonic_clock::time_point base,
              monotonic_clock::duration repeat_offset) override {
+    shm_event_loop_->CheckCurrentThread();
     if (event_.valid()) {
       shm_event_loop_->RemoveEvent(&event_);
     }
@@ -717,6 +749,7 @@
   }
 
   void Disable() override {
+    shm_event_loop_->CheckCurrentThread();
     shm_event_loop_->RemoveEvent(&event_);
     timerfd_.Disable();
     disabled_ = true;
@@ -766,6 +799,7 @@
   }
 
   ~ShmPhasedLoopHandler() override {
+    shm_event_loop_->CheckCurrentThread();
     shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
     shm_event_loop_->RemoveEvent(&event_);
   }
@@ -773,6 +807,7 @@
  private:
   // Reschedules the timer.
   void Schedule(monotonic_clock::time_point sleep_time) override {
+    shm_event_loop_->CheckCurrentThread();
     if (event_.valid()) {
       shm_event_loop_->RemoveEvent(&event_);
     }
@@ -792,6 +827,7 @@
 
 ::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
     const Channel *channel) {
+  CheckCurrentThread();
   if (!configuration::ChannelIsReadableOnNode(channel, node())) {
     LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
                << "\", \"type\": \"" << channel->type()->string_view()
@@ -805,6 +841,7 @@
 
 ::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
     const Channel *channel) {
+  CheckCurrentThread();
   TakeSender(channel);
 
   return ::std::unique_ptr<RawSender>(new ShmSender(shm_base_, this, channel));
@@ -813,6 +850,7 @@
 void ShmEventLoop::MakeRawWatcher(
     const Channel *channel,
     std::function<void(const Context &context, const void *message)> watcher) {
+  CheckCurrentThread();
   TakeWatcher(channel);
 
   NewWatcher(::std::unique_ptr<WatcherState>(
@@ -822,6 +860,7 @@
 void ShmEventLoop::MakeRawNoArgWatcher(
     const Channel *channel,
     std::function<void(const Context &context)> watcher) {
+  CheckCurrentThread();
   TakeWatcher(channel);
 
   NewWatcher(::std::unique_ptr<WatcherState>(new ShmWatcherState(
@@ -831,6 +870,7 @@
 }
 
 TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
+  CheckCurrentThread();
   return NewTimer(::std::unique_ptr<TimerHandler>(
       new ShmTimerHandler(this, ::std::move(callback))));
 }
@@ -839,14 +879,28 @@
     ::std::function<void(int)> callback,
     const monotonic_clock::duration interval,
     const monotonic_clock::duration offset) {
+  CheckCurrentThread();
   return NewPhasedLoop(::std::unique_ptr<PhasedLoopHandler>(
       new ShmPhasedLoopHandler(this, ::std::move(callback), interval, offset)));
 }
 
 void ShmEventLoop::OnRun(::std::function<void()> on_run) {
+  CheckCurrentThread();
   on_run_.push_back(::std::move(on_run));
 }
 
+void ShmEventLoop::CheckCurrentThread() const {
+  if (__builtin_expect(check_mutex_ != nullptr, false)) {  // CheckForMutex().
+    CHECK(check_mutex_->is_locked())
+        << ": The configured mutex is not locked while calling a "
+           "ShmEventLoop function";
+  }
+  if (__builtin_expect(!!check_tid_, false)) {  // Set by LockToThread().
+    CHECK_EQ(syscall(SYS_gettid), *check_tid_)  // Compare kernel thread IDs.
+        << ": Being called from the wrong thread";
+  }
+}
+
 // This is a bit tricky because watchers can generate new events at any time (as
 // long as it's in the past). We want to check the watchers at least once before
 // declaring there are no events to handle, and we want to check them again if
@@ -1021,6 +1075,7 @@
 };
 
 void ShmEventLoop::Run() {
+  CheckCurrentThread();
   SignalHandler::global()->Register(this);
 
   if (watchers_.size() > 0) {
@@ -1063,6 +1118,7 @@
     }
 
     // Now that we are RT, run all the OnRun handlers.
+    SetTimerContext(monotonic_clock::now());
     for (const auto &run : on_run_) {
       run();
     }
@@ -1100,6 +1156,7 @@
 void ShmEventLoop::Exit() { epoll_.Quit(); }
 
 ShmEventLoop::~ShmEventLoop() {
+  CheckCurrentThread();
   // Force everything with a registered fd with epoll to be destroyed now.
   timers_.clear();
   phased_loops_.clear();
@@ -1109,6 +1166,7 @@
 }
 
 void ShmEventLoop::SetRuntimeRealtimePriority(int priority) {
+  CheckCurrentThread();
   if (is_running()) {
     LOG(FATAL) << "Cannot set realtime priority while running.";
   }
@@ -1116,6 +1174,7 @@
 }
 
 void ShmEventLoop::SetRuntimeAffinity(const cpu_set_t &cpuset) {
+  CheckCurrentThread();
   if (is_running()) {
     LOG(FATAL) << "Cannot set affinity while running.";
   }
@@ -1123,18 +1182,21 @@
 }
 
 void ShmEventLoop::set_name(const std::string_view name) {
+  CheckCurrentThread();
   name_ = std::string(name);
   UpdateTimingReport();
 }
 
 absl::Span<const char> ShmEventLoop::GetWatcherSharedMemory(
     const Channel *channel) {
+  CheckCurrentThread();
   ShmWatcherState *const watcher_state =
       static_cast<ShmWatcherState *>(GetWatcherState(channel));
   return watcher_state->GetSharedMemory();
 }
 
 int ShmEventLoop::NumberBuffers(const Channel *channel) {
+  CheckCurrentThread();
   return MakeQueueConfiguration(
              channel, chrono::ceil<chrono::seconds>(chrono::nanoseconds(
                           configuration()->channel_storage_duration())))
@@ -1143,14 +1205,19 @@
 
 absl::Span<char> ShmEventLoop::GetShmSenderSharedMemory(
     const aos::RawSender *sender) const {
+  CheckCurrentThread();
   return static_cast<const ShmSender *>(sender)->GetSharedMemory();
 }
 
 absl::Span<const char> ShmEventLoop::GetShmFetcherPrivateMemory(
     const aos::RawFetcher *fetcher) const {
+  CheckCurrentThread();
   return static_cast<const ShmFetcher *>(fetcher)->GetPrivateMemory();
 }
 
-pid_t ShmEventLoop::GetTid() { return syscall(SYS_gettid); }
+pid_t ShmEventLoop::GetTid() {
+  CheckCurrentThread();
+  return syscall(SYS_gettid);
+}
 
 }  // namespace aos
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index 845857c..3245f11 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -4,11 +4,11 @@
 #include <vector>
 
 #include "absl/types/span.h"
-
 #include "aos/events/epoll.h"
 #include "aos/events/event_loop.h"
 #include "aos/events/event_loop_generated.h"
 #include "aos/ipc_lib/signalfd.h"
+#include "aos/stl_mutex/stl_mutex.h"
 
 DECLARE_string(application_name);
 DECLARE_string(shm_base);
@@ -92,6 +92,7 @@
   // Returns the local mapping of the shared memory used by the provided Sender.
   template <typename T>
   absl::Span<char> GetSenderSharedMemory(aos::Sender<T> *sender) const {
+    CheckCurrentThread();
     return GetShmSenderSharedMemory(GetRawSender(sender));
   }
 
@@ -103,11 +104,30 @@
   template <typename T>
   absl::Span<const char> GetFetcherPrivateMemory(
       aos::Fetcher<T> *fetcher) const {
+    CheckCurrentThread();
     return GetShmFetcherPrivateMemory(GetRawFetcher(fetcher));
   }
 
   int NumberBuffers(const Channel *channel) override;
 
+  // All public-facing APIs will CHECK that this mutex is locked when called.
+  // For normal use with everything in a single thread, this is unnecessary.
+  //
+  // This is helpful as a safety check when using a ShmEventLoop with external
+  // synchronization across multiple threads. It will NOT reliably catch race
+  // conditions, but if you have a race condition triggered repeatedly it'll
+  // probably catch it eventually.
+  void CheckForMutex(aos::stl_mutex *check_mutex) {
+    check_mutex_ = check_mutex;  // nullptr (the default) disables the check.
+  }
+
+  // All public-facing APIs will verify they are called in this thread.
+  // For normal use with the whole program in a single thread, this is
+  // unnecessary. It's helpful as a safety check for programs with multiple
+  // threads, where the EventLoop should only be interacted with from a single
+  // one.
+  void LockToThread() { check_tid_ = GetTid(); }  // Records the caller's TID.
+
  private:
   friend class shm_event_loop_internal::ShmWatcherState;
   friend class shm_event_loop_internal::ShmTimerHandler;
@@ -126,6 +146,8 @@
     return result;
   }
 
+  void CheckCurrentThread() const;
+
   void HandleEvent();
 
   // Returns the TID of the event loop.
@@ -151,6 +173,9 @@
   std::string name_;
   const Node *const node_;
 
+  aos::stl_mutex *check_mutex_ = nullptr;
+  std::optional<pid_t> check_tid_;
+
   internal::EPoll epoll_;
 
   // Only set during Run().
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index 5524597..4c12213 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -3,13 +3,12 @@
 #include <string_view>
 
 #include "aos/events/event_loop_param_test.h"
+#include "aos/events/test_message_generated.h"
+#include "aos/network/team_number.h"
 #include "aos/realtime.h"
 #include "glog/logging.h"
 #include "gtest/gtest.h"
 
-#include "aos/events/test_message_generated.h"
-#include "aos/network/team_number.h"
-
 namespace aos {
 namespace testing {
 namespace {
@@ -123,6 +122,36 @@
 
 using ShmEventLoopDeathTest = ShmEventLoopTest;
 
+// Tests that we don't leave the calling thread realtime when calling Send
+// before Run.
+TEST_P(ShmEventLoopTest, SendBeforeRun) {
+  auto loop = factory()->MakePrimary("primary");
+  loop->SetRuntimeRealtimePriority(1);
+
+  auto loop2 = factory()->Make("loop2");
+  loop2->SetRuntimeRealtimePriority(2);
+  loop2->MakeWatcher("/test", [](const TestMessage &) {});
+  // Need the other one running for its watcher to record in SHM that it wants
+  // wakers to boost their priority, so leave it running in a thread for this
+  // test.
+  std::thread loop2_thread(
+      [&loop2]() { static_cast<ShmEventLoop *>(loop2.get())->Run(); });
+  std::this_thread::sleep_for(std::chrono::milliseconds(100));  // NOTE(review): timing-based wait.
+
+  auto sender = loop->MakeSender<TestMessage>("/test");
+  EXPECT_FALSE(IsRealtime());  // Not realtime before sending.
+  {
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(200);
+    msg.Send(builder.Finish());
+  }
+  EXPECT_FALSE(IsRealtime());  // Still not realtime after sending.
+
+  static_cast<ShmEventLoop *>(loop2.get())->Exit();
+  loop2_thread.join();
+}
+
 // Tests that every handler type is realtime and runs.  There are threads
 // involved and it's easy to miss one.
 TEST_P(ShmEventLoopTest, AllHandlersAreRealtime) {
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 51021b6..4dc37fa 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -295,13 +295,13 @@
               aos::monotonic_clock::time_point monotonic_remote_time,
               aos::realtime_clock::time_point realtime_remote_time,
               uint32_t remote_queue_index,
-              const UUID &remote_boot_uuid) override;
+              const UUID &source_boot_uuid) override;
 
   bool DoSend(const void *msg, size_t size,
               aos::monotonic_clock::time_point monotonic_remote_time,
               aos::realtime_clock::time_point realtime_remote_time,
               uint32_t remote_queue_index,
-              const UUID &remote_boot_uuid) override;
+              const UUID &source_boot_uuid) override;
 
   int buffer_index() override {
     // First, ensure message_ is allocated.
@@ -542,6 +542,7 @@
     CHECK(!is_running()) << ": Cannot register OnRun callback while running.";
     scheduler_->ScheduleOnRun([this, on_run = std::move(on_run)]() {
       ScopedMarkRealtimeRestorer rt(priority() > 0);
+      SetTimerContext(monotonic_now());
       on_run();
     });
   }
@@ -852,7 +853,7 @@
                              monotonic_clock::time_point monotonic_remote_time,
                              realtime_clock::time_point realtime_remote_time,
                              uint32_t remote_queue_index,
-                             const UUID &remote_boot_uuid) {
+                             const UUID &source_boot_uuid) {
   // The allocations in here are due to infrastructure and don't count in the
   // no mallocs in RT code.
   ScopedNotRealtime nrt;
@@ -862,7 +863,7 @@
   message_->context.remote_queue_index = remote_queue_index;
   message_->context.realtime_event_time = event_loop_->realtime_now();
   message_->context.realtime_remote_time = realtime_remote_time;
-  message_->context.remote_boot_uuid = remote_boot_uuid;
+  message_->context.source_boot_uuid = source_boot_uuid;
   CHECK_LE(length, message_->context.size);
   message_->context.size = length;
 
@@ -882,7 +883,7 @@
                              monotonic_clock::time_point monotonic_remote_time,
                              realtime_clock::time_point realtime_remote_time,
                              uint32_t remote_queue_index,
-                             const UUID &remote_boot_uuid) {
+                             const UUID &source_boot_uuid) {
   CHECK_LE(size, this->size())
       << ": Attempting to send too big a message on "
       << configuration::CleanedChannelToString(simulated_channel_->channel());
@@ -899,7 +900,7 @@
          msg, size);
 
   return DoSend(size, monotonic_remote_time, realtime_remote_time,
-                remote_queue_index, remote_boot_uuid);
+                remote_queue_index, source_boot_uuid);
 }
 
 SimulatedTimerHandler::SimulatedTimerHandler(
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 7191365..4cb51de 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -1492,13 +1492,13 @@
   pi1_remote_timestamp->MakeWatcher(
       "/pi2/aos", [&expected_boot_uuid,
                    &pi1_remote_timestamp](const message_bridge::Timestamp &) {
-        EXPECT_EQ(pi1_remote_timestamp->context().remote_boot_uuid,
+        EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
                   expected_boot_uuid);
       });
   pi1_remote_timestamp->MakeWatcher(
       "/test",
       [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
-        EXPECT_EQ(pi1_remote_timestamp->context().remote_boot_uuid,
+        EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
                   expected_boot_uuid);
       });
   pi1_remote_timestamp->MakeWatcher(
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 40b468f..11e8c56 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -137,7 +137,7 @@
                   fetcher_->context().monotonic_event_time,
                   fetcher_->context().realtime_event_time,
                   fetcher_->context().queue_index,
-                  fetcher_->context().remote_boot_uuid);
+                  fetcher_->context().source_boot_uuid);
 
     // And simulate message_bridge's offset recovery.
     client_status_->SampleFilter(client_index_,
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index 98701b4..fe86f6c 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -908,7 +908,7 @@
     const char *data, size_t length,
     monotonic_clock::time_point monotonic_remote_time,
     realtime_clock::time_point realtime_remote_time,
-    uint32_t remote_queue_index, const UUID &remote_boot_uuid,
+    uint32_t remote_queue_index, const UUID &source_boot_uuid,
     monotonic_clock::time_point *monotonic_sent_time,
     realtime_clock::time_point *realtime_sent_time, uint32_t *queue_index) {
   CHECK_LE(length, size());
@@ -917,14 +917,14 @@
   // adhere to this convention and place it at the end.
   memcpy((reinterpret_cast<char *>(Data()) + size() - length), data, length);
   return Send(length, monotonic_remote_time, realtime_remote_time,
-              remote_queue_index, remote_boot_uuid, monotonic_sent_time,
+              remote_queue_index, source_boot_uuid, monotonic_sent_time,
               realtime_sent_time, queue_index);
 }
 
 bool LocklessQueueSender::Send(
     size_t length, monotonic_clock::time_point monotonic_remote_time,
     realtime_clock::time_point realtime_remote_time,
-    uint32_t remote_queue_index, const UUID &remote_boot_uuid,
+    uint32_t remote_queue_index, const UUID &source_boot_uuid,
     monotonic_clock::time_point *monotonic_sent_time,
     realtime_clock::time_point *realtime_sent_time, uint32_t *queue_index) {
   const size_t queue_size = memory_->queue_size();
@@ -949,7 +949,7 @@
   // Pass these through.  Any alternative behavior can be implemented out a
   // layer.
   message->header.remote_queue_index = remote_queue_index;
-  message->header.remote_boot_uuid = remote_boot_uuid;
+  message->header.source_boot_uuid = source_boot_uuid;
   message->header.monotonic_remote_time = monotonic_remote_time;
   message->header.realtime_remote_time = realtime_remote_time;
 
@@ -1210,7 +1210,7 @@
     realtime_clock::time_point *realtime_sent_time,
     monotonic_clock::time_point *monotonic_remote_time,
     realtime_clock::time_point *realtime_remote_time,
-    uint32_t *remote_queue_index, UUID *remote_boot_uuid, size_t *length,
+    uint32_t *remote_queue_index, UUID *source_boot_uuid, size_t *length,
     char *data) const {
   const size_t queue_size = memory_->queue_size();
 
@@ -1294,7 +1294,7 @@
   }
   *monotonic_remote_time = m->header.monotonic_remote_time;
   *realtime_remote_time = m->header.realtime_remote_time;
-  *remote_boot_uuid = m->header.remote_boot_uuid;
+  *source_boot_uuid = m->header.source_boot_uuid;
   if (data) {
     memcpy(data, m->data(memory_->message_data_size()),
            memory_->message_data_size());
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index 41aa0fb..b0fc425 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -94,7 +94,7 @@
     uint32_t remote_queue_index;
 
     // Remote boot UUID for this message.
-    UUID remote_boot_uuid;
+    UUID source_boot_uuid;
 
     size_t length;
   } header;
@@ -309,7 +309,7 @@
   void *Data();
   bool Send(size_t length, monotonic_clock::time_point monotonic_remote_time,
             realtime_clock::time_point realtime_remote_time,
-            uint32_t remote_queue_index, const UUID &remote_boot_uuid,
+            uint32_t remote_queue_index, const UUID &source_boot_uuid,
             monotonic_clock::time_point *monotonic_sent_time = nullptr,
             realtime_clock::time_point *realtime_sent_time = nullptr,
             uint32_t *queue_index = nullptr);
@@ -318,7 +318,7 @@
   bool Send(const char *data, size_t length,
             monotonic_clock::time_point monotonic_remote_time,
             realtime_clock::time_point realtime_remote_time,
-            uint32_t remote_queue_index, const UUID &remote_boot_uuid,
+            uint32_t remote_queue_index, const UUID &source_boot_uuid,
             monotonic_clock::time_point *monotonic_sent_time = nullptr,
             realtime_clock::time_point *realtime_sent_time = nullptr,
             uint32_t *queue_index = nullptr);
@@ -404,7 +404,7 @@
               realtime_clock::time_point *realtime_sent_time,
               monotonic_clock::time_point *monotonic_remote_time,
               realtime_clock::time_point *realtime_remote_time,
-              uint32_t *remote_queue_index, UUID *remote_boot_uuid,
+              uint32_t *remote_queue_index, UUID *source_boot_uuid,
               size_t *length, char *data) const;
 
   // Returns the index to the latest queue message.  Returns empty_queue_index()
diff --git a/aos/ipc_lib/lockless_queue_death_test.cc b/aos/ipc_lib/lockless_queue_death_test.cc
index b521d9e..ae780b8 100644
--- a/aos/ipc_lib/lockless_queue_death_test.cc
+++ b/aos/ipc_lib/lockless_queue_death_test.cc
@@ -622,14 +622,14 @@
           monotonic_clock::time_point monotonic_remote_time;
           realtime_clock::time_point realtime_remote_time;
           uint32_t remote_queue_index;
-          UUID remote_boot_uuid;
+          UUID source_boot_uuid;
           char read_data[1024];
           size_t length;
 
           LocklessQueueReader::Result read_result = reader.Read(
               i, &monotonic_sent_time, &realtime_sent_time,
               &monotonic_remote_time, &realtime_remote_time,
-              &remote_queue_index, &remote_boot_uuid, &length, &(read_data[0]));
+              &remote_queue_index, &source_boot_uuid, &length, &(read_data[0]));
 
           if (read_result != LocklessQueueReader::Result::GOOD) {
             if (read_result == LocklessQueueReader::Result::TOO_OLD) {
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index 91f995b..57867e4 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -251,7 +251,7 @@
     monotonic_clock::time_point monotonic_remote_time;
     realtime_clock::time_point realtime_remote_time;
     uint32_t remote_queue_index;
-    UUID remote_boot_uuid;
+    UUID source_boot_uuid;
     char read_data[1024];
     size_t length;
 
@@ -264,7 +264,7 @@
     LocklessQueueReader::Result read_result = reader.Read(
         index.index(), &monotonic_sent_time, &realtime_sent_time,
         &monotonic_remote_time, &realtime_remote_time, &remote_queue_index,
-        &remote_boot_uuid, &length, &(read_data[0]));
+        &source_boot_uuid, &length, &(read_data[0]));
 
     // This should either return GOOD, or TOO_OLD if it is before the start of
     // the queue.
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index cf46807..d14a638 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -267,7 +267,7 @@
     realtime_clock::time_point realtime_sent_time;
     monotonic_clock::time_point monotonic_remote_time;
     realtime_clock::time_point realtime_remote_time;
-    UUID remote_boot_uuid;
+    UUID source_boot_uuid;
     uint32_t remote_queue_index;
     size_t length;
     char read_data[1024];
@@ -279,7 +279,7 @@
     LocklessQueueReader::Result read_result = reader.Read(
         wrapped_i, &monotonic_sent_time, &realtime_sent_time,
         &monotonic_remote_time, &realtime_remote_time, &remote_queue_index,
-        &remote_boot_uuid, &length, &(read_data[0]));
+        &source_boot_uuid, &length, &(read_data[0]));
 
     if (race_reads) {
       if (read_result == LocklessQueueReader::Result::NOTHING_NEW) {
@@ -302,7 +302,7 @@
 
     EXPECT_EQ(monotonic_remote_time, aos::monotonic_clock::min_time);
     EXPECT_EQ(realtime_remote_time, aos::realtime_clock::min_time);
-    EXPECT_EQ(remote_boot_uuid, UUID::Zero());
+    EXPECT_EQ(source_boot_uuid, UUID::Zero());
 
     ThreadPlusCount tpc;
     ASSERT_EQ(length, sizeof(ThreadPlusCount));
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index 08350a8..1044b2f 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -186,24 +186,26 @@
 }
 
 void SctpClientConnection::SendConnect() {
+  VLOG(1) << "Sending Connect";
   // Try to send the connect message.  If that fails, retry.
-  if (!client_.Send(kConnectStream(),
-                    std::string_view(reinterpret_cast<const char *>(
-                                         connect_message_.span().data()),
-                                     connect_message_.span().size()),
-                    0)) {
+  if (client_.Send(kConnectStream(),
+                   std::string_view(reinterpret_cast<const char *>(
+                                        connect_message_.span().data()),
+                                    connect_message_.span().size()),
+                   0)) {
+    ScheduleConnectTimeout();
+  } else {
     NodeDisconnected();
   }
 }
 
 void SctpClientConnection::NodeConnected(sctp_assoc_t assoc_id) {
-  connect_timer_->Disable();
+  ScheduleConnectTimeout();
 
   // We want to tell the kernel to schedule the packets on this new stream with
   // the priority scheduler.  This only needs to be done once per stream.
   client_.SetPriorityScheduler(assoc_id);
 
-  remote_assoc_id_ = assoc_id;
   connection_->mutate_state(State::CONNECTED);
   client_status_->SampleReset(client_index_);
 }
@@ -212,13 +214,14 @@
   connect_timer_->Setup(
       event_loop_->monotonic_now() + chrono::milliseconds(100),
       chrono::milliseconds(100));
-  remote_assoc_id_ = 0;
   connection_->mutate_state(State::DISCONNECTED);
   connection_->mutate_monotonic_offset(0);
   client_status_->SampleReset(client_index_);
 }
 
 void SctpClientConnection::HandleData(const Message *message) {
+  ScheduleConnectTimeout();
+
   const RemoteData *remote_data =
       flatbuffers::GetSizePrefixedRoot<RemoteData>(message->data());
 
diff --git a/aos/network/message_bridge_client_lib.h b/aos/network/message_bridge_client_lib.h
index 2b48906..0552f79 100644
--- a/aos/network/message_bridge_client_lib.h
+++ b/aos/network/message_bridge_client_lib.h
@@ -53,6 +53,14 @@
   void NodeDisconnected();
   void HandleData(const Message *message);
 
+  // Schedules connect_timer_ for a ways in the future. If one of our messages
+  // gets dropped, the server might be waiting for this, so if we don't hear
+  // from the server for a while we'll try sending it again.
+  void ScheduleConnectTimeout() {
+    connect_timer_->Setup(event_loop_->context().monotonic_event_time +
+                          std::chrono::seconds(1));
+  }
+
   // Event loop to register the server on.
   aos::ShmEventLoop *const event_loop_;
 
@@ -81,10 +89,6 @@
   // Timer which fires to handle reconnections.
   aos::TimerHandler *connect_timer_;
 
-  // id of the server once known.  This is only valid if connection_ says
-  // connected.
-  sctp_assoc_t remote_assoc_id_ = 0;
-
   // ClientConnection statistics message to modify.  This will be published
   // periodicially.
   MessageBridgeClientStatus *client_status_;
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index ea75693..c929e31 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -40,7 +40,7 @@
                        context.size);
 
   flatbuffers::Offset<flatbuffers::Vector<uint8_t>> boot_uuid_offset =
-      context.remote_boot_uuid.PackVector(&fbb);
+      context.source_boot_uuid.PackVector(&fbb);
 
   RemoteData::Builder remote_data_builder(fbb);
   remote_data_builder.add_channel_index(channel_index_);
diff --git a/aos/network/message_bridge_server_status.cc b/aos/network/message_bridge_server_status.cc
index 48b442a..a662990 100644
--- a/aos/network/message_bridge_server_status.cc
+++ b/aos/network/message_bridge_server_status.cc
@@ -359,7 +359,7 @@
   context.realtime_event_time = timestamp_sender_.realtime_sent_time();
   context.queue_index = timestamp_sender_.sent_queue_index();
   context.size = timestamp_copy.span().size();
-  context.remote_boot_uuid = event_loop_->boot_uuid();
+  context.source_boot_uuid = event_loop_->boot_uuid();
   context.data = timestamp_copy.span().data();
 
   // Since we are building up the timestamp to send here, we need to trigger the
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index 1953dc2..8222760 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -160,7 +160,7 @@
     pi1_test_event_loop->MakeWatcher(
         "/pi2/aos", [this](const Timestamp &timestamp) {
           VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
-          EXPECT_EQ(pi1_test_event_loop->context().remote_boot_uuid,
+          EXPECT_EQ(pi1_test_event_loop->context().source_boot_uuid,
                     pi2_boot_uuid_);
         });
   }
@@ -275,7 +275,7 @@
     pi2_test_event_loop->MakeWatcher(
         "/pi1/aos", [this](const Timestamp &timestamp) {
           VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
-          EXPECT_EQ(pi2_test_event_loop->context().remote_boot_uuid,
+          EXPECT_EQ(pi2_test_event_loop->context().source_boot_uuid,
                     pi1_boot_uuid_);
         });
     pi2_test_event_loop->MakeWatcher(
@@ -405,7 +405,7 @@
   int pong_count = 0;
   pong_event_loop.MakeWatcher("/test", [&pong_count, &pong_event_loop,
                                         this](const examples::Ping &ping) {
-    EXPECT_EQ(pong_event_loop.context().remote_boot_uuid, pi1_boot_uuid_);
+    EXPECT_EQ(pong_event_loop.context().source_boot_uuid, pi1_boot_uuid_);
     ++pong_count;
     VLOG(1) << "Got ping back " << FlatbufferToJson(&ping);
   });
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index a43fdfd..ebb7a19 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -353,11 +353,26 @@
 
   CHECK(!times_.empty())
       << ": Found no times to do timestamp estimation, please investigate.";
+}
+
+void InterpolatedTimeConverter::ObserveTimePassed(
+    distributed_clock::time_point time) {
   // Keep at least 500 points and time_estimation_buffer_seconds seconds of
   // time.  This should be enough to handle any reasonable amount of history.
-  while (times_.size() > kHistoryMinCount &&
-         std::get<0>(times_.front()) + time_estimation_buffer_seconds_ <
-             std::get<0>(times_.back())) {
+  while (true) {
+    if (times_.size() < kHistoryMinCount) {
+      return;
+    }
+    if (std::get<0>(times_[1]) + time_estimation_buffer_seconds_ > time) {
+      VLOG(1) << "Not popping because "
+              << std::get<0>(times_[1]) + time_estimation_buffer_seconds_
+              << " > " << time;
+      return;
+    }
+
+    VLOG(1) << "Popping sample because " << times_.size() << " > "
+            << kHistoryMinCount << " && " << std::get<0>(times_[1]) << " < "
+            << time - time_estimation_buffer_seconds_;
     times_.pop_front();
     have_popped_ = true;
   }
diff --git a/aos/network/multinode_timestamp_filter.h b/aos/network/multinode_timestamp_filter.h
index 929c837..96f7228 100644
--- a/aos/network/multinode_timestamp_filter.h
+++ b/aos/network/multinode_timestamp_filter.h
@@ -177,6 +177,9 @@
   monotonic_clock::time_point FromDistributedClock(
       size_t node_index, distributed_clock::time_point time) override;
 
+  // Called whenever time passes this point and we can forget about it.
+  void ObserveTimePassed(distributed_clock::time_point time) override;
+
  private:
   // Returns the next timestamp, or nullopt if there isn't one. It is assumed
   // that if there isn't one, there never will be one.
diff --git a/aos/network/multinode_timestamp_filter_test.cc b/aos/network/multinode_timestamp_filter_test.cc
index f1cf0aa..bdc1992 100644
--- a/aos/network/multinode_timestamp_filter_test.cc
+++ b/aos/network/multinode_timestamp_filter_test.cc
@@ -223,13 +223,8 @@
   EXPECT_EQ(me + chrono::milliseconds(10),
             time_converter.FromDistributedClock(0, de + kDt));
 
-  // Force 10.1 seconds now.  This will forget the 0th point at the origin.
-  EXPECT_EQ(
-      de + kDefaultHistoryDuration + kDt,
-      time_converter.ToDistributedClock(0, me + kDefaultHistoryDuration + kDt));
-  EXPECT_EQ(me + kDefaultHistoryDuration + kDt,
-            time_converter.FromDistributedClock(
-                0, de + kDefaultHistoryDuration + kDt));
+  // Now force ourselves to forget.
+  time_converter.ObserveTimePassed(de + kDefaultHistoryDuration + kDt * 3 / 2);
 
   // Yup, can't read the origin anymore.
   EXPECT_DEATH({ LOG(INFO) << time_converter.ToDistributedClock(0, me); },
diff --git a/aos/stl_mutex/stl_mutex.h b/aos/stl_mutex/stl_mutex.h
index 86a5988..e3a930e 100644
--- a/aos/stl_mutex/stl_mutex.h
+++ b/aos/stl_mutex/stl_mutex.h
@@ -61,6 +61,11 @@
   bool owner_died() const { return owner_died_; }
   void consistent() { owner_died_ = false; }
 
+  // Returns whether this mutex is locked by the current thread. This is very
+  // hard to use reliably, please think very carefully before using it for
+  // anything beyond probabilistic assertion checks.
+  bool is_locked() const { return mutex_islocked(&native_handle_); }
+
  private:
   aos_mutex native_handle_;
 
@@ -82,7 +87,7 @@
   constexpr stl_recursive_mutex() {}
 
   void lock() {
-    if (mutex_islocked(mutex_.native_handle())) {
+    if (mutex_.is_locked()) {
       CHECK(!owner_died());
       ++recursive_locks_;
     } else {
@@ -95,7 +100,7 @@
     }
   }
   bool try_lock() {
-    if (mutex_islocked(mutex_.native_handle())) {
+    if (mutex_.is_locked()) {
       CHECK(!owner_died());
       ++recursive_locks_;
       return true;
diff --git a/frc971/control_loops/drivetrain/drivetrain_test_lib.cc b/frc971/control_loops/drivetrain/drivetrain_test_lib.cc
index fb685f3..7680688 100644
--- a/frc971/control_loops/drivetrain/drivetrain_test_lib.cc
+++ b/frc971/control_loops/drivetrain/drivetrain_test_lib.cc
@@ -122,7 +122,6 @@
                   dt_config_.make_hybrid_drivetrain_velocity_loop()))) {
   Reinitialize();
   last_U_.setZero();
-
   event_loop_->AddPhasedLoop(
       [this](int) {
         // Skip this the first time.
@@ -144,10 +143,11 @@
         first_ = false;
         SendPositionMessage();
         SendTruthMessage();
+        SendImuMessage();
       },
       dt_config_.dt);
-
-  event_loop_->AddPhasedLoop([this](int) { SendImuMessage(); },
+  // TODO(milind): We should be able to read the IMU at 1 kHz instead of 2 kHz.
+  event_loop_->AddPhasedLoop([this](int) { ReadImu(); },
                              std::chrono::microseconds(500));
 }
 
@@ -188,54 +188,78 @@
   }
 }
 
-void DrivetrainSimulation::SendImuMessage() {
+void DrivetrainSimulation::ReadImu() {
+  // Don't accumulate readings when we aren't sending them.
   if (!send_messages_) {
     return;
   }
-  auto builder = imu_sender_.MakeBuilder();
 
-  frc971::ADIS16470DiagStat::Builder diag_stat_builder =
-      builder.MakeBuilder<frc971::ADIS16470DiagStat>();
-  diag_stat_builder.add_clock_error(false);
-  diag_stat_builder.add_memory_failure(imu_faulted_);
-  diag_stat_builder.add_sensor_failure(false);
-  diag_stat_builder.add_standby_mode(false);
-  diag_stat_builder.add_spi_communication_error(false);
-  diag_stat_builder.add_flash_memory_update_error(false);
-  diag_stat_builder.add_data_path_overrun(false);
-
-  const auto diag_stat_offset = diag_stat_builder.Finish();
-
-  frc971::IMUValues::Builder imu_builder =
-      builder.MakeBuilder<frc971::IMUValues>();
-  imu_builder.add_self_test_diag_stat(diag_stat_offset);
   const Eigen::Vector3d gyro =
       dt_config_.imu_transform.inverse() *
       Eigen::Vector3d(0.0, 0.0,
                       (drivetrain_plant_.X(3, 0) - drivetrain_plant_.X(1, 0)) /
                           (dt_config_.robot_radius * 2.0));
-  imu_builder.add_gyro_x(gyro.x());
-  imu_builder.add_gyro_y(gyro.y());
-  imu_builder.add_gyro_z(gyro.z());
+
   // Acceleration due to gravity, in m/s/s.
   constexpr double kG = 9.807;
   const Eigen::Vector3d accel =
       dt_config_.imu_transform.inverse() *
       Eigen::Vector3d(last_acceleration_.x() / kG, last_acceleration_.y() / kG,
                       1.0);
-  imu_builder.add_accelerometer_x(accel.x());
-  imu_builder.add_accelerometer_y(accel.y());
-  imu_builder.add_accelerometer_z(accel.z());
-  imu_builder.add_monotonic_timestamp_ns(
+  const int64_t timestamp =
       std::chrono::duration_cast<std::chrono::nanoseconds>(
           event_loop_->monotonic_now().time_since_epoch())
-          .count());
-  flatbuffers::Offset<frc971::IMUValues> imu_values_offsets =
-      imu_builder.Finish();
+          .count();
+  imu_readings_.push({.gyro = gyro,
+                      .accel = accel,
+                      .timestamp = timestamp,
+                      .faulted = imu_faulted_});
+}
+
+void DrivetrainSimulation::SendImuMessage() {
+  if (!send_messages_) {
+    return;
+  }
+
+  std::vector<flatbuffers::Offset<IMUValues>> imu_values;
+  auto builder = imu_sender_.MakeBuilder();
+
+  // Send all the IMU readings and pop the ones we have sent
+  while (!imu_readings_.empty()) {
+    const auto imu_reading = imu_readings_.front();
+    imu_readings_.pop();
+
+    frc971::ADIS16470DiagStat::Builder diag_stat_builder =
+        builder.MakeBuilder<frc971::ADIS16470DiagStat>();
+    diag_stat_builder.add_clock_error(false);
+    diag_stat_builder.add_memory_failure(imu_reading.faulted);
+    diag_stat_builder.add_sensor_failure(false);
+    diag_stat_builder.add_standby_mode(false);
+    diag_stat_builder.add_spi_communication_error(false);
+    diag_stat_builder.add_flash_memory_update_error(false);
+    diag_stat_builder.add_data_path_overrun(false);
+
+    const auto diag_stat_offset = diag_stat_builder.Finish();
+
+    frc971::IMUValues::Builder imu_builder =
+        builder.MakeBuilder<frc971::IMUValues>();
+    imu_builder.add_self_test_diag_stat(diag_stat_offset);
+
+    imu_builder.add_gyro_x(imu_reading.gyro.x());
+    imu_builder.add_gyro_y(imu_reading.gyro.y());
+    imu_builder.add_gyro_z(imu_reading.gyro.z());
+
+    imu_builder.add_accelerometer_x(imu_reading.accel.x());
+    imu_builder.add_accelerometer_y(imu_reading.accel.y());
+    imu_builder.add_accelerometer_z(imu_reading.accel.z());
+    imu_builder.add_monotonic_timestamp_ns(imu_reading.timestamp);
+
+    imu_values.push_back(imu_builder.Finish());
+  }
+
   flatbuffers::Offset<
       flatbuffers::Vector<flatbuffers::Offset<frc971::IMUValues>>>
-      imu_values_offset = builder.fbb()->CreateVector(&imu_values_offsets, 1);
-
+      imu_values_offset = builder.fbb()->CreateVector(imu_values);
   frc971::IMUValuesBatch::Builder imu_values_batch_builder =
       builder.MakeBuilder<frc971::IMUValuesBatch>();
   imu_values_batch_builder.add_readings(imu_values_offset);
diff --git a/frc971/control_loops/drivetrain/drivetrain_test_lib.h b/frc971/control_loops/drivetrain/drivetrain_test_lib.h
index 2075e66..b98711b 100644
--- a/frc971/control_loops/drivetrain/drivetrain_test_lib.h
+++ b/frc971/control_loops/drivetrain/drivetrain_test_lib.h
@@ -1,6 +1,9 @@
 #ifndef FRC971_CONTROL_LOOPS_DRIVETRAIN_DRIVETRAIN_TEST_LIB_H_
 #define FRC971_CONTROL_LOOPS_DRIVETRAIN_DRIVETRAIN_TEST_LIB_H_
 
+#include <queue>
+#include <vector>
+
 #include "aos/events/event_loop.h"
 #include "frc971/control_loops/control_loops_generated.h"
 #include "frc971/control_loops/drivetrain/drivetrain_config.h"
@@ -76,16 +79,27 @@
   // Set whether we should send out the drivetrain Position and IMU messages
   // (this will keep sending the "truth" message).
   void set_send_messages(const bool send_messages) {
+    if (!send_messages && !imu_readings_.empty()) {
+      // Flush current IMU readings
+      SendImuMessage();
+    }
     send_messages_ = send_messages;
   }
 
-  void set_imu_faulted(const bool fault_imu) {
-    imu_faulted_ = fault_imu;
-  }
+  void set_imu_faulted(const bool fault_imu) { imu_faulted_ = fault_imu; }
 
  private:
+  struct ImuReading {
+    Eigen::Vector3d gyro;
+    Eigen::Vector3d accel;
+    int64_t timestamp;
+    bool faulted;
+  };
+
   // Sends out the position queue messages.
   void SendPositionMessage();
+  // Reads and stores the IMU state
+  void ReadImu();
   // Sends out the IMU messages.
   void SendImuMessage();
   // Sends out the "truth" status message.
@@ -109,6 +123,8 @@
 
   bool imu_faulted_ = false;
 
+  std::queue<ImuReading> imu_readings_;
+
   DrivetrainConfig<double> dt_config_;
 
   DrivetrainPlant drivetrain_plant_;
diff --git a/y2021_bot3/control_loops/python/drivetrain.py b/y2021_bot3/control_loops/python/drivetrain.py
index e5b001a..fdd08c2 100644
--- a/y2021_bot3/control_loops/python/drivetrain.py
+++ b/y2021_bot3/control_loops/python/drivetrain.py
@@ -16,10 +16,11 @@
     J=6.0,
     mass=58.0,
     # TODO(austin): Measure radius a bit better.
-    robot_radius=0.7 / 2.0,
-    wheel_radius=6.0 * 0.0254 / 2.0,
+    robot_radius= 0.39,
+    wheel_radius= 3/39.37,
     motor_type=control_loop.Falcon(),
-    G=(8.0 / 70.0) * (17.0 / 24.0),
+    num_motors = 3,
+    G=8.0 / 80.0,
     q_pos=0.24,
     q_vel=2.5,
     efficiency=0.80,