Run all timers in the main thread with epoll
This eliminates at least one thread in every use case.
Change-Id: I8772a539c20cd10d80a57f3f53c48a3d8124461a
diff --git a/aos/events/shm-event-loop.cc b/aos/events/shm-event-loop.cc
index ee71b4c..08f9e1f 100644
--- a/aos/events/shm-event-loop.cc
+++ b/aos/events/shm-event-loop.cc
@@ -6,15 +6,19 @@
#include <chrono>
#include <stdexcept>
+#include "aos/events/epoll.h"
#include "aos/init.h"
#include "aos/logging/logging.h"
#include "aos/queue.h"
namespace aos {
-ShmEventLoop::ShmEventLoop() : thread_state_(std::make_shared<ThreadState>()) {}
+ShmEventLoop::ShmEventLoop() {}  // Nothing to allocate: the added code uses thread_state_ as a direct member (see the &thread_state_ uses).
namespace {
+
+namespace chrono = ::std::chrono;
+
class ShmFetcher : public RawFetcher {
public:
explicit ShmFetcher(RawQueue *queue) : queue_(queue) {
@@ -79,13 +83,13 @@
public:
explicit ShmSender(RawQueue *queue) : queue_(queue) {}
- aos::Message *GetMessage() override {
- return reinterpret_cast<aos::Message *>(queue_->GetMessage());
+ ::aos::Message *GetMessage() override {
+ return reinterpret_cast<::aos::Message *>(queue_->GetMessage());
}
- void Free(aos::Message *msg) override { queue_->FreeMessage(msg); }
+ void Free(::aos::Message *msg) override { queue_->FreeMessage(msg); }
- bool Send(aos::Message *msg) override {
+ bool Send(::aos::Message *msg) override {
assert(queue_ != nullptr);
{
// TODO(austin): This lets multiple senders reorder messages since time
@@ -102,18 +106,60 @@
private:
RawQueue *queue_;
};
+
} // namespace
namespace internal {
+
+// Class to manage the state for a Watcher.
class WatcherThreadState {
public:
- WatcherThreadState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
- RawQueue *queue,
- std::function<void(const aos::Message *message)> watcher)
- : thread_state_(std::move(thread_state)),
+ WatcherThreadState(
+ ShmEventLoop::ThreadState *thread_state, RawQueue *queue,
+ ::std::function<void(const ::aos::Message *message)> watcher)
+ : thread_state_(thread_state),  // Stored as a raw, non-owning pointer; MakeRawWatcher passes the event loop's own thread_state_.
queue_(queue),
index_(0),
- watcher_(std::move(watcher)) {
+ watcher_(::std::move(watcher)) {}  // Construction only records state; the thread itself is created later by Start().
+
+  // Joins the watcher thread, if one was started, so that it cannot outlive
+  // the state it reads. running_ is only set by Run(), so a watcher whose
+  // Start() was never called has nothing to join.
+  ~WatcherThreadState() {
+    // Only kill the thread if it is running.
+    if (running_) {
+      // TODO(austin): CHECK that we aren't RT here.
+
+      // Try joining. If we fail, we weren't asleep on the condition in the
+      // queue. So hit it again and again until that's true.
+      struct timespec end_time;
+      PCHECK(clock_gettime(CLOCK_REALTIME, &end_time) == 0);
+      while (true) {
+        void *retval = nullptr;
+        // Push the absolute deadline out by another 100 ms for this attempt.
+        end_time.tv_nsec += 100000000;
+        // Keep tv_nsec strictly below one second. Exactly 1e9 must carry into
+        // tv_sec too; otherwise the timespec is not normalized and the join
+        // may reject it with EINVAL.
+        if (end_time.tv_nsec >= 1000000000L) {
+          end_time.tv_nsec -= 1000000000L;
+          ++end_time.tv_sec;
+        }
+        int ret = pthread_timedjoin_np(pthread_, &retval, &end_time);
+        if (ret == ETIMEDOUT) continue;
+        // NOTE(review): pthread_timedjoin_np reports failure through its
+        // return value rather than errno -- confirm PCHECK prints something
+        // useful for this case.
+        PCHECK(ret == 0);
+        break;
+      }
+    }
+  }
+
+ // Starts the thread (entry point StaticRun) and waits until Run() has set running_.
+ void Start() {
+ PCHECK(pthread_create(&pthread_, nullptr, &StaticRun, this) == 0);  // NOTE(review): pthread_create returns the error number rather than setting errno -- confirm PCHECK reports it usefully.
+ IPCRecursiveMutexLocker locker(&thread_started_mutex_);
+ if (locker.owner_died()) ::aos::Die("Owner died");
+ while (!running_) {  // Re-checked after every wakeup; Run() sets it under the same mutex before broadcasting.
+ CHECK(!thread_started_condition_.Wait());  // A true result from Wait() is treated as fatal.
+ }
+ }
+
+ void GrabQueueIndex() {
+ // Right after we are signaled to start, point index to the current index
+ // so we don't read any messages received before now. Otherwise we will
+ // get a significantly delayed read.
static constexpr Options<RawQueue> kOptions =
RawQueue::kFromEnd | RawQueue::kNonBlock;
const void *msg = queue_->ReadMessageIndex(kOptions, &index_);
@@ -122,10 +168,31 @@
}
}
+ private:
+  // pthread entry point. |arg| is the WatcherThreadState that Start() handed
+  // to pthread_create; forwards to its Run() and returns no result to the
+  // joiner.
+  static void *StaticRun(void *arg) {
+    static_cast<WatcherThreadState *>(arg)->Run();
+    return nullptr;
+  }
+
+ // Runs the watcher callback on new messages.
void Run() {
+ // Signal the main thread that we are now ready.
thread_state_->MaybeSetCurrentThreadRealtimePriority();
+ {
+ IPCRecursiveMutexLocker locker(&thread_started_mutex_);
+ if (locker.owner_died()) ::aos::Die("Owner died");
+ running_ = true;
+ thread_started_condition_.Broadcast();
+ }
+
+ // Wait for the global start before handling events.
thread_state_->WaitForStart();
+ // Bail immediately if we are supposed to stop.
if (!thread_state_->is_running()) {
::aos::UnsetCurrentThreadRealtimePriority();
return;
@@ -133,10 +200,18 @@
const void *msg = nullptr;
while (true) {
- msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index_);
- assert(msg != nullptr);
+ msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index_,
+ chrono::seconds(1));
+ // We hit a timeout. Confirm that we should be running and retry. Note,
+ // is_running is threadsafe (it's an atomic underneath). Worst case, we
+ // check again in a second.
+ if (msg == nullptr) {
+ if (!thread_state_->is_running()) break;
+ continue;
+ }
{
+ // Grab the lock so that only one callback can be called at a time.
MutexLocker locker(&thread_state_->mutex_);
if (!thread_state_->is_running()) break;
@@ -144,142 +219,155 @@
// watcher_ may have exited the event loop.
if (!thread_state_->is_running()) break;
}
+ // Drop the reference.
queue_->FreeMessage(msg);
}
+ // And drop the last reference.
queue_->FreeMessage(msg);
+ // Now that everything is cleaned up, drop RT priority before destroying the
+ // thread.
::aos::UnsetCurrentThreadRealtimePriority();
}
-
- private:
- std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
+ pthread_t pthread_;
+ ShmEventLoop::ThreadState *thread_state_;
RawQueue *queue_;
int32_t index_;
+ bool running_ = false;
- std::function<void(const Message *message)> watcher_;
+ ::std::function<void(const Message *message)> watcher_;
+
+ // Mutex and condition variable used to wait until the thread is started
+ // before going RT.
+ ::aos::Mutex thread_started_mutex_;
+ ::aos::Condition thread_started_condition_{&thread_started_mutex_};
};
+// Adapter class to adapt a timerfd to a TimerHandler. The callback is dispatched through the owning ShmEventLoop's epoll_.
class TimerHandlerState : public TimerHandler {
public:
- TimerHandlerState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
- ::std::function<void()> fn)
- : thread_state_(std::move(thread_state)), fn_(::std::move(fn)) {
- fd_ = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
- PCHECK(fd_ != -1);
+ TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
+ : shm_event_loop_(shm_event_loop), fn_(::std::move(fn)) {
+ shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {  // Presumably invoked from epoll_.Run() when the timerfd is readable -- TODO confirm against EPoll.
+ MutexLocker locker(&shm_event_loop_->thread_state_.mutex_);  // Same mutex the watcher threads hold around their callbacks.
+ timerfd_.Read();  // Presumably consumes the pending expiration so the fd stops polling readable -- TODO confirm.
+ fn_();  // The user's timer callback.
+ });
}
- ~TimerHandlerState() {
- PCHECK(close(fd_) == 0);
- }
+ ~TimerHandlerState() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }  // Unregisters from epoll_ before the timerfd_ member is destroyed.
void Setup(monotonic_clock::time_point base,
monotonic_clock::duration repeat_offset) override {
- struct itimerspec new_value;
- new_value.it_interval = ::aos::time::to_timespec(repeat_offset);
- new_value.it_value = ::aos::time::to_timespec(base);
- PCHECK(timerfd_settime(fd_, TFD_TIMER_ABSTIME, &new_value, nullptr) == 0);
+ timerfd_.SetTime(base, repeat_offset);  // Arming is delegated to TimerFd; presumably base is the first expiration and repeat_offset the interval, as in the removed code.
}
- void Disable() override {
- // Disarm the timer by feeding zero values
- Setup(::aos::monotonic_clock::epoch(), ::aos::monotonic_clock::zero());
- }
-
- void Run() {
- thread_state_->MaybeSetCurrentThreadRealtimePriority();
- thread_state_->WaitForStart();
-
- while (true) {
- uint64_t buf;
- ssize_t result = read(fd_, &buf, sizeof(buf));
- PCHECK(result != -1);
- CHECK_EQ(result, static_cast<int>(sizeof(buf)));
-
- {
- MutexLocker locker(&thread_state_->mutex_);
- if (!thread_state_->is_running()) break;
- fn_();
- // fn_ may have exited the event loop.
- if (!thread_state_->is_running()) break;
- }
- }
- ::aos::UnsetCurrentThreadRealtimePriority();
- }
+ void Disable() override { timerfd_.Disable(); }  // Disarming is delegated to TimerFd.
private:
- std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
+ ShmEventLoop *shm_event_loop_;  // Raw, non-owning pointer; dereferenced in the destructor, so the loop must still be alive then.
- // File descriptor for the timer
- int fd_;
+ TimerFd timerfd_;  // Replaces the hand-managed fd_; supplies fd(), Read(), SetTime() and Disable().
// Function to be run on the thread
::std::function<void()> fn_;
};
} // namespace internal
-std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
- const std::string &path, const QueueTypeInfo &type) {
- return std::unique_ptr<RawFetcher>(new ShmFetcher(
+::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
+ const ::std::string &path, const QueueTypeInfo &type) {  // Wraps the RawQueue fetched for |path| in a ShmFetcher; unlike senders and watchers, no Take(path) call is made.
+ return ::std::unique_ptr<RawFetcher>(new ShmFetcher(
RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
}
-std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
- const std::string &path, const QueueTypeInfo &type) {
+::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
+ const ::std::string &path, const QueueTypeInfo &type) {  // Calls Take(path) first (which dies if the loop is already running), then wraps the fetched RawQueue in a ShmSender.
Take(path);
- return std::unique_ptr<RawSender>(new ShmSender(
+ return ::std::unique_ptr<RawSender>(new ShmSender(
RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
}
void ShmEventLoop::MakeRawWatcher(
- const std::string &path, const QueueTypeInfo &type,
- std::function<void(const Message *message)> watcher) {
+ const ::std::string &path, const QueueTypeInfo &type,
+ ::std::function<void(const Message *message)> watcher) {
Take(path);
- auto *state = new internal::WatcherThreadState(
- thread_state_,
- RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length),
- std::move(watcher));
-
- std::thread thread([state] {
- state->Run();
- delete state;
- });
- thread.detach();
+ // Only build and store the watcher state here; no thread is created until
+ // ShmEventLoop::Run() calls Start() on every entry in watchers_. The state
+ // gets a non-owning pointer to this loop's thread_state_.
+ ::std::unique_ptr<internal::WatcherThreadState> state(
+ new internal::WatcherThreadState(
+ &thread_state_, RawQueue::Fetch(path.c_str(), type.size, type.hash,
+ type.queue_length),
+ ::std::move(watcher)));
+ // watchers_ takes ownership of the state.
+ watchers_.push_back(::std::move(state));
}
TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
- internal::TimerHandlerState *timer =
- new internal::TimerHandlerState(thread_state_, ::std::move(callback));
+ ::std::unique_ptr<internal::TimerHandlerState> timer(
+ new internal::TimerHandlerState(this, ::std::move(callback)));  // The handler registers its timerfd with epoll_ in its constructor.
- ::std::thread t([timer] {
- timer->Run();
- delete timer;
- });
- t.detach();
+ timers_.push_back(::std::move(timer));  // timers_ takes ownership; no per-timer thread is spawned any more.
- return timer;
+ return timers_.back().get();  // Non-owning pointer to the element just stored in timers_.
}
-void ShmEventLoop::OnRun(std::function<void()> on_run) {
- on_run_.push_back(std::move(on_run));
+void ShmEventLoop::OnRun(::std::function<void()> on_run) {  // Queues a handler; Run() iterates on_run_ and calls each one before thread_state_.Start().
+ on_run_.push_back(::std::move(on_run));
}
void ShmEventLoop::Run() {
- thread_state_->MaybeSetCurrentThreadRealtimePriority();
+ // Start all the watcher threads. Start() blocks until each one reports in.
+ for (::std::unique_ptr<internal::WatcherThreadState> &watcher : watchers_) {
+ watcher->Start();
+ }
+
+ // Now, all the threads are up. Go RT (on this thread, if configured).
+ thread_state_.MaybeSetCurrentThreadRealtimePriority();
set_is_running(true);
- for (const auto &run : on_run_) run();
- // TODO(austin): epoll event loop in main thread (if needed), and async safe
- // quit handler.
- thread_state_->Run();
+
+ // Now that we are realtime (but before the OnRun handlers run), snap the
+ // queue index so watchers skip messages that were sent before this point.
+ for (::std::unique_ptr<internal::WatcherThreadState> &watcher : watchers_) {
+ watcher->GrabQueueIndex();
+ }
+
+ // Now that we are RT, run all the OnRun handlers.
+ for (const auto &run : on_run_) {
+ run();
+ }
+ // TODO(austin): We don't need a separate watcher thread if there are only
+ // watchers and fetchers. Could lazily create the epoll loop and pick a
+ // victim watcher to run in this thread.
+ // Release the watcher threads that are blocked in WaitForStart().
+ thread_state_.Start();
+
+ // And start our main event loop which runs all the timers and handles Quit.
+ epoll_.Run();  // Presumably returns only after Exit() has called epoll_.Quit().
+
+ // Once epoll exits, there is no useful nonrt work left to do.
+ set_is_running(false);
+
+ // Signal all the watcher threads to exit. After this point, no more
+ // callbacks will be handled.
+ thread_state_.Exit();
+
+ // Nothing time or synchronization critical needs to happen after this point.
+ // Drop RT priority.
::aos::UnsetCurrentThreadRealtimePriority();
+
+ // The watcher threads are joined later, in ~WatcherThreadState().
}
-void ShmEventLoop::ThreadState::Run() {
+void ShmEventLoop::ThreadState::Start() {  // Sets loop_running_ and wakes the threads blocked in WaitForStart(); unlike the removed Run(), it returns instead of blocking until the loop stops.
MutexLocker locker(&mutex_);
loop_running_ = true;
if (loop_finished_) ::aos::Die("Cannot restart an ShmEventLoop()");
loop_running_cond_.Broadcast();
- while (loop_running_) {
- if (loop_running_cond_.Wait()) {
+}
+
+void ShmEventLoop::ThreadState::WaitForStart() {
+ MutexLocker locker(&mutex_);
+ while (!(loop_running_ || loop_finished_)) {
+ Condition::WaitResult wait_result =
+ loop_running_cond_.WaitTimed(chrono::milliseconds(1000));
+ if (wait_result == Condition::WaitResult::kOwnerDied) {
::aos::Die("ShmEventLoop mutex lock problem.\n");
}
}
@@ -291,19 +379,7 @@
}
}
-void ShmEventLoop::ThreadState::WaitForStart() {
- MutexLocker locker(&mutex_);
- while (!(loop_running_ || loop_finished_)) {
- if (loop_running_cond_.Wait()) {
- ::aos::Die("ShmEventLoop mutex lock problem.\n");
- }
- }
-}
-
-void ShmEventLoop::Exit() {
- set_is_running(false);
- thread_state_->Exit();
-}
+void ShmEventLoop::Exit() { epoll_.Quit(); }  // Only asks epoll_ to quit; Run() then clears is_running and calls thread_state_.Exit() after epoll_.Run() returns.
void ShmEventLoop::ThreadState::Exit() {
IPCRecursiveMutexLocker locker(&mutex_);
@@ -319,7 +395,7 @@
}
}
-void ShmEventLoop::Take(const std::string &path) {
+void ShmEventLoop::Take(const ::std::string &path) {
if (is_running()) {
::aos::Die("Cannot add new objects while running.\n");
}