Prototype event-loop.h for queue dependency injection.
Change-Id: I1af3f71a5b0cca741641f872f55dab10474ee064
diff --git a/aos/events/shm-event-loop.cc b/aos/events/shm-event-loop.cc
new file mode 100644
index 0000000..d227a48
--- /dev/null
+++ b/aos/events/shm-event-loop.cc
@@ -0,0 +1,203 @@
+#include "aos/events/shm-event-loop.h"
+#include "aos/common/queue.h"
+
+#include <atomic>
+#include <chrono>
+#include <stdexcept>
+
+namespace aos {
+
+ShmEventLoop::ShmEventLoop() : thread_state_(std::make_shared<ThreadState>()) {}
+
+namespace {
+class ShmFetcher : public RawFetcher {  // Non-blocking reader of a RawQueue.
+ public:
+  explicit ShmFetcher(RawQueue *queue) : queue_(queue) {}
+  // Hands the message that is still held (if any) back to the queue.
+  ~ShmFetcher() {
+    if (msg_ != nullptr) queue_->FreeMessage(msg_);
+  }
+
+  // Non-blocking read from the end of the queue. Returns true only when a
+  // message was read that is not the one already held.
+  bool Fetch() {
+    static constexpr Options<RawQueue> kReadOptions =
+        RawQueue::kFromEnd | RawQueue::kNonBlock;
+    const FetchValue *const latest = static_cast<const FetchValue *>(
+        queue_->ReadMessageIndex(kReadOptions, &index_));
+    if (latest == nullptr || latest == msg_) {
+      // Nothing new, so give back what was just read (RawQueue::FreeMessage
+      // is ok to call on NULL).
+      queue_->FreeMessage(latest);
+      return false;
+    }
+    queue_->FreeMessage(msg_);  // Drop the previously held message.
+    msg_ = latest;
+    set_most_recent(msg_);  // Publish the new message through the base class.
+    return true;
+  }
+
+ private:
+  int index_ = 0;
+  RawQueue *queue_;
+  const FetchValue *msg_ = nullptr;
+};
+
+class ShmSender : public RawSender {  // Sends through a wrapped RawQueue.
+ public:
+  explicit ShmSender(RawQueue *queue) : queue_(queue) {}
+  // Obtains a message from the queue and exposes it as a SendContext.
+  SendContext *GetContext() override {
+    return reinterpret_cast<SendContext *>(queue_->GetMessage());
+  }
+  // Hands a context back to the queue through FreeMessage().
+  void Free(SendContext *context) override { queue_->FreeMessage(context); }
+  // Writes msg using RawQueue::kOverride; returns what WriteMessage() reports.
+  bool Send(SendContext *msg) override {
+    assert(queue_ != nullptr);
+    return queue_->WriteMessage(msg, RawQueue::kOverride);
+  }
+
+ private:
+  RawQueue *queue_;
+};
+} // namespace
+
+namespace internal {
+class WatcherThreadState {  // State of one watcher; Run() is its thread body.
+ public:
+ WatcherThreadState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
+ RawQueue *queue,
+ std::function<void(const aos::Message *message)> watcher)
+ : thread_state_(std::move(thread_state)),
+ queue_(queue),
+ watcher_(std::move(watcher)) {}
+ // Waits for the loop to start, then passes queue_ messages to watcher_ until the loop is no longer running.
+ void Run() {
+ thread_state_->WaitForStart();
+ // WaitForStart() also returns if the loop finished without being started.
+ if (!thread_state_->is_running()) return;
+
+ int32_t index = 0;
+ // First try a non-blocking read from the end of the queue; msg may be null.
+ static constexpr Options<RawQueue> kOptions =
+ RawQueue::kFromEnd | RawQueue::kNonBlock;
+ const void *msg = queue_->ReadMessageIndex(kOptions, &index);
+
+ while (true) {
+ if (msg == nullptr) {  // Nothing in hand: block until a message arrives.
+ msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index);
+ assert(msg != nullptr);
+ }
+ // The callback is invoked with the shared ThreadState mutex_ held.
+ {
+ MutexLocker locker2(&thread_state_->mutex_);
+ if (!thread_state_->is_running()) break;
+
+ watcher_(reinterpret_cast<const Message *>(msg));
+ // watcher_ may have exited the event loop.
+ if (!thread_state_->is_running()) break;
+ }
+ queue_->FreeMessage(msg);
+ msg = nullptr;
+ }
+ // Only reached through a break, where msg has not been freed yet.
+ queue_->FreeMessage(msg);
+ }
+
+ private:
+ std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
+ RawQueue *queue_;
+ std::function<void(const Message *message)> watcher_;
+};
+} // namespace internal
+
+std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
+ const std::string &path, const QueueTypeInfo &type) {  // Builds a ShmFetcher on the queue named by path.
+ Take(path);  // Dies if the loop is running or path was already taken.
+ return std::unique_ptr<RawFetcher>(new ShmFetcher(
+ RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
+}
+
+std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
+ const std::string &path, const QueueTypeInfo &type) {  // Builds a ShmSender; note that no Take(path) is done here.
+ return std::unique_ptr<RawSender>(new ShmSender(
+ RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
+}
+
+void ShmEventLoop::MakeRawWatcher(
+    const std::string &path, const QueueTypeInfo &type,
+    std::function<void(const Message *message)> watcher) {
+  // Starts a detached thread which feeds messages from the queue at |path|
+  // to |watcher|. Dies (via Take()) if running or |path| is already taken.
+  Take(path);
+  // Reference counted instead of a raw new/delete pair so that the state is
+  // released even if constructing the std::thread below throws.
+  auto state = std::make_shared<internal::WatcherThreadState>(
+      thread_state_,
+      RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length),
+      std::move(watcher));
+  // The thread's copy keeps the state alive for as long as Run() executes.
+  std::thread([state] { state->Run(); }).detach();
+}
+
+void ShmEventLoop::OnRun(std::function<void()> on_run) {
+  on_run_.emplace_back(std::move(on_run));  // Invoked later by Run().
+}
+
+void ShmEventLoop::Run() {
+  set_is_running(true);
+  for (const auto &callback : on_run_) callback();  // Added through OnRun().
+  thread_state_->Run();  // Wakes waiting watcher threads; blocks until Exit().
+}
+
+void ShmEventLoop::ThreadState::Run() {
+  MutexLocker locker(&mutex_);
+  loop_running_ = true;
+  if (loop_finished_) ::aos::Die("Cannot restart an ShmEventLoop()");
+  loop_running_cond_.Broadcast();  // Wakes threads blocked in WaitForStart().
+  // Block here until Exit() clears loop_running_ and broadcasts again.
+  while (loop_running_) {
+    const bool wait_failed = loop_running_cond_.Wait();
+    if (wait_failed) ::aos::Die("ShmEventLoop mutex lock problem.\n");
+  }
+}
+
+void ShmEventLoop::ThreadState::WaitForStart() {
+  MutexLocker locker(&mutex_);
+  // Returns once the loop has been started or has already finished.
+  while (!loop_running_ && !loop_finished_) {
+    const bool wait_failed = loop_running_cond_.Wait();
+    if (wait_failed) ::aos::Die("ShmEventLoop mutex lock problem.\n");
+  }
+}
+
+void ShmEventLoop::Exit() {  // Stops the event loop.
+ set_is_running(false);
+ thread_state_->Exit();  // Marks the loop finished and wakes Run()/WaitForStart().
+}
+
+void ShmEventLoop::ThreadState::Exit() {  // Stops the loop and wakes every waiter.
+ IPCRecursiveMutexLocker locker(&mutex_);  // NOTE(review): presumably recursive because a watcher can call Exit() while mutex_ is held - confirm.
+ if (locker.owner_died()) ::aos::Die("Owner died");
+ loop_running_ = false;  // Lets the wait loop in ThreadState::Run() finish.
+ loop_finished_ = true;  // A later ThreadState::Run() dies instead of restarting.
+ loop_running_cond_.Broadcast();  // Wakes Run() and WaitForStart().
+}
+
+ShmEventLoop::~ShmEventLoop() {
+  // Tearing the loop down while it is still running is fatal.
+  if (!is_running()) return;
+  ::aos::Die("ShmEventLoop destroyed while running\n");
+}
+
+void ShmEventLoop::Take(const std::string &path) {
+  // New fetchers/watchers may only be registered while the loop is stopped.
+  if (is_running()) ::aos::Die("Cannot add new objects while running.\n");
+  const bool first_claim = taken_.emplace(path).second;  // false on duplicate
+  if (!first_claim) {
+    ::aos::Die("%s already has a listener / watcher.", path.c_str());
+  }
+}
+
+} // namespace aos