Add test and fix watcher startup behavior.

The desired behavior for a watcher is that it will trigger for all
messages as they come in *after* the watcher is created.  We don't want
to get messages from before we are constructed, and don't want to miss
any.

Fix the behavior and add tests to confirm.

Change-Id: I81717078f18835d7d15f6e5461f2d66ac58bcfb7
diff --git a/aos/events/shm-event-loop.cc b/aos/events/shm-event-loop.cc
index c9ff6fd..f55939c 100644
--- a/aos/events/shm-event-loop.cc
+++ b/aos/events/shm-event-loop.cc
@@ -97,24 +97,25 @@
                      std::function<void(const aos::Message *message)> watcher)
       : thread_state_(std::move(thread_state)),
         queue_(queue),
-        watcher_(std::move(watcher)) {}
+        index_(0),
+        watcher_(std::move(watcher)) {
+    static constexpr Options<RawQueue> kOptions =
+        RawQueue::kFromEnd | RawQueue::kNonBlock;
+    const void *msg = queue_->ReadMessageIndex(kOptions, &index_);
+    if (msg) {
+      queue_->FreeMessage(msg);
+    }
+  }
 
   void Run() {
     thread_state_->WaitForStart();
 
     if (!thread_state_->is_running()) return;
 
-    int32_t index = 0;
-
-    static constexpr Options<RawQueue> kOptions =
-        RawQueue::kFromEnd | RawQueue::kNonBlock;
-    const void *msg = queue_->ReadMessageIndex(kOptions, &index);
-
+    const void *msg = nullptr;
     while (true) {
-      if (msg == nullptr) {
-        msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index);
-        assert(msg != nullptr);
-      }
+      msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index_);
+      assert(msg != nullptr);
 
       {
         MutexLocker locker(&thread_state_->mutex_);
@@ -125,7 +126,6 @@
         if (!thread_state_->is_running()) break;
       }
       queue_->FreeMessage(msg);
-      msg = nullptr;
     }
 
     queue_->FreeMessage(msg);
@@ -134,6 +134,8 @@
  private:
   std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
   RawQueue *queue_;
+  int32_t index_;
+
   std::function<void(const Message *message)> watcher_;
 };