blob: bfd991620c923a7100bf291fe0465beedcf3e25d [file] [log] [blame]
Austin Schuh20b2b082019-09-11 20:42:56 -07001#include "aos/ipc_lib/lockless_queue.h"
2
Austin Schuhfaec51a2023-09-08 17:43:32 -07003#include <sys/mman.h>
Austin Schuh20b2b082019-09-11 20:42:56 -07004#include <unistd.h>
5#include <wait.h>
Brian Silverman7b266d92021-02-17 21:24:02 -08006
Austin Schuh20b2b082019-09-11 20:42:56 -07007#include <chrono>
Tyler Chatowbf0609c2021-07-31 16:13:27 -07008#include <cinttypes>
9#include <csignal>
Austin Schuh20b2b082019-09-11 20:42:56 -070010#include <memory>
11#include <random>
12#include <thread>
13
Philipp Schrader790cb542023-07-05 21:06:52 -070014#include "gflags/gflags.h"
15#include "gtest/gtest.h"
16
Austin Schuh20b2b082019-09-11 20:42:56 -070017#include "aos/events/epoll.h"
Austin Schuh20b2b082019-09-11 20:42:56 -070018#include "aos/ipc_lib/aos_sync.h"
Brian Silverman7b266d92021-02-17 21:24:02 -080019#include "aos/ipc_lib/event.h"
Austin Schuhfaec51a2023-09-08 17:43:32 -070020#include "aos/ipc_lib/lockless_queue_memory.h"
21#include "aos/ipc_lib/lockless_queue_stepping.h"
Austin Schuh20b2b082019-09-11 20:42:56 -070022#include "aos/ipc_lib/queue_racer.h"
23#include "aos/ipc_lib/signalfd.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070024#include "aos/realtime.h"
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -070025#include "aos/util/phased_loop.h"
Austin Schuh20b2b082019-09-11 20:42:56 -070026
// Command-line knobs for the stress tests below (SendRace and friends).
DEFINE_int32(min_iterations, 100,
             "Minimum number of stress test iterations to run");
DEFINE_int32(duration, 5, "Number of seconds to test for");
DEFINE_int32(print_rate, 60, "Number of seconds between status prints");

// The roboRIO can only handle 10 threads before exploding.  Set the default for
// ARM to 10.
DEFINE_int32(thread_count,
#if defined(__ARM_EABI__)
             10,
#else
             100,
#endif
             "Number of threads to race");
41
Stephan Pleinesf63bde82024-01-13 15:59:33 -080042namespace aos::ipc_lib::testing {
Austin Schuh20b2b082019-09-11 20:42:56 -070043
44namespace chrono = ::std::chrono;
45
// Common fixture: owns a zeroed chunk of "shared" memory sized for config_ and
// hands out LocklessQueue views into it via queue().
class LocklessQueueTest : public ::testing::Test {
 public:
  // How long senders may keep a message slot before being rate limited.
  static constexpr monotonic_clock::duration kChannelStorageDuration =
      std::chrono::milliseconds(500);

  LocklessQueueTest() {
    config_.num_watchers = 10;
    config_.num_senders = 100;
    config_.num_pinners = 5;
    config_.queue_size = 10000;
    // Exercise the alignment code.  This would throw off alignment.
    config_.message_data_size = 101;

    // Since our backing store is an array of uint64_t for alignment purposes,
    // normalize by the size.
    memory_.resize(LocklessQueueMemorySize(config_) / sizeof(uint64_t));

    Reset();
  }

  // Returns a fresh queue view over the fixture's backing memory.  The same
  // buffer is passed for both memory arguments.
  LocklessQueue queue() {
    return LocklessQueue(reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
                         reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
                         config_);
  }

  // Zeroes the backing memory so each test starts from an empty queue.
  void Reset() { memset(&memory_[0], 0, LocklessQueueMemorySize(config_)); }

  // Registers a watcher at the given priority, signals *ready, and runs an
  // epoll loop until kWakeupSignal is received through a signalfd.
  void RunUntilWakeup(Event *ready, int priority) {
    internal::EPoll epoll;
    SignalFd signalfd({kWakeupSignal});

    epoll.OnReadable(signalfd.fd(), [&signalfd, &epoll]() {
      signalfd_siginfo result = signalfd.Read();

      fprintf(stderr, "Got signal: %d\n", result.ssi_signo);
      epoll.Quit();
    });

    {
      // Register to be woken up *after* the signalfd is catching the signals.
      LocklessQueueWatcher watcher =
          LocklessQueueWatcher::Make(queue(), priority).value();

      // And signal we are now ready.
      ready->Set();

      epoll.Run();

      // Cleanup, ensuring the watcher is destroyed before the signalfd.
    }
    epoll.DeleteFd(signalfd.fd());
  }

  // Use a type with enough alignment that we are guaranteed that everything
  // will be aligned properly on the target platform.
  ::std::vector<uint64_t> memory_;

  LocklessQueueConfiguration config_;
};
107
Austin Schuh20b2b082019-09-11 20:42:56 -0700108// Tests that wakeup doesn't do anything if nothing was registered.
109TEST_F(LocklessQueueTest, NoWatcherWakeup) {
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700110 LocklessQueueWakeUpper wake_upper(queue());
Austin Schuh20b2b082019-09-11 20:42:56 -0700111
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700112 EXPECT_EQ(wake_upper.Wakeup(7), 0);
Austin Schuh20b2b082019-09-11 20:42:56 -0700113}
114
115// Tests that wakeup doesn't do anything if a wakeup was registered and then
116// unregistered.
117TEST_F(LocklessQueueTest, UnregisteredWatcherWakeup) {
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700118 LocklessQueueWakeUpper wake_upper(queue());
Austin Schuh20b2b082019-09-11 20:42:56 -0700119
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700120 { LocklessQueueWatcher::Make(queue(), 5).value(); }
Austin Schuh20b2b082019-09-11 20:42:56 -0700121
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700122 EXPECT_EQ(wake_upper.Wakeup(7), 0);
Austin Schuh20b2b082019-09-11 20:42:56 -0700123}
124
// Tests that wakeup doesn't do anything if the watching thread dies without
// cleaning up.  The queue must detect the dead tid and skip it.
TEST_F(LocklessQueueTest, DiedWatcherWakeup) {
  LocklessQueueWakeUpper wake_upper(queue());

  ::std::thread([this]() {
    // Use placement new so the destructor doesn't get run.  This leaves the
    // watcher "registered" when the thread exits, simulating a crash.
    ::std::aligned_storage<sizeof(LocklessQueueWatcher),
                           alignof(LocklessQueueWatcher)>::type data;
    new (&data)
        LocklessQueueWatcher(LocklessQueueWatcher::Make(queue(), 5).value());
  }).join();

  // The registered tid is dead, so the wakeup should reach zero threads.
  EXPECT_EQ(wake_upper.Wakeup(7), 0);
}
139
// Per-thread bookkeeping for TooManyWatchers: the watcher thread itself and an
// event it sets once its watcher registration is live.
struct WatcherState {
  ::std::thread t;
  Event ready;
};
144
// Tests that too many watchers fails like expected.
TEST_F(LocklessQueueTest, TooManyWatchers) {
  // This is going to be a barrel of monkeys.
  // We need to spin up a bunch of watchers.  But, they all need to be in
  // different threads so they have different tids.
  ::std::vector<WatcherState> queues;
  // Reserve num_watchers WatcherState objects so the pointer value doesn't
  // change out from under us below.
  queues.reserve(config_.num_watchers);

  // Event used to trigger all the threads to unregister.
  Event cleanup;

  // Start all the threads.
  for (size_t i = 0; i < config_.num_watchers; ++i) {
    queues.emplace_back();

    WatcherState *s = &queues.back();
    queues.back().t = ::std::thread([this, &cleanup, s]() {
      // Holding q keeps this thread's watcher slot registered until cleanup.
      LocklessQueueWatcher q = LocklessQueueWatcher::Make(queue(), 0).value();

      // Signal that this thread is ready.
      s->ready.Set();

      // And wait until we are asked to shut down.
      cleanup.Wait();
    });
  }

  // Wait until all the threads are actually going.
  for (WatcherState &w : queues) {
    w.ready.Wait();
  }

  // Now try to allocate another one.  This will fail since every slot is taken.
  EXPECT_FALSE(LocklessQueueWatcher::Make(queue(), 0));

  // Trigger the threads to cleanup their resources, and wait until they are
  // done.
  cleanup.Set();
  for (WatcherState &w : queues) {
    w.t.join();
  }

  // We should now be able to allocate a wakeup.
  EXPECT_TRUE(LocklessQueueWatcher::Make(queue(), 0));
}
192
193// Tests that too many watchers dies like expected.
Austin Schuhe516ab02020-05-06 21:37:04 -0700194TEST_F(LocklessQueueTest, TooManySenders) {
Brian Silvermanfc0d2e82020-08-12 19:58:35 -0700195 ::std::vector<LocklessQueueSender> senders;
Austin Schuhe516ab02020-05-06 21:37:04 -0700196 for (size_t i = 0; i < config_.num_senders; ++i) {
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700197 senders.emplace_back(
198 LocklessQueueSender::Make(queue(), kChannelStorageDuration).value());
Austin Schuhe516ab02020-05-06 21:37:04 -0700199 }
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700200 EXPECT_FALSE(LocklessQueueSender::Make(queue(), kChannelStorageDuration));
Austin Schuh20b2b082019-09-11 20:42:56 -0700201}
202
// Now, start 2 threads and have them receive the signals.
TEST_F(LocklessQueueTest, WakeUpThreads) {
  // Confirm that the wakeup signal is in the real-time signal range.
  EXPECT_LE(kWakeupSignal, SIGRTMAX);
  EXPECT_GE(kWakeupSignal, SIGRTMIN);

  LocklessQueueWakeUpper wake_upper(queue());

  // Events used to make sure each thread is ready before the test starts.
  Event ready1;
  Event ready2;

  // Start the threads at two different watcher priorities.
  ::std::thread t1([this, &ready1]() { RunUntilWakeup(&ready1, 2); });
  ::std::thread t2([this, &ready2]() { RunUntilWakeup(&ready2, 1); });

  ready1.Wait();
  ready2.Wait();

  // Both registered watchers should be woken.
  EXPECT_EQ(wake_upper.Wakeup(3), 2);

  t1.join();
  t2.join();

  // Clean up afterwards.  We are pretending to be RT when we are really not.
  // So we will be PI boosted up.
  UnsetCurrentThreadRealtimePriority();
}
231
// Do a simple send test: write enough messages to wrap the queue, checking the
// latest index after every send and reading a message from 5 sends ago.
TEST_F(LocklessQueueTest, Send) {
  LocklessQueueSender sender =
      LocklessQueueSender::Make(queue(), kChannelStorageDuration).value();
  LocklessQueueReader reader(queue());

  // Pace sends so we never exceed the storage-duration rate limit.
  time::PhasedLoop loop(kChannelStorageDuration / (config_.queue_size - 1),
                        monotonic_clock::now());
  // Accept every message when reading.
  std::function<bool(const Context &)> should_read = [](const Context &) {
    return true;
  };

  // Send enough messages to wrap.
  for (int i = 0; i < 2 * static_cast<int>(config_.queue_size); ++i) {
    // Confirm that the queue index makes sense given the number of sends.
    EXPECT_EQ(reader.LatestIndex().index(),
              i == 0 ? QueueIndex::Invalid().index() : i - 1);

    // Send a trivial piece of data.
    char data[100];
    size_t s = snprintf(data, sizeof(data), "foobar%d", i);
    ASSERT_EQ(sender.Send(data, s, monotonic_clock::min_time,
                          realtime_clock::min_time, 0xffffffffu, UUID::Zero(),
                          nullptr, nullptr, nullptr),
              LocklessQueueSender::Result::GOOD);

    // Confirm that the queue index still makes sense.  This is easier since the
    // empty case has been handled.
    EXPECT_EQ(reader.LatestIndex().index(), i);

    // Read a result from 5 in the past.
    monotonic_clock::time_point monotonic_sent_time;
    realtime_clock::time_point realtime_sent_time;
    monotonic_clock::time_point monotonic_remote_time;
    realtime_clock::time_point realtime_remote_time;
    uint32_t remote_queue_index;
    UUID source_boot_uuid;
    char read_data[1024];
    size_t length;

    // Build the index 5 sends back, clamping below zero by decrementing.
    QueueIndex index = QueueIndex::Zero(config_.queue_size);
    if (i - 5 < 0) {
      index = index.DecrementBy(5 - i);
    } else {
      index = index.IncrementBy(i - 5);
    }
    LocklessQueueReader::Result read_result = reader.Read(
        index.index(), &monotonic_sent_time, &realtime_sent_time,
        &monotonic_remote_time, &realtime_remote_time, &remote_queue_index,
        &source_boot_uuid, &length, &(read_data[0]), std::ref(should_read));

    // This should either return GOOD, or TOO_OLD if it is before the start of
    // the queue.
    if (read_result != LocklessQueueReader::Result::GOOD) {
      ASSERT_EQ(read_result, LocklessQueueReader::Result::TOO_OLD);
    }

    loop.SleepUntilNext();
  }
}
292
// Races a bunch of sending threads to see if it all works.
TEST_F(LocklessQueueTest, SendRace) {
  const size_t kNumMessages = 10000 / FLAGS_thread_count;

  // Fixed seed so failures reproduce; note the per-iteration draw order below
  // is part of that reproducibility.
  ::std::mt19937 generator(0);
  ::std::uniform_int_distribution<> write_wrap_count_distribution(0, 10);
  ::std::bernoulli_distribution race_reads_distribution;
  ::std::bernoulli_distribution set_should_read_distribution;
  ::std::bernoulli_distribution should_read_result_distribution;
  ::std::bernoulli_distribution wrap_writes_distribution;

  const chrono::seconds print_frequency(FLAGS_print_rate);

  QueueRacer racer(queue(), FLAGS_thread_count, kNumMessages);
  const monotonic_clock::time_point start_time = monotonic_clock::now();
  const monotonic_clock::time_point end_time =
      start_time + chrono::seconds(FLAGS_duration);

  // Run at least FLAGS_min_iterations, and keep going until the duration
  // budget is exhausted.
  monotonic_clock::time_point monotonic_now = start_time;
  monotonic_clock::time_point next_print_time = start_time + print_frequency;
  uint64_t messages = 0;
  for (int i = 0; i < FLAGS_min_iterations || monotonic_now < end_time; ++i) {
    const bool race_reads = race_reads_distribution(generator);
    const bool set_should_read = set_should_read_distribution(generator);
    const bool should_read_result = should_read_result_distribution(generator);
    int write_wrap_count = write_wrap_count_distribution(generator);
    if (!wrap_writes_distribution(generator)) {
      write_wrap_count = 0;
    }
    EXPECT_NO_FATAL_FAILURE(racer.RunIteration(
        race_reads, write_wrap_count, set_should_read, should_read_result))
        << ": Running with race_reads: " << race_reads
        << ", and write_wrap_count " << write_wrap_count << " and on iteration "
        << i;

    messages += racer.CurrentIndex();

    // Periodic progress report.
    monotonic_now = monotonic_clock::now();
    if (monotonic_now > next_print_time) {
      double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
                                   monotonic_now - start_time)
                                   .count();
      printf("Finished iteration %d, %f iterations/sec, %f messages/second\n",
             i, i / elapsed_seconds,
             static_cast<double>(messages) / elapsed_seconds);
      next_print_time = monotonic_now + print_frequency;
    }
  }
}
342
namespace {

// Temporarily pins the current thread to the first 2 available CPUs.
// This speeds up the test on some machines a lot (~4x).  It also preserves
// opportunities for the 2 threads to race each other.  RAII: the previous
// affinity mask is restored on destruction.
class PinForTest {
 public:
  PinForTest() {
    cpu_set_t cpus = GetCurrentThreadAffinity();
    old_ = cpus;
    // Clear every allowed CPU after the first two.
    int number_found = 0;
    for (int i = 0; i < CPU_SETSIZE; ++i) {
      if (CPU_ISSET(i, &cpus)) {
        if (number_found < 2) {
          ++number_found;
        } else {
          CPU_CLR(i, &cpus);
        }
      }
    }
    SetCurrentThreadAffinity(cpus);
  }
  ~PinForTest() { SetCurrentThreadAffinity(old_); }

 private:
  // Affinity mask to restore when we go out of scope.
  cpu_set_t old_;
};

}  // namespace
372
// Fixture variant with a much smaller queue so senders hit the
// MESSAGES_SENT_TOO_FAST rate limit under load.
class LocklessQueueTestTooFast : public LocklessQueueTest {
 public:
  LocklessQueueTestTooFast() {
    // Force a scenario where senders get rate limited
    config_.num_watchers = 1000;
    config_.num_senders = 100;
    config_.num_pinners = 5;
    config_.queue_size = 100;
    // Exercise the alignment code.  This would throw off alignment.
    config_.message_data_size = 101;

    // Since our backing store is an array of uint64_t for alignment purposes,
    // normalize by the size.
    memory_.resize(LocklessQueueMemorySize(config_) / sizeof(uint64_t));

    Reset();
  }
};
391
// Ensure we always return OK or MESSAGES_SENT_TOO_FAST under an extreme load
// on the Sender Queue.
TEST_F(LocklessQueueTestTooFast, MessagesSentTooFast) {
  PinForTest pin_cpu;
  uint64_t kNumMessages = 1000000;
  // Racer configured to accept either GOOD or MESSAGES_SENT_TOO_FAST results.
  QueueRacer racer(queue(),
                   {FLAGS_thread_count,
                    kNumMessages,
                    {LocklessQueueSender::Result::GOOD,
                     LocklessQueueSender::Result::MESSAGES_SENT_TOO_FAST},
                    std::chrono::milliseconds(500),
                    false});

  EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0, true, true));
}
407
// Send enough messages to wrap the 32 bit send counter.
TEST_F(LocklessQueueTest, WrappedSend) {
  PinForTest pin_cpu;
  // Just over 2^32 messages, so the counter wraps at least once.
  uint64_t kNumMessages = 0x100010000ul;
  QueueRacer racer(queue(), 1, kNumMessages);

  const monotonic_clock::time_point start_time = monotonic_clock::now();
  EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0, false, true));
  const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
  double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
                               monotonic_now - start_time)
                               .count();
  printf("Took %f seconds to write %" PRIu64 " messages, %f messages/s\n",
         elapsed_seconds, kNumMessages,
         static_cast<double>(kNumMessages) / elapsed_seconds);
}
424
Austin Schuhfaec51a2023-09-08 17:43:32 -0700425#if defined(SUPPORTS_SHM_ROBUSTNESS_TEST)
426
// Verifies that LatestIndex points to the same message as the logic from
// "FetchNext", which increments the index until it gets "NOTHING_NEW" back.
// This is so we can confirm fetchers and watchers all see the same message at
// the same point in time.  Returns the number of messages read.
int VerifyMessages(LocklessQueue *queue, LocklessQueueMemory *memory) {
  LocklessQueueReader reader(*queue);

  const ipc_lib::QueueIndex queue_index = reader.LatestIndex();
  if (!queue_index.valid()) {
    // Nothing has been published yet.
    return 0;
  }

  // Now loop through the queue and make sure the number in the snprintf
  // increments.
  char last_data = '0';
  int i = 0;

  // Callback which isn't set so we don't exercise the conditional reading code.
  std::function<bool(const Context &)> should_read_callback;

  // Now, read as far as we can until we get NOTHING_NEW.  This simulates
  // FetchNext.
  while (true) {
    monotonic_clock::time_point monotonic_sent_time;
    realtime_clock::time_point realtime_sent_time;
    monotonic_clock::time_point monotonic_remote_time;
    realtime_clock::time_point realtime_remote_time;
    uint32_t remote_queue_index;
    UUID source_boot_uuid;
    char read_data[1024];
    size_t length;

    LocklessQueueReader::Result read_result = reader.Read(
        i, &monotonic_sent_time, &realtime_sent_time, &monotonic_remote_time,
        &realtime_remote_time, &remote_queue_index, &source_boot_uuid, &length,
        &(read_data[0]), should_read_callback);

    if (read_result != LocklessQueueReader::Result::GOOD) {
      // Skip over messages that have already been overwritten.
      if (read_result == LocklessQueueReader::Result::TOO_OLD) {
        ++i;
        continue;
      }
      // Any other non-GOOD result besides NOTHING_NEW is a hard failure.
      CHECK(read_result == LocklessQueueReader::Result::NOTHING_NEW)
          << ": " << static_cast<int>(read_result);
      break;
    }

    // The digit after "foobar" must strictly increase message to message.
    EXPECT_GT(read_data[LocklessQueueMessageDataSize(memory) - length + 6],
              last_data)
        << ": Got " << read_data << " for " << i;
    last_data = read_data[LocklessQueueMessageDataSize(memory) - length + 6];

    ++i;
  }

  // The latest queue index should match the fetched queue index.
  if (i == 0) {
    EXPECT_FALSE(queue_index.valid());
  } else {
    EXPECT_EQ(queue_index.index(), i - 1);
  }
  return i;
}
490
// Tests that at all points in the publish step, fetch == fetch next.  This
// means that there is an atomic point at which the message is viewed as visible
// to consumers.  Do this by killing the writer after each change to shared
// memory, and confirming fetch == fetch next each time.
TEST_F(LocklessQueueTest, FetchEqFetchNext) {
  SharedTid tid;

  // Make a small queue so it is easier to debug.
  LocklessQueueConfiguration config;
  config.num_watchers = 1;
  config.num_senders = 2;
  config.num_pinners = 0;
  config.queue_size = 3;
  config.message_data_size = 32;

  // TestShmRobustness takes: a one-time setup callback, the writer callback
  // that gets killed at every shared-memory mutation, and a verifier callback
  // run after each kill.
  TestShmRobustness(
      config,
      [config, &tid](void *memory) {
        // Initialize the queue.
        LocklessQueue(
            reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
            reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
            config)
            .Initialize();
        tid.Set();
      },
      [config](void *memory) {
        LocklessQueue queue(
            reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
            reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
            config);
        // Now try to write some messages.  We will get killed a bunch as this
        // tries to happen.
        LocklessQueueSender sender =
            LocklessQueueSender::Make(queue, chrono::nanoseconds(1)).value();
        for (int i = 0; i < 5; ++i) {
          char data[100];
          size_t s = snprintf(data, sizeof(data), "foobar%d", i + 1);
          ASSERT_EQ(sender.Send(data, s + 1, monotonic_clock::min_time,
                                realtime_clock::min_time, 0xffffffffl,
                                UUID::Zero(), nullptr, nullptr, nullptr),
                    LocklessQueueSender::Result::GOOD);
        }
      },
      [config, &tid](void *raw_memory) {
        ::aos::ipc_lib::LocklessQueueMemory *const memory =
            reinterpret_cast<::aos::ipc_lib::LocklessQueueMemory *>(raw_memory);
        LocklessQueue queue(memory, memory, config);
        // The writer was killed; mark its lock owner dead so recovery runs.
        PretendThatOwnerIsDeadForTesting(&memory->queue_setup_lock, tid.Get());

        if (VLOG_IS_ON(1)) {
          PrintLocklessQueueMemory(memory);
        }

        const int i = VerifyMessages(&queue, memory);

        LocklessQueueSender sender =
            LocklessQueueSender::Make(queue, chrono::nanoseconds(1)).value();
        {
          char data[100];
          size_t s = snprintf(data, sizeof(data), "foobar%d", i + 1);
          ASSERT_EQ(sender.Send(data, s + 1, monotonic_clock::min_time,
                                realtime_clock::min_time, 0xffffffffl,
                                UUID::Zero(), nullptr, nullptr, nullptr),
                    LocklessQueueSender::Result::GOOD);
        }

        // Now, make sure we can send 1 message and receive it to confirm we
        // haven't corrupted next_queue_index irrevocably.
        const int newi = VerifyMessages(&queue, memory);
        EXPECT_EQ(newi, i + 1);
      });
}
564
565#endif
566
Stephan Pleinesf63bde82024-01-13 15:59:33 -0800567} // namespace aos::ipc_lib::testing