aos/network: client to abort connection with no data

One problem we have seen is that the server appears to
successfully receive and process a connect message,
but never sends data. Because the server never sends
data, the client keeps resending the connect message
every kReconnectTimeout (3 seconds), but the SCTP
state never changes.

If we can reasonably expect that there should be data
within 3 seconds and that failing to receive data also
means the connection has failed in some way, force SCTP
to start over by sending an abort. The abort results in an
SCTP_ASSOC_CHANGE(COMMUNICATION_LOST) notification
regardless of the state of the other side of the
connection; handling that notification calls
NodeDisconnected and marks the connection as down.

This is not guaranteed to fix the issues we were seeing,
but hopefully avoids the SCTP connection getting stuck
in a buggy state.

As a side effect, it may also force a faster reconnect
when an Ethernet cable is disconnected and reconnected.

Change-Id: I6fdd0eed9bbe6a694152363ade48706b057cba6d
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index 0448c7d..b7ee3cc 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -196,19 +196,26 @@
 }
 
 void SctpClientConnection::SendConnect() {
+  ScheduleConnectTimeout();
+
+  // If we're already connected, assume something went wrong and abort
+  // the connection.
+  if (client_status_->GetClientConnection(client_index_)->state() ==
+      aos::message_bridge::State::CONNECTED) {
+    client_.Abort();
+    return;
+  }
   // Try to send the connect message.  If that fails, retry.
   if (client_.Send(kConnectStream(),
                    std::string_view(reinterpret_cast<const char *>(
                                         connect_message_.span().data()),
                                     connect_message_.span().size()),
                    0)) {
-    VLOG(1) << "Connect to " << remote_node_->hostname()->string_view()
+    VLOG(1) << "Sending connect to " << remote_node_->hostname()->string_view()
             << " succeeded.";
-    ScheduleConnectTimeout();
   } else {
     VLOG(1) << "Connect to " << remote_node_->hostname()->string_view()
             << " failed.";
-    NodeDisconnected();
   }
 }
 
@@ -218,11 +225,14 @@
   // We want to tell the kernel to schedule the packets on this new stream with
   // the priority scheduler.  This only needs to be done once per stream.
   client_.SetPriorityScheduler(assoc_id);
+  client_.SetAssociationId(assoc_id);
 
   client_status_->Connect(client_index_);
 }
 
 void SctpClientConnection::NodeDisconnected() {
+  client_.SetAssociationId(0);
+
   connect_timer_->Setup(
       event_loop_->monotonic_now() + chrono::milliseconds(100),
       chrono::milliseconds(100));
diff --git a/aos/network/sctp_client.h b/aos/network/sctp_client.h
index 9356612..1ba32ff 100644
--- a/aos/network/sctp_client.h
+++ b/aos/network/sctp_client.h
@@ -26,9 +26,13 @@
   // Sends a block of data on a stream with a TTL.
   // TODO(austin): time_to_live should be a chrono::duration
   bool Send(int stream, std::string_view data, int time_to_live) {
-    return sctp_.SendMessage(stream, data, time_to_live, sockaddr_remote_, 0);
+    return sctp_.SendMessage(stream, data, time_to_live, sockaddr_remote_,
+                             sac_assoc_id_);
   }
 
+  // Aborts a connection.  Returns true on success.
+  bool Abort() { return sctp_.Abort(sac_assoc_id_); }
+
   int fd() { return sctp_.fd(); }
 
   // Enables the priority scheduler.  This is a SCTP feature which lets us
@@ -45,10 +49,17 @@
 
   void SetMaxSize(size_t max_size) { sctp_.SetMaxSize(max_size); }
 
+  void SetAssociationId(sctp_assoc_t sac_assoc_id) {
+    sac_assoc_id_ = sac_assoc_id;
+  }
+
  private:
   struct sockaddr_storage sockaddr_remote_;
   struct sockaddr_storage sockaddr_local_;
   SctpReadWrite sctp_;
+
+  // Valid if != 0.
+  sctp_assoc_t sac_assoc_id_ = 0;
 };
 
 }  // namespace message_bridge
diff --git a/aos/network/sctp_lib.cc b/aos/network/sctp_lib.cc
index 21c0b78..53a28cb 100644
--- a/aos/network/sctp_lib.cc
+++ b/aos/network/sctp_lib.cc
@@ -481,6 +481,48 @@
   }
 }
 
+bool SctpReadWrite::Abort(sctp_assoc_t snd_assoc_id) {
+  if (fd_ == -1) {
+    return true;
+  }
+  VLOG(1) << "Sending abort to assoc " << snd_assoc_id;
+
+  // Select the destination by assoc id (set in the cmsg below), not msg_name.
+  struct msghdr outmsg;
+  outmsg.msg_namelen = 0;
+
+  outmsg.msg_iovlen = 0;
+
+  // Build up the SCTP_SNDRCV control message carrying the sctp_sndrcvinfo.
+  char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
+  outmsg.msg_control = outcmsg;
+  outmsg.msg_controllen = CMSG_SPACE(sizeof(struct sctp_sndrcvinfo));
+  outmsg.msg_flags = 0;
+
+  struct cmsghdr *cmsg = CMSG_FIRSTHDR(&outmsg);
+  cmsg->cmsg_level = IPPROTO_SCTP;
+  cmsg->cmsg_type = SCTP_SNDRCV;
+  cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
+
+  struct sctp_sndrcvinfo *sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
+  memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
+  sinfo->sinfo_stream = 0;
+  sinfo->sinfo_flags = SCTP_ABORT;
+  sinfo->sinfo_assoc_id = snd_assoc_id;
+
+  // And send: non-blocking (MSG_DONTWAIT), no SIGPIPE (MSG_NOSIGNAL).
+  const ssize_t size = sendmsg(fd_, &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
+  if (size == -1) {
+    if (errno == EPIPE || errno == EAGAIN || errno == ESHUTDOWN) {
+      return false;
+    }
+    return false;  // NOTE(review): same result as the errno branch above.
+  } else {
+    CHECK_EQ(0, size);
+    return true;
+  }
+}
+
 void SctpReadWrite::CloseSocket() {
   if (fd_ == -1) {
     return;
diff --git a/aos/network/sctp_lib.h b/aos/network/sctp_lib.h
index e81e6b3..6cd11a3 100644
--- a/aos/network/sctp_lib.h
+++ b/aos/network/sctp_lib.h
@@ -100,6 +100,9 @@
   // Returns nullptr if the kernel blocks before returning a complete message.
   aos::unique_c_ptr<Message> ReadMessage();
 
+  // Send an abort message for the given association.
+  bool Abort(sctp_assoc_t snd_assoc_id);
+
   int fd() const { return fd_; }
 
   void SetMaxSize(size_t max_size) {
diff --git a/aos/network/sctp_server.cc b/aos/network/sctp_server.cc
index 93d1e88..15da7c0 100644
--- a/aos/network/sctp_server.cc
+++ b/aos/network/sctp_server.cc
@@ -69,43 +69,6 @@
   }
 }
 
-bool SctpServer::Abort(sctp_assoc_t snd_assoc_id) {
-  // Use the assoc_id for the destination instead of the msg_name.
-  struct msghdr outmsg;
-  outmsg.msg_namelen = 0;
-
-  outmsg.msg_iovlen = 0;
-
-  // Build up the sndinfo message.
-  char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
-  outmsg.msg_control = outcmsg;
-  outmsg.msg_controllen = CMSG_SPACE(sizeof(struct sctp_sndrcvinfo));
-  outmsg.msg_flags = 0;
-
-  struct cmsghdr *cmsg = CMSG_FIRSTHDR(&outmsg);
-  cmsg->cmsg_level = IPPROTO_SCTP;
-  cmsg->cmsg_type = SCTP_SNDRCV;
-  cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
-
-  struct sctp_sndrcvinfo *sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
-  memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
-  sinfo->sinfo_stream = 0;
-  sinfo->sinfo_flags = SCTP_ABORT;
-  sinfo->sinfo_assoc_id = snd_assoc_id;
-
-  // And send.
-  const ssize_t size = sendmsg(fd(), &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
-  if (size == -1) {
-    if (errno == EPIPE || errno == EAGAIN || errno == ESHUTDOWN) {
-      return false;
-    }
-    return false;
-  } else {
-    CHECK_EQ(0, size);
-    return true;
-  }
-}
-
 void SctpServer::SetPriorityScheduler(sctp_assoc_t assoc_id) {
   struct sctp_assoc_value scheduler;
   memset(&scheduler, 0, sizeof(scheduler));
diff --git a/aos/network/sctp_server.h b/aos/network/sctp_server.h
index 6ffe93e..dbfd1ac 100644
--- a/aos/network/sctp_server.h
+++ b/aos/network/sctp_server.h
@@ -39,7 +39,7 @@
   }
 
   // Aborts a connection.  Returns true on success.
-  bool Abort(sctp_assoc_t snd_assoc_id);
+  bool Abort(sctp_assoc_t snd_assoc_id) { return sctp_.Abort(snd_assoc_id); }
 
   int fd() { return sctp_.fd(); }