Change the event-loop restriction

It's weird but acceptable to create as many fetchers as you want.
RobotState is a common thing that multiple people want.  We don't want
to support senders and watchers on the same loop (do you get your own
messages?) until there's a valid use case.

Change-Id: I215f452311d3f47ffd487d78885ae2848c6ffe9b
diff --git a/aos/events/event-loop.h b/aos/events/event-loop.h
index a187981..0590d7b 100644
--- a/aos/events/event-loop.h
+++ b/aos/events/event-loop.h
@@ -59,11 +59,13 @@
     const T &operator*() const { return *get(); }
     const T *operator->() const { return get(); }
 
-    // Sends the message to the queue. Should only be called once.
-    void Send() {
+    // Sends the message to the queue. Should only be called once.  Returns true
+    // if the message was successfully sent, and false otherwise.
+    bool Send() {
       RawSender *sender = &msg_.get_deleter();
       msg_->SetTimeToNow();
-      sender->Send(reinterpret_cast<RawSender::SendContext *>(msg_.release()));
+      return sender->Send(
+          reinterpret_cast<RawSender::SendContext *>(msg_.release()));
     }
 
    private:
@@ -91,6 +93,10 @@
   // Current time.
   virtual monotonic_clock::time_point monotonic_now() = 0;
 
+  // Note, it is supported to create:
+  //   multiple fetchers, and (one sender or one watcher) per <path, type>
+  //   tuple.
+
   // Makes a class that will always fetch the most recent value
   // sent to path.
   template <typename T>
diff --git a/aos/events/event-loop_param_test.cc b/aos/events/event-loop_param_test.cc
index 78e744a..efac8f9 100644
--- a/aos/events/event-loop_param_test.cc
+++ b/aos/events/event-loop_param_test.cc
@@ -55,44 +55,54 @@
   EXPECT_TRUE(happened);
 }
 
-// Verify that making a fetcher and handler for "/test" dies.
-TEST_P(AbstractEventLoopTest, FetcherAndHandler) {
+// Verify that making a fetcher and watcher for "/test" succeeds.
+TEST_P(AbstractEventLoopTest, FetcherAndWatcher) {
   auto loop = Make();
   auto fetcher = loop->MakeFetcher<TestMessage>("/test");
-  EXPECT_DEATH(loop->MakeWatcher("/test", [&](const TestMessage &) {}), "/test");
+  loop->MakeWatcher("/test", [&](const TestMessage &) {});
 }
 
-// Verify that making 2 fetchers for "/test" fails.
+// Verify that making 2 fetchers for "/test" succeeds.
 TEST_P(AbstractEventLoopTest, TwoFetcher) {
   auto loop = Make();
   auto fetcher = loop->MakeFetcher<TestMessage>("/test");
-  EXPECT_DEATH(loop->MakeFetcher<TestMessage>("/test"), "/test");
+  auto fetcher2 = loop->MakeFetcher<TestMessage>("/test");
 }
 
-// Verify that registering a handler twice for "/test" fails.
-TEST_P(AbstractEventLoopTest, TwoHandler) {
+// Verify that registering a watcher twice for "/test" fails.
+TEST_P(AbstractEventLoopTest, TwoWatcher) {
   auto loop = Make();
   loop->MakeWatcher("/test", [&](const TestMessage &) {});
-  EXPECT_DEATH(loop->MakeWatcher("/test", [&](const TestMessage &) {}), "/test");
+  EXPECT_DEATH(loop->MakeWatcher("/test", [&](const TestMessage &) {}),
+               "/test");
+}
+
+// Verify that registering a watcher and a sender for "/test" fails.
+TEST_P(AbstractEventLoopTest, WatcherAndSender) {
+  auto loop = Make();
+  auto sender = loop->MakeSender<TestMessage>("/test");
+  EXPECT_DEATH(loop->MakeWatcher("/test", [&](const TestMessage &) {}),
+               "/test");
 }
 
 // Verify that Quit() works when there are multiple watchers.
-TEST_P(AbstractEventLoopTest, MultipleFetcherQuit) {
-  auto loop = Make();
+TEST_P(AbstractEventLoopTest, MultipleWatcherQuit) {
+  auto loop1 = Make();
+  auto loop2 = Make();
 
-  auto sender = loop->MakeSender<TestMessage>("/test2");
+  auto sender = loop1->MakeSender<TestMessage>("/test2");
   {
     auto msg = sender.MakeMessage();
     msg->msg_value = 200;
     msg.Send();
   }
 
-  loop->MakeWatcher("/test1", [&](const TestMessage &) {});
-  loop->MakeWatcher("/test2", [&](const TestMessage &message) {
+  loop2->MakeWatcher("/test1", [&](const TestMessage &) {});
+  loop2->MakeWatcher("/test2", [&](const TestMessage &message) {
     EXPECT_EQ(message.msg_value, 200);
-    loop->Exit();
+    loop2->Exit();
   });
-  loop->Run();
+  loop2->Run();
 }
 
 // Verify that timer intervals and duration function properly.
diff --git a/aos/events/shm-event-loop.cc b/aos/events/shm-event-loop.cc
index 945ebc0..781c963 100644
--- a/aos/events/shm-event-loop.cc
+++ b/aos/events/shm-event-loop.cc
@@ -1,12 +1,14 @@
 #include "aos/events/shm-event-loop.h"
-#include "aos/logging/logging.h"
-#include "aos/queue.h"
 
 #include <sys/timerfd.h>
+#include <algorithm>
 #include <atomic>
 #include <chrono>
 #include <stdexcept>
 
+#include "aos/logging/logging.h"
+#include "aos/queue.h"
+
 namespace aos {
 
 ShmEventLoop::ShmEventLoop() : thread_state_(std::make_shared<ThreadState>()) {}
@@ -173,13 +175,13 @@
 
 std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
     const std::string &path, const QueueTypeInfo &type) {
-  Take(path);
   return std::unique_ptr<RawFetcher>(new ShmFetcher(
       RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
 }
 
 std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
     const std::string &path, const QueueTypeInfo &type) {
+  Take(path);
   return std::unique_ptr<RawSender>(new ShmSender(
       RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
 }
@@ -269,8 +271,12 @@
   if (is_running()) {
     ::aos::Die("Cannot add new objects while running.\n");
   }
-  if (!taken_.emplace(path).second) {
-    ::aos::Die("%s already has a listener / watcher.", path.c_str());
+
+  const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
+  if (prior != taken_.end()) {
+    ::aos::Die("%s is already being used.", path.c_str());
+  } else {
+    taken_.emplace_back(path);
   }
 }
 
diff --git a/aos/events/shm-event-loop.h b/aos/events/shm-event-loop.h
index 49d65f7..e8ff267 100644
--- a/aos/events/shm-event-loop.h
+++ b/aos/events/shm-event-loop.h
@@ -17,6 +17,10 @@
 
 // Specialization of EventLoop that is built from queues running out of shared
 // memory. See more details at aos/queue.h
+//
+// This object must be interacted with from one thread, but the Senders and
+// Fetchers may be used from multiple threads afterwords (as long as their
+// destructors are called back in one thread again)
 class ShmEventLoop : public EventLoop {
  public:
   ShmEventLoop();
@@ -73,12 +77,13 @@
   };
 
   // Exclude multiple of the same type for path.
-  void Take(const std::string &path);
 
   std::vector<std::function<void()>> on_run_;
   std::shared_ptr<ThreadState> thread_state_;
 
-  std::unordered_set<std::string> taken_;
+  void Take(const std::string &path);
+
+  std::vector<::std::string> taken_;
 };
 
 }  // namespace aos
diff --git a/aos/events/simulated-event-loop.cc b/aos/events/simulated-event-loop.cc
index e82c381..a23b18c 100644
--- a/aos/events/simulated-event-loop.cc
+++ b/aos/events/simulated-event-loop.cc
@@ -1,4 +1,7 @@
 #include "aos/events/simulated-event-loop.h"
+
+#include <algorithm>
+
 #include "aos/queue.h"
 
 namespace aos {
@@ -80,13 +83,13 @@
 
 std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
     const std::string &path, const QueueTypeInfo &type) {
+  Take(path);
   ::std::pair<::std::string, QueueTypeInfo> key(path, type);
   return GetSimulatedQueue(key)->MakeRawSender();
 }
 
 std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
     const std::string &path, const QueueTypeInfo &type) {
-  Take(path);
   ::std::pair<::std::string, QueueTypeInfo> key(path, type);
   return GetSimulatedQueue(key)->MakeRawFetcher();
 }
@@ -116,12 +119,15 @@
   return std::unique_ptr<RawFetcher>(new SimulatedFetcher(this));
 }
 
-void SimulatedEventLoop::Take(const std::string &path) {
+void SimulatedEventLoop::Take(const ::std::string &path) {
   if (is_running()) {
     ::aos::Die("Cannot add new objects while running.\n");
   }
-  if (!taken_.emplace(path).second) {
-    ::aos::Die("%s already has a listener / watcher.", path.c_str());
+  const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
+  if (prior != taken_.end()) {
+    ::aos::Die("%s is already being used.", path.c_str());
+  } else {
+    taken_.emplace_back(path);
   }
 }
 }  // namespace aos
diff --git a/aos/events/simulated-event-loop.h b/aos/events/simulated-event-loop.h
index 79b48b8..b5c94bf 100644
--- a/aos/events/simulated-event-loop.h
+++ b/aos/events/simulated-event-loop.h
@@ -212,8 +212,7 @@
       std::function<void(const aos::Message *message)> watcher) override;
 
   TimerHandler *AddTimer(::std::function<void()> callback) override {
-    timers_.emplace_back(
-        new SimulatedTimerHandler(scheduler_, callback));
+    timers_.emplace_back(new SimulatedTimerHandler(scheduler_, callback));
     return timers_.back().get();
   }
 
@@ -232,19 +231,20 @@
   SimulatedQueue *GetSimulatedQueue(
       const ::std::pair<::std::string, QueueTypeInfo> &);
 
-  void Take(const std::string &path);
+  void Take(const ::std::string &path);
 
  private:
   EventScheduler *scheduler_;
-  std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue> *queues_;
-  std::unordered_set<std::string> taken_;
-  std::vector<std::unique_ptr<TimerHandler>> timers_;
+  ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
+      *queues_;
+  ::std::vector<std::string> taken_;
+  ::std::vector<std::unique_ptr<TimerHandler>> timers_;
 };
 
 class SimulatedEventLoopFactory {
  public:
-  std::unique_ptr<EventLoop> CreateEventLoop() {
-    return std::unique_ptr<EventLoop>(
+  ::std::unique_ptr<EventLoop> CreateEventLoop() {
+    return ::std::unique_ptr<EventLoop>(
         new SimulatedEventLoop(&scheduler_, &queues_));
   }