blob: ee71b4c0e135336bc1ce9e0d985ef3b9b2d7cd48 [file] [log] [blame]
Parker Schuhe4a70d62017-12-27 20:10:20 -08001#include "aos/events/shm-event-loop.h"
Parker Schuhe4a70d62017-12-27 20:10:20 -08002
Neil Balch229001a2018-01-07 18:22:52 -08003#include <sys/timerfd.h>
Austin Schuh81fc9cc2019-02-02 23:25:47 -08004#include <algorithm>
Parker Schuhe4a70d62017-12-27 20:10:20 -08005#include <atomic>
6#include <chrono>
7#include <stdexcept>
8
Austin Schuh3115a202019-05-27 21:02:14 -07009#include "aos/init.h"
Austin Schuh81fc9cc2019-02-02 23:25:47 -080010#include "aos/logging/logging.h"
11#include "aos/queue.h"
12
Parker Schuhe4a70d62017-12-27 20:10:20 -080013namespace aos {
14
15ShmEventLoop::ShmEventLoop() : thread_state_(std::make_shared<ThreadState>()) {}
16
17namespace {
18class ShmFetcher : public RawFetcher {
19 public:
Austin Schuhbbce72d2019-05-26 15:11:46 -070020 explicit ShmFetcher(RawQueue *queue) : queue_(queue) {
21 // Move index_ to point to the end of the queue as it is at construction
22 // time. Also grab the oldest message but don't expose it to the user yet.
23 static constexpr Options<RawQueue> kOptions =
24 RawQueue::kFromEnd | RawQueue::kNonBlock;
25 msg_ = static_cast<const FetchValue *>(
26 queue_->ReadMessageIndex(kOptions, &index_));
27 }
Parker Schuhe4a70d62017-12-27 20:10:20 -080028 ~ShmFetcher() {
29 if (msg_) {
30 queue_->FreeMessage(msg_);
31 }
32 }
33
James Kuszmaulc79768b2019-02-18 15:08:44 -080034 bool FetchNext() override {
35 const FetchValue *msg = static_cast<const FetchValue *>(
36 queue_->ReadMessageIndex(RawQueue::kNonBlock, &index_));
37 // Only update the internal pointer if we got a new message.
Austin Schuhbbce72d2019-05-26 15:11:46 -070038 if (msg != nullptr) {
James Kuszmaulc79768b2019-02-18 15:08:44 -080039 queue_->FreeMessage(msg_);
40 msg_ = msg;
41 set_most_recent(msg_);
42 }
Austin Schuhbbce72d2019-05-26 15:11:46 -070043 return msg != nullptr;
James Kuszmaulc79768b2019-02-18 15:08:44 -080044 }
45
46 bool Fetch() override {
Parker Schuhe4a70d62017-12-27 20:10:20 -080047 static constexpr Options<RawQueue> kOptions =
48 RawQueue::kFromEnd | RawQueue::kNonBlock;
49 const FetchValue *msg = static_cast<const FetchValue *>(
50 queue_->ReadMessageIndex(kOptions, &index_));
51 // Only update the internal pointer if we got a new message.
Austin Schuhbbce72d2019-05-26 15:11:46 -070052 if (msg != nullptr && msg != msg_) {
Parker Schuhe4a70d62017-12-27 20:10:20 -080053 queue_->FreeMessage(msg_);
54 msg_ = msg;
55 set_most_recent(msg_);
56 return true;
Austin Schuhbbce72d2019-05-26 15:11:46 -070057 } else {
58 // The message has to get freed if we didn't use it (and
59 // RawQueue::FreeMessage is ok to call on nullptr).
60 queue_->FreeMessage(msg);
61
62 // We have a message from construction time. Give it to the user now.
63 if (msg_ != nullptr && most_recent() != msg_) {
64 set_most_recent(msg_);
65 return true;
66 } else {
67 return false;
68 }
Parker Schuhe4a70d62017-12-27 20:10:20 -080069 }
Parker Schuhe4a70d62017-12-27 20:10:20 -080070 }
71
72 private:
73 int index_ = 0;
74 RawQueue *queue_;
75 const FetchValue *msg_ = nullptr;
76};
77
78class ShmSender : public RawSender {
79 public:
80 explicit ShmSender(RawQueue *queue) : queue_(queue) {}
81
James Kuszmaulcd1db352019-05-26 16:42:29 -070082 aos::Message *GetMessage() override {
83 return reinterpret_cast<aos::Message *>(queue_->GetMessage());
Parker Schuhe4a70d62017-12-27 20:10:20 -080084 }
85
James Kuszmaulcd1db352019-05-26 16:42:29 -070086 void Free(aos::Message *msg) override { queue_->FreeMessage(msg); }
Parker Schuhe4a70d62017-12-27 20:10:20 -080087
James Kuszmaulcd1db352019-05-26 16:42:29 -070088 bool Send(aos::Message *msg) override {
Austin Schuhbbce72d2019-05-26 15:11:46 -070089 assert(queue_ != nullptr);
Austin Schuh7267c532019-05-19 19:55:53 -070090 {
Austin Schuh7267c532019-05-19 19:55:53 -070091 // TODO(austin): This lets multiple senders reorder messages since time
92 // isn't acquired with a lock held.
James Kuszmaulcd1db352019-05-26 16:42:29 -070093 if (msg->sent_time == monotonic_clock::min_time) {
94 msg->sent_time = monotonic_clock::now();
Austin Schuh7267c532019-05-19 19:55:53 -070095 }
96 }
Parker Schuhe4a70d62017-12-27 20:10:20 -080097 return queue_->WriteMessage(msg, RawQueue::kOverride);
98 }
99
Austin Schuhd681bbd2019-02-02 12:03:32 -0800100 const char *name() const override { return queue_->name(); }
101
Parker Schuhe4a70d62017-12-27 20:10:20 -0800102 private:
103 RawQueue *queue_;
104};
105} // namespace
106
107namespace internal {
108class WatcherThreadState {
109 public:
110 WatcherThreadState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
111 RawQueue *queue,
112 std::function<void(const aos::Message *message)> watcher)
113 : thread_state_(std::move(thread_state)),
114 queue_(queue),
Austin Schuh3578a2e2019-05-25 18:17:59 -0700115 index_(0),
116 watcher_(std::move(watcher)) {
117 static constexpr Options<RawQueue> kOptions =
118 RawQueue::kFromEnd | RawQueue::kNonBlock;
119 const void *msg = queue_->ReadMessageIndex(kOptions, &index_);
120 if (msg) {
121 queue_->FreeMessage(msg);
122 }
123 }
Parker Schuhe4a70d62017-12-27 20:10:20 -0800124
125 void Run() {
Austin Schuh3115a202019-05-27 21:02:14 -0700126 thread_state_->MaybeSetCurrentThreadRealtimePriority();
Parker Schuhe4a70d62017-12-27 20:10:20 -0800127 thread_state_->WaitForStart();
128
Austin Schuh3115a202019-05-27 21:02:14 -0700129 if (!thread_state_->is_running()) {
130 ::aos::UnsetCurrentThreadRealtimePriority();
131 return;
132 }
Parker Schuhe4a70d62017-12-27 20:10:20 -0800133
Austin Schuh3578a2e2019-05-25 18:17:59 -0700134 const void *msg = nullptr;
Parker Schuhe4a70d62017-12-27 20:10:20 -0800135 while (true) {
Austin Schuh3578a2e2019-05-25 18:17:59 -0700136 msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index_);
137 assert(msg != nullptr);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800138
139 {
Neil Balch229001a2018-01-07 18:22:52 -0800140 MutexLocker locker(&thread_state_->mutex_);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800141 if (!thread_state_->is_running()) break;
142
143 watcher_(reinterpret_cast<const Message *>(msg));
144 // watcher_ may have exited the event loop.
145 if (!thread_state_->is_running()) break;
146 }
147 queue_->FreeMessage(msg);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800148 }
149
150 queue_->FreeMessage(msg);
Austin Schuh3115a202019-05-27 21:02:14 -0700151 ::aos::UnsetCurrentThreadRealtimePriority();
Parker Schuhe4a70d62017-12-27 20:10:20 -0800152 }
153
154 private:
155 std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
156 RawQueue *queue_;
Austin Schuh3578a2e2019-05-25 18:17:59 -0700157 int32_t index_;
158
Parker Schuhe4a70d62017-12-27 20:10:20 -0800159 std::function<void(const Message *message)> watcher_;
160};
Neil Balch229001a2018-01-07 18:22:52 -0800161
162class TimerHandlerState : public TimerHandler {
163 public:
164 TimerHandlerState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
165 ::std::function<void()> fn)
166 : thread_state_(std::move(thread_state)), fn_(::std::move(fn)) {
167 fd_ = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
168 PCHECK(fd_ != -1);
169 }
170
171 ~TimerHandlerState() {
172 PCHECK(close(fd_) == 0);
173 }
174
175 void Setup(monotonic_clock::time_point base,
176 monotonic_clock::duration repeat_offset) override {
177 struct itimerspec new_value;
178 new_value.it_interval = ::aos::time::to_timespec(repeat_offset);
179 new_value.it_value = ::aos::time::to_timespec(base);
180 PCHECK(timerfd_settime(fd_, TFD_TIMER_ABSTIME, &new_value, nullptr) == 0);
181 }
182
183 void Disable() override {
184 // Disarm the timer by feeding zero values
185 Setup(::aos::monotonic_clock::epoch(), ::aos::monotonic_clock::zero());
186 }
187
188 void Run() {
Austin Schuh3115a202019-05-27 21:02:14 -0700189 thread_state_->MaybeSetCurrentThreadRealtimePriority();
Neil Balch229001a2018-01-07 18:22:52 -0800190 thread_state_->WaitForStart();
191
192 while (true) {
193 uint64_t buf;
194 ssize_t result = read(fd_, &buf, sizeof(buf));
195 PCHECK(result != -1);
196 CHECK_EQ(result, static_cast<int>(sizeof(buf)));
197
198 {
199 MutexLocker locker(&thread_state_->mutex_);
200 if (!thread_state_->is_running()) break;
201 fn_();
202 // fn_ may have exited the event loop.
203 if (!thread_state_->is_running()) break;
204 }
205 }
Austin Schuh3115a202019-05-27 21:02:14 -0700206 ::aos::UnsetCurrentThreadRealtimePriority();
Neil Balch229001a2018-01-07 18:22:52 -0800207 }
208
209 private:
210 std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
211
212 // File descriptor for the timer
213 int fd_;
214
215 // Function to be run on the thread
216 ::std::function<void()> fn_;
217};
Parker Schuhe4a70d62017-12-27 20:10:20 -0800218} // namespace internal
219
220std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
221 const std::string &path, const QueueTypeInfo &type) {
Parker Schuhe4a70d62017-12-27 20:10:20 -0800222 return std::unique_ptr<RawFetcher>(new ShmFetcher(
223 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
224}
225
226std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
227 const std::string &path, const QueueTypeInfo &type) {
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800228 Take(path);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800229 return std::unique_ptr<RawSender>(new ShmSender(
230 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
231}
232
233void ShmEventLoop::MakeRawWatcher(
234 const std::string &path, const QueueTypeInfo &type,
235 std::function<void(const Message *message)> watcher) {
236 Take(path);
237 auto *state = new internal::WatcherThreadState(
238 thread_state_,
239 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length),
240 std::move(watcher));
241
242 std::thread thread([state] {
243 state->Run();
244 delete state;
245 });
246 thread.detach();
247}
248
Neil Balch229001a2018-01-07 18:22:52 -0800249TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
250 internal::TimerHandlerState *timer =
251 new internal::TimerHandlerState(thread_state_, ::std::move(callback));
252
253 ::std::thread t([timer] {
254 timer->Run();
255 delete timer;
256 });
257 t.detach();
258
259 return timer;
260}
261
Parker Schuhe4a70d62017-12-27 20:10:20 -0800262void ShmEventLoop::OnRun(std::function<void()> on_run) {
263 on_run_.push_back(std::move(on_run));
264}
265
266void ShmEventLoop::Run() {
Austin Schuh3115a202019-05-27 21:02:14 -0700267 thread_state_->MaybeSetCurrentThreadRealtimePriority();
Parker Schuhe4a70d62017-12-27 20:10:20 -0800268 set_is_running(true);
269 for (const auto &run : on_run_) run();
Austin Schuha1654ed2019-01-27 17:24:54 -0800270 // TODO(austin): epoll event loop in main thread (if needed), and async safe
271 // quit handler.
Parker Schuhe4a70d62017-12-27 20:10:20 -0800272 thread_state_->Run();
Austin Schuh3115a202019-05-27 21:02:14 -0700273 ::aos::UnsetCurrentThreadRealtimePriority();
Parker Schuhe4a70d62017-12-27 20:10:20 -0800274}
275
276void ShmEventLoop::ThreadState::Run() {
277 MutexLocker locker(&mutex_);
278 loop_running_ = true;
279 if (loop_finished_) ::aos::Die("Cannot restart an ShmEventLoop()");
280 loop_running_cond_.Broadcast();
281 while (loop_running_) {
282 if (loop_running_cond_.Wait()) {
283 ::aos::Die("ShmEventLoop mutex lock problem.\n");
284 }
285 }
286}
287
Austin Schuh3115a202019-05-27 21:02:14 -0700288void ShmEventLoop::ThreadState::MaybeSetCurrentThreadRealtimePriority() {
289 if (priority_ != -1) {
290 ::aos::SetCurrentThreadRealtimePriority(priority_);
291 }
292}
293
Parker Schuhe4a70d62017-12-27 20:10:20 -0800294void ShmEventLoop::ThreadState::WaitForStart() {
295 MutexLocker locker(&mutex_);
296 while (!(loop_running_ || loop_finished_)) {
297 if (loop_running_cond_.Wait()) {
298 ::aos::Die("ShmEventLoop mutex lock problem.\n");
299 }
300 }
301}
302
303void ShmEventLoop::Exit() {
304 set_is_running(false);
305 thread_state_->Exit();
306}
307
308void ShmEventLoop::ThreadState::Exit() {
309 IPCRecursiveMutexLocker locker(&mutex_);
310 if (locker.owner_died()) ::aos::Die("Owner died");
311 loop_running_ = false;
312 loop_finished_ = true;
313 loop_running_cond_.Broadcast();
314}
315
316ShmEventLoop::~ShmEventLoop() {
317 if (is_running()) {
318 ::aos::Die("ShmEventLoop destroyed while running\n");
319 }
320}
321
322void ShmEventLoop::Take(const std::string &path) {
323 if (is_running()) {
324 ::aos::Die("Cannot add new objects while running.\n");
325 }
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800326
327 const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
328 if (prior != taken_.end()) {
329 ::aos::Die("%s is already being used.", path.c_str());
330 } else {
331 taken_.emplace_back(path);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800332 }
333}
334
335} // namespace aos