blob: 95ec79ae82f17b3b5996832012e4145dc4e45046 [file] [log] [blame]
Ravago Jonesc6b919f2023-01-01 21:34:12 -08001#ifndef AOS_UTIL_THREADED_CONSUMER_H_
2#define AOS_UTIL_THREADED_CONSUMER_H_
3
#include <functional>
#include <optional>
#include <thread>
#include <utility>

#include "aos/condition.h"
#include "aos/containers/ring_buffer.h"
#include "aos/mutex/mutex.h"
#include "aos/realtime.h"
12
13namespace aos {
14namespace util {
15
16// This class implements a threadpool of a single worker that accepts work
17// from the main thread through a queue and executes it at a different realtime
18// priority.
19//
20// There is no mechanism to get data back to the main thread, the worker only
21// acts as a consumer. When this class is destroyed, it join()s the worker and
22// finishes all outstanding tasks.
23template <typename T, int QueueSize>
24class ThreadedConsumer {
25 public:
26 // Constructs a new ThreadedConsumer with the given consumer function to be
27 // run at the given realtime priority. If worker_priority is zero, the thread
28 // will stay at non realtime priority.
29 ThreadedConsumer(std::function<void(T)> consumer_function,
30 int worker_priority)
31 : consumer_function_(consumer_function),
32 worker_priority_(worker_priority),
33 more_tasks_(&mutex_),
34 worker_thread_([this]() { WorkerFunction(); }) {}
35
36 ~ThreadedConsumer() {
37 {
38 aos::MutexLocker locker(&mutex_);
39 quit_ = true;
40 more_tasks_.Broadcast();
41 }
42 worker_thread_.join();
43 }
44
45 // Submits another task to be processed by the worker.
46 // Returns true if successfully pushed onto the queue, and false if the queue
47 // is full.
48 bool Push(T task) {
49 aos::MutexLocker locker(&mutex_);
50
51 if (task_queue_.full()) {
52 return false;
53 }
54
55 task_queue_.Push(task);
56 more_tasks_.Broadcast();
57
58 return true;
59 }
60
61 private:
62 void WorkerFunction() {
63 if (worker_priority_ > 0) {
64 aos::SetCurrentThreadRealtimePriority(worker_priority_);
65 }
66
67 while (true) {
68 std::optional<T> task;
69
70 {
71 aos::MutexLocker locker(&mutex_);
72 while (task_queue_.empty() && !quit_) {
73 CHECK(!more_tasks_.Wait());
74 }
75
76 if (task_queue_.empty() && quit_) break;
77
78 // Pop
79 task = std::move(task_queue_[0]);
80 task_queue_.Shift();
81 }
82
83 consumer_function_(*task);
84 task.reset();
85 }
86
87 aos::UnsetCurrentThreadRealtimePriority();
88 }
89
90 std::function<void(T)> consumer_function_;
91 aos::RingBuffer<T, QueueSize> task_queue_;
92 aos::Mutex mutex_;
93 bool quit_ = false;
94 int worker_priority_;
95 aos::Condition more_tasks_;
96 std::thread worker_thread_;
97};
98
99} // namespace util
100} // namespace aos
101
#endif  // AOS_UTIL_THREADED_CONSUMER_H_