Allow multiple senders on the same channel

This is needed so that the kernel can be given multiple buffers for image
frames.

Also deduplicate some infrastructure between EventLoop implementations.

Change-Id: Ifd2c9a29747481ea36be1654960dcf86303a11f4
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 8824181..cc11520 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -323,7 +323,10 @@
       ipc_lib::LocklessQueue::empty_queue_index();
 
   struct AlignedChar {
-    alignas(32) char data;
+    // Cortex-A72 (Raspberry Pi 4) and Cortex-A53 (Xavier AGX) both have 64 byte
+    // cache lines.
+    // V4L2 requires 64 byte alignment for USERPTR.
+    alignas(64) char data;
   };
 
   std::unique_ptr<AlignedChar, decltype(&free)> data_storage_;
@@ -591,7 +594,7 @@
 
 ::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
     const Channel *channel) {
-  Take(channel);
+  TakeSender(channel);
 
   return ::std::unique_ptr<RawSender>(new internal::ShmSender(this, channel));
 }
@@ -599,14 +602,7 @@
 void ShmEventLoop::MakeRawWatcher(
     const Channel *channel,
     std::function<void(const Context &context, const void *message)> watcher) {
-  Take(channel);
-
-  if (!configuration::ChannelIsReadableOnNode(channel, node())) {
-    LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
-               << "\", \"type\": \"" << channel->type()->string_view()
-               << "\" } is not able to be watched on this node.  Check your "
-                  "configuration.";
-  }
+  TakeWatcher(channel);
 
   NewWatcher(::std::unique_ptr<WatcherState>(
       new internal::WatcherState(this, channel, std::move(watcher))));
@@ -828,19 +824,6 @@
   CHECK(!is_running()) << ": ShmEventLoop destroyed while running";
 }
 
-void ShmEventLoop::Take(const Channel *channel) {
-  CHECK(!is_running()) << ": Cannot add new objects while running.";
-
-  // Cheat aggresively.  Use the shared memory path as a proxy for a unique
-  // identifier for the channel.
-  const std::string path = ShmPath(channel);
-
-  const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
-  CHECK(prior == taken_.end()) << ": " << path << " is already being used.";
-
-  taken_.emplace_back(path);
-}
-
 void ShmEventLoop::SetRuntimeRealtimePriority(int priority) {
   if (is_running()) {
     LOG(FATAL) << "Cannot set realtime priority while running.";