blob: 27dc183bc74d10768600618318456708d043072e [file] [log] [blame]
Ravago Jonesc6b919f2023-01-01 21:34:12 -08001#include "aos/util/threaded_consumer.h"
2
Stephan Pleinesb1177672024-05-27 17:48:32 -07003#include <atomic>
4#include <chrono>
5#include <initializer_list>
6#include <memory>
7#include <ostream>
8
Ravago Jonesc6b919f2023-01-01 21:34:12 -08009#include "gtest/gtest.h"
10
Stephan Pleinesf63bde82024-01-13 15:59:33 -080011namespace aos::util {
Ravago Jonesc6b919f2023-01-01 21:34:12 -080012
13// We expect it to be able to pass through everything we submit and recieves it
14// in the order that we submitted it. It should also be able to take in more
15// tasks than the size of the ring buffer as long as the worker doesn't get
16// behind.
17TEST(ThreadedConsumerTest, BasicFunction) {
18 std::atomic<int> counter{0};
19
20 ThreadedConsumer<int, 4> threaded_consumer(
21 [&counter](int task) {
22 LOG(INFO) << "task:" << task << " counter: " << counter;
23 counter = task;
24 },
25 0);
26
27 for (int number : {9, 7, 1, 3, 100, 300, 42}) {
28 EXPECT_TRUE(threaded_consumer.Push(number));
29
30 // wait
31 while (counter != number) {
32 std::this_thread::sleep_for(std::chrono::milliseconds(1));
33 }
34
35 EXPECT_EQ(counter, number);
36 }
37}
38
39// We should be able to raise the realtime priority of the worker thread, and
40// everything should work the same. It should also reset back to lower priority
41// when shutting down the worker thread.
42TEST(ThreadedConsumerTest, ElevatedPriority) {
43 std::atomic<int> counter{0};
44
45 {
46 ThreadedConsumer<int, 4> threaded_consumer(
47 [&counter](int task) {
48 CheckRealtime();
Austin Schuh32ac1202023-02-05 11:25:38 -080049 VLOG(1) << "task:" << task << " counter: " << counter;
Ravago Jonesc6b919f2023-01-01 21:34:12 -080050 counter = task;
51 },
52 20);
53
54 for (int number : {9, 7, 1, 3, 100, 300, 42}) {
55 EXPECT_TRUE(threaded_consumer.Push(number));
56
57 // wait
58 while (counter != number) {
59 std::this_thread::sleep_for(std::chrono::milliseconds(1));
60 }
61
62 EXPECT_EQ(counter, number);
63 }
64 }
65 // TODO: Check that the worker thread's priority actually gets reset before
66 // the thread is destroyed.
67
68 CheckNotRealtime();
69}
70
71// If the worker gets behind, we shouldn't silently take in more tasks and
72// destroy old ones.
73TEST(ThreadedConsumerTest, OverflowRingBuffer) {
74 std::atomic<int> counter{0};
75 std::atomic<int> should_block{true};
76
77 ThreadedConsumer<int, 4> threaded_consumer(
78 [&counter, &should_block](int task) {
Austin Schuh32ac1202023-02-05 11:25:38 -080079 VLOG(1) << "task:" << task << " counter: " << counter;
Ravago Jonesc6b919f2023-01-01 21:34:12 -080080
81 counter = task;
82
83 // prevent it from making any progress to simulate it getting behind
84 while (should_block) {
85 std::this_thread::sleep_for(std::chrono::milliseconds(1));
86 }
87 },
88 20);
89
90 // It consumes the 5 and then our worker blocks.
91 EXPECT_TRUE(threaded_consumer.Push(5));
92
93 // Wait for it to consume 5
94 while (counter != 5) {
95 std::this_thread::sleep_for(std::chrono::milliseconds(1));
96 };
97
98 // 4 more fills up the queue.
99 for (int number : {8, 9, 7, 1}) {
100 EXPECT_TRUE(threaded_consumer.Push(number));
101 }
102
103 // this one should overflow the buffer.
104 EXPECT_FALSE(threaded_consumer.Push(101));
105
106 // clean up, so we don't join() an infinite loop
107 should_block = false;
108}
109
110// The class should destruct gracefully and finish all of its work before
111// dissapearing.
112TEST(ThreadedConsumerTest, FinishesTasksOnQuit) {
113 std::atomic<int> counter{0};
114 std::atomic<int> should_block{true};
115
116 {
117 ThreadedConsumer<int, 4> threaded_consumer(
118 [&counter, &should_block](int task) {
Austin Schuh32ac1202023-02-05 11:25:38 -0800119 VLOG(1) << "task:" << task << " counter: " << counter;
Ravago Jonesc6b919f2023-01-01 21:34:12 -0800120
121 counter = task;
122
123 // prevent it from making any progress to simulate it getting behind
124 while (should_block) {
125 std::this_thread::sleep_for(std::chrono::milliseconds(1));
126 }
127 },
128 20);
129
130 // Give it some work to do
131 for (int number : {8, 9, 7, 1}) {
132 EXPECT_TRUE(threaded_consumer.Push(number));
133 }
134
135 // Wait for it to consume the first number
136 while (counter != 8) {
137 std::this_thread::sleep_for(std::chrono::milliseconds(1));
138 };
139
140 // allow it to continue
141 should_block = false;
142 }
143
144 // It should have finished all the work and gotten to the last number.
145 EXPECT_EQ(counter, 1);
146}
147
Stephan Pleinesf63bde82024-01-13 15:59:33 -0800148} // namespace aos::util