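// Stress test for LocklessQueue: several writer threads race to send messages
// into a shared-memory queue while a polling thread checks that the latest
// queue index stays bounded by the started/finished write counters.
// CheckReads() then reads everything back and verifies the contents.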
#include "aos/ipc_lib/queue_racer.h"

#include <inttypes.h>
#include <string.h>

#include <atomic>
#include <limits>
#include <thread>
#include <vector>

#include "aos/event.h"
#include "gtest/gtest.h"

namespace aos {
namespace ipc_lib {
namespace {

struct ThreadPlusCount {
  int thread;
  uint64_t count;
};

}  // namespace

struct ThreadState {
  ::std::thread thread;
  Event ready;
  uint64_t event_count = ::std::numeric_limits<uint64_t>::max();
};

QueueRacer::QueueRacer(LocklessQueueMemory *memory, int num_threads,
                       uint64_t num_messages, LocklessQueueConfiguration config)
    : memory_(memory),
      num_threads_(num_threads),
      num_messages_(num_messages),
      config_(config) {
  Reset();
}

void QueueRacer::RunIteration(bool race_reads, int write_wrap_count) {
  const bool will_wrap = num_messages_ * num_threads_ *
                             static_cast<uint64_t>(1 + write_wrap_count) >
                         config_.queue_size;

  // Clear out shmem.
  Reset();
  started_writes_ = 0;
  finished_writes_ = 0;

  // Event used to start all the threads processing at once.
  Event run;

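  // Set to false to tell the index-polling thread below to shut down.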
  ::std::atomic<bool> poll_index{true};

  // List of threads.
  ::std::vector<ThreadState> threads(num_threads_);

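  // Thread which polls the queue's latest index while the writers run, and
  // checks that it stays consistent with the started/finished write counters.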
  ::std::thread queue_index_racer([this, &poll_index]() {
    LocklessQueue queue(memory_, config_);

    // Track the number of times we wrap, and cache the modulo.
    uint64_t wrap_count = 0;
    uint32_t last_queue_index = 0;
    const uint32_t max_queue_index =
        QueueIndex::MaxIndex(0xffffffffu, queue.QueueSize());
    while (poll_index) {
      // We want to read everything backwards. This will give us conservative
      // bounds. And with enough time and randomness, we will see all the cases
      // we care to see.

      // These 3 numbers look at the same thing, but at different points of
      // time in the process. The process (essentially) looks like:
      //
      //   ++started_writes;
      //   ++latest_queue_index;
      //   ++finished_writes;
      //
      // We want to check that latest_queue_index is bounded by the number of
      // writes started and finished. Basically, we can say that
      // finished_writes < latest_queue_index always. And
      // latest_queue_index < started_writes. And everything always increases.
      // So, if we let more time elapse between sampling finished_writes and
      // latest_queue_index, we will only be relaxing our bounds, not
      // invalidating the check. The same goes for started_writes.
      //
      // So, grab them in order.
      const uint64_t finished_writes = finished_writes_.load();
      const QueueIndex latest_queue_index_queue_index =
          queue.LatestQueueIndex();
      const uint64_t started_writes = started_writes_.load();

      const uint32_t latest_queue_index_uint32_t =
          latest_queue_index_queue_index.index();
      uint64_t latest_queue_index = latest_queue_index_uint32_t;

      if (latest_queue_index_queue_index !=
          LocklessQueue::empty_queue_index()) {
        // If we got smaller, we wrapped.
        if (latest_queue_index_uint32_t < last_queue_index) {
          ++wrap_count;
        }
        // And apply it.
        latest_queue_index +=
            static_cast<uint64_t>(max_queue_index) * wrap_count;
        last_queue_index = latest_queue_index_uint32_t;
      }

      // For grins, check that we have always started at least as many writes
      // as we have finished. Should never fail.
      EXPECT_GE(started_writes, finished_writes);

      // If we are at the beginning, the queue needs to always return empty.
      if (started_writes == 0) {
        EXPECT_EQ(latest_queue_index_queue_index,
                  LocklessQueue::empty_queue_index());
        EXPECT_EQ(finished_writes, 0);
      } else {
        if (finished_writes == 0) {
          // Plausible to be at the beginning, in which case we don't have
          // anything to check.
          if (latest_queue_index_queue_index !=
              LocklessQueue::empty_queue_index()) {
            // Otherwise, we have started. The queue can't have any more
            // entries than this.
            EXPECT_GE(started_writes, latest_queue_index + 1);
          }
        } else {
          EXPECT_NE(latest_queue_index_queue_index,
                    LocklessQueue::empty_queue_index());
          // latest_queue_index is an index, not a count. So it always reads 1
          // low.
          EXPECT_GE(latest_queue_index + 1, finished_writes);
        }
      }
    }
  });

  // Build up each thread and kick it off.
  int thread_index = 0;
  for (ThreadState &t : threads) {
    if (will_wrap) {
      t.event_count = ::std::numeric_limits<uint64_t>::max();
    } else {
      t.event_count = 0;
    }
    t.thread = ::std::thread([this, &t, thread_index, &run,
                              write_wrap_count]() {
      // Build up a sender.
      LocklessQueue queue(memory_, config_);
      LocklessQueue::Sender sender = queue.MakeSender().value();
      CHECK_GE(sender.size(), sizeof(ThreadPlusCount));

      // Signal that we are ready to start sending.
      t.ready.Set();

      // Wait until signaled to start running.
      run.Wait();

      // Gogogo!
      for (uint64_t i = 0;
           i < num_messages_ * static_cast<uint64_t>(1 + write_wrap_count);
           ++i) {
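        // Locate where our payload will go (at the end of the scratch buffer)
        // and pre-fill it with a per-iteration pattern so we can detect if
        // anybody else scribbles on it before we send.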
        char *const data = static_cast<char *>(sender.Data()) + sender.size() -
                           sizeof(ThreadPlusCount);
        const char fill = (i + 55) & 0xFF;
        memset(data, fill, sizeof(ThreadPlusCount));
        {
          bool found_nonzero = false;
          for (size_t i = 0; i < sizeof(ThreadPlusCount); ++i) {
            if (data[i] != fill) {
              found_nonzero = true;
            }
          }
          CHECK(!found_nonzero) << ": Somebody else is writing to our buffer";
        }

        ThreadPlusCount tpc;
        tpc.thread = thread_index;
        tpc.count = i;

        memcpy(data, &tpc, sizeof(ThreadPlusCount));

        if (i % 0x800000 == 0x100000) {
          fprintf(
              stderr, "Sent %" PRIu64 ", %f %%\n", i,
              static_cast<double>(i) /
                  static_cast<double>(num_messages_ * (1 + write_wrap_count)) *
                  100.0);
        }

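        // Bump started_writes_ before the Send() and finished_writes_ after
        // it, so the polling thread can bound the latest queue index between
        // the two counters.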
        ++started_writes_;
        sender.Send(sizeof(ThreadPlusCount));
        // Blank out the new scratch buffer, to catch other people using it.
        {
          char *const new_data = static_cast<char *>(sender.Data()) +
                                 sender.size() - sizeof(ThreadPlusCount);
          const char new_fill = ~fill;
          memset(new_data, new_fill, sizeof(ThreadPlusCount));
        }
        ++finished_writes_;
      }
    });
    ++thread_index;
  }

  // Wait until all the threads are ready.
  for (ThreadState &t : threads) {
    t.ready.Wait();
  }

  // And start them racing.
  run.Set();

  // Let all the threads finish before reading if we are not supposed to be
  // racing reads.
  if (!race_reads) {
    for (ThreadState &t : threads) {
      t.thread.join();
    }
    poll_index = false;
    queue_index_racer.join();
  }

  CheckReads(race_reads, write_wrap_count, &threads);

  // Reap all the threads.
  if (race_reads) {
    for (ThreadState &t : threads) {
      t.thread.join();
    }
    poll_index = false;
    queue_index_racer.join();
  }

  // Confirm that the number of writes matches the expected number of writes.
  ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
            started_writes_);
  ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
            finished_writes_);

  // And that every thread sent the right number of messages.
  for (ThreadState &t : threads) {
    if (will_wrap) {
      if (!race_reads) {
        // If we are wrapping, there is a possibility that a thread writes
        // everything *before* we can read any of it, and it all gets
        // overwritten.
        ASSERT_TRUE(t.event_count == ::std::numeric_limits<uint64_t>::max() ||
                    t.event_count == (1 + write_wrap_count) * num_messages_)
            << ": Got " << t.event_count << " events, expected "
            << (1 + write_wrap_count) * num_messages_;
      }
    } else {
      ASSERT_EQ(t.event_count, num_messages_);
    }
  }
}

void QueueRacer::CheckReads(bool race_reads, int write_wrap_count,
                            ::std::vector<ThreadState> *threads) {
  // Now read back the results to double check.
  LocklessQueue queue(memory_, config_);

  const bool will_wrap =
      num_messages_ * num_threads_ * (1 + write_wrap_count) > queue.QueueSize();

  monotonic_clock::time_point last_monotonic_sent_time =
      monotonic_clock::epoch();
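  // If the queue wrapped, the earliest messages have been overwritten, so
  // start reading at the oldest index which can still be in the queue.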
  uint64_t initial_i = 0;
  if (will_wrap) {
    initial_i = (1 + write_wrap_count) * num_messages_ * num_threads_ -
                queue.QueueSize();
  }

  for (uint64_t i = initial_i;
       i < (1 + write_wrap_count) * num_messages_ * num_threads_; ++i) {
    ::aos::monotonic_clock::time_point monotonic_sent_time;
    ::aos::realtime_clock::time_point realtime_sent_time;
    ::aos::monotonic_clock::time_point monotonic_remote_time;
    ::aos::realtime_clock::time_point realtime_remote_time;
    uint32_t remote_queue_index;
    size_t length;
    char read_data[1024];

    // Handle overflowing the message count for the wrap test.
    const uint32_t wrapped_i = i % static_cast<size_t>(QueueIndex::MaxIndex(
                                       0xffffffffu, queue.QueueSize()));
    LocklessQueue::ReadResult read_result =
        queue.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
                   &monotonic_remote_time, &realtime_remote_time,
                   &remote_queue_index, &length, &(read_data[0]));

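    // When racing reads against the writers, the reader can get ahead of
    // them; retry this index until a message shows up.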
    if (race_reads) {
      if (read_result == LocklessQueue::ReadResult::NOTHING_NEW) {
        --i;
        continue;
      }
    }

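    // If the writers lapped us while wrapping, this index has already been
    // overwritten and comes back TOO_OLD; skip it.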
    if (race_reads && will_wrap) {
      if (read_result == LocklessQueue::ReadResult::TOO_OLD) {
        continue;
      }
    }
    // Every message should be good.
    ASSERT_EQ(read_result, LocklessQueue::ReadResult::GOOD) << ": i is " << i;

    // And, confirm that time never went backwards.
    ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
    last_monotonic_sent_time = monotonic_sent_time;

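    // The remote times are never set by this test, so they should still be
    // min_time.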
    EXPECT_EQ(monotonic_remote_time, aos::monotonic_clock::min_time);
    EXPECT_EQ(realtime_remote_time, aos::realtime_clock::min_time);

    ThreadPlusCount tpc;
    ASSERT_EQ(length, sizeof(ThreadPlusCount));
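    // The payload lives at the very end of the message buffer, mirroring
    // where the writer put it.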
    memcpy(&tpc, read_data + queue.message_data_size() - length,
           sizeof(ThreadPlusCount));

    if (will_wrap) {
      // The queue won't change out from under us, so we should get some
      // amount of the tail end of the messages from a thread.
      // Confirm that once we get our first message, they all show up.
      if ((*threads)[tpc.thread].event_count ==
          ::std::numeric_limits<uint64_t>::max()) {
        (*threads)[tpc.thread].event_count = tpc.count;
      }

      if (race_reads) {
        // Make sure nothing goes backwards. Really not much we can do here.
        ASSERT_LE((*threads)[tpc.thread].event_count, tpc.count)
            << ": Thread " << tpc.thread;
        (*threads)[tpc.thread].event_count = tpc.count;
      } else {
        // Make sure nothing goes backwards. Really not much we can do here.
        ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
            << ": Thread " << tpc.thread;
      }
    } else {
      // Confirm that we see every message counter from every thread.
      ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
          << ": Thread " << tpc.thread;
    }
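    // The next message we see from this thread should carry the next count.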
    ++(*threads)[tpc.thread].event_count;
  }
}

}  // namespace ipc_lib
}  // namespace aos