blob: cb4cdfcc5c1ad349bd1b733a7821b3dfe1ab16aa [file] [log] [blame]
James Kuszmaule5deaa42022-06-24 16:29:31 -07001namespace aos::util {
2template <typename T, typename SharedState>
3ThreadedQueue<T, SharedState>::ThreadedQueue(
4 std::function<PushResult(SharedState)> push_request_handler,
5 SharedState initial_state)
6 : popped_(&mutex_),
7 pushed_(&mutex_),
8 state_(initial_state),
9 pusher_thread_([this, push_request_handler]() {
10 while (true) {
11 PushResult result = push_request_handler(State());
12 {
13 MutexLocker locker(&mutex_);
14 done_ = done_ || result.done;
15 if (result.item.has_value()) {
16 queue_.push(std::move(result.item.value()));
17 }
18 pushed_.Broadcast();
19 if (done_) {
20 return;
21 }
22 if (result.more_to_push || state_updated_) {
23 continue;
24 } else {
25 pusher_waiting_ = true;
26 CHECK(!popped_.Wait());
27 pusher_waiting_ = false;
28 }
29 }
30 }
31 }) {}
32
33template <typename T, typename SharedState>
34ThreadedQueue<T, SharedState>::~ThreadedQueue() {
35 StopPushing();
36 pusher_thread_.join();
37}
38
39template <typename T, typename SharedState>
40void ThreadedQueue<T, SharedState>::WaitForNoMoreWork() {
41 MutexLocker locker(&mutex_);
42 while (state_updated_ || (!pusher_waiting_ && !done_)) {
43 CHECK(!pushed_.Wait());
44 }
45}
46
47template <typename T, typename SharedState>
48SharedState ThreadedQueue<T, SharedState>::State() {
49 MutexLocker locker(&mutex_);
50 state_updated_ = false;
51 return state_;
52}
53
54template <typename T, typename SharedState>
55void ThreadedQueue<T, SharedState>::SetState(const SharedState &state) {
56 MutexLocker locker(&mutex_);
57 state_ = state;
58 state_updated_ = true;
59 popped_.Broadcast();
60}
61
62template <typename T, typename SharedState>
63void ThreadedQueue<T, SharedState>::StopPushing() {
64 // Ensure that the mutex is locked before doing anything, to make sure that
65 // the pushing thread actually observes the change.
66 MutexLocker locker(&mutex_);
67 done_ = true;
68 popped_.Broadcast();
69}
70
71template <typename T, typename SharedState>
72std::optional<T> ThreadedQueue<T, SharedState>::Peek() {
73 return PeekOrPop(false);
74}
75
76template <typename T, typename SharedState>
77std::optional<T> ThreadedQueue<T, SharedState>::Pop() {
78 return PeekOrPop(true);
79}
80
81template <typename T, typename SharedState>
82std::optional<T> ThreadedQueue<T, SharedState>::PeekOrPop(bool pop) {
83 MutexLocker locker(&mutex_);
84 while (!done_ && queue_.empty()) {
85 CHECK(!pushed_.Wait());
86 }
87 if (queue_.empty()) {
88 return std::nullopt;
89 }
90 if (pop) {
91 T result = std::move(queue_.front());
92 queue_.pop();
93 popped_.Broadcast();
94 return result;
95 } else {
96 return queue_.front();
97 }
98}
99} // namespace aos::util