blob: cb4cdfcc5c1ad349bd1b733a7821b3dfe1ab16aa [file] [log] [blame]
namespace aos::util {
template <typename T, typename SharedState>
ThreadedQueue<T, SharedState>::ThreadedQueue(
std::function<PushResult(SharedState)> push_request_handler,
SharedState initial_state)
: popped_(&mutex_),
pushed_(&mutex_),
state_(initial_state),
pusher_thread_([this, push_request_handler]() {
while (true) {
PushResult result = push_request_handler(State());
{
MutexLocker locker(&mutex_);
done_ = done_ || result.done;
if (result.item.has_value()) {
queue_.push(std::move(result.item.value()));
}
pushed_.Broadcast();
if (done_) {
return;
}
if (result.more_to_push || state_updated_) {
continue;
} else {
pusher_waiting_ = true;
CHECK(!popped_.Wait());
pusher_waiting_ = false;
}
}
}
}) {}
template <typename T, typename SharedState>
ThreadedQueue<T, SharedState>::~ThreadedQueue() {
StopPushing();
pusher_thread_.join();
}
template <typename T, typename SharedState>
void ThreadedQueue<T, SharedState>::WaitForNoMoreWork() {
MutexLocker locker(&mutex_);
while (state_updated_ || (!pusher_waiting_ && !done_)) {
CHECK(!pushed_.Wait());
}
}
template <typename T, typename SharedState>
SharedState ThreadedQueue<T, SharedState>::State() {
MutexLocker locker(&mutex_);
state_updated_ = false;
return state_;
}
template <typename T, typename SharedState>
void ThreadedQueue<T, SharedState>::SetState(const SharedState &state) {
MutexLocker locker(&mutex_);
state_ = state;
state_updated_ = true;
popped_.Broadcast();
}
template <typename T, typename SharedState>
void ThreadedQueue<T, SharedState>::StopPushing() {
// Ensure that the mutex is locked before doing anything, to make sure that
// the pushing thread actually observes the change.
MutexLocker locker(&mutex_);
done_ = true;
popped_.Broadcast();
}
template <typename T, typename SharedState>
std::optional<T> ThreadedQueue<T, SharedState>::Peek() {
return PeekOrPop(false);
}
template <typename T, typename SharedState>
std::optional<T> ThreadedQueue<T, SharedState>::Pop() {
return PeekOrPop(true);
}
template <typename T, typename SharedState>
std::optional<T> ThreadedQueue<T, SharedState>::PeekOrPop(bool pop) {
MutexLocker locker(&mutex_);
while (!done_ && queue_.empty()) {
CHECK(!pushed_.Wait());
}
if (queue_.empty()) {
return std::nullopt;
}
if (pop) {
T result = std::move(queue_.front());
queue_.pop();
popped_.Broadcast();
return result;
} else {
return queue_.front();
}
}
} // namespace aos::util