Run watchers even if a timer is firing constantly
Also make sure to drain the signalfd each time through.
There can only be so many outstanding signals on a system, and this
bounds that much better. It also handles events which take too long to
handle and other events are ready to be handled.
Change-Id: I24ea65356c7316818ce70f34a84174426b85d329
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 39ce8db..61a4149 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -19,7 +19,6 @@
#include "aos/events/timing_statistics.h"
#include "aos/init.h"
#include "aos/ipc_lib/lockless_queue.h"
-#include "aos/ipc_lib/signalfd.h"
#include "aos/realtime.h"
#include "aos/stl_mutex/stl_mutex.h"
#include "aos/util/file.h"
@@ -654,7 +653,7 @@
shm_event_loop_(shm_event_loop),
event_(this) {
shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
- // The timer may fire spurriously. HandleEvent on the event loop will
+ // The timer may fire spuriously. HandleEvent on the event loop will
// call the callback if it is needed. It may also have called it when
// processing some other event, and the kernel decided to deliver this
// wakeup anyways.
@@ -845,22 +844,82 @@
on_run_.push_back(::std::move(on_run));
}
+// This is a bit tricky because watchers can generate new events at any time (as
+// long as it's in the past). We want to check the watchers at least once before
+// declaring there are no events to handle, and we want to check them again if
+// event processing takes long enough that we find an event after that point in
+// time to handle.
void ShmEventLoop::HandleEvent() {
- // Update all the times for handlers.
- for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
- ShmWatcherState *watcher =
- reinterpret_cast<ShmWatcherState *>(base_watcher.get());
-
- watcher->CheckForNewData();
+ // Time through which we've checked for new events in watchers.
+ monotonic_clock::time_point checked_until = monotonic_clock::min_time;
+ if (!signalfd_) {
+ // Nothing to check, so we can bail out immediately once we're out of
+ // events.
+ CHECK(watchers_.empty());
+ checked_until = monotonic_clock::max_time;
}
+ // Loop until we run out of events to check.
while (true) {
- if (EventCount() == 0 ||
- PeekEvent()->event_time() > monotonic_clock::now()) {
+ // Time of the next event we know about. If this is before checked_until, we
+ // know there aren't any new events before the next one that we already know
+ // about, so no need to check the watchers.
+ monotonic_clock::time_point next_time = monotonic_clock::max_time;
+
+ if (EventCount() == 0) {
+ if (checked_until != monotonic_clock::min_time) {
+ // No events, and we've already checked the watchers at least once, so
+ // we're all done.
+ //
+ // There's a small chance that a watcher has gotten another event in
+ // between checked_until and now. If so, then the signalfd will be
+ // triggered now and we'll re-enter HandleEvent immediately. This is
+ // unlikely though, so we don't want to spend time checking all the
+ // watchers unnecessarily.
+ break;
+ }
+ } else {
+ next_time = PeekEvent()->event_time();
+ }
+ const auto now = monotonic_clock::now();
+
+ if (next_time > checked_until) {
+ // Read all of the signals, because there's no point in waking up again
+ // immediately to handle each one if we've fallen behind.
+ //
+ // This is safe before checking for new data on the watchers. If a signal
+ // is cleared here, the corresponding CheckForNewData() call below will
+ // pick it up.
+ while (true) {
+ const signalfd_siginfo result = signalfd_->Read();
+ if (result.ssi_signo == 0) {
+ break;
+ }
+ CHECK_EQ(result.ssi_signo, ipc_lib::kWakeupSignal);
+ }
+
+ // Check all the watchers for new events.
+ for (std::unique_ptr<WatcherState> &base_watcher : watchers_) {
+ ShmWatcherState *const watcher =
+ reinterpret_cast<ShmWatcherState *>(base_watcher.get());
+
+ watcher->CheckForNewData();
+ }
+ if (EventCount() == 0) {
+ // Still no events, all done now.
+ break;
+ }
+
+ checked_until = now;
+ // Check for any new events we found.
+ next_time = PeekEvent()->event_time();
+ }
+
+ if (next_time > now) {
break;
}
- EventLoopEvent *event = PopEvent();
+ EventLoopEvent *const event = PopEvent();
event->HandleEvent();
}
}
@@ -961,21 +1020,10 @@
void ShmEventLoop::Run() {
SignalHandler::global()->Register(this);
- std::unique_ptr<ipc_lib::SignalFd> signalfd;
-
if (watchers_.size() > 0) {
- signalfd.reset(new ipc_lib::SignalFd({ipc_lib::kWakeupSignal}));
+ signalfd_.reset(new ipc_lib::SignalFd({ipc_lib::kWakeupSignal}));
- epoll_.OnReadable(signalfd->fd(), [signalfd_ptr = signalfd.get(), this]() {
- signalfd_siginfo result = signalfd_ptr->Read();
- CHECK_EQ(result.ssi_signo, ipc_lib::kWakeupSignal);
-
- // TODO(austin): We should really be checking *everything*, not just
- // watchers, and calling the oldest thing first. That will improve
- // determinism a lot.
-
- HandleEvent();
- });
+ epoll_.OnReadable(signalfd_->fd(), [this]() { HandleEvent(); });
}
MaybeScheduleTimingReports();
@@ -1034,8 +1082,8 @@
}
if (watchers_.size() > 0) {
- epoll_.DeleteFd(signalfd->fd());
- signalfd.reset();
+ epoll_.DeleteFd(signalfd_->fd());
+ signalfd_.reset();
}
SignalHandler::global()->Unregister(this);
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index b76b4fe..845857c 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -8,6 +8,7 @@
#include "aos/events/epoll.h"
#include "aos/events/event_loop.h"
#include "aos/events/event_loop_generated.h"
+#include "aos/ipc_lib/signalfd.h"
DECLARE_string(application_name);
DECLARE_string(shm_base);
@@ -151,6 +152,9 @@
const Node *const node_;
internal::EPoll epoll_;
+
+ // Only set during Run().
+ std::unique_ptr<ipc_lib::SignalFd> signalfd_;
};
} // namespace aos