#include "aos/ipc_lib/queue_racer.h"

#include <inttypes.h>
#include <string.h>

#include <limits>

#include "aos/ipc_lib/event.h"
#include "gtest/gtest.h"

namespace aos {
namespace ipc_lib {
namespace {
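
// The payload written into every message: which thread sent it and that
// thread's running message count, so read-back can verify ordering and
// completeness per thread.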
struct ThreadPlusCount {
  int thread;
  uint64_t count;
};

}  // namespace
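
// Per-writer-thread state: the thread itself, an Event used to signal that it
// is ready to send, and how many of its messages have been verified during
// read-back (max() means none have been seen yet).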
struct ThreadState {
  ::std::thread thread;
  Event ready;
  uint64_t event_count = ::std::numeric_limits<uint64_t>::max();
};

QueueRacer::QueueRacer(LocklessQueue queue, int num_threads,
                       uint64_t num_messages)
    : queue_(queue), num_threads_(num_threads), num_messages_(num_messages) {
  Reset();
}

void QueueRacer::RunIteration(bool race_reads, int write_wrap_count) {
  const bool will_wrap = num_messages_ * num_threads_ *
                             static_cast<uint64_t>(1 + write_wrap_count) >
                         queue_.config().queue_size;

  // Clear out shmem.
  Reset();
  started_writes_ = 0;
  finished_writes_ = 0;

  // Event used to start all the threads processing at once.
  Event run;

  ::std::atomic<bool> poll_index{true};

  // List of threads.
  ::std::vector<ThreadState> threads(num_threads_);
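
  // Thread which repeatedly polls LatestIndex() while the writers are running,
  // checking that the latest queue index always stays bounded by the number of
  // finished and started writes.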
  ::std::thread queue_index_racer([this, &poll_index]() {
    LocklessQueueReader reader(queue_);

    // Track the number of times we wrap, and cache the modulo.
    uint64_t wrap_count = 0;
    uint32_t last_queue_index = 0;
    const uint32_t max_queue_index =
        QueueIndex::MaxIndex(0xffffffffu, queue_.config().queue_size);
    while (poll_index) {
      // We want to read everything backwards. This will give us conservative
      // bounds. And with enough time and randomness, we will see all the cases
      // we care to see.

      // These 3 numbers look at the same thing, but at different points of time
      // in the process. The process (essentially) looks like:
      //
      // ++started_writes;
      // ++latest_queue_index;
      // ++finished_writes;
      //
      // We want to check that latest_queue_index is bounded by the number of
      // writes started and finished. Basically, we can say that
      // finished_writes < latest_queue_index always. And
      // latest_queue_index < started_writes. And everything always increases.
      // So, if we let more time elapse between sampling finished_writes and
      // latest_queue_index, we will only be relaxing our bounds, not
      // invalidating the check. The same goes for started_writes.
      //
      // So, grab them in order.
      const uint64_t finished_writes = finished_writes_.load();
      const QueueIndex latest_queue_index_queue_index = reader.LatestIndex();
      const uint64_t started_writes = started_writes_.load();

      const uint32_t latest_queue_index_uint32_t =
          latest_queue_index_queue_index.index();
      uint64_t latest_queue_index = latest_queue_index_uint32_t;

      if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
        // If we got smaller, we wrapped.
        if (latest_queue_index_uint32_t < last_queue_index) {
          ++wrap_count;
        }
        // And apply it.
        latest_queue_index +=
            static_cast<uint64_t>(max_queue_index) * wrap_count;
        last_queue_index = latest_queue_index_uint32_t;
      }

      // For grins, check that we have always started at least as many writes
      // as we have finished. Should never fail.
      EXPECT_GE(started_writes, finished_writes);

      // If we are at the beginning, the queue needs to always return empty.
      if (started_writes == 0) {
        EXPECT_EQ(latest_queue_index_queue_index, QueueIndex::Invalid());
        EXPECT_EQ(finished_writes, 0);
      } else {
        if (finished_writes == 0) {
          // Plausible to be at the beginning, in which case we don't have
          // anything to check.
          if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
            // Otherwise, we have started. The queue can't have any more
            // entries than this.
            EXPECT_GE(started_writes, latest_queue_index + 1);
          }
        } else {
          EXPECT_NE(latest_queue_index_queue_index, QueueIndex::Invalid());
          // latest_queue_index is an index, not a count. So it always reads 1
          // low.
          EXPECT_GE(latest_queue_index + 1, finished_writes);
        }
      }
    }
  });

  // Build up each thread and kick it off.
  int thread_index = 0;
  for (ThreadState &t : threads) {
    if (will_wrap) {
      t.event_count = ::std::numeric_limits<uint64_t>::max();
    } else {
      t.event_count = 0;
    }
    t.thread = ::std::thread([this, &t, thread_index, &run,
                              write_wrap_count]() {
      // Build up a sender.
      LocklessQueueSender sender = LocklessQueueSender::Make(queue_).value();
      CHECK_GE(sender.size(), sizeof(ThreadPlusCount));

      // Signal that we are ready to start sending.
      t.ready.Set();

      // Wait until signaled to start running.
      run.Wait();

      // Gogogo!
      for (uint64_t i = 0;
           i < num_messages_ * static_cast<uint64_t>(1 + write_wrap_count);
           ++i) {
        char *const data = static_cast<char *>(sender.Data()) + sender.size() -
                           sizeof(ThreadPlusCount);
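        // Prefill our slice of the scratch buffer with a known byte pattern,
        // then confirm the pattern is untouched before writing the real
        // payload; any mismatch means another writer is using our buffer.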
        const char fill = (i + 55) & 0xFF;
        memset(data, fill, sizeof(ThreadPlusCount));
        {
          bool found_nonzero = false;
          for (size_t i = 0; i < sizeof(ThreadPlusCount); ++i) {
            if (data[i] != fill) {
              found_nonzero = true;
            }
          }
          CHECK(!found_nonzero) << ": Somebody else is writing to our buffer";
        }

        ThreadPlusCount tpc;
        tpc.thread = thread_index;
        tpc.count = i;

        memcpy(data, &tpc, sizeof(ThreadPlusCount));

        if (i % 0x800000 == 0x100000) {
          fprintf(
              stderr, "Sent %" PRIu64 ", %f %%\n", i,
              static_cast<double>(i) /
                  static_cast<double>(num_messages_ * (1 + write_wrap_count)) *
                  100.0);
        }
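
        // Count the write as started before Send() and as finished after it,
        // matching the ordering the queue_index_racer thread relies on for its
        // bounds checks.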
        ++started_writes_;
        sender.Send(sizeof(ThreadPlusCount));
        // Blank out the new scratch buffer, to catch other people using it.
        {
          char *const new_data = static_cast<char *>(sender.Data()) +
                                 sender.size() - sizeof(ThreadPlusCount);
          const char new_fill = ~fill;
          memset(new_data, new_fill, sizeof(ThreadPlusCount));
        }
        ++finished_writes_;
      }
    });
    ++thread_index;
  }

  // Wait until all the threads are ready.
  for (ThreadState &t : threads) {
    t.ready.Wait();
  }

  // And start them racing.
  run.Set();

  // Let all the threads finish before reading if we are not supposed to be
  // racing reads.
  if (!race_reads) {
    for (ThreadState &t : threads) {
      t.thread.join();
    }
    poll_index = false;
    queue_index_racer.join();
  }

  CheckReads(race_reads, write_wrap_count, &threads);

  // Reap all the threads.
  if (race_reads) {
    for (ThreadState &t : threads) {
      t.thread.join();
    }
    poll_index = false;
    queue_index_racer.join();
  }

  // Confirm that the number of writes matches the expected number of writes.
  ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
            started_writes_);
  ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
            finished_writes_);

  // And that every thread sent the right number of messages.
  for (ThreadState &t : threads) {
    if (will_wrap) {
      if (!race_reads) {
        // If we are wrapping, there is a possibility that a thread writes
        // everything *before* we can read any of it, and it all gets
        // overwritten.
        ASSERT_TRUE(t.event_count == ::std::numeric_limits<uint64_t>::max() ||
                    t.event_count == (1 + write_wrap_count) * num_messages_)
            << ": Got " << t.event_count << " events, expected "
            << (1 + write_wrap_count) * num_messages_;
      }
    } else {
      ASSERT_EQ(t.event_count, num_messages_);
    }
  }
}

void QueueRacer::CheckReads(bool race_reads, int write_wrap_count,
                            ::std::vector<ThreadState> *threads) {
  // Now read back the results to double check.
  LocklessQueueReader reader(queue_);
  const bool will_wrap = num_messages_ * num_threads_ * (1 + write_wrap_count) >
                         LocklessQueueSize(queue_.memory());

  monotonic_clock::time_point last_monotonic_sent_time =
      monotonic_clock::epoch();
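  // When the queue wraps, only the last LocklessQueueSize() messages are still
  // available, so start read-back at the first index that should still be
  // present.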
  uint64_t initial_i = 0;
  if (will_wrap) {
    initial_i = (1 + write_wrap_count) * num_messages_ * num_threads_ -
                LocklessQueueSize(queue_.memory());
  }

  for (uint64_t i = initial_i;
       i < (1 + write_wrap_count) * num_messages_ * num_threads_; ++i) {
    ::aos::monotonic_clock::time_point monotonic_sent_time;
    ::aos::realtime_clock::time_point realtime_sent_time;
    ::aos::monotonic_clock::time_point monotonic_remote_time;
    ::aos::realtime_clock::time_point realtime_remote_time;
    uint32_t remote_queue_index;
    size_t length;
    char read_data[1024];

    // Handle overflowing the message count for the wrap test.
    const uint32_t wrapped_i =
        i % static_cast<size_t>(QueueIndex::MaxIndex(
                0xffffffffu, LocklessQueueSize(queue_.memory())));
    LocklessQueueReader::Result read_result =
        reader.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
                    &monotonic_remote_time, &realtime_remote_time,
                    &remote_queue_index, &length, &(read_data[0]));

    if (race_reads) {
      if (read_result == LocklessQueueReader::Result::NOTHING_NEW) {
        --i;
        continue;
      }
    }

    if (race_reads && will_wrap) {
      if (read_result == LocklessQueueReader::Result::TOO_OLD) {
        continue;
      }
    }
    // Every message should be good.
    ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD) << ": i is " << i;

    // And, confirm that time never went backwards.
    ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
    last_monotonic_sent_time = monotonic_sent_time;
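
    // These sends never set remote times, so they should still hold their
    // default min_time values.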
    EXPECT_EQ(monotonic_remote_time, aos::monotonic_clock::min_time);
    EXPECT_EQ(realtime_remote_time, aos::realtime_clock::min_time);

    ThreadPlusCount tpc;
    ASSERT_EQ(length, sizeof(ThreadPlusCount));
    memcpy(&tpc,
           read_data + LocklessQueueMessageDataSize(queue_.memory()) - length,
           sizeof(ThreadPlusCount));

    if (will_wrap) {
      // The queue won't change out from under us, so we should get some amount
      // of the tail end of the messages from a thread.
      // Confirm that once we get our first message, they all show up.
      if ((*threads)[tpc.thread].event_count ==
          ::std::numeric_limits<uint64_t>::max()) {
        (*threads)[tpc.thread].event_count = tpc.count;
      }

      if (race_reads) {
        // Make sure nothing goes backwards. Really not much we can do here.
        ASSERT_LE((*threads)[tpc.thread].event_count, tpc.count)
            << ": Thread " << tpc.thread;
        (*threads)[tpc.thread].event_count = tpc.count;
      } else {
        // Make sure nothing goes backwards. Really not much we can do here.
        ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
            << ": Thread " << tpc.thread;
      }
    } else {
      // Confirm that we see every message counter from every thread.
      ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
          << ": Thread " << tpc.thread;
    }
    ++(*threads)[tpc.thread].event_count;
  }
}

}  // namespace ipc_lib
}  // namespace aos