Move ReadSctpMessage to a class

In preparation for reassembling partial messages in userspace.

Change-Id: Ifa530698058ea775362eee4ec1bf9e6e0d3dd5de
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/network/sctp_client.cc b/aos/network/sctp_client.cc
index 9e77a84..113b525 100644
--- a/aos/network/sctp_client.cc
+++ b/aos/network/sctp_client.cc
@@ -19,42 +19,23 @@
 SctpClient::SctpClient(std::string_view remote_host, int remote_port,
                        int streams, std::string_view local_host, int local_port)
     : sockaddr_remote_(ResolveSocket(remote_host, remote_port)),
-      sockaddr_local_(ResolveSocket(local_host, local_port)),
-      fd_(socket(sockaddr_local_.ss_family, SOCK_SEQPACKET, IPPROTO_SCTP)) {
-  LOG(INFO) << "socket(" << Family(sockaddr_local_)
-            << ", SOCK_SEQPACKET, IPPROTOSCTP) = " << fd_;
-  PCHECK(fd_ != -1);
-
-  {
-    // Per https://tools.ietf.org/html/rfc6458
-    // Setting this to !0 allows event notifications to be interleaved
-    // with data if enabled, and would have to be handled in the code.
-    // Enabling interleaving would only matter during congestion, which
-    // typically only happens during application startup.
-    int interleaving = 0;
-    PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_FRAGMENT_INTERLEAVE,
-                      &interleaving, sizeof(interleaving)) == 0);
-  }
+      sockaddr_local_(ResolveSocket(local_host, local_port)) {
+  sctp_.OpenSocket(sockaddr_local_);
 
   {
     struct sctp_initmsg initmsg;
     memset(&initmsg, 0, sizeof(struct sctp_initmsg));
     initmsg.sinit_num_ostreams = streams;
     initmsg.sinit_max_instreams = streams;
-    PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
+    PCHECK(setsockopt(fd(), IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
                       sizeof(struct sctp_initmsg)) == 0);
   }
 
   {
-    int on = 1;
-    PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_RECVRCVINFO, &on, sizeof(int)) ==
-           0);
-  }
-  {
     // Servers send promptly.  Clients don't.
     // TODO(austin): Revisit this assumption when we have time sync.
     int on = 0;
-    PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_NODELAY, &on, sizeof(int)) == 0);
+    PCHECK(setsockopt(fd(), IPPROTO_SCTP, SCTP_NODELAY, &on, sizeof(int)) == 0);
   }
 
   {
@@ -64,70 +45,15 @@
     memset(&subscribe, 0, sizeof(subscribe));
     subscribe.sctp_association_event = 1;
     subscribe.sctp_stream_change_event = 1;
-    PCHECK(setsockopt(fd_, SOL_SCTP, SCTP_EVENTS, (char *)&subscribe,
+    PCHECK(setsockopt(fd(), SOL_SCTP, SCTP_EVENTS, (char *)&subscribe,
                       sizeof(subscribe)) == 0);
   }
 
-  PCHECK(bind(fd_, (struct sockaddr *)&sockaddr_local_,
+  PCHECK(bind(fd(), (struct sockaddr *)&sockaddr_local_,
               sockaddr_local_.ss_family == AF_INET6
                   ? sizeof(struct sockaddr_in6)
                   : sizeof(struct sockaddr_in)) == 0);
-  VLOG(1) << "bind(" << fd_ << ", " << Address(sockaddr_local_) << ")";
-}
-
-aos::unique_c_ptr<Message> SctpClient::Read() {
-  return ReadSctpMessage(fd_, max_size_);
-}
-
-bool SctpClient::Send(int stream, std::string_view data, int time_to_live) {
-  struct iovec iov;
-  iov.iov_base = const_cast<char *>(data.data());
-  iov.iov_len = data.size();
-
-  struct msghdr outmsg;
-  // Target to send to.
-  outmsg.msg_name = &sockaddr_remote_;
-  outmsg.msg_namelen = sizeof(struct sockaddr_storage);
-  VLOG(1) << "Sending to " << Address(sockaddr_remote_);
-
-  // Data to send.
-  outmsg.msg_iov = &iov;
-  outmsg.msg_iovlen = 1;
-
-  // Build up the sndinfo message.
-  char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
-  outmsg.msg_control = outcmsg;
-  outmsg.msg_controllen = sizeof(outcmsg);
-  outmsg.msg_flags = 0;
-
-  struct cmsghdr *cmsg = CMSG_FIRSTHDR(&outmsg);
-  cmsg->cmsg_level = IPPROTO_SCTP;
-  cmsg->cmsg_type = SCTP_SNDRCV;
-  cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
-
-  outmsg.msg_controllen = cmsg->cmsg_len;
-  struct sctp_sndrcvinfo *sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
-  memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
-  sinfo->sinfo_ppid = rand();
-  sinfo->sinfo_stream = stream;
-  sinfo->sinfo_context = 19;
-  sinfo->sinfo_flags = 0;
-  sinfo->sinfo_timetolive = time_to_live;
-
-  // And send.
-  const ssize_t size = sendmsg(fd_, &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
-  if (size == -1) {
-    if (errno != EPIPE && errno != EAGAIN && errno != ESHUTDOWN) {
-      PCHECK(size == static_cast<ssize_t>(data.size()));
-    } else {
-      return false;
-    }
-  } else {
-    CHECK_EQ(static_cast<ssize_t>(data.size()), size);
-  }
-
-  VLOG(1) << "Sent " << data.size();
-  return true;
+  VLOG(1) << "bind(" << fd() << ", " << Address(sockaddr_local_) << ")";
 }
 
 void SctpClient::LogSctpStatus(sctp_assoc_t assoc_id) {
diff --git a/aos/network/sctp_client.h b/aos/network/sctp_client.h
index bc3d1f6..9356612 100644
--- a/aos/network/sctp_client.h
+++ b/aos/network/sctp_client.h
@@ -18,18 +18,18 @@
   SctpClient(std::string_view remote_host, int remote_port, int streams,
              std::string_view local_host = "0.0.0.0", int local_port = 9971);
 
-  ~SctpClient() {
-    LOG(INFO) << "close(" << fd_ << ")";
-    PCHECK(close(fd_) == 0);
-  }
+  ~SctpClient() {}
 
   // Receives the next packet from the remote.
-  aos::unique_c_ptr<Message> Read();
+  aos::unique_c_ptr<Message> Read() { return sctp_.ReadMessage(); }
 
   // Sends a block of data on a stream with a TTL.
-  bool Send(int stream, std::string_view data, int time_to_live);
+  // TODO(austin): time_to_live should be a chrono::duration
+  bool Send(int stream, std::string_view data, int time_to_live) {
+    return sctp_.SendMessage(stream, data, time_to_live, sockaddr_remote_, 0);
+  }
 
-  int fd() { return fd_; }
+  int fd() { return sctp_.fd(); }
 
   // Enables the priority scheduler.  This is a SCTP feature which lets us
   // configure the priority per stream so that higher priority packets don't get
@@ -43,30 +43,12 @@
 
   void LogSctpStatus(sctp_assoc_t assoc_id);
 
-  void SetMaxSize(size_t max_size) {
-    max_size_ = max_size;
-    // Have the kernel give us a factor of 10 more.  This lets us have more than
-    // one full sized packet in flight.
-    max_size = max_size * 10;
-
-    CHECK_GE(ReadRMemMax(), max_size)
-        << "rmem_max is too low. To increase rmem_max temporarily, do sysctl "
-           "-w net.core.rmem_max=NEW_SIZE";
-    CHECK_GE(ReadWMemMax(), max_size)
-        << "wmem_max is too low. To increase wmem_max temporarily, do sysctl "
-           "-w net.core.wmem_max=NEW_SIZE";
-    PCHECK(setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &max_size,
-                      sizeof(max_size)) == 0);
-    PCHECK(setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &max_size,
-                      sizeof(max_size)) == 0);
-  }
+  void SetMaxSize(size_t max_size) { sctp_.SetMaxSize(max_size); }
 
  private:
   struct sockaddr_storage sockaddr_remote_;
   struct sockaddr_storage sockaddr_local_;
-  int fd_;
-
-  size_t max_size_ = 1000;
+  SctpReadWrite sctp_;
 };
 
 }  // namespace message_bridge
diff --git a/aos/network/sctp_lib.cc b/aos/network/sctp_lib.cc
index c5e2080..07bc86c 100644
--- a/aos/network/sctp_lib.cc
+++ b/aos/network/sctp_lib.cc
@@ -185,9 +185,93 @@
             << " sstat_primary.spinfo_rto:" << status.sstat_primary.spinfo_rto;
 }
 
-aos::unique_c_ptr<Message> ReadSctpMessage(int fd, size_t max_size) {
+void SctpReadWrite::OpenSocket(const struct sockaddr_storage &sockaddr_local) {
+  fd_ = socket(sockaddr_local.ss_family, SOCK_SEQPACKET, IPPROTO_SCTP);
+  PCHECK(fd_ != -1);
+  LOG(INFO) << "socket(" << Family(sockaddr_local)
+            << ", SOCK_SEQPACKET, IPPROTOSCTP) = " << fd_;
+  {
+    // Per https://tools.ietf.org/html/rfc6458
+    // Setting this to !0 allows event notifications to be interleaved
+    // with data if enabled, and would have to be handled in the code.
+    // Enabling interleaving would only matter during congestion, which
+    // typically only happens during application startup.
+    int interleaving = 0;
+    PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_FRAGMENT_INTERLEAVE,
+                      &interleaving, sizeof(interleaving)) == 0);
+  }
+  {
+    // Enable recvinfo when a packet arrives.
+    int on = 1;
+    PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_RECVRCVINFO, &on, sizeof(int)) ==
+           0);
+  }
+
+  DoSetMaxSize();
+}
+
+bool SctpReadWrite::SendMessage(
+    int stream, std::string_view data, int time_to_live,
+    std::optional<struct sockaddr_storage> sockaddr_remote,
+    sctp_assoc_t snd_assoc_id) {
+  CHECK(fd_ != -1);
+  struct iovec iov;
+  iov.iov_base = const_cast<char *>(data.data());
+  iov.iov_len = data.size();
+
+  // Use the assoc_id for the destination instead of the msg_name.
+  struct msghdr outmsg;
+  if (sockaddr_remote) {
+    outmsg.msg_name = &*sockaddr_remote;
+    outmsg.msg_namelen = sizeof(*sockaddr_remote);
+    VLOG(1) << "Sending to " << Address(*sockaddr_remote);
+  } else {
+    outmsg.msg_namelen = 0;
+  }
+
+  // Data to send.
+  outmsg.msg_iov = &iov;
+  outmsg.msg_iovlen = 1;
+
+  // Build up the sndinfo message.
+  char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
+  outmsg.msg_control = outcmsg;
+  outmsg.msg_controllen = sizeof(outcmsg);
+  outmsg.msg_flags = 0;
+
+  struct cmsghdr *cmsg = CMSG_FIRSTHDR(&outmsg);
+  cmsg->cmsg_level = IPPROTO_SCTP;
+  cmsg->cmsg_type = SCTP_SNDRCV;
+  cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
+
+  struct sctp_sndrcvinfo *sinfo =
+      reinterpret_cast<struct sctp_sndrcvinfo *>(CMSG_DATA(cmsg));
+  memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
+  sinfo->sinfo_ppid = ++send_ppid_;
+  sinfo->sinfo_stream = stream;
+  sinfo->sinfo_flags = 0;
+  sinfo->sinfo_assoc_id = snd_assoc_id;
+  sinfo->sinfo_timetolive = time_to_live;
+
+  // And send.
+  const ssize_t size = sendmsg(fd_, &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
+  if (size == -1) {
+    if (errno == EPIPE || errno == EAGAIN || errno == ESHUTDOWN ||
+        errno == EINTR) {
+      return false;
+    }
+    PLOG(FATAL) << "sendmsg on sctp socket failed";
+    return false;
+  }
+  CHECK_EQ(static_cast<ssize_t>(data.size()), size);
+  VLOG(1) << "Sent " << data.size();
+  return true;
+}
+
+aos::unique_c_ptr<Message> SctpReadWrite::ReadMessage() {
+  CHECK(fd_ != -1);
   aos::unique_c_ptr<Message> result(
-      reinterpret_cast<Message *>(malloc(sizeof(Message) + max_size + 1)));
+      reinterpret_cast<Message *>(malloc(sizeof(Message) + max_size_ + 1)));
   result->size = 0;
 
   int count = 0;
@@ -197,7 +281,7 @@
     memset(&inmessage, 0, sizeof(struct msghdr));
 
     struct iovec iov;
-    iov.iov_len = max_size + 1 - result->size;
+    iov.iov_len = max_size_ + 1 - result->size;
     iov.iov_base = result->mutable_data() + result->size;
 
     inmessage.msg_iov = &iov;
@@ -211,7 +295,7 @@
     inmessage.msg_name = &result->sin;
 
     ssize_t size;
-    PCHECK((size = recvmsg(fd, &inmessage, 0)) > 0);
+    PCHECK((size = recvmsg(fd_, &inmessage, 0)) > 0);
 
     if (count > 0) {
       VLOG(1) << "Count: " << count;
@@ -250,8 +334,9 @@
     CHECK_NE(last_flags & MSG_CTRUNC, MSG_CTRUNC)
         << ": Control message truncated.";
 
-    CHECK_LE(result->size, max_size) << ": Message overflowed buffer on stream "
-                                     << result->header.rcvinfo.rcv_sid << ".";
+    CHECK_LE(result->size, max_size_)
+        << ": Message overflowed buffer on stream "
+        << result->header.rcvinfo.rcv_sid << ".";
   }
 
   result->partial_deliveries = count - 1;
@@ -268,6 +353,34 @@
   return result;
 }
 
+void SctpReadWrite::CloseSocket() {
+  if (fd_ == -1) {
+    return;
+  }
+  LOG(INFO) << "close(" << fd_ << ")";
+  PCHECK(close(fd_) == 0);
+  fd_ = -1;
+}
+
+void SctpReadWrite::DoSetMaxSize() {
+  // Have the kernel give us a factor of 10 more.  This lets us have more than
+  // one full sized packet in flight.
+  size_t max_size = max_size_ * 10;
+
+  CHECK_GE(ReadRMemMax(), max_size)
+      << "rmem_max is too low. To increase rmem_max temporarily, do sysctl "
+         "-w net.core.rmem_max="
+      << max_size;
+  CHECK_GE(ReadWMemMax(), max_size)
+      << "wmem_max is too low. To increase wmem_max temporarily, do sysctl "
+         "-w net.core.wmem_max="
+      << max_size;
+  PCHECK(setsockopt(fd(), SOL_SOCKET, SO_RCVBUF, &max_size, sizeof(max_size)) ==
+         0);
+  PCHECK(setsockopt(fd(), SOL_SOCKET, SO_SNDBUF, &max_size, sizeof(max_size)) ==
+         0);
+}
+
 void Message::LogRcvInfo() const {
   LOG(INFO) << "\tSNDRCV (stream=" << header.rcvinfo.rcv_sid
             << " ssn=" << header.rcvinfo.rcv_ssn
diff --git a/aos/network/sctp_lib.h b/aos/network/sctp_lib.h
index 265d1f5..427f828 100644
--- a/aos/network/sctp_lib.h
+++ b/aos/network/sctp_lib.h
@@ -5,6 +5,7 @@
 #include <netinet/sctp.h>
 
 #include <memory>
+#include <optional>
 #include <string>
 #include <string_view>
 
@@ -71,8 +72,45 @@
 // Gets and logs the contents of the sctp_status message.
 void LogSctpStatus(int fd, sctp_assoc_t assoc_id);
 
-// Read and allocate a message.
-aos::unique_c_ptr<Message> ReadSctpMessage(int fd, size_t max_size);
+// Manages reading and writing SCTP messages.
+class SctpReadWrite {
+ public:
+  SctpReadWrite() = default;
+  ~SctpReadWrite() { CloseSocket(); }
+
+  // Opens a new socket.
+  void OpenSocket(const struct sockaddr_storage &sockaddr_local);
+
+  // Sends a message to the kernel.
+  // Returns true for success. Will not send a partial message on failure.
+  bool SendMessage(int stream, std::string_view data, int time_to_live,
+                   std::optional<struct sockaddr_storage> sockaddr_remote,
+                   sctp_assoc_t snd_assoc_id);
+
+  // Reads from the kernel until a complete message is received or it blocks.
+  // Returns nullptr if the kernel blocks before returning a complete message.
+  aos::unique_c_ptr<Message> ReadMessage();
+
+  int fd() const { return fd_; }
+
+  void SetMaxSize(size_t max_size) {
+    max_size_ = max_size;
+    if (fd_ != -1) {
+      DoSetMaxSize();
+    }
+  }
+
+ private:
+  void CloseSocket();
+  void DoSetMaxSize();
+
+  int fd_ = -1;
+
+  // We use this as a unique identifier that just increments for each message.
+  uint32_t send_ppid_ = 0;
+
+  size_t max_size_ = 1000;
+};
 
 // Returns the max network buffer available for reading for a socket.
 size_t ReadRMemMax();
diff --git a/aos/network/sctp_server.cc b/aos/network/sctp_server.cc
index 47a5719..327f5a0 100644
--- a/aos/network/sctp_server.cc
+++ b/aos/network/sctp_server.cc
@@ -23,10 +23,7 @@
 SctpServer::SctpServer(std::string_view local_host, int local_port)
     : sockaddr_local_(ResolveSocket(local_host, local_port)) {
   while (true) {
-    fd_ = socket(sockaddr_local_.ss_family, SOCK_SEQPACKET, IPPROTO_SCTP);
-    LOG(INFO) << "socket(" << Family(sockaddr_local_)
-              << ", SOCK_SEQPACKET, IPPROTOSCTP) = " << fd_;
-    PCHECK(fd_ != -1);
+    sctp_.OpenSocket(sockaddr_local_);
 
     {
       struct sctp_event_subscribe subscribe;
@@ -35,60 +32,42 @@
       subscribe.sctp_send_failure_event = 1;
       subscribe.sctp_partial_delivery_event = 1;
 
-      PCHECK(setsockopt(fd_, SOL_SCTP, SCTP_EVENTS, (char *)&subscribe,
+      PCHECK(setsockopt(fd(), SOL_SCTP, SCTP_EVENTS, (char *)&subscribe,
                         sizeof(subscribe)) == 0);
     }
     {
-      // Enable recvinfo when a packet arrives.
-      int on = 1;
-      PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_RECVRCVINFO, &on,
-                        sizeof(int)) == 0);
-    }
-    {
-      // Per https://tools.ietf.org/html/rfc6458
-      // Setting this to !0 allows event notifications to be interleaved
-      // with data if enabled, and would have to be handled in the code.
-      // Enabling interleaving would only matter during congestion, which
-      // typically only happens during application startup.
-      int interleaving = 0;
-      PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_FRAGMENT_INTERLEAVE,
-                        &interleaving, sizeof(interleaving)) == 0);
-    }
-    {
       // Turn off the NAGLE algorithm.
       int on = 1;
-      PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_NODELAY, &on, sizeof(int)) ==
+      PCHECK(setsockopt(fd(), IPPROTO_SCTP, SCTP_NODELAY, &on, sizeof(int)) ==
              0);
     }
 
     {
       int on = 1;
-      PCHECK(setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(int)) == 0);
+      LOG(INFO) << "setsockopt(" << fd()
+                << ", SOL_SOCKET, SO_REUSEADDR, &on, sizeof(int)";
+      PCHECK(setsockopt(fd(), SOL_SOCKET, SO_REUSEADDR, &on, sizeof(int)) == 0);
     }
 
     // And go!
-    if (bind(fd_, (struct sockaddr *)&sockaddr_local_,
+    if (bind(fd(), (struct sockaddr *)&sockaddr_local_,
              sockaddr_local_.ss_family == AF_INET6
                  ? sizeof(struct sockaddr_in6)
                  : sizeof(struct sockaddr_in)) != 0) {
       PLOG(ERROR) << "Failed to bind, retrying";
-      close(fd_);
+      close(fd());
       std::this_thread::sleep_for(std::chrono::seconds(5));
       continue;
     }
-    LOG(INFO) << "bind(" << fd_ << ", " << Address(sockaddr_local_) << ")";
+    LOG(INFO) << "bind(" << fd() << ", " << Address(sockaddr_local_) << ")";
 
-    PCHECK(listen(fd_, 100) == 0);
+    PCHECK(listen(fd(), 100) == 0);
 
     SetMaxSize(1000);
     break;
   }
 }
 
-aos::unique_c_ptr<Message> SctpServer::Read() {
-  return ReadSctpMessage(fd_, max_size_);
-}
-
 bool SctpServer::Abort(sctp_assoc_t snd_assoc_id) {
   // Use the assoc_id for the destination instead of the msg_name.
   struct msghdr outmsg;
@@ -109,13 +88,12 @@
 
   struct sctp_sndrcvinfo *sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
   memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
-  sinfo->sinfo_ppid = ++ppid_;
   sinfo->sinfo_stream = 0;
   sinfo->sinfo_flags = SCTP_ABORT;
   sinfo->sinfo_assoc_id = snd_assoc_id;
 
   // And send.
-  const ssize_t size = sendmsg(fd_, &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
+  const ssize_t size = sendmsg(fd(), &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
   if (size == -1) {
     if (errno == EPIPE || errno == EAGAIN || errno == ESHUTDOWN) {
       return false;
@@ -127,53 +105,6 @@
   }
 }
 
-bool SctpServer::Send(std::string_view data, sctp_assoc_t snd_assoc_id,
-                      int stream, int timetolive) {
-  struct iovec iov;
-  iov.iov_base = const_cast<char *>(data.data());
-  iov.iov_len = data.size();
-
-  // Use the assoc_id for the destination instead of the msg_name.
-  struct msghdr outmsg;
-  outmsg.msg_namelen = 0;
-
-  // Data to send.
-  outmsg.msg_iov = &iov;
-  outmsg.msg_iovlen = 1;
-
-  // Build up the sndinfo message.
-  char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
-  outmsg.msg_control = outcmsg;
-  outmsg.msg_controllen = CMSG_SPACE(sizeof(struct sctp_sndrcvinfo));
-  outmsg.msg_flags = 0;
-
-  struct cmsghdr *cmsg = CMSG_FIRSTHDR(&outmsg);
-  cmsg->cmsg_level = IPPROTO_SCTP;
-  cmsg->cmsg_type = SCTP_SNDRCV;
-  cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
-
-  struct sctp_sndrcvinfo *sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
-  memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
-  sinfo->sinfo_ppid = ++ppid_;
-  sinfo->sinfo_stream = stream;
-  sinfo->sinfo_flags = 0;
-  sinfo->sinfo_assoc_id = snd_assoc_id;
-  sinfo->sinfo_timetolive = timetolive;
-
-  // And send.
-  const ssize_t size = sendmsg(fd_, &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
-  if (size == -1) {
-    if (errno == EPIPE || errno == EAGAIN || errno == ESHUTDOWN) {
-      return false;
-    }
-    PCHECK(size == static_cast<ssize_t>(data.size()));
-    return false;
-  } else {
-    CHECK_EQ(static_cast<ssize_t>(data.size()), size);
-    return true;
-  }
-}
-
 void SctpServer::SetPriorityScheduler(sctp_assoc_t assoc_id) {
   struct sctp_assoc_value scheduler;
   memset(&scheduler, 0, sizeof(scheduler));
diff --git a/aos/network/sctp_server.h b/aos/network/sctp_server.h
index d995c55..c6737d6 100644
--- a/aos/network/sctp_server.h
+++ b/aos/network/sctp_server.h
@@ -24,23 +24,23 @@
  public:
   SctpServer(std::string_view local_host = "0.0.0.0", int local_port = 9971);
 
-  ~SctpServer() {
-    LOG(INFO) << "close(" << fd_ << ")";
-    PCHECK(close(fd_) == 0);
-  }
+  ~SctpServer() {}
 
   // Receives the next packet from the remote.
-  aos::unique_c_ptr<Message> Read();
+  aos::unique_c_ptr<Message> Read() { return sctp_.ReadMessage(); }
 
   // Sends a block of data to a client on a stream with a TTL.  Returns true on
   // success.
   bool Send(std::string_view data, sctp_assoc_t snd_assoc_id, int stream,
-            int timetolive);
+            int time_to_live) {
+    return sctp_.SendMessage(stream, data, time_to_live, std::nullopt,
+                             snd_assoc_id);
+  }
 
   // Aborts a connection.  Returns true on success.
   bool Abort(sctp_assoc_t snd_assoc_id);
 
-  int fd() { return fd_; }
+  int fd() { return sctp_.fd(); }
 
   // Enables the priority scheduler.  This is a SCTP feature which lets us
   // configure the priority per stream so that higher priority packets don't get
@@ -51,27 +51,11 @@
   void SetStreamPriority(sctp_assoc_t assoc_id, int stream_id,
                          uint16_t priority);
 
-  void SetMaxSize(size_t max_size) {
-    max_size_ = max_size;
-    // Have the kernel give us a factor of 10 more.  This lets us have more than
-    // one full sized packet in flight.
-    max_size = max_size * 10;
-
-    CHECK_GE(ReadRMemMax(), max_size);
-    CHECK_GE(ReadWMemMax(), max_size);
-    PCHECK(setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &max_size,
-                      sizeof(max_size)) == 0);
-    PCHECK(setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &max_size,
-                      sizeof(max_size)) == 0);
-  }
+  void SetMaxSize(size_t max_size) { sctp_.SetMaxSize(max_size); }
 
  private:
   struct sockaddr_storage sockaddr_local_;
-  int fd_;
-
-  size_t max_size_ = 1000;
-
-  int ppid_ = 1;
+  SctpReadWrite sctp_;
 };
 
 }  // namespace message_bridge