Prototype event-loop.h for queue dependency injection.

Change-Id: I1af3f71a5b0cca741641f872f55dab10474ee064
diff --git a/aos/events/shm-event-loop.h b/aos/events/shm-event-loop.h
new file mode 100644
index 0000000..9ee7926
--- /dev/null
+++ b/aos/events/shm-event-loop.h
@@ -0,0 +1,76 @@
+#include <unordered_set>
+#include <vector>
+#include "aos/common/condition.h"
+#include "aos/common/mutex.h"
+#include "aos/events/event-loop.h"
+
+namespace aos {
+namespace internal {
+
+class WatcherThreadState;
+
+}  // namespace internal
+
+// Specialization of EventLoop that is build from queues running out of shared
+// memory. See more details at aos/common/queue.h
+class ShmEventLoop : public EventLoop {
+ public:
+  ShmEventLoop();
+  ~ShmEventLoop() override;
+
+  ::aos::monotonic_clock::time_point monotonic_now() override {
+    return ::aos::monotonic_clock::now();
+  }
+
+  std::unique_ptr<RawSender> MakeRawSender(const std::string &path,
+                                           const QueueTypeInfo &type) override;
+  std::unique_ptr<RawFetcher> MakeRawFetcher(
+      const std::string &path, const QueueTypeInfo &type) override;
+
+  void MakeRawWatcher(
+      const std::string &path, const QueueTypeInfo &type,
+      std::function<void(const aos::Message *message)> watcher) override;
+
+  void OnRun(std::function<void()> on_run) override;
+  void Run() override;
+  void Exit() override;
+
+ private:
+  friend class internal::WatcherThreadState;
+  // This ThreadState ensures that two watchers in the same loop cannot be
+  // triggered concurrently.  Because watchers block threads indefinitely, this
+  // has to be shared_ptr in case the EventLoop is destroyed before the thread
+  // receives any new events.
+  class ThreadState {
+   public:
+    void WaitForStart();
+
+    bool is_running() { return loop_running_; }
+
+    void Run();
+
+    void Exit();
+
+   private:
+    friend class internal::WatcherThreadState;
+    friend class ShmEventLoop;
+
+    // This mutex ensures that only one watch event happens at a time.
+    aos::Mutex mutex_;
+    // Block on this until the loop starts.
+    aos::Condition loop_running_cond_{&mutex_};
+    // Used to notify watchers that the loop is done.
+    std::atomic<bool> loop_running_{false};
+    bool loop_finished_ = false;
+  };
+
+  // Exclude multiple of the same type for path.
+  void Take(const std::string &path);
+
+  std::vector<std::function<void()>> on_run_;
+  std::shared_ptr<ThreadState> thread_state_;
+
+  std::unordered_set<std::string> taken_;
+};
+
+}  // namespace aos