Remove rest of mallocs from message_bridge_server
Remove the big malloc for the outgoing message flat buffer builder, and
then a bunch of log statements and vectors which didn't really need to
be there.
Change-Id: I155e4bde23028beefd65af5c04a6e5cba279304a
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index 8978060..6cea1da 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -27,8 +27,10 @@
}
flatbuffers::FlatBufferBuilder ChannelState::PackContext(
- const Context &context) {
- flatbuffers::FlatBufferBuilder fbb(channel_->max_size() + 100);
+ FixedAllocator *allocator, const Context &context) {
+ flatbuffers::FlatBufferBuilder fbb(
+ channel_->max_size() + MessageBridgeServer::kRemoteDataHeaderMaxSize,
+ allocator);
fbb.ForceDefaults(true);
VLOG(2) << "Found " << peers_.size() << " peers on channel "
<< channel_->name()->string_view() << " "
@@ -58,10 +60,11 @@
return fbb;
}
-void ChannelState::SendData(SctpServer *server, const Context &context) {
+void ChannelState::SendData(SctpServer *server, FixedAllocator *allocator,
+ const Context &context) {
// TODO(austin): I don't like allocating this buffer when we are just freeing
// it at the end of the function.
- flatbuffers::FlatBufferBuilder fbb = PackContext(context);
+ flatbuffers::FlatBufferBuilder fbb = PackContext(allocator, context);
// TODO(austin): Track which connections need to be reliable and handle
// resending properly.
@@ -193,6 +196,7 @@
int ChannelState::NodeConnected(const Node *node, sctp_assoc_t assoc_id,
int stream, SctpServer *server,
+ FixedAllocator *allocator,
aos::monotonic_clock::time_point monotonic_now,
std::vector<sctp_assoc_t> *reconnected) {
VLOG(1) << "Channel " << channel_->name()->string_view() << " "
@@ -210,14 +214,18 @@
peer.sac_assoc_id) == reconnected->end())) {
reconnected->push_back(peer.sac_assoc_id);
if (peer.sac_assoc_id == assoc_id) {
- LOG_EVERY_T(WARNING, 0.025)
- << "Node " << node->name()->string_view() << " reconnecting on "
- << assoc_id << " with the same ID, something got lost";
+ if (VLOG_IS_ON(1)) {
+ LOG_EVERY_T(WARNING, 0.025)
+ << "Node " << node->name()->string_view() << " reconnecting on "
+ << assoc_id << " with the same ID, something got lost";
+ }
} else {
- LOG_EVERY_T(WARNING, 0.025)
- << "Node " << node->name()->string_view() << " "
- << " already connected on " << peer.sac_assoc_id
- << " aborting old connection and switching to " << assoc_id;
+ if (VLOG_IS_ON(1)) {
+ LOG_EVERY_T(WARNING, 0.025)
+ << "Node " << node->name()->string_view() << " "
+ << " already connected on " << peer.sac_assoc_id
+ << " aborting old connection and switching to " << assoc_id;
+ }
server->Abort(peer.sac_assoc_id);
}
}
@@ -236,10 +244,8 @@
<< (last_message_fetcher_->context().data != nullptr);
if (last_message_fetcher_->context().data != nullptr) {
// SendData sends to all... Only send to the new one.
- // TODO(austin): I don't like allocating this buffer when we are just
- // freeing it at the end of the function.
flatbuffers::FlatBufferBuilder fbb =
- PackContext(last_message_fetcher_->context());
+ PackContext(allocator, last_message_fetcher_->context());
if (server->Send(std::string_view(reinterpret_cast<const char *>(
fbb.GetBufferPointer()),
@@ -268,9 +274,11 @@
event_loop->node()->port()),
server_status_(event_loop,
[this](const Context &context) {
- timestamp_state_->SendData(&server_, context);
+ timestamp_state_->SendData(&server_, &allocator_,
+ context);
}),
- config_sha256_(std::move(config_sha256)) {
+ config_sha256_(std::move(config_sha256)),
+ allocator_(0) {
CHECK_EQ(config_sha256_.size(), 64u) << ": Wrong length sha256sum";
CHECK(event_loop_->node() != nullptr) << ": No nodes configured.";
@@ -309,6 +317,7 @@
LOG(INFO) << "Hostname: " << event_loop_->node()->hostname()->string_view();
int channel_index = 0;
+ size_t max_channel_size = 0u;
const Channel *const timestamp_channel = configuration::GetChannel(
event_loop_->configuration(), "/aos", Timestamp::GetFullyQualifiedName(),
event_loop_->name(), event_loop_->node());
@@ -330,6 +339,10 @@
any_reliable = true;
}
}
+
+ max_channel_size =
+ std::max(static_cast<size_t>(channel->max_size()), max_channel_size);
+
std::unique_ptr<ChannelState> state(new ChannelState{
channel, channel_index,
any_reliable ? event_loop_->MakeRawFetcher(channel) : nullptr});
@@ -363,7 +376,7 @@
event_loop_->MakeRawWatcher(
channel, [this, state_ptr](const Context &context,
const void * /*message*/) {
- state_ptr->SendData(&server_, context);
+ state_ptr->SendData(&server_, &allocator_, context);
});
} else {
for (const Connection *connection : *channel->destination_nodes()) {
@@ -401,6 +414,8 @@
// and 1 new one with more of the data).
server_.SetPoolSize((destination_nodes + 1) * 2);
+ allocator_ = FixedAllocator(max_channel_size + kRemoteDataHeaderMaxSize);
+
reconnected_.reserve(max_channels());
}
@@ -518,8 +533,10 @@
{
flatbuffers::Verifier verifier(message->data(), message->size);
if (!connect->Verify(verifier)) {
- LOG_EVERY_T(WARNING, 1.0)
- << "Failed to verify message, disconnecting client";
+ if (VLOG_IS_ON(1)) {
+ LOG_EVERY_T(WARNING, 1.0)
+ << "Failed to verify message, disconnecting client";
+ }
server_.Abort(message->header.rcvinfo.rcv_assoc_id);
MaybeIncrementInvalidConnectionCount(nullptr);
@@ -529,8 +546,9 @@
VLOG(1) << FlatbufferToJson(connect);
if (!connect->has_config_sha256()) {
- LOG_EVERY_T(WARNING, 1.0)
- << "Client missing config_sha256, disconnecting client";
+ if (VLOG_IS_ON(1)) {
+ LOG(WARNING) << "Client missing config_sha256, disconnecting client";
+ }
server_.Abort(message->header.rcvinfo.rcv_assoc_id);
MaybeIncrementInvalidConnectionCount(connect->node());
@@ -538,10 +556,12 @@
}
if (connect->config_sha256()->string_view() != config_sha256_) {
- LOG_EVERY_T(WARNING, 1.0) << "Client config sha256 of "
- << connect->config_sha256()->string_view()
- << " doesn't match our config sha256 of "
- << config_sha256_ << ", disconnecting client";
+ if (VLOG_IS_ON(1)) {
+ LOG(WARNING) << "Client config sha256 of "
+ << connect->config_sha256()->string_view()
+ << " doesn't match our config sha256 of " << config_sha256_
+ << ", disconnecting client";
+ }
server_.Abort(message->header.rcvinfo.rcv_assoc_id);
MaybeIncrementInvalidConnectionCount(connect->node());
@@ -550,8 +570,10 @@
if (connect->channels_to_transfer()->size() >
static_cast<size_t>(max_channels())) {
- LOG_EVERY_T(WARNING, 1.0)
- << "Client has more channels than we do, disconnecting client";
+ if (VLOG_IS_ON(1)) {
+ LOG(WARNING)
+ << "Client has more channels than we do, disconnecting client";
+ }
server_.Abort(message->header.rcvinfo.rcv_assoc_id);
MaybeIncrementInvalidConnectionCount(connect->node());
@@ -579,7 +601,8 @@
if (channel_state->Matches(channel)) {
node_index = channel_state->NodeConnected(
connect->node(), message->header.rcvinfo.rcv_assoc_id,
- channel_index, &server_, monotonic_now, &reconnected_);
+ channel_index, &server_, &allocator_, monotonic_now,
+ &reconnected_);
CHECK_NE(node_index, -1)
<< ": Failed to find node "
<< aos::FlatbufferToJson(connect->node()) << " for connection "
@@ -589,8 +612,14 @@
}
}
if (!matched) {
- LOG(ERROR) << "Remote tried registering for unknown channel "
- << FlatbufferToJson(channel);
+ if (VLOG_IS_ON(1)) {
+ LOG(ERROR) << "Remote tried registering for unknown channel "
+ << FlatbufferToJson(channel);
+ }
+ server_.Abort(message->header.rcvinfo.rcv_assoc_id);
+
+ MaybeIncrementInvalidConnectionCount(connect->node());
+ return;
}
++channel_index;
}
@@ -629,11 +658,13 @@
message->LogRcvInfo();
}
} else {
- message->LogRcvInfo();
- // TODO(sarah.newman): add some versioning concept such that if this was a
- // fatal error, we would never get here.
- LOG_FIRST_N(ERROR, 20) << "Unexpected stream id "
- << message->header.rcvinfo.rcv_sid;
+ // We should never see the client sending us something on the wrong stream.
+ // Just explode... In theory, this could let a client DOS us, but we trust
+ // the client.
+ if (VLOG_IS_ON(2)) {
+ message->LogRcvInfo();
+ }
+ LOG(FATAL) << "Unexpected stream id " << message->header.rcvinfo.rcv_sid;
}
}
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index 2884e58..fdc0441 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -68,7 +68,7 @@
// This will potentially grow to the number of associations as we find reconnects.
int NodeDisconnected(sctp_assoc_t assoc_id);
int NodeConnected(const Node *node, sctp_assoc_t assoc_id, int stream,
- SctpServer *server,
+ SctpServer *server, FixedAllocator *allocator,
aos::monotonic_clock::time_point monotonic_now,
std::vector<sctp_assoc_t> *reconnected);
@@ -83,10 +83,12 @@
bool Matches(const Channel *other_channel);
// Sends the data in context using the provided server.
- void SendData(SctpServer *server, const Context &context);
+ void SendData(SctpServer *server, FixedAllocator *allocator,
+ const Context &context);
// Packs a context into a size prefixed message header for transmission.
- flatbuffers::FlatBufferBuilder PackContext(const Context &context);
+ flatbuffers::FlatBufferBuilder PackContext(FixedAllocator *allocator,
+ const Context &context);
// Handles reception of delivery times.
void HandleDelivery(sctp_assoc_t rcv_assoc_id, uint16_t ssn,
@@ -109,6 +111,10 @@
// node. It handles the session and dispatches data to the ChannelState.
class MessageBridgeServer {
public:
+ // Size to reserve when building the RemoteData message for the header over
+ // the data size.
+ static constexpr size_t kRemoteDataHeaderMaxSize = 208u;
+
MessageBridgeServer(aos::ShmEventLoop *event_loop, std::string config_sha256);
~MessageBridgeServer() { event_loop_->epoll()->DeleteFd(server_.fd()); }
@@ -152,11 +158,13 @@
// null.
std::vector<std::unique_ptr<ChannelState>> channels_;
- std::string config_sha256_;
+ const std::string config_sha256_;
// List of assoc_id's that have been found already when connecting. This is a
// member variable so the memory is allocated in the constructor.
std::vector<sctp_assoc_t> reconnected_;
+
+ FixedAllocator allocator_;
};
} // namespace message_bridge
diff --git a/aos/network/message_bridge_server_status.cc b/aos/network/message_bridge_server_status.cc
index 8b186b7..df87134 100644
--- a/aos/network/message_bridge_server_status.cc
+++ b/aos/network/message_bridge_server_status.cc
@@ -88,6 +88,7 @@
send_data_(send_data) {
server_connection_offsets_.reserve(
statistics_.message().connections()->size());
+ client_offsets_.reserve(statistics_.message().connections()->size());
filters_.resize(event_loop->configuration()->nodes()->size());
partial_deliveries_.resize(event_loop->configuration()->nodes()->size());
@@ -290,7 +291,7 @@
if (client_statistics_fetcher_.get()) {
// Build up the list of client offsets.
- std::vector<flatbuffers::Offset<ClientOffset>> client_offsets;
+ client_offsets_.clear();
// Iterate through the connections this node has made.
for (const ClientConnection *connection :
@@ -377,10 +378,10 @@
client_offset_builder.add_node(node_offset);
client_offset_builder.add_monotonic_offset(
connection->monotonic_offset());
- client_offsets.emplace_back(client_offset_builder.Finish());
+ client_offsets_.emplace_back(client_offset_builder.Finish());
}
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<ClientOffset>>>
- offsets_offset = fbb->CreateVector(client_offsets);
+ offsets_offset = fbb->CreateVector(client_offsets_);
Timestamp::Builder builder(*fbb);
builder.add_offsets(offsets_offset);
diff --git a/aos/network/message_bridge_server_status.h b/aos/network/message_bridge_server_status.h
index d513a0e..624b376 100644
--- a/aos/network/message_bridge_server_status.h
+++ b/aos/network/message_bridge_server_status.h
@@ -68,7 +68,7 @@
ServerConnection *FindServerConnection(std::string_view node_name);
ServerConnection *FindServerConnection(const Node *node);
- std::vector<ServerConnection *> server_connection() {
+ const std::vector<ServerConnection *> &server_connection() {
return server_connection_;
}
@@ -132,6 +132,8 @@
std::vector<uint32_t> partial_deliveries_;
size_t invalid_connection_count_ = 0u;
+
+ std::vector<flatbuffers::Offset<ClientOffset>> client_offsets_;
};
} // namespace message_bridge