blob: d227a48585a9b8510fb18d964816654228a6616b [file] [log] [blame]
#include "aos/events/shm-event-loop.h"

#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <memory>
#include <stdexcept>
#include <string>
#include <thread>

#include "aos/common/queue.h"
8namespace aos {
9
10ShmEventLoop::ShmEventLoop() : thread_state_(std::make_shared<ThreadState>()) {}
11
12namespace {
13class ShmFetcher : public RawFetcher {
14 public:
15 explicit ShmFetcher(RawQueue *queue) : queue_(queue) {}
16 ~ShmFetcher() {
17 if (msg_) {
18 queue_->FreeMessage(msg_);
19 }
20 }
21
22 bool Fetch() {
23 static constexpr Options<RawQueue> kOptions =
24 RawQueue::kFromEnd | RawQueue::kNonBlock;
25 const FetchValue *msg = static_cast<const FetchValue *>(
26 queue_->ReadMessageIndex(kOptions, &index_));
27 // Only update the internal pointer if we got a new message.
28 if (msg != NULL && msg != msg_) {
29 queue_->FreeMessage(msg_);
30 msg_ = msg;
31 set_most_recent(msg_);
32 return true;
33 }
34 // The message has to get freed if we didn't use it (and
35 // RawQueue::FreeMessage is ok to call on NULL).
36 queue_->FreeMessage(msg);
37 return false;
38 }
39
40 private:
41 int index_ = 0;
42 RawQueue *queue_;
43 const FetchValue *msg_ = nullptr;
44};
45
46class ShmSender : public RawSender {
47 public:
48 explicit ShmSender(RawQueue *queue) : queue_(queue) {}
49
50 SendContext *GetContext() override {
51 return reinterpret_cast<SendContext *>(queue_->GetMessage());
52 }
53
54 void Free(SendContext *context) override { queue_->FreeMessage(context); }
55
56 bool Send(SendContext *msg) override {
57 assert(queue_ != NULL);
58 return queue_->WriteMessage(msg, RawQueue::kOverride);
59 }
60
61 private:
62 RawQueue *queue_;
63};
64} // namespace
65
namespace internal {
// Per-watcher state that runs on its own thread: waits for the event loop to
// start, then delivers each queue message to the user callback until the
// loop exits.
class WatcherThreadState {
 public:
  WatcherThreadState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
                     RawQueue *queue,
                     std::function<void(const aos::Message *message)> watcher)
      : thread_state_(std::move(thread_state)),
        queue_(queue),
        watcher_(std::move(watcher)) {}

  // Thread entry point.  Returns (ending the thread) once the event loop is
  // no longer running.
  void Run() {
    // Block until the loop either starts or has already finished.
    thread_state_->WaitForStart();

    // The loop may have exited before it ever started running.
    if (!thread_state_->is_running()) return;

    int32_t index = 0;

    // Prime with the newest already-queued message, if any, without blocking.
    static constexpr Options<RawQueue> kOptions =
        RawQueue::kFromEnd | RawQueue::kNonBlock;
    const void *msg = queue_->ReadMessageIndex(kOptions, &index);

    while (true) {
      if (msg == nullptr) {
        // Nothing pending: block until the next message arrives.
        msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index);
        assert(msg != nullptr);
      }

      {
        // Hold the shared mutex while delivering so the running flag cannot
        // flip mid-callback.
        MutexLocker locker2(&thread_state_->mutex_);
        if (!thread_state_->is_running()) break;

        watcher_(reinterpret_cast<const Message *>(msg));
        // watcher_ may have exited the event loop.
        if (!thread_state_->is_running()) break;
      }
      // Done with this message; drop our reference before fetching the next.
      queue_->FreeMessage(msg);
      msg = nullptr;
    }

    // Free whatever message we still hold on the way out (FreeMessage is ok
    // to call on a null pointer).
    queue_->FreeMessage(msg);
  }

 private:
  std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
  RawQueue *queue_;  // not owned
  std::function<void(const Message *message)> watcher_;
};
}  // namespace internal
114
115std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
116 const std::string &path, const QueueTypeInfo &type) {
117 Take(path);
118 return std::unique_ptr<RawFetcher>(new ShmFetcher(
119 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
120}
121
122std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
123 const std::string &path, const QueueTypeInfo &type) {
124 return std::unique_ptr<RawSender>(new ShmSender(
125 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
126}
127
128void ShmEventLoop::MakeRawWatcher(
129 const std::string &path, const QueueTypeInfo &type,
130 std::function<void(const Message *message)> watcher) {
131 Take(path);
132 auto *state = new internal::WatcherThreadState(
133 thread_state_,
134 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length),
135 std::move(watcher));
136
137 std::thread thread([state] {
138 state->Run();
139 delete state;
140 });
141 thread.detach();
142}
143
144void ShmEventLoop::OnRun(std::function<void()> on_run) {
145 on_run_.push_back(std::move(on_run));
146}
147
148void ShmEventLoop::Run() {
149 set_is_running(true);
150 for (const auto &run : on_run_) run();
151 thread_state_->Run();
152}
153
// Blocks the calling thread until Exit() clears loop_running_.  The initial
// Broadcast() also releases watcher threads parked in WaitForStart().
void ShmEventLoop::ThreadState::Run() {
  MutexLocker locker(&mutex_);
  loop_running_ = true;
  // A loop that already finished once cannot be started again.
  if (loop_finished_) ::aos::Die("Cannot restart an ShmEventLoop()");
  loop_running_cond_.Broadcast();
  while (loop_running_) {
    // Wait() returning true indicates a mutex/condition failure, which is
    // fatal here.
    if (loop_running_cond_.Wait()) {
      ::aos::Die("ShmEventLoop mutex lock problem.\n");
    }
  }
}
165
// Parks a watcher thread until the loop either starts running or has already
// finished, so watchers never wait forever on a loop that exited early.
void ShmEventLoop::ThreadState::WaitForStart() {
  MutexLocker locker(&mutex_);
  while (!(loop_running_ || loop_finished_)) {
    // Wait() returning true indicates a mutex/condition failure.
    if (loop_running_cond_.Wait()) {
      ::aos::Die("ShmEventLoop mutex lock problem.\n");
    }
  }
}
174
// Stops the event loop.  The running flag is cleared first so watcher
// threads observe it before ThreadState::Exit() wakes the Run() thread.
void ShmEventLoop::Exit() {
  set_is_running(false);
  thread_state_->Exit();
}
179
// Clears the running flag and wakes everyone blocked on the condition.
void ShmEventLoop::ThreadState::Exit() {
  // NOTE(review): uses the IPC recursive locker rather than plain
  // MutexLocker — presumably because Exit() can be called from a watcher
  // callback that already holds mutex_; confirm against the Mutex docs.
  IPCRecursiveMutexLocker locker(&mutex_);
  if (locker.owner_died()) ::aos::Die("Owner died");
  loop_running_ = false;
  loop_finished_ = true;  // permanently: Run() refuses to restart after this
  loop_running_cond_.Broadcast();
}
187
188ShmEventLoop::~ShmEventLoop() {
189 if (is_running()) {
190 ::aos::Die("ShmEventLoop destroyed while running\n");
191 }
192}
193
194void ShmEventLoop::Take(const std::string &path) {
195 if (is_running()) {
196 ::aos::Die("Cannot add new objects while running.\n");
197 }
198 if (!taken_.emplace(path).second) {
199 ::aos::Die("%s already has a listener / watcher.", path.c_str());
200 }
201}
202
203} // namespace aos