Add the ability to pin shared memory for reads

This can increase performance for some specific use cases with large
messages.

Change-Id: I38deaf3ce85a70c0ac11510757d193fd39ad29bb
diff --git a/aos/configuration.cc b/aos/configuration.cc
index 927e9d3..834ab5b 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -299,6 +299,11 @@
         continue;
       }
 
+      CHECK_EQ(c->read_method() == ReadMethod::PIN, c->num_readers() != 0)
+          << ": num_readers may be set if and only if read_method is PIN,"
+             " if you want 0 readers do not set PIN: "
+          << CleanedChannelToString(c);
+
       // Attempt to insert the channel.
       auto result = channels.insert(CopyFlatBuffer(c));
       if (!result.second) {
diff --git a/aos/configuration.fbs b/aos/configuration.fbs
index 31d89e7..9a24c8a 100644
--- a/aos/configuration.fbs
+++ b/aos/configuration.fbs
@@ -42,6 +42,13 @@
   time_to_live:uint = 0;
 }
 
+enum ReadMethod : ubyte {
+  // Copy all the data out of shared memory into a local buffer for each reader.
+  COPY,
+  // Pin the data in shared memory and read directly from there.
+  PIN,
+}
+
 // Table representing a channel.  Channels are where data is published and
 // subscribed from.  The tuple of name, type is the identifying information.
 table Channel {
@@ -78,6 +85,14 @@
   // node responsible for logging it.  Empty implies the node this connection
   // is connecting to (i.e. name).
   logger_nodes:[string];
+
+  // The way messages are read from shared memory for this channel.
+  read_method:ReadMethod = COPY;
+
+  // Sets the maximum number of senders on a channel.
+  //
+  // Currently, this must be set if and only if read_method is PIN.
+  num_readers:int;
 }
 
 // Table to support renaming channel names.
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 39a6a54..0212eaa 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -243,7 +243,7 @@
 cc_test(
     name = "shm_event_loop_test",
     srcs = ["shm_event_loop_test.cc"],
-    shard_count = 5,
+    shard_count = 16,
     deps = [
         ":event_loop_param_test",
         ":shm_event_loop",
@@ -267,6 +267,7 @@
     name = "simulated_event_loop_test",
     srcs = ["simulated_event_loop_test.cc"],
     data = ["multinode_pingpong_config.json"],
+    shard_count = 4,
     deps = [
         ":event_loop_param_test",
         ":ping_lib",
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 8d0d0e2..606849a 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -560,6 +560,89 @@
   EXPECT_EQ(200, fetcher.get()->value());
 }
 
+// Verify that a fetcher still holds its data, even after falling behind.
+TEST_P(AbstractEventLoopTest, FetcherBehindData) {
+  auto send_loop = Make();
+  auto fetch_loop = Make();
+  auto sender = send_loop->MakeSender<TestMessage>("/test");
+  Fetcher<TestMessage> fetcher = fetch_loop->MakeFetcher<TestMessage>("/test");
+  {
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(1);
+    ASSERT_TRUE(msg.Send(builder.Finish()));
+  }
+  ASSERT_TRUE(fetcher.Fetch());
+  EXPECT_EQ(1, fetcher.get()->value());
+  for (int i = 0; i < 300; ++i) {
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(i + 2);
+    ASSERT_TRUE(msg.Send(builder.Finish()));
+  }
+  EXPECT_EQ(1, fetcher.get()->value());
+}
+
+// Try a bunch of orderings of operations with fetchers and senders. Verify that
+// all the fetchers have the correct data at each step.
+TEST_P(AbstractEventLoopTest, FetcherPermutations) {
+  for (int max_save = 0; max_save < 5; ++max_save) {
+    SCOPED_TRACE("max_save=" + std::to_string(max_save));
+
+    auto send_loop = Make();
+    auto fetch_loop = Make();
+    auto sender = send_loop->MakeSender<TestMessage>("/test");
+    const auto send_message = [&sender](int i) {
+      aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+      builder.add_value(i);
+      ASSERT_TRUE(msg.Send(builder.Finish()));
+    };
+    std::vector<Fetcher<TestMessage>> fetchers;
+    for (int i = 0; i < 10; ++i) {
+      fetchers.emplace_back(fetch_loop->MakeFetcher<TestMessage>("/test"));
+    }
+    send_message(1);
+    for (auto &fetcher : fetchers) {
+      ASSERT_TRUE(fetcher.Fetch());
+      EXPECT_EQ(1, fetcher.get()->value());
+    }
+
+    for (int save = 1; save <= max_save; ++save) {
+      SCOPED_TRACE("save=" + std::to_string(save));
+      send_message(100 + save);
+      for (size_t i = 0; i < fetchers.size() - save; ++i) {
+        SCOPED_TRACE("fetcher=" + std::to_string(i));
+        ASSERT_TRUE(fetchers[i].Fetch());
+        EXPECT_EQ(100 + save, fetchers[i].get()->value());
+      }
+      for (size_t i = fetchers.size() - save; i < fetchers.size() - 1; ++i) {
+        SCOPED_TRACE("fetcher=" + std::to_string(i));
+        EXPECT_EQ(100 + (fetchers.size() - 1 - i), fetchers[i].get()->value());
+      }
+      EXPECT_EQ(1, fetchers.back().get()->value());
+    }
+
+    for (int i = 0; i < 300; ++i) {
+      send_message(200 + i);
+    }
+
+    for (size_t i = 0; i < fetchers.size() - max_save; ++i) {
+      SCOPED_TRACE("fetcher=" + std::to_string(i));
+      if (max_save > 0) {
+        EXPECT_EQ(100 + max_save, fetchers[i].get()->value());
+      } else {
+        EXPECT_EQ(1, fetchers[i].get()->value());
+      }
+    }
+    for (size_t i = fetchers.size() - max_save; i < fetchers.size() - 1; ++i) {
+      SCOPED_TRACE("fetcher=" + std::to_string(i));
+      EXPECT_EQ(100 + (fetchers.size() - 1 - i), fetchers[i].get()->value());
+    }
+    EXPECT_EQ(1, fetchers.back().get()->value());
+  }
+}
+
 // Verify that making a fetcher and watcher for "/test" succeeds.
 TEST_P(AbstractEventLoopTest, FetcherAndWatcher) {
   auto loop = Make();
@@ -642,7 +725,80 @@
   }
   EXPECT_DEATH({ loop->MakeSender<TestMessage>("/test"); },
                "Failed to create sender on \\{ \"name\": \"/test\", \"type\": "
-               "\"aos.TestMessage\" \\}, too many senders.");
+               "\"aos.TestMessage\"[^}]*\\ }, too many senders.");
+}
+
+// Verify that creating too many fetchers fails.
+TEST_P(AbstractEventLoopDeathTest, TooManyFetchers) {
+  if (read_method() != ReadMethod::PIN) {
+    // Other read methods don't limit the number of readers, so just skip this.
+    return;
+  }
+
+  auto loop = Make();
+  std::vector<aos::Fetcher<TestMessage>> fetchers;
+  for (int i = 0; i < 10; ++i) {
+    fetchers.emplace_back(loop->MakeFetcher<TestMessage>("/test"));
+  }
+  EXPECT_DEATH({ loop->MakeFetcher<TestMessage>("/test"); },
+               "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+               "\"aos.TestMessage\"[^}]*\\ }, too many readers.");
+}
+
+// Verify that creating too many fetchers, split between two event loops, fails.
+TEST_P(AbstractEventLoopDeathTest, TooManyFetchersTwoLoops) {
+  if (read_method() != ReadMethod::PIN) {
+    // Other read methods don't limit the number of readers, so just skip this.
+    return;
+  }
+
+  auto loop = Make();
+  auto loop2 = Make();
+  std::vector<aos::Fetcher<TestMessage>> fetchers;
+  for (int i = 0; i < 5; ++i) {
+    fetchers.emplace_back(loop->MakeFetcher<TestMessage>("/test"));
+    fetchers.emplace_back(loop2->MakeFetcher<TestMessage>("/test"));
+  }
+  EXPECT_DEATH({ loop->MakeFetcher<TestMessage>("/test"); },
+               "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+               "\"aos.TestMessage\"[^}]*\\ }, too many readers.");
+}
+
+// Verify that creating too many watchers fails.
+TEST_P(AbstractEventLoopDeathTest, TooManyWatchers) {
+  if (read_method() != ReadMethod::PIN) {
+    // Other read methods don't limit the number of readers, so just skip this.
+    return;
+  }
+
+  std::vector<std::unique_ptr<EventLoop>> loops;
+  for (int i = 0; i < 10; ++i) {
+    loops.emplace_back(Make());
+    loops.back()->MakeWatcher("/test", [](const TestMessage &) {});
+  }
+  EXPECT_DEATH({ Make()->MakeWatcher("/test", [](const TestMessage &) {}); },
+               "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+               "\"aos.TestMessage\"[^}]*\\ }, too many readers.");
+}
+
+// Verify that creating too many watchers and fetchers combined fails.
+TEST_P(AbstractEventLoopDeathTest, TooManyWatchersAndFetchers) {
+  if (read_method() != ReadMethod::PIN) {
+    // Other read methods don't limit the number of readers, so just skip this.
+    return;
+  }
+
+  auto loop = Make();
+  std::vector<aos::Fetcher<TestMessage>> fetchers;
+  std::vector<std::unique_ptr<EventLoop>> loops;
+  for (int i = 0; i < 5; ++i) {
+    fetchers.emplace_back(loop->MakeFetcher<TestMessage>("/test"));
+    loops.emplace_back(Make());
+    loops.back()->MakeWatcher("/test", [](const TestMessage &) {});
+  }
+  EXPECT_DEATH({ loop->MakeFetcher<TestMessage>("/test"); },
+               "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+               "\"aos.TestMessage\"[^}]*\\ }, too many readers.");
 }
 
 // Verify that we can't create a sender inside OnRun.
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index cbd5cd1..f158e02 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -15,31 +15,30 @@
 class EventLoopTestFactory {
  public:
   EventLoopTestFactory()
-      : flatbuffer_(JsonToFlatbuffer("{\n"
-                                     "  \"channels\": [ \n"
-                                     "    {\n"
-                                     "      \"name\": \"/aos\",\n"
-                                     "      \"type\": \"aos.logging.LogMessageFbs\"\n"
-                                     "    },\n"
-                                     "    {\n"
-                                     "      \"name\": \"/aos\",\n"
-                                     "      \"type\": \"aos.timing.Report\"\n"
-                                     "    },\n"
-                                     "    {\n"
-                                     "      \"name\": \"/test\",\n"
-                                     "      \"type\": \"aos.TestMessage\"\n"
-                                     "    },\n"
-                                     "    {\n"
-                                     "      \"name\": \"/test1\",\n"
-                                     "      \"type\": \"aos.TestMessage\"\n"
-                                     "    },\n"
-                                     "    {\n"
-                                     "      \"name\": \"/test2\",\n"
-                                     "      \"type\": \"aos.TestMessage\"\n"
-                                     "    }\n"
-                                     "  ]\n"
-                                     "}\n",
-                                     Configuration::MiniReflectTypeTable())) {}
+      : flatbuffer_(JsonToFlatbuffer<Configuration>(R"config({
+  "channels": [
+    {
+      "name": "/aos",
+      "type": "aos.logging.LogMessageFbs"
+    },
+    {
+      "name": "/aos",
+      "type": "aos.timing.Report"
+    },
+    {
+      "name": "/test",
+      "type": "aos.TestMessage"
+    },
+    {
+      "name": "/test1",
+      "type": "aos.TestMessage"
+    },
+    {
+      "name": "/test2",
+      "type": "aos.TestMessage"
+    }
+  ]
+})config")) {}
 
   virtual ~EventLoopTestFactory() {}
 
@@ -58,8 +57,48 @@
   // Advances time by sleeping.  Can't be called from inside a loop.
   virtual void SleepFor(::std::chrono::nanoseconds duration) = 0;
 
+  void PinReads() {
+    static const std::string kJson = R"config({
+  "channels": [
+    {
+      "name": "/aos",
+      "type": "aos.logging.LogMessageFbs",
+      "read_method": "PIN",
+      "num_readers": 10
+    },
+    {
+      "name": "/aos",
+      "type": "aos.timing.Report",
+      "read_method": "PIN",
+      "num_readers": 10
+    },
+    {
+      "name": "/test",
+      "type": "aos.TestMessage",
+      "read_method": "PIN",
+      "num_readers": 10
+    },
+    {
+      "name": "/test1",
+      "type": "aos.TestMessage",
+      "read_method": "PIN",
+      "num_readers": 10
+    },
+    {
+      "name": "/test2",
+      "type": "aos.TestMessage",
+      "read_method": "PIN",
+      "num_readers": 10
+    }
+  ]
+})config";
+
+    flatbuffer_ = FlatbufferDetachedBuffer<Configuration>(
+        JsonToFlatbuffer(kJson, Configuration::MiniReflectTypeTable()));
+  }
+
   void EnableNodes(std::string_view my_node) {
-    std::string json = R"config({
+    static const std::string kJson = R"config({
   "channels": [
     {
       "name": "/aos/me",
@@ -127,7 +166,7 @@
 })config";
 
     flatbuffer_ = FlatbufferDetachedBuffer<Configuration>(
-        JsonToFlatbuffer(json, Configuration::MiniReflectTypeTable()));
+        JsonToFlatbuffer(kJson, Configuration::MiniReflectTypeTable()));
 
     my_node_ = configuration::GetNode(&flatbuffer_.message(), my_node);
   }
@@ -143,9 +182,16 @@
 };
 
 class AbstractEventLoopTestBase
-    : public ::testing::TestWithParam<std::function<EventLoopTestFactory *()>> {
+    : public ::testing::TestWithParam<
+          std::tuple<std::function<EventLoopTestFactory *()>, ReadMethod>> {
  public:
-  AbstractEventLoopTestBase() { factory_.reset(GetParam()()); }
+  AbstractEventLoopTestBase() : factory_(std::get<0>(GetParam())()) {
+    if (read_method() == ReadMethod::PIN) {
+      factory_->PinReads();
+    }
+  }
+
+  ReadMethod read_method() const { return std::get<1>(GetParam()); }
 
   ::std::unique_ptr<EventLoop> Make(std::string_view name = "") {
     std::string name_copy(name);
@@ -182,11 +228,8 @@
     end_timer->set_name("end");
   }
 
-  // You can implement all the usual fixture class members here.
-  // To access the test parameter, call GetParam() from class
-  // TestWithParam<T>.
  private:
-  ::std::unique_ptr<EventLoopTestFactory> factory_;
+  const ::std::unique_ptr<EventLoopTestFactory> factory_;
 
   int event_loop_count_ = 0;
 };
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 0aa82a1..b33fe98 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -96,7 +96,9 @@
 
     config_.num_watchers = channel->num_watchers();
     config_.num_senders = channel->num_senders();
-    config_.num_pinners = 0;
+    // The value in the channel will default to 0 if readers are configured to
+    // copy.
+    config_.num_pinners = channel->num_readers();
     config_.queue_size =
         channel_storage_duration.count() * channel->frequency();
     config_.message_data_size = channel->max_size();
@@ -209,13 +211,35 @@
 
   ~SimpleShmFetcher() {}
 
+  // Sets this object to pin or copy data, as configured in the channel.
+  void RetrieveData() {
+    if (channel_->read_method() == ReadMethod::PIN) {
+      PinDataOnFetch();
+    } else {
+      CopyDataOnFetch();
+    }
+  }
+
   // Sets this object to copy data out of the shared memory into a private
   // buffer when fetching.
   void CopyDataOnFetch() {
+    CHECK(!pin_data());
     data_storage_.reset(static_cast<char *>(
         malloc(channel_->max_size() + kChannelDataAlignment - 1)));
   }
 
+  // Sets this object to pin data in shared memory when fetching.
+  void PinDataOnFetch() {
+    CHECK(!copy_data());
+    auto maybe_pinner = lockless_queue_.MakePinner();
+    if (!maybe_pinner) {
+      LOG(FATAL) << "Failed to create reader on "
+                 << configuration::CleanedChannelToString(channel_)
+                 << ", too many readers.";
+    }
+    pinner_ = std::move(maybe_pinner.value());
+  }
+
   // Points the next message to fetch at the queue index which will be
   // populated next.
   void PointAtNextQueueIndex() {
@@ -295,6 +319,14 @@
         &context_.size, copy_buffer);
 
     if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
+      if (pin_data()) {
+        CHECK(pinner_->PinIndex(queue_index.index()))
+            << ": Got behind while reading and the last message was modified "
+               "out from under us while we tried to pin it. Don't get so far "
+               "behind on: "
+            << configuration::CleanedChannelToString(channel_);
+      }
+
       context_.queue_index = queue_index.index();
       if (context_.remote_queue_index == 0xffffffffu) {
         context_.remote_queue_index = context_.queue_index;
@@ -347,10 +379,14 @@
     if (copy_data()) {
       return data_storage_start();
     }
+    if (pin_data()) {
+      return static_cast<const char *>(pinner_->Data());
+    }
     return nullptr;
   }
 
   bool copy_data() const { return static_cast<bool>(data_storage_); }
+  bool pin_data() const { return static_cast<bool>(pinner_); }
 
   aos::ShmEventLoop *event_loop_;
   const Channel *const channel_;
@@ -363,6 +399,9 @@
   // This being empty indicates we're not going to copy data.
   std::unique_ptr<char, decltype(&free)> data_storage_{nullptr, &free};
 
+  // This being nullopt indicates we're not going to pin messages.
+  std::optional<ipc_lib::LocklessQueue::Pinner> pinner_;
+
   Context context_;
 };
 
@@ -371,7 +410,7 @@
   explicit ShmFetcher(ShmEventLoop *event_loop, const Channel *channel)
       : RawFetcher(event_loop, channel),
         simple_shm_fetcher_(event_loop, channel) {
-    simple_shm_fetcher_.CopyDataOnFetch();
+    simple_shm_fetcher_.RetrieveData();
   }
 
   ~ShmFetcher() { context_.data = nullptr; }
@@ -480,7 +519,7 @@
         event_(this),
         simple_shm_fetcher_(event_loop, channel) {
     if (copy_data) {
-      simple_shm_fetcher_.CopyDataOnFetch();
+      simple_shm_fetcher_.RetrieveData();
     }
   }
 
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index f0680a9..cbb28f9 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -69,15 +69,25 @@
   ::aos::ShmEventLoop *primary_event_loop_;
 };
 
-INSTANTIATE_TEST_CASE_P(ShmEventLoopTest, AbstractEventLoopTest,
-                        ::testing::Values([]() {
-                          return new ShmEventLoopTestFactory();
-                        }));
+INSTANTIATE_TEST_CASE_P(ShmEventLoopCopyTest, AbstractEventLoopTest,
+                        ::testing::Values(std::make_pair(
+                            []() { return new ShmEventLoopTestFactory(); },
+                            ReadMethod::COPY)));
 
-INSTANTIATE_TEST_CASE_P(ShmEventLoopDeathTest, AbstractEventLoopDeathTest,
-                        ::testing::Values([]() {
-                          return new ShmEventLoopTestFactory();
-                        }));
+INSTANTIATE_TEST_CASE_P(ShmEventLoopCopyDeathTest, AbstractEventLoopDeathTest,
+                        ::testing::Values(std::make_pair(
+                            []() { return new ShmEventLoopTestFactory(); },
+                            ReadMethod::COPY)));
+
+INSTANTIATE_TEST_CASE_P(ShmEventLoopPinTest, AbstractEventLoopTest,
+                        ::testing::Values(std::make_pair(
+                            []() { return new ShmEventLoopTestFactory(); },
+                            ReadMethod::PIN)));
+
+INSTANTIATE_TEST_CASE_P(ShmEventLoopPinDeathTest, AbstractEventLoopDeathTest,
+                        ::testing::Values(std::make_pair(
+                            []() { return new ShmEventLoopTestFactory(); },
+                            ReadMethod::PIN)));
 
 }  // namespace
 
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index bff04b9..ae053fb 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -184,13 +184,33 @@
     }
     ++sender_count_;
   }
+
   void CountSenderDestroyed() {
     --sender_count_;
     CHECK_GE(sender_count_, 0);
   }
 
  private:
-  void CheckBufferCount() { CHECK_LT(sender_count_, number_scratch_buffers()); }
+  void CheckBufferCount() {
+    int reader_count = 0;
+    if (channel()->read_method() == ReadMethod::PIN) {
+      reader_count = watchers_.size() + fetchers_.size();
+    }
+    CHECK_LT(reader_count + sender_count_, number_scratch_buffers());
+  }
+
+  void CheckReaderCount() {
+    if (channel()->read_method() != ReadMethod::PIN) {
+      return;
+    }
+    CheckBufferCount();
+    const int reader_count = watchers_.size() + fetchers_.size();
+    if (reader_count >= channel()->num_readers()) {
+      LOG(FATAL) << "Failed to create reader on "
+                 << configuration::CleanedChannelToString(channel())
+                 << ", too many readers.";
+    }
+  }
 
   const Channel *const channel_;
   EventScheduler *const scheduler_;
@@ -719,6 +739,7 @@
 }
 
 void SimulatedChannel::MakeRawWatcher(SimulatedWatcher *watcher) {
+  CheckReaderCount();
   watcher->SetSimulatedChannel(this);
   watchers_.emplace_back(watcher);
 }
@@ -730,6 +751,7 @@
 
 ::std::unique_ptr<RawFetcher> SimulatedChannel::MakeRawFetcher(
     EventLoop *event_loop) {
+  CheckReaderCount();
   ::std::unique_ptr<SimulatedFetcher> fetcher(
       new SimulatedFetcher(event_loop, this));
   fetchers_.push_back(fetcher.get());
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 513dc1b..78c0d44 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -51,15 +51,31 @@
   std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
 };
 
-INSTANTIATE_TEST_CASE_P(SimulatedEventLoopDeathTest, AbstractEventLoopDeathTest,
-                        ::testing::Values([]() {
-                          return new SimulatedEventLoopTestFactory();
-                        }));
+INSTANTIATE_TEST_CASE_P(SimulatedEventLoopCopyTest, AbstractEventLoopTest,
+                        ::testing::Values(std::make_tuple(
+                            []() {
+                              return new SimulatedEventLoopTestFactory();
+                            },
+                            ReadMethod::COPY)));
 
-INSTANTIATE_TEST_CASE_P(SimulatedEventLoopTest, AbstractEventLoopTest,
-                        ::testing::Values([]() {
-                          return new SimulatedEventLoopTestFactory();
-                        }));
+INSTANTIATE_TEST_CASE_P(
+    SimulatedEventLoopCopyDeathTest, AbstractEventLoopDeathTest,
+    ::testing::Values(
+        std::make_tuple([]() { return new SimulatedEventLoopTestFactory(); },
+                        ReadMethod::COPY)));
+
+INSTANTIATE_TEST_CASE_P(SimulatedEventLoopPinTest, AbstractEventLoopTest,
+                        ::testing::Values(std::make_tuple(
+                            []() {
+                              return new SimulatedEventLoopTestFactory();
+                            },
+                            ReadMethod::PIN)));
+
+INSTANTIATE_TEST_CASE_P(
+    SimulatedEventLoopPinDeathTest, AbstractEventLoopDeathTest,
+    ::testing::Values(
+        std::make_tuple([]() { return new SimulatedEventLoopTestFactory(); },
+                        ReadMethod::PIN)));
 
 // Test that creating an event and running the scheduler runs the event.
 TEST(EventSchedulerTest, ScheduleEvent) {