blob: 4f27c57eaaa57ad386e5f91dd87d67f47fd0d578 [file] [log] [blame]
James Kuszmaule5deaa42022-06-24 16:29:31 -07001#include "aos/util/threaded_queue.h"
2
3#include "gtest/gtest.h"
4
5namespace aos::util {
6
7TEST(ThreadedQueueTest, BasicFunction) {
8 std::atomic<int> counter{10000};
9 int state = 0;
10 std::atomic<int> observed_state{0};
11 ThreadedQueue<int, int> queue(
12 [&counter, &observed_state](const int state) {
13 // Because this handler always returns more_to_push = false, it will
14 // only get called when the queue is popped from.
15 observed_state = state;
16 int count = --counter;
17 return ThreadedQueue<int, int>::PushResult{count, false, count == 0};
18 },
19 state);
20 while (true) {
21 std::optional<int> peek_result = queue.Peek();
22 std::optional<int> pop_result = queue.Pop();
23 ASSERT_EQ(peek_result.has_value(), pop_result.has_value());
24 if (peek_result.has_value()) {
25 ASSERT_EQ(peek_result.value(), pop_result.value());
26 } else {
27 break;
28 }
29 state++;
30 queue.SetState(state);
31 }
32 ASSERT_EQ(counter, 0);
33 // Our API doesn't make any guarantee about the push/pop cycle being kept in
34 // lock-step, so just check that the observed state got incremente at all.
35 ASSERT_LT(1, observed_state);
36 ASSERT_EQ(state, 10000);
37}
38
39// Test running a queue where the consumer wants to have X entries pre-loaded
40// and so communicates its current state back to the pusher.
41TEST(ThreadedQueueTest, StatefulQueue) {
42 std::atomic<int> counter{10000};
43 int state = counter;
44 constexpr int kMaxLookahead = 10;
45 std::atomic<int> observed_state{0};
46 ThreadedQueue<int, int> queue(
47 [&counter, &observed_state](const int state) {
48 observed_state = state;
49 if (counter + kMaxLookahead < state) {
50 return ThreadedQueue<int, int>::PushResult{std::nullopt, false,
51 counter == 0};
52 } else {
53 int count = --counter;
54 return ThreadedQueue<int, int>::PushResult{count, true, count == 0};
55 }
56 },
57 state);
58 while (true) {
59 std::optional<int> peek_result = queue.Peek();
60 std::optional<int> pop_result = queue.Pop();
61 ASSERT_EQ(peek_result.has_value(), pop_result.has_value());
62 if (peek_result.has_value()) {
63 ASSERT_EQ(peek_result.value(), pop_result.value());
64 } else {
65 break;
66 }
67 for (int ii = 0; ii < 2 * kMaxLookahead; ++ii) {
68 // Trigger the internal condition variable a bunch of times to cause
69 // trouble.
70 queue.Peek();
71 }
72 // The pusher should never be more than the permissible distance ahead.
73 ASSERT_GE(counter + kMaxLookahead + 1, state);
74 ASSERT_GE(observed_state, state);
75 state--;
76 queue.SetState(state);
77 // Periodically pause, ensure that the pusher has enough time to catch up,
78 // and check that it has indeed pre-queued kMaxLookahead items.
79 if (state % 1000 == 0 && state > 0) {
80 queue.WaitForNoMoreWork();
81 ASSERT_EQ(observed_state, state);
82 ASSERT_EQ(counter + kMaxLookahead + 1, state);
83 }
84 }
85 ASSERT_EQ(counter, 0);
86 ASSERT_EQ(state, 0);
87}
88
89
90// Tests that we can exit early without any issues.
91TEST(ThreadedQueueTest, ExitEarly) {
92 // There used to exist a deadlock in this case where StopPushing would
93 // improperly synchronize things internally, but required very (un)lucky
94 // timing to hit.
95 for (int ii = 0; ii < 10000; ++ii) {
96 ThreadedQueue<int, int> queue(
97 [](int) {
98 return ThreadedQueue<int, int>::PushResult{971, false, false};
99 },
100 0);
101 queue.StopPushing();
102 }
103}
104
105} // namespace aos::util