Add FetchIf and FetchNextIf to aos::EventLoop

This is mostly to support logging X seconds behind now so we can trigger
restarts cleanly.  This lets us peek at the message header before
triggering a copy of the contents.

Change-Id: I41b048a6d1ed4f4951f89bdbb093fbbbf44e64a1
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 174df38..a09c3d8 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -471,7 +471,7 @@
     name = "shm_event_loop_test",
     srcs = ["shm_event_loop_test.cc"],
     flaky = True,
-    shard_count = 24,
+    shard_count = 50,
     target_compatible_with = ["@platforms//os:linux"],
     deps = [
         ":event_loop_param_test",
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 17e3e00..3926e26 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -48,9 +48,16 @@
   // Fetches the next message in the queue without blocking. Returns true if
   // there was a new message and we got it.
   bool FetchNext();
+  // Fetches the next message if there is one, and the provided function returns
+  // true.  The data and buffer_index are the only pieces of the Context which
+  // are zeroed out.  The function must be valid.
+  bool FetchNextIf(std::function<bool(const Context &context)> fn);
 
   // Fetches the latest message without blocking.
   bool Fetch();
+  // Fetches the latest message conditionally without blocking.  fn must be
+  // valid.
+  bool FetchIf(std::function<bool(const Context &context)> fn);
 
   // Returns the channel this fetcher uses.
   const Channel *channel() const { return channel_; }
@@ -67,7 +74,11 @@
   friend class EventLoop;
   // Implementation
   virtual std::pair<bool, monotonic_clock::time_point> DoFetchNext() = 0;
+  virtual std::pair<bool, monotonic_clock::time_point> DoFetchNextIf(
+      std::function<bool(const Context &)> fn) = 0;
   virtual std::pair<bool, monotonic_clock::time_point> DoFetch() = 0;
+  virtual std::pair<bool, monotonic_clock::time_point> DoFetchIf(
+      std::function<bool(const Context &)> fn) = 0;
 
   EventLoop *const event_loop_;
   const Channel *const channel_;
@@ -241,6 +252,18 @@
     return result;
   }
 
+  // Fetches the next message if there is one, and the provided function returns
+  // true.  The data and buffer_index are the only pieces of the Context which
+  // are zeroed out.  The function must be valid.
+  bool FetchNextIf(std::function<bool(const Context &)> fn) {
+    const bool result = CHECK_NOTNULL(fetcher_)->FetchNextIf(std::move(fn));
+    if (result) {
+      CheckChannelDataAlignment(fetcher_->context().data,
+                                fetcher_->context().size);
+    }
+    return result;
+  }
+
   // Fetches the most recent message. Returns true if it fetched a new message.
   // This will return the latest message regardless of if it was sent before or
   // after the fetcher was created.
@@ -253,6 +276,18 @@
     return result;
   }
 
+  // Fetches the most recent message conditionally. Returns true if it fetched a
+  // new message. This will return the latest message regardless of if it was
+  // sent before or after the fetcher was created.  The function must be valid.
+  bool FetchIf(std::function<bool(const Context &)> fn) {
+    const bool result = CHECK_NOTNULL(fetcher_)->FetchIf(std::move(fn));
+    if (result) {
+      CheckChannelDataAlignment(fetcher_->context().data,
+                                fetcher_->context().size);
+    }
+    return result;
+  }
+
   // Returns a pointer to the contained flatbuffer, or nullptr if there is no
   // available message.
   const T *get() const {
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 0559aae..ba2d426 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -319,6 +319,93 @@
   }
 }
 
+std::function<bool(const Context &)> MakeShouldFetch(
+    bool should_fetch, size_t *called_count = nullptr) {
+  return [should_fetch, called_count](const Context &) {
+    if (called_count != nullptr) {
+      (*called_count)++;
+    }
+    return should_fetch;
+  };
+}
+
+// Tests that a fetcher using FetchIf can fetch from a sender.
+TEST_P(AbstractEventLoopTest, FetchIfWithoutRun) {
+  auto loop1 = Make();
+  auto loop2 = Make();
+  auto loop3 = MakePrimary();
+
+  auto sender = loop1->MakeSender<TestMessage>("/test");
+
+  auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
+
+  for (const bool should_fetch : {true, false}) {
+    EXPECT_FALSE(fetcher.FetchIf(MakeShouldFetch(should_fetch)));
+    EXPECT_EQ(fetcher.get(), nullptr);
+
+    EXPECT_EQ(fetcher.context().monotonic_event_time,
+              monotonic_clock::min_time);
+    EXPECT_EQ(fetcher.context().monotonic_remote_time,
+              monotonic_clock::min_time);
+    EXPECT_EQ(fetcher.context().realtime_event_time, realtime_clock::min_time);
+    EXPECT_EQ(fetcher.context().realtime_remote_time, realtime_clock::min_time);
+    EXPECT_EQ(fetcher.context().source_boot_uuid, UUID::Zero());
+    EXPECT_EQ(fetcher.context().queue_index, 0xffffffffu);
+    EXPECT_EQ(fetcher.context().size, 0u);
+    EXPECT_EQ(fetcher.context().data, nullptr);
+    EXPECT_EQ(fetcher.context().buffer_index, -1);
+  }
+
+  aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+  TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+  builder.add_value(200);
+  msg.CheckOk(msg.Send(builder.Finish()));
+
+  // Make sure failing to fetch won't affect anything.
+  EXPECT_FALSE(fetcher.FetchIf(MakeShouldFetch(false)));
+  EXPECT_EQ(fetcher.get(), nullptr);
+
+  EXPECT_EQ(fetcher.context().monotonic_event_time, monotonic_clock::min_time);
+  EXPECT_EQ(fetcher.context().monotonic_remote_time, monotonic_clock::min_time);
+  EXPECT_EQ(fetcher.context().realtime_event_time, realtime_clock::min_time);
+  EXPECT_EQ(fetcher.context().realtime_remote_time, realtime_clock::min_time);
+  EXPECT_EQ(fetcher.context().source_boot_uuid, UUID::Zero());
+  EXPECT_EQ(fetcher.context().queue_index, 0xffffffffu);
+  EXPECT_EQ(fetcher.context().size, 0u);
+  EXPECT_EQ(fetcher.context().data, nullptr);
+  EXPECT_EQ(fetcher.context().buffer_index, -1);
+
+  // And now confirm we succeed and everything gets set right.
+  EXPECT_TRUE(fetcher.FetchIf(MakeShouldFetch(true)));
+  ASSERT_FALSE(fetcher.get() == nullptr);
+  EXPECT_EQ(fetcher.get()->value(), 200);
+
+  const chrono::milliseconds kEpsilon(100);
+
+  const aos::monotonic_clock::time_point monotonic_now = loop2->monotonic_now();
+  const aos::realtime_clock::time_point realtime_now = loop2->realtime_now();
+  EXPECT_EQ(fetcher.context().monotonic_event_time,
+            fetcher.context().monotonic_remote_time);
+  EXPECT_EQ(fetcher.context().realtime_event_time,
+            fetcher.context().realtime_remote_time);
+
+  EXPECT_GE(fetcher.context().monotonic_event_time, monotonic_now - kEpsilon);
+  EXPECT_LE(fetcher.context().monotonic_event_time, monotonic_now + kEpsilon);
+  EXPECT_GE(fetcher.context().realtime_event_time, realtime_now - kEpsilon);
+  EXPECT_LE(fetcher.context().realtime_event_time, realtime_now + kEpsilon);
+  EXPECT_EQ(fetcher.context().source_boot_uuid, loop2->boot_uuid());
+  EXPECT_EQ(fetcher.context().queue_index, 0x0u);
+  EXPECT_EQ(fetcher.context().size, 20u);
+  EXPECT_NE(fetcher.context().data, nullptr);
+  if (read_method() == ReadMethod::PIN) {
+    EXPECT_GE(fetcher.context().buffer_index, 0);
+    EXPECT_LT(fetcher.context().buffer_index,
+              loop2->NumberBuffers(fetcher.channel()));
+  } else {
+    EXPECT_EQ(fetcher.context().buffer_index, -1);
+  }
+}
+
 // Tests that watcher will receive all messages sent if they are sent after
 // initialization and before running.
 TEST_P(AbstractEventLoopTest, DoubleSendAtStartup) {
@@ -345,16 +432,10 @@
   }
 
   loop2->OnRun([&]() {
-    {
+    for (int i = 200; i < 202; ++i) {
       aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
       TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-      builder.add_value(200);
-      msg.CheckOk(msg.Send(builder.Finish()));
-    }
-    {
-      aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
-      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-      builder.add_value(201);
+      builder.add_value(i);
       msg.CheckOk(msg.Send(builder.Finish()));
     }
   });
@@ -374,16 +455,10 @@
 
   ::std::vector<int> values;
 
-  {
+  for (int i = 200; i < 202; ++i) {
     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-    builder.add_value(200);
-    msg.CheckOk(msg.Send(builder.Finish()));
-  }
-  {
-    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
-    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-    builder.add_value(201);
+    builder.add_value(i);
     msg.CheckOk(msg.Send(builder.Finish()));
   }
 
@@ -412,16 +487,10 @@
 
   ::std::vector<int> values;
 
-  {
+  for (int i = 200; i < 202; ++i) {
     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-    builder.add_value(200);
-    msg.CheckOk(msg.Send(builder.Finish()));
-  }
-  {
-    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
-    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-    builder.add_value(201);
+    builder.add_value(i);
     msg.CheckOk(msg.Send(builder.Finish()));
   }
 
@@ -451,16 +520,10 @@
 
   ::std::vector<int> values;
 
-  {
+  for (int i = 200; i < 202; ++i) {
     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-    builder.add_value(200);
-    msg.CheckOk(msg.Send(builder.Finish()));
-  }
-  {
-    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
-    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-    builder.add_value(201);
+    builder.add_value(i);
     msg.CheckOk(msg.Send(builder.Finish()));
   }
 
@@ -483,6 +546,41 @@
   EXPECT_THAT(0, values.size());
 }
 
+// Tests that FetchNextIf gets no messages sent before it is constructed.
+TEST_P(AbstractEventLoopTest, FetchNextIfAfterSend) {
+  auto loop1 = Make();
+  auto loop2 = MakePrimary();
+
+  auto sender = loop1->MakeSender<TestMessage>("/test");
+
+  ::std::vector<int> values;
+
+  for (int i = 200; i < 202; ++i) {
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(i);
+    msg.CheckOk(msg.Send(builder.Finish()));
+  }
+
+  auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
+
+  // Add a timer to actually quit.
+  auto test_timer = loop2->AddTimer([&fetcher, &values, this]() {
+    while (fetcher.FetchNextIf(MakeShouldFetch(true))) {
+      values.push_back(fetcher.get()->value());
+    }
+    this->Exit();
+  });
+
+  loop2->OnRun([&test_timer, &loop2]() {
+    test_timer->Schedule(loop2->monotonic_now(),
+                         ::std::chrono::milliseconds(100));
+  });
+
+  Run();
+  EXPECT_EQ(0, values.size());
+}
+
 // Tests that Fetch returns the last message created before the loop was
 // started.
 TEST_P(AbstractEventLoopTest, FetchDataFromBeforeCreation) {
@@ -493,16 +591,10 @@
 
   ::std::vector<int> values;
 
-  {
+  for (int i = 200; i < 202; ++i) {
     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-    builder.add_value(200);
-    msg.CheckOk(msg.Send(builder.Finish()));
-  }
-  {
-    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
-    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-    builder.add_value(201);
+    builder.add_value(i);
     msg.CheckOk(msg.Send(builder.Finish()));
   }
 
@@ -529,6 +621,50 @@
   EXPECT_THAT(values, ::testing::ElementsAreArray({201}));
 }
 
+// Tests that FetchIf returns the last message created before the loop was
+// started.
+TEST_P(AbstractEventLoopTest, FetchIfDataFromBeforeCreation) {
+  auto loop1 = Make();
+  auto loop2 = MakePrimary();
+
+  auto sender = loop1->MakeSender<TestMessage>("/test");
+
+  ::std::vector<int> values;
+
+  for (int i = 200; i < 202; ++i) {
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(i);
+    msg.CheckOk(msg.Send(builder.Finish()));
+  }
+
+  auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
+
+  // Add a timer to actually quit.
+  auto test_timer = loop2->AddTimer([&fetcher, &values, this]() {
+    if (fetcher.FetchIf(MakeShouldFetch(true))) {
+      values.push_back(fetcher.get()->value());
+    }
+
+    if (fetcher.FetchIf(MakeShouldFetch(false))) {
+      values.push_back(fetcher.get()->value());
+    }
+    // Do it again to make sure we don't double fetch.
+    if (fetcher.FetchIf(MakeShouldFetch(true))) {
+      values.push_back(fetcher.get()->value());
+    }
+    this->Exit();
+  });
+
+  loop2->OnRun([&test_timer, &loop2]() {
+    test_timer->Schedule(loop2->monotonic_now(),
+                         ::std::chrono::milliseconds(100));
+  });
+
+  Run();
+  EXPECT_THAT(values, ::testing::ElementsAreArray({201}));
+}
+
 // Tests that timer handler is enabled after setup (even if it is in the past)
 // and is disabled after running
 TEST_P(AbstractEventLoopTest, CheckTimerDisabled) {
@@ -636,16 +772,10 @@
 
   ::std::vector<int> values;
 
-  {
+  for (int i = 200; i < 202; ++i) {
     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-    builder.add_value(200);
-    msg.CheckOk(msg.Send(builder.Finish()));
-  }
-  {
-    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
-    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-    builder.add_value(201);
+    builder.add_value(i);
     msg.CheckOk(msg.Send(builder.Finish()));
   }
 
@@ -657,22 +787,10 @@
       values.push_back(fetcher.get()->value());
     }
 
-    {
+    for (int i = 202; i < 205; ++i) {
       aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
       TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-      builder.add_value(202);
-      msg.CheckOk(msg.Send(builder.Finish()));
-    }
-    {
-      aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
-      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-      builder.add_value(203);
-      msg.CheckOk(msg.Send(builder.Finish()));
-    }
-    {
-      aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
-      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-      builder.add_value(204);
+      builder.add_value(i);
       msg.CheckOk(msg.Send(builder.Finish()));
     }
 
@@ -696,6 +814,62 @@
   EXPECT_THAT(values, ::testing::ElementsAreArray({201, 202, 204}));
 }
 
+// Tests that Fetch{If,} and FetchNext{If,} interleave as expected.
+TEST_P(AbstractEventLoopTest, FetchAndFetchNextIfTogether) {
+  auto loop1 = Make();
+  auto loop2 = MakePrimary();
+
+  auto sender = loop1->MakeSender<TestMessage>("/test");
+
+  ::std::vector<int> values;
+
+  for (int i = 200; i < 202; ++i) {
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(i);
+    msg.CheckOk(msg.Send(builder.Finish()));
+  }
+
+  auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
+
+  // Add a timer to actually quit.
+  auto test_timer = loop2->AddTimer([&fetcher, &values, &sender, this]() {
+    if (fetcher.Fetch()) {
+      values.push_back(fetcher.get()->value());
+    }
+
+    for (int i = 202; i < 205; ++i) {
+      aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+      builder.add_value(i);
+      msg.CheckOk(msg.Send(builder.Finish()));
+    }
+
+    EXPECT_FALSE(fetcher.FetchNextIf(MakeShouldFetch(false)));
+
+    if (fetcher.FetchNext()) {
+      values.push_back(fetcher.get()->value());
+    }
+
+    EXPECT_FALSE(fetcher.FetchNextIf(MakeShouldFetch(false)));
+    EXPECT_FALSE(fetcher.FetchIf(MakeShouldFetch(false)));
+
+    if (fetcher.FetchIf(MakeShouldFetch(true))) {
+      values.push_back(fetcher.get()->value());
+    }
+
+    this->Exit();
+  });
+
+  loop2->OnRun([&test_timer, &loop2]() {
+    test_timer->Schedule(loop2->monotonic_now(),
+                         ::std::chrono::milliseconds(100));
+  });
+
+  Run();
+  EXPECT_THAT(values, ::testing::ElementsAreArray({201, 202, 204}));
+}
+
 // Tests that FetchNext behaves correctly when we get two messages in the queue
 // but don't consume the first until after the second has been sent.
 TEST_P(AbstractEventLoopTest, FetchNextTest) {
@@ -732,6 +906,49 @@
   EXPECT_EQ(200, fetcher.get()->value());
 }
 
+// Tests that FetchNext behaves correctly when we get two messages in the queue
+// but don't consume the first until after the second has been sent.
+TEST_P(AbstractEventLoopTest, FetchNextIfTest) {
+  auto send_loop = Make();
+  auto fetch_loop = Make();
+  auto sender = send_loop->MakeSender<TestMessage>("/test");
+  Fetcher<TestMessage> fetcher = fetch_loop->MakeFetcher<TestMessage>("/test");
+
+  {
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(100);
+    msg.CheckOk(msg.Send(builder.Finish()));
+  }
+
+  {
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(200);
+    msg.CheckOk(msg.Send(builder.Finish()));
+  }
+
+  size_t called_count = 0;
+  ASSERT_TRUE(fetcher.FetchNextIf(MakeShouldFetch(true, &called_count)));
+  ASSERT_NE(nullptr, fetcher.get());
+  EXPECT_EQ(100, fetcher.get()->value());
+  EXPECT_EQ(called_count, 1u);
+
+  ASSERT_FALSE(fetcher.FetchNextIf(MakeShouldFetch(false, &called_count)));
+  EXPECT_EQ(called_count, 2u);
+
+  ASSERT_TRUE(fetcher.FetchNextIf(MakeShouldFetch(true, &called_count)));
+  ASSERT_NE(nullptr, fetcher.get());
+  EXPECT_EQ(200, fetcher.get()->value());
+  EXPECT_EQ(called_count, 3u);
+
+  // When we run off the end of the queue, expect to still have the old message:
+  ASSERT_FALSE(fetcher.FetchNextIf(MakeShouldFetch(false, &called_count)));
+  EXPECT_EQ(called_count, 3u);
+  ASSERT_NE(nullptr, fetcher.get());
+  EXPECT_EQ(200, fetcher.get()->value());
+}
+
 // Verify that a fetcher still holds its data, even after falling behind.
 TEST_P(AbstractEventLoopTest, FetcherBehindData) {
   auto send_loop = Make();
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index 6c132b2..9a5fe19 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -102,6 +102,37 @@
   return false;
 }
 
+inline bool RawFetcher::FetchNextIf(std::function<bool(const Context &)> fn) {
+  DCHECK(fn);
+  const auto result = DoFetchNextIf(std::move(fn));
+  if (result.first) {
+    if (timing_.fetcher) {
+      timing_.fetcher->mutate_count(timing_.fetcher->count() + 1);
+    }
+    const monotonic_clock::time_point monotonic_time = result.second;
+    ftrace_.FormatMessage(
+        "%.*s: fetch next if: now=%" PRId64 " event=%" PRId64 " queue=%" PRIu32,
+        static_cast<int>(ftrace_prefix_.size()), ftrace_prefix_.data(),
+        static_cast<int64_t>(monotonic_time.time_since_epoch().count()),
+        static_cast<int64_t>(
+            context_.monotonic_event_time.time_since_epoch().count()),
+        context_.queue_index);
+    const float latency =
+        std::chrono::duration_cast<std::chrono::duration<float>>(
+            monotonic_time - context_.monotonic_event_time)
+            .count();
+    timing_.latency.Add(latency);
+    return true;
+  }
+  ftrace_.FormatMessage(
+      "%.*s: fetch next: still event=%" PRId64 " queue=%" PRIu32,
+      static_cast<int>(ftrace_prefix_.size()), ftrace_prefix_.data(),
+      static_cast<int64_t>(
+          context_.monotonic_event_time.time_since_epoch().count()),
+      context_.queue_index);
+  return false;
+}
+
 inline bool RawFetcher::Fetch() {
   const auto result = DoFetch();
   if (result.first) {
@@ -132,6 +163,38 @@
   return false;
 }
 
+inline bool RawFetcher::FetchIf(std::function<bool(const Context &)> fn) {
+  DCHECK(fn);
+
+  const auto result = DoFetchIf(std::move(fn));
+  if (result.first) {
+    if (timing_.fetcher) {
+      timing_.fetcher->mutate_count(timing_.fetcher->count() + 1);
+    }
+    const monotonic_clock::time_point monotonic_time = result.second;
+    ftrace_.FormatMessage(
+        "%.*s: fetch latest: now=%" PRId64 " event=%" PRId64 " queue=%" PRIu32,
+        static_cast<int>(ftrace_prefix_.size()), ftrace_prefix_.data(),
+        static_cast<int64_t>(monotonic_time.time_since_epoch().count()),
+        static_cast<int64_t>(
+            context_.monotonic_event_time.time_since_epoch().count()),
+        context_.queue_index);
+    const float latency =
+        std::chrono::duration_cast<std::chrono::duration<float>>(
+            monotonic_time - context_.monotonic_event_time)
+            .count();
+    timing_.latency.Add(latency);
+    return true;
+  }
+  ftrace_.FormatMessage(
+      "%.*s: fetch latest: still event=%" PRId64 " queue=%" PRIu32,
+      static_cast<int>(ftrace_prefix_.size()), ftrace_prefix_.data(),
+      static_cast<int64_t>(
+          context_.monotonic_event_time.time_since_epoch().count()),
+      context_.queue_index);
+  return false;
+}
+
 inline RawSender::Error RawSender::Send(size_t size) {
   return Send(size, monotonic_clock::min_time, realtime_clock::min_time,
               0xffffffffu, event_loop_->boot_uuid());
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 8523cdf..0dc23d9 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -165,14 +165,16 @@
     }
   }
 
-  bool FetchNext() {
+  bool FetchNext() { return FetchNextIf(std::ref(should_fetch_)); }
+
+  bool FetchNextIf(std::function<bool(const Context &)> fn) {
     const ipc_lib::LocklessQueueReader::Result read_result =
-        DoFetch(actual_queue_index_);
+        DoFetch(actual_queue_index_, std::move(fn));
 
     return read_result == ipc_lib::LocklessQueueReader::Result::GOOD;
   }
 
-  bool Fetch() {
+  bool FetchIf(std::function<bool(const Context &)> fn) {
     const ipc_lib::QueueIndex queue_index = reader_.LatestIndex();
     // actual_queue_index_ is only meaningful if it was set by Fetch or
     // FetchNext.  This happens when valid_data_ has been set.  So, only
@@ -187,7 +189,7 @@
     }
 
     const ipc_lib::LocklessQueueReader::Result read_result =
-        DoFetch(queue_index);
+        DoFetch(queue_index, std::move(fn));
 
     CHECK(read_result != ipc_lib::LocklessQueueReader::Result::NOTHING_NEW)
         << ": Queue index went backwards.  This should never happen.  "
@@ -196,6 +198,8 @@
     return read_result == ipc_lib::LocklessQueueReader::Result::GOOD;
   }
 
+  bool Fetch() { return FetchIf(std::ref(should_fetch_)); }
+
   Context context() const { return context_; }
 
   bool RegisterWakeup(int priority) {
@@ -229,7 +233,8 @@
 
  private:
   ipc_lib::LocklessQueueReader::Result DoFetch(
-      ipc_lib::QueueIndex queue_index) {
+      ipc_lib::QueueIndex queue_index,
+      std::function<bool(const Context &context)> fn) {
     // TODO(austin): Get behind and make sure it dies.
     char *copy_buffer = nullptr;
     if (copy_data()) {
@@ -239,8 +244,7 @@
         queue_index.index(), &context_.monotonic_event_time,
         &context_.realtime_event_time, &context_.monotonic_remote_time,
         &context_.realtime_remote_time, &context_.remote_queue_index,
-        &context_.source_boot_uuid, &context_.size, copy_buffer,
-        std::ref(should_fetch_));
+        &context_.source_boot_uuid, &context_.size, copy_buffer, std::move(fn));
 
     if (read_result == ipc_lib::LocklessQueueReader::Result::GOOD) {
       if (pin_data()) {
@@ -336,9 +340,7 @@
   Context context_;
 
   // Pre-allocated should_fetch function so we don't allocate.
-  std::function<bool(const Context &)> should_fetch_ = [](const Context &) {
-    return true;
-  };
+  const std::function<bool(const Context &)> should_fetch_;
 };
 
 class ShmFetcher : public RawFetcher {
@@ -364,6 +366,16 @@
     return std::make_pair(false, monotonic_clock::min_time);
   }
 
+  std::pair<bool, monotonic_clock::time_point> DoFetchNextIf(
+      std::function<bool(const Context &context)> fn) override {
+    shm_event_loop()->CheckCurrentThread();
+    if (simple_shm_fetcher_.FetchNextIf(std::move(fn))) {
+      context_ = simple_shm_fetcher_.context();
+      return std::make_pair(true, monotonic_clock::now());
+    }
+    return std::make_pair(false, monotonic_clock::min_time);
+  }
+
   std::pair<bool, monotonic_clock::time_point> DoFetch() override {
     shm_event_loop()->CheckCurrentThread();
     if (simple_shm_fetcher_.Fetch()) {
@@ -373,6 +385,16 @@
     return std::make_pair(false, monotonic_clock::min_time);
   }
 
+  std::pair<bool, monotonic_clock::time_point> DoFetchIf(
+      std::function<bool(const Context &context)> fn) override {
+    shm_event_loop()->CheckCurrentThread();
+    if (simple_shm_fetcher_.FetchIf(std::move(fn))) {
+      context_ = simple_shm_fetcher_.context();
+      return std::make_pair(true, monotonic_clock::now());
+    }
+    return std::make_pair(false, monotonic_clock::min_time);
+  }
+
   absl::Span<const char> GetPrivateMemory() const {
     return simple_shm_fetcher_.GetPrivateMemory();
   }
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 7e469f5..59d8d7a 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -393,6 +393,11 @@
   ~SimulatedFetcher() { simulated_channel_->UnregisterFetcher(this); }
 
   std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
+    return DoFetchNextIf(std::function<bool(const Context &context)>());
+  }
+
+  std::pair<bool, monotonic_clock::time_point> DoFetchNextIf(
+      std::function<bool(const Context &context)> fn) override {
     // The allocations in here are due to infrastructure and don't count in the
     // no mallocs in RT code.
     ScopedNotRealtime nrt;
@@ -404,12 +409,27 @@
                          << configuration::StrippedChannelToString(
                                 simulated_channel_->channel());
 
-    SetMsg(msgs_.front());
+    if (fn) {
+      Context context = msgs_.front()->context;
+      context.data = nullptr;
+      context.buffer_index = -1;
+
+      if (!fn(context)) {
+        return std::make_pair(false, monotonic_clock::min_time);
+      }
+    }
+
+    SetMsg(std::move(msgs_.front()));
     msgs_.pop_front();
     return std::make_pair(true, event_loop()->monotonic_now());
   }
 
   std::pair<bool, monotonic_clock::time_point> DoFetch() override {
+    return DoFetchIf(std::function<bool(const Context &context)>());
+  }
+
+  std::pair<bool, monotonic_clock::time_point> DoFetchIf(
+      std::function<bool(const Context &context)> fn) override {
     // The allocations in here are due to infrastructure and don't count in the
     // no mallocs in RT code.
     ScopedNotRealtime nrt;
@@ -417,13 +437,35 @@
       // TODO(austin): Can we just do this logic unconditionally?  It is a lot
       // simpler.  And call clear, obviously.
       if (!msg_ && simulated_channel_->latest_message()) {
-        SetMsg(simulated_channel_->latest_message());
+        std::shared_ptr<SimulatedMessage> latest_message =
+            simulated_channel_->latest_message();
+
+        if (fn) {
+          Context context = latest_message->context;
+          context.data = nullptr;
+          context.buffer_index = -1;
+
+          if (!fn(context)) {
+            return std::make_pair(false, monotonic_clock::min_time);
+          }
+        }
+        SetMsg(std::move(latest_message));
         return std::make_pair(true, event_loop()->monotonic_now());
       } else {
         return std::make_pair(false, monotonic_clock::min_time);
       }
     }
 
+    if (fn) {
+      Context context = msgs_.back()->context;
+      context.data = nullptr;
+      context.buffer_index = -1;
+
+      if (!fn(context)) {
+        return std::make_pair(false, monotonic_clock::min_time);
+      }
+    }
+
     // We've had a message enqueued, so we don't need to go looking for the
     // latest message from before we started.
     SetMsg(msgs_.back());