Use a pool for messages when configured

This removes allocations of the Message object in message_bridge_server
by reusing buffers from a small pool.

Change-Id: Ibc7f3f73d7b5619bee50985a3ffb10b862684df8
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index 0bf9054..8978060 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -277,6 +277,7 @@
   // Start out with a decent size big enough to hold timestamps.
   size_t max_size = 204;
 
+  size_t destination_nodes = 0u;
   // Seed up all the per-node connection state.
   // We are making the assumption here that every connection is bidirectional
   // (data is being sent both ways).  This is pretty safe because we are
@@ -284,6 +285,7 @@
   for (std::string_view destination_node_name :
        configuration::DestinationNodeNames(event_loop->configuration(),
                                            event_loop->node())) {
+    ++destination_nodes;
     // Find the largest connection message so we can size our buffers big enough
     // to receive a connection message.  The connect message comes from the
     // client to the server, so swap the node arguments.
@@ -389,6 +391,16 @@
   // Buffer up the max size a bit so everything fits nicely.
   LOG(INFO) << "Max message size for all clients is " << max_size;
   server_.SetMaxSize(max_size);
+
+  // Since we are doing interleaving mode 1, we will see at most 1 message being
+  // delivered at a time for an association.  That means, once delivery of a
+  // message has started, all the following parts will be from the same message
+  // in the same stream.  The server can have at most 1 active association per
+  // client, plus (reasonably) 1 new client connecting and trying to talk.  Each
+  // association needs 2 messages (one partially filled one, and 1 new one with
+  // more of the data), hence (destination_nodes + 1) * 2.
+  server_.SetPoolSize((destination_nodes + 1) * 2);
+
   reconnected_.reserve(max_channels());
 }
 
@@ -466,6 +478,7 @@
   } else if (message->message_type == Message::kMessage) {
     HandleData(message.get());
   }
+  server_.FreeMessage(std::move(message));
 }
 
 void MessageBridgeServer::MaybeIncrementInvalidConnectionCount(
diff --git a/aos/network/sctp_lib.cc b/aos/network/sctp_lib.cc
index 9632d3c..3729f9f 100644
--- a/aos/network/sctp_lib.cc
+++ b/aos/network/sctp_lib.cc
@@ -334,12 +334,23 @@
   return true;
 }
 
-// We read each fragment into a fresh Message, because most of them won't be
-// fragmented. If we do end up with a fragment, then we copy the data out of it.
-aos::unique_c_ptr<Message> SctpReadWrite::ReadMessage() {
-  CHECK(fd_ != -1);
+void SctpReadWrite::FreeMessage(aos::unique_c_ptr<Message> &&message) {
+  if (use_pool_) {
+    free_messages_.emplace_back(std::move(message));
+  }
+}
 
-  while (true) {
+void SctpReadWrite::SetPoolSize(size_t pool_size) {
+  CHECK(!use_pool_);
+  free_messages_.reserve(pool_size);
+  for (size_t i = 0; i < pool_size; ++i) {
+    free_messages_.emplace_back(AcquireMessage());
+  }
+  use_pool_ = true;
+}
+
+aos::unique_c_ptr<Message> SctpReadWrite::AcquireMessage() {
+  if (!use_pool_) {
     constexpr size_t kMessageAlign = alignof(Message);
     const size_t max_message_size =
         ((sizeof(Message) + max_size_ + 1 + (kMessageAlign - 1)) /
@@ -347,6 +358,22 @@
         kMessageAlign;
     aos::unique_c_ptr<Message> result(reinterpret_cast<Message *>(
         aligned_alloc(kMessageAlign, max_message_size)));
+    return result;
+  } else {
+    CHECK_GT(free_messages_.size(), 0u);
+    aos::unique_c_ptr<Message> result = std::move(free_messages_.back());
+    free_messages_.pop_back();
+    return result;
+  }
+}
+
+// We read each fragment into a fresh Message, because most of them won't be
+// fragmented. If we do end up with a fragment, then we copy the data out of it.
+aos::unique_c_ptr<Message> SctpReadWrite::ReadMessage() {
+  CHECK(fd_ != -1);
+
+  while (true) {
+    aos::unique_c_ptr<Message> result = AcquireMessage();
 
     struct msghdr inmessage;
     memset(&inmessage, 0, sizeof(struct msghdr));
diff --git a/aos/network/sctp_lib.h b/aos/network/sctp_lib.h
index 6cd11a3..06152d8 100644
--- a/aos/network/sctp_lib.h
+++ b/aos/network/sctp_lib.h
@@ -115,7 +115,15 @@
     }
   }
 
+  // Gives a message obtained from ReadMessage back to the pool, if in use.
+  void FreeMessage(aos::unique_c_ptr<Message> &&message);
+
+  // Allocates messages for the pool.  SetMaxSize must be called first.
+  void SetPoolSize(size_t pool_size);
+
  private:
+  aos::unique_c_ptr<Message> AcquireMessage();
+
   void CloseSocket();
   void DoSetMaxSize();
 
@@ -131,6 +139,9 @@
   size_t max_size_ = 1000;
 
   std::vector<aos::unique_c_ptr<Message>> partial_messages_;
+
+  bool use_pool_ = false;
+  std::vector<aos::unique_c_ptr<Message>> free_messages_;
 };
 
 // Returns the max network buffer available for reading for a socket.
diff --git a/aos/network/sctp_server.h b/aos/network/sctp_server.h
index dbfd1ac..1f7e2ee 100644
--- a/aos/network/sctp_server.h
+++ b/aos/network/sctp_server.h
@@ -30,6 +30,11 @@
   // Receives the next packet from the remote.
   aos::unique_c_ptr<Message> Read() { return sctp_.ReadMessage(); }
 
+  // Returns a message obtained from Read() to the pool, if one is in use.
+  void FreeMessage(aos::unique_c_ptr<Message> &&message) {
+    sctp_.FreeMessage(std::move(message));
+  }
+
   // Sends a block of data to a client on a stream with a TTL.  Returns true on
   // success.
   bool Send(std::string_view data, sctp_assoc_t snd_assoc_id, int stream,
@@ -54,6 +59,8 @@
 
   void SetMaxSize(size_t max_size) { sctp_.SetMaxSize(max_size); }
 
+  void SetPoolSize(size_t pool_size) { sctp_.SetPoolSize(pool_size); }
+
  private:
   struct sockaddr_storage sockaddr_local_;
   SctpReadWrite sctp_;