Use starterd to pre-allocate all AOS message queue shared memory.
AOS uses shared memory for intranode message channels. The first
application that creates a sender/fetcher/watcher for each channel
allocates the shared memory for the message queues, and is given credit
for the shared memory as part of its memory footprint.
The launch order of the shasta applications is non-deterministic, which
means the measured memory footprint of many of the processes can vary
greatly, depending on where each one landed in the launch order that time.
As a result, the memory_limit values in the config files are far bigger
than they would need to be if we had deterministic measurements of the
actual memory usage of each process.
The starterd process is what actually spawns all the processes, using
the configuration data. Making this process create MemoryMappedQueues
for every intranode channel on its node will pre-allocate all the
message queue shared memory, which dramatically reduces the measured
memory usage of most of the processes it spawns, and makes those numbers
much more deterministic.
Renamed MMappedQueue to MemoryMappedQueue and moved it from
shm_event_loop.cc to new source files in the lockless_queue library.
Update starterd_lib to identify all channels in the config that are
readable on this node and allocate MemoryMappedQueues for each.
Change-Id: I40649b61653968495ba616b72f95d799a8e06414
Signed-off-by: Brian J Griglak <brian.griglak@bluerivertech.com>
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index ef6c4f3..381a273 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -168,14 +168,19 @@
srcs = [
"lockless_queue.cc",
"lockless_queue_memory.h",
+ "memory_mapped_queue.cc",
],
- hdrs = ["lockless_queue.h"],
+ hdrs = [
+ "lockless_queue.h",
+ "memory_mapped_queue.h",
+ ],
target_compatible_with = ["@platforms//os:linux"],
visibility = ["//visibility:public"],
deps = [
":aos_sync",
":data_alignment",
":index",
+ "//aos:configuration",
"//aos:realtime",
"//aos:uuid",
"//aos/time",
diff --git a/aos/ipc_lib/memory_mapped_queue.cc b/aos/ipc_lib/memory_mapped_queue.cc
new file mode 100644
index 0000000..c2d8d44
--- /dev/null
+++ b/aos/ipc_lib/memory_mapped_queue.cc
@@ -0,0 +1,135 @@
+#include "aos/ipc_lib/memory_mapped_queue.h"
+
+#include <fcntl.h>
+#include <sys/mman.h>
+#include <unistd.h>
+
+#include "absl/strings/str_cat.h"
+
+namespace aos {
+namespace ipc_lib {
+
+// Returns the folder under shm_base which holds the shared memory files for
+// channel: shm_base, then the channel name, then a trailing "/".
+//
+// Dies if the channel has no name, or if the name does not start with '/'.
+std::string ShmFolder(std::string_view shm_base, const Channel *channel) {
+  CHECK(channel->has_name());
+  CHECK_EQ(channel->name()->string_view()[0], '/');
+
+  // The name already carries its own leading '/', so no separator is needed
+  // between the base and the name.
+  std::string folder(shm_base);
+  folder.append(channel->name()->string_view());
+  folder.push_back('/');
+  return folder;
+}
+
+// Returns the path of the shared memory file for channel: the folder from
+// ShmFolder(), then the channel type, then a ".v4" suffix.
+//
+// Dies if the channel has no type.
+std::string ShmPath(std::string_view shm_base, const Channel *channel) {
+  CHECK(channel->has_type());
+  return absl::StrCat(ShmFolder(shm_base, channel),
+                      channel->type()->string_view(), ".v4");
+}
+
+// Forces a writable pagetable entry to exist for every page of
+// [data, data + size) while leaving the stored bytes untouched.
+void PageFaultDataWrite(char *data, size_t size) {
+  // Distance between touched bytes.  It only has to divide the real page
+  // size; a smaller value just costs a few redundant touches.  1024 is a
+  // conservative pick (most pages are probably 4096).
+  static constexpr size_t kStride = 1024;
+  const size_t touch_count = (size + kStride - 1) / kStride;
+
+  size_t touched = 0;
+  while (touched != touch_count) {
+    // Locking the data into memory is not enough: some kernels still seem to
+    // create the real pagetable entries lazily, so each page has to see
+    // something which counts as a write.  Other processes may be opening or
+    // initializing this memory concurrently, so that write must not disturb
+    // whatever they stored.  "Store 0 if it's already 0" satisfies both.
+    //
+    // The expected value is reset on every pass because a failed compare
+    // exchange overwrites it with the byte which was actually there.
+    char expected = 0;
+    __atomic_compare_exchange_n(data + touched * kStride, &expected, 0, true,
+                                __ATOMIC_RELAXED, __ATOMIC_RELAXED);
+    ++touched;
+  }
+}
+
+// Forces a readable pagetable entry to exist for every page of
+// [data, data + size) by loading one byte per stride.
+void PageFaultDataRead(const char *data, size_t size) {
+  // Distance between touched bytes.  It only has to divide the real page
+  // size; a smaller value just costs a few redundant loads.  1024 is a
+  // conservative pick (most pages are probably 4096).
+  static constexpr size_t kStride = 1024;
+  const size_t touch_count = (size + kStride - 1) / kStride;
+
+  size_t touched = 0;
+  while (touched != touch_count) {
+    // The loaded value is irrelevant; the load itself is what faults the
+    // page in.
+    __atomic_load_n(&data[touched * kStride], __ATOMIC_RELAXED);
+    ++touched;
+  }
+}
+
+// Builds the LocklessQueueConfiguration describing the queue for channel.
+//
+// The participant counts and the message size are read from the channel; the
+// number of messages in the queue comes from configuration::QueueSize().
+LocklessQueueConfiguration MakeQueueConfiguration(
+    const Configuration *configuration, const Channel *channel) {
+  LocklessQueueConfiguration result;
+
+  // Dimensions of the queue itself.
+  result.queue_size = configuration::QueueSize(configuration, channel);
+  result.message_data_size = channel->max_size();
+
+  // How many of each kind of participant the queue has to have room for.
+  result.num_senders = channel->num_senders();
+  result.num_watchers = channel->num_watchers();
+  // num_readers in the channel will default to 0 if readers are configured to
+  // copy, which leaves no pinners.
+  result.num_pinners = channel->num_readers();
+
+  return result;
+}
+
+// Opens the shared memory file backing channel's queue under shm_base,
+// creating and sizing it if it does not exist yet, then maps it twice (once
+// read/write, once read-only), touches every page of both mappings, and hands
+// the writable mapping to InitializeLocklessQueueMemory().
+//
+// permissions is passed to util::MkdirP() and is used as the mode of the file
+// when this process is the one which creates it.
+//
+// Every failure in here is fatal (CHECK/PCHECK).
+MemoryMappedQueue::MemoryMappedQueue(std::string_view shm_base,
+                                     uint32_t permissions,
+                                     const Configuration *config,
+                                     const Channel *channel)
+    : config_(MakeQueueConfiguration(config, channel)) {
+  std::string path = ShmPath(shm_base, channel);
+
+  size_ = ipc_lib::LocklessQueueMemorySize(config_);
+
+  util::MkdirP(path, permissions);
+
+  // There are 2 cases.  Either the file already exists, or it does not
+  // already exist and we need to create it.  Start by trying to create it.  If
+  // that fails, the file has already been created and we can open it
+  // normally.  Once the file has been created it will never be deleted.
+  //
+  // O_CLOEXEC is an open() flag, so it belongs in the second argument; the
+  // third argument is only the mode of a newly created file.
+  int fd = open(path.c_str(), O_RDWR | O_CREAT | O_EXCL | O_CLOEXEC,
+                permissions);
+  if ((fd == -1) && (errno == EEXIST)) {
+    VLOG(1) << path << " already created.";
+    // File already exists.
+    fd = open(path.c_str(), O_RDWR | O_CLOEXEC);
+    PCHECK(fd != -1) << ": Failed to open " << path;
+    while (true) {
+      struct stat st;
+      PCHECK(fstat(fd, &st) == 0);
+      if (st.st_size != 0) {
+        CHECK_EQ(static_cast<size_t>(st.st_size), size_)
+            << ": Size of " << path
+            << " doesn't match expected size of backing queue file.  Did the "
+               "queue definition change?";
+        break;
+      } else {
+        // The creating process didn't get around to it yet.  Give it a bit.
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        VLOG(1) << path << " is zero size, waiting";
+      }
+    }
+  } else {
+    // Check the descriptor before logging anything: logging first would
+    // claim the file was created even when open() failed for some other
+    // reason, and could overwrite the errno which PCHECK reports.
+    PCHECK(fd != -1) << ": Failed to open " << path;
+    VLOG(1) << "Created " << path;
+    PCHECK(ftruncate(fd, size_) == 0);
+  }
+
+  data_ = mmap(nullptr, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+  PCHECK(data_ != MAP_FAILED);
+  const_data_ = mmap(nullptr, size_, PROT_READ, MAP_SHARED, fd, 0);
+  PCHECK(const_data_ != MAP_FAILED);
+  // Both mappings are in place, so the descriptor is no longer needed.
+  PCHECK(close(fd) == 0);
+  PageFaultDataWrite(static_cast<char *>(data_), size_);
+  PageFaultDataRead(static_cast<const char *>(const_data_), size_);
+
+  ipc_lib::InitializeLocklessQueueMemory(memory(), config_);
+}
+
+// Unmaps both views of the queue memory.  A failing munmap() is fatal.
+// Nothing in here removes the backing file.
+MemoryMappedQueue::~MemoryMappedQueue() {
+ PCHECK(munmap(data_, size_) == 0);
+ // munmap() takes a non-const pointer, hence the cast on the read-only view.
+ PCHECK(munmap(const_cast<void *>(const_data_), size_) == 0);
+}
+
+} // namespace ipc_lib
+} // namespace aos
diff --git a/aos/ipc_lib/memory_mapped_queue.h b/aos/ipc_lib/memory_mapped_queue.h
new file mode 100644
index 0000000..784c559
--- /dev/null
+++ b/aos/ipc_lib/memory_mapped_queue.h
@@ -0,0 +1,63 @@
+#ifndef AOS_IPC_LIB_MEMORY_MAPPED_QUEUE_H_
+#define AOS_IPC_LIB_MEMORY_MAPPED_QUEUE_H_
+
+#include "absl/types/span.h"
+#include "aos/configuration.h"
+#include "aos/ipc_lib/lockless_queue.h"
+
+namespace aos {
+namespace ipc_lib {
+
+// Returns the folder holding the shared memory files for channel: shm_base
+// followed by the channel name and a trailing "/".  The channel must have a
+// name, and that name must start with '/'.
+std::string ShmFolder(std::string_view shm_base, const Channel *channel);
+
+// Returns the path of the shared memory file for channel: the ShmFolder()
+// result followed by the channel type and a ".v4" suffix.  The channel must
+// have a type.
+std::string ShmPath(std::string_view shm_base, const Channel *channel);
+
+// Builds the LocklessQueueConfiguration for channel.  The watcher and sender
+// counts, the pinner count (taken from num_readers) and the message size come
+// from the channel; the queue size comes from configuration::QueueSize().
+LocklessQueueConfiguration MakeQueueConfiguration(
+ const Configuration *configuration, const Channel *channel);
+
+// Holds the two mappings (one read/write, one read-only) of the shared memory
+// file which backs the lockless queue for a single channel, and unmaps them
+// again on destruction.
+class MemoryMappedQueue {
+ public:
+  // Instances only come from the constructor below; default construction and
+  // copying are disabled.
+  MemoryMappedQueue() = delete;
+  MemoryMappedQueue(const MemoryMappedQueue &other) = delete;
+  MemoryMappedQueue &operator=(const MemoryMappedQueue &rhs) = delete;
+
+  // Opens (creating if needed) and maps the queue memory for channel under
+  // shm_base.  permissions applies to what gets created along the way.
+  MemoryMappedQueue(std::string_view shm_base, uint32_t permissions,
+                    const Configuration *config, const Channel *channel);
+  ~MemoryMappedQueue();
+
+  // The read/write mapping, viewed as queue memory.
+  LocklessQueueMemory *memory() const {
+    return static_cast<ipc_lib::LocklessQueueMemory *>(data_);
+  }
+
+  // The read-only mapping, viewed as queue memory.
+  const LocklessQueueMemory *const_memory() const {
+    return static_cast<const LocklessQueueMemory *>(const_data_);
+  }
+
+  // The queue configuration which was derived from the channel.
+  const LocklessQueueConfiguration &config() const { return config_; }
+
+  // Builds a LocklessQueue from both mappings and the configuration.
+  LocklessQueue queue() const {
+    return LocklessQueue(const_memory(), memory(), config_);
+  }
+
+  // The whole read/write mapping as a span of bytes.
+  absl::Span<char> GetMutableSharedMemory() const {
+    char *const begin = static_cast<char *>(data_);
+    return absl::Span<char>(begin, size_);
+  }
+
+  // The whole read-only mapping as a span of bytes.
+  absl::Span<const char> GetConstSharedMemory() const {
+    const char *const begin = static_cast<const char *>(const_data_);
+    return absl::Span<const char>(begin, size_);
+  }
+
+ private:
+  const LocklessQueueConfiguration config_;
+
+  // Size in bytes of each of the two mappings.
+  size_t size_;
+  // Read/write mapping of the queue file.
+  void *data_;
+  // Read-only mapping of the same file.
+  const void *const_data_;
+};
+
+} // namespace ipc_lib
+} // namespace aos
+
+#endif // AOS_IPC_LIB_MEMORY_MAPPED_QUEUE_H_