Add utility for thread queue pusher in AOS
This adds a class that provides a queue which one thread (the one doing the
heavy lifting) fills, and from which another, more timing-critical,
thread pops messages.
Change-Id: I51586528adc34dceddbd94549285900dc16d5bf4
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/util/threaded_queue.h b/aos/util/threaded_queue.h
new file mode 100644
index 0000000..0b06632
--- /dev/null
+++ b/aos/util/threaded_queue.h
@@ -0,0 +1,99 @@
+#ifndef AOS_UTIL_THREADED_QUEUE_H_
+#define AOS_UTIL_THREADED_QUEUE_H_
+#include <functional>
+#include <optional>
+#include <queue>
+#include <thread>
+
+#include "aos/condition.h"
+#include "aos/mutex/mutex.h"
+
+namespace aos::util {
+// This class implements a queue of objects of type T in which a worker thread
+// pushes and the calling thread pops from the queue.
+//
+// This is set up such that the user will pass a worker function that will get
+// called in a separate thread whenever we think that there might be something
+// to trigger more work to be done. All the methods on this class are intended
+// to be called from the main thread (not from within the worker function).
+// Communication between the main thread and the worker can be achieved either
+// by manually handling your own state, or preferably by using the SharedState
+// object, which will get passed into the worker. The worker gets called every
+// time that SetState() is called.
+template <typename T, typename SharedState>
+class ThreadedQueue {
+ public:
+ // PushResult is what the worker (push_request_handler) returns on each call.
+ struct PushResult {
+ // The new item to push, if any. If set to nullopt, nothing gets pushed.
+ std::optional<T> item = std::nullopt;
+ // Set to true if the worker knows that there is more work that it has to do
+ // and so should be immediately called again. If set to false, then the
+ // worker will not get called again until one of the following events
+ // occurs:
+ // 1) The queue is successfully popped from.
+ // 2) SetState() is called.
+ bool more_to_push = false;
+ // Set to true if there is no more work to be done. The worker should not
+ // get called after setting done to true.
+ bool done = false;
+ };
+ ThreadedQueue(
+ std::function<PushResult(SharedState)> push_request_handler,  // The worker.
+ SharedState initial_state);  // Presumably the state used until SetState().
+ ~ThreadedQueue();
+ // Sets the shared state to `state` and triggers a new call to
+ // push_request_handler.
+ void SetState(const SharedState &state);
+ // Returns the current front of the queue, blocking until a new item is
+ // available. Will only return nullopt if done is true and there will be no
+ // more items in the queue. Does not remove the item; see Pop() for that.
+ std::optional<T> Peek();
+ // Identical to Peek(), except that it also removes the item from the queue.
+ std::optional<T> Pop();
+ // Waits until the push_request_handler has returned more_to_push = false, and
+ // so is idle. Useful if you want to ensure that the worker has done all the
+ // work it can before going to the next step.
+ void WaitForNoMoreWork();
+ // Stops any further calls to push_request_handler. Used to terminate the
+ // queue from the calling thread (as opposed to the worker returning done).
+ void StopPushing();
+
+ private:
+ // Safely grabs the current state and returns a copy (see state_updated_).
+ SharedState State();
+ // Implements the Peek()/Pop() methods, blocking until we are either done or
+ // the next item is available in the queue. Removes the item iff pop is true.
+ std::optional<T> PeekOrPop(bool pop);
+
+ // Mutex controlling access to all shared state (in this case, all the members
+ // of this class).
+ aos::Mutex mutex_;
+ // Condition variable used to indicate when anything has happened that should
+ // cause us to check for whether the worker can add anything new to the queue.
+ // Called "popped_" in reference to that it may be called when an item is
+ // popped from the queue.
+ aos::Condition popped_;
+ // Condition variable to indicate when an item has either been pushed to the
+ // queue or there is some other state change that consumers may care about
+ // (e.g., being done).
+ aos::Condition pushed_;
+ // TODO(jkuszmaul): Evaluate impact of dynamic memory allocation in
+ // std::queue, consider using aos::RingBuffer or similarly statically
+ // allocated buffer.
+ std::queue<T> queue_;
+ // Whether we are done processing entirely.
+ bool done_{false};
+ // Set while the pusher thread is waiting on popped_.
+ // Used to notice when the push handler is out of work to do.
+ bool pusher_waiting_{false};
+ // Set when SetState() is called, cleared when State() is read. Used to track
+ // whether the push handler has been called with the most recent state.
+ bool state_updated_{false};
+ SharedState state_;  // Current shared state; State() returns a copy of it.
+ std::thread pusher_thread_;  // Presumably runs the worker; TODO confirm.
+};
+
+} // namespace aos::util
+
+#include "aos/util/threaded_queue_tmpl.h"
+#endif // AOS_UTIL_THREADED_QUEUE_H_