blob: 1bbde48957ed3d7e81878e5249538d820e851439 [file] [log] [blame]
James Kuszmaule5deaa42022-06-24 16:29:31 -07001#ifndef AOS_UTIL_THREADED_QUEUE_H_
2#define AOS_UTIL_THREADED_QUEUE_H_
3#include <functional>
4#include <optional>
5#include <queue>
6#include <thread>
7
8#include "aos/condition.h"
9#include "aos/mutex/mutex.h"
10
11namespace aos::util {
12// This class implements a queue of objects of type T in which a worker thread
13// pushes and the calling thread pops from the queue.
14//
15// This is setup such that the user will pass a worker function that will get
16// called in a separate thread whenever we think that there might be something
17// to trigger more work to be done. All the methods on this calss are intended
18// to be called from the main thread (not from within the worker function).
19// Communication between the main thread and the worker can be achieved either
20// by manually handling your own state, or preferably by using the SharedState
21// object, which will get passed into the worker. The worker gets called every
22// time that SetState() is called.
23template <typename T, typename SharedState>
24class ThreadedQueue {
25 public:
26 // PushResult is returned from the worker to indicate its current state.
27 struct PushResult {
28 // The new item to push, if any. If set to nullopt, nothing gets pushed.
29 std::optional<T> item = std::nullopt;
30 // Set to true if the worker knows that there is more work that it has to do
31 // and so should be immediately called again. If set to false, then the
32 // worker will not get called again until one of the following events
33 // occurs:
34 // 1) The queue is successfully popped from.
35 // 2) SetState() is called.
36 bool more_to_push = false;
37 // Set to true if there is no more work to be done. The worker should not
38 // get called after setting done to true.
39 bool done = false;
40 };
Philipp Schrader790cb542023-07-05 21:06:52 -070041 ThreadedQueue(std::function<PushResult(SharedState)> push_request_handler,
42 SharedState initial_state);
James Kuszmaule5deaa42022-06-24 16:29:31 -070043 ~ThreadedQueue();
44 // Sets state. Triggers a new call to push_request_handler.
45 void SetState(const SharedState &state);
46 // Returns the current front of the queue, blocking until a new item is
47 // available. Will only return nullopt if done is true and there will be no
48 // more items in the queue.
49 std::optional<T> Peek();
50 // Identical to Peek(), except that it also removes the item from the queue.
51 std::optional<T> Pop();
52 // Waits until the push_request_handler has returned more_to_push = false, and
53 // so is just spinning. Useful if you want ensure that the worker has done all
54 // the work it can before going to the next step.
55 void WaitForNoMoreWork();
56 // Stops any further calls to push_request_handler. Used to terminate the
57 // queue from the calling thread.
58 void StopPushing();
59
60 private:
61 // Safely grabs the current state and returns a copy.
62 SharedState State();
63 // Implements the Peek()/Pop() methods, blocking until we are either done or
64 // the next item is available in the queue.
65 std::optional<T> PeekOrPop(bool pop);
66
67 // Mutex controlling access to all shared state (in this case, all the members
68 // of this class).
69 aos::Mutex mutex_;
70 // Condition variable used to indicate when anything has happened that should
71 // cause us to check for whether the worker can add anything new to the queue.
72 // Called "popped_" in reference to that it may be called when an item is
73 // popped from the queue.
74 aos::Condition popped_;
75 // Condition variable to indicate when an item has either been pushed to the
76 // queue or there is some other state change that consumers may care about
77 // (e.g., being done).
78 aos::Condition pushed_;
79 // TODO(jkuszmaul): Evaluate impact of dynamic memory allocation in
80 // std::queue, consider using aos::RingBuffer or similarly statically
81 // allocated buffer.
82 std::queue<T> queue_;
83 // Whether we are done processing entirely.
84 bool done_{false};
85 // Set while the pusher thread is waiting on popped_.
86 // Used to notice when the push handler is out of work to do.
87 bool pusher_waiting_{false};
88 // Set when SetState() is called, cleared when State() is read. Used to track
89 // whether the push handler has been called with the most recent state.
90 bool state_updated_{false};
91 SharedState state_;
92 std::thread pusher_thread_;
93};
94
95} // namespace aos::util
96
Stephan Pleinesb1177672024-05-27 17:48:32 -070097#include "aos/util/threaded_queue_tmpl.h" // IWYU pragma: keep
98#endif // AOS_UTIL_THREADED_QUEUE_H_