blob: b708ebcaf140bd08a8a701616dd5fb7039ed0cc8 [file] [log] [blame]
Austin Schuh20b2b082019-09-11 20:42:56 -07001#include "aos/ipc_lib/lockless_queue.h"
2
Austin Schuh20b2b082019-09-11 20:42:56 -07003#include <unistd.h>
4#include <wait.h>
Brian Silverman7b266d92021-02-17 21:24:02 -08005
Austin Schuh20b2b082019-09-11 20:42:56 -07006#include <chrono>
Tyler Chatowbf0609c2021-07-31 16:13:27 -07007#include <cinttypes>
8#include <csignal>
Austin Schuh20b2b082019-09-11 20:42:56 -07009#include <memory>
10#include <random>
11#include <thread>
12
Austin Schuh20b2b082019-09-11 20:42:56 -070013#include "aos/events/epoll.h"
Austin Schuh20b2b082019-09-11 20:42:56 -070014#include "aos/ipc_lib/aos_sync.h"
Brian Silverman7b266d92021-02-17 21:24:02 -080015#include "aos/ipc_lib/event.h"
Austin Schuh20b2b082019-09-11 20:42:56 -070016#include "aos/ipc_lib/queue_racer.h"
17#include "aos/ipc_lib/signalfd.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070018#include "aos/realtime.h"
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -070019#include "aos/util/phased_loop.h"
Austin Schuh20b2b082019-09-11 20:42:56 -070020#include "gflags/gflags.h"
21#include "gtest/gtest.h"
22
23DEFINE_int32(min_iterations, 100,
24 "Minimum number of stress test iterations to run");
25DEFINE_int32(duration, 5, "Number of seconds to test for");
26DEFINE_int32(print_rate, 60, "Number of seconds between status prints");
27
28// The roboRIO can only handle 10 threads before exploding. Set the default for
29// ARM to 10.
30DEFINE_int32(thread_count,
31#if defined(__ARM_EABI__)
32 10,
33#else
34 100,
35#endif
36 "Number of threads to race");
37
38namespace aos {
39namespace ipc_lib {
40namespace testing {
41
// Shorthand used by the duration math throughout these tests.
namespace chrono = ::std::chrono;
43
44class LocklessQueueTest : public ::testing::Test {
45 public:
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -070046 static constexpr monotonic_clock::duration kChannelStorageDuration =
47 std::chrono::milliseconds(500);
48
Austin Schuh20b2b082019-09-11 20:42:56 -070049 LocklessQueueTest() {
Austin Schuh20b2b082019-09-11 20:42:56 -070050 config_.num_watchers = 10;
51 config_.num_senders = 100;
Brian Silverman177567e2020-08-12 19:51:33 -070052 config_.num_pinners = 5;
Austin Schuh20b2b082019-09-11 20:42:56 -070053 config_.queue_size = 10000;
54 // Exercise the alignment code. This would throw off alignment.
55 config_.message_data_size = 101;
56
57 // Since our backing store is an array of uint64_t for alignment purposes,
58 // normalize by the size.
59 memory_.resize(LocklessQueueMemorySize(config_) / sizeof(uint64_t));
60
61 Reset();
62 }
63
Brian Silvermanfc0d2e82020-08-12 19:58:35 -070064 LocklessQueue queue() {
65 return LocklessQueue(reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
66 reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
67 config_);
Austin Schuh20b2b082019-09-11 20:42:56 -070068 }
69
Brian Silvermanfc0d2e82020-08-12 19:58:35 -070070 void Reset() { memset(&memory_[0], 0, LocklessQueueMemorySize(config_)); }
Austin Schuh20b2b082019-09-11 20:42:56 -070071
72 // Runs until the signal is received.
73 void RunUntilWakeup(Event *ready, int priority) {
Austin Schuh20b2b082019-09-11 20:42:56 -070074 internal::EPoll epoll;
75 SignalFd signalfd({kWakeupSignal});
76
77 epoll.OnReadable(signalfd.fd(), [&signalfd, &epoll]() {
78 signalfd_siginfo result = signalfd.Read();
79
80 fprintf(stderr, "Got signal: %d\n", result.ssi_signo);
81 epoll.Quit();
82 });
83
Brian Silvermanfc0d2e82020-08-12 19:58:35 -070084 {
85 // Register to be woken up *after* the signalfd is catching the signals.
86 LocklessQueueWatcher watcher =
87 LocklessQueueWatcher::Make(queue(), priority).value();
Austin Schuh20b2b082019-09-11 20:42:56 -070088
Brian Silvermanfc0d2e82020-08-12 19:58:35 -070089 // And signal we are now ready.
90 ready->Set();
Austin Schuh20b2b082019-09-11 20:42:56 -070091
Brian Silvermanfc0d2e82020-08-12 19:58:35 -070092 epoll.Run();
Austin Schuh20b2b082019-09-11 20:42:56 -070093
Brian Silvermanfc0d2e82020-08-12 19:58:35 -070094 // Cleanup, ensuring the watcher is destroyed before the signalfd.
95 }
Austin Schuh20b2b082019-09-11 20:42:56 -070096 epoll.DeleteFd(signalfd.fd());
97 }
98
99 // Use a type with enough alignment that we are guarenteed that everything
100 // will be aligned properly on the target platform.
101 ::std::vector<uint64_t> memory_;
102
103 LocklessQueueConfiguration config_;
104};
105
Austin Schuh20b2b082019-09-11 20:42:56 -0700106// Tests that wakeup doesn't do anything if nothing was registered.
107TEST_F(LocklessQueueTest, NoWatcherWakeup) {
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700108 LocklessQueueWakeUpper wake_upper(queue());
Austin Schuh20b2b082019-09-11 20:42:56 -0700109
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700110 EXPECT_EQ(wake_upper.Wakeup(7), 0);
Austin Schuh20b2b082019-09-11 20:42:56 -0700111}
112
113// Tests that wakeup doesn't do anything if a wakeup was registered and then
114// unregistered.
115TEST_F(LocklessQueueTest, UnregisteredWatcherWakeup) {
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700116 LocklessQueueWakeUpper wake_upper(queue());
Austin Schuh20b2b082019-09-11 20:42:56 -0700117
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700118 { LocklessQueueWatcher::Make(queue(), 5).value(); }
Austin Schuh20b2b082019-09-11 20:42:56 -0700119
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700120 EXPECT_EQ(wake_upper.Wakeup(7), 0);
Austin Schuh20b2b082019-09-11 20:42:56 -0700121}
122
123// Tests that wakeup doesn't do anything if the thread dies.
124TEST_F(LocklessQueueTest, DiedWatcherWakeup) {
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700125 LocklessQueueWakeUpper wake_upper(queue());
Austin Schuh20b2b082019-09-11 20:42:56 -0700126
127 ::std::thread([this]() {
128 // Use placement new so the destructor doesn't get run.
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700129 ::std::aligned_storage<sizeof(LocklessQueueWatcher),
130 alignof(LocklessQueueWatcher)>::type data;
131 new (&data)
132 LocklessQueueWatcher(LocklessQueueWatcher::Make(queue(), 5).value());
Brian Silverman7b266d92021-02-17 21:24:02 -0800133 }).join();
Austin Schuh20b2b082019-09-11 20:42:56 -0700134
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700135 EXPECT_EQ(wake_upper.Wakeup(7), 0);
Austin Schuh20b2b082019-09-11 20:42:56 -0700136}
137
138struct WatcherState {
139 ::std::thread t;
140 Event ready;
141};
142
143// Tests that too many watchers fails like expected.
144TEST_F(LocklessQueueTest, TooManyWatchers) {
145 // This is going to be a barrel of monkeys.
146 // We need to spin up a bunch of watchers. But, they all need to be in
147 // different threads so they have different tids.
148 ::std::vector<WatcherState> queues;
149 // Reserve num_watchers WatcherState objects so the pointer value doesn't
150 // change out from under us below.
151 queues.reserve(config_.num_watchers);
152
153 // Event used to trigger all the threads to unregister.
154 Event cleanup;
155
156 // Start all the threads.
157 for (size_t i = 0; i < config_.num_watchers; ++i) {
158 queues.emplace_back();
159
160 WatcherState *s = &queues.back();
161 queues.back().t = ::std::thread([this, &cleanup, s]() {
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700162 LocklessQueueWatcher q = LocklessQueueWatcher::Make(queue(), 0).value();
Austin Schuh20b2b082019-09-11 20:42:56 -0700163
164 // Signal that this thread is ready.
165 s->ready.Set();
166
167 // And wait until we are asked to shut down.
168 cleanup.Wait();
Austin Schuh20b2b082019-09-11 20:42:56 -0700169 });
170 }
171
172 // Wait until all the threads are actually going.
173 for (WatcherState &w : queues) {
174 w.ready.Wait();
175 }
176
177 // Now try to allocate another one. This will fail.
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700178 EXPECT_FALSE(LocklessQueueWatcher::Make(queue(), 0));
Austin Schuh20b2b082019-09-11 20:42:56 -0700179
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700180 // Trigger the threads to cleanup their resources, and wait until they are
Austin Schuh20b2b082019-09-11 20:42:56 -0700181 // done.
182 cleanup.Set();
183 for (WatcherState &w : queues) {
184 w.t.join();
185 }
186
187 // We should now be able to allocate a wakeup.
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700188 EXPECT_TRUE(LocklessQueueWatcher::Make(queue(), 0));
Austin Schuh20b2b082019-09-11 20:42:56 -0700189}
190
191// Tests that too many watchers dies like expected.
Austin Schuhe516ab02020-05-06 21:37:04 -0700192TEST_F(LocklessQueueTest, TooManySenders) {
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700193 ::std::vector<LocklessQueueSender> senders;
Austin Schuhe516ab02020-05-06 21:37:04 -0700194 for (size_t i = 0; i < config_.num_senders; ++i) {
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700195 senders.emplace_back(
196 LocklessQueueSender::Make(queue(), kChannelStorageDuration).value());
Austin Schuhe516ab02020-05-06 21:37:04 -0700197 }
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700198 EXPECT_FALSE(LocklessQueueSender::Make(queue(), kChannelStorageDuration));
Austin Schuh20b2b082019-09-11 20:42:56 -0700199}
200
201// Now, start 2 threads and have them receive the signals.
202TEST_F(LocklessQueueTest, WakeUpThreads) {
203 // Confirm that the wakeup signal is in range.
204 EXPECT_LE(kWakeupSignal, SIGRTMAX);
205 EXPECT_GE(kWakeupSignal, SIGRTMIN);
206
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700207 LocklessQueueWakeUpper wake_upper(queue());
Austin Schuh20b2b082019-09-11 20:42:56 -0700208
209 // Event used to make sure the thread is ready before the test starts.
210 Event ready1;
211 Event ready2;
212
213 // Start the thread.
Austin Schuh07290cd2022-08-16 18:01:14 -0700214 ::std::thread t1([this, &ready1]() { RunUntilWakeup(&ready1, 2); });
215 ::std::thread t2([this, &ready2]() { RunUntilWakeup(&ready2, 1); });
Austin Schuh20b2b082019-09-11 20:42:56 -0700216
217 ready1.Wait();
218 ready2.Wait();
219
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700220 EXPECT_EQ(wake_upper.Wakeup(3), 2);
Austin Schuh20b2b082019-09-11 20:42:56 -0700221
222 t1.join();
223 t2.join();
224
225 // Clean up afterwords. We are pretending to be RT when we are really not.
226 // So we will be PI boosted up.
227 UnsetCurrentThreadRealtimePriority();
228}
229
230// Do a simple send test.
231TEST_F(LocklessQueueTest, Send) {
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700232 LocklessQueueSender sender =
233 LocklessQueueSender::Make(queue(), kChannelStorageDuration).value();
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700234 LocklessQueueReader reader(queue());
Austin Schuh20b2b082019-09-11 20:42:56 -0700235
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700236 time::PhasedLoop loop(std::chrono::microseconds(1), monotonic_clock::now());
Austin Schuh20b2b082019-09-11 20:42:56 -0700237 // Send enough messages to wrap.
238 for (int i = 0; i < 20000; ++i) {
239 // Confirm that the queue index makes sense given the number of sends.
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700240 EXPECT_EQ(reader.LatestIndex().index(),
241 i == 0 ? QueueIndex::Invalid().index() : i - 1);
Austin Schuh20b2b082019-09-11 20:42:56 -0700242
243 // Send a trivial piece of data.
244 char data[100];
245 size_t s = snprintf(data, sizeof(data), "foobar%d", i);
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700246 EXPECT_EQ(sender.Send(data, s, monotonic_clock::min_time,
247 realtime_clock::min_time, 0xffffffffu, UUID::Zero(),
248 nullptr, nullptr, nullptr),
249 LocklessQueueSender::Result::GOOD);
Austin Schuh20b2b082019-09-11 20:42:56 -0700250
251 // Confirm that the queue index still makes sense. This is easier since the
252 // empty case has been handled.
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700253 EXPECT_EQ(reader.LatestIndex().index(), i);
Austin Schuh20b2b082019-09-11 20:42:56 -0700254
255 // Read a result from 5 in the past.
Austin Schuhb5c6f972021-03-14 21:53:07 -0700256 monotonic_clock::time_point monotonic_sent_time;
257 realtime_clock::time_point realtime_sent_time;
258 monotonic_clock::time_point monotonic_remote_time;
259 realtime_clock::time_point realtime_remote_time;
Austin Schuhad154822019-12-27 15:45:13 -0800260 uint32_t remote_queue_index;
Austin Schuha9012be2021-07-21 15:19:11 -0700261 UUID source_boot_uuid;
Austin Schuh20b2b082019-09-11 20:42:56 -0700262 char read_data[1024];
263 size_t length;
264
265 QueueIndex index = QueueIndex::Zero(config_.queue_size);
266 if (i - 5 < 0) {
267 index = index.DecrementBy(5 - i);
268 } else {
269 index = index.IncrementBy(i - 5);
270 }
Austin Schuh8902fa52021-03-14 22:39:24 -0700271 LocklessQueueReader::Result read_result = reader.Read(
272 index.index(), &monotonic_sent_time, &realtime_sent_time,
273 &monotonic_remote_time, &realtime_remote_time, &remote_queue_index,
Austin Schuha9012be2021-07-21 15:19:11 -0700274 &source_boot_uuid, &length, &(read_data[0]));
Austin Schuh20b2b082019-09-11 20:42:56 -0700275
276 // This should either return GOOD, or TOO_OLD if it is before the start of
277 // the queue.
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700278 if (read_result != LocklessQueueReader::Result::GOOD) {
279 EXPECT_EQ(read_result, LocklessQueueReader::Result::TOO_OLD);
Austin Schuh20b2b082019-09-11 20:42:56 -0700280 }
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700281
282 loop.SleepUntilNext();
Austin Schuh20b2b082019-09-11 20:42:56 -0700283 }
284}
285
286// Races a bunch of sending threads to see if it all works.
287TEST_F(LocklessQueueTest, SendRace) {
288 const size_t kNumMessages = 10000 / FLAGS_thread_count;
289
290 ::std::mt19937 generator(0);
291 ::std::uniform_int_distribution<> write_wrap_count_distribution(0, 10);
292 ::std::bernoulli_distribution race_reads_distribution;
293 ::std::bernoulli_distribution wrap_writes_distribution;
294
295 const chrono::seconds print_frequency(FLAGS_print_rate);
296
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700297 QueueRacer racer(queue(), FLAGS_thread_count, kNumMessages);
298 const monotonic_clock::time_point start_time = monotonic_clock::now();
Austin Schuh20b2b082019-09-11 20:42:56 -0700299 const monotonic_clock::time_point end_time =
300 start_time + chrono::seconds(FLAGS_duration);
301
302 monotonic_clock::time_point monotonic_now = start_time;
303 monotonic_clock::time_point next_print_time = start_time + print_frequency;
304 uint64_t messages = 0;
305 for (int i = 0; i < FLAGS_min_iterations || monotonic_now < end_time; ++i) {
306 bool race_reads = race_reads_distribution(generator);
307 int write_wrap_count = write_wrap_count_distribution(generator);
308 if (!wrap_writes_distribution(generator)) {
309 write_wrap_count = 0;
310 }
311 EXPECT_NO_FATAL_FAILURE(racer.RunIteration(race_reads, write_wrap_count))
312 << ": Running with race_reads: " << race_reads
313 << ", and write_wrap_count " << write_wrap_count << " and on iteration "
314 << i;
315
316 messages += racer.CurrentIndex();
317
318 monotonic_now = monotonic_clock::now();
319 if (monotonic_now > next_print_time) {
320 double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
321 monotonic_now - start_time)
322 .count();
323 printf("Finished iteration %d, %f iterations/sec, %f messages/second\n",
324 i, i / elapsed_seconds,
325 static_cast<double>(messages) / elapsed_seconds);
326 next_print_time = monotonic_now + print_frequency;
327 }
328 }
329}
330
Brian Silverman0eaa1da2020-08-12 20:03:52 -0700331namespace {
332
Austin Schuh07290cd2022-08-16 18:01:14 -0700333// Temporarily pins the current thread to the first 2 available CPUs.
Brian Silverman0eaa1da2020-08-12 20:03:52 -0700334// This speeds up the test on some machines a lot (~4x). It also preserves
335// opportunities for the 2 threads to race each other.
336class PinForTest {
337 public:
338 PinForTest() {
Austin Schuh07290cd2022-08-16 18:01:14 -0700339 cpu_set_t cpus = GetCurrentThreadAffinity();
340 old_ = cpus;
341 int number_found = 0;
342 for (int i = 0; i < CPU_SETSIZE; ++i) {
343 if (CPU_ISSET(i, &cpus)) {
344 if (number_found < 2) {
345 ++number_found;
346 } else {
347 CPU_CLR(i, &cpus);
348 }
349 }
350 }
351 SetCurrentThreadAffinity(cpus);
Brian Silverman0eaa1da2020-08-12 20:03:52 -0700352 }
Austin Schuh07290cd2022-08-16 18:01:14 -0700353 ~PinForTest() { SetCurrentThreadAffinity(old_); }
Brian Silverman0eaa1da2020-08-12 20:03:52 -0700354
355 private:
356 cpu_set_t old_;
357};
358
359} // namespace
360
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700361class LocklessQueueTestTooFast : public LocklessQueueTest {
362 public:
363 LocklessQueueTestTooFast() {
364 // Force a scenario where senders get rate limited
365 config_.num_watchers = 1000;
366 config_.num_senders = 100;
367 config_.num_pinners = 5;
368 config_.queue_size = 100;
369 // Exercise the alignment code. This would throw off alignment.
370 config_.message_data_size = 101;
371
372 // Since our backing store is an array of uint64_t for alignment purposes,
373 // normalize by the size.
374 memory_.resize(LocklessQueueMemorySize(config_) / sizeof(uint64_t));
375
376 Reset();
377 }
378};
379
380// Ensure we always return OK or MESSAGES_SENT_TOO_FAST under an extreme load
381// on the Sender Queue.
382TEST_F(LocklessQueueTestTooFast, MessagesSentTooFast) {
383 PinForTest pin_cpu;
384 uint64_t kNumMessages = 1000000;
385 QueueRacer racer(queue(),
386 {FLAGS_thread_count,
387 kNumMessages,
388 {LocklessQueueSender::Result::GOOD,
389 LocklessQueueSender::Result::MESSAGES_SENT_TOO_FAST},
390 std::chrono::milliseconds(500),
391 false});
392
393 EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
394}
395
396// // Send enough messages to wrap the 32 bit send counter.
Austin Schuh20b2b082019-09-11 20:42:56 -0700397TEST_F(LocklessQueueTest, WrappedSend) {
Brian Silverman0eaa1da2020-08-12 20:03:52 -0700398 PinForTest pin_cpu;
Austin Schuh20b2b082019-09-11 20:42:56 -0700399 uint64_t kNumMessages = 0x100010000ul;
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700400 QueueRacer racer(queue(), 1, kNumMessages);
Austin Schuh20b2b082019-09-11 20:42:56 -0700401
402 const monotonic_clock::time_point start_time = monotonic_clock::now();
403 EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
404 const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
405 double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
406 monotonic_now - start_time)
407 .count();
408 printf("Took %f seconds to write %" PRIu64 " messages, %f messages/s\n",
409 elapsed_seconds, kNumMessages,
410 static_cast<double>(kNumMessages) / elapsed_seconds);
411}
412
413} // namespace testing
414} // namespace ipc_lib
415} // namespace aos