blob: f55939c127b5432cf5d1c018802a46898c388b47 [file] [log] [blame]
Parker Schuhe4a70d62017-12-27 20:10:20 -08001#include "aos/events/shm-event-loop.h"
Parker Schuhe4a70d62017-12-27 20:10:20 -08002
Neil Balch229001a2018-01-07 18:22:52 -08003#include <sys/timerfd.h>
Austin Schuh81fc9cc2019-02-02 23:25:47 -08004#include <algorithm>
Parker Schuhe4a70d62017-12-27 20:10:20 -08005#include <atomic>
6#include <chrono>
7#include <stdexcept>
8
Austin Schuh81fc9cc2019-02-02 23:25:47 -08009#include "aos/logging/logging.h"
10#include "aos/queue.h"
11
Parker Schuhe4a70d62017-12-27 20:10:20 -080012namespace aos {
13
14ShmEventLoop::ShmEventLoop() : thread_state_(std::make_shared<ThreadState>()) {}
15
16namespace {
17class ShmFetcher : public RawFetcher {
18 public:
19 explicit ShmFetcher(RawQueue *queue) : queue_(queue) {}
20 ~ShmFetcher() {
21 if (msg_) {
22 queue_->FreeMessage(msg_);
23 }
24 }
25
James Kuszmaulc79768b2019-02-18 15:08:44 -080026 bool FetchNext() override {
27 const FetchValue *msg = static_cast<const FetchValue *>(
28 queue_->ReadMessageIndex(RawQueue::kNonBlock, &index_));
29 // Only update the internal pointer if we got a new message.
30 if (msg != NULL) {
31 queue_->FreeMessage(msg_);
32 msg_ = msg;
33 set_most_recent(msg_);
34 }
35 return msg != NULL;
36 }
37
38 bool Fetch() override {
Parker Schuhe4a70d62017-12-27 20:10:20 -080039 static constexpr Options<RawQueue> kOptions =
40 RawQueue::kFromEnd | RawQueue::kNonBlock;
41 const FetchValue *msg = static_cast<const FetchValue *>(
42 queue_->ReadMessageIndex(kOptions, &index_));
43 // Only update the internal pointer if we got a new message.
44 if (msg != NULL && msg != msg_) {
45 queue_->FreeMessage(msg_);
46 msg_ = msg;
47 set_most_recent(msg_);
48 return true;
49 }
50 // The message has to get freed if we didn't use it (and
51 // RawQueue::FreeMessage is ok to call on NULL).
52 queue_->FreeMessage(msg);
53 return false;
54 }
55
56 private:
57 int index_ = 0;
58 RawQueue *queue_;
59 const FetchValue *msg_ = nullptr;
60};
61
62class ShmSender : public RawSender {
63 public:
64 explicit ShmSender(RawQueue *queue) : queue_(queue) {}
65
66 SendContext *GetContext() override {
67 return reinterpret_cast<SendContext *>(queue_->GetMessage());
68 }
69
70 void Free(SendContext *context) override { queue_->FreeMessage(context); }
71
72 bool Send(SendContext *msg) override {
73 assert(queue_ != NULL);
Austin Schuh7267c532019-05-19 19:55:53 -070074 {
75 ::aos::Message *aos_msg = reinterpret_cast<Message *>(msg);
76 // TODO(austin): This lets multiple senders reorder messages since time
77 // isn't acquired with a lock held.
78 if (aos_msg->sent_time == monotonic_clock::min_time) {
79 aos_msg->sent_time = monotonic_clock::now();
80 }
81 }
Parker Schuhe4a70d62017-12-27 20:10:20 -080082 return queue_->WriteMessage(msg, RawQueue::kOverride);
83 }
84
Austin Schuhd681bbd2019-02-02 12:03:32 -080085 const char *name() const override { return queue_->name(); }
86
Parker Schuhe4a70d62017-12-27 20:10:20 -080087 private:
88 RawQueue *queue_;
89};
90} // namespace
91
92namespace internal {
93class WatcherThreadState {
94 public:
95 WatcherThreadState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
96 RawQueue *queue,
97 std::function<void(const aos::Message *message)> watcher)
98 : thread_state_(std::move(thread_state)),
99 queue_(queue),
Austin Schuh3578a2e2019-05-25 18:17:59 -0700100 index_(0),
101 watcher_(std::move(watcher)) {
102 static constexpr Options<RawQueue> kOptions =
103 RawQueue::kFromEnd | RawQueue::kNonBlock;
104 const void *msg = queue_->ReadMessageIndex(kOptions, &index_);
105 if (msg) {
106 queue_->FreeMessage(msg);
107 }
108 }
Parker Schuhe4a70d62017-12-27 20:10:20 -0800109
110 void Run() {
111 thread_state_->WaitForStart();
112
113 if (!thread_state_->is_running()) return;
114
Austin Schuh3578a2e2019-05-25 18:17:59 -0700115 const void *msg = nullptr;
Parker Schuhe4a70d62017-12-27 20:10:20 -0800116 while (true) {
Austin Schuh3578a2e2019-05-25 18:17:59 -0700117 msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index_);
118 assert(msg != nullptr);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800119
120 {
Neil Balch229001a2018-01-07 18:22:52 -0800121 MutexLocker locker(&thread_state_->mutex_);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800122 if (!thread_state_->is_running()) break;
123
124 watcher_(reinterpret_cast<const Message *>(msg));
125 // watcher_ may have exited the event loop.
126 if (!thread_state_->is_running()) break;
127 }
128 queue_->FreeMessage(msg);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800129 }
130
131 queue_->FreeMessage(msg);
132 }
133
134 private:
135 std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
136 RawQueue *queue_;
Austin Schuh3578a2e2019-05-25 18:17:59 -0700137 int32_t index_;
138
Parker Schuhe4a70d62017-12-27 20:10:20 -0800139 std::function<void(const Message *message)> watcher_;
140};
Neil Balch229001a2018-01-07 18:22:52 -0800141
142class TimerHandlerState : public TimerHandler {
143 public:
144 TimerHandlerState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
145 ::std::function<void()> fn)
146 : thread_state_(std::move(thread_state)), fn_(::std::move(fn)) {
147 fd_ = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
148 PCHECK(fd_ != -1);
149 }
150
151 ~TimerHandlerState() {
152 PCHECK(close(fd_) == 0);
153 }
154
155 void Setup(monotonic_clock::time_point base,
156 monotonic_clock::duration repeat_offset) override {
157 struct itimerspec new_value;
158 new_value.it_interval = ::aos::time::to_timespec(repeat_offset);
159 new_value.it_value = ::aos::time::to_timespec(base);
160 PCHECK(timerfd_settime(fd_, TFD_TIMER_ABSTIME, &new_value, nullptr) == 0);
161 }
162
163 void Disable() override {
164 // Disarm the timer by feeding zero values
165 Setup(::aos::monotonic_clock::epoch(), ::aos::monotonic_clock::zero());
166 }
167
168 void Run() {
169 thread_state_->WaitForStart();
170
171 while (true) {
172 uint64_t buf;
173 ssize_t result = read(fd_, &buf, sizeof(buf));
174 PCHECK(result != -1);
175 CHECK_EQ(result, static_cast<int>(sizeof(buf)));
176
177 {
178 MutexLocker locker(&thread_state_->mutex_);
179 if (!thread_state_->is_running()) break;
180 fn_();
181 // fn_ may have exited the event loop.
182 if (!thread_state_->is_running()) break;
183 }
184 }
185 }
186
187 private:
188 std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
189
190 // File descriptor for the timer
191 int fd_;
192
193 // Function to be run on the thread
194 ::std::function<void()> fn_;
195};
Parker Schuhe4a70d62017-12-27 20:10:20 -0800196} // namespace internal
197
198std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
199 const std::string &path, const QueueTypeInfo &type) {
Parker Schuhe4a70d62017-12-27 20:10:20 -0800200 return std::unique_ptr<RawFetcher>(new ShmFetcher(
201 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
202}
203
204std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
205 const std::string &path, const QueueTypeInfo &type) {
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800206 Take(path);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800207 return std::unique_ptr<RawSender>(new ShmSender(
208 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
209}
210
211void ShmEventLoop::MakeRawWatcher(
212 const std::string &path, const QueueTypeInfo &type,
213 std::function<void(const Message *message)> watcher) {
214 Take(path);
215 auto *state = new internal::WatcherThreadState(
216 thread_state_,
217 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length),
218 std::move(watcher));
219
220 std::thread thread([state] {
221 state->Run();
222 delete state;
223 });
224 thread.detach();
225}
226
Neil Balch229001a2018-01-07 18:22:52 -0800227TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
228 internal::TimerHandlerState *timer =
229 new internal::TimerHandlerState(thread_state_, ::std::move(callback));
230
231 ::std::thread t([timer] {
232 timer->Run();
233 delete timer;
234 });
235 t.detach();
236
237 return timer;
238}
239
Parker Schuhe4a70d62017-12-27 20:10:20 -0800240void ShmEventLoop::OnRun(std::function<void()> on_run) {
241 on_run_.push_back(std::move(on_run));
242}
243
244void ShmEventLoop::Run() {
245 set_is_running(true);
246 for (const auto &run : on_run_) run();
Austin Schuha1654ed2019-01-27 17:24:54 -0800247 // TODO(austin): epoll event loop in main thread (if needed), and async safe
248 // quit handler.
Parker Schuhe4a70d62017-12-27 20:10:20 -0800249 thread_state_->Run();
250}
251
252void ShmEventLoop::ThreadState::Run() {
253 MutexLocker locker(&mutex_);
254 loop_running_ = true;
255 if (loop_finished_) ::aos::Die("Cannot restart an ShmEventLoop()");
256 loop_running_cond_.Broadcast();
257 while (loop_running_) {
258 if (loop_running_cond_.Wait()) {
259 ::aos::Die("ShmEventLoop mutex lock problem.\n");
260 }
261 }
262}
263
264void ShmEventLoop::ThreadState::WaitForStart() {
265 MutexLocker locker(&mutex_);
266 while (!(loop_running_ || loop_finished_)) {
267 if (loop_running_cond_.Wait()) {
268 ::aos::Die("ShmEventLoop mutex lock problem.\n");
269 }
270 }
271}
272
273void ShmEventLoop::Exit() {
274 set_is_running(false);
275 thread_state_->Exit();
276}
277
278void ShmEventLoop::ThreadState::Exit() {
279 IPCRecursiveMutexLocker locker(&mutex_);
280 if (locker.owner_died()) ::aos::Die("Owner died");
281 loop_running_ = false;
282 loop_finished_ = true;
283 loop_running_cond_.Broadcast();
284}
285
286ShmEventLoop::~ShmEventLoop() {
287 if (is_running()) {
288 ::aos::Die("ShmEventLoop destroyed while running\n");
289 }
290}
291
292void ShmEventLoop::Take(const std::string &path) {
293 if (is_running()) {
294 ::aos::Die("Cannot add new objects while running.\n");
295 }
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800296
297 const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
298 if (prior != taken_.end()) {
299 ::aos::Die("%s is already being used.", path.c_str());
300 } else {
301 taken_.emplace_back(path);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800302 }
303}
304
305} // namespace aos