Add LocklessQueue::Pinner class
This will allow reading messages from queues without copying, which is
helpful for speeding up the processing of images.
Change-Id: Ia4bb98afa6fe1c1b5cc186e3071c7458f143d77d
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index f5b3d7e..69f5f21 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -139,42 +139,63 @@
} else {
t.event_count = 0;
}
- t.thread =
- ::std::thread([this, &t, thread_index, &run, write_wrap_count]() {
- // Build up a sender.
- LocklessQueue queue(memory_, config_);
- LocklessQueue::Sender sender = queue.MakeSender().value();
+ t.thread = ::std::thread([this, &t, thread_index, &run,
+ write_wrap_count]() {
+ // Build up a sender.
+ LocklessQueue queue(memory_, config_);
+ LocklessQueue::Sender sender = queue.MakeSender().value();
+ CHECK_GE(sender.size(), sizeof(ThreadPlusCount));
- // Signal that we are ready to start sending.
- t.ready.Set();
+ // Signal that we are ready to start sending.
+ t.ready.Set();
- // Wait until signaled to start running.
- run.Wait();
+ // Wait until signaled to start running.
+ run.Wait();
- // Gogogo!
- for (uint64_t i = 0;
- i < num_messages_ * static_cast<uint64_t>(1 + write_wrap_count);
- ++i) {
- char data[sizeof(ThreadPlusCount)];
- ThreadPlusCount tpc;
- tpc.thread = thread_index;
- tpc.count = i;
-
- memcpy(data, &tpc, sizeof(ThreadPlusCount));
-
- if (i % 0x800000 == 0x100000) {
- fprintf(stderr, "Sent %" PRIu64 ", %f %%\n", i,
- static_cast<double>(i) /
- static_cast<double>(num_messages_ *
- (1 + write_wrap_count)) *
- 100.0);
+ // Gogogo!
+ for (uint64_t i = 0;
+ i < num_messages_ * static_cast<uint64_t>(1 + write_wrap_count);
+ ++i) {
+ char *const data = static_cast<char *>(sender.Data()) + sender.size() -
+ sizeof(ThreadPlusCount);
+ const char fill = (i + 55) & 0xFF;
+ memset(data, fill, sizeof(ThreadPlusCount));
+ {
+ bool found_nonzero = false;
+ for (size_t i = 0; i < sizeof(ThreadPlusCount); ++i) {
+ if (data[i] != fill) {
+ found_nonzero = true;
}
-
- ++started_writes_;
- sender.Send(data, sizeof(ThreadPlusCount));
- ++finished_writes_;
}
- });
+ CHECK(!found_nonzero) << ": Somebody else is writing to our buffer";
+ }
+
+ ThreadPlusCount tpc;
+ tpc.thread = thread_index;
+ tpc.count = i;
+
+ memcpy(data, &tpc, sizeof(ThreadPlusCount));
+
+ if (i % 0x800000 == 0x100000) {
+ fprintf(
+ stderr, "Sent %" PRIu64 ", %f %%\n", i,
+ static_cast<double>(i) /
+ static_cast<double>(num_messages_ * (1 + write_wrap_count)) *
+ 100.0);
+ }
+
+ ++started_writes_;
+ sender.Send(sizeof(ThreadPlusCount));
+ // Blank out the new scratch buffer, to catch other people using it.
+ {
+ char *const new_data = static_cast<char *>(sender.Data()) +
+ sender.size() - sizeof(ThreadPlusCount);
+ const char new_fill = ~fill;
+ memset(new_data, new_fill, sizeof(ThreadPlusCount));
+ }
+ ++finished_writes_;
+ }
+ });
++thread_index;
}
@@ -303,18 +324,18 @@
if (race_reads) {
// Make sure nothing goes backwards. Really not much we can do here.
- ASSERT_LE((*threads)[tpc.thread].event_count, tpc.count) << ": Thread "
- << tpc.thread;
+ ASSERT_LE((*threads)[tpc.thread].event_count, tpc.count)
+ << ": Thread " << tpc.thread;
(*threads)[tpc.thread].event_count = tpc.count;
} else {
// Make sure nothing goes backwards. Really not much we can do here.
- ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count) << ": Thread "
- << tpc.thread;
+ ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
+ << ": Thread " << tpc.thread;
}
} else {
// Confirm that we see every message counter from every thread.
- ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count) << ": Thread "
- << tpc.thread;
+ ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
+ << ": Thread " << tpc.thread;
}
++(*threads)[tpc.thread].event_count;
}