Run all timers in the main thread with epoll

This eliminates at least one thread in every use case.

Change-Id: I8772a539c20cd10d80a57f3f53c48a3d8124461a
diff --git a/aos/events/shm-event-loop.cc b/aos/events/shm-event-loop.cc
index ee71b4c..08f9e1f 100644
--- a/aos/events/shm-event-loop.cc
+++ b/aos/events/shm-event-loop.cc
@@ -6,15 +6,19 @@
 #include <chrono>
 #include <stdexcept>
 
+#include "aos/events/epoll.h"
 #include "aos/init.h"
 #include "aos/logging/logging.h"
 #include "aos/queue.h"
 
 namespace aos {
 
-ShmEventLoop::ShmEventLoop() : thread_state_(std::make_shared<ThreadState>()) {}
+ShmEventLoop::ShmEventLoop() {}
 
 namespace {
+
+namespace chrono = ::std::chrono;
+
 class ShmFetcher : public RawFetcher {
  public:
   explicit ShmFetcher(RawQueue *queue) : queue_(queue) {
@@ -79,13 +83,13 @@
  public:
   explicit ShmSender(RawQueue *queue) : queue_(queue) {}
 
-  aos::Message *GetMessage() override {
-    return reinterpret_cast<aos::Message *>(queue_->GetMessage());
+  ::aos::Message *GetMessage() override {
+    return reinterpret_cast<::aos::Message *>(queue_->GetMessage());
   }
 
-  void Free(aos::Message *msg) override { queue_->FreeMessage(msg); }
+  void Free(::aos::Message *msg) override { queue_->FreeMessage(msg); }
 
-  bool Send(aos::Message *msg) override {
+  bool Send(::aos::Message *msg) override {
     assert(queue_ != nullptr);
     {
       // TODO(austin): This lets multiple senders reorder messages since time
@@ -102,18 +106,60 @@
  private:
   RawQueue *queue_;
 };
+
 }  // namespace
 
 namespace internal {
+
+// Class to manage the state for a Watcher.
 class WatcherThreadState {
  public:
-  WatcherThreadState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
-                     RawQueue *queue,
-                     std::function<void(const aos::Message *message)> watcher)
-      : thread_state_(std::move(thread_state)),
+  WatcherThreadState(
+      ShmEventLoop::ThreadState *thread_state, RawQueue *queue,
+      ::std::function<void(const ::aos::Message *message)> watcher)
+      : thread_state_(thread_state),
         queue_(queue),
         index_(0),
-        watcher_(std::move(watcher)) {
+        watcher_(::std::move(watcher)) {}
+
+  ~WatcherThreadState() {
+    // Only join the thread if it actually got started and is running.
+    if (running_) {
+      // TODO(austin): CHECK that we aren't RT here.
+
+      // Try joining.  If we fail, we weren't asleep on the condition in the
+      // queue.  So hit it again and again until that's true.
+      struct timespec end_time;
+      PCHECK(clock_gettime(CLOCK_REALTIME, &end_time) == 0);
+      while (true) {
+        void *retval = nullptr;
+        end_time.tv_nsec += 100000000;  // Push the deadline out by 100 ms.
+        if (end_time.tv_nsec >= 1000000000L) {  // Keep tv_nsec below 1 s.
+          end_time.tv_nsec -= 1000000000L;
+          ++end_time.tv_sec;
+        }
+        int ret = pthread_timedjoin_np(pthread_, &retval, &end_time);
+        if (ret == ETIMEDOUT) continue;
+        PCHECK(ret == 0);
+        break;
+      }
+    }
+  }
+
+  // Spawns the watcher pthread and blocks until Run() has set running_.
+  void Start() {
+    PCHECK(pthread_create(&pthread_, nullptr, &StaticRun, this) == 0);
+    IPCRecursiveMutexLocker locker(&thread_started_mutex_);
+    if (locker.owner_died()) ::aos::Die("Owner died");
+    while (!running_) {  // Set by Run() under thread_started_mutex_.
+      CHECK(!thread_started_condition_.Wait());
+    }
+  }
+
+  void GrabQueueIndex() {
+    // Right after we are signaled to start, point index to the current index
+    // so we don't read any messages received before now.  Otherwise we will
+    // get a significantly delayed read.
     static constexpr Options<RawQueue> kOptions =
         RawQueue::kFromEnd | RawQueue::kNonBlock;
     const void *msg = queue_->ReadMessageIndex(kOptions, &index_);
@@ -122,10 +168,31 @@
     }
   }
 
+ private:
+  // pthread entry point.  arg is the WatcherThreadState handed to
+  // pthread_create in Start(); this just forwards to its Run().
+  static void *StaticRun(void *arg) {
+    WatcherThreadState *watcher_thread_state =
+        static_cast<WatcherThreadState *>(arg);
+    watcher_thread_state->Run();
+    return nullptr;
+  }
+
+  // Runs the watcher callback on new messages.
   void Run() {
+    // Signal the main thread that we are now ready.
     thread_state_->MaybeSetCurrentThreadRealtimePriority();
+    {
+      IPCRecursiveMutexLocker locker(&thread_started_mutex_);
+      if (locker.owner_died()) ::aos::Die("Owner died");
+      running_ = true;
+      thread_started_condition_.Broadcast();
+    }
+
+    // Wait for the global start before handling events.
     thread_state_->WaitForStart();
 
+    // Bail immediately if we are supposed to stop.
     if (!thread_state_->is_running()) {
       ::aos::UnsetCurrentThreadRealtimePriority();
       return;
@@ -133,10 +200,18 @@
 
     const void *msg = nullptr;
     while (true) {
-      msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index_);
-      assert(msg != nullptr);
+      msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index_,
+                                     chrono::seconds(1));
+      // We hit a timeout.  Confirm that we should be running and retry.  Note,
+      // is_running is threadsafe (it's an atomic underneath).  Worst case, we
+      // check again in a second.
+      if (msg == nullptr) {
+        if (!thread_state_->is_running()) break;
+        continue;
+      }
 
       {
+        // Grab the lock so that only one callback can be called at a time.
         MutexLocker locker(&thread_state_->mutex_);
         if (!thread_state_->is_running()) break;
 
@@ -144,142 +219,155 @@
         // watcher_ may have exited the event loop.
         if (!thread_state_->is_running()) break;
       }
+      // Drop the reference.
       queue_->FreeMessage(msg);
     }
 
+    // And drop the last reference.
     queue_->FreeMessage(msg);
+    // Now that everything is cleaned up, drop RT priority before destroying the
+    // thread.
     ::aos::UnsetCurrentThreadRealtimePriority();
   }
-
- private:
-  std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
+  pthread_t pthread_;
+  ShmEventLoop::ThreadState *thread_state_;
   RawQueue *queue_;
   int32_t index_;
+  bool running_ = false;
 
-  std::function<void(const Message *message)> watcher_;
+  ::std::function<void(const Message *message)> watcher_;
+
+  // Mutex and condition variable used to wait until the thread is started
+  // before going RT.
+  ::aos::Mutex thread_started_mutex_;
+  ::aos::Condition thread_started_condition_{&thread_started_mutex_};
 };
 
+// Adapter class to adapt a timerfd to a TimerHandler.
 class TimerHandlerState : public TimerHandler {
  public:
-  TimerHandlerState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
-                    ::std::function<void()> fn)
-      : thread_state_(std::move(thread_state)), fn_(::std::move(fn)) {
-    fd_ = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
-    PCHECK(fd_ != -1);
+  TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
+      : shm_event_loop_(shm_event_loop), fn_(::std::move(fn)) {
+    shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
+      MutexLocker locker(&shm_event_loop_->thread_state_.mutex_);
+      timerfd_.Read();
+      fn_();
+    });
   }
 
-  ~TimerHandlerState() {
-    PCHECK(close(fd_) == 0);
-  }
+  ~TimerHandlerState() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
 
   void Setup(monotonic_clock::time_point base,
              monotonic_clock::duration repeat_offset) override {
-    struct itimerspec new_value;
-    new_value.it_interval = ::aos::time::to_timespec(repeat_offset);
-    new_value.it_value = ::aos::time::to_timespec(base);
-    PCHECK(timerfd_settime(fd_, TFD_TIMER_ABSTIME, &new_value, nullptr) == 0);
+    timerfd_.SetTime(base, repeat_offset);
   }
 
-  void Disable() override {
-    // Disarm the timer by feeding zero values
-    Setup(::aos::monotonic_clock::epoch(), ::aos::monotonic_clock::zero());
-  }
-
-  void Run() {
-    thread_state_->MaybeSetCurrentThreadRealtimePriority();
-    thread_state_->WaitForStart();
-
-    while (true) {
-      uint64_t buf;
-      ssize_t result = read(fd_, &buf, sizeof(buf));
-      PCHECK(result != -1);
-      CHECK_EQ(result, static_cast<int>(sizeof(buf)));
-
-      {
-        MutexLocker locker(&thread_state_->mutex_);
-        if (!thread_state_->is_running()) break;
-        fn_();
-        // fn_ may have exited the event loop.
-        if (!thread_state_->is_running()) break;
-      }
-    }
-    ::aos::UnsetCurrentThreadRealtimePriority();
-  }
+  void Disable() override { timerfd_.Disable(); }
 
  private:
-  std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
+  ShmEventLoop *shm_event_loop_;
 
-  // File descriptor for the timer
-  int fd_;
+  TimerFd timerfd_;
 
   // Function to be run on the thread
   ::std::function<void()> fn_;
 };
 }  // namespace internal
 
-std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
-    const std::string &path, const QueueTypeInfo &type) {
-  return std::unique_ptr<RawFetcher>(new ShmFetcher(
+::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
+    const ::std::string &path, const QueueTypeInfo &type) {
+  return ::std::unique_ptr<RawFetcher>(new ShmFetcher(
       RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
 }
 
-std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
-    const std::string &path, const QueueTypeInfo &type) {
+::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
+    const ::std::string &path, const QueueTypeInfo &type) {
   Take(path);
-  return std::unique_ptr<RawSender>(new ShmSender(
+  return ::std::unique_ptr<RawSender>(new ShmSender(
       RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
 }
 
 void ShmEventLoop::MakeRawWatcher(
-    const std::string &path, const QueueTypeInfo &type,
-    std::function<void(const Message *message)> watcher) {
+    const ::std::string &path, const QueueTypeInfo &type,
+    ::std::function<void(const Message *message)> watcher) {
   Take(path);
-  auto *state = new internal::WatcherThreadState(
-      thread_state_,
-      RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length),
-      std::move(watcher));
-
-  std::thread thread([state] {
-    state->Run();
-    delete state;
-  });
-  thread.detach();
+  ::std::unique_ptr<internal::WatcherThreadState> state(
+      new internal::WatcherThreadState(
+          &thread_state_, RawQueue::Fetch(path.c_str(), type.size, type.hash,
+                                          type.queue_length),
+          ::std::move(watcher)));
+  watchers_.push_back(::std::move(state));  // Thread is started by Run().
 }
 
 TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
-  internal::TimerHandlerState *timer =
-      new internal::TimerHandlerState(thread_state_, ::std::move(callback));
+  ::std::unique_ptr<internal::TimerHandlerState> timer(
+      new internal::TimerHandlerState(this, ::std::move(callback)));
 
-  ::std::thread t([timer] {
-    timer->Run();
-    delete timer;
-  });
-  t.detach();
+  timers_.push_back(::std::move(timer));
 
-  return timer;
+  return timers_.back().get();
 }
 
-void ShmEventLoop::OnRun(std::function<void()> on_run) {
-  on_run_.push_back(std::move(on_run));
+void ShmEventLoop::OnRun(::std::function<void()> on_run) {
+  on_run_.push_back(::std::move(on_run));
 }
 
 void ShmEventLoop::Run() {
-  thread_state_->MaybeSetCurrentThreadRealtimePriority();
+  // Start all the watcher threads.
+  for (::std::unique_ptr<internal::WatcherThreadState> &watcher : watchers_) {
+    watcher->Start();
+  }
+
+  // Now, all the threads are up.  Go RT.
+  thread_state_.MaybeSetCurrentThreadRealtimePriority();
   set_is_running(true);
-  for (const auto &run : on_run_) run();
-  // TODO(austin): epoll event loop in main thread (if needed), and async safe
-  // quit handler.
-  thread_state_->Run();
+
+  // Now that we are realtime (but before the OnRun handlers run), snap the
+  // queue index.
+  for (::std::unique_ptr<internal::WatcherThreadState> &watcher : watchers_) {
+    watcher->GrabQueueIndex();
+  }
+
+  // Now that we are RT, run all the OnRun handlers.
+  for (const auto &run : on_run_) {
+    run();
+  }
+  // TODO(austin): We don't need a separate watcher thread if there are only
+  // watchers and fetchers.  Could lazily create the epoll loop and pick a
+  // victim watcher to run in this thread.
+  // Trigger all the threads to start now.
+  thread_state_.Start();
+
+  // And start our main event loop which runs all the timers and handles Quit.
+  epoll_.Run();
+
+  // Once epoll exits, there is no useful nonrt work left to do.
+  set_is_running(false);
+
+  // Signal all the watcher threads to exit.  After this point, no more
+  // callbacks will be handled.
+  thread_state_.Exit();
+
+  // Nothing time or synchronization critical needs to happen after this point.
+  // Drop RT priority.
   ::aos::UnsetCurrentThreadRealtimePriority();
+
+  // The watcher threads get cleaned up in the destructor.
 }
 
-void ShmEventLoop::ThreadState::Run() {
+void ShmEventLoop::ThreadState::Start() {
   MutexLocker locker(&mutex_);
   loop_running_ = true;
   if (loop_finished_) ::aos::Die("Cannot restart an ShmEventLoop()");
   loop_running_cond_.Broadcast();
-  while (loop_running_) {
-    if (loop_running_cond_.Wait()) {
+}
+
+void ShmEventLoop::ThreadState::WaitForStart() {
+  MutexLocker locker(&mutex_);
+  while (!(loop_running_ || loop_finished_)) {
+    Condition::WaitResult wait_result =
+        loop_running_cond_.WaitTimed(chrono::milliseconds(1000));
+    if (wait_result == Condition::WaitResult::kOwnerDied) {
       ::aos::Die("ShmEventLoop mutex lock problem.\n");
     }
   }
@@ -291,19 +379,7 @@
   }
 }
 
-void ShmEventLoop::ThreadState::WaitForStart() {
-  MutexLocker locker(&mutex_);
-  while (!(loop_running_ || loop_finished_)) {
-    if (loop_running_cond_.Wait()) {
-      ::aos::Die("ShmEventLoop mutex lock problem.\n");
-    }
-  }
-}
-
-void ShmEventLoop::Exit() {
-  set_is_running(false);
-  thread_state_->Exit();
-}
+void ShmEventLoop::Exit() { epoll_.Quit(); }
 
 void ShmEventLoop::ThreadState::Exit() {
   IPCRecursiveMutexLocker locker(&mutex_);
@@ -319,7 +395,7 @@
   }
 }
 
-void ShmEventLoop::Take(const std::string &path) {
+void ShmEventLoop::Take(const ::std::string &path) {
   if (is_running()) {
     ::aos::Die("Cannot add new objects while running.\n");
   }