#include "aos/events/shm-event-loop.h"

#include <sys/timerfd.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <stdexcept>

#include "aos/events/epoll.h"
#include "aos/init.h"
#include "aos/logging/logging.h"
#include "aos/queue.h"
#include "aos/util/phased_loop.h"

namespace aos {

ShmEventLoop::ShmEventLoop() {}

namespace {

namespace chrono = ::std::chrono;

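// Fetcher which reads directly from a shared memory RawQueue.  FetchNext()
// walks the queue in order; Fetch() jumps straight to the latest message.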
class ShmFetcher : public RawFetcher {
 public:
  explicit ShmFetcher(RawQueue *queue) : queue_(queue) {
    // Move index_ to point to the end of the queue as it is at construction
    // time.  Also grab the latest message but don't expose it to the user yet.
    static constexpr Options<RawQueue> kOptions =
        RawQueue::kFromEnd | RawQueue::kNonBlock;
    msg_ = static_cast<const FetchValue *>(
        queue_->ReadMessageIndex(kOptions, &index_));
  }
  ~ShmFetcher() {
    if (msg_) {
      queue_->FreeMessage(msg_);
    }
  }

  bool FetchNext() override {
    const FetchValue *msg = static_cast<const FetchValue *>(
        queue_->ReadMessageIndex(RawQueue::kNonBlock, &index_));
    // Only update the internal pointer if we got a new message.
    if (msg != nullptr) {
      queue_->FreeMessage(msg_);
      msg_ = msg;
      set_most_recent(msg_);
    }
    return msg != nullptr;
  }

  bool Fetch() override {
    static constexpr Options<RawQueue> kOptions =
        RawQueue::kFromEnd | RawQueue::kNonBlock;
    const FetchValue *msg = static_cast<const FetchValue *>(
        queue_->ReadMessageIndex(kOptions, &index_));
    // Only update the internal pointer if we got a new message.
    if (msg != nullptr && msg != msg_) {
      queue_->FreeMessage(msg_);
      msg_ = msg;
      set_most_recent(msg_);
      return true;
    } else {
      // The message has to get freed if we didn't use it (and
      // RawQueue::FreeMessage is ok to call on nullptr).
      queue_->FreeMessage(msg);

      // We have a message from construction time.  Give it to the user now.
      if (msg_ != nullptr && most_recent() != msg_) {
        set_most_recent(msg_);
        return true;
      } else {
        return false;
      }
    }
  }

 private:
  int index_ = 0;
  RawQueue *queue_;
  const FetchValue *msg_ = nullptr;
};

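// Sender which writes directly to a shared memory RawQueue.  Messages are
// allocated from and returned to the queue's shared memory pool.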
class ShmSender : public RawSender {
 public:
  explicit ShmSender(RawQueue *queue) : queue_(queue) {}

  ::aos::Message *GetMessage() override {
    return reinterpret_cast<::aos::Message *>(queue_->GetMessage());
  }

  void Free(::aos::Message *msg) override { queue_->FreeMessage(msg); }

  bool Send(::aos::Message *msg) override {
    assert(queue_ != nullptr);
    {
      // TODO(austin): This lets multiple senders reorder messages since time
      // isn't acquired with a lock held.
      if (msg->sent_time == monotonic_clock::min_time) {
        msg->sent_time = monotonic_clock::now();
      }
    }
    return queue_->WriteMessage(msg, RawQueue::kOverride);
  }

  const char *name() const override { return queue_->name(); }

 private:
  RawQueue *queue_;
};

}  // namespace

namespace internal {

// Class to manage the state for a Watcher.
class WatcherThreadState {
 public:
  WatcherThreadState(
      ShmEventLoop::ThreadState *thread_state, RawQueue *queue,
      ::std::function<void(const ::aos::Message *message)> watcher)
      : thread_state_(thread_state),
        queue_(queue),
        index_(0),
        watcher_(::std::move(watcher)) {}

  ~WatcherThreadState() {
    // Only join the thread if it is running.
    if (running_) {
      // TODO(austin): CHECK that we aren't RT here.

      // Try joining.  If we fail, we weren't asleep on the condition in the
      // queue.  So hit it again and again until that's true.
      struct timespec end_time;
      PCHECK(clock_gettime(CLOCK_REALTIME, &end_time) == 0);
      while (true) {
        void *retval = nullptr;
        end_time.tv_nsec += 100000000;
        if (end_time.tv_nsec > 1000000000L) {
          end_time.tv_nsec -= 1000000000L;
          ++end_time.tv_sec;
        }
        int ret = pthread_timedjoin_np(pthread_, &retval, &end_time);
        if (ret == ETIMEDOUT) continue;
        PCHECK(ret == 0);
        break;
      }
    }
  }

  // Starts the thread and waits until it is running.
  void Start() {
    PCHECK(pthread_create(&pthread_, nullptr, &StaticRun, this) == 0);
    IPCRecursiveMutexLocker locker(&thread_started_mutex_);
    if (locker.owner_died()) ::aos::Die("Owner died");
    while (!running_) {
      CHECK(!thread_started_condition_.Wait());
    }
  }

  void GrabQueueIndex() {
    // Right after we are signaled to start, point index_ at the current end of
    // the queue so we don't read any messages received before now.  Otherwise
    // we would get a significantly delayed read.
    static constexpr Options<RawQueue> kOptions =
        RawQueue::kFromEnd | RawQueue::kNonBlock;
    const void *msg = queue_->ReadMessageIndex(kOptions, &index_);
    if (msg) {
      queue_->FreeMessage(msg);
    }
  }

 private:
  // Runs Run given a WatcherThreadState as the argument.  This is an adapter
  // between pthreads and Run.
  static void *StaticRun(void *arg) {
    WatcherThreadState *watcher_thread_state =
        reinterpret_cast<WatcherThreadState *>(arg);
    watcher_thread_state->Run();
    return nullptr;
  }

  // Runs the watcher callback on new messages.
  void Run() {
    // Go realtime (if configured), then signal the main thread that we are
    // ready.
    thread_state_->MaybeSetCurrentThreadRealtimePriority();
    {
      IPCRecursiveMutexLocker locker(&thread_started_mutex_);
      if (locker.owner_died()) ::aos::Die("Owner died");
      running_ = true;
      thread_started_condition_.Broadcast();
    }

    // Wait for the global start before handling events.
    thread_state_->WaitForStart();

    // Bail immediately if we are supposed to stop.
    if (!thread_state_->is_running()) {
      ::aos::UnsetCurrentThreadRealtimePriority();
      return;
    }

    const void *msg = nullptr;
    while (true) {
      msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index_,
                                     chrono::seconds(1));
      // We hit a timeout.  Confirm that we should be running and retry.  Note,
      // is_running is threadsafe (it's an atomic underneath).  Worst case, we
      // check again in a second.
      if (msg == nullptr) {
        if (!thread_state_->is_running()) break;
        continue;
      }

      {
        // Grab the lock so that only one callback can be called at a time.
        MutexLocker locker(&thread_state_->mutex_);
        if (!thread_state_->is_running()) break;

        watcher_(reinterpret_cast<const Message *>(msg));
        // watcher_ may have exited the event loop.
        if (!thread_state_->is_running()) break;
      }
      // Drop the reference.
      queue_->FreeMessage(msg);
    }

    // And drop the last reference.
    queue_->FreeMessage(msg);
    // Now that everything is cleaned up, drop RT priority before the thread
    // exits.
    ::aos::UnsetCurrentThreadRealtimePriority();
  }

  // Handle of the watcher thread.
  pthread_t pthread_;
  // State shared with the event loop which owns this watcher.
  ShmEventLoop::ThreadState *thread_state_;
  // Queue being watched and the index of the next message to read from it.
  RawQueue *queue_;
  int32_t index_;
  // Set to true once the watcher thread is up and running.
  bool running_ = false;

  // Callback to run on each new message.
  ::std::function<void(const Message *message)> watcher_;

  // Mutex and condition variable used to wait until the thread is started
  // before going RT.
  ::aos::Mutex thread_started_mutex_;
  ::aos::Condition thread_started_condition_{&thread_started_mutex_};
};

// Adapter class to adapt a timerfd to a TimerHandler.
// The part of the API which is accessed by the TimerHandler interface needs to
// be threadsafe.  This means Setup and Disable.
class TimerHandlerState : public TimerHandler {
 public:
  TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
      : shm_event_loop_(shm_event_loop), fn_(::std::move(fn)) {
    shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
      MutexLocker locker(&shm_event_loop_->thread_state_.mutex_);
      timerfd_.Read();
      fn_();
    });
  }

  ~TimerHandlerState() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }

  void Setup(monotonic_clock::time_point base,
             monotonic_clock::duration repeat_offset) override {
    // SetTime is threadsafe already.
    timerfd_.SetTime(base, repeat_offset);
  }

  void Disable() override {
    // Disable is also threadsafe already.
    timerfd_.Disable();
  }

 private:
  ShmEventLoop *shm_event_loop_;

  TimerFd timerfd_;

  // Function to run when the timer expires.
  ::std::function<void()> fn_;
};

// Adapter class connecting a timerfd to a PhasedLoop.
// The part of the API which is accessed by the PhasedLoopHandler interface
// needs to be threadsafe.  This means set_interval_and_offset.
class PhasedLoopHandler : public ::aos::PhasedLoopHandler {
 public:
  PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
                    const monotonic_clock::duration interval,
                    const monotonic_clock::duration offset)
      : shm_event_loop_(shm_event_loop),
        phased_loop_(interval, shm_event_loop_->monotonic_now(), offset),
        fn_(::std::move(fn)) {
    shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
      MutexLocker locker(&shm_event_loop_->thread_state_.mutex_);
      {
        MutexLocker locker(&mutex_);
        timerfd_.Read();
      }
      // Call the function.  To avoid needing a recursive mutex, drop the lock
      // before running the function.
      fn_(cycles_elapsed_);
      {
        MutexLocker locker(&mutex_);
        Reschedule();
      }
    });
  }

  ~PhasedLoopHandler() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }

  void set_interval_and_offset(
      const monotonic_clock::duration interval,
      const monotonic_clock::duration offset) override {
    MutexLocker locker(&mutex_);
    phased_loop_.set_interval_and_offset(interval, offset);
  }

  void Startup() {
    MutexLocker locker(&mutex_);
    phased_loop_.Reset(shm_event_loop_->monotonic_now());
    Reschedule();
  }

 private:
  // Reschedules the timer.  Must be called with the mutex held.
  void Reschedule() {
    cycles_elapsed_ = phased_loop_.Iterate(shm_event_loop_->monotonic_now());
    timerfd_.SetTime(phased_loop_.sleep_time(), ::aos::monotonic_clock::zero());
  }

  ShmEventLoop *shm_event_loop_;

  // Mutex to protect access to the timerfd_ (not strictly necessary), and the
  // phased_loop (necessary).
  ::aos::Mutex mutex_;

  TimerFd timerfd_;
  time::PhasedLoop phased_loop_;

  int cycles_elapsed_ = 1;

  // Function to run each cycle; it is passed the number of cycles elapsed.
  const ::std::function<void(int)> fn_;
};
}  // namespace internal

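// Fetches the shared memory queue for the given path and wraps it in a
// ShmFetcher.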
::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
    const ::std::string &path, const QueueTypeInfo &type) {
  return ::std::unique_ptr<RawFetcher>(new ShmFetcher(
      RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
}

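// Claims the path for this event loop and wraps the shared memory queue in a
// ShmSender.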
::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
    const ::std::string &path, const QueueTypeInfo &type) {
  Take(path);
  return ::std::unique_ptr<RawSender>(new ShmSender(
      RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
}

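// Claims the path and spins up a watcher thread which runs the callback on
// every new message.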
void ShmEventLoop::MakeRawWatcher(
    const ::std::string &path, const QueueTypeInfo &type,
    ::std::function<void(const Message *message)> watcher) {
  Take(path);
  ::std::unique_ptr<internal::WatcherThreadState> state(
      new internal::WatcherThreadState(
          &thread_state_, RawQueue::Fetch(path.c_str(), type.size, type.hash,
                                          type.queue_length),
          ::std::move(watcher)));
  watchers_.push_back(::std::move(state));
}

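// Creates a timerfd-backed timer whose callback runs on the main epoll thread.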
TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
  ::std::unique_ptr<internal::TimerHandlerState> timer(
      new internal::TimerHandlerState(this, ::std::move(callback)));

  timers_.push_back(::std::move(timer));

  return timers_.back().get();
}

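// Creates a timerfd-backed phased loop which calls the callback with the
// number of cycles elapsed since the previous call.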
PhasedLoopHandler *ShmEventLoop::AddPhasedLoop(
    ::std::function<void(int)> callback,
    const monotonic_clock::duration interval,
    const monotonic_clock::duration offset) {
  ::std::unique_ptr<internal::PhasedLoopHandler> phased_loop(
      new internal::PhasedLoopHandler(this, ::std::move(callback), interval,
                                      offset));

  phased_loops_.push_back(::std::move(phased_loop));

  return phased_loops_.back().get();
}

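// Registers a handler to run once the loop has gone realtime, before any
// events are handled.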
void ShmEventLoop::OnRun(::std::function<void()> on_run) {
  on_run_.push_back(::std::move(on_run));
}

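// Starts the watcher threads, goes realtime, runs the OnRun handlers, starts
// the phased loops, and then blocks in the epoll loop until Exit() is called.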
void ShmEventLoop::Run() {
  // Start all the watcher threads.
  for (::std::unique_ptr<internal::WatcherThreadState> &watcher : watchers_) {
    watcher->Start();
  }

  // Now, all the threads are up.  Go RT.
  thread_state_.MaybeSetCurrentThreadRealtimePriority();
  set_is_running(true);

  // Now that we are realtime (but before the OnRun handlers run), snap the
  // queue index.
  for (::std::unique_ptr<internal::WatcherThreadState> &watcher : watchers_) {
    watcher->GrabQueueIndex();
  }

  // Now that we are RT, run all the OnRun handlers.
  for (const auto &run : on_run_) {
    run();
  }

  // Start up all the phased loops.
  for (::std::unique_ptr<internal::PhasedLoopHandler> &phased_loop :
       phased_loops_) {
    phased_loop->Startup();
  }

  // TODO(austin): We don't need a separate watcher thread if there are only
  // watchers and fetchers.  Could lazily create the epoll loop and pick a
  // victim watcher to run in this thread.

  // Trigger all the threads to start now.
  thread_state_.Start();

  // And start our main event loop which runs all the timers and handles Quit.
  epoll_.Run();

  // Once epoll exits, there is no useful nonrt work left to do.
  set_is_running(false);

  // Signal all the watcher threads to exit.  After this point, no more
  // callbacks will be handled.
  thread_state_.Exit();

  // Nothing time or synchronization critical needs to happen after this point.
  // Drop RT priority.
  ::aos::UnsetCurrentThreadRealtimePriority();

  // The watcher threads get cleaned up in the destructor.
}

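// Marks the loop as running and wakes up all the watcher threads.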
void ShmEventLoop::ThreadState::Start() {
  MutexLocker locker(&mutex_);
  loop_running_ = true;
  if (loop_finished_) ::aos::Die("Cannot restart a ShmEventLoop()");
  loop_running_cond_.Broadcast();
}

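// Blocks until the loop has either started or finished.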
void ShmEventLoop::ThreadState::WaitForStart() {
  MutexLocker locker(&mutex_);
  while (!(loop_running_ || loop_finished_)) {
    Condition::WaitResult wait_result =
        loop_running_cond_.WaitTimed(chrono::milliseconds(1000));
    if (wait_result == Condition::WaitResult::kOwnerDied) {
      ::aos::Die("ShmEventLoop mutex lock problem.\n");
    }
  }
}

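// Elevates the calling thread to the configured realtime priority, if one was
// set.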
void ShmEventLoop::ThreadState::MaybeSetCurrentThreadRealtimePriority() {
  if (priority_ != -1) {
    ::aos::SetCurrentThreadRealtimePriority(priority_);
  }
}

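// Quits the epoll loop, which unwinds Run().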
void ShmEventLoop::Exit() { epoll_.Quit(); }

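// Marks the loop as finished and wakes up anything still waiting on it.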
void ShmEventLoop::ThreadState::Exit() {
  IPCRecursiveMutexLocker locker(&mutex_);
  if (locker.owner_died()) ::aos::Die("Owner died");
  loop_running_ = false;
  loop_finished_ = true;
  loop_running_cond_.Broadcast();
}

ShmEventLoop::~ShmEventLoop() {
  if (is_running()) {
    ::aos::Die("ShmEventLoop destroyed while running\n");
  }
}

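// Records that the path is in use and dies if it has already been claimed.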
void ShmEventLoop::Take(const ::std::string &path) {
  if (is_running()) {
    ::aos::Die("Cannot add new objects while running.\n");
  }

  const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
  if (prior != taken_.end()) {
    ::aos::Die("%s is already being used.", path.c_str());
  } else {
    taken_.emplace_back(path);
  }
}

}  // namespace aos