Add simulated version of the event loop

Change-Id: Id61ae7e72d71a52c59497ba13caa6f8267abec34
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 007154c..d2108a8 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -50,3 +50,25 @@
   ],
   testonly = True,
 )
+
+cc_test(
+  name = "simulated-event-loop_test",
+  srcs = ["simulated-event-loop_test.cc"],
+  deps = [
+    "//aos/testing:googletest",
+    ":event-loop_param_test",
+    ":simulated-event-loop",
+  ],
+  testonly = True,
+)
+
+cc_library(
+  name = "simulated-event-loop",
+  hdrs = ["simulated-event-loop.h"],
+  srcs = ["simulated-event-loop.cc"],
+  deps = [
+    ":event-loop",
+    "//aos/common:queues",
+  ],
+)
+
diff --git a/aos/events/raw-event-loop.h b/aos/events/raw-event-loop.h
index 5a0df1e..9b1408d 100644
--- a/aos/events/raw-event-loop.h
+++ b/aos/events/raw-event-loop.h
@@ -83,6 +83,14 @@
     info.queue_length = T::kQueueLength;
     return info;
   }
+
+  // Necessary for the comparison of QueueTypeInfo objects in the
+  // SimulatedEventLoop.
+  bool operator<(const QueueTypeInfo &other) const {
+    if (size != other.size) return size < other.size;
+    if (hash != other.hash) return hash < other.hash;
+    return queue_length < other.queue_length;
+  }
 };
 
 // Interface for timers
diff --git a/aos/events/shm-event-loop.h b/aos/events/shm-event-loop.h
index e40ccce..3b36f40 100644
--- a/aos/events/shm-event-loop.h
+++ b/aos/events/shm-event-loop.h
@@ -12,7 +12,7 @@
 
 }  // namespace internal
 
-// Specialization of EventLoop that is build from queues running out of shared
+// Specialization of EventLoop that is built from queues running out of shared
 // memory. See more details at aos/common/queue.h
 class ShmEventLoop : public EventLoop {
  public:
diff --git a/aos/events/simulated-event-loop.cc b/aos/events/simulated-event-loop.cc
new file mode 100644
index 0000000..90b78cd
--- /dev/null
+++ b/aos/events/simulated-event-loop.cc
@@ -0,0 +1,123 @@
+#include "aos/events/simulated-event-loop.h"
+#include "aos/common/queue.h"
+
+namespace aos {
+namespace {
+class SimulatedFetcher : public RawFetcher {
+ public:
+  explicit SimulatedFetcher(SimulatedQueue *queue) : queue_(queue) {}
+  ~SimulatedFetcher() {}
+
+  bool Fetch() override {
+    if (index_ == queue_->index()) return false;
+
+    // Fetched message is newer
+    msg_ = queue_->latest_message();
+    index_ = queue_->index();
+    set_most_recent(reinterpret_cast<FetchValue *>(msg_.get()));
+    return true;
+  }
+
+ private:
+  int64_t index_ = -1;
+  SimulatedQueue *queue_;
+  RefCountedBuffer msg_;
+};
+
+class SimulatedSender : public RawSender {
+ public:
+  SimulatedSender(SimulatedQueue *queue) : queue_(queue) {}
+  ~SimulatedSender() {}
+
+  SendContext *GetContext() override {
+    return reinterpret_cast<SendContext *>(
+        RefCountedBuffer(queue_->size()).release());
+  }
+
+  void Free(SendContext *context) override {
+    RefCountedBuffer(reinterpret_cast<aos::Message *>(context));
+  }
+
+  bool Send(SendContext *context) override {
+    queue_->Send(RefCountedBuffer(reinterpret_cast<aos::Message *>(context)));
+    return true;  // Maybe false instead? :)
+  }
+
+ private:
+  SimulatedQueue *queue_;
+};
+}  // namespace
+
+EventScheduler::Token EventScheduler::Schedule(
+    ::aos::monotonic_clock::time_point time, ::std::function<void()> callback) {
+  return events_list_.emplace(time, callback);
+}
+
+void EventScheduler::Deschedule(EventScheduler::Token token) {
+  events_list_.erase(token);
+}
+
+void EventScheduler::Run() {
+  is_running_ = true;
+  while (!events_list_.empty() && is_running_) {
+    auto iter = events_list_.begin();
+    now_ = iter->first;
+    ::std::function<void()> callback = ::std::move(iter->second);
+    events_list_.erase(iter);
+    callback();
+  }
+}
+
+void SimulatedEventLoop::MakeRawWatcher(
+    const std::string &path, const QueueTypeInfo &type,
+    std::function<void(const aos::Message *message)> watcher) {
+  Take(path);
+  ::std::pair<::std::string, QueueTypeInfo> key(path, type);
+  GetSimulatedQueue(key)->MakeRawWatcher(watcher);
+}
+
+std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
+    const std::string &path, const QueueTypeInfo &type) {
+  ::std::pair<::std::string, QueueTypeInfo> key(path, type);
+  return GetSimulatedQueue(key)->MakeRawSender();
+}
+
+std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
+    const std::string &path, const QueueTypeInfo &type) {
+  Take(path);
+  ::std::pair<::std::string, QueueTypeInfo> key(path, type);
+  return GetSimulatedQueue(key)->MakeRawFetcher();
+}
+
+SimulatedQueue *SimulatedEventLoop::GetSimulatedQueue(
+    const ::std::pair<::std::string, QueueTypeInfo> &type) {
+  auto it = queues_->find(type);
+  if (it == queues_->end()) {
+    it = queues_->emplace(type, SimulatedQueue(type.second, scheduler_))
+             .first;
+  }
+  return &it->second;
+}
+
+void SimulatedQueue::MakeRawWatcher(
+    std::function<void(const aos::Message *message)> watcher) {
+  watchers_.push_back(watcher);
+}
+
+std::unique_ptr<RawSender> SimulatedQueue::MakeRawSender() {
+  return std::unique_ptr<RawSender>(new SimulatedSender(this));
+}
+
+std::unique_ptr<RawFetcher> SimulatedQueue::MakeRawFetcher() {
+  return std::unique_ptr<RawFetcher>(new SimulatedFetcher(this));
+}
+
+void SimulatedEventLoop::Take(const std::string &path) {
+  if (is_running()) {
+    ::aos::Die("Cannot add new objects while running.\n");
+  }
+  if (!taken_.emplace(path).second) {
+    ::aos::Die("%s already has a listener / watcher.", path.c_str());
+  }
+}
+}  // namespace aos
diff --git a/aos/events/simulated-event-loop.h b/aos/events/simulated-event-loop.h
new file mode 100644
index 0000000..829ee8c
--- /dev/null
+++ b/aos/events/simulated-event-loop.h
@@ -0,0 +1,252 @@
+#ifndef _AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
+#define _AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
+
+#include <map>
+#include <memory>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "aos/events/event-loop.h"
+
+namespace aos {
+
+class RefCountedBuffer {
+ public:
+  RefCountedBuffer() {}
+  ~RefCountedBuffer() { clear(); }
+
+  explicit RefCountedBuffer(aos::Message *data) : data_(data) {}
+
+  explicit RefCountedBuffer(size_t size) {
+    data_ = reinterpret_cast<uint8_t *>(malloc(kRefCountSize + size)) +
+            kRefCountSize;
+    // Initialize the allocated memory with an integer
+    *GetRefCount() = 1;
+  }
+
+  RefCountedBuffer(const RefCountedBuffer &other) {
+    data_ = other.data_;
+    ++*GetRefCount();
+  }
+
+  RefCountedBuffer(RefCountedBuffer &&other) { std::swap(data_, other.data_); }
+
+  RefCountedBuffer &operator=(const RefCountedBuffer &other) {
+    if (this == &other) return *this;
+    clear();
+    data_ = other.data_;
+    ++*GetRefCount();
+    return *this;
+  }
+
+  RefCountedBuffer &operator=(RefCountedBuffer &&other) {
+    if (this == &other) return *this;
+    std::swap(data_, other.data_);
+    return *this;
+  }
+
+  aos::Message *get() const { return static_cast<aos::Message *>(data_); }
+
+  aos::Message *release() {
+    auto tmp = get();
+    data_ = nullptr;
+    return tmp;
+  }
+
+  void clear() {
+    if (data_ != nullptr) {
+      if (--*GetRefCount() == 0) {
+        // Free memory block from the start of the allocated block
+        free(GetRefCount());
+      }
+      data_ = nullptr;
+    }
+  }
+
+ private:
+  void *data_ = nullptr;
+  // Qty. memory to be allocated to the ref counter
+  static constexpr size_t kRefCountSize = sizeof(int64_t);
+
+  int64_t *GetRefCount() {
+    // Need to cast the void* to an 8 bit long object (size of void* is
+    // technically 0)
+    return reinterpret_cast<int64_t *>(static_cast<void *>(
+        reinterpret_cast<uint8_t *>(data_) - kRefCountSize));
+  }
+};
+
+class EventScheduler {
+ public:
+  using QueueType = ::std::multimap<::aos::monotonic_clock::time_point,
+                                    ::std::function<void()>>;
+  using Token = QueueType::iterator;
+
+  // Schedule an event with a callback function
+  // Returns an iterator to the event
+  Token Schedule(::aos::monotonic_clock::time_point time,
+                 ::std::function<void()> callback);
+
+  // Deschedule an event by its iterator
+  void Deschedule(Token token);
+
+  void Run();
+
+  void Exit() { is_running_ = false; }
+
+  ::aos::monotonic_clock::time_point now() { return now_; }
+
+ private:
+  ::aos::monotonic_clock::time_point now_ = ::aos::monotonic_clock::epoch();
+  QueueType events_list_;
+  bool is_running_ = false;
+};
+
+class SimulatedQueue {
+ public:
+  explicit SimulatedQueue(const QueueTypeInfo &type, EventScheduler *scheduler)
+      : type_(type), scheduler_(scheduler){};
+
+  std::unique_ptr<RawSender> MakeRawSender();
+
+  std::unique_ptr<RawFetcher> MakeRawFetcher();
+
+  void MakeRawWatcher(std::function<void(const aos::Message *message)> watcher);
+
+  void Send(RefCountedBuffer message) {
+    index_++;
+    latest_message_ = message;
+    for (auto &watcher : watchers_) {
+      scheduler_->Schedule(scheduler_->now(),
+                           [watcher, message]() { watcher(message.get()); });
+    }
+  }
+
+  const RefCountedBuffer &latest_message() { return latest_message_; }
+
+  int64_t index() { return index_; }
+
+  size_t size() { return type_.size; }
+
+ private:
+  int64_t index_ = -1;
+  QueueTypeInfo type_;
+  ::std::vector<std::function<void(const aos::Message *message)>> watchers_;
+  RefCountedBuffer latest_message_;
+  EventScheduler *scheduler_;
+};
+
+class SimulatedTimerHandler : public TimerHandler {
+ public:
+  explicit SimulatedTimerHandler(EventScheduler *scheduler,
+                                 ::std::function<void()> fn)
+      : scheduler_(scheduler), fn_(fn) {}
+  ~SimulatedTimerHandler() {}
+
+  void Setup(monotonic_clock::time_point base,
+             monotonic_clock::duration repeat_offset) override {
+    Disable();
+    auto now = scheduler_->now();
+    base_ = base;
+    repeat_offset_ = repeat_offset;
+    if (base < now) {
+      token_ = scheduler_->Schedule(now, [this]() { HandleEvent(); });
+    } else {
+      token_ = scheduler_->Schedule(base, [this]() { HandleEvent(); });
+    }
+  }
+
+  void HandleEvent() {
+    auto now = scheduler_->now();
+    if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
+      // Reschedule.
+      while (base_ <= now) base_ += repeat_offset_;
+      token_ = scheduler_->Schedule(base_, [this]() { HandleEvent(); });
+    } else {
+      token_ = EventScheduler::Token();
+    }
+    fn_();
+  }
+
+  void Disable() override {
+    if (token_ != EventScheduler::Token()) {
+      scheduler_->Deschedule(token_);
+      token_ = EventScheduler::Token();
+    }
+  }
+
+ private:
+  EventScheduler *scheduler_;
+  EventScheduler::Token token_;
+  // Function to be run on the thread
+  ::std::function<void()> fn_;
+  monotonic_clock::time_point base_;
+  monotonic_clock::duration repeat_offset_;
+};
+
+class SimulatedEventLoop : public EventLoop {
+ public:
+  explicit SimulatedEventLoop(
+      EventScheduler *scheduler,
+      ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
+          *queues) : scheduler_(scheduler), queues_(queues){};
+  ~SimulatedEventLoop() override{};
+
+  ::aos::monotonic_clock::time_point monotonic_now() override {
+    return scheduler_->now();
+  }
+
+  std::unique_ptr<RawSender> MakeRawSender(const std::string &path,
+                                           const QueueTypeInfo &type) override;
+
+  std::unique_ptr<RawFetcher> MakeRawFetcher(
+      const std::string &path, const QueueTypeInfo &type) override;
+
+  void MakeRawWatcher(
+      const std::string &path, const QueueTypeInfo &type,
+      std::function<void(const aos::Message *message)> watcher) override;
+
+  TimerHandler *AddTimer(::std::function<void()> callback) override {
+    timers_.emplace_back(
+        new SimulatedTimerHandler(scheduler_, callback));
+    return timers_.back().get();
+  }
+
+  void OnRun(std::function<void()> on_run) override {
+    scheduler_->Schedule(scheduler_->now(), on_run);
+  }
+  void Run() override {
+    set_is_running(true);
+    scheduler_->Run();
+  }
+  void Exit() override {
+    set_is_running(false);
+    scheduler_->Exit();
+  }
+
+  SimulatedQueue *GetSimulatedQueue(
+      const ::std::pair<::std::string, QueueTypeInfo> &);
+
+  void Take(const std::string &path);
+
+ private:
+  EventScheduler *scheduler_;
+  std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue> *queues_;
+  std::unordered_set<std::string> taken_;
+  std::vector<std::unique_ptr<TimerHandler>> timers_;
+};
+
+class SimulatedEventLoopFactory {
+ public:
+  std::unique_ptr<EventLoop> CreateEventLoop() {
+    return std::unique_ptr<EventLoop>(
+        new SimulatedEventLoop(&scheduler_, &queues_));
+  }
+
+ private:
+  EventScheduler scheduler_;
+  ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue> queues_;
+};
+}  // namespace aos
+#endif  //_AOS_EVENTS_TEST_EVENT_LOOP_H_
diff --git a/aos/events/simulated-event-loop_test.cc b/aos/events/simulated-event-loop_test.cc
new file mode 100644
index 0000000..7c11b4a
--- /dev/null
+++ b/aos/events/simulated-event-loop_test.cc
@@ -0,0 +1,50 @@
+#include "aos/events/simulated-event-loop.h"
+#include "aos/events/event-loop_param_test.h"
+#include "gtest/gtest.h"
+
+namespace aos {
+namespace testing {
+
+class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
+ public:
+  std::unique_ptr<EventLoop> Make() override {
+    return event_loop.CreateEventLoop();
+  }
+ private:
+   SimulatedEventLoopFactory event_loop;
+};
+
+INSTANTIATE_TEST_CASE_P(SimulatedEventLoopTest, AbstractEventLoopTest,
+                        ::testing::Values([]() {
+                          return new SimulatedEventLoopTestFactory();
+                        }));
+
+// Test that creating an event and running the scheduler runs the event.
+TEST(EventSchedulerTest, ScheduleEvent) {
+  int counter = 0;
+  EventScheduler scheduler;
+
+  scheduler.Schedule(::aos::monotonic_clock::now(),
+                      [&counter]() { counter += 1; });
+  scheduler.Run();
+  EXPECT_EQ(counter, 1);
+  auto token = scheduler.Schedule(::aos::monotonic_clock::now(),
+                                   [&counter]() { counter += 1; });
+  scheduler.Deschedule(token);
+  scheduler.Run();
+  EXPECT_EQ(counter, 1);
+}
+
+// Test that descheduling an already scheduled event doesn't run the event.
+TEST(EventSchedulerTest, DescheduleEvent) {
+  int counter = 0;
+  EventScheduler scheduler;
+
+  auto token = scheduler.Schedule(::aos::monotonic_clock::now(),
+                                   [&counter]() { counter += 1; });
+  scheduler.Deschedule(token);
+  scheduler.Run();
+  EXPECT_EQ(counter, 0);
+}
+}  // namespace testing
+}  // namespace aos