Add TimerHandler to event loop

Change-Id: I85c9142bcff9bf2cc5b8d003d24b8d77567fbe4a
diff --git a/aos/common/time.cc b/aos/common/time.cc
index e8bd25b..0fe3ee3 100644
--- a/aos/common/time.cc
+++ b/aos/common/time.cc
@@ -98,6 +98,22 @@
       chrono::duration_cast<chrono::nanoseconds>(offset).count();
 }
 
+struct timespec to_timespec(
+    const ::aos::monotonic_clock::duration duration) {
+  struct timespec time_timespec;
+  ::std::chrono::seconds sec =
+      ::std::chrono::duration_cast<::std::chrono::seconds>(duration);
+  ::std::chrono::nanoseconds nsec =
+      ::std::chrono::duration_cast<::std::chrono::nanoseconds>(duration - sec);
+  time_timespec.tv_sec = sec.count();
+  time_timespec.tv_nsec = nsec.count();
+  return time_timespec;
+}
+
+struct timespec to_timespec(
+    const ::aos::monotonic_clock::time_point time) {
+  return to_timespec(time.time_since_epoch());
+}
 }  // namespace time
 
 constexpr monotonic_clock::time_point monotonic_clock::min_time;
diff --git a/aos/common/time.h b/aos/common/time.h
index e541932..fe0bd5f 100644
--- a/aos/common/time.h
+++ b/aos/common/time.h
@@ -78,6 +78,13 @@
   DISALLOW_COPY_AND_ASSIGN(TimeFreezer);
 };
 
+// Converts a monotonic_clock::duration into a timespec object.
+struct timespec to_timespec(::aos::monotonic_clock::duration duration);
+
+// Converts a monotonic_clock::time_point into a timespec object as time since
+// epoch.
+struct timespec to_timespec(::aos::monotonic_clock::time_point time);
+
 }  // namespace time
 }  // namespace aos
 
diff --git a/aos/common/time_test.cc b/aos/common/time_test.cc
index 94102a2..14e049e 100644
--- a/aos/common/time_test.cc
+++ b/aos/common/time_test.cc
@@ -11,7 +11,6 @@
 namespace time {
 namespace testing {
 
-
 TEST(TimeTest, FromRate) {
   EXPECT_EQ(::std::chrono::milliseconds(10), FromRate(100));
 }
@@ -26,6 +25,30 @@
   EXPECT_LT(end - start, kSleepTime + ::std::chrono::milliseconds(200));
 }
 
+// Test to_timespec for a duration.
+TEST(TimeTest, DurationToTimespec) {
+  struct timespec pos_time = to_timespec(::std::chrono::milliseconds(56262));
+  EXPECT_EQ(pos_time.tv_sec, 56);
+  EXPECT_EQ(pos_time.tv_nsec, 262000000);
+
+  struct timespec neg_time = to_timespec(::std::chrono::milliseconds(-56262));
+  EXPECT_EQ(neg_time.tv_sec, -56);
+  EXPECT_EQ(neg_time.tv_nsec, -262000000);
+}
+
+// Test to_timespec for a time_point.
+TEST(TimeTest, TimePointToTimespec) {
+  struct timespec pos_time = to_timespec(::aos::monotonic_clock::epoch() +
+                                     ::std::chrono::seconds(1432423));
+  EXPECT_EQ(pos_time.tv_sec, 1432423);
+  EXPECT_EQ(pos_time.tv_nsec, 0);
+
+  struct timespec neg_time = to_timespec(::aos::monotonic_clock::epoch() -
+                                     ::std::chrono::seconds(1432423));
+  EXPECT_EQ(neg_time.tv_sec, -1432423);
+  EXPECT_EQ(neg_time.tv_nsec, 0);
+}
+
 }  // namespace testing
 }  // namespace time
 }  // namespace aos
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 80495fe..007154c 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -24,9 +24,9 @@
   hdrs = ["shm-event-loop.h"],
   srcs = ["shm-event-loop.cc"],
   deps = [
-    "//aos/common:queues",
-    "//aos/vision/events:intrusive_free_list",
     ":event-loop",
+    "//aos/common:queues",
+    "//aos/common/logging:logging",
   ],
 )
 
diff --git a/aos/events/event-loop_param_test.cc b/aos/events/event-loop_param_test.cc
index 6f04a7d..78e744a 100644
--- a/aos/events/event-loop_param_test.cc
+++ b/aos/events/event-loop_param_test.cc
@@ -13,6 +13,13 @@
   TestMessage() { Zero(); }
 };
 
+// Ends the given event loop at the given time from now.
+void EndEventLoop(EventLoop *loop, ::std::chrono::milliseconds duration) {
+  auto end_timer = loop->AddTimer([loop]() { loop->Exit(); });
+  end_timer->Setup(loop->monotonic_now() +
+                   ::std::chrono::milliseconds(duration));
+}
+
 // Tests that watcher and fetcher can fetch from a sender.
 // Also tests that OnRun() works.
 TEST_P(AbstractEventLoopTest, Basic) {
@@ -88,5 +95,68 @@
   loop->Run();
 }
 
+// Verify that timer intervals and duration function properly.
+TEST_P(AbstractEventLoopTest, TimerIntervalAndDuration) {
+  auto loop = Make();
+  ::std::vector<::aos::monotonic_clock::time_point> iteration_list;
+
+  auto test_timer = loop->AddTimer([&iteration_list, &loop]() {
+    iteration_list.push_back(loop->monotonic_now());
+  });
+
+  test_timer->Setup(loop->monotonic_now(), ::std::chrono::milliseconds(20));
+  EndEventLoop(loop.get(), ::std::chrono::milliseconds(150));
+  // Testing that the timer thread waits for the event loop to start before
+  // running
+  ::std::this_thread::sleep_for(std::chrono::milliseconds(2));
+  loop->Run();
+
+  EXPECT_EQ(iteration_list.size(), 8);
+}
+
+// Verify that we can change a timer's parameters during execution.
+TEST_P(AbstractEventLoopTest, TimerChangeParameters) {
+  auto loop = Make();
+  ::std::vector<::aos::monotonic_clock::time_point> iteration_list;
+
+  auto test_timer = loop->AddTimer([&iteration_list, &loop]() {
+    iteration_list.push_back(loop->monotonic_now());
+  });
+
+  auto modifier_timer = loop->AddTimer([&loop, &test_timer]() {
+    test_timer->Setup(loop->monotonic_now(), ::std::chrono::milliseconds(30));
+  });
+
+
+  test_timer->Setup(loop->monotonic_now(), ::std::chrono::milliseconds(20));
+  modifier_timer->Setup(loop->monotonic_now() +
+                        ::std::chrono::milliseconds(45));
+  EndEventLoop(loop.get(), ::std::chrono::milliseconds(150));
+  loop->Run();
+
+  EXPECT_EQ(iteration_list.size(), 7);
+}
+
+// Verify that we can disable a timer during execution.
+TEST_P(AbstractEventLoopTest, TimerDisable) {
+  auto loop = Make();
+  ::std::vector<::aos::monotonic_clock::time_point> iteration_list;
+
+  auto test_timer = loop->AddTimer([&iteration_list, &loop]() {
+    iteration_list.push_back(loop->monotonic_now());
+  });
+
+  auto ender_timer = loop->AddTimer([&test_timer]() {
+    test_timer->Disable();
+  });
+
+  test_timer->Setup(loop->monotonic_now(), ::std::chrono::milliseconds(20));
+  ender_timer->Setup(loop->monotonic_now() +
+                        ::std::chrono::milliseconds(45));
+  EndEventLoop(loop.get(), ::std::chrono::milliseconds(150));
+  loop->Run();
+
+  EXPECT_EQ(iteration_list.size(), 3);
+}
 }  // namespace testing
 }  // namespace aos
diff --git a/aos/events/event-loop_param_test.h b/aos/events/event-loop_param_test.h
index ae215db..a4e4b8e 100644
--- a/aos/events/event-loop_param_test.h
+++ b/aos/events/event-loop_param_test.h
@@ -1,6 +1,8 @@
 #ifndef _AOS_EVENTS_EVENT_LOOP_PARAM_TEST_H_
 #define _AOS_EVENTS_EVENT_LOOP_PARAM_TEST_H_
 
+#include <vector>
+
 #include "aos/events/event-loop.h"
 #include "gtest/gtest.h"
 
diff --git a/aos/events/raw-event-loop.h b/aos/events/raw-event-loop.h
index 38ceaec..5a0df1e 100644
--- a/aos/events/raw-event-loop.h
+++ b/aos/events/raw-event-loop.h
@@ -85,6 +85,21 @@
   }
 };
 
+// Interface for timers
+class TimerHandler {
+ public:
+  virtual ~TimerHandler() {}
+
+  // Timer should sleep until base, base + offset, base + offset * 2, ...
+  // If repeat_offset isn't set, the timer only expires once.
+  virtual void Setup(monotonic_clock::time_point base,
+                     monotonic_clock::duration repeat_offset =
+                         ::aos::monotonic_clock::zero()) = 0;
+
+  // Stop future calls to callback().
+  virtual void Disable() = 0;
+};
+
 // Virtual base class for all event queue-types.
 class RawEventLoop {
  public:
@@ -99,6 +114,10 @@
 
   bool is_running() const { return is_running_.load(); }
 
+  // Creates a timer that executes callback when the timer expires
+  // Returns a TimerHandle for configuration of the timer
+  virtual TimerHandler *AddTimer(::std::function<void()> callback) = 0;
+
   // Starts receiving events.
   virtual void Run() = 0;
 
diff --git a/aos/events/shm-event-loop.cc b/aos/events/shm-event-loop.cc
index d227a48..832b27b 100644
--- a/aos/events/shm-event-loop.cc
+++ b/aos/events/shm-event-loop.cc
@@ -1,6 +1,8 @@
 #include "aos/events/shm-event-loop.h"
+#include "aos/common/logging/logging.h"
 #include "aos/common/queue.h"
 
+#include <sys/timerfd.h>
 #include <atomic>
 #include <chrono>
 #include <stdexcept>
@@ -91,7 +93,7 @@
       }
 
       {
-        MutexLocker locker2(&thread_state_->mutex_);
+        MutexLocker locker(&thread_state_->mutex_);
         if (!thread_state_->is_running()) break;
 
         watcher_(reinterpret_cast<const Message *>(msg));
@@ -110,6 +112,61 @@
   RawQueue *queue_;
   std::function<void(const Message *message)> watcher_;
 };
+
+class TimerHandlerState : public TimerHandler {
+ public:
+  TimerHandlerState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
+                    ::std::function<void()> fn)
+      : thread_state_(std::move(thread_state)), fn_(::std::move(fn)) {
+    fd_ = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
+    PCHECK(fd_ != -1);
+  }
+
+  ~TimerHandlerState() {
+    PCHECK(close(fd_) == 0);
+  }
+
+  void Setup(monotonic_clock::time_point base,
+             monotonic_clock::duration repeat_offset) override {
+    struct itimerspec new_value;
+    new_value.it_interval = ::aos::time::to_timespec(repeat_offset);
+    new_value.it_value = ::aos::time::to_timespec(base);
+    PCHECK(timerfd_settime(fd_, TFD_TIMER_ABSTIME, &new_value, nullptr) == 0);
+  }
+
+  void Disable() override {
+    // Disarm the timer by feeding zero values
+    Setup(::aos::monotonic_clock::epoch(), ::aos::monotonic_clock::zero());
+  }
+
+  void Run() {
+    thread_state_->WaitForStart();
+
+    while (true) {
+      uint64_t buf;
+      ssize_t result = read(fd_, &buf, sizeof(buf));
+      PCHECK(result != -1);
+      CHECK_EQ(result, static_cast<int>(sizeof(buf)));
+
+      {
+        MutexLocker locker(&thread_state_->mutex_);
+        if (!thread_state_->is_running()) break;
+        fn_();
+        // fn_ may have exited the event loop.
+        if (!thread_state_->is_running()) break;
+      }
+    }
+  }
+
+ private:
+  std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
+
+  // File descriptor for the timer
+  int fd_;
+
+  // Function to be run on the thread
+  ::std::function<void()> fn_;
+};
 }  // namespace internal
 
 std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
@@ -141,6 +198,19 @@
   thread.detach();
 }
 
+TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
+  internal::TimerHandlerState *timer =
+      new internal::TimerHandlerState(thread_state_, ::std::move(callback));
+
+  ::std::thread t([timer] {
+    timer->Run();
+    delete timer;
+  });
+  t.detach();
+
+  return timer;
+}
+
 void ShmEventLoop::OnRun(std::function<void()> on_run) {
   on_run_.push_back(std::move(on_run));
 }
diff --git a/aos/events/shm-event-loop.h b/aos/events/shm-event-loop.h
index 9ee7926..e40ccce 100644
--- a/aos/events/shm-event-loop.h
+++ b/aos/events/shm-event-loop.h
@@ -8,6 +8,7 @@
 namespace internal {
 
 class WatcherThreadState;
+class TimerHandlerState;
 
 }  // namespace internal
 
@@ -31,12 +32,15 @@
       const std::string &path, const QueueTypeInfo &type,
       std::function<void(const aos::Message *message)> watcher) override;
 
+  TimerHandler *AddTimer(::std::function<void()> callback) override;
+
   void OnRun(std::function<void()> on_run) override;
   void Run() override;
   void Exit() override;
 
  private:
   friend class internal::WatcherThreadState;
+  friend class internal::TimerHandlerState;
   // This ThreadState ensures that two watchers in the same loop cannot be
   // triggered concurrently.  Because watchers block threads indefinitely, this
   // has to be shared_ptr in case the EventLoop is destroyed before the thread
@@ -53,6 +57,7 @@
 
    private:
     friend class internal::WatcherThreadState;
+    friend class internal::TimerHandlerState;
     friend class ShmEventLoop;
 
     // This mutex ensures that only one watch event happens at a time.