James Kuszmaul | e5deaa4 | 2022-06-24 16:29:31 -0700 | [diff] [blame^] | 1 | #include "aos/util/threaded_queue.h" |
| 2 | |
| 3 | #include "gtest/gtest.h" |
| 4 | |
| 5 | namespace aos::util { |
| 6 | |
| 7 | TEST(ThreadedQueueTest, BasicFunction) { |
| 8 | std::atomic<int> counter{10000}; |
| 9 | int state = 0; |
| 10 | std::atomic<int> observed_state{0}; |
| 11 | ThreadedQueue<int, int> queue( |
| 12 | [&counter, &observed_state](const int state) { |
| 13 | // Because this handler always returns more_to_push = false, it will |
| 14 | // only get called when the queue is popped from. |
| 15 | observed_state = state; |
| 16 | int count = --counter; |
| 17 | return ThreadedQueue<int, int>::PushResult{count, false, count == 0}; |
| 18 | }, |
| 19 | state); |
| 20 | while (true) { |
| 21 | std::optional<int> peek_result = queue.Peek(); |
| 22 | std::optional<int> pop_result = queue.Pop(); |
| 23 | ASSERT_EQ(peek_result.has_value(), pop_result.has_value()); |
| 24 | if (peek_result.has_value()) { |
| 25 | ASSERT_EQ(peek_result.value(), pop_result.value()); |
| 26 | } else { |
| 27 | break; |
| 28 | } |
| 29 | state++; |
| 30 | queue.SetState(state); |
| 31 | } |
| 32 | ASSERT_EQ(counter, 0); |
| 33 | // Our API doesn't make any guarantee about the push/pop cycle being kept in |
| 34 | // lock-step, so just check that the observed state got incremente at all. |
| 35 | ASSERT_LT(1, observed_state); |
| 36 | ASSERT_EQ(state, 10000); |
| 37 | } |
| 38 | |
| 39 | // Test running a queue where the consumer wants to have X entries pre-loaded |
| 40 | // and so communicates its current state back to the pusher. |
| 41 | TEST(ThreadedQueueTest, StatefulQueue) { |
| 42 | std::atomic<int> counter{10000}; |
| 43 | int state = counter; |
| 44 | constexpr int kMaxLookahead = 10; |
| 45 | std::atomic<int> observed_state{0}; |
| 46 | ThreadedQueue<int, int> queue( |
| 47 | [&counter, &observed_state](const int state) { |
| 48 | observed_state = state; |
| 49 | if (counter + kMaxLookahead < state) { |
| 50 | return ThreadedQueue<int, int>::PushResult{std::nullopt, false, |
| 51 | counter == 0}; |
| 52 | } else { |
| 53 | int count = --counter; |
| 54 | return ThreadedQueue<int, int>::PushResult{count, true, count == 0}; |
| 55 | } |
| 56 | }, |
| 57 | state); |
| 58 | while (true) { |
| 59 | std::optional<int> peek_result = queue.Peek(); |
| 60 | std::optional<int> pop_result = queue.Pop(); |
| 61 | ASSERT_EQ(peek_result.has_value(), pop_result.has_value()); |
| 62 | if (peek_result.has_value()) { |
| 63 | ASSERT_EQ(peek_result.value(), pop_result.value()); |
| 64 | } else { |
| 65 | break; |
| 66 | } |
| 67 | for (int ii = 0; ii < 2 * kMaxLookahead; ++ii) { |
| 68 | // Trigger the internal condition variable a bunch of times to cause |
| 69 | // trouble. |
| 70 | queue.Peek(); |
| 71 | } |
| 72 | // The pusher should never be more than the permissible distance ahead. |
| 73 | ASSERT_GE(counter + kMaxLookahead + 1, state); |
| 74 | ASSERT_GE(observed_state, state); |
| 75 | state--; |
| 76 | queue.SetState(state); |
| 77 | // Periodically pause, ensure that the pusher has enough time to catch up, |
| 78 | // and check that it has indeed pre-queued kMaxLookahead items. |
| 79 | if (state % 1000 == 0 && state > 0) { |
| 80 | queue.WaitForNoMoreWork(); |
| 81 | ASSERT_EQ(observed_state, state); |
| 82 | ASSERT_EQ(counter + kMaxLookahead + 1, state); |
| 83 | } |
| 84 | } |
| 85 | ASSERT_EQ(counter, 0); |
| 86 | ASSERT_EQ(state, 0); |
| 87 | } |
| 88 | |
| 89 | |
| 90 | // Tests that we can exit early without any issues. |
| 91 | TEST(ThreadedQueueTest, ExitEarly) { |
| 92 | // There used to exist a deadlock in this case where StopPushing would |
| 93 | // improperly synchronize things internally, but required very (un)lucky |
| 94 | // timing to hit. |
| 95 | for (int ii = 0; ii < 10000; ++ii) { |
| 96 | ThreadedQueue<int, int> queue( |
| 97 | [](int) { |
| 98 | return ThreadedQueue<int, int>::PushResult{971, false, false}; |
| 99 | }, |
| 100 | 0); |
| 101 | queue.StopPushing(); |
| 102 | } |
| 103 | } |
| 104 | |
| 105 | } // namespace aos::util |