Improve const-correctness in the LocklessQueue code
This will allow using a read-only mapping for reading data from queues
in ShmEventLoop, which makes it much harder for code reading from a
queue to accidentally modify it.
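Achieving this involves splitting LocklessQueue into individual
components (such as LocklessQueueSender, LocklessQueueReader,
LocklessQueueWatcher, and LocklessQueueWakeUpper). The split makes it
much more obvious what state is used where, and allows adding some
consts.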
Change-Id: Ic83b0d2169e6dfae3eec656aa8e49852125698d9
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index 4e84b9f..e1e2516 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -56,15 +56,16 @@
Reset();
}
- LocklessQueueMemory *get_memory() {
- return reinterpret_cast<LocklessQueueMemory *>(&(memory_[0]));
+ LocklessQueue queue() {
+ return LocklessQueue(reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
+ reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
+ config_);
}
- void Reset() { memset(get_memory(), 0, LocklessQueueMemorySize(config_)); }
+ void Reset() { memset(&memory_[0], 0, LocklessQueueMemorySize(config_)); }
// Runs until the signal is received.
void RunUntilWakeup(Event *ready, int priority) {
- LocklessQueue queue(get_memory(), config_);
internal::EPoll epoll;
SignalFd signalfd({kWakeupSignal});
@@ -75,16 +76,18 @@
epoll.Quit();
});
- // Register to be woken up *after* the signalfd is catching the signals.
- queue.RegisterWakeup(priority);
+ {
+ // Register to be woken up *after* the signalfd is catching the signals.
+ LocklessQueueWatcher watcher =
+ LocklessQueueWatcher::Make(queue(), priority).value();
- // And signal we are now ready.
- ready->Set();
+ // And signal we are now ready.
+ ready->Set();
- epoll.Run();
+ epoll.Run();
- // Cleanup.
- queue.UnregisterWakeup();
+ // Cleanup, ensuring the watcher is destroyed before the signalfd.
+ }
epoll.DeleteFd(signalfd.fd());
}
@@ -99,36 +102,35 @@
// Tests that wakeup doesn't do anything if nothing was registered.
TEST_F(LocklessQueueTest, NoWatcherWakeup) {
- LocklessQueue queue(get_memory(), config_);
+ LocklessQueueWakeUpper wake_upper(queue());
- EXPECT_EQ(queue.Wakeup(7), 0);
+ EXPECT_EQ(wake_upper.Wakeup(7), 0);
}
// Tests that wakeup doesn't do anything if a wakeup was registered and then
// unregistered.
TEST_F(LocklessQueueTest, UnregisteredWatcherWakeup) {
- LocklessQueue queue(get_memory(), config_);
+ LocklessQueueWakeUpper wake_upper(queue());
- queue.RegisterWakeup(5);
- queue.UnregisterWakeup();
+ { LocklessQueueWatcher::Make(queue(), 5).value(); }
- EXPECT_EQ(queue.Wakeup(7), 0);
+ EXPECT_EQ(wake_upper.Wakeup(7), 0);
}
// Tests that wakeup doesn't do anything if the thread dies.
TEST_F(LocklessQueueTest, DiedWatcherWakeup) {
- LocklessQueue queue(get_memory(), config_);
+ LocklessQueueWakeUpper wake_upper(queue());
::std::thread([this]() {
// Use placement new so the destructor doesn't get run.
- ::std::aligned_storage<sizeof(LocklessQueue), alignof(LocklessQueue)>::type
- data;
- LocklessQueue *q = new (&data) LocklessQueue(get_memory(), config_);
- // Register a wakeup.
- q->RegisterWakeup(5);
- }).join();
+ ::std::aligned_storage<sizeof(LocklessQueueWatcher),
+ alignof(LocklessQueueWatcher)>::type data;
+ new (&data)
+ LocklessQueueWatcher(LocklessQueueWatcher::Make(queue(), 5).value());
+ })
+ .join();
- EXPECT_EQ(queue.Wakeup(7), 0);
+ EXPECT_EQ(wake_upper.Wakeup(7), 0);
}
struct WatcherState {
@@ -155,16 +157,13 @@
WatcherState *s = &queues.back();
queues.back().t = ::std::thread([this, &cleanup, s]() {
- LocklessQueue q(get_memory(), config_);
- EXPECT_TRUE(q.RegisterWakeup(0));
+ LocklessQueueWatcher q = LocklessQueueWatcher::Make(queue(), 0).value();
// Signal that this thread is ready.
s->ready.Set();
// And wait until we are asked to shut down.
cleanup.Wait();
-
- q.UnregisterWakeup();
});
}
@@ -174,12 +173,9 @@
}
// Now try to allocate another one. This will fail.
- {
- LocklessQueue queue(get_memory(), config_);
- EXPECT_FALSE(queue.RegisterWakeup(0));
- }
+ EXPECT_FALSE(LocklessQueueWatcher::Make(queue(), 0));
- // Trigger the threads to cleanup their resources, and wait unti they are
+ // Trigger the threads to cleanup their resources, and wait until they are
// done.
cleanup.Set();
for (WatcherState &w : queues) {
@@ -187,23 +183,16 @@
}
// We should now be able to allocate a wakeup.
- {
- LocklessQueue queue(get_memory(), config_);
- EXPECT_TRUE(queue.RegisterWakeup(0));
- queue.UnregisterWakeup();
- }
+ EXPECT_TRUE(LocklessQueueWatcher::Make(queue(), 0));
}
// Tests that too many watchers dies like expected.
TEST_F(LocklessQueueTest, TooManySenders) {
- ::std::vector<::std::unique_ptr<LocklessQueue>> queues;
- ::std::vector<LocklessQueue::Sender> senders;
+ ::std::vector<LocklessQueueSender> senders;
for (size_t i = 0; i < config_.num_senders; ++i) {
- queues.emplace_back(new LocklessQueue(get_memory(), config_));
- senders.emplace_back(queues.back()->MakeSender().value());
+ senders.emplace_back(LocklessQueueSender::Make(queue()).value());
}
- queues.emplace_back(new LocklessQueue(get_memory(), config_));
- EXPECT_FALSE(queues.back()->MakeSender());
+ EXPECT_FALSE(LocklessQueueSender::Make(queue()));
}
// Now, start 2 threads and have them receive the signals.
@@ -212,7 +201,7 @@
EXPECT_LE(kWakeupSignal, SIGRTMAX);
EXPECT_GE(kWakeupSignal, SIGRTMIN);
- LocklessQueue queue(get_memory(), config_);
+ LocklessQueueWakeUpper wake_upper(queue());
// Event used to make sure the thread is ready before the test starts.
Event ready1;
@@ -225,7 +214,7 @@
ready1.Wait();
ready2.Wait();
- EXPECT_EQ(queue.Wakeup(3), 2);
+ EXPECT_EQ(wake_upper.Wakeup(3), 2);
t1.join();
t2.join();
@@ -237,15 +226,14 @@
// Do a simple send test.
TEST_F(LocklessQueueTest, Send) {
- LocklessQueue queue(get_memory(), config_);
-
- LocklessQueue::Sender sender = queue.MakeSender().value();
+ LocklessQueueSender sender = LocklessQueueSender::Make(queue()).value();
+ LocklessQueueReader reader(queue());
// Send enough messages to wrap.
for (int i = 0; i < 20000; ++i) {
// Confirm that the queue index makes sense given the number of sends.
- EXPECT_EQ(queue.LatestQueueIndex().index(),
- i == 0 ? LocklessQueue::empty_queue_index().index() : i - 1);
+ EXPECT_EQ(reader.LatestIndex().index(),
+ i == 0 ? QueueIndex::Invalid().index() : i - 1);
// Send a trivial piece of data.
char data[100];
@@ -254,7 +242,7 @@
// Confirm that the queue index still makes sense. This is easier since the
// empty case has been handled.
- EXPECT_EQ(queue.LatestQueueIndex().index(), i);
+ EXPECT_EQ(reader.LatestIndex().index(), i);
// Read a result from 5 in the past.
::aos::monotonic_clock::time_point monotonic_sent_time;
@@ -271,15 +259,15 @@
} else {
index = index.IncrementBy(i - 5);
}
- LocklessQueue::ReadResult read_result =
- queue.Read(index.index(), &monotonic_sent_time, &realtime_sent_time,
- &monotonic_remote_time, &realtime_remote_time,
- &remote_queue_index, &length, &(read_data[0]));
+ LocklessQueueReader::Result read_result =
+ reader.Read(index.index(), &monotonic_sent_time, &realtime_sent_time,
+ &monotonic_remote_time, &realtime_remote_time,
+ &remote_queue_index, &length, &(read_data[0]));
// This should either return GOOD, or TOO_OLD if it is before the start of
// the queue.
- if (read_result != LocklessQueue::ReadResult::GOOD) {
- EXPECT_EQ(read_result, LocklessQueue::ReadResult::TOO_OLD);
+ if (read_result != LocklessQueueReader::Result::GOOD) {
+ EXPECT_EQ(read_result, LocklessQueueReader::Result::TOO_OLD);
}
}
}
@@ -295,9 +283,8 @@
const chrono::seconds print_frequency(FLAGS_print_rate);
- QueueRacer racer(get_memory(), FLAGS_thread_count, kNumMessages, config_);
- const monotonic_clock::time_point start_time =
- monotonic_clock::now();
+ QueueRacer racer(queue(), FLAGS_thread_count, kNumMessages);
+ const monotonic_clock::time_point start_time = monotonic_clock::now();
const monotonic_clock::time_point end_time =
start_time + chrono::seconds(FLAGS_duration);
@@ -333,7 +320,7 @@
// Send enough messages to wrap the 32 bit send counter.
TEST_F(LocklessQueueTest, WrappedSend) {
uint64_t kNumMessages = 0x100010000ul;
- QueueRacer racer(get_memory(), 1, kNumMessages, config_);
+ QueueRacer racer(queue(), 1, kNumMessages);
const monotonic_clock::time_point start_time = monotonic_clock::now();
EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));