blob: 96375f0523783c54992d76c5dc47ad59af683cc6 [file] [log] [blame]
Ravago Jonesc6b919f2023-01-01 21:34:12 -08001#include "aos/util/threaded_consumer.h"
2
3#include "gtest/gtest.h"
4
5namespace aos {
6namespace util {
7
8// We expect it to be able to pass through everything we submit and recieves it
9// in the order that we submitted it. It should also be able to take in more
10// tasks than the size of the ring buffer as long as the worker doesn't get
11// behind.
12TEST(ThreadedConsumerTest, BasicFunction) {
13 std::atomic<int> counter{0};
14
15 ThreadedConsumer<int, 4> threaded_consumer(
16 [&counter](int task) {
17 LOG(INFO) << "task:" << task << " counter: " << counter;
18 counter = task;
19 },
20 0);
21
22 for (int number : {9, 7, 1, 3, 100, 300, 42}) {
23 EXPECT_TRUE(threaded_consumer.Push(number));
24
25 // wait
26 while (counter != number) {
27 std::this_thread::sleep_for(std::chrono::milliseconds(1));
28 }
29
30 EXPECT_EQ(counter, number);
31 }
32}
33
34// We should be able to raise the realtime priority of the worker thread, and
35// everything should work the same. It should also reset back to lower priority
36// when shutting down the worker thread.
37TEST(ThreadedConsumerTest, ElevatedPriority) {
38 std::atomic<int> counter{0};
39
40 {
41 ThreadedConsumer<int, 4> threaded_consumer(
42 [&counter](int task) {
43 CheckRealtime();
Austin Schuh32ac1202023-02-05 11:25:38 -080044 VLOG(1) << "task:" << task << " counter: " << counter;
Ravago Jonesc6b919f2023-01-01 21:34:12 -080045 counter = task;
46 },
47 20);
48
49 for (int number : {9, 7, 1, 3, 100, 300, 42}) {
50 EXPECT_TRUE(threaded_consumer.Push(number));
51
52 // wait
53 while (counter != number) {
54 std::this_thread::sleep_for(std::chrono::milliseconds(1));
55 }
56
57 EXPECT_EQ(counter, number);
58 }
59 }
60 // TODO: Check that the worker thread's priority actually gets reset before
61 // the thread is destroyed.
62
63 CheckNotRealtime();
64}
65
66// If the worker gets behind, we shouldn't silently take in more tasks and
67// destroy old ones.
68TEST(ThreadedConsumerTest, OverflowRingBuffer) {
69 std::atomic<int> counter{0};
70 std::atomic<int> should_block{true};
71
72 ThreadedConsumer<int, 4> threaded_consumer(
73 [&counter, &should_block](int task) {
Austin Schuh32ac1202023-02-05 11:25:38 -080074 VLOG(1) << "task:" << task << " counter: " << counter;
Ravago Jonesc6b919f2023-01-01 21:34:12 -080075
76 counter = task;
77
78 // prevent it from making any progress to simulate it getting behind
79 while (should_block) {
80 std::this_thread::sleep_for(std::chrono::milliseconds(1));
81 }
82 },
83 20);
84
85 // It consumes the 5 and then our worker blocks.
86 EXPECT_TRUE(threaded_consumer.Push(5));
87
88 // Wait for it to consume 5
89 while (counter != 5) {
90 std::this_thread::sleep_for(std::chrono::milliseconds(1));
91 };
92
93 // 4 more fills up the queue.
94 for (int number : {8, 9, 7, 1}) {
95 EXPECT_TRUE(threaded_consumer.Push(number));
96 }
97
98 // this one should overflow the buffer.
99 EXPECT_FALSE(threaded_consumer.Push(101));
100
101 // clean up, so we don't join() an infinite loop
102 should_block = false;
103}
104
105// The class should destruct gracefully and finish all of its work before
106// dissapearing.
107TEST(ThreadedConsumerTest, FinishesTasksOnQuit) {
108 std::atomic<int> counter{0};
109 std::atomic<int> should_block{true};
110
111 {
112 ThreadedConsumer<int, 4> threaded_consumer(
113 [&counter, &should_block](int task) {
Austin Schuh32ac1202023-02-05 11:25:38 -0800114 VLOG(1) << "task:" << task << " counter: " << counter;
Ravago Jonesc6b919f2023-01-01 21:34:12 -0800115
116 counter = task;
117
118 // prevent it from making any progress to simulate it getting behind
119 while (should_block) {
120 std::this_thread::sleep_for(std::chrono::milliseconds(1));
121 }
122 },
123 20);
124
125 // Give it some work to do
126 for (int number : {8, 9, 7, 1}) {
127 EXPECT_TRUE(threaded_consumer.Push(number));
128 }
129
130 // Wait for it to consume the first number
131 while (counter != 8) {
132 std::this_thread::sleep_for(std::chrono::milliseconds(1));
133 };
134
135 // allow it to continue
136 should_block = false;
137 }
138
139 // It should have finished all the work and gotten to the last number.
140 EXPECT_EQ(counter, 1);
141}
142
143} // namespace util
144} // namespace aos