blob: 471f6e120af2dc84c71600f5ac6df4a089b8f0f5 [file] [log] [blame]
Ravago Jonesc6b919f2023-01-01 21:34:12 -08001#ifndef AOS_UTIL_THREADED_CONSUMER_H_
2#define AOS_UTIL_THREADED_CONSUMER_H_
3
4#include <functional>
5#include <optional>
6#include <thread>
7
Stephan Pleinesb1177672024-05-27 17:48:32 -07008#include "glog/logging.h"
9
Ravago Jonesc6b919f2023-01-01 21:34:12 -080010#include "aos/condition.h"
11#include "aos/containers/ring_buffer.h"
12#include "aos/mutex/mutex.h"
13#include "aos/realtime.h"
14
Stephan Pleinesd99b1ee2024-02-02 20:56:44 -080015namespace aos::util {
Ravago Jonesc6b919f2023-01-01 21:34:12 -080016
17// This class implements a threadpool of a single worker that accepts work
18// from the main thread through a queue and executes it at a different realtime
19// priority.
20//
21// There is no mechanism to get data back to the main thread, the worker only
22// acts as a consumer. When this class is destroyed, it join()s the worker and
23// finishes all outstanding tasks.
24template <typename T, int QueueSize>
25class ThreadedConsumer {
26 public:
27 // Constructs a new ThreadedConsumer with the given consumer function to be
28 // run at the given realtime priority. If worker_priority is zero, the thread
29 // will stay at non realtime priority.
30 ThreadedConsumer(std::function<void(T)> consumer_function,
31 int worker_priority)
32 : consumer_function_(consumer_function),
33 worker_priority_(worker_priority),
34 more_tasks_(&mutex_),
35 worker_thread_([this]() { WorkerFunction(); }) {}
36
37 ~ThreadedConsumer() {
38 {
39 aos::MutexLocker locker(&mutex_);
40 quit_ = true;
41 more_tasks_.Broadcast();
42 }
43 worker_thread_.join();
44 }
45
46 // Submits another task to be processed by the worker.
47 // Returns true if successfully pushed onto the queue, and false if the queue
48 // is full.
49 bool Push(T task) {
50 aos::MutexLocker locker(&mutex_);
51
52 if (task_queue_.full()) {
53 return false;
54 }
55
56 task_queue_.Push(task);
57 more_tasks_.Broadcast();
58
59 return true;
60 }
61
62 private:
63 void WorkerFunction() {
64 if (worker_priority_ > 0) {
65 aos::SetCurrentThreadRealtimePriority(worker_priority_);
66 }
67
68 while (true) {
69 std::optional<T> task;
70
71 {
72 aos::MutexLocker locker(&mutex_);
73 while (task_queue_.empty() && !quit_) {
74 CHECK(!more_tasks_.Wait());
75 }
76
77 if (task_queue_.empty() && quit_) break;
78
79 // Pop
80 task = std::move(task_queue_[0]);
81 task_queue_.Shift();
82 }
83
84 consumer_function_(*task);
85 task.reset();
86 }
87
88 aos::UnsetCurrentThreadRealtimePriority();
89 }
90
91 std::function<void(T)> consumer_function_;
92 aos::RingBuffer<T, QueueSize> task_queue_;
93 aos::Mutex mutex_;
94 bool quit_ = false;
95 int worker_priority_;
96 aos::Condition more_tasks_;
97 std::thread worker_thread_;
98};
99
Stephan Pleinesd99b1ee2024-02-02 20:56:44 -0800100} // namespace aos::util
Ravago Jonesc6b919f2023-01-01 21:34:12 -0800101
102#endif // AOS_UTIL_THREADWORKER_H_