Kill camera_frames queue

Use EventLoop instead.

Change-Id: I39352fecbddd677a32de1a0e77ac11947bc76c6f
diff --git a/y2019/vision/server/server.cc b/y2019/vision/server/server.cc
index 4d00cdf..44a7c6f 100644
--- a/y2019/vision/server/server.cc
+++ b/y2019/vision/server/server.cc
@@ -134,126 +134,148 @@
 }
 
 void DataThread(seasocks::Server *server, WebsocketHandler *websocket_handler) {
-  auto &camera_frames = y2019::control_loops::drivetrain::camera_frames;
-  auto &drivetrain_status = frc971::control_loops::drivetrain_queue.status;
-  auto &superstructure_status =
-      ::y2019::control_loops::superstructure::superstructure_queue.status;
+  ::aos::ShmEventLoop event_loop;
 
-  std::array<LocalCameraFrame, 5> latest_frames;
-  std::array<aos::monotonic_clock::time_point, 5> last_target_time;
+  ::aos::Fetcher<::frc971::control_loops::DrivetrainQueue::Status>
+      drivetrain_status_fetcher =
+          event_loop
+              .MakeFetcher<::frc971::control_loops::DrivetrainQueue::Status>(
+                  ".frc971.control_loops.drivetrain_queue.status");
+
+  ::aos::Fetcher<
+      ::y2019::control_loops::superstructure::SuperstructureQueue::Status>
+      superstructure_status_fetcher = event_loop.MakeFetcher<
+          ::y2019::control_loops::superstructure::SuperstructureQueue::Status>(
+          ".y2019.control_loops.superstructure.superstructure_queue.status");
+
+  ::std::array<LocalCameraFrame, 5> latest_frames;
+  ::std::array<aos::monotonic_clock::time_point, 5> last_target_time;
   last_target_time.fill(aos::monotonic_clock::epoch());
   aos::monotonic_clock::time_point last_send_time = aos::monotonic_clock::now();
   DebugData debug_data;
   ::aos::RingBuffer<DrivetrainPosition, 200> drivetrain_log;
 
-  while (true) {
-    camera_frames.FetchNextBlocking();
-    while (drivetrain_status.FetchNext()) {
-      DrivetrainPosition drivetrain_position{
-          drivetrain_status->sent_time, drivetrain_status->x,
-          drivetrain_status->y, drivetrain_status->theta};
+  event_loop.MakeWatcher(
+      ".y2019.control_loops.drivetrain.camera_frames",
+      [websocket_handler, server, &latest_frames, &last_target_time,
+       &drivetrain_status_fetcher, &superstructure_status_fetcher,
+       &last_send_time, &drivetrain_log,
+       &debug_data](const ::y2019::control_loops::drivetrain::CameraFrame
+                        &camera_frames) {
+        while (drivetrain_status_fetcher.FetchNext()) {
+          DrivetrainPosition drivetrain_position{
+              drivetrain_status_fetcher->sent_time,
+              drivetrain_status_fetcher->x, drivetrain_status_fetcher->y,
+              drivetrain_status_fetcher->theta};
 
-      drivetrain_log.Push(drivetrain_position);
-    }
-    superstructure_status.FetchLatest();
-    if (!drivetrain_status.get() || !superstructure_status.get()) {
-      // Try again if we don't have any drivetrain statuses.
-      continue;
-    }
-    const auto now = aos::monotonic_clock::now();
-
-    {
-      const auto &new_frame = *camera_frames;
-      // TODO(james): Maybe we only want to fill out a new frame if it has
-      // targets or the saved target is > 0.1 sec old? Not sure, but for now
-      if (new_frame.camera < latest_frames.size()) {
-        latest_frames[new_frame.camera].capture_time =
-            aos::monotonic_clock::time_point(
-                chrono::nanoseconds(new_frame.timestamp));
-        latest_frames[new_frame.camera].targets.clear();
-        if (new_frame.num_targets > 0) {
-          last_target_time[new_frame.camera] =
-              latest_frames[new_frame.camera].capture_time;
+          drivetrain_log.Push(drivetrain_position);
         }
-        for (int target = 0; target < new_frame.num_targets; ++target) {
-          latest_frames[new_frame.camera].targets.emplace_back();
-          const float heading = new_frame.targets[target].heading;
-          const float distance = new_frame.targets[target].distance;
-          latest_frames[new_frame.camera].targets.back().x =
-              ::std::cos(heading) * distance;
-          latest_frames[new_frame.camera].targets.back().y =
-              ::std::sin(heading) * distance;
-          latest_frames[new_frame.camera].targets.back().theta =
-              new_frame.targets[target].skew;
+        superstructure_status_fetcher.Fetch();
+        if (!drivetrain_status_fetcher.get() ||
+            !superstructure_status_fetcher.get()) {
+          // Try again if we don't have any drivetrain statuses.
+          return;
         }
-      }
-    }
+        const auto now = aos::monotonic_clock::now();
 
-    for (size_t ii = 0; ii < latest_frames.size(); ++ii) {
-      CameraDebug *camera_debug = debug_data.add_camera_debug();
-      LocalCameraFrame cur_frame = latest_frames[ii];
-      constants::Values::CameraCalibration camera_info =
-          constants::GetValues().cameras[ii];
-      frc971::control_loops::TypedPose<double> camera_pose = camera_info.pose;
+        {
+          const auto &new_frame = camera_frames;
+          // TODO(james): Maybe we only want to fill out a new frame if it has
+          // targets or the saved target is > 0.1 sec old? Not sure, but for now
+          if (new_frame.camera < latest_frames.size()) {
+            latest_frames[new_frame.camera].capture_time =
+                aos::monotonic_clock::time_point(
+                    chrono::nanoseconds(new_frame.timestamp));
+            latest_frames[new_frame.camera].targets.clear();
+            if (new_frame.num_targets > 0) {
+              last_target_time[new_frame.camera] =
+                  latest_frames[new_frame.camera].capture_time;
+            }
+            for (int target = 0; target < new_frame.num_targets; ++target) {
+              latest_frames[new_frame.camera].targets.emplace_back();
+              const float heading = new_frame.targets[target].heading;
+              const float distance = new_frame.targets[target].distance;
+              latest_frames[new_frame.camera].targets.back().x =
+                  ::std::cos(heading) * distance;
+              latest_frames[new_frame.camera].targets.back().y =
+                  ::std::sin(heading) * distance;
+              latest_frames[new_frame.camera].targets.back().theta =
+                  new_frame.targets[target].skew;
+            }
+          }
+        }
 
-      const DrivetrainPosition robot_position = ComputePosition(
-          drivetrain_log, cur_frame.capture_time);
-      const ::frc971::control_loops::TypedPose<double> robot_pose(
-          {robot_position.x, robot_position.y, 0}, robot_position.theta);
+        for (size_t ii = 0; ii < latest_frames.size(); ++ii) {
+          CameraDebug *camera_debug = debug_data.add_camera_debug();
+          LocalCameraFrame cur_frame = latest_frames[ii];
+          constants::Values::CameraCalibration camera_info =
+              constants::GetValues().cameras[ii];
+          frc971::control_loops::TypedPose<double> camera_pose =
+              camera_info.pose;
 
-      camera_pose.set_base(&robot_pose);
+          const DrivetrainPosition robot_position =
+              ComputePosition(drivetrain_log, cur_frame.capture_time);
+          const ::frc971::control_loops::TypedPose<double> robot_pose(
+              {robot_position.x, robot_position.y, 0}, robot_position.theta);
 
-      camera_debug->set_current_frame_age(
-          chrono::duration_cast<chrono::duration<double>>(
-              now - cur_frame.capture_time)
-              .count());
-      camera_debug->set_time_since_last_target(
-          chrono::duration_cast<chrono::duration<double>>(now -
-                                                          last_target_time[ii])
-              .count());
-      for (const auto &target : cur_frame.targets) {
-        frc971::control_loops::TypedPose<double> target_pose(
-            &camera_pose, {target.x, target.y, 0}, target.theta);
-        Pose *pose = camera_debug->add_targets();
-        pose->set_x(target_pose.abs_pos().x());
-        pose->set_y(target_pose.abs_pos().y());
-        pose->set_theta(target_pose.abs_theta());
-      }
-    }
+          camera_pose.set_base(&robot_pose);
 
-    if (now > last_send_time + chrono::milliseconds(100)) {
-      last_send_time = now;
-      debug_data.mutable_robot_pose()->set_x(drivetrain_status->x);
-      debug_data.mutable_robot_pose()->set_y(drivetrain_status->y);
-      debug_data.mutable_robot_pose()->set_theta(drivetrain_status->theta);
-      {
-        LineFollowDebug *line_debug = debug_data.mutable_line_follow_debug();
-        line_debug->set_frozen(drivetrain_status->line_follow_logging.frozen);
-        line_debug->set_have_target(
-            drivetrain_status->line_follow_logging.have_target);
-        line_debug->mutable_goal_target()->set_x(
-            drivetrain_status->line_follow_logging.x);
-        line_debug->mutable_goal_target()->set_y(
-            drivetrain_status->line_follow_logging.y);
-        line_debug->mutable_goal_target()->set_theta(
-            drivetrain_status->line_follow_logging.theta);
-      }
+          camera_debug->set_current_frame_age(
+              chrono::duration_cast<chrono::duration<double>>(
+                  now - cur_frame.capture_time)
+                  .count());
+          camera_debug->set_time_since_last_target(
+              chrono::duration_cast<chrono::duration<double>>(
+                  now - last_target_time[ii])
+                  .count());
+          for (const auto &target : cur_frame.targets) {
+            frc971::control_loops::TypedPose<double> target_pose(
+                &camera_pose, {target.x, target.y, 0}, target.theta);
+            Pose *pose = camera_debug->add_targets();
+            pose->set_x(target_pose.abs_pos().x());
+            pose->set_y(target_pose.abs_pos().y());
+            pose->set_theta(target_pose.abs_theta());
+          }
+        }
 
-      Sensors *sensors = debug_data.mutable_sensors();
-      sensors->set_wrist(superstructure_status->wrist.position);
-      sensors->set_elevator(superstructure_status->elevator.position);
-      sensors->set_intake(superstructure_status->intake.position);
-      sensors->set_stilts(superstructure_status->stilts.position);
-      sensors->set_has_piece(superstructure_status->has_piece);
+        if (now > last_send_time + chrono::milliseconds(100)) {
+          last_send_time = now;
+          debug_data.mutable_robot_pose()->set_x(drivetrain_status_fetcher->x);
+          debug_data.mutable_robot_pose()->set_y(drivetrain_status_fetcher->y);
+          debug_data.mutable_robot_pose()->set_theta(
+              drivetrain_status_fetcher->theta);
+          {
+            LineFollowDebug *line_debug =
+                debug_data.mutable_line_follow_debug();
+            line_debug->set_frozen(
+                drivetrain_status_fetcher->line_follow_logging.frozen);
+            line_debug->set_have_target(
+                drivetrain_status_fetcher->line_follow_logging.have_target);
+            line_debug->mutable_goal_target()->set_x(
+                drivetrain_status_fetcher->line_follow_logging.x);
+            line_debug->mutable_goal_target()->set_y(
+                drivetrain_status_fetcher->line_follow_logging.y);
+            line_debug->mutable_goal_target()->set_theta(
+                drivetrain_status_fetcher->line_follow_logging.theta);
+          }
 
-      ::std::string json;
-      google::protobuf::util::MessageToJsonString(debug_data, &json);
-      server->execute(
-          std::make_shared<UpdateData>(websocket_handler, ::std::move(json)));
+          Sensors *sensors = debug_data.mutable_sensors();
+          sensors->set_wrist(superstructure_status_fetcher->wrist.position);
+          sensors->set_elevator(
+              superstructure_status_fetcher->elevator.position);
+          sensors->set_intake(superstructure_status_fetcher->intake.position);
+          sensors->set_stilts(superstructure_status_fetcher->stilts.position);
+          sensors->set_has_piece(superstructure_status_fetcher->has_piece);
 
-      debug_data.Clear();
-    }
-  }
+          ::std::string json;
+          google::protobuf::util::MessageToJsonString(debug_data, &json);
+          server->execute(std::make_shared<UpdateData>(websocket_handler,
+                                                       ::std::move(json)));
+
+          debug_data.Clear();
+        }
+      });
+  event_loop.Run();
 }
 
 }  // namespace vision