blob: 36f3121a6532c7861439444085609b2017c95b59 [file] [log] [blame]
Ravago Jonesc6b919f2023-01-01 21:34:12 -08001#include "aos/util/threaded_consumer.h"
2
3#include "gtest/gtest.h"
4
Stephan Pleinesf63bde82024-01-13 15:59:33 -08005namespace aos::util {
Ravago Jonesc6b919f2023-01-01 21:34:12 -08006
// We expect it to be able to pass through everything we submit and receive it
// in the order that we submitted it. It should also be able to take in more
// tasks than the size of the ring buffer as long as the worker doesn't get
// behind.
11TEST(ThreadedConsumerTest, BasicFunction) {
12 std::atomic<int> counter{0};
13
14 ThreadedConsumer<int, 4> threaded_consumer(
15 [&counter](int task) {
16 LOG(INFO) << "task:" << task << " counter: " << counter;
17 counter = task;
18 },
19 0);
20
21 for (int number : {9, 7, 1, 3, 100, 300, 42}) {
22 EXPECT_TRUE(threaded_consumer.Push(number));
23
24 // wait
25 while (counter != number) {
26 std::this_thread::sleep_for(std::chrono::milliseconds(1));
27 }
28
29 EXPECT_EQ(counter, number);
30 }
31}
32
33// We should be able to raise the realtime priority of the worker thread, and
34// everything should work the same. It should also reset back to lower priority
35// when shutting down the worker thread.
36TEST(ThreadedConsumerTest, ElevatedPriority) {
37 std::atomic<int> counter{0};
38
39 {
40 ThreadedConsumer<int, 4> threaded_consumer(
41 [&counter](int task) {
42 CheckRealtime();
Austin Schuh32ac1202023-02-05 11:25:38 -080043 VLOG(1) << "task:" << task << " counter: " << counter;
Ravago Jonesc6b919f2023-01-01 21:34:12 -080044 counter = task;
45 },
46 20);
47
48 for (int number : {9, 7, 1, 3, 100, 300, 42}) {
49 EXPECT_TRUE(threaded_consumer.Push(number));
50
51 // wait
52 while (counter != number) {
53 std::this_thread::sleep_for(std::chrono::milliseconds(1));
54 }
55
56 EXPECT_EQ(counter, number);
57 }
58 }
59 // TODO: Check that the worker thread's priority actually gets reset before
60 // the thread is destroyed.
61
62 CheckNotRealtime();
63}
64
65// If the worker gets behind, we shouldn't silently take in more tasks and
66// destroy old ones.
67TEST(ThreadedConsumerTest, OverflowRingBuffer) {
68 std::atomic<int> counter{0};
69 std::atomic<int> should_block{true};
70
71 ThreadedConsumer<int, 4> threaded_consumer(
72 [&counter, &should_block](int task) {
Austin Schuh32ac1202023-02-05 11:25:38 -080073 VLOG(1) << "task:" << task << " counter: " << counter;
Ravago Jonesc6b919f2023-01-01 21:34:12 -080074
75 counter = task;
76
77 // prevent it from making any progress to simulate it getting behind
78 while (should_block) {
79 std::this_thread::sleep_for(std::chrono::milliseconds(1));
80 }
81 },
82 20);
83
84 // It consumes the 5 and then our worker blocks.
85 EXPECT_TRUE(threaded_consumer.Push(5));
86
87 // Wait for it to consume 5
88 while (counter != 5) {
89 std::this_thread::sleep_for(std::chrono::milliseconds(1));
90 };
91
92 // 4 more fills up the queue.
93 for (int number : {8, 9, 7, 1}) {
94 EXPECT_TRUE(threaded_consumer.Push(number));
95 }
96
97 // this one should overflow the buffer.
98 EXPECT_FALSE(threaded_consumer.Push(101));
99
100 // clean up, so we don't join() an infinite loop
101 should_block = false;
102}
103
// The class should destruct gracefully and finish all of its work before
// disappearing.
106TEST(ThreadedConsumerTest, FinishesTasksOnQuit) {
107 std::atomic<int> counter{0};
108 std::atomic<int> should_block{true};
109
110 {
111 ThreadedConsumer<int, 4> threaded_consumer(
112 [&counter, &should_block](int task) {
Austin Schuh32ac1202023-02-05 11:25:38 -0800113 VLOG(1) << "task:" << task << " counter: " << counter;
Ravago Jonesc6b919f2023-01-01 21:34:12 -0800114
115 counter = task;
116
117 // prevent it from making any progress to simulate it getting behind
118 while (should_block) {
119 std::this_thread::sleep_for(std::chrono::milliseconds(1));
120 }
121 },
122 20);
123
124 // Give it some work to do
125 for (int number : {8, 9, 7, 1}) {
126 EXPECT_TRUE(threaded_consumer.Push(number));
127 }
128
129 // Wait for it to consume the first number
130 while (counter != 8) {
131 std::this_thread::sleep_for(std::chrono::milliseconds(1));
132 };
133
134 // allow it to continue
135 should_block = false;
136 }
137
138 // It should have finished all the work and gotten to the last number.
139 EXPECT_EQ(counter, 1);
140}
141
Stephan Pleinesf63bde82024-01-13 15:59:33 -0800142} // namespace aos::util