Prototype event-loop.h for queue dependency injection.

Change-Id: I1af3f71a5b0cca741641f872f55dab10474ee064
diff --git a/aos/events/BUILD b/aos/events/BUILD
new file mode 100644
index 0000000..80495fe
--- /dev/null
+++ b/aos/events/BUILD
@@ -0,0 +1,52 @@
+
+cc_library(
+  name = "raw-event-loop",  # Untyped interfaces for IPC-layer implementers.
+  hdrs = ["raw-event-loop.h"],
+  deps = [
+    "//aos/common:time",
+    "//aos/common:queues",
+  ],
+)
+
+cc_library(
+  name = "event-loop",  # Typed user-facing API layered on :raw-event-loop.
+  hdrs = ["event-loop.h", "raw-event-loop.h"],  # NOTE(review): raw-event-loop.h is also exported by :raw-event-loop.
+  srcs = ["event-loop-tmpl.h"],  # Included from the bottom of event-loop.h.
+  deps = [
+    ":raw-event-loop",
+    "//aos/common:time",
+    "//aos/common:queues",
+  ],
+)
+
+cc_library(
+  name = "shm-event-loop",  # EventLoop implementation on shared-memory queues.
+  hdrs = ["shm-event-loop.h"],
+  srcs = ["shm-event-loop.cc"],
+  deps = [
+    "//aos/common:queues",
+    "//aos/vision/events:intrusive_free_list",  # NOTE(review): not #included by shm-event-loop.{h,cc}; confirm it is needed.
+    ":event-loop",
+  ],
+)
+
+cc_test(
+  name = "shm-event-loop_test",  # Runs the parameterized suite on ShmEventLoop.
+  srcs = ["shm-event-loop_test.cc"],
+  deps = [
+    ":event-loop_param_test",
+    ":shm-event-loop",
+    "//aos/testing:test_shm",
+  ],
+)
+
+cc_library(
+  name = "event-loop_param_test",  # Implementation-independent EventLoop tests.
+  srcs = ["event-loop_param_test.cc"],
+  hdrs = ["event-loop_param_test.h"],
+  deps = [
+    ":event-loop",
+    "//aos/testing:googletest",
+  ],
+  testonly = True,
+)
diff --git a/aos/events/event-loop-tmpl.h b/aos/events/event-loop-tmpl.h
new file mode 100644
index 0000000..4c53be3
--- /dev/null
+++ b/aos/events/event-loop-tmpl.h
@@ -0,0 +1,40 @@
+#ifndef _AOS_EVENTS_EVENT_LOOP_TMPL_H_
+#define _AOS_EVENTS_EVENT_LOOP_TMPL_H_
+
+#include <type_traits>
+#include "aos/events/event-loop.h"
+
+namespace aos {
+
+// Extracts the message type from a watch functor's call operator.
+// This is the primary template: it takes the address of T::operator() and
+// inherits from the pointer-to-member-function specialization below.
+template <class T>
+struct watch_message_type_trait
+    : watch_message_type_trait<decltype(&T::operator())> {};
+
+// Specialization for a const call operator taking exactly one argument:
+// message_type is that argument's type after std::decay.
+template <class ClassType, class ReturnType, class A1>
+struct watch_message_type_trait<ReturnType (ClassType::*)(A1) const> {
+  using message_type = typename std::decay<A1>::type;
+};
+
+template <typename T>
+typename Sender<T>::Message Sender<T>::MakeMessage() {
+  return Message(sender_.get());  // Message's ctor gets and zeroes a context.
+}
+
+template <typename Watch>
+void EventLoop::MakeWatcher(const std::string &path, Watch &&w) {
+  // Watch is an lvalue reference for lvalue functors; decay before the trait.
+  using F = typename std::decay<Watch>::type;
+  using T = typename watch_message_type_trait<F>::message_type;
+  MakeRawWatcher(path, QueueTypeInfo::Get<T>(), [w](const Message *message) {
+    w(*reinterpret_cast<const T *>(message));  // Copy of w; called as const.
+  });
+}
+
+}  // namespace aos
+
+#endif  // _AOS_EVENTS_EVENT_LOOP_TMPL_H_
diff --git a/aos/events/event-loop.h b/aos/events/event-loop.h
new file mode 100644
index 0000000..d3cbf3f
--- /dev/null
+++ b/aos/events/event-loop.h
@@ -0,0 +1,131 @@
+#ifndef _AOS_EVENTS_EVENT_LOOP_H_
+#define _AOS_EVENTS_EVENT_LOOP_H_
+
+#include <string>
+#include "aos/common/queue.h"
+#include "aos/common/time.h"
+#include "aos/events/raw-event-loop.h"
+
+namespace aos {
+
+// Typed handle that fetches the newest message of type T from a queue.
+template <typename T>
+class Fetcher {
+ public:
+  Fetcher() {}  // Holds no RawFetcher; Fetch()/get() would dereference null.
+  // Fetches the most recent message. Returns whether it fetched a new message.
+  bool Fetch() { return fetcher_->Fetch(); }
+
+  const T *get() const {  // The RawFetcher's most recent value, viewed as T.
+    return reinterpret_cast<const T *>(fetcher_->most_recent());
+  }
+  const T &operator*() const { return *get(); }
+  const T *operator->() const { return get(); }
+
+ private:
+  friend class EventLoop;  // EventLoop::MakeFetcher() uses the private ctor.
+  Fetcher(std::unique_ptr<RawFetcher> fetcher) : fetcher_(std::move(fetcher)) {}
+  std::unique_ptr<RawFetcher> fetcher_;
+};
+
+// Typed handle that sends messages of type T to a queue.
+template <typename T>
+class Sender {
+ public:
+  Sender() {}  // Holds no RawSender; MakeMessage() would dereference null.
+
+  // Represents a single message about to be sent to the queue.
+  // The lifecycle goes:
+  //
+  // Message msg = sender.MakeMessage();
+  // Populate(msg.get());
+  // msg.Send();
+  //
+  // Or, to abandon it (it is freed when msg is destroyed without Send()):
+  //
+  // Message msg = sender.MakeMessage();
+  // PopulateOrNot(msg.get());
+  class Message {
+   public:
+    Message(RawSender *sender)
+        : msg_(reinterpret_cast<T *>(sender->GetContext()), *sender) {
+      msg_->Zero();  // Calls T::Zero() on the freshly obtained context.
+    }
+
+    T *get() { return msg_.get(); }
+    const T *get() const { return msg_.get(); }
+    T &operator*() { return *get(); }
+    T *operator->() { return get(); }
+    const T &operator*() const { return *get(); }
+    const T *operator->() const { return get(); }
+    // Calls SetTimeToNow() and hands the message to the RawSender, releasing
+    // ownership (get() is null afterwards). Should only be called once.
+    void Send() {  // NOTE: RawSender::Send()'s bool result is discarded.
+      RawSender *sender = &msg_.get_deleter();
+      msg_->SetTimeToNow();
+      sender->Send(reinterpret_cast<RawSender::SendContext *>(msg_.release()));
+    }
+
+   private:
+    std::unique_ptr<T, RawSender &> msg_;  // Deleter is the RawSender (Free()).
+  };
+
+  // Constructs a Message (see above) backed by this sender.
+  Message MakeMessage();
+
+ private:
+  friend class EventLoop;  // EventLoop::MakeSender() uses the private ctor.
+  Sender(std::unique_ptr<RawSender> sender) : sender_(std::move(sender)) {}
+  std::unique_ptr<RawSender> sender_;
+};
+
+// Typed front end over RawEventLoop: makes Fetcher<T>, Sender<T> and watchers.
+// TODO(parker): Consider wrapping a RawEventLoop rather than inheriting.
+class EventLoop : public RawEventLoop {
+ public:
+  virtual ~EventLoop() {}
+
+  // Current time. (Redeclares the pure virtual from RawEventLoop.)
+  virtual monotonic_clock::time_point monotonic_now() = 0;
+
+  // Makes a Fetcher that will always fetch the most recent value
+  // sent to path.
+  template <typename T>
+  Fetcher<T> MakeFetcher(const std::string &path) {
+    return Fetcher<T>(MakeRawFetcher(path, QueueTypeInfo::Get<T>()));
+  }
+
+  // Makes a Sender that allows constructing and sending messages to
+  // address path.
+  template <typename T>
+  Sender<T> MakeSender(const std::string &path) {
+    return Sender<T>(MakeRawSender(path, QueueTypeInfo::Get<T>()));
+  }
+
+  // Watch is a functor that has a call signature like so:
+  // void Event(const MessageType& type);
+  //
+  // This will watch messages sent to path.
+  // Note that the message type must match on the sending and receiving side.
+  template <typename Watch>
+  void MakeWatcher(const std::string &path, Watch &&w);
+
+  // The passed in function will be called when the event loop starts.
+  // Use this to run code once the thread goes into "real-time-mode".
+  virtual void OnRun(std::function<void()>) = 0;
+
+  // Starts receiving events.
+  virtual void Run() = 0;
+
+  // TODO(austin): Sort out how to switch to realtime on run.
+  // virtual void RunRealtime() = 0;
+
+  // Stops receiving events.
+  virtual void Exit() = 0;
+};
+
+}  // namespace aos
+
+#include "aos/events/event-loop-tmpl.h"
+
+#endif  // _AOS_EVENTS_EVENT_LOOP_H_
diff --git a/aos/events/event-loop_param_test.cc b/aos/events/event-loop_param_test.cc
new file mode 100644
index 0000000..6f04a7d
--- /dev/null
+++ b/aos/events/event-loop_param_test.cc
@@ -0,0 +1,92 @@
+#include "aos/events/event-loop_param_test.h"
+
+namespace aos {
+namespace testing {
+
+struct TestMessage : public ::aos::Message {  // Message type used by the tests.
+  enum { kQueueLength = 100, kHash = 0x696c0cdc };  // Read by QueueTypeInfo.
+  int msg_value;
+
+  void Zero() { msg_value = 0; }
+  static size_t Size() { return 1 + ::aos::Message::Size(); }  // NOTE(review): adds 1, not sizeof(int); confirm.
+  size_t Print(char *buffer, size_t length) const;  // Not defined in this file.
+  TestMessage() { Zero(); }
+};
+
+// Tests that a fetcher and a watcher in other loops both see a sent message.
+// Also tests that OnRun() callbacks fire only once Run() is called.
+TEST_P(AbstractEventLoopTest, Basic) {
+  auto loop1 = Make();
+  auto loop2 = Make();
+  auto loop3 = Make();
+
+  auto sender = loop1->MakeSender<TestMessage>("/test");
+
+  auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
+
+  auto msg = sender.MakeMessage();
+
+  msg->msg_value = 200;
+
+  msg.Send();
+
+  EXPECT_TRUE(fetcher.Fetch());  // loop2's fetcher sees loop1's message.
+  ASSERT_FALSE(fetcher.get() == nullptr);
+  EXPECT_EQ(fetcher->msg_value, 200);
+
+  bool happened = false;
+
+  loop3->OnRun([&]() { happened = true; });
+
+  loop3->MakeWatcher("/test", [&](const TestMessage &message) {
+    EXPECT_EQ(message.msg_value, 200);
+    loop3->Exit();  // Ends loop3->Run() below.
+  });
+
+  EXPECT_FALSE(happened);  // Not invoked before Run().
+  loop3->Run();
+  EXPECT_TRUE(happened);
+}
+
+// Verify that making a fetcher and then a watcher for "/test" on one loop dies.
+TEST_P(AbstractEventLoopTest, FetcherAndHandler) {
+  auto loop = Make();
+  auto fetcher = loop->MakeFetcher<TestMessage>("/test");
+  EXPECT_DEATH(loop->MakeWatcher("/test", [&](const TestMessage &) {}), "/test");
+}
+
+// Verify that making two fetchers for "/test" on one loop dies.
+TEST_P(AbstractEventLoopTest, TwoFetcher) {
+  auto loop = Make();
+  auto fetcher = loop->MakeFetcher<TestMessage>("/test");
+  EXPECT_DEATH(loop->MakeFetcher<TestMessage>("/test"), "/test");
+}
+
+// Verify that registering a watcher twice for "/test" on one loop dies.
+TEST_P(AbstractEventLoopTest, TwoHandler) {
+  auto loop = Make();
+  loop->MakeWatcher("/test", [&](const TestMessage &) {});
+  EXPECT_DEATH(loop->MakeWatcher("/test", [&](const TestMessage &) {}), "/test");
+}
+
+// Verify that Exit() works when there are multiple watchers.
+TEST_P(AbstractEventLoopTest, MultipleFetcherQuit) {
+  auto loop = Make();
+
+  auto sender = loop->MakeSender<TestMessage>("/test2");
+  {
+    auto msg = sender.MakeMessage();
+    msg->msg_value = 200;
+    msg.Send();
+  }
+
+  loop->MakeWatcher("/test1", [&](const TestMessage &) {});  // Gets no message.
+  loop->MakeWatcher("/test2", [&](const TestMessage &message) {
+    EXPECT_EQ(message.msg_value, 200);
+    loop->Exit();
+  });
+  loop->Run();  // Must return even though the "/test1" watcher never fired.
+}
+
+}  // namespace testing
+}  // namespace aos
diff --git a/aos/events/event-loop_param_test.h b/aos/events/event-loop_param_test.h
new file mode 100644
index 0000000..ae215db
--- /dev/null
+++ b/aos/events/event-loop_param_test.h
@@ -0,0 +1,33 @@
+#ifndef _AOS_EVENTS_EVENT_LOOP_PARAM_TEST_H_
+#define _AOS_EVENTS_EVENT_LOOP_PARAM_TEST_H_
+
+#include "aos/events/event-loop.h"
+#include "gtest/gtest.h"
+
+namespace aos {
+namespace testing {
+
+class EventLoopTestFactory {  // Creates EventLoops of the implementation under test.
+ public:
+  virtual ~EventLoopTestFactory() {}
+
+  virtual std::unique_ptr<EventLoop> Make() = 0;  // Returns a new EventLoop.
+};
+
+class AbstractEventLoopTest
+    : public ::testing::TestWithParam<std::function<EventLoopTestFactory *()>> {
+ public:
+  AbstractEventLoopTest() { factory_.reset(GetParam()()); }
+
+  std::unique_ptr<EventLoop> Make() { return factory_->Make(); }
+  // The test parameter is a function returning a factory pointer. The fixture
+  // calls it once in its constructor and takes ownership of the result; each
+  // Make() call asks that factory for another EventLoop.
+ private:
+  std::unique_ptr<EventLoopTestFactory> factory_;
+};
+
+}  // namespace testing
+}  // namespace aos
+
+#endif  // _AOS_EVENTS_EVENT_LOOP_PARAM_TEST_H_
diff --git a/aos/events/raw-event-loop.h b/aos/events/raw-event-loop.h
new file mode 100644
index 0000000..38ceaec
--- /dev/null
+++ b/aos/events/raw-event-loop.h
@@ -0,0 +1,130 @@
+#ifndef _AOS_EVENTS_RAW_EVENT_LOOP_H_
+#define _AOS_EVENTS_RAW_EVENT_LOOP_H_
+
+#include <atomic>
+#include <memory>
+#include <string>
+#include "aos/common/queue.h"
+#include "aos/common/time.h"
+
+// This file contains raw versions of the classes in event-loop.h.
+//
+// Users should look exclusively at event-loop.h. Only people who wish to
+// implement a new IPC layer (like a fake layer for example) may wish to use
+// these classes.
+namespace aos {
+
+// Raw version of fetcher. Holds a pointer to the most recent value, which the
+// implementation updates through set_most_recent().
+// It is the job of the typed version to cast this to the appropriate type.
+class RawFetcher {
+ public:
+  class FetchValue;  // Declared only; used here purely through pointers.
+  RawFetcher() {}
+  virtual ~RawFetcher() {}
+
+  virtual bool Fetch() = 0;  // Fetcher<T>::Fetch() forwards to this.
+
+  const FetchValue *most_recent() { return most_recent_; }
+
+ protected:
+  RawFetcher(const RawFetcher &) = delete;
+  RawFetcher &operator=(const RawFetcher &) = delete;
+  void set_most_recent(const FetchValue *most_recent) {
+    most_recent_ = most_recent;
+  }
+
+ private:
+  const FetchValue *most_recent_ = nullptr;  // Null until an implementation sets it.
+};
+
+// Raw version of sender. Sending a message is a three-step process: get an
+// opaque context with GetContext(), cast it to the message type and populate
+// it, then call one of Send() or Free().
+class RawSender {
+ public:
+  class SendContext;  // Declared only; used here purely through pointers.
+
+  RawSender() {}
+  virtual ~RawSender() {}
+
+  virtual SendContext *GetContext() = 0;
+
+  virtual void Free(SendContext *context) = 0;
+
+  virtual bool Send(SendContext *context) = 0;
+
+  // Deleter-style call operator: casts t to a SendContext and calls Free().
+  template <typename T>
+  void operator()(T *t) {
+    Free(reinterpret_cast<SendContext *>(t));
+  }
+
+ protected:
+  RawSender(const RawSender &) = delete;
+  RawSender &operator=(const RawSender &) = delete;
+};
+
+// Opaque information extracted from a particular type passed to the underlying
+// system so that it knows how much memory to allocate etc.
+struct QueueTypeInfo {
+  // Message size:
+  size_t size;
+  // This should be a globally unique identifier for the type.
+  int hash;
+  // Config parameter for how long the queue should be.
+  int queue_length;
+
+  template <typename T>
+  static QueueTypeInfo Get() {  // T must provide kHash and kQueueLength.
+    QueueTypeInfo info;
+    info.size = sizeof(T);
+    info.hash = T::kHash;
+    info.queue_length = T::kQueueLength;
+    return info;
+  }
+};
+
+// Abstract base class that every event loop implementation derives from.
+class RawEventLoop {
+ public:
+  virtual ~RawEventLoop() {}
+
+  // Current time.
+  virtual monotonic_clock::time_point monotonic_now() = 0;
+
+  // The passed in function will be called when the event loop starts.
+  // Use this to run code once the thread goes into "real-time-mode".
+  virtual void OnRun(std::function<void()> on_run) = 0;
+
+  bool is_running() const { return is_running_.load(); }
+
+  // Starts receiving events.
+  virtual void Run() = 0;
+
+  // Stops receiving events.
+  virtual void Exit() = 0;
+
+ protected:
+  void set_is_running(bool value) { is_running_.store(value); }
+
+  // Will send new messages from (path, type).
+  virtual std::unique_ptr<RawSender> MakeRawSender(
+      const std::string &path, const QueueTypeInfo &type) = 0;
+
+  // Will fetch new messages from (path, type).
+  virtual std::unique_ptr<RawFetcher> MakeRawFetcher(
+      const std::string &path, const QueueTypeInfo &type) = 0;
+
+  // Will watch (path, type) for new messages.
+  virtual void MakeRawWatcher(
+      const std::string &path, const QueueTypeInfo &type,
+      std::function<void(const Message *message)> watcher) = 0;
+
+ private:
+  std::atomic<bool> is_running_{false};  // Written only via set_is_running().
+};
+
+}  // namespace aos
+
+#endif  // _AOS_EVENTS_RAW_EVENT_LOOP_H_
diff --git a/aos/events/shm-event-loop.cc b/aos/events/shm-event-loop.cc
new file mode 100644
index 0000000..d227a48
--- /dev/null
+++ b/aos/events/shm-event-loop.cc
@@ -0,0 +1,203 @@
+#include "aos/events/shm-event-loop.h"
+#include "aos/common/queue.h"
+
+#include <atomic>
+#include <chrono>
+#include <stdexcept>
+
+namespace aos {
+
+ShmEventLoop::ShmEventLoop() : thread_state_(std::make_shared<ThreadState>()) {}  // Shared with watcher threads.
+
+namespace {
+// RawFetcher backed by a RawQueue; holds on to the last fetched message.
+class ShmFetcher : public RawFetcher {
+ public:
+  explicit ShmFetcher(RawQueue *queue) : queue_(queue) {}
+  // Releases the last fetched message, if there is one.
+  ~ShmFetcher() override {
+    if (msg_) queue_->FreeMessage(msg_);
+  }
+
+  bool Fetch() override {  // Non-blocking; true only when a new message arrived.
+    static constexpr Options<RawQueue> kOptions =
+        RawQueue::kFromEnd | RawQueue::kNonBlock;
+    const FetchValue *msg = static_cast<const FetchValue *>(
+        queue_->ReadMessageIndex(kOptions, &index_));
+    // Only update the internal pointer if we got a new message.
+    if (msg != nullptr && msg != msg_) {
+      queue_->FreeMessage(msg_);
+      msg_ = msg;
+      set_most_recent(msg_);
+      return true;
+    }
+    // The message has to get freed if we didn't use it (and
+    // RawQueue::FreeMessage is ok to call on nullptr).
+    queue_->FreeMessage(msg);
+    return false;
+  }
+
+ private:
+  int index_ = 0;
+  RawQueue *queue_;
+  const FetchValue *msg_ = nullptr;
+};
+
+class ShmSender : public RawSender {  // RawSender backed by a RawQueue.
+ public:
+  explicit ShmSender(RawQueue *queue) : queue_(queue) {}
+
+  SendContext *GetContext() override {  // Gets a message from the queue.
+    return reinterpret_cast<SendContext *>(queue_->GetMessage());
+  }
+
+  void Free(SendContext *context) override { queue_->FreeMessage(context); }
+
+  bool Send(SendContext *msg) override {  // Returns WriteMessage()'s result.
+    assert(queue_ != nullptr);
+    return queue_->WriteMessage(msg, RawQueue::kOverride);
+  }
+
+ private:
+  RawQueue *queue_;
+};
+}  // namespace
+
+namespace internal {
+class WatcherThreadState {  // State and body of one watcher thread.
+ public:
+  WatcherThreadState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
+                     RawQueue *queue,
+                     std::function<void(const aos::Message *message)> watcher)
+      : thread_state_(std::move(thread_state)),
+        queue_(queue),
+        watcher_(std::move(watcher)) {}
+  // Thread body: waits for the loop to start, then feeds messages to watcher_.
+  void Run() {
+    thread_state_->WaitForStart();
+    // WaitForStart() also returns when the loop was exited without running.
+    if (!thread_state_->is_running()) return;
+
+    int32_t index = 0;
+    // Start with a non-blocking read so the loop below only blocks if needed.
+    static constexpr Options<RawQueue> kOptions =
+        RawQueue::kFromEnd | RawQueue::kNonBlock;
+    const void *msg = queue_->ReadMessageIndex(kOptions, &index);
+
+    while (true) {
+      if (msg == nullptr) {
+        msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index);
+        assert(msg != nullptr);
+      }
+      // Hold the loop-wide mutex so only one watch event happens at a time.
+      {
+        MutexLocker locker2(&thread_state_->mutex_);
+        if (!thread_state_->is_running()) break;
+
+        watcher_(reinterpret_cast<const Message *>(msg));
+        // watcher_ may have exited the event loop.
+        if (!thread_state_->is_running()) break;
+      }
+      queue_->FreeMessage(msg);
+      msg = nullptr;
+    }
+    // Only reached through a break, with msg still held; release it.
+    queue_->FreeMessage(msg);
+  }
+
+ private:
+  std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
+  RawQueue *queue_;
+  std::function<void(const Message *message)> watcher_;
+};
+}  // namespace internal
+
+std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
+    const std::string &path, const QueueTypeInfo &type) {
+  Take(path);  // Dies if path already has a fetcher or watcher on this loop.
+  return std::unique_ptr<RawFetcher>(new ShmFetcher(
+      RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
+}
+
+std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
+    const std::string &path, const QueueTypeInfo &type) {  // No Take(): senders are not limited per path.
+  return std::unique_ptr<RawSender>(new ShmSender(
+      RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
+}
+
+void ShmEventLoop::MakeRawWatcher(
+    const std::string &path, const QueueTypeInfo &type,
+    std::function<void(const Message *message)> watcher) {
+  Take(path);  // Dies if path already has a fetcher or watcher on this loop.
+  auto *state = new internal::WatcherThreadState(
+      thread_state_,
+      RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length),
+      std::move(watcher));
+
+  std::thread thread([state] {
+    state->Run();
+    delete state;  // The thread owns state and frees it once Run() returns.
+  });
+  thread.detach();  // Never joined; the thread waits in WaitForStart() first.
+}
+
+void ShmEventLoop::OnRun(std::function<void()> on_run) {
+  on_run_.push_back(std::move(on_run));  // Invoked by Run(), in order added.
+}
+
+void ShmEventLoop::Run() {
+  set_is_running(true);
+  for (const auto &run : on_run_) run();  // OnRun() callbacks, on this thread.
+  thread_state_->Run();  // Releases the watcher threads; blocks until Exit().
+}
+
+void ShmEventLoop::ThreadState::Run() {
+  MutexLocker locker(&mutex_);
+  loop_running_ = true;
+  if (loop_finished_) ::aos::Die("Cannot restart an ShmEventLoop()");
+  loop_running_cond_.Broadcast();  // Wakes threads blocked in WaitForStart().
+  while (loop_running_) {  // Wait here until Exit() clears loop_running_.
+    if (loop_running_cond_.Wait()) {
+      ::aos::Die("ShmEventLoop mutex lock problem.\n");
+    }
+  }
+}
+
+void ShmEventLoop::ThreadState::WaitForStart() {
+  MutexLocker locker(&mutex_);
+  while (!(loop_running_ || loop_finished_)) {  // Until Run() or Exit().
+    if (loop_running_cond_.Wait()) {
+      ::aos::Die("ShmEventLoop mutex lock problem.\n");
+    }
+  }
+}
+
+void ShmEventLoop::Exit() {
+  set_is_running(false);
+  thread_state_->Exit();  // Wakes Run() and marks the loop as finished.
+}
+
+void ShmEventLoop::ThreadState::Exit() {  // Watchers call this while already holding mutex_.
+  IPCRecursiveMutexLocker locker(&mutex_);
+  if (locker.owner_died()) ::aos::Die("Owner died");
+  loop_running_ = false;
+  loop_finished_ = true;  // Makes a later Run() die instead of restarting.
+  loop_running_cond_.Broadcast();  // Wakes Run() and any WaitForStart().
+}
+
+ShmEventLoop::~ShmEventLoop() {
+  if (is_running()) {  // True between Run() and Exit().
+    ::aos::Die("ShmEventLoop destroyed while running\n");
+  }
+}
+
+void ShmEventLoop::Take(const std::string &path) {
+  if (is_running()) {
+    ::aos::Die("Cannot add new objects while running.\n");
+  }
+  if (!taken_.emplace(path).second) {  // emplace() fails if path was present.
+    ::aos::Die("%s already has a listener / watcher.", path.c_str());
+  }
+}
+
+}  // namespace aos
diff --git a/aos/events/shm-event-loop.h b/aos/events/shm-event-loop.h
new file mode 100644
index 0000000..9ee7926
--- /dev/null
+++ b/aos/events/shm-event-loop.h
@@ -0,0 +1,76 @@
+#include <unordered_set>
+#include <vector>
+#include "aos/common/condition.h"
+#include "aos/common/mutex.h"
+#include "aos/events/event-loop.h"
+
+namespace aos {
+namespace internal {
+
+class WatcherThreadState;
+
+}  // namespace internal
+
+// Specialization of EventLoop that is built from queues running out of shared
+// memory. See more details at aos/common/queue.h.
+class ShmEventLoop : public EventLoop {
+ public:
+  ShmEventLoop();
+  ~ShmEventLoop() override;  // Dies if the loop is still running.
+
+  ::aos::monotonic_clock::time_point monotonic_now() override {
+    return ::aos::monotonic_clock::now();
+  }
+
+  std::unique_ptr<RawSender> MakeRawSender(const std::string &path,
+                                           const QueueTypeInfo &type) override;
+  std::unique_ptr<RawFetcher> MakeRawFetcher(
+      const std::string &path, const QueueTypeInfo &type) override;
+
+  void MakeRawWatcher(
+      const std::string &path, const QueueTypeInfo &type,
+      std::function<void(const aos::Message *message)> watcher) override;
+
+  void OnRun(std::function<void()> on_run) override;
+  void Run() override;
+  void Exit() override;
+
+ private:
+  friend class internal::WatcherThreadState;
+  // This ThreadState ensures that two watchers in the same loop cannot be
+  // triggered concurrently.  Because watchers block threads indefinitely, this
+  // has to be shared_ptr in case the EventLoop is destroyed before the thread
+  // receives any new events.
+  class ThreadState {
+   public:
+    void WaitForStart();  // Blocks until Run() or Exit() has been called.
+
+    bool is_running() { return loop_running_; }  // Reads the flag; no lock.
+
+    void Run();  // Blocks the caller until Exit().
+
+    void Exit();  // Stops the loop and marks it finished.
+
+   private:
+    friend class internal::WatcherThreadState;
+    friend class ShmEventLoop;
+
+    // This mutex ensures that only one watch event happens at a time.
+    aos::Mutex mutex_;
+    // Block on this until the loop starts.
+    aos::Condition loop_running_cond_{&mutex_};
+    // True between Run() and Exit(); watcher threads stop once it is false.
+    std::atomic<bool> loop_running_{false};
+    bool loop_finished_ = false;  // Set by Exit(); a later Run() then dies.
+  };
+
+  // Claims path for one fetcher or watcher; dies on a repeat or while running.
+  void Take(const std::string &path);
+
+  std::vector<std::function<void()>> on_run_;  // Callbacks added via OnRun().
+  std::shared_ptr<ThreadState> thread_state_;
+
+  std::unordered_set<std::string> taken_;  // Paths claimed through Take().
+};
+
+}  // namespace aos
diff --git a/aos/events/shm-event-loop_test.cc b/aos/events/shm-event-loop_test.cc
new file mode 100644
index 0000000..544adba
--- /dev/null
+++ b/aos/events/shm-event-loop_test.cc
@@ -0,0 +1,27 @@
+#include "aos/events/shm-event-loop.h"
+
+#include "aos/events/event-loop_param_test.h"
+#include "aos/testing/test_shm.h"
+#include "gtest/gtest.h"
+
+namespace aos {
+namespace testing {
+namespace {
+
+class ShmEventLoopTestFactory : public EventLoopTestFactory {
+ public:
+  std::unique_ptr<EventLoop> Make() override {  // Each call: a new ShmEventLoop.
+    return std::unique_ptr<EventLoop>(new ShmEventLoop());
+  }
+  // NOTE(review): presumably provides shared memory for the test; confirm.
+  ::aos::testing::TestSharedMemory my_shm_;
+};
+
+INSTANTIATE_TEST_CASE_P(ShmEventLoopTest, AbstractEventLoopTest,
+                        ::testing::Values([]() {
+                          return new ShmEventLoopTestFactory();
+                        }));  // The fixture takes ownership of the factory.
+
+}  // namespace
+}  // namespace testing
+}  // namespace aos