Merge changes Ia95acd32,I714c3f06,I5b30c3b0

* changes:
  Update prints to have which node they are from
  Move code to timestamp_filter.cc
  Make gmp build, and run all the tests
diff --git a/aos/configuration.cc b/aos/configuration.cc
index 927e9d3..834ab5b 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -299,6 +299,11 @@
         continue;
       }
 
+      CHECK_EQ(c->read_method() == ReadMethod::PIN, c->num_readers() != 0)
+          << ": num_readers may be set if and only if read_method is PIN,"
+             " if you want 0 readers do not set PIN: "
+          << CleanedChannelToString(c);
+
       // Attempt to insert the channel.
       auto result = channels.insert(CopyFlatBuffer(c));
       if (!result.second) {
diff --git a/aos/configuration.fbs b/aos/configuration.fbs
index 31d89e7..9a24c8a 100644
--- a/aos/configuration.fbs
+++ b/aos/configuration.fbs
@@ -42,6 +42,13 @@
   time_to_live:uint = 0;
 }
 
+enum ReadMethod : ubyte {
+  // Copy all the data out of shared memory into a local buffer for each reader.
+  COPY,
+  // Pin the data in shared memory and read directly from there.
+  PIN,
+}
+
 // Table representing a channel.  Channels are where data is published and
 // subscribed from.  The tuple of name, type is the identifying information.
 table Channel {
@@ -78,6 +85,14 @@
   // node responsible for logging it.  Empty implies the node this connection
   // is connecting to (i.e. name).
   logger_nodes:[string];
+
+  // The way messages are read from shared memory for this channel.
+  read_method:ReadMethod = COPY;
+
+  // Sets the maximum number of readers on a channel.
+  //
+  // Currently, this must be set if and only if read_method is PIN.
+  num_readers:int;
 }
 
 // Table to support renaming channel names.
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 39a6a54..0212eaa 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -243,7 +243,7 @@
 cc_test(
     name = "shm_event_loop_test",
     srcs = ["shm_event_loop_test.cc"],
-    shard_count = 5,
+    shard_count = 16,
     deps = [
         ":event_loop_param_test",
         ":shm_event_loop",
@@ -267,6 +267,7 @@
     name = "simulated_event_loop_test",
     srcs = ["simulated_event_loop_test.cc"],
     data = ["multinode_pingpong_config.json"],
+    shard_count = 4,
     deps = [
         ":event_loop_param_test",
         ":ping_lib",
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index e69fd0d..5c7e49d 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -33,6 +33,7 @@
   context_.queue_index = 0xffffffff;
   context_.size = 0;
   context_.data = nullptr;
+  context_.buffer_index = -1;
   event_loop_->NewFetcher(this);
 }
 
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index fa78953..c922f2e 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -48,6 +48,7 @@
   realtime_clock::time_point realtime_remote_time;
 
   // The rest are only valid for Watchers and Fetchers.
+
   // Index in the queue.
   uint32_t queue_index;
   // Index into the remote queue.  Useful to determine if data was lost.  In a
@@ -59,6 +60,20 @@
   // Pointer to the data.
   const void *data;
 
+  // Index of the message buffer. This will be in [0, NumberBuffers) on
+  // read_method=PIN channels, and -1 for other channels.
+  //
+  // This only tells you about the underlying storage for this message, not
+  // anything about its position in the queue. This is only useful for advanced
+  // zero-copy use cases, on read_method=PIN channels.
+  //
+  // This will uniquely identify a message on this channel at a point in time.
+  // For senders, this point in time is while the sender has the message. With
+  // read_method==PIN, this point in time includes while the caller has access
+  // to this context. For other read_methods, this point in time may be before
+  // the caller has access to this context, which makes this pretty useless.
+  int buffer_index;
+
   // Efficiently coppies the flatbuffer into a FlatbufferVector, allocating
   // memory in the process.  It is vital that T matches the type of the
   // underlying flatbuffer.
@@ -166,6 +181,10 @@
     return &fbb_allocator_;
   }
 
+  // Index of the buffer which is currently exposed by data() and the various
+  // other accessors. This is the message the caller should be filling out.
+  virtual int buffer_index() = 0;
+
  protected:
   EventLoop *event_loop() { return event_loop_; }
 
@@ -351,13 +370,17 @@
   // Returns the queue index that this was sent with.
   uint32_t sent_queue_index() const { return sender_->sent_queue_index(); }
 
+  // Returns the buffer index which MakeBuilder() will expose access to. This is
+  // the buffer the caller can fill out.
+  int buffer_index() const { return sender_->buffer_index(); }
+
  private:
   friend class EventLoop;
   Sender(std::unique_ptr<RawSender> sender) : sender_(std::move(sender)) {}
   std::unique_ptr<RawSender> sender_;
 };
 
-// Interface for timers
+// Interface for timers.
 class TimerHandler {
  public:
   virtual ~TimerHandler();
@@ -604,6 +627,7 @@
     MakeRawWatcher(channel, [watcher](const Context &context, const void *) {
       Context new_context = context;
       new_context.data = nullptr;
+      new_context.buffer_index = -1;
       watcher(new_context);
     });
   }
@@ -622,9 +646,13 @@
   // Prevents the event loop from sending a timing report.
   void SkipTimingReport() { skip_timing_report_ = true; }
 
-  // Prevents AOS_LOG being sent to message on /aos
+  // Prevents AOS_LOG output from being sent as messages on /aos.
   void SkipAosLog() { skip_logger_ = true; }
 
+  // Returns the number of buffers for this channel. This corresponds with the
+  // range of Context::buffer_index values for this channel.
+  virtual int NumberBuffers(const Channel *channel) = 0;
+
  protected:
   // Sets the name of the event loop.  This is the application name.
   virtual void set_name(const std::string_view name) = 0;
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 8d0d0e2..993011f 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -1,8 +1,9 @@
 #include "aos/events/event_loop_param_test.h"
 
 #include <chrono>
+#include <unordered_map>
+#include <unordered_set>
 
-#include "aos/events/test_message_generated.h"
 #include "aos/flatbuffer_merge.h"
 #include "glog/logging.h"
 #include "gmock/gmock.h"
@@ -14,6 +15,54 @@
 namespace chrono = ::std::chrono;
 }  // namespace
 
+::std::unique_ptr<EventLoop> AbstractEventLoopTest::Make(
+    std::string_view name) {
+  std::string name_copy(name);
+  if (name == "") {
+    name_copy = "loop";
+    name_copy += std::to_string(event_loop_count_);
+  }
+  ++event_loop_count_;
+  return factory_->Make(name_copy);
+}
+
+void AbstractEventLoopTest::VerifyBuffers(
+    int number_buffers,
+    std::vector<std::reference_wrapper<const Fetcher<TestMessage>>> fetchers,
+    std::vector<std::reference_wrapper<const Sender<TestMessage>>> senders) {
+  // The buffers which are in a sender.
+  std::unordered_set<int> in_sender;
+  for (const Sender<TestMessage> &sender : senders) {
+    const int this_buffer = sender.buffer_index();
+    CHECK_GE(this_buffer, 0);
+    CHECK_LT(this_buffer, number_buffers);
+    CHECK(in_sender.insert(this_buffer).second) << ": " << this_buffer;
+  }
+
+  if (read_method() != ReadMethod::PIN) {
+    // If we're not using PIN, we can't really verify anything about what
+    // buffers the fetchers have.
+    return;
+  }
+
+  // Mapping from TestMessage::value to buffer index.
+  std::unordered_map<int, int> fetcher_values;
+  for (const Fetcher<TestMessage> &fetcher : fetchers) {
+    if (!fetcher.get()) {
+      continue;
+    }
+    const int this_buffer = fetcher.context().buffer_index;
+    CHECK_GE(this_buffer, 0);
+    CHECK_LT(this_buffer, number_buffers);
+    CHECK(in_sender.count(this_buffer) == 0) << ": " << this_buffer;
+    const auto insert_result = fetcher_values.insert(
+        std::make_pair(fetcher.get()->value(), this_buffer));
+    if (!insert_result.second) {
+      CHECK_EQ(this_buffer, insert_result.first->second);
+    }
+  }
+}
+
 // Tests that watcher can receive messages from a sender.
 // Also tests that OnRun() works.
 TEST_P(AbstractEventLoopTest, Basic) {
@@ -92,6 +141,7 @@
   loop2->MakeNoArgWatcher<TestMessage>("/test", [&]() {
     EXPECT_GT(loop2->context().size, 0u);
     EXPECT_EQ(nullptr, loop2->context().data);
+    EXPECT_EQ(-1, loop2->context().buffer_index);
     this->Exit();
   });
 
@@ -222,6 +272,7 @@
   EXPECT_EQ(fetcher.context().queue_index, 0xffffffffu);
   EXPECT_EQ(fetcher.context().size, 0u);
   EXPECT_EQ(fetcher.context().data, nullptr);
+  EXPECT_EQ(fetcher.context().buffer_index, -1);
 
   aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
   TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
@@ -248,6 +299,13 @@
   EXPECT_EQ(fetcher.context().queue_index, 0x0u);
   EXPECT_EQ(fetcher.context().size, 20u);
   EXPECT_NE(fetcher.context().data, nullptr);
+  if (read_method() == ReadMethod::PIN) {
+    EXPECT_GE(fetcher.context().buffer_index, 0);
+    EXPECT_LT(fetcher.context().buffer_index,
+              loop2->NumberBuffers(fetcher.channel()));
+  } else {
+    EXPECT_EQ(fetcher.context().buffer_index, -1);
+  }
 }
 
 // Tests that watcher will receive all messages sent if they are sent after
@@ -560,6 +618,105 @@
   EXPECT_EQ(200, fetcher.get()->value());
 }
 
+// Verify that a fetcher still holds its data, even after falling behind.
+TEST_P(AbstractEventLoopTest, FetcherBehindData) {
+  auto send_loop = Make();
+  auto fetch_loop = Make();
+  auto sender = send_loop->MakeSender<TestMessage>("/test");
+  Fetcher<TestMessage> fetcher = fetch_loop->MakeFetcher<TestMessage>("/test");
+  {
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(1);
+    ASSERT_TRUE(msg.Send(builder.Finish()));
+  }
+  ASSERT_TRUE(fetcher.Fetch());
+  EXPECT_EQ(1, fetcher.get()->value());
+  for (int i = 0; i < 300; ++i) {
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(i + 2);
+    ASSERT_TRUE(msg.Send(builder.Finish()));
+  }
+  EXPECT_EQ(1, fetcher.get()->value());
+}
+
+// Try a bunch of orderings of operations with fetchers and senders. Verify that
+// all the fetchers have the correct data at each step.
+TEST_P(AbstractEventLoopTest, FetcherPermutations) {
+  for (int max_save = 0; max_save < 5; ++max_save) {
+    SCOPED_TRACE("max_save=" + std::to_string(max_save));
+
+    auto send_loop = Make();
+    auto fetch_loop = Make();
+    auto sender = send_loop->MakeSender<TestMessage>("/test");
+    const auto send_message = [&sender](int i) {
+      aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+      builder.add_value(i);
+      ASSERT_TRUE(msg.Send(builder.Finish()));
+    };
+    std::vector<Fetcher<TestMessage>> fetchers;
+    for (int i = 0; i < 10; ++i) {
+      fetchers.emplace_back(fetch_loop->MakeFetcher<TestMessage>("/test"));
+    }
+    send_message(1);
+    const auto verify_buffers = [&]() {
+      std::vector<std::reference_wrapper<const Fetcher<TestMessage>>>
+          fetchers_copy;
+      for (const auto &fetcher : fetchers) {
+        fetchers_copy.emplace_back(fetcher);
+      }
+      std::vector<std::reference_wrapper<const Sender<TestMessage>>>
+          senders_copy;
+      senders_copy.emplace_back(sender);
+      VerifyBuffers(send_loop->NumberBuffers(sender.channel()), fetchers_copy,
+                    senders_copy);
+    };
+    for (auto &fetcher : fetchers) {
+      ASSERT_TRUE(fetcher.Fetch());
+      verify_buffers();
+      EXPECT_EQ(1, fetcher.get()->value());
+    }
+
+    for (int save = 1; save <= max_save; ++save) {
+      SCOPED_TRACE("save=" + std::to_string(save));
+      send_message(100 + save);
+      verify_buffers();
+      for (size_t i = 0; i < fetchers.size() - save; ++i) {
+        SCOPED_TRACE("fetcher=" + std::to_string(i));
+        ASSERT_TRUE(fetchers[i].Fetch());
+        verify_buffers();
+        EXPECT_EQ(100 + save, fetchers[i].get()->value());
+      }
+      for (size_t i = fetchers.size() - save; i < fetchers.size() - 1; ++i) {
+        SCOPED_TRACE("fetcher=" + std::to_string(i));
+        EXPECT_EQ(100 + (fetchers.size() - 1 - i), fetchers[i].get()->value());
+      }
+      EXPECT_EQ(1, fetchers.back().get()->value());
+    }
+
+    for (int i = 0; i < 300; ++i) {
+      send_message(200 + i);
+      verify_buffers();
+    }
+
+    for (size_t i = 0; i < fetchers.size() - max_save; ++i) {
+      SCOPED_TRACE("fetcher=" + std::to_string(i));
+      if (max_save > 0) {
+        EXPECT_EQ(100 + max_save, fetchers[i].get()->value());
+      } else {
+        EXPECT_EQ(1, fetchers[i].get()->value());
+      }
+    }
+    for (size_t i = fetchers.size() - max_save; i < fetchers.size() - 1; ++i) {
+      SCOPED_TRACE("fetcher=" + std::to_string(i));
+      EXPECT_EQ(100 + (fetchers.size() - 1 - i), fetchers[i].get()->value());
+    }
+    EXPECT_EQ(1, fetchers.back().get()->value());
+  }
+}
+
 // Verify that making a fetcher and watcher for "/test" succeeds.
 TEST_P(AbstractEventLoopTest, FetcherAndWatcher) {
   auto loop = Make();
@@ -642,7 +799,80 @@
   }
   EXPECT_DEATH({ loop->MakeSender<TestMessage>("/test"); },
                "Failed to create sender on \\{ \"name\": \"/test\", \"type\": "
-               "\"aos.TestMessage\" \\}, too many senders.");
+               "\"aos.TestMessage\"[^}]*\\ }, too many senders.");
+}
+
+// Verify that creating too many fetchers fails.
+TEST_P(AbstractEventLoopDeathTest, TooManyFetchers) {
+  if (read_method() != ReadMethod::PIN) {
+    // Other read methods don't limit the number of readers, so just skip this.
+    return;
+  }
+
+  auto loop = Make();
+  std::vector<aos::Fetcher<TestMessage>> fetchers;
+  for (int i = 0; i < 10; ++i) {
+    fetchers.emplace_back(loop->MakeFetcher<TestMessage>("/test"));
+  }
+  EXPECT_DEATH({ loop->MakeFetcher<TestMessage>("/test"); },
+               "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+               "\"aos.TestMessage\"[^}]*\\ }, too many readers.");
+}
+
+// Verify that creating too many fetchers, split between two event loops, fails.
+TEST_P(AbstractEventLoopDeathTest, TooManyFetchersTwoLoops) {
+  if (read_method() != ReadMethod::PIN) {
+    // Other read methods don't limit the number of readers, so just skip this.
+    return;
+  }
+
+  auto loop = Make();
+  auto loop2 = Make();
+  std::vector<aos::Fetcher<TestMessage>> fetchers;
+  for (int i = 0; i < 5; ++i) {
+    fetchers.emplace_back(loop->MakeFetcher<TestMessage>("/test"));
+    fetchers.emplace_back(loop2->MakeFetcher<TestMessage>("/test"));
+  }
+  EXPECT_DEATH({ loop->MakeFetcher<TestMessage>("/test"); },
+               "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+               "\"aos.TestMessage\"[^}]*\\ }, too many readers.");
+}
+
+// Verify that creating too many watchers fails.
+TEST_P(AbstractEventLoopDeathTest, TooManyWatchers) {
+  if (read_method() != ReadMethod::PIN) {
+    // Other read methods don't limit the number of readers, so just skip this.
+    return;
+  }
+
+  std::vector<std::unique_ptr<EventLoop>> loops;
+  for (int i = 0; i < 10; ++i) {
+    loops.emplace_back(Make());
+    loops.back()->MakeWatcher("/test", [](const TestMessage &) {});
+  }
+  EXPECT_DEATH({ Make()->MakeWatcher("/test", [](const TestMessage &) {}); },
+               "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+               "\"aos.TestMessage\"[^}]*\\ }, too many readers.");
+}
+
+// Verify that creating too many watchers and fetchers combined fails.
+TEST_P(AbstractEventLoopDeathTest, TooManyWatchersAndFetchers) {
+  if (read_method() != ReadMethod::PIN) {
+    // Other read methods don't limit the number of readers, so just skip this.
+    return;
+  }
+
+  auto loop = Make();
+  std::vector<aos::Fetcher<TestMessage>> fetchers;
+  std::vector<std::unique_ptr<EventLoop>> loops;
+  for (int i = 0; i < 5; ++i) {
+    fetchers.emplace_back(loop->MakeFetcher<TestMessage>("/test"));
+    loops.emplace_back(Make());
+    loops.back()->MakeWatcher("/test", [](const TestMessage &) {});
+  }
+  EXPECT_DEATH({ loop->MakeFetcher<TestMessage>("/test"); },
+               "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+               "\"aos.TestMessage\"[^}]*\\ }, too many readers.");
 }
 
 // Verify that we can't create a sender inside OnRun.
@@ -723,6 +953,7 @@
     EXPECT_EQ(loop->context().queue_index, 0xffffffffu);
     EXPECT_EQ(loop->context().size, 0u);
     EXPECT_EQ(loop->context().data, nullptr);
+    EXPECT_EQ(loop->context().buffer_index, -1);
 
     expected_times.push_back(loop->context().monotonic_event_time);
     if (times.size() == kCount) {
@@ -965,6 +1196,15 @@
     EXPECT_LT(&msg, reinterpret_cast<const void *>(
                         reinterpret_cast<const char *>(loop1->context().data) +
                         loop1->context().size));
+    if (read_method() == ReadMethod::PIN) {
+      EXPECT_GE(loop1->context().buffer_index, 0);
+      EXPECT_LT(loop1->context().buffer_index,
+                loop1->NumberBuffers(
+                    configuration::GetChannel(loop1->configuration(), "/test",
+                                              "aos.TestMessage", "", nullptr)));
+    } else {
+      EXPECT_EQ(-1, loop1->context().buffer_index);
+    }
     triggered = true;
   });
 
@@ -1137,6 +1377,7 @@
             EXPECT_EQ(loop1->context().queue_index, 0xffffffffu);
             EXPECT_EQ(loop1->context().size, 0u);
             EXPECT_EQ(loop1->context().data, nullptr);
+            EXPECT_EQ(loop1->context().buffer_index, -1);
 
             if (times.size() == kCount) {
               LOG(INFO) << "Exiting";
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index cbd5cd1..faf6361 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -1,10 +1,12 @@
 #ifndef _AOS_EVENTS_EVENT_LOOP_PARAM_TEST_H_
 #define _AOS_EVENTS_EVENT_LOOP_PARAM_TEST_H_
 
+#include <initializer_list>
 #include <string_view>
 #include <vector>
 
 #include "aos/events/event_loop.h"
+#include "aos/events/test_message_generated.h"
 #include "aos/flatbuffers.h"
 #include "aos/json_to_flatbuffer.h"
 #include "gtest/gtest.h"
@@ -15,31 +17,30 @@
 class EventLoopTestFactory {
  public:
   EventLoopTestFactory()
-      : flatbuffer_(JsonToFlatbuffer("{\n"
-                                     "  \"channels\": [ \n"
-                                     "    {\n"
-                                     "      \"name\": \"/aos\",\n"
-                                     "      \"type\": \"aos.logging.LogMessageFbs\"\n"
-                                     "    },\n"
-                                     "    {\n"
-                                     "      \"name\": \"/aos\",\n"
-                                     "      \"type\": \"aos.timing.Report\"\n"
-                                     "    },\n"
-                                     "    {\n"
-                                     "      \"name\": \"/test\",\n"
-                                     "      \"type\": \"aos.TestMessage\"\n"
-                                     "    },\n"
-                                     "    {\n"
-                                     "      \"name\": \"/test1\",\n"
-                                     "      \"type\": \"aos.TestMessage\"\n"
-                                     "    },\n"
-                                     "    {\n"
-                                     "      \"name\": \"/test2\",\n"
-                                     "      \"type\": \"aos.TestMessage\"\n"
-                                     "    }\n"
-                                     "  ]\n"
-                                     "}\n",
-                                     Configuration::MiniReflectTypeTable())) {}
+      : flatbuffer_(JsonToFlatbuffer<Configuration>(R"config({
+  "channels": [
+    {
+      "name": "/aos",
+      "type": "aos.logging.LogMessageFbs"
+    },
+    {
+      "name": "/aos",
+      "type": "aos.timing.Report"
+    },
+    {
+      "name": "/test",
+      "type": "aos.TestMessage"
+    },
+    {
+      "name": "/test1",
+      "type": "aos.TestMessage"
+    },
+    {
+      "name": "/test2",
+      "type": "aos.TestMessage"
+    }
+  ]
+})config")) {}
 
   virtual ~EventLoopTestFactory() {}
 
@@ -58,8 +59,48 @@
   // Advances time by sleeping.  Can't be called from inside a loop.
   virtual void SleepFor(::std::chrono::nanoseconds duration) = 0;
 
+  void PinReads() {
+    static const std::string kJson = R"config({
+  "channels": [
+    {
+      "name": "/aos",
+      "type": "aos.logging.LogMessageFbs",
+      "read_method": "PIN",
+      "num_readers": 10
+    },
+    {
+      "name": "/aos",
+      "type": "aos.timing.Report",
+      "read_method": "PIN",
+      "num_readers": 10
+    },
+    {
+      "name": "/test",
+      "type": "aos.TestMessage",
+      "read_method": "PIN",
+      "num_readers": 10
+    },
+    {
+      "name": "/test1",
+      "type": "aos.TestMessage",
+      "read_method": "PIN",
+      "num_readers": 10
+    },
+    {
+      "name": "/test2",
+      "type": "aos.TestMessage",
+      "read_method": "PIN",
+      "num_readers": 10
+    }
+  ]
+})config";
+
+    flatbuffer_ = FlatbufferDetachedBuffer<Configuration>(
+        JsonToFlatbuffer(kJson, Configuration::MiniReflectTypeTable()));
+  }
+
   void EnableNodes(std::string_view my_node) {
-    std::string json = R"config({
+    static const std::string kJson = R"config({
   "channels": [
     {
       "name": "/aos/me",
@@ -127,7 +168,7 @@
 })config";
 
     flatbuffer_ = FlatbufferDetachedBuffer<Configuration>(
-        JsonToFlatbuffer(json, Configuration::MiniReflectTypeTable()));
+        JsonToFlatbuffer(kJson, Configuration::MiniReflectTypeTable()));
 
     my_node_ = configuration::GetNode(&flatbuffer_.message(), my_node);
   }
@@ -142,20 +183,20 @@
   const Node *my_node_ = nullptr;
 };
 
-class AbstractEventLoopTestBase
-    : public ::testing::TestWithParam<std::function<EventLoopTestFactory *()>> {
+class AbstractEventLoopTest
+    : public ::testing::TestWithParam<
+          std::tuple<std::function<EventLoopTestFactory *()>, ReadMethod>> {
  public:
-  AbstractEventLoopTestBase() { factory_.reset(GetParam()()); }
-
-  ::std::unique_ptr<EventLoop> Make(std::string_view name = "") {
-    std::string name_copy(name);
-    if (name == "") {
-      name_copy = "loop";
-      name_copy += std::to_string(event_loop_count_);
+  AbstractEventLoopTest() : factory_(std::get<0>(GetParam())()) {
+    if (read_method() == ReadMethod::PIN) {
+      factory_->PinReads();
     }
-    ++event_loop_count_;
-    return factory_->Make(name_copy);
   }
+
+  ReadMethod read_method() const { return std::get<1>(GetParam()); }
+
+  ::std::unique_ptr<EventLoop> Make(std::string_view name = "");
+
   ::std::unique_ptr<EventLoop> MakePrimary(std::string_view name = "primary") {
     ++event_loop_count_;
     return factory_->MakePrimary(name);
@@ -182,17 +223,20 @@
     end_timer->set_name("end");
   }
 
-  // You can implement all the usual fixture class members here.
-  // To access the test parameter, call GetParam() from class
-  // TestWithParam<T>.
+  // Verifies that the buffer_index values for all of the given objects are
+  // consistent.
+  void VerifyBuffers(
+      int number_buffers,
+      std::vector<std::reference_wrapper<const Fetcher<TestMessage>>> fetchers,
+      std::vector<std::reference_wrapper<const Sender<TestMessage>>> senders);
+
  private:
-  ::std::unique_ptr<EventLoopTestFactory> factory_;
+  const ::std::unique_ptr<EventLoopTestFactory> factory_;
 
   int event_loop_count_ = 0;
 };
 
-typedef AbstractEventLoopTestBase AbstractEventLoopDeathTest;
-typedef AbstractEventLoopTestBase AbstractEventLoopTest;
+using AbstractEventLoopDeathTest = AbstractEventLoopTest;
 
 }  // namespace testing
 }  // namespace aos
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index a39a338..5ca1067 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -177,6 +177,7 @@
   event_loop_->context_.queue_index = 0xffffffffu;
   event_loop_->context_.size = 0;
   event_loop_->context_.data = nullptr;
+  event_loop_->context_.buffer_index = -1;
 
   ftrace_.FormatMessage(
       "timer: %.*s: start now=%" PRId64 " event=%" PRId64,
@@ -221,6 +222,7 @@
   event_loop_->context_.queue_index = 0xffffffffu;
   event_loop_->context_.size = 0;
   event_loop_->context_.data = nullptr;
+  event_loop_->context_.buffer_index = -1;
 
   // Compute how many cycles elapsed and schedule the next wakeup.
   Reschedule(schedule, monotonic_start_time);
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index d2c9112..6013709 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -53,6 +53,8 @@
   FLAGS_shm_base = std::string(base) + "/dev/shm/aos";
 }
 
+namespace {
+
 std::string ShmFolder(const Channel *channel) {
   CHECK(channel->has_name());
   CHECK_EQ(channel->name()->string_view()[0], '/');
@@ -60,10 +62,10 @@
 }
 std::string ShmPath(const Channel *channel) {
   CHECK(channel->has_type());
-  return ShmFolder(channel) + channel->type()->str() + ".v2";
+  return ShmFolder(channel) + channel->type()->str() + ".v3";
 }
 
-void PageFaultData(char *data, size_t size) {
+void PageFaultDataWrite(char *data, size_t size) {
   // This just has to divide the actual page size. Being smaller will make this
   // a bit slower than necessary, but not much. 1024 is a pretty conservative
   // choice (most pages are probably 4096).
@@ -88,18 +90,40 @@
   }
 }
 
+void PageFaultDataRead(const char *data, size_t size) {
+  // This just has to divide the actual page size. Being smaller will make this
+  // a bit slower than necessary, but not much. 1024 is a pretty conservative
+  // choice (most pages are probably 4096).
+  static constexpr size_t kPageSize = 1024;
+  const size_t pages = (size + kPageSize - 1) / kPageSize;
+  for (size_t i = 0; i < pages; ++i) {
+    // We need to ensure there's a readable pagetable entry.
+    __atomic_load_n(&data[i * kPageSize], __ATOMIC_RELAXED);
+  }
+}
+
+ipc_lib::LocklessQueueConfiguration MakeQueueConfiguration(
+    const Channel *channel, std::chrono::seconds channel_storage_duration) {
+  ipc_lib::LocklessQueueConfiguration config;
+
+  config.num_watchers = channel->num_watchers();
+  config.num_senders = channel->num_senders();
+  // The value in the channel will default to 0 if readers are configured to
+  // copy.
+  config.num_pinners = channel->num_readers();
+  config.queue_size = channel_storage_duration.count() * channel->frequency();
+  config.message_data_size = channel->max_size();
+
+  return config;
+}
+
 class MMapedQueue {
  public:
   MMapedQueue(const Channel *channel,
-              const std::chrono::seconds channel_storage_duration) {
+              std::chrono::seconds channel_storage_duration)
+      : config_(MakeQueueConfiguration(channel, channel_storage_duration)) {
     std::string path = ShmPath(channel);
 
-    config_.num_watchers = channel->num_watchers();
-    config_.num_senders = channel->num_senders();
-    config_.queue_size =
-        channel_storage_duration.count() * channel->frequency();
-    config_.message_data_size = channel->max_size();
-
     size_ = ipc_lib::LocklessQueueMemorySize(config_);
 
     util::MkdirP(path, FLAGS_permissions);
@@ -107,7 +131,7 @@
     // There are 2 cases.  Either the file already exists, or it does not
     // already exist and we need to create it.  Start by trying to create it. If
     // that fails, the file has already been created and we can open it
-    // normally..  Once the file has been created it wil never be deleted.
+    // normally.  Once the file has been created it will never be deleted.
     int fd = open(path.c_str(), O_RDWR | O_CREAT | O_EXCL,
                   O_CLOEXEC | FLAGS_permissions);
     if (fd == -1 && errno == EEXIST) {
@@ -138,33 +162,51 @@
 
     data_ = mmap(NULL, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
     PCHECK(data_ != MAP_FAILED);
+    const_data_ = mmap(NULL, size_, PROT_READ, MAP_SHARED, fd, 0);
+    PCHECK(const_data_ != MAP_FAILED);
     PCHECK(close(fd) == 0);
-    PageFaultData(static_cast<char *>(data_), size_);
+    PageFaultDataWrite(static_cast<char *>(data_), size_);
+    PageFaultDataRead(static_cast<const char *>(const_data_), size_);
 
     ipc_lib::InitializeLocklessQueueMemory(memory(), config_);
   }
 
-  ~MMapedQueue() { PCHECK(munmap(data_, size_) == 0); }
+  ~MMapedQueue() {
+    PCHECK(munmap(data_, size_) == 0);
+    PCHECK(munmap(const_cast<void *>(const_data_), size_) == 0);
+  }
 
   ipc_lib::LocklessQueueMemory *memory() const {
     return reinterpret_cast<ipc_lib::LocklessQueueMemory *>(data_);
   }
 
+  const ipc_lib::LocklessQueueMemory *const_memory() const {
+    return reinterpret_cast<const ipc_lib::LocklessQueueMemory *>(const_data_);
+  }
+
   const ipc_lib::LocklessQueueConfiguration &config() const { return config_; }
 
-  absl::Span<char> GetSharedMemory() const {
+  ipc_lib::LocklessQueue queue() const {
+    return ipc_lib::LocklessQueue(const_memory(), memory(), config());
+  }
+
+  absl::Span<char> GetMutableSharedMemory() const {
     return absl::Span<char>(static_cast<char *>(data_), size_);
   }
 
+  absl::Span<const char> GetConstSharedMemory() const {
+    return absl::Span<const char>(static_cast<const char *>(const_data_),
+                                  size_);
+  }
+
  private:
-  ipc_lib::LocklessQueueConfiguration config_;
+  const ipc_lib::LocklessQueueConfiguration config_;
 
   size_t size_;
   void *data_;
+  const void *const_data_;
 };
 
-namespace {
-
 const Node *MaybeMyNode(const Configuration *configuration) {
   if (!configuration->has_nodes()) {
     return nullptr;
@@ -197,8 +239,7 @@
             channel,
             chrono::ceil<chrono::seconds>(chrono::nanoseconds(
                 event_loop->configuration()->channel_storage_duration()))),
-        lockless_queue_(lockless_queue_memory_.memory(),
-                        lockless_queue_memory_.config()) {
+        reader_(lockless_queue_memory_.queue()) {
     context_.data = nullptr;
     // Point the queue index at the next index to read starting now.  This
     // makes it such that FetchNext will read the next message sent after
@@ -208,36 +249,59 @@
 
   ~SimpleShmFetcher() {}
 
+  // Sets this object to pin or copy data, as configured in the channel.
+  void RetrieveData() {
+    if (channel_->read_method() == ReadMethod::PIN) {
+      PinDataOnFetch();
+    } else {
+      CopyDataOnFetch();
+    }
+  }
+
   // Sets this object to copy data out of the shared memory into a private
   // buffer when fetching.
   void CopyDataOnFetch() {
+    CHECK(!pin_data());
     data_storage_.reset(static_cast<char *>(
         malloc(channel_->max_size() + kChannelDataAlignment - 1)));
   }
 
+  // Sets this object to pin data in shared memory when fetching.
+  void PinDataOnFetch() {
+    CHECK(!copy_data());
+    auto maybe_pinner =
+        ipc_lib::LocklessQueuePinner::Make(lockless_queue_memory_.queue());
+    if (!maybe_pinner) {
+      LOG(FATAL) << "Failed to create reader on "
+                 << configuration::CleanedChannelToString(channel_)
+                 << ", too many readers.";
+    }
+    pinner_ = std::move(maybe_pinner.value());
+  }
+
   // Points the next message to fetch at the queue index which will be
   // populated next.
   void PointAtNextQueueIndex() {
-    actual_queue_index_ = lockless_queue_.LatestQueueIndex();
+    actual_queue_index_ = reader_.LatestIndex();
     if (!actual_queue_index_.valid()) {
       // Nothing in the queue.  The next element will show up at the 0th
       // index in the queue.
-      actual_queue_index_ =
-          ipc_lib::QueueIndex::Zero(lockless_queue_.queue_size());
+      actual_queue_index_ = ipc_lib::QueueIndex::Zero(
+          LocklessQueueSize(lockless_queue_memory_.memory()));
     } else {
       actual_queue_index_ = actual_queue_index_.Increment();
     }
   }
 
   bool FetchNext() {
-    const ipc_lib::LocklessQueue::ReadResult read_result =
+    const ipc_lib::LocklessQueueReader::Result read_result =
         DoFetch(actual_queue_index_);
 
-    return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
+    return read_result == ipc_lib::LocklessQueueReader::Result::GOOD;
   }
 
   bool Fetch() {
-    const ipc_lib::QueueIndex queue_index = lockless_queue_.LatestQueueIndex();
+    const ipc_lib::QueueIndex queue_index = reader_.LatestIndex();
     // actual_queue_index_ is only meaningful if it was set by Fetch or
     // FetchNext.  This happens when valid_data_ has been set.  So, only
     // skip checking if valid_data_ is true.
@@ -250,50 +314,74 @@
       return false;
     }
 
-    const ipc_lib::LocklessQueue::ReadResult read_result = DoFetch(queue_index);
+    const ipc_lib::LocklessQueueReader::Result read_result =
+        DoFetch(queue_index);
 
-    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::NOTHING_NEW)
+    CHECK(read_result != ipc_lib::LocklessQueueReader::Result::NOTHING_NEW)
         << ": Queue index went backwards.  This should never happen.  "
         << configuration::CleanedChannelToString(channel_);
 
-    return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
+    return read_result == ipc_lib::LocklessQueueReader::Result::GOOD;
   }
 
   Context context() const { return context_; }
 
   bool RegisterWakeup(int priority) {
-    return lockless_queue_.RegisterWakeup(priority);
+    CHECK(!watcher_);
+    watcher_ = ipc_lib::LocklessQueueWatcher::Make(
+        lockless_queue_memory_.queue(), priority);
+    return static_cast<bool>(watcher_);
   }
 
-  void UnregisterWakeup() { lockless_queue_.UnregisterWakeup(); }
-
-  absl::Span<char> GetSharedMemory() const {
-    return lockless_queue_memory_.GetSharedMemory();
+  void UnregisterWakeup() {
+    CHECK(watcher_);
+    watcher_ = std::nullopt;
   }
 
-  absl::Span<char> GetPrivateMemory() const {
-    // Can't usefully expose this for pinning, because the buffer changes
-    // address for each message. Callers who want to work with that should just
-    // grab the whole shared memory buffer instead.
+  absl::Span<char> GetMutableSharedMemory() {
+    return lockless_queue_memory_.GetMutableSharedMemory();
+  }
+
+  absl::Span<const char> GetConstSharedMemory() const {
+    return lockless_queue_memory_.GetConstSharedMemory();
+  }
+
+  absl::Span<const char> GetPrivateMemory() const {
+    if (pin_data()) {
+      return lockless_queue_memory_.GetConstSharedMemory();
+    }
     return absl::Span<char>(
         const_cast<SimpleShmFetcher *>(this)->data_storage_start(),
-        lockless_queue_.message_data_size());
+        LocklessQueueMessageDataSize(lockless_queue_memory_.memory()));
   }
 
  private:
-  ipc_lib::LocklessQueue::ReadResult DoFetch(ipc_lib::QueueIndex queue_index) {
+  ipc_lib::LocklessQueueReader::Result DoFetch(
+      ipc_lib::QueueIndex queue_index) {
     // TODO(austin): Get behind and make sure it dies.
     char *copy_buffer = nullptr;
     if (copy_data()) {
       copy_buffer = data_storage_start();
     }
-    ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
+    ipc_lib::LocklessQueueReader::Result read_result = reader_.Read(
         queue_index.index(), &context_.monotonic_event_time,
         &context_.realtime_event_time, &context_.monotonic_remote_time,
         &context_.realtime_remote_time, &context_.remote_queue_index,
         &context_.size, copy_buffer);
 
-    if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
+    if (read_result == ipc_lib::LocklessQueueReader::Result::GOOD) {
+      if (pin_data()) {
+        const int pin_result = pinner_->PinIndex(queue_index.index());
+        CHECK(pin_result >= 0)
+            << ": Got behind while reading and the last message was modified "
+               "out from under us while we tried to pin it. Don't get so far "
+               "behind on: "
+            << configuration::CleanedChannelToString(channel_);
+        context_.buffer_index = pin_result;
+      } else {
+        context_.buffer_index = -1;
+      }
+
       context_.queue_index = queue_index.index();
       if (context_.remote_queue_index == 0xffffffffu) {
         context_.remote_queue_index = context_.queue_index;
@@ -307,7 +395,9 @@
       const char *const data = DataBuffer();
       if (data) {
         context_.data =
-            data + lockless_queue_.message_data_size() - context_.size;
+            data +
+            LocklessQueueMessageDataSize(lockless_queue_memory_.memory()) -
+            context_.size;
       } else {
         context_.data = nullptr;
       }
@@ -317,7 +407,7 @@
     // Make sure the data wasn't modified while we were reading it.  This
     // can only happen if you are reading the last message *while* it is
     // being written to, which means you are pretty far behind.
-    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
+    CHECK(read_result != ipc_lib::LocklessQueueReader::Result::OVERWROTE)
         << ": Got behind while reading and the last message was modified "
            "out from under us while we were reading it.  Don't get so far "
            "behind on: "
@@ -326,7 +416,7 @@
     // We fell behind between when we read the index and read the value.
     // This isn't worth recovering from since this means we went to sleep
     // for a long time in the middle of this function.
-    if (read_result == ipc_lib::LocklessQueue::ReadResult::TOO_OLD) {
+    if (read_result == ipc_lib::LocklessQueueReader::Result::TOO_OLD) {
       event_loop_->SendTimingReport();
       LOG(FATAL) << "The next message is no longer available.  "
                  << configuration::CleanedChannelToString(channel_);
@@ -346,22 +436,30 @@
     if (copy_data()) {
       return data_storage_start();
     }
+    if (pin_data()) {
+      return static_cast<const char *>(pinner_->Data());
+    }
     return nullptr;
   }
 
   bool copy_data() const { return static_cast<bool>(data_storage_); }
+  bool pin_data() const { return static_cast<bool>(pinner_); }
 
   aos::ShmEventLoop *event_loop_;
   const Channel *const channel_;
   MMapedQueue lockless_queue_memory_;
-  ipc_lib::LocklessQueue lockless_queue_;
+  ipc_lib::LocklessQueueReader reader_;
+  // This being nullopt indicates we're not looking for wakeups right now.
+  std::optional<ipc_lib::LocklessQueueWatcher> watcher_;
 
-  ipc_lib::QueueIndex actual_queue_index_ =
-      ipc_lib::LocklessQueue::empty_queue_index();
+  ipc_lib::QueueIndex actual_queue_index_ = ipc_lib::QueueIndex::Invalid();
 
   // This being empty indicates we're not going to copy data.
   std::unique_ptr<char, decltype(&free)> data_storage_{nullptr, &free};
 
+  // This being nullopt indicates we're not going to pin messages.
+  std::optional<ipc_lib::LocklessQueuePinner> pinner_;
+
   Context context_;
 };
 
@@ -370,7 +468,7 @@
   explicit ShmFetcher(ShmEventLoop *event_loop, const Channel *channel)
       : RawFetcher(event_loop, channel),
         simple_shm_fetcher_(event_loop, channel) {
-    simple_shm_fetcher_.CopyDataOnFetch();
+    simple_shm_fetcher_.RetrieveData();
   }
 
   ~ShmFetcher() { context_.data = nullptr; }
@@ -391,7 +489,7 @@
     return std::make_pair(false, monotonic_clock::min_time);
   }
 
-  absl::Span<char> GetPrivateMemory() const {
+  absl::Span<const char> GetPrivateMemory() const {
     return simple_shm_fetcher_.GetPrivateMemory();
   }
 
@@ -407,15 +505,15 @@
             channel,
             chrono::ceil<chrono::seconds>(chrono::nanoseconds(
                 event_loop->configuration()->channel_storage_duration()))),
-        lockless_queue_(lockless_queue_memory_.memory(),
-                        lockless_queue_memory_.config()),
-        lockless_queue_sender_(
-            VerifySender(lockless_queue_.MakeSender(), channel)) {}
+        lockless_queue_sender_(VerifySender(
+            ipc_lib::LocklessQueueSender::Make(lockless_queue_memory_.queue()),
+            channel)),
+        wake_upper_(lockless_queue_memory_.queue()) {}
 
   ~ShmSender() override {}
 
-  static ipc_lib::LocklessQueue::Sender VerifySender(
-      std::optional<ipc_lib::LocklessQueue::Sender> &&sender,
+  static ipc_lib::LocklessQueueSender VerifySender(
+      std::optional<ipc_lib::LocklessQueueSender> sender,
       const Channel *channel) {
     if (sender) {
       return std::move(sender.value());
@@ -437,7 +535,7 @@
     lockless_queue_sender_.Send(
         length, monotonic_remote_time, realtime_remote_time, remote_queue_index,
         &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_);
-    lockless_queue_.Wakeup(event_loop()->priority());
+    wake_upper_.Wakeup(event_loop()->priority());
     return true;
   }
 
@@ -452,19 +550,21 @@
                                 monotonic_remote_time, realtime_remote_time,
                                 remote_queue_index, &monotonic_sent_time_,
                                 &realtime_sent_time_, &sent_queue_index_);
-    lockless_queue_.Wakeup(event_loop()->priority());
+    wake_upper_.Wakeup(event_loop()->priority());
     // TODO(austin): Return an error if we send too fast.
     return true;
   }
 
   absl::Span<char> GetSharedMemory() const {
-    return lockless_queue_memory_.GetSharedMemory();
+    return lockless_queue_memory_.GetMutableSharedMemory();
   }
 
+  int buffer_index() override { return lockless_queue_sender_.buffer_index(); }
+
  private:
   MMapedQueue lockless_queue_memory_;
-  ipc_lib::LocklessQueue lockless_queue_;
-  ipc_lib::LocklessQueue::Sender lockless_queue_sender_;
+  ipc_lib::LocklessQueueSender lockless_queue_sender_;
+  ipc_lib::LocklessQueueWakeUpper wake_upper_;
 };
 
 // Class to manage the state for a Watcher.
@@ -479,7 +579,7 @@
         event_(this),
         simple_shm_fetcher_(event_loop, channel) {
     if (copy_data) {
-      simple_shm_fetcher_.CopyDataOnFetch();
+      simple_shm_fetcher_.RetrieveData();
     }
   }
 
@@ -520,8 +620,8 @@
 
   void UnregisterWakeup() { return simple_shm_fetcher_.UnregisterWakeup(); }
 
-  absl::Span<char> GetSharedMemory() const {
-    return simple_shm_fetcher_.GetSharedMemory();
+  absl::Span<const char> GetSharedMemory() const {
+    return simple_shm_fetcher_.GetConstSharedMemory();
   }
 
  private:
@@ -948,18 +1048,26 @@
   UpdateTimingReport();
 }
 
-absl::Span<char> ShmEventLoop::GetWatcherSharedMemory(const Channel *channel) {
+absl::Span<const char> ShmEventLoop::GetWatcherSharedMemory(
+    const Channel *channel) {
   ShmWatcherState *const watcher_state =
       static_cast<ShmWatcherState *>(GetWatcherState(channel));
   return watcher_state->GetSharedMemory();
 }
 
+int ShmEventLoop::NumberBuffers(const Channel *channel) {
+  return MakeQueueConfiguration(
+             channel, chrono::ceil<chrono::seconds>(chrono::nanoseconds(
+                          configuration()->channel_storage_duration())))
+      .num_messages();
+}
+
 absl::Span<char> ShmEventLoop::GetShmSenderSharedMemory(
     const aos::RawSender *sender) const {
   return static_cast<const ShmSender *>(sender)->GetSharedMemory();
 }
 
-absl::Span<char> ShmEventLoop::GetShmFetcherPrivateMemory(
+absl::Span<const char> ShmEventLoop::GetShmFetcherPrivateMemory(
     const aos::RawFetcher *fetcher) const {
   return static_cast<const ShmFetcher *>(fetcher)->GetPrivateMemory();
 }
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index 57d3b98..8dabcb5 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -24,8 +24,8 @@
 
 }  // namespace shm_event_loop_internal
 
-// Specialization of EventLoop that is built from queues running out of shared
-// memory.
+// Concrete implementation of EventLoop that is built from queues running out of
+// shared memory.
 //
 // TODO(austin): Timing reports break multiple threads.  Need to add back in a
 // mutex.
@@ -83,7 +83,7 @@
   // Returns the local mapping of the shared memory used by the watcher on the
   // specified channel. A watcher must be created on this channel before calling
   // this.
-  absl::Span<char> GetWatcherSharedMemory(const Channel *channel);
+  absl::Span<const char> GetWatcherSharedMemory(const Channel *channel);
 
   // Returns the local mapping of the shared memory used by the provided Sender.
   template <typename T>
@@ -93,11 +93,17 @@
 
   // Returns the local mapping of the private memory used by the provided
   // Fetcher to hold messages.
+  //
+  // Note that this may be the entire shared memory region held by this fetcher,
+  // depending on its channel's read_method.
   template <typename T>
-  absl::Span<char> GetFetcherPrivateMemory(aos::Fetcher<T> *fetcher) const {
+  absl::Span<const char> GetFetcherPrivateMemory(
+      aos::Fetcher<T> *fetcher) const {
     return GetShmFetcherPrivateMemory(GetRawFetcher(fetcher));
   }
 
+  int NumberBuffers(const Channel *channel) override;
+
  private:
   friend class shm_event_loop_internal::ShmWatcherState;
   friend class shm_event_loop_internal::ShmTimerHandler;
@@ -125,7 +131,7 @@
   absl::Span<char> GetShmSenderSharedMemory(const aos::RawSender *sender) const;
 
   // Private method to access the private memory mapping of a ShmFetcher.
-  absl::Span<char> GetShmFetcherPrivateMemory(
+  absl::Span<const char> GetShmFetcherPrivateMemory(
       const aos::RawFetcher *fetcher) const;
 
   std::vector<std::function<void()>> on_run_;
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index d25e2f8..d9a8872 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -25,12 +25,12 @@
     }
 
     // Clean up anything left there before.
-    unlink((FLAGS_shm_base + "/test/aos.TestMessage.v2").c_str());
-    unlink((FLAGS_shm_base + "/test1/aos.TestMessage.v2").c_str());
-    unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v2").c_str());
-    unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v2").c_str());
-    unlink((FLAGS_shm_base + "/aos/aos.timing.Report.v2").c_str());
-    unlink((FLAGS_shm_base + "/aos/aos.logging.LogMessageFbs.v2").c_str());
+    unlink((FLAGS_shm_base + "/test/aos.TestMessage.v3").c_str());
+    unlink((FLAGS_shm_base + "/test1/aos.TestMessage.v3").c_str());
+    unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v3").c_str());
+    unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v3").c_str());
+    unlink((FLAGS_shm_base + "/aos/aos.timing.Report.v3").c_str());
+    unlink((FLAGS_shm_base + "/aos/aos.logging.LogMessageFbs.v3").c_str());
   }
 
   ~ShmEventLoopTestFactory() { FLAGS_override_hostname = ""; }
@@ -69,15 +69,25 @@
   ::aos::ShmEventLoop *primary_event_loop_;
 };
 
-INSTANTIATE_TEST_CASE_P(ShmEventLoopTest, AbstractEventLoopTest,
-                        ::testing::Values([]() {
-                          return new ShmEventLoopTestFactory();
-                        }));
+INSTANTIATE_TEST_CASE_P(ShmEventLoopCopyTest, AbstractEventLoopTest,
+                        ::testing::Values(std::make_pair(
+                            []() { return new ShmEventLoopTestFactory(); },
+                            ReadMethod::COPY)));
 
-INSTANTIATE_TEST_CASE_P(ShmEventLoopDeathTest, AbstractEventLoopDeathTest,
-                        ::testing::Values([]() {
-                          return new ShmEventLoopTestFactory();
-                        }));
+INSTANTIATE_TEST_CASE_P(ShmEventLoopCopyDeathTest, AbstractEventLoopDeathTest,
+                        ::testing::Values(std::make_pair(
+                            []() { return new ShmEventLoopTestFactory(); },
+                            ReadMethod::COPY)));
+
+INSTANTIATE_TEST_CASE_P(ShmEventLoopPinTest, AbstractEventLoopTest,
+                        ::testing::Values(std::make_pair(
+                            []() { return new ShmEventLoopTestFactory(); },
+                            ReadMethod::PIN)));
+
+INSTANTIATE_TEST_CASE_P(ShmEventLoopPinDeathTest, AbstractEventLoopDeathTest,
+                        ::testing::Values(std::make_pair(
+                            []() { return new ShmEventLoopTestFactory(); },
+                            ReadMethod::PIN)));
 
 }  // namespace
 
@@ -89,12 +99,27 @@
   return scheduler == SCHED_FIFO || scheduler == SCHED_RR;
 }
 
+class ShmEventLoopTest : public ::testing::TestWithParam<ReadMethod> {
+ public:
+  ShmEventLoopTest() {
+    if (GetParam() == ReadMethod::PIN) {
+      factory_.PinReads();
+    }
+  }
+
+  ShmEventLoopTestFactory *factory() { return &factory_; }
+
+ private:
+  ShmEventLoopTestFactory factory_;
+};
+
+using ShmEventLoopDeathTest = ShmEventLoopTest;
+
 // Tests that every handler type is realtime and runs.  There are threads
 // involved and it's easy to miss one.
-TEST(ShmEventLoopTest, AllHandlersAreRealtime) {
-  ShmEventLoopTestFactory factory;
-  auto loop = factory.MakePrimary("primary");
-  auto loop2 = factory.Make("loop2");
+TEST_P(ShmEventLoopTest, AllHandlersAreRealtime) {
+  auto loop = factory()->MakePrimary("primary");
+  auto loop2 = factory()->Make("loop2");
 
   loop->SetRuntimeRealtimePriority(1);
 
@@ -104,10 +129,10 @@
   bool did_timer = false;
   bool did_watcher = false;
 
-  auto timer = loop->AddTimer([&did_timer, &factory]() {
+  auto timer = loop->AddTimer([this, &did_timer]() {
     EXPECT_TRUE(IsRealtime());
     did_timer = true;
-    factory.Exit();
+    factory()->Exit();
   });
 
   loop->MakeWatcher("/test", [&did_watcher](const TestMessage &) {
@@ -126,7 +151,7 @@
     msg.Send(builder.Finish());
   });
 
-  factory.Run();
+  factory()->Run();
 
   EXPECT_TRUE(did_onrun);
   EXPECT_TRUE(did_timer);
@@ -135,16 +160,15 @@
 
 // Tests that missing a deadline inside the function still results in PhasedLoop
 // running at the right offset.
-TEST(ShmEventLoopTest, DelayedPhasedLoop) {
-  ShmEventLoopTestFactory factory;
-  auto loop1 = factory.MakePrimary("primary");
+TEST_P(ShmEventLoopTest, DelayedPhasedLoop) {
+  auto loop1 = factory()->MakePrimary("primary");
 
   ::std::vector<::aos::monotonic_clock::time_point> times;
 
   constexpr chrono::milliseconds kOffset = chrono::milliseconds(400);
 
   loop1->AddPhasedLoop(
-      [&times, &loop1, &kOffset, &factory](int count) {
+      [this, &times, &loop1, &kOffset](int count) {
         const ::aos::monotonic_clock::time_point monotonic_now =
             loop1->monotonic_now();
 
@@ -169,7 +193,7 @@
 
         times.push_back(loop1->monotonic_now());
         if (times.size() == 2) {
-          factory.Exit();
+          factory()->Exit();
         }
 
         // Now, add a large delay.  This should push us up to 3 cycles.
@@ -177,15 +201,14 @@
       },
       chrono::seconds(1), kOffset);
 
-  factory.Run();
+  factory()->Run();
 
   EXPECT_EQ(times.size(), 2u);
 }
 
 // Test GetWatcherSharedMemory in a few basic scenarios.
-TEST(ShmEventLoopDeathTest, GetWatcherSharedMemory) {
-  ShmEventLoopTestFactory factory;
-  auto generic_loop1 = factory.MakePrimary("primary");
+TEST_P(ShmEventLoopDeathTest, GetWatcherSharedMemory) {
+  auto generic_loop1 = factory()->MakePrimary("primary");
   ShmEventLoop *const loop1 = static_cast<ShmEventLoop *>(generic_loop1.get());
   const auto channel = configuration::GetChannel(
       loop1->configuration(), "/test", TestMessage::GetFullyQualifiedName(),
@@ -196,31 +219,85 @@
                "No watcher found for channel");
 
   // Then, actually create a watcher, and verify it returns something sane.
-  loop1->MakeWatcher("/test", [](const TestMessage &) {});
-  EXPECT_FALSE(loop1->GetWatcherSharedMemory(channel).empty());
+  absl::Span<const char> shared_memory;
+  bool ran = false;
+  loop1->MakeWatcher("/test", [this, &shared_memory,
+                               &ran](const TestMessage &message) {
+    EXPECT_FALSE(ran);
+    ran = true;
+    // If we're using pinning, then we can verify that the message is actually
+    // in the specified region.
+    if (GetParam() == ReadMethod::PIN) {
+      EXPECT_GE(reinterpret_cast<const char *>(&message),
+                shared_memory.begin());
+      EXPECT_LT(reinterpret_cast<const char *>(&message), shared_memory.end());
+    }
+    factory()->Exit();
+  });
+  shared_memory = loop1->GetWatcherSharedMemory(channel);
+  EXPECT_FALSE(shared_memory.empty());
+
+  auto loop2 = factory()->Make("sender");
+  auto sender = loop2->MakeSender<TestMessage>("/test");
+  generic_loop1->OnRun([&sender]() {
+    auto builder = sender.MakeBuilder();
+    TestMessage::Builder test_builder(*builder.fbb());
+    test_builder.add_value(1);
+    CHECK(builder.Send(test_builder.Finish()));
+  });
+  factory()->Run();
+  EXPECT_TRUE(ran);
 }
 
-TEST(ShmEventLoopTest, GetSenderSharedMemory) {
-  ShmEventLoopTestFactory factory;
-  auto generic_loop1 = factory.MakePrimary("primary");
+TEST_P(ShmEventLoopTest, GetSenderSharedMemory) {
+  auto generic_loop1 = factory()->MakePrimary("primary");
   ShmEventLoop *const loop1 = static_cast<ShmEventLoop *>(generic_loop1.get());
 
-  // check that GetSenderSharedMemory returns non-null/non-empty memory span.
+  // Check that GetSenderSharedMemory returns a non-null/non-empty memory span.
   auto sender = loop1->MakeSender<TestMessage>("/test");
-  EXPECT_FALSE(loop1->GetSenderSharedMemory(&sender).empty());
+  const absl::Span<char> shared_memory = loop1->GetSenderSharedMemory(&sender);
+  EXPECT_FALSE(shared_memory.empty());
+
+  auto builder = sender.MakeBuilder();
+  uint8_t *buffer;
+  builder.fbb()->CreateUninitializedVector(5, 1, &buffer);
+  EXPECT_GE(reinterpret_cast<char *>(buffer), shared_memory.begin());
+  EXPECT_LT(reinterpret_cast<char *>(buffer), shared_memory.end());
 }
 
-TEST(ShmEventLoopTest, GetFetcherPrivateMemory) {
-  ShmEventLoopTestFactory factory;
-  auto generic_loop1 = factory.MakePrimary("primary");
+TEST_P(ShmEventLoopTest, GetFetcherPrivateMemory) {
+  auto generic_loop1 = factory()->MakePrimary("primary");
   ShmEventLoop *const loop1 = static_cast<ShmEventLoop *>(generic_loop1.get());
 
-  // check that GetFetcherPrivateMemory returns non-null/non-empty memory span.
+  // Check that GetFetcherPrivateMemory returns a non-null/non-empty memory span.
   auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
-  EXPECT_FALSE(loop1->GetFetcherPrivateMemory(&fetcher).empty());
+  const auto private_memory = loop1->GetFetcherPrivateMemory(&fetcher);
+  EXPECT_FALSE(private_memory.empty());
+
+  auto loop2 = factory()->Make("sender");
+  auto sender = loop2->MakeSender<TestMessage>("/test");
+  {
+    auto builder = sender.MakeBuilder();
+    TestMessage::Builder test_builder(*builder.fbb());
+    test_builder.add_value(1);
+    CHECK(builder.Send(test_builder.Finish()));
+  }
+
+  ASSERT_TRUE(fetcher.Fetch());
+  EXPECT_GE(fetcher.context().data, private_memory.begin());
+  EXPECT_LT(fetcher.context().data, private_memory.end());
 }
 
 // TODO(austin): Test that missing a deadline with a timer recovers as expected.
 
+INSTANTIATE_TEST_CASE_P(ShmEventLoopCopyTest, ShmEventLoopTest,
+                        ::testing::Values(ReadMethod::COPY));
+INSTANTIATE_TEST_CASE_P(ShmEventLoopPinTest, ShmEventLoopTest,
+                        ::testing::Values(ReadMethod::PIN));
+INSTANTIATE_TEST_CASE_P(ShmEventLoopCopyDeathTest, ShmEventLoopDeathTest,
+                        ::testing::Values(ReadMethod::COPY));
+INSTANTIATE_TEST_CASE_P(ShmEventLoopPinDeathTest, ShmEventLoopDeathTest,
+                        ::testing::Values(ReadMethod::PIN));
+
 }  // namespace testing
 }  // namespace aos
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index bff04b9..c339ce0 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -33,7 +33,6 @@
   Context context;
 
   SimulatedChannel *const channel = nullptr;
-  int buffer_index;
 
   // The data.
   char *data(size_t buffer_size) {
@@ -82,9 +81,10 @@
 
   ::std::deque<std::shared_ptr<SimulatedMessage>> msgs_;
 
-  SimulatedEventLoop *simulated_event_loop_;
+  SimulatedEventLoop *const simulated_event_loop_;
+  const Channel *const channel_;
+  EventScheduler *const scheduler_;
   EventHandler<SimulatedWatcher> event_;
-  EventScheduler *scheduler_;
   EventScheduler::Token token_;
   SimulatedChannel *simulated_channel_ = nullptr;
 };
@@ -184,13 +184,33 @@
     }
     ++sender_count_;
   }
+
   void CountSenderDestroyed() {
     --sender_count_;
     CHECK_GE(sender_count_, 0);
   }
 
  private:
-  void CheckBufferCount() { CHECK_LT(sender_count_, number_scratch_buffers()); }
+  void CheckBufferCount() {
+    int reader_count = 0;
+    if (channel()->read_method() == ReadMethod::PIN) {
+      reader_count = watchers_.size() + fetchers_.size();
+    }
+    CHECK_LT(reader_count + sender_count_, number_scratch_buffers());
+  }
+
+  void CheckReaderCount() {
+    if (channel()->read_method() != ReadMethod::PIN) {
+      return;
+    }
+    CheckBufferCount();
+    const int reader_count = watchers_.size() + fetchers_.size();
+    if (reader_count >= channel()->num_readers()) {
+      LOG(FATAL) << "Failed to create reader on "
+                 << configuration::CleanedChannelToString(channel())
+                 << ", too many readers.";
+    }
+  }
 
   const Channel *const channel_;
   EventScheduler *const scheduler_;
@@ -227,11 +247,11 @@
 
 SimulatedMessage::SimulatedMessage(SimulatedChannel *channel_in)
     : channel(channel_in) {
-  buffer_index = channel->GetBufferIndex();
+  context.buffer_index = channel->GetBufferIndex();
 }
 
 SimulatedMessage::~SimulatedMessage() {
-  channel->FreeBufferIndex(buffer_index);
+  channel->FreeBufferIndex(context.buffer_index);
 }
 
 class SimulatedSender : public RawSender {
@@ -299,6 +319,12 @@
                   remote_queue_index);
   }
 
+  int buffer_index() override {
+    // First, ensure message_ is allocated.
+    data();
+    return message_->context.buffer_index;
+  }
+
  private:
   SimulatedChannel *simulated_channel_;
   EventLoop *event_loop_;
@@ -354,6 +380,9 @@
   void SetMsg(std::shared_ptr<SimulatedMessage> msg) {
     msg_ = msg;
     context_ = msg_->context;
+    if (channel()->read_method() != ReadMethod::PIN) {
+      context_.buffer_index = -1;
+    }
     if (context_.remote_queue_index == 0xffffffffu) {
       context_.remote_queue_index = context_.queue_index;
     }
@@ -547,6 +576,8 @@
     }
   }
 
+  int NumberBuffers(const Channel *channel) override;
+
  private:
   friend class SimulatedTimerHandler;
   friend class SimulatedPhasedLoopHandler;
@@ -644,14 +675,19 @@
   return it->second.get();
 }
 
+int SimulatedEventLoop::NumberBuffers(const Channel *channel) {
+  return GetSimulatedChannel(channel)->number_buffers();
+}
+
 SimulatedWatcher::SimulatedWatcher(
     SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
     const Channel *channel,
     std::function<void(const Context &context, const void *message)> fn)
     : WatcherState(simulated_event_loop, channel, std::move(fn)),
       simulated_event_loop_(simulated_event_loop),
-      event_(this),
+      channel_(channel),
       scheduler_(scheduler),
+      event_(this),
       token_(scheduler_->InvalidToken()) {}
 
 SimulatedWatcher::~SimulatedWatcher() {
@@ -659,7 +695,7 @@
   if (token_ != scheduler_->InvalidToken()) {
     scheduler_->Deschedule(token_);
   }
-  simulated_channel_->RemoveWatcher(this);
+  CHECK_NOTNULL(simulated_channel_)->RemoveWatcher(this);
 }
 
 void SimulatedWatcher::Schedule(std::shared_ptr<SimulatedMessage> message) {
@@ -689,6 +725,9 @@
   }
   Context context = msgs_.front()->context;
 
+  if (channel_->read_method() != ReadMethod::PIN) {
+    context.buffer_index = -1;
+  }
   if (context.remote_queue_index == 0xffffffffu) {
     context.remote_queue_index = context.queue_index;
   }
@@ -719,6 +758,7 @@
 }
 
 void SimulatedChannel::MakeRawWatcher(SimulatedWatcher *watcher) {
+  CheckReaderCount();
   watcher->SetSimulatedChannel(this);
   watchers_.emplace_back(watcher);
 }
@@ -730,6 +770,7 @@
 
 ::std::unique_ptr<RawFetcher> SimulatedChannel::MakeRawFetcher(
     EventLoop *event_loop) {
+  CheckReaderCount();
   ::std::unique_ptr<SimulatedFetcher> fetcher(
       new SimulatedFetcher(event_loop, this));
   fetchers_.push_back(fetcher.get());
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 513dc1b..78c0d44 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -51,15 +51,31 @@
   std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
 };
 
-INSTANTIATE_TEST_CASE_P(SimulatedEventLoopDeathTest, AbstractEventLoopDeathTest,
-                        ::testing::Values([]() {
-                          return new SimulatedEventLoopTestFactory();
-                        }));
+INSTANTIATE_TEST_CASE_P(SimulatedEventLoopCopyTest, AbstractEventLoopTest,
+                        ::testing::Values(std::make_tuple(
+                            []() {
+                              return new SimulatedEventLoopTestFactory();
+                            },
+                            ReadMethod::COPY)));
 
-INSTANTIATE_TEST_CASE_P(SimulatedEventLoopTest, AbstractEventLoopTest,
-                        ::testing::Values([]() {
-                          return new SimulatedEventLoopTestFactory();
-                        }));
+INSTANTIATE_TEST_CASE_P(
+    SimulatedEventLoopCopyDeathTest, AbstractEventLoopDeathTest,
+    ::testing::Values(
+        std::make_tuple([]() { return new SimulatedEventLoopTestFactory(); },
+                        ReadMethod::COPY)));
+
+INSTANTIATE_TEST_CASE_P(SimulatedEventLoopPinTest, AbstractEventLoopTest,
+                        ::testing::Values(std::make_tuple(
+                            []() {
+                              return new SimulatedEventLoopTestFactory();
+                            },
+                            ReadMethod::PIN)));
+
+INSTANTIATE_TEST_CASE_P(
+    SimulatedEventLoopPinDeathTest, AbstractEventLoopDeathTest,
+    ::testing::Values(
+        std::make_tuple([]() { return new SimulatedEventLoopTestFactory(); },
+                        ReadMethod::PIN)));
 
 // Test that creating an event and running the scheduler runs the event.
 TEST(EventSchedulerTest, ScheduleEvent) {
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index 5e14abc..6c14200 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -134,6 +134,9 @@
     srcs = ["index.cc"],
     hdrs = ["index.h"],
     visibility = ["//visibility:public"],
+    deps = [
+        "@com_github_google_glog//:glog",
+    ],
 )
 
 cc_test(
diff --git a/aos/ipc_lib/index.h b/aos/ipc_lib/index.h
index 7d979ea..a47121e 100644
--- a/aos/ipc_lib/index.h
+++ b/aos/ipc_lib/index.h
@@ -5,6 +5,8 @@
 #include <atomic>
 #include <string>
 
+#include "glog/logging.h"
+
 namespace aos {
 namespace ipc_lib {
 
@@ -52,9 +54,7 @@
   }
 
   // Gets the next index.
-  QueueIndex Increment() const {
-    return IncrementBy(1u);
-  }
+  QueueIndex Increment() const { return IncrementBy(1u); }
 
   // Gets the nth next element.
   QueueIndex IncrementBy(uint32_t amount) const {
@@ -133,12 +133,12 @@
 struct AtomicQueueIndex {
  public:
   // Atomically reads the index without any ordering constraints.
-  QueueIndex RelaxedLoad(uint32_t count) {
+  QueueIndex RelaxedLoad(uint32_t count) const {
     return QueueIndex(index_.load(::std::memory_order_relaxed), count);
   }
 
   // Full bidirectional barriers here.
-  QueueIndex Load(uint32_t count) {
+  QueueIndex Load(uint32_t count) const {
     return QueueIndex(index_.load(::std::memory_order_acquire), count);
   }
   inline void Store(QueueIndex value) {
@@ -148,6 +148,10 @@
   // Invalidates the element unconditionally.
   inline void Invalidate() { Store(QueueIndex::Invalid()); }
 
+  inline void RelaxedInvalidate() {
+    index_.store(QueueIndex::Invalid().index_, ::std::memory_order_relaxed);
+  }
+
   // Swaps expected for index atomically.  Returns true on success, false
   // otherwise.
   inline bool CompareAndExchangeStrong(QueueIndex expected, QueueIndex index) {
@@ -168,7 +172,9 @@
       : Index(queue_index.index_, message_index) {}
   Index(uint32_t queue_index, uint16_t message_index)
       : index_((queue_index & 0xffff) |
-               (static_cast<uint32_t>(message_index) << 16)) {}
+               (static_cast<uint32_t>(message_index) << 16)) {
+    CHECK_LE(message_index, MaxMessages());
+  }
 
   // Index of this message in the message array.
   uint16_t message_index() const { return (index_ >> 16) & 0xffff; }
@@ -193,13 +199,13 @@
   static constexpr uint16_t MaxMessages() { return 0xfffe; }
 
   bool operator==(const Index other) const { return other.index_ == index_; }
+  bool operator!=(const Index other) const { return other.index_ != index_; }
 
   // Returns a string representing the index.
   ::std::string DebugString() const;
 
  private:
-  Index(uint32_t index)
-      : index_(index) {}
+  Index(uint32_t index) : index_(index) {}
 
   friend class AtomicIndex;
 
@@ -216,7 +222,7 @@
 class AtomicIndex {
  public:
   // Stores and loads atomically without ordering constraints.
-  Index RelaxedLoad() {
+  Index RelaxedLoad() const {
     return Index(index_.load(::std::memory_order_relaxed));
   }
   void RelaxedStore(Index index) {
@@ -231,15 +237,20 @@
   void Store(Index index) {
     index_.store(index.index_, ::std::memory_order_release);
   }
-  Index Load() { return Index(index_.load(::std::memory_order_acquire)); }
+  Index Load() const { return Index(index_.load(::std::memory_order_acquire)); }
 
   // Swaps expected for index atomically.  Returns true on success, false
   // otherwise.
-  inline bool CompareAndExchangeStrong(Index expected, Index index) {
+  bool CompareAndExchangeStrong(Index expected, Index index) {
     return index_.compare_exchange_strong(expected.index_, index.index_,
                                           ::std::memory_order_acq_rel);
   }
 
+  bool CompareAndExchangeWeak(Index *expected, Index index) {
+    return index_.compare_exchange_weak(expected->index_, index.index_,
+                                        ::std::memory_order_acq_rel);
+  }
+
  private:
   ::std::atomic<uint32_t> index_;
 };
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index d9a1a71..4115769 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -38,6 +38,97 @@
   LocklessQueueMemory *const memory_;
 };
 
+bool IsPinned(LocklessQueueMemory *memory, Index index) {
+  DCHECK(index.valid());
+  const size_t queue_size = memory->queue_size();
+  const QueueIndex message_index =
+      memory->GetMessage(index)->header.queue_index.Load(queue_size);
+  if (!message_index.valid()) {
+    return false;
+  }
+  DCHECK(memory->GetQueue(message_index.Wrapped())->Load() != index)
+      << ": Message is in the queue";
+  for (int pinner_index = 0;
+       pinner_index < static_cast<int>(memory->config.num_pinners);
+       ++pinner_index) {
+    ipc_lib::Pinner *const pinner = memory->GetPinner(pinner_index);
+
+    if (pinner->pinned.RelaxedLoad(queue_size) == message_index) {
+      return true;
+    }
+  }
+  return false;
+}
+
+// Ensures sender->scratch_index (which must contain to_replace) is not pinned.
+//
+// Returns the new scratch_index value.
+Index SwapPinnedSenderScratch(LocklessQueueMemory *const memory,
+                              ipc_lib::Sender *const sender,
+                              const Index to_replace) {
+  // If anybody's trying to pin this message, then grab a message from a pinner
+  // to write into instead, and leave the message we pulled out of the queue
+  // (currently in our scratch_index) with a pinner.
+  //
+  // This loop will terminate in at most one iteration through the pinners in
+  // any steady-state configuration of the memory. There are only as many
+  // Pinner::pinned values to worry about as there are Pinner::scratch_index
+  // values to check against, plus to_replace, which means there will always be
+  // a free one. We might have to make multiple passes if things are being
+  // changed concurrently. However, nobody dying can make this loop fail to
+  // terminate, because the number of processes that can die is bounded: no new
+  // ones can start while we've got the lock.
+  for (int pinner_index = 0; true;
+       pinner_index = (pinner_index + 1) % memory->config.num_pinners) {
+    if (!IsPinned(memory, to_replace)) {
+      // No pinners on our current scratch_index, so we're fine now.
+      VLOG(3) << "No pinners: " << to_replace.DebugString();
+      return to_replace;
+    }
+
+    ipc_lib::Pinner *const pinner = memory->GetPinner(pinner_index);
+
+    const Index pinner_scratch = pinner->scratch_index.RelaxedLoad();
+    CHECK(pinner_scratch.valid())
+        << ": Pinner scratch_index should always be valid";
+    if (IsPinned(memory, pinner_scratch)) {
+      // Wouldn't do us any good to swap with this one, so don't bother, and
+      // move onto the next one.
+      VLOG(3) << "Also pinned: " << pinner_scratch.DebugString();
+      continue;
+    }
+
+    sender->to_replace.RelaxedStore(pinner_scratch);
+    aos_compiler_memory_barrier();
+    // Give the pinner the message (which is currently in
+    // sender->scratch_index).
+    if (!pinner->scratch_index.CompareAndExchangeStrong(pinner_scratch,
+                                                        to_replace)) {
+      // Somebody swapped into this pinner before us. The new value is probably
+      // pinned, so we don't want to look at it again immediately.
+      VLOG(3) << "Pinner " << pinner_index
+              << " scratch_index changed: " << pinner_scratch.DebugString()
+              << ", " << to_replace.DebugString();
+      sender->to_replace.RelaxedInvalidate();
+      continue;
+    }
+    aos_compiler_memory_barrier();
+    // Now update the sender's scratch space.
+    sender->scratch_index.Store(pinner_scratch);
+    aos_compiler_memory_barrier();
+    // And then record that we succeeded, but definitely after the above
+    // store.
+    sender->to_replace.RelaxedInvalidate();
+    VLOG(3) << "Got new scratch message: " << pinner_scratch.DebugString();
+
+    // If it's in a pinner's scratch_index, it should not be in the queue, which
+    // means nobody new can pin it for real. However, they can still attempt to
+    // pin it, which means we can't verify !IsPinned down here.
+
+    return pinner_scratch;
+  }
+}
+
 // Returns true if it succeeded. Returns false if another sender died in the
 // middle.
 bool DoCleanup(LocklessQueueMemory *memory, const GrabQueueSetupLockOrDie &) {
@@ -48,6 +139,7 @@
   aos_compiler_memory_barrier();
 
   const size_t num_senders = memory->num_senders();
+  const size_t num_pinners = memory->num_pinners();
   const size_t queue_size = memory->queue_size();
   const size_t num_messages = memory->num_messages();
 
@@ -105,11 +197,17 @@
     //    to_replace = yyy
     // We are in the act of moving to_replace to scratch_index, but didn't
     // finish.  Easy.
+    //
+    // If doing a pinner swap, we've definitely done it.
 
     // 4) scratch_index = yyy
     //    to_replace = invalid
     // Finished, but died.  Looks like 1)
 
+    // Swapping with a pinner's scratch_index passes through the same states.
+    // We just need to ensure the message that ends up in the sender's
+    // scratch_index isn't pinned, using the same code as sending does.
+
     // Any cleanup code needs to follow the same set of states to be robust to
     // death, so death can be restarted.
 
@@ -117,6 +215,14 @@
       // 1) or 4).  Make sure we aren't corrupted and declare victory.
       CHECK(scratch_index.valid());
 
+      // If it's in 1) with a pinner, the sender might have a pinned message,
+      // so fix that.
+      SwapPinnedSenderScratch(memory, sender, scratch_index);
+
+      // If it's in 4), it may not have completed this step yet. This will
+      // always be a NOP if it's in 1), verified by a DCHECK.
+      memory->GetMessage(scratch_index)->header.queue_index.RelaxedInvalidate();
+
       __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
       ++valid_senders;
       continue;
@@ -129,6 +235,11 @@
       // Just need to invalidate to_replace to finish.
       sender->to_replace.Invalidate();
 
+      // Make sure to indicate it's an unused message before a sender gets its
+      // hands on it.
+      memory->GetMessage(scratch_index)->header.queue_index.RelaxedInvalidate();
+      aos_compiler_memory_barrier();
+
       // And mark that we succeeded.
       __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
       ++valid_senders;
@@ -139,6 +250,20 @@
     need_recovery[i] = true;
   }
 
+  // Cleaning up pinners is easy. We don't actually have to do anything, but
+  // invalidating its pinned field might help catch bugs elsewhere trying to
+  // read it before it's set.
+  for (size_t i = 0; i < num_pinners; ++i) {
+    Pinner *const pinner = memory->GetPinner(i);
+    const uint32_t tid =
+        __atomic_load_n(&(pinner->tid.futex), __ATOMIC_ACQUIRE);
+    if (!(tid & FUTEX_OWNER_DIED)) {
+      continue;
+    }
+    pinner->pinned.Invalidate();
+    __atomic_store_n(&(pinner->tid.futex), 0, __ATOMIC_RELEASE);
+  }
+
   // If all the senders are (or were made) good, there is no need to do the hard
   // case.
   if (valid_senders == num_senders) {
@@ -162,18 +287,18 @@
           return false;
         }
         ++num_missing;
-      } else {
-        CHECK(!need_recovery[i]) << ": Somebody else recovered a sender: " << i;
-        // We can do a relaxed load here because we're the only person touching
-        // this sender at this point, if it matters. If it's not a dead sender,
-        // then any message it every has will already be accounted for, so this
-        // will always be a NOP.
-        const Index scratch_index = sender->scratch_index.RelaxedLoad();
-        if (!accounted_for[scratch_index.message_index()]) {
-          ++num_accounted_for;
-        }
-        accounted_for[scratch_index.message_index()] = true;
+        continue;
       }
+      CHECK(!need_recovery[i]) << ": Somebody else recovered a sender: " << i;
+      // We can do a relaxed load here because we're the only person touching
+      // this sender at this point, if it matters. If it's not a dead sender,
+      // then any message it ever has will eventually be accounted for if we
+      // make enough tries through the outer loop.
+      const Index scratch_index = sender->scratch_index.RelaxedLoad();
+      if (!accounted_for[scratch_index.message_index()]) {
+        ++num_accounted_for;
+      }
+      accounted_for[scratch_index.message_index()] = true;
     }
 
     for (size_t i = 0; i < queue_size; ++i) {
@@ -185,6 +310,16 @@
       accounted_for[index.message_index()] = true;
     }
 
+    for (size_t pinner_index = 0; pinner_index < num_pinners; ++pinner_index) {
+      // Same logic as above for scratch_index applies here too.
+      const Index index =
+          memory->GetPinner(pinner_index)->scratch_index.RelaxedLoad();
+      if (!accounted_for[index.message_index()]) {
+        ++num_accounted_for;
+      }
+      accounted_for[index.message_index()] = true;
+    }
+
     CHECK_LE(num_accounted_for + num_missing, num_messages);
   }
 
@@ -224,6 +359,9 @@
         // atomically insert scratch_index into the queue yet.  So
         // invalidate to_replace.
         sender->to_replace.Invalidate();
+        // Sender definitely will not have gotten here, so finish for it.
+        memory->GetMessage(scratch_index)
+            ->header.queue_index.RelaxedInvalidate();
 
         // And then mark this sender clean.
         __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
@@ -240,6 +378,12 @@
         // scratch_index is accounted for.  That means we did the insert,
         // but didn't record it.
         CHECK(to_replace.valid());
+
+        // Make sure to indicate it's an unused message before a sender gets its
+        // hands on it.
+        memory->GetMessage(to_replace)->header.queue_index.RelaxedInvalidate();
+        aos_compiler_memory_barrier();
+
         // Finish the transaction.  Copy to_replace, then clear it.
 
         sender->scratch_index.Store(to_replace);
@@ -280,6 +424,13 @@
   return syscall(SYS_rt_tgsigqueueinfo, tgid, tid, sig, si);
 }
 
+QueueIndex ZeroOrValid(QueueIndex index) {
+  if (!index.valid()) {
+    return index.Clear();
+  }
+  return index;
+}
+
 }  // namespace
 
 size_t LocklessQueueConfiguration::message_size() const {
@@ -311,6 +462,9 @@
   CHECK_EQ(size % alignof(Sender), 0u);
   size += LocklessQueueMemory::SizeOfSenders(config);
 
+  CHECK_EQ(size % alignof(Pinner), 0u);
+  size += LocklessQueueMemory::SizeOfPinners(config);
+
   return size;
 }
 
@@ -371,6 +525,7 @@
     // TODO(austin): Check these for out of bounds.
     memory->config.num_watchers = config.num_watchers;
     memory->config.num_senders = config.num_senders;
+    memory->config.num_pinners = config.num_pinners;
     memory->config.queue_size = config.queue_size;
     memory->config.message_data_size = config.message_data_size;
 
@@ -403,6 +558,15 @@
       s->to_replace.RelaxedInvalidate();
     }
 
+    for (size_t i = 0; i < memory->num_pinners(); ++i) {
+      ::aos::ipc_lib::Pinner *pinner = memory->GetPinner(i);
+      // Nobody else can possibly be touching these because we haven't set
+      // initialized to true yet.
+      pinner->scratch_index.RelaxedStore(
+          Index(0xffff, i + memory->num_senders() + memory->queue_size()));
+      pinner->pinned.Invalidate();
+    }
+
     aos_compiler_memory_barrier();
     // Signal everything is done.  This needs to be done last, so if we die, we
     // redo initialization.
@@ -414,30 +578,57 @@
   return memory;
 }
 
-LocklessQueue::LocklessQueue(LocklessQueueMemory *memory,
-                             LocklessQueueConfiguration config)
-    : memory_(InitializeLocklessQueueMemory(memory, config)),
-      watcher_copy_(memory_->num_watchers()),
-      pid_(getpid()),
-      uid_(getuid()) {}
+void LocklessQueue::Initialize() {
+  InitializeLocklessQueueMemory(memory_, config_);
+}
 
-LocklessQueue::~LocklessQueue() {
-  CHECK_EQ(watcher_index_, -1);
+LocklessQueueWatcher::~LocklessQueueWatcher() {
+  if (watcher_index_ == -1) {
+    return;
+  }
 
+  // Since everything is self consistent, all we need to do is make sure nobody
+  // else is running.  Someone dying will get caught in the generic consistency
+  // check.
   GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
-  const int num_watchers = memory_->num_watchers();
+
+  // Make sure we are registered.
+  CHECK_NE(watcher_index_, -1);
+
+  // Make sure we still own the slot we are supposed to.
+  CHECK(
+      death_notification_is_held(&(memory_->GetWatcher(watcher_index_)->tid)));
+
+  // The act of unlocking invalidates the entry.  Invalidate it.
+  death_notification_release(&(memory_->GetWatcher(watcher_index_)->tid));
+  // And internally forget the slot.
+  watcher_index_ = -1;
+
   // Cleanup is cheap. The next user will do it anyways, so no need for us to do
   // anything right now.
 
   // And confirm that nothing is owned by us.
+  const int num_watchers = memory_->num_watchers();
   for (int i = 0; i < num_watchers; ++i) {
-    CHECK(!death_notification_is_held(&(memory_->GetWatcher(i)->tid)));
+    CHECK(!death_notification_is_held(&(memory_->GetWatcher(i)->tid)))
+        << ": " << i;
   }
 }
 
-size_t LocklessQueue::QueueSize() const { return memory_->queue_size(); }
+std::optional<LocklessQueueWatcher> LocklessQueueWatcher::Make(
+    LocklessQueue queue, int priority) {
+  queue.Initialize();
+  LocklessQueueWatcher result(queue.memory(), priority);
+  if (result.watcher_index_ != -1) {
+    return std::move(result);
+  } else {
+    return std::nullopt;
+  }
+}
 
-bool LocklessQueue::RegisterWakeup(int priority) {
+LocklessQueueWatcher::LocklessQueueWatcher(LocklessQueueMemory *memory,
+                                           int priority)
+    : memory_(memory) {
   // TODO(austin): Make sure signal coalescing is turned on.  We don't need
   // duplicates.  That will improve performance under high load.
 
@@ -466,10 +657,10 @@
 
   // Bail if we failed to find an open slot.
   if (watcher_index_ == -1) {
-    return false;
+    return;
   }
 
-  Watcher *w = memory_->GetWatcher(watcher_index_);
+  Watcher *const w = memory_->GetWatcher(watcher_index_);
 
   w->pid = getpid();
   w->priority = priority;
@@ -477,29 +668,15 @@
   // Grabbing a mutex is a compiler and memory barrier, so nothing before will
   // get rearranged afterwords.
   death_notification_init(&(w->tid));
-  return true;
 }
 
-void LocklessQueue::UnregisterWakeup() {
-  // Since everything is self consistent, all we need to do is make sure nobody
-  // else is running.  Someone dying will get caught in the generic consistency
-  // check.
-  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
-
-  // Make sure we are registered.
-  CHECK_NE(watcher_index_, -1);
-
-  // Make sure we still own the slot we are supposed to.
-  CHECK(
-      death_notification_is_held(&(memory_->GetWatcher(watcher_index_)->tid)));
-
-  // The act of unlocking invalidates the entry.  Invalidate it.
-  death_notification_release(&(memory_->GetWatcher(watcher_index_)->tid));
-  // And internally forget the slot.
-  watcher_index_ = -1;
+LocklessQueueWakeUpper::LocklessQueueWakeUpper(LocklessQueue queue)
+    : memory_(queue.const_memory()), pid_(getpid()), uid_(getuid()) {
+  queue.Initialize();
+  watcher_copy_.resize(memory_->num_watchers());
 }
 
-int LocklessQueue::Wakeup(const int current_priority) {
+int LocklessQueueWakeUpper::Wakeup(const int current_priority) {
   const size_t num_watchers = memory_->num_watchers();
 
   CHECK_EQ(watcher_copy_.size(), num_watchers);
@@ -511,7 +688,7 @@
   // question.  There is no way without pidfd's to close this window, and
   // creating a pidfd is likely not RT.
   for (size_t i = 0; i < num_watchers; ++i) {
-    Watcher *w = memory_->GetWatcher(i);
+    const Watcher *w = memory_->GetWatcher(i);
     watcher_copy_[i].tid = __atomic_load_n(&(w->tid.futex), __ATOMIC_RELAXED);
     // Force the load of the TID to come first.
     aos_compiler_memory_barrier();
@@ -584,7 +761,8 @@
   return count;
 }
 
-LocklessQueue::Sender::Sender(LocklessQueueMemory *memory) : memory_(memory) {
+LocklessQueueSender::LocklessQueueSender(LocklessQueueMemory *memory)
+    : memory_(memory) {
   GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
 
   // Since we already have the lock, go ahead and try cleaning up.
@@ -609,47 +787,54 @@
     return;
   }
 
-  ::aos::ipc_lib::Sender *s = memory_->GetSender(sender_index_);
+  ::aos::ipc_lib::Sender *const sender = memory_->GetSender(sender_index_);
 
   // Indicate that we are now alive by taking over the slot. If the previous
   // owner died, we still want to do this.
-  death_notification_init(&(s->tid));
+  death_notification_init(&(sender->tid));
+
+  const Index scratch_index = sender->scratch_index.RelaxedLoad();
+  Message *const message = memory_->GetMessage(scratch_index);
+  CHECK(!message->header.queue_index.RelaxedLoad(memory_->queue_size()).valid())
+      << ": " << std::hex << scratch_index.get();
 }
 
-LocklessQueue::Sender::~Sender() {
-  if (valid()) {
+LocklessQueueSender::~LocklessQueueSender() {
+  if (sender_index_ != -1) {
+    CHECK(memory_ != nullptr);
     death_notification_release(&(memory_->GetSender(sender_index_)->tid));
   }
 }
 
-std::optional<LocklessQueue::Sender> LocklessQueue::MakeSender() {
-  LocklessQueue::Sender result = LocklessQueue::Sender(memory_);
-  if (result.valid()) {
+std::optional<LocklessQueueSender> LocklessQueueSender::Make(
+    LocklessQueue queue) {
+  queue.Initialize();
+  LocklessQueueSender result(queue.memory());
+  if (result.sender_index_ != -1) {
     return std::move(result);
   } else {
     return std::nullopt;
   }
 }
 
-QueueIndex ZeroOrValid(QueueIndex index) {
-  if (!index.valid()) {
-    return index.Clear();
-  }
-  return index;
+size_t LocklessQueueSender::size() const {
+  return memory_->message_data_size();
 }
 
-size_t LocklessQueue::Sender::size() { return memory_->message_data_size(); }
-
-void *LocklessQueue::Sender::Data() {
+void *LocklessQueueSender::Data() {
   ::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
-  Index scratch_index = sender->scratch_index.RelaxedLoad();
-  Message *message = memory_->GetMessage(scratch_index);
-  message->header.queue_index.Invalidate();
+  const Index scratch_index = sender->scratch_index.RelaxedLoad();
+  Message *const message = memory_->GetMessage(scratch_index);
+  // We should have invalidated this when we first got the buffer. Verify that
+  // in debug mode.
+  DCHECK(
+      !message->header.queue_index.RelaxedLoad(memory_->queue_size()).valid())
+      << ": " << std::hex << scratch_index.get();
 
   return message->data(memory_->message_data_size());
 }
 
-void LocklessQueue::Sender::Send(
+void LocklessQueueSender::Send(
     const char *data, size_t length,
     aos::monotonic_clock::time_point monotonic_remote_time,
     aos::realtime_clock::time_point realtime_remote_time,
@@ -666,7 +851,7 @@
        monotonic_sent_time, realtime_sent_time, queue_index);
 }
 
-void LocklessQueue::Sender::Send(
+void LocklessQueueSender::Send(
     size_t length, aos::monotonic_clock::time_point monotonic_remote_time,
     aos::realtime_clock::time_point realtime_remote_time,
     uint32_t remote_queue_index,
@@ -682,6 +867,12 @@
   const Index scratch_index = sender->scratch_index.RelaxedLoad();
   Message *const message = memory_->GetMessage(scratch_index);
 
+  // We should have invalidated this when we first got the buffer. Verify that
+  // in debug mode.
+  DCHECK(
+      !message->header.queue_index.RelaxedLoad(memory_->queue_size()).valid())
+      << ": " << std::hex << scratch_index.get();
+
   message->header.length = length;
   // Pass these through.  Any alternative behavior can be implemented out a
   // layer.
@@ -689,6 +880,7 @@
   message->header.monotonic_remote_time = monotonic_remote_time;
   message->header.realtime_remote_time = realtime_remote_time;
 
+  Index to_replace = Index::Invalid();
   while (true) {
     const QueueIndex actual_next_queue_index =
         memory_->next_queue_index.Load(queue_size);
@@ -698,7 +890,7 @@
 
     // This needs to synchronize with whoever the previous writer at this
     // location was.
-    const Index to_replace = memory_->LoadIndex(next_queue_index);
+    to_replace = memory_->LoadIndex(next_queue_index);
 
     const QueueIndex decremented_queue_index =
         next_queue_index.DecrementBy(queue_size);
@@ -726,9 +918,14 @@
     }
 
     // Confirm that the message is what it should be.
+    //
+    // This is just a best-effort check to skip reading the clocks if possible.
+    // If this fails, then the compare-exchange below definitely would, so we
+    // can bail out now.
     {
       const QueueIndex previous_index =
-          memory_->GetMessage(to_replace)->header.queue_index.Load(queue_size);
+          memory_->GetMessage(to_replace)
+              ->header.queue_index.RelaxedLoad(queue_size);
       if (previous_index != decremented_queue_index && previous_index.valid()) {
         // Retry.
         VLOG(3) << "Something fishy happened, queue index doesn't match.  "
@@ -794,17 +991,145 @@
     aos_compiler_memory_barrier();
     // And then record that we succeeded, but definitely after the above store.
     sender->to_replace.RelaxedInvalidate();
+
     break;
   }
+
+  // to_replace is our current scratch_index. It isn't in the queue, which means
+  // nobody new can pin it. They can set their `pinned` to it, but they will
+  // back it out, so they don't count. This means that we just need to find a
+  // message for which no pinner had it in `pinned`, and then we know this
+  // message will never be pinned. We'll start with to_replace, and if that is
+  // pinned then we'll look for a new one to use instead.
+  const Index new_scratch =
+      SwapPinnedSenderScratch(memory_, sender, to_replace);
+
+  // If anybody is looking at this message (they shouldn't be), then try telling
+  // them about it (best-effort).
+  memory_->GetMessage(new_scratch)->header.queue_index.RelaxedInvalidate();
+}
+
+int LocklessQueueSender::buffer_index() const {
+  ::aos::ipc_lib::Sender *const sender = memory_->GetSender(sender_index_);
+  // We can do a relaxed load on our sender because we're the only person
+  // modifying it right now.
+  const Index scratch_index = sender->scratch_index.RelaxedLoad();
+  return scratch_index.message_index();
+}
+
+LocklessQueuePinner::LocklessQueuePinner(
+    LocklessQueueMemory *memory, const LocklessQueueMemory *const_memory)
+    : memory_(memory), const_memory_(const_memory) {
+  GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
+
+  // Since we already have the lock, go ahead and try cleaning up.
+  Cleanup(memory_, grab_queue_setup_lock);
+
+  const int num_pinners = memory_->num_pinners();
+
+  for (int i = 0; i < num_pinners; ++i) {
+    ::aos::ipc_lib::Pinner *p = memory->GetPinner(i);
+    // This doesn't need synchronization because we're the only process doing
+    // initialization right now, and nobody else will be touching pinners which
+    // we're interested in.
+    const uint32_t tid = __atomic_load_n(&(p->tid.futex), __ATOMIC_RELAXED);
+    if (tid == 0) {
+      pinner_index_ = i;
+      break;
+    }
+  }
+
+  if (pinner_index_ == -1) {
+    VLOG(1) << "Too many pinners, starting to bail.";
+    return;
+  }
+
+  ::aos::ipc_lib::Pinner *p = memory_->GetPinner(pinner_index_);
+  p->pinned.Invalidate();
+
+  // Indicate that we are now alive by taking over the slot. If the previous
+  // owner died, we still want to do this.
+  death_notification_init(&(p->tid));
+}
+
+LocklessQueuePinner::~LocklessQueuePinner() {
+  if (pinner_index_ != -1) {
+    CHECK(memory_ != nullptr);
+    memory_->GetPinner(pinner_index_)->pinned.Invalidate();
+    aos_compiler_memory_barrier();
+    death_notification_release(&(memory_->GetPinner(pinner_index_)->tid));
+  }
 }
 
-LocklessQueue::ReadResult LocklessQueue::Read(
+std::optional<LocklessQueuePinner> LocklessQueuePinner::Make(
+    LocklessQueue queue) {
+  queue.Initialize();
+  LocklessQueuePinner result(queue.memory(), queue.const_memory());
+  if (result.pinner_index_ != -1) {
+    return std::move(result);
+  } else {
+    return std::nullopt;
+  }
+}
+
+// This method doesn't mess with any scratch_index, so it doesn't have to worry
+// about message ownership.
+int LocklessQueuePinner::PinIndex(uint32_t uint32_queue_index) {
+  const size_t queue_size = memory_->queue_size();
+  const QueueIndex queue_index =
+      QueueIndex::Zero(queue_size).IncrementBy(uint32_queue_index);
+  ipc_lib::Pinner *const pinner = memory_->GetPinner(pinner_index_);
+
+  AtomicIndex *const queue_slot = memory_->GetQueue(queue_index.Wrapped());
+
+  // Indicate that we want to pin this message.
+  pinner->pinned.Store(queue_index);
+  aos_compiler_memory_barrier();
+
+  {
+    const Index message_index = queue_slot->Load();
+    Message *const message = memory_->GetMessage(message_index);
+
+    const QueueIndex message_queue_index =
+        message->header.queue_index.Load(queue_size);
+    if (message_queue_index == queue_index) {
+      VLOG(3) << "Eq: " << std::hex << message_queue_index.index();
+      aos_compiler_memory_barrier();
+      return message_index.message_index();
+    }
+    VLOG(3) << "Message reused: " << std::hex << message_queue_index.index()
+            << ", " << queue_index.index();
+  }
+
+  // Being down here means we asked to pin a message before realizing it's no
+  // longer in the queue, so back that out now.
+  pinner->pinned.Invalidate();
+  VLOG(3) << "Unpinned: " << std::hex << queue_index.index();
+  return -1;
+}
+
+size_t LocklessQueuePinner::size() const {
+  return const_memory_->message_data_size();
+}
+
+const void *LocklessQueuePinner::Data() const {
+  const size_t queue_size = const_memory_->queue_size();
+  const ::aos::ipc_lib::Pinner *const pinner =
+      const_memory_->GetPinner(pinner_index_);
+  QueueIndex pinned = pinner->pinned.RelaxedLoad(queue_size);
+  CHECK(pinned.valid());
+  const Message *message = const_memory_->GetMessage(pinned);
+
+  return message->data(const_memory_->message_data_size());
+}
+
+LocklessQueueReader::Result LocklessQueueReader::Read(
     uint32_t uint32_queue_index,
     ::aos::monotonic_clock::time_point *monotonic_sent_time,
     ::aos::realtime_clock::time_point *realtime_sent_time,
     ::aos::monotonic_clock::time_point *monotonic_remote_time,
     ::aos::realtime_clock::time_point *realtime_remote_time,
-    uint32_t *remote_queue_index, size_t *length, char *data) {
+    uint32_t *remote_queue_index, size_t *length, char *data) const {
   const size_t queue_size = memory_->queue_size();
 
   // Build up the QueueIndex.
@@ -813,7 +1138,7 @@
 
   // Read the message stored at the requested location.
   Index mi = memory_->LoadIndex(queue_index);
-  Message *m = memory_->GetMessage(mi);
+  const Message *m = memory_->GetMessage(mi);
 
   while (true) {
     // We need to confirm that the data doesn't change while we are reading it.
@@ -826,47 +1151,47 @@
       if (starting_queue_index == queue_index.DecrementBy(queue_size)) {
         VLOG(3) << "Matches: " << std::hex << starting_queue_index.index()
                 << ", " << queue_index.DecrementBy(queue_size).index();
-        return ReadResult::NOTHING_NEW;
+        return Result::NOTHING_NEW;
+      }
+
+      // Someone has re-used this message between when we pulled it out of the
+      // queue and when we grabbed its index.  It is pretty hard to deduce
+      // what happened. Just try again.
+      const Message *const new_m = memory_->GetMessage(queue_index);
+      if (m != new_m) {
+        m = new_m;
+        VLOG(3) << "Retrying, m doesn't match";
+        continue;
+      }
+
+      // We have confirmed that message still points to the same message. This
+      // means that the message didn't get swapped out from under us, so
+      // starting_queue_index is correct.
+      //
+      // Either we got too far behind (signaled by this being a valid
+      // message), or this is one of the initial messages which are invalid.
+      if (starting_queue_index.valid()) {
+        VLOG(3) << "Too old.  Tried for " << std::hex << queue_index.index()
+                << ", got " << starting_queue_index.index() << ", behind by "
+                << std::dec
+                << (starting_queue_index.index() - queue_index.index());
+        return Result::TOO_OLD;
+      }
+
+      VLOG(3) << "Initial";
+
+      // There isn't a valid message at this location.
+      //
+      // If someone asks for one of the messages within the first go around,
+      // then they need to wait.  They got ahead.  Otherwise, they are
+      // asking for something crazy, like something before the beginning of
+      // the queue.  Tell them that they are behind.
+      if (uint32_queue_index < memory_->queue_size()) {
+        VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
+        return Result::NOTHING_NEW;
       } else {
-        // Someone has re-used this message between when we pulled it out of the
-        // queue and when we grabbed its index.  It is pretty hard to deduce
-        // what happened. Just try again.
-        Message *const new_m = memory_->GetMessage(queue_index);
-        if (m != new_m) {
-          m = new_m;
-          VLOG(3) << "Retrying, m doesn't match";
-          continue;
-        }
-
-        // We have confirmed that message still points to the same message. This
-        // means that the message didn't get swapped out from under us, so
-        // starting_queue_index is correct.
-        //
-        // Either we got too far behind (signaled by this being a valid
-        // message), or this is one of the initial messages which are invalid.
-        if (starting_queue_index.valid()) {
-          VLOG(3) << "Too old.  Tried for " << std::hex << queue_index.index()
-                  << ", got " << starting_queue_index.index() << ", behind by "
-                  << std::dec
-                  << (starting_queue_index.index() - queue_index.index());
-          return ReadResult::TOO_OLD;
-        }
-
-        VLOG(3) << "Initial";
-
-        // There isn't a valid message at this location.
-        //
-        // If someone asks for one of the messages within the first go around,
-        // then they need to wait.  They got ahead.  Otherwise, they are
-        // asking for something crazy, like something before the beginning of
-        // the queue.  Tell them that they are behind.
-        if (uint32_queue_index < memory_->queue_size()) {
-          VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
-          return ReadResult::NOTHING_NEW;
-        } else {
-          VLOG(3) << "Not near zero, " << std::hex << uint32_queue_index;
-          return ReadResult::TOO_OLD;
-        }
+        VLOG(3) << "Not near zero, " << std::hex << uint32_queue_index;
+        return Result::TOO_OLD;
       }
     }
     VLOG(3) << "Eq: " << std::hex << starting_queue_index.index() << ", "
@@ -886,7 +1211,8 @@
   *monotonic_remote_time = m->header.monotonic_remote_time;
   *realtime_remote_time = m->header.realtime_remote_time;
   if (data) {
-    memcpy(data, m->data(memory_->message_data_size()), message_data_size());
+    memcpy(data, m->data(memory_->message_data_size()),
+           memory_->message_data_size());
   }
   *length = m->header.length;
 
@@ -901,18 +1227,13 @@
             << queue_index.index() << ", finished with "
             << final_queue_index.index() << ", delta: " << std::dec
             << (final_queue_index.index() - queue_index.index());
-    return ReadResult::OVERWROTE;
+    return Result::OVERWROTE;
   }
 
-  return ReadResult::GOOD;
+  return Result::GOOD;
 }
 
-size_t LocklessQueue::queue_size() const { return memory_->queue_size(); }
-size_t LocklessQueue::message_data_size() const {
-  return memory_->message_data_size();
-}
-
-QueueIndex LocklessQueue::LatestQueueIndex() {
+QueueIndex LocklessQueueReader::LatestIndex() const {
   const size_t queue_size = memory_->queue_size();
 
   // There is only one interesting case.  We need to know if the queue is empty.
@@ -922,9 +1243,16 @@
   if (next_queue_index.valid()) {
     const QueueIndex current_queue_index = next_queue_index.DecrementBy(1u);
     return current_queue_index;
-  } else {
-    return empty_queue_index();
   }
+  return QueueIndex::Invalid();
+}
+
+size_t LocklessQueueSize(const LocklessQueueMemory *memory) {
+  return memory->queue_size();
+}
+
+size_t LocklessQueueMessageDataSize(const LocklessQueueMemory *memory) {
+  return memory->message_data_size();
 }
 
 namespace {
@@ -960,6 +1288,8 @@
               << ::std::endl;
   ::std::cout << "    size_t num_senders = " << memory->config.num_senders
               << ::std::endl;
+  ::std::cout << "    size_t num_pinners = " << memory->config.num_pinners
+              << ::std::endl;
   ::std::cout << "    size_t queue_size = " << memory->config.queue_size
               << ::std::endl;
   ::std::cout << "    size_t message_data_size = "
@@ -1049,6 +1379,22 @@
   }
   ::std::cout << "  }" << ::std::endl;
 
+  ::std::cout << "  Pinner pinners[" << memory->num_pinners() << "] {"
+              << ::std::endl;
+  for (size_t i = 0; i < memory->num_pinners(); ++i) {
+    Pinner *p = memory->GetPinner(i);
+    ::std::cout << "    [" << i << "] -> Pinner {" << ::std::endl;
+    ::std::cout << "      aos_mutex tid = " << PrintMutex(&p->tid)
+                << ::std::endl;
+    ::std::cout << "      AtomicIndex scratch_index = "
+                << p->scratch_index.Load().DebugString() << ::std::endl;
+    ::std::cout << "      AtomicIndex pinned = "
+                << p->pinned.Load(memory->queue_size()).DebugString()
+                << ::std::endl;
+    ::std::cout << "    }" << ::std::endl;
+  }
+  ::std::cout << "  }" << ::std::endl;
+
   ::std::cout << "  Watcher watchers[" << memory->num_watchers() << "] {"
               << ::std::endl;
   for (size_t i = 0; i < memory->num_watchers(); ++i) {
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index de80f3d..3cd3726 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -4,8 +4,8 @@
 #include <signal.h>
 #include <sys/signalfd.h>
 #include <sys/types.h>
-#include <vector>
 #include <optional>
+#include <vector>
 
 #include "aos/ipc_lib/aos_sync.h"
 #include "aos/ipc_lib/data_alignment.h"
@@ -51,6 +51,21 @@
   AtomicIndex to_replace;
 };
 
+// Structure to hold the state required to pin messages.
+struct Pinner {
+  // The same as Sender::tid. See there for docs.
+  aos_mutex tid;
+
+  // Queue index of the message we have pinned, or Invalid if there isn't one.
+  AtomicQueueIndex pinned;
+
+  // This should always be valid.
+  //
+  // Note that this is fully independent from pinned. It's just a place to stash
+  // a message, to ensure there's always an unpinned one for a writer to grab.
+  AtomicIndex scratch_index;
+};
+
 // Structure representing a message.
 struct Message {
   struct Header {
@@ -98,6 +113,8 @@
   size_t num_watchers;
   // Size of the sender list.
   size_t num_senders;
+  // Size of the pinner list.
+  size_t num_pinners;
 
   // Size of the list of pointers into the messages list.
   size_t queue_size;
@@ -106,7 +123,7 @@
 
   size_t message_size() const;
 
-  size_t num_messages() const { return num_senders + queue_size; }
+  size_t num_messages() const { return num_senders + num_pinners + queue_size; }
 };
 
 // Structure to hold the state of the queue.
@@ -117,40 +134,72 @@
 // is done before the watcher goes RT), but needs to be RT for the sender.
 struct LocklessQueueMemory;
 
+// Returns the size of the LocklessQueueMemory.
+size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);
+
 // Initializes the queue memory.  memory must be either a valid pointer to the
 // queue datastructure, or must be zero initialized.
 LocklessQueueMemory *InitializeLocklessQueueMemory(
     LocklessQueueMemory *memory, LocklessQueueConfiguration config);
 
-// Returns the size of the LocklessQueueMemory.
-size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);
-
-// Prints to stdout the data inside the queue for debugging.
-void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
-
 const static unsigned int kWakeupSignal = SIGRTMIN + 2;
 
-// Class to manage sending and receiving data in the lockless queue.  This is
-// separate from the actual memory backing the queue so that memory can be
-// managed with mmap to share across the process boundary.
+// A convenient wrapper for accessing a lockless queue.
 class LocklessQueue {
  public:
-  LocklessQueue(LocklessQueueMemory *memory, LocklessQueueConfiguration config);
-  LocklessQueue(const LocklessQueue &) = delete;
-  LocklessQueue &operator=(const LocklessQueue &) = delete;
+  LocklessQueue(const LocklessQueueMemory *const_memory,
+                LocklessQueueMemory *memory, LocklessQueueConfiguration config)
+      : const_memory_(const_memory), memory_(memory), config_(config) {}
 
-  ~LocklessQueue();
+  void Initialize();
 
-  // Returns the number of messages in the queue.
-  size_t QueueSize() const;
+  LocklessQueueConfiguration config() const { return config_; }
 
-  size_t message_data_size() const;
+  const LocklessQueueMemory *const_memory() const { return const_memory_; }
+  LocklessQueueMemory *memory() const { return memory_; }
 
-  // Registers this thread to receive the kWakeupSignal signal when Wakeup is
-  // called. Returns false if there was an error in registration.
-  bool RegisterWakeup(int priority);
-  // Unregisters the wakeup.
-  void UnregisterWakeup();
+ private:
+  const LocklessQueueMemory *const_memory_;
+  LocklessQueueMemory *memory_;
+  LocklessQueueConfiguration config_;
+};
+
+class LocklessQueueWatcher {
+ public:
+  LocklessQueueWatcher(const LocklessQueueWatcher &) = delete;
+  LocklessQueueWatcher &operator=(const LocklessQueueWatcher &) = delete;
+  LocklessQueueWatcher(LocklessQueueWatcher &&other)
+      : memory_(other.memory_), watcher_index_(other.watcher_index_) {
+    other.watcher_index_ = -1;
+  }
+  LocklessQueueWatcher &operator=(LocklessQueueWatcher &&other) {
+    std::swap(memory_, other.memory_);
+    std::swap(watcher_index_, other.watcher_index_);
+    return *this;
+  }
+
+  ~LocklessQueueWatcher();
+
+  // Registers this thread to receive the kWakeupSignal signal when
+  // LocklessQueueWakeUpper::Wakeup is called. Returns nullopt if there was an
+  // error in registration.
+  // TODO(austin): Change the API if we find ourselves with more errors.
+  static std::optional<LocklessQueueWatcher> Make(LocklessQueue queue,
+                                                  int priority);
+
+ private:
+  LocklessQueueWatcher(LocklessQueueMemory *memory, int priority);
+
+  LocklessQueueMemory *memory_ = nullptr;
+
+  // Index in the watcher list that our entry is, or -1 if no watcher is
+  // registered.
+  int watcher_index_ = -1;
+};
+
+class LocklessQueueWakeUpper {
+ public:
+  LocklessQueueWakeUpper(LocklessQueue queue);
 
   // Sends the kWakeupSignal to all threads which have called RegisterWakeup.
   //
@@ -158,113 +207,7 @@
   // if nonrt.
   int Wakeup(int current_priority);
 
-  // If you ask for a queue index 2 past the newest, you will still get
-  // NOTHING_NEW until that gets overwritten with new data.  If you ask for an
-  // element newer than QueueSize() from the current message, we consider it
-  // behind by a large amount and return TOO_OLD.  If the message is modified
-  // out from underneath us as we read it, return OVERWROTE.
-  //
-  // data may be nullptr to indicate the data should not be copied.
-  enum class ReadResult { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
-  ReadResult Read(uint32_t queue_index,
-                  ::aos::monotonic_clock::time_point *monotonic_sent_time,
-                  ::aos::realtime_clock::time_point *realtime_sent_time,
-                  ::aos::monotonic_clock::time_point *monotonic_remote_time,
-                  ::aos::realtime_clock::time_point *realtime_remote_time,
-                  uint32_t *remote_queue_index, size_t *length, char *data);
-
-  // Returns the index to the latest queue message.  Returns empty_queue_index()
-  // if there are no messages in the queue.  Do note that this index wraps if
-  // more than 2^32 messages are sent.
-  QueueIndex LatestQueueIndex();
-  static QueueIndex empty_queue_index() { return QueueIndex::Invalid(); }
-
-  // Returns the size of the queue.  This is mostly useful for manipulating
-  // QueueIndex.
-  size_t queue_size() const;
-
-  // TODO(austin): Return the oldest queue index.  This lets us catch up nicely
-  // if we got behind.
-  // The easiest way to implement this is likely going to be to reserve the
-  // first modulo of values for the initial time around, and never reuse them.
-  // That lets us do a simple atomic read of the next index and deduce what has
-  // happened.  It will involve the simplest atomic operations.
-
-  // TODO(austin): Make it so we can find the indices which were sent just
-  // before and after a time with a binary search.
-
-  // Sender for blocks of data.  The resources associated with a sender are
-  // scoped to this object's lifetime.
-  class Sender {
-   public:
-    Sender(const Sender &) = delete;
-    Sender &operator=(const Sender &) = delete;
-    Sender(Sender &&other)
-        : memory_(other.memory_), sender_index_(other.sender_index_) {
-      other.memory_ = nullptr;
-      other.sender_index_ = -1;
-    }
-    Sender &operator=(Sender &&other) {
-      memory_ = other.memory_;
-      sender_index_ = other.sender_index_;
-      other.memory_ = nullptr;
-      other.sender_index_ = -1;
-      return *this;
-    }
-
-    ~Sender();
-
-    // Sends a message without copying the data.
-    // Copy at most size() bytes of data into the memory pointed to by Data(),
-    // and then call Send().
-    // Note: calls to Data() are expensive enough that you should cache it.
-    size_t size();
-    void *Data();
-    void Send(size_t length,
-              aos::monotonic_clock::time_point monotonic_remote_time =
-                  aos::monotonic_clock::min_time,
-              aos::realtime_clock::time_point realtime_remote_time =
-                  aos::realtime_clock::min_time,
-              uint32_t remote_queue_index = 0xffffffff,
-              aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
-              aos::realtime_clock::time_point *realtime_sent_time = nullptr,
-              uint32_t *queue_index = nullptr);
-
-    // Sends up to length data.  Does not wakeup the target.
-    void Send(const char *data, size_t length,
-              aos::monotonic_clock::time_point monotonic_remote_time =
-                  aos::monotonic_clock::min_time,
-              aos::realtime_clock::time_point realtime_remote_time =
-                  aos::realtime_clock::min_time,
-              uint32_t remote_queue_index = 0xffffffff,
-              aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
-              aos::realtime_clock::time_point *realtime_sent_time = nullptr,
-              uint32_t *queue_index = nullptr);
-
-   private:
-    friend class LocklessQueue;
-
-    Sender(LocklessQueueMemory *memory);
-
-    // Returns true if this sender is valid.  If it isn't valid, any of the
-    // other methods won't work.  This is here to allow the lockless queue to
-    // only build a sender if there was one available.
-    bool valid() const { return sender_index_ != -1 && memory_ != nullptr; }
-
-    // Pointer to the backing memory.
-    LocklessQueueMemory *memory_ = nullptr;
-
-    // Index into the sender list.
-    int sender_index_ = -1;
-  };
-
-  // Creates a sender.  If we couldn't allocate a sender, returns nullopt.
-  // TODO(austin): Change the API if we find ourselves with more errors.
-  std::optional<Sender> MakeSender();
-
  private:
-  LocklessQueueMemory *memory_ = nullptr;
-
   // Memory and datastructure used to sort a list of watchers to wake
   // up.  This isn't a copy of Watcher since tid is simpler to work with here
   // than the futex above.
@@ -273,17 +216,176 @@
     pid_t pid;
     int priority;
   };
-  // TODO(austin): Don't allocate this memory if we aren't going to send.
-  ::std::vector<WatcherCopy> watcher_copy_;
 
-  // Index in the watcher list that our entry is, or -1 if no watcher is
-  // registered.
-  int watcher_index_ = -1;
-
+  const LocklessQueueMemory *const memory_;
   const int pid_;
   const uid_t uid_;
+
+  ::std::vector<WatcherCopy> watcher_copy_;
 };
 
+// Sender for blocks of data.  The resources associated with a sender are
+// scoped to this object's lifetime.
+class LocklessQueueSender {
+ public:
+  LocklessQueueSender(const LocklessQueueSender &) = delete;
+  LocklessQueueSender &operator=(const LocklessQueueSender &) = delete;
+  LocklessQueueSender(LocklessQueueSender &&other)
+      : memory_(other.memory_), sender_index_(other.sender_index_) {
+    other.memory_ = nullptr;
+    other.sender_index_ = -1;
+  }
+  LocklessQueueSender &operator=(LocklessQueueSender &&other) {
+    std::swap(memory_, other.memory_);
+    std::swap(sender_index_, other.sender_index_);
+    return *this;
+  }
+
+  ~LocklessQueueSender();
+
+  // Creates a sender.  If we couldn't allocate a sender, returns nullopt.
+  // TODO(austin): Change the API if we find ourselves with more errors.
+  static std::optional<LocklessQueueSender> Make(LocklessQueue queue);
+
+  // Sends a message without copying the data.
+  // Copy at most size() bytes of data into the memory pointed to by Data(),
+  // and then call Send().
+  // Note: calls to Data() are expensive enough that you should cache it.
+  size_t size() const;
+  void *Data();
+  void Send(size_t length,
+            aos::monotonic_clock::time_point monotonic_remote_time =
+                aos::monotonic_clock::min_time,
+            aos::realtime_clock::time_point realtime_remote_time =
+                aos::realtime_clock::min_time,
+            uint32_t remote_queue_index = 0xffffffff,
+            aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
+            aos::realtime_clock::time_point *realtime_sent_time = nullptr,
+            uint32_t *queue_index = nullptr);
+
+  // Sends up to length data.  Does not wakeup the target.
+  void Send(const char *data, size_t length,
+            aos::monotonic_clock::time_point monotonic_remote_time =
+                aos::monotonic_clock::min_time,
+            aos::realtime_clock::time_point realtime_remote_time =
+                aos::realtime_clock::min_time,
+            uint32_t remote_queue_index = 0xffffffff,
+            aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
+            aos::realtime_clock::time_point *realtime_sent_time = nullptr,
+            uint32_t *queue_index = nullptr);
+
+  int buffer_index() const;
+
+ private:
+  LocklessQueueSender(LocklessQueueMemory *memory);
+
+  // Pointer to the backing memory.
+  LocklessQueueMemory *memory_ = nullptr;
+
+  // Index into the sender list.
+  int sender_index_ = -1;
+};
+
+// Pinner for blocks of data.  The resources associated with a pinner are
+// scoped to this object's lifetime.
+class LocklessQueuePinner {
+ public:
+  LocklessQueuePinner(const LocklessQueuePinner &) = delete;
+  LocklessQueuePinner &operator=(const LocklessQueuePinner &) = delete;
+  LocklessQueuePinner(LocklessQueuePinner &&other)
+      : memory_(other.memory_),
+        const_memory_(other.const_memory_),
+        pinner_index_(other.pinner_index_) {
+    other.pinner_index_ = -1;
+  }
+  LocklessQueuePinner &operator=(LocklessQueuePinner &&other) {
+    std::swap(memory_, other.memory_);
+    std::swap(const_memory_, other.const_memory_);
+    std::swap(pinner_index_, other.pinner_index_);
+    return *this;
+  }
+
+  ~LocklessQueuePinner();
+
+  // Creates a pinner.  If we couldn't allocate a pinner, returns nullopt.
+  // TODO(austin): Change the API if we find ourselves with more errors.
+  static std::optional<LocklessQueuePinner> Make(LocklessQueue queue);
+
+  // Attempts to pin the message at queue_index.
+  // Un-pins the previous message.
+  // Returns the buffer index (non-negative) if it succeeds.
+  // Returns -1 if that message is no longer in the queue.
+  int PinIndex(uint32_t queue_index);
+
+  // Read at most size() bytes of data from the memory pointed to by Data().
+  // Note: calls to Data() are expensive enough that you should cache it.
+  // Don't call Data() before a successful PinIndex call.
+  size_t size() const;
+  const void *Data() const;
+
+ private:
+  LocklessQueuePinner(LocklessQueueMemory *memory,
+                      const LocklessQueueMemory *const_memory);
+
+  // Pointer to the backing memory.
+  LocklessQueueMemory *memory_ = nullptr;
+  const LocklessQueueMemory *const_memory_ = nullptr;
+
+  // Index into the pinner list.
+  int pinner_index_ = -1;
+};
+
+class LocklessQueueReader {
+ public:
+  enum class Result { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
+
+  LocklessQueueReader(LocklessQueue queue) : memory_(queue.const_memory()) {
+    queue.Initialize();
+  }
+
+  // If you ask for a queue index 2 past the newest, you will still get
+  // NOTHING_NEW until that gets overwritten with new data.  If you ask for an
+  // element newer than LocklessQueueSize() from the current message, we
+  // consider it behind by a large amount and return TOO_OLD.  If the message
+  // is modified out from underneath us as we read it, return OVERWROTE.
+  //
+  // data may be nullptr to indicate the data should not be copied.
+  Result Read(uint32_t queue_index,
+              ::aos::monotonic_clock::time_point *monotonic_sent_time,
+              ::aos::realtime_clock::time_point *realtime_sent_time,
+              ::aos::monotonic_clock::time_point *monotonic_remote_time,
+              ::aos::realtime_clock::time_point *realtime_remote_time,
+              uint32_t *remote_queue_index, size_t *length, char *data) const;
+
+  // Returns the index to the latest queue message.  Returns
+  // QueueIndex::Invalid() if there are no messages in the queue.  Do note that
+  // this index wraps if more than 2^32 messages are sent.
+  QueueIndex LatestIndex() const;
+
+ private:
+  const LocklessQueueMemory *const memory_;
+};
+
+// Returns the number of messages which are logically in the queue at a time.
+size_t LocklessQueueSize(const LocklessQueueMemory *memory);
+
+// Returns the number of bytes queue users are allowed to read/write within each
+// message.
+size_t LocklessQueueMessageDataSize(const LocklessQueueMemory *memory);
+
+// TODO(austin): Return the oldest queue index.  This lets us catch up nicely
+// if we got behind.
+// The easiest way to implement this is likely going to be to reserve the
+// first modulo of values for the initial time around, and never reuse them.
+// That lets us do a simple atomic read of the next index and deduce what has
+// happened.  It will involve the simplest atomic operations.
+
+// TODO(austin): Make it so we can find the indices which were sent just
+// before and after a time with a binary search.
+
+// Prints to stdout the data inside the queue for debugging.
+void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
+
 }  // namespace ipc_lib
 }  // namespace aos
 
diff --git a/aos/ipc_lib/lockless_queue_death_test.cc b/aos/ipc_lib/lockless_queue_death_test.cc
index b7cdfee..b4bb66a 100644
--- a/aos/ipc_lib/lockless_queue_death_test.cc
+++ b/aos/ipc_lib/lockless_queue_death_test.cc
@@ -487,6 +487,9 @@
   }
   return false;
 }
+
+static constexpr int kPinnedMessageIndex = 0;  // Expected PinIndex() result.
+
 }  // namespace
 
 // Tests that death during sends is recovered from correctly.
@@ -503,36 +506,47 @@
   LocklessQueueConfiguration config;
   config.num_watchers = 2;
   config.num_senders = 2;
-  config.queue_size = 4;
+  config.num_pinners = 1;
+  config.queue_size = 2;
   config.message_data_size = 32;
 
   TestShmRobustness(
       config,
       [config, tid](void *memory) {
         // Initialize the queue and grab the tid.
-        LocklessQueue queue(
+        LocklessQueue(
             reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
-            config);
+            reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
+            config)
+            .Initialize();
         *tid = gettid();
       },
       [config](void *memory) {
-        // Now try to write 2 messages.  We will get killed a bunch as this
-        // tries to happen.
         LocklessQueue queue(
             reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
+            reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
             config);
-        LocklessQueue::Sender sender = queue.MakeSender().value();
-        for (int i = 0; i < 2; ++i) {
+        // Now try to write some messages.  We will get killed a bunch as this
+        // tries to happen.
+        LocklessQueueSender sender = LocklessQueueSender::Make(queue).value();
+        LocklessQueuePinner pinner = LocklessQueuePinner::Make(queue).value();
+        for (int i = 0; i < 5; ++i) {
           char data[100];
           size_t s = snprintf(data, sizeof(data), "foobar%d", i + 1);
           sender.Send(data, s + 1);
+          // Pin a message, so when we keep writing we will exercise the pinning
+          // logic.
+          if (i == 1) {
+            CHECK_EQ(pinner.PinIndex(1), kPinnedMessageIndex);
+          }
         }
       },
       [config, tid](void *raw_memory) {
+        ::aos::ipc_lib::LocklessQueueMemory *const memory =
+            reinterpret_cast<::aos::ipc_lib::LocklessQueueMemory *>(raw_memory);
         // Confirm that we can create 2 senders (the number in the queue), and
         // send a message.  And that all the messages in the queue are valid.
-        ::aos::ipc_lib::LocklessQueueMemory *memory =
-            reinterpret_cast<::aos::ipc_lib::LocklessQueueMemory *>(raw_memory);
+        LocklessQueue queue(memory, memory, config);
 
         bool print = false;
 
@@ -550,25 +564,42 @@
         }
 
         if (print) {
+          printf("Bad version:\n");
           PrintLocklessQueueMemory(memory);
         }
 
-        LocklessQueue queue(memory, config);
         // Building and destroying a sender will clean up the queue.
-        { LocklessQueue::Sender sender = queue.MakeSender().value(); }
+        LocklessQueueSender::Make(queue).value();
 
         if (print) {
           printf("Cleaned up version:\n");
           PrintLocklessQueueMemory(memory);
         }
 
+        LocklessQueueReader reader(queue);
+
+        // Verify that the pinned message still has its contents. Note that we
+        // need to do this _before_ sending more messages, because the pinner
+        // has been cleaned up.
         {
-          LocklessQueue::Sender sender = queue.MakeSender().value();
+          const Message *const message =
+              memory->GetMessage(Index(1, kPinnedMessageIndex));
+          const auto queue_index =
+              message->header.queue_index.Load(memory->queue_size());
+          if (queue_index.valid()) {
+            const char *const data = message->data(memory->message_data_size());
+            EXPECT_EQ(data[LocklessQueueMessageDataSize(memory) -
+                           message->header.length + 6],
+                      '2');
+          }
+        }
+
+        {
+          LocklessQueueSender sender = LocklessQueueSender::Make(queue).value();
           {
             // Make a second sender to confirm that the slot was freed.
             // If the sender doesn't get cleaned up, this will fail.
-            LocklessQueue queue2(memory, config);
-            queue2.MakeSender().value();
+            LocklessQueueSender::Make(queue).value();
           }
 
           // Send a message to make sure that the queue still works.
@@ -590,25 +621,33 @@
           char read_data[1024];
           size_t length;
 
-          LocklessQueue::ReadResult read_result =
-              queue.Read(i, &monotonic_sent_time, &realtime_sent_time,
-                         &monotonic_remote_time, &realtime_remote_time,
-                         &remote_queue_index, &length, &(read_data[0]));
+          LocklessQueueReader::Result read_result =
+              reader.Read(i, &monotonic_sent_time, &realtime_sent_time,
+                          &monotonic_remote_time, &realtime_remote_time,
+                          &remote_queue_index, &length, &(read_data[0]));
 
-          if (read_result != LocklessQueue::ReadResult::GOOD) {
+          if (read_result != LocklessQueueReader::Result::GOOD) {
+            if (read_result == LocklessQueueReader::Result::TOO_OLD) {
+              ++i;
+              continue;
+            }
+            CHECK(read_result == LocklessQueueReader::Result::NOTHING_NEW)
+                << ": " << static_cast<int>(read_result);
             break;
           }
 
-          EXPECT_GT(read_data[queue.message_data_size() - length + 6],
-                    last_data)
+          EXPECT_GT(
+              read_data[LocklessQueueMessageDataSize(memory) - length + 6],
+              last_data)
               << ": Got " << read_data;
-          last_data = read_data[queue.message_data_size() - length + 6];
+          last_data =
+              read_data[LocklessQueueMessageDataSize(memory) - length + 6];
 
           ++i;
         }
 
         // Confirm our message got through.
-        EXPECT_EQ(last_data, '9');
+        EXPECT_EQ(last_data, '9') << ": Got through " << i;
       },
       /* prepare_in_child = true */ true);
 }
diff --git a/aos/ipc_lib/lockless_queue_memory.h b/aos/ipc_lib/lockless_queue_memory.h
index a10609c..bb995b9 100644
--- a/aos/ipc_lib/lockless_queue_memory.h
+++ b/aos/ipc_lib/lockless_queue_memory.h
@@ -31,6 +31,8 @@
   size_t num_watchers() const { return config.num_watchers; }
   // Size of the sender list.
   size_t num_senders() const { return config.num_senders; }
+  // Size of the pinner list.
+  size_t num_pinners() const { return config.num_pinners; }
 
   // Number of messages logically in the queue at a time.
   // List of pointers into the messages list.
@@ -60,9 +62,10 @@
   // writing:
   //
   // AtomicIndex queue[config.queue_size];
-  // Message messages[config.queue_size + config.num_senders];
+  // Message messages[config.num_messages()];
   // Watcher watchers[config.num_watchers];
   // Sender senders[config.num_senders];
+  // Pinner pinners[config.num_pinners];
 
   static constexpr size_t kDataAlignment = alignof(std::max_align_t);
 
@@ -72,27 +75,49 @@
   alignas(kDataAlignment) char data[];
 
   // Memory size functions for all 4 lists.
-  size_t SizeOfQueue() { return SizeOfQueue(config); }
+  size_t SizeOfQueue() const { return SizeOfQueue(config); }
   static size_t SizeOfQueue(LocklessQueueConfiguration config) {
     return AlignmentRoundUp(sizeof(AtomicIndex) * config.queue_size);
   }
 
-  size_t SizeOfMessages() { return SizeOfMessages(config); }
+  size_t SizeOfMessages() const { return SizeOfMessages(config); }
   static size_t SizeOfMessages(LocklessQueueConfiguration config) {
     return AlignmentRoundUp(config.message_size() * config.num_messages());
   }
 
-  size_t SizeOfWatchers() { return SizeOfWatchers(config); }
+  size_t SizeOfWatchers() const { return SizeOfWatchers(config); }
   static size_t SizeOfWatchers(LocklessQueueConfiguration config) {
     return AlignmentRoundUp(sizeof(Watcher) * config.num_watchers);
   }
 
-  size_t SizeOfSenders() { return SizeOfSenders(config); }
+  size_t SizeOfSenders() const { return SizeOfSenders(config); }
   static size_t SizeOfSenders(LocklessQueueConfiguration config) {
     return AlignmentRoundUp(sizeof(Sender) * config.num_senders);
   }
 
-  // Getters for each of the 4 lists.
+  size_t SizeOfPinners() const { return SizeOfPinners(config); }
+  static size_t SizeOfPinners(LocklessQueueConfiguration config) {
+    return AlignmentRoundUp(sizeof(Pinner) * config.num_pinners);
+  }
+
+  // Getters for each of the lists.
+
+  Pinner *GetPinner(size_t pinner_index) {
+    static_assert(alignof(Pinner) <= kDataAlignment,
+                  "kDataAlignment is too small");
+    return reinterpret_cast<Pinner *>(
+        &data[0] + SizeOfQueue() + SizeOfMessages() + SizeOfWatchers() +
+        SizeOfSenders() + pinner_index * sizeof(Pinner));
+  }
+
+  const Pinner *GetPinner(size_t pinner_index) const {
+    static_assert(alignof(const Pinner) <= kDataAlignment,
+                  "kDataAlignment is too small");
+    return reinterpret_cast<const Pinner *>(
+        &data[0] + SizeOfQueue() + SizeOfMessages() + SizeOfWatchers() +
+        SizeOfSenders() + pinner_index * sizeof(Pinner));
+  }
+
   Sender *GetSender(size_t sender_index) {
     static_assert(alignof(Sender) <= kDataAlignment,
                   "kDataAlignment is too small");
@@ -109,15 +134,29 @@
                                        watcher_index * sizeof(Watcher));
   }
 
+  const Watcher *GetWatcher(size_t watcher_index) const {
+    static_assert(alignof(const Watcher) <= kDataAlignment,
+                  "kDataAlignment is too small");
+    return reinterpret_cast<const Watcher *>(&data[0] + SizeOfQueue() +
+                                             SizeOfMessages() +
+                                             watcher_index * sizeof(Watcher));
+  }
+
   AtomicIndex *GetQueue(uint32_t index) {
     static_assert(alignof(AtomicIndex) <= kDataAlignment,
                   "kDataAlignment is too small");
     return reinterpret_cast<AtomicIndex *>(&data[0] +
                                            sizeof(AtomicIndex) * index);
   }
+  const AtomicIndex *GetQueue(uint32_t index) const {
+    static_assert(alignof(const AtomicIndex) <= kDataAlignment,
+                  "kDataAlignment is too small");
+    return reinterpret_cast<const AtomicIndex *>(&data[0] +
+                                                 sizeof(AtomicIndex) * index);
+  }
 
-  // There are num_senders + queue_size messages.  The free list is really the
-  // sender list, since those are messages available to be filled in and sent.
+  // There are num_messages() messages.  The free list is really the
+  // sender+pinner list, since those are messages available to be filled in.
   // This removes the need to find lost messages when a sender dies.
   Message *GetMessage(Index index) {
     static_assert(alignof(Message) <= kDataAlignment,
@@ -125,12 +164,19 @@
     return reinterpret_cast<Message *>(&data[0] + SizeOfQueue() +
                                        index.message_index() * message_size());
   }
+  const Message *GetMessage(Index index) const {
+    static_assert(alignof(const Message) <= kDataAlignment,
+                  "kDataAlignment is too small");
+    return reinterpret_cast<const Message *>(
+        &data[0] + SizeOfQueue() + index.message_index() * message_size());
+  }
 
   // Helpers to fetch messages from the queue.
-  Index LoadIndex(QueueIndex index) {
+  Index LoadIndex(QueueIndex index) const {
     return GetQueue(index.Wrapped())->Load();
   }
-  Message *GetMessage(QueueIndex index) {
+  Message *GetMessage(QueueIndex index) { return GetMessage(LoadIndex(index)); }
+  const Message *GetMessage(QueueIndex index) const {
     return GetMessage(LoadIndex(index));
   }
 
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index 65b2a15..e1e2516 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -44,6 +44,7 @@
   LocklessQueueTest() {
     config_.num_watchers = 10;
     config_.num_senders = 100;
+    config_.num_pinners = 5;
     config_.queue_size = 10000;
     // Exercise the alignment code.  This would throw off alignment.
     config_.message_data_size = 101;
@@ -55,15 +56,16 @@
     Reset();
   }
 
-  LocklessQueueMemory *get_memory() {
-    return reinterpret_cast<LocklessQueueMemory *>(&(memory_[0]));
+  LocklessQueue queue() {
+    return LocklessQueue(reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
+                         reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
+                         config_);
   }
 
-  void Reset() { memset(get_memory(), 0, LocklessQueueMemorySize(config_)); }
+  void Reset() { memset(&memory_[0], 0, LocklessQueueMemorySize(config_)); }
 
   // Runs until the signal is received.
   void RunUntilWakeup(Event *ready, int priority) {
-    LocklessQueue queue(get_memory(), config_);
     internal::EPoll epoll;
     SignalFd signalfd({kWakeupSignal});
 
@@ -74,16 +76,18 @@
       epoll.Quit();
     });
 
-    // Register to be woken up *after* the signalfd is catching the signals.
-    queue.RegisterWakeup(priority);
+    {
+      // Register to be woken up *after* the signalfd is catching the signals.
+      LocklessQueueWatcher watcher =
+          LocklessQueueWatcher::Make(queue(), priority).value();
 
-    // And signal we are now ready.
-    ready->Set();
+      // And signal we are now ready.
+      ready->Set();
 
-    epoll.Run();
+      epoll.Run();
 
-    // Cleanup.
-    queue.UnregisterWakeup();
+      // Clean up: closing this scope destroys the watcher before the signalfd.
+    }
     epoll.DeleteFd(signalfd.fd());
   }
 
@@ -98,36 +102,35 @@
 
 // Tests that wakeup doesn't do anything if nothing was registered.
 TEST_F(LocklessQueueTest, NoWatcherWakeup) {
-  LocklessQueue queue(get_memory(), config_);
+  LocklessQueueWakeUpper wake_upper(queue());
 
-  EXPECT_EQ(queue.Wakeup(7), 0);
+  EXPECT_EQ(wake_upper.Wakeup(7), 0);
 }
 
 // Tests that wakeup doesn't do anything if a wakeup was registered and then
 // unregistered.
 TEST_F(LocklessQueueTest, UnregisteredWatcherWakeup) {
-  LocklessQueue queue(get_memory(), config_);
+  LocklessQueueWakeUpper wake_upper(queue());
 
-  queue.RegisterWakeup(5);
-  queue.UnregisterWakeup();
+  { LocklessQueueWatcher::Make(queue(), 5).value(); }
 
-  EXPECT_EQ(queue.Wakeup(7), 0);
+  EXPECT_EQ(wake_upper.Wakeup(7), 0);
 }
 
 // Tests that wakeup doesn't do anything if the thread dies.
 TEST_F(LocklessQueueTest, DiedWatcherWakeup) {
-  LocklessQueue queue(get_memory(), config_);
+  LocklessQueueWakeUpper wake_upper(queue());
 
   ::std::thread([this]() {
     // Use placement new so the destructor doesn't get run.
-    ::std::aligned_storage<sizeof(LocklessQueue), alignof(LocklessQueue)>::type
-        data;
-    LocklessQueue *q = new (&data) LocklessQueue(get_memory(), config_);
-    // Register a wakeup.
-    q->RegisterWakeup(5);
-  }).join();
+    ::std::aligned_storage<sizeof(LocklessQueueWatcher),
+                           alignof(LocklessQueueWatcher)>::type data;
+    new (&data)
+        LocklessQueueWatcher(LocklessQueueWatcher::Make(queue(), 5).value());
+  })
+      .join();
 
-  EXPECT_EQ(queue.Wakeup(7), 0);
+  EXPECT_EQ(wake_upper.Wakeup(7), 0);
 }
 
 struct WatcherState {
@@ -154,16 +157,13 @@
 
     WatcherState *s = &queues.back();
     queues.back().t = ::std::thread([this, &cleanup, s]() {
-      LocklessQueue q(get_memory(), config_);
-      EXPECT_TRUE(q.RegisterWakeup(0));
+      LocklessQueueWatcher q = LocklessQueueWatcher::Make(queue(), 0).value();
 
       // Signal that this thread is ready.
       s->ready.Set();
 
       // And wait until we are asked to shut down.
       cleanup.Wait();
-
-      q.UnregisterWakeup();
     });
   }
 
@@ -173,12 +173,9 @@
   }
 
   // Now try to allocate another one.  This will fail.
-  {
-    LocklessQueue queue(get_memory(), config_);
-    EXPECT_FALSE(queue.RegisterWakeup(0));
-  }
+  EXPECT_FALSE(LocklessQueueWatcher::Make(queue(), 0));
 
-  // Trigger the threads to cleanup their resources, and wait unti they are
+  // Trigger the threads to clean up their resources, and wait until they are
   // done.
   cleanup.Set();
   for (WatcherState &w : queues) {
@@ -186,23 +183,16 @@
   }
 
   // We should now be able to allocate a wakeup.
-  {
-    LocklessQueue queue(get_memory(), config_);
-    EXPECT_TRUE(queue.RegisterWakeup(0));
-    queue.UnregisterWakeup();
-  }
+  EXPECT_TRUE(LocklessQueueWatcher::Make(queue(), 0));
 }
 
 // Tests that too many watchers dies like expected.
 TEST_F(LocklessQueueTest, TooManySenders) {
-  ::std::vector<::std::unique_ptr<LocklessQueue>> queues;
-  ::std::vector<LocklessQueue::Sender> senders;
+  ::std::vector<LocklessQueueSender> senders;
   for (size_t i = 0; i < config_.num_senders; ++i) {
-    queues.emplace_back(new LocklessQueue(get_memory(), config_));
-    senders.emplace_back(queues.back()->MakeSender().value());
+    senders.emplace_back(LocklessQueueSender::Make(queue()).value());
   }
-  queues.emplace_back(new LocklessQueue(get_memory(), config_));
-  EXPECT_FALSE(queues.back()->MakeSender());
+  EXPECT_FALSE(LocklessQueueSender::Make(queue()));
 }
 
 // Now, start 2 threads and have them receive the signals.
@@ -211,7 +201,7 @@
   EXPECT_LE(kWakeupSignal, SIGRTMAX);
   EXPECT_GE(kWakeupSignal, SIGRTMIN);
 
-  LocklessQueue queue(get_memory(), config_);
+  LocklessQueueWakeUpper wake_upper(queue());
 
   // Event used to make sure the thread is ready before the test starts.
   Event ready1;
@@ -224,7 +214,7 @@
   ready1.Wait();
   ready2.Wait();
 
-  EXPECT_EQ(queue.Wakeup(3), 2);
+  EXPECT_EQ(wake_upper.Wakeup(3), 2);
 
   t1.join();
   t2.join();
@@ -236,15 +226,14 @@
 
 // Do a simple send test.
 TEST_F(LocklessQueueTest, Send) {
-  LocklessQueue queue(get_memory(), config_);
-
-  LocklessQueue::Sender sender = queue.MakeSender().value();
+  LocklessQueueSender sender = LocklessQueueSender::Make(queue()).value();
+  LocklessQueueReader reader(queue());
 
   // Send enough messages to wrap.
   for (int i = 0; i < 20000; ++i) {
     // Confirm that the queue index makes sense given the number of sends.
-    EXPECT_EQ(queue.LatestQueueIndex().index(),
-              i == 0 ? LocklessQueue::empty_queue_index().index() : i - 1);
+    EXPECT_EQ(reader.LatestIndex().index(),
+              i == 0 ? QueueIndex::Invalid().index() : i - 1);
 
     // Send a trivial piece of data.
     char data[100];
@@ -253,7 +242,7 @@
 
     // Confirm that the queue index still makes sense.  This is easier since the
     // empty case has been handled.
-    EXPECT_EQ(queue.LatestQueueIndex().index(), i);
+    EXPECT_EQ(reader.LatestIndex().index(), i);
 
     // Read a result from 5 in the past.
     ::aos::monotonic_clock::time_point monotonic_sent_time;
@@ -270,15 +259,15 @@
     } else {
       index = index.IncrementBy(i - 5);
     }
-    LocklessQueue::ReadResult read_result =
-        queue.Read(index.index(), &monotonic_sent_time, &realtime_sent_time,
-                   &monotonic_remote_time, &realtime_remote_time,
-                   &remote_queue_index, &length, &(read_data[0]));
+    LocklessQueueReader::Result read_result =
+        reader.Read(index.index(), &monotonic_sent_time, &realtime_sent_time,
+                    &monotonic_remote_time, &realtime_remote_time,
+                    &remote_queue_index, &length, &(read_data[0]));
 
     // This should either return GOOD, or TOO_OLD if it is before the start of
     // the queue.
-    if (read_result != LocklessQueue::ReadResult::GOOD) {
-      EXPECT_EQ(read_result, LocklessQueue::ReadResult::TOO_OLD);
+    if (read_result != LocklessQueueReader::Result::GOOD) {
+      EXPECT_EQ(read_result, LocklessQueueReader::Result::TOO_OLD);
     }
   }
 }
@@ -294,9 +283,8 @@
 
   const chrono::seconds print_frequency(FLAGS_print_rate);
 
-  QueueRacer racer(get_memory(), FLAGS_thread_count, kNumMessages, config_);
-  const monotonic_clock::time_point start_time =
-      monotonic_clock::now();
+  QueueRacer racer(queue(), FLAGS_thread_count, kNumMessages);
+  const monotonic_clock::time_point start_time = monotonic_clock::now();
   const monotonic_clock::time_point end_time =
       start_time + chrono::seconds(FLAGS_duration);
 
@@ -332,7 +320,7 @@
 // Send enough messages to wrap the 32 bit send counter.
 TEST_F(LocklessQueueTest, WrappedSend) {
   uint64_t kNumMessages = 0x100010000ul;
-  QueueRacer racer(get_memory(), 1, kNumMessages, config_);
+  QueueRacer racer(queue(), 1, kNumMessages);
 
   const monotonic_clock::time_point start_time = monotonic_clock::now();
   EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index f5b3d7e..fcc8668 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -24,19 +24,16 @@
   uint64_t event_count = ::std::numeric_limits<uint64_t>::max();
 };
 
-QueueRacer::QueueRacer(LocklessQueueMemory *memory, int num_threads,
-                       uint64_t num_messages, LocklessQueueConfiguration config)
-    : memory_(memory),
-      num_threads_(num_threads),
-      num_messages_(num_messages),
-      config_(config) {
+QueueRacer::QueueRacer(LocklessQueue queue, int num_threads,
+                       uint64_t num_messages)
+    : queue_(queue), num_threads_(num_threads), num_messages_(num_messages) {
   Reset();
 }
 
 void QueueRacer::RunIteration(bool race_reads, int write_wrap_count) {
   const bool will_wrap = num_messages_ * num_threads_ *
                              static_cast<uint64_t>(1 + write_wrap_count) >
-                         config_.queue_size;
+                         queue_.config().queue_size;
 
   // Clear out shmem.
   Reset();
@@ -52,13 +49,13 @@
   ::std::vector<ThreadState> threads(num_threads_);
 
   ::std::thread queue_index_racer([this, &poll_index]() {
-    LocklessQueue queue(memory_, config_);
+    LocklessQueueReader reader(queue_);
 
     // Track the number of times we wrap, and cache the modulo.
     uint64_t wrap_count = 0;
     uint32_t last_queue_index = 0;
     const uint32_t max_queue_index =
-        QueueIndex::MaxIndex(0xffffffffu, queue.QueueSize());
+        QueueIndex::MaxIndex(0xffffffffu, queue_.config().queue_size);
     while (poll_index) {
       // We want to read everything backwards.  This will give us conservative
       // bounds.  And with enough time and randomness, we will see all the cases
@@ -81,16 +78,14 @@
       //
       // So, grab them in order.
       const uint64_t finished_writes = finished_writes_.load();
-      const QueueIndex latest_queue_index_queue_index =
-          queue.LatestQueueIndex();
+      const QueueIndex latest_queue_index_queue_index = reader.LatestIndex();
       const uint64_t started_writes = started_writes_.load();
 
       const uint32_t latest_queue_index_uint32_t =
           latest_queue_index_queue_index.index();
       uint64_t latest_queue_index = latest_queue_index_uint32_t;
 
-      if (latest_queue_index_queue_index !=
-          LocklessQueue::empty_queue_index()) {
+      if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
         // If we got smaller, we wrapped.
         if (latest_queue_index_uint32_t < last_queue_index) {
           ++wrap_count;
@@ -107,22 +102,19 @@
 
       // If we are at the beginning, the queue needs to always return empty.
       if (started_writes == 0) {
-        EXPECT_EQ(latest_queue_index_queue_index,
-                  LocklessQueue::empty_queue_index());
+        EXPECT_EQ(latest_queue_index_queue_index, QueueIndex::Invalid());
         EXPECT_EQ(finished_writes, 0);
       } else {
         if (finished_writes == 0) {
           // Plausible to be at the beginning, in which case we don't have
           // anything to check.
-          if (latest_queue_index_queue_index !=
-              LocklessQueue::empty_queue_index()) {
+          if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
             // Otherwise, we have started.  The queue can't have any more
             // entries than this.
             EXPECT_GE(started_writes, latest_queue_index + 1);
           }
         } else {
-          EXPECT_NE(latest_queue_index_queue_index,
-                    LocklessQueue::empty_queue_index());
+          EXPECT_NE(latest_queue_index_queue_index, QueueIndex::Invalid());
           // latest_queue_index is an index, not a count.  So it always reads 1
           // low.
           EXPECT_GE(latest_queue_index + 1, finished_writes);
@@ -139,42 +131,62 @@
     } else {
       t.event_count = 0;
     }
-    t.thread =
-        ::std::thread([this, &t, thread_index, &run, write_wrap_count]() {
-          // Build up a sender.
-          LocklessQueue queue(memory_, config_);
-          LocklessQueue::Sender sender = queue.MakeSender().value();
+    t.thread = ::std::thread([this, &t, thread_index, &run,
+                              write_wrap_count]() {
+      // Build up a sender.
+      LocklessQueueSender sender = LocklessQueueSender::Make(queue_).value();
+      CHECK_GE(sender.size(), sizeof(ThreadPlusCount));
 
-          // Signal that we are ready to start sending.
-          t.ready.Set();
+      // Signal that we are ready to start sending.
+      t.ready.Set();
 
-          // Wait until signaled to start running.
-          run.Wait();
+      // Wait until signaled to start running.
+      run.Wait();
 
-          // Gogogo!
-          for (uint64_t i = 0;
-               i < num_messages_ * static_cast<uint64_t>(1 + write_wrap_count);
-               ++i) {
-            char data[sizeof(ThreadPlusCount)];
-            ThreadPlusCount tpc;
-            tpc.thread = thread_index;
-            tpc.count = i;
-
-            memcpy(data, &tpc, sizeof(ThreadPlusCount));
-
-            if (i % 0x800000 == 0x100000) {
-              fprintf(stderr, "Sent %" PRIu64 ", %f %%\n", i,
-                      static_cast<double>(i) /
-                          static_cast<double>(num_messages_ *
-                                              (1 + write_wrap_count)) *
-                          100.0);
+      // Gogogo!
+      for (uint64_t i = 0;
+           i < num_messages_ * static_cast<uint64_t>(1 + write_wrap_count);
+           ++i) {
+        char *const data = static_cast<char *>(sender.Data()) + sender.size() -
+                           sizeof(ThreadPlusCount);
+        const char fill = (i + 55) & 0xFF;
+        memset(data, fill, sizeof(ThreadPlusCount));
+        {
+          bool found_nonzero = false;
+          for (size_t i = 0; i < sizeof(ThreadPlusCount); ++i) {
+            if (data[i] != fill) {
+              found_nonzero = true;
             }
-
-            ++started_writes_;
-            sender.Send(data, sizeof(ThreadPlusCount));
-            ++finished_writes_;
           }
-        });
+          CHECK(!found_nonzero) << ": Somebody else is writing to our buffer";
+        }
+
+        ThreadPlusCount tpc;
+        tpc.thread = thread_index;
+        tpc.count = i;
+
+        memcpy(data, &tpc, sizeof(ThreadPlusCount));
+
+        if (i % 0x800000 == 0x100000) {
+          fprintf(
+              stderr, "Sent %" PRIu64 ", %f %%\n", i,
+              static_cast<double>(i) /
+                  static_cast<double>(num_messages_ * (1 + write_wrap_count)) *
+                  100.0);
+        }
+
+        ++started_writes_;
+        sender.Send(sizeof(ThreadPlusCount));
+        // Blank out the new scratch buffer, to catch other people using it.
+        {
+          char *const new_data = static_cast<char *>(sender.Data()) +
+                                 sender.size() - sizeof(ThreadPlusCount);
+          const char new_fill = ~fill;
+          memset(new_data, new_fill, sizeof(ThreadPlusCount));
+        }
+        ++finished_writes_;
+      }
+    });
     ++thread_index;
   }
 
@@ -234,17 +246,16 @@
 void QueueRacer::CheckReads(bool race_reads, int write_wrap_count,
                             ::std::vector<ThreadState> *threads) {
   // Now read back the results to double check.
-  LocklessQueue queue(memory_, config_);
-
-  const bool will_wrap =
-      num_messages_ * num_threads_ * (1 + write_wrap_count) > queue.QueueSize();
+  LocklessQueueReader reader(queue_);
+  const bool will_wrap = num_messages_ * num_threads_ * (1 + write_wrap_count) >
+                         LocklessQueueSize(queue_.memory());
 
   monotonic_clock::time_point last_monotonic_sent_time =
       monotonic_clock::epoch();
   uint64_t initial_i = 0;
   if (will_wrap) {
     initial_i = (1 + write_wrap_count) * num_messages_ * num_threads_ -
-                queue.QueueSize();
+                LocklessQueueSize(queue_.memory());
   }
 
   for (uint64_t i = initial_i;
@@ -258,27 +269,28 @@
     char read_data[1024];
 
     // Handle overflowing the message count for the wrap test.
-    const uint32_t wrapped_i = i % static_cast<size_t>(QueueIndex::MaxIndex(
-                                       0xffffffffu, queue.QueueSize()));
-    LocklessQueue::ReadResult read_result =
-        queue.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
-                   &monotonic_remote_time, &realtime_remote_time,
-                   &remote_queue_index, &length, &(read_data[0]));
+    const uint32_t wrapped_i =
+        i % static_cast<size_t>(QueueIndex::MaxIndex(
+                0xffffffffu, LocklessQueueSize(queue_.memory())));
+    LocklessQueueReader::Result read_result =
+        reader.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
+                    &monotonic_remote_time, &realtime_remote_time,
+                    &remote_queue_index, &length, &(read_data[0]));
 
     if (race_reads) {
-      if (read_result == LocklessQueue::ReadResult::NOTHING_NEW) {
+      if (read_result == LocklessQueueReader::Result::NOTHING_NEW) {
         --i;
         continue;
       }
     }
 
     if (race_reads && will_wrap) {
-      if (read_result == LocklessQueue::ReadResult::TOO_OLD) {
+      if (read_result == LocklessQueueReader::Result::TOO_OLD) {
         continue;
       }
     }
     // Every message should be good.
-    ASSERT_EQ(read_result, LocklessQueue::ReadResult::GOOD) << ": i is " << i;
+    ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD) << ": i is " << i;
 
     // And, confirm that time never went backwards.
     ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
@@ -289,7 +301,8 @@
 
     ThreadPlusCount tpc;
     ASSERT_EQ(length, sizeof(ThreadPlusCount));
-    memcpy(&tpc, read_data + queue.message_data_size() - length,
+    memcpy(&tpc,
+           read_data + LocklessQueueMessageDataSize(queue_.memory()) - length,
            sizeof(ThreadPlusCount));
 
     if (will_wrap) {
@@ -303,18 +316,18 @@
 
       if (race_reads) {
         // Make sure nothing goes backwards.  Really not much we can do here.
-        ASSERT_LE((*threads)[tpc.thread].event_count, tpc.count) << ": Thread "
-                                                                 << tpc.thread;
+        ASSERT_LE((*threads)[tpc.thread].event_count, tpc.count)
+            << ": Thread " << tpc.thread;
         (*threads)[tpc.thread].event_count = tpc.count;
       } else {
         // Make sure nothing goes backwards.  Really not much we can do here.
-        ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count) << ": Thread "
-                                                                 << tpc.thread;
+        ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
+            << ": Thread " << tpc.thread;
       }
     } else {
       // Confirm that we see every message counter from every thread.
-      ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count) << ": Thread "
-                                                               << tpc.thread;
+      ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
+          << ": Thread " << tpc.thread;
     }
     ++(*threads)[tpc.thread].event_count;
   }
diff --git a/aos/ipc_lib/queue_racer.h b/aos/ipc_lib/queue_racer.h
index eaeedd4..7e92693 100644
--- a/aos/ipc_lib/queue_racer.h
+++ b/aos/ipc_lib/queue_racer.h
@@ -14,8 +14,7 @@
 // them together to all write at once.
 class QueueRacer {
  public:
-  QueueRacer(LocklessQueueMemory *memory, int num_threads,
-             uint64_t num_messages, LocklessQueueConfiguration config);
+  QueueRacer(LocklessQueue queue, int num_threads, uint64_t num_messages);
 
   // Runs an iteration of the race.
   //
@@ -35,13 +34,14 @@
   void RunIteration(bool race_reads, int write_wrap_count);
 
   size_t CurrentIndex() {
-    LocklessQueue queue(memory_, config_);
-    return queue.LatestQueueIndex().index();
+    return LocklessQueueReader(queue_).LatestIndex().index();
   }
 
  private:
   // Wipes the queue memory out so we get a clean start.
-  void Reset() { memset(memory_, 0, LocklessQueueMemorySize(config_)); }
+  void Reset() {
+    memset(queue_.memory(), 0, LocklessQueueMemorySize(queue_.config()));
+  }
 
   // This is a separate method so that when all the ASSERT_* methods, we still
   // clean up all the threads.  Otherwise we get an assert on the way out of
@@ -49,7 +49,7 @@
   void CheckReads(bool race_reads, int write_wrap_count,
                   ::std::vector<ThreadState> *threads);
 
-  LocklessQueueMemory *memory_;
+  LocklessQueue queue_;
   const uint64_t num_threads_;
   const uint64_t num_messages_;
 
@@ -60,8 +60,6 @@
   ::std::atomic<uint64_t> started_writes_;
   // Number of writes completed.
   ::std::atomic<uint64_t> finished_writes_;
-
-  const LocklessQueueConfiguration config_;
 };
 
 }  // namespace ipc_lib