#include "aos/ipc_lib/queue_racer.h"

#include <inttypes.h>
#include <string.h>

#include <atomic>
#include <limits>
#include <thread>
#include <vector>

#include "aos/event.h"
#include "gtest/gtest.h"

namespace aos {
namespace ipc_lib {
namespace {

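// Payload written into every message: which thread sent it, plus that
// thread's own monotonically increasing message counter.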
struct ThreadPlusCount {
  int thread;
  uint64_t count;
};

}  // namespace

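// Per-writer bookkeeping.  event_count tracks the next per-thread counter we
// expect to read back; the uint64_t max sentinel means "haven't seen anything
// from this thread yet".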
struct ThreadState {
  ::std::thread thread;
  Event ready;
  uint64_t event_count = ::std::numeric_limits<uint64_t>::max();
};

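// A test typically constructs one of these around an already-configured
// LocklessQueue and then calls RunIteration repeatedly, e.g. (illustrative
// values only; the real setup lives in the lockless queue tests):
//
//   QueueRacer racer(queue, /*num_threads=*/10, /*num_messages=*/100);
//   for (int iteration = 0; iteration < 5; ++iteration) {
//     racer.RunIteration(/*race_reads=*/false, /*write_wrap_count=*/0);
//   }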
QueueRacer::QueueRacer(LocklessQueue queue, int num_threads,
                       uint64_t num_messages)
    : queue_(queue), num_threads_(num_threads), num_messages_(num_messages) {
  Reset();
}

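// One racing iteration: spin up num_threads_ writers which each send
// num_messages_ * (1 + write_wrap_count) messages as fast as they can, watch
// LatestIndex() from a polling thread while they run, and then read
// everything back to verify nothing was lost or reordered.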
void QueueRacer::RunIteration(bool race_reads, int write_wrap_count) {
  const bool will_wrap = num_messages_ * num_threads_ *
                             static_cast<uint64_t>(1 + write_wrap_count) >
                         queue_.config().queue_size;

  // Clear out shmem.
  Reset();
  started_writes_ = 0;
  finished_writes_ = 0;

  // Event used to start all the threads processing at once.
  Event run;

  ::std::atomic<bool> poll_index{true};

  // List of threads.
  ::std::vector<ThreadState> threads(num_threads_);

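  // Background thread which hammers LatestIndex() while the writers run and
  // checks that the index it sees always stays within the bounds implied by
  // started_writes_ and finished_writes_.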
  ::std::thread queue_index_racer([this, &poll_index]() {
    LocklessQueueReader reader(queue_);

    // Track the number of times we wrap, and cache the modulo.
    uint64_t wrap_count = 0;
    uint32_t last_queue_index = 0;
    const uint32_t max_queue_index =
        QueueIndex::MaxIndex(0xffffffffu, queue_.config().queue_size);
    while (poll_index) {
      // We want to read everything backwards. This will give us conservative
      // bounds. And with enough time and randomness, we will see all the cases
      // we care to see.

      // These 3 numbers look at the same thing, but at different points of time
      // in the process. The process (essentially) looks like:
      //
      // ++started_writes;
      // ++latest_queue_index;
      // ++finished_writes;
      //
      // We want to check that latest_queue_index is bounded by the number of
      // writes started and finished. Basically, we can say that
      // finished_writes <= latest_queue_index + 1 <= started_writes always,
      // and everything always increases. So, if we let more time elapse
      // between sampling finished_writes and latest_queue_index, we will only
      // be relaxing our bounds, not invalidating the check. The same goes for
      // started_writes.
      //
      // So, grab them in order.
      const uint64_t finished_writes = finished_writes_.load();
      const QueueIndex latest_queue_index_queue_index = reader.LatestIndex();
      const uint64_t started_writes = started_writes_.load();

      const uint32_t latest_queue_index_uint32_t =
          latest_queue_index_queue_index.index();
      uint64_t latest_queue_index = latest_queue_index_uint32_t;

      if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
        // If we got smaller, we wrapped.
        if (latest_queue_index_uint32_t < last_queue_index) {
          ++wrap_count;
        }
        // And apply it.
        latest_queue_index +=
            static_cast<uint64_t>(max_queue_index) * wrap_count;
        last_queue_index = latest_queue_index_uint32_t;
      }

      // For grins, check that we have always started at least as many writes
      // as we have finished. Should never fail.
      EXPECT_GE(started_writes, finished_writes);

      // If we are at the beginning, the queue needs to always return empty.
      if (started_writes == 0) {
        EXPECT_EQ(latest_queue_index_queue_index, QueueIndex::Invalid());
        EXPECT_EQ(finished_writes, 0);
      } else {
        if (finished_writes == 0) {
          // Plausible to be at the beginning, in which case we don't have
          // anything to check.
          if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
            // Otherwise, we have started. The queue can't have any more
            // entries than this.
            EXPECT_GE(started_writes, latest_queue_index + 1);
          }
        } else {
          EXPECT_NE(latest_queue_index_queue_index, QueueIndex::Invalid());
          // latest_queue_index is an index, not a count. So it always reads 1
          // low.
          EXPECT_GE(latest_queue_index + 1, finished_writes);
        }
      }
    }
  });

  // Build up each thread and kick it off.
  int thread_index = 0;
  for (ThreadState &t : threads) {
    if (will_wrap) {
      t.event_count = ::std::numeric_limits<uint64_t>::max();
    } else {
      t.event_count = 0;
    }
    t.thread = ::std::thread([this, &t, thread_index, &run,
                              write_wrap_count]() {
      // Build up a sender.
      LocklessQueueSender sender = LocklessQueueSender::Make(queue_).value();
      CHECK_GE(sender.size(), sizeof(ThreadPlusCount));

      // Signal that we are ready to start sending.
      t.ready.Set();

      // Wait until signaled to start running.
      run.Wait();

      // Gogogo!
      for (uint64_t i = 0;
           i < num_messages_ * static_cast<uint64_t>(1 + write_wrap_count);
           ++i) {
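        // Write a known fill pattern into our scratch region and immediately
        // read it back; if anything differs, another thread is scribbling on
        // a buffer which should be exclusively ours.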
        char *const data = static_cast<char *>(sender.Data()) + sender.size() -
                           sizeof(ThreadPlusCount);
        const char fill = (i + 55) & 0xFF;
        memset(data, fill, sizeof(ThreadPlusCount));
        {
          bool found_nonzero = false;
          for (size_t i = 0; i < sizeof(ThreadPlusCount); ++i) {
            if (data[i] != fill) {
              found_nonzero = true;
            }
          }
          CHECK(!found_nonzero) << ": Somebody else is writing to our buffer";
        }

        ThreadPlusCount tpc;
        tpc.thread = thread_index;
        tpc.count = i;

        memcpy(data, &tpc, sizeof(ThreadPlusCount));

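        // Periodically report progress so long runs are visibly alive.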
        if (i % 0x800000 == 0x100000) {
          fprintf(
              stderr, "Sent %" PRIu64 ", %f %%\n", i,
              static_cast<double>(i) /
                  static_cast<double>(num_messages_ * (1 + write_wrap_count)) *
                  100.0);
        }

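        // Bump started_writes_ before the Send and finished_writes_ after it
        // so the index-polling thread's bounds checks stay valid.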
        ++started_writes_;
        sender.Send(sizeof(ThreadPlusCount));
        // Blank out the new scratch buffer, to catch other people using it.
        {
          char *const new_data = static_cast<char *>(sender.Data()) +
                                 sender.size() - sizeof(ThreadPlusCount);
          const char new_fill = ~fill;
          memset(new_data, new_fill, sizeof(ThreadPlusCount));
        }
        ++finished_writes_;
      }
    });
    ++thread_index;
  }

  // Wait until all the threads are ready.
  for (ThreadState &t : threads) {
    t.ready.Wait();
  }

  // And start them racing.
  run.Set();

  // Let all the threads finish before reading if we are not supposed to be
  // racing reads.
  if (!race_reads) {
    for (ThreadState &t : threads) {
      t.thread.join();
    }
    poll_index = false;
    queue_index_racer.join();
  }

  CheckReads(race_reads, write_wrap_count, &threads);

  // Reap all the threads.
  if (race_reads) {
    for (ThreadState &t : threads) {
      t.thread.join();
    }
    poll_index = false;
    queue_index_racer.join();
  }

  // Confirm that the number of writes matches the expected number of writes.
  ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
            started_writes_);
  ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
            finished_writes_);

  // And that every thread sent the right number of messages.
  for (ThreadState &t : threads) {
    if (will_wrap) {
      if (!race_reads) {
        // If we are wrapping, there is a possibility that a thread writes
        // everything *before* we can read any of it, and it all gets
        // overwritten.
        ASSERT_TRUE(t.event_count == ::std::numeric_limits<uint64_t>::max() ||
                    t.event_count == (1 + write_wrap_count) * num_messages_)
            << ": Got " << t.event_count << " events, expected "
            << (1 + write_wrap_count) * num_messages_;
      }
    } else {
      ASSERT_EQ(t.event_count, num_messages_);
    }
  }
}

void QueueRacer::CheckReads(bool race_reads, int write_wrap_count,
                            ::std::vector<ThreadState> *threads) {
  // Now read back the results to double check.
  LocklessQueueReader reader(queue_);
  const bool will_wrap = num_messages_ * num_threads_ * (1 + write_wrap_count) >
                         LocklessQueueSize(queue_.memory());

  monotonic_clock::time_point last_monotonic_sent_time =
      monotonic_clock::epoch();
  uint64_t initial_i = 0;
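  // If the writers wrapped the queue, everything before the last
  // LocklessQueueSize() messages has been overwritten, so start reading at
  // the first index which can still be present.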
  if (will_wrap) {
    initial_i = (1 + write_wrap_count) * num_messages_ * num_threads_ -
                LocklessQueueSize(queue_.memory());
  }

  for (uint64_t i = initial_i;
       i < (1 + write_wrap_count) * num_messages_ * num_threads_; ++i) {
    ::aos::monotonic_clock::time_point monotonic_sent_time;
    ::aos::realtime_clock::time_point realtime_sent_time;
    ::aos::monotonic_clock::time_point monotonic_remote_time;
    ::aos::realtime_clock::time_point realtime_remote_time;
    uint32_t remote_queue_index;
    size_t length;
    char read_data[1024];

    // Handle overflowing the message count for the wrap test.
    const uint32_t wrapped_i =
        i % static_cast<size_t>(QueueIndex::MaxIndex(
                0xffffffffu, LocklessQueueSize(queue_.memory())));
    LocklessQueueReader::Result read_result =
        reader.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
                    &monotonic_remote_time, &realtime_remote_time,
                    &remote_queue_index, &length, &(read_data[0]));

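    // When racing the writers, they may simply not have gotten to index i
    // yet; retry the same index until something shows up.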
    if (race_reads) {
      if (read_result == LocklessQueueReader::Result::NOTHING_NEW) {
        --i;
        continue;
      }
    }

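    // When racing and wrapping, the writers may have already lapped this
    // index; skip messages which have been overwritten.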
    if (race_reads && will_wrap) {
      if (read_result == LocklessQueueReader::Result::TOO_OLD) {
        continue;
      }
    }
    // Every message should be good.
    ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD) << ": i is " << i;

    // And, confirm that time never went backwards.
    ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
    last_monotonic_sent_time = monotonic_sent_time;

    EXPECT_EQ(monotonic_remote_time, aos::monotonic_clock::min_time);
    EXPECT_EQ(realtime_remote_time, aos::realtime_clock::min_time);

    ThreadPlusCount tpc;
    ASSERT_EQ(length, sizeof(ThreadPlusCount));
    memcpy(&tpc,
           read_data + LocklessQueueMessageDataSize(queue_.memory()) - length,
           sizeof(ThreadPlusCount));

    if (will_wrap) {
      // The queue won't change out from under us, so we should get some amount
      // of the tail end of the messages from a thread.
      // Confirm that once we get our first message, they all show up.
      if ((*threads)[tpc.thread].event_count ==
          ::std::numeric_limits<uint64_t>::max()) {
        (*threads)[tpc.thread].event_count = tpc.count;
      }

      if (race_reads) {
        // Make sure nothing goes backwards. Really not much we can do here.
        ASSERT_LE((*threads)[tpc.thread].event_count, tpc.count)
            << ": Thread " << tpc.thread;
        (*threads)[tpc.thread].event_count = tpc.count;
      } else {
        // Make sure nothing goes backwards. Really not much we can do here.
        ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
            << ": Thread " << tpc.thread;
      }
    } else {
      // Confirm that we see every message counter from every thread.
      ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
          << ": Thread " << tpc.thread;
    }
    ++(*threads)[tpc.thread].event_count;
  }
}

}  // namespace ipc_lib
}  // namespace aos