Add simulated version of the event loop
Change-Id: Id61ae7e72d71a52c59497ba13caa6f8267abec34
diff --git a/aos/events/simulated-event-loop.h b/aos/events/simulated-event-loop.h
new file mode 100644
index 0000000..829ee8c
--- /dev/null
+++ b/aos/events/simulated-event-loop.h
@@ -0,0 +1,252 @@
+#ifndef _AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
+#define _AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
+
+#include <cstddef>
+#include <cstdint>
+#include <cstdlib>
+#include <functional>
+#include <map>
+#include <memory>
+#include <string>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "aos/events/event-loop.h"
+
+namespace aos {
+
+// Copyable handle to a malloc'd message buffer with an intrusive reference
+// count.
+//
+// Layout of a block allocated by RefCountedBuffer(size_t):
+//   [ int64_t reference count ][ payload bytes ]
+//                              ^ data_ points here
+// Copying a handle shares the payload and increments the count; the block is
+// freed when the last handle is cleared or destroyed. The count is a plain
+// int64_t, so nothing in this class synchronizes access to it.
+class RefCountedBuffer {
+ public:
+  // Creates an empty handle which owns nothing.
+  RefCountedBuffer() {}
+  ~RefCountedBuffer() { clear(); }
+
+  // Adopts |data| without modifying any count. clear() reads the count stored
+  // immediately before |data|, so the pointer has to come from a block with
+  // the layout described above (for example one handed out by release()).
+  explicit RefCountedBuffer(aos::Message *data) : data_(data) {}
+
+  // Allocates room for |size| payload bytes with an initial count of 1.
+  explicit RefCountedBuffer(size_t size) {
+    void *const block = malloc(kRefCountSize + size);
+    // Stay empty if the allocation failed instead of offsetting and writing
+    // through a null pointer.
+    if (block == nullptr) return;
+    data_ = static_cast<uint8_t *>(block) + kRefCountSize;
+    *GetRefCount() = 1;
+  }
+
+  // Shares |other|'s buffer. Copying an empty handle yields an empty handle;
+  // there is no count to touch in that case.
+  RefCountedBuffer(const RefCountedBuffer &other) : data_(other.data_) {
+    if (data_ != nullptr) ++*GetRefCount();
+  }
+
+  // Steals |other|'s buffer and leaves |other| empty.
+  RefCountedBuffer(RefCountedBuffer &&other) noexcept : data_(other.data_) {
+    other.data_ = nullptr;
+  }
+
+  // Drops the current buffer (if any) and shares |other|'s instead.
+  RefCountedBuffer &operator=(const RefCountedBuffer &other) {
+    if (this == &other) return *this;
+    clear();
+    data_ = other.data_;
+    // An empty |other| has no count to increment.
+    if (data_ != nullptr) ++*GetRefCount();
+    return *this;
+  }
+
+  // Exchanges buffers with |other|; the previous buffer is dropped whenever
+  // |other| is cleared or destroyed.
+  RefCountedBuffer &operator=(RefCountedBuffer &&other) noexcept {
+    if (this == &other) return *this;
+    std::swap(data_, other.data_);
+    return *this;
+  }
+
+  // Returns the payload pointer, or nullptr for an empty handle.
+  aos::Message *get() const { return static_cast<aos::Message *>(data_); }
+
+  // Gives up this handle's reference without decrementing the count and
+  // returns the payload pointer. The caller becomes responsible for it.
+  aos::Message *release() {
+    auto tmp = get();
+    data_ = nullptr;
+    return tmp;
+  }
+
+  // Drops this handle's reference, freeing the block when it was the last
+  // one. Does nothing on an empty handle.
+  void clear() {
+    if (data_ != nullptr) {
+      if (--*GetRefCount() == 0) {
+        // The count sits at the very start of the malloc'd block, so its
+        // address is the pointer which has to be handed back to free().
+        free(GetRefCount());
+      }
+      data_ = nullptr;
+    }
+  }
+
+ private:
+  // Points at the payload, kRefCountSize bytes past the start of the block.
+  void *data_ = nullptr;
+  // Number of bytes reserved in front of the payload for the count.
+  static constexpr size_t kRefCountSize = sizeof(int64_t);
+
+  // Returns the address of the count stored in front of the payload. Must
+  // only be called while data_ is non-null.
+  int64_t *GetRefCount() {
+    // Pointer arithmetic is not defined on void*, so step back in bytes.
+    return reinterpret_cast<int64_t *>(static_cast<uint8_t *>(data_) -
+                                       kRefCountSize);
+  }
+};
+
+// Holds callbacks keyed by the simulated monotonic time they were scheduled
+// for, together with the current simulated time.
+//
+// Schedule(), Deschedule() and Run() are only declared here; their
+// definitions live outside this header.
+class EventScheduler {
+ public:
+  // A multimap is used so that several callbacks can share one time point.
+  using QueueType = ::std::multimap<::aos::monotonic_clock::time_point,
+                                    ::std::function<void()>>;
+  // Handle identifying one scheduled callback.
+  using Token = QueueType::iterator;
+
+  // Schedules |callback| for |time| and returns a token (an iterator into the
+  // event list) which can later be passed to Deschedule().
+  Token Schedule(::aos::monotonic_clock::time_point time,
+                 ::std::function<void()> callback);
+
+  // Removes the event identified by |token|.
+  void Deschedule(Token token);
+
+  // Defined out of line. NOTE(review): presumably executes the scheduled
+  // events in time order until Exit() is called -- confirm against the
+  // implementation.
+  void Run();
+
+  // Clears the running flag.
+  void Exit() { is_running_ = false; }
+
+  // Returns the current simulated time.
+  ::aos::monotonic_clock::time_point now() const { return now_; }
+
+ private:
+  // Current simulated time; starts at the monotonic clock's epoch.
+  ::aos::monotonic_clock::time_point now_ = ::aos::monotonic_clock::epoch();
+  // Pending events, ordered by the time they are scheduled for.
+  QueueType events_list_;
+  bool is_running_ = false;
+};
+
+// In-memory stand-in for a single message queue. It remembers the most recent
+// message and hands every sent message to the registered watchers through
+// the EventScheduler.
+class SimulatedQueue {
+ public:
+  // |scheduler| is stored as a raw pointer and is not owned by the queue.
+  explicit SimulatedQueue(const QueueTypeInfo &type, EventScheduler *scheduler)
+      : type_(type), scheduler_(scheduler) {}
+
+  // The three factory methods below are defined out of line.
+  std::unique_ptr<RawSender> MakeRawSender();
+
+  std::unique_ptr<RawFetcher> MakeRawFetcher();
+
+  void MakeRawWatcher(std::function<void(const aos::Message *message)> watcher);
+
+  // Records |message| as the latest message and schedules one callback per
+  // watcher at the scheduler's current time. The watchers are not invoked
+  // from inside this call.
+  void Send(RefCountedBuffer message) {
+    index_++;
+    latest_message_ = message;
+    for (const auto &watcher : watchers_) {
+      // The closure holds its own copies of the watcher and of the buffer
+      // handle, so the message stays referenced until the callback has run.
+      scheduler_->Schedule(scheduler_->now(),
+                           [watcher, message]() { watcher(message.get()); });
+    }
+  }
+
+  // Handle to the most recently sent message; empty until the first Send().
+  const RefCountedBuffer &latest_message() const { return latest_message_; }
+
+  // Zero-based index of the most recently sent message, -1 before any Send().
+  int64_t index() const { return index_; }
+
+  // Message size as reported by the queue's type information.
+  size_t size() const { return type_.size; }
+
+ private:
+  int64_t index_ = -1;
+  QueueTypeInfo type_;
+  ::std::vector<std::function<void(const aos::Message *message)>> watchers_;
+  RefCountedBuffer latest_message_;
+  EventScheduler *scheduler_;
+};
+
+// TimerHandler which runs against the simulated clock: the callback is queued
+// on an EventScheduler instead of being driven by a real timer.
+//
+// The queued closures capture |this|, so the handler has to stay alive for as
+// long as one of its events may still be pending in the scheduler.
+class SimulatedTimerHandler : public TimerHandler {
+ public:
+  // |scheduler| is not owned. |fn| is what gets invoked when the timer fires.
+  explicit SimulatedTimerHandler(EventScheduler *scheduler,
+                                 ::std::function<void()> fn)
+      : scheduler_(scheduler), fn_(::std::move(fn)) {}
+  ~SimulatedTimerHandler() {}
+
+  // Cancels any pending event and arms the timer to fire at |base|, or at the
+  // current simulated time when |base| already lies in the past. A non-zero
+  // |repeat_offset| makes the timer re-arm itself every time it fires.
+  void Setup(monotonic_clock::time_point base,
+             monotonic_clock::duration repeat_offset) override {
+    Disable();
+    const auto now = scheduler_->now();
+    base_ = base;
+    repeat_offset_ = repeat_offset;
+    ScheduleAt(base < now ? now : base);
+  }
+
+  // Invoked by the scheduler when the timer fires. The next occurrence is
+  // queued before the user callback runs, so the callback may call Disable()
+  // or Setup() and have that take effect.
+  void HandleEvent() {
+    const auto now = scheduler_->now();
+    if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
+      // Advance to the first occurrence strictly after the current time.
+      // NOTE(review): this never terminates for a negative repeat_offset_;
+      // callers are assumed to pass non-negative offsets.
+      while (base_ <= now) base_ += repeat_offset_;
+      ScheduleAt(base_);
+    } else {
+      // One-shot timer: nothing is pending any more.
+      scheduled_ = false;
+      token_ = EventScheduler::Token();
+    }
+    fn_();
+  }
+
+  // Cancels the pending event, if there is one.
+  void Disable() override {
+    if (scheduled_) {
+      scheduler_->Deschedule(token_);
+      scheduled_ = false;
+      token_ = EventScheduler::Token();
+    }
+  }
+
+ private:
+  // Queues HandleEvent() for |time| and remembers the resulting token.
+  void ScheduleAt(monotonic_clock::time_point time) {
+    token_ = scheduler_->Schedule(time, [this]() { HandleEvent(); });
+    scheduled_ = true;
+  }
+
+  EventScheduler *scheduler_;
+  // Only meaningful while scheduled_ is true.
+  EventScheduler::Token token_;
+  // Tracks whether token_ refers to a pending event. A flag is used because
+  // comparing a live container iterator against a value-initialized one is
+  // not a defined operation.
+  bool scheduled_ = false;
+  // Callback run every time the timer fires.
+  ::std::function<void()> fn_;
+  monotonic_clock::time_point base_;
+  monotonic_clock::duration repeat_offset_;
+};
+
+// EventLoop implementation backed by an EventScheduler and a map of
+// SimulatedQueues. Neither the scheduler nor the queue map is owned by the
+// loop; both are passed in as raw pointers.
+class SimulatedEventLoop : public EventLoop {
+ public:
+  explicit SimulatedEventLoop(
+      EventScheduler *scheduler,
+      ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
+          *queues)
+      : scheduler_(scheduler), queues_(queues) {}
+  ~SimulatedEventLoop() override {}
+
+  // Reports the scheduler's simulated time.
+  ::aos::monotonic_clock::time_point monotonic_now() override {
+    return scheduler_->now();
+  }
+
+  // The three factory methods below are defined out of line.
+  std::unique_ptr<RawSender> MakeRawSender(const std::string &path,
+                                           const QueueTypeInfo &type) override;
+
+  std::unique_ptr<RawFetcher> MakeRawFetcher(
+      const std::string &path, const QueueTypeInfo &type) override;
+
+  void MakeRawWatcher(
+      const std::string &path, const QueueTypeInfo &type,
+      std::function<void(const aos::Message *message)> watcher) override;
+
+  // Creates a timer which invokes |callback|. The loop keeps ownership; the
+  // returned pointer remains valid for as long as the loop exists.
+  TimerHandler *AddTimer(::std::function<void()> callback) override {
+    // Take ownership before touching the vector so that the handler cannot
+    // leak if growing the vector throws.
+    ::std::unique_ptr<TimerHandler> timer(
+        new SimulatedTimerHandler(scheduler_, ::std::move(callback)));
+    TimerHandler *const handle = timer.get();
+    timers_.push_back(::std::move(timer));
+    return handle;
+  }
+
+  // Queues |on_run| on the scheduler at the current simulated time.
+  void OnRun(std::function<void()> on_run) override {
+    scheduler_->Schedule(scheduler_->now(), ::std::move(on_run));
+  }
+  // Marks the loop as running and hands control to the scheduler.
+  void Run() override {
+    set_is_running(true);
+    scheduler_->Run();
+  }
+  // Marks the loop as stopped and asks the scheduler to exit.
+  void Exit() override {
+    set_is_running(false);
+    scheduler_->Exit();
+  }
+
+  // Defined out of line. NOTE(review): presumably looks up (or creates) the
+  // queue for the given (path, type) key -- confirm against the
+  // implementation.
+  SimulatedQueue *GetSimulatedQueue(
+      const ::std::pair<::std::string, QueueTypeInfo> &);
+
+  // Defined out of line. NOTE(review): presumably records |path| in taken_ --
+  // confirm against the implementation.
+  void Take(const std::string &path);
+
+ private:
+  EventScheduler *scheduler_;
+  std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue> *queues_;
+  std::unordered_set<std::string> taken_;
+  // Owns every timer handed out by AddTimer().
+  std::vector<std::unique_ptr<TimerHandler>> timers_;
+};
+
+// Owns one EventScheduler and one queue map, and creates event loops which
+// all point at them.
+class SimulatedEventLoopFactory {
+ public:
+  // Returns a new SimulatedEventLoop wired to this factory's scheduler and
+  // queues. The loop only stores pointers to them, so it must not be used
+  // after the factory has been destroyed.
+  std::unique_ptr<EventLoop> CreateEventLoop() {
+    std::unique_ptr<EventLoop> loop(
+        new SimulatedEventLoop(&scheduler_, &queues_));
+    return loop;
+  }
+
+ private:
+  // Shared by every loop this factory creates.
+  EventScheduler scheduler_;
+  // Queues keyed by (path, type), shared by every loop this factory creates.
+  ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue> queues_;
+};
+} // namespace aos
+#endif  // _AOS_EVENTS_SIMULATED_EVENT_LOOP_H_