aos/sctp: Add server partial delivery stats
Change-Id: I63439180ddf2523a73c2bc579fe4ce2a6574dc67
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index d8ab66e..26aa8a3 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -2200,9 +2200,9 @@
}
constexpr std::string_view kCombinedConfigSha1(
- "8d17eb7c2347fd4a8a9a2e0f171a91338fe4d5dd705829c39497608075a8d6fc");
+ "60f7907de2ce98f5df6cc433487165086595021fabf52176fe45f532305e447b");
constexpr std::string_view kSplitConfigSha1(
- "a6235491429b7b062e5da35c1d0d279c7e7e33cd70787f231d420ab831959744");
+ "49e1da5ed560f76252706d30f3180207b81ec2100ec9bd8adc10df45460022ce");
INSTANTIATE_TEST_CASE_P(
All, MultinodeLoggerTest,
diff --git a/aos/network/message_bridge_server.fbs b/aos/network/message_bridge_server.fbs
index fa73ba5..cdf970c 100644
--- a/aos/network/message_bridge_server.fbs
+++ b/aos/network/message_bridge_server.fbs
@@ -19,7 +19,7 @@
// Number of packets that have been dropped (if known).
dropped_packets:uint (id: 2);
- // Number of packets received on all channels.
+ // Number of packets sent on all channels.
sent_packets:uint (id: 3);
// This is the measured monotonic offset for the connected node in
@@ -30,6 +30,10 @@
// Boot UUID of the client.
boot_uuid:string (id: 5);
+ // Number of extra calls needed to receive a single message
+ // (indicates congestion)
+ partial_deliveries:uint (id: 6);
+
// TODO(austin): Per channel counts?
}
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index 77fe1e4..ea75693 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -112,9 +112,10 @@
// and flushes. Whee.
}
-void ChannelState::HandleDelivery(
- sctp_assoc_t rcv_assoc_id, uint16_t /*ssn*/, absl::Span<const uint8_t> data,
- const MessageBridgeServerStatus &server_status) {
+void ChannelState::HandleDelivery(sctp_assoc_t rcv_assoc_id, uint16_t /*ssn*/,
+ absl::Span<const uint8_t> data,
+ uint32_t partial_deliveries,
+ MessageBridgeServerStatus *server_status) {
const logger::MessageHeader *message_header =
flatbuffers::GetRoot<logger::MessageHeader>(data.data());
for (Peer &peer : peers_) {
@@ -130,7 +131,7 @@
peer.timestamp_logger->MakeBuilder();
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> boot_uuid_offset =
- server_status.BootUUID(peer.node_index).PackVector(builder.fbb());
+ server_status->BootUUID(peer.node_index).PackVector(builder.fbb());
RemoteMessage::Builder remote_message_builder =
builder.MakeBuilder<RemoteMessage>();
@@ -155,6 +156,9 @@
message_header->queue_index());
remote_message_builder.add_boot_uuid(boot_uuid_offset);
+ server_status->AddPartialDeliveries(peer.node_index,
+ partial_deliveries);
+
builder.Send(remote_message_builder.Finish());
}
break;
@@ -422,6 +426,7 @@
->string_view();
server_status_.ResetFilter(node_index);
server_status_.ClearBootUUID(node_index);
+ server_status_.ResetPartialDeliveries(node_index);
}
}
@@ -495,7 +500,10 @@
++channel_index;
}
}
+ // TODO(sarah.newman): what if node_index is -1?
server_status_.ResetFilter(node_index);
+ server_status_.AddPartialDeliveries(node_index,
+ message->partial_deliveries);
server_status_.SetBootUUID(
node_index, UUID::FromString(connect->boot_uuid()->string_view()));
VLOG(1) << "Resetting filters for " << node_index << " "
@@ -515,7 +523,7 @@
message->header.rcvinfo.rcv_assoc_id,
message->header.rcvinfo.rcv_ssn,
absl::Span<const uint8_t>(message->data(), message->size),
- server_status_);
+ message->partial_deliveries, &server_status_);
}
if (VLOG_IS_ON(1)) {
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index be1801c..7ff4e3a 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -85,7 +85,8 @@
// Handles reception of delivery times.
void HandleDelivery(sctp_assoc_t rcv_assoc_id, uint16_t ssn,
absl::Span<const uint8_t> data,
- const MessageBridgeServerStatus &server_status);
+ uint32_t partial_deliveries,
+ MessageBridgeServerStatus *server_status);
// Handles (by consuming) failure to deliver a message.
void HandleFailure(
diff --git a/aos/network/message_bridge_server_status.cc b/aos/network/message_bridge_server_status.cc
index 06f88ed..48b442a 100644
--- a/aos/network/message_bridge_server_status.cc
+++ b/aos/network/message_bridge_server_status.cc
@@ -36,6 +36,7 @@
connection_builder.add_dropped_packets(0);
connection_builder.add_sent_packets(0);
connection_builder.add_monotonic_offset(0);
+ connection_builder.add_partial_deliveries(0);
connection_offsets.emplace_back(connection_builder.Finish());
}
flatbuffers::Offset<
@@ -85,6 +86,7 @@
statistics_.message().connections()->size());
filters_.resize(event_loop->configuration()->nodes()->size());
+ partial_deliveries_.resize(event_loop->configuration()->nodes()->size());
boot_uuids_.resize(event_loop->configuration()->nodes()->size(), UUID::Zero());
has_boot_uuids_.resize(event_loop->configuration()->nodes()->size(), false);
timestamp_fetchers_.resize(event_loop->configuration()->nodes()->size());
@@ -191,6 +193,8 @@
server_connection_builder.add_dropped_packets(
connection->dropped_packets());
server_connection_builder.add_sent_packets(connection->sent_packets());
+ server_connection_builder.add_partial_deliveries(
+ partial_deliveries_[node_index]);
// TODO(austin): If it gets stale, drop it too.
if (!filters_[node_index].MissingSamples()) {
diff --git a/aos/network/message_bridge_server_status.h b/aos/network/message_bridge_server_status.h
index 52fc174..04b30de 100644
--- a/aos/network/message_bridge_server_status.h
+++ b/aos/network/message_bridge_server_status.h
@@ -49,6 +49,18 @@
// one.
const UUID &BootUUID(int node_index) const { return boot_uuids_[node_index]; }
+ void AddPartialDeliveries(int node_index, uint32_t partial_deliveries) {
+ partial_deliveries_[node_index] += partial_deliveries;
+ }
+
+ void ResetPartialDeliveries(int node_index) {
+ partial_deliveries_[node_index] = 0;
+ }
+
+ uint32_t PartialDeliveries(int node_index) const {
+ return partial_deliveries_[node_index];
+ }
+
// Returns the ServerConnection message which is updated by the server.
ServerConnection *FindServerConnection(std::string_view node_name);
ServerConnection *FindServerConnection(const Node *node);
@@ -105,6 +117,8 @@
std::function<void(const Context &)> send_data_;
bool send_ = true;
+
+ std::vector<uint32_t> partial_deliveries_;
};