Sort wakeups and add send latency.

We don't want to rely on epoll to trigger wakeups in order.  Timers will
likely wake up before watchers due to fewer kernel delays, etc.  So,
maintain a sorted list of events and always trigger the earliest one
first.  This also merges wakeups nicely.

This gives us the infrastructure to handle the same problem in
simulation.  We can have perfect timers and senders with latency.  But
if a timer wakes up while something is being sent, we will move up the
wakeup and handle it in order.

Change-Id: I8675ec5221dd2603b4aa5b1a3729907a599813b4
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index f5feea9..e549d57 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -179,8 +179,6 @@
   }
 
   bool FetchNext() {
-    // TODO(austin): Write a test which starts with nothing in the queue,
-    // and then calls FetchNext() after something is sent.
     // TODO(austin): Get behind and make sure it dies both here and with
     // Fetch.
     ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
@@ -340,41 +338,41 @@
 class WatcherState : public aos::WatcherState {
  public:
   WatcherState(
-      EventLoop *event_loop, const Channel *channel,
+      ShmEventLoop *event_loop, const Channel *channel,
       std::function<void(const Context &context, const void *message)> fn)
       : aos::WatcherState(event_loop, channel, std::move(fn)),
+        event_loop_(event_loop),
+        event_(this),
         simple_shm_fetcher_(channel) {}
 
-  ~WatcherState() override {}
+  ~WatcherState() override { event_loop_->RemoveEvent(&event_); }
 
   void Startup(EventLoop *event_loop) override {
-    PointAtNextQueueIndex();
+    simple_shm_fetcher_.PointAtNextQueueIndex();
     CHECK(RegisterWakeup(event_loop->priority()));
   }
 
-  // Points the next message to fetch at the queue index which will be populated
-  // next.
-  void PointAtNextQueueIndex() { simple_shm_fetcher_.PointAtNextQueueIndex(); }
-
   // Returns true if there is new data available.
-  bool HasNewData() {
+  bool CheckForNewData() {
     if (!has_new_data_) {
       has_new_data_ = simple_shm_fetcher_.FetchNext();
+
+      if (has_new_data_) {
+        event_.set_event_time(
+            simple_shm_fetcher_.context().monotonic_sent_time);
+        event_loop_->AddEvent(&event_);
+      }
     }
 
     return has_new_data_;
   }
 
-  // Returns the time of the current data sample.
-  aos::monotonic_clock::time_point event_time() const {
-    return simple_shm_fetcher_.context().monotonic_sent_time;
-  }
-
   // Consumes the data by calling the callback.
-  void CallCallback() {
+  void HandleEvent() {
     CHECK(has_new_data_);
     DoCallCallback(monotonic_clock::now, simple_shm_fetcher_.context());
     has_new_data_ = false;
+    CheckForNewData();
   }
 
   // Registers us to receive a signal on event reception.
@@ -387,37 +385,68 @@
  private:
   bool has_new_data_ = false;
 
+  ShmEventLoop *event_loop_;
+  EventHandler<WatcherState> event_;
   SimpleShmFetcher simple_shm_fetcher_;
 };
 
 // Adapter class to adapt a timerfd to a TimerHandler.
-class TimerHandlerState : public TimerHandler {
+class TimerHandlerState final : public TimerHandler {
  public:
   TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
       : TimerHandler(shm_event_loop, std::move(fn)),
-        shm_event_loop_(shm_event_loop) {
-    shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
-      const uint64_t elapsed_cycles = timerfd_.Read();
-
-      Call(monotonic_clock::now, base_);
-
-      base_ += repeat_offset_ * elapsed_cycles;
-    });
+        shm_event_loop_(shm_event_loop),
+        event_(this) {
+    shm_event_loop_->epoll_.OnReadable(
+        timerfd_.fd(), [this]() { shm_event_loop_->HandleEvent(); });
   }
 
-  ~TimerHandlerState() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
+  ~TimerHandlerState() {
+    Disable();
+    shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
+  }
+
+  void HandleEvent() {
+    uint64_t elapsed_cycles = timerfd_.Read();
+    if (elapsed_cycles == 0u) {
+      // We got called before the timer interrupt could happen, but because we
+      // are checking the time, we got called on time.  Push the timer out by 1
+      // cycle.
+      elapsed_cycles = 1u;
+      timerfd_.SetTime(base_ + repeat_offset_, repeat_offset_);
+    }
+
+    Call(monotonic_clock::now, base_);
+
+    base_ += repeat_offset_ * elapsed_cycles;
+
+    if (repeat_offset_ != chrono::seconds(0)) {
+      event_.set_event_time(base_);
+      shm_event_loop_->AddEvent(&event_);
+    }
+  }
 
   void Setup(monotonic_clock::time_point base,
              monotonic_clock::duration repeat_offset) override {
+    if (event_.valid()) {
+      shm_event_loop_->RemoveEvent(&event_);
+    }
+
     timerfd_.SetTime(base, repeat_offset);
     base_ = base;
     repeat_offset_ = repeat_offset;
+    event_.set_event_time(base_);
+    shm_event_loop_->AddEvent(&event_);
   }
 
-  void Disable() override { timerfd_.Disable(); }
+  void Disable() override {
+    shm_event_loop_->RemoveEvent(&event_);
+    timerfd_.Disable();
+  }
 
  private:
   ShmEventLoop *shm_event_loop_;
+  EventHandler<TimerHandlerState> event_;
 
   TimerFd timerfd_;
 
@@ -426,33 +455,52 @@
 };
 
 // Adapter class to the timerfd and PhasedLoop.
-class PhasedLoopHandler : public ::aos::PhasedLoopHandler {
+class PhasedLoopHandler final : public ::aos::PhasedLoopHandler {
  public:
   PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
                     const monotonic_clock::duration interval,
                     const monotonic_clock::duration offset)
       : aos::PhasedLoopHandler(shm_event_loop, std::move(fn), interval, offset),
-        shm_event_loop_(shm_event_loop) {
-    shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
-      timerfd_.Read();
-      Call(monotonic_clock::now,
-           [this](monotonic_clock::time_point sleep_time) {
-             Schedule(sleep_time);
-           });
+        shm_event_loop_(shm_event_loop),
+        event_(this) {
+    shm_event_loop_->epoll_.OnReadable(
+        timerfd_.fd(), [this]() { shm_event_loop_->HandleEvent(); });
+  }
+
+  void HandleEvent() {
+    // The return value for read is the number of cycles that have elapsed.
+    // Because we check to see when this event *should* have happened, there are
+    // cases where Read() will return 0, when 1 cycle has actually happened.
+    // This occurs when the timer interrupt hasn't triggered yet.  Therefore,
+    // ignore it.  Call handles rescheduling and calculating elapsed cycles
+    // without any extra help.
+    timerfd_.Read();
+    event_.Invalidate();
+
+    Call(monotonic_clock::now, [this](monotonic_clock::time_point sleep_time) {
+      Schedule(sleep_time);
     });
   }
 
   ~PhasedLoopHandler() override {
     shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
+    shm_event_loop_->RemoveEvent(&event_);
   }
 
  private:
   // Reschedules the timer.
   void Schedule(monotonic_clock::time_point sleep_time) override {
+    if (event_.valid()) {
+      shm_event_loop_->RemoveEvent(&event_);
+    }
+
     timerfd_.SetTime(sleep_time, ::aos::monotonic_clock::zero());
+    event_.set_event_time(sleep_time);
+    shm_event_loop_->AddEvent(&event_);
   }
 
   ShmEventLoop *shm_event_loop_;
+  EventHandler<PhasedLoopHandler> event_;
 
   TimerFd timerfd_;
 };
@@ -497,33 +545,23 @@
   on_run_.push_back(::std::move(on_run));
 }
 
-void ShmEventLoop::HandleWatcherSignal() {
+void ShmEventLoop::HandleEvent() {
+  // Update all the times for handlers.
+  for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
+    internal::WatcherState *watcher =
+        reinterpret_cast<internal::WatcherState *>(base_watcher.get());
+
+    watcher->CheckForNewData();
+  }
+
   while (true) {
-    // Call the handlers in time order of their messages.
-    aos::monotonic_clock::time_point min_event_time =
-        aos::monotonic_clock::max_time;
-    size_t min_watcher_index = -1;
-    size_t watcher_index = 0;
-    for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
-      internal::WatcherState *watcher =
-          reinterpret_cast<internal::WatcherState *>(base_watcher.get());
-
-      if (watcher->HasNewData()) {
-        if (watcher->event_time() < min_event_time) {
-          min_watcher_index = watcher_index;
-          min_event_time = watcher->event_time();
-        }
-      }
-      ++watcher_index;
-    }
-
-    if (min_event_time == aos::monotonic_clock::max_time) {
+    if (EventCount() == 0 ||
+        PeekEvent()->event_time() > monotonic_clock::now()) {
       break;
     }
 
-    reinterpret_cast<internal::WatcherState *>(
-        watchers_[min_watcher_index].get())
-        ->CallCallback();
+    EventLoopEvent *event = PopEvent();
+    event->HandleEvent();
   }
 }
 
@@ -635,12 +673,14 @@
       // watchers, and calling the oldest thing first.  That will improve
       // determinism a lot.
 
-      HandleWatcherSignal();
+      HandleEvent();
     });
   }
 
   MaybeScheduleTimingReports();
 
+  ReserveEvents();
+
   // Now, all the callbacks are setup.  Lock everything into memory and go RT.
   if (priority_ != 0) {
     ::aos::InitRT();