#include "aos/events/shm-event-loop.h"

#include <sys/timerfd.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <stdexcept>

#include "aos/events/epoll.h"
#include "aos/init.h"
#include "aos/logging/logging.h"
#include "aos/queue.h"
#include "aos/util/phased_loop.h"

namespace aos {

ShmEventLoop::ShmEventLoop() {}

namespace {

namespace chrono = ::std::chrono;

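// RawFetcher implementation which reads messages out of a shared memory
// RawQueue, tracking its position in the queue with index_.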
class ShmFetcher : public RawFetcher {
 public:
  explicit ShmFetcher(RawQueue *queue) : queue_(queue) {
    // Move index_ to point to the end of the queue as it is at construction
    // time. Also grab the latest message but don't expose it to the user yet.
    static constexpr Options<RawQueue> kOptions =
        RawQueue::kFromEnd | RawQueue::kNonBlock;
    msg_ = static_cast<const FetchValue *>(
        queue_->ReadMessageIndex(kOptions, &index_));
  }
  ~ShmFetcher() {
    if (msg_) {
      queue_->FreeMessage(msg_);
    }
  }

  bool FetchNext() override {
    const FetchValue *msg = static_cast<const FetchValue *>(
        queue_->ReadMessageIndex(RawQueue::kNonBlock, &index_));
    // Only update the internal pointer if we got a new message.
    if (msg != nullptr) {
      queue_->FreeMessage(msg_);
      msg_ = msg;
      set_most_recent(msg_);
    }
    return msg != nullptr;
  }

  bool Fetch() override {
    static constexpr Options<RawQueue> kOptions =
        RawQueue::kFromEnd | RawQueue::kNonBlock;
    const FetchValue *msg = static_cast<const FetchValue *>(
        queue_->ReadMessageIndex(kOptions, &index_));
    // Only update the internal pointer if we got a new message.
    if (msg != nullptr && msg != msg_) {
      queue_->FreeMessage(msg_);
      msg_ = msg;
      set_most_recent(msg_);
      return true;
    } else {
      // The message has to get freed if we didn't use it (and
      // RawQueue::FreeMessage is ok to call on nullptr).
      queue_->FreeMessage(msg);

      // We have a message from construction time. Give it to the user now.
      if (msg_ != nullptr && most_recent() != msg_) {
        set_most_recent(msg_);
        return true;
      } else {
        return false;
      }
    }
  }

 private:
  int index_ = 0;
  RawQueue *queue_;
  const FetchValue *msg_ = nullptr;
};

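// RawSender implementation which writes messages into a shared memory
// RawQueue, stamping each message with a send time if one isn't already set.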
class ShmSender : public RawSender {
 public:
  explicit ShmSender(RawQueue *queue) : queue_(queue) {}

  ::aos::Message *GetMessage() override {
    return reinterpret_cast<::aos::Message *>(queue_->GetMessage());
  }

  void Free(::aos::Message *msg) override { queue_->FreeMessage(msg); }

  bool Send(::aos::Message *msg) override {
    assert(queue_ != nullptr);
    {
      // TODO(austin): This lets multiple senders reorder messages since time
      // isn't acquired with a lock held.
      if (msg->sent_time == monotonic_clock::min_time) {
        msg->sent_time = monotonic_clock::now();
      }
    }
    return queue_->WriteMessage(msg, RawQueue::kOverride);
  }

  const char *name() const override { return queue_->name(); }

 private:
  RawQueue *queue_;
};

}  // namespace

namespace internal {

// Class to manage the state for a Watcher.
class WatcherThreadState {
 public:
  WatcherThreadState(
      ShmEventLoop::ThreadState *thread_state, RawQueue *queue,
      ::std::function<void(const ::aos::Message *message)> watcher)
      : thread_state_(thread_state),
        queue_(queue),
        index_(0),
        watcher_(::std::move(watcher)) {}

  ~WatcherThreadState() {
    // Only kill the thread if it is running.
    if (running_) {
      // TODO(austin): CHECK that we aren't RT here.

      // Try joining. If we fail, we weren't asleep on the condition in the
      // queue. So hit it again and again until that's true.
      struct timespec end_time;
      PCHECK(clock_gettime(CLOCK_REALTIME, &end_time) == 0);
      while (true) {
        void *retval = nullptr;
        end_time.tv_nsec += 100000000;
        if (end_time.tv_nsec > 1000000000L) {
          end_time.tv_nsec -= 1000000000L;
          ++end_time.tv_sec;
        }
        int ret = pthread_timedjoin_np(pthread_, &retval, &end_time);
        if (ret == ETIMEDOUT) continue;
        PCHECK(ret == 0);
        break;
      }
    }
  }

  // Starts the thread and waits until it is running.
  void Start() {
    PCHECK(pthread_create(&pthread_, nullptr, &StaticRun, this) == 0);
    IPCRecursiveMutexLocker locker(&thread_started_mutex_);
    if (locker.owner_died()) ::aos::Die("Owner died");
    while (!running_) {
      CHECK(!thread_started_condition_.Wait());
    }
  }

  void GrabQueueIndex() {
    // Right after we are signaled to start, point index to the current index
    // so we don't read any messages received before now. Otherwise we will
    // get a significantly delayed read.
    static constexpr Options<RawQueue> kOptions =
        RawQueue::kFromEnd | RawQueue::kNonBlock;
    const void *msg = queue_->ReadMessageIndex(kOptions, &index_);
    if (msg) {
      queue_->FreeMessage(msg);
    }
  }

 private:
  // Runs Run given a WatcherThreadState as the argument. This is an adapter
  // between pthreads and Run.
  static void *StaticRun(void *arg) {
    WatcherThreadState *watcher_thread_state =
        reinterpret_cast<WatcherThreadState *>(arg);
    watcher_thread_state->Run();
    return nullptr;
  }

  // Runs the watcher callback on new messages.
  void Run() {
    ::aos::SetCurrentThreadName(thread_state_->name() + ".watcher");

    // Signal the main thread that we are now ready.
    thread_state_->MaybeSetCurrentThreadRealtimePriority();
    {
      IPCRecursiveMutexLocker locker(&thread_started_mutex_);
      if (locker.owner_died()) ::aos::Die("Owner died");
      running_ = true;
      thread_started_condition_.Broadcast();
    }

    // Wait for the global start before handling events.
    thread_state_->WaitForStart();

    // Bail immediately if we are supposed to stop.
    if (!thread_state_->is_running()) {
      ::aos::UnsetCurrentThreadRealtimePriority();
      return;
    }

    const void *msg = nullptr;
    while (true) {
      msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index_,
                                     chrono::seconds(1));
      // We hit a timeout. Confirm that we should be running and retry. Note,
      // is_running is threadsafe (it's an atomic underneath). Worst case, we
      // check again in a second.
      if (msg == nullptr) {
        if (!thread_state_->is_running()) break;
        continue;
      }

      {
        // Grab the lock so that only one callback can be called at a time.
        MutexLocker locker(&thread_state_->mutex_);
        if (!thread_state_->is_running()) break;

        watcher_(reinterpret_cast<const Message *>(msg));
        // watcher_ may have exited the event loop.
        if (!thread_state_->is_running()) break;
      }
      // Drop the reference.
      queue_->FreeMessage(msg);
    }

    // And drop the last reference.
    queue_->FreeMessage(msg);
    // Now that everything is cleaned up, drop RT priority before destroying
    // the thread.
    ::aos::UnsetCurrentThreadRealtimePriority();
  }

  pthread_t pthread_;
  ShmEventLoop::ThreadState *thread_state_;
  RawQueue *queue_;
  int32_t index_;
  bool running_ = false;

  ::std::function<void(const Message *message)> watcher_;

  // Mutex and condition variable used to wait until the thread is started
  // before going RT.
  ::aos::Mutex thread_started_mutex_;
  ::aos::Condition thread_started_condition_{&thread_started_mutex_};
};

// Adapter class to adapt a timerfd to a TimerHandler.
// The part of the API which is accessed by the TimerHandler interface needs to
// be threadsafe. This means Setup and Disable.
class TimerHandlerState : public TimerHandler {
 public:
  TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
      : shm_event_loop_(shm_event_loop), fn_(::std::move(fn)) {
    shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
      MutexLocker locker(&shm_event_loop_->thread_state_.mutex_);
      timerfd_.Read();
      fn_();
    });
  }

  ~TimerHandlerState() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }

  void Setup(monotonic_clock::time_point base,
             monotonic_clock::duration repeat_offset) override {
    // SetTime is threadsafe already.
    timerfd_.SetTime(base, repeat_offset);
  }

  void Disable() override {
    // Disable is also threadsafe already.
    timerfd_.Disable();
  }

 private:
  ShmEventLoop *shm_event_loop_;

  TimerFd timerfd_;

  // Function to be run on the thread.
  ::std::function<void()> fn_;
};

// Adapter class to the timerfd and PhasedLoop.
// The part of the API which is accessed by the PhasedLoopHandler interface
// needs to be threadsafe. This means set_interval_and_offset.
class PhasedLoopHandler : public ::aos::PhasedLoopHandler {
 public:
  PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
                    const monotonic_clock::duration interval,
                    const monotonic_clock::duration offset)
      : shm_event_loop_(shm_event_loop),
        phased_loop_(interval, shm_event_loop_->monotonic_now(), offset),
        fn_(::std::move(fn)) {
    shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
      MutexLocker locker(&shm_event_loop_->thread_state_.mutex_);
      {
        MutexLocker locker(&mutex_);
        timerfd_.Read();
      }
      // Call the function. To avoid needing a recursive mutex, drop the lock
      // before running the function.
      fn_(cycles_elapsed_);
      {
        MutexLocker locker(&mutex_);
        Reschedule();
      }
    });
  }

  ~PhasedLoopHandler() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }

  void set_interval_and_offset(
      const monotonic_clock::duration interval,
      const monotonic_clock::duration offset) override {
    MutexLocker locker(&mutex_);
    phased_loop_.set_interval_and_offset(interval, offset);
  }

  void Startup() {
    MutexLocker locker(&mutex_);
    phased_loop_.Reset(shm_event_loop_->monotonic_now());
    Reschedule();
  }

 private:
  // Reschedules the timer. Must be called with the mutex held.
  void Reschedule() {
    cycles_elapsed_ = phased_loop_.Iterate(shm_event_loop_->monotonic_now());
    timerfd_.SetTime(phased_loop_.sleep_time(), ::aos::monotonic_clock::zero());
  }

  ShmEventLoop *shm_event_loop_;

  // Mutex to protect access to the timerfd_ (not strictly necessary), and the
  // phased_loop (necessary).
  ::aos::Mutex mutex_;

  TimerFd timerfd_;
  time::PhasedLoop phased_loop_;

  int cycles_elapsed_ = 1;

  // Function to be run.
  const ::std::function<void(int)> fn_;
};

}  // namespace internal

::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
    const ::std::string &path, const QueueTypeInfo &type) {
  return ::std::unique_ptr<RawFetcher>(new ShmFetcher(
      RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
}

::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
    const ::std::string &path, const QueueTypeInfo &type) {
  Take(path);
  return ::std::unique_ptr<RawSender>(new ShmSender(
      RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
}

void ShmEventLoop::MakeRawWatcher(
    const ::std::string &path, const QueueTypeInfo &type,
    ::std::function<void(const Message *message)> watcher) {
  Take(path);
  ::std::unique_ptr<internal::WatcherThreadState> state(
      new internal::WatcherThreadState(
          &thread_state_, RawQueue::Fetch(path.c_str(), type.size, type.hash,
                                          type.queue_length),
          std::move(watcher)));
  watchers_.push_back(::std::move(state));
}

TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
  ::std::unique_ptr<internal::TimerHandlerState> timer(
      new internal::TimerHandlerState(this, ::std::move(callback)));

  timers_.push_back(::std::move(timer));

  return timers_.back().get();
}

PhasedLoopHandler *ShmEventLoop::AddPhasedLoop(
    ::std::function<void(int)> callback,
    const monotonic_clock::duration interval,
    const monotonic_clock::duration offset) {
  ::std::unique_ptr<internal::PhasedLoopHandler> phased_loop(
      new internal::PhasedLoopHandler(this, ::std::move(callback), interval,
                                      offset));

  phased_loops_.push_back(::std::move(phased_loop));

  return phased_loops_.back().get();
}

void ShmEventLoop::OnRun(::std::function<void()> on_run) {
  on_run_.push_back(::std::move(on_run));
}

void ShmEventLoop::set_name(const char *name) { thread_state_.name_ = name; }

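// A minimal usage sketch for reference (illustrative only; the loop name,
// timer period, and callback bodies below are examples and not part of this
// file):
//
//   ShmEventLoop loop;
//   loop.set_name("example_loop");
//   TimerHandler *timer = loop.AddTimer([]() { /* periodic work */ });
//   loop.OnRun([&]() {
//     timer->Setup(loop.monotonic_now() + ::std::chrono::seconds(1),
//                  ::std::chrono::seconds(1));
//   });
//   loop.Run();  // Blocks until Exit() is called.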
void ShmEventLoop::Run() {
  // Start all the watcher threads.
  for (::std::unique_ptr<internal::WatcherThreadState> &watcher : watchers_) {
    watcher->Start();
  }

  ::aos::SetCurrentThreadName(thread_state_.name());

  // Now, all the threads are up. Go RT.
  thread_state_.MaybeSetCurrentThreadRealtimePriority();
  set_is_running(true);

  // Now that we are realtime (but before the OnRun handlers run), snap the
  // queue index.
  for (::std::unique_ptr<internal::WatcherThreadState> &watcher : watchers_) {
    watcher->GrabQueueIndex();
  }

  // Now that we are RT, run all the OnRun handlers.
  for (const auto &run : on_run_) {
    run();
  }

  // Start up all the phased loops.
  for (::std::unique_ptr<internal::PhasedLoopHandler> &phased_loop :
       phased_loops_) {
    phased_loop->Startup();
  }

  // TODO(austin): We don't need a separate watcher thread if there are only
  // watchers and fetchers. Could lazily create the epoll loop and pick a
  // victim watcher to run in this thread.
  // Trigger all the threads to start now.
  thread_state_.Start();

  // And start our main event loop which runs all the timers and handles Quit.
  epoll_.Run();

  // Once epoll exits, there is no useful nonrt work left to do.
  set_is_running(false);

  // Signal all the watcher threads to exit. After this point, no more
  // callbacks will be handled.
  thread_state_.Exit();

  // Nothing time or synchronization critical needs to happen after this point.
  // Drop RT priority.
  ::aos::UnsetCurrentThreadRealtimePriority();

  // The watcher threads get cleaned up in the destructor.
}

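// Wakes up all of the watcher threads blocked in WaitForStart() and marks the
// loop as running.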
void ShmEventLoop::ThreadState::Start() {
  MutexLocker locker(&mutex_);
  loop_running_ = true;
  if (loop_finished_) ::aos::Die("Cannot restart a ShmEventLoop()");
  loop_running_cond_.Broadcast();
}

void ShmEventLoop::ThreadState::WaitForStart() {
  MutexLocker locker(&mutex_);
  while (!(loop_running_ || loop_finished_)) {
    Condition::WaitResult wait_result =
        loop_running_cond_.WaitTimed(chrono::milliseconds(1000));
    if (wait_result == Condition::WaitResult::kOwnerDied) {
      ::aos::Die("ShmEventLoop mutex lock problem.\n");
    }
  }
}

void ShmEventLoop::ThreadState::MaybeSetCurrentThreadRealtimePriority() {
  if (priority_ != -1) {
    ::aos::SetCurrentThreadRealtimePriority(priority_);
  }
}

void ShmEventLoop::Exit() { epoll_.Quit(); }

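// Marks the loop as finished and wakes up any watcher threads still blocked in
// WaitForStart() so they can exit.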
void ShmEventLoop::ThreadState::Exit() {
  IPCRecursiveMutexLocker locker(&mutex_);
  if (locker.owner_died()) ::aos::Die("Owner died");
  loop_running_ = false;
  loop_finished_ = true;
  loop_running_cond_.Broadcast();
}

ShmEventLoop::~ShmEventLoop() {
  if (is_running()) {
    ::aos::Die("ShmEventLoop destroyed while running\n");
  }
}

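// Claims a queue path for this event loop, dying if it is already in use or
// if the loop is running.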
void ShmEventLoop::Take(const ::std::string &path) {
  if (is_running()) {
    ::aos::Die("Cannot add new objects while running.\n");
  }

  const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
  if (prior != taken_.end()) {
    ::aos::Die("%s is already being used.", path.c_str());
  } else {
    taken_.emplace_back(path);
  }
}

}  // namespace aos