Add LocklessQueue::Pinner class

This will allow reading messages from queues without copying, which is
helpful for speeding up the processing of images.

Change-Id: Ia4bb98afa6fe1c1b5cc186e3071c7458f143d77d
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index de80f3d..5676c53 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -4,8 +4,8 @@
 #include <signal.h>
 #include <sys/signalfd.h>
 #include <sys/types.h>
-#include <vector>
 #include <optional>
+#include <vector>
 
 #include "aos/ipc_lib/aos_sync.h"
 #include "aos/ipc_lib/data_alignment.h"
@@ -51,6 +51,21 @@
   AtomicIndex to_replace;
 };
 
+// Structure to hold the state required to pin messages.
+struct Pinner {
+  // The same as Sender::tid. See there for docs.
+  aos_mutex tid;
+
+  // Queue index of the message we have pinned, or Invalid if there isn't one.
+  AtomicQueueIndex pinned;
+
+  // This should always be valid.
+  //
+  // Note that this is fully independent from pinned. It's just a place to stash
+  // a message, to ensure there's always an unpinned one for a writer to grab.
+  AtomicIndex scratch_index;
+};
+
 // Structure representing a message.
 struct Message {
   struct Header {
@@ -98,6 +113,8 @@
   size_t num_watchers;
   // Size of the sender list.
   size_t num_senders;
+  // Size of the pinner list.
+  size_t num_pinners;
 
   // Size of the list of pointers into the messages list.
   size_t queue_size;
@@ -106,7 +123,7 @@
 
   size_t message_size() const;
 
-  size_t num_messages() const { return num_senders + queue_size; }
+  size_t num_messages() const { return num_senders + num_pinners + queue_size; }
 };
 
 // Structure to hold the state of the queue.
@@ -258,10 +275,64 @@
     int sender_index_ = -1;
   };
 
+  // Pinner for blocks of data.  The resources associated with a pinner are
+  // scoped to this object's lifetime.
+  class Pinner {
+   public:
+    Pinner(const Pinner &) = delete;
+    Pinner &operator=(const Pinner &) = delete;
+    Pinner(Pinner &&other)
+        : memory_(other.memory_), pinner_index_(other.pinner_index_) {
+      other.memory_ = nullptr;
+      other.pinner_index_ = -1;
+    }
+    Pinner &operator=(Pinner &&other) {
+      memory_ = other.memory_;
+      pinner_index_ = other.pinner_index_;
+      other.memory_ = nullptr;
+      other.pinner_index_ = -1;
+      return *this;
+    }
+
+    ~Pinner();
+
+    // Attempts to pin the message at queue_index.
+    // Un-pins the previous message.
+    // Returns true if it succeeds.
+    // Returns false if that message is no longer in the queue.
+    bool PinIndex(uint32_t queue_index);
+
+    // Read at most size() bytes of data into the memory pointed to by Data().
+    // Note: calls to Data() are expensive enough that you should cache it.
+    // Don't call Data() before a successful PinIndex call.
+    size_t size() const;
+    const void *Data() const;
+
+   private:
+    friend class LocklessQueue;
+
+    Pinner(LocklessQueueMemory *memory);
+
+    // Returns true if this pinner is valid.  If it isn't valid, any of the
+    // other methods won't work.  This is here to allow the lockless queue to
+    // only build a pinner if there was one available.
+    bool valid() const { return pinner_index_ != -1 && memory_ != nullptr; }
+
+    // Pointer to the backing memory.
+    LocklessQueueMemory *memory_ = nullptr;
+
+    // Index into the pinner list.
+    int pinner_index_ = -1;
+  };
+
   // Creates a sender.  If we couldn't allocate a sender, returns nullopt.
   // TODO(austin): Change the API if we find ourselves with more errors.
   std::optional<Sender> MakeSender();
 
+  // Creates a pinner.  If we couldn't allocate a pinner, returns nullopt.
+  // TODO(austin): Change the API if we find ourselves with more errors.
+  std::optional<Pinner> MakePinner();
+
  private:
   LocklessQueueMemory *memory_ = nullptr;