blob: 0b066321d502a7d1d3aaacdb225332690b0199e5 [file] [log] [blame]
James Kuszmaule5deaa42022-06-24 16:29:31 -07001#ifndef AOS_UTIL_THREADED_QUEUE_H_
2#define AOS_UTIL_THREADED_QUEUE_H_
3#include <functional>
4#include <optional>
5#include <queue>
6#include <thread>
7
8#include "aos/condition.h"
9#include "aos/mutex/mutex.h"
10
11namespace aos::util {
12// This class implements a queue of objects of type T in which a worker thread
13// pushes and the calling thread pops from the queue.
14//
15// This is setup such that the user will pass a worker function that will get
16// called in a separate thread whenever we think that there might be something
// to trigger more work to be done. All the methods on this class are intended
18// to be called from the main thread (not from within the worker function).
19// Communication between the main thread and the worker can be achieved either
20// by manually handling your own state, or preferably by using the SharedState
21// object, which will get passed into the worker. The worker gets called every
22// time that SetState() is called.
23template <typename T, typename SharedState>
24class ThreadedQueue {
25 public:
26 // PushResult is returned from the worker to indicate its current state.
27 struct PushResult {
28 // The new item to push, if any. If set to nullopt, nothing gets pushed.
29 std::optional<T> item = std::nullopt;
30 // Set to true if the worker knows that there is more work that it has to do
31 // and so should be immediately called again. If set to false, then the
32 // worker will not get called again until one of the following events
33 // occurs:
34 // 1) The queue is successfully popped from.
35 // 2) SetState() is called.
36 bool more_to_push = false;
37 // Set to true if there is no more work to be done. The worker should not
38 // get called after setting done to true.
39 bool done = false;
40 };
41 ThreadedQueue(
42 std::function<PushResult(SharedState)> push_request_handler,
43 SharedState initial_state);
44 ~ThreadedQueue();
45 // Sets state. Triggers a new call to push_request_handler.
46 void SetState(const SharedState &state);
47 // Returns the current front of the queue, blocking until a new item is
48 // available. Will only return nullopt if done is true and there will be no
49 // more items in the queue.
50 std::optional<T> Peek();
51 // Identical to Peek(), except that it also removes the item from the queue.
52 std::optional<T> Pop();
53 // Waits until the push_request_handler has returned more_to_push = false, and
54 // so is just spinning. Useful if you want ensure that the worker has done all
55 // the work it can before going to the next step.
56 void WaitForNoMoreWork();
57 // Stops any further calls to push_request_handler. Used to terminate the
58 // queue from the calling thread.
59 void StopPushing();
60
61 private:
62 // Safely grabs the current state and returns a copy.
63 SharedState State();
64 // Implements the Peek()/Pop() methods, blocking until we are either done or
65 // the next item is available in the queue.
66 std::optional<T> PeekOrPop(bool pop);
67
68 // Mutex controlling access to all shared state (in this case, all the members
69 // of this class).
70 aos::Mutex mutex_;
71 // Condition variable used to indicate when anything has happened that should
72 // cause us to check for whether the worker can add anything new to the queue.
73 // Called "popped_" in reference to that it may be called when an item is
74 // popped from the queue.
75 aos::Condition popped_;
76 // Condition variable to indicate when an item has either been pushed to the
77 // queue or there is some other state change that consumers may care about
78 // (e.g., being done).
79 aos::Condition pushed_;
80 // TODO(jkuszmaul): Evaluate impact of dynamic memory allocation in
81 // std::queue, consider using aos::RingBuffer or similarly statically
82 // allocated buffer.
83 std::queue<T> queue_;
84 // Whether we are done processing entirely.
85 bool done_{false};
86 // Set while the pusher thread is waiting on popped_.
87 // Used to notice when the push handler is out of work to do.
88 bool pusher_waiting_{false};
89 // Set when SetState() is called, cleared when State() is read. Used to track
90 // whether the push handler has been called with the most recent state.
91 bool state_updated_{false};
92 SharedState state_;
93 std::thread pusher_thread_;
94};
95
96} // namespace aos::util
97
98#include "aos/util/threaded_queue_tmpl.h"
99#endif // AOS_UTIL_THREADED_QUEUE_H_