#include "aos/ipc_lib/lockless_queue.h"

#include <unistd.h>
#include <wait.h>

#include <chrono>
#include <cinttypes>
#include <csignal>
#include <memory>
#include <random>
#include <thread>

#include "gflags/gflags.h"
#include "gtest/gtest.h"

#include "aos/events/epoll.h"
#include "aos/ipc_lib/aos_sync.h"
#include "aos/ipc_lib/event.h"
#include "aos/ipc_lib/queue_racer.h"
#include "aos/ipc_lib/signalfd.h"
#include "aos/realtime.h"
#include "aos/util/phased_loop.h"

DEFINE_int32(min_iterations, 100,
             "Minimum number of stress test iterations to run");
DEFINE_int32(duration, 5, "Number of seconds to test for");
DEFINE_int32(print_rate, 60, "Number of seconds between status prints");

// The roboRIO can only handle 10 threads before exploding. Set the default for
// ARM to 10.
DEFINE_int32(thread_count,
#if defined(__ARM_EABI__)
             10,
#else
             100,
#endif
             "Number of threads to race");

namespace aos {
namespace ipc_lib {
namespace testing {

namespace chrono = ::std::chrono;

class LocklessQueueTest : public ::testing::Test {
 public:
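  // Storage duration handed to each LocklessQueueSender below; the Send test
  // also paces its writes against this value.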
  static constexpr monotonic_clock::duration kChannelStorageDuration =
      std::chrono::milliseconds(500);

  LocklessQueueTest() {
    config_.num_watchers = 10;
    config_.num_senders = 100;
    config_.num_pinners = 5;
    config_.queue_size = 10000;
    // Use an odd message size to exercise the alignment code.
    config_.message_data_size = 101;

    // Since our backing store is an array of uint64_t for alignment purposes,
    // normalize by the size.
    memory_.resize(LocklessQueueMemorySize(config_) / sizeof(uint64_t));

    Reset();
  }

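  // Constructs a fresh LocklessQueue handle over the test's in-process backing
  // store; the same buffer is passed for both memory arguments.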
  LocklessQueue queue() {
    return LocklessQueue(reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
                         reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
                         config_);
  }

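  // Zeroes the backing memory so each test starts with a freshly-initialized
  // queue.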
  void Reset() { memset(&memory_[0], 0, LocklessQueueMemorySize(config_)); }

  // Registers a watcher at the given priority and runs until kWakeupSignal is
  // received.
  void RunUntilWakeup(Event *ready, int priority) {
    internal::EPoll epoll;
    SignalFd signalfd({kWakeupSignal});

    epoll.OnReadable(signalfd.fd(), [&signalfd, &epoll]() {
      signalfd_siginfo result = signalfd.Read();

      fprintf(stderr, "Got signal: %d\n", result.ssi_signo);
      epoll.Quit();
    });

    {
      // Register to be woken up *after* the signalfd is catching the signals.
      LocklessQueueWatcher watcher =
          LocklessQueueWatcher::Make(queue(), priority).value();

      // And signal we are now ready.
      ready->Set();

      epoll.Run();

      // Cleanup, ensuring the watcher is destroyed before the signalfd.
    }
    epoll.DeleteFd(signalfd.fd());
  }

  // Use a type with enough alignment that we are guaranteed that everything
  // will be aligned properly on the target platform.
  ::std::vector<uint64_t> memory_;

  LocklessQueueConfiguration config_;
};

// Tests that wakeup doesn't do anything if nothing was registered.
TEST_F(LocklessQueueTest, NoWatcherWakeup) {
  LocklessQueueWakeUpper wake_upper(queue());

  EXPECT_EQ(wake_upper.Wakeup(7), 0);
}

// Tests that wakeup doesn't do anything if a watcher was registered and then
// unregistered.
TEST_F(LocklessQueueTest, UnregisteredWatcherWakeup) {
  LocklessQueueWakeUpper wake_upper(queue());

  { LocklessQueueWatcher::Make(queue(), 5).value(); }

  EXPECT_EQ(wake_upper.Wakeup(7), 0);
}

// Tests that wakeup doesn't do anything if the thread dies.
TEST_F(LocklessQueueTest, DiedWatcherWakeup) {
  LocklessQueueWakeUpper wake_upper(queue());

  ::std::thread([this]() {
    // Use placement new so the destructor doesn't get run.
    ::std::aligned_storage<sizeof(LocklessQueueWatcher),
                           alignof(LocklessQueueWatcher)>::type data;
    new (&data)
        LocklessQueueWatcher(LocklessQueueWatcher::Make(queue(), 5).value());
  }).join();

  EXPECT_EQ(wake_upper.Wakeup(7), 0);
}

struct WatcherState {
  ::std::thread t;
  Event ready;
};

// Tests that creating too many watchers fails as expected.
TEST_F(LocklessQueueTest, TooManyWatchers) {
  // This is going to be a barrel of monkeys.
  // We need to spin up a bunch of watchers. But, they all need to be in
  // different threads so they have different tids.
  ::std::vector<WatcherState> queues;
  // Reserve num_watchers WatcherState objects so the pointer value doesn't
  // change out from under us below.
  queues.reserve(config_.num_watchers);

  // Event used to trigger all the threads to unregister.
  Event cleanup;

  // Start all the threads.
  for (size_t i = 0; i < config_.num_watchers; ++i) {
    queues.emplace_back();

    WatcherState *s = &queues.back();
    queues.back().t = ::std::thread([this, &cleanup, s]() {
      LocklessQueueWatcher q = LocklessQueueWatcher::Make(queue(), 0).value();

      // Signal that this thread is ready.
      s->ready.Set();

      // And wait until we are asked to shut down.
      cleanup.Wait();
    });
  }

  // Wait until all the threads are actually going.
  for (WatcherState &w : queues) {
    w.ready.Wait();
  }

  // Now try to allocate another one. This will fail.
  EXPECT_FALSE(LocklessQueueWatcher::Make(queue(), 0));

  // Trigger the threads to clean up their resources, and wait until they are
  // done.
  cleanup.Set();
  for (WatcherState &w : queues) {
    w.t.join();
  }

  // We should now be able to allocate another watcher.
  EXPECT_TRUE(LocklessQueueWatcher::Make(queue(), 0));
}

// Tests that creating too many senders fails as expected.
TEST_F(LocklessQueueTest, TooManySenders) {
  ::std::vector<LocklessQueueSender> senders;
  for (size_t i = 0; i < config_.num_senders; ++i) {
    senders.emplace_back(
        LocklessQueueSender::Make(queue(), kChannelStorageDuration).value());
  }
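  // Every sender slot is now in use, so constructing one more should fail.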
  EXPECT_FALSE(LocklessQueueSender::Make(queue(), kChannelStorageDuration));
}

// Now, start 2 threads and have them receive the signals.
TEST_F(LocklessQueueTest, WakeUpThreads) {
  // Confirm that the wakeup signal is in range.
  EXPECT_LE(kWakeupSignal, SIGRTMAX);
  EXPECT_GE(kWakeupSignal, SIGRTMIN);

  LocklessQueueWakeUpper wake_upper(queue());

  // Events used to make sure the threads are ready before the test starts.
  Event ready1;
  Event ready2;

  // Start the threads.
  ::std::thread t1([this, &ready1]() { RunUntilWakeup(&ready1, 2); });
  ::std::thread t2([this, &ready2]() { RunUntilWakeup(&ready2, 1); });

  ready1.Wait();
  ready2.Wait();

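  // Both watcher threads should be woken, so Wakeup() reports 2.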
  EXPECT_EQ(wake_upper.Wakeup(3), 2);

  t1.join();
  t2.join();

  // Clean up afterwards. We are pretending to be RT when we are really not.
  // So we will be PI boosted up.
  UnsetCurrentThreadRealtimePriority();
}

// Do a simple send test.
TEST_F(LocklessQueueTest, Send) {
  LocklessQueueSender sender =
      LocklessQueueSender::Make(queue(), kChannelStorageDuration).value();
  LocklessQueueReader reader(queue());

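  // Pace the sends so a queue's worth of messages spans roughly
  // kChannelStorageDuration, which should keep this test from tripping the
  // MESSAGES_SENT_TOO_FAST rate limit exercised elsewhere.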
  time::PhasedLoop loop(kChannelStorageDuration / (config_.queue_size - 1),
                        monotonic_clock::now());
  // Accept every message the reader offers us.
  std::function<bool(const Context &)> should_read = [](const Context &) {
    return true;
  };

  // Send enough messages to wrap.
  for (int i = 0; i < 2 * static_cast<int>(config_.queue_size); ++i) {
    // Confirm that the queue index makes sense given the number of sends.
    EXPECT_EQ(reader.LatestIndex().index(),
              i == 0 ? QueueIndex::Invalid().index() : i - 1);

    // Send a trivial piece of data.
    char data[100];
    size_t s = snprintf(data, sizeof(data), "foobar%d", i);
    ASSERT_EQ(sender.Send(data, s, monotonic_clock::min_time,
                          realtime_clock::min_time, 0xffffffffu, UUID::Zero(),
                          nullptr, nullptr, nullptr),
              LocklessQueueSender::Result::GOOD);

    // Confirm that the queue index still makes sense. This is easier since the
    // empty case has been handled.
    EXPECT_EQ(reader.LatestIndex().index(), i);

    // Read a result from 5 in the past.
    monotonic_clock::time_point monotonic_sent_time;
    realtime_clock::time_point realtime_sent_time;
    monotonic_clock::time_point monotonic_remote_time;
    realtime_clock::time_point realtime_remote_time;
    uint32_t remote_queue_index;
    UUID source_boot_uuid;
    char read_data[1024];
    size_t length;

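    // For the first few sends there aren't 5 messages yet, so the index wraps
    // below zero and lands before the start of the queue; the read below is
    // then expected to report TOO_OLD.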
    QueueIndex index = QueueIndex::Zero(config_.queue_size);
    if (i - 5 < 0) {
      index = index.DecrementBy(5 - i);
    } else {
      index = index.IncrementBy(i - 5);
    }
    LocklessQueueReader::Result read_result = reader.Read(
        index.index(), &monotonic_sent_time, &realtime_sent_time,
        &monotonic_remote_time, &realtime_remote_time, &remote_queue_index,
        &source_boot_uuid, &length, &(read_data[0]), std::ref(should_read));

    // This should either return GOOD, or TOO_OLD if it is before the start of
    // the queue.
    if (read_result != LocklessQueueReader::Result::GOOD) {
      ASSERT_EQ(read_result, LocklessQueueReader::Result::TOO_OLD);
    }

    loop.SleepUntilNext();
  }
}

// Races a bunch of sending threads to see if it all works.
TEST_F(LocklessQueueTest, SendRace) {
  const size_t kNumMessages = 10000 / FLAGS_thread_count;

  ::std::mt19937 generator(0);
  ::std::uniform_int_distribution<> write_wrap_count_distribution(0, 10);
  ::std::bernoulli_distribution race_reads_distribution;
  ::std::bernoulli_distribution set_should_read_distribution;
  ::std::bernoulli_distribution should_read_result_distribution;
  ::std::bernoulli_distribution wrap_writes_distribution;

  const chrono::seconds print_frequency(FLAGS_print_rate);

  QueueRacer racer(queue(), FLAGS_thread_count, kNumMessages);
  const monotonic_clock::time_point start_time = monotonic_clock::now();
  const monotonic_clock::time_point end_time =
      start_time + chrono::seconds(FLAGS_duration);

  monotonic_clock::time_point monotonic_now = start_time;
  monotonic_clock::time_point next_print_time = start_time + print_frequency;
  uint64_t messages = 0;
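  // Run until both the minimum iteration count and the minimum duration have
  // been satisfied, randomizing the racing parameters each time through.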
  for (int i = 0; i < FLAGS_min_iterations || monotonic_now < end_time; ++i) {
    const bool race_reads = race_reads_distribution(generator);
    const bool set_should_read = set_should_read_distribution(generator);
    const bool should_read_result = should_read_result_distribution(generator);
    int write_wrap_count = write_wrap_count_distribution(generator);
    if (!wrap_writes_distribution(generator)) {
      write_wrap_count = 0;
    }
    EXPECT_NO_FATAL_FAILURE(racer.RunIteration(
        race_reads, write_wrap_count, set_should_read, should_read_result))
        << ": Running with race_reads: " << race_reads
        << ", and write_wrap_count " << write_wrap_count << " and on iteration "
        << i;

    messages += racer.CurrentIndex();

    monotonic_now = monotonic_clock::now();
    if (monotonic_now > next_print_time) {
      double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
                                   monotonic_now - start_time)
                                   .count();
      printf("Finished iteration %d, %f iterations/sec, %f messages/second\n",
             i, i / elapsed_seconds,
             static_cast<double>(messages) / elapsed_seconds);
      next_print_time = monotonic_now + print_frequency;
    }
  }
}

namespace {

// Temporarily pins the current thread to the first 2 available CPUs.
// This speeds up the test on some machines a lot (~4x). It also preserves
// opportunities for the 2 threads to race each other.
class PinForTest {
 public:
  PinForTest() {
    cpu_set_t cpus = GetCurrentThreadAffinity();
    old_ = cpus;
    int number_found = 0;
    for (int i = 0; i < CPU_SETSIZE; ++i) {
      if (CPU_ISSET(i, &cpus)) {
        if (number_found < 2) {
          ++number_found;
        } else {
          CPU_CLR(i, &cpus);
        }
      }
    }
    SetCurrentThreadAffinity(cpus);
  }
  ~PinForTest() { SetCurrentThreadAffinity(old_); }

 private:
  cpu_set_t old_;
};

}  // namespace

class LocklessQueueTestTooFast : public LocklessQueueTest {
 public:
  LocklessQueueTestTooFast() {
    // Force a scenario where senders get rate limited.
    config_.num_watchers = 1000;
    config_.num_senders = 100;
    config_.num_pinners = 5;
    config_.queue_size = 100;
    // Use an odd message size to exercise the alignment code.
    config_.message_data_size = 101;

    // Since our backing store is an array of uint64_t for alignment purposes,
    // normalize by the size.
    memory_.resize(LocklessQueueMemorySize(config_) / sizeof(uint64_t));

    Reset();
  }
};

// Ensure that every send returns either GOOD or MESSAGES_SENT_TOO_FAST under
// an extreme load on the sender queue.
TEST_F(LocklessQueueTestTooFast, MessagesSentTooFast) {
  PinForTest pin_cpu;
  uint64_t kNumMessages = 1000000;
  QueueRacer racer(queue(),
                   {FLAGS_thread_count,
                    kNumMessages,
                    {LocklessQueueSender::Result::GOOD,
                     LocklessQueueSender::Result::MESSAGES_SENT_TOO_FAST},
                    std::chrono::milliseconds(500),
                    false});

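  // Under this configuration the racer accepts either of the two results
  // listed above from every send.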
  EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0, true, true));
}

// Send enough messages to wrap the 32 bit send counter.
TEST_F(LocklessQueueTest, WrappedSend) {
  PinForTest pin_cpu;
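  // 0x100010000 is just past 2^32 (0x100000000), so the 32-bit counter wraps
  // partway through the run.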
Austin Schuh20b2b082019-09-11 20:42:56 -0700410 uint64_t kNumMessages = 0x100010000ul;
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700411 QueueRacer racer(queue(), 1, kNumMessages);
Austin Schuh20b2b082019-09-11 20:42:56 -0700412
413 const monotonic_clock::time_point start_time = monotonic_clock::now();
Austin Schuh82ea7382023-07-14 15:17:34 -0700414 EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0, false, true));
Austin Schuh20b2b082019-09-11 20:42:56 -0700415 const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
416 double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
417 monotonic_now - start_time)
418 .count();
419 printf("Took %f seconds to write %" PRIu64 " messages, %f messages/s\n",
420 elapsed_seconds, kNumMessages,
421 static_cast<double>(kNumMessages) / elapsed_seconds);
422}

}  // namespace testing
}  // namespace ipc_lib
}  // namespace aos