blob: 05508a80fbb506975437ffddf2d47add872c1056 [file] [log] [blame]
#ifndef AOS_UTIL_THREADED_CONSUMER_H_
#define AOS_UTIL_THREADED_CONSUMER_H_
#include <functional>
#include <optional>
#include <thread>
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "aos/condition.h"
#include "aos/containers/ring_buffer.h"
#include "aos/mutex/mutex.h"
#include "aos/realtime.h"
namespace aos::util {
// This class implements a threadpool of a single worker that accepts work
// from the main thread through a queue and executes it at a different realtime
// priority.
//
// There is no mechanism to get data back to the main thread, the worker only
// acts as a consumer. When this class is destroyed, it join()s the worker and
// finishes all outstanding tasks.
template <typename T, int QueueSize>
class ThreadedConsumer {
 public:
  // Constructs a new ThreadedConsumer and immediately starts its worker
  // thread.
  //
  // consumer_function is invoked on the worker thread once per task that was
  // successfully pushed, in the order the tasks were pushed.
  //
  // worker_priority is the realtime priority requested for the worker thread.
  // If it is not positive, no realtime priority is requested and the thread
  // stays at non realtime priority.
  ThreadedConsumer(std::function<void(T)> consumer_function,
                   int worker_priority)
      : consumer_function_(std::move(consumer_function)),
        worker_priority_(worker_priority),
        more_tasks_(&mutex_),
        worker_thread_([this]() { WorkerFunction(); }) {}

  // The worker thread captures `this`, so the object must never change
  // address or be duplicated while the worker is running.
  ThreadedConsumer(const ThreadedConsumer &) = delete;
  ThreadedConsumer &operator=(const ThreadedConsumer &) = delete;

  // Asks the worker to quit and blocks until it has exited. The worker only
  // exits once the queue is empty, so every task that was already accepted by
  // Push() is still handed to the consumer function before this returns.
  ~ThreadedConsumer() {
    {
      aos::MutexLocker locker(&mutex_);
      quit_ = true;
      // Wake the worker in case it is currently waiting for more tasks.
      more_tasks_.Broadcast();
    }
    // Join outside of the lock; the worker needs the mutex to drain the queue.
    worker_thread_.join();
  }

  // Submits another task to be processed by the worker.
  //
  // Returns true if the task was successfully pushed onto the queue, and
  // false if the queue is full (in which case the task is dropped).
  bool Push(T task) {
    aos::MutexLocker locker(&mutex_);
    if (task_queue_.full()) {
      return false;
    }
    // `task` is our own by-value copy, so hand it over instead of copying it
    // a second time.
    task_queue_.Push(std::move(task));
    more_tasks_.Broadcast();
    return true;
  }

 private:
  // Body of the worker thread: waits for tasks, pops them one at a time and
  // runs the consumer function on each, until quit has been requested and the
  // queue has been drained.
  void WorkerFunction() {
    if (worker_priority_ > 0) {
      aos::SetCurrentThreadRealtimePriority(worker_priority_);
    }

    while (true) {
      std::optional<T> task;

      {
        aos::MutexLocker locker(&mutex_);
        // Re-check the predicate after every wakeup.
        while (task_queue_.empty() && !quit_) {
          // A true return value from Wait() is treated as fatal.
          CHECK(!more_tasks_.Wait());
        }

        // Only stop once there is nothing left to process, so tasks queued
        // before destruction are still consumed.
        if (task_queue_.empty() && quit_) break;

        // Pop the oldest task.
        task = std::move(task_queue_[0]);
        task_queue_.Shift();
      }

      // Run the consumer without holding the mutex so that Push() is not
      // blocked while a task is being processed. The task is not used again
      // after this call, so move it into the consumer rather than copying.
      consumer_function_(std::move(*task));
      task.reset();
    }

    aos::UnsetCurrentThreadRealtimePriority();
  }

  // Callback run on the worker thread for each task.
  std::function<void(T)> consumer_function_;
  // Pending tasks, oldest first. Guarded by mutex_.
  aos::RingBuffer<T, QueueSize> task_queue_;
  aos::Mutex mutex_;
  // Set by the destructor to ask the worker to exit. Guarded by mutex_.
  bool quit_ = false;
  int worker_priority_;
  // Signalled whenever a task is pushed or quit_ is set.
  aos::Condition more_tasks_;
  // Must stay the last member: members are initialized in declaration order,
  // and the worker starts running as soon as this one is constructed, so
  // everything it touches has to be initialized before it.
  std::thread worker_thread_;
};
} // namespace aos::util
#endif  // AOS_UTIL_THREADED_CONSUMER_H_