#include "aos/events/shm-event-loop.h"

#include <sys/timerfd.h>
#include <unistd.h>
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <stdexcept>
#include <thread>

#include "aos/logging/logging.h"
#include "aos/queue.h"

namespace aos {

ShmEventLoop::ShmEventLoop() : thread_state_(std::make_shared<ThreadState>()) {}

namespace {
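// Fetcher implementation backed by a shared-memory RawQueue.  It tracks the
// queue index of the last message it has seen and holds on to the fetched
// message until the next fetch (or destruction) so the pointer handed to the
// user stays valid.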
class ShmFetcher : public RawFetcher {
 public:
  explicit ShmFetcher(RawQueue *queue) : queue_(queue) {
    // Move index_ to point to the end of the queue as it is at construction
    // time.  Also grab the oldest message but don't expose it to the user yet.
    static constexpr Options<RawQueue> kOptions =
        RawQueue::kFromEnd | RawQueue::kNonBlock;
    msg_ = static_cast<const FetchValue *>(
        queue_->ReadMessageIndex(kOptions, &index_));
  }
  ~ShmFetcher() {
    if (msg_) {
      queue_->FreeMessage(msg_);
    }
  }

  bool FetchNext() override {
    const FetchValue *msg = static_cast<const FetchValue *>(
        queue_->ReadMessageIndex(RawQueue::kNonBlock, &index_));
    // Only update the internal pointer if we got a new message.
    if (msg != nullptr) {
      queue_->FreeMessage(msg_);
      msg_ = msg;
      set_most_recent(msg_);
    }
    return msg != nullptr;
  }

  bool Fetch() override {
    static constexpr Options<RawQueue> kOptions =
        RawQueue::kFromEnd | RawQueue::kNonBlock;
    const FetchValue *msg = static_cast<const FetchValue *>(
        queue_->ReadMessageIndex(kOptions, &index_));
    // Only update the internal pointer if we got a new message.
    if (msg != nullptr && msg != msg_) {
      queue_->FreeMessage(msg_);
      msg_ = msg;
      set_most_recent(msg_);
      return true;
    } else {
      // The message has to get freed if we didn't use it (and
      // RawQueue::FreeMessage is ok to call on nullptr).
      queue_->FreeMessage(msg);

      // We have a message from construction time.  Give it to the user now.
      if (msg_ != nullptr && most_recent() != msg_) {
        set_most_recent(msg_);
        return true;
      } else {
        return false;
      }
    }
  }

 private:
  int index_ = 0;
  RawQueue *queue_;
  const FetchValue *msg_ = nullptr;
};

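// Sender implementation which writes into a shared-memory RawQueue.  Messages
// are allocated out of the queue's pool, stamped with a send time if they
// don't have one yet, and written with kOverride semantics.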
class ShmSender : public RawSender {
 public:
  explicit ShmSender(RawQueue *queue) : queue_(queue) {}

  aos::Message *GetMessage() override {
    return reinterpret_cast<aos::Message *>(queue_->GetMessage());
  }

  void Free(aos::Message *msg) override { queue_->FreeMessage(msg); }

  bool Send(aos::Message *msg) override {
    assert(queue_ != nullptr);
    {
      // TODO(austin): This lets multiple senders reorder messages since time
      // isn't acquired with a lock held.
      if (msg->sent_time == monotonic_clock::min_time) {
        msg->sent_time = monotonic_clock::now();
      }
    }
    return queue_->WriteMessage(msg, RawQueue::kOverride);
  }

  const char *name() const override { return queue_->name(); }

 private:
  RawQueue *queue_;
};
}  // namespace

namespace internal {
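// Per-watcher state which runs on its own thread.  It blocks on the queue and
// invokes the user's callback while holding the ThreadState mutex so the
// callback never runs concurrently with loop shutdown.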
class WatcherThreadState {
 public:
  WatcherThreadState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
                     RawQueue *queue,
                     std::function<void(const aos::Message *message)> watcher)
      : thread_state_(std::move(thread_state)),
        queue_(queue),
        index_(0),
        watcher_(std::move(watcher)) {
    static constexpr Options<RawQueue> kOptions =
        RawQueue::kFromEnd | RawQueue::kNonBlock;
    const void *msg = queue_->ReadMessageIndex(kOptions, &index_);
    if (msg) {
      queue_->FreeMessage(msg);
    }
  }

  void Run() {
    thread_state_->WaitForStart();

    if (!thread_state_->is_running()) return;

    const void *msg = nullptr;
    while (true) {
      msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index_);
      assert(msg != nullptr);

      {
        MutexLocker locker(&thread_state_->mutex_);
        if (!thread_state_->is_running()) break;

        watcher_(reinterpret_cast<const Message *>(msg));
        // watcher_ may have exited the event loop.
        if (!thread_state_->is_running()) break;
      }
      queue_->FreeMessage(msg);
    }

    queue_->FreeMessage(msg);
  }

 private:
  std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
  RawQueue *queue_;
  int32_t index_;

  std::function<void(const Message *message)> watcher_;
};

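// Timer implementation backed by a timerfd.  Each timer gets its own thread
// which blocks in read() on the timerfd and runs the callback while holding
// the ThreadState mutex.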
class TimerHandlerState : public TimerHandler {
 public:
  TimerHandlerState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
                    ::std::function<void()> fn)
      : thread_state_(std::move(thread_state)), fn_(::std::move(fn)) {
    fd_ = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
    PCHECK(fd_ != -1);
  }

  ~TimerHandlerState() { PCHECK(close(fd_) == 0); }

  void Setup(monotonic_clock::time_point base,
             monotonic_clock::duration repeat_offset) override {
    struct itimerspec new_value;
    new_value.it_interval = ::aos::time::to_timespec(repeat_offset);
    new_value.it_value = ::aos::time::to_timespec(base);
    PCHECK(timerfd_settime(fd_, TFD_TIMER_ABSTIME, &new_value, nullptr) == 0);
  }

  void Disable() override {
    // Disarm the timer by feeding zero values.
    Setup(::aos::monotonic_clock::epoch(), ::aos::monotonic_clock::zero());
  }

  void Run() {
    thread_state_->WaitForStart();

    while (true) {
      uint64_t buf;
      ssize_t result = read(fd_, &buf, sizeof(buf));
      PCHECK(result != -1);
      CHECK_EQ(result, static_cast<int>(sizeof(buf)));

      {
        MutexLocker locker(&thread_state_->mutex_);
        if (!thread_state_->is_running()) break;
        fn_();
        // fn_ may have exited the event loop.
        if (!thread_state_->is_running()) break;
      }
    }
  }

 private:
  std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;

  // File descriptor for the timer.
  int fd_;

  // Function to be run on the thread.
  ::std::function<void()> fn_;
};
}  // namespace internal

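// Unlike MakeRawSender and MakeRawWatcher below, fetchers do not register the
// path with Take(), so multiple fetchers may exist for the same queue.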
212std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
213 const std::string &path, const QueueTypeInfo &type) {
Parker Schuhe4a70d62017-12-27 20:10:20 -0800214 return std::unique_ptr<RawFetcher>(new ShmFetcher(
215 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
216}
217
218std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
219 const std::string &path, const QueueTypeInfo &type) {
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800220 Take(path);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800221 return std::unique_ptr<RawSender>(new ShmSender(
222 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
223}
224
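// Each watcher gets a dedicated detached thread.  The WatcherThreadState owns
// itself: it is deleted by that thread once Run() returns.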
void ShmEventLoop::MakeRawWatcher(
    const std::string &path, const QueueTypeInfo &type,
    std::function<void(const Message *message)> watcher) {
  Take(path);
  auto *state = new internal::WatcherThreadState(
      thread_state_,
      RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length),
      std::move(watcher));

  std::thread thread([state] {
    state->Run();
    delete state;
  });
  thread.detach();
}

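// Like watchers, each timer runs on its own detached thread, and the
// TimerHandlerState deletes itself when that thread finishes.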
TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
  internal::TimerHandlerState *timer =
      new internal::TimerHandlerState(thread_state_, ::std::move(callback));

  ::std::thread t([timer] {
    timer->Run();
    delete timer;
  });
  t.detach();

  return timer;
}

void ShmEventLoop::OnRun(std::function<void()> on_run) {
  on_run_.push_back(std::move(on_run));
}

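// Runs any OnRun callbacks, releases the watcher and timer threads waiting in
// WaitForStart(), and then blocks until Exit() is called.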
void ShmEventLoop::Run() {
  set_is_running(true);
  for (const auto &run : on_run_) run();
  // TODO(austin): epoll event loop in main thread (if needed), and async safe
  // quit handler.
  thread_state_->Run();
}

void ShmEventLoop::ThreadState::Run() {
  MutexLocker locker(&mutex_);
  loop_running_ = true;
  if (loop_finished_) ::aos::Die("Cannot restart an ShmEventLoop()");
  loop_running_cond_.Broadcast();
  while (loop_running_) {
    if (loop_running_cond_.Wait()) {
      ::aos::Die("ShmEventLoop mutex lock problem.\n");
    }
  }
}

void ShmEventLoop::ThreadState::WaitForStart() {
  MutexLocker locker(&mutex_);
  while (!(loop_running_ || loop_finished_)) {
    if (loop_running_cond_.Wait()) {
      ::aos::Die("ShmEventLoop mutex lock problem.\n");
    }
  }
}

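// Exit may be called from a watcher or timer callback; it marks the loop as
// not running and wakes up the thread blocked in Run().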
void ShmEventLoop::Exit() {
  set_is_running(false);
  thread_state_->Exit();
}

void ShmEventLoop::ThreadState::Exit() {
  IPCRecursiveMutexLocker locker(&mutex_);
  if (locker.owner_died()) ::aos::Die("Owner died");
  loop_running_ = false;
  loop_finished_ = true;
  loop_running_cond_.Broadcast();
}

ShmEventLoop::~ShmEventLoop() {
  if (is_running()) {
    ::aos::Die("ShmEventLoop destroyed while running\n");
  }
}

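// Registers a queue path as in use by this event loop and dies if the path has
// already been claimed by another sender or watcher.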
void ShmEventLoop::Take(const std::string &path) {
  if (is_running()) {
    ::aos::Die("Cannot add new objects while running.\n");
  }

  const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
  if (prior != taken_.end()) {
    ::aos::Die("%s is already being used.", path.c_str());
  } else {
    taken_.emplace_back(path);
  }
}

}  // namespace aos