#include "aos/ipc_lib/lockless_queue.h"

#include <inttypes.h>
#include <signal.h>
#include <unistd.h>
#include <wait.h>

#include <chrono>
#include <memory>
#include <random>
#include <thread>

#include "aos/events/epoll.h"
#include "aos/ipc_lib/aos_sync.h"
#include "aos/ipc_lib/event.h"
#include "aos/ipc_lib/queue_racer.h"
#include "aos/ipc_lib/signalfd.h"
#include "aos/realtime.h"
#include "gflags/gflags.h"
#include "gtest/gtest.h"

DEFINE_int32(min_iterations, 100,
             "Minimum number of stress test iterations to run");
DEFINE_int32(duration, 5, "Number of seconds to test for");
DEFINE_int32(print_rate, 60, "Number of seconds between status prints");

// The roboRIO can only handle 10 threads before exploding.  Set the default
// for ARM to 10.
DEFINE_int32(thread_count,
#if defined(__ARM_EABI__)
             10,
#else
             100,
#endif
             "Number of threads to race");

namespace aos {
namespace ipc_lib {
namespace testing {

namespace chrono = ::std::chrono;

class LocklessQueueTest : public ::testing::Test {
 public:
  LocklessQueueTest() {
    config_.num_watchers = 10;
    config_.num_senders = 100;
    config_.num_pinners = 5;
    config_.queue_size = 10000;
    // Exercise the alignment code with a message size that would otherwise
    // throw off alignment.
    config_.message_data_size = 101;

    // Since our backing store is an array of uint64_t for alignment purposes,
    // normalize by the size.
    memory_.resize(LocklessQueueMemorySize(config_) / sizeof(uint64_t));

    Reset();
  }

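  // Constructs a fresh LocklessQueue wrapper over the test's backing memory;
  // every caller shares the same underlying queue state.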
  LocklessQueue queue() {
    return LocklessQueue(reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
                         reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
                         config_);
  }

  void Reset() { memset(&memory_[0], 0, LocklessQueueMemorySize(config_)); }

  // Runs until the signal is received.
  void RunUntilWakeup(Event *ready, int priority) {
    internal::EPoll epoll;
    SignalFd signalfd({kWakeupSignal});

    epoll.OnReadable(signalfd.fd(), [&signalfd, &epoll]() {
      signalfd_siginfo result = signalfd.Read();

      fprintf(stderr, "Got signal: %d\n", result.ssi_signo);
      epoll.Quit();
    });

    {
      // Register to be woken up *after* the signalfd is catching the signals.
      LocklessQueueWatcher watcher =
          LocklessQueueWatcher::Make(queue(), priority).value();

      // And signal we are now ready.
      ready->Set();

      epoll.Run();

      // Cleanup, ensuring the watcher is destroyed before the signalfd.
    }
    epoll.DeleteFd(signalfd.fd());
  }

  // Use a type with enough alignment that we are guaranteed that everything
  // will be aligned properly on the target platform.
  ::std::vector<uint64_t> memory_;

  LocklessQueueConfiguration config_;
};

typedef LocklessQueueTest LocklessQueueDeathTest;

// Tests that wakeup doesn't do anything if nothing was registered.
TEST_F(LocklessQueueTest, NoWatcherWakeup) {
  LocklessQueueWakeUpper wake_upper(queue());

  EXPECT_EQ(wake_upper.Wakeup(7), 0);
}

// Tests that wakeup doesn't do anything if a wakeup was registered and then
// unregistered.
TEST_F(LocklessQueueTest, UnregisteredWatcherWakeup) {
  LocklessQueueWakeUpper wake_upper(queue());

  { LocklessQueueWatcher::Make(queue(), 5).value(); }

  EXPECT_EQ(wake_upper.Wakeup(7), 0);
}

// Tests that wakeup doesn't do anything if the thread dies.
TEST_F(LocklessQueueTest, DiedWatcherWakeup) {
  LocklessQueueWakeUpper wake_upper(queue());

  ::std::thread([this]() {
    // Use placement new so the destructor doesn't get run.
    ::std::aligned_storage<sizeof(LocklessQueueWatcher),
                           alignof(LocklessQueueWatcher)>::type data;
    new (&data)
        LocklessQueueWatcher(LocklessQueueWatcher::Make(queue(), 5).value());
  }).join();

  EXPECT_EQ(wake_upper.Wakeup(7), 0);
}

struct WatcherState {
  ::std::thread t;
  Event ready;
};

// Tests that registering too many watchers fails as expected.
TEST_F(LocklessQueueTest, TooManyWatchers) {
  // This is going to be a barrel of monkeys.
  // We need to spin up a bunch of watchers.  But, they all need to be in
  // different threads so they have different tids.
  ::std::vector<WatcherState> queues;
  // Reserve num_watchers WatcherState objects so the pointer value doesn't
  // change out from under us below.
  queues.reserve(config_.num_watchers);

  // Event used to trigger all the threads to unregister.
  Event cleanup;

  // Start all the threads.
  for (size_t i = 0; i < config_.num_watchers; ++i) {
    queues.emplace_back();

    WatcherState *s = &queues.back();
    queues.back().t = ::std::thread([this, &cleanup, s]() {
      LocklessQueueWatcher q = LocklessQueueWatcher::Make(queue(), 0).value();

      // Signal that this thread is ready.
      s->ready.Set();

      // And wait until we are asked to shut down.
      cleanup.Wait();
    });
  }

  // Wait until all the threads are actually going.
  for (WatcherState &w : queues) {
    w.ready.Wait();
  }

  // Now try to allocate another one.  This will fail.
  EXPECT_FALSE(LocklessQueueWatcher::Make(queue(), 0));

  // Trigger the threads to clean up their resources, and wait until they are
  // done.
  cleanup.Set();
  for (WatcherState &w : queues) {
    w.t.join();
  }

  // We should now be able to allocate a watcher again.
  EXPECT_TRUE(LocklessQueueWatcher::Make(queue(), 0));
}

// Tests that allocating too many senders fails as expected.
TEST_F(LocklessQueueTest, TooManySenders) {
  ::std::vector<LocklessQueueSender> senders;
  for (size_t i = 0; i < config_.num_senders; ++i) {
    senders.emplace_back(LocklessQueueSender::Make(queue()).value());
  }
  EXPECT_FALSE(LocklessQueueSender::Make(queue()));
}

// Now, start 2 threads and have them receive the signals.
TEST_F(LocklessQueueTest, WakeUpThreads) {
  // Confirm that the wakeup signal is in range.
  EXPECT_LE(kWakeupSignal, SIGRTMAX);
  EXPECT_GE(kWakeupSignal, SIGRTMIN);

  LocklessQueueWakeUpper wake_upper(queue());

  // Events used to make sure the threads are ready before the test starts.
  Event ready1;
  Event ready2;

  // Start the threads.
  ::std::thread t1([this, &ready1]() { RunUntilWakeup(&ready1, 5); });
  ::std::thread t2([this, &ready2]() { RunUntilWakeup(&ready2, 4); });

  ready1.Wait();
  ready2.Wait();

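  // Both watchers (registered at priorities 5 and 4) should get signalled.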
  EXPECT_EQ(wake_upper.Wakeup(3), 2);

  t1.join();
  t2.join();

  // Clean up afterwards.  We are pretending to be RT when we are really not,
  // so we will have been PI boosted up.
  UnsetCurrentThreadRealtimePriority();
}

// Do a simple send test.
TEST_F(LocklessQueueTest, Send) {
  LocklessQueueSender sender = LocklessQueueSender::Make(queue()).value();
  LocklessQueueReader reader(queue());

  // Send enough messages to wrap.
  for (int i = 0; i < 20000; ++i) {
    // Confirm that the queue index makes sense given the number of sends.
    EXPECT_EQ(reader.LatestIndex().index(),
              i == 0 ? QueueIndex::Invalid().index() : i - 1);

    // Send a trivial piece of data.
    char data[100];
    size_t s = snprintf(data, sizeof(data), "foobar%d", i);
    sender.Send(data, s);

    // Confirm that the queue index still makes sense.  This is easier since the
    // empty case has been handled.
    EXPECT_EQ(reader.LatestIndex().index(), i);

    // Read a result from 5 in the past.
    ::aos::monotonic_clock::time_point monotonic_sent_time;
    ::aos::realtime_clock::time_point realtime_sent_time;
    ::aos::monotonic_clock::time_point monotonic_remote_time;
    ::aos::realtime_clock::time_point realtime_remote_time;
    uint32_t remote_queue_index;
    char read_data[1024];
    size_t length;

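    // Build the queue index of the message sent 5 iterations ago, wrapping
    // backwards from index 0 when fewer than 5 messages have been sent.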
    QueueIndex index = QueueIndex::Zero(config_.queue_size);
    if (i - 5 < 0) {
      index = index.DecrementBy(5 - i);
    } else {
      index = index.IncrementBy(i - 5);
    }
    LocklessQueueReader::Result read_result =
        reader.Read(index.index(), &monotonic_sent_time, &realtime_sent_time,
                    &monotonic_remote_time, &realtime_remote_time,
                    &remote_queue_index, &length, &(read_data[0]));

    // This should either return GOOD, or TOO_OLD if it is before the start of
    // the queue.
    if (read_result != LocklessQueueReader::Result::GOOD) {
      EXPECT_EQ(read_result, LocklessQueueReader::Result::TOO_OLD);
    }
  }
}

// Races a bunch of sending threads to see if it all works.
TEST_F(LocklessQueueTest, SendRace) {
  const size_t kNumMessages = 10000 / FLAGS_thread_count;

  ::std::mt19937 generator(0);
  ::std::uniform_int_distribution<> write_wrap_count_distribution(0, 10);
  ::std::bernoulli_distribution race_reads_distribution;
  ::std::bernoulli_distribution wrap_writes_distribution;

  const chrono::seconds print_frequency(FLAGS_print_rate);

  QueueRacer racer(queue(), FLAGS_thread_count, kNumMessages);
  const monotonic_clock::time_point start_time = monotonic_clock::now();
  const monotonic_clock::time_point end_time =
      start_time + chrono::seconds(FLAGS_duration);

  monotonic_clock::time_point monotonic_now = start_time;
  monotonic_clock::time_point next_print_time = start_time + print_frequency;
  uint64_t messages = 0;
  for (int i = 0; i < FLAGS_min_iterations || monotonic_now < end_time; ++i) {
    bool race_reads = race_reads_distribution(generator);
    int write_wrap_count = write_wrap_count_distribution(generator);
    if (!wrap_writes_distribution(generator)) {
      write_wrap_count = 0;
    }
    EXPECT_NO_FATAL_FAILURE(racer.RunIteration(race_reads, write_wrap_count))
        << ": Running with race_reads: " << race_reads
        << ", and write_wrap_count " << write_wrap_count << " and on iteration "
        << i;

    messages += racer.CurrentIndex();

    monotonic_now = monotonic_clock::now();
    if (monotonic_now > next_print_time) {
      double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
                                   monotonic_now - start_time)
                                   .count();
      printf("Finished iteration %d, %f iterations/sec, %f messages/second\n",
             i, i / elapsed_seconds,
             static_cast<double>(messages) / elapsed_seconds);
      next_print_time = monotonic_now + print_frequency;
    }
  }
}

namespace {

// Temporarily pins the current thread to CPUs 0 and 1.
// This speeds up the test on some machines a lot (~4x).  It also preserves
// opportunities for the 2 threads to race each other.
class PinForTest {
 public:
  PinForTest() {
    PCHECK(sched_getaffinity(0, sizeof(old_), &old_) == 0);
    cpu_set_t new_set;
    CPU_ZERO(&new_set);
    CPU_SET(0, &new_set);
    CPU_SET(1, &new_set);
    PCHECK(sched_setaffinity(0, sizeof(new_set), &new_set) == 0);
  }
  ~PinForTest() { PCHECK(sched_setaffinity(0, sizeof(old_), &old_) == 0); }

 private:
  cpu_set_t old_;
};

}  // namespace

// Send enough messages to wrap the 32 bit send counter.
TEST_F(LocklessQueueTest, WrappedSend) {
  PinForTest pin_cpu;
  uint64_t kNumMessages = 0x100010000ul;
  QueueRacer racer(queue(), 1, kNumMessages);

  const monotonic_clock::time_point start_time = monotonic_clock::now();
  EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
  const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
  double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
                               monotonic_now - start_time)
                               .count();
  printf("Took %f seconds to write %" PRIu64 " messages, %f messages/s\n",
         elapsed_seconds, kNumMessages,
         static_cast<double>(kNumMessages) / elapsed_seconds);
}

}  // namespace testing
}  // namespace ipc_lib
}  // namespace aos