Use starterd to pre-allocate all AOS message queue shared memory.

AOS uses shared memory for intranode message channels.  The first
application that creates a sender/fetcher/watcher for each channel
allocates the shared memory for the message queues, and is given credit
for the shared memory as part of its memory footprint.
The non-deterministic launch order of all the shasta applications means
that the measured memory footprint of many of the processes can vary
greatly, depending on where each one fell in the launch order on that run.
As a result, the memory_limit values in the config files are much bigger
than they would need to be if we had deterministic measurements of the
actual memory usage of each process.

The starterd process is what actually spawns all the processes, using
the configuration data.  Making this process create MemoryMappedQueues
for every intranode channel on its node will pre-allocate all the
message queue shared memory, which dramatically reduces the measured
memory usage of most of the processes it spawns, and makes those numbers
much more deterministic.

Rename MMappedQueue to MemoryMappedQueue and move it from
shm_event_loop.cc to new source files in the lockless_queue library.
Update starterd_lib to identify all channels in the config that are
readable on this node and allocate MemoryMappedQueues for each.

Change-Id: I40649b61653968495ba616b72f95d799a8e06414
Signed-off-by: Brian J Griglak <brian.griglak@bluerivertech.com>
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index b2502a7..1352249 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -1,10 +1,8 @@
 #include "aos/events/shm_event_loop.h"
 
-#include <sys/mman.h>
 #include <sys/stat.h>
 #include <sys/syscall.h>
 #include <sys/types.h>
-#include <unistd.h>
 
 #include <algorithm>
 #include <atomic>
@@ -12,13 +10,13 @@
 #include <iterator>
 #include <stdexcept>
 
-#include "absl/strings/str_cat.h"
 #include "aos/events/aos_logging.h"
 #include "aos/events/epoll.h"
 #include "aos/events/event_loop_generated.h"
 #include "aos/events/timing_statistics.h"
 #include "aos/init.h"
 #include "aos/ipc_lib/lockless_queue.h"
+#include "aos/ipc_lib/memory_mapped_queue.h"
 #include "aos/realtime.h"
 #include "aos/stl_mutex/stl_mutex.h"
 #include "aos/util/file.h"
@@ -56,158 +54,6 @@
 
 namespace {
 
-std::string ShmFolder(std::string_view shm_base, const Channel *channel) {
-  CHECK(channel->has_name());
-  CHECK_EQ(channel->name()->string_view()[0], '/');
-  return absl::StrCat(shm_base, channel->name()->string_view(), "/");
-}
-std::string ShmPath(std::string_view shm_base, const Channel *channel) {
-  CHECK(channel->has_type());
-  return ShmFolder(shm_base, channel) + channel->type()->str() + ".v4";
-}
-
-void PageFaultDataWrite(char *data, size_t size) {
-  // This just has to divide the actual page size. Being smaller will make this
-  // a bit slower than necessary, but not much. 1024 is a pretty conservative
-  // choice (most pages are probably 4096).
-  static constexpr size_t kPageSize = 1024;
-  const size_t pages = (size + kPageSize - 1) / kPageSize;
-  for (size_t i = 0; i < pages; ++i) {
-    char zero = 0;
-    // We need to ensure there's a writable pagetable entry, but avoid modifying
-    // the data.
-    //
-    // Even if you lock the data into memory, some kernels still seem to lazily
-    // create the actual pagetable entries. This means we need to somehow
-    // "write" to the page.
-    //
-    // Also, this takes place while other processes may be concurrently
-    // opening/initializing the memory, so we need to avoid corrupting that.
-    //
-    // This is the simplest operation I could think of which achieves that:
-    // "store 0 if it's already 0".
-    __atomic_compare_exchange_n(&data[i * kPageSize], &zero, 0, true,
-                                __ATOMIC_RELAXED, __ATOMIC_RELAXED);
-  }
-}
-
-void PageFaultDataRead(const char *data, size_t size) {
-  // This just has to divide the actual page size. Being smaller will make this
-  // a bit slower than necessary, but not much. 1024 is a pretty conservative
-  // choice (most pages are probably 4096).
-  static constexpr size_t kPageSize = 1024;
-  const size_t pages = (size + kPageSize - 1) / kPageSize;
-  for (size_t i = 0; i < pages; ++i) {
-    // We need to ensure there's a readable pagetable entry.
-    __atomic_load_n(&data[i * kPageSize], __ATOMIC_RELAXED);
-  }
-}
-
-ipc_lib::LocklessQueueConfiguration MakeQueueConfiguration(
-    const Configuration *configuration, const Channel *channel) {
-  ipc_lib::LocklessQueueConfiguration config;
-
-  config.num_watchers = channel->num_watchers();
-  config.num_senders = channel->num_senders();
-  // The value in the channel will default to 0 if readers are configured to
-  // copy.
-  config.num_pinners = channel->num_readers();
-  config.queue_size = configuration::QueueSize(configuration, channel);
-  config.message_data_size = channel->max_size();
-
-  return config;
-}
-
-class MMappedQueue {
- public:
-  MMappedQueue(std::string_view shm_base, const Configuration *config,
-               const Channel *channel)
-      : config_(MakeQueueConfiguration(config, channel)) {
-    std::string path = ShmPath(shm_base, channel);
-
-    size_ = ipc_lib::LocklessQueueMemorySize(config_);
-
-    util::MkdirP(path, FLAGS_permissions);
-
-    // There are 2 cases.  Either the file already exists, or it does not
-    // already exist and we need to create it.  Start by trying to create it. If
-    // that fails, the file has already been created and we can open it
-    // normally..  Once the file has been created it will never be deleted.
-    int fd = open(path.c_str(), O_RDWR | O_CREAT | O_EXCL,
-                  O_CLOEXEC | FLAGS_permissions);
-    if (fd == -1 && errno == EEXIST) {
-      VLOG(1) << path << " already created.";
-      // File already exists.
-      fd = open(path.c_str(), O_RDWR, O_CLOEXEC);
-      PCHECK(fd != -1) << ": Failed to open " << path;
-      while (true) {
-        struct stat st;
-        PCHECK(fstat(fd, &st) == 0);
-        if (st.st_size != 0) {
-          CHECK_EQ(static_cast<size_t>(st.st_size), size_)
-              << ": Size of " << path
-              << " doesn't match expected size of backing queue file.  Did the "
-                 "queue definition change?";
-          break;
-        } else {
-          // The creating process didn't get around to it yet.  Give it a bit.
-          std::this_thread::sleep_for(std::chrono::milliseconds(10));
-          VLOG(1) << path << " is zero size, waiting";
-        }
-      }
-    } else {
-      VLOG(1) << "Created " << path;
-      PCHECK(fd != -1) << ": Failed to open " << path;
-      PCHECK(ftruncate(fd, size_) == 0);
-    }
-
-    data_ = mmap(NULL, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
-    PCHECK(data_ != MAP_FAILED);
-    const_data_ = mmap(NULL, size_, PROT_READ, MAP_SHARED, fd, 0);
-    PCHECK(const_data_ != MAP_FAILED);
-    PCHECK(close(fd) == 0);
-    PageFaultDataWrite(static_cast<char *>(data_), size_);
-    PageFaultDataRead(static_cast<const char *>(const_data_), size_);
-
-    ipc_lib::InitializeLocklessQueueMemory(memory(), config_);
-  }
-
-  ~MMappedQueue() {
-    PCHECK(munmap(data_, size_) == 0);
-    PCHECK(munmap(const_cast<void *>(const_data_), size_) == 0);
-  }
-
-  ipc_lib::LocklessQueueMemory *memory() const {
-    return reinterpret_cast<ipc_lib::LocklessQueueMemory *>(data_);
-  }
-
-  const ipc_lib::LocklessQueueMemory *const_memory() const {
-    return reinterpret_cast<const ipc_lib::LocklessQueueMemory *>(const_data_);
-  }
-
-  const ipc_lib::LocklessQueueConfiguration &config() const { return config_; }
-
-  ipc_lib::LocklessQueue queue() const {
-    return ipc_lib::LocklessQueue(const_memory(), memory(), config());
-  }
-
-  absl::Span<char> GetMutableSharedMemory() const {
-    return absl::Span<char>(static_cast<char *>(data_), size_);
-  }
-
-  absl::Span<const char> GetConstSharedMemory() const {
-    return absl::Span<const char>(static_cast<const char *>(const_data_),
-                                  size_);
-  }
-
- private:
-  const ipc_lib::LocklessQueueConfiguration config_;
-
-  size_t size_;
-  void *data_;
-  const void *const_data_;
-};
-
 const Node *MaybeMyNode(const Configuration *configuration) {
   if (!configuration->has_nodes()) {
     return nullptr;
@@ -216,8 +62,6 @@
   return configuration::GetMyNode(configuration);
 }
 
-namespace chrono = ::std::chrono;
-
 }  // namespace
 
 ShmEventLoop::ShmEventLoop(const Configuration *configuration)
@@ -241,7 +85,8 @@
                             const Channel *channel)
       : event_loop_(event_loop),
         channel_(channel),
-        lockless_queue_memory_(shm_base, event_loop->configuration(), channel),
+        lockless_queue_memory_(shm_base, FLAGS_permissions,
+                               event_loop->configuration(), channel),
         reader_(lockless_queue_memory_.queue()) {
     context_.data = nullptr;
     // Point the queue index at the next index to read starting now.  This
@@ -450,7 +295,7 @@
 
   aos::ShmEventLoop *event_loop_;
   const Channel *const channel_;
-  MMappedQueue lockless_queue_memory_;
+  ipc_lib::MemoryMappedQueue lockless_queue_memory_;
   ipc_lib::LocklessQueueReader reader_;
   // This being nullopt indicates we're not looking for wakeups right now.
   std::optional<ipc_lib::LocklessQueueWatcher> watcher_;
@@ -531,7 +376,8 @@
   explicit ShmSender(std::string_view shm_base, EventLoop *event_loop,
                      const Channel *channel)
       : RawSender(event_loop, channel),
-        lockless_queue_memory_(shm_base, event_loop->configuration(), channel),
+        lockless_queue_memory_(shm_base, FLAGS_permissions,
+                               event_loop->configuration(), channel),
         lockless_queue_sender_(VerifySender(
             ipc_lib::LocklessQueueSender::Make(
                 lockless_queue_memory_.queue(),
@@ -638,7 +484,7 @@
                << static_cast<int>(result);
   }
 
-  MMappedQueue lockless_queue_memory_;
+  ipc_lib::MemoryMappedQueue lockless_queue_memory_;
   ipc_lib::LocklessQueueSender lockless_queue_sender_;
   ipc_lib::LocklessQueueWakeUpper wake_upper_;
 };
@@ -751,7 +597,7 @@
       return;
     }
 
-    if (repeat_offset_ == chrono::seconds(0)) {
+    if (repeat_offset_ == std::chrono::seconds(0)) {
       timerfd_.Disable();
     } else {
       // Compute how many cycles have elapsed and schedule the next iteration
@@ -765,7 +611,7 @@
       // Update the heap and schedule the timerfd wakeup.
       event_.set_event_time(base_);
       shm_event_loop_->AddEvent(&event_);
-      timerfd_.SetTime(base_, chrono::seconds(0));
+      timerfd_.SetTime(base_, std::chrono::seconds(0));
     }
   }
 
@@ -1239,7 +1085,8 @@
 
 int ShmEventLoop::NumberBuffers(const Channel *channel) {
   CheckCurrentThread();
-  return MakeQueueConfiguration(configuration(), channel).num_messages();
+  return ipc_lib::MakeQueueConfiguration(configuration(), channel)
+      .num_messages();
 }
 
 absl::Span<char> ShmEventLoop::GetShmSenderSharedMemory(
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index ef6c4f3..381a273 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -168,14 +168,19 @@
     srcs = [
         "lockless_queue.cc",
         "lockless_queue_memory.h",
+        "memory_mapped_queue.cc",
     ],
-    hdrs = ["lockless_queue.h"],
+    hdrs = [
+        "lockless_queue.h",
+        "memory_mapped_queue.h",
+    ],
     target_compatible_with = ["@platforms//os:linux"],
     visibility = ["//visibility:public"],
     deps = [
         ":aos_sync",
         ":data_alignment",
         ":index",
+        "//aos:configuration",
         "//aos:realtime",
         "//aos:uuid",
         "//aos/time",
diff --git a/aos/ipc_lib/memory_mapped_queue.cc b/aos/ipc_lib/memory_mapped_queue.cc
new file mode 100644
index 0000000..c2d8d44
--- /dev/null
+++ b/aos/ipc_lib/memory_mapped_queue.cc
@@ -0,0 +1,135 @@
+#include "aos/ipc_lib/memory_mapped_queue.h"
+
+#include <fcntl.h>
+#include <sys/mman.h>
+#include <unistd.h>
+
+#include "absl/strings/str_cat.h"
+
+namespace aos {
+namespace ipc_lib {
+
+std::string ShmFolder(std::string_view shm_base, const Channel *channel) {
+  CHECK(channel->has_name());
+  CHECK_EQ(channel->name()->string_view()[0], '/');
+  return absl::StrCat(shm_base, channel->name()->string_view(), "/");
+}
+
+std::string ShmPath(std::string_view shm_base, const Channel *channel) {
+  CHECK(channel->has_type());
+  return ShmFolder(shm_base, channel) + channel->type()->str() + ".v4";
+}
+
+void PageFaultDataWrite(char *data, size_t size) {
+  // This just has to divide the actual page size. Being smaller will make this
+  // a bit slower than necessary, but not much. 1024 is a pretty conservative
+  // choice (most pages are probably 4096).
+  static constexpr size_t kPageSize = 1024;
+  const size_t pages = (size + kPageSize - 1) / kPageSize;
+  for (size_t i = 0; i < pages; ++i) {
+    char zero = 0;
+    // We need to ensure there's a writable pagetable entry, but avoid modifying
+    // the data.
+    //
+    // Even if you lock the data into memory, some kernels still seem to lazily
+    // create the actual pagetable entries. This means we need to somehow
+    // "write" to the page.
+    //
+    // Also, this takes place while other processes may be concurrently
+    // opening/initializing the memory, so we need to avoid corrupting that.
+    //
+    // This is the simplest operation I could think of which achieves that:
+    // "store 0 if it's already 0".
+    __atomic_compare_exchange_n(&data[i * kPageSize], &zero, 0, true,
+                                __ATOMIC_RELAXED, __ATOMIC_RELAXED);
+  }
+}
+
+void PageFaultDataRead(const char *data, size_t size) {
+  // This just has to divide the actual page size. Being smaller will make this
+  // a bit slower than necessary, but not much. 1024 is a pretty conservative
+  // choice (most pages are probably 4096).
+  static constexpr size_t kPageSize = 1024;
+  const size_t pages = (size + kPageSize - 1) / kPageSize;
+  for (size_t i = 0; i < pages; ++i) {
+    // We need to ensure there's a readable pagetable entry.
+    __atomic_load_n(&data[i * kPageSize], __ATOMIC_RELAXED);
+  }
+}
+
+LocklessQueueConfiguration MakeQueueConfiguration(
+    const Configuration *configuration, const Channel *channel) {
+  LocklessQueueConfiguration config;
+
+  config.num_watchers = channel->num_watchers();
+  config.num_senders = channel->num_senders();
+  // The value in the channel will default to 0 if readers are configured to
+  // copy.
+  config.num_pinners = channel->num_readers();
+  config.queue_size = configuration::QueueSize(configuration, channel);
+  config.message_data_size = channel->max_size();
+
+  return config;
+}
+
+// Opens the shared memory file backing channel under shm_base (creating and
+// sizing it first if needed) and maps it both read-write and read-only.
+MemoryMappedQueue::MemoryMappedQueue(std::string_view shm_base,
+                                     uint32_t permissions,
+                                     const Configuration *config,
+                                     const Channel *channel)
+    : config_(MakeQueueConfiguration(config, channel)) {
+  std::string path = ShmPath(shm_base, channel);
+  size_ = ipc_lib::LocklessQueueMemorySize(config_);
+  util::MkdirP(path, permissions);
+
+  // There are 2 cases.  Either the file already exists, or it does not
+  // already exist and we need to create it.  Start by trying to create it. If
+  // that fails, the file has already been created and we can open it
+  // normally.  Once the file has been created it will never be deleted.
+  int fd =
+      open(path.c_str(), O_RDWR | O_CREAT | O_EXCL | O_CLOEXEC, permissions);
+  if ((fd == -1) && (errno == EEXIST)) {
+    VLOG(1) << path << " already created.";
+    // O_CLOEXEC is a flag; passed as the mode argument it would be ignored.
+    fd = open(path.c_str(), O_RDWR | O_CLOEXEC);
+    PCHECK(fd != -1) << ": Failed to open " << path;
+    while (true) {
+      struct stat st;
+      PCHECK(fstat(fd, &st) == 0);
+      if (st.st_size != 0) {
+        CHECK_EQ(static_cast<size_t>(st.st_size), size_)
+            << ": Size of " << path
+            << " doesn't match expected size of backing queue file.  Did the "
+               "queue definition change?";
+        break;
+      } else {
+        // The creating process didn't get around to it yet.  Give it a bit.
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        VLOG(1) << path << " is zero size, waiting";
+      }
+    }
+  } else {
+    VLOG(1) << "Created " << path;
+    PCHECK(fd != -1) << ": Failed to open " << path;
+    PCHECK(ftruncate(fd, size_) == 0);
+  }
+
+  data_ = mmap(NULL, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+  PCHECK(data_ != MAP_FAILED);
+  const_data_ = mmap(NULL, size_, PROT_READ, MAP_SHARED, fd, 0);
+  PCHECK(const_data_ != MAP_FAILED);
+  PCHECK(close(fd) == 0);
+  PageFaultDataWrite(static_cast<char *>(data_), size_);
+  PageFaultDataRead(static_cast<const char *>(const_data_), size_);
+
+  ipc_lib::InitializeLocklessQueueMemory(memory(), config_);
+}
+
+MemoryMappedQueue::~MemoryMappedQueue() {
+  PCHECK(munmap(data_, size_) == 0);
+  PCHECK(munmap(const_cast<void *>(const_data_), size_) == 0);
+}
+
+}  // namespace ipc_lib
+}  // namespace aos
diff --git a/aos/ipc_lib/memory_mapped_queue.h b/aos/ipc_lib/memory_mapped_queue.h
new file mode 100644
index 0000000..784c559
--- /dev/null
+++ b/aos/ipc_lib/memory_mapped_queue.h
@@ -0,0 +1,88 @@
+#ifndef AOS_IPC_LIB_MEMORY_MAPPED_QUEUE_H_
+#define AOS_IPC_LIB_MEMORY_MAPPED_QUEUE_H_
+
+#include <cstddef>
+#include <cstdint>
+#include <string>
+#include <string_view>
+
+#include "absl/types/span.h"
+#include "aos/configuration.h"
+#include "aos/ipc_lib/lockless_queue.h"
+
+namespace aos {
+namespace ipc_lib {
+
+// Returns the folder under shm_base which holds the shared memory files for
+// channel.  The channel must have a name which starts with '/'.
+std::string ShmFolder(std::string_view shm_base, const Channel *channel);
+
+// Returns the path of the shared memory file backing channel.  The channel
+// must have both a name and a type.
+std::string ShmPath(std::string_view shm_base, const Channel *channel);
+
+// Builds the lockless queue configuration (watcher, sender and pinner counts,
+// queue size and message size) for channel in configuration.
+LocklessQueueConfiguration MakeQueueConfiguration(
+    const Configuration *configuration, const Channel *channel);
+
+// Owns the memory mappings of the shared memory file backing the lockless
+// queue for a single channel.
+class MemoryMappedQueue {
+ public:
+  // Creates the shared memory file for channel under shm_base if it doesn't
+  // exist yet (using permissions for the new file and folders), maps it, and
+  // initializes the queue memory.
+  MemoryMappedQueue(std::string_view shm_base, uint32_t permissions,
+                    const Configuration *config, const Channel *channel);
+  // Unmaps the memory.  The backing file is left in place.
+  ~MemoryMappedQueue();
+
+  // This class can't be default or copy constructed.
+  MemoryMappedQueue() = delete;
+  MemoryMappedQueue(const MemoryMappedQueue &other) = delete;
+  MemoryMappedQueue &operator=(const MemoryMappedQueue &rhs) = delete;
+
+  // Returns the queue memory through the read-write mapping.
+  LocklessQueueMemory *memory() const {
+    return reinterpret_cast<LocklessQueueMemory *>(data_);
+  }
+
+  // Returns the queue memory through the read-only mapping.
+  const LocklessQueueMemory *const_memory() const {
+    return reinterpret_cast<const LocklessQueueMemory *>(const_data_);
+  }
+
+  const LocklessQueueConfiguration &config() const { return config_; }
+
+  // Returns a queue handle which refers to both mappings.
+  LocklessQueue queue() const {
+    return LocklessQueue(const_memory(), memory(), config());
+  }
+
+  // Returns the full read-write mapping as a span of bytes.
+  absl::Span<char> GetMutableSharedMemory() const {
+    return absl::Span<char>(static_cast<char *>(data_), size_);
+  }
+
+  // Returns the full read-only mapping as a span of bytes.
+  absl::Span<const char> GetConstSharedMemory() const {
+    return absl::Span<const char>(static_cast<const char *>(const_data_),
+                                  size_);
+  }
+
+ private:
+  const LocklessQueueConfiguration config_;
+
+  // Size in bytes of each of the two mappings.
+  size_t size_;
+  // Read-write mapping of the queue file.
+  void *data_;
+  // Read-only mapping of the same file.
+  const void *const_data_;
+};
+
+}  // namespace ipc_lib
+}  // namespace aos
+
+#endif  //  AOS_IPC_LIB_MEMORY_MAPPED_QUEUE_H_
diff --git a/aos/starter/starterd_lib.cc b/aos/starter/starterd_lib.cc
index 84e4d00..485d1f1 100644
--- a/aos/starter/starterd_lib.cc
+++ b/aos/starter/starterd_lib.cc
@@ -8,6 +8,13 @@
 #include "glog/logging.h"
 #include "glog/stl_logging.h"
 
+// FLAGS_shm_base is defined elsewhere, declare it here so it can be used
+// to override the shared memory folder for unit testing.
+DECLARE_string(shm_base);
+// FLAGS_permissions is defined elsewhere, declare it here so it can be used
+// to set the file permissions on the shared memory block.
+DECLARE_uint32(permissions);
+
 namespace aos {
 namespace starter {
 
@@ -32,6 +39,7 @@
       max_status_count_(
           event_loop_.GetChannel<aos::starter::Status>("/aos")->frequency() -
           1),
+      shm_base_(FLAGS_shm_base),
       listener_(&event_loop_,
                 [this](signalfd_siginfo signal) { OnSignal(signal); }),
       top_(&event_loop_) {
@@ -67,6 +75,8 @@
     }
   }
 
+  // Catalogue all the applications for this node, so we can keep an eye on
+  // them.
   if (config_msg_->has_applications()) {
     const flatbuffers::Vector<flatbuffers::Offset<aos::Application>>
         *applications = config_msg_->applications();
@@ -88,6 +98,19 @@
       }
     }
   }
+
+  // Catalogue all the intranode channels for this node, and create
+  // MemoryMappedQueues for each one to allocate the shared memory before
+  // spawning any shasta process.  The queues are kept alive in shm_queues_
+  // (see AddChannel()), so no separate list of channels is needed here.
+  if (config_msg_->has_channels()) {
+    const aos::Node *this_node = event_loop_.node();
+    for (const aos::Channel *channel : *config_msg_->channels()) {
+      if (aos::configuration::ChannelIsReadableOnNode(channel, this_node)) {
+        AddChannel(channel);
+      }
+    }
+  }
 }
 
 void Starter::HandleStarterRpc(const StarterRpc &command) {
@@ -220,5 +243,14 @@
   builder.CheckOk(builder.Send(status_builder.Finish()));
 }
 
+void Starter::AddChannel(const aos::Channel *channel) {
+  CHECK_NOTNULL(channel);
+  shm_queues_.emplace_back(std::make_unique<aos::ipc_lib::MemoryMappedQueue>(
+      shm_base_, FLAGS_permissions, event_loop_.configuration(), channel));
+  VLOG(1) << "Created MemoryMappedQueue for "
+          << aos::configuration::StrippedChannelToString(channel) << " under "
+          << shm_base_;
+}
+
 }  // namespace starter
 }  // namespace aos
diff --git a/aos/starter/starterd_lib.h b/aos/starter/starterd_lib.h
index 66549ef..946e135 100644
--- a/aos/starter/starterd_lib.h
+++ b/aos/starter/starterd_lib.h
@@ -11,7 +11,7 @@
 #include <vector>
 
 #include "aos/configuration.h"
-#include "aos/events/shm_event_loop.h"
+#include "aos/ipc_lib/memory_mapped_queue.h"
 #include "aos/ipc_lib/signalfd.h"
 #include "aos/macros.h"
 #include "aos/starter/starter_generated.h"
@@ -61,6 +61,11 @@
 
   void SendStatus();
 
+  // Creates a MemoryMappedQueue for the given channel, to pre-allocate shared
+  // memory to give this process credit for the memory instead of any other
+  // process that accesses it.
+  void AddChannel(const aos::Channel *channel);
+
   const std::string config_path_;
   const aos::Configuration *config_msg_;
 
@@ -73,6 +78,12 @@
   const int max_status_count_;
 
   std::unordered_map<std::string, Application> applications_;
+  std::vector<std::unique_ptr<aos::ipc_lib::MemoryMappedQueue>> shm_queues_;
+
+  // Capture the --shm_base flag at construction time.  This makes it much
+  // easier to make different shared memory regions for doing things like
+  // multi-node tests.
+  std::string shm_base_;
 
   // Set to true on cleanup to block rpc commands and ensure cleanup only
   // happens once.