Add a threaded queue utility to AOS

This adds a class that provides a queue where one thread (the one doing
the heavy lifting) fills the queue and another, more timing-critical,
thread pops messages from it.
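
For reference, a rough usage sketch (illustrative only; the item and
state types, the counting logic, and the ConsumeAll name are made up
for this example and are not part of the change):

  #include <optional>

  #include "aos/util/threaded_queue.h"

  void ConsumeAll() {
    int to_produce = 100;
    aos::util::ThreadedQueue<int, int> queue(
        [&to_produce](int /*consumed_so_far*/) {
          // Expensive production work would go here; this sketch just
          // counts down.
          const int item = to_produce--;
          return aos::util::ThreadedQueue<int, int>::PushResult{
              item, /*more_to_push=*/false, /*done=*/to_produce == 0};
        },
        /*initial_state=*/0);
    int consumed = 0;
    // Pop() blocks until an item is available and returns nullopt once
    // the worker has reported done and the queue has drained.
    while (std::optional<int> item = queue.Pop()) {
      // ... use *item on the timing-critical thread ...
      ++consumed;
      queue.SetState(consumed);  // Publish progress; wakes the worker.
    }
  }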

Change-Id: I51586528adc34dceddbd94549285900dc16d5bf4
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/util/BUILD b/aos/util/BUILD
index 12e5ab7..40293c0 100644
--- a/aos/util/BUILD
+++ b/aos/util/BUILD
@@ -384,3 +384,24 @@
     visibility = ["//visibility:public"],
     deps = ["//aos:python_init"],
 )
+
+cc_library(
+    name = "threaded_queue",
+    hdrs = [
+        "threaded_queue.h",
+        "threaded_queue_tmpl.h",
+    ],
+    deps = [
+        "//aos:condition",
+        "//aos/mutex",
+    ],
+)
+
+cc_test(
+    name = "threaded_queue_test",
+    srcs = ["threaded_queue_test.cc"],
+    deps = [
+        ":threaded_queue",
+        "//aos/testing:googletest",
+    ],
+)
diff --git a/aos/util/threaded_queue.h b/aos/util/threaded_queue.h
new file mode 100644
index 0000000..0b06632
--- /dev/null
+++ b/aos/util/threaded_queue.h
@@ -0,0 +1,99 @@
+#ifndef AOS_UTIL_THREADED_QUEUE_H_
+#define AOS_UTIL_THREADED_QUEUE_H_
+#include <functional>
+#include <optional>
+#include <queue>
+#include <thread>
+
+#include "aos/condition.h"
+#include "aos/mutex/mutex.h"
+
+namespace aos::util {
+// This class implements a queue of objects of type T in which a worker thread
+// pushes and the calling thread pops from the queue.
+//
+// This is set up so that the user passes a worker function that gets called
+// in a separate thread whenever we think there may be more work for it to do.
+// All the methods on this class are intended to be called from the main
+// thread (not from within the worker function).
+// Communication between the main thread and the worker can be achieved either
+// by manually handling your own state, or preferably by using the SharedState
+// object, which will get passed into the worker. The worker gets called every
+// time that SetState() is called.
+template <typename T, typename SharedState>
+class ThreadedQueue {
+ public:
+  // PushResult is returned from the worker to indicate its current state.
+  struct PushResult {
+    // The new item to push, if any. If set to nullopt, nothing gets pushed.
+    std::optional<T> item = std::nullopt;
+    // Set to true if the worker knows that there is more work that it has to do
+    // and so should be immediately called again. If set to false, then the
+    // worker will not get called again until one of the following events
+    // occurs:
+    // 1) The queue is successfully popped from.
+    // 2) SetState() is called.
+    bool more_to_push = false;
+    // Set to true if there is no more work to be done. The worker should not
+    // get called after setting done to true.
+    bool done = false;
+  };
+  ThreadedQueue(
+      std::function<PushResult(SharedState)> push_request_handler,
+      SharedState initial_state);
+  ~ThreadedQueue();
+  // Sets state. Triggers a new call to push_request_handler.
+  void SetState(const SharedState &state);
+  // Returns the current front of the queue, blocking until a new item is
+  // available. Will only return nullopt if done is true and there will be no
+  // more items in the queue.
+  std::optional<T> Peek();
+  // Identical to Peek(), except that it also removes the item from the queue.
+  std::optional<T> Pop();
+  // Waits until the push_request_handler has returned more_to_push = false
+  // and the worker is idle. Useful if you want to ensure that the worker has
+  // done all the work it can before going to the next step.
+  void WaitForNoMoreWork();
+  // Stops any further calls to push_request_handler. Used to terminate the
+  // queue from the calling thread.
+  void StopPushing();
+
+ private:
+  // Safely grabs the current state and returns a copy.
+  SharedState State();
+  // Implements the Peek()/Pop() methods, blocking until we are either done or
+  // the next item is available in the queue.
+  std::optional<T> PeekOrPop(bool pop);
+
+  // Mutex controlling access to all shared state (in this case, all the members
+  // of this class).
+  aos::Mutex mutex_;
+  // Condition variable used to indicate when anything has happened that should
+  // cause us to check for whether the worker can add anything new to the queue.
+  // Called "popped_" in reference to that it may be called when an item is
+  // popped from the queue.
+  aos::Condition popped_;
+  // Condition variable to indicate when an item has either been pushed to the
+  // queue or there is some other state change that consumers may care about
+  // (e.g., being done).
+  aos::Condition pushed_;
+  // TODO(jkuszmaul): Evaluate impact of dynamic memory allocation in
+  // std::queue, consider using aos::RingBuffer or similarly statically
+  // allocated buffer.
+  std::queue<T> queue_;
+  // Whether we are done processing entirely.
+  bool done_{false};
+  // Set while the pusher thread is waiting on popped_.
+  // Used to notice when the push handler is out of work to do.
+  bool pusher_waiting_{false};
+  // Set when SetState() is called, cleared when State() is read. Used to track
+  // whether the push handler has been called with the most recent state.
+  bool state_updated_{false};
+  SharedState state_;
+  std::thread pusher_thread_;
+};
+
+}  // namespace aos::util
+
+#include "aos/util/threaded_queue_tmpl.h"
+#endif  // AOS_UTIL_THREADED_QUEUE_H_
diff --git a/aos/util/threaded_queue_test.cc b/aos/util/threaded_queue_test.cc
new file mode 100644
index 0000000..4f27c57
--- /dev/null
+++ b/aos/util/threaded_queue_test.cc
@@ -0,0 +1,105 @@
+#include "aos/util/threaded_queue.h"
+
+#include "gtest/gtest.h"
+
+namespace aos::util {
+
+TEST(ThreadedQueueTest, BasicFunction) {
+  std::atomic<int> counter{10000};
+  int state = 0;
+  std::atomic<int> observed_state{0};
+  ThreadedQueue<int, int> queue(
+      [&counter, &observed_state](const int state) {
+        // Because this handler always returns more_to_push = false, it only
+        // gets called when the queue is popped from or SetState() is called.
+        observed_state = state;
+        int count = --counter;
+        return ThreadedQueue<int, int>::PushResult{count, false, count == 0};
+      },
+      state);
+  while (true) {
+    std::optional<int> peek_result = queue.Peek();
+    std::optional<int> pop_result = queue.Pop();
+    ASSERT_EQ(peek_result.has_value(), pop_result.has_value());
+    if (peek_result.has_value()) {
+      ASSERT_EQ(peek_result.value(), pop_result.value());
+    } else {
+      break;
+    }
+    state++;
+    queue.SetState(state);
+  }
+  ASSERT_EQ(counter, 0);
+  // Our API doesn't make any guarantee about the push/pop cycle being kept in
+  // lock-step, so just check that the observed state got incremented at all.
+  ASSERT_LT(1, observed_state);
+  ASSERT_EQ(state, 10000);
+}
+
+// Test running a queue where the consumer wants to have kMaxLookahead entries
+// pre-loaded and so communicates its current state back to the pusher.
+TEST(ThreadedQueueTest, StatefulQueue) {
+  std::atomic<int> counter{10000};
+  int state = counter;
+  constexpr int kMaxLookahead = 10;
+  std::atomic<int> observed_state{0};
+  ThreadedQueue<int, int> queue(
+      [&counter, &observed_state](const int state) {
+        observed_state = state;
+        if (counter + kMaxLookahead < state) {
+          return ThreadedQueue<int, int>::PushResult{std::nullopt, false,
+                                                     counter == 0};
+        } else {
+          int count = --counter;
+          return ThreadedQueue<int, int>::PushResult{count, true, count == 0};
+        }
+      },
+      state);
+  while (true) {
+    std::optional<int> peek_result = queue.Peek();
+    std::optional<int> pop_result = queue.Pop();
+    ASSERT_EQ(peek_result.has_value(), pop_result.has_value());
+    if (peek_result.has_value()) {
+      ASSERT_EQ(peek_result.value(), pop_result.value());
+    } else {
+      break;
+    }
+    for (int ii = 0; ii < 2 * kMaxLookahead; ++ii) {
+      // Trigger the internal condition variable a bunch of times to cause
+      // trouble.
+      queue.Peek();
+    }
+    // The pusher should never be more than the permissible distance ahead.
+    ASSERT_GE(counter + kMaxLookahead + 1, state);
+    ASSERT_GE(observed_state, state);
+    state--;
+    queue.SetState(state);
+    // Periodically pause, ensure that the pusher has enough time to catch up,
+    // and check that it has indeed pre-queued kMaxLookahead items.
+    if (state % 1000 == 0 && state > 0) {
+      queue.WaitForNoMoreWork();
+      ASSERT_EQ(observed_state, state);
+      ASSERT_EQ(counter + kMaxLookahead + 1, state);
+    }
+  }
+  ASSERT_EQ(counter, 0);
+  ASSERT_EQ(state, 0);
+}
+
+
+// Tests that we can exit early without any issues.
+TEST(ThreadedQueueTest, ExitEarly) {
+  // There used to be a deadlock in this case where StopPushing() synchronized
+  // things incorrectly internally, but it required very (un)lucky timing to
+  // hit.
+  for (int ii = 0; ii < 10000; ++ii) {
+    ThreadedQueue<int, int> queue(
+        [](int) {
+          return ThreadedQueue<int, int>::PushResult{971, false, false};
+        },
+        0);
+    queue.StopPushing();
+  }
+}
+
+}  // namespace aos::util
diff --git a/aos/util/threaded_queue_tmpl.h b/aos/util/threaded_queue_tmpl.h
new file mode 100644
index 0000000..cb4cdfc
--- /dev/null
+++ b/aos/util/threaded_queue_tmpl.h
@@ -0,0 +1,99 @@
+namespace aos::util {
+template <typename T, typename SharedState>
+ThreadedQueue<T, SharedState>::ThreadedQueue(
+    std::function<PushResult(SharedState)> push_request_handler,
+    SharedState initial_state)
+    : popped_(&mutex_),
+      pushed_(&mutex_),
+      state_(initial_state),
+      pusher_thread_([this, push_request_handler]() {
+        while (true) {
+          PushResult result = push_request_handler(State());
+          {
+            MutexLocker locker(&mutex_);
+            done_ = done_ || result.done;
+            if (result.item.has_value()) {
+              queue_.push(std::move(result.item.value()));
+            }
+            pushed_.Broadcast();
+            if (done_) {
+              return;
+            }
+            if (result.more_to_push || state_updated_) {
+              continue;
+            } else {
+              pusher_waiting_ = true;
+              CHECK(!popped_.Wait());
+              pusher_waiting_ = false;
+            }
+          }
+        }
+      }) {}
+
+template <typename T, typename SharedState>
+ThreadedQueue<T, SharedState>::~ThreadedQueue() {
+  StopPushing();
+  pusher_thread_.join();
+}
+
+template <typename T, typename SharedState>
+void ThreadedQueue<T, SharedState>::WaitForNoMoreWork() {
+  MutexLocker locker(&mutex_);
+  while (state_updated_ || (!pusher_waiting_ && !done_)) {
+    CHECK(!pushed_.Wait());
+  }
+}
+
+template <typename T, typename SharedState>
+SharedState ThreadedQueue<T, SharedState>::State() {
+  MutexLocker locker(&mutex_);
+  state_updated_ = false;
+  return state_;
+}
+
+template <typename T, typename SharedState>
+void ThreadedQueue<T, SharedState>::SetState(const SharedState &state) {
+  MutexLocker locker(&mutex_);
+  state_ = state;
+  state_updated_ = true;
+  popped_.Broadcast();
+}
+
+template <typename T, typename SharedState>
+void ThreadedQueue<T, SharedState>::StopPushing() {
+  // Ensure that the mutex is locked before doing anything, to make sure that
+  // the pushing thread actually observes the change.
+  MutexLocker locker(&mutex_);
+  done_ = true;
+  popped_.Broadcast();
+}
+
+template <typename T, typename SharedState>
+std::optional<T> ThreadedQueue<T, SharedState>::Peek() {
+  return PeekOrPop(false);
+}
+
+template <typename T, typename SharedState>
+std::optional<T> ThreadedQueue<T, SharedState>::Pop() {
+  return PeekOrPop(true);
+}
+
+template <typename T, typename SharedState>
+std::optional<T> ThreadedQueue<T, SharedState>::PeekOrPop(bool pop) {
+  MutexLocker locker(&mutex_);
+  while (!done_ && queue_.empty()) {
+    CHECK(!pushed_.Wait());
+  }
+  if (queue_.empty()) {
+    return std::nullopt;
+  }
+  if (pop) {
+    T result = std::move(queue_.front());
+    queue_.pop();
+    popped_.Broadcast();
+    return result;
+  } else {
+    return queue_.front();
+  }
+}
+}  // namespace aos::util