Add test and fix watcher startup behavior.

The desired behavior for a watcher is that it will trigger for all
messages as they come in *after* the watcher is created.  We don't want
to get messages from before we are constructed, and don't want to miss
any.

Fix the behavior and add tests to confirm.

Change-Id: I81717078f18835d7d15f6e5461f2d66ac58bcfb7
diff --git a/aos/events/event-loop_param_test.cc b/aos/events/event-loop_param_test.cc
index 6f1e5be..f8eb5da 100644
--- a/aos/events/event-loop_param_test.cc
+++ b/aos/events/event-loop_param_test.cc
@@ -1,5 +1,8 @@
 #include "aos/events/event-loop_param_test.h"
 
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
 namespace aos {
 namespace testing {
 
@@ -34,16 +37,6 @@
 
   auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
 
-  auto msg = sender.MakeMessage();
-
-  msg->msg_value = 200;
-
-  msg.Send();
-
-  EXPECT_TRUE(fetcher.Fetch());
-  ASSERT_FALSE(fetcher.get() == nullptr);
-  EXPECT_EQ(fetcher->msg_value, 200);
-
   bool happened = false;
 
   loop3->OnRun([&]() { happened = true; });
@@ -53,11 +46,88 @@
     loop3->Exit();
   });
 
+  auto msg = sender.MakeMessage();
+  msg->msg_value = 200;
+  msg.Send();
+
+  EXPECT_TRUE(fetcher.Fetch());
+  ASSERT_FALSE(fetcher.get() == nullptr);
+  EXPECT_EQ(fetcher->msg_value, 200);
+
   EXPECT_FALSE(happened);
   Run();
   EXPECT_TRUE(happened);
 }
 
+// Tests that watcher will receive all messages sent if they are sent after
+// initialization and before running.
+TEST_P(AbstractEventLoopTest, DoubleSendAtStartup) {
+  auto loop1 = Make();
+  auto loop2 = MakePrimary();
+
+  auto sender = loop1->MakeSender<TestMessage>("/test");
+
+  ::std::vector<int> values;
+
+  loop2->MakeWatcher("/test", [&](const TestMessage &message) {
+    fprintf(stderr, "Got a message\n");
+    values.push_back(message.msg_value);
+    if (values.size() == 2) {
+      loop2->Exit();
+    }
+  });
+
+  {
+    auto msg = sender.MakeMessage();
+    msg->msg_value = 200;
+    msg.Send();
+  }
+  {
+    auto msg = sender.MakeMessage();
+    msg->msg_value = 201;
+    msg.Send();
+  }
+
+  Run();
+
+  EXPECT_THAT(values, ::testing::ElementsAreArray({200, 201}));
+}
+
+// Tests that watcher will not receive messages sent before the watcher is
+// created.
+TEST_P(AbstractEventLoopTest, DoubleSendAfterStartup) {
+  auto loop1 = Make();
+  auto loop2 = MakePrimary();
+
+  auto sender = loop1->MakeSender<TestMessage>("/test");
+
+  ::std::vector<int> values;
+
+  {
+    auto msg = sender.MakeMessage();
+    msg->msg_value = 200;
+    msg.Send();
+  }
+  {
+    auto msg = sender.MakeMessage();
+    msg->msg_value = 201;
+    msg.Send();
+  }
+
+  loop2->MakeWatcher("/test", [&](const TestMessage &message) {
+    values.push_back(message.msg_value);
+  });
+
+  // Add a timer to actually quit.
+  auto test_timer = loop2->AddTimer([&loop2]() { loop2->Exit(); });
+  loop2->OnRun([&test_timer, &loop2]() {
+    test_timer->Setup(loop2->monotonic_now(), ::std::chrono::milliseconds(100));
+  });
+
+  Run();
+  EXPECT_EQ(0, values.size());
+}
+
 // Verify that making a fetcher and watcher for "/test" succeeds.
 TEST_P(AbstractEventLoopTest, FetcherAndWatcher) {
   auto loop = Make();
@@ -93,6 +163,12 @@
   auto loop1 = Make();
   auto loop2 = MakePrimary();
 
+  loop2->MakeWatcher("/test1", [&](const TestMessage &) {});
+  loop2->MakeWatcher("/test2", [&](const TestMessage &message) {
+    EXPECT_EQ(message.msg_value, 200);
+    loop2->Exit();
+  });
+
   auto sender = loop1->MakeSender<TestMessage>("/test2");
   {
     auto msg = sender.MakeMessage();
@@ -100,11 +176,6 @@
     msg.Send();
   }
 
-  loop2->MakeWatcher("/test1", [&](const TestMessage &) {});
-  loop2->MakeWatcher("/test2", [&](const TestMessage &message) {
-    EXPECT_EQ(message.msg_value, 200);
-    loop2->Exit();
-  });
   Run();
 }
 
diff --git a/aos/events/shm-event-loop.cc b/aos/events/shm-event-loop.cc
index c9ff6fd..f55939c 100644
--- a/aos/events/shm-event-loop.cc
+++ b/aos/events/shm-event-loop.cc
@@ -97,24 +97,25 @@
                      std::function<void(const aos::Message *message)> watcher)
       : thread_state_(std::move(thread_state)),
         queue_(queue),
-        watcher_(std::move(watcher)) {}
+        index_(0),
+        watcher_(std::move(watcher)) {
+    static constexpr Options<RawQueue> kOptions =
+        RawQueue::kFromEnd | RawQueue::kNonBlock;
+    const void *msg = queue_->ReadMessageIndex(kOptions, &index_);
+    if (msg) {
+      queue_->FreeMessage(msg);
+    }
+  }
 
   void Run() {
     thread_state_->WaitForStart();
 
     if (!thread_state_->is_running()) return;
 
-    int32_t index = 0;
-
-    static constexpr Options<RawQueue> kOptions =
-        RawQueue::kFromEnd | RawQueue::kNonBlock;
-    const void *msg = queue_->ReadMessageIndex(kOptions, &index);
-
+    const void *msg = nullptr;
     while (true) {
-      if (msg == nullptr) {
-        msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index);
-        assert(msg != nullptr);
-      }
+      msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index_);
+      assert(msg != nullptr);
 
       {
         MutexLocker locker(&thread_state_->mutex_);
@@ -125,7 +126,6 @@
         if (!thread_state_->is_running()) break;
       }
       queue_->FreeMessage(msg);
-      msg = nullptr;
     }
 
     queue_->FreeMessage(msg);
@@ -134,6 +134,8 @@
  private:
   std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
   RawQueue *queue_;
+  int32_t index_;
+
   std::function<void(const Message *message)> watcher_;
 };