Add the ability to pin shared memory for reads

This can increase performance for some specific use cases with large
messages.

Change-Id: I38deaf3ce85a70c0ac11510757d193fd39ad29bb
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 8d0d0e2..606849a 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -560,6 +560,89 @@
   EXPECT_EQ(200, fetcher.get()->value());
 }
 
+// Verify that a fetcher still holds its data, even after falling behind.
+TEST_P(AbstractEventLoopTest, FetcherBehindData) {
+  auto send_loop = Make();
+  auto fetch_loop = Make();
+  auto sender = send_loop->MakeSender<TestMessage>("/test");
+  Fetcher<TestMessage> fetcher = fetch_loop->MakeFetcher<TestMessage>("/test");
+  {
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(1);
+    ASSERT_TRUE(msg.Send(builder.Finish()));
+  }
+  ASSERT_TRUE(fetcher.Fetch());
+  EXPECT_EQ(1, fetcher.get()->value());
+  for (int i = 0; i < 300; ++i) {
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(i + 2);
+    ASSERT_TRUE(msg.Send(builder.Finish()));
+  }
+  EXPECT_EQ(1, fetcher.get()->value());
+}
+
+// Try a bunch of orderings of operations with fetchers and senders. Verify that
+// all the fetchers have the correct data at each step.
+TEST_P(AbstractEventLoopTest, FetcherPermutations) {
+  for (int max_save = 0; max_save < 5; ++max_save) {
+    SCOPED_TRACE("max_save=" + std::to_string(max_save));
+
+    auto send_loop = Make();
+    auto fetch_loop = Make();
+    auto sender = send_loop->MakeSender<TestMessage>("/test");
+    const auto send_message = [&sender](int i) {
+      aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+      builder.add_value(i);
+      ASSERT_TRUE(msg.Send(builder.Finish()));
+    };
+    std::vector<Fetcher<TestMessage>> fetchers;
+    for (int i = 0; i < 10; ++i) {
+      fetchers.emplace_back(fetch_loop->MakeFetcher<TestMessage>("/test"));
+    }
+    send_message(1);
+    for (auto &fetcher : fetchers) {
+      ASSERT_TRUE(fetcher.Fetch());
+      EXPECT_EQ(1, fetcher.get()->value());
+    }
+
+    for (int save = 1; save <= max_save; ++save) {
+      SCOPED_TRACE("save=" + std::to_string(save));
+      send_message(100 + save);
+      for (size_t i = 0; i < fetchers.size() - save; ++i) {
+        SCOPED_TRACE("fetcher=" + std::to_string(i));
+        ASSERT_TRUE(fetchers[i].Fetch());
+        EXPECT_EQ(100 + save, fetchers[i].get()->value());
+      }
+      for (size_t i = fetchers.size() - save; i < fetchers.size() - 1; ++i) {
+        SCOPED_TRACE("fetcher=" + std::to_string(i));
+        EXPECT_EQ(100 + (fetchers.size() - 1 - i), fetchers[i].get()->value());
+      }
+      EXPECT_EQ(1, fetchers.back().get()->value());
+    }
+
+    for (int i = 0; i < 300; ++i) {
+      send_message(200 + i);
+    }
+
+    for (size_t i = 0; i < fetchers.size() - max_save; ++i) {
+      SCOPED_TRACE("fetcher=" + std::to_string(i));
+      if (max_save > 0) {
+        EXPECT_EQ(100 + max_save, fetchers[i].get()->value());
+      } else {
+        EXPECT_EQ(1, fetchers[i].get()->value());
+      }
+    }
+    for (size_t i = fetchers.size() - max_save; i < fetchers.size() - 1; ++i) {
+      SCOPED_TRACE("fetcher=" + std::to_string(i));
+      EXPECT_EQ(100 + (fetchers.size() - 1 - i), fetchers[i].get()->value());
+    }
+    EXPECT_EQ(1, fetchers.back().get()->value());
+  }
+}
+
 // Verify that making a fetcher and watcher for "/test" succeeds.
 TEST_P(AbstractEventLoopTest, FetcherAndWatcher) {
   auto loop = Make();
@@ -642,7 +725,80 @@
   }
   EXPECT_DEATH({ loop->MakeSender<TestMessage>("/test"); },
                "Failed to create sender on \\{ \"name\": \"/test\", \"type\": "
-               "\"aos.TestMessage\" \\}, too many senders.");
+               "\"aos.TestMessage\"[^}]*\\ }, too many senders.");
+}
+
+// Verify that creating too many fetchers fails.
+TEST_P(AbstractEventLoopDeathTest, TooManyFetchers) {
+  if (read_method() != ReadMethod::PIN) {
+    // Other read methods don't limit the number of readers, so just skip this.
+    return;
+  }
+
+  auto loop = Make();
+  std::vector<aos::Fetcher<TestMessage>> fetchers;
+  for (int i = 0; i < 10; ++i) {
+    fetchers.emplace_back(loop->MakeFetcher<TestMessage>("/test"));
+  }
+  EXPECT_DEATH({ loop->MakeFetcher<TestMessage>("/test"); },
+               "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+               "\"aos.TestMessage\"[^}]*\\ }, too many readers.");
+}
+
+// Verify that creating too many fetchers, split between two event loops, fails.
+TEST_P(AbstractEventLoopDeathTest, TooManyFetchersTwoLoops) {
+  if (read_method() != ReadMethod::PIN) {
+    // Other read methods don't limit the number of readers, so just skip this.
+    return;
+  }
+
+  auto loop = Make();
+  auto loop2 = Make();
+  std::vector<aos::Fetcher<TestMessage>> fetchers;
+  for (int i = 0; i < 5; ++i) {
+    fetchers.emplace_back(loop->MakeFetcher<TestMessage>("/test"));
+    fetchers.emplace_back(loop2->MakeFetcher<TestMessage>("/test"));
+  }
+  EXPECT_DEATH({ loop->MakeFetcher<TestMessage>("/test"); },
+               "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+               "\"aos.TestMessage\"[^}]*\\ }, too many readers.");
+}
+
+// Verify that creating too many watchers fails.
+TEST_P(AbstractEventLoopDeathTest, TooManyWatchers) {
+  if (read_method() != ReadMethod::PIN) {
+    // Other read methods don't limit the number of readers, so just skip this.
+    return;
+  }
+
+  std::vector<std::unique_ptr<EventLoop>> loops;
+  for (int i = 0; i < 10; ++i) {
+    loops.emplace_back(Make());
+    loops.back()->MakeWatcher("/test", [](const TestMessage &) {});
+  }
+  EXPECT_DEATH({ Make()->MakeWatcher("/test", [](const TestMessage &) {}); },
+               "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+               "\"aos.TestMessage\"[^}]*\\ }, too many readers.");
+}
+
+// Verify that creating too many watchers and fetchers combined fails.
+TEST_P(AbstractEventLoopDeathTest, TooManyWatchersAndFetchers) {
+  if (read_method() != ReadMethod::PIN) {
+    // Other read methods don't limit the number of readers, so just skip this.
+    return;
+  }
+
+  auto loop = Make();
+  std::vector<aos::Fetcher<TestMessage>> fetchers;
+  std::vector<std::unique_ptr<EventLoop>> loops;
+  for (int i = 0; i < 5; ++i) {
+    fetchers.emplace_back(loop->MakeFetcher<TestMessage>("/test"));
+    loops.emplace_back(Make());
+    loops.back()->MakeWatcher("/test", [](const TestMessage &) {});
+  }
+  EXPECT_DEATH({ loop->MakeFetcher<TestMessage>("/test"); },
+               "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+               "\"aos.TestMessage\"[^}]*\\ }, too many readers.");
 }
 
 // Verify that we can't create a sender inside OnRun.