Add LocklessQueue::Pinner class
This will allow reading messages from queues without copying, which is
helpful for speeding up the processing of images.
Change-Id: Ia4bb98afa6fe1c1b5cc186e3071c7458f143d77d
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index de80f3d..5676c53 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -4,8 +4,8 @@
#include <signal.h>
#include <sys/signalfd.h>
#include <sys/types.h>
-#include <vector>
#include <optional>
+#include <vector>
#include "aos/ipc_lib/aos_sync.h"
#include "aos/ipc_lib/data_alignment.h"
@@ -51,6 +51,21 @@
AtomicIndex to_replace;
};
+// Per-pinner state required to pin messages (one entry of the pinner list).
+struct Pinner {
+ // Plays the same role as Sender::tid; see that field for the full docs.
+ aos_mutex tid;
+
+ // Queue index of the currently pinned message, or Invalid if none is pinned.
+ AtomicQueueIndex pinned;
+
+ // Must always hold a valid message index.
+ //
+ // This is fully independent of pinned: it is only a place to stash a spare
+ // message, so that there is always an unpinned one for a writer to grab.
+ AtomicIndex scratch_index;
+};
+
// Structure representing a message.
struct Message {
struct Header {
@@ -98,6 +113,8 @@
size_t num_watchers;
// Size of the sender list.
size_t num_senders;
+ // Size of the pinner list.
+ size_t num_pinners;
// Size of the list of pointers into the messages list.
size_t queue_size;
@@ -106,7 +123,7 @@
size_t message_size() const;
- size_t num_messages() const { return num_senders + queue_size; }
+ size_t num_messages() const { return num_senders + num_pinners + queue_size; }
};
// Structure to hold the state of the queue.
@@ -258,10 +275,64 @@
int sender_index_ = -1;
};
+ // Pinner for blocks of data. The resources associated with a pinner are
+ // scoped to this object's lifetime.
+ //
+ // Pinners are move-only: copying is disabled, and moving hands the pinner
+ // slot over to the destination object.
+ class Pinner {
+  public:
+   Pinner(const Pinner &) = delete;
+   Pinner &operator=(const Pinner &) = delete;
+
+   // Takes over other's state and leaves other invalid (null memory, index
+   // -1), so that only one object refers to the pinner slot afterwards.
+   Pinner(Pinner &&other) noexcept
+       : memory_(other.memory_), pinner_index_(other.pinner_index_) {
+     other.memory_ = nullptr;
+     other.pinner_index_ = -1;
+   }
+
+   // Exchanges state with other instead of overwriting our own. Whatever this
+   // object held before the assignment ends up in other and is cleaned up
+   // when other is destroyed, rather than being silently dropped. This also
+   // makes self-move-assignment a no-op instead of invalidating the object.
+   Pinner &operator=(Pinner &&other) noexcept {
+     LocklessQueueMemory *const previous_memory = memory_;
+     const int previous_index = pinner_index_;
+     memory_ = other.memory_;
+     pinner_index_ = other.pinner_index_;
+     other.memory_ = previous_memory;
+     other.pinner_index_ = previous_index;
+     return *this;
+   }
+
+   ~Pinner();
+
+   // Attempts to pin the message at queue_index.
+   // Un-pins the previous message.
+   // Returns true if it succeeds.
+   // Returns false if that message is no longer in the queue.
+   bool PinIndex(uint32_t queue_index);
+
+   // At most size() bytes may be read from the memory pointed to by Data().
+   // The memory is read-only (Data() returns a pointer to const).
+   // Note: calls to Data() are expensive enough that you should cache it.
+   // Don't call Data() before a successful PinIndex call.
+   size_t size() const;
+   const void *Data() const;
+
+  private:
+   friend class LocklessQueue;
+
+   // Only LocklessQueue (a friend) can construct a pinner from queue memory.
+   Pinner(LocklessQueueMemory *memory);
+
+   // Returns true if this pinner is valid. If it isn't valid, any of the
+   // other methods won't work. This is here to allow the lockless queue to
+   // only build a pinner if there was one available.
+   bool valid() const { return pinner_index_ != -1 && memory_ != nullptr; }
+
+   // Pointer to the backing memory. Null when this object holds no pinner
+   // (default state, or after being moved from).
+   LocklessQueueMemory *memory_ = nullptr;
+
+   // Index into the pinner list, or -1 when this object holds no pinner.
+   int pinner_index_ = -1;
+ };
+
// Creates a sender. If we couldn't allocate a sender, returns nullopt.
// TODO(austin): Change the API if we find ourselves with more errors.
std::optional<Sender> MakeSender();
+ // Creates a pinner, or returns nullopt if one couldn't be allocated.
+ // TODO(austin): Change the API if we find ourselves with more errors.
+ std::optional<Pinner> MakePinner();
+
private:
LocklessQueueMemory *memory_ = nullptr;