Fix (and test) sending large messages over message_bridge
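Brennan installed message_bridge and it didn't start. Whoops... The fix
tracks separate maximum read and write sizes for the SCTP sockets
instead of a single shared maximum size. I added a test to reproduce
it too.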
Change-Id: I54e3e76af5588760599445660876071be37440df
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index d1c07eb..c43fc9a 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -128,7 +128,9 @@
event_loop_->OnRun(
[this]() { connect_timer_->Setup(event_loop_->monotonic_now()); });
- int max_size = connect_message_.span().size();
+ size_t max_write_size =
+ std::max(static_cast<size_t>(208u), connect_message_.span().size());
+ size_t max_read_size = 0u;
for (const Channel *channel : *event_loop_->configuration()->channels()) {
CHECK(channel->has_source_node());
@@ -137,15 +139,18 @@
configuration::ChannelIsReadableOnNode(channel, event_loop_->node())) {
VLOG(1) << "Receiving channel "
<< configuration::CleanedChannelToString(channel);
- max_size = std::max(channel->max_size(), max_size);
+ max_read_size = std::max(static_cast<size_t>(channel->max_size() + 208u),
+ max_read_size);
}
}
// Buffer up the max size a bit so everything fits nicely.
- LOG(INFO) << "Max message size for all servers is " << max_size;
+ LOG(INFO) << "Max read message size for all servers is " << max_read_size;
+ LOG(INFO) << "Max write message size for all servers is " << max_write_size;
// RemoteMessage header appears to be between 100 and 204 bytes of overhead
// from the vector of data. No need to get super tight to that bound.
- client_.SetMaxSize(max_size + 204);
+ client_.SetMaxReadSize(max_read_size);
+ client_.SetMaxWriteSize(max_write_size);
// 1 client talks to 1 server. With interleaving support 1 turned on, we'll
// at most see 1 partial message, and 1 incoming part, for a total of 2
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index 6cea1da..ce68402 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -403,7 +403,8 @@
// Buffer up the max size a bit so everything fits nicely.
LOG(INFO) << "Max message size for all clients is " << max_size;
- server_.SetMaxSize(max_size);
+ server_.SetMaxReadSize(max_size);
+ server_.SetMaxWriteSize(max_channel_size + kRemoteDataHeaderMaxSize);
// Since we are doing interleaving mode 1, we will see at most 1 message being
// delivered at a time for an association. That means, if a message is
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index 02e6a02..681001e 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -357,6 +357,8 @@
MakePi1Server();
MakePi1Client();
+ const std::string long_data = std::string(10000, 'a');
+
// And build the app which sends the pings.
FLAGS_application_name = "ping";
aos::ShmEventLoop ping_event_loop(&config.message());
@@ -422,7 +424,8 @@
int ping_count = 0;
int pi1_server_statistics_count = 0;
ping_event_loop.MakeWatcher("/pi1/aos", [this, &ping_count, &ping_sender,
- &pi1_server_statistics_count](
+ &pi1_server_statistics_count,
+ &long_data](
const ServerStatistics &stats) {
VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
@@ -460,6 +463,7 @@
if (connected) {
VLOG(1) << "Connected! Sent ping.";
auto builder = ping_sender.MakeBuilder();
+ builder.fbb()->CreateString(long_data);
examples::Ping::Builder ping_builder =
builder.MakeBuilder<examples::Ping>();
ping_builder.add_value(ping_count + 971);
diff --git a/aos/network/message_bridge_test_combined_timestamps_common.json b/aos/network/message_bridge_test_combined_timestamps_common.json
index 5d82965..13a0514 100644
--- a/aos/network/message_bridge_test_combined_timestamps_common.json
+++ b/aos/network/message_bridge_test_combined_timestamps_common.json
@@ -110,7 +110,8 @@
"timestamp_logger": "REMOTE_LOGGER",
"timestamp_logger_nodes": ["pi1"]
}
- ]
+ ],
+ "max_size": 20480
},
{
"name": "/test",
@@ -125,7 +126,8 @@
"timestamp_logger": "REMOTE_LOGGER",
"timestamp_logger_nodes": ["pi1"]
}
- ]
+ ],
+ "max_size": 20480
},
{
"name": "/unreliable",
@@ -139,7 +141,8 @@
"timestamp_logger_nodes": ["pi1"],
"time_to_live": 5000000
}
- ]
+ ],
+ "max_size": 20480
}
],
"maps": [
diff --git a/aos/network/message_bridge_test_common.json b/aos/network/message_bridge_test_common.json
index a30734d..dad1675 100644
--- a/aos/network/message_bridge_test_common.json
+++ b/aos/network/message_bridge_test_common.json
@@ -126,7 +126,8 @@
"timestamp_logger": "REMOTE_LOGGER",
"timestamp_logger_nodes": ["pi1"]
}
- ]
+ ],
+ "max_size": 20480
},
{
"name": "/test",
@@ -141,7 +142,8 @@
"timestamp_logger": "REMOTE_LOGGER",
"timestamp_logger_nodes": ["pi2"]
}
- ]
+ ],
+ "max_size": 20480
},
{
"name": "/unreliable",
@@ -155,7 +157,8 @@
"timestamp_logger_nodes": ["pi1"],
"time_to_live": 5000000
}
- ]
+ ],
+ "max_size": 20480
}
],
"maps": [
diff --git a/aos/network/sctp_client.h b/aos/network/sctp_client.h
index 1a0a907..d7a2b43 100644
--- a/aos/network/sctp_client.h
+++ b/aos/network/sctp_client.h
@@ -47,7 +47,8 @@
void LogSctpStatus(sctp_assoc_t assoc_id);
- void SetMaxSize(size_t max_size) { sctp_.SetMaxSize(max_size); }
+ void SetMaxReadSize(size_t max_size) { sctp_.SetMaxReadSize(max_size); }
+ void SetMaxWriteSize(size_t max_size) { sctp_.SetMaxWriteSize(max_size); }
void SetPoolSize(size_t pool_size) { sctp_.SetPoolSize(pool_size); }
void SetAssociationId(sctp_assoc_t sac_assoc_id) {
diff --git a/aos/network/sctp_lib.cc b/aos/network/sctp_lib.cc
index 3729f9f..3f482c3 100644
--- a/aos/network/sctp_lib.cc
+++ b/aos/network/sctp_lib.cc
@@ -353,7 +353,7 @@
if (!use_pool_) {
constexpr size_t kMessageAlign = alignof(Message);
const size_t max_message_size =
- ((sizeof(Message) + max_size_ + 1 + (kMessageAlign - 1)) /
+ ((sizeof(Message) + max_read_size_ + 1 + (kMessageAlign - 1)) /
kMessageAlign) *
kMessageAlign;
aos::unique_c_ptr<Message> result(reinterpret_cast<Message *>(
@@ -379,7 +379,7 @@
memset(&inmessage, 0, sizeof(struct msghdr));
struct iovec iov;
- iov.iov_len = max_size_ + 1;
+ iov.iov_len = max_read_size_ + 1;
iov.iov_base = result->mutable_data();
inmessage.msg_iov = &iov;
@@ -404,7 +404,7 @@
CHECK(!(inmessage.msg_flags & MSG_CTRUNC))
<< ": Control message truncated.";
- CHECK_LE(size, static_cast<ssize_t>(max_size_))
+ CHECK_LE(size, static_cast<ssize_t>(max_read_size_))
<< ": Message overflowed buffer on stream "
<< result->header.rcvinfo.rcv_sid << ".";
@@ -472,7 +472,7 @@
<< result->header.rcvinfo.rcv_assoc_id;
// Now copy the data over and update the size.
- CHECK_LE(partial_message->size + result->size, max_size_)
+ CHECK_LE(partial_message->size + result->size, max_read_size_)
<< ": Assembled fragments overflowed buffer on stream "
<< result->header.rcvinfo.rcv_sid << ".";
memcpy(partial_message->mutable_data() + partial_message->size,
@@ -568,10 +568,10 @@
}
void SctpReadWrite::DoSetMaxSize() {
- size_t max_size = max_size_;
+ size_t max_size = max_write_size_;
// This sets the max packet size that we can send.
- CHECK_GE(ReadWMemMax(), max_size)
+ CHECK_GE(ReadWMemMax(), max_write_size_)
<< "wmem_max is too low. To increase wmem_max temporarily, do sysctl "
"-w net.core.wmem_max="
<< max_size;
diff --git a/aos/network/sctp_lib.h b/aos/network/sctp_lib.h
index 06152d8..a852365 100644
--- a/aos/network/sctp_lib.h
+++ b/aos/network/sctp_lib.h
@@ -105,11 +105,21 @@
int fd() const { return fd_; }
- void SetMaxSize(size_t max_size) {
+ void SetMaxReadSize(size_t max_size) {
CHECK(partial_messages_.empty())
<< ": May not update size with queued fragments because we do not "
"track individual message sizes";
- max_size_ = max_size;
+ max_read_size_ = max_size;
+ if (fd_ != -1) {
+ DoSetMaxSize();
+ }
+ }
+
+ void SetMaxWriteSize(size_t max_size) {
+ CHECK(partial_messages_.empty())
+ << ": May not update size with queued fragments because we do not "
+ "track individual message sizes";
+ max_write_size_ = max_size;
if (fd_ != -1) {
DoSetMaxSize();
}
@@ -136,7 +146,8 @@
// We use this as a unique identifier that just increments for each message.
uint32_t send_ppid_ = 0;
- size_t max_size_ = 1000;
+ size_t max_read_size_ = 1000;
+ size_t max_write_size_ = 1000;
std::vector<aos::unique_c_ptr<Message>> partial_messages_;
diff --git a/aos/network/sctp_server.cc b/aos/network/sctp_server.cc
index 0bac4c7..a78aa34 100644
--- a/aos/network/sctp_server.cc
+++ b/aos/network/sctp_server.cc
@@ -64,7 +64,8 @@
PCHECK(listen(fd(), 100) == 0);
- SetMaxSize(1000);
+ SetMaxReadSize(1000);
+ SetMaxWriteSize(1000);
break;
}
}
diff --git a/aos/network/sctp_server.h b/aos/network/sctp_server.h
index 1f7e2ee..800c2c1 100644
--- a/aos/network/sctp_server.h
+++ b/aos/network/sctp_server.h
@@ -57,7 +57,8 @@
void SetStreamPriority(sctp_assoc_t assoc_id, int stream_id,
uint16_t priority);
- void SetMaxSize(size_t max_size) { sctp_.SetMaxSize(max_size); }
+ void SetMaxReadSize(size_t max_size) { sctp_.SetMaxReadSize(max_size); }
+ void SetMaxWriteSize(size_t max_size) { sctp_.SetMaxWriteSize(max_size); }
void SetPoolSize(size_t pool_size) { sctp_.SetPoolSize(pool_size); }