Extract server side timestamp filtering logic
This sets us up to reuse it as part of the simulated event loop message
gateway. It is surprisingly isolated.
Change-Id: Id327044f925b4e8209e8029777ef2a1a8c8c7c15
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 5b5e84b..c7e5a28 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -130,6 +130,26 @@
)
cc_library(
+ name = "message_bridge_server_status",
+ srcs = [
+ "message_bridge_server_status.cc",
+ ],
+ hdrs = [
+ "message_bridge_server_status.h",
+ ],
+ deps = [
+ ":message_bridge_client_fbs",
+ ":message_bridge_server_fbs",
+ ":timestamp_fbs",
+ ":timestamp_filter",
+ "//aos:flatbuffer_merge",
+ "//aos:flatbuffers",
+ "//aos/events:event_loop",
+ "//aos/time",
+ ],
+)
+
+cc_library(
name = "message_bridge_server_lib",
srcs = [
"message_bridge_server_lib.cc",
@@ -145,6 +165,7 @@
":message_bridge_client_fbs",
":message_bridge_protocol",
":message_bridge_server_fbs",
+ ":message_bridge_server_status",
":sctp_lib",
":sctp_server",
":timestamp_fbs",
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index 8a575aa..2832339 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -12,61 +12,8 @@
namespace aos {
namespace message_bridge {
-namespace {
-
namespace chrono = std::chrono;
-// Builds up the "empty" server statistics message to be pointed to by all the
-// connections, updated at runtime, and periodically sent.
-FlatbufferDetachedBuffer<ServerStatistics> MakeServerStatistics(
- const std::vector<std::string_view> &source_node_names,
- const Configuration *configuration) {
- flatbuffers::FlatBufferBuilder fbb;
- fbb.ForceDefaults(true);
-
- std::vector<flatbuffers::Offset<ServerConnection>> connection_offsets;
- for (const std::string_view node_name : source_node_names) {
- flatbuffers::Offset<Node> node_offset =
- CopyFlatBuffer(configuration::GetNode(configuration, node_name), &fbb);
- ServerConnection::Builder connection_builder(fbb);
- connection_builder.add_node(node_offset);
- connection_builder.add_state(State::DISCONNECTED);
- connection_builder.add_dropped_packets(0);
- connection_builder.add_sent_packets(0);
- connection_builder.add_monotonic_offset(0);
- connection_offsets.emplace_back(connection_builder.Finish());
- }
- flatbuffers::Offset<
- flatbuffers::Vector<flatbuffers::Offset<ServerConnection>>>
- connections_offset = fbb.CreateVector(connection_offsets);
-
- ServerStatistics::Builder server_statistics_builder(fbb);
- server_statistics_builder.add_connections(connections_offset);
- fbb.Finish(server_statistics_builder.Finish());
-
- return fbb.Release();
-}
-
-// Finds the statistics for the provided node name.
-ServerConnection *FindServerConnection(ServerStatistics *statistics,
- std::string_view node_name) {
- ServerConnection *matching_server_connection = nullptr;
- for (size_t i = 0; i < statistics->mutable_connections()->size(); ++i) {
- ServerConnection *server_connection =
- statistics->mutable_connections()->GetMutableObject(i);
- if (server_connection->node()->name()->string_view() == node_name) {
- matching_server_connection = server_connection;
- break;
- }
- }
-
- CHECK(matching_server_connection != nullptr) << ": Unknown client";
-
- return matching_server_connection;
-}
-
-} // namespace
-
bool ChannelState::Matches(const Channel *other_channel) {
// Confirm the normal tuple, plus make sure that the other side isn't going to
// send more data over than we expect with a mismatching size.
@@ -214,24 +161,13 @@
MessageBridgeServer::MessageBridgeServer(aos::ShmEventLoop *event_loop)
: event_loop_(event_loop),
- sender_(event_loop_->MakeSender<ServerStatistics>("/aos")),
- statistics_(MakeServerStatistics(
- configuration::DestinationNodeNames(event_loop->configuration(),
- event_loop->node()),
- event_loop->configuration())),
- timestamp_sender_(event_loop_->MakeSender<Timestamp>("/aos")),
- client_statistics_fetcher_(
- event_loop_->MakeFetcher<ClientStatistics>("/aos")),
- server_("::", event_loop->node()->port()) {
+ server_("::", event_loop->node()->port()),
+ server_status_(event_loop, [this](const Context &context) {
+ timestamp_state_->SendData(&server_, context);
+ }) {
CHECK(event_loop_->node() != nullptr) << ": No nodes configured.";
- server_connection_offsets_.reserve(
- statistics_.message().connections()->size());
-
int32_t max_size = 0;
- timestamp_fetchers_.resize(event_loop->configuration()->nodes()->size());
- filters_.resize(event_loop->configuration()->nodes()->size());
- server_connection_.resize(event_loop->configuration()->nodes()->size());
// Seed up all the per-node connection state.
// We are making the assumption here that every connection is bidirectional
@@ -252,25 +188,6 @@
VLOG(1) << "Connection to " << destination_node_name << " has size "
<< connect_size;
max_size = std::max(max_size, connect_size);
- const Node *destination_node = configuration::GetNode(
- event_loop->configuration(), destination_node_name);
-
- const int node_index = configuration::GetNodeIndex(
- event_loop->configuration(), destination_node);
-
- // Now find the timestamp channel forwarded from the other node.
- const Channel *const other_timestamp_channel =
- configuration::GetChannel(event_loop_->configuration(), "/aos",
- Timestamp::GetFullyQualifiedName(),
- event_loop_->name(), destination_node);
-
- timestamp_fetchers_[node_index] = event_loop_->MakeFetcher<Timestamp>(
- other_timestamp_channel->name()->string_view());
-
- // And then find the server connection that we should be populating
- // statistics into.
- server_connection_[node_index] = FindServerConnection(
- statistics_.mutable_message(), destination_node->name()->string_view());
}
// TODO(austin): Logging synchronization.
@@ -304,8 +221,8 @@
connection,
configuration::GetNodeIndex(event_loop_->configuration(),
connection->name()->string_view()),
- FindServerConnection(statistics_.mutable_message(),
- connection->name()->string_view()),
+ server_status_.FindServerConnection(
+ connection->name()->string_view()),
configuration::ChannelMessageIsLoggedOnNode(channel, other_node));
}
@@ -333,12 +250,6 @@
// Buffer up the max size a bit so everything fits nicely.
LOG(INFO) << "Max message size for all clients is " << max_size;
server_.SetMaxSize(max_size + 100);
-
- statistics_timer_ = event_loop_->AddTimer([this]() { Tick(); });
- event_loop_->OnRun([this]() {
- statistics_timer_->Setup(event_loop_->monotonic_now() + kPingPeriod,
- kPingPeriod);
- });
}
void MessageBridgeServer::NodeConnected(sctp_assoc_t assoc_id) {
@@ -364,15 +275,10 @@
->Get(node_index)
->name()
->string_view();
- ResetFilter(node_index);
+ server_status_.ResetFilter(node_index);
}
}
-void MessageBridgeServer::ResetFilter(int node_index) {
- filters_[node_index].Reset();
- server_connection_[node_index]->mutate_monotonic_offset(0);
-}
-
void MessageBridgeServer::MessageReceived() {
aos::unique_c_ptr<Message> message = server_.Read();
@@ -443,7 +349,7 @@
++channel_index;
}
}
- ResetFilter(node_index);
+ server_status_.ResetFilter(node_index);
VLOG(1) << "Resetting filters for " << node_index << " "
<< event_loop_->configuration()
->nodes()
@@ -465,192 +371,5 @@
}
}
-void MessageBridgeServer::SendStatistics() {
- aos::Sender<ServerStatistics>::Builder builder = sender_.MakeBuilder();
-
- server_connection_offsets_.clear();
-
- // Copy the statistics over, but only add monotonic_offset if it is valid.
- for (const ServerConnection *connection :
- *statistics_.message().connections()) {
- flatbuffers::Offset<flatbuffers::String> node_name_offset =
- builder.fbb()->CreateString(connection->node()->name()->string_view());
- Node::Builder node_builder = builder.MakeBuilder<Node>();
- node_builder.add_name(node_name_offset);
- flatbuffers::Offset<Node> node_offset = node_builder.Finish();
-
- ServerConnection::Builder server_connection_builder =
- builder.MakeBuilder<ServerConnection>();
- server_connection_builder.add_node(node_offset);
- server_connection_builder.add_state(connection->state());
- server_connection_builder.add_dropped_packets(
- connection->dropped_packets());
- server_connection_builder.add_sent_packets(connection->sent_packets());
-
- // TODO(austin): If it gets stale, drop it too.
- if (connection->monotonic_offset() != 0) {
- server_connection_builder.add_monotonic_offset(
- connection->monotonic_offset());
- }
-
- server_connection_offsets_.emplace_back(server_connection_builder.Finish());
- }
-
- flatbuffers::Offset<
- flatbuffers::Vector<flatbuffers::Offset<ServerConnection>>>
- server_connections_offset =
- builder.fbb()->CreateVector(server_connection_offsets_);
-
- ServerStatistics::Builder server_statistics_builder =
- builder.MakeBuilder<ServerStatistics>();
- server_statistics_builder.add_connections(server_connections_offset);
- builder.Send(server_statistics_builder.Finish());
-}
-
-void MessageBridgeServer::Tick() {
- // Send statistics every kStatisticsPeriod. Use the context so we don't get
- // caught up with the wakeup delay and jitter.
- if (event_loop_->context().monotonic_event_time >=
- last_statistics_send_time_ + kStatisticsPeriod) {
- SendStatistics();
- last_statistics_send_time_ = event_loop_->context().monotonic_event_time;
- }
-
- // The message_bridge_client application measures and filters the offsets from
- // all messages it receives. It then sends this on in the ClientStatistics
- // message. Collect that up and forward it back over the Timestamp message so
- // we have guarenteed traffic on the other node for timestamping. This also
- // moves the offsets back across the network so both directions can be
- // observed.
- client_statistics_fetcher_.Fetch();
-
- // Build up the timestamp message. Do it here so that we don't have invalid
- // data in it.
- FlatbufferFixedAllocatorArray<Timestamp, 1000> timestamp_copy;
- flatbuffers::FlatBufferBuilder *fbb = timestamp_copy.fbb();
-
- if (client_statistics_fetcher_.get()) {
- // Build up the list of client offsets.
- std::vector<flatbuffers::Offset<ClientOffset>> client_offsets;
-
- // Iterate through the connections this node has made.
- for (const ClientConnection *connection :
- *client_statistics_fetcher_->connections()) {
- const int node_index = configuration::GetNodeIndex(
- event_loop_->configuration(),
- connection->node()->name()->string_view());
-
- // Filter out the ones which aren't connected.
- // And the ones without monotonic offsets.
- if (connection->state() != State::CONNECTED ||
- !connection->has_monotonic_offset() ||
- client_statistics_fetcher_.context().monotonic_event_time +
- kClientStatisticsStaleTimeout <
- event_loop_->context().monotonic_event_time) {
- VLOG(1) << "Disconnected, no offset, or client message too old for "
- << connection->node()->name()->string_view();
- ResetFilter(node_index);
- continue;
- }
-
- timestamp_fetchers_[node_index].Fetch();
-
- // Find the offset computed on their node for this client connection
- // using their timestamp message.
- bool has_their_offset = false;
- std::chrono::nanoseconds their_offset = std::chrono::nanoseconds(0);
- if (timestamp_fetchers_[node_index].get() != nullptr) {
- for (const ClientOffset *client_offset :
- *timestamp_fetchers_[node_index]->offsets()) {
- if (client_offset->node()->name()->string_view() ==
- event_loop_->node()->name()->string_view()) {
- // Make sure it has an offset and the message isn't stale.
- if (client_offset->has_monotonic_offset()) {
- if (timestamp_fetchers_[node_index]
- .context()
- .monotonic_event_time +
- kTimestampStaleTimeout >
- event_loop_->context().monotonic_event_time) {
- their_offset =
- std::chrono::nanoseconds(client_offset->monotonic_offset());
- has_their_offset = true;
- } else {
- ResetFilter(node_index);
- VLOG(1) << "Timestamp old, resetting.";
- }
- }
- break;
- }
- }
- }
-
- if (has_their_offset &&
- server_connection_[node_index]->state() == State::CONNECTED) {
- // Update the filters.
- if (filters_[node_index].MissingSamples()) {
- // Update the offset the first time. This should be representative.
- filters_[node_index].set_base_offset(
- std::chrono::nanoseconds(connection->monotonic_offset()));
- }
- // The message_bridge_clients are the ones running the first filter. So
- // set the values from that and let the averaging filter run from there.
- filters_[node_index].FwdSet(
- timestamp_fetchers_[node_index].context().monotonic_remote_time,
- std::chrono::nanoseconds(connection->monotonic_offset()));
- filters_[node_index].RevSet(
- client_statistics_fetcher_.context().monotonic_event_time,
- their_offset);
-
- // Publish!
- server_connection_[node_index]->mutate_monotonic_offset(
- -filters_[node_index].offset().count());
- }
-
- // Now fill out the Timestamp message with the offset from the client.
- flatbuffers::Offset<flatbuffers::String> node_name_offset =
- fbb->CreateString(connection->node()->name()->string_view());
-
- Node::Builder node_builder(*fbb);
- node_builder.add_name(node_name_offset);
- flatbuffers::Offset<Node> node_offset = node_builder.Finish();
-
- ClientOffset::Builder client_offset_builder(*fbb);
- client_offset_builder.add_node(node_offset);
- client_offset_builder.add_monotonic_offset(
- connection->monotonic_offset());
- client_offsets.emplace_back(client_offset_builder.Finish());
- }
- flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<ClientOffset>>>
- offsets_offset = fbb->CreateVector(client_offsets);
-
- Timestamp::Builder builder(*fbb);
- builder.add_offsets(offsets_offset);
- timestamp_copy.Finish(builder.Finish());
- } else {
- // Publish an empty timestamp if we have nothing.
- flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<ClientOffset>>>
- offsets_offset =
- fbb->CreateVector(std::vector<flatbuffers::Offset<ClientOffset>>{});
- Timestamp::Builder builder(*fbb);
- builder.add_offsets(offsets_offset);
- timestamp_copy.Finish(builder.Finish());
- }
-
- // Send it out over shm, and using that timestamp, then send it out over sctp.
- // This avoid some context switches.
- timestamp_sender_.Send(timestamp_copy);
-
- Context context;
- context.monotonic_event_time = timestamp_sender_.monotonic_sent_time();
- context.realtime_event_time = timestamp_sender_.realtime_sent_time();
- context.queue_index = timestamp_sender_.sent_queue_index();
- context.size = timestamp_copy.size();
- context.data = timestamp_copy.data();
-
- // Since we are building up the timestamp to send here, we need to trigger the
- // SendData call ourselves.
- timestamp_state_->SendData(&server_, context);
-}
-
} // namespace message_bridge
} // namespace aos
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index 8e8ec64..b6a4f05 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -10,6 +10,7 @@
#include "aos/network/connect_generated.h"
#include "aos/network/message_bridge_client_generated.h"
#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/message_bridge_server_status.h"
#include "aos/network/sctp_server.h"
#include "aos/network/timestamp_generated.h"
#include "glog/logging.h"
@@ -94,12 +95,6 @@
~MessageBridgeServer() { event_loop_->epoll()->DeleteFd(server_.fd()); }
- // Time after which we consider the client statistics message stale, and reset
- // the filter.
- static constexpr std::chrono::seconds kClientStatisticsStaleTimeout{1};
- // Time after which we consider the timestamp stale, and reset the filter.
- static constexpr std::chrono::milliseconds kTimestampStaleTimeout{250};
-
private:
// Reads a message from the socket. Could be a notification.
void MessageReceived();
@@ -109,53 +104,19 @@
// Called when the server connection disconnects.
void NodeDisconnected(sctp_assoc_t assoc_id);
- // Resets the filter and clears the entry from the server statistics.
- void ResetFilter(int node_index);
-
// Called when data (either a connection request or delivery timestamps) is
// received.
void HandleData(const Message *message);
- // Handle timestamps and statistics.
- void Tick();
-
- // Sends out the statistics that are continually updated by the
- // ChannelState's.
- void SendStatistics();
-
// Event loop to schedule everything on.
aos::ShmEventLoop *event_loop_;
- // Statistics, timer, and associated sender.
- aos::Sender<ServerStatistics> sender_;
- aos::TimerHandler *statistics_timer_;
- FlatbufferDetachedBuffer<ServerStatistics> statistics_;
- std::vector<flatbuffers::Offset<ServerConnection>> server_connection_offsets_;
-
- // Sender for the timestamps that we are forwarding over the network.
- aos::Sender<Timestamp> timestamp_sender_;
- // ChannelState to send timestamps over the network with.
- ChannelState *timestamp_state_ = nullptr;
-
- // Fetcher to grab the measured offsets in the client.
- aos::Fetcher<ClientStatistics> client_statistics_fetcher_;
- // All of these are indexed by the other node index.
- // Fetcher to grab timestamps and therefore offsets from the other nodes.
- std::vector<aos::Fetcher<Timestamp>> timestamp_fetchers_;
- // Bidirectional filters for each connection.
- std::vector<ClippedAverageFilter> filters_;
- // ServerConnection to fill out the offsets for from each node.
- std::vector<ServerConnection *> server_connection_;
-
SctpServer server_;
- static constexpr std::chrono::nanoseconds kStatisticsPeriod =
- std::chrono::seconds(1);
- static constexpr std::chrono::nanoseconds kPingPeriod =
- std::chrono::milliseconds(100);
+ MessageBridgeServerStatus server_status_;
- aos::monotonic_clock::time_point last_statistics_send_time_ =
- aos::monotonic_clock::min_time;
+ // ChannelState to send timestamps over the network with.
+ ChannelState *timestamp_state_ = nullptr;
// List of channels. The entries that aren't sent from this node are left
// null.
diff --git a/aos/network/message_bridge_server_status.cc b/aos/network/message_bridge_server_status.cc
new file mode 100644
index 0000000..21badef
--- /dev/null
+++ b/aos/network/message_bridge_server_status.cc
@@ -0,0 +1,327 @@
+#include "aos/network/message_bridge_server_status.h"
+
+#include <chrono>
+#include <functional>
+
+#include "aos/configuration.h"
+#include "aos/events/event_loop.h"
+#include "aos/flatbuffer_merge.h"
+#include "aos/flatbuffers.h"
+#include "aos/network/message_bridge_client_generated.h"
+#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/timestamp_filter.h"
+#include "aos/network/timestamp_generated.h"
+
+namespace aos {
+namespace message_bridge {
+namespace {
+
+namespace chrono = std::chrono;
+
+// Builds up the "empty" server statistics message to be pointed to by all the
+// connections, updated at runtime, and periodically sent.
+FlatbufferDetachedBuffer<ServerStatistics> MakeServerStatistics(
+ const std::vector<std::string_view> &source_node_names,
+ const Configuration *configuration) {
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
+
+ std::vector<flatbuffers::Offset<ServerConnection>> connection_offsets;
+ for (const std::string_view node_name : source_node_names) {
+ flatbuffers::Offset<Node> node_offset =
+ CopyFlatBuffer(configuration::GetNode(configuration, node_name), &fbb);
+ ServerConnection::Builder connection_builder(fbb);
+ connection_builder.add_node(node_offset);
+ connection_builder.add_state(State::DISCONNECTED);
+ connection_builder.add_dropped_packets(0);
+ connection_builder.add_sent_packets(0);
+ connection_builder.add_monotonic_offset(0);
+ connection_offsets.emplace_back(connection_builder.Finish());
+ }
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<ServerConnection>>>
+ connections_offset = fbb.CreateVector(connection_offsets);
+
+ ServerStatistics::Builder server_statistics_builder(fbb);
+ server_statistics_builder.add_connections(connections_offset);
+ fbb.Finish(server_statistics_builder.Finish());
+
+ return fbb.Release();
+}
+
+// Finds the statistics for the provided node name.
+ServerConnection *FindServerConnection(ServerStatistics *statistics,
+ std::string_view node_name) {
+ ServerConnection *matching_server_connection = nullptr;
+ for (size_t i = 0; i < statistics->mutable_connections()->size(); ++i) {
+ ServerConnection *server_connection =
+ statistics->mutable_connections()->GetMutableObject(i);
+ if (server_connection->node()->name()->string_view() == node_name) {
+ matching_server_connection = server_connection;
+ break;
+ }
+ }
+
+ CHECK(matching_server_connection != nullptr) << ": Unknown client";
+
+ return matching_server_connection;
+}
+
+} // namespace
+
+MessageBridgeServerStatus::MessageBridgeServerStatus(
+ aos::EventLoop *event_loop, std::function<void(const Context &)> send_data)
+ : event_loop_(event_loop),
+ sender_(event_loop_->MakeSender<ServerStatistics>("/aos")),
+ statistics_(MakeServerStatistics(
+ configuration::DestinationNodeNames(event_loop_->configuration(),
+ event_loop_->node()),
+ event_loop->configuration())),
+ client_statistics_fetcher_(
+ event_loop_->MakeFetcher<ClientStatistics>("/aos")),
+ timestamp_sender_(event_loop_->MakeSender<Timestamp>("/aos")),
+ send_data_(send_data) {
+ server_connection_offsets_.reserve(
+ statistics_.message().connections()->size());
+
+ filters_.resize(event_loop->configuration()->nodes()->size());
+ timestamp_fetchers_.resize(event_loop->configuration()->nodes()->size());
+ server_connection_.resize(event_loop->configuration()->nodes()->size());
+
+ // Seed up all the per-node connection state.
+ // We are making the assumption here that every connection is bidirectional
+ // (data is being sent both ways). This is pretty safe because we are
+ // forwarding timestamps between nodes.
+ for (std::string_view destination_node_name :
+ configuration::DestinationNodeNames(event_loop->configuration(),
+ event_loop->node())) {
+ const Node *destination_node = configuration::GetNode(
+ event_loop->configuration(), destination_node_name);
+
+ const int node_index = configuration::GetNodeIndex(
+ event_loop->configuration(), destination_node);
+
+ // Now find the timestamp channel forwarded from the other node.
+ const Channel *const other_timestamp_channel =
+ configuration::GetChannel(event_loop_->configuration(), "/aos",
+ Timestamp::GetFullyQualifiedName(),
+ event_loop_->name(), destination_node);
+
+ timestamp_fetchers_[node_index] = event_loop_->MakeFetcher<Timestamp>(
+ other_timestamp_channel->name()->string_view());
+
+ // And then find the server connection that we should be populating
+ // statistics into.
+ server_connection_[node_index] =
+ FindServerConnection(destination_node->name()->string_view());
+ }
+
+ statistics_timer_ = event_loop_->AddTimer([this]() { Tick(); });
+ event_loop_->OnRun([this]() {
+ statistics_timer_->Setup(event_loop_->monotonic_now() + kPingPeriod,
+ kPingPeriod);
+ });
+}
+
+ServerConnection *MessageBridgeServerStatus::FindServerConnection(
+ std::string_view node_name) {
+ return message_bridge::FindServerConnection(statistics_.mutable_message(),
+ node_name);
+}
+
+void MessageBridgeServerStatus::ResetFilter(int node_index) {
+ filters_[node_index].Reset();
+ server_connection_[node_index]->mutate_monotonic_offset(0);
+}
+
+void MessageBridgeServerStatus::SendStatistics() {
+ aos::Sender<ServerStatistics>::Builder builder = sender_.MakeBuilder();
+
+ server_connection_offsets_.clear();
+
+ // Copy the statistics over, but only add monotonic_offset if it is valid.
+ for (const ServerConnection *connection :
+ *statistics_.message().connections()) {
+ flatbuffers::Offset<flatbuffers::String> node_name_offset =
+ builder.fbb()->CreateString(connection->node()->name()->string_view());
+ Node::Builder node_builder = builder.MakeBuilder<Node>();
+ node_builder.add_name(node_name_offset);
+ flatbuffers::Offset<Node> node_offset = node_builder.Finish();
+
+ ServerConnection::Builder server_connection_builder =
+ builder.MakeBuilder<ServerConnection>();
+ server_connection_builder.add_node(node_offset);
+ server_connection_builder.add_state(connection->state());
+ server_connection_builder.add_dropped_packets(
+ connection->dropped_packets());
+ server_connection_builder.add_sent_packets(connection->sent_packets());
+
+ // TODO(austin): If it gets stale, drop it too.
+ if (connection->monotonic_offset() != 0) {
+ server_connection_builder.add_monotonic_offset(
+ connection->monotonic_offset());
+ }
+
+ server_connection_offsets_.emplace_back(server_connection_builder.Finish());
+ }
+
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<ServerConnection>>>
+ server_connections_offset =
+ builder.fbb()->CreateVector(server_connection_offsets_);
+
+ ServerStatistics::Builder server_statistics_builder =
+ builder.MakeBuilder<ServerStatistics>();
+ server_statistics_builder.add_connections(server_connections_offset);
+ builder.Send(server_statistics_builder.Finish());
+}
+
+void MessageBridgeServerStatus::Tick() {
+ // Send statistics every kStatisticsPeriod. Use the context so we don't get
+ // caught up with the wakeup delay and jitter.
+ if (event_loop_->context().monotonic_event_time >=
+ last_statistics_send_time_ + kStatisticsPeriod) {
+ SendStatistics();
+ last_statistics_send_time_ = event_loop_->context().monotonic_event_time;
+ }
+
+ // The message_bridge_client application measures and filters the offsets from
+ // all messages it receives. It then sends this on in the ClientStatistics
+ // message. Collect that up and forward it back over the Timestamp message so
+ // we have guarenteed traffic on the other node for timestamping. This also
+ // moves the offsets back across the network so both directions can be
+ // observed.
+ client_statistics_fetcher_.Fetch();
+
+ // Build up the timestamp message. Do it here so that we don't have invalid
+ // data in it.
+ FlatbufferFixedAllocatorArray<Timestamp, 1000> timestamp_copy;
+ flatbuffers::FlatBufferBuilder *fbb = timestamp_copy.fbb();
+
+ if (client_statistics_fetcher_.get()) {
+ // Build up the list of client offsets.
+ std::vector<flatbuffers::Offset<ClientOffset>> client_offsets;
+
+ // Iterate through the connections this node has made.
+ for (const ClientConnection *connection :
+ *client_statistics_fetcher_->connections()) {
+ const int node_index = configuration::GetNodeIndex(
+ event_loop_->configuration(),
+ connection->node()->name()->string_view());
+
+ // Filter out the ones which aren't connected.
+ // And the ones without monotonic offsets.
+ if (connection->state() != State::CONNECTED ||
+ !connection->has_monotonic_offset() ||
+ client_statistics_fetcher_.context().monotonic_event_time +
+ MessageBridgeServerStatus::kClientStatisticsStaleTimeout <
+ event_loop_->context().monotonic_event_time) {
+ VLOG(1) << "Disconnected, no offset, or client message too old for "
+ << connection->node()->name()->string_view();
+ ResetFilter(node_index);
+ continue;
+ }
+
+ timestamp_fetchers_[node_index].Fetch();
+
+ // Find the offset computed on their node for this client connection
+ // using their timestamp message.
+ bool has_their_offset = false;
+ chrono::nanoseconds their_offset = chrono::nanoseconds(0);
+ if (timestamp_fetchers_[node_index].get() != nullptr) {
+ for (const ClientOffset *client_offset :
+ *timestamp_fetchers_[node_index]->offsets()) {
+ if (client_offset->node()->name()->string_view() ==
+ event_loop_->node()->name()->string_view()) {
+ // Make sure it has an offset and the message isn't stale.
+ if (client_offset->has_monotonic_offset()) {
+ if (timestamp_fetchers_[node_index]
+ .context()
+ .monotonic_event_time +
+ MessageBridgeServerStatus::kTimestampStaleTimeout >
+ event_loop_->context().monotonic_event_time) {
+ their_offset =
+ chrono::nanoseconds(client_offset->monotonic_offset());
+ has_their_offset = true;
+ } else {
+ ResetFilter(node_index);
+ VLOG(1) << "Timestamp old, resetting.";
+ }
+ }
+ break;
+ }
+ }
+ }
+
+ if (has_their_offset &&
+ server_connection_[node_index]->state() == State::CONNECTED) {
+ // Update the filters.
+ if (filters_[node_index].MissingSamples()) {
+ // Update the offset the first time. This should be representative.
+ filters_[node_index].set_base_offset(
+ chrono::nanoseconds(connection->monotonic_offset()));
+ }
+ // The message_bridge_clients are the ones running the first filter. So
+ // set the values from that and let the averaging filter run from there.
+ filters_[node_index].FwdSet(
+ timestamp_fetchers_[node_index].context().monotonic_remote_time,
+ chrono::nanoseconds(connection->monotonic_offset()));
+ filters_[node_index].RevSet(
+ client_statistics_fetcher_.context().monotonic_event_time,
+ their_offset);
+
+ // Publish!
+ server_connection_[node_index]->mutate_monotonic_offset(
+ -filters_[node_index].offset().count());
+ }
+
+ // Now fill out the Timestamp message with the offset from the client.
+ flatbuffers::Offset<flatbuffers::String> node_name_offset =
+ fbb->CreateString(connection->node()->name()->string_view());
+
+ Node::Builder node_builder(*fbb);
+ node_builder.add_name(node_name_offset);
+ flatbuffers::Offset<Node> node_offset = node_builder.Finish();
+
+ ClientOffset::Builder client_offset_builder(*fbb);
+ client_offset_builder.add_node(node_offset);
+ client_offset_builder.add_monotonic_offset(
+ connection->monotonic_offset());
+ client_offsets.emplace_back(client_offset_builder.Finish());
+ }
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<ClientOffset>>>
+ offsets_offset = fbb->CreateVector(client_offsets);
+
+ Timestamp::Builder builder(*fbb);
+ builder.add_offsets(offsets_offset);
+ timestamp_copy.Finish(builder.Finish());
+ } else {
+ // Publish an empty timestamp if we have nothing.
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<ClientOffset>>>
+ offsets_offset =
+ fbb->CreateVector(std::vector<flatbuffers::Offset<ClientOffset>>{});
+ Timestamp::Builder builder(*fbb);
+ builder.add_offsets(offsets_offset);
+ timestamp_copy.Finish(builder.Finish());
+ }
+
+ // Send it out over shm, and using that timestamp, then send it out over sctp.
+ // This avoid some context switches.
+ timestamp_sender_.Send(timestamp_copy);
+
+ Context context;
+ context.monotonic_event_time = timestamp_sender_.monotonic_sent_time();
+ context.realtime_event_time = timestamp_sender_.realtime_sent_time();
+ context.queue_index = timestamp_sender_.sent_queue_index();
+ context.size = timestamp_copy.size();
+ context.data = timestamp_copy.data();
+
+ // Since we are building up the timestamp to send here, we need to trigger the
+ // SendData call ourselves.
+ if (send_data_) {
+ send_data_(context);
+ }
+}
+
+} // namespace message_bridge
+} // namespace aos
diff --git a/aos/network/message_bridge_server_status.h b/aos/network/message_bridge_server_status.h
new file mode 100644
index 0000000..adb747d
--- /dev/null
+++ b/aos/network/message_bridge_server_status.h
@@ -0,0 +1,82 @@
+#ifndef AOS_NETWORK_MESSAGE_BRIDGE_SERVER_STATUS_H_
+#define AOS_NETWORK_MESSAGE_BRIDGE_SERVER_STATUS_H_
+
+#include <chrono>
+#include <functional>
+
+#include "aos/events/event_loop.h"
+#include "aos/network/message_bridge_client_generated.h"
+#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/timestamp_filter.h"
+#include "aos/network/timestamp_generated.h"
+#include "aos/time/time.h"
+
+namespace aos {
+namespace message_bridge {
+
+// This class encapsulates the server side of sending server statistics and
+// managing timestamp offsets.
+class MessageBridgeServerStatus {
+ public:
+ // Time after which we consider the client statistics message stale, and reset
+ // the filter.
+ static constexpr std::chrono::seconds kClientStatisticsStaleTimeout{1};
+ // Time after which we consider the timestamp stale, and reset the filter.
+ static constexpr std::chrono::milliseconds kTimestampStaleTimeout{250};
+
+ MessageBridgeServerStatus(aos::EventLoop *event_loop,
+ std::function<void(const Context &)> send_data =
+ std::function<void(const Context &)>());
+
+ // Resets the filter and clears the entry from the server statistics.
+ void ResetFilter(int node_index);
+
+ // Returns the ServerConnection message which is updated by the server.
+ ServerConnection *FindServerConnection(std::string_view node_name);
+
+ private:
+ static constexpr std::chrono::nanoseconds kStatisticsPeriod =
+ std::chrono::seconds(1);
+ static constexpr std::chrono::nanoseconds kPingPeriod =
+ std::chrono::milliseconds(100);
+
+ // Handle timestamps and statistics.
+ void Tick();
+
+ // Sends out the statistics that are continually updated by the
+ // ChannelState's.
+ void SendStatistics();
+
+ aos::EventLoop *event_loop_;
+
+ // Statistics, timer, and associated sender.
+ aos::Sender<ServerStatistics> sender_;
+ aos::TimerHandler *statistics_timer_;
+ FlatbufferDetachedBuffer<ServerStatistics> statistics_;
+ std::vector<flatbuffers::Offset<ServerConnection>> server_connection_offsets_;
+
+ // Fetcher to grab the measured offsets in the client.
+ aos::Fetcher<ClientStatistics> client_statistics_fetcher_;
+
+ // ServerConnection to fill out the offsets for from each node.
+ std::vector<ServerConnection *> server_connection_;
+ // All of these are indexed by the other node index.
+ // Fetcher to grab timestamps and therefore offsets from the other nodes.
+ std::vector<aos::Fetcher<Timestamp>> timestamp_fetchers_;
+ // Bidirectional filters for each connection.
+ std::vector<ClippedAverageFilter> filters_;
+
+ // Sender for the timestamps that we are forwarding over the network.
+ aos::Sender<Timestamp> timestamp_sender_;
+
+ aos::monotonic_clock::time_point last_statistics_send_time_ =
+ aos::monotonic_clock::min_time;
+
+ std::function<void(const Context &)> send_data_;
+};
+
+
+} // namespace message_bridge
+} // namespace aos
+
+#endif // AOS_NETWORK_MESSAGE_BRIDGE_SERVER_STATUS_H_