blob: f3b49b628d109a54c77f151c90bca10ad1c3b7a1 [file] [log] [blame]
Austin Schuh20b2b082019-09-11 20:42:56 -07001#include "aos/ipc_lib/lockless_queue.h"
2
3#include <inttypes.h>
4#include <signal.h>
5#include <unistd.h>
6#include <wait.h>
7#include <chrono>
8#include <memory>
9#include <random>
10#include <thread>
11
12#include "aos/event.h"
13#include "aos/events/epoll.h"
14#include "aos/init.h"
15#include "aos/ipc_lib/aos_sync.h"
16#include "aos/ipc_lib/queue_racer.h"
17#include "aos/ipc_lib/signalfd.h"
18#include "aos/testing/test_logging.h"
19#include "gflags/gflags.h"
20#include "gtest/gtest.h"
21
22DEFINE_int32(min_iterations, 100,
23 "Minimum number of stress test iterations to run");
24DEFINE_int32(duration, 5, "Number of seconds to test for");
25DEFINE_int32(print_rate, 60, "Number of seconds between status prints");
26
27// The roboRIO can only handle 10 threads before exploding. Set the default for
28// ARM to 10.
29DEFINE_int32(thread_count,
30#if defined(__ARM_EABI__)
31 10,
32#else
33 100,
34#endif
35 "Number of threads to race");
36
37namespace aos {
38namespace ipc_lib {
39namespace testing {
40
41namespace chrono = ::std::chrono;
42
43class LocklessQueueTest : public ::testing::Test {
44 public:
45 LocklessQueueTest() {
46 ::aos::testing::EnableTestLogging();
47 config_.num_watchers = 10;
48 config_.num_senders = 100;
49 config_.queue_size = 10000;
50 // Exercise the alignment code. This would throw off alignment.
51 config_.message_data_size = 101;
52
53 // Since our backing store is an array of uint64_t for alignment purposes,
54 // normalize by the size.
55 memory_.resize(LocklessQueueMemorySize(config_) / sizeof(uint64_t));
56
57 Reset();
58 }
59
60 LocklessQueueMemory *get_memory() {
61 return reinterpret_cast<LocklessQueueMemory *>(&(memory_[0]));
62 }
63
64 void Reset() { memset(get_memory(), 0, LocklessQueueMemorySize(config_)); }
65
66 // Runs until the signal is received.
67 void RunUntilWakeup(Event *ready, int priority) {
68 LocklessQueue queue(get_memory(), config_);
69 internal::EPoll epoll;
70 SignalFd signalfd({kWakeupSignal});
71
72 epoll.OnReadable(signalfd.fd(), [&signalfd, &epoll]() {
73 signalfd_siginfo result = signalfd.Read();
74
75 fprintf(stderr, "Got signal: %d\n", result.ssi_signo);
76 epoll.Quit();
77 });
78
79 // Register to be woken up *after* the signalfd is catching the signals.
80 queue.RegisterWakeup(priority);
81
82 // And signal we are now ready.
83 ready->Set();
84
85 epoll.Run();
86
87 // Cleanup.
88 queue.UnregisterWakeup();
89 epoll.DeleteFd(signalfd.fd());
90 }
91
92 // Use a type with enough alignment that we are guarenteed that everything
93 // will be aligned properly on the target platform.
94 ::std::vector<uint64_t> memory_;
95
96 LocklessQueueConfiguration config_;
97};
98
99typedef LocklessQueueTest LocklessQueueDeathTest;
100
101// Tests that wakeup doesn't do anything if nothing was registered.
102TEST_F(LocklessQueueTest, NoWatcherWakeup) {
103 LocklessQueue queue(get_memory(), config_);
104
105 EXPECT_EQ(queue.Wakeup(7), 0);
106}
107
108// Tests that wakeup doesn't do anything if a wakeup was registered and then
109// unregistered.
110TEST_F(LocklessQueueTest, UnregisteredWatcherWakeup) {
111 LocklessQueue queue(get_memory(), config_);
112
113 queue.RegisterWakeup(5);
114 queue.UnregisterWakeup();
115
116 EXPECT_EQ(queue.Wakeup(7), 0);
117}
118
119// Tests that wakeup doesn't do anything if the thread dies.
120TEST_F(LocklessQueueTest, DiedWatcherWakeup) {
121 LocklessQueue queue(get_memory(), config_);
122
123 ::std::thread([this]() {
124 // Use placement new so the destructor doesn't get run.
125 ::std::aligned_storage<sizeof(LocklessQueue), alignof(LocklessQueue)>::type
126 data;
127 LocklessQueue *q = new (&data) LocklessQueue(get_memory(), config_);
128 // Register a wakeup.
129 q->RegisterWakeup(5);
130 }).join();
131
132 EXPECT_EQ(queue.Wakeup(7), 0);
133}
134
135struct WatcherState {
136 ::std::thread t;
137 Event ready;
138};
139
140// Tests that too many watchers fails like expected.
141TEST_F(LocklessQueueTest, TooManyWatchers) {
142 // This is going to be a barrel of monkeys.
143 // We need to spin up a bunch of watchers. But, they all need to be in
144 // different threads so they have different tids.
145 ::std::vector<WatcherState> queues;
146 // Reserve num_watchers WatcherState objects so the pointer value doesn't
147 // change out from under us below.
148 queues.reserve(config_.num_watchers);
149
150 // Event used to trigger all the threads to unregister.
151 Event cleanup;
152
153 // Start all the threads.
154 for (size_t i = 0; i < config_.num_watchers; ++i) {
155 queues.emplace_back();
156
157 WatcherState *s = &queues.back();
158 queues.back().t = ::std::thread([this, &cleanup, s]() {
159 LocklessQueue q(get_memory(), config_);
160 EXPECT_TRUE(q.RegisterWakeup(0));
161
162 // Signal that this thread is ready.
163 s->ready.Set();
164
165 // And wait until we are asked to shut down.
166 cleanup.Wait();
167
168 q.UnregisterWakeup();
169 });
170 }
171
172 // Wait until all the threads are actually going.
173 for (WatcherState &w : queues) {
174 w.ready.Wait();
175 }
176
177 // Now try to allocate another one. This will fail.
178 {
179 LocklessQueue queue(get_memory(), config_);
180 EXPECT_FALSE(queue.RegisterWakeup(0));
181 }
182
183 // Trigger the threads to cleanup their resources, and wait unti they are
184 // done.
185 cleanup.Set();
186 for (WatcherState &w : queues) {
187 w.t.join();
188 }
189
190 // We should now be able to allocate a wakeup.
191 {
192 LocklessQueue queue(get_memory(), config_);
193 EXPECT_TRUE(queue.RegisterWakeup(0));
194 queue.UnregisterWakeup();
195 }
196}
197
198// Tests that too many watchers dies like expected.
199TEST_F(LocklessQueueDeathTest, TooManySenders) {
200 EXPECT_DEATH(
201 {
202 ::std::vector<::std::unique_ptr<LocklessQueue>> queues;
203 ::std::vector<LocklessQueue::Sender> senders;
204 for (size_t i = 0; i < config_.num_senders + 1; ++i) {
205 queues.emplace_back(new LocklessQueue(get_memory(), config_));
206 senders.emplace_back(queues.back()->MakeSender());
207 }
208 },
209 "Too many senders");
210}
211
212// Now, start 2 threads and have them receive the signals.
213TEST_F(LocklessQueueTest, WakeUpThreads) {
214 // Confirm that the wakeup signal is in range.
215 EXPECT_LE(kWakeupSignal, SIGRTMAX);
216 EXPECT_GE(kWakeupSignal, SIGRTMIN);
217
218 LocklessQueue queue(get_memory(), config_);
219
220 // Event used to make sure the thread is ready before the test starts.
221 Event ready1;
222 Event ready2;
223
224 // Start the thread.
225 ::std::thread t1([this, &ready1]() { RunUntilWakeup(&ready1, 5); });
226 ::std::thread t2([this, &ready2]() { RunUntilWakeup(&ready2, 4); });
227
228 ready1.Wait();
229 ready2.Wait();
230
231 EXPECT_EQ(queue.Wakeup(3), 2);
232
233 t1.join();
234 t2.join();
235
236 // Clean up afterwords. We are pretending to be RT when we are really not.
237 // So we will be PI boosted up.
238 UnsetCurrentThreadRealtimePriority();
239}
240
241// Do a simple send test.
242TEST_F(LocklessQueueTest, Send) {
243 LocklessQueue queue(get_memory(), config_);
244
245 LocklessQueue::Sender sender = queue.MakeSender();
246
247 // Send enough messages to wrap.
248 for (int i = 0; i < 20000; ++i) {
249 // Confirm that the queue index makes sense given the number of sends.
250 EXPECT_EQ(queue.LatestQueueIndex(),
251 i == 0 ? LocklessQueue::empty_queue_index() : i - 1);
252
253 // Send a trivial piece of data.
254 char data[100];
255 size_t s = snprintf(data, sizeof(data), "foobar%d", i);
256 sender.Send(data, s);
257
258 // Confirm that the queue index still makes sense. This is easier since the
259 // empty case has been handled.
260 EXPECT_EQ(queue.LatestQueueIndex(), i);
261
262 // Read a result from 5 in the past.
263 ::aos::monotonic_clock::time_point monotonic_sent_time;
264 ::aos::realtime_clock::time_point realtime_sent_time;
265 char read_data[1024];
266 size_t length;
267
268 QueueIndex index = QueueIndex::Zero(config_.queue_size);
269 if (i - 5 < 0) {
270 index = index.DecrementBy(5 - i);
271 } else {
272 index = index.IncrementBy(i - 5);
273 }
274 LocklessQueue::ReadResult read_result =
275 queue.Read(index.index(), &monotonic_sent_time, &realtime_sent_time,
276 &length, &(read_data[0]));
277
278 // This should either return GOOD, or TOO_OLD if it is before the start of
279 // the queue.
280 if (read_result != LocklessQueue::ReadResult::GOOD) {
281 EXPECT_EQ(read_result, LocklessQueue::ReadResult::TOO_OLD);
282 }
283 }
284}
285
286// Races a bunch of sending threads to see if it all works.
287TEST_F(LocklessQueueTest, SendRace) {
288 const size_t kNumMessages = 10000 / FLAGS_thread_count;
289
290 ::std::mt19937 generator(0);
291 ::std::uniform_int_distribution<> write_wrap_count_distribution(0, 10);
292 ::std::bernoulli_distribution race_reads_distribution;
293 ::std::bernoulli_distribution wrap_writes_distribution;
294
295 const chrono::seconds print_frequency(FLAGS_print_rate);
296
297 QueueRacer racer(get_memory(), FLAGS_thread_count, kNumMessages, config_);
298 const monotonic_clock::time_point start_time =
299 monotonic_clock::now();
300 const monotonic_clock::time_point end_time =
301 start_time + chrono::seconds(FLAGS_duration);
302
303 monotonic_clock::time_point monotonic_now = start_time;
304 monotonic_clock::time_point next_print_time = start_time + print_frequency;
305 uint64_t messages = 0;
306 for (int i = 0; i < FLAGS_min_iterations || monotonic_now < end_time; ++i) {
307 bool race_reads = race_reads_distribution(generator);
308 int write_wrap_count = write_wrap_count_distribution(generator);
309 if (!wrap_writes_distribution(generator)) {
310 write_wrap_count = 0;
311 }
312 EXPECT_NO_FATAL_FAILURE(racer.RunIteration(race_reads, write_wrap_count))
313 << ": Running with race_reads: " << race_reads
314 << ", and write_wrap_count " << write_wrap_count << " and on iteration "
315 << i;
316
317 messages += racer.CurrentIndex();
318
319 monotonic_now = monotonic_clock::now();
320 if (monotonic_now > next_print_time) {
321 double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
322 monotonic_now - start_time)
323 .count();
324 printf("Finished iteration %d, %f iterations/sec, %f messages/second\n",
325 i, i / elapsed_seconds,
326 static_cast<double>(messages) / elapsed_seconds);
327 next_print_time = monotonic_now + print_frequency;
328 }
329 }
330}
331
332// Send enough messages to wrap the 32 bit send counter.
333TEST_F(LocklessQueueTest, WrappedSend) {
334 uint64_t kNumMessages = 0x100010000ul;
335 QueueRacer racer(get_memory(), 1, kNumMessages, config_);
336
337 const monotonic_clock::time_point start_time = monotonic_clock::now();
338 EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
339 const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
340 double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
341 monotonic_now - start_time)
342 .count();
343 printf("Took %f seconds to write %" PRIu64 " messages, %f messages/s\n",
344 elapsed_seconds, kNumMessages,
345 static_cast<double>(kNumMessages) / elapsed_seconds);
346}
347
348} // namespace testing
349} // namespace ipc_lib
350} // namespace aos