Add simulated version of the event loop
Change-Id: Id61ae7e72d71a52c59497ba13caa6f8267abec34
diff --git a/aos/events/simulated-event-loop.cc b/aos/events/simulated-event-loop.cc
new file mode 100644
index 0000000..90b78cd
--- /dev/null
+++ b/aos/events/simulated-event-loop.cc
@@ -0,0 +1,123 @@
+#include "aos/events/simulated-event-loop.h"
+#include "aos/common/queue.h"
+
+namespace aos {
+namespace {
+class SimulatedFetcher : public RawFetcher {
+ public:
+ explicit SimulatedFetcher(SimulatedQueue *queue) : queue_(queue) {}
+ ~SimulatedFetcher() {}
+
+ bool Fetch() override {
+ if (index_ == queue_->index()) return false;
+
+ // Fetched message is newer
+ msg_ = queue_->latest_message();
+ index_ = queue_->index();
+ set_most_recent(reinterpret_cast<FetchValue *>(msg_.get()));
+ return true;
+ }
+
+ private:
+ int64_t index_ = -1;
+ SimulatedQueue *queue_;
+ RefCountedBuffer msg_;
+};
+
+class SimulatedSender : public RawSender {
+ public:
+ SimulatedSender(SimulatedQueue *queue) : queue_(queue) {}
+ ~SimulatedSender() {}
+
+ SendContext *GetContext() override {
+ return reinterpret_cast<SendContext *>(
+ RefCountedBuffer(queue_->size()).release());
+ }
+
+ void Free(SendContext *context) override {
+ RefCountedBuffer(reinterpret_cast<aos::Message *>(context));
+ }
+
+ bool Send(SendContext *context) override {
+ queue_->Send(RefCountedBuffer(reinterpret_cast<aos::Message *>(context)));
+ return true; // Maybe false instead? :)
+ }
+
+ private:
+ SimulatedQueue *queue_;
+};
+} // namespace
+
+EventScheduler::Token EventScheduler::Schedule(
+ ::aos::monotonic_clock::time_point time, ::std::function<void()> callback) {
+ return events_list_.emplace(time, callback);
+}
+
+void EventScheduler::Deschedule(EventScheduler::Token token) {
+ events_list_.erase(token);
+}
+
+void EventScheduler::Run() {
+ is_running_ = true;
+ while (!events_list_.empty() && is_running_) {
+ auto iter = events_list_.begin();
+ now_ = iter->first;
+ ::std::function<void()> callback = ::std::move(iter->second);
+ events_list_.erase(iter);
+ callback();
+ }
+}
+
+void SimulatedEventLoop::MakeRawWatcher(
+ const std::string &path, const QueueTypeInfo &type,
+ std::function<void(const aos::Message *message)> watcher) {
+ Take(path);
+ ::std::pair<::std::string, QueueTypeInfo> key(path, type);
+ GetSimulatedQueue(key)->MakeRawWatcher(watcher);
+}
+
+std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
+ const std::string &path, const QueueTypeInfo &type) {
+ ::std::pair<::std::string, QueueTypeInfo> key(path, type);
+ return GetSimulatedQueue(key)->MakeRawSender();
+}
+
+std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
+ const std::string &path, const QueueTypeInfo &type) {
+ Take(path);
+ ::std::pair<::std::string, QueueTypeInfo> key(path, type);
+ return GetSimulatedQueue(key)->MakeRawFetcher();
+}
+
+SimulatedQueue *SimulatedEventLoop::GetSimulatedQueue(
+ const ::std::pair<::std::string, QueueTypeInfo> &type) {
+ auto it = queues_->find(type);
+ if (it == queues_->end()) {
+ it = queues_->emplace(type, SimulatedQueue(type.second, scheduler_))
+ .first;
+ }
+ return &it->second;
+}
+
+void SimulatedQueue::MakeRawWatcher(
+ std::function<void(const aos::Message *message)> watcher) {
+ watchers_.push_back(watcher);
+}
+
+std::unique_ptr<RawSender> SimulatedQueue::MakeRawSender() {
+ return std::unique_ptr<RawSender>(new SimulatedSender(this));
+}
+
+std::unique_ptr<RawFetcher> SimulatedQueue::MakeRawFetcher() {
+ return std::unique_ptr<RawFetcher>(new SimulatedFetcher(this));
+}
+
+void SimulatedEventLoop::Take(const std::string &path) {
+ if (is_running()) {
+ ::aos::Die("Cannot add new objects while running.\n");
+ }
+ if (!taken_.emplace(path).second) {
+ ::aos::Die("%s already has a listener / watcher.", path.c_str());
+ }
+}
+} // namespace aos