#include "aos/util/threaded_queue.h"

#include <atomic>
#include <memory>
#include <optional>
#include <utility>

#include "gtest/gtest.h"

namespace aos::util {

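// These tests exercise ThreadedQueue<T, SharedState>, which runs a pusher
// thread that repeatedly calls a user-supplied handler with the consumer's
// most recently published state. Judging by the usage below, the returned
// PushResult carries {item to push (or std::nullopt for none), whether the
// pusher immediately has more to push, whether pushing is done}; see
// threaded_queue.h for the authoritative definition.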
TEST(ThreadedQueueTest, BasicFunction) {
  std::atomic<int> counter{10000};
  int state = 0;
  std::atomic<int> observed_state{0};
  ThreadedQueue<int, int> queue(
      [&counter, &observed_state](const int state) {
        // Because this handler always returns more_to_push = false, it will
        // only get called when the queue is popped from.
        observed_state = state;
        int count = --counter;
        return ThreadedQueue<int, int>::PushResult{count, false, count == 0};
      },
      state);
  while (true) {
    std::optional<int> peek_result = queue.Peek();
    std::optional<int> pop_result = queue.Pop();
    ASSERT_EQ(peek_result.has_value(), pop_result.has_value());
    if (peek_result.has_value()) {
      ASSERT_EQ(peek_result.value(), pop_result.value());
    } else {
      break;
    }
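    // Publish the consumer's new state; the handler observes it (as its
    // state argument) on a later invocation.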
    state++;
    queue.SetState(state);
  }
  ASSERT_EQ(counter, 0);
  // Our API doesn't make any guarantee about the push/pop cycle being kept in
  // lock-step, so just check that the observed state got incremented at all.
  ASSERT_LT(1, observed_state);
  ASSERT_EQ(state, 10000);
}

// Test running a queue where the consumer wants to have X entries pre-loaded
// and so communicates its current state back to the pusher.
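// Here both state and counter count down from 10000; the handler declines to
// push (std::nullopt, more_to_push = false) once it has run more than
// kMaxLookahead entries ahead of the consumer's reported state.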
TEST(ThreadedQueueTest, StatefulQueue) {
  std::atomic<int> counter{10000};
  int state = counter;
  constexpr int kMaxLookahead = 10;
  std::atomic<int> observed_state{0};
  ThreadedQueue<int, int> queue(
      [&counter, &observed_state](const int state) {
        observed_state = state;
        if (counter + kMaxLookahead < state) {
          return ThreadedQueue<int, int>::PushResult{std::nullopt, false,
                                                     counter == 0};
        } else {
          int count = --counter;
          return ThreadedQueue<int, int>::PushResult{count, true, count == 0};
        }
      },
      state);
  while (true) {
    std::optional<int> peek_result = queue.Peek();
    std::optional<int> pop_result = queue.Pop();
    ASSERT_EQ(peek_result.has_value(), pop_result.has_value());
    if (peek_result.has_value()) {
      ASSERT_EQ(peek_result.value(), pop_result.value());
    } else {
      break;
    }
    for (int ii = 0; ii < 2 * kMaxLookahead; ++ii) {
      // Trigger the internal condition variable a bunch of times to cause
      // trouble.
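      // (Each Peek can signal the pusher without any state change, i.e. a
      // spurious wakeup, which is the kind of timing stress that tends to
      // expose synchronization bugs.)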
      queue.Peek();
    }
    // The pusher should never be more than the permissible distance ahead.
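    // (The handler only decrements counter after checking that
    // counter + kMaxLookahead >= state, so counter can sit at most
    // kMaxLookahead + 1 below state.)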
    ASSERT_GE(counter + kMaxLookahead + 1, state);
    ASSERT_GE(observed_state, state);
    state--;
    queue.SetState(state);
    // Periodically pause, ensure that the pusher has enough time to catch up,
    // and check that it has indeed pre-queued kMaxLookahead items.
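    // (WaitForNoMoreWork appears to block until the handler has declined to
    // push anything further, i.e. the lookahead window is full.)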
    if (state % 1000 == 0 && state > 0) {
      queue.WaitForNoMoreWork();
      ASSERT_EQ(observed_state, state);
      ASSERT_EQ(counter + kMaxLookahead + 1, state);
    }
  }
  ASSERT_EQ(counter, 0);
  ASSERT_EQ(state, 0);
}

// Tests that we can exit early without any issues.
TEST(ThreadedQueueTest, ExitEarly) {
  // There used to be a deadlock in this case where StopPushing would
  // improperly synchronize things internally; it required very (un)lucky
  // timing to hit.
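  // Running many iterations makes such a rare interleaving reasonably likely
  // to show up again if the bug is ever reintroduced.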
  for (int ii = 0; ii < 10000; ++ii) {
    ThreadedQueue<int, int> queue(
        [](int) {
          return ThreadedQueue<int, int>::PushResult{971, false, false};
        },
        0);
    queue.StopPushing();
  }
}

}  // namespace aos::util