#include "aos/ipc_lib/queue_racer.h"

#include <stdio.h>

#include <algorithm>
#include <cinttypes>
#include <cstring>
#include <limits>
#include <optional>
#include <ostream>
#include <thread>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/types/span.h"
#include "gtest/gtest.h"

#include "aos/events/context.h"
#include "aos/ipc_lib/event.h"

namespace aos::ipc_lib {
namespace {

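// Payload written into every message. The same bytes are also stuffed into
// the source_boot_uuid field on send, so CheckReads() can attribute each
// message back to the thread which wrote it.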
struct ThreadPlusCount {
  uint64_t thread;
  uint64_t count;
};

}  // namespace

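// Per-thread state for the racing writer threads.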
struct ThreadState {
  ::std::thread thread;
  Event ready;
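  // The next message count we expect to read back from this thread; max()
  // means we haven't seen any of its messages yet (used by the wrap tests).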
  uint64_t event_count = ::std::numeric_limits<uint64_t>::max();
};

QueueRacer::QueueRacer(LocklessQueue queue, int num_threads,
                       uint64_t num_messages)
    : queue_(queue),
      num_threads_(num_threads),
      num_messages_(num_messages),
      channel_storage_duration_(std::chrono::nanoseconds(1)),
      expected_send_results_({LocklessQueueSender::Result::GOOD}),
      check_writes_and_reads_(true) {
  Reset();
}

QueueRacer::QueueRacer(LocklessQueue queue,
                       const QueueRacerConfiguration &config)
    : queue_(queue),
      num_threads_(config.num_threads),
      num_messages_(config.num_messages),
      channel_storage_duration_(config.channel_storage_duration),
      expected_send_results_(config.expected_send_results),
      check_writes_and_reads_(config.check_writes_and_reads) {
  Reset();
}
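
// A minimal usage sketch (hypothetical values; constructing the underlying
// LocklessQueue and its shared memory is assumed to happen in the test):
//
//   QueueRacer racer(queue, /*num_threads=*/4, /*num_messages=*/10000);
//   racer.RunIteration(/*race_reads=*/false, /*write_wrap_count=*/0,
//                      /*set_should_read=*/false, /*should_read_result=*/true);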

void QueueRacer::RunIteration(bool race_reads, int write_wrap_count,
                              bool set_should_read, bool should_read_result) {
  const bool will_wrap = num_messages_ * num_threads_ *
                             static_cast<uint64_t>(1 + write_wrap_count) >
                         queue_.config().queue_size;

  // Clear out shmem.
  Reset();
  started_writes_ = 0;
  finished_writes_ = 0;

  // Event used to start all the threads processing at once.
  Event run;

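  // Keeps the queue-index-polling thread spinning; cleared once the writers
  // have finished.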
  ::std::atomic<bool> poll_index{true};

  // List of threads.
  ::std::vector<ThreadState> threads(num_threads_);

  ::std::thread queue_index_racer([this, &poll_index]() {
    LocklessQueueReader reader(queue_);

    // Track the number of times we wrap, and cache the modulo.
    uint64_t wrap_count = 0;
    uint32_t last_queue_index = 0;
    const uint32_t max_queue_index =
        QueueIndex::MaxIndex(0xffffffffu, queue_.config().queue_size);
    while (poll_index) {
      // We want to read everything backwards. This will give us conservative
      // bounds. And with enough time and randomness, we will see all the cases
      // we care to see.

      // These 3 numbers look at the same thing, but at different points of time
      // in the process. The process (essentially) looks like:
      //
      //   ++started_writes;
      //   ++latest_queue_index;
      //   ++finished_writes;
      //
      // We want to check that latest_queue_index is bounded by the number of
      // writes started and finished. Basically, we can say that
      // finished_writes < latest_queue_index always. And
      // latest_queue_index < started_writes. And everything always increases.
      // So, if we let more time elapse between sampling finished_writes and
      // latest_queue_index, we will only be relaxing our bounds, not
      // invalidating the check. The same goes for started_writes.
      //
      // So, grab them in order.
      const uint64_t finished_writes = finished_writes_.load();
      const QueueIndex latest_queue_index_queue_index = reader.LatestIndex();
      const uint64_t started_writes = started_writes_.load();

      const uint32_t latest_queue_index_uint32_t =
          latest_queue_index_queue_index.index();
      uint64_t latest_queue_index = latest_queue_index_uint32_t;

      if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
        // If we got smaller, we wrapped.
        if (latest_queue_index_uint32_t < last_queue_index) {
          ++wrap_count;
        }
        // And apply it.
        latest_queue_index +=
            static_cast<uint64_t>(max_queue_index) * wrap_count;
        last_queue_index = latest_queue_index_uint32_t;
      }

      // For grins, check that we have always started more than we finished.
      // Should never fail.
      EXPECT_GE(started_writes, finished_writes);

      // If we are at the beginning, the queue needs to always return empty.
      if (started_writes == 0) {
        EXPECT_EQ(latest_queue_index_queue_index, QueueIndex::Invalid());
        EXPECT_EQ(finished_writes, 0);
      } else {
        if (finished_writes == 0) {
          // Plausible to be at the beginning, in which case we don't have
          // anything to check.
          if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
            // Otherwise, we have started. The queue can't have any more
            // entries than this.
            EXPECT_GE(started_writes, latest_queue_index + 1);
          }
        } else {
          EXPECT_NE(latest_queue_index_queue_index, QueueIndex::Invalid());
          // latest_queue_index is an index, not a count. So it always reads 1
          // low.
          if (check_writes_and_reads_) {
            EXPECT_GE(latest_queue_index + 1, finished_writes);
          }
        }
      }
    }
  });

  // Build up each thread and kick it off.
  int thread_index = 0;
  for (ThreadState &t : threads) {
    if (will_wrap) {
      t.event_count = ::std::numeric_limits<uint64_t>::max();
    } else {
      t.event_count = 0;
    }
    t.thread = ::std::thread([this, &t, thread_index, &run,
                              write_wrap_count]() {
      LocklessQueueSender sender =
          LocklessQueueSender::Make(queue_, channel_storage_duration_).value();
      CHECK_GE(sender.size(), sizeof(ThreadPlusCount));

      // Signal that we are ready to start sending.
      t.ready.Set();

      // Wait until signaled to start running.
      run.Wait();

      // Gogogo!
      for (uint64_t i = 0;
           i < num_messages_ * static_cast<uint64_t>(1 + write_wrap_count);
           ++i) {
        char *const data = static_cast<char *>(sender.Data()) + sender.size() -
                           sizeof(ThreadPlusCount);
        const char fill = (i + 55) & 0xFF;
        memset(data, fill, sizeof(ThreadPlusCount));
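        // Check that the fill pattern we just wrote is still intact; a
        // mismatch means somebody else is writing to our scratch buffer.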
        {
          bool found_mismatch = false;
          for (size_t j = 0; j < sizeof(ThreadPlusCount); ++j) {
            if (data[j] != fill) {
              found_mismatch = true;
            }
          }
          CHECK(!found_mismatch) << ": Somebody else is writing to our buffer";
        }

        ThreadPlusCount tpc;
        tpc.thread = thread_index;
        tpc.count = i;

        memcpy(data, &tpc, sizeof(ThreadPlusCount));

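        // Log progress periodically so long runs show signs of life.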
        if (i % 0x800000 == 0x100000) {
          fprintf(
              stderr, "Sent %" PRIu64 ", %f %%\n", i,
              static_cast<double>(i) /
                  static_cast<double>(num_messages_ * (1 + write_wrap_count)) *
                  100.0);
        }

        ++started_writes_;
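        // Send the ThreadPlusCount both as the message data and as the
        // source_boot_uuid, so CheckReads can attribute each message to its
        // writer and cross-check the data region against the metadata.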
        auto result =
            sender.Send(sizeof(ThreadPlusCount), aos::monotonic_clock::min_time,
                        aos::realtime_clock::min_time,
                        aos::monotonic_clock::min_time, 0xffffffff,
                        UUID::FromSpan(absl::Span<const uint8_t>(
                            reinterpret_cast<const uint8_t *>(&tpc),
                            sizeof(ThreadPlusCount))),
                        nullptr, nullptr, nullptr);

        CHECK(std::find(expected_send_results_.begin(),
                        expected_send_results_.end(),
                        result) != expected_send_results_.end())
            << "Unexpected send result: " << result;

        // Blank out the new scratch buffer, to catch other people using it.
        {
          char *const new_data = static_cast<char *>(sender.Data()) +
                                 sender.size() - sizeof(ThreadPlusCount);
          const char new_fill = ~fill;
          memset(new_data, new_fill, sizeof(ThreadPlusCount));
        }
        ++finished_writes_;
      }
    });
    ++thread_index;
  }

  // Wait until all the threads are ready.
  for (ThreadState &t : threads) {
    t.ready.Wait();
  }

  // And start them racing.
  run.Set();

  // Let all the threads finish before reading if we are supposed to not be
  // racing reads.
  if (!race_reads) {
    for (ThreadState &t : threads) {
      t.thread.join();
    }
    poll_index = false;
    queue_index_racer.join();
  }

  if (check_writes_and_reads_) {
    CheckReads(race_reads, write_wrap_count, &threads, set_should_read,
               should_read_result);
  }

  // Reap all the threads.
  if (race_reads) {
    for (ThreadState &t : threads) {
      t.thread.join();
    }
    poll_index = false;
    queue_index_racer.join();
  }

  if (check_writes_and_reads_) {
    // Confirm that the number of writes matches the expected number of writes.
    ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
              started_writes_);
    ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
              finished_writes_);

    // And that every thread sent the right number of messages.
    for (ThreadState &t : threads) {
      if (will_wrap) {
        if (!race_reads) {
          // If we are wrapping, there is a possibility that a thread writes
          // everything *before* we can read any of it, and it all gets
          // overwritten.
          ASSERT_TRUE(t.event_count == ::std::numeric_limits<uint64_t>::max() ||
                      t.event_count == (1 + write_wrap_count) * num_messages_)
              << ": Got " << t.event_count << " events, expected "
              << (1 + write_wrap_count) * num_messages_;
        }
      } else {
        ASSERT_EQ(t.event_count, num_messages_);
      }
    }
  }
}

void QueueRacer::CheckReads(bool race_reads, int write_wrap_count,
                            ::std::vector<ThreadState> *threads,
                            bool set_should_read, bool should_read_result) {
  // Now read back the results to double check.
  LocklessQueueReader reader(queue_);
  const bool will_wrap = num_messages_ * num_threads_ * (1 + write_wrap_count) >
                         LocklessQueueSize(queue_.memory());

  monotonic_clock::time_point last_monotonic_sent_time =
      monotonic_clock::epoch();
  uint64_t initial_i = 0;
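  // If the queue wrapped, the oldest messages have been overwritten, so start
  // reading at the first index that can still be present.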
  if (will_wrap) {
    initial_i = (1 + write_wrap_count) * num_messages_ * num_threads_ -
                LocklessQueueSize(queue_.memory());
  }

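  // Empty callback, used when reads should not be filtered at all.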
  std::function<bool(const Context &)> nop;

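  // Capture the context observed inside the callback so it can be compared
  // against what Read() hands back.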
  Context fetched_context;
  std::function<bool(const Context &)> should_read =
      [&should_read_result, &fetched_context](const Context &context) {
        fetched_context = context;
        return should_read_result;
      };

  for (uint64_t i = initial_i;
       i < (1 + write_wrap_count) * num_messages_ * num_threads_; ++i) {
    monotonic_clock::time_point monotonic_sent_time;
    realtime_clock::time_point realtime_sent_time;
    monotonic_clock::time_point monotonic_remote_time;
    monotonic_clock::time_point monotonic_remote_transmit_time;
    realtime_clock::time_point realtime_remote_time;
    UUID source_boot_uuid;
    uint32_t remote_queue_index;
    size_t length;
    char read_data[1024];

    // Handle overflowing the message count for the wrap test.
    const uint32_t wrapped_i =
        i % static_cast<size_t>(QueueIndex::MaxIndex(
                0xffffffffu, LocklessQueueSize(queue_.memory())));
    LocklessQueueReader::Result read_result =
        set_should_read
            ? reader.Read(
                  wrapped_i, &monotonic_sent_time, &realtime_sent_time,
                  &monotonic_remote_time, &monotonic_remote_transmit_time,
                  &realtime_remote_time, &remote_queue_index, &source_boot_uuid,
                  &length, &(read_data[0]), std::ref(should_read))
            : reader.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
                          &monotonic_remote_time,
                          &monotonic_remote_transmit_time,
                          &realtime_remote_time, &remote_queue_index,
                          &source_boot_uuid, &length, &(read_data[0]), nop);

    // The code in lockless_queue.cc reads everything but data, checks that the
    // header hasn't changed, then reads the data. So, if we succeed and both
    // end up not being corrupted, then we've confirmed everything works.
    //
    // Feed in both combos of should_read and whether or not to return true or
    // false from should_read. By capturing the header values inside the
    // callback, we can also verify the state in the middle of the process to
    // make sure we have the right boundaries.
    if (race_reads) {
      if (read_result == LocklessQueueReader::Result::NOTHING_NEW) {
        --i;
        continue;
      }
    }

    if (race_reads && will_wrap) {
      if (read_result == LocklessQueueReader::Result::TOO_OLD) {
        continue;
      }
    }

    if (!set_should_read) {
      // Every message should be good.
      ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD)
          << ": i is " << i;
    } else {
      if (should_read_result) {
        ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD)
            << ": i is " << i;

        ASSERT_EQ(monotonic_sent_time, fetched_context.monotonic_event_time);
        ASSERT_EQ(realtime_sent_time, fetched_context.realtime_event_time);
        ASSERT_EQ(monotonic_remote_time, fetched_context.monotonic_remote_time);
        ASSERT_EQ(realtime_remote_time, fetched_context.realtime_remote_time);
        ASSERT_EQ(source_boot_uuid, fetched_context.source_boot_uuid);
        ASSERT_EQ(remote_queue_index, fetched_context.remote_queue_index);
        ASSERT_EQ(length, fetched_context.size);

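        // The message data is written at the tail of the buffer, so compare
        // the last `length` bytes against the payload that was also stashed
        // in the source_boot_uuid.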
        ASSERT_EQ(
            absl::Span<const uint8_t>(
                reinterpret_cast<const uint8_t *>(
                    read_data + LocklessQueueMessageDataSize(queue_.memory()) -
                    length),
                length),
            source_boot_uuid.span());
      } else {
        ASSERT_EQ(read_result, LocklessQueueReader::Result::FILTERED);
        monotonic_sent_time = fetched_context.monotonic_event_time;
        realtime_sent_time = fetched_context.realtime_event_time;
        monotonic_remote_time = fetched_context.monotonic_remote_time;
        realtime_remote_time = fetched_context.realtime_remote_time;
        source_boot_uuid = fetched_context.source_boot_uuid;
        remote_queue_index = fetched_context.remote_queue_index;
        length = fetched_context.size;
      }
    }

    // And, confirm that time never went backwards.
    ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
    last_monotonic_sent_time = monotonic_sent_time;

    ASSERT_EQ(monotonic_remote_time, aos::monotonic_clock::min_time);
    ASSERT_EQ(realtime_remote_time, aos::realtime_clock::min_time);

    ThreadPlusCount tpc;
    ASSERT_EQ(source_boot_uuid.span().size(), sizeof(ThreadPlusCount));
    memcpy(&tpc, source_boot_uuid.span().data(),
           source_boot_uuid.span().size());

    if (will_wrap) {
      // The queue won't change out from under us, so we should get some
      // amount of the tail end of the messages from a thread.
      // Confirm that once we get our first message, they all show up.
      if ((*threads)[tpc.thread].event_count ==
          ::std::numeric_limits<uint64_t>::max()) {
        (*threads)[tpc.thread].event_count = tpc.count;
      }

      if (race_reads) {
        // Make sure nothing goes backwards. Really not much we can do here.
        ASSERT_LE((*threads)[tpc.thread].event_count, tpc.count)
            << ": Thread " << tpc.thread;
        (*threads)[tpc.thread].event_count = tpc.count;
      } else {
        // Make sure nothing goes backwards. Really not much we can do here.
        ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
            << ": Thread " << tpc.thread;
      }
    } else {
      // Confirm that we see every message counter from every thread.
      ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
          << ": Thread " << tpc.thread;
    }
    ++(*threads)[tpc.thread].event_count;
  }
}

}  // namespace aos::ipc_lib