#include "aos/ipc_lib/queue_racer.h"

#include <cinttypes>
#include <cstring>
#include <limits>

#include "gtest/gtest.h"

#include "aos/ipc_lib/event.h"

namespace aos {
namespace ipc_lib {
namespace {

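// Payload that each writer stamps into the tail of every message it sends:
// which thread wrote it and that thread's running send count.  CheckReads()
// uses this to attribute each message it reads back.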
struct ThreadPlusCount {
  int thread;
  uint64_t count;
};

}  // namespace

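// Per-writer bookkeeping: the thread itself, an Event used to synchronize
// startup, and the last per-thread count observed for it (left at the max()
// sentinel until a message from that thread is seen).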
struct ThreadState {
  ::std::thread thread;
  Event ready;
  uint64_t event_count = ::std::numeric_limits<uint64_t>::max();
};

QueueRacer::QueueRacer(LocklessQueue queue, int num_threads,
                       uint64_t num_messages)
    : queue_(queue),
      num_threads_(num_threads),
      num_messages_(num_messages),
      channel_storage_duration_(std::chrono::nanoseconds(1)),
      expected_send_results_({LocklessQueueSender::Result::GOOD}),
      check_writes_and_reads_(true) {
  Reset();
}

QueueRacer::QueueRacer(LocklessQueue queue,
                       const QueueRacerConfiguration &config)
    : queue_(queue),
      num_threads_(config.num_threads),
      num_messages_(config.num_messages),
      channel_storage_duration_(config.channel_storage_duration),
      expected_send_results_(config.expected_send_results),
      check_writes_and_reads_(config.check_writes_and_reads) {
  Reset();
}

void QueueRacer::RunIteration(bool race_reads, int write_wrap_count) {
  const bool will_wrap = num_messages_ * num_threads_ *
                             static_cast<uint64_t>(1 + write_wrap_count) >
                         queue_.config().queue_size;

  // Clear out shmem.
  Reset();
  started_writes_ = 0;
  finished_writes_ = 0;

  // Event used to start all the threads processing at once.
  Event run;

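  // Keeps the index-polling thread below running; cleared once the writers
  // are done.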
  ::std::atomic<bool> poll_index{true};

  // List of threads.
  ::std::vector<ThreadState> threads(num_threads_);

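  // Background thread which hammers LatestIndex() while the writers race, and
  // checks that the index it observes stays bounded by the started/finished
  // write counters sampled around it.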
  ::std::thread queue_index_racer([this, &poll_index]() {
    LocklessQueueReader reader(queue_);

    // Track the number of times we wrap, and cache the modulo.
    uint64_t wrap_count = 0;
    uint32_t last_queue_index = 0;
    const uint32_t max_queue_index =
        QueueIndex::MaxIndex(0xffffffffu, queue_.config().queue_size);
    while (poll_index) {
      // We want to read everything backwards. This will give us conservative
      // bounds. And with enough time and randomness, we will see all the cases
      // we care to see.

      // These 3 numbers look at the same thing, but at different points in
      // time in the process. The process (essentially) looks like:
      //
      // ++started_writes;
      // ++latest_queue_index;
      // ++finished_writes;
      //
      // We want to check that latest_queue_index is bounded by the number of
      // writes started and finished. Since latest_queue_index is an index
      // rather than a count, it reads one low, so the invariants are
      // finished_writes <= latest_queue_index + 1 and
      // latest_queue_index + 1 <= started_writes. And everything always
      // increases. So, if we let more time elapse between sampling
      // finished_writes and latest_queue_index, we will only be relaxing our
      // bounds, not invalidating the check. The same goes for started_writes.
      //
      // So, grab them in order.
      const uint64_t finished_writes = finished_writes_.load();
      const QueueIndex latest_queue_index_queue_index = reader.LatestIndex();
      const uint64_t started_writes = started_writes_.load();

      const uint32_t latest_queue_index_uint32_t =
          latest_queue_index_queue_index.index();
      uint64_t latest_queue_index = latest_queue_index_uint32_t;

      if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
        // If we got smaller, we wrapped.
        if (latest_queue_index_uint32_t < last_queue_index) {
          ++wrap_count;
        }
        // And apply it.
        latest_queue_index +=
            static_cast<uint64_t>(max_queue_index) * wrap_count;
        last_queue_index = latest_queue_index_uint32_t;
      }

      // For grins, check that we have always started more than we finished.
      // Should never fail.
      EXPECT_GE(started_writes, finished_writes);

      // If we are at the beginning, the queue needs to always return empty.
      if (started_writes == 0) {
        EXPECT_EQ(latest_queue_index_queue_index, QueueIndex::Invalid());
        EXPECT_EQ(finished_writes, 0);
      } else {
        if (finished_writes == 0) {
          // Plausible to be at the beginning, in which case we don't have
          // anything to check.
          if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
            // Otherwise, we have started. The queue can't have any more
            // entries than this.
            EXPECT_GE(started_writes, latest_queue_index + 1);
          }
        } else {
          EXPECT_NE(latest_queue_index_queue_index, QueueIndex::Invalid());
          // latest_queue_index is an index, not a count. So it always reads 1
          // low.
          if (check_writes_and_reads_) {
            EXPECT_GE(latest_queue_index + 1, finished_writes);
          }
        }
      }
    }
  });

  // Build up each thread and kick it off.
  int thread_index = 0;
  for (ThreadState &t : threads) {
    if (will_wrap) {
      t.event_count = ::std::numeric_limits<uint64_t>::max();
    } else {
      t.event_count = 0;
    }
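    // Each writer stamps its thread index and a per-thread sequence number
    // into the queue as fast as it can, checking along the way that nobody
    // else is writing into its scratch buffer.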
    t.thread = ::std::thread([this, &t, thread_index, &run,
                              write_wrap_count]() {
      LocklessQueueSender sender =
          LocklessQueueSender::Make(queue_, channel_storage_duration_).value();
      CHECK_GE(sender.size(), sizeof(ThreadPlusCount));

      // Signal that we are ready to start sending.
      t.ready.Set();

      // Wait until signaled to start running.
      run.Wait();

      // Gogogo!
      for (uint64_t i = 0;
           i < num_messages_ * static_cast<uint64_t>(1 + write_wrap_count);
           ++i) {
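        // Fill the tail of the scratch buffer with a known pattern, then
        // verify nobody else scribbled over it before stamping in our
        // ThreadPlusCount.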
        char *const data = static_cast<char *>(sender.Data()) + sender.size() -
                           sizeof(ThreadPlusCount);
        const char fill = (i + 55) & 0xFF;
        memset(data, fill, sizeof(ThreadPlusCount));
        {
          bool found_nonzero = false;
          for (size_t i = 0; i < sizeof(ThreadPlusCount); ++i) {
            if (data[i] != fill) {
              found_nonzero = true;
            }
          }
          CHECK(!found_nonzero) << ": Somebody else is writing to our buffer";
        }

        ThreadPlusCount tpc;
        tpc.thread = thread_index;
        tpc.count = i;

        memcpy(data, &tpc, sizeof(ThreadPlusCount));

        if (i % 0x800000 == 0x100000) {
          fprintf(
              stderr, "Sent %" PRIu64 ", %f %%\n", i,
              static_cast<double>(i) /
                  static_cast<double>(num_messages_ * (1 + write_wrap_count)) *
                  100.0);
        }

        ++started_writes_;
        auto result =
            sender.Send(sizeof(ThreadPlusCount), aos::monotonic_clock::min_time,
                        aos::realtime_clock::min_time, 0xffffffff, UUID::Zero(),
                        nullptr, nullptr, nullptr);

        CHECK(std::find(expected_send_results_.begin(),
                        expected_send_results_.end(),
                        result) != expected_send_results_.end())
            << "Unexpected send result: " << result;

        // Blank out the new scratch buffer, to catch other people using it.
        {
          char *const new_data = static_cast<char *>(sender.Data()) +
                                 sender.size() - sizeof(ThreadPlusCount);
          const char new_fill = ~fill;
          memset(new_data, new_fill, sizeof(ThreadPlusCount));
        }
        ++finished_writes_;
      }
    });
    ++thread_index;
  }

  // Wait until all the threads are ready.
  for (ThreadState &t : threads) {
    t.ready.Wait();
  }

  // And start them racing.
  run.Set();

  // Let all the threads finish before reading if we are not supposed to be
  // racing reads.
  if (!race_reads) {
    for (ThreadState &t : threads) {
      t.thread.join();
    }
    poll_index = false;
    queue_index_racer.join();
  }

  if (check_writes_and_reads_) {
    CheckReads(race_reads, write_wrap_count, &threads);
  }

  // Reap all the threads.
  if (race_reads) {
    for (ThreadState &t : threads) {
      t.thread.join();
    }
    poll_index = false;
    queue_index_racer.join();
  }

  if (check_writes_and_reads_) {
    // Confirm that the number of writes matches the expected number of writes.
    ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
              started_writes_);
    ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
              finished_writes_);

    // And that every thread sent the right number of messages.
    for (ThreadState &t : threads) {
      if (will_wrap) {
        if (!race_reads) {
          // If we are wrapping, there is a possibility that a thread writes
          // everything *before* we can read any of it, and it all gets
          // overwritten.
          ASSERT_TRUE(t.event_count == ::std::numeric_limits<uint64_t>::max() ||
                      t.event_count == (1 + write_wrap_count) * num_messages_)
              << ": Got " << t.event_count << " events, expected "
              << (1 + write_wrap_count) * num_messages_;
        }
      } else {
        ASSERT_EQ(t.event_count, num_messages_);
      }
    }
  }
}

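// Reads the queue back after (or while) the writers run and verifies that
// every message which should still be present is GOOD, that send times never
// go backwards, and that each thread's per-thread counts are consistent.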
void QueueRacer::CheckReads(bool race_reads, int write_wrap_count,
                            ::std::vector<ThreadState> *threads) {
  // Now read back the results to double check.
  LocklessQueueReader reader(queue_);
  const bool will_wrap = num_messages_ * num_threads_ * (1 + write_wrap_count) >
                         LocklessQueueSize(queue_.memory());

  monotonic_clock::time_point last_monotonic_sent_time =
      monotonic_clock::epoch();
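  // If the queue wrapped, the oldest messages have been overwritten, so start
  // reading at the first index which can still be in the queue.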
  uint64_t initial_i = 0;
  if (will_wrap) {
    initial_i = (1 + write_wrap_count) * num_messages_ * num_threads_ -
                LocklessQueueSize(queue_.memory());
  }

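  // Walk every message index we expect to be able to read, in order.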
  for (uint64_t i = initial_i;
       i < (1 + write_wrap_count) * num_messages_ * num_threads_; ++i) {
    monotonic_clock::time_point monotonic_sent_time;
    realtime_clock::time_point realtime_sent_time;
    monotonic_clock::time_point monotonic_remote_time;
    realtime_clock::time_point realtime_remote_time;
    UUID source_boot_uuid;
    uint32_t remote_queue_index;
    size_t length;
    char read_data[1024];

    // Handle overflowing the message count for the wrap test.
    const uint32_t wrapped_i =
        i % static_cast<size_t>(QueueIndex::MaxIndex(
                0xffffffffu, LocklessQueueSize(queue_.memory())));
    LocklessQueueReader::Result read_result = reader.Read(
        wrapped_i, &monotonic_sent_time, &realtime_sent_time,
        &monotonic_remote_time, &realtime_remote_time, &remote_queue_index,
        &source_boot_uuid, &length, &(read_data[0]));

    if (race_reads) {
      if (read_result == LocklessQueueReader::Result::NOTHING_NEW) {
        --i;
        continue;
      }
    }

    if (race_reads && will_wrap) {
      if (read_result == LocklessQueueReader::Result::TOO_OLD) {
        continue;
      }
    }
    // Every message should be good.
    ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD) << ": i is " << i;

    // And, confirm that time never went backwards.
    ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
    last_monotonic_sent_time = monotonic_sent_time;

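    // The writers sent these messages with default remote metadata, so the
    // remote times and source boot UUID should read back as the defaults that
    // were passed to Send().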
    EXPECT_EQ(monotonic_remote_time, aos::monotonic_clock::min_time);
    EXPECT_EQ(realtime_remote_time, aos::realtime_clock::min_time);
    EXPECT_EQ(source_boot_uuid, UUID::Zero());

    ThreadPlusCount tpc;
    ASSERT_EQ(length, sizeof(ThreadPlusCount));
    memcpy(&tpc,
           read_data + LocklessQueueMessageDataSize(queue_.memory()) - length,
           sizeof(ThreadPlusCount));

    if (will_wrap) {
      // The queue won't change out from under us, so we should get some amount
      // of the tail end of the messages from a thread.
      // Confirm that once we get our first message, they all show up.
      if ((*threads)[tpc.thread].event_count ==
          ::std::numeric_limits<uint64_t>::max()) {
        (*threads)[tpc.thread].event_count = tpc.count;
      }

      if (race_reads) {
        // Make sure nothing goes backwards. Really not much we can do here.
        ASSERT_LE((*threads)[tpc.thread].event_count, tpc.count)
            << ": Thread " << tpc.thread;
        (*threads)[tpc.thread].event_count = tpc.count;
      } else {
        // Make sure nothing goes backwards. Really not much we can do here.
        ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
            << ": Thread " << tpc.thread;
      }
    } else {
      // Confirm that we see every message counter from every thread.
      ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
          << ": Thread " << tpc.thread;
    }
    ++(*threads)[tpc.thread].event_count;
  }
}

}  // namespace ipc_lib
}  // namespace aos