Enforce channel message limits when building configs
We only support 2^16-1 messages/channel. Enforce this when building the
configuration, rather than at runtime in the simulation. This will
make the errors cleaner and more direct.
Change-Id: I368f6c6596a7b0eaa562a70cd385ca1b94e10b93
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/configuration.cc b/aos/configuration.cc
index db00ce7..736f772 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -29,6 +29,8 @@
namespace aos {
namespace {
+namespace chrono = std::chrono;
+
bool EndsWith(std::string_view str, std::string_view end) {
if (str.size() < end.size()) {
return false;
@@ -358,6 +360,12 @@
<< ", can only use [-a-zA-Z0-9_/]";
}
+ CHECK_LT(QueueSize(&config.message(), c) + QueueScratchBufferSize(c),
+ std::numeric_limits<uint16_t>::max())
+ << ": More messages/second configured than the queue can hold on "
+ << CleanedChannelToString(c) << ", " << c->frequency() << "hz for "
+ << config.message().channel_storage_duration() << "ns";
+
if (c->has_logger_nodes()) {
// Confirm that we don't have duplicate logger nodes.
absl::btree_set<std::string_view> logger_nodes;
@@ -1564,5 +1572,22 @@
return result;
}
+int QueueSize(const Configuration *config, const Channel *channel) {
+ return QueueSize(channel->frequency(),
+ chrono::nanoseconds(config->channel_storage_duration()));
+}
+
+int QueueSize(size_t frequency, chrono::nanoseconds channel_storage_duration) {
+ // Use integer arithmetic and round up at all cost.
+ return static_cast<int>(
+ (999999999 + static_cast<int64_t>(frequency) *
+ static_cast<int64_t>(channel_storage_duration.count())) /
+ static_cast<int64_t>(1000000000));
+}
+
+int QueueScratchBufferSize(const Channel *channel) {
+ return channel->num_readers() + channel->num_senders();
+}
+
} // namespace configuration
} // namespace aos
diff --git a/aos/configuration.h b/aos/configuration.h
index 31603fe..8515b18 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -5,6 +5,7 @@
#include <netinet/in.h>
#include <sys/socket.h>
+#include <chrono>
#include <cstdint>
#include <string_view>
@@ -203,6 +204,14 @@
bool ApplicationShouldStart(const Configuration *config, const Node *my_node,
const Application *application);
+// Returns the number of messages in the queue.
+int QueueSize(const Configuration *config, const Channel *channel);
+int QueueSize(size_t frequency,
+ std::chrono::nanoseconds channel_storage_duration);
+
+// Returns the number of scratch buffers in the queue.
+int QueueScratchBufferSize(const Channel *channel);
+
// TODO(austin): GetSchema<T>(const Flatbuffer<Configuration> &config);
} // namespace configuration
diff --git a/aos/configuration_test.cc b/aos/configuration_test.cc
index 242ee17..fa74e20 100644
--- a/aos/configuration_test.cc
+++ b/aos/configuration_test.cc
@@ -16,6 +16,7 @@
namespace testing {
using aos::testing::ArtifactPath;
+namespace chrono = std::chrono;
class ConfigurationTest : public ::testing::Test {
public:
@@ -988,6 +989,28 @@
"Found duplicate logger_nodes in");
}
+// Tests that we properly compute the queue size for the provided duration.
+TEST_F(ConfigurationTest, QueueSize) {
+ EXPECT_EQ(QueueSize(100, chrono::seconds(2)), 200);
+ EXPECT_EQ(QueueSize(200, chrono::seconds(2)), 400);
+ EXPECT_EQ(QueueSize(100, chrono::seconds(6)), 600);
+ EXPECT_EQ(QueueSize(100, chrono::milliseconds(10)), 1);
+ EXPECT_EQ(QueueSize(100, chrono::milliseconds(10) - chrono::nanoseconds(1)),
+ 1);
+ EXPECT_EQ(QueueSize(100, chrono::milliseconds(10) - chrono::nanoseconds(2)),
+ 1);
+}
+
+// Tests that we compute scratch buffer size correctly too.
+TEST_F(ConfigurationTest, QueueScratchBufferSize) {
+ const aos::FlatbufferDetachedBuffer<Channel> channel =
+ JsonToFlatbuffer<Channel>(
+ "{ \"name\": \"/foo\", \"type\": \".aos.bar\", \"num_readers\": 5, "
+ "\"num_senders\": 10 }");
+
+ EXPECT_EQ(QueueScratchBufferSize(&channel.message()), 15);
+}
+
} // namespace testing
} // namespace configuration
} // namespace aos
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 9159553..a43de78 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -104,7 +104,7 @@
}
ipc_lib::LocklessQueueConfiguration MakeQueueConfiguration(
- const Channel *channel, std::chrono::seconds channel_storage_duration) {
+ const Configuration *configuration, const Channel *channel) {
ipc_lib::LocklessQueueConfiguration config;
config.num_watchers = channel->num_watchers();
@@ -112,7 +112,7 @@
// The value in the channel will default to 0 if readers are configured to
// copy.
config.num_pinners = channel->num_readers();
- config.queue_size = channel_storage_duration.count() * channel->frequency();
+ config.queue_size = configuration::QueueSize(configuration, channel);
config.message_data_size = channel->max_size();
return config;
@@ -120,9 +120,9 @@
class MMappedQueue {
public:
- MMappedQueue(std::string_view shm_base, const Channel *channel,
- std::chrono::seconds channel_storage_duration)
- : config_(MakeQueueConfiguration(channel, channel_storage_duration)) {
+ MMappedQueue(std::string_view shm_base, const Configuration *config,
+ const Channel *channel)
+ : config_(MakeQueueConfiguration(config, channel)) {
std::string path = ShmPath(shm_base, channel);
size_ = ipc_lib::LocklessQueueMemorySize(config_);
@@ -240,10 +240,7 @@
const Channel *channel)
: event_loop_(event_loop),
channel_(channel),
- lockless_queue_memory_(
- shm_base, channel,
- chrono::ceil<chrono::seconds>(chrono::nanoseconds(
- event_loop->configuration()->channel_storage_duration()))),
+ lockless_queue_memory_(shm_base, event_loop->configuration(), channel),
reader_(lockless_queue_memory_.queue()) {
context_.data = nullptr;
// Point the queue index at the next index to read starting now. This
@@ -517,10 +514,7 @@
explicit ShmSender(std::string_view shm_base, EventLoop *event_loop,
const Channel *channel)
: RawSender(event_loop, channel),
- lockless_queue_memory_(
- shm_base, channel,
- chrono::ceil<chrono::seconds>(chrono::nanoseconds(
- event_loop->configuration()->channel_storage_duration()))),
+ lockless_queue_memory_(shm_base, event_loop->configuration(), channel),
lockless_queue_sender_(VerifySender(
ipc_lib::LocklessQueueSender::Make(
lockless_queue_memory_.queue(),
@@ -1219,10 +1213,7 @@
int ShmEventLoop::NumberBuffers(const Channel *channel) {
CheckCurrentThread();
- return MakeQueueConfiguration(
- channel, chrono::ceil<chrono::seconds>(chrono::nanoseconds(
- configuration()->channel_storage_duration())))
- .num_messages();
+ return MakeQueueConfiguration(configuration(), channel).num_messages();
}
absl::Span<char> ShmEventLoop::GetShmSenderSharedMemory(
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 6bf3ec1..e0ed7a0 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -185,10 +185,8 @@
// The number of messages we pretend to have in the queue.
int queue_size() const {
- return channel()->frequency() *
- std::chrono::duration_cast<std::chrono::duration<double>>(
- channel_storage_duration_)
- .count();
+ return configuration::QueueSize(channel()->frequency(),
+ channel_storage_duration_);
}
std::chrono::nanoseconds channel_storage_duration() const {
@@ -197,10 +195,7 @@
// The number of extra buffers (beyond the queue) we pretend to have.
int number_scratch_buffers() const {
- // We need to start creating messages before we know how many
- // senders+readers we'll have, so we need to just pick something which is
- // always big enough.
- return 50;
+ return configuration::QueueScratchBufferSize(channel());
}
int number_buffers() const { return queue_size() + number_scratch_buffers(); }
@@ -256,12 +251,12 @@
const Channel *channel() const { return channel_; }
void CountSenderCreated() {
- CheckBufferCount();
if (sender_count_ >= channel()->num_senders()) {
LOG(FATAL) << "Failed to create sender on "
<< configuration::CleanedChannelToString(channel())
<< ", too many senders.";
}
+ CheckBufferCount();
++sender_count_;
}