Add the channel name into the sender creation failure message

It used to just fail saying "Too Many Senders", which isn't actionable.
We want the channel name in there.

The fix is to propagate the failure up to the event loop, where it can
make a more educated decision.  Also, add a test (and make sure
simulation matches).

Change-Id: If70397ee319ad25ce5ab7ed146e8f5057d8af100
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index fbcb62e..71bef37 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -604,6 +604,18 @@
                "/test");
 }
 
+// Verify that creating too many senders fails.
+TEST_P(AbstractEventLoopDeathTest, TooManySenders) {
+  auto loop = Make();
+  std::vector<aos::Sender<TestMessage>> senders;
+  for (int i = 0; i < 10; ++i) {
+    senders.emplace_back(loop->MakeSender<TestMessage>("/test"));
+  }
+  EXPECT_DEATH({ loop->MakeSender<TestMessage>("/test"); },
+               "Failed to create sender on \\{ \"name\": \"/test\", \"type\": "
+               "\"aos.TestMessage\" \\}, too many senders.");
+}
+
 // Verify that we can't create a sender inside OnRun.
 TEST_P(AbstractEventLoopDeathTest, SenderInOnRun) {
   auto loop1 = MakePrimary();
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index ce34ad1..b53be2f 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -374,10 +374,22 @@
                 event_loop->configuration()->channel_storage_duration()))),
         lockless_queue_(lockless_queue_memory_.memory(),
                         lockless_queue_memory_.config()),
-        lockless_queue_sender_(lockless_queue_.MakeSender()) {}
+        lockless_queue_sender_(
+            VerifySender(lockless_queue_.MakeSender(), channel)) {}
 
   ~ShmSender() override {}
 
+  static ipc_lib::LocklessQueue::Sender VerifySender(
+      std::optional<ipc_lib::LocklessQueue::Sender> &&sender,
+      const Channel *channel) {
+    if (sender) {
+      return std::move(sender.value());
+    }
+    LOG(FATAL) << "Failed to create sender on "
+               << configuration::CleanedChannelToString(channel)
+               << ", too many senders.";
+  }
+
   void *data() override { return lockless_queue_sender_.Data(); }
   size_t size() override { return lockless_queue_sender_.size(); }
   bool DoSend(size_t length,
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 1241834..ee4e6d6 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -102,6 +102,19 @@
 
   const Channel *channel() const { return channel_; }
 
+  void CountSenderCreated() {
+    if (sender_count_ >= channel()->num_senders()) {
+      LOG(FATAL) << "Failed to create sender on "
+                 << configuration::CleanedChannelToString(channel())
+                 << ", too many senders.";
+    }
+    ++sender_count_;
+  }
+  void CountSenderDestroyed() {
+    --sender_count_;
+    CHECK_GE(sender_count_, 0);
+  }
+
  private:
   const Channel *channel_;
 
@@ -114,6 +127,8 @@
   EventScheduler *scheduler_;
 
   ipc_lib::QueueIndex next_queue_index_;
+
+  int sender_count_ = 0;
 };
 
 namespace {
@@ -134,8 +149,10 @@
   SimulatedSender(SimulatedChannel *simulated_channel, EventLoop *event_loop)
       : RawSender(event_loop, simulated_channel->channel()),
         simulated_channel_(simulated_channel),
-        event_loop_(event_loop) {}
-  ~SimulatedSender() {}
+        event_loop_(event_loop) {
+    simulated_channel_->CountSenderCreated();
+  }
+  ~SimulatedSender() { simulated_channel_->CountSenderDestroyed(); }
 
   void *data() override {
     if (!message_) {