Run all timers in the main thread with epoll
This ditches 1+ threads in all use cases.
Change-Id: I8772a539c20cd10d80a57f3f53c48a3d8124461a
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 21e9677..7c2db52 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -1,4 +1,14 @@
cc_library(
+ name = "epoll",
+ srcs = ["epoll.cc"],
+ hdrs = ["epoll.h"],
+ deps = [
+ "//aos/logging",
+ "//aos/time",
+ ],
+)
+
+cc_library(
name = "raw-event-loop",
hdrs = ["raw-event-loop.h"],
deps = [
@@ -28,6 +38,7 @@
hdrs = ["shm-event-loop.h"],
visibility = ["//visibility:public"],
deps = [
+ ":epoll",
":event-loop",
"//aos:init",
"//aos:queues",
@@ -38,6 +49,7 @@
cc_test(
name = "shm-event-loop_test",
srcs = ["shm-event-loop_test.cc"],
+ shard_count = 5,
deps = [
":event-loop_param_test",
":shm-event-loop",
diff --git a/aos/events/epoll.cc b/aos/events/epoll.cc
new file mode 100644
index 0000000..83f44c3
--- /dev/null
+++ b/aos/events/epoll.cc
@@ -0,0 +1,118 @@
+#include "aos/events/epoll.h"
+
+#include <fcntl.h>
+#include <sys/epoll.h>
+#include <sys/timerfd.h>
+#include <unistd.h>
+#include <atomic>
+#include <vector>
+
+#include "aos/logging/logging.h"
+#include "aos/time/time.h"
+
+namespace aos {
+namespace internal {
+
+TimerFd::TimerFd()
+ : fd_(timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK)) {
+ PCHECK(fd_ != -1);
+ Disable();
+}
+
+void TimerFd::SetTime(monotonic_clock::time_point start,
+ monotonic_clock::duration interval) {
+ struct itimerspec new_value;
+ new_value.it_interval = ::aos::time::to_timespec(interval);
+ new_value.it_value = ::aos::time::to_timespec(start);
+ PCHECK(timerfd_settime(fd_, TFD_TIMER_ABSTIME, &new_value, nullptr) == 0);
+}
+
+void TimerFd::Read() {
+ uint64_t buf;
+ ssize_t result = read(fd_, &buf, sizeof(buf));
+ PCHECK(result != -1);
+ CHECK_EQ(result, static_cast<int>(sizeof(buf)));
+}
+
+EPoll::EPoll() : epoll_fd_(epoll_create1(EPOLL_CLOEXEC)) {
+ PCHECK(epoll_fd_ > 0);
+
+ // Create a pipe for the Quit function. We want to use a pipe to be async
+ // safe so this can be called from signal handlers.
+ int pipefd[2];
+ PCHECK(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK) == 0);
+ quit_epoll_fd_ = pipefd[0];
+ quit_signal_fd_ = pipefd[1];
+ // Read the fd when data is sent and set run_ to false.
+ OnReadable(quit_epoll_fd_, [this]() {
+ run_ = false;
+ char buf[1];
+ PCHECK(read(quit_epoll_fd_, &buf[0], 1) == 1);
+ });
+}
+
+EPoll::~EPoll() {
+ // Clean up the quit pipe and epoll fd.
+ DeleteFd(quit_epoll_fd_);
+ close(quit_signal_fd_);
+ close(quit_epoll_fd_);
+ CHECK_EQ(fns_.size(), 0u);
+ close(epoll_fd_);
+}
+
+void EPoll::Run() {
+ while (true) {
+ // Pull a single event out. Infinite timeout if we are supposed to be
+ // running, and 0 length timeout otherwise. This lets us flush the event
+ // queue before quitting.
+ struct epoll_event event;
+ int num_events = epoll_wait(epoll_fd_, &event, 1, run_ ? -1 : 0);
+ // Retry on EINTR and nothing else.
+ if (num_events == -1) {
+ if (errno == EINTR) {
+ continue;
+ }
+ PCHECK(num_events != -1);
+ }
+ if (!run_) {
+ // If we ran out of events, quit.
+ if (num_events == 0) {
+ return;
+ }
+ }
+ EventData *event_data = static_cast<struct EventData *>(event.data.ptr);
+ if (event.events & (EPOLLIN | EPOLLPRI)) {
+ event_data->in_fn();
+ }
+ }
+}
+
+void EPoll::Quit() { PCHECK(write(quit_signal_fd_, "q", 1) == 1); }
+
+void EPoll::OnReadable(int fd, ::std::function<void()> function) {
+ ::std::unique_ptr<EventData> event_data(
+ new EventData{fd, ::std::move(function)});
+
+ struct epoll_event event;
+ event.events = EPOLLIN | EPOLLPRI;
+ event.data.ptr = event_data.get();
+ PCHECK(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event) != 0);
+ fns_.push_back(::std::move(event_data));
+}
+
+// Removes fd from the event loop.
+void EPoll::DeleteFd(int fd) {
+ auto element = fns_.begin();
+ PCHECK(epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr) != 0);
+ while (fns_.end() != element) {
+ if (element->get()->fd == fd) {
+ fns_.erase(element);
+ return;
+ }
+ ++element;
+ }
+ LOG(FATAL, "fd %d not found\n", fd);
+}
+
+} // namespace internal
+} // namespace aos
diff --git a/aos/events/epoll.h b/aos/events/epoll.h
new file mode 100644
index 0000000..29311c9
--- /dev/null
+++ b/aos/events/epoll.h
@@ -0,0 +1,95 @@
+#ifndef AOS_EVENTS_EPOLL_H_
+#define AOS_EVENTS_EPOLL_H_
+
+#include <fcntl.h>
+#include <sys/epoll.h>
+#include <sys/timerfd.h>
+#include <unistd.h>
+#include <atomic>
+#include <vector>
+
+#include "aos/logging/logging.h"
+#include "aos/time/time.h"
+
+namespace aos {
+namespace internal {
+
+// Class wrapping up timerfd.
+class TimerFd {
+ public:
+ TimerFd();
+ ~TimerFd() { PCHECK(close(fd_) == 0); }
+
+ TimerFd(const TimerFd &) = delete;
+ TimerFd &operator=(const TimerFd &) = delete;
+ TimerFd(TimerFd &&) = delete;
+ TimerFd &operator=(TimerFd &&) = delete;
+
+ // Sets the trigger time and repeat for the timerfd.
+ // An interval of 0 results in a single expiration.
+ void SetTime(monotonic_clock::time_point start,
+ monotonic_clock::duration interval);
+
+ // Disarms the timer.
+ void Disable() {
+ // Disarm the timer by feeding zero values
+ SetTime(::aos::monotonic_clock::epoch(), ::aos::monotonic_clock::zero());
+ }
+
+ // Reads the event. Ignores it.
+ void Read();
+
+ // Returns the file descriptor associated with the timerfd.
+ int fd() { return fd_; }
+
+ private:
+ int fd_ = -1;
+};
+
+// Class to wrap epoll and call a callback when an event happens.
+class EPoll {
+ public:
+ EPoll();
+ ~EPoll();
+ EPoll(const EPoll &) = delete;
+ EPoll &operator=(const EPoll &) = delete;
+ EPoll(EPoll &&) = delete;
+ EPoll &operator=(EPoll &&) = delete;
+
+ // Runs until Quit() is called.
+ void Run();
+
+ // Quits. Async safe.
+ void Quit();
+
+ // Registers a function to be called if the fd becomes readable.
+ // There should only be 1 function registered for each fd.
+ void OnReadable(int fd, ::std::function<void()> function);
+
+ // Removes fd from the event loop.
+ // All Fds must be cleaned up before this class is destroyed.
+ void DeleteFd(int fd);
+
+ private:
+ ::std::atomic<bool> run_{true};
+
+ // Main epoll fd.
+ int epoll_fd_;
+
+ // Structure whose pointer should be returned by epoll. Makes looking up the
+ // function fast and easy.
+ struct EventData {
+ int fd;
+ ::std::function<void()> in_fn;
+ };
+ ::std::vector<::std::unique_ptr<EventData>> fns_;
+
+ // Pipe pair for handling quit.
+ int quit_signal_fd_;
+ int quit_epoll_fd_;
+};
+
+} // namespace internal
+} // namespace aos
+
+#endif // AOS_EVENTS_EPOLL_H_
diff --git a/aos/events/event-loop.h b/aos/events/event-loop.h
index 79651ef..1435723 100644
--- a/aos/events/event-loop.h
+++ b/aos/events/event-loop.h
@@ -29,8 +29,8 @@
private:
friend class EventLoop;
- Fetcher(std::unique_ptr<RawFetcher> fetcher) : fetcher_(std::move(fetcher)) {}
- std::unique_ptr<RawFetcher> fetcher_;
+ Fetcher(::std::unique_ptr<RawFetcher> fetcher) : fetcher_(::std::move(fetcher)) {}
+ ::std::unique_ptr<RawFetcher> fetcher_;
};
// Sends messages to a queue.
@@ -85,8 +85,8 @@
private:
friend class EventLoop;
- Sender(std::unique_ptr<RawSender> sender) : sender_(std::move(sender)) {}
- std::unique_ptr<RawSender> sender_;
+ Sender(::std::unique_ptr<RawSender> sender) : sender_(::std::move(sender)) {}
+ ::std::unique_ptr<RawSender> sender_;
};
// TODO(parker): Consider making EventLoop wrap a RawEventLoop rather than
@@ -105,14 +105,14 @@
// Makes a class that will always fetch the most recent value
// sent to path.
template <typename T>
- Fetcher<T> MakeFetcher(const std::string &path) {
+ Fetcher<T> MakeFetcher(const ::std::string &path) {
return Fetcher<T>(MakeRawFetcher(path, QueueTypeInfo::Get<T>()));
}
// Makes class that allows constructing and sending messages to
// address path.
template <typename T>
- Sender<T> MakeSender(const std::string &path) {
+ Sender<T> MakeSender(const ::std::string &path) {
return Sender<T>(MakeRawSender(path, QueueTypeInfo::Get<T>()));
}
@@ -123,11 +123,11 @@
// Note that T needs to match both send and recv side.
// TODO(parker): Need to support ::std::bind. For now, use lambdas.
template <typename Watch>
- void MakeWatcher(const std::string &path, Watch &&w);
+ void MakeWatcher(const ::std::string &path, Watch &&w);
// The passed in function will be called when the event loop starts.
// Use this to run code once the thread goes into "real-time-mode",
- virtual void OnRun(std::function<void()>) = 0;
+ virtual void OnRun(::std::function<void()>) = 0;
// TODO(austin): OnExit
diff --git a/aos/events/event-loop_param_test.cc b/aos/events/event-loop_param_test.cc
index 5276aec..c74538c 100644
--- a/aos/events/event-loop_param_test.cc
+++ b/aos/events/event-loop_param_test.cc
@@ -26,10 +26,38 @@
::std::chrono::milliseconds(duration));
}
-// Tests that watcher and fetcher can fetch from a sender.
+// Tests that watcher can receive messages from a sender.
// Also tests that OnRun() works.
TEST_P(AbstractEventLoopTest, Basic) {
auto loop1 = Make();
+ auto loop2 = MakePrimary();
+
+ auto sender = loop1->MakeSender<TestMessage>("/test");
+
+ bool happened = false;
+
+ loop2->OnRun([&]() {
+ happened = true;
+
+ auto msg = sender.MakeMessage();
+ msg->msg_value = 200;
+ msg.Send();
+ });
+
+ loop2->MakeWatcher("/test", [&](const TestMessage &message) {
+ EXPECT_EQ(message.msg_value, 200);
+ loop2->Exit();
+ });
+
+ EXPECT_FALSE(happened);
+ Run();
+ EXPECT_TRUE(happened);
+}
+
+// Tests that a fetcher can fetch from a sender.
+// Also tests that OnRun() works.
+TEST_P(AbstractEventLoopTest, FetchWithoutRun) {
+ auto loop1 = Make();
auto loop2 = Make();
auto loop3 = MakePrimary();
@@ -39,15 +67,6 @@
EXPECT_FALSE(fetcher.Fetch());
- bool happened = false;
-
- loop3->OnRun([&]() { happened = true; });
-
- loop3->MakeWatcher("/test", [&](const TestMessage &message) {
- EXPECT_EQ(message.msg_value, 200);
- loop3->Exit();
- });
-
auto msg = sender.MakeMessage();
msg->msg_value = 200;
msg.Send();
@@ -55,10 +74,6 @@
EXPECT_TRUE(fetcher.Fetch());
ASSERT_FALSE(fetcher.get() == nullptr);
EXPECT_EQ(fetcher->msg_value, 200);
-
- EXPECT_FALSE(happened);
- Run();
- EXPECT_TRUE(happened);
}
// Tests that watcher will receive all messages sent if they are sent after
@@ -78,16 +93,25 @@
}
});
+ // Before Run, should be ignored.
{
auto msg = sender.MakeMessage();
- msg->msg_value = 200;
+ msg->msg_value = 199;
msg.Send();
}
- {
- auto msg = sender.MakeMessage();
- msg->msg_value = 201;
- msg.Send();
- }
+
+ loop2->OnRun([&]() {
+ {
+ auto msg = sender.MakeMessage();
+ msg->msg_value = 200;
+ msg.Send();
+ }
+ {
+ auto msg = sender.MakeMessage();
+ msg->msg_value = 201;
+ msg.Send();
+ }
+ });
Run();
@@ -361,7 +385,7 @@
}
// Verify that registering a watcher twice for "/test" fails.
-TEST_P(AbstractEventLoopTest, TwoWatcher) {
+TEST_P(AbstractEventLoopDeathTest, TwoWatcher) {
auto loop = Make();
loop->MakeWatcher("/test", [&](const TestMessage &) {});
EXPECT_DEATH(loop->MakeWatcher("/test", [&](const TestMessage &) {}),
@@ -369,7 +393,7 @@
}
// Verify that SetRuntimeRealtimePriority fails while running.
-TEST_P(AbstractEventLoopTest, SetRuntimeRealtimePriority) {
+TEST_P(AbstractEventLoopDeathTest, SetRuntimeRealtimePriority) {
auto loop = MakePrimary();
// Confirm that runtime priority calls work when not realtime.
loop->SetRuntimeRealtimePriority(5);
@@ -380,13 +404,33 @@
}
// Verify that registering a watcher and a sender for "/test" fails.
-TEST_P(AbstractEventLoopTest, WatcherAndSender) {
+TEST_P(AbstractEventLoopDeathTest, WatcherAndSender) {
auto loop = Make();
auto sender = loop->MakeSender<TestMessage>("/test");
EXPECT_DEATH(loop->MakeWatcher("/test", [&](const TestMessage &) {}),
"/test");
}
+// Verify that we can't create a sender inside OnRun.
+TEST_P(AbstractEventLoopDeathTest, SenderInOnRun) {
+ auto loop1 = MakePrimary();
+
+ loop1->OnRun(
+ [&]() { auto sender = loop1->MakeSender<TestMessage>("/test2"); });
+
+ EXPECT_DEATH(Run(), "running");
+}
+
+// Verify that we can't create a watcher inside OnRun.
+TEST_P(AbstractEventLoopDeathTest, WatcherInOnRun) {
+ auto loop1 = MakePrimary();
+
+ loop1->OnRun(
+ [&]() { loop1->MakeWatcher("/test", [&](const TestMessage &) {}); });
+
+ EXPECT_DEATH(Run(), "running");
+}
+
// Verify that Quit() works when there are multiple watchers.
TEST_P(AbstractEventLoopTest, MultipleWatcherQuit) {
auto loop1 = Make();
@@ -399,11 +443,12 @@
});
auto sender = loop1->MakeSender<TestMessage>("/test2");
- {
+
+ loop2->OnRun([&]() {
auto msg = sender.MakeMessage();
msg->msg_value = 200;
msg.Send();
- }
+ });
Run();
}
diff --git a/aos/events/event-loop_param_test.h b/aos/events/event-loop_param_test.h
index b5574ba..1d2060a 100644
--- a/aos/events/event-loop_param_test.h
+++ b/aos/events/event-loop_param_test.h
@@ -23,10 +23,10 @@
virtual void Run() = 0;
};
-class AbstractEventLoopTest
+class AbstractEventLoopTestBase
: public ::testing::TestWithParam<std::function<EventLoopTestFactory *()>> {
public:
- AbstractEventLoopTest() { factory_.reset(GetParam()()); }
+ AbstractEventLoopTestBase() { factory_.reset(GetParam()()); }
::std::unique_ptr<EventLoop> Make() { return factory_->Make(); }
::std::unique_ptr<EventLoop> MakePrimary() { return factory_->MakePrimary(); }
@@ -39,6 +39,9 @@
::std::unique_ptr<EventLoopTestFactory> factory_;
};
+typedef AbstractEventLoopTestBase AbstractEventLoopDeathTest;
+typedef AbstractEventLoopTestBase AbstractEventLoopTest;
+
} // namespace testing
} // namespace aos
diff --git a/aos/events/shm-event-loop.cc b/aos/events/shm-event-loop.cc
index ee71b4c..08f9e1f 100644
--- a/aos/events/shm-event-loop.cc
+++ b/aos/events/shm-event-loop.cc
@@ -6,15 +6,19 @@
#include <chrono>
#include <stdexcept>
+#include "aos/events/epoll.h"
#include "aos/init.h"
#include "aos/logging/logging.h"
#include "aos/queue.h"
namespace aos {
-ShmEventLoop::ShmEventLoop() : thread_state_(std::make_shared<ThreadState>()) {}
+ShmEventLoop::ShmEventLoop() {}
namespace {
+
+namespace chrono = ::std::chrono;
+
class ShmFetcher : public RawFetcher {
public:
explicit ShmFetcher(RawQueue *queue) : queue_(queue) {
@@ -79,13 +83,13 @@
public:
explicit ShmSender(RawQueue *queue) : queue_(queue) {}
- aos::Message *GetMessage() override {
- return reinterpret_cast<aos::Message *>(queue_->GetMessage());
+ ::aos::Message *GetMessage() override {
+ return reinterpret_cast<::aos::Message *>(queue_->GetMessage());
}
- void Free(aos::Message *msg) override { queue_->FreeMessage(msg); }
+ void Free(::aos::Message *msg) override { queue_->FreeMessage(msg); }
- bool Send(aos::Message *msg) override {
+ bool Send(::aos::Message *msg) override {
assert(queue_ != nullptr);
{
// TODO(austin): This lets multiple senders reorder messages since time
@@ -102,18 +106,60 @@
private:
RawQueue *queue_;
};
+
} // namespace
namespace internal {
+
+// Class to manage the state for a Watcher.
class WatcherThreadState {
public:
- WatcherThreadState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
- RawQueue *queue,
- std::function<void(const aos::Message *message)> watcher)
- : thread_state_(std::move(thread_state)),
+ WatcherThreadState(
+ ShmEventLoop::ThreadState *thread_state, RawQueue *queue,
+ ::std::function<void(const ::aos::Message *message)> watcher)
+ : thread_state_(thread_state),
queue_(queue),
index_(0),
- watcher_(std::move(watcher)) {
+ watcher_(::std::move(watcher)) {}
+
+ ~WatcherThreadState() {
+ // Only kill the thread if it is running.
+ if (running_) {
+ // TODO(austin): CHECK that we aren't RT here.
+
+ // Try joining. If we fail, we weren't asleep on the condition in the
+ // queue. So hit it again and again until that's true.
+ struct timespec end_time;
+ PCHECK(clock_gettime(CLOCK_REALTIME, &end_time) == 0);
+ while (true) {
+ void *retval = nullptr;
+ end_time.tv_nsec += 100000000;
+ if (end_time.tv_nsec > 1000000000L) {
+ end_time.tv_nsec -= 1000000000L;
+ ++end_time.tv_sec;
+ }
+ int ret = pthread_timedjoin_np(pthread_, &retval, &end_time);
+ if (ret == ETIMEDOUT) continue;
+ PCHECK(ret == 0);
+ break;
+ }
+ }
+ }
+
+ // Starts the thread and waits until it is running.
+ void Start() {
+ PCHECK(pthread_create(&pthread_, nullptr, &StaticRun, this) == 0);
+ IPCRecursiveMutexLocker locker(&thread_started_mutex_);
+ if (locker.owner_died()) ::aos::Die("Owner died");
+ while (!running_) {
+ CHECK(!thread_started_condition_.Wait());
+ }
+ }
+
+ void GrabQueueIndex() {
+ // Right after we are signaled to start, point index to the current index
+ // so we don't read any messages received before now. Otherwise we will
+ // get a significantly delayed read.
static constexpr Options<RawQueue> kOptions =
RawQueue::kFromEnd | RawQueue::kNonBlock;
const void *msg = queue_->ReadMessageIndex(kOptions, &index_);
@@ -122,10 +168,31 @@
}
}
+ private:
+ // Runs Run given a WatcherThreadState as the argument. This is an adapter
+ // between pthreads and Run.
+ static void *StaticRun(void *arg) {
+ WatcherThreadState *watcher_thread_state =
+ reinterpret_cast<WatcherThreadState *>(arg);
+ watcher_thread_state->Run();
+ return nullptr;
+ }
+
+ // Runs the watcher callback on new messages.
void Run() {
+ // Signal the main thread that we are now ready.
thread_state_->MaybeSetCurrentThreadRealtimePriority();
+ {
+ IPCRecursiveMutexLocker locker(&thread_started_mutex_);
+ if (locker.owner_died()) ::aos::Die("Owner died");
+ running_ = true;
+ thread_started_condition_.Broadcast();
+ }
+
+ // Wait for the global start before handling events.
thread_state_->WaitForStart();
+ // Bail immediately if we are supposed to stop.
if (!thread_state_->is_running()) {
::aos::UnsetCurrentThreadRealtimePriority();
return;
@@ -133,10 +200,18 @@
const void *msg = nullptr;
while (true) {
- msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index_);
- assert(msg != nullptr);
+ msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index_,
+ chrono::seconds(1));
+ // We hit a timeout. Confirm that we should be running and retry. Note,
+ // is_running is threadsafe (it's an atomic underneath). Worst case, we
+ // check again in a second.
+ if (msg == nullptr) {
+ if (!thread_state_->is_running()) break;
+ continue;
+ }
{
+ // Grab the lock so that only one callback can be called at a time.
MutexLocker locker(&thread_state_->mutex_);
if (!thread_state_->is_running()) break;
@@ -144,142 +219,155 @@
// watcher_ may have exited the event loop.
if (!thread_state_->is_running()) break;
}
+ // Drop the reference.
queue_->FreeMessage(msg);
}
+ // And drop the last reference.
queue_->FreeMessage(msg);
+ // Now that everything is cleaned up, drop RT priority before destroying the
+ // thread.
::aos::UnsetCurrentThreadRealtimePriority();
}
-
- private:
- std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
+ pthread_t pthread_;
+ ShmEventLoop::ThreadState *thread_state_;
RawQueue *queue_;
int32_t index_;
+ bool running_ = false;
- std::function<void(const Message *message)> watcher_;
+ ::std::function<void(const Message *message)> watcher_;
+
+ // Mutex and condition variable used to wait until the thread is started
+ // before going RT.
+ ::aos::Mutex thread_started_mutex_;
+ ::aos::Condition thread_started_condition_{&thread_started_mutex_};
};
+// Adapter class to adapt a timerfd to a TimerHandler.
class TimerHandlerState : public TimerHandler {
public:
- TimerHandlerState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
- ::std::function<void()> fn)
- : thread_state_(std::move(thread_state)), fn_(::std::move(fn)) {
- fd_ = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
- PCHECK(fd_ != -1);
+ TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
+ : shm_event_loop_(shm_event_loop), fn_(::std::move(fn)) {
+ shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
+ MutexLocker locker(&shm_event_loop_->thread_state_.mutex_);
+ timerfd_.Read();
+ fn_();
+ });
}
- ~TimerHandlerState() {
- PCHECK(close(fd_) == 0);
- }
+ ~TimerHandlerState() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
void Setup(monotonic_clock::time_point base,
monotonic_clock::duration repeat_offset) override {
- struct itimerspec new_value;
- new_value.it_interval = ::aos::time::to_timespec(repeat_offset);
- new_value.it_value = ::aos::time::to_timespec(base);
- PCHECK(timerfd_settime(fd_, TFD_TIMER_ABSTIME, &new_value, nullptr) == 0);
+ timerfd_.SetTime(base, repeat_offset);
}
- void Disable() override {
- // Disarm the timer by feeding zero values
- Setup(::aos::monotonic_clock::epoch(), ::aos::monotonic_clock::zero());
- }
-
- void Run() {
- thread_state_->MaybeSetCurrentThreadRealtimePriority();
- thread_state_->WaitForStart();
-
- while (true) {
- uint64_t buf;
- ssize_t result = read(fd_, &buf, sizeof(buf));
- PCHECK(result != -1);
- CHECK_EQ(result, static_cast<int>(sizeof(buf)));
-
- {
- MutexLocker locker(&thread_state_->mutex_);
- if (!thread_state_->is_running()) break;
- fn_();
- // fn_ may have exited the event loop.
- if (!thread_state_->is_running()) break;
- }
- }
- ::aos::UnsetCurrentThreadRealtimePriority();
- }
+ void Disable() override { timerfd_.Disable(); }
private:
- std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
+ ShmEventLoop *shm_event_loop_;
- // File descriptor for the timer
- int fd_;
+ TimerFd timerfd_;
// Function to be run on the thread
::std::function<void()> fn_;
};
} // namespace internal
-std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
- const std::string &path, const QueueTypeInfo &type) {
- return std::unique_ptr<RawFetcher>(new ShmFetcher(
+::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
+ const ::std::string &path, const QueueTypeInfo &type) {
+ return ::std::unique_ptr<RawFetcher>(new ShmFetcher(
RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
}
-std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
- const std::string &path, const QueueTypeInfo &type) {
+::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
+ const ::std::string &path, const QueueTypeInfo &type) {
Take(path);
- return std::unique_ptr<RawSender>(new ShmSender(
+ return ::std::unique_ptr<RawSender>(new ShmSender(
RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
}
void ShmEventLoop::MakeRawWatcher(
- const std::string &path, const QueueTypeInfo &type,
- std::function<void(const Message *message)> watcher) {
+ const ::std::string &path, const QueueTypeInfo &type,
+ ::std::function<void(const Message *message)> watcher) {
Take(path);
- auto *state = new internal::WatcherThreadState(
- thread_state_,
- RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length),
- std::move(watcher));
-
- std::thread thread([state] {
- state->Run();
- delete state;
- });
- thread.detach();
+ ::std::unique_ptr<internal::WatcherThreadState> state(
+ new internal::WatcherThreadState(
+ &thread_state_, RawQueue::Fetch(path.c_str(), type.size, type.hash,
+ type.queue_length),
+ std::move(watcher)));
+ watchers_.push_back(::std::move(state));
}
TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
- internal::TimerHandlerState *timer =
- new internal::TimerHandlerState(thread_state_, ::std::move(callback));
+ ::std::unique_ptr<internal::TimerHandlerState> timer(
+ new internal::TimerHandlerState(this, ::std::move(callback)));
- ::std::thread t([timer] {
- timer->Run();
- delete timer;
- });
- t.detach();
+ timers_.push_back(::std::move(timer));
- return timer;
+ return timers_.back().get();
}
-void ShmEventLoop::OnRun(std::function<void()> on_run) {
- on_run_.push_back(std::move(on_run));
+void ShmEventLoop::OnRun(::std::function<void()> on_run) {
+ on_run_.push_back(::std::move(on_run));
}
void ShmEventLoop::Run() {
- thread_state_->MaybeSetCurrentThreadRealtimePriority();
+ // Start all the watcher threads.
+ for (::std::unique_ptr<internal::WatcherThreadState> &watcher : watchers_) {
+ watcher->Start();
+ }
+
+ // Now, all the threads are up. Go RT.
+ thread_state_.MaybeSetCurrentThreadRealtimePriority();
set_is_running(true);
- for (const auto &run : on_run_) run();
- // TODO(austin): epoll event loop in main thread (if needed), and async safe
- // quit handler.
- thread_state_->Run();
+
+ // Now that we are realtime (but before the OnRun handlers run), snap the
+ // queue index.
+ for (::std::unique_ptr<internal::WatcherThreadState> &watcher : watchers_) {
+ watcher->GrabQueueIndex();
+ }
+
+ // Now that we are RT, run all the OnRun handlers.
+ for (const auto &run : on_run_) {
+ run();
+ }
+ // TODO(austin): We don't need a separate watcher thread if there are only
+ // watchers and fetchers. Could lazily create the epoll loop and pick a
+ // victim watcher to run in this thread.
+ // Trigger all the threads to start now.
+ thread_state_.Start();
+
+ // And start our main event loop which runs all the timers and handles Quit.
+ epoll_.Run();
+
+ // Once epoll exits, there is no useful nonrt work left to do.
+ set_is_running(false);
+
+ // Signal all the watcher threads to exit. After this point, no more
+ // callbacks will be handled.
+ thread_state_.Exit();
+
+ // Nothing time or synchronization critical needs to happen after this point.
+ // Drop RT priority.
::aos::UnsetCurrentThreadRealtimePriority();
+
+ // The watcher threads get cleaned up in the destructor.
}
-void ShmEventLoop::ThreadState::Run() {
+void ShmEventLoop::ThreadState::Start() {
MutexLocker locker(&mutex_);
loop_running_ = true;
if (loop_finished_) ::aos::Die("Cannot restart an ShmEventLoop()");
loop_running_cond_.Broadcast();
- while (loop_running_) {
- if (loop_running_cond_.Wait()) {
+}
+
+void ShmEventLoop::ThreadState::WaitForStart() {
+ MutexLocker locker(&mutex_);
+ while (!(loop_running_ || loop_finished_)) {
+ Condition::WaitResult wait_result =
+ loop_running_cond_.WaitTimed(chrono::milliseconds(1000));
+ if (wait_result == Condition::WaitResult::kOwnerDied) {
::aos::Die("ShmEventLoop mutex lock problem.\n");
}
}
@@ -291,19 +379,7 @@
}
}
-void ShmEventLoop::ThreadState::WaitForStart() {
- MutexLocker locker(&mutex_);
- while (!(loop_running_ || loop_finished_)) {
- if (loop_running_cond_.Wait()) {
- ::aos::Die("ShmEventLoop mutex lock problem.\n");
- }
- }
-}
-
-void ShmEventLoop::Exit() {
- set_is_running(false);
- thread_state_->Exit();
-}
+void ShmEventLoop::Exit() { epoll_.Quit(); }
void ShmEventLoop::ThreadState::Exit() {
IPCRecursiveMutexLocker locker(&mutex_);
@@ -319,7 +395,7 @@
}
}
-void ShmEventLoop::Take(const std::string &path) {
+void ShmEventLoop::Take(const ::std::string &path) {
if (is_running()) {
::aos::Die("Cannot add new objects while running.\n");
}
diff --git a/aos/events/shm-event-loop.h b/aos/events/shm-event-loop.h
index 3d231ba..79101de 100644
--- a/aos/events/shm-event-loop.h
+++ b/aos/events/shm-event-loop.h
@@ -3,9 +3,11 @@
#include <unordered_set>
#include <vector>
+
#include "aos/condition.h"
-#include "aos/mutex/mutex.h"
+#include "aos/events/epoll.h"
#include "aos/events/event-loop.h"
+#include "aos/mutex/mutex.h"
namespace aos {
namespace internal {
@@ -30,26 +32,28 @@
return ::aos::monotonic_clock::now();
}
- std::unique_ptr<RawSender> MakeRawSender(const std::string &path,
- const QueueTypeInfo &type) override;
- std::unique_ptr<RawFetcher> MakeRawFetcher(
- const std::string &path, const QueueTypeInfo &type) override;
+ ::std::unique_ptr<RawSender> MakeRawSender(
+ const ::std::string &path, const QueueTypeInfo &type) override;
+ ::std::unique_ptr<RawFetcher> MakeRawFetcher(
+ const ::std::string &path, const QueueTypeInfo &type) override;
void MakeRawWatcher(
- const std::string &path, const QueueTypeInfo &type,
- std::function<void(const aos::Message *message)> watcher) override;
+ const ::std::string &path, const QueueTypeInfo &type,
+ ::std::function<void(const aos::Message *message)> watcher) override;
TimerHandler *AddTimer(::std::function<void()> callback) override;
- void OnRun(std::function<void()> on_run) override;
+ void OnRun(::std::function<void()> on_run) override;
void Run() override;
void Exit() override;
+ // TODO(austin): Add a function to register control-C call.
+
void SetRuntimeRealtimePriority(int priority) override {
if (is_running()) {
::aos::Die("Cannot set realtime priority while running.");
}
- thread_state_->priority_ = priority;
+ thread_state_.priority_ = priority;
}
private:
@@ -65,7 +69,7 @@
bool is_running() { return loop_running_; }
- void Run();
+ void Start();
void Exit();
@@ -77,23 +81,26 @@
friend class ShmEventLoop;
// This mutex ensures that only one watch event happens at a time.
- aos::Mutex mutex_;
+ ::aos::Mutex mutex_;
// Block on this until the loop starts.
- aos::Condition loop_running_cond_{&mutex_};
+ ::aos::Condition loop_running_cond_{&mutex_};
// Used to notify watchers that the loop is done.
- std::atomic<bool> loop_running_{false};
+ ::std::atomic<bool> loop_running_{false};
bool loop_finished_ = false;
int priority_ = -1;
};
- // Exclude multiple of the same type for path.
+ // Tracks that we can't have multiple watchers or a sender and a watcher (or
+ // multiple senders) on a single queue (path).
+ void Take(const ::std::string &path);
- std::vector<std::function<void()>> on_run_;
- std::shared_ptr<ThreadState> thread_state_;
+ ::std::vector<::std::function<void()>> on_run_;
+ ThreadState thread_state_;
+ ::std::vector<::std::string> taken_;
+ internal::EPoll epoll_;
- void Take(const std::string &path);
-
- std::vector<::std::string> taken_;
+ ::std::vector<::std::unique_ptr<internal::TimerHandlerState>> timers_;
+ ::std::vector<::std::unique_ptr<internal::WatcherThreadState>> watchers_;
};
} // namespace aos
diff --git a/aos/events/shm-event-loop_test.cc b/aos/events/shm-event-loop_test.cc
index efa8545..61adbe3 100644
--- a/aos/events/shm-event-loop_test.cc
+++ b/aos/events/shm-event-loop_test.cc
@@ -35,6 +35,11 @@
return new ShmEventLoopTestFactory();
}));
+INSTANTIATE_TEST_CASE_P(ShmEventLoopDeathTest, AbstractEventLoopDeathTest,
+ ::testing::Values([]() {
+ return new ShmEventLoopTestFactory();
+ }));
+
struct TestMessage : public ::aos::Message {
enum { kQueueLength = 100, kHash = 0x696c0cdc };
int msg_value;
diff --git a/aos/events/simulated-event-loop.cc b/aos/events/simulated-event-loop.cc
index 3836283..04e4628 100644
--- a/aos/events/simulated-event-loop.cc
+++ b/aos/events/simulated-event-loop.cc
@@ -313,9 +313,11 @@
void SimulatedQueue::Send(RefCountedBuffer message) {
latest_message_ = message;
- for (auto &watcher : watchers_) {
- scheduler_->Schedule(scheduler_->monotonic_now(),
- [watcher, message]() { watcher(message.get()); });
+ if (scheduler_->is_running()) {
+ for (auto &watcher : watchers_) {
+ scheduler_->Schedule(scheduler_->monotonic_now(),
+ [watcher, message]() { watcher(message.get()); });
+ }
}
for (auto &fetcher : fetchers_) {
fetcher->Enqueue(message);
diff --git a/aos/events/simulated-event-loop.h b/aos/events/simulated-event-loop.h
index 6ee7d92..81b181f 100644
--- a/aos/events/simulated-event-loop.h
+++ b/aos/events/simulated-event-loop.h
@@ -120,6 +120,8 @@
is_running_ = false;
}
+ bool is_running() const { return is_running_; }
+
void AddRawEventLoop(RawEventLoop *event_loop) {
raw_event_loops_.push_back(event_loop);
}
diff --git a/aos/events/simulated-event-loop_test.cc b/aos/events/simulated-event-loop_test.cc
index 41d4dde..bcc6f28 100644
--- a/aos/events/simulated-event-loop_test.cc
+++ b/aos/events/simulated-event-loop_test.cc
@@ -22,6 +22,11 @@
SimulatedEventLoopFactory event_loop_factory_;
};
+INSTANTIATE_TEST_CASE_P(SimulatedEventLoopDeathTest, AbstractEventLoopDeathTest,
+ ::testing::Values([]() {
+ return new SimulatedEventLoopTestFactory();
+ }));
+
INSTANTIATE_TEST_CASE_P(SimulatedEventLoopTest, AbstractEventLoopTest,
::testing::Values([]() {
return new SimulatedEventLoopTestFactory();