blob: 08f9e1f1276c86cbe3f8edafe66cb751e0262740 [file] [log] [blame]
Parker Schuhe4a70d62017-12-27 20:10:20 -08001#include "aos/events/shm-event-loop.h"
Parker Schuhe4a70d62017-12-27 20:10:20 -08002
Neil Balch229001a2018-01-07 18:22:52 -08003#include <sys/timerfd.h>
Austin Schuh81fc9cc2019-02-02 23:25:47 -08004#include <algorithm>
Parker Schuhe4a70d62017-12-27 20:10:20 -08005#include <atomic>
6#include <chrono>
7#include <stdexcept>
8
Austin Schuh6b6dfa52019-06-12 20:16:20 -07009#include "aos/events/epoll.h"
Austin Schuh3115a202019-05-27 21:02:14 -070010#include "aos/init.h"
Austin Schuh81fc9cc2019-02-02 23:25:47 -080011#include "aos/logging/logging.h"
12#include "aos/queue.h"
13
Parker Schuhe4a70d62017-12-27 20:10:20 -080014namespace aos {
15
// Queues, watchers, and timers are all registered lazily, so there is nothing
// to do at construction time.
ShmEventLoop::ShmEventLoop() {}
Parker Schuhe4a70d62017-12-27 20:10:20 -080017
18namespace {
Austin Schuh6b6dfa52019-06-12 20:16:20 -070019
20namespace chrono = ::std::chrono;
21
// Fetcher over a shared-memory RawQueue.  Tracks its read position with
// index_ and keeps one reference (msg_) to the most recently fetched message
// until it is replaced or the fetcher is destroyed.
class ShmFetcher : public RawFetcher {
 public:
  explicit ShmFetcher(RawQueue *queue) : queue_(queue) {
    // Move index_ to point to the end of the queue as it is at construction
    // time.  Also grab the oldest message but don't expose it to the user yet.
    static constexpr Options<RawQueue> kOptions =
        RawQueue::kFromEnd | RawQueue::kNonBlock;
    msg_ = static_cast<const FetchValue *>(
        queue_->ReadMessageIndex(kOptions, &index_));
  }
  ~ShmFetcher() {
    // Release our reference so the queue can recycle the message.
    if (msg_) {
      queue_->FreeMessage(msg_);
    }
  }

  // Fetches the next message in queue order (non-blocking).  Returns true if
  // a new message was fetched.
  bool FetchNext() override {
    const FetchValue *msg = static_cast<const FetchValue *>(
        queue_->ReadMessageIndex(RawQueue::kNonBlock, &index_));
    // Only update the internal pointer if we got a new message.
    if (msg != nullptr) {
      // Drop the reference to the previous message before taking ownership of
      // the new one.
      queue_->FreeMessage(msg_);
      msg_ = msg;
      set_most_recent(msg_);
    }
    return msg != nullptr;
  }

  // Fetches the latest message (non-blocking).  Returns true if most_recent()
  // changed, including the first time the construction-time message is
  // handed to the user.
  bool Fetch() override {
    static constexpr Options<RawQueue> kOptions =
        RawQueue::kFromEnd | RawQueue::kNonBlock;
    const FetchValue *msg = static_cast<const FetchValue *>(
        queue_->ReadMessageIndex(kOptions, &index_));
    // Only update the internal pointer if we got a new message.
    if (msg != nullptr && msg != msg_) {
      queue_->FreeMessage(msg_);
      msg_ = msg;
      set_most_recent(msg_);
      return true;
    } else {
      // The message has to get freed if we didn't use it (and
      // RawQueue::FreeMessage is ok to call on nullptr).
      queue_->FreeMessage(msg);

      // We have a message from construction time.  Give it to the user now.
      if (msg_ != nullptr && most_recent() != msg_) {
        set_most_recent(msg_);
        return true;
      } else {
        return false;
      }
    }
  }

 private:
  int index_ = 0;    // Read position; advanced by ReadMessageIndex.
  RawQueue *queue_;  // Not owned.
  // Message we currently hold a queue reference to; nullptr until the first
  // message is seen.
  const FetchValue *msg_ = nullptr;
};
81
82class ShmSender : public RawSender {
83 public:
84 explicit ShmSender(RawQueue *queue) : queue_(queue) {}
85
Austin Schuh6b6dfa52019-06-12 20:16:20 -070086 ::aos::Message *GetMessage() override {
87 return reinterpret_cast<::aos::Message *>(queue_->GetMessage());
Parker Schuhe4a70d62017-12-27 20:10:20 -080088 }
89
Austin Schuh6b6dfa52019-06-12 20:16:20 -070090 void Free(::aos::Message *msg) override { queue_->FreeMessage(msg); }
Parker Schuhe4a70d62017-12-27 20:10:20 -080091
Austin Schuh6b6dfa52019-06-12 20:16:20 -070092 bool Send(::aos::Message *msg) override {
Austin Schuhbbce72d2019-05-26 15:11:46 -070093 assert(queue_ != nullptr);
Austin Schuh7267c532019-05-19 19:55:53 -070094 {
Austin Schuh7267c532019-05-19 19:55:53 -070095 // TODO(austin): This lets multiple senders reorder messages since time
96 // isn't acquired with a lock held.
James Kuszmaulcd1db352019-05-26 16:42:29 -070097 if (msg->sent_time == monotonic_clock::min_time) {
98 msg->sent_time = monotonic_clock::now();
Austin Schuh7267c532019-05-19 19:55:53 -070099 }
100 }
Parker Schuhe4a70d62017-12-27 20:10:20 -0800101 return queue_->WriteMessage(msg, RawQueue::kOverride);
102 }
103
Austin Schuhd681bbd2019-02-02 12:03:32 -0800104 const char *name() const override { return queue_->name(); }
105
Parker Schuhe4a70d62017-12-27 20:10:20 -0800106 private:
107 RawQueue *queue_;
108};
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700109
Parker Schuhe4a70d62017-12-27 20:10:20 -0800110} // namespace
111
112namespace internal {
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700113
114// Class to manage the state for a Watcher.
Parker Schuhe4a70d62017-12-27 20:10:20 -0800115class WatcherThreadState {
116 public:
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700117 WatcherThreadState(
118 ShmEventLoop::ThreadState *thread_state, RawQueue *queue,
119 ::std::function<void(const ::aos::Message *message)> watcher)
120 : thread_state_(thread_state),
Parker Schuhe4a70d62017-12-27 20:10:20 -0800121 queue_(queue),
Austin Schuh3578a2e2019-05-25 18:17:59 -0700122 index_(0),
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700123 watcher_(::std::move(watcher)) {}
124
125 ~WatcherThreadState() {
126 // Only kill the thread if it is running.
127 if (running_) {
128 // TODO(austin): CHECK that we aren't RT here.
129
130 // Try joining. If we fail, we weren't asleep on the condition in the
131 // queue. So hit it again and again until that's true.
132 struct timespec end_time;
133 PCHECK(clock_gettime(CLOCK_REALTIME, &end_time) == 0);
134 while (true) {
135 void *retval = nullptr;
136 end_time.tv_nsec += 100000000;
137 if (end_time.tv_nsec > 1000000000L) {
138 end_time.tv_nsec -= 1000000000L;
139 ++end_time.tv_sec;
140 }
141 int ret = pthread_timedjoin_np(pthread_, &retval, &end_time);
142 if (ret == ETIMEDOUT) continue;
143 PCHECK(ret == 0);
144 break;
145 }
146 }
147 }
148
149 // Starts the thread and waits until it is running.
150 void Start() {
151 PCHECK(pthread_create(&pthread_, nullptr, &StaticRun, this) == 0);
152 IPCRecursiveMutexLocker locker(&thread_started_mutex_);
153 if (locker.owner_died()) ::aos::Die("Owner died");
154 while (!running_) {
155 CHECK(!thread_started_condition_.Wait());
156 }
157 }
158
159 void GrabQueueIndex() {
160 // Right after we are signaled to start, point index to the current index
161 // so we don't read any messages received before now. Otherwise we will
162 // get a significantly delayed read.
Austin Schuh3578a2e2019-05-25 18:17:59 -0700163 static constexpr Options<RawQueue> kOptions =
164 RawQueue::kFromEnd | RawQueue::kNonBlock;
165 const void *msg = queue_->ReadMessageIndex(kOptions, &index_);
166 if (msg) {
167 queue_->FreeMessage(msg);
168 }
169 }
Parker Schuhe4a70d62017-12-27 20:10:20 -0800170
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700171 private:
172 // Runs Run given a WatcherThreadState as the argument. This is an adapter
173 // between pthreads and Run.
174 static void *StaticRun(void *arg) {
175 WatcherThreadState *watcher_thread_state =
176 reinterpret_cast<WatcherThreadState *>(arg);
177 watcher_thread_state->Run();
178 return nullptr;
179 }
180
181 // Runs the watcher callback on new messages.
Parker Schuhe4a70d62017-12-27 20:10:20 -0800182 void Run() {
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700183 // Signal the main thread that we are now ready.
Austin Schuh3115a202019-05-27 21:02:14 -0700184 thread_state_->MaybeSetCurrentThreadRealtimePriority();
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700185 {
186 IPCRecursiveMutexLocker locker(&thread_started_mutex_);
187 if (locker.owner_died()) ::aos::Die("Owner died");
188 running_ = true;
189 thread_started_condition_.Broadcast();
190 }
191
192 // Wait for the global start before handling events.
Parker Schuhe4a70d62017-12-27 20:10:20 -0800193 thread_state_->WaitForStart();
194
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700195 // Bail immediately if we are supposed to stop.
Austin Schuh3115a202019-05-27 21:02:14 -0700196 if (!thread_state_->is_running()) {
197 ::aos::UnsetCurrentThreadRealtimePriority();
198 return;
199 }
Parker Schuhe4a70d62017-12-27 20:10:20 -0800200
Austin Schuh3578a2e2019-05-25 18:17:59 -0700201 const void *msg = nullptr;
Parker Schuhe4a70d62017-12-27 20:10:20 -0800202 while (true) {
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700203 msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index_,
204 chrono::seconds(1));
205 // We hit a timeout. Confirm that we should be running and retry. Note,
206 // is_running is threadsafe (it's an atomic underneath). Worst case, we
207 // check again in a second.
208 if (msg == nullptr) {
209 if (!thread_state_->is_running()) break;
210 continue;
211 }
Parker Schuhe4a70d62017-12-27 20:10:20 -0800212
213 {
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700214 // Grab the lock so that only one callback can be called at a time.
Neil Balch229001a2018-01-07 18:22:52 -0800215 MutexLocker locker(&thread_state_->mutex_);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800216 if (!thread_state_->is_running()) break;
217
218 watcher_(reinterpret_cast<const Message *>(msg));
219 // watcher_ may have exited the event loop.
220 if (!thread_state_->is_running()) break;
221 }
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700222 // Drop the reference.
Parker Schuhe4a70d62017-12-27 20:10:20 -0800223 queue_->FreeMessage(msg);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800224 }
225
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700226 // And drop the last reference.
Parker Schuhe4a70d62017-12-27 20:10:20 -0800227 queue_->FreeMessage(msg);
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700228 // Now that everything is cleaned up, drop RT priority before destroying the
229 // thread.
Austin Schuh3115a202019-05-27 21:02:14 -0700230 ::aos::UnsetCurrentThreadRealtimePriority();
Parker Schuhe4a70d62017-12-27 20:10:20 -0800231 }
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700232 pthread_t pthread_;
233 ShmEventLoop::ThreadState *thread_state_;
Parker Schuhe4a70d62017-12-27 20:10:20 -0800234 RawQueue *queue_;
Austin Schuh3578a2e2019-05-25 18:17:59 -0700235 int32_t index_;
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700236 bool running_ = false;
Austin Schuh3578a2e2019-05-25 18:17:59 -0700237
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700238 ::std::function<void(const Message *message)> watcher_;
239
240 // Mutex and condition variable used to wait until the thread is started
241 // before going RT.
242 ::aos::Mutex thread_started_mutex_;
243 ::aos::Condition thread_started_condition_{&thread_started_mutex_};
Parker Schuhe4a70d62017-12-27 20:10:20 -0800244};
Neil Balch229001a2018-01-07 18:22:52 -0800245
// Adapter class to adapt a timerfd to a TimerHandler.
// The callback runs from the event loop's epoll dispatch (epoll_.Run()),
// holding thread_state_.mutex_ so it is serialized with watcher callbacks.
class TimerHandlerState : public TimerHandler {
 public:
  TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
      : shm_event_loop_(shm_event_loop), fn_(::std::move(fn)) {
    shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
      // Take the same lock the watcher threads use so only one callback runs
      // at a time.
      MutexLocker locker(&shm_event_loop_->thread_state_.mutex_);
      // Consume the expiration(s) so the fd stops being readable before
      // running the user callback.
      timerfd_.Read();
      fn_();
    });
  }

  // Deregister from epoll before timerfd_ (and its fd) is destroyed.
  ~TimerHandlerState() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }

  // Schedules the timer to fire at base, then repeat every repeat_offset.
  void Setup(monotonic_clock::time_point base,
             monotonic_clock::duration repeat_offset) override {
    timerfd_.SetTime(base, repeat_offset);
  }

  // Cancels any pending or repeating expirations.
  void Disable() override { timerfd_.Disable(); }

 private:
  ShmEventLoop *shm_event_loop_;  // Not owned.

  TimerFd timerfd_;

  // Function to be run on the thread
  ::std::function<void()> fn_;
};
Parker Schuhe4a70d62017-12-27 20:10:20 -0800275} // namespace internal
276
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700277::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
278 const ::std::string &path, const QueueTypeInfo &type) {
279 return ::std::unique_ptr<RawFetcher>(new ShmFetcher(
Parker Schuhe4a70d62017-12-27 20:10:20 -0800280 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
281}
282
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700283::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
284 const ::std::string &path, const QueueTypeInfo &type) {
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800285 Take(path);
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700286 return ::std::unique_ptr<RawSender>(new ShmSender(
Parker Schuhe4a70d62017-12-27 20:10:20 -0800287 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
288}
289
290void ShmEventLoop::MakeRawWatcher(
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700291 const ::std::string &path, const QueueTypeInfo &type,
292 ::std::function<void(const Message *message)> watcher) {
Parker Schuhe4a70d62017-12-27 20:10:20 -0800293 Take(path);
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700294 ::std::unique_ptr<internal::WatcherThreadState> state(
295 new internal::WatcherThreadState(
296 &thread_state_, RawQueue::Fetch(path.c_str(), type.size, type.hash,
297 type.queue_length),
298 std::move(watcher)));
299 watchers_.push_back(::std::move(state));
Parker Schuhe4a70d62017-12-27 20:10:20 -0800300}
301
Neil Balch229001a2018-01-07 18:22:52 -0800302TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700303 ::std::unique_ptr<internal::TimerHandlerState> timer(
304 new internal::TimerHandlerState(this, ::std::move(callback)));
Neil Balch229001a2018-01-07 18:22:52 -0800305
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700306 timers_.push_back(::std::move(timer));
Neil Balch229001a2018-01-07 18:22:52 -0800307
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700308 return timers_.back().get();
Neil Balch229001a2018-01-07 18:22:52 -0800309}
310
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700311void ShmEventLoop::OnRun(::std::function<void()> on_run) {
312 on_run_.push_back(::std::move(on_run));
Parker Schuhe4a70d62017-12-27 20:10:20 -0800313}
314
// Runs the event loop: brings up the watcher threads, goes realtime, runs
// the OnRun handlers, then services timers via epoll until Exit() is called.
// The ordering of the steps below is load-bearing.
void ShmEventLoop::Run() {
  // Start all the watcher threads.
  for (::std::unique_ptr<internal::WatcherThreadState> &watcher : watchers_) {
    watcher->Start();
  }

  // Now, all the threads are up.  Go RT.
  thread_state_.MaybeSetCurrentThreadRealtimePriority();
  set_is_running(true);

  // Now that we are realtime (but before the OnRun handlers run), snap the
  // queue index.
  for (::std::unique_ptr<internal::WatcherThreadState> &watcher : watchers_) {
    watcher->GrabQueueIndex();
  }

  // Now that we are RT, run all the OnRun handlers.
  for (const auto &run : on_run_) {
    run();
  }
  // TODO(austin): We don't need a separate watcher thread if there are only
  // watchers and fetchers.  Could lazily create the epoll loop and pick a
  // victim watcher to run in this thread.
  // Trigger all the threads to start now.
  thread_state_.Start();

  // And start our main event loop which runs all the timers and handles Quit.
  epoll_.Run();

  // Once epoll exits, there is no useful nonrt work left to do.
  set_is_running(false);

  // Signal all the watcher threads to exit.  After this point, no more
  // callbacks will be handled.
  thread_state_.Exit();

  // Nothing time or synchronization critical needs to happen after this point.
  // Drop RT priority.
  ::aos::UnsetCurrentThreadRealtimePriority();

  // The watcher threads get cleaned up in the destructor.
}
357
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700358void ShmEventLoop::ThreadState::Start() {
Parker Schuhe4a70d62017-12-27 20:10:20 -0800359 MutexLocker locker(&mutex_);
360 loop_running_ = true;
361 if (loop_finished_) ::aos::Die("Cannot restart an ShmEventLoop()");
362 loop_running_cond_.Broadcast();
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700363}
364
// Blocks until either Start() or Exit() has been called on this ThreadState.
void ShmEventLoop::ThreadState::WaitForStart() {
  MutexLocker locker(&mutex_);
  while (!(loop_running_ || loop_finished_)) {
    // Timed wait so the predicate is re-checked at least once a second.
    Condition::WaitResult wait_result =
        loop_running_cond_.WaitTimed(chrono::milliseconds(1000));
    if (wait_result == Condition::WaitResult::kOwnerDied) {
      ::aos::Die("ShmEventLoop mutex lock problem.\n");
    }
  }
}
375
Austin Schuh3115a202019-05-27 21:02:14 -0700376void ShmEventLoop::ThreadState::MaybeSetCurrentThreadRealtimePriority() {
377 if (priority_ != -1) {
378 ::aos::SetCurrentThreadRealtimePriority(priority_);
379 }
380}
381
// Asks the epoll loop to quit; epoll_.Run() inside Run() will then return.
void ShmEventLoop::Exit() { epoll_.Quit(); }
Parker Schuhe4a70d62017-12-27 20:10:20 -0800383
// Marks the loop as finished (permanently -- see Start()) and wakes anything
// blocked in WaitForStart().
void ShmEventLoop::ThreadState::Exit() {
  IPCRecursiveMutexLocker locker(&mutex_);
  if (locker.owner_died()) ::aos::Die("Owner died");
  loop_running_ = false;
  loop_finished_ = true;
  loop_running_cond_.Broadcast();
}
391
ShmEventLoop::~ShmEventLoop() {
  // Destroying a running loop would tear state out from under live watcher
  // threads, so die loudly instead.
  if (is_running()) {
    ::aos::Die("ShmEventLoop destroyed while running\n");
  }
}
397
Austin Schuh6b6dfa52019-06-12 20:16:20 -0700398void ShmEventLoop::Take(const ::std::string &path) {
Parker Schuhe4a70d62017-12-27 20:10:20 -0800399 if (is_running()) {
400 ::aos::Die("Cannot add new objects while running.\n");
401 }
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800402
403 const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
404 if (prior != taken_.end()) {
405 ::aos::Die("%s is already being used.", path.c_str());
406 } else {
407 taken_.emplace_back(path);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800408 }
409}
410
411} // namespace aos