Improve const-correctness in the LocklessQueue code
This will allow using a read-only mapping for reading data from queues
in ShmEventLoop. This will make it much harder for code reading from a
queue to accidentally modify it.
This involves splitting up LocklessQueue into individual components.
This makes it much more obvious what state is used where, and allows
adding some consts.
Change-Id: Ic83b0d2169e6dfae3eec656aa8e49852125698d9
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index 69f5f21..fcc8668 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -24,19 +24,16 @@
uint64_t event_count = ::std::numeric_limits<uint64_t>::max();
};
-QueueRacer::QueueRacer(LocklessQueueMemory *memory, int num_threads,
- uint64_t num_messages, LocklessQueueConfiguration config)
- : memory_(memory),
- num_threads_(num_threads),
- num_messages_(num_messages),
- config_(config) {
+QueueRacer::QueueRacer(LocklessQueue queue, int num_threads,
+ uint64_t num_messages)
+ : queue_(queue), num_threads_(num_threads), num_messages_(num_messages) {
Reset();
}
void QueueRacer::RunIteration(bool race_reads, int write_wrap_count) {
const bool will_wrap = num_messages_ * num_threads_ *
static_cast<uint64_t>(1 + write_wrap_count) >
- config_.queue_size;
+ queue_.config().queue_size;
// Clear out shmem.
Reset();
@@ -52,13 +49,13 @@
::std::vector<ThreadState> threads(num_threads_);
::std::thread queue_index_racer([this, &poll_index]() {
- LocklessQueue queue(memory_, config_);
+ LocklessQueueReader reader(queue_);
// Track the number of times we wrap, and cache the modulo.
uint64_t wrap_count = 0;
uint32_t last_queue_index = 0;
const uint32_t max_queue_index =
- QueueIndex::MaxIndex(0xffffffffu, queue.QueueSize());
+ QueueIndex::MaxIndex(0xffffffffu, queue_.config().queue_size);
while (poll_index) {
// We want to read everything backwards. This will give us conservative
// bounds. And with enough time and randomness, we will see all the cases
@@ -81,16 +78,14 @@
//
// So, grab them in order.
const uint64_t finished_writes = finished_writes_.load();
- const QueueIndex latest_queue_index_queue_index =
- queue.LatestQueueIndex();
+ const QueueIndex latest_queue_index_queue_index = reader.LatestIndex();
const uint64_t started_writes = started_writes_.load();
const uint32_t latest_queue_index_uint32_t =
latest_queue_index_queue_index.index();
uint64_t latest_queue_index = latest_queue_index_uint32_t;
- if (latest_queue_index_queue_index !=
- LocklessQueue::empty_queue_index()) {
+ if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
// If we got smaller, we wrapped.
if (latest_queue_index_uint32_t < last_queue_index) {
++wrap_count;
@@ -107,22 +102,19 @@
// If we are at the beginning, the queue needs to always return empty.
if (started_writes == 0) {
- EXPECT_EQ(latest_queue_index_queue_index,
- LocklessQueue::empty_queue_index());
+ EXPECT_EQ(latest_queue_index_queue_index, QueueIndex::Invalid());
EXPECT_EQ(finished_writes, 0);
} else {
if (finished_writes == 0) {
// Plausible to be at the beginning, in which case we don't have
// anything to check.
- if (latest_queue_index_queue_index !=
- LocklessQueue::empty_queue_index()) {
+ if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
// Otherwise, we have started. The queue can't have any more
// entries than this.
EXPECT_GE(started_writes, latest_queue_index + 1);
}
} else {
- EXPECT_NE(latest_queue_index_queue_index,
- LocklessQueue::empty_queue_index());
+ EXPECT_NE(latest_queue_index_queue_index, QueueIndex::Invalid());
// latest_queue_index is an index, not a count. So it always reads 1
// low.
EXPECT_GE(latest_queue_index + 1, finished_writes);
@@ -142,8 +134,7 @@
t.thread = ::std::thread([this, &t, thread_index, &run,
write_wrap_count]() {
// Build up a sender.
- LocklessQueue queue(memory_, config_);
- LocklessQueue::Sender sender = queue.MakeSender().value();
+ LocklessQueueSender sender = LocklessQueueSender::Make(queue_).value();
CHECK_GE(sender.size(), sizeof(ThreadPlusCount));
// Signal that we are ready to start sending.
@@ -255,17 +246,16 @@
void QueueRacer::CheckReads(bool race_reads, int write_wrap_count,
::std::vector<ThreadState> *threads) {
// Now read back the results to double check.
- LocklessQueue queue(memory_, config_);
-
- const bool will_wrap =
- num_messages_ * num_threads_ * (1 + write_wrap_count) > queue.QueueSize();
+ LocklessQueueReader reader(queue_);
+ const bool will_wrap = num_messages_ * num_threads_ * (1 + write_wrap_count) >
+ LocklessQueueSize(queue_.memory());
monotonic_clock::time_point last_monotonic_sent_time =
monotonic_clock::epoch();
uint64_t initial_i = 0;
if (will_wrap) {
initial_i = (1 + write_wrap_count) * num_messages_ * num_threads_ -
- queue.QueueSize();
+ LocklessQueueSize(queue_.memory());
}
for (uint64_t i = initial_i;
@@ -279,27 +269,28 @@
char read_data[1024];
// Handle overflowing the message count for the wrap test.
- const uint32_t wrapped_i = i % static_cast<size_t>(QueueIndex::MaxIndex(
- 0xffffffffu, queue.QueueSize()));
- LocklessQueue::ReadResult read_result =
- queue.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
- &monotonic_remote_time, &realtime_remote_time,
- &remote_queue_index, &length, &(read_data[0]));
+ const uint32_t wrapped_i =
+ i % static_cast<size_t>(QueueIndex::MaxIndex(
+ 0xffffffffu, LocklessQueueSize(queue_.memory())));
+ LocklessQueueReader::Result read_result =
+ reader.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
+ &monotonic_remote_time, &realtime_remote_time,
+ &remote_queue_index, &length, &(read_data[0]));
if (race_reads) {
- if (read_result == LocklessQueue::ReadResult::NOTHING_NEW) {
+ if (read_result == LocklessQueueReader::Result::NOTHING_NEW) {
--i;
continue;
}
}
if (race_reads && will_wrap) {
- if (read_result == LocklessQueue::ReadResult::TOO_OLD) {
+ if (read_result == LocklessQueueReader::Result::TOO_OLD) {
continue;
}
}
// Every message should be good.
- ASSERT_EQ(read_result, LocklessQueue::ReadResult::GOOD) << ": i is " << i;
+ ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD) << ": i is " << i;
// And, confirm that time never went backwards.
ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
@@ -310,7 +301,8 @@
ThreadPlusCount tpc;
ASSERT_EQ(length, sizeof(ThreadPlusCount));
- memcpy(&tpc, read_data + queue.message_data_size() - length,
+ memcpy(&tpc,
+ read_data + LocklessQueueMessageDataSize(queue_.memory()) - length,
sizeof(ThreadPlusCount));
if (will_wrap) {