Add simulated version of the event loop
Change-Id: Id61ae7e72d71a52c59497ba13caa6f8267abec34
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 007154c..d2108a8 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -50,3 +50,25 @@
],
testonly = True,
)
+
+cc_test(
+ name = "simulated-event-loop_test",
+ srcs = ["simulated-event-loop_test.cc"],
+ deps = [
+ "//aos/testing:googletest",
+ ":event-loop_param_test",
+ ":simulated-event-loop",
+ ],
+ testonly = True,
+)
+
+cc_library(
+ name = "simulated-event-loop",
+ hdrs = ["simulated-event-loop.h"],
+ srcs = ["simulated-event-loop.cc"],
+ deps = [
+ ":event-loop",
+ "//aos/common:queues",
+ ],
+)
+
diff --git a/aos/events/raw-event-loop.h b/aos/events/raw-event-loop.h
index 5a0df1e..9b1408d 100644
--- a/aos/events/raw-event-loop.h
+++ b/aos/events/raw-event-loop.h
@@ -83,6 +83,14 @@
info.queue_length = T::kQueueLength;
return info;
}
+
+ // Necessary for the comparison of QueueTypeInfo objects in the
+ // SimulatedEventLoop.
+ bool operator<(const QueueTypeInfo &other) const {
+ if (size != other.size) return size < other.size;
+ if (hash != other.hash) return hash < other.hash;
+ return queue_length < other.queue_length;
+ }
};
// Interface for timers
diff --git a/aos/events/shm-event-loop.h b/aos/events/shm-event-loop.h
index e40ccce..3b36f40 100644
--- a/aos/events/shm-event-loop.h
+++ b/aos/events/shm-event-loop.h
@@ -12,7 +12,7 @@
} // namespace internal
-// Specialization of EventLoop that is build from queues running out of shared
+// Specialization of EventLoop that is built from queues running out of shared
// memory. See more details at aos/common/queue.h
class ShmEventLoop : public EventLoop {
public:
diff --git a/aos/events/simulated-event-loop.cc b/aos/events/simulated-event-loop.cc
new file mode 100644
index 0000000..90b78cd
--- /dev/null
+++ b/aos/events/simulated-event-loop.cc
@@ -0,0 +1,123 @@
+#include "aos/events/simulated-event-loop.h"
+#include "aos/common/queue.h"
+
+namespace aos {
+namespace {
+class SimulatedFetcher : public RawFetcher {
+ public:
+ explicit SimulatedFetcher(SimulatedQueue *queue) : queue_(queue) {}
+ ~SimulatedFetcher() {}
+
+ bool Fetch() override {
+ if (index_ == queue_->index()) return false;
+
+ // Fetched message is newer
+ msg_ = queue_->latest_message();
+ index_ = queue_->index();
+ set_most_recent(reinterpret_cast<FetchValue *>(msg_.get()));
+ return true;
+ }
+
+ private:
+ int64_t index_ = -1;
+ SimulatedQueue *queue_;
+ RefCountedBuffer msg_;
+};
+
+class SimulatedSender : public RawSender {
+ public:
+ SimulatedSender(SimulatedQueue *queue) : queue_(queue) {}
+ ~SimulatedSender() {}
+
+ SendContext *GetContext() override {
+ return reinterpret_cast<SendContext *>(
+ RefCountedBuffer(queue_->size()).release());
+ }
+
+ void Free(SendContext *context) override {
+ RefCountedBuffer(reinterpret_cast<aos::Message *>(context));
+ }
+
+ bool Send(SendContext *context) override {
+ queue_->Send(RefCountedBuffer(reinterpret_cast<aos::Message *>(context)));
+ return true; // Maybe false instead? :)
+ }
+
+ private:
+ SimulatedQueue *queue_;
+};
+} // namespace
+
+EventScheduler::Token EventScheduler::Schedule(
+ ::aos::monotonic_clock::time_point time, ::std::function<void()> callback) {
+ return events_list_.emplace(time, callback);
+}
+
+void EventScheduler::Deschedule(EventScheduler::Token token) {
+ events_list_.erase(token);
+}
+
+void EventScheduler::Run() {
+ is_running_ = true;
+ while (!events_list_.empty() && is_running_) {
+ auto iter = events_list_.begin();
+ now_ = iter->first;
+ ::std::function<void()> callback = ::std::move(iter->second);
+ events_list_.erase(iter);
+ callback();
+ }
+}
+
+void SimulatedEventLoop::MakeRawWatcher(
+ const std::string &path, const QueueTypeInfo &type,
+ std::function<void(const aos::Message *message)> watcher) {
+ Take(path);
+ ::std::pair<::std::string, QueueTypeInfo> key(path, type);
+ GetSimulatedQueue(key)->MakeRawWatcher(watcher);
+}
+
+std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
+ const std::string &path, const QueueTypeInfo &type) {
+ ::std::pair<::std::string, QueueTypeInfo> key(path, type);
+ return GetSimulatedQueue(key)->MakeRawSender();
+}
+
+std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
+ const std::string &path, const QueueTypeInfo &type) {
+ Take(path);
+ ::std::pair<::std::string, QueueTypeInfo> key(path, type);
+ return GetSimulatedQueue(key)->MakeRawFetcher();
+}
+
+SimulatedQueue *SimulatedEventLoop::GetSimulatedQueue(
+ const ::std::pair<::std::string, QueueTypeInfo> &type) {
+ auto it = queues_->find(type);
+ if (it == queues_->end()) {
+ it = queues_->emplace(type, SimulatedQueue(type.second, scheduler_))
+ .first;
+ }
+ return &it->second;
+}
+
+void SimulatedQueue::MakeRawWatcher(
+ std::function<void(const aos::Message *message)> watcher) {
+ watchers_.push_back(watcher);
+}
+
+std::unique_ptr<RawSender> SimulatedQueue::MakeRawSender() {
+ return std::unique_ptr<RawSender>(new SimulatedSender(this));
+}
+
+std::unique_ptr<RawFetcher> SimulatedQueue::MakeRawFetcher() {
+ return std::unique_ptr<RawFetcher>(new SimulatedFetcher(this));
+}
+
+void SimulatedEventLoop::Take(const std::string &path) {
+ if (is_running()) {
+ ::aos::Die("Cannot add new objects while running.\n");
+ }
+ if (!taken_.emplace(path).second) {
+ ::aos::Die("%s already has a listener / watcher.", path.c_str());
+ }
+}
+} // namespace aos
diff --git a/aos/events/simulated-event-loop.h b/aos/events/simulated-event-loop.h
new file mode 100644
index 0000000..829ee8c
--- /dev/null
+++ b/aos/events/simulated-event-loop.h
@@ -0,0 +1,252 @@
+#ifndef _AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
+#define _AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
+
+#include <map>
+#include <memory>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "aos/events/event-loop.h"
+
+namespace aos {
+
+class RefCountedBuffer {
+ public:
+ RefCountedBuffer() {}
+ ~RefCountedBuffer() { clear(); }
+
+ explicit RefCountedBuffer(aos::Message *data) : data_(data) {}
+
+ explicit RefCountedBuffer(size_t size) {
+ data_ = reinterpret_cast<uint8_t *>(malloc(kRefCountSize + size)) +
+ kRefCountSize;
+ // Initialize the allocated memory with an integer
+ *GetRefCount() = 1;
+ }
+
+ RefCountedBuffer(const RefCountedBuffer &other) {
+ data_ = other.data_;
+ ++*GetRefCount();
+ }
+
+ RefCountedBuffer(RefCountedBuffer &&other) { std::swap(data_, other.data_); }
+
+ RefCountedBuffer &operator=(const RefCountedBuffer &other) {
+ if (this == &other) return *this;
+ clear();
+ data_ = other.data_;
+ ++*GetRefCount();
+ return *this;
+ }
+
+ RefCountedBuffer &operator=(RefCountedBuffer &&other) {
+ if (this == &other) return *this;
+ std::swap(data_, other.data_);
+ return *this;
+ }
+
+ aos::Message *get() const { return static_cast<aos::Message *>(data_); }
+
+ aos::Message *release() {
+ auto tmp = get();
+ data_ = nullptr;
+ return tmp;
+ }
+
+ void clear() {
+ if (data_ != nullptr) {
+ if (--*GetRefCount() == 0) {
+ // Free memory block from the start of the allocated block
+ free(GetRefCount());
+ }
+ data_ = nullptr;
+ }
+ }
+
+ private:
+ void *data_ = nullptr;
+ // Qty. memory to be allocated to the ref counter
+ static constexpr size_t kRefCountSize = sizeof(int64_t);
+
+ int64_t *GetRefCount() {
+ // Need to cast the void* to an 8 bit long object (size of void* is
+ // technically 0)
+ return reinterpret_cast<int64_t *>(static_cast<void *>(
+ reinterpret_cast<uint8_t *>(data_) - kRefCountSize));
+ }
+};
+
+class EventScheduler {
+ public:
+ using QueueType = ::std::multimap<::aos::monotonic_clock::time_point,
+ ::std::function<void()>>;
+ using Token = QueueType::iterator;
+
+ // Schedule an event with a callback function
+ // Returns an iterator to the event
+ Token Schedule(::aos::monotonic_clock::time_point time,
+ ::std::function<void()> callback);
+
+ // Deschedule an event by its iterator
+ void Deschedule(Token token);
+
+ void Run();
+
+ void Exit() { is_running_ = false; }
+
+ ::aos::monotonic_clock::time_point now() { return now_; }
+
+ private:
+ ::aos::monotonic_clock::time_point now_ = ::aos::monotonic_clock::epoch();
+ QueueType events_list_;
+ bool is_running_ = false;
+};
+
+class SimulatedQueue {
+ public:
+ explicit SimulatedQueue(const QueueTypeInfo &type, EventScheduler *scheduler)
+ : type_(type), scheduler_(scheduler){};
+
+ std::unique_ptr<RawSender> MakeRawSender();
+
+ std::unique_ptr<RawFetcher> MakeRawFetcher();
+
+ void MakeRawWatcher(std::function<void(const aos::Message *message)> watcher);
+
+ void Send(RefCountedBuffer message) {
+ index_++;
+ latest_message_ = message;
+ for (auto &watcher : watchers_) {
+ scheduler_->Schedule(scheduler_->now(),
+ [watcher, message]() { watcher(message.get()); });
+ }
+ }
+
+ const RefCountedBuffer &latest_message() { return latest_message_; }
+
+ int64_t index() { return index_; }
+
+ size_t size() { return type_.size; }
+
+ private:
+ int64_t index_ = -1;
+ QueueTypeInfo type_;
+ ::std::vector<std::function<void(const aos::Message *message)>> watchers_;
+ RefCountedBuffer latest_message_;
+ EventScheduler *scheduler_;
+};
+
+class SimulatedTimerHandler : public TimerHandler {
+ public:
+ explicit SimulatedTimerHandler(EventScheduler *scheduler,
+ ::std::function<void()> fn)
+ : scheduler_(scheduler), fn_(fn) {}
+ ~SimulatedTimerHandler() {}
+
+ void Setup(monotonic_clock::time_point base,
+ monotonic_clock::duration repeat_offset) override {
+ Disable();
+ auto now = scheduler_->now();
+ base_ = base;
+ repeat_offset_ = repeat_offset;
+ if (base < now) {
+ token_ = scheduler_->Schedule(now, [this]() { HandleEvent(); });
+ } else {
+ token_ = scheduler_->Schedule(base, [this]() { HandleEvent(); });
+ }
+ }
+
+ void HandleEvent() {
+ auto now = scheduler_->now();
+ if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
+ // Reschedule.
+ while (base_ <= now) base_ += repeat_offset_;
+ token_ = scheduler_->Schedule(base_, [this]() { HandleEvent(); });
+ } else {
+ token_ = EventScheduler::Token();
+ }
+ fn_();
+ }
+
+ void Disable() override {
+ if (token_ != EventScheduler::Token()) {
+ scheduler_->Deschedule(token_);
+ token_ = EventScheduler::Token();
+ }
+ }
+
+ private:
+ EventScheduler *scheduler_;
+ EventScheduler::Token token_;
+ // Function to be run on the thread
+ ::std::function<void()> fn_;
+ monotonic_clock::time_point base_;
+ monotonic_clock::duration repeat_offset_;
+};
+
+class SimulatedEventLoop : public EventLoop {
+ public:
+ explicit SimulatedEventLoop(
+ EventScheduler *scheduler,
+ ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
+ *queues) : scheduler_(scheduler), queues_(queues){};
+ ~SimulatedEventLoop() override{};
+
+ ::aos::monotonic_clock::time_point monotonic_now() override {
+ return scheduler_->now();
+ }
+
+ std::unique_ptr<RawSender> MakeRawSender(const std::string &path,
+ const QueueTypeInfo &type) override;
+
+ std::unique_ptr<RawFetcher> MakeRawFetcher(
+ const std::string &path, const QueueTypeInfo &type) override;
+
+ void MakeRawWatcher(
+ const std::string &path, const QueueTypeInfo &type,
+ std::function<void(const aos::Message *message)> watcher) override;
+
+ TimerHandler *AddTimer(::std::function<void()> callback) override {
+ timers_.emplace_back(
+ new SimulatedTimerHandler(scheduler_, callback));
+ return timers_.back().get();
+ }
+
+ void OnRun(std::function<void()> on_run) override {
+ scheduler_->Schedule(scheduler_->now(), on_run);
+ }
+ void Run() override {
+ set_is_running(true);
+ scheduler_->Run();
+ }
+ void Exit() override {
+ set_is_running(false);
+ scheduler_->Exit();
+ }
+
+ SimulatedQueue *GetSimulatedQueue(
+ const ::std::pair<::std::string, QueueTypeInfo> &);
+
+ void Take(const std::string &path);
+
+ private:
+ EventScheduler *scheduler_;
+ std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue> *queues_;
+ std::unordered_set<std::string> taken_;
+ std::vector<std::unique_ptr<TimerHandler>> timers_;
+};
+
+class SimulatedEventLoopFactory {
+ public:
+ std::unique_ptr<EventLoop> CreateEventLoop() {
+ return std::unique_ptr<EventLoop>(
+ new SimulatedEventLoop(&scheduler_, &queues_));
+ }
+
+ private:
+ EventScheduler scheduler_;
+ ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue> queues_;
+};
+} // namespace aos
+#endif //_AOS_EVENTS_TEST_EVENT_LOOP_H_
diff --git a/aos/events/simulated-event-loop_test.cc b/aos/events/simulated-event-loop_test.cc
new file mode 100644
index 0000000..7c11b4a
--- /dev/null
+++ b/aos/events/simulated-event-loop_test.cc
@@ -0,0 +1,50 @@
+#include "aos/events/simulated-event-loop.h"
+#include "aos/events/event-loop_param_test.h"
+#include "gtest/gtest.h"
+
+namespace aos {
+namespace testing {
+
+class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
+ public:
+ std::unique_ptr<EventLoop> Make() override {
+ return event_loop.CreateEventLoop();
+ }
+ private:
+ SimulatedEventLoopFactory event_loop;
+};
+
+INSTANTIATE_TEST_CASE_P(SimulatedEventLoopTest, AbstractEventLoopTest,
+ ::testing::Values([]() {
+ return new SimulatedEventLoopTestFactory();
+ }));
+
+// Test that creating an event and running the scheduler runs the event.
+TEST(EventSchedulerTest, ScheduleEvent) {
+ int counter = 0;
+ EventScheduler scheduler;
+
+ scheduler.Schedule(::aos::monotonic_clock::now(),
+ [&counter]() { counter += 1; });
+ scheduler.Run();
+ EXPECT_EQ(counter, 1);
+ auto token = scheduler.Schedule(::aos::monotonic_clock::now(),
+ [&counter]() { counter += 1; });
+ scheduler.Deschedule(token);
+ scheduler.Run();
+ EXPECT_EQ(counter, 1);
+}
+
+// Test that descheduling an already scheduled event doesn't run the event.
+TEST(EventSchedulerTest, DescheduleEvent) {
+ int counter = 0;
+ EventScheduler scheduler;
+
+ auto token = scheduler.Schedule(::aos::monotonic_clock::now(),
+ [&counter]() { counter += 1; });
+ scheduler.Deschedule(token);
+ scheduler.Run();
+ EXPECT_EQ(counter, 0);
+}
+} // namespace testing
+} // namespace aos