James Kuszmaul | e5deaa4 | 2022-06-24 16:29:31 -0700 | [diff] [blame^] | 1 | #ifndef AOS_UTIL_THREADED_QUEUE_H_ |
| 2 | #define AOS_UTIL_THREADED_QUEUE_H_ |
| 3 | #include <functional> |
| 4 | #include <optional> |
| 5 | #include <queue> |
| 6 | #include <thread> |
| 7 | |
| 8 | #include "aos/condition.h" |
| 9 | #include "aos/mutex/mutex.h" |
| 10 | |
| 11 | namespace aos::util { |
| 12 | // This class implements a queue of objects of type T in which a worker thread |
| 13 | // pushes and the calling thread pops from the queue. |
| 14 | // |
| 15 | // This is setup such that the user will pass a worker function that will get |
| 16 | // called in a separate thread whenever we think that there might be something |
| 17 | // to trigger more work to be done. All the methods on this class are intended |
| 18 | // to be called from the main thread (not from within the worker function). |
| 19 | // Communication between the main thread and the worker can be achieved either |
| 20 | // by manually handling your own state, or preferably by using the SharedState |
| 21 | // object, which will get passed into the worker. The worker gets called every |
| 22 | // time that SetState() is called. |
| 23 | template <typename T, typename SharedState> |
| 24 | class ThreadedQueue { |
| 25 | public: |
| 26 | // PushResult is returned from the worker to indicate its current state. |
| 27 | struct PushResult { |
| 28 | // The new item to push, if any. If set to nullopt, nothing gets pushed. |
| 29 | std::optional<T> item = std::nullopt; |
| 30 | // Set to true if the worker knows that there is more work that it has to do |
| 31 | // and so should be immediately called again. If set to false, then the |
| 32 | // worker will not get called again until one of the following events |
| 33 | // occurs: |
| 34 | // 1) The queue is successfully popped from. |
| 35 | // 2) SetState() is called. |
| 36 | bool more_to_push = false; |
| 37 | // Set to true if there is no more work to be done. The worker should not |
| 38 | // get called after setting done to true. |
| 39 | bool done = false; |
| 40 | }; |
| 41 | ThreadedQueue( |
| 42 | std::function<PushResult(SharedState)> push_request_handler, |
| 43 | SharedState initial_state); |
| 44 | ~ThreadedQueue(); |
| 45 | // Sets state. Triggers a new call to push_request_handler. |
| 46 | void SetState(const SharedState &state); |
| 47 | // Returns the current front of the queue, blocking until a new item is |
| 48 | // available. Will only return nullopt if done is true and there will be no |
| 49 | // more items in the queue. |
| 50 | std::optional<T> Peek(); |
| 51 | // Identical to Peek(), except that it also removes the item from the queue. |
| 52 | std::optional<T> Pop(); |
| 53 | // Waits until the push_request_handler has returned more_to_push = false, and |
| 54 | // so is just spinning. Useful if you want to ensure that the worker has done |
| 55 | // all the work it can before going to the next step. |
| 56 | void WaitForNoMoreWork(); |
| 57 | // Stops any further calls to push_request_handler. Used to terminate the |
| 58 | // queue from the calling thread. |
| 59 | void StopPushing(); |
| 60 | |
| 61 | private: |
| 62 | // Safely grabs the current state and returns a copy. |
| 63 | SharedState State(); |
| 64 | // Implements the Peek()/Pop() methods, blocking until we are either done or |
| 65 | // the next item is available in the queue. |
| 66 | std::optional<T> PeekOrPop(bool pop); |
| 67 | |
| 68 | // Mutex controlling access to all shared state (in this case, all the members |
| 69 | // of this class). |
| 70 | aos::Mutex mutex_; |
| 71 | // Condition variable used to indicate when anything has happened that should |
| 72 | // cause us to check for whether the worker can add anything new to the queue. |
| 73 | // Named "popped_" because one of the events that may trigger it is an item |
| 74 | // being popped from the queue. |
| 75 | aos::Condition popped_; |
| 76 | // Condition variable to indicate when an item has either been pushed to the |
| 77 | // queue or there is some other state change that consumers may care about |
| 78 | // (e.g., being done). |
| 79 | aos::Condition pushed_; |
| 80 | // TODO(jkuszmaul): Evaluate impact of dynamic memory allocation in |
| 81 | // std::queue, consider using aos::RingBuffer or similarly statically |
| 82 | // allocated buffer. |
| 83 | std::queue<T> queue_; |
| 84 | // Whether we are done processing entirely. |
| 85 | bool done_{false}; |
| 86 | // Set while the pusher thread is waiting on popped_. |
| 87 | // Used to notice when the push handler is out of work to do. |
| 88 | bool pusher_waiting_{false}; |
| 89 | // Set when SetState() is called, cleared when State() is read. Used to track |
| 90 | // whether the push handler has been called with the most recent state_. |
| 91 | bool state_updated_{false}; |
| 92 | SharedState state_; |
| 93 | std::thread pusher_thread_; |
| 94 | }; |
| 95 | |
| 96 | } // namespace aos::util |
| 97 | |
| 98 | #include "aos/util/threaded_queue_tmpl.h" |
| 99 | #endif // AOS_UTIL_THREADED_QUEUE_H_ |