Add AddPhasedLoop to support phased loops

This lets us convert phased loops over nicely in various places.

Change-Id: Icdde4520f991fc541fbbe7ab3d9b945bb8c12e83
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 7c2db52..aaa1c86 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -43,6 +43,7 @@
         "//aos:init",
         "//aos:queues",
         "//aos/logging",
+        "//aos/util:phased_loop",
     ],
 )
 
@@ -89,5 +90,6 @@
         "//aos:queues",
         "//aos/logging",
         "//aos/testing:test_logging",
+        "//aos/util:phased_loop",
     ],
 )
diff --git a/aos/events/event-loop_param_test.cc b/aos/events/event-loop_param_test.cc
index c74538c..0a10ecb 100644
--- a/aos/events/event-loop_param_test.cc
+++ b/aos/events/event-loop_param_test.cc
@@ -1,10 +1,15 @@
 #include "aos/events/event-loop_param_test.h"
 
+#include <chrono>
+
 #include "gmock/gmock.h"
 #include "gtest/gtest.h"
 
 namespace aos {
 namespace testing {
+namespace {
+namespace chrono = ::std::chrono;
+}  // namespace
 
 struct TestMessage : public ::aos::Message {
   enum { kQueueLength = 100, kHash = 0x696c0cdc };
@@ -462,6 +467,7 @@
     iteration_list.push_back(loop->monotonic_now());
   });
 
+  // TODO(austin): This should be an error...  Should be done in OnRun only.
   test_timer->Setup(loop->monotonic_now(), ::std::chrono::milliseconds(20));
   EndEventLoop(loop.get(), ::std::chrono::milliseconds(150));
   // Testing that the timer thread waits for the event loop to start before
@@ -485,7 +491,6 @@
     test_timer->Setup(loop->monotonic_now(), ::std::chrono::milliseconds(30));
   });
 
-
   test_timer->Setup(loop->monotonic_now(), ::std::chrono::milliseconds(20));
   modifier_timer->Setup(loop->monotonic_now() +
                         ::std::chrono::milliseconds(45));
@@ -541,12 +546,76 @@
       fetcher->sent_time - (loop1->monotonic_now() - ::std::chrono::seconds(1));
 
   EXPECT_TRUE(time_offset > ::std::chrono::milliseconds(-500))
-      << ": Got " << fetcher->sent_time.time_since_epoch().count() << " expected "
-      << loop1->monotonic_now().time_since_epoch().count();
+      << ": Got " << fetcher->sent_time.time_since_epoch().count()
+      << " expected " << loop1->monotonic_now().time_since_epoch().count();
   EXPECT_TRUE(time_offset < ::std::chrono::milliseconds(500))
       << ": Got " << fetcher->sent_time.time_since_epoch().count()
       << " expected " << loop1->monotonic_now().time_since_epoch().count();
 }
 
+// Tests that a couple phased loops run in a row result in the correct offset
+// and period.
+TEST_P(AbstractEventLoopTest, PhasedLoopTest) {
+  const chrono::milliseconds kOffset = chrono::milliseconds(400);
+  const int kCount = 5;
+
+  auto loop1 = MakePrimary();
+
+  // Collect up a couple of samples.
+  ::std::vector<::aos::monotonic_clock::time_point> times;
+
+  // Run kCount iterations.
+  loop1->AddPhasedLoop(
+      [&times, &loop1](int count) {
+        EXPECT_EQ(count, 1);
+        times.push_back(loop1->monotonic_now());
+        LOG(INFO, "%zu\n", times.size());
+        if (times.size() == kCount) {
+          loop1->Exit();
+        }
+      },
+      chrono::seconds(1), kOffset);
+
+  // Add a delay to make sure that delay during startup doesn't result in a
+  // "missed cycle".
+  SleepFor(chrono::seconds(2));
+
+  Run();
+
+  // Confirm that we got both the right number of samples, and it's odd.
+  EXPECT_EQ(times.size(), static_cast<size_t>(kCount));
+  EXPECT_EQ((times.size() % 2), 1);
+
+  // Grab the middle sample.
+  ::aos::monotonic_clock::time_point middle_time = times[times.size() / 2 + 1];
+
+  // Add up all the delays of all the times.
+  ::aos::monotonic_clock::duration sum = chrono::seconds(0);
+  for (const ::aos::monotonic_clock::time_point time : times) {
+    sum += time - middle_time;
+  }
+
+  // Average and add to the middle to find the average time.
+  sum /= times.size();
+  middle_time += sum;
+
+  // Compute the offset from the start of the second of the average time.  This
+  // should be pretty close to the offset.
+  const ::aos::monotonic_clock::duration remainder =
+      middle_time.time_since_epoch() -
+      chrono::duration_cast<chrono::seconds>(middle_time.time_since_epoch());
+
+  const chrono::milliseconds kEpsilon(100);
+  EXPECT_LT(remainder, kOffset + kEpsilon);
+  EXPECT_GT(remainder, kOffset - kEpsilon);
+
+  // Make sure that the average duration is close to 1 second.
+  EXPECT_NEAR(chrono::duration_cast<chrono::duration<double>>(times.back() -
+                                                              times.front())
+                      .count() /
+                  static_cast<double>(times.size() - 1),
+              1.0, 0.1);
+}
+
 }  // namespace testing
 }  // namespace aos
diff --git a/aos/events/event-loop_param_test.h b/aos/events/event-loop_param_test.h
index 1d2060a..26d869d 100644
--- a/aos/events/event-loop_param_test.h
+++ b/aos/events/event-loop_param_test.h
@@ -21,6 +21,9 @@
 
   // Runs the loops until they quit.
   virtual void Run() = 0;
+
+  // Advances time by sleeping.  Can't be called from inside a loop.
+  virtual void SleepFor(::std::chrono::nanoseconds duration) = 0;
 };
 
 class AbstractEventLoopTestBase
@@ -32,6 +35,10 @@
   ::std::unique_ptr<EventLoop> MakePrimary() { return factory_->MakePrimary(); }
 
   void Run() { return factory_->Run(); }
+
+  void SleepFor(::std::chrono::nanoseconds duration) {
+    return factory_->SleepFor(duration);
+  }
   // You can implement all the usual fixture class members here.
   // To access the test parameter, call GetParam() from class
   // TestWithParam<T>.
diff --git a/aos/events/raw-event-loop.h b/aos/events/raw-event-loop.h
index be605e9..6ff7179 100644
--- a/aos/events/raw-event-loop.h
+++ b/aos/events/raw-event-loop.h
@@ -112,6 +112,18 @@
   virtual void Disable() = 0;
 };
 
+// Interface for phased loops.  They are built on timers.
+class PhasedLoopHandler {
+ public:
+  virtual ~PhasedLoopHandler() {}
+
+  // Sets the interval and offset.  Any changes to interval and offset only take
+  // effect when the handler finishes running.
+  virtual void set_interval_and_offset(
+      const monotonic_clock::duration interval,
+      const monotonic_clock::duration offset) = 0;
+};
+
 class EventScheduler;
 
 // Virtual base class for all event queue-types.
@@ -126,16 +138,25 @@
   // Use this to run code once the thread goes into "real-time-mode",
   virtual void OnRun(::std::function<void()> on_run) = 0;
 
+  // Threadsafe.
   bool is_running() const { return is_running_.load(); }
 
   // Creates a timer that executes callback when the timer expires
   // Returns a TimerHandle for configuration of the timer
   virtual TimerHandler *AddTimer(::std::function<void()> callback) = 0;
 
+  // Creates a timer that executes callback periodically at the specified
+  // interval and offset.  Returns a PhasedLoopHandler for interacting with the
+  // timer.
+  virtual PhasedLoopHandler *AddPhasedLoop(
+      ::std::function<void(int)> callback,
+      const monotonic_clock::duration interval,
+      const monotonic_clock::duration offset = ::std::chrono::seconds(0)) = 0;
+
   // Stops receiving events.
   virtual void Exit() = 0;
 
-  // TODO(austin): This shouldn't belong.
+  // TODO(austin): This shouldn't belong here.
   virtual void Run() = 0;
 
  protected:
diff --git a/aos/events/shm-event-loop.cc b/aos/events/shm-event-loop.cc
index 08f9e1f..8c0f3a2 100644
--- a/aos/events/shm-event-loop.cc
+++ b/aos/events/shm-event-loop.cc
@@ -10,6 +10,7 @@
 #include "aos/init.h"
 #include "aos/logging/logging.h"
 #include "aos/queue.h"
+#include "aos/util/phased_loop.h"
 
 namespace aos {
 
@@ -244,6 +245,8 @@
 };
 
 // Adapter class to adapt a timerfd to a TimerHandler.
+// The part of the API which is accessed by the TimerHandler interface needs to
+// be threadsafe.  This means Setup and Disable.
 class TimerHandlerState : public TimerHandler {
  public:
   TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
@@ -259,10 +262,14 @@
 
   void Setup(monotonic_clock::time_point base,
              monotonic_clock::duration repeat_offset) override {
+    // SetTime is threadsafe already.
     timerfd_.SetTime(base, repeat_offset);
   }
 
-  void Disable() override { timerfd_.Disable(); }
+  void Disable() override {
+    // Disable is also threadsafe already.
+    timerfd_.Disable();
+  }
 
  private:
   ShmEventLoop *shm_event_loop_;
@@ -272,6 +279,70 @@
   // Function to be run on the thread
   ::std::function<void()> fn_;
 };
+
+// Adapter class to the timerfd and PhasedLoop.
+// The part of the API which is accessed by the PhasedLoopHandler interface
+// needs to be threadsafe.  This means set_interval_and_offset
+class PhasedLoopHandler : public ::aos::PhasedLoopHandler {
+ public:
+  PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
+                    const monotonic_clock::duration interval,
+                    const monotonic_clock::duration offset)
+      : shm_event_loop_(shm_event_loop),
+        phased_loop_(interval, shm_event_loop_->monotonic_now(), offset),
+        fn_(::std::move(fn)) {
+    shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
+      MutexLocker locker(&shm_event_loop_->thread_state_.mutex_);
+      {
+        MutexLocker locker(&mutex_);
+        timerfd_.Read();
+      }
+      // Call the function.  To avoid needing a recursive mutex, drop the lock
+      // before running the function.
+      fn_(cycles_elapsed_);
+      {
+        MutexLocker locker(&mutex_);
+        Reschedule();
+      }
+    });
+  }
+
+  ~PhasedLoopHandler() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
+
+  void set_interval_and_offset(
+      const monotonic_clock::duration interval,
+      const monotonic_clock::duration offset) override {
+    MutexLocker locker(&mutex_);
+    phased_loop_.set_interval_and_offset(interval, offset);
+  }
+
+  void Startup() {
+    MutexLocker locker(&mutex_);
+    phased_loop_.Reset(shm_event_loop_->monotonic_now());
+    Reschedule();
+  }
+
+ private:
+  // Reschedules the timer.  Must be called with the mutex held.
+  void Reschedule() {
+    cycles_elapsed_ = phased_loop_.Iterate(shm_event_loop_->monotonic_now());
+    timerfd_.SetTime(phased_loop_.sleep_time(), ::aos::monotonic_clock::zero());
+  }
+
+  ShmEventLoop *shm_event_loop_;
+
+  // Mutex to protect access to the timerfd_ (not strictly necessary), and the
+  // phased_loop (necessary).
+  ::aos::Mutex mutex_;
+
+  TimerFd timerfd_;
+  time::PhasedLoop phased_loop_;
+
+  int cycles_elapsed_ = 1;
+
+  // Function to be run
+  const ::std::function<void(int)> fn_;
+};
 }  // namespace internal
 
 ::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
@@ -308,6 +379,19 @@
   return timers_.back().get();
 }
 
+PhasedLoopHandler *ShmEventLoop::AddPhasedLoop(
+    ::std::function<void(int)> callback,
+    const monotonic_clock::duration interval,
+    const monotonic_clock::duration offset) {
+  ::std::unique_ptr<internal::PhasedLoopHandler> phased_loop(
+      new internal::PhasedLoopHandler(this, ::std::move(callback), interval,
+                                      offset));
+
+  phased_loops_.push_back(::std::move(phased_loop));
+
+  return phased_loops_.back().get();
+}
+
 void ShmEventLoop::OnRun(::std::function<void()> on_run) {
   on_run_.push_back(::std::move(on_run));
 }
@@ -332,6 +416,12 @@
   for (const auto &run : on_run_) {
     run();
   }
+
+  // Start up all the phased loops.
+  for (::std::unique_ptr<internal::PhasedLoopHandler> &phased_loop :
+       phased_loops_) {
+    phased_loop->Startup();
+  }
   // TODO(austin): We don't need a separate watcher thread if there are only
   // watchers and fetchers.  Could lazily create the epoll loop and pick a
   // victim watcher to run in this thread.
diff --git a/aos/events/shm-event-loop.h b/aos/events/shm-event-loop.h
index 79101de..91ca342 100644
--- a/aos/events/shm-event-loop.h
+++ b/aos/events/shm-event-loop.h
@@ -14,6 +14,7 @@
 
 class WatcherThreadState;
 class TimerHandlerState;
+class PhasedLoopHandler;
 
 }  // namespace internal
 
@@ -42,6 +43,11 @@
       ::std::function<void(const aos::Message *message)> watcher) override;
 
   TimerHandler *AddTimer(::std::function<void()> callback) override;
+  ::aos::PhasedLoopHandler *AddPhasedLoop(
+      ::std::function<void(int)> callback,
+      const monotonic_clock::duration interval,
+      const monotonic_clock::duration offset =
+          ::std::chrono::seconds(0)) override;
 
   void OnRun(::std::function<void()> on_run) override;
   void Run() override;
@@ -59,6 +65,7 @@
  private:
   friend class internal::WatcherThreadState;
   friend class internal::TimerHandlerState;
+  friend class internal::PhasedLoopHandler;
   // This ThreadState ensures that two watchers in the same loop cannot be
   // triggered concurrently.  Because watchers block threads indefinitely, this
   // has to be shared_ptr in case the EventLoop is destroyed before the thread
@@ -78,6 +85,7 @@
    private:
     friend class internal::WatcherThreadState;
     friend class internal::TimerHandlerState;
+    friend class internal::PhasedLoopHandler;
     friend class ShmEventLoop;
 
     // This mutex ensures that only one watch event happens at a time.
@@ -100,6 +108,7 @@
   internal::EPoll epoll_;
 
   ::std::vector<::std::unique_ptr<internal::TimerHandlerState>> timers_;
+  ::std::vector<::std::unique_ptr<internal::PhasedLoopHandler>> phased_loops_;
   ::std::vector<::std::unique_ptr<internal::WatcherThreadState>> watchers_;
 };
 
diff --git a/aos/events/shm-event-loop_test.cc b/aos/events/shm-event-loop_test.cc
index 61adbe3..8d0f552 100644
--- a/aos/events/shm-event-loop_test.cc
+++ b/aos/events/shm-event-loop_test.cc
@@ -24,6 +24,10 @@
 
   void Run() override { CHECK_NOTNULL(primary_event_loop_)->Run(); }
 
+  void SleepFor(::std::chrono::nanoseconds duration) override {
+    ::std::this_thread::sleep_for(duration);
+  }
+
  private:
   ::aos::testing::TestSharedMemory my_shm_;
 
@@ -102,5 +106,55 @@
   EXPECT_TRUE(did_timer);
   EXPECT_TRUE(did_watcher);
 }
+
+// Tests that missing a deadline inside the function still results in PhasedLoop
+// running at the right offset.
+TEST(ShmEventLoopTest, DelayedPhasedLoop) {
+  ShmEventLoopTestFactory factory;
+  auto loop1 = factory.MakePrimary();
+
+  ::std::vector<::aos::monotonic_clock::time_point> times;
+
+  constexpr chrono::milliseconds kOffset = chrono::milliseconds(400);
+
+  loop1->AddPhasedLoop(
+      [&times, &loop1, &kOffset](int count) {
+        const ::aos::monotonic_clock::time_point monotonic_now =
+            loop1->monotonic_now();
+
+        // Compute our offset.
+        const ::aos::monotonic_clock::duration remainder =
+            monotonic_now.time_since_epoch() -
+            chrono::duration_cast<chrono::seconds>(
+                monotonic_now.time_since_epoch());
+
+        // Make sure we we are called near where we should be even when we
+        // delay.
+        constexpr chrono::milliseconds kEpsilon(200);
+        EXPECT_LT(remainder, kOffset + kEpsilon);
+        EXPECT_GT(remainder, kOffset - kEpsilon);
+
+        // Confirm that we see the missed count when we sleep.
+        if (times.size() == 0) {
+          EXPECT_EQ(count, 1);
+        } else {
+          EXPECT_EQ(count, 3);
+        }
+
+        times.push_back(loop1->monotonic_now());
+        if (times.size() == 2) {
+          loop1->Exit();
+        }
+
+        // Now, add a large delay.  This should push us up to 3 cycles.
+        ::std::this_thread::sleep_for(chrono::milliseconds(2500));
+      },
+      chrono::seconds(1), kOffset);
+
+  factory.Run();
+
+  EXPECT_EQ(times.size(), 2u);
+}
+
 }  // namespace testing
 }  // namespace aos
diff --git a/aos/events/simulated-event-loop.cc b/aos/events/simulated-event-loop.cc
index 04e4628..94cf67a 100644
--- a/aos/events/simulated-event-loop.cc
+++ b/aos/events/simulated-event-loop.cc
@@ -6,6 +6,7 @@
 #include "aos/logging/logging.h"
 #include "aos/queue.h"
 #include "aos/testing/test_logging.h"
+#include "aos/util/phased_loop.h"
 
 namespace aos {
 namespace {
@@ -90,7 +91,6 @@
   ::std::deque<RefCountedBuffer> msgs_;
 };
 
-
 class SimulatedTimerHandler : public TimerHandler {
  public:
   explicit SimulatedTimerHandler(EventScheduler *scheduler,
@@ -132,6 +132,10 @@
     }
   }
 
+  ::aos::monotonic_clock::time_point monotonic_now() const {
+    return scheduler_->monotonic_now();
+  }
+
  private:
   EventScheduler *scheduler_;
   EventScheduler::Token token_;
@@ -141,6 +145,49 @@
   monotonic_clock::duration repeat_offset_;
 };
 
+class SimulatedPhasedLoopHandler : public PhasedLoopHandler {
+ public:
+  SimulatedPhasedLoopHandler(EventScheduler *scheduler,
+                             ::std::function<void(int)> fn,
+                             const monotonic_clock::duration interval,
+                             const monotonic_clock::duration offset)
+      : simulated_timer_handler_(scheduler, [this]() { HandleTimerWakeup(); }),
+        phased_loop_(interval, simulated_timer_handler_.monotonic_now(),
+                     offset),
+        fn_(fn) {
+    // TODO(austin): This assumes time doesn't change between when the
+    // constructor is called and when we start running.  It's probably a safe
+    // assumption.
+    Reschedule();
+  }
+
+  void HandleTimerWakeup() {
+    fn_(cycles_elapsed_);
+    Reschedule();
+  }
+
+  void set_interval_and_offset(
+      const monotonic_clock::duration interval,
+      const monotonic_clock::duration offset) override {
+    phased_loop_.set_interval_and_offset(interval, offset);
+  }
+
+  void Reschedule() {
+    cycles_elapsed_ =
+        phased_loop_.Iterate(simulated_timer_handler_.monotonic_now());
+    simulated_timer_handler_.Setup(phased_loop_.sleep_time(),
+                                   ::aos::monotonic_clock::zero());
+  }
+
+ private:
+  SimulatedTimerHandler simulated_timer_handler_;
+
+  time::PhasedLoop phased_loop_;
+
+  int cycles_elapsed_ = 1;
+
+  ::std::function<void(int)> fn_;
+};
 
 class SimulatedEventLoop : public EventLoop {
  public:
@@ -172,6 +219,15 @@
     return timers_.back().get();
   }
 
+  PhasedLoopHandler *AddPhasedLoop(::std::function<void(int)> callback,
+                                   const monotonic_clock::duration interval,
+                                   const monotonic_clock::duration offset =
+                                       ::std::chrono::seconds(0)) override {
+    phased_loops_.emplace_back(
+        new SimulatedPhasedLoopHandler(scheduler_, callback, interval, offset));
+    return phased_loops_.back().get();
+  }
+
   void OnRun(::std::function<void()> on_run) override {
     scheduler_->Schedule(scheduler_->monotonic_now(), on_run);
   }
@@ -200,6 +256,7 @@
       *queues_;
   ::std::vector<std::string> taken_;
   ::std::vector<std::unique_ptr<TimerHandler>> timers_;
+  ::std::vector<std::unique_ptr<PhasedLoopHandler>> phased_loops_;
 };
 
 EventScheduler::Token EventScheduler::Schedule(
diff --git a/aos/events/simulated-event-loop_test.cc b/aos/events/simulated-event-loop_test.cc
index bcc6f28..be52243 100644
--- a/aos/events/simulated-event-loop_test.cc
+++ b/aos/events/simulated-event-loop_test.cc
@@ -18,6 +18,10 @@
 
   void Run() override { event_loop_factory_.Run(); }
 
+  // TODO(austin): Implement this.  It's used currently for a phased loop test.
+  // I'm not sure how much that matters.
+  void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
+
  private:
    SimulatedEventLoopFactory event_loop_factory_;
 };