Implement simulated FetchNext and fix its behavior

Simulation was missing FetchNext, so let's add that.  And since this is
core infrastructure, add tests.

These tests caught that ShmEventLoop was fetching messages sent before
the fetcher was constructed.  This behavior is confusing and wrong.  So
fix it.

Change-Id: I03c79731688a150e30ba871dee13b7c9c68e75b7
diff --git a/aos/events/event-loop.h b/aos/events/event-loop.h
index a605cf2..bedf9a5 100644
--- a/aos/events/event-loop.h
+++ b/aos/events/event-loop.h
@@ -13,9 +13,12 @@
 class Fetcher {
  public:
   Fetcher() {}
-  // Fetches the next message. Returns whether it fetched a new message.
+  // Fetches the next message. Returns true if it fetched a new message.  This
+  // method will only return messages sent after the Fetcher was created.
   bool FetchNext() { return fetcher_->FetchNext(); }
-  // Fetches the most recent message. Returns whether it fetched a new message.
+  // Fetches the most recent message. Returns true if it fetched a new message.
+  // This will return the latest message regardless of if it was sent before or
+  // after the fetcher was created.
   bool Fetch() { return fetcher_->Fetch(); }
 
   const T *get() const {
diff --git a/aos/events/event-loop_param_test.cc b/aos/events/event-loop_param_test.cc
index f8eb5da..371cb78 100644
--- a/aos/events/event-loop_param_test.cc
+++ b/aos/events/event-loop_param_test.cc
@@ -37,6 +37,8 @@
 
   auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
 
+  EXPECT_FALSE(fetcher.Fetch());
+
   bool happened = false;
 
   loop3->OnRun([&]() { happened = true; });
@@ -70,7 +72,6 @@
   ::std::vector<int> values;
 
   loop2->MakeWatcher("/test", [&](const TestMessage &message) {
-    fprintf(stderr, "Got a message\n");
     values.push_back(message.msg_value);
     if (values.size() == 2) {
       loop2->Exit();
@@ -128,6 +129,187 @@
   EXPECT_EQ(0, values.size());
 }
 
+// Tests that FetchNext gets all the messages sent after it is constructed.
+TEST_P(AbstractEventLoopTest, FetchNext) {
+  auto loop1 = Make();
+  auto loop2 = MakePrimary();
+
+  auto sender = loop1->MakeSender<TestMessage>("/test");
+  auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
+
+  ::std::vector<int> values;
+
+  {
+    auto msg = sender.MakeMessage();
+    msg->msg_value = 200;
+    msg.Send();
+  }
+  {
+    auto msg = sender.MakeMessage();
+    msg->msg_value = 201;
+    msg.Send();
+  }
+
+  // Add a timer to actually quit.
+  auto test_timer = loop2->AddTimer([&loop2, &fetcher, &values]() {
+    while (fetcher.FetchNext()) {
+      values.push_back(fetcher->msg_value);
+    }
+    loop2->Exit();
+  });
+
+  loop2->OnRun([&test_timer, &loop2]() {
+    test_timer->Setup(loop2->monotonic_now(), ::std::chrono::milliseconds(100));
+  });
+
+  Run();
+  EXPECT_THAT(values, ::testing::ElementsAreArray({200, 201}));
+}
+
+// Tests that FetchNext gets no messages sent before it is constructed.
+TEST_P(AbstractEventLoopTest, FetchNextAfterSend) {
+  auto loop1 = Make();
+  auto loop2 = MakePrimary();
+
+  auto sender = loop1->MakeSender<TestMessage>("/test");
+
+  ::std::vector<int> values;
+
+  {
+    auto msg = sender.MakeMessage();
+    msg->msg_value = 200;
+    msg.Send();
+  }
+  {
+    auto msg = sender.MakeMessage();
+    msg->msg_value = 201;
+    msg.Send();
+  }
+
+  auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
+
+  // Add a timer to actually quit.
+  auto test_timer = loop2->AddTimer([&loop2, &fetcher, &values]() {
+    while (fetcher.FetchNext()) {
+      values.push_back(fetcher->msg_value);
+    }
+    loop2->Exit();
+  });
+
+  loop2->OnRun([&test_timer, &loop2]() {
+    test_timer->Setup(loop2->monotonic_now(), ::std::chrono::milliseconds(100));
+  });
+
+  Run();
+  EXPECT_THAT(0, values.size());
+}
+
+// Tests that Fetch returns the last message created before the loop was
+// started.
+TEST_P(AbstractEventLoopTest, FetchDataFromBeforeCreation) {
+  auto loop1 = Make();
+  auto loop2 = MakePrimary();
+
+  auto sender = loop1->MakeSender<TestMessage>("/test");
+
+  ::std::vector<int> values;
+
+  {
+    auto msg = sender.MakeMessage();
+    msg->msg_value = 200;
+    msg.Send();
+  }
+  {
+    auto msg = sender.MakeMessage();
+    msg->msg_value = 201;
+    msg.Send();
+  }
+
+  auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
+
+  // Add a timer to actually quit.
+  auto test_timer = loop2->AddTimer([&loop2, &fetcher, &values]() {
+    if (fetcher.Fetch()) {
+      values.push_back(fetcher->msg_value);
+    }
+    // Do it again to make sure we don't double fetch.
+    if (fetcher.Fetch()) {
+      values.push_back(fetcher->msg_value);
+    }
+    loop2->Exit();
+  });
+
+  loop2->OnRun([&test_timer, &loop2]() {
+    test_timer->Setup(loop2->monotonic_now(), ::std::chrono::milliseconds(100));
+  });
+
+  Run();
+  EXPECT_THAT(values, ::testing::ElementsAreArray({201}));
+}
+
+// Tests that Fetch and FetchNext interleave as expected.
+TEST_P(AbstractEventLoopTest, FetchAndFetchNextTogether) {
+  auto loop1 = Make();
+  auto loop2 = MakePrimary();
+
+  auto sender = loop1->MakeSender<TestMessage>("/test");
+
+  ::std::vector<int> values;
+
+  {
+    auto msg = sender.MakeMessage();
+    msg->msg_value = 200;
+    msg.Send();
+  }
+  {
+    auto msg = sender.MakeMessage();
+    msg->msg_value = 201;
+    msg.Send();
+  }
+
+  auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
+
+  // Add a timer to actually quit.
+  auto test_timer = loop2->AddTimer([&loop2, &fetcher, &values, &sender]() {
+    if (fetcher.Fetch()) {
+      values.push_back(fetcher->msg_value);
+    }
+
+    {
+      auto msg = sender.MakeMessage();
+      msg->msg_value = 202;
+      msg.Send();
+    }
+    {
+      auto msg = sender.MakeMessage();
+      msg->msg_value = 203;
+      msg.Send();
+    }
+    {
+      auto msg = sender.MakeMessage();
+      msg->msg_value = 204;
+      msg.Send();
+    }
+
+    if (fetcher.FetchNext()) {
+      values.push_back(fetcher->msg_value);
+    }
+
+    if (fetcher.Fetch()) {
+      values.push_back(fetcher->msg_value);
+    }
+
+    loop2->Exit();
+  });
+
+  loop2->OnRun([&test_timer, &loop2]() {
+    test_timer->Setup(loop2->monotonic_now(), ::std::chrono::milliseconds(100));
+  });
+
+  Run();
+  EXPECT_THAT(values, ::testing::ElementsAreArray({201, 202, 204}));
+}
+
 // Verify that making a fetcher and watcher for "/test" succeeds.
 TEST_P(AbstractEventLoopTest, FetcherAndWatcher) {
   auto loop = Make();
diff --git a/aos/events/shm-event-loop.cc b/aos/events/shm-event-loop.cc
index c4c2c88..11d9312 100644
--- a/aos/events/shm-event-loop.cc
+++ b/aos/events/shm-event-loop.cc
@@ -16,7 +16,14 @@
 namespace {
 class ShmFetcher : public RawFetcher {
  public:
-  explicit ShmFetcher(RawQueue *queue) : queue_(queue) {}
+  explicit ShmFetcher(RawQueue *queue) : queue_(queue) {
+    // Move index_ to point to the end of the queue as it is at construction
+    // time.  Also grab the oldest message but don't expose it to the user yet.
+    static constexpr Options<RawQueue> kOptions =
+        RawQueue::kFromEnd | RawQueue::kNonBlock;
+    msg_ = static_cast<const FetchValue *>(
+        queue_->ReadMessageIndex(kOptions, &index_));
+  }
   ~ShmFetcher() {
     if (msg_) {
       queue_->FreeMessage(msg_);
@@ -27,12 +34,12 @@
     const FetchValue *msg = static_cast<const FetchValue *>(
         queue_->ReadMessageIndex(RawQueue::kNonBlock, &index_));
     // Only update the internal pointer if we got a new message.
-    if (msg != NULL) {
+    if (msg != nullptr) {
       queue_->FreeMessage(msg_);
       msg_ = msg;
       set_most_recent(msg_);
     }
-    return msg != NULL;
+    return msg != nullptr;
   }
 
   bool Fetch() override {
@@ -41,16 +48,24 @@
     const FetchValue *msg = static_cast<const FetchValue *>(
         queue_->ReadMessageIndex(kOptions, &index_));
     // Only update the internal pointer if we got a new message.
-    if (msg != NULL && msg != msg_) {
+    if (msg != nullptr && msg != msg_) {
       queue_->FreeMessage(msg_);
       msg_ = msg;
       set_most_recent(msg_);
       return true;
+    } else {
+      // The message has to get freed if we didn't use it (and
+      // RawQueue::FreeMessage is ok to call on nullptr).
+      queue_->FreeMessage(msg);
+
+      // We have a message from construction time.  Give it to the user now.
+      if (msg_ != nullptr && most_recent() != msg_) {
+        set_most_recent(msg_);
+        return true;
+      } else {
+        return false;
+      }
     }
-    // The message has to get freed if we didn't use it (and
-    // RawQueue::FreeMessage is ok to call on NULL).
-    queue_->FreeMessage(msg);
-    return false;
   }
 
  private:
@@ -70,7 +85,7 @@
   void Free(aos::Message *msg) override { queue_->FreeMessage(msg); }
 
   bool Send(aos::Message *msg) override {
-    assert(queue_ != NULL);
+    assert(queue_ != nullptr);
     {
       // TODO(austin): This lets multiple senders reorder messages since time
       // isn't acquired with a lock held.
diff --git a/aos/events/simulated-event-loop.cc b/aos/events/simulated-event-loop.cc
index 9410b96..ce08a00 100644
--- a/aos/events/simulated-event-loop.cc
+++ b/aos/events/simulated-event-loop.cc
@@ -1,37 +1,13 @@
 #include "aos/events/simulated-event-loop.h"
 
 #include <algorithm>
+#include <deque>
 
 #include "aos/logging/logging.h"
 #include "aos/queue.h"
 
 namespace aos {
 namespace {
-class SimulatedFetcher : public RawFetcher {
- public:
-  explicit SimulatedFetcher(SimulatedQueue *queue) : queue_(queue) {}
-  ~SimulatedFetcher() {}
-
-  bool FetchNext() override {
-    LOG(FATAL, "Simulated event loops do not support FetchNext.");
-    return false;
-  }
-
-  bool Fetch() override {
-    if (index_ == queue_->index()) return false;
-
-    // Fetched message is newer
-    msg_ = queue_->latest_message();
-    index_ = queue_->index();
-    set_most_recent(reinterpret_cast<FetchValue *>(msg_.get()));
-    return true;
-  }
-
- private:
-  int64_t index_ = -1;
-  SimulatedQueue *queue_;
-  RefCountedBuffer msg_;
-};
 
 class SimulatedSender : public RawSender {
  public:
@@ -52,7 +28,7 @@
       }
     }
     queue_->Send(RefCountedBuffer(msg));
-    return true;  // Maybe false instead? :)
+    return true;
   }
 
   const char *name() const override { return queue_->name(); }
@@ -63,6 +39,55 @@
 };
 }  // namespace
 
+class SimulatedFetcher : public RawFetcher {
+ public:
+  explicit SimulatedFetcher(SimulatedQueue *queue) : queue_(queue) {}
+  ~SimulatedFetcher() { queue_->UnregisterFetcher(this); }
+
+  bool FetchNext() override {
+    if (msgs_.size() == 0) return false;
+
+    msg_ = msgs_.front();
+    msgs_.pop_front();
+    set_most_recent(reinterpret_cast<FetchValue *>(msg_.get()));
+    return true;
+  }
+
+  bool Fetch() override {
+    if (msgs_.size() == 0) {
+      if (!msg_ && queue_->latest_message()) {
+        msg_ = queue_->latest_message();
+        set_most_recent(reinterpret_cast<FetchValue *>(msg_.get()));
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    // We've had a message enqueued, so we don't need to go looking for the
+    // latest message from before we started.
+    msg_ = msgs_.back();
+    msgs_.clear();
+    set_most_recent(reinterpret_cast<FetchValue *>(msg_.get()));
+    return true;
+  }
+
+ private:
+  friend class SimulatedQueue;
+
+  // Internal method for Simulation to add a message to the buffer.
+  void Enqueue(RefCountedBuffer buffer) {
+    msgs_.emplace_back(buffer);
+  }
+
+  SimulatedQueue *queue_;
+  RefCountedBuffer msg_;
+
+  // Messages queued up but not in use.
+  ::std::deque<RefCountedBuffer> msgs_;
+};
+
+
 class SimulatedTimerHandler : public TimerHandler {
  public:
   explicit SimulatedTimerHandler(EventScheduler *scheduler,
@@ -256,17 +281,34 @@
 }
 
 void SimulatedQueue::MakeRawWatcher(
-    std::function<void(const aos::Message *message)> watcher) {
+    ::std::function<void(const aos::Message *message)> watcher) {
   watchers_.push_back(watcher);
 }
 
-std::unique_ptr<RawSender> SimulatedQueue::MakeRawSender(
+::std::unique_ptr<RawSender> SimulatedQueue::MakeRawSender(
     EventLoop *event_loop) {
-  return std::unique_ptr<RawSender>(new SimulatedSender(this, event_loop));
+  return ::std::unique_ptr<RawSender>(new SimulatedSender(this, event_loop));
 }
 
-std::unique_ptr<RawFetcher> SimulatedQueue::MakeRawFetcher() {
-  return std::unique_ptr<RawFetcher>(new SimulatedFetcher(this));
+::std::unique_ptr<RawFetcher> SimulatedQueue::MakeRawFetcher() {
+  ::std::unique_ptr<SimulatedFetcher> fetcher(new SimulatedFetcher(this));
+  fetchers_.push_back(fetcher.get());
+  return ::std::move(fetcher);
+}
+
+void SimulatedQueue::Send(RefCountedBuffer message) {
+  latest_message_ = message;
+  for (auto &watcher : watchers_) {
+    scheduler_->Schedule(scheduler_->monotonic_now(),
+                         [watcher, message]() { watcher(message.get()); });
+  }
+  for (auto &fetcher : fetchers_) {
+    fetcher->Enqueue(message);
+  }
+}
+
+void SimulatedQueue::UnregisterFetcher(SimulatedFetcher *fetcher) {
+  fetchers_.erase(::std::find(fetchers_.begin(), fetchers_.end(), fetcher));
 }
 
 void SimulatedEventLoop::Take(const ::std::string &path) {
diff --git a/aos/events/simulated-event-loop.h b/aos/events/simulated-event-loop.h
index 03b6b5a..6ee7d92 100644
--- a/aos/events/simulated-event-loop.h
+++ b/aos/events/simulated-event-loop.h
@@ -45,7 +45,9 @@
 
   RefCountedBuffer(const RefCountedBuffer &other) {
     data_ = other.data_;
-    ++*GetRefCount();
+    if (data_ != nullptr) {
+      ++*GetRefCount();
+    }
   }
 
   RefCountedBuffer(RefCountedBuffer &&other) { std::swap(data_, other.data_); }
@@ -64,6 +66,8 @@
     return *this;
   }
 
+  operator bool() const { return data_ != nullptr; }
+
   aos::Message *get() const { return static_cast<aos::Message *>(data_); }
 
   aos::Message *release() {
@@ -133,41 +137,48 @@
   ::std::vector<RawEventLoop *> raw_event_loops_;
 };
 
+// Class for simulated fetchers.
+class SimulatedFetcher;
+
 class SimulatedQueue {
  public:
   explicit SimulatedQueue(const QueueTypeInfo &type, const ::std::string &name,
                           EventScheduler *scheduler)
       : type_(type), name_(name), scheduler_(scheduler){};
 
+  ~SimulatedQueue() { CHECK_EQ(0u, fetchers_.size()); }
+
+  // Makes a connected raw sender which calls Send below.
   ::std::unique_ptr<RawSender> MakeRawSender(EventLoop *event_loop);
 
+  // Makes a connected raw fetcher.
   ::std::unique_ptr<RawFetcher> MakeRawFetcher();
 
+  // Registers a watcher for the queue.
   void MakeRawWatcher(
       ::std::function<void(const ::aos::Message *message)> watcher);
 
-  void Send(RefCountedBuffer message) {
-    index_++;
-    latest_message_ = message;
-    for (auto &watcher : watchers_) {
-      scheduler_->Schedule(scheduler_->monotonic_now(),
-                           [watcher, message]() { watcher(message.get()); });
-    }
-  }
+  // Sends the message to all the connected receivers and fetchers.
+  void Send(RefCountedBuffer message);
+
+  // Unregisters a fetcher.
+  void UnregisterFetcher(SimulatedFetcher *fetcher);
 
   const RefCountedBuffer &latest_message() { return latest_message_; }
 
-  int64_t index() { return index_; }
-
-  size_t size() { return type_.size; }
+  size_t size() const { return type_.size; }
 
   const char *name() const { return name_.c_str(); }
 
  private:
-  int64_t index_ = -1;
-  QueueTypeInfo type_;
+  const QueueTypeInfo type_;
   const ::std::string name_;
+
+  // List of all watchers.
   ::std::vector<std::function<void(const aos::Message *message)>> watchers_;
+
+  // List of all fetchers.
+  ::std::vector<SimulatedFetcher *> fetchers_;
   RefCountedBuffer latest_message_;
   EventScheduler *scheduler_;
 };