#include "aos/ipc_lib/queue_racer.h"

#include <stdio.h>

#include <algorithm>
#include <cinttypes>
#include <cstring>
#include <limits>
#include <optional>
#include <ostream>
#include <thread>

#include "absl/types/span.h"
#include "glog/logging.h"
#include "gtest/gtest.h"

#include "aos/events/context.h"
#include "aos/ipc_lib/event.h"

namespace aos::ipc_lib {
namespace {

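// Payload written into every message: which sender thread wrote it and that
// thread's running message counter, so the readers can validate ordering.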
struct ThreadPlusCount {
  uint64_t thread;
  uint64_t count;
};

}  // namespace

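// Per-writer bookkeeping: the sender thread itself, an Event used to signal
// that it is ready to run, and a counter used while validating reads to track
// which message from that thread we expect next.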
struct ThreadState {
  ::std::thread thread;
  Event ready;
  uint64_t event_count = ::std::numeric_limits<uint64_t>::max();
};

QueueRacer::QueueRacer(LocklessQueue queue, int num_threads,
                       uint64_t num_messages)
    : queue_(queue),
      num_threads_(num_threads),
      num_messages_(num_messages),
      channel_storage_duration_(std::chrono::nanoseconds(1)),
      expected_send_results_({LocklessQueueSender::Result::GOOD}),
      check_writes_and_reads_(true) {
  Reset();
}

QueueRacer::QueueRacer(LocklessQueue queue,
                       const QueueRacerConfiguration &config)
    : queue_(queue),
      num_threads_(config.num_threads),
      num_messages_(config.num_messages),
      channel_storage_duration_(config.channel_storage_duration),
      expected_send_results_(config.expected_send_results),
      check_writes_and_reads_(config.check_writes_and_reads) {
  Reset();
}

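// A rough usage sketch, assuming a `queue` already backed by configured shared
// memory (see the lockless queue tests for the real call sites):
//
//   QueueRacer racer(queue, /*num_threads=*/5, /*num_messages=*/10000);
//   racer.RunIteration(/*race_reads=*/false, /*write_wrap_count=*/0,
//                      /*set_should_read=*/false, /*should_read_result=*/true);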
void QueueRacer::RunIteration(bool race_reads, int write_wrap_count,
                              bool set_should_read, bool should_read_result) {
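  // The queue wraps (starts overwriting the oldest messages) once the total
  // number of sends exceeds the queue size.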
  const bool will_wrap = num_messages_ * num_threads_ *
                             static_cast<uint64_t>(1 + write_wrap_count) >
                         queue_.config().queue_size;

  // Clear out shmem.
  Reset();
  started_writes_ = 0;
  finished_writes_ = 0;

  // Event used to start all the threads processing at once.
  Event run;

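  // Keeps the index-polling thread below spinning until the writers are done.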
  ::std::atomic<bool> poll_index{true};

  // List of threads.
  ::std::vector<ThreadState> threads(num_threads_);

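  // While the writers run, this thread hammers LatestIndex() and checks that
  // the index it observes stays bounded by the started/finished write counts.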
  ::std::thread queue_index_racer([this, &poll_index]() {
    LocklessQueueReader reader(queue_);

    // Track the number of times we wrap, and cache the modulo.
    uint64_t wrap_count = 0;
    uint32_t last_queue_index = 0;
    const uint32_t max_queue_index =
        QueueIndex::MaxIndex(0xffffffffu, queue_.config().queue_size);
    while (poll_index) {
      // We want to read everything backwards. This will give us conservative
      // bounds. And with enough time and randomness, we will see all the cases
      // we care to see.

      // These 3 numbers look at the same thing, but at different points of time
      // in the process. The process (essentially) looks like:
      //
      // ++started_writes;
      // ++latest_queue_index;
      // ++finished_writes;
      //
      // We want to check that latest_queue_index is bounded by the number of
      // writes started and finished. Specifically, finished_writes <=
      // latest_queue_index + 1, and latest_queue_index + 1 <= started_writes.
      // Everything always increases, so if we let more time elapse between
      // sampling finished_writes and latest_queue_index, we will only be
      // relaxing our bounds, not invalidating the check. The same goes for
      // started_writes.
      //
      // So, grab them in order.
      const uint64_t finished_writes = finished_writes_.load();
      const QueueIndex latest_queue_index_queue_index = reader.LatestIndex();
      const uint64_t started_writes = started_writes_.load();

      const uint32_t latest_queue_index_uint32_t =
          latest_queue_index_queue_index.index();
      uint64_t latest_queue_index = latest_queue_index_uint32_t;

      if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
        // If we got smaller, we wrapped.
        if (latest_queue_index_uint32_t < last_queue_index) {
          ++wrap_count;
        }
        // And apply it.
        latest_queue_index +=
            static_cast<uint64_t>(max_queue_index) * wrap_count;
        last_queue_index = latest_queue_index_uint32_t;
      }

      // For grins, check that we have always started more than we finished.
      // Should never fail.
      EXPECT_GE(started_writes, finished_writes);

      // If we are at the beginning, the queue needs to always return empty.
      if (started_writes == 0) {
        EXPECT_EQ(latest_queue_index_queue_index, QueueIndex::Invalid());
        EXPECT_EQ(finished_writes, 0);
      } else {
        if (finished_writes == 0) {
          // Plausible to be at the beginning, in which case we don't have
          // anything to check.
          if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
            // Otherwise, we have started. The queue can't have any more
            // entries than this.
            EXPECT_GE(started_writes, latest_queue_index + 1);
          }
        } else {
          EXPECT_NE(latest_queue_index_queue_index, QueueIndex::Invalid());
          // latest_queue_index is an index, not a count. So it always reads 1
          // low.
          if (check_writes_and_reads_) {
            EXPECT_GE(latest_queue_index + 1, finished_writes);
          }
        }
      }
    }
  });

  // Build up each thread and kick it off.
  int thread_index = 0;
  for (ThreadState &t : threads) {
    if (will_wrap) {
      t.event_count = ::std::numeric_limits<uint64_t>::max();
    } else {
      t.event_count = 0;
    }
    t.thread = ::std::thread([this, &t, thread_index, &run,
                              write_wrap_count]() {
      LocklessQueueSender sender =
          LocklessQueueSender::Make(queue_, channel_storage_duration_).value();
      CHECK_GE(sender.size(), sizeof(ThreadPlusCount));

      // Signal that we are ready to start sending.
      t.ready.Set();

      // Wait until signaled to start running.
      run.Wait();

      // Gogogo!
      for (uint64_t i = 0;
           i < num_messages_ * static_cast<uint64_t>(1 + write_wrap_count);
           ++i) {
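        // The payload lives at the tail of the sender's scratch buffer. Stamp
        // a per-iteration fill pattern over it and read it straight back; any
        // mismatch means another sender is using our scratch space.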
        char *const data = static_cast<char *>(sender.Data()) + sender.size() -
                           sizeof(ThreadPlusCount);
        const char fill = (i + 55) & 0xFF;
        memset(data, fill, sizeof(ThreadPlusCount));
        {
          bool found_mismatch = false;
          for (size_t j = 0; j < sizeof(ThreadPlusCount); ++j) {
            if (data[j] != fill) {
              found_mismatch = true;
            }
          }
          CHECK(!found_mismatch) << ": Somebody else is writing to our buffer";
        }

        ThreadPlusCount tpc;
        tpc.thread = thread_index;
        tpc.count = i;

        memcpy(data, &tpc, sizeof(ThreadPlusCount));

        if (i % 0x800000 == 0x100000) {
          fprintf(
              stderr, "Sent %" PRIu64 ", %f %%\n", i,
              static_cast<double>(i) /
                  static_cast<double>(num_messages_ * (1 + write_wrap_count)) *
                  100.0);
        }

        ++started_writes_;
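        // The same ThreadPlusCount is packed into the source_boot_uuid field,
        // so CheckReads can recover which thread wrote each message and
        // compare the payload against it.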
        auto result =
            sender.Send(sizeof(ThreadPlusCount), aos::monotonic_clock::min_time,
                        aos::realtime_clock::min_time,
                        aos::monotonic_clock::min_time, 0xffffffff,
                        UUID::FromSpan(absl::Span<const uint8_t>(
                            reinterpret_cast<const uint8_t *>(&tpc),
                            sizeof(ThreadPlusCount))),
                        nullptr, nullptr, nullptr);

        CHECK(std::find(expected_send_results_.begin(),
                        expected_send_results_.end(),
                        result) != expected_send_results_.end())
            << "Unexpected send result: " << result;

        // Blank out the new scratch buffer, to catch other people using it.
        {
          char *const new_data = static_cast<char *>(sender.Data()) +
                                 sender.size() - sizeof(ThreadPlusCount);
          const char new_fill = ~fill;
          memset(new_data, new_fill, sizeof(ThreadPlusCount));
        }
        ++finished_writes_;
      }
    });
    ++thread_index;
  }

  // Wait until all the threads are ready.
  for (ThreadState &t : threads) {
    t.ready.Wait();
  }

  // And start them racing.
  run.Set();

  // If we are not racing reads, let all the threads finish before reading.
  if (!race_reads) {
    for (ThreadState &t : threads) {
      t.thread.join();
    }
    poll_index = false;
    queue_index_racer.join();
  }

  if (check_writes_and_reads_) {
    CheckReads(race_reads, write_wrap_count, &threads, set_should_read,
               should_read_result);
  }

  // Reap all the threads.
  if (race_reads) {
    for (ThreadState &t : threads) {
      t.thread.join();
    }
    poll_index = false;
    queue_index_racer.join();
  }

  if (check_writes_and_reads_) {
    // Confirm that the number of writes matches the expected number of writes.
    ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
              started_writes_);
    ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
              finished_writes_);

    // And that every thread sent the right number of messages.
    for (ThreadState &t : threads) {
      if (will_wrap) {
        if (!race_reads) {
          // If we are wrapping, there is a possibility that a thread writes
          // everything *before* we can read any of it, and it all gets
          // overwritten.
          ASSERT_TRUE(t.event_count == ::std::numeric_limits<uint64_t>::max() ||
                      t.event_count == (1 + write_wrap_count) * num_messages_)
              << ": Got " << t.event_count << " events, expected "
              << (1 + write_wrap_count) * num_messages_;
        }
      } else {
        ASSERT_EQ(t.event_count, num_messages_);
      }
    }
  }
}

void QueueRacer::CheckReads(bool race_reads, int write_wrap_count,
                            ::std::vector<ThreadState> *threads,
                            bool set_should_read, bool should_read_result) {
  // Now read back the results to double check.
  LocklessQueueReader reader(queue_);
  const bool will_wrap = num_messages_ * num_threads_ * (1 + write_wrap_count) >
                         LocklessQueueSize(queue_.memory());

  monotonic_clock::time_point last_monotonic_sent_time =
      monotonic_clock::epoch();
  uint64_t initial_i = 0;
  if (will_wrap) {
    initial_i = (1 + write_wrap_count) * num_messages_ * num_threads_ -
                LocklessQueueSize(queue_.memory());
  }

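  // An empty callback: passing this to Read() leaves the should_read
  // filtering path unexercised.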
  std::function<bool(const Context &)> nop;

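  // should_read snapshots the Context that Read() observes partway through
  // into fetched_context so the asserts below can compare it to the final
  // outputs, then tells Read() whether to actually fetch the data.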
  Context fetched_context;
  std::function<bool(const Context &)> should_read =
      [&should_read_result, &fetched_context](const Context &context) {
        fetched_context = context;
        return should_read_result;
      };

  for (uint64_t i = initial_i;
       i < (1 + write_wrap_count) * num_messages_ * num_threads_; ++i) {
    monotonic_clock::time_point monotonic_sent_time;
    realtime_clock::time_point realtime_sent_time;
    monotonic_clock::time_point monotonic_remote_time;
    monotonic_clock::time_point monotonic_remote_transmit_time;
    realtime_clock::time_point realtime_remote_time;
    UUID source_boot_uuid;
    uint32_t remote_queue_index;
    size_t length;
    char read_data[1024];

    // Handle overflowing the message count for the wrap test.
    const uint32_t wrapped_i =
        i % static_cast<size_t>(QueueIndex::MaxIndex(
                0xffffffffu, LocklessQueueSize(queue_.memory())));
    LocklessQueueReader::Result read_result =
        set_should_read
            ? reader.Read(
                  wrapped_i, &monotonic_sent_time, &realtime_sent_time,
                  &monotonic_remote_time, &monotonic_remote_transmit_time,
                  &realtime_remote_time, &remote_queue_index, &source_boot_uuid,
                  &length, &(read_data[0]), std::ref(should_read))
            : reader.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
                          &monotonic_remote_time,
                          &monotonic_remote_transmit_time,
                          &realtime_remote_time, &remote_queue_index,
                          &source_boot_uuid, &length, &(read_data[0]), nop);

    // The code in lockless_queue.cc reads everything but data, checks that the
    // header hasn't changed, then reads the data. So, if we succeed and both
    // end up not being corrupted, then we've confirmed everything works.
    //
    // Feed in both combos of should_read and whether or not to return true or
    // false from should_read. By capturing the header values inside the
    // callback, we can also verify the state in the middle of the process to
    // make sure we have the right boundaries.
    if (race_reads) {
      if (read_result == LocklessQueueReader::Result::NOTHING_NEW) {
        --i;
        continue;
      }
    }

    if (race_reads && will_wrap) {
      if (read_result == LocklessQueueReader::Result::TOO_OLD) {
        continue;
      }
    }

    if (!set_should_read) {
      // Every message should be good.
      ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD)
          << ": i is " << i;
    } else {
      if (should_read_result) {
        ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD)
            << ": i is " << i;

        ASSERT_EQ(monotonic_sent_time, fetched_context.monotonic_event_time);
        ASSERT_EQ(realtime_sent_time, fetched_context.realtime_event_time);
        ASSERT_EQ(monotonic_remote_time, fetched_context.monotonic_remote_time);
        ASSERT_EQ(realtime_remote_time, fetched_context.realtime_remote_time);
        ASSERT_EQ(source_boot_uuid, fetched_context.source_boot_uuid);
        ASSERT_EQ(remote_queue_index, fetched_context.remote_queue_index);
        ASSERT_EQ(length, fetched_context.size);

        ASSERT_EQ(
            absl::Span<const uint8_t>(
                reinterpret_cast<const uint8_t *>(
                    read_data + LocklessQueueMessageDataSize(queue_.memory()) -
                    length),
                length),
            source_boot_uuid.span());
      } else {
        ASSERT_EQ(read_result, LocklessQueueReader::Result::FILTERED);
        monotonic_sent_time = fetched_context.monotonic_event_time;
        realtime_sent_time = fetched_context.realtime_event_time;
        monotonic_remote_time = fetched_context.monotonic_remote_time;
        realtime_remote_time = fetched_context.realtime_remote_time;
        source_boot_uuid = fetched_context.source_boot_uuid;
        remote_queue_index = fetched_context.remote_queue_index;
        length = fetched_context.size;
      }
    }

    // And, confirm that time never went backwards.
    ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
    last_monotonic_sent_time = monotonic_sent_time;

    ASSERT_EQ(monotonic_remote_time, aos::monotonic_clock::min_time);
    ASSERT_EQ(realtime_remote_time, aos::realtime_clock::min_time);

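    // Recover which thread wrote this message, and its per-thread counter,
    // from the ThreadPlusCount we stuffed into the source_boot_uuid on the
    // send side.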
    ThreadPlusCount tpc;
    ASSERT_EQ(source_boot_uuid.span().size(), sizeof(ThreadPlusCount));
    memcpy(&tpc, source_boot_uuid.span().data(),
           source_boot_uuid.span().size());

    if (will_wrap) {
      // The queue won't change out from under us, so we should get some amount
      // of the tail end of the messages from a thread.
      // Confirm that once we get our first message, they all show up.
      if ((*threads)[tpc.thread].event_count ==
          ::std::numeric_limits<uint64_t>::max()) {
        (*threads)[tpc.thread].event_count = tpc.count;
      }

      if (race_reads) {
        // Make sure nothing goes backwards. Really not much we can do here.
        ASSERT_LE((*threads)[tpc.thread].event_count, tpc.count)
            << ": Thread " << tpc.thread;
        (*threads)[tpc.thread].event_count = tpc.count;
      } else {
        // Make sure nothing goes backwards. Really not much we can do here.
        ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
            << ": Thread " << tpc.thread;
      }
    } else {
      // Confirm that we see every message counter from every thread.
      ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
          << ": Thread " << tpc.thread;
    }
    ++(*threads)[tpc.thread].event_count;
  }
}

}  // namespace aos::ipc_lib