blob: 05508a80fbb506975437ffddf2d47add872c1056 [file] [log] [blame]
Ravago Jonesc6b919f2023-01-01 21:34:12 -08001#ifndef AOS_UTIL_THREADED_CONSUMER_H_
2#define AOS_UTIL_THREADED_CONSUMER_H_
3
4#include <functional>
5#include <optional>
6#include <thread>
7
Austin Schuh99f7c6a2024-06-25 22:07:44 -07008#include "absl/log/check.h"
9#include "absl/log/log.h"
Stephan Pleinesb1177672024-05-27 17:48:32 -070010
Ravago Jonesc6b919f2023-01-01 21:34:12 -080011#include "aos/condition.h"
12#include "aos/containers/ring_buffer.h"
13#include "aos/mutex/mutex.h"
14#include "aos/realtime.h"
15
Stephan Pleinesd99b1ee2024-02-02 20:56:44 -080016namespace aos::util {
Ravago Jonesc6b919f2023-01-01 21:34:12 -080017
18// This class implements a threadpool of a single worker that accepts work
19// from the main thread through a queue and executes it at a different realtime
20// priority.
21//
22// There is no mechanism to get data back to the main thread, the worker only
23// acts as a consumer. When this class is destroyed, it join()s the worker and
24// finishes all outstanding tasks.
25template <typename T, int QueueSize>
26class ThreadedConsumer {
27 public:
28 // Constructs a new ThreadedConsumer with the given consumer function to be
29 // run at the given realtime priority. If worker_priority is zero, the thread
30 // will stay at non realtime priority.
31 ThreadedConsumer(std::function<void(T)> consumer_function,
32 int worker_priority)
33 : consumer_function_(consumer_function),
34 worker_priority_(worker_priority),
35 more_tasks_(&mutex_),
36 worker_thread_([this]() { WorkerFunction(); }) {}
37
38 ~ThreadedConsumer() {
39 {
40 aos::MutexLocker locker(&mutex_);
41 quit_ = true;
42 more_tasks_.Broadcast();
43 }
44 worker_thread_.join();
45 }
46
47 // Submits another task to be processed by the worker.
48 // Returns true if successfully pushed onto the queue, and false if the queue
49 // is full.
50 bool Push(T task) {
51 aos::MutexLocker locker(&mutex_);
52
53 if (task_queue_.full()) {
54 return false;
55 }
56
57 task_queue_.Push(task);
58 more_tasks_.Broadcast();
59
60 return true;
61 }
62
63 private:
64 void WorkerFunction() {
65 if (worker_priority_ > 0) {
66 aos::SetCurrentThreadRealtimePriority(worker_priority_);
67 }
68
69 while (true) {
70 std::optional<T> task;
71
72 {
73 aos::MutexLocker locker(&mutex_);
74 while (task_queue_.empty() && !quit_) {
75 CHECK(!more_tasks_.Wait());
76 }
77
78 if (task_queue_.empty() && quit_) break;
79
80 // Pop
81 task = std::move(task_queue_[0]);
82 task_queue_.Shift();
83 }
84
85 consumer_function_(*task);
86 task.reset();
87 }
88
89 aos::UnsetCurrentThreadRealtimePriority();
90 }
91
92 std::function<void(T)> consumer_function_;
93 aos::RingBuffer<T, QueueSize> task_queue_;
94 aos::Mutex mutex_;
95 bool quit_ = false;
96 int worker_priority_;
97 aos::Condition more_tasks_;
98 std::thread worker_thread_;
99};
100
Stephan Pleinesd99b1ee2024-02-02 20:56:44 -0800101} // namespace aos::util
Ravago Jonesc6b919f2023-01-01 21:34:12 -0800102
#endif  // AOS_UTIL_THREADED_CONSUMER_H_