Expose a unique index for each event loop buffer

This can be helpful for indexing into other datastructures based on the
messages, by giving an identifier which will be unique as long as the
message is pinned.

Change-Id: I49ce18fba25a796005e64b40e5d1d5c55ca15543
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index e69fd0d..5c7e49d 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -33,6 +33,7 @@
   context_.queue_index = 0xffffffff;
   context_.size = 0;
   context_.data = nullptr;
+  context_.buffer_index = -1;
   event_loop_->NewFetcher(this);
 }
 
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index fa78953..c922f2e 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -48,6 +48,7 @@
   realtime_clock::time_point realtime_remote_time;
 
   // The rest are only valid for Watchers and Fetchers.
+
   // Index in the queue.
   uint32_t queue_index;
   // Index into the remote queue.  Useful to determine if data was lost.  In a
@@ -59,6 +60,20 @@
   // Pointer to the data.
   const void *data;
 
+  // Index of the message buffer. This will be in [0, NumberBuffers) on
+  // read_method=PIN channels, and -1 for other channels.
+  //
+  // This only tells you about the underlying storage for this message, not
+  // anything about its position in the queue. This is only useful for advanced
+  // zero-copy use cases, on read_method=PIN channels.
+  //
+  // This will uniquely identify a message on this channel at a point in time.
+  // For senders, this point in time is while the sender has the message. With
+  // read_method==PIN, this point in time includes while the caller has access
+  // to this context. For other read_methods, this point in time may be before
+  // the caller has access to this context, which makes this pretty useless.
+  int buffer_index;
+
   // Efficiently coppies the flatbuffer into a FlatbufferVector, allocating
   // memory in the process.  It is vital that T matches the type of the
   // underlying flatbuffer.
@@ -166,6 +181,10 @@
     return &fbb_allocator_;
   }
 
+  // Index of the buffer which is currently exposed by data() and the various
+  // other accessors. This is the message the caller should be filling out.
+  virtual int buffer_index() = 0;
+
  protected:
   EventLoop *event_loop() { return event_loop_; }
 
@@ -351,13 +370,17 @@
   // Returns the queue index that this was sent with.
   uint32_t sent_queue_index() const { return sender_->sent_queue_index(); }
 
+  // Returns the buffer index which MakeBuilder() will expose access to. This is
+  // the buffer the caller can fill out.
+  int buffer_index() const { return sender_->buffer_index(); }
+
  private:
   friend class EventLoop;
   Sender(std::unique_ptr<RawSender> sender) : sender_(std::move(sender)) {}
   std::unique_ptr<RawSender> sender_;
 };
 
-// Interface for timers
+// Interface for timers.
 class TimerHandler {
  public:
   virtual ~TimerHandler();
@@ -604,6 +627,7 @@
     MakeRawWatcher(channel, [watcher](const Context &context, const void *) {
       Context new_context = context;
       new_context.data = nullptr;
+      new_context.buffer_index = -1;
       watcher(new_context);
     });
   }
@@ -622,9 +646,13 @@
   // Prevents the event loop from sending a timing report.
   void SkipTimingReport() { skip_timing_report_ = true; }
 
-  // Prevents AOS_LOG being sent to message on /aos
+  // Prevents AOS_LOG being sent to message on /aos.
   void SkipAosLog() { skip_logger_ = true; }
 
+  // Returns the number of buffers for this channel. This corresponds with the
+  // range of Context::buffer_index values for this channel.
+  virtual int NumberBuffers(const Channel *channel) = 0;
+
  protected:
   // Sets the name of the event loop.  This is the application name.
   virtual void set_name(const std::string_view name) = 0;
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 606849a..993011f 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -1,8 +1,9 @@
 #include "aos/events/event_loop_param_test.h"
 
 #include <chrono>
+#include <unordered_map>
+#include <unordered_set>
 
-#include "aos/events/test_message_generated.h"
 #include "aos/flatbuffer_merge.h"
 #include "glog/logging.h"
 #include "gmock/gmock.h"
@@ -14,6 +15,54 @@
 namespace chrono = ::std::chrono;
 }  // namespace
 
+::std::unique_ptr<EventLoop> AbstractEventLoopTest::Make(
+    std::string_view name) {
+  std::string name_copy(name);
+  if (name == "") {
+    name_copy = "loop";
+    name_copy += std::to_string(event_loop_count_);
+  }
+  ++event_loop_count_;
+  return factory_->Make(name_copy);
+}
+
+void AbstractEventLoopTest::VerifyBuffers(
+    int number_buffers,
+    std::vector<std::reference_wrapper<const Fetcher<TestMessage>>> fetchers,
+    std::vector<std::reference_wrapper<const Sender<TestMessage>>> senders) {
+  // The buffers which are in a sender.
+  std::unordered_set<int> in_sender;
+  for (const Sender<TestMessage> &sender : senders) {
+    const int this_buffer = sender.buffer_index();
+    CHECK_GE(this_buffer, 0);
+    CHECK_LT(this_buffer, number_buffers);
+    CHECK(in_sender.insert(this_buffer).second) << ": " << this_buffer;
+  }
+
+  if (read_method() != ReadMethod::PIN) {
+    // If we're not using PIN, we can't really verify anything about what
+    // buffers the fetchers have.
+    return;
+  }
+
+  // Mapping from TestMessage::value to buffer index.
+  std::unordered_map<int, int> fetcher_values;
+  for (const Fetcher<TestMessage> &fetcher : fetchers) {
+    if (!fetcher.get()) {
+      continue;
+    }
+    const int this_buffer = fetcher.context().buffer_index;
+    CHECK_GE(this_buffer, 0);
+    CHECK_LT(this_buffer, number_buffers);
+    CHECK(in_sender.count(this_buffer) == 0) << ": " << this_buffer;
+    const auto insert_result = fetcher_values.insert(
+        std::make_pair(fetcher.get()->value(), this_buffer));
+    if (!insert_result.second) {
+      CHECK_EQ(this_buffer, insert_result.first->second);
+    }
+  }
+}
+
 // Tests that watcher can receive messages from a sender.
 // Also tests that OnRun() works.
 TEST_P(AbstractEventLoopTest, Basic) {
@@ -92,6 +141,7 @@
   loop2->MakeNoArgWatcher<TestMessage>("/test", [&]() {
     EXPECT_GT(loop2->context().size, 0u);
     EXPECT_EQ(nullptr, loop2->context().data);
+    EXPECT_EQ(-1, loop2->context().buffer_index);
     this->Exit();
   });
 
@@ -222,6 +272,7 @@
   EXPECT_EQ(fetcher.context().queue_index, 0xffffffffu);
   EXPECT_EQ(fetcher.context().size, 0u);
   EXPECT_EQ(fetcher.context().data, nullptr);
+  EXPECT_EQ(fetcher.context().buffer_index, -1);
 
   aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
   TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
@@ -248,6 +299,13 @@
   EXPECT_EQ(fetcher.context().queue_index, 0x0u);
   EXPECT_EQ(fetcher.context().size, 20u);
   EXPECT_NE(fetcher.context().data, nullptr);
+  if (read_method() == ReadMethod::PIN) {
+    EXPECT_GE(fetcher.context().buffer_index, 0);
+    EXPECT_LT(fetcher.context().buffer_index,
+              loop2->NumberBuffers(fetcher.channel()));
+  } else {
+    EXPECT_EQ(fetcher.context().buffer_index, -1);
+  }
 }
 
 // Tests that watcher will receive all messages sent if they are sent after
@@ -603,17 +661,32 @@
       fetchers.emplace_back(fetch_loop->MakeFetcher<TestMessage>("/test"));
     }
     send_message(1);
+    const auto verify_buffers = [&]() {
+      std::vector<std::reference_wrapper<const Fetcher<TestMessage>>>
+          fetchers_copy;
+      for (const auto &fetcher : fetchers) {
+        fetchers_copy.emplace_back(fetcher);
+      }
+      std::vector<std::reference_wrapper<const Sender<TestMessage>>>
+          senders_copy;
+      senders_copy.emplace_back(sender);
+      VerifyBuffers(send_loop->NumberBuffers(sender.channel()), fetchers_copy,
+                    senders_copy);
+    };
     for (auto &fetcher : fetchers) {
       ASSERT_TRUE(fetcher.Fetch());
+      verify_buffers();
       EXPECT_EQ(1, fetcher.get()->value());
     }
 
     for (int save = 1; save <= max_save; ++save) {
       SCOPED_TRACE("save=" + std::to_string(save));
       send_message(100 + save);
+      verify_buffers();
       for (size_t i = 0; i < fetchers.size() - save; ++i) {
         SCOPED_TRACE("fetcher=" + std::to_string(i));
         ASSERT_TRUE(fetchers[i].Fetch());
+        verify_buffers();
         EXPECT_EQ(100 + save, fetchers[i].get()->value());
       }
       for (size_t i = fetchers.size() - save; i < fetchers.size() - 1; ++i) {
@@ -625,6 +698,7 @@
 
     for (int i = 0; i < 300; ++i) {
       send_message(200 + i);
+      verify_buffers();
     }
 
     for (size_t i = 0; i < fetchers.size() - max_save; ++i) {
@@ -879,6 +953,7 @@
     EXPECT_EQ(loop->context().queue_index, 0xffffffffu);
     EXPECT_EQ(loop->context().size, 0u);
     EXPECT_EQ(loop->context().data, nullptr);
+    EXPECT_EQ(loop->context().buffer_index, -1);
 
     expected_times.push_back(loop->context().monotonic_event_time);
     if (times.size() == kCount) {
@@ -1121,6 +1196,15 @@
     EXPECT_LT(&msg, reinterpret_cast<const void *>(
                         reinterpret_cast<const char *>(loop1->context().data) +
                         loop1->context().size));
+    if (read_method() == ReadMethod::PIN) {
+      EXPECT_GE(loop1->context().buffer_index, 0);
+      EXPECT_LT(loop1->context().buffer_index,
+                loop1->NumberBuffers(
+                    configuration::GetChannel(loop1->configuration(), "/test",
+                                              "aos.TestMessage", "", nullptr)));
+    } else {
+      EXPECT_EQ(-1, loop1->context().buffer_index);
+    }
     triggered = true;
   });
 
@@ -1293,6 +1377,7 @@
             EXPECT_EQ(loop1->context().queue_index, 0xffffffffu);
             EXPECT_EQ(loop1->context().size, 0u);
             EXPECT_EQ(loop1->context().data, nullptr);
+            EXPECT_EQ(loop1->context().buffer_index, -1);
 
             if (times.size() == kCount) {
               LOG(INFO) << "Exiting";
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index f158e02..faf6361 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -1,10 +1,12 @@
 #ifndef _AOS_EVENTS_EVENT_LOOP_PARAM_TEST_H_
 #define _AOS_EVENTS_EVENT_LOOP_PARAM_TEST_H_
 
+#include <initializer_list>
 #include <string_view>
 #include <vector>
 
 #include "aos/events/event_loop.h"
+#include "aos/events/test_message_generated.h"
 #include "aos/flatbuffers.h"
 #include "aos/json_to_flatbuffer.h"
 #include "gtest/gtest.h"
@@ -181,11 +183,11 @@
   const Node *my_node_ = nullptr;
 };
 
-class AbstractEventLoopTestBase
+class AbstractEventLoopTest
     : public ::testing::TestWithParam<
           std::tuple<std::function<EventLoopTestFactory *()>, ReadMethod>> {
  public:
-  AbstractEventLoopTestBase() : factory_(std::get<0>(GetParam())()) {
+  AbstractEventLoopTest() : factory_(std::get<0>(GetParam())()) {
     if (read_method() == ReadMethod::PIN) {
       factory_->PinReads();
     }
@@ -193,15 +195,8 @@
 
   ReadMethod read_method() const { return std::get<1>(GetParam()); }
 
-  ::std::unique_ptr<EventLoop> Make(std::string_view name = "") {
-    std::string name_copy(name);
-    if (name == "") {
-      name_copy = "loop";
-      name_copy += std::to_string(event_loop_count_);
-    }
-    ++event_loop_count_;
-    return factory_->Make(name_copy);
-  }
+  ::std::unique_ptr<EventLoop> Make(std::string_view name = "");
+
   ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name = "primary") {
     ++event_loop_count_;
     return factory_->MakePrimary(name);
@@ -228,14 +223,20 @@
     end_timer->set_name("end");
   }
 
+  // Verifies that the buffer_index values for all of the given objects are
+  // consistent.
+  void VerifyBuffers(
+      int number_buffers,
+      std::vector<std::reference_wrapper<const Fetcher<TestMessage>>> fetchers,
+      std::vector<std::reference_wrapper<const Sender<TestMessage>>> senders);
+
  private:
   const ::std::unique_ptr<EventLoopTestFactory> factory_;
 
   int event_loop_count_ = 0;
 };
 
-typedef AbstractEventLoopTestBase AbstractEventLoopDeathTest;
-typedef AbstractEventLoopTestBase AbstractEventLoopTest;
+using AbstractEventLoopDeathTest = AbstractEventLoopTest;
 
 }  // namespace testing
 }  // namespace aos
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index a39a338..5ca1067 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -177,6 +177,7 @@
   event_loop_->context_.queue_index = 0xffffffffu;
   event_loop_->context_.size = 0;
   event_loop_->context_.data = nullptr;
+  event_loop_->context_.buffer_index = -1;
 
   ftrace_.FormatMessage(
       "timer: %.*s: start now=%" PRId64 " event=%" PRId64,
@@ -221,6 +222,7 @@
   event_loop_->context_.queue_index = 0xffffffffu;
   event_loop_->context_.size = 0;
   event_loop_->context_.data = nullptr;
+  event_loop_->context_.buffer_index = -1;
 
   // Compute how many cycles elapsed and schedule the next wakeup.
   Reschedule(schedule, monotonic_start_time);
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index b33fe98..e45c065 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -53,6 +53,8 @@
   FLAGS_shm_base = std::string(base) + "/dev/shm/aos";
 }
 
+namespace {
+
 std::string ShmFolder(const Channel *channel) {
   CHECK(channel->has_name());
   CHECK_EQ(channel->name()->string_view()[0], '/');
@@ -88,21 +90,28 @@
   }
 }
 
+ipc_lib::LocklessQueueConfiguration MakeQueueConfiguration(
+    const Channel *channel, std::chrono::seconds channel_storage_duration) {
+  ipc_lib::LocklessQueueConfiguration config;
+
+  config.num_watchers = channel->num_watchers();
+  config.num_senders = channel->num_senders();
+  // The value in the channel will default to 0 if readers are configured to
+  // copy.
+  config.num_pinners = channel->num_readers();
+  config.queue_size = channel_storage_duration.count() * channel->frequency();
+  config.message_data_size = channel->max_size();
+
+  return config;
+}
+
 class MMapedQueue {
  public:
   MMapedQueue(const Channel *channel,
-              const std::chrono::seconds channel_storage_duration) {
+              std::chrono::seconds channel_storage_duration)
+      : config_(MakeQueueConfiguration(channel, channel_storage_duration)) {
     std::string path = ShmPath(channel);
 
-    config_.num_watchers = channel->num_watchers();
-    config_.num_senders = channel->num_senders();
-    // The value in the channel will default to 0 if readers are configured to
-    // copy.
-    config_.num_pinners = channel->num_readers();
-    config_.queue_size =
-        channel_storage_duration.count() * channel->frequency();
-    config_.message_data_size = channel->max_size();
-
     size_ = ipc_lib::LocklessQueueMemorySize(config_);
 
     util::MkdirP(path, FLAGS_permissions);
@@ -110,7 +119,7 @@
     // There are 2 cases.  Either the file already exists, or it does not
     // already exist and we need to create it.  Start by trying to create it. If
     // that fails, the file has already been created and we can open it
-    // normally..  Once the file has been created it wil never be deleted.
+    // normally..  Once the file has been created it will never be deleted.
     int fd = open(path.c_str(), O_RDWR | O_CREAT | O_EXCL,
                   O_CLOEXEC | FLAGS_permissions);
     if (fd == -1 && errno == EEXIST) {
@@ -160,14 +169,12 @@
   }
 
  private:
-  ipc_lib::LocklessQueueConfiguration config_;
+  const ipc_lib::LocklessQueueConfiguration config_;
 
   size_t size_;
   void *data_;
 };
 
-namespace {
-
 const Node *MaybeMyNode(const Configuration *configuration) {
   if (!configuration->has_nodes()) {
     return nullptr;
@@ -320,11 +327,15 @@
 
     if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
       if (pin_data()) {
-        CHECK(pinner_->PinIndex(queue_index.index()))
+        const int pin_result = pinner_->PinIndex(queue_index.index());
+        CHECK(pin_result >= 0)
             << ": Got behind while reading and the last message was modified "
                "out from under us while we tried to pin it. Don't get so far "
                "behind on: "
             << configuration::CleanedChannelToString(channel_);
+        context_.buffer_index = pin_result;
+      } else {
+        context_.buffer_index = -1;
       }
 
       context_.queue_index = queue_index.index();
@@ -501,6 +512,8 @@
     return lockless_queue_memory_.GetSharedMemory();
   }
 
+  int buffer_index() override { return lockless_queue_sender_.buffer_index(); }
+
  private:
   MMapedQueue lockless_queue_memory_;
   ipc_lib::LocklessQueue lockless_queue_;
@@ -994,6 +1007,13 @@
   return watcher_state->GetSharedMemory();
 }
 
+int ShmEventLoop::NumberBuffers(const Channel *channel) {
+  return MakeQueueConfiguration(
+             channel, chrono::ceil<chrono::seconds>(chrono::nanoseconds(
+                          configuration()->channel_storage_duration())))
+      .num_messages();
+}
+
 absl::Span<char> ShmEventLoop::GetShmSenderSharedMemory(
     const aos::RawSender *sender) const {
   return static_cast<const ShmSender *>(sender)->GetSharedMemory();
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index 57d3b98..d7ea5e6 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -24,8 +24,8 @@
 
 }  // namespace shm_event_loop_internal
 
-// Specialization of EventLoop that is built from queues running out of shared
-// memory.
+// Concrete implementation of EventLoop that is built from queues running out of
+// shared memory.
 //
 // TODO(austin): Timing reports break multiple threads.  Need to add back in a
 // mutex.
@@ -98,6 +98,8 @@
     return GetShmFetcherPrivateMemory(GetRawFetcher(fetcher));
   }
 
+  int NumberBuffers(const Channel *channel) override;
+
  private:
   friend class shm_event_loop_internal::ShmWatcherState;
   friend class shm_event_loop_internal::ShmTimerHandler;
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index ae053fb..c339ce0 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -33,7 +33,6 @@
   Context context;
 
   SimulatedChannel *const channel = nullptr;
-  int buffer_index;
 
   // The data.
   char *data(size_t buffer_size) {
@@ -82,9 +81,10 @@
 
   ::std::deque<std::shared_ptr<SimulatedMessage>> msgs_;
 
-  SimulatedEventLoop *simulated_event_loop_;
+  SimulatedEventLoop *const simulated_event_loop_;
+  const Channel *const channel_;
+  EventScheduler *const scheduler_;
   EventHandler<SimulatedWatcher> event_;
-  EventScheduler *scheduler_;
   EventScheduler::Token token_;
   SimulatedChannel *simulated_channel_ = nullptr;
 };
@@ -247,11 +247,11 @@
 
 SimulatedMessage::SimulatedMessage(SimulatedChannel *channel_in)
     : channel(channel_in) {
-  buffer_index = channel->GetBufferIndex();
+  context.buffer_index = channel->GetBufferIndex();
 }
 
 SimulatedMessage::~SimulatedMessage() {
-  channel->FreeBufferIndex(buffer_index);
+  channel->FreeBufferIndex(context.buffer_index);
 }
 
 class SimulatedSender : public RawSender {
@@ -319,6 +319,12 @@
                   remote_queue_index);
   }
 
+  int buffer_index() override {
+    // First, ensure message_ is allocated.
+    data();
+    return message_->context.buffer_index;
+  }
+
  private:
   SimulatedChannel *simulated_channel_;
   EventLoop *event_loop_;
@@ -374,6 +380,9 @@
   void SetMsg(std::shared_ptr<SimulatedMessage> msg) {
     msg_ = msg;
     context_ = msg_->context;
+    if (channel()->read_method() != ReadMethod::PIN) {
+      context_.buffer_index = -1;
+    }
     if (context_.remote_queue_index == 0xffffffffu) {
       context_.remote_queue_index = context_.queue_index;
     }
@@ -567,6 +576,8 @@
     }
   }
 
+  int NumberBuffers(const Channel *channel) override;
+
  private:
   friend class SimulatedTimerHandler;
   friend class SimulatedPhasedLoopHandler;
@@ -664,14 +675,19 @@
   return it->second.get();
 }
 
+int SimulatedEventLoop::NumberBuffers(const Channel *channel) {
+  return GetSimulatedChannel(channel)->number_buffers();
+}
+
 SimulatedWatcher::SimulatedWatcher(
     SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
     const Channel *channel,
     std::function<void(const Context &context, const void *message)> fn)
     : WatcherState(simulated_event_loop, channel, std::move(fn)),
       simulated_event_loop_(simulated_event_loop),
-      event_(this),
+      channel_(channel),
       scheduler_(scheduler),
+      event_(this),
       token_(scheduler_->InvalidToken()) {}
 
 SimulatedWatcher::~SimulatedWatcher() {
@@ -679,7 +695,7 @@
   if (token_ != scheduler_->InvalidToken()) {
     scheduler_->Deschedule(token_);
   }
-  simulated_channel_->RemoveWatcher(this);
+  CHECK_NOTNULL(simulated_channel_)->RemoveWatcher(this);
 }
 
 void SimulatedWatcher::Schedule(std::shared_ptr<SimulatedMessage> message) {
@@ -709,6 +725,9 @@
   }
   Context context = msgs_.front()->context;
 
+  if (channel_->read_method() != ReadMethod::PIN) {
+    context.buffer_index = -1;
+  }
   if (context.remote_queue_index == 0xffffffffu) {
     context.remote_queue_index = context.queue_index;
   }
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index 01e5f24..dad44bb 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -859,7 +859,7 @@
 
 // This method doesn't mess with any scratch_index, so it doesn't have to worry
 // about message ownership.
-bool LocklessQueue::Pinner::PinIndex(uint32_t uint32_queue_index) {
+int LocklessQueue::Pinner::PinIndex(uint32_t uint32_queue_index) {
   const size_t queue_size = memory_->queue_size();
   const QueueIndex queue_index =
       QueueIndex::Zero(queue_size).IncrementBy(uint32_queue_index);
@@ -880,7 +880,7 @@
     if (message_queue_index == queue_index) {
       VLOG(3) << "Eq: " << std::hex << message_queue_index.index();
       aos_compiler_memory_barrier();
-      return true;
+      return message_index.message_index();
     }
     VLOG(3) << "Message reused: " << std::hex << message_queue_index.index()
             << ", " << queue_index.index();
@@ -890,7 +890,7 @@
   // longer in the queue, so back that out now.
   pinner->pinned.Invalidate();
   VLOG(3) << "Unpinned: " << std::hex << queue_index.index();
-  return false;
+  return -1;
 }
 
 size_t LocklessQueue::Pinner::size() const {
@@ -1094,6 +1094,14 @@
   memory_->GetMessage(new_scratch)->header.queue_index.RelaxedInvalidate();
 }
 
+int LocklessQueue::Sender::buffer_index() const {
+  ::aos::ipc_lib::Sender *const sender = memory_->GetSender(sender_index_);
+  // We can do a relaxed load on our sender because we're the only person
+  // modifying it right now.
+  const Index scratch_index = sender->scratch_index.RelaxedLoad();
+  return scratch_index.message_index();
+}
+
 LocklessQueue::ReadResult LocklessQueue::Read(
     uint32_t uint32_queue_index,
     ::aos::monotonic_clock::time_point *monotonic_sent_time,
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index 5676c53..afa7ced 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -258,6 +258,8 @@
               aos::realtime_clock::time_point *realtime_sent_time = nullptr,
               uint32_t *queue_index = nullptr);
 
+    int buffer_index() const;
+
    private:
     friend class LocklessQueue;
 
@@ -298,9 +300,9 @@
 
     // Attempts to pin the message at queue_index.
     // Un-pins the previous message.
-    // Returns true if it succeeds.
-    // Returns false if that message is no longer in the queue.
-    bool PinIndex(uint32_t queue_index);
+    // Returns the buffer index (non-negative) if it succeeds.
+    // Returns -1 if that message is no longer in the queue.
+    int PinIndex(uint32_t queue_index);
 
     // Read at most size() bytes of data into the memory pointed to by Data().
     // Note: calls to Data() are expensive enough that you should cache it.