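Move ReadSctpMessage into the SctpReadWrite class
This is in preparation for reassembling partial messages in userspace.
The socket setup, send path, and buffer sizing that SctpClient and SctpServer each duplicated move into the class as well.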
Change-Id: Ifa530698058ea775362eee4ec1bf9e6e0d3dd5de
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/network/sctp_client.cc b/aos/network/sctp_client.cc
index 9e77a84..113b525 100644
--- a/aos/network/sctp_client.cc
+++ b/aos/network/sctp_client.cc
@@ -19,42 +19,23 @@
SctpClient::SctpClient(std::string_view remote_host, int remote_port,
int streams, std::string_view local_host, int local_port)
: sockaddr_remote_(ResolveSocket(remote_host, remote_port)),
- sockaddr_local_(ResolveSocket(local_host, local_port)),
- fd_(socket(sockaddr_local_.ss_family, SOCK_SEQPACKET, IPPROTO_SCTP)) {
- LOG(INFO) << "socket(" << Family(sockaddr_local_)
- << ", SOCK_SEQPACKET, IPPROTOSCTP) = " << fd_;
- PCHECK(fd_ != -1);
-
- {
- // Per https://tools.ietf.org/html/rfc6458
- // Setting this to !0 allows event notifications to be interleaved
- // with data if enabled, and would have to be handled in the code.
- // Enabling interleaving would only matter during congestion, which
- // typically only happens during application startup.
- int interleaving = 0;
- PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_FRAGMENT_INTERLEAVE,
- &interleaving, sizeof(interleaving)) == 0);
- }
+ sockaddr_local_(ResolveSocket(local_host, local_port)) {
+ sctp_.OpenSocket(sockaddr_local_);
{
struct sctp_initmsg initmsg;
memset(&initmsg, 0, sizeof(struct sctp_initmsg));
initmsg.sinit_num_ostreams = streams;
initmsg.sinit_max_instreams = streams;
- PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
+ PCHECK(setsockopt(fd(), IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
sizeof(struct sctp_initmsg)) == 0);
}
{
- int on = 1;
- PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_RECVRCVINFO, &on, sizeof(int)) ==
- 0);
- }
- {
// Servers send promptly. Clients don't.
// TODO(austin): Revisit this assumption when we have time sync.
int on = 0;
- PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_NODELAY, &on, sizeof(int)) == 0);
+ PCHECK(setsockopt(fd(), IPPROTO_SCTP, SCTP_NODELAY, &on, sizeof(int)) == 0);
}
{
@@ -64,70 +45,15 @@
memset(&subscribe, 0, sizeof(subscribe));
subscribe.sctp_association_event = 1;
subscribe.sctp_stream_change_event = 1;
- PCHECK(setsockopt(fd_, SOL_SCTP, SCTP_EVENTS, (char *)&subscribe,
+ PCHECK(setsockopt(fd(), SOL_SCTP, SCTP_EVENTS, (char *)&subscribe,
sizeof(subscribe)) == 0);
}
- PCHECK(bind(fd_, (struct sockaddr *)&sockaddr_local_,
+ PCHECK(bind(fd(), (struct sockaddr *)&sockaddr_local_,
sockaddr_local_.ss_family == AF_INET6
? sizeof(struct sockaddr_in6)
: sizeof(struct sockaddr_in)) == 0);
- VLOG(1) << "bind(" << fd_ << ", " << Address(sockaddr_local_) << ")";
-}
-
-aos::unique_c_ptr<Message> SctpClient::Read() {
- return ReadSctpMessage(fd_, max_size_);
-}
-
-bool SctpClient::Send(int stream, std::string_view data, int time_to_live) {
- struct iovec iov;
- iov.iov_base = const_cast<char *>(data.data());
- iov.iov_len = data.size();
-
- struct msghdr outmsg;
- // Target to send to.
- outmsg.msg_name = &sockaddr_remote_;
- outmsg.msg_namelen = sizeof(struct sockaddr_storage);
- VLOG(1) << "Sending to " << Address(sockaddr_remote_);
-
- // Data to send.
- outmsg.msg_iov = &iov;
- outmsg.msg_iovlen = 1;
-
- // Build up the sndinfo message.
- char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
- outmsg.msg_control = outcmsg;
- outmsg.msg_controllen = sizeof(outcmsg);
- outmsg.msg_flags = 0;
-
- struct cmsghdr *cmsg = CMSG_FIRSTHDR(&outmsg);
- cmsg->cmsg_level = IPPROTO_SCTP;
- cmsg->cmsg_type = SCTP_SNDRCV;
- cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
-
- outmsg.msg_controllen = cmsg->cmsg_len;
- struct sctp_sndrcvinfo *sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
- memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
- sinfo->sinfo_ppid = rand();
- sinfo->sinfo_stream = stream;
- sinfo->sinfo_context = 19;
- sinfo->sinfo_flags = 0;
- sinfo->sinfo_timetolive = time_to_live;
-
- // And send.
- const ssize_t size = sendmsg(fd_, &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
- if (size == -1) {
- if (errno != EPIPE && errno != EAGAIN && errno != ESHUTDOWN) {
- PCHECK(size == static_cast<ssize_t>(data.size()));
- } else {
- return false;
- }
- } else {
- CHECK_EQ(static_cast<ssize_t>(data.size()), size);
- }
-
- VLOG(1) << "Sent " << data.size();
- return true;
+ VLOG(1) << "bind(" << fd() << ", " << Address(sockaddr_local_) << ")";
}
void SctpClient::LogSctpStatus(sctp_assoc_t assoc_id) {
diff --git a/aos/network/sctp_client.h b/aos/network/sctp_client.h
index bc3d1f6..9356612 100644
--- a/aos/network/sctp_client.h
+++ b/aos/network/sctp_client.h
@@ -18,18 +18,18 @@
SctpClient(std::string_view remote_host, int remote_port, int streams,
std::string_view local_host = "0.0.0.0", int local_port = 9971);
- ~SctpClient() {
- LOG(INFO) << "close(" << fd_ << ")";
- PCHECK(close(fd_) == 0);
- }
+ ~SctpClient() {}
// Receives the next packet from the remote.
- aos::unique_c_ptr<Message> Read();
+ aos::unique_c_ptr<Message> Read() { return sctp_.ReadMessage(); }
// Sends a block of data on a stream with a TTL.
- bool Send(int stream, std::string_view data, int time_to_live);
+ // TODO(austin): time_to_live should be a chrono::duration
+ bool Send(int stream, std::string_view data, int time_to_live) {
+ return sctp_.SendMessage(stream, data, time_to_live, sockaddr_remote_, 0);
+ }
- int fd() { return fd_; }
+ int fd() { return sctp_.fd(); }
// Enables the priority scheduler. This is a SCTP feature which lets us
// configure the priority per stream so that higher priority packets don't get
@@ -43,30 +43,12 @@
void LogSctpStatus(sctp_assoc_t assoc_id);
- void SetMaxSize(size_t max_size) {
- max_size_ = max_size;
- // Have the kernel give us a factor of 10 more. This lets us have more than
- // one full sized packet in flight.
- max_size = max_size * 10;
-
- CHECK_GE(ReadRMemMax(), max_size)
- << "rmem_max is too low. To increase rmem_max temporarily, do sysctl "
- "-w net.core.rmem_max=NEW_SIZE";
- CHECK_GE(ReadWMemMax(), max_size)
- << "wmem_max is too low. To increase wmem_max temporarily, do sysctl "
- "-w net.core.wmem_max=NEW_SIZE";
- PCHECK(setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &max_size,
- sizeof(max_size)) == 0);
- PCHECK(setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &max_size,
- sizeof(max_size)) == 0);
- }
+ void SetMaxSize(size_t max_size) { sctp_.SetMaxSize(max_size); }
private:
struct sockaddr_storage sockaddr_remote_;
struct sockaddr_storage sockaddr_local_;
- int fd_;
-
- size_t max_size_ = 1000;
+ SctpReadWrite sctp_;
};
} // namespace message_bridge
diff --git a/aos/network/sctp_lib.cc b/aos/network/sctp_lib.cc
index c5e2080..07bc86c 100644
--- a/aos/network/sctp_lib.cc
+++ b/aos/network/sctp_lib.cc
@@ -185,9 +185,93 @@
<< " sstat_primary.spinfo_rto:" << status.sstat_primary.spinfo_rto;
}
-aos::unique_c_ptr<Message> ReadSctpMessage(int fd, size_t max_size) {
+void SctpReadWrite::OpenSocket(const struct sockaddr_storage &sockaddr_local) {
+ fd_ = socket(sockaddr_local.ss_family, SOCK_SEQPACKET, IPPROTO_SCTP);
+ PCHECK(fd_ != -1);
+ LOG(INFO) << "socket(" << Family(sockaddr_local)
+ << ", SOCK_SEQPACKET, IPPROTOSCTP) = " << fd_;
+ {
+ // Per https://tools.ietf.org/html/rfc6458
+ // Setting this to !0 allows event notifications to be interleaved
+ // with data if enabled, and would have to be handled in the code.
+ // Enabling interleaving would only matter during congestion, which
+ // typically only happens during application startup.
+ int interleaving = 0;
+ PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_FRAGMENT_INTERLEAVE,
+ &interleaving, sizeof(interleaving)) == 0);
+ }
+ {
+ // Enable recvinfo when a packet arrives.
+ int on = 1;
+ PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_RECVRCVINFO, &on, sizeof(int)) ==
+ 0);
+ }
+
+ DoSetMaxSize();
+}
+
+bool SctpReadWrite::SendMessage(
+ int stream, std::string_view data, int time_to_live,
+ std::optional<struct sockaddr_storage> sockaddr_remote,
+ sctp_assoc_t snd_assoc_id) {
+ CHECK(fd_ != -1);
+ struct iovec iov;
+ iov.iov_base = const_cast<char *>(data.data());
+ iov.iov_len = data.size();
+
+ // Target msg_name when a remote is given; otherwise sinfo_assoc_id is used.
+ struct msghdr outmsg;
+ if (sockaddr_remote) {
+ outmsg.msg_name = &*sockaddr_remote;
+ outmsg.msg_namelen = sizeof(*sockaddr_remote);
+ VLOG(1) << "Sending to " << Address(*sockaddr_remote);
+ } else {
+ outmsg.msg_namelen = 0;
+ }
+
+ // Data to send.
+ outmsg.msg_iov = &iov;
+ outmsg.msg_iovlen = 1;
+
+ // Build up the sndinfo message.
+ char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
+ outmsg.msg_control = outcmsg;
+ outmsg.msg_controllen = sizeof(outcmsg);
+ outmsg.msg_flags = 0;
+
+ struct cmsghdr *cmsg = CMSG_FIRSTHDR(&outmsg);
+ cmsg->cmsg_level = IPPROTO_SCTP;
+ cmsg->cmsg_type = SCTP_SNDRCV;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
+
+ struct sctp_sndrcvinfo *sinfo =
+ reinterpret_cast<struct sctp_sndrcvinfo *>(CMSG_DATA(cmsg));
+ memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
+ sinfo->sinfo_ppid = ++send_ppid_;
+ sinfo->sinfo_stream = stream;
+ sinfo->sinfo_flags = 0;
+ sinfo->sinfo_assoc_id = snd_assoc_id;
+ sinfo->sinfo_timetolive = time_to_live;
+
+ // And send.
+ const ssize_t size = sendmsg(fd_, &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
+ if (size == -1) {
+ if (errno == EPIPE || errno == EAGAIN || errno == ESHUTDOWN ||
+ errno == EINTR) {
+ return false;
+ }
+ PLOG(FATAL) << "sendmsg on sctp socket failed";
+ return false;
+ }
+ CHECK_EQ(static_cast<ssize_t>(data.size()), size);
+ VLOG(1) << "Sent " << data.size();
+ return true;
+}
+
+aos::unique_c_ptr<Message> SctpReadWrite::ReadMessage() {
+ CHECK(fd_ != -1);
aos::unique_c_ptr<Message> result(
- reinterpret_cast<Message *>(malloc(sizeof(Message) + max_size + 1)));
+ reinterpret_cast<Message *>(malloc(sizeof(Message) + max_size_ + 1)));
result->size = 0;
int count = 0;
@@ -197,7 +281,7 @@
memset(&inmessage, 0, sizeof(struct msghdr));
struct iovec iov;
- iov.iov_len = max_size + 1 - result->size;
+ iov.iov_len = max_size_ + 1 - result->size;
iov.iov_base = result->mutable_data() + result->size;
inmessage.msg_iov = &iov;
@@ -211,7 +295,7 @@
inmessage.msg_name = &result->sin;
ssize_t size;
- PCHECK((size = recvmsg(fd, &inmessage, 0)) > 0);
+ PCHECK((size = recvmsg(fd_, &inmessage, 0)) > 0);
if (count > 0) {
VLOG(1) << "Count: " << count;
@@ -250,8 +334,9 @@
CHECK_NE(last_flags & MSG_CTRUNC, MSG_CTRUNC)
<< ": Control message truncated.";
- CHECK_LE(result->size, max_size) << ": Message overflowed buffer on stream "
- << result->header.rcvinfo.rcv_sid << ".";
+ CHECK_LE(result->size, max_size_)
+ << ": Message overflowed buffer on stream "
+ << result->header.rcvinfo.rcv_sid << ".";
}
result->partial_deliveries = count - 1;
@@ -268,6 +353,34 @@
return result;
}
+void SctpReadWrite::CloseSocket() {
+ if (fd_ == -1) {
+ return;
+ }
+ LOG(INFO) << "close(" << fd_ << ")";
+ PCHECK(close(fd_) == 0);
+ fd_ = -1;
+}
+
+void SctpReadWrite::DoSetMaxSize() {
+ // Have the kernel give us a factor of 10 more. This lets us have more than
+ // one full sized packet in flight.
+ size_t max_size = max_size_ * 10;
+
+ CHECK_GE(ReadRMemMax(), max_size)
+ << "rmem_max is too low. To increase rmem_max temporarily, do sysctl "
+ "-w net.core.rmem_max="
+ << max_size;
+ CHECK_GE(ReadWMemMax(), max_size)
+ << "wmem_max is too low. To increase wmem_max temporarily, do sysctl "
+ "-w net.core.wmem_max="
+ << max_size;
+ PCHECK(setsockopt(fd(), SOL_SOCKET, SO_RCVBUF, &max_size, sizeof(max_size)) ==
+ 0);
+ PCHECK(setsockopt(fd(), SOL_SOCKET, SO_SNDBUF, &max_size, sizeof(max_size)) ==
+ 0);
+}
+
void Message::LogRcvInfo() const {
LOG(INFO) << "\tSNDRCV (stream=" << header.rcvinfo.rcv_sid
<< " ssn=" << header.rcvinfo.rcv_ssn
diff --git a/aos/network/sctp_lib.h b/aos/network/sctp_lib.h
index 265d1f5..427f828 100644
--- a/aos/network/sctp_lib.h
+++ b/aos/network/sctp_lib.h
@@ -5,6 +5,7 @@
#include <netinet/sctp.h>
#include <memory>
+#include <optional>
#include <string>
#include <string_view>
@@ -71,8 +72,45 @@
// Gets and logs the contents of the sctp_status message.
void LogSctpStatus(int fd, sctp_assoc_t assoc_id);
-// Read and allocate a message.
-aos::unique_c_ptr<Message> ReadSctpMessage(int fd, size_t max_size);
+// Manages reading and writing SCTP messages.
+class SctpReadWrite {
+ public:
+ SctpReadWrite() = default;
+ ~SctpReadWrite() { CloseSocket(); }
+
+ // Opens a new socket.
+ void OpenSocket(const struct sockaddr_storage &sockaddr_local);
+
+ // Sends a message to the kernel.
+ // Returns true for success. Will not send a partial message on failure.
+ bool SendMessage(int stream, std::string_view data, int time_to_live,
+ std::optional<struct sockaddr_storage> sockaddr_remote,
+ sctp_assoc_t snd_assoc_id);
+
+ // Reads from the kernel until a complete message is received or it blocks.
+ // Returns nullptr if the kernel blocks before returning a complete message.
+ aos::unique_c_ptr<Message> ReadMessage();
+
+ int fd() const { return fd_; }
+
+ void SetMaxSize(size_t max_size) {
+ max_size_ = max_size;
+ if (fd_ != -1) {
+ DoSetMaxSize();
+ }
+ }
+
+ private:
+ void CloseSocket();
+ void DoSetMaxSize();
+
+ int fd_ = -1;
+
+ // We use this as a unique identifier that just increments for each message.
+ uint32_t send_ppid_ = 0;
+
+ size_t max_size_ = 1000;
+};
// Returns the max network buffer available for reading for a socket.
size_t ReadRMemMax();
diff --git a/aos/network/sctp_server.cc b/aos/network/sctp_server.cc
index 47a5719..327f5a0 100644
--- a/aos/network/sctp_server.cc
+++ b/aos/network/sctp_server.cc
@@ -23,10 +23,7 @@
SctpServer::SctpServer(std::string_view local_host, int local_port)
: sockaddr_local_(ResolveSocket(local_host, local_port)) {
while (true) {
- fd_ = socket(sockaddr_local_.ss_family, SOCK_SEQPACKET, IPPROTO_SCTP);
- LOG(INFO) << "socket(" << Family(sockaddr_local_)
- << ", SOCK_SEQPACKET, IPPROTOSCTP) = " << fd_;
- PCHECK(fd_ != -1);
+ sctp_.OpenSocket(sockaddr_local_);
{
struct sctp_event_subscribe subscribe;
@@ -35,60 +32,42 @@
subscribe.sctp_send_failure_event = 1;
subscribe.sctp_partial_delivery_event = 1;
- PCHECK(setsockopt(fd_, SOL_SCTP, SCTP_EVENTS, (char *)&subscribe,
+ PCHECK(setsockopt(fd(), SOL_SCTP, SCTP_EVENTS, (char *)&subscribe,
sizeof(subscribe)) == 0);
}
{
- // Enable recvinfo when a packet arrives.
- int on = 1;
- PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_RECVRCVINFO, &on,
- sizeof(int)) == 0);
- }
- {
- // Per https://tools.ietf.org/html/rfc6458
- // Setting this to !0 allows event notifications to be interleaved
- // with data if enabled, and would have to be handled in the code.
- // Enabling interleaving would only matter during congestion, which
- // typically only happens during application startup.
- int interleaving = 0;
- PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_FRAGMENT_INTERLEAVE,
- &interleaving, sizeof(interleaving)) == 0);
- }
- {
// Turn off the NAGLE algorithm.
int on = 1;
- PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_NODELAY, &on, sizeof(int)) ==
+ PCHECK(setsockopt(fd(), IPPROTO_SCTP, SCTP_NODELAY, &on, sizeof(int)) ==
0);
}
{
int on = 1;
- PCHECK(setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(int)) == 0);
+ LOG(INFO) << "setsockopt(" << fd()
+ << ", SOL_SOCKET, SO_REUSEADDR, &on, sizeof(int)";
+ PCHECK(setsockopt(fd(), SOL_SOCKET, SO_REUSEADDR, &on, sizeof(int)) == 0);
}
// And go!
- if (bind(fd_, (struct sockaddr *)&sockaddr_local_,
+ if (bind(fd(), (struct sockaddr *)&sockaddr_local_,
sockaddr_local_.ss_family == AF_INET6
? sizeof(struct sockaddr_in6)
: sizeof(struct sockaddr_in)) != 0) {
PLOG(ERROR) << "Failed to bind, retrying";
- close(fd_);
+ close(fd());
std::this_thread::sleep_for(std::chrono::seconds(5));
continue;
}
- LOG(INFO) << "bind(" << fd_ << ", " << Address(sockaddr_local_) << ")";
+ LOG(INFO) << "bind(" << fd() << ", " << Address(sockaddr_local_) << ")";
- PCHECK(listen(fd_, 100) == 0);
+ PCHECK(listen(fd(), 100) == 0);
SetMaxSize(1000);
break;
}
}
-aos::unique_c_ptr<Message> SctpServer::Read() {
- return ReadSctpMessage(fd_, max_size_);
-}
-
bool SctpServer::Abort(sctp_assoc_t snd_assoc_id) {
// Use the assoc_id for the destination instead of the msg_name.
struct msghdr outmsg;
@@ -109,13 +88,12 @@
struct sctp_sndrcvinfo *sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
- sinfo->sinfo_ppid = ++ppid_;
sinfo->sinfo_stream = 0;
sinfo->sinfo_flags = SCTP_ABORT;
sinfo->sinfo_assoc_id = snd_assoc_id;
// And send.
- const ssize_t size = sendmsg(fd_, &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
+ const ssize_t size = sendmsg(fd(), &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
if (size == -1) {
if (errno == EPIPE || errno == EAGAIN || errno == ESHUTDOWN) {
return false;
@@ -127,53 +105,6 @@
}
}
-bool SctpServer::Send(std::string_view data, sctp_assoc_t snd_assoc_id,
- int stream, int timetolive) {
- struct iovec iov;
- iov.iov_base = const_cast<char *>(data.data());
- iov.iov_len = data.size();
-
- // Use the assoc_id for the destination instead of the msg_name.
- struct msghdr outmsg;
- outmsg.msg_namelen = 0;
-
- // Data to send.
- outmsg.msg_iov = &iov;
- outmsg.msg_iovlen = 1;
-
- // Build up the sndinfo message.
- char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
- outmsg.msg_control = outcmsg;
- outmsg.msg_controllen = CMSG_SPACE(sizeof(struct sctp_sndrcvinfo));
- outmsg.msg_flags = 0;
-
- struct cmsghdr *cmsg = CMSG_FIRSTHDR(&outmsg);
- cmsg->cmsg_level = IPPROTO_SCTP;
- cmsg->cmsg_type = SCTP_SNDRCV;
- cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
-
- struct sctp_sndrcvinfo *sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
- memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
- sinfo->sinfo_ppid = ++ppid_;
- sinfo->sinfo_stream = stream;
- sinfo->sinfo_flags = 0;
- sinfo->sinfo_assoc_id = snd_assoc_id;
- sinfo->sinfo_timetolive = timetolive;
-
- // And send.
- const ssize_t size = sendmsg(fd_, &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
- if (size == -1) {
- if (errno == EPIPE || errno == EAGAIN || errno == ESHUTDOWN) {
- return false;
- }
- PCHECK(size == static_cast<ssize_t>(data.size()));
- return false;
- } else {
- CHECK_EQ(static_cast<ssize_t>(data.size()), size);
- return true;
- }
-}
-
void SctpServer::SetPriorityScheduler(sctp_assoc_t assoc_id) {
struct sctp_assoc_value scheduler;
memset(&scheduler, 0, sizeof(scheduler));
diff --git a/aos/network/sctp_server.h b/aos/network/sctp_server.h
index d995c55..c6737d6 100644
--- a/aos/network/sctp_server.h
+++ b/aos/network/sctp_server.h
@@ -24,23 +24,23 @@
public:
SctpServer(std::string_view local_host = "0.0.0.0", int local_port = 9971);
- ~SctpServer() {
- LOG(INFO) << "close(" << fd_ << ")";
- PCHECK(close(fd_) == 0);
- }
+ ~SctpServer() {}
// Receives the next packet from the remote.
- aos::unique_c_ptr<Message> Read();
+ aos::unique_c_ptr<Message> Read() { return sctp_.ReadMessage(); }
// Sends a block of data to a client on a stream with a TTL. Returns true on
// success.
bool Send(std::string_view data, sctp_assoc_t snd_assoc_id, int stream,
- int timetolive);
+ int time_to_live) {
+ return sctp_.SendMessage(stream, data, time_to_live, std::nullopt,
+ snd_assoc_id);
+ }
// Aborts a connection. Returns true on success.
bool Abort(sctp_assoc_t snd_assoc_id);
- int fd() { return fd_; }
+ int fd() { return sctp_.fd(); }
// Enables the priority scheduler. This is a SCTP feature which lets us
// configure the priority per stream so that higher priority packets don't get
@@ -51,27 +51,11 @@
void SetStreamPriority(sctp_assoc_t assoc_id, int stream_id,
uint16_t priority);
- void SetMaxSize(size_t max_size) {
- max_size_ = max_size;
- // Have the kernel give us a factor of 10 more. This lets us have more than
- // one full sized packet in flight.
- max_size = max_size * 10;
-
- CHECK_GE(ReadRMemMax(), max_size);
- CHECK_GE(ReadWMemMax(), max_size);
- PCHECK(setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &max_size,
- sizeof(max_size)) == 0);
- PCHECK(setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &max_size,
- sizeof(max_size)) == 0);
- }
+ void SetMaxSize(size_t max_size) { sctp_.SetMaxSize(max_size); }
private:
struct sockaddr_storage sockaddr_local_;
- int fd_;
-
- size_t max_size_ = 1000;
-
- int ppid_ = 1;
+ SctpReadWrite sctp_;
};
} // namespace message_bridge