Enforce channel message limits when building configs

We only support 2^16-1 messages/channel.  Enforce this when building the
configuration, rather than at runtime in the simulation.  This will
make the errors cleaner and more direct.

Change-Id: I368f6c6596a7b0eaa562a70cd385ca1b94e10b93
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/configuration.cc b/aos/configuration.cc
index db00ce7..736f772 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -29,6 +29,8 @@
 
 namespace aos {
 namespace {
+namespace chrono = std::chrono;
+
 bool EndsWith(std::string_view str, std::string_view end) {
   if (str.size() < end.size()) {
     return false;
@@ -358,6 +360,12 @@
                    << ", can only use [-a-zA-Z0-9_/]";
       }
 
+      CHECK_LT(QueueSize(&config.message(), c) + QueueScratchBufferSize(c),
+               std::numeric_limits<uint16_t>::max())
+          << ": More messages/second configured than the queue can hold on "
+          << CleanedChannelToString(c) << ", " << c->frequency() << "hz for "
+          << config.message().channel_storage_duration() << "ns";
+
       if (c->has_logger_nodes()) {
         // Confirm that we don't have duplicate logger nodes.
         absl::btree_set<std::string_view> logger_nodes;
@@ -1564,5 +1572,22 @@
   return result;
 }
 
+int QueueSize(const Configuration *config, const Channel *channel) {
+  return QueueSize(channel->frequency(),
+                   chrono::nanoseconds(config->channel_storage_duration()));
+}
+
+int QueueSize(size_t frequency, chrono::nanoseconds channel_storage_duration) {
+  // Use integer arithmetic and round up at all cost.
+  return static_cast<int>(
+      (999999999 + static_cast<int64_t>(frequency) *
+                       static_cast<int64_t>(channel_storage_duration.count())) /
+      static_cast<int64_t>(1000000000));
+}
+
+int QueueScratchBufferSize(const Channel *channel) {
+  return channel->num_readers() + channel->num_senders();
+}
+
 }  // namespace configuration
 }  // namespace aos
diff --git a/aos/configuration.h b/aos/configuration.h
index 31603fe..8515b18 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -5,6 +5,7 @@
 #include <netinet/in.h>
 #include <sys/socket.h>
 
+#include <chrono>
 #include <cstdint>
 #include <string_view>
 
@@ -203,6 +204,14 @@
 bool ApplicationShouldStart(const Configuration *config, const Node *my_node,
                             const Application *application);
 
+// Returns the number of messages in the queue.
+int QueueSize(const Configuration *config, const Channel *channel);
+int QueueSize(size_t frequency,
+              std::chrono::nanoseconds channel_storage_duration);
+
+// Returns the number of scratch buffers in the queue.
+int QueueScratchBufferSize(const Channel *channel);
+
 // TODO(austin): GetSchema<T>(const Flatbuffer<Configuration> &config);
 
 }  // namespace configuration
diff --git a/aos/configuration_test.cc b/aos/configuration_test.cc
index 242ee17..fa74e20 100644
--- a/aos/configuration_test.cc
+++ b/aos/configuration_test.cc
@@ -16,6 +16,7 @@
 namespace testing {
 
 using aos::testing::ArtifactPath;
+namespace chrono = std::chrono;
 
 class ConfigurationTest : public ::testing::Test {
  public:
@@ -988,6 +989,28 @@
       "Found duplicate logger_nodes in");
 }
 
+// Tests that we properly compute the queue size for the provided duration.
+TEST_F(ConfigurationTest, QueueSize) {
+  EXPECT_EQ(QueueSize(100, chrono::seconds(2)), 200);
+  EXPECT_EQ(QueueSize(200, chrono::seconds(2)), 400);
+  EXPECT_EQ(QueueSize(100, chrono::seconds(6)), 600);
+  EXPECT_EQ(QueueSize(100, chrono::milliseconds(10)), 1);
+  EXPECT_EQ(QueueSize(100, chrono::milliseconds(10) - chrono::nanoseconds(1)),
+            1);
+  EXPECT_EQ(QueueSize(100, chrono::milliseconds(10) - chrono::nanoseconds(2)),
+            1);
+}
+
+// Tests that we compute scratch buffer size correctly too.
+TEST_F(ConfigurationTest, QueueScratchBufferSize) {
+  const aos::FlatbufferDetachedBuffer<Channel> channel =
+      JsonToFlatbuffer<Channel>(
+          "{ \"name\": \"/foo\", \"type\": \".aos.bar\", \"num_readers\": 5, "
+          "\"num_senders\": 10 }");
+
+  EXPECT_EQ(QueueScratchBufferSize(&channel.message()), 15);
+}
+
 }  // namespace testing
 }  // namespace configuration
 }  // namespace aos
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 9159553..a43de78 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -104,7 +104,7 @@
 }
 
 ipc_lib::LocklessQueueConfiguration MakeQueueConfiguration(
-    const Channel *channel, std::chrono::seconds channel_storage_duration) {
+    const Configuration *configuration, const Channel *channel) {
   ipc_lib::LocklessQueueConfiguration config;
 
   config.num_watchers = channel->num_watchers();
@@ -112,7 +112,7 @@
   // The value in the channel will default to 0 if readers are configured to
   // copy.
   config.num_pinners = channel->num_readers();
-  config.queue_size = channel_storage_duration.count() * channel->frequency();
+  config.queue_size = configuration::QueueSize(configuration, channel);
   config.message_data_size = channel->max_size();
 
   return config;
@@ -120,9 +120,9 @@
 
 class MMappedQueue {
  public:
-  MMappedQueue(std::string_view shm_base, const Channel *channel,
-               std::chrono::seconds channel_storage_duration)
-      : config_(MakeQueueConfiguration(channel, channel_storage_duration)) {
+  MMappedQueue(std::string_view shm_base, const Configuration *config,
+               const Channel *channel)
+      : config_(MakeQueueConfiguration(config, channel)) {
     std::string path = ShmPath(shm_base, channel);
 
     size_ = ipc_lib::LocklessQueueMemorySize(config_);
@@ -240,10 +240,7 @@
                             const Channel *channel)
       : event_loop_(event_loop),
         channel_(channel),
-        lockless_queue_memory_(
-            shm_base, channel,
-            chrono::ceil<chrono::seconds>(chrono::nanoseconds(
-                event_loop->configuration()->channel_storage_duration()))),
+        lockless_queue_memory_(shm_base, event_loop->configuration(), channel),
         reader_(lockless_queue_memory_.queue()) {
     context_.data = nullptr;
     // Point the queue index at the next index to read starting now.  This
@@ -517,10 +514,7 @@
   explicit ShmSender(std::string_view shm_base, EventLoop *event_loop,
                      const Channel *channel)
       : RawSender(event_loop, channel),
-        lockless_queue_memory_(
-            shm_base, channel,
-            chrono::ceil<chrono::seconds>(chrono::nanoseconds(
-                event_loop->configuration()->channel_storage_duration()))),
+        lockless_queue_memory_(shm_base, event_loop->configuration(), channel),
         lockless_queue_sender_(VerifySender(
             ipc_lib::LocklessQueueSender::Make(
                 lockless_queue_memory_.queue(),
@@ -1219,10 +1213,7 @@
 
 int ShmEventLoop::NumberBuffers(const Channel *channel) {
   CheckCurrentThread();
-  return MakeQueueConfiguration(
-             channel, chrono::ceil<chrono::seconds>(chrono::nanoseconds(
-                          configuration()->channel_storage_duration())))
-      .num_messages();
+  return MakeQueueConfiguration(configuration(), channel).num_messages();
 }
 
 absl::Span<char> ShmEventLoop::GetShmSenderSharedMemory(
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 6bf3ec1..e0ed7a0 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -185,10 +185,8 @@
 
   // The number of messages we pretend to have in the queue.
   int queue_size() const {
-    return channel()->frequency() *
-           std::chrono::duration_cast<std::chrono::duration<double>>(
-               channel_storage_duration_)
-               .count();
+    return configuration::QueueSize(channel()->frequency(),
+                                    channel_storage_duration_);
   }
 
   std::chrono::nanoseconds channel_storage_duration() const {
@@ -197,10 +195,7 @@
 
   // The number of extra buffers (beyond the queue) we pretend to have.
   int number_scratch_buffers() const {
-    // We need to start creating messages before we know how many
-    // senders+readers we'll have, so we need to just pick something which is
-    // always big enough.
-    return 50;
+    return configuration::QueueScratchBufferSize(channel());
   }
 
   int number_buffers() const { return queue_size() + number_scratch_buffers(); }
@@ -256,12 +251,12 @@
   const Channel *channel() const { return channel_; }
 
   void CountSenderCreated() {
-    CheckBufferCount();
     if (sender_count_ >= channel()->num_senders()) {
       LOG(FATAL) << "Failed to create sender on "
                  << configuration::CleanedChannelToString(channel())
                  << ", too many senders.";
     }
+    CheckBufferCount();
     ++sender_count_;
   }