Convert ControlLoop over to using EventLoop

Next step is to pass in the event loop and flip over to using it in
tests and at runtime.

Change-Id: Ia3b8f099e4eacc0ffa8fbfe00d651f3684a8c783
diff --git a/aos/controls/BUILD b/aos/controls/BUILD
index d597e19..8c44823 100644
--- a/aos/controls/BUILD
+++ b/aos/controls/BUILD
@@ -91,6 +91,8 @@
     deps = [
         ":control_loop_queues",
         "//aos:queues",
+        "//aos/events:event-loop",
+        "//aos/events:shm-event-loop",
         "//aos/logging",
         "//aos/logging:queue_logging",
         "//aos/robot_state",
diff --git a/aos/controls/control_loop-tmpl.h b/aos/controls/control_loop-tmpl.h
index 1834728..6953575 100644
--- a/aos/controls/control_loop-tmpl.h
+++ b/aos/controls/control_loop-tmpl.h
@@ -17,26 +17,35 @@
 
 template <class T>
 void ControlLoop<T>::ZeroOutputs() {
-  aos::ScopedMessagePtr<OutputType> output =
-      control_loop_->output.MakeMessage();
+  typename ::aos::Sender<OutputType>::Message output =
+      output_sender_.MakeMessage();
   Zero(output.get());
   output.Send();
 }
 
 template <class T>
 void ControlLoop<T>::Iterate() {
+  control_loop_->position.FetchAnother();
+  IteratePosition(*control_loop_->position.get());
+}
+
+template <class T>
+void ControlLoop<T>::IteratePosition(const PositionType &position) {
+  // Since Exit() isn't async safe, we want to call Exit from the periodic
+  // handler.
+  if (!run_) {
+    event_loop_->Exit();
+  }
   no_goal_.Print();
   no_sensor_state_.Print();
   motors_off_log_.Print();
 
-  control_loop_->position.FetchAnother();
-  const PositionType *const position = control_loop_->position.get();
-  LOG_STRUCT(DEBUG, "position", *position);
+  LOG_STRUCT(DEBUG, "position", position);
 
   // Fetch the latest control loop goal. If there is no new
   // goal, we will just reuse the old one.
-  control_loop_->goal.FetchLatest();
-  const GoalType *goal = control_loop_->goal.get();
+  goal_fetcher_.Fetch();
+  const GoalType *goal = goal_fetcher_.get();
   if (goal) {
     LOG_STRUCT(DEBUG, "goal", *goal);
   } else {
@@ -77,23 +86,23 @@
     outputs_enabled = false;
   }
 
-  aos::ScopedMessagePtr<StatusType> status =
-      control_loop_->status.MakeMessage();
+  typename ::aos::Sender<StatusType>::Message status =
+      status_sender_.MakeMessage();
   if (status.get() == nullptr) {
     return;
   }
 
   if (outputs_enabled) {
-    aos::ScopedMessagePtr<OutputType> output =
-        control_loop_->output.MakeMessage();
-    RunIteration(goal, position, output.get(), status.get());
+    typename ::aos::Sender<OutputType>::Message output =
+        output_sender_.MakeMessage();
+    RunIteration(goal, &position, output.get(), status.get());
 
     output->SetTimeToNow();
     LOG_STRUCT(DEBUG, "output", *output);
     output.Send();
   } else {
     // The outputs are disabled, so pass nullptr in for the output.
-    RunIteration(goal, position, nullptr, status.get());
+    RunIteration(goal, &position, nullptr, status.get());
     ZeroOutputs();
   }
 
@@ -113,9 +122,12 @@
   PCHECK(sigaction(SIGQUIT, &action, nullptr));
   PCHECK(sigaction(SIGINT, &action, nullptr));
 
-  while (run_) {
-    Iterate();
-  }
+  event_loop_->MakeWatcher(::std::string(control_loop_->name()) + ".position",
+                           [this](const PositionType &position) {
+                             this->IteratePosition(position);
+                           });
+
+  event_loop_->Run();
   LOG(INFO, "Shutting down\n");
 }
 
diff --git a/aos/controls/control_loop.h b/aos/controls/control_loop.h
index 335a1bb..0b6e391 100644
--- a/aos/controls/control_loop.h
+++ b/aos/controls/control_loop.h
@@ -4,6 +4,8 @@
 #include <string.h>
 #include <atomic>
 
+#include "aos/events/event-loop.h"
+#include "aos/events/shm-event-loop.h"
 #include "aos/queue.h"
 #include "aos/time/time.h"
 #include "aos/type_traits/type_traits.h"
@@ -49,7 +51,15 @@
     decltype(*(static_cast<T *>(NULL)->output.MakeMessage().get()))>::type
       OutputType;
 
-  ControlLoop(T *control_loop) : control_loop_(control_loop) {}
+  ControlLoop(T *control_loop)
+      : event_loop_(new ::aos::ShmEventLoop()), control_loop_(control_loop) {
+    output_sender_ = event_loop_->MakeSender<OutputType>(
+        ::std::string(control_loop_->name()) + ".output");
+    status_sender_ = event_loop_->MakeSender<StatusType>(
+        ::std::string(control_loop_->name()) + ".status");
+    goal_fetcher_ = event_loop_->MakeFetcher<GoalType>(
+        ::std::string(control_loop_->name()) + ".goal");
+  }
 
   // Returns true if all the counters etc in the sensor data have been reset.
   // This will return true only a single time per reset.
@@ -79,6 +89,8 @@
   void Iterate() override;
 
  protected:
+  void IteratePosition(const PositionType &position);
+
   static void Quit(int /*signum*/) {
     run_ = false;
   }
@@ -108,8 +120,13 @@
       ::std::chrono::milliseconds(100);
 
   // Pointer to the queue group
+  ::std::unique_ptr<EventLoop> event_loop_;
   T *control_loop_;
 
+  ::aos::Sender<OutputType> output_sender_;
+  ::aos::Sender<StatusType> status_sender_;
+  ::aos::Fetcher<GoalType> goal_fetcher_;
+
   bool reset_ = false;
   int32_t sensor_reader_pid_ = 0;
 
diff --git a/aos/events/BUILD b/aos/events/BUILD
index ad888f6..a5e83a9 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -1,74 +1,77 @@
-
 cc_library(
-  name = "raw-event-loop",
-  hdrs = ["raw-event-loop.h"],
-  deps = [
-    "//aos/time:time",
-    "//aos:queues",
-  ],
+    name = "raw-event-loop",
+    hdrs = ["raw-event-loop.h"],
+    deps = [
+        "//aos:queues",
+        "//aos/time",
+    ],
 )
 
 cc_library(
-  name = "event-loop",
-  hdrs = ["event-loop.h", "raw-event-loop.h"],
-  srcs = ["event-loop-tmpl.h"],
-  deps = [
-    ":raw-event-loop",
-    "//aos/time:time",
-    "//aos:queues",
-  ],
+    name = "event-loop",
+    srcs = ["event-loop-tmpl.h"],
+    hdrs = [
+        "event-loop.h",
+        "raw-event-loop.h",
+    ],
+    visibility = ["//visibility:public"],
+    deps = [
+        ":raw-event-loop",
+        "//aos:queues",
+        "//aos/time",
+    ],
 )
 
 cc_library(
-  name = "shm-event-loop",
-  hdrs = ["shm-event-loop.h"],
-  srcs = ["shm-event-loop.cc"],
-  deps = [
-    ":event-loop",
-    "//aos:queues",
-    "//aos/logging:logging",
-  ],
+    name = "shm-event-loop",
+    srcs = ["shm-event-loop.cc"],
+    hdrs = ["shm-event-loop.h"],
+    visibility = ["//visibility:public"],
+    deps = [
+        ":event-loop",
+        "//aos:queues",
+        "//aos/logging",
+    ],
 )
 
 cc_test(
-  name = "shm-event-loop_test",
-  srcs = ["shm-event-loop_test.cc"],
-  deps = [
-    ":event-loop_param_test",
-    ":shm-event-loop",
-    "//aos/testing:test_shm",
-  ],
+    name = "shm-event-loop_test",
+    srcs = ["shm-event-loop_test.cc"],
+    deps = [
+        ":event-loop_param_test",
+        ":shm-event-loop",
+        "//aos/testing:test_shm",
+    ],
 )
 
 cc_library(
-  name = "event-loop_param_test",
-  srcs = ["event-loop_param_test.cc"],
-  hdrs = ["event-loop_param_test.h"],
-  deps = [
-    "event-loop",
-    "//aos/testing:googletest",
-  ],
-  testonly = True,
+    name = "event-loop_param_test",
+    testonly = True,
+    srcs = ["event-loop_param_test.cc"],
+    hdrs = ["event-loop_param_test.h"],
+    deps = [
+        "event-loop",
+        "//aos/testing:googletest",
+    ],
 )
 
 cc_test(
-  name = "simulated-event-loop_test",
-  srcs = ["simulated-event-loop_test.cc"],
-  deps = [
-    "//aos/testing:googletest",
-    ":event-loop_param_test",
-    ":simulated-event-loop",
-  ],
-  testonly = True,
+    name = "simulated-event-loop_test",
+    testonly = True,
+    srcs = ["simulated-event-loop_test.cc"],
+    deps = [
+        ":event-loop_param_test",
+        ":simulated-event-loop",
+        "//aos/testing:googletest",
+    ],
 )
 
 cc_library(
-  name = "simulated-event-loop",
-  hdrs = ["simulated-event-loop.h"],
-  srcs = ["simulated-event-loop.cc"],
-  deps = [
-    ":event-loop",
-    "//aos:queues",
-  ],
+    name = "simulated-event-loop",
+    srcs = ["simulated-event-loop.cc"],
+    hdrs = ["simulated-event-loop.h"],
+    deps = [
+        ":event-loop",
+        "//aos:queues",
+    ],
 )
-
diff --git a/aos/events/event-loop.h b/aos/events/event-loop.h
index 57ff49b..0b336c3 100644
--- a/aos/events/event-loop.h
+++ b/aos/events/event-loop.h
@@ -107,6 +107,7 @@
   //
   // This will watch messages sent to path.
   // Note that T needs to match both send and recv side.
+  // TODO(parker): Need to support ::std::bind.  For now, use lambdas.
   template <typename Watch>
   void MakeWatcher(const std::string &path, Watch &&w);
 
diff --git a/aos/events/shm-event-loop.cc b/aos/events/shm-event-loop.cc
index 6231df2..b05aa84 100644
--- a/aos/events/shm-event-loop.cc
+++ b/aos/events/shm-event-loop.cc
@@ -218,6 +218,8 @@
 void ShmEventLoop::Run() {
   set_is_running(true);
   for (const auto &run : on_run_) run();
+  // TODO(austin): epoll event loop in main thread (if needed), and async safe
+  // quit handler.
   thread_state_->Run();
 }
 
diff --git a/y2014/control_loops/shooter/shooter_lib_test.cc b/y2014/control_loops/shooter/shooter_lib_test.cc
index 108502b..a5ac55c 100644
--- a/y2014/control_loops/shooter/shooter_lib_test.cc
+++ b/y2014/control_loops/shooter/shooter_lib_test.cc
@@ -359,6 +359,7 @@
   }
   // EXPECT_NEAR(0.0, shooter_motor_.GetPosition(), 0.01);
   double pos = shooter_motor_plant_.GetAbsolutePosition();
+  EXPECT_TRUE(shooter_queue_.goal.FetchLatest());
   EXPECT_NEAR(
       shooter_motor_.PowerToPosition(shooter_queue_.goal->shot_power),
       pos, 0.05);
@@ -400,6 +401,7 @@
   }
 
   double pos = shooter_motor_plant_.GetAbsolutePosition();
+  EXPECT_TRUE(shooter_queue_.goal.FetchLatest());
   EXPECT_NEAR(
       shooter_motor_.PowerToPosition(shooter_queue_.goal->shot_power),
       pos, 0.05);
@@ -438,7 +440,9 @@
   }
 
   double pos = shooter_motor_plant_.GetAbsolutePosition();
-  EXPECT_NEAR(shooter_motor_.PowerToPosition(shooter_queue_.goal->shot_power), pos, 0.05);
+  EXPECT_TRUE(shooter_queue_.goal.FetchLatest());
+  EXPECT_NEAR(shooter_motor_.PowerToPosition(shooter_queue_.goal->shot_power),
+              pos, 0.05);
   EXPECT_EQ(ShooterMotor::STATE_READY, shooter_motor_.state());
   EXPECT_TRUE(hit_fire);
 }
@@ -472,8 +476,8 @@
   EXPECT_EQ(ShooterMotor::STATE_READY, shooter_motor_.state());
   shooter_queue_.goal.MakeWithBuilder().shot_power(14.0).Send();
 
-  while (::aos::monotonic_clock::now() <
-         ::aos::monotonic_clock::time_point(chrono::seconds(1))) {
+  while (monotonic_clock::now() <
+         monotonic_clock::time_point(chrono::seconds(2))) {
     shooter_motor_plant_.SendPositionMessage();
     shooter_motor_.Iterate();
     shooter_motor_plant_.Simulate();
@@ -481,6 +485,7 @@
   }
 
   double pos = shooter_motor_plant_.GetAbsolutePosition();
+  EXPECT_TRUE(shooter_queue_.goal.FetchLatest());
   EXPECT_NEAR(
       shooter_motor_.PowerToPosition(shooter_queue_.goal->shot_power),
       pos, 0.05);
@@ -650,6 +655,7 @@
   }
   // EXPECT_NEAR(0.0, shooter_motor_.GetPosition(), 0.01);
   double pos = shooter_motor_plant_.GetAbsolutePosition();
+  EXPECT_TRUE(shooter_queue_.goal.FetchLatest());
   EXPECT_NEAR(
       shooter_motor_.PowerToPosition(shooter_queue_.goal->shot_power),
       pos, 0.05);
@@ -671,6 +677,7 @@
   }
   // EXPECT_NEAR(0.0, shooter_motor_.GetPosition(), 0.01);
   double pos = shooter_motor_plant_.GetAbsolutePosition();
+  EXPECT_TRUE(shooter_queue_.goal.FetchLatest());
   EXPECT_NEAR(
       shooter_motor_.PowerToPosition(shooter_queue_.goal->shot_power),
       pos, 0.05);
@@ -702,6 +709,7 @@
   }
   // EXPECT_NEAR(0.0, shooter_motor_.GetPosition(), 0.01);
   double pos = shooter_motor_plant_.GetAbsolutePosition();
+  EXPECT_TRUE(shooter_queue_.goal.FetchLatest());
   EXPECT_NEAR(
       shooter_motor_.PowerToPosition(shooter_queue_.goal->shot_power),
       pos, 0.05);
diff --git a/y2018/control_loops/superstructure/superstructure_lib_test.cc b/y2018/control_loops/superstructure/superstructure_lib_test.cc
index 0856a7d..0ad4d16 100644
--- a/y2018/control_loops/superstructure/superstructure_lib_test.cc
+++ b/y2018/control_loops/superstructure/superstructure_lib_test.cc
@@ -198,9 +198,9 @@
              constants::GetValues().arm_distal.zeroing),
         superstructure_queue_(".y2018.control_loops.superstructure",
                               ".y2018.control_loops.superstructure.goal",
-                              ".y2018.control_loops.superstructure.position",
                               ".y2018.control_loops.superstructure.output",
-                              ".y2018.control_loops.superstructure.status") {
+                              ".y2018.control_loops.superstructure.status",
+                              ".y2018.control_loops.superstructure.position") {
     // Start the intake out in the middle by default.
     InitializeIntakePosition((constants::Values::kIntakeRange().lower +
                               constants::Values::kIntakeRange().upper) /
@@ -245,8 +245,8 @@
 
   // Simulates the intake for a single timestep.
   void Simulate() {
-    EXPECT_TRUE(superstructure_queue_.output.FetchLatest());
-    EXPECT_TRUE(superstructure_queue_.status.FetchLatest());
+    ASSERT_TRUE(superstructure_queue_.output.FetchLatest());
+    ASSERT_TRUE(superstructure_queue_.status.FetchLatest());
 
     left_intake_.Simulate(superstructure_queue_.output->left_intake);
     right_intake_.Simulate(superstructure_queue_.output->right_intake);
@@ -270,9 +270,9 @@
   SuperstructureTest()
       : superstructure_queue_(".y2018.control_loops.superstructure",
                               ".y2018.control_loops.superstructure.goal",
-                              ".y2018.control_loops.superstructure.position",
                               ".y2018.control_loops.superstructure.output",
-                              ".y2018.control_loops.superstructure.status"),
+                              ".y2018.control_loops.superstructure.status",
+                              ".y2018.control_loops.superstructure.position"),
         superstructure_(&superstructure_queue_) {
     status_light.Clear();
     ::y2018::vision::vision_status.Clear();