#include "aos/ipc_lib/queue_racer.h"

#include <algorithm>
#include <chrono>
#include <cinttypes>
#include <cstdio>
#include <cstring>
#include <limits>
#include <thread>
#include <vector>

#include "aos/ipc_lib/event.h"
#include "gtest/gtest.h"

namespace aos {
namespace ipc_lib {
namespace {

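// Payload written into every message: which writer thread sent it and that
// thread's per-message counter.  CheckReads() uses this to attribute each
// message read back to the thread which wrote it.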
struct ThreadPlusCount {
  int thread;
  uint64_t count;
};

}  // namespace

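// Per-writer-thread bookkeeping: the thread itself, an event used to signal
// that it is ready to run, and the last message count observed for it while
// reading results back (max() means no message from this thread seen yet).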
struct ThreadState {
  ::std::thread thread;
  Event ready;
  uint64_t event_count = ::std::numeric_limits<uint64_t>::max();
};

QueueRacer::QueueRacer(LocklessQueue queue, int num_threads,
                       uint64_t num_messages)
    : queue_(queue),
      num_threads_(num_threads),
      num_messages_(num_messages),
      channel_storage_duration_(std::chrono::nanoseconds(1)),
      expected_send_results_({LocklessQueueSender::Result::GOOD}),
      check_writes_and_reads_(true) {
  Reset();
}

QueueRacer::QueueRacer(LocklessQueue queue,
                       const QueueRacerConfiguration &config)
    : queue_(queue),
      num_threads_(config.num_threads),
      num_messages_(config.num_messages),
      channel_storage_duration_(config.channel_storage_duration),
      expected_send_results_(config.expected_send_results),
      check_writes_and_reads_(config.check_writes_and_reads) {
  Reset();
}

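// One racing iteration: spin up num_threads_ writer threads which each send
// num_messages_ * (1 + write_wrap_count) messages as fast as they can, while a
// polling thread watches the latest queue index and checks that it stays
// bounded by the started/finished write counters.  Afterwards (or while the
// writers are still running, if race_reads is set), CheckReads() reads
// everything back and validates it when check_writes_and_reads_ is enabled.
//
// A minimal usage sketch (hypothetical test code; `queue` is a LocklessQueue
// constructed elsewhere by the test fixture):
//
//   QueueRacer racer(queue, /*num_threads=*/10, /*num_messages=*/100);
//   for (int i = 0; i < 100; ++i) {
//     racer.RunIteration(/*race_reads=*/false, /*write_wrap_count=*/0);
//   }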
void QueueRacer::RunIteration(bool race_reads, int write_wrap_count) {
  const bool will_wrap = num_messages_ * num_threads_ *
                             static_cast<uint64_t>(1 + write_wrap_count) >
                         queue_.config().queue_size;

  // Clear out shmem.
  Reset();
  started_writes_ = 0;
  finished_writes_ = 0;

  // Event used to start all the threads processing at once.
  Event run;

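  // Flag that tells the queue-index-polling thread below when to stop.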
  ::std::atomic<bool> poll_index{true};

  // List of threads.
  ::std::vector<ThreadState> threads(num_threads_);

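  // Thread which repeatedly samples the latest queue index while the writers
  // run and checks that it stays consistent with started_writes_ and
  // finished_writes_ (see the invariant described below).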
  ::std::thread queue_index_racer([this, &poll_index]() {
    LocklessQueueReader reader(queue_);

    // Track the number of times we wrap, and cache the modulo.
    uint64_t wrap_count = 0;
    uint32_t last_queue_index = 0;
    const uint32_t max_queue_index =
        QueueIndex::MaxIndex(0xffffffffu, queue_.config().queue_size);
    while (poll_index) {
      // We want to read everything backwards. This will give us conservative
      // bounds. And with enough time and randomness, we will see all the cases
      // we care to see.

      // These 3 numbers look at the same thing, but at different points of time
      // in the process. The process (essentially) looks like:
      //
      //   ++started_writes;
      //   ++latest_queue_index;
      //   ++finished_writes;
      //
      // We want to check that latest_queue_index is bounded by the number of
      // writes started and finished. Basically, we can say that
      // finished_writes < latest_queue_index always. And
      // latest_queue_index < started_writes. And everything always increases.
      // So, if we let more time elapse between sampling finished_writes and
      // latest_queue_index, we will only be relaxing our bounds, not
      // invalidating the check. The same goes for started_writes.
      //
      // So, grab them in order.
      const uint64_t finished_writes = finished_writes_.load();
      const QueueIndex latest_queue_index_queue_index = reader.LatestIndex();
      const uint64_t started_writes = started_writes_.load();

      const uint32_t latest_queue_index_uint32_t =
          latest_queue_index_queue_index.index();
      uint64_t latest_queue_index = latest_queue_index_uint32_t;

      if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
        // If we got smaller, we wrapped.
        if (latest_queue_index_uint32_t < last_queue_index) {
          ++wrap_count;
        }
        // And apply it.
        latest_queue_index +=
            static_cast<uint64_t>(max_queue_index) * wrap_count;
        last_queue_index = latest_queue_index_uint32_t;
      }

      // For grins, check that we have always started at least as many writes
      // as we have finished. Should never fail.
      EXPECT_GE(started_writes, finished_writes);

      // If we are at the beginning, the queue needs to always return empty.
      if (started_writes == 0) {
        EXPECT_EQ(latest_queue_index_queue_index, QueueIndex::Invalid());
        EXPECT_EQ(finished_writes, 0);
      } else {
        if (finished_writes == 0) {
          // Plausible to be at the beginning, in which case we don't have
          // anything to check.
          if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
            // Otherwise, we have started. The queue can't have any more
            // entries than this.
            EXPECT_GE(started_writes, latest_queue_index + 1);
          }
        } else {
          EXPECT_NE(latest_queue_index_queue_index, QueueIndex::Invalid());
          // latest_queue_index is an index, not a count. So it always reads 1
          // low.
          if (check_writes_and_reads_) {
            EXPECT_GE(latest_queue_index + 1, finished_writes);
          }
        }
      }
    }
  });

  // Build up each thread and kick it off.
  int thread_index = 0;
  for (ThreadState &t : threads) {
    if (will_wrap) {
      t.event_count = ::std::numeric_limits<uint64_t>::max();
    } else {
      t.event_count = 0;
    }
    t.thread = ::std::thread([this, &t, thread_index, &run,
                              write_wrap_count]() {
      LocklessQueueSender sender =
          LocklessQueueSender::Make(queue_, channel_storage_duration_).value();
      CHECK_GE(sender.size(), sizeof(ThreadPlusCount));

      // Signal that we are ready to start sending.
      t.ready.Set();

      // Wait until signaled to start running.
      run.Wait();

      // Gogogo!
      for (uint64_t i = 0;
           i < num_messages_ * static_cast<uint64_t>(1 + write_wrap_count);
           ++i) {
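        // The ThreadPlusCount payload lives at the very end of the sender's
        // scratch buffer.  Fill it with a known pattern first and verify that
        // the pattern sticks, to catch anybody else writing to our buffer.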
        char *const data = static_cast<char *>(sender.Data()) + sender.size() -
                           sizeof(ThreadPlusCount);
        const char fill = (i + 55) & 0xFF;
        memset(data, fill, sizeof(ThreadPlusCount));
        {
          bool found_nonzero = false;
          for (size_t i = 0; i < sizeof(ThreadPlusCount); ++i) {
            if (data[i] != fill) {
              found_nonzero = true;
            }
          }
          CHECK(!found_nonzero) << ": Somebody else is writing to our buffer";
        }

        ThreadPlusCount tpc;
        tpc.thread = thread_index;
        tpc.count = i;

        memcpy(data, &tpc, sizeof(ThreadPlusCount));

        if (i % 0x800000 == 0x100000) {
          fprintf(
              stderr, "Sent %" PRIu64 ", %f %%\n", i,
              static_cast<double>(i) /
                  static_cast<double>(num_messages_ * (1 + write_wrap_count)) *
                  100.0);
        }

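        // Bracket the Send with the started_writes_/finished_writes_ counters
        // so the polling thread above can bound the latest queue index.  The
        // remote times, remote queue index, and boot UUID are passed as
        // sentinel values here; CheckReads() confirms that the remote times
        // and boot UUID read back as those same sentinels.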
        ++started_writes_;
        auto result =
            sender.Send(sizeof(ThreadPlusCount), aos::monotonic_clock::min_time,
                        aos::realtime_clock::min_time, 0xffffffff, UUID::Zero(),
                        nullptr, nullptr, nullptr);

        CHECK(std::find(expected_send_results_.begin(),
                        expected_send_results_.end(),
                        result) != expected_send_results_.end())
            << "Unexpected send result: " << result;

        // Blank out the new scratch buffer, to catch other people using it.
        {
          char *const new_data = static_cast<char *>(sender.Data()) +
                                 sender.size() - sizeof(ThreadPlusCount);
          const char new_fill = ~fill;
          memset(new_data, new_fill, sizeof(ThreadPlusCount));
        }
        ++finished_writes_;
      }
    });
    ++thread_index;
  }

  // Wait until all the threads are ready.
  for (ThreadState &t : threads) {
    t.ready.Wait();
  }

  // And start them racing.
  run.Set();

  // Let all the threads finish before reading if we are not supposed to be
  // racing reads.
  if (!race_reads) {
    for (ThreadState &t : threads) {
      t.thread.join();
    }
    poll_index = false;
    queue_index_racer.join();
  }

  if (check_writes_and_reads_) {
    CheckReads(race_reads, write_wrap_count, &threads);
  }

  // Reap all the threads.
  if (race_reads) {
    for (ThreadState &t : threads) {
      t.thread.join();
    }
    poll_index = false;
    queue_index_racer.join();
  }

  if (check_writes_and_reads_) {
    // Confirm that the number of writes matches the expected number of writes.
    ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
              started_writes_);
    ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
              finished_writes_);

    // And that every thread sent the right number of messages.
    for (ThreadState &t : threads) {
      if (will_wrap) {
        if (!race_reads) {
          // If we are wrapping, there is a possibility that a thread writes
          // everything *before* we can read any of it, and it all gets
          // overwritten.
          ASSERT_TRUE(t.event_count == ::std::numeric_limits<uint64_t>::max() ||
                      t.event_count == (1 + write_wrap_count) * num_messages_)
              << ": Got " << t.event_count << " events, expected "
              << (1 + write_wrap_count) * num_messages_;
        }
      } else {
        ASSERT_EQ(t.event_count, num_messages_);
      }
    }
  }
}

void QueueRacer::CheckReads(bool race_reads, int write_wrap_count,
                            ::std::vector<ThreadState> *threads) {
  // Now read back the results to double check.
  LocklessQueueReader reader(queue_);
  const bool will_wrap = num_messages_ * num_threads_ * (1 + write_wrap_count) >
                         LocklessQueueSize(queue_.memory());

  monotonic_clock::time_point last_monotonic_sent_time =
      monotonic_clock::epoch();
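  // If the writers wrapped the queue, everything before the last
  // LocklessQueueSize(queue_.memory()) messages has been overwritten, so start
  // reading at the first message which can still be present.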
  uint64_t initial_i = 0;
  if (will_wrap) {
    initial_i = (1 + write_wrap_count) * num_messages_ * num_threads_ -
                LocklessQueueSize(queue_.memory());
  }

  for (uint64_t i = initial_i;
       i < (1 + write_wrap_count) * num_messages_ * num_threads_; ++i) {
    monotonic_clock::time_point monotonic_sent_time;
    realtime_clock::time_point realtime_sent_time;
    monotonic_clock::time_point monotonic_remote_time;
    realtime_clock::time_point realtime_remote_time;
    UUID source_boot_uuid;
    uint32_t remote_queue_index;
    size_t length;
    char read_data[1024];

    // Handle overflowing the message count for the wrap test.
    const uint32_t wrapped_i =
        i % static_cast<size_t>(QueueIndex::MaxIndex(
                0xffffffffu, LocklessQueueSize(queue_.memory())));
    LocklessQueueReader::Result read_result = reader.Read(
        wrapped_i, &monotonic_sent_time, &realtime_sent_time,
        &monotonic_remote_time, &realtime_remote_time, &remote_queue_index,
        &source_boot_uuid, &length, &(read_data[0]));

    if (race_reads) {
      if (read_result == LocklessQueueReader::Result::NOTHING_NEW) {
        --i;
        continue;
      }
    }

    if (race_reads && will_wrap) {
      if (read_result == LocklessQueueReader::Result::TOO_OLD) {
        continue;
      }
    }
    // Every message should be good.
    ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD) << ": i is " << i;

    // And, confirm that time never went backwards.
    ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
    last_monotonic_sent_time = monotonic_sent_time;

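    // The writers passed sentinel values for the remote times and boot UUID to
    // Send(); confirm that the same sentinels come back out on the read side.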
    EXPECT_EQ(monotonic_remote_time, aos::monotonic_clock::min_time);
    EXPECT_EQ(realtime_remote_time, aos::realtime_clock::min_time);
    EXPECT_EQ(source_boot_uuid, UUID::Zero());

    ThreadPlusCount tpc;
    ASSERT_EQ(length, sizeof(ThreadPlusCount));
    memcpy(&tpc,
           read_data + LocklessQueueMessageDataSize(queue_.memory()) - length,
           sizeof(ThreadPlusCount));

    if (will_wrap) {
      // The queue won't change out from under us, so we should get some amount
      // of the tail end of the messages from a thread.
      // Confirm that once we get our first message, they all show up.
      if ((*threads)[tpc.thread].event_count ==
          ::std::numeric_limits<uint64_t>::max()) {
        (*threads)[tpc.thread].event_count = tpc.count;
      }

      if (race_reads) {
        // Make sure nothing goes backwards. Really not much we can do here.
        ASSERT_LE((*threads)[tpc.thread].event_count, tpc.count)
            << ": Thread " << tpc.thread;
        (*threads)[tpc.thread].event_count = tpc.count;
      } else {
        // Make sure nothing goes backwards. Really not much we can do here.
        ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
            << ": Thread " << tpc.thread;
      }
    } else {
      // Confirm that we see every message counter from every thread.
      ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
          << ": Thread " << tpc.thread;
    }
    ++(*threads)[tpc.thread].event_count;
  }
}

}  // namespace ipc_lib
}  // namespace aos