Add AddPhasedLoop to support phased loops
This lets us convert phased loops over nicely in various places.
Change-Id: Icdde4520f991fc541fbbe7ab3d9b945bb8c12e83
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 7c2db52..aaa1c86 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -43,6 +43,7 @@
"//aos:init",
"//aos:queues",
"//aos/logging",
+ "//aos/util:phased_loop",
],
)
@@ -89,5 +90,6 @@
"//aos:queues",
"//aos/logging",
"//aos/testing:test_logging",
+ "//aos/util:phased_loop",
],
)
diff --git a/aos/events/event-loop_param_test.cc b/aos/events/event-loop_param_test.cc
index c74538c..0a10ecb 100644
--- a/aos/events/event-loop_param_test.cc
+++ b/aos/events/event-loop_param_test.cc
@@ -1,10 +1,15 @@
#include "aos/events/event-loop_param_test.h"
+#include <chrono>
+
#include "gmock/gmock.h"
#include "gtest/gtest.h"
namespace aos {
namespace testing {
+namespace {
+namespace chrono = ::std::chrono;
+} // namespace
struct TestMessage : public ::aos::Message {
enum { kQueueLength = 100, kHash = 0x696c0cdc };
@@ -462,6 +467,7 @@
iteration_list.push_back(loop->monotonic_now());
});
+ // TODO(austin): This should be an error... Should be done in OnRun only.
test_timer->Setup(loop->monotonic_now(), ::std::chrono::milliseconds(20));
EndEventLoop(loop.get(), ::std::chrono::milliseconds(150));
// Testing that the timer thread waits for the event loop to start before
@@ -485,7 +491,6 @@
test_timer->Setup(loop->monotonic_now(), ::std::chrono::milliseconds(30));
});
-
test_timer->Setup(loop->monotonic_now(), ::std::chrono::milliseconds(20));
modifier_timer->Setup(loop->monotonic_now() +
::std::chrono::milliseconds(45));
@@ -541,12 +546,76 @@
fetcher->sent_time - (loop1->monotonic_now() - ::std::chrono::seconds(1));
EXPECT_TRUE(time_offset > ::std::chrono::milliseconds(-500))
- << ": Got " << fetcher->sent_time.time_since_epoch().count() << " expected "
- << loop1->monotonic_now().time_since_epoch().count();
+ << ": Got " << fetcher->sent_time.time_since_epoch().count()
+ << " expected " << loop1->monotonic_now().time_since_epoch().count();
EXPECT_TRUE(time_offset < ::std::chrono::milliseconds(500))
<< ": Got " << fetcher->sent_time.time_since_epoch().count()
<< " expected " << loop1->monotonic_now().time_since_epoch().count();
}
+// Tests that a couple phased loops run in a row result in the correct offset
+// and period.
+TEST_P(AbstractEventLoopTest, PhasedLoopTest) {
+ const chrono::milliseconds kOffset = chrono::milliseconds(400);
+ const int kCount = 5;
+
+ auto loop1 = MakePrimary();
+
+ // Collect up a couple of samples.
+ ::std::vector<::aos::monotonic_clock::time_point> times;
+
+ // Run kCount iterations.
+ loop1->AddPhasedLoop(
+ [×, &loop1](int count) {
+ EXPECT_EQ(count, 1);
+ times.push_back(loop1->monotonic_now());
+ LOG(INFO, "%zu\n", times.size());
+ if (times.size() == kCount) {
+ loop1->Exit();
+ }
+ },
+ chrono::seconds(1), kOffset);
+
+ // Add a delay to make sure that delay during startup doesn't result in a
+ // "missed cycle".
+ SleepFor(chrono::seconds(2));
+
+ Run();
+
+ // Confirm that we got both the right number of samples, and it's odd.
+ EXPECT_EQ(times.size(), static_cast<size_t>(kCount));
+ EXPECT_EQ((times.size() % 2), 1);
+
+ // Grab the middle sample.
+ ::aos::monotonic_clock::time_point middle_time = times[times.size() / 2 + 1];
+
+ // Add up all the delays of all the times.
+ ::aos::monotonic_clock::duration sum = chrono::seconds(0);
+ for (const ::aos::monotonic_clock::time_point time : times) {
+ sum += time - middle_time;
+ }
+
+ // Average and add to the middle to find the average time.
+ sum /= times.size();
+ middle_time += sum;
+
+ // Compute the offset from the start of the second of the average time. This
+ // should be pretty close to the offset.
+ const ::aos::monotonic_clock::duration remainder =
+ middle_time.time_since_epoch() -
+ chrono::duration_cast<chrono::seconds>(middle_time.time_since_epoch());
+
+ const chrono::milliseconds kEpsilon(100);
+ EXPECT_LT(remainder, kOffset + kEpsilon);
+ EXPECT_GT(remainder, kOffset - kEpsilon);
+
+ // Make sure that the average duration is close to 1 second.
+ EXPECT_NEAR(chrono::duration_cast<chrono::duration<double>>(times.back() -
+ times.front())
+ .count() /
+ static_cast<double>(times.size() - 1),
+ 1.0, 0.1);
+}
+
} // namespace testing
} // namespace aos
diff --git a/aos/events/event-loop_param_test.h b/aos/events/event-loop_param_test.h
index 1d2060a..26d869d 100644
--- a/aos/events/event-loop_param_test.h
+++ b/aos/events/event-loop_param_test.h
@@ -21,6 +21,9 @@
// Runs the loops until they quit.
virtual void Run() = 0;
+
+ // Advances time by sleeping. Can't be called from inside a loop.
+ virtual void SleepFor(::std::chrono::nanoseconds duration) = 0;
};
class AbstractEventLoopTestBase
@@ -32,6 +35,10 @@
::std::unique_ptr<EventLoop> MakePrimary() { return factory_->MakePrimary(); }
void Run() { return factory_->Run(); }
+
+ void SleepFor(::std::chrono::nanoseconds duration) {
+ return factory_->SleepFor(duration);
+ }
// You can implement all the usual fixture class members here.
// To access the test parameter, call GetParam() from class
// TestWithParam<T>.
diff --git a/aos/events/raw-event-loop.h b/aos/events/raw-event-loop.h
index be605e9..6ff7179 100644
--- a/aos/events/raw-event-loop.h
+++ b/aos/events/raw-event-loop.h
@@ -112,6 +112,18 @@
virtual void Disable() = 0;
};
+// Interface for phased loops. They are built on timers.
+class PhasedLoopHandler {
+ public:
+ virtual ~PhasedLoopHandler() {}
+
+ // Sets the interval and offset. Any changes to interval and offset only take
+ // effect when the handler finishes running.
+ virtual void set_interval_and_offset(
+ const monotonic_clock::duration interval,
+ const monotonic_clock::duration offset) = 0;
+};
+
class EventScheduler;
// Virtual base class for all event queue-types.
@@ -126,16 +138,25 @@
// Use this to run code once the thread goes into "real-time-mode",
virtual void OnRun(::std::function<void()> on_run) = 0;
+ // Threadsafe.
bool is_running() const { return is_running_.load(); }
// Creates a timer that executes callback when the timer expires
// Returns a TimerHandle for configuration of the timer
virtual TimerHandler *AddTimer(::std::function<void()> callback) = 0;
+ // Creates a timer that executes callback periodically at the specified
+ // interval and offset. Returns a PhasedLoopHandler for interacting with the
+ // timer.
+ virtual PhasedLoopHandler *AddPhasedLoop(
+ ::std::function<void(int)> callback,
+ const monotonic_clock::duration interval,
+ const monotonic_clock::duration offset = ::std::chrono::seconds(0)) = 0;
+
// Stops receiving events.
virtual void Exit() = 0;
- // TODO(austin): This shouldn't belong.
+ // TODO(austin): This shouldn't belong here.
virtual void Run() = 0;
protected:
diff --git a/aos/events/shm-event-loop.cc b/aos/events/shm-event-loop.cc
index 08f9e1f..8c0f3a2 100644
--- a/aos/events/shm-event-loop.cc
+++ b/aos/events/shm-event-loop.cc
@@ -10,6 +10,7 @@
#include "aos/init.h"
#include "aos/logging/logging.h"
#include "aos/queue.h"
+#include "aos/util/phased_loop.h"
namespace aos {
@@ -244,6 +245,8 @@
};
// Adapter class to adapt a timerfd to a TimerHandler.
+// The part of the API which is accessed by the TimerHandler interface needs to
+// be threadsafe. This means Setup and Disable.
class TimerHandlerState : public TimerHandler {
public:
TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
@@ -259,10 +262,14 @@
void Setup(monotonic_clock::time_point base,
monotonic_clock::duration repeat_offset) override {
+ // SetTime is threadsafe already.
timerfd_.SetTime(base, repeat_offset);
}
- void Disable() override { timerfd_.Disable(); }
+ void Disable() override {
+ // Disable is also threadsafe already.
+ timerfd_.Disable();
+ }
private:
ShmEventLoop *shm_event_loop_;
@@ -272,6 +279,70 @@
// Function to be run on the thread
::std::function<void()> fn_;
};
+
+// Adapter class to the timerfd and PhasedLoop.
+// The part of the API which is accessed by the PhasedLoopHandler interface
+// needs to be threadsafe. This means set_interval_and_offset
+class PhasedLoopHandler : public ::aos::PhasedLoopHandler {
+ public:
+ PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
+ const monotonic_clock::duration interval,
+ const monotonic_clock::duration offset)
+ : shm_event_loop_(shm_event_loop),
+ phased_loop_(interval, shm_event_loop_->monotonic_now(), offset),
+ fn_(::std::move(fn)) {
+ shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
+ MutexLocker locker(&shm_event_loop_->thread_state_.mutex_);
+ {
+ MutexLocker locker(&mutex_);
+ timerfd_.Read();
+ }
+ // Call the function. To avoid needing a recursive mutex, drop the lock
+ // before running the function.
+ fn_(cycles_elapsed_);
+ {
+ MutexLocker locker(&mutex_);
+ Reschedule();
+ }
+ });
+ }
+
+ ~PhasedLoopHandler() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
+
+ void set_interval_and_offset(
+ const monotonic_clock::duration interval,
+ const monotonic_clock::duration offset) override {
+ MutexLocker locker(&mutex_);
+ phased_loop_.set_interval_and_offset(interval, offset);
+ }
+
+ void Startup() {
+ MutexLocker locker(&mutex_);
+ phased_loop_.Reset(shm_event_loop_->monotonic_now());
+ Reschedule();
+ }
+
+ private:
+ // Reschedules the timer. Must be called with the mutex held.
+ void Reschedule() {
+ cycles_elapsed_ = phased_loop_.Iterate(shm_event_loop_->monotonic_now());
+ timerfd_.SetTime(phased_loop_.sleep_time(), ::aos::monotonic_clock::zero());
+ }
+
+ ShmEventLoop *shm_event_loop_;
+
+ // Mutex to protect access to the timerfd_ (not strictly necessary), and the
+ // phased_loop (necessary).
+ ::aos::Mutex mutex_;
+
+ TimerFd timerfd_;
+ time::PhasedLoop phased_loop_;
+
+ int cycles_elapsed_ = 1;
+
+ // Function to be run
+ const ::std::function<void(int)> fn_;
+};
} // namespace internal
::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
@@ -308,6 +379,19 @@
return timers_.back().get();
}
+PhasedLoopHandler *ShmEventLoop::AddPhasedLoop(
+ ::std::function<void(int)> callback,
+ const monotonic_clock::duration interval,
+ const monotonic_clock::duration offset) {
+ ::std::unique_ptr<internal::PhasedLoopHandler> phased_loop(
+ new internal::PhasedLoopHandler(this, ::std::move(callback), interval,
+ offset));
+
+ phased_loops_.push_back(::std::move(phased_loop));
+
+ return phased_loops_.back().get();
+}
+
void ShmEventLoop::OnRun(::std::function<void()> on_run) {
on_run_.push_back(::std::move(on_run));
}
@@ -332,6 +416,12 @@
for (const auto &run : on_run_) {
run();
}
+
+ // Start up all the phased loops.
+ for (::std::unique_ptr<internal::PhasedLoopHandler> &phased_loop :
+ phased_loops_) {
+ phased_loop->Startup();
+ }
// TODO(austin): We don't need a separate watcher thread if there are only
// watchers and fetchers. Could lazily create the epoll loop and pick a
// victim watcher to run in this thread.
diff --git a/aos/events/shm-event-loop.h b/aos/events/shm-event-loop.h
index 79101de..91ca342 100644
--- a/aos/events/shm-event-loop.h
+++ b/aos/events/shm-event-loop.h
@@ -14,6 +14,7 @@
class WatcherThreadState;
class TimerHandlerState;
+class PhasedLoopHandler;
} // namespace internal
@@ -42,6 +43,11 @@
::std::function<void(const aos::Message *message)> watcher) override;
TimerHandler *AddTimer(::std::function<void()> callback) override;
+ ::aos::PhasedLoopHandler *AddPhasedLoop(
+ ::std::function<void(int)> callback,
+ const monotonic_clock::duration interval,
+ const monotonic_clock::duration offset =
+ ::std::chrono::seconds(0)) override;
void OnRun(::std::function<void()> on_run) override;
void Run() override;
@@ -59,6 +65,7 @@
private:
friend class internal::WatcherThreadState;
friend class internal::TimerHandlerState;
+ friend class internal::PhasedLoopHandler;
// This ThreadState ensures that two watchers in the same loop cannot be
// triggered concurrently. Because watchers block threads indefinitely, this
// has to be shared_ptr in case the EventLoop is destroyed before the thread
@@ -78,6 +85,7 @@
private:
friend class internal::WatcherThreadState;
friend class internal::TimerHandlerState;
+ friend class internal::PhasedLoopHandler;
friend class ShmEventLoop;
// This mutex ensures that only one watch event happens at a time.
@@ -100,6 +108,7 @@
internal::EPoll epoll_;
::std::vector<::std::unique_ptr<internal::TimerHandlerState>> timers_;
+ ::std::vector<::std::unique_ptr<internal::PhasedLoopHandler>> phased_loops_;
::std::vector<::std::unique_ptr<internal::WatcherThreadState>> watchers_;
};
diff --git a/aos/events/shm-event-loop_test.cc b/aos/events/shm-event-loop_test.cc
index 61adbe3..8d0f552 100644
--- a/aos/events/shm-event-loop_test.cc
+++ b/aos/events/shm-event-loop_test.cc
@@ -24,6 +24,10 @@
void Run() override { CHECK_NOTNULL(primary_event_loop_)->Run(); }
+ void SleepFor(::std::chrono::nanoseconds duration) override {
+ ::std::this_thread::sleep_for(duration);
+ }
+
private:
::aos::testing::TestSharedMemory my_shm_;
@@ -102,5 +106,55 @@
EXPECT_TRUE(did_timer);
EXPECT_TRUE(did_watcher);
}
+
+// Tests that missing a deadline inside the function still results in PhasedLoop
+// running at the right offset.
+TEST(ShmEventLoopTest, DelayedPhasedLoop) {
+ ShmEventLoopTestFactory factory;
+ auto loop1 = factory.MakePrimary();
+
+ ::std::vector<::aos::monotonic_clock::time_point> times;
+
+ constexpr chrono::milliseconds kOffset = chrono::milliseconds(400);
+
+ loop1->AddPhasedLoop(
+ [×, &loop1, &kOffset](int count) {
+ const ::aos::monotonic_clock::time_point monotonic_now =
+ loop1->monotonic_now();
+
+ // Compute our offset.
+ const ::aos::monotonic_clock::duration remainder =
+ monotonic_now.time_since_epoch() -
+ chrono::duration_cast<chrono::seconds>(
+ monotonic_now.time_since_epoch());
+
+ // Make sure we we are called near where we should be even when we
+ // delay.
+ constexpr chrono::milliseconds kEpsilon(200);
+ EXPECT_LT(remainder, kOffset + kEpsilon);
+ EXPECT_GT(remainder, kOffset - kEpsilon);
+
+ // Confirm that we see the missed count when we sleep.
+ if (times.size() == 0) {
+ EXPECT_EQ(count, 1);
+ } else {
+ EXPECT_EQ(count, 3);
+ }
+
+ times.push_back(loop1->monotonic_now());
+ if (times.size() == 2) {
+ loop1->Exit();
+ }
+
+ // Now, add a large delay. This should push us up to 3 cycles.
+ ::std::this_thread::sleep_for(chrono::milliseconds(2500));
+ },
+ chrono::seconds(1), kOffset);
+
+ factory.Run();
+
+ EXPECT_EQ(times.size(), 2u);
+}
+
} // namespace testing
} // namespace aos
diff --git a/aos/events/simulated-event-loop.cc b/aos/events/simulated-event-loop.cc
index 04e4628..94cf67a 100644
--- a/aos/events/simulated-event-loop.cc
+++ b/aos/events/simulated-event-loop.cc
@@ -6,6 +6,7 @@
#include "aos/logging/logging.h"
#include "aos/queue.h"
#include "aos/testing/test_logging.h"
+#include "aos/util/phased_loop.h"
namespace aos {
namespace {
@@ -90,7 +91,6 @@
::std::deque<RefCountedBuffer> msgs_;
};
-
class SimulatedTimerHandler : public TimerHandler {
public:
explicit SimulatedTimerHandler(EventScheduler *scheduler,
@@ -132,6 +132,10 @@
}
}
+ ::aos::monotonic_clock::time_point monotonic_now() const {
+ return scheduler_->monotonic_now();
+ }
+
private:
EventScheduler *scheduler_;
EventScheduler::Token token_;
@@ -141,6 +145,49 @@
monotonic_clock::duration repeat_offset_;
};
+class SimulatedPhasedLoopHandler : public PhasedLoopHandler {
+ public:
+ SimulatedPhasedLoopHandler(EventScheduler *scheduler,
+ ::std::function<void(int)> fn,
+ const monotonic_clock::duration interval,
+ const monotonic_clock::duration offset)
+ : simulated_timer_handler_(scheduler, [this]() { HandleTimerWakeup(); }),
+ phased_loop_(interval, simulated_timer_handler_.monotonic_now(),
+ offset),
+ fn_(fn) {
+ // TODO(austin): This assumes time doesn't change between when the
+ // constructor is called and when we start running. It's probably a safe
+ // assumption.
+ Reschedule();
+ }
+
+ void HandleTimerWakeup() {
+ fn_(cycles_elapsed_);
+ Reschedule();
+ }
+
+ void set_interval_and_offset(
+ const monotonic_clock::duration interval,
+ const monotonic_clock::duration offset) override {
+ phased_loop_.set_interval_and_offset(interval, offset);
+ }
+
+ void Reschedule() {
+ cycles_elapsed_ =
+ phased_loop_.Iterate(simulated_timer_handler_.monotonic_now());
+ simulated_timer_handler_.Setup(phased_loop_.sleep_time(),
+ ::aos::monotonic_clock::zero());
+ }
+
+ private:
+ SimulatedTimerHandler simulated_timer_handler_;
+
+ time::PhasedLoop phased_loop_;
+
+ int cycles_elapsed_ = 1;
+
+ ::std::function<void(int)> fn_;
+};
class SimulatedEventLoop : public EventLoop {
public:
@@ -172,6 +219,15 @@
return timers_.back().get();
}
+ PhasedLoopHandler *AddPhasedLoop(::std::function<void(int)> callback,
+ const monotonic_clock::duration interval,
+ const monotonic_clock::duration offset =
+ ::std::chrono::seconds(0)) override {
+ phased_loops_.emplace_back(
+ new SimulatedPhasedLoopHandler(scheduler_, callback, interval, offset));
+ return phased_loops_.back().get();
+ }
+
void OnRun(::std::function<void()> on_run) override {
scheduler_->Schedule(scheduler_->monotonic_now(), on_run);
}
@@ -200,6 +256,7 @@
*queues_;
::std::vector<std::string> taken_;
::std::vector<std::unique_ptr<TimerHandler>> timers_;
+ ::std::vector<std::unique_ptr<PhasedLoopHandler>> phased_loops_;
};
EventScheduler::Token EventScheduler::Schedule(
diff --git a/aos/events/simulated-event-loop_test.cc b/aos/events/simulated-event-loop_test.cc
index bcc6f28..be52243 100644
--- a/aos/events/simulated-event-loop_test.cc
+++ b/aos/events/simulated-event-loop_test.cc
@@ -18,6 +18,10 @@
void Run() override { event_loop_factory_.Run(); }
+ // TODO(austin): Implement this. It's used currently for a phased loop test.
+ // I'm not sure how much that matters.
+ void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
+
private:
SimulatedEventLoopFactory event_loop_factory_;
};