Add utility for thread queue pusher in AOS

This adds a class that provides a queue where one thread (that does the
heavy lifting) will fill in a queue, and another, more
timing-critical, thread has to pop messages from the queue.

Change-Id: I51586528adc34dceddbd94549285900dc16d5bf4
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/util/threaded_queue_tmpl.h b/aos/util/threaded_queue_tmpl.h
new file mode 100644
index 0000000..cb4cdfc
--- /dev/null
+++ b/aos/util/threaded_queue_tmpl.h
@@ -0,0 +1,99 @@
+namespace aos::util {
+template <typename T, typename SharedState>
+ThreadedQueue<T, SharedState>::ThreadedQueue(
+    std::function<PushResult(SharedState)> push_request_handler,
+    SharedState initial_state)
+    : popped_(&mutex_),
+      pushed_(&mutex_),
+      state_(initial_state),
+      pusher_thread_([this, push_request_handler]() {
+        while (true) {
+          PushResult result = push_request_handler(State());
+          {
+            MutexLocker locker(&mutex_);
+            done_ = done_ || result.done;
+            if (result.item.has_value()) {
+              queue_.push(std::move(result.item.value()));
+            }
+            pushed_.Broadcast();
+            if (done_) {
+              return;
+            }
+            if (result.more_to_push || state_updated_) {
+              continue;
+            } else {
+              pusher_waiting_ = true;
+              CHECK(!popped_.Wait());
+              pusher_waiting_ = false;
+            }
+          }
+        }
+      }) {}
+
+template <typename T, typename SharedState>
+ThreadedQueue<T, SharedState>::~ThreadedQueue() {
+  StopPushing();
+  pusher_thread_.join();
+}
+
+template <typename T, typename SharedState>
+void ThreadedQueue<T, SharedState>::WaitForNoMoreWork() {
+  MutexLocker locker(&mutex_);
+  while (state_updated_ || (!pusher_waiting_ && !done_)) {
+    CHECK(!pushed_.Wait());
+  }
+}
+
+template <typename T, typename SharedState>
+SharedState ThreadedQueue<T, SharedState>::State() {
+  MutexLocker locker(&mutex_);
+  state_updated_ = false;
+  return state_;
+}
+
+template <typename T, typename SharedState>
+void ThreadedQueue<T, SharedState>::SetState(const SharedState &state) {
+  MutexLocker locker(&mutex_);
+  state_ = state;
+  state_updated_ = true;
+  popped_.Broadcast();
+}
+
+template <typename T, typename SharedState>
+void ThreadedQueue<T, SharedState>::StopPushing() {
+  // Ensure that the mutex is locked before doing anything, to make sure that
+  // the pushing thread actually observes the change.
+  MutexLocker locker(&mutex_);
+  done_ = true;
+  popped_.Broadcast();
+}
+
+template <typename T, typename SharedState>
+std::optional<T> ThreadedQueue<T, SharedState>::Peek() {
+  return PeekOrPop(false);
+}
+
+template <typename T, typename SharedState>
+std::optional<T> ThreadedQueue<T, SharedState>::Pop() {
+  return PeekOrPop(true);
+}
+
+template <typename T, typename SharedState>
+std::optional<T> ThreadedQueue<T, SharedState>::PeekOrPop(bool pop) {
+  MutexLocker locker(&mutex_);
+  while (!done_ && queue_.empty()) {
+    CHECK(!pushed_.Wait());
+  }
+  if (queue_.empty()) {
+    return std::nullopt;
+  }
+  if (pop) {
+    T result = std::move(queue_.front());
+    queue_.pop();
+    popped_.Broadcast();
+    return result;
+  } else {
+    return queue_.front();
+  }
+}
+}  // namespace aos::util