Initialize channel memory in parallel
When there are a large number of channels, it takes a while to map them
and make sure there are pages behind them and owned by starterd. This
turns out to take many seconds on larger systems. The initialization can
be done in parallel, which achieves a 50+% speedup on that part of the
startup process.
Change-Id: Id45e279b3dcd19e7d81c3b5b75ef324519f3721a
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/starter/starterd_lib.cc b/aos/starter/starterd_lib.cc
index 7c94863..f08ecc2 100644
--- a/aos/starter/starterd_lib.cc
+++ b/aos/starter/starterd_lib.cc
@@ -16,6 +16,10 @@
// to set the file permissions on the shared memory block.
DECLARE_uint32(permissions);
+DEFINE_uint32(queue_initialization_threads, 0,
+ "Number of threads to spin up to initialize the queue. 0 means "
+ "use the main thread.");
+
namespace aos {
namespace starter {
@@ -108,13 +112,44 @@
// MemoryMappedQueues for each one to allocate the shared memory before
// spawning any shasta process.
if (config_msg_->has_channels()) {
+ LOG(INFO) << "Starting to initialize shared memory.";
const aos::Node *this_node = event_loop_.node();
- std::vector<const aos::Channel *> intranode_channels;
+ std::vector<const aos::Channel *> channels_to_construct;
for (const aos::Channel *channel : *config_msg_->channels()) {
if (aos::configuration::ChannelIsReadableOnNode(channel, this_node)) {
- AddChannel(channel);
+ if (FLAGS_queue_initialization_threads == 0) {
+ AddChannel(channel);
+ } else {
+ channels_to_construct.push_back(channel);
+ }
}
}
+
+ if (FLAGS_queue_initialization_threads != 0) {
+ std::mutex pool_mutex;
+ std::vector<std::thread> threads;
+ threads.reserve(FLAGS_queue_initialization_threads);
+ for (size_t i = 0; i < FLAGS_queue_initialization_threads; ++i) {
+ threads.emplace_back([this, &pool_mutex, &channels_to_construct]() {
+ while (true) {
+ const aos::Channel *channel;
+ {
+ std::unique_lock<std::mutex> locker(pool_mutex);
+ if (channels_to_construct.empty()) {
+ return;
+ }
+ channel = channels_to_construct.back();
+ channels_to_construct.pop_back();
+ }
+ AddChannel(channel);
+ }
+ });
+ }
+ for (size_t i = 0; i < FLAGS_queue_initialization_threads; ++i) {
+ threads[i].join();
+ }
+ }
+ LOG(INFO) << "Starting applications.";
}
}
@@ -250,8 +285,14 @@
void Starter::AddChannel(const aos::Channel *channel) {
CHECK_NOTNULL(channel);
- shm_queues_.emplace_back(std::make_unique<aos::ipc_lib::MemoryMappedQueue>(
- shm_base_, FLAGS_permissions, event_loop_.configuration(), channel));
+ std::unique_ptr<aos::ipc_lib::MemoryMappedQueue> queue =
+ std::make_unique<aos::ipc_lib::MemoryMappedQueue>(
+ shm_base_, FLAGS_permissions, event_loop_.configuration(), channel);
+
+ {
+ std::unique_lock<std::mutex> locker(queue_mutex_);
+ shm_queues_.emplace_back(std::move(queue));
+ }
VLOG(1) << "Created MemoryMappedQueue for "
<< aos::configuration::StrippedChannelToString(channel) << " under "
<< shm_base_;