aos/sctp: Add server partial delivery stats

Change-Id: I63439180ddf2523a73c2bc579fe4ce2a6574dc67
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index d8ab66e..26aa8a3 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -2200,9 +2200,9 @@
 }
 
 constexpr std::string_view kCombinedConfigSha1(
-    "8d17eb7c2347fd4a8a9a2e0f171a91338fe4d5dd705829c39497608075a8d6fc");
+    "60f7907de2ce98f5df6cc433487165086595021fabf52176fe45f532305e447b");
 constexpr std::string_view kSplitConfigSha1(
-    "a6235491429b7b062e5da35c1d0d279c7e7e33cd70787f231d420ab831959744");
+    "49e1da5ed560f76252706d30f3180207b81ec2100ec9bd8adc10df45460022ce");
 
 INSTANTIATE_TEST_CASE_P(
     All, MultinodeLoggerTest,
diff --git a/aos/network/message_bridge_server.fbs b/aos/network/message_bridge_server.fbs
index fa73ba5..cdf970c 100644
--- a/aos/network/message_bridge_server.fbs
+++ b/aos/network/message_bridge_server.fbs
@@ -19,7 +19,7 @@
   // Number of packets that have been dropped (if known).
   dropped_packets:uint (id: 2);
 
-  // Number of packets received on all channels.
+  // Number of packets sent on all channels.
   sent_packets:uint (id: 3);
 
   // This is the measured monotonic offset for the connected node in
@@ -30,6 +30,10 @@
   // Boot UUID of the client.
   boot_uuid:string (id: 5);
 
+  // Number of extra calls needed to receive a single message
+  // (indicates congestion)
+  partial_deliveries:uint (id: 6);
+
   // TODO(austin): Per channel counts?
 }
 
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index 77fe1e4..ea75693 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -112,9 +112,10 @@
   // and flushes.  Whee.
 }
 
-void ChannelState::HandleDelivery(
-    sctp_assoc_t rcv_assoc_id, uint16_t /*ssn*/, absl::Span<const uint8_t> data,
-    const MessageBridgeServerStatus &server_status) {
+void ChannelState::HandleDelivery(sctp_assoc_t rcv_assoc_id, uint16_t /*ssn*/,
+                                  absl::Span<const uint8_t> data,
+                                  uint32_t partial_deliveries,
+                                  MessageBridgeServerStatus *server_status) {
   const logger::MessageHeader *message_header =
       flatbuffers::GetRoot<logger::MessageHeader>(data.data());
   for (Peer &peer : peers_) {
@@ -130,7 +131,7 @@
             peer.timestamp_logger->MakeBuilder();
 
         flatbuffers::Offset<flatbuffers::Vector<uint8_t>> boot_uuid_offset =
-            server_status.BootUUID(peer.node_index).PackVector(builder.fbb());
+            server_status->BootUUID(peer.node_index).PackVector(builder.fbb());
 
         RemoteMessage::Builder remote_message_builder =
             builder.MakeBuilder<RemoteMessage>();
@@ -155,6 +156,9 @@
             message_header->queue_index());
         remote_message_builder.add_boot_uuid(boot_uuid_offset);
 
+        server_status->AddPartialDeliveries(peer.node_index,
+                                            partial_deliveries);
+
         builder.Send(remote_message_builder.Finish());
       }
       break;
@@ -422,6 +426,7 @@
                    ->string_view();
     server_status_.ResetFilter(node_index);
     server_status_.ClearBootUUID(node_index);
+    server_status_.ResetPartialDeliveries(node_index);
   }
 }
 
@@ -495,7 +500,10 @@
         ++channel_index;
       }
     }
+    // TODO(sarah.newman): what if node_index is -1?
     server_status_.ResetFilter(node_index);
+    server_status_.AddPartialDeliveries(node_index,
+                                        message->partial_deliveries);
     server_status_.SetBootUUID(
         node_index, UUID::FromString(connect->boot_uuid()->string_view()));
     VLOG(1) << "Resetting filters for " << node_index << " "
@@ -515,7 +523,7 @@
             message->header.rcvinfo.rcv_assoc_id,
             message->header.rcvinfo.rcv_ssn,
             absl::Span<const uint8_t>(message->data(), message->size),
-            server_status_);
+            message->partial_deliveries, &server_status_);
   }
 
   if (VLOG_IS_ON(1)) {
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index be1801c..7ff4e3a 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -85,7 +85,8 @@
   // Handles reception of delivery times.
   void HandleDelivery(sctp_assoc_t rcv_assoc_id, uint16_t ssn,
                       absl::Span<const uint8_t> data,
-                      const MessageBridgeServerStatus &server_status);
+                      uint32_t partial_deliveries,
+                      MessageBridgeServerStatus *server_status);
 
   // Handles (by consuming) failure to deliver a message.
   void HandleFailure(
diff --git a/aos/network/message_bridge_server_status.cc b/aos/network/message_bridge_server_status.cc
index 06f88ed..48b442a 100644
--- a/aos/network/message_bridge_server_status.cc
+++ b/aos/network/message_bridge_server_status.cc
@@ -36,6 +36,7 @@
     connection_builder.add_dropped_packets(0);
     connection_builder.add_sent_packets(0);
     connection_builder.add_monotonic_offset(0);
+    connection_builder.add_partial_deliveries(0);
     connection_offsets.emplace_back(connection_builder.Finish());
   }
   flatbuffers::Offset<
@@ -85,6 +86,7 @@
       statistics_.message().connections()->size());
 
   filters_.resize(event_loop->configuration()->nodes()->size());
+  partial_deliveries_.resize(event_loop->configuration()->nodes()->size());
   boot_uuids_.resize(event_loop->configuration()->nodes()->size(), UUID::Zero());
   has_boot_uuids_.resize(event_loop->configuration()->nodes()->size(), false);
   timestamp_fetchers_.resize(event_loop->configuration()->nodes()->size());
@@ -191,6 +193,8 @@
     server_connection_builder.add_dropped_packets(
         connection->dropped_packets());
     server_connection_builder.add_sent_packets(connection->sent_packets());
+    server_connection_builder.add_partial_deliveries(
+        partial_deliveries_[node_index]);
 
     // TODO(austin): If it gets stale, drop it too.
     if (!filters_[node_index].MissingSamples()) {
diff --git a/aos/network/message_bridge_server_status.h b/aos/network/message_bridge_server_status.h
index 52fc174..04b30de 100644
--- a/aos/network/message_bridge_server_status.h
+++ b/aos/network/message_bridge_server_status.h
@@ -49,6 +49,18 @@
   // one.
   const UUID &BootUUID(int node_index) const { return boot_uuids_[node_index]; }
 
+  void AddPartialDeliveries(int node_index, uint32_t partial_deliveries) {
+    partial_deliveries_[node_index] += partial_deliveries;
+  }
+
+  void ResetPartialDeliveries(int node_index) {
+    partial_deliveries_[node_index] = 0;
+  }
+
+  uint32_t PartialDeliveries(int node_index) const {
+    return partial_deliveries_[node_index];
+  }
+
   // Returns the ServerConnection message which is updated by the server.
   ServerConnection *FindServerConnection(std::string_view node_name);
   ServerConnection *FindServerConnection(const Node *node);
@@ -105,6 +117,8 @@
   std::function<void(const Context &)> send_data_;
 
   bool send_ = true;
+
+  std::vector<uint32_t> partial_deliveries_;
 };