blob: 3bf4f36f021749c73b34611e6664098e67d38abf [file] [log] [blame]
Ravago Jonesc6b919f2023-01-01 21:34:12 -08001#ifndef AOS_UTIL_THREADED_CONSUMER_H_
2#define AOS_UTIL_THREADED_CONSUMER_H_
3
4#include <functional>
5#include <optional>
6#include <thread>
7
8#include "aos/condition.h"
9#include "aos/containers/ring_buffer.h"
10#include "aos/mutex/mutex.h"
11#include "aos/realtime.h"
12
Stephan Pleinesd99b1ee2024-02-02 20:56:44 -080013namespace aos::util {
Ravago Jonesc6b919f2023-01-01 21:34:12 -080014
15// This class implements a threadpool of a single worker that accepts work
16// from the main thread through a queue and executes it at a different realtime
17// priority.
18//
19// There is no mechanism to get data back to the main thread, the worker only
20// acts as a consumer. When this class is destroyed, it join()s the worker and
21// finishes all outstanding tasks.
22template <typename T, int QueueSize>
23class ThreadedConsumer {
24 public:
25 // Constructs a new ThreadedConsumer with the given consumer function to be
26 // run at the given realtime priority. If worker_priority is zero, the thread
27 // will stay at non realtime priority.
28 ThreadedConsumer(std::function<void(T)> consumer_function,
29 int worker_priority)
30 : consumer_function_(consumer_function),
31 worker_priority_(worker_priority),
32 more_tasks_(&mutex_),
33 worker_thread_([this]() { WorkerFunction(); }) {}
34
35 ~ThreadedConsumer() {
36 {
37 aos::MutexLocker locker(&mutex_);
38 quit_ = true;
39 more_tasks_.Broadcast();
40 }
41 worker_thread_.join();
42 }
43
44 // Submits another task to be processed by the worker.
45 // Returns true if successfully pushed onto the queue, and false if the queue
46 // is full.
47 bool Push(T task) {
48 aos::MutexLocker locker(&mutex_);
49
50 if (task_queue_.full()) {
51 return false;
52 }
53
54 task_queue_.Push(task);
55 more_tasks_.Broadcast();
56
57 return true;
58 }
59
60 private:
61 void WorkerFunction() {
62 if (worker_priority_ > 0) {
63 aos::SetCurrentThreadRealtimePriority(worker_priority_);
64 }
65
66 while (true) {
67 std::optional<T> task;
68
69 {
70 aos::MutexLocker locker(&mutex_);
71 while (task_queue_.empty() && !quit_) {
72 CHECK(!more_tasks_.Wait());
73 }
74
75 if (task_queue_.empty() && quit_) break;
76
77 // Pop
78 task = std::move(task_queue_[0]);
79 task_queue_.Shift();
80 }
81
82 consumer_function_(*task);
83 task.reset();
84 }
85
86 aos::UnsetCurrentThreadRealtimePriority();
87 }
88
89 std::function<void(T)> consumer_function_;
90 aos::RingBuffer<T, QueueSize> task_queue_;
91 aos::Mutex mutex_;
92 bool quit_ = false;
93 int worker_priority_;
94 aos::Condition more_tasks_;
95 std::thread worker_thread_;
96};
97
Stephan Pleinesd99b1ee2024-02-02 20:56:44 -080098} // namespace aos::util
Ravago Jonesc6b919f2023-01-01 21:34:12 -080099
#endif  // AOS_UTIL_THREADED_CONSUMER_H_