Add the ability to pin shared memory for reads
This can increase performance for some specific use cases with large
messages.
Change-Id: I38deaf3ce85a70c0ac11510757d193fd39ad29bb
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 8d0d0e2..606849a 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -560,6 +560,89 @@
EXPECT_EQ(200, fetcher.get()->value());
}
+// Verify that a fetcher still holds its data, even after falling behind: it fetches value 1 once and never fetches again.
+TEST_P(AbstractEventLoopTest, FetcherBehindData) {
+ auto send_loop = Make();
+ auto fetch_loop = Make();
+ auto sender = send_loop->MakeSender<TestMessage>("/test");
+ Fetcher<TestMessage> fetcher = fetch_loop->MakeFetcher<TestMessage>("/test");
+ {  // Send the initial message, carrying value 1.
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(1);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
+ }
+ ASSERT_TRUE(fetcher.Fetch());  // The only Fetch() call in this test.
+ EXPECT_EQ(1, fetcher.get()->value());
+ for (int i = 0; i < 300; ++i) {  // Send values 2..301 with no Fetch(); presumably more than the queue holds -- confirm.
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(i + 2);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
+ }
+ EXPECT_EQ(1, fetcher.get()->value());  // get() must still expose the message fetched before the 300 sends.
+}
+
+// Try a bunch of orderings of operations with fetchers and senders: ten fetchers all fetch value 1, then in each
+// round `save` one fewer fetcher fetches. Verify that all the fetchers have the correct data at each step.
+TEST_P(AbstractEventLoopTest, FetcherPermutations) {
+ for (int max_save = 0; max_save < 5; ++max_save) {  // max_save = number of rounds run below (0..4).
+ SCOPED_TRACE("max_save=" + std::to_string(max_save));
+
+ auto send_loop = Make();
+ auto fetch_loop = Make();
+ auto sender = send_loop->MakeSender<TestMessage>("/test");
+ const auto send_message = [&sender](int i) {  // Sends one TestMessage whose value is i.
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(i);
+ ASSERT_TRUE(msg.Send(builder.Finish()));  // NOTE(review): on failure this only returns from the lambda, not the test -- confirm acceptable.
+ };
+ std::vector<Fetcher<TestMessage>> fetchers;
+ for (int i = 0; i < 10; ++i) {  // Ten fetchers, all on fetch_loop and the same channel.
+ fetchers.emplace_back(fetch_loop->MakeFetcher<TestMessage>("/test"));
+ }
+ send_message(1);
+ for (auto &fetcher : fetchers) {  // Every fetcher starts out holding value 1.
+ ASSERT_TRUE(fetcher.Fetch());
+ EXPECT_EQ(1, fetcher.get()->value());
+ }
+
+ for (int save = 1; save <= max_save; ++save) {  // Round `save` sends value 100 + save.
+ SCOPED_TRACE("save=" + std::to_string(save));
+ send_message(100 + save);
+ for (size_t i = 0; i < fetchers.size() - save; ++i) {  // Fetchers [0, size - save) fetch the new message.
+ SCOPED_TRACE("fetcher=" + std::to_string(i));
+ ASSERT_TRUE(fetchers[i].Fetch());
+ EXPECT_EQ(100 + save, fetchers[i].get()->value());
+ }
+ for (size_t i = fetchers.size() - save; i < fetchers.size() - 1; ++i) {  // These stopped in an earlier round and are not fetched again.
+ SCOPED_TRACE("fetcher=" + std::to_string(i));
+ EXPECT_EQ(100 + (fetchers.size() - 1 - i), fetchers[i].get()->value());  // Fetcher i last fetched in round (size - 1 - i).
+ }
+ EXPECT_EQ(1, fetchers.back().get()->value());  // The last fetcher is never fetched after the initial message.
+ }
+
+ for (int i = 0; i < 300; ++i) {  // Send values 200..499; no fetcher calls Fetch() from here on.
+ send_message(200 + i);
+ }
+
+ for (size_t i = 0; i < fetchers.size() - max_save; ++i) {  // Fetchers that fetched in every round.
+ SCOPED_TRACE("fetcher=" + std::to_string(i));
+ if (max_save > 0) {
+ EXPECT_EQ(100 + max_save, fetchers[i].get()->value());  // Still the value from the final round.
+ } else {
+ EXPECT_EQ(1, fetchers[i].get()->value());  // No rounds ran, so still the initial message.
+ }
+ }
+ for (size_t i = fetchers.size() - max_save; i < fetchers.size() - 1; ++i) {  // Fetchers that dropped out during the rounds.
+ SCOPED_TRACE("fetcher=" + std::to_string(i));
+ EXPECT_EQ(100 + (fetchers.size() - 1 - i), fetchers[i].get()->value());
+ }
+ EXPECT_EQ(1, fetchers.back().get()->value());  // Unchanged by the 300 extra sends.
+ }
+}
+
// Verify that making a fetcher and watcher for "/test" succeeds.
TEST_P(AbstractEventLoopTest, FetcherAndWatcher) {
auto loop = Make();
@@ -642,7 +725,80 @@
}
EXPECT_DEATH({ loop->MakeSender<TestMessage>("/test"); },
"Failed to create sender on \\{ \"name\": \"/test\", \"type\": "
- "\"aos.TestMessage\" \\}, too many senders.");
+ "\"aos.TestMessage\"[^}]*\\ }, too many senders.");
+}
+
+// Verify that creating too many fetchers fails: 10 on one loop succeed, the 11th must die.
+TEST_P(AbstractEventLoopDeathTest, TooManyFetchers) {
+ if (read_method() != ReadMethod::PIN) {
+ // Other read methods don't limit the number of readers, so just skip this.
+ return;
+ }
+
+ auto loop = Make();
+ std::vector<aos::Fetcher<TestMessage>> fetchers;  // Keeps the fetchers alive until the death check.
+ for (int i = 0; i < 10; ++i) {  // These 10 are expected to be created without dying.
+ fetchers.emplace_back(loop->MakeFetcher<TestMessage>("/test"));
+ }
+ EXPECT_DEATH({ loop->MakeFetcher<TestMessage>("/test"); },  // The 11th fetcher on the channel.
+ "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+ "\"aos.TestMessage\"[^}]*\\ }, too many readers.");  // [^}]* tolerates extra text before the brace. NOTE(review): "\\ }" escapes the space, not the brace -- confirm intended.
+}
+
+// Verify that creating too many fetchers, split between two event loops, fails: 5 + 5 succeed, the 11th must die.
+TEST_P(AbstractEventLoopDeathTest, TooManyFetchersTwoLoops) {
+ if (read_method() != ReadMethod::PIN) {
+ // Other read methods don't limit the number of readers, so just skip this.
+ return;
+ }
+
+ auto loop = Make();
+ auto loop2 = Make();
+ std::vector<aos::Fetcher<TestMessage>> fetchers;  // Holds the fetchers from both loops.
+ for (int i = 0; i < 5; ++i) {  // Each iteration adds one fetcher per loop, 10 in total.
+ fetchers.emplace_back(loop->MakeFetcher<TestMessage>("/test"));
+ fetchers.emplace_back(loop2->MakeFetcher<TestMessage>("/test"));
+ }
+ EXPECT_DEATH({ loop->MakeFetcher<TestMessage>("/test"); },  // The 11th fetcher on the channel, made on the first loop.
+ "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+ "\"aos.TestMessage\"[^}]*\\ }, too many readers.");  // [^}]* tolerates extra text before the brace. NOTE(review): "\\ }" escapes the space, not the brace -- confirm intended.
+}
+
+// Verify that creating too many watchers fails: 10 watchers, one per event loop, succeed and the 11th must die.
+TEST_P(AbstractEventLoopDeathTest, TooManyWatchers) {
+ if (read_method() != ReadMethod::PIN) {
+ // Other read methods don't limit the number of readers, so just skip this.
+ return;
+ }
+
+ std::vector<std::unique_ptr<EventLoop>> loops;  // Keeps every loop (and its watcher) alive until the death check.
+ for (int i = 0; i < 10; ++i) {  // A fresh event loop per watcher, each with a no-op callback.
+ loops.emplace_back(Make());
+ loops.back()->MakeWatcher("/test", [](const TestMessage &) {});
+ }
+ EXPECT_DEATH({ Make()->MakeWatcher("/test", [](const TestMessage &) {}); },  // The 11th watcher, on an 11th loop.
+ "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+ "\"aos.TestMessage\"[^}]*\\ }, too many readers.");  // [^}]* tolerates extra text before the brace. NOTE(review): "\\ }" escapes the space, not the brace -- confirm intended.
+}
+
+// Verify that creating too many watchers and fetchers combined fails: 5 fetchers + 5 watchers succeed, the next fetcher must die.
+TEST_P(AbstractEventLoopDeathTest, TooManyWatchersAndFetchers) {
+ if (read_method() != ReadMethod::PIN) {
+ // Other read methods don't limit the number of readers, so just skip this.
+ return;
+ }
+
+ auto loop = Make();
+ std::vector<aos::Fetcher<TestMessage>> fetchers;  // All fetchers live on `loop`.
+ std::vector<std::unique_ptr<EventLoop>> loops;  // One extra event loop per watcher.
+ for (int i = 0; i < 5; ++i) {  // Each iteration adds one fetcher and one watcher, 10 readers in total.
+ fetchers.emplace_back(loop->MakeFetcher<TestMessage>("/test"));
+ loops.emplace_back(Make());
+ loops.back()->MakeWatcher("/test", [](const TestMessage &) {});
+ }
+ EXPECT_DEATH({ loop->MakeFetcher<TestMessage>("/test"); },  // The 11th reader on the channel.
+ "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+ "\"aos.TestMessage\"[^}]*\\ }, too many readers.");  // [^}]* tolerates extra text before the brace. NOTE(review): "\\ }" escapes the space, not the brace -- confirm intended.
}
// Verify that we can't create a sender inside OnRun.