Switch to 32 bit message indices when possible in aos

Larger queues need larger indices.  Everything is currently 32 bits to
support old 32 bit CPUs.  Conditionally use 64 bit atomics when we can on
64 bit processors so we can have more messages.  32 bit processors are
becoming pretty old, so there is no sense hamstringing the rest of the
system based on their constraints.

This also runs both the 32 bit and 64 bit versions of the index test on
64 bit processors to maintain coverage of the 32 bit code.

Change-Id: I5ff61d4fc41163a0b7a2f71f08fc62d7e6048583
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/BUILD b/aos/BUILD
index a5fb6c8..396d107 100644
--- a/aos/BUILD
+++ b/aos/BUILD
@@ -277,6 +277,7 @@
         ":flatbuffers",
         ":json_to_flatbuffer",
         "//aos:unique_malloc_ptr",
+        "//aos/ipc_lib:index",
         "//aos/network:team_number",
         "//aos/util:file",
         "@com_github_google_glog//:glog",
diff --git a/aos/config.bzl b/aos/config.bzl
index 4aa6b4a..d281027 100644
--- a/aos/config.bzl
+++ b/aos/config.bzl
@@ -1,4 +1,5 @@
 load("//tools/build_rules:label.bzl", "expand_label")
+load("//tools/build_rules:select.bzl", "address_size_select")
 
 AosConfigInfo = provider(fields = [
     "transitive_flatbuffers",
@@ -9,6 +10,10 @@
     _aos_config(
         name = name,
         src = src,
+        flags = address_size_select({
+            "32": ["--max_queue_size_override=0xffff"],
+            "64": ["--max_queue_size_override=0xffffffff"],
+        }),
         config_json = name + ".json",
         config_stripped = name + ".stripped.json",
         config_binary = name + ".bfbs",
@@ -38,7 +43,7 @@
     ctx.actions.run(
         outputs = [config, stripped_config, binary_config],
         inputs = all_files,
-        arguments = [
+        arguments = ctx.attr.flags + [
             config.path,
             stripped_config.path,
             binary_config.path,
@@ -74,6 +79,9 @@
             mandatory = True,
             allow_files = True,
         ),
+        "flags": attr.string_list(
+            doc = "Additional flags to pass to config_flattener.",
+        ),
         "deps": attr.label_list(
             providers = [AosConfigInfo],
         ),
diff --git a/aos/configuration.cc b/aos/configuration.cc
index 5afbee0..70e7b48 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -23,11 +23,18 @@
 
 #include "aos/configuration_generated.h"
 #include "aos/flatbuffer_merge.h"
+#include "aos/ipc_lib/index.h"
 #include "aos/json_to_flatbuffer.h"
 #include "aos/network/team_number.h"
 #include "aos/unique_malloc_ptr.h"
 #include "aos/util/file.h"
 
+DEFINE_uint32(max_queue_size_override, 0,
+              "If nonzero, this is the max number of elements in a queue to "
+              "enforce.  If zero, use the number that the processor that this "
+              "application is compiled for can support.  This is mostly useful "
+              "for config validation, and shouldn't be touched.");
+
 namespace aos {
 namespace {
 namespace chrono = std::chrono;
@@ -364,7 +371,10 @@
       }
 
       CHECK_LT(QueueSize(&config.message(), c) + QueueScratchBufferSize(c),
-               std::numeric_limits<uint16_t>::max())
+               FLAGS_max_queue_size_override != 0
+                   ? FLAGS_max_queue_size_override
+                   : std::numeric_limits<
+                         ipc_lib::QueueIndex::PackedIndexType>::max())
           << ": More messages/second configured than the queue can hold on "
           << CleanedChannelToString(c) << ", " << c->frequency() << "hz for "
           << ChannelStorageDuration(&config.message(), c).count() << "ns";
@@ -1622,12 +1632,13 @@
   return chrono::nanoseconds(config->channel_storage_duration());
 }
 
-int QueueSize(const Configuration *config, const Channel *channel) {
+size_t QueueSize(const Configuration *config, const Channel *channel) {
   return QueueSize(channel->frequency(),
                    ChannelStorageDuration(config, channel));
 }
 
-int QueueSize(size_t frequency, chrono::nanoseconds channel_storage_duration) {
+size_t QueueSize(size_t frequency,
+                 chrono::nanoseconds channel_storage_duration) {
   // Use integer arithmetic and round up at all cost.
   return static_cast<int>(
       (999999999 + static_cast<int64_t>(frequency) *
diff --git a/aos/configuration.h b/aos/configuration.h
index 68b3079..e35c0bf 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -209,9 +209,9 @@
                             const Application *application);
 
 // Returns the number of messages in the queue.
-int QueueSize(const Configuration *config, const Channel *channel);
-int QueueSize(size_t frequency,
-              std::chrono::nanoseconds channel_storage_duration);
+size_t QueueSize(const Configuration *config, const Channel *channel);
+size_t QueueSize(size_t frequency,
+                 std::chrono::nanoseconds channel_storage_duration);
 
 // Returns the number of scratch buffers in the queue.
 int QueueScratchBufferSize(const Channel *channel);
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index 5d58c29..6b40b9d 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -26,12 +26,12 @@
     }
 
     // Clean up anything left there before.
-    unlink((FLAGS_shm_base + "/test/aos.TestMessage.v4").c_str());
-    unlink((FLAGS_shm_base + "/test1/aos.TestMessage.v4").c_str());
-    unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v4").c_str());
-    unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v4").c_str());
-    unlink((FLAGS_shm_base + "/aos/aos.timing.Report.v4").c_str());
-    unlink((FLAGS_shm_base + "/aos/aos.logging.LogMessageFbs.v4").c_str());
+    unlink((FLAGS_shm_base + "/test/aos.TestMessage.v5").c_str());
+    unlink((FLAGS_shm_base + "/test1/aos.TestMessage.v5").c_str());
+    unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v5").c_str());
+    unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v5").c_str());
+    unlink((FLAGS_shm_base + "/aos/aos.timing.Report.v5").c_str());
+    unlink((FLAGS_shm_base + "/aos/aos.logging.LogMessageFbs.v5").c_str());
   }
 
   ~ShmEventLoopTestFactory() { FLAGS_override_hostname = ""; }
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 5345b8d..7e469f5 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -149,6 +149,12 @@
         channel_storage_duration_(channel_storage_duration),
         next_queue_index_(ipc_lib::QueueIndex::Zero(number_buffers())),
         scheduler_(scheduler) {
+    // Gut check that things fit.  Configuration validation should have caught
+    // this before we get here.
+    CHECK_LT(static_cast<size_t>(number_buffers()),
+             std::numeric_limits<
+                 decltype(available_buffer_indices_)::value_type>::max())
+        << configuration::CleanedChannelToString(channel);
     available_buffer_indices_.resize(number_buffers());
     for (int i = 0; i < number_buffers(); ++i) {
       available_buffer_indices_[i] = i;
@@ -291,7 +297,7 @@
   // replay) and we want to prevent new senders from being accidentally created.
   bool allow_new_senders_ = true;
 
-  std::vector<uint16_t> available_buffer_indices_;
+  std::vector<ipc_lib::QueueIndex::PackedIndexType> available_buffer_indices_;
 
   const EventScheduler *scheduler_;
 
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index cc46dac..b6c2a89 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -141,6 +141,32 @@
 )
 
 cc_library(
+    name = "index32",
+    srcs = ["index.cc"],
+    hdrs = ["index.h"],
+    defines = [
+        "AOS_QUEUE_ATOMIC_SIZE=32",
+    ],
+    target_compatible_with = ["@platforms//os:linux"],
+    visibility = ["//visibility:public"],
+    deps = [
+        ":shm_observers",
+        "@com_github_google_glog//:glog",
+    ],
+)
+
+cc_test(
+    name = "index32_test",
+    srcs = ["index_test.cc"],
+    target_compatible_with = ["@platforms//os:linux"],
+    deps = [
+        ":index32",
+        "//aos/testing:googletest",
+        "@com_github_google_glog//:glog",
+    ],
+)
+
+cc_library(
     name = "index",
     srcs = ["index.cc"],
     hdrs = ["index.h"],
diff --git a/aos/ipc_lib/index.h b/aos/ipc_lib/index.h
index 4814075..434da7c 100644
--- a/aos/ipc_lib/index.h
+++ b/aos/ipc_lib/index.h
@@ -1,6 +1,7 @@
 #ifndef AOS_IPC_LIB_INDEX_H_
 #define AOS_IPC_LIB_INDEX_H_
 
+#include <stdint.h>
 #include <sys/types.h>
 
 #include <atomic>
@@ -10,6 +11,18 @@
 
 #include "aos/ipc_lib/shm_observers.h"
 
+#ifndef AOS_QUEUE_ATOMIC_SIZE
+#if UINTPTR_MAX == 0xffffffff
+#define AOS_QUEUE_ATOMIC_SIZE 32
+/* 32-bit */
+#elif UINTPTR_MAX == 0xffffffffffffffff
+#define AOS_QUEUE_ATOMIC_SIZE 64
+/* 64-bit */
+#else
+#error "Unknown pointer size"
+#endif
+#endif
+
 namespace aos {
 namespace ipc_lib {
 
@@ -40,13 +53,19 @@
 // Structure for holding the index into the queue.
 class QueueIndex {
  public:
+#if AOS_QUEUE_ATOMIC_SIZE == 64
+  typedef uint32_t PackedIndexType;
+#else
+  typedef uint16_t PackedIndexType;
+#endif
+
   // Returns an invalid queue element which uses a reserved value.
-  static QueueIndex Invalid() { return QueueIndex(0xffffffff, 0); }
+  static QueueIndex Invalid() { return QueueIndex(sentinal_value(), 0); }
   // Returns a queue element pointing to 0.
   static QueueIndex Zero(uint32_t count) { return QueueIndex(0, count); }
 
   // Returns true if the index is valid.
-  bool valid() const { return index_ != 0xffffffff; }
+  bool valid() const { return index_ != sentinal_value(); }
 
   // Returns the modulo base used to wrap to avoid overlapping with the reserved
   // number.
@@ -88,10 +107,14 @@
     return QueueIndex(index, count_);
   }
 
-  // Returns true if the lowest 16 bits of the queue index from the Index could
-  // plausibly match this queue index.
-  bool IsPlausible(uint16_t queue_index) const {
-    return valid() && (queue_index == static_cast<uint16_t>(index_ & 0xffff));
+  // Returns true if the lowest bits of the queue index from the Index could
+  // plausibly match this queue index.  The number of bits matched depends on
+  // size of atomics in use.
+  bool IsPlausible(PackedIndexType queue_index) const {
+    return valid() &&
+           (queue_index ==
+            static_cast<PackedIndexType>(
+                index_ & std::numeric_limits<PackedIndexType>::max()));
   }
 
   bool operator==(const QueueIndex other) const {
@@ -170,21 +193,39 @@
 // Structure holding the queue index and the index into the message list.
 class Index {
  public:
+#if AOS_QUEUE_ATOMIC_SIZE == 64
+  typedef uint64_t IndexType;
+  typedef uint32_t MessageIndexType;
+#else
+  typedef uint32_t IndexType;
+  typedef uint16_t MessageIndexType;
+#endif
+  typedef QueueIndex::PackedIndexType PackedIndexType;
+
   // Constructs an Index.  queue_index is the QueueIndex of this message, and
   // message_index is the index into the messages structure.
-  Index(QueueIndex queue_index, uint16_t message_index)
+  Index(QueueIndex queue_index, MessageIndexType message_index)
       : Index(queue_index.index_, message_index) {}
-  Index(uint32_t queue_index, uint16_t message_index)
-      : index_((queue_index & 0xffff) |
-               (static_cast<uint32_t>(message_index) << 16)) {
+  Index(uint32_t queue_index, MessageIndexType message_index)
+      : index_(static_cast<IndexType>(
+                   queue_index & std::numeric_limits<PackedIndexType>::max()) |
+               (static_cast<IndexType>(message_index)
+                << std::numeric_limits<PackedIndexType>::digits)) {
     CHECK_LE(message_index, MaxMessages());
   }
 
   // Index of this message in the message array.
-  uint16_t message_index() const { return (index_ >> 16) & 0xffff; }
+  MessageIndexType message_index() const {
+    return (index_ >> std::numeric_limits<PackedIndexType>::digits) &
+           std::numeric_limits<MessageIndexType>::max();
+  }
 
-  // Lowest 16 bits of the queue index of this message in the queue.
-  uint16_t queue_index() const { return index_ & 0xffff; }
+  // Lowest bits of the queue index of this message in the queue.  This will
+  // either be 16 or 32 bits, depending on whether we have 32 or 64 bit atomics
+  // under the covers.
+  PackedIndexType queue_index() const {
+    return index_ & std::numeric_limits<PackedIndexType>::max();
+  }
 
   // Returns true if the provided queue index plausibly represents this Index.
   bool IsPlausible(QueueIndex queue_index) const {
@@ -197,29 +238,33 @@
   bool valid() const { return index_ != sentinal_value(); }
 
   // Returns the raw Index.  This should only be used for debug.
-  uint32_t get() const { return index_; }
+  IndexType get() const { return index_; }
 
   // Returns the maximum number of messages we can store before overflowing.
-  static constexpr uint16_t MaxMessages() { return 0xfffe; }
+  static constexpr MessageIndexType MaxMessages() {
+    return std::numeric_limits<MessageIndexType>::max() - 1;
+  }
 
   bool operator==(const Index other) const { return other.index_ == index_; }
   bool operator!=(const Index other) const { return other.index_ != index_; }
 
   // Returns a string representing the index.
-  ::std::string DebugString() const;
+  std::string DebugString() const;
 
  private:
-  Index(uint32_t index) : index_(index) {}
+  Index(IndexType index) : index_(index) {}
 
   friend class AtomicIndex;
 
-  static constexpr uint32_t sentinal_value() { return 0xffffffffu; }
+  static constexpr IndexType sentinal_value() {
+    return std::numeric_limits<IndexType>::max();
+  }
 
-  // Note: a value of 0xffffffff is a sentinal to represent an invalid entry.
+  // Note: a value of all 1 bits is a sentinel to represent an invalid entry.
   // This works because we would need to have a queue index of 0x*ffff, *and*
   // have 0xffff messages in the message list.  That constraint is easy to
   // enforce by limiting the max messages.
-  uint32_t index_;
+  IndexType index_;
 };
 
 // Atomic storage for setting and getting Index objects.
@@ -257,7 +302,7 @@
   }
 
  private:
-  ::std::atomic<uint32_t> index_;
+  ::std::atomic<Index::IndexType> index_;
 };
 
 }  // namespace ipc_lib
diff --git a/aos/ipc_lib/index_test.cc b/aos/ipc_lib/index_test.cc
index a288ec9..dca75a0 100644
--- a/aos/ipc_lib/index_test.cc
+++ b/aos/ipc_lib/index_test.cc
@@ -133,6 +133,37 @@
   EXPECT_EQ(index.message_index(), 11);
 }
 
+#if AOS_QUEUE_ATOMIC_SIZE == 64
+// Tests that IsPlausible has sane behavior with 64 bit indices.
+TEST(IndexTest, TestPlausible) {
+  QueueIndex five = QueueIndex::Zero(100).IncrementBy(5);
+  QueueIndex ffff = QueueIndex::Zero(100).IncrementBy(0xffff);
+
+  // Tests some various combinations of indices.
+  for (int i = 0; i < 100; ++i) {
+    Index index(five, i);
+    EXPECT_EQ(index.queue_index(), 5 + i * 0x10000);
+
+    EXPECT_TRUE(index.IsPlausible(five));
+
+    EXPECT_EQ(index.message_index(), i);
+
+    five = five.IncrementBy(0x10000);
+  }
+
+  // Tests that a queue index with a value of 0xffff doesn't match an invalid
+  // index.
+  for (int i = 0; i < 100; ++i) {
+    Index index(ffff, i);
+    EXPECT_EQ(index.queue_index(), 0xffff);
+
+    EXPECT_TRUE(index.IsPlausible(ffff));
+    EXPECT_FALSE(index.IsPlausible(QueueIndex::Invalid()));
+
+    EXPECT_EQ(index.message_index(), i);
+  }
+}
+#else
 // Tests that Plausible behaves.
 TEST(IndexTest, TestPlausible) {
   QueueIndex five = QueueIndex::Zero(100).IncrementBy(5);
@@ -162,6 +193,16 @@
     EXPECT_EQ(index.message_index(), i);
   }
 }
+#endif
+
+// Tests that the max message size makes sense.
+TEST(IndexTest, TestMaxMessages) {
+#if AOS_QUEUE_ATOMIC_SIZE == 64
+  EXPECT_EQ(Index::MaxMessages(), 0xfffffffe);
+#else
+  EXPECT_EQ(Index::MaxMessages(), 0xfffe);
+#endif
+}
 
 }  // namespace testing
 }  // namespace ipc_lib
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index 3d2c0d1..9a53eb0 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -627,7 +627,8 @@
       ::aos::ipc_lib::Sender *s = memory->GetSender(i);
       // Nobody else can possibly be touching these because we haven't set
       // initialized to true yet.
-      s->scratch_index.RelaxedStore(Index(0xffff, i + memory->queue_size()));
+      s->scratch_index.RelaxedStore(
+          Index(QueueIndex::Invalid(), i + memory->queue_size()));
       s->to_replace.RelaxedInvalidate();
     }
 
@@ -636,7 +637,8 @@
       // Nobody else can possibly be touching these because we haven't set
       // initialized to true yet.
       pinner->scratch_index.RelaxedStore(
-          Index(0xffff, i + memory->num_senders() + memory->queue_size()));
+          Index(QueueIndex::Invalid(),
+                i + memory->num_senders() + memory->queue_size()));
       pinner->pinned.Invalidate();
     }
 
@@ -1400,7 +1402,7 @@
 
 // Prints out the mutex state.  Not safe to use while the mutex is being
 // changed.
-::std::string PrintMutex(aos_mutex *mutex) {
+::std::string PrintMutex(const aos_mutex *mutex) {
   ::std::stringstream s;
   s << "aos_mutex(" << ::std::hex << mutex->futex;
 
@@ -1418,7 +1420,7 @@
 
 }  // namespace
 
-void PrintLocklessQueueMemory(LocklessQueueMemory *memory) {
+void PrintLocklessQueueMemory(const LocklessQueueMemory *memory) {
   const size_t queue_size = memory->queue_size();
   ::std::cout << "LocklessQueueMemory (" << memory << ") {" << ::std::endl;
   ::std::cout << "  aos_mutex queue_setup_lock = "
@@ -1452,7 +1454,7 @@
   ::std::cout << "  Message messages[" << memory->num_messages() << "] {"
               << ::std::endl;
   for (size_t i = 0; i < memory->num_messages(); ++i) {
-    Message *m = memory->GetMessage(Index(i, i));
+    const Message *m = memory->GetMessage(Index(i, i));
     ::std::cout << "    [" << i << "] -> Message 0x" << std::hex
                 << (reinterpret_cast<uintptr_t>(
                         memory->GetMessage(Index(i, i))) -
@@ -1484,8 +1486,9 @@
     ::std::cout << "      }" << ::std::endl;
     const bool corrupt = CheckBothRedzones(memory, m);
     if (corrupt) {
-      absl::Span<char> pre_redzone = m->PreRedzone(memory->message_data_size());
-      absl::Span<char> post_redzone =
+      absl::Span<const char> pre_redzone =
+          m->PreRedzone(memory->message_data_size());
+      absl::Span<const char> post_redzone =
           m->PostRedzone(memory->message_data_size(), memory->message_size());
 
       ::std::cout << "      pre-redzone: \""
@@ -1514,7 +1517,7 @@
   ::std::cout << "  Sender senders[" << memory->num_senders() << "] {"
               << ::std::endl;
   for (size_t i = 0; i < memory->num_senders(); ++i) {
-    Sender *s = memory->GetSender(i);
+    const Sender *s = memory->GetSender(i);
     ::std::cout << "    [" << i << "] -> Sender {" << ::std::endl;
     ::std::cout << "      aos_mutex tid = " << PrintMutex(&s->tid)
                 << ::std::endl;
@@ -1529,7 +1532,7 @@
   ::std::cout << "  Pinner pinners[" << memory->num_pinners() << "] {"
               << ::std::endl;
   for (size_t i = 0; i < memory->num_pinners(); ++i) {
-    Pinner *p = memory->GetPinner(i);
+    const Pinner *p = memory->GetPinner(i);
     ::std::cout << "    [" << i << "] -> Pinner {" << ::std::endl;
     ::std::cout << "      aos_mutex tid = " << PrintMutex(&p->tid)
                 << ::std::endl;
@@ -1545,7 +1548,7 @@
   ::std::cout << "  Watcher watchers[" << memory->num_watchers() << "] {"
               << ::std::endl;
   for (size_t i = 0; i < memory->num_watchers(); ++i) {
-    Watcher *w = memory->GetWatcher(i);
+    const Watcher *w = memory->GetWatcher(i);
     ::std::cout << "    [" << i << "] -> Watcher {" << ::std::endl;
     ::std::cout << "      aos_mutex tid = " << PrintMutex(&w->tid)
                 << ::std::endl;
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index 9cc97c0..f83b558 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -454,7 +454,7 @@
 // before and after a time with a binary search.
 
 // Prints to stdout the data inside the queue for debugging.
-void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
+void PrintLocklessQueueMemory(const LocklessQueueMemory *memory);
 
 }  // namespace ipc_lib
 }  // namespace aos
diff --git a/aos/ipc_lib/lockless_queue_memory.h b/aos/ipc_lib/lockless_queue_memory.h
index b3f9468..713d9cd 100644
--- a/aos/ipc_lib/lockless_queue_memory.h
+++ b/aos/ipc_lib/lockless_queue_memory.h
@@ -119,6 +119,14 @@
         SizeOfSenders() + pinner_index * sizeof(Pinner));
   }
 
+  const Sender *GetSender(size_t sender_index) const {
+    static_assert(alignof(Sender) <= kDataAlignment,
+                  "kDataAlignment is too small");
+    return reinterpret_cast<const Sender *>(
+        &data[0] + SizeOfQueue() + SizeOfMessages() + SizeOfWatchers() +
+        sender_index * sizeof(Sender));
+  }
+
   Sender *GetSender(size_t sender_index) {
     static_assert(alignof(Sender) <= kDataAlignment,
                   "kDataAlignment is too small");
diff --git a/aos/ipc_lib/memory_mapped_queue.cc b/aos/ipc_lib/memory_mapped_queue.cc
index c2d8d44..79f27d9 100644
--- a/aos/ipc_lib/memory_mapped_queue.cc
+++ b/aos/ipc_lib/memory_mapped_queue.cc
@@ -17,7 +17,7 @@
 
 std::string ShmPath(std::string_view shm_base, const Channel *channel) {
   CHECK(channel->has_type());
-  return ShmFolder(shm_base, channel) + channel->type()->str() + ".v4";
+  return ShmFolder(shm_base, channel) + channel->type()->str() + ".v5";
 }
 
 void PageFaultDataWrite(char *data, size_t size) {
@@ -67,6 +67,15 @@
   // copy.
   config.num_pinners = channel->num_readers();
   config.queue_size = configuration::QueueSize(configuration, channel);
+  CHECK_LT(config.queue_size,
+           std::numeric_limits<QueueIndex::PackedIndexType>::max())
+      << ": More messages/second configured than the queue can hold on "
+      << configuration::CleanedChannelToString(channel) << ", "
+      << channel->frequency() << "hz for "
+      << std::chrono::duration<double>(
+             configuration::ChannelStorageDuration(configuration, channel))
+             .count()
+      << "sec";
   config.message_data_size = channel->max_size();
 
   return config;
diff --git a/aos/starter/starter_test.cc b/aos/starter/starter_test.cc
index fcbf287..3070f8e 100644
--- a/aos/starter/starter_test.cc
+++ b/aos/starter/starter_test.cc
@@ -20,6 +20,23 @@
 namespace aos {
 namespace starter {
 
+class ThreadedStarterRunner {
+ public:
+  ThreadedStarterRunner(Starter *starter)
+      : my_thread_([this, starter]() {
+          starter->event_loop()->OnRun([this]() { event_.Set(); });
+          starter->Run();
+        }) {
+    event_.Wait();
+  }
+
+  ~ThreadedStarterRunner() { my_thread_.join(); }
+
+ private:
+  aos::Event event_;
+  std::thread my_thread_;
+};
+
 class StarterdTest : public ::testing::Test {
  public:
   StarterdTest() {
@@ -161,26 +178,17 @@
 
   SetupStarterCleanup(&starter);
 
-  Event starter_started;
-  std::thread starterd_thread([&starter, &starter_started] {
-    starter.event_loop()->OnRun(
-        [&starter_started]() { starter_started.Set(); });
-    starter.Run();
-  });
-  starter_started.Wait();
+  ThreadedStarterRunner starterd_thread(&starter);
 
-  Event client_started;
-  std::thread client_thread([&client_loop, &client_started] {
-    client_loop.OnRun([&client_started]() { client_started.Set(); });
-    client_loop.Run();
-  });
-  client_started.Wait();
+  aos::Event event;
+  client_loop.OnRun([&event]() { event.Set(); });
+  std::thread client_thread([&client_loop] { client_loop.Run(); });
+  event.Wait();
 
   watcher_loop.Run();
   test_done_ = true;
   client_thread.join();
   ASSERT_TRUE(success);
-  starterd_thread.join();
 }
 
 INSTANTIATE_TEST_SUITE_P(
@@ -270,18 +278,10 @@
 
   SetupStarterCleanup(&starter);
 
-  Event starter_started;
-  std::thread starterd_thread([&starter, &starter_started] {
-    starter.event_loop()->OnRun(
-        [&starter_started]() { starter_started.Set(); });
-    starter.Run();
-  });
-  starter_started.Wait();
+  ThreadedStarterRunner starterd_thread(&starter);
   watcher_loop.Run();
 
   test_done_ = true;
-
-  starterd_thread.join();
 }
 
 TEST_F(StarterdTest, Autostart) {
@@ -365,18 +365,10 @@
 
   SetupStarterCleanup(&starter);
 
-  Event starter_started;
-  std::thread starterd_thread([&starter, &starter_started] {
-    starter.event_loop()->OnRun(
-        [&starter_started]() { starter_started.Set(); });
-    starter.Run();
-  });
-  starter_started.Wait();
+  ThreadedStarterRunner starterd_thread(&starter);
   watcher_loop.Run();
 
   test_done_ = true;
-
-  starterd_thread.join();
 }
 
 // Tests that starterd respects autorestart.
@@ -462,18 +454,10 @@
 
   SetupStarterCleanup(&starter);
 
-  Event starter_started;
-  std::thread starterd_thread([&starter, &starter_started] {
-    starter.event_loop()->OnRun(
-        [&starter_started]() { starter_started.Set(); });
-    starter.Run();
-  });
-  starter_started.Wait();
+  ThreadedStarterRunner starterd_thread(&starter);
   watcher_loop.Run();
 
   test_done_ = true;
-
-  starterd_thread.join();
 }
 
 TEST_F(StarterdTest, StarterChainTest) {
@@ -579,17 +563,11 @@
 
   // run `starter.Run()` in a thread to simulate it running on
   // another process.
-  Event started;
-  std::thread starterd_thread([&starter, &started] {
-    starter.event_loop()->OnRun([&started]() { started.Set(); });
-    starter.Run();
-  });
+  ThreadedStarterRunner starterd_thread(&starter);
 
-  started.Wait();
   client_loop.Run();
   EXPECT_TRUE(success);
   ASSERT_FALSE(starter.event_loop()->is_running());
-  starterd_thread.join();
 }
 
 }  // namespace starter