Run all timers in the main thread with epoll

This ditches 1+ threads in all use cases.

Change-Id: I8772a539c20cd10d80a57f3f53c48a3d8124461a
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 21e9677..7c2db52 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -1,4 +1,14 @@
 cc_library(
+    name = "epoll",
+    srcs = ["epoll.cc"],
+    hdrs = ["epoll.h"],
+    deps = [
+        "//aos/logging",
+        "//aos/time",
+    ],
+)
+
+cc_library(
     name = "raw-event-loop",
     hdrs = ["raw-event-loop.h"],
     deps = [
@@ -28,6 +38,7 @@
     hdrs = ["shm-event-loop.h"],
     visibility = ["//visibility:public"],
     deps = [
+        ":epoll",
         ":event-loop",
         "//aos:init",
         "//aos:queues",
@@ -38,6 +49,7 @@
 cc_test(
     name = "shm-event-loop_test",
     srcs = ["shm-event-loop_test.cc"],
+    shard_count = 5,
     deps = [
         ":event-loop_param_test",
         ":shm-event-loop",
diff --git a/aos/events/epoll.cc b/aos/events/epoll.cc
new file mode 100644
index 0000000..83f44c3
--- /dev/null
+++ b/aos/events/epoll.cc
@@ -0,0 +1,118 @@
+#include "aos/events/epoll.h"
+
+#include <fcntl.h>
+#include <sys/epoll.h>
+#include <sys/timerfd.h>
+#include <unistd.h>
+#include <atomic>
+#include <vector>
+
+#include "aos/logging/logging.h"
+#include "aos/time/time.h"
+
+namespace aos {
+namespace internal {
+
+TimerFd::TimerFd()
+    : fd_(timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK)) {
+  PCHECK(fd_ != -1);
+  Disable();
+}
+
+void TimerFd::SetTime(monotonic_clock::time_point start,
+                      monotonic_clock::duration interval) {
+  struct itimerspec new_value;
+  new_value.it_interval = ::aos::time::to_timespec(interval);
+  new_value.it_value = ::aos::time::to_timespec(start);
+  PCHECK(timerfd_settime(fd_, TFD_TIMER_ABSTIME, &new_value, nullptr) == 0);
+}
+
+void TimerFd::Read() {
+  uint64_t buf;
+  ssize_t result = read(fd_, &buf, sizeof(buf));
+  PCHECK(result != -1);
+  CHECK_EQ(result, static_cast<int>(sizeof(buf)));
+}
+
+EPoll::EPoll() : epoll_fd_(epoll_create1(EPOLL_CLOEXEC)) {
+  PCHECK(epoll_fd_ != -1);  // Only -1 signals failure; fd 0 is valid.
+
+  // Create a pipe for the Quit function.  Quit() only has to write() a byte,
+  // which keeps it async safe so it can be called from signal handlers.
+  int pipefd[2];
+  PCHECK(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK) == 0);
+  quit_epoll_fd_ = pipefd[0];
+  quit_signal_fd_ = pipefd[1];
+  // When the quit byte arrives, clear run_ so Run() drains and returns.
+  OnReadable(quit_epoll_fd_, [this]() {
+    run_ = false;
+    char buf[1];
+    PCHECK(read(quit_epoll_fd_, &buf[0], 1) == 1);
+  });
+}
+
+EPoll::~EPoll() {
+  // Clean up the quit pipe and epoll fd.
+  DeleteFd(quit_epoll_fd_);
+  close(quit_signal_fd_);
+  close(quit_epoll_fd_);
+  CHECK_EQ(fns_.size(), 0u);
+  close(epoll_fd_);
+}
+
+void EPoll::Run() {
+  while (true) {
+    // Pull a single event out.  Infinite timeout if we are supposed to be
+    // running, and 0 length timeout otherwise.  This lets us flush the event
+    // queue before quitting.
+    struct epoll_event event;
+    int num_events = epoll_wait(epoll_fd_, &event, 1, run_ ? -1 : 0);
+    // Retry on EINTR and nothing else.
+    if (num_events == -1) {
+      if (errno == EINTR) {
+        continue;
+      }
+      PCHECK(num_events != -1);
+    }
+    if (!run_) {
+      // If we ran out of events, quit.
+      if (num_events == 0) {
+        return;
+      }
+    }
+    EventData *event_data = static_cast<struct EventData *>(event.data.ptr);
+    if (event.events & (EPOLLIN | EPOLLPRI)) {
+      event_data->in_fn();
+    }
+  }
+}
+
+void EPoll::Quit() { PCHECK(write(quit_signal_fd_, "q", 1) == 1); }
+
+void EPoll::OnReadable(int fd, ::std::function<void()> function) {
+  ::std::unique_ptr<EventData> event_data(
+      new EventData{fd, ::std::move(function)});
+  // epoll hands this pointer back in Run(); fns_ keeps the object alive.
+  struct epoll_event event;
+  event.events = EPOLLIN | EPOLLPRI;
+  event.data.ptr = event_data.get();
+  PCHECK(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event) == 0);
+  fns_.push_back(::std::move(event_data));
+}
+
+// Unregisters fd from epoll and drops its callback.  Dies if fd is unknown.
+void EPoll::DeleteFd(int fd) {
+  auto element = fns_.begin();
+  PCHECK(epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr) == 0);
+  while (fns_.end() != element) {
+    if (element->get()->fd == fd) {
+      fns_.erase(element);
+      return;
+    }
+    ++element;
+  }
+  LOG(FATAL, "fd %d not found\n", fd);
+}
+
+}  // namespace internal
+}  // namespace aos
diff --git a/aos/events/epoll.h b/aos/events/epoll.h
new file mode 100644
index 0000000..29311c9
--- /dev/null
+++ b/aos/events/epoll.h
@@ -0,0 +1,95 @@
+#ifndef AOS_EVENTS_EPOLL_H_
+#define AOS_EVENTS_EPOLL_H_
+
+#include <fcntl.h>
+#include <sys/epoll.h>
+#include <sys/timerfd.h>
+#include <unistd.h>
+#include <atomic>
+#include <vector>
+
+#include "aos/logging/logging.h"
+#include "aos/time/time.h"
+
+namespace aos {
+namespace internal {
+
+// Owns a timerfd: created (disarmed) by the constructor, closed on destruction.
+class TimerFd {
+ public:
+  TimerFd();
+  ~TimerFd() { PCHECK(close(fd_) == 0); }
+
+  TimerFd(const TimerFd &) = delete;
+  TimerFd &operator=(const TimerFd &) = delete;
+  TimerFd(TimerFd &&) = delete;
+  TimerFd &operator=(TimerFd &&) = delete;
+
+  // Arms the timer to first expire at the absolute monotonic time start.
+  // It then repeats every interval; an interval of 0 means a single expiration.
+  void SetTime(monotonic_clock::time_point start,
+               monotonic_clock::duration interval);
+
+  // Disarms the timer.
+  void Disable() {
+    // Disarm the timer by feeding zero values for both start and interval.
+    SetTime(::aos::monotonic_clock::epoch(), ::aos::monotonic_clock::zero());
+  }
+
+  // Reads the pending 8-byte expiration count off the fd and discards it.
+  void Read();
+
+  // Returns the file descriptor associated with the timerfd.
+  int fd() { return fd_; }
+
+ private:
+  int fd_ = -1;
+};
+
+// Wraps an epoll fd and calls the registered callback when an fd is readable.
+class EPoll {
+ public:
+  EPoll();
+  ~EPoll();
+  EPoll(const EPoll &) = delete;
+  EPoll &operator=(const EPoll &) = delete;
+  EPoll(EPoll &&) = delete;
+  EPoll &operator=(EPoll &&) = delete;
+
+  // Dispatches callbacks until Quit() is called and pending events are drained.
+  void Run();
+
+  // Asks Run() to return by writing a byte to the quit pipe.  Async safe.
+  void Quit();
+
+  // Registers a function to be called if the fd becomes readable.
+  // There should only be 1 function registered for each fd.
+  void OnReadable(int fd, ::std::function<void()> function);
+
+  // Removes fd from the event loop.
+  // All fds must be removed before this class is destroyed (destructor CHECKs).
+  void DeleteFd(int fd);
+
+ private:
+  ::std::atomic<bool> run_{true};  // Cleared when the quit pipe is read.
+
+  // Main epoll fd.
+  int epoll_fd_;
+
+  // Structure whose pointer is stored in each epoll registration and handed
+  // back with its events, so Run() can call in_fn without a lookup.
+  struct EventData {
+    int fd;
+    ::std::function<void()> in_fn;
+  };
+  ::std::vector<::std::unique_ptr<EventData>> fns_;
+
+  // Quit pipe: quit_signal_fd_ is the write end, quit_epoll_fd_ the read end.
+  int quit_signal_fd_;
+  int quit_epoll_fd_;
+};
+
+}  // namespace internal
+}  // namespace aos
+
+#endif  // AOS_EVENTS_EPOLL_H_
diff --git a/aos/events/event-loop.h b/aos/events/event-loop.h
index 79651ef..1435723 100644
--- a/aos/events/event-loop.h
+++ b/aos/events/event-loop.h
@@ -29,8 +29,8 @@
 
  private:
   friend class EventLoop;
-  Fetcher(std::unique_ptr<RawFetcher> fetcher) : fetcher_(std::move(fetcher)) {}
-  std::unique_ptr<RawFetcher> fetcher_;
+  Fetcher(::std::unique_ptr<RawFetcher> fetcher) : fetcher_(::std::move(fetcher)) {}
+  ::std::unique_ptr<RawFetcher> fetcher_;
 };
 
 // Sends messages to a queue.
@@ -85,8 +85,8 @@
 
  private:
   friend class EventLoop;
-  Sender(std::unique_ptr<RawSender> sender) : sender_(std::move(sender)) {}
-  std::unique_ptr<RawSender> sender_;
+  Sender(::std::unique_ptr<RawSender> sender) : sender_(::std::move(sender)) {}
+  ::std::unique_ptr<RawSender> sender_;
 };
 
 // TODO(parker): Consider making EventLoop wrap a RawEventLoop rather than
@@ -105,14 +105,14 @@
   // Makes a class that will always fetch the most recent value
   // sent to path.
   template <typename T>
-  Fetcher<T> MakeFetcher(const std::string &path) {
+  Fetcher<T> MakeFetcher(const ::std::string &path) {
     return Fetcher<T>(MakeRawFetcher(path, QueueTypeInfo::Get<T>()));
   }
 
   // Makes class that allows constructing and sending messages to
   // address path.
   template <typename T>
-  Sender<T> MakeSender(const std::string &path) {
+  Sender<T> MakeSender(const ::std::string &path) {
     return Sender<T>(MakeRawSender(path, QueueTypeInfo::Get<T>()));
   }
 
@@ -123,11 +123,11 @@
   // Note that T needs to match both send and recv side.
   // TODO(parker): Need to support ::std::bind.  For now, use lambdas.
   template <typename Watch>
-  void MakeWatcher(const std::string &path, Watch &&w);
+  void MakeWatcher(const ::std::string &path, Watch &&w);
 
   // The passed in function will be called when the event loop starts.
   // Use this to run code once the thread goes into "real-time-mode",
-  virtual void OnRun(std::function<void()>) = 0;
+  virtual void OnRun(::std::function<void()>) = 0;
 
   // TODO(austin): OnExit
 
diff --git a/aos/events/event-loop_param_test.cc b/aos/events/event-loop_param_test.cc
index 5276aec..c74538c 100644
--- a/aos/events/event-loop_param_test.cc
+++ b/aos/events/event-loop_param_test.cc
@@ -26,10 +26,38 @@
                    ::std::chrono::milliseconds(duration));
 }
 
-// Tests that watcher and fetcher can fetch from a sender.
+// Tests that watcher can receive messages from a sender.
 // Also tests that OnRun() works.
 TEST_P(AbstractEventLoopTest, Basic) {
   auto loop1 = Make();
+  auto loop2 = MakePrimary();
+
+  auto sender = loop1->MakeSender<TestMessage>("/test");
+
+  bool happened = false;
+
+  loop2->OnRun([&]() {
+    happened = true;
+
+    auto msg = sender.MakeMessage();
+    msg->msg_value = 200;
+    msg.Send();
+  });
+
+  loop2->MakeWatcher("/test", [&](const TestMessage &message) {
+    EXPECT_EQ(message.msg_value, 200);
+    loop2->Exit();
+  });
+
+  EXPECT_FALSE(happened);
+  Run();
+  EXPECT_TRUE(happened);
+}
+
+// Tests that a fetcher can fetch from a sender.
+// Also tests that OnRun() works.
+TEST_P(AbstractEventLoopTest, FetchWithoutRun) {
+  auto loop1 = Make();
   auto loop2 = Make();
   auto loop3 = MakePrimary();
 
@@ -39,15 +67,6 @@
 
   EXPECT_FALSE(fetcher.Fetch());
 
-  bool happened = false;
-
-  loop3->OnRun([&]() { happened = true; });
-
-  loop3->MakeWatcher("/test", [&](const TestMessage &message) {
-    EXPECT_EQ(message.msg_value, 200);
-    loop3->Exit();
-  });
-
   auto msg = sender.MakeMessage();
   msg->msg_value = 200;
   msg.Send();
@@ -55,10 +74,6 @@
   EXPECT_TRUE(fetcher.Fetch());
   ASSERT_FALSE(fetcher.get() == nullptr);
   EXPECT_EQ(fetcher->msg_value, 200);
-
-  EXPECT_FALSE(happened);
-  Run();
-  EXPECT_TRUE(happened);
 }
 
 // Tests that watcher will receive all messages sent if they are sent after
@@ -78,16 +93,25 @@
     }
   });
 
+  // Before Run, should be ignored.
   {
     auto msg = sender.MakeMessage();
-    msg->msg_value = 200;
+    msg->msg_value = 199;
     msg.Send();
   }
-  {
-    auto msg = sender.MakeMessage();
-    msg->msg_value = 201;
-    msg.Send();
-  }
+
+  loop2->OnRun([&]() {
+    {
+      auto msg = sender.MakeMessage();
+      msg->msg_value = 200;
+      msg.Send();
+    }
+    {
+      auto msg = sender.MakeMessage();
+      msg->msg_value = 201;
+      msg.Send();
+    }
+  });
 
   Run();
 
@@ -361,7 +385,7 @@
 }
 
 // Verify that registering a watcher twice for "/test" fails.
-TEST_P(AbstractEventLoopTest, TwoWatcher) {
+TEST_P(AbstractEventLoopDeathTest, TwoWatcher) {
   auto loop = Make();
   loop->MakeWatcher("/test", [&](const TestMessage &) {});
   EXPECT_DEATH(loop->MakeWatcher("/test", [&](const TestMessage &) {}),
@@ -369,7 +393,7 @@
 }
 
 // Verify that SetRuntimeRealtimePriority fails while running.
-TEST_P(AbstractEventLoopTest, SetRuntimeRealtimePriority) {
+TEST_P(AbstractEventLoopDeathTest, SetRuntimeRealtimePriority) {
   auto loop = MakePrimary();
   // Confirm that runtime priority calls work when not realtime.
   loop->SetRuntimeRealtimePriority(5);
@@ -380,13 +404,33 @@
 }
 
 // Verify that registering a watcher and a sender for "/test" fails.
-TEST_P(AbstractEventLoopTest, WatcherAndSender) {
+TEST_P(AbstractEventLoopDeathTest, WatcherAndSender) {
   auto loop = Make();
   auto sender = loop->MakeSender<TestMessage>("/test");
   EXPECT_DEATH(loop->MakeWatcher("/test", [&](const TestMessage &) {}),
                "/test");
 }
 
+// Verify that we can't create a sender inside OnRun.
+TEST_P(AbstractEventLoopDeathTest, SenderInOnRun) {
+  auto loop1 = MakePrimary();
+
+  loop1->OnRun(
+      [&]() { auto sender = loop1->MakeSender<TestMessage>("/test2"); });
+
+  EXPECT_DEATH(Run(), "running");
+}
+
+// Verify that we can't create a watcher inside OnRun.
+TEST_P(AbstractEventLoopDeathTest, WatcherInOnRun) {
+  auto loop1 = MakePrimary();
+
+  loop1->OnRun(
+      [&]() { loop1->MakeWatcher("/test", [&](const TestMessage &) {}); });
+
+  EXPECT_DEATH(Run(), "running");
+}
+
 // Verify that Quit() works when there are multiple watchers.
 TEST_P(AbstractEventLoopTest, MultipleWatcherQuit) {
   auto loop1 = Make();
@@ -399,11 +443,12 @@
   });
 
   auto sender = loop1->MakeSender<TestMessage>("/test2");
-  {
+
+  loop2->OnRun([&]() {
     auto msg = sender.MakeMessage();
     msg->msg_value = 200;
     msg.Send();
-  }
+  });
 
   Run();
 }
diff --git a/aos/events/event-loop_param_test.h b/aos/events/event-loop_param_test.h
index b5574ba..1d2060a 100644
--- a/aos/events/event-loop_param_test.h
+++ b/aos/events/event-loop_param_test.h
@@ -23,10 +23,10 @@
   virtual void Run() = 0;
 };
 
-class AbstractEventLoopTest
+class AbstractEventLoopTestBase
     : public ::testing::TestWithParam<std::function<EventLoopTestFactory *()>> {
  public:
-  AbstractEventLoopTest() { factory_.reset(GetParam()()); }
+  AbstractEventLoopTestBase() { factory_.reset(GetParam()()); }
 
   ::std::unique_ptr<EventLoop> Make() { return factory_->Make(); }
   ::std::unique_ptr<EventLoop> MakePrimary() { return factory_->MakePrimary(); }
@@ -39,6 +39,9 @@
   ::std::unique_ptr<EventLoopTestFactory> factory_;
 };
 
+typedef AbstractEventLoopTestBase AbstractEventLoopDeathTest;
+typedef AbstractEventLoopTestBase AbstractEventLoopTest;
+
 }  // namespace testing
 }  // namespace aos
 
diff --git a/aos/events/shm-event-loop.cc b/aos/events/shm-event-loop.cc
index ee71b4c..08f9e1f 100644
--- a/aos/events/shm-event-loop.cc
+++ b/aos/events/shm-event-loop.cc
@@ -6,15 +6,19 @@
 #include <chrono>
 #include <stdexcept>
 
+#include "aos/events/epoll.h"
 #include "aos/init.h"
 #include "aos/logging/logging.h"
 #include "aos/queue.h"
 
 namespace aos {
 
-ShmEventLoop::ShmEventLoop() : thread_state_(std::make_shared<ThreadState>()) {}
+ShmEventLoop::ShmEventLoop() {}
 
 namespace {
+
+namespace chrono = ::std::chrono;
+
 class ShmFetcher : public RawFetcher {
  public:
   explicit ShmFetcher(RawQueue *queue) : queue_(queue) {
@@ -79,13 +83,13 @@
  public:
   explicit ShmSender(RawQueue *queue) : queue_(queue) {}
 
-  aos::Message *GetMessage() override {
-    return reinterpret_cast<aos::Message *>(queue_->GetMessage());
+  ::aos::Message *GetMessage() override {
+    return reinterpret_cast<::aos::Message *>(queue_->GetMessage());
   }
 
-  void Free(aos::Message *msg) override { queue_->FreeMessage(msg); }
+  void Free(::aos::Message *msg) override { queue_->FreeMessage(msg); }
 
-  bool Send(aos::Message *msg) override {
+  bool Send(::aos::Message *msg) override {
     assert(queue_ != nullptr);
     {
       // TODO(austin): This lets multiple senders reorder messages since time
@@ -102,18 +106,60 @@
  private:
   RawQueue *queue_;
 };
+
 }  // namespace
 
 namespace internal {
+
+// Class to manage the state for a Watcher.
 class WatcherThreadState {
  public:
-  WatcherThreadState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
-                     RawQueue *queue,
-                     std::function<void(const aos::Message *message)> watcher)
-      : thread_state_(std::move(thread_state)),
+  WatcherThreadState(
+      ShmEventLoop::ThreadState *thread_state, RawQueue *queue,
+      ::std::function<void(const ::aos::Message *message)> watcher)
+      : thread_state_(thread_state),
         queue_(queue),
         index_(0),
-        watcher_(std::move(watcher)) {
+        watcher_(::std::move(watcher)) {}
+
+  ~WatcherThreadState() {
+    // Only join the thread if Start() actually got it running.
+    if (running_) {
+      // TODO(austin): CHECK that we aren't RT here.
+
+      // Try joining with a deadline.  On timeout the thread has not exited
+      // yet, so push the deadline out another 100 ms and try again.
+      struct timespec end_time;
+      PCHECK(clock_gettime(CLOCK_REALTIME, &end_time) == 0);
+      while (true) {
+        void *retval = nullptr;
+        end_time.tv_nsec += 100000000;
+        if (end_time.tv_nsec >= 1000000000L) {  // Keep tv_nsec < 1 second.
+          end_time.tv_nsec -= 1000000000L;
+          ++end_time.tv_sec;
+        }
+        int ret = pthread_timedjoin_np(pthread_, &retval, &end_time);
+        if (ret == ETIMEDOUT) continue;
+        PCHECK(ret == 0);
+        break;
+      }
+    }
+  }
+
+  // Starts the thread and waits until it is running.
+  void Start() {
+    PCHECK(pthread_create(&pthread_, nullptr, &StaticRun, this) == 0);
+    IPCRecursiveMutexLocker locker(&thread_started_mutex_);
+    if (locker.owner_died()) ::aos::Die("Owner died");
+    while (!running_) {
+      CHECK(!thread_started_condition_.Wait());
+    }
+  }
+
+  void GrabQueueIndex() {
+    // Right after we are signaled to start, point index to the current index
+    // so we don't read any messages received before now.  Otherwise we will
+    // get a significantly delayed read.
     static constexpr Options<RawQueue> kOptions =
         RawQueue::kFromEnd | RawQueue::kNonBlock;
     const void *msg = queue_->ReadMessageIndex(kOptions, &index_);
@@ -122,10 +168,31 @@
     }
   }
 
+ private:
+  // Runs Run given a WatcherThreadState as the argument.  This is an adapter
+  // between pthreads and Run.
+  static void *StaticRun(void *arg) {
+    WatcherThreadState *watcher_thread_state =
+        reinterpret_cast<WatcherThreadState *>(arg);
+    watcher_thread_state->Run();
+    return nullptr;
+  }
+
+  // Runs the watcher callback on new messages.
   void Run() {
+    // Signal the main thread that we are now ready.
     thread_state_->MaybeSetCurrentThreadRealtimePriority();
+    {
+      IPCRecursiveMutexLocker locker(&thread_started_mutex_);
+      if (locker.owner_died()) ::aos::Die("Owner died");
+      running_ = true;
+      thread_started_condition_.Broadcast();
+    }
+
+    // Wait for the global start before handling events.
     thread_state_->WaitForStart();
 
+    // Bail immediately if we are supposed to stop.
     if (!thread_state_->is_running()) {
       ::aos::UnsetCurrentThreadRealtimePriority();
       return;
@@ -133,10 +200,18 @@
 
     const void *msg = nullptr;
     while (true) {
-      msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index_);
-      assert(msg != nullptr);
+      msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index_,
+                                     chrono::seconds(1));
+      // We hit a timeout.  Confirm that we should be running and retry.  Note,
+      // is_running is threadsafe (it's an atomic underneath).  Worst case, we
+      // check again in a second.
+      if (msg == nullptr) {
+        if (!thread_state_->is_running()) break;
+        continue;
+      }
 
       {
+        // Grab the lock so that only one callback can be called at a time.
         MutexLocker locker(&thread_state_->mutex_);
         if (!thread_state_->is_running()) break;
 
@@ -144,142 +219,155 @@
         // watcher_ may have exited the event loop.
         if (!thread_state_->is_running()) break;
       }
+      // Drop the reference.
       queue_->FreeMessage(msg);
     }
 
+    // And drop the last reference.
     queue_->FreeMessage(msg);
+    // Now that everything is cleaned up, drop RT priority before destroying the
+    // thread.
     ::aos::UnsetCurrentThreadRealtimePriority();
   }
-
- private:
-  std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
+  pthread_t pthread_;
+  ShmEventLoop::ThreadState *thread_state_;
   RawQueue *queue_;
   int32_t index_;
+  bool running_ = false;
 
-  std::function<void(const Message *message)> watcher_;
+  ::std::function<void(const Message *message)> watcher_;
+
+  // Mutex and condition variable used to wait until the thread is started
+  // before going RT.
+  ::aos::Mutex thread_started_mutex_;
+  ::aos::Condition thread_started_condition_{&thread_started_mutex_};
 };
 
+// Adapter class to adapt a timerfd to a TimerHandler.
 class TimerHandlerState : public TimerHandler {
  public:
-  TimerHandlerState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
-                    ::std::function<void()> fn)
-      : thread_state_(std::move(thread_state)), fn_(::std::move(fn)) {
-    fd_ = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
-    PCHECK(fd_ != -1);
+  TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
+      : shm_event_loop_(shm_event_loop), fn_(::std::move(fn)) {
+    shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
+      MutexLocker locker(&shm_event_loop_->thread_state_.mutex_);
+      timerfd_.Read();
+      fn_();
+    });
   }
 
-  ~TimerHandlerState() {
-    PCHECK(close(fd_) == 0);
-  }
+  ~TimerHandlerState() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
 
   void Setup(monotonic_clock::time_point base,
              monotonic_clock::duration repeat_offset) override {
-    struct itimerspec new_value;
-    new_value.it_interval = ::aos::time::to_timespec(repeat_offset);
-    new_value.it_value = ::aos::time::to_timespec(base);
-    PCHECK(timerfd_settime(fd_, TFD_TIMER_ABSTIME, &new_value, nullptr) == 0);
+    timerfd_.SetTime(base, repeat_offset);
   }
 
-  void Disable() override {
-    // Disarm the timer by feeding zero values
-    Setup(::aos::monotonic_clock::epoch(), ::aos::monotonic_clock::zero());
-  }
-
-  void Run() {
-    thread_state_->MaybeSetCurrentThreadRealtimePriority();
-    thread_state_->WaitForStart();
-
-    while (true) {
-      uint64_t buf;
-      ssize_t result = read(fd_, &buf, sizeof(buf));
-      PCHECK(result != -1);
-      CHECK_EQ(result, static_cast<int>(sizeof(buf)));
-
-      {
-        MutexLocker locker(&thread_state_->mutex_);
-        if (!thread_state_->is_running()) break;
-        fn_();
-        // fn_ may have exited the event loop.
-        if (!thread_state_->is_running()) break;
-      }
-    }
-    ::aos::UnsetCurrentThreadRealtimePriority();
-  }
+  void Disable() override { timerfd_.Disable(); }
 
  private:
-  std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
+  ShmEventLoop *shm_event_loop_;
 
-  // File descriptor for the timer
-  int fd_;
+  TimerFd timerfd_;
 
   // Function to be run on the thread
   ::std::function<void()> fn_;
 };
 }  // namespace internal
 
-std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
-    const std::string &path, const QueueTypeInfo &type) {
-  return std::unique_ptr<RawFetcher>(new ShmFetcher(
+::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
+    const ::std::string &path, const QueueTypeInfo &type) {
+  return ::std::unique_ptr<RawFetcher>(new ShmFetcher(
       RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
 }
 
-std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
-    const std::string &path, const QueueTypeInfo &type) {
+::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
+    const ::std::string &path, const QueueTypeInfo &type) {
   Take(path);
-  return std::unique_ptr<RawSender>(new ShmSender(
+  return ::std::unique_ptr<RawSender>(new ShmSender(
       RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
 }
 
 void ShmEventLoop::MakeRawWatcher(
-    const std::string &path, const QueueTypeInfo &type,
-    std::function<void(const Message *message)> watcher) {
+    const ::std::string &path, const QueueTypeInfo &type,
+    ::std::function<void(const Message *message)> watcher) {
   Take(path);
-  auto *state = new internal::WatcherThreadState(
-      thread_state_,
-      RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length),
-      std::move(watcher));
-
-  std::thread thread([state] {
-    state->Run();
-    delete state;
-  });
-  thread.detach();
+  ::std::unique_ptr<internal::WatcherThreadState> state(
+      new internal::WatcherThreadState(
+          &thread_state_, RawQueue::Fetch(path.c_str(), type.size, type.hash,
+                                          type.queue_length),
+          std::move(watcher)));
+  watchers_.push_back(::std::move(state));
 }
 
 TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
-  internal::TimerHandlerState *timer =
-      new internal::TimerHandlerState(thread_state_, ::std::move(callback));
+  ::std::unique_ptr<internal::TimerHandlerState> timer(
+      new internal::TimerHandlerState(this, ::std::move(callback)));
 
-  ::std::thread t([timer] {
-    timer->Run();
-    delete timer;
-  });
-  t.detach();
+  timers_.push_back(::std::move(timer));
 
-  return timer;
+  return timers_.back().get();
 }
 
-void ShmEventLoop::OnRun(std::function<void()> on_run) {
-  on_run_.push_back(std::move(on_run));
+void ShmEventLoop::OnRun(::std::function<void()> on_run) {
+  on_run_.push_back(::std::move(on_run));
 }
 
 void ShmEventLoop::Run() {
-  thread_state_->MaybeSetCurrentThreadRealtimePriority();
+  // Start all the watcher threads.
+  for (::std::unique_ptr<internal::WatcherThreadState> &watcher : watchers_) {
+    watcher->Start();
+  }
+
+  // Now, all the threads are up.  Go RT.
+  thread_state_.MaybeSetCurrentThreadRealtimePriority();
   set_is_running(true);
-  for (const auto &run : on_run_) run();
-  // TODO(austin): epoll event loop in main thread (if needed), and async safe
-  // quit handler.
-  thread_state_->Run();
+
+  // Now that we are realtime (but before the OnRun handlers run), snap the
+  // queue index.
+  for (::std::unique_ptr<internal::WatcherThreadState> &watcher : watchers_) {
+    watcher->GrabQueueIndex();
+  }
+
+  // Now that we are RT, run all the OnRun handlers.
+  for (const auto &run : on_run_) {
+    run();
+  }
+  // TODO(austin): We don't need a separate watcher thread if there are only
+  // watchers and fetchers.  Could lazily create the epoll loop and pick a
+  // victim watcher to run in this thread.
+  // Trigger all the threads to start now.
+  thread_state_.Start();
+
+  // And start our main event loop which runs all the timers and handles Quit.
+  epoll_.Run();
+
+  // Once epoll exits, there is no useful nonrt work left to do.
+  set_is_running(false);
+
+  // Signal all the watcher threads to exit.  After this point, no more
+  // callbacks will be handled.
+  thread_state_.Exit();
+
+  // Nothing time or synchronization critical needs to happen after this point.
+  // Drop RT priority.
   ::aos::UnsetCurrentThreadRealtimePriority();
+
+  // The watcher threads get cleaned up in the destructor.
 }
 
-void ShmEventLoop::ThreadState::Run() {
+void ShmEventLoop::ThreadState::Start() {
   MutexLocker locker(&mutex_);
   loop_running_ = true;
   if (loop_finished_) ::aos::Die("Cannot restart an ShmEventLoop()");
   loop_running_cond_.Broadcast();
-  while (loop_running_) {
-    if (loop_running_cond_.Wait()) {
+}
+
+void ShmEventLoop::ThreadState::WaitForStart() {
+  MutexLocker locker(&mutex_);
+  while (!(loop_running_ || loop_finished_)) {
+    Condition::WaitResult wait_result =
+        loop_running_cond_.WaitTimed(chrono::milliseconds(1000));
+    if (wait_result == Condition::WaitResult::kOwnerDied) {
       ::aos::Die("ShmEventLoop mutex lock problem.\n");
     }
   }
@@ -291,19 +379,7 @@
   }
 }
 
-void ShmEventLoop::ThreadState::WaitForStart() {
-  MutexLocker locker(&mutex_);
-  while (!(loop_running_ || loop_finished_)) {
-    if (loop_running_cond_.Wait()) {
-      ::aos::Die("ShmEventLoop mutex lock problem.\n");
-    }
-  }
-}
-
-void ShmEventLoop::Exit() {
-  set_is_running(false);
-  thread_state_->Exit();
-}
+void ShmEventLoop::Exit() { epoll_.Quit(); }
 
 void ShmEventLoop::ThreadState::Exit() {
   IPCRecursiveMutexLocker locker(&mutex_);
@@ -319,7 +395,7 @@
   }
 }
 
-void ShmEventLoop::Take(const std::string &path) {
+void ShmEventLoop::Take(const ::std::string &path) {
   if (is_running()) {
     ::aos::Die("Cannot add new objects while running.\n");
   }
diff --git a/aos/events/shm-event-loop.h b/aos/events/shm-event-loop.h
index 3d231ba..79101de 100644
--- a/aos/events/shm-event-loop.h
+++ b/aos/events/shm-event-loop.h
@@ -3,9 +3,11 @@
 
 #include <unordered_set>
 #include <vector>
+
 #include "aos/condition.h"
-#include "aos/mutex/mutex.h"
+#include "aos/events/epoll.h"
 #include "aos/events/event-loop.h"
+#include "aos/mutex/mutex.h"
 
 namespace aos {
 namespace internal {
@@ -30,26 +32,28 @@
     return ::aos::monotonic_clock::now();
   }
 
-  std::unique_ptr<RawSender> MakeRawSender(const std::string &path,
-                                           const QueueTypeInfo &type) override;
-  std::unique_ptr<RawFetcher> MakeRawFetcher(
-      const std::string &path, const QueueTypeInfo &type) override;
+  ::std::unique_ptr<RawSender> MakeRawSender(
+      const ::std::string &path, const QueueTypeInfo &type) override;
+  ::std::unique_ptr<RawFetcher> MakeRawFetcher(
+      const ::std::string &path, const QueueTypeInfo &type) override;
 
   void MakeRawWatcher(
-      const std::string &path, const QueueTypeInfo &type,
-      std::function<void(const aos::Message *message)> watcher) override;
+      const ::std::string &path, const QueueTypeInfo &type,
+      ::std::function<void(const aos::Message *message)> watcher) override;
 
   TimerHandler *AddTimer(::std::function<void()> callback) override;
 
-  void OnRun(std::function<void()> on_run) override;
+  void OnRun(::std::function<void()> on_run) override;
   void Run() override;
   void Exit() override;
 
+  // TODO(austin): Add a function to register control-C call.
+
   void SetRuntimeRealtimePriority(int priority) override {
     if (is_running()) {
       ::aos::Die("Cannot set realtime priority while running.");
     }
-    thread_state_->priority_ = priority;
+    thread_state_.priority_ = priority;
   }
 
  private:
@@ -65,7 +69,7 @@
 
     bool is_running() { return loop_running_; }
 
-    void Run();
+    void Start();
 
     void Exit();
 
@@ -77,23 +81,26 @@
     friend class ShmEventLoop;
 
     // This mutex ensures that only one watch event happens at a time.
-    aos::Mutex mutex_;
+    ::aos::Mutex mutex_;
     // Block on this until the loop starts.
-    aos::Condition loop_running_cond_{&mutex_};
+    ::aos::Condition loop_running_cond_{&mutex_};
     // Used to notify watchers that the loop is done.
-    std::atomic<bool> loop_running_{false};
+    ::std::atomic<bool> loop_running_{false};
     bool loop_finished_ = false;
     int priority_ = -1;
   };
 
-  // Exclude multiple of the same type for path.
+  // Tracks that we can't have multiple watchers or a sender and a watcher (or
+  // multiple senders) on a single queue (path).
+  void Take(const ::std::string &path);
 
-  std::vector<std::function<void()>> on_run_;
-  std::shared_ptr<ThreadState> thread_state_;
+  ::std::vector<::std::function<void()>> on_run_;
+  ThreadState thread_state_;
+  ::std::vector<::std::string> taken_;
+  internal::EPoll epoll_;
 
-  void Take(const std::string &path);
-
-  std::vector<::std::string> taken_;
+  ::std::vector<::std::unique_ptr<internal::TimerHandlerState>> timers_;
+  ::std::vector<::std::unique_ptr<internal::WatcherThreadState>> watchers_;
 };
 
 }  // namespace aos
diff --git a/aos/events/shm-event-loop_test.cc b/aos/events/shm-event-loop_test.cc
index efa8545..61adbe3 100644
--- a/aos/events/shm-event-loop_test.cc
+++ b/aos/events/shm-event-loop_test.cc
@@ -35,6 +35,11 @@
                           return new ShmEventLoopTestFactory();
                         }));
 
+INSTANTIATE_TEST_CASE_P(ShmEventLoopDeathTest, AbstractEventLoopDeathTest,
+                        ::testing::Values([]() {
+                          return new ShmEventLoopTestFactory();
+                        }));
+
 struct TestMessage : public ::aos::Message {
   enum { kQueueLength = 100, kHash = 0x696c0cdc };
   int msg_value;
diff --git a/aos/events/simulated-event-loop.cc b/aos/events/simulated-event-loop.cc
index 3836283..04e4628 100644
--- a/aos/events/simulated-event-loop.cc
+++ b/aos/events/simulated-event-loop.cc
@@ -313,9 +313,11 @@
 
 void SimulatedQueue::Send(RefCountedBuffer message) {
   latest_message_ = message;
-  for (auto &watcher : watchers_) {
-    scheduler_->Schedule(scheduler_->monotonic_now(),
-                         [watcher, message]() { watcher(message.get()); });
+  if (scheduler_->is_running()) {
+    for (auto &watcher : watchers_) {
+      scheduler_->Schedule(scheduler_->monotonic_now(),
+                           [watcher, message]() { watcher(message.get()); });
+    }
   }
   for (auto &fetcher : fetchers_) {
     fetcher->Enqueue(message);
diff --git a/aos/events/simulated-event-loop.h b/aos/events/simulated-event-loop.h
index 6ee7d92..81b181f 100644
--- a/aos/events/simulated-event-loop.h
+++ b/aos/events/simulated-event-loop.h
@@ -120,6 +120,8 @@
     is_running_ = false;
   }
 
+  bool is_running() const { return is_running_; }
+
   void AddRawEventLoop(RawEventLoop *event_loop) {
     raw_event_loops_.push_back(event_loop);
   }
diff --git a/aos/events/simulated-event-loop_test.cc b/aos/events/simulated-event-loop_test.cc
index 41d4dde..bcc6f28 100644
--- a/aos/events/simulated-event-loop_test.cc
+++ b/aos/events/simulated-event-loop_test.cc
@@ -22,6 +22,11 @@
    SimulatedEventLoopFactory event_loop_factory_;
 };
 
+INSTANTIATE_TEST_CASE_P(SimulatedEventLoopDeathTest, AbstractEventLoopDeathTest,
+                        ::testing::Values([]() {
+                          return new SimulatedEventLoopTestFactory();
+                        }));
+
 INSTANTIATE_TEST_CASE_P(SimulatedEventLoopTest, AbstractEventLoopTest,
                         ::testing::Values([]() {
                           return new SimulatedEventLoopTestFactory();