Improve const-correctness in the LocklessQueue code
This will allow using a read-only mapping for reading data from queues
in ShmEventLoop. This will make it much harder for code reading from a
queue to accidentally modify it.
This involves splitting up LocklessQueue into individual components.
This makes it much more obvious what state is used where, and allows
adding some consts.
Change-Id: Ic83b0d2169e6dfae3eec656aa8e49852125698d9
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index afa7ced..3cd3726 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -134,40 +134,72 @@
// is done before the watcher goes RT), but needs to be RT for the sender.
struct LocklessQueueMemory;
+// Returns the size of the LocklessQueueMemory.
+size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);
+
// Initializes the queue memory. memory must be either a valid pointer to the
// queue datastructure, or must be zero initialized.
LocklessQueueMemory *InitializeLocklessQueueMemory(
LocklessQueueMemory *memory, LocklessQueueConfiguration config);
-// Returns the size of the LocklessQueueMemory.
-size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);
-
-// Prints to stdout the data inside the queue for debugging.
-void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
-
const static unsigned int kWakeupSignal = SIGRTMIN + 2;
-// Class to manage sending and receiving data in the lockless queue. This is
-// separate from the actual memory backing the queue so that memory can be
-// managed with mmap to share across the process boundary.
+// A convenient wrapper for accessing a lockless queue.
class LocklessQueue {
public:
- LocklessQueue(LocklessQueueMemory *memory, LocklessQueueConfiguration config);
- LocklessQueue(const LocklessQueue &) = delete;
- LocklessQueue &operator=(const LocklessQueue &) = delete;
+ LocklessQueue(const LocklessQueueMemory *const_memory,
+ LocklessQueueMemory *memory, LocklessQueueConfiguration config)
+ : const_memory_(const_memory), memory_(memory), config_(config) {}
- ~LocklessQueue();
+ void Initialize();
- // Returns the number of messages in the queue.
- size_t QueueSize() const;
+ LocklessQueueConfiguration config() const { return config_; }
- size_t message_data_size() const;
+  // Returns the read-only pointer to the queue memory. Const-qualified (like
+  // config()) so it can be called on a const LocklessQueue; the wrapper itself
+  // is never modified by this accessor.
+  const LocklessQueueMemory *const_memory() const { return const_memory_; }
+  // Returns the writable pointer to the queue memory. The accessor is const
+  // because it only copies the stored pointer: this wrapper holds pointers,
+  // so its constness is shallow and does not extend to the pointed-to memory.
+  LocklessQueueMemory *memory() const { return memory_; }
- // Registers this thread to receive the kWakeupSignal signal when Wakeup is
- // called. Returns false if there was an error in registration.
- bool RegisterWakeup(int priority);
- // Unregisters the wakeup.
- void UnregisterWakeup();
+ private:
+ const LocklessQueueMemory *const_memory_;
+ LocklessQueueMemory *memory_;
+ LocklessQueueConfiguration config_;
+};
+
+// Handle for one wakeup registration on a queue. Move-only.
+// NOTE(review): the destructor is defined out of line; presumably it removes
+// the registration when watcher_index_ != -1 -- confirm in the .cc file.
+class LocklessQueueWatcher {
+ public:
+  LocklessQueueWatcher(const LocklessQueueWatcher &) = delete;
+  LocklessQueueWatcher &operator=(const LocklessQueueWatcher &) = delete;
+
+  // Takes over other's registration. other is left with no memory pointer and
+  // no watcher index, the same moved-from state LocklessQueueSender uses.
+  LocklessQueueWatcher(LocklessQueueWatcher &&other) noexcept
+      : memory_(other.memory_), watcher_index_(other.watcher_index_) {
+    other.memory_ = nullptr;
+    other.watcher_index_ = -1;
+  }
+
+  // Exchanges state with other, so whatever this object held before the
+  // assignment ends up in other and is dealt with when other is destroyed.
+  LocklessQueueWatcher &operator=(LocklessQueueWatcher &&other) noexcept {
+    std::swap(memory_, other.memory_);
+    std::swap(watcher_index_, other.watcher_index_);
+    return *this;
+  }
+
+  ~LocklessQueueWatcher();
+
+  // Registers this thread to receive the kWakeupSignal signal when
+  // LocklessQueueWakeUpper::Wakeup is called. Returns nullopt if there was an
+  // error in registration.
+  // TODO(austin): Change the API if we find ourselves with more errors.
+  static std::optional<LocklessQueueWatcher> Make(LocklessQueue queue,
+                                                  int priority);
+
+ private:
+  // Private: instances are obtained through Make().
+  LocklessQueueWatcher(LocklessQueueMemory *memory, int priority);
+
+  // Pointer to the backing memory, or nullptr once moved from.
+  LocklessQueueMemory *memory_ = nullptr;
+
+  // Index in the watcher list that our entry is, or -1 if no watcher is
+  // registered.
+  int watcher_index_ = -1;
+};
+
+class LocklessQueueWakeUpper {
+ public:
+ LocklessQueueWakeUpper(LocklessQueue queue);
// Sends the kWakeupSignal to all threads which have called RegisterWakeup.
//
@@ -175,169 +207,7 @@
// if nonrt.
int Wakeup(int current_priority);
- // If you ask for a queue index 2 past the newest, you will still get
- // NOTHING_NEW until that gets overwritten with new data. If you ask for an
- // element newer than QueueSize() from the current message, we consider it
- // behind by a large amount and return TOO_OLD. If the message is modified
- // out from underneath us as we read it, return OVERWROTE.
- //
- // data may be nullptr to indicate the data should not be copied.
- enum class ReadResult { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
- ReadResult Read(uint32_t queue_index,
- ::aos::monotonic_clock::time_point *monotonic_sent_time,
- ::aos::realtime_clock::time_point *realtime_sent_time,
- ::aos::monotonic_clock::time_point *monotonic_remote_time,
- ::aos::realtime_clock::time_point *realtime_remote_time,
- uint32_t *remote_queue_index, size_t *length, char *data);
-
- // Returns the index to the latest queue message. Returns empty_queue_index()
- // if there are no messages in the queue. Do note that this index wraps if
- // more than 2^32 messages are sent.
- QueueIndex LatestQueueIndex();
- static QueueIndex empty_queue_index() { return QueueIndex::Invalid(); }
-
- // Returns the size of the queue. This is mostly useful for manipulating
- // QueueIndex.
- size_t queue_size() const;
-
- // TODO(austin): Return the oldest queue index. This lets us catch up nicely
- // if we got behind.
- // The easiest way to implement this is likely going to be to reserve the
- // first modulo of values for the initial time around, and never reuse them.
- // That lets us do a simple atomic read of the next index and deduce what has
- // happened. It will involve the simplest atomic operations.
-
- // TODO(austin): Make it so we can find the indices which were sent just
- // before and after a time with a binary search.
-
- // Sender for blocks of data. The resources associated with a sender are
- // scoped to this object's lifetime.
- class Sender {
- public:
- Sender(const Sender &) = delete;
- Sender &operator=(const Sender &) = delete;
- Sender(Sender &&other)
- : memory_(other.memory_), sender_index_(other.sender_index_) {
- other.memory_ = nullptr;
- other.sender_index_ = -1;
- }
- Sender &operator=(Sender &&other) {
- memory_ = other.memory_;
- sender_index_ = other.sender_index_;
- other.memory_ = nullptr;
- other.sender_index_ = -1;
- return *this;
- }
-
- ~Sender();
-
- // Sends a message without copying the data.
- // Copy at most size() bytes of data into the memory pointed to by Data(),
- // and then call Send().
- // Note: calls to Data() are expensive enough that you should cache it.
- size_t size();
- void *Data();
- void Send(size_t length,
- aos::monotonic_clock::time_point monotonic_remote_time =
- aos::monotonic_clock::min_time,
- aos::realtime_clock::time_point realtime_remote_time =
- aos::realtime_clock::min_time,
- uint32_t remote_queue_index = 0xffffffff,
- aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
- aos::realtime_clock::time_point *realtime_sent_time = nullptr,
- uint32_t *queue_index = nullptr);
-
- // Sends up to length data. Does not wakeup the target.
- void Send(const char *data, size_t length,
- aos::monotonic_clock::time_point monotonic_remote_time =
- aos::monotonic_clock::min_time,
- aos::realtime_clock::time_point realtime_remote_time =
- aos::realtime_clock::min_time,
- uint32_t remote_queue_index = 0xffffffff,
- aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
- aos::realtime_clock::time_point *realtime_sent_time = nullptr,
- uint32_t *queue_index = nullptr);
-
- int buffer_index() const;
-
- private:
- friend class LocklessQueue;
-
- Sender(LocklessQueueMemory *memory);
-
- // Returns true if this sender is valid. If it isn't valid, any of the
- // other methods won't work. This is here to allow the lockless queue to
- // only build a sender if there was one available.
- bool valid() const { return sender_index_ != -1 && memory_ != nullptr; }
-
- // Pointer to the backing memory.
- LocklessQueueMemory *memory_ = nullptr;
-
- // Index into the sender list.
- int sender_index_ = -1;
- };
-
- // Pinner for blocks of data. The resources associated with a pinner are
- // scoped to this object's lifetime.
- class Pinner {
- public:
- Pinner(const Pinner &) = delete;
- Pinner &operator=(const Pinner &) = delete;
- Pinner(Pinner &&other)
- : memory_(other.memory_), pinner_index_(other.pinner_index_) {
- other.memory_ = nullptr;
- other.pinner_index_ = -1;
- }
- Pinner &operator=(Pinner &&other) {
- memory_ = other.memory_;
- pinner_index_ = other.pinner_index_;
- other.memory_ = nullptr;
- other.pinner_index_ = -1;
- return *this;
- }
-
- ~Pinner();
-
- // Attempts to pin the message at queue_index.
- // Un-pins the previous message.
- // Returns the buffer index (non-negative) if it succeeds.
- // Returns -1 if that message is no longer in the queue.
- int PinIndex(uint32_t queue_index);
-
- // Read at most size() bytes of data into the memory pointed to by Data().
- // Note: calls to Data() are expensive enough that you should cache it.
- // Don't call Data() before a successful PinIndex call.
- size_t size() const;
- const void *Data() const;
-
- private:
- friend class LocklessQueue;
-
- Pinner(LocklessQueueMemory *memory);
-
- // Returns true if this pinner is valid. If it isn't valid, any of the
- // other methods won't work. This is here to allow the lockless queue to
- // only build a pinner if there was one available.
- bool valid() const { return pinner_index_ != -1 && memory_ != nullptr; }
-
- // Pointer to the backing memory.
- LocklessQueueMemory *memory_ = nullptr;
-
- // Index into the pinner list.
- int pinner_index_ = -1;
- };
-
- // Creates a sender. If we couldn't allocate a sender, returns nullopt.
- // TODO(austin): Change the API if we find ourselves with more errors.
- std::optional<Sender> MakeSender();
-
- // Creates a pinner. If we couldn't allocate a pinner, returns nullopt.
- // TODO(austin): Change the API if we find ourselves with more errors.
- std::optional<Pinner> MakePinner();
-
private:
- LocklessQueueMemory *memory_ = nullptr;
-
// Memory and datastructure used to sort a list of watchers to wake
// up. This isn't a copy of Watcher since tid is simpler to work with here
// than the futex above.
@@ -346,17 +216,176 @@
pid_t pid;
int priority;
};
- // TODO(austin): Don't allocate this memory if we aren't going to send.
- ::std::vector<WatcherCopy> watcher_copy_;
- // Index in the watcher list that our entry is, or -1 if no watcher is
- // registered.
- int watcher_index_ = -1;
-
+ const LocklessQueueMemory *const memory_;
const int pid_;
const uid_t uid_;
+
+ ::std::vector<WatcherCopy> watcher_copy_;
};
+// Sender for blocks of data. The resources associated with a sender are
+// scoped to this object's lifetime. Move-only.
+class LocklessQueueSender {
+ public:
+  LocklessQueueSender(const LocklessQueueSender &) = delete;
+  LocklessQueueSender &operator=(const LocklessQueueSender &) = delete;
+
+  // Takes over other's sender slot. other is left with no memory pointer and
+  // no sender index.
+  LocklessQueueSender(LocklessQueueSender &&other) noexcept
+      : memory_(other.memory_), sender_index_(other.sender_index_) {
+    other.memory_ = nullptr;
+    other.sender_index_ = -1;
+  }
+
+  // Exchanges state with other, so whatever this object held before the
+  // assignment ends up in other and is dealt with when other is destroyed.
+  LocklessQueueSender &operator=(LocklessQueueSender &&other) noexcept {
+    std::swap(memory_, other.memory_);
+    std::swap(sender_index_, other.sender_index_);
+    return *this;
+  }
+
+  ~LocklessQueueSender();
+
+  // Creates a sender. If we couldn't allocate a sender, returns nullopt.
+  // TODO(austin): Change the API if we find ourselves with more errors.
+  static std::optional<LocklessQueueSender> Make(LocklessQueue queue);
+
+  // Sends a message without copying the data.
+  // Copy at most size() bytes of data into the memory pointed to by Data(),
+  // and then call Send().
+  // Note: calls to Data() are expensive enough that you should cache it.
+  size_t size() const;
+  void *Data();
+  // The remote time and remote queue index arguments default to min_time and
+  // 0xffffffff. The trailing pointer arguments are optional outputs and may be
+  // left as nullptr.
+  void Send(size_t length,
+            aos::monotonic_clock::time_point monotonic_remote_time =
+                aos::monotonic_clock::min_time,
+            aos::realtime_clock::time_point realtime_remote_time =
+                aos::realtime_clock::min_time,
+            uint32_t remote_queue_index = 0xffffffff,
+            aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
+            aos::realtime_clock::time_point *realtime_sent_time = nullptr,
+            uint32_t *queue_index = nullptr);
+
+  // Sends up to length data. Does not wakeup the target.
+  void Send(const char *data, size_t length,
+            aos::monotonic_clock::time_point monotonic_remote_time =
+                aos::monotonic_clock::min_time,
+            aos::realtime_clock::time_point realtime_remote_time =
+                aos::realtime_clock::min_time,
+            uint32_t remote_queue_index = 0xffffffff,
+            aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
+            aos::realtime_clock::time_point *realtime_sent_time = nullptr,
+            uint32_t *queue_index = nullptr);
+
+  // NOTE(review): undocumented in this header; presumably the index of the
+  // buffer Data() currently points at -- confirm against the implementation.
+  int buffer_index() const;
+
+ private:
+  // Private: instances are obtained through Make().
+  LocklessQueueSender(LocklessQueueMemory *memory);
+
+  // Pointer to the backing memory, or nullptr once moved from.
+  LocklessQueueMemory *memory_ = nullptr;
+
+  // Index into the sender list, or -1 when this object holds no sender.
+  int sender_index_ = -1;
+};
+
+// Pinner for blocks of data. The resources associated with a pinner are
+// scoped to this object's lifetime. Move-only.
+class LocklessQueuePinner {
+ public:
+  LocklessQueuePinner(const LocklessQueuePinner &) = delete;
+  LocklessQueuePinner &operator=(const LocklessQueuePinner &) = delete;
+
+  // Takes over other's pinner slot. other is left with no memory pointers and
+  // no pinner index, the same moved-from state LocklessQueueSender uses.
+  LocklessQueuePinner(LocklessQueuePinner &&other) noexcept
+      : memory_(other.memory_),
+        const_memory_(other.const_memory_),
+        pinner_index_(other.pinner_index_) {
+    other.memory_ = nullptr;
+    other.const_memory_ = nullptr;
+    other.pinner_index_ = -1;
+  }
+
+  // Exchanges state with other, so whatever this object held before the
+  // assignment ends up in other and is dealt with when other is destroyed.
+  LocklessQueuePinner &operator=(LocklessQueuePinner &&other) noexcept {
+    std::swap(memory_, other.memory_);
+    std::swap(const_memory_, other.const_memory_);
+    std::swap(pinner_index_, other.pinner_index_);
+    return *this;
+  }
+
+  ~LocklessQueuePinner();
+
+  // Creates a pinner. If we couldn't allocate a pinner, returns nullopt.
+  // TODO(austin): Change the API if we find ourselves with more errors.
+  static std::optional<LocklessQueuePinner> Make(LocklessQueue queue);
+
+  // Attempts to pin the message at queue_index.
+  // Un-pins the previous message.
+  // Returns the buffer index (non-negative) if it succeeds.
+  // Returns -1 if that message is no longer in the queue.
+  int PinIndex(uint32_t queue_index);
+
+  // Read at most size() bytes of data into the memory pointed to by Data().
+  // Note: calls to Data() are expensive enough that you should cache it.
+  // Don't call Data() before a successful PinIndex call.
+  size_t size() const;
+  const void *Data() const;
+
+ private:
+  // Private: instances are obtained through Make().
+  LocklessQueuePinner(LocklessQueueMemory *memory,
+                      const LocklessQueueMemory *const_memory);
+
+  // Writable and read-only pointers to the backing memory, or nullptr once
+  // moved from.
+  LocklessQueueMemory *memory_ = nullptr;
+  const LocklessQueueMemory *const_memory_ = nullptr;
+
+  // Index into the pinner list, or -1 when this object holds no pinner.
+  int pinner_index_ = -1;
+};
+
+class LocklessQueueReader {  // Reads messages through the const queue memory.
+ public:
+ enum class Result { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };  // See Read().
+
+ LocklessQueueReader(LocklessQueue queue) : memory_(queue.const_memory()) {
+ queue.Initialize();  // Called on the by-value copy of the wrapper.
+ }
+
+ // If you ask for a queue index 2 past the newest, you will still get
+ // NOTHING_NEW until that gets overwritten with new data. If you ask for an
+ // element newer than LocklessQueueSize() from the current message, we
+ // consider it behind by a large amount and return TOO_OLD. If the message
+ // is modified out from underneath us as we read it, return OVERWROTE.
+ //
+ // data may be nullptr to indicate the data should not be copied.
+ Result Read(uint32_t queue_index,
+ ::aos::monotonic_clock::time_point *monotonic_sent_time,
+ ::aos::realtime_clock::time_point *realtime_sent_time,
+ ::aos::monotonic_clock::time_point *monotonic_remote_time,
+ ::aos::realtime_clock::time_point *realtime_remote_time,
+ uint32_t *remote_queue_index, size_t *length, char *data) const;
+
+ // Returns the index to the latest queue message. Returns
+ // QueueIndex::Invalid() if there are no messages in the queue. Do note that
+ // this index wraps if more than 2^32 messages are sent.
+ QueueIndex LatestIndex() const;
+
+ private:
+ const LocklessQueueMemory *const memory_;  // Read-only; pointer is const too.
+};
+
+// Returns the number of messages which are logically in the queue at a time.
+size_t LocklessQueueSize(const LocklessQueueMemory *memory);
+
+// Returns the number of bytes queue users are allowed to read/write within each
+// message.
+size_t LocklessQueueMessageDataSize(const LocklessQueueMemory *memory);
+
+// TODO(austin): Return the oldest queue index. This lets us catch up nicely
+// if we got behind.
+// The easiest way to implement this is likely going to be to reserve the
+// first modulo of values for the initial time around, and never reuse them.
+// That lets us do a simple atomic read of the next index and deduce what has
+// happened. It will involve the simplest atomic operations.
+
+// TODO(austin): Make it so we can find the indices which were sent just
+// before and after a time with a binary search.
+
+// Prints to stdout the data inside the queue for debugging.
+void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
+
} // namespace ipc_lib
} // namespace aos