blob: 07bc86caf26f182d660a3635f4a7e9bde4efe24d [file] [log] [blame]
Austin Schuhe84c3ed2019-12-14 15:29:48 -08001#include "aos/network/sctp_lib.h"
2
3#include <arpa/inet.h>
4#include <net/if.h>
5#include <netdb.h>
6#include <netinet/sctp.h>
Austin Schuh2fe4b712020-03-15 14:21:45 -07007#include <sys/stat.h>
8#include <sys/types.h>
9#include <unistd.h>
Austin Schuhe84c3ed2019-12-14 15:29:48 -080010
11#include <string_view>
12
Austin Schuh2fe4b712020-03-15 14:21:45 -070013#include "aos/util/file.h"
14
// Name of the network interface whose index ResolveSocket() stores as the
// scope id of resolved IPv6 addresses.  When empty, the scope id reported by
// the resolver is left untouched.
DEFINE_string(interface, "", "ipv6 interface");
16
17namespace aos {
18namespace message_bridge {
19
20namespace {
// Printable names for the sac_state value of an SCTP_ASSOC_CHANGE
// notification.  PrintNotification() indexes this table directly with the
// numeric state.
const char *const sac_state_tbl[] = {"COMMUNICATION_UP", "COMMUNICATION_LOST",
                                     "RESTART", "SHUTDOWN_COMPLETE",
                                     "CANT_START_ASSOCIATION"};
24
// Overlay of the SCTP ancillary data payloads listed below.  In this file it
// is only used through sizeof() to dimension the control buffer handed to
// recvmsg().
union _sctp_cmsg_data_t {
  struct sctp_initmsg init;
  struct sctp_sndrcvinfo sndrcvinfo;
};
29
30} // namespace
31
32struct sockaddr_storage ResolveSocket(std::string_view host, int port) {
33 struct sockaddr_storage result;
34 struct addrinfo *addrinfo_result;
35 struct sockaddr_in *t_addr = (struct sockaddr_in *)&result;
36 struct sockaddr_in6 *t_addr6 = (struct sockaddr_in6 *)&result;
37
Austin Schuh6d227942020-02-22 13:29:57 -080038 PCHECK(getaddrinfo(std::string(host).c_str(), 0, NULL, &addrinfo_result) == 0)
39 << ": Failed to look up " << host;
Austin Schuhe84c3ed2019-12-14 15:29:48 -080040
41 switch (addrinfo_result->ai_family) {
42 case AF_INET:
43 memcpy(t_addr, addrinfo_result->ai_addr, addrinfo_result->ai_addrlen);
44 t_addr->sin_family = addrinfo_result->ai_family;
45 t_addr->sin_port = htons(port);
46
47 break;
48 case AF_INET6:
49 memcpy(t_addr6, addrinfo_result->ai_addr, addrinfo_result->ai_addrlen);
50 t_addr6->sin6_family = addrinfo_result->ai_family;
51 t_addr6->sin6_port = htons(port);
52
53 if (FLAGS_interface.size() > 0) {
54 t_addr6->sin6_scope_id = if_nametoindex(FLAGS_interface.c_str());
55 }
56
57 break;
58 }
59
60 // Now print it back out nicely.
61 char host_string[NI_MAXHOST];
62 char service_string[NI_MAXSERV];
63
64 int error = getnameinfo((struct sockaddr *)&result,
65 addrinfo_result->ai_addrlen, host_string, NI_MAXHOST,
66 service_string, NI_MAXSERV, NI_NUMERICHOST);
67
68 if (error) {
69 LOG(ERROR) << "Reverse lookup failed ... " << gai_strerror(error);
70 }
71
72 LOG(INFO) << "remote:addr=" << host_string << ", port=" << service_string
73 << ", family=" << addrinfo_result->ai_family;
74
75 freeaddrinfo(addrinfo_result);
76
77 return result;
78}
79
// Returns a printable name for the address family stored in |sockaddr|:
// "AF_INET", "AF_INET6", or "unknown" for every other family.
std::string_view Family(const struct sockaddr_storage &sockaddr) {
  switch (sockaddr.ss_family) {
    case AF_INET:
      return "AF_INET";
    case AF_INET6:
      return "AF_INET6";
    default:
      return "unknown";
  }
}
// Returns the numeric text form of the IPv4 or IPv6 address in |sockaddr|.
//
// Returns "unknown" when the family is neither AF_INET nor AF_INET6, or when
// inet_ntop() cannot format the address, rather than reinterpreting the bytes
// as an IPv6 address or building a std::string from a null pointer.
std::string Address(const struct sockaddr_storage &sockaddr) {
  char addrbuf[INET6_ADDRSTRLEN];
  const char *printed = nullptr;

  switch (sockaddr.ss_family) {
    case AF_INET: {
      const struct sockaddr_in *sin =
          reinterpret_cast<const struct sockaddr_in *>(&sockaddr);
      printed = inet_ntop(AF_INET, &sin->sin_addr, addrbuf, sizeof(addrbuf));
    } break;
    case AF_INET6: {
      const struct sockaddr_in6 *sin6 =
          reinterpret_cast<const struct sockaddr_in6 *>(&sockaddr);
      printed =
          inet_ntop(AF_INET6, &sin6->sin6_addr, addrbuf, sizeof(addrbuf));
    } break;
    default:
      break;
  }

  if (printed == nullptr) {
    return "unknown";
  }
  return std::string(printed);
}
101
// Logs a description of the SCTP notification stored in |msg|.
//
// The payload returned by msg->data() is interpreted as a
// union sctp_notification.  The notification type is logged at INFO and the
// per-type details at VLOG(1).  Unrecognized types are logged with their
// numeric value.
void PrintNotification(const Message *msg) {
  const union sctp_notification *snp =
      reinterpret_cast<const union sctp_notification *>(msg->data());

  LOG(INFO) << "Notification:";

  switch (snp->sn_header.sn_type) {
    case SCTP_ASSOC_CHANGE: {
      const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
      // The state arrives off the wire/kernel; never index past the table.
      constexpr size_t kNumStates =
          sizeof(sac_state_tbl) / sizeof(sac_state_tbl[0]);
      const char *const state_name = sac->sac_state < kNumStates
                                         ? sac_state_tbl[sac->sac_state]
                                         : "UNKNOWN";
      LOG(INFO) << "SCTP_ASSOC_CHANGE(" << state_name << ")";
      VLOG(1) << "    (assoc_change: state=" << sac->sac_state
              << ", error=" << sac->sac_error
              << ", instr=" << sac->sac_inbound_streams
              << " outstr=" << sac->sac_outbound_streams
              << ", assoc=" << sac->sac_assoc_id << ")";
    } break;
    case SCTP_PEER_ADDR_CHANGE: {
      const struct sctp_paddr_change *spc = &snp->sn_paddr_change;
      LOG(INFO) << " SCTP_PEER_ADDR_CHANGE";
      VLOG(1) << "\t\t(peer_addr_change: " << Address(spc->spc_aaddr)
              << " state=" << spc->spc_state << ", error=" << spc->spc_error
              << ")";
    } break;
    case SCTP_SEND_FAILED: {
      const struct sctp_send_failed *ssf = &snp->sn_send_failed;
      LOG(INFO) << " SCTP_SEND_FAILED";
      VLOG(1) << "\t\t(sendfailed: len=" << ssf->ssf_length
              << " err=" << ssf->ssf_error << ")";
    } break;
    case SCTP_REMOTE_ERROR: {
      const struct sctp_remote_error *sre = &snp->sn_remote_error;
      LOG(INFO) << " SCTP_REMOTE_ERROR";
      VLOG(1) << "\t\t(remote_error: err=" << ntohs(sre->sre_error) << ")";
    } break;
    case SCTP_STREAM_CHANGE_EVENT: {
      const struct sctp_stream_change_event *sce = &snp->sn_strchange_event;
      LOG(INFO) << " SCTP_STREAM_CHANGE_EVENT";
      VLOG(1) << "\t\t(stream_change_event: flags=" << sce->strchange_flags
              << ", assoc_id=" << sce->strchange_assoc_id
              << ", instrms=" << sce->strchange_instrms
              << ", outstrms=" << sce->strchange_outstrms << " )";
    } break;
    case SCTP_SHUTDOWN_EVENT: {
      LOG(INFO) << " SCTP_SHUTDOWN_EVENT";
    } break;
    default:
      LOG(INFO) << " Unknown type: " << snp->sn_header.sn_type;
      break;
  }
}
152
153std::string GetHostname() {
154 char buf[256];
155 buf[sizeof(buf) - 1] = '\0';
156 PCHECK(gethostname(buf, sizeof(buf) - 1) == 0);
157 return buf;
158}
159
160std::string Message::PeerAddress() const { return Address(sin); }
161
// Queries SCTP_STATUS for |assoc_id| on socket |fd| and logs the result.
//
// An EINVAL failure is logged as "not associated" and the function returns.
// Any other failure is fatal.
void LogSctpStatus(int fd, sctp_assoc_t assoc_id) {
  struct sctp_status status;
  memset(&status, 0, sizeof(status));
  status.sstat_assoc_id = assoc_id;

  socklen_t size = sizeof(status);
  const int result = getsockopt(fd, SOL_SCTP, SCTP_STATUS, &status, &size);

  // errno is only consulted when getsockopt() actually reported a failure.
  const bool not_associated = (result == -1) && (errno == EINVAL);
  if (not_associated) {
    LOG(INFO) << "sctp_status) not associated";
    return;
  }
  PCHECK(result == 0);

  const auto &primary = status.sstat_primary;
  LOG(INFO) << "sctp_status) sstat_assoc_id:" << status.sstat_assoc_id
            << " sstat_state:" << status.sstat_state
            << " sstat_rwnd:" << status.sstat_rwnd
            << " sstat_unackdata:" << status.sstat_unackdata
            << " sstat_penddata:" << status.sstat_penddata
            << " sstat_instrms:" << status.sstat_instrms
            << " sstat_outstrms:" << status.sstat_outstrms
            << " sstat_fragmentation_point:"
            << status.sstat_fragmentation_point
            << " sstat_primary.spinfo_srtt:" << primary.spinfo_srtt
            << " sstat_primary.spinfo_rto:" << primary.spinfo_rto;
}
187
// Creates the SCTP SOCK_SEQPACKET socket for the address family of
// |sockaddr_local|, stores it in fd_, and configures it: fragment
// interleaving off, receive info on, then buffer sizes via DoSetMaxSize().
// Every failing system call is fatal.
void SctpReadWrite::OpenSocket(const struct sockaddr_storage &sockaddr_local) {
  fd_ = socket(sockaddr_local.ss_family, SOCK_SEQPACKET, IPPROTO_SCTP);
  PCHECK(fd_ != -1);
  LOG(INFO) << "socket(" << Family(sockaddr_local)
            << ", SOCK_SEQPACKET, IPPROTOSCTP) = " << fd_;

  // Per https://tools.ietf.org/html/rfc6458
  // A non-zero value would allow event notifications to be interleaved with
  // data, which the code would then have to handle.  Interleaving would only
  // matter during congestion, which typically only happens during
  // application startup.
  const int interleaving = 0;
  PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_FRAGMENT_INTERLEAVE,
                    &interleaving, sizeof(interleaving)) == 0);

  // Ask for recvinfo to be delivered alongside each arriving packet.
  const int on = 1;
  PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_RECVRCVINFO, &on, sizeof(int)) ==
         0);

  DoSetMaxSize();
}
212
// Sends |data| as one SCTP message on |stream| without blocking.
//
// |time_to_live| is copied into sinfo_timetolive and |snd_assoc_id| into
// sinfo_assoc_id.  When |sockaddr_remote| is set it is used as the destination
// address; otherwise no address is supplied and the association id selects
// the destination.  send_ppid_ is incremented and used as the ppid of every
// message.
//
// Returns true once the whole message was handed to the kernel, and false
// when sendmsg() fails with EPIPE, EAGAIN, ESHUTDOWN, or EINTR.  Any other
// failure, or a short send, is fatal.
bool SctpReadWrite::SendMessage(
    int stream, std::string_view data, int time_to_live,
    std::optional<struct sockaddr_storage> sockaddr_remote,
    sctp_assoc_t snd_assoc_id) {
  CHECK(fd_ != -1);
  struct iovec iov;
  iov.iov_base = const_cast<char *>(data.data());
  iov.iov_len = data.size();

  // Start from an all-zero header so no field (msg_name in particular) is
  // ever handed to the kernel uninitialized.
  struct msghdr outmsg;
  memset(&outmsg, 0, sizeof(outmsg));
  if (sockaddr_remote) {
    outmsg.msg_name = &*sockaddr_remote;
    outmsg.msg_namelen = sizeof(*sockaddr_remote);
    VLOG(1) << "Sending to " << Address(*sockaddr_remote);
  } else {
    // Use the assoc_id for the destination instead of the msg_name.
    outmsg.msg_name = nullptr;
    outmsg.msg_namelen = 0;
  }

  // Data to send.
  outmsg.msg_iov = &iov;
  outmsg.msg_iovlen = 1;

  // Build up the sndinfo message.  The buffer is accessed through a
  // struct cmsghdr, so give it that alignment, and zero it so padding bytes
  // are defined.
  alignas(struct cmsghdr) char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
  memset(outcmsg, 0, sizeof(outcmsg));
  outmsg.msg_control = outcmsg;
  outmsg.msg_controllen = sizeof(outcmsg);
  outmsg.msg_flags = 0;

  struct cmsghdr *cmsg = CMSG_FIRSTHDR(&outmsg);
  cmsg->cmsg_level = IPPROTO_SCTP;
  cmsg->cmsg_type = SCTP_SNDRCV;
  cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));

  struct sctp_sndrcvinfo *sinfo =
      reinterpret_cast<struct sctp_sndrcvinfo *>(CMSG_DATA(cmsg));
  memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
  sinfo->sinfo_ppid = ++send_ppid_;
  sinfo->sinfo_stream = stream;
  sinfo->sinfo_flags = 0;
  sinfo->sinfo_assoc_id = snd_assoc_id;
  sinfo->sinfo_timetolive = time_to_live;

  // And send.
  const ssize_t size = sendmsg(fd_, &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
  if (size == -1) {
    if (errno == EPIPE || errno == EAGAIN || errno == ESHUTDOWN ||
        errno == EINTR) {
      return false;
    }
    PLOG(FATAL) << "sendmsg on sctp socket failed";
    return false;
  }
  CHECK_EQ(static_cast<ssize_t>(data.size()), size);
  VLOG(1) << "Sent " << data.size();
  return true;
}
270
// Blocks until one complete SCTP message or notification has been received
// and returns it in a freshly malloc()ed Message.
//
// recvmsg() is called repeatedly, appending to the same buffer, until a chunk
// arrives with MSG_EOR set.  partial_deliveries is set to the number of
// recvmsg() calls beyond the first, and message_type reflects whether the
// final chunk had MSG_NOTIFICATION set.
//
// Fatal conditions: allocation failure, recvmsg() returning <= 0, a truncated
// control message, a payload larger than max_size_, or continuation chunks
// whose notification flag or receive info disagree with the earlier chunks.
aos::unique_c_ptr<Message> SctpReadWrite::ReadMessage() {
  CHECK(fd_ != -1);

  // One byte more than max_size_ is allocated so that an oversized message
  // shows up as size > max_size_ instead of being silently truncated.
  void *const buffer = malloc(sizeof(Message) + max_size_ + 1);
  PCHECK(buffer != nullptr) << ": Failed to allocate receive buffer";
  aos::unique_c_ptr<Message> result(static_cast<Message *>(buffer));
  result->size = 0;
  // malloc() does not initialize memory.  Clear the receive info so it holds
  // defined values even if the kernel delivers no SCTP_RCVINFO.
  memset(&result->header.rcvinfo, 0, sizeof(result->header.rcvinfo));

  int count = 0;
  int last_flags = 0;
  for (count = 0; !(last_flags & MSG_EOR); count++) {
    struct msghdr inmessage;
    memset(&inmessage, 0, sizeof(struct msghdr));

    // Append after whatever earlier chunks already filled in.
    struct iovec iov;
    iov.iov_len = max_size_ + 1 - result->size;
    iov.iov_base = result->mutable_data() + result->size;

    inmessage.msg_iov = &iov;
    inmessage.msg_iovlen = 1;

    // The buffer is walked as struct cmsghdr records, so align it for them.
    alignas(struct cmsghdr) char incmsg[CMSG_SPACE(sizeof(_sctp_cmsg_data_t))];
    inmessage.msg_control = incmsg;
    inmessage.msg_controllen = sizeof(incmsg);

    inmessage.msg_namelen = sizeof(struct sockaddr_storage);
    inmessage.msg_name = &result->sin;

    ssize_t size;
    PCHECK((size = recvmsg(fd_, &inmessage, 0)) > 0);

    if (count > 0) {
      VLOG(1) << "Count: " << count;
      VLOG(1) << "Last msg_flags: " << last_flags;
      VLOG(1) << "msg_flags: " << inmessage.msg_flags;
      VLOG(1) << "Current size: " << result->size;
      VLOG(1) << "Received size: " << size;
      // A message cannot switch between data and notification part way.
      CHECK_EQ(MSG_NOTIFICATION & inmessage.msg_flags,
               MSG_NOTIFICATION & last_flags);
    }

    result->size += size;
    last_flags = inmessage.msg_flags;

    for (struct cmsghdr *scmsg = CMSG_FIRSTHDR(&inmessage); scmsg != nullptr;
         scmsg = CMSG_NXTHDR(&inmessage, scmsg)) {
      // cmsg_type values are only meaningful within their cmsg_level, so
      // require the SCTP level before treating the payload as sctp_rcvinfo.
      if (scmsg->cmsg_level == IPPROTO_SCTP &&
          scmsg->cmsg_type == SCTP_RCVINFO) {
        struct sctp_rcvinfo *data =
            reinterpret_cast<struct sctp_rcvinfo *>(CMSG_DATA(scmsg));
        if (count > 0) {
          VLOG(1) << "Got sctp_rcvinfo on continued packet";
          CHECK_EQ(result->header.rcvinfo.rcv_sid, data->rcv_sid);
          CHECK_EQ(result->header.rcvinfo.rcv_ssn, data->rcv_ssn);
          CHECK_EQ(result->header.rcvinfo.rcv_ppid, data->rcv_ppid);
          CHECK_EQ(result->header.rcvinfo.rcv_assoc_id, data->rcv_assoc_id);
        }
        result->header.rcvinfo = *data;
      } else {
        LOG(INFO) << "\tUnknown type: " << scmsg->cmsg_type;
      }
    }

    CHECK_NE(last_flags & MSG_CTRUNC, MSG_CTRUNC)
        << ": Control message truncated.";

    CHECK_LE(result->size, max_size_)
        << ": Message overflowed buffer on stream "
        << result->header.rcvinfo.rcv_sid << ".";
  }

  result->partial_deliveries = count - 1;
  if (count > 1) {
    VLOG(1) << "Final count: " << count;
    VLOG(1) << "Final size: " << result->size;
  }

  if ((MSG_NOTIFICATION & last_flags)) {
    result->message_type = Message::kNotification;
  } else {
    result->message_type = Message::kMessage;
  }
  return result;
}
355
Austin Schuh507f7582021-07-31 20:39:55 -0700356void SctpReadWrite::CloseSocket() {
357 if (fd_ == -1) {
358 return;
359 }
360 LOG(INFO) << "close(" << fd_ << ")";
361 PCHECK(close(fd_) == 0);
362 fd_ = -1;
363}
364
365void SctpReadWrite::DoSetMaxSize() {
366 // Have the kernel give us a factor of 10 more. This lets us have more than
367 // one full sized packet in flight.
368 size_t max_size = max_size_ * 10;
369
370 CHECK_GE(ReadRMemMax(), max_size)
371 << "rmem_max is too low. To increase rmem_max temporarily, do sysctl "
372 "-w net.core.rmem_max="
373 << max_size;
374 CHECK_GE(ReadWMemMax(), max_size)
375 << "wmem_max is too low. To increase wmem_max temporarily, do sysctl "
376 "-w net.core.wmem_max="
377 << max_size;
378 PCHECK(setsockopt(fd(), SOL_SOCKET, SO_RCVBUF, &max_size, sizeof(max_size)) ==
379 0);
380 PCHECK(setsockopt(fd(), SOL_SOCKET, SO_SNDBUF, &max_size, sizeof(max_size)) ==
381 0);
382}
383
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800384void Message::LogRcvInfo() const {
385 LOG(INFO) << "\tSNDRCV (stream=" << header.rcvinfo.rcv_sid
386 << " ssn=" << header.rcvinfo.rcv_ssn
387 << " tsn=" << header.rcvinfo.rcv_tsn << " flags=0x" << std::hex
388 << header.rcvinfo.rcv_flags << std::dec
389 << " ppid=" << header.rcvinfo.rcv_ppid
390 << " cumtsn=" << header.rcvinfo.rcv_cumtsn << ")";
391}
392
Austin Schuh2fe4b712020-03-15 14:21:45 -0700393size_t ReadRMemMax() {
394 struct stat current_stat;
395 if (stat("/proc/sys/net/core/rmem_max", &current_stat) != -1) {
396 return static_cast<size_t>(
397 std::stoi(util::ReadFileToStringOrDie("/proc/sys/net/core/rmem_max")));
398 } else {
399 LOG(WARNING) << "/proc/sys/net/core/rmem_max doesn't exist. Are you in a "
400 "container?";
401 return 212992;
402 }
403}
404
405size_t ReadWMemMax() {
406 struct stat current_stat;
407 if (stat("/proc/sys/net/core/wmem_max", &current_stat) != -1) {
408 return static_cast<size_t>(
409 std::stoi(util::ReadFileToStringOrDie("/proc/sys/net/core/wmem_max")));
410 } else {
411 LOG(WARNING) << "/proc/sys/net/core/wmem_max doesn't exist. Are you in a "
412 "container?";
413 return 212992;
414 }
415}
416
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800417} // namespace message_bridge
418} // namespace aos