Implement simulated FetchNext and fix its behavior

The simulated event loop was missing FetchNext, so add it.  Since this is
core infrastructure, add tests as well.

These tests caught that ShmEventLoop was fetching messages sent before
the fetcher was constructed.  This behavior is confusing and wrong, so
fix it.

Change-Id: I03c79731688a150e30ba871dee13b7c9c68e75b7
diff --git a/aos/events/simulated-event-loop.h b/aos/events/simulated-event-loop.h
index 03b6b5a..6ee7d92 100644
--- a/aos/events/simulated-event-loop.h
+++ b/aos/events/simulated-event-loop.h
@@ -45,7 +45,9 @@
 
   RefCountedBuffer(const RefCountedBuffer &other) {
     data_ = other.data_;
-    ++*GetRefCount();
+    if (data_ != nullptr) {
+      ++*GetRefCount();
+    }
   }
 
   RefCountedBuffer(RefCountedBuffer &&other) { std::swap(data_, other.data_); }
@@ -64,6 +66,8 @@
     return *this;
   }
 
+  operator bool() const { return data_ != nullptr; }
+
   aos::Message *get() const { return static_cast<aos::Message *>(data_); }
 
   aos::Message *release() {
@@ -133,41 +137,48 @@
   ::std::vector<RawEventLoop *> raw_event_loops_;
 };
 
+// Class for simulated fetchers.
+class SimulatedFetcher;
+
 class SimulatedQueue {
  public:
   explicit SimulatedQueue(const QueueTypeInfo &type, const ::std::string &name,
                           EventScheduler *scheduler)
       : type_(type), name_(name), scheduler_(scheduler){};
 
+  ~SimulatedQueue() { CHECK_EQ(0u, fetchers_.size()); }
+
+  // Makes a connected raw sender which calls Send below.
   ::std::unique_ptr<RawSender> MakeRawSender(EventLoop *event_loop);
 
+  // Makes a connected raw fetcher.
   ::std::unique_ptr<RawFetcher> MakeRawFetcher();
 
+  // Registers a watcher for the queue.
   void MakeRawWatcher(
       ::std::function<void(const ::aos::Message *message)> watcher);
 
-  void Send(RefCountedBuffer message) {
-    index_++;
-    latest_message_ = message;
-    for (auto &watcher : watchers_) {
-      scheduler_->Schedule(scheduler_->monotonic_now(),
-                           [watcher, message]() { watcher(message.get()); });
-    }
-  }
+  // Sends the message to all the connected watchers and fetchers.
+  void Send(RefCountedBuffer message);
+
+  // Unregisters a fetcher.
+  void UnregisterFetcher(SimulatedFetcher *fetcher);
 
   const RefCountedBuffer &latest_message() { return latest_message_; }
 
-  int64_t index() { return index_; }
-
-  size_t size() { return type_.size; }
+  size_t size() const { return type_.size; }
 
   const char *name() const { return name_.c_str(); }
 
  private:
-  int64_t index_ = -1;
-  QueueTypeInfo type_;
+  const QueueTypeInfo type_;
   const ::std::string name_;
+
+  // List of all watchers.
   ::std::vector<std::function<void(const aos::Message *message)>> watchers_;
+
+  // List of all fetchers.
+  ::std::vector<SimulatedFetcher *> fetchers_;
   RefCountedBuffer latest_message_;
   EventScheduler *scheduler_;
 };