Defer requeuing buffers in v4l2 reader

Enqueuing buffers takes a long time, so we want to move it out of the
way of images being sent.

Requeue the buffers in a worker thread that's running at a lower
priority.

Signed-off-by: Ravago Jones <ravagojones@gmail.com>
Change-Id: I3887bca9099916b1864ff88893b940d6a31f7edf
diff --git a/aos/util/BUILD b/aos/util/BUILD
index 48dc054..929b376 100644
--- a/aos/util/BUILD
+++ b/aos/util/BUILD
@@ -466,6 +466,28 @@
 )
 
 cc_library(
+    name = "threaded_consumer",
+    hdrs = [
+        "threaded_consumer.h",
+    ],
+    deps = [
+        "//aos:condition",
+        "//aos:realtime",
+        "//aos/containers:ring_buffer",
+        "//aos/mutex",
+    ],
+)
+
+cc_test(
+    name = "threaded_consumer_test",
+    srcs = ["threaded_consumer_test.cc"],
+    deps = [
+        ":threaded_consumer",
+        "//aos/testing:googletest",
+    ],
+)
+
+cc_library(
     name = "foxglove_websocket_lib",
     srcs = ["foxglove_websocket_lib.cc"],
     hdrs = ["foxglove_websocket_lib.h"],
diff --git a/aos/util/threaded_consumer.h b/aos/util/threaded_consumer.h
new file mode 100644
index 0000000..95ec79a
--- /dev/null
+++ b/aos/util/threaded_consumer.h
@@ -0,0 +1,102 @@
+#ifndef AOS_UTIL_THREADED_CONSUMER_H_
+#define AOS_UTIL_THREADED_CONSUMER_H_
+
+#include <functional>
+#include <optional>
+#include <thread>
+
+#include "aos/condition.h"
+#include "aos/containers/ring_buffer.h"
+#include "aos/mutex/mutex.h"
+#include "aos/realtime.h"
+
+namespace aos {
+namespace util {
+
+// This class implements a threadpool of a single worker that accepts work
+// from the main thread through a queue and executes it at a different realtime
+// priority.
+//
+// There is no mechanism to get data back to the main thread, the worker only
+// acts as a consumer. When this class is destroyed, it join()s the worker and
+// finishes all outstanding tasks.
+template <typename T, int QueueSize>
+class ThreadedConsumer {
+ public:
+  // Constructs a new ThreadedConsumer with the given consumer function to be
+  // run at the given realtime priority. If worker_priority is zero, the thread
+  // will stay at non realtime priority.
+  ThreadedConsumer(std::function<void(T)> consumer_function,
+                   int worker_priority)
+      : consumer_function_(consumer_function),
+        worker_priority_(worker_priority),
+        more_tasks_(&mutex_),
+        worker_thread_([this]() { WorkerFunction(); }) {}
+
+  ~ThreadedConsumer() {
+    {
+      aos::MutexLocker locker(&mutex_);
+      quit_ = true;
+      more_tasks_.Broadcast();
+    }
+    worker_thread_.join();
+  }
+
+  // Submits another task to be processed by the worker.
+  // Returns true if successfully pushed onto the queue, and false if the queue
+  // is full.
+  bool Push(T task) {
+    aos::MutexLocker locker(&mutex_);
+
+    if (task_queue_.full()) {
+      return false;
+    }
+
+    task_queue_.Push(task);
+    more_tasks_.Broadcast();
+
+    return true;
+  }
+
+ private:
+  void WorkerFunction() {
+    if (worker_priority_ > 0) {
+      aos::SetCurrentThreadRealtimePriority(worker_priority_);
+    }
+
+    while (true) {
+      std::optional<T> task;
+
+      {
+        aos::MutexLocker locker(&mutex_);
+        while (task_queue_.empty() && !quit_) {
+          CHECK(!more_tasks_.Wait());
+        }
+
+        if (task_queue_.empty() && quit_) break;
+
+        // Pop
+        task = std::move(task_queue_[0]);
+        task_queue_.Shift();
+      }
+
+      consumer_function_(*task);
+      task.reset();
+    }
+
+    aos::UnsetCurrentThreadRealtimePriority();
+  }
+
+  std::function<void(T)> consumer_function_;
+  aos::RingBuffer<T, QueueSize> task_queue_;
+  aos::Mutex mutex_;
+  bool quit_ = false;
+  int worker_priority_;
+  aos::Condition more_tasks_;
+  std::thread worker_thread_;
+};
+
+}  // namespace util
+}  // namespace aos
+
+#endif  // AOS_UTIL_THREADWORKER_H_
diff --git a/aos/util/threaded_consumer_test.cc b/aos/util/threaded_consumer_test.cc
new file mode 100644
index 0000000..f137108
--- /dev/null
+++ b/aos/util/threaded_consumer_test.cc
@@ -0,0 +1,144 @@
+#include "aos/util/threaded_consumer.h"
+
+#include "gtest/gtest.h"
+
+namespace aos {
+namespace util {
+
+// We expect it to be able to pass through everything we submit and recieves it
+// in the order that we submitted it. It should also be able to take in more
+// tasks than the size of the ring buffer as long as the worker doesn't get
+// behind.
+TEST(ThreadedConsumerTest, BasicFunction) {
+  std::atomic<int> counter{0};
+
+  ThreadedConsumer<int, 4> threaded_consumer(
+      [&counter](int task) {
+        LOG(INFO) << "task:" << task << " counter: " << counter;
+        counter = task;
+      },
+      0);
+
+  for (int number : {9, 7, 1, 3, 100, 300, 42}) {
+    EXPECT_TRUE(threaded_consumer.Push(number));
+
+    // wait
+    while (counter != number) {
+      std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+
+    EXPECT_EQ(counter, number);
+  }
+}
+
+// We should be able to raise the realtime priority of the worker thread, and
+// everything should work the same. It should also reset back to lower priority
+// when shutting down the worker thread.
+TEST(ThreadedConsumerTest, ElevatedPriority) {
+  std::atomic<int> counter{0};
+
+  {
+    ThreadedConsumer<int, 4> threaded_consumer(
+        [&counter](int task) {
+          CheckRealtime();
+          LOG(INFO) << "task:" << task << " counter: " << counter;
+          counter = task;
+        },
+        20);
+
+    for (int number : {9, 7, 1, 3, 100, 300, 42}) {
+      EXPECT_TRUE(threaded_consumer.Push(number));
+
+      // wait
+      while (counter != number) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+      }
+
+      EXPECT_EQ(counter, number);
+    }
+  }
+  // TODO: Check that the worker thread's priority actually gets reset before
+  // the thread is destroyed.
+
+  CheckNotRealtime();
+}
+
+// If the worker gets behind, we shouldn't silently take in more tasks and
+// destroy old ones.
+TEST(ThreadedConsumerTest, OverflowRingBuffer) {
+  std::atomic<int> counter{0};
+  std::atomic<int> should_block{true};
+
+  ThreadedConsumer<int, 4> threaded_consumer(
+      [&counter, &should_block](int task) {
+        LOG(INFO) << "task:" << task << " counter: " << counter;
+
+        counter = task;
+
+        // prevent it from making any progress to simulate it getting behind
+        while (should_block) {
+          std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        }
+      },
+      20);
+
+  // It consumes the 5 and then our worker blocks.
+  EXPECT_TRUE(threaded_consumer.Push(5));
+
+  // Wait for it to consume 5
+  while (counter != 5) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(1));
+  };
+
+  // 4 more fills up the queue.
+  for (int number : {8, 9, 7, 1}) {
+    EXPECT_TRUE(threaded_consumer.Push(number));
+  }
+
+  // this one should overflow the buffer.
+  EXPECT_FALSE(threaded_consumer.Push(101));
+
+  // clean up, so we don't join() an infinite loop
+  should_block = false;
+}
+
+// The class should destruct gracefully and finish all of its work before
+// dissapearing.
+TEST(ThreadedConsumerTest, FinishesTasksOnQuit) {
+  std::atomic<int> counter{0};
+  std::atomic<int> should_block{true};
+
+  {
+    ThreadedConsumer<int, 4> threaded_consumer(
+        [&counter, &should_block](int task) {
+          LOG(INFO) << "task:" << task << " counter: " << counter;
+
+          counter = task;
+
+          // prevent it from making any progress to simulate it getting behind
+          while (should_block) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(1));
+          }
+        },
+        20);
+
+    // Give it some work to do
+    for (int number : {8, 9, 7, 1}) {
+      EXPECT_TRUE(threaded_consumer.Push(number));
+    }
+
+    // Wait for it to consume the first number
+    while (counter != 8) {
+      std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    };
+
+    // allow it to continue
+    should_block = false;
+  }
+
+  // It should have finished all the work and gotten to the last number.
+  EXPECT_EQ(counter, 1);
+}
+
+}  // namespace util
+}  // namespace aos
diff --git a/frc971/vision/BUILD b/frc971/vision/BUILD
index c52afae..4dbd66c 100644
--- a/frc971/vision/BUILD
+++ b/frc971/vision/BUILD
@@ -76,6 +76,7 @@
         "//aos/events:epoll",
         "//aos/events:event_loop",
         "//aos/scoped:scoped_fd",
+        "//aos/util:threaded_consumer",
         "@com_github_google_glog//:glog",
         "@com_google_absl//absl/base",
     ],
diff --git a/frc971/vision/v4l2_reader.cc b/frc971/vision/v4l2_reader.cc
index a6bcb4d..7c0546f 100644
--- a/frc971/vision/v4l2_reader.cc
+++ b/frc971/vision/v4l2_reader.cc
@@ -85,18 +85,23 @@
 
   for (size_t i = 0; i < buffers_.size(); ++i) {
     buffers_[i].sender = event_loop_->MakeSender<CameraImage>("/camera");
-    EnqueueBuffer(i);
+    MarkBufferToBeEnqueued(i);
   }
   int type = multiplanar() ? V4L2_BUF_TYPE_VIDEO_CAPTURE_MPLANE
                            : V4L2_BUF_TYPE_VIDEO_CAPTURE;
   PCHECK(Ioctl(VIDIOC_STREAMON, &type) == 0);
 }
 
+void V4L2ReaderBase::MarkBufferToBeEnqueued(int buffer_index) {
+  ReinitializeBuffer(buffer_index);
+  EnqueueBuffer(buffer_index);
+}
+
 void V4L2ReaderBase::MaybeEnqueue() {
   // First, enqueue any old buffer we already have. This is the one which
   // may have been sent.
   if (saved_buffer_) {
-    EnqueueBuffer(saved_buffer_.index);
+    MarkBufferToBeEnqueued(saved_buffer_.index);
     saved_buffer_.Clear();
   }
   ftrace_.FormatMessage("Enqueued previous buffer %d", saved_buffer_.index);
@@ -114,7 +119,7 @@
       // going.
       if (previous_buffer) {
         ftrace_.FormatMessage("Previous %d", previous_buffer.index);
-        EnqueueBuffer(previous_buffer.index);
+        MarkBufferToBeEnqueued(previous_buffer.index);
       }
       continue;
     }
@@ -133,7 +138,12 @@
   }
 }
 
-void V4L2ReaderBase::SendLatestImage() { buffers_[saved_buffer_.index].Send(); }
+void V4L2ReaderBase::SendLatestImage() {
+  buffers_[saved_buffer_.index].Send();
+
+  MarkBufferToBeEnqueued(saved_buffer_.index);
+  saved_buffer_.Clear();
+}
 
 void V4L2ReaderBase::SetExposure(size_t duration) {
   v4l2_control manual_control;
@@ -236,7 +246,8 @@
 
   CHECK_GE(buffer_number, 0);
   CHECK_LT(buffer_number, static_cast<int>(buffers_.size()));
-  buffers_[buffer_number].InitializeMessage(ImageSize());
+  CHECK(buffers_[buffer_number].data_pointer != nullptr);
+
   struct v4l2_buffer buffer;
   struct v4l2_plane planes[1];
   memset(&buffer, 0, sizeof(buffer));
@@ -315,16 +326,21 @@
                                        const std::string &image_sensor_subdev)
     : V4L2ReaderBase(event_loop, device_name),
       epoll_(epoll),
-      image_sensor_fd_(open(image_sensor_subdev.c_str(), O_RDWR | O_NONBLOCK)) {
+      image_sensor_fd_(open(image_sensor_subdev.c_str(), O_RDWR | O_NONBLOCK)),
+      buffer_requeuer_([this](int buffer) { EnqueueBuffer(buffer); }, 20) {
   PCHECK(image_sensor_fd_.get() != -1)
       << " Failed to open device " << device_name;
-
   StreamOn();
   epoll_->OnReadable(fd().get(), [this]() { OnImageReady(); });
 }
 
 RockchipV4L2Reader::~RockchipV4L2Reader() { epoll_->DeleteFd(fd().get()); }
 
+void RockchipV4L2Reader::MarkBufferToBeEnqueued(int buffer) {
+  ReinitializeBuffer(buffer);
+  buffer_requeuer_.Push(buffer);
+}
+
 void RockchipV4L2Reader::OnImageReady() {
   if (!ReadLatestImage()) {
     return;
diff --git a/frc971/vision/v4l2_reader.h b/frc971/vision/v4l2_reader.h
index 669c157..31c0888 100644
--- a/frc971/vision/v4l2_reader.h
+++ b/frc971/vision/v4l2_reader.h
@@ -5,10 +5,13 @@
 #include <string>
 
 #include "absl/types/span.h"
+#include "aos/containers/ring_buffer.h"
 #include "aos/events/epoll.h"
 #include "aos/events/event_loop.h"
 #include "aos/ftrace.h"
+#include "aos/realtime.h"
 #include "aos/scoped/scoped_fd.h"
+#include "aos/util/threaded_consumer.h"
 #include "frc971/vision/vision_generated.h"
 #include "glog/logging.h"
 
@@ -57,6 +60,23 @@
   void StreamOff();
   void StreamOn();
 
+  // Enqueues a buffer for v4l2 to stream into (expensive).
+  void EnqueueBuffer(int buffer_index);
+
+  // Initializations that need to happen in the main thread.
+  //
+  // Implementations of MarkBufferToBeEnqueued should call this before calling
+  // EnqueueBuffer.
+  void ReinitializeBuffer(int buffer_index) {
+    CHECK_GE(buffer_index, 0);
+    CHECK_LT(buffer_index, static_cast<int>(buffers_.size()));
+    buffers_[buffer_index].InitializeMessage(ImageSize());
+  }
+
+  // Submits a buffer to be enqueued later in a lower priority thread.
+  // Legacy V4L2Reader still does this in the main thread.
+  virtual void MarkBufferToBeEnqueued(int buffer_index);
+
   int Ioctl(unsigned long number, void *arg);
 
   bool multiplanar() const { return multiplanar_; }
@@ -70,9 +90,9 @@
 
   const aos::ScopedFD &fd() { return fd_; };
 
- private:
   static constexpr int kNumberBuffers = 4;
 
+ private:
   struct Buffer {
     void InitializeMessage(size_t max_image_size);
 
@@ -113,8 +133,6 @@
   // buffer, or BufferInfo() if there wasn't a frame to dequeue.
   BufferInfo DequeueBuffer();
 
-  void EnqueueBuffer(int buffer);
-
   // The mmaped V4L2 buffers.
   std::array<Buffer, kNumberBuffers> buffers_;
 
@@ -159,11 +177,15 @@
  private:
   void OnImageReady();
 
+  void MarkBufferToBeEnqueued(int buffer) override;
+
   int ImageSensorIoctl(unsigned long number, void *arg);
 
   aos::internal::EPoll *epoll_;
 
   aos::ScopedFD image_sensor_fd_;
+
+  aos::util::ThreadedConsumer<int, kNumberBuffers> buffer_requeuer_;
 };
 
 }  // namespace vision