Convert aos over to flatbuffers

Everything builds, and all the tests pass.  I suspect that some entries
are missing from the config files, but those will be found pretty
quickly on startup.

There is no logging or live introspection of queue messages.

Change-Id: I496ee01ed68f202c7851bed7e8786cee30df29f5
diff --git a/aos/events/BUILD b/aos/events/BUILD
index b69fc66..17ed688 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -1,3 +1,19 @@
+load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+
+package(default_visibility = ["//visibility:public"])
+
+flatbuffer_cc_library(
+    name = "test_message_fbs",
+    srcs = ["test_message.fbs"],
+    gen_reflections = 1,
+)
+
+flatbuffer_cc_library(
+    name = "pingpong_fbs",
+    srcs = ["pingpong.fbs"],
+    gen_reflections = 1,
+)
+
 cc_library(
     name = "epoll",
     srcs = ["epoll.cc"],
@@ -10,72 +26,97 @@
 )
 
 cc_library(
-    name = "raw-event-loop",
-    hdrs = ["raw-event-loop.h"],
-    deps = [
-        "//aos:queues",
-        "//aos/time",
-    ],
-)
-
-cc_library(
-    name = "event-loop",
-    srcs = ["event-loop-tmpl.h"],
+    name = "event_loop",
+    srcs = ["event_loop_tmpl.h"],
     hdrs = [
-        "event-loop.h",
-        "raw-event-loop.h",
+        "event_loop.h",
     ],
     visibility = ["//visibility:public"],
     deps = [
-        ":raw-event-loop",
-        "//aos:queues",
+        "//aos:configuration",
+        "//aos:configuration_fbs",
+        "//aos:flatbuffers",
         "//aos/time",
+        "@com_github_google_flatbuffers//:flatbuffers",
+    ],
+)
+
+cc_binary(
+    name = "ping",
+    srcs = [
+        "ping.cc",
+    ],
+    data = ["config.fb.json"],
+    deps = [
+        ":pingpong_fbs",
+        ":shm_event_loop",
+        "//aos:configuration",
+        "//aos:init",
+        "//aos:json_to_flatbuffer",
+        "@com_github_google_glog//:glog",
+    ],
+)
+
+cc_binary(
+    name = "pong",
+    srcs = [
+        "pong.cc",
+    ],
+    data = ["config.fb.json"],
+    deps = [
+        ":pingpong_fbs",
+        ":shm_event_loop",
+        "//aos:configuration",
+        "//aos:init",
+        "//aos:json_to_flatbuffer",
+        "@com_github_google_glog//:glog",
     ],
 )
 
 cc_library(
-    name = "shm-event-loop",
-    srcs = ["shm-event-loop.cc"],
-    hdrs = ["shm-event-loop.h"],
+    name = "shm_event_loop",
+    srcs = ["shm_event_loop.cc"],
+    hdrs = ["shm_event_loop.h"],
     visibility = ["//visibility:public"],
     deps = [
         ":epoll",
-        ":event-loop",
-        "//aos:init",
-        "//aos:queues",
-        "//aos/logging",
+        ":event_loop",
+        ":test_message_fbs",
+        "//aos:realtime",
+        "//aos/ipc_lib:lockless_queue",
+        "//aos/ipc_lib:signalfd",
         "//aos/util:phased_loop",
     ],
 )
 
 cc_test(
-    name = "shm-event-loop_test",
-    srcs = ["shm-event-loop_test.cc"],
+    name = "shm_event_loop_test",
+    srcs = ["shm_event_loop_test.cc"],
     shard_count = 5,
     deps = [
-        ":event-loop_param_test",
-        ":shm-event-loop",
-        "//aos/testing:test_shm",
+        ":event_loop_param_test",
+        ":shm_event_loop",
+        ":test_message_fbs",
     ],
 )
 
 cc_library(
-    name = "event-loop_param_test",
+    name = "event_loop_param_test",
     testonly = True,
-    srcs = ["event-loop_param_test.cc"],
-    hdrs = ["event-loop_param_test.h"],
+    srcs = ["event_loop_param_test.cc"],
+    hdrs = ["event_loop_param_test.h"],
     deps = [
-        ":event-loop",
-        "//aos/logging:queue_logging",
+        ":event_loop",
+        ":test_message_fbs",
         "//aos/testing:googletest",
     ],
 )
 
 cc_test(
     name = "simulated_event_loop_test",
-    srcs = ["simulated-event-loop_test.cc"],
+    srcs = ["simulated_event_loop_test.cc"],
     deps = [
-        ":event-loop_param_test",
+        ":event_loop_param_test",
         ":simulated_event_loop",
         "//aos/testing:googletest",
     ],
@@ -84,14 +125,19 @@
 cc_library(
     name = "simulated_event_loop",
     testonly = True,
-    srcs = ["simulated-event-loop.cc"],
-    hdrs = ["simulated-event-loop.h"],
+    srcs = [
+        "event_scheduler.cc",
+        "simulated_event_loop.cc",
+    ],
+    hdrs = [
+        "event_scheduler.h",
+        "simulated_event_loop.h",
+    ],
     visibility = ["//visibility:public"],
     deps = [
-        ":event-loop",
-        "//aos:queues",
-        "//aos/logging",
-        "//aos/testing:test_logging",
+        ":event_loop",
+        "//aos/ipc_lib:index",
         "//aos/util:phased_loop",
+        "@com_google_absl//absl/container:btree",
     ],
 )
diff --git a/aos/events/config.fb.json b/aos/events/config.fb.json
new file mode 100644
index 0000000..4068243
--- /dev/null
+++ b/aos/events/config.fb.json
@@ -0,0 +1,12 @@
+{
+  "channels": [
+    {
+      "name": "/test",
+      "type": "aos.examples.Ping"
+    },
+    {
+      "name": "/test",
+      "type": "aos.examples.Pong"
+    }
+  ]
+}
diff --git a/aos/events/epoll.h b/aos/events/epoll.h
index f6ebe95..0f335e5 100644
--- a/aos/events/epoll.h
+++ b/aos/events/epoll.h
@@ -6,6 +6,7 @@
 #include <sys/timerfd.h>
 #include <unistd.h>
 #include <atomic>
+#include <functional>
 #include <vector>
 
 #include "aos/time/time.h"
diff --git a/aos/events/event-loop-tmpl.h b/aos/events/event-loop-tmpl.h
deleted file mode 100644
index 4c53be3..0000000
--- a/aos/events/event-loop-tmpl.h
+++ /dev/null
@@ -1,40 +0,0 @@
-#ifndef _AOS_EVENTS_EVENT_LOOP_TMPL_H_
-#define _AOS_EVENTS_EVENT_LOOP_TMPL_H_
-
-#include <type_traits>
-#include "aos/events/event-loop.h"
-
-namespace aos {
-
-// From a watch functor, this will extract the message type of the argument.
-// This is the template forward declaration, and it extracts the call operator
-// as a PTMF to be used by the following specialization.
-template <class T>
-struct watch_message_type_trait
-    : watch_message_type_trait<decltype(&T::operator())> {};
-
-// From a watch functor, this will extract the message type of the argument.
-// This is the template specialization.
-template <class ClassType, class ReturnType, class A1>
-struct watch_message_type_trait<ReturnType (ClassType::*)(A1) const> {
-  using message_type = typename std::decay<A1>::type;
-};
-
-template <typename T>
-typename Sender<T>::Message Sender<T>::MakeMessage() {
-  return Message(sender_.get());
-}
-
-template <typename Watch>
-void EventLoop::MakeWatcher(const std::string &path, Watch &&w) {
-  using T = typename watch_message_type_trait<Watch>::message_type;
-
-  return MakeRawWatcher(path, QueueTypeInfo::Get<T>(),
-                        [w](const Message *message) {
-                          w(*reinterpret_cast<const T *>(message));
-                        });
-}
-
-}  // namespace aos
-
-#endif  // _AOS_EVENTS_EVENT_LOOP_TMPL_H
diff --git a/aos/events/event-loop.h b/aos/events/event-loop.h
deleted file mode 100644
index 4d6fda5..0000000
--- a/aos/events/event-loop.h
+++ /dev/null
@@ -1,143 +0,0 @@
-#ifndef _AOS_EVENTS_EVENT_LOOP_H_
-#define _AOS_EVENTS_EVENT_LOOP_H_
-
-#include <string>
-#include "aos/queue.h"
-#include "aos/time/time.h"
-#include "aos/events/raw-event-loop.h"
-
-namespace aos {
-
-// Fetches the newest message from a queue.
-template <typename T>
-class Fetcher {
- public:
-  Fetcher() {}
-  // Fetches the next message. Returns true if it fetched a new message.  This
-  // method will only return messages sent after the Fetcher was created.
-  bool FetchNext() { return fetcher_->FetchNext(); }
-  // Fetches the most recent message. Returns true if it fetched a new message.
-  // This will return the latest message regardless of if it was sent before or
-  // after the fetcher was created.
-  bool Fetch() { return fetcher_->Fetch(); }
-
-  const T *get() const {
-    return reinterpret_cast<const T *>(fetcher_->most_recent());
-  }
-  const T &operator*() const { return *get(); }
-  const T *operator->() const { return get(); }
-
- private:
-  friend class EventLoop;
-  Fetcher(::std::unique_ptr<RawFetcher> fetcher) : fetcher_(::std::move(fetcher)) {}
-  ::std::unique_ptr<RawFetcher> fetcher_;
-};
-
-// Sends messages to a queue.
-template <typename T>
-class Sender {
- public:
-  typedef T Type;
-
-  Sender() {}
-
-  // Represents a single message about to be sent to the queue.
-  // The lifecycle goes:
-  //
-  // Message msg = sender.MakeMessage();
-  // Populate(msg.get());
-  // msg.Send();
-  //
-  // Or:
-  //
-  // Message msg = sender.MakeMessage();
-  // PopulateOrNot(msg.get());
-  class Message {
-   public:
-    Message(RawSender *sender)
-        : msg_(reinterpret_cast<T *>(sender->GetMessage()), *sender) {
-      msg_->Zero();
-    }
-
-    T *get() { return msg_.get(); }
-    const T *get() const { return msg_.get(); }
-    T &operator*() { return *get(); }
-    T *operator->() { return get(); }
-    const T &operator*() const { return *get(); }
-    const T *operator->() const { return get(); }
-
-    // Sends the message to the queue. Should only be called once.  Returns true
-    // if the message was successfully sent, and false otherwise.
-    bool Send() {
-      RawSender *sender = &msg_.get_deleter();
-      return sender->Send(msg_.release());
-    }
-
-   private:
-    std::unique_ptr<T, RawSender &> msg_;
-  };
-
-  // Constructs an above message.
-  Message MakeMessage();
-
-  // Returns the name of the underlying queue.
-  const char *name() const { return sender_->name(); }
-
- private:
-  friend class EventLoop;
-  Sender(::std::unique_ptr<RawSender> sender) : sender_(::std::move(sender)) {}
-  ::std::unique_ptr<RawSender> sender_;
-};
-
-// TODO(parker): Consider making EventLoop wrap a RawEventLoop rather than
-// inheriting.
-class EventLoop : public RawEventLoop {
- public:
-  virtual ~EventLoop() {}
-
-  // Current time.
-  virtual monotonic_clock::time_point monotonic_now() = 0;
-
-  // Note, it is supported to create:
-  //   multiple fetchers, and (one sender or one watcher) per <path, type>
-  //   tuple.
-
-  // Makes a class that will always fetch the most recent value
-  // sent to path.
-  template <typename T>
-  Fetcher<T> MakeFetcher(const ::std::string &path) {
-    return Fetcher<T>(MakeRawFetcher(path, QueueTypeInfo::Get<T>()));
-  }
-
-  // Makes class that allows constructing and sending messages to
-  // address path.
-  template <typename T>
-  Sender<T> MakeSender(const ::std::string &path) {
-    return Sender<T>(MakeRawSender(path, QueueTypeInfo::Get<T>()));
-  }
-
-  // Watch is a functor that have a call signature like so:
-  // void Event(const MessageType& type);
-  //
-  // This will watch messages sent to path.
-  // Note that T needs to match both send and recv side.
-  // TODO(parker): Need to support ::std::bind.  For now, use lambdas.
-  template <typename Watch>
-  void MakeWatcher(const ::std::string &path, Watch &&w);
-
-  // The passed in function will be called when the event loop starts.
-  // Use this to run code once the thread goes into "real-time-mode",
-  virtual void OnRun(::std::function<void()>) = 0;
-
-  // TODO(austin): OnExit
-
-  // Sets the scheduler priority to run the event loop at.  This may not be
-  // called after we go into "real-time-mode".
-  virtual void SetRuntimeRealtimePriority(int priority) = 0;
-};
-
-}  // namespace aos
-
-#include "aos/events/event-loop-tmpl.h"
-
-#endif  // _AOS_EVENTS_EVENT_LOOP_H
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
new file mode 100644
index 0000000..f614517
--- /dev/null
+++ b/aos/events/event_loop.h
@@ -0,0 +1,328 @@
+#ifndef AOS_EVENTS_EVENT_LOOP_H_
+#define AOS_EVENTS_EVENT_LOOP_H_
+
+#include <atomic>
+#include <string>
+
+#include "absl/strings/string_view.h"
+#include "aos/configuration.h"
+#include "aos/configuration_generated.h"
+#include "aos/flatbuffers.h"
+#include "aos/json_to_flatbuffer.h"
+#include "aos/time/time.h"
+#include "flatbuffers/flatbuffers.h"
+#include "glog/logging.h"
+
+namespace aos {
+
+// Struct available on Watchers and Fetchers with context about the current
+// message.
+struct Context {
+  // Time that the message was sent.
+  monotonic_clock::time_point monotonic_sent_time;
+  realtime_clock::time_point realtime_sent_time;
+  // Index in the queue.
+  uint32_t queue_index;
+  // Size of the data sent.
+  size_t size;
+  // Pointer to the data.
+  void *data;
+};
+
+// Raw version of fetcher. Contains a local variable that the fetcher will
+// update.  This is used for reflection and as an interface to implement typed
+// fetchers.
+class RawFetcher {
+ public:
+  RawFetcher() {}
+  virtual ~RawFetcher() {}
+
+  // Non-blocking fetch of the next message in the queue. Returns true if there
+  // was a new message and we got it.
+  virtual bool FetchNext() = 0;
+
+  // Non-blocking fetch of the latest message:
+  virtual bool Fetch() = 0;
+
+  // Returns a pointer to data in the most recent message, or nullptr if there
+  // is no message.
+  const void *most_recent_data() const { return data_; }
+
+  const Context &context() const { return context_; }
+
+ protected:
+  RawFetcher(const RawFetcher &) = delete;
+  RawFetcher &operator=(const RawFetcher &) = delete;
+
+  void *data_ = nullptr;
+  Context context_;
+};
+
+// Raw version of sender.  Sends a block of data.  This is used for reflection
+// and as a building block to implement typed senders.
+class RawSender {
+ public:
+  RawSender() {}
+  virtual ~RawSender() {}
+
+  // Sends a message without copying it.  The users starts by copying up to
+  // size() bytes into the data backed by data().  They then call Send to send.
+  // Returns true on a successful send.
+  virtual void *data() = 0;
+  virtual size_t size() = 0;
+  virtual bool Send(size_t size) = 0;
+
+  // Sends a single block of data by copying it.
+  virtual bool Send(void *data, size_t size) = 0;
+
+  // Returns the name of this sender.
+  virtual const absl::string_view name() const = 0;
+
+ protected:
+  RawSender(const RawSender &) = delete;
+  RawSender &operator=(const RawSender &) = delete;
+};
+
+
+// Fetches the newest message from a channel.
+// This provides a polling based interface for channels.
+template <typename T>
+class Fetcher {
+ public:
+  Fetcher() {}
+
+  // Fetches the next message. Returns true if it fetched a new message.  This
+  // method will only return messages sent after the Fetcher was created.
+  bool FetchNext() { return fetcher_->FetchNext(); }
+
+  // Fetches the most recent message. Returns true if it fetched a new message.
+  // This will return the latest message regardless of if it was sent before or
+  // after the fetcher was created.
+  bool Fetch() { return fetcher_->Fetch(); }
+
+  // Returns a pointer to the contained flatbuffer, or nullptr if there is no
+  // available message.
+  const T *get() const {
+    return fetcher_->most_recent_data() != nullptr
+               ? flatbuffers::GetRoot<T>(reinterpret_cast<const char *>(
+                     fetcher_->most_recent_data()))
+               : nullptr;
+  }
+
+  // Returns the context holding timestamps and other metadata about the
+  // message.
+  const Context &context() const { return fetcher_->context(); }
+
+  const T &operator*() const { return *get(); }
+  const T *operator->() const { return get(); }
+
+ private:
+  friend class EventLoop;
+  Fetcher(::std::unique_ptr<RawFetcher> fetcher)
+      : fetcher_(::std::move(fetcher)) {}
+  ::std::unique_ptr<RawFetcher> fetcher_;
+};
+
+// Sends messages to a channel.
+template <typename T>
+class Sender {
+ public:
+  Sender() {}
+
+  // Represents a single message about to be sent to the queue.
+  // The lifecycle goes:
+  //
+  // Builder builder = sender.MakeBuilder();
+  // T::Builder t_builder = builder.MakeBuilder<T>();
+  // Populate(&t_builder);
+  // builder.Send(t_builder.Finish());
+  class Builder {
+   public:
+    Builder(RawSender *sender, void *data, size_t size)
+        : alloc_(data, size), fbb_(size, &alloc_), sender_(sender) {
+      fbb_.ForceDefaults(1);
+    }
+
+    flatbuffers::FlatBufferBuilder *fbb() { return &fbb_; }
+
+    template <typename T2>
+    typename T2::Builder MakeBuilder() {
+      return typename T2::Builder(fbb_);
+    }
+
+    bool Send(flatbuffers::Offset<T> offset) {
+      fbb_.Finish(offset);
+      return sender_->Send(fbb_.GetSize());
+    }
+
+    // CHECKs that this message was sent.
+    void CheckSent() { fbb_.Finished(); }
+
+   private:
+    PreallocatedAllocator alloc_;
+    flatbuffers::FlatBufferBuilder fbb_;
+    RawSender *sender_;
+  };
+
+  // Constructs an above builder.
+  Builder MakeBuilder();
+
+  // Returns the name of the underlying queue.
+  const absl::string_view name() const { return sender_->name(); }
+
+ private:
+  friend class EventLoop;
+  Sender(std::unique_ptr<RawSender> sender) : sender_(std::move(sender)) {}
+  std::unique_ptr<RawSender> sender_;
+};
+
+// Interface for timers
+class TimerHandler {
+ public:
+  virtual ~TimerHandler() {}
+
+  // Timer should sleep until base, base + offset, base + offset * 2, ...
+  // If repeat_offset isn't set, the timer only expires once.
+  virtual void Setup(monotonic_clock::time_point base,
+                     monotonic_clock::duration repeat_offset =
+                         ::aos::monotonic_clock::zero()) = 0;
+
+  // Stop future calls to callback().
+  virtual void Disable() = 0;
+};
+
+// Interface for phased loops.  They are built on timers.
+class PhasedLoopHandler {
+ public:
+  virtual ~PhasedLoopHandler() {}
+
+  // Sets the interval and offset.  Any changes to interval and offset only take
+  // effect when the handler finishes running.
+  virtual void set_interval_and_offset(
+      const monotonic_clock::duration interval,
+      const monotonic_clock::duration offset) = 0;
+};
+
+// TODO(austin): Ping pong example apps, and then start doing introspection.
+// TODO(austin): Timing reporter.  Publish statistics on latencies of
+// handlers.
+class EventLoop {
+ public:
+  EventLoop(const Configuration *configuration)
+      : configuration_(configuration) {}
+
+  virtual ~EventLoop() {}
+
+  // Current time.
+  virtual monotonic_clock::time_point monotonic_now() = 0;
+  virtual realtime_clock::time_point realtime_now() = 0;
+
+  // Note, it is supported to create:
+  //   multiple fetchers, and (one sender or one watcher) per <name, type>
+  //   tuple.
+
+  // Makes a class that will always fetch the most recent value
+  // sent to the provided channel.
+  template <typename T>
+  Fetcher<T> MakeFetcher(const absl::string_view channel_name) {
+    const Channel *channel = configuration::GetChannel(
+        configuration_, channel_name, T::GetFullyQualifiedName(), name());
+    CHECK(channel != nullptr)
+        << ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
+        << T::GetFullyQualifiedName() << "\" } not found in config.";
+
+    return Fetcher<T>(MakeRawFetcher(channel));
+  }
+
+  // Makes class that allows constructing and sending messages to
+  // the provided channel.
+  template <typename T>
+  Sender<T> MakeSender(const absl::string_view channel_name) {
+    const Channel *channel = configuration::GetChannel(
+        configuration_, channel_name, T::GetFullyQualifiedName(), name());
+    CHECK(channel != nullptr)
+        << ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
+        << T::GetFullyQualifiedName() << "\" } not found in config.";
+
+    return Sender<T>(MakeRawSender(channel));
+  }
+
+  // This will watch messages sent to the provided channel.
+  //
+  // Watch is a functor that have a call signature like so:
+  // void Event(const MessageType& type);
+  //
+  // TODO(parker): Need to support ::std::bind.  For now, use lambdas.
+  // TODO(austin): Do we need a functor?  Or is a std::function good enough?
+  template <typename Watch>
+  void MakeWatcher(const absl::string_view name, Watch &&w);
+
+  // The passed in function will be called when the event loop starts.
+  // Use this to run code once the thread goes into "real-time-mode",
+  virtual void OnRun(::std::function<void()> on_run) = 0;
+
+  // Sets the name of the event loop.  This is the application name.
+  virtual void set_name(const absl::string_view name) = 0;
+  // Gets the name of the event loop.
+  virtual const absl::string_view name() const = 0;
+
+  // Creates a timer that executes callback when the timer expires
+  // Returns a TimerHandle for configuration of the timer
+  virtual TimerHandler *AddTimer(::std::function<void()> callback) = 0;
+
+  // Creates a timer that executes callback periodically at the specified
+  // interval and offset.  Returns a PhasedLoopHandler for interacting with the
+  // timer.
+  virtual PhasedLoopHandler *AddPhasedLoop(
+      ::std::function<void(int)> callback,
+      const monotonic_clock::duration interval,
+      const monotonic_clock::duration offset = ::std::chrono::seconds(0)) = 0;
+
+  // TODO(austin): OnExit
+
+  // Threadsafe.
+  bool is_running() const { return is_running_.load(); }
+
+  // Sets the scheduler priority to run the event loop at.  This may not be
+  // called after we go into "real-time-mode".
+  virtual void SetRuntimeRealtimePriority(int priority) = 0;
+
+  // Fetches new messages from the provided channel (path, type).  Note: this
+  // channel must be a member of the exact configuration object this was built
+  // with.
+  virtual std::unique_ptr<RawFetcher> MakeRawFetcher(
+      const Channel *channel) = 0;
+
+  // Will watch channel (name, type) for new messages
+  virtual void MakeRawWatcher(
+      const Channel *channel,
+      std::function<void(const Context &context, const void *message)>
+          watcher) = 0;
+
+  // Returns the context for the current message.
+  // TODO(austin): Fill out whatever is useful for timers.
+  const Context &context() const { return context_; }
+
+  // Returns the configuration that this event loop was built with.
+  const Configuration *configuration() const { return configuration_; }
+
+ protected:
+  void set_is_running(bool value) { is_running_.store(value); }
+
+  // Will send new messages from channel (path, type).
+  virtual std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) = 0;
+
+ private:
+  ::std::atomic<bool> is_running_{false};
+
+  // Context available for watchers.
+  Context context_;
+
+  const Configuration *configuration_;
+};
+
+}  // namespace aos
+
+#include "aos/events/event_loop_tmpl.h"
+
+#endif  // AOS_EVENTS_EVENT_LOOP_H
diff --git a/aos/events/event-loop_param_test.cc b/aos/events/event_loop_param_test.cc
similarity index 64%
rename from aos/events/event-loop_param_test.cc
rename to aos/events/event_loop_param_test.cc
index 04343cc..8a0f9b6 100644
--- a/aos/events/event-loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -1,11 +1,13 @@
-#include "aos/events/event-loop_param_test.h"
+#include "aos/events/event_loop_param_test.h"
 
 #include <chrono>
 
+#include "glog/logging.h"
 #include "gmock/gmock.h"
 #include "gtest/gtest.h"
 
 #include "glog/logging.h"
+#include "aos/events/test_message_generated.h"
 
 namespace aos {
 namespace testing {
@@ -13,39 +15,27 @@
 namespace chrono = ::std::chrono;
 }  // namespace
 
-struct TestMessage : public ::aos::Message {
-  enum { kQueueLength = 100, kHash = 0x696c0cdc };
-  int msg_value;
-
-  void Zero() {
-    ::aos::Message::Zero();
-    msg_value = 0;
-  }
-  static size_t Size() { return 1 + ::aos::Message::Size(); }
-  size_t Print(char *buffer, size_t length) const;
-  TestMessage() { Zero(); }
-};
-
 // Tests that watcher can receive messages from a sender.
 // Also tests that OnRun() works.
 TEST_P(AbstractEventLoopTest, Basic) {
   auto loop1 = Make();
   auto loop2 = MakePrimary();
 
-  auto sender = loop1->MakeSender<TestMessage>("/test");
+  aos::Sender<TestMessage> sender = loop1->MakeSender<TestMessage>("/test");
 
   bool happened = false;
 
   loop2->OnRun([&]() {
     happened = true;
 
-    auto msg = sender.MakeMessage();
-    msg->msg_value = 200;
-    msg.Send();
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(200);
+    ASSERT_TRUE(msg.Send(builder.Finish()));
   });
 
   loop2->MakeWatcher("/test", [&](const TestMessage &message) {
-    EXPECT_EQ(message.msg_value, 200);
+    EXPECT_EQ(message.value(), 200);
     this->Exit();
   });
 
@@ -67,13 +57,14 @@
 
   EXPECT_FALSE(fetcher.Fetch());
 
-  auto msg = sender.MakeMessage();
-  msg->msg_value = 200;
-  msg.Send();
+  aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+  TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+  builder.add_value(200);
+  ASSERT_TRUE(msg.Send(builder.Finish()));
 
   EXPECT_TRUE(fetcher.Fetch());
   ASSERT_FALSE(fetcher.get() == nullptr);
-  EXPECT_EQ(fetcher->msg_value, 200);
+  EXPECT_EQ(fetcher.get()->value(), 200);
 }
 
 // Tests that watcher will receive all messages sent if they are sent after
@@ -87,7 +78,7 @@
   ::std::vector<int> values;
 
   loop2->MakeWatcher("/test", [&](const TestMessage &message) {
-    values.push_back(message.msg_value);
+    values.push_back(message.value());
     if (values.size() == 2) {
       this->Exit();
     }
@@ -95,21 +86,24 @@
 
   // Before Run, should be ignored.
   {
-    auto msg = sender.MakeMessage();
-    msg->msg_value = 199;
-    msg.Send();
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(199);
+    ASSERT_TRUE(msg.Send(builder.Finish()));
   }
 
   loop2->OnRun([&]() {
     {
-      auto msg = sender.MakeMessage();
-      msg->msg_value = 200;
-      msg.Send();
+      aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+      builder.add_value(200);
+      ASSERT_TRUE(msg.Send(builder.Finish()));
     }
     {
-      auto msg = sender.MakeMessage();
-      msg->msg_value = 201;
-      msg.Send();
+      aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+      builder.add_value(201);
+      ASSERT_TRUE(msg.Send(builder.Finish()));
     }
   });
 
@@ -129,18 +123,20 @@
   ::std::vector<int> values;
 
   {
-    auto msg = sender.MakeMessage();
-    msg->msg_value = 200;
-    msg.Send();
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(200);
+    ASSERT_TRUE(msg.Send(builder.Finish()));
   }
   {
-    auto msg = sender.MakeMessage();
-    msg->msg_value = 201;
-    msg.Send();
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(201);
+    ASSERT_TRUE(msg.Send(builder.Finish()));
   }
 
   loop2->MakeWatcher("/test", [&](const TestMessage &message) {
-    values.push_back(message.msg_value);
+    values.push_back(message.value());
   });
 
   // Add a timer to actually quit.
@@ -164,20 +160,22 @@
   ::std::vector<int> values;
 
   {
-    auto msg = sender.MakeMessage();
-    msg->msg_value = 200;
-    msg.Send();
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(200);
+    ASSERT_TRUE(msg.Send(builder.Finish()));
   }
   {
-    auto msg = sender.MakeMessage();
-    msg->msg_value = 201;
-    msg.Send();
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(201);
+    ASSERT_TRUE(msg.Send(builder.Finish()));
   }
 
   // Add a timer to actually quit.
   auto test_timer = loop2->AddTimer([&fetcher, &values, this]() {
     while (fetcher.FetchNext()) {
-      values.push_back(fetcher->msg_value);
+      values.push_back(fetcher.get()->value());
     }
     this->Exit();
   });
@@ -200,14 +198,16 @@
   ::std::vector<int> values;
 
   {
-    auto msg = sender.MakeMessage();
-    msg->msg_value = 200;
-    msg.Send();
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(200);
+    ASSERT_TRUE(msg.Send(builder.Finish()));
   }
   {
-    auto msg = sender.MakeMessage();
-    msg->msg_value = 201;
-    msg.Send();
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(201);
+    ASSERT_TRUE(msg.Send(builder.Finish()));
   }
 
   auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
@@ -215,7 +215,7 @@
   // Add a timer to actually quit.
   auto test_timer = loop2->AddTimer([&fetcher, &values, this]() {
     while (fetcher.FetchNext()) {
-      values.push_back(fetcher->msg_value);
+      values.push_back(fetcher.get()->value());
     }
     this->Exit();
   });
@@ -239,14 +239,16 @@
   ::std::vector<int> values;
 
   {
-    auto msg = sender.MakeMessage();
-    msg->msg_value = 200;
-    msg.Send();
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(200);
+    ASSERT_TRUE(msg.Send(builder.Finish()));
   }
   {
-    auto msg = sender.MakeMessage();
-    msg->msg_value = 201;
-    msg.Send();
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(201);
+    ASSERT_TRUE(msg.Send(builder.Finish()));
   }
 
   auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
@@ -254,11 +256,11 @@
   // Add a timer to actually quit.
   auto test_timer = loop2->AddTimer([&fetcher, &values, this]() {
     if (fetcher.Fetch()) {
-      values.push_back(fetcher->msg_value);
+      values.push_back(fetcher.get()->value());
     }
     // Do it again to make sure we don't double fetch.
     if (fetcher.Fetch()) {
-      values.push_back(fetcher->msg_value);
+      values.push_back(fetcher.get()->value());
     }
     this->Exit();
   });
@@ -281,14 +283,16 @@
   ::std::vector<int> values;
 
   {
-    auto msg = sender.MakeMessage();
-    msg->msg_value = 200;
-    msg.Send();
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(200);
+    ASSERT_TRUE(msg.Send(builder.Finish()));
   }
   {
-    auto msg = sender.MakeMessage();
-    msg->msg_value = 201;
-    msg.Send();
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(201);
+    ASSERT_TRUE(msg.Send(builder.Finish()));
   }
 
   auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
@@ -296,31 +300,34 @@
   // Add a timer to actually quit.
   auto test_timer = loop2->AddTimer([&fetcher, &values, &sender, this]() {
     if (fetcher.Fetch()) {
-      values.push_back(fetcher->msg_value);
+      values.push_back(fetcher.get()->value());
     }
 
     {
-      auto msg = sender.MakeMessage();
-      msg->msg_value = 202;
-      msg.Send();
+      aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+      builder.add_value(202);
+      ASSERT_TRUE(msg.Send(builder.Finish()));
     }
     {
-      auto msg = sender.MakeMessage();
-      msg->msg_value = 203;
-      msg.Send();
+      aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+      builder.add_value(203);
+      ASSERT_TRUE(msg.Send(builder.Finish()));
     }
     {
-      auto msg = sender.MakeMessage();
-      msg->msg_value = 204;
-      msg.Send();
+      aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+      builder.add_value(204);
+      ASSERT_TRUE(msg.Send(builder.Finish()));
     }
 
     if (fetcher.FetchNext()) {
-      values.push_back(fetcher->msg_value);
+      values.push_back(fetcher.get()->value());
     }
 
     if (fetcher.Fetch()) {
-      values.push_back(fetcher->msg_value);
+      values.push_back(fetcher.get()->value());
     }
 
     this->Exit();
@@ -345,29 +352,31 @@
   Fetcher<TestMessage> fetcher = fetch_loop->MakeFetcher<TestMessage>("/test");
 
   {
-    auto msg = sender.MakeMessage();
-    msg->msg_value = 100;
-    ASSERT_TRUE(msg.Send());
+      aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+      builder.add_value(100);
+      ASSERT_TRUE(msg.Send(builder.Finish()));
   }
 
   {
-    auto msg = sender.MakeMessage();
-    msg->msg_value = 200;
-    ASSERT_TRUE(msg.Send());
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(200);
+    ASSERT_TRUE(msg.Send(builder.Finish()));
   }
 
   ASSERT_TRUE(fetcher.FetchNext());
   ASSERT_NE(nullptr, fetcher.get());
-  EXPECT_EQ(100, fetcher->msg_value);
+  EXPECT_EQ(100, fetcher.get()->value());
 
   ASSERT_TRUE(fetcher.FetchNext());
   ASSERT_NE(nullptr, fetcher.get());
-  EXPECT_EQ(200, fetcher->msg_value);
+  EXPECT_EQ(200, fetcher.get()->value());
 
   // When we run off the end of the queue, expect to still have the old message:
   ASSERT_FALSE(fetcher.FetchNext());
   ASSERT_NE(nullptr, fetcher.get());
-  EXPECT_EQ(200, fetcher->msg_value);
+  EXPECT_EQ(200, fetcher.get()->value());
 }
 
 // Verify that making a fetcher and watcher for "/test" succeeds.
@@ -384,6 +393,14 @@
   auto fetcher2 = loop->MakeFetcher<TestMessage>("/test");
 }
 
+// Verify that registering a watcher for an invalid channel name dies.
+TEST_P(AbstractEventLoopDeathTest, InvalidChannelName) {
+  auto loop = Make();
+  EXPECT_DEATH(
+      { loop->MakeWatcher("/test/invalid", [&](const TestMessage &) {}); },
+      "/test/invalid");
+}
+
 // Verify that registering a watcher twice for "/test" fails.
 TEST_P(AbstractEventLoopDeathTest, TwoWatcher) {
   auto loop = Make();
@@ -438,16 +455,17 @@
 
   loop2->MakeWatcher("/test1", [&](const TestMessage &) {});
   loop2->MakeWatcher("/test2", [&](const TestMessage &message) {
-    EXPECT_EQ(message.msg_value, 200);
+    EXPECT_EQ(message.value(), 200);
     this->Exit();
   });
 
   auto sender = loop1->MakeSender<TestMessage>("/test2");
 
   loop2->OnRun([&]() {
-    auto msg = sender.MakeMessage();
-    msg->msg_value = 200;
-    msg.Send();
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(200);
+    ASSERT_TRUE(msg.Send(builder.Finish()));
   });
 
   Run();
@@ -525,9 +543,18 @@
   auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
 
   auto test_timer = loop1->AddTimer([&sender]() {
-    auto msg = sender.MakeMessage();
-    msg->msg_value = 200;
-    msg.Send();
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(200);
+    ASSERT_TRUE(msg.Send(builder.Finish()));
+  });
+
+  loop2->MakeWatcher("/test", [&loop2](const TestMessage &msg) {
+    // Confirm that the data pointer makes sense from a watcher.
+    EXPECT_GT(&msg, loop2->context().data);
+    EXPECT_LT(&msg, reinterpret_cast<void *>(
+                        reinterpret_cast<char *>(loop2->context().data) +
+                        loop2->context().size));
   });
 
   test_timer->Setup(loop1->monotonic_now() + ::std::chrono::seconds(1));
@@ -537,15 +564,36 @@
 
   EXPECT_TRUE(fetcher.Fetch());
 
-  monotonic_clock::duration time_offset =
-      fetcher->sent_time - (loop1->monotonic_now() - ::std::chrono::seconds(1));
+  monotonic_clock::duration monotonic_time_offset =
+      fetcher.context().monotonic_sent_time -
+      (loop1->monotonic_now() - ::std::chrono::seconds(1));
+  realtime_clock::duration realtime_time_offset =
+      fetcher.context().realtime_sent_time -
+      (loop1->realtime_now() - ::std::chrono::seconds(1));
 
-  EXPECT_TRUE(time_offset > ::std::chrono::milliseconds(-500))
-      << ": Got " << fetcher->sent_time.time_since_epoch().count()
+  EXPECT_TRUE(monotonic_time_offset > ::std::chrono::milliseconds(-500))
+      << ": Got "
+      << fetcher.context().monotonic_sent_time.time_since_epoch().count()
       << " expected " << loop1->monotonic_now().time_since_epoch().count();
-  EXPECT_TRUE(time_offset < ::std::chrono::milliseconds(500))
-      << ": Got " << fetcher->sent_time.time_since_epoch().count()
+  // Confirm that the data pointer makes sense.
+  EXPECT_GT(fetcher.get(), fetcher.context().data);
+  EXPECT_LT(fetcher.get(),
+            reinterpret_cast<void *>(
+                reinterpret_cast<char *>(fetcher.context().data) +
+                fetcher.context().size));
+  EXPECT_TRUE(monotonic_time_offset < ::std::chrono::milliseconds(500))
+      << ": Got "
+      << fetcher.context().monotonic_sent_time.time_since_epoch().count()
       << " expected " << loop1->monotonic_now().time_since_epoch().count();
+
+  EXPECT_TRUE(realtime_time_offset > ::std::chrono::milliseconds(-500))
+      << ": Got "
+      << fetcher.context().realtime_sent_time.time_since_epoch().count()
+      << " expected " << loop1->realtime_now().time_since_epoch().count();
+  EXPECT_TRUE(realtime_time_offset < ::std::chrono::milliseconds(500))
+      << ": Got "
+      << fetcher.context().realtime_sent_time.time_since_epoch().count()
+      << " expected " << loop1->realtime_now().time_since_epoch().count();
 }
 
 // Tests that a couple phased loops run in a row result in the correct offset
@@ -612,41 +660,5 @@
               1.0, 0.1);
 }
 
-// Verify that sending lots and lots of messages and using FetchNext gets a
-// contiguous block of messages and doesn't crash.
-// TODO(austin): We should store the same number of messages in simulation and
-// reality.
-TEST_P(AbstractEventLoopTest, LotsOfSends) {
-  auto loop1 = MakePrimary();
-  auto loop2 = Make();
-  auto sender = loop1->MakeSender<TestMessage>("/test");
-  auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
-
-  auto test_timer = loop1->AddTimer([&sender, &fetcher, this]() {
-    for (int i = 0; i < 100000; ++i) {
-      auto msg = sender.MakeMessage();
-      msg->msg_value = i;
-      msg.Send();
-    }
-
-    int last = 0;
-    if (fetcher.FetchNext()) {
-      last = fetcher->msg_value;
-    }
-    while (fetcher.FetchNext()) {
-      EXPECT_EQ(last + 1, fetcher->msg_value);
-      ++last;
-    }
-
-    this->Exit();
-  });
-
-  loop1->OnRun([&test_timer, &loop1]() {
-    test_timer->Setup(loop1->monotonic_now() + ::std::chrono::milliseconds(10));
-  });
-
-  Run();
-}
-
 }  // namespace testing
 }  // namespace aos
diff --git a/aos/events/event-loop_param_test.h b/aos/events/event_loop_param_test.h
similarity index 61%
rename from aos/events/event-loop_param_test.h
rename to aos/events/event_loop_param_test.h
index 83f0b37..fb08ac3 100644
--- a/aos/events/event-loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -3,7 +3,9 @@
 
 #include <vector>
 
-#include "aos/events/event-loop.h"
+#include "aos/events/event_loop.h"
+#include "aos/flatbuffers.h"
+#include "aos/json_to_flatbuffer.h"
 #include "gtest/gtest.h"
 
 namespace aos {
@@ -11,6 +13,25 @@
 
 class EventLoopTestFactory {
  public:
+  EventLoopTestFactory()
+      : flatbuffer_(JsonToFlatbuffer("{\n"
+                                     "  \"channels\": [ \n"
+                                     "    {\n"
+                                     "      \"name\": \"/test\",\n"
+                                     "      \"type\": \"aos.TestMessage\"\n"
+                                     "    },\n"
+                                     "    {\n"
+                                     "      \"name\": \"/test1\",\n"
+                                     "      \"type\": \"aos.TestMessage\"\n"
+                                     "    },\n"
+                                     "    {\n"
+                                     "      \"name\": \"/test2\",\n"
+                                     "      \"type\": \"aos.TestMessage\"\n"
+                                     "    }\n"
+                                     "  ]\n"
+                                     "}\n",
+                                     Configuration::MiniReflectTypeTable())) {}
+
   virtual ~EventLoopTestFactory() {}
 
   // Makes a connected event loop.
@@ -27,6 +48,11 @@
 
   // Advances time by sleeping.  Can't be called from inside a loop.
   virtual void SleepFor(::std::chrono::nanoseconds duration) = 0;
+
+  const Configuration *configuration() { return &flatbuffer_.message(); }
+
+ private:
+  FlatbufferDetachedBuffer<Configuration> flatbuffer_;
 };
 
 class AbstractEventLoopTestBase
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
new file mode 100644
index 0000000..9910d4d
--- /dev/null
+++ b/aos/events/event_loop_tmpl.h
@@ -0,0 +1,48 @@
+#ifndef AOS_EVENTS_EVENT_LOOP_TMPL_H_
+#define AOS_EVENTS_EVENT_LOOP_TMPL_H_
+
+#include <type_traits>
+#include "aos/events/event_loop.h"
+#include "glog/logging.h"
+
+namespace aos {
+
+// From a watch functor, this will extract the message type of the argument.
+// This is the template forward declaration, and it extracts the call operator
+// as a PTMF to be used by the following specialization.
+template <class T>
+struct watch_message_type_trait
+    : watch_message_type_trait<decltype(&T::operator())> {};
+
+// From a watch functor, this will extract the message type of the argument.
+// This is the template specialization.
+template <class ClassType, class ReturnType, class A1>
+struct watch_message_type_trait<ReturnType (ClassType::*)(A1) const> {
+  using message_type = typename std::decay<A1>::type;
+};
+
+template <typename T>
+typename Sender<T>::Builder Sender<T>::MakeBuilder() {
+  return Builder(sender_.get(), sender_->data(), sender_->size());
+}
+
+template <typename Watch>
+void EventLoop::MakeWatcher(const absl::string_view channel_name, Watch &&w) {
+  using T = typename watch_message_type_trait<Watch>::message_type;
+  const Channel *channel = configuration::GetChannel(
+      configuration_, channel_name, T::GetFullyQualifiedName(), name());
+
+  CHECK(channel != nullptr)
+      << ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
+      << T::GetFullyQualifiedName() << "\" } not found in config.";
+
+  return MakeRawWatcher(
+      channel, [this, w](const Context &context, const void *message) {
+        context_ = context;
+        w(*flatbuffers::GetRoot<T>(reinterpret_cast<const char *>(message)));
+      });
+}
+
+}  // namespace aos
+
+#endif  // AOS_EVENTS_EVENT_LOOP_TMPL_H
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
new file mode 100644
index 0000000..a4cfa72
--- /dev/null
+++ b/aos/events/event_scheduler.cc
@@ -0,0 +1,48 @@
+#include "aos/events/event_scheduler.h"
+
+#include <algorithm>
+#include <deque>
+
+#include "aos/events/event_loop.h"
+
+namespace aos {
+
+EventScheduler::Token EventScheduler::Schedule(
+    ::aos::monotonic_clock::time_point time, ::std::function<void()> callback) {
+  return events_list_.emplace(time, callback);
+}
+
+void EventScheduler::Deschedule(EventScheduler::Token token) {
+  events_list_.erase(token);
+}
+
+void EventScheduler::RunFor(monotonic_clock::duration duration) {
+  const ::aos::monotonic_clock::time_point end_time =
+      monotonic_now() + duration;
+  is_running_ = true;
+  while (!events_list_.empty() && is_running_) {
+    auto iter = events_list_.begin();
+    ::aos::monotonic_clock::time_point next_time = iter->first;
+    if (next_time > end_time) {
+      break;
+    }
+    now_ = iter->first;
+    ::std::function<void()> callback = ::std::move(iter->second);
+    events_list_.erase(iter);
+    callback();
+  }
+  now_ = end_time;
+}
+
+void EventScheduler::Run() {
+  is_running_ = true;
+  while (!events_list_.empty() && is_running_) {
+    auto iter = events_list_.begin();
+    now_ = iter->first;
+    ::std::function<void()> callback = ::std::move(iter->second);
+    events_list_.erase(iter);
+    callback();
+  }
+}
+
+}  // namespace aos
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
new file mode 100644
index 0000000..66d34b3
--- /dev/null
+++ b/aos/events/event_scheduler.h
@@ -0,0 +1,61 @@
+#ifndef AOS_EVENTS_EVENT_SCHEDULER_H_
+#define AOS_EVENTS_EVENT_SCHEDULER_H_
+
+#include <algorithm>
+#include <map>
+#include <memory>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "aos/events/event_loop.h"
+#include "aos/time/time.h"
+#include "glog/logging.h"
+
+namespace aos {
+
+class EventScheduler {
+ public:
+  using ChannelType =
+      std::multimap<monotonic_clock::time_point, std::function<void()>>;
+  using Token = ChannelType::iterator;
+
+  // Schedule an event with a callback function
+  // Returns an iterator to the event
+  Token Schedule(monotonic_clock::time_point time,
+                 std::function<void()> callback);
+
+  Token InvalidToken() { return events_list_.end(); }
+
+  // Deschedule an event by its iterator
+  void Deschedule(Token token);
+
+  // Runs until exited.
+  void Run();
+  // Runs for a duration.
+  void RunFor(monotonic_clock::duration duration);
+
+  void Exit() { is_running_ = false; }
+
+  bool is_running() const { return is_running_; }
+
+  monotonic_clock::time_point monotonic_now() const { return now_; }
+  realtime_clock::time_point realtime_now() const {
+    // TODO(austin): Make this all configurable...
+    return realtime_clock::epoch() + now_.time_since_epoch() +
+           std::chrono::seconds(1000000);
+  }
+
+ private:
+  // Current execution time.
+  monotonic_clock::time_point now_ = monotonic_clock::epoch();
+
+  // Multimap holding times to run functions.  These are stored in order, and
+  // the order is the callback tree.
+  ChannelType events_list_;
+  bool is_running_ = false;
+};
+
+}  // namespace aos
+
+#endif  // AOS_EVENTS_EVENT_SCHEDULER_H_
diff --git a/aos/events/ping.cc b/aos/events/ping.cc
new file mode 100644
index 0000000..b904cb2
--- /dev/null
+++ b/aos/events/ping.cc
@@ -0,0 +1,88 @@
+#include <chrono>
+
+#include "aos/configuration.h"
+#include "aos/events/pingpong_generated.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/init.h"
+#include "aos/json_to_flatbuffer.h"
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+DEFINE_int32(sleep_ms, 10, "Time to sleep between pings");
+
+namespace aos {
+
+namespace chrono = std::chrono;
+
+class Ping {
+ public:
+  Ping(EventLoop *event_loop)
+      : event_loop_(event_loop),
+        sender_(event_loop_->MakeSender<examples::Ping>("/test")) {
+    timer_handle_ = event_loop_->AddTimer([this]() { SendPing(); });
+
+    event_loop_->MakeWatcher(
+        "/test", [this](const examples::Pong &pong) { HandlePong(pong); });
+
+    event_loop_->OnRun([this]() {
+      timer_handle_->Setup(event_loop_->monotonic_now(),
+                           chrono::milliseconds(FLAGS_sleep_ms));
+    });
+
+    event_loop_->SetRuntimeRealtimePriority(5);
+  }
+
+  void SendPing() {
+    ++count_;
+    aos::Sender<examples::Ping>::Builder msg = sender_.MakeBuilder();
+    examples::Ping::Builder builder = msg.MakeBuilder<examples::Ping>();
+    builder.add_value(count_);
+    builder.add_send_time(
+        event_loop_->monotonic_now().time_since_epoch().count());
+    CHECK(msg.Send(builder.Finish()));
+    VLOG(2) << "Sending ping";
+  }
+
+  void HandlePong(const examples::Pong &pong) {
+    const aos::monotonic_clock::time_point monotonic_send_time(
+        chrono::nanoseconds(pong.initial_send_time()));
+    const aos::monotonic_clock::time_point monotonic_now =
+        event_loop_->monotonic_now();
+
+    const chrono::nanoseconds round_trip_time =
+        monotonic_now - monotonic_send_time;
+
+    if (pong.value() == count_) {
+      VLOG(1) << "Elapsed time " << round_trip_time.count() << " ns "
+              << FlatbufferToJson(&pong);
+    } else {
+      VLOG(1) << "Missmatched pong message";
+    }
+  }
+
+ private:
+  EventLoop *event_loop_;
+  aos::Sender<examples::Ping> sender_;
+  TimerHandler *timer_handle_;
+  int count_ = 0;
+};
+
+}  // namespace aos
+
+int main(int argc, char **argv) {
+  FLAGS_logtostderr = true;
+  google::InitGoogleLogging(argv[0]);
+  ::gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig("aos/events/config.fb.json");
+
+  ::aos::ShmEventLoop event_loop(&config.message());
+
+  aos::Ping ping(&event_loop);
+
+  event_loop.Run();
+
+  ::aos::Cleanup();
+  return 0;
+}
diff --git a/aos/events/pingpong.fbs b/aos/events/pingpong.fbs
new file mode 100644
index 0000000..67e5015
--- /dev/null
+++ b/aos/events/pingpong.fbs
@@ -0,0 +1,14 @@
+namespace aos.examples;
+
+table Ping {
+  value:int;
+  send_time:long;
+}
+
+table Pong {
+  value:int;
+  initial_send_time:long;
+}
+
+root_type Ping;
+root_type Pong;
diff --git a/aos/events/pong.cc b/aos/events/pong.cc
new file mode 100644
index 0000000..1504bfb
--- /dev/null
+++ b/aos/events/pong.cc
@@ -0,0 +1,54 @@
+#include <chrono>
+
+#include "aos/configuration.h"
+#include "aos/events/pingpong_generated.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/init.h"
+#include "aos/json_to_flatbuffer.h"
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace aos {
+
+namespace chrono = std::chrono;
+
+class Pong {
+ public:
+  Pong(EventLoop *event_loop)
+      : event_loop_(event_loop),
+        sender_(event_loop_->MakeSender<examples::Pong>("/test")) {
+    event_loop_->MakeWatcher("/test", [this](const examples::Ping &ping) {
+      aos::Sender<examples::Pong>::Builder msg = sender_.MakeBuilder();
+      examples::Pong::Builder builder = msg.MakeBuilder<examples::Pong>();
+      builder.add_value(ping.value());
+      builder.add_initial_send_time(ping.send_time());
+      CHECK(msg.Send(builder.Finish()));
+    });
+
+    event_loop_->SetRuntimeRealtimePriority(5);
+  }
+
+ private:
+  EventLoop *event_loop_;
+  aos::Sender<examples::Pong> sender_;
+};
+
+}  // namespace aos
+
+int main(int argc, char **argv) {
+  FLAGS_logtostderr = true;
+  google::InitGoogleLogging(argv[0]);
+  ::gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig("aos/events/config.fb.json");
+
+  ::aos::ShmEventLoop event_loop(&config.message());
+
+  aos::Pong ping(&event_loop);
+
+  event_loop.Run();
+
+  ::aos::Cleanup();
+  return 0;
+}
diff --git a/aos/events/raw-event-loop.h b/aos/events/raw-event-loop.h
deleted file mode 100644
index 887c7ab..0000000
--- a/aos/events/raw-event-loop.h
+++ /dev/null
@@ -1,179 +0,0 @@
-#ifndef _AOS_EVENTS_RAW_EVENT_LOOP_H_
-#define _AOS_EVENTS_RAW_EVENT_LOOP_H_
-
-#include <atomic>
-#include <memory>
-#include <string>
-#include "aos/queue.h"
-#include "aos/time/time.h"
-
-// This file contains raw versions of the classes in event-loop.h.
-//
-// Users should look exclusively at event-loop.h. Only people who wish to
-// implement a new IPC layer (like a fake layer for example) may wish to use
-// these classes.
-namespace aos {
-
-// Raw version of fetcher. Contains a local variable that the fetcher will
-// update.
-// It is the job of the typed version to cast this to the appropriate type.
-class RawFetcher {
- public:
-  RawFetcher() {}
-  virtual ~RawFetcher() {}
-
-  // Non-blocking fetch of the next message in the queue. Returns true if there
-  // was a new message and we got it.
-  virtual bool FetchNext() = 0;
-  // Non-blocking fetch of the latest message:
-  virtual bool Fetch() = 0;
-
-  const void *most_recent() { return most_recent_; }
-
- protected:
-  RawFetcher(const RawFetcher &) = delete;
-  RawFetcher &operator=(const RawFetcher &) = delete;
-  void set_most_recent(const void *most_recent) { most_recent_ = most_recent; }
-
- private:
-  const void *most_recent_ = nullptr;
-};
-
-// Raw version of sender. Sending a message is a 3 part process. Fetch an opaque
-// token, cast that token to the message type, populate and then calling one of
-// Send() or Free().
-class RawSender {
- public:
-  RawSender() {}
-  virtual ~RawSender() {}
-
-  virtual aos::Message *GetMessage() = 0;
-
-  virtual void Free(aos::Message *msg) = 0;
-
-  virtual bool Send(aos::Message *msg) = 0;
-
-  // Call operator that calls Free().
-  template <typename T>
-  void operator()(T *t) {
-    Free(t);
-  }
-
-  virtual const char *name() const = 0;
-
- protected:
-  RawSender(const RawSender &) = delete;
-  RawSender &operator=(const RawSender &) = delete;
-};
-
-// Opaque Information extracted from a particular type passed to the underlying
-// system so that it knows how much memory to allocate etc.
-struct QueueTypeInfo {
-  // Message size:
-  size_t size;
-  // This should be a globally unique identifier for the type.
-  int hash;
-  // Config parameter for how long the queue should be.
-  int queue_length;
-
-  template <typename T>
-  static QueueTypeInfo Get() {
-    QueueTypeInfo info;
-    info.size = sizeof(T);
-    info.hash = T::kHash;
-    info.queue_length = T::kQueueLength;
-    return info;
-  }
-
-  // Necessary for the comparison of QueueTypeInfo objects in the
-  // SimulatedEventLoop.
-  bool operator<(const QueueTypeInfo &other) const {
-    if (size != other.size) return size < other.size;
-    if (hash != other.hash) return hash < other.hash;
-    return queue_length < other.queue_length;
-  }
-};
-
-// Interface for timers
-class TimerHandler {
- public:
-  virtual ~TimerHandler() {}
-
-  // Timer should sleep until base, base + offset, base + offset * 2, ...
-  // If repeat_offset isn't set, the timer only expires once.
-  virtual void Setup(monotonic_clock::time_point base,
-                     monotonic_clock::duration repeat_offset =
-                         ::aos::monotonic_clock::zero()) = 0;
-
-  // Stop future calls to callback().
-  virtual void Disable() = 0;
-};
-
-// Interface for phased loops.  They are built on timers.
-class PhasedLoopHandler {
- public:
-  virtual ~PhasedLoopHandler() {}
-
-  // Sets the interval and offset.  Any changes to interval and offset only take
-  // effect when the handler finishes running.
-  virtual void set_interval_and_offset(
-      const monotonic_clock::duration interval,
-      const monotonic_clock::duration offset) = 0;
-};
-
-class EventScheduler;
-
-// Virtual base class for all event queue-types.
-class RawEventLoop {
- public:
-  virtual ~RawEventLoop() {}
-
-  // Current time.
-  virtual monotonic_clock::time_point monotonic_now() = 0;
-
-  // The passed in function will be called when the event loop starts.
-  // Use this to run code once the thread goes into "real-time-mode",
-  virtual void OnRun(::std::function<void()> on_run) = 0;
-
-  // Sets the name of the event loop.
-  virtual void set_name(const char *name) = 0;
-
-  // Threadsafe.
-  bool is_running() const { return is_running_.load(); }
-
-  // Creates a timer that executes callback when the timer expires
-  // Returns a TimerHandle for configuration of the timer
-  virtual TimerHandler *AddTimer(::std::function<void()> callback) = 0;
-
-  // Creates a timer that executes callback periodically at the specified
-  // interval and offset.  Returns a PhasedLoopHandler for interacting with the
-  // timer.
-  virtual PhasedLoopHandler *AddPhasedLoop(
-      ::std::function<void(int)> callback,
-      const monotonic_clock::duration interval,
-      const monotonic_clock::duration offset = ::std::chrono::seconds(0)) = 0;
-
- protected:
-  friend class EventScheduler;
-  void set_is_running(bool value) { is_running_.store(value); }
-
-  // Will send new messages from (path, type).
-  virtual std::unique_ptr<RawSender> MakeRawSender(
-      const std::string &path, const QueueTypeInfo &type) = 0;
-
-  // Will fetch new messages from (path, type).
-  virtual std::unique_ptr<RawFetcher> MakeRawFetcher(
-      const std::string &path, const QueueTypeInfo &type) = 0;
-
-  // Will watch (path, type) for new messages
-  virtual void MakeRawWatcher(
-      const std::string &path, const QueueTypeInfo &type,
-      std::function<void(const Message *message)> watcher) = 0;
-
- private:
-  ::std::atomic<bool> is_running_{false};
-};
-
-}  // namespace aos
-
-#endif  // _AOS_EVENTS_RAW_EVENT_LOOP_H_
diff --git a/aos/events/shm-event-loop.cc b/aos/events/shm-event-loop.cc
deleted file mode 100644
index 5034f0b..0000000
--- a/aos/events/shm-event-loop.cc
+++ /dev/null
@@ -1,507 +0,0 @@
-#include "aos/events/shm-event-loop.h"
-
-#include <sys/timerfd.h>
-#include <algorithm>
-#include <atomic>
-#include <chrono>
-#include <stdexcept>
-
-#include "aos/events/epoll.h"
-#include "aos/init.h"
-#include "aos/logging/logging.h"
-#include "aos/queue.h"
-#include "aos/util/phased_loop.h"
-
-namespace aos {
-
-ShmEventLoop::ShmEventLoop() {}
-
-namespace {
-
-namespace chrono = ::std::chrono;
-
-class ShmFetcher : public RawFetcher {
- public:
-  explicit ShmFetcher(RawQueue *queue) : queue_(queue) {
-    // Move index_ to point to the end of the queue as it is at construction
-    // time.  Also grab the oldest message but don't expose it to the user yet.
-    static constexpr Options<RawQueue> kOptions =
-        RawQueue::kFromEnd | RawQueue::kNonBlock;
-    msg_ = queue_->ReadMessageIndex(kOptions, &index_);
-  }
-  ~ShmFetcher() {
-    if (msg_) {
-      queue_->FreeMessage(msg_);
-    }
-  }
-
-  bool FetchNext() override {
-    const void *msg = queue_->ReadMessageIndex(RawQueue::kNonBlock, &index_);
-    // Only update the internal pointer if we got a new message.
-    if (msg != nullptr) {
-      queue_->FreeMessage(msg_);
-      msg_ = msg;
-      set_most_recent(msg_);
-    }
-    return msg != nullptr;
-  }
-
-  bool Fetch() override {
-    static constexpr Options<RawQueue> kOptions =
-        RawQueue::kFromEnd | RawQueue::kNonBlock;
-    const void *msg = queue_->ReadMessageIndex(kOptions, &index_);
-    // Only update the internal pointer if we got a new message.
-    if (msg != nullptr && msg != msg_) {
-      queue_->FreeMessage(msg_);
-      msg_ = msg;
-      set_most_recent(msg_);
-      return true;
-    } else {
-      // The message has to get freed if we didn't use it (and
-      // RawQueue::FreeMessage is ok to call on nullptr).
-      queue_->FreeMessage(msg);
-
-      // We have a message from construction time.  Give it to the user now.
-      if (msg_ != nullptr && most_recent() != msg_) {
-        set_most_recent(msg_);
-        return true;
-      } else {
-        return false;
-      }
-    }
-  }
-
- private:
-  int index_ = 0;
-  RawQueue *queue_;
-  const void *msg_ = nullptr;
-};
-
-class ShmSender : public RawSender {
- public:
-  explicit ShmSender(RawQueue *queue) : queue_(queue) {}
-
-  ::aos::Message *GetMessage() override {
-    return reinterpret_cast<::aos::Message *>(queue_->GetMessage());
-  }
-
-  void Free(::aos::Message *msg) override { queue_->FreeMessage(msg); }
-
-  bool Send(::aos::Message *msg) override {
-    assert(queue_ != nullptr);
-    {
-      // TODO(austin): This lets multiple senders reorder messages since time
-      // isn't acquired with a lock held.
-      if (msg->sent_time == monotonic_clock::min_time) {
-        msg->sent_time = monotonic_clock::now();
-      }
-    }
-    return queue_->WriteMessage(msg, RawQueue::kOverride);
-  }
-
-  const char *name() const override { return queue_->name(); }
-
- private:
-  RawQueue *queue_;
-};
-
-}  // namespace
-
-namespace internal {
-
-// Class to manage the state for a Watcher.
-class WatcherThreadState {
- public:
-  WatcherThreadState(
-      ShmEventLoop::ThreadState *thread_state, RawQueue *queue,
-      ::std::function<void(const ::aos::Message *message)> watcher)
-      : thread_state_(thread_state),
-        queue_(queue),
-        index_(0),
-        watcher_(::std::move(watcher)) {}
-
-  ~WatcherThreadState() {
-    // Only kill the thread if it is running.
-    if (running_) {
-      // TODO(austin): CHECK that we aren't RT here.
-
-      // Try joining.  If we fail, we weren't asleep on the condition in the
-      // queue.  So hit it again and again until that's true.
-      struct timespec end_time;
-      AOS_PCHECK(clock_gettime(CLOCK_REALTIME, &end_time) == 0);
-      while (true) {
-        void *retval = nullptr;
-        end_time.tv_nsec += 100000000;
-        if (end_time.tv_nsec > 1000000000L) {
-          end_time.tv_nsec -= 1000000000L;
-          ++end_time.tv_sec;
-        }
-        int ret = pthread_timedjoin_np(pthread_, &retval, &end_time);
-        if (ret == ETIMEDOUT) continue;
-        AOS_PCHECK(ret == 0);
-        break;
-      }
-    }
-  }
-
-  // Starts the thread and waits until it is running.
-  void Start() {
-    AOS_PCHECK(pthread_create(&pthread_, nullptr, &StaticRun, this) == 0);
-    IPCRecursiveMutexLocker locker(&thread_started_mutex_);
-    if (locker.owner_died()) ::aos::Die("Owner died");
-    while (!running_) {
-      AOS_CHECK(!thread_started_condition_.Wait());
-    }
-  }
-
-  void GrabQueueIndex() {
-    // Right after we are signaled to start, point index to the current index
-    // so we don't read any messages received before now.  Otherwise we will
-    // get a significantly delayed read.
-    static constexpr Options<RawQueue> kOptions =
-        RawQueue::kFromEnd | RawQueue::kNonBlock;
-    const void *msg = queue_->ReadMessageIndex(kOptions, &index_);
-    if (msg) {
-      queue_->FreeMessage(msg);
-    }
-  }
-
- private:
-  // Runs Run given a WatcherThreadState as the argument.  This is an adapter
-  // between pthreads and Run.
-  static void *StaticRun(void *arg) {
-    WatcherThreadState *watcher_thread_state =
-        reinterpret_cast<WatcherThreadState *>(arg);
-    watcher_thread_state->Run();
-    return nullptr;
-  }
-
-  // Runs the watcher callback on new messages.
-  void Run() {
-    ::aos::SetCurrentThreadName(thread_state_->name() + ".watcher");
-
-    // Signal the main thread that we are now ready.
-    thread_state_->MaybeSetCurrentThreadRealtimePriority();
-    {
-      IPCRecursiveMutexLocker locker(&thread_started_mutex_);
-      if (locker.owner_died()) ::aos::Die("Owner died");
-      running_ = true;
-      thread_started_condition_.Broadcast();
-    }
-
-    // Wait for the global start before handling events.
-    thread_state_->WaitForStart();
-
-    // Bail immediately if we are supposed to stop.
-    if (!thread_state_->is_running()) {
-      ::aos::UnsetCurrentThreadRealtimePriority();
-      return;
-    }
-
-    const void *msg = nullptr;
-    while (true) {
-      msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index_,
-                                     chrono::seconds(1));
-      // We hit a timeout.  Confirm that we should be running and retry.  Note,
-      // is_running is threadsafe (it's an atomic underneath).  Worst case, we
-      // check again in a second.
-      if (msg == nullptr) {
-        if (!thread_state_->is_running()) break;
-        continue;
-      }
-
-      {
-        // Grab the lock so that only one callback can be called at a time.
-        MutexLocker locker(&thread_state_->mutex_);
-        if (!thread_state_->is_running()) break;
-
-        watcher_(reinterpret_cast<const Message *>(msg));
-        // watcher_ may have exited the event loop.
-        if (!thread_state_->is_running()) break;
-      }
-      // Drop the reference.
-      queue_->FreeMessage(msg);
-    }
-
-    // And drop the last reference.
-    queue_->FreeMessage(msg);
-    // Now that everything is cleaned up, drop RT priority before destroying the
-    // thread.
-    ::aos::UnsetCurrentThreadRealtimePriority();
-  }
-  pthread_t pthread_;
-  ShmEventLoop::ThreadState *thread_state_;
-  RawQueue *queue_;
-  int32_t index_;
-  bool running_ = false;
-
-  ::std::function<void(const Message *message)> watcher_;
-
-  // Mutex and condition variable used to wait until the thread is started
-  // before going RT.
-  ::aos::Mutex thread_started_mutex_;
-  ::aos::Condition thread_started_condition_{&thread_started_mutex_};
-};
-
-// Adapter class to adapt a timerfd to a TimerHandler.
-// The part of the API which is accessed by the TimerHandler interface needs to
-// be threadsafe.  This means Setup and Disable.
-class TimerHandlerState : public TimerHandler {
- public:
-  TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
-      : shm_event_loop_(shm_event_loop), fn_(::std::move(fn)) {
-    shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
-      MutexLocker locker(&shm_event_loop_->thread_state_.mutex_);
-      timerfd_.Read();
-      fn_();
-    });
-  }
-
-  ~TimerHandlerState() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
-
-  void Setup(monotonic_clock::time_point base,
-             monotonic_clock::duration repeat_offset) override {
-    // SetTime is threadsafe already.
-    timerfd_.SetTime(base, repeat_offset);
-  }
-
-  void Disable() override {
-    // Disable is also threadsafe already.
-    timerfd_.Disable();
-  }
-
- private:
-  ShmEventLoop *shm_event_loop_;
-
-  TimerFd timerfd_;
-
-  // Function to be run on the thread
-  ::std::function<void()> fn_;
-};
-
-// Adapter class to the timerfd and PhasedLoop.
-// The part of the API which is accessed by the PhasedLoopHandler interface
-// needs to be threadsafe.  This means set_interval_and_offset
-class PhasedLoopHandler : public ::aos::PhasedLoopHandler {
- public:
-  PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
-                    const monotonic_clock::duration interval,
-                    const monotonic_clock::duration offset)
-      : shm_event_loop_(shm_event_loop),
-        phased_loop_(interval, shm_event_loop_->monotonic_now(), offset),
-        fn_(::std::move(fn)) {
-    shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
-      MutexLocker locker(&shm_event_loop_->thread_state_.mutex_);
-      {
-        MutexLocker locker(&mutex_);
-        timerfd_.Read();
-      }
-      // Call the function.  To avoid needing a recursive mutex, drop the lock
-      // before running the function.
-      fn_(cycles_elapsed_);
-      {
-        MutexLocker locker(&mutex_);
-        Reschedule();
-      }
-    });
-  }
-
-  ~PhasedLoopHandler() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
-
-  void set_interval_and_offset(
-      const monotonic_clock::duration interval,
-      const monotonic_clock::duration offset) override {
-    MutexLocker locker(&mutex_);
-    phased_loop_.set_interval_and_offset(interval, offset);
-  }
-
-  void Startup() {
-    MutexLocker locker(&mutex_);
-    phased_loop_.Reset(shm_event_loop_->monotonic_now());
-    Reschedule();
-  }
-
- private:
-  // Reschedules the timer.  Must be called with the mutex held.
-  void Reschedule() {
-    cycles_elapsed_ = phased_loop_.Iterate(shm_event_loop_->monotonic_now());
-    timerfd_.SetTime(phased_loop_.sleep_time(), ::aos::monotonic_clock::zero());
-  }
-
-  ShmEventLoop *shm_event_loop_;
-
-  // Mutex to protect access to the timerfd_ (not strictly necessary), and the
-  // phased_loop (necessary).
-  ::aos::Mutex mutex_;
-
-  TimerFd timerfd_;
-  time::PhasedLoop phased_loop_;
-
-  int cycles_elapsed_ = 1;
-
-  // Function to be run
-  const ::std::function<void(int)> fn_;
-};
-}  // namespace internal
-
-::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
-    const ::std::string &path, const QueueTypeInfo &type) {
-  return ::std::unique_ptr<RawFetcher>(new ShmFetcher(
-      RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
-}
-
-::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
-    const ::std::string &path, const QueueTypeInfo &type) {
-  Take(path);
-  return ::std::unique_ptr<RawSender>(new ShmSender(
-      RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
-}
-
-void ShmEventLoop::MakeRawWatcher(
-    const ::std::string &path, const QueueTypeInfo &type,
-    ::std::function<void(const Message *message)> watcher) {
-  Take(path);
-  ::std::unique_ptr<internal::WatcherThreadState> state(
-      new internal::WatcherThreadState(
-          &thread_state_, RawQueue::Fetch(path.c_str(), type.size, type.hash,
-                                          type.queue_length),
-          std::move(watcher)));
-  watchers_.push_back(::std::move(state));
-}
-
-TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
-  ::std::unique_ptr<internal::TimerHandlerState> timer(
-      new internal::TimerHandlerState(this, ::std::move(callback)));
-
-  timers_.push_back(::std::move(timer));
-
-  return timers_.back().get();
-}
-
-PhasedLoopHandler *ShmEventLoop::AddPhasedLoop(
-    ::std::function<void(int)> callback,
-    const monotonic_clock::duration interval,
-    const monotonic_clock::duration offset) {
-  ::std::unique_ptr<internal::PhasedLoopHandler> phased_loop(
-      new internal::PhasedLoopHandler(this, ::std::move(callback), interval,
-                                      offset));
-
-  phased_loops_.push_back(::std::move(phased_loop));
-
-  return phased_loops_.back().get();
-}
-
-void ShmEventLoop::OnRun(::std::function<void()> on_run) {
-  on_run_.push_back(::std::move(on_run));
-}
-
-void ShmEventLoop::set_name(const char *name) { thread_state_.name_ = name; }
-
-void ShmEventLoop::Run() {
-  // Start all the watcher threads.
-  for (::std::unique_ptr<internal::WatcherThreadState> &watcher : watchers_) {
-    watcher->Start();
-  }
-
-  ::aos::SetCurrentThreadName(thread_state_.name());
-
-  // Now, all the threads are up.  Lock everything into memory and go RT.
-  if (thread_state_.priority_ != -1) {
-    ::aos::InitRT();
-  }
-  thread_state_.MaybeSetCurrentThreadRealtimePriority();
-  set_is_running(true);
-
-  // Now that we are realtime (but before the OnRun handlers run), snap the
-  // queue index.
-  for (::std::unique_ptr<internal::WatcherThreadState> &watcher : watchers_) {
-    watcher->GrabQueueIndex();
-  }
-
-  // Now that we are RT, run all the OnRun handlers.
-  for (const auto &run : on_run_) {
-    run();
-  }
-
-  // Start up all the phased loops.
-  for (::std::unique_ptr<internal::PhasedLoopHandler> &phased_loop :
-       phased_loops_) {
-    phased_loop->Startup();
-  }
-  // TODO(austin): We don't need a separate watcher thread if there are only
-  // watchers and fetchers.  Could lazily create the epoll loop and pick a
-  // victim watcher to run in this thread.
-  // Trigger all the threads to start now.
-  thread_state_.Start();
-
-  // And start our main event loop which runs all the timers and handles Quit.
-  epoll_.Run();
-
-  // Once epoll exits, there is no useful nonrt work left to do.
-  set_is_running(false);
-
-  // Signal all the watcher threads to exit.  After this point, no more
-  // callbacks will be handled.
-  thread_state_.Exit();
-
-  // Nothing time or synchronization critical needs to happen after this point.
-  // Drop RT priority.
-  ::aos::UnsetCurrentThreadRealtimePriority();
-
-  // The watcher threads get cleaned up in the destructor.
-}
-
-void ShmEventLoop::ThreadState::Start() {
-  MutexLocker locker(&mutex_);
-  loop_running_ = true;
-  if (loop_finished_) ::aos::Die("Cannot restart an ShmEventLoop()");
-  loop_running_cond_.Broadcast();
-}
-
-void ShmEventLoop::ThreadState::WaitForStart() {
-  MutexLocker locker(&mutex_);
-  while (!(loop_running_ || loop_finished_)) {
-    Condition::WaitResult wait_result =
-        loop_running_cond_.WaitTimed(chrono::milliseconds(1000));
-    if (wait_result == Condition::WaitResult::kOwnerDied) {
-      ::aos::Die("ShmEventLoop mutex lock problem.\n");
-    }
-  }
-}
-
-void ShmEventLoop::ThreadState::MaybeSetCurrentThreadRealtimePriority() {
-  if (priority_ != -1) {
-    ::aos::SetCurrentThreadRealtimePriority(priority_);
-  }
-}
-
-void ShmEventLoop::Exit() { epoll_.Quit(); }
-
-void ShmEventLoop::ThreadState::Exit() {
-  IPCRecursiveMutexLocker locker(&mutex_);
-  if (locker.owner_died()) ::aos::Die("Owner died");
-  loop_running_ = false;
-  loop_finished_ = true;
-  loop_running_cond_.Broadcast();
-}
-
-ShmEventLoop::~ShmEventLoop() {
-  if (is_running()) {
-    ::aos::Die("ShmEventLoop destroyed while running\n");
-  }
-}
-
-void ShmEventLoop::Take(const ::std::string &path) {
-  if (is_running()) {
-    ::aos::Die("Cannot add new objects while running.\n");
-  }
-
-  const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
-  if (prior != taken_.end()) {
-    ::aos::Die("%s is already being used.", path.c_str());
-  } else {
-    taken_.emplace_back(path);
-  }
-}
-
-}  // namespace aos
diff --git a/aos/events/shm-event-loop.h b/aos/events/shm-event-loop.h
deleted file mode 100644
index 5db8319..0000000
--- a/aos/events/shm-event-loop.h
+++ /dev/null
@@ -1,124 +0,0 @@
-#ifndef AOS_EVENTS_SHM_EVENT_LOOP_H_
-#define AOS_EVENTS_SHM_EVENT_LOOP_H_
-
-#include <unordered_set>
-#include <vector>
-
-#include "aos/condition.h"
-#include "aos/events/epoll.h"
-#include "aos/events/event-loop.h"
-#include "aos/mutex/mutex.h"
-
-namespace aos {
-namespace internal {
-
-class WatcherThreadState;
-class TimerHandlerState;
-class PhasedLoopHandler;
-
-}  // namespace internal
-
-// Specialization of EventLoop that is built from queues running out of shared
-// memory. See more details at aos/queue.h
-//
-// This object must be interacted with from one thread, but the Senders and
-// Fetchers may be used from multiple threads afterwords (as long as their
-// destructors are called back in one thread again)
-class ShmEventLoop : public EventLoop {
- public:
-  ShmEventLoop();
-  ~ShmEventLoop() override;
-
-  ::aos::monotonic_clock::time_point monotonic_now() override {
-    return ::aos::monotonic_clock::now();
-  }
-
-  ::std::unique_ptr<RawSender> MakeRawSender(
-      const ::std::string &path, const QueueTypeInfo &type) override;
-  ::std::unique_ptr<RawFetcher> MakeRawFetcher(
-      const ::std::string &path, const QueueTypeInfo &type) override;
-
-  void MakeRawWatcher(
-      const ::std::string &path, const QueueTypeInfo &type,
-      ::std::function<void(const aos::Message *message)> watcher) override;
-
-  TimerHandler *AddTimer(::std::function<void()> callback) override;
-  ::aos::PhasedLoopHandler *AddPhasedLoop(
-      ::std::function<void(int)> callback,
-      const monotonic_clock::duration interval,
-      const monotonic_clock::duration offset =
-          ::std::chrono::seconds(0)) override;
-
-  void OnRun(::std::function<void()> on_run) override;
-  void Run();
-  void Exit();
-
-  // TODO(austin): Add a function to register control-C call.
-
-  void SetRuntimeRealtimePriority(int priority) override {
-    if (is_running()) {
-      ::aos::Die("Cannot set realtime priority while running.");
-    }
-    thread_state_.priority_ = priority;
-  }
-
-  void set_name(const char *name) override;
-
- private:
-  friend class internal::WatcherThreadState;
-  friend class internal::TimerHandlerState;
-  friend class internal::PhasedLoopHandler;
-  // This ThreadState ensures that two watchers in the same loop cannot be
-  // triggered concurrently.  Because watchers block threads indefinitely, this
-  // has to be shared_ptr in case the EventLoop is destroyed before the thread
-  // receives any new events.
-  class ThreadState {
-   public:
-    void WaitForStart();
-
-    bool is_running() { return loop_running_; }
-
-    void Start();
-
-    void Exit();
-
-    void MaybeSetCurrentThreadRealtimePriority();
-
-    const ::std::string &name() const { return name_; }
-
-   private:
-    friend class internal::WatcherThreadState;
-    friend class internal::TimerHandlerState;
-    friend class internal::PhasedLoopHandler;
-    friend class ShmEventLoop;
-
-    // This mutex ensures that only one watch event happens at a time.
-    ::aos::Mutex mutex_;
-    // Block on this until the loop starts.
-    ::aos::Condition loop_running_cond_{&mutex_};
-    // Used to notify watchers that the loop is done.
-    ::std::atomic<bool> loop_running_{false};
-    bool loop_finished_ = false;
-    int priority_ = -1;
-
-    // Immutable after Start is called.
-    ::std::string name_;
-  };
-
-  // Tracks that we can't have multiple watchers or a sender and a watcher (or
-  // multiple senders) on a single queue (path).
-  void Take(const ::std::string &path);
-
-  ::std::vector<::std::function<void()>> on_run_;
-  ThreadState thread_state_;
-  ::std::vector<::std::string> taken_;
-  internal::EPoll epoll_;
-
-  ::std::vector<::std::unique_ptr<internal::TimerHandlerState>> timers_;
-  ::std::vector<::std::unique_ptr<internal::PhasedLoopHandler>> phased_loops_;
-  ::std::vector<::std::unique_ptr<internal::WatcherThreadState>> watchers_;
-};
-
-}  // namespace aos
-
-#endif  // AOS_EVENTS_SHM_EVENT_LOOP_H_
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
new file mode 100644
index 0000000..5a88717
--- /dev/null
+++ b/aos/events/shm_event_loop.cc
@@ -0,0 +1,611 @@
+#include "glog/logging.h"
+
+#include "aos/events/shm_event_loop.h"
+
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <sys/timerfd.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <algorithm>
+#include <atomic>
+#include <chrono>
+#include <stdexcept>
+
+#include "aos/events/epoll.h"
+#include "aos/ipc_lib/lockless_queue.h"
+#include "aos/realtime.h"
+#include "aos/util/phased_loop.h"
+
+DEFINE_string(shm_base, "/dev/shm/aos",
+              "Directory to place queue backing mmaped files in.");
+DEFINE_uint32(permissions, 0770,
+              "Permissions to make shared memory files and folders.");
+
+namespace aos {
+
+std::string ShmFolder(const Channel *channel) {
+  CHECK(channel->has_name());
+  CHECK_EQ(channel->name()->string_view()[0], '/');
+  return FLAGS_shm_base + channel->name()->str() + "/";
+}
+std::string ShmPath(const Channel *channel) {
+  CHECK(channel->has_type());
+  return ShmFolder(channel) + channel->type()->str() + ".v0";
+}
+
+class MMapedQueue {
+ public:
+  MMapedQueue(const Channel *channel) {
+    std::string path = ShmPath(channel);
+
+    // TODO(austin): Pull these out into the config if there is a need.
+    config_.num_watchers = 10;
+    config_.num_senders = 10;
+    config_.queue_size = 2 * channel->frequency();
+    config_.message_data_size = channel->max_size();
+
+    size_ = ipc_lib::LocklessQueueMemorySize(config_);
+
+    MkdirP(path);
+
+    // There are 2 cases.  Either the file already exists, or it does not
+    // already exist and we need to create it.  Start by trying to create it. If
+    // that fails, the file has already been created and we can open it
+    // normally..  Once the file has been created it wil never be deleted.
+    fd_ = open(path.c_str(), O_RDWR | O_CREAT | O_EXCL,
+               O_CLOEXEC | FLAGS_permissions);
+    if (fd_ == -1 && errno == EEXIST) {
+      VLOG(1) << path << " already created.";
+      // File already exists.
+      fd_ = open(path.c_str(), O_RDWR, O_CLOEXEC);
+      PCHECK(fd_ != -1) << ": Failed to open " << path;
+      while (true) {
+        struct stat st;
+        PCHECK(fstat(fd_, &st) == 0);
+        if (st.st_size != 0) {
+          CHECK_EQ(static_cast<size_t>(st.st_size), size_)
+              << ": Size of " << path
+              << " doesn't match expected size of backing queue file.  Did the "
+                 "queue definition change?";
+          break;
+        } else {
+          // The creating process didn't get around to it yet.  Give it a bit.
+          std::this_thread::sleep_for(std::chrono::milliseconds(10));
+          VLOG(1) << path << " is zero size, waiting";
+        }
+      }
+    } else {
+      VLOG(1) << "Created " << path;
+      PCHECK(fd_ != -1) << ": Failed to open " << path;
+      PCHECK(ftruncate(fd_, size_) == 0);
+    }
+
+    data_ = mmap(NULL, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
+    PCHECK(data_ != MAP_FAILED);
+
+    ipc_lib::InitializeLocklessQueueMemory(memory(), config_);
+  }
+
+  ~MMapedQueue() {
+    PCHECK(munmap(data_, size_) == 0);
+    PCHECK(close(fd_) == 0);
+  }
+
+  ipc_lib::LocklessQueueMemory *memory() const {
+    return reinterpret_cast<ipc_lib::LocklessQueueMemory *>(data_);
+  }
+
+  const ipc_lib::LocklessQueueConfiguration &config() const {
+    return config_;
+  }
+
+ private:
+  void MkdirP(absl::string_view path) {
+    struct stat st;
+    auto last_slash_pos = path.find_last_of("/");
+
+    std::string folder(last_slash_pos == absl::string_view::npos
+                           ? absl::string_view("")
+                           : path.substr(0, last_slash_pos));
+    if (stat(folder.c_str(), &st) == -1) {
+      PCHECK(errno == ENOENT);
+      CHECK_NE(folder, "") << ": Base path doesn't exist";
+      MkdirP(folder);
+      VLOG(1) << "Creating " << folder;
+      PCHECK(mkdir(folder.c_str(), FLAGS_permissions) == 0);
+    }
+  }
+
+  ipc_lib::LocklessQueueConfiguration config_;
+
+  int fd_;
+
+  size_t size_;
+  void *data_;
+};
+
+// Returns the portion of the path after the last /.
+absl::string_view Filename(absl::string_view path) {
+  auto last_slash_pos = path.find_last_of("/");
+
+  return last_slash_pos == absl::string_view::npos
+             ? path
+             : path.substr(last_slash_pos + 1, path.size());
+}
+
+ShmEventLoop::ShmEventLoop(const Configuration *configuration)
+    : EventLoop(configuration), name_(Filename(program_invocation_name)) {}
+
+namespace {
+
+namespace chrono = ::std::chrono;
+
+class ShmFetcher : public RawFetcher {
+ public:
+  explicit ShmFetcher(const Channel *channel)
+      : lockless_queue_memory_(channel),
+        lockless_queue_(lockless_queue_memory_.memory(),
+                        lockless_queue_memory_.config()),
+        data_storage_(static_cast<AlignedChar *>(aligned_alloc(
+                          alignof(AlignedChar), channel->max_size())),
+                      &free) {
+    context_.data = nullptr;
+    // Point the queue index at the next index to read starting now.  This
+    // makes it such that FetchNext will read the next message sent after
+    // the fetcher is created.
+    PointAtNextQueueIndex();
+  }
+
+  ~ShmFetcher() { data_ = nullptr; }
+
+  // Points the next message to fetch at the queue index which will be
+  // populated next.
+  void PointAtNextQueueIndex() {
+    actual_queue_index_ = lockless_queue_.LatestQueueIndex();
+    if (!actual_queue_index_.valid()) {
+      // Nothing in the queue.  The next element will show up at the 0th
+      // index in the queue.
+      actual_queue_index_ =
+          ipc_lib::QueueIndex::Zero(lockless_queue_.queue_size());
+    } else {
+      actual_queue_index_ = actual_queue_index_.Increment();
+    }
+  }
+
+  bool FetchNext() override {
+    // TODO(austin): Write a test which starts with nothing in the queue,
+    // and then calls FetchNext() after something is sent.
+    // TODO(austin): Get behind and make sure it dies both here and with
+    // Fetch.
+    ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
+        actual_queue_index_.index(), &context_.monotonic_sent_time,
+        &context_.realtime_sent_time, &context_.size,
+        reinterpret_cast<char *>(data_storage_.get()));
+    if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
+      context_.queue_index = actual_queue_index_.index();
+      data_ = reinterpret_cast<char *>(data_storage_.get()) +
+              lockless_queue_.message_data_size() - context_.size;
+      context_.data = data_;
+      actual_queue_index_ = actual_queue_index_.Increment();
+    }
+
+    // Make sure the data wasn't modified while we were reading it.  This
+    // can only happen if you are reading the last message *while* it is
+    // being written to, which means you are pretty far behind.
+    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
+        << ": Got behind while reading and the last message was modified "
+           "out "
+           "from under us while we were reading it.  Don't get so far "
+           "behind.";
+
+    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
+        << ": The next message is no longer available.";
+    return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
+  }
+
+  bool Fetch() override {
+    const ipc_lib::QueueIndex queue_index = lockless_queue_.LatestQueueIndex();
+    // actual_queue_index_ is only meaningful if it was set by Fetch or
+    // FetchNext.  This happens when valid_data_ has been set.  So, only
+    // skip checking if valid_data_ is true.
+    //
+    // Also, if the latest queue index is invalid, we are empty.  So there
+    // is nothing to fetch.
+    if ((data_ != nullptr &&
+         queue_index == actual_queue_index_.DecrementBy(1u)) ||
+        !queue_index.valid()) {
+      return false;
+    }
+
+    ipc_lib::LocklessQueue::ReadResult read_result =
+        lockless_queue_.Read(queue_index.index(), &context_.monotonic_sent_time,
+                             &context_.realtime_sent_time, &context_.size,
+                             reinterpret_cast<char *>(data_storage_.get()));
+    if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
+      context_.queue_index = queue_index.index();
+      data_ = reinterpret_cast<char *>(data_storage_.get()) +
+              lockless_queue_.message_data_size() - context_.size;
+      context_.data = data_;
+      actual_queue_index_ = queue_index.Increment();
+    }
+
+    // Make sure the data wasn't modified while we were reading it.  This
+    // can only happen if you are reading the last message *while* it is
+    // being written to, which means you are pretty far behind.
+    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
+        << ": Got behind while reading and the last message was modified "
+           "out "
+           "from under us while we were reading it.  Don't get so far "
+           "behind.";
+
+    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::NOTHING_NEW)
+        << ": Queue index went backwards.  This should never happen.";
+
+    // We fell behind between when we read the index and read the value.
+    // This isn't worth recovering from since this means we went to sleep
+    // for a long time in the middle of this function.
+    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
+        << ": The next message is no longer available.";
+    return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
+  }
+
+  bool RegisterWakeup(int priority) {
+    return lockless_queue_.RegisterWakeup(priority);
+  }
+
+  void UnregisterWakeup() { lockless_queue_.UnregisterWakeup(); }
+
+ private:
+  MMapedQueue lockless_queue_memory_;
+  ipc_lib::LocklessQueue lockless_queue_;
+
+  ipc_lib::QueueIndex actual_queue_index_ =
+      ipc_lib::LocklessQueue::empty_queue_index();
+
+  struct AlignedChar {
+    alignas(32) char data;
+  };
+
+  std::unique_ptr<AlignedChar, decltype(&free)> data_storage_;
+};
+
+class ShmSender : public RawSender {
+ public:
+  explicit ShmSender(const Channel *channel, const ShmEventLoop *shm_event_loop)
+      : RawSender(),
+        shm_event_loop_(shm_event_loop),
+        name_(channel->name()->str()),
+        lockless_queue_memory_(channel),
+        lockless_queue_(lockless_queue_memory_.memory(),
+                        lockless_queue_memory_.config()),
+        lockless_queue_sender_(lockless_queue_.MakeSender()) {}
+
+  void *data() override { return lockless_queue_sender_.Data(); }
+  size_t size() override { return lockless_queue_sender_.size(); }
+  bool Send(size_t size) override {
+    lockless_queue_sender_.Send(size);
+    lockless_queue_.Wakeup(shm_event_loop_->priority());
+    return true;
+  }
+
+  bool Send(void *msg, size_t length) override {
+    lockless_queue_sender_.Send(reinterpret_cast<char *>(msg), length);
+    lockless_queue_.Wakeup(shm_event_loop_->priority());
+    // TODO(austin): Return an error if we send too fast.
+    return true;
+  }
+
+  const absl::string_view name() const override { return name_; }
+
+ private:
+  const ShmEventLoop *shm_event_loop_;
+  std::string name_;
+  MMapedQueue lockless_queue_memory_;
+  ipc_lib::LocklessQueue lockless_queue_;
+  ipc_lib::LocklessQueue::Sender lockless_queue_sender_;
+};
+
+}  // namespace
+
+namespace internal {
+
+// Class to manage the state for a Watcher.
+class WatcherState {
+ public:
+  WatcherState(
+      const Channel *channel,
+      std::function<void(const Context &context, const void *message)> watcher)
+      : shm_fetcher_(channel), watcher_(watcher) {}
+
+  ~WatcherState() {}
+
+  // Points the next message to fetch at the queue index which will be populated
+  // next.
+  void PointAtNextQueueIndex() { shm_fetcher_.PointAtNextQueueIndex(); }
+
+  // Returns true if there is new data available.
+  bool HasNewData() {
+    if (!has_new_data_) {
+      has_new_data_ = shm_fetcher_.FetchNext();
+    }
+
+    return has_new_data_;
+  }
+
+  // Returns the time of the current data sample.
+  aos::monotonic_clock::time_point event_time() const {
+    return shm_fetcher_.context().monotonic_sent_time;
+  }
+
+  // Consumes the data by calling the callback.
+  void CallCallback() {
+    CHECK(has_new_data_);
+    watcher_(shm_fetcher_.context(), shm_fetcher_.most_recent_data());
+    has_new_data_ = false;
+  }
+
+  // Starts the thread and waits until it is running.
+  bool RegisterWakeup(int priority) {
+    return shm_fetcher_.RegisterWakeup(priority);
+  }
+
+  void UnregisterWakeup() { return shm_fetcher_.UnregisterWakeup(); }
+
+ private:
+  bool has_new_data_ = false;
+
+  ShmFetcher shm_fetcher_;
+
+  std::function<void(const Context &context, const void *message)> watcher_;
+};
+
+// Adapter class to adapt a timerfd to a TimerHandler.
+// The part of the API which is accessed by the TimerHandler interface needs to
+// be threadsafe.  This means Setup and Disable.
+class TimerHandlerState : public TimerHandler {
+ public:
+  TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
+      : shm_event_loop_(shm_event_loop), fn_(::std::move(fn)) {
+    shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
+      timerfd_.Read();
+      fn_();
+    });
+  }
+
+  ~TimerHandlerState() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
+
+  void Setup(monotonic_clock::time_point base,
+             monotonic_clock::duration repeat_offset) override {
+    // SetTime is threadsafe already.
+    timerfd_.SetTime(base, repeat_offset);
+  }
+
+  void Disable() override {
+    // Disable is also threadsafe already.
+    timerfd_.Disable();
+  }
+
+ private:
+  ShmEventLoop *shm_event_loop_;
+
+  TimerFd timerfd_;
+
+  // Function to be run on the thread
+  ::std::function<void()> fn_;
+};
+
+// Adapter class to the timerfd and PhasedLoop.
+// The part of the API which is accessed by the PhasedLoopHandler interface
+// needs to be threadsafe.  This means set_interval_and_offset
+class PhasedLoopHandler : public ::aos::PhasedLoopHandler {
+ public:
+  PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
+                    const monotonic_clock::duration interval,
+                    const monotonic_clock::duration offset)
+      : shm_event_loop_(shm_event_loop),
+        phased_loop_(interval, shm_event_loop_->monotonic_now(), offset),
+        fn_(::std::move(fn)) {
+    shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
+      timerfd_.Read();
+      // Call the function.  To avoid needing a recursive mutex, drop the lock
+      // before running the function.
+      fn_(cycles_elapsed_);
+      Reschedule();
+    });
+  }
+
+  ~PhasedLoopHandler() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
+
+  void set_interval_and_offset(
+      const monotonic_clock::duration interval,
+      const monotonic_clock::duration offset) override {
+    phased_loop_.set_interval_and_offset(interval, offset);
+  }
+
+  void Startup() {
+    phased_loop_.Reset(shm_event_loop_->monotonic_now());
+    Reschedule();
+  }
+
+ private:
+  // Reschedules the timer.  Must be called with the mutex held.
+  void Reschedule() {
+    cycles_elapsed_ = phased_loop_.Iterate(shm_event_loop_->monotonic_now());
+    timerfd_.SetTime(phased_loop_.sleep_time(), ::aos::monotonic_clock::zero());
+  }
+
+  ShmEventLoop *shm_event_loop_;
+
+  TimerFd timerfd_;
+  time::PhasedLoop phased_loop_;
+
+  int cycles_elapsed_ = 1;
+
+  // Function to be run
+  const ::std::function<void(int)> fn_;
+};
+}  // namespace internal
+
+::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
+    const Channel *channel) {
+  return ::std::unique_ptr<RawFetcher>(new ShmFetcher(channel));
+}
+
+::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
+    const Channel *channel) {
+  Take(channel);
+  return ::std::unique_ptr<RawSender>(new ShmSender(channel, this));
+}
+
+void ShmEventLoop::MakeRawWatcher(
+    const Channel *channel,
+    std::function<void(const Context &context, const void *message)> watcher) {
+  Take(channel);
+
+  ::std::unique_ptr<internal::WatcherState> state(
+      new internal::WatcherState(
+      channel, std::move(watcher)));
+  watchers_.push_back(::std::move(state));
+}
+
+TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
+  ::std::unique_ptr<internal::TimerHandlerState> timer(
+      new internal::TimerHandlerState(this, ::std::move(callback)));
+
+  timers_.push_back(::std::move(timer));
+
+  return timers_.back().get();
+}
+
+PhasedLoopHandler *ShmEventLoop::AddPhasedLoop(
+    ::std::function<void(int)> callback,
+    const monotonic_clock::duration interval,
+    const monotonic_clock::duration offset) {
+  ::std::unique_ptr<internal::PhasedLoopHandler> phased_loop(
+      new internal::PhasedLoopHandler(this, ::std::move(callback), interval,
+                                      offset));
+
+  phased_loops_.push_back(::std::move(phased_loop));
+
+  return phased_loops_.back().get();
+}
+
+void ShmEventLoop::OnRun(::std::function<void()> on_run) {
+  on_run_.push_back(::std::move(on_run));
+}
+
+void ShmEventLoop::Run() {
+  std::unique_ptr<ipc_lib::SignalFd> signalfd;
+
+  if (watchers_.size() > 0) {
+    signalfd.reset(new ipc_lib::SignalFd({ipc_lib::kWakeupSignal}));
+
+    epoll_.OnReadable(signalfd->fd(), [signalfd_ptr = signalfd.get(), this]() {
+      signalfd_siginfo result = signalfd_ptr->Read();
+      CHECK_EQ(result.ssi_signo, ipc_lib::kWakeupSignal);
+
+      // TODO(austin): We should really be checking *everything*, not just
+      // watchers, and calling the oldest thing first.  That will improve
+      // determinism a lot.
+
+      while (true) {
+        // Call the handlers in time order of their messages.
+        aos::monotonic_clock::time_point min_event_time =
+            aos::monotonic_clock::max_time;
+        size_t min_watcher_index = -1;
+        size_t watcher_index = 0;
+        for (::std::unique_ptr<internal::WatcherState> &watcher : watchers_) {
+          if (watcher->HasNewData()) {
+            if (watcher->event_time() < min_event_time) {
+              min_watcher_index = watcher_index;
+              min_event_time = watcher->event_time();
+            }
+          }
+          ++watcher_index;
+        }
+
+        if (min_event_time == aos::monotonic_clock::max_time) {
+          break;
+        }
+
+        watchers_[min_watcher_index]->CallCallback();
+      }
+    });
+  }
+
+  // Now, all the threads are up.  Lock everything into memory and go RT.
+  if (priority_ != 0) {
+    ::aos::InitRT();
+
+    LOG(INFO) << "Setting priority to " << priority_;
+    ::aos::SetCurrentThreadRealtimePriority(priority_);
+  }
+
+  set_is_running(true);
+
+  // Now that we are realtime (but before the OnRun handlers run), snap the
+  // queue index.
+  for (::std::unique_ptr<internal::WatcherState> &watcher : watchers_) {
+    watcher->PointAtNextQueueIndex();
+    CHECK(watcher->RegisterWakeup(priority_));
+  }
+
+  // Now that we are RT, run all the OnRun handlers.
+  for (const auto &run : on_run_) {
+    run();
+  }
+
+  // Start up all the phased loops.
+  for (::std::unique_ptr<internal::PhasedLoopHandler> &phased_loop :
+       phased_loops_) {
+    phased_loop->Startup();
+  }
+
+  // And start our main event loop which runs all the timers and handles Quit.
+  epoll_.Run();
+
+  // Once epoll exits, there is no useful nonrt work left to do.
+  set_is_running(false);
+
+  // Nothing time or synchronization critical needs to happen after this point.
+  // Drop RT priority.
+  ::aos::UnsetCurrentThreadRealtimePriority();
+
+  for (::std::unique_ptr<internal::WatcherState> &watcher : watchers_) {
+    watcher->UnregisterWakeup();
+  }
+
+  if (watchers_.size() > 0) {
+    epoll_.DeleteFd(signalfd->fd());
+    signalfd.reset();
+  }
+}
+
+void ShmEventLoop::Exit() { epoll_.Quit(); }
+
+ShmEventLoop::~ShmEventLoop() {
+  CHECK(!is_running()) << ": ShmEventLoop destroyed while running";
+}
+
+void ShmEventLoop::Take(const Channel *channel) {
+  CHECK(!is_running()) << ": Cannot add new objects while running.";
+
+  // Cheat aggresively.  Use the shared memory path as a proxy for a unique
+  // identifier for the channel.
+  const std::string path = ShmPath(channel);
+
+  const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
+  CHECK(prior == taken_.end()) << ": " << path << " is already being used.";
+
+  taken_.emplace_back(path);
+}
+
+void ShmEventLoop::SetRuntimeRealtimePriority(int priority) {
+  if (is_running()) {
+    LOG(FATAL) << "Cannot set realtime priority while running.";
+  }
+  priority_ = priority;
+}
+
+}  // namespace aos
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
new file mode 100644
index 0000000..e859201
--- /dev/null
+++ b/aos/events/shm_event_loop.h
@@ -0,0 +1,92 @@
+#ifndef AOS_EVENTS_SHM_EVENT_LOOP_H_
+#define AOS_EVENTS_SHM_EVENT_LOOP_H_
+
+#include <unordered_set>
+#include <vector>
+
+#include "aos/events/epoll.h"
+#include "aos/events/event_loop.h"
+#include "aos/ipc_lib/signalfd.h"
+#include "aos/mutex/mutex.h"
+
+namespace aos {
+namespace internal {
+
+class WatcherState;
+class TimerHandlerState;
+class PhasedLoopHandler;
+
+}  // namespace internal
+
+// Specialization of EventLoop that is built from queues running out of shared
+// memory. See more details at aos/queue.h
+//
+// This object must be interacted with from one thread, but the Senders and
+// Fetchers may be used from multiple threads afterwords (as long as their
+// destructors are called back in one thread again)
+class ShmEventLoop : public EventLoop {
+ public:
+  ShmEventLoop(const Configuration *configuration);
+  ~ShmEventLoop() override;
+
+  aos::monotonic_clock::time_point monotonic_now() override {
+    return aos::monotonic_clock::now();
+  }
+  aos::realtime_clock::time_point realtime_now() override {
+    return aos::realtime_clock::now();
+  }
+
+  std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) override;
+  std::unique_ptr<RawFetcher> MakeRawFetcher(const Channel *channel) override;
+
+  void MakeRawWatcher(
+      const Channel *channel,
+      std::function<void(const Context &context, const void *message)> watcher)
+      override;
+
+  TimerHandler *AddTimer(std::function<void()> callback) override;
+  aos::PhasedLoopHandler *AddPhasedLoop(
+      std::function<void(int)> callback,
+      const monotonic_clock::duration interval,
+      const monotonic_clock::duration offset =
+          std::chrono::seconds(0)) override;
+
+  void OnRun(std::function<void()> on_run) override;
+  void Run();
+  void Exit();
+
+  // TODO(austin): Add a function to register control-C call.
+
+  void SetRuntimeRealtimePriority(int priority) override;
+
+  void set_name(const absl::string_view name) override {
+    name_ = std::string(name);
+  }
+  const absl::string_view name() const override { return name_; }
+
+  int priority() const { return priority_; }
+
+ private:
+  friend class internal::WatcherState;
+  friend class internal::TimerHandlerState;
+  friend class internal::PhasedLoopHandler;
+
+  // Tracks that we can't have multiple watchers or a sender and a watcher (or
+  // multiple senders) on a single queue (path).
+  void Take(const Channel *channel);
+
+  std::vector<std::function<void()>> on_run_;
+  int priority_ = 0;
+  std::string name_;
+  std::vector<std::string> taken_;
+
+  internal::EPoll epoll_;
+
+  std::vector<std::unique_ptr<internal::TimerHandlerState>> timers_;
+  std::vector<std::unique_ptr<internal::PhasedLoopHandler>> phased_loops_;
+  std::vector<std::unique_ptr<internal::WatcherState>> watchers_;
+};
+
+}  // namespace aos
+
+#endif  // AOS_EVENTS_SHM_EVENT_LOOP_H_
diff --git a/aos/events/shm-event-loop_test.cc b/aos/events/shm_event_loop_test.cc
similarity index 72%
rename from aos/events/shm-event-loop_test.cc
rename to aos/events/shm_event_loop_test.cc
index e0d0b49..4406e01 100644
--- a/aos/events/shm-event-loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -1,9 +1,13 @@
-#include "aos/events/shm-event-loop.h"
+#include "aos/events/shm_event_loop.h"
 
-#include "aos/events/event-loop_param_test.h"
-#include "aos/testing/test_shm.h"
+#include "aos/events/event_loop_param_test.h"
+#include "glog/logging.h"
 #include "gtest/gtest.h"
 
+#include "aos/events/test_message_generated.h"
+
+DECLARE_string(shm_base);
+
 namespace aos {
 namespace testing {
 namespace {
@@ -11,28 +15,40 @@
 
 class ShmEventLoopTestFactory : public EventLoopTestFactory {
  public:
+  ShmEventLoopTestFactory() {
+    // Put all the queue files in ${TEST_TMPDIR} if it is set, otherwise
+    // everything will be reusing /dev/shm when sharded.
+    char *test_tmpdir = getenv("TEST_TMPDIR");
+    if (test_tmpdir != nullptr) {
+      FLAGS_shm_base = std::string(test_tmpdir) + "/aos";
+    }
+
+    // Clean up anything left there before.
+    unlink((FLAGS_shm_base + "test/aos.TestMessage.v0").c_str());
+    unlink((FLAGS_shm_base + "test1/aos.TestMessage.v0").c_str());
+    unlink((FLAGS_shm_base + "test2/aos.TestMessage.v0").c_str());
+  }
+
   ::std::unique_ptr<EventLoop> Make() override {
-    return ::std::unique_ptr<EventLoop>(new ShmEventLoop());
+    return ::std::unique_ptr<EventLoop>(new ShmEventLoop(configuration()));
   }
 
   ::std::unique_ptr<EventLoop> MakePrimary() override {
     ::std::unique_ptr<ShmEventLoop> loop =
-        ::std::unique_ptr<ShmEventLoop>(new ShmEventLoop());
+        ::std::unique_ptr<ShmEventLoop>(new ShmEventLoop(configuration()));
     primary_event_loop_ = loop.get();
     return ::std::move(loop);
   }
 
-  void Run() override { AOS_CHECK_NOTNULL(primary_event_loop_)->Run(); }
+  void Run() override { CHECK_NOTNULL(primary_event_loop_)->Run(); }
 
-  void Exit() override { AOS_CHECK_NOTNULL(primary_event_loop_)->Exit(); }
+  void Exit() override { CHECK_NOTNULL(primary_event_loop_)->Exit(); }
 
   void SleepFor(::std::chrono::nanoseconds duration) override {
     ::std::this_thread::sleep_for(duration);
   }
 
  private:
-  ::aos::testing::TestSharedMemory my_shm_;
-
   ::aos::ShmEventLoop *primary_event_loop_;
 };
 
@@ -46,24 +62,13 @@
                           return new ShmEventLoopTestFactory();
                         }));
 
-struct TestMessage : public ::aos::Message {
-  enum { kQueueLength = 100, kHash = 0x696c0cdc };
-  int msg_value;
-
-  void Zero() { msg_value = 0; }
-  static size_t Size() { return 1 + ::aos::Message::Size(); }
-  size_t Print(char *buffer, size_t length) const;
-  TestMessage() { Zero(); }
-};
-
 }  // namespace
 
 bool IsRealtime() {
   int scheduler;
-  if ((scheduler = sched_getscheduler(0)) == -1) {
-    AOS_PLOG(FATAL, "sched_getscheduler(0) failed\n");
-  }
-  AOS_LOG(INFO, "scheduler is %d\n", scheduler);
+  PCHECK((scheduler = sched_getscheduler(0)) != -1);
+
+  LOG(INFO) << "scheduler is " << scheduler;
   return scheduler == SCHED_FIFO || scheduler == SCHED_RR;
 }
 
@@ -82,7 +87,7 @@
   bool did_timer = false;
   bool did_watcher = false;
 
-  auto timer = loop->AddTimer([&did_timer, &loop, &factory]() {
+  auto timer = loop->AddTimer([&did_timer, &factory]() {
     EXPECT_TRUE(IsRealtime());
     did_timer = true;
     factory.Exit();
@@ -97,9 +102,11 @@
     EXPECT_TRUE(IsRealtime());
     did_onrun = true;
     timer->Setup(loop->monotonic_now() + chrono::milliseconds(100));
-    auto msg = sender.MakeMessage();
-    msg->msg_value = 200;
-    msg.Send();
+
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(200);
+    msg.Send(builder.Finish());
   });
 
   factory.Run();
diff --git a/aos/events/simulated-event-loop.cc b/aos/events/simulated-event-loop.cc
deleted file mode 100644
index 85ea21b..0000000
--- a/aos/events/simulated-event-loop.cc
+++ /dev/null
@@ -1,402 +0,0 @@
-#include "aos/events/simulated-event-loop.h"
-
-#include <algorithm>
-#include <deque>
-
-#include "aos/logging/logging.h"
-#include "aos/queue.h"
-#include "aos/testing/test_logging.h"
-#include "aos/util/phased_loop.h"
-
-namespace aos {
-namespace {
-
-class SimulatedSender : public RawSender {
- public:
-  SimulatedSender(SimulatedQueue *queue, EventLoop *event_loop)
-      : queue_(queue), event_loop_(event_loop) {
-    testing::EnableTestLogging();
-  }
-  ~SimulatedSender() {}
-
-  aos::Message *GetMessage() override {
-    return RefCountedBuffer(queue_->size()).release();
-  }
-
-  void Free(aos::Message *msg) override { RefCountedBuffer tmp(msg); }
-
-  bool Send(aos::Message *msg) override {
-    {
-      if (msg->sent_time == monotonic_clock::min_time) {
-        msg->sent_time = event_loop_->monotonic_now();
-      }
-    }
-    queue_->Send(RefCountedBuffer(msg));
-    return true;
-  }
-
-  const char *name() const override { return queue_->name(); }
-
- private:
-  SimulatedQueue *queue_;
-  EventLoop *event_loop_;
-};
-}  // namespace
-
-class SimulatedFetcher : public RawFetcher {
- public:
-  explicit SimulatedFetcher(SimulatedQueue *queue) : queue_(queue) {}
-  ~SimulatedFetcher() { queue_->UnregisterFetcher(this); }
-
-  bool FetchNext() override {
-    if (msgs_.size() == 0) return false;
-
-    msg_ = msgs_.front();
-    msgs_.pop_front();
-    set_most_recent(msg_.get());
-    return true;
-  }
-
-  bool Fetch() override {
-    if (msgs_.size() == 0) {
-      if (!msg_ && queue_->latest_message()) {
-        msg_ = queue_->latest_message();
-        set_most_recent(msg_.get());
-        return true;
-      } else {
-        return false;
-      }
-    }
-
-    // We've had a message enqueued, so we don't need to go looking for the
-    // latest message from before we started.
-    msg_ = msgs_.back();
-    msgs_.clear();
-    set_most_recent(msg_.get());
-    return true;
-  }
-
- private:
-  friend class SimulatedQueue;
-
-  // Internal method for Simulation to add a message to the buffer.
-  void Enqueue(RefCountedBuffer buffer) {
-    msgs_.emplace_back(buffer);
-  }
-
-  SimulatedQueue *queue_;
-  RefCountedBuffer msg_;
-
-  // Messages queued up but not in use.
-  ::std::deque<RefCountedBuffer> msgs_;
-};
-
-class SimulatedTimerHandler : public TimerHandler {
- public:
-  explicit SimulatedTimerHandler(EventScheduler *scheduler,
-                                 ::std::function<void()> fn)
-      : scheduler_(scheduler), fn_(fn) {}
-  ~SimulatedTimerHandler() {}
-
-  void Setup(monotonic_clock::time_point base,
-             monotonic_clock::duration repeat_offset) override {
-    Disable();
-    const ::aos::monotonic_clock::time_point monotonic_now =
-        scheduler_->monotonic_now();
-    base_ = base;
-    repeat_offset_ = repeat_offset;
-    if (base < monotonic_now) {
-      token_ = scheduler_->Schedule(monotonic_now, [this]() { HandleEvent(); });
-    } else {
-      token_ = scheduler_->Schedule(base, [this]() { HandleEvent(); });
-    }
-  }
-
-  void HandleEvent() {
-    const ::aos::monotonic_clock::time_point monotonic_now =
-        scheduler_->monotonic_now();
-    if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
-      // Reschedule.
-      while (base_ <= monotonic_now) base_ += repeat_offset_;
-      token_ = scheduler_->Schedule(base_, [this]() { HandleEvent(); });
-    } else {
-      token_ = EventScheduler::Token();
-    }
-    fn_();
-  }
-
-  void Disable() override {
-    if (token_ != EventScheduler::Token()) {
-      scheduler_->Deschedule(token_);
-      token_ = EventScheduler::Token();
-    }
-  }
-
-  ::aos::monotonic_clock::time_point monotonic_now() const {
-    return scheduler_->monotonic_now();
-  }
-
- private:
-  EventScheduler *scheduler_;
-  EventScheduler::Token token_;
-  // Function to be run on the thread
-  ::std::function<void()> fn_;
-  monotonic_clock::time_point base_;
-  monotonic_clock::duration repeat_offset_;
-};
-
-class SimulatedPhasedLoopHandler : public PhasedLoopHandler {
- public:
-  SimulatedPhasedLoopHandler(EventScheduler *scheduler,
-                             ::std::function<void(int)> fn,
-                             const monotonic_clock::duration interval,
-                             const monotonic_clock::duration offset)
-      : simulated_timer_handler_(scheduler, [this]() { HandleTimerWakeup(); }),
-        phased_loop_(interval, simulated_timer_handler_.monotonic_now(),
-                     offset),
-        fn_(fn) {
-    // TODO(austin): This assumes time doesn't change between when the
-    // constructor is called and when we start running.  It's probably a safe
-    // assumption.
-    Reschedule();
-  }
-
-  void HandleTimerWakeup() {
-    fn_(cycles_elapsed_);
-    Reschedule();
-  }
-
-  void set_interval_and_offset(
-      const monotonic_clock::duration interval,
-      const monotonic_clock::duration offset) override {
-    phased_loop_.set_interval_and_offset(interval, offset);
-  }
-
-  void Reschedule() {
-    cycles_elapsed_ =
-        phased_loop_.Iterate(simulated_timer_handler_.monotonic_now());
-    simulated_timer_handler_.Setup(phased_loop_.sleep_time(),
-                                   ::aos::monotonic_clock::zero());
-  }
-
- private:
-  SimulatedTimerHandler simulated_timer_handler_;
-
-  time::PhasedLoop phased_loop_;
-
-  int cycles_elapsed_ = 1;
-
-  ::std::function<void(int)> fn_;
-};
-
-class SimulatedEventLoop : public EventLoop {
- public:
-  explicit SimulatedEventLoop(
-      EventScheduler *scheduler,
-      ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
-          *queues)
-      : scheduler_(scheduler), queues_(queues) {
-    scheduler_->AddRawEventLoop(this);
-  }
-  ~SimulatedEventLoop() override { scheduler_->RemoveRawEventLoop(this); };
-
-  ::aos::monotonic_clock::time_point monotonic_now() override {
-    return scheduler_->monotonic_now();
-  }
-
-  ::std::unique_ptr<RawSender> MakeRawSender(
-      const ::std::string &path, const QueueTypeInfo &type) override;
-
-  ::std::unique_ptr<RawFetcher> MakeRawFetcher(
-      const ::std::string &path, const QueueTypeInfo &type) override;
-
-  void MakeRawWatcher(
-      const ::std::string &path, const QueueTypeInfo &type,
-      ::std::function<void(const ::aos::Message *message)> watcher) override;
-
-  TimerHandler *AddTimer(::std::function<void()> callback) override {
-    timers_.emplace_back(new SimulatedTimerHandler(scheduler_, callback));
-    return timers_.back().get();
-  }
-
-  PhasedLoopHandler *AddPhasedLoop(::std::function<void(int)> callback,
-                                   const monotonic_clock::duration interval,
-                                   const monotonic_clock::duration offset =
-                                       ::std::chrono::seconds(0)) override {
-    phased_loops_.emplace_back(
-        new SimulatedPhasedLoopHandler(scheduler_, callback, interval, offset));
-    return phased_loops_.back().get();
-  }
-
-  void OnRun(::std::function<void()> on_run) override {
-    scheduler_->Schedule(scheduler_->monotonic_now(), on_run);
-  }
-
-  void set_name(const char *name) override { name_ = name; }
-
-  SimulatedQueue *GetSimulatedQueue(
-      const ::std::pair<::std::string, QueueTypeInfo> &);
-
-  void Take(const ::std::string &path);
-
-  void SetRuntimeRealtimePriority(int /*priority*/) override {
-    if (is_running()) {
-      ::aos::Die("Cannot set realtime priority while running.");
-    }
-  }
-
- private:
-  EventScheduler *scheduler_;
-  ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
-      *queues_;
-  ::std::vector<std::string> taken_;
-  ::std::vector<std::unique_ptr<TimerHandler>> timers_;
-  ::std::vector<std::unique_ptr<PhasedLoopHandler>> phased_loops_;
-
-  ::std::string name_;
-};
-
-EventScheduler::Token EventScheduler::Schedule(
-    ::aos::monotonic_clock::time_point time, ::std::function<void()> callback) {
-  return events_list_.emplace(time, callback);
-}
-
-void EventScheduler::Deschedule(EventScheduler::Token token) {
-  events_list_.erase(token);
-}
-
-void EventScheduler::RunFor(monotonic_clock::duration duration) {
-  const ::aos::monotonic_clock::time_point end_time =
-      monotonic_now() + duration;
-  testing::MockTime(monotonic_now());
-  for (RawEventLoop *event_loop : raw_event_loops_) {
-    event_loop->set_is_running(true);
-  }
-  is_running_ = true;
-  while (!events_list_.empty() && is_running_) {
-    auto iter = events_list_.begin();
-    ::aos::monotonic_clock::time_point next_time = iter->first;
-    if (next_time > end_time) {
-      break;
-    }
-    now_ = iter->first;
-    testing::MockTime(now_);
-    ::std::function<void()> callback = ::std::move(iter->second);
-    events_list_.erase(iter);
-    callback();
-  }
-  now_ = end_time;
-  if (!is_running_) {
-    for (RawEventLoop *event_loop : raw_event_loops_) {
-      event_loop->set_is_running(false);
-    }
-  }
-  testing::UnMockTime();
-}
-
-void EventScheduler::Run() {
-  testing::MockTime(monotonic_now());
-  for (RawEventLoop *event_loop : raw_event_loops_) {
-    event_loop->set_is_running(true);
-  }
-  is_running_ = true;
-  while (!events_list_.empty() && is_running_) {
-    auto iter = events_list_.begin();
-    now_ = iter->first;
-    testing::MockTime(now_);
-    ::std::function<void()> callback = ::std::move(iter->second);
-    events_list_.erase(iter);
-    callback();
-  }
-  if (!is_running_) {
-    for (RawEventLoop *event_loop : raw_event_loops_) {
-      event_loop->set_is_running(false);
-    }
-  }
-  testing::UnMockTime();
-}
-
-void SimulatedEventLoop::MakeRawWatcher(
-    const std::string &path, const QueueTypeInfo &type,
-    std::function<void(const aos::Message *message)> watcher) {
-  Take(path);
-  ::std::pair<::std::string, QueueTypeInfo> key(path, type);
-  GetSimulatedQueue(key)->MakeRawWatcher(watcher);
-}
-
-std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
-    const std::string &path, const QueueTypeInfo &type) {
-  Take(path);
-  ::std::pair<::std::string, QueueTypeInfo> key(path, type);
-  return GetSimulatedQueue(key)->MakeRawSender(this);
-}
-
-std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
-    const std::string &path, const QueueTypeInfo &type) {
-  ::std::pair<::std::string, QueueTypeInfo> key(path, type);
-  return GetSimulatedQueue(key)->MakeRawFetcher();
-}
-
-SimulatedQueue *SimulatedEventLoop::GetSimulatedQueue(
-    const ::std::pair<::std::string, QueueTypeInfo> &type) {
-  auto it = queues_->find(type);
-  if (it == queues_->end()) {
-    it =
-        queues_
-            ->emplace(type, SimulatedQueue(type.second, type.first, scheduler_))
-            .first;
-  }
-  return &it->second;
-}
-
-void SimulatedQueue::MakeRawWatcher(
-    ::std::function<void(const aos::Message *message)> watcher) {
-  watchers_.push_back(watcher);
-}
-
-::std::unique_ptr<RawSender> SimulatedQueue::MakeRawSender(
-    EventLoop *event_loop) {
-  return ::std::unique_ptr<RawSender>(new SimulatedSender(this, event_loop));
-}
-
-::std::unique_ptr<RawFetcher> SimulatedQueue::MakeRawFetcher() {
-  ::std::unique_ptr<SimulatedFetcher> fetcher(new SimulatedFetcher(this));
-  fetchers_.push_back(fetcher.get());
-  return ::std::move(fetcher);
-}
-
-void SimulatedQueue::Send(RefCountedBuffer message) {
-  latest_message_ = message;
-  if (scheduler_->is_running()) {
-    for (auto &watcher : watchers_) {
-      scheduler_->Schedule(scheduler_->monotonic_now(),
-                           [watcher, message]() { watcher(message.get()); });
-    }
-  }
-  for (auto &fetcher : fetchers_) {
-    fetcher->Enqueue(message);
-  }
-}
-
-void SimulatedQueue::UnregisterFetcher(SimulatedFetcher *fetcher) {
-  fetchers_.erase(::std::find(fetchers_.begin(), fetchers_.end(), fetcher));
-}
-
-void SimulatedEventLoop::Take(const ::std::string &path) {
-  if (is_running()) {
-    ::aos::Die("Cannot add new objects while running.\n");
-  }
-  const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
-  if (prior != taken_.end()) {
-    ::aos::Die("%s is already being used.", path.c_str());
-  } else {
-    taken_.emplace_back(path);
-  }
-}
-
-::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop() {
-  return ::std::unique_ptr<EventLoop>(
-      new SimulatedEventLoop(&scheduler_, &queues_));
-}
-
-}  // namespace aos
diff --git a/aos/events/simulated-event-loop.h b/aos/events/simulated-event-loop.h
deleted file mode 100644
index 2fcc9d1..0000000
--- a/aos/events/simulated-event-loop.h
+++ /dev/null
@@ -1,214 +0,0 @@
-#ifndef _AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
-#define _AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
-
-#include <algorithm>
-#include <map>
-#include <memory>
-#include <unordered_set>
-#include <utility>
-#include <vector>
-
-#include "aos/events/event-loop.h"
-
-namespace aos {
-
-// This class manages allocation of queue messages for simulation.
-// Unfortunately, because the current interfaces all assume that we pass around
-// raw pointers to messages we can't use a std::shared_ptr or the such, and
-// because aos::Message's themselves to not have any sort of built-in support
-// for this, we need to manage memory for the Messages in some custom fashion.
-// In this case, we do so by allocating a ref-counter in the bytes immediately
-// preceding the aos::Message. We then provide a constructor that takes just a
-// pointer to an existing message and we assume that it was allocated using this
-// class, and can decrement the counter if the RefCountedBuffer we constructed
-// goes out of scope. There are currently no checks to ensure that pointers
-// passed into this class were actually allocated using this class.
-class RefCountedBuffer {
- public:
-  RefCountedBuffer() {}
-  ~RefCountedBuffer() { clear(); }
-
-  // Create a RefCountedBuffer for some Message that was already allocated using
-  // a RefCountedBuffer class. This, or some function like it, is required to
-  // allow us to let users of the simulated event loops work with raw pointers
-  // to messages.
-  explicit RefCountedBuffer(aos::Message *data) : data_(data) {}
-
-  // Allocates memory for a new message of a given size. Does not initialize the
-  // memory or call any constructors.
-  explicit RefCountedBuffer(size_t size) {
-    data_ = reinterpret_cast<uint8_t *>(malloc(kRefCountSize + size)) +
-            kRefCountSize;
-    // Initialize the allocated memory with an integer
-    *GetRefCount() = 1;
-  }
-
-  RefCountedBuffer(const RefCountedBuffer &other) {
-    data_ = other.data_;
-    if (data_ != nullptr) {
-      ++*GetRefCount();
-    }
-  }
-
-  RefCountedBuffer(RefCountedBuffer &&other) { std::swap(data_, other.data_); }
-
-  RefCountedBuffer &operator=(const RefCountedBuffer &other) {
-    if (this == &other) return *this;
-    clear();
-    data_ = other.data_;
-    ++*GetRefCount();
-    return *this;
-  }
-
-  RefCountedBuffer &operator=(RefCountedBuffer &&other) {
-    if (this == &other) return *this;
-    std::swap(data_, other.data_);
-    return *this;
-  }
-
-  operator bool() const { return data_ != nullptr; }
-
-  aos::Message *get() const { return static_cast<aos::Message *>(data_); }
-
-  aos::Message *release() {
-    auto tmp = get();
-    data_ = nullptr;
-    return tmp;
-  }
-
-  void clear() {
-    if (data_ != nullptr) {
-      if (--*GetRefCount() == 0) {
-        // Free memory block from the start of the allocated block
-        free(GetRefCount());
-      }
-      data_ = nullptr;
-    }
-  }
-
- private:
-  void *data_ = nullptr;
-  // Qty. memory to be allocated to the ref counter
-  static constexpr size_t kRefCountSize = sizeof(int64_t);
-
-  int64_t *GetRefCount() {
-    // Need to cast the void* to an 8 bit long object (size of void* is
-    // technically 0)
-    return reinterpret_cast<int64_t *>(static_cast<void *>(
-        reinterpret_cast<uint8_t *>(data_) - kRefCountSize));
-  }
-};
-
-class EventScheduler {
- public:
-  using QueueType = ::std::multimap<::aos::monotonic_clock::time_point,
-                                    ::std::function<void()>>;
-  using Token = QueueType::iterator;
-
-  // Schedule an event with a callback function
-  // Returns an iterator to the event
-  Token Schedule(::aos::monotonic_clock::time_point time,
-                 ::std::function<void()> callback);
-
-  // Deschedule an event by its iterator
-  void Deschedule(Token token);
-
-  void Run();
-  void RunFor(::aos::monotonic_clock::duration duration);
-
-  void Exit() {
-    is_running_ = false;
-  }
-
-  bool is_running() const { return is_running_; }
-
-  void AddRawEventLoop(RawEventLoop *event_loop) {
-    raw_event_loops_.push_back(event_loop);
-  }
-  void RemoveRawEventLoop(RawEventLoop *event_loop) {
-    raw_event_loops_.erase(::std::find(raw_event_loops_.begin(),
-                                       raw_event_loops_.end(), event_loop));
-  }
-
-  ::aos::monotonic_clock::time_point monotonic_now() const { return now_; }
-
- private:
-  ::aos::monotonic_clock::time_point now_ = ::aos::monotonic_clock::epoch();
-  QueueType events_list_;
-  bool is_running_ = false;
-  ::std::vector<RawEventLoop *> raw_event_loops_;
-};
-
-// Class for simulated fetchers.
-class SimulatedFetcher;
-
-class SimulatedQueue {
- public:
-  explicit SimulatedQueue(const QueueTypeInfo &type, const ::std::string &name,
-                          EventScheduler *scheduler)
-      : type_(type), name_(name), scheduler_(scheduler){};
-
-  ~SimulatedQueue() { AOS_CHECK_EQ(0u, fetchers_.size()); }
-
-  // Makes a connected raw sender which calls Send below.
-  ::std::unique_ptr<RawSender> MakeRawSender(EventLoop *event_loop);
-
-  // Makes a connected raw fetcher.
-  ::std::unique_ptr<RawFetcher> MakeRawFetcher();
-
-  // Registers a watcher for the queue.
-  void MakeRawWatcher(
-      ::std::function<void(const ::aos::Message *message)> watcher);
-
-  // Sends the message to all the connected receivers and fetchers.
-  void Send(RefCountedBuffer message);
-
-  // Unregisters a fetcher.
-  void UnregisterFetcher(SimulatedFetcher *fetcher);
-
-  const RefCountedBuffer &latest_message() { return latest_message_; }
-
-  size_t size() const { return type_.size; }
-
-  const char *name() const { return name_.c_str(); }
-
- private:
-  const QueueTypeInfo type_;
-  const ::std::string name_;
-
-  // List of all watchers.
-  ::std::vector<std::function<void(const aos::Message *message)>> watchers_;
-
-  // List of all fetchers.
-  ::std::vector<SimulatedFetcher *> fetchers_;
-  RefCountedBuffer latest_message_;
-  EventScheduler *scheduler_;
-};
-
-class SimulatedEventLoopFactory {
- public:
-  ::std::unique_ptr<EventLoop> MakeEventLoop();
-
-  // Starts executing the event loops unconditionally.
-  void Run() { scheduler_.Run(); }
-  // Executes the event loops for a duration.
-  void RunFor(monotonic_clock::duration duration) {
-    scheduler_.RunFor(duration);
-  }
-
-  // Stops executing all event loops.  Meant to be called from within an event
-  // loop handler.
-  void Exit() { scheduler_.Exit(); }
-
-  monotonic_clock::time_point monotonic_now() const {
-    return scheduler_.monotonic_now();
-  }
-
- private:
-  EventScheduler scheduler_;
-  ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue> queues_;
-};
-
-}  // namespace aos
-
-#endif  //_AOS_EVENTS_TEST_EVENT_LOOP_H_
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
new file mode 100644
index 0000000..9bc74d5
--- /dev/null
+++ b/aos/events/simulated_event_loop.cc
@@ -0,0 +1,514 @@
+#include "aos/events/simulated_event_loop.h"
+
+#include <algorithm>
+#include <deque>
+
+#include "absl/container/btree_map.h"
+#include "absl/container/btree_set.h"
+#include "aos/json_to_flatbuffer.h"
+#include "aos/util/phased_loop.h"
+
+namespace aos {
+
+// Container for both a message, and the context for it for simulation.  This
+// makes tracking the timestamps associated with the data easy.
+struct SimulatedMessage {
+  // Struct to let us force data to be well aligned.
+  struct OveralignedChar {
+    char data alignas(32);
+  };
+
+  // Context for the data.
+  Context context;
+
+  // The data.
+  char *data() { return reinterpret_cast<char *>(&actual_data[0]); }
+
+  // Then the data.
+  OveralignedChar actual_data[];
+};
+
+class SimulatedFetcher;
+
+class SimulatedChannel {
+ public:
+  explicit SimulatedChannel(const Channel *channel, EventScheduler *scheduler)
+      : channel_(CopyFlatBuffer(channel)),
+        scheduler_(scheduler),
+        next_queue_index_(ipc_lib::QueueIndex::Zero(channel->max_size())) {}
+
+  ~SimulatedChannel() { CHECK_EQ(0u, fetchers_.size()); }
+
+  // Makes a connected raw sender which calls Send below.
+  ::std::unique_ptr<RawSender> MakeRawSender(EventLoop *event_loop);
+
+  // Makes a connected raw fetcher.
+  ::std::unique_ptr<RawFetcher> MakeRawFetcher();
+
+  // Registers a watcher for the queue.
+  void MakeRawWatcher(
+      ::std::function<void(const Context &context, const void *message)>
+          watcher);
+
+  // Sends the message to all the connected receivers and fetchers.
+  void Send(std::shared_ptr<SimulatedMessage> message);
+
+  // Unregisters a fetcher.
+  void UnregisterFetcher(SimulatedFetcher *fetcher);
+
+  std::shared_ptr<SimulatedMessage> latest_message() { return latest_message_; }
+
+  size_t max_size() const { return channel_.message().max_size(); }
+
+  const absl::string_view name() const {
+    return channel_.message().name()->string_view();
+  }
+
+  const Channel *channel() const { return &channel_.message(); }
+
+ private:
+  const FlatbufferDetachedBuffer<Channel> channel_;
+
+  // List of all watchers.
+  ::std::vector<
+      std::function<void(const Context &context, const void *message)>>
+      watchers_;
+
+  // List of all fetchers.
+  ::std::vector<SimulatedFetcher *> fetchers_;
+  std::shared_ptr<SimulatedMessage> latest_message_;
+  EventScheduler *scheduler_;
+
+  ipc_lib::QueueIndex next_queue_index_;
+};
+
+namespace {
+
+// Creates a SimulatedMessage with size bytes of storage.
+// This is a shared_ptr so we don't have to implement refcounting or copying.
+std::shared_ptr<SimulatedMessage> MakeSimulatedMessage(size_t size) {
+  SimulatedMessage *message = reinterpret_cast<SimulatedMessage *>(
+      malloc(sizeof(SimulatedMessage) + size));
+  message->context.size = size;
+  message->context.data = message->data();
+
+  return std::shared_ptr<SimulatedMessage>(message, free);
+}
+
+class SimulatedSender : public RawSender {
+ public:
+  SimulatedSender(SimulatedChannel *simulated_channel, EventLoop *event_loop)
+      : simulated_channel_(simulated_channel), event_loop_(event_loop) {}
+  ~SimulatedSender() {}
+
+  void *data() override {
+    if (!message_) {
+      message_ = MakeSimulatedMessage(simulated_channel_->max_size());
+    }
+    return message_->data();
+  }
+
+  size_t size() override { return simulated_channel_->max_size(); }
+
+  bool Send(size_t length) override {
+    CHECK_LE(length, size()) << ": Attempting to send too big a message.";
+    message_->context.monotonic_sent_time = event_loop_->monotonic_now();
+    message_->context.realtime_sent_time = event_loop_->realtime_now();
+    CHECK_LE(length, message_->context.size);
+    message_->context.size = length;
+
+    // TODO(austin): Track sending too fast.
+    simulated_channel_->Send(message_);
+
+    // Drop the reference to the message so that we allocate a new message for
+    // next time.  Otherwise we will continue to reuse the same memory for all
+    // messages and corrupt it.
+    message_.reset();
+    return true;
+  }
+
+  bool Send(void *msg, size_t size) override {
+    CHECK_LE(size, this->size()) << ": Attempting to send too big a message.";
+
+    // This is wasteful, but since flatbuffers fill from the back end of the
+    // queue, we need it to be full sized.
+    message_ = MakeSimulatedMessage(simulated_channel_->max_size());
+
+    // Now fill in the message.  size is already populated above, and
+    // queue_index will be populated in queue_.  Put this at the back of the
+    // data segment.
+    memcpy(message_->data() + simulated_channel_->max_size() - size, msg, size);
+
+    return Send(size);
+  }
+
+  const absl::string_view name() const override {
+    return simulated_channel_->name();
+  }
+
+ private:
+  SimulatedChannel *simulated_channel_;
+  EventLoop *event_loop_;
+
+  std::shared_ptr<SimulatedMessage> message_;
+};
+}  // namespace
+
+class SimulatedFetcher : public RawFetcher {
+ public:
+  explicit SimulatedFetcher(SimulatedChannel *queue) : queue_(queue) {}
+  ~SimulatedFetcher() { queue_->UnregisterFetcher(this); }
+
+  bool FetchNext() override {
+    if (msgs_.size() == 0) return false;
+
+    SetMsg(msgs_.front());
+    msgs_.pop_front();
+    return true;
+  }
+
+  bool Fetch() override {
+    if (msgs_.size() == 0) {
+      if (!msg_ && queue_->latest_message()) {
+        SetMsg(queue_->latest_message());
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    // We've had a message enqueued, so we don't need to go looking for the
+    // latest message from before we started.
+    SetMsg(msgs_.back());
+    msgs_.clear();
+    return true;
+  }
+
+ private:
+  friend class SimulatedChannel;
+
+  // Updates the state inside RawFetcher to point to the data in msg_.
+  void SetMsg(std::shared_ptr<SimulatedMessage> msg) {
+    msg_ = msg;
+    data_ = msg_->context.data;
+    context_ = msg_->context;
+  }
+
+  // Internal method for Simulation to add a message to the buffer.
+  void Enqueue(std::shared_ptr<SimulatedMessage> buffer) {
+    msgs_.emplace_back(buffer);
+  }
+
+  SimulatedChannel *queue_;
+  std::shared_ptr<SimulatedMessage> msg_;
+
+  // Messages queued up but not in use.
+  ::std::deque<std::shared_ptr<SimulatedMessage>> msgs_;
+};
+
+class SimulatedTimerHandler : public TimerHandler {
+ public:
+  explicit SimulatedTimerHandler(EventScheduler *scheduler,
+                                 ::std::function<void()> fn)
+      : scheduler_(scheduler), token_(scheduler_->InvalidToken()), fn_(fn) {}
+  ~SimulatedTimerHandler() {}
+
+  void Setup(monotonic_clock::time_point base,
+             monotonic_clock::duration repeat_offset) override {
+    Disable();
+    const ::aos::monotonic_clock::time_point monotonic_now =
+        scheduler_->monotonic_now();
+    base_ = base;
+    repeat_offset_ = repeat_offset;
+    if (base < monotonic_now) {
+      token_ = scheduler_->Schedule(monotonic_now, [this]() { HandleEvent(); });
+    } else {
+      token_ = scheduler_->Schedule(base, [this]() { HandleEvent(); });
+    }
+  }
+
+  void HandleEvent() {
+    const ::aos::monotonic_clock::time_point monotonic_now =
+        scheduler_->monotonic_now();
+    if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
+      // Reschedule.
+      while (base_ <= monotonic_now) base_ += repeat_offset_;
+      token_ = scheduler_->Schedule(base_, [this]() { HandleEvent(); });
+    } else {
+      token_ = scheduler_->InvalidToken();
+    }
+    fn_();
+  }
+
+  void Disable() override {
+    if (token_ != scheduler_->InvalidToken()) {
+      scheduler_->Deschedule(token_);
+      token_ = scheduler_->InvalidToken();
+    }
+  }
+
+  ::aos::monotonic_clock::time_point monotonic_now() const {
+    return scheduler_->monotonic_now();
+  }
+
+ private:
+  EventScheduler *scheduler_;
+  EventScheduler::Token token_;
+  // Function to be run on the thread
+  ::std::function<void()> fn_;
+  monotonic_clock::time_point base_;
+  monotonic_clock::duration repeat_offset_;
+};
+
+class SimulatedPhasedLoopHandler : public PhasedLoopHandler {
+ public:
+  SimulatedPhasedLoopHandler(EventScheduler *scheduler,
+                             ::std::function<void(int)> fn,
+                             const monotonic_clock::duration interval,
+                             const monotonic_clock::duration offset)
+      : simulated_timer_handler_(scheduler, [this]() { HandleTimerWakeup(); }),
+        phased_loop_(interval, simulated_timer_handler_.monotonic_now(),
+                     offset),
+        fn_(fn) {
+    // TODO(austin): This assumes time doesn't change between when the
+    // constructor is called and when we start running.  It's probably a safe
+    // assumption.
+    Reschedule();
+  }
+
+  void HandleTimerWakeup() {
+    fn_(cycles_elapsed_);
+    Reschedule();
+  }
+
+  void set_interval_and_offset(
+      const monotonic_clock::duration interval,
+      const monotonic_clock::duration offset) override {
+    phased_loop_.set_interval_and_offset(interval, offset);
+  }
+
+  void Reschedule() {
+    cycles_elapsed_ =
+        phased_loop_.Iterate(simulated_timer_handler_.monotonic_now());
+    simulated_timer_handler_.Setup(phased_loop_.sleep_time(),
+                                   ::aos::monotonic_clock::zero());
+  }
+
+ private:
+  SimulatedTimerHandler simulated_timer_handler_;
+
+  time::PhasedLoop phased_loop_;
+
+  int cycles_elapsed_ = 1;
+
+  ::std::function<void(int)> fn_;
+};
+
+class SimulatedEventLoop : public EventLoop {
+ public:
+  explicit SimulatedEventLoop(
+      EventScheduler *scheduler,
+      absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>>
+          *channels,
+      const Configuration *configuration,
+      std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+          *raw_event_loops)
+      : EventLoop(configuration),
+        scheduler_(scheduler),
+        channels_(channels),
+        raw_event_loops_(raw_event_loops) {
+    raw_event_loops_->push_back(
+        std::make_pair(this, [this](bool value) { set_is_running(value); }));
+  }
+  ~SimulatedEventLoop() override {
+    for (auto it = raw_event_loops_->begin(); it != raw_event_loops_->end();
+         ++it) {
+      if (it->first == this) {
+        raw_event_loops_->erase(it);
+        break;
+      }
+    }
+  }
+
+  ::aos::monotonic_clock::time_point monotonic_now() override {
+    return scheduler_->monotonic_now();
+  }
+
+  ::aos::realtime_clock::time_point realtime_now() override {
+    return scheduler_->realtime_now();
+  }
+
+  ::std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) override;
+
+  ::std::unique_ptr<RawFetcher> MakeRawFetcher(const Channel *channel) override;
+
+  void MakeRawWatcher(
+      const Channel *channel,
+      ::std::function<void(const Context &context, const void *message)>
+          watcher) override;
+
+  TimerHandler *AddTimer(::std::function<void()> callback) override {
+    timers_.emplace_back(new SimulatedTimerHandler(scheduler_, callback));
+    return timers_.back().get();
+  }
+
+  PhasedLoopHandler *AddPhasedLoop(::std::function<void(int)> callback,
+                                   const monotonic_clock::duration interval,
+                                   const monotonic_clock::duration offset =
+                                       ::std::chrono::seconds(0)) override {
+    phased_loops_.emplace_back(
+        new SimulatedPhasedLoopHandler(scheduler_, callback, interval, offset));
+    return phased_loops_.back().get();
+  }
+
+  void OnRun(::std::function<void()> on_run) override {
+    scheduler_->Schedule(scheduler_->monotonic_now(), on_run);
+  }
+
+  void set_name(const absl::string_view name) override {
+    name_ = std::string(name);
+  }
+  const absl::string_view name() const override { return name_; }
+
+  SimulatedChannel *GetSimulatedChannel(const Channel *channel);
+
+  void Take(const Channel *channel);
+
+  void SetRuntimeRealtimePriority(int /*priority*/) override {
+    CHECK(!is_running()) << ": Cannot set realtime priority while running.";
+  }
+
+ private:
+  EventScheduler *scheduler_;
+  absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> *channels_;
+  std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+      *raw_event_loops_;
+  absl::btree_set<SimpleChannel> taken_;
+  ::std::vector<std::unique_ptr<TimerHandler>> timers_;
+  ::std::vector<std::unique_ptr<PhasedLoopHandler>> phased_loops_;
+
+  ::std::string name_;
+};
+
+void SimulatedEventLoop::MakeRawWatcher(
+    const Channel *channel,
+    std::function<void(const Context &channel, const void *message)> watcher) {
+  Take(channel);
+  GetSimulatedChannel(channel)->MakeRawWatcher(watcher);
+}
+
+std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
+    const Channel *channel) {
+  Take(channel);
+  return GetSimulatedChannel(channel)->MakeRawSender(this);
+}
+
+std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
+    const Channel *channel) {
+  return GetSimulatedChannel(channel)->MakeRawFetcher();
+}
+
+SimulatedChannel *SimulatedEventLoop::GetSimulatedChannel(
+    const Channel *channel) {
+  auto it = channels_->find(SimpleChannel(channel));
+  if (it == channels_->end()) {
+    it = channels_
+             ->emplace(SimpleChannel(channel),
+                       std::unique_ptr<SimulatedChannel>(
+                           new SimulatedChannel(channel, scheduler_)))
+             .first;
+  }
+  return it->second.get();
+}
+
+void SimulatedChannel::MakeRawWatcher(
+    ::std::function<void(const Context &context, const void *message)>
+        watcher) {
+  watchers_.push_back(watcher);
+}
+
+::std::unique_ptr<RawSender> SimulatedChannel::MakeRawSender(
+    EventLoop *event_loop) {
+  return ::std::unique_ptr<RawSender>(new SimulatedSender(this, event_loop));
+}
+
+::std::unique_ptr<RawFetcher> SimulatedChannel::MakeRawFetcher() {
+  ::std::unique_ptr<SimulatedFetcher> fetcher(new SimulatedFetcher(this));
+  fetchers_.push_back(fetcher.get());
+  return ::std::move(fetcher);
+}
+
+void SimulatedChannel::Send(std::shared_ptr<SimulatedMessage> message) {
+  message->context.queue_index = next_queue_index_.index();
+  message->context.data =
+      message->data() + channel()->max_size() - message->context.size;
+  next_queue_index_ = next_queue_index_.Increment();
+
+  latest_message_ = message;
+  if (scheduler_->is_running()) {
+    for (auto &watcher : watchers_) {
+      scheduler_->Schedule(scheduler_->monotonic_now(), [watcher, message]() {
+        watcher(message->context, message->context.data);
+      });
+    }
+  }
+  for (auto &fetcher : fetchers_) {
+    fetcher->Enqueue(message);
+  }
+}
+
+void SimulatedChannel::UnregisterFetcher(SimulatedFetcher *fetcher) {
+  fetchers_.erase(::std::find(fetchers_.begin(), fetchers_.end(), fetcher));
+}
+
+SimpleChannel::SimpleChannel(const Channel *channel)
+    : name(CHECK_NOTNULL(CHECK_NOTNULL(channel)->name())->str()),
+      type(CHECK_NOTNULL(CHECK_NOTNULL(channel)->type())->str()) {}
+
+void SimulatedEventLoop::Take(const Channel *channel) {
+  CHECK(!is_running()) << ": Cannot add new objects while running.";
+
+  auto result = taken_.insert(SimpleChannel(channel));
+  CHECK(result.second) << ": " << FlatbufferToJson(channel)
+                       << " is already being used.";
+}
+
+SimulatedEventLoopFactory::SimulatedEventLoopFactory(
+    const Configuration *configuration)
+    : configuration_(configuration) {}
+SimulatedEventLoopFactory::~SimulatedEventLoopFactory() {}
+
+::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop() {
+  return ::std::unique_ptr<EventLoop>(new SimulatedEventLoop(
+      &scheduler_, &channels_, configuration_, &raw_event_loops_));
+}
+
+void SimulatedEventLoopFactory::RunFor(monotonic_clock::duration duration) {
+  for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
+       raw_event_loops_) {
+    event_loop.second(true);
+  }
+  scheduler_.RunFor(duration);
+  if (!scheduler_.is_running()) {
+    for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
+         raw_event_loops_) {
+      event_loop.second(false);
+    }
+  }
+}
+
+void SimulatedEventLoopFactory::Run() {
+  for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
+       raw_event_loops_) {
+    event_loop.second(true);
+  }
+  scheduler_.Run();
+  if (!scheduler_.is_running()) {
+    for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
+         raw_event_loops_) {
+      event_loop.second(false);
+    }
+  }
+}
+
+}  // namespace aos
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
new file mode 100644
index 0000000..485ab68
--- /dev/null
+++ b/aos/events/simulated_event_loop.h
@@ -0,0 +1,86 @@
+#ifndef AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
+#define AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
+
+#include <algorithm>
+#include <map>
+#include <memory>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "absl/container/btree_map.h"
+#include "aos/events/event_loop.h"
+#include "aos/events/event_scheduler.h"
+#include "aos/flatbuffer_merge.h"
+#include "aos/flatbuffers.h"
+#include "aos/ipc_lib/index.h"
+#include "glog/logging.h"
+
+namespace aos {
+
+// Class for simulated fetchers.
+class SimulatedChannel;
+
+struct SimpleChannel {
+  SimpleChannel(const Channel *channel);
+  std::string name;
+  std::string type;
+
+  std::string DebugString() const {
+    return std::string("{ ") + name + ", " + type + "}";
+  }
+
+  bool operator==(const SimpleChannel &other) const {
+    return name == other.name && type == other.type;
+  }
+  bool operator<(const SimpleChannel &other) const {
+    int name_compare = other.name.compare(name);
+    if (name_compare == 0) {
+      return other.type < type;
+    } else if (name_compare < 0) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+};
+
+class SimulatedEventLoopFactory {
+ public:
+  // Constructs a SimulatedEventLoopFactory with the provided configuration.
+  // This configuration must remain in scope for the lifetime of the factory and
+  // all sub-objects.
+  SimulatedEventLoopFactory(const Configuration *configuration);
+  ~SimulatedEventLoopFactory();
+
+  ::std::unique_ptr<EventLoop> MakeEventLoop();
+
+  // Starts executing the event loops unconditionally.
+  void Run();
+  // Executes the event loops for a duration.
+  void RunFor(monotonic_clock::duration duration);
+
+  // Stops executing all event loops.  Meant to be called from within an event
+  // loop handler.
+  void Exit() { scheduler_.Exit(); }
+
+  monotonic_clock::time_point monotonic_now() const {
+    return scheduler_.monotonic_now();
+  }
+  realtime_clock::time_point realtime_now() const {
+    return scheduler_.realtime_now();
+  }
+
+ private:
+  const Configuration *configuration_;
+  EventScheduler scheduler_;
+  // Map from name, type to queue.
+  absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> channels_;
+  // List of event loops to manage running and not running for.
+  std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+      raw_event_loops_;
+};
+
+}  // namespace aos
+
+#endif  // AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
diff --git a/aos/events/simulated-event-loop_test.cc b/aos/events/simulated_event_loop_test.cc
similarity index 90%
rename from aos/events/simulated-event-loop_test.cc
rename to aos/events/simulated_event_loop_test.cc
index e987a11..356c25b 100644
--- a/aos/events/simulated-event-loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -1,5 +1,6 @@
-#include "aos/events/simulated-event-loop.h"
-#include "aos/events/event-loop_param_test.h"
+#include "aos/events/simulated_event_loop.h"
+
+#include "aos/events/event_loop_param_test.h"
 #include "gtest/gtest.h"
 
 namespace aos {
@@ -9,6 +10,8 @@
 
 class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
  public:
+  SimulatedEventLoopTestFactory() : event_loop_factory_(configuration()) {}
+
   ::std::unique_ptr<EventLoop> Make() override {
     return event_loop_factory_.MakeEventLoop();
   }
@@ -68,7 +71,7 @@
 // Test that running for a time period with no handlers causes time to progress
 // correctly.
 TEST(SimulatedEventLoopTest, RunForNoHandlers) {
-  SimulatedEventLoopFactory simulated_event_loop_factory;
+  SimulatedEventLoopFactory simulated_event_loop_factory(nullptr);
   ::std::unique_ptr<EventLoop> event_loop =
       simulated_event_loop_factory.MakeEventLoop();
 
@@ -83,12 +86,12 @@
 // Test that running for a time with a periodic handler causes time to end
 // correctly.
 TEST(SimulatedEventLoopTest, RunForTimerHandler) {
-  SimulatedEventLoopFactory simulated_event_loop_factory;
+  SimulatedEventLoopFactory simulated_event_loop_factory(nullptr);
   ::std::unique_ptr<EventLoop> event_loop =
       simulated_event_loop_factory.MakeEventLoop();
 
   int counter = 0;
-  auto timer = event_loop->AddTimer([&counter, &event_loop]() { ++counter; });
+  auto timer = event_loop->AddTimer([&counter]() { ++counter; });
   event_loop->OnRun([&event_loop, &timer] {
     timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50),
                  chrono::milliseconds(100));
diff --git a/aos/events/test_message.fbs b/aos/events/test_message.fbs
new file mode 100644
index 0000000..1d26c2e
--- /dev/null
+++ b/aos/events/test_message.fbs
@@ -0,0 +1,7 @@
+namespace aos;
+
+table TestMessage {
+  value:int;
+}
+
+root_type TestMessage;