blob: 34e276234988eb381a09463119088f94b4f50fc0 [file] [log] [blame]
Austin Schuh20b2b082019-09-11 20:42:56 -07001#include "aos/ipc_lib/lockless_queue.h"
2
Austin Schuh20b2b082019-09-11 20:42:56 -07003#include <unistd.h>
4#include <wait.h>
Brian Silverman7b266d92021-02-17 21:24:02 -08005
Austin Schuh20b2b082019-09-11 20:42:56 -07006#include <chrono>
Tyler Chatowbf0609c2021-07-31 16:13:27 -07007#include <cinttypes>
8#include <csignal>
Austin Schuh20b2b082019-09-11 20:42:56 -07009#include <memory>
10#include <random>
11#include <thread>
12
Philipp Schrader790cb542023-07-05 21:06:52 -070013#include "gflags/gflags.h"
14#include "gtest/gtest.h"
15
Austin Schuh20b2b082019-09-11 20:42:56 -070016#include "aos/events/epoll.h"
Austin Schuh20b2b082019-09-11 20:42:56 -070017#include "aos/ipc_lib/aos_sync.h"
Brian Silverman7b266d92021-02-17 21:24:02 -080018#include "aos/ipc_lib/event.h"
Austin Schuh20b2b082019-09-11 20:42:56 -070019#include "aos/ipc_lib/queue_racer.h"
20#include "aos/ipc_lib/signalfd.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070021#include "aos/realtime.h"
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -070022#include "aos/util/phased_loop.h"
Austin Schuh20b2b082019-09-11 20:42:56 -070023
24DEFINE_int32(min_iterations, 100,
25 "Minimum number of stress test iterations to run");
26DEFINE_int32(duration, 5, "Number of seconds to test for");
27DEFINE_int32(print_rate, 60, "Number of seconds between status prints");
28
29// The roboRIO can only handle 10 threads before exploding. Set the default for
30// ARM to 10.
31DEFINE_int32(thread_count,
32#if defined(__ARM_EABI__)
33 10,
34#else
35 100,
36#endif
37 "Number of threads to race");
38
39namespace aos {
40namespace ipc_lib {
41namespace testing {
42
43namespace chrono = ::std::chrono;
44
45class LocklessQueueTest : public ::testing::Test {
46 public:
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -070047 static constexpr monotonic_clock::duration kChannelStorageDuration =
48 std::chrono::milliseconds(500);
49
Austin Schuh20b2b082019-09-11 20:42:56 -070050 LocklessQueueTest() {
Austin Schuh20b2b082019-09-11 20:42:56 -070051 config_.num_watchers = 10;
52 config_.num_senders = 100;
Brian Silverman177567e2020-08-12 19:51:33 -070053 config_.num_pinners = 5;
Austin Schuh20b2b082019-09-11 20:42:56 -070054 config_.queue_size = 10000;
55 // Exercise the alignment code. This would throw off alignment.
56 config_.message_data_size = 101;
57
58 // Since our backing store is an array of uint64_t for alignment purposes,
59 // normalize by the size.
60 memory_.resize(LocklessQueueMemorySize(config_) / sizeof(uint64_t));
61
62 Reset();
63 }
64
Brian Silvermanfc0d2e82020-08-12 19:58:35 -070065 LocklessQueue queue() {
66 return LocklessQueue(reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
67 reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
68 config_);
Austin Schuh20b2b082019-09-11 20:42:56 -070069 }
70
Brian Silvermanfc0d2e82020-08-12 19:58:35 -070071 void Reset() { memset(&memory_[0], 0, LocklessQueueMemorySize(config_)); }
Austin Schuh20b2b082019-09-11 20:42:56 -070072
73 // Runs until the signal is received.
74 void RunUntilWakeup(Event *ready, int priority) {
Austin Schuh20b2b082019-09-11 20:42:56 -070075 internal::EPoll epoll;
76 SignalFd signalfd({kWakeupSignal});
77
78 epoll.OnReadable(signalfd.fd(), [&signalfd, &epoll]() {
79 signalfd_siginfo result = signalfd.Read();
80
81 fprintf(stderr, "Got signal: %d\n", result.ssi_signo);
82 epoll.Quit();
83 });
84
Brian Silvermanfc0d2e82020-08-12 19:58:35 -070085 {
86 // Register to be woken up *after* the signalfd is catching the signals.
87 LocklessQueueWatcher watcher =
88 LocklessQueueWatcher::Make(queue(), priority).value();
Austin Schuh20b2b082019-09-11 20:42:56 -070089
Brian Silvermanfc0d2e82020-08-12 19:58:35 -070090 // And signal we are now ready.
91 ready->Set();
Austin Schuh20b2b082019-09-11 20:42:56 -070092
Brian Silvermanfc0d2e82020-08-12 19:58:35 -070093 epoll.Run();
Austin Schuh20b2b082019-09-11 20:42:56 -070094
Brian Silvermanfc0d2e82020-08-12 19:58:35 -070095 // Cleanup, ensuring the watcher is destroyed before the signalfd.
96 }
Austin Schuh20b2b082019-09-11 20:42:56 -070097 epoll.DeleteFd(signalfd.fd());
98 }
99
100 // Use a type with enough alignment that we are guarenteed that everything
101 // will be aligned properly on the target platform.
102 ::std::vector<uint64_t> memory_;
103
104 LocklessQueueConfiguration config_;
105};
106
Austin Schuh20b2b082019-09-11 20:42:56 -0700107// Tests that wakeup doesn't do anything if nothing was registered.
108TEST_F(LocklessQueueTest, NoWatcherWakeup) {
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700109 LocklessQueueWakeUpper wake_upper(queue());
Austin Schuh20b2b082019-09-11 20:42:56 -0700110
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700111 EXPECT_EQ(wake_upper.Wakeup(7), 0);
Austin Schuh20b2b082019-09-11 20:42:56 -0700112}
113
114// Tests that wakeup doesn't do anything if a wakeup was registered and then
115// unregistered.
116TEST_F(LocklessQueueTest, UnregisteredWatcherWakeup) {
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700117 LocklessQueueWakeUpper wake_upper(queue());
Austin Schuh20b2b082019-09-11 20:42:56 -0700118
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700119 { LocklessQueueWatcher::Make(queue(), 5).value(); }
Austin Schuh20b2b082019-09-11 20:42:56 -0700120
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700121 EXPECT_EQ(wake_upper.Wakeup(7), 0);
Austin Schuh20b2b082019-09-11 20:42:56 -0700122}
123
124// Tests that wakeup doesn't do anything if the thread dies.
125TEST_F(LocklessQueueTest, DiedWatcherWakeup) {
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700126 LocklessQueueWakeUpper wake_upper(queue());
Austin Schuh20b2b082019-09-11 20:42:56 -0700127
128 ::std::thread([this]() {
129 // Use placement new so the destructor doesn't get run.
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700130 ::std::aligned_storage<sizeof(LocklessQueueWatcher),
131 alignof(LocklessQueueWatcher)>::type data;
132 new (&data)
133 LocklessQueueWatcher(LocklessQueueWatcher::Make(queue(), 5).value());
Brian Silverman7b266d92021-02-17 21:24:02 -0800134 }).join();
Austin Schuh20b2b082019-09-11 20:42:56 -0700135
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700136 EXPECT_EQ(wake_upper.Wakeup(7), 0);
Austin Schuh20b2b082019-09-11 20:42:56 -0700137}
138
139struct WatcherState {
140 ::std::thread t;
141 Event ready;
142};
143
144// Tests that too many watchers fails like expected.
145TEST_F(LocklessQueueTest, TooManyWatchers) {
146 // This is going to be a barrel of monkeys.
147 // We need to spin up a bunch of watchers. But, they all need to be in
148 // different threads so they have different tids.
149 ::std::vector<WatcherState> queues;
150 // Reserve num_watchers WatcherState objects so the pointer value doesn't
151 // change out from under us below.
152 queues.reserve(config_.num_watchers);
153
154 // Event used to trigger all the threads to unregister.
155 Event cleanup;
156
157 // Start all the threads.
158 for (size_t i = 0; i < config_.num_watchers; ++i) {
159 queues.emplace_back();
160
161 WatcherState *s = &queues.back();
162 queues.back().t = ::std::thread([this, &cleanup, s]() {
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700163 LocklessQueueWatcher q = LocklessQueueWatcher::Make(queue(), 0).value();
Austin Schuh20b2b082019-09-11 20:42:56 -0700164
165 // Signal that this thread is ready.
166 s->ready.Set();
167
168 // And wait until we are asked to shut down.
169 cleanup.Wait();
Austin Schuh20b2b082019-09-11 20:42:56 -0700170 });
171 }
172
173 // Wait until all the threads are actually going.
174 for (WatcherState &w : queues) {
175 w.ready.Wait();
176 }
177
178 // Now try to allocate another one. This will fail.
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700179 EXPECT_FALSE(LocklessQueueWatcher::Make(queue(), 0));
Austin Schuh20b2b082019-09-11 20:42:56 -0700180
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700181 // Trigger the threads to cleanup their resources, and wait until they are
Austin Schuh20b2b082019-09-11 20:42:56 -0700182 // done.
183 cleanup.Set();
184 for (WatcherState &w : queues) {
185 w.t.join();
186 }
187
188 // We should now be able to allocate a wakeup.
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700189 EXPECT_TRUE(LocklessQueueWatcher::Make(queue(), 0));
Austin Schuh20b2b082019-09-11 20:42:56 -0700190}
191
192// Tests that too many watchers dies like expected.
Austin Schuhe516ab02020-05-06 21:37:04 -0700193TEST_F(LocklessQueueTest, TooManySenders) {
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700194 ::std::vector<LocklessQueueSender> senders;
Austin Schuhe516ab02020-05-06 21:37:04 -0700195 for (size_t i = 0; i < config_.num_senders; ++i) {
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700196 senders.emplace_back(
197 LocklessQueueSender::Make(queue(), kChannelStorageDuration).value());
Austin Schuhe516ab02020-05-06 21:37:04 -0700198 }
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700199 EXPECT_FALSE(LocklessQueueSender::Make(queue(), kChannelStorageDuration));
Austin Schuh20b2b082019-09-11 20:42:56 -0700200}
201
202// Now, start 2 threads and have them receive the signals.
203TEST_F(LocklessQueueTest, WakeUpThreads) {
204 // Confirm that the wakeup signal is in range.
205 EXPECT_LE(kWakeupSignal, SIGRTMAX);
206 EXPECT_GE(kWakeupSignal, SIGRTMIN);
207
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700208 LocklessQueueWakeUpper wake_upper(queue());
Austin Schuh20b2b082019-09-11 20:42:56 -0700209
210 // Event used to make sure the thread is ready before the test starts.
211 Event ready1;
212 Event ready2;
213
214 // Start the thread.
Austin Schuh07290cd2022-08-16 18:01:14 -0700215 ::std::thread t1([this, &ready1]() { RunUntilWakeup(&ready1, 2); });
216 ::std::thread t2([this, &ready2]() { RunUntilWakeup(&ready2, 1); });
Austin Schuh20b2b082019-09-11 20:42:56 -0700217
218 ready1.Wait();
219 ready2.Wait();
220
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700221 EXPECT_EQ(wake_upper.Wakeup(3), 2);
Austin Schuh20b2b082019-09-11 20:42:56 -0700222
223 t1.join();
224 t2.join();
225
226 // Clean up afterwords. We are pretending to be RT when we are really not.
227 // So we will be PI boosted up.
228 UnsetCurrentThreadRealtimePriority();
229}
230
231// Do a simple send test.
232TEST_F(LocklessQueueTest, Send) {
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700233 LocklessQueueSender sender =
234 LocklessQueueSender::Make(queue(), kChannelStorageDuration).value();
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700235 LocklessQueueReader reader(queue());
Austin Schuh20b2b082019-09-11 20:42:56 -0700236
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700237 time::PhasedLoop loop(std::chrono::microseconds(1), monotonic_clock::now());
Austin Schuh20b2b082019-09-11 20:42:56 -0700238 // Send enough messages to wrap.
239 for (int i = 0; i < 20000; ++i) {
240 // Confirm that the queue index makes sense given the number of sends.
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700241 EXPECT_EQ(reader.LatestIndex().index(),
242 i == 0 ? QueueIndex::Invalid().index() : i - 1);
Austin Schuh20b2b082019-09-11 20:42:56 -0700243
244 // Send a trivial piece of data.
245 char data[100];
246 size_t s = snprintf(data, sizeof(data), "foobar%d", i);
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700247 EXPECT_EQ(sender.Send(data, s, monotonic_clock::min_time,
248 realtime_clock::min_time, 0xffffffffu, UUID::Zero(),
249 nullptr, nullptr, nullptr),
250 LocklessQueueSender::Result::GOOD);
Austin Schuh20b2b082019-09-11 20:42:56 -0700251
252 // Confirm that the queue index still makes sense. This is easier since the
253 // empty case has been handled.
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700254 EXPECT_EQ(reader.LatestIndex().index(), i);
Austin Schuh20b2b082019-09-11 20:42:56 -0700255
256 // Read a result from 5 in the past.
Austin Schuhb5c6f972021-03-14 21:53:07 -0700257 monotonic_clock::time_point monotonic_sent_time;
258 realtime_clock::time_point realtime_sent_time;
259 monotonic_clock::time_point monotonic_remote_time;
260 realtime_clock::time_point realtime_remote_time;
Austin Schuhad154822019-12-27 15:45:13 -0800261 uint32_t remote_queue_index;
Austin Schuha9012be2021-07-21 15:19:11 -0700262 UUID source_boot_uuid;
Austin Schuh20b2b082019-09-11 20:42:56 -0700263 char read_data[1024];
264 size_t length;
265
266 QueueIndex index = QueueIndex::Zero(config_.queue_size);
267 if (i - 5 < 0) {
268 index = index.DecrementBy(5 - i);
269 } else {
270 index = index.IncrementBy(i - 5);
271 }
Austin Schuh8902fa52021-03-14 22:39:24 -0700272 LocklessQueueReader::Result read_result = reader.Read(
273 index.index(), &monotonic_sent_time, &realtime_sent_time,
274 &monotonic_remote_time, &realtime_remote_time, &remote_queue_index,
Austin Schuha9012be2021-07-21 15:19:11 -0700275 &source_boot_uuid, &length, &(read_data[0]));
Austin Schuh20b2b082019-09-11 20:42:56 -0700276
277 // This should either return GOOD, or TOO_OLD if it is before the start of
278 // the queue.
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700279 if (read_result != LocklessQueueReader::Result::GOOD) {
280 EXPECT_EQ(read_result, LocklessQueueReader::Result::TOO_OLD);
Austin Schuh20b2b082019-09-11 20:42:56 -0700281 }
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700282
283 loop.SleepUntilNext();
Austin Schuh20b2b082019-09-11 20:42:56 -0700284 }
285}
286
287// Races a bunch of sending threads to see if it all works.
288TEST_F(LocklessQueueTest, SendRace) {
289 const size_t kNumMessages = 10000 / FLAGS_thread_count;
290
291 ::std::mt19937 generator(0);
292 ::std::uniform_int_distribution<> write_wrap_count_distribution(0, 10);
293 ::std::bernoulli_distribution race_reads_distribution;
294 ::std::bernoulli_distribution wrap_writes_distribution;
295
296 const chrono::seconds print_frequency(FLAGS_print_rate);
297
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700298 QueueRacer racer(queue(), FLAGS_thread_count, kNumMessages);
299 const monotonic_clock::time_point start_time = monotonic_clock::now();
Austin Schuh20b2b082019-09-11 20:42:56 -0700300 const monotonic_clock::time_point end_time =
301 start_time + chrono::seconds(FLAGS_duration);
302
303 monotonic_clock::time_point monotonic_now = start_time;
304 monotonic_clock::time_point next_print_time = start_time + print_frequency;
305 uint64_t messages = 0;
306 for (int i = 0; i < FLAGS_min_iterations || monotonic_now < end_time; ++i) {
307 bool race_reads = race_reads_distribution(generator);
308 int write_wrap_count = write_wrap_count_distribution(generator);
309 if (!wrap_writes_distribution(generator)) {
310 write_wrap_count = 0;
311 }
312 EXPECT_NO_FATAL_FAILURE(racer.RunIteration(race_reads, write_wrap_count))
313 << ": Running with race_reads: " << race_reads
314 << ", and write_wrap_count " << write_wrap_count << " and on iteration "
315 << i;
316
317 messages += racer.CurrentIndex();
318
319 monotonic_now = monotonic_clock::now();
320 if (monotonic_now > next_print_time) {
321 double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
322 monotonic_now - start_time)
323 .count();
324 printf("Finished iteration %d, %f iterations/sec, %f messages/second\n",
325 i, i / elapsed_seconds,
326 static_cast<double>(messages) / elapsed_seconds);
327 next_print_time = monotonic_now + print_frequency;
328 }
329 }
330}
331
Brian Silverman0eaa1da2020-08-12 20:03:52 -0700332namespace {
333
Austin Schuh07290cd2022-08-16 18:01:14 -0700334// Temporarily pins the current thread to the first 2 available CPUs.
Brian Silverman0eaa1da2020-08-12 20:03:52 -0700335// This speeds up the test on some machines a lot (~4x). It also preserves
336// opportunities for the 2 threads to race each other.
337class PinForTest {
338 public:
339 PinForTest() {
Austin Schuh07290cd2022-08-16 18:01:14 -0700340 cpu_set_t cpus = GetCurrentThreadAffinity();
341 old_ = cpus;
342 int number_found = 0;
343 for (int i = 0; i < CPU_SETSIZE; ++i) {
344 if (CPU_ISSET(i, &cpus)) {
345 if (number_found < 2) {
346 ++number_found;
347 } else {
348 CPU_CLR(i, &cpus);
349 }
350 }
351 }
352 SetCurrentThreadAffinity(cpus);
Brian Silverman0eaa1da2020-08-12 20:03:52 -0700353 }
Austin Schuh07290cd2022-08-16 18:01:14 -0700354 ~PinForTest() { SetCurrentThreadAffinity(old_); }
Brian Silverman0eaa1da2020-08-12 20:03:52 -0700355
356 private:
357 cpu_set_t old_;
358};
359
360} // namespace
361
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700362class LocklessQueueTestTooFast : public LocklessQueueTest {
363 public:
364 LocklessQueueTestTooFast() {
365 // Force a scenario where senders get rate limited
366 config_.num_watchers = 1000;
367 config_.num_senders = 100;
368 config_.num_pinners = 5;
369 config_.queue_size = 100;
370 // Exercise the alignment code. This would throw off alignment.
371 config_.message_data_size = 101;
372
373 // Since our backing store is an array of uint64_t for alignment purposes,
374 // normalize by the size.
375 memory_.resize(LocklessQueueMemorySize(config_) / sizeof(uint64_t));
376
377 Reset();
378 }
379};
380
381// Ensure we always return OK or MESSAGES_SENT_TOO_FAST under an extreme load
382// on the Sender Queue.
383TEST_F(LocklessQueueTestTooFast, MessagesSentTooFast) {
384 PinForTest pin_cpu;
385 uint64_t kNumMessages = 1000000;
386 QueueRacer racer(queue(),
387 {FLAGS_thread_count,
388 kNumMessages,
389 {LocklessQueueSender::Result::GOOD,
390 LocklessQueueSender::Result::MESSAGES_SENT_TOO_FAST},
391 std::chrono::milliseconds(500),
392 false});
393
394 EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
395}
396
397// // Send enough messages to wrap the 32 bit send counter.
Austin Schuh20b2b082019-09-11 20:42:56 -0700398TEST_F(LocklessQueueTest, WrappedSend) {
Brian Silverman0eaa1da2020-08-12 20:03:52 -0700399 PinForTest pin_cpu;
Austin Schuh20b2b082019-09-11 20:42:56 -0700400 uint64_t kNumMessages = 0x100010000ul;
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700401 QueueRacer racer(queue(), 1, kNumMessages);
Austin Schuh20b2b082019-09-11 20:42:56 -0700402
403 const monotonic_clock::time_point start_time = monotonic_clock::now();
404 EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
405 const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
406 double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
407 monotonic_now - start_time)
408 .count();
409 printf("Took %f seconds to write %" PRIu64 " messages, %f messages/s\n",
410 elapsed_seconds, kNumMessages,
411 static_cast<double>(kNumMessages) / elapsed_seconds);
412}
413
414} // namespace testing
415} // namespace ipc_lib
416} // namespace aos