Implement simulated FetchNext and fix its behavior

The simulated event loop did not implement FetchNext (it died with
LOG(FATAL)), so add it.  Since this is core infrastructure, add tests too.

The new tests caught that ShmEventLoop was fetching messages that were
sent before the fetcher was constructed.  That behavior is confusing and
wrong, so fix it as well.

Change-Id: I03c79731688a150e30ba871dee13b7c9c68e75b7
diff --git a/aos/events/simulated-event-loop.cc b/aos/events/simulated-event-loop.cc
index 9410b96..ce08a00 100644
--- a/aos/events/simulated-event-loop.cc
+++ b/aos/events/simulated-event-loop.cc
@@ -1,37 +1,13 @@
 #include "aos/events/simulated-event-loop.h"
 
 #include <algorithm>
+#include <deque>
 
 #include "aos/logging/logging.h"
 #include "aos/queue.h"
 
 namespace aos {
 namespace {
-class SimulatedFetcher : public RawFetcher {
- public:
-  explicit SimulatedFetcher(SimulatedQueue *queue) : queue_(queue) {}
-  ~SimulatedFetcher() {}
-
-  bool FetchNext() override {
-    LOG(FATAL, "Simulated event loops do not support FetchNext.");
-    return false;
-  }
-
-  bool Fetch() override {
-    if (index_ == queue_->index()) return false;
-
-    // Fetched message is newer
-    msg_ = queue_->latest_message();
-    index_ = queue_->index();
-    set_most_recent(reinterpret_cast<FetchValue *>(msg_.get()));
-    return true;
-  }
-
- private:
-  int64_t index_ = -1;
-  SimulatedQueue *queue_;
-  RefCountedBuffer msg_;
-};
 
 class SimulatedSender : public RawSender {
  public:
@@ -52,7 +28,7 @@
       }
     }
     queue_->Send(RefCountedBuffer(msg));
-    return true;  // Maybe false instead? :)
+    return true;
   }
 
   const char *name() const override { return queue_->name(); }
@@ -63,6 +39,55 @@
 };
 }  // namespace
 
+// Fetcher for a SimulatedQueue.  While registered, the queue Enqueue()s each
+// sent message here; FetchNext() drains them in order, Fetch() skips ahead.
+class SimulatedFetcher : public RawFetcher {
+ public:
+  explicit SimulatedFetcher(SimulatedQueue *queue) : queue_(queue) {}
+  ~SimulatedFetcher() { queue_->UnregisterFetcher(this); }
+
+  // Pops the oldest buffered message.  Returns false when the buffer is
+  // empty, so messages sent before registration are never returned here.
+  bool FetchNext() override {
+    if (msgs_.empty()) return false;
+    msg_ = msgs_.front();
+    msgs_.pop_front();
+    set_most_recent(reinterpret_cast<FetchValue *>(msg_.get()));
+    return true;
+  }
+
+  // Jumps to the newest message.  With nothing buffered and nothing fetched
+  // yet, falls back to the queue's latest message, which may predate us.
+  bool Fetch() override {
+    if (!msgs_.empty()) {
+      // A newer message was enqueued; skip the rest of the backlog.
+      msg_ = msgs_.back();
+      msgs_.clear();
+    } else if (!msg_ && queue_->latest_message()) {
+      msg_ = queue_->latest_message();
+    } else {
+      return false;
+    }
+    set_most_recent(reinterpret_cast<FetchValue *>(msg_.get()));
+    return true;
+  }
+
+ private:
+  friend class SimulatedQueue;
+
+  // Internal method for Simulation to add a message to the buffer.
+  void Enqueue(RefCountedBuffer buffer) {
+    msgs_.emplace_back(::std::move(buffer));
+  }
+
+  SimulatedQueue *queue_;
+  RefCountedBuffer msg_;
+
+  // Messages queued up but not in use.
+  ::std::deque<RefCountedBuffer> msgs_;
+};
+
+
 class SimulatedTimerHandler : public TimerHandler {
  public:
   explicit SimulatedTimerHandler(EventScheduler *scheduler,
@@ -256,17 +281,34 @@
 }
 
 void SimulatedQueue::MakeRawWatcher(
-    std::function<void(const aos::Message *message)> watcher) {
+    ::std::function<void(const aos::Message *message)> watcher) {
   watchers_.push_back(watcher);
 }
 
-std::unique_ptr<RawSender> SimulatedQueue::MakeRawSender(
+::std::unique_ptr<RawSender> SimulatedQueue::MakeRawSender(
     EventLoop *event_loop) {
-  return std::unique_ptr<RawSender>(new SimulatedSender(this, event_loop));
+  return ::std::unique_ptr<RawSender>(new SimulatedSender(this, event_loop));
 }
 
-std::unique_ptr<RawFetcher> SimulatedQueue::MakeRawFetcher() {
-  return std::unique_ptr<RawFetcher>(new SimulatedFetcher(this));
+::std::unique_ptr<RawFetcher> SimulatedQueue::MakeRawFetcher() {
+  ::std::unique_ptr<SimulatedFetcher> fetcher(new SimulatedFetcher(this));
+  fetchers_.push_back(fetcher.get());  // Non-owning; Send() enqueues via it.
+  return ::std::move(fetcher);  // Ownership passes to the caller.
+}
+
+void SimulatedQueue::Send(RefCountedBuffer message) {
+  latest_message_ = message;
+  for (auto &watcher : watchers_) {  // Scheduled at the current time.
+    scheduler_->Schedule(scheduler_->monotonic_now(),
+                         [watcher, message]() { watcher(message.get()); });
+  }
+  for (auto &fetcher : fetchers_) fetcher->Enqueue(message);
+}
+
+void SimulatedQueue::UnregisterFetcher(SimulatedFetcher *fetcher) {
+  // Unlike erase(find(...)), this is a no-op for an unregistered fetcher.
+  fetchers_.erase(::std::remove(fetchers_.begin(), fetchers_.end(), fetcher),
+                  fetchers_.end());
 }
 
 void SimulatedEventLoop::Take(const ::std::string &path) {