Make sctp buffers big enough for images
We had multiple problems.
1) kernel buffers were too small so the kernel was delivering partial
packets. Fix was to update the rootfs to increase the buffer size
and check the parameter. Secondary fix was to CHECK that received
packets were the size advertised.
2) Client wasn't configuring its buffers to be the right size.
Configured the socket to make it big enough.
Change-Id: I276e698943aa5714ff2ca8e1ac73d6975d219eb9
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 8f3f394..6b85830 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -89,6 +89,7 @@
],
deps = [
"//aos:unique_malloc_ptr",
+ "//aos/util:file",
"//third_party/lksctp-tools:sctp",
"@com_github_google_glog//:glog",
],
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index 64b9cff..efb3dd0 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -151,6 +151,23 @@
event_loop_->OnRun(
[this]() { connect_timer_->Setup(event_loop_->monotonic_now()); });
+ int max_size = connect_message_.size();
+
+ for (const Channel *channel : *event_loop_->configuration()->channels()) {
+ CHECK(channel->has_source_node());
+
+ if (configuration::ChannelIsSendableOnNode(channel, remote_node_) &&
+ configuration::ChannelIsReadableOnNode(channel, event_loop_->node())) {
+ LOG(INFO) << "Reeiving channel "
+ << configuration::CleanedChannelToString(channel);
+ max_size = std::max(channel->max_size(), max_size);
+ }
+ }
+
+ // Buffer up the max size a bit so everything fits nicely.
+ LOG(INFO) << "Max message size for all servers is " << max_size;
+ client_.SetMaxSize(max_size + 100);
+
event_loop_->epoll()->OnReadable(client_.fd(),
[this]() { MessageReceived(); });
}
@@ -227,6 +244,10 @@
const logger::MessageHeader *message_header =
flatbuffers::GetSizePrefixedRoot<logger::MessageHeader>(message->data());
+ VLOG(1) << "Got a message of size " << message->size;
+ CHECK_EQ(message->size, flatbuffers::GetPrefixedSize(message->data()) +
+ sizeof(flatbuffers::uoffset_t));
+
connection_->mutate_received_packets(connection_->received_packets() + 1);
const int stream = message->header.rcvinfo.rcv_sid - kControlStreams();
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index 53afe84..e5222fc 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -82,7 +82,8 @@
flatbuffers::FlatBufferBuilder fbb(channel_->max_size() + 100);
fbb.ForceDefaults(true);
VLOG(1) << "Found " << peers_.size() << " peers on channel "
- << channel_->name()->string_view() << " size " << context.size;
+ << channel_->name()->string_view() << " "
+ << channel_->type()->string_view() << " size " << context.size;
// TODO(austin): Use an iovec to build it up in 3 parts to avoid the copy?
// Only useful when not logging.
@@ -164,7 +165,12 @@
void ChannelState::HandleFailure(
SizePrefixedFlatbufferDetachedBuffer<logger::MessageHeader> &&message) {
// TODO(austin): Put it in the log queue.
- LOG(INFO) << "Failed to send " << FlatbufferToJson(message);
+ if (VLOG_IS_ON(2)) {
+ LOG(INFO) << "Failed to send " << FlatbufferToJson(message);
+ } else if (VLOG_IS_ON(1)) {
+ message.mutable_message()->clear_data();
+ LOG(INFO) << "Failed to send " << FlatbufferToJson(message);
+ }
// Note: this may be really out of order when we avoid the queue... We
// have the ones we know didn't make it immediately, and the ones which
@@ -375,7 +381,7 @@
} break;
}
- if (VLOG_IS_ON(1)) {
+ if (VLOG_IS_ON(2)) {
PrintNotification(message.get());
}
} else if (message->message_type == Message::kMessage) {
diff --git a/aos/network/sctp_client.h b/aos/network/sctp_client.h
index 5b6df3b..6b84638 100644
--- a/aos/network/sctp_client.h
+++ b/aos/network/sctp_client.h
@@ -43,7 +43,19 @@
void LogSctpStatus(sctp_assoc_t assoc_id);
- void set_max_size(size_t max_size) { max_size_ = max_size; }
+ void SetMaxSize(size_t max_size) {
+ max_size_ = max_size;
+ // Have the kernel give us a factor of 10 more. This lets us have more than
+ // one full sized packet in flight.
+ max_size = max_size * 10;
+
+ CHECK_GE(ReadRMemMax(), max_size);
+ CHECK_GE(ReadWMemMax(), max_size);
+ PCHECK(setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &max_size,
+ sizeof(max_size)) == 0);
+ PCHECK(setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &max_size,
+ sizeof(max_size)) == 0);
+ }
private:
struct sockaddr_storage sockaddr_remote_;
diff --git a/aos/network/sctp_lib.cc b/aos/network/sctp_lib.cc
index 7e32bb4..1e985f3 100644
--- a/aos/network/sctp_lib.cc
+++ b/aos/network/sctp_lib.cc
@@ -4,9 +4,14 @@
#include <net/if.h>
#include <netdb.h>
#include <netinet/sctp.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
#include <string_view>
+#include "aos/util/file.h"
+
DEFINE_string(interface, "", "ipv6 interface");
namespace aos {
@@ -193,8 +198,6 @@
PCHECK((size = recvmsg(fd, &inmessage, 0)) > 0);
result->size = size;
- CHECK_LE(size, max_size) << ": Message overflowed buffer.";
-
if ((MSG_NOTIFICATION & inmessage.msg_flags)) {
result->message_type = Message::kNotification;
} else {
@@ -214,6 +217,9 @@
}
}
+ CHECK_LE(size, max_size) << ": Message overflowed buffer on stream "
+ << result->header.rcvinfo.rcv_sid << ".";
+
return result;
}
@@ -226,5 +232,29 @@
<< " cumtsn=" << header.rcvinfo.rcv_cumtsn << ")";
}
+size_t ReadRMemMax() {
+ struct stat current_stat;
+ if (stat("/proc/sys/net/core/rmem_max", &current_stat) != -1) {
+ return static_cast<size_t>(
+ std::stoi(util::ReadFileToStringOrDie("/proc/sys/net/core/rmem_max")));
+ } else {
+ LOG(WARNING) << "/proc/sys/net/core/rmem_max doesn't exist. Are you in a "
+ "container?";
+ return 212992;
+ }
+}
+
+size_t ReadWMemMax() {
+ struct stat current_stat;
+ if (stat("/proc/sys/net/core/wmem_max", &current_stat) != -1) {
+ return static_cast<size_t>(
+ std::stoi(util::ReadFileToStringOrDie("/proc/sys/net/core/wmem_max")));
+ } else {
+ LOG(WARNING) << "/proc/sys/net/core/wmem_max doesn't exist. Are you in a "
+ "container?";
+ return 212992;
+ }
+}
+
} // namespace message_bridge
} // namespace aos
diff --git a/aos/network/sctp_lib.h b/aos/network/sctp_lib.h
index 0f90f87..9a1274a 100644
--- a/aos/network/sctp_lib.h
+++ b/aos/network/sctp_lib.h
@@ -72,6 +72,11 @@
// Read and allocate a message.
aos::unique_c_ptr<Message> ReadSctpMessage(int fd, int max_size);
+// Returns the max network buffer available for reading for a socket.
+size_t ReadRMemMax();
+// Returns the max network buffer available for writing for a socket.
+size_t ReadWMemMax();
+
} // namespace message_bridge
} // namespace aos
diff --git a/aos/network/sctp_server.h b/aos/network/sctp_server.h
index b702aa8..25d81fc 100644
--- a/aos/network/sctp_server.h
+++ b/aos/network/sctp_server.h
@@ -51,8 +51,13 @@
// Have the kernel give us a factor of 10 more. This lets us have more than
// one full sized packet in flight.
max_size = max_size * 10;
+
+ CHECK_GE(ReadRMemMax(), max_size);
+ CHECK_GE(ReadWMemMax(), max_size);
PCHECK(setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &max_size,
sizeof(max_size)) == 0);
+ PCHECK(setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &max_size,
+ sizeof(max_size)) == 0);
}
private: