// blob: 77cad02930bf29a11c89d79313e9163dee657c4a [file] [log] [blame]
#include "aos/network/sctp_lib.h"

#include <arpa/inet.h>
#include <linux/sctp.h>
#include <net/if.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <algorithm>
#include <cerrno>
#include <cstdlib>
#include <cstring>
#include <string>
#include <string_view>
#include <vector>

#include "aos/util/file.h"
16
Austin Schuh0a0a8272021-12-08 13:19:32 -080017DEFINE_string(interface, "", "network interface");
Brian Silverman0c6d44e2021-11-10 12:27:49 -080018DEFINE_bool(disable_ipv6, false, "disable ipv6");
Austin Schuh9dd8f592021-12-25 14:32:43 -080019DEFINE_int32(rmem, 0, "If nonzero, set rmem to this size.");
Austin Schuhe84c3ed2019-12-14 15:29:48 -080020
21namespace aos {
22namespace message_bridge {
23
24namespace {
// Printable names for SCTP association states; indexed by the sac_state field
// of an SCTP_ASSOC_CHANGE notification (see PrintNotification()).
const char *const sac_state_tbl[] = {
    "COMMUNICATION_UP",
    "COMMUNICATION_LOST",
    "RESTART",
    "SHUTDOWN_COMPLETE",
    "CANT_START_ASSOCIATION",
};
Austin Schuhe84c3ed2019-12-14 15:29:48 -080028
29typedef union {
30 struct sctp_initmsg init;
31 struct sctp_sndrcvinfo sndrcvinfo;
32} _sctp_cmsg_data_t;
33
// Returns true if SCTP authentication is available and enabled.
//
// "Available" means the build defines HAS_SCTP_AUTH; "enabled" means the
// sysctl file /proc/sys/net/sctp/auth_enable exists and contains 1. Dies if
// the file holds anything other than 0 or 1.
bool SctpAuthIsEnabled() {
#if HAS_SCTP_AUTH
  static constexpr char kAuthEnablePath[] = "/proc/sys/net/sctp/auth_enable";

  struct stat current_stat;
  if (stat(kAuthEnablePath, &current_stat) == -1) {
    // No sysctl entry to consult, so treat authentication as off.
    LOG(WARNING) << kAuthEnablePath << " doesn't exist.";
    return false;
  }

  const int value = std::stoi(util::ReadFileToStringOrDie(kAuthEnablePath));
  CHECK(value == 0 || value == 1)
      << "Unknown auth enable sysctl value: " << value;
  return value == 1;
#else
  return false;
#endif
}
52
Austin Schuhe84c3ed2019-12-14 15:29:48 -080053} // namespace
54
Austin Schuh0a0a8272021-12-08 13:19:32 -080055bool Ipv6Enabled() {
56 if (FLAGS_disable_ipv6) {
57 return false;
58 }
59 int fd = socket(AF_INET6, SOCK_SEQPACKET, IPPROTO_SCTP);
60 if (fd != -1) {
61 close(fd);
62 return true;
63 }
64 switch (errno) {
65 case EAFNOSUPPORT:
66 case EINVAL:
67 case EPROTONOSUPPORT:
68 PLOG(INFO) << "no ipv6";
69 return false;
70 default:
71 PLOG(FATAL) << "Open socket failed";
72 return false;
73 };
74}
75
// Resolves `host` and `port` into a socket address for an SCTP
// (SOCK_SEQPACKET) socket.
//
// Args:
//   host: Name or address to resolve. When empty, getaddrinfo() is called
//     with a null node together with AI_PASSIVE.
//   port: Port number; stored in the result in network byte order.
//   use_ipv6: When true resolve as AF_INET6, otherwise as AF_INET.
//
// For AF_INET6 results, a non-empty --interface is converted to an interface
// index and stored as the scope id.
//
// Returns the resolved address (zero-filled beyond the copied address). Dies
// if getaddrinfo() fails.
struct sockaddr_storage ResolveSocket(std::string_view host, int port,
                                      bool use_ipv6) {
  struct sockaddr_storage result;
  memset(&result, 0, sizeof(result));
  struct sockaddr_in *const t_addr =
      reinterpret_cast<struct sockaddr_in *>(&result);
  struct sockaddr_in6 *const t_addr6 =
      reinterpret_cast<struct sockaddr_in6 *>(&result);

  struct addrinfo hints;
  memset(&hints, 0, sizeof(hints));
  // Default to IPv6 as the clearly superior protocol, since it also handles
  // IPv4.
  hints.ai_family = use_ipv6 ? AF_INET6 : AF_INET;
  hints.ai_socktype = SOCK_SEQPACKET;
  hints.ai_protocol = IPPROTO_SCTP;
  // We deliberately avoid AI_ADDRCONFIG here because it breaks running things
  // inside Bazel's test sandbox, which has no non-localhost IPv4 or IPv6
  // addresses. Also, it's not really helpful, because most systems will have
  // link-local addresses of both types with any interface that's up.
  hints.ai_flags = AI_PASSIVE | AI_V4MAPPED | AI_NUMERICSERV;

  // Keep the strings alive in named locals for the duration of the call.
  const std::string node(host);
  const std::string service = std::to_string(port);
  struct addrinfo *addrinfo_result = nullptr;
  const int ret = getaddrinfo(host.empty() ? nullptr : node.c_str(),
                              service.c_str(), &hints, &addrinfo_result);
  if (ret == EAI_SYSTEM) {
    PLOG(FATAL) << "getaddrinfo failed to look up '" << host << "'";
  } else if (ret != 0) {
    LOG(FATAL) << "getaddrinfo failed to look up '" << host
               << "': " << gai_strerror(ret);
  }

  // Only the first returned address is used.
  switch (addrinfo_result->ai_family) {
    case AF_INET:
      memcpy(t_addr, addrinfo_result->ai_addr, addrinfo_result->ai_addrlen);
      t_addr->sin_family = addrinfo_result->ai_family;
      t_addr->sin_port = htons(port);
      break;
    case AF_INET6:
      memcpy(t_addr6, addrinfo_result->ai_addr, addrinfo_result->ai_addrlen);
      t_addr6->sin6_family = addrinfo_result->ai_family;
      t_addr6->sin6_port = htons(port);

      if (FLAGS_interface.size() > 0) {
        t_addr6->sin6_scope_id = if_nametoindex(FLAGS_interface.c_str());
        if (t_addr6->sin6_scope_id == 0) {
          // if_nametoindex() reports failure by returning 0. Keep going (the
          // scope id simply stays unset), but make the problem visible.
          PLOG(WARNING) << "if_nametoindex failed for interface '"
                        << FLAGS_interface << "'";
        }
      }
      break;
  }

  // Now print it back out nicely. The buffers start out empty so that a
  // failed reverse lookup logs empty strings rather than indeterminate
  // memory.
  char host_string[NI_MAXHOST] = "";
  char service_string[NI_MAXSERV] = "";

  const int error = getnameinfo(
      reinterpret_cast<struct sockaddr *>(&result), addrinfo_result->ai_addrlen,
      host_string, NI_MAXHOST, service_string, NI_MAXSERV, NI_NUMERICHOST);

  if (error) {
    LOG(ERROR) << "Reverse lookup failed ... " << gai_strerror(error);
  }

  LOG(INFO) << "remote:addr=" << host_string << ", port=" << service_string
            << ", family=" << addrinfo_result->ai_family;

  freeaddrinfo(addrinfo_result);

  return result;
}
145
// Returns a printable name for the address family stored in `sockaddr`:
// "AF_INET", "AF_INET6", or "unknown" for anything else.
std::string_view Family(const struct sockaddr_storage &sockaddr) {
  switch (sockaddr.ss_family) {
    case AF_INET:
      return "AF_INET";
    case AF_INET6:
      return "AF_INET6";
    default:
      return "unknown";
  }
}
// Returns the numeric text form of the IP address stored in `sockaddr`.
//
// AF_INET addresses are formatted as IPv4; every other family is interpreted
// as a sockaddr_in6 and formatted as IPv6. Returns "unknown" if the address
// cannot be converted to text.
std::string Address(const struct sockaddr_storage &sockaddr) {
  // INET6_ADDRSTRLEN is large enough for both IPv4 and IPv6 text forms.
  char addrbuf[INET6_ADDRSTRLEN];
  const char *text = nullptr;
  if (sockaddr.ss_family == AF_INET) {
    const struct sockaddr_in *sin =
        reinterpret_cast<const struct sockaddr_in *>(&sockaddr);
    text = inet_ntop(AF_INET, &sin->sin_addr, addrbuf, sizeof(addrbuf));
  } else {
    const struct sockaddr_in6 *sin6 =
        reinterpret_cast<const struct sockaddr_in6 *>(&sockaddr);
    text = inet_ntop(AF_INET6, &sin6->sin6_addr, addrbuf, sizeof(addrbuf));
  }
  // inet_ntop() signals failure with a null pointer, and constructing a
  // std::string from a null pointer is undefined behavior.
  if (text == nullptr) {
    return "unknown";
  }
  return std::string(text);
}
167
168void PrintNotification(const Message *msg) {
169 const union sctp_notification *snp =
170 (const union sctp_notification *)msg->data();
171
172 LOG(INFO) << "Notification:";
173
174 switch (snp->sn_header.sn_type) {
175 case SCTP_ASSOC_CHANGE: {
176 const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
177 LOG(INFO) << "SCTP_ASSOC_CHANGE(" << sac_state_tbl[sac->sac_state] << ")";
178 VLOG(1) << " (assoc_change: state=" << sac->sac_state
179 << ", error=" << sac->sac_error
180 << ", instr=" << sac->sac_inbound_streams
181 << " outstr=" << sac->sac_outbound_streams
182 << ", assoc=" << sac->sac_assoc_id << ")";
183 } break;
184 case SCTP_PEER_ADDR_CHANGE: {
185 const struct sctp_paddr_change *spc = &snp->sn_paddr_change;
186 LOG(INFO) << " SlCTP_PEER_ADDR_CHANGE";
187 VLOG(1) << "\t\t(peer_addr_change: " << Address(spc->spc_aaddr)
188 << " state=" << spc->spc_state << ", error=" << spc->spc_error
189 << ")";
190 } break;
191 case SCTP_SEND_FAILED: {
192 const struct sctp_send_failed *ssf = &snp->sn_send_failed;
193 LOG(INFO) << " SCTP_SEND_FAILED";
194 VLOG(1) << "\t\t(sendfailed: len=" << ssf->ssf_length
195 << " err=" << ssf->ssf_error << ")";
196 } break;
197 case SCTP_REMOTE_ERROR: {
198 const struct sctp_remote_error *sre = &snp->sn_remote_error;
199 LOG(INFO) << " SCTP_REMOTE_ERROR";
200 VLOG(1) << "\t\t(remote_error: err=" << ntohs(sre->sre_error) << ")";
201 } break;
Austin Schuhf7777002020-09-01 18:41:28 -0700202 case SCTP_STREAM_CHANGE_EVENT: {
Austin Schuh62a0c272021-03-31 21:04:53 -0700203 const struct sctp_stream_change_event *sce = &snp->sn_strchange_event;
Austin Schuhf7777002020-09-01 18:41:28 -0700204 LOG(INFO) << " SCTP_STREAM_CHANGE_EVENT";
205 VLOG(1) << "\t\t(stream_change_event: flags=" << sce->strchange_flags
206 << ", assoc_id=" << sce->strchange_assoc_id
207 << ", instrms=" << sce->strchange_instrms
208 << ", outstrms=" << sce->strchange_outstrms << " )";
209 } break;
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800210 case SCTP_SHUTDOWN_EVENT: {
211 LOG(INFO) << " SCTP_SHUTDOWN_EVENT";
212 } break;
213 default:
214 LOG(INFO) << " Unknown type: " << snp->sn_header.sn_type;
215 break;
216 }
217}
218
219std::string GetHostname() {
220 char buf[256];
221 buf[sizeof(buf) - 1] = '\0';
222 PCHECK(gethostname(buf, sizeof(buf) - 1) == 0);
223 return buf;
224}
225
226std::string Message::PeerAddress() const { return Address(sin); }
227
228void LogSctpStatus(int fd, sctp_assoc_t assoc_id) {
229 struct sctp_status status;
230 memset(&status, 0, sizeof(status));
231 status.sstat_assoc_id = assoc_id;
232
233 socklen_t size = sizeof(status);
Adam Snaiderbe263512023-05-18 20:40:23 -0700234 const int result = getsockopt(fd, IPPROTO_SCTP, SCTP_STATUS,
Austin Schuha5f545b2021-07-31 20:39:42 -0700235 reinterpret_cast<void *>(&status), &size);
236 if (result == -1 && errno == EINVAL) {
237 LOG(INFO) << "sctp_status) not associated";
238 return;
239 }
240 PCHECK(result == 0);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800241
242 LOG(INFO) << "sctp_status) sstat_assoc_id:" << status.sstat_assoc_id
243 << " sstat_state:" << status.sstat_state
244 << " sstat_rwnd:" << status.sstat_rwnd
245 << " sstat_unackdata:" << status.sstat_unackdata
246 << " sstat_penddata:" << status.sstat_penddata
247 << " sstat_instrms:" << status.sstat_instrms
248 << " sstat_outstrms:" << status.sstat_outstrms
249 << " sstat_fragmentation_point:" << status.sstat_fragmentation_point
250 << " sstat_primary.spinfo_srtt:" << status.sstat_primary.spinfo_srtt
251 << " sstat_primary.spinfo_rto:" << status.sstat_primary.spinfo_rto;
252}
253
Austin Schuh507f7582021-07-31 20:39:55 -0700254void SctpReadWrite::OpenSocket(const struct sockaddr_storage &sockaddr_local) {
255 fd_ = socket(sockaddr_local.ss_family, SOCK_SEQPACKET, IPPROTO_SCTP);
256 PCHECK(fd_ != -1);
257 LOG(INFO) << "socket(" << Family(sockaddr_local)
258 << ", SOCK_SEQPACKET, IPPROTOSCTP) = " << fd_;
259 {
260 // Per https://tools.ietf.org/html/rfc6458
261 // Setting this to !0 allows event notifications to be interleaved
Austin Schuha705d782021-07-31 20:40:00 -0700262 // with data if enabled. This typically only matters during congestion.
263 // However, Linux seems to interleave under memory pressure regardless of
264 // this being enabled, so we have to handle it in the code anyways, so might
265 // as well turn it on all the time.
266 // TODO(Brian): Change this to 2 once we have kernels that support it, and
267 // also address the TODO in ProcessNotification to match on all the
268 // necessary fields.
269 int interleaving = 1;
Austin Schuh507f7582021-07-31 20:39:55 -0700270 PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_FRAGMENT_INTERLEAVE,
271 &interleaving, sizeof(interleaving)) == 0);
272 }
273 {
274 // Enable recvinfo when a packet arrives.
275 int on = 1;
276 PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_RECVRCVINFO, &on, sizeof(int)) ==
277 0);
278 }
279
Austin Schuha705d782021-07-31 20:40:00 -0700280 {
281 // TODO(austin): This is the old style registration... But, the sctp
282 // stack out in the wild for linux is old and primitive.
283 struct sctp_event_subscribe subscribe;
284 memset(&subscribe, 0, sizeof(subscribe));
285 subscribe.sctp_association_event = 1;
286 subscribe.sctp_stream_change_event = 1;
287 subscribe.sctp_partial_delivery_event = 1;
Adam Snaiderbe263512023-05-18 20:40:23 -0700288 PCHECK(setsockopt(fd(), IPPROTO_SCTP, SCTP_EVENTS, (char *)&subscribe,
Austin Schuha705d782021-07-31 20:40:00 -0700289 sizeof(subscribe)) == 0);
290 }
291
Adam Snaider96a0f4b2023-05-18 20:41:19 -0700292 if (!auth_key_.empty()) {
293 CHECK(SctpAuthIsEnabled())
294 << "SCTP Authentication is disabled. Enable it with 'sysctl -w "
295 "net.sctp.auth_enable=1' and try again.";
296#if HAS_SCTP_AUTH
297 // Set up the key with id `1`.
298 sctp_authkey *const authkey =
299 (sctp_authkey *)malloc(sizeof(sctp_authkey) + auth_key_.size());
300 authkey->sca_keynumber = 1;
301 authkey->sca_keylength = auth_key_.size();
302 authkey->sca_assoc_id = SCTP_ALL_ASSOC;
303 memcpy(&authkey->sca_key, auth_key_.data(), auth_key_.size());
304
305 PCHECK(setsockopt(fd(), IPPROTO_SCTP, SCTP_AUTH_KEY, authkey,
306 sizeof(sctp_authkey) + auth_key_.size()) == 0);
307 free(authkey);
308
309 // Set key `1` as active.
310 struct sctp_authkeyid authkeyid;
311 authkeyid.scact_keynumber = 1;
312 authkeyid.scact_assoc_id = SCTP_ALL_ASSOC;
313 PCHECK(setsockopt(fd(), IPPROTO_SCTP, SCTP_AUTH_ACTIVE_KEY, &authkeyid,
314 sizeof(authkeyid)) == 0);
315
316 // Set up authentication for data chunks.
317 struct sctp_authchunk authchunk;
318 authchunk.sauth_chunk = 0;
319
320 PCHECK(setsockopt(fd(), IPPROTO_SCTP, SCTP_AUTH_CHUNK, &authchunk,
321 sizeof(authchunk)) == 0);
322
323 // Disallow the null key.
324 authkeyid.scact_keynumber = 0;
325 PCHECK(setsockopt(fd(), IPPROTO_SCTP, SCTP_AUTH_DELETE_KEY, &authkeyid,
326 sizeof(authkeyid)) == 0);
327#endif
328 }
329
Austin Schuh507f7582021-07-31 20:39:55 -0700330 DoSetMaxSize();
331}
332
333bool SctpReadWrite::SendMessage(
334 int stream, std::string_view data, int time_to_live,
335 std::optional<struct sockaddr_storage> sockaddr_remote,
336 sctp_assoc_t snd_assoc_id) {
337 CHECK(fd_ != -1);
338 struct iovec iov;
339 iov.iov_base = const_cast<char *>(data.data());
340 iov.iov_len = data.size();
341
342 // Use the assoc_id for the destination instead of the msg_name.
343 struct msghdr outmsg;
James Kuszmaul784deb72023-02-17 14:42:51 -0800344 memset(&outmsg, 0, sizeof(outmsg));
Austin Schuh507f7582021-07-31 20:39:55 -0700345 if (sockaddr_remote) {
346 outmsg.msg_name = &*sockaddr_remote;
347 outmsg.msg_namelen = sizeof(*sockaddr_remote);
Sarah Newman4aeb2372022-04-06 13:07:11 -0700348 VLOG(2) << "Sending to " << Address(*sockaddr_remote);
Austin Schuh507f7582021-07-31 20:39:55 -0700349 } else {
350 outmsg.msg_namelen = 0;
351 }
352
353 // Data to send.
354 outmsg.msg_iov = &iov;
355 outmsg.msg_iovlen = 1;
356
357 // Build up the sndinfo message.
358 char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
359 outmsg.msg_control = outcmsg;
360 outmsg.msg_controllen = sizeof(outcmsg);
361 outmsg.msg_flags = 0;
362
363 struct cmsghdr *cmsg = CMSG_FIRSTHDR(&outmsg);
364 cmsg->cmsg_level = IPPROTO_SCTP;
365 cmsg->cmsg_type = SCTP_SNDRCV;
366 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
367
368 struct sctp_sndrcvinfo *sinfo =
369 reinterpret_cast<struct sctp_sndrcvinfo *>(CMSG_DATA(cmsg));
370 memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
371 sinfo->sinfo_ppid = ++send_ppid_;
372 sinfo->sinfo_stream = stream;
373 sinfo->sinfo_flags = 0;
374 sinfo->sinfo_assoc_id = snd_assoc_id;
375 sinfo->sinfo_timetolive = time_to_live;
376
377 // And send.
378 const ssize_t size = sendmsg(fd_, &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
379 if (size == -1) {
380 if (errno == EPIPE || errno == EAGAIN || errno == ESHUTDOWN ||
381 errno == EINTR) {
Austin Schuh581fab92022-02-07 19:50:54 -0800382 if (VLOG_IS_ON(1)) {
383 PLOG(WARNING) << "sendmsg on sctp socket failed";
384 }
Austin Schuh507f7582021-07-31 20:39:55 -0700385 return false;
386 }
387 PLOG(FATAL) << "sendmsg on sctp socket failed";
388 return false;
389 }
390 CHECK_EQ(static_cast<ssize_t>(data.size()), size);
Sarah Newman4aeb2372022-04-06 13:07:11 -0700391 VLOG(2) << "Sent " << data.size();
Austin Schuh507f7582021-07-31 20:39:55 -0700392 return true;
393}
394
Adam Snaider96a0f4b2023-05-18 20:41:19 -0700395SctpReadWrite::SctpReadWrite(std::vector<uint8_t> auth_key)
396 : auth_key_(std::move(auth_key)) {}
397
Austin Schuhf95a6ab2023-05-15 14:34:57 -0700398void SctpReadWrite::FreeMessage(aos::unique_c_ptr<Message> &&message) {
399 if (use_pool_) {
400 free_messages_.emplace_back(std::move(message));
401 }
402}
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800403
Austin Schuhf95a6ab2023-05-15 14:34:57 -0700404void SctpReadWrite::SetPoolSize(size_t pool_size) {
405 CHECK(!use_pool_);
406 free_messages_.reserve(pool_size);
407 for (size_t i = 0; i < pool_size; ++i) {
408 free_messages_.emplace_back(AcquireMessage());
409 }
410 use_pool_ = true;
411}
412
413aos::unique_c_ptr<Message> SctpReadWrite::AcquireMessage() {
414 if (!use_pool_) {
James Kuszmaul6e622382023-02-17 14:56:38 -0800415 constexpr size_t kMessageAlign = alignof(Message);
416 const size_t max_message_size =
Austin Schuh89e1e9c2023-05-15 14:38:44 -0700417 ((sizeof(Message) + max_read_size_ + 1 + (kMessageAlign - 1)) /
James Kuszmaul6e622382023-02-17 14:56:38 -0800418 kMessageAlign) *
419 kMessageAlign;
420 aos::unique_c_ptr<Message> result(reinterpret_cast<Message *>(
421 aligned_alloc(kMessageAlign, max_message_size)));
Austin Schuhf95a6ab2023-05-15 14:34:57 -0700422 return result;
423 } else {
424 CHECK_GT(free_messages_.size(), 0u);
425 aos::unique_c_ptr<Message> result = std::move(free_messages_.back());
426 free_messages_.pop_back();
427 return result;
428 }
429}
430
431// We read each fragment into a fresh Message, because most of them won't be
432// fragmented. If we do end up with a fragment, then we copy the data out of it.
433aos::unique_c_ptr<Message> SctpReadWrite::ReadMessage() {
434 CHECK(fd_ != -1);
435
436 while (true) {
437 aos::unique_c_ptr<Message> result = AcquireMessage();
Austin Schuha705d782021-07-31 20:40:00 -0700438
Austin Schuh05c18122021-07-31 20:39:47 -0700439 struct msghdr inmessage;
Austin Schuhc4202572021-03-31 21:06:55 -0700440 memset(&inmessage, 0, sizeof(struct msghdr));
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800441
Austin Schuh05c18122021-07-31 20:39:47 -0700442 struct iovec iov;
Austin Schuh89e1e9c2023-05-15 14:38:44 -0700443 iov.iov_len = max_read_size_ + 1;
Austin Schuha705d782021-07-31 20:40:00 -0700444 iov.iov_base = result->mutable_data();
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800445
Austin Schuhc4202572021-03-31 21:06:55 -0700446 inmessage.msg_iov = &iov;
447 inmessage.msg_iovlen = 1;
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800448
Austin Schuh05c18122021-07-31 20:39:47 -0700449 char incmsg[CMSG_SPACE(sizeof(_sctp_cmsg_data_t))];
Austin Schuhc4202572021-03-31 21:06:55 -0700450 inmessage.msg_control = incmsg;
451 inmessage.msg_controllen = sizeof(incmsg);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800452
Austin Schuhc4202572021-03-31 21:06:55 -0700453 inmessage.msg_namelen = sizeof(struct sockaddr_storage);
454 inmessage.msg_name = &result->sin;
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800455
Austin Schuha705d782021-07-31 20:40:00 -0700456 const ssize_t size = recvmsg(fd_, &inmessage, MSG_DONTWAIT);
457 if (size == -1) {
458 if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
459 // These are all non-fatal failures indicating we should retry later.
460 return nullptr;
Austin Schuhc4202572021-03-31 21:06:55 -0700461 }
Austin Schuha705d782021-07-31 20:40:00 -0700462 PLOG(FATAL) << "recvmsg on sctp socket " << fd_ << " failed";
Austin Schuhc4202572021-03-31 21:06:55 -0700463 }
464
Austin Schuha705d782021-07-31 20:40:00 -0700465 CHECK(!(inmessage.msg_flags & MSG_CTRUNC))
Austin Schuhc4202572021-03-31 21:06:55 -0700466 << ": Control message truncated.";
467
Austin Schuha705d782021-07-31 20:40:00 -0700468 if (MSG_NOTIFICATION & inmessage.msg_flags) {
469 result->message_type = Message::kNotification;
470 } else {
471 result->message_type = Message::kMessage;
472 }
473 result->partial_deliveries = 0;
Austin Schuhc4202572021-03-31 21:06:55 -0700474
Austin Schuha705d782021-07-31 20:40:00 -0700475 {
476 bool found_rcvinfo = false;
477 for (struct cmsghdr *scmsg = CMSG_FIRSTHDR(&inmessage); scmsg != NULL;
478 scmsg = CMSG_NXTHDR(&inmessage, scmsg)) {
479 switch (scmsg->cmsg_type) {
480 case SCTP_RCVINFO: {
481 CHECK(!found_rcvinfo);
482 found_rcvinfo = true;
483 result->header.rcvinfo =
484 *reinterpret_cast<struct sctp_rcvinfo *>(CMSG_DATA(scmsg));
485 } break;
486 default:
487 LOG(INFO) << "\tUnknown type: " << scmsg->cmsg_type;
488 break;
489 }
490 }
491 CHECK_EQ(found_rcvinfo, result->message_type == Message::kMessage)
492 << ": Failed to find a SCTP_RCVINFO cmsghdr. flags: "
493 << inmessage.msg_flags;
494 }
Austin Schuh89f23e32023-05-15 17:06:43 -0700495
496 // Client just sent too big a block of data. Eat it and signal up the
497 // chain.
498 result->size = size;
499 if (size > static_cast<ssize_t>(max_read_size_)) {
500 Abort(result->header.rcvinfo.rcv_assoc_id);
501 result->message_type = Message::kOverflow;
502
503 VLOG(1) << "Message overflowed buffer on stream "
504 << result->header.rcvinfo.rcv_sid << ", disconnecting."
505 << " Check for config mismatch or rogue device.";
506 return result;
507 }
508
Austin Schuha705d782021-07-31 20:40:00 -0700509 if (result->message_type == Message::kNotification) {
510 // Notifications are never fragmented, just return it now.
511 CHECK(inmessage.msg_flags & MSG_EOR)
512 << ": Notifications should never be big enough to fragment";
513 if (ProcessNotification(result.get())) {
514 // We handled this notification internally, so don't pass it on.
515 return nullptr;
516 }
517 return result;
518 }
519
520 auto partial_message_iterator =
521 std::find_if(partial_messages_.begin(), partial_messages_.end(),
522 [&result](const aos::unique_c_ptr<Message> &candidate) {
523 return result->header.rcvinfo.rcv_sid ==
524 candidate->header.rcvinfo.rcv_sid &&
525 result->header.rcvinfo.rcv_ssn ==
526 candidate->header.rcvinfo.rcv_ssn &&
527 result->header.rcvinfo.rcv_assoc_id ==
528 candidate->header.rcvinfo.rcv_assoc_id;
529 });
530 if (partial_message_iterator != partial_messages_.end()) {
531 const aos::unique_c_ptr<Message> &partial_message =
532 *partial_message_iterator;
533 // Verify it's really part of the same message.
534 CHECK_EQ(partial_message->message_type, result->message_type)
535 << ": for " << result->header.rcvinfo.rcv_sid << ","
536 << result->header.rcvinfo.rcv_ssn << ","
537 << result->header.rcvinfo.rcv_assoc_id;
538 CHECK_EQ(partial_message->header.rcvinfo.rcv_ppid,
539 result->header.rcvinfo.rcv_ppid)
540 << ": for " << result->header.rcvinfo.rcv_sid << ","
541 << result->header.rcvinfo.rcv_ssn << ","
542 << result->header.rcvinfo.rcv_assoc_id;
543
544 // Now copy the data over and update the size.
Austin Schuh89e1e9c2023-05-15 14:38:44 -0700545 CHECK_LE(partial_message->size + result->size, max_read_size_)
Austin Schuha705d782021-07-31 20:40:00 -0700546 << ": Assembled fragments overflowed buffer on stream "
547 << result->header.rcvinfo.rcv_sid << ".";
548 memcpy(partial_message->mutable_data() + partial_message->size,
549 result->data(), result->size);
550 ++partial_message->partial_deliveries;
Sarah Newman4aeb2372022-04-06 13:07:11 -0700551 VLOG(2) << "Merged fragment of " << result->size << " after "
Austin Schuha705d782021-07-31 20:40:00 -0700552 << partial_message->size << ", had "
553 << partial_message->partial_deliveries
554 << ", for: " << result->header.rcvinfo.rcv_sid << ","
555 << result->header.rcvinfo.rcv_ssn << ","
556 << result->header.rcvinfo.rcv_assoc_id;
557 partial_message->size += result->size;
558 result.reset();
559 }
560
561 if (inmessage.msg_flags & MSG_EOR) {
562 // This is the last fragment, so we have something to return.
563 if (partial_message_iterator != partial_messages_.end()) {
564 // It was already merged into the message in the list, so now we pull
565 // that out of the list and return it.
566 CHECK(!result);
567 result = std::move(*partial_message_iterator);
568 partial_messages_.erase(partial_message_iterator);
569 VLOG(1) << "Final count: " << (result->partial_deliveries + 1)
570 << ", size: " << result->size
571 << ", for: " << result->header.rcvinfo.rcv_sid << ","
572 << result->header.rcvinfo.rcv_ssn << ","
573 << result->header.rcvinfo.rcv_assoc_id;
574 }
575 CHECK(result);
576 return result;
577 }
578 if (partial_message_iterator == partial_messages_.end()) {
Sarah Newman4aeb2372022-04-06 13:07:11 -0700579 VLOG(2) << "Starting fragment for: " << result->header.rcvinfo.rcv_sid
Austin Schuha705d782021-07-31 20:40:00 -0700580 << "," << result->header.rcvinfo.rcv_ssn << ","
581 << result->header.rcvinfo.rcv_assoc_id;
582 // Need to record this as the first fragment.
583 partial_messages_.emplace_back(std::move(result));
584 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800585 }
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800586}
587
Sarah Newman80e955e2022-04-13 11:19:36 -0700588bool SctpReadWrite::Abort(sctp_assoc_t snd_assoc_id) {
589 if (fd_ == -1) {
590 return true;
591 }
592 VLOG(1) << "Sending abort to assoc " << snd_assoc_id;
593
594 // Use the assoc_id for the destination instead of the msg_name.
595 struct msghdr outmsg;
James Kuszmaul784deb72023-02-17 14:42:51 -0800596 memset(&outmsg, 0, sizeof(outmsg));
Sarah Newman80e955e2022-04-13 11:19:36 -0700597 outmsg.msg_namelen = 0;
598
599 outmsg.msg_iovlen = 0;
600
601 // Build up the sndinfo message.
602 char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
603 outmsg.msg_control = outcmsg;
604 outmsg.msg_controllen = CMSG_SPACE(sizeof(struct sctp_sndrcvinfo));
605 outmsg.msg_flags = 0;
606
607 struct cmsghdr *cmsg = CMSG_FIRSTHDR(&outmsg);
608 cmsg->cmsg_level = IPPROTO_SCTP;
609 cmsg->cmsg_type = SCTP_SNDRCV;
610 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
611
612 struct sctp_sndrcvinfo *sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
613 memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
614 sinfo->sinfo_stream = 0;
615 sinfo->sinfo_flags = SCTP_ABORT;
616 sinfo->sinfo_assoc_id = snd_assoc_id;
617
618 // And send.
619 const ssize_t size = sendmsg(fd_, &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
620 if (size == -1) {
621 if (errno == EPIPE || errno == EAGAIN || errno == ESHUTDOWN) {
622 return false;
623 }
624 return false;
625 } else {
626 CHECK_EQ(0, size);
627 return true;
628 }
629}
630
Austin Schuh507f7582021-07-31 20:39:55 -0700631void SctpReadWrite::CloseSocket() {
632 if (fd_ == -1) {
633 return;
634 }
635 LOG(INFO) << "close(" << fd_ << ")";
636 PCHECK(close(fd_) == 0);
637 fd_ = -1;
638}
639
640void SctpReadWrite::DoSetMaxSize() {
Austin Schuh89e1e9c2023-05-15 14:38:44 -0700641 size_t max_size = max_write_size_;
Austin Schuh507f7582021-07-31 20:39:55 -0700642
Austin Schuh61224882021-10-11 18:21:11 -0700643 // This sets the max packet size that we can send.
Austin Schuh89e1e9c2023-05-15 14:38:44 -0700644 CHECK_GE(ReadWMemMax(), max_write_size_)
Austin Schuh507f7582021-07-31 20:39:55 -0700645 << "wmem_max is too low. To increase wmem_max temporarily, do sysctl "
646 "-w net.core.wmem_max="
647 << max_size;
Austin Schuh507f7582021-07-31 20:39:55 -0700648 PCHECK(setsockopt(fd(), SOL_SOCKET, SO_SNDBUF, &max_size, sizeof(max_size)) ==
649 0);
Austin Schuh61224882021-10-11 18:21:11 -0700650
651 // The SO_RCVBUF option (also controlled by net.core.rmem_default) needs to be
652 // decently large but the actual size can be measured by tuning. The defaults
653 // should be fine. If it isn't big enough, transmission will fail.
Austin Schuh9dd8f592021-12-25 14:32:43 -0800654 if (FLAGS_rmem > 0) {
655 size_t rmem = FLAGS_rmem;
656 PCHECK(setsockopt(fd(), SOL_SOCKET, SO_RCVBUF, &rmem, sizeof(rmem)) == 0);
657 }
Austin Schuh507f7582021-07-31 20:39:55 -0700658}
659
Austin Schuha705d782021-07-31 20:40:00 -0700660bool SctpReadWrite::ProcessNotification(const Message *message) {
661 const union sctp_notification *const snp =
662 reinterpret_cast<const union sctp_notification *>(message->data());
663 switch (snp->sn_header.sn_type) {
664 case SCTP_PARTIAL_DELIVERY_EVENT: {
665 const struct sctp_pdapi_event *const partial_delivery =
666 &snp->sn_pdapi_event;
667 CHECK_EQ(partial_delivery->pdapi_length, sizeof(*partial_delivery))
668 << ": Kernel's SCTP code is not a version we support";
669 switch (partial_delivery->pdapi_indication) {
670 case SCTP_PARTIAL_DELIVERY_ABORTED: {
671 const auto iterator = std::find_if(
672 partial_messages_.begin(), partial_messages_.end(),
673 [partial_delivery](const aos::unique_c_ptr<Message> &candidate) {
674 // TODO(Brian): Once we have new enough userpace headers, for
675 // kernels that support level-2 interleaving, we'll need to add
676 // this:
677 // candidate->header.rcvinfo.rcv_sid ==
678 // partial_delivery->pdapi_stream &&
679 // candidate->header.rcvinfo.rcv_ssn ==
680 // partial_delivery->pdapi_seq &&
681 return candidate->header.rcvinfo.rcv_assoc_id ==
682 partial_delivery->pdapi_assoc_id;
683 });
684 CHECK(iterator != partial_messages_.end())
685 << ": Got out of sync with the kernel for "
686 << partial_delivery->pdapi_assoc_id;
687 VLOG(1) << "Pruning partial delivery for "
688 << iterator->get()->header.rcvinfo.rcv_sid << ","
689 << iterator->get()->header.rcvinfo.rcv_ssn << ","
690 << iterator->get()->header.rcvinfo.rcv_assoc_id;
691 partial_messages_.erase(iterator);
692 }
693 return true;
694 }
695 } break;
696 }
697 return false;
698}
699
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800700void Message::LogRcvInfo() const {
701 LOG(INFO) << "\tSNDRCV (stream=" << header.rcvinfo.rcv_sid
702 << " ssn=" << header.rcvinfo.rcv_ssn
703 << " tsn=" << header.rcvinfo.rcv_tsn << " flags=0x" << std::hex
704 << header.rcvinfo.rcv_flags << std::dec
705 << " ppid=" << header.rcvinfo.rcv_ppid
706 << " cumtsn=" << header.rcvinfo.rcv_cumtsn << ")";
707}
708
Austin Schuh2fe4b712020-03-15 14:21:45 -0700709size_t ReadRMemMax() {
710 struct stat current_stat;
711 if (stat("/proc/sys/net/core/rmem_max", &current_stat) != -1) {
712 return static_cast<size_t>(
713 std::stoi(util::ReadFileToStringOrDie("/proc/sys/net/core/rmem_max")));
714 } else {
715 LOG(WARNING) << "/proc/sys/net/core/rmem_max doesn't exist. Are you in a "
716 "container?";
717 return 212992;
718 }
719}
720
721size_t ReadWMemMax() {
722 struct stat current_stat;
723 if (stat("/proc/sys/net/core/wmem_max", &current_stat) != -1) {
724 return static_cast<size_t>(
725 std::stoi(util::ReadFileToStringOrDie("/proc/sys/net/core/wmem_max")));
726 } else {
727 LOG(WARNING) << "/proc/sys/net/core/wmem_max doesn't exist. Are you in a "
728 "container?";
729 return 212992;
730 }
731}
732
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800733} // namespace message_bridge
734} // namespace aos