Add FetchIf and FetchNextIf to aos::EventLoop
This is mostly to support logging X seconds behind now so we can trigger
restarts cleanly. This lets us peek at the message header before
triggering a copy of the contents.
Change-Id: I41b048a6d1ed4f4951f89bdbb093fbbbf44e64a1
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 0559aae..ba2d426 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -319,6 +319,93 @@
}
}
+std::function<bool(const Context &)> MakeShouldFetch(
+ bool should_fetch, size_t *called_count = nullptr) {
+ return [should_fetch, called_count](const Context &) {
+ if (called_count != nullptr) {
+ (*called_count)++;
+ }
+ return should_fetch;
+ };
+}
+
+// Tests that a fetcher using FetchIf can fetch from a sender.
+TEST_P(AbstractEventLoopTest, FetchIfWithoutRun) {
+ auto loop1 = Make();
+ auto loop2 = Make();
+ auto loop3 = MakePrimary();
+
+ auto sender = loop1->MakeSender<TestMessage>("/test");
+
+ auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
+
+ for (const bool should_fetch : {true, false}) {
+ EXPECT_FALSE(fetcher.FetchIf(MakeShouldFetch(should_fetch)));
+ EXPECT_EQ(fetcher.get(), nullptr);
+
+ EXPECT_EQ(fetcher.context().monotonic_event_time,
+ monotonic_clock::min_time);
+ EXPECT_EQ(fetcher.context().monotonic_remote_time,
+ monotonic_clock::min_time);
+ EXPECT_EQ(fetcher.context().realtime_event_time, realtime_clock::min_time);
+ EXPECT_EQ(fetcher.context().realtime_remote_time, realtime_clock::min_time);
+ EXPECT_EQ(fetcher.context().source_boot_uuid, UUID::Zero());
+ EXPECT_EQ(fetcher.context().queue_index, 0xffffffffu);
+ EXPECT_EQ(fetcher.context().size, 0u);
+ EXPECT_EQ(fetcher.context().data, nullptr);
+ EXPECT_EQ(fetcher.context().buffer_index, -1);
+ }
+
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200);
+ msg.CheckOk(msg.Send(builder.Finish()));
+
+ // Make sure a rejected fetch leaves the fetcher and its context untouched.
+ EXPECT_FALSE(fetcher.FetchIf(MakeShouldFetch(false)));
+ EXPECT_EQ(fetcher.get(), nullptr);
+
+ EXPECT_EQ(fetcher.context().monotonic_event_time, monotonic_clock::min_time);
+ EXPECT_EQ(fetcher.context().monotonic_remote_time, monotonic_clock::min_time);
+ EXPECT_EQ(fetcher.context().realtime_event_time, realtime_clock::min_time);
+ EXPECT_EQ(fetcher.context().realtime_remote_time, realtime_clock::min_time);
+ EXPECT_EQ(fetcher.context().source_boot_uuid, UUID::Zero());
+ EXPECT_EQ(fetcher.context().queue_index, 0xffffffffu);
+ EXPECT_EQ(fetcher.context().size, 0u);
+ EXPECT_EQ(fetcher.context().data, nullptr);
+ EXPECT_EQ(fetcher.context().buffer_index, -1);
+
+ // And now confirm we succeed and everything gets set right.
+ EXPECT_TRUE(fetcher.FetchIf(MakeShouldFetch(true)));
+ ASSERT_FALSE(fetcher.get() == nullptr);
+ EXPECT_EQ(fetcher.get()->value(), 200);
+
+ const chrono::milliseconds kEpsilon(100);
+
+ const aos::monotonic_clock::time_point monotonic_now = loop2->monotonic_now();
+ const aos::realtime_clock::time_point realtime_now = loop2->realtime_now();
+ EXPECT_EQ(fetcher.context().monotonic_event_time,
+ fetcher.context().monotonic_remote_time);
+ EXPECT_EQ(fetcher.context().realtime_event_time,
+ fetcher.context().realtime_remote_time);
+
+ EXPECT_GE(fetcher.context().monotonic_event_time, monotonic_now - kEpsilon);
+ EXPECT_LE(fetcher.context().monotonic_event_time, monotonic_now + kEpsilon);
+ EXPECT_GE(fetcher.context().realtime_event_time, realtime_now - kEpsilon);
+ EXPECT_LE(fetcher.context().realtime_event_time, realtime_now + kEpsilon);
+ EXPECT_EQ(fetcher.context().source_boot_uuid, loop2->boot_uuid());
+ EXPECT_EQ(fetcher.context().queue_index, 0x0u);
+ EXPECT_EQ(fetcher.context().size, 20u);
+ EXPECT_NE(fetcher.context().data, nullptr);
+ if (read_method() == ReadMethod::PIN) {
+ EXPECT_GE(fetcher.context().buffer_index, 0);
+ EXPECT_LT(fetcher.context().buffer_index,
+ loop2->NumberBuffers(fetcher.channel()));
+ } else {
+ EXPECT_EQ(fetcher.context().buffer_index, -1);
+ }
+}
+
// Tests that watcher will receive all messages sent if they are sent after
// initialization and before running.
TEST_P(AbstractEventLoopTest, DoubleSendAtStartup) {
@@ -345,16 +432,10 @@
}
loop2->OnRun([&]() {
- {
+ for (int i = 200; i < 202; ++i) {
aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(200);
- msg.CheckOk(msg.Send(builder.Finish()));
- }
- {
- aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
- TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(201);
+ builder.add_value(i);
msg.CheckOk(msg.Send(builder.Finish()));
}
});
@@ -374,16 +455,10 @@
::std::vector<int> values;
- {
+ for (int i = 200; i < 202; ++i) {
aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(200);
- msg.CheckOk(msg.Send(builder.Finish()));
- }
- {
- aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
- TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(201);
+ builder.add_value(i);
msg.CheckOk(msg.Send(builder.Finish()));
}
@@ -412,16 +487,10 @@
::std::vector<int> values;
- {
+ for (int i = 200; i < 202; ++i) {
aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(200);
- msg.CheckOk(msg.Send(builder.Finish()));
- }
- {
- aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
- TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(201);
+ builder.add_value(i);
msg.CheckOk(msg.Send(builder.Finish()));
}
@@ -451,16 +520,10 @@
::std::vector<int> values;
- {
+ for (int i = 200; i < 202; ++i) {
aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(200);
- msg.CheckOk(msg.Send(builder.Finish()));
- }
- {
- aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
- TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(201);
+ builder.add_value(i);
msg.CheckOk(msg.Send(builder.Finish()));
}
@@ -483,6 +546,41 @@
EXPECT_THAT(0, values.size());
}
+// Tests that FetchNextIf gets no messages sent before it is constructed.
+TEST_P(AbstractEventLoopTest, FetchNextIfAfterSend) {
+ auto loop1 = Make();
+ auto loop2 = MakePrimary();
+
+ auto sender = loop1->MakeSender<TestMessage>("/test");
+
+ ::std::vector<int> values;
+
+ for (int i = 200; i < 202; ++i) {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(i);
+ msg.CheckOk(msg.Send(builder.Finish()));
+ }
+
+ auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
+
+ // Add a timer to actually quit.
+ auto test_timer = loop2->AddTimer([&fetcher, &values, this]() {
+ while (fetcher.FetchNextIf(MakeShouldFetch(true))) {
+ values.push_back(fetcher.get()->value());
+ }
+ this->Exit();
+ });
+
+ loop2->OnRun([&test_timer, &loop2]() {
+ test_timer->Schedule(loop2->monotonic_now(),
+ ::std::chrono::milliseconds(100));
+ });
+
+ Run();
+ EXPECT_EQ(0, values.size());
+}
+
// Tests that Fetch returns the last message created before the loop was
// started.
TEST_P(AbstractEventLoopTest, FetchDataFromBeforeCreation) {
@@ -493,16 +591,10 @@
::std::vector<int> values;
- {
+ for (int i = 200; i < 202; ++i) {
aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(200);
- msg.CheckOk(msg.Send(builder.Finish()));
- }
- {
- aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
- TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(201);
+ builder.add_value(i);
msg.CheckOk(msg.Send(builder.Finish()));
}
@@ -529,6 +621,50 @@
EXPECT_THAT(values, ::testing::ElementsAreArray({201}));
}
+// Tests that FetchIf returns the last message created before the loop was
+// started.
+TEST_P(AbstractEventLoopTest, FetchIfDataFromBeforeCreation) {
+ auto loop1 = Make();
+ auto loop2 = MakePrimary();
+
+ auto sender = loop1->MakeSender<TestMessage>("/test");
+
+ ::std::vector<int> values;
+
+ for (int i = 200; i < 202; ++i) {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(i);
+ msg.CheckOk(msg.Send(builder.Finish()));
+ }
+
+ auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
+
+ // Add a timer to actually quit.
+ auto test_timer = loop2->AddTimer([&fetcher, &values, this]() {
+ if (fetcher.FetchIf(MakeShouldFetch(true))) {
+ values.push_back(fetcher.get()->value());
+ }
+
+ if (fetcher.FetchIf(MakeShouldFetch(false))) {
+ values.push_back(fetcher.get()->value());
+ }
+ // Do it again to make sure we don't double fetch.
+ if (fetcher.FetchIf(MakeShouldFetch(true))) {
+ values.push_back(fetcher.get()->value());
+ }
+ this->Exit();
+ });
+
+ loop2->OnRun([&test_timer, &loop2]() {
+ test_timer->Schedule(loop2->monotonic_now(),
+ ::std::chrono::milliseconds(100));
+ });
+
+ Run();
+ EXPECT_THAT(values, ::testing::ElementsAreArray({201}));
+}
+
// Tests that timer handler is enabled after setup (even if it is in the past)
// and is disabled after running
TEST_P(AbstractEventLoopTest, CheckTimerDisabled) {
@@ -636,16 +772,10 @@
::std::vector<int> values;
- {
+ for (int i = 200; i < 202; ++i) {
aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(200);
- msg.CheckOk(msg.Send(builder.Finish()));
- }
- {
- aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
- TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(201);
+ builder.add_value(i);
msg.CheckOk(msg.Send(builder.Finish()));
}
@@ -657,22 +787,10 @@
values.push_back(fetcher.get()->value());
}
- {
+ for (int i = 202; i < 205; ++i) {
aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(202);
- msg.CheckOk(msg.Send(builder.Finish()));
- }
- {
- aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
- TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(203);
- msg.CheckOk(msg.Send(builder.Finish()));
- }
- {
- aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
- TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(204);
+ builder.add_value(i);
msg.CheckOk(msg.Send(builder.Finish()));
}
@@ -696,6 +814,62 @@
EXPECT_THAT(values, ::testing::ElementsAreArray({201, 202, 204}));
}
+// Tests that Fetch{If,} and FetchNext{If,} interleave as expected.
+TEST_P(AbstractEventLoopTest, FetchAndFetchNextIfTogether) {
+ auto loop1 = Make();
+ auto loop2 = MakePrimary();
+
+ auto sender = loop1->MakeSender<TestMessage>("/test");
+
+ ::std::vector<int> values;
+
+ for (int i = 200; i < 202; ++i) {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(i);
+ msg.CheckOk(msg.Send(builder.Finish()));
+ }
+
+ auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
+
+ // Add a timer to actually quit.
+ auto test_timer = loop2->AddTimer([&fetcher, &values, &sender, this]() {
+ if (fetcher.Fetch()) {
+ values.push_back(fetcher.get()->value());
+ }
+
+ for (int i = 202; i < 205; ++i) {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(i);
+ msg.CheckOk(msg.Send(builder.Finish()));
+ }
+
+ EXPECT_FALSE(fetcher.FetchNextIf(MakeShouldFetch(false)));
+
+ if (fetcher.FetchNext()) {
+ values.push_back(fetcher.get()->value());
+ }
+
+ EXPECT_FALSE(fetcher.FetchNextIf(MakeShouldFetch(false)));
+ EXPECT_FALSE(fetcher.FetchIf(MakeShouldFetch(false)));
+
+ if (fetcher.FetchIf(MakeShouldFetch(true))) {
+ values.push_back(fetcher.get()->value());
+ }
+
+ this->Exit();
+ });
+
+ loop2->OnRun([&test_timer, &loop2]() {
+ test_timer->Schedule(loop2->monotonic_now(),
+ ::std::chrono::milliseconds(100));
+ });
+
+ Run();
+ EXPECT_THAT(values, ::testing::ElementsAreArray({201, 202, 204}));
+}
+
// Tests that FetchNext behaves correctly when we get two messages in the queue
// but don't consume the first until after the second has been sent.
TEST_P(AbstractEventLoopTest, FetchNextTest) {
@@ -732,6 +906,49 @@
EXPECT_EQ(200, fetcher.get()->value());
}
+// Tests that FetchNextIf behaves correctly when we get two messages in the
+// queue but don't consume the first until after the second has been sent.
+TEST_P(AbstractEventLoopTest, FetchNextIfTest) {
+ auto send_loop = Make();
+ auto fetch_loop = Make();
+ auto sender = send_loop->MakeSender<TestMessage>("/test");
+ Fetcher<TestMessage> fetcher = fetch_loop->MakeFetcher<TestMessage>("/test");
+
+ {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(100);
+ msg.CheckOk(msg.Send(builder.Finish()));
+ }
+
+ {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200);
+ msg.CheckOk(msg.Send(builder.Finish()));
+ }
+
+ size_t called_count = 0;
+ ASSERT_TRUE(fetcher.FetchNextIf(MakeShouldFetch(true, &called_count)));
+ ASSERT_NE(nullptr, fetcher.get());
+ EXPECT_EQ(100, fetcher.get()->value());
+ EXPECT_EQ(called_count, 1u);
+
+ ASSERT_FALSE(fetcher.FetchNextIf(MakeShouldFetch(false, &called_count)));
+ EXPECT_EQ(called_count, 2u);
+
+ ASSERT_TRUE(fetcher.FetchNextIf(MakeShouldFetch(true, &called_count)));
+ ASSERT_NE(nullptr, fetcher.get());
+ EXPECT_EQ(200, fetcher.get()->value());
+ EXPECT_EQ(called_count, 3u);
+
+ // Past the end of the queue: predicate not called, old message retained.
+ ASSERT_FALSE(fetcher.FetchNextIf(MakeShouldFetch(false, &called_count)));
+ EXPECT_EQ(called_count, 3u);
+ ASSERT_NE(nullptr, fetcher.get());
+ EXPECT_EQ(200, fetcher.get()->value());
+}
+
// Verify that a fetcher still holds its data, even after falling behind.
TEST_P(AbstractEventLoopTest, FetcherBehindData) {
auto send_loop = Make();