Merge changes I8675ec52,I2d8fd5b6

* changes:
  Sort wakeups and add send latency.
  Make raw senders able to send flatbuffers
diff --git a/aos/controls/control_loop_test.h b/aos/controls/control_loop_test.h
index a0a844f..a2dbb7b 100644
--- a/aos/controls/control_loop_test.h
+++ b/aos/controls/control_loop_test.h
@@ -93,6 +93,10 @@
     return event_loop_factory_.MakeEventLoop(name);
   }
 
+  void set_send_delay(std::chrono::nanoseconds send_delay) {
+    event_loop_factory_.set_send_delay(send_delay);
+  }
+
   void RunFor(monotonic_clock::duration duration) {
     event_loop_factory_.RunFor(duration);
   }
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 6ffa526..dfad4ff 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -45,6 +45,7 @@
     name = "event_loop",
     srcs = [
         "event_loop.cc",
+        "event_loop_event.h",
         "event_loop_tmpl.h",
     ],
     hdrs = [
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index 01f6a3e..2c3a215 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -65,6 +65,7 @@
 
 EventLoop::~EventLoop() {
   CHECK_EQ(senders_.size(), 0u) << ": Not all senders destroyed";
+  CHECK_EQ(events_.size(), 0u) << ": Not all events unregistered";
 }
 
 int EventLoop::ChannelIndex(const Channel *channel) {
@@ -377,6 +378,39 @@
   }
 }
 
+void EventLoop::ReserveEvents() {
+  events_.reserve(timers_.size() + phased_loops_.size() + watchers_.size());
+}
+
+namespace {
+bool CompareEvents(const EventLoopEvent *first, const EventLoopEvent *second) {
+  return first->event_time() > second->event_time();
+}
+}  // namespace
+
+void EventLoop::AddEvent(EventLoopEvent *event) {
+  DCHECK(std::find(events_.begin(), events_.end(), event) == events_.end());
+  events_.push_back(event);
+  std::push_heap(events_.begin(), events_.end(), CompareEvents);
+}
+
+void EventLoop::RemoveEvent(EventLoopEvent *event) {
+  auto e = std::find(events_.begin(), events_.end(), event);
+  if (e != events_.end()) {
+    events_.erase(e);
+    std::make_heap(events_.begin(), events_.end(), CompareEvents);
+    event->Invalidate();
+  }
+}
+
+EventLoopEvent *EventLoop::PopEvent() {
+  EventLoopEvent *result = events_.front();
+  std::pop_heap(events_.begin(), events_.end(), CompareEvents);
+  events_.pop_back();
+  result->Invalidate();
+  return result;
+}
+
 void WatcherState::set_timing_report(timing::Watcher *watcher) {
   CHECK_NOTNULL(watcher);
   watcher_ = watcher;
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 16980dd..68b1b5e 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -7,6 +7,7 @@
 
 #include "aos/configuration.h"
 #include "aos/configuration_generated.h"
+#include "aos/events/event_loop_event.h"
 #include "aos/events/event_loop_generated.h"
 #include "aos/events/timing_statistics.h"
 #include "aos/flatbuffers.h"
@@ -424,9 +425,10 @@
   WatcherState *NewWatcher(std::unique_ptr<WatcherState> watcher);
 
   std::vector<RawSender *> senders_;
+  std::vector<RawFetcher *> fetchers_;
+
   std::vector<std::unique_ptr<TimerHandler>> timers_;
   std::vector<std::unique_ptr<PhasedLoopHandler>> phased_loops_;
-  std::vector<RawFetcher *> fetchers_;
   std::vector<std::unique_ptr<WatcherState>> watchers_;
 
   void SendTimingReport();
@@ -435,6 +437,19 @@
 
   std::unique_ptr<RawSender> timing_report_sender_;
 
+  // Tracks which event sources (timers and watchers) have data, and which
+  // don't.  Added events may not change their event_time().
+  // TODO(austin): Test case 1: timer triggers at t1, handler takes until after
+  // t2 to run, t2 should then be picked up without a context switch.
+  void AddEvent(EventLoopEvent *event);
+  void RemoveEvent(EventLoopEvent *event);
+  size_t EventCount() const { return events_.size(); }
+  EventLoopEvent *PopEvent();
+  EventLoopEvent *PeekEvent() { return events_.front(); }
+  void ReserveEvents();
+
+  std::vector<EventLoopEvent *> events_;
+
  private:
   virtual pid_t GetTid() = 0;
 
diff --git a/aos/events/event_loop_event.h b/aos/events/event_loop_event.h
new file mode 100644
index 0000000..6438da1
--- /dev/null
+++ b/aos/events/event_loop_event.h
@@ -0,0 +1,46 @@
+#ifndef AOS_EVENTS_EVENT_LOOP_EVENT_H
+#define AOS_EVENTS_EVENT_LOOP_EVENT_H
+
+#include "aos/time/time.h"
+#include "glog/logging.h"
+
+namespace aos {
+
+// Common interface to track when callbacks and timers should have happened.
+class EventLoopEvent {
+ public:
+  virtual ~EventLoopEvent() {}
+
+  bool valid() const { return event_time_ != monotonic_clock::max_time; }
+  void Invalidate() { event_time_ = monotonic_clock::max_time; }
+
+  monotonic_clock::time_point event_time() const {
+    DCHECK(valid());
+    return event_time_;
+  }
+
+  virtual void HandleEvent() = 0;
+
+  void set_event_time(monotonic_clock::time_point event_time) {
+    event_time_ = event_time;
+  }
+
+ private:
+  monotonic_clock::time_point event_time_ = monotonic_clock::max_time;
+};
+
+// Adapter class to implement EventLoopEvent by calling HandleEvent on T.
+template <typename T>
+class EventHandler final : public EventLoopEvent {
+ public:
+  EventHandler(T *t) : t_(t) {}
+  ~EventHandler() = default;
+  void HandleEvent() override { t_->HandleEvent(); }
+
+ private:
+  T *const t_;
+};
+
+}  // namespace aos
+
+#endif  // AOS_EVENTS_EVENT_LOOP_EVENT_H
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 834ca16..cf3b6df 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -1099,5 +1099,51 @@
   EXPECT_EQ(primary_report.message().fetchers()->Get(1)->count(), 10);
 }
 
+// Tests that a raw watcher and raw fetcher can receive messages from a raw
+// sender without messing up offsets.
+TEST_P(AbstractEventLoopTest, RawBasic) {
+  auto loop1 = Make();
+  auto loop2 = MakePrimary();
+  auto loop3 = Make();
+
+  const std::string kData("971 is the best");
+
+  std::unique_ptr<aos::RawSender> sender =
+      loop1->MakeRawSender(loop1->configuration()->channels()->Get(1));
+
+  std::unique_ptr<aos::RawFetcher> fetcher =
+      loop3->MakeRawFetcher(loop3->configuration()->channels()->Get(1));
+
+  loop2->OnRun(
+      [&]() { EXPECT_TRUE(sender->Send(kData.data(), kData.size())); });
+
+  bool happened = false;
+  loop2->MakeRawWatcher(
+      loop2->configuration()->channels()->Get(1),
+      [this, &kData, &fetcher, &happened](const Context &context,
+                                          const void *message) {
+        happened = true;
+        EXPECT_EQ(std::string_view(kData),
+                  std::string_view(reinterpret_cast<const char *>(message),
+                                   context.size));
+        EXPECT_EQ(std::string_view(kData),
+                  std::string_view(reinterpret_cast<const char *>(context.data),
+                                   context.size));
+
+        ASSERT_TRUE(fetcher->Fetch());
+
+        EXPECT_EQ(std::string_view(kData),
+                  std::string_view(
+                      reinterpret_cast<const char *>(fetcher->context().data),
+                      fetcher->context().size));
+
+        this->Exit();
+      });
+
+  EXPECT_FALSE(happened);
+  Run();
+  EXPECT_TRUE(happened);
+}
+
 }  // namespace testing
 }  // namespace aos
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index b69f28a..f4cf2ec 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -90,8 +90,7 @@
   // Ends the given event loop at the given time from now.
   void EndEventLoop(EventLoop *loop, ::std::chrono::milliseconds duration) {
     auto end_timer = loop->AddTimer([this]() { this->Exit(); });
-    end_timer->Setup(loop->monotonic_now() +
-                     ::std::chrono::milliseconds(duration));
+    end_timer->Setup(loop->monotonic_now() + duration);
     end_timer->set_name("end");
   }
 
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index f5feea9..e549d57 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -179,8 +179,6 @@
   }
 
   bool FetchNext() {
-    // TODO(austin): Write a test which starts with nothing in the queue,
-    // and then calls FetchNext() after something is sent.
     // TODO(austin): Get behind and make sure it dies both here and with
     // Fetch.
     ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
@@ -340,41 +338,41 @@
 class WatcherState : public aos::WatcherState {
  public:
   WatcherState(
-      EventLoop *event_loop, const Channel *channel,
+      ShmEventLoop *event_loop, const Channel *channel,
       std::function<void(const Context &context, const void *message)> fn)
       : aos::WatcherState(event_loop, channel, std::move(fn)),
+        event_loop_(event_loop),
+        event_(this),
         simple_shm_fetcher_(channel) {}
 
-  ~WatcherState() override {}
+  ~WatcherState() override { event_loop_->RemoveEvent(&event_); }
 
   void Startup(EventLoop *event_loop) override {
-    PointAtNextQueueIndex();
+    simple_shm_fetcher_.PointAtNextQueueIndex();
     CHECK(RegisterWakeup(event_loop->priority()));
   }
 
-  // Points the next message to fetch at the queue index which will be populated
-  // next.
-  void PointAtNextQueueIndex() { simple_shm_fetcher_.PointAtNextQueueIndex(); }
-
   // Returns true if there is new data available.
-  bool HasNewData() {
+  bool CheckForNewData() {
     if (!has_new_data_) {
       has_new_data_ = simple_shm_fetcher_.FetchNext();
+
+      if (has_new_data_) {
+        event_.set_event_time(
+            simple_shm_fetcher_.context().monotonic_sent_time);
+        event_loop_->AddEvent(&event_);
+      }
     }
 
     return has_new_data_;
   }
 
-  // Returns the time of the current data sample.
-  aos::monotonic_clock::time_point event_time() const {
-    return simple_shm_fetcher_.context().monotonic_sent_time;
-  }
-
   // Consumes the data by calling the callback.
-  void CallCallback() {
+  void HandleEvent() {
     CHECK(has_new_data_);
     DoCallCallback(monotonic_clock::now, simple_shm_fetcher_.context());
     has_new_data_ = false;
+    CheckForNewData();
   }
 
   // Registers us to receive a signal on event reception.
@@ -387,37 +385,68 @@
  private:
   bool has_new_data_ = false;
 
+  ShmEventLoop *event_loop_;
+  EventHandler<WatcherState> event_;
   SimpleShmFetcher simple_shm_fetcher_;
 };
 
 // Adapter class to adapt a timerfd to a TimerHandler.
-class TimerHandlerState : public TimerHandler {
+class TimerHandlerState final : public TimerHandler {
  public:
   TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
       : TimerHandler(shm_event_loop, std::move(fn)),
-        shm_event_loop_(shm_event_loop) {
-    shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
-      const uint64_t elapsed_cycles = timerfd_.Read();
-
-      Call(monotonic_clock::now, base_);
-
-      base_ += repeat_offset_ * elapsed_cycles;
-    });
+        shm_event_loop_(shm_event_loop),
+        event_(this) {
+    shm_event_loop_->epoll_.OnReadable(
+        timerfd_.fd(), [this]() { shm_event_loop_->HandleEvent(); });
   }
 
-  ~TimerHandlerState() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
+  ~TimerHandlerState() {
+    Disable();
+    shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
+  }
+
+  void HandleEvent() {
+    uint64_t elapsed_cycles = timerfd_.Read();
+    if (elapsed_cycles == 0u) {
+      // We got called before the timer interrupt could happen, but because we
+      // are checking the time, we got called on time.  Push the timer out by 1
+      // cycle.
+      elapsed_cycles = 1u;
+      timerfd_.SetTime(base_ + repeat_offset_, repeat_offset_);
+    }
+
+    Call(monotonic_clock::now, base_);
+
+    base_ += repeat_offset_ * elapsed_cycles;
+
+    if (repeat_offset_ != chrono::seconds(0)) {
+      event_.set_event_time(base_);
+      shm_event_loop_->AddEvent(&event_);
+    }
+  }
 
   void Setup(monotonic_clock::time_point base,
              monotonic_clock::duration repeat_offset) override {
+    if (event_.valid()) {
+      shm_event_loop_->RemoveEvent(&event_);
+    }
+
     timerfd_.SetTime(base, repeat_offset);
     base_ = base;
     repeat_offset_ = repeat_offset;
+    event_.set_event_time(base_);
+    shm_event_loop_->AddEvent(&event_);
   }
 
-  void Disable() override { timerfd_.Disable(); }
+  void Disable() override {
+    shm_event_loop_->RemoveEvent(&event_);
+    timerfd_.Disable();
+  }
 
  private:
   ShmEventLoop *shm_event_loop_;
+  EventHandler<TimerHandlerState> event_;
 
   TimerFd timerfd_;
 
@@ -426,33 +455,52 @@
 };
 
 // Adapter class to the timerfd and PhasedLoop.
-class PhasedLoopHandler : public ::aos::PhasedLoopHandler {
+class PhasedLoopHandler final : public ::aos::PhasedLoopHandler {
  public:
   PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
                     const monotonic_clock::duration interval,
                     const monotonic_clock::duration offset)
       : aos::PhasedLoopHandler(shm_event_loop, std::move(fn), interval, offset),
-        shm_event_loop_(shm_event_loop) {
-    shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
-      timerfd_.Read();
-      Call(monotonic_clock::now,
-           [this](monotonic_clock::time_point sleep_time) {
-             Schedule(sleep_time);
-           });
+        shm_event_loop_(shm_event_loop),
+        event_(this) {
+    shm_event_loop_->epoll_.OnReadable(
+        timerfd_.fd(), [this]() { shm_event_loop_->HandleEvent(); });
+  }
+
+  void HandleEvent() {
+    // The return value for read is the number of cycles that have elapsed.
+    // Because we check to see when this event *should* have happened, there are
+    // cases where Read() will return 0, when 1 cycle has actually happened.
+    // This occurs when the timer interrupt hasn't triggered yet.  Therefore,
+    // ignore it.  Call handles rescheduling and calculating elapsed cycles
+    // without any extra help.
+    timerfd_.Read();
+    event_.Invalidate();
+
+    Call(monotonic_clock::now, [this](monotonic_clock::time_point sleep_time) {
+      Schedule(sleep_time);
     });
   }
 
   ~PhasedLoopHandler() override {
     shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
+    shm_event_loop_->RemoveEvent(&event_);
   }
 
  private:
   // Reschedules the timer.
   void Schedule(monotonic_clock::time_point sleep_time) override {
+    if (event_.valid()) {
+      shm_event_loop_->RemoveEvent(&event_);
+    }
+
     timerfd_.SetTime(sleep_time, ::aos::monotonic_clock::zero());
+    event_.set_event_time(sleep_time);
+    shm_event_loop_->AddEvent(&event_);
   }
 
   ShmEventLoop *shm_event_loop_;
+  EventHandler<PhasedLoopHandler> event_;
 
   TimerFd timerfd_;
 };
@@ -497,33 +545,23 @@
   on_run_.push_back(::std::move(on_run));
 }
 
-void ShmEventLoop::HandleWatcherSignal() {
+void ShmEventLoop::HandleEvent() {
+  // Update all the times for handlers.
+  for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
+    internal::WatcherState *watcher =
+        reinterpret_cast<internal::WatcherState *>(base_watcher.get());
+
+    watcher->CheckForNewData();
+  }
+
   while (true) {
-    // Call the handlers in time order of their messages.
-    aos::monotonic_clock::time_point min_event_time =
-        aos::monotonic_clock::max_time;
-    size_t min_watcher_index = -1;
-    size_t watcher_index = 0;
-    for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
-      internal::WatcherState *watcher =
-          reinterpret_cast<internal::WatcherState *>(base_watcher.get());
-
-      if (watcher->HasNewData()) {
-        if (watcher->event_time() < min_event_time) {
-          min_watcher_index = watcher_index;
-          min_event_time = watcher->event_time();
-        }
-      }
-      ++watcher_index;
-    }
-
-    if (min_event_time == aos::monotonic_clock::max_time) {
+    if (EventCount() == 0 ||
+        PeekEvent()->event_time() > monotonic_clock::now()) {
       break;
     }
 
-    reinterpret_cast<internal::WatcherState *>(
-        watchers_[min_watcher_index].get())
-        ->CallCallback();
+    EventLoopEvent *event = PopEvent();
+    event->HandleEvent();
   }
 }
 
@@ -635,12 +673,14 @@
       // watchers, and calling the oldest thing first.  That will improve
       // determinism a lot.
 
-      HandleWatcherSignal();
+      HandleEvent();
     });
   }
 
   MaybeScheduleTimingReports();
 
+  ReserveEvents();
+
   // Now, all the callbacks are setup.  Lock everything into memory and go RT.
   if (priority_ != 0) {
     ::aos::InitRT();
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index 89a2721..5063186 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -77,7 +77,7 @@
   friend class internal::ShmSender;
   friend class internal::ShmFetcher;
 
-  void HandleWatcherSignal();
+  void HandleEvent();
 
   // Tracks that we can't have multiple watchers or a sender and a watcher (or
   // multiple senders) on a single queue (path).
@@ -94,6 +94,7 @@
   internal::EPoll epoll_;
 };
 
+
 }  // namespace aos
 
 #endif  // AOS_EVENTS_SHM_EVENT_LOOP_H_
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index a9dd3e5..29c9edd 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -29,41 +29,38 @@
   OveralignedChar actual_data[];
 };
 
+class SimulatedEventLoop;
 class SimulatedFetcher;
 class SimulatedChannel;
 
-class ShmWatcher : public WatcherState {
+class SimulatedWatcher : public WatcherState {
  public:
-  ShmWatcher(
-      EventLoop *event_loop, const Channel *channel,
-      std::function<void(const Context &context, const void *message)> fn)
-      : WatcherState(event_loop, channel, std::move(fn)),
-        event_loop_(event_loop) {}
+  SimulatedWatcher(
+      SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
+      const Channel *channel,
+      std::function<void(const Context &context, const void *message)> fn);
 
-  ~ShmWatcher() override;
-
-  void Call(const Context &context) {
-    const monotonic_clock::time_point monotonic_now =
-        event_loop_->monotonic_now();
-    DoCallCallback([monotonic_now]() { return monotonic_now; }, context);
-  }
+  ~SimulatedWatcher() override;
 
   void Startup(EventLoop * /*event_loop*/) override {}
 
-  void Schedule(EventScheduler *scheduler,
-                std::shared_ptr<SimulatedMessage> message) {
-    // TODO(austin): Track the token once we schedule in the future.
-    // TODO(austin): Schedule wakeup in the future so we don't have 0 latency.
-    scheduler->Schedule(scheduler->monotonic_now(),
-                        [this, message]() { Call(message->context); });
-  }
+  void Schedule(std::shared_ptr<SimulatedMessage> message);
+
+  void HandleEvent();
 
   void SetSimulatedChannel(SimulatedChannel *channel) {
     simulated_channel_ = channel;
   }
 
  private:
-  EventLoop *event_loop_;
+  void DoSchedule(monotonic_clock::time_point event_time);
+
+  ::std::deque<std::shared_ptr<SimulatedMessage>> msgs_;
+
+  SimulatedEventLoop *simulated_event_loop_;
+  EventHandler<SimulatedWatcher> event_;
+  EventScheduler *scheduler_;
+  EventScheduler::Token token_;
   SimulatedChannel *simulated_channel_ = nullptr;
 };
 
@@ -83,9 +80,9 @@
   ::std::unique_ptr<RawFetcher> MakeRawFetcher(EventLoop *event_loop);
 
   // Registers a watcher for the queue.
-  void MakeRawWatcher(ShmWatcher *watcher);
+  void MakeRawWatcher(SimulatedWatcher *watcher);
 
-  void RemoveWatcher(ShmWatcher *watcher) {
+  void RemoveWatcher(SimulatedWatcher *watcher) {
     watchers_.erase(std::find(watchers_.begin(), watchers_.end(), watcher));
   }
 
@@ -113,7 +110,7 @@
   const Channel *channel_;
 
   // List of all watchers.
-  ::std::vector<ShmWatcher *> watchers_;
+  ::std::vector<SimulatedWatcher *> watchers_;
 
   // List of all fetchers.
   ::std::vector<SimulatedFetcher *> fetchers_;
@@ -123,8 +120,6 @@
   ipc_lib::QueueIndex next_queue_index_;
 };
 
-ShmWatcher::~ShmWatcher() { simulated_channel_->RemoveWatcher(this); }
-
 namespace {
 
 // Creates a SimulatedMessage with size bytes of storage.
@@ -195,8 +190,6 @@
 };
 }  // namespace
 
-class SimulatedEventLoop;
-
 class SimulatedFetcher : public RawFetcher {
  public:
   explicit SimulatedFetcher(EventLoop *event_loop, SimulatedChannel *queue)
@@ -215,6 +208,8 @@
 
   std::pair<bool, monotonic_clock::time_point> DoFetch() override {
     if (msgs_.size() == 0) {
+      // TODO(austin): Can we just do this logic unconditionally?  It is a lot
+      // simpler.  And call clear, obviously.
       if (!msg_ && queue_->latest_message()) {
         SetMsg(queue_->latest_message());
         return std::make_pair(true, queue_->monotonic_now());
@@ -256,25 +251,22 @@
   explicit SimulatedTimerHandler(EventScheduler *scheduler,
                                  SimulatedEventLoop *simulated_event_loop,
                                  ::std::function<void()> fn);
-  ~SimulatedTimerHandler() {}
+  ~SimulatedTimerHandler() { Disable(); }
 
   void Setup(monotonic_clock::time_point base,
              monotonic_clock::duration repeat_offset) override;
 
   void HandleEvent();
 
-  void Disable() override {
-    if (token_ != scheduler_->InvalidToken()) {
-      scheduler_->Deschedule(token_);
-      token_ = scheduler_->InvalidToken();
-    }
-  }
+  void Disable() override;
 
   ::aos::monotonic_clock::time_point monotonic_now() const {
     return scheduler_->monotonic_now();
   }
 
  private:
+  SimulatedEventLoop *simulated_event_loop_;
+  EventHandler<SimulatedTimerHandler> event_;
   EventScheduler *scheduler_;
   EventScheduler::Token token_;
 
@@ -289,21 +281,15 @@
                              ::std::function<void(int)> fn,
                              const monotonic_clock::duration interval,
                              const monotonic_clock::duration offset);
-  ~SimulatedPhasedLoopHandler() {
-    if (token_ != scheduler_->InvalidToken()) {
-      scheduler_->Deschedule(token_);
-      token_ = scheduler_->InvalidToken();
-    }
-  }
+  ~SimulatedPhasedLoopHandler();
 
-  void HandleTimerWakeup();
+  void HandleEvent();
 
-  void Schedule(monotonic_clock::time_point sleep_time) override {
-    token_ = scheduler_->Schedule(sleep_time, [this]() { HandleTimerWakeup(); });
-  }
+  void Schedule(monotonic_clock::time_point sleep_time) override;
 
  private:
   SimulatedEventLoop *simulated_event_loop_;
+  EventHandler<SimulatedPhasedLoopHandler> event_;
 
   EventScheduler *scheduler_;
   EventScheduler::Token token_;
@@ -351,6 +337,11 @@
     }
   }
 
+  std::chrono::nanoseconds send_delay() const { return send_delay_; }
+  void set_send_delay(std::chrono::nanoseconds send_delay) {
+    send_delay_ = send_delay;
+  }
+
   ::aos::monotonic_clock::time_point monotonic_now() override {
     return scheduler_->monotonic_now();
   }
@@ -407,6 +398,19 @@
 
  private:
   friend class SimulatedTimerHandler;
+  friend class SimulatedPhasedLoopHandler;
+  friend class SimulatedWatcher;
+
+  void HandleEvent() {
+    while (true) {
+      if (EventCount() == 0 || PeekEvent()->event_time() > monotonic_now()) {
+        break;
+      }
+
+      EventLoopEvent *event = PopEvent();
+      event->HandleEvent();
+    }
+  }
 
   pid_t GetTid() override { return tid_; }
 
@@ -422,16 +426,28 @@
 
   bool has_setup_ = false;
 
+  std::chrono::nanoseconds send_delay_;
+
   const pid_t tid_;
 };
 
+void SimulatedEventLoopFactory::set_send_delay(
+    std::chrono::nanoseconds send_delay) {
+  send_delay_ = send_delay;
+  for (std::pair<EventLoop *, std::function<void(bool)>> &loop :
+       raw_event_loops_) {
+    reinterpret_cast<SimulatedEventLoop *>(loop.first)
+        ->set_send_delay(send_delay_);
+  }
+}
+
 void SimulatedEventLoop::MakeRawWatcher(
     const Channel *channel,
     std::function<void(const Context &channel, const void *message)> watcher) {
   ChannelIndex(channel);
   Take(channel);
-  std::unique_ptr<ShmWatcher> shm_watcher(
-      new ShmWatcher(this, channel, std::move(watcher)));
+  std::unique_ptr<SimulatedWatcher> shm_watcher(
+      new SimulatedWatcher(this, scheduler_, channel, std::move(watcher)));
 
   GetSimulatedChannel(channel)->MakeRawWatcher(shm_watcher.get());
   NewWatcher(std::move(shm_watcher));
@@ -463,7 +479,65 @@
   return it->second.get();
 }
 
-void SimulatedChannel::MakeRawWatcher(ShmWatcher *watcher) {
+SimulatedWatcher::SimulatedWatcher(
+    SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
+    const Channel *channel,
+    std::function<void(const Context &context, const void *message)> fn)
+    : WatcherState(simulated_event_loop, channel, std::move(fn)),
+      simulated_event_loop_(simulated_event_loop),
+      event_(this),
+      scheduler_(scheduler),
+      token_(scheduler_->InvalidToken()) {}
+
+SimulatedWatcher::~SimulatedWatcher() {
+  simulated_event_loop_->RemoveEvent(&event_);
+  if (token_ != scheduler_->InvalidToken()) {
+    scheduler_->Deschedule(token_);
+  }
+  simulated_channel_->RemoveWatcher(this);
+}
+
+void SimulatedWatcher::Schedule(std::shared_ptr<SimulatedMessage> message) {
+  monotonic_clock::time_point event_time = scheduler_->monotonic_now();
+
+  // Messages are queued in order.  If we are the first, add ourselves.
+  // Otherwise, don't.
+  if (msgs_.size() == 0) {
+    event_.set_event_time(message->context.monotonic_sent_time);
+    simulated_event_loop_->AddEvent(&event_);
+
+    DoSchedule(event_time);
+  }
+
+  msgs_.emplace_back(message);
+}
+
+void SimulatedWatcher::HandleEvent() {
+  CHECK_NE(msgs_.size(), 0u) << ": No events to handle.";
+
+  const monotonic_clock::time_point monotonic_now =
+      simulated_event_loop_->monotonic_now();
+  DoCallCallback([monotonic_now]() { return monotonic_now; },
+                 msgs_.front()->context);
+
+  msgs_.pop_front();
+  if (msgs_.size() != 0) {
+    event_.set_event_time(msgs_.front()->context.monotonic_sent_time);
+    simulated_event_loop_->AddEvent(&event_);
+
+    DoSchedule(event_.event_time());
+  } else {
+    token_ = scheduler_->InvalidToken();
+  }
+}
+
+void SimulatedWatcher::DoSchedule(monotonic_clock::time_point event_time) {
+  token_ =
+      scheduler_->Schedule(event_time + simulated_event_loop_->send_delay(),
+                           [this]() { simulated_event_loop_->HandleEvent(); });
+}
+
+void SimulatedChannel::MakeRawWatcher(SimulatedWatcher *watcher) {
   watcher->SetSimulatedChannel(this);
   watchers_.emplace_back(watcher);
 }
@@ -489,8 +563,8 @@
 
   latest_message_ = message;
   if (scheduler_->is_running()) {
-    for (ShmWatcher *watcher : watchers_) {
-      watcher->Schedule(scheduler_, message);
+    for (SimulatedWatcher *watcher : watchers_) {
+      watcher->Schedule(message);
     }
   }
   for (auto &fetcher : fetchers_) {
@@ -510,6 +584,8 @@
     EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
     ::std::function<void()> fn)
     : TimerHandler(simulated_event_loop, std::move(fn)),
+      simulated_event_loop_(simulated_event_loop),
+      event_(this),
       scheduler_(scheduler),
       token_(scheduler_->InvalidToken()) {}
 
@@ -521,10 +597,14 @@
   base_ = base;
   repeat_offset_ = repeat_offset;
   if (base < monotonic_now) {
-    token_ = scheduler_->Schedule(monotonic_now, [this]() { HandleEvent(); });
+    token_ = scheduler_->Schedule(
+        monotonic_now, [this]() { simulated_event_loop_->HandleEvent(); });
   } else {
-    token_ = scheduler_->Schedule(base, [this]() { HandleEvent(); });
+    token_ = scheduler_->Schedule(
+        base, [this]() { simulated_event_loop_->HandleEvent(); });
   }
+  event_.set_event_time(base_);
+  simulated_event_loop_->AddEvent(&event_);
 }
 
 void SimulatedTimerHandler::HandleEvent() {
@@ -533,7 +613,10 @@
   if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
     // Reschedule.
     while (base_ <= monotonic_now) base_ += repeat_offset_;
-    token_ = scheduler_->Schedule(base_, [this]() { HandleEvent(); });
+    token_ = scheduler_->Schedule(
+        base_, [this]() { simulated_event_loop_->HandleEvent(); });
+    event_.set_event_time(base_);
+    simulated_event_loop_->AddEvent(&event_);
   } else {
     token_ = scheduler_->InvalidToken();
   }
@@ -541,16 +624,33 @@
   Call([monotonic_now]() { return monotonic_now; }, monotonic_now);
 }
 
+void SimulatedTimerHandler::Disable() {
+  simulated_event_loop_->RemoveEvent(&event_);
+  if (token_ != scheduler_->InvalidToken()) {
+    scheduler_->Deschedule(token_);
+    token_ = scheduler_->InvalidToken();
+  }
+}
+
 SimulatedPhasedLoopHandler::SimulatedPhasedLoopHandler(
     EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
     ::std::function<void(int)> fn, const monotonic_clock::duration interval,
     const monotonic_clock::duration offset)
     : PhasedLoopHandler(simulated_event_loop, std::move(fn), interval, offset),
       simulated_event_loop_(simulated_event_loop),
+      event_(this),
       scheduler_(scheduler),
       token_(scheduler_->InvalidToken()) {}
 
-void SimulatedPhasedLoopHandler::HandleTimerWakeup() {
+SimulatedPhasedLoopHandler::~SimulatedPhasedLoopHandler() {
+  if (token_ != scheduler_->InvalidToken()) {
+    scheduler_->Deschedule(token_);
+    token_ = scheduler_->InvalidToken();
+  }
+  simulated_event_loop_->RemoveEvent(&event_);
+}
+
+void SimulatedPhasedLoopHandler::HandleEvent() {
   monotonic_clock::time_point monotonic_now =
       simulated_event_loop_->monotonic_now();
   Call(
@@ -558,6 +658,14 @@
       [this](monotonic_clock::time_point sleep_time) { Schedule(sleep_time); });
 }
 
+void SimulatedPhasedLoopHandler::Schedule(
+    monotonic_clock::time_point sleep_time) {
+  token_ = scheduler_->Schedule(
+      sleep_time, [this]() { simulated_event_loop_->HandleEvent(); });
+  event_.set_event_time(sleep_time);
+  simulated_event_loop_->AddEvent(&event_);
+}
+
 void SimulatedEventLoop::Take(const Channel *channel) {
   CHECK(!is_running()) << ": Cannot add new objects while running.";
 
@@ -575,10 +683,11 @@
     std::string_view name) {
   pid_t tid = tid_;
   ++tid_;
-  ::std::unique_ptr<EventLoop> result(new SimulatedEventLoop(
+  ::std::unique_ptr<SimulatedEventLoop> result(new SimulatedEventLoop(
       &scheduler_, &channels_, configuration_, &raw_event_loops_, tid));
   result->set_name(name);
-  return result;
+  result->set_send_delay(send_delay_);
+  return std::move(result);
 }
 
 void SimulatedEventLoopFactory::RunFor(monotonic_clock::duration duration) {
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 79f1e46..7b2b9c7 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -65,6 +65,9 @@
   // loop handler.
   void Exit() { scheduler_.Exit(); }
 
+  // Sets the simulated send delay for the factory.
+  void set_send_delay(std::chrono::nanoseconds send_delay);
+
   monotonic_clock::time_point monotonic_now() const {
     return scheduler_.monotonic_now();
   }
@@ -81,6 +84,8 @@
   std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
       raw_event_loops_;
 
+  std::chrono::nanoseconds send_delay_ = std::chrono::microseconds(50);
+
   pid_t tid_ = 0;
 };
 
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 16a1923..44be581 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -3,6 +3,7 @@
 #include <string_view>
 
 #include "aos/events/event_loop_param_test.h"
+#include "aos/events/test_message_generated.h"
 #include "gtest/gtest.h"
 
 namespace aos {
@@ -28,6 +29,10 @@
   // I'm not sure how much that matters.
   void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
 
+  void set_send_delay(std::chrono::nanoseconds send_delay) {
+    event_loop_factory_.set_send_delay(send_delay);
+  }
+
  private:
    SimulatedEventLoopFactory event_loop_factory_;
 };
@@ -114,5 +119,106 @@
   EXPECT_EQ(counter, 10);
 }
 
+// Tests that watchers have latency in simulation.
+TEST(SimulatedEventLoopTest, WatcherTimingReport) {
+  SimulatedEventLoopTestFactory factory;
+  factory.set_send_delay(std::chrono::microseconds(50));
+
+  FLAGS_timing_report_ms = 1000;
+  auto loop1 = factory.MakePrimary("primary");
+  loop1->MakeWatcher("/test", [](const TestMessage &) {});
+
+  auto loop2 = factory.Make("sender_loop");
+
+  auto loop3 = factory.Make("report_fetcher");
+
+  Fetcher<timing::Report> report_fetcher =
+      loop3->MakeFetcher<timing::Report>("/aos");
+  EXPECT_FALSE(report_fetcher.Fetch());
+
+  auto sender = loop2->MakeSender<TestMessage>("/test");
+
+  // Send 10 messages in the middle of a timing report period so we get
+  // something interesting back.
+  auto test_timer = loop2->AddTimer([&sender]() {
+    for (int i = 0; i < 10; ++i) {
+      aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+      builder.add_value(200 + i);
+      ASSERT_TRUE(msg.Send(builder.Finish()));
+    }
+  });
+
+  // Quit after 1 timing report, mid way through the next cycle.
+  {
+    auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
+    end_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(2500));
+    end_timer->set_name("end");
+  }
+
+  loop1->OnRun([&test_timer, &loop1]() {
+    test_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(1500));
+  });
+
+  factory.Run();
+
+  // And, since we are here, check that the timing report makes sense.
+  // Start by looking for our event loop's timing.
+  FlatbufferDetachedBuffer<timing::Report> primary_report =
+      FlatbufferDetachedBuffer<timing::Report>::Empty();
+  while (report_fetcher.FetchNext()) {
+    LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
+    if (report_fetcher->name()->string_view() == "primary") {
+      primary_report = CopyFlatBuffer(report_fetcher.get());
+    }
+  }
+
+  // Check the watcher report.
+  VLOG(1) << FlatbufferToJson(primary_report, true);
+
+  EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
+
+  // Just the timing report timer.
+  ASSERT_NE(primary_report.message().timers(), nullptr);
+  EXPECT_EQ(primary_report.message().timers()->size(), 2);
+
+  // No phased loops
+  ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
+
+  // And now confirm that the watcher received all 10 messages, and has latency.
+  ASSERT_NE(primary_report.message().watchers(), nullptr);
+  ASSERT_EQ(primary_report.message().watchers()->size(), 1);
+  EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
+  EXPECT_NEAR(
+      primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
+      0.00005, 1e-9);
+  EXPECT_NEAR(
+      primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
+      0.00005, 1e-9);
+  EXPECT_NEAR(
+      primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
+      0.00005, 1e-9);
+  EXPECT_EQ(primary_report.message()
+                .watchers()
+                ->Get(0)
+                ->wakeup_latency()
+                ->standard_deviation(),
+            0.0);
+
+  EXPECT_EQ(
+      primary_report.message().watchers()->Get(0)->handler_time()->average(),
+      0.0);
+  EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
+            0.0);
+  EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
+            0.0);
+  EXPECT_EQ(primary_report.message()
+                .watchers()
+                ->Get(0)
+                ->handler_time()
+                ->standard_deviation(),
+            0.0);
+}
+
 }  // namespace testing
 }  // namespace aos
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index a4539f8..0c87e4d 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -549,7 +549,10 @@
 
 void LocklessQueue::Sender::Send(const char *data, size_t length) {
   CHECK_LE(length, size());
-  memcpy(Data(), data, length);
+  // Flatbuffers write from the back of the buffer to the front.  If we are
+  // going to write an explicit chunk of memory into the buffer, we need to
+  // adhere to this convention and place it at the end.
+  memcpy((reinterpret_cast<char *>(Data()) + size() - length), data, length);
   Send(length);
 }
 
diff --git a/aos/ipc_lib/lockless_queue_death_test.cc b/aos/ipc_lib/lockless_queue_death_test.cc
index 6b3f7c5..e3d6c5e 100644
--- a/aos/ipc_lib/lockless_queue_death_test.cc
+++ b/aos/ipc_lib/lockless_queue_death_test.cc
@@ -595,8 +595,10 @@
             break;
           }
 
-          EXPECT_GT(read_data[6], last_data) << ": Got " << read_data;
-          last_data = read_data[6];
+          EXPECT_GT(read_data[queue.message_data_size() - length + 6],
+                    last_data)
+              << ": Got " << read_data;
+          last_data = read_data[queue.message_data_size() - length + 6];
 
           ++i;
         }
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index 7f59e96..350350c 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -282,7 +282,8 @@
 
     ThreadPlusCount tpc;
     ASSERT_EQ(length, sizeof(ThreadPlusCount));
-    memcpy(&tpc, read_data, sizeof(ThreadPlusCount));
+    memcpy(&tpc, read_data + queue.message_data_size() - length,
+           sizeof(ThreadPlusCount));
 
     if (will_wrap) {
       // The queue won't chang out from under us, so we should get some amount
diff --git a/frc971/control_loops/drivetrain/drivetrain_lib_test.cc b/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
index b2a533e..9fd79fc 100644
--- a/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
+++ b/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
@@ -53,6 +53,8 @@
         drivetrain_(dt_config_, drivetrain_event_loop_.get(), &localizer_),
         drivetrain_plant_event_loop_(MakeEventLoop("drivetrain_plant")),
         drivetrain_plant_(drivetrain_plant_event_loop_.get(), dt_config_) {
+    // Too many tests care...
+    set_send_delay(chrono::nanoseconds(0));
     set_battery_voltage(12.0);
   }
   virtual ~DrivetrainTest() {}
diff --git a/y2016/control_loops/shooter/shooter_lib_test.cc b/y2016/control_loops/shooter/shooter_lib_test.cc
index 6b3e2d6..c20526c 100644
--- a/y2016/control_loops/shooter/shooter_lib_test.cc
+++ b/y2016/control_loops/shooter/shooter_lib_test.cc
@@ -178,7 +178,7 @@
     EXPECT_TRUE(builder.Send(goal_builder.Finish()));
   }
 
-  RunFor(dt());
+  RunFor(dt() * 3 / 2);
 
   VerifyNearGoal();
 
diff --git a/y2019/control_loops/superstructure/superstructure_lib_test.cc b/y2019/control_loops/superstructure/superstructure_lib_test.cc
index f0e1a70..58bac76 100644
--- a/y2019/control_loops/superstructure/superstructure_lib_test.cc
+++ b/y2019/control_loops/superstructure/superstructure_lib_test.cc
@@ -445,7 +445,11 @@
       superstructure_status_fetcher_.Fetch();
       // 2 Seconds
       ASSERT_LE(i, 2 * 1.0 / .00505);
-    } while (!superstructure_status_fetcher_.get()->zeroed());
+
+      // Since there is a delay when sending running, make sure we have a status
+      // before checking it.
+    } while (superstructure_status_fetcher_.get() == nullptr ||
+             !superstructure_status_fetcher_.get()->zeroed());
   }
 
   ::std::unique_ptr<::aos::EventLoop> test_event_loop_;