Add FetchIf and FetchNextIf to aos::EventLoop

This is mostly to support logging X seconds behind now so we can trigger
restarts cleanly.  This lets us peek at the message header before
triggering a copy of the contents.

Change-Id: I41b048a6d1ed4f4951f89bdbb093fbbbf44e64a1
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 0559aae..ba2d426 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -319,6 +319,93 @@
   }
 }
 
+std::function<bool(const Context &)> MakeShouldFetch(
+    bool should_fetch, size_t *called_count = nullptr) {
+  return [should_fetch, called_count](const Context &) {
+    if (called_count != nullptr) {
+      (*called_count)++;
+    }
+    return should_fetch;
+  };
+}
+
+// Tests that a fetcher using FetchIf can fetch from a sender.
+TEST_P(AbstractEventLoopTest, FetchIfWithoutRun) {
+  auto loop1 = Make();
+  auto loop2 = Make();
+  auto loop3 = MakePrimary();
+
+  auto sender = loop1->MakeSender<TestMessage>("/test");
+
+  auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
+
+  for (const bool should_fetch : {true, false}) {
+    EXPECT_FALSE(fetcher.FetchIf(MakeShouldFetch(should_fetch)));
+    EXPECT_EQ(fetcher.get(), nullptr);
+
+    EXPECT_EQ(fetcher.context().monotonic_event_time,
+              monotonic_clock::min_time);
+    EXPECT_EQ(fetcher.context().monotonic_remote_time,
+              monotonic_clock::min_time);
+    EXPECT_EQ(fetcher.context().realtime_event_time, realtime_clock::min_time);
+    EXPECT_EQ(fetcher.context().realtime_remote_time, realtime_clock::min_time);
+    EXPECT_EQ(fetcher.context().source_boot_uuid, UUID::Zero());
+    EXPECT_EQ(fetcher.context().queue_index, 0xffffffffu);
+    EXPECT_EQ(fetcher.context().size, 0u);
+    EXPECT_EQ(fetcher.context().data, nullptr);
+    EXPECT_EQ(fetcher.context().buffer_index, -1);
+  }
+
+  aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+  TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+  builder.add_value(200);
+  msg.CheckOk(msg.Send(builder.Finish()));
+
+  // Make sure failing to fetch won't affect anything.
+  EXPECT_FALSE(fetcher.FetchIf(MakeShouldFetch(false)));
+  EXPECT_EQ(fetcher.get(), nullptr);
+
+  EXPECT_EQ(fetcher.context().monotonic_event_time, monotonic_clock::min_time);
+  EXPECT_EQ(fetcher.context().monotonic_remote_time, monotonic_clock::min_time);
+  EXPECT_EQ(fetcher.context().realtime_event_time, realtime_clock::min_time);
+  EXPECT_EQ(fetcher.context().realtime_remote_time, realtime_clock::min_time);
+  EXPECT_EQ(fetcher.context().source_boot_uuid, UUID::Zero());
+  EXPECT_EQ(fetcher.context().queue_index, 0xffffffffu);
+  EXPECT_EQ(fetcher.context().size, 0u);
+  EXPECT_EQ(fetcher.context().data, nullptr);
+  EXPECT_EQ(fetcher.context().buffer_index, -1);
+
+  // And now confirm we succeed and everything gets set right.
+  EXPECT_TRUE(fetcher.FetchIf(MakeShouldFetch(true)));
+  ASSERT_FALSE(fetcher.get() == nullptr);
+  EXPECT_EQ(fetcher.get()->value(), 200);
+
+  const chrono::milliseconds kEpsilon(100);
+
+  const aos::monotonic_clock::time_point monotonic_now = loop2->monotonic_now();
+  const aos::realtime_clock::time_point realtime_now = loop2->realtime_now();
+  EXPECT_EQ(fetcher.context().monotonic_event_time,
+            fetcher.context().monotonic_remote_time);
+  EXPECT_EQ(fetcher.context().realtime_event_time,
+            fetcher.context().realtime_remote_time);
+
+  EXPECT_GE(fetcher.context().monotonic_event_time, monotonic_now - kEpsilon);
+  EXPECT_LE(fetcher.context().monotonic_event_time, monotonic_now + kEpsilon);
+  EXPECT_GE(fetcher.context().realtime_event_time, realtime_now - kEpsilon);
+  EXPECT_LE(fetcher.context().realtime_event_time, realtime_now + kEpsilon);
+  EXPECT_EQ(fetcher.context().source_boot_uuid, loop2->boot_uuid());
+  EXPECT_EQ(fetcher.context().queue_index, 0x0u);
+  EXPECT_EQ(fetcher.context().size, 20u);
+  EXPECT_NE(fetcher.context().data, nullptr);
+  if (read_method() == ReadMethod::PIN) {
+    EXPECT_GE(fetcher.context().buffer_index, 0);
+    EXPECT_LT(fetcher.context().buffer_index,
+              loop2->NumberBuffers(fetcher.channel()));
+  } else {
+    EXPECT_EQ(fetcher.context().buffer_index, -1);
+  }
+}
+
 // Tests that watcher will receive all messages sent if they are sent after
 // initialization and before running.
 TEST_P(AbstractEventLoopTest, DoubleSendAtStartup) {
@@ -345,16 +432,10 @@
   }
 
   loop2->OnRun([&]() {
-    {
+    for (int i = 200; i < 202; ++i) {
       aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
       TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-      builder.add_value(200);
-      msg.CheckOk(msg.Send(builder.Finish()));
-    }
-    {
-      aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
-      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-      builder.add_value(201);
+      builder.add_value(i);
       msg.CheckOk(msg.Send(builder.Finish()));
     }
   });
@@ -374,16 +455,10 @@
 
   ::std::vector<int> values;
 
-  {
+  for (int i = 200; i < 202; ++i) {
     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-    builder.add_value(200);
-    msg.CheckOk(msg.Send(builder.Finish()));
-  }
-  {
-    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
-    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-    builder.add_value(201);
+    builder.add_value(i);
     msg.CheckOk(msg.Send(builder.Finish()));
   }
 
@@ -412,16 +487,10 @@
 
   ::std::vector<int> values;
 
-  {
+  for (int i = 200; i < 202; ++i) {
     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-    builder.add_value(200);
-    msg.CheckOk(msg.Send(builder.Finish()));
-  }
-  {
-    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
-    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-    builder.add_value(201);
+    builder.add_value(i);
     msg.CheckOk(msg.Send(builder.Finish()));
   }
 
@@ -451,16 +520,10 @@
 
   ::std::vector<int> values;
 
-  {
+  for (int i = 200; i < 202; ++i) {
     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-    builder.add_value(200);
-    msg.CheckOk(msg.Send(builder.Finish()));
-  }
-  {
-    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
-    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-    builder.add_value(201);
+    builder.add_value(i);
     msg.CheckOk(msg.Send(builder.Finish()));
   }
 
@@ -483,6 +546,41 @@
   EXPECT_THAT(0, values.size());
 }
 
+// Tests that FetchNextIf gets no messages sent before it is constructed.
+TEST_P(AbstractEventLoopTest, FetchNextIfAfterSend) {
+  auto loop1 = Make();
+  auto loop2 = MakePrimary();
+
+  auto sender = loop1->MakeSender<TestMessage>("/test");
+
+  ::std::vector<int> values;
+
+  for (int i = 200; i < 202; ++i) {
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(i);
+    msg.CheckOk(msg.Send(builder.Finish()));
+  }
+
+  auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
+
+  // Add a timer to actually quit.
+  auto test_timer = loop2->AddTimer([&fetcher, &values, this]() {
+    while (fetcher.FetchNextIf(MakeShouldFetch(true))) {
+      values.push_back(fetcher.get()->value());
+    }
+    this->Exit();
+  });
+
+  loop2->OnRun([&test_timer, &loop2]() {
+    test_timer->Schedule(loop2->monotonic_now(),
+                         ::std::chrono::milliseconds(100));
+  });
+
+  Run();
+  EXPECT_EQ(0, values.size());
+}
+
 // Tests that Fetch returns the last message created before the loop was
 // started.
 TEST_P(AbstractEventLoopTest, FetchDataFromBeforeCreation) {
@@ -493,16 +591,10 @@
 
   ::std::vector<int> values;
 
-  {
+  for (int i = 200; i < 202; ++i) {
     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-    builder.add_value(200);
-    msg.CheckOk(msg.Send(builder.Finish()));
-  }
-  {
-    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
-    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-    builder.add_value(201);
+    builder.add_value(i);
     msg.CheckOk(msg.Send(builder.Finish()));
   }
 
@@ -529,6 +621,50 @@
   EXPECT_THAT(values, ::testing::ElementsAreArray({201}));
 }
 
+// Tests that FetchIf returns the last message created before the loop was
+// started.
+TEST_P(AbstractEventLoopTest, FetchIfDataFromBeforeCreation) {
+  auto loop1 = Make();
+  auto loop2 = MakePrimary();
+
+  auto sender = loop1->MakeSender<TestMessage>("/test");
+
+  ::std::vector<int> values;
+
+  for (int i = 200; i < 202; ++i) {
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(i);
+    msg.CheckOk(msg.Send(builder.Finish()));
+  }
+
+  auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
+
+  // Add a timer to actually quit.
+  auto test_timer = loop2->AddTimer([&fetcher, &values, this]() {
+    if (fetcher.FetchIf(MakeShouldFetch(true))) {
+      values.push_back(fetcher.get()->value());
+    }
+
+    if (fetcher.FetchIf(MakeShouldFetch(false))) {
+      values.push_back(fetcher.get()->value());
+    }
+    // Do it again to make sure we don't double fetch.
+    if (fetcher.FetchIf(MakeShouldFetch(true))) {
+      values.push_back(fetcher.get()->value());
+    }
+    this->Exit();
+  });
+
+  loop2->OnRun([&test_timer, &loop2]() {
+    test_timer->Schedule(loop2->monotonic_now(),
+                         ::std::chrono::milliseconds(100));
+  });
+
+  Run();
+  EXPECT_THAT(values, ::testing::ElementsAreArray({201}));
+}
+
 // Tests that timer handler is enabled after setup (even if it is in the past)
 // and is disabled after running
 TEST_P(AbstractEventLoopTest, CheckTimerDisabled) {
@@ -636,16 +772,10 @@
 
   ::std::vector<int> values;
 
-  {
+  for (int i = 200; i < 202; ++i) {
     aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
     TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-    builder.add_value(200);
-    msg.CheckOk(msg.Send(builder.Finish()));
-  }
-  {
-    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
-    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-    builder.add_value(201);
+    builder.add_value(i);
     msg.CheckOk(msg.Send(builder.Finish()));
   }
 
@@ -657,22 +787,10 @@
       values.push_back(fetcher.get()->value());
     }
 
-    {
+    for (int i = 202; i < 205; ++i) {
       aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
       TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-      builder.add_value(202);
-      msg.CheckOk(msg.Send(builder.Finish()));
-    }
-    {
-      aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
-      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-      builder.add_value(203);
-      msg.CheckOk(msg.Send(builder.Finish()));
-    }
-    {
-      aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
-      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
-      builder.add_value(204);
+      builder.add_value(i);
       msg.CheckOk(msg.Send(builder.Finish()));
     }
 
@@ -696,6 +814,62 @@
   EXPECT_THAT(values, ::testing::ElementsAreArray({201, 202, 204}));
 }
 
+// Tests that Fetch{If,} and FetchNext{If,} interleave as expected.
+TEST_P(AbstractEventLoopTest, FetchAndFetchNextIfTogether) {
+  auto loop1 = Make();
+  auto loop2 = MakePrimary();
+
+  auto sender = loop1->MakeSender<TestMessage>("/test");
+
+  ::std::vector<int> values;
+
+  for (int i = 200; i < 202; ++i) {
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(i);
+    msg.CheckOk(msg.Send(builder.Finish()));
+  }
+
+  auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
+
+  // Add a timer to actually quit.
+  auto test_timer = loop2->AddTimer([&fetcher, &values, &sender, this]() {
+    if (fetcher.Fetch()) {
+      values.push_back(fetcher.get()->value());
+    }
+
+    for (int i = 202; i < 205; ++i) {
+      aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+      builder.add_value(i);
+      msg.CheckOk(msg.Send(builder.Finish()));
+    }
+
+    EXPECT_FALSE(fetcher.FetchNextIf(MakeShouldFetch(false)));
+
+    if (fetcher.FetchNext()) {
+      values.push_back(fetcher.get()->value());
+    }
+
+    EXPECT_FALSE(fetcher.FetchNextIf(MakeShouldFetch(false)));
+    EXPECT_FALSE(fetcher.FetchIf(MakeShouldFetch(false)));
+
+    if (fetcher.FetchIf(MakeShouldFetch(true))) {
+      values.push_back(fetcher.get()->value());
+    }
+
+    this->Exit();
+  });
+
+  loop2->OnRun([&test_timer, &loop2]() {
+    test_timer->Schedule(loop2->monotonic_now(),
+                         ::std::chrono::milliseconds(100));
+  });
+
+  Run();
+  EXPECT_THAT(values, ::testing::ElementsAreArray({201, 202, 204}));
+}
+
 // Tests that FetchNext behaves correctly when we get two messages in the queue
 // but don't consume the first until after the second has been sent.
 TEST_P(AbstractEventLoopTest, FetchNextTest) {
@@ -732,6 +906,49 @@
   EXPECT_EQ(200, fetcher.get()->value());
 }
 
+// Tests that FetchNext behaves correctly when we get two messages in the queue
+// but don't consume the first until after the second has been sent.
+TEST_P(AbstractEventLoopTest, FetchNextIfTest) {
+  auto send_loop = Make();
+  auto fetch_loop = Make();
+  auto sender = send_loop->MakeSender<TestMessage>("/test");
+  Fetcher<TestMessage> fetcher = fetch_loop->MakeFetcher<TestMessage>("/test");
+
+  {
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(100);
+    msg.CheckOk(msg.Send(builder.Finish()));
+  }
+
+  {
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(200);
+    msg.CheckOk(msg.Send(builder.Finish()));
+  }
+
+  size_t called_count = 0;
+  ASSERT_TRUE(fetcher.FetchNextIf(MakeShouldFetch(true, &called_count)));
+  ASSERT_NE(nullptr, fetcher.get());
+  EXPECT_EQ(100, fetcher.get()->value());
+  EXPECT_EQ(called_count, 1u);
+
+  ASSERT_FALSE(fetcher.FetchNextIf(MakeShouldFetch(false, &called_count)));
+  EXPECT_EQ(called_count, 2u);
+
+  ASSERT_TRUE(fetcher.FetchNextIf(MakeShouldFetch(true, &called_count)));
+  ASSERT_NE(nullptr, fetcher.get());
+  EXPECT_EQ(200, fetcher.get()->value());
+  EXPECT_EQ(called_count, 3u);
+
+  // When we run off the end of the queue, expect to still have the old message:
+  ASSERT_FALSE(fetcher.FetchNextIf(MakeShouldFetch(false, &called_count)));
+  EXPECT_EQ(called_count, 3u);
+  ASSERT_NE(nullptr, fetcher.get());
+  EXPECT_EQ(200, fetcher.get()->value());
+}
+
 // Verify that a fetcher still holds its data, even after falling behind.
 TEST_P(AbstractEventLoopTest, FetcherBehindData) {
   auto send_loop = Make();