Add the channel name into the sender creation failure message
It used to just fail saying "Too Many Senders". This isn't actionable.
We want the chanel name in there.
The fix is to propegate the failure up to the event loop where it can
make a more educated decision. Also, add a test (and make sure
simulation matches).
Change-Id: If70397ee319ad25ce5ab7ed146e8f5057d8af100
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index fbcb62e..71bef37 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -604,6 +604,18 @@
"/test");
}
+// Verify that creating too many senders fails.
+TEST_P(AbstractEventLoopDeathTest, TooManySenders) {
+ auto loop = Make();
+ std::vector<aos::Sender<TestMessage>> senders;
+ for (int i = 0; i < 10; ++i) {
+ senders.emplace_back(loop->MakeSender<TestMessage>("/test"));
+ }
+ EXPECT_DEATH({ loop->MakeSender<TestMessage>("/test"); },
+ "Failed to create sender on \\{ \"name\": \"/test\", \"type\": "
+ "\"aos.TestMessage\" \\}, too many senders.");
+}
+
// Verify that we can't create a sender inside OnRun.
TEST_P(AbstractEventLoopDeathTest, SenderInOnRun) {
auto loop1 = MakePrimary();
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index ce34ad1..b53be2f 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -374,10 +374,22 @@
event_loop->configuration()->channel_storage_duration()))),
lockless_queue_(lockless_queue_memory_.memory(),
lockless_queue_memory_.config()),
- lockless_queue_sender_(lockless_queue_.MakeSender()) {}
+ lockless_queue_sender_(
+ VerifySender(lockless_queue_.MakeSender(), channel)) {}
~ShmSender() override {}
+ static ipc_lib::LocklessQueue::Sender VerifySender(
+ std::optional<ipc_lib::LocklessQueue::Sender> &&sender,
+ const Channel *channel) {
+ if (sender) {
+ return std::move(sender.value());
+ }
+ LOG(FATAL) << "Failed to create sender on "
+ << configuration::CleanedChannelToString(channel)
+ << ", too many senders.";
+ }
+
void *data() override { return lockless_queue_sender_.Data(); }
size_t size() override { return lockless_queue_sender_.size(); }
bool DoSend(size_t length,
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 1241834..ee4e6d6 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -102,6 +102,19 @@
const Channel *channel() const { return channel_; }
+ void CountSenderCreated() {
+ if (sender_count_ >= channel()->num_senders()) {
+ LOG(FATAL) << "Failed to create sender on "
+ << configuration::CleanedChannelToString(channel())
+ << ", too many senders.";
+ }
+ ++sender_count_;
+ }
+ void CountSenderDestroyed() {
+ --sender_count_;
+ CHECK_GE(sender_count_, 0);
+ }
+
private:
const Channel *channel_;
@@ -114,6 +127,8 @@
EventScheduler *scheduler_;
ipc_lib::QueueIndex next_queue_index_;
+
+ int sender_count_ = 0;
};
namespace {
@@ -134,8 +149,10 @@
SimulatedSender(SimulatedChannel *simulated_channel, EventLoop *event_loop)
: RawSender(event_loop, simulated_channel->channel()),
simulated_channel_(simulated_channel),
- event_loop_(event_loop) {}
- ~SimulatedSender() {}
+ event_loop_(event_loop) {
+ simulated_channel_->CountSenderCreated();
+ }
+ ~SimulatedSender() { simulated_channel_->CountSenderDestroyed(); }
void *data() override {
if (!message_) {
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index f31d80d..728d2d1 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -518,7 +518,8 @@
}
if (sender_index_ == -1) {
- LOG(FATAL) << "Too many senders";
+ VLOG(1) << "Too many senders, starting to bail.";
+ return;
}
::aos::ipc_lib::Sender *s = memory_->GetSender(sender_index_);
@@ -529,13 +530,18 @@
}
LocklessQueue::Sender::~Sender() {
- if (memory_ != nullptr) {
+ if (valid()) {
death_notification_release(&(memory_->GetSender(sender_index_)->tid));
}
}
-LocklessQueue::Sender LocklessQueue::MakeSender() {
- return LocklessQueue::Sender(memory_);
+std::optional<LocklessQueue::Sender> LocklessQueue::MakeSender() {
+ LocklessQueue::Sender result = LocklessQueue::Sender(memory_);
+ if (result.valid()) {
+ return std::move(result);
+ } else {
+ return std::nullopt;
+ }
}
QueueIndex ZeroOrValid(QueueIndex index) {
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index 550485f..de80f3d 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -5,6 +5,7 @@
#include <sys/signalfd.h>
#include <sys/types.h>
#include <vector>
+#include <optional>
#include "aos/ipc_lib/aos_sync.h"
#include "aos/ipc_lib/data_alignment.h"
@@ -245,6 +246,11 @@
Sender(LocklessQueueMemory *memory);
+ // Returns true if this sender is valid. If it isn't valid, any of the
+ // other methods won't work. This is here to allow the lockless queue to
+ // only build a sender if there was one available.
+ bool valid() const { return sender_index_ != -1 && memory_ != nullptr; }
+
// Pointer to the backing memory.
LocklessQueueMemory *memory_ = nullptr;
@@ -252,8 +258,9 @@
int sender_index_ = -1;
};
- // Creates a sender.
- Sender MakeSender();
+ // Creates a sender. If we couldn't allocate a sender, returns nullopt.
+ // TODO(austin): Change the API if we find ourselves with more errors.
+ std::optional<Sender> MakeSender();
private:
LocklessQueueMemory *memory_ = nullptr;
diff --git a/aos/ipc_lib/lockless_queue_death_test.cc b/aos/ipc_lib/lockless_queue_death_test.cc
index 213c9e4..b7cdfee 100644
--- a/aos/ipc_lib/lockless_queue_death_test.cc
+++ b/aos/ipc_lib/lockless_queue_death_test.cc
@@ -521,7 +521,7 @@
LocklessQueue queue(
reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
config);
- LocklessQueue::Sender sender = queue.MakeSender();
+ LocklessQueue::Sender sender = queue.MakeSender().value();
for (int i = 0; i < 2; ++i) {
char data[100];
size_t s = snprintf(data, sizeof(data), "foobar%d", i + 1);
@@ -555,7 +555,7 @@
LocklessQueue queue(memory, config);
// Building and destroying a sender will clean up the queue.
- { LocklessQueue::Sender sender = queue.MakeSender(); }
+ { LocklessQueue::Sender sender = queue.MakeSender().value(); }
if (print) {
printf("Cleaned up version:\n");
@@ -563,12 +563,12 @@
}
{
- LocklessQueue::Sender sender = queue.MakeSender();
+ LocklessQueue::Sender sender = queue.MakeSender().value();
{
// Make a second sender to confirm that the slot was freed.
// If the sender doesn't get cleaned up, this will fail.
LocklessQueue queue2(memory, config);
- queue2.MakeSender();
+ queue2.MakeSender().value();
}
// Send a message to make sure that the queue still works.
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index 109c2ea..65b2a15 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -194,17 +194,15 @@
}
// Tests that too many watchers dies like expected.
-TEST_F(LocklessQueueDeathTest, TooManySenders) {
- EXPECT_DEATH(
- {
- ::std::vector<::std::unique_ptr<LocklessQueue>> queues;
- ::std::vector<LocklessQueue::Sender> senders;
- for (size_t i = 0; i < config_.num_senders + 1; ++i) {
- queues.emplace_back(new LocklessQueue(get_memory(), config_));
- senders.emplace_back(queues.back()->MakeSender());
- }
- },
- "Too many senders");
+TEST_F(LocklessQueueTest, TooManySenders) {
+ ::std::vector<::std::unique_ptr<LocklessQueue>> queues;
+ ::std::vector<LocklessQueue::Sender> senders;
+ for (size_t i = 0; i < config_.num_senders; ++i) {
+ queues.emplace_back(new LocklessQueue(get_memory(), config_));
+ senders.emplace_back(queues.back()->MakeSender().value());
+ }
+ queues.emplace_back(new LocklessQueue(get_memory(), config_));
+ EXPECT_FALSE(queues.back()->MakeSender());
}
// Now, start 2 threads and have them receive the signals.
@@ -240,7 +238,7 @@
TEST_F(LocklessQueueTest, Send) {
LocklessQueue queue(get_memory(), config_);
- LocklessQueue::Sender sender = queue.MakeSender();
+ LocklessQueue::Sender sender = queue.MakeSender().value();
// Send enough messages to wrap.
for (int i = 0; i < 20000; ++i) {
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index 9bb0a70..f5b3d7e 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -143,7 +143,7 @@
::std::thread([this, &t, thread_index, &run, write_wrap_count]() {
// Build up a sender.
LocklessQueue queue(memory_, config_);
- LocklessQueue::Sender sender = queue.MakeSender();
+ LocklessQueue::Sender sender = queue.MakeSender().value();
// Signal that we are ready to start sending.
t.ready.Set();