#include "aos/ipc_lib/queue_racer.h"

#include <inttypes.h>
#include <string.h>
#include <limits>

#include "aos/event.h"
#include "gtest/gtest.h"

namespace aos {
namespace ipc_lib {
namespace {

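// Payload stuffed into each queue message: which writer thread sent it, and
// that thread's running message count.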
struct ThreadPlusCount {
  int thread;
  uint64_t count;
};

}  // namespace

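// Per-writer-thread bookkeeping: the thread itself, an Event used to
// synchronize startup, and the message count tracked for that thread by
// CheckReads (max() means no message from the thread has been seen yet).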
struct ThreadState {
  ::std::thread thread;
  Event ready;
  uint64_t event_count = ::std::numeric_limits<uint64_t>::max();
};

QueueRacer::QueueRacer(LocklessQueueMemory *memory, int num_threads,
                       uint64_t num_messages, LocklessQueueConfiguration config)
    : memory_(memory),
      num_threads_(num_threads),
      num_messages_(num_messages),
      config_(config) {
  Reset();
}

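// Runs one racing iteration: num_threads_ writer threads each send
// num_messages_ * (1 + write_wrap_count) messages while a background thread
// polls LatestQueueIndex() and checks it against the started/finished write
// counters. CheckReads then verifies whatever is left in the queue.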
void QueueRacer::RunIteration(bool race_reads, int write_wrap_count) {
  const bool will_wrap = num_messages_ * num_threads_ *
                             static_cast<uint64_t>(1 + write_wrap_count) >
                         config_.queue_size;

  // Clear out shmem.
  Reset();
  started_writes_ = 0;
  finished_writes_ = 0;

  // Event used to start all the threads processing at once.
  Event run;

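  // Keeps the index-polling thread running; cleared once the writers are done
  // so it can exit.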
  ::std::atomic<bool> poll_index;
  poll_index = true;

  // List of threads.
  ::std::vector<ThreadState> threads(num_threads_);

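  // Background thread which hammers LatestQueueIndex() while the writers run,
  // checking that the index never violates the bounds implied by
  // started_writes_ and finished_writes_.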
  ::std::thread queue_index_racer([this, &poll_index]() {
    LocklessQueue queue(memory_, config_);

    // Track the number of times we wrap, and cache the modulo.
    uint64_t wrap_count = 0;
    uint32_t last_queue_index = 0;
    const uint32_t max_queue_index =
        QueueIndex::MaxIndex(0xffffffffu, queue.QueueSize());
    while (poll_index) {
      // We want to read everything backwards. This will give us conservative
      // bounds. And with enough time and randomness, we will see all the cases
      // we care to see.

      // These 3 numbers look at the same thing, but at different points of time
      // in the process. The process (essentially) looks like:
      //
      // ++started_writes;
      // ++latest_queue_index;
      // ++finished_writes;
      //
      // We want to check that latest_queue_index is bounded by the number of
      // writes started and finished. Basically, we can say that
      // finished_writes < latest_queue_index always. And
      // latest_queue_index < started_writes. And everything always increases.
      // So, if we let more time elapse between sampling finished_writes and
      // latest_queue_index, we will only be relaxing our bounds, not
      // invalidating the check. The same goes for started_writes.
      //
      // So, grab them in order.
      const uint64_t finished_writes = finished_writes_.load();
      const uint32_t latest_queue_index_uint32_t = queue.LatestQueueIndex();
      const uint64_t started_writes = started_writes_.load();

      uint64_t latest_queue_index = latest_queue_index_uint32_t;

      if (latest_queue_index_uint32_t != LocklessQueue::empty_queue_index()) {
        // If we got smaller, we wrapped.
        if (latest_queue_index_uint32_t < last_queue_index) {
          ++wrap_count;
        }
        // And apply it.
        latest_queue_index +=
            static_cast<uint64_t>(max_queue_index) * wrap_count;
        last_queue_index = latest_queue_index_uint32_t;
      }

      // For grins, check that we have always started at least as many writes
      // as we have finished. Should never fail.
      EXPECT_GE(started_writes, finished_writes);

      // If we are at the beginning, the queue needs to always return empty.
      if (started_writes == 0) {
        EXPECT_EQ(latest_queue_index_uint32_t,
                  LocklessQueue::empty_queue_index());
        EXPECT_EQ(finished_writes, 0);
      } else {
        if (finished_writes == 0) {
          // Plausible to still be at the beginning and read back empty.
          if (latest_queue_index_uint32_t !=
              LocklessQueue::empty_queue_index()) {
            // Otherwise, we have started. The latest queue index can never
            // get ahead of the number of writes that have been started.
            EXPECT_GE(started_writes, latest_queue_index + 1);
          }
        } else {
          EXPECT_NE(latest_queue_index_uint32_t,
                    LocklessQueue::empty_queue_index());
          // latest_queue_index is an index, not a count. So it always reads 1
          // low.
          EXPECT_GE(latest_queue_index + 1, finished_writes);
        }
      }
    }
  });

  // Build up each thread and kick it off.
  int thread_index = 0;
  for (ThreadState &t : threads) {
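    // When wrapping, a thread's early messages may be overwritten before
    // CheckReads sees any of them, so start from the "nothing seen yet"
    // sentinel. Otherwise every message must show up, starting at count 0.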
    if (will_wrap) {
      t.event_count = ::std::numeric_limits<uint64_t>::max();
    } else {
      t.event_count = 0;
    }
    t.thread =
        ::std::thread([this, &t, thread_index, &run, write_wrap_count]() {
          // Build up a sender.
          LocklessQueue queue(memory_, config_);
          LocklessQueue::Sender sender = queue.MakeSender();

          // Signal that we are ready to start sending.
          t.ready.Set();

          // Wait until signaled to start running.
          run.Wait();

          // Gogogo!
          for (uint64_t i = 0;
               i < num_messages_ * static_cast<uint64_t>(1 + write_wrap_count);
               ++i) {
            char data[sizeof(ThreadPlusCount)];
            ThreadPlusCount tpc;
            tpc.thread = thread_index;
            tpc.count = i;

            memcpy(data, &tpc, sizeof(ThreadPlusCount));

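            // Print occasional progress to stderr so long runs show signs of
            // life.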
            if (i % 0x800000 == 0x100000) {
              fprintf(stderr, "Sent %" PRIu64 ", %f %%\n", i,
                      static_cast<double>(i) /
                          static_cast<double>(num_messages_ *
                                              (1 + write_wrap_count)) *
                          100.0);
            }

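            // Bracket the send with the started/finished counters so the
            // polling thread can bound what LatestQueueIndex() may return.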
            ++started_writes_;
            sender.Send(data, sizeof(ThreadPlusCount));
            ++finished_writes_;
          }
        });
    ++thread_index;
  }

  // Wait until all the threads are ready.
  for (ThreadState &t : threads) {
    t.ready.Wait();
  }

  // And start them racing.
  run.Set();

  // If we are not racing reads against the writers, let all the threads finish
  // before reading anything back.
  if (!race_reads) {
    for (ThreadState &t : threads) {
      t.thread.join();
    }
    poll_index = false;
    queue_index_racer.join();
  }

  CheckReads(race_reads, write_wrap_count, &threads);

  // Reap all the threads.
  if (race_reads) {
    for (ThreadState &t : threads) {
      t.thread.join();
    }
    poll_index = false;
    queue_index_racer.join();
  }

  // Confirm that the number of writes matches the expected number of writes.
  ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
            started_writes_);
  ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
            finished_writes_);

  // And that every thread sent the right number of messages.
  for (ThreadState &t : threads) {
    if (will_wrap) {
      if (!race_reads) {
        // If we are wrapping, there is a possibility that a thread writes
        // everything *before* we can read any of it, and it all gets
        // overwritten.
        ASSERT_TRUE(t.event_count == ::std::numeric_limits<uint64_t>::max() ||
                    t.event_count == (1 + write_wrap_count) * num_messages_)
            << ": Got " << t.event_count << " events, expected "
            << (1 + write_wrap_count) * num_messages_;
      }
    } else {
      ASSERT_EQ(t.event_count, num_messages_);
    }
  }
}

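// Reads back every message which should still be in the queue and verifies
// that the send times are monotonic and that each thread's message counts
// show up in order.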
void QueueRacer::CheckReads(bool race_reads, int write_wrap_count,
                            ::std::vector<ThreadState> *threads) {
  // Now read back the results to double check.
  LocklessQueue queue(memory_, config_);

  const bool will_wrap =
      num_messages_ * num_threads_ * (1 + write_wrap_count) > queue.QueueSize();

  monotonic_clock::time_point last_monotonic_sent_time =
      monotonic_clock::epoch();
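  // If the queue wrapped, only the last queue_size messages can still be
  // present, so start verification at the first index expected to survive.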
  uint64_t initial_i = 0;
  if (will_wrap) {
    initial_i = (1 + write_wrap_count) * num_messages_ * num_threads_ -
                queue.QueueSize();
  }

  for (uint64_t i = initial_i;
       i < (1 + write_wrap_count) * num_messages_ * num_threads_; ++i) {
    ::aos::monotonic_clock::time_point monotonic_sent_time;
    ::aos::realtime_clock::time_point realtime_sent_time;
    size_t length;
    char read_data[1024];

    // Handle overflowing the message count for the wrap test.
    const uint32_t wrapped_i = i % static_cast<size_t>(QueueIndex::MaxIndex(
                                       0xffffffffu, queue.QueueSize()));
    LocklessQueue::ReadResult read_result =
        queue.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
                   &length, &(read_data[0]));

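    // When racing reads against the writers, the message at this index may not
    // have been written yet; retry the same index until it shows up.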
    if (race_reads) {
      if (read_result == LocklessQueue::ReadResult::NOTHING_NEW) {
        --i;
        continue;
      }
    }

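    // Only when racing and wrapping can a message legitimately be overwritten
    // before we get to it; skip those.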
    if (race_reads && will_wrap) {
      if (read_result == LocklessQueue::ReadResult::TOO_OLD) {
        continue;
      }
    }
    // Every message should be good.
    ASSERT_EQ(read_result, LocklessQueue::ReadResult::GOOD) << ": i is " << i;

    // And, confirm that time never went backwards.
    ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
    last_monotonic_sent_time = monotonic_sent_time;

    ThreadPlusCount tpc;
    ASSERT_EQ(length, sizeof(ThreadPlusCount));
    memcpy(&tpc, read_data, sizeof(ThreadPlusCount));

    if (will_wrap) {
      // The queue won't change out from under us, so we should get some amount
      // of the tail end of the messages from a thread.
      // Confirm that once we get our first message, they all show up.
      if ((*threads)[tpc.thread].event_count ==
          ::std::numeric_limits<uint64_t>::max()) {
        (*threads)[tpc.thread].event_count = tpc.count;
      }

      if (race_reads) {
        // Make sure nothing goes backwards. Really not much we can do here.
        ASSERT_LE((*threads)[tpc.thread].event_count, tpc.count)
            << ": Thread " << tpc.thread;
        (*threads)[tpc.thread].event_count = tpc.count;
      } else {
        // Make sure nothing goes backwards. Really not much we can do here.
        ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
            << ": Thread " << tpc.thread;
      }
    } else {
      // Confirm that we see every message counter from every thread.
      ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
          << ": Thread " << tpc.thread;
    }
    ++(*threads)[tpc.thread].event_count;
  }
}

}  // namespace ipc_lib
}  // namespace aos