Add a threaded queue utility to AOS
This adds a class that provides a queue where one thread (which does
the heavy lifting) fills the queue, and another, more timing-critical,
thread pops messages from it.
Change-Id: I51586528adc34dceddbd94549285900dc16d5bf4
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/util/BUILD b/aos/util/BUILD
index 12e5ab7..40293c0 100644
--- a/aos/util/BUILD
+++ b/aos/util/BUILD
@@ -384,3 +384,24 @@
visibility = ["//visibility:public"],
deps = ["//aos:python_init"],
)
+
+cc_library(
+ name = "threaded_queue",
+ hdrs = [
+ "threaded_queue.h",
+ "threaded_queue_tmpl.h",
+ ],
+ deps = [
+ "//aos:condition",
+ "//aos/mutex",
+ ],
+)
+
+cc_test(
+ name = "threaded_queue_test",
+ srcs = ["threaded_queue_test.cc"],
+ deps = [
+ ":threaded_queue",
+ "//aos/testing:googletest",
+ ],
+)
diff --git a/aos/util/threaded_queue.h b/aos/util/threaded_queue.h
new file mode 100644
index 0000000..0b06632
--- /dev/null
+++ b/aos/util/threaded_queue.h
@@ -0,0 +1,116 @@
+#ifndef AOS_UTIL_THREADED_QUEUE_H_
+#define AOS_UTIL_THREADED_QUEUE_H_
+#include <functional>
+#include <optional>
+#include <queue>
+#include <thread>
+
+#include "aos/condition.h"
+#include "aos/mutex/mutex.h"
+
+namespace aos::util {
+// This class implements a queue of objects of type T in which a worker thread
+// pushes to the queue and the calling thread pops from it.
+//
+// This is set up such that the user passes a worker function that gets called
+// in a separate thread whenever there may be more work for it to do. All of
+// the methods on this class are intended to be called from the main thread
+// (not from within the worker function).
+// Communication between the main thread and the worker can be achieved either
+// by manually handling your own state, or preferably by using the SharedState
+// object, which will get passed into the worker. The worker gets called every
+// time that SetState() is called.
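+//
+// A rough usage sketch (illustrative only; Frame and DecodeNextFrame() are
+// hypothetical stand-ins for the caller's own types and functions):
+//
+//   ThreadedQueue<Frame, int> queue(
+//       [](int /*state*/) {
+//         std::optional<Frame> frame = DecodeNextFrame();
+//         const bool done = !frame.has_value();
+//         // Push at most one frame per wakeup; finish once the decoder is
+//         // exhausted.
+//         return ThreadedQueue<Frame, int>::PushResult{
+//             std::move(frame), /*more_to_push=*/false, done};
+//       },
+//       /*initial_state=*/0);
+//   while (std::optional<Frame> frame = queue.Pop()) {
+//     // Handle *frame on the timing-critical thread.
+//   }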
+template <typename T, typename SharedState>
+class ThreadedQueue {
+ public:
+ // PushResult is returned from the worker to indicate its current state.
+ struct PushResult {
+ // The new item to push, if any. If set to nullopt, nothing gets pushed.
+ std::optional<T> item = std::nullopt;
+ // Set to true if the worker knows that there is more work that it has to do
+ // and so should be immediately called again. If set to false, then the
+ // worker will not get called again until one of the following events
+ // occurs:
+ // 1) The queue is successfully popped from.
+ // 2) SetState() is called.
+ bool more_to_push = false;
+ // Set to true if there is no more work to be done. The worker should not
+ // get called after setting done to true.
+ bool done = false;
+ };
+ ThreadedQueue(
+ std::function<PushResult(SharedState)> push_request_handler,
+ SharedState initial_state);
+ ~ThreadedQueue();
+ // Sets state. Triggers a new call to push_request_handler.
+ void SetState(const SharedState &state);
+ // Returns the current front of the queue, blocking until a new item is
+ // available. Will only return nullopt if done is true and there will be no
+ // more items in the queue.
+ std::optional<T> Peek();
+ // Identical to Peek(), except that it also removes the item from the queue.
+ std::optional<T> Pop();
+ // Waits until the push_request_handler has returned more_to_push = false
+ // and is now just waiting for more work. Useful if you want to ensure that
+ // the worker has done all the work it can before going to the next step.
+ void WaitForNoMoreWork();
+ // Stops any further calls to push_request_handler. Used to terminate the
+ // queue from the calling thread.
+ void StopPushing();
+
+ private:
+ // Safely grabs the current state and returns a copy.
+ SharedState State();
+ // Implements the Peek()/Pop() methods, blocking until we are either done or
+ // the next item is available in the queue.
+ std::optional<T> PeekOrPop(bool pop);
+
+ // Mutex controlling access to all shared state (in this case, all the members
+ // of this class).
+ aos::Mutex mutex_;
+ // Condition variable used to indicate when anything has happened that should
+ // cause us to check whether the worker can add anything new to the queue.
+ // Named "popped_" because it is, among other things, signaled when an item
+ // is popped from the queue.
+ aos::Condition popped_;
+ // Condition variable to indicate when an item has either been pushed to the
+ // queue or there is some other state change that consumers may care about
+ // (e.g., being done).
+ aos::Condition pushed_;
+ // TODO(jkuszmaul): Evaluate impact of dynamic memory allocation in
+ // std::queue, consider using aos::RingBuffer or similarly statically
+ // allocated buffer.
+ std::queue<T> queue_;
+ // Whether we are done processing entirely.
+ bool done_{false};
+ // Set while the pusher thread is waiting on popped_.
+ // Used to notice when the push handler is out of work to do.
+ bool pusher_waiting_{false};
+ // Set when SetState() is called, cleared when State() is read. Used to track
+ // whether the push handler has been called with the most recent state.
+ bool state_updated_{false};
+ SharedState state_;
+ std::thread pusher_thread_;
+};
+
+} // namespace aos::util
+
+#include "aos/util/threaded_queue_tmpl.h"
+#endif // AOS_UTIL_THREADED_QUEUE_H_
diff --git a/aos/util/threaded_queue_test.cc b/aos/util/threaded_queue_test.cc
new file mode 100644
index 0000000..4f27c57
--- /dev/null
+++ b/aos/util/threaded_queue_test.cc
@@ -0,0 +1,108 @@
+#include "aos/util/threaded_queue.h"
+
+#include <atomic>
+#include <optional>
+
+#include "gtest/gtest.h"
+
+namespace aos::util {
+
+TEST(ThreadedQueueTest, BasicFunction) {
+ std::atomic<int> counter{10000};
+ int state = 0;
+ std::atomic<int> observed_state{0};
+ ThreadedQueue<int, int> queue(
+ [&counter, &observed_state](const int state) {
+ // Because this handler returns more_to_push = false, it only gets called
+ // again after a pop or a SetState() call.
+ observed_state = state;
+ int count = --counter;
+ return ThreadedQueue<int, int>::PushResult{count, false, count == 0};
+ },
+ state);
+ while (true) {
+ std::optional<int> peek_result = queue.Peek();
+ std::optional<int> pop_result = queue.Pop();
+ ASSERT_EQ(peek_result.has_value(), pop_result.has_value());
+ if (peek_result.has_value()) {
+ ASSERT_EQ(peek_result.value(), pop_result.value());
+ } else {
+ break;
+ }
+ state++;
+ queue.SetState(state);
+ }
+ ASSERT_EQ(counter, 0);
+ // Our API doesn't make any guarantee about the push/pop cycle being kept in
+ // lock-step, so just check that the observed state got incremented at all.
+ ASSERT_LT(1, observed_state);
+ ASSERT_EQ(state, 10000);
+}
+
+// Test running a queue where the consumer wants to have X entries pre-loaded
+// and so communicates its current state back to the pusher.
+TEST(ThreadedQueueTest, StatefulQueue) {
+ std::atomic<int> counter{10000};
+ int state = counter;
+ constexpr int kMaxLookahead = 10;
+ std::atomic<int> observed_state{0};
+ ThreadedQueue<int, int> queue(
+ [&counter, &observed_state](const int state) {
+ observed_state = state;
+ if (counter + kMaxLookahead < state) {
+ return ThreadedQueue<int, int>::PushResult{std::nullopt, false,
+ counter == 0};
+ } else {
+ int count = --counter;
+ return ThreadedQueue<int, int>::PushResult{count, true, count == 0};
+ }
+ },
+ state);
+ while (true) {
+ std::optional<int> peek_result = queue.Peek();
+ std::optional<int> pop_result = queue.Pop();
+ ASSERT_EQ(peek_result.has_value(), pop_result.has_value());
+ if (peek_result.has_value()) {
+ ASSERT_EQ(peek_result.value(), pop_result.value());
+ } else {
+ break;
+ }
+ for (int ii = 0; ii < 2 * kMaxLookahead; ++ii) {
+ // Trigger the internal condition variable a bunch of times to cause
+ // trouble.
+ queue.Peek();
+ }
+ // The pusher should never be more than the permissible distance ahead.
+ ASSERT_GE(counter + kMaxLookahead + 1, state);
+ ASSERT_GE(observed_state, state);
+ state--;
+ queue.SetState(state);
+ // Periodically pause, ensure that the pusher has enough time to catch up,
+ // and check that it has indeed pre-queued kMaxLookahead items.
+ if (state % 1000 == 0 && state > 0) {
+ queue.WaitForNoMoreWork();
+ ASSERT_EQ(observed_state, state);
+ ASSERT_EQ(counter + kMaxLookahead + 1, state);
+ }
+ }
+ ASSERT_EQ(counter, 0);
+ ASSERT_EQ(state, 0);
+}
+
+
+// Tests that we can exit early without any issues.
+TEST(ThreadedQueueTest, ExitEarly) {
+ // There used to be a deadlock in this case where StopPushing() would
+ // improperly synchronize things internally, but it required very (un)lucky
+ // timing to hit.
+ for (int ii = 0; ii < 10000; ++ii) {
+ ThreadedQueue<int, int> queue(
+ [](int) {
+ return ThreadedQueue<int, int>::PushResult{971, false, false};
+ },
+ 0);
+ queue.StopPushing();
+ }
+}
+
+} // namespace aos::util
diff --git a/aos/util/threaded_queue_tmpl.h b/aos/util/threaded_queue_tmpl.h
new file mode 100644
index 0000000..cb4cdfc
--- /dev/null
+++ b/aos/util/threaded_queue_tmpl.h
@@ -0,0 +1,104 @@
+namespace aos::util {
+template <typename T, typename SharedState>
+ThreadedQueue<T, SharedState>::ThreadedQueue(
+ std::function<PushResult(SharedState)> push_request_handler,
+ SharedState initial_state)
+ : popped_(&mutex_),
+ pushed_(&mutex_),
+ state_(initial_state),
+ pusher_thread_([this, push_request_handler]() {
+ while (true) {
+ PushResult result = push_request_handler(State());
+ {
+ MutexLocker locker(&mutex_);
+ done_ = done_ || result.done;
+ if (result.item.has_value()) {
+ queue_.push(std::move(result.item.value()));
+ }
+ pushed_.Broadcast();
+ if (done_) {
+ return;
+ }
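+ // If the worker has more to push, or the state has changed since it was
+ // last read, call the handler again immediately. Otherwise wait for a pop,
+ // a SetState() call, or StopPushing().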
+ if (result.more_to_push || state_updated_) {
+ continue;
+ } else {
+ pusher_waiting_ = true;
+ CHECK(!popped_.Wait());
+ pusher_waiting_ = false;
+ }
+ }
+ }
+ }) {}
+
+template <typename T, typename SharedState>
+ThreadedQueue<T, SharedState>::~ThreadedQueue() {
+ StopPushing();
+ pusher_thread_.join();
+}
+
+template <typename T, typename SharedState>
+void ThreadedQueue<T, SharedState>::WaitForNoMoreWork() {
+ MutexLocker locker(&mutex_);
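+ // Wait while the pusher still has fresh state to process, or is neither
+ // idle (i.e., waiting on popped_) nor done.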
+ while (state_updated_ || (!pusher_waiting_ && !done_)) {
+ CHECK(!pushed_.Wait());
+ }
+}
+
+template <typename T, typename SharedState>
+SharedState ThreadedQueue<T, SharedState>::State() {
+ MutexLocker locker(&mutex_);
+ state_updated_ = false;
+ return state_;
+}
+
+template <typename T, typename SharedState>
+void ThreadedQueue<T, SharedState>::SetState(const SharedState &state) {
+ MutexLocker locker(&mutex_);
+ state_ = state;
+ state_updated_ = true;
+ popped_.Broadcast();
+}
+
+template <typename T, typename SharedState>
+void ThreadedQueue<T, SharedState>::StopPushing() {
+ // Ensure that the mutex is locked before doing anything, to make sure that
+ // the pushing thread actually observes the change.
+ MutexLocker locker(&mutex_);
+ done_ = true;
+ popped_.Broadcast();
+}
+
+template <typename T, typename SharedState>
+std::optional<T> ThreadedQueue<T, SharedState>::Peek() {
+ return PeekOrPop(false);
+}
+
+template <typename T, typename SharedState>
+std::optional<T> ThreadedQueue<T, SharedState>::Pop() {
+ return PeekOrPop(true);
+}
+
+template <typename T, typename SharedState>
+std::optional<T> ThreadedQueue<T, SharedState>::PeekOrPop(bool pop) {
+ MutexLocker locker(&mutex_);
+ while (!done_ && queue_.empty()) {
+ CHECK(!pushed_.Wait());
+ }
+ if (queue_.empty()) {
+ return std::nullopt;
+ }
+ if (pop) {
+ T result = std::move(queue_.front());
+ queue_.pop();
+ popped_.Broadcast();
+ return result;
+ } else {
+ return queue_.front();
+ }
+}
+} // namespace aos::util