Fix (and test) sending large messages over message_bridge

Brennan installed message_bridge and it didn't start.  Whops...  I added
a test to reproduce it too.

Change-Id: I54e3e76af5588760599445660876071be37440df
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index d1c07eb..c43fc9a 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -128,7 +128,9 @@
   event_loop_->OnRun(
       [this]() { connect_timer_->Setup(event_loop_->monotonic_now()); });
 
-  int max_size = connect_message_.span().size();
+  size_t max_write_size =
+      std::max(static_cast<size_t>(208u), connect_message_.span().size());
+  size_t max_read_size = 0u;
 
   for (const Channel *channel : *event_loop_->configuration()->channels()) {
     CHECK(channel->has_source_node());
@@ -137,15 +139,18 @@
         configuration::ChannelIsReadableOnNode(channel, event_loop_->node())) {
       VLOG(1) << "Receiving channel "
               << configuration::CleanedChannelToString(channel);
-      max_size = std::max(channel->max_size(), max_size);
+      max_read_size = std::max(static_cast<size_t>(channel->max_size() + 208u),
+                               max_read_size);
     }
   }
 
   // Buffer up the max size a bit so everything fits nicely.
-  LOG(INFO) << "Max message size for all servers is " << max_size;
+  LOG(INFO) << "Max read message size for all servers is " << max_read_size;
+  LOG(INFO) << "Max write message size for all servers is " << max_write_size;
   // RemoteMessage header appears to be between 100 and 204 bytes of overhead
   // from the vector of data.  No need to get super tight to that bound.
-  client_.SetMaxSize(max_size + 204);
+  client_.SetMaxReadSize(max_read_size);
+  client_.SetMaxWriteSize(max_write_size);
 
   // 1 client talks to 1 server.  With interleaving support 1 turned on, we'll
   // at most see 1 partial message, and 1 incoming part, for a total of 2
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index 6cea1da..ce68402 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -403,7 +403,8 @@
 
   // Buffer up the max size a bit so everything fits nicely.
   LOG(INFO) << "Max message size for all clients is " << max_size;
-  server_.SetMaxSize(max_size);
+  server_.SetMaxReadSize(max_size);
+  server_.SetMaxWriteSize(max_channel_size + kRemoteDataHeaderMaxSize);
 
   // Since we are doing interleaving mode 1, we will see at most 1 message being
   // delivered at a time for an association.  That means, if a message is
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index 02e6a02..681001e 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -357,6 +357,8 @@
   MakePi1Server();
   MakePi1Client();
 
+  const std::string long_data = std::string(10000, 'a');
+
   // And build the app which sends the pings.
   FLAGS_application_name = "ping";
   aos::ShmEventLoop ping_event_loop(&config.message());
@@ -422,7 +424,8 @@
   int ping_count = 0;
   int pi1_server_statistics_count = 0;
   ping_event_loop.MakeWatcher("/pi1/aos", [this, &ping_count, &ping_sender,
-                                           &pi1_server_statistics_count](
+                                           &pi1_server_statistics_count,
+                                           &long_data](
                                               const ServerStatistics &stats) {
     VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
 
@@ -460,6 +463,7 @@
     if (connected) {
       VLOG(1) << "Connected!  Sent ping.";
       auto builder = ping_sender.MakeBuilder();
+      builder.fbb()->CreateString(long_data);
       examples::Ping::Builder ping_builder =
           builder.MakeBuilder<examples::Ping>();
       ping_builder.add_value(ping_count + 971);
diff --git a/aos/network/message_bridge_test_combined_timestamps_common.json b/aos/network/message_bridge_test_combined_timestamps_common.json
index 5d82965..13a0514 100644
--- a/aos/network/message_bridge_test_combined_timestamps_common.json
+++ b/aos/network/message_bridge_test_combined_timestamps_common.json
@@ -110,7 +110,8 @@
           "timestamp_logger": "REMOTE_LOGGER",
           "timestamp_logger_nodes": ["pi1"]
         }
-      ]
+      ],
+      "max_size": 20480
     },
     {
       "name": "/test",
@@ -125,7 +126,8 @@
           "timestamp_logger": "REMOTE_LOGGER",
           "timestamp_logger_nodes": ["pi1"]
         }
-      ]
+      ],
+      "max_size": 20480
     },
     {
       "name": "/unreliable",
@@ -139,7 +141,8 @@
           "timestamp_logger_nodes": ["pi1"],
           "time_to_live": 5000000
         }
-      ]
+      ],
+      "max_size": 20480
     }
   ],
   "maps": [
diff --git a/aos/network/message_bridge_test_common.json b/aos/network/message_bridge_test_common.json
index a30734d..dad1675 100644
--- a/aos/network/message_bridge_test_common.json
+++ b/aos/network/message_bridge_test_common.json
@@ -126,7 +126,8 @@
           "timestamp_logger": "REMOTE_LOGGER",
           "timestamp_logger_nodes": ["pi1"]
         }
-      ]
+      ],
+      "max_size": 20480
     },
     {
       "name": "/test",
@@ -141,7 +142,8 @@
           "timestamp_logger": "REMOTE_LOGGER",
           "timestamp_logger_nodes": ["pi2"]
         }
-      ]
+      ],
+      "max_size": 20480
     },
     {
       "name": "/unreliable",
@@ -155,7 +157,8 @@
           "timestamp_logger_nodes": ["pi1"],
           "time_to_live": 5000000
         }
-      ]
+      ],
+      "max_size": 20480
     }
   ],
   "maps": [
diff --git a/aos/network/sctp_client.h b/aos/network/sctp_client.h
index 1a0a907..d7a2b43 100644
--- a/aos/network/sctp_client.h
+++ b/aos/network/sctp_client.h
@@ -47,7 +47,8 @@
 
   void LogSctpStatus(sctp_assoc_t assoc_id);
 
-  void SetMaxSize(size_t max_size) { sctp_.SetMaxSize(max_size); }
+  void SetMaxReadSize(size_t max_size) { sctp_.SetMaxReadSize(max_size); }
+  void SetMaxWriteSize(size_t max_size) { sctp_.SetMaxWriteSize(max_size); }
   void SetPoolSize(size_t pool_size) { sctp_.SetPoolSize(pool_size); }
 
   void SetAssociationId(sctp_assoc_t sac_assoc_id) {
diff --git a/aos/network/sctp_lib.cc b/aos/network/sctp_lib.cc
index 3729f9f..3f482c3 100644
--- a/aos/network/sctp_lib.cc
+++ b/aos/network/sctp_lib.cc
@@ -353,7 +353,7 @@
   if (!use_pool_) {
     constexpr size_t kMessageAlign = alignof(Message);
     const size_t max_message_size =
-        ((sizeof(Message) + max_size_ + 1 + (kMessageAlign - 1)) /
+        ((sizeof(Message) + max_read_size_ + 1 + (kMessageAlign - 1)) /
          kMessageAlign) *
         kMessageAlign;
     aos::unique_c_ptr<Message> result(reinterpret_cast<Message *>(
@@ -379,7 +379,7 @@
     memset(&inmessage, 0, sizeof(struct msghdr));
 
     struct iovec iov;
-    iov.iov_len = max_size_ + 1;
+    iov.iov_len = max_read_size_ + 1;
     iov.iov_base = result->mutable_data();
 
     inmessage.msg_iov = &iov;
@@ -404,7 +404,7 @@
     CHECK(!(inmessage.msg_flags & MSG_CTRUNC))
         << ": Control message truncated.";
 
-    CHECK_LE(size, static_cast<ssize_t>(max_size_))
+    CHECK_LE(size, static_cast<ssize_t>(max_read_size_))
         << ": Message overflowed buffer on stream "
         << result->header.rcvinfo.rcv_sid << ".";
 
@@ -472,7 +472,7 @@
           << result->header.rcvinfo.rcv_assoc_id;
 
       // Now copy the data over and update the size.
-      CHECK_LE(partial_message->size + result->size, max_size_)
+      CHECK_LE(partial_message->size + result->size, max_read_size_)
           << ": Assembled fragments overflowed buffer on stream "
           << result->header.rcvinfo.rcv_sid << ".";
       memcpy(partial_message->mutable_data() + partial_message->size,
@@ -568,10 +568,10 @@
 }
 
 void SctpReadWrite::DoSetMaxSize() {
-  size_t max_size = max_size_;
+  size_t max_size = max_write_size_;
 
   // This sets the max packet size that we can send.
-  CHECK_GE(ReadWMemMax(), max_size)
+  CHECK_GE(ReadWMemMax(), max_write_size_)
       << "wmem_max is too low. To increase wmem_max temporarily, do sysctl "
          "-w net.core.wmem_max="
       << max_size;
diff --git a/aos/network/sctp_lib.h b/aos/network/sctp_lib.h
index 06152d8..a852365 100644
--- a/aos/network/sctp_lib.h
+++ b/aos/network/sctp_lib.h
@@ -105,11 +105,21 @@
 
   int fd() const { return fd_; }
 
-  void SetMaxSize(size_t max_size) {
+  void SetMaxReadSize(size_t max_size) {
     CHECK(partial_messages_.empty())
         << ": May not update size with queued fragments because we do not "
            "track individual message sizes";
-    max_size_ = max_size;
+    max_read_size_ = max_size;
+    if (fd_ != -1) {
+      DoSetMaxSize();
+    }
+  }
+
+  void SetMaxWriteSize(size_t max_size) {
+    CHECK(partial_messages_.empty())
+        << ": May not update size with queued fragments because we do not "
+           "track individual message sizes";
+    max_write_size_ = max_size;
     if (fd_ != -1) {
       DoSetMaxSize();
     }
@@ -136,7 +146,8 @@
   // We use this as a unique identifier that just increments for each message.
   uint32_t send_ppid_ = 0;
 
-  size_t max_size_ = 1000;
+  size_t max_read_size_ = 1000;
+  size_t max_write_size_ = 1000;
 
   std::vector<aos::unique_c_ptr<Message>> partial_messages_;
 
diff --git a/aos/network/sctp_server.cc b/aos/network/sctp_server.cc
index 0bac4c7..a78aa34 100644
--- a/aos/network/sctp_server.cc
+++ b/aos/network/sctp_server.cc
@@ -64,7 +64,8 @@
 
     PCHECK(listen(fd(), 100) == 0);
 
-    SetMaxSize(1000);
+    SetMaxReadSize(1000);
+    SetMaxWriteSize(1000);
     break;
   }
 }
diff --git a/aos/network/sctp_server.h b/aos/network/sctp_server.h
index 1f7e2ee..800c2c1 100644
--- a/aos/network/sctp_server.h
+++ b/aos/network/sctp_server.h
@@ -57,7 +57,8 @@
   void SetStreamPriority(sctp_assoc_t assoc_id, int stream_id,
                          uint16_t priority);
 
-  void SetMaxSize(size_t max_size) { sctp_.SetMaxSize(max_size); }
+  void SetMaxReadSize(size_t max_size) { sctp_.SetMaxReadSize(max_size); }
+  void SetMaxWriteSize(size_t max_size) { sctp_.SetMaxWriteSize(max_size); }
 
   void SetPoolSize(size_t pool_size) { sctp_.SetPoolSize(pool_size); }