Add FetchIf and FetchNextIf to aos::EventLoop
This is mostly to support logging X seconds behind now so we can trigger
restarts cleanly. This lets us peek at the message header before
triggering a copy of the contents.
Change-Id: I41b048a6d1ed4f4951f89bdbb093fbbbf44e64a1
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 174df38..a09c3d8 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -471,7 +471,7 @@
name = "shm_event_loop_test",
srcs = ["shm_event_loop_test.cc"],
flaky = True,
- shard_count = 24,
+ shard_count = 50,
target_compatible_with = ["@platforms//os:linux"],
deps = [
":event_loop_param_test",
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 17e3e00..3926e26 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -48,9 +48,16 @@
// Fetches the next message in the queue without blocking. Returns true if
// there was a new message and we got it.
bool FetchNext();
+ // Fetches the next message if there is one, and the provided function returns
+ // true. The data and buffer_index are the only pieces of the Context which
+ // are zeroed out. The function must be valid.
+ bool FetchNextIf(std::function<bool(const Context &context)> fn);
// Fetches the latest message without blocking.
bool Fetch();
+ // Fetches the latest message conditionally without blocking. fn must be
+ // valid.
+ bool FetchIf(std::function<bool(const Context &context)> fn);
// Returns the channel this fetcher uses.
const Channel *channel() const { return channel_; }
@@ -67,7 +74,11 @@
friend class EventLoop;
// Implementation
virtual std::pair<bool, monotonic_clock::time_point> DoFetchNext() = 0;
+ virtual std::pair<bool, monotonic_clock::time_point> DoFetchNextIf(
+ std::function<bool(const Context &)> fn) = 0;
virtual std::pair<bool, monotonic_clock::time_point> DoFetch() = 0;
+ virtual std::pair<bool, monotonic_clock::time_point> DoFetchIf(
+ std::function<bool(const Context &)> fn) = 0;
EventLoop *const event_loop_;
const Channel *const channel_;
@@ -241,6 +252,18 @@
return result;
}
+ // Fetches the next message if there is one, and the provided function returns
+ // true. The data and buffer_index are the only pieces of the Context which
+ // are zeroed out. The function must be valid.
+ bool FetchNextIf(std::function<bool(const Context &)> fn) {
+ const bool result = CHECK_NOTNULL(fetcher_)->FetchNextIf(std::move(fn));
+ if (result) {
+ CheckChannelDataAlignment(fetcher_->context().data,
+ fetcher_->context().size);
+ }
+ return result;
+ }
+
// Fetches the most recent message. Returns true if it fetched a new message.
// This will return the latest message regardless of if it was sent before or
// after the fetcher was created.
@@ -253,6 +276,18 @@
return result;
}
+ // Fetches the most recent message conditionally. Returns true if it fetched a
+ // new message. This will return the latest message regardless of if it was
+ // sent before or after the fetcher was created. The function must be valid.
+ bool FetchIf(std::function<bool(const Context &)> fn) {
+ const bool result = CHECK_NOTNULL(fetcher_)->FetchIf(std::move(fn));
+ if (result) {
+ CheckChannelDataAlignment(fetcher_->context().data,
+ fetcher_->context().size);
+ }
+ return result;
+ }
+
// Returns a pointer to the contained flatbuffer, or nullptr if there is no
// available message.
const T *get() const {
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 0559aae..ba2d426 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -319,6 +319,93 @@
}
}
+std::function<bool(const Context &)> MakeShouldFetch(
+ bool should_fetch, size_t *called_count = nullptr) {
+ return [should_fetch, called_count](const Context &) {
+ if (called_count != nullptr) {
+ (*called_count)++;
+ }
+ return should_fetch;
+ };
+}
+
+// Tests that a fetcher using FetchIf can fetch from a sender.
+TEST_P(AbstractEventLoopTest, FetchIfWithoutRun) {
+ auto loop1 = Make();
+ auto loop2 = Make();
+ auto loop3 = MakePrimary();
+
+ auto sender = loop1->MakeSender<TestMessage>("/test");
+
+ auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
+
+ for (const bool should_fetch : {true, false}) {
+ EXPECT_FALSE(fetcher.FetchIf(MakeShouldFetch(should_fetch)));
+ EXPECT_EQ(fetcher.get(), nullptr);
+
+ EXPECT_EQ(fetcher.context().monotonic_event_time,
+ monotonic_clock::min_time);
+ EXPECT_EQ(fetcher.context().monotonic_remote_time,
+ monotonic_clock::min_time);
+ EXPECT_EQ(fetcher.context().realtime_event_time, realtime_clock::min_time);
+ EXPECT_EQ(fetcher.context().realtime_remote_time, realtime_clock::min_time);
+ EXPECT_EQ(fetcher.context().source_boot_uuid, UUID::Zero());
+ EXPECT_EQ(fetcher.context().queue_index, 0xffffffffu);
+ EXPECT_EQ(fetcher.context().size, 0u);
+ EXPECT_EQ(fetcher.context().data, nullptr);
+ EXPECT_EQ(fetcher.context().buffer_index, -1);
+ }
+
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200);
+ msg.CheckOk(msg.Send(builder.Finish()));
+
+ // Make sure failing to fetch won't affect anything.
+ EXPECT_FALSE(fetcher.FetchIf(MakeShouldFetch(false)));
+ EXPECT_EQ(fetcher.get(), nullptr);
+
+ EXPECT_EQ(fetcher.context().monotonic_event_time, monotonic_clock::min_time);
+ EXPECT_EQ(fetcher.context().monotonic_remote_time, monotonic_clock::min_time);
+ EXPECT_EQ(fetcher.context().realtime_event_time, realtime_clock::min_time);
+ EXPECT_EQ(fetcher.context().realtime_remote_time, realtime_clock::min_time);
+ EXPECT_EQ(fetcher.context().source_boot_uuid, UUID::Zero());
+ EXPECT_EQ(fetcher.context().queue_index, 0xffffffffu);
+ EXPECT_EQ(fetcher.context().size, 0u);
+ EXPECT_EQ(fetcher.context().data, nullptr);
+ EXPECT_EQ(fetcher.context().buffer_index, -1);
+
+ // And now confirm we succeed and everything gets set right.
+ EXPECT_TRUE(fetcher.FetchIf(MakeShouldFetch(true)));
+ ASSERT_FALSE(fetcher.get() == nullptr);
+ EXPECT_EQ(fetcher.get()->value(), 200);
+
+ const chrono::milliseconds kEpsilon(100);
+
+ const aos::monotonic_clock::time_point monotonic_now = loop2->monotonic_now();
+ const aos::realtime_clock::time_point realtime_now = loop2->realtime_now();
+ EXPECT_EQ(fetcher.context().monotonic_event_time,
+ fetcher.context().monotonic_remote_time);
+ EXPECT_EQ(fetcher.context().realtime_event_time,
+ fetcher.context().realtime_remote_time);
+
+ EXPECT_GE(fetcher.context().monotonic_event_time, monotonic_now - kEpsilon);
+ EXPECT_LE(fetcher.context().monotonic_event_time, monotonic_now + kEpsilon);
+ EXPECT_GE(fetcher.context().realtime_event_time, realtime_now - kEpsilon);
+ EXPECT_LE(fetcher.context().realtime_event_time, realtime_now + kEpsilon);
+ EXPECT_EQ(fetcher.context().source_boot_uuid, loop2->boot_uuid());
+ EXPECT_EQ(fetcher.context().queue_index, 0x0u);
+ EXPECT_EQ(fetcher.context().size, 20u);
+ EXPECT_NE(fetcher.context().data, nullptr);
+ if (read_method() == ReadMethod::PIN) {
+ EXPECT_GE(fetcher.context().buffer_index, 0);
+ EXPECT_LT(fetcher.context().buffer_index,
+ loop2->NumberBuffers(fetcher.channel()));
+ } else {
+ EXPECT_EQ(fetcher.context().buffer_index, -1);
+ }
+}
+
// Tests that watcher will receive all messages sent if they are sent after
// initialization and before running.
TEST_P(AbstractEventLoopTest, DoubleSendAtStartup) {
@@ -345,16 +432,10 @@
}
loop2->OnRun([&]() {
- {
+ for (int i = 200; i < 202; ++i) {
aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(200);
- msg.CheckOk(msg.Send(builder.Finish()));
- }
- {
- aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
- TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(201);
+ builder.add_value(i);
msg.CheckOk(msg.Send(builder.Finish()));
}
});
@@ -374,16 +455,10 @@
::std::vector<int> values;
- {
+ for (int i = 200; i < 202; ++i) {
aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(200);
- msg.CheckOk(msg.Send(builder.Finish()));
- }
- {
- aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
- TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(201);
+ builder.add_value(i);
msg.CheckOk(msg.Send(builder.Finish()));
}
@@ -412,16 +487,10 @@
::std::vector<int> values;
- {
+ for (int i = 200; i < 202; ++i) {
aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(200);
- msg.CheckOk(msg.Send(builder.Finish()));
- }
- {
- aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
- TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(201);
+ builder.add_value(i);
msg.CheckOk(msg.Send(builder.Finish()));
}
@@ -451,16 +520,10 @@
::std::vector<int> values;
- {
+ for (int i = 200; i < 202; ++i) {
aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(200);
- msg.CheckOk(msg.Send(builder.Finish()));
- }
- {
- aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
- TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(201);
+ builder.add_value(i);
msg.CheckOk(msg.Send(builder.Finish()));
}
@@ -483,6 +546,41 @@
EXPECT_THAT(0, values.size());
}
+// Tests that FetchNextIf gets no messages sent before it is constructed.
+TEST_P(AbstractEventLoopTest, FetchNextIfAfterSend) {
+ auto loop1 = Make();
+ auto loop2 = MakePrimary();
+
+ auto sender = loop1->MakeSender<TestMessage>("/test");
+
+ ::std::vector<int> values;
+
+ for (int i = 200; i < 202; ++i) {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(i);
+ msg.CheckOk(msg.Send(builder.Finish()));
+ }
+
+ auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
+
+ // Add a timer to actually quit.
+ auto test_timer = loop2->AddTimer([&fetcher, &values, this]() {
+ while (fetcher.FetchNextIf(MakeShouldFetch(true))) {
+ values.push_back(fetcher.get()->value());
+ }
+ this->Exit();
+ });
+
+ loop2->OnRun([&test_timer, &loop2]() {
+ test_timer->Schedule(loop2->monotonic_now(),
+ ::std::chrono::milliseconds(100));
+ });
+
+ Run();
+ EXPECT_EQ(0, values.size());
+}
+
// Tests that Fetch returns the last message created before the loop was
// started.
TEST_P(AbstractEventLoopTest, FetchDataFromBeforeCreation) {
@@ -493,16 +591,10 @@
::std::vector<int> values;
- {
+ for (int i = 200; i < 202; ++i) {
aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(200);
- msg.CheckOk(msg.Send(builder.Finish()));
- }
- {
- aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
- TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(201);
+ builder.add_value(i);
msg.CheckOk(msg.Send(builder.Finish()));
}
@@ -529,6 +621,50 @@
EXPECT_THAT(values, ::testing::ElementsAreArray({201}));
}
+// Tests that FetchIf returns the last message created before the loop was
+// started.
+TEST_P(AbstractEventLoopTest, FetchIfDataFromBeforeCreation) {
+ auto loop1 = Make();
+ auto loop2 = MakePrimary();
+
+ auto sender = loop1->MakeSender<TestMessage>("/test");
+
+ ::std::vector<int> values;
+
+ for (int i = 200; i < 202; ++i) {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(i);
+ msg.CheckOk(msg.Send(builder.Finish()));
+ }
+
+ auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
+
+ // Add a timer to actually quit.
+ auto test_timer = loop2->AddTimer([&fetcher, &values, this]() {
+ if (fetcher.FetchIf(MakeShouldFetch(true))) {
+ values.push_back(fetcher.get()->value());
+ }
+
+ if (fetcher.FetchIf(MakeShouldFetch(false))) {
+ values.push_back(fetcher.get()->value());
+ }
+ // Do it again to make sure we don't double fetch.
+ if (fetcher.FetchIf(MakeShouldFetch(true))) {
+ values.push_back(fetcher.get()->value());
+ }
+ this->Exit();
+ });
+
+ loop2->OnRun([&test_timer, &loop2]() {
+ test_timer->Schedule(loop2->monotonic_now(),
+ ::std::chrono::milliseconds(100));
+ });
+
+ Run();
+ EXPECT_THAT(values, ::testing::ElementsAreArray({201}));
+}
+
// Tests that timer handler is enabled after setup (even if it is in the past)
// and is disabled after running
TEST_P(AbstractEventLoopTest, CheckTimerDisabled) {
@@ -636,16 +772,10 @@
::std::vector<int> values;
- {
+ for (int i = 200; i < 202; ++i) {
aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(200);
- msg.CheckOk(msg.Send(builder.Finish()));
- }
- {
- aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
- TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(201);
+ builder.add_value(i);
msg.CheckOk(msg.Send(builder.Finish()));
}
@@ -657,22 +787,10 @@
values.push_back(fetcher.get()->value());
}
- {
+ for (int i = 202; i < 205; ++i) {
aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(202);
- msg.CheckOk(msg.Send(builder.Finish()));
- }
- {
- aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
- TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(203);
- msg.CheckOk(msg.Send(builder.Finish()));
- }
- {
- aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
- TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
- builder.add_value(204);
+ builder.add_value(i);
msg.CheckOk(msg.Send(builder.Finish()));
}
@@ -696,6 +814,62 @@
EXPECT_THAT(values, ::testing::ElementsAreArray({201, 202, 204}));
}
+// Tests that Fetch{If,} and FetchNext{If,} interleave as expected.
+TEST_P(AbstractEventLoopTest, FetchAndFetchNextIfTogether) {
+ auto loop1 = Make();
+ auto loop2 = MakePrimary();
+
+ auto sender = loop1->MakeSender<TestMessage>("/test");
+
+ ::std::vector<int> values;
+
+ for (int i = 200; i < 202; ++i) {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(i);
+ msg.CheckOk(msg.Send(builder.Finish()));
+ }
+
+ auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
+
+ // Add a timer to actually quit.
+ auto test_timer = loop2->AddTimer([&fetcher, &values, &sender, this]() {
+ if (fetcher.Fetch()) {
+ values.push_back(fetcher.get()->value());
+ }
+
+ for (int i = 202; i < 205; ++i) {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(i);
+ msg.CheckOk(msg.Send(builder.Finish()));
+ }
+
+ EXPECT_FALSE(fetcher.FetchNextIf(MakeShouldFetch(false)));
+
+ if (fetcher.FetchNext()) {
+ values.push_back(fetcher.get()->value());
+ }
+
+ EXPECT_FALSE(fetcher.FetchNextIf(MakeShouldFetch(false)));
+ EXPECT_FALSE(fetcher.FetchIf(MakeShouldFetch(false)));
+
+ if (fetcher.FetchIf(MakeShouldFetch(true))) {
+ values.push_back(fetcher.get()->value());
+ }
+
+ this->Exit();
+ });
+
+ loop2->OnRun([&test_timer, &loop2]() {
+ test_timer->Schedule(loop2->monotonic_now(),
+ ::std::chrono::milliseconds(100));
+ });
+
+ Run();
+ EXPECT_THAT(values, ::testing::ElementsAreArray({201, 202, 204}));
+}
+
// Tests that FetchNext behaves correctly when we get two messages in the queue
// but don't consume the first until after the second has been sent.
TEST_P(AbstractEventLoopTest, FetchNextTest) {
@@ -732,6 +906,49 @@
EXPECT_EQ(200, fetcher.get()->value());
}
+// Tests that FetchNext behaves correctly when we get two messages in the queue
+// but don't consume the first until after the second has been sent.
+TEST_P(AbstractEventLoopTest, FetchNextIfTest) {
+ auto send_loop = Make();
+ auto fetch_loop = Make();
+ auto sender = send_loop->MakeSender<TestMessage>("/test");
+ Fetcher<TestMessage> fetcher = fetch_loop->MakeFetcher<TestMessage>("/test");
+
+ {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(100);
+ msg.CheckOk(msg.Send(builder.Finish()));
+ }
+
+ {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200);
+ msg.CheckOk(msg.Send(builder.Finish()));
+ }
+
+ size_t called_count = 0;
+ ASSERT_TRUE(fetcher.FetchNextIf(MakeShouldFetch(true, &called_count)));
+ ASSERT_NE(nullptr, fetcher.get());
+ EXPECT_EQ(100, fetcher.get()->value());
+ EXPECT_EQ(called_count, 1u);
+
+ ASSERT_FALSE(fetcher.FetchNextIf(MakeShouldFetch(false, &called_count)));
+ EXPECT_EQ(called_count, 2u);
+
+ ASSERT_TRUE(fetcher.FetchNextIf(MakeShouldFetch(true, &called_count)));
+ ASSERT_NE(nullptr, fetcher.get());
+ EXPECT_EQ(200, fetcher.get()->value());
+ EXPECT_EQ(called_count, 3u);
+
+ // When we run off the end of the queue, expect to still have the old message:
+ ASSERT_FALSE(fetcher.FetchNextIf(MakeShouldFetch(false, &called_count)));
+ EXPECT_EQ(called_count, 3u);
+ ASSERT_NE(nullptr, fetcher.get());
+ EXPECT_EQ(200, fetcher.get()->value());
+}
+
// Verify that a fetcher still holds its data, even after falling behind.
TEST_P(AbstractEventLoopTest, FetcherBehindData) {
auto send_loop = Make();
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index 6c132b2..9a5fe19 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -102,6 +102,37 @@
return false;
}
+inline bool RawFetcher::FetchNextIf(std::function<bool(const Context &)> fn) {
+ DCHECK(fn);
+ const auto result = DoFetchNextIf(std::move(fn));
+ if (result.first) {
+ if (timing_.fetcher) {
+ timing_.fetcher->mutate_count(timing_.fetcher->count() + 1);
+ }
+ const monotonic_clock::time_point monotonic_time = result.second;
+ ftrace_.FormatMessage(
+ "%.*s: fetch next if: now=%" PRId64 " event=%" PRId64 " queue=%" PRIu32,
+ static_cast<int>(ftrace_prefix_.size()), ftrace_prefix_.data(),
+ static_cast<int64_t>(monotonic_time.time_since_epoch().count()),
+ static_cast<int64_t>(
+ context_.monotonic_event_time.time_since_epoch().count()),
+ context_.queue_index);
+ const float latency =
+ std::chrono::duration_cast<std::chrono::duration<float>>(
+ monotonic_time - context_.monotonic_event_time)
+ .count();
+ timing_.latency.Add(latency);
+ return true;
+ }
+ ftrace_.FormatMessage(
+ "%.*s: fetch next: still event=%" PRId64 " queue=%" PRIu32,
+ static_cast<int>(ftrace_prefix_.size()), ftrace_prefix_.data(),
+ static_cast<int64_t>(
+ context_.monotonic_event_time.time_since_epoch().count()),
+ context_.queue_index);
+ return false;
+}
+
inline bool RawFetcher::Fetch() {
const auto result = DoFetch();
if (result.first) {
@@ -132,6 +163,38 @@
return false;
}
+inline bool RawFetcher::FetchIf(std::function<bool(const Context &)> fn) {
+ DCHECK(fn);
+
+ const auto result = DoFetchIf(std::move(fn));
+ if (result.first) {
+ if (timing_.fetcher) {
+ timing_.fetcher->mutate_count(timing_.fetcher->count() + 1);
+ }
+ const monotonic_clock::time_point monotonic_time = result.second;
+ ftrace_.FormatMessage(
+ "%.*s: fetch latest: now=%" PRId64 " event=%" PRId64 " queue=%" PRIu32,
+ static_cast<int>(ftrace_prefix_.size()), ftrace_prefix_.data(),
+ static_cast<int64_t>(monotonic_time.time_since_epoch().count()),
+ static_cast<int64_t>(
+ context_.monotonic_event_time.time_since_epoch().count()),
+ context_.queue_index);
+ const float latency =
+ std::chrono::duration_cast<std::chrono::duration<float>>(
+ monotonic_time - context_.monotonic_event_time)
+ .count();
+ timing_.latency.Add(latency);
+ return true;
+ }
+ ftrace_.FormatMessage(
+ "%.*s: fetch latest: still event=%" PRId64 " queue=%" PRIu32,
+ static_cast<int>(ftrace_prefix_.size()), ftrace_prefix_.data(),
+ static_cast<int64_t>(
+ context_.monotonic_event_time.time_since_epoch().count()),
+ context_.queue_index);
+ return false;
+}
+
inline RawSender::Error RawSender::Send(size_t size) {
return Send(size, monotonic_clock::min_time, realtime_clock::min_time,
0xffffffffu, event_loop_->boot_uuid());
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 8523cdf..0dc23d9 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -165,14 +165,16 @@
}
}
- bool FetchNext() {
+ bool FetchNext() { return FetchNextIf(std::ref(should_fetch_)); }
+
+ bool FetchNextIf(std::function<bool(const Context &)> fn) {
const ipc_lib::LocklessQueueReader::Result read_result =
- DoFetch(actual_queue_index_);
+ DoFetch(actual_queue_index_, std::move(fn));
return read_result == ipc_lib::LocklessQueueReader::Result::GOOD;
}
- bool Fetch() {
+ bool FetchIf(std::function<bool(const Context &)> fn) {
const ipc_lib::QueueIndex queue_index = reader_.LatestIndex();
// actual_queue_index_ is only meaningful if it was set by Fetch or
// FetchNext. This happens when valid_data_ has been set. So, only
@@ -187,7 +189,7 @@
}
const ipc_lib::LocklessQueueReader::Result read_result =
- DoFetch(queue_index);
+ DoFetch(queue_index, std::move(fn));
CHECK(read_result != ipc_lib::LocklessQueueReader::Result::NOTHING_NEW)
<< ": Queue index went backwards. This should never happen. "
@@ -196,6 +198,8 @@
return read_result == ipc_lib::LocklessQueueReader::Result::GOOD;
}
+ bool Fetch() { return FetchIf(std::ref(should_fetch_)); }
+
Context context() const { return context_; }
bool RegisterWakeup(int priority) {
@@ -229,7 +233,8 @@
private:
ipc_lib::LocklessQueueReader::Result DoFetch(
- ipc_lib::QueueIndex queue_index) {
+ ipc_lib::QueueIndex queue_index,
+ std::function<bool(const Context &context)> fn) {
// TODO(austin): Get behind and make sure it dies.
char *copy_buffer = nullptr;
if (copy_data()) {
@@ -239,8 +244,7 @@
queue_index.index(), &context_.monotonic_event_time,
&context_.realtime_event_time, &context_.monotonic_remote_time,
&context_.realtime_remote_time, &context_.remote_queue_index,
- &context_.source_boot_uuid, &context_.size, copy_buffer,
- std::ref(should_fetch_));
+ &context_.source_boot_uuid, &context_.size, copy_buffer, std::move(fn));
if (read_result == ipc_lib::LocklessQueueReader::Result::GOOD) {
if (pin_data()) {
@@ -336,9 +340,7 @@
Context context_;
// Pre-allocated should_fetch function so we don't allocate.
- std::function<bool(const Context &)> should_fetch_ = [](const Context &) {
- return true;
- };
+ const std::function<bool(const Context &)> should_fetch_;
};
class ShmFetcher : public RawFetcher {
@@ -364,6 +366,16 @@
return std::make_pair(false, monotonic_clock::min_time);
}
+ std::pair<bool, monotonic_clock::time_point> DoFetchNextIf(
+ std::function<bool(const Context &context)> fn) override {
+ shm_event_loop()->CheckCurrentThread();
+ if (simple_shm_fetcher_.FetchNextIf(std::move(fn))) {
+ context_ = simple_shm_fetcher_.context();
+ return std::make_pair(true, monotonic_clock::now());
+ }
+ return std::make_pair(false, monotonic_clock::min_time);
+ }
+
std::pair<bool, monotonic_clock::time_point> DoFetch() override {
shm_event_loop()->CheckCurrentThread();
if (simple_shm_fetcher_.Fetch()) {
@@ -373,6 +385,16 @@
return std::make_pair(false, monotonic_clock::min_time);
}
+ std::pair<bool, monotonic_clock::time_point> DoFetchIf(
+ std::function<bool(const Context &context)> fn) override {
+ shm_event_loop()->CheckCurrentThread();
+ if (simple_shm_fetcher_.FetchIf(std::move(fn))) {
+ context_ = simple_shm_fetcher_.context();
+ return std::make_pair(true, monotonic_clock::now());
+ }
+ return std::make_pair(false, monotonic_clock::min_time);
+ }
+
absl::Span<const char> GetPrivateMemory() const {
return simple_shm_fetcher_.GetPrivateMemory();
}
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 7e469f5..59d8d7a 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -393,6 +393,11 @@
~SimulatedFetcher() { simulated_channel_->UnregisterFetcher(this); }
std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
+ return DoFetchNextIf(std::function<bool(const Context &context)>());
+ }
+
+ std::pair<bool, monotonic_clock::time_point> DoFetchNextIf(
+ std::function<bool(const Context &context)> fn) override {
// The allocations in here are due to infrastructure and don't count in the
// no mallocs in RT code.
ScopedNotRealtime nrt;
@@ -404,12 +409,27 @@
<< configuration::StrippedChannelToString(
simulated_channel_->channel());
- SetMsg(msgs_.front());
+ if (fn) {
+ Context context = msgs_.front()->context;
+ context.data = nullptr;
+ context.buffer_index = -1;
+
+ if (!fn(context)) {
+ return std::make_pair(false, monotonic_clock::min_time);
+ }
+ }
+
+ SetMsg(std::move(msgs_.front()));
msgs_.pop_front();
return std::make_pair(true, event_loop()->monotonic_now());
}
std::pair<bool, monotonic_clock::time_point> DoFetch() override {
+ return DoFetchIf(std::function<bool(const Context &context)>());
+ }
+
+ std::pair<bool, monotonic_clock::time_point> DoFetchIf(
+ std::function<bool(const Context &context)> fn) override {
// The allocations in here are due to infrastructure and don't count in the
// no mallocs in RT code.
ScopedNotRealtime nrt;
@@ -417,13 +437,35 @@
// TODO(austin): Can we just do this logic unconditionally? It is a lot
// simpler. And call clear, obviously.
if (!msg_ && simulated_channel_->latest_message()) {
- SetMsg(simulated_channel_->latest_message());
+ std::shared_ptr<SimulatedMessage> latest_message =
+ simulated_channel_->latest_message();
+
+ if (fn) {
+ Context context = latest_message->context;
+ context.data = nullptr;
+ context.buffer_index = -1;
+
+ if (!fn(context)) {
+ return std::make_pair(false, monotonic_clock::min_time);
+ }
+ }
+ SetMsg(std::move(latest_message));
return std::make_pair(true, event_loop()->monotonic_now());
} else {
return std::make_pair(false, monotonic_clock::min_time);
}
}
+ if (fn) {
+ Context context = msgs_.back()->context;
+ context.data = nullptr;
+ context.buffer_index = -1;
+
+ if (!fn(context)) {
+ return std::make_pair(false, monotonic_clock::min_time);
+ }
+ }
+
// We've had a message enqueued, so we don't need to go looking for the
// latest message from before we started.
SetMsg(msgs_.back());