Remove rest of mallocs from message_bridge_server

Remove the big malloc for the outgoing message flat buffer builder, and
then a bunch of log statements and vectors which didn't really need to
be there.

Change-Id: I155e4bde23028beefd65af5c04a6e5cba279304a
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index 8978060..6cea1da 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -27,8 +27,10 @@
 }
 
 flatbuffers::FlatBufferBuilder ChannelState::PackContext(
-    const Context &context) {
-  flatbuffers::FlatBufferBuilder fbb(channel_->max_size() + 100);
+    FixedAllocator *allocator, const Context &context) {
+  flatbuffers::FlatBufferBuilder fbb(
+      channel_->max_size() + MessageBridgeServer::kRemoteDataHeaderMaxSize,
+      allocator);
   fbb.ForceDefaults(true);
   VLOG(2) << "Found " << peers_.size() << " peers on channel "
           << channel_->name()->string_view() << " "
@@ -58,10 +60,11 @@
   return fbb;
 }
 
-void ChannelState::SendData(SctpServer *server, const Context &context) {
+void ChannelState::SendData(SctpServer *server, FixedAllocator *allocator,
+                            const Context &context) {
   // TODO(austin): I don't like allocating this buffer when we are just freeing
   // it at the end of the function.
-  flatbuffers::FlatBufferBuilder fbb = PackContext(context);
+  flatbuffers::FlatBufferBuilder fbb = PackContext(allocator, context);
 
   // TODO(austin): Track which connections need to be reliable and handle
   // resending properly.
@@ -193,6 +196,7 @@
 
 int ChannelState::NodeConnected(const Node *node, sctp_assoc_t assoc_id,
                                 int stream, SctpServer *server,
+                                FixedAllocator *allocator,
                                 aos::monotonic_clock::time_point monotonic_now,
                                 std::vector<sctp_assoc_t> *reconnected) {
   VLOG(1) << "Channel " << channel_->name()->string_view() << " "
@@ -210,14 +214,18 @@
                      peer.sac_assoc_id) == reconnected->end())) {
         reconnected->push_back(peer.sac_assoc_id);
         if (peer.sac_assoc_id == assoc_id) {
-          LOG_EVERY_T(WARNING, 0.025)
-              << "Node " << node->name()->string_view() << " reconnecting on "
-              << assoc_id << " with the same ID, something got lost";
+          if (VLOG_IS_ON(1)) {
+            LOG_EVERY_T(WARNING, 0.025)
+                << "Node " << node->name()->string_view() << " reconnecting on "
+                << assoc_id << " with the same ID, something got lost";
+          }
         } else {
-          LOG_EVERY_T(WARNING, 0.025)
-              << "Node " << node->name()->string_view() << " "
-              << " already connected on " << peer.sac_assoc_id
-              << " aborting old connection and switching to " << assoc_id;
+          if (VLOG_IS_ON(1)) {
+            LOG_EVERY_T(WARNING, 0.025)
+                << "Node " << node->name()->string_view() << " "
+                << " already connected on " << peer.sac_assoc_id
+                << " aborting old connection and switching to " << assoc_id;
+          }
           server->Abort(peer.sac_assoc_id);
         }
       }
@@ -236,10 +244,8 @@
                 << (last_message_fetcher_->context().data != nullptr);
         if (last_message_fetcher_->context().data != nullptr) {
           // SendData sends to all...  Only send to the new one.
-          // TODO(austin): I don't like allocating this buffer when we are just
-          // freeing it at the end of the function.
           flatbuffers::FlatBufferBuilder fbb =
-              PackContext(last_message_fetcher_->context());
+              PackContext(allocator, last_message_fetcher_->context());
 
           if (server->Send(std::string_view(reinterpret_cast<const char *>(
                                                 fbb.GetBufferPointer()),
@@ -268,9 +274,11 @@
               event_loop->node()->port()),
       server_status_(event_loop,
                      [this](const Context &context) {
-                       timestamp_state_->SendData(&server_, context);
+                       timestamp_state_->SendData(&server_, &allocator_,
+                                                  context);
                      }),
-      config_sha256_(std::move(config_sha256)) {
+      config_sha256_(std::move(config_sha256)),
+      allocator_(0) {
   CHECK_EQ(config_sha256_.size(), 64u) << ": Wrong length sha256sum";
   CHECK(event_loop_->node() != nullptr) << ": No nodes configured.";
 
@@ -309,6 +317,7 @@
   LOG(INFO) << "Hostname: " << event_loop_->node()->hostname()->string_view();
 
   int channel_index = 0;
+  size_t max_channel_size = 0u;
   const Channel *const timestamp_channel = configuration::GetChannel(
       event_loop_->configuration(), "/aos", Timestamp::GetFullyQualifiedName(),
       event_loop_->name(), event_loop_->node());
@@ -330,6 +339,10 @@
           any_reliable = true;
         }
       }
+
+      max_channel_size =
+          std::max(static_cast<size_t>(channel->max_size()), max_channel_size);
+
       std::unique_ptr<ChannelState> state(new ChannelState{
           channel, channel_index,
           any_reliable ? event_loop_->MakeRawFetcher(channel) : nullptr});
@@ -363,7 +376,7 @@
         event_loop_->MakeRawWatcher(
             channel, [this, state_ptr](const Context &context,
                                        const void * /*message*/) {
-              state_ptr->SendData(&server_, context);
+              state_ptr->SendData(&server_, &allocator_, context);
             });
       } else {
         for (const Connection *connection : *channel->destination_nodes()) {
@@ -401,6 +414,8 @@
   // and 1 new one with more of the data).
   server_.SetPoolSize((destination_nodes + 1) * 2);
 
+  allocator_ = FixedAllocator(max_channel_size + kRemoteDataHeaderMaxSize);
+
   reconnected_.reserve(max_channels());
 }
 
@@ -518,8 +533,10 @@
     {
       flatbuffers::Verifier verifier(message->data(), message->size);
       if (!connect->Verify(verifier)) {
-        LOG_EVERY_T(WARNING, 1.0)
-            << "Failed to verify message, disconnecting client";
+        if (VLOG_IS_ON(1)) {
+          LOG_EVERY_T(WARNING, 1.0)
+              << "Failed to verify message, disconnecting client";
+        }
         server_.Abort(message->header.rcvinfo.rcv_assoc_id);
 
         MaybeIncrementInvalidConnectionCount(nullptr);
@@ -529,8 +546,9 @@
     VLOG(1) << FlatbufferToJson(connect);
 
     if (!connect->has_config_sha256()) {
-      LOG_EVERY_T(WARNING, 1.0)
-          << "Client missing config_sha256, disconnecting client";
+      if (VLOG_IS_ON(1)) {
+        LOG(WARNING) << "Client missing config_sha256, disconnecting client";
+      }
       server_.Abort(message->header.rcvinfo.rcv_assoc_id);
 
       MaybeIncrementInvalidConnectionCount(connect->node());
@@ -538,10 +556,12 @@
     }
 
     if (connect->config_sha256()->string_view() != config_sha256_) {
-      LOG_EVERY_T(WARNING, 1.0) << "Client config sha256 of "
-                                << connect->config_sha256()->string_view()
-                                << " doesn't match our config sha256 of "
-                                << config_sha256_ << ", disconnecting client";
+      if (VLOG_IS_ON(1)) {
+        LOG(WARNING) << "Client config sha256 of "
+                     << connect->config_sha256()->string_view()
+                     << " doesn't match our config sha256 of " << config_sha256_
+                     << ", disconnecting client";
+      }
       server_.Abort(message->header.rcvinfo.rcv_assoc_id);
 
       MaybeIncrementInvalidConnectionCount(connect->node());
@@ -550,8 +570,10 @@
 
     if (connect->channels_to_transfer()->size() >
         static_cast<size_t>(max_channels())) {
-      LOG_EVERY_T(WARNING, 1.0)
-          << "Client has more channels than we do, disconnecting client";
+      if (VLOG_IS_ON(1)) {
+        LOG(WARNING)
+            << "Client has more channels than we do, disconnecting client";
+      }
       server_.Abort(message->header.rcvinfo.rcv_assoc_id);
 
       MaybeIncrementInvalidConnectionCount(connect->node());
@@ -579,7 +601,8 @@
         if (channel_state->Matches(channel)) {
           node_index = channel_state->NodeConnected(
               connect->node(), message->header.rcvinfo.rcv_assoc_id,
-              channel_index, &server_, monotonic_now, &reconnected_);
+              channel_index, &server_, &allocator_, monotonic_now,
+              &reconnected_);
           CHECK_NE(node_index, -1)
               << ": Failed to find node "
               << aos::FlatbufferToJson(connect->node()) << " for connection "
@@ -589,8 +612,14 @@
         }
       }
       if (!matched) {
-        LOG(ERROR) << "Remote tried registering for unknown channel "
-                   << FlatbufferToJson(channel);
+        if (VLOG_IS_ON(1)) {
+          LOG(ERROR) << "Remote tried registering for unknown channel "
+                     << FlatbufferToJson(channel);
+        }
+        server_.Abort(message->header.rcvinfo.rcv_assoc_id);
+
+        MaybeIncrementInvalidConnectionCount(connect->node());
+        return;
       }
       ++channel_index;
     }
@@ -629,11 +658,13 @@
       message->LogRcvInfo();
     }
   } else {
-    message->LogRcvInfo();
-    // TODO(sarah.newman): add some versioning concept such that if this was a
-    // fatal error, we would never get here.
-    LOG_FIRST_N(ERROR, 20) << "Unexpected stream id "
-                           << message->header.rcvinfo.rcv_sid;
+    // We should never see the client sending us something on the wrong stream.
+    // Just explode...  In theory, this could let a client DOS us, but we trust
+    // the client.
+    if (VLOG_IS_ON(2)) {
+      message->LogRcvInfo();
+    }
+    LOG(FATAL) << "Unexpected stream id " << message->header.rcvinfo.rcv_sid;
   }
 }
 
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index 2884e58..fdc0441 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -68,7 +68,7 @@
   // This will potentially grow to the number of associations as we find reconnects.
   int NodeDisconnected(sctp_assoc_t assoc_id);
   int NodeConnected(const Node *node, sctp_assoc_t assoc_id, int stream,
-                    SctpServer *server,
+                    SctpServer *server, FixedAllocator *allocator,
                     aos::monotonic_clock::time_point monotonic_now,
                     std::vector<sctp_assoc_t> *reconnected);
 
@@ -83,10 +83,12 @@
   bool Matches(const Channel *other_channel);
 
   // Sends the data in context using the provided server.
-  void SendData(SctpServer *server, const Context &context);
+  void SendData(SctpServer *server, FixedAllocator *allocator,
+                const Context &context);
 
   // Packs a context into a size prefixed message header for transmission.
-  flatbuffers::FlatBufferBuilder PackContext(const Context &context);
+  flatbuffers::FlatBufferBuilder PackContext(FixedAllocator *allocator,
+                                             const Context &context);
 
   // Handles reception of delivery times.
   void HandleDelivery(sctp_assoc_t rcv_assoc_id, uint16_t ssn,
@@ -109,6 +111,10 @@
 // node.  It handles the session and dispatches data to the ChannelState.
 class MessageBridgeServer {
  public:
+  // Size to reserve when building the RemoteData message for the header over
+  // the data size.
+  static constexpr size_t kRemoteDataHeaderMaxSize = 208u;
+
   MessageBridgeServer(aos::ShmEventLoop *event_loop, std::string config_sha256);
 
   ~MessageBridgeServer() { event_loop_->epoll()->DeleteFd(server_.fd()); }
@@ -152,11 +158,13 @@
   // null.
   std::vector<std::unique_ptr<ChannelState>> channels_;
 
-  std::string config_sha256_;
+  const std::string config_sha256_;
 
   // List of assoc_id's that have been found already when connecting.  This is a
   // member variable so the memory is allocated in the constructor.
   std::vector<sctp_assoc_t> reconnected_;
+
+  FixedAllocator allocator_;
 };
 
 }  // namespace message_bridge
diff --git a/aos/network/message_bridge_server_status.cc b/aos/network/message_bridge_server_status.cc
index 8b186b7..df87134 100644
--- a/aos/network/message_bridge_server_status.cc
+++ b/aos/network/message_bridge_server_status.cc
@@ -88,6 +88,7 @@
       send_data_(send_data) {
   server_connection_offsets_.reserve(
       statistics_.message().connections()->size());
+  client_offsets_.reserve(statistics_.message().connections()->size());
 
   filters_.resize(event_loop->configuration()->nodes()->size());
   partial_deliveries_.resize(event_loop->configuration()->nodes()->size());
@@ -290,7 +291,7 @@
 
   if (client_statistics_fetcher_.get()) {
     // Build up the list of client offsets.
-    std::vector<flatbuffers::Offset<ClientOffset>> client_offsets;
+    client_offsets_.clear();
 
     // Iterate through the connections this node has made.
     for (const ClientConnection *connection :
@@ -377,10 +378,10 @@
       client_offset_builder.add_node(node_offset);
       client_offset_builder.add_monotonic_offset(
           connection->monotonic_offset());
-      client_offsets.emplace_back(client_offset_builder.Finish());
+      client_offsets_.emplace_back(client_offset_builder.Finish());
     }
     flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<ClientOffset>>>
-        offsets_offset = fbb->CreateVector(client_offsets);
+        offsets_offset = fbb->CreateVector(client_offsets_);
 
     Timestamp::Builder builder(*fbb);
     builder.add_offsets(offsets_offset);
diff --git a/aos/network/message_bridge_server_status.h b/aos/network/message_bridge_server_status.h
index d513a0e..624b376 100644
--- a/aos/network/message_bridge_server_status.h
+++ b/aos/network/message_bridge_server_status.h
@@ -68,7 +68,7 @@
   ServerConnection *FindServerConnection(std::string_view node_name);
   ServerConnection *FindServerConnection(const Node *node);
 
-  std::vector<ServerConnection *> server_connection() {
+  const std::vector<ServerConnection *> &server_connection() {
     return server_connection_;
   }
 
@@ -132,6 +132,8 @@
   std::vector<uint32_t> partial_deliveries_;
 
   size_t invalid_connection_count_ = 0u;
+
+  std::vector<flatbuffers::Offset<ClientOffset>> client_offsets_;
 };
 
 }  // namespace message_bridge