Shard image thresholding

It's down to ~9ms per frame now.

Change-Id: If5b0b51105d3d9e8a2435b1f077e46eeb9f3e94a
diff --git a/aos/vision/blob/threshold.cc b/aos/vision/blob/threshold.cc
index 36dcafe..46221b5 100644
--- a/aos/vision/blob/threshold.cc
+++ b/aos/vision/blob/threshold.cc
@@ -30,6 +30,7 @@
       // The per-channel (YUYV) values in the current chunk.
       uint8_t chunk_channels[2 * kChunkSize];
       memcpy(&chunk_channels[0], current_row + x * kChunkSize * 2, 2 * kChunkSize);
+      __builtin_prefetch(current_row + (x + 1) * kChunkSize * 2);
 
       for (int i = 0; i < kChunkSize; ++i) {
         if ((chunk_channels[i * 2] > value) != in_range) {
@@ -51,5 +52,74 @@
   return RangeImage(0, std::move(result));
 }
 
+FastYuyvYPooledThresholder::FastYuyvYPooledThresholder() {
+  states_.fill(ThreadState::kWaitingForInputData);  // Workers start idle.
+  for (int worker = 0; worker < kThreads; ++worker) {
+    threads_[worker] = std::thread([this, worker] { RunThread(worker); });
+  }
+}
+
+FastYuyvYPooledThresholder::~FastYuyvYPooledThresholder() {
+  {  // Raise the quit flag under mutex_ and wake every worker.
+    std::lock_guard<std::mutex> guard(mutex_);
+    quit_ = true;
+    condition_variable_.notify_all();
+  }
+  for (int worker = 0; worker < kThreads; ++worker) {
+    threads_[worker].join();  // Block until this worker has returned.
+  }
+}
+
+RangeImage FastYuyvYPooledThresholder::Threshold(ImageFormat fmt,
+                                                 const char *data,
+                                                 uint8_t value) {
+  // Hand the frame to the workers and wait for AllThreadsDone() to hold.
+  input_format_ = fmt;
+  input_data_ = data;
+  input_value_ = value;
+  {
+    std::unique_lock<std::mutex> lock(mutex_);
+    for (int shard = 0; shard < kThreads; ++shard) {
+      states_[shard] = ThreadState::kProcessing;
+    }
+    condition_variable_.notify_all();
+    condition_variable_.wait(lock, [this]() { return AllThreadsDone(); });
+  }
+  // Stitch the per-thread outputs back together in shard order.
+  std::vector<std::vector<ImageRange>> rows;
+  rows.reserve(fmt.h);
+  for (int shard = 0; shard < kThreads; ++shard) {
+    rows.insert(rows.end(), outputs_[shard].begin(), outputs_[shard].end());
+  }
+  return RangeImage(0, std::move(rows));
+}
+
+void FastYuyvYPooledThresholder::RunThread(int i) {  // Body of worker i.
+  while (true) {  // Each pass handles one request made through states_[i].
+    {
+      std::unique_lock<std::mutex> locker(mutex_);
+      while (states_[i] == ThreadState::kWaitingForInputData) {
+        if (quit_) {  // Set by the destructor; only checked while idle.
+          return;
+        }
+        condition_variable_.wait(locker);
+      }
+    }
+
+    ImageFormat shard_format = input_format_;
+    CHECK_EQ(shard_format.h % kThreads, 0);  // Rows must split evenly.
+    shard_format.h /= kThreads;  // Each worker gets an equal band of rows.
+
+    outputs_[i] = FastYuyvYThreshold(
+        shard_format, input_data_ + shard_format.w * 2 * shard_format.h * i,
+        input_value_);  // Offset skips i shards of w * 2 bytes per row.
+    {  // Flag this shard as finished and wake all waiters on the condition.
+      std::unique_lock<std::mutex> locker(mutex_);
+      states_[i] = ThreadState::kWaitingForInputData;
+      condition_variable_.notify_all();
+    }
+  }
+}
+
 }  // namespace vision
 }  // namespace aos