Add FetchNext to ShmEventLoop

I need this as a stop-gap until watchers work in simulation so that I
can receive every camera frame without using watchers.

Change-Id: Ic2c87fc11e69952ff8028e1484f5b236afdae70e
diff --git a/aos/events/BUILD b/aos/events/BUILD
index a5e83a9..e0e6bad 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -73,5 +73,6 @@
     deps = [
         ":event-loop",
         "//aos:queues",
+        "//aos/logging",
     ],
 )
diff --git a/aos/events/event-loop.h b/aos/events/event-loop.h
index 0590d7b..90eb721 100644
--- a/aos/events/event-loop.h
+++ b/aos/events/event-loop.h
@@ -13,6 +13,8 @@
 class Fetcher {
  public:
   Fetcher() {}
+  // Fetches the next message. Returns whether it fetched a new message.
+  bool FetchNext() { return fetcher_->FetchNext(); }
   // Fetches the most recent message. Returns whether it fetched a new message.
   bool Fetch() { return fetcher_->Fetch(); }
 
diff --git a/aos/events/raw-event-loop.h b/aos/events/raw-event-loop.h
index a903e93..d12187a 100644
--- a/aos/events/raw-event-loop.h
+++ b/aos/events/raw-event-loop.h
@@ -23,6 +23,10 @@
   RawFetcher() {}
   virtual ~RawFetcher() {}
 
+  // Non-blocking fetch of the next message in the queue. Returns true if there
+  // was a new message and we got it.
+  virtual bool FetchNext() = 0;
+  // Non-blocking fetch of the latest message:
   virtual bool Fetch() = 0;
 
   const FetchValue *most_recent() { return most_recent_; }
diff --git a/aos/events/shm-event-loop.cc b/aos/events/shm-event-loop.cc
index 781c963..a70d6f3 100644
--- a/aos/events/shm-event-loop.cc
+++ b/aos/events/shm-event-loop.cc
@@ -23,7 +23,19 @@
     }
   }
 
-  bool Fetch() {
+  bool FetchNext() override {
+    const FetchValue *msg = static_cast<const FetchValue *>(
+        queue_->ReadMessageIndex(RawQueue::kNonBlock, &index_));
+    // Only update the internal pointer if we got a new message.
+    if (msg != NULL) {
+      queue_->FreeMessage(msg_);
+      msg_ = msg;
+      set_most_recent(msg_);
+    }
+    return msg != NULL;
+  }
+
+  bool Fetch() override {
     static constexpr Options<RawQueue> kOptions =
         RawQueue::kFromEnd | RawQueue::kNonBlock;
     const FetchValue *msg = static_cast<const FetchValue *>(
diff --git a/aos/events/shm-event-loop_test.cc b/aos/events/shm-event-loop_test.cc
index 544adba..d6f493f 100644
--- a/aos/events/shm-event-loop_test.cc
+++ b/aos/events/shm-event-loop_test.cc
@@ -22,6 +22,54 @@
                           return new ShmEventLoopTestFactory();
                         }));
 
+struct TestMessage : public ::aos::Message {
+  enum { kQueueLength = 100, kHash = 0x696c0cdc };
+  int msg_value;
+
+  void Zero() { msg_value = 0; }
+  static size_t Size() { return 1 + ::aos::Message::Size(); }
+  size_t Print(char *buffer, size_t length) const;
+  TestMessage() { Zero(); }
+};
+
 }  // namespace
+
+// Tests that FetchNext behaves correctly when we get two messages in the queue
+// but don't consume the first until after the second has been sent.
+// This cannot be abstracted to AbstractEventLoopTest because not all
+// event loops currently support FetchNext().
+TEST(ShmEventLoopTest, FetchNextTest) {
+  ::aos::testing::TestSharedMemory my_shm;
+
+  ShmEventLoop send_loop;
+  ShmEventLoop fetch_loop;
+  auto sender = send_loop.MakeSender<TestMessage>("/test");
+  Fetcher<TestMessage> fetcher = fetch_loop.MakeFetcher<TestMessage>("/test");
+
+  {
+    auto msg = sender.MakeMessage();
+    msg->msg_value = 100;
+    ASSERT_TRUE(msg.Send());
+  }
+
+  {
+    auto msg = sender.MakeMessage();
+    msg->msg_value = 200;
+    ASSERT_TRUE(msg.Send());
+  }
+
+  ASSERT_TRUE(fetcher.FetchNext());
+  ASSERT_NE(nullptr, fetcher.get());
+  EXPECT_EQ(100, fetcher->msg_value);
+
+  ASSERT_TRUE(fetcher.FetchNext());
+  ASSERT_NE(nullptr, fetcher.get());
+  EXPECT_EQ(200, fetcher->msg_value);
+
+  // When we run off the end of the queue, expect to still have the old message:
+  ASSERT_FALSE(fetcher.FetchNext());
+  ASSERT_NE(nullptr, fetcher.get());
+  EXPECT_EQ(200, fetcher->msg_value);
+}
 }  // namespace testing
 }  // namespace aos
diff --git a/aos/events/simulated-event-loop.cc b/aos/events/simulated-event-loop.cc
index a23b18c..5fe1bf5 100644
--- a/aos/events/simulated-event-loop.cc
+++ b/aos/events/simulated-event-loop.cc
@@ -2,6 +2,7 @@
 
 #include <algorithm>
 
+#include "aos/logging/logging.h"
 #include "aos/queue.h"
 
 namespace aos {
@@ -11,6 +12,11 @@
   explicit SimulatedFetcher(SimulatedQueue *queue) : queue_(queue) {}
   ~SimulatedFetcher() {}
 
+  bool FetchNext() override {
+    LOG(FATAL, "Simulated event loops do not support FetchNext.");
+    return false;
+  }
+
   bool Fetch() override {
     if (index_ == queue_->index()) return false;