#include "aos/events/shm-event-loop.h"

#include <sys/timerfd.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <stdexcept>

#include "aos/events/epoll.h"
#include "aos/init.h"
#include "aos/logging/logging.h"
#include "aos/queue.h"
#include "aos/util/phased_loop.h"

namespace aos {

ShmEventLoop::ShmEventLoop() {}

namespace {

namespace chrono = ::std::chrono;

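// RawFetcher implementation which reads messages out of a shared memory
// RawQueue, tracking its own queue index so it can tell new messages apart
// from ones it has already handed to the user.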
class ShmFetcher : public RawFetcher {
 public:
  explicit ShmFetcher(RawQueue *queue) : queue_(queue) {
    // Move index_ to point to the end of the queue as it is at construction
    // time.  Also grab the oldest message but don't expose it to the user yet.
    static constexpr Options<RawQueue> kOptions =
        RawQueue::kFromEnd | RawQueue::kNonBlock;
    msg_ = queue_->ReadMessageIndex(kOptions, &index_);
  }
  ~ShmFetcher() {
    if (msg_) {
      queue_->FreeMessage(msg_);
    }
  }

  bool FetchNext() override {
    const void *msg = queue_->ReadMessageIndex(RawQueue::kNonBlock, &index_);
    // Only update the internal pointer if we got a new message.
    if (msg != nullptr) {
      queue_->FreeMessage(msg_);
      msg_ = msg;
      set_most_recent(msg_);
    }
    return msg != nullptr;
  }

  bool Fetch() override {
    static constexpr Options<RawQueue> kOptions =
        RawQueue::kFromEnd | RawQueue::kNonBlock;
    const void *msg = queue_->ReadMessageIndex(kOptions, &index_);
    // Only update the internal pointer if we got a new message.
    if (msg != nullptr && msg != msg_) {
      queue_->FreeMessage(msg_);
      msg_ = msg;
      set_most_recent(msg_);
      return true;
    } else {
      // The message has to get freed if we didn't use it (and
      // RawQueue::FreeMessage is ok to call on nullptr).
      queue_->FreeMessage(msg);

      // We have a message from construction time.  Give it to the user now.
      if (msg_ != nullptr && most_recent() != msg_) {
        set_most_recent(msg_);
        return true;
      } else {
        return false;
      }
    }
  }

 private:
  int index_ = 0;
  RawQueue *queue_;
  const void *msg_ = nullptr;
};

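// RawSender implementation which writes messages into a shared memory
// RawQueue.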
class ShmSender : public RawSender {
 public:
  explicit ShmSender(RawQueue *queue) : queue_(queue) {}

  ::aos::Message *GetMessage() override {
    return reinterpret_cast<::aos::Message *>(queue_->GetMessage());
  }

  void Free(::aos::Message *msg) override { queue_->FreeMessage(msg); }

  bool Send(::aos::Message *msg) override {
    assert(queue_ != nullptr);
    {
      // TODO(austin): This lets multiple senders reorder messages since time
      // isn't acquired with a lock held.
      if (msg->sent_time == monotonic_clock::min_time) {
        msg->sent_time = monotonic_clock::now();
      }
    }
    return queue_->WriteMessage(msg, RawQueue::kOverride);
  }

  const char *name() const override { return queue_->name(); }

 private:
  RawQueue *queue_;
};

}  // namespace

namespace internal {

// Class to manage the state for a Watcher.
class WatcherThreadState {
 public:
  WatcherThreadState(
      ShmEventLoop::ThreadState *thread_state, RawQueue *queue,
      ::std::function<void(const ::aos::Message *message)> watcher)
      : thread_state_(thread_state),
        queue_(queue),
        index_(0),
        watcher_(::std::move(watcher)) {}

  ~WatcherThreadState() {
    // Only kill the thread if it is running.
    if (running_) {
      // TODO(austin): CHECK that we aren't RT here.

      // Try joining.  If we fail, we weren't asleep on the condition in the
      // queue.  So hit it again and again until that's true.
      struct timespec end_time;
      AOS_PCHECK(clock_gettime(CLOCK_REALTIME, &end_time) == 0);
      while (true) {
        void *retval = nullptr;
        end_time.tv_nsec += 100000000;
        if (end_time.tv_nsec > 1000000000L) {
          end_time.tv_nsec -= 1000000000L;
          ++end_time.tv_sec;
        }
        int ret = pthread_timedjoin_np(pthread_, &retval, &end_time);
        if (ret == ETIMEDOUT) continue;
        AOS_PCHECK(ret == 0);
        break;
      }
    }
  }

  // Starts the thread and waits until it is running.
  void Start() {
    AOS_PCHECK(pthread_create(&pthread_, nullptr, &StaticRun, this) == 0);
    IPCRecursiveMutexLocker locker(&thread_started_mutex_);
    if (locker.owner_died()) ::aos::Die("Owner died");
    while (!running_) {
      AOS_CHECK(!thread_started_condition_.Wait());
    }
  }

  void GrabQueueIndex() {
    // Right after we are signaled to start, point index to the current index
    // so we don't read any messages received before now.  Otherwise we will
    // get a significantly delayed read.
    static constexpr Options<RawQueue> kOptions =
        RawQueue::kFromEnd | RawQueue::kNonBlock;
    const void *msg = queue_->ReadMessageIndex(kOptions, &index_);
    if (msg) {
      queue_->FreeMessage(msg);
    }
  }

 private:
  // Runs Run given a WatcherThreadState as the argument.  This is an adapter
  // between pthreads and Run.
  static void *StaticRun(void *arg) {
    WatcherThreadState *watcher_thread_state =
        reinterpret_cast<WatcherThreadState *>(arg);
    watcher_thread_state->Run();
    return nullptr;
  }

  // Runs the watcher callback on new messages.
  void Run() {
    ::aos::SetCurrentThreadName(thread_state_->name() + ".watcher");

    // Signal the main thread that we are now ready.
    thread_state_->MaybeSetCurrentThreadRealtimePriority();
    {
      IPCRecursiveMutexLocker locker(&thread_started_mutex_);
      if (locker.owner_died()) ::aos::Die("Owner died");
      running_ = true;
      thread_started_condition_.Broadcast();
    }

    // Wait for the global start before handling events.
    thread_state_->WaitForStart();

    // Bail immediately if we are supposed to stop.
    if (!thread_state_->is_running()) {
      ::aos::UnsetCurrentThreadRealtimePriority();
      return;
    }

    const void *msg = nullptr;
    while (true) {
      msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index_,
                                     chrono::seconds(1));
      // We hit a timeout.  Confirm that we should be running and retry.  Note,
      // is_running is threadsafe (it's an atomic underneath).  Worst case, we
      // check again in a second.
      if (msg == nullptr) {
        if (!thread_state_->is_running()) break;
        continue;
      }

      {
        // Grab the lock so that only one callback can be called at a time.
        MutexLocker locker(&thread_state_->mutex_);
        if (!thread_state_->is_running()) break;

        watcher_(reinterpret_cast<const Message *>(msg));
        // watcher_ may have exited the event loop.
        if (!thread_state_->is_running()) break;
      }
      // Drop the reference.
      queue_->FreeMessage(msg);
    }

    // And drop the last reference.
    queue_->FreeMessage(msg);
    // Now that everything is cleaned up, drop RT priority before destroying
    // the thread.
    ::aos::UnsetCurrentThreadRealtimePriority();
  }
  pthread_t pthread_;
  ShmEventLoop::ThreadState *thread_state_;
  RawQueue *queue_;
  int32_t index_;
  bool running_ = false;

  ::std::function<void(const Message *message)> watcher_;

  // Mutex and condition variable used to wait until the thread is started
  // before going RT.
  ::aos::Mutex thread_started_mutex_;
  ::aos::Condition thread_started_condition_{&thread_started_mutex_};
};

// Adapter class to adapt a timerfd to a TimerHandler.
// The part of the API which is accessed by the TimerHandler interface needs to
// be threadsafe.  This means Setup and Disable.
class TimerHandlerState : public TimerHandler {
 public:
  TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
      : shm_event_loop_(shm_event_loop), fn_(::std::move(fn)) {
    shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
      MutexLocker locker(&shm_event_loop_->thread_state_.mutex_);
      timerfd_.Read();
      fn_();
    });
  }

  ~TimerHandlerState() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }

  void Setup(monotonic_clock::time_point base,
             monotonic_clock::duration repeat_offset) override {
    // SetTime is threadsafe already.
    timerfd_.SetTime(base, repeat_offset);
  }

  void Disable() override {
    // Disable is also threadsafe already.
    timerfd_.Disable();
  }

 private:
  ShmEventLoop *shm_event_loop_;

  TimerFd timerfd_;

  // Function to be run on the thread.
  ::std::function<void()> fn_;
};

// Adapter class to the timerfd and PhasedLoop.
// The part of the API which is accessed by the PhasedLoopHandler interface
// needs to be threadsafe.  This means set_interval_and_offset.
class PhasedLoopHandler : public ::aos::PhasedLoopHandler {
 public:
  PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
                    const monotonic_clock::duration interval,
                    const monotonic_clock::duration offset)
      : shm_event_loop_(shm_event_loop),
        phased_loop_(interval, shm_event_loop_->monotonic_now(), offset),
        fn_(::std::move(fn)) {
    shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
      MutexLocker locker(&shm_event_loop_->thread_state_.mutex_);
      {
        MutexLocker locker(&mutex_);
        timerfd_.Read();
      }
      // Call the function.  To avoid needing a recursive mutex, drop the lock
      // before running the function.
      fn_(cycles_elapsed_);
      {
        MutexLocker locker(&mutex_);
        Reschedule();
      }
    });
  }

  ~PhasedLoopHandler() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }

  void set_interval_and_offset(
      const monotonic_clock::duration interval,
      const monotonic_clock::duration offset) override {
    MutexLocker locker(&mutex_);
    phased_loop_.set_interval_and_offset(interval, offset);
  }

  void Startup() {
    MutexLocker locker(&mutex_);
    phased_loop_.Reset(shm_event_loop_->monotonic_now());
    Reschedule();
  }

 private:
  // Reschedules the timer.  Must be called with the mutex held.
  void Reschedule() {
    cycles_elapsed_ = phased_loop_.Iterate(shm_event_loop_->monotonic_now());
    timerfd_.SetTime(phased_loop_.sleep_time(), ::aos::monotonic_clock::zero());
  }

  ShmEventLoop *shm_event_loop_;

  // Mutex to protect access to the timerfd_ (not strictly necessary), and the
  // phased_loop (necessary).
  ::aos::Mutex mutex_;

  TimerFd timerfd_;
  time::PhasedLoop phased_loop_;

  int cycles_elapsed_ = 1;

  // Function to be run.
  const ::std::function<void(int)> fn_;
};

}  // namespace internal

::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
    const ::std::string &path, const QueueTypeInfo &type) {
  return ::std::unique_ptr<RawFetcher>(new ShmFetcher(
      RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
}

::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
    const ::std::string &path, const QueueTypeInfo &type) {
  Take(path);
  return ::std::unique_ptr<RawSender>(new ShmSender(
      RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
}

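// Claims exclusive use of the queue at |path| and creates the thread state
// which will run |watcher| on each new message once Run() starts.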
void ShmEventLoop::MakeRawWatcher(
    const ::std::string &path, const QueueTypeInfo &type,
    ::std::function<void(const Message *message)> watcher) {
  Take(path);
  ::std::unique_ptr<internal::WatcherThreadState> state(
      new internal::WatcherThreadState(
          &thread_state_, RawQueue::Fetch(path.c_str(), type.size, type.hash,
                                          type.queue_length),
          std::move(watcher)));
  watchers_.push_back(::std::move(state));
}

TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
  ::std::unique_ptr<internal::TimerHandlerState> timer(
      new internal::TimerHandlerState(this, ::std::move(callback)));

  timers_.push_back(::std::move(timer));

  return timers_.back().get();
}

PhasedLoopHandler *ShmEventLoop::AddPhasedLoop(
    ::std::function<void(int)> callback,
    const monotonic_clock::duration interval,
    const monotonic_clock::duration offset) {
  ::std::unique_ptr<internal::PhasedLoopHandler> phased_loop(
      new internal::PhasedLoopHandler(this, ::std::move(callback), interval,
                                      offset));

  phased_loops_.push_back(::std::move(phased_loop));

  return phased_loops_.back().get();
}

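// Handlers registered here run in Run(), after the loop has gone realtime but
// before any watcher, timer, or phased loop callbacks fire.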
void ShmEventLoop::OnRun(::std::function<void()> on_run) {
  on_run_.push_back(::std::move(on_run));
}

void ShmEventLoop::set_name(const char *name) { thread_state_.name_ = name; }

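// Starts the watcher threads, goes realtime, runs the epoll loop until Exit()
// is called, and then shuts everything back down.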
void ShmEventLoop::Run() {
  // Start all the watcher threads.
  for (::std::unique_ptr<internal::WatcherThreadState> &watcher : watchers_) {
    watcher->Start();
  }

  ::aos::SetCurrentThreadName(thread_state_.name());

  // Now, all the threads are up.  Lock everything into memory and go RT.
  if (thread_state_.priority_ != -1) {
    ::aos::InitRT();
  }
  thread_state_.MaybeSetCurrentThreadRealtimePriority();
  set_is_running(true);

  // Now that we are realtime (but before the OnRun handlers run), snap the
  // queue index.
  for (::std::unique_ptr<internal::WatcherThreadState> &watcher : watchers_) {
    watcher->GrabQueueIndex();
  }

  // Now that we are RT, run all the OnRun handlers.
  for (const auto &run : on_run_) {
    run();
  }

  // Start up all the phased loops.
  for (::std::unique_ptr<internal::PhasedLoopHandler> &phased_loop :
       phased_loops_) {
    phased_loop->Startup();
  }

  // TODO(austin): We don't need a separate watcher thread if there are only
  // watchers and fetchers.  Could lazily create the epoll loop and pick a
  // victim watcher to run in this thread.

  // Trigger all the threads to start now.
  thread_state_.Start();

  // And start our main event loop which runs all the timers and handles Quit.
  epoll_.Run();

  // Once epoll exits, there is no useful nonrt work left to do.
  set_is_running(false);

  // Signal all the watcher threads to exit.  After this point, no more
  // callbacks will be handled.
  thread_state_.Exit();

  // Nothing time or synchronization critical needs to happen after this point.
  // Drop RT priority.
  ::aos::UnsetCurrentThreadRealtimePriority();

  // The watcher threads get cleaned up in the destructor.
}

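// Marks the loop as running and wakes any watcher threads blocked in
// WaitForStart().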
void ShmEventLoop::ThreadState::Start() {
  MutexLocker locker(&mutex_);
  loop_running_ = true;
  if (loop_finished_) ::aos::Die("Cannot restart a ShmEventLoop()");
  loop_running_cond_.Broadcast();
}

void ShmEventLoop::ThreadState::WaitForStart() {
  MutexLocker locker(&mutex_);
  while (!(loop_running_ || loop_finished_)) {
    Condition::WaitResult wait_result =
        loop_running_cond_.WaitTimed(chrono::milliseconds(1000));
    if (wait_result == Condition::WaitResult::kOwnerDied) {
      ::aos::Die("ShmEventLoop mutex lock problem.\n");
    }
  }
}

void ShmEventLoop::ThreadState::MaybeSetCurrentThreadRealtimePriority() {
  if (priority_ != -1) {
    ::aos::SetCurrentThreadRealtimePriority(priority_);
  }
}

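// Stops the epoll loop.  Run() then takes care of shutting the watcher
// threads down and dropping realtime priority.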
void ShmEventLoop::Exit() { epoll_.Quit(); }

void ShmEventLoop::ThreadState::Exit() {
  IPCRecursiveMutexLocker locker(&mutex_);
  if (locker.owner_died()) ::aos::Die("Owner died");
  loop_running_ = false;
  loop_finished_ = true;
  loop_running_cond_.Broadcast();
}

ShmEventLoop::~ShmEventLoop() {
  if (is_running()) {
    ::aos::Die("ShmEventLoop destroyed while running\n");
  }
}

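// Records that |path| is in use so that a second sender or watcher on the
// same queue dies loudly instead of silently colliding.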
void ShmEventLoop::Take(const ::std::string &path) {
  if (is_running()) {
    ::aos::Die("Cannot add new objects while running.\n");
  }

  const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
  if (prior != taken_.end()) {
    ::aos::Die("%s is already being used.", path.c_str());
  } else {
    taken_.emplace_back(path);
  }
}

}  // namespace aos