Refactor internal state vector in MessageBridgeServerStatus
Rather than a bunch of vectors, use one vector of structs.
Change-Id: Ic01486c59665b78fbe488faef9881553446609be
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index acf439e..b363142 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -276,8 +276,9 @@
fbb.ForceDefaults(true);
// Reset the filter every time the UUID changes. There's probably a more
// clever way to do this, but that means a better concept of rebooting.
- if (server_status_->BootUUID(destination_node_index_) !=
- send_node_factory_->boot_uuid()) {
+ if (!server_status_->BootUUID(destination_node_index_).has_value() ||
+ (server_status_->BootUUID(destination_node_index_).value() !=
+ send_node_factory_->boot_uuid())) {
server_status_->ResetFilter(destination_node_index_);
server_status_->SetBootUUID(destination_node_index_,
send_node_factory_->boot_uuid());
@@ -442,9 +443,9 @@
node_state->event_loop->node());
size_t node_index = 0;
- for (ServerConnection *connection :
- node_state->server_status->server_connection()) {
- if (connection != nullptr) {
+ for (const std::optional<MessageBridgeServerStatus::NodeState>
+ &connection : node_state->server_status->nodes()) {
+ if (connection.has_value()) {
node_state->server_status->ResetFilter(node_index);
}
++node_index;
@@ -636,8 +637,9 @@
{
size_t node_index = 0;
- for (ServerConnection *connection : server_status->server_connection()) {
- if (connection) {
+ for (const std::optional<MessageBridgeServerStatus::NodeState> &connection :
+ server_status->nodes()) {
+ if (connection.has_value()) {
if (boot_uuids_[node_index] != UUID::Zero()) {
switch (server_state_[node_index]) {
case message_bridge::State::DISCONNECTED:
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index ef20fa6..0947794 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -133,7 +133,9 @@
peer.timestamp_logger->MakeBuilder();
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> boot_uuid_offset =
- server_status->BootUUID(peer.node_index).PackVector(builder.fbb());
+ server_status->BootUUID(peer.node_index)
+ .value()
+ .PackVector(builder.fbb());
RemoteMessage::Builder remote_message_builder =
builder.MakeBuilder<RemoteMessage>();
@@ -540,7 +542,8 @@
const int node_index =
configuration::GetNodeIndex(event_loop_->configuration(), client_node);
- ServerConnection *connection = server_status_.server_connection()[node_index];
+ ServerConnection *connection =
+ server_status_.nodes()[node_index].value().server_connection;
if (connection != nullptr) {
connection->mutate_invalid_connection_count(
diff --git a/aos/network/message_bridge_server_status.cc b/aos/network/message_bridge_server_status.cc
index 96917f1..d823f30 100644
--- a/aos/network/message_bridge_server_status.cc
+++ b/aos/network/message_bridge_server_status.cc
@@ -90,13 +90,7 @@
statistics_.message().connections()->size());
client_offsets_.reserve(statistics_.message().connections()->size());
- filters_.resize(event_loop->configuration()->nodes()->size());
- partial_deliveries_.resize(event_loop->configuration()->nodes()->size());
- boot_uuids_.resize(event_loop->configuration()->nodes()->size(),
- UUID::Zero());
- has_boot_uuids_.resize(event_loop->configuration()->nodes()->size(), false);
- timestamp_fetchers_.resize(event_loop->configuration()->nodes()->size());
- server_connection_.resize(event_loop->configuration()->nodes()->size());
+ nodes_.resize(event_loop->configuration()->nodes()->size());
// Seed up all the per-node connection state.
// We are making the assumption here that every connection is bidirectional
@@ -117,13 +111,13 @@
Timestamp::GetFullyQualifiedName(),
event_loop_->name(), destination_node);
- timestamp_fetchers_[node_index] = event_loop_->MakeFetcher<Timestamp>(
- other_timestamp_channel->name()->string_view());
-
- // And then find the server connection that we should be populating
- // statistics into.
- server_connection_[node_index] =
- FindServerConnection(destination_node->name()->string_view());
+ nodes_[node_index] = NodeState{
+ .server_connection =
+ FindServerConnection(destination_node->name()->string_view()),
+ .timestamp_fetcher = event_loop_->MakeFetcher<Timestamp>(
+ other_timestamp_channel->name()->string_view()),
+ .filter = {},
+ .boot_uuid = std::nullopt};
}
statistics_timer_ = event_loop_->AddTimer([this]() { Tick(); });
@@ -150,40 +144,40 @@
void MessageBridgeServerStatus::SetBootUUID(int node_index,
const UUID &boot_uuid) {
- has_boot_uuids_[node_index] = true;
- boot_uuids_[node_index] = boot_uuid;
+ nodes_[node_index].value().boot_uuid = boot_uuid;
SendStatistics();
last_statistics_send_time_ = event_loop_->monotonic_now();
}
void MessageBridgeServerStatus::ClearBootUUID(int node_index) {
- has_boot_uuids_[node_index] = false;
+ nodes_[node_index].value().boot_uuid.reset();
SendStatistics();
last_statistics_send_time_ = event_loop_->monotonic_now();
}
void MessageBridgeServerStatus::ResetFilter(int node_index) {
- filters_[node_index].Reset();
- server_connection_[node_index]->mutate_monotonic_offset(0);
+ nodes_[node_index].value().filter.Reset();
+ nodes_[node_index].value().server_connection->mutate_monotonic_offset(0);
}
void MessageBridgeServerStatus::Connect(
int node_index, monotonic_clock::time_point monotonic_now) {
- server_connection_[node_index]->mutate_state(State::CONNECTED);
+ ServerConnection *message = nodes_[node_index].value().server_connection;
+ message->mutate_state(State::CONNECTED);
// Only count connections if the timestamp changes. This deduplicates
// multiple channel connections at the same point in time.
- if (server_connection_[node_index]->connected_since_time() !=
+ if (message->connected_since_time() !=
monotonic_now.time_since_epoch().count()) {
- server_connection_[node_index]->mutate_connection_count(
- server_connection_[node_index]->connection_count() + 1);
- server_connection_[node_index]->mutate_connected_since_time(
+ message->mutate_connection_count(message->connection_count() + 1);
+ message->mutate_connected_since_time(
monotonic_now.time_since_epoch().count());
}
}
void MessageBridgeServerStatus::Disconnect(int node_index) {
- server_connection_[node_index]->mutate_state(State::DISCONNECTED);
- server_connection_[node_index]->mutate_connected_since_time(
+ ServerConnection *message = nodes_[node_index].value().server_connection;
+ message->mutate_state(State::DISCONNECTED);
+ message->mutate_connected_since_time(
aos::monotonic_clock::min_time.time_since_epoch().count());
}
@@ -208,8 +202,10 @@
flatbuffers::Offset<flatbuffers::String> boot_uuid_offset;
if (connection->state() == State::CONNECTED &&
- has_boot_uuids_[node_index]) {
- boot_uuid_offset = boot_uuids_[node_index].PackString(builder.fbb());
+ nodes_[node_index].value().boot_uuid.has_value()) {
+ boot_uuid_offset =
+ nodes_[node_index].value().boot_uuid.value().PackString(
+ builder.fbb());
}
ServerConnection::Builder server_connection_builder =
@@ -220,7 +216,7 @@
connection->dropped_packets());
server_connection_builder.add_sent_packets(connection->sent_packets());
server_connection_builder.add_partial_deliveries(
- partial_deliveries_[node_index]);
+ PartialDeliveries(node_index));
if (connection->connected_since_time() !=
monotonic_clock::min_time.time_since_epoch().count()) {
@@ -239,7 +235,7 @@
}
// TODO(austin): If it gets stale, drop it too.
- if (!filters_[node_index].MissingSamples()) {
+ if (!nodes_[node_index].value().filter.MissingSamples()) {
server_connection_builder.add_monotonic_offset(
connection->monotonic_offset());
}
@@ -313,22 +309,22 @@
continue;
}
- timestamp_fetchers_[node_index].Fetch();
+ aos::Fetcher<Timestamp> ×tamp_fetcher =
+ nodes_[node_index].value().timestamp_fetcher;
+ timestamp_fetcher.Fetch();
// Find the offset computed on their node for this client connection
// using their timestamp message.
bool has_their_offset = false;
chrono::nanoseconds their_offset = chrono::nanoseconds(0);
- if (timestamp_fetchers_[node_index].get() != nullptr) {
+ if (timestamp_fetcher.get() != nullptr) {
for (const ClientOffset *client_offset :
- *timestamp_fetchers_[node_index]->offsets()) {
+ *timestamp_fetcher->offsets()) {
if (client_offset->node()->name()->string_view() ==
event_loop_->node()->name()->string_view()) {
// Make sure it has an offset and the message isn't stale.
if (client_offset->has_monotonic_offset()) {
- if (timestamp_fetchers_[node_index]
- .context()
- .monotonic_event_time +
+ if (timestamp_fetcher.context().monotonic_event_time +
MessageBridgeServerStatus::kTimestampStaleTimeout >
event_loop_->context().monotonic_event_time) {
their_offset =
@@ -344,26 +340,24 @@
}
}
- if (has_their_offset &&
- server_connection_[node_index]->state() == State::CONNECTED) {
+ ServerConnection *message = nodes_[node_index].value().server_connection;
+ ClippedAverageFilter &filter = nodes_[node_index].value().filter;
+ if (has_their_offset && message->state() == State::CONNECTED) {
// Update the filters.
- if (filters_[node_index].MissingSamples()) {
+ if (filter.MissingSamples()) {
// Update the offset the first time. This should be representative.
- filters_[node_index].set_base_offset(
+ filter.set_base_offset(
chrono::nanoseconds(connection->monotonic_offset()));
}
// The message_bridge_clients are the ones running the first filter. So
// set the values from that and let the averaging filter run from there.
- filters_[node_index].FwdSet(
- timestamp_fetchers_[node_index].context().monotonic_remote_time,
- chrono::nanoseconds(connection->monotonic_offset()));
- filters_[node_index].RevSet(
- client_statistics_fetcher_.context().monotonic_event_time,
- their_offset);
+ filter.FwdSet(timestamp_fetcher.context().monotonic_remote_time,
+ chrono::nanoseconds(connection->monotonic_offset()));
+ filter.RevSet(client_statistics_fetcher_.context().monotonic_event_time,
+ their_offset);
// Publish!
- server_connection_[node_index]->mutate_monotonic_offset(
- -filters_[node_index].offset().count());
+ message->mutate_monotonic_offset(-filter.offset().count());
}
// Now fill out the Timestamp message with the offset from the client.
diff --git a/aos/network/message_bridge_server_status.h b/aos/network/message_bridge_server_status.h
index 624b376..c71bd84 100644
--- a/aos/network/message_bridge_server_status.h
+++ b/aos/network/message_bridge_server_status.h
@@ -24,6 +24,21 @@
// Time after which we consider the timestamp stale, and reset the filter.
static constexpr std::chrono::milliseconds kTimestampStaleTimeout{1000};
+ // Struct containing all of the relevant state for a given node.
+ struct NodeState {
+ // Mutable status for this node, to be sent out in the ServerStatistics
+ // message.
+ ServerConnection *server_connection;
+ // Fetcher to retrieve timestamps for the connection to the other node, for
+ // feeding the timestamp filter.
+ aos::Fetcher<Timestamp> timestamp_fetcher;
+ // Filter for calculating current time offsets to the other node.
+ ClippedAverageFilter filter;
+ // Current boot UUID of the other node, if available.
+ std::optional<UUID> boot_uuid;
+ uint32_t partial_deliveries = 0;
+ };
+
MessageBridgeServerStatus(aos::EventLoop *event_loop,
std::function<void(const Context &)> send_data =
std::function<void(const Context &)>());
@@ -48,29 +63,28 @@
void Connect(int node_index, monotonic_clock::time_point monotonic_now);
void Disconnect(int node_index);
- // Returns the boot UUID for a node, or an empty string_view if there isn't
- // one.
- const UUID &BootUUID(int node_index) const { return boot_uuids_[node_index]; }
+ // Returns the boot UUID for a node, or nullopt if there isn't one.
+ const std::optional<UUID> &BootUUID(int node_index) const {
+ return nodes_[node_index].value().boot_uuid;
+ }
void AddPartialDeliveries(int node_index, uint32_t partial_deliveries) {
- partial_deliveries_[node_index] += partial_deliveries;
+ nodes_[node_index].value().partial_deliveries += partial_deliveries;
}
void ResetPartialDeliveries(int node_index) {
- partial_deliveries_[node_index] = 0;
+ nodes_[node_index].value().partial_deliveries = 0;
}
uint32_t PartialDeliveries(int node_index) const {
- return partial_deliveries_[node_index];
+ return nodes_[node_index].value().partial_deliveries;
}
// Returns the ServerConnection message which is updated by the server.
ServerConnection *FindServerConnection(std::string_view node_name);
ServerConnection *FindServerConnection(const Node *node);
- const std::vector<ServerConnection *> &server_connection() {
- return server_connection_;
- }
+ const std::vector<std::optional<NodeState>> &nodes() { return nodes_; }
// Disables sending out any statistics messages.
void DisableStatistics(bool destroy_senders);
@@ -106,16 +120,7 @@
aos::Fetcher<ClientStatistics> client_statistics_fetcher_;
// ServerConnection to fill out the offsets for from each node.
- std::vector<ServerConnection *> server_connection_;
- // All of these are indexed by the other node index.
- // Fetcher to grab timestamps and therefore offsets from the other nodes.
- std::vector<aos::Fetcher<Timestamp>> timestamp_fetchers_;
- // Bidirectional filters for each connection.
- std::vector<ClippedAverageFilter> filters_;
-
- // List of UUIDs for each node.
- std::vector<UUID> boot_uuids_;
- std::vector<bool> has_boot_uuids_;
+ std::vector<std::optional<NodeState>> nodes_;
// Sender for the timestamps that we are forwarding over the network.
aos::Sender<Timestamp> timestamp_sender_;
@@ -129,8 +134,6 @@
bool send_ = true;
- std::vector<uint32_t> partial_deliveries_;
-
size_t invalid_connection_count_ = 0u;
std::vector<flatbuffers::Offset<ClientOffset>> client_offsets_;