Add the ability to pin shared memory for reads
This can increase performance for some specific use cases with large
messages.
Change-Id: I38deaf3ce85a70c0ac11510757d193fd39ad29bb
diff --git a/aos/configuration.cc b/aos/configuration.cc
index 927e9d3..834ab5b 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -299,6 +299,11 @@
continue;
}
+ CHECK_EQ(c->read_method() == ReadMethod::PIN, c->num_readers() != 0)
+ << ": num_readers may be set if and only if read_method is PIN,"
+ " if you want 0 readers do not set PIN: "
+ << CleanedChannelToString(c);
+
// Attempt to insert the channel.
auto result = channels.insert(CopyFlatBuffer(c));
if (!result.second) {
diff --git a/aos/configuration.fbs b/aos/configuration.fbs
index 31d89e7..9a24c8a 100644
--- a/aos/configuration.fbs
+++ b/aos/configuration.fbs
@@ -42,6 +42,13 @@
time_to_live:uint = 0;
}
+enum ReadMethod : ubyte {
+ // Copy all the data out of shared memory into a local buffer for each reader.
+ COPY,
+ // Pin the data in shared memory and read directly from there.
+ PIN,
+}
+
// Table representing a channel. Channels are where data is published and
// subscribed from. The tuple of name, type is the identifying information.
table Channel {
@@ -78,6 +85,14 @@
// node responsible for logging it. Empty implies the node this connection
// is connecting to (i.e. name).
logger_nodes:[string];
+
+ // The way messages are read from shared memory for this channel.
+ read_method:ReadMethod = COPY;
+
+ // Sets the maximum number of senders on a channel.
+ //
+ // Currently, this must be set if and only if read_method is PIN.
+ num_readers:int;
}
// Table to support renaming channel names.
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 39a6a54..0212eaa 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -243,7 +243,7 @@
cc_test(
name = "shm_event_loop_test",
srcs = ["shm_event_loop_test.cc"],
- shard_count = 5,
+ shard_count = 16,
deps = [
":event_loop_param_test",
":shm_event_loop",
@@ -267,6 +267,7 @@
name = "simulated_event_loop_test",
srcs = ["simulated_event_loop_test.cc"],
data = ["multinode_pingpong_config.json"],
+ shard_count = 4,
deps = [
":event_loop_param_test",
":ping_lib",
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 8d0d0e2..606849a 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -560,6 +560,89 @@
EXPECT_EQ(200, fetcher.get()->value());
}
+// Verify that a fetcher still holds its data, even after falling behind.
+TEST_P(AbstractEventLoopTest, FetcherBehindData) {
+ auto send_loop = Make();
+ auto fetch_loop = Make();
+ auto sender = send_loop->MakeSender<TestMessage>("/test");
+ Fetcher<TestMessage> fetcher = fetch_loop->MakeFetcher<TestMessage>("/test");
+ {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(1);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
+ }
+ ASSERT_TRUE(fetcher.Fetch());
+ EXPECT_EQ(1, fetcher.get()->value());
+ for (int i = 0; i < 300; ++i) {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(i + 2);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
+ }
+ EXPECT_EQ(1, fetcher.get()->value());
+}
+
+// Try a bunch of orderings of operations with fetchers and senders. Verify that
+// all the fetchers have the correct data at each step.
+TEST_P(AbstractEventLoopTest, FetcherPermutations) {
+ for (int max_save = 0; max_save < 5; ++max_save) {
+ SCOPED_TRACE("max_save=" + std::to_string(max_save));
+
+ auto send_loop = Make();
+ auto fetch_loop = Make();
+ auto sender = send_loop->MakeSender<TestMessage>("/test");
+ const auto send_message = [&sender](int i) {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(i);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
+ };
+ std::vector<Fetcher<TestMessage>> fetchers;
+ for (int i = 0; i < 10; ++i) {
+ fetchers.emplace_back(fetch_loop->MakeFetcher<TestMessage>("/test"));
+ }
+ send_message(1);
+ for (auto &fetcher : fetchers) {
+ ASSERT_TRUE(fetcher.Fetch());
+ EXPECT_EQ(1, fetcher.get()->value());
+ }
+
+ for (int save = 1; save <= max_save; ++save) {
+ SCOPED_TRACE("save=" + std::to_string(save));
+ send_message(100 + save);
+ for (size_t i = 0; i < fetchers.size() - save; ++i) {
+ SCOPED_TRACE("fetcher=" + std::to_string(i));
+ ASSERT_TRUE(fetchers[i].Fetch());
+ EXPECT_EQ(100 + save, fetchers[i].get()->value());
+ }
+ for (size_t i = fetchers.size() - save; i < fetchers.size() - 1; ++i) {
+ SCOPED_TRACE("fetcher=" + std::to_string(i));
+ EXPECT_EQ(100 + (fetchers.size() - 1 - i), fetchers[i].get()->value());
+ }
+ EXPECT_EQ(1, fetchers.back().get()->value());
+ }
+
+ for (int i = 0; i < 300; ++i) {
+ send_message(200 + i);
+ }
+
+ for (size_t i = 0; i < fetchers.size() - max_save; ++i) {
+ SCOPED_TRACE("fetcher=" + std::to_string(i));
+ if (max_save > 0) {
+ EXPECT_EQ(100 + max_save, fetchers[i].get()->value());
+ } else {
+ EXPECT_EQ(1, fetchers[i].get()->value());
+ }
+ }
+ for (size_t i = fetchers.size() - max_save; i < fetchers.size() - 1; ++i) {
+ SCOPED_TRACE("fetcher=" + std::to_string(i));
+ EXPECT_EQ(100 + (fetchers.size() - 1 - i), fetchers[i].get()->value());
+ }
+ EXPECT_EQ(1, fetchers.back().get()->value());
+ }
+}
+
// Verify that making a fetcher and watcher for "/test" succeeds.
TEST_P(AbstractEventLoopTest, FetcherAndWatcher) {
auto loop = Make();
@@ -642,7 +725,80 @@
}
EXPECT_DEATH({ loop->MakeSender<TestMessage>("/test"); },
"Failed to create sender on \\{ \"name\": \"/test\", \"type\": "
- "\"aos.TestMessage\" \\}, too many senders.");
+ "\"aos.TestMessage\"[^}]*\\ }, too many senders.");
+}
+
+// Verify that creating too many fetchers fails.
+TEST_P(AbstractEventLoopDeathTest, TooManyFetchers) {
+ if (read_method() != ReadMethod::PIN) {
+ // Other read methods don't limit the number of readers, so just skip this.
+ return;
+ }
+
+ auto loop = Make();
+ std::vector<aos::Fetcher<TestMessage>> fetchers;
+ for (int i = 0; i < 10; ++i) {
+ fetchers.emplace_back(loop->MakeFetcher<TestMessage>("/test"));
+ }
+ EXPECT_DEATH({ loop->MakeFetcher<TestMessage>("/test"); },
+ "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+ "\"aos.TestMessage\"[^}]*\\ }, too many readers.");
+}
+
+// Verify that creating too many fetchers, split between two event loops, fails.
+TEST_P(AbstractEventLoopDeathTest, TooManyFetchersTwoLoops) {
+ if (read_method() != ReadMethod::PIN) {
+ // Other read methods don't limit the number of readers, so just skip this.
+ return;
+ }
+
+ auto loop = Make();
+ auto loop2 = Make();
+ std::vector<aos::Fetcher<TestMessage>> fetchers;
+ for (int i = 0; i < 5; ++i) {
+ fetchers.emplace_back(loop->MakeFetcher<TestMessage>("/test"));
+ fetchers.emplace_back(loop2->MakeFetcher<TestMessage>("/test"));
+ }
+ EXPECT_DEATH({ loop->MakeFetcher<TestMessage>("/test"); },
+ "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+ "\"aos.TestMessage\"[^}]*\\ }, too many readers.");
+}
+
+// Verify that creating too many watchers fails.
+TEST_P(AbstractEventLoopDeathTest, TooManyWatchers) {
+ if (read_method() != ReadMethod::PIN) {
+ // Other read methods don't limit the number of readers, so just skip this.
+ return;
+ }
+
+ std::vector<std::unique_ptr<EventLoop>> loops;
+ for (int i = 0; i < 10; ++i) {
+ loops.emplace_back(Make());
+ loops.back()->MakeWatcher("/test", [](const TestMessage &) {});
+ }
+ EXPECT_DEATH({ Make()->MakeWatcher("/test", [](const TestMessage &) {}); },
+ "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+ "\"aos.TestMessage\"[^}]*\\ }, too many readers.");
+}
+
+// Verify that creating too many watchers and fetchers combined fails.
+TEST_P(AbstractEventLoopDeathTest, TooManyWatchersAndFetchers) {
+ if (read_method() != ReadMethod::PIN) {
+ // Other read methods don't limit the number of readers, so just skip this.
+ return;
+ }
+
+ auto loop = Make();
+ std::vector<aos::Fetcher<TestMessage>> fetchers;
+ std::vector<std::unique_ptr<EventLoop>> loops;
+ for (int i = 0; i < 5; ++i) {
+ fetchers.emplace_back(loop->MakeFetcher<TestMessage>("/test"));
+ loops.emplace_back(Make());
+ loops.back()->MakeWatcher("/test", [](const TestMessage &) {});
+ }
+ EXPECT_DEATH({ loop->MakeFetcher<TestMessage>("/test"); },
+ "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+ "\"aos.TestMessage\"[^}]*\\ }, too many readers.");
}
// Verify that we can't create a sender inside OnRun.
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index cbd5cd1..f158e02 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -15,31 +15,30 @@
class EventLoopTestFactory {
public:
EventLoopTestFactory()
- : flatbuffer_(JsonToFlatbuffer("{\n"
- " \"channels\": [ \n"
- " {\n"
- " \"name\": \"/aos\",\n"
- " \"type\": \"aos.logging.LogMessageFbs\"\n"
- " },\n"
- " {\n"
- " \"name\": \"/aos\",\n"
- " \"type\": \"aos.timing.Report\"\n"
- " },\n"
- " {\n"
- " \"name\": \"/test\",\n"
- " \"type\": \"aos.TestMessage\"\n"
- " },\n"
- " {\n"
- " \"name\": \"/test1\",\n"
- " \"type\": \"aos.TestMessage\"\n"
- " },\n"
- " {\n"
- " \"name\": \"/test2\",\n"
- " \"type\": \"aos.TestMessage\"\n"
- " }\n"
- " ]\n"
- "}\n",
- Configuration::MiniReflectTypeTable())) {}
+ : flatbuffer_(JsonToFlatbuffer<Configuration>(R"config({
+ "channels": [
+ {
+ "name": "/aos",
+ "type": "aos.logging.LogMessageFbs"
+ },
+ {
+ "name": "/aos",
+ "type": "aos.timing.Report"
+ },
+ {
+ "name": "/test",
+ "type": "aos.TestMessage"
+ },
+ {
+ "name": "/test1",
+ "type": "aos.TestMessage"
+ },
+ {
+ "name": "/test2",
+ "type": "aos.TestMessage"
+ }
+ ]
+})config")) {}
virtual ~EventLoopTestFactory() {}
@@ -58,8 +57,48 @@
// Advances time by sleeping. Can't be called from inside a loop.
virtual void SleepFor(::std::chrono::nanoseconds duration) = 0;
+ void PinReads() {
+ static const std::string kJson = R"config({
+ "channels": [
+ {
+ "name": "/aos",
+ "type": "aos.logging.LogMessageFbs",
+ "read_method": "PIN",
+ "num_readers": 10
+ },
+ {
+ "name": "/aos",
+ "type": "aos.timing.Report",
+ "read_method": "PIN",
+ "num_readers": 10
+ },
+ {
+ "name": "/test",
+ "type": "aos.TestMessage",
+ "read_method": "PIN",
+ "num_readers": 10
+ },
+ {
+ "name": "/test1",
+ "type": "aos.TestMessage",
+ "read_method": "PIN",
+ "num_readers": 10
+ },
+ {
+ "name": "/test2",
+ "type": "aos.TestMessage",
+ "read_method": "PIN",
+ "num_readers": 10
+ }
+ ]
+})config";
+
+ flatbuffer_ = FlatbufferDetachedBuffer<Configuration>(
+ JsonToFlatbuffer(kJson, Configuration::MiniReflectTypeTable()));
+ }
+
void EnableNodes(std::string_view my_node) {
- std::string json = R"config({
+ static const std::string kJson = R"config({
"channels": [
{
"name": "/aos/me",
@@ -127,7 +166,7 @@
})config";
flatbuffer_ = FlatbufferDetachedBuffer<Configuration>(
- JsonToFlatbuffer(json, Configuration::MiniReflectTypeTable()));
+ JsonToFlatbuffer(kJson, Configuration::MiniReflectTypeTable()));
my_node_ = configuration::GetNode(&flatbuffer_.message(), my_node);
}
@@ -143,9 +182,16 @@
};
class AbstractEventLoopTestBase
- : public ::testing::TestWithParam<std::function<EventLoopTestFactory *()>> {
+ : public ::testing::TestWithParam<
+ std::tuple<std::function<EventLoopTestFactory *()>, ReadMethod>> {
public:
- AbstractEventLoopTestBase() { factory_.reset(GetParam()()); }
+ AbstractEventLoopTestBase() : factory_(std::get<0>(GetParam())()) {
+ if (read_method() == ReadMethod::PIN) {
+ factory_->PinReads();
+ }
+ }
+
+ ReadMethod read_method() const { return std::get<1>(GetParam()); }
::std::unique_ptr<EventLoop> Make(std::string_view name = "") {
std::string name_copy(name);
@@ -182,11 +228,8 @@
end_timer->set_name("end");
}
- // You can implement all the usual fixture class members here.
- // To access the test parameter, call GetParam() from class
- // TestWithParam<T>.
private:
- ::std::unique_ptr<EventLoopTestFactory> factory_;
+ const ::std::unique_ptr<EventLoopTestFactory> factory_;
int event_loop_count_ = 0;
};
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 0aa82a1..b33fe98 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -96,7 +96,9 @@
config_.num_watchers = channel->num_watchers();
config_.num_senders = channel->num_senders();
- config_.num_pinners = 0;
+ // The value in the channel will default to 0 if readers are configured to
+ // copy.
+ config_.num_pinners = channel->num_readers();
config_.queue_size =
channel_storage_duration.count() * channel->frequency();
config_.message_data_size = channel->max_size();
@@ -209,13 +211,35 @@
~SimpleShmFetcher() {}
+ // Sets this object to pin or copy data, as configured in the channel.
+ void RetrieveData() {
+ if (channel_->read_method() == ReadMethod::PIN) {
+ PinDataOnFetch();
+ } else {
+ CopyDataOnFetch();
+ }
+ }
+
// Sets this object to copy data out of the shared memory into a private
// buffer when fetching.
void CopyDataOnFetch() {
+ CHECK(!pin_data());
data_storage_.reset(static_cast<char *>(
malloc(channel_->max_size() + kChannelDataAlignment - 1)));
}
+ // Sets this object to pin data in shared memory when fetching.
+ void PinDataOnFetch() {
+ CHECK(!copy_data());
+ auto maybe_pinner = lockless_queue_.MakePinner();
+ if (!maybe_pinner) {
+ LOG(FATAL) << "Failed to create reader on "
+ << configuration::CleanedChannelToString(channel_)
+ << ", too many readers.";
+ }
+ pinner_ = std::move(maybe_pinner.value());
+ }
+
// Points the next message to fetch at the queue index which will be
// populated next.
void PointAtNextQueueIndex() {
@@ -295,6 +319,14 @@
&context_.size, copy_buffer);
if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
+ if (pin_data()) {
+ CHECK(pinner_->PinIndex(queue_index.index()))
+ << ": Got behind while reading and the last message was modified "
+ "out from under us while we tried to pin it. Don't get so far "
+ "behind on: "
+ << configuration::CleanedChannelToString(channel_);
+ }
+
context_.queue_index = queue_index.index();
if (context_.remote_queue_index == 0xffffffffu) {
context_.remote_queue_index = context_.queue_index;
@@ -347,10 +379,14 @@
if (copy_data()) {
return data_storage_start();
}
+ if (pin_data()) {
+ return static_cast<const char *>(pinner_->Data());
+ }
return nullptr;
}
bool copy_data() const { return static_cast<bool>(data_storage_); }
+ bool pin_data() const { return static_cast<bool>(pinner_); }
aos::ShmEventLoop *event_loop_;
const Channel *const channel_;
@@ -363,6 +399,9 @@
// This being empty indicates we're not going to copy data.
std::unique_ptr<char, decltype(&free)> data_storage_{nullptr, &free};
+ // This being nullopt indicates we're not going to pin messages.
+ std::optional<ipc_lib::LocklessQueue::Pinner> pinner_;
+
Context context_;
};
@@ -371,7 +410,7 @@
explicit ShmFetcher(ShmEventLoop *event_loop, const Channel *channel)
: RawFetcher(event_loop, channel),
simple_shm_fetcher_(event_loop, channel) {
- simple_shm_fetcher_.CopyDataOnFetch();
+ simple_shm_fetcher_.RetrieveData();
}
~ShmFetcher() { context_.data = nullptr; }
@@ -480,7 +519,7 @@
event_(this),
simple_shm_fetcher_(event_loop, channel) {
if (copy_data) {
- simple_shm_fetcher_.CopyDataOnFetch();
+ simple_shm_fetcher_.RetrieveData();
}
}
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index f0680a9..cbb28f9 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -69,15 +69,25 @@
::aos::ShmEventLoop *primary_event_loop_;
};
-INSTANTIATE_TEST_CASE_P(ShmEventLoopTest, AbstractEventLoopTest,
- ::testing::Values([]() {
- return new ShmEventLoopTestFactory();
- }));
+INSTANTIATE_TEST_CASE_P(ShmEventLoopCopyTest, AbstractEventLoopTest,
+ ::testing::Values(std::make_pair(
+ []() { return new ShmEventLoopTestFactory(); },
+ ReadMethod::COPY)));
-INSTANTIATE_TEST_CASE_P(ShmEventLoopDeathTest, AbstractEventLoopDeathTest,
- ::testing::Values([]() {
- return new ShmEventLoopTestFactory();
- }));
+INSTANTIATE_TEST_CASE_P(ShmEventLoopCopyDeathTest, AbstractEventLoopDeathTest,
+ ::testing::Values(std::make_pair(
+ []() { return new ShmEventLoopTestFactory(); },
+ ReadMethod::COPY)));
+
+INSTANTIATE_TEST_CASE_P(ShmEventLoopPinTest, AbstractEventLoopTest,
+ ::testing::Values(std::make_pair(
+ []() { return new ShmEventLoopTestFactory(); },
+ ReadMethod::PIN)));
+
+INSTANTIATE_TEST_CASE_P(ShmEventLoopPinDeathTest, AbstractEventLoopDeathTest,
+ ::testing::Values(std::make_pair(
+ []() { return new ShmEventLoopTestFactory(); },
+ ReadMethod::PIN)));
} // namespace
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index bff04b9..ae053fb 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -184,13 +184,33 @@
}
++sender_count_;
}
+
void CountSenderDestroyed() {
--sender_count_;
CHECK_GE(sender_count_, 0);
}
private:
- void CheckBufferCount() { CHECK_LT(sender_count_, number_scratch_buffers()); }
+ void CheckBufferCount() {
+ int reader_count = 0;
+ if (channel()->read_method() == ReadMethod::PIN) {
+ reader_count = watchers_.size() + fetchers_.size();
+ }
+ CHECK_LT(reader_count + sender_count_, number_scratch_buffers());
+ }
+
+ void CheckReaderCount() {
+ if (channel()->read_method() != ReadMethod::PIN) {
+ return;
+ }
+ CheckBufferCount();
+ const int reader_count = watchers_.size() + fetchers_.size();
+ if (reader_count >= channel()->num_readers()) {
+ LOG(FATAL) << "Failed to create reader on "
+ << configuration::CleanedChannelToString(channel())
+ << ", too many readers.";
+ }
+ }
const Channel *const channel_;
EventScheduler *const scheduler_;
@@ -719,6 +739,7 @@
}
void SimulatedChannel::MakeRawWatcher(SimulatedWatcher *watcher) {
+ CheckReaderCount();
watcher->SetSimulatedChannel(this);
watchers_.emplace_back(watcher);
}
@@ -730,6 +751,7 @@
::std::unique_ptr<RawFetcher> SimulatedChannel::MakeRawFetcher(
EventLoop *event_loop) {
+ CheckReaderCount();
::std::unique_ptr<SimulatedFetcher> fetcher(
new SimulatedFetcher(event_loop, this));
fetchers_.push_back(fetcher.get());
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 513dc1b..78c0d44 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -51,15 +51,31 @@
std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
};
-INSTANTIATE_TEST_CASE_P(SimulatedEventLoopDeathTest, AbstractEventLoopDeathTest,
- ::testing::Values([]() {
- return new SimulatedEventLoopTestFactory();
- }));
+INSTANTIATE_TEST_CASE_P(SimulatedEventLoopCopyTest, AbstractEventLoopTest,
+ ::testing::Values(std::make_tuple(
+ []() {
+ return new SimulatedEventLoopTestFactory();
+ },
+ ReadMethod::COPY)));
-INSTANTIATE_TEST_CASE_P(SimulatedEventLoopTest, AbstractEventLoopTest,
- ::testing::Values([]() {
- return new SimulatedEventLoopTestFactory();
- }));
+INSTANTIATE_TEST_CASE_P(
+ SimulatedEventLoopCopyDeathTest, AbstractEventLoopDeathTest,
+ ::testing::Values(
+ std::make_tuple([]() { return new SimulatedEventLoopTestFactory(); },
+ ReadMethod::COPY)));
+
+INSTANTIATE_TEST_CASE_P(SimulatedEventLoopPinTest, AbstractEventLoopTest,
+ ::testing::Values(std::make_tuple(
+ []() {
+ return new SimulatedEventLoopTestFactory();
+ },
+ ReadMethod::PIN)));
+
+INSTANTIATE_TEST_CASE_P(
+ SimulatedEventLoopPinDeathTest, AbstractEventLoopDeathTest,
+ ::testing::Values(
+ std::make_tuple([]() { return new SimulatedEventLoopTestFactory(); },
+ ReadMethod::PIN)));
// Test that creating an event and running the scheduler runs the event.
TEST(EventSchedulerTest, ScheduleEvent) {