#include "aos/ipc_lib/queue_racer.h"

#include <inttypes.h>
#include <string.h>
#include <limits>

#include "aos/event.h"
#include "gtest/gtest.h"

namespace aos {
namespace ipc_lib {
namespace {

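// The payload each writer thread puts in every message: which thread wrote it
// and that thread's running message count.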
struct ThreadPlusCount {
  int thread;
  uint64_t count;
};

}  // namespace

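// Per-writer bookkeeping: the writer thread itself, an event it sets once it
// is ready to send, and the next message count expected from it while
// checking reads (max() means no message from it has been seen yet).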
struct ThreadState {
  ::std::thread thread;
  Event ready;
  uint64_t event_count = ::std::numeric_limits<uint64_t>::max();
};

QueueRacer::QueueRacer(LocklessQueueMemory *memory, int num_threads,
                       uint64_t num_messages, LocklessQueueConfiguration config)
    : memory_(memory),
      num_threads_(num_threads),
      num_messages_(num_messages),
      config_(config) {
  Reset();
}

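// A sketch of how this class might be driven (hypothetical driver; the real
// tests that use QueueRacer live elsewhere):
//
//   QueueRacer racer(memory, /*num_threads=*/8, /*num_messages=*/100, config);
//   for (int i = 0; i < 100; ++i) {
//     racer.RunIteration(/*race_reads=*/true, /*write_wrap_count=*/0);
//   }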
void QueueRacer::RunIteration(bool race_reads, int write_wrap_count) {
  const bool will_wrap = num_messages_ * num_threads_ *
                             static_cast<uint64_t>(1 + write_wrap_count) >
                         config_.queue_size;

  // Clear out shmem.
  Reset();
  started_writes_ = 0;
  finished_writes_ = 0;

  // Event used to start all the threads processing at once.
  Event run;

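  // While this stays true, the queue_index_racer thread below keeps polling
  // LatestQueueIndex and checking it against the write counters.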
  ::std::atomic<bool> poll_index;
  poll_index = true;

  // List of threads.
  ::std::vector<ThreadState> threads(num_threads_);

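  // Background checker: repeatedly sample the write counters and the latest
  // queue index while the writers run, and verify the ordering invariants
  // documented inside the loop.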
  ::std::thread queue_index_racer([this, &poll_index]() {
    LocklessQueue queue(memory_, config_);

    // Track the number of times we wrap, and cache the modulo.
    uint64_t wrap_count = 0;
    uint32_t last_queue_index = 0;
    const uint32_t max_queue_index =
        QueueIndex::MaxIndex(0xffffffffu, queue.QueueSize());
    while (poll_index) {
      // We want to read everything backwards.  This will give us conservative
      // bounds.  And with enough time and randomness, we will see all the
      // cases we care to see.
      //
      // These 3 numbers look at the same thing, but at different points in
      // time in the process.  The process (essentially) looks like:
      //
      //   ++started_writes;
      //   ++latest_queue_index;
      //   ++finished_writes;
      //
      // We want to check that latest_queue_index is bounded by the number of
      // writes started and finished.  Basically, finished_writes <=
      // latest_queue_index + 1 and latest_queue_index + 1 <= started_writes
      // always hold, and everything always increases.  So, if we let more
      // time elapse between sampling finished_writes and latest_queue_index,
      // we will only be relaxing our bounds, not invalidating the check.  The
      // same goes for started_writes.
      //
      // So, grab them in order.
      const uint64_t finished_writes = finished_writes_.load();
      const QueueIndex latest_queue_index_queue_index =
          queue.LatestQueueIndex();
      const uint64_t started_writes = started_writes_.load();

      const uint32_t latest_queue_index_uint32_t =
          queue.LatestQueueIndex().index();
      uint64_t latest_queue_index = latest_queue_index_uint32_t;

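      // Fold the 32-bit queue index and the observed wrap count into a 64-bit
      // monotonically increasing index so it can be compared against the
      // 64-bit write counters.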
      if (latest_queue_index_queue_index !=
          LocklessQueue::empty_queue_index()) {
        // If we got smaller, we wrapped.
        if (latest_queue_index_uint32_t < last_queue_index) {
          ++wrap_count;
        }
        // And apply it.
        latest_queue_index +=
            static_cast<uint64_t>(max_queue_index) * wrap_count;
        last_queue_index = latest_queue_index_uint32_t;
      }

      // For grins, check that we have always started at least as many writes
      // as we have finished.  Should never fail.
      EXPECT_GE(started_writes, finished_writes);

      // If we are at the beginning, the queue needs to always return empty.
      if (started_writes == 0) {
        EXPECT_EQ(latest_queue_index_queue_index,
                  LocklessQueue::empty_queue_index());
        EXPECT_EQ(finished_writes, 0);
      } else {
        if (finished_writes == 0) {
          // With no finished writes, it is plausible that the queue is still
          // empty.
          if (latest_queue_index_queue_index !=
              LocklessQueue::empty_queue_index()) {
            // Otherwise, writing has started, and the number of started
            // writes must be at least the number of queue indices handed out.
            EXPECT_GE(started_writes, latest_queue_index + 1);
          }
        } else {
          EXPECT_NE(latest_queue_index_queue_index,
                    LocklessQueue::empty_queue_index());
          // latest_queue_index is an index, not a count, so it always reads 1
          // low.
          EXPECT_GE(latest_queue_index + 1, finished_writes);
        }
      }
    }
  });

  // Build up each thread and kick it off.
  int thread_index = 0;
  for (ThreadState &t : threads) {
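    // When the queue can wrap, leave event_count at the max() sentinel
    // meaning "no messages from this thread seen yet"; otherwise every
    // message should be read back, starting at count 0.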
    if (will_wrap) {
      t.event_count = ::std::numeric_limits<uint64_t>::max();
    } else {
      t.event_count = 0;
    }
    t.thread =
        ::std::thread([this, &t, thread_index, &run, write_wrap_count]() {
          // Build up a sender.
          LocklessQueue queue(memory_, config_);
          LocklessQueue::Sender sender = queue.MakeSender();

          // Signal that we are ready to start sending.
          t.ready.Set();

          // Wait until signaled to start running.
          run.Wait();

          // Gogogo!
          for (uint64_t i = 0;
               i < num_messages_ * static_cast<uint64_t>(1 + write_wrap_count);
               ++i) {
            char data[sizeof(ThreadPlusCount)];
            ThreadPlusCount tpc;
            tpc.thread = thread_index;
            tpc.count = i;

            memcpy(data, &tpc, sizeof(ThreadPlusCount));

            if (i % 0x800000 == 0x100000) {
              fprintf(stderr, "Sent %" PRIu64 ", %f %%\n", i,
                      static_cast<double>(i) /
                          static_cast<double>(num_messages_ *
                                              (1 + write_wrap_count)) *
                          100.0);
            }

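            // Bump started_writes_ before the Send and finished_writes_ after
            // it so the polling thread can bound the latest queue index from
            // both sides.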
            ++started_writes_;
            sender.Send(data, sizeof(ThreadPlusCount));
            ++finished_writes_;
          }
        });
    ++thread_index;
  }

  // Wait until all the threads are ready.
  for (ThreadState &t : threads) {
    t.ready.Wait();
  }

  // And start them racing.
  run.Set();

  // If we aren't racing reads, let all the threads finish before reading.
  if (!race_reads) {
    for (ThreadState &t : threads) {
      t.thread.join();
    }
    poll_index = false;
    queue_index_racer.join();
  }

  CheckReads(race_reads, write_wrap_count, &threads);

  // Reap all the threads.
  if (race_reads) {
    for (ThreadState &t : threads) {
      t.thread.join();
    }
    poll_index = false;
    queue_index_racer.join();
  }

  // Confirm that the number of writes matches the expected number of writes.
  ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
            started_writes_);
  ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
            finished_writes_);

  // And that every thread sent the right number of messages.
  for (ThreadState &t : threads) {
    if (will_wrap) {
      if (!race_reads) {
        // If we are wrapping, there is a possibility that a thread writes
        // everything *before* we can read any of it, and it all gets
        // overwritten.
        ASSERT_TRUE(t.event_count == ::std::numeric_limits<uint64_t>::max() ||
                    t.event_count == (1 + write_wrap_count) * num_messages_)
            << ": Got " << t.event_count << " events, expected "
            << (1 + write_wrap_count) * num_messages_;
      }
    } else {
      ASSERT_EQ(t.event_count, num_messages_);
    }
  }
}

void QueueRacer::CheckReads(bool race_reads, int write_wrap_count,
                            ::std::vector<ThreadState> *threads) {
  // Now read back the results to double check.
  LocklessQueue queue(memory_, config_);

  const bool will_wrap =
      num_messages_ * num_threads_ * (1 + write_wrap_count) > queue.QueueSize();

  monotonic_clock::time_point last_monotonic_sent_time =
      monotonic_clock::epoch();
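  // If the queue wrapped, only the newest queue_size messages can still be
  // present, so start checking at the oldest message that can still be there.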
  uint64_t initial_i = 0;
  if (will_wrap) {
    initial_i = (1 + write_wrap_count) * num_messages_ * num_threads_ -
                queue.QueueSize();
  }

  for (uint64_t i = initial_i;
       i < (1 + write_wrap_count) * num_messages_ * num_threads_; ++i) {
    ::aos::monotonic_clock::time_point monotonic_sent_time;
    ::aos::realtime_clock::time_point realtime_sent_time;
    size_t length;
    char read_data[1024];

    // Handle overflowing the message count for the wrap test.
    const uint32_t wrapped_i = i % static_cast<size_t>(QueueIndex::MaxIndex(
                                       0xffffffffu, queue.QueueSize()));
    LocklessQueue::ReadResult read_result =
        queue.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
                   &length, &(read_data[0]));

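    // When racing against the writers, the message at this index may simply
    // not have been written yet (NOTHING_NEW), so retry it.  If the writers
    // can also wrap, they may have lapped us and overwritten it (TOO_OLD), so
    // skip it.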
    if (race_reads) {
      if (read_result == LocklessQueue::ReadResult::NOTHING_NEW) {
        --i;
        continue;
      }
    }

    if (race_reads && will_wrap) {
      if (read_result == LocklessQueue::ReadResult::TOO_OLD) {
        continue;
      }
    }
    // Every message should be good.
    ASSERT_EQ(read_result, LocklessQueue::ReadResult::GOOD) << ": i is " << i;

    // And, confirm that time never went backwards.
    ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
    last_monotonic_sent_time = monotonic_sent_time;

    ThreadPlusCount tpc;
    ASSERT_EQ(length, sizeof(ThreadPlusCount));
    memcpy(&tpc, read_data, sizeof(ThreadPlusCount));

    if (will_wrap) {
      // The queue won't change out from under us, so we should get some
      // amount of the tail end of the messages from a thread.  Confirm that
      // once we get our first message, they all show up.
      if ((*threads)[tpc.thread].event_count ==
          ::std::numeric_limits<uint64_t>::max()) {
        (*threads)[tpc.thread].event_count = tpc.count;
      }

      if (race_reads) {
        // Make sure nothing goes backwards.  Really not much we can do here.
        ASSERT_LE((*threads)[tpc.thread].event_count, tpc.count)
            << ": Thread " << tpc.thread;
        (*threads)[tpc.thread].event_count = tpc.count;
      } else {
        // Make sure nothing goes backwards.  Really not much we can do here.
        ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
            << ": Thread " << tpc.thread;
      }
    } else {
      // Confirm that we see every message counter from every thread.
      ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
          << ": Thread " << tpc.thread;
    }
    ++(*threads)[tpc.thread].event_count;
  }
}

}  // namespace ipc_lib
}  // namespace aos