#include "aos/ipc_lib/queue_racer.h"

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cinttypes>
#include <cstdio>
#include <cstring>
#include <functional>
#include <limits>
#include <thread>
#include <vector>

#include "gtest/gtest.h"

#include "aos/ipc_lib/event.h"

namespace aos {
namespace ipc_lib {
namespace {

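// Payload stamped into every message: which writer thread sent it, and that
// thread's running count of messages. CheckReads() unpacks this later to
// verify per-thread ordering.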
struct ThreadPlusCount {
  uint64_t thread;
  uint64_t count;
};

}  // namespace

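// Per-writer-thread bookkeeping. event_count starts at uint64_t max as a
// sentinel meaning "no message from this thread has been seen yet".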
struct ThreadState {
  ::std::thread thread;
  Event ready;
  uint64_t event_count = ::std::numeric_limits<uint64_t>::max();
};

QueueRacer::QueueRacer(LocklessQueue queue, int num_threads,
                       uint64_t num_messages)
    : queue_(queue),
      num_threads_(num_threads),
      num_messages_(num_messages),
      channel_storage_duration_(std::chrono::nanoseconds(1)),
      expected_send_results_({LocklessQueueSender::Result::GOOD}),
      check_writes_and_reads_(true) {
  Reset();
}

QueueRacer::QueueRacer(LocklessQueue queue,
                       const QueueRacerConfiguration &config)
    : queue_(queue),
      num_threads_(config.num_threads),
      num_messages_(config.num_messages),
      channel_storage_duration_(config.channel_storage_duration),
      expected_send_results_(config.expected_send_results),
      check_writes_and_reads_(config.check_writes_and_reads) {
  Reset();
}

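// Runs one racing iteration: kicks off num_threads_ writer threads plus a
// thread that polls the latest queue index and checks it against the
// started/finished write counts, then optionally reads everything back.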
void QueueRacer::RunIteration(bool race_reads, int write_wrap_count,
                              bool set_should_read, bool should_read_result) {
  const bool will_wrap = num_messages_ * num_threads_ *
                             static_cast<uint64_t>(1 + write_wrap_count) >
                         queue_.config().queue_size;

  // Clear out shmem.
  Reset();
  started_writes_ = 0;
  finished_writes_ = 0;

  // Event used to start all the threads processing at once.
  Event run;

  ::std::atomic<bool> poll_index{true};

  // List of threads.
  ::std::vector<ThreadState> threads(num_threads_);

  ::std::thread queue_index_racer([this, &poll_index]() {
    LocklessQueueReader reader(queue_);

    // Track the number of times we wrap, and cache the modulo.
    uint64_t wrap_count = 0;
    uint32_t last_queue_index = 0;
    const uint32_t max_queue_index =
        QueueIndex::MaxIndex(0xffffffffu, queue_.config().queue_size);
    while (poll_index) {
      // We want to read everything backwards. This will give us conservative
      // bounds. And with enough time and randomness, we will see all the
      // cases we care to see.
      // These 3 numbers look at the same thing, but at different points of
      // time in the process. The process (essentially) looks like:
      //
      //   ++started_writes;
      //   ++latest_queue_index;
      //   ++finished_writes;
      //
      // We want to check that latest_queue_index is bounded by the number of
      // writes started and finished. Since latest_queue_index is an index
      // (so it reads 1 low), the invariants are
      // finished_writes <= latest_queue_index + 1 and
      // latest_queue_index + 1 <= started_writes, and everything always
      // increases. So, if we let more time elapse between sampling
      // finished_writes and latest_queue_index, we will only be relaxing our
      // bounds, not invalidating the check. The same goes for started_writes.
      //
      // So, grab them in order.
      const uint64_t finished_writes = finished_writes_.load();
      const QueueIndex latest_queue_index_queue_index = reader.LatestIndex();
      const uint64_t started_writes = started_writes_.load();

      const uint32_t latest_queue_index_uint32_t =
          latest_queue_index_queue_index.index();
      uint64_t latest_queue_index = latest_queue_index_uint32_t;

      if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
        // If we got smaller, we wrapped.
        if (latest_queue_index_uint32_t < last_queue_index) {
          ++wrap_count;
        }
        // And apply it.
        latest_queue_index +=
            static_cast<uint64_t>(max_queue_index) * wrap_count;
        last_queue_index = latest_queue_index_uint32_t;
      }
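      // For example, if max_queue_index were 1024 and the raw indices read
      // 1022, 1023, then 1, the extended indices become 1022, 1023, and
      // 1 + 1024 * 1 = 1025 (hypothetical numbers, purely illustrative).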

      // For grins, check that we have always started more than we finished.
      // Should never fail.
      EXPECT_GE(started_writes, finished_writes);

      // If we are at the beginning, the queue needs to always return empty.
      if (started_writes == 0) {
        EXPECT_EQ(latest_queue_index_queue_index, QueueIndex::Invalid());
        EXPECT_EQ(finished_writes, 0);
      } else {
        if (finished_writes == 0) {
          // Plausible to be at the beginning, in which case we don't have
          // anything to check.
          if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
            // Otherwise, we have started. The queue can't have any more
            // entries than this.
            EXPECT_GE(started_writes, latest_queue_index + 1);
          }
        } else {
          EXPECT_NE(latest_queue_index_queue_index, QueueIndex::Invalid());
          // latest_queue_index is an index, not a count. So it always reads 1
          // low.
          if (check_writes_and_reads_) {
            EXPECT_GE(latest_queue_index + 1, finished_writes);
          }
        }
      }
    }
  });

  // Build up each thread and kick it off.
  int thread_index = 0;
  for (ThreadState &t : threads) {
    if (will_wrap) {
      t.event_count = ::std::numeric_limits<uint64_t>::max();
    } else {
      t.event_count = 0;
    }
    t.thread = ::std::thread([this, &t, thread_index, &run,
                              write_wrap_count]() {
      LocklessQueueSender sender =
          LocklessQueueSender::Make(queue_, channel_storage_duration_).value();
      CHECK_GE(sender.size(), sizeof(ThreadPlusCount));

      // Signal that we are ready to start sending.
      t.ready.Set();

      // Wait until signaled to start running.
      run.Wait();

      // Gogogo!
      for (uint64_t i = 0;
           i < num_messages_ * static_cast<uint64_t>(1 + write_wrap_count);
           ++i) {
        char *const data = static_cast<char *>(sender.Data()) + sender.size() -
                           sizeof(ThreadPlusCount);
        const char fill = (i + 55) & 0xFF;
        memset(data, fill, sizeof(ThreadPlusCount));
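        // The fill pattern makes concurrent scribbling detectable: every byte
        // of the scratch region must still match `fill` below, or some other
        // sender is writing to a buffer we think we own.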
        {
          bool found_nonzero = false;
          for (size_t j = 0; j < sizeof(ThreadPlusCount); ++j) {
            if (data[j] != fill) {
              found_nonzero = true;
            }
          }
          CHECK(!found_nonzero) << ": Somebody else is writing to our buffer";
        }

        ThreadPlusCount tpc;
        tpc.thread = thread_index;
        tpc.count = i;

        memcpy(data, &tpc, sizeof(ThreadPlusCount));

        if (i % 0x800000 == 0x100000) {
          fprintf(
              stderr, "Sent %" PRIu64 ", %f %%\n", i,
              static_cast<double>(i) /
                  static_cast<double>(num_messages_ * (1 + write_wrap_count)) *
                  100.0);
        }

        ++started_writes_;
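        // The same ThreadPlusCount bytes ride along a second time as the
        // source boot UUID in the header, so CheckReads() can cross-check the
        // header copy against the payload copy.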
        auto result =
            sender.Send(sizeof(ThreadPlusCount), aos::monotonic_clock::min_time,
                        aos::realtime_clock::min_time, 0xffffffff,
                        UUID::FromSpan(absl::Span<const uint8_t>(
                            reinterpret_cast<const uint8_t *>(&tpc),
                            sizeof(ThreadPlusCount))),
                        nullptr, nullptr, nullptr);

        CHECK(std::find(expected_send_results_.begin(),
                        expected_send_results_.end(),
                        result) != expected_send_results_.end())
            << "Unexpected send result: " << result;

        // Blank out the new scratch buffer, to catch other people using it.
        {
          char *const new_data = static_cast<char *>(sender.Data()) +
                                 sender.size() - sizeof(ThreadPlusCount);
          const char new_fill = ~fill;
          memset(new_data, new_fill, sizeof(ThreadPlusCount));
        }
        ++finished_writes_;
      }
    });
    ++thread_index;
  }

  // Wait until all the threads are ready.
  for (ThreadState &t : threads) {
    t.ready.Wait();
  }

  // And start them racing.
  run.Set();

  // If we aren't racing reads, let all the threads finish before reading.
  if (!race_reads) {
    for (ThreadState &t : threads) {
      t.thread.join();
    }
    poll_index = false;
    queue_index_racer.join();
  }

  if (check_writes_and_reads_) {
    CheckReads(race_reads, write_wrap_count, &threads, set_should_read,
               should_read_result);
  }

  // Reap all the threads.
  if (race_reads) {
    for (ThreadState &t : threads) {
      t.thread.join();
    }
    poll_index = false;
    queue_index_racer.join();
  }

  if (check_writes_and_reads_) {
    // Confirm that the number of writes matches the expected number of
    // writes.
    ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
              started_writes_);
    ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
              finished_writes_);

    // And that every thread sent the right number of messages.
    for (ThreadState &t : threads) {
      if (will_wrap) {
        if (!race_reads) {
          // If we are wrapping, there is a possibility that a thread writes
          // everything *before* we can read any of it, and it all gets
          // overwritten.
          ASSERT_TRUE(t.event_count == ::std::numeric_limits<uint64_t>::max() ||
                      t.event_count == (1 + write_wrap_count) * num_messages_)
              << ": Got " << t.event_count << " events, expected "
              << (1 + write_wrap_count) * num_messages_;
        }
      } else {
        ASSERT_EQ(t.event_count, num_messages_);
      }
    }
  }
}

void QueueRacer::CheckReads(bool race_reads, int write_wrap_count,
                            ::std::vector<ThreadState> *threads,
                            bool set_should_read, bool should_read_result) {
  // Now read back the results to double check.
  LocklessQueueReader reader(queue_);
  const bool will_wrap = num_messages_ * num_threads_ * (1 + write_wrap_count) >
                         LocklessQueueSize(queue_.memory());

  monotonic_clock::time_point last_monotonic_sent_time =
      monotonic_clock::epoch();
  uint64_t initial_i = 0;
  if (will_wrap) {
    initial_i = (1 + write_wrap_count) * num_messages_ * num_threads_ -
                LocklessQueueSize(queue_.memory());
  }

  std::function<bool(const Context &)> nop;

  Context fetched_context;
  std::function<bool(const Context &)> should_read =
      [&should_read_result, &fetched_context](const Context &context) {
        fetched_context = context;
        return should_read_result;
      };

  for (uint64_t i = initial_i;
       i < (1 + write_wrap_count) * num_messages_ * num_threads_; ++i) {
    monotonic_clock::time_point monotonic_sent_time;
    realtime_clock::time_point realtime_sent_time;
    monotonic_clock::time_point monotonic_remote_time;
    realtime_clock::time_point realtime_remote_time;
    UUID source_boot_uuid;
    uint32_t remote_queue_index;
    size_t length;
    char read_data[1024];

    // Handle overflowing the message count for the wrap test.
    const uint32_t wrapped_i =
        i % static_cast<size_t>(QueueIndex::MaxIndex(
                0xffffffffu, LocklessQueueSize(queue_.memory())));
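    // For example, if MaxIndex() were 4096, logical message 5000 would map to
    // wrapped index 5000 % 4096 = 904 (hypothetical numbers).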
    LocklessQueueReader::Result read_result =
        set_should_read
            ? reader.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
                          &monotonic_remote_time, &realtime_remote_time,
                          &remote_queue_index, &source_boot_uuid, &length,
                          &(read_data[0]), std::ref(should_read))
            : reader.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
                          &monotonic_remote_time, &realtime_remote_time,
                          &remote_queue_index, &source_boot_uuid, &length,
                          &(read_data[0]), nop);

    // The code in lockless_queue.cc reads everything but data, checks that
    // the header hasn't changed, then reads the data. So, if we succeed and
    // both end up not being corrupted, then we've confirmed everything works.
    //
    // Feed in both combos of should_read and whether or not to return true or
    // false from should_read. By capturing the header values inside the
    // callback, we can also verify the state in the middle of the process to
    // make sure we have the right boundaries.
    if (race_reads) {
      if (read_result == LocklessQueueReader::Result::NOTHING_NEW) {
        --i;
        continue;
      }
    }

    if (race_reads && will_wrap) {
      if (read_result == LocklessQueueReader::Result::TOO_OLD) {
        continue;
      }
    }

    if (!set_should_read) {
      // Every message should be good.
      ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD)
          << ": i is " << i;
    } else {
      if (should_read_result) {
        ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD)
            << ": i is " << i;

        ASSERT_EQ(monotonic_sent_time, fetched_context.monotonic_event_time);
        ASSERT_EQ(realtime_sent_time, fetched_context.realtime_event_time);
        ASSERT_EQ(monotonic_remote_time, fetched_context.monotonic_remote_time);
        ASSERT_EQ(realtime_remote_time, fetched_context.realtime_remote_time);
        ASSERT_EQ(source_boot_uuid, fetched_context.source_boot_uuid);
        ASSERT_EQ(remote_queue_index, fetched_context.remote_queue_index);
        ASSERT_EQ(length, fetched_context.size);

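        // The payload is the raw ThreadPlusCount bytes at the tail of the
        // buffer, and the same bytes were sent as the source boot UUID, so
        // the two copies must match byte for byte.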
        ASSERT_EQ(
            absl::Span<const uint8_t>(
                reinterpret_cast<const uint8_t *>(
                    read_data + LocklessQueueMessageDataSize(queue_.memory()) -
                    length),
                length),
            source_boot_uuid.span());
      } else {
        ASSERT_EQ(read_result, LocklessQueueReader::Result::FILTERED);
        monotonic_sent_time = fetched_context.monotonic_event_time;
        realtime_sent_time = fetched_context.realtime_event_time;
        monotonic_remote_time = fetched_context.monotonic_remote_time;
        realtime_remote_time = fetched_context.realtime_remote_time;
        source_boot_uuid = fetched_context.source_boot_uuid;
        remote_queue_index = fetched_context.remote_queue_index;
        length = fetched_context.size;
      }
    }

    // And, confirm that time never went backwards.
    ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
    last_monotonic_sent_time = monotonic_sent_time;

    ASSERT_EQ(monotonic_remote_time, aos::monotonic_clock::min_time);
    ASSERT_EQ(realtime_remote_time, aos::realtime_clock::min_time);

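    // Recover which thread wrote this message and its per-thread sequence
    // number from the UUID copy of ThreadPlusCount.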
    ThreadPlusCount tpc;
    ASSERT_EQ(source_boot_uuid.span().size(), sizeof(ThreadPlusCount));
    memcpy(&tpc, source_boot_uuid.span().data(),
           source_boot_uuid.span().size());

    if (will_wrap) {
      // The queue won't change out from under us, so we should get some
      // amount of the tail end of the messages from a thread. Confirm that
      // once we get our first message, they all show up.
      if ((*threads)[tpc.thread].event_count ==
          ::std::numeric_limits<uint64_t>::max()) {
        (*threads)[tpc.thread].event_count = tpc.count;
      }

      if (race_reads) {
        // Make sure nothing goes backwards. Really not much we can do here.
        ASSERT_LE((*threads)[tpc.thread].event_count, tpc.count)
            << ": Thread " << tpc.thread;
        (*threads)[tpc.thread].event_count = tpc.count;
      } else {
        // Make sure nothing goes backwards. Really not much we can do here.
        ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
            << ": Thread " << tpc.thread;
      }
    } else {
      // Confirm that we see every message counter from every thread.
      ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
          << ": Thread " << tpc.thread;
    }
    ++(*threads)[tpc.thread].event_count;
  }
}
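
// A minimal usage sketch (hypothetical call site; the real callers live in
// the lockless queue tests):
//
//   QueueRacer racer(queue, /*num_threads=*/10, /*num_messages=*/100);
//   racer.RunIteration(/*race_reads=*/false, /*write_wrap_count=*/0,
//                      /*set_should_read=*/false,
//                      /*should_read_result=*/true);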

}  // namespace ipc_lib
}  // namespace aos