Use a pool for messages when configured
This removes allocations of the Message object in message_bridge_server
by reusing buffers from a small pool.
Change-Id: Ibc7f3f73d7b5619bee50985a3ffb10b862684df8
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index 0bf9054..8978060 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -277,6 +277,7 @@
// Start out with a decent size big enough to hold timestamps.
size_t max_size = 204;
+ size_t destination_nodes = 0u;
// Seed up all the per-node connection state.
// We are making the assumption here that every connection is bidirectional
// (data is being sent both ways). This is pretty safe because we are
@@ -284,6 +285,7 @@
for (std::string_view destination_node_name :
configuration::DestinationNodeNames(event_loop->configuration(),
event_loop->node())) {
+ ++destination_nodes;
// Find the largest connection message so we can size our buffers big enough
// to receive a connection message. The connect message comes from the
// client to the server, so swap the node arguments.
@@ -389,6 +391,16 @@
// Buffer up the max size a bit so everything fits nicely.
LOG(INFO) << "Max message size for all clients is " << max_size;
server_.SetMaxSize(max_size);
+
+ // Since we are using interleaving mode 1, at most 1 message is being
+ // delivered at a time for an association. That means that once delivery of
+ // a message has started, all the following parts will be from the same
+ // message in the same stream. The server can have at most 1 association per
+ // client active, and can then (reasonably) have 1 new client connecting
+ // trying to talk. Each association needs 2 messages (one partially filled
+ // one, and 1 new one with more of the data).
+ server_.SetPoolSize((destination_nodes + 1) * 2);
+
reconnected_.reserve(max_channels());
}
@@ -466,6 +478,7 @@
} else if (message->message_type == Message::kMessage) {
HandleData(message.get());
}
+ server_.FreeMessage(std::move(message));
}
void MessageBridgeServer::MaybeIncrementInvalidConnectionCount(
diff --git a/aos/network/sctp_lib.cc b/aos/network/sctp_lib.cc
index 9632d3c..3729f9f 100644
--- a/aos/network/sctp_lib.cc
+++ b/aos/network/sctp_lib.cc
@@ -334,12 +334,23 @@
return true;
}
-// We read each fragment into a fresh Message, because most of them won't be
-// fragmented. If we do end up with a fragment, then we copy the data out of it.
-aos::unique_c_ptr<Message> SctpReadWrite::ReadMessage() {
- CHECK(fd_ != -1);
+void SctpReadWrite::FreeMessage(aos::unique_c_ptr<Message> &&message) {
+ if (use_pool_) {  // Only true once SetPoolSize() has populated the pool.
+ free_messages_.emplace_back(std::move(message));  // Keep the buffer so AcquireMessage() can hand it out again.
+ }  // Without a pool, message is not moved from, so the caller's pointer still owns (and frees) it.
+}
- while (true) {
+void SctpReadWrite::SetPoolSize(size_t pool_size) {
+ CHECK(!use_pool_);  // The pool may only be configured once.
+ free_messages_.reserve(pool_size);
+ for (size_t i = 0; i < pool_size; ++i) {
+ free_messages_.emplace_back(AcquireMessage());  // use_pool_ is still false here, so this allocates a new buffer sized from the current max_size_.
+ }
+ use_pool_ = true;  // From here on AcquireMessage() takes buffers from free_messages_ instead of allocating.
+}
+
+aos::unique_c_ptr<Message> SctpReadWrite::AcquireMessage() {
+ if (!use_pool_) {
constexpr size_t kMessageAlign = alignof(Message);
const size_t max_message_size =
((sizeof(Message) + max_size_ + 1 + (kMessageAlign - 1)) /
@@ -347,6 +358,22 @@
kMessageAlign;
aos::unique_c_ptr<Message> result(reinterpret_cast<Message *>(
aligned_alloc(kMessageAlign, max_message_size)));
+ return result;
+ } else {
+ CHECK_GT(free_messages_.size(), 0u);
+ aos::unique_c_ptr<Message> result = std::move(free_messages_.back());
+ free_messages_.pop_back();
+ return result;
+ }
+}
+
+// We read each fragment into its own Message (taken from the pool when one is
+// configured), because most messages won't be fragmented. If we do end up with
+// a fragment, then we copy the data out of it.
+aos::unique_c_ptr<Message> SctpReadWrite::ReadMessage() {
+ CHECK(fd_ != -1);
+
+ while (true) {
+ aos::unique_c_ptr<Message> result = AcquireMessage();
struct msghdr inmessage;
memset(&inmessage, 0, sizeof(struct msghdr));
diff --git a/aos/network/sctp_lib.h b/aos/network/sctp_lib.h
index 6cd11a3..06152d8 100644
--- a/aos/network/sctp_lib.h
+++ b/aos/network/sctp_lib.h
@@ -115,7 +115,15 @@
}
}
+ // Returns a message returned from ReadMessage back to the pool.
+ void FreeMessage(aos::unique_c_ptr<Message> &&message);
+
+ // Allocates pool_size messages for the pool. SetMaxSize must be called first.
+ void SetPoolSize(size_t pool_size);
+
private:
+ aos::unique_c_ptr<Message> AcquireMessage();
+
void CloseSocket();
void DoSetMaxSize();
@@ -131,6 +139,9 @@
size_t max_size_ = 1000;
std::vector<aos::unique_c_ptr<Message>> partial_messages_;
+
+ bool use_pool_ = false;
+ std::vector<aos::unique_c_ptr<Message>> free_messages_;
};
// Returns the max network buffer available for reading for a socket.
diff --git a/aos/network/sctp_server.h b/aos/network/sctp_server.h
index dbfd1ac..1f7e2ee 100644
--- a/aos/network/sctp_server.h
+++ b/aos/network/sctp_server.h
@@ -30,6 +30,11 @@
// Receives the next packet from the remote.
aos::unique_c_ptr<Message> Read() { return sctp_.ReadMessage(); }
+ // Hands a message returned by Read() back to sctp_ (pooled for reuse if enabled).
+ void FreeMessage(aos::unique_c_ptr<Message> &&message) {
+ sctp_.FreeMessage(std::move(message));
+ }
+
// Sends a block of data to a client on a stream with a TTL. Returns true on
// success.
bool Send(std::string_view data, sctp_assoc_t snd_assoc_id, int stream,
@@ -54,6 +59,8 @@
void SetMaxSize(size_t max_size) { sctp_.SetMaxSize(max_size); }
+ void SetPoolSize(size_t pool_size) { sctp_.SetPoolSize(pool_size); }  // Forwards to sctp_, which preallocates pool_size message buffers.
+
private:
struct sockaddr_storage sockaddr_local_;
SctpReadWrite sctp_;