Improve const-correctness in the LocklessQueue code
This will allow using a read-only mapping for reading data from queues
in ShmEventLoop. This will make it much harder for code reading from a
queue to accidentally modify it.
This involves splitting up LocklessQueue into individual components.
This makes it much more obvious what state is used where, and allows
adding some consts.
Change-Id: Ic83b0d2169e6dfae3eec656aa8e49852125698d9
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index e45c065..1ac8656 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -164,6 +164,10 @@
const ipc_lib::LocklessQueueConfiguration &config() const { return config_; }
+ ipc_lib::LocklessQueue queue() const {
+ return ipc_lib::LocklessQueue(memory(), memory(), config());
+ }
+
absl::Span<char> GetSharedMemory() const {
return absl::Span<char>(static_cast<char *>(data_), size_);
}
@@ -207,8 +211,7 @@
channel,
chrono::ceil<chrono::seconds>(chrono::nanoseconds(
event_loop->configuration()->channel_storage_duration()))),
- lockless_queue_(lockless_queue_memory_.memory(),
- lockless_queue_memory_.config()) {
+ reader_(lockless_queue_memory_.queue()) {
context_.data = nullptr;
// Point the queue index at the next index to read starting now. This
// makes it such that FetchNext will read the next message sent after
@@ -238,7 +241,8 @@
// Sets this object to pin data in shared memory when fetching.
void PinDataOnFetch() {
CHECK(!copy_data());
- auto maybe_pinner = lockless_queue_.MakePinner();
+ auto maybe_pinner =
+ ipc_lib::LocklessQueuePinner::Make(lockless_queue_memory_.queue());
if (!maybe_pinner) {
LOG(FATAL) << "Failed to create reader on "
<< configuration::CleanedChannelToString(channel_)
@@ -250,26 +254,26 @@
// Points the next message to fetch at the queue index which will be
// populated next.
void PointAtNextQueueIndex() {
- actual_queue_index_ = lockless_queue_.LatestQueueIndex();
+ actual_queue_index_ = reader_.LatestIndex();
if (!actual_queue_index_.valid()) {
// Nothing in the queue. The next element will show up at the 0th
// index in the queue.
- actual_queue_index_ =
- ipc_lib::QueueIndex::Zero(lockless_queue_.queue_size());
+ actual_queue_index_ = ipc_lib::QueueIndex::Zero(
+ LocklessQueueSize(lockless_queue_memory_.memory()));
} else {
actual_queue_index_ = actual_queue_index_.Increment();
}
}
bool FetchNext() {
- const ipc_lib::LocklessQueue::ReadResult read_result =
+ const ipc_lib::LocklessQueueReader::Result read_result =
DoFetch(actual_queue_index_);
- return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
+ return read_result == ipc_lib::LocklessQueueReader::Result::GOOD;
}
bool Fetch() {
- const ipc_lib::QueueIndex queue_index = lockless_queue_.LatestQueueIndex();
+ const ipc_lib::QueueIndex queue_index = reader_.LatestIndex();
// actual_queue_index_ is only meaningful if it was set by Fetch or
// FetchNext. This happens when valid_data_ has been set. So, only
// skip checking if valid_data_ is true.
@@ -282,22 +286,29 @@
return false;
}
- const ipc_lib::LocklessQueue::ReadResult read_result = DoFetch(queue_index);
+ const ipc_lib::LocklessQueueReader::Result read_result =
+ DoFetch(queue_index);
- CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::NOTHING_NEW)
+ CHECK(read_result != ipc_lib::LocklessQueueReader::Result::NOTHING_NEW)
<< ": Queue index went backwards. This should never happen. "
<< configuration::CleanedChannelToString(channel_);
- return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
+ return read_result == ipc_lib::LocklessQueueReader::Result::GOOD;
}
Context context() const { return context_; }
bool RegisterWakeup(int priority) {
- return lockless_queue_.RegisterWakeup(priority);
+ CHECK(!watcher_);
+ watcher_ = ipc_lib::LocklessQueueWatcher::Make(
+ lockless_queue_memory_.queue(), priority);
+ return static_cast<bool>(watcher_);
}
- void UnregisterWakeup() { lockless_queue_.UnregisterWakeup(); }
+ void UnregisterWakeup() {
+ CHECK(watcher_);
+ watcher_ = std::nullopt;
+ }
absl::Span<char> GetSharedMemory() const {
return lockless_queue_memory_.GetSharedMemory();
@@ -309,23 +320,24 @@
// grab the whole shared memory buffer instead.
return absl::Span<char>(
const_cast<SimpleShmFetcher *>(this)->data_storage_start(),
- lockless_queue_.message_data_size());
+ LocklessQueueMessageDataSize(lockless_queue_memory_.memory()));
}
private:
- ipc_lib::LocklessQueue::ReadResult DoFetch(ipc_lib::QueueIndex queue_index) {
+ ipc_lib::LocklessQueueReader::Result DoFetch(
+ ipc_lib::QueueIndex queue_index) {
// TODO(austin): Get behind and make sure it dies.
char *copy_buffer = nullptr;
if (copy_data()) {
copy_buffer = data_storage_start();
}
- ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
+ ipc_lib::LocklessQueueReader::Result read_result = reader_.Read(
queue_index.index(), &context_.monotonic_event_time,
&context_.realtime_event_time, &context_.monotonic_remote_time,
&context_.realtime_remote_time, &context_.remote_queue_index,
&context_.size, copy_buffer);
- if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
+ if (read_result == ipc_lib::LocklessQueueReader::Result::GOOD) {
if (pin_data()) {
const int pin_result = pinner_->PinIndex(queue_index.index());
CHECK(pin_result >= 0)
@@ -351,7 +363,9 @@
const char *const data = DataBuffer();
if (data) {
context_.data =
- data + lockless_queue_.message_data_size() - context_.size;
+ data +
+ LocklessQueueMessageDataSize(lockless_queue_memory_.memory()) -
+ context_.size;
} else {
context_.data = nullptr;
}
@@ -361,7 +375,7 @@
// Make sure the data wasn't modified while we were reading it. This
// can only happen if you are reading the last message *while* it is
// being written to, which means you are pretty far behind.
- CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
+ CHECK(read_result != ipc_lib::LocklessQueueReader::Result::OVERWROTE)
<< ": Got behind while reading and the last message was modified "
"out from under us while we were reading it. Don't get so far "
"behind on: "
@@ -370,7 +384,7 @@
// We fell behind between when we read the index and read the value.
// This isn't worth recovering from since this means we went to sleep
// for a long time in the middle of this function.
- if (read_result == ipc_lib::LocklessQueue::ReadResult::TOO_OLD) {
+ if (read_result == ipc_lib::LocklessQueueReader::Result::TOO_OLD) {
event_loop_->SendTimingReport();
LOG(FATAL) << "The next message is no longer available. "
<< configuration::CleanedChannelToString(channel_);
@@ -402,16 +416,17 @@
aos::ShmEventLoop *event_loop_;
const Channel *const channel_;
MMapedQueue lockless_queue_memory_;
- ipc_lib::LocklessQueue lockless_queue_;
+ ipc_lib::LocklessQueueReader reader_;
+ // This being nullopt indicates we're not looking for wakeups right now.
+ std::optional<ipc_lib::LocklessQueueWatcher> watcher_;
- ipc_lib::QueueIndex actual_queue_index_ =
- ipc_lib::LocklessQueue::empty_queue_index();
+ ipc_lib::QueueIndex actual_queue_index_ = ipc_lib::QueueIndex::Invalid();
// This being empty indicates we're not going to copy data.
std::unique_ptr<char, decltype(&free)> data_storage_{nullptr, &free};
// This being nullopt indicates we're not going to pin messages.
- std::optional<ipc_lib::LocklessQueue::Pinner> pinner_;
+ std::optional<ipc_lib::LocklessQueuePinner> pinner_;
Context context_;
};
@@ -458,15 +473,15 @@
channel,
chrono::ceil<chrono::seconds>(chrono::nanoseconds(
event_loop->configuration()->channel_storage_duration()))),
- lockless_queue_(lockless_queue_memory_.memory(),
- lockless_queue_memory_.config()),
- lockless_queue_sender_(
- VerifySender(lockless_queue_.MakeSender(), channel)) {}
+ lockless_queue_sender_(VerifySender(
+ ipc_lib::LocklessQueueSender::Make(lockless_queue_memory_.queue()),
+ channel)),
+ wake_upper_(lockless_queue_memory_.queue()) {}
~ShmSender() override {}
- static ipc_lib::LocklessQueue::Sender VerifySender(
- std::optional<ipc_lib::LocklessQueue::Sender> &&sender,
+ static ipc_lib::LocklessQueueSender VerifySender(
+ std::optional<ipc_lib::LocklessQueueSender> sender,
const Channel *channel) {
if (sender) {
return std::move(sender.value());
@@ -488,7 +503,7 @@
lockless_queue_sender_.Send(
length, monotonic_remote_time, realtime_remote_time, remote_queue_index,
&monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_);
- lockless_queue_.Wakeup(event_loop()->priority());
+ wake_upper_.Wakeup(event_loop()->priority());
return true;
}
@@ -503,7 +518,7 @@
monotonic_remote_time, realtime_remote_time,
remote_queue_index, &monotonic_sent_time_,
&realtime_sent_time_, &sent_queue_index_);
- lockless_queue_.Wakeup(event_loop()->priority());
+ wake_upper_.Wakeup(event_loop()->priority());
// TODO(austin): Return an error if we send too fast.
return true;
}
@@ -516,8 +531,8 @@
private:
MMapedQueue lockless_queue_memory_;
- ipc_lib::LocklessQueue lockless_queue_;
- ipc_lib::LocklessQueue::Sender lockless_queue_sender_;
+ ipc_lib::LocklessQueueSender lockless_queue_sender_;
+ ipc_lib::LocklessQueueWakeUpper wake_upper_;
};
// Class to manage the state for a Watcher.
diff --git a/aos/ipc_lib/index.h b/aos/ipc_lib/index.h
index 890e85f..a47121e 100644
--- a/aos/ipc_lib/index.h
+++ b/aos/ipc_lib/index.h
@@ -133,12 +133,12 @@
struct AtomicQueueIndex {
public:
// Atomically reads the index without any ordering constraints.
- QueueIndex RelaxedLoad(uint32_t count) {
+ QueueIndex RelaxedLoad(uint32_t count) const {
return QueueIndex(index_.load(::std::memory_order_relaxed), count);
}
// Full bidirectional barriers here.
- QueueIndex Load(uint32_t count) {
+ QueueIndex Load(uint32_t count) const {
return QueueIndex(index_.load(::std::memory_order_acquire), count);
}
inline void Store(QueueIndex value) {
@@ -222,7 +222,7 @@
class AtomicIndex {
public:
// Stores and loads atomically without ordering constraints.
- Index RelaxedLoad() {
+ Index RelaxedLoad() const {
return Index(index_.load(::std::memory_order_relaxed));
}
void RelaxedStore(Index index) {
@@ -237,7 +237,7 @@
void Store(Index index) {
index_.store(index.index_, ::std::memory_order_release);
}
- Index Load() { return Index(index_.load(::std::memory_order_acquire)); }
+ Index Load() const { return Index(index_.load(::std::memory_order_acquire)); }
// Swaps expected for index atomically. Returns true on success, false
// otherwise.
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index dad44bb..4115769 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -424,6 +424,13 @@
return syscall(SYS_rt_tgsigqueueinfo, tgid, tid, sig, si);
}
+QueueIndex ZeroOrValid(QueueIndex index) {
+ if (!index.valid()) {
+ return index.Clear();
+ }
+ return index;
+}
+
} // namespace
size_t LocklessQueueConfiguration::message_size() const {
@@ -571,30 +578,57 @@
return memory;
}
-LocklessQueue::LocklessQueue(LocklessQueueMemory *memory,
- LocklessQueueConfiguration config)
- : memory_(InitializeLocklessQueueMemory(memory, config)),
- watcher_copy_(memory_->num_watchers()),
- pid_(getpid()),
- uid_(getuid()) {}
+void LocklessQueue::Initialize() {
+ InitializeLocklessQueueMemory(memory_, config_);
+}
-LocklessQueue::~LocklessQueue() {
- CHECK_EQ(watcher_index_, -1);
+LocklessQueueWatcher::~LocklessQueueWatcher() {
+ if (watcher_index_ == -1) {
+ return;
+ }
+ // Since everything is self consistent, all we need to do is make sure nobody
+ // else is running. Someone dying will get caught in the generic consistency
+ // check.
GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
- const int num_watchers = memory_->num_watchers();
+
+ // Make sure we are registered.
+ CHECK_NE(watcher_index_, -1);
+
+ // Make sure we still own the slot we are supposed to.
+ CHECK(
+ death_notification_is_held(&(memory_->GetWatcher(watcher_index_)->tid)));
+
+ // The act of unlocking invalidates the entry. Invalidate it.
+ death_notification_release(&(memory_->GetWatcher(watcher_index_)->tid));
+ // And internally forget the slot.
+ watcher_index_ = -1;
+
// Cleanup is cheap. The next user will do it anyways, so no need for us to do
// anything right now.
// And confirm that nothing is owned by us.
+ const int num_watchers = memory_->num_watchers();
for (int i = 0; i < num_watchers; ++i) {
- CHECK(!death_notification_is_held(&(memory_->GetWatcher(i)->tid)));
+ CHECK(!death_notification_is_held(&(memory_->GetWatcher(i)->tid)))
+ << ": " << i;
}
}
-size_t LocklessQueue::QueueSize() const { return memory_->queue_size(); }
+std::optional<LocklessQueueWatcher> LocklessQueueWatcher::Make(
+ LocklessQueue queue, int priority) {
+ queue.Initialize();
+ LocklessQueueWatcher result(queue.memory(), priority);
+ if (result.watcher_index_ != -1) {
+ return std::move(result);
+ } else {
+ return std::nullopt;
+ }
+}
-bool LocklessQueue::RegisterWakeup(int priority) {
+LocklessQueueWatcher::LocklessQueueWatcher(LocklessQueueMemory *memory,
+ int priority)
+ : memory_(memory) {
// TODO(austin): Make sure signal coalescing is turned on. We don't need
// duplicates. That will improve performance under high load.
@@ -623,10 +657,10 @@
// Bail if we failed to find an open slot.
if (watcher_index_ == -1) {
- return false;
+ return;
}
- Watcher *w = memory_->GetWatcher(watcher_index_);
+ Watcher *const w = memory_->GetWatcher(watcher_index_);
w->pid = getpid();
w->priority = priority;
@@ -634,29 +668,15 @@
// Grabbing a mutex is a compiler and memory barrier, so nothing before will
// get rearranged afterwords.
death_notification_init(&(w->tid));
- return true;
}
-void LocklessQueue::UnregisterWakeup() {
- // Since everything is self consistent, all we need to do is make sure nobody
- // else is running. Someone dying will get caught in the generic consistency
- // check.
- GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
-
- // Make sure we are registered.
- CHECK_NE(watcher_index_, -1);
-
- // Make sure we still own the slot we are supposed to.
- CHECK(
- death_notification_is_held(&(memory_->GetWatcher(watcher_index_)->tid)));
-
- // The act of unlocking invalidates the entry. Invalidate it.
- death_notification_release(&(memory_->GetWatcher(watcher_index_)->tid));
- // And internally forget the slot.
- watcher_index_ = -1;
+LocklessQueueWakeUpper::LocklessQueueWakeUpper(LocklessQueue queue)
+ : memory_(queue.const_memory()), pid_(getpid()), uid_(getuid()) {
+ queue.Initialize();
+ watcher_copy_.resize(memory_->num_watchers());
}
-int LocklessQueue::Wakeup(const int current_priority) {
+int LocklessQueueWakeUpper::Wakeup(const int current_priority) {
const size_t num_watchers = memory_->num_watchers();
CHECK_EQ(watcher_copy_.size(), num_watchers);
@@ -668,7 +688,7 @@
// question. There is no way without pidfd's to close this window, and
// creating a pidfd is likely not RT.
for (size_t i = 0; i < num_watchers; ++i) {
- Watcher *w = memory_->GetWatcher(i);
+ const Watcher *w = memory_->GetWatcher(i);
watcher_copy_[i].tid = __atomic_load_n(&(w->tid.futex), __ATOMIC_RELAXED);
// Force the load of the TID to come first.
aos_compiler_memory_barrier();
@@ -741,7 +761,8 @@
return count;
}
-LocklessQueue::Sender::Sender(LocklessQueueMemory *memory) : memory_(memory) {
+LocklessQueueSender::LocklessQueueSender(LocklessQueueMemory *memory)
+ : memory_(memory) {
GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
// Since we already have the lock, go ahead and try cleaning up.
@@ -778,15 +799,29 @@
<< ": " << std::hex << scratch_index.get();
}
-LocklessQueue::Sender::~Sender() {
- if (valid()) {
+LocklessQueueSender::~LocklessQueueSender() {
+ if (sender_index_ != -1) {
+ CHECK(memory_ != nullptr);
death_notification_release(&(memory_->GetSender(sender_index_)->tid));
}
}
-size_t LocklessQueue::Sender::size() { return memory_->message_data_size(); }
+std::optional<LocklessQueueSender> LocklessQueueSender::Make(
+ LocklessQueue queue) {
+ queue.Initialize();
+ LocklessQueueSender result(queue.memory());
+ if (result.sender_index_ != -1) {
+ return std::move(result);
+ } else {
+ return std::nullopt;
+ }
+}
-void *LocklessQueue::Sender::Data() {
+size_t LocklessQueueSender::size() const {
+ return memory_->message_data_size();
+}
+
+void *LocklessQueueSender::Data() {
::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
const Index scratch_index = sender->scratch_index.RelaxedLoad();
Message *const message = memory_->GetMessage(scratch_index);
@@ -799,7 +834,7 @@
return message->data(memory_->message_data_size());
}
-void LocklessQueue::Sender::Send(
+void LocklessQueueSender::Send(
const char *data, size_t length,
aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
@@ -816,127 +851,7 @@
monotonic_sent_time, realtime_sent_time, queue_index);
}
-LocklessQueue::Pinner::Pinner(LocklessQueueMemory *memory) : memory_(memory) {
- GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
-
- // Since we already have the lock, go ahead and try cleaning up.
- Cleanup(memory_, grab_queue_setup_lock);
-
- const int num_pinners = memory_->num_pinners();
-
- for (int i = 0; i < num_pinners; ++i) {
- ::aos::ipc_lib::Pinner *p = memory->GetPinner(i);
- // This doesn't need synchronization because we're the only process doing
- // initialization right now, and nobody else will be touching pinners which
- // we're interested in.
- const uint32_t tid = __atomic_load_n(&(p->tid.futex), __ATOMIC_RELAXED);
- if (tid == 0) {
- pinner_index_ = i;
- break;
- }
- }
-
- if (pinner_index_ == -1) {
- VLOG(1) << "Too many pinners, starting to bail.";
- return;
- }
-
- ::aos::ipc_lib::Pinner *p = memory_->GetPinner(pinner_index_);
- p->pinned.Invalidate();
-
- // Indicate that we are now alive by taking over the slot. If the previous
- // owner died, we still want to do this.
- death_notification_init(&(p->tid));
-}
-
-LocklessQueue::Pinner::~Pinner() {
- if (valid()) {
- memory_->GetPinner(pinner_index_)->pinned.Invalidate();
- aos_compiler_memory_barrier();
- death_notification_release(&(memory_->GetPinner(pinner_index_)->tid));
- }
-}
-
-// This method doesn't mess with any scratch_index, so it doesn't have to worry
-// about message ownership.
-int LocklessQueue::Pinner::PinIndex(uint32_t uint32_queue_index) {
- const size_t queue_size = memory_->queue_size();
- const QueueIndex queue_index =
- QueueIndex::Zero(queue_size).IncrementBy(uint32_queue_index);
- ipc_lib::Pinner *const pinner = memory_->GetPinner(pinner_index_);
-
- AtomicIndex *const queue_slot = memory_->GetQueue(queue_index.Wrapped());
-
- // Indicate that we want to pin this message.
- pinner->pinned.Store(queue_index);
- aos_compiler_memory_barrier();
-
- {
- const Index message_index = queue_slot->Load();
- Message *const message = memory_->GetMessage(message_index);
-
- const QueueIndex message_queue_index =
- message->header.queue_index.Load(queue_size);
- if (message_queue_index == queue_index) {
- VLOG(3) << "Eq: " << std::hex << message_queue_index.index();
- aos_compiler_memory_barrier();
- return message_index.message_index();
- }
- VLOG(3) << "Message reused: " << std::hex << message_queue_index.index()
- << ", " << queue_index.index();
- }
-
- // Being down here means we asked to pin a message before realizing it's no
- // longer in the queue, so back that out now.
- pinner->pinned.Invalidate();
- VLOG(3) << "Unpinned: " << std::hex << queue_index.index();
- return -1;
-}
-
-size_t LocklessQueue::Pinner::size() const {
- return memory_->message_data_size();
-}
-
-const void *LocklessQueue::Pinner::Data() const {
- const size_t queue_size = memory_->queue_size();
- ::aos::ipc_lib::Pinner *pinner = memory_->GetPinner(pinner_index_);
- QueueIndex pinned = pinner->pinned.RelaxedLoad(queue_size);
- CHECK(pinned.valid());
- const Message *message = memory_->GetMessage(pinned);
-
- return message->data(memory_->message_data_size());
-}
-
-std::optional<LocklessQueue::Sender> LocklessQueue::MakeSender() {
- LocklessQueue::Sender result = LocklessQueue::Sender(memory_);
- if (result.valid()) {
- return std::move(result);
- } else {
- return std::nullopt;
- }
-}
-
-std::optional<LocklessQueue::Pinner> LocklessQueue::MakePinner() {
- LocklessQueue::Pinner result = LocklessQueue::Pinner(memory_);
- if (result.valid()) {
- return std::move(result);
- } else {
- return std::nullopt;
- }
-}
-
-namespace {
-
-QueueIndex ZeroOrValid(QueueIndex index) {
- if (!index.valid()) {
- return index.Clear();
- }
- return index;
-}
-
-} // namespace
-
-void LocklessQueue::Sender::Send(
+void LocklessQueueSender::Send(
size_t length, aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
uint32_t remote_queue_index,
@@ -1094,7 +1009,7 @@
memory_->GetMessage(new_scratch)->header.queue_index.RelaxedInvalidate();
}
-int LocklessQueue::Sender::buffer_index() const {
+int LocklessQueueSender::buffer_index() const {
::aos::ipc_lib::Sender *const sender = memory_->GetSender(sender_index_);
// We can do a relaxed load on our sender because we're the only person
// modifying it right now.
@@ -1102,13 +1017,119 @@
return scratch_index.message_index();
}
-LocklessQueue::ReadResult LocklessQueue::Read(
+LocklessQueuePinner::LocklessQueuePinner(
+ LocklessQueueMemory *memory, const LocklessQueueMemory *const_memory)
+ : memory_(memory), const_memory_(const_memory) {
+ GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
+
+ // Since we already have the lock, go ahead and try cleaning up.
+ Cleanup(memory_, grab_queue_setup_lock);
+
+ const int num_pinners = memory_->num_pinners();
+
+ for (int i = 0; i < num_pinners; ++i) {
+ ::aos::ipc_lib::Pinner *p = memory->GetPinner(i);
+ // This doesn't need synchronization because we're the only process doing
+ // initialization right now, and nobody else will be touching pinners which
+ // we're interested in.
+ const uint32_t tid = __atomic_load_n(&(p->tid.futex), __ATOMIC_RELAXED);
+ if (tid == 0) {
+ pinner_index_ = i;
+ break;
+ }
+ }
+
+ if (pinner_index_ == -1) {
+ VLOG(1) << "Too many pinners, starting to bail.";
+ return;
+ }
+
+ ::aos::ipc_lib::Pinner *p = memory_->GetPinner(pinner_index_);
+ p->pinned.Invalidate();
+
+ // Indicate that we are now alive by taking over the slot. If the previous
+ // owner died, we still want to do this.
+ death_notification_init(&(p->tid));
+}
+
+LocklessQueuePinner::~LocklessQueuePinner() {
+ if (pinner_index_ != -1) {
+ CHECK(memory_ != nullptr);
+ memory_->GetPinner(pinner_index_)->pinned.Invalidate();
+ aos_compiler_memory_barrier();
+ death_notification_release(&(memory_->GetPinner(pinner_index_)->tid));
+ }
+}
+
+std::optional<LocklessQueuePinner> LocklessQueuePinner::Make(
+ LocklessQueue queue) {
+ queue.Initialize();
+ LocklessQueuePinner result(queue.memory(), queue.const_memory());
+ if (result.pinner_index_ != -1) {
+ return std::move(result);
+ } else {
+ return std::nullopt;
+ }
+}
+
+// This method doesn't mess with any scratch_index, so it doesn't have to worry
+// about message ownership.
+int LocklessQueuePinner::PinIndex(uint32_t uint32_queue_index) {
+ const size_t queue_size = memory_->queue_size();
+ const QueueIndex queue_index =
+ QueueIndex::Zero(queue_size).IncrementBy(uint32_queue_index);
+ ipc_lib::Pinner *const pinner = memory_->GetPinner(pinner_index_);
+
+ AtomicIndex *const queue_slot = memory_->GetQueue(queue_index.Wrapped());
+
+ // Indicate that we want to pin this message.
+ pinner->pinned.Store(queue_index);
+ aos_compiler_memory_barrier();
+
+ {
+ const Index message_index = queue_slot->Load();
+ Message *const message = memory_->GetMessage(message_index);
+
+ const QueueIndex message_queue_index =
+ message->header.queue_index.Load(queue_size);
+ if (message_queue_index == queue_index) {
+ VLOG(3) << "Eq: " << std::hex << message_queue_index.index();
+ aos_compiler_memory_barrier();
+ return message_index.message_index();
+ }
+ VLOG(3) << "Message reused: " << std::hex << message_queue_index.index()
+ << ", " << queue_index.index();
+ }
+
+ // Being down here means we asked to pin a message before realizing it's no
+ // longer in the queue, so back that out now.
+ pinner->pinned.Invalidate();
+ VLOG(3) << "Unpinned: " << std::hex << queue_index.index();
+ return -1;
+}
+
+size_t LocklessQueuePinner::size() const {
+ return const_memory_->message_data_size();
+}
+
+const void *LocklessQueuePinner::Data() const {
+ const size_t queue_size = const_memory_->queue_size();
+ const ::aos::ipc_lib::Pinner *const pinner =
+ const_memory_->GetPinner(pinner_index_);
+ QueueIndex pinned = pinner->pinned.RelaxedLoad(queue_size);
+ CHECK(pinned.valid());
+ const Message *message = const_memory_->GetMessage(pinned);
+
+ return message->data(const_memory_->message_data_size());
+}
+
+LocklessQueueReader::Result LocklessQueueReader::Read(
uint32_t uint32_queue_index,
::aos::monotonic_clock::time_point *monotonic_sent_time,
::aos::realtime_clock::time_point *realtime_sent_time,
::aos::monotonic_clock::time_point *monotonic_remote_time,
::aos::realtime_clock::time_point *realtime_remote_time,
- uint32_t *remote_queue_index, size_t *length, char *data) {
+ uint32_t *remote_queue_index, size_t *length, char *data) const {
const size_t queue_size = memory_->queue_size();
// Build up the QueueIndex.
@@ -1117,7 +1138,7 @@
// Read the message stored at the requested location.
Index mi = memory_->LoadIndex(queue_index);
- Message *m = memory_->GetMessage(mi);
+ const Message *m = memory_->GetMessage(mi);
while (true) {
// We need to confirm that the data doesn't change while we are reading it.
@@ -1130,13 +1151,13 @@
if (starting_queue_index == queue_index.DecrementBy(queue_size)) {
VLOG(3) << "Matches: " << std::hex << starting_queue_index.index()
<< ", " << queue_index.DecrementBy(queue_size).index();
- return ReadResult::NOTHING_NEW;
+ return Result::NOTHING_NEW;
}
// Someone has re-used this message between when we pulled it out of the
// queue and when we grabbed its index. It is pretty hard to deduce
// what happened. Just try again.
- Message *const new_m = memory_->GetMessage(queue_index);
+ const Message *const new_m = memory_->GetMessage(queue_index);
if (m != new_m) {
m = new_m;
VLOG(3) << "Retrying, m doesn't match";
@@ -1154,7 +1175,7 @@
<< ", got " << starting_queue_index.index() << ", behind by "
<< std::dec
<< (starting_queue_index.index() - queue_index.index());
- return ReadResult::TOO_OLD;
+ return Result::TOO_OLD;
}
VLOG(3) << "Initial";
@@ -1167,10 +1188,10 @@
// the queue. Tell them that they are behind.
if (uint32_queue_index < memory_->queue_size()) {
VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
- return ReadResult::NOTHING_NEW;
+ return Result::NOTHING_NEW;
} else {
VLOG(3) << "Not near zero, " << std::hex << uint32_queue_index;
- return ReadResult::TOO_OLD;
+ return Result::TOO_OLD;
}
}
VLOG(3) << "Eq: " << std::hex << starting_queue_index.index() << ", "
@@ -1190,7 +1211,8 @@
*monotonic_remote_time = m->header.monotonic_remote_time;
*realtime_remote_time = m->header.realtime_remote_time;
if (data) {
- memcpy(data, m->data(memory_->message_data_size()), message_data_size());
+ memcpy(data, m->data(memory_->message_data_size()),
+ memory_->message_data_size());
}
*length = m->header.length;
@@ -1205,18 +1227,13 @@
<< queue_index.index() << ", finished with "
<< final_queue_index.index() << ", delta: " << std::dec
<< (final_queue_index.index() - queue_index.index());
- return ReadResult::OVERWROTE;
+ return Result::OVERWROTE;
}
- return ReadResult::GOOD;
+ return Result::GOOD;
}
-size_t LocklessQueue::queue_size() const { return memory_->queue_size(); }
-size_t LocklessQueue::message_data_size() const {
- return memory_->message_data_size();
-}
-
-QueueIndex LocklessQueue::LatestQueueIndex() {
+QueueIndex LocklessQueueReader::LatestIndex() const {
const size_t queue_size = memory_->queue_size();
// There is only one interesting case. We need to know if the queue is empty.
@@ -1226,9 +1243,16 @@
if (next_queue_index.valid()) {
const QueueIndex current_queue_index = next_queue_index.DecrementBy(1u);
return current_queue_index;
- } else {
- return empty_queue_index();
}
+ return QueueIndex::Invalid();
+}
+
+size_t LocklessQueueSize(const LocklessQueueMemory *memory) {
+ return memory->queue_size();
+}
+
+size_t LocklessQueueMessageDataSize(const LocklessQueueMemory *memory) {
+ return memory->message_data_size();
}
namespace {
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index afa7ced..3cd3726 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -134,40 +134,72 @@
// is done before the watcher goes RT), but needs to be RT for the sender.
struct LocklessQueueMemory;
+// Returns the size of the LocklessQueueMemory.
+size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);
+
// Initializes the queue memory. memory must be either a valid pointer to the
// queue datastructure, or must be zero initialized.
LocklessQueueMemory *InitializeLocklessQueueMemory(
LocklessQueueMemory *memory, LocklessQueueConfiguration config);
-// Returns the size of the LocklessQueueMemory.
-size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);
-
-// Prints to stdout the data inside the queue for debugging.
-void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
-
const static unsigned int kWakeupSignal = SIGRTMIN + 2;
-// Class to manage sending and receiving data in the lockless queue. This is
-// separate from the actual memory backing the queue so that memory can be
-// managed with mmap to share across the process boundary.
+// A convenient wrapper for accessing a lockless queue.
class LocklessQueue {
public:
- LocklessQueue(LocklessQueueMemory *memory, LocklessQueueConfiguration config);
- LocklessQueue(const LocklessQueue &) = delete;
- LocklessQueue &operator=(const LocklessQueue &) = delete;
+ LocklessQueue(const LocklessQueueMemory *const_memory,
+ LocklessQueueMemory *memory, LocklessQueueConfiguration config)
+ : const_memory_(const_memory), memory_(memory), config_(config) {}
- ~LocklessQueue();
+ void Initialize();
- // Returns the number of messages in the queue.
- size_t QueueSize() const;
+ LocklessQueueConfiguration config() const { return config_; }
- size_t message_data_size() const;
+ const LocklessQueueMemory *const_memory() { return const_memory_; }
+ LocklessQueueMemory *memory() { return memory_; }
- // Registers this thread to receive the kWakeupSignal signal when Wakeup is
- // called. Returns false if there was an error in registration.
- bool RegisterWakeup(int priority);
- // Unregisters the wakeup.
- void UnregisterWakeup();
+ private:
+ const LocklessQueueMemory *const_memory_;
+ LocklessQueueMemory *memory_;
+ LocklessQueueConfiguration config_;
+};
+
+class LocklessQueueWatcher {
+ public:
+ LocklessQueueWatcher(const LocklessQueueWatcher &) = delete;
+ LocklessQueueWatcher &operator=(const LocklessQueueWatcher &) = delete;
+ LocklessQueueWatcher(LocklessQueueWatcher &&other)
+ : memory_(other.memory_), watcher_index_(other.watcher_index_) {
+ other.watcher_index_ = -1;
+ }
+ LocklessQueueWatcher &operator=(LocklessQueueWatcher &&other) {
+ std::swap(memory_, other.memory_);
+ std::swap(watcher_index_, other.watcher_index_);
+ return *this;
+ }
+
+ ~LocklessQueueWatcher();
+
+ // Registers this thread to receive the kWakeupSignal signal when
+ // LocklessQueueWakeUpper::Wakeup is called. Returns nullopt if there was an
+ // error in registration.
+ // TODO(austin): Change the API if we find ourselves with more errors.
+ static std::optional<LocklessQueueWatcher> Make(LocklessQueue queue,
+ int priority);
+
+ private:
+ LocklessQueueWatcher(LocklessQueueMemory *memory, int priority);
+
+ LocklessQueueMemory *memory_ = nullptr;
+
+ // Index in the watcher list that our entry is, or -1 if no watcher is
+ // registered.
+ int watcher_index_ = -1;
+};
+
+class LocklessQueueWakeUpper {
+ public:
+ LocklessQueueWakeUpper(LocklessQueue queue);
// Sends the kWakeupSignal to all threads which have called RegisterWakeup.
//
@@ -175,169 +207,7 @@
// if nonrt.
int Wakeup(int current_priority);
- // If you ask for a queue index 2 past the newest, you will still get
- // NOTHING_NEW until that gets overwritten with new data. If you ask for an
- // element newer than QueueSize() from the current message, we consider it
- // behind by a large amount and return TOO_OLD. If the message is modified
- // out from underneath us as we read it, return OVERWROTE.
- //
- // data may be nullptr to indicate the data should not be copied.
- enum class ReadResult { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
- ReadResult Read(uint32_t queue_index,
- ::aos::monotonic_clock::time_point *monotonic_sent_time,
- ::aos::realtime_clock::time_point *realtime_sent_time,
- ::aos::monotonic_clock::time_point *monotonic_remote_time,
- ::aos::realtime_clock::time_point *realtime_remote_time,
- uint32_t *remote_queue_index, size_t *length, char *data);
-
- // Returns the index to the latest queue message. Returns empty_queue_index()
- // if there are no messages in the queue. Do note that this index wraps if
- // more than 2^32 messages are sent.
- QueueIndex LatestQueueIndex();
- static QueueIndex empty_queue_index() { return QueueIndex::Invalid(); }
-
- // Returns the size of the queue. This is mostly useful for manipulating
- // QueueIndex.
- size_t queue_size() const;
-
- // TODO(austin): Return the oldest queue index. This lets us catch up nicely
- // if we got behind.
- // The easiest way to implement this is likely going to be to reserve the
- // first modulo of values for the initial time around, and never reuse them.
- // That lets us do a simple atomic read of the next index and deduce what has
- // happened. It will involve the simplest atomic operations.
-
- // TODO(austin): Make it so we can find the indices which were sent just
- // before and after a time with a binary search.
-
- // Sender for blocks of data. The resources associated with a sender are
- // scoped to this object's lifetime.
- class Sender {
- public:
- Sender(const Sender &) = delete;
- Sender &operator=(const Sender &) = delete;
- Sender(Sender &&other)
- : memory_(other.memory_), sender_index_(other.sender_index_) {
- other.memory_ = nullptr;
- other.sender_index_ = -1;
- }
- Sender &operator=(Sender &&other) {
- memory_ = other.memory_;
- sender_index_ = other.sender_index_;
- other.memory_ = nullptr;
- other.sender_index_ = -1;
- return *this;
- }
-
- ~Sender();
-
- // Sends a message without copying the data.
- // Copy at most size() bytes of data into the memory pointed to by Data(),
- // and then call Send().
- // Note: calls to Data() are expensive enough that you should cache it.
- size_t size();
- void *Data();
- void Send(size_t length,
- aos::monotonic_clock::time_point monotonic_remote_time =
- aos::monotonic_clock::min_time,
- aos::realtime_clock::time_point realtime_remote_time =
- aos::realtime_clock::min_time,
- uint32_t remote_queue_index = 0xffffffff,
- aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
- aos::realtime_clock::time_point *realtime_sent_time = nullptr,
- uint32_t *queue_index = nullptr);
-
- // Sends up to length data. Does not wakeup the target.
- void Send(const char *data, size_t length,
- aos::monotonic_clock::time_point monotonic_remote_time =
- aos::monotonic_clock::min_time,
- aos::realtime_clock::time_point realtime_remote_time =
- aos::realtime_clock::min_time,
- uint32_t remote_queue_index = 0xffffffff,
- aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
- aos::realtime_clock::time_point *realtime_sent_time = nullptr,
- uint32_t *queue_index = nullptr);
-
- int buffer_index() const;
-
- private:
- friend class LocklessQueue;
-
- Sender(LocklessQueueMemory *memory);
-
- // Returns true if this sender is valid. If it isn't valid, any of the
- // other methods won't work. This is here to allow the lockless queue to
- // only build a sender if there was one available.
- bool valid() const { return sender_index_ != -1 && memory_ != nullptr; }
-
- // Pointer to the backing memory.
- LocklessQueueMemory *memory_ = nullptr;
-
- // Index into the sender list.
- int sender_index_ = -1;
- };
-
- // Pinner for blocks of data. The resources associated with a pinner are
- // scoped to this object's lifetime.
- class Pinner {
- public:
- Pinner(const Pinner &) = delete;
- Pinner &operator=(const Pinner &) = delete;
- Pinner(Pinner &&other)
- : memory_(other.memory_), pinner_index_(other.pinner_index_) {
- other.memory_ = nullptr;
- other.pinner_index_ = -1;
- }
- Pinner &operator=(Pinner &&other) {
- memory_ = other.memory_;
- pinner_index_ = other.pinner_index_;
- other.memory_ = nullptr;
- other.pinner_index_ = -1;
- return *this;
- }
-
- ~Pinner();
-
- // Attempts to pin the message at queue_index.
- // Un-pins the previous message.
- // Returns the buffer index (non-negative) if it succeeds.
- // Returns -1 if that message is no longer in the queue.
- int PinIndex(uint32_t queue_index);
-
- // Read at most size() bytes of data into the memory pointed to by Data().
- // Note: calls to Data() are expensive enough that you should cache it.
- // Don't call Data() before a successful PinIndex call.
- size_t size() const;
- const void *Data() const;
-
- private:
- friend class LocklessQueue;
-
- Pinner(LocklessQueueMemory *memory);
-
- // Returns true if this pinner is valid. If it isn't valid, any of the
- // other methods won't work. This is here to allow the lockless queue to
- // only build a pinner if there was one available.
- bool valid() const { return pinner_index_ != -1 && memory_ != nullptr; }
-
- // Pointer to the backing memory.
- LocklessQueueMemory *memory_ = nullptr;
-
- // Index into the pinner list.
- int pinner_index_ = -1;
- };
-
- // Creates a sender. If we couldn't allocate a sender, returns nullopt.
- // TODO(austin): Change the API if we find ourselves with more errors.
- std::optional<Sender> MakeSender();
-
- // Creates a pinner. If we couldn't allocate a pinner, returns nullopt.
- // TODO(austin): Change the API if we find ourselves with more errors.
- std::optional<Pinner> MakePinner();
-
private:
- LocklessQueueMemory *memory_ = nullptr;
-
// Memory and datastructure used to sort a list of watchers to wake
// up. This isn't a copy of Watcher since tid is simpler to work with here
// than the futex above.
@@ -346,17 +216,176 @@
pid_t pid;
int priority;
};
- // TODO(austin): Don't allocate this memory if we aren't going to send.
- ::std::vector<WatcherCopy> watcher_copy_;
- // Index in the watcher list that our entry is, or -1 if no watcher is
- // registered.
- int watcher_index_ = -1;
-
+ const LocklessQueueMemory *const memory_;
const int pid_;
const uid_t uid_;
+
+ ::std::vector<WatcherCopy> watcher_copy_;
};
+// Sender for blocks of data. The resources associated with a sender are
+// scoped to this object's lifetime.
+class LocklessQueueSender {
+ public:
+ LocklessQueueSender(const LocklessQueueSender &) = delete;
+ LocklessQueueSender &operator=(const LocklessQueueSender &) = delete;
+ LocklessQueueSender(LocklessQueueSender &&other)
+ : memory_(other.memory_), sender_index_(other.sender_index_) {
+ other.memory_ = nullptr;
+ other.sender_index_ = -1;
+ }
+ LocklessQueueSender &operator=(LocklessQueueSender &&other) {
+ std::swap(memory_, other.memory_);
+ std::swap(sender_index_, other.sender_index_);
+ return *this;
+ }
+
+ ~LocklessQueueSender();
+
+ // Creates a sender. If we couldn't allocate a sender, returns nullopt.
+ // TODO(austin): Change the API if we find ourselves with more errors.
+ static std::optional<LocklessQueueSender> Make(LocklessQueue queue);
+
+ // Sends a message without copying the data.
+ // Copy at most size() bytes of data into the memory pointed to by Data(),
+ // and then call Send().
+ // Note: calls to Data() are expensive enough that you should cache it.
+ size_t size() const;
+ void *Data();
+ void Send(size_t length,
+ aos::monotonic_clock::time_point monotonic_remote_time =
+ aos::monotonic_clock::min_time,
+ aos::realtime_clock::time_point realtime_remote_time =
+ aos::realtime_clock::min_time,
+ uint32_t remote_queue_index = 0xffffffff,
+ aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
+ aos::realtime_clock::time_point *realtime_sent_time = nullptr,
+ uint32_t *queue_index = nullptr);
+
+ // Sends up to length data. Does not wakeup the target.
+ void Send(const char *data, size_t length,
+ aos::monotonic_clock::time_point monotonic_remote_time =
+ aos::monotonic_clock::min_time,
+ aos::realtime_clock::time_point realtime_remote_time =
+ aos::realtime_clock::min_time,
+ uint32_t remote_queue_index = 0xffffffff,
+ aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
+ aos::realtime_clock::time_point *realtime_sent_time = nullptr,
+ uint32_t *queue_index = nullptr);
+
+ int buffer_index() const;
+
+ private:
+ LocklessQueueSender(LocklessQueueMemory *memory);
+
+ // Pointer to the backing memory.
+ LocklessQueueMemory *memory_ = nullptr;
+
+ // Index into the sender list.
+ int sender_index_ = -1;
+};
+
+// Pinner for blocks of data. The resources associated with a pinner are
+// scoped to this object's lifetime.
+class LocklessQueuePinner {
+ public:
+ LocklessQueuePinner(const LocklessQueuePinner &) = delete;
+ LocklessQueuePinner &operator=(const LocklessQueuePinner &) = delete;
+ LocklessQueuePinner(LocklessQueuePinner &&other)
+ : memory_(other.memory_),
+ const_memory_(other.const_memory_),
+ pinner_index_(other.pinner_index_) {
+ other.pinner_index_ = -1;
+ }
+ LocklessQueuePinner &operator=(LocklessQueuePinner &&other) {
+ std::swap(memory_, other.memory_);
+ std::swap(const_memory_, other.const_memory_);
+ std::swap(pinner_index_, other.pinner_index_);
+ return *this;
+ }
+
+ ~LocklessQueuePinner();
+
+ // Creates a pinner. If we couldn't allocate a pinner, returns nullopt.
+ // TODO(austin): Change the API if we find ourselves with more errors.
+ static std::optional<LocklessQueuePinner> Make(LocklessQueue queue);
+
+ // Attempts to pin the message at queue_index.
+ // Un-pins the previous message.
+ // Returns the buffer index (non-negative) if it succeeds.
+ // Returns -1 if that message is no longer in the queue.
+ int PinIndex(uint32_t queue_index);
+
+ // Read at most size() bytes of data into the memory pointed to by Data().
+ // Note: calls to Data() are expensive enough that you should cache it.
+ // Don't call Data() before a successful PinIndex call.
+ size_t size() const;
+ const void *Data() const;
+
+ private:
+ LocklessQueuePinner(LocklessQueueMemory *memory,
+ const LocklessQueueMemory *const_memory);
+
+ // Pointer to the backing memory.
+ LocklessQueueMemory *memory_ = nullptr;
+ const LocklessQueueMemory *const_memory_ = nullptr;
+
+ // Index into the pinner list.
+ int pinner_index_ = -1;
+};
+
+class LocklessQueueReader {
+ public:
+ enum class Result { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
+
+ LocklessQueueReader(LocklessQueue queue) : memory_(queue.const_memory()) {
+ queue.Initialize();
+ }
+
+ // If you ask for a queue index 2 past the newest, you will still get
+ // NOTHING_NEW until that gets overwritten with new data. If you ask for an
+ // element newer than QueueSize() from the current message, we consider it
+ // behind by a large amount and return TOO_OLD. If the message is modified
+ // out from underneath us as we read it, return OVERWROTE.
+ //
+ // data may be nullptr to indicate the data should not be copied.
+ Result Read(uint32_t queue_index,
+ ::aos::monotonic_clock::time_point *monotonic_sent_time,
+ ::aos::realtime_clock::time_point *realtime_sent_time,
+ ::aos::monotonic_clock::time_point *monotonic_remote_time,
+ ::aos::realtime_clock::time_point *realtime_remote_time,
+ uint32_t *remote_queue_index, size_t *length, char *data) const;
+
+ // Returns the index to the latest queue message. Returns empty_queue_index()
+ // if there are no messages in the queue. Do note that this index wraps if
+ // more than 2^32 messages are sent.
+ QueueIndex LatestIndex() const;
+
+ private:
+ const LocklessQueueMemory *const memory_;
+};
+
+// Returns the number of messages which are logically in the queue at a time.
+size_t LocklessQueueSize(const LocklessQueueMemory *memory);
+
+// Returns the number of bytes queue users are allowed to read/write within each
+// message.
+size_t LocklessQueueMessageDataSize(const LocklessQueueMemory *memory);
+
+// TODO(austin): Return the oldest queue index. This lets us catch up nicely
+// if we got behind.
+// The easiest way to implement this is likely going to be to reserve the
+// first modulo of values for the initial time around, and never reuse them.
+// That lets us do a simple atomic read of the next index and deduce what has
+// happened. It will involve the simplest atomic operations.
+
+// TODO(austin): Make it so we can find the indices which were sent just
+// before and after a time with a binary search.
+
+// Prints to stdout the data inside the queue for debugging.
+void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
+
} // namespace ipc_lib
} // namespace aos
diff --git a/aos/ipc_lib/lockless_queue_death_test.cc b/aos/ipc_lib/lockless_queue_death_test.cc
index a8a2429..3d8812d 100644
--- a/aos/ipc_lib/lockless_queue_death_test.cc
+++ b/aos/ipc_lib/lockless_queue_death_test.cc
@@ -511,18 +511,21 @@
config,
[config, tid](void *memory) {
// Initialize the queue and grab the tid.
- LocklessQueue queue(
+ LocklessQueue(
reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
- config);
+ reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
+ config)
+ .Initialize();
*tid = gettid();
},
[config](void *memory) {
- // Now try to write 2 messages. We will get killed a bunch as this
- // tries to happen.
LocklessQueue queue(
reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
+ reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
config);
- LocklessQueue::Sender sender = queue.MakeSender().value();
+ // Now try to write 2 messages. We will get killed a bunch as this
+ // tries to happen.
+ LocklessQueueSender sender = LocklessQueueSender::Make(queue).value();
for (int i = 0; i < 2; ++i) {
char data[100];
size_t s = snprintf(data, sizeof(data), "foobar%d", i + 1);
@@ -530,10 +533,11 @@
}
},
[config, tid](void *raw_memory) {
+ ::aos::ipc_lib::LocklessQueueMemory *const memory =
+ reinterpret_cast<::aos::ipc_lib::LocklessQueueMemory *>(raw_memory);
// Confirm that we can create 2 senders (the number in the queue), and
// send a message. And that all the messages in the queue are valid.
- ::aos::ipc_lib::LocklessQueueMemory *memory =
- reinterpret_cast<::aos::ipc_lib::LocklessQueueMemory *>(raw_memory);
+ LocklessQueue queue(memory, memory, config);
bool print = false;
@@ -555,9 +559,10 @@
PrintLocklessQueueMemory(memory);
}
- LocklessQueue queue(memory, config);
// Building and destroying a sender will clean up the queue.
- { LocklessQueue::Sender sender = queue.MakeSender().value(); }
+ {
+ LocklessQueueSender sender = LocklessQueueSender::Make(queue).value();
+ }
if (print) {
printf("Cleaned up version:\n");
@@ -565,12 +570,11 @@
}
{
- LocklessQueue::Sender sender = queue.MakeSender().value();
+ LocklessQueueSender sender = LocklessQueueSender::Make(queue).value();
{
// Make a second sender to confirm that the slot was freed.
// If the sender doesn't get cleaned up, this will fail.
- LocklessQueue queue2(memory, config);
- queue2.MakeSender().value();
+ LocklessQueueSender::Make(queue).value();
}
// Send a message to make sure that the queue still works.
@@ -579,6 +583,8 @@
sender.Send(data, s + 1);
}
+ LocklessQueueReader reader(queue);
+
// Now loop through the queue and make sure the number in the snprintf
// increments.
char last_data = '0';
@@ -592,19 +598,21 @@
char read_data[1024];
size_t length;
- LocklessQueue::ReadResult read_result =
- queue.Read(i, &monotonic_sent_time, &realtime_sent_time,
- &monotonic_remote_time, &realtime_remote_time,
- &remote_queue_index, &length, &(read_data[0]));
+ LocklessQueueReader::Result read_result =
+ reader.Read(i, &monotonic_sent_time, &realtime_sent_time,
+ &monotonic_remote_time, &realtime_remote_time,
+ &remote_queue_index, &length, &(read_data[0]));
- if (read_result != LocklessQueue::ReadResult::GOOD) {
+ if (read_result != LocklessQueueReader::Result::GOOD) {
break;
}
- EXPECT_GT(read_data[queue.message_data_size() - length + 6],
- last_data)
+ EXPECT_GT(
+ read_data[LocklessQueueMessageDataSize(memory) - length + 6],
+ last_data)
<< ": Got " << read_data;
- last_data = read_data[queue.message_data_size() - length + 6];
+ last_data =
+ read_data[LocklessQueueMessageDataSize(memory) - length + 6];
++i;
}
diff --git a/aos/ipc_lib/lockless_queue_memory.h b/aos/ipc_lib/lockless_queue_memory.h
index aa9b2e1..bb995b9 100644
--- a/aos/ipc_lib/lockless_queue_memory.h
+++ b/aos/ipc_lib/lockless_queue_memory.h
@@ -75,27 +75,27 @@
alignas(kDataAlignment) char data[];
// Memory size functions for all 4 lists.
- size_t SizeOfQueue() { return SizeOfQueue(config); }
+ size_t SizeOfQueue() const { return SizeOfQueue(config); }
static size_t SizeOfQueue(LocklessQueueConfiguration config) {
return AlignmentRoundUp(sizeof(AtomicIndex) * config.queue_size);
}
- size_t SizeOfMessages() { return SizeOfMessages(config); }
+ size_t SizeOfMessages() const { return SizeOfMessages(config); }
static size_t SizeOfMessages(LocklessQueueConfiguration config) {
return AlignmentRoundUp(config.message_size() * config.num_messages());
}
- size_t SizeOfWatchers() { return SizeOfWatchers(config); }
+ size_t SizeOfWatchers() const { return SizeOfWatchers(config); }
static size_t SizeOfWatchers(LocklessQueueConfiguration config) {
return AlignmentRoundUp(sizeof(Watcher) * config.num_watchers);
}
- size_t SizeOfSenders() { return SizeOfSenders(config); }
+ size_t SizeOfSenders() const { return SizeOfSenders(config); }
static size_t SizeOfSenders(LocklessQueueConfiguration config) {
return AlignmentRoundUp(sizeof(Sender) * config.num_senders);
}
- size_t SizeOfPinners() { return SizeOfPinners(config); }
+ size_t SizeOfPinners() const { return SizeOfPinners(config); }
static size_t SizeOfPinners(LocklessQueueConfiguration config) {
return AlignmentRoundUp(sizeof(Pinner) * config.num_pinners);
}
@@ -110,6 +110,14 @@
SizeOfSenders() + pinner_index * sizeof(Pinner));
}
+ const Pinner *GetPinner(size_t pinner_index) const {
+ static_assert(alignof(const Pinner) <= kDataAlignment,
+ "kDataAlignment is too small");
+ return reinterpret_cast<const Pinner *>(
+ &data[0] + SizeOfQueue() + SizeOfMessages() + SizeOfWatchers() +
+ SizeOfSenders() + pinner_index * sizeof(Pinner));
+ }
+
Sender *GetSender(size_t sender_index) {
static_assert(alignof(Sender) <= kDataAlignment,
"kDataAlignment is too small");
@@ -126,12 +134,26 @@
watcher_index * sizeof(Watcher));
}
+ const Watcher *GetWatcher(size_t watcher_index) const {
+ static_assert(alignof(const Watcher) <= kDataAlignment,
+ "kDataAlignment is too small");
+ return reinterpret_cast<const Watcher *>(&data[0] + SizeOfQueue() +
+ SizeOfMessages() +
+ watcher_index * sizeof(Watcher));
+ }
+
AtomicIndex *GetQueue(uint32_t index) {
static_assert(alignof(AtomicIndex) <= kDataAlignment,
"kDataAlignment is too small");
return reinterpret_cast<AtomicIndex *>(&data[0] +
sizeof(AtomicIndex) * index);
}
+ const AtomicIndex *GetQueue(uint32_t index) const {
+ static_assert(alignof(const AtomicIndex) <= kDataAlignment,
+ "kDataAlignment is too small");
+ return reinterpret_cast<const AtomicIndex *>(&data[0] +
+ sizeof(AtomicIndex) * index);
+ }
// There are num_messages() messages. The free list is really the
// sender+pinner list, since those are messages available to be filled in.
@@ -142,12 +164,19 @@
return reinterpret_cast<Message *>(&data[0] + SizeOfQueue() +
index.message_index() * message_size());
}
+ const Message *GetMessage(Index index) const {
+ static_assert(alignof(const Message) <= kDataAlignment,
+ "kDataAlignment is too small");
+ return reinterpret_cast<const Message *>(
+ &data[0] + SizeOfQueue() + index.message_index() * message_size());
+ }
// Helpers to fetch messages from the queue.
- Index LoadIndex(QueueIndex index) {
+ Index LoadIndex(QueueIndex index) const {
return GetQueue(index.Wrapped())->Load();
}
- Message *GetMessage(QueueIndex index) {
+ Message *GetMessage(QueueIndex index) { return GetMessage(LoadIndex(index)); }
+ const Message *GetMessage(QueueIndex index) const {
return GetMessage(LoadIndex(index));
}
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index 4e84b9f..e1e2516 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -56,15 +56,16 @@
Reset();
}
- LocklessQueueMemory *get_memory() {
- return reinterpret_cast<LocklessQueueMemory *>(&(memory_[0]));
+ LocklessQueue queue() {
+ return LocklessQueue(reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
+ reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
+ config_);
}
- void Reset() { memset(get_memory(), 0, LocklessQueueMemorySize(config_)); }
+ void Reset() { memset(&memory_[0], 0, LocklessQueueMemorySize(config_)); }
// Runs until the signal is received.
void RunUntilWakeup(Event *ready, int priority) {
- LocklessQueue queue(get_memory(), config_);
internal::EPoll epoll;
SignalFd signalfd({kWakeupSignal});
@@ -75,16 +76,18 @@
epoll.Quit();
});
- // Register to be woken up *after* the signalfd is catching the signals.
- queue.RegisterWakeup(priority);
+ {
+ // Register to be woken up *after* the signalfd is catching the signals.
+ LocklessQueueWatcher watcher =
+ LocklessQueueWatcher::Make(queue(), priority).value();
- // And signal we are now ready.
- ready->Set();
+ // And signal we are now ready.
+ ready->Set();
- epoll.Run();
+ epoll.Run();
- // Cleanup.
- queue.UnregisterWakeup();
+ // Cleanup, ensuring the watcher is destroyed before the signalfd.
+ }
epoll.DeleteFd(signalfd.fd());
}
@@ -99,36 +102,35 @@
// Tests that wakeup doesn't do anything if nothing was registered.
TEST_F(LocklessQueueTest, NoWatcherWakeup) {
- LocklessQueue queue(get_memory(), config_);
+ LocklessQueueWakeUpper wake_upper(queue());
- EXPECT_EQ(queue.Wakeup(7), 0);
+ EXPECT_EQ(wake_upper.Wakeup(7), 0);
}
// Tests that wakeup doesn't do anything if a wakeup was registered and then
// unregistered.
TEST_F(LocklessQueueTest, UnregisteredWatcherWakeup) {
- LocklessQueue queue(get_memory(), config_);
+ LocklessQueueWakeUpper wake_upper(queue());
- queue.RegisterWakeup(5);
- queue.UnregisterWakeup();
+ { LocklessQueueWatcher::Make(queue(), 5).value(); }
- EXPECT_EQ(queue.Wakeup(7), 0);
+ EXPECT_EQ(wake_upper.Wakeup(7), 0);
}
// Tests that wakeup doesn't do anything if the thread dies.
TEST_F(LocklessQueueTest, DiedWatcherWakeup) {
- LocklessQueue queue(get_memory(), config_);
+ LocklessQueueWakeUpper wake_upper(queue());
::std::thread([this]() {
// Use placement new so the destructor doesn't get run.
- ::std::aligned_storage<sizeof(LocklessQueue), alignof(LocklessQueue)>::type
- data;
- LocklessQueue *q = new (&data) LocklessQueue(get_memory(), config_);
- // Register a wakeup.
- q->RegisterWakeup(5);
- }).join();
+ ::std::aligned_storage<sizeof(LocklessQueueWatcher),
+ alignof(LocklessQueueWatcher)>::type data;
+ new (&data)
+ LocklessQueueWatcher(LocklessQueueWatcher::Make(queue(), 5).value());
+ })
+ .join();
- EXPECT_EQ(queue.Wakeup(7), 0);
+ EXPECT_EQ(wake_upper.Wakeup(7), 0);
}
struct WatcherState {
@@ -155,16 +157,13 @@
WatcherState *s = &queues.back();
queues.back().t = ::std::thread([this, &cleanup, s]() {
- LocklessQueue q(get_memory(), config_);
- EXPECT_TRUE(q.RegisterWakeup(0));
+ LocklessQueueWatcher q = LocklessQueueWatcher::Make(queue(), 0).value();
// Signal that this thread is ready.
s->ready.Set();
// And wait until we are asked to shut down.
cleanup.Wait();
-
- q.UnregisterWakeup();
});
}
@@ -174,12 +173,9 @@
}
// Now try to allocate another one. This will fail.
- {
- LocklessQueue queue(get_memory(), config_);
- EXPECT_FALSE(queue.RegisterWakeup(0));
- }
+ EXPECT_FALSE(LocklessQueueWatcher::Make(queue(), 0));
- // Trigger the threads to cleanup their resources, and wait unti they are
+ // Trigger the threads to cleanup their resources, and wait until they are
// done.
cleanup.Set();
for (WatcherState &w : queues) {
@@ -187,23 +183,16 @@
}
// We should now be able to allocate a wakeup.
- {
- LocklessQueue queue(get_memory(), config_);
- EXPECT_TRUE(queue.RegisterWakeup(0));
- queue.UnregisterWakeup();
- }
+ EXPECT_TRUE(LocklessQueueWatcher::Make(queue(), 0));
}
// Tests that too many watchers dies like expected.
TEST_F(LocklessQueueTest, TooManySenders) {
- ::std::vector<::std::unique_ptr<LocklessQueue>> queues;
- ::std::vector<LocklessQueue::Sender> senders;
+ ::std::vector<LocklessQueueSender> senders;
for (size_t i = 0; i < config_.num_senders; ++i) {
- queues.emplace_back(new LocklessQueue(get_memory(), config_));
- senders.emplace_back(queues.back()->MakeSender().value());
+ senders.emplace_back(LocklessQueueSender::Make(queue()).value());
}
- queues.emplace_back(new LocklessQueue(get_memory(), config_));
- EXPECT_FALSE(queues.back()->MakeSender());
+ EXPECT_FALSE(LocklessQueueSender::Make(queue()));
}
// Now, start 2 threads and have them receive the signals.
@@ -212,7 +201,7 @@
EXPECT_LE(kWakeupSignal, SIGRTMAX);
EXPECT_GE(kWakeupSignal, SIGRTMIN);
- LocklessQueue queue(get_memory(), config_);
+ LocklessQueueWakeUpper wake_upper(queue());
// Event used to make sure the thread is ready before the test starts.
Event ready1;
@@ -225,7 +214,7 @@
ready1.Wait();
ready2.Wait();
- EXPECT_EQ(queue.Wakeup(3), 2);
+ EXPECT_EQ(wake_upper.Wakeup(3), 2);
t1.join();
t2.join();
@@ -237,15 +226,14 @@
// Do a simple send test.
TEST_F(LocklessQueueTest, Send) {
- LocklessQueue queue(get_memory(), config_);
-
- LocklessQueue::Sender sender = queue.MakeSender().value();
+ LocklessQueueSender sender = LocklessQueueSender::Make(queue()).value();
+ LocklessQueueReader reader(queue());
// Send enough messages to wrap.
for (int i = 0; i < 20000; ++i) {
// Confirm that the queue index makes sense given the number of sends.
- EXPECT_EQ(queue.LatestQueueIndex().index(),
- i == 0 ? LocklessQueue::empty_queue_index().index() : i - 1);
+ EXPECT_EQ(reader.LatestIndex().index(),
+ i == 0 ? QueueIndex::Invalid().index() : i - 1);
// Send a trivial piece of data.
char data[100];
@@ -254,7 +242,7 @@
// Confirm that the queue index still makes sense. This is easier since the
// empty case has been handled.
- EXPECT_EQ(queue.LatestQueueIndex().index(), i);
+ EXPECT_EQ(reader.LatestIndex().index(), i);
// Read a result from 5 in the past.
::aos::monotonic_clock::time_point monotonic_sent_time;
@@ -271,15 +259,15 @@
} else {
index = index.IncrementBy(i - 5);
}
- LocklessQueue::ReadResult read_result =
- queue.Read(index.index(), &monotonic_sent_time, &realtime_sent_time,
- &monotonic_remote_time, &realtime_remote_time,
- &remote_queue_index, &length, &(read_data[0]));
+ LocklessQueueReader::Result read_result =
+ reader.Read(index.index(), &monotonic_sent_time, &realtime_sent_time,
+ &monotonic_remote_time, &realtime_remote_time,
+ &remote_queue_index, &length, &(read_data[0]));
// This should either return GOOD, or TOO_OLD if it is before the start of
// the queue.
- if (read_result != LocklessQueue::ReadResult::GOOD) {
- EXPECT_EQ(read_result, LocklessQueue::ReadResult::TOO_OLD);
+ if (read_result != LocklessQueueReader::Result::GOOD) {
+ EXPECT_EQ(read_result, LocklessQueueReader::Result::TOO_OLD);
}
}
}
@@ -295,9 +283,8 @@
const chrono::seconds print_frequency(FLAGS_print_rate);
- QueueRacer racer(get_memory(), FLAGS_thread_count, kNumMessages, config_);
- const monotonic_clock::time_point start_time =
- monotonic_clock::now();
+ QueueRacer racer(queue(), FLAGS_thread_count, kNumMessages);
+ const monotonic_clock::time_point start_time = monotonic_clock::now();
const monotonic_clock::time_point end_time =
start_time + chrono::seconds(FLAGS_duration);
@@ -333,7 +320,7 @@
// Send enough messages to wrap the 32 bit send counter.
TEST_F(LocklessQueueTest, WrappedSend) {
uint64_t kNumMessages = 0x100010000ul;
- QueueRacer racer(get_memory(), 1, kNumMessages, config_);
+ QueueRacer racer(queue(), 1, kNumMessages);
const monotonic_clock::time_point start_time = monotonic_clock::now();
EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index 69f5f21..fcc8668 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -24,19 +24,16 @@
uint64_t event_count = ::std::numeric_limits<uint64_t>::max();
};
-QueueRacer::QueueRacer(LocklessQueueMemory *memory, int num_threads,
- uint64_t num_messages, LocklessQueueConfiguration config)
- : memory_(memory),
- num_threads_(num_threads),
- num_messages_(num_messages),
- config_(config) {
+QueueRacer::QueueRacer(LocklessQueue queue, int num_threads,
+ uint64_t num_messages)
+ : queue_(queue), num_threads_(num_threads), num_messages_(num_messages) {
Reset();
}
void QueueRacer::RunIteration(bool race_reads, int write_wrap_count) {
const bool will_wrap = num_messages_ * num_threads_ *
static_cast<uint64_t>(1 + write_wrap_count) >
- config_.queue_size;
+ queue_.config().queue_size;
// Clear out shmem.
Reset();
@@ -52,13 +49,13 @@
::std::vector<ThreadState> threads(num_threads_);
::std::thread queue_index_racer([this, &poll_index]() {
- LocklessQueue queue(memory_, config_);
+ LocklessQueueReader reader(queue_);
// Track the number of times we wrap, and cache the modulo.
uint64_t wrap_count = 0;
uint32_t last_queue_index = 0;
const uint32_t max_queue_index =
- QueueIndex::MaxIndex(0xffffffffu, queue.QueueSize());
+ QueueIndex::MaxIndex(0xffffffffu, queue_.config().queue_size);
while (poll_index) {
// We want to read everything backwards. This will give us conservative
// bounds. And with enough time and randomness, we will see all the cases
@@ -81,16 +78,14 @@
//
// So, grab them in order.
const uint64_t finished_writes = finished_writes_.load();
- const QueueIndex latest_queue_index_queue_index =
- queue.LatestQueueIndex();
+ const QueueIndex latest_queue_index_queue_index = reader.LatestIndex();
const uint64_t started_writes = started_writes_.load();
const uint32_t latest_queue_index_uint32_t =
latest_queue_index_queue_index.index();
uint64_t latest_queue_index = latest_queue_index_uint32_t;
- if (latest_queue_index_queue_index !=
- LocklessQueue::empty_queue_index()) {
+ if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
// If we got smaller, we wrapped.
if (latest_queue_index_uint32_t < last_queue_index) {
++wrap_count;
@@ -107,22 +102,19 @@
// If we are at the beginning, the queue needs to always return empty.
if (started_writes == 0) {
- EXPECT_EQ(latest_queue_index_queue_index,
- LocklessQueue::empty_queue_index());
+ EXPECT_EQ(latest_queue_index_queue_index, QueueIndex::Invalid());
EXPECT_EQ(finished_writes, 0);
} else {
if (finished_writes == 0) {
// Plausible to be at the beginning, in which case we don't have
// anything to check.
- if (latest_queue_index_queue_index !=
- LocklessQueue::empty_queue_index()) {
+ if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
// Otherwise, we have started. The queue can't have any more
// entries than this.
EXPECT_GE(started_writes, latest_queue_index + 1);
}
} else {
- EXPECT_NE(latest_queue_index_queue_index,
- LocklessQueue::empty_queue_index());
+ EXPECT_NE(latest_queue_index_queue_index, QueueIndex::Invalid());
// latest_queue_index is an index, not a count. So it always reads 1
// low.
EXPECT_GE(latest_queue_index + 1, finished_writes);
@@ -142,8 +134,7 @@
t.thread = ::std::thread([this, &t, thread_index, &run,
write_wrap_count]() {
// Build up a sender.
- LocklessQueue queue(memory_, config_);
- LocklessQueue::Sender sender = queue.MakeSender().value();
+ LocklessQueueSender sender = LocklessQueueSender::Make(queue_).value();
CHECK_GE(sender.size(), sizeof(ThreadPlusCount));
// Signal that we are ready to start sending.
@@ -255,17 +246,16 @@
void QueueRacer::CheckReads(bool race_reads, int write_wrap_count,
::std::vector<ThreadState> *threads) {
// Now read back the results to double check.
- LocklessQueue queue(memory_, config_);
-
- const bool will_wrap =
- num_messages_ * num_threads_ * (1 + write_wrap_count) > queue.QueueSize();
+ LocklessQueueReader reader(queue_);
+ const bool will_wrap = num_messages_ * num_threads_ * (1 + write_wrap_count) >
+ LocklessQueueSize(queue_.memory());
monotonic_clock::time_point last_monotonic_sent_time =
monotonic_clock::epoch();
uint64_t initial_i = 0;
if (will_wrap) {
initial_i = (1 + write_wrap_count) * num_messages_ * num_threads_ -
- queue.QueueSize();
+ LocklessQueueSize(queue_.memory());
}
for (uint64_t i = initial_i;
@@ -279,27 +269,28 @@
char read_data[1024];
// Handle overflowing the message count for the wrap test.
- const uint32_t wrapped_i = i % static_cast<size_t>(QueueIndex::MaxIndex(
- 0xffffffffu, queue.QueueSize()));
- LocklessQueue::ReadResult read_result =
- queue.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
- &monotonic_remote_time, &realtime_remote_time,
- &remote_queue_index, &length, &(read_data[0]));
+ const uint32_t wrapped_i =
+ i % static_cast<size_t>(QueueIndex::MaxIndex(
+ 0xffffffffu, LocklessQueueSize(queue_.memory())));
+ LocklessQueueReader::Result read_result =
+ reader.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
+ &monotonic_remote_time, &realtime_remote_time,
+ &remote_queue_index, &length, &(read_data[0]));
if (race_reads) {
- if (read_result == LocklessQueue::ReadResult::NOTHING_NEW) {
+ if (read_result == LocklessQueueReader::Result::NOTHING_NEW) {
--i;
continue;
}
}
if (race_reads && will_wrap) {
- if (read_result == LocklessQueue::ReadResult::TOO_OLD) {
+ if (read_result == LocklessQueueReader::Result::TOO_OLD) {
continue;
}
}
// Every message should be good.
- ASSERT_EQ(read_result, LocklessQueue::ReadResult::GOOD) << ": i is " << i;
+ ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD) << ": i is " << i;
// And, confirm that time never went backwards.
ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
@@ -310,7 +301,8 @@
ThreadPlusCount tpc;
ASSERT_EQ(length, sizeof(ThreadPlusCount));
- memcpy(&tpc, read_data + queue.message_data_size() - length,
+ memcpy(&tpc,
+ read_data + LocklessQueueMessageDataSize(queue_.memory()) - length,
sizeof(ThreadPlusCount));
if (will_wrap) {
diff --git a/aos/ipc_lib/queue_racer.h b/aos/ipc_lib/queue_racer.h
index eaeedd4..7e92693 100644
--- a/aos/ipc_lib/queue_racer.h
+++ b/aos/ipc_lib/queue_racer.h
@@ -14,8 +14,7 @@
// them together to all write at once.
class QueueRacer {
public:
- QueueRacer(LocklessQueueMemory *memory, int num_threads,
- uint64_t num_messages, LocklessQueueConfiguration config);
+ QueueRacer(LocklessQueue queue, int num_threads, uint64_t num_messages);
// Runs an iteration of the race.
//
@@ -35,13 +34,14 @@
void RunIteration(bool race_reads, int write_wrap_count);
size_t CurrentIndex() {
- LocklessQueue queue(memory_, config_);
- return queue.LatestQueueIndex().index();
+ return LocklessQueueReader(queue_).LatestIndex().index();
}
private:
// Wipes the queue memory out so we get a clean start.
- void Reset() { memset(memory_, 0, LocklessQueueMemorySize(config_)); }
+ void Reset() {
+ memset(queue_.memory(), 0, LocklessQueueMemorySize(queue_.config()));
+ }
// This is a separate method so that when all the ASSERT_* methods, we still
// clean up all the threads. Otherwise we get an assert on the way out of
@@ -49,7 +49,7 @@
void CheckReads(bool race_reads, int write_wrap_count,
::std::vector<ThreadState> *threads);
- LocklessQueueMemory *memory_;
+ LocklessQueue queue_;
const uint64_t num_threads_;
const uint64_t num_messages_;
@@ -60,8 +60,6 @@
::std::atomic<uint64_t> started_writes_;
// Number of writes completed.
::std::atomic<uint64_t> finished_writes_;
-
- const LocklessQueueConfiguration config_;
};
} // namespace ipc_lib