#include "aos/ipc_lib/lockless_queue.h"

#include <sched.h>
#include <stdio.h>
#include <string.h>
#include <sys/signalfd.h>

#include <algorithm>
#include <chrono>
#include <cinttypes>
#include <compare>
#include <csignal>
#include <new>
#include <ostream>
#include <random>
#include <ratio>
#include <thread>
#include <type_traits>

#include "absl/flags/flag.h"
#include "gtest/gtest.h"

#include "aos/events/epoll.h"
#include "aos/ipc_lib/event.h"
#include "aos/ipc_lib/lockless_queue_memory.h"
#include "aos/ipc_lib/lockless_queue_stepping.h"
#include "aos/ipc_lib/queue_racer.h"
#include "aos/ipc_lib/signalfd.h"
#include "aos/realtime.h"
#include "aos/util/phased_loop.h"

ABSL_FLAG(int32_t, min_iterations, 100,
          "Minimum number of stress test iterations to run");
ABSL_FLAG(int32_t, duration, 5, "Number of seconds to test for");
ABSL_FLAG(int32_t, print_rate, 60, "Number of seconds between status prints");

// The roboRIO can only handle 10 threads before exploding.  Set the default
// for ARM to 10.
ABSL_FLAG(int32_t, thread_count,
#if defined(__ARM_EABI__)
          10,
#else
          100,
#endif
          "Number of threads to race");

namespace aos::ipc_lib::testing {

namespace chrono = ::std::chrono;

class LocklessQueueTest : public ::testing::Test {
 public:
  static constexpr monotonic_clock::duration kChannelStorageDuration =
      std::chrono::milliseconds(500);

  LocklessQueueTest() {
    config_.num_watchers = 10;
    config_.num_senders = 100;
    config_.num_pinners = 5;
    config_.queue_size = 10000;
    // Use an odd message size to exercise the alignment code; a size like this
    // would throw off alignment if it were not handled explicitly.
    config_.message_data_size = 101;

    // Since our backing store is an array of uint64_t for alignment purposes,
    // normalize by the size.
    memory_.resize(LocklessQueueMemorySize(config_) / sizeof(uint64_t));

    Reset();
  }

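  // Constructs a LocklessQueue wrapper over the test's backing memory.  The
  // same buffer is passed for both memory arguments, since the test only has
  // the one copy of the shared state.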
  LocklessQueue queue() {
    return LocklessQueue(reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
                         reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
                         config_);
  }

  void Reset() { memset(&memory_[0], 0, LocklessQueueMemorySize(config_)); }

  // Runs until the signal is received.
  void RunUntilWakeup(Event *ready, int priority) {
    internal::EPoll epoll;
    SignalFd signalfd({kWakeupSignal});

    epoll.OnReadable(signalfd.fd(), [&signalfd, &epoll]() {
      signalfd_siginfo result = signalfd.Read();

      fprintf(stderr, "Got signal: %d\n", result.ssi_signo);
      epoll.Quit();
    });

    {
      // Register to be woken up *after* the signalfd is catching the signals.
      LocklessQueueWatcher watcher =
          LocklessQueueWatcher::Make(queue(), priority).value();

      // And signal we are now ready.
      ready->Set();

      epoll.Run();

      // Cleanup, ensuring the watcher is destroyed before the signalfd.
    }
    epoll.DeleteFd(signalfd.fd());
  }

  // Use a type with enough alignment that we are guaranteed that everything
  // will be aligned properly on the target platform.
  ::std::vector<uint64_t> memory_;

  LocklessQueueConfiguration config_;
};

// Tests that wakeup doesn't do anything if nothing was registered.
TEST_F(LocklessQueueTest, NoWatcherWakeup) {
  LocklessQueueWakeUpper wake_upper(queue());

  EXPECT_EQ(wake_upper.Wakeup(7), 0);
}

// Tests that wakeup doesn't do anything if a wakeup was registered and then
// unregistered.
TEST_F(LocklessQueueTest, UnregisteredWatcherWakeup) {
  LocklessQueueWakeUpper wake_upper(queue());

  { LocklessQueueWatcher::Make(queue(), 5).value(); }

  EXPECT_EQ(wake_upper.Wakeup(7), 0);
}

// Tests that wakeup doesn't do anything if the thread dies.
TEST_F(LocklessQueueTest, DiedWatcherWakeup) {
  LocklessQueueWakeUpper wake_upper(queue());

  ::std::thread([this]() {
    // Use placement new so the destructor doesn't get run.
    ::std::aligned_storage<sizeof(LocklessQueueWatcher),
                           alignof(LocklessQueueWatcher)>::type data;
    new (&data)
        LocklessQueueWatcher(LocklessQueueWatcher::Make(queue(), 5).value());
  }).join();

  EXPECT_EQ(wake_upper.Wakeup(7), 0);
}

struct WatcherState {
  ::std::thread t;
  Event ready;
};

// Tests that registering too many watchers fails as expected.
TEST_F(LocklessQueueTest, TooManyWatchers) {
  // This is going to be a barrel of monkeys.
  // We need to spin up a bunch of watchers.  But, they all need to be in
  // different threads so they have different tids.
  ::std::vector<WatcherState> queues;
  // Reserve num_watchers WatcherState objects so the pointer value doesn't
  // change out from under us below.
  queues.reserve(config_.num_watchers);

  // Event used to trigger all the threads to unregister.
  Event cleanup;

  // Start all the threads.
  for (size_t i = 0; i < config_.num_watchers; ++i) {
    queues.emplace_back();

    WatcherState *s = &queues.back();
    queues.back().t = ::std::thread([this, &cleanup, s]() {
      LocklessQueueWatcher q = LocklessQueueWatcher::Make(queue(), 0).value();

      // Signal that this thread is ready.
      s->ready.Set();

      // And wait until we are asked to shut down.
      cleanup.Wait();
    });
  }

  // Wait until all the threads are actually going.
  for (WatcherState &w : queues) {
    w.ready.Wait();
  }

  // Now try to allocate another one.  This will fail.
  EXPECT_FALSE(LocklessQueueWatcher::Make(queue(), 0));

  // Trigger the threads to clean up their resources, and wait until they are
  // done.
  cleanup.Set();
  for (WatcherState &w : queues) {
    w.t.join();
  }

  // We should now be able to allocate a wakeup.
  EXPECT_TRUE(LocklessQueueWatcher::Make(queue(), 0));
}

// Tests that creating too many senders fails as expected.
TEST_F(LocklessQueueTest, TooManySenders) {
  ::std::vector<LocklessQueueSender> senders;
  for (size_t i = 0; i < config_.num_senders; ++i) {
    senders.emplace_back(
        LocklessQueueSender::Make(queue(), kChannelStorageDuration).value());
  }
  EXPECT_FALSE(LocklessQueueSender::Make(queue(), kChannelStorageDuration));
}

// Now, start 2 threads and have them receive the signals.
TEST_F(LocklessQueueTest, WakeUpThreads) {
  // Confirm that the wakeup signal is in range.
  EXPECT_LE(kWakeupSignal, SIGRTMAX);
  EXPECT_GE(kWakeupSignal, SIGRTMIN);

  LocklessQueueWakeUpper wake_upper(queue());

  // Events used to make sure the threads are ready before the test starts.
  Event ready1;
  Event ready2;

  // Start the threads.
  ::std::thread t1([this, &ready1]() { RunUntilWakeup(&ready1, 2); });
  ::std::thread t2([this, &ready2]() { RunUntilWakeup(&ready2, 1); });

  ready1.Wait();
  ready2.Wait();

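  // With both watcher threads registered, Wakeup() should report that it woke
  // 2 watchers.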
  EXPECT_EQ(wake_upper.Wakeup(3), 2);

  t1.join();
  t2.join();

  // Clean up afterwards.  We are pretending to be RT when we are really not,
  // so we will be PI boosted up.
  UnsetCurrentThreadRealtimePriority();
}

// Do a simple send test.
TEST_F(LocklessQueueTest, Send) {
  LocklessQueueSender sender =
      LocklessQueueSender::Make(queue(), kChannelStorageDuration).value();
  LocklessQueueReader reader(queue());

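  // Pace the sends so a full queue's worth of messages spans
  // kChannelStorageDuration, which keeps this test clear of the sender's rate
  // limiting (see the MessagesSentTooFast test below).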
  time::PhasedLoop loop(kChannelStorageDuration / (config_.queue_size - 1),
                        monotonic_clock::now());
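  // Accept every message; this test does not exercise the conditional-read
  // path.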
  std::function<bool(const Context &)> should_read = [](const Context &) {
    return true;
  };

  // Send enough messages to wrap.
  for (int i = 0; i < 2 * static_cast<int>(config_.queue_size); ++i) {
    // Confirm that the queue index makes sense given the number of sends.
    EXPECT_EQ(reader.LatestIndex().index(),
              i == 0 ? QueueIndex::Invalid().index() : i - 1);

    // Send a trivial piece of data.
    char data[100];
    size_t s = snprintf(data, sizeof(data), "foobar%d", i);
    ASSERT_EQ(sender.Send(data, s, monotonic_clock::min_time,
                          realtime_clock::min_time, monotonic_clock::min_time,
                          0xffffffffu, UUID::Zero(), nullptr, nullptr, nullptr),
              LocklessQueueSender::Result::GOOD);

    // Confirm that the queue index still makes sense.  This is easier since
    // the empty case has been handled.
    EXPECT_EQ(reader.LatestIndex().index(), i);

    // Read a result from 5 in the past.
    monotonic_clock::time_point monotonic_sent_time;
    realtime_clock::time_point realtime_sent_time;
    monotonic_clock::time_point monotonic_remote_time;
    monotonic_clock::time_point monotonic_remote_transmit_time;
    realtime_clock::time_point realtime_remote_time;
    uint32_t remote_queue_index;
    UUID source_boot_uuid;
    char read_data[1024];
    size_t length;

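    // Compute the queue index 5 messages back.  Early in the test that offset
    // would go negative, so wrap the index backwards instead of incrementing.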
    QueueIndex index = QueueIndex::Zero(config_.queue_size);
    if (i - 5 < 0) {
      index = index.DecrementBy(5 - i);
    } else {
      index = index.IncrementBy(i - 5);
    }
    LocklessQueueReader::Result read_result = reader.Read(
        index.index(), &monotonic_sent_time, &realtime_sent_time,
        &monotonic_remote_time, &monotonic_remote_transmit_time,
        &realtime_remote_time, &remote_queue_index, &source_boot_uuid, &length,
        &(read_data[0]), std::ref(should_read));

    // This should either return GOOD, or TOO_OLD if it is before the start of
    // the queue.
    if (read_result != LocklessQueueReader::Result::GOOD) {
      ASSERT_EQ(read_result, LocklessQueueReader::Result::TOO_OLD);
    }

    loop.SleepUntilNext();
  }
}

// Races a bunch of sending threads to see if it all works.
TEST_F(LocklessQueueTest, SendRace) {
  const size_t kNumMessages = 10000 / absl::GetFlag(FLAGS_thread_count);

  ::std::mt19937 generator(0);
  ::std::uniform_int_distribution<> write_wrap_count_distribution(0, 10);
  ::std::bernoulli_distribution race_reads_distribution;
  ::std::bernoulli_distribution set_should_read_distribution;
  ::std::bernoulli_distribution should_read_result_distribution;
  ::std::bernoulli_distribution wrap_writes_distribution;

  const chrono::seconds print_frequency(absl::GetFlag(FLAGS_print_rate));

  QueueRacer racer(queue(), absl::GetFlag(FLAGS_thread_count), kNumMessages);
  const monotonic_clock::time_point start_time = monotonic_clock::now();
  const monotonic_clock::time_point end_time =
      start_time + chrono::seconds(absl::GetFlag(FLAGS_duration));

  monotonic_clock::time_point monotonic_now = start_time;
  monotonic_clock::time_point next_print_time = start_time + print_frequency;
  uint64_t messages = 0;
  for (int i = 0;
       i < absl::GetFlag(FLAGS_min_iterations) || monotonic_now < end_time;
       ++i) {
    const bool race_reads = race_reads_distribution(generator);
    const bool set_should_read = set_should_read_distribution(generator);
    const bool should_read_result = should_read_result_distribution(generator);
    int write_wrap_count = write_wrap_count_distribution(generator);
    if (!wrap_writes_distribution(generator)) {
      write_wrap_count = 0;
    }
    EXPECT_NO_FATAL_FAILURE(racer.RunIteration(
        race_reads, write_wrap_count, set_should_read, should_read_result))
        << ": Running with race_reads: " << race_reads
        << ", and write_wrap_count " << write_wrap_count << " and on iteration "
        << i;

    messages += racer.CurrentIndex();

    monotonic_now = monotonic_clock::now();
    if (monotonic_now > next_print_time) {
      double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
                                   monotonic_now - start_time)
                                   .count();
      printf("Finished iteration %d, %f iterations/sec, %f messages/second\n",
             i, i / elapsed_seconds,
             static_cast<double>(messages) / elapsed_seconds);
      next_print_time = monotonic_now + print_frequency;
    }
  }
}

namespace {

// Temporarily pins the current thread to the first 2 available CPUs.
// This speeds up the test on some machines a lot (~4x).  It also preserves
// opportunities for the 2 threads to race each other.
class PinForTest {
 public:
  PinForTest() {
    cpu_set_t cpus = GetCurrentThreadAffinity();
    old_ = cpus;
    int number_found = 0;
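    // Keep the first two CPUs already present in our affinity mask and clear
    // everything else.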
    for (int i = 0; i < CPU_SETSIZE; ++i) {
      if (CPU_ISSET(i, &cpus)) {
        if (number_found < 2) {
          ++number_found;
        } else {
          CPU_CLR(i, &cpus);
        }
      }
    }
    SetCurrentThreadAffinity(cpus);
  }
  ~PinForTest() { SetCurrentThreadAffinity(old_); }

 private:
  cpu_set_t old_;
};

}  // namespace

class LocklessQueueTestTooFast : public LocklessQueueTest {
 public:
  LocklessQueueTestTooFast() {
    // Force a scenario where senders get rate limited.
    config_.num_watchers = 1000;
    config_.num_senders = 100;
    config_.num_pinners = 5;
    config_.queue_size = 100;
    // Use an odd message size to exercise the alignment code; a size like this
    // would throw off alignment if it were not handled explicitly.
    config_.message_data_size = 101;

    // Since our backing store is an array of uint64_t for alignment purposes,
    // normalize by the size.
    memory_.resize(LocklessQueueMemorySize(config_) / sizeof(uint64_t));

    Reset();
  }
};

// Ensure we always return GOOD or MESSAGES_SENT_TOO_FAST under an extreme
// load on the sender queue.
TEST_F(LocklessQueueTestTooFast, MessagesSentTooFast) {
  PinForTest pin_cpu;
  uint64_t kNumMessages = 1000000;
  QueueRacer racer(queue(),
                   {absl::GetFlag(FLAGS_thread_count),
                    kNumMessages,
                    {LocklessQueueSender::Result::GOOD,
                     LocklessQueueSender::Result::MESSAGES_SENT_TOO_FAST},
                    std::chrono::milliseconds(500),
                    false});

  EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0, true, true));
}

// Send enough messages to wrap the 32 bit send counter.
TEST_F(LocklessQueueTest, WrappedSend) {
  PinForTest pin_cpu;
  uint64_t kNumMessages = 0x100010000ul;
  QueueRacer racer(queue(), 1, kNumMessages);

  const monotonic_clock::time_point start_time = monotonic_clock::now();
  EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0, false, true));
  const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
  double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
                               monotonic_now - start_time)
                               .count();
  printf("Took %f seconds to write %" PRIu64 " messages, %f messages/s\n",
         elapsed_seconds, kNumMessages,
         static_cast<double>(kNumMessages) / elapsed_seconds);
}

#if defined(SUPPORTS_SHM_ROBUSTNESS_TEST)

// Verifies that LatestIndex points to the same message as the logic from
// "FetchNext", which increments the index until it gets "NOTHING_NEW" back.
// This is so we can confirm fetchers and watchers all see the same message at
// the same point in time.
int VerifyMessages(LocklessQueue *queue, LocklessQueueMemory *memory) {
  LocklessQueueReader reader(*queue);

  const ipc_lib::QueueIndex queue_index = reader.LatestIndex();
  if (!queue_index.valid()) {
    return 0;
  }

  // Now loop through the queue and make sure the number in the snprintf
  // increments.
  char last_data = '0';
  int i = 0;

  // Callback which isn't set so we don't exercise the conditional reading code.
  std::function<bool(const Context &)> should_read_callback;

  // Now, read as far as we can until we get NOTHING_NEW.  This simulates
  // FetchNext.
  while (true) {
    monotonic_clock::time_point monotonic_sent_time;
    realtime_clock::time_point realtime_sent_time;
    monotonic_clock::time_point monotonic_remote_time;
    monotonic_clock::time_point monotonic_remote_transmit_time;
    realtime_clock::time_point realtime_remote_time;
    uint32_t remote_queue_index;
    UUID source_boot_uuid;
    char read_data[1024];
    size_t length;

    LocklessQueueReader::Result read_result = reader.Read(
        i, &monotonic_sent_time, &realtime_sent_time, &monotonic_remote_time,
        &monotonic_remote_transmit_time, &realtime_remote_time,
        &remote_queue_index, &source_boot_uuid, &length, &(read_data[0]),
        should_read_callback);

    if (read_result != LocklessQueueReader::Result::GOOD) {
      if (read_result == LocklessQueueReader::Result::TOO_OLD) {
        ++i;
        continue;
      }
      CHECK(read_result == LocklessQueueReader::Result::NOTHING_NEW)
          << ": " << static_cast<int>(read_result);
      break;
    }

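    // Each payload is "foobar<N>" and ends up at the tail of the copied data
    // region, so index back from the end of the buffer to pick out the digit
    // and confirm it increases with each message.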
    EXPECT_GT(read_data[LocklessQueueMessageDataSize(memory) - length + 6],
              last_data)
        << ": Got " << read_data << " for " << i;
    last_data = read_data[LocklessQueueMessageDataSize(memory) - length + 6];

    ++i;
  }

  // The latest queue index should match the fetched queue index.
  if (i == 0) {
    EXPECT_FALSE(queue_index.valid());
  } else {
    EXPECT_EQ(queue_index.index(), i - 1);
  }
  return i;
}

// Tests that at all points in the publish step, fetch == fetch next.  This
// means that there is an atomic point at which the message is viewed as
// visible to consumers.  Do this by killing the writer after each change to
// shared memory, and confirming fetch == fetch next each time.
TEST_F(LocklessQueueTest, FetchEqFetchNext) {
  SharedTid tid;

  // Make a small queue so it is easier to debug.
  LocklessQueueConfiguration config;
  config.num_watchers = 1;
  config.num_senders = 2;
  config.num_pinners = 0;
  config.queue_size = 3;
  config.message_data_size = 32;

  TestShmRobustness(
      config,
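      // Setup callback: initialize the queue in the shared memory and record
      // the writer's tid so the check below can pretend it died.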
      [config, &tid](void *memory) {
        // Initialize the queue.
        LocklessQueue(
            reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
            reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
            config)
            .Initialize();
        tid.Set();
      },
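      // Writer callback: send a handful of messages.  The writer gets killed
      // partway through its shared-memory writes, over and over.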
      [config](void *memory) {
        LocklessQueue queue(
            reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
            reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
            config);
        // Now try to write some messages.  We will get killed a bunch as this
        // tries to happen.
        LocklessQueueSender sender =
            LocklessQueueSender::Make(queue, chrono::nanoseconds(1)).value();
        for (int i = 0; i < 5; ++i) {
          char data[100];
          size_t s = snprintf(data, sizeof(data), "foobar%d", i + 1);
          ASSERT_EQ(
              sender.Send(data, s + 1, monotonic_clock::min_time,
                          realtime_clock::min_time, monotonic_clock::min_time,
                          0xffffffffl, UUID::Zero(), nullptr, nullptr, nullptr),
              LocklessQueueSender::Result::GOOD);
        }
      },
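      // Check callback: after each induced death, mark the writer as dead,
      // recover the queue, and confirm readers still see a consistent view.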
      [config, &tid](void *raw_memory) {
        ::aos::ipc_lib::LocklessQueueMemory *const memory =
            reinterpret_cast<::aos::ipc_lib::LocklessQueueMemory *>(raw_memory);
        LocklessQueue queue(memory, memory, config);
        PretendThatOwnerIsDeadForTesting(&memory->queue_setup_lock, tid.Get());

        if (VLOG_IS_ON(1)) {
          PrintLocklessQueueMemory(memory);
        }

        const int i = VerifyMessages(&queue, memory);

        LocklessQueueSender sender =
            LocklessQueueSender::Make(queue, chrono::nanoseconds(1)).value();
        {
          char data[100];
          size_t s = snprintf(data, sizeof(data), "foobar%d", i + 1);
          ASSERT_EQ(
              sender.Send(data, s + 1, monotonic_clock::min_time,
                          realtime_clock::min_time, monotonic_clock::min_time,
                          0xffffffffl, UUID::Zero(), nullptr, nullptr, nullptr),
              LocklessQueueSender::Result::GOOD);
        }

        // Now, make sure we can send 1 message and receive it to confirm we
        // haven't corrupted next_queue_index irrevocably.
        const int newi = VerifyMessages(&queue, memory);
        EXPECT_EQ(newi, i + 1);
      });
}

#endif

}  // namespace aos::ipc_lib::testing