blob: 4e84b9f21e935275a546ff71a0b9e18b576a582a [file] [log] [blame]
Austin Schuh20b2b082019-09-11 20:42:56 -07001#include "aos/ipc_lib/lockless_queue.h"
2
3#include <inttypes.h>
4#include <signal.h>
5#include <unistd.h>
6#include <wait.h>
7#include <chrono>
8#include <memory>
9#include <random>
10#include <thread>
11
12#include "aos/event.h"
13#include "aos/events/epoll.h"
Austin Schuh20b2b082019-09-11 20:42:56 -070014#include "aos/ipc_lib/aos_sync.h"
15#include "aos/ipc_lib/queue_racer.h"
16#include "aos/ipc_lib/signalfd.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070017#include "aos/realtime.h"
Austin Schuh20b2b082019-09-11 20:42:56 -070018#include "gflags/gflags.h"
19#include "gtest/gtest.h"
20
// Stress-test tuning knobs.  The race tests run until both the iteration
// count and the wall-clock duration minimums are satisfied.
DEFINE_int32(min_iterations, 100,
             "Minimum number of stress test iterations to run");
DEFINE_int32(duration, 5, "Number of seconds to test for");
DEFINE_int32(print_rate, 60, "Number of seconds between status prints");

// The roboRIO can only handle 10 threads before exploding.  Set the default
// for ARM to 10.
DEFINE_int32(thread_count,
#if defined(__ARM_EABI__)
             10,
#else
             100,
#endif
             "Number of threads to race");
35
36namespace aos {
37namespace ipc_lib {
38namespace testing {
39
40namespace chrono = ::std::chrono;
41
42class LocklessQueueTest : public ::testing::Test {
43 public:
44 LocklessQueueTest() {
Austin Schuh20b2b082019-09-11 20:42:56 -070045 config_.num_watchers = 10;
46 config_.num_senders = 100;
Brian Silverman177567e2020-08-12 19:51:33 -070047 config_.num_pinners = 5;
Austin Schuh20b2b082019-09-11 20:42:56 -070048 config_.queue_size = 10000;
49 // Exercise the alignment code. This would throw off alignment.
50 config_.message_data_size = 101;
51
52 // Since our backing store is an array of uint64_t for alignment purposes,
53 // normalize by the size.
54 memory_.resize(LocklessQueueMemorySize(config_) / sizeof(uint64_t));
55
56 Reset();
57 }
58
59 LocklessQueueMemory *get_memory() {
60 return reinterpret_cast<LocklessQueueMemory *>(&(memory_[0]));
61 }
62
63 void Reset() { memset(get_memory(), 0, LocklessQueueMemorySize(config_)); }
64
65 // Runs until the signal is received.
66 void RunUntilWakeup(Event *ready, int priority) {
67 LocklessQueue queue(get_memory(), config_);
68 internal::EPoll epoll;
69 SignalFd signalfd({kWakeupSignal});
70
71 epoll.OnReadable(signalfd.fd(), [&signalfd, &epoll]() {
72 signalfd_siginfo result = signalfd.Read();
73
74 fprintf(stderr, "Got signal: %d\n", result.ssi_signo);
75 epoll.Quit();
76 });
77
78 // Register to be woken up *after* the signalfd is catching the signals.
79 queue.RegisterWakeup(priority);
80
81 // And signal we are now ready.
82 ready->Set();
83
84 epoll.Run();
85
86 // Cleanup.
87 queue.UnregisterWakeup();
88 epoll.DeleteFd(signalfd.fd());
89 }
90
91 // Use a type with enough alignment that we are guarenteed that everything
92 // will be aligned properly on the target platform.
93 ::std::vector<uint64_t> memory_;
94
95 LocklessQueueConfiguration config_;
96};
97
98typedef LocklessQueueTest LocklessQueueDeathTest;
99
100// Tests that wakeup doesn't do anything if nothing was registered.
101TEST_F(LocklessQueueTest, NoWatcherWakeup) {
102 LocklessQueue queue(get_memory(), config_);
103
104 EXPECT_EQ(queue.Wakeup(7), 0);
105}
106
107// Tests that wakeup doesn't do anything if a wakeup was registered and then
108// unregistered.
109TEST_F(LocklessQueueTest, UnregisteredWatcherWakeup) {
110 LocklessQueue queue(get_memory(), config_);
111
112 queue.RegisterWakeup(5);
113 queue.UnregisterWakeup();
114
115 EXPECT_EQ(queue.Wakeup(7), 0);
116}
117
// Tests that wakeup doesn't do anything if the thread dies.
TEST_F(LocklessQueueTest, DiedWatcherWakeup) {
  LocklessQueue queue(get_memory(), config_);

  // Simulate a watcher thread dying without cleaning up: register a wakeup
  // from a short-lived thread and deliberately skip the queue's destructor.
  ::std::thread([this]() {
    // Use placement new so the destructor doesn't get run.
    ::std::aligned_storage<sizeof(LocklessQueue), alignof(LocklessQueue)>::type
        data;
    LocklessQueue *q = new (&data) LocklessQueue(get_memory(), config_);
    // Register a wakeup.
    q->RegisterWakeup(5);
  }).join();

  // The registration left behind by the dead thread must not be woken.
  EXPECT_EQ(queue.Wakeup(7), 0);
}
133
// Per-watcher-thread bookkeeping for the TooManyWatchers test.
struct WatcherState {
  // Thread running this watcher.
  ::std::thread t;
  // Set by the thread once its wakeup registration has completed.
  Event ready;
};
138
// Tests that too many watchers fails like expected.
TEST_F(LocklessQueueTest, TooManyWatchers) {
  // This is going to be a barrel of monkeys.
  // We need to spin up a bunch of watchers.  But, they all need to be in
  // different threads so they have different tids.
  ::std::vector<WatcherState> queues;
  // Reserve num_watchers WatcherState objects so the pointer value doesn't
  // change out from under us below.
  queues.reserve(config_.num_watchers);

  // Event used to trigger all the threads to unregister.
  Event cleanup;

  // Start all the threads.  Each one registers a wakeup, reports readiness,
  // and then parks until cleanup is signaled.
  for (size_t i = 0; i < config_.num_watchers; ++i) {
    queues.emplace_back();

    WatcherState *s = &queues.back();
    queues.back().t = ::std::thread([this, &cleanup, s]() {
      LocklessQueue q(get_memory(), config_);
      EXPECT_TRUE(q.RegisterWakeup(0));

      // Signal that this thread is ready.
      s->ready.Set();

      // And wait until we are asked to shut down.
      cleanup.Wait();

      q.UnregisterWakeup();
    });
  }

  // Wait until all the threads are actually going.
  for (WatcherState &w : queues) {
    w.ready.Wait();
  }

  // Now try to allocate another one.  This will fail since all
  // num_watchers slots are taken.
  {
    LocklessQueue queue(get_memory(), config_);
    EXPECT_FALSE(queue.RegisterWakeup(0));
  }

  // Trigger the threads to cleanup their resources, and wait until they are
  // done.
  cleanup.Set();
  for (WatcherState &w : queues) {
    w.t.join();
  }

  // We should now be able to allocate a wakeup.
  {
    LocklessQueue queue(get_memory(), config_);
    EXPECT_TRUE(queue.RegisterWakeup(0));
    queue.UnregisterWakeup();
  }
}
196
// Tests that creating too many senders fails as expected: once all
// num_senders slots are used, MakeSender() returns no value.
TEST_F(LocklessQueueTest, TooManySenders) {
  // Use up every available sender slot...
  ::std::vector<::std::unique_ptr<LocklessQueue>> queues;
  ::std::vector<LocklessQueue::Sender> senders;
  for (size_t i = 0; i < config_.num_senders; ++i) {
    queues.emplace_back(new LocklessQueue(get_memory(), config_));
    senders.emplace_back(queues.back()->MakeSender().value());
  }
  // ...then confirm that one more sender can't be created.
  queues.emplace_back(new LocklessQueue(get_memory(), config_));
  EXPECT_FALSE(queues.back()->MakeSender());
}
208
// Now, start 2 threads and have them receive the signals.
TEST_F(LocklessQueueTest, WakeUpThreads) {
  // Confirm that the wakeup signal is in range.
  EXPECT_LE(kWakeupSignal, SIGRTMAX);
  EXPECT_GE(kWakeupSignal, SIGRTMIN);

  LocklessQueue queue(get_memory(), config_);

  // Event used to make sure the thread is ready before the test starts.
  Event ready1;
  Event ready2;

  // Start the threads; each registers a wakeup at a different priority.
  ::std::thread t1([this, &ready1]() { RunUntilWakeup(&ready1, 5); });
  ::std::thread t2([this, &ready2]() { RunUntilWakeup(&ready2, 4); });

  ready1.Wait();
  ready2.Wait();

  // Both registered watchers should report being woken.
  EXPECT_EQ(queue.Wakeup(3), 2);

  t1.join();
  t2.join();

  // Clean up afterwards.  We are pretending to be RT when we are really not.
  // So we will be PI boosted up.
  UnsetCurrentThreadRealtimePriority();
}
237
// Do a simple send test: write enough messages to wrap the queue and verify
// the latest-index bookkeeping and reads of recent history along the way.
TEST_F(LocklessQueueTest, Send) {
  LocklessQueue queue(get_memory(), config_);

  LocklessQueue::Sender sender = queue.MakeSender().value();

  // Send enough messages to wrap.
  for (int i = 0; i < 20000; ++i) {
    // Confirm that the queue index makes sense given the number of sends.
    // Before the first send it should report the empty index.
    EXPECT_EQ(queue.LatestQueueIndex().index(),
              i == 0 ? LocklessQueue::empty_queue_index().index() : i - 1);

    // Send a trivial piece of data.
    char data[100];
    size_t s = snprintf(data, sizeof(data), "foobar%d", i);
    sender.Send(data, s);

    // Confirm that the queue index still makes sense.  This is easier since
    // the empty case has been handled.
    EXPECT_EQ(queue.LatestQueueIndex().index(), i);

    // Read a result from 5 in the past.
    ::aos::monotonic_clock::time_point monotonic_sent_time;
    ::aos::realtime_clock::time_point realtime_sent_time;
    ::aos::monotonic_clock::time_point monotonic_remote_time;
    ::aos::realtime_clock::time_point realtime_remote_time;
    uint32_t remote_queue_index;
    char read_data[1024];
    size_t length;

    // Build the index of the message sent 5 sends ago.  When fewer than 5
    // messages have been sent, that index lies before the start of the queue.
    QueueIndex index = QueueIndex::Zero(config_.queue_size);
    if (i - 5 < 0) {
      index = index.DecrementBy(5 - i);
    } else {
      index = index.IncrementBy(i - 5);
    }
    LocklessQueue::ReadResult read_result =
        queue.Read(index.index(), &monotonic_sent_time, &realtime_sent_time,
                   &monotonic_remote_time, &realtime_remote_time,
                   &remote_queue_index, &length, &(read_data[0]));

    // This should either return GOOD, or TOO_OLD if it is before the start of
    // the queue.
    if (read_result != LocklessQueue::ReadResult::GOOD) {
      EXPECT_EQ(read_result, LocklessQueue::ReadResult::TOO_OLD);
    }
  }
}
286
287// Races a bunch of sending threads to see if it all works.
288TEST_F(LocklessQueueTest, SendRace) {
289 const size_t kNumMessages = 10000 / FLAGS_thread_count;
290
291 ::std::mt19937 generator(0);
292 ::std::uniform_int_distribution<> write_wrap_count_distribution(0, 10);
293 ::std::bernoulli_distribution race_reads_distribution;
294 ::std::bernoulli_distribution wrap_writes_distribution;
295
296 const chrono::seconds print_frequency(FLAGS_print_rate);
297
298 QueueRacer racer(get_memory(), FLAGS_thread_count, kNumMessages, config_);
299 const monotonic_clock::time_point start_time =
300 monotonic_clock::now();
301 const monotonic_clock::time_point end_time =
302 start_time + chrono::seconds(FLAGS_duration);
303
304 monotonic_clock::time_point monotonic_now = start_time;
305 monotonic_clock::time_point next_print_time = start_time + print_frequency;
306 uint64_t messages = 0;
307 for (int i = 0; i < FLAGS_min_iterations || monotonic_now < end_time; ++i) {
308 bool race_reads = race_reads_distribution(generator);
309 int write_wrap_count = write_wrap_count_distribution(generator);
310 if (!wrap_writes_distribution(generator)) {
311 write_wrap_count = 0;
312 }
313 EXPECT_NO_FATAL_FAILURE(racer.RunIteration(race_reads, write_wrap_count))
314 << ": Running with race_reads: " << race_reads
315 << ", and write_wrap_count " << write_wrap_count << " and on iteration "
316 << i;
317
318 messages += racer.CurrentIndex();
319
320 monotonic_now = monotonic_clock::now();
321 if (monotonic_now > next_print_time) {
322 double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
323 monotonic_now - start_time)
324 .count();
325 printf("Finished iteration %d, %f iterations/sec, %f messages/second\n",
326 i, i / elapsed_seconds,
327 static_cast<double>(messages) / elapsed_seconds);
328 next_print_time = monotonic_now + print_frequency;
329 }
330 }
331}
332
333// Send enough messages to wrap the 32 bit send counter.
334TEST_F(LocklessQueueTest, WrappedSend) {
335 uint64_t kNumMessages = 0x100010000ul;
336 QueueRacer racer(get_memory(), 1, kNumMessages, config_);
337
338 const monotonic_clock::time_point start_time = monotonic_clock::now();
339 EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
340 const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
341 double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
342 monotonic_now - start_time)
343 .count();
344 printf("Took %f seconds to write %" PRIu64 " messages, %f messages/s\n",
345 elapsed_seconds, kNumMessages,
346 static_cast<double>(kNumMessages) / elapsed_seconds);
347}
348
349} // namespace testing
350} // namespace ipc_lib
351} // namespace aos