James Kuszmaul | e5deaa4 | 2022-06-24 16:29:31 -0700 | [diff] [blame^] | 1 | namespace aos::util { |
| 2 | template <typename T, typename SharedState> |
| 3 | ThreadedQueue<T, SharedState>::ThreadedQueue( |
| 4 | std::function<PushResult(SharedState)> push_request_handler, |
| 5 | SharedState initial_state) |
| 6 | : popped_(&mutex_), |
| 7 | pushed_(&mutex_), |
| 8 | state_(initial_state), |
| 9 | pusher_thread_([this, push_request_handler]() { |
| 10 | while (true) { |
| 11 | PushResult result = push_request_handler(State()); |
| 12 | { |
| 13 | MutexLocker locker(&mutex_); |
| 14 | done_ = done_ || result.done; |
| 15 | if (result.item.has_value()) { |
| 16 | queue_.push(std::move(result.item.value())); |
| 17 | } |
| 18 | pushed_.Broadcast(); |
| 19 | if (done_) { |
| 20 | return; |
| 21 | } |
| 22 | if (result.more_to_push || state_updated_) { |
| 23 | continue; |
| 24 | } else { |
| 25 | pusher_waiting_ = true; |
| 26 | CHECK(!popped_.Wait()); |
| 27 | pusher_waiting_ = false; |
| 28 | } |
| 29 | } |
| 30 | } |
| 31 | }) {} |
| 32 | |
| 33 | template <typename T, typename SharedState> |
| 34 | ThreadedQueue<T, SharedState>::~ThreadedQueue() { |
| 35 | StopPushing(); |
| 36 | pusher_thread_.join(); |
| 37 | } |
| 38 | |
| 39 | template <typename T, typename SharedState> |
| 40 | void ThreadedQueue<T, SharedState>::WaitForNoMoreWork() { |
| 41 | MutexLocker locker(&mutex_); |
| 42 | while (state_updated_ || (!pusher_waiting_ && !done_)) { |
| 43 | CHECK(!pushed_.Wait()); |
| 44 | } |
| 45 | } |
| 46 | |
| 47 | template <typename T, typename SharedState> |
| 48 | SharedState ThreadedQueue<T, SharedState>::State() { |
| 49 | MutexLocker locker(&mutex_); |
| 50 | state_updated_ = false; |
| 51 | return state_; |
| 52 | } |
| 53 | |
| 54 | template <typename T, typename SharedState> |
| 55 | void ThreadedQueue<T, SharedState>::SetState(const SharedState &state) { |
| 56 | MutexLocker locker(&mutex_); |
| 57 | state_ = state; |
| 58 | state_updated_ = true; |
| 59 | popped_.Broadcast(); |
| 60 | } |
| 61 | |
| 62 | template <typename T, typename SharedState> |
| 63 | void ThreadedQueue<T, SharedState>::StopPushing() { |
| 64 | // Ensure that the mutex is locked before doing anything, to make sure that |
| 65 | // the pushing thread actually observes the change. |
| 66 | MutexLocker locker(&mutex_); |
| 67 | done_ = true; |
| 68 | popped_.Broadcast(); |
| 69 | } |
| 70 | |
| 71 | template <typename T, typename SharedState> |
| 72 | std::optional<T> ThreadedQueue<T, SharedState>::Peek() { |
| 73 | return PeekOrPop(false); |
| 74 | } |
| 75 | |
| 76 | template <typename T, typename SharedState> |
| 77 | std::optional<T> ThreadedQueue<T, SharedState>::Pop() { |
| 78 | return PeekOrPop(true); |
| 79 | } |
| 80 | |
| 81 | template <typename T, typename SharedState> |
| 82 | std::optional<T> ThreadedQueue<T, SharedState>::PeekOrPop(bool pop) { |
| 83 | MutexLocker locker(&mutex_); |
| 84 | while (!done_ && queue_.empty()) { |
| 85 | CHECK(!pushed_.Wait()); |
| 86 | } |
| 87 | if (queue_.empty()) { |
| 88 | return std::nullopt; |
| 89 | } |
| 90 | if (pop) { |
| 91 | T result = std::move(queue_.front()); |
| 92 | queue_.pop(); |
| 93 | popped_.Broadcast(); |
| 94 | return result; |
| 95 | } else { |
| 96 | return queue_.front(); |
| 97 | } |
| 98 | } |
| 99 | } // namespace aos::util |