Allow multiple senders on the same channel

I will do this to allow giving the kernel multiple buffers for image
frames.

Also deduplicate some infrastructure between EventLoop implementations.

Change-Id: Ifd2c9a29747481ea36be1654960dcf86303a11f4
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 6f1f453..a183cae 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -61,6 +61,7 @@
         "//aos/time",
         "//aos/util:phased_loop",
         "@com_github_google_flatbuffers//:flatbuffers",
+        "@com_google_absl//absl/container:btree",
     ],
 )
 
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index a370ca8..e368df2 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -127,6 +127,35 @@
   return watchers_.back().get();
 }
 
+void EventLoop::TakeWatcher(const Channel *channel) {
+  CHECK(!is_running()) << ": Cannot add new objects while running.";
+  ChannelIndex(channel);
+
+  CHECK(taken_senders_.find(channel) == taken_senders_.end())
+      << ": " << FlatbufferToJson(channel) << " is already being used.";
+
+  auto result = taken_watchers_.insert(channel);
+  CHECK(result.second) << ": " << FlatbufferToJson(channel)
+                       << " is already being used.";
+
+  if (!configuration::ChannelIsReadableOnNode(channel, node())) {
+    LOG(FATAL) << ": " << FlatbufferToJson(channel)
+               << " is not able to be watched on this node.  Check your "
+                  "configuration.";
+  }
+}
+
+void EventLoop::TakeSender(const Channel *channel) {
+  CHECK(!is_running()) << ": Cannot add new objects while running.";
+  ChannelIndex(channel);
+
+  CHECK(taken_watchers_.find(channel) == taken_watchers_.end())
+      << ": Channel " << FlatbufferToJson(channel) << " is already being used.";
+
+  // We don't care if this is a duplicate.
+  taken_senders_.insert(channel);
+}
+
 void EventLoop::SendTimingReport() {
   // We need to do a fancy dance here to get all the accounting to work right.
   // We want to copy the memory here, but then send after resetting. Otherwise
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index ea2bf6e..4a12096 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -14,6 +14,8 @@
 #include "aos/json_to_flatbuffer.h"
 #include "aos/time/time.h"
 #include "aos/util/phased_loop.h"
+
+#include "absl/container/btree_set.h"
 #include "flatbuffers/flatbuffers.h"
 #include "glog/logging.h"
 
@@ -499,6 +501,11 @@
   void DeleteFetcher(RawFetcher *fetcher);
   WatcherState *NewWatcher(std::unique_ptr<WatcherState> watcher);
 
+  // Tracks that we have a (single) watcher on the given channel.
+  void TakeWatcher(const Channel *channel);
+  // Tracks that we have at least one sender on the given channel.
+  void TakeSender(const Channel *channel);
+
   std::vector<RawSender *> senders_;
   std::vector<RawFetcher *> fetchers_;
 
@@ -536,6 +543,8 @@
 
   // If true, don't send out timing reports.
   bool skip_timing_report_ = false;
+
+  absl::btree_set<const Channel *> taken_watchers_, taken_senders_;
 };
 
 }  // namespace aos
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index e1e05a2..b72a8e2 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -45,6 +45,47 @@
   EXPECT_TRUE(happened);
 }
 
+// Tests that watcher can receive messages from two senders.
+// Also tests that OnRun() works.
+TEST_P(AbstractEventLoopTest, BasicTwoSenders) {
+  auto loop1 = Make();
+  auto loop2 = MakePrimary();
+
+  aos::Sender<TestMessage> sender1 = loop1->MakeSender<TestMessage>("/test");
+  aos::Sender<TestMessage> sender2 = loop1->MakeSender<TestMessage>("/test");
+
+  bool happened = false;
+
+  loop2->OnRun([&]() {
+    happened = true;
+
+    {
+      aos::Sender<TestMessage>::Builder msg = sender1.MakeBuilder();
+      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+      builder.add_value(200);
+      ASSERT_TRUE(msg.Send(builder.Finish()));
+    }
+    {
+      aos::Sender<TestMessage>::Builder msg = sender2.MakeBuilder();
+      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+      builder.add_value(200);
+      ASSERT_TRUE(msg.Send(builder.Finish()));
+    }
+  });
+
+  int messages_received = 0;
+  loop2->MakeWatcher("/test", [&](const TestMessage &message) {
+    EXPECT_EQ(message.value(), 200);
+    this->Exit();
+    ++messages_received;
+  });
+
+  EXPECT_FALSE(happened);
+  Run();
+  EXPECT_TRUE(happened);
+  EXPECT_EQ(messages_received, 2);
+}
+
 // Tests that a fetcher can fetch from a sender.
 // Also tests that OnRun() works.
 TEST_P(AbstractEventLoopTest, FetchWithoutRun) {
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index 60e8216..c2c9884 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -36,13 +36,6 @@
       << ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
       << T::GetFullyQualifiedName() << "\" } not found in config.";
 
-  if (!configuration::ChannelIsReadableOnNode(channel, node())) {
-    LOG(FATAL) << "Channel { \"name\": \"" << channel_name << "\", \"type\": \""
-               << T::GetFullyQualifiedName()
-               << "\" } is not able to be watched on this node.  Check your "
-                  "configuration.";
-  }
-
   return MakeRawWatcher(
       channel, [this, w](const Context &context, const void *message) {
         context_ = context;
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 8824181..cc11520 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -323,7 +323,10 @@
       ipc_lib::LocklessQueue::empty_queue_index();
 
   struct AlignedChar {
-    alignas(32) char data;
+    // Cortex-A72 (Raspberry Pi 4) and Cortex-A53 (Xavier AGX) both have 64 byte
+    // cache lines.
+    // V4L2 requires 64 byte alignment for USERPTR.
+    alignas(64) char data;
   };
 
   std::unique_ptr<AlignedChar, decltype(&free)> data_storage_;
@@ -591,7 +594,7 @@
 
 ::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
     const Channel *channel) {
-  Take(channel);
+  TakeSender(channel);
 
   return ::std::unique_ptr<RawSender>(new internal::ShmSender(this, channel));
 }
@@ -599,14 +602,7 @@
 void ShmEventLoop::MakeRawWatcher(
     const Channel *channel,
     std::function<void(const Context &context, const void *message)> watcher) {
-  Take(channel);
-
-  if (!configuration::ChannelIsReadableOnNode(channel, node())) {
-    LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
-               << "\", \"type\": \"" << channel->type()->string_view()
-               << "\" } is not able to be watched on this node.  Check your "
-                  "configuration.";
-  }
+  TakeWatcher(channel);
 
   NewWatcher(::std::unique_ptr<WatcherState>(
       new internal::WatcherState(this, channel, std::move(watcher))));
@@ -828,19 +824,6 @@
   CHECK(!is_running()) << ": ShmEventLoop destroyed while running";
 }
 
-void ShmEventLoop::Take(const Channel *channel) {
-  CHECK(!is_running()) << ": Cannot add new objects while running.";
-
-  // Cheat aggresively.  Use the shared memory path as a proxy for a unique
-  // identifier for the channel.
-  const std::string path = ShmPath(channel);
-
-  const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
-  CHECK(prior == taken_.end()) << ": " << path << " is already being used.";
-
-  taken_.emplace_back(path);
-}
-
 void ShmEventLoop::SetRuntimeRealtimePriority(int priority) {
   if (is_running()) {
     LOG(FATAL) << "Cannot set realtime priority while running.";
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index c888e57..52bb338 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -79,10 +79,6 @@
 
   void HandleEvent();
 
-  // Tracks that we can't have multiple watchers or a sender and a watcher (or
-  // multiple senders) on a single queue (path).
-  void Take(const Channel *channel);
-
   // Returns the TID of the event loop.
   pid_t GetTid() override;
 
@@ -90,7 +86,6 @@
   int priority_ = 0;
   std::string name_;
   const Node *const node_;
-  std::vector<std::string> taken_;
 
   internal::EPoll epoll_;
 };
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index d783a3a..e34a23f 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -5,7 +5,6 @@
 #include <string_view>
 
 #include "absl/container/btree_map.h"
-#include "absl/container/btree_set.h"
 #include "aos/json_to_flatbuffer.h"
 #include "aos/util/phased_loop.h"
 
@@ -16,7 +15,7 @@
 struct SimulatedMessage {
   // Struct to let us force data to be well aligned.
   struct OveralignedChar {
-    char data alignas(32);
+    char data alignas(64);
   };
 
   // Context for the data.
@@ -406,8 +405,6 @@
 
   SimulatedChannel *GetSimulatedChannel(const Channel *channel);
 
-  void Take(const Channel *channel);
-
   void SetRuntimeRealtimePriority(int priority) override {
     CHECK(!is_running()) << ": Cannot set realtime priority while running.";
     priority_ = priority;
@@ -439,7 +436,6 @@
   absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> *channels_;
   std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
       *raw_event_loops_;
-  absl::btree_set<SimpleChannel> taken_;
 
   ::std::string name_;
 
@@ -470,15 +466,7 @@
 void SimulatedEventLoop::MakeRawWatcher(
     const Channel *channel,
     std::function<void(const Context &channel, const void *message)> watcher) {
-  ChannelIndex(channel);
-  Take(channel);
-
-  if (!configuration::ChannelIsReadableOnNode(channel, node())) {
-    LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
-               << "\", \"type\": \"" << channel->type()->string_view()
-               << "\" } is not able to be watched on this node.  Check your "
-                  "configuration.";
-  }
+  TakeWatcher(channel);
 
   std::unique_ptr<SimulatedWatcher> shm_watcher(
       new SimulatedWatcher(this, scheduler_, channel, std::move(watcher)));
@@ -489,8 +477,8 @@
 
 std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
     const Channel *channel) {
-  ChannelIndex(channel);
-  Take(channel);
+  TakeSender(channel);
+
   return GetSimulatedChannel(channel)->MakeRawSender(this);
 }
 
@@ -719,14 +707,6 @@
   simulated_event_loop_->AddEvent(&event_);
 }
 
-void SimulatedEventLoop::Take(const Channel *channel) {
-  CHECK(!is_running()) << ": Cannot add new objects while running.";
-
-  auto result = taken_.insert(SimpleChannel(channel));
-  CHECK(result.second) << ": " << FlatbufferToJson(channel)
-                       << " is already being used.";
-}
-
 SimulatedEventLoopFactory::SimulatedEventLoopFactory(
     const Configuration *configuration)
     : configuration_(CHECK_NOTNULL(configuration)), node_(nullptr) {